diff --git a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala index 4d55c30b397..78d67e6a2e8 100644 --- a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala +++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala @@ -37,7 +37,7 @@ import org.mockito.invocation.InvocationOnMock import org.slf4j.{Logger, LoggerFactory} import java.io.{File, FileInputStream, IOException, PrintWriter, UncheckedIOException} -import java.nio.file.{Files, NoSuchFileException, Paths} +import java.nio.file.{Files, NoSuchFileException, Path, Paths} import java.util import java.util.concurrent.{CountDownLatch, Executors, Future, TimeUnit} import java.util.stream.Collectors @@ -67,10 +67,9 @@ class RemoteIndexCacheTest { Files.createDirectory(tpDir.toPath) val remoteLogSegmentId = RemoteLogSegmentId.generateNew(idPartition) - rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset, time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L)) - - cache = new RemoteIndexCache(defaultRemoteIndexCacheSizeBytes, rsm, tpDir.toString) - + rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset, time.milliseconds(), + brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L)) + cache = new RemoteIndexCache(defaultRemoteIndexCacheSizeBytes, rsm, logDir.toString) mockRsmFetchIndex(rsm) } @@ -176,7 +175,7 @@ class RemoteIndexCacheTest { val estimateEntryBytesSize = estimateOneEntryBytesSize() // close existing cache created in test setup before creating a new one Utils.closeQuietly(cache, "RemoteIndexCache created for unit test") - cache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, tpDir.toString) + cache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, logDir.toString) val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 
0)) val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId) @@ -221,7 +220,7 @@ class RemoteIndexCacheTest { // close existing cache created in test setup before creating a new one Utils.closeQuietly(cache, "RemoteIndexCache created for unit test") - cache = new RemoteIndexCache(2 * estimateOneEntryBytesSize(), rsm, tpDir.toString) + cache = new RemoteIndexCache(2 * estimateOneEntryBytesSize(), rsm, logDir.toString) val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)) val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId) @@ -292,13 +291,13 @@ class RemoteIndexCacheTest { verify(cacheEntry.txnIndex).renameTo(any(classOf[File])) // verify no index files on disk - assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent, + assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isEmpty, s"Offset index file should not be present on disk at ${tpDir.toPath}") - assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, + assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isEmpty, s"Txn index file should not be present on disk at ${tpDir.toPath}") - assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, + assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isEmpty, s"Time index file should not be present on disk at ${tpDir.toPath}") - assertFalse(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isPresent, + assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isEmpty, s"Index file marked for deletion should not be present on disk at ${tpDir.toPath}") } @@ -426,7 +425,7 @@ class RemoteIndexCacheTest { val estimateEntryBytesSize = estimateOneEntryBytesSize() // close existing cache created in test setup before creating a new one Utils.closeQuietly(cache, 
"RemoteIndexCache created for unit test") - cache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, tpDir.toString) + cache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, logDir.toString) val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)) val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId) @@ -460,7 +459,7 @@ class RemoteIndexCacheTest { cache.close() // Reload the cache from the disk and check the cache size is same as earlier - val reloadedCache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, tpDir.toString) + val reloadedCache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, logDir.toString) assertEquals(2, reloadedCache.internalCache.asMap().size()) reloadedCache.close() @@ -540,13 +539,13 @@ class RemoteIndexCacheTest { "Failed to cleanup cache entry after resizing cache.") // verify no index files on remote cache dir - TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent, + TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isEmpty, s"Offset index file should not be present on disk at ${cache.cacheDir()}") - TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, + TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isEmpty, s"Txn index file should not be present on disk at ${cache.cacheDir()}") - TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, + TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isEmpty, s"Time index file should not be present on disk at ${cache.cacheDir()}") - TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isPresent, + TestUtils.waitUntilTrue(() => 
getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isEmpty, s"Index file marked for deletion should not be present on disk at ${cache.cacheDir()}") assertCacheSize(0) @@ -562,13 +561,13 @@ class RemoteIndexCacheTest { TestUtils.waitUntilTrue(() => entryToVerify.isCleanStarted, "Failed to cleanup evicted cache entry after resizing cache.") // verify no index files for `entryToVerify` on remote cache dir - TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, remoteOffsetIndexFileName(metadataToVerify)).isPresent, + TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, remoteOffsetIndexFileName(metadataToVerify)).isEmpty, s"Offset index file for evicted entry should not be present on disk at ${cache.cacheDir()}") - TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, remoteTimeIndexFileName(metadataToVerify)).isPresent, + TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, remoteTimeIndexFileName(metadataToVerify)).isEmpty, s"Time index file for evicted entry should not be present on disk at ${cache.cacheDir()}") - TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, remoteTransactionIndexFileName(metadataToVerify)).isPresent, + TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, remoteTransactionIndexFileName(metadataToVerify)).isEmpty, s"Txn index file for evicted entry should not be present on disk at ${cache.cacheDir()}") - TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, remoteDeletedSuffixIndexFileName(metadataToVerify)).isPresent, + TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, remoteDeletedSuffixIndexFileName(metadataToVerify)).isEmpty, s"Index file marked for deletion for evicted entry should not be present on disk at ${cache.cacheDir()}") } @@ -576,7 +575,7 @@ class RemoteIndexCacheTest { assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteOffsetIndexFileName(metadataToVerify)).isPresent) 
assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteTimeIndexFileName(metadataToVerify)).isPresent) assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteTransactionIndexFileName(metadataToVerify)).isPresent) - assertTrue(!getIndexFileFromRemoteCacheDir(cache, remoteDeletedSuffixIndexFileName(metadataToVerify)).isPresent) + assertTrue(getIndexFileFromRemoteCacheDir(cache, remoteDeletedSuffixIndexFileName(metadataToVerify)).isEmpty) } // The test process for resizing is: put 1 entry -> evict to empty -> put 3 entries with limited capacity of 2 entries -> @@ -603,13 +602,13 @@ class RemoteIndexCacheTest { "Failed to cleanup cache entry after resizing cache.") // verify no index files on remote cache dir - TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent, + TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isEmpty, s"Offset index file should not be present on disk at ${cache.cacheDir()}") - TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, + TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isEmpty, s"Txn index file should not be present on disk at ${cache.cacheDir()}") - TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, + TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isEmpty, s"Time index file should not be present on disk at ${cache.cacheDir()}") - TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isPresent, + TestUtils.waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isEmpty, s"Index file marked for deletion should not be present on disk at ${cache.cacheDir()}") assertCacheSize(0) @@ -618,11 +617,11 @@ class 
RemoteIndexCacheTest { cache.resizeCacheSize(2 * estimateEntryBytesSize) assertCacheSize(0) - val entry0 = cache.getIndexEntry(metadataList(0)) + val entry0 = cache.getIndexEntry(metadataList.head) val entry1 = cache.getIndexEntry(metadataList(1)) cache.getIndexEntry(metadataList(2)) assertCacheSize(2) - verifyEntryIsEvicted(metadataList(0), entry0) + verifyEntryIsEvicted(metadataList.head, entry0) // Reduce cache capacity to only store 1 entry cache.resizeCacheSize(1 * estimateEntryBytesSize) @@ -680,9 +679,9 @@ class RemoteIndexCacheTest { // Create a spy Cache Entry val rlsMetadata = new RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), baseOffset, lastOffset, time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L)) - val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME))) - val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME))) - val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME))) + val timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, new File(logDir, DIR_NAME))) + val txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, new File(logDir, DIR_NAME))) + val offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata, new File(logDir, DIR_NAME))) val spyEntry = spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, txIndex)) cache.internalCache.put(rlsMetadata.remoteLogSegmentId().id(), spyEntry) @@ -823,19 +822,18 @@ class RemoteIndexCacheTest { val remoteIndexCacheDir = cache.cacheDir() val tempSuffix = ".tmptest" - def renameRemoteCacheIndexFileFromDisk(suffix: String): Unit = { - Files.walk(remoteIndexCacheDir.toPath) - .filter(Files.isRegularFile(_)) - .filter(path => path.getFileName.toString.endsWith(suffix)) - .forEach(f => Utils.atomicMoveWithFallback(f, f.resolveSibling(f.getFileName.toString.stripSuffix(tempSuffix)))) + def 
renameRemoteCacheIndexFileFromDisk(tmpOffsetIdxFile: Path, tmpTxnIdxFile: Path, tmpTimeIdxFile: Path): Unit = { + for (f <- Seq(tmpOffsetIdxFile, tmpTxnIdxFile, tmpTimeIdxFile)) { + Utils.atomicMoveWithFallback(f, f.resolveSibling(f.getFileName.toString.stripSuffix(tempSuffix))) + } } val entry = cache.getIndexEntry(rlsMetadata) verifyFetchIndexInvocation(count = 1) // copy files with temporary name - Files.copy(entry.offsetIndex().file().toPath, Paths.get(Utils.replaceSuffix(entry.offsetIndex().file().getPath, "", tempSuffix))) - Files.copy(entry.txnIndex().file().toPath, Paths.get(Utils.replaceSuffix(entry.txnIndex().file().getPath, "", tempSuffix))) - Files.copy(entry.timeIndex().file().toPath, Paths.get(Utils.replaceSuffix(entry.timeIndex().file().getPath, "", tempSuffix))) + val tmpOffsetIdxPath = Files.copy(entry.offsetIndex().file().toPath, Paths.get(Utils.replaceSuffix(entry.offsetIndex().file().getPath, "", tempSuffix))) + val tmpTxnIdxPath = Files.copy(entry.txnIndex().file().toPath, Paths.get(Utils.replaceSuffix(entry.txnIndex().file().getPath, "", tempSuffix))) + val tmpTimeIdxPath = Files.copy(entry.timeIndex().file().toPath, Paths.get(Utils.replaceSuffix(entry.timeIndex().file().getPath, "", tempSuffix))) cache.remove(rlsMetadata.remoteLogSegmentId().id()) @@ -846,7 +844,7 @@ class RemoteIndexCacheTest { "Failed to cleanup cache entry after invalidation") // restore index files - renameRemoteCacheIndexFileFromDisk(tempSuffix) + renameRemoteCacheIndexFileFromDisk(tmpOffsetIdxPath, tmpTxnIdxPath, tmpTimeIdxPath) // validate cache entry for the above key should be null assertNull(cache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id())) cache.getIndexEntry(rlsMetadata) @@ -914,16 +912,42 @@ class RemoteIndexCacheTest { "Failed to cleanup cache entry after invalidation") // verify no index files on disk - waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent, + waitUntilTrue(() => 
getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isEmpty, s"Offset index file should not be present on disk at ${remoteIndexCacheDir.toPath}") - waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, + waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TXN_INDEX_FILE_SUFFIX).isEmpty, s"Txn index file should not be present on disk at ${remoteIndexCacheDir.toPath}") - waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, + waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isEmpty, s"Time index file should not be present on disk at ${remoteIndexCacheDir.toPath}") - waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isPresent, + waitUntilTrue(() => getIndexFileFromRemoteCacheDir(cache, LogFileUtils.DELETED_FILE_SUFFIX).isEmpty, s"Index file marked for deletion should not be present on disk at ${remoteIndexCacheDir.toPath}") } + @Test + def testDeleteInvalidIndexFilesOnInit(): Unit = { + val cacheDir = new File(logDir, RemoteIndexCache.DIR_NAME) + val baseOffset: Long = 100L + val uuid = Uuid.randomUuid() + + val invalidOffsetIdxFilename = "%s_%s%s%s".format(baseOffset, uuid, LogFileUtils.INDEX_FILE_SUFFIX, LogFileUtils.DELETED_FILE_SUFFIX) + val invalidOffsetIdxFile = new File(cacheDir, invalidOffsetIdxFilename) + invalidOffsetIdxFile.createNewFile() + + val invalidTimeIdxFilename = "%s_%s%s%s".format(baseOffset, uuid, LogFileUtils.TIME_INDEX_FILE_SUFFIX, ".tmp") + val invalidTimeIndexFile = new File(cacheDir, invalidTimeIdxFilename) + invalidTimeIndexFile.createNewFile() + + val rlsMetadata = new RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), baseOffset + 100, + lastOffset, time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L)) + val validOffsetIdx = 
createOffsetIndexForSegmentMetadata(rlsMetadata, logDir) + val validTimeIdx = createTimeIndexForSegmentMetadata(rlsMetadata, logDir) + + new RemoteIndexCache(defaultRemoteIndexCacheSizeBytes, rsm, logDir.toString) + assertFalse(invalidOffsetIdxFile.exists()) + assertFalse(invalidTimeIndexFile.exists()) + assertTrue(validOffsetIdx.file().exists()) + assertTrue(validTimeIdx.file().exists()) + } + private def generateSpyCacheEntry(remoteLogSegmentId: RemoteLogSegmentId = RemoteLogSegmentId.generateNew(idPartition)): RemoteIndexCache.Entry = { val rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset, time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L)) @@ -1019,10 +1043,12 @@ class RemoteIndexCacheTest { Files.createDirectory(tpDir.toPath) val rsm = mock(classOf[RemoteStorageManager]) mockRsmFetchIndex(rsm) - val cache = new RemoteIndexCache(2L, rsm, tpDir.toString) + val cache = new RemoteIndexCache(2L, rsm, logDir.toString) val metadataList = generateRemoteLogSegmentMetadata(size = 1, tpId) val entry = cache.getIndexEntry(metadataList.head) val entrySizeInBytes = entry.entrySizeBytes() + entry.markForCleanup() + entry.cleanup() Utils.closeQuietly(cache, "RemoteIndexCache created for estimating entry size") entrySizeInBytes } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java index f974f29af5a..12bb084901e 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java @@ -267,8 +267,9 @@ public class RemoteIndexCache implements Closeable { // Delete any .deleted or .tmp files remained from the earlier run of the broker. 
try (Stream<Path> paths = Files.list(cacheDir.toPath())) { paths.forEach(path -> { - if (path.endsWith(LogFileUtils.DELETED_FILE_SUFFIX) || - path.endsWith(TMP_FILE_SUFFIX)) { + String filename = path.getFileName().toString(); + if (filename.endsWith(LogFileUtils.DELETED_FILE_SUFFIX) || + filename.endsWith(TMP_FILE_SUFFIX)) { try { if (Files.deleteIfExists(path)) { log.debug("Deleted file path {} on cache initialization", path);