mirror of https://github.com/apache/kafka.git
KAFKA-18188; Admin LeaveGroup should allow removing member using consumer protocol by member id (#18116)
The LeaveGroup API is used by the admin client to remove static members or remove all members from the group. The latter does not work because the API does not allow removing a member using the CONSUMER protocol by member id. Moreover, the response should only include the member id if the member id was included in the request. This patch fixes both issues. Reviewers: Dongnuo Lyu <dlyu@confluent.io>, Christo Lolov <lolovc@amazon.com>, Jeff Kim <jeff.kim@confluent.io>
This commit is contained in:
parent
3cf8745243
commit
57737a357f
|
@ -49,29 +49,50 @@ class LeaveGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBa
|
||||||
numPartitions = 3
|
numPartitions = 3
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def instanceId(memberId: String): String = "instance_" + memberId
|
||||||
|
val memberIds = Range(0, 3).map { __ =>
|
||||||
|
Uuid.randomUuid().toString
|
||||||
|
}
|
||||||
|
|
||||||
for (version <- 3 to ApiKeys.LEAVE_GROUP.latestVersion(isUnstableApiEnabled)) {
|
for (version <- 3 to ApiKeys.LEAVE_GROUP.latestVersion(isUnstableApiEnabled)) {
|
||||||
val memberId = Uuid.randomUuid().toString
|
// Join with all the members.
|
||||||
|
memberIds.foreach { memberId =>
|
||||||
assertEquals(Errors.NONE.code, consumerGroupHeartbeat(
|
assertEquals(Errors.NONE.code, consumerGroupHeartbeat(
|
||||||
groupId = "group",
|
groupId = "group",
|
||||||
memberId = memberId,
|
memberId = memberId,
|
||||||
memberEpoch = 0,
|
memberEpoch = 0,
|
||||||
instanceId = "instance-id",
|
instanceId = instanceId(memberId),
|
||||||
rebalanceTimeoutMs = 5 * 60 * 1000,
|
rebalanceTimeoutMs = 5 * 60 * 1000,
|
||||||
subscribedTopicNames = List("foo"),
|
subscribedTopicNames = List("foo"),
|
||||||
topicPartitions = List.empty,
|
topicPartitions = List.empty,
|
||||||
).errorCode)
|
).errorCode)
|
||||||
|
}
|
||||||
|
|
||||||
assertEquals(
|
assertEquals(
|
||||||
new LeaveGroupResponseData()
|
new LeaveGroupResponseData()
|
||||||
.setMembers(List(
|
.setMembers(List(
|
||||||
new LeaveGroupResponseData.MemberResponse()
|
new LeaveGroupResponseData.MemberResponse()
|
||||||
.setMemberId(memberId)
|
.setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
|
||||||
.setGroupInstanceId("instance-id")
|
.setGroupInstanceId(instanceId(memberIds(0))),
|
||||||
|
new LeaveGroupResponseData.MemberResponse()
|
||||||
|
.setMemberId(memberIds(1))
|
||||||
|
.setGroupInstanceId(instanceId(memberIds(1))),
|
||||||
|
new LeaveGroupResponseData.MemberResponse()
|
||||||
|
.setMemberId(memberIds(2))
|
||||||
|
.setGroupInstanceId(null)
|
||||||
).asJava),
|
).asJava),
|
||||||
classicLeaveGroup(
|
classicLeaveGroup(
|
||||||
groupId = "group",
|
groupId = "group",
|
||||||
memberIds = List(JoinGroupRequest.UNKNOWN_MEMBER_ID),
|
memberIds = List(
|
||||||
groupInstanceIds = List("instance-id"),
|
JoinGroupRequest.UNKNOWN_MEMBER_ID,
|
||||||
|
memberIds(1),
|
||||||
|
memberIds(2)
|
||||||
|
),
|
||||||
|
groupInstanceIds = List(
|
||||||
|
instanceId(memberIds(0)),
|
||||||
|
instanceId(memberIds(1)),
|
||||||
|
null
|
||||||
|
),
|
||||||
version = version.toShort
|
version = version.toShort
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
|
@ -6020,7 +6020,7 @@ public class GroupMetadataManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (group.type() == CLASSIC) {
|
if (group.type() == CLASSIC) {
|
||||||
return classicGroupLeaveToClassicGroup((ClassicGroup) group, context, request);
|
return classicGroupLeaveToClassicGroup((ClassicGroup) group, request);
|
||||||
} else if (group.type() == CONSUMER) {
|
} else if (group.type() == CONSUMER) {
|
||||||
return classicGroupLeaveToConsumerGroup((ConsumerGroup) group, request);
|
return classicGroupLeaveToConsumerGroup((ConsumerGroup) group, request);
|
||||||
} else {
|
} else {
|
||||||
|
@ -6046,48 +6046,46 @@ public class GroupMetadataManager {
|
||||||
List<CoordinatorRecord> records = new ArrayList<>();
|
List<CoordinatorRecord> records = new ArrayList<>();
|
||||||
|
|
||||||
for (MemberIdentity memberIdentity : request.members()) {
|
for (MemberIdentity memberIdentity : request.members()) {
|
||||||
String memberId = memberIdentity.memberId();
|
|
||||||
String instanceId = memberIdentity.groupInstanceId();
|
|
||||||
String reason = memberIdentity.reason() != null ? memberIdentity.reason() : "not provided";
|
String reason = memberIdentity.reason() != null ? memberIdentity.reason() : "not provided";
|
||||||
|
|
||||||
ConsumerGroupMember member;
|
|
||||||
try {
|
try {
|
||||||
if (instanceId == null) {
|
ConsumerGroupMember member;
|
||||||
member = group.getOrMaybeCreateMember(memberId, false);
|
|
||||||
throwIfMemberDoesNotUseClassicProtocol(member);
|
if (memberIdentity.groupInstanceId() == null) {
|
||||||
|
member = group.getOrMaybeCreateMember(memberIdentity.memberId(), false);
|
||||||
|
|
||||||
log.info("[GroupId {}] Dynamic member {} has left group " +
|
log.info("[GroupId {}] Dynamic member {} has left group " +
|
||||||
"through explicit `LeaveGroup` request; client reason: {}",
|
"through explicit `LeaveGroup` request; client reason: {}",
|
||||||
groupId, memberId, reason);
|
groupId, memberIdentity.memberId(), reason);
|
||||||
} else {
|
} else {
|
||||||
member = group.staticMember(instanceId);
|
member = group.staticMember(memberIdentity.groupInstanceId());
|
||||||
throwIfStaticMemberIsUnknown(member, instanceId);
|
throwIfStaticMemberIsUnknown(member, memberIdentity.groupInstanceId());
|
||||||
// The LeaveGroup API allows administrative removal of members by GroupInstanceId
|
// The LeaveGroup API allows administrative removal of members by GroupInstanceId
|
||||||
// in which case we expect the MemberId to be undefined.
|
// in which case we expect the MemberId to be undefined.
|
||||||
if (!UNKNOWN_MEMBER_ID.equals(memberId)) {
|
if (!UNKNOWN_MEMBER_ID.equals(memberIdentity.memberId())) {
|
||||||
throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
|
throwIfInstanceIdIsFenced(member, groupId, memberIdentity.memberId(), memberIdentity.groupInstanceId());
|
||||||
throwIfMemberDoesNotUseClassicProtocol(member);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
memberId = member.memberId();
|
|
||||||
log.info("[GroupId {}] Static member {} with instance id {} has left group " +
|
log.info("[GroupId {}] Static member {} with instance id {} has left group " +
|
||||||
"through explicit `LeaveGroup` request; client reason: {}",
|
"through explicit `LeaveGroup` request; client reason: {}",
|
||||||
groupId, memberId, instanceId, reason);
|
groupId, memberIdentity.memberId(), memberIdentity.groupInstanceId(), reason);
|
||||||
}
|
}
|
||||||
|
|
||||||
removeMember(records, groupId, memberId);
|
removeMember(records, groupId, member.memberId());
|
||||||
cancelTimers(groupId, memberId);
|
cancelTimers(groupId, member.memberId());
|
||||||
|
|
||||||
memberResponses.add(
|
memberResponses.add(
|
||||||
new MemberResponse()
|
new MemberResponse()
|
||||||
.setMemberId(memberId)
|
.setMemberId(memberIdentity.memberId())
|
||||||
.setGroupInstanceId(instanceId)
|
.setGroupInstanceId(memberIdentity.groupInstanceId())
|
||||||
);
|
);
|
||||||
|
|
||||||
validLeaveGroupMembers.add(member);
|
validLeaveGroupMembers.add(member);
|
||||||
} catch (KafkaException e) {
|
} catch (KafkaException e) {
|
||||||
memberResponses.add(
|
memberResponses.add(
|
||||||
new MemberResponse()
|
new MemberResponse()
|
||||||
.setMemberId(memberId)
|
.setMemberId(memberIdentity.memberId())
|
||||||
.setGroupInstanceId(instanceId)
|
.setGroupInstanceId(memberIdentity.groupInstanceId())
|
||||||
.setErrorCode(Errors.forException(e).code())
|
.setErrorCode(Errors.forException(e).code())
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -6126,7 +6124,6 @@ public class GroupMetadataManager {
|
||||||
* Handle a classic LeaveGroupRequest to a ClassicGroup.
|
* Handle a classic LeaveGroupRequest to a ClassicGroup.
|
||||||
*
|
*
|
||||||
* @param group The ClassicGroup.
|
* @param group The ClassicGroup.
|
||||||
* @param context The request context.
|
|
||||||
* @param request The actual LeaveGroup request.
|
* @param request The actual LeaveGroup request.
|
||||||
*
|
*
|
||||||
* @return The LeaveGroup response and the GroupMetadata record to append if the group
|
* @return The LeaveGroup response and the GroupMetadata record to append if the group
|
||||||
|
@ -6134,7 +6131,6 @@ public class GroupMetadataManager {
|
||||||
*/
|
*/
|
||||||
private CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> classicGroupLeaveToClassicGroup(
|
private CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> classicGroupLeaveToClassicGroup(
|
||||||
ClassicGroup group,
|
ClassicGroup group,
|
||||||
RequestContext context,
|
|
||||||
LeaveGroupRequestData request
|
LeaveGroupRequestData request
|
||||||
) throws UnknownMemberIdException {
|
) throws UnknownMemberIdException {
|
||||||
if (group.isInState(DEAD)) {
|
if (group.isInState(DEAD)) {
|
||||||
|
|
|
@ -117,8 +117,6 @@ import java.util.stream.Collectors;
|
||||||
import java.util.stream.IntStream;
|
import java.util.stream.IntStream;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol;
|
|
||||||
import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
|
|
||||||
import static org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR;
|
import static org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR;
|
||||||
import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
|
import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
|
||||||
import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH;
|
import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH;
|
||||||
|
@ -13861,15 +13859,17 @@ public class GroupMetadataManagerTest {
|
||||||
context.assertJoinTimeout(groupId, memberId2, member2.rebalanceTimeoutMs());
|
context.assertJoinTimeout(groupId, memberId2, member2.rebalanceTimeoutMs());
|
||||||
context.assertSessionTimeout(groupId, memberId2, member2.classicMemberMetadata().get().sessionTimeoutMs());
|
context.assertSessionTimeout(groupId, memberId2, member2.classicMemberMetadata().get().sessionTimeoutMs());
|
||||||
|
|
||||||
// Member 1 and member 2 leave the group.
|
// Member 1, member 2 and member 3 leave the group.
|
||||||
CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> leaveResult = context.sendClassicGroupLeave(
|
CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> leaveResult = context.sendClassicGroupLeave(
|
||||||
new LeaveGroupRequestData()
|
new LeaveGroupRequestData()
|
||||||
.setGroupId("group-id")
|
.setGroupId("group-id")
|
||||||
.setMembers(List.of(
|
.setMembers(List.of(
|
||||||
// Valid member id.
|
// Valid member id.
|
||||||
new MemberIdentity()
|
new MemberIdentity()
|
||||||
.setMemberId(memberId1),
|
.setMemberId(memberId1)
|
||||||
|
.setGroupInstanceId(null),
|
||||||
new MemberIdentity()
|
new MemberIdentity()
|
||||||
|
.setMemberId(UNKNOWN_MEMBER_ID)
|
||||||
.setGroupInstanceId(instanceId2),
|
.setGroupInstanceId(instanceId2),
|
||||||
// Member that doesn't use the classic protocol.
|
// Member that doesn't use the classic protocol.
|
||||||
new MemberIdentity()
|
new MemberIdentity()
|
||||||
|
@ -13877,8 +13877,10 @@ public class GroupMetadataManagerTest {
|
||||||
.setGroupInstanceId(instanceId3),
|
.setGroupInstanceId(instanceId3),
|
||||||
// Unknown member id.
|
// Unknown member id.
|
||||||
new MemberIdentity()
|
new MemberIdentity()
|
||||||
.setMemberId("unknown-member-id"),
|
.setMemberId("unknown-member-id")
|
||||||
|
.setGroupInstanceId(null),
|
||||||
new MemberIdentity()
|
new MemberIdentity()
|
||||||
|
.setMemberId(UNKNOWN_MEMBER_ID)
|
||||||
.setGroupInstanceId("unknown-instance-id"),
|
.setGroupInstanceId("unknown-instance-id"),
|
||||||
// Fenced instance id.
|
// Fenced instance id.
|
||||||
new MemberIdentity()
|
new MemberIdentity()
|
||||||
|
@ -13895,11 +13897,10 @@ public class GroupMetadataManagerTest {
|
||||||
.setMemberId(memberId1),
|
.setMemberId(memberId1),
|
||||||
new LeaveGroupResponseData.MemberResponse()
|
new LeaveGroupResponseData.MemberResponse()
|
||||||
.setGroupInstanceId(instanceId2)
|
.setGroupInstanceId(instanceId2)
|
||||||
.setMemberId(memberId2),
|
.setMemberId(UNKNOWN_MEMBER_ID),
|
||||||
new LeaveGroupResponseData.MemberResponse()
|
new LeaveGroupResponseData.MemberResponse()
|
||||||
.setGroupInstanceId(instanceId3)
|
.setGroupInstanceId(instanceId3)
|
||||||
.setMemberId(memberId3)
|
.setMemberId(memberId3),
|
||||||
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()),
|
|
||||||
new LeaveGroupResponseData.MemberResponse()
|
new LeaveGroupResponseData.MemberResponse()
|
||||||
.setGroupInstanceId(null)
|
.setGroupInstanceId(null)
|
||||||
.setMemberId("unknown-member-id")
|
.setMemberId("unknown-member-id")
|
||||||
|
@ -13908,8 +13909,8 @@ public class GroupMetadataManagerTest {
|
||||||
.setGroupInstanceId("unknown-instance-id")
|
.setGroupInstanceId("unknown-instance-id")
|
||||||
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()),
|
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()),
|
||||||
new LeaveGroupResponseData.MemberResponse()
|
new LeaveGroupResponseData.MemberResponse()
|
||||||
.setGroupInstanceId(instanceId3)
|
|
||||||
.setMemberId("unknown-member-id")
|
.setMemberId("unknown-member-id")
|
||||||
|
.setGroupInstanceId(instanceId3)
|
||||||
.setErrorCode(Errors.FENCED_INSTANCE_ID.code())
|
.setErrorCode(Errors.FENCED_INSTANCE_ID.code())
|
||||||
)),
|
)),
|
||||||
leaveResult.response()
|
leaveResult.response()
|
||||||
|
@ -13924,6 +13925,12 @@ public class GroupMetadataManagerTest {
|
||||||
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId2),
|
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId2),
|
||||||
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId2),
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId2),
|
||||||
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId2),
|
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId2),
|
||||||
|
// Remove member 3.
|
||||||
|
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId3),
|
||||||
|
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId3),
|
||||||
|
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId3),
|
||||||
|
// Update subscription metadata.
|
||||||
|
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Collections.emptyMap()),
|
||||||
// Bump the group epoch.
|
// Bump the group epoch.
|
||||||
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
|
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
|
||||||
);
|
);
|
||||||
|
@ -14045,7 +14052,7 @@ public class GroupMetadataManagerTest {
|
||||||
String groupId = "group-id";
|
String groupId = "group-id";
|
||||||
String memberId = Uuid.randomUuid().toString();
|
String memberId = Uuid.randomUuid().toString();
|
||||||
|
|
||||||
// Consumer group without member using the classic protocol.
|
// Consumer group.
|
||||||
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
|
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
|
||||||
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
|
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
|
||||||
.withMember(new ConsumerGroupMember.Builder(memberId)
|
.withMember(new ConsumerGroupMember.Builder(memberId)
|
||||||
|
@ -14058,9 +14065,7 @@ public class GroupMetadataManagerTest {
|
||||||
.setGroupId("group-id")
|
.setGroupId("group-id")
|
||||||
.setMembers(List.of(
|
.setMembers(List.of(
|
||||||
new MemberIdentity()
|
new MemberIdentity()
|
||||||
.setMemberId("unknown-member-id"),
|
.setMemberId("unknown-member-id")
|
||||||
new MemberIdentity()
|
|
||||||
.setMemberId(memberId)
|
|
||||||
))
|
))
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -14070,10 +14075,6 @@ public class GroupMetadataManagerTest {
|
||||||
new LeaveGroupResponseData.MemberResponse()
|
new LeaveGroupResponseData.MemberResponse()
|
||||||
.setGroupInstanceId(null)
|
.setGroupInstanceId(null)
|
||||||
.setMemberId("unknown-member-id")
|
.setMemberId("unknown-member-id")
|
||||||
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()),
|
|
||||||
new LeaveGroupResponseData.MemberResponse()
|
|
||||||
.setGroupInstanceId(null)
|
|
||||||
.setMemberId(memberId)
|
|
||||||
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
|
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
|
||||||
)),
|
)),
|
||||||
leaveResult.response()
|
leaveResult.response()
|
||||||
|
@ -16047,9 +16048,15 @@ public class GroupMetadataManagerTest {
|
||||||
new LeaveGroupRequestData()
|
new LeaveGroupRequestData()
|
||||||
.setGroupId(groupId)
|
.setGroupId(groupId)
|
||||||
.setMembers(List.of(
|
.setMembers(List.of(
|
||||||
new MemberIdentity().setGroupInstanceId(memberId1),
|
new MemberIdentity()
|
||||||
new MemberIdentity().setGroupInstanceId(memberId2),
|
.setMemberId(memberId1)
|
||||||
new MemberIdentity().setGroupInstanceId(memberId3)
|
.setGroupInstanceId(null),
|
||||||
|
new MemberIdentity()
|
||||||
|
.setMemberId(memberId2)
|
||||||
|
.setGroupInstanceId(memberId2),
|
||||||
|
new MemberIdentity()
|
||||||
|
.setMemberId(UNKNOWN_MEMBER_ID)
|
||||||
|
.setGroupInstanceId(memberId3)
|
||||||
))
|
))
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -16058,12 +16065,12 @@ public class GroupMetadataManagerTest {
|
||||||
.setMembers(List.of(
|
.setMembers(List.of(
|
||||||
new LeaveGroupResponseData.MemberResponse()
|
new LeaveGroupResponseData.MemberResponse()
|
||||||
.setMemberId(memberId1)
|
.setMemberId(memberId1)
|
||||||
.setGroupInstanceId(memberId1),
|
.setGroupInstanceId(null),
|
||||||
new LeaveGroupResponseData.MemberResponse()
|
new LeaveGroupResponseData.MemberResponse()
|
||||||
.setMemberId(memberId2)
|
.setMemberId(memberId2)
|
||||||
.setGroupInstanceId(memberId2),
|
.setGroupInstanceId(memberId2),
|
||||||
new LeaveGroupResponseData.MemberResponse()
|
new LeaveGroupResponseData.MemberResponse()
|
||||||
.setMemberId(memberId3)
|
.setMemberId(UNKNOWN_MEMBER_ID)
|
||||||
.setGroupInstanceId(memberId3)
|
.setGroupInstanceId(memberId3)
|
||||||
)),
|
)),
|
||||||
result.response()
|
result.response()
|
||||||
|
|
Loading…
Reference in New Issue