diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 39306a9dadd..09e9c810536 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -2671,19 +2671,27 @@ public class GroupMetadataManager { ); } - private boolean initializedAssignmentPending(ShareGroup group) { - if (!shareGroupStatePartitionMetadata.containsKey(group.groupId())) { + // Visibility for testing + boolean initializedAssignmentPending(ShareGroup group) { + if (group.isEmpty()) { + // No members then no point in computing assignment. + return false; + } + + String groupId = group.groupId(); + + if (!shareGroupStatePartitionMetadata.containsKey(groupId) || + shareGroupStatePartitionMetadata.get(groupId).initializedTopics().isEmpty()) { // No initialized share partitions for the group so nothing can be assigned. return false; } - if (group.isEmpty()) { - // No members then no point of computing assignment. + Set subscribedTopicNames = group.subscribedTopicNames().keySet(); + // No subscription then no need to compute assignment. + if (subscribedTopicNames.isEmpty()) { return false; } - // We need to check if all the group initialized share partitions are part of the group assignment. - Map> initializedTps = stripInitValue(shareGroupStatePartitionMetadata.get(group.groupId()).initializedTopics()); Map> currentAssigned = new HashMap<>(); for (Assignment assignment : group.targetAssignment().values()) { for (Map.Entry> tps : assignment.partitions().entrySet()) { @@ -2692,7 +2700,20 @@ public class GroupMetadataManager { } } - return !initializedTps.equals(currentAssigned); + for (Map.Entry entry : shareGroupStatePartitionMetadata.get(groupId).initializedTopics().entrySet()) { + if (subscribedTopicNames.contains(entry.getValue().name())) { + // This topic is currently subscribed, so investigate further. + Set currentAssignedPartitions = currentAssigned.get(entry.getKey()); + if (currentAssignedPartitions != null && currentAssignedPartitions.equals(entry.getValue().partitions())) { + // The assigned and initialized partitions match, so assignment does not need to be recomputed. + continue; + } + // The assigned and initialized partitions do not match, OR + // this topic is not currently assigned, so recompute the assignment. + return true; + } + } + return false; } /** diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 111a1deb921..6564b9c3b58 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -112,6 +112,7 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.coordinator.group.modern.Assignment; import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl; import org.apache.kafka.coordinator.group.modern.MemberState; +import org.apache.kafka.coordinator.group.modern.SubscriptionCount; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupBuilder; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember; @@ -23158,6 +23159,203 @@ public class GroupMetadataManagerTest { assertDoesNotThrow(() -> context.replay(record)); } + private record PendingAssignmentCase( + String description, + String groupId, + ShareGroup group, + boolean expectedValue, + Runnable assertions + ) { + } + + private static Stream> generatePendingAssignmentCases() { + String groupId1 = "groupId"; + Uuid tid1 = Uuid.randomUuid(); + String tName1 = "t1"; + Uuid tid2 = Uuid.randomUuid(); + String tName2 = "t2"; + + return Stream.of( + (GroupMetadataManagerTestContext context) -> { + ShareGroup group = mock(ShareGroup.class); + when(group.isEmpty()).thenReturn(true); + return new PendingAssignmentCase("Group is empty", groupId1, group, false, () -> { + verify(group, times(0)).groupId(); + verify(group).isEmpty(); + }); + }, + (GroupMetadataManagerTestContext context) -> { + ShareGroup group = mock(ShareGroup.class); + when(group.groupId()).thenReturn(groupId1); + when(group.isEmpty()).thenReturn(false); + return new PendingAssignmentCase("Group not in metadata", groupId1, group, false, () -> { + verify(group).groupId(); + verify(group).isEmpty(); + }); + }, + (GroupMetadataManagerTestContext context) -> { + ShareGroup group = mock(ShareGroup.class); + when(group.groupId()).thenReturn(groupId1); + when(group.isEmpty()).thenReturn(false); + context.groupMetadataManager.replay( + new ShareGroupStatePartitionMetadataKey() + .setGroupId(groupId1), + new ShareGroupStatePartitionMetadataValue() + ); + context.commit(); + return new PendingAssignmentCase("Group metadata initialized topics empty", groupId1, group, false, () -> { + verify(group).groupId(); + verify(group).isEmpty(); + }); + }, + (GroupMetadataManagerTestContext context) -> { + ShareGroup group = mock(ShareGroup.class); + when(group.groupId()).thenReturn(groupId1); + when(group.isEmpty()).thenReturn(false); + when(group.subscribedTopicNames()).thenReturn(Map.of()); + context.groupMetadataManager.replay( + new ShareGroupStatePartitionMetadataKey() + .setGroupId(groupId1), + new ShareGroupStatePartitionMetadataValue() + .setInitializedTopics(List.of( + new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() + .setTopicName(tName1) + .setTopicId(tid1) + .setPartitions(List.of(0, 1)) + )) + ); + context.commit(); + return new PendingAssignmentCase("Empty group subscription", groupId1, group, false, () -> { + verify(group).groupId(); + verify(group).isEmpty(); + verify(group).subscribedTopicNames(); + }); + }, + (GroupMetadataManagerTestContext context) -> { + ShareGroup group = mock(ShareGroup.class); + when(group.groupId()).thenReturn(groupId1); + when(group.isEmpty()).thenReturn(false); + when(group.subscribedTopicNames()).thenReturn(Map.of(tName2, new SubscriptionCount(1, 1))); + when(group.targetAssignment()).thenReturn(Map.of()); + context.groupMetadataManager.replay( + new ShareGroupStatePartitionMetadataKey() + .setGroupId(groupId1), + new ShareGroupStatePartitionMetadataValue() + .setInitializedTopics(List.of( + new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() + .setTopicName(tName1) + .setTopicId(tid1) + .setPartitions(List.of(0, 1)) + )) + ); + context.commit(); + return new PendingAssignmentCase("Subscribed topics not in metadata and empty assignment.", groupId1, group, false, () -> { + verify(group).groupId(); + verify(group).isEmpty(); + verify(group).subscribedTopicNames(); + verify(group).targetAssignment(); + }); + }, + (GroupMetadataManagerTestContext context) -> { + ShareGroup group = mock(ShareGroup.class); + when(group.groupId()).thenReturn(groupId1); + when(group.isEmpty()).thenReturn(false); + when(group.subscribedTopicNames()).thenReturn(Map.of(tName1, new SubscriptionCount(1, 1))); + when(group.targetAssignment()).thenReturn(Map.of(tName1, new Assignment(Map.of(tid1, Set.of(0, 1))))); + context.groupMetadataManager.replay( + new ShareGroupStatePartitionMetadataKey() + .setGroupId(groupId1), + new ShareGroupStatePartitionMetadataValue() + .setInitializedTopics(List.of( + new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() + .setTopicName(tName1) + .setTopicId(tid1) + .setPartitions(List.of(0, 1)) + )) + ); + context.commit(); + return new PendingAssignmentCase("Subscribed topics in metadata and assigned partitions match.", groupId1, group, false, () -> { + verify(group).groupId(); + verify(group).isEmpty(); + verify(group).subscribedTopicNames(); + verify(group).targetAssignment(); + }); + }, + (GroupMetadataManagerTestContext context) -> { + ShareGroup group = mock(ShareGroup.class); + when(group.groupId()).thenReturn(groupId1); + when(group.isEmpty()).thenReturn(false); + when(group.subscribedTopicNames()).thenReturn(Map.of(tName1, new SubscriptionCount(1, 1))); + when(group.targetAssignment()).thenReturn(Map.of(tName1, new Assignment(Map.of(tid1, Set.of(0))))); + context.groupMetadataManager.replay( + new ShareGroupStatePartitionMetadataKey() + .setGroupId(groupId1), + new ShareGroupStatePartitionMetadataValue() + .setInitializedTopics(List.of( + new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() + .setTopicName(tName1) + .setTopicId(tid1) + .setPartitions(List.of(0, 1)) + )) + ); + context.commit(); + return new PendingAssignmentCase("Subscribed topics in metadata but assigned partitions differ.", groupId1, group, true, () -> { + verify(group).groupId(); + verify(group).isEmpty(); + verify(group).subscribedTopicNames(); + verify(group).targetAssignment(); + }); + }, + (GroupMetadataManagerTestContext context) -> { + ShareGroup group = mock(ShareGroup.class); + when(group.groupId()).thenReturn(groupId1); + when(group.isEmpty()).thenReturn(false); + when(group.subscribedTopicNames()).thenReturn(Map.of(tName1, new SubscriptionCount(1, 1))); + when(group.targetAssignment()).thenReturn(Map.of( + tName1, new Assignment(Map.of(tid1, Set.of(0, 1))), + tName2, new Assignment(Map.of(tid2, Set.of(0))) + )); + context.groupMetadataManager.replay( + new ShareGroupStatePartitionMetadataKey() + .setGroupId(groupId1), + new ShareGroupStatePartitionMetadataValue() + .setInitializedTopics(List.of( + new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() + .setTopicName(tName1) + .setTopicId(tid1) + .setPartitions(List.of(0, 1)), + new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() + .setTopicName(tName2) + .setTopicId(tid2) + .setPartitions(List.of(0)) + )) + ); + context.commit(); + return new PendingAssignmentCase("Subscribed topics in metadata but assigned has other topics too.", groupId1, group, false, () -> { + verify(group).groupId(); + verify(group).isEmpty(); + verify(group).subscribedTopicNames(); + verify(group).targetAssignment(); + }); + } + ); + } + + @SuppressWarnings("ClassEscapesDefinedScope") + @ParameterizedTest + @MethodSource("generatePendingAssignmentCases") + public void testShareGroupPendingAssignments(Function testCase) { + MockPartitionAssignor assignor = new MockPartitionAssignor("simple"); + assignor.prepareGroupAssignment(new GroupAssignment(Map.of())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withShareGroupAssignor(assignor) + .build(); + + PendingAssignmentCase test = testCase.apply(context); + assertEquals(test.expectedValue, context.groupMetadataManager.initializedAssignmentPending(test.group), test.description); + test.assertions.run(); + } + private static void checkJoinGroupResponse( JoinGroupResponseData expectedResponse, JoinGroupResponseData actualResponse,