KAFKA-14503: Implement ListGroups (#14271)

This patch implements the ListGroups API in the new group coordinator.

Reviewers: David Jacot <djacot@confluent.io>
zhaohaidao 2023-09-15 14:45:03 +08:00 committed by GitHub
parent ac39342d47
commit f309299f3c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 365 additions and 8 deletions
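
For orientation: the states filter this patch threads through the coordinator is the same one AdminClient exposes through listConsumerGroups(). A minimal client-side sketch follows (the bootstrap address is a placeholder and error handling is omitted):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.ListConsumerGroupsOptions;
import org.apache.kafka.common.ConsumerGroupState;

public class ListGroupsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            // listConsumerGroups() is served by the ListGroups RPC; inStates()
            // populates the request's states filter handled by this patch.
            ListConsumerGroupsOptions options = new ListConsumerGroupsOptions()
                .inStates(Collections.singleton(ConsumerGroupState.STABLE));
            for (ConsumerGroupListing listing : admin.listConsumerGroups(options).all().get()) {
                System.out.println(listing.groupId() + " " + listing.state());
            }
        }
    }
}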

checkstyle/suppressions.xml

@@ -326,7 +326,7 @@
<suppress checks="(NPathComplexity|MethodLength)"
files="(GroupMetadataManager|ConsumerGroupTest|GroupMetadataManagerTest).java"/>
<suppress checks="ClassFanOutComplexity"
files="(GroupMetadataManager|GroupMetadataManagerTest).java"/>
files="(GroupMetadataManager|GroupMetadataManagerTest|GroupCoordinatorServiceTest).java"/>
<suppress checks="ParameterNumber"
files="(ConsumerGroupMember|GroupMetadataManager).java"/>
<suppress checks="ClassDataAbstractionCouplingCheck"

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java

@@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.message.ListGroupsResponseData;
/**
* Interface common for all groups.
@@ -48,6 +49,16 @@ public interface Group {
*/
String stateAsString();
/**
* @return The current state as a String, based on the committed offset.
*/
String stateAsString(long committedOffset);
/**
* @return the group formatted as a list group response based on the committed offset.
*/
ListGroupsResponseData.ListedGroup asListedGroup(long committedOffset);
/**
* @return The group id.
*/

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java

@@ -22,6 +22,7 @@ import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
import org.apache.kafka.common.errors.InvalidFetchSizeException;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
@@ -73,12 +74,15 @@ import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.Timer;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;
import java.util.stream.Collectors;
@@ -460,9 +464,33 @@ public class GroupCoordinatorService implements GroupCoordinator {
return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        final CompletableFuture<ListGroupsResponseData> future = new CompletableFuture<>();
+        final List<ListGroupsResponseData.ListedGroup> results = new ArrayList<>();
+        final Set<TopicPartition> existingPartitionSet = runtime.partitions();
+        final AtomicInteger cnt = new AtomicInteger(existingPartitionSet.size());
+        for (TopicPartition tp : existingPartitionSet) {
+            runtime.scheduleReadOperation(
+                "list-groups",
+                tp,
+                (coordinator, lastCommittedOffset) -> coordinator.listGroups(request.statesFilter(), lastCommittedOffset)
+            ).handle((groups, exception) -> {
+                if (exception == null) {
+                    synchronized (results) {
+                        results.addAll(groups);
+                    }
+                } else {
+                    if (!(exception instanceof NotCoordinatorException)) {
+                        future.complete(new ListGroupsResponseData().setErrorCode(Errors.forException(exception).code()));
+                    }
+                }
+                if (cnt.decrementAndGet() == 0) {
+                    future.complete(new ListGroupsResponseData().setGroups(results));
+                }
+                return null;
+            });
+        }
+        return future;
}
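
The service fans the read out to every __consumer_offsets partition it owns: results accumulate under a lock, an AtomicInteger counts partitions down, and the response future completes once every partition has answered. A NotCoordinatorException from one partition is skipped (that partition has migrated to another coordinator), while any other error completes the response immediately with that error code; since a CompletableFuture completes only once, the later countdown completion becomes a no-op. A self-contained sketch of the same fan-in pattern (FanInExample and collectAll are hypothetical names, not coordinator API):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public final class FanInExample {

    // Completes with the union of all partial results once every source has
    // answered; the first failure completes the future early instead (the
    // coordinator completes with an error response rather than exceptionally).
    static <T> CompletableFuture<List<T>> collectAll(List<CompletableFuture<List<T>>> sources) {
        CompletableFuture<List<T>> result = new CompletableFuture<>();
        List<T> acc = new ArrayList<>();
        AtomicInteger remaining = new AtomicInteger(sources.size());
        for (CompletableFuture<List<T>> source : sources) {
            source.handle((items, error) -> {
                if (error == null) {
                    synchronized (acc) {              // handle() may run on different threads
                        acc.addAll(items);
                    }
                } else {
                    result.completeExceptionally(error);
                }
                if (remaining.decrementAndGet() == 0) {
                    result.complete(acc);             // no-op if already completed with an error
                }
                return null;
            });
        }
        return result;
    }
}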
/**

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java

@@ -24,6 +24,7 @@ import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
@@ -60,6 +61,7 @@ import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
@@ -308,6 +310,21 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> {
return offsetMetadataManager.commitOffset(context, request);
}
/**
* Handles a ListGroups request.
*
* @param statesFilter The states of the groups we want to list.
*                     If empty, all groups are returned with their state.
* @param committedOffset A specified committed offset corresponding to this shard.
* @return A list of ListGroupsResponseData.ListedGroup.
*/
public List<ListGroupsResponseData.ListedGroup> listGroups(
List<String> statesFilter,
long committedOffset
) throws ApiException {
return groupMetadataManager.listGroups(statesFilter, committedOffset);
}
/**
* Handles a LeaveGroup request.
*

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

@@ -41,6 +41,7 @@ import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.protocol.Errors;
@@ -95,6 +96,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.kafka.common.protocol.Errors.COORDINATOR_NOT_AVAILABLE;
import static org.apache.kafka.common.protocol.Errors.ILLEGAL_GENERATION;
@@ -424,6 +426,24 @@ public class GroupMetadataManager {
return group;
}
/**
* Lists the groups.
*
* @param statesFilter The states of the groups we want to list.
*                     If empty, all groups are returned with their state.
* @param committedOffset A specified committed offset corresponding to this shard.
*
* @return A list of ListGroupsResponseData.ListedGroup.
*/
public List<ListGroupsResponseData.ListedGroup> listGroups(List<String> statesFilter, long committedOffset) {
Stream<Group> groupStream = groups.values(committedOffset).stream();
if (!statesFilter.isEmpty()) {
groupStream = groupStream.filter(group -> statesFilter.contains(group.stateAsString(committedOffset)));
}
return groupStream.map(group -> group.asListedGroup(committedOffset)).collect(Collectors.toList());
}
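
Because groups is a timeline collection, values(committedOffset) reads the map as of the last committed offset, so a ListGroups response never exposes group mutations whose records are still uncommitted. A standalone sketch of that read semantics, using the same org.apache.kafka.timeline classes this patch relies on (TimelineReadExample is a hypothetical name):

import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;

public class TimelineReadExample {
    public static void main(String[] args) {
        SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
        TimelineHashMap<String, String> groups = new TimelineHashMap<>(registry, 16);

        groups.put("group0", "Empty");
        registry.getOrCreateSnapshot(0);   // offset 0 is now "committed"

        groups.put("group1", "Stable");    // written but not yet committed

        System.out.println(groups.values(0)); // [Empty]         -- committed view
        System.out.println(groups.values());  // [Empty, Stable] -- latest view
    }
}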
/**
* Gets or maybe creates a consumer group.
*

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java

@@ -16,9 +16,11 @@
*/
package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.TopicImage;
@@ -179,6 +181,23 @@ public class ConsumerGroup implements Group {
return state.get().toString();
}
/**
* @return The current state as a String, based on the given committed offset.
*/
public String stateAsString(long committedOffset) {
return state.get(committedOffset).toString();
}
/**
* @return the group formatted as a list group response based on the committed offset.
*/
public ListGroupsResponseData.ListedGroup asListedGroup(long committedOffset) {
return new ListGroupsResponseData.ListedGroup()
.setGroupId(groupId)
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setGroupState(state.get(committedOffset).toString());
}
/**
* @return The group id.
*/
@@ -194,6 +213,13 @@
return state.get();
}
/**
* @return The current state based on committed offset.
*/
public ConsumerGroupState state(long committedOffset) {
return state.get(committedOffset);
}
/**
* @return The group epoch.
*/

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java

@@ -242,6 +242,16 @@ public class GenericGroup implements Group {
return this.state.toString();
}
/**
* The state of this group based on the committed offset.
*
* @return The current state as a String.
*/
@Override
public String stateAsString(long committedOffset) {
return this.state.toString();
}
/**
* @return the group id.
*/
@@ -1167,9 +1177,9 @@ public class GenericGroup implements Group {
}
/**
-     * @return the group formatted as a list group response.
+     * @return the group formatted as a list group response based on the committed offset.
      */
-    public ListGroupsResponseData.ListedGroup asListedGroup() {
+    public ListGroupsResponseData.ListedGroup asListedGroup(long committedOffset) {
return new ListGroupsResponseData.ListedGroup()
.setGroupId(groupId)
.setProtocolType(protocolType.orElse(""))
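
Note that GenericGroup.stateAsString(long) above ignores its committedOffset argument. Unlike ConsumerGroup, whose state is a timeline-backed value read via state.get(committedOffset), a generic group keeps a single in-place state, so there is no earlier snapshot to consult.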

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.coordinator.group;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
@@ -23,6 +24,7 @@ import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.InvalidFetchSizeException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
@@ -36,6 +38,8 @@ import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
@@ -63,10 +67,12 @@ import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatchers;
import org.mockito.internal.util.collections.Sets;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
@@ -603,6 +609,148 @@ public class GroupCoordinatorServiceTest {
);
}
@Test
public void testListGroups() throws ExecutionException, InterruptedException, TimeoutException {
CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
);
int partitionCount = 3;
service.startup(() -> partitionCount);
ListGroupsRequestData request = new ListGroupsRequestData();
List<ListGroupsResponseData.ListedGroup> expectedResults = Arrays.asList(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group0")
.setGroupState("Stable")
.setProtocolType("protocol1"),
new ListGroupsResponseData.ListedGroup()
.setGroupId("group1")
.setGroupState("Empty")
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE),
new ListGroupsResponseData.ListedGroup()
.setGroupId("group2")
.setGroupState("Dead")
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
);
when(runtime.partitions()).thenReturn(Sets.newSet(
new TopicPartition("__consumer_offsets", 0),
new TopicPartition("__consumer_offsets", 1),
new TopicPartition("__consumer_offsets", 2)
));
for (int i = 0; i < partitionCount; i++) {
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("list-groups"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", i)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(expectedResults.get(i))));
}
CompletableFuture<ListGroupsResponseData> responseFuture = service.listGroups(
requestContext(ApiKeys.LIST_GROUPS),
request
);
List<ListGroupsResponseData.ListedGroup> actualResults = responseFuture.get(5, TimeUnit.SECONDS).groups();
assertEquals(expectedResults, actualResults);
}
@Test
public void testListGroupsFailedWithNotCoordinatorException()
throws InterruptedException, ExecutionException, TimeoutException {
CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
);
int partitionCount = 3;
service.startup(() -> partitionCount);
List<ListGroupsResponseData.ListedGroup> expectedResults = Arrays.asList(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group0")
.setGroupState("Stable")
.setProtocolType("protocol1"),
new ListGroupsResponseData.ListedGroup()
.setGroupId("group1")
.setGroupState("Empty")
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
);
ListGroupsRequestData request = new ListGroupsRequestData();
when(runtime.partitions()).thenReturn(Sets.newSet(
new TopicPartition("__consumer_offsets", 0),
new TopicPartition("__consumer_offsets", 1),
new TopicPartition("__consumer_offsets", 2)
));
for (int i = 0; i < 2; i++) {
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("list-groups"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", i)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(expectedResults.get(i))));
}
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("list-groups"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)),
ArgumentMatchers.any()
)).thenReturn(FutureUtils.failedFuture(new NotCoordinatorException("")));
CompletableFuture<ListGroupsResponseData> responseFuture = service.listGroups(
requestContext(ApiKeys.LIST_GROUPS),
request
);
List<ListGroupsResponseData.ListedGroup> actualResults = responseFuture.get(5, TimeUnit.SECONDS).groups();
assertEquals(expectedResults, actualResults);
}
@Test
public void testListGroupsFailedImmediately()
throws InterruptedException, ExecutionException, TimeoutException {
CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
);
int partitionCount = 3;
service.startup(() -> partitionCount);
ListGroupsRequestData request = new ListGroupsRequestData();
when(runtime.partitions()).thenReturn(Sets.newSet(
new TopicPartition("__consumer_offsets", 0),
new TopicPartition("__consumer_offsets", 1),
new TopicPartition("__consumer_offsets", 2)
));
for (int i = 0; i < 2; i++) {
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("list-groups"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", i)),
ArgumentMatchers.any()
)).thenReturn(new CompletableFuture<>());
}
when(runtime.scheduleReadOperation(
ArgumentMatchers.eq("list-groups"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)),
ArgumentMatchers.any()
)).thenReturn(FutureUtils.failedFuture(new CoordinatorLoadInProgressException("")));
CompletableFuture<ListGroupsResponseData> responseFuture = service.listGroups(
requestContext(ApiKeys.LIST_GROUPS),
request
);
ListGroupsResponseData listGroupsResponseData = responseFuture.get(5, TimeUnit.SECONDS);
assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS.code(), listGroupsResponseData.errorCode());
assertEquals(Collections.emptyList(), listGroupsResponseData.groups());
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testFetchOffsets(

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java

@@ -41,6 +41,7 @@ import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMe
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment;
import org.apache.kafka.common.message.SyncGroupResponseData;
@@ -115,8 +116,10 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol;
@@ -455,8 +458,6 @@ public class GroupMetadataManagerTest {
public CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGroupHeartbeat(
ConsumerGroupHeartbeatRequestData request
) {
-        snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
RequestContext context = new RequestContext(
new RequestHeader(
ApiKeys.CONSUMER_GROUP_HEARTBEAT,
@@ -1023,6 +1024,10 @@
);
}
public List<ListGroupsResponseData.ListedGroup> sendListGroups(List<String> statesFilter) {
return groupMetadataManager.listGroups(statesFilter, lastCommittedOffset);
}
public void verifyHeartbeat(
String groupId,
JoinGroupResponseData joinResponse,
@@ -1202,6 +1207,7 @@
}
lastWrittenOffset++;
snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
}
}
@@ -8603,6 +8609,83 @@
assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), heartbeatResponse.errorCode());
}
@Test
public void testListGroups() {
String consumerGroupId = "consumer-group-id";
String genericGroupId = "generic-group-id";
String memberId1 = Uuid.randomUuid().toString();
String genericGroupType = "generic";
Uuid fooTopicId = Uuid.randomUuid();
String fooTopicName = "foo";
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withAssignors(Collections.singletonList(assignor))
.withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10))
.build();
context.replay(newGroupMetadataRecord(
genericGroupId,
new GroupMetadataValue()
.setMembers(Collections.emptyList())
.setGeneration(2)
.setLeader(null)
.setProtocolType(genericGroupType)
.setProtocol("range")
.setCurrentStateTimestamp(context.time.milliseconds()),
MetadataVersion.latest()));
context.commit();
GenericGroup genericGroup = context.groupMetadataManager.getOrMaybeCreateGenericGroup(genericGroupId, false);
context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId, new ConsumerGroupMember.Builder(memberId1)
.setSubscribedTopicNames(Collections.singletonList(fooTopicName))
.build()));
context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 11));
Map<String, ListGroupsResponseData.ListedGroup> actualAllGroupMap =
context.sendListGroups(Collections.emptyList())
.stream().collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
Map<String, ListGroupsResponseData.ListedGroup> expectAllGroupMap =
Stream.of(
new ListGroupsResponseData.ListedGroup()
.setGroupId(genericGroup.groupId())
.setProtocolType(genericGroupType)
.setGroupState(EMPTY.toString()),
new ListGroupsResponseData.ListedGroup()
.setGroupId(consumerGroupId)
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setGroupState(ConsumerGroup.ConsumerGroupState.EMPTY.toString())
).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
assertEquals(expectAllGroupMap, actualAllGroupMap);
context.commit();
actualAllGroupMap = context.sendListGroups(Collections.emptyList()).stream()
.collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
expectAllGroupMap =
Stream.of(
new ListGroupsResponseData.ListedGroup()
.setGroupId(genericGroup.groupId())
.setProtocolType(genericGroupType)
.setGroupState(EMPTY.toString()),
new ListGroupsResponseData.ListedGroup()
.setGroupId(consumerGroupId)
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setGroupState(ConsumerGroup.ConsumerGroupState.ASSIGNING.toString())
).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
assertEquals(expectAllGroupMap, actualAllGroupMap);
actualAllGroupMap = context.sendListGroups(Collections.singletonList("Empty")).stream()
.collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
expectAllGroupMap = Stream.of(
new ListGroupsResponseData.ListedGroup()
.setGroupId(genericGroup.groupId())
.setProtocolType(genericGroupType)
.setGroupState(EMPTY.toString())
).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity()));
assertEquals(expectAllGroupMap, actualAllGroupMap);
}
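
The test exercises the snapshot semantics end to end: after the consumer group's epoch record is replayed but before context.commit(), ListGroups still reports the group as Empty; once the offset is committed the same call reports Assigning, and the "Empty" states filter then matches only the generic group.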
public static <T> void assertUnorderedListEquals(
List<T> expected,
List<T> actual

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java

@@ -632,6 +632,20 @@ public class ConsumerGroupTest {
group.validateOffsetCommit("member-id", "", 0);
}
@Test
public void testAsListedGroup() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo");
snapshotRegistry.getOrCreateSnapshot(0);
assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), group.stateAsString(0));
group.updateMember(new ConsumerGroupMember.Builder("member1")
.setSubscribedTopicNames(Collections.singletonList("foo"))
.build());
snapshotRegistry.getOrCreateSnapshot(1);
assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), group.stateAsString(0));
assertEquals(ConsumerGroup.ConsumerGroupState.STABLE.toString(), group.stateAsString(1));
}
@Test
public void testValidateOffsetFetch() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());