diff --git a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
index 93e3f01511f..e82c35ffb97 100644
--- a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
@@ -17,23 +17,26 @@
 package kafka.log.remote
 
 import kafka.utils.TestUtils
+import kafka.utils.TestUtils.waitUntilTrue
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
 import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteResourceNotFoundException, RemoteStorageManager}
 import org.apache.kafka.server.util.MockTime
 import org.apache.kafka.storage.internals.log.RemoteIndexCache.{REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile, remoteTimeIndexFileName, remoteTransactionIndexFile, remoteTransactionIndexFileName}
-import org.apache.kafka.storage.internals.log.{LogFileUtils, OffsetIndex, OffsetPosition, RemoteIndexCache, TimeIndex, TransactionIndex}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, CorruptIndexException, LogFileUtils, OffsetIndex, OffsetPosition, RemoteIndexCache, TimeIndex, TransactionIndex}
 import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
 import org.mockito.ArgumentMatchers
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito._
 import org.slf4j.{Logger, LoggerFactory}
 
 import java.io.{File, FileInputStream, IOException, PrintWriter}
-import java.nio.file.Files
+import java.nio.file.{Files, Paths}
 import java.util
 import java.util.Collections
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
@@ -524,23 +527,246 @@ class RemoteIndexCacheTest {
     }
   }
 
-  @Test
-  def testCorruptOffsetIndexFileExistsButNotInCache(): Unit = {
-    // create Corrupt Offset Index File
-    createCorruptRemoteIndexCacheOffsetFile()
+  @ParameterizedTest
+  @EnumSource(value = classOf[IndexType], names = Array("OFFSET", "TIMESTAMP", "TRANSACTION"))
+  def testCorruptCacheIndexFileExistsButNotInCache(indexType: IndexType): Unit = {
+    // create Corrupted Index File in remote index cache
+    createCorruptedIndexFile(indexType, cache.cacheDir())
     val entry = cache.getIndexEntry(rlsMetadata)
-    // Test would fail if it throws corrupt Exception
-    val expectedOffsetIndexFileName: String = remoteOffsetIndexFileName(rlsMetadata)
+    // Test would fail if it throws Exception other than CorruptIndexException
     val offsetIndexFile = entry.offsetIndex.file().toPath
+    val txnIndexFile = entry.txnIndex.file().toPath
+    val timeIndexFile = entry.timeIndex.file().toPath
+
+    val expectedOffsetIndexFileName: String = remoteOffsetIndexFileName(rlsMetadata)
+    val expectedTimeIndexFileName: String = remoteTimeIndexFileName(rlsMetadata)
+    val expectedTxnIndexFileName: String = remoteTransactionIndexFileName(rlsMetadata)
 
     assertEquals(expectedOffsetIndexFileName, offsetIndexFile.getFileName.toString)
+    assertEquals(expectedTxnIndexFileName, txnIndexFile.getFileName.toString)
+    assertEquals(expectedTimeIndexFileName, timeIndexFile.getFileName.toString)
+
     // assert that parent directory for the index files is correct
     assertEquals(RemoteIndexCache.DIR_NAME, offsetIndexFile.getParent.getFileName.toString,
-      s"offsetIndex=$offsetIndexFile is not overwrite under incorrect parent")
+      s"offsetIndex=$offsetIndexFile is created under incorrect parent")
+    assertEquals(RemoteIndexCache.DIR_NAME, txnIndexFile.getParent.getFileName.toString,
+      s"txnIndex=$txnIndexFile is created under incorrect parent")
+    assertEquals(RemoteIndexCache.DIR_NAME, timeIndexFile.getParent.getFileName.toString,
+      s"timeIndex=$timeIndexFile is created under incorrect parent")
+
     // file is corrupted it should fetch from remote storage again
     verifyFetchIndexInvocation(count = 1)
   }
 
