KAFKA-17425: Improve coexistence of consumer and share groups (#17039)

This PR ensures that using the various group RPCs work properly when issued against the wrong type of group, such as DescribeConsumerGroups for a share group, or ConsumerGroupHeartbeat for a share group. There are no changes to the RPC error codes required.

The significant code changes are:

Making sure that the group coordinator does not assume that only classic and consumer groups exist. This was the cause of a ClassCastException when ConsumerGroupHeartbeat was being used against a share group.
Making sure that committing offsets to a share group fails with GroupIdNotFoundException rather than java.lang.UnsupportedOperation. This was the cause of a name collision between a share group and a consumer group when using kafka-consumer-groups.sh --reset-offsets which inadvertently created a consumer group of the same name.

 Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
This commit is contained in:
Andrew Schofield 2024-09-03 19:46:15 +01:00 committed by GitHub
parent b8ea409132
commit b0d0956b20
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 272 additions and 14 deletions

View File

@ -790,7 +790,7 @@ public class GroupMetadataManager {
} else { } else {
if (group.type() == CONSUMER) { if (group.type() == CONSUMER) {
return (ConsumerGroup) group; return (ConsumerGroup) group;
} else if (createIfNotExists && validateOnlineUpgrade((ClassicGroup) group)) { } else if (createIfNotExists && group.type() == CLASSIC && validateOnlineUpgrade((ClassicGroup) group)) {
return convertToConsumerGroup((ClassicGroup) group, records); return convertToConsumerGroup((ClassicGroup) group, records);
} else { } else {
throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.",
@ -3863,10 +3863,21 @@ public class GroupMetadataManager {
CompletableFuture<JoinGroupResponseData> responseFuture CompletableFuture<JoinGroupResponseData> responseFuture
) { ) {
Group group = groups.get(request.groupId(), Long.MAX_VALUE); Group group = groups.get(request.groupId(), Long.MAX_VALUE);
if (group != null && group.type() == CONSUMER && !group.isEmpty()) { if (group != null) {
if (group.type() == CONSUMER && !group.isEmpty()) {
// classicGroupJoinToConsumerGroup takes the join requests to non-empty consumer groups. // classicGroupJoinToConsumerGroup takes the join requests to non-empty consumer groups.
// The empty consumer groups should be converted to classic groups in classicGroupJoinToClassicGroup. // The empty consumer groups should be converted to classic groups in classicGroupJoinToClassicGroup.
return classicGroupJoinToConsumerGroup((ConsumerGroup) group, context, request, responseFuture); return classicGroupJoinToConsumerGroup((ConsumerGroup) group, context, request, responseFuture);
} else if (group.type() == CONSUMER || group.type() == CLASSIC) {
return classicGroupJoinToClassicGroup(context, request, responseFuture);
} else {
// Group exists but it's not a consumer group
responseFuture.complete(new JoinGroupResponseData()
.setMemberId(UNKNOWN_MEMBER_ID)
.setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code())
);
return EMPTY_RESULT;
}
} else { } else {
return classicGroupJoinToClassicGroup(context, request, responseFuture); return classicGroupJoinToClassicGroup(context, request, responseFuture);
} }
@ -5087,8 +5098,12 @@ public class GroupMetadataManager {
if (group.type() == CLASSIC) { if (group.type() == CLASSIC) {
return classicGroupSyncToClassicGroup((ClassicGroup) group, context, request, responseFuture); return classicGroupSyncToClassicGroup((ClassicGroup) group, context, request, responseFuture);
} else { } else if (group.type() == CONSUMER) {
return classicGroupSyncToConsumerGroup((ConsumerGroup) group, context, request, responseFuture); return classicGroupSyncToConsumerGroup((ConsumerGroup) group, context, request, responseFuture);
} else {
responseFuture.complete(new SyncGroupResponseData()
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
return EMPTY_RESULT;
} }
} }
@ -5355,8 +5370,12 @@ public class GroupMetadataManager {
if (group.type() == CLASSIC) { if (group.type() == CLASSIC) {
return classicGroupHeartbeatToClassicGroup((ClassicGroup) group, context, request); return classicGroupHeartbeatToClassicGroup((ClassicGroup) group, context, request);
} else { } else if (group.type() == CONSUMER) {
return classicGroupHeartbeatToConsumerGroup((ConsumerGroup) group, context, request); return classicGroupHeartbeatToConsumerGroup((ConsumerGroup) group, context, request);
} else {
throw new UnknownMemberIdException(
String.format("Group %s not found.", request.groupId())
);
} }
} }
@ -5536,8 +5555,10 @@ public class GroupMetadataManager {
if (group.type() == CLASSIC) { if (group.type() == CLASSIC) {
return classicGroupLeaveToClassicGroup((ClassicGroup) group, context, request); return classicGroupLeaveToClassicGroup((ClassicGroup) group, context, request);
} else { } else if (group.type() == CONSUMER) {
return classicGroupLeaveToConsumerGroup((ConsumerGroup) group, context, request); return classicGroupLeaveToConsumerGroup((ConsumerGroup) group, context, request);
} else {
throw new UnknownMemberIdException(String.format("Group %s not found.", request.groupId()));
} }
} }

View File

@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group.modern.share; package org.apache.kafka.coordinator.group.modern.share;
import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.message.ShareGroupDescribeResponseData; import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Errors;
@ -184,7 +185,7 @@ public class ShareGroup extends ModernGroup<ShareGroupMember> {
boolean isTransactional, boolean isTransactional,
short apiVersion short apiVersion
) { ) {
throw new UnsupportedOperationException("validateOffsetCommit is not supported for Share Groups."); throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", groupId));
} }
@Override @Override
@ -193,12 +194,12 @@ public class ShareGroup extends ModernGroup<ShareGroupMember> {
int memberEpoch, int memberEpoch,
long lastCommittedOffset long lastCommittedOffset
) { ) {
throw new UnsupportedOperationException("validateOffsetFetch is not supported for Share Groups."); throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", groupId));
} }
@Override @Override
public void validateOffsetDelete() { public void validateOffsetDelete() {
throw new UnsupportedOperationException("validateOffsetDelete is not supported for Share Groups."); throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", groupId));
} }
/** /**

View File

@ -14285,6 +14285,241 @@ public class GroupMetadataManagerTest {
context.assertNoRebalanceTimeout(groupId, memberId); context.assertNoRebalanceTimeout(groupId, memberId);
} }
@Test
public void testConsumerGroupHeartbeatOnShareGroup() {
String groupId = "group-foo";
String memberId = Uuid.randomUuid().toString();
MockPartitionAssignor assignor = new MockPartitionAssignor("share");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withShareGroupAssignor(assignor)
.withMetadataImage(MetadataImage.EMPTY)
.withShareGroup(new ShareGroupBuilder(groupId, 1)
.withMember(new ShareGroupMember.Builder(memberId)
.setState(MemberState.STABLE)
.setMemberEpoch(1)
.setPreviousMemberEpoch(0)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setSubscribedTopicNames(Collections.singletonList("foo"))
.build())
.withAssignment(memberId, mkAssignment())
.withAssignmentEpoch(1))
.build();
assertThrows(GroupIdNotFoundException.class, () -> context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberEpoch(0)
.setServerAssignor("range")
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
.setTopicPartitions(Collections.emptyList())));
}
@Test
public void testClassicGroupJoinOnShareGroup() throws Exception {
String groupId = "group-foo";
String memberId = Uuid.randomUuid().toString();
MockPartitionAssignor assignor = new MockPartitionAssignor("share");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withShareGroupAssignor(assignor)
.withMetadataImage(MetadataImage.EMPTY)
.withShareGroup(new ShareGroupBuilder(groupId, 1)
.withMember(new ShareGroupMember.Builder(memberId)
.setState(MemberState.STABLE)
.setMemberEpoch(1)
.setPreviousMemberEpoch(0)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setSubscribedTopicNames(Collections.singletonList("foo"))
.build())
.withAssignment(memberId, mkAssignment())
.withAssignmentEpoch(1))
.build();
JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
.withGroupId(groupId)
.withMemberId(UNKNOWN_MEMBER_ID)
.withProtocolType("consumer")
.withProtocols(new JoinGroupRequestProtocolCollection(0))
.build();
GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request);
assertTrue(joinResult.joinFuture.isDone());
assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code(), joinResult.joinFuture.get().errorCode());
}
@Test
public void testClassicGroupSyncOnShareGroup() throws Exception {
String groupId = "group-foo";
String memberId = Uuid.randomUuid().toString();
MockPartitionAssignor assignor = new MockPartitionAssignor("share");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withShareGroupAssignor(assignor)
.withMetadataImage(MetadataImage.EMPTY)
.withShareGroup(new ShareGroupBuilder(groupId, 1)
.withMember(new ShareGroupMember.Builder(memberId)
.setState(MemberState.STABLE)
.setMemberEpoch(1)
.setPreviousMemberEpoch(0)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setSubscribedTopicNames(Collections.singletonList("foo"))
.build())
.withAssignment(memberId, mkAssignment())
.withAssignmentEpoch(1))
.build();
SyncGroupRequestData request = new GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
.withGroupId(groupId)
.withGenerationId(1)
.withMemberId(memberId)
.build();
GroupMetadataManagerTestContext.SyncResult syncResult = context.sendClassicGroupSync(request);
assertTrue(syncResult.records.isEmpty());
assertTrue(syncResult.syncFuture.isDone());
assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), syncResult.syncFuture.get().errorCode());
}
@Test
public void testClassicGroupLeaveOnShareGroup() throws Exception {
String groupId = "group-foo";
String memberId = Uuid.randomUuid().toString();
MockPartitionAssignor assignor = new MockPartitionAssignor("share");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withShareGroupAssignor(assignor)
.withMetadataImage(MetadataImage.EMPTY)
.withShareGroup(new ShareGroupBuilder(groupId, 1)
.withMember(new ShareGroupMember.Builder(memberId)
.setState(MemberState.STABLE)
.setMemberEpoch(1)
.setPreviousMemberEpoch(0)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setSubscribedTopicNames(Collections.singletonList("foo"))
.build())
.withAssignment(memberId, mkAssignment())
.withAssignmentEpoch(1))
.build();
assertThrows(UnknownMemberIdException.class, () -> context.sendClassicGroupLeave(
new LeaveGroupRequestData()
.setGroupId(groupId)
.setMembers(Collections.singletonList(
new MemberIdentity()
.setMemberId(memberId)))));
}
@Test
public void testConsumerGroupDescribeOnShareGroup() {
String groupId = "group-foo";
String memberId = Uuid.randomUuid().toString();
MockPartitionAssignor assignor = new MockPartitionAssignor("share");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withShareGroupAssignor(assignor)
.withMetadataImage(MetadataImage.EMPTY)
.withShareGroup(new ShareGroupBuilder(groupId, 1)
.withMember(new ShareGroupMember.Builder(memberId)
.setState(MemberState.STABLE)
.setMemberEpoch(1)
.setPreviousMemberEpoch(0)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setSubscribedTopicNames(Collections.singletonList("foo"))
.build())
.withAssignment(memberId, mkAssignment())
.withAssignmentEpoch(1))
.build();
List<ConsumerGroupDescribeResponseData.DescribedGroup> expected = Collections.singletonList(
new ConsumerGroupDescribeResponseData.DescribedGroup()
.setGroupId(groupId)
.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
);
List<ConsumerGroupDescribeResponseData.DescribedGroup> actual = context.sendConsumerGroupDescribe(Collections.singletonList(groupId));
assertEquals(expected, actual);
}
@Test
public void testShareGroupHeartbeatOnConsumerGroup() {
String groupId = "group-foo";
// Use a static member id as it makes the test easier.
String memberId1 = Uuid.randomUuid().toString();
Uuid fooTopicId = Uuid.randomUuid();
String fooTopicName = "foo";
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
// Consumer group with one static member.
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withConsumerGroupAssignors(Collections.singletonList(assignor))
.withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.build())
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
.withMember(new ConsumerGroupMember.Builder(memberId1)
.setState(MemberState.STABLE)
.setInstanceId(memberId1)
.setMemberEpoch(10)
.setPreviousMemberEpoch(9)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
.setServerAssignorName("range")
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2)))
.build())
.withAssignment(memberId1, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2)))
.withAssignmentEpoch(10))
.build();
assertThrows(GroupIdNotFoundException.class, () ->
context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(Uuid.randomUuid().toString())
.setMemberEpoch(1)
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))));
}
@Test
public void testShareGroupDescribeOnConsumerGroup() {
String groupId = "group-foo";
String memberId = Uuid.randomUuid().toString();
int epoch = 10;
String topicName = "topicName";
ConsumerGroupMember.Builder memberBuilder = new ConsumerGroupMember.Builder(memberId)
.setSubscribedTopicNames(Collections.singletonList(topicName))
.setServerAssignorName("assignorName");
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withConsumerGroupAssignors(Collections.singletonList(assignor))
.withConsumerGroup(new ConsumerGroupBuilder(groupId, epoch)
.withMember(memberBuilder.build()))
.build();
List<ShareGroupDescribeResponseData.DescribedGroup> expected = Collections.singletonList(
new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId(groupId)
.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
);
List<ShareGroupDescribeResponseData.DescribedGroup> actual = context.sendShareGroupDescribe(Collections.singletonList(groupId));
assertEquals(expected, actual);
}
private static void checkJoinGroupResponse( private static void checkJoinGroupResponse(
JoinGroupResponseData expectedResponse, JoinGroupResponseData expectedResponse,
JoinGroupResponseData actualResponse, JoinGroupResponseData actualResponse,

View File

@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group.modern.share; package org.apache.kafka.coordinator.group.modern.share;
import org.apache.kafka.common.Uuid; import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException; import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.message.ShareGroupDescribeResponseData; import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
@ -551,7 +552,7 @@ public class ShareGroupTest {
@ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT) @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
public void testValidateOffsetCommit(short version) { public void testValidateOffsetCommit(short version) {
ShareGroup shareGroup = createShareGroup("group-foo"); ShareGroup shareGroup = createShareGroup("group-foo");
assertThrows(UnsupportedOperationException.class, () -> assertThrows(GroupIdNotFoundException.class, () ->
shareGroup.validateOffsetCommit(null, null, -1, false, version)); shareGroup.validateOffsetCommit(null, null, -1, false, version));
} }
@ -581,14 +582,14 @@ public class ShareGroupTest {
@Test @Test
public void testValidateOffsetFetch() { public void testValidateOffsetFetch() {
ShareGroup shareGroup = createShareGroup("group-foo"); ShareGroup shareGroup = createShareGroup("group-foo");
assertThrows(UnsupportedOperationException.class, () -> assertThrows(GroupIdNotFoundException.class, () ->
shareGroup.validateOffsetFetch(null, -1, -1)); shareGroup.validateOffsetFetch(null, -1, -1));
} }
@Test @Test
public void testValidateOffsetDelete() { public void testValidateOffsetDelete() {
ShareGroup shareGroup = createShareGroup("group-foo"); ShareGroup shareGroup = createShareGroup("group-foo");
assertThrows(UnsupportedOperationException.class, shareGroup::validateOffsetDelete); assertThrows(GroupIdNotFoundException.class, shareGroup::validateOffsetDelete);
} }
@Test @Test