From 32c887b05e088f175f49d871506312caf34db256 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Tue, 19 Nov 2024 15:47:12 +0000 Subject: [PATCH] KAFKA-17949: Introduce GroupState and replace ShareGroupState (#17763) This PR introduces the unified GroupState enum for all group types from KIP-1043. This PR also removes ShareGroupState and begins the work to replace Admin.listShareGroups with Admin.listGroups. That will complete in a future PR. Reviewers: Manikumar Reddy --- .../admin/ConsumerGroupDescription.java | 72 +++- .../clients/admin/ConsumerGroupListing.java | 76 +++- .../kafka/clients/admin/GroupListing.java | 33 +- .../kafka/clients/admin/KafkaAdminClient.java | 37 +- .../admin/ListConsumerGroupsOptions.java | 34 +- .../clients/admin/ListGroupsOptions.java | 19 + .../clients/admin/ListShareGroupsOptions.java | 8 +- .../clients/admin/ShareGroupDescription.java | 24 +- .../clients/admin/ShareGroupListing.java | 20 +- .../DescribeConsumerGroupsHandler.java | 6 +- .../internals/DescribeShareGroupsHandler.java | 6 +- .../kafka/common/ConsumerGroupState.java | 2 + .../org/apache/kafka/common/GroupState.java | 93 +++++ .../apache/kafka/common/ShareGroupState.java | 56 --- .../kafka/clients/admin/GroupListingTest.java | 9 +- .../clients/admin/KafkaAdminClientTest.java | 55 ++- .../kafka/clients/admin/MockAdminClient.java | 3 +- .../common/requests/RequestResponseTest.java | 4 +- .../connect/mirror/MirrorCheckpointTask.java | 6 +- .../api/PlaintextAdminIntegrationTest.scala | 339 ++++++++++++++++-- .../ShareGroupDescribeRequestTest.scala | 4 +- .../metrics/GroupCoordinatorMetricsTest.java | 8 +- .../consumer/group/ConsumerGroupCommand.java | 83 ++--- ...{GroupState.java => GroupInformation.java} | 10 +- .../consumer/group/ShareGroupCommand.java | 69 ++-- .../apache/kafka/tools/GroupsCommandTest.java | 41 +-- .../group/ConsumerGroupServiceTest.java | 20 +- .../group/DeleteConsumerGroupsTest.java | 10 +- .../group/DescribeConsumerGroupTest.java | 100 +++--- .../consumer/group/ListConsumerGroupTest.java | 124 +++---- .../group/ResetConsumerGroupOffsetTest.java | 8 +- .../consumer/group/ShareGroupCommandTest.java | 83 ++--- 32 files changed, 993 insertions(+), 469 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/common/GroupState.java delete mode 100644 clients/src/main/java/org/apache/kafka/common/ShareGroupState.java rename tools/src/main/java/org/apache/kafka/tools/consumer/group/{GroupState.java => GroupInformation.java} (81%) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java index 13ec5965eed..4f3dc837cde 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java @@ -18,6 +18,7 @@ package org.apache.kafka.clients.admin; import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.GroupState; import org.apache.kafka.common.GroupType; import org.apache.kafka.common.Node; import org.apache.kafka.common.acl.AclOperation; @@ -38,10 +39,14 @@ public class ConsumerGroupDescription { private final Collection members; private final String partitionAssignor; private final GroupType type; - private final ConsumerGroupState state; + private final GroupState groupState; private final Node coordinator; private final Set authorizedOperations; + /** + * @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, boolean, Collection, String, GroupState, Node)}. + */ + @Deprecated public ConsumerGroupDescription(String groupId, boolean isSimpleConsumerGroup, Collection members, @@ -51,6 +56,10 @@ public class ConsumerGroupDescription { this(groupId, isSimpleConsumerGroup, members, partitionAssignor, state, coordinator, Collections.emptySet()); } + /** + * @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, boolean, Collection, String, GroupState, Node, Set)}. + */ + @Deprecated public ConsumerGroupDescription(String groupId, boolean isSimpleConsumerGroup, Collection members, @@ -61,6 +70,10 @@ public class ConsumerGroupDescription { this(groupId, isSimpleConsumerGroup, members, partitionAssignor, GroupType.CLASSIC, state, coordinator, authorizedOperations); } + /** + * @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, boolean, Collection, String, GroupType, GroupState, Node, Set)}. + */ + @Deprecated public ConsumerGroupDescription(String groupId, boolean isSimpleConsumerGroup, Collection members, @@ -75,7 +88,45 @@ public class ConsumerGroupDescription { Collections.unmodifiableList(new ArrayList<>(members)); this.partitionAssignor = partitionAssignor == null ? "" : partitionAssignor; this.type = type; - this.state = state; + this.groupState = GroupState.parse(state.name()); + this.coordinator = coordinator; + this.authorizedOperations = authorizedOperations; + } + + public ConsumerGroupDescription(String groupId, + boolean isSimpleConsumerGroup, + Collection members, + String partitionAssignor, + GroupState groupState, + Node coordinator) { + this(groupId, isSimpleConsumerGroup, members, partitionAssignor, groupState, coordinator, Collections.emptySet()); + } + + public ConsumerGroupDescription(String groupId, + boolean isSimpleConsumerGroup, + Collection members, + String partitionAssignor, + GroupState groupState, + Node coordinator, + Set authorizedOperations) { + this(groupId, isSimpleConsumerGroup, members, partitionAssignor, GroupType.CLASSIC, groupState, coordinator, authorizedOperations); + } + + public ConsumerGroupDescription(String groupId, + boolean isSimpleConsumerGroup, + Collection members, + String partitionAssignor, + GroupType type, + GroupState groupState, + Node coordinator, + Set authorizedOperations) { + this.groupId = groupId == null ? "" : groupId; + this.isSimpleConsumerGroup = isSimpleConsumerGroup; + this.members = members == null ? Collections.emptyList() : + Collections.unmodifiableList(new ArrayList<>(members)); + this.partitionAssignor = partitionAssignor == null ? "" : partitionAssignor; + this.type = type; + this.groupState = groupState; this.coordinator = coordinator; this.authorizedOperations = authorizedOperations; } @@ -90,14 +141,14 @@ public class ConsumerGroupDescription { Objects.equals(members, that.members) && Objects.equals(partitionAssignor, that.partitionAssignor) && type == that.type && - state == that.state && + groupState == that.groupState && Objects.equals(coordinator, that.coordinator) && Objects.equals(authorizedOperations, that.authorizedOperations); } @Override public int hashCode() { - return Objects.hash(groupId, isSimpleConsumerGroup, members, partitionAssignor, type, state, coordinator, authorizedOperations); + return Objects.hash(groupId, isSimpleConsumerGroup, members, partitionAssignor, type, groupState, coordinator, authorizedOperations); } /** @@ -138,9 +189,18 @@ public class ConsumerGroupDescription { /** * The consumer group state, or UNKNOWN if the state is too new for us to parse. + * @deprecated Since 4.0. Use {@link #groupState()} instead. */ + @Deprecated public ConsumerGroupState state() { - return state; + return ConsumerGroupState.parse(groupState.name()); + } + + /** + * The group state, or UNKNOWN if the state is too new for us to parse. + */ + public GroupState groupState() { + return groupState; } /** @@ -164,7 +224,7 @@ public class ConsumerGroupDescription { ", members=" + members.stream().map(MemberDescription::toString).collect(Collectors.joining(",")) + ", partitionAssignor=" + partitionAssignor + ", type=" + type + - ", state=" + state + + ", groupState=" + groupState + ", coordinator=" + coordinator + ", authorizedOperations=" + authorizedOperations + ")"; diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java index 96a3ecce9d0..076b51b89de 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java @@ -18,6 +18,7 @@ package org.apache.kafka.clients.admin; import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.GroupState; import org.apache.kafka.common.GroupType; import java.util.Objects; @@ -29,28 +30,30 @@ import java.util.Optional; public class ConsumerGroupListing { private final String groupId; private final boolean isSimpleConsumerGroup; - private final Optional state; + private final Optional groupState; private final Optional type; /** * Create an instance with the specified parameters. * - * @param groupId Group Id - * @param isSimpleConsumerGroup If consumer group is simple or not. + * @param groupId Group Id. + * @param isSimpleConsumerGroup If consumer group is simple or not. */ public ConsumerGroupListing(String groupId, boolean isSimpleConsumerGroup) { - this(groupId, isSimpleConsumerGroup, Optional.empty(), Optional.empty()); + this(groupId, Optional.empty(), Optional.empty(), isSimpleConsumerGroup); } /** * Create an instance with the specified parameters. * - * @param groupId Group Id - * @param isSimpleConsumerGroup If consumer group is simple or not. - * @param state The state of the consumer group + * @param groupId Group Id. + * @param isSimpleConsumerGroup If consumer group is simple or not. + * @param state The state of the consumer group. + * @deprecated Since 4.0. Use {@link #ConsumerGroupListing(String, Optional, boolean)}. */ + @Deprecated public ConsumerGroupListing(String groupId, boolean isSimpleConsumerGroup, Optional state) { - this(groupId, isSimpleConsumerGroup, state, Optional.empty()); + this(groupId, Objects.requireNonNull(state).map(state0 -> GroupState.parse(state0.name())), Optional.empty(), isSimpleConsumerGroup); } /** @@ -60,17 +63,51 @@ public class ConsumerGroupListing { * @param isSimpleConsumerGroup If consumer group is simple or not. * @param state The state of the consumer group. * @param type The type of the consumer group. + * @deprecated Since 4.0. Use {@link #ConsumerGroupListing(String, Optional, Optional, boolean)}. */ + @Deprecated public ConsumerGroupListing( String groupId, boolean isSimpleConsumerGroup, Optional state, Optional type + ) { + this(groupId, Objects.requireNonNull(state).map(state0 -> GroupState.parse(state0.name())), type, isSimpleConsumerGroup); + } + + /** + * Create an instance with the specified parameters. + * + * @param groupId Group Id. + * @param groupState The state of the consumer group. + * @param isSimpleConsumerGroup If consumer group is simple or not. + */ + public ConsumerGroupListing( + String groupId, + Optional groupState, + boolean isSimpleConsumerGroup + ) { + this(groupId, groupState, Optional.empty(), isSimpleConsumerGroup); + } + + /** + * Create an instance with the specified parameters. + * + * @param groupId Group Id. + * @param groupState The state of the consumer group. + * @param type The type of the consumer group. + * @param isSimpleConsumerGroup If consumer group is simple or not. + */ + public ConsumerGroupListing( + String groupId, + Optional groupState, + Optional type, + boolean isSimpleConsumerGroup ) { this.groupId = groupId; - this.isSimpleConsumerGroup = isSimpleConsumerGroup; - this.state = Objects.requireNonNull(state); + this.groupState = Objects.requireNonNull(groupState); this.type = Objects.requireNonNull(type); + this.isSimpleConsumerGroup = isSimpleConsumerGroup; } /** @@ -88,10 +125,19 @@ public class ConsumerGroupListing { } /** - * Consumer Group state + * Group state */ + public Optional groupState() { + return groupState; + } + + /** + * Consumer Group state + * @deprecated Since 4.0. Use {@link #groupState()}. + */ + @Deprecated public Optional state() { - return state; + return groupState.map(state0 -> ConsumerGroupState.parse(state0.name())); } /** @@ -108,14 +154,14 @@ public class ConsumerGroupListing { return "(" + "groupId='" + groupId + '\'' + ", isSimpleConsumerGroup=" + isSimpleConsumerGroup + - ", state=" + state + + ", groupState=" + groupState + ", type=" + type + ')'; } @Override public int hashCode() { - return Objects.hash(groupId, isSimpleConsumerGroup(), state, type); + return Objects.hash(groupId, isSimpleConsumerGroup(), groupState, type); } @Override @@ -125,7 +171,7 @@ public class ConsumerGroupListing { ConsumerGroupListing that = (ConsumerGroupListing) o; return isSimpleConsumerGroup() == that.isSimpleConsumerGroup() && Objects.equals(groupId, that.groupId) && - Objects.equals(state, that.state) && + Objects.equals(groupState, that.groupState) && Objects.equals(type, that.type); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/GroupListing.java b/clients/src/main/java/org/apache/kafka/clients/admin/GroupListing.java index 0ee2a211e70..8a0727c284f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/GroupListing.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/GroupListing.java @@ -17,7 +17,9 @@ package org.apache.kafka.clients.admin; +import org.apache.kafka.common.GroupState; import org.apache.kafka.common.GroupType; +import org.apache.kafka.common.annotation.InterfaceStability; import java.util.Objects; import java.util.Optional; @@ -25,22 +27,26 @@ import java.util.Optional; /** * A listing of a group in the cluster. */ +@InterfaceStability.Evolving public class GroupListing { private final String groupId; private final Optional type; private final String protocol; + private final Optional groupState; /** * Create an instance with the specified parameters. * - * @param groupId Group Id - * @param type Group type - * @param protocol Protocol + * @param groupId Group Id + * @param type Group type + * @param protocol Protocol + * @param groupState Group state */ - public GroupListing(String groupId, Optional type, String protocol) { + public GroupListing(String groupId, Optional type, String protocol, Optional groupState) { this.groupId = groupId; this.type = Objects.requireNonNull(type); this.protocol = protocol; + this.groupState = groupState; } /** @@ -75,6 +81,19 @@ public class GroupListing { return protocol; } + /** + * The group state. + *

+ * If the broker returns a group state which is not recognised, as might + * happen when talking to a broker with a later version, the state will be + * Optional.of(GroupState.UNKNOWN). + * + * @return An Optional containing the state, if available. + */ + public Optional groupState() { + return groupState; + } + /** * If the group is a simple consumer group or not. */ @@ -88,12 +107,13 @@ public class GroupListing { "groupId='" + groupId + '\'' + ", type=" + type.map(GroupType::toString).orElse("none") + ", protocol='" + protocol + '\'' + + ", groupState=" + groupState.map(GroupState::toString).orElse("none") + ')'; } @Override public int hashCode() { - return Objects.hash(groupId, type, protocol); + return Objects.hash(groupId, type, protocol, groupState); } @Override @@ -103,6 +123,7 @@ public class GroupListing { GroupListing that = (GroupListing) o; return Objects.equals(groupId, that.groupId) && Objects.equals(type, that.type) && - Objects.equals(protocol, that.protocol); + Objects.equals(protocol, that.protocol) && + Objects.equals(groupState, that.groupState); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 1b6d3efc8e1..bfa15077c8e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -63,8 +63,8 @@ import org.apache.kafka.clients.admin.internals.RemoveMembersFromConsumerGroupHa import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.GroupState; import org.apache.kafka.common.GroupType; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaFuture; @@ -72,7 +72,6 @@ import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.ShareGroupState; import org.apache.kafka.common.TopicCollection; import org.apache.kafka.common.TopicCollection.TopicIdCollection; import org.apache.kafka.common.TopicCollection.TopicNameCollection; @@ -3596,8 +3595,13 @@ public class KafkaAdminClient extends AdminClient { .stream() .map(GroupType::toString) .collect(Collectors.toList()); + List groupStates = options.groupStates() + .stream() + .map(GroupState::toString) + .collect(Collectors.toList()); return new ListGroupsRequest.Builder(new ListGroupsRequestData() .setTypesFilter(groupTypes) + .setStatesFilter(groupStates) ); } @@ -3610,10 +3614,17 @@ public class KafkaAdminClient extends AdminClient { type = Optional.of(GroupType.parse(group.groupType())); } final String protocolType = group.protocolType(); + final Optional groupState; + if (group.groupState() == null || group.groupState().isEmpty()) { + groupState = Optional.empty(); + } else { + groupState = Optional.of(GroupState.parse(group.groupState())); + } final GroupListing groupListing = new GroupListing( groupId, type, - protocolType + protocolType, + groupState ); results.addListing(groupListing); } @@ -3738,9 +3749,9 @@ public class KafkaAdminClient extends AdminClient { runnable.call(new Call("listConsumerGroups", deadline, new ConstantNodeIdProvider(node.id())) { @Override ListGroupsRequest.Builder createRequest(int timeoutMs) { - List states = options.states() + List states = options.groupStates() .stream() - .map(ConsumerGroupState::toString) + .map(GroupState::toString) .collect(Collectors.toList()); List groupTypes = options.types() .stream() @@ -3756,17 +3767,17 @@ public class KafkaAdminClient extends AdminClient { String protocolType = group.protocolType(); if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) { final String groupId = group.groupId(); - final Optional state = group.groupState().isEmpty() + final Optional groupState = group.groupState().isEmpty() ? Optional.empty() - : Optional.of(ConsumerGroupState.parse(group.groupState())); + : Optional.of(GroupState.parse(group.groupState())); final Optional type = group.groupType().isEmpty() ? Optional.empty() : Optional.of(GroupType.parse(group.groupType())); final ConsumerGroupListing groupListing = new ConsumerGroupListing( groupId, - protocolType.isEmpty(), - state, - type + groupState, + type, + protocolType.isEmpty() ); results.addListing(groupListing); } @@ -3927,7 +3938,7 @@ public class KafkaAdminClient extends AdminClient { ListGroupsRequest.Builder createRequest(int timeoutMs) { List states = options.states() .stream() - .map(ShareGroupState::toString) + .map(GroupState::toString) .collect(Collectors.toList()); List types = Collections.singletonList(GroupType.SHARE.toString()); return new ListGroupsRequest.Builder(new ListGroupsRequestData() @@ -3938,9 +3949,9 @@ public class KafkaAdminClient extends AdminClient { private void maybeAddShareGroup(ListGroupsResponseData.ListedGroup group) { final String groupId = group.groupId(); - final Optional state = group.groupState().isEmpty() + final Optional state = group.groupState().isEmpty() ? Optional.empty() - : Optional.of(ShareGroupState.parse(group.groupState())); + : Optional.of(GroupState.parse(group.groupState())); final ShareGroupListing groupListing = new ShareGroupListing(groupId, state); results.addListing(groupListing); } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java index acbe3f00d60..9005f65a0c6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java @@ -18,12 +18,14 @@ package org.apache.kafka.clients.admin; import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.GroupState; import org.apache.kafka.common.GroupType; import org.apache.kafka.common.annotation.InterfaceStability; import java.util.Collections; import java.util.HashSet; import java.util.Set; +import java.util.stream.Collectors; /** * Options for {@link Admin#listConsumerGroups()}. @@ -33,17 +35,30 @@ import java.util.Set; @InterfaceStability.Evolving public class ListConsumerGroupsOptions extends AbstractOptions { - private Set states = Collections.emptySet(); - + private Set groupStates = Collections.emptySet(); private Set types = Collections.emptySet(); + /** + * If groupStates is set, only groups in these states will be returned by listGroups(). + * Otherwise, all groups are returned. + * This operation is supported by brokers with version 2.6.0 or later. + */ + public ListConsumerGroupsOptions inGroupStates(Set groupStates) { + this.groupStates = (groupStates == null || groupStates.isEmpty()) ? Collections.emptySet() : Set.copyOf(groupStates); + return this; + } + /** * If states is set, only groups in these states will be returned by listConsumerGroups(). * Otherwise, all groups are returned. * This operation is supported by brokers with version 2.6.0 or later. + * @deprecated Since 4.0. Use {@link #inGroupStates(Set)}. */ + @Deprecated public ListConsumerGroupsOptions inStates(Set states) { - this.states = (states == null || states.isEmpty()) ? Collections.emptySet() : new HashSet<>(states); + this.groupStates = (states == null || states.isEmpty()) + ? Collections.emptySet() + : states.stream().map(state -> GroupState.parse(state.name())).collect(Collectors.toSet()); return this; } @@ -57,10 +72,19 @@ public class ListConsumerGroupsOptions extends AbstractOptions groupStates() { + return groupStates; + } + + /** + * Returns the list of States that are requested or empty if no states have been specified. + * @deprecated Since 4.0. Use {@link #inGroupStates(Set)}. + */ + @Deprecated public Set states() { - return states; + return groupStates.stream().map(groupState -> ConsumerGroupState.parse(groupState.name())).collect(Collectors.toSet()); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsOptions.java index 042c88dc80f..d1fa2c7b288 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsOptions.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients.admin; +import org.apache.kafka.common.GroupState; import org.apache.kafka.common.GroupType; import org.apache.kafka.common.annotation.InterfaceStability; @@ -31,8 +32,19 @@ import java.util.Set; @InterfaceStability.Evolving public class ListGroupsOptions extends AbstractOptions { + private Set groupStates = Collections.emptySet(); private Set types = Collections.emptySet(); + /** + * If groupStates is set, only groups in these states will be returned by listGroups(). + * Otherwise, all groups are returned. + * This operation is supported by brokers with version 2.6.0 or later. + */ + public ListGroupsOptions inGroupStates(Set groupStates) { + this.groupStates = (groupStates == null || groupStates.isEmpty()) ? Collections.emptySet() : Set.copyOf(groupStates); + return this; + } + /** * If types is set, only groups of these types will be returned by listGroups(). * Otherwise, all groups are returned. @@ -42,6 +54,13 @@ public class ListGroupsOptions extends AbstractOptions { return this; } + /** + * Returns the list of group states that are requested or empty if no states have been specified. + */ + public Set groupStates() { + return groupStates; + } + /** * Returns the list of group types that are requested or empty if no types have been specified. */ diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupsOptions.java index 61f0aa40eb2..9dc9cddef4f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupsOptions.java @@ -17,7 +17,7 @@ package org.apache.kafka.clients.admin; -import org.apache.kafka.common.ShareGroupState; +import org.apache.kafka.common.GroupState; import org.apache.kafka.common.annotation.InterfaceStability; import java.util.Collections; @@ -32,12 +32,12 @@ import java.util.Set; @InterfaceStability.Evolving public class ListShareGroupsOptions extends AbstractOptions { - private Set states = Collections.emptySet(); + private Set states = Collections.emptySet(); /** * If states is set, only groups in these states will be returned. Otherwise, all groups are returned. */ - public ListShareGroupsOptions inStates(Set states) { + public ListShareGroupsOptions inStates(Set states) { this.states = (states == null) ? Collections.emptySet() : new HashSet<>(states); return this; } @@ -45,7 +45,7 @@ public class ListShareGroupsOptions extends AbstractOptions states() { + public Set states() { return states; } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupDescription.java index 30d88e31192..52fe0136aa2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupDescription.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupDescription.java @@ -17,8 +17,8 @@ package org.apache.kafka.clients.admin; +import org.apache.kafka.common.GroupState; import org.apache.kafka.common.Node; -import org.apache.kafka.common.ShareGroupState; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.annotation.InterfaceStability; @@ -36,26 +36,26 @@ import java.util.stream.Collectors; public class ShareGroupDescription { private final String groupId; private final Collection members; - private final ShareGroupState state; + private final GroupState groupState; private final Node coordinator; private final Set authorizedOperations; public ShareGroupDescription(String groupId, Collection members, - ShareGroupState state, + GroupState groupState, Node coordinator) { - this(groupId, members, state, coordinator, Collections.emptySet()); + this(groupId, members, groupState, coordinator, Collections.emptySet()); } public ShareGroupDescription(String groupId, Collection members, - ShareGroupState state, + GroupState groupState, Node coordinator, Set authorizedOperations) { this.groupId = groupId == null ? "" : groupId; this.members = members == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(members)); - this.state = state; + this.groupState = groupState; this.coordinator = coordinator; this.authorizedOperations = authorizedOperations; } @@ -67,14 +67,14 @@ public class ShareGroupDescription { final ShareGroupDescription that = (ShareGroupDescription) o; return Objects.equals(groupId, that.groupId) && Objects.equals(members, that.members) && - state == that.state && + groupState == that.groupState && Objects.equals(coordinator, that.coordinator) && Objects.equals(authorizedOperations, that.authorizedOperations); } @Override public int hashCode() { - return Objects.hash(groupId, members, state, coordinator, authorizedOperations); + return Objects.hash(groupId, members, groupState, coordinator, authorizedOperations); } /** @@ -92,10 +92,10 @@ public class ShareGroupDescription { } /** - * The share group state, or UNKNOWN if the state is too new for us to parse. + * The group state, or UNKNOWN if the state is too new for us to parse. */ - public ShareGroupState state() { - return state; + public GroupState groupState() { + return groupState; } /** @@ -116,7 +116,7 @@ public class ShareGroupDescription { public String toString() { return "(groupId=" + groupId + ", members=" + members.stream().map(MemberDescription::toString).collect(Collectors.joining(",")) + - ", state=" + state + + ", groupState=" + groupState + ", coordinator=" + coordinator + ", authorizedOperations=" + authorizedOperations + ")"; diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupListing.java b/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupListing.java index 05e605a8277..da760e88c8c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupListing.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupListing.java @@ -17,7 +17,7 @@ package org.apache.kafka.clients.admin; -import org.apache.kafka.common.ShareGroupState; +import org.apache.kafka.common.GroupState; import org.apache.kafka.common.annotation.InterfaceStability; import java.util.Objects; @@ -31,7 +31,7 @@ import java.util.Optional; @InterfaceStability.Evolving public class ShareGroupListing { private final String groupId; - private final Optional state; + private final Optional groupState; /** * Create an instance with the specified parameters. @@ -46,11 +46,11 @@ public class ShareGroupListing { * Create an instance with the specified parameters. * * @param groupId Group Id - * @param state The state of the share group + * @param groupState The state of the share group */ - public ShareGroupListing(String groupId, Optional state) { + public ShareGroupListing(String groupId, Optional groupState) { this.groupId = groupId; - this.state = Objects.requireNonNull(state); + this.groupState = Objects.requireNonNull(groupState); } /** @@ -63,21 +63,21 @@ public class ShareGroupListing { /** * The share group state. */ - public Optional state() { - return state; + public Optional groupState() { + return groupState; } @Override public String toString() { return "(" + "groupId='" + groupId + '\'' + - ", state=" + state + + ", groupState=" + groupState + ')'; } @Override public int hashCode() { - return Objects.hash(groupId, state); + return Objects.hash(groupId, groupState); } @Override @@ -86,6 +86,6 @@ public class ShareGroupListing { if (!(o instanceof ShareGroupListing)) return false; ShareGroupListing that = (ShareGroupListing) o; return Objects.equals(groupId, that.groupId) && - Objects.equals(state, that.state); + Objects.equals(groupState, that.groupState); } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java index 6233c4082e6..e66d45e2635 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java @@ -21,7 +21,7 @@ import org.apache.kafka.clients.admin.MemberAssignment; import org.apache.kafka.clients.admin.MemberDescription; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; -import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.GroupState; import org.apache.kafka.common.GroupType; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; @@ -231,7 +231,7 @@ public class DescribeConsumerGroupsHandler implements AdminApiHandler + * The following table shows the correspondence between the group states and types. + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
StateClassic groupConsumer groupShare group
UNKNOWNYesYesYes
PREPARING_REBALANCEYesYes
COMPLETING_REBALANCEYesYes
STABLEYesYesYes
DEADYesYesYes
EMPTYYesYesYes
ASSIGNINGYes
RECONCILINGYes
+ */ +@InterfaceStability.Evolving +public enum GroupState { + UNKNOWN("Unknown"), + PREPARING_REBALANCE("PreparingRebalance"), + COMPLETING_REBALANCE("CompletingRebalance"), + STABLE("Stable"), + DEAD("Dead"), + EMPTY("Empty"), + ASSIGNING("Assigning"), + RECONCILING("Reconciling"); + + private static final Map NAME_TO_ENUM = Arrays.stream(values()) + .collect(Collectors.toMap(state -> state.name.toUpperCase(Locale.ROOT), Function.identity())); + + private final String name; + + GroupState(String name) { + this.name = name; + } + + /** + * Case-insensitive group state lookup by string name. + */ + public static GroupState parse(String name) { + GroupState state = NAME_TO_ENUM.get(name.toUpperCase(Locale.ROOT)); + return state == null ? UNKNOWN : state; + } + + public static Set groupStatesForType(GroupType type) { + if (type == GroupType.CLASSIC) { + return Set.of(PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD, EMPTY); + } else if (type == GroupType.CONSUMER) { + return Set.of(PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD, EMPTY, ASSIGNING, RECONCILING); + } else if (type == GroupType.SHARE) { + return Set.of(STABLE, DEAD, EMPTY); + } else { + throw new IllegalArgumentException("Group type not known"); + } + } + + @Override + public String toString() { + return name; + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/ShareGroupState.java b/clients/src/main/java/org/apache/kafka/common/ShareGroupState.java deleted file mode 100644 index ad73a6bd095..00000000000 --- a/clients/src/main/java/org/apache/kafka/common/ShareGroupState.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.common; - -import java.util.Arrays; -import java.util.Locale; -import java.util.Map; -import java.util.function.Function; -import java.util.stream.Collectors; - -/** - * The share group state. - */ -public enum ShareGroupState { - UNKNOWN("Unknown"), - STABLE("Stable"), - DEAD("Dead"), - EMPTY("Empty"); - - private static final Map NAME_TO_ENUM = Arrays.stream(values()) - .collect(Collectors.toMap(state -> state.name.toUpperCase(Locale.ROOT), Function.identity())); - - private final String name; - - ShareGroupState(String name) { - this.name = name; - } - - /** - * Case-insensitive share group state lookup by string name. - */ - public static ShareGroupState parse(String name) { - ShareGroupState state = NAME_TO_ENUM.get(name.toUpperCase(Locale.ROOT)); - return state == null ? UNKNOWN : state; - } - - @Override - public String toString() { - return name; - } -} diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/GroupListingTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/GroupListingTest.java index 7a8279be34a..f07752d36cd 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/GroupListingTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/GroupListingTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.clients.admin; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.common.GroupState; import org.apache.kafka.common.GroupType; import org.junit.jupiter.api.Test; @@ -33,16 +34,16 @@ public class GroupListingTest { @Test public void testSimpleConsumerGroup() { - GroupListing gl = new GroupListing(GROUP_ID, Optional.of(GroupType.CLASSIC), ""); + GroupListing gl = new GroupListing(GROUP_ID, Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.EMPTY)); assertTrue(gl.isSimpleConsumerGroup()); - gl = new GroupListing(GROUP_ID, Optional.of(GroupType.CLASSIC), ConsumerProtocol.PROTOCOL_TYPE); + gl = new GroupListing(GROUP_ID, Optional.of(GroupType.CLASSIC), ConsumerProtocol.PROTOCOL_TYPE, Optional.of(GroupState.STABLE)); assertFalse(gl.isSimpleConsumerGroup()); - gl = new GroupListing(GROUP_ID, Optional.of(GroupType.CONSUMER), ""); + gl = new GroupListing(GROUP_ID, Optional.of(GroupType.CONSUMER), "", Optional.of(GroupState.EMPTY)); assertFalse(gl.isSimpleConsumerGroup()); - gl = new GroupListing(GROUP_ID, Optional.empty(), ""); + gl = new GroupListing(GROUP_ID, Optional.empty(), "", Optional.empty()); assertFalse(gl.isSimpleConsumerGroup()); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 1b44b93c704..a836e982140 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -30,14 +30,13 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; import org.apache.kafka.common.ClassicGroupState; import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.GroupState; import org.apache.kafka.common.GroupType; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.ShareGroupState; import org.apache.kafka.common.TopicCollection; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionReplica; @@ -3134,8 +3133,8 @@ public class KafkaAdminClientTest { assertEquals(2, listings.size()); List expected = new ArrayList<>(); - expected.add(new GroupListing("group-2", Optional.of(GroupType.CLASSIC), "")); - expected.add(new GroupListing("group-1", Optional.of(GroupType.CONSUMER), ConsumerProtocol.PROTOCOL_TYPE)); + expected.add(new GroupListing("group-2", Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.EMPTY))); + expected.add(new GroupListing("group-1", Optional.of(GroupType.CONSUMER), ConsumerProtocol.PROTOCOL_TYPE, Optional.of(GroupState.STABLE))); assertEquals(expected, listings); assertEquals(0, result.errors().get().size()); } @@ -3163,7 +3162,7 @@ public class KafkaAdminClientTest { assertEquals(1, listings.size()); List expected = new ArrayList<>(); - expected.add(new GroupListing("group-1", Optional.empty(), "any")); + expected.add(new GroupListing("group-1", Optional.empty(), "any", Optional.empty())); assertEquals(expected, listings); assertEquals(0, result.errors().get().size()); } @@ -3199,8 +3198,8 @@ public class KafkaAdminClientTest { assertEquals(2, listing.size()); List expected = new ArrayList<>(); - expected.add(new GroupListing("group-2", Optional.of(GroupType.CONSUMER), "")); - expected.add(new GroupListing("group-1", Optional.of(GroupType.CONSUMER), ConsumerProtocol.PROTOCOL_TYPE)); + expected.add(new GroupListing("group-2", Optional.of(GroupType.CONSUMER), "", Optional.of(GroupState.EMPTY))); + expected.add(new GroupListing("group-1", Optional.of(GroupType.CONSUMER), ConsumerProtocol.PROTOCOL_TYPE, Optional.of(GroupState.STABLE))); assertEquals(expected, listing); assertEquals(0, result.errors().get().size()); } @@ -3387,8 +3386,8 @@ public class KafkaAdminClientTest { assertEquals(2, listings.size()); List expected = new ArrayList<>(); - expected.add(new ConsumerGroupListing("group-2", true, Optional.of(ConsumerGroupState.EMPTY))); - expected.add(new ConsumerGroupListing("group-1", false, Optional.of(ConsumerGroupState.STABLE))); + expected.add(new ConsumerGroupListing("group-2", Optional.of(GroupState.EMPTY), true)); + expected.add(new ConsumerGroupListing("group-1", Optional.of(GroupState.STABLE), false)); assertEquals(expected, listings); assertEquals(0, result.errors().get().size()); } @@ -3403,7 +3402,7 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); env.kafkaClient().prepareResponseFrom( - expectListGroupsRequestWithFilters(singleton(ConsumerGroupState.STABLE.toString()), Collections.emptySet()), + expectListGroupsRequestWithFilters(singleton(GroupState.STABLE.toString()), Collections.emptySet()), new ListGroupsResponse(new ListGroupsResponseData() .setErrorCode(Errors.NONE.code()) .setGroups(singletonList( @@ -3414,13 +3413,13 @@ public class KafkaAdminClientTest { .setGroupType(GroupType.CLASSIC.toString())))), env.cluster().nodeById(0)); - final ListConsumerGroupsOptions options = new ListConsumerGroupsOptions().inStates(singleton(ConsumerGroupState.STABLE)); + final ListConsumerGroupsOptions options = new ListConsumerGroupsOptions().inGroupStates(singleton(GroupState.STABLE)); final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(options); Collection listings = result.valid().get(); assertEquals(1, listings.size()); List expected = new ArrayList<>(); - expected.add(new ConsumerGroupListing("group-1", false, Optional.of(ConsumerGroupState.STABLE), Optional.of(GroupType.CLASSIC))); + expected.add(new ConsumerGroupListing("group-1", Optional.of(GroupState.STABLE), Optional.of(GroupType.CLASSIC), false)); assertEquals(expected, listings); assertEquals(0, result.errors().get().size()); @@ -3449,8 +3448,8 @@ public class KafkaAdminClientTest { assertEquals(2, listings2.size()); List expected2 = new ArrayList<>(); - expected2.add(new ConsumerGroupListing("group-2", true, Optional.of(ConsumerGroupState.EMPTY), Optional.of(GroupType.CONSUMER))); - expected2.add(new ConsumerGroupListing("group-1", false, Optional.of(ConsumerGroupState.STABLE), Optional.of(GroupType.CONSUMER))); + expected2.add(new ConsumerGroupListing("group-2", Optional.of(GroupState.EMPTY), Optional.of(GroupType.CONSUMER), true)); + expected2.add(new ConsumerGroupListing("group-1", Optional.of(GroupState.STABLE), Optional.of(GroupType.CONSUMER), false)); assertEquals(expected2, listings2); assertEquals(0, result.errors().get().size()); } @@ -3488,7 +3487,7 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareUnsupportedVersionResponse( body -> body instanceof ListGroupsRequest); - options = new ListConsumerGroupsOptions().inStates(singleton(ConsumerGroupState.STABLE)); + options = new ListConsumerGroupsOptions().inGroupStates(singleton(GroupState.STABLE)); result = env.adminClient().listConsumerGroups(options); TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class); } @@ -3507,23 +3506,23 @@ public class KafkaAdminClientTest { // Check if we can list groups with older broker if we specify states and don't specify types. env.kafkaClient().prepareResponseFrom( - expectListGroupsRequestWithFilters(singleton(ConsumerGroupState.STABLE.toString()), Collections.emptySet()), + expectListGroupsRequestWithFilters(singleton(GroupState.STABLE.toString()), Collections.emptySet()), new ListGroupsResponse(new ListGroupsResponseData() .setErrorCode(Errors.NONE.code()) .setGroups(Collections.singletonList( new ListGroupsResponseData.ListedGroup() .setGroupId("group-1") .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) - .setGroupState(ConsumerGroupState.STABLE.toString())))), + .setGroupState(GroupState.STABLE.toString())))), env.cluster().nodeById(0)); - ListConsumerGroupsOptions options = new ListConsumerGroupsOptions().inStates(singleton(ConsumerGroupState.STABLE)); + ListConsumerGroupsOptions options = new ListConsumerGroupsOptions().inGroupStates(singleton(GroupState.STABLE)); ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(options); Collection listing = result.all().get(); assertEquals(1, listing.size()); List expected = Collections.singletonList( - new ConsumerGroupListing("group-1", false, Optional.of(ConsumerGroupState.STABLE)) + new ConsumerGroupListing("group-1", Optional.of(GroupState.STABLE), false) ); assertEquals(expected, listing); @@ -4109,7 +4108,7 @@ public class KafkaAdminClientTest { ), "range", GroupType.CONSUMER, - ConsumerGroupState.STABLE, + GroupState.STABLE, env.cluster().controller(), Collections.emptySet() )); @@ -4129,7 +4128,7 @@ public class KafkaAdminClientTest { ), "range", GroupType.CLASSIC, - ConsumerGroupState.STABLE, + GroupState.STABLE, env.cluster().controller(), Collections.emptySet() )); @@ -5029,7 +5028,7 @@ public class KafkaAdminClientTest { ShareGroupDescribeResponseData group0Data = new ShareGroupDescribeResponseData(); group0Data.groups().add(new ShareGroupDescribeResponseData.DescribedGroup() .setGroupId(GROUP_ID) - .setGroupState(ShareGroupState.STABLE.toString()) + .setGroupState(GroupState.STABLE.toString()) .setMembers(asList(memberOne, memberTwo))); final List expectedTopicPartitions = new ArrayList<>(); @@ -5044,7 +5043,7 @@ public class KafkaAdminClientTest { new MemberAssignment(new HashSet<>(expectedTopicPartitions)))); data.groups().add(new ShareGroupDescribeResponseData.DescribedGroup() .setGroupId(GROUP_ID) - .setGroupState(ShareGroupState.STABLE.toString()) + .setGroupState(GroupState.STABLE.toString()) .setMembers(asList(memberOne, memberTwo))); env.kafkaClient().prepareResponse(new ShareGroupDescribeResponse(data)); @@ -5097,7 +5096,7 @@ public class KafkaAdminClientTest { ShareGroupDescribeResponseData group0Data = new ShareGroupDescribeResponseData(); group0Data.groups().add(new ShareGroupDescribeResponseData.DescribedGroup() .setGroupId(GROUP_ID) - .setGroupState(ShareGroupState.STABLE.toString()) + .setGroupState(GroupState.STABLE.toString()) .setMembers(asList( new ShareGroupDescribeResponseData.Member() .setMemberId("0") @@ -5113,7 +5112,7 @@ public class KafkaAdminClientTest { ShareGroupDescribeResponseData group1Data = new ShareGroupDescribeResponseData(); group1Data.groups().add(new ShareGroupDescribeResponseData.DescribedGroup() .setGroupId("group-1") - .setGroupState(ShareGroupState.STABLE.toString()) + .setGroupState(GroupState.STABLE.toString()) .setMembers(asList( new ShareGroupDescribeResponseData.Member() .setMemberId("0") @@ -5231,7 +5230,7 @@ public class KafkaAdminClientTest { Set groupIds = new HashSet<>(); for (ShareGroupListing listing : listings) { groupIds.add(listing.groupId()); - assertTrue(listing.state().isPresent()); + assertTrue(listing.groupState().isPresent()); } assertEquals(Set.of("share-group-1", "share-group-2", "share-group-3", "share-group-4"), groupIds); @@ -5289,8 +5288,8 @@ public class KafkaAdminClientTest { assertEquals(2, listings.size()); List expected = new ArrayList<>(); - expected.add(new ShareGroupListing("share-group-1", Optional.of(ShareGroupState.STABLE))); - expected.add(new ShareGroupListing("share-group-2", Optional.of(ShareGroupState.EMPTY))); + expected.add(new ShareGroupListing("share-group-1", Optional.of(GroupState.STABLE))); + expected.add(new ShareGroupListing("share-group-2", Optional.of(GroupState.EMPTY))); assertEquals(expected, listings); assertEquals(0, result.errors().get().size()); } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index dc683598026..cc675f2bb39 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.admin.internals.CoordinatorKey; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.GroupState; import org.apache.kafka.common.GroupType; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Metric; @@ -724,7 +725,7 @@ public class MockAdminClient extends AdminClient { @Override public synchronized ListGroupsResult listGroups(ListGroupsOptions options) { KafkaFutureImpl> future = new KafkaFutureImpl<>(); - future.complete(groupConfigs.keySet().stream().map(g -> new GroupListing(g, Optional.of(GroupType.CONSUMER), ConsumerProtocol.PROTOCOL_TYPE)).collect(Collectors.toList())); + future.complete(groupConfigs.keySet().stream().map(g -> new GroupListing(g, Optional.of(GroupType.CONSUMER), ConsumerProtocol.PROTOCOL_TYPE, Optional.of(GroupState.STABLE))).collect(Collectors.toList())); return new ListGroupsResult(future); } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 0154e6e2c9e..cb6a2458261 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -18,9 +18,9 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.GroupState; import org.apache.kafka.common.IsolationLevel; import org.apache.kafka.common.Node; -import org.apache.kafka.common.ShareGroupState; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; @@ -1487,7 +1487,7 @@ public class RequestResponseTest { .setGroupId("group") .setErrorCode((short) 0) .setErrorMessage(Errors.forCode((short) 0).message()) - .setGroupState(ShareGroupState.EMPTY.toString()) + .setGroupState(GroupState.EMPTY.toString()) .setMembers(new ArrayList<>(0)) )) .setThrottleTimeMs(1000); diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java index e659c4aae79..bc49c3a09b7 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java @@ -20,7 +20,7 @@ import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.GroupState; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.UnknownMemberIdException; @@ -297,11 +297,11 @@ public class MirrorCheckpointTask extends SourceTask { for (String group : consumerGroups) { try { ConsumerGroupDescription consumerGroupDesc = consumerGroupsDesc.get(group).get(); - ConsumerGroupState consumerGroupState = consumerGroupDesc.state(); + GroupState consumerGroupState = consumerGroupDesc.groupState(); // sync offset to the target cluster only if the state of current consumer group is: // (1) idle: because the consumer at target is not actively consuming the mirrored topic // (2) dead: the new consumer that is recently created at source and never existed at target - if (consumerGroupState == ConsumerGroupState.EMPTY) { + if (consumerGroupState == GroupState.EMPTY) { idleConsumerGroupsOffset.put( group, adminCall( diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 15925714968..8c72c48aace 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -48,7 +48,7 @@ import org.apache.kafka.common.requests.DeleteRecordsRequest import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType} import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer} import org.apache.kafka.common.utils.{Time, Utils} -import org.apache.kafka.common.{ConsumerGroupState, ElectionType, GroupType, IsolationLevel, ShareGroupState, TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica, Uuid} +import org.apache.kafka.common.{ConsumerGroupState, ElectionType, GroupState, GroupType, IsolationLevel, TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica, Uuid} import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig} import org.apache.kafka.network.SocketServerConfigs @@ -1899,7 +1899,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { TestUtils.waitUntilTrue(() => { val matching = client.listConsumerGroups.all.get.asScala.filter(group => group.groupId == testGroupId && - group.state.get == ConsumerGroupState.STABLE) + group.groupState.get == GroupState.STABLE) matching.size == 1 }, s"Expected to be able to list $testGroupId") @@ -1907,6 +1907,279 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val options = new ListConsumerGroupsOptions().withTypes(Set(groupType).asJava) val matching = client.listConsumerGroups(options).all.get.asScala.filter(group => group.groupId == testGroupId && + group.groupState.get == GroupState.STABLE) + matching.size == 1 + }, s"Expected to be able to list $testGroupId in group type $groupType") + + TestUtils.waitUntilTrue(() => { + val options = new ListConsumerGroupsOptions().withTypes(Set(groupType).asJava) + .inGroupStates(Set(GroupState.STABLE).asJava) + val matching = client.listConsumerGroups(options).all.get.asScala.filter(group => + group.groupId == testGroupId && + group.groupState.get == GroupState.STABLE) + matching.size == 1 + }, s"Expected to be able to list $testGroupId in group type $groupType and state Stable") + + TestUtils.waitUntilTrue(() => { + val options = new ListConsumerGroupsOptions().inGroupStates(Set(GroupState.STABLE).asJava) + val matching = client.listConsumerGroups(options).all.get.asScala.filter(group => + group.groupId == testGroupId && + group.groupState.get == GroupState.STABLE) + matching.size == 1 + }, s"Expected to be able to list $testGroupId in state Stable") + + TestUtils.waitUntilTrue(() => { + val options = new ListConsumerGroupsOptions().inGroupStates(Set(GroupState.EMPTY).asJava) + val matching = client.listConsumerGroups(options).all.get.asScala.filter( + _.groupId == testGroupId) + matching.isEmpty + }, s"Expected to find zero groups") + + val describeWithFakeGroupResult = client.describeConsumerGroups(Seq(testGroupId, fakeGroupId).asJava, + new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true)) + assertEquals(2, describeWithFakeGroupResult.describedGroups().size()) + + // Test that we can get information about the test consumer group. + assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(testGroupId)) + var testGroupDescription = describeWithFakeGroupResult.describedGroups().get(testGroupId).get() + + assertEquals(testGroupId, testGroupDescription.groupId()) + assertFalse(testGroupDescription.isSimpleConsumerGroup) + assertEquals(groupInstanceSet.size, testGroupDescription.members().size()) + val members = testGroupDescription.members() + members.asScala.foreach(member => assertEquals(testClientId, member.clientId())) + val topicPartitionsByTopic = members.asScala.flatMap(_.assignment().topicPartitions().asScala).groupBy(_.topic()) + topicSet.foreach { topic => + val topicPartitions = topicPartitionsByTopic.getOrElse(topic, List.empty) + assertEquals(testNumPartitions, topicPartitions.size) + } + + val expectedOperations = AclEntry.supportedOperations(ResourceType.GROUP) + assertEquals(expectedOperations, testGroupDescription.authorizedOperations()) + + // Test that the fake group is listed as dead. + assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(fakeGroupId)) + val fakeGroupDescription = describeWithFakeGroupResult.describedGroups().get(fakeGroupId).get() + + assertEquals(fakeGroupId, fakeGroupDescription.groupId()) + assertEquals(0, fakeGroupDescription.members().size()) + assertEquals("", fakeGroupDescription.partitionAssignor()) + assertEquals(GroupState.DEAD, fakeGroupDescription.groupState()) + assertEquals(expectedOperations, fakeGroupDescription.authorizedOperations()) + + // Test that all() returns 2 results + assertEquals(2, describeWithFakeGroupResult.all().get().size()) + + val testTopicPart0 = new TopicPartition(testTopicName, 0) + + // Test listConsumerGroupOffsets + TestUtils.waitUntilTrue(() => { + val parts = client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get() + parts.containsKey(testTopicPart0) && (parts.get(testTopicPart0).offset() == 1) + }, s"Expected the offset for partition 0 to eventually become 1.") + + // Test listConsumerGroupOffsets with requireStable true + val options = new ListConsumerGroupOffsetsOptions().requireStable(true) + var parts = client.listConsumerGroupOffsets(testGroupId, options) + .partitionsToOffsetAndMetadata().get() + assertTrue(parts.containsKey(testTopicPart0)) + assertEquals(1, parts.get(testTopicPart0).offset()) + + // Test listConsumerGroupOffsets with listConsumerGroupOffsetsSpec + val groupSpecs = Collections.singletonMap(testGroupId, + new ListConsumerGroupOffsetsSpec().topicPartitions(Collections.singleton(new TopicPartition(testTopicName, 0)))) + parts = client.listConsumerGroupOffsets(groupSpecs).partitionsToOffsetAndMetadata().get() + assertTrue(parts.containsKey(testTopicPart0)) + assertEquals(1, parts.get(testTopicPart0).offset()) + + // Test listConsumerGroupOffsets with listConsumerGroupOffsetsSpec and requireStable option + parts = client.listConsumerGroupOffsets(groupSpecs, options).partitionsToOffsetAndMetadata().get() + assertTrue(parts.containsKey(testTopicPart0)) + assertEquals(1, parts.get(testTopicPart0).offset()) + + // Test delete non-exist consumer instance + val invalidInstanceId = "invalid-instance-id" + var removeMembersResult = client.removeMembersFromConsumerGroup(testGroupId, new RemoveMembersFromConsumerGroupOptions( + Collections.singleton(new MemberToRemove(invalidInstanceId)) + )) + + TestUtils.assertFutureExceptionTypeEquals(removeMembersResult.all, classOf[UnknownMemberIdException]) + val firstMemberFuture = removeMembersResult.memberResult(new MemberToRemove(invalidInstanceId)) + TestUtils.assertFutureExceptionTypeEquals(firstMemberFuture, classOf[UnknownMemberIdException]) + + // Test consumer group deletion + var deleteResult = client.deleteConsumerGroups(Seq(testGroupId, fakeGroupId).asJava) + assertEquals(2, deleteResult.deletedGroups().size()) + + // Deleting the fake group ID should get GroupIdNotFoundException. + assertTrue(deleteResult.deletedGroups().containsKey(fakeGroupId)) + assertFutureExceptionTypeEquals(deleteResult.deletedGroups().get(fakeGroupId), + classOf[GroupIdNotFoundException]) + + // Deleting the real group ID should get GroupNotEmptyException + assertTrue(deleteResult.deletedGroups().containsKey(testGroupId)) + assertFutureExceptionTypeEquals(deleteResult.deletedGroups().get(testGroupId), + classOf[GroupNotEmptyException]) + + // Test delete one correct static member + val removeOptions = new RemoveMembersFromConsumerGroupOptions(Collections.singleton(new MemberToRemove(testInstanceId1))) + removeOptions.reason("test remove") + removeMembersResult = client.removeMembersFromConsumerGroup(testGroupId, removeOptions) + + assertNull(removeMembersResult.all().get()) + val validMemberFuture = removeMembersResult.memberResult(new MemberToRemove(testInstanceId1)) + assertNull(validMemberFuture.get()) + + val describeTestGroupResult = client.describeConsumerGroups(Seq(testGroupId).asJava, + new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true)) + assertEquals(1, describeTestGroupResult.describedGroups().size()) + + testGroupDescription = describeTestGroupResult.describedGroups().get(testGroupId).get() + + assertEquals(testGroupId, testGroupDescription.groupId) + assertFalse(testGroupDescription.isSimpleConsumerGroup) + assertEquals(consumerSet.size - 1, testGroupDescription.members().size()) + + // Delete all active members remaining (a static member + a dynamic member) + removeMembersResult = client.removeMembersFromConsumerGroup(testGroupId, new RemoveMembersFromConsumerGroupOptions()) + assertNull(removeMembersResult.all().get()) + + // The group should contain no members now. + testGroupDescription = client.describeConsumerGroups(Seq(testGroupId).asJava, + new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true)) + .describedGroups().get(testGroupId).get() + assertTrue(testGroupDescription.members().isEmpty) + + // Consumer group deletion on empty group should succeed + deleteResult = client.deleteConsumerGroups(Seq(testGroupId).asJava) + assertEquals(1, deleteResult.deletedGroups().size()) + + assertTrue(deleteResult.deletedGroups().containsKey(testGroupId)) + assertNull(deleteResult.deletedGroups().get(testGroupId).get()) + + // Test alterConsumerGroupOffsets + val alterConsumerGroupOffsetsResult = client.alterConsumerGroupOffsets(testGroupId, + Collections.singletonMap(testTopicPart0, new OffsetAndMetadata(0L))) + assertNull(alterConsumerGroupOffsetsResult.all().get()) + assertNull(alterConsumerGroupOffsetsResult.partitionResult(testTopicPart0).get()) + + // Verify alterConsumerGroupOffsets success + TestUtils.waitUntilTrue(() => { + val parts = client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get() + parts.containsKey(testTopicPart0) && (parts.get(testTopicPart0).offset() == 0) + }, s"Expected the offset for partition 0 to eventually become 0.") + } finally { + consumerThreads.foreach { + case consumerThread => + consumerThread.interrupt() + consumerThread.join() + } + } + } finally { + consumerSet.zip(groupInstanceSet).foreach(zipped => Utils.closeQuietly(zipped._1, zipped._2)) + } + } finally { + Utils.closeQuietly(client, "adminClient") + } + } + + /** + * Test the consumer group APIs. + */ + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17960")) + def testConsumerGroupsDeprecatedConsumerGroupState(quorum: String, groupProtocol: String): Unit = { + val config = createConfig + client = Admin.create(config) + try { + // Verify that initially there are no consumer groups to list. + val list1 = client.listConsumerGroups() + assertEquals(0, list1.all().get().size()) + assertEquals(0, list1.errors().get().size()) + assertEquals(0, list1.valid().get().size()) + val testTopicName = "test_topic" + val testTopicName1 = testTopicName + "1" + val testTopicName2 = testTopicName + "2" + val testNumPartitions = 2 + + client.createTopics(util.Arrays.asList( + new NewTopic(testTopicName, testNumPartitions, 1.toShort), + new NewTopic(testTopicName1, testNumPartitions, 1.toShort), + new NewTopic(testTopicName2, testNumPartitions, 1.toShort) + )).all().get() + waitForTopics(client, List(testTopicName, testTopicName1, testTopicName2), List()) + + val producer = createProducer() + try { + producer.send(new ProducerRecord(testTopicName, 0, null, null)).get() + } finally { + Utils.closeQuietly(producer, "producer") + } + + val EMPTY_GROUP_INSTANCE_ID = "" + val testGroupId = "test_group_id" + val testClientId = "test_client_id" + val testInstanceId1 = "test_instance_id_1" + val testInstanceId2 = "test_instance_id_2" + val fakeGroupId = "fake_group_id" + + def createProperties(groupInstanceId: String): Properties = { + val newConsumerConfig = new Properties(consumerConfig) + // We need to disable the auto commit because after the members got removed from group, the offset commit + // will cause the member rejoining and the test will be flaky (check ConsumerCoordinator#OffsetCommitResponseHandler) + newConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId) + newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId) + if (groupInstanceId != EMPTY_GROUP_INSTANCE_ID) { + newConsumerConfig.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId) + } + newConsumerConfig + } + + // contains two static members and one dynamic member + val groupInstanceSet = Set(testInstanceId1, testInstanceId2, EMPTY_GROUP_INSTANCE_ID) + val consumerSet = groupInstanceSet.map { groupInstanceId => createConsumer(configOverrides = createProperties(groupInstanceId))} + val topicSet = Set(testTopicName, testTopicName1, testTopicName2) + + val latch = new CountDownLatch(consumerSet.size) + try { + def createConsumerThread[K,V](consumer: Consumer[K,V], topic: String): Thread = { + new Thread { + override def run : Unit = { + consumer.subscribe(Collections.singleton(topic)) + try { + while (true) { + consumer.poll(JDuration.ofSeconds(5)) + if (!consumer.assignment.isEmpty && latch.getCount > 0L) + latch.countDown() + consumer.commitSync() + } + } catch { + case _: InterruptException => // Suppress the output to stderr + } + } + } + } + + // Start consumers in a thread that will subscribe to a new group. + val consumerThreads = consumerSet.zip(topicSet).map(zipped => createConsumerThread(zipped._1, zipped._2)) + val groupType = if (groupProtocol.equalsIgnoreCase(GroupProtocol.CONSUMER.name)) GroupType.CONSUMER else GroupType.CLASSIC + + try { + consumerThreads.foreach(_.start()) + assertTrue(latch.await(30000, TimeUnit.MILLISECONDS)) + // Test that we can list the new group. + TestUtils.waitUntilTrue(() => { + val matching = client.listConsumerGroups.all.get.asScala.filter(group => + group.groupId == testGroupId && + group.state.get == ConsumerGroupState.STABLE) + matching.size == 1 + }, s"Expected to be able to list $testGroupId") + + TestUtils.waitUntilTrue(() => { + val options = new ListConsumerGroupsOptions().withTypes(Set(groupType).asJava) + val matching = client.listConsumerGroups(options).all.get.asScala.filter(group => + group.groupId == testGroupId && group.state.get == ConsumerGroupState.STABLE) matching.size == 1 }, s"Expected to be able to list $testGroupId in group type $groupType") @@ -1923,7 +2196,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { TestUtils.waitUntilTrue(() => { val options = new ListConsumerGroupsOptions().inStates(Set(ConsumerGroupState.STABLE).asJava) val matching = client.listConsumerGroups(options).all.get.asScala.filter(group => - group.groupId == testGroupId && + group.groupId == testGroupId && group.state.get == ConsumerGroupState.STABLE) matching.size == 1 }, s"Expected to be able to list $testGroupId in state Stable") @@ -1931,7 +2204,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { TestUtils.waitUntilTrue(() => { val options = new ListConsumerGroupsOptions().inStates(Set(ConsumerGroupState.EMPTY).asJava) val matching = client.listConsumerGroups(options).all.get.asScala.filter( - _.groupId == testGroupId) + _.groupId == testGroupId) matching.isEmpty }, s"Expected to find zero groups") @@ -2046,8 +2319,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { // The group should contain no members now. testGroupDescription = client.describeConsumerGroups(Seq(testGroupId).asJava, - new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true)) - .describedGroups().get(testGroupId).get() + new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true)) + .describedGroups().get(testGroupId).get() assertTrue(testGroupDescription.members().isEmpty) // Consumer group deletion on empty group should succeed @@ -2068,13 +2341,13 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val parts = client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get() parts.containsKey(testTopicPart0) && (parts.get(testTopicPart0).offset() == 0) }, s"Expected the offset for partition 0 to eventually become 0.") - } finally { - consumerThreads.foreach { - case consumerThread => - consumerThread.interrupt() - consumerThread.join() + } finally { + consumerThreads.foreach { + case consumerThread => + consumerThread.interrupt() + consumerThread.join() + } } - } } finally { consumerSet.zip(groupInstanceSet).foreach(zipped => Utils.closeQuietly(zipped._1, zipped._2)) } @@ -2203,10 +2476,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { groups.size() == 4 }, "Expected to find all groups") - val classicGroupListing = new GroupListing(classicGroupId, Optional.of(GroupType.CLASSIC), "consumer") - val consumerGroupListing = new GroupListing(consumerGroupId, Optional.of(GroupType.CONSUMER), "consumer") - val shareGroupListing = new GroupListing(shareGroupId, Optional.of(GroupType.SHARE), "share") - val simpleGroupListing = new GroupListing(simpleGroupId, Optional.of(GroupType.CLASSIC), "") + val classicGroupListing = new GroupListing(classicGroupId, Optional.of(GroupType.CLASSIC), "consumer", Optional.of(GroupState.STABLE)) + val consumerGroupListing = new GroupListing(consumerGroupId, Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE)) + val shareGroupListing = new GroupListing(shareGroupId, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)) + val simpleGroupListing = new GroupListing(simpleGroupId, Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.EMPTY)) var listGroupsResult = client.listGroups() assertTrue(listGroupsResult.errors().get().isEmpty) @@ -2327,6 +2600,11 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val producer = createProducer() try { // Verify that initially there are no share groups to list. + val list = client.listGroups() + assertEquals(0, list.all().get().size()) + assertEquals(0, list.errors().get().size()) + assertEquals(0, list.valid().get().size()) + val list1 = client.listShareGroups() assertEquals(0, list1.all().get().size()) assertEquals(0, list1.errors().get().size()) @@ -2350,21 +2628,40 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { TestUtils.waitUntilTrue(() => { client.listShareGroups.all.get.stream().filter(group => group.groupId == testGroupId && - group.state.get == ShareGroupState.STABLE).count() == 1 + group.groupState.get == GroupState.STABLE).count() == 1 }, s"Expected to be able to list $testGroupId") TestUtils.waitUntilTrue(() => { - val options = new ListShareGroupsOptions().inStates(Collections.singleton(ShareGroupState.STABLE)) + val options = new ListShareGroupsOptions().inStates(Collections.singleton(GroupState.STABLE)) client.listShareGroups(options).all.get.stream().filter(group => group.groupId == testGroupId && - group.state.get == ShareGroupState.STABLE).count() == 1 + group.groupState.get == GroupState.STABLE).count() == 1 }, s"Expected to be able to list $testGroupId in state Stable") TestUtils.waitUntilTrue(() => { - val options = new ListShareGroupsOptions().inStates(Collections.singleton(ShareGroupState.EMPTY)) + val options = new ListShareGroupsOptions().inStates(Collections.singleton(GroupState.EMPTY)) client.listShareGroups(options).all.get.stream().filter(_.groupId == testGroupId).count() == 0 }, s"Expected to find zero groups") + // listGroups is equivalent to listShareGroups so ensure that works too + TestUtils.waitUntilTrue(() => { + client.listGroups.all.get.stream().filter(group => + group.groupId == testGroupId && + group.groupState.get == GroupState.STABLE).count() == 1 + }, s"Expected to be able to list $testGroupId") + + TestUtils.waitUntilTrue(() => { + val options = new ListGroupsOptions().withTypes(Collections.singleton(GroupType.SHARE)).inGroupStates(Collections.singleton(GroupState.STABLE)) + client.listGroups(options).all.get.stream().filter(group => + group.groupId == testGroupId && + group.groupState.get == GroupState.STABLE).count() == 1 + }, s"Expected to be able to list $testGroupId in state Stable") + + TestUtils.waitUntilTrue(() => { + val options = new ListGroupsOptions().withTypes(Collections.singleton(GroupType.SHARE)).inGroupStates(Collections.singleton(GroupState.EMPTY)) + client.listGroups(options).all.get.stream().filter(_.groupId == testGroupId).count() == 0 + }, s"Expected to find zero groups") + val describeWithFakeGroupResult = client.describeShareGroups(util.Arrays.asList(testGroupId, fakeGroupId), new DescribeShareGroupsOptions().includeAuthorizedOperations(true)) assertEquals(2, describeWithFakeGroupResult.describedGroups().size()) @@ -2393,7 +2690,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertEquals(fakeGroupId, fakeGroupDescription.groupId()) assertEquals(0, fakeGroupDescription.members().size()) - assertEquals(ShareGroupState.DEAD, fakeGroupDescription.state()) + assertEquals(GroupState.DEAD, fakeGroupDescription.groupState()) assertNull(fakeGroupDescription.authorizedOperations()) // Test that all() returns 2 results diff --git a/core/src/test/scala/unit/kafka/server/ShareGroupDescribeRequestTest.scala b/core/src/test/scala/unit/kafka/server/ShareGroupDescribeRequestTest.scala index 6adfce9b57c..a6a1129d084 100644 --- a/core/src/test/scala/unit/kafka/server/ShareGroupDescribeRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ShareGroupDescribeRequestTest.scala @@ -20,7 +20,7 @@ import org.apache.kafka.common.test.api.ClusterInstance import org.apache.kafka.common.test.api._ import org.apache.kafka.common.test.api.ClusterTestExtensions import kafka.utils.TestUtils -import org.apache.kafka.common.ShareGroupState +import org.apache.kafka.common.GroupState import org.apache.kafka.common.message.ShareGroupDescribeResponseData.DescribedGroup import org.apache.kafka.common.message.{ShareGroupDescribeRequestData, ShareGroupDescribeResponseData, ShareGroupHeartbeatResponseData} import org.apache.kafka.common.protocol.{ApiKeys, Errors} @@ -114,7 +114,7 @@ class ShareGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCoord val expected = List( new DescribedGroup() .setGroupId("grp-1") - .setGroupState(ShareGroupState.STABLE.toString) + .setGroupState(GroupState.STABLE.toString) .setGroupEpoch(1) .setAssignmentEpoch(1) .setAssignorName("simple") diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java index db6fae506ff..d04aa533873 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java @@ -16,8 +16,8 @@ */ package org.apache.kafka.coordinator.group.metrics; +import org.apache.kafka.common.GroupState; import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.ShareGroupState; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.metrics.Metrics; @@ -116,19 +116,19 @@ public class GroupCoordinatorMetricsTest { GroupCoordinatorMetrics.METRICS_GROUP, "The number of share groups in empty state.", "protocol", Group.GroupType.SHARE.toString(), - "state", ShareGroupState.EMPTY.toString()), + "state", GroupState.EMPTY.toString()), metrics.metricName( "group-count", GroupCoordinatorMetrics.METRICS_GROUP, "The number of share groups in stable state.", "protocol", Group.GroupType.SHARE.toString(), - "state", ShareGroupState.STABLE.toString()), + "state", GroupState.STABLE.toString()), metrics.metricName( "group-count", GroupCoordinatorMetrics.METRICS_GROUP, "The number of share groups in dead state.", "protocol", Group.GroupType.SHARE.toString(), - "state", ShareGroupState.DEAD.toString()) + "state", GroupState.DEAD.toString()) )); try { diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java index d56513854d9..71af2a05ff5 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java @@ -38,7 +38,7 @@ import org.apache.kafka.clients.admin.MemberDescription; import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.GroupState; import org.apache.kafka.common.GroupType; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaFuture; @@ -130,11 +130,12 @@ public class ConsumerGroupCommand { } } - static Set consumerGroupStatesFromString(String input) { - Set parsedStates = Arrays.stream(input.split(",")).map(s -> ConsumerGroupState.parse(s.trim())).collect(Collectors.toSet()); - if (parsedStates.contains(ConsumerGroupState.UNKNOWN)) { - Collection validStates = Arrays.stream(ConsumerGroupState.values()).filter(s -> s != ConsumerGroupState.UNKNOWN).collect(Collectors.toList()); - throw new IllegalArgumentException("Invalid state list '" + input + "'. Valid states are: " + validStates.stream().map(ConsumerGroupState::toString).collect(Collectors.joining(", "))); + static Set groupStatesFromString(String input) { + Set parsedStates = Arrays.stream(input.split(",")).map(s -> GroupState.parse(s.trim())).collect(Collectors.toSet()); + Set validStates = GroupState.groupStatesForType(GroupType.CONSUMER); + if (!validStates.containsAll(parsedStates)) { + throw new IllegalArgumentException("Invalid state list '" + input + "'. Valid states are: " + + validStates.stream().map(GroupState::toString).collect(Collectors.joining(", "))); } return parsedStates; } @@ -204,7 +205,7 @@ public class ConsumerGroupCommand { if (includeType || includeState) { Set types = typeValues(); - Set states = stateValues(); + Set states = stateValues(); List listings = listConsumerGroupsWithFilters(types, states); printGroupInfo(listings, includeType, includeState); @@ -213,11 +214,11 @@ public class ConsumerGroupCommand { } } - private Set stateValues() { + private Set stateValues() { String stateValue = opts.options.valueOf(opts.stateOpt); return (stateValue == null || stateValue.isEmpty()) ? Collections.emptySet() - : consumerGroupStatesFromString(stateValue); + : groupStatesFromString(stateValue); } private Set typeValues() { @@ -230,7 +231,7 @@ public class ConsumerGroupCommand { private void printGroupInfo(List groups, boolean includeType, boolean includeState) { Function groupId = ConsumerGroupListing::groupId; Function groupType = groupListing -> groupListing.type().orElse(GroupType.UNKNOWN).toString(); - Function groupState = groupListing -> groupListing.state().orElse(ConsumerGroupState.UNKNOWN).toString(); + Function groupState = groupListing -> groupListing.groupState().orElse(GroupState.UNKNOWN).toString(); OptionalInt maybeMax = groups.stream().mapToInt(groupListing -> Math.max(15, groupId.apply(groupListing).length())).max(); int maxGroupLen = maybeMax.orElse(15) + 10; @@ -270,26 +271,26 @@ public class ConsumerGroupCommand { } } - List listConsumerGroupsWithFilters(Set types, Set states) throws ExecutionException, InterruptedException { + List listConsumerGroupsWithFilters(Set types, Set states) throws ExecutionException, InterruptedException { ListConsumerGroupsOptions listConsumerGroupsOptions = withTimeoutMs(new ListConsumerGroupsOptions()); listConsumerGroupsOptions - .inStates(states) + .inGroupStates(states) .withTypes(types); ListConsumerGroupsResult result = adminClient.listConsumerGroups(listConsumerGroupsOptions); return new ArrayList<>(result.all().get()); } - private boolean shouldPrintMemberState(String group, Optional state, Optional numRows) { + private boolean shouldPrintMemberState(String group, Optional state, Optional numRows) { // numRows contains the number of data rows, if any, compiled from the API call in the caller method. // if it's undefined or 0, there is no relevant group information to display. - if (!numRows.isPresent()) { + if (numRows.isEmpty()) { printError("The consumer group '" + group + "' does not exist.", Optional.empty()); return false; } int num = numRows.get(); - ConsumerGroupState state0 = state.orElse(ConsumerGroupState.UNKNOWN); + GroupState state0 = state.orElse(GroupState.UNKNOWN); switch (state0) { case DEAD: printError("Consumer group '" + group + "' does not exist.", Optional.empty()); @@ -310,16 +311,16 @@ public class ConsumerGroupCommand { throw new KafkaException("Expected a valid consumer group state, but found '" + state0 + "'."); } - return !state0.equals(ConsumerGroupState.DEAD) && num > 0; + return !state0.equals(GroupState.DEAD) && num > 0; } private Optional size(Optional> colOpt) { return colOpt.map(Collection::size); } - private void printOffsets(Map, Optional>>> offsets) { + private void printOffsets(Map, Optional>>> offsets) { offsets.forEach((groupId, tuple) -> { - Optional state = tuple.getKey(); + Optional state = tuple.getKey(); Optional> assignments = tuple.getValue(); if (shouldPrintMemberState(groupId, state, size(assignments))) { @@ -361,14 +362,14 @@ public class ConsumerGroupCommand { return "\n%" + (-maxGroupLen) + "s %" + (-maxTopicLen) + "s %-10s %-15s %-15s %-15s %" + (-maxConsumerIdLen) + "s %" + (-maxHostLen) + "s %s"; } - private void printMembers(Map, Optional>>> members, boolean verbose) { + private void printMembers(Map, Optional>>> members, boolean verbose) { members.forEach((groupId, tuple) -> { - Optional state = tuple.getKey(); + Optional groupState = tuple.getKey(); Optional> assignments = tuple.getValue(); int maxGroupLen = 15, maxConsumerIdLen = 15, maxGroupInstanceIdLen = 17, maxHostLen = 15, maxClientIdLen = 15; boolean includeGroupInstanceId = false; - if (shouldPrintMemberState(groupId, state, size(assignments))) { + if (shouldPrintMemberState(groupId, groupState, size(assignments))) { // find proper columns width if (assignments.isPresent()) { for (MemberAssignmentState memberAssignment : assignments.get()) { @@ -425,16 +426,16 @@ public class ConsumerGroupCommand { }); } - private void printStates(Map states) { + private void printStates(Map states) { states.forEach((groupId, state) -> { - if (shouldPrintMemberState(groupId, Optional.of(state.state), Optional.of(1))) { + if (shouldPrintMemberState(groupId, Optional.of(state.groupState), Optional.of(1))) { String coordinator = state.coordinator.host() + ":" + state.coordinator.port() + " (" + state.coordinator.idString() + ")"; int coordinatorColLen = Math.max(25, coordinator.length()); String format = "\n%" + -coordinatorColLen + "s %-25s %-20s %-15s %s"; System.out.printf(format, "GROUP", "COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS"); - System.out.printf(format, state.group, coordinator, state.assignmentStrategy, state.state, state.numMembers); + System.out.printf(format, state.group, coordinator, state.assignmentStrategy, state.groupState, state.numMembers); System.out.println(); } }); @@ -450,15 +451,15 @@ public class ConsumerGroupCommand { long subActions = Stream.of(membersOptPresent, offsetsOptPresent, stateOptPresent).filter(x -> x).count(); if (subActions == 0 || offsetsOptPresent) { - TreeMap, Optional>>> offsets + TreeMap, Optional>>> offsets = collectGroupsOffsets(groupIds); printOffsets(offsets); } else if (membersOptPresent) { - TreeMap, Optional>>> members + TreeMap, Optional>>> members = collectGroupsMembers(groupIds, opts.options.has(opts.verboseOpt)); printMembers(members, opts.options.has(opts.verboseOpt)); } else { - TreeMap states = collectGroupsState(groupIds); + TreeMap states = collectGroupsState(groupIds); printStates(states); } } @@ -530,7 +531,7 @@ public class ConsumerGroupCommand { consumerGroups.forEach((groupId, groupDescription) -> { try { - String state = groupDescription.get().state().toString(); + String state = groupDescription.get().groupState().toString(); switch (state) { case "Empty": case "Dead": @@ -689,19 +690,19 @@ public class ConsumerGroupCommand { /** * Returns the state of the specified consumer group and partition assignment states */ - Entry, Optional>> collectGroupOffsets(String groupId) throws Exception { + Entry, Optional>> collectGroupOffsets(String groupId) throws Exception { return collectGroupsOffsets(Collections.singletonList(groupId)).getOrDefault(groupId, new SimpleImmutableEntry<>(Optional.empty(), Optional.empty())); } /** * Returns states of the specified consumer groups and partition assignment states */ - TreeMap, Optional>>> collectGroupsOffsets(Collection groupIds) throws Exception { + TreeMap, Optional>>> collectGroupsOffsets(Collection groupIds) throws Exception { Map consumerGroups = describeConsumerGroups(groupIds); - TreeMap, Optional>>> groupOffsets = new TreeMap<>(); + TreeMap, Optional>>> groupOffsets = new TreeMap<>(); consumerGroups.forEach((groupId, consumerGroup) -> { - ConsumerGroupState state = consumerGroup.state(); + GroupState state = consumerGroup.groupState(); Map committedOffsets = getCommittedOffsets(groupId); // The admin client returns `null` as a value to indicate that there is not committed offset for a partition. Function> getPartitionOffset = tp -> Optional.ofNullable(committedOffsets.get(tp)).map(OffsetAndMetadata::offset); @@ -746,16 +747,16 @@ public class ConsumerGroupCommand { return groupOffsets; } - Entry, Optional>> collectGroupMembers(String groupId, boolean verbose) throws Exception { + Entry, Optional>> collectGroupMembers(String groupId, boolean verbose) throws Exception { return collectGroupsMembers(Collections.singleton(groupId), verbose).get(groupId); } - TreeMap, Optional>>> collectGroupsMembers(Collection groupIds, boolean verbose) throws Exception { + TreeMap, Optional>>> collectGroupsMembers(Collection groupIds, boolean verbose) throws Exception { Map consumerGroups = describeConsumerGroups(groupIds); - TreeMap, Optional>>> res = new TreeMap<>(); + TreeMap, Optional>>> res = new TreeMap<>(); consumerGroups.forEach((groupId, consumerGroup) -> { - ConsumerGroupState state = consumerGroup.state(); + GroupState state = consumerGroup.groupState(); List memberAssignmentStates = consumerGroup.members().stream().map(consumer -> new MemberAssignmentState( groupId, @@ -771,19 +772,19 @@ public class ConsumerGroupCommand { return res; } - GroupState collectGroupState(String groupId) throws Exception { + GroupInformation collectGroupState(String groupId) throws Exception { return collectGroupsState(Collections.singleton(groupId)).get(groupId); } - TreeMap collectGroupsState(Collection groupIds) throws Exception { + TreeMap collectGroupsState(Collection groupIds) throws Exception { Map consumerGroups = describeConsumerGroups(groupIds); - TreeMap res = new TreeMap<>(); + TreeMap res = new TreeMap<>(); consumerGroups.forEach((groupId, groupDescription) -> - res.put(groupId, new GroupState( + res.put(groupId, new GroupInformation( groupId, groupDescription.coordinator(), groupDescription.partitionAssignor(), - groupDescription.state(), + groupDescription.groupState(), groupDescription.members().size() ))); return res; diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/GroupState.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/GroupInformation.java similarity index 81% rename from tools/src/main/java/org/apache/kafka/tools/consumer/group/GroupState.java rename to tools/src/main/java/org/apache/kafka/tools/consumer/group/GroupInformation.java index 18cd3f4bd5f..df1008cd439 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/GroupState.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/GroupInformation.java @@ -16,21 +16,21 @@ */ package org.apache.kafka.tools.consumer.group; -import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.GroupState; import org.apache.kafka.common.Node; -class GroupState { +class GroupInformation { final String group; final Node coordinator; final String assignmentStrategy; - final ConsumerGroupState state; + final GroupState groupState; final int numMembers; - GroupState(String group, Node coordinator, String assignmentStrategy, ConsumerGroupState state, int numMembers) { + GroupInformation(String group, Node coordinator, String assignmentStrategy, GroupState groupState, int numMembers) { this.group = group; this.coordinator = coordinator; this.assignmentStrategy = assignmentStrategy; - this.state = state; + this.groupState = groupState; this.numMembers = numMembers; } } diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java index 4e7dd8a45c5..85406e8c626 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java @@ -20,14 +20,15 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.AbstractOptions; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.DescribeShareGroupsResult; +import org.apache.kafka.clients.admin.GroupListing; +import org.apache.kafka.clients.admin.ListGroupsOptions; +import org.apache.kafka.clients.admin.ListGroupsResult; import org.apache.kafka.clients.admin.ListOffsetsResult; -import org.apache.kafka.clients.admin.ListShareGroupsOptions; -import org.apache.kafka.clients.admin.ListShareGroupsResult; import org.apache.kafka.clients.admin.MemberDescription; import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.admin.ShareGroupDescription; -import org.apache.kafka.clients.admin.ShareGroupListing; -import org.apache.kafka.common.ShareGroupState; +import org.apache.kafka.common.GroupState; +import org.apache.kafka.common.GroupType; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.server.util.CommandLineUtils; @@ -89,14 +90,13 @@ public class ShareGroupCommand { } } - static Set shareGroupStatesFromString(String input) { - Set parsedStates = - Arrays.stream(input.split(",")).map(s -> ShareGroupState.parse(s.trim())).collect(Collectors.toSet()); - if (parsedStates.contains(ShareGroupState.UNKNOWN)) { - Collection validStates = - Arrays.stream(ShareGroupState.values()).filter(s -> s != ShareGroupState.UNKNOWN).collect(Collectors.toList()); + static Set groupStatesFromString(String input) { + Set parsedStates = + Arrays.stream(input.split(",")).map(s -> GroupState.parse(s.trim())).collect(Collectors.toSet()); + Set validStates = GroupState.groupStatesForType(GroupType.SHARE); + if (!validStates.containsAll(parsedStates)) { throw new IllegalArgumentException("Invalid state list '" + input + "'. Valid states are: " + - validStates.stream().map(Object::toString).collect(Collectors.joining(", "))); + validStates.stream().map(GroupState::toString).collect(Collectors.joining(", "))); } return parsedStates; } @@ -128,10 +128,10 @@ public class ShareGroupCommand { public void listGroups() throws ExecutionException, InterruptedException { if (opts.options.has(opts.stateOpt)) { String stateValue = opts.options.valueOf(opts.stateOpt); - Set states = (stateValue == null || stateValue.isEmpty()) + Set states = (stateValue == null || stateValue.isEmpty()) ? Collections.emptySet() - : shareGroupStatesFromString(stateValue); - List listings = listShareGroupsWithState(states); + : groupStatesFromString(stateValue); + List listings = listShareGroupsInStates(states); printGroupInfo(listings); } else @@ -140,31 +140,32 @@ public class ShareGroupCommand { List listShareGroups() { try { - ListShareGroupsResult result = adminClient.listShareGroups(withTimeoutMs(new ListShareGroupsOptions())); - Collection listings = result.all().get(); - return listings.stream().map(ShareGroupListing::groupId).collect(Collectors.toList()); + ListGroupsResult result = adminClient.listGroups(withTimeoutMs(new ListGroupsOptions()).withTypes(Set.of(GroupType.SHARE))); + Collection listings = result.all().get(); + return listings.stream().map(GroupListing::groupId).collect(Collectors.toList()); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } } - List listShareGroupsWithState(Set states) throws ExecutionException, InterruptedException { - ListShareGroupsOptions listShareGroupsOptions = withTimeoutMs(new ListShareGroupsOptions()); - listShareGroupsOptions.inStates(states); - ListShareGroupsResult result = adminClient.listShareGroups(listShareGroupsOptions); + List listShareGroupsInStates(Set states) throws ExecutionException, InterruptedException { + ListGroupsOptions listGroupsOptions = withTimeoutMs(new ListGroupsOptions()); + listGroupsOptions.withTypes(Set.of(GroupType.SHARE)); + listGroupsOptions.inGroupStates(states); + ListGroupsResult result = adminClient.listGroups(listGroupsOptions); return new ArrayList<>(result.all().get()); } - private void printGroupInfo(List groups) { + private void printGroupInfo(List groups) { // find proper columns width int maxGroupLen = 15; - for (ShareGroupListing group : groups) { + for (GroupListing group : groups) { maxGroupLen = Math.max(maxGroupLen, group.groupId().length()); } System.out.printf("%" + (-maxGroupLen) + "s %s\n", "GROUP", "STATE"); - for (ShareGroupListing group : groups) { + for (GroupListing group : groups) { String groupId = group.groupId(); - String state = group.state().orElse(ShareGroupState.UNKNOWN).toString(); + String state = group.groupState().orElse(GroupState.UNKNOWN).toString(); System.out.printf("%" + (-maxGroupLen) + "s %s\n", groupId, state); } } @@ -174,14 +175,14 @@ public class ShareGroupCommand { * * @return Whether the group detail should be printed */ - public static boolean maybePrintEmptyGroupState(String group, ShareGroupState state, int numRows) { - if (state == ShareGroupState.DEAD) { + public static boolean maybePrintEmptyGroupState(String group, GroupState state, int numRows) { + if (state == GroupState.DEAD) { printError("Share group '" + group + "' does not exist.", Optional.empty()); - } else if (state == ShareGroupState.EMPTY) { + } else if (state == GroupState.EMPTY) { System.err.println("\nShare group '" + group + "' has no active members."); } - return !state.equals(ShareGroupState.DEAD) && numRows > 0; + return !state.equals(GroupState.DEAD) && numRows > 0; } public void describeGroups() throws ExecutionException, InterruptedException { @@ -233,9 +234,9 @@ public class ShareGroupCommand { private void printOffsets(ShareGroupDescription description) throws ExecutionException, InterruptedException { Map offsets = getOffsets(description.members()); - if (maybePrintEmptyGroupState(description.groupId(), description.state(), offsets.size())) { + if (maybePrintEmptyGroupState(description.groupId(), description.groupState(), offsets.size())) { String fmt = printOffsetFormat(description, offsets); - System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "OFFSET"); + System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "START-OFFSET"); for (Map.Entry offset : offsets.entrySet()) { System.out.printf(fmt, description.groupId(), offset.getKey().topic(), offset.getKey().partition(), offset.getValue()); @@ -253,7 +254,7 @@ public class ShareGroupCommand { } private void printStates(ShareGroupDescription description) { - maybePrintEmptyGroupState(description.groupId(), description.state(), 1); + maybePrintEmptyGroupState(description.groupId(), description.groupState(), 1); int groupLen = Math.max(15, description.groupId().length()); String coordinator = description.coordinator().host() + ":" + description.coordinator().port() + " (" + description.coordinator().idString() + ")"; @@ -261,14 +262,14 @@ public class ShareGroupCommand { String fmt = "%" + -groupLen + "s %" + -coordinatorLen + "s %-15s %s\n"; System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE", "#MEMBERS"); - System.out.printf(fmt, description.groupId(), coordinator, description.state().toString(), description.members().size()); + System.out.printf(fmt, description.groupId(), coordinator, description.groupState().toString(), description.members().size()); } private void printMembers(ShareGroupDescription description) { int groupLen = Math.max(15, description.groupId().length()); int maxConsumerIdLen = 15, maxHostLen = 15, maxClientIdLen = 15; Collection members = description.members(); - if (maybePrintEmptyGroupState(description.groupId(), description.state(), description.members().size())) { + if (maybePrintEmptyGroupState(description.groupId(), description.groupState(), description.members().size())) { for (MemberDescription member : members) { maxConsumerIdLen = Math.max(maxConsumerIdLen, member.consumerId().length()); maxHostLen = Math.max(maxHostLen, member.host().length()); diff --git a/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java index b5a884d9e49..0bc71a2b813 100644 --- a/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.clients.consumer.GroupProtocol; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaShareConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.GroupState; import org.apache.kafka.common.GroupType; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.Errors; @@ -198,9 +199,9 @@ public class GroupsCommandTest { GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient); ListGroupsResult result = AdminClientTestUtils.listGroupsResult( - new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer"), - new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer"), - new GroupListing("SG", Optional.of(GroupType.SHARE), "share") + new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer", Optional.of(GroupState.STABLE)), + new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE)), + new GroupListing("SG", Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)) ); when(adminClient.listGroups()).thenReturn(result); @@ -225,9 +226,9 @@ public class GroupsCommandTest { GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient); ListGroupsResult result = AdminClientTestUtils.listGroupsResult( - new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer"), - new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer"), - new GroupListing("SG", Optional.of(GroupType.SHARE), "share") + new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer", Optional.of(GroupState.STABLE)), + new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE)), + new GroupListing("SG", Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)) ); when(adminClient.listGroups()).thenReturn(result); @@ -251,9 +252,9 @@ public class GroupsCommandTest { GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient); ListGroupsResult result = AdminClientTestUtils.listGroupsResult( - new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer"), - new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer"), - new GroupListing("SG", Optional.of(GroupType.SHARE), "share") + new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer", Optional.of(GroupState.STABLE)), + new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE)), + new GroupListing("SG", Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)) ); when(adminClient.listGroups()).thenReturn(result); @@ -276,9 +277,9 @@ public class GroupsCommandTest { GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient); ListGroupsResult result = AdminClientTestUtils.listGroupsResult( - new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer"), - new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer"), - new GroupListing("SG", Optional.of(GroupType.SHARE), "share") + new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer", Optional.of(GroupState.STABLE)), + new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE)), + new GroupListing("SG", Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)) ); when(adminClient.listGroups()).thenReturn(result); @@ -302,9 +303,9 @@ public class GroupsCommandTest { GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient); ListGroupsResult result = AdminClientTestUtils.listGroupsResult( - new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer"), - new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer"), - new GroupListing("SG", Optional.of(GroupType.SHARE), "share") + new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer", Optional.of(GroupState.STABLE)), + new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE)), + new GroupListing("SG", Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)) ); when(adminClient.listGroups()).thenReturn(result); @@ -327,9 +328,9 @@ public class GroupsCommandTest { GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient); ListGroupsResult result = AdminClientTestUtils.listGroupsResult( - new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer"), - new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer"), - new GroupListing("SG", Optional.of(GroupType.SHARE), "share") + new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer", Optional.of(GroupState.STABLE)), + new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE)), + new GroupListing("SG", Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)) ); when(adminClient.listGroups()).thenReturn(result); @@ -352,8 +353,8 @@ public class GroupsCommandTest { GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient); ListGroupsResult result = AdminClientTestUtils.listGroupsResult( - new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer"), - new GroupListing("SG", Optional.of(GroupType.SHARE), "share") + new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE)), + new GroupListing("SG", Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)) ); when(adminClient.listGroups()).thenReturn(result); diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java index daf5ff2ce9a..dcb8fd05481 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java @@ -31,7 +31,7 @@ import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.RangeAssignor; -import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.GroupState; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; @@ -85,14 +85,14 @@ public class ConsumerGroupServiceTest { ConsumerGroupCommand.ConsumerGroupService groupService = consumerGroupService(args); when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(GROUP)), any())) - .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE)); + .thenReturn(describeGroupsResult(GroupState.STABLE)); when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec()), any())) .thenReturn(listGroupOffsetsResult(GROUP)); when(admin.listOffsets(offsetsArgMatcher(), any())) .thenReturn(listOffsetsResult()); - Entry, Optional>> statesAndAssignments = groupService.collectGroupOffsets(GROUP); - assertEquals(Optional.of(ConsumerGroupState.STABLE), statesAndAssignments.getKey()); + Entry, Optional>> statesAndAssignments = groupService.collectGroupOffsets(GROUP); + assertEquals(Optional.of(GroupState.STABLE), statesAndAssignments.getKey()); assertTrue(statesAndAssignments.getValue().isPresent()); assertEquals(TOPIC_PARTITIONS.size(), statesAndAssignments.getValue().get().size()); @@ -139,7 +139,7 @@ public class ConsumerGroupServiceTest { true, Collections.singleton(new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions))), RangeAssignor.class.getName(), - ConsumerGroupState.STABLE, + GroupState.STABLE, new Node(1, "localhost", 9092)); Function, ArgumentMatcher>> offsetsArgMatcher = expectedPartitions -> @@ -164,8 +164,8 @@ public class ConsumerGroupServiceTest { )).thenReturn(new ListOffsetsResult(endOffsets.entrySet().stream().filter(e -> unassignedTopicPartitions.contains(e.getKey())) .collect(Collectors.toMap(Entry::getKey, Entry::getValue)))); - Entry, Optional>> statesAndAssignments = groupService.collectGroupOffsets(GROUP); - Optional state = statesAndAssignments.getKey(); + Entry, Optional>> statesAndAssignments = groupService.collectGroupOffsets(GROUP); + Optional state = statesAndAssignments.getKey(); Optional> assignments = statesAndAssignments.getValue(); Map> returnedOffsets = assignments.map(results -> @@ -183,7 +183,7 @@ public class ConsumerGroupServiceTest { expectedOffsets.put(testTopicPartition4, Optional.of(100L)); expectedOffsets.put(testTopicPartition5, Optional.empty()); - assertEquals(Optional.of(ConsumerGroupState.STABLE), state); + assertEquals(Optional.of(GroupState.STABLE), state); assertEquals(expectedOffsets, returnedOffsets); verify(admin, times(1)).describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(GROUP)), any()); @@ -203,7 +203,7 @@ public class ConsumerGroupServiceTest { ConsumerGroupCommand.ConsumerGroupService groupService = consumerGroupService(args.toArray(new String[0])); when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(GROUP)), any())) - .thenReturn(describeGroupsResult(ConsumerGroupState.DEAD)); + .thenReturn(describeGroupsResult(GroupState.DEAD)); when(admin.describeTopics(ArgumentMatchers.eq(topicsWithoutPartitionsSpecified), any())) .thenReturn(describeTopicsResult(topicsWithoutPartitionsSpecified)); when(admin.listOffsets(offsetsArgMatcher(), any())) @@ -227,7 +227,7 @@ public class ConsumerGroupServiceTest { }; } - private DescribeConsumerGroupsResult describeGroupsResult(ConsumerGroupState groupState) { + private DescribeConsumerGroupsResult describeGroupsResult(GroupState groupState) { MemberDescription member1 = new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", null); ConsumerGroupDescription description = new ConsumerGroupDescription(GROUP, true, diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java index c39f8ae3145..cf9c31cdd4a 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java @@ -20,7 +20,7 @@ import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.GroupProtocol; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.RangeAssignor; -import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.GroupState; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; import org.apache.kafka.common.protocol.Errors; @@ -56,8 +56,8 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CO import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.common.ConsumerGroupState.EMPTY; -import static org.apache.kafka.common.ConsumerGroupState.STABLE; +import static org.apache.kafka.common.GroupState.EMPTY; +import static org.apache.kafka.common.GroupState.STABLE; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; @@ -318,8 +318,8 @@ public class DeleteConsumerGroupsTest { ); } - private boolean checkGroupState(ConsumerGroupCommand.ConsumerGroupService service, String groupId, ConsumerGroupState state) throws Exception { - return Objects.equals(service.collectGroupState(groupId).state, state); + private boolean checkGroupState(ConsumerGroupCommand.ConsumerGroupService service, String groupId, GroupState state) throws Exception { + return Objects.equals(service.collectGroupState(groupId).groupState, state); } private ConsumerGroupCommand.ConsumerGroupService getConsumerGroupService(String[] args) { diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java index 3cc350be196..4efcd244e24 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java @@ -26,7 +26,7 @@ import org.apache.kafka.clients.consumer.GroupProtocol; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.clients.consumer.RoundRobinAssignor; -import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.GroupState; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.serialization.StringDeserializer; @@ -113,8 +113,8 @@ public class DescribeConsumerGroupTest { // note the group to be queried is a different (non-existing) group ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", missingGroup}) ) { - Entry, Optional>> res = service.collectGroupOffsets(missingGroup); - assertTrue(res.getKey().map(s -> s.equals(ConsumerGroupState.DEAD)).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false), + Entry, Optional>> res = service.collectGroupOffsets(missingGroup); + assertTrue(res.getKey().map(s -> s.equals(GroupState.DEAD)).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false), "Expected the state to be 'Dead', with no members in the group '" + missingGroup + "'."); } } @@ -132,12 +132,12 @@ public class DescribeConsumerGroupTest { // note the group to be queried is a different (non-existing) group ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", missingGroup}) ) { - Entry, Optional>> res = service.collectGroupMembers(missingGroup, false); - assertTrue(res.getKey().map(s -> s.equals(ConsumerGroupState.DEAD)).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false), + Entry, Optional>> res = service.collectGroupMembers(missingGroup, false); + assertTrue(res.getKey().map(s -> s.equals(GroupState.DEAD)).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false), "Expected the state to be 'Dead', with no members in the group '" + missingGroup + "'."); - Entry, Optional>> res2 = service.collectGroupMembers(missingGroup, true); - assertTrue(res2.getKey().map(s -> s.equals(ConsumerGroupState.DEAD)).orElse(false) && res2.getValue().map(Collection::isEmpty).orElse(false), + Entry, Optional>> res2 = service.collectGroupMembers(missingGroup, true); + assertTrue(res2.getKey().map(s -> s.equals(GroupState.DEAD)).orElse(false) && res2.getValue().map(Collection::isEmpty).orElse(false), "Expected the state to be 'Dead', with no members in the group '" + missingGroup + "' (verbose option)."); } } @@ -155,8 +155,8 @@ public class DescribeConsumerGroupTest { // note the group to be queried is a different (non-existing) group ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", missingGroup}) ) { - GroupState state = service.collectGroupState(missingGroup); - assertTrue(Objects.equals(state.state, ConsumerGroupState.DEAD) && state.numMembers == 0 && + GroupInformation state = service.collectGroupState(missingGroup); + assertTrue(Objects.equals(state.groupState, GroupState.DEAD) && state.numMembers == 0 && state.coordinator != null && clusterInstance.brokerIds().contains(state.coordinator.id()), "Expected the state to be 'Dead', with no members in the group '" + missingGroup + "'." ); @@ -277,13 +277,13 @@ public class DescribeConsumerGroupTest { ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group}) ) { TestUtils.waitForCondition(() -> { - Entry, Optional>> groupOffsets = service.collectGroupOffsets(group); - Optional state = groupOffsets.getKey(); + Entry, Optional>> groupOffsets = service.collectGroupOffsets(group); + Optional state = groupOffsets.getKey(); Optional> assignments = groupOffsets.getValue(); Predicate isGrp = s -> Objects.equals(s.group, group); - boolean res = state.map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) && + boolean res = state.map(s -> s.equals(GroupState.STABLE)).orElse(false) && assignments.isPresent() && assignments.get().stream().filter(isGrp).count() == 1; @@ -322,7 +322,7 @@ public class DescribeConsumerGroupTest { return consumerGroupDescription.members().size() == 1 && consumerGroupDescription.members().iterator().next().assignment().topicPartitions().size() == 1; }, "Expected a 'Stable' group status, rows and valid member information for group " + group + "."); - Entry, Optional>> res = service.collectGroupMembers(group, true); + Entry, Optional>> res = service.collectGroupMembers(group, true); assertTrue(res.getValue().isPresent()); assertTrue(res.getValue().get().size() == 1 && res.getValue().get().iterator().next().assignment.size() == 1, @@ -344,8 +344,8 @@ public class DescribeConsumerGroupTest { ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group}) ) { TestUtils.waitForCondition(() -> { - GroupState state = service.collectGroupState(group); - return Objects.equals(state.state, ConsumerGroupState.STABLE) && + GroupInformation state = service.collectGroupState(group); + return Objects.equals(state.groupState, GroupState.STABLE) && state.numMembers == 1 && state.coordinator != null && clusterInstance.brokerIds().contains(state.coordinator.id()); @@ -376,8 +376,8 @@ public class DescribeConsumerGroupTest { try (ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group})) { TestUtils.waitForCondition(() -> { - GroupState state = service.collectGroupState(group); - return Objects.equals(state.state, ConsumerGroupState.STABLE) && + GroupInformation state = service.collectGroupState(group); + return Objects.equals(state.groupState, GroupState.STABLE) && state.numMembers == 1 && Objects.equals(state.assignmentStrategy, expectedName) && state.coordinator != null && @@ -434,8 +434,8 @@ public class DescribeConsumerGroupTest { ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group}) ) { TestUtils.waitForCondition(() -> { - Entry, Optional>> res = service.collectGroupOffsets(group); - return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) + Entry, Optional>> res = service.collectGroupOffsets(group); + return res.getKey().map(s -> s.equals(GroupState.STABLE)).orElse(false) && res.getValue().map(c -> c.stream().anyMatch(assignment -> Objects.equals(assignment.group, group) && assignment.offset.isPresent())).orElse(false); }, "Expected the group to initially become stable, and to find group in assignments after initial offset commit."); @@ -443,12 +443,12 @@ public class DescribeConsumerGroupTest { protocolConsumerGroupExecutor.close(); TestUtils.waitForCondition(() -> { - Entry, Optional>> offsets = service.collectGroupOffsets(group); - Optional state = offsets.getKey(); + Entry, Optional>> offsets = service.collectGroupOffsets(group); + Optional state = offsets.getKey(); Optional> assignments = offsets.getValue(); List testGroupAssignments = assignments.get().stream().filter(a -> Objects.equals(a.group, group)).collect(Collectors.toList()); PartitionAssignmentState assignment = testGroupAssignments.get(0); - return state.map(s -> s.equals(ConsumerGroupState.EMPTY)).orElse(false) && + return state.map(s -> s.equals(GroupState.EMPTY)).orElse(false) && testGroupAssignments.size() == 1 && assignment.consumerId.map(c -> c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && // the member should be gone assignment.clientId.map(c -> c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && @@ -471,8 +471,8 @@ public class DescribeConsumerGroupTest { ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group}) ) { TestUtils.waitForCondition(() -> { - Entry, Optional>> res = service.collectGroupMembers(group, false); - return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) + Entry, Optional>> res = service.collectGroupMembers(group, false); + return res.getKey().map(s -> s.equals(GroupState.STABLE)).orElse(false) && res.getValue().map(c -> c.stream().anyMatch(m -> Objects.equals(m.group, group))).orElse(false); }, "Expected the group to initially become stable, and to find group in assignments after initial offset commit."); @@ -480,8 +480,8 @@ public class DescribeConsumerGroupTest { protocolConsumerGroupExecutor.close(); TestUtils.waitForCondition(() -> { - Entry, Optional>> res = service.collectGroupMembers(group, false); - return res.getKey().map(s -> s.equals(ConsumerGroupState.EMPTY)).orElse(false) && res.getValue().isPresent() && res.getValue().get().isEmpty(); + Entry, Optional>> res = service.collectGroupMembers(group, false); + return res.getKey().map(s -> s.equals(GroupState.EMPTY)).orElse(false) && res.getValue().isPresent() && res.getValue().get().isEmpty(); }, "Expected no member in describe group members results for group '" + group + "'"); } } @@ -500,8 +500,8 @@ public class DescribeConsumerGroupTest { ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group}) ) { TestUtils.waitForCondition(() -> { - GroupState state = service.collectGroupState(group); - return Objects.equals(state.state, ConsumerGroupState.STABLE) && + GroupInformation state = service.collectGroupState(group); + return Objects.equals(state.groupState, GroupState.STABLE) && state.numMembers == 1 && state.coordinator != null && clusterInstance.brokerIds().contains(state.coordinator.id()); @@ -511,8 +511,8 @@ public class DescribeConsumerGroupTest { protocolConsumerGroupExecutor.close(); TestUtils.waitForCondition(() -> { - GroupState state = service.collectGroupState(group); - return Objects.equals(state.state, ConsumerGroupState.EMPTY) && state.numMembers == 0; + GroupInformation state = service.collectGroupState(group); + return Objects.equals(state.groupState, GroupState.EMPTY) && state.numMembers == 0; }, "Expected the group to become empty after the only member leaving."); } } @@ -556,8 +556,8 @@ public class DescribeConsumerGroupTest { ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group}) ) { TestUtils.waitForCondition(() -> { - Entry, Optional>> res = service.collectGroupOffsets(group); - return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).isPresent() && + Entry, Optional>> res = service.collectGroupOffsets(group); + return res.getKey().map(s -> s.equals(GroupState.STABLE)).isPresent() && res.getValue().isPresent() && res.getValue().get().stream().filter(s -> Objects.equals(s.group, group)).count() == 1 && res.getValue().get().stream().filter(x -> Objects.equals(x.group, group) && x.partition.isPresent()).count() == 1; @@ -579,8 +579,8 @@ public class DescribeConsumerGroupTest { ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group}) ) { TestUtils.waitForCondition(() -> { - Entry, Optional>> res = service.collectGroupMembers(group, false); - return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) && + Entry, Optional>> res = service.collectGroupMembers(group, false); + return res.getKey().map(s -> s.equals(GroupState.STABLE)).orElse(false) && res.getValue().isPresent() && res.getValue().get().stream().filter(s -> Objects.equals(s.group, group)).count() == 2 && res.getValue().get().stream().filter(x -> Objects.equals(x.group, group) && x.numPartitions == 1).count() == 1 && @@ -588,8 +588,8 @@ public class DescribeConsumerGroupTest { res.getValue().get().stream().allMatch(s -> s.assignment.isEmpty()); }, "Expected rows for consumers with no assigned partitions in describe group results"); - Entry, Optional>> res = service.collectGroupMembers(group, true); - assertTrue(res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) + Entry, Optional>> res = service.collectGroupMembers(group, true); + assertTrue(res.getKey().map(s -> s.equals(GroupState.STABLE)).orElse(false) && res.getValue().map(c -> c.stream().anyMatch(s -> !s.assignment.isEmpty())).orElse(false), "Expected additional columns in verbose version of describe members"); } @@ -609,8 +609,8 @@ public class DescribeConsumerGroupTest { ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group}) ) { TestUtils.waitForCondition(() -> { - GroupState state = service.collectGroupState(group); - return Objects.equals(state.state, ConsumerGroupState.STABLE) && state.numMembers == 2; + GroupInformation state = service.collectGroupState(group); + return Objects.equals(state.groupState, GroupState.STABLE) && state.numMembers == 2; }, "Expected two consumers in describe group results"); } } @@ -654,8 +654,8 @@ public class DescribeConsumerGroupTest { ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group}) ) { TestUtils.waitForCondition(() -> { - Entry, Optional>> res = service.collectGroupOffsets(group); - return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) && + Entry, Optional>> res = service.collectGroupOffsets(group); + return res.getKey().map(s -> s.equals(GroupState.STABLE)).orElse(false) && res.getValue().isPresent() && res.getValue().get().stream().filter(s -> Objects.equals(s.group, group)).count() == 2 && res.getValue().get().stream().filter(x -> Objects.equals(x.group, group) && x.partition.isPresent()).count() == 2 && @@ -678,16 +678,16 @@ public class DescribeConsumerGroupTest { ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group}) ) { TestUtils.waitForCondition(() -> { - Entry, Optional>> res = service.collectGroupMembers(group, false); - return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) && + Entry, Optional>> res = service.collectGroupMembers(group, false); + return res.getKey().map(s -> s.equals(GroupState.STABLE)).orElse(false) && res.getValue().isPresent() && res.getValue().get().stream().filter(s -> Objects.equals(s.group, group)).count() == 2 && res.getValue().get().stream().filter(x -> Objects.equals(x.group, group) && x.numPartitions == 1).count() == 2 && res.getValue().get().stream().noneMatch(x -> Objects.equals(x.group, group) && x.numPartitions == 0); }, "Expected two rows (one row per consumer) in describe group members results."); - Entry, Optional>> res = service.collectGroupMembers(group, true); - assertTrue(res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) && res.getValue().map(s -> s.stream().filter(x -> x.assignment.isEmpty()).count()).orElse(0L) == 0, + Entry, Optional>> res = service.collectGroupMembers(group, true); + assertTrue(res.getKey().map(s -> s.equals(GroupState.STABLE)).orElse(false) && res.getValue().map(s -> s.stream().filter(x -> x.assignment.isEmpty()).count()).orElse(0L) == 0, "Expected additional columns in verbose version of describe members"); } } @@ -706,8 +706,8 @@ public class DescribeConsumerGroupTest { ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group}) ) { TestUtils.waitForCondition(() -> { - GroupState state = service.collectGroupState(group); - return Objects.equals(state.state, ConsumerGroupState.STABLE) && Objects.equals(state.group, group) && state.numMembers == 2; + GroupInformation state = service.collectGroupState(group); + return Objects.equals(state.groupState, GroupState.STABLE) && Objects.equals(state.group, group) && state.numMembers == 2; }, "Expected a stable group with two members in describe group state result."); } } @@ -726,8 +726,8 @@ public class DescribeConsumerGroupTest { ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group}) ) { TestUtils.waitForCondition(() -> { - Entry, Optional>> res = service.collectGroupOffsets(group); - return res.getKey().map(s -> s.equals(ConsumerGroupState.EMPTY)).orElse(false) + Entry, Optional>> res = service.collectGroupOffsets(group); + return res.getKey().map(s -> s.equals(GroupState.EMPTY)).orElse(false) && res.getValue().isPresent() && res.getValue().get().stream().filter(s -> Objects.equals(s.group, group)).count() == 2; }, "Expected a stable group with two members in describe group state result."); } @@ -841,11 +841,11 @@ public class DescribeConsumerGroupTest { ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group}) ) { TestUtils.waitForCondition(() -> { - Entry, Optional>> groupOffsets = service.collectGroupOffsets(group); + Entry, Optional>> groupOffsets = service.collectGroupOffsets(group); Predicate isGrp = s -> Objects.equals(s.group, group); - boolean res = groupOffsets.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) && + boolean res = groupOffsets.getKey().map(s -> s.equals(GroupState.STABLE)).orElse(false) && groupOffsets.getValue().isPresent() && groupOffsets.getValue().get().stream().filter(isGrp).count() == 1; diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java index 4c2df4a5def..6222e28654d 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java @@ -24,7 +24,7 @@ import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.GroupProtocol; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.RangeAssignor; -import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.GroupState; import org.apache.kafka.common.GroupType; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; @@ -98,7 +98,7 @@ public class ListConsumerGroupTest { try (AutoCloseable topicPartitionsConsumerGroupExecutor = consumerGroupClosable(topicPartitionsGroup, Collections.singleton(new TopicPartition(topic, 0))); AutoCloseable topicConsumerGroupExecutor = consumerGroupClosable(GroupProtocol.CLASSIC, topicGroup, topic); AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, protocolGroup, topic); - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--list"}); + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--list"}) ) { Set expectedGroups = set(Arrays.asList(topicPartitionsGroup, topicGroup, protocolGroup)); final AtomicReference foundGroups = new AtomicReference<>(); @@ -131,50 +131,50 @@ public class ListConsumerGroupTest { try (AutoCloseable topicPartitionsConsumerGroupExecutor = consumerGroupClosable(topicPartitionsGroup, Collections.singleton(new TopicPartition(topic, 0))); AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, protocolGroup, topic); - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--list", "--state"}); + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--list", "--state"}) ) { Set expectedListing = Set.of( new ConsumerGroupListing( topicPartitionsGroup, - true, - Optional.of(ConsumerGroupState.EMPTY), - Optional.of(GroupType.CLASSIC) + Optional.of(GroupState.EMPTY), + Optional.of(GroupType.CLASSIC), + true ), new ConsumerGroupListing( protocolGroup, - false, - Optional.of(ConsumerGroupState.STABLE), - Optional.of(GroupType.parse(groupProtocol.name())) + Optional.of(GroupState.STABLE), + Optional.of(GroupType.parse(groupProtocol.name())), + false ) ); assertGroupListing( service, Collections.emptySet(), - EnumSet.allOf(ConsumerGroupState.class), + EnumSet.allOf(GroupState.class), expectedListing ); expectedListing = Set.of( new ConsumerGroupListing( protocolGroup, - false, - Optional.of(ConsumerGroupState.STABLE), - Optional.of(GroupType.parse(groupProtocol.name())) + Optional.of(GroupState.STABLE), + Optional.of(GroupType.parse(groupProtocol.name())), + false ) ); assertGroupListing( service, Collections.emptySet(), - Set.of(ConsumerGroupState.STABLE), + Set.of(GroupState.STABLE), expectedListing ); assertGroupListing( service, Collections.emptySet(), - Set.of(ConsumerGroupState.PREPARING_REBALANCE), + Set.of(GroupState.PREPARING_REBALANCE), Collections.emptySet() ); } @@ -194,20 +194,20 @@ public class ListConsumerGroupTest { try (AutoCloseable topicPartitionsConsumerGroupExecutor = consumerGroupClosable(topicPartitionsGroup, Collections.singleton(new TopicPartition(topic, 0))); AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, protocolGroup, topic); - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--list", "--state"}); + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--list", "--state"}) ) { Set expectedListing = Set.of( new ConsumerGroupListing( topicPartitionsGroup, - true, - Optional.of(ConsumerGroupState.EMPTY), - Optional.of(GroupType.CLASSIC) + Optional.of(GroupState.EMPTY), + Optional.of(GroupType.CLASSIC), + true ), new ConsumerGroupListing( protocolGroup, - false, - Optional.of(ConsumerGroupState.STABLE), - Optional.of(GroupType.CLASSIC) + Optional.of(GroupState.STABLE), + Optional.of(GroupType.CLASSIC), + false ) ); @@ -250,7 +250,7 @@ public class ListConsumerGroupTest { try (AutoCloseable topicPartitionsConsumerGroupExecutor = consumerGroupClosable(topicPartitionsGroup, Collections.singleton(new TopicPartition(topic, 0))); AutoCloseable topicConsumerGroupExecutor = consumerGroupClosable(GroupProtocol.CLASSIC, topicGroup, topic); AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, protocolGroup, topic); - ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--list"}); + ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--list"}) ) { @@ -258,21 +258,21 @@ public class ListConsumerGroupTest { Set expectedListing = Set.of( new ConsumerGroupListing( topicPartitionsGroup, - true, - Optional.of(ConsumerGroupState.EMPTY), - Optional.of(GroupType.CLASSIC) + Optional.of(GroupState.EMPTY), + Optional.of(GroupType.CLASSIC), + true ), new ConsumerGroupListing( topicGroup, - false, - Optional.of(ConsumerGroupState.STABLE), - Optional.of(GroupType.CLASSIC) + Optional.of(GroupState.STABLE), + Optional.of(GroupType.CLASSIC), + false ), new ConsumerGroupListing( protocolGroup, - false, - Optional.of(ConsumerGroupState.STABLE), - Optional.of(GroupType.CONSUMER) + Optional.of(GroupState.STABLE), + Optional.of(GroupType.CONSUMER), + false ) ); @@ -288,9 +288,9 @@ public class ListConsumerGroupTest { expectedListing = Set.of( new ConsumerGroupListing( protocolGroup, - false, - Optional.of(ConsumerGroupState.STABLE), - Optional.of(GroupType.CONSUMER) + Optional.of(GroupState.STABLE), + Optional.of(GroupType.CONSUMER), + false ) ); @@ -304,15 +304,15 @@ public class ListConsumerGroupTest { expectedListing = Set.of( new ConsumerGroupListing( topicPartitionsGroup, - true, - Optional.of(ConsumerGroupState.EMPTY), - Optional.of(GroupType.CLASSIC) + Optional.of(GroupState.EMPTY), + Optional.of(GroupType.CLASSIC), + true ), new ConsumerGroupListing( topicGroup, - false, - Optional.of(ConsumerGroupState.STABLE), - Optional.of(GroupType.CLASSIC) + Optional.of(GroupState.STABLE), + Optional.of(GroupType.CLASSIC), + false ) ); @@ -548,20 +548,20 @@ public class ListConsumerGroupTest { /** * Validates the consumer group listings returned against expected values using specified filters. * - * @param service The service to list consumer groups. - * @param typeFilterSet Filters for group types, empty for no filter. - * @param stateFilterSet Filters for group states, empty for no filter. - * @param expectedListing Expected consumer group listings. + * @param service The service to list consumer groups. + * @param typeFilterSet Filters for group types, empty for no filter. + * @param groupStateFilterSet Filters for group states, empty for no filter. + * @param expectedListing Expected consumer group listings. */ private static void assertGroupListing( ConsumerGroupCommand.ConsumerGroupService service, Set typeFilterSet, - Set stateFilterSet, + Set groupStateFilterSet, Set expectedListing ) throws Exception { final AtomicReference> foundListing = new AtomicReference<>(); TestUtils.waitForCondition(() -> { - foundListing.set(set(service.listConsumerGroupsWithFilters(set(typeFilterSet), set(stateFilterSet)))); + foundListing.set(set(service.listConsumerGroupsWithFilters(set(typeFilterSet), set(groupStateFilterSet)))); return Objects.equals(set(expectedListing), foundListing.get()); }, () -> "Expected to show groups " + expectedListing + ", but found " + foundListing.get() + "."); } @@ -614,29 +614,29 @@ public class ListConsumerGroupTest { class ListConsumerGroupUnitTest { @Test public void testConsumerGroupStatesFromString() { - Set result = ConsumerGroupCommand.consumerGroupStatesFromString("Stable"); - Assertions.assertEquals(ListConsumerGroupTest.set(Collections.singleton(ConsumerGroupState.STABLE)), result); + Set result = ConsumerGroupCommand.groupStatesFromString("Stable"); + Assertions.assertEquals(ListConsumerGroupTest.set(Collections.singleton(GroupState.STABLE)), result); - result = ConsumerGroupCommand.consumerGroupStatesFromString("Stable, PreparingRebalance"); - Assertions.assertEquals(ListConsumerGroupTest.set(Arrays.asList(ConsumerGroupState.STABLE, ConsumerGroupState.PREPARING_REBALANCE)), result); + result = ConsumerGroupCommand.groupStatesFromString("Stable, PreparingRebalance"); + Assertions.assertEquals(ListConsumerGroupTest.set(Arrays.asList(GroupState.STABLE, GroupState.PREPARING_REBALANCE)), result); - result = ConsumerGroupCommand.consumerGroupStatesFromString("Dead,CompletingRebalance,"); - Assertions.assertEquals(ListConsumerGroupTest.set(Arrays.asList(ConsumerGroupState.DEAD, ConsumerGroupState.COMPLETING_REBALANCE)), result); + result = ConsumerGroupCommand.groupStatesFromString("Dead,CompletingRebalance,"); + Assertions.assertEquals(ListConsumerGroupTest.set(Arrays.asList(GroupState.DEAD, GroupState.COMPLETING_REBALANCE)), result); - result = ConsumerGroupCommand.consumerGroupStatesFromString("stable"); - Assertions.assertEquals(ListConsumerGroupTest.set(Collections.singletonList(ConsumerGroupState.STABLE)), result); + result = ConsumerGroupCommand.groupStatesFromString("stable"); + Assertions.assertEquals(ListConsumerGroupTest.set(Collections.singletonList(GroupState.STABLE)), result); - result = ConsumerGroupCommand.consumerGroupStatesFromString("stable, assigning"); - Assertions.assertEquals(ListConsumerGroupTest.set(Arrays.asList(ConsumerGroupState.STABLE, ConsumerGroupState.ASSIGNING)), result); + result = ConsumerGroupCommand.groupStatesFromString("stable, assigning"); + Assertions.assertEquals(ListConsumerGroupTest.set(Arrays.asList(GroupState.STABLE, GroupState.ASSIGNING)), result); - result = ConsumerGroupCommand.consumerGroupStatesFromString("dead,reconciling,"); - Assertions.assertEquals(ListConsumerGroupTest.set(Arrays.asList(ConsumerGroupState.DEAD, ConsumerGroupState.RECONCILING)), result); + result = ConsumerGroupCommand.groupStatesFromString("dead,reconciling,"); + Assertions.assertEquals(ListConsumerGroupTest.set(Arrays.asList(GroupState.DEAD, GroupState.RECONCILING)), result); - Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumerGroupCommand.consumerGroupStatesFromString("bad, wrong")); + Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumerGroupCommand.groupStatesFromString("bad, wrong")); - Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumerGroupCommand.consumerGroupStatesFromString(" bad, Stable")); + Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumerGroupCommand.groupStatesFromString(" bad, Stable")); - Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumerGroupCommand.consumerGroupStatesFromString(" , ,")); + Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumerGroupCommand.groupStatesFromString(" , ,")); } @Test diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java index d5c21eba3fa..abfc25cfb0a 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java @@ -25,7 +25,7 @@ import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.ConsumerGroupState; +import org.apache.kafka.common.GroupState; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.StringDeserializer; @@ -834,10 +834,10 @@ public class ResetConsumerGroupOffsetTest { private void awaitConsumerGroupInactive(ConsumerGroupCommand.ConsumerGroupService service, String group) throws Exception { TestUtils.waitForCondition(() -> { - ConsumerGroupState state = service.collectGroupState(group).state; - return Objects.equals(state, ConsumerGroupState.EMPTY) || Objects.equals(state, ConsumerGroupState.DEAD); + GroupState state = service.collectGroupState(group).groupState; + return Objects.equals(state, GroupState.EMPTY) || Objects.equals(state, GroupState.DEAD); }, "Expected that consumer group is inactive. Actual state: " + - service.collectGroupState(group).state); + service.collectGroupState(group).groupState); } private void resetAndAssertOffsetsCommitted(ClusterInstance cluster, diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java index b45902f86e3..1e293368572 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java @@ -18,18 +18,19 @@ package org.apache.kafka.tools.consumer.group; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.DescribeShareGroupsResult; +import org.apache.kafka.clients.admin.GroupListing; import org.apache.kafka.clients.admin.KafkaAdminClient; +import org.apache.kafka.clients.admin.ListGroupsOptions; +import org.apache.kafka.clients.admin.ListGroupsResult; import org.apache.kafka.clients.admin.ListOffsetsResult; -import org.apache.kafka.clients.admin.ListShareGroupsOptions; -import org.apache.kafka.clients.admin.ListShareGroupsResult; import org.apache.kafka.clients.admin.MemberAssignment; import org.apache.kafka.clients.admin.MemberDescription; import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.clients.admin.ShareGroupDescription; -import org.apache.kafka.clients.admin.ShareGroupListing; +import org.apache.kafka.common.GroupState; +import org.apache.kafka.common.GroupType; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; -import org.apache.kafka.common.ShareGroupState; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.test.TestUtils; import org.apache.kafka.tools.consumer.group.ShareGroupCommand.ShareGroupService; @@ -66,12 +67,12 @@ public class ShareGroupCommandTest { String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--list"}; Admin adminClient = mock(KafkaAdminClient.class); - ListShareGroupsResult result = mock(ListShareGroupsResult.class); + ListGroupsResult result = mock(ListGroupsResult.class); when(result.all()).thenReturn(KafkaFuture.completedFuture(Arrays.asList( - new ShareGroupListing(firstGroup, Optional.of(ShareGroupState.STABLE)), - new ShareGroupListing(secondGroup, Optional.of(ShareGroupState.EMPTY)) + new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)), + new GroupListing(secondGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.EMPTY)) ))); - when(adminClient.listShareGroups(any(ListShareGroupsOptions.class))).thenReturn(result); + when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(result); ShareGroupService service = getShareGroupService(cgcArgs, adminClient); Set expectedGroups = new HashSet<>(Arrays.asList(firstGroup, secondGroup)); @@ -94,7 +95,7 @@ public class ShareGroupCommandTest { Collections.singletonList(new MemberDescription("memid1", "clId1", "host1", new MemberAssignment( Collections.singleton(new TopicPartition("topic1", 0)) ))), - ShareGroupState.STABLE, + GroupState.STABLE, new Node(0, "host1", 9090)); resultMap.put(firstGroup, exp); @@ -133,11 +134,11 @@ public class ShareGroupCommandTest { @Test public void testPrintEmptyGroupState() { - assertFalse(ShareGroupService.maybePrintEmptyGroupState("group", ShareGroupState.EMPTY, 0)); - assertFalse(ShareGroupService.maybePrintEmptyGroupState("group", ShareGroupState.DEAD, 0)); - assertFalse(ShareGroupService.maybePrintEmptyGroupState("group", ShareGroupState.STABLE, 0)); - assertTrue(ShareGroupService.maybePrintEmptyGroupState("group", ShareGroupState.STABLE, 1)); - assertTrue(ShareGroupService.maybePrintEmptyGroupState("group", ShareGroupState.UNKNOWN, 1)); + assertFalse(ShareGroupService.maybePrintEmptyGroupState("group", GroupState.EMPTY, 0)); + assertFalse(ShareGroupService.maybePrintEmptyGroupState("group", GroupState.DEAD, 0)); + assertFalse(ShareGroupService.maybePrintEmptyGroupState("group", GroupState.STABLE, 0)); + assertTrue(ShareGroupService.maybePrintEmptyGroupState("group", GroupState.STABLE, 1)); + assertTrue(ShareGroupService.maybePrintEmptyGroupState("group", GroupState.UNKNOWN, 1)); } @Test @@ -155,59 +156,61 @@ public class ShareGroupCommandTest { String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--list", "--state"}; Admin adminClient = mock(KafkaAdminClient.class); - ListShareGroupsResult resultWithAllStates = mock(ListShareGroupsResult.class); + ListGroupsResult resultWithAllStates = mock(ListGroupsResult.class); when(resultWithAllStates.all()).thenReturn(KafkaFuture.completedFuture(Arrays.asList( - new ShareGroupListing(firstGroup, Optional.of(ShareGroupState.STABLE)), - new ShareGroupListing(secondGroup, Optional.of(ShareGroupState.EMPTY)) + new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)), + new GroupListing(secondGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.EMPTY)) ))); - when(adminClient.listShareGroups(any(ListShareGroupsOptions.class))).thenReturn(resultWithAllStates); + when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(resultWithAllStates); ShareGroupService service = getShareGroupService(cgcArgs, adminClient); - Set expectedListing = new HashSet<>(Arrays.asList( - new ShareGroupListing(firstGroup, Optional.of(ShareGroupState.STABLE)), - new ShareGroupListing(secondGroup, Optional.of(ShareGroupState.EMPTY)))); + Set expectedListing = new HashSet<>(Arrays.asList( + new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)), + new GroupListing(secondGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.EMPTY)))); final Set[] foundListing = new Set[]{Collections.emptySet()}; TestUtils.waitForCondition(() -> { - foundListing[0] = new HashSet<>(service.listShareGroupsWithState(new HashSet<>(Arrays.asList(ShareGroupState.values())))); + foundListing[0] = new HashSet<>(service.listShareGroupsInStates(new HashSet<>(Arrays.asList(GroupState.values())))); return Objects.equals(expectedListing, foundListing[0]); }, "Expected to show groups " + expectedListing + ", but found " + foundListing[0]); - ListShareGroupsResult resultWithStableState = mock(ListShareGroupsResult.class); + ListGroupsResult resultWithStableState = mock(ListGroupsResult.class); when(resultWithStableState.all()).thenReturn(KafkaFuture.completedFuture(Collections.singletonList( - new ShareGroupListing(firstGroup, Optional.of(ShareGroupState.STABLE)) + new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)) ))); - when(adminClient.listShareGroups(any(ListShareGroupsOptions.class))).thenReturn(resultWithStableState); - Set expectedListingStable = Collections.singleton( - new ShareGroupListing(firstGroup, Optional.of(ShareGroupState.STABLE))); + when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(resultWithStableState); + Set expectedListingStable = Collections.singleton( + new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE))); foundListing[0] = Collections.emptySet(); TestUtils.waitForCondition(() -> { - foundListing[0] = new HashSet<>(service.listShareGroupsWithState(Collections.singleton(ShareGroupState.STABLE))); + foundListing[0] = new HashSet<>(service.listShareGroupsInStates(Collections.singleton(GroupState.STABLE))); return Objects.equals(expectedListingStable, foundListing[0]); }, "Expected to show groups " + expectedListingStable + ", but found " + foundListing[0]); service.close(); } @Test - public void testShareGroupStatesFromString() { - Set result = ShareGroupCommand.shareGroupStatesFromString("Stable"); - assertEquals(Collections.singleton(ShareGroupState.STABLE), result); + public void testGroupStatesFromString() { + Set result = ShareGroupCommand.groupStatesFromString("Stable"); + assertEquals(Collections.singleton(GroupState.STABLE), result); - result = ShareGroupCommand.shareGroupStatesFromString("stable"); - assertEquals(new HashSet<>(Collections.singletonList(ShareGroupState.STABLE)), result); + result = ShareGroupCommand.groupStatesFromString("stable"); + assertEquals(new HashSet<>(Collections.singletonList(GroupState.STABLE)), result); - result = ShareGroupCommand.shareGroupStatesFromString("dead"); - assertEquals(new HashSet<>(Collections.singletonList(ShareGroupState.DEAD)), result); + result = ShareGroupCommand.groupStatesFromString("dead"); + assertEquals(new HashSet<>(Collections.singletonList(GroupState.DEAD)), result); - result = ShareGroupCommand.shareGroupStatesFromString("empty"); - assertEquals(new HashSet<>(Collections.singletonList(ShareGroupState.EMPTY)), result); + result = ShareGroupCommand.groupStatesFromString("empty"); + assertEquals(new HashSet<>(Collections.singletonList(GroupState.EMPTY)), result); - assertThrows(IllegalArgumentException.class, () -> ShareGroupCommand.shareGroupStatesFromString("bad, wrong")); + assertThrows(IllegalArgumentException.class, () -> ShareGroupCommand.groupStatesFromString("assigning")); - assertThrows(IllegalArgumentException.class, () -> ShareGroupCommand.shareGroupStatesFromString(" bad, Stable")); + assertThrows(IllegalArgumentException.class, () -> ShareGroupCommand.groupStatesFromString("bad, wrong")); - assertThrows(IllegalArgumentException.class, () -> ShareGroupCommand.shareGroupStatesFromString(" , ,")); + assertThrows(IllegalArgumentException.class, () -> ShareGroupCommand.groupStatesFromString(" bad, Stable")); + + assertThrows(IllegalArgumentException.class, () -> ShareGroupCommand.groupStatesFromString(" , ,")); } ShareGroupService getShareGroupService(String[] args, Admin adminClient) {