KAFKA-19427 Allow the coordinator to grow its buffer dynamically (#20040)
CI / build (push) Waiting to run Details

* Coordinator starts with a smaller buffer, which can grow as needed.

* In freeCurrentBatch, release the appropriate buffer:
  * The Coordinator recycles the expanded buffer
(`currentBatch.builder.buffer()`), not `currentBatch.buffer`, because
`MemoryRecordsBuilder` may allocate a new `ByteBuffer` if the existing one
isn't large enough.

  * There are two cases in which the buffer may exceed `maxMessageSize`:
    1. If there's a single record whose size exceeds `maxMessageSize` (which,
       so far, is derived from `max.message.bytes`) and the write is in
       `non-atomic` mode, it's still possible for the buffer to grow beyond
       `maxMessageSize`. In this case, the Coordinator should revert to using a
       smaller buffer afterward.
    2. The Coordinator does not recycle a buffer that is larger than
       `maxMessageSize`. If the user dynamically reduces `maxMessageSize` to a
       value even smaller than `INITIAL_BUFFER_SIZE`, the Coordinator should
       avoid recycling any buffer larger than `maxMessageSize`, so that the
       Coordinator can allocate the smaller buffer in the next round.

* Add tests to verify the above scenarios.

Reviewers: David Jacot <djacot@confluent.io>, Sean Quah
<squah@confluent.io>, Ken Huang <s7133700@gmail.com>, PoAn Yang
<payang@apache.org>, TaiJuWu <tjwu1217@gmail.com>, Jhen-Yung Hsu
<jhenyunghsu@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Ming-Yen Chung 2025-07-16 22:06:33 +08:00 committed by Chia-Ping Tsai
parent 98cb8df7a5
commit 05f012c7f1
2 changed files with 222 additions and 10 deletions

View File

@ -69,6 +69,7 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static java.lang.Math.min;
import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.CoordinatorWriteEvent.NOT_QUEUED;
/**
@ -758,8 +759,14 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
// Cancel the linger timeout.
currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
// Release the buffer.
bufferSupplier.release(currentBatch.buffer);
// Release the buffer only if it is not larger than the maxBatchSize.
int maxBatchSize = partitionWriter.config(tp).maxMessageSize();
if (currentBatch.builder.buffer().capacity() <= maxBatchSize) {
bufferSupplier.release(currentBatch.builder.buffer());
} else if (currentBatch.buffer.capacity() <= maxBatchSize) {
bufferSupplier.release(currentBatch.buffer);
}
currentBatch = null;
}
@ -859,7 +866,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
LogConfig logConfig = partitionWriter.config(tp);
int maxBatchSize = logConfig.maxMessageSize();
long prevLastWrittenOffset = coordinator.lastWrittenOffset();
ByteBuffer buffer = bufferSupplier.get(maxBatchSize);
ByteBuffer buffer = bufferSupplier.get(min(INITIAL_BUFFER_SIZE, maxBatchSize));
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
buffer,
@ -1909,9 +1916,9 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
}
/**
* 16KB. Used for initial buffer size for write operations.
* 512KB. Used for initial buffer size for write operations.
*/
static final int MIN_BUFFER_SIZE = 16384;
static final int INITIAL_BUFFER_SIZE = 512 * 1024;
/**
* The log prefix.

View File

@ -18,6 +18,7 @@ package org.apache.kafka.coordinator.common.runtime;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.RecordTooLargeException;
@ -44,6 +45,7 @@ import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.ArgumentMatcher;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@ -66,7 +68,7 @@ import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.Coo
import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.CoordinatorState.INITIAL;
import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.CoordinatorState.LOADING;
import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.HighWatermarkListener.NO_OFFSET;
import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.MIN_BUFFER_SIZE;
import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.INITIAL_BUFFER_SIZE;
import static org.apache.kafka.coordinator.common.runtime.TestUtil.endTransactionMarker;
import static org.apache.kafka.coordinator.common.runtime.TestUtil.records;
import static org.apache.kafka.coordinator.common.runtime.TestUtil.transactionalRecords;
@ -2919,11 +2921,11 @@ public class CoordinatorRuntimeTest {
assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
int maxBatchSize = writer.config(TP).maxMessageSize();
assertTrue(maxBatchSize > MIN_BUFFER_SIZE);
assertTrue(maxBatchSize > INITIAL_BUFFER_SIZE);
// Generate enough records to create a batch that has 16KB < batchSize < maxBatchSize
// Generate enough records to create a batch that has INITIAL_BUFFER_SIZE < batchSize < maxBatchSize
List<String> records = new ArrayList<>();
for (int i = 0; i < 3000; i++) {
for (int i = 0; i < 50000; i++) {
records.add("record-" + i);
}
@ -2937,7 +2939,210 @@ public class CoordinatorRuntimeTest {
assertFalse(write1.isCompletedExceptionally());
int batchSize = writer.entries(TP).get(0).sizeInBytes();
assertTrue(batchSize > MIN_BUFFER_SIZE && batchSize < maxBatchSize);
assertTrue(batchSize > INITIAL_BUFFER_SIZE && batchSize < maxBatchSize);
}
/**
 * Checks that a buffer which had to grow past the configured max message size is not
 * kept by the context's buffer supplier once the write has been processed.
 *
 * The partition writer reports a 1MB max message size, and the single record written is
 * a string of 100 * 1024 * 1024 characters. After the write, the next buffer obtained from
 * the supplier is expected to have exactly {@code INITIAL_BUFFER_SIZE} capacity.
 */
