KAFKA-19195: Only send the right group ID subset to each GC shard (#19548)

If a streams, share or consumer group is described, all group IDs sent
to all shards of the group coordinator. This change fixes it. It tested
in the unit tests, since it's somewhat inconvenient to test the passed
read operation lambda.

Reviewers: David Jacot <djacot@confluent.io>, Andrew Schofield
<aschofield@confluent.io>
This commit is contained in:
Lucas Brutschy 2025-04-25 09:14:24 +02:00 committed by GitHub
parent ee4debb9f0
commit e79f5f0f65
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 42 additions and 12 deletions

View File

@ -867,7 +867,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
runtime.scheduleReadOperation(
"consumer-group-describe",
topicPartition,
(coordinator, lastCommittedOffset) -> coordinator.consumerGroupDescribe(groupIds, lastCommittedOffset)
(coordinator, lastCommittedOffset) -> coordinator.consumerGroupDescribe(groupList, lastCommittedOffset)
).exceptionally(exception -> handleOperationException(
"consumer-group-describe",
groupList,
@ -919,7 +919,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
runtime.scheduleReadOperation(
"streams-group-describe",
topicPartition,
(coordinator, lastCommittedOffset) -> coordinator.streamsGroupDescribe(groupIds, lastCommittedOffset)
(coordinator, lastCommittedOffset) -> coordinator.streamsGroupDescribe(groupList, lastCommittedOffset)
).exceptionally(exception -> handleOperationException(
"streams-group-describe",
groupList,
@ -971,7 +971,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
runtime.scheduleReadOperation(
"share-group-describe",
topicPartition,
(coordinator, lastCommittedOffset) -> coordinator.shareGroupDescribe(groupIds, lastCommittedOffset)
(coordinator, lastCommittedOffset) -> coordinator.shareGroupDescribe(groupList, lastCommittedOffset)
).exceptionally(exception -> handleOperationException(
"share-group-describe",
groupList,

View File

@ -116,6 +116,7 @@ import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.NullSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import java.net.InetAddress;
@ -1417,6 +1418,10 @@ public class GroupCoordinatorServiceTest {
int partitionCount = 2;
service.startup(() -> partitionCount);
@SuppressWarnings("unchecked")
ArgumentCaptor<CoordinatorRuntime.CoordinatorReadOperation<GroupCoordinatorShard, List<ConsumerGroupDescribeResponseData.DescribedGroup>>> readOperationCaptor =
ArgumentCaptor.forClass(CoordinatorRuntime.CoordinatorReadOperation.class);
ConsumerGroupDescribeResponseData.DescribedGroup describedGroup1 = new ConsumerGroupDescribeResponseData.DescribedGroup()
.setGroupId("group-id-1");
ConsumerGroupDescribeResponseData.DescribedGroup describedGroup2 = new ConsumerGroupDescribeResponseData.DescribedGroup()
@ -1429,14 +1434,14 @@ public class GroupCoordinatorServiceTest {
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("consumer-group-describe"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
readOperationCaptor.capture()
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup1)));
CompletableFuture<Object> describedGroupFuture = new CompletableFuture<>();
CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> describedGroupFuture = new CompletableFuture<>();
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("consumer-group-describe"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
ArgumentMatchers.any()
readOperationCaptor.capture()
)).thenReturn(describedGroupFuture);
CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> future =
@ -1445,6 +1450,12 @@ public class GroupCoordinatorServiceTest {
assertFalse(future.isDone());
describedGroupFuture.complete(List.of(describedGroup2));
assertEquals(expectedDescribedGroups, future.get());
// Validate that the captured read operations, on the first and the second partition
GroupCoordinatorShard shard = mock(GroupCoordinatorShard.class);
readOperationCaptor.getAllValues().forEach(x -> x.generateResponse(shard, 100));
verify(shard).consumerGroupDescribe(List.of("group-id-2"), 100);
verify(shard).consumerGroupDescribe(List.of("group-id-1"), 100);
}
@Test
@ -1545,6 +1556,9 @@ public class GroupCoordinatorServiceTest {
.build();
int partitionCount = 2;
service.startup(() -> partitionCount);
@SuppressWarnings("unchecked")
ArgumentCaptor<CoordinatorRuntime.CoordinatorReadOperation<GroupCoordinatorShard, List<StreamsGroupDescribeResponseData.DescribedGroup>>> readOperationCaptor =
ArgumentCaptor.forClass(CoordinatorRuntime.CoordinatorReadOperation.class);
StreamsGroupDescribeResponseData.DescribedGroup describedGroup1 = new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId("group-id-1");
@ -1558,14 +1572,14 @@ public class GroupCoordinatorServiceTest {
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("streams-group-describe"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.any()
readOperationCaptor.capture()
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup1)));
CompletableFuture<Object> describedGroupFuture = new CompletableFuture<>();
CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> describedGroupFuture = new CompletableFuture<>();
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("streams-group-describe"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)),
ArgumentMatchers.any()
readOperationCaptor.capture()
)).thenReturn(describedGroupFuture);
CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> future =
@ -1574,6 +1588,12 @@ public class GroupCoordinatorServiceTest {
assertFalse(future.isDone());
describedGroupFuture.complete(List.of(describedGroup2));
assertEquals(expectedDescribedGroups, future.get());
// Validate that the captured read operations, on the first and the second partition
GroupCoordinatorShard shard = mock(GroupCoordinatorShard.class);
readOperationCaptor.getAllValues().forEach(x -> x.generateResponse(shard, 100));
verify(shard).streamsGroupDescribe(List.of("group-id-2"), 100);
verify(shard).streamsGroupDescribe(List.of("group-id-1"), 100);
}
@Test
@ -2706,6 +2726,10 @@ public class GroupCoordinatorServiceTest {
int partitionCount = 2;
service.startup(() -> partitionCount);
@SuppressWarnings("unchecked")
ArgumentCaptor<CoordinatorRuntime.CoordinatorReadOperation<GroupCoordinatorShard, List<ShareGroupDescribeResponseData.DescribedGroup>>> readOperationCaptor =
ArgumentCaptor.forClass(CoordinatorRuntime.CoordinatorReadOperation.class);
ShareGroupDescribeResponseData.DescribedGroup describedGroup1 = new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId("share-group-id-1");
ShareGroupDescribeResponseData.DescribedGroup describedGroup2 = new ShareGroupDescribeResponseData.DescribedGroup()
@ -2718,14 +2742,14 @@ public class GroupCoordinatorServiceTest {
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-describe"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
readOperationCaptor.capture()
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup1)));
CompletableFuture<Object> describedGroupFuture = new CompletableFuture<>();
CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>> describedGroupFuture = new CompletableFuture<>();
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-describe"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
ArgumentMatchers.any()
readOperationCaptor.capture()
)).thenReturn(describedGroupFuture);
CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>> future =
@ -2734,6 +2758,12 @@ public class GroupCoordinatorServiceTest {
assertFalse(future.isDone());
describedGroupFuture.complete(List.of(describedGroup2));
assertEquals(expectedDescribedGroups, future.get());
// Validate that the captured read operations, on the first and the second partition
GroupCoordinatorShard shard = mock(GroupCoordinatorShard.class);
readOperationCaptor.getAllValues().forEach(x -> x.generateResponse(shard, 100));
verify(shard).shareGroupDescribe(List.of("share-group-id-2"), 100);
verify(shard).shareGroupDescribe(List.of("share-group-id-1"), 100);
}
@Test