KAFKA-18200; Handle empty batches in coordinator runtime (#18144)

* Avoid attaching empty writes to empty batches.
* Handle flushes of empty batches, which would otherwise return an offset of 0.
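
An empty batch can be left behind when record serialization throws inside append() after a fresh batch has been allocated but before any record lands in it. The guard added to flushCurrentBatch() fails such a batch instead of handing it to the partition writer, where a zero-record write would report a meaningless 0 offset. In outline (a simplified sketch of the change in the diff below; try/catch, metrics, and the write path are elided):

    private void flushCurrentBatch() {
        if (currentBatch != null) {
            if (currentBatch.builder.numRecords() == 0) {
                // A failed append() is the only way to end up with an
                // allocated but empty batch; fail it to clean it up.
                failCurrentBatch(new IllegalStateException("Record batch was empty"));
                return;
            }
            // ... append records via partitionWriter and update the last written offset ...
        }
    }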

Reviewers: David Jacot <djacot@confluent.io>
Sean Quah 2024-12-17 07:39:48 +00:00 committed by GitHub
parent cd5a7ee6b2
commit 18f17ed4d3
2 changed files with 113 additions and 2 deletions

CoordinatorRuntime.java

@@ -768,6 +768,17 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements AutoCloseable {
    private void flushCurrentBatch() {
        if (currentBatch != null) {
            try {
                if (currentBatch.builder.numRecords() == 0) {
                    // The only way we can get here is if append() has failed in an unexpected
                    // way and left an empty batch. Try to clean it up.
                    log.debug("Tried to flush an empty batch for {}.", tp);
                    // There should not be any deferred events attached to the batch. We fail
                    // the batch just in case. As a side effect, coordinator state is also
                    // reverted, but there should be no changes since the batch was empty.
                    failCurrentBatch(new IllegalStateException("Record batch was empty"));
                    return;
                }

                long flushStartMs = time.milliseconds();
                // Write the records to the log and update the last written offset.
                long offset = partitionWriter.append(
@@ -926,7 +937,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements AutoCloseable {
                // If the records are empty, it was a read operation after all. In this case,
                // the response can be returned directly iff there are no pending write operations;
                // otherwise, the read needs to wait on the last write operation to be completed.
                if (currentBatch != null) {
                if (currentBatch != null && currentBatch.builder.numRecords() > 0) {
                    currentBatch.deferredEvents.add(event);
                } else {
                    if (coordinator.lastCommittedOffset() < coordinator.lastWrittenOffset()) {
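
The read path applies the same rule: a read-only operation should be parked behind the current batch only when that batch actually contains unflushed records. A simplified sketch of the decision (the deferred-completion details are elided):

    if (currentBatch != null && currentBatch.builder.numRecords() > 0) {
        // Unflushed records exist; complete the read after they are written and committed.
        currentBatch.deferredEvents.add(event);
    } else if (coordinator.lastCommittedOffset() < coordinator.lastWrittenOffset()) {
        // Records were written but not yet committed; wait on the last write.
    } else {
        // No pending writes; the response can be returned directly.
    }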

CoordinatorRuntimeTest.java

@@ -54,6 +54,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.ArgumentMatcher;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.time.Duration;
@@ -101,7 +102,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@SuppressWarnings("checkstyle:JavaNCSS")
@SuppressWarnings({"checkstyle:JavaNCSS", "checkstyle:ClassDataAbstractionCoupling"})
public class CoordinatorRuntimeTest {
    private static final TopicPartition TP = new TopicPartition("__consumer_offsets", 0);
    private static final Duration DEFAULT_WRITE_TIMEOUT = Duration.ofMillis(5);
@@ -120,6 +121,34 @@ public class CoordinatorRuntimeTest {
        }
    }
    private static class ThrowingSerializer<T> implements Serializer<T> {
        private final Serializer<T> serializer;
        private boolean throwOnNextOperation;

        public ThrowingSerializer(Serializer<T> serializer) {
            this.serializer = serializer;
            this.throwOnNextOperation = false;
        }

        public void throwOnNextOperation() {
            throwOnNextOperation = true;
        }

        @Override
        public byte[] serializeKey(T record) {
            return serializer.serializeKey(record);
        }

        @Override
        public byte[] serializeValue(T record) {
            if (throwOnNextOperation) {
                throwOnNextOperation = false;
                throw new BufferOverflowException();
            }
            return serializer.serializeValue(record);
        }
    }

    /**
     * A CoordinatorEventProcessor that directly executes the operations. This is
     * useful in unit tests where execution in threads is not required.
@@ -270,6 +299,10 @@ public class CoordinatorRuntimeTest {
            if (batch.sizeInBytes() > config(tp).maxMessageSize())
                throw new RecordTooLargeException("Batch is larger than the max message size");

            // We don't want the coordinator to write empty batches.
            if (batch.validBytes() <= 0)
                throw new KafkaException("Coordinator tried to write an empty batch");

            if (writeCount.incrementAndGet() > maxWrites)
                throw new KafkaException("Maximum number of writes reached");
@@ -4213,6 +4246,73 @@ public class CoordinatorRuntimeTest {
        assertEquals(Collections.emptyList(), writer.entries(TP));
    }
    @Test
    public void testEmptyBatch() throws Exception {
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = new MockPartitionWriter();
        ThrowingSerializer<String> serializer = new ThrowingSerializer<String>(new StringSerializer());

        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
                .withTime(timer.time())
                .withTimer(timer)
                .withDefaultWriteTimeOut(Duration.ofMillis(20))
                .withLoader(new MockCoordinatorLoader())
                .withEventProcessor(new DirectEventProcessor())
                .withPartitionWriter(writer)
                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                .withSerializer(serializer)
                .withAppendLingerMs(10)
                .withExecutorService(mock(ExecutorService.class))
                .build();

        // Schedule the loading.
        runtime.scheduleLoadOperation(TP, 10);

        // Verify the initial state.
        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
        assertNull(ctx.currentBatch);

        // Write #1, which fails.
        serializer.throwOnNextOperation();
        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
            state -> new CoordinatorResult<>(List.of("1"), "response1"));

        // Write #1 should fail and leave an empty batch.
        assertFutureThrows(write1, BufferOverflowException.class);
        assertNotNull(ctx.currentBatch);

        // Write #2, with no records.
        CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
            state -> new CoordinatorResult<>(Collections.emptyList(), "response2"));

        // Write #2 should not be attached to the empty batch.
        assertTrue(write2.isDone());
        assertEquals("response2", write2.get(5, TimeUnit.SECONDS));

        // Complete transaction #1. It will flush the current empty batch.
        // The coordinator must not try to write an empty batch, otherwise the mock partition writer
        // will throw an exception.
        CompletableFuture<Void> complete1 = runtime.scheduleTransactionCompletion(
            "complete#1",
            TP,
            100L,
            (short) 50,
            10,
            TransactionResult.COMMIT,
            DEFAULT_WRITE_TIMEOUT
        );

        // Verify that the completion is not committed yet.
        assertFalse(complete1.isDone());

        // Commit and verify that writes are completed.
        writer.commit(TP);
        assertNull(complete1.get(5, TimeUnit.SECONDS));
    }
    @Test
    public void testRecordFlushTime() throws Exception {
        MockTimer timer = new MockTimer();