mirror of https://github.com/apache/kafka.git
KAFKA-15849: Fix ListGroups API when runtime partition size is zero (#14785)
When the group coordinator does not host any __consumer_offsets partitions, the existing ListGroup implementation won't schedule any operation, thus a `new CompletableFuture<>()` is returned directly and never gets completed. This patch fixes the issue. Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
parent
fe7a373baa
commit
b1796ce6d2
|
@ -501,6 +501,10 @@ public class GroupCoordinatorService implements GroupCoordinator {
|
||||||
final Set<TopicPartition> existingPartitionSet = runtime.partitions();
|
final Set<TopicPartition> existingPartitionSet = runtime.partitions();
|
||||||
final AtomicInteger cnt = new AtomicInteger(existingPartitionSet.size());
|
final AtomicInteger cnt = new AtomicInteger(existingPartitionSet.size());
|
||||||
|
|
||||||
|
if (existingPartitionSet.isEmpty()) {
|
||||||
|
return CompletableFuture.completedFuture(new ListGroupsResponseData());
|
||||||
|
}
|
||||||
|
|
||||||
for (TopicPartition tp : existingPartitionSet) {
|
for (TopicPartition tp : existingPartitionSet) {
|
||||||
runtime.scheduleReadOperation(
|
runtime.scheduleReadOperation(
|
||||||
"list-groups",
|
"list-groups",
|
||||||
|
|
|
@ -832,6 +832,30 @@ public class GroupCoordinatorServiceTest {
|
||||||
assertEquals(Collections.emptyList(), listGroupsResponseData.groups());
|
assertEquals(Collections.emptyList(), listGroupsResponseData.groups());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testListGroupsWithEmptyTopicPartitions() throws ExecutionException, InterruptedException {
|
||||||
|
CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
|
||||||
|
GroupCoordinatorService service = new GroupCoordinatorService(
|
||||||
|
new LogContext(),
|
||||||
|
createConfig(),
|
||||||
|
runtime
|
||||||
|
);
|
||||||
|
int partitionCount = 0;
|
||||||
|
service.startup(() -> partitionCount);
|
||||||
|
|
||||||
|
ListGroupsRequestData request = new ListGroupsRequestData();
|
||||||
|
|
||||||
|
CompletableFuture<ListGroupsResponseData> future = service.listGroups(
|
||||||
|
requestContext(ApiKeys.LIST_GROUPS),
|
||||||
|
request
|
||||||
|
);
|
||||||
|
|
||||||
|
assertEquals(
|
||||||
|
new ListGroupsResponseData(),
|
||||||
|
future.get()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testListGroupsWhenNotStarted() throws ExecutionException, InterruptedException {
|
public void testListGroupsWhenNotStarted() throws ExecutionException, InterruptedException {
|
||||||
CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
|
CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
|
||||||
|
|
Loading…
Reference in New Issue