+  @Test
+  def testMultipleIndexEntriesExecutionInCorruptException(): Unit = {
+    reset(rsm)
+    when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
+      .thenAnswer(ans => {
+        val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
+        val indexType = ans.getArgument[IndexType](1)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata)
+        maybeAppendIndexEntries(offsetIdx, timeIdx)
+        // Create corrupted index file
+        createCorruptTimeIndexOffsetFile(tpDir)
+        indexType match {
+          case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
+          case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
+          case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
+          case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
+          case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
+        }
+      })
+
+    assertThrows(classOf[CorruptIndexException], () => cache.getIndexEntry(rlsMetadata))
+    assertNull(cache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id()))
+    verifyFetchIndexInvocation(1, Seq(IndexType.OFFSET, IndexType.TIMESTAMP))
+    verifyFetchIndexInvocation(0, Seq(IndexType.TRANSACTION))
+    // Current status
+    // (cache is null)
+    // RemoteCacheDir contains
+    // 1. Offset Index File is fine and not corrupted
+    // 2. Time Index File is corrupted
+    // What should be the code flow in next execution
+    // 1. No rsm call for fetching Offset Index File.
+    // 2. Time index file should be fetched from remote storage again as it is corrupted in the first execution.
+    // 3. Transaction index file should be fetched from remote storage.
+    reset(rsm)
+    // delete all files created in tpDir (closing the walk stream releases its directory handle)
+    val tpDirFiles = Files.walk(tpDir.toPath, 1)
+    try {
+      tpDirFiles
+        .filter(Files.isRegularFile(_))
+        .forEach(path => Files.deleteIfExists(path))
+    } finally {
+      tpDirFiles.close()
+    }
+    // rsm should return no corrupted file in the 2nd execution
+    when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
+      .thenAnswer(ans => {
+        val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
+        val indexType = ans.getArgument[IndexType](1)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata)
+        maybeAppendIndexEntries(offsetIdx, timeIdx)
+        indexType match {
+          case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
+          case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
+          case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
+          case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
+          case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
+        }
+      })
+    cache.getIndexEntry(rlsMetadata)
+    // rsm should not be called to fetch offset Index
+    verifyFetchIndexInvocation(0, Seq(IndexType.OFFSET))
+    verifyFetchIndexInvocation(1, Seq(IndexType.TIMESTAMP))
+    // Transaction index would be fetched again
+    // as previous getIndexEntry failed before fetchTransactionIndex
+    verifyFetchIndexInvocation(1, Seq(IndexType.TRANSACTION))
+  }
+
+  @Test
+  def testIndexFileAlreadyExistOnDiskButNotInCache(): Unit = {
+    val remoteIndexCacheDir = cache.cacheDir()
+    val tempSuffix = ".tmptest"
+
+    def getRemoteCacheIndexFileFromDisk(suffix: String) = {
+      // Files.walk keeps directory handles open until the stream is closed
+      val paths = Files.walk(remoteIndexCacheDir.toPath)
+      try {
+        paths
+          .filter(Files.isRegularFile(_))
+          .filter(path => path.getFileName.toString.endsWith(suffix))
+          .findAny()
+      } finally {
+        paths.close()
+      }
+    }
+
+    def renameRemoteCacheIndexFileFromDisk(suffix: String) = {
+      // close the walk stream once all matching files have been renamed
+      val paths = Files.walk(remoteIndexCacheDir.toPath)
+      try {
+        paths
+          .filter(Files.isRegularFile(_))
+          .filter(path => path.getFileName.toString.endsWith(suffix))
+          .forEach(f => Utils.atomicMoveWithFallback(f, f.resolveSibling(f.getFileName().toString().stripSuffix(suffix))))
+      } finally {
+        paths.close()
+      }
+    }
+
+    val entry = cache.getIndexEntry(rlsMetadata)
+    verifyFetchIndexInvocation(count = 1)
+    // copy files with temporary name
+    Files.copy(entry.offsetIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.offsetIndex().file().getPath(), "", tempSuffix)))
+    Files.copy(entry.txnIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.txnIndex().file().getPath(), "", tempSuffix)))
+    Files.copy(entry.timeIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.timeIndex().file().getPath(), "", tempSuffix)))
+
+    cache.internalCache().invalidate(rlsMetadata.remoteLogSegmentId().id())
+
+    // wait until entry is marked for deletion
+    waitUntilTrue(() => entry.isMarkedForCleanup,
+      "Failed to mark cache entry for cleanup after invalidation")
+    waitUntilTrue(() => entry.isCleanStarted,
+      "Failed to cleanup cache entry after invalidation")
+
+    // restore index files
+    renameRemoteCacheIndexFileFromDisk(tempSuffix)
+    // validate cache entry for the above key should be null
+    assertNull(cache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id()))
+    cache.getIndexEntry(rlsMetadata)
+    // Index files already exist, rsm should not fetch them again.
+    verifyFetchIndexInvocation(count = 1)
+    // verify index files on disk
+    assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}")
+    assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should be present on disk at ${remoteIndexCacheDir.toPath}")
+    assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should be present on disk at ${remoteIndexCacheDir.toPath}")
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = classOf[IndexType], names = Array("OFFSET", "TIMESTAMP", "TRANSACTION"))
+  def testRSMReturnCorruptedIndexFile(testIndexType: IndexType): Unit = {
+    when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
+      .thenAnswer(ans => {
+        val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
+        val indexType = ans.getArgument[IndexType](1)
+        val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
+        val timeIdx = createTimeIndexForSegmentMetadata(metadata)
+        val txnIdx = createTxIndexForSegmentMetadata(metadata)
+        maybeAppendIndexEntries(offsetIdx, timeIdx)
+        // Create corrupt index file returned from RSM
+        createCorruptedIndexFile(testIndexType, tpDir)
+        indexType match {
+          case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
+          case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
+          case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
+          case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
+          case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
+        }
+      })
+    assertThrows(classOf[CorruptIndexException], () => cache.getIndexEntry(rlsMetadata))
+  }
+
+  @Test
+  def testConcurrentCacheDeletedFileExists(): Unit = {
+    val remoteIndexCacheDir = cache.cacheDir()
+
+    def getRemoteCacheIndexFileFromDisk(suffix: String) = {
+      // called repeatedly while polling below, so the walk stream must be closed on every call
+      val paths = Files.walk(remoteIndexCacheDir.toPath)
+      try {
+        paths
+          .filter(Files.isRegularFile(_))
+          .filter(path => path.getFileName.toString.endsWith(suffix))
+          .findAny()
+      } finally {
+        paths.close()
+      }
+    }
+
+    val entry = cache.getIndexEntry(rlsMetadata)
+    // verify index files on disk
+    assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}")
+    assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should be present on disk at ${remoteIndexCacheDir.toPath}")
+    assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should be present on disk at ${remoteIndexCacheDir.toPath}")
+
+    // Simulating a concurrency issue where deleted files already exist on disk
+    // This happens when cleanerThread is slow and not able to delete index entries
+    // while same index Entry is cached again and invalidated.
+    // The new deleted file created should be replaced by existing deleted file.
+
+    // create deleted suffix file
+    Files.copy(entry.offsetIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.offsetIndex().file().getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)))
+    Files.copy(entry.txnIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.txnIndex().file().getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)))
+    Files.copy(entry.timeIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.timeIndex().file().getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)))
+
+    // verify deleted file exists on disk
+    assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.DELETED_FILE_SUFFIX).isPresent, s"Deleted Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}")
+
+    cache.internalCache().invalidate(rlsMetadata.remoteLogSegmentId().id())
+
+    // wait until entry is marked for deletion
+    waitUntilTrue(() => entry.isMarkedForCleanup,
+      "Failed to mark cache entry for cleanup after invalidation")
+    waitUntilTrue(() => entry.isCleanStarted,
+      "Failed to cleanup cache entry after invalidation")
+
+    // verify no index files on disk
+    waitUntilTrue(() => !getRemoteCacheIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
+      s"Offset index file should not be present on disk at ${remoteIndexCacheDir.toPath}")
+    waitUntilTrue(() => !getRemoteCacheIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
+      s"Txn index file should not be present on disk at ${remoteIndexCacheDir.toPath}")
+    waitUntilTrue(() => !getRemoteCacheIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
+      s"Time index file should not be present on disk at ${remoteIndexCacheDir.toPath}")
+    waitUntilTrue(() => !getRemoteCacheIndexFileFromDisk(LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
+      s"Index file marked for deletion should not be present on disk at ${remoteIndexCacheDir.toPath}")
+  }
+
   private def generateSpyCacheEntry(remoteLogSegmentId: RemoteLogSegmentId = RemoteLogSegmentId.generateNew(idPartition)): RemoteIndexCache.Entry = {
     val rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId,
       baseOffset, lastOffset,
@@ -581,6 +807,27 @@ class RemoteIndexCacheTest {
     new TransactionIndex(metadata.startOffset(), txnIdxFile)
   }
 
+  /**
+   * Writes a transaction index file for `metadata` under `dir`, appends four aborted transactions and
+   * closes it, then re-opens the same file with a different start offset to fake invalid data.
+   * The re-opened index is returned to the caller.
+   */
+  private def createCorruptTxnIndexForSegmentMetadata(dir: File, metadata: RemoteLogSegmentMetadata): TransactionIndex = {
+    val txnIdxFile = remoteTransactionIndexFile(dir, metadata)
+    txnIdxFile.createNewFile()
+    val txnIndex = new TransactionIndex(metadata.startOffset(), txnIdxFile)
+    val abortedTxns = List(
+      new AbortedTxn(0L, 0, 10, 11),
+      new AbortedTxn(1L, 5, 15, 13),
+      new AbortedTxn(2L, 18, 35, 25),
+      new AbortedTxn(3L, 32, 50, 40))
+    abortedTxns.foreach(txnIndex.append)
+    txnIndex.close()
+
+    // open the index with a different starting offset to fake invalid data
+    new TransactionIndex(100L, txnIdxFile)
+  }
+
   private def createTimeIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata): TimeIndex = {
     val maxEntries = (metadata.endOffset() - metadata.startOffset()).asInstanceOf[Int]
     new TimeIndex(remoteTimeIndexFile(tpDir, metadata), metadata.startOffset(), maxEntries * 12)
@@ -616,12 +863,33 @@ class RemoteIndexCacheTest {
     }
   }
 
