KAFKA-19195: Only send the right group ID subset to each GC shard (#19555)
CI / build (push) Has been cancelled Details

Cherry-picked from
[e79f5f0](e79f5f0f65)

If a share or consumer group is described, all group IDs sent to all
shards of the group coordinator. This change fixes it. It tested in the
unit tests, since it's somewhat inconvenient to test the passed read
operation lambda.
This commit is contained in:
Lucas Brutschy 2025-04-28 15:33:20 +02:00 committed by GitHub
parent 0297ba2c67
commit 0832c2ceb1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 29 additions and 8 deletions

View File

@ -647,7 +647,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
runtime.scheduleReadOperation( runtime.scheduleReadOperation(
"consumer-group-describe", "consumer-group-describe",
topicPartition, topicPartition,
(coordinator, lastCommittedOffset) -> coordinator.consumerGroupDescribe(groupIds, lastCommittedOffset) (coordinator, lastCommittedOffset) -> coordinator.consumerGroupDescribe(groupList, lastCommittedOffset)
).exceptionally(exception -> handleOperationException( ).exceptionally(exception -> handleOperationException(
"consumer-group-describe", "consumer-group-describe",
groupList, groupList,
@ -698,7 +698,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
runtime.scheduleReadOperation( runtime.scheduleReadOperation(
"share-group-describe", "share-group-describe",
topicPartition, topicPartition,
(coordinator, lastCommittedOffset) -> coordinator.shareGroupDescribe(groupIds, lastCommittedOffset) (coordinator, lastCommittedOffset) -> coordinator.shareGroupDescribe(groupList, lastCommittedOffset)
).exceptionally(exception -> handleOperationException( ).exceptionally(exception -> handleOperationException(
"share-group-describe", "share-group-describe",
groupList, groupList,

View File

@ -82,6 +82,7 @@ import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.NullSource; import org.junit.jupiter.params.provider.NullSource;
import org.junit.jupiter.params.provider.ValueSource; import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers; import org.mockito.ArgumentMatchers;
import java.net.InetAddress; import java.net.InetAddress;
@ -1415,6 +1416,10 @@ public class GroupCoordinatorServiceTest {
int partitionCount = 2; int partitionCount = 2;
service.startup(() -> partitionCount); service.startup(() -> partitionCount);
@SuppressWarnings("unchecked")
ArgumentCaptor<CoordinatorRuntime.CoordinatorReadOperation<GroupCoordinatorShard, List<ConsumerGroupDescribeResponseData.DescribedGroup>>> readOperationCaptor =
ArgumentCaptor.forClass(CoordinatorRuntime.CoordinatorReadOperation.class);
ConsumerGroupDescribeResponseData.DescribedGroup describedGroup1 = new ConsumerGroupDescribeResponseData.DescribedGroup() ConsumerGroupDescribeResponseData.DescribedGroup describedGroup1 = new ConsumerGroupDescribeResponseData.DescribedGroup()
.setGroupId("group-id-1"); .setGroupId("group-id-1");
ConsumerGroupDescribeResponseData.DescribedGroup describedGroup2 = new ConsumerGroupDescribeResponseData.DescribedGroup() ConsumerGroupDescribeResponseData.DescribedGroup describedGroup2 = new ConsumerGroupDescribeResponseData.DescribedGroup()
@ -1427,14 +1432,14 @@ public class GroupCoordinatorServiceTest {
when(runtime.scheduleReadOperation( when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("consumer-group-describe"), ArgumentMatchers.eq("consumer-group-describe"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any() readOperationCaptor.capture()
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup1))); )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup1)));
CompletableFuture<Object> describedGroupFuture = new CompletableFuture<>(); CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> describedGroupFuture = new CompletableFuture<>();
when(runtime.scheduleReadOperation( when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("consumer-group-describe"), ArgumentMatchers.eq("consumer-group-describe"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)), ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
ArgumentMatchers.any() readOperationCaptor.capture()
)).thenReturn(describedGroupFuture); )).thenReturn(describedGroupFuture);
CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> future = CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> future =
@ -1443,6 +1448,12 @@ public class GroupCoordinatorServiceTest {
assertFalse(future.isDone()); assertFalse(future.isDone());
describedGroupFuture.complete(Collections.singletonList(describedGroup2)); describedGroupFuture.complete(Collections.singletonList(describedGroup2));
assertEquals(expectedDescribedGroups, future.get()); assertEquals(expectedDescribedGroups, future.get());
// Validate that the captured read operations, on the first and the second partition
GroupCoordinatorShard shard = mock(GroupCoordinatorShard.class);
readOperationCaptor.getAllValues().forEach(x -> x.generateResponse(shard, 100));
verify(shard).consumerGroupDescribe(List.of("group-id-2"), 100);
verify(shard).consumerGroupDescribe(List.of("group-id-1"), 100);
} }
@Test @Test
@ -2282,6 +2293,10 @@ public class GroupCoordinatorServiceTest {
int partitionCount = 2; int partitionCount = 2;
service.startup(() -> partitionCount); service.startup(() -> partitionCount);
@SuppressWarnings("unchecked")
ArgumentCaptor<CoordinatorRuntime.CoordinatorReadOperation<GroupCoordinatorShard, List<ShareGroupDescribeResponseData.DescribedGroup>>> readOperationCaptor =
ArgumentCaptor.forClass(CoordinatorRuntime.CoordinatorReadOperation.class);
ShareGroupDescribeResponseData.DescribedGroup describedGroup1 = new ShareGroupDescribeResponseData.DescribedGroup() ShareGroupDescribeResponseData.DescribedGroup describedGroup1 = new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId("share-group-id-1"); .setGroupId("share-group-id-1");
ShareGroupDescribeResponseData.DescribedGroup describedGroup2 = new ShareGroupDescribeResponseData.DescribedGroup() ShareGroupDescribeResponseData.DescribedGroup describedGroup2 = new ShareGroupDescribeResponseData.DescribedGroup()
@ -2294,14 +2309,14 @@ public class GroupCoordinatorServiceTest {
when(runtime.scheduleReadOperation( when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-describe"), ArgumentMatchers.eq("share-group-describe"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)), ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any() readOperationCaptor.capture()
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup1))); )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup1)));
CompletableFuture<Object> describedGroupFuture = new CompletableFuture<>(); CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>> describedGroupFuture = new CompletableFuture<>();
when(runtime.scheduleReadOperation( when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("share-group-describe"), ArgumentMatchers.eq("share-group-describe"),
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)), ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
ArgumentMatchers.any() readOperationCaptor.capture()
)).thenReturn(describedGroupFuture); )).thenReturn(describedGroupFuture);
CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>> future = CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>> future =
@ -2310,6 +2325,12 @@ public class GroupCoordinatorServiceTest {
assertFalse(future.isDone()); assertFalse(future.isDone());
describedGroupFuture.complete(Collections.singletonList(describedGroup2)); describedGroupFuture.complete(Collections.singletonList(describedGroup2));
assertEquals(expectedDescribedGroups, future.get()); assertEquals(expectedDescribedGroups, future.get());
// Validate that the captured read operations, on the first and the second partition
GroupCoordinatorShard shard = mock(GroupCoordinatorShard.class);
readOperationCaptor.getAllValues().forEach(x -> x.generateResponse(shard, 100));
verify(shard).shareGroupDescribe(List.of("share-group-id-2"), 100);
verify(shard).shareGroupDescribe(List.of("share-group-id-1"), 100);
} }
@Test @Test