MINOR: Client state machine fix for ignoring new assignments when leaving (#15003)

This includes a fix for properly handling cases where the member might receive a new assignment from the broker when the member is already getting ready to leave the group. The expectation is that the member should just ignore the new assignment and continue with the leave group process.

Reviewer: Bruno Cadonna <cadonna@apache.org>
This commit is contained in:
Lianet Magrans 2023-12-14 02:35:52 -05:00 committed by Bruno Cadonna
parent 51ab90d876
commit 476e024bee
3 changed files with 78 additions and 23 deletions

View File

@ -114,7 +114,7 @@ public enum MemberState {
STABLE.previousValidStates = Arrays.asList(JOINING, ACKNOWLEDGING, RECONCILING);
RECONCILING.previousValidStates = Arrays.asList(STABLE, JOINING, ACKNOWLEDGING);
RECONCILING.previousValidStates = Arrays.asList(STABLE, JOINING, ACKNOWLEDGING, RECONCILING);
ACKNOWLEDGING.previousValidStates = Arrays.asList(RECONCILING);
@ -145,4 +145,13 @@ public enum MemberState {
public List<MemberState> getPreviousValidStates() {
return this.previousValidStates;
}
/**
* @return True if the member is in a state where it should reconcile the new assignment.
* Expected to be true whenever the member is part of the group and intends of staying in it
* (ex. false when the member is preparing to leave the group).
*/
public boolean canHandleNewAssignment() {
return MemberState.RECONCILING.getPreviousValidStates().contains(this);
}
}

View File

@ -348,34 +348,54 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource
ConsumerGroupHeartbeatResponseData.Assignment assignment = response.assignment();
if (assignment != null) {
replaceUnresolvedAssignmentWithNewAssignment(assignment);
if (!assignmentUnresolved.equals(currentAssignment)) {
// Transition the member to RECONCILING when receiving a new target
// assignment from the broker, different from the current assignment. Note that the
// reconciliation might not be triggered just yet because of missing metadata.
transitionTo(MemberState.RECONCILING);
assignmentReadyToReconcile.clear();
resolveMetadataForUnresolvedAssignment();
reconcile();
} else {
// Same assignment received, nothing to reconcile.
log.debug("Target assignment {} received from the broker is equals to the member " +
"current assignment {}. Nothing to reconcile.",
assignmentUnresolved, currentAssignment);
// Make sure we transition the member back to STABLE if it was RECONCILING (ex.
// member was RECONCILING unresolved assignments that were just removed by the
// broker).
if (state == MemberState.RECONCILING) {
// This is the case where a member was RECONCILING an unresolved
// assignment that was removed by the broker in a following assignment.
transitionTo(MemberState.STABLE);
}
if (!state.canHandleNewAssignment()) {
// New assignment received but member is in a state where it cannot take new
// assignments (ex. preparing to leave the group)
log.debug("Ignoring new assignment {} received from server because member is in {} state.",
assignment, state);
return;
}
processAssignmentReceived(assignment);
} else if (allPendingAssignmentsReconciled()) {
transitionTo(MemberState.STABLE);
}
}
/**
* This will process the assignment received if it is different from the member's current
* assignment. If a new assignment is received, this will try to resolve the topic names from
* metadata, reconcile the resolved assignment, and keep the unresolved to be reconciled when
* metadata is discovered.
*
* @param assignment Assignment received from the broker.
*/
private void processAssignmentReceived(ConsumerGroupHeartbeatResponseData.Assignment assignment) {
replaceUnresolvedAssignmentWithNewAssignment(assignment);
if (!assignmentUnresolved.equals(currentAssignment)) {
// Transition the member to RECONCILING when receiving a new target
// assignment from the broker, different from the current assignment. Note that the
// reconciliation might not be triggered just yet because of missing metadata.
transitionTo(MemberState.RECONCILING);
assignmentReadyToReconcile.clear();
resolveMetadataForUnresolvedAssignment();
reconcile();
} else {
// Same assignment received, nothing to reconcile.
log.debug("Target assignment {} received from the broker is equals to the member " +
"current assignment {}. Nothing to reconcile.",
assignmentUnresolved, currentAssignment);
// Make sure we transition the member back to STABLE if it was RECONCILING (ex.
// member was RECONCILING unresolved assignments that were just removed by the
// broker).
if (state == MemberState.RECONCILING) {
// This is the case where a member was RECONCILING an unresolved
// assignment that was removed by the broker in a following assignment.
transitionTo(MemberState.STABLE);
}
}
}
/**
* Overwrite collection of unresolved topic Ids with the new target assignment. This will
* effectively achieve the following:

View File

@ -270,6 +270,32 @@ public class MembershipManagerImplTest {
assertTrue(membershipManager.shouldSkipHeartbeat());
}
@Test
public void testNewAssignmentIgnoredWhenStateIsPrepareLeaving() {
MembershipManagerImpl membershipManager = createMemberInStableState();
// Start leaving group, blocked waiting for commit of all consumed to complete.
CompletableFuture<Void> commitResult = mockPrepareLeavingStuckCommitting();
membershipManager.leaveGroup();
assertEquals(MemberState.PREPARE_LEAVING, membershipManager.state());
// Get new assignment while preparing to leave the group. Member should continue leaving
// the group, ignoring the new assignment received.
Uuid topicId = Uuid.randomUuid();
mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, "topic1",
Collections.emptyList(), true);
receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
assertEquals(MemberState.PREPARE_LEAVING, membershipManager.state());
assertTrue(membershipManager.assignmentReadyToReconcile().isEmpty());
assertTrue(membershipManager.topicsWaitingForMetadata().isEmpty());
verify(membershipManager, never()).markReconciliationInProgress();
// When commit completes member should transition to LEAVING.
when(subscriptionState.rebalanceListener()).thenReturn(Optional.empty());
commitResult.complete(null);
assertEquals(MemberState.LEAVING, membershipManager.state());
}
@Test
public void testFencingWhenStateIsLeaving() {
MembershipManagerImpl membershipManager = createMemberInStableState();