-  private def createCorruptRemoteIndexCacheOffsetFile(): Unit = {
-    val pw = new PrintWriter(remoteOffsetIndexFile(new File(tpDir, RemoteIndexCache.DIR_NAME), rlsMetadata))
+  private def createCorruptOffsetIndexFile(dir: File): Unit = {
+    val pw = new PrintWriter(remoteOffsetIndexFile(dir, rlsMetadata))
     pw.write("Hello, world")
     // The size of the string written in the file is 12 bytes,
     // but it should be multiple of Offset Index EntrySIZE which is equal to 8.
     pw.close()
   }
 
+  private def createCorruptTimeIndexOffsetFile(dir: File): Unit = {
+    val pw = new PrintWriter(remoteTimeIndexFile(dir, rlsMetadata))
+    pw.write("Hello, world1")
+    // The size of the string written in the file is 13 bytes,
+    // but it should be multiple of Time Index EntrySIZE which is equal to 12.
+    pw.close()
+  }
+
+  /**
+   * Creates a corrupted index file of the given type for `rlsMetadata` under `dir`.
+   * Only OFFSET, TIMESTAMP and TRANSACTION are supported; any other type fails fast.
+   */
+  private def createCorruptedIndexFile(indexType: IndexType, dir: File): Unit = {
+    indexType match {
+      case IndexType.OFFSET => createCorruptOffsetIndexFile(dir)
+      case IndexType.TIMESTAMP => createCorruptTimeIndexOffsetFile(dir)
+      // only the file on disk is needed here, so close the returned index instead of discarding it open
+      case IndexType.TRANSACTION => createCorruptTxnIndexForSegmentMetadata(dir, rlsMetadata).close()
+      case other => throw new IllegalArgumentException(s"Unsupported index type for corruption: $other")
+    }
+  }
 }
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
index 2bbe9d76ecf..5cdaf77c294 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
@@ -166,6 +166,11 @@ public class RemoteIndexCache implements Closeable {
         return internalCache;
     }
 
+    // Visible for testing
+    public File cacheDir() {
+        return cacheDir;
+    }
+
     public void remove(Uuid key) {
         lock.readLock().lock();
         try {
@@ -674,4 +679,4 @@ public class RemoteIndexCache implements Closeable {
         return generateFileNamePrefixForIndex(remoteLogSegmentMetadata) + LogFileUtils.TXN_INDEX_FILE_SUFFIX;
     }
 
-}
\ No newline at end of file
+}