KAFKA-15169: Added TestCase in RemoteIndexCache (#14482)

est Cases Covered

    1. Index Files already exist on disk but not in Cache i.e. RemoteIndexCache should not call remoteStorageManager to fetch it instead cache it from the local index file present.
    2. RSM returns CorruptedIndex File i.e. RemoteIndexCache should throw CorruptedIndexException instead of successfull execution.
    3. Deleted Suffix Indexes file already present on disk i.e. If cleaner thread is slow , then there is a chance of deleted index files present on the disk while in parallel same index Entry is invalidated. To understand more refer https://issues.apache.org/jira/browse/KAFKA-15169

Reviewers: Divij Vaidya <diviv@amazon.com>, Luke Chen <showuon@gmail.com>, Kamal Chandraprakash<kamal.chandraprakash@gmail.com>
This commit is contained in:
Arpit Goyal 2023-10-11 08:28:17 +05:30 committed by Divij Vaidya
parent 2b2c242b81
commit 90c79f4e1f
2 changed files with 253 additions and 12 deletions

View File

@ -17,23 +17,26 @@
package kafka.log.remote
import kafka.utils.TestUtils
import kafka.utils.TestUtils.waitUntilTrue
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteResourceNotFoundException, RemoteStorageManager}
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.RemoteIndexCache.{REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD, remoteOffsetIndexFile, remoteOffsetIndexFileName, remoteTimeIndexFile, remoteTimeIndexFileName, remoteTransactionIndexFile, remoteTransactionIndexFileName}
import org.apache.kafka.storage.internals.log.{LogFileUtils, OffsetIndex, OffsetPosition, RemoteIndexCache, TimeIndex, TransactionIndex}
import org.apache.kafka.storage.internals.log.{AbortedTxn, CorruptIndexException, LogFileUtils, OffsetIndex, OffsetPosition, RemoteIndexCache, TimeIndex, TransactionIndex}
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.EnumSource
import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
import org.slf4j.{Logger, LoggerFactory}
import java.io.{File, FileInputStream, IOException, PrintWriter}
import java.nio.file.Files
import java.nio.file.{Files, Paths}
import java.util
import java.util.Collections
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
@ -524,23 +527,223 @@ class RemoteIndexCacheTest {
}
}
@Test
def testCorruptOffsetIndexFileExistsButNotInCache(): Unit = {
// create Corrupt Offset Index File
createCorruptRemoteIndexCacheOffsetFile()
@ParameterizedTest
@EnumSource(value = classOf[IndexType], names = Array("OFFSET", "TIMESTAMP", "TRANSACTION"))
def testCorruptCacheIndexFileExistsButNotInCache(indexType: IndexType): Unit = {
// create Corrupted Index File in remote index cache
createCorruptedIndexFile(indexType, cache.cacheDir())
val entry = cache.getIndexEntry(rlsMetadata)
// Test would fail if it throws corrupt Exception
val expectedOffsetIndexFileName: String = remoteOffsetIndexFileName(rlsMetadata)
// Test would fail if it throws Exception other than CorruptIndexException
val offsetIndexFile = entry.offsetIndex.file().toPath
val txnIndexFile = entry.txnIndex.file().toPath
val timeIndexFile = entry.timeIndex.file().toPath
val expectedOffsetIndexFileName: String = remoteOffsetIndexFileName(rlsMetadata)
val expectedTimeIndexFileName: String = remoteTimeIndexFileName(rlsMetadata)
val expectedTxnIndexFileName: String = remoteTransactionIndexFileName(rlsMetadata)
assertEquals(expectedOffsetIndexFileName, offsetIndexFile.getFileName.toString)
assertEquals(expectedTxnIndexFileName, txnIndexFile.getFileName.toString)
assertEquals(expectedTimeIndexFileName, timeIndexFile.getFileName.toString)
// assert that parent directory for the index files is correct
assertEquals(RemoteIndexCache.DIR_NAME, offsetIndexFile.getParent.getFileName.toString,
s"offsetIndex=$offsetIndexFile is not overwrite under incorrect parent")
s"offsetIndex=$offsetIndexFile is created under incorrect parent")
assertEquals(RemoteIndexCache.DIR_NAME, txnIndexFile.getParent.getFileName.toString,
s"txnIndex=$txnIndexFile is created under incorrect parent")
assertEquals(RemoteIndexCache.DIR_NAME, timeIndexFile.getParent.getFileName.toString,
s"timeIndex=$timeIndexFile is created under incorrect parent")
// file is corrupted it should fetch from remote storage again
verifyFetchIndexInvocation(count = 1)
}
@Test
def testMultipleIndexEntriesExecutionInCorruptException(): Unit = {
reset(rsm)
when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
.thenAnswer(ans => {
val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
val indexType = ans.getArgument[IndexType](1)
val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
val timeIdx = createTimeIndexForSegmentMetadata(metadata)
val txnIdx = createTxIndexForSegmentMetadata(metadata)
maybeAppendIndexEntries(offsetIdx, timeIdx)
// Create corrupted index file
createCorruptTimeIndexOffsetFile(tpDir)
indexType match {
case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
}
})
assertThrows(classOf[CorruptIndexException], () => cache.getIndexEntry(rlsMetadata))
assertNull(cache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id()))
verifyFetchIndexInvocation(1, Seq(IndexType.OFFSET, IndexType.TIMESTAMP))
verifyFetchIndexInvocation(0, Seq(IndexType.TRANSACTION))
// Current status
// (cache is null)
// RemoteCacheDir contain
// 1. Offset Index File is fine and not corrupted
// 2. Time Index File is corrupted
// What should be the code flow in next execution
// 1. No rsm call for fetching OffSet Index File.
// 2. Time index file should be fetched from remote storage again as it is corrupted in the first execution.
// 3. Transaction index file should be fetched from remote storage.
reset(rsm)
// delete all files created in tpDir
Files.walk(tpDir.toPath, 1)
.filter(Files.isRegularFile(_))
.forEach(path => Files.deleteIfExists(path))
// rsm should return no corrupted file in the 2nd execution
when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
.thenAnswer(ans => {
val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
val indexType = ans.getArgument[IndexType](1)
val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
val timeIdx = createTimeIndexForSegmentMetadata(metadata)
val txnIdx = createTxIndexForSegmentMetadata(metadata)
maybeAppendIndexEntries(offsetIdx, timeIdx)
indexType match {
case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
}
})
cache.getIndexEntry(rlsMetadata)
// rsm should not be called to fetch offset Index
verifyFetchIndexInvocation(0, Seq(IndexType.OFFSET))
verifyFetchIndexInvocation(1, Seq(IndexType.TIMESTAMP))
// Transaction index would be fetched again
// as previous getIndexEntry failed before fetchTransactionIndex
verifyFetchIndexInvocation(1, Seq(IndexType.TRANSACTION))
}
@Test
def testIndexFileAlreadyExistOnDiskButNotInCache(): Unit = {
val remoteIndexCacheDir = cache.cacheDir()
val tempSuffix = ".tmptest"
def getRemoteCacheIndexFileFromDisk(suffix: String) = {
Files.walk(remoteIndexCacheDir.toPath)
.filter(Files.isRegularFile(_))
.filter(path => path.getFileName.toString.endsWith(suffix))
.findAny()
}
def renameRemoteCacheIndexFileFromDisk(suffix: String) = {
Files.walk(remoteIndexCacheDir.toPath)
.filter(Files.isRegularFile(_))
.filter(path => path.getFileName.toString.endsWith(suffix))
.forEach(f => Utils.atomicMoveWithFallback(f, f.resolveSibling(f.getFileName().toString().stripSuffix(tempSuffix))))
}
val entry = cache.getIndexEntry(rlsMetadata)
verifyFetchIndexInvocation(count = 1)
// copy files with temporary name
Files.copy(entry.offsetIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.offsetIndex().file().getPath(), "", tempSuffix)))
Files.copy(entry.txnIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.txnIndex().file().getPath(), "", tempSuffix)))
Files.copy(entry.timeIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.timeIndex().file().getPath(), "", tempSuffix)))
cache.internalCache().invalidate(rlsMetadata.remoteLogSegmentId().id())
// wait until entry is marked for deletion
TestUtils.waitUntilTrue(() => entry.isMarkedForCleanup,
"Failed to mark cache entry for cleanup after invalidation")
TestUtils.waitUntilTrue(() => entry.isCleanStarted,
"Failed to cleanup cache entry after invalidation")
// restore index files
renameRemoteCacheIndexFileFromDisk(tempSuffix)
// validate cache entry for the above key should be null
assertNull(cache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id()))
cache.getIndexEntry(rlsMetadata)
// Index Files already exist ,rsm should not fetch them again.
verifyFetchIndexInvocation(count = 1)
// verify index files on disk
assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}")
assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should be present on disk at ${remoteIndexCacheDir.toPath}")
assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should be present on disk at ${remoteIndexCacheDir.toPath}")
}
@ParameterizedTest
@EnumSource(value = classOf[IndexType], names = Array("OFFSET", "TIMESTAMP", "TRANSACTION"))
def testRSMReturnCorruptedIndexFile(testIndexType: IndexType): Unit = {
when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
.thenAnswer(ans => {
val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
val indexType = ans.getArgument[IndexType](1)
val offsetIdx = createOffsetIndexForSegmentMetadata(metadata)
val timeIdx = createTimeIndexForSegmentMetadata(metadata)
val txnIdx = createTxIndexForSegmentMetadata(metadata)
maybeAppendIndexEntries(offsetIdx, timeIdx)
// Create corrupt index file return from RSM
createCorruptedIndexFile(testIndexType, tpDir)
indexType match {
case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
case IndexType.TRANSACTION => new FileInputStream(txnIdx.file)
case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
}
})
assertThrows(classOf[CorruptIndexException], () => cache.getIndexEntry(rlsMetadata))
}
@Test
def testConcurrentCacheDeletedFileExists(): Unit = {
val remoteIndexCacheDir = cache.cacheDir()
def getRemoteCacheIndexFileFromDisk(suffix: String) = {
Files.walk(remoteIndexCacheDir.toPath)
.filter(Files.isRegularFile(_))
.filter(path => path.getFileName.toString.endsWith(suffix))
.findAny()
}
val entry = cache.getIndexEntry(rlsMetadata)
// verify index files on disk
assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, s"Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}")
assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent, s"Txn index file should be present on disk at ${remoteIndexCacheDir.toPath}")
assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent, s"Time index file should be present on disk at ${remoteIndexCacheDir.toPath}")
// Simulating a concurrency issue where deleted files already exist on disk
// This happen when cleanerThread is slow and not able to delete index entries
// while same index Entry is cached again and invalidated.
// The new deleted file created should be replaced by existing deleted file.
// create deleted suffix file
Files.copy(entry.offsetIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.offsetIndex().file().getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)))
Files.copy(entry.txnIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.txnIndex().file().getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)))
Files.copy(entry.timeIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.timeIndex().file().getPath(), "", LogFileUtils.DELETED_FILE_SUFFIX)))
// verify deleted file exists on disk
assertTrue(getRemoteCacheIndexFileFromDisk(LogFileUtils.DELETED_FILE_SUFFIX).isPresent, s"Deleted Offset index file should be present on disk at ${remoteIndexCacheDir.toPath}")
cache.internalCache().invalidate(rlsMetadata.remoteLogSegmentId().id())
// wait until entry is marked for deletion
TestUtils.waitUntilTrue(() => entry.isMarkedForCleanup,
"Failed to mark cache entry for cleanup after invalidation")
TestUtils.waitUntilTrue(() => entry.isCleanStarted,
"Failed to cleanup cache entry after invalidation")
// verify no index files on disk
waitUntilTrue(() => !getRemoteCacheIndexFileFromDisk(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,
s"Offset index file should not be present on disk at ${remoteIndexCacheDir.toPath}")
waitUntilTrue(() => !getRemoteCacheIndexFileFromDisk(LogFileUtils.TXN_INDEX_FILE_SUFFIX).isPresent,
s"Txn index file should not be present on disk at ${remoteIndexCacheDir.toPath}")
waitUntilTrue(() => !getRemoteCacheIndexFileFromDisk(LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent,
s"Time index file should not be present on disk at ${remoteIndexCacheDir.toPath}")
waitUntilTrue(() => !getRemoteCacheIndexFileFromDisk(LogFileUtils.DELETED_FILE_SUFFIX).isPresent,
s"Index file marked for deletion should not be present on disk at ${remoteIndexCacheDir.toPath}")
}
private def generateSpyCacheEntry(remoteLogSegmentId: RemoteLogSegmentId
= RemoteLogSegmentId.generateNew(idPartition)): RemoteIndexCache.Entry = {
val rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset,
@ -581,6 +784,22 @@ class RemoteIndexCacheTest {
new TransactionIndex(metadata.startOffset(), txnIdxFile)
}
private def createCorruptTxnIndexForSegmentMetadata(dir: File, metadata: RemoteLogSegmentMetadata): TransactionIndex = {
val txnIdxFile = remoteTransactionIndexFile(dir, metadata)
txnIdxFile.createNewFile()
val txnIndex = new TransactionIndex(metadata.startOffset(), txnIdxFile)
val abortedTxns = List(
new AbortedTxn(0L, 0, 10, 11),
new AbortedTxn(1L, 5, 15, 13),
new AbortedTxn(2L, 18, 35, 25),
new AbortedTxn(3L, 32, 50, 40))
abortedTxns.foreach(txnIndex.append)
txnIndex.close()
// open the index with a different starting offset to fake invalid data
return new TransactionIndex(100L, txnIdxFile)
}
private def createTimeIndexForSegmentMetadata(metadata: RemoteLogSegmentMetadata): TimeIndex = {
val maxEntries = (metadata.endOffset() - metadata.startOffset()).asInstanceOf[Int]
new TimeIndex(remoteTimeIndexFile(tpDir, metadata), metadata.startOffset(), maxEntries * 12)
@ -616,12 +835,29 @@ class RemoteIndexCacheTest {
}
}
private def createCorruptRemoteIndexCacheOffsetFile(): Unit = {
val pw = new PrintWriter(remoteOffsetIndexFile(new File(tpDir, RemoteIndexCache.DIR_NAME), rlsMetadata))
private def createCorruptOffsetIndexFile(dir: File): Unit = {
val pw = new PrintWriter(remoteOffsetIndexFile(dir, rlsMetadata))
pw.write("Hello, world")
// The size of the string written in the file is 12 bytes,
// but it should be multiple of Offset Index EntrySIZE which is equal to 8.
pw.close()
}
private def createCorruptTimeIndexOffsetFile(dir: File): Unit = {
val pw = new PrintWriter(remoteTimeIndexFile(dir, rlsMetadata))
pw.write("Hello, world1")
// The size of the string written in the file is 13 bytes,
// but it should be multiple of Time Index EntrySIZE which is equal to 12.
pw.close()
}
private def createCorruptedIndexFile(indexType: IndexType, dir: File): Unit = {
if (indexType == IndexType.OFFSET) {
createCorruptOffsetIndexFile(dir)
} else if (indexType == IndexType.TIMESTAMP) {
createCorruptTimeIndexOffsetFile(dir)
} else if (indexType == IndexType.TRANSACTION) {
createCorruptTxnIndexForSegmentMetadata(dir, rlsMetadata)
}
}
}

View File

@ -166,6 +166,11 @@ public class RemoteIndexCache implements Closeable {
return internalCache;
}
// Visible for testing
public File cacheDir() {
return cacheDir;
}
public void remove(Uuid key) {
lock.readLock().lock();
try {
@ -674,4 +679,4 @@ public class RemoteIndexCache implements Closeable {
return generateFileNamePrefixForIndex(remoteLogSegmentMetadata) + LogFileUtils.TXN_INDEX_FILE_SUFFIX;
}
}
}