Izzy Harker 2025-10-07 14:53:05 -04:00 committed by GitHub
commit 3f5ac4e588
3 changed files with 265 additions and 18 deletions

CoordinatorRuntime.java

@@ -911,6 +911,27 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements AutoCloseable
}
}
/**
* Handles the case where append() is called with no records.
*
* @param event The event that must be completed when the
* records are written.
*/
private void handleEmptyRecords(DeferredEvent event) {
// If the records are empty, it was a read operation after all. In this case,
// the response can be returned directly iff there are no pending write operations;
// otherwise, the read needs to wait on the last write operation to be completed.
if (currentBatch != null && currentBatch.builder.numRecords() > 0) {
currentBatch.deferredEvents.add(event);
} else {
if (coordinator.lastCommittedOffset() < coordinator.lastWrittenOffset()) {
deferredEventQueue.add(coordinator.lastWrittenOffset(), DeferredEventCollection.of(log, event));
} else {
event.complete(null);
}
}
}
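For intuition, a minimal caller-side sketch of the path this helper serves. It is not part of the commit and reuses the CoordinatorRuntimeTest fixtures (runtime, TP) shown later in this diff; a write operation that returns no records is effectively a read.
// Hypothetical illustration: a "write" that produces no records.
CompletableFuture<String> noOp = runtime.scheduleWriteOperation("no-op", TP, Duration.ofMillis(50),
state -> new CoordinatorResult<>(List.<String>of(), "no-op-response"));
// With no pending writes the future completes immediately; otherwise it completes only
// after the writes up to the last written offset have been flushed and committed.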
/**
* Appends records to the log and replays them to the state machine.
*
@@ -939,18 +960,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements AutoCloseable
}
if (records.isEmpty()) {
- // If the records are empty, it was a read operation after all. In this case,
- // the response can be returned directly iff there are no pending write operations;
- // otherwise, the read needs to wait on the last write operation to be completed.
- if (currentBatch != null && currentBatch.builder.numRecords() > 0) {
- currentBatch.deferredEvents.add(event);
- } else {
- if (coordinator.lastCommittedOffset() < coordinator.lastWrittenOffset()) {
- deferredEventQueue.add(coordinator.lastWrittenOffset(), DeferredEventCollection.of(log, event));
- } else {
- event.complete(null);
- }
- }
+ handleEmptyRecords(event);
} else {
// If the records are not empty, they are first applied to the state machine and
// then appended to the open batch.
@@ -990,6 +1000,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements AutoCloseable
compression.type(),
recordsToAppend
);
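// A size estimate computed with the configured compression type can undercount the space
// needed by data that does not compress well, so room in the batch is checked below
// against the uncompressed estimate, which is a safe upper bound.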
int estimatedSizeUpperBound = AbstractRecords.estimateSizeInBytes(
currentBatch.builder.magic(),
Compression.none().build().type(),
recordsToAppend
);
// Check if the current batch has enough space. We check this before
// replaying the records in order to avoid having to revert back
@@ -1000,7 +1015,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements AutoCloseable
"configured size of " + currentBatch.maxBatchSize + ".");
}
- if (!currentBatch.builder.hasRoomFor(estimatedSize)) {
+ if (!currentBatch.builder.hasRoomFor(estimatedSizeUpperBound)) {
// Otherwise, we write the current batch, allocate a new one and re-verify
// whether the records fit in it.
// If flushing fails, we don't catch the exception in order to let
@@ -1075,6 +1090,9 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements AutoCloseable
// Add the event to the list of pending events associated with the batch.
currentBatch.deferredEvents.add(event);
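// If an atomic write has left no room at all in the batch, flush it immediately so the
// next write starts from an empty batch; otherwise fall through to the usual linger check.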
if (isAtomic && !currentBatch.builder.hasRoomFor(0)) {
flushCurrentBatch();
} else {
// Write the current batch if it is transactional or if the linger timeout
// has expired.
// If flushing fails, we don't catch the exception in order to let
@@ -1082,6 +1100,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements AutoCloseable
maybeFlushCurrentBatch(currentTimeMs);
}
}
}
/**
* Completes a transaction.

CoordinatorRuntimeTest.java

@@ -18,6 +18,7 @@ package org.apache.kafka.coordinator.common.runtime;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
@@ -51,6 +52,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -4292,6 +4294,185 @@ public class CoordinatorRuntimeTest {
assertEquals("response3", write3.get(5, TimeUnit.SECONDS));
}
@Test
public void testLargeCompressibleRecordTriggersFlushAndSucceeds() throws Exception {
MockTimer timer = new MockTimer();
// Writer sleeps for 10ms before appending records.
MockPartitionWriter writer = new MockPartitionWriter(timer.time(), Integer.MAX_VALUE, false);
CoordinatorRuntimeMetrics runtimeMetrics = mock(CoordinatorRuntimeMetrics.class);
Compression compression = Compression.gzip().build();
CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time())
.withTimer(timer)
.withDefaultWriteTimeOut(Duration.ofMillis(20))
.withLoader(new MockCoordinatorLoader())
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(runtimeMetrics)
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withCompression(compression)
.withSerializer(new StringSerializer())
.withAppendLingerMs(10)
.withExecutorService(mock(ExecutorService.class))
.build();
// Schedule the loading.
runtime.scheduleLoadOperation(TP, 10);
// Verify the initial state.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertNull(ctx.currentBatch);
// Get the max batch size.
int maxBatchSize = writer.config(TP).maxMessageSize();
// Create 2 records with a quarter of the max batch size each.
List<String> records = Stream.of('1', '2').map(c -> {
char[] payload = new char[maxBatchSize / 4];
Arrays.fill(payload, c);
return new String(payload);
}).collect(Collectors.toList());
// Write #1 with the small records; the batch will be about half full.
long firstBatchTimestamp = timer.time().milliseconds();
CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(50),
state -> new CoordinatorResult<>(records, "response1")
);
// A batch has been created.
assertNotNull(ctx.currentBatch);
// Verify the state - batch is not yet flushed
assertEquals(List.of(), writer.entries(TP));
// Create a large record of highly compressible data
List<String> largeRecord = List.of("a".repeat(3 * maxBatchSize));
// Write #2 with the large record. This record is too large to go into the previous batch
// uncompressed but will fit in the new buffer once compressed, so we should flush the
// previous batch and successfully allocate a new batch for this record. The new batch
// will also trigger an immediate flush.
long secondBatchTimestamp = timer.time().milliseconds();
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(50),
state -> new CoordinatorResult<>(largeRecord, "response2")
);
// Verify the state.
assertEquals(3L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(List.of(
new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
new MockCoordinatorShard.RecordAndMetadata(2, largeRecord.get(0))
), ctx.coordinator.coordinator().fullRecords());
assertEquals(List.of(
records(firstBatchTimestamp, compression, records),
records(secondBatchTimestamp, compression, largeRecord)
), writer.entries(TP));
// Commit and verify that writes are completed.
writer.commit(TP);
assertTrue(write1.isDone());
assertTrue(write2.isDone());
assertEquals(3L, ctx.coordinator.lastCommittedOffset());
assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
assertEquals("response2", write2.get(5, TimeUnit.SECONDS));
}
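As a standalone illustration (plain JDK, not part of this commit) of why the highly repetitive record above fits once gzip-compressed: a long run of a single character shrinks to a few kilobytes, far below any realistic max batch size. The 3 MiB payload is an arbitrary stand-in for 3 * maxBatchSize.
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class CompressibleRecordDemo {
    public static void main(String[] args) throws Exception {
        // Stand-in for the "a".repeat(3 * maxBatchSize) record used in the test above.
        byte[] repetitive = "a".repeat(3 * 1024 * 1024).getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(repetitive);
        }
        // Prints something on the order of raw=3145728 bytes, gzipped=a few KB.
        System.out.printf("raw=%d bytes, gzipped=%d bytes%n", repetitive.length, out.size());
    }
}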
@Test
public void testLargeUncompressibleRecordTriggersFlushAndFails() throws Exception {
MockTimer timer = new MockTimer();
// Writer sleeps for 10ms before appending records.
MockPartitionWriter writer = new MockPartitionWriter(timer.time(), Integer.MAX_VALUE, false);
CoordinatorRuntimeMetrics runtimeMetrics = mock(CoordinatorRuntimeMetrics.class);
Compression compression = Compression.gzip().build();
CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time())
.withTimer(timer)
.withDefaultWriteTimeOut(Duration.ofMillis(20))
.withLoader(new MockCoordinatorLoader())
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(runtimeMetrics)
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withCompression(compression)
.withSerializer(new StringSerializer())
.withAppendLingerMs(10)
.withExecutorService(mock(ExecutorService.class))
.build();
// Schedule the loading.
runtime.scheduleLoadOperation(TP, 10);
// Verify the initial state.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertNull(ctx.currentBatch);
// Get the max batch size.
int maxBatchSize = writer.config(TP).maxMessageSize();
// Create 2 records with a quarter of the max batch size each.
List<String> records = Stream.of('1', '2').map(c -> {
char[] payload = new char[maxBatchSize / 4];
Arrays.fill(payload, c);
return new String(payload);
}).collect(Collectors.toList());
// Write #1 with the small records; the batch will be about half full.
long firstBatchTimestamp = timer.time().milliseconds();
CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(50),
state -> new CoordinatorResult<>(records, "response1")
);
// A batch has been created.
assertNotNull(ctx.currentBatch);
// Verify the state - batch is not yet flushed
assertEquals(List.of(), writer.entries(TP));
// Create a large record of data that does not compress well.
char[] payload = new char[3 * maxBatchSize];
Random random = new Random();
for (int i = 0; i < payload.length; i++) {
payload[i] = (char) ('a' + random.nextInt(26));
}
List<String> largeRecord = List.of(new String(payload));
// Write #2 with the large record. This record is too large to fit in the previous batch,
// so the previous batch is flushed. The data barely compresses, so the record is also too
// large for a new batch and the write fails with RecordTooLargeException.
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(50),
state -> new CoordinatorResult<>(largeRecord, "response2")
);
// Verify the state. The first batch was flushed and the largeRecord
// write failed.
assertEquals(2L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(List.of(
new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
new MockCoordinatorShard.RecordAndMetadata(1, records.get(1))
), ctx.coordinator.coordinator().fullRecords());
assertEquals(List.of(
records(firstBatchTimestamp, compression, records)
), writer.entries(TP));
// Commit and verify that writes are completed.
writer.commit(TP);
assertTrue(write1.isDone());
assertTrue(write2.isDone());
assertEquals(2L, ctx.coordinator.lastCommittedOffset());
assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
}
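The converse, for intuition (again plain JDK, not part of the commit): uniform random lowercase letters carry about 4.7 bits of entropy per character, so gzip can at best bring the payload down to roughly 60% of its size, and a record three times the max batch size still cannot fit after compression. As before, 3 MiB stands in for 3 * maxBatchSize.
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.zip.GZIPOutputStream;

public class IncompressibleRecordDemo {
    public static void main(String[] args) throws Exception {
        // Random lowercase letters, mirroring the test payload above.
        char[] payload = new char[3 * 1024 * 1024];
        Random random = new Random();
        for (int i = 0; i < payload.length; i++) {
            payload[i] = (char) ('a' + random.nextInt(26));
        }
        byte[] raw = new String(payload).getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(raw);
        }
        // The compressed size stays well above one third of the raw size, so the write must fail.
        System.out.printf("raw=%d bytes, gzipped=%d bytes%n", raw.length, out.size());
    }
}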
@Test
public void testRecordEventPurgatoryTime() throws Exception {
Duration writeTimeout = Duration.ofMillis(1000);

TestUtil.java

@@ -49,6 +49,14 @@ public class TestUtil {
return records(timestamp, Arrays.stream(records).toList());
}
public static MemoryRecords records(
long timestamp,
Compression compression,
String... records
) {
return records(timestamp, compression, Arrays.stream(records).toList());
}
public static MemoryRecords records(
long timestamp,
List<String> records
@@ -87,6 +95,45 @@ return builder.build();
return builder.build();
}
public static MemoryRecords records(
long timestamp,
Compression compression,
List<String> records
) {
if (records.isEmpty())
return MemoryRecords.EMPTY;
List<SimpleRecord> simpleRecords = records.stream().map(record ->
new SimpleRecord(timestamp, record.getBytes(Charset.defaultCharset()))
).toList();
int sizeEstimate = AbstractRecords.estimateSizeInBytes(
RecordVersion.current().value,
compression.type(),
simpleRecords
);
ByteBuffer buffer = ByteBuffer.allocate(sizeEstimate);
MemoryRecordsBuilder builder = MemoryRecords.builder(
buffer,
RecordVersion.current().value,
compression,
TimestampType.CREATE_TIME,
0L,
timestamp,
RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH,
0,
false,
RecordBatch.NO_PARTITION_LEADER_EPOCH
);
simpleRecords.forEach(builder::append);
return builder.build();
}
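For reference, a brief usage sketch of the new overload (timestamp and payloads are arbitrary), mirroring how the tests above build their expected entries:
// Build a gzip-compressed MemoryRecords batch from two string payloads.
MemoryRecords expected = TestUtil.records(0L, Compression.gzip().build(), List.of("first", "second"));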
public static MemoryRecords transactionalRecords(
long producerId,
short producerEpoch,