From 934b0159bbea61fee27db6cd3e672037cb526a65 Mon Sep 17 00:00:00 2001 From: TaiJuWu Date: Wed, 19 Feb 2025 00:21:38 +0800 Subject: [PATCH] KAFKA-18089: Upgrade Caffeine lib to 3.1.8 (#18004) - Fixed the RemoteIndexCacheTest that fails with caffeine > 3.1.1 Reviewers: Luke Chen , Kamal Chandraprakash --- LICENSE-binary | 2 +- .../log/remote/RemoteIndexCacheTest.scala | 92 +++++++++++++------ gradle/dependencies.gradle | 5 +- .../internals/log/RemoteIndexCache.java | 6 -- 4 files changed, 65 insertions(+), 40 deletions(-) diff --git a/LICENSE-binary b/LICENSE-binary index 7bd0bec63f8..1875770c6dd 100644 --- a/LICENSE-binary +++ b/LICENSE-binary @@ -205,7 +205,7 @@ This project bundles some components that are also licensed under the Apache License Version 2.0: -- caffeine-3.1.1 +- caffeine-3.1.8 - commons-beanutils-1.9.4 - commons-collections-3.2.2 - commons-digester-2.1 diff --git a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala index 78d67e6a2e8..1ba2b508bf6 100644 --- a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala +++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala @@ -450,10 +450,11 @@ class RemoteIndexCacheTest { // The cache max size is 2, it will remove one entry and keep the overall size to 2 cache.getIndexEntry(metadataList(2)) assertCacheSize(2) - // Calling getIndex on the same entry should not call rsm#fetchIndex again, but it should retrieve from cache + // Calling getIndex on the same entry may call rsm#fetchIndex or not, it only depend on cache implementation so + // we only need to verify the number of calling is in our range. cache.getIndexEntry(metadataList(2)) assertCacheSize(2) - verifyFetchIndexInvocation(count = 3) + verifyFetchIndexInvocationWithRange(lower = 3, upper = 4) // Close the cache cache.close() @@ -553,29 +554,47 @@ class RemoteIndexCacheTest { @Test def testCorrectnessForCacheAndIndexFilesWhenResizeCache(): Unit = { - - def verifyEntryIsEvicted(metadataToVerify: RemoteLogSegmentMetadata, entryToVerify: Entry): Unit = { - // wait until `entryToVerify` is marked for deletion - TestUtils.waitUntilTrue(() => entryToVerify.isMarkedForCleanup, - "Failed to mark evicted cache entry for cleanup after resizing cache.") - TestUtils.waitUntilTrue(() => entryToVerify.isCleanStarted, - "Failed to cleanup evicted cache entry after resizing cache.") - // verify no index files for `entryToVerify` on remote cache dir - TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, remoteOffsetIndexFileName(metadataToVerify)).isEmpty, - s"Offset index file for evicted entry should not be present on disk at ${cache.cacheDir()}") - TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, remoteTimeIndexFileName(metadataToVerify)).isEmpty, - s"Time index file for evicted entry should not be present on disk at ${cache.cacheDir()}") - TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, remoteTransactionIndexFileName(metadataToVerify)).isEmpty, - s"Txn index file for evicted entry should not be present on disk at ${cache.cacheDir()}") - TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, remoteDeletedSuffixIndexFileName(metadataToVerify)).isEmpty, - s"Index file marked for deletion for evicted entry should not be present on disk at ${cache.cacheDir()}") + def getRemoteLogSegMetadataIsKept(metadataToVerify: List[RemoteLogSegmentMetadata]): List[RemoteLogSegmentMetadata] = { + metadataToVerify.filter(s => { cache.internalCache().asMap().containsKey(s.remoteLogSegmentId().id())}) } - def verifyEntryIsKept(metadataToVerify: RemoteLogSegmentMetadata): Unit = { - assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteOffsetIndexFileName(metadataToVerify)).isPresent) - assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteTimeIndexFileName(metadataToVerify)).isPresent) - assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteTransactionIndexFileName(metadataToVerify)).isPresent) - assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteDeletedSuffixIndexFileName(metadataToVerify)).isEmpty) + def verifyEntryIsEvicted(metadataToVerify: List[RemoteLogSegmentMetadata], entriesToVerify: List[Entry], + numOfMarkAsDeleted: Int): (List[RemoteLogSegmentMetadata], List[Entry]) = { + TestUtils.waitUntilTrue(() => entriesToVerify.count(_.isMarkedForCleanup).equals(numOfMarkAsDeleted), + "Failed to mark evicted cache entry for cleanup after resizing cache.") + + TestUtils.waitUntilTrue(() => entriesToVerify.count(_.isCleanStarted).equals(numOfMarkAsDeleted), + "Failed to cleanup evicted cache entry after resizing cache.") + + val entriesIsMarkedForCleanup = entriesToVerify.filter(_.isMarkedForCleanup) + val entriesIsCleanStarted = entriesToVerify.filter(_.isCleanStarted) + // clean up entries and clean start entries should be the same + assertTrue(entriesIsMarkedForCleanup.equals(entriesIsCleanStarted)) + + // get the logSegMetadata are evicted + val metadataDeleted = metadataToVerify.filter(s => { !cache.internalCache().asMap().containsKey(s.remoteLogSegmentId().id())}) + assertEquals(numOfMarkAsDeleted, metadataDeleted.size) + for (metadata <- metadataDeleted) { + // verify no index files for `entryToVerify` on remote cache dir + TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, remoteOffsetIndexFileName(metadata)).isEmpty, + s"Offset index file for evicted entry should not be present on disk at ${cache.cacheDir()}") + TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, remoteTimeIndexFileName(metadata)).isEmpty, + s"Time index file for evicted entry should not be present on disk at ${cache.cacheDir()}") + TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, remoteTransactionIndexFileName(metadata)).isEmpty, + s"Txn index file for evicted entry should not be present on disk at ${cache.cacheDir()}") + TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isEmpty, + s"Index file marked for deletion for evicted entry should not be present on disk at ${cache.cacheDir()}") + } + (metadataDeleted, entriesIsMarkedForCleanup) + } + + def verifyEntryIsKept(metadataToVerify: List[RemoteLogSegmentMetadata]): Unit = { + for (metadata <- metadataToVerify) { + assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteOffsetIndexFileName(metadata)).isPresent) + assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteTimeIndexFileName(metadata)).isPresent) + assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteTransactionIndexFileName(metadata)).isPresent) + assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isEmpty) + } } // The test process for resizing is: put 1 entry -> evict to empty -> put 3 entries with limited capacity of 2 entries -> @@ -619,27 +638,32 @@ class RemoteIndexCacheTest { val entry0 = cache.getIndexEntry(metadataList.head) val entry1 = cache.getIndexEntry(metadataList(1)) - cache.getIndexEntry(metadataList(2)) + val entry2 = cache.getIndexEntry(metadataList(2)) + val entries = List(entry0, entry1, entry2) assertCacheSize(2) - verifyEntryIsEvicted(metadataList.head, entry0) + val (evictedSegmentMetadata, evictedEntry) = verifyEntryIsEvicted(metadataList, entries, 1) // Reduce cache capacity to only store 1 entry cache.resizeCacheSize(1 * estimateEntryBytesSize) assertCacheSize(1) - verifyEntryIsEvicted(metadataList(1), entry1) + // After resize, we need to check an entry is deleted from cache and the existing segmentMetadata + val entryInCache = entries.filterNot(evictedEntry.contains(_)) + val updatedSegmentMetadata = metadataList.filterNot(evictedSegmentMetadata.contains(_)) + verifyEntryIsEvicted(updatedSegmentMetadata, entryInCache, 1) // resize to the same size, all entries should be kept cache.resizeCacheSize(1 * estimateEntryBytesSize) + val entriesKept = getRemoteLogSegMetadataIsKept(metadataList) // verify all existing entries (`cache.getIndexEntry(metadataList(2))`) are kept - verifyEntryIsKept(metadataList(2)) + verifyEntryIsKept(entriesKept) assertCacheSize(1) // increase the size cache.resizeCacheSize(2 * estimateEntryBytesSize) - // verify all existing entries (`cache.getIndexEntry(metadataList(2))`) are kept - verifyEntryIsKept(metadataList(2)) + // verify all entries are kept + verifyEntryIsKept(entriesKept) assertCacheSize(1) } @@ -981,6 +1005,16 @@ class RemoteIndexCacheTest { } } + private def verifyFetchIndexInvocationWithRange(lower: Int, + upper: Int, + indexTypes: Seq[IndexType] = + Seq(IndexType.OFFSET, IndexType.TIMESTAMP, IndexType.TRANSACTION)): Unit = { + for (indexType <- indexTypes) { + verify(rsm, atLeast(lower)).fetchIndex(any(classOf[RemoteLogSegmentMetadata]), ArgumentMatchers.eq(indexType)) + verify(rsm, atMost(upper)).fetchIndex(any(classOf[RemoteLogSegmentMetadata]), ArgumentMatchers.eq(indexType)) + } + } + private def createTxIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata, dir: File): TransactionIndex = { val txnIdxFile = remoteTransactionIndexFile(dir, metadata) txnIdxFile.createNewFile() diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 742249948e3..d207a9b6efb 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -54,10 +54,7 @@ versions += [ apacheds: "2.0.0-M24", argparse4j: "0.7.0", bcpkix: "1.78.1", - // Version >=3.1.2 includes an improvement to prevent hash DOS attacks, - // but currently, tests are failing in >=3.1.2. Therefore, we are temporarily using version 3.1.1. - // The failing tests should be fixed under KAFKA-18089, allowing us to upgrade to >=3.1.2. - caffeine: "3.1.1", + caffeine: "3.1.8", bndlib: "7.0.0", checkstyle: project.hasProperty('checkstyleVersion') ? checkstyleVersion : "10.20.2", commonsValidator: "1.9.0", diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java index 12bb084901e..b07255fcc53 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java @@ -728,10 +728,4 @@ public class RemoteIndexCache implements Closeable { public static String remoteTransactionIndexFileName(RemoteLogSegmentMetadata remoteLogSegmentMetadata) { return generateFileNamePrefixForIndex(remoteLogSegmentMetadata) + LogFileUtils.TXN_INDEX_FILE_SUFFIX; } - - // Visible for testing - public static String remoteDeletedSuffixIndexFileName(RemoteLogSegmentMetadata remoteLogSegmentMetadata) { - return generateFileNamePrefixForIndex(remoteLogSegmentMetadata) + LogFileUtils.DELETED_FILE_SUFFIX; - } - } \ No newline at end of file