KAFKA-13834: add test coverage for RecordAccumulatorTest (#12092)

Reviewers: Luke Chen <showuon@gmail.com>
ruanliang 2022-04-24 17:06:19 +08:00 committed by GitHub
parent ff3d42a18c
commit e8c675ed56
1 changed file with 19 additions and 7 deletions

RecordAccumulatorTest.java

@@ -50,6 +50,7 @@ import org.mockito.Mockito;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.HashSet;
@@ -61,6 +62,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import static java.util.Arrays.asList;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -107,7 +109,7 @@ public class RecordAccumulatorTest {
         PartitionInfo part4 = new PartitionInfo(topic, partition4, node2, null, null);
         long batchSize = value.length + DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
-        RecordAccumulator accum = createTestRecordAccumulator((int) batchSize, 1024, CompressionType.NONE, 10);
+        RecordAccumulator accum = createTestRecordAccumulator((int) batchSize, Integer.MAX_VALUE, CompressionType.NONE, 10);
         Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3, part4),
                 Collections.emptySet(), Collections.emptySet());
@@ -142,15 +144,25 @@ public class RecordAccumulatorTest {
         // drain batches from 2 nodes: node1 => tp2, node2 => tp3 (because tp4 is muted)
         Map<Integer, List<ProducerBatch>> batches4 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
         verifyTopicPartitionInBatches(batches4, tp2, tp3);
+
+        // add records for tp1, tp2, tp3, and unmute tp4
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.unmutePartition(tp4);
+        // set maxSize to the max value so that all partitions in the 2 nodes are drained: node1 => [tp1, tp2], node2 => [tp3, tp4]
+        Map<Integer, List<ProducerBatch>> batches5 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), Integer.MAX_VALUE, 0);
+        verifyTopicPartitionInBatches(batches5, tp1, tp2, tp3, tp4);
     }
 
-    private void verifyTopicPartitionInBatches(Map<Integer, List<ProducerBatch>> batches, TopicPartition... tp) {
-        assertEquals(tp.length, batches.size());
+    private void verifyTopicPartitionInBatches(Map<Integer, List<ProducerBatch>> nodeBatches, TopicPartition... tp) {
+        int allTpBatchCount = nodeBatches.values().stream().flatMap(Collection::stream).collect(Collectors.toList()).size();
+        assertEquals(tp.length, allTpBatchCount);
         List<TopicPartition> topicPartitionsInBatch = new ArrayList<TopicPartition>();
-        for (Map.Entry<Integer, List<ProducerBatch>> entry : batches.entrySet()) {
-            List<ProducerBatch> batchList = entry.getValue();
-            assertEquals(1, batchList.size());
-            topicPartitionsInBatch.add(batchList.get(0).topicPartition);
+        for (Map.Entry<Integer, List<ProducerBatch>> entry : nodeBatches.entrySet()) {
+            List<ProducerBatch> tpBatchList = entry.getValue();
+            List<TopicPartition> tpList = tpBatchList.stream().map(producerBatch -> producerBatch.topicPartition).collect(Collectors.toList());
+            topicPartitionsInBatch.addAll(tpList);
         }
         for (int i = 0; i < tp.length; i++) {
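
Note: the reworked verifyTopicPartitionInBatches no longer assumes exactly one batch per node; it flattens the per-node batch lists and compares the total count against the expected partitions. Below is a minimal, self-contained sketch of the same flatten-and-collect stream pattern, using plain strings in place of Kafka's ProducerBatch (the class and variable names here are illustrative, not part of the commit):

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class FlattenExample {
        public static void main(String[] args) {
            // Illustrative stand-in for Map<nodeId, List<ProducerBatch>>: two nodes,
            // each holding the batches drained from it (node1 => [tp1, tp2], node2 => [tp3, tp4]).
            Map<Integer, List<String>> nodeBatches = Map.of(
                    1, Arrays.asList("tp1", "tp2"),
                    2, Arrays.asList("tp3", "tp4"));

            // Flatten the per-node lists into a single list, mirroring
            // nodeBatches.values().stream().flatMap(Collection::stream) in the test.
            List<String> allBatches = nodeBatches.values().stream()
                    .flatMap(Collection::stream)
                    .collect(Collectors.toList());

            System.out.println(allBatches.size()); // prints 4
        }
    }

Counting the flattened total, rather than asserting one batch per node, is what lets the new batches5 assertion pass: node1 drains two batches ([tp1, tp2]) in that step.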