diff --git a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
index 45bfe69844a..e6bdce63e68 100644
--- a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
@@ -87,8 +87,9 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
   }
 
   override def onExpiration(): Unit = {
-    // cancel the remote storage read task, if it has not been executed yet
-    val cancelled = remoteFetchTask.cancel(true)
+    // Cancel the remote storage read task if it has not been executed yet. Avoid interrupting
+    // a task that is already running, since the interrupt may force-close opened/cached resources such as the transaction index.
+    val cancelled = remoteFetchTask.cancel(false)
     if (!cancelled) debug(s"Remote fetch task for RemoteStorageFetchInfo: $remoteFetchInfo could not be cancelled and its isDone value is ${remoteFetchTask.isDone}")
 
     DelayedRemoteFetchMetrics.expiredRequestMeter.mark()
diff --git a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
index 424c8cc04ec..21c59ab89a1 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
@@ -200,7 +200,7 @@ class DelayedRemoteFetchTest {
     delayedRemoteFetch.run()
 
     // Check that the task was cancelled and force-completed
-    verify(remoteFetchTask).cancel(true)
+    verify(remoteFetchTask).cancel(false)
     assertTrue(delayedRemoteFetch.isCompleted)
 
     // Check that the ExpiresPerSec metric was incremented
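Note on the cancel(false) change above: java.util.concurrent.Future.cancel(mayInterruptIfRunning) only delivers an interrupt to the executing thread when passed true. With false, a task that has not started is still cancelled, but a task that is already running is left alone and runs to completion. A minimal, self-contained sketch of this behavior (illustrative only, not part of the patch):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CancelSemanticsDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<?> task = pool.submit(() -> {
            try {
                Thread.sleep(2_000); // stand-in for a long remote storage read
                System.out.println("read completed without interruption");
            } catch (InterruptedException e) {
                System.out.println("read was interrupted mid-flight");
            }
        });
        Thread.sleep(100); // give the task time to start running
        // No interrupt is sent here, so the running body still completes;
        // with cancel(true) the task would print "interrupted" instead.
        System.out.println("cancel returned " + task.cancel(false));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}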
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
index 13a2d47aabd..b07255fcc53 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java
@@ -509,7 +509,6 @@ public class RemoteIndexCache implements Closeable {
         public Entry(OffsetIndex offsetIndex, TimeIndex timeIndex, TransactionIndex txnIndex) {
             this.offsetIndex = offsetIndex;
             this.timeIndex = timeIndex;
-            // If txn index does not exist on the source, it's an empty file on the index entry
             this.txnIndex = txnIndex;
             this.entrySizeBytes = estimatedEntrySize();
         }
@@ -546,7 +545,7 @@ public class RemoteIndexCache implements Closeable {
         private long estimatedEntrySize() {
             entryLock.readLock().lock();
             try {
-                return offsetIndex.sizeInBytes() + timeIndex.sizeInBytes() + Files.size(txnIndex.path());
+                return offsetIndex.sizeInBytes() + timeIndex.sizeInBytes() + Files.size(txnIndex.file().toPath());
             } catch (IOException e) {
                 log.warn("Error occurred when estimating remote index cache entry bytes size, just set 0 firstly.", e);
                 return 0L;
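Note on the estimatedEntrySize() change: the transaction index file is created lazily (see the TransactionIndex diff below), so it may not exist yet when an entry's size is estimated. Files.size() on a missing file throws NoSuchFileException, which the existing catch block maps to a 0-byte estimate. A small sketch of that fallback (illustrative only; the file name is hypothetical):

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class LazyIndexSizeDemo {
    public static void main(String[] args) {
        // A lazily-created index file that has not been written to yet
        File txnIndexFile = new File("00000000000000000000.txnindex");
        long sizeBytes;
        try {
            sizeBytes = Files.size(txnIndexFile.toPath());
        } catch (IOException e) { // NoSuchFileException is an IOException
            sizeBytes = 0L;       // same fallback as estimatedEntrySize()
        }
        System.out.println("estimated txn index size: " + sizeBytes);
    }
}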
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java
index 0f5e55fbd14..8e089dc3cfc 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java
@@ -20,21 +20,19 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.utils.PrimitiveRef;
 import org.apache.kafka.common.utils.Utils;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.Files;
-import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.function.Supplier;
 
 /**
@@ -50,8 +48,6 @@ import java.util.function.Supplier;
  */
 public class TransactionIndex implements Closeable {
 
-    private static final Logger log = LoggerFactory.getLogger(TransactionIndex.class);
-
     private static class AbortedTxnWithPosition {
         final AbortedTxn txn;
         final int position;
@@ -63,45 +59,60 @@ public class TransactionIndex implements Closeable {
 
     private final long startOffset;
-    final TransactionIndexFile txnFile;
+    private volatile File file;
 
-    private Long lastOffset = null;
+    // note that the file is not created until we need it
+    private Optional<FileChannel> maybeChannel = Optional.empty();
+    private OptionalLong lastOffset = OptionalLong.empty();
 
     public TransactionIndex(long startOffset, File file) throws IOException {
         this.startOffset = startOffset;
-        this.txnFile = new TransactionIndexFile(file.toPath());
-    }
+        this.file = file;
 
-    public Path path() {
-        return txnFile.path();
+        if (file.exists())
+            openChannel();
     }
 
     public File file() {
-        return txnFile.path().toFile();
+        return file;
     }
 
     public void updateParentDir(File parentDir) {
-        txnFile.updateParentDir(parentDir.toPath());
+        this.file = new File(parentDir, file.getName());
     }
 
-    public void renameTo(File f) throws IOException {
-        txnFile.renameTo(f.toPath());
+    public void append(AbortedTxn abortedTxn) throws IOException {
+        lastOffset.ifPresent(offset -> {
+            if (offset >= abortedTxn.lastOffset())
+                throw new IllegalArgumentException("The last offset of appended transactions must increase sequentially, but "
+                    + abortedTxn.lastOffset() + " is not greater than current last offset " + offset + " of index "
+                    + file.getAbsolutePath());
+        });
+        lastOffset = OptionalLong.of(abortedTxn.lastOffset());
+        Utils.writeFully(channel(), abortedTxn.buffer.duplicate());
     }
 
     public void flush() throws IOException {
-        txnFile.flush();
+        FileChannel channel = channelOrNull();
+        if (channel != null)
+            channel.force(true);
    }
 
     /**
      * Remove all the entries from the index. Unlike `AbstractIndex`, this index is not resized ahead of time.
      */
     public void reset() throws IOException {
-        txnFile.truncate(0);
-        lastOffset = null;
+        FileChannel channel = channelOrNull();
+        if (channel != null)
+            channel.truncate(0);
+        lastOffset = OptionalLong.empty();
     }
 
     public void close() throws IOException {
-        txnFile.closeChannel();
+        FileChannel channel = channelOrNull();
+        if (channel != null)
+            channel.close();
+        maybeChannel = Optional.empty();
     }
 
     /**
@@ -112,32 +123,31 @@ public class TransactionIndex implements Closeable {
      * not exist
      */
     public boolean deleteIfExists() throws IOException {
-        return txnFile.deleteIfExists();
+        close();
+        return Files.deleteIfExists(file.toPath());
     }
 
-    public void append(AbortedTxn abortedTxn) throws IOException {
-        if (lastOffset != null) {
-            if (lastOffset >= abortedTxn.lastOffset())
-                throw new IllegalArgumentException("The last offset of appended transactions must increase sequentially, but "
-                    + abortedTxn.lastOffset() + " is not greater than current last offset " + lastOffset + " of index "
-                    + txnFile.path().toAbsolutePath());
+    public void renameTo(File f) throws IOException {
+        try {
+            if (file.exists())
+                Utils.atomicMoveWithFallback(file.toPath(), f.toPath(), false);
+        } finally {
+            this.file = f;
         }
-        lastOffset = abortedTxn.lastOffset();
-        txnFile.write(abortedTxn.buffer.duplicate());
     }
 
     public void truncateTo(long offset) throws IOException {
         ByteBuffer buffer = ByteBuffer.allocate(AbortedTxn.TOTAL_SIZE);
-        Long newLastOffset = null;
+        OptionalLong newLastOffset = OptionalLong.empty();
         for (AbortedTxnWithPosition txnWithPosition : iterable(() -> buffer)) {
             AbortedTxn abortedTxn = txnWithPosition.txn;
             long position = txnWithPosition.position;
             if (abortedTxn.lastOffset() >= offset) {
-                txnFile.truncate(position);
+                channel().truncate(position);
                 lastOffset = newLastOffset;
                 return;
             }
-            newLastOffset = abortedTxn.lastOffset();
+            newLastOffset = OptionalLong.of(abortedTxn.lastOffset());
         }
     }
@@ -180,7 +190,7 @@ public class TransactionIndex implements Closeable {
             AbortedTxn abortedTxn = txnWithPosition.txn;
             if (abortedTxn.lastOffset() < startOffset)
                 throw new CorruptIndexException("Last offset of aborted transaction " + abortedTxn + " in index "
-                    + txnFile.path().toAbsolutePath() + " is less than start offset " + startOffset);
+                    + file.getAbsolutePath() + " is less than start offset " + startOffset);
         }
     }
 
@@ -192,12 +202,33 @@ public class TransactionIndex implements Closeable {
         return !iterable().iterator().hasNext();
     }
 
+    private FileChannel openChannel() throws IOException {
+        FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE,
+            StandardOpenOption.READ, StandardOpenOption.WRITE);
+        maybeChannel = Optional.of(channel);
+        channel.position(channel.size());
+        return channel;
+    }
+
+    private FileChannel channel() throws IOException {
+        FileChannel channel = channelOrNull();
+        if (channel == null)
+            return openChannel();
+        else
+            return channel;
+    }
+
+    private FileChannel channelOrNull() {
+        return maybeChannel.orElse(null);
+    }
+
     private Iterable<AbortedTxnWithPosition> iterable() {
         return iterable(() -> ByteBuffer.allocate(AbortedTxn.TOTAL_SIZE));
     }
 
     private Iterable<AbortedTxnWithPosition> iterable(Supplier<ByteBuffer> allocate) {
-        if (!txnFile.exists())
+        FileChannel channel = channelOrNull();
+        if (channel == null)
             return Collections.emptyList();
 
         PrimitiveRef.IntRef position = PrimitiveRef.ofInt(0);
@@ -207,9 +238,9 @@ public class TransactionIndex implements Closeable {
             @Override
             public boolean hasNext() {
                 try {
-                    return txnFile.currentPosition() - position.value >= AbortedTxn.TOTAL_SIZE;
+                    return channel.position() - position.value >= AbortedTxn.TOTAL_SIZE;
                 } catch (IOException e) {
-                    throw new KafkaException("Failed read position from the transaction index " + txnFile.path().toAbsolutePath(), e);
+                    throw new KafkaException("Failed read position from the transaction index " + file.getAbsolutePath(), e);
                 }
             }
@@ -217,13 +248,13 @@ public class TransactionIndex implements Closeable {
             @Override
             public AbortedTxnWithPosition next() {
                 try {
                     ByteBuffer buffer = allocate.get();
-                    txnFile.readFully(buffer, position.value);
+                    Utils.readFully(channel, buffer, position.value);
                     buffer.flip();
                     AbortedTxn abortedTxn = new AbortedTxn(buffer);
                     if (abortedTxn.version() > AbortedTxn.CURRENT_VERSION)
                         throw new KafkaException("Unexpected aborted transaction version " + abortedTxn.version()
-                            + " in transaction index " + txnFile.path().toAbsolutePath() + ", current version is "
+                            + " in transaction index " + file.getAbsolutePath() + ", current version is "
                             + AbortedTxn.CURRENT_VERSION);
                     AbortedTxnWithPosition nextEntry = new AbortedTxnWithPosition(abortedTxn, position.value);
                     position.value += AbortedTxn.TOTAL_SIZE;
@@ -231,113 +262,11 @@ public class TransactionIndex implements Closeable {
                 } catch (IOException e) {
                     // We received an unexpected error reading from the index file. We propagate this as an
                     // UNKNOWN error to the consumer, which will cause it to retry the fetch.
-                    throw new KafkaException("Failed to read from the transaction index " + txnFile.path().toAbsolutePath(), e);
+                    throw new KafkaException("Failed to read from the transaction index " + file.getAbsolutePath(), e);
                 }
             }
         };
     }
-
-    // Visible for testing
-    static class TransactionIndexFile {
-        // note that the file is not created until we need it
-        private volatile Path path;
-        // channel is reopened as long as there are reads and writes
-        private FileChannel channel;
-
-        TransactionIndexFile(Path path) throws IOException {
-            this.path = path;
-
-            if (Files.exists(path))
-                openChannel();
-        }
-
-        private void openChannel() throws IOException {
-            channel = FileChannel.open(
-                path,
-                StandardOpenOption.CREATE,
-                StandardOpenOption.READ,
-                StandardOpenOption.WRITE
-            );
-            channel.position(channel.size());
-        }
-
-        synchronized void updateParentDir(Path parentDir) {
-            this.path = parentDir.resolve(path.getFileName());
-        }
-
-        synchronized void renameTo(Path other) throws IOException {
-            try {
-                if (Files.exists(path))
-                    Utils.atomicMoveWithFallback(path, other, false);
-            } finally {
-                this.path = other;
-            }
-        }
-
-        synchronized void flush() throws IOException {
-            if (channel != null)
-                channel.force(true);
-        }
-
-        synchronized void closeChannel() throws IOException {
-            if (channel != null)
-                channel.close();
-        }
-
-        synchronized boolean isChannelOpen() {
-            return channel != null && channel.isOpen();
-        }
-
-        Path path() {
-            return path;
-        }
-
-        synchronized void truncate(long position) throws IOException {
-            if (channel != null)
-                channel.truncate(position);
-        }
-
-        boolean exists() {
-            return Files.exists(path);
-        }
-
-        boolean deleteIfExists() throws IOException {
-            closeChannel();
-            return Files.deleteIfExists(path());
-        }
-
-        void write(ByteBuffer buffer) throws IOException {
-            Utils.writeFully(channel(), buffer);
-        }
-
-        void readFully(ByteBuffer buffer, int position) throws IOException {
-            Utils.readFully(channel(), buffer, position);
-        }
-
-        long currentPosition() throws IOException {
-            return channel().position();
-        }
-
-        /**
-         * Use to read or write values to the index.
-         * The file is the source of truth and if available values should be read from or written to.
-         *
-         * @return an open file channel with the position at the end of the file
-         * @throws IOException if any I/O error happens, but not if existing channel is closed.
-         *                     In that case, it is reopened.
-         */
-        private synchronized FileChannel channel() throws IOException {
-            if (channel == null) {
-                openChannel();
-            } else {
-                // as channel is exposed, it could be closed without setting it to null
-                if (!isChannelOpen()) {
-                    log.debug("Transaction index channel was closed directly and is going to be reopened");
-                    openChannel();
-                }
-            }
-            return channel;
-        }
-    }
 }
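Background on why the TransactionIndexFile wrapper could be removed: FileChannel is an interruptible channel, so interrupting a thread that is blocked in (or even just entering) a read closes the channel for every other user of the same index; the wrapper's only job was to detect that and reopen the channel. Now that expiration no longer interrupts the remote fetch task (cancel(false) above), the plain lazily-opened channel suffices. A standalone sketch of the underlying NIO behavior (illustrative only, not part of the patch):

import java.io.File;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;

public class InterruptClosesChannelDemo {
    public static void main(String[] args) throws Exception {
        File f = File.createTempFile("demo", ".txnindex");
        f.deleteOnExit();
        FileChannel channel = FileChannel.open(f.toPath(),
            StandardOpenOption.READ, StandardOpenOption.WRITE);
        channel.write(ByteBuffer.wrap(new byte[64]));

        Thread reader = new Thread(() -> {
            try {
                // A pending interrupt makes the next blocking channel
                // operation close the channel and throw.
                Thread.currentThread().interrupt();
                channel.read(ByteBuffer.allocate(16), 0);
            } catch (ClosedByInterruptException e) {
                System.out.println("channel was closed by the interrupt");
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        reader.start();
        reader.join();
        // The channel is now unusable for everyone, not just the interrupted thread.
        System.out.println("channel still open? " + channel.isOpen()); // false
    }
}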
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java
index a98254c9a00..1d19d433e05 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java
@@ -23,20 +23,15 @@ import org.junit.jupiter.api.Test;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedByInterruptException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -227,99 +222,4 @@ public class TransactionIndexTest {
         index.append(new AbortedTxn(0L, 0, 10, 2));
         assertFalse(index.isEmpty());
     }
-
-    @Test
-    void testDoNotCreateFileUntilNeeded() throws IOException {
-        // Given that index file does not exist yet
-        file.delete();
-        // When index is created, reset, or flushed
-        // Then it is not created
-        final TransactionIndex index = assertDoesNotThrow(() -> new TransactionIndex(0, file));
-        assertFalse(file.exists());
-        index.reset();
-        assertFalse(file.exists());
-        index.flush();
-        assertFalse(file.exists());
-        // only when modifying it, it gets created
-        index.append(new AbortedTxn(0L, 0, 10, 2));
-        assertTrue(file.exists());
-    }
-
-    @Test
-    void testAppendAndCollectAfterClose() throws IOException {
-        // Given the index
-        // When closed
-        index.close();
-        // Then it should still append data
-        index.append(new AbortedTxn(0L, 0, 10, 2));
-        // When channel is closed
-        index.txnFile.closeChannel();
-        // Then it should still append data
-        assertDoesNotThrow(() -> index.append(new AbortedTxn(1L, 5, 15, 16)));
-        // When closed
-        index.close();
-        // Then it should still read data
-        List<AbortedTxn> abortedTxns = assertDoesNotThrow(() ->
-            index.collectAbortedTxns(0L, 100L).abortedTransactions);
-        assertEquals(2, abortedTxns.size());
-        assertEquals(0, abortedTxns.get(0).firstOffset());
-        assertEquals(5, abortedTxns.get(1).firstOffset());
-        // When channel is closed
-        index.txnFile.closeChannel();
-        // Then it should still read data
-        abortedTxns = assertDoesNotThrow(() ->
-            index.collectAbortedTxns(0L, 100L).abortedTransactions);
-        assertEquals(2, abortedTxns.size());
-    }
-
-    @Test
-    void testAppendAndCollectAfterInterrupted() throws Exception {
-        // Given the index
-        // When closed
-        index.close();
-        // Then it should still append data
-        index.append(new AbortedTxn(0L, 0, 10, 2));
-
-        // Given a thread reading from the channel
-        final CountDownLatch ready = new CountDownLatch(1);
-        final Exception[] exceptionHolder = new Exception[1];
-
-        Thread t = new Thread(() -> {
-            try {
-                ByteBuffer buffer = ByteBuffer.allocate(100);
-
-                while (index.txnFile.isChannelOpen()) {
-                    index.txnFile.readFully(buffer, 0);
-                    buffer.clear();
-                    // wait until first reading happens to mark it as ready
-                    if (ready.getCount() > 0) ready.countDown();
-                }
-            } catch (ClosedByInterruptException e) {
-                // Expected exception
-                exceptionHolder[0] = e;
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        });
-        t.start();
-
-        assertTrue(ready.await(5, TimeUnit.SECONDS), "Timeout waiting for thread to finish");
-
-        // When thread is interrupted
-        t.interrupt();
-
-        t.join();
-
-        // Check if ClosedByInterruptException was thrown
-        assertNotNull(exceptionHolder[0], "An exception should have been thrown");
-        assertTrue(exceptionHolder[0] instanceof ClosedByInterruptException,
-            "Expected ClosedByInterruptException, but got: " + exceptionHolder[0].getClass().getName());
-
-        assertFalse(index.txnFile.isChannelOpen());
-
-        // Then it should still read data
-        List<AbortedTxn> abortedTxns = assertDoesNotThrow(() ->
-            index.collectAbortedTxns(0L, 100L).abortedTransactions);
-        assertEquals(1, abortedTxns.size());
-    }
 }