diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index 1e9724a57aa..e1e80476cf8 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -70,6 +70,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import static java.lang.Math.min;
 import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.CoordinatorWriteEvent.NOT_QUEUED;
 
 /**
@@ -758,8 +759,14 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements AutoCloseable {
             // Cancel the linger timeout.
             currentBatch.lingerTimeoutTask.ifPresent(TimerTask::cancel);
 
-            // Release the buffer.
-            bufferSupplier.release(currentBatch.buffer);
+            // Release the buffer only if it is not larger than the maxBatchSize.
+            int maxBatchSize = partitionWriter.config(tp).maxMessageSize();
+
+            if (currentBatch.builder.buffer().capacity() <= maxBatchSize) {
+                bufferSupplier.release(currentBatch.builder.buffer());
+            } else if (currentBatch.buffer.capacity() <= maxBatchSize) {
+                bufferSupplier.release(currentBatch.buffer);
+            }
 
             currentBatch = null;
         }
@@ -859,7 +866,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements AutoCloseable {
             LogConfig logConfig = partitionWriter.config(tp);
             int maxBatchSize = logConfig.maxMessageSize();
             long prevLastWrittenOffset = coordinator.lastWrittenOffset();
-            ByteBuffer buffer = bufferSupplier.get(maxBatchSize);
+            ByteBuffer buffer = bufferSupplier.get(min(INITIAL_BUFFER_SIZE, maxBatchSize));
 
             MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
                 buffer,
@@ -1888,9 +1895,9 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements AutoCloseable {
     }
 
     /**
-     * 16KB. Used for initial buffer size for write operations.
+     * 512KB. Used for initial buffer size for write operations.
      */
-    static final int MIN_BUFFER_SIZE = 16384;
+    static final int INITIAL_BUFFER_SIZE = 512 * 1024;
 
     /**
      * The log prefix.
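Taken together, the runtime change above does two things: the batch-allocation path now asks the BufferSupplier for min(INITIAL_BUFFER_SIZE, maxBatchSize) instead of eagerly allocating maxBatchSize up front, and freeCurrentBatch() returns a buffer to the pool only while its capacity still fits within the partition's max.message.bytes, so a buffer that grew to absorb an oversized write is left to the GC rather than pinned in the cache. A minimal standalone sketch of that allocate/recycle policy, assuming Kafka's org.apache.kafka.common.utils.BufferSupplier; the class and method names below are illustrative, not the runtime's API:

import java.nio.ByteBuffer;

import org.apache.kafka.common.utils.BufferSupplier;

// Hypothetical sketch of the allocate/recycle policy introduced above.
public final class WriteBufferPolicySketch {
    // Mirrors CoordinatorRuntime.INITIAL_BUFFER_SIZE (512KB).
    static final int INITIAL_BUFFER_SIZE = 512 * 1024;

    private final BufferSupplier bufferSupplier = new BufferSupplier.GrowableBufferSupplier();

    // Allocation: start small; the records builder can still grow the
    // buffer beyond this if a batch needs more room.
    public ByteBuffer allocate(int maxBatchSize) {
        return bufferSupplier.get(Math.min(INITIAL_BUFFER_SIZE, maxBatchSize));
    }

    // Recycling: only cache buffers that still fit under maxBatchSize, so a
    // buffer that ballooned for one oversized write is dropped instead of
    // being retained in the pool forever.
    public void recycle(ByteBuffer buffer, int maxBatchSize) {
        if (buffer.capacity() <= maxBatchSize) {
            bufferSupplier.release(buffer);
        }
    }
}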
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index 9e4e6f7bb9b..9198e207e4b 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.coordinator.common.runtime;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.errors.NotCoordinatorException;
 import org.apache.kafka.common.errors.NotEnoughReplicasException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
@@ -65,6 +66,7 @@ import java.util.Comparator;
 import java.util.Deque;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.OptionalInt;
 import java.util.Set;
@@ -84,7 +86,7 @@ import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.Coo
 import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.CoordinatorState.INITIAL;
 import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.CoordinatorState.LOADING;
 import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.HighWatermarkListener.NO_OFFSET;
-import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.MIN_BUFFER_SIZE;
+import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime.INITIAL_BUFFER_SIZE;
 import static org.apache.kafka.test.TestUtils.assertFutureThrows;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -3486,11 +3488,11 @@ public class CoordinatorRuntimeTest {
         assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
 
         int maxBatchSize = writer.config(TP).maxMessageSize();
-        assertTrue(maxBatchSize > MIN_BUFFER_SIZE);
+        assertTrue(maxBatchSize > INITIAL_BUFFER_SIZE);
 
-        // Generate enough records to create a batch that has 16KB < batchSize < maxBatchSize
+        // Generate enough records to create a batch that has INITIAL_BUFFER_SIZE < batchSize < maxBatchSize
         List<String> records = new ArrayList<>();
-        for (int i = 0; i < 3000; i++) {
+        for (int i = 0; i < 50000; i++) {
             records.add("record-" + i);
         }
 
@@ -3504,7 +3506,210 @@ public class CoordinatorRuntimeTest {
         assertFalse(write1.isCompletedExceptionally());
 
         int batchSize = writer.entries(TP).get(0).sizeInBytes();
-        assertTrue(batchSize > MIN_BUFFER_SIZE && batchSize < maxBatchSize);
+        assertTrue(batchSize > INITIAL_BUFFER_SIZE && batchSize < maxBatchSize);
+    }
+
+    @Test
+    public void testCoordinatorDoesNotRetainBufferLargerThanMaxMessageSize() {
+        MockTimer timer = new MockTimer();
+        InMemoryPartitionWriter mockWriter = new InMemoryPartitionWriter(false) {
+            @Override
+            public LogConfig config(TopicPartition tp) {
+                return new LogConfig(Map.of(
+                    TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024) // 1MB
+                ));
+            }
+        };
+        StringSerializer serializer = new StringSerializer();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(mockWriter)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(serializer)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+        // Generate a record larger than the maxBatchSize.
+        List<String> largeRecords = List.of("A".repeat(100 * 1024 * 1024));
+
+        // Write #1.
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(largeRecords, "response1", null, true, false)
+        );
+
+        // Verify that the write has not completed exceptionally.
+        // This will catch any exceptions thrown including RecordTooLargeException.
+        assertFalse(write1.isCompletedExceptionally());
+
+        // Verify that the next buffer retrieved from the bufferSupplier is the initial small one, not the large buffer.
+        assertEquals(INITIAL_BUFFER_SIZE, ctx.bufferSupplier.get(1).capacity());
+    }
+
+    @Test
+    public void testCoordinatorRetainsExpandedBufferLessOrEqualToMaxMessageSize() {
+        MockTimer timer = new MockTimer();
+        InMemoryPartitionWriter mockWriter = new InMemoryPartitionWriter(false) {
+            @Override
+            public LogConfig config(TopicPartition tp) {
+                return new LogConfig(Map.of(
+                    TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024 * 1024) // 1GB
+                ));
+            }
+        };
+        StringSerializer serializer = new StringSerializer();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(mockWriter)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(serializer)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+        // Generate enough records to create a batch that has INITIAL_BUFFER_SIZE < batchSize < maxBatchSize
+        List<String> records = new ArrayList<>();
+        for (int i = 0; i < 1000000; i++) {
+            records.add("record-" + i);
+        }
+
+        // Write #1.
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(records, "response1")
+        );
+
+        // Verify that the write has not completed exceptionally.
+        // This will catch any exceptions thrown including RecordTooLargeException.
+        assertFalse(write1.isCompletedExceptionally());
+
+        int batchSize = mockWriter.entries(TP).get(0).sizeInBytes();
+        int maxBatchSize = mockWriter.config(TP).maxMessageSize();
+        assertTrue(INITIAL_BUFFER_SIZE < batchSize && batchSize <= maxBatchSize);
+
+        // Verify that the next buffer retrieved from the bufferSupplier is the expanded buffer.
+        assertTrue(ctx.bufferSupplier.get(1).capacity() > INITIAL_BUFFER_SIZE);
+    }
+
+    @Test
+    public void testBufferShrinkWhenMaxMessageSizeReducedBelowInitialBufferSize() {
+        MockTimer timer = new MockTimer();
+        var mockWriter = new InMemoryPartitionWriter(false) {
+            private LogConfig config = new LogConfig(Map.of(
+                TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024) // 1MB
+            ));
+
+            @Override
+            public LogConfig config(TopicPartition tp) {
+                return config;
+            }
+
+            public void updateConfig(LogConfig newConfig) {
+                this.config = newConfig;
+            }
+        };
+        StringSerializer serializer = new StringSerializer();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(mockWriter)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(serializer)
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+        List<String> records = new ArrayList<>();
+        for (int i = 0; i < 1000; i++) {
+            records.add("record-" + i);
+        }
+
+        // Write #1.
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(records, "response1")
+        );
+
+        // Verify that the write has not completed exceptionally.
+        // This will catch any exceptions thrown including RecordTooLargeException.
+        assertFalse(write1.isCompletedExceptionally());
+
+        int batchSize = mockWriter.entries(TP).get(0).sizeInBytes();
+        int maxBatchSize = mockWriter.config(TP).maxMessageSize();
+        assertTrue(batchSize <= INITIAL_BUFFER_SIZE && INITIAL_BUFFER_SIZE <= maxBatchSize);
+
+        ByteBuffer cachedBuffer = ctx.bufferSupplier.get(1);
+        assertEquals(INITIAL_BUFFER_SIZE, cachedBuffer.capacity());
+        // ctx.bufferSupplier.get(1) removes the cached buffer from the bufferSupplier, so release it to put it back.
+        ctx.bufferSupplier.release(cachedBuffer);
+
+        // Reduce max message size below initial buffer size.
+        mockWriter.updateConfig(new LogConfig(
+            Map.of(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(INITIAL_BUFFER_SIZE - 66))));
+        assertEquals(INITIAL_BUFFER_SIZE - 66, mockWriter.config(TP).maxMessageSize());
+
+        // Write #2.
+        CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(records, "response2")
+        );
+        assertFalse(write2.isCompletedExceptionally());
+
+        // Verify that there is no cached buffer since the cached buffer size is greater than the new maxMessageSize.
+        assertEquals(1, ctx.bufferSupplier.get(1).capacity());
+
+        // Write #3.
+        CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(records, "response3")
+        );
+        assertFalse(write3.isCompletedExceptionally());
+
+        // Verify that the cached buffer size is equal to the new maxMessageSize, which is less than INITIAL_BUFFER_SIZE.
+        assertEquals(mockWriter.config(TP).maxMessageSize(), ctx.bufferSupplier.get(1).capacity());
     }
 
     @Test
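The assertions in the last test lean on the single-slot caching of the runtime context's buffer supplier: get(...) hands out the cached buffer and empties the slot whenever the cached capacity covers the request, and release(...) caches exactly one buffer at a time. A small probe of that behavior, assuming the context uses Kafka's BufferSupplier.GrowableBufferSupplier as the assertions above imply:

import java.nio.ByteBuffer;

import org.apache.kafka.common.utils.BufferSupplier;

public final class GrowableBufferSupplierProbe {
    public static void main(String[] args) {
        BufferSupplier supplier = new BufferSupplier.GrowableBufferSupplier();

        // Cache a 512KB buffer in the supplier's single slot.
        ByteBuffer buffer = supplier.get(512 * 1024);
        supplier.release(buffer);

        // get(1) returns the cached buffer because 512KB >= 1, and empties the slot.
        ByteBuffer cached = supplier.get(1);
        System.out.println(cached.capacity()); // 524288, not 1

        // The slot is now empty, so this allocates a fresh buffer of exactly 1 byte.
        ByteBuffer fresh = supplier.get(1);
        System.out.println(fresh.capacity()); // 1
    }
}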