KAFKA-14841 Handle callbacks to ConsumerRebalanceListener in MockConsumer (#13455)

Reviewers: Philip Nee <philipnee@gmail.com>
This commit is contained in:
Daniel Scanteianu 2023-05-26 14:33:03 +01:00 committed by GitHub
parent c14e0df617
commit 6d72c26731
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 65 additions and 2 deletions

View File

@ -92,11 +92,27 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
return this.subscriptions.assignedPartitions();
}
/** Simulate a rebalance event. */
/**
* Simulate a rebalance event.
*/
public synchronized void rebalance(Collection<TopicPartition> newAssignment) {
// TODO: Rebalance callbacks
// compute added and removed partitions for rebalance callback
Set<TopicPartition> oldAssignmentSet = this.subscriptions.assignedPartitions();
Set<TopicPartition> newAssignmentSet = new HashSet<>(newAssignment);
List<TopicPartition> added = newAssignment.stream().filter(x -> !oldAssignmentSet.contains(x)).collect(Collectors.toList());
List<TopicPartition> removed = oldAssignmentSet.stream().filter(x -> !newAssignmentSet.contains(x)).collect(Collectors.toList());
// rebalance
this.records.clear();
this.subscriptions.assignFromSubscribed(newAssignment);
// rebalance callbacks
if (!added.isEmpty()) {
this.subscriptions.rebalanceListener().onPartitionsAssigned(added);
}
if (!removed.isEmpty()) {
this.subscriptions.rebalanceListener().onPartitionsRevoked(removed);
}
}
@Override

View File

@ -22,11 +22,13 @@ import org.apache.kafka.common.record.TimestampType;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -139,4 +141,49 @@ public class MockConsumerTest {
assertEquals(11L, (long) consumer.endOffsets(Collections.singleton(partition)).get(partition));
}
@Test
public void testRebalanceListener() {
final List<TopicPartition> revoked = new ArrayList<>();
final List<TopicPartition> assigned = new ArrayList<>();
ConsumerRebalanceListener consumerRebalanceListener = new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
revoked.clear();
revoked.addAll(partitions);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
assigned.clear();
assigned.addAll(partitions);
}
};
consumer.subscribe(Collections.singleton("test"), consumerRebalanceListener);
assertEquals(0, consumer.poll(Duration.ZERO).count());
List<TopicPartition> topicPartitionList = Arrays.asList(new TopicPartition("test", 0), new TopicPartition("test", 1));
consumer.rebalance(topicPartitionList);
assertTrue(revoked.isEmpty());
assertEquals(2, assigned.size());
assertTrue(assigned.contains(topicPartitionList.get(0)));
assertTrue(assigned.contains(topicPartitionList.get(1)));
consumer.rebalance(Collections.emptyList());
assertEquals(2, assigned.size());
assertTrue(revoked.contains(topicPartitionList.get(0)));
assertTrue(revoked.contains(topicPartitionList.get(1)));
consumer.rebalance(Collections.singletonList(topicPartitionList.get(0)));
assertEquals(1, assigned.size());
assertTrue(assigned.contains(topicPartitionList.get(0)));
consumer.rebalance(Collections.singletonList(topicPartitionList.get(1)));
assertEquals(1, assigned.size());
assertTrue(assigned.contains(topicPartitionList.get(1)));
assertEquals(1, revoked.size());
assertTrue(revoked.contains(topicPartitionList.get(0)));
}
}