mirror of https://github.com/apache/kafka.git
add helper functions and result for tests
This commit is contained in:
parent 611f4128b4
commit 73b02c991e
@@ -1597,6 +1597,55 @@ public class RecordAccumulatorTest {
        }
    }

    private static class SplitAndReenqueueResult {
        final int numSplitBatches;
        final int originalRecordCount;
        final int originalBatchSize;
        final ProducerBatch splitBatch;

        SplitAndReenqueueResult(int numSplitBatches, int originalRecordCount, int originalBatchSize, ProducerBatch splitBatch) {
            this.numSplitBatches = numSplitBatches;
            this.originalRecordCount = originalRecordCount;
            this.originalBatchSize = originalBatchSize;
            this.splitBatch = splitBatch;
        }
    }

    private SplitAndReenqueueResult performSplitAndReenqueueCycle(RecordAccumulator accum, ProducerBatch batch, int retryAttempt) {
        long now = time.milliseconds();

        // Re-enqueue the batch for another retry attempt
        accum.reenqueue(batch, now);
        time.sleep(121L); // wait out the retry backoff

        RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, time.milliseconds());
        assertFalse(result.readyNodes.isEmpty(), "Batch should be ready for retry attempt " + retryAttempt);

        Map<Integer, List<ProducerBatch>> drained = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
        assertEquals(1, drained.size(), "Only node1 should be drained in retry attempt " + retryAttempt);
        assertEquals(1, drained.get(node1.id()).size(), "Only one batch should be drained in retry attempt " + retryAttempt);

        ProducerBatch drainedBatch = drained.get(node1.id()).get(0);
        assertEquals(1, drainedBatch.recordCount, "Drained batch should have exactly one record in retry attempt " + retryAttempt);

        int originalRecordCount = drainedBatch.recordCount;
        int originalBatchSize = drainedBatch.estimatedSizeInBytes();

        int numSplitBatches = accum.splitAndReenqueue(drainedBatch);

        // Wait for the split batch to become ready
        time.sleep(101L);
        result = accum.ready(metadataCache, time.milliseconds());
        drained = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
        assertFalse(drained.isEmpty(), "Split batch should be ready for draining in retry attempt " + retryAttempt);
        assertFalse(drained.get(node1.id()).isEmpty(), "Node1 should have the split batch in retry attempt " + retryAttempt);

        ProducerBatch splitBatch = drained.get(node1.id()).get(0);
        assertEquals(1, splitBatch.recordCount, "Split batch should still contain exactly one record in retry attempt " + retryAttempt);

        return new SplitAndReenqueueResult(numSplitBatches, originalRecordCount, originalBatchSize, splitBatch);
    }

    /**
     * Return the offset delta.
     */
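For context, the test that drives this helper is not part of this hunk. The driver below is only a hypothetical sketch of how it might be called: the method name runRepeatedSplitCycles is invented, and building the accumulator and the initial oversized single-record ProducerBatch is assumed to happen in the real test's existing RecordAccumulatorTest fixtures.

    // Hypothetical driver, not part of this commit: repeatedly split and re-enqueue
    // a batch and check that each cycle preserves the single record. Constructing the
    // accumulator and the initial oversized batch is left to the real test's fixtures.
    private void runRepeatedSplitCycles(RecordAccumulator accum, ProducerBatch firstBatch, int cycles) {
        ProducerBatch current = firstBatch;
        for (int attempt = 1; attempt <= cycles; attempt++) {
            SplitAndReenqueueResult result = performSplitAndReenqueueCycle(accum, current, attempt);

            // splitAndReenqueue() returns the number of batches produced by the split.
            assertTrue(result.numSplitBatches >= 1, "Expected at least one split batch in attempt " + attempt);
            // The single record should survive every split/re-enqueue cycle.
            assertEquals(result.originalRecordCount, result.splitBatch.recordCount,
                "Record count should be unchanged after splitting in attempt " + attempt);

            // Feed the split batch into the next cycle.
            current = result.splitBatch;
        }
    }

Returning a small value object from the helper, rather than asserting everything inside it, keeps the per-attempt expectations visible in the calling test.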