KAFKA-15931: Cancel RemoteLogReader gracefully (#19197)

Reverts commit
2723dbf3a0
and
269e8892ad.

Instead of reopening the transaction index, it cancels the
RemoteFetchTask without interrupting it--avoiding to close the
TransactionIndex channel.

This will lead to complete the execution of the remote fetch but ignoring
the results. Given that this is considered a rare case, we could live
with this. If it becomes a performance issue, it could be optimized.

Reviewers: Jun Rao <junrao@gmail.com>
This commit is contained in:
Jorge Esteban Quilcate Otoya 2025-03-20 18:20:44 +01:00 committed by GitHub
parent 804bf596a0
commit f24945b519
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 75 additions and 246 deletions

View File

@ -87,8 +87,9 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
}
override def onExpiration(): Unit = {
// cancel the remote storage read task, if it has not been executed yet
val cancelled = remoteFetchTask.cancel(true)
// cancel the remote storage read task, if it has not been executed yet and
// avoid interrupting the task if it is already running as it may force closing opened/cached resources as transaction index.
val cancelled = remoteFetchTask.cancel(false)
if (!cancelled) debug(s"Remote fetch task for RemoteStorageFetchInfo: $remoteFetchInfo could not be cancelled and its isDone value is ${remoteFetchTask.isDone}")
DelayedRemoteFetchMetrics.expiredRequestMeter.mark()

View File

@ -200,7 +200,7 @@ class DelayedRemoteFetchTest {
delayedRemoteFetch.run()
// Check that the task was cancelled and force-completed
verify(remoteFetchTask).cancel(true)
verify(remoteFetchTask).cancel(false)
assertTrue(delayedRemoteFetch.isCompleted)
// Check that the ExpiresPerSec metric was incremented

View File

@ -509,7 +509,6 @@ public class RemoteIndexCache implements Closeable {
public Entry(OffsetIndex offsetIndex, TimeIndex timeIndex, TransactionIndex txnIndex) {
this.offsetIndex = offsetIndex;
this.timeIndex = timeIndex;
// If txn index does not exist on the source, it's an empty file on the index entry
this.txnIndex = txnIndex;
this.entrySizeBytes = estimatedEntrySize();
}
@ -546,7 +545,7 @@ public class RemoteIndexCache implements Closeable {
private long estimatedEntrySize() {
entryLock.readLock().lock();
try {
return offsetIndex.sizeInBytes() + timeIndex.sizeInBytes() + Files.size(txnIndex.path());
return offsetIndex.sizeInBytes() + timeIndex.sizeInBytes() + Files.size(txnIndex.file().toPath());
} catch (IOException e) {
log.warn("Error occurred when estimating remote index cache entry bytes size, just set 0 firstly.", e);
return 0L;

View File

@ -20,21 +20,19 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.PrimitiveRef;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.function.Supplier;
/**
@ -50,8 +48,6 @@ import java.util.function.Supplier;
*/
public class TransactionIndex implements Closeable {
private static final Logger log = LoggerFactory.getLogger(TransactionIndex.class);
private static class AbortedTxnWithPosition {
final AbortedTxn txn;
final int position;
@ -63,45 +59,60 @@ public class TransactionIndex implements Closeable {
private final long startOffset;
final TransactionIndexFile txnFile;
private volatile File file;
private Long lastOffset = null;
// note that the file is not created until we need it
private Optional<FileChannel> maybeChannel = Optional.empty();
private OptionalLong lastOffset = OptionalLong.empty();
public TransactionIndex(long startOffset, File file) throws IOException {
this.startOffset = startOffset;
this.txnFile = new TransactionIndexFile(file.toPath());
}
this.file = file;
public Path path() {
return txnFile.path();
if (file.exists())
openChannel();
}
public File file() {
return txnFile.path().toFile();
return file;
}
public void updateParentDir(File parentDir) {
txnFile.updateParentDir(parentDir.toPath());
this.file = new File(parentDir, file.getName());
}
public void renameTo(File f) throws IOException {
txnFile.renameTo(f.toPath());
public void append(AbortedTxn abortedTxn) throws IOException {
lastOffset.ifPresent(offset -> {
if (offset >= abortedTxn.lastOffset())
throw new IllegalArgumentException("The last offset of appended transactions must increase sequentially, but "
+ abortedTxn.lastOffset() + " is not greater than current last offset " + offset + " of index "
+ file.getAbsolutePath());
});
lastOffset = OptionalLong.of(abortedTxn.lastOffset());
Utils.writeFully(channel(), abortedTxn.buffer.duplicate());
}
public void flush() throws IOException {
txnFile.flush();
FileChannel channel = channelOrNull();
if (channel != null)
channel.force(true);
}
/**
* Remove all the entries from the index. Unlike `AbstractIndex`, this index is not resized ahead of time.
*/
public void reset() throws IOException {
txnFile.truncate(0);
lastOffset = null;
FileChannel channel = channelOrNull();
if (channel != null)
channel.truncate(0);
lastOffset = OptionalLong.empty();
}
public void close() throws IOException {
txnFile.closeChannel();
FileChannel channel = channelOrNull();
if (channel != null)
channel.close();
maybeChannel = Optional.empty();
}
/**
@ -112,32 +123,31 @@ public class TransactionIndex implements Closeable {
* not exist
*/
public boolean deleteIfExists() throws IOException {
return txnFile.deleteIfExists();
close();
return Files.deleteIfExists(file.toPath());
}
public void append(AbortedTxn abortedTxn) throws IOException {
if (lastOffset != null) {
if (lastOffset >= abortedTxn.lastOffset())
throw new IllegalArgumentException("The last offset of appended transactions must increase sequentially, but "
+ abortedTxn.lastOffset() + " is not greater than current last offset " + lastOffset + " of index "
+ txnFile.path().toAbsolutePath());
public void renameTo(File f) throws IOException {
try {
if (file.exists())
Utils.atomicMoveWithFallback(file.toPath(), f.toPath(), false);
} finally {
this.file = f;
}
lastOffset = abortedTxn.lastOffset();
txnFile.write(abortedTxn.buffer.duplicate());
}
public void truncateTo(long offset) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(AbortedTxn.TOTAL_SIZE);
Long newLastOffset = null;
OptionalLong newLastOffset = OptionalLong.empty();
for (AbortedTxnWithPosition txnWithPosition : iterable(() -> buffer)) {
AbortedTxn abortedTxn = txnWithPosition.txn;
long position = txnWithPosition.position;
if (abortedTxn.lastOffset() >= offset) {
txnFile.truncate(position);
channel().truncate(position);
lastOffset = newLastOffset;
return;
}
newLastOffset = abortedTxn.lastOffset();
newLastOffset = OptionalLong.of(abortedTxn.lastOffset());
}
}
@ -180,7 +190,7 @@ public class TransactionIndex implements Closeable {
AbortedTxn abortedTxn = txnWithPosition.txn;
if (abortedTxn.lastOffset() < startOffset)
throw new CorruptIndexException("Last offset of aborted transaction " + abortedTxn + " in index "
+ txnFile.path().toAbsolutePath() + " is less than start offset " + startOffset);
+ file.getAbsolutePath() + " is less than start offset " + startOffset);
}
}
@ -192,12 +202,33 @@ public class TransactionIndex implements Closeable {
return !iterable().iterator().hasNext();
}
private FileChannel openChannel() throws IOException {
FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE,
StandardOpenOption.READ, StandardOpenOption.WRITE);
maybeChannel = Optional.of(channel);
channel.position(channel.size());
return channel;
}
private FileChannel channel() throws IOException {
FileChannel channel = channelOrNull();
if (channel == null)
return openChannel();
else
return channel;
}
private FileChannel channelOrNull() {
return maybeChannel.orElse(null);
}
private Iterable<AbortedTxnWithPosition> iterable() {
return iterable(() -> ByteBuffer.allocate(AbortedTxn.TOTAL_SIZE));
}
private Iterable<AbortedTxnWithPosition> iterable(Supplier<ByteBuffer> allocate) {
if (!txnFile.exists())
FileChannel channel = channelOrNull();
if (channel == null)
return Collections.emptyList();
PrimitiveRef.IntRef position = PrimitiveRef.ofInt(0);
@ -207,9 +238,9 @@ public class TransactionIndex implements Closeable {
@Override
public boolean hasNext() {
try {
return txnFile.currentPosition() - position.value >= AbortedTxn.TOTAL_SIZE;
return channel.position() - position.value >= AbortedTxn.TOTAL_SIZE;
} catch (IOException e) {
throw new KafkaException("Failed read position from the transaction index " + txnFile.path().toAbsolutePath(), e);
throw new KafkaException("Failed read position from the transaction index " + file.getAbsolutePath(), e);
}
}
@ -217,13 +248,13 @@ public class TransactionIndex implements Closeable {
public AbortedTxnWithPosition next() {
try {
ByteBuffer buffer = allocate.get();
txnFile.readFully(buffer, position.value);
Utils.readFully(channel, buffer, position.value);
buffer.flip();
AbortedTxn abortedTxn = new AbortedTxn(buffer);
if (abortedTxn.version() > AbortedTxn.CURRENT_VERSION)
throw new KafkaException("Unexpected aborted transaction version " + abortedTxn.version()
+ " in transaction index " + txnFile.path().toAbsolutePath() + ", current version is "
+ " in transaction index " + file.getAbsolutePath() + ", current version is "
+ AbortedTxn.CURRENT_VERSION);
AbortedTxnWithPosition nextEntry = new AbortedTxnWithPosition(abortedTxn, position.value);
position.value += AbortedTxn.TOTAL_SIZE;
@ -231,113 +262,11 @@ public class TransactionIndex implements Closeable {
} catch (IOException e) {
// We received an unexpected error reading from the index file. We propagate this as an
// UNKNOWN error to the consumer, which will cause it to retry the fetch.
throw new KafkaException("Failed to read from the transaction index " + txnFile.path().toAbsolutePath(), e);
throw new KafkaException("Failed to read from the transaction index " + file.getAbsolutePath(), e);
}
}
};
}
// Visible for testing
static class TransactionIndexFile {
// note that the file is not created until we need it
private volatile Path path;
// channel is reopened as long as there are reads and writes
private FileChannel channel;
TransactionIndexFile(Path path) throws IOException {
this.path = path;
if (Files.exists(path))
openChannel();
}
private void openChannel() throws IOException {
channel = FileChannel.open(
path,
StandardOpenOption.CREATE,
StandardOpenOption.READ,
StandardOpenOption.WRITE
);
channel.position(channel.size());
}
synchronized void updateParentDir(Path parentDir) {
this.path = parentDir.resolve(path.getFileName());
}
synchronized void renameTo(Path other) throws IOException {
try {
if (Files.exists(path))
Utils.atomicMoveWithFallback(path, other, false);
} finally {
this.path = other;
}
}
synchronized void flush() throws IOException {
if (channel != null)
channel.force(true);
}
synchronized void closeChannel() throws IOException {
if (channel != null)
channel.close();
}
synchronized boolean isChannelOpen() {
return channel != null && channel.isOpen();
}
Path path() {
return path;
}
synchronized void truncate(long position) throws IOException {
if (channel != null)
channel.truncate(position);
}
boolean exists() {
return Files.exists(path);
}
boolean deleteIfExists() throws IOException {
closeChannel();
return Files.deleteIfExists(path());
}
void write(ByteBuffer buffer) throws IOException {
Utils.writeFully(channel(), buffer);
}
void readFully(ByteBuffer buffer, int position) throws IOException {
Utils.readFully(channel(), buffer, position);
}
long currentPosition() throws IOException {
return channel().position();
}
/**
* Use to read or write values to the index.
* The file is the source of truth and if available values should be read from or written to.
*
* @return an open file channel with the position at the end of the file
* @throws IOException if any I/O error happens, but not if existing channel is closed.
* In that case, it is reopened.
*/
private synchronized FileChannel channel() throws IOException {
if (channel == null) {
openChannel();
} else {
// as channel is exposed, it could be closed without setting it to null
if (!isChannelOpen()) {
log.debug("Transaction index channel was closed directly and is going to be reopened");
openChannel();
}
}
return channel;
}
}
}

View File

@ -23,20 +23,15 @@ import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -227,99 +222,4 @@ public class TransactionIndexTest {
index.append(new AbortedTxn(0L, 0, 10, 2));
assertFalse(index.isEmpty());
}
@Test
void testDoNotCreateFileUntilNeeded() throws IOException {
// Given that index file does not exist yet
file.delete();
// When index is created, reset, or flushed
// Then it is not created
final TransactionIndex index = assertDoesNotThrow(() -> new TransactionIndex(0, file));
assertFalse(file.exists());
index.reset();
assertFalse(file.exists());
index.flush();
assertFalse(file.exists());
// only when modifying it, it gets created
index.append(new AbortedTxn(0L, 0, 10, 2));
assertTrue(file.exists());
}
@Test
void testAppendAndCollectAfterClose() throws IOException {
// Given the index
// When closed
index.close();
// Then it should still append data
index.append(new AbortedTxn(0L, 0, 10, 2));
// When channel is closed
index.txnFile.closeChannel();
// Then it should still append data
assertDoesNotThrow(() -> index.append(new AbortedTxn(1L, 5, 15, 16)));
// When closed
index.close();
// Then it should still read data
List<AbortedTxn> abortedTxns = assertDoesNotThrow(() ->
index.collectAbortedTxns(0L, 100L).abortedTransactions);
assertEquals(2, abortedTxns.size());
assertEquals(0, abortedTxns.get(0).firstOffset());
assertEquals(5, abortedTxns.get(1).firstOffset());
// When channel is closed
index.txnFile.closeChannel();
// Then it should still read data
abortedTxns = assertDoesNotThrow(() ->
index.collectAbortedTxns(0L, 100L).abortedTransactions);
assertEquals(2, abortedTxns.size());
}
@Test
void testAppendAndCollectAfterInterrupted() throws Exception {
// Given the index
// When closed
index.close();
// Then it should still append data
index.append(new AbortedTxn(0L, 0, 10, 2));
// Given a thread reading from the channel
final CountDownLatch ready = new CountDownLatch(1);
final Exception[] exceptionHolder = new Exception[1];
Thread t = new Thread(() -> {
try {
ByteBuffer buffer = ByteBuffer.allocate(100);
while (index.txnFile.isChannelOpen()) {
index.txnFile.readFully(buffer, 0);
buffer.clear();
// wait until first reading happens to mark it as ready
if (ready.getCount() > 0) ready.countDown();
}
} catch (ClosedByInterruptException e) {
// Expected exception
exceptionHolder[0] = e;
} catch (Exception e) {
e.printStackTrace();
}
});
t.start();
assertTrue(ready.await(5, TimeUnit.SECONDS), "Timeout waiting for thread to finish");
// When thread is interrupted
t.interrupt();
t.join();
// Check if ClosedByInterruptException was thrown
assertNotNull(exceptionHolder[0], "An exception should have been thrown");
assertTrue(exceptionHolder[0] instanceof ClosedByInterruptException,
"Expected ClosedByInterruptException, but got: " + exceptionHolder[0].getClass().getName());
assertFalse(index.txnFile.isChannelOpen());
// Then it should still read data
List<AbortedTxn> abortedTxns = assertDoesNotThrow(() ->
index.collectAbortedTxns(0L, 100L).abortedTransactions);
assertEquals(1, abortedTxns.size());
}
}