KAFKA-15400: Use readLock when removing an item from the RemoteIndexCache (#14283)

- The Caffeine cache is thread-safe, so the writeLock only needs to be held during close(); remove operations can safely take the readLock (see the sketch below).
- Fix the flaky tests.
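
For illustration only, here is a minimal sketch of the locking pattern this change relies on (hypothetical class and field names, not the actual RemoteIndexCache): Caffeine's cache is already safe for concurrent access, so removals only need the shared read lock, while close() takes the exclusive write lock so no reader can race with teardown.

    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;

    // Hypothetical, simplified cache holder illustrating the read/write lock split.
    class ClosableCaffeineCache<K, V> {
        private final Cache<K, V> internalCache = Caffeine.newBuilder().maximumSize(1024).build();
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        private final AtomicBoolean closed = new AtomicBoolean(false);

        // Removal only needs the shared read lock: Caffeine synchronizes its own state,
        // so concurrent removals and lookups do not have to serialize against each other.
        void remove(K key) {
            lock.readLock().lock();
            try {
                if (!closed.get()) {
                    internalCache.invalidate(key);
                }
            } finally {
                lock.readLock().unlock();
            }
        }

        // close() is the only operation that needs exclusivity: the write lock waits for
        // in-flight readers to drain before the cache is torn down.
        void close() {
            if (closed.compareAndSet(false, true)) {
                lock.writeLock().lock();
                try {
                    internalCache.invalidateAll();
                } finally {
                    lock.writeLock().unlock();
                }
            }
        }
    }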

Reviewers: Divij Vaidya <diviv@amazon.com>
Author: Kamal Chandraprakash, 2023-08-24 17:12:13 +05:30 (committed via GitHub)
Commit: 88d2c4460a
Parent: 25b128de81
2 changed files with 25 additions and 38 deletions

RemoteIndexCacheTest.scala

@@ -32,7 +32,7 @@ import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito._
 import org.slf4j.{Logger, LoggerFactory}
-import java.io.{File, FileInputStream}
+import java.io.{File, FileInputStream, IOException}
 import java.nio.file.Files
 import java.util
 import java.util.Collections
@@ -91,7 +91,11 @@ class RemoteIndexCacheTest {
     Utils.closeQuietly(cache, "RemoteIndexCache created for unit test")
     // best effort to delete the per-test resource. Even if we don't delete, it is ok because the parent directory
     // will be deleted at the end of test.
+    try {
       Utils.delete(logDir)
+    } catch {
+      case _: IOException => // ignore
+    }
     // Verify no lingering threads. It is important to have this as the very last statement in the @aftereach
     // because this may throw an exception and prevent cleanup after it
     TestUtils.assertNoNonDaemonThreads(REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD)

RemoteIndexCache.java

@@ -75,7 +75,6 @@ public class RemoteIndexCache implements Closeable {
     private static final Logger log = LoggerFactory.getLogger(RemoteIndexCache.class);
     private static final String TMP_FILE_SUFFIX = ".tmp";
     public static final String REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD = "remote-log-index-cleaner";
     public static final String DIR_NAME = "remote-log-index-cache";
@@ -167,20 +166,20 @@ public class RemoteIndexCache implements Closeable {
     }
     public void remove(Uuid key) {
-        lock.writeLock().lock();
+        lock.readLock().lock();
         try {
             internalCache.invalidate(key);
         } finally {
-            lock.writeLock().unlock();
+            lock.readLock().unlock();
         }
     }
     public void removeAll(Collection<Uuid> keys) {
-        lock.writeLock().lock();
+        lock.readLock().lock();
         try {
             internalCache.invalidateAll(keys);
         } finally {
-            lock.writeLock().unlock();
+            lock.readLock().unlock();
         }
     }
@@ -190,7 +189,7 @@ public class RemoteIndexCache implements Closeable {
     }
     private ShutdownableThread createCleanerThread() {
-        ShutdownableThread thread = new ShutdownableThread("remote-log-index-cleaner") {
+        ShutdownableThread thread = new ShutdownableThread(REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD) {
             public void doWork() {
                 try {
                     Entry entry = expiredIndexes.take();
@@ -210,10 +209,8 @@ public class RemoteIndexCache implements Closeable {
                     log.error("Error occurred while cleaning up expired entry", ex);
                 }
             }
         };
         thread.setDaemon(true);
         return thread;
     }
@@ -315,36 +312,32 @@ public class RemoteIndexCache implements Closeable {
                 log.info("Error occurred while loading the stored index file {}", indexFile.getPath(), ex);
             }
         }
         if (index == null) {
             File tmpIndexFile = new File(indexFile.getParentFile(), indexFile.getName() + RemoteIndexCache.TMP_FILE_SUFFIX);
-            try (InputStream inputStream = fetchRemoteIndex.apply(remoteLogSegmentMetadata);) {
+            try (InputStream inputStream = fetchRemoteIndex.apply(remoteLogSegmentMetadata)) {
                 Files.copy(inputStream, tmpIndexFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
             }
             Utils.atomicMoveWithFallback(tmpIndexFile.toPath(), indexFile.toPath(), false);
             index = readIndex.apply(indexFile);
         }
         return index;
     }
     public Entry getIndexEntry(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
-        if (isRemoteIndexCacheClosed.get()) throw new IllegalStateException("Unable to fetch index for " +
+        if (isRemoteIndexCacheClosed.get()) {
+            throw new IllegalStateException("Unable to fetch index for " +
                 "segment id=" + remoteLogSegmentMetadata.remoteLogSegmentId().id() + ". Instance is already closed.");
+        }
         lock.readLock().lock();
         try {
             // while this thread was waiting for lock, another thread may have changed the value of isRemoteIndexCacheClosed.
             // check for index close again
             if (isRemoteIndexCacheClosed.get()) {
-                throw new IllegalStateException("Unable to fetch index for segment id="
+                throw new IllegalStateException("Unable to fetch index for segment-id = "
                         + remoteLogSegmentMetadata.remoteLogSegmentId().id() + ". Index instance is already closed.");
             }
             return internalCache.get(remoteLogSegmentMetadata.remoteLogSegmentId().id(),
-                (Uuid uuid) -> createCacheEntry(remoteLogSegmentMetadata));
+                uuid -> createCacheEntry(remoteLogSegmentMetadata));
         } finally {
             lock.readLock().unlock();
         }
@@ -352,7 +345,6 @@ public class RemoteIndexCache implements Closeable {
     private RemoteIndexCache.Entry createCacheEntry(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
         long startOffset = remoteLogSegmentMetadata.startOffset();
         try {
             File offsetIndexFile = remoteOffsetIndexFile(cacheDir, remoteLogSegmentMetadata);
             OffsetIndex offsetIndex = loadIndexFile(offsetIndexFile, remoteLogSegmentMetadata, rlsMetadata -> {
@@ -597,40 +589,31 @@ public class RemoteIndexCache implements Closeable {
      * @throws KafkaException Any other non IOExceptions are wrapped and thrown as KafkaException
      */
     private static void tryAll(List<StorageAction<Void, Exception>> actions) throws IOException {
-        IOException ioException = null;
-        List<Exception> exceptions = Collections.emptyList();
+        IOException firstIOException = null;
+        List<Exception> exceptions = new ArrayList<>();
         for (StorageAction<Void, Exception> action : actions) {
             try {
                 action.execute();
             } catch (IOException e) {
-                if (ioException == null) {
-                    ioException = e;
+                if (firstIOException == null) {
+                    firstIOException = e;
                 } else {
-                    if (exceptions.isEmpty()) {
-                        exceptions = new ArrayList<>();
-                    }
                     exceptions.add(e);
                 }
             } catch (Exception e) {
-                if (exceptions.isEmpty()) {
-                    exceptions = new ArrayList<>();
-                }
                 exceptions.add(e);
             }
         }
-        if (ioException != null) {
-            for (Exception exception : exceptions) {
-                ioException.addSuppressed(exception);
-            }
-            throw ioException;
+        if (firstIOException != null) {
+            exceptions.forEach(firstIOException::addSuppressed);
+            throw firstIOException;
         } else if (!exceptions.isEmpty()) {
             Iterator<Exception> iterator = exceptions.iterator();
             KafkaException kafkaException = new KafkaException(iterator.next());
             while (iterator.hasNext()) {
                 kafkaException.addSuppressed(iterator.next());
             }
             throw kafkaException;
         }
     }
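
As a side note on the contract described in the javadoc above (the first IOException wins, all other failures are attached as suppressed exceptions), the following stand-alone snippet, with made-up error messages and class names, shows what a caller of such a method ends up observing:

    import java.io.IOException;

    public class SuppressedExceptionDemo {
        public static void main(String[] args) {
            // Simulate tryAll-style aggregation: the first IOException is rethrown,
            // later failures are attached to it as suppressed exceptions.
            IOException first = new IOException("failed to delete offset index");
            first.addSuppressed(new IOException("failed to delete time index"));
            first.addSuppressed(new IllegalStateException("transaction index already closed"));

            try {
                throw first;
            } catch (IOException e) {
                System.out.println("thrown: " + e.getMessage());
                for (Throwable suppressed : e.getSuppressed()) {
                    System.out.println("suppressed: " + suppressed);
                }
            }
        }
    }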