KAFKA-19468: Ignore unsubscribed topics when computing share assignment (#20101)

When the group coordinator is processing a heartbeat from a share
consumer, it must decide whether the recompute the assignment. Part of
this decision hinges on whether the assigned partitions match the
partitions initialised by the share coordinator. However, when the set
of subscribed topics changes, there may be initialised partitions which
are not currently assigned. Topics which are not subscribed should be
omitted from the calculation about whether to recompute the assignment.

Co-authored-by: Sushant Mahajan <smahajan@confluent.io>

Reviewers: Lan Ding <53332773+DL1231@users.noreply.github.com>, Sushant
 Mahajan <smahajan@confluent.io>, Apoorv Mittal
 <apoorvmittal10@gmail.com>
This commit is contained in:
Andrew Schofield 2025-07-04 14:55:19 +01:00 committed by GitHub
parent 860853dba2
commit da4fbba279
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 226 additions and 7 deletions

View File

@ -2671,19 +2671,27 @@ public class GroupMetadataManager {
);
}
private boolean initializedAssignmentPending(ShareGroup group) {
if (!shareGroupStatePartitionMetadata.containsKey(group.groupId())) {
// Visibility for testing
boolean initializedAssignmentPending(ShareGroup group) {
if (group.isEmpty()) {
// No members then no point in computing assignment.
return false;
}
String groupId = group.groupId();
if (!shareGroupStatePartitionMetadata.containsKey(groupId) ||
shareGroupStatePartitionMetadata.get(groupId).initializedTopics().isEmpty()) {
// No initialized share partitions for the group so nothing can be assigned.
return false;
}
if (group.isEmpty()) {
// No members then no point of computing assignment.
Set<String> subscribedTopicNames = group.subscribedTopicNames().keySet();
// No subscription then no need to compute assignment.
if (subscribedTopicNames.isEmpty()) {
return false;
}
// We need to check if all the group initialized share partitions are part of the group assignment.
Map<Uuid, Set<Integer>> initializedTps = stripInitValue(shareGroupStatePartitionMetadata.get(group.groupId()).initializedTopics());
Map<Uuid, Set<Integer>> currentAssigned = new HashMap<>();
for (Assignment assignment : group.targetAssignment().values()) {
for (Map.Entry<Uuid, Set<Integer>> tps : assignment.partitions().entrySet()) {
@ -2692,7 +2700,20 @@ public class GroupMetadataManager {
}
}
return !initializedTps.equals(currentAssigned);
for (Map.Entry<Uuid, InitMapValue> entry : shareGroupStatePartitionMetadata.get(groupId).initializedTopics().entrySet()) {
if (subscribedTopicNames.contains(entry.getValue().name())) {
// This topic is currently subscribed, so investigate further.
Set<Integer> currentAssignedPartitions = currentAssigned.get(entry.getKey());
if (currentAssignedPartitions != null && currentAssignedPartitions.equals(entry.getValue().partitions())) {
// The assigned and initialized partitions match, so assignment does not need to be recomputed.
continue;
}
// The assigned and initialized partitions do not match, OR
// this topic is not currently assigned, so recompute the assignment.
return true;
}
}
return false;
}
/**

View File

@ -112,6 +112,7 @@ import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
import org.apache.kafka.coordinator.group.modern.MemberState;
import org.apache.kafka.coordinator.group.modern.SubscriptionCount;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupBuilder;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
@ -23158,6 +23159,203 @@ public class GroupMetadataManagerTest {
assertDoesNotThrow(() -> context.replay(record));
}
private record PendingAssignmentCase(
String description,
String groupId,
ShareGroup group,
boolean expectedValue,
Runnable assertions
) {
}
private static Stream<Function<GroupMetadataManagerTestContext, PendingAssignmentCase>> generatePendingAssignmentCases() {
String groupId1 = "groupId";
Uuid tid1 = Uuid.randomUuid();
String tName1 = "t1";
Uuid tid2 = Uuid.randomUuid();
String tName2 = "t2";
return Stream.of(
(GroupMetadataManagerTestContext context) -> {
ShareGroup group = mock(ShareGroup.class);
when(group.isEmpty()).thenReturn(true);
return new PendingAssignmentCase("Group is empty", groupId1, group, false, () -> {
verify(group, times(0)).groupId();
verify(group).isEmpty();
});
},
(GroupMetadataManagerTestContext context) -> {
ShareGroup group = mock(ShareGroup.class);
when(group.groupId()).thenReturn(groupId1);
when(group.isEmpty()).thenReturn(false);
return new PendingAssignmentCase("Group not in metadata", groupId1, group, false, () -> {
verify(group).groupId();
verify(group).isEmpty();
});
},
(GroupMetadataManagerTestContext context) -> {
ShareGroup group = mock(ShareGroup.class);
when(group.groupId()).thenReturn(groupId1);
when(group.isEmpty()).thenReturn(false);
context.groupMetadataManager.replay(
new ShareGroupStatePartitionMetadataKey()
.setGroupId(groupId1),
new ShareGroupStatePartitionMetadataValue()
);
context.commit();
return new PendingAssignmentCase("Group metadata initialized topics empty", groupId1, group, false, () -> {
verify(group).groupId();
verify(group).isEmpty();
});
},
(GroupMetadataManagerTestContext context) -> {
ShareGroup group = mock(ShareGroup.class);
when(group.groupId()).thenReturn(groupId1);
when(group.isEmpty()).thenReturn(false);
when(group.subscribedTopicNames()).thenReturn(Map.of());
context.groupMetadataManager.replay(
new ShareGroupStatePartitionMetadataKey()
.setGroupId(groupId1),
new ShareGroupStatePartitionMetadataValue()
.setInitializedTopics(List.of(
new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicName(tName1)
.setTopicId(tid1)
.setPartitions(List.of(0, 1))
))
);
context.commit();
return new PendingAssignmentCase("Empty group subscription", groupId1, group, false, () -> {
verify(group).groupId();
verify(group).isEmpty();
verify(group).subscribedTopicNames();
});
},
(GroupMetadataManagerTestContext context) -> {
ShareGroup group = mock(ShareGroup.class);
when(group.groupId()).thenReturn(groupId1);
when(group.isEmpty()).thenReturn(false);
when(group.subscribedTopicNames()).thenReturn(Map.of(tName2, new SubscriptionCount(1, 1)));
when(group.targetAssignment()).thenReturn(Map.of());
context.groupMetadataManager.replay(
new ShareGroupStatePartitionMetadataKey()
.setGroupId(groupId1),
new ShareGroupStatePartitionMetadataValue()
.setInitializedTopics(List.of(
new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicName(tName1)
.setTopicId(tid1)
.setPartitions(List.of(0, 1))
))
);
context.commit();
return new PendingAssignmentCase("Subscribed topics not in metadata and empty assignment.", groupId1, group, false, () -> {
verify(group).groupId();
verify(group).isEmpty();
verify(group).subscribedTopicNames();
verify(group).targetAssignment();
});
},
(GroupMetadataManagerTestContext context) -> {
ShareGroup group = mock(ShareGroup.class);
when(group.groupId()).thenReturn(groupId1);
when(group.isEmpty()).thenReturn(false);
when(group.subscribedTopicNames()).thenReturn(Map.of(tName1, new SubscriptionCount(1, 1)));
when(group.targetAssignment()).thenReturn(Map.of(tName1, new Assignment(Map.of(tid1, Set.of(0, 1)))));
context.groupMetadataManager.replay(
new ShareGroupStatePartitionMetadataKey()
.setGroupId(groupId1),
new ShareGroupStatePartitionMetadataValue()
.setInitializedTopics(List.of(
new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicName(tName1)
.setTopicId(tid1)
.setPartitions(List.of(0, 1))
))
);
context.commit();
return new PendingAssignmentCase("Subscribed topics in metadata and assigned partitions match.", groupId1, group, false, () -> {
verify(group).groupId();
verify(group).isEmpty();
verify(group).subscribedTopicNames();
verify(group).targetAssignment();
});
},
(GroupMetadataManagerTestContext context) -> {
ShareGroup group = mock(ShareGroup.class);
when(group.groupId()).thenReturn(groupId1);
when(group.isEmpty()).thenReturn(false);
when(group.subscribedTopicNames()).thenReturn(Map.of(tName1, new SubscriptionCount(1, 1)));
when(group.targetAssignment()).thenReturn(Map.of(tName1, new Assignment(Map.of(tid1, Set.of(0)))));
context.groupMetadataManager.replay(
new ShareGroupStatePartitionMetadataKey()
.setGroupId(groupId1),
new ShareGroupStatePartitionMetadataValue()
.setInitializedTopics(List.of(
new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicName(tName1)
.setTopicId(tid1)
.setPartitions(List.of(0, 1))
))
);
context.commit();
return new PendingAssignmentCase("Subscribed topics in metadata but assigned partitions differ.", groupId1, group, true, () -> {
verify(group).groupId();
verify(group).isEmpty();
verify(group).subscribedTopicNames();
verify(group).targetAssignment();
});
},
(GroupMetadataManagerTestContext context) -> {
ShareGroup group = mock(ShareGroup.class);
when(group.groupId()).thenReturn(groupId1);
when(group.isEmpty()).thenReturn(false);
when(group.subscribedTopicNames()).thenReturn(Map.of(tName1, new SubscriptionCount(1, 1)));
when(group.targetAssignment()).thenReturn(Map.of(
tName1, new Assignment(Map.of(tid1, Set.of(0, 1))),
tName2, new Assignment(Map.of(tid2, Set.of(0)))
));
context.groupMetadataManager.replay(
new ShareGroupStatePartitionMetadataKey()
.setGroupId(groupId1),
new ShareGroupStatePartitionMetadataValue()
.setInitializedTopics(List.of(
new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicName(tName1)
.setTopicId(tid1)
.setPartitions(List.of(0, 1)),
new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicName(tName2)
.setTopicId(tid2)
.setPartitions(List.of(0))
))
);
context.commit();
return new PendingAssignmentCase("Subscribed topics in metadata but assigned has other topics too.", groupId1, group, false, () -> {
verify(group).groupId();
verify(group).isEmpty();
verify(group).subscribedTopicNames();
verify(group).targetAssignment();
});
}
);
}
@SuppressWarnings("ClassEscapesDefinedScope")
@ParameterizedTest
@MethodSource("generatePendingAssignmentCases")
public void testShareGroupPendingAssignments(Function<GroupMetadataManagerTestContext, PendingAssignmentCase> testCase) {
MockPartitionAssignor assignor = new MockPartitionAssignor("simple");
assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withShareGroupAssignor(assignor)
.build();
PendingAssignmentCase test = testCase.apply(context);
assertEquals(test.expectedValue, context.groupMetadataManager.initializedAssignmentPending(test.group), test.description);
test.assertions.run();
}
private static void checkJoinGroupResponse(
JoinGroupResponseData expectedResponse,
JoinGroupResponseData actualResponse,