diff --git a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala index 4b92da4007e..007829c529b 100644 --- a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala +++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala @@ -32,7 +32,7 @@ import org.mockito.ArgumentMatchers.any import org.mockito.Mockito._ import org.slf4j.{Logger, LoggerFactory} -import java.io.{File, FileInputStream} +import java.io.{File, FileInputStream, IOException} import java.nio.file.Files import java.util import java.util.Collections @@ -91,7 +91,11 @@ class RemoteIndexCacheTest { Utils.closeQuietly(cache, "RemoteIndexCache created for unit test") // best effort to delete the per-test resource. Even if we don't delete, it is ok because the parent directory // will be deleted at the end of test. - Utils.delete(logDir) + try { + Utils.delete(logDir) + } catch { + case _: IOException => // ignore + } // Verify no lingering threads. It is important to have this as the very last statement in the @aftereach // because this may throw an exception and prevent cleanup after it TestUtils.assertNoNonDaemonThreads(REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java index 30c4da4ce44..215beabe9f6 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java @@ -75,7 +75,6 @@ public class RemoteIndexCache implements Closeable { private static final Logger log = LoggerFactory.getLogger(RemoteIndexCache.class); private static final String TMP_FILE_SUFFIX = ".tmp"; - public static final String REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD = "remote-log-index-cleaner"; public static final String DIR_NAME = "remote-log-index-cache"; @@ -167,20 +166,20 @@ public class RemoteIndexCache implements Closeable { } public void remove(Uuid key) { - lock.writeLock().lock(); + lock.readLock().lock(); try { internalCache.invalidate(key); } finally { - lock.writeLock().unlock(); + lock.readLock().unlock(); } } public void removeAll(Collection keys) { - lock.writeLock().lock(); + lock.readLock().lock(); try { internalCache.invalidateAll(keys); } finally { - lock.writeLock().unlock(); + lock.readLock().unlock(); } } @@ -190,7 +189,7 @@ public class RemoteIndexCache implements Closeable { } private ShutdownableThread createCleanerThread() { - ShutdownableThread thread = new ShutdownableThread("remote-log-index-cleaner") { + ShutdownableThread thread = new ShutdownableThread(REMOTE_LOG_INDEX_CACHE_CLEANER_THREAD) { public void doWork() { try { Entry entry = expiredIndexes.take(); @@ -210,10 +209,8 @@ public class RemoteIndexCache implements Closeable { log.error("Error occurred while cleaning up expired entry", ex); } } - }; thread.setDaemon(true); - return thread; } @@ -315,36 +312,32 @@ public class RemoteIndexCache implements Closeable { log.info("Error occurred while loading the stored index file {}", indexFile.getPath(), ex); } } - if (index == null) { File tmpIndexFile = new File(indexFile.getParentFile(), indexFile.getName() + RemoteIndexCache.TMP_FILE_SUFFIX); - - try (InputStream inputStream = fetchRemoteIndex.apply(remoteLogSegmentMetadata);) { + try (InputStream inputStream = fetchRemoteIndex.apply(remoteLogSegmentMetadata)) { Files.copy(inputStream, tmpIndexFile.toPath(), StandardCopyOption.REPLACE_EXISTING); } - Utils.atomicMoveWithFallback(tmpIndexFile.toPath(), indexFile.toPath(), false); index = readIndex.apply(indexFile); } - return index; } public Entry getIndexEntry(RemoteLogSegmentMetadata remoteLogSegmentMetadata) { - if (isRemoteIndexCacheClosed.get()) throw new IllegalStateException("Unable to fetch index for " + - "segment id=" + remoteLogSegmentMetadata.remoteLogSegmentId().id() + ". Instance is already closed."); - + if (isRemoteIndexCacheClosed.get()) { + throw new IllegalStateException("Unable to fetch index for " + + "segment id=" + remoteLogSegmentMetadata.remoteLogSegmentId().id() + ". Instance is already closed."); + } lock.readLock().lock(); try { // while this thread was waiting for lock, another thread may have changed the value of isRemoteIndexCacheClosed. // check for index close again if (isRemoteIndexCacheClosed.get()) { - throw new IllegalStateException("Unable to fetch index for segment id=" + throw new IllegalStateException("Unable to fetch index for segment-id = " + remoteLogSegmentMetadata.remoteLogSegmentId().id() + ". Index instance is already closed."); } - return internalCache.get(remoteLogSegmentMetadata.remoteLogSegmentId().id(), - (Uuid uuid) -> createCacheEntry(remoteLogSegmentMetadata)); + uuid -> createCacheEntry(remoteLogSegmentMetadata)); } finally { lock.readLock().unlock(); } @@ -352,7 +345,6 @@ public class RemoteIndexCache implements Closeable { private RemoteIndexCache.Entry createCacheEntry(RemoteLogSegmentMetadata remoteLogSegmentMetadata) { long startOffset = remoteLogSegmentMetadata.startOffset(); - try { File offsetIndexFile = remoteOffsetIndexFile(cacheDir, remoteLogSegmentMetadata); OffsetIndex offsetIndex = loadIndexFile(offsetIndexFile, remoteLogSegmentMetadata, rlsMetadata -> { @@ -597,40 +589,31 @@ public class RemoteIndexCache implements Closeable { * @throws KafkaException Any other non IOExceptions are wrapped and thrown as KafkaException */ private static void tryAll(List> actions) throws IOException { - IOException ioException = null; - List exceptions = Collections.emptyList(); + IOException firstIOException = null; + List exceptions = new ArrayList<>(); for (StorageAction action : actions) { try { action.execute(); } catch (IOException e) { - if (ioException == null) { - ioException = e; + if (firstIOException == null) { + firstIOException = e; } else { - if (exceptions.isEmpty()) { - exceptions = new ArrayList<>(); - } exceptions.add(e); } } catch (Exception e) { - if (exceptions.isEmpty()) { - exceptions = new ArrayList<>(); - } exceptions.add(e); } } - if (ioException != null) { - for (Exception exception : exceptions) { - ioException.addSuppressed(exception); - } - throw ioException; + if (firstIOException != null) { + exceptions.forEach(firstIOException::addSuppressed); + throw firstIOException; } else if (!exceptions.isEmpty()) { Iterator iterator = exceptions.iterator(); KafkaException kafkaException = new KafkaException(iterator.next()); while (iterator.hasNext()) { kafkaException.addSuppressed(iterator.next()); } - throw kafkaException; } }