KAFKA-16000: Migrated MembershipManagerImplTest away from ConsumerTestBuilder (#16312)

Finishing migration of MembershipManagerImplTest away from ConsumerTestBuilder and removed all spy objects.

Reviewers: Lianet Magrans <lianetmr@gmail.com>, Philip Nee <pnee@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
brenden20 2024-06-28 14:13:27 -05:00 committed by GitHub
parent b0054f3a2f
commit 836f52b0ba
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 29 additions and 30 deletions

View File

@ -39,7 +39,6 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
@ -60,10 +59,12 @@ import java.util.SortedSet;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import static org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.invokeRebalanceCallbacks; import static org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.invokeRebalanceCallbacks;
import static org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR;
import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkMap;
@ -82,6 +83,7 @@ import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyCollection; import static org.mockito.ArgumentMatchers.anyCollection;
import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -97,14 +99,11 @@ public class MembershipManagerImplTest {
private static final String MEMBER_ID = "test-member-1"; private static final String MEMBER_ID = "test-member-1";
private static final int REBALANCE_TIMEOUT = 100; private static final int REBALANCE_TIMEOUT = 100;
private static final int MEMBER_EPOCH = 1; private static final int MEMBER_EPOCH = 1;
private static final LogContext LOG_CONTEXT = new LogContext();
private final LogContext logContext = new LogContext();
private SubscriptionState subscriptionState; private SubscriptionState subscriptionState;
private ConsumerMetadata metadata; private ConsumerMetadata metadata;
private CommitRequestManager commitRequestManager; private CommitRequestManager commitRequestManager;
private ConsumerTestBuilder testBuilder;
private BlockingQueue<BackgroundEvent> backgroundEventQueue; private BlockingQueue<BackgroundEvent> backgroundEventQueue;
private BackgroundEventHandler backgroundEventHandler; private BackgroundEventHandler backgroundEventHandler;
private Time time; private Time time;
@ -113,22 +112,16 @@ public class MembershipManagerImplTest {
@BeforeEach @BeforeEach
public void setup() { public void setup() {
testBuilder = new ConsumerTestBuilder(ConsumerTestBuilder.createDefaultGroupInformation()); metadata = mock(ConsumerMetadata.class);
metadata = testBuilder.metadata; subscriptionState = mock(SubscriptionState.class);
subscriptionState = testBuilder.subscriptions; commitRequestManager = mock(CommitRequestManager.class);
commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); backgroundEventQueue = new LinkedBlockingQueue<>();
backgroundEventQueue = testBuilder.backgroundEventQueue; backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue);
backgroundEventHandler = testBuilder.backgroundEventHandler;
time = new MockTime(0); time = new MockTime(0);
metrics = new Metrics(time); metrics = new Metrics(time);
rebalanceMetricsManager = new RebalanceMetricsManager(metrics); rebalanceMetricsManager = new RebalanceMetricsManager(metrics);
}
@AfterEach when(commitRequestManager.maybeAutoCommitSyncBeforeRevocation(anyLong())).thenReturn(CompletableFuture.completedFuture(null));
public void tearDown() {
if (testBuilder != null) {
testBuilder.close();
}
} }
private MembershipManagerImpl createMembershipManagerJoiningGroup() { private MembershipManagerImpl createMembershipManagerJoiningGroup() {
@ -144,7 +137,7 @@ public class MembershipManagerImplTest {
private MembershipManagerImpl createMembershipManager(String groupInstanceId) { private MembershipManagerImpl createMembershipManager(String groupInstanceId) {
return spy(new MembershipManagerImpl( return spy(new MembershipManagerImpl(
GROUP_ID, Optional.ofNullable(groupInstanceId), REBALANCE_TIMEOUT, Optional.empty(), GROUP_ID, Optional.ofNullable(groupInstanceId), REBALANCE_TIMEOUT, Optional.empty(),
subscriptionState, commitRequestManager, metadata, logContext, Optional.empty(), subscriptionState, commitRequestManager, metadata, LOG_CONTEXT, Optional.empty(),
backgroundEventHandler, time, rebalanceMetricsManager)); backgroundEventHandler, time, rebalanceMetricsManager));
} }
@ -153,7 +146,7 @@ public class MembershipManagerImplTest {
MembershipManagerImpl manager = spy(new MembershipManagerImpl( MembershipManagerImpl manager = spy(new MembershipManagerImpl(
GROUP_ID, Optional.ofNullable(groupInstanceId), REBALANCE_TIMEOUT, GROUP_ID, Optional.ofNullable(groupInstanceId), REBALANCE_TIMEOUT,
Optional.ofNullable(serverAssignor), subscriptionState, commitRequestManager, Optional.ofNullable(serverAssignor), subscriptionState, commitRequestManager,
metadata, logContext, Optional.empty(), backgroundEventHandler, time, metadata, LOG_CONTEXT, Optional.empty(), backgroundEventHandler, time,
rebalanceMetricsManager)); rebalanceMetricsManager));
manager.transitionToJoining(); manager.transitionToJoining();
return manager; return manager;
@ -178,7 +171,7 @@ public class MembershipManagerImplTest {
// First join should register to get metadata updates // First join should register to get metadata updates
MembershipManagerImpl manager = new MembershipManagerImpl( MembershipManagerImpl manager = new MembershipManagerImpl(
GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(), GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(),
subscriptionState, commitRequestManager, metadata, logContext, Optional.empty(), subscriptionState, commitRequestManager, metadata, LOG_CONTEXT, Optional.empty(),
backgroundEventHandler, time, rebalanceMetricsManager); backgroundEventHandler, time, rebalanceMetricsManager);
manager.transitionToJoining(); manager.transitionToJoining();
clearInvocations(metadata); clearInvocations(metadata);
@ -248,7 +241,7 @@ public class MembershipManagerImplTest {
public void testTransitionToFailedWhenTryingToJoin() { public void testTransitionToFailedWhenTryingToJoin() {
MembershipManagerImpl membershipManager = new MembershipManagerImpl( MembershipManagerImpl membershipManager = new MembershipManagerImpl(
GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(), GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(),
subscriptionState, commitRequestManager, metadata, logContext, Optional.empty(), subscriptionState, commitRequestManager, metadata, LOG_CONTEXT, Optional.empty(),
backgroundEventHandler, time, rebalanceMetricsManager); backgroundEventHandler, time, rebalanceMetricsManager);
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state()); assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
membershipManager.transitionToJoining(); membershipManager.transitionToJoining();
@ -728,7 +721,7 @@ public class MembershipManagerImplTest {
// Member should update the subscription and send ack when the delayed reconciliation // Member should update the subscription and send ack when the delayed reconciliation
// completes. // completes.
verify(subscriptionState).assignFromSubscribed(Collections.emptySet()); verify(subscriptionState).assignFromSubscribedAwaitingCallback(Collections.emptySet(), Collections.emptySet());
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state()); assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
// Pending assignment that was discovered in metadata should be ready to reconcile in the // Pending assignment that was discovered in metadata should be ready to reconcile in the
@ -747,7 +740,7 @@ public class MembershipManagerImplTest {
membershipManager.poll(time.milliseconds()); membershipManager.poll(time.milliseconds());
assertEquals(Collections.emptySet(), membershipManager.topicsAwaitingReconciliation()); assertEquals(Collections.emptySet(), membershipManager.topicsAwaitingReconciliation());
verify(subscriptionState).assignFromSubscribed(topicPartitions(topic2Assignment, topic2Metadata)); verify(subscriptionState).assignFromSubscribedAwaitingCallback(topicPartitions(topic2Assignment, topic2Metadata), topicPartitions(topic2Assignment, topic2Metadata));
} }
/** /**
@ -1162,7 +1155,7 @@ public class MembershipManagerImplTest {
Set<TopicPartition> expectedAssignment = Collections.singleton(new TopicPartition(topicName, 0)); Set<TopicPartition> expectedAssignment = Collections.singleton(new TopicPartition(topicName, 0));
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state()); assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
verify(subscriptionState).assignFromSubscribed(expectedAssignment); verify(subscriptionState).assignFromSubscribedAwaitingCallback(expectedAssignment, expectedAssignment);
// When ack for the reconciled assignment is sent, member should go back to STABLE // When ack for the reconciled assignment is sent, member should go back to STABLE
// because the first assignment that was not resolved should have been discarded // because the first assignment that was not resolved should have been discarded
@ -1256,7 +1249,7 @@ public class MembershipManagerImplTest {
// Assignment should have been reconciled. // Assignment should have been reconciled.
Set<TopicPartition> expectedAssignment = Collections.singleton(new TopicPartition(topicName, 1)); Set<TopicPartition> expectedAssignment = Collections.singleton(new TopicPartition(topicName, 1));
verify(subscriptionState).assignFromSubscribed(expectedAssignment); verify(subscriptionState).assignFromSubscribedAwaitingCallback(expectedAssignment, expectedAssignment);
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state()); assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
assertTrue(membershipManager.topicsAwaitingReconciliation().isEmpty()); assertTrue(membershipManager.topicsAwaitingReconciliation().isEmpty());
} }
@ -1402,6 +1395,7 @@ public class MembershipManagerImplTest {
receiveEmptyAssignment(membershipManager); receiveEmptyAssignment(membershipManager);
verifyReconciliationNotTriggered(membershipManager); verifyReconciliationNotTriggered(membershipManager);
when(commitRequestManager.maybeAutoCommitSyncBeforeRevocation(anyLong())).thenReturn(commitResult);
membershipManager.poll(time.milliseconds()); membershipManager.poll(time.milliseconds());
// Member stays in RECONCILING while the commit request hasn't completed. // Member stays in RECONCILING while the commit request hasn't completed.
@ -1458,11 +1452,14 @@ public class MembershipManagerImplTest {
verifyReconciliationNotTriggered(membershipManager); verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds()); membershipManager.poll(time.milliseconds());
TreeSet<TopicPartition> expectedSet = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
expectedSet.add(new TopicPartition(topicName, 1));
expectedSet.add(new TopicPartition(topicName, 2));
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state()); assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
assertEquals(topicIdPartitionsMap(topicId, 1, 2), membershipManager.currentAssignment().partitions); assertEquals(topicIdPartitionsMap(topicId, 1, 2), membershipManager.currentAssignment().partitions);
assertFalse(membershipManager.reconciliationInProgress()); assertFalse(membershipManager.reconciliationInProgress());
verify(subscriptionState).assignFromSubscribed(anyCollection()); verify(subscriptionState).assignFromSubscribedAwaitingCallback(expectedSet, expectedSet);
} }
@Test @Test
@ -1931,7 +1928,8 @@ public class MembershipManagerImplTest {
membershipManager.onHeartbeatRequestSent(); membershipManager.onHeartbeatRequestSent();
assertEquals(MemberState.STALE, membershipManager.state()); assertEquals(MemberState.STALE, membershipManager.state());
verify(backgroundEventHandler).add(any(ConsumerRebalanceListenerCallbackNeededEvent.class)); assertFalse(backgroundEventQueue.isEmpty());
assertInstanceOf(ConsumerRebalanceListenerCallbackNeededEvent.class, backgroundEventQueue.peek());
// Stale member triggers onPartitionLost callback that will not complete just yet // Stale member triggers onPartitionLost callback that will not complete just yet
ConsumerRebalanceListenerCallbackCompletedEvent callbackEvent = performCallback( ConsumerRebalanceListenerCallbackCompletedEvent callbackEvent = performCallback(
@ -2430,7 +2428,7 @@ public class MembershipManagerImplTest {
// Assignment applied // Assignment applied
List<TopicPartition> expectedTopicPartitions = buildTopicPartitions(expectedAssignment); List<TopicPartition> expectedTopicPartitions = buildTopicPartitions(expectedAssignment);
verify(subscriptionState).assignFromSubscribed(new HashSet<>(expectedTopicPartitions)); verify(subscriptionState).assignFromSubscribedAwaitingCallback(eq(new HashSet<>(expectedTopicPartitions)), any());
Map<Uuid, SortedSet<Integer>> assignmentByTopicId = assignmentByTopicId(expectedAssignment); Map<Uuid, SortedSet<Integer>> assignmentByTopicId = assignmentByTopicId(expectedAssignment);
assertEquals(assignmentByTopicId, membershipManager.currentAssignment().partitions); assertEquals(assignmentByTopicId, membershipManager.currentAssignment().partitions);
@ -2489,7 +2487,8 @@ public class MembershipManagerImplTest {
verify(subscriptionState).markPendingRevocation(anySet()); verify(subscriptionState).markPendingRevocation(anySet());
List<TopicPartition> expectedTopicPartitionAssignment = List<TopicPartition> expectedTopicPartitionAssignment =
buildTopicPartitions(expectedCurrentAssignment); buildTopicPartitions(expectedCurrentAssignment);
verify(subscriptionState).assignFromSubscribed(new HashSet<>(expectedTopicPartitionAssignment)); HashSet<TopicPartition> expectedSet = new HashSet<>(expectedTopicPartitionAssignment);
verify(subscriptionState).assignFromSubscribedAwaitingCallback(expectedSet, Collections.emptySet());
} }
private Map<Uuid, SortedSet<Integer>> assignmentByTopicId(List<TopicIdPartition> topicIdPartitions) { private Map<Uuid, SortedSet<Integer>> assignmentByTopicId(List<TopicIdPartition> topicIdPartitions) {
@ -2543,7 +2542,7 @@ public class MembershipManagerImplTest {
if (triggerReconciliation) { if (triggerReconciliation) {
membershipManager.poll(time.milliseconds()); membershipManager.poll(time.milliseconds());
verify(subscriptionState).assignFromSubscribed(anyCollection()); verify(subscriptionState).assignFromSubscribedAwaitingCallback(anyCollection(), anyCollection());
} else { } else {
verify(subscriptionState, never()).assignFromSubscribed(anyCollection()); verify(subscriptionState, never()).assignFromSubscribed(anyCollection());
} }