mirror of https://github.com/apache/kafka.git
KAFKA-19195: Only send the right group ID subset to each GC shard (#19548)
If a streams, share or consumer group is described, all group IDs sent to all shards of the group coordinator. This change fixes it. It tested in the unit tests, since it's somewhat inconvenient to test the passed read operation lambda. Reviewers: David Jacot <djacot@confluent.io>, Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
parent
ee4debb9f0
commit
e79f5f0f65
|
@ -867,7 +867,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
|
|||
runtime.scheduleReadOperation(
|
||||
"consumer-group-describe",
|
||||
topicPartition,
|
||||
(coordinator, lastCommittedOffset) -> coordinator.consumerGroupDescribe(groupIds, lastCommittedOffset)
|
||||
(coordinator, lastCommittedOffset) -> coordinator.consumerGroupDescribe(groupList, lastCommittedOffset)
|
||||
).exceptionally(exception -> handleOperationException(
|
||||
"consumer-group-describe",
|
||||
groupList,
|
||||
|
@ -919,7 +919,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
|
|||
runtime.scheduleReadOperation(
|
||||
"streams-group-describe",
|
||||
topicPartition,
|
||||
(coordinator, lastCommittedOffset) -> coordinator.streamsGroupDescribe(groupIds, lastCommittedOffset)
|
||||
(coordinator, lastCommittedOffset) -> coordinator.streamsGroupDescribe(groupList, lastCommittedOffset)
|
||||
).exceptionally(exception -> handleOperationException(
|
||||
"streams-group-describe",
|
||||
groupList,
|
||||
|
@ -971,7 +971,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
|
|||
runtime.scheduleReadOperation(
|
||||
"share-group-describe",
|
||||
topicPartition,
|
||||
(coordinator, lastCommittedOffset) -> coordinator.shareGroupDescribe(groupIds, lastCommittedOffset)
|
||||
(coordinator, lastCommittedOffset) -> coordinator.shareGroupDescribe(groupList, lastCommittedOffset)
|
||||
).exceptionally(exception -> handleOperationException(
|
||||
"share-group-describe",
|
||||
groupList,
|
||||
|
|
|
@ -116,6 +116,7 @@ import org.junit.jupiter.params.provider.CsvSource;
|
|||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
import org.junit.jupiter.params.provider.NullSource;
|
||||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.ArgumentMatchers;
|
||||
|
||||
import java.net.InetAddress;
|
||||
|
@ -1417,6 +1418,10 @@ public class GroupCoordinatorServiceTest {
|
|||
int partitionCount = 2;
|
||||
service.startup(() -> partitionCount);
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
ArgumentCaptor<CoordinatorRuntime.CoordinatorReadOperation<GroupCoordinatorShard, List<ConsumerGroupDescribeResponseData.DescribedGroup>>> readOperationCaptor =
|
||||
ArgumentCaptor.forClass(CoordinatorRuntime.CoordinatorReadOperation.class);
|
||||
|
||||
ConsumerGroupDescribeResponseData.DescribedGroup describedGroup1 = new ConsumerGroupDescribeResponseData.DescribedGroup()
|
||||
.setGroupId("group-id-1");
|
||||
ConsumerGroupDescribeResponseData.DescribedGroup describedGroup2 = new ConsumerGroupDescribeResponseData.DescribedGroup()
|
||||
|
@ -1429,14 +1434,14 @@ public class GroupCoordinatorServiceTest {
|
|||
when(runtime.scheduleReadOperation(
|
||||
ArgumentMatchers.eq("consumer-group-describe"),
|
||||
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
|
||||
ArgumentMatchers.any()
|
||||
readOperationCaptor.capture()
|
||||
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup1)));
|
||||
|
||||
CompletableFuture<Object> describedGroupFuture = new CompletableFuture<>();
|
||||
CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> describedGroupFuture = new CompletableFuture<>();
|
||||
when(runtime.scheduleReadOperation(
|
||||
ArgumentMatchers.eq("consumer-group-describe"),
|
||||
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
|
||||
ArgumentMatchers.any()
|
||||
readOperationCaptor.capture()
|
||||
)).thenReturn(describedGroupFuture);
|
||||
|
||||
CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> future =
|
||||
|
@ -1445,6 +1450,12 @@ public class GroupCoordinatorServiceTest {
|
|||
assertFalse(future.isDone());
|
||||
describedGroupFuture.complete(List.of(describedGroup2));
|
||||
assertEquals(expectedDescribedGroups, future.get());
|
||||
|
||||
// Validate that the captured read operations, on the first and the second partition
|
||||
GroupCoordinatorShard shard = mock(GroupCoordinatorShard.class);
|
||||
readOperationCaptor.getAllValues().forEach(x -> x.generateResponse(shard, 100));
|
||||
verify(shard).consumerGroupDescribe(List.of("group-id-2"), 100);
|
||||
verify(shard).consumerGroupDescribe(List.of("group-id-1"), 100);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1545,6 +1556,9 @@ public class GroupCoordinatorServiceTest {
|
|||
.build();
|
||||
int partitionCount = 2;
|
||||
service.startup(() -> partitionCount);
|
||||
@SuppressWarnings("unchecked")
|
||||
ArgumentCaptor<CoordinatorRuntime.CoordinatorReadOperation<GroupCoordinatorShard, List<StreamsGroupDescribeResponseData.DescribedGroup>>> readOperationCaptor =
|
||||
ArgumentCaptor.forClass(CoordinatorRuntime.CoordinatorReadOperation.class);
|
||||
|
||||
StreamsGroupDescribeResponseData.DescribedGroup describedGroup1 = new StreamsGroupDescribeResponseData.DescribedGroup()
|
||||
.setGroupId("group-id-1");
|
||||
|
@ -1558,14 +1572,14 @@ public class GroupCoordinatorServiceTest {
|
|||
when(runtime.scheduleReadOperation(
|
||||
ArgumentMatchers.eq("streams-group-describe"),
|
||||
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
|
||||
ArgumentMatchers.any()
|
||||
readOperationCaptor.capture()
|
||||
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup1)));
|
||||
|
||||
CompletableFuture<Object> describedGroupFuture = new CompletableFuture<>();
|
||||
CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> describedGroupFuture = new CompletableFuture<>();
|
||||
when(runtime.scheduleReadOperation(
|
||||
ArgumentMatchers.eq("streams-group-describe"),
|
||||
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)),
|
||||
ArgumentMatchers.any()
|
||||
readOperationCaptor.capture()
|
||||
)).thenReturn(describedGroupFuture);
|
||||
|
||||
CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> future =
|
||||
|
@ -1574,6 +1588,12 @@ public class GroupCoordinatorServiceTest {
|
|||
assertFalse(future.isDone());
|
||||
describedGroupFuture.complete(List.of(describedGroup2));
|
||||
assertEquals(expectedDescribedGroups, future.get());
|
||||
|
||||
// Validate that the captured read operations, on the first and the second partition
|
||||
GroupCoordinatorShard shard = mock(GroupCoordinatorShard.class);
|
||||
readOperationCaptor.getAllValues().forEach(x -> x.generateResponse(shard, 100));
|
||||
verify(shard).streamsGroupDescribe(List.of("group-id-2"), 100);
|
||||
verify(shard).streamsGroupDescribe(List.of("group-id-1"), 100);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -2706,6 +2726,10 @@ public class GroupCoordinatorServiceTest {
|
|||
int partitionCount = 2;
|
||||
service.startup(() -> partitionCount);
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
ArgumentCaptor<CoordinatorRuntime.CoordinatorReadOperation<GroupCoordinatorShard, List<ShareGroupDescribeResponseData.DescribedGroup>>> readOperationCaptor =
|
||||
ArgumentCaptor.forClass(CoordinatorRuntime.CoordinatorReadOperation.class);
|
||||
|
||||
ShareGroupDescribeResponseData.DescribedGroup describedGroup1 = new ShareGroupDescribeResponseData.DescribedGroup()
|
||||
.setGroupId("share-group-id-1");
|
||||
ShareGroupDescribeResponseData.DescribedGroup describedGroup2 = new ShareGroupDescribeResponseData.DescribedGroup()
|
||||
|
@ -2718,14 +2742,14 @@ public class GroupCoordinatorServiceTest {
|
|||
when(runtime.scheduleReadOperation(
|
||||
ArgumentMatchers.eq("share-group-describe"),
|
||||
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
|
||||
ArgumentMatchers.any()
|
||||
readOperationCaptor.capture()
|
||||
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup1)));
|
||||
|
||||
CompletableFuture<Object> describedGroupFuture = new CompletableFuture<>();
|
||||
CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>> describedGroupFuture = new CompletableFuture<>();
|
||||
when(runtime.scheduleReadOperation(
|
||||
ArgumentMatchers.eq("share-group-describe"),
|
||||
ArgumentMatchers.eq(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
|
||||
ArgumentMatchers.any()
|
||||
readOperationCaptor.capture()
|
||||
)).thenReturn(describedGroupFuture);
|
||||
|
||||
CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>> future =
|
||||
|
@ -2734,6 +2758,12 @@ public class GroupCoordinatorServiceTest {
|
|||
assertFalse(future.isDone());
|
||||
describedGroupFuture.complete(List.of(describedGroup2));
|
||||
assertEquals(expectedDescribedGroups, future.get());
|
||||
|
||||
// Validate that the captured read operations, on the first and the second partition
|
||||
GroupCoordinatorShard shard = mock(GroupCoordinatorShard.class);
|
||||
readOperationCaptor.getAllValues().forEach(x -> x.generateResponse(shard, 100));
|
||||
verify(shard).shareGroupDescribe(List.of("share-group-id-2"), 100);
|
||||
verify(shard).shareGroupDescribe(List.of("share-group-id-1"), 100);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue