KAFKA-18089: Upgrade Caffeine lib to 3.1.8 (#18004)

- Fixed the RemoteIndexCacheTest that fails with caffeine > 3.1.1

Reviewers: Luke Chen <showuon@gmail.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
This commit is contained in:
TaiJuWu 2025-02-19 00:21:38 +08:00 committed by GitHub
parent 4c8d96c0f0
commit 934b0159bb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 65 additions and 40 deletions

View File

@ -205,7 +205,7 @@
This project bundles some components that are also licensed under the Apache
License Version 2.0:
- caffeine-3.1.1
- caffeine-3.1.8
- commons-beanutils-1.9.4
- commons-collections-3.2.2
- commons-digester-2.1

View File

@ -450,10 +450,11 @@ class RemoteIndexCacheTest {
// The cache max size is 2, it will remove one entry and keep the overall size to 2
cache.getIndexEntry(metadataList(2))
assertCacheSize(2)
// Calling getIndex on the same entry should not call rsm#fetchIndex again, but it should retrieve from cache
// Calling getIndex on the same entry may or may not call rsm#fetchIndex, depending on the cache implementation,
// so we only need to verify that the number of calls falls within the expected range.
cache.getIndexEntry(metadataList(2))
assertCacheSize(2)
verifyFetchIndexInvocation(count = 3)
verifyFetchIndexInvocationWithRange(lower = 3, upper = 4)
// Close the cache
cache.close()
@ -553,29 +554,47 @@ class RemoteIndexCacheTest {
@Test
def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = {
def verifyEntryIsEvicted(metadataToVerify: RemoteLogSegmentMetadata, entryToVerify: Entry): Unit = {
// wait until `entryToVerify` is marked for deletion
TestUtils.waitUntilTrue(() => entryToVerify.isMarkedForCleanup,
"Failed to mark evicted cache entry for cleanup after resizing cache.")
TestUtils.waitUntilTrue(() => entryToVerify.isCleanStarted,
"Failed to cleanup evicted cache entry after resizing cache.")
// verify no index files for `entryToVerify` on remote cache dir
TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, remoteOffsetIndexFileName(metadataToVerify)).isEmpty,
s"Offset index file for evicted entry should not be present on disk at ${cache.cacheDir()}")
TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, remoteTimeIndexFileName(metadataToVerify)).isEmpty,
s"Time index file for evicted entry should not be present on disk at ${cache.cacheDir()}")
TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, remoteTransactionIndexFileName(metadataToVerify)).isEmpty,
s"Txn index file for evicted entry should not be present on disk at ${cache.cacheDir()}")
TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, remoteDeletedSuffixIndexFileName(metadataToVerify)).isEmpty,
s"Index file marked for deletion for evicted entry should not be present on disk at ${cache.cacheDir()}")
// Returns the subset of `metadataToVerify` whose segments are still present in the
// internal cache (keyed by remote log segment id), i.e. the entries that were NOT evicted.
def getRemoteLogSegMetadataIsKept(metadataToVerify: List[RemoteLogSegmentMetadata]): List[RemoteLogSegmentMetadata] = {
metadataToVerify.filter(s => { cache.internalCache().asMap().containsKey(s.remoteLogSegmentId().id())})
}
def verifyEntryIsKept(metadataToVerify: RemoteLogSegmentMetadata): Unit = {
assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteOffsetIndexFileName(metadataToVerify)).isPresent)
assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteTimeIndexFileName(metadataToVerify)).isPresent)
assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteTransactionIndexFileName(metadataToVerify)).isPresent)
assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteDeletedSuffixIndexFileName(metadataToVerify)).isEmpty)
// Waits until exactly `numOfMarkAsDeleted` of `entriesToVerify` have been marked for cleanup and
// have started cleanup, then verifies the evicted segments' index files are gone from the cache dir.
// Returns the evicted segment metadata and the evicted entries, so callers can track what remains.
// NOTE(review): with caffeine > 3.1.1 which entry gets evicted is implementation-dependent, so this
// checks counts rather than a specific entry.
def verifyEntryIsEvicted(metadataToVerify: List[RemoteLogSegmentMetadata], entriesToVerify: List[Entry],
numOfMarkAsDeleted: Int): (List[RemoteLogSegmentMetadata], List[Entry]) = {
// wait until the expected number of entries are marked for cleanup after the resize
TestUtils.waitUntilTrue(() => entriesToVerify.count(_.isMarkedForCleanup).equals(numOfMarkAsDeleted),
"Failed to mark evicted cache entry for cleanup after resizing cache.")
// wait until cleanup has actually started for the same number of entries
TestUtils.waitUntilTrue(() => entriesToVerify.count(_.isCleanStarted).equals(numOfMarkAsDeleted),
"Failed to cleanup evicted cache entry after resizing cache.")
val entriesIsMarkedForCleanup = entriesToVerify.filter(_.isMarkedForCleanup)
val entriesIsCleanStarted = entriesToVerify.filter(_.isCleanStarted)
// the entries marked for cleanup and the entries whose cleanup has started should be identical
assertTrue(entriesIsMarkedForCleanup.equals(entriesIsCleanStarted))
// collect the segment metadata whose entries were evicted from the internal cache
val metadataDeleted = metadataToVerify.filter(s => { !cache.internalCache().asMap().containsKey(s.remoteLogSegmentId().id())})
assertEquals(numOfMarkAsDeleted, metadataDeleted.size)
for (metadata <- metadataDeleted) {
// verify no index files remain for the evicted segment in the remote cache dir
TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, remoteOffsetIndexFileName(metadata)).isEmpty,
s"Offset index file for evicted entry should not be present on disk at ${cache.cacheDir()}")
TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, remoteTimeIndexFileName(metadata)).isEmpty,
s"Time index file for evicted entry should not be present on disk at ${cache.cacheDir()}")
TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, remoteTransactionIndexFileName(metadata)).isEmpty,
s"Txn index file for evicted entry should not be present on disk at ${cache.cacheDir()}")
TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isEmpty,
s"Index file marked for deletion for evicted entry should not be present on disk at ${cache.cacheDir()}")
}
(metadataDeleted, entriesIsMarkedForCleanup)
}
// Asserts that every segment in `metadataToVerify` still has its offset, time and txn index
// files on disk, and that no file with the deleted suffix exists for it (i.e. it was kept,
// not evicted, by the resize).
def verifyEntryIsKept(metadataToVerify: List[RemoteLogSegmentMetadata]): Unit = {
for (metadata <- metadataToVerify) {
assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteOffsetIndexFileName(metadata)).isPresent)
assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteTimeIndexFileName(metadata)).isPresent)
assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteTransactionIndexFileName(metadata)).isPresent)
assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isEmpty)
}
}
// The test process for resizing is: put 1 entry -> evict to empty -> put 3 entries with limited capacity of 2 entries ->
@ -619,27 +638,32 @@ class RemoteIndexCacheTest {
val entry0 = cache.getIndexEntry(metadataList.head)
val entry1 = cache.getIndexEntry(metadataList(1))
cache.getIndexEntry(metadataList(2))
val entry2 = cache.getIndexEntry(metadataList(2))
val entries = List(entry0, entry1, entry2)
assertCacheSize(2)
verifyEntryIsEvicted(metadataList.head, entry0)
val (evictedSegmentMetadata, evictedEntry) = verifyEntryIsEvicted(metadataList, entries, 1)
// Reduce cache capacity to only store 1 entry
cache.resizeCacheSize(1 * estimateEntryBytesSize)
assertCacheSize(1)
verifyEntryIsEvicted(metadataList(1), entry1)
// After resizing, verify that exactly one entry is evicted from the cache, then compute the entries and segment metadata that remain
val entryInCache = entries.filterNot(evictedEntry.contains(_))
val updatedSegmentMetadata = metadataList.filterNot(evictedSegmentMetadata.contains(_))
verifyEntryIsEvicted(updatedSegmentMetadata, entryInCache, 1)
// resize to the same size, all entries should be kept
cache.resizeCacheSize(1 * estimateEntryBytesSize)
val entriesKept = getRemoteLogSegMetadataIsKept(metadataList)
// verify all existing entries (`cache.getIndexEntry(metadataList(2))`) are kept
verifyEntryIsKept(metadataList(2))
verifyEntryIsKept(entriesKept)
assertCacheSize(1)
// increase the size
cache.resizeCacheSize(2 * estimateEntryBytesSize)
// verify all existing entries (`cache.getIndexEntry(metadataList(2))`) are kept
verifyEntryIsKept(metadataList(2))
// verify all entries are kept
verifyEntryIsKept(entriesKept)
assertCacheSize(1)
}
@ -981,6 +1005,16 @@ class RemoteIndexCacheTest {
}
}
/**
 * Verifies that rsm#fetchIndex was invoked between `lower` and `upper` times (inclusive)
 * for each of the given index types, using Mockito's atLeast/atMost verification modes.
 * A range is used because, with caffeine > 3.1.1, whether a re-fetch happens after eviction
 * depends on the cache implementation's eviction choice.
 */
private def verifyFetchIndexInvocationWithRange(lower: Int,
upper: Int,
indexTypes: Seq[IndexType] =
Seq(IndexType.OFFSET, IndexType.TIMESTAMP, IndexType.TRANSACTION)): Unit = {
for (indexType <- indexTypes) {
verify(rsm, atLeast(lower)).fetchIndex(any(classOf[RemoteLogSegmentMetadata]), ArgumentMatchers.eq(indexType))
verify(rsm, atMost(upper)).fetchIndex(any(classOf[RemoteLogSegmentMetadata]), ArgumentMatchers.eq(indexType))
}
}
private def createTxIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata, dir: File): TransactionIndex = {
val txnIdxFile = remoteTransactionIndexFile(dir, metadata)
txnIdxFile.createNewFile()

View File

@ -54,10 +54,7 @@ versions += [
apacheds: "2.0.0-M24",
argparse4j: "0.7.0",
bcpkix: "1.78.1",
// Version >=3.1.2 includes an improvement to prevent hash DOS attacks,
// but currently, tests are failing in >=3.1.2. Therefore, we are temporarily using version 3.1.1.
// The failing tests should be fixed under KAFKA-18089, allowing us to upgrade to >=3.1.2.
caffeine: "3.1.1",
caffeine: "3.1.8",
bndlib: "7.0.0",
checkstyle: project.hasProperty('checkstyleVersion') ? checkstyleVersion : "10.20.2",
commonsValidator: "1.9.0",

View File

@ -728,10 +728,4 @@ public class RemoteIndexCache implements Closeable {
/**
 * Returns the on-disk file name for the transaction index of the given remote log segment:
 * the generated per-segment file-name prefix followed by the txn-index suffix.
 */
public static String remoteTransactionIndexFileName(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
return generateFileNamePrefixForIndex(remoteLogSegmentMetadata) + LogFileUtils.TXN_INDEX_FILE_SUFFIX;
}
// Visible for testing
public static String remoteDeletedSuffixIndexFileName(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
return generateFileNamePrefixForIndex(remoteLogSegmentMetadata) + LogFileUtils.DELETED_FILE_SUFFIX;
}
}