@Test
public void testCoordinatorDoNotRetainBufferLargeThanMaxMessageSize() {
    MockTimer mockTimer = new MockTimer();

    // Partition writer whose log config always advertises a 1MB max message size.
    InMemoryPartitionWriter partitionWriter = new InMemoryPartitionWriter(false) {
        @Override
        public LogConfig config(TopicPartition tp) {
            String oneMegabyte = String.valueOf(1024 * 1024);
            return new LogConfig(Map.of(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, oneMegabyte));
        }
    };

    CoordinatorRuntime.Builder<MockCoordinatorShard, String> runtimeBuilder =
        new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
            .withTime(mockTimer.time())
            .withTimer(mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(partitionWriter)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withExecutorService(mock(ExecutorService.class));
    CoordinatorRuntime<MockCoordinatorShard, String> coordinatorRuntime = runtimeBuilder.build();

    // Load the coordinator for the partition.
    coordinatorRuntime.scheduleLoadOperation(TP, 10);

    // The freshly loaded coordinator starts at offset zero with a single snapshot.
    CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext context =
        coordinatorRuntime.contextOrThrow(TP);
    assertEquals(0L, context.coordinator.lastWrittenOffset());
    assertEquals(0L, context.coordinator.lastCommittedOffset());
    assertEquals(List.of(0L), context.coordinator.snapshotRegistry().epochsList());

    // One record that is far larger than the 1MB max message size.
    String oversizedPayload = "A".repeat(100 * 1024 * 1024);
    List<String> oversizedRecords = List.of(oversizedPayload);

    // Write #1.
    // NOTE(review): the trailing `true, false` arguments presumably mean "replay records" and
    // "non-atomic" respectively -- confirm against the CoordinatorResult constructor.
    CompletableFuture<String> firstWrite = coordinatorRuntime.scheduleWriteOperation(
        "write#1",
        TP,
        DEFAULT_WRITE_TIMEOUT,
        state -> new CoordinatorResult<>(oversizedRecords, "response1", null, true, false)
    );

    // The write must not have failed (this would also surface a RecordTooLargeException).
    assertFalse(firstWrite.isCompletedExceptionally());

    // The supplier must hand out the small initial buffer rather than the grown one.
    ByteBuffer nextBuffer = context.bufferSupplier.get(1);
    assertEquals(INITIAL_BUFFER_SIZE, nextBuffer.capacity());
}
/**
 * Checks that a buffer which grew beyond {@code INITIAL_BUFFER_SIZE} but stayed within the
 * configured max message size is still available from the context's buffer supplier after
 * the write.
 *
 * The partition writer reports a 1GB max message size and the write carries one million
 * small string records.
 */
@Test
public void testCoordinatorRetainExpandedBufferLessOrEqualToMaxMessageSize() {
    MockTimer mockTimer = new MockTimer();

    // Partition writer whose log config always advertises a 1GB max message size.
    InMemoryPartitionWriter partitionWriter = new InMemoryPartitionWriter(false) {
        @Override
        public LogConfig config(TopicPartition tp) {
            String oneGigabyte = String.valueOf(1024 * 1024 * 1024);
            return new LogConfig(Map.of(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, oneGigabyte));
        }
    };

    CoordinatorRuntime.Builder<MockCoordinatorShard, String> runtimeBuilder =
        new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
            .withTime(mockTimer.time())
            .withTimer(mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(partitionWriter)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withExecutorService(mock(ExecutorService.class));
    CoordinatorRuntime<MockCoordinatorShard, String> coordinatorRuntime = runtimeBuilder.build();

    // Load the coordinator for the partition.
    coordinatorRuntime.scheduleLoadOperation(TP, 10);

    // The freshly loaded coordinator starts at offset zero with a single snapshot.
    CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext context =
        coordinatorRuntime.contextOrThrow(TP);
    assertEquals(0L, context.coordinator.lastWrittenOffset());
    assertEquals(0L, context.coordinator.lastCommittedOffset());
    assertEquals(List.of(0L), context.coordinator.snapshotRegistry().epochsList());

    // Build enough records for a batch with INITIAL_BUFFER_SIZE < batchSize <= maxBatchSize.
    final int recordCount = 1_000_000;
    List<String> manyRecords = new ArrayList<>();
    for (int index = 0; index < recordCount; index++) {
        manyRecords.add("record-" + index);
    }

    // Write #1.
    CompletableFuture<String> firstWrite = coordinatorRuntime.scheduleWriteOperation(
        "write#1",
        TP,
        DEFAULT_WRITE_TIMEOUT,
        state -> new CoordinatorResult<>(manyRecords, "response1")
    );

    // The write must not have failed (this would also surface a RecordTooLargeException).
    assertFalse(firstWrite.isCompletedExceptionally());

    // The written batch outgrew the initial buffer but respects the max message size.
    int writtenBatchSize = partitionWriter.entries(TP).get(0).sizeInBytes();
    int configuredMaxSize = partitionWriter.config(TP).maxMessageSize();
    assertTrue(writtenBatchSize > INITIAL_BUFFER_SIZE);
    assertTrue(writtenBatchSize <= configuredMaxSize);

    // The supplier must hand out the expanded buffer, not a fresh initial-sized one.
    ByteBuffer nextBuffer = context.bufferSupplier.get(1);
    assertTrue(nextBuffer.capacity() > INITIAL_BUFFER_SIZE);
}
/**
 * Checks how the cached buffer behaves when the max message size is lowered below
 * {@code INITIAL_BUFFER_SIZE} between writes.
 *
 * Sequence exercised:
 * 1. With a 1MB max message size, a small write leaves an {@code INITIAL_BUFFER_SIZE}
 *    buffer in the supplier.
 * 2. The max message size is reduced to {@code INITIAL_BUFFER_SIZE - 66}; after the next
 *    write the supplier holds no cached buffer (a request for 1 byte yields capacity 1).
 * 3. After one more write the supplier holds a buffer whose capacity equals the reduced
 *    max message size.
 */
@Test
public void testBufferShrinkWhenMaxMessageSizeReducedBelowInitialBufferSize() {
    MockTimer mockTimer = new MockTimer();

    // `var` is required so the extra updateConfig method of the anonymous class stays callable.
    var partitionWriter = new InMemoryPartitionWriter(false) {
        // Starts with a 1MB max message size; replaced later through updateConfig.
        private LogConfig config = new LogConfig(Map.of(
            TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024)
        ));

        @Override
        public LogConfig config(TopicPartition tp) {
            return config;
        }

        public void updateConfig(LogConfig newConfig) {
            this.config = newConfig;
        }
    };

    CoordinatorRuntime.Builder<MockCoordinatorShard, String> runtimeBuilder =
        new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
            .withTime(mockTimer.time())
            .withTimer(mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(partitionWriter)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
            .withSerializer(new StringSerializer())
            .withExecutorService(mock(ExecutorService.class));
    CoordinatorRuntime<MockCoordinatorShard, String> coordinatorRuntime = runtimeBuilder.build();

    // Load the coordinator for the partition.
    coordinatorRuntime.scheduleLoadOperation(TP, 10);

    // The freshly loaded coordinator starts at offset zero with a single snapshot.
    CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext context =
        coordinatorRuntime.contextOrThrow(TP);
    assertEquals(0L, context.coordinator.lastWrittenOffset());
    assertEquals(0L, context.coordinator.lastCommittedOffset());
    assertEquals(List.of(0L), context.coordinator.snapshotRegistry().epochsList());

    // A small payload, reused by all three writes.
    final int recordCount = 1000;
    List<String> smallRecords = new ArrayList<>();
    for (int index = 0; index < recordCount; index++) {
        smallRecords.add("record-" + index);
    }

    // Write #1.
    CompletableFuture<String> firstWrite = coordinatorRuntime.scheduleWriteOperation(
        "write#1",
        TP,
        DEFAULT_WRITE_TIMEOUT,
        state -> new CoordinatorResult<>(smallRecords, "response1")
    );

    // The write must not have failed (this would also surface a RecordTooLargeException).
    assertFalse(firstWrite.isCompletedExceptionally());

    // The batch fits in the initial buffer, which itself fits in the max message size.
    int writtenBatchSize = partitionWriter.entries(TP).get(0).sizeInBytes();
    int configuredMaxSize = partitionWriter.config(TP).maxMessageSize();
    assertTrue(writtenBatchSize <= INITIAL_BUFFER_SIZE);
    assertTrue(INITIAL_BUFFER_SIZE <= configuredMaxSize);

    ByteBuffer retainedBuffer = context.bufferSupplier.get(1);
    assertEquals(INITIAL_BUFFER_SIZE, retainedBuffer.capacity());
    // Taking the buffer emptied the supplier's cache; hand it back so write #2 sees it again.
    context.bufferSupplier.release(retainedBuffer);

    // Lower the max message size to just under the initial buffer size.
    int reducedMaxSize = INITIAL_BUFFER_SIZE - 66;
    LogConfig reducedConfig = new LogConfig(
        Map.of(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(reducedMaxSize)));
    partitionWriter.updateConfig(reducedConfig);
    assertEquals(reducedMaxSize, partitionWriter.config(TP).maxMessageSize());

    // Write #2.
    CompletableFuture<String> secondWrite = coordinatorRuntime.scheduleWriteOperation(
        "write#2",
        TP,
        DEFAULT_WRITE_TIMEOUT,
        state -> new CoordinatorResult<>(smallRecords, "response2")
    );
    assertFalse(secondWrite.isCompletedExceptionally());

    // Nothing is cached now: the previously cached buffer exceeds the new max message size,
    // so asking for one byte yields a buffer of capacity one.
    assertEquals(1, context.bufferSupplier.get(1).capacity());

    // Write #3.
    CompletableFuture<String> thirdWrite = coordinatorRuntime.scheduleWriteOperation(
        "write#3",
        TP,
        DEFAULT_WRITE_TIMEOUT,
        state -> new CoordinatorResult<>(smallRecords, "response3")
    );
    assertFalse(thirdWrite.isCompletedExceptionally());

    // The cached buffer now matches the reduced max message size (below INITIAL_BUFFER_SIZE).
    assertEquals(partitionWriter.config(TP).maxMessageSize(), context.bufferSupplier.get(1).capacity());
}
@Test