mirror of https://github.com/apache/kafka.git
				
				
				
			KAFKA-16831: CoordinatorRuntime should initialize MemoryRecordsBuilder with max batch size write limit (#16059)
CoordinatorRuntime should initialize MemoryRecordsBuilder with max batch size write limit. Otherwise, we default the write limit to the min buffer size of 16384 for the write limit. This causes the coordinator to threw RecordTooLargeException even when it's under the 1MB max batch size limit. Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
		
							parent
							
								
									8eea6b8263
								
							
						
					
					
						commit
						d585a494a4
					
				|  | @ -800,10 +800,10 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut | |||
|                         byte magic = logConfig.recordVersion().value; | ||||
|                         int maxBatchSize = logConfig.maxMessageSize(); | ||||
|                         long currentTimeMs = time.milliseconds(); | ||||
|                         ByteBuffer buffer = context.bufferSupplier.get(Math.min(16384, maxBatchSize)); | ||||
|                         ByteBuffer buffer = context.bufferSupplier.get(Math.min(MIN_BUFFER_SIZE, maxBatchSize)); | ||||
| 
 | ||||
|                         try { | ||||
|                             MemoryRecordsBuilder builder = MemoryRecords.builder( | ||||
|                             MemoryRecordsBuilder builder = new MemoryRecordsBuilder( | ||||
|                                 buffer, | ||||
|                                 magic, | ||||
|                                 compression, | ||||
|  | @ -814,7 +814,9 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut | |||
|                                 producerEpoch, | ||||
|                                 0, | ||||
|                                 producerId != RecordBatch.NO_PRODUCER_ID, | ||||
|                                 RecordBatch.NO_PARTITION_LEADER_EPOCH | ||||
|                                 false, | ||||
|                                 RecordBatch.NO_PARTITION_LEADER_EPOCH, | ||||
|                                 maxBatchSize | ||||
|                             ); | ||||
| 
 | ||||
|                             // Apply the records to the state machine and add them to the batch. | ||||
|  | @ -845,7 +847,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut | |||
|                                     ); | ||||
|                                 } else { | ||||
|                                     throw new RecordTooLargeException("Message batch size is " + builder.estimatedSizeInBytes() + | ||||
|                                         " bytes in append to partition $tp which exceeds the maximum configured size of $maxBatchSize."); | ||||
|                                         " bytes in append to partition " + tp + " which exceeds the maximum " + | ||||
|                                         "configured size of " + maxBatchSize + "."); | ||||
|                                 } | ||||
|                             } | ||||
| 
 | ||||
|  | @ -1365,6 +1368,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut | |||
|         } | ||||
|     } | ||||
| 
 | ||||
|     /** | ||||
|      * 16KB. Used for initial buffer size for write operations. | ||||
|      */ | ||||
|     static final int MIN_BUFFER_SIZE = 16384; | ||||
| 
 | ||||
|     /** | ||||
|      * The log prefix. | ||||
|      */ | ||||
|  |  | |||
|  | @ -81,6 +81,7 @@ import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.Coor | |||
| import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.FAILED; | ||||
| import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.INITIAL; | ||||
| import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.LOADING; | ||||
| import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.MIN_BUFFER_SIZE; | ||||
| import static org.apache.kafka.test.TestUtils.assertFutureThrows; | ||||
| import static org.junit.jupiter.api.Assertions.assertEquals; | ||||
| import static org.junit.jupiter.api.Assertions.assertFalse; | ||||
|  | @ -3007,6 +3008,57 @@ public class CoordinatorRuntimeTest { | |||
|         timer.taskQueue().forEach(taskEntry -> assertTrue(taskEntry.cancelled())); | ||||
|     } | ||||
| 
 | ||||
|     @Test | ||||
|     public void testAppendRecordBatchSize() { | ||||
|         MockTimer timer = new MockTimer(); | ||||
|         MockPartitionWriter writer = new MockPartitionWriter(); | ||||
|         StringSerializer serializer = new StringSerializer(); | ||||
| 
 | ||||
|         CoordinatorRuntime<MockCoordinatorShard, String> runtime = | ||||
|             new CoordinatorRuntime.Builder<MockCoordinatorShard, String>() | ||||
|                 .withTime(timer.time()) | ||||
|                 .withTimer(timer) | ||||
|                 .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT) | ||||
|                 .withLoader(new MockCoordinatorLoader()) | ||||
|                 .withEventProcessor(new DirectEventProcessor()) | ||||
|                 .withPartitionWriter(writer) | ||||
|                 .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) | ||||
|                 .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) | ||||
|                 .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) | ||||
|                 .withSerializer(serializer) | ||||
|                 .build(); | ||||
| 
 | ||||
|         // Schedule the loading. | ||||
|         runtime.scheduleLoadOperation(TP, 10); | ||||
| 
 | ||||
|         // Verify the initial state. | ||||
|         CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP); | ||||
|         assertEquals(0L, ctx.coordinator.lastWrittenOffset()); | ||||
|         assertEquals(0L, ctx.coordinator.lastCommittedOffset()); | ||||
|         assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); | ||||
| 
 | ||||
|         int maxBatchSize = writer.config(TP).maxMessageSize(); | ||||
|         assertTrue(maxBatchSize > MIN_BUFFER_SIZE); | ||||
| 
 | ||||
|         // Generate enough records to create a batch that has 16KB < batchSize < maxBatchSize | ||||
|         List<String> records = new ArrayList<>(); | ||||
|         for (int i = 0; i < 3000; i++) { | ||||
|             records.add("record-" + i); | ||||
|         } | ||||
| 
 | ||||
|         // Write #1. | ||||
|         CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT, | ||||
|             state -> new CoordinatorResult<>(records, "response1") | ||||
|         ); | ||||
| 
 | ||||
|         // Verify that the write has not completed exceptionally. | ||||
|         // This will catch any exceptions thrown including RecordTooLargeException. | ||||
|         assertFalse(write1.isCompletedExceptionally()); | ||||
| 
 | ||||
|         int batchSize = writer.entries(TP).get(0).sizeInBytes(); | ||||
|         assertTrue(batchSize > MIN_BUFFER_SIZE && batchSize < maxBatchSize); | ||||
|     } | ||||
| 
 | ||||
|     private static <S extends CoordinatorShard<U>, U> ArgumentMatcher<CoordinatorPlayback<U>> coordinatorMatcher( | ||||
|         CoordinatorRuntime<S, U> runtime, | ||||
|         TopicPartition tp | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue