mirror of https://github.com/apache/kafka.git
remove useless tests
commit 4762a9975d
parent 616e5feab8
@@ -1066,111 +1066,6 @@ public class RecordAccumulatorTest {
    assertEquals(1, future2.get().offset());
}

// This test confirms that splitting a batch that holds a single large record
// produces an unsplittable batch (it is not really split): the one resulting
// batch will keep failing with MESSAGE_TOO_LARGE, causing an infinite retry loop.
@Test
public void testSplitAndReenqueueWithSingleLargeRecord() throws ExecutionException, InterruptedException {
    long now = time.milliseconds();
    int smallBatchSize = 1024;
    RecordAccumulator accum = createTestRecordAccumulator(smallBatchSize, 10 * 1024, Compression.NONE, 10);

    // Create a single record that is much larger than the batch size limit,
    // mimicking a record larger than the broker's message.max.bytes.
    byte[] largeValue = new byte[4 * 1024]; // 4 KB > 1 KB

    // Create a buffer with enough space for the large record.
    ByteBuffer buffer = ByteBuffer.allocate(8192);
    MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE, TimestampType.CREATE_TIME, 0L);
    ProducerBatch batch = new ProducerBatch(tp1, builder, now, true);

    final AtomicInteger acked = new AtomicInteger(0);
    Callback cb = (metadata, exception) -> acked.incrementAndGet();

    // Create a large batch that contains only a single record.
    Future<RecordMetadata> future = batch.tryAppend(now, key, largeValue, Record.EMPTY_HEADERS, cb, now);
    assertNotNull(future, "Should be able to append the large record to batch");
    assertEquals(1, batch.recordCount, "Batch should contain exactly one record");
    batch.close();

    // Try to split and reenqueue the single large record.
    SplitAndReenqueueResult result = performSplitAndReenqueueCycle(accum, batch, 0);

    // The asserts below verify that the single large record
    // results in exactly one "split" batch.
    assertEquals(1, result.numSplitBatches, "Single large record should result in exactly one split batch");
    assertEquals(1, result.originalRecordCount, "Original batch should have exactly one record");
    assertEquals(1, result.splitBatch.recordCount, "Split batch should still contain exactly one record");
    assertTrue(result.originalBatchSize > smallBatchSize, "Original batch should exceed batch size limit");

    // The "split" batch is still oversized and contains the same record.
    assertTrue(result.splitBatch.estimatedSizeInBytes() > smallBatchSize,
        "Split batch is still oversized - it cannot be split further and will keep failing, retrying indefinitely");
}
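For context on why the split can never succeed here: RecordAccumulator.splitAndReenqueue delegates to ProducerBatch.split, which packs records one at a time into new batches, and an empty batch always accepts its first record regardless of size. A minimal standalone sketch of that packing invariant (a simplified model, not Kafka's actual implementation; SplitModel and splitBySize are hypothetical names used only for illustration):

import java.util.ArrayList;
import java.util.List;

public class SplitModel {
    // Simplified model of ProducerBatch.split(int splitBatchSize): records are
    // packed greedily, and an empty batch always accepts its first record, so a
    // single record larger than the limit always yields one still-oversized batch.
    static List<List<Integer>> splitBySize(List<Integer> recordSizes, int splitBatchSize) {
        List<List<Integer>> batches = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        int currentSize = 0;
        for (int size : recordSizes) {
            if (!current.isEmpty() && currentSize + size > splitBatchSize) {
                batches.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(size); // the first record is always accepted, even if oversized
            currentSize += size;
        }
        if (!current.isEmpty())
            batches.add(current);
        return batches;
    }

    public static void main(String[] args) {
        // Three 400-byte records against a 1024-byte limit: a real split into two batches.
        System.out.println(splitBySize(List.of(400, 400, 400), 1024)); // [[400, 400], [400]]
        // One 4096-byte record against a 1024-byte limit: one batch, still oversized.
        System.out.println(splitBySize(List.of(4096), 1024)); // [[4096]]
    }
}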

// This test retries indefinitely (capped at 5 cycles for testing)
// because the record can never be split further.
@Test
public void testRetrySplitAndReenqueueBehaviourWithSingleLargeRecord() throws ExecutionException, InterruptedException {
    long now = time.milliseconds();
    int smallBatchSize = 1024;
    RecordAccumulator accum = createTestRecordAccumulator(smallBatchSize, 10 * 1024, Compression.NONE, 10);

    // Create a single record that is much larger than the batch size limit,
    // mimicking a record larger than the broker's message.max.bytes.
    byte[] largeValue = new byte[4 * 1024]; // 4 KB > 1 KB

    // Create a buffer with enough space for the large record.
    ByteBuffer buffer = ByteBuffer.allocate(8192);
    MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE, TimestampType.CREATE_TIME, 0L);
    ProducerBatch originalBatch = new ProducerBatch(tp1, builder, now, true);

    final AtomicInteger acked = new AtomicInteger(0);
    Callback cb = (metadata, exception) -> acked.incrementAndGet();

    Future<RecordMetadata> future = originalBatch.tryAppend(now, key, largeValue, Record.EMPTY_HEADERS, cb, now);
    assertNotNull(future, "Should be able to append the large record to batch");
    assertEquals(1, originalBatch.recordCount, "Original batch should contain exactly one record");
    originalBatch.close();

    // Controlled test case for retry behavior across multiple cycles:
    // 5 cycles here, but this mimics infinite retries in reality.
    final int maxRetryCycles = 5;

    ProducerBatch currentBatch = originalBatch;
    List<SplitAndReenqueueResult> results = new ArrayList<>();

    for (int retryAttempt = 0; retryAttempt < maxRetryCycles; retryAttempt++) {
        SplitAndReenqueueResult result = performSplitAndReenqueueCycle(accum, currentBatch, retryAttempt);
        results.add(result);

        // Verify that each retry produces exactly one "split" batch (it cannot be split further).
        assertEquals(1, result.numSplitBatches, "Single record should result in exactly one split batch in retry attempt " + retryAttempt);
        assertEquals(1, result.originalRecordCount, "Original batch should have exactly one record in retry attempt " + retryAttempt);
        assertTrue(result.originalBatchSize > smallBatchSize, "Original batch should exceed size limit in retry attempt " + retryAttempt);
        assertEquals(1, result.splitBatch.recordCount, "Split batch should still contain exactly one record in retry attempt " + retryAttempt);

        // The split batch is still oversized and will fail with MESSAGE_TOO_LARGE again.
        assertTrue(result.splitBatch.estimatedSizeInBytes() > smallBatchSize,
            "Split batch in retry " + retryAttempt + " is still oversized and will fail with MESSAGE_TOO_LARGE again");

        // The next cycle must operate on the split batch.
        currentBatch = result.splitBatch;
    }

    // Make sure that all the retry attempts were tracked.
    assertEquals(maxRetryCycles, results.size(), "Should have tracked all retry attempts");

    // Consistency across all retry cycles: each produces exactly one unsplittable batch.
    for (int i = 0; i < maxRetryCycles; i++) {
        SplitAndReenqueueResult result = results.get(i);
        assertEquals(1, result.numSplitBatches, "Retry attempt " + i + " should produce exactly 1 split batch");
        assertEquals(1, result.originalRecordCount, "Retry attempt " + i + " should have exactly 1 record");
        assertTrue(result.originalBatchSize > smallBatchSize, "Retry attempt " + i + " batch should exceed size limit");
    }
}
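Running the simplified model from above in a loop shows the fixed point this test asserts: each cycle reproduces the identical oversized single-record batch, so only the external cap ever terminates the loop. Using the hypothetical SplitModel.splitBySize sketch:

List<Integer> batch = List.of(4096); // one 4 KB record
for (int attempt = 0; attempt < 5; attempt++) {
    List<List<Integer>> split = SplitModel.splitBySize(batch, 1024);
    // Always prints: 1 batch, sizes [4096]
    System.out.println("attempt " + attempt + ": " + split.size() + " batch, sizes " + split.get(0));
    batch = split.get(0); // the same oversized record, every cycle
}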

// The next test exercises the hasRoomFor() behaviour:
// it allows the first record no matter its size,
// but does not allow a second record (see the sketch below).
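A minimal sketch of that contract, reusing the MemoryRecordsBuilder APIs already used in these tests (call shapes assumed from the code above; the builder's backing output stream grows, so the first append always fits, and once the builder is past its write limit hasRoomFor() returns false):

ByteBuffer buffer = ByteBuffer.allocate(1024);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE, TimestampType.CREATE_TIME, 0L);
byte[] oversized = new byte[4 * 1024];

// First record: always room, even though 4 KB exceeds the 1 KB buffer.
assertTrue(builder.hasRoomFor(0L, null, oversized, Record.EMPTY_HEADERS));
builder.append(0L, null, oversized);

// Second record: rejected, the builder is already past its write limit.
assertFalse(builder.hasRoomFor(0L, null, oversized, Record.EMPTY_HEADERS));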
@@ -1737,55 +1632,6 @@ public class RecordAccumulatorTest {
    }
}

private static class SplitAndReenqueueResult {
    final int numSplitBatches;
    final int originalRecordCount;
    final int originalBatchSize;
    final ProducerBatch splitBatch;

    SplitAndReenqueueResult(int numSplitBatches, int originalRecordCount, int originalBatchSize, ProducerBatch splitBatch) {
        this.numSplitBatches = numSplitBatches;
        this.originalRecordCount = originalRecordCount;
        this.originalBatchSize = originalBatchSize;
        this.splitBatch = splitBatch;
    }
}

private SplitAndReenqueueResult performSplitAndReenqueueCycle(RecordAccumulator accum, ProducerBatch batch, int retryAttempt) {
    long now = time.milliseconds();

    // Enqueue the batch for processing.
    accum.reenqueue(batch, now);
    time.sleep(121L); // Wait for the retry backoff to elapse.

    RecordAccumulator.ReadyCheckResult result = accum.ready(metadataCache, time.milliseconds());
    assertFalse(result.readyNodes.isEmpty(), "Batch should be ready for retry attempt " + retryAttempt);

    Map<Integer, List<ProducerBatch>> drained = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
    assertEquals(1, drained.size(), "Only node1 should be drained in retry attempt " + retryAttempt);
    assertEquals(1, drained.get(node1.id()).size(), "Only one batch should be drained in retry attempt " + retryAttempt);

    ProducerBatch drainedBatch = drained.get(node1.id()).get(0);
    assertEquals(1, drainedBatch.recordCount, "Drained batch should have exactly one record in retry attempt " + retryAttempt);

    int originalRecordCount = drainedBatch.recordCount;
    int originalBatchSize = drainedBatch.estimatedSizeInBytes();

    int numSplitBatches = accum.splitAndReenqueue(drainedBatch);

    // Wait for the split batch to become ready.
    time.sleep(101L);
    result = accum.ready(metadataCache, time.milliseconds());
    drained = accum.drain(metadataCache, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
    assertFalse(drained.isEmpty(), "Split batch should be ready for draining in retry attempt " + retryAttempt);
    assertFalse(drained.get(node1.id()).isEmpty(), "Node1 should have the split batch in retry attempt " + retryAttempt);

    ProducerBatch splitBatch = drained.get(node1.id()).get(0);
    assertEquals(1, splitBatch.recordCount, "Split batch should still contain exactly one record in retry attempt " + retryAttempt);

    return new SplitAndReenqueueResult(numSplitBatches, originalRecordCount, originalBatchSize, splitBatch);
}

/**
 * Return the offset delta.
 */