KAFKA-17184: Fix the error thrown while accessing the RemoteIndexCache (#19462)

For segments uploaded to remote storage, RemoteIndexCache fetches the offset,
timestamp, and transaction index entries from remote on the first access and
caches them locally; subsequent accesses are served from the local copies.
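
To make the read-through behavior concrete, here is a minimal, self-contained sketch (assuming Caffeine 3.x; the class and names are illustrative, not the Kafka code) of a cache whose loader runs only on the first access:

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;

    public class ReadThroughSketch {
        record IndexEntry(String source) { }

        public static void main(String[] args) {
            Cache<String, IndexEntry> cache = Caffeine.newBuilder().build();
            // First access: the loader runs (stands in for the remote fetch).
            IndexEntry first = cache.get("segment-1", k -> new IndexEntry("remote"));
            // Second access: served locally, the loader does not run again.
            IndexEntry second = cache.get("segment-1", k -> new IndexEntry("never-created"));
            System.out.println(first == second); // prints true
        }
    }

In the patch below, RemoteIndexCache#getIndexEntry follows the same shape: internalCache.get(uuid, k -> createCacheEntry(metadata)).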

The locally cached remote indexes are removed in two cases:

1. The remote segment is deleted because it breaches the retention size/time
limits or the log start-offset.
2. The total size of cached indexes exceeds the remote-log-index-cache size
limit of 1 GB (default); see the eviction sketch below.
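
A rough sketch of case 2 (again assuming Caffeine 3.x; the weights and names are illustrative): the cache is bounded by total weight rather than entry count, and evicted entries are handed to an eviction listener for cleanup.

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;
    import com.github.benmanes.caffeine.cache.RemovalCause;

    public class EvictionSketch {
        record IndexEntry(long sizeBytes) { }

        public static void main(String[] args) {
            long maxBytes = 1024L * 1024 * 1024; // the 1 GiB default limit
            Cache<String, IndexEntry> cache = Caffeine.newBuilder()
                    .maximumWeight(maxBytes)
                    .weigher((String key, IndexEntry e) -> (int) e.sizeBytes())
                    .evictionListener((String key, IndexEntry e, RemovalCause cause) ->
                            // Runs after the entry is already removed from the map;
                            // the real code enqueues the entry for file cleanup here.
                            System.out.println("evicted " + key + " (" + cause + ")"))
                    .build();
            cache.put("segment-1", new IndexEntry(64));
        }
    }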

RemoteIndexCache uses two layers of locks: a first-layer lock on the
RemoteIndexCache itself and a second-layer lock on each
RemoteIndexCache#Entry.
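
A simplified sketch of the two layers (the lock names mirror the real classes, but the bodies are illustrative):

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class TwoLayerLockSketch {
        // First layer: guards the cache as a whole (e.g. close vs. read/remove).
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

        static class Entry {
            // Second layer: guards the lifecycle of a single cached entry.
            private final ReentrantReadWriteLock entryLock = new ReentrantReadWriteLock();
            private boolean markedForCleanup = false;

            void markForCleanup() {
                entryLock.writeLock().lock();
                try {
                    markedForCleanup = true; // the real code also renames the index files here
                } finally {
                    entryLock.writeLock().unlock();
                }
            }
        }
    }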

**Issue**

1. The first-layer lock coordinates the remote-log reader and deleter
threads. To keep the reader and deleter threads from blocking each other,
we take only `lock.readLock()` when accessing/deleting the cached index
entries.
2. The issue occurs when the reader and deleter threads have both taken the
readLock and the deleter thread then marks the index as `markedForCleanup`:
the reader thread, which still holds the `indexEntry`, gets an
IllegalStateException when accessing it (the sketch after this list replays
the interleaving).
3. This is a concurrency issue: we mark the entry as `markedForCleanup`
before removing it from the cache. See the RemoteIndexCache#remove and
RemoteIndexCache#removeAll methods.
4. When an entry is evicted because the cache breaches its maxSize of 1 GB,
the cache removes that entry before calling the evictionListener, and all of
these operations are performed atomically by the Caffeine cache.
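
The following single-threaded sketch replays the problematic interleaving deterministically (illustrative stand-ins, not Kafka code); note that the cache-level readLock is shared by both roles, so it cannot order the two threads:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class MarkedForCleanupRace {
        static class Entry {
            volatile boolean markedForCleanup = false;

            long lookupOffset(long target) {
                if (markedForCleanup) // the pre-fix check that readers tripped over
                    throw new IllegalStateException("This entry is marked for cleanup");
                return target; // stand-in for offsetIndex.lookup(target)
            }
        }

        public static void main(String[] args) {
            ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
            Entry entry = new Entry();

            lock.readLock().lock();         // reader thread: fetches the entry under readLock
            Entry held = entry;
            lock.readLock().unlock();

            lock.readLock().lock();         // deleter thread: also takes only readLock
            entry.markedForCleanup = true;  // marks the entry before the files are gone
            lock.readLock().unlock();

            held.lookupOffset(42);          // reader now fails with IllegalStateException
        }
    }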

**Solution**

1. When the deleter thread marks an Entry for deletion, we rename the
underlying index files with a ".deleted" suffix and add a job to the
remote-log-index-cleaner thread, which performs the actual cleanup.
Previously, the indexes were not accessible once they were marked for
deletion. Now, the renamed files remain accessible (via the entry that is
about to be removed and is still held by the reader thread) until they are
removed from disk.
2. As with local-log index/segment deletion, the renamed files are not
deleted immediately: local-log files are removed after the
`file.delete.delay.ms` delay of 1 minute (default), and the renamed index
files are removed after the cache's `fileDeleteDelayMs` delay (10 seconds by
default in this patch); see the sketch after this list.
3. During this window, if the same index entry is fetched again from
remote, it does not conflict with the deleted entry because the file names
are different.

Reviewers: Satish Duggana <satishd@apache.org>
Author: Kamal Chandraprakash, 2025-04-18 16:43:37 +05:30 (committed by GitHub)
parent 8df700245c
commit 2cd733c9b3
4 changed files with 296 additions and 175 deletions


@ -207,4 +207,24 @@ class SchedulerTest {
tickExecutor.shutdownNow()
}
}
@Test
def testPendingTaskSize(): Unit = {
val latch1 = new CountDownLatch(1)
val latch2 = new CountDownLatch(2)
val task1 = new Runnable {
override def run(): Unit = {
latch1.await()
}
}
scheduler.scheduleOnce("task1", task1, 0)
scheduler.scheduleOnce("task2", () => latch2.countDown(), 5)
scheduler.scheduleOnce("task3", () => latch2.countDown(), 5)
assertEquals(2, scheduler.pendingTaskSize())
latch1.countDown()
latch2.await()
assertEquals(0, scheduler.pendingTaskSize())
scheduler.shutdown()
assertEquals(0, scheduler.pendingTaskSize())
}
}


@ -187,4 +187,8 @@ public class KafkaScheduler implements Scheduler {
ScheduledThreadPoolExecutor e = executor;
return e != null && e.getQueue().contains(task);
}
public int pendingTaskSize() {
return isStarted() ? executor.getQueue().size() : 0;
}
}


@ -25,7 +25,7 @@ import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundExceptio
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
import org.apache.kafka.server.util.ShutdownableThread;
import org.apache.kafka.server.util.KafkaScheduler;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
@ -45,12 +45,11 @@ import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Stream;
@ -91,18 +90,14 @@ public class RemoteIndexCache implements Closeable {
*/
private final AtomicBoolean isRemoteIndexCacheClosed = new AtomicBoolean(false);
/**
* Unbounded queue containing the removed entries from the cache which are waiting to be garbage collected.
*/
private final LinkedBlockingQueue<Entry> expiredIndexes = new LinkedBlockingQueue<>();
/**
* Lock used to synchronize close with other read operations. This ensures that when we close, we don't have any other
* concurrent reads in-progress.
*/
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final RemoteStorageManager remoteStorageManager;
private final ShutdownableThread cleanerThread;
private final KafkaScheduler cleanerScheduler = new KafkaScheduler(1, true, REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD);
private int fileDeleteDelayMs = 10_000;
/**
* Actual cache implementation that this file wraps around.
@ -130,10 +125,7 @@ public class RemoteIndexCache implements Closeable {
internalCache = initEmptyCache(maxSize);
init();
// Start cleaner thread that will clean the expired entries.
cleanerThread = createCleanerThread();
cleanerThread.start();
cleanerScheduler.startup();
}
public void resizeCacheSize(long remoteLogIndexFileCacheSize) {
@ -159,24 +151,25 @@ public class RemoteIndexCache implements Closeable {
.evictionListener((Uuid key, Entry entry, RemovalCause cause) -> {
// Mark the entries for cleanup and add them to the queue to be garbage collected later by the background thread.
if (entry != null) {
enqueueEntryForCleanup(entry, key);
enqueueEntryForCleanup(entry);
} else {
log.error("Received entry as null for key {} when the it is removed from the cache.", key);
}
}).build();
}
public Collection<Entry> expiredIndexes() {
return Collections.unmodifiableCollection(expiredIndexes);
// Visible for testing
int expiredIdxPendingForDeletion() {
return cleanerScheduler.pendingTaskSize();
}
// Visible for testing
public Cache<Uuid, Entry> internalCache() {
Cache<Uuid, Entry> internalCache() {
return internalCache;
}
// Visible for testing
public File cacheDir() {
File cacheDir() {
return cacheDir;
}
@ -184,7 +177,7 @@ public class RemoteIndexCache implements Closeable {
lock.readLock().lock();
try {
internalCache.asMap().computeIfPresent(key, (k, v) -> {
enqueueEntryForCleanup(v, k);
enqueueEntryForCleanup(v);
// Returning null to remove the key from the cache
return null;
});
@ -197,7 +190,9 @@ public class RemoteIndexCache implements Closeable {
lock.readLock().lock();
try {
keys.forEach(key -> internalCache.asMap().computeIfPresent(key, (k, v) -> {
enqueueEntryForCleanup(v, k);
// Mark the entry for cleanup before removing it from the cache to avoid contention with the
// next fetch for the same key.
enqueueEntryForCleanup(v);
// Returning null to remove the key from the cache
return null;
}));
@ -206,46 +201,31 @@ public class RemoteIndexCache implements Closeable {
}
}
private void enqueueEntryForCleanup(Entry entry, Uuid key) {
private void enqueueEntryForCleanup(Entry entry) {
try {
entry.markForCleanup();
if (!expiredIndexes.offer(entry)) {
log.error("Error while inserting entry {} for key {} into the cleaner queue because queue is full.", entry, key);
}
Runnable runnable = () -> {
try {
entry.cleanup();
log.debug("Cleaned up index entry {}", entry);
} catch (Exception ex) {
log.error("Error occurred while cleaning up expired entry: {}", entry, ex);
}
};
cleanerScheduler.scheduleOnce("delete-index", runnable, fileDeleteDelayMs);
} catch (IOException e) {
throw new KafkaException(e);
}
}
// Visible for testing
public ShutdownableThread cleanerThread() {
return cleanerThread;
void setFileDeleteDelayMs(int fileDeleteDelayMs) {
this.fileDeleteDelayMs = fileDeleteDelayMs;
}
private ShutdownableThread createCleanerThread() {
ShutdownableThread thread = new ShutdownableThread(REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD) {
public void doWork() {
try {
Entry entry = expiredIndexes.take();
log.debug("Cleaning up index entry {}", entry);
entry.cleanup();
} catch (InterruptedException ie) {
// cleaner thread should only be interrupted when cache is being closed, else it's an error
if (!isRemoteIndexCacheClosed.get()) {
log.error("Cleaner thread received interruption but remote index cache is not closed", ie);
// propagate the InterruptedException outside to correctly close the thread.
throw new KafkaException(ie);
} else {
log.debug("Cleaner thread was interrupted on cache shutdown");
}
} catch (Exception ex) {
// do not exit for exceptions other than InterruptedException
log.error("Error occurred while cleaning up expired entry", ex);
}
}
};
thread.setDaemon(true);
return thread;
// Visible for testing
KafkaScheduler cleanerScheduler() {
return cleanerScheduler;
}
private void init() throws IOException {
@ -335,7 +315,8 @@ public class RemoteIndexCache implements Closeable {
log.info("RemoteIndexCache starts up in {} ms.", Time.SYSTEM.hiResClockMs() - start);
}
private <T> T loadIndexFile(File file, RemoteLogSegmentMetadata remoteLogSegmentMetadata,
private <T> T loadIndexFile(File file,
RemoteLogSegmentMetadata remoteLogSegmentMetadata,
Function<RemoteLogSegmentMetadata, InputStream> fetchRemoteIndex,
Function<File, T> readIndex) throws IOException {
File indexFile = new File(cacheDir, file.getName());
@ -358,26 +339,25 @@ public class RemoteIndexCache implements Closeable {
return index;
}
public Entry getIndexEntry(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
if (isRemoteIndexCacheClosed.get()) {
throw new IllegalStateException("Unable to fetch index for " +
"segment id=" + remoteLogSegmentMetadata.remoteLogSegmentId().id() + ". Instance is already closed.");
}
public Entry getIndexEntry(RemoteLogSegmentMetadata metadata) {
Uuid uuid = metadata.remoteLogSegmentId().id();
throwIfCacheClosed(uuid);
lock.readLock().lock();
try {
// while this thread was waiting for lock, another thread may have changed the value of isRemoteIndexCacheClosed.
// check for index close again
if (isRemoteIndexCacheClosed.get()) {
throw new IllegalStateException("Unable to fetch index for segment-id = "
+ remoteLogSegmentMetadata.remoteLogSegmentId().id() + ". Index instance is already closed.");
}
return internalCache.get(remoteLogSegmentMetadata.remoteLogSegmentId().id(),
uuid -> createCacheEntry(remoteLogSegmentMetadata));
throwIfCacheClosed(uuid);
return internalCache.get(uuid, k -> createCacheEntry(metadata));
} finally {
lock.readLock().unlock();
}
}
private void throwIfCacheClosed(Uuid uuid) {
if (isRemoteIndexCacheClosed.get()) {
throw new IllegalStateException("Unable to fetch index for segment-id = " + uuid +
". RemoteIndexCache is closed.");
}
}
private RemoteIndexCache.Entry createCacheEntry(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
long startOffset = remoteLogSegmentMetadata.startOffset();
try {
@ -466,16 +446,12 @@ public class RemoteIndexCache implements Closeable {
lock.writeLock().lock();
try {
log.info("Close initiated for RemoteIndexCache. Cache stats={}. Cache entries pending delete={}",
internalCache.stats(), expiredIndexes.size());
// Initiate shutdown for cleaning thread
boolean shutdownRequired = cleanerThread.initiateShutdown();
internalCache.stats(), expiredIdxPendingForDeletion());
cleanerScheduler.shutdown();
// Close all the opened indexes to force unload mmap memory. This does not delete the index files from disk.
internalCache.asMap().forEach((uuid, entry) -> entry.close());
// wait for cleaner thread to shutdown
if (shutdownRequired) cleanerThread.awaitShutdown();
// Note that internal cache does not require explicit cleaning/closing. We don't want to invalidate or cleanup
// the cache as both would lead to triggering of removal listener.
internalCache.asMap().forEach((uuid, entry) -> entry.close());
log.info("Close completed for RemoteIndexCache");
} catch (InterruptedException e) {
throw new KafkaException(e);
@ -510,32 +486,30 @@ public class RemoteIndexCache implements Closeable {
this.entrySizeBytes = estimatedEntrySize();
}
// Visible for testing
public OffsetIndex offsetIndex() {
return offsetIndex;
}
// Visible for testing
public TimeIndex timeIndex() {
return timeIndex;
}
// Visible for testing
public TransactionIndex txnIndex() {
return txnIndex;
}
// Visible for testing
public boolean isCleanStarted() {
boolean isCleanStarted() {
return cleanStarted;
}
// Visible for testing
public boolean isMarkedForCleanup() {
boolean isMarkedForCleanup() {
return markedForCleanup;
}
public long entrySizeBytes() {
// Visible for testing
long entrySizeBytes() {
return entrySizeBytes;
}
@ -554,8 +528,9 @@ public class RemoteIndexCache implements Closeable {
public OffsetPosition lookupOffset(long targetOffset) {
entryLock.readLock().lock();
try {
if (markedForCleanup) throw new IllegalStateException("This entry is marked for cleanup");
else return offsetIndex.lookup(targetOffset);
if (cleanStarted)
throw new IllegalStateException("This entry is marked for cleanup");
return offsetIndex.lookup(targetOffset);
} finally {
entryLock.readLock().unlock();
}
@ -564,8 +539,8 @@ public class RemoteIndexCache implements Closeable {
public OffsetPosition lookupTimestamp(long timestamp, long startingOffset) {
entryLock.readLock().lock();
try {
if (markedForCleanup) throw new IllegalStateException("This entry is marked for cleanup");
if (cleanStarted)
throw new IllegalStateException("This entry is marked for cleanup");
TimestampOffset timestampOffset = timeIndex.lookup(timestamp);
return offsetIndex.lookup(Math.max(startingOffset, timestampOffset.offset));
} finally {
@ -573,7 +548,7 @@ public class RemoteIndexCache implements Closeable {
}
}
public void markForCleanup() throws IOException {
void markForCleanup() throws IOException {
entryLock.writeLock().lock();
try {
if (!markedForCleanup) {
@ -587,14 +562,12 @@ public class RemoteIndexCache implements Closeable {
}
}
public void cleanup() throws IOException {
void cleanup() throws IOException {
entryLock.writeLock().lock();
try {
markForCleanup();
// no-op if clean is done already
if (!cleanStarted) {
if (!cleanStarted && isMarkedForCleanup()) {
cleanStarted = true;
List<StorageAction<Void, Exception>> actions = List.of(() -> {
offsetIndex.deleteIfExists();
return null;
@ -605,7 +578,6 @@ public class RemoteIndexCache implements Closeable {
txnIndex.deleteIfExists();
return null;
});
tryAll(actions);
}
} finally {
@ -701,28 +673,28 @@ public class RemoteIndexCache implements Closeable {
return startOffset + "_" + segmentId.toString();
}
public static File remoteOffsetIndexFile(File dir, RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
static File remoteOffsetIndexFile(File dir, RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
return new File(dir, remoteOffsetIndexFileName(remoteLogSegmentMetadata));
}
public static String remoteOffsetIndexFileName(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
static String remoteOffsetIndexFileName(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
String prefix = generateFileNamePrefixForIndex(remoteLogSegmentMetadata);
return prefix + LogFileUtils.INDEX_FILE_SUFFIX;
}
public static File remoteTimeIndexFile(File dir, RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
static File remoteTimeIndexFile(File dir, RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
return new File(dir, remoteTimeIndexFileName(remoteLogSegmentMetadata));
}
public static String remoteTimeIndexFileName(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
static String remoteTimeIndexFileName(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
return generateFileNamePrefixForIndex(remoteLogSegmentMetadata) + TIME_INDEX_FILE_SUFFIX;
}
public static File remoteTransactionIndexFile(File dir, RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
static File remoteTransactionIndexFile(File dir, RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
return new File(dir, remoteTransactionIndexFileName(remoteLogSegmentMetadata));
}
public static String remoteTransactionIndexFileName(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
static String remoteTransactionIndexFileName(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
return generateFileNamePrefixForIndex(remoteLogSegmentMetadata) + LogFileUtils.TXN_INDEX_FILE_SUFFIX;
}
}


@ -34,6 +34,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.invocation.InvocationOnMock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -50,8 +51,10 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@ -59,6 +62,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.apache.kafka.storage.internals.log.RemoteIndexCache.DIR_NAME;
@ -81,7 +85,6 @@ import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
@ -115,10 +118,10 @@ public class RemoteIndexCacheTest {
Files.createDirectory(tpDir.toPath());
RemoteLogSegmentId remoteLogSegmentId = RemoteLogSegmentId.generateNew(idPartition);
rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset, time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L));
cache = new RemoteIndexCache(defaultRemoteIndexCacheSizeBytes, rsm, tpDir.toString());
rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset, time.milliseconds(),
brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L));
cache = new RemoteIndexCache(defaultRemoteIndexCacheSizeBytes, rsm, logDir.toString());
cache.setFileDeleteDelayMs(0);
mockRsmFetchIndex(rsm);
}
@ -192,7 +195,7 @@ public class RemoteIndexCacheTest {
return switch (indexType) {
case OFFSET -> new FileInputStream(offsetIdx.file());
case TIMESTAMP -> new FileInputStream(timeIdx.file());
// Throw RemoteResourceNotFoundException since transaction index is not available
case TRANSACTION -> throw new RemoteResourceNotFoundException("txn index not found");
case LEADER_EPOCH -> null; // leader-epoch-cache is not accessed.
case PRODUCER_SNAPSHOT -> null; // producer-snapshot is not accessed.
@ -206,7 +209,7 @@ public class RemoteIndexCacheTest {
}
@Test
public void testPositionForNonExistingIndexFromRemoteStorage() {
public void testPositionForNonExistentEntry() {
OffsetIndex offsetIndex = cache.getIndexEntry(rlsMetadata).offsetIndex();
int lastOffsetPosition = cache.lookupOffset(rlsMetadata, offsetIndex.lastOffset());
long greaterOffsetThanLastOffset = offsetIndex.lastOffset() + 1;
@ -223,7 +226,7 @@ public class RemoteIndexCacheTest {
long estimateEntryBytesSize = estimateOneEntryBytesSize();
// close existing cache created in test setup before creating a new one
Utils.closeQuietly(cache, "RemoteIndexCache created for unit test");
cache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, tpDir.toString());
cache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, logDir.toString());
TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
List<RemoteLogSegmentMetadata> metadataList = generateRemoteLogSegmentMetadata(3, tpId);
@ -267,11 +270,7 @@ public class RemoteIndexCacheTest {
}
@Test
public void testGetIndexAfterCacheClose() throws IOException, RemoteStorageException, InterruptedException {
// close existing cache created in test setup before creating a new one
Utils.closeQuietly(cache, "RemoteIndexCache created for unit test");
cache = new RemoteIndexCache(2 * estimateOneEntryBytesSize(), rsm, tpDir.toString());
public void shouldThrowErrorWhenAccessedAfterCacheClose() throws RemoteStorageException, InterruptedException {
TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
List<RemoteLogSegmentMetadata> metadataList = generateRemoteLogSegmentMetadata(3, tpId);
@ -281,8 +280,6 @@ public class RemoteIndexCacheTest {
verifyFetchIndexInvocation(1);
cache.close();
// Check IllegalStateException is thrown when index is accessed after it is closed.
assertThrows(IllegalStateException.class, () -> cache.getIndexEntry(metadataList.get(0)));
}
@ -313,19 +310,18 @@ public class RemoteIndexCacheTest {
cache.internalCache().put(internalIndexKey, cacheEntry);
// no expired entries yet
assertEquals(0, cache.expiredIndexes().size(), "expiredIndex queue should be zero at start of test");
assertEquals(0, cache.expiredIdxPendingForDeletion(), "expiredIndex queue should be zero at start of test");
// call remove function to mark the entry for removal
cache.remove(internalIndexKey);
// wait until entry is marked for deletion
TestUtils.waitForCondition(cacheEntry::isMarkedForCleanup,
"Failed to mark cache entry for cleanup after invalidation");
assertTrue(cacheEntry::isMarkedForCleanup,
"Failed to mark cache entry for cleanup after remove");
TestUtils.waitForCondition(cacheEntry::isCleanStarted,
"Failed to cleanup cache entry after invalidation");
"Failed to cleanup cache entry after remove");
// first it will be marked for cleanup, second time markForCleanup is called when cleanup() is called
verify(cacheEntry, times(2)).markForCleanup();
verify(cacheEntry).markForCleanup();
// after that async it will be cleaned up
verify(cacheEntry).cleanup();
@ -346,7 +342,7 @@ public class RemoteIndexCacheTest {
}
private Optional<Path> getIndexFileFromDisk(String suffix) throws IOException {
return Files.walk(tpDir.toPath())
return Files.walk(cache.cacheDir().toPath())
.filter(Files::isRegularFile)
.filter(path -> path.getFileName().toString().endsWith(suffix))
.findAny();
@ -356,23 +352,25 @@ public class RemoteIndexCacheTest {
public void testCleanerThreadShutdown() throws IOException, InterruptedException {
// cache is empty at beginning
assertTrue(cache.internalCache().asMap().isEmpty());
// verify that cleaner thread is running
Set<Thread> threads = getRunningCleanerThread();
assertEquals(1, threads.size(),
"Found unexpected " + threads.size() + " threads=" + threads.stream().map(Thread::getName).collect(Collectors.joining(", ")));
// create a new entry
RemoteIndexCache.Entry spyEntry = generateSpyCacheEntry();
// an exception should not close the cleaner thread
doThrow(new RuntimeException("kaboom! I am expected exception in unit test.")).when(spyEntry).cleanup();
doAnswer(invocation -> {
invocation.callRealMethod();
// an exception should not close the cleaner thread
throw new RuntimeException("kaboom! I am expected exception in unit test.");
}).when(spyEntry).cleanup();
Uuid key = Uuid.randomUuid();
cache.internalCache().put(key, spyEntry);
// trigger cleanup
cache.remove(key);
// wait for cleanup to start
TestUtils.waitForCondition(spyEntry::isCleanStarted,
"Failed while waiting for clean up to start");
// Give the cleaner thread some time to throw an exception
Thread.sleep(100);
verify(spyEntry, times(1)).cleanup();
// Verify that Cleaner thread is still running even when exception is thrown in doWork()
threads = getRunningCleanerThread();
Set<Thread> threads = getRunningCleanerThread();
assertEquals(1, threads.size(),
"Found unexpected " + threads.size() + " threads=" + threads.stream().map(Thread::getName).collect(Collectors.joining(", ")));
// close the cache properly
@ -381,7 +379,7 @@ public class RemoteIndexCacheTest {
threads = getRunningCleanerThread();
assertTrue(threads.isEmpty(), "Found unexpected " + threads.size() + " threads=" + threads.stream().map(Thread::getName).collect(Collectors.joining(", ")));
// if the thread is correctly being shutdown it will not be running
assertFalse(cache.cleanerThread().isRunning(), "Unexpected thread state=running. Check error logs.");
assertFalse(cache.cleanerScheduler().isStarted(), "Unexpected thread state=running. Check error logs.");
}
@Test
@ -389,7 +387,7 @@ public class RemoteIndexCacheTest {
RemoteIndexCache.Entry spyEntry = generateSpyCacheEntry();
cache.internalCache().put(rlsMetadata.remoteLogSegmentId().id(), spyEntry);
TestUtils.waitForCondition(() -> cache.cleanerThread().isStarted(), "Cleaner thread should be started");
TestUtils.waitForCondition(() -> cache.cleanerScheduler().isStarted(), "Cleaner thread should be started");
// close the cache
cache.close();
@ -408,7 +406,7 @@ public class RemoteIndexCacheTest {
verify(spyEntry.timeIndex(), times(0)).deleteIfExists();
// verify cleaner thread is shutdown
assertTrue(cache.cleanerThread().isShutdownComplete());
assertFalse(cache.cleanerScheduler().isStarted());
}
@Test
@ -479,7 +477,7 @@ public class RemoteIndexCacheTest {
long estimateEntryBytesSize = estimateOneEntryBytesSize();
// close existing cache created in test setup before creating a new one
Utils.closeQuietly(cache, "RemoteIndexCache created for unit test");
cache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, tpDir.toString());
cache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, logDir.toString());
TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
List<RemoteLogSegmentMetadata> metadataList = generateRemoteLogSegmentMetadata(3, tpId);
@ -514,7 +512,7 @@ public class RemoteIndexCacheTest {
cache.close();
// Reload the cache from the disk and check the cache size is same as earlier
RemoteIndexCache reloadedCache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, tpDir.toString());
RemoteIndexCache reloadedCache = new RemoteIndexCache(2 * estimateEntryBytesSize, rsm, logDir.toString());
assertEquals(2, reloadedCache.internalCache().asMap().size());
reloadedCache.close();
@ -522,7 +520,7 @@ public class RemoteIndexCacheTest {
}
@Test
public void testRemoveItem() throws InterruptedException, IOException {
public void testRemoveItem() throws IOException {
RemoteLogSegmentId segmentId = rlsMetadata.remoteLogSegmentId();
Uuid segmentUuid = segmentId.id();
// generate and add entry to cache
@ -533,7 +531,7 @@ public class RemoteIndexCacheTest {
cache.remove(segmentId.id());
assertFalse(cache.internalCache().asMap().containsKey(segmentUuid));
TestUtils.waitForCondition(spyEntry::isMarkedForCleanup, "Failed to mark cache entry for cleanup after invalidation");
assertTrue(spyEntry::isMarkedForCleanup, "Failed to mark cache entry for cleanup after remove");
}
@Test
@ -553,7 +551,7 @@ public class RemoteIndexCacheTest {
}
@Test
public void testRemoveMultipleItems() throws InterruptedException, IOException {
public void testRemoveMultipleItems() throws IOException {
// generate and add entry to cache
Map<Uuid, RemoteIndexCache.Entry> uuidAndEntryList = new HashMap<>();
for (int i = 0; i < 10; i++) {
@ -568,7 +566,7 @@ public class RemoteIndexCacheTest {
}
cache.removeAll(uuidAndEntryList.keySet());
for (RemoteIndexCache.Entry entry : uuidAndEntryList.values()) {
TestUtils.waitForCondition(entry::isMarkedForCleanup, "Failed to mark cache entry for cleanup after invalidation");
assertTrue(entry::isMarkedForCleanup, "Failed to mark cache entry for cleanup after removeAll");
}
}
@ -760,40 +758,32 @@ public class RemoteIndexCacheTest {
}
@Test
public void testConcurrentRemoveReadForCache() throws IOException, InterruptedException, ExecutionException {
public void testConcurrentRemoveReadForCache1() throws IOException, InterruptedException, ExecutionException {
// Create a spy Cache Entry
RemoteLogSegmentMetadata rlsMetadata = new RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), baseOffset, lastOffset, time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L));
TimeIndex timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)));
TransactionIndex txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)));
OffsetIndex offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata, new File(tpDir, DIR_NAME)));
RemoteIndexCache.Entry spyEntry = spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, txIndex));
RemoteIndexCache.Entry spyEntry = generateSpyCacheEntry();
cache.internalCache().put(rlsMetadata.remoteLogSegmentId().id(), spyEntry);
assertCacheSize(1);
CountDownLatch latchForCacheRead = new CountDownLatch(1);
CountDownLatch latchForCacheRemove = new CountDownLatch(1);
CountDownLatch latchForTestWait = new CountDownLatch(1);
AtomicInteger markForCleanupCallCount = new AtomicInteger(0);
AtomicInteger cleanupCallCount = new AtomicInteger(0);
doAnswer(invocation -> {
markForCleanupCallCount.incrementAndGet();
cleanupCallCount.incrementAndGet();
if (markForCleanupCallCount.get() == 1) {
if (cleanupCallCount.get() == 1) {
// Signal the CacheRead to unblock itself
latchForCacheRead.countDown();
// Wait for signal to start renaming the files
// Wait for signal to start deleting the renamed files
latchForCacheRemove.await();
// Calling the markForCleanup() actual method to start renaming the files
// Calling the cleanup() actual method to remove the renamed files
invocation.callRealMethod();
// Signal TestWait to unblock itself so that test can be completed
latchForTestWait.countDown();
}
return null;
}).when(spyEntry).markForCleanup();
}).when(spyEntry).cleanup();
Runnable removeCache = () -> cache.remove(rlsMetadata.remoteLogSegmentId().id());
@ -804,10 +794,58 @@ public class RemoteIndexCacheTest {
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
cache.getIndexEntry(rlsMetadata);
// Signal the CacheRemove to start renaming the files
latchForCacheRemove.countDown();
};
executeConcurrentRemoveRead(removeCache, readCache, latchForTestWait);
}
@Test
public void testConcurrentRemoveReadForCache2() throws IOException, InterruptedException, ExecutionException {
RemoteIndexCache.Entry spyEntry = generateSpyCacheEntry();
cache.internalCache().put(rlsMetadata.remoteLogSegmentId().id(), spyEntry);
assertCacheSize(1);
CountDownLatch latchForCacheRead = new CountDownLatch(1);
CountDownLatch latchForCacheRemove = new CountDownLatch(1);
CountDownLatch latchForTestWait = new CountDownLatch(1);
AtomicInteger cleanupCallCount = new AtomicInteger(0);
doAnswer((InvocationOnMock invocation) -> {
cleanupCallCount.incrementAndGet();
if (cleanupCallCount.get() == 1) {
// Wait for signal to start renaming the files
latchForCacheRemove.await();
// Calling the cleanup() actual method to remove the renamed files
invocation.callRealMethod();
// Signal the CacheRead to unblock itself
latchForCacheRead.countDown();
}
return null;
}).when(spyEntry).cleanup();
Runnable removeCache = () -> cache.remove(rlsMetadata.remoteLogSegmentId().id());
Runnable readCache = () -> {
// Wait for signal to start CacheRead
latchForCacheRemove.countDown();
try {
latchForCacheRead.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
cache.getIndexEntry(rlsMetadata);
// Signal TestWait to unblock itself so that test can be completed
latchForTestWait.countDown();
};
executeConcurrentRemoveRead(removeCache, readCache, latchForTestWait);
}
private void executeConcurrentRemoveRead(Runnable removeCache,
Runnable readCache,
CountDownLatch latchForTestWait) throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool(2);
try {
Future<?> removeCacheFuture = executor.submit(removeCache);
@ -820,16 +858,13 @@ public class RemoteIndexCacheTest {
// Wait for signal to complete the test
latchForTestWait.await();
// We can't determine read thread or remove thread will go first so if,
// 1. Read thread go first, cache file should not exist and cache size should be zero.
// 2. Remove thread go first, cache file should present and cache size should be one.
// so basically here we are making sure that if cache existed, the cache file should exist,
// and if cache is non-existed, the cache file should not exist.
if (getIndexFileFromRemoteCacheDir(cache, LogFileUtils.INDEX_FILE_SUFFIX).isPresent()) {
assertCacheSize(1);
} else {
assertCacheSize(0);
}
// Whichever of the reader or cleaner thread goes first, the cache size should remain one:
// 1. If the reader thread runs first, it will fetch the entry from remote again, since the previous entry
// on local disk was renamed with the ".deleted" suffix. The previous and current entry objects are different,
// and the cleaner thread only removes files with the ".deleted" suffix.
// 2. If the removal thread runs first, it will remove the files with the ".deleted" suffix, and the reader
// thread will fetch the entry again from remote storage.
assertCacheSize(1);
} finally {
executor.shutdownNow();
}
@ -896,9 +931,9 @@ public class RemoteIndexCacheTest {
RemoteIndexCache.Entry entry = cache.getIndexEntry(rlsMetadata);
verifyFetchIndexInvocation(1);
// copy files with temporary name
Files.copy(entry.offsetIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.offsetIndex().file().getPath(), "", tempSuffix)));
Files.copy(entry.txnIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.txnIndex().file().getPath(), "", tempSuffix)));
Files.copy(entry.timeIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.timeIndex().file().getPath(), "", tempSuffix)));
Path tmpOffsetIdxPath = Files.copy(entry.offsetIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.offsetIndex().file().getPath(), "", tempSuffix)));
Path tmpTxnIdxPath = Files.copy(entry.txnIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.txnIndex().file().getPath(), "", tempSuffix)));
Path tmpTimeIdxPath = Files.copy(entry.timeIndex().file().toPath(), Paths.get(Utils.replaceSuffix(entry.timeIndex().file().getPath(), "", tempSuffix)));
cache.remove(rlsMetadata.remoteLogSegmentId().id());
@ -909,7 +944,7 @@ public class RemoteIndexCacheTest {
"Failed to cleanup cache entry after invalidation");
// restore index files
renameRemoteCacheIndexFileFromDisk(tempSuffix, remoteIndexCacheDir, tempSuffix);
renameRemoteCacheIndexFileFromDisk(tmpOffsetIdxPath, tmpTxnIdxPath, tmpTimeIdxPath, tempSuffix);
// validate cache entry for the above key should be null
assertNull(cache.internalCache().getIfPresent(rlsMetadata.remoteLogSegmentId().id()));
cache.getIndexEntry(rlsMetadata);
@ -921,12 +956,13 @@ public class RemoteIndexCacheTest {
assertTrue(getIndexFileFromRemoteCacheDir(cache, LogFileUtils.TIME_INDEX_FILE_SUFFIX).isPresent(), "Time index file should be present on disk at " + remoteIndexCacheDir.toPath());
}
private void renameRemoteCacheIndexFileFromDisk(String suffix, File remoteIndexCacheDir, String tempSuffix) throws IOException {
List<Path> paths = Files.walk(remoteIndexCacheDir.toPath())
.filter(Files::isRegularFile)
.filter(path -> path.getFileName().toString().endsWith(suffix)).toList();
for (Path path : paths) {
Utils.atomicMoveWithFallback(path, path.resolveSibling(path.getFileName().toString().replace(tempSuffix, "")));
private void renameRemoteCacheIndexFileFromDisk(Path tmpOffsetIdxFile,
Path tmpTxnIdxFile,
Path tmpTimeIdxFile,
String tempSuffix) throws IOException {
for (Path path : new Path[]{tmpOffsetIdxFile, tmpTxnIdxFile, tmpTimeIdxFile}) {
Utils.atomicMoveWithFallback(path,
path.resolveSibling(path.getFileName().toString().replace(tempSuffix, "")));
}
}
@ -995,15 +1031,104 @@ public class RemoteIndexCacheTest {
"Index file marked for deletion should not be present on disk at " + remoteIndexCacheDir.toPath());
}
@Test
public void testDeleteInvalidIndexFilesOnInit() throws IOException {
File cacheDir = cache.cacheDir();
long baseOffset = 100L;
UUID uuid = UUID.randomUUID();
String invalidOffsetIdxFilename = String.format("%s_%s%s%s", baseOffset, uuid, LogFileUtils.INDEX_FILE_SUFFIX, LogFileUtils.DELETED_FILE_SUFFIX);
File invalidOffsetIdxFile = new File(cacheDir, invalidOffsetIdxFilename);
invalidOffsetIdxFile.createNewFile();
String invalidTimeIdxFilename = String.format("%s_%s%s%s", baseOffset, uuid, LogFileUtils.TIME_INDEX_FILE_SUFFIX, ".tmp");
File invalidTimeIndexFile = new File(cacheDir, invalidTimeIdxFilename);
invalidTimeIndexFile.createNewFile();
RemoteLogSegmentMetadata rlsMetadata = new RemoteLogSegmentMetadata(RemoteLogSegmentId.generateNew(idPartition), baseOffset + 100,
lastOffset, time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L));
OffsetIndex validOffsetIdx = createOffsetIndexForSegmentMetadata(rlsMetadata, logDir);
TransactionIndex validTimeIdx = createTxIndexForSegmentMetadata(rlsMetadata, logDir);
new RemoteIndexCache(defaultRemoteIndexCacheSizeBytes, rsm, logDir.toString());
assertFalse(invalidOffsetIdxFile.exists());
assertFalse(invalidTimeIndexFile.exists());
assertTrue(validOffsetIdx.file().exists());
assertTrue(validTimeIdx.file().exists());
}
@Test
public void testFetchIndexAccessibleWhenMarkedForCleanup() throws IOException, RemoteStorageException {
// set the delayMs to a large value to disable file deletion by the scheduler thread, making the test deterministic
cache.setFileDeleteDelayMs(300_000);
File cacheDir = cache.cacheDir();
Uuid segmentUuid = rlsMetadata.remoteLogSegmentId().id();
RemoteIndexCache.Entry indexEntry = cache.getIndexEntry(rlsMetadata);
cache.remove(segmentUuid);
// Once marked for cleanup, the 3 index files should be renamed with ".deleted" suffix
assertEquals(3, countFiles(cacheDir, name -> true));
assertEquals(3, countFiles(cacheDir,
name -> name.contains(segmentUuid.toString()) && name.endsWith(LogFileUtils.DELETED_FILE_SUFFIX)));
// Ensure that the `indexEntry` object is still able to access the renamed index files after being marked for deletion
OffsetPosition offsetPosition = indexEntry.offsetIndex().entry(2);
assertEquals(offsetPosition.position, indexEntry.lookupOffset(offsetPosition.offset).position);
assertNull(cache.internalCache().asMap().get(segmentUuid));
verifyFetchIndexInvocation(1);
// Once the entry gets removed from cache, the subsequent call to the cache should re-fetch the entry from remote.
assertEquals(offsetPosition.position, cache.lookupOffset(rlsMetadata, offsetPosition.offset));
verifyFetchIndexInvocation(2);
RemoteIndexCache.Entry indexEntry2 = cache.getIndexEntry(rlsMetadata);
assertNotNull(indexEntry2);
verifyFetchIndexInvocation(2);
// There will be 6 files in the remote-log-index-cache dir: 3 original index files and 3 files with ".deleted" suffix
assertEquals(6, countFiles(cacheDir, name -> true));
assertEquals(3, countFiles(cacheDir,
name -> name.contains(segmentUuid.toString()) && !name.endsWith(LogFileUtils.DELETED_FILE_SUFFIX)));
assertEquals(3, countFiles(cacheDir,
name -> name.contains(segmentUuid.toString()) && name.endsWith(LogFileUtils.DELETED_FILE_SUFFIX)));
// Once the indexEntry2 is marked for cleanup, the 3 index files should be renamed with ".deleted" suffix.
// Both indexEntry and indexEntry2 should be able to access the renamed index files.
cache.remove(segmentUuid);
assertEquals(3, countFiles(cacheDir, name -> true));
assertEquals(3, countFiles(cacheDir,
name -> name.contains(segmentUuid.toString()) && name.endsWith(LogFileUtils.DELETED_FILE_SUFFIX)));
assertEquals(offsetPosition.position, indexEntry.lookupOffset(offsetPosition.offset).position);
assertEquals(offsetPosition.position, indexEntry2.lookupOffset(offsetPosition.offset).position);
indexEntry.cleanup();
assertEquals(0, countFiles(cacheDir, name -> true));
assertThrows(IllegalStateException.class, () -> indexEntry.lookupOffset(offsetPosition.offset));
assertEquals(offsetPosition.position, indexEntry2.lookupOffset(offsetPosition.offset).position);
indexEntry2.cleanup();
assertEquals(0, countFiles(cacheDir, name -> true));
assertThrows(IllegalStateException.class, () -> indexEntry.lookupOffset(offsetPosition.offset));
assertThrows(IllegalStateException.class, () -> indexEntry2.lookupOffset(offsetPosition.offset));
}
private int countFiles(File cacheDir, Predicate<String> condition) {
return Objects.requireNonNull(cacheDir.listFiles((dir, name) -> condition.test(name))).length;
}
private RemoteIndexCache.Entry generateSpyCacheEntry() throws IOException {
return generateSpyCacheEntry(RemoteLogSegmentId.generateNew(idPartition));
}
private RemoteIndexCache.Entry generateSpyCacheEntry(RemoteLogSegmentId remoteLogSegmentId) throws IOException {
RemoteLogSegmentMetadata rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset, time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L));
TimeIndex timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, tpDir));
TransactionIndex txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, tpDir));
OffsetIndex offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata, tpDir));
return generateSpyCacheEntry(remoteLogSegmentId, new File(logDir, DIR_NAME));
}
private RemoteIndexCache.Entry generateSpyCacheEntry(RemoteLogSegmentId remoteLogSegmentId,
File dir) throws IOException {
RemoteLogSegmentMetadata rlsMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset,
lastOffset, time.milliseconds(), brokerId, time.milliseconds(),
segmentSize, Collections.singletonMap(0, 0L));
TimeIndex timeIndex = spy(createTimeIndexForSegmentMetadata(rlsMetadata, dir));
TransactionIndex txIndex = spy(createTxIndexForSegmentMetadata(rlsMetadata, dir));
OffsetIndex offsetIndex = spy(createOffsetIndexForSegmentMetadata(rlsMetadata, dir));
return spy(new RemoteIndexCache.Entry(offsetIndex, timeIndex, txIndex));
}
@ -1097,14 +1222,14 @@ public class RemoteIndexCacheTest {
private long estimateOneEntryBytesSize() throws IOException, RemoteStorageException {
TopicPartition tp = new TopicPartition("estimate-entry-bytes-size", 0);
TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), tp);
File tpDir = new File(logDir, tpId.toString());
Files.createDirectory(tpDir.toPath());
RemoteStorageManager rsm = mock(RemoteStorageManager.class);
mockRsmFetchIndex(rsm);
RemoteIndexCache cache = new RemoteIndexCache(2L, rsm, tpDir.toString());
RemoteIndexCache cache = new RemoteIndexCache(2L, rsm, logDir.toString());
List<RemoteLogSegmentMetadata> metadataList = generateRemoteLogSegmentMetadata(1, tpId);
RemoteIndexCache.Entry entry = cache.getIndexEntry(metadataList.get(0));
long entrySizeInBytes = entry.entrySizeBytes();
entry.markForCleanup();
entry.cleanup();
Utils.closeQuietly(cache, "RemoteIndexCache created for estimating entry size");
return entrySizeInBytes;
}