From da4fbba2793528e283458e080a690ad141857b0b Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Fri, 4 Jul 2025 14:55:19 +0100 Subject: [PATCH] KAFKA-19468: Ignore unsubscribed topics when computing share assignment (#20101) When the group coordinator is processing a heartbeat from a share consumer, it must decide whether the recompute the assignment. Part of this decision hinges on whether the assigned partitions match the partitions initialised by the share coordinator. However, when the set of subscribed topics changes, there may be initialised partitions which are not currently assigned. Topics which are not subscribed should be omitted from the calculation about whether to recompute the assignment. Co-authored-by: Sushant Mahajan Reviewers: Lan Ding <53332773+DL1231@users.noreply.github.com>, Sushant Mahajan , Apoorv Mittal --- .../group/GroupMetadataManager.java | 35 +++- .../group/GroupMetadataManagerTest.java | 198 ++++++++++++++++++ 2 files changed, 226 insertions(+), 7 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 39306a9dadd..09e9c810536 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -2671,19 +2671,27 @@ public class GroupMetadataManager { ); } - private boolean initializedAssignmentPending(ShareGroup group) { - if (!shareGroupStatePartitionMetadata.containsKey(group.groupId())) { + // Visibility for testing + boolean initializedAssignmentPending(ShareGroup group) { + if (group.isEmpty()) { + // No members then no point in computing assignment. + return false; + } + + String groupId = group.groupId(); + + if (!shareGroupStatePartitionMetadata.containsKey(groupId) || + shareGroupStatePartitionMetadata.get(groupId).initializedTopics().isEmpty()) { // No initialized share partitions for the group so nothing can be assigned. return false; } - if (group.isEmpty()) { - // No members then no point of computing assignment. + Set subscribedTopicNames = group.subscribedTopicNames().keySet(); + // No subscription then no need to compute assignment. + if (subscribedTopicNames.isEmpty()) { return false; } - // We need to check if all the group initialized share partitions are part of the group assignment. - Map> initializedTps = stripInitValue(shareGroupStatePartitionMetadata.get(group.groupId()).initializedTopics()); Map> currentAssigned = new HashMap<>(); for (Assignment assignment : group.targetAssignment().values()) { for (Map.Entry> tps : assignment.partitions().entrySet()) { @@ -2692,7 +2700,20 @@ public class GroupMetadataManager { } } - return !initializedTps.equals(currentAssigned); + for (Map.Entry entry : shareGroupStatePartitionMetadata.get(groupId).initializedTopics().entrySet()) { + if (subscribedTopicNames.contains(entry.getValue().name())) { + // This topic is currently subscribed, so investigate further. + Set currentAssignedPartitions = currentAssigned.get(entry.getKey()); + if (currentAssignedPartitions != null && currentAssignedPartitions.equals(entry.getValue().partitions())) { + // The assigned and initialized partitions match, so assignment does not need to be recomputed. + continue; + } + // The assigned and initialized partitions do not match, OR + // this topic is not currently assigned, so recompute the assignment. + return true; + } + } + return false; } /** diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 111a1deb921..6564b9c3b58 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -112,6 +112,7 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue; import org.apache.kafka.coordinator.group.modern.Assignment; import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl; import org.apache.kafka.coordinator.group.modern.MemberState; +import org.apache.kafka.coordinator.group.modern.SubscriptionCount; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupBuilder; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember; @@ -23158,6 +23159,203 @@ public class GroupMetadataManagerTest { assertDoesNotThrow(() -> context.replay(record)); } + private record PendingAssignmentCase( + String description, + String groupId, + ShareGroup group, + boolean expectedValue, + Runnable assertions + ) { + } + + private static Stream> generatePendingAssignmentCases() { + String groupId1 = "groupId"; + Uuid tid1 = Uuid.randomUuid(); + String tName1 = "t1"; + Uuid tid2 = Uuid.randomUuid(); + String tName2 = "t2"; + + return Stream.of( + (GroupMetadataManagerTestContext context) -> { + ShareGroup group = mock(ShareGroup.class); + when(group.isEmpty()).thenReturn(true); + return new PendingAssignmentCase("Group is empty", groupId1, group, false, () -> { + verify(group, times(0)).groupId(); + verify(group).isEmpty(); + }); + }, + (GroupMetadataManagerTestContext context) -> { + ShareGroup group = mock(ShareGroup.class); + when(group.groupId()).thenReturn(groupId1); + when(group.isEmpty()).thenReturn(false); + return new PendingAssignmentCase("Group not in metadata", groupId1, group, false, () -> { + verify(group).groupId(); + verify(group).isEmpty(); + }); + }, + (GroupMetadataManagerTestContext context) -> { + ShareGroup group = mock(ShareGroup.class); + when(group.groupId()).thenReturn(groupId1); + when(group.isEmpty()).thenReturn(false); + context.groupMetadataManager.replay( + new ShareGroupStatePartitionMetadataKey() + .setGroupId(groupId1), + new ShareGroupStatePartitionMetadataValue() + ); + context.commit(); + return new PendingAssignmentCase("Group metadata initialized topics empty", groupId1, group, false, () -> { + verify(group).groupId(); + verify(group).isEmpty(); + }); + }, + (GroupMetadataManagerTestContext context) -> { + ShareGroup group = mock(ShareGroup.class); + when(group.groupId()).thenReturn(groupId1); + when(group.isEmpty()).thenReturn(false); + when(group.subscribedTopicNames()).thenReturn(Map.of()); + context.groupMetadataManager.replay( + new ShareGroupStatePartitionMetadataKey() + .setGroupId(groupId1), + new ShareGroupStatePartitionMetadataValue() + .setInitializedTopics(List.of( + new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() + .setTopicName(tName1) + .setTopicId(tid1) + .setPartitions(List.of(0, 1)) + )) + ); + context.commit(); + return new PendingAssignmentCase("Empty group subscription", groupId1, group, false, () -> { + verify(group).groupId(); + verify(group).isEmpty(); + verify(group).subscribedTopicNames(); + }); + }, + (GroupMetadataManagerTestContext context) -> { + ShareGroup group = mock(ShareGroup.class); + when(group.groupId()).thenReturn(groupId1); + when(group.isEmpty()).thenReturn(false); + when(group.subscribedTopicNames()).thenReturn(Map.of(tName2, new SubscriptionCount(1, 1))); + when(group.targetAssignment()).thenReturn(Map.of()); + context.groupMetadataManager.replay( + new ShareGroupStatePartitionMetadataKey() + .setGroupId(groupId1), + new ShareGroupStatePartitionMetadataValue() + .setInitializedTopics(List.of( + new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() + .setTopicName(tName1) + .setTopicId(tid1) + .setPartitions(List.of(0, 1)) + )) + ); + context.commit(); + return new PendingAssignmentCase("Subscribed topics not in metadata and empty assignment.", groupId1, group, false, () -> { + verify(group).groupId(); + verify(group).isEmpty(); + verify(group).subscribedTopicNames(); + verify(group).targetAssignment(); + }); + }, + (GroupMetadataManagerTestContext context) -> { + ShareGroup group = mock(ShareGroup.class); + when(group.groupId()).thenReturn(groupId1); + when(group.isEmpty()).thenReturn(false); + when(group.subscribedTopicNames()).thenReturn(Map.of(tName1, new SubscriptionCount(1, 1))); + when(group.targetAssignment()).thenReturn(Map.of(tName1, new Assignment(Map.of(tid1, Set.of(0, 1))))); + context.groupMetadataManager.replay( + new ShareGroupStatePartitionMetadataKey() + .setGroupId(groupId1), + new ShareGroupStatePartitionMetadataValue() + .setInitializedTopics(List.of( + new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() + .setTopicName(tName1) + .setTopicId(tid1) + .setPartitions(List.of(0, 1)) + )) + ); + context.commit(); + return new PendingAssignmentCase("Subscribed topics in metadata and assigned partitions match.", groupId1, group, false, () -> { + verify(group).groupId(); + verify(group).isEmpty(); + verify(group).subscribedTopicNames(); + verify(group).targetAssignment(); + }); + }, + (GroupMetadataManagerTestContext context) -> { + ShareGroup group = mock(ShareGroup.class); + when(group.groupId()).thenReturn(groupId1); + when(group.isEmpty()).thenReturn(false); + when(group.subscribedTopicNames()).thenReturn(Map.of(tName1, new SubscriptionCount(1, 1))); + when(group.targetAssignment()).thenReturn(Map.of(tName1, new Assignment(Map.of(tid1, Set.of(0))))); + context.groupMetadataManager.replay( + new ShareGroupStatePartitionMetadataKey() + .setGroupId(groupId1), + new ShareGroupStatePartitionMetadataValue() + .setInitializedTopics(List.of( + new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() + .setTopicName(tName1) + .setTopicId(tid1) + .setPartitions(List.of(0, 1)) + )) + ); + context.commit(); + return new PendingAssignmentCase("Subscribed topics in metadata but assigned partitions differ.", groupId1, group, true, () -> { + verify(group).groupId(); + verify(group).isEmpty(); + verify(group).subscribedTopicNames(); + verify(group).targetAssignment(); + }); + }, + (GroupMetadataManagerTestContext context) -> { + ShareGroup group = mock(ShareGroup.class); + when(group.groupId()).thenReturn(groupId1); + when(group.isEmpty()).thenReturn(false); + when(group.subscribedTopicNames()).thenReturn(Map.of(tName1, new SubscriptionCount(1, 1))); + when(group.targetAssignment()).thenReturn(Map.of( + tName1, new Assignment(Map.of(tid1, Set.of(0, 1))), + tName2, new Assignment(Map.of(tid2, Set.of(0))) + )); + context.groupMetadataManager.replay( + new ShareGroupStatePartitionMetadataKey() + .setGroupId(groupId1), + new ShareGroupStatePartitionMetadataValue() + .setInitializedTopics(List.of( + new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() + .setTopicName(tName1) + .setTopicId(tid1) + .setPartitions(List.of(0, 1)), + new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo() + .setTopicName(tName2) + .setTopicId(tid2) + .setPartitions(List.of(0)) + )) + ); + context.commit(); + return new PendingAssignmentCase("Subscribed topics in metadata but assigned has other topics too.", groupId1, group, false, () -> { + verify(group).groupId(); + verify(group).isEmpty(); + verify(group).subscribedTopicNames(); + verify(group).targetAssignment(); + }); + } + ); + } + + @SuppressWarnings("ClassEscapesDefinedScope") + @ParameterizedTest + @MethodSource("generatePendingAssignmentCases") + public void testShareGroupPendingAssignments(Function testCase) { + MockPartitionAssignor assignor = new MockPartitionAssignor("simple"); + assignor.prepareGroupAssignment(new GroupAssignment(Map.of())); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withShareGroupAssignor(assignor) + .build(); + + PendingAssignmentCase test = testCase.apply(context); + assertEquals(test.expectedValue, context.groupMetadataManager.initializedAssignmentPending(test.group), test.description); + test.assertions.run(); + } + private static void checkJoinGroupResponse( JoinGroupResponseData expectedResponse, JoinGroupResponseData actualResponse,