diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index c04dd59e09e..1c6b8e6fef4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -1066,111 +1066,6 @@ public class RecordAccumulatorTest { assertEquals(1, future2.get().offset()); } - // This test confirms us that splitting a single large record - // creates an unsplittable batch (does not really split it) - // that will continue to fail with MESSAGE_TOO_LARGE, - // causing infinite retry loops - @Test - public void testSplitAndReenqueueWithSingleLargeRecord() throws ExecutionException, InterruptedException { - long now = time.milliseconds(); - int smallBatchSize = 1024; - RecordAccumulator accum = createTestRecordAccumulator(smallBatchSize, 10 * 1024, Compression.NONE, 10); - - // create a single record that is much larger than the batch size limit - // we are trying to mimic by send a record larger than broker's message.max.bytes - byte[] largeValue = new byte[4 * 1024]; // 4KB > 1KB - - // Create a buffer with enough space for the large record - ByteBuffer buffer = ByteBuffer.allocate(8192); - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE, TimestampType.CREATE_TIME, 0L); - ProducerBatch batch = new ProducerBatch(tp1, builder, now, true); - - final AtomicInteger acked = new AtomicInteger(0); - Callback cb = (metadata, exception) -> acked.incrementAndGet(); - - // create a large batch but only with one single record - Future future = batch.tryAppend(now, key, largeValue, Record.EMPTY_HEADERS, cb, now); - assertNotNull(future, "Should be able to append the large record to batch"); - assertEquals(1, batch.recordCount, "Batch should contain exactly one record"); - batch.close(); - - // try to split and reenqueue a single large record - SplitAndReenqueueResult result = performSplitAndReenqueueCycle(accum, batch, 0); - - // The below asserts tests that the single large record - // results in exactly one "split" batch - assertEquals(1, result.numSplitBatches, "Single large record should result in exactly one split batch"); - assertEquals(1, result.originalRecordCount, "Original batch should have exactly one record"); - assertEquals(1, result.splitBatch.recordCount, "Split batch should still contain exactly one record"); - assertTrue(result.originalBatchSize > smallBatchSize, "Original batch should exceed batch size limit"); - - // the "split" batch is still oversized and contains the same record - assertTrue(result.splitBatch.estimatedSizeInBytes() > smallBatchSize, - "Split batch is still oversized - it cannot be split further and will cause an error, will retry infinitely"); - } - - // This test retries for infinite times (controlled for 5 times for testing) - // because the record can never be split further - @Test - public void testRetrySplitAndReenqueueBehaviourWithSingleLargeRecord() throws ExecutionException, InterruptedException { - long now = time.milliseconds(); - int smallBatchSize = 1024; - RecordAccumulator accum = createTestRecordAccumulator(smallBatchSize, 10 * 1024, Compression.NONE, 10); - - // create a single record that is much larger than the batch size limit - // we are trying to mimic by send a record larger than broker's message.max.bytes - byte[] largeValue = new byte[4 * 1024]; // 4KB > 1KB - - // Create a buffer with enough space for the large record - ByteBuffer buffer = ByteBuffer.allocate(8192); - MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE, TimestampType.CREATE_TIME, 0L); - ProducerBatch originalBatch = new ProducerBatch(tp1, builder, now, true); - - final AtomicInteger acked = new AtomicInteger(0); - Callback cb = (metadata, exception) -> acked.incrementAndGet(); - - Future future = originalBatch.tryAppend(now, key, largeValue, Record.EMPTY_HEADERS, cb, now); - assertNotNull(future, "Should be able to append the large record to batch"); - assertEquals(1, originalBatch.recordCount, "Original batch should contain exactly one record"); - originalBatch.close(); - - // controlled test case, retry behavior across multiple cycles - // 5 cycles for testing but mimics infinite retries in reality - final int maxRetryCycles = 5; - - ProducerBatch currentBatch = originalBatch; - List results = new ArrayList<>(); - - for (int retryAttempt = 0; retryAttempt < maxRetryCycles; retryAttempt++) { - SplitAndReenqueueResult result = performSplitAndReenqueueCycle(accum, currentBatch, retryAttempt); - results.add(result); - - // Verify that each retry produces exactly 1 "split" batch (cannot be split further) - assertEquals(1, result.numSplitBatches, "Single record should result in exactly one split batch in retry attempt " + retryAttempt); - assertEquals(1, result.originalRecordCount, "Original batch should have exactly one record in retry attempt " + retryAttempt); - assertTrue(result.originalBatchSize > smallBatchSize, "Original batch should exceed size limit in retry attempt " + retryAttempt); - assertEquals(1, result.splitBatch.recordCount, "Split batch should still contain exactly one record in retry attempt " + retryAttempt); - - // The split batch is still oversized and will fail with MESSAGE_TOO_LARGE again - assertTrue(result.splitBatch.estimatedSizeInBytes() > smallBatchSize, - "Split batch in retry " + retryAttempt + " is still oversized and will fail MESSAGE_TOO_LARGE again"); - - // the new batch must be the split batch - currentBatch = result.splitBatch; - } - - // making sure that all the retry attempts were tracked - assertEquals(maxRetryCycles, results.size(), "Should have tracked all retry attempts"); - - // consistency across all retry cycles - each produces exactly 1 unsplittable batch - for (int i = 0; i < maxRetryCycles; i++) { - SplitAndReenqueueResult result = results.get(i); - assertEquals(1, result.numSplitBatches, "Retry attempt " + i + " should produce exactly 1 split batch"); - assertEquals(1, result.originalRecordCount, "Retry attempt " + i + " should have exactly 1 record"); - assertTrue(result.originalBatchSize > smallBatchSize, "Retry attempt " + i + " batch should exceed size limit"); - } - } - // here I am testing the hasRoomFor() behaviour // It allows the first record no matter the size // but does not allow the second record @@ -1737,55 +1632,6 @@ public class RecordAccumulatorTest { } } - private static class SplitAndReenqueueResult { - final int numSplitBatches; - final int originalRecordCount; - final int originalBatchSize; - final ProducerBatch splitBatch; - - SplitAndReenqueueResult(int numSplitBatches, int originalRecordCount, int originalBatchSize, ProducerBatch splitBatch) { - this.numSplitBatches = numSplitBatches; - this.originalRecordCount = originalRecordCount; - this.originalBatchSize = originalBatchSize; - this.splitBatch = splitBatch; - } - } - - private SplitAndReenqueueResult performSplitAndReenqueueCycle(RecordAccumulator accum, ProducerBatch batch, int retryAttempt) { - long now = time.milliseconds(); - - // Enqueue the batch for processing - accum.reenqueue(batch, now); - time.sleep(121L); // Wait for retry backoff - - RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, time.milliseconds()); - assertFalse(result.readyNodes.isEmpty(), "Batch should be ready for retry attempt " + retryAttempt); - - Map> drained = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); - assertEquals(1, drained.size(), "Only node1 should be drained in retry attempt " + retryAttempt); - assertEquals(1, drained.get(node1.id()).size(), "Only one batch should be drained in retry attempt " + retryAttempt); - - ProducerBatch drainedBatch = drained.get(node1.id()).get(0); - assertEquals(1, drainedBatch.recordCount, "Drained batch should have exactly one record in retry attempt " + retryAttempt); - - int originalRecordCount = drainedBatch.recordCount; - int originalBatchSize = drainedBatch.estimatedSizeInBytes(); - - int numSplitBatches = accum.splitAndReenqueue(drainedBatch); - - // wait for split batch to become ready - time.sleep(101L); - result = accum.ready(metadataCache, time.milliseconds()); - drained = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); - assertFalse(drained.isEmpty(), "Split batch should be ready for draining in retry attempt " + retryAttempt); - assertFalse(drained.get(node1.id()).isEmpty(), "Node1 should have the split batch in retry attempt " + retryAttempt); - - ProducerBatch splitBatch = drained.get(node1.id()).get(0); - assertEquals(1, splitBatch.recordCount, "Split batch should still contain exactly one record in retry attempt " + retryAttempt); - - return new SplitAndReenqueueResult(numSplitBatches, originalRecordCount, originalBatchSize, splitBatch); - } - /** * Return the offset delta. */