diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 506203b9b83..4816f780ab6 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -790,7 +790,7 @@ public class GroupMetadataManager { } else { if (group.type() == CONSUMER) { return (ConsumerGroup) group; - } else if (createIfNotExists && validateOnlineUpgrade((ClassicGroup) group)) { + } else if (createIfNotExists && group.type() == CLASSIC && validateOnlineUpgrade((ClassicGroup) group)) { return convertToConsumerGroup((ClassicGroup) group, records); } else { throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", @@ -3863,10 +3863,21 @@ public class GroupMetadataManager { CompletableFuture responseFuture ) { Group group = groups.get(request.groupId(), Long.MAX_VALUE); - if (group != null && group.type() == CONSUMER && !group.isEmpty()) { - // classicGroupJoinToConsumerGroup takes the join requests to non-empty consumer groups. - // The empty consumer groups should be converted to classic groups in classicGroupJoinToClassicGroup. - return classicGroupJoinToConsumerGroup((ConsumerGroup) group, context, request, responseFuture); + if (group != null) { + if (group.type() == CONSUMER && !group.isEmpty()) { + // classicGroupJoinToConsumerGroup takes the join requests to non-empty consumer groups. + // The empty consumer groups should be converted to classic groups in classicGroupJoinToClassicGroup. + return classicGroupJoinToConsumerGroup((ConsumerGroup) group, context, request, responseFuture); + } else if (group.type() == CONSUMER || group.type() == CLASSIC) { + return classicGroupJoinToClassicGroup(context, request, responseFuture); + } else { + // Group exists but it's not a consumer group + responseFuture.complete(new JoinGroupResponseData() + .setMemberId(UNKNOWN_MEMBER_ID) + .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code()) + ); + return EMPTY_RESULT; + } } else { return classicGroupJoinToClassicGroup(context, request, responseFuture); } @@ -5087,8 +5098,12 @@ public class GroupMetadataManager { if (group.type() == CLASSIC) { return classicGroupSyncToClassicGroup((ClassicGroup) group, context, request, responseFuture); - } else { + } else if (group.type() == CONSUMER) { return classicGroupSyncToConsumerGroup((ConsumerGroup) group, context, request, responseFuture); + } else { + responseFuture.complete(new SyncGroupResponseData() + .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())); + return EMPTY_RESULT; } } @@ -5355,8 +5370,12 @@ public class GroupMetadataManager { if (group.type() == CLASSIC) { return classicGroupHeartbeatToClassicGroup((ClassicGroup) group, context, request); - } else { + } else if (group.type() == CONSUMER) { return classicGroupHeartbeatToConsumerGroup((ConsumerGroup) group, context, request); + } else { + throw new UnknownMemberIdException( + String.format("Group %s not found.", request.groupId()) + ); } } @@ -5536,8 +5555,10 @@ public class GroupMetadataManager { if (group.type() == CLASSIC) { return classicGroupLeaveToClassicGroup((ClassicGroup) group, context, request); - } else { + } else if (group.type() == CONSUMER) { return classicGroupLeaveToConsumerGroup((ConsumerGroup) group, context, request); + } else { + throw new UnknownMemberIdException(String.format("Group %s not found.", request.groupId())); } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java index f54b6559b91..b7c0c433842 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java @@ -17,6 +17,7 @@ package org.apache.kafka.coordinator.group.modern.share; import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.message.ShareGroupDescribeResponseData; import org.apache.kafka.common.protocol.Errors; @@ -184,7 +185,7 @@ public class ShareGroup extends ModernGroup { boolean isTransactional, short apiVersion ) { - throw new UnsupportedOperationException("validateOffsetCommit is not supported for Share Groups."); + throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", groupId)); } @Override @@ -193,12 +194,12 @@ public class ShareGroup extends ModernGroup { int memberEpoch, long lastCommittedOffset ) { - throw new UnsupportedOperationException("validateOffsetFetch is not supported for Share Groups."); + throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", groupId)); } @Override public void validateOffsetDelete() { - throw new UnsupportedOperationException("validateOffsetDelete is not supported for Share Groups."); + throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", groupId)); } /** diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index b500bf57add..22ae88aaf53 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -14285,6 +14285,241 @@ public class GroupMetadataManagerTest { context.assertNoRebalanceTimeout(groupId, memberId); } + @Test + public void testConsumerGroupHeartbeatOnShareGroup() { + String groupId = "group-foo"; + String memberId = Uuid.randomUuid().toString(); + + MockPartitionAssignor assignor = new MockPartitionAssignor("share"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withShareGroupAssignor(assignor) + .withMetadataImage(MetadataImage.EMPTY) + .withShareGroup(new ShareGroupBuilder(groupId, 1) + .withMember(new ShareGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(1) + .setPreviousMemberEpoch(0) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setSubscribedTopicNames(Collections.singletonList("foo")) + .build()) + .withAssignment(memberId, mkAssignment()) + .withAssignmentEpoch(1)) + .build(); + + assertThrows(GroupIdNotFoundException.class, () -> context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberEpoch(0) + .setServerAssignor("range") + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setTopicPartitions(Collections.emptyList()))); + } + + @Test + public void testClassicGroupJoinOnShareGroup() throws Exception { + String groupId = "group-foo"; + String memberId = Uuid.randomUuid().toString(); + + MockPartitionAssignor assignor = new MockPartitionAssignor("share"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withShareGroupAssignor(assignor) + .withMetadataImage(MetadataImage.EMPTY) + .withShareGroup(new ShareGroupBuilder(groupId, 1) + .withMember(new ShareGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(1) + .setPreviousMemberEpoch(0) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setSubscribedTopicNames(Collections.singletonList("foo")) + .build()) + .withAssignment(memberId, mkAssignment()) + .withAssignmentEpoch(1)) + .build(); + + JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() + .withGroupId(groupId) + .withMemberId(UNKNOWN_MEMBER_ID) + .withProtocolType("consumer") + .withProtocols(new JoinGroupRequestProtocolCollection(0)) + .build(); + + GroupMetadataManagerTestContext.JoinResult joinResult = context.sendClassicGroupJoin(request); + assertTrue(joinResult.joinFuture.isDone()); + assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code(), joinResult.joinFuture.get().errorCode()); + } + + @Test + public void testClassicGroupSyncOnShareGroup() throws Exception { + String groupId = "group-foo"; + String memberId = Uuid.randomUuid().toString(); + + MockPartitionAssignor assignor = new MockPartitionAssignor("share"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withShareGroupAssignor(assignor) + .withMetadataImage(MetadataImage.EMPTY) + .withShareGroup(new ShareGroupBuilder(groupId, 1) + .withMember(new ShareGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(1) + .setPreviousMemberEpoch(0) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setSubscribedTopicNames(Collections.singletonList("foo")) + .build()) + .withAssignment(memberId, mkAssignment()) + .withAssignmentEpoch(1)) + .build(); + + SyncGroupRequestData request = new GroupMetadataManagerTestContext.SyncGroupRequestBuilder() + .withGroupId(groupId) + .withGenerationId(1) + .withMemberId(memberId) + .build(); + + GroupMetadataManagerTestContext.SyncResult syncResult = context.sendClassicGroupSync(request); + + assertTrue(syncResult.records.isEmpty()); + assertTrue(syncResult.syncFuture.isDone()); + assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), syncResult.syncFuture.get().errorCode()); + } + + @Test + public void testClassicGroupLeaveOnShareGroup() throws Exception { + String groupId = "group-foo"; + String memberId = Uuid.randomUuid().toString(); + + MockPartitionAssignor assignor = new MockPartitionAssignor("share"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withShareGroupAssignor(assignor) + .withMetadataImage(MetadataImage.EMPTY) + .withShareGroup(new ShareGroupBuilder(groupId, 1) + .withMember(new ShareGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(1) + .setPreviousMemberEpoch(0) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setSubscribedTopicNames(Collections.singletonList("foo")) + .build()) + .withAssignment(memberId, mkAssignment()) + .withAssignmentEpoch(1)) + .build(); + + assertThrows(UnknownMemberIdException.class, () -> context.sendClassicGroupLeave( + new LeaveGroupRequestData() + .setGroupId(groupId) + .setMembers(Collections.singletonList( + new MemberIdentity() + .setMemberId(memberId))))); + } + + @Test + public void testConsumerGroupDescribeOnShareGroup() { + String groupId = "group-foo"; + String memberId = Uuid.randomUuid().toString(); + + MockPartitionAssignor assignor = new MockPartitionAssignor("share"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withShareGroupAssignor(assignor) + .withMetadataImage(MetadataImage.EMPTY) + .withShareGroup(new ShareGroupBuilder(groupId, 1) + .withMember(new ShareGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(1) + .setPreviousMemberEpoch(0) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setSubscribedTopicNames(Collections.singletonList("foo")) + .build()) + .withAssignment(memberId, mkAssignment()) + .withAssignmentEpoch(1)) + .build(); + + List expected = Collections.singletonList( + new ConsumerGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupId) + .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()) + ); + + List actual = context.sendConsumerGroupDescribe(Collections.singletonList(groupId)); + assertEquals(expected, actual); + } + + @Test + public void testShareGroupHeartbeatOnConsumerGroup() { + String groupId = "group-foo"; + // Use a static member id as it makes the test easier. + String memberId1 = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + + // Consumer group with one static member. + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroupAssignors(Collections.singletonList(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId1) + .setState(MemberState.STABLE) + .setInstanceId(memberId1) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2))) + .build()) + .withAssignment(memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2))) + .withAssignmentEpoch(10)) + .build(); + + assertThrows(GroupIdNotFoundException.class, () -> + context.shareGroupHeartbeat( + new ShareGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(Uuid.randomUuid().toString()) + .setMemberEpoch(1) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")))); + } + + @Test + public void testShareGroupDescribeOnConsumerGroup() { + String groupId = "group-foo"; + String memberId = Uuid.randomUuid().toString(); + + int epoch = 10; + String topicName = "topicName"; + ConsumerGroupMember.Builder memberBuilder = new ConsumerGroupMember.Builder(memberId) + .setSubscribedTopicNames(Collections.singletonList(topicName)) + .setServerAssignorName("assignorName"); + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroupAssignors(Collections.singletonList(assignor)) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, epoch) + .withMember(memberBuilder.build())) + .build(); + + List expected = Collections.singletonList( + new ShareGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupId) + .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()) + ); + + List actual = context.sendShareGroupDescribe(Collections.singletonList(groupId)); + assertEquals(expected, actual); + } + private static void checkJoinGroupResponse( JoinGroupResponseData expectedResponse, JoinGroupResponseData actualResponse, diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java index 0d84d0c0da4..e53a9dac910 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.coordinator.group.modern.share; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.message.ShareGroupDescribeResponseData; @@ -551,7 +552,7 @@ public class ShareGroupTest { @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT) public void testValidateOffsetCommit(short version) { ShareGroup shareGroup = createShareGroup("group-foo"); - assertThrows(UnsupportedOperationException.class, () -> + assertThrows(GroupIdNotFoundException.class, () -> shareGroup.validateOffsetCommit(null, null, -1, false, version)); } @@ -581,14 +582,14 @@ public class ShareGroupTest { @Test public void testValidateOffsetFetch() { ShareGroup shareGroup = createShareGroup("group-foo"); - assertThrows(UnsupportedOperationException.class, () -> + assertThrows(GroupIdNotFoundException.class, () -> shareGroup.validateOffsetFetch(null, -1, -1)); } @Test public void testValidateOffsetDelete() { ShareGroup shareGroup = createShareGroup("group-foo"); - assertThrows(UnsupportedOperationException.class, shareGroup::validateOffsetDelete); + assertThrows(GroupIdNotFoundException.class, shareGroup::validateOffsetDelete); } @Test