diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 87f10655bd7..6e946b1bead 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -330,7 +330,7 @@
+ files="(RecordHelpersTest|GroupMetadataManager|GroupMetadataManagerTest|GroupCoordinatorServiceTest).java"/>
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index f018fa0ae6e..ac3aa9d45b4 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
@@ -79,6 +80,7 @@ import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntSupplier;
+import java.util.stream.Collectors;
/**
* The group coordinator service.
@@ -302,7 +304,8 @@ public class GroupCoordinatorService implements GroupCoordinator {
return responseFuture;
}
- runtime.scheduleWriteOperation("generic-group-join",
+ runtime.scheduleWriteOperation(
+ "generic-group-join",
topicPartitionFor(request.groupId()),
coordinator -> coordinator.genericGroupJoin(context, request, responseFuture)
).exceptionally(exception -> {
@@ -341,7 +344,8 @@ public class GroupCoordinatorService implements GroupCoordinator {
CompletableFuture responseFuture = new CompletableFuture<>();
- runtime.scheduleWriteOperation("generic-group-sync",
+ runtime.scheduleWriteOperation(
+ "generic-group-sync",
topicPartitionFor(request.groupId()),
coordinator -> coordinator.genericGroupSync(context, request, responseFuture)
).exceptionally(exception -> {
@@ -411,9 +415,37 @@ public class GroupCoordinatorService implements GroupCoordinator {
return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
- return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
- "This API is not implemented yet."
- ));
+ if (!isGroupIdNotEmpty(request.groupId())) {
+ return CompletableFuture.completedFuture(new LeaveGroupResponseData()
+ .setErrorCode(Errors.INVALID_GROUP_ID.code()));
+ }
+
+ return runtime.scheduleWriteOperation(
+ "generic-group-leave",
+ topicPartitionFor(request.groupId()),
+ coordinator -> coordinator.genericGroupLeave(context, request)
+ ).exceptionally(exception -> {
+ if (!(exception instanceof KafkaException)) {
+ log.error("LeaveGroup request {} hit an unexpected exception: {}",
+ request, exception.getMessage());
+ }
+
+ if (exception instanceof UnknownMemberIdException) {
+ // Group was not found.
+ List memberResponses = request.members().stream()
+ .map(member -> new LeaveGroupResponseData.MemberResponse()
+ .setMemberId(member.memberId())
+ .setGroupInstanceId(member.groupInstanceId())
+ .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()))
+ .collect(Collectors.toList());
+
+ return new LeaveGroupResponseData()
+ .setMembers(memberResponses);
+ }
+
+ return new LeaveGroupResponseData()
+ .setErrorCode(Errors.forException(exception).code());
+ });
}
/**
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 94a4329f54c..ed68bb22c5a 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -22,6 +22,8 @@ import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
@@ -306,6 +308,22 @@ public class GroupCoordinatorShard implements CoordinatorShard {
return offsetMetadataManager.commitOffset(context, request);
}
+ /**
+ * Handles a LeaveGroup request.
+ *
+ * @param context The request context.
+ * @param request The actual LeaveGroup request.
+ *
+ * @return A Result containing the LeaveGroup response and
+ * a list of records to update the state machine.
+ */
+ public CoordinatorResult genericGroupLeave(
+ RequestContext context,
+ LeaveGroupRequestData request
+ ) throws ApiException {
+ return groupMetadataManager.genericGroupLeave(context, request);
+ }
+
/**
* The coordinator has been loaded. This is used to apply any
* post loading operations (e.g. registering timers).
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index fce74d58561..a174a0fb1ff 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -37,6 +37,10 @@ import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
+import org.apache.kafka.common.message.LeaveGroupResponseData;
+import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.protocol.Errors;
@@ -2904,6 +2908,149 @@ public class GroupMetadataManager {
}
}
+ /**
+ * Handle a generic LeaveGroupRequest.
+ *
+ * @param context The request context.
+ * @param request The actual LeaveGroup request.
+ *
+ * @return The LeaveGroup response and the GroupMetadata record to append if the group
+ * no longer has any members.
+ */
+ public CoordinatorResult genericGroupLeave(
+ RequestContext context,
+ LeaveGroupRequestData request
+ ) throws UnknownMemberIdException, GroupIdNotFoundException {
+ GenericGroup group = getOrMaybeCreateGenericGroup(request.groupId(), false);
+ if (group.isInState(DEAD)) {
+ return new CoordinatorResult<>(
+ Collections.emptyList(),
+ new LeaveGroupResponseData()
+ .setErrorCode(COORDINATOR_NOT_AVAILABLE.code())
+ );
+ }
+
+ List memberResponses = new ArrayList<>();
+
+ for (MemberIdentity member: request.members()) {
+ String reason = member.reason() != null ? member.reason() : "not provided";
+ // The LeaveGroup API allows administrative removal of members by GroupInstanceId
+ // in which case we expect the MemberId to be undefined.
+ if (UNKNOWN_MEMBER_ID.equals(member.memberId())) {
+ if (member.groupInstanceId() != null && group.hasStaticMember(member.groupInstanceId())) {
+ removeCurrentMemberFromGenericGroup(
+ group,
+ group.staticMemberId(member.groupInstanceId()),
+ reason
+ );
+ memberResponses.add(
+ new MemberResponse()
+ .setMemberId(member.memberId())
+ .setGroupInstanceId(member.groupInstanceId())
+ );
+ } else {
+ memberResponses.add(
+ new MemberResponse()
+ .setMemberId(member.memberId())
+ .setGroupInstanceId(member.groupInstanceId())
+ .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
+ );
+ }
+ } else if (group.isPendingMember(member.memberId())) {
+ group.remove(member.memberId());
+ timer.cancel(genericGroupHeartbeatKey(group.groupId(), member.memberId()));
+ log.info("[Group {}] Pending member {} has left group through explicit `LeaveGroup` request; client reason: {}",
+ group.groupId(), member.memberId(), reason);
+
+ memberResponses.add(
+ new MemberResponse()
+ .setMemberId(member.memberId())
+ .setGroupInstanceId(member.groupInstanceId())
+ );
+ } else {
+ try {
+ group.validateMember(member.memberId(), member.groupInstanceId(), "leave-group");
+ removeCurrentMemberFromGenericGroup(
+ group,
+ member.memberId(),
+ reason
+ );
+ memberResponses.add(
+ new MemberResponse()
+ .setMemberId(member.memberId())
+ .setGroupInstanceId(member.groupInstanceId())
+ );
+ } catch (KafkaException e) {
+ memberResponses.add(
+ new MemberResponse()
+ .setMemberId(member.memberId())
+ .setGroupInstanceId(member.groupInstanceId())
+ .setErrorCode(Errors.forException(e).code())
+ );
+ }
+ }
+ }
+
+ List validLeaveGroupMembers = memberResponses.stream()
+ .filter(response -> response.errorCode() == Errors.NONE.code())
+ .map(MemberResponse::memberId)
+ .collect(Collectors.toList());
+
+ String reason = "explicit `LeaveGroup` request for (" + String.join(", ", validLeaveGroupMembers) + ") members.";
+ CoordinatorResult coordinatorResult = EMPTY_RESULT;
+
+ if (!validLeaveGroupMembers.isEmpty()) {
+ switch (group.currentState()) {
+ case STABLE:
+ case COMPLETING_REBALANCE:
+ coordinatorResult = maybePrepareRebalanceOrCompleteJoin(group, reason);
+ break;
+ case PREPARING_REBALANCE:
+ coordinatorResult = maybeCompleteJoinPhase(group);
+ break;
+ default:
+ }
+ }
+
+ return new CoordinatorResult<>(
+ coordinatorResult.records(),
+ new LeaveGroupResponseData()
+ .setMembers(memberResponses),
+ coordinatorResult.appendFuture()
+ );
+ }
+
+ /**
+ * Remove a member from the group. Cancel member's heartbeat, and prepare rebalance
+ * or complete the join phase if necessary.
+ *
+ * @param group The generic group.
+ * @param memberId The member id.
+ * @param reason The reason for the LeaveGroup request.
+ *
+ */
+ private void removeCurrentMemberFromGenericGroup(
+ GenericGroup group,
+ String memberId,
+ String reason
+ ) {
+ GenericGroupMember member = group.member(memberId);
+ timer.cancel(genericGroupHeartbeatKey(group.groupId(), memberId));
+ log.info("[Group {}] Member {} has left group through explicit `LeaveGroup` request; client reason: {}",
+ group.groupId(), memberId, reason);
+
+ // New members may timeout with a pending JoinGroup while the group is still rebalancing, so we have
+ // to invoke the callback before removing the member. We return UNKNOWN_MEMBER_ID so that the consumer
+ // will retry the JoinGroup request if is still active.
+ group.completeJoinFuture(
+ member,
+ new JoinGroupResponseData()
+ .setMemberId(UNKNOWN_MEMBER_ID)
+ .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
+ );
+ group.remove(member.memberId());
+ }
+
/**
* Checks whether the given protocol type or name in the request is inconsistent with the group's.
*
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
index 3347970aaf0..d163f385fe0 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java
@@ -692,10 +692,10 @@ public class GenericGroup implements Group {
}
/**
- * @return all static members in the group.
+ * @return the ids of all static members in the group.
*/
public Set allStaticMemberIds() {
- return staticMembers.keySet();
+ return new HashSet<>(staticMembers.values());
}
// For testing only.
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorResult.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorResult.java
index 92907797581..53ce2f886de 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorResult.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorResult.java
@@ -41,7 +41,7 @@ public class CoordinatorResult {
/**
* The future to complete once the records are committed.
*/
- private final CompletableFuture appendFuture;
+ private final CompletableFuture appendFuture;
/**
* Constructs a Result with records and a response.
@@ -64,7 +64,7 @@ public class CoordinatorResult {
*/
public CoordinatorResult(
List records,
- CompletableFuture appendFuture
+ CompletableFuture appendFuture
) {
this(records, null, appendFuture);
}
@@ -79,7 +79,7 @@ public class CoordinatorResult {
public CoordinatorResult(
List records,
T response,
- CompletableFuture appendFuture
+ CompletableFuture appendFuture
) {
this.records = Objects.requireNonNull(records);
this.response = response;
@@ -114,7 +114,7 @@ public class CoordinatorResult {
/**
* @return The append-future.
*/
- public CompletableFuture appendFuture() {
+ public CompletableFuture appendFuture() {
return appendFuture;
}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
index 9ae0d1fc42a..9cf277d80c4 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
@@ -698,10 +698,10 @@ public class CoordinatorRuntime, U> implements Aut
*/
@Override
public void complete(Throwable exception) {
- CompletableFuture appendFuture = result != null ? result.appendFuture() : null;
+ CompletableFuture appendFuture = result != null ? result.appendFuture() : null;
if (exception == null) {
- if (appendFuture != null) result.appendFuture().complete(result.response());
+ if (appendFuture != null) result.appendFuture().complete(null);
future.complete(result.response());
} else {
if (appendFuture != null) result.appendFuture().completeExceptionally(exception);
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index c5e0c97555b..61af5582824 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
@@ -37,6 +38,8 @@ import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.network.ClientInformation;
@@ -62,6 +65,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatchers;
import java.net.InetAddress;
+import java.util.Arrays;
import java.util.Collections;
import java.util.OptionalInt;
import java.util.Properties;
@@ -701,4 +705,87 @@ public class GroupCoordinatorServiceTest {
assertEquals(response, future.get(5, TimeUnit.SECONDS));
}
+
+ @Test
+ public void testLeaveGroup() throws Exception {
+ CoordinatorRuntime runtime = mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+
+ LeaveGroupRequestData request = new LeaveGroupRequestData()
+ .setGroupId("foo");
+
+ service.startup(() -> 1);
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("generic-group-leave"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(
+ new LeaveGroupResponseData()
+ ));
+
+ CompletableFuture future = service.leaveGroup(
+ requestContext(ApiKeys.LEAVE_GROUP),
+ request
+ );
+
+ assertTrue(future.isDone());
+ assertEquals(new LeaveGroupResponseData(), future.get());
+ }
+
+ @Test
+ public void testLeaveGroupThrowsUnknownMemberIdException() throws Exception {
+ CoordinatorRuntime runtime = mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+
+ LeaveGroupRequestData request = new LeaveGroupRequestData()
+ .setGroupId("foo")
+ .setMembers(Arrays.asList(
+ new LeaveGroupRequestData.MemberIdentity()
+ .setMemberId("member-1")
+ .setGroupInstanceId("instance-1"),
+ new LeaveGroupRequestData.MemberIdentity()
+ .setMemberId("member-2")
+ .setGroupInstanceId("instance-2")
+ ));
+
+ service.startup(() -> 1);
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("generic-group-leave"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(FutureUtils.failedFuture(
+ new UnknownMemberIdException()
+ ));
+
+ CompletableFuture future = service.leaveGroup(
+ requestContext(ApiKeys.LEAVE_GROUP),
+ request
+ );
+
+ assertTrue(future.isDone());
+ LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setMembers(Arrays.asList(
+ new LeaveGroupResponseData.MemberResponse()
+ .setMemberId("member-1")
+ .setGroupInstanceId("instance-1")
+ .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()),
+ new LeaveGroupResponseData.MemberResponse()
+ .setMemberId("member-2")
+ .setGroupInstanceId("instance-2")
+ .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
+ ));
+
+ assertEquals(expectedResponse, future.get());
+ }
}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index ef553db41b9..3a735db37e3 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -38,6 +38,9 @@ import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
+import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment;
import org.apache.kafka.common.message.SyncGroupResponseData;
@@ -738,7 +741,6 @@ public class GroupMetadataManagerTest {
return new SyncResult(responseFuture, coordinatorResult);
}
-
public RebalanceResult staticMembersJoinAndRebalance(
String groupId,
String leaderInstanceId,
@@ -854,7 +856,7 @@ public class GroupMetadataManagerTest {
);
}
- public JoinGroupResponseData setupGroupWithPendingMember(GenericGroup group) throws Exception {
+ public PendingMemberGroupResult setupGroupWithPendingMember(GenericGroup group) throws Exception {
// Add the first member
JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder()
.withGroupId("group-id")
@@ -957,9 +959,14 @@ public class GroupMetadataManagerTest {
assertTrue(followerJoinResult.records.isEmpty());
assertFalse(followerJoinResult.joinFuture.isDone());
assertTrue(group.isInState(PREPARING_REBALANCE));
+ assertEquals(2, group.size());
assertEquals(1, group.numPendingJoinMembers());
- return pendingMemberJoinResult.joinFuture.get();
+ return new PendingMemberGroupResult(
+ leaderJoinResponse.memberId(),
+ followerId,
+ pendingMemberJoinResult.joinFuture.get()
+ );
}
public void verifySessionExpiration(GenericGroup group, int timeoutMs) {
@@ -1099,6 +1106,28 @@ public class GroupMetadataManagerTest {
return joinResponses;
}
+ public CoordinatorResult sendGenericGroupLeave(
+ LeaveGroupRequestData request
+ ) {
+ RequestContext context = new RequestContext(
+ new RequestHeader(
+ ApiKeys.LEAVE_GROUP,
+ ApiKeys.LEAVE_GROUP.latestVersion(),
+ "client",
+ 0
+ ),
+ "1",
+ InetAddress.getLoopbackAddress(),
+ KafkaPrincipal.ANONYMOUS,
+ ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
+ SecurityProtocol.PLAINTEXT,
+ ClientInformation.EMPTY,
+ false
+ );
+
+ return groupMetadataManager.genericGroupLeave(context, request);
+ }
+
private ApiMessage messageOrNull(ApiMessageAndVersion apiMessageAndVersion) {
if (apiMessageAndVersion == null) {
return null;
@@ -7730,7 +7759,7 @@ public class GroupMetadataManagerTest {
// Set up a group in with a pending member. The test checks if the pending member joining
// completes the rebalancing operation
- JoinGroupResponseData pendingMemberResponse = context.setupGroupWithPendingMember(group);
+ JoinGroupResponseData pendingMemberResponse = context.setupGroupWithPendingMember(group).pendingMemberResponse;
// Compete join group for the pending member
JoinGroupRequestData request = new JoinGroupRequestBuilder()
@@ -8747,6 +8776,552 @@ public class GroupMetadataManagerTest {
assertEquals(expectedGroupInstanceIds, groupInstanceIds);
}
+ @Test
+ public void testGroupStuckInRebalanceTimeoutDueToNonjoinedStaticMember() throws Exception {
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .build();
+ int longSessionTimeoutMs = 10000;
+ int rebalanceTimeoutMs = 5000;
+ RebalanceResult rebalanceResult = context.staticMembersJoinAndRebalance(
+ "group-id",
+ "leader-instance-id",
+ "follower-instance-id",
+ rebalanceTimeoutMs,
+ longSessionTimeoutMs
+ );
+ GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup("group-id", false);
+
+ // New member joins
+ JoinResult joinResult = context.sendGenericGroupJoin(
+ new JoinGroupRequestBuilder()
+ .withGroupId("group-id")
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withProtocolSuperset()
+ .withSessionTimeoutMs(longSessionTimeoutMs)
+ .build()
+ );
+
+ // The new dynamic member has been elected as leader
+ assertNoOrEmptyResult(context.sleep(rebalanceTimeoutMs));
+ assertTrue(joinResult.joinFuture.isDone());
+ assertEquals(Errors.NONE.code(), joinResult.joinFuture.get().errorCode());
+ assertEquals(joinResult.joinFuture.get().leader(), joinResult.joinFuture.get().memberId());
+ assertEquals(3, joinResult.joinFuture.get().members().size());
+ assertEquals(2, joinResult.joinFuture.get().generationId());
+ assertTrue(group.isInState(COMPLETING_REBALANCE));
+
+ assertEquals(
+ mkSet(rebalanceResult.leaderId, rebalanceResult.followerId, joinResult.joinFuture.get().memberId()),
+ group.allMemberIds()
+ );
+ assertEquals(
+ mkSet(rebalanceResult.leaderId, rebalanceResult.followerId),
+ group.allStaticMemberIds()
+ );
+ assertEquals(
+ mkSet(joinResult.joinFuture.get().memberId()),
+ group.allDynamicMemberIds()
+ );
+
+ // Send a special leave group request from static follower, moving group towards PreparingRebalance
+ CoordinatorResult leaveResult = context.sendGenericGroupLeave(
+ new LeaveGroupRequestData()
+ .setGroupId("group-id")
+ .setMembers(Collections.singletonList(
+ new MemberIdentity()
+ .setMemberId(rebalanceResult.followerId)
+ .setGroupInstanceId("follower-instance-id")
+ ))
+ );
+
+ LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
+ .setMembers(Collections.singletonList(
+ new LeaveGroupResponseData.MemberResponse()
+ .setMemberId(rebalanceResult.followerId)
+ .setGroupInstanceId("follower-instance-id")));
+
+ assertEquals(expectedResponse, leaveResult.response());
+ assertTrue(group.isInState(PREPARING_REBALANCE));
+
+ context.sleep(rebalanceTimeoutMs);
+ // Only static leader is maintained, and group is stuck at PreparingRebalance stage
+ assertTrue(group.allDynamicMemberIds().isEmpty());
+ assertEquals(Collections.singleton(rebalanceResult.leaderId), group.allMemberIds());
+ assertTrue(group.allDynamicMemberIds().isEmpty());
+ assertEquals(2, group.generationId());
+ assertTrue(group.isInState(PREPARING_REBALANCE));
+ }
+
+ @Test
+ public void testPendingMembersLeaveGroup() throws Exception {
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .build();
+ GenericGroup group = context.createGenericGroup("group-id");
+ JoinGroupResponseData pendingJoinResponse = context.setupGroupWithPendingMember(group).pendingMemberResponse;
+
+ CoordinatorResult leaveResult = context.sendGenericGroupLeave(
+ new LeaveGroupRequestData()
+ .setGroupId("group-id")
+ .setMembers(Collections.singletonList(
+ new MemberIdentity()
+ .setMemberId(pendingJoinResponse.memberId())
+ ))
+ );
+
+ LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
+ .setMembers(Collections.singletonList(
+ new LeaveGroupResponseData.MemberResponse()
+ .setGroupInstanceId(null)
+ .setMemberId(pendingJoinResponse.memberId())));
+
+ assertEquals(expectedResponse, leaveResult.response());
+ assertTrue(leaveResult.records().isEmpty());
+
+ assertTrue(group.isInState(COMPLETING_REBALANCE));
+ assertEquals(2, group.allMembers().size());
+ assertEquals(2, group.allDynamicMemberIds().size());
+ assertEquals(0, group.numPendingJoinMembers());
+ }
+
+ @Test
+ public void testLeaveGroupInvalidGroup() {
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .build();
+ context.createGenericGroup("group-id");
+
+ assertThrows(UnknownMemberIdException.class, () -> context.sendGenericGroupLeave(
+ new LeaveGroupRequestData()
+ .setGroupId("invalid-group-id")
+ ));
+ }
+
+ @Test
+ public void testLeaveGroupUnknownGroup() {
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ assertThrows(UnknownMemberIdException.class, () -> context.sendGenericGroupLeave(
+ new LeaveGroupRequestData()
+ .setGroupId("unknown-group-id")
+ .setMembers(Collections.singletonList(
+ new MemberIdentity()
+ .setMemberId("member-id")
+ ))
+ ));
+ }
+
+ @Test
+ public void testLeaveGroupUnknownMemberIdExistingGroup() throws Exception {
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .build();
+ context.createGenericGroup("group-id");
+
+ context.joinGenericGroupAsDynamicMemberAndCompleteJoin(
+ new JoinGroupRequestBuilder()
+ .withGroupId("group-id")
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withDefaultProtocolTypeAndProtocols()
+ .build()
+ );
+
+ CoordinatorResult leaveResult = context.sendGenericGroupLeave(
+ new LeaveGroupRequestData()
+ .setGroupId("group-id")
+ .setMembers(Collections.singletonList(
+ new MemberIdentity()
+ .setMemberId("unknown-member-id")
+ ))
+ );
+
+ LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
+ .setMembers(Collections.singletonList(
+ new LeaveGroupResponseData.MemberResponse()
+ .setGroupInstanceId(null)
+ .setMemberId("unknown-member-id")
+ .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())));
+
+ assertEquals(expectedResponse, leaveResult.response());
+ assertTrue(leaveResult.records().isEmpty());
+ }
+
+ @Test
+ public void testLeaveDeadGroup() {
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .build();
+ GenericGroup group = context.createGenericGroup("group-id");
+ group.transitionTo(DEAD);
+
+ CoordinatorResult leaveResult = context.sendGenericGroupLeave(
+ new LeaveGroupRequestData()
+ .setGroupId("group-id")
+ .setMembers(Collections.singletonList(
+ new MemberIdentity()
+ .setMemberId("member-id")
+ ))
+ );
+
+ LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
+
+ assertEquals(expectedResponse, leaveResult.response());
+ assertTrue(leaveResult.records().isEmpty());
+ }
+
+ @Test
+ public void testValidLeaveGroup() throws Exception {
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .build();
+ GenericGroup group = context.createGenericGroup("group-id");
+
+ JoinGroupResponseData joinResponse = context.joinGenericGroupAsDynamicMemberAndCompleteJoin(
+ new JoinGroupRequestBuilder()
+ .withGroupId("group-id")
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withDefaultProtocolTypeAndProtocols()
+ .build()
+ );
+
+ // Dynamic member leaves. The group becomes empty.
+ CoordinatorResult leaveResult = context.sendGenericGroupLeave(
+ new LeaveGroupRequestData()
+ .setGroupId("group-id")
+ .setMembers(Collections.singletonList(
+ new MemberIdentity()
+ .setMemberId(joinResponse.memberId())
+ ))
+ );
+ assertEquals(
+ Collections.singletonList(newGroupMetadataRecordWithCurrentState(group, MetadataVersion.latest())),
+ leaveResult.records()
+ );
+ // Simulate a successful write to the log.
+ leaveResult.appendFuture().complete(null);
+
+ LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
+ .setMembers(Collections.singletonList(
+ new LeaveGroupResponseData.MemberResponse()
+ .setGroupInstanceId(null)
+ .setMemberId(joinResponse.memberId())));
+
+ assertEquals(expectedResponse, leaveResult.response());
+ assertTrue(group.isInState(EMPTY));
+ assertEquals(2, group.generationId());
+ }
+
+ @Test
+ public void testLeaveGroupWithFencedInstanceId() throws Exception {
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .build();
+ context.createGenericGroup("group-id");
+
+ context.joinGenericGroupAndCompleteJoin(
+ new JoinGroupRequestBuilder()
+ .withGroupId("group-id")
+ .withGroupInstanceId("group-instance-id")
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withDefaultProtocolTypeAndProtocols()
+ .build(),
+ true,
+ true
+ );
+
+ CoordinatorResult leaveResult = context.sendGenericGroupLeave(
+ new LeaveGroupRequestData()
+ .setGroupId("group-id")
+ .setMembers(Collections.singletonList(
+ new MemberIdentity()
+ .setGroupInstanceId("group-instance-id")
+ .setMemberId("other-member-id") // invalid member id
+ ))
+ );
+
+ LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
+ .setMembers(Collections.singletonList(
+ new LeaveGroupResponseData.MemberResponse()
+ .setGroupInstanceId("group-instance-id")
+ .setMemberId("other-member-id")
+ .setErrorCode(Errors.FENCED_INSTANCE_ID.code())));
+
+ assertEquals(expectedResponse, leaveResult.response());
+ assertTrue(leaveResult.records().isEmpty());
+ }
+
+ @Test
+ public void testLeaveGroupStaticMemberWithUnknownMemberId() throws Exception {
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .build();
+ context.createGenericGroup("group-id");
+
+ context.joinGenericGroupAndCompleteJoin(
+ new JoinGroupRequestBuilder()
+ .withGroupId("group-id")
+ .withGroupInstanceId("group-instance-id")
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withDefaultProtocolTypeAndProtocols()
+ .build(),
+ true,
+ true
+ );
+
+ // Having unknown member id will not affect the request processing due to valid group instance id.
+ CoordinatorResult leaveResult = context.sendGenericGroupLeave(
+ new LeaveGroupRequestData()
+ .setGroupId("group-id")
+ .setMembers(Collections.singletonList(
+ new MemberIdentity()
+ .setGroupInstanceId("group-instance-id")
+ .setMemberId(UNKNOWN_MEMBER_ID)
+ ))
+ );
+
+ LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
+ .setMembers(Collections.singletonList(
+ new LeaveGroupResponseData.MemberResponse()
+ .setGroupInstanceId("group-instance-id")));
+
+ assertEquals(expectedResponse, leaveResult.response());
+ }
+
+ @Test
+ public void testStaticMembersValidBatchLeaveGroup() throws Exception {
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .build();
+ context.staticMembersJoinAndRebalance(
+ "group-id",
+ "leader-instance-id",
+ "follower-instance-id"
+ );
+
+ CoordinatorResult leaveResult = context.sendGenericGroupLeave(
+ new LeaveGroupRequestData()
+ .setGroupId("group-id")
+ .setMembers(
+ Arrays.asList(
+ new MemberIdentity()
+ .setGroupInstanceId("leader-instance-id"),
+ new MemberIdentity()
+ .setGroupInstanceId("follower-instance-id")
+ )
+ )
+ );
+
+ LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
+ .setMembers(Arrays.asList(
+ new LeaveGroupResponseData.MemberResponse()
+ .setGroupInstanceId("leader-instance-id"),
+ new LeaveGroupResponseData.MemberResponse()
+ .setGroupInstanceId("follower-instance-id")));
+
+ assertEquals(expectedResponse, leaveResult.response());
+ }
+
+ @Test
+ public void testStaticMembersLeaveUnknownGroup() throws Exception {
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .build();
+ context.staticMembersJoinAndRebalance(
+ "group-id",
+ "leader-instance-id",
+ "follower-instance-id"
+ );
+
+ assertThrows(UnknownMemberIdException.class, () -> context.sendGenericGroupLeave(
+ new LeaveGroupRequestData()
+ .setGroupId("invalid-group-id") // Invalid group id
+ .setMembers(
+ Arrays.asList(
+ new MemberIdentity()
+ .setGroupInstanceId("leader-instance-id"),
+ new MemberIdentity()
+ .setGroupInstanceId("follower-instance-id")
+ )
+ )
+ ));
+ }
+
+ @Test
+ public void testStaticMembersFencedInstanceBatchLeaveGroup() throws Exception {
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .build();
+ context.staticMembersJoinAndRebalance(
+ "group-id",
+ "leader-instance-id",
+ "follower-instance-id"
+ );
+
+ CoordinatorResult leaveResult = context.sendGenericGroupLeave(
+ new LeaveGroupRequestData()
+ .setGroupId("group-id")
+ .setMembers(
+ Arrays.asList(
+ new MemberIdentity()
+ .setGroupInstanceId("leader-instance-id"),
+ new MemberIdentity()
+ .setGroupInstanceId("follower-instance-id")
+ .setMemberId("invalid-member-id")
+ )
+ )
+ );
+
+ LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
+ .setMembers(Arrays.asList(
+ new LeaveGroupResponseData.MemberResponse()
+ .setGroupInstanceId("leader-instance-id"),
+ new LeaveGroupResponseData.MemberResponse()
+ .setGroupInstanceId("follower-instance-id")
+ .setMemberId("invalid-member-id")
+ .setErrorCode(Errors.FENCED_INSTANCE_ID.code())));
+
+ assertEquals(expectedResponse, leaveResult.response());
+ }
+
+ @Test
+ public void testStaticMembersUnknownInstanceBatchLeaveGroup() throws Exception {
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .build();
+ context.staticMembersJoinAndRebalance(
+ "group-id",
+ "leader-instance-id",
+ "follower-instance-id"
+ );
+
+ CoordinatorResult leaveResult = context.sendGenericGroupLeave(
+ new LeaveGroupRequestData()
+ .setGroupId("group-id")
+ .setMembers(
+ Arrays.asList(
+ new MemberIdentity()
+ .setGroupInstanceId("unknown-instance-id"), // Unknown instance id
+ new MemberIdentity()
+ .setGroupInstanceId("follower-instance-id")
+ )
+ )
+ );
+
+ LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
+ .setMembers(Arrays.asList(
+ new LeaveGroupResponseData.MemberResponse()
+ .setGroupInstanceId("unknown-instance-id")
+ .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()),
+ new LeaveGroupResponseData.MemberResponse()
+ .setGroupInstanceId("follower-instance-id")));
+
+ assertEquals(expectedResponse, leaveResult.response());
+ assertTrue(leaveResult.records().isEmpty());
+ }
+
+ @Test
+ public void testPendingMemberBatchLeaveGroup() throws Exception {
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .build();
+ GenericGroup group = context.createGenericGroup("group-id");
+ JoinGroupResponseData pendingJoinResponse = context.setupGroupWithPendingMember(group).pendingMemberResponse;
+
+ CoordinatorResult leaveResult = context.sendGenericGroupLeave(
+ new LeaveGroupRequestData()
+ .setGroupId("group-id")
+ .setMembers(
+ Arrays.asList(
+ new MemberIdentity()
+ .setGroupInstanceId("unknown-instance-id"), // Unknown instance id
+ new MemberIdentity()
+ .setMemberId(pendingJoinResponse.memberId())
+ )
+ )
+ );
+
+ LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
+ .setMembers(Arrays.asList(
+ new LeaveGroupResponseData.MemberResponse()
+ .setGroupInstanceId("unknown-instance-id")
+ .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()),
+ new LeaveGroupResponseData.MemberResponse()
+ .setGroupInstanceId(null)
+ .setMemberId(pendingJoinResponse.memberId())));
+
+ assertEquals(expectedResponse, leaveResult.response());
+ }
+
+ @Test
+ public void testJoinedMemberPendingMemberBatchLeaveGroup() throws Exception {
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .build();
+ GenericGroup group = context.createGenericGroup("group-id");
+ PendingMemberGroupResult pendingMemberGroupResult = context.setupGroupWithPendingMember(group);
+
+ CoordinatorResult leaveResult = context.sendGenericGroupLeave(
+ new LeaveGroupRequestData()
+ .setGroupId("group-id")
+ .setMembers(
+ Arrays.asList(
+ new MemberIdentity()
+ .setMemberId(pendingMemberGroupResult.leaderId),
+ new MemberIdentity()
+ .setMemberId(pendingMemberGroupResult.followerId),
+ new MemberIdentity()
+ .setMemberId(pendingMemberGroupResult.pendingMemberResponse.memberId())
+ )
+ )
+ );
+
+ LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
+ .setMembers(Arrays.asList(
+ new LeaveGroupResponseData.MemberResponse()
+ .setGroupInstanceId(null)
+ .setMemberId(pendingMemberGroupResult.leaderId),
+ new LeaveGroupResponseData.MemberResponse()
+ .setGroupInstanceId(null)
+ .setMemberId(pendingMemberGroupResult.followerId),
+ new LeaveGroupResponseData.MemberResponse()
+ .setGroupInstanceId(null)
+ .setMemberId(pendingMemberGroupResult.pendingMemberResponse.memberId())));
+
+ assertEquals(expectedResponse, leaveResult.response());
+ }
+
+ @Test
+ public void testJoinedMemberPendingMemberBatchLeaveGroupWithUnknownMember() throws Exception {
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .build();
+ GenericGroup group = context.createGenericGroup("group-id");
+ PendingMemberGroupResult pendingMemberGroupResult = context.setupGroupWithPendingMember(group);
+
+ CoordinatorResult leaveResult = context.sendGenericGroupLeave(
+ new LeaveGroupRequestData()
+ .setGroupId("group-id")
+ .setMembers(
+ Arrays.asList(
+ new MemberIdentity()
+ .setMemberId(pendingMemberGroupResult.leaderId),
+ new MemberIdentity()
+ .setMemberId(pendingMemberGroupResult.followerId),
+ new MemberIdentity()
+ .setMemberId(pendingMemberGroupResult.pendingMemberResponse.memberId()),
+ new MemberIdentity()
+ .setMemberId("unknown-member-id")
+ )
+ )
+ );
+
+ LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
+ .setMembers(Arrays.asList(
+ new LeaveGroupResponseData.MemberResponse()
+ .setGroupInstanceId(null)
+ .setMemberId(pendingMemberGroupResult.leaderId),
+ new LeaveGroupResponseData.MemberResponse()
+ .setGroupInstanceId(null)
+ .setMemberId(pendingMemberGroupResult.followerId),
+ new LeaveGroupResponseData.MemberResponse()
+ .setGroupInstanceId(null)
+ .setMemberId(pendingMemberGroupResult.pendingMemberResponse.memberId()),
+ new LeaveGroupResponseData.MemberResponse()
+ .setGroupInstanceId(null)
+ .setMemberId("unknown-member-id")
+ .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())));
+
+ assertEquals(expectedResponse, leaveResult.response());
+ }
+
private static void assertNoOrEmptyResult(List> timeouts) {
assertTrue(timeouts.size() <= 1);
timeouts.forEach(timeout -> assertEquals(EMPTY_RESULT, timeout.result));
@@ -8970,6 +9545,22 @@ public class GroupMetadataManagerTest {
}
}
+ private static class PendingMemberGroupResult {
+ String leaderId;
+ String followerId;
+ JoinGroupResponseData pendingMemberResponse;
+
+ public PendingMemberGroupResult(
+ String leaderId,
+ String followerId,
+ JoinGroupResponseData pendingMemberResponse
+ ) {
+ this.leaderId = leaderId;
+ this.followerId = followerId;
+ this.pendingMemberResponse = pendingMemberResponse;
+ }
+ }
+
private static class JoinResult {
CompletableFuture joinFuture;
List records;