From f309299f3cf92c3ed6fe545c628117b9028c2917 Mon Sep 17 00:00:00 2001 From: zhaohaidao <76048421@qq.com> Date: Fri, 15 Sep 2023 14:45:03 +0800 Subject: [PATCH] KAFKA-14503: Implement ListGroups (#14271) This patch implements the ListGroups API in the new group coordinator. Reviewers: David Jacot --- checkstyle/suppressions.xml | 2 +- .../apache/kafka/coordinator/group/Group.java | 11 ++ .../group/GroupCoordinatorService.java | 34 +++- .../group/GroupCoordinatorShard.java | 17 ++ .../group/GroupMetadataManager.java | 20 +++ .../group/consumer/ConsumerGroup.java | 26 +++ .../group/generic/GenericGroup.java | 14 +- .../group/GroupCoordinatorServiceTest.java | 148 ++++++++++++++++++ .../group/GroupMetadataManagerTest.java | 87 +++++++++- .../group/consumer/ConsumerGroupTest.java | 14 ++ 10 files changed, 365 insertions(+), 8 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 6e946b1bead..9862a237fa9 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -326,7 +326,7 @@ + files="(GroupMetadataManager|GroupMetadataManagerTest|GroupCoordinatorServiceTest).java"/> future = new CompletableFuture<>(); + final List results = new ArrayList<>(); + final Set existingPartitionSet = runtime.partitions(); + final AtomicInteger cnt = new AtomicInteger(existingPartitionSet.size()); + + for (TopicPartition tp : existingPartitionSet) { + runtime.scheduleReadOperation( + "list-groups", + tp, + (coordinator, lastCommittedOffset) -> coordinator.listGroups(request.statesFilter(), lastCommittedOffset) + ).handle((groups, exception) -> { + if (exception == null) { + synchronized (results) { + results.addAll(groups); + } + } else { + if (!(exception instanceof NotCoordinatorException)) { + future.complete(new ListGroupsResponseData().setErrorCode(Errors.forException(exception).code())); + } + } + if (cnt.decrementAndGet() == 0) { + future.complete(new ListGroupsResponseData().setGroups(results)); + } + return null; + }); + } + return future; } /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index ed68bb22c5a..97327c9722f 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.message.JoinGroupRequestData; import org.apache.kafka.common.message.JoinGroupResponseData; import org.apache.kafka.common.message.LeaveGroupRequestData; import org.apache.kafka.common.message.LeaveGroupResponseData; +import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.message.OffsetCommitRequestData; import org.apache.kafka.common.message.OffsetCommitResponseData; import org.apache.kafka.common.message.OffsetFetchRequestData; @@ -60,6 +61,7 @@ import org.apache.kafka.image.MetadataImage; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.timeline.SnapshotRegistry; +import java.util.List; import java.util.concurrent.CompletableFuture; /** @@ -308,6 +310,21 @@ public class GroupCoordinatorShard implements CoordinatorShard { return offsetMetadataManager.commitOffset(context, request); } + /** + * Handles a ListGroups request. + * + * @param statesFilter The states of the groups we want to list. + * If empty all groups are returned with their state. + * @param committedOffset A specified committed offset corresponding to this shard + * @return A list containing the ListGroupsResponseData.ListedGroup + */ + public List listGroups( + List statesFilter, + long committedOffset + ) throws ApiException { + return groupMetadataManager.listGroups(statesFilter, committedOffset); + } + /** * Handles a LeaveGroup request. * diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index a174a0fb1ff..002b7956857 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -41,6 +41,7 @@ import org.apache.kafka.common.message.LeaveGroupRequestData; import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity; import org.apache.kafka.common.message.LeaveGroupResponseData; import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse; +import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.protocol.Errors; @@ -95,6 +96,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.kafka.common.protocol.Errors.COORDINATOR_NOT_AVAILABLE; import static org.apache.kafka.common.protocol.Errors.ILLEGAL_GENERATION; @@ -424,6 +426,24 @@ public class GroupMetadataManager { return group; } + /** + * Get the Group List. + * + * @param statesFilter The states of the groups we want to list. + * If empty all groups are returned with their state. + * @param committedOffset A specified committed offset corresponding to this shard + * + * @return A list containing the ListGroupsResponseData.ListedGroup + */ + + public List listGroups(List statesFilter, long committedOffset) { + Stream groupStream = groups.values(committedOffset).stream(); + if (!statesFilter.isEmpty()) { + groupStream = groupStream.filter(group -> statesFilter.contains(group.stateAsString(committedOffset))); + } + return groupStream.map(group -> group.asListedGroup(committedOffset)).collect(Collectors.toList()); + } + /** * Gets or maybe creates a consumer group. * diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java index 5328020baac..10222c3a3cc 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java @@ -16,9 +16,11 @@ */ package org.apache.kafka.coordinator.group.consumer; +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.StaleMemberEpochException; import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.coordinator.group.Group; import org.apache.kafka.image.ClusterImage; import org.apache.kafka.image.TopicImage; @@ -179,6 +181,23 @@ public class ConsumerGroup implements Group { return state.get().toString(); } + /** + * @return The current state as a String with given committedOffset. + */ + public String stateAsString(long committedOffset) { + return state.get(committedOffset).toString(); + } + + /** + * @return the group formatted as a list group response based on the committed offset. + */ + public ListGroupsResponseData.ListedGroup asListedGroup(long committedOffset) { + return new ListGroupsResponseData.ListedGroup() + .setGroupId(groupId) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState(state.get(committedOffset).toString()); + } + /** * @return The group id. */ @@ -194,6 +213,13 @@ public class ConsumerGroup implements Group { return state.get(); } + /** + * @return The current state based on committed offset. + */ + public ConsumerGroupState state(long committedOffset) { + return state.get(committedOffset); + } + /** * @return The group epoch. */ diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java index d163f385fe0..18f96f2f78e 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java @@ -242,6 +242,16 @@ public class GenericGroup implements Group { return this.state.toString(); } + /** + * The state of this group based on the committed offset. + * + * @return The current state as a String. + */ + @Override + public String stateAsString(long committedOffset) { + return this.state.toString(); + } + /** * @return the group id. */ @@ -1167,9 +1177,9 @@ public class GenericGroup implements Group { } /** - * @return the group formatted as a list group response. + * @return the group formatted as a list group response based on the committed offset. */ - public ListGroupsResponseData.ListedGroup asListedGroup() { + public ListGroupsResponseData.ListedGroup asListedGroup(long committedOffset) { return new ListGroupsResponseData.ListedGroup() .setGroupId(groupId) .setProtocolType(protocolType.orElse("")) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index 61af5582824..8f39f524484 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.coordinator.group; +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.errors.CoordinatorLoadInProgressException; @@ -23,6 +24,7 @@ import org.apache.kafka.common.errors.CoordinatorNotAvailableException; import org.apache.kafka.common.errors.InvalidFetchSizeException; import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.KafkaStorageException; +import org.apache.kafka.common.errors.NotCoordinatorException; import org.apache.kafka.common.errors.NotEnoughReplicasException; import org.apache.kafka.common.errors.NotLeaderOrFollowerException; import org.apache.kafka.common.errors.RebalanceInProgressException; @@ -36,6 +38,8 @@ import org.apache.kafka.common.message.HeartbeatRequestData; import org.apache.kafka.common.message.HeartbeatResponseData; import org.apache.kafka.common.message.JoinGroupRequestData; import org.apache.kafka.common.message.JoinGroupResponseData; +import org.apache.kafka.common.message.ListGroupsRequestData; +import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.message.OffsetFetchRequestData; import org.apache.kafka.common.message.OffsetFetchResponseData; import org.apache.kafka.common.message.LeaveGroupRequestData; @@ -63,10 +67,12 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentMatchers; +import org.mockito.internal.util.collections.Sets; import java.net.InetAddress; import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.OptionalInt; import java.util.Properties; import java.util.concurrent.CompletableFuture; @@ -603,6 +609,148 @@ public class GroupCoordinatorServiceTest { ); } + @Test + public void testListGroups() throws ExecutionException, InterruptedException, TimeoutException { + CoordinatorRuntime runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + int partitionCount = 3; + service.startup(() -> partitionCount); + + ListGroupsRequestData request = new ListGroupsRequestData(); + + List expectedResults = Arrays.asList( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group0") + .setGroupState("Stable") + .setProtocolType("protocol1"), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group1") + .setGroupState("Empty") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group2") + .setGroupState("Dead") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + ); + when(runtime.partitions()).thenReturn(Sets.newSet( + new TopicPartition("__consumer_offsets", 0), + new TopicPartition("__consumer_offsets", 1), + new TopicPartition("__consumer_offsets", 2) + )); + for (int i = 0; i < partitionCount; i++) { + when(runtime.scheduleReadOperation( + ArgumentMatchers.eq("list-groups"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", i)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(expectedResults.get(i)))); + } + + CompletableFuture responseFuture = service.listGroups( + requestContext(ApiKeys.LIST_GROUPS), + request + ); + + List actualResults = responseFuture.get(5, TimeUnit.SECONDS).groups(); + assertEquals(expectedResults, actualResults); + } + + @Test + public void testListGroupsFailedWithNotCoordinatorException() + throws InterruptedException, ExecutionException, TimeoutException { + CoordinatorRuntime runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + int partitionCount = 3; + service.startup(() -> partitionCount); + + List expectedResults = Arrays.asList( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group0") + .setGroupState("Stable") + .setProtocolType("protocol1"), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group1") + .setGroupState("Empty") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + ); + + ListGroupsRequestData request = new ListGroupsRequestData(); + when(runtime.partitions()).thenReturn(Sets.newSet( + new TopicPartition("__consumer_offsets", 0), + new TopicPartition("__consumer_offsets", 1), + new TopicPartition("__consumer_offsets", 2) + )); + for (int i = 0; i < 2; i++) { + when(runtime.scheduleReadOperation( + ArgumentMatchers.eq("list-groups"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", i)), + ArgumentMatchers.any() + )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(expectedResults.get(i)))); + } + + when(runtime.scheduleReadOperation( + ArgumentMatchers.eq("list-groups"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)), + ArgumentMatchers.any() + )).thenReturn(FutureUtils.failedFuture(new NotCoordinatorException(""))); + + CompletableFuture responseFuture = service.listGroups( + requestContext(ApiKeys.LIST_GROUPS), + request + ); + List actualResults = responseFuture.get(5, TimeUnit.SECONDS).groups(); + assertEquals(expectedResults, actualResults); + } + + @Test + public void testListGroupsFailedImmediately() + throws InterruptedException, ExecutionException, TimeoutException { + CoordinatorRuntime runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime + ); + int partitionCount = 3; + service.startup(() -> partitionCount); + + ListGroupsRequestData request = new ListGroupsRequestData(); + when(runtime.partitions()).thenReturn(Sets.newSet( + new TopicPartition("__consumer_offsets", 0), + new TopicPartition("__consumer_offsets", 1), + new TopicPartition("__consumer_offsets", 2) + )); + for (int i = 0; i < 2; i++) { + when(runtime.scheduleReadOperation( + ArgumentMatchers.eq("list-groups"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", i)), + ArgumentMatchers.any() + )).thenReturn(new CompletableFuture<>()); + } + + when(runtime.scheduleReadOperation( + ArgumentMatchers.eq("list-groups"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)), + ArgumentMatchers.any() + )).thenReturn(FutureUtils.failedFuture(new CoordinatorLoadInProgressException(""))); + + CompletableFuture responseFuture = service.listGroups( + requestContext(ApiKeys.LIST_GROUPS), + request + ); + ListGroupsResponseData listGroupsResponseData = responseFuture.get(5, TimeUnit.SECONDS); + + assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS.code(), listGroupsResponseData.errorCode()); + assertEquals(Collections.emptyList(), listGroupsResponseData.groups()); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) public void testFetchOffsets( diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 3a735db37e3..a1b25575e8e 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -41,6 +41,7 @@ import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMe import org.apache.kafka.common.message.LeaveGroupRequestData; import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity; import org.apache.kafka.common.message.LeaveGroupResponseData; +import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment; import org.apache.kafka.common.message.SyncGroupResponseData; @@ -115,8 +116,10 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.CompletableFuture; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol; @@ -455,8 +458,6 @@ public class GroupMetadataManagerTest { public CoordinatorResult consumerGroupHeartbeat( ConsumerGroupHeartbeatRequestData request ) { - snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset); - RequestContext context = new RequestContext( new RequestHeader( ApiKeys.CONSUMER_GROUP_HEARTBEAT, @@ -1023,6 +1024,10 @@ public class GroupMetadataManagerTest { ); } + public List sendListGroups(List statesFilter) { + return groupMetadataManager.listGroups(statesFilter, lastCommittedOffset); + } + public void verifyHeartbeat( String groupId, JoinGroupResponseData joinResponse, @@ -1202,6 +1207,7 @@ public class GroupMetadataManagerTest { } lastWrittenOffset++; + snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset); } } @@ -8603,6 +8609,83 @@ public class GroupMetadataManagerTest { assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), heartbeatResponse.errorCode()); } + @Test + public void testListGroups() { + String consumerGroupId = "consumer-group-id"; + String genericGroupId = "generic-group-id"; + String memberId1 = Uuid.randomUuid().toString(); + String genericGroupType = "generic"; + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10)) + .build(); + context.replay(newGroupMetadataRecord( + genericGroupId, + new GroupMetadataValue() + .setMembers(Collections.emptyList()) + .setGeneration(2) + .setLeader(null) + .setProtocolType(genericGroupType) + .setProtocol("range") + .setCurrentStateTimestamp(context.time.milliseconds()), + MetadataVersion.latest())); + context.commit(); + GenericGroup genericGroup = context.groupMetadataManager.getOrMaybeCreateGenericGroup(genericGroupId, false); + context.replay(RecordHelpers.newMemberSubscriptionRecord(consumerGroupId, new ConsumerGroupMember.Builder(memberId1) + .setSubscribedTopicNames(Collections.singletonList(fooTopicName)) + .build())); + context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 11)); + + Map actualAllGroupMap = + context.sendListGroups(Collections.emptyList()) + .stream().collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity())); + Map expectAllGroupMap = + Stream.of( + new ListGroupsResponseData.ListedGroup() + .setGroupId(genericGroup.groupId()) + .setProtocolType(genericGroupType) + .setGroupState(EMPTY.toString()), + new ListGroupsResponseData.ListedGroup() + .setGroupId(consumerGroupId) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState(ConsumerGroup.ConsumerGroupState.EMPTY.toString()) + ).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity())); + + assertEquals(expectAllGroupMap, actualAllGroupMap); + + context.commit(); + actualAllGroupMap = context.sendListGroups(Collections.emptyList()).stream() + .collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity())); + expectAllGroupMap = + Stream.of( + new ListGroupsResponseData.ListedGroup() + .setGroupId(genericGroup.groupId()) + .setProtocolType(genericGroupType) + .setGroupState(EMPTY.toString()), + new ListGroupsResponseData.ListedGroup() + .setGroupId(consumerGroupId) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState(ConsumerGroup.ConsumerGroupState.ASSIGNING.toString()) + ).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity())); + + assertEquals(expectAllGroupMap, actualAllGroupMap); + + actualAllGroupMap = context.sendListGroups(Collections.singletonList("Empty")).stream() + .collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity())); + expectAllGroupMap = Stream.of( + new ListGroupsResponseData.ListedGroup() + .setGroupId(genericGroup.groupId()) + .setProtocolType(genericGroupType) + .setGroupState(EMPTY.toString()) + ).collect(Collectors.toMap(ListGroupsResponseData.ListedGroup::groupId, Function.identity())); + + assertEquals(expectAllGroupMap, actualAllGroupMap); + } + public static void assertUnorderedListEquals( List expected, List actual diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java index aa848aa3f5d..9c9421958c5 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java @@ -632,6 +632,20 @@ public class ConsumerGroupTest { group.validateOffsetCommit("member-id", "", 0); } + @Test + public void testAsListedGroup() { + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); + ConsumerGroup group = new ConsumerGroup(snapshotRegistry, "group-foo"); + snapshotRegistry.getOrCreateSnapshot(0); + assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), group.stateAsString(0)); + group.updateMember(new ConsumerGroupMember.Builder("member1") + .setSubscribedTopicNames(Collections.singletonList("foo")) + .build()); + snapshotRegistry.getOrCreateSnapshot(1); + assertEquals(ConsumerGroup.ConsumerGroupState.EMPTY.toString(), group.stateAsString(0)); + assertEquals(ConsumerGroup.ConsumerGroupState.STABLE.toString(), group.stateAsString(1)); + } + @Test public void testValidateOffsetFetch() { SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());