From 73b02c991efc503831ec512f5fcdd384e6c2e8d0 Mon Sep 17 00:00:00 2001
From: Shashank Hosahalli Shivamurthy
Date: Thu, 31 Jul 2025 15:07:22 -0700
Subject: [PATCH] add helper functions and result for tests

---
 .../internals/RecordAccumulatorTest.java      | 63 +++++++++++++++++++
 1 file changed, 63 insertions(+)

diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 750440d2595..b87284747f3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -1597,6 +1597,69 @@ public class RecordAccumulatorTest {
         }
     }
 
+    /**
+     * Captures the observable outcome of one split-and-reenqueue cycle so the
+     * calling test can assert on it.
+     */
+    private static class SplitAndReenqueueResult {
+        final int numSplitBatches;
+        final int originalRecordCount;
+        final int originalBatchSize;
+        final ProducerBatch splitBatch;
+
+        SplitAndReenqueueResult(int numSplitBatches, int originalRecordCount, int originalBatchSize, ProducerBatch splitBatch) {
+            this.numSplitBatches = numSplitBatches;
+            this.originalRecordCount = originalRecordCount;
+            this.originalBatchSize = originalBatchSize;
+            this.splitBatch = splitBatch;
+        }
+    }
+
+    /**
+     * Runs one reenqueue / backoff / drain / split / drain cycle against node1 and
+     * asserts that exactly one single-record batch is drained both before and after
+     * the split.
+     *
+     * @param accum        the accumulator under test
+     * @param batch        the batch to reenqueue and split
+     * @param retryAttempt attempt number, used only in assertion messages
+     * @return the split count, the original record count and size, and the drained split batch
+     */
+    private SplitAndReenqueueResult performSplitAndReenqueueCycle(RecordAccumulator accum, ProducerBatch batch, int retryAttempt) {
+        long now = time.milliseconds();
+
+        // Enqueue the batch for processing
+        accum.reenqueue(batch, now);
+        time.sleep(121L); // Wait for retry backoff
+
+        RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, time.milliseconds());
+        assertFalse(result.readyNodes.isEmpty(), "Batch should be ready for retry attempt " + retryAttempt);
+
+        Map<Integer, List<ProducerBatch>> drained = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+        assertEquals(1, drained.size(), "Only node1 should be drained in retry attempt " + retryAttempt);
+        assertEquals(1, drained.get(node1.id()).size(), "Only one batch should be drained in retry attempt " + retryAttempt);
+
+        ProducerBatch drainedBatch = drained.get(node1.id()).get(0);
+        assertEquals(1, drainedBatch.recordCount, "Drained batch should have exactly one record in retry attempt " + retryAttempt);
+
+        int originalRecordCount = drainedBatch.recordCount;
+        int originalBatchSize = drainedBatch.estimatedSizeInBytes();
+
+        int numSplitBatches = accum.splitAndReenqueue(drainedBatch);
+
+        // wait for split batch to become ready
+        time.sleep(101L);
+        result = accum.ready(metadataCache, time.milliseconds());
+        drained = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+        assertFalse(drained.isEmpty(), "Split batch should be ready for draining in retry attempt " + retryAttempt);
+        assertFalse(drained.get(node1.id()).isEmpty(), "Node1 should have the split batch in retry attempt " + retryAttempt);
+
+        ProducerBatch splitBatch = drained.get(node1.id()).get(0);
+        assertEquals(1, splitBatch.recordCount, "Split batch should still contain exactly one record in retry attempt " + retryAttempt);
+
+        return new SplitAndReenqueueResult(numSplitBatches, originalRecordCount, originalBatchSize, splitBatch);
+    }
+
     /**
      * Return the offset delta.
      */