diff --git a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala index 84e609fcd92..4cc3f968d27 100644 --- a/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala @@ -49,29 +49,50 @@ class LeaveGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBa numPartitions = 3 ) + def instanceId(memberId: String): String = "instance_" + memberId + val memberIds = Range(0, 3).map { __ => + Uuid.randomUuid().toString + } + for (version <- 3 to ApiKeys.LEAVE_GROUP.latestVersion(isUnstableApiEnabled)) { - val memberId = Uuid.randomUuid().toString - assertEquals(Errors.NONE.code, consumerGroupHeartbeat( - groupId = "group", - memberId = memberId, - memberEpoch = 0, - instanceId = "instance-id", - rebalanceTimeoutMs = 5 * 60 * 1000, - subscribedTopicNames = List("foo"), - topicPartitions = List.empty, - ).errorCode) + // Join with all the members. + memberIds.foreach { memberId => + assertEquals(Errors.NONE.code, consumerGroupHeartbeat( + groupId = "group", + memberId = memberId, + memberEpoch = 0, + instanceId = instanceId(memberId), + rebalanceTimeoutMs = 5 * 60 * 1000, + subscribedTopicNames = List("foo"), + topicPartitions = List.empty, + ).errorCode) + } assertEquals( new LeaveGroupResponseData() .setMembers(List( new LeaveGroupResponseData.MemberResponse() - .setMemberId(memberId) - .setGroupInstanceId("instance-id") + .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID) + .setGroupInstanceId(instanceId(memberIds(0))), + new LeaveGroupResponseData.MemberResponse() + .setMemberId(memberIds(1)) + .setGroupInstanceId(instanceId(memberIds(1))), + new LeaveGroupResponseData.MemberResponse() + .setMemberId(memberIds(2)) + .setGroupInstanceId(null) ).asJava), classicLeaveGroup( groupId = "group", - memberIds = List(JoinGroupRequest.UNKNOWN_MEMBER_ID), - groupInstanceIds = List("instance-id"), + memberIds = List( + JoinGroupRequest.UNKNOWN_MEMBER_ID, + memberIds(1), + memberIds(2) + ), + groupInstanceIds = List( + instanceId(memberIds(0)), + instanceId(memberIds(1)), + null + ), version = version.toShort ) ) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index dd0c6954088..6b9a5e7cbd5 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -6020,7 +6020,7 @@ public class GroupMetadataManager { } if (group.type() == CLASSIC) { - return classicGroupLeaveToClassicGroup((ClassicGroup) group, context, request); + return classicGroupLeaveToClassicGroup((ClassicGroup) group, request); } else if (group.type() == CONSUMER) { return classicGroupLeaveToConsumerGroup((ConsumerGroup) group, request); } else { @@ -6046,48 +6046,46 @@ public class GroupMetadataManager { List records = new ArrayList<>(); for (MemberIdentity memberIdentity : request.members()) { - String memberId = memberIdentity.memberId(); - String instanceId = memberIdentity.groupInstanceId(); String reason = memberIdentity.reason() != null ? memberIdentity.reason() : "not provided"; - ConsumerGroupMember member; try { - if (instanceId == null) { - member = group.getOrMaybeCreateMember(memberId, false); - throwIfMemberDoesNotUseClassicProtocol(member); + ConsumerGroupMember member; + + if (memberIdentity.groupInstanceId() == null) { + member = group.getOrMaybeCreateMember(memberIdentity.memberId(), false); log.info("[GroupId {}] Dynamic member {} has left group " + "through explicit `LeaveGroup` request; client reason: {}", - groupId, memberId, reason); + groupId, memberIdentity.memberId(), reason); } else { - member = group.staticMember(instanceId); - throwIfStaticMemberIsUnknown(member, instanceId); + member = group.staticMember(memberIdentity.groupInstanceId()); + throwIfStaticMemberIsUnknown(member, memberIdentity.groupInstanceId()); // The LeaveGroup API allows administrative removal of members by GroupInstanceId // in which case we expect the MemberId to be undefined. - if (!UNKNOWN_MEMBER_ID.equals(memberId)) { - throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId); - throwIfMemberDoesNotUseClassicProtocol(member); + if (!UNKNOWN_MEMBER_ID.equals(memberIdentity.memberId())) { + throwIfInstanceIdIsFenced(member, groupId, memberIdentity.memberId(), memberIdentity.groupInstanceId()); } - memberId = member.memberId(); log.info("[GroupId {}] Static member {} with instance id {} has left group " + "through explicit `LeaveGroup` request; client reason: {}", - groupId, memberId, instanceId, reason); + groupId, memberIdentity.memberId(), memberIdentity.groupInstanceId(), reason); } - removeMember(records, groupId, memberId); - cancelTimers(groupId, memberId); + removeMember(records, groupId, member.memberId()); + cancelTimers(groupId, member.memberId()); + memberResponses.add( new MemberResponse() - .setMemberId(memberId) - .setGroupInstanceId(instanceId) + .setMemberId(memberIdentity.memberId()) + .setGroupInstanceId(memberIdentity.groupInstanceId()) ); + validLeaveGroupMembers.add(member); } catch (KafkaException e) { memberResponses.add( new MemberResponse() - .setMemberId(memberId) - .setGroupInstanceId(instanceId) + .setMemberId(memberIdentity.memberId()) + .setGroupInstanceId(memberIdentity.groupInstanceId()) .setErrorCode(Errors.forException(e).code()) ); } @@ -6126,7 +6124,6 @@ public class GroupMetadataManager { * Handle a classic LeaveGroupRequest to a ClassicGroup. * * @param group The ClassicGroup. - * @param context The request context. * @param request The actual LeaveGroup request. * * @return The LeaveGroup response and the GroupMetadata record to append if the group @@ -6134,7 +6131,6 @@ public class GroupMetadataManager { */ private CoordinatorResult classicGroupLeaveToClassicGroup( ClassicGroup group, - RequestContext context, LeaveGroupRequestData request ) throws UnknownMemberIdException { if (group.isInState(DEAD)) { diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 18540253487..0535f763b4f 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -117,8 +117,6 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; -import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol; -import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection; import static org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR; import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH; @@ -13861,15 +13859,17 @@ public class GroupMetadataManagerTest { context.assertJoinTimeout(groupId, memberId2, member2.rebalanceTimeoutMs()); context.assertSessionTimeout(groupId, memberId2, member2.classicMemberMetadata().get().sessionTimeoutMs()); - // Member 1 and member 2 leave the group. + // Member 1, member 2 and member 3 leave the group. CoordinatorResult leaveResult = context.sendClassicGroupLeave( new LeaveGroupRequestData() .setGroupId("group-id") .setMembers(List.of( // Valid member id. new MemberIdentity() - .setMemberId(memberId1), + .setMemberId(memberId1) + .setGroupInstanceId(null), new MemberIdentity() + .setMemberId(UNKNOWN_MEMBER_ID) .setGroupInstanceId(instanceId2), // Member that doesn't use the classic protocol. new MemberIdentity() @@ -13877,8 +13877,10 @@ public class GroupMetadataManagerTest { .setGroupInstanceId(instanceId3), // Unknown member id. new MemberIdentity() - .setMemberId("unknown-member-id"), + .setMemberId("unknown-member-id") + .setGroupInstanceId(null), new MemberIdentity() + .setMemberId(UNKNOWN_MEMBER_ID) .setGroupInstanceId("unknown-instance-id"), // Fenced instance id. new MemberIdentity() @@ -13895,11 +13897,10 @@ public class GroupMetadataManagerTest { .setMemberId(memberId1), new LeaveGroupResponseData.MemberResponse() .setGroupInstanceId(instanceId2) - .setMemberId(memberId2), + .setMemberId(UNKNOWN_MEMBER_ID), new LeaveGroupResponseData.MemberResponse() .setGroupInstanceId(instanceId3) - .setMemberId(memberId3) - .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()), + .setMemberId(memberId3), new LeaveGroupResponseData.MemberResponse() .setGroupInstanceId(null) .setMemberId("unknown-member-id") @@ -13908,8 +13909,8 @@ public class GroupMetadataManagerTest { .setGroupInstanceId("unknown-instance-id") .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()), new LeaveGroupResponseData.MemberResponse() - .setGroupInstanceId(instanceId3) .setMemberId("unknown-member-id") + .setGroupInstanceId(instanceId3) .setErrorCode(Errors.FENCED_INSTANCE_ID.code()) )), leaveResult.response() @@ -13924,6 +13925,12 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId2), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId2), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId2), + // Remove member 3. + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId3), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId3), + GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId3), + // Update subscription metadata. + GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Collections.emptyMap()), // Bump the group epoch. GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11) ); @@ -14045,7 +14052,7 @@ public class GroupMetadataManagerTest { String groupId = "group-id"; String memberId = Uuid.randomUuid().toString(); - // Consumer group without member using the classic protocol. + // Consumer group. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId) @@ -14058,9 +14065,7 @@ public class GroupMetadataManagerTest { .setGroupId("group-id") .setMembers(List.of( new MemberIdentity() - .setMemberId("unknown-member-id"), - new MemberIdentity() - .setMemberId(memberId) + .setMemberId("unknown-member-id") )) ); @@ -14070,10 +14075,6 @@ public class GroupMetadataManagerTest { new LeaveGroupResponseData.MemberResponse() .setGroupInstanceId(null) .setMemberId("unknown-member-id") - .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()), - new LeaveGroupResponseData.MemberResponse() - .setGroupInstanceId(null) - .setMemberId(memberId) .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()) )), leaveResult.response() @@ -16047,9 +16048,15 @@ public class GroupMetadataManagerTest { new LeaveGroupRequestData() .setGroupId(groupId) .setMembers(List.of( - new MemberIdentity().setGroupInstanceId(memberId1), - new MemberIdentity().setGroupInstanceId(memberId2), - new MemberIdentity().setGroupInstanceId(memberId3) + new MemberIdentity() + .setMemberId(memberId1) + .setGroupInstanceId(null), + new MemberIdentity() + .setMemberId(memberId2) + .setGroupInstanceId(memberId2), + new MemberIdentity() + .setMemberId(UNKNOWN_MEMBER_ID) + .setGroupInstanceId(memberId3) )) ); @@ -16058,12 +16065,12 @@ public class GroupMetadataManagerTest { .setMembers(List.of( new LeaveGroupResponseData.MemberResponse() .setMemberId(memberId1) - .setGroupInstanceId(memberId1), + .setGroupInstanceId(null), new LeaveGroupResponseData.MemberResponse() .setMemberId(memberId2) .setGroupInstanceId(memberId2), new LeaveGroupResponseData.MemberResponse() - .setMemberId(memberId3) + .setMemberId(UNKNOWN_MEMBER_ID) .setGroupInstanceId(memberId3) )), result.response()