KAFKA-14502; Implement LeaveGroup protocol in new GroupCoordinator (#14147)

This patch implements the LeaveGroup API in the new group coordinator.

Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
Jeff Kim 2023-09-13 04:43:37 -04:00 committed by GitHub
parent 8a7e5e8ea0
commit e9057aab37
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 893 additions and 18 deletions

View File

@ -330,7 +330,7 @@
<suppress checks="ParameterNumber"
files="(ConsumerGroupMember|GroupMetadataManager).java"/>
<suppress checks="ClassDataAbstractionCouplingCheck"
files="(RecordHelpersTest|GroupMetadataManagerTest|GroupCoordinatorServiceTest).java"/>
files="(RecordHelpersTest|GroupMetadataManager|GroupMetadataManagerTest|GroupCoordinatorServiceTest).java"/>
<suppress checks="JavaNCSS"
files="GroupMetadataManagerTest.java"/>

View File

@ -26,6 +26,7 @@ import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
@ -79,6 +80,7 @@ import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntSupplier;
import java.util.stream.Collectors;
/**
* The group coordinator service.
@ -302,7 +304,8 @@ public class GroupCoordinatorService implements GroupCoordinator {
return responseFuture;
}
runtime.scheduleWriteOperation("generic-group-join",
runtime.scheduleWriteOperation(
"generic-group-join",
topicPartitionFor(request.groupId()),
coordinator -> coordinator.genericGroupJoin(context, request, responseFuture)
).exceptionally(exception -> {
@ -341,7 +344,8 @@ public class GroupCoordinatorService implements GroupCoordinator {
CompletableFuture<SyncGroupResponseData> responseFuture = new CompletableFuture<>();
runtime.scheduleWriteOperation("generic-group-sync",
runtime.scheduleWriteOperation(
"generic-group-sync",
topicPartitionFor(request.groupId()),
coordinator -> coordinator.genericGroupSync(context, request, responseFuture)
).exceptionally(exception -> {
@ -411,9 +415,37 @@ public class GroupCoordinatorService implements GroupCoordinator {
return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
"This API is not implemented yet."
));
if (!isGroupIdNotEmpty(request.groupId())) {
return CompletableFuture.completedFuture(new LeaveGroupResponseData()
.setErrorCode(Errors.INVALID_GROUP_ID.code()));
}
return runtime.scheduleWriteOperation(
"generic-group-leave",
topicPartitionFor(request.groupId()),
coordinator -> coordinator.genericGroupLeave(context, request)
).exceptionally(exception -> {
if (!(exception instanceof KafkaException)) {
log.error("LeaveGroup request {} hit an unexpected exception: {}",
request, exception.getMessage());
}
if (exception instanceof UnknownMemberIdException) {
// Group was not found.
List<LeaveGroupResponseData.MemberResponse> memberResponses = request.members().stream()
.map(member -> new LeaveGroupResponseData.MemberResponse()
.setMemberId(member.memberId())
.setGroupInstanceId(member.groupInstanceId())
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()))
.collect(Collectors.toList());
return new LeaveGroupResponseData()
.setMembers(memberResponses);
}
return new LeaveGroupResponseData()
.setErrorCode(Errors.forException(exception).code());
});
}
/**

View File

@ -22,6 +22,8 @@ import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
@ -306,6 +308,22 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> {
return offsetMetadataManager.commitOffset(context, request);
}
/**
* Handles a LeaveGroup request.
*
* @param context The request context.
* @param request The actual LeaveGroup request.
*
* @return A Result containing the LeaveGroup response and
* a list of records to update the state machine.
*/
public CoordinatorResult<LeaveGroupResponseData, Record> genericGroupLeave(
RequestContext context,
LeaveGroupRequestData request
) throws ApiException {
return groupMetadataManager.genericGroupLeave(context, request);
}
/**
* The coordinator has been loaded. This is used to apply any
* post loading operations (e.g. registering timers).

View File

@ -37,6 +37,10 @@ import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.protocol.Errors;
@ -2904,6 +2908,149 @@ public class GroupMetadataManager {
}
}
/**
* Handle a generic LeaveGroupRequest.
*
* @param context The request context.
* @param request The actual LeaveGroup request.
*
* @return The LeaveGroup response and the GroupMetadata record to append if the group
* no longer has any members.
*/
public CoordinatorResult<LeaveGroupResponseData, Record> genericGroupLeave(
RequestContext context,
LeaveGroupRequestData request
) throws UnknownMemberIdException, GroupIdNotFoundException {
GenericGroup group = getOrMaybeCreateGenericGroup(request.groupId(), false);
if (group.isInState(DEAD)) {
return new CoordinatorResult<>(
Collections.emptyList(),
new LeaveGroupResponseData()
.setErrorCode(COORDINATOR_NOT_AVAILABLE.code())
);
}
List<MemberResponse> memberResponses = new ArrayList<>();
for (MemberIdentity member: request.members()) {
String reason = member.reason() != null ? member.reason() : "not provided";
// The LeaveGroup API allows administrative removal of members by GroupInstanceId
// in which case we expect the MemberId to be undefined.
if (UNKNOWN_MEMBER_ID.equals(member.memberId())) {
if (member.groupInstanceId() != null && group.hasStaticMember(member.groupInstanceId())) {
removeCurrentMemberFromGenericGroup(
group,
group.staticMemberId(member.groupInstanceId()),
reason
);
memberResponses.add(
new MemberResponse()
.setMemberId(member.memberId())
.setGroupInstanceId(member.groupInstanceId())
);
} else {
memberResponses.add(
new MemberResponse()
.setMemberId(member.memberId())
.setGroupInstanceId(member.groupInstanceId())
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
);
}
} else if (group.isPendingMember(member.memberId())) {
group.remove(member.memberId());
timer.cancel(genericGroupHeartbeatKey(group.groupId(), member.memberId()));
log.info("[Group {}] Pending member {} has left group through explicit `LeaveGroup` request; client reason: {}",
group.groupId(), member.memberId(), reason);
memberResponses.add(
new MemberResponse()
.setMemberId(member.memberId())
.setGroupInstanceId(member.groupInstanceId())
);
} else {
try {
group.validateMember(member.memberId(), member.groupInstanceId(), "leave-group");
removeCurrentMemberFromGenericGroup(
group,
member.memberId(),
reason
);
memberResponses.add(
new MemberResponse()
.setMemberId(member.memberId())
.setGroupInstanceId(member.groupInstanceId())
);
} catch (KafkaException e) {
memberResponses.add(
new MemberResponse()
.setMemberId(member.memberId())
.setGroupInstanceId(member.groupInstanceId())
.setErrorCode(Errors.forException(e).code())
);
}
}
}
List<String> validLeaveGroupMembers = memberResponses.stream()
.filter(response -> response.errorCode() == Errors.NONE.code())
.map(MemberResponse::memberId)
.collect(Collectors.toList());
String reason = "explicit `LeaveGroup` request for (" + String.join(", ", validLeaveGroupMembers) + ") members.";
CoordinatorResult<Void, Record> coordinatorResult = EMPTY_RESULT;
if (!validLeaveGroupMembers.isEmpty()) {
switch (group.currentState()) {
case STABLE:
case COMPLETING_REBALANCE:
coordinatorResult = maybePrepareRebalanceOrCompleteJoin(group, reason);
break;
case PREPARING_REBALANCE:
coordinatorResult = maybeCompleteJoinPhase(group);
break;
default:
}
}
return new CoordinatorResult<>(
coordinatorResult.records(),
new LeaveGroupResponseData()
.setMembers(memberResponses),
coordinatorResult.appendFuture()
);
}
/**
* Remove a member from the group. Cancel member's heartbeat, and prepare rebalance
* or complete the join phase if necessary.
*
* @param group The generic group.
* @param memberId The member id.
* @param reason The reason for the LeaveGroup request.
*
*/
private void removeCurrentMemberFromGenericGroup(
GenericGroup group,
String memberId,
String reason
) {
GenericGroupMember member = group.member(memberId);
timer.cancel(genericGroupHeartbeatKey(group.groupId(), memberId));
log.info("[Group {}] Member {} has left group through explicit `LeaveGroup` request; client reason: {}",
group.groupId(), memberId, reason);
// New members may timeout with a pending JoinGroup while the group is still rebalancing, so we have
// to invoke the callback before removing the member. We return UNKNOWN_MEMBER_ID so that the consumer
// will retry the JoinGroup request if is still active.
group.completeJoinFuture(
member,
new JoinGroupResponseData()
.setMemberId(UNKNOWN_MEMBER_ID)
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
);
group.remove(member.memberId());
}
/**
* Checks whether the given protocol type or name in the request is inconsistent with the group's.
*

View File

@ -692,10 +692,10 @@ public class GenericGroup implements Group {
}
/**
* @return all static members in the group.
* @return the ids of all static members in the group.
*/
public Set<String> allStaticMemberIds() {
return staticMembers.keySet();
return new HashSet<>(staticMembers.values());
}
// For testing only.

View File

@ -41,7 +41,7 @@ public class CoordinatorResult<T, U> {
/**
* The future to complete once the records are committed.
*/
private final CompletableFuture<T> appendFuture;
private final CompletableFuture<Void> appendFuture;
/**
* Constructs a Result with records and a response.
@ -64,7 +64,7 @@ public class CoordinatorResult<T, U> {
*/
public CoordinatorResult(
List<U> records,
CompletableFuture<T> appendFuture
CompletableFuture<Void> appendFuture
) {
this(records, null, appendFuture);
}
@ -79,7 +79,7 @@ public class CoordinatorResult<T, U> {
public CoordinatorResult(
List<U> records,
T response,
CompletableFuture<T> appendFuture
CompletableFuture<Void> appendFuture
) {
this.records = Objects.requireNonNull(records);
this.response = response;
@ -114,7 +114,7 @@ public class CoordinatorResult<T, U> {
/**
* @return The append-future.
*/
public CompletableFuture<T> appendFuture() {
public CompletableFuture<Void> appendFuture() {
return appendFuture;
}

View File

@ -698,10 +698,10 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
*/
@Override
public void complete(Throwable exception) {
CompletableFuture<T> appendFuture = result != null ? result.appendFuture() : null;
CompletableFuture<Void> appendFuture = result != null ? result.appendFuture() : null;
if (exception == null) {
if (appendFuture != null) result.appendFuture().complete(result.response());
if (appendFuture != null) result.appendFuture().complete(null);
future.complete(result.response());
} else {
if (appendFuture != null) result.appendFuture().completeExceptionally(exception);

View File

@ -28,6 +28,7 @@ import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
@ -37,6 +38,8 @@ import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.network.ClientInformation;
@ -62,6 +65,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatchers;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.OptionalInt;
import java.util.Properties;
@ -701,4 +705,87 @@ public class GroupCoordinatorServiceTest {
assertEquals(response, future.get(5, TimeUnit.SECONDS));
}
@Test
public void testLeaveGroup() throws Exception {
CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
);
LeaveGroupRequestData request = new LeaveGroupRequestData()
.setGroupId("foo");
service.startup(() -> 1);
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("generic-group-leave"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(
new LeaveGroupResponseData()
));
CompletableFuture<LeaveGroupResponseData> future = service.leaveGroup(
requestContext(ApiKeys.LEAVE_GROUP),
request
);
assertTrue(future.isDone());
assertEquals(new LeaveGroupResponseData(), future.get());
}
@Test
public void testLeaveGroupThrowsUnknownMemberIdException() throws Exception {
CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
);
LeaveGroupRequestData request = new LeaveGroupRequestData()
.setGroupId("foo")
.setMembers(Arrays.asList(
new LeaveGroupRequestData.MemberIdentity()
.setMemberId("member-1")
.setGroupInstanceId("instance-1"),
new LeaveGroupRequestData.MemberIdentity()
.setMemberId("member-2")
.setGroupInstanceId("instance-2")
));
service.startup(() -> 1);
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("generic-group-leave"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.any()
)).thenReturn(FutureUtils.failedFuture(
new UnknownMemberIdException()
));
CompletableFuture<LeaveGroupResponseData> future = service.leaveGroup(
requestContext(ApiKeys.LEAVE_GROUP),
request
);
assertTrue(future.isDone());
LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
.setErrorCode(Errors.NONE.code())
.setMembers(Arrays.asList(
new LeaveGroupResponseData.MemberResponse()
.setMemberId("member-1")
.setGroupInstanceId("instance-1")
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()),
new LeaveGroupResponseData.MemberResponse()
.setMemberId("member-2")
.setGroupInstanceId("instance-2")
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
));
assertEquals(expectedResponse, future.get());
}
}

View File

@ -38,6 +38,9 @@ import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment;
import org.apache.kafka.common.message.SyncGroupResponseData;
@ -738,7 +741,6 @@ public class GroupMetadataManagerTest {
return new SyncResult(responseFuture, coordinatorResult);
}
public RebalanceResult staticMembersJoinAndRebalance(
String groupId,
String leaderInstanceId,
@ -854,7 +856,7 @@ public class GroupMetadataManagerTest {
);
}
public JoinGroupResponseData setupGroupWithPendingMember(GenericGroup group) throws Exception {
public PendingMemberGroupResult setupGroupWithPendingMember(GenericGroup group) throws Exception {
// Add the first member
JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder()
.withGroupId("group-id")
@ -957,9 +959,14 @@ public class GroupMetadataManagerTest {
assertTrue(followerJoinResult.records.isEmpty());
assertFalse(followerJoinResult.joinFuture.isDone());
assertTrue(group.isInState(PREPARING_REBALANCE));
assertEquals(2, group.size());
assertEquals(1, group.numPendingJoinMembers());
return pendingMemberJoinResult.joinFuture.get();
return new PendingMemberGroupResult(
leaderJoinResponse.memberId(),
followerId,
pendingMemberJoinResult.joinFuture.get()
);
}
public void verifySessionExpiration(GenericGroup group, int timeoutMs) {
@ -1099,6 +1106,28 @@ public class GroupMetadataManagerTest {
return joinResponses;
}
public CoordinatorResult<LeaveGroupResponseData, Record> sendGenericGroupLeave(
LeaveGroupRequestData request
) {
RequestContext context = new RequestContext(
new RequestHeader(
ApiKeys.LEAVE_GROUP,
ApiKeys.LEAVE_GROUP.latestVersion(),
"client",
0
),
"1",
InetAddress.getLoopbackAddress(),
KafkaPrincipal.ANONYMOUS,
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
SecurityProtocol.PLAINTEXT,
ClientInformation.EMPTY,
false
);
return groupMetadataManager.genericGroupLeave(context, request);
}
private ApiMessage messageOrNull(ApiMessageAndVersion apiMessageAndVersion) {
if (apiMessageAndVersion == null) {
return null;
@ -7730,7 +7759,7 @@ public class GroupMetadataManagerTest {
// Set up a group in with a pending member. The test checks if the pending member joining
// completes the rebalancing operation
JoinGroupResponseData pendingMemberResponse = context.setupGroupWithPendingMember(group);
JoinGroupResponseData pendingMemberResponse = context.setupGroupWithPendingMember(group).pendingMemberResponse;
// Compete join group for the pending member
JoinGroupRequestData request = new JoinGroupRequestBuilder()
@ -8747,6 +8776,552 @@ public class GroupMetadataManagerTest {
assertEquals(expectedGroupInstanceIds, groupInstanceIds);
}
@Test
public void testGroupStuckInRebalanceTimeoutDueToNonjoinedStaticMember() throws Exception {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.build();
int longSessionTimeoutMs = 10000;
int rebalanceTimeoutMs = 5000;
RebalanceResult rebalanceResult = context.staticMembersJoinAndRebalance(
"group-id",
"leader-instance-id",
"follower-instance-id",
rebalanceTimeoutMs,
longSessionTimeoutMs
);
GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup("group-id", false);
// New member joins
JoinResult joinResult = context.sendGenericGroupJoin(
new JoinGroupRequestBuilder()
.withGroupId("group-id")
.withMemberId(UNKNOWN_MEMBER_ID)
.withProtocolSuperset()
.withSessionTimeoutMs(longSessionTimeoutMs)
.build()
);
// The new dynamic member has been elected as leader
assertNoOrEmptyResult(context.sleep(rebalanceTimeoutMs));
assertTrue(joinResult.joinFuture.isDone());
assertEquals(Errors.NONE.code(), joinResult.joinFuture.get().errorCode());
assertEquals(joinResult.joinFuture.get().leader(), joinResult.joinFuture.get().memberId());
assertEquals(3, joinResult.joinFuture.get().members().size());
assertEquals(2, joinResult.joinFuture.get().generationId());
assertTrue(group.isInState(COMPLETING_REBALANCE));
assertEquals(
mkSet(rebalanceResult.leaderId, rebalanceResult.followerId, joinResult.joinFuture.get().memberId()),
group.allMemberIds()
);
assertEquals(
mkSet(rebalanceResult.leaderId, rebalanceResult.followerId),
group.allStaticMemberIds()
);
assertEquals(
mkSet(joinResult.joinFuture.get().memberId()),
group.allDynamicMemberIds()
);
// Send a special leave group request from static follower, moving group towards PreparingRebalance
CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = context.sendGenericGroupLeave(
new LeaveGroupRequestData()
.setGroupId("group-id")
.setMembers(Collections.singletonList(
new MemberIdentity()
.setMemberId(rebalanceResult.followerId)
.setGroupInstanceId("follower-instance-id")
))
);
LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
.setMembers(Collections.singletonList(
new LeaveGroupResponseData.MemberResponse()
.setMemberId(rebalanceResult.followerId)
.setGroupInstanceId("follower-instance-id")));
assertEquals(expectedResponse, leaveResult.response());
assertTrue(group.isInState(PREPARING_REBALANCE));
context.sleep(rebalanceTimeoutMs);
// Only static leader is maintained, and group is stuck at PreparingRebalance stage
assertTrue(group.allDynamicMemberIds().isEmpty());
assertEquals(Collections.singleton(rebalanceResult.leaderId), group.allMemberIds());
assertTrue(group.allDynamicMemberIds().isEmpty());
assertEquals(2, group.generationId());
assertTrue(group.isInState(PREPARING_REBALANCE));
}
@Test
public void testPendingMembersLeaveGroup() throws Exception {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.build();
GenericGroup group = context.createGenericGroup("group-id");
JoinGroupResponseData pendingJoinResponse = context.setupGroupWithPendingMember(group).pendingMemberResponse;
CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = context.sendGenericGroupLeave(
new LeaveGroupRequestData()
.setGroupId("group-id")
.setMembers(Collections.singletonList(
new MemberIdentity()
.setMemberId(pendingJoinResponse.memberId())
))
);
LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
.setMembers(Collections.singletonList(
new LeaveGroupResponseData.MemberResponse()
.setGroupInstanceId(null)
.setMemberId(pendingJoinResponse.memberId())));
assertEquals(expectedResponse, leaveResult.response());
assertTrue(leaveResult.records().isEmpty());
assertTrue(group.isInState(COMPLETING_REBALANCE));
assertEquals(2, group.allMembers().size());
assertEquals(2, group.allDynamicMemberIds().size());
assertEquals(0, group.numPendingJoinMembers());
}
@Test
public void testLeaveGroupInvalidGroup() {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.build();
context.createGenericGroup("group-id");
assertThrows(UnknownMemberIdException.class, () -> context.sendGenericGroupLeave(
new LeaveGroupRequestData()
.setGroupId("invalid-group-id")
));
}
@Test
public void testLeaveGroupUnknownGroup() {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.build();
assertThrows(UnknownMemberIdException.class, () -> context.sendGenericGroupLeave(
new LeaveGroupRequestData()
.setGroupId("unknown-group-id")
.setMembers(Collections.singletonList(
new MemberIdentity()
.setMemberId("member-id")
))
));
}
@Test
public void testLeaveGroupUnknownMemberIdExistingGroup() throws Exception {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.build();
context.createGenericGroup("group-id");
context.joinGenericGroupAsDynamicMemberAndCompleteJoin(
new JoinGroupRequestBuilder()
.withGroupId("group-id")
.withMemberId(UNKNOWN_MEMBER_ID)
.withDefaultProtocolTypeAndProtocols()
.build()
);
CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = context.sendGenericGroupLeave(
new LeaveGroupRequestData()
.setGroupId("group-id")
.setMembers(Collections.singletonList(
new MemberIdentity()
.setMemberId("unknown-member-id")
))
);
LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
.setMembers(Collections.singletonList(
new LeaveGroupResponseData.MemberResponse()
.setGroupInstanceId(null)
.setMemberId("unknown-member-id")
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())));
assertEquals(expectedResponse, leaveResult.response());
assertTrue(leaveResult.records().isEmpty());
}
@Test
public void testLeaveDeadGroup() {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.build();
GenericGroup group = context.createGenericGroup("group-id");
group.transitionTo(DEAD);
CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = context.sendGenericGroupLeave(
new LeaveGroupRequestData()
.setGroupId("group-id")
.setMembers(Collections.singletonList(
new MemberIdentity()
.setMemberId("member-id")
))
);
LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
assertEquals(expectedResponse, leaveResult.response());
assertTrue(leaveResult.records().isEmpty());
}
@Test
public void testValidLeaveGroup() throws Exception {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.build();
GenericGroup group = context.createGenericGroup("group-id");
JoinGroupResponseData joinResponse = context.joinGenericGroupAsDynamicMemberAndCompleteJoin(
new JoinGroupRequestBuilder()
.withGroupId("group-id")
.withMemberId(UNKNOWN_MEMBER_ID)
.withDefaultProtocolTypeAndProtocols()
.build()
);
// Dynamic member leaves. The group becomes empty.
CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = context.sendGenericGroupLeave(
new LeaveGroupRequestData()
.setGroupId("group-id")
.setMembers(Collections.singletonList(
new MemberIdentity()
.setMemberId(joinResponse.memberId())
))
);
assertEquals(
Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())),
leaveResult.records()
);
// Simulate a successful write to the log.
leaveResult.appendFuture().complete(null);
LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
.setMembers(Collections.singletonList(
new LeaveGroupResponseData.MemberResponse()
.setGroupInstanceId(null)
.setMemberId(joinResponse.memberId())));
assertEquals(expectedResponse, leaveResult.response());
assertTrue(group.isInState(EMPTY));
assertEquals(2, group.generationId());
}
@Test
public void testLeaveGroupWithFencedInstanceId() throws Exception {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.build();
context.createGenericGroup("group-id");
context.joinGenericGroupAndCompleteJoin(
new JoinGroupRequestBuilder()
.withGroupId("group-id")
.withGroupInstanceId("group-instance-id")
.withMemberId(UNKNOWN_MEMBER_ID)
.withDefaultProtocolTypeAndProtocols()
.build(),
true,
true
);
CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = context.sendGenericGroupLeave(
new LeaveGroupRequestData()
.setGroupId("group-id")
.setMembers(Collections.singletonList(
new MemberIdentity()
.setGroupInstanceId("group-instance-id")
.setMemberId("other-member-id") // invalid member id
))
);
LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
.setMembers(Collections.singletonList(
new LeaveGroupResponseData.MemberResponse()
.setGroupInstanceId("group-instance-id")
.setMemberId("other-member-id")
.setErrorCode(Errors.FENCED_INSTANCE_ID.code())));
assertEquals(expectedResponse, leaveResult.response());
assertTrue(leaveResult.records().isEmpty());
}
@Test
public void testLeaveGroupStaticMemberWithUnknownMemberId() throws Exception {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.build();
context.createGenericGroup("group-id");
context.joinGenericGroupAndCompleteJoin(
new JoinGroupRequestBuilder()
.withGroupId("group-id")
.withGroupInstanceId("group-instance-id")
.withMemberId(UNKNOWN_MEMBER_ID)
.withDefaultProtocolTypeAndProtocols()
.build(),
true,
true
);
// Having unknown member id will not affect the request processing due to valid group instance id.
CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = context.sendGenericGroupLeave(
new LeaveGroupRequestData()
.setGroupId("group-id")
.setMembers(Collections.singletonList(
new MemberIdentity()
.setGroupInstanceId("group-instance-id")
.setMemberId(UNKNOWN_MEMBER_ID)
))
);
LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
.setMembers(Collections.singletonList(
new LeaveGroupResponseData.MemberResponse()
.setGroupInstanceId("group-instance-id")));
assertEquals(expectedResponse, leaveResult.response());
}
@Test
public void testStaticMembersValidBatchLeaveGroup() throws Exception {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.build();
context.staticMembersJoinAndRebalance(
"group-id",
"leader-instance-id",
"follower-instance-id"
);
CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = context.sendGenericGroupLeave(
new LeaveGroupRequestData()
.setGroupId("group-id")
.setMembers(
Arrays.asList(
new MemberIdentity()
.setGroupInstanceId("leader-instance-id"),
new MemberIdentity()
.setGroupInstanceId("follower-instance-id")
)
)
);
LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
.setMembers(Arrays.asList(
new LeaveGroupResponseData.MemberResponse()
.setGroupInstanceId("leader-instance-id"),
new LeaveGroupResponseData.MemberResponse()
.setGroupInstanceId("follower-instance-id")));
assertEquals(expectedResponse, leaveResult.response());
}
@Test
public void testStaticMembersLeaveUnknownGroup() throws Exception {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.build();
context.staticMembersJoinAndRebalance(
"group-id",
"leader-instance-id",
"follower-instance-id"
);
assertThrows(UnknownMemberIdException.class, () -> context.sendGenericGroupLeave(
new LeaveGroupRequestData()
.setGroupId("invalid-group-id") // Invalid group id
.setMembers(
Arrays.asList(
new MemberIdentity()
.setGroupInstanceId("leader-instance-id"),
new MemberIdentity()
.setGroupInstanceId("follower-instance-id")
)
)
));
}
@Test
public void testStaticMembersFencedInstanceBatchLeaveGroup() throws Exception {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.build();
context.staticMembersJoinAndRebalance(
"group-id",
"leader-instance-id",
"follower-instance-id"
);
CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = context.sendGenericGroupLeave(
new LeaveGroupRequestData()
.setGroupId("group-id")
.setMembers(
Arrays.asList(
new MemberIdentity()
.setGroupInstanceId("leader-instance-id"),
new MemberIdentity()
.setGroupInstanceId("follower-instance-id")
.setMemberId("invalid-member-id")
)
)
);
LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
.setMembers(Arrays.asList(
new LeaveGroupResponseData.MemberResponse()
.setGroupInstanceId("leader-instance-id"),
new LeaveGroupResponseData.MemberResponse()
.setGroupInstanceId("follower-instance-id")
.setMemberId("invalid-member-id")
.setErrorCode(Errors.FENCED_INSTANCE_ID.code())));
assertEquals(expectedResponse, leaveResult.response());
}
@Test
public void testStaticMembersUnknownInstanceBatchLeaveGroup() throws Exception {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.build();
context.staticMembersJoinAndRebalance(
"group-id",
"leader-instance-id",
"follower-instance-id"
);
CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = context.sendGenericGroupLeave(
new LeaveGroupRequestData()
.setGroupId("group-id")
.setMembers(
Arrays.asList(
new MemberIdentity()
.setGroupInstanceId("unknown-instance-id"), // Unknown instance id
new MemberIdentity()
.setGroupInstanceId("follower-instance-id")
)
)
);
LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
.setMembers(Arrays.asList(
new LeaveGroupResponseData.MemberResponse()
.setGroupInstanceId("unknown-instance-id")
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()),
new LeaveGroupResponseData.MemberResponse()
.setGroupInstanceId("follower-instance-id")));
assertEquals(expectedResponse, leaveResult.response());
assertTrue(leaveResult.records().isEmpty());
}
@Test
public void testPendingMemberBatchLeaveGroup() throws Exception {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.build();
GenericGroup group = context.createGenericGroup("group-id");
JoinGroupResponseData pendingJoinResponse = context.setupGroupWithPendingMember(group).pendingMemberResponse;
CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = context.sendGenericGroupLeave(
new LeaveGroupRequestData()
.setGroupId("group-id")
.setMembers(
Arrays.asList(
new MemberIdentity()
.setGroupInstanceId("unknown-instance-id"), // Unknown instance id
new MemberIdentity()
.setMemberId(pendingJoinResponse.memberId())
)
)
);
LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
.setMembers(Arrays.asList(
new LeaveGroupResponseData.MemberResponse()
.setGroupInstanceId("unknown-instance-id")
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()),
new LeaveGroupResponseData.MemberResponse()
.setGroupInstanceId(null)
.setMemberId(pendingJoinResponse.memberId())));
assertEquals(expectedResponse, leaveResult.response());
}
@Test
public void testJoinedMemberPendingMemberBatchLeaveGroup() throws Exception {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.build();
GenericGroup group = context.createGenericGroup("group-id");
PendingMemberGroupResult pendingMemberGroupResult = context.setupGroupWithPendingMember(group);
CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = context.sendGenericGroupLeave(
new LeaveGroupRequestData()
.setGroupId("group-id")
.setMembers(
Arrays.asList(
new MemberIdentity()
.setMemberId(pendingMemberGroupResult.leaderId),
new MemberIdentity()
.setMemberId(pendingMemberGroupResult.followerId),
new MemberIdentity()
.setMemberId(pendingMemberGroupResult.pendingMemberResponse.memberId())
)
)
);
LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
.setMembers(Arrays.asList(
new LeaveGroupResponseData.MemberResponse()
.setGroupInstanceId(null)
.setMemberId(pendingMemberGroupResult.leaderId),
new LeaveGroupResponseData.MemberResponse()
.setGroupInstanceId(null)
.setMemberId(pendingMemberGroupResult.followerId),
new LeaveGroupResponseData.MemberResponse()
.setGroupInstanceId(null)
.setMemberId(pendingMemberGroupResult.pendingMemberResponse.memberId())));
assertEquals(expectedResponse, leaveResult.response());
}
@Test
public void testJoinedMemberPendingMemberBatchLeaveGroupWithUnknownMember() throws Exception {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.build();
GenericGroup group = context.createGenericGroup("group-id");
PendingMemberGroupResult pendingMemberGroupResult = context.setupGroupWithPendingMember(group);
CoordinatorResult<LeaveGroupResponseData, Record> leaveResult = context.sendGenericGroupLeave(
new LeaveGroupRequestData()
.setGroupId("group-id")
.setMembers(
Arrays.asList(
new MemberIdentity()
.setMemberId(pendingMemberGroupResult.leaderId),
new MemberIdentity()
.setMemberId(pendingMemberGroupResult.followerId),
new MemberIdentity()
.setMemberId(pendingMemberGroupResult.pendingMemberResponse.memberId()),
new MemberIdentity()
.setMemberId("unknown-member-id")
)
)
);
LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
.setMembers(Arrays.asList(
new LeaveGroupResponseData.MemberResponse()
.setGroupInstanceId(null)
.setMemberId(pendingMemberGroupResult.leaderId),
new LeaveGroupResponseData.MemberResponse()
.setGroupInstanceId(null)
.setMemberId(pendingMemberGroupResult.followerId),
new LeaveGroupResponseData.MemberResponse()
.setGroupInstanceId(null)
.setMemberId(pendingMemberGroupResult.pendingMemberResponse.memberId()),
new LeaveGroupResponseData.MemberResponse()
.setGroupInstanceId(null)
.setMemberId("unknown-member-id")
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())));
assertEquals(expectedResponse, leaveResult.response());
}
private static void assertNoOrEmptyResult(List<ExpiredTimeout<Void, Record>> timeouts) {
assertTrue(timeouts.size() <= 1);
timeouts.forEach(timeout -> assertEquals(EMPTY_RESULT, timeout.result));
@ -8970,6 +9545,22 @@ public class GroupMetadataManagerTest {
}
}
private static class PendingMemberGroupResult {
String leaderId;
String followerId;
JoinGroupResponseData pendingMemberResponse;
public PendingMemberGroupResult(
String leaderId,
String followerId,
JoinGroupResponseData pendingMemberResponse
) {
this.leaderId = leaderId;
this.followerId = followerId;
this.pendingMemberResponse = pendingMemberResponse;
}
}
private static class JoinResult {
CompletableFuture<JoinGroupResponseData> joinFuture;
List<Record> records;