KAFKA-17949: Introduce GroupState and replace ShareGroupState (#17763)

This PR introduces the unified GroupState enum for all group types from KIP-1043. This PR also removes ShareGroupState and begins the work to replace Admin.listShareGroups with Admin.listGroups. That will complete in a future PR.

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
This commit is contained in:
Andrew Schofield 2024-11-19 15:47:12 +00:00 committed by GitHub
parent a211ee99b5
commit 32c887b05e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
32 changed files with 993 additions and 469 deletions

View File

@ -18,6 +18,7 @@
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.acl.AclOperation;
@ -38,10 +39,14 @@ public class ConsumerGroupDescription {
private final Collection<MemberDescription> members;
private final String partitionAssignor;
private final GroupType type;
private final ConsumerGroupState state;
private final GroupState groupState;
private final Node coordinator;
private final Set<AclOperation> authorizedOperations;
/**
* @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, boolean, Collection, String, GroupState, Node)}.
*/
@Deprecated
public ConsumerGroupDescription(String groupId,
boolean isSimpleConsumerGroup,
Collection<MemberDescription> members,
@ -51,6 +56,10 @@ public class ConsumerGroupDescription {
this(groupId, isSimpleConsumerGroup, members, partitionAssignor, state, coordinator, Collections.emptySet());
}
/**
* @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, boolean, Collection, String, GroupState, Node, Set)}.
*/
@Deprecated
public ConsumerGroupDescription(String groupId,
boolean isSimpleConsumerGroup,
Collection<MemberDescription> members,
@ -61,6 +70,10 @@ public class ConsumerGroupDescription {
this(groupId, isSimpleConsumerGroup, members, partitionAssignor, GroupType.CLASSIC, state, coordinator, authorizedOperations);
}
/**
* @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, boolean, Collection, String, GroupType, GroupState, Node, Set)}.
*/
@Deprecated
public ConsumerGroupDescription(String groupId,
boolean isSimpleConsumerGroup,
Collection<MemberDescription> members,
@ -75,7 +88,45 @@ public class ConsumerGroupDescription {
Collections.unmodifiableList(new ArrayList<>(members));
this.partitionAssignor = partitionAssignor == null ? "" : partitionAssignor;
this.type = type;
this.state = state;
this.groupState = GroupState.parse(state.name());
this.coordinator = coordinator;
this.authorizedOperations = authorizedOperations;
}
public ConsumerGroupDescription(String groupId,
boolean isSimpleConsumerGroup,
Collection<MemberDescription> members,
String partitionAssignor,
GroupState groupState,
Node coordinator) {
this(groupId, isSimpleConsumerGroup, members, partitionAssignor, groupState, coordinator, Collections.emptySet());
}
public ConsumerGroupDescription(String groupId,
boolean isSimpleConsumerGroup,
Collection<MemberDescription> members,
String partitionAssignor,
GroupState groupState,
Node coordinator,
Set<AclOperation> authorizedOperations) {
this(groupId, isSimpleConsumerGroup, members, partitionAssignor, GroupType.CLASSIC, groupState, coordinator, authorizedOperations);
}
public ConsumerGroupDescription(String groupId,
boolean isSimpleConsumerGroup,
Collection<MemberDescription> members,
String partitionAssignor,
GroupType type,
GroupState groupState,
Node coordinator,
Set<AclOperation> authorizedOperations) {
this.groupId = groupId == null ? "" : groupId;
this.isSimpleConsumerGroup = isSimpleConsumerGroup;
this.members = members == null ? Collections.emptyList() :
Collections.unmodifiableList(new ArrayList<>(members));
this.partitionAssignor = partitionAssignor == null ? "" : partitionAssignor;
this.type = type;
this.groupState = groupState;
this.coordinator = coordinator;
this.authorizedOperations = authorizedOperations;
}
@ -90,14 +141,14 @@ public class ConsumerGroupDescription {
Objects.equals(members, that.members) &&
Objects.equals(partitionAssignor, that.partitionAssignor) &&
type == that.type &&
state == that.state &&
groupState == that.groupState &&
Objects.equals(coordinator, that.coordinator) &&
Objects.equals(authorizedOperations, that.authorizedOperations);
}
@Override
public int hashCode() {
return Objects.hash(groupId, isSimpleConsumerGroup, members, partitionAssignor, type, state, coordinator, authorizedOperations);
return Objects.hash(groupId, isSimpleConsumerGroup, members, partitionAssignor, type, groupState, coordinator, authorizedOperations);
}
/**
@ -138,9 +189,18 @@ public class ConsumerGroupDescription {
/**
* The consumer group state, or UNKNOWN if the state is too new for us to parse.
* @deprecated Since 4.0. Use {@link #groupState()} instead.
*/
@Deprecated
public ConsumerGroupState state() {
return state;
return ConsumerGroupState.parse(groupState.name());
}
/**
* The group state, or UNKNOWN if the state is too new for us to parse.
*/
public GroupState groupState() {
return groupState;
}
/**
@ -164,7 +224,7 @@ public class ConsumerGroupDescription {
", members=" + members.stream().map(MemberDescription::toString).collect(Collectors.joining(",")) +
", partitionAssignor=" + partitionAssignor +
", type=" + type +
", state=" + state +
", groupState=" + groupState +
", coordinator=" + coordinator +
", authorizedOperations=" + authorizedOperations +
")";

View File

@ -18,6 +18,7 @@
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import java.util.Objects;
@ -29,28 +30,30 @@ import java.util.Optional;
public class ConsumerGroupListing {
private final String groupId;
private final boolean isSimpleConsumerGroup;
private final Optional<ConsumerGroupState> state;
private final Optional<GroupState> groupState;
private final Optional<GroupType> type;
/**
* Create an instance with the specified parameters.
*
* @param groupId Group Id
* @param groupId Group Id.
* @param isSimpleConsumerGroup If consumer group is simple or not.
*/
public ConsumerGroupListing(String groupId, boolean isSimpleConsumerGroup) {
this(groupId, isSimpleConsumerGroup, Optional.empty(), Optional.empty());
this(groupId, Optional.empty(), Optional.empty(), isSimpleConsumerGroup);
}
/**
* Create an instance with the specified parameters.
*
* @param groupId Group Id
* @param groupId Group Id.
* @param isSimpleConsumerGroup If consumer group is simple or not.
* @param state The state of the consumer group
* @param state The state of the consumer group.
* @deprecated Since 4.0. Use {@link #ConsumerGroupListing(String, Optional, boolean)}.
*/
@Deprecated
public ConsumerGroupListing(String groupId, boolean isSimpleConsumerGroup, Optional<ConsumerGroupState> state) {
this(groupId, isSimpleConsumerGroup, state, Optional.empty());
this(groupId, Objects.requireNonNull(state).map(state0 -> GroupState.parse(state0.name())), Optional.empty(), isSimpleConsumerGroup);
}
/**
@ -60,17 +63,51 @@ public class ConsumerGroupListing {
* @param isSimpleConsumerGroup If consumer group is simple or not.
* @param state The state of the consumer group.
* @param type The type of the consumer group.
* @deprecated Since 4.0. Use {@link #ConsumerGroupListing(String, Optional, Optional, boolean)}.
*/
@Deprecated
public ConsumerGroupListing(
String groupId,
boolean isSimpleConsumerGroup,
Optional<ConsumerGroupState> state,
Optional<GroupType> type
) {
this(groupId, Objects.requireNonNull(state).map(state0 -> GroupState.parse(state0.name())), type, isSimpleConsumerGroup);
}
/**
* Create an instance with the specified parameters.
*
* @param groupId Group Id.
* @param groupState The state of the consumer group.
* @param isSimpleConsumerGroup If consumer group is simple or not.
*/
public ConsumerGroupListing(
String groupId,
Optional<GroupState> groupState,
boolean isSimpleConsumerGroup
) {
this(groupId, groupState, Optional.empty(), isSimpleConsumerGroup);
}
/**
* Create an instance with the specified parameters.
*
* @param groupId Group Id.
* @param groupState The state of the consumer group.
* @param type The type of the consumer group.
* @param isSimpleConsumerGroup If consumer group is simple or not.
*/
public ConsumerGroupListing(
String groupId,
Optional<GroupState> groupState,
Optional<GroupType> type,
boolean isSimpleConsumerGroup
) {
this.groupId = groupId;
this.isSimpleConsumerGroup = isSimpleConsumerGroup;
this.state = Objects.requireNonNull(state);
this.groupState = Objects.requireNonNull(groupState);
this.type = Objects.requireNonNull(type);
this.isSimpleConsumerGroup = isSimpleConsumerGroup;
}
/**
@ -88,10 +125,19 @@ public class ConsumerGroupListing {
}
/**
* Consumer Group state
* Group state
*/
public Optional<GroupState> groupState() {
return groupState;
}
/**
* Consumer Group state
* @deprecated Since 4.0. Use {@link #groupState()}.
*/
@Deprecated
public Optional<ConsumerGroupState> state() {
return state;
return groupState.map(state0 -> ConsumerGroupState.parse(state0.name()));
}
/**
@ -108,14 +154,14 @@ public class ConsumerGroupListing {
return "(" +
"groupId='" + groupId + '\'' +
", isSimpleConsumerGroup=" + isSimpleConsumerGroup +
", state=" + state +
", groupState=" + groupState +
", type=" + type +
')';
}
@Override
public int hashCode() {
return Objects.hash(groupId, isSimpleConsumerGroup(), state, type);
return Objects.hash(groupId, isSimpleConsumerGroup(), groupState, type);
}
@Override
@ -125,7 +171,7 @@ public class ConsumerGroupListing {
ConsumerGroupListing that = (ConsumerGroupListing) o;
return isSimpleConsumerGroup() == that.isSimpleConsumerGroup() &&
Objects.equals(groupId, that.groupId) &&
Objects.equals(state, that.state) &&
Objects.equals(groupState, that.groupState) &&
Objects.equals(type, that.type);
}
}

View File

@ -17,7 +17,9 @@
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Objects;
import java.util.Optional;
@ -25,10 +27,12 @@ import java.util.Optional;
/**
* A listing of a group in the cluster.
*/
@InterfaceStability.Evolving
public class GroupListing {
private final String groupId;
private final Optional<GroupType> type;
private final String protocol;
private final Optional<GroupState> groupState;
/**
* Create an instance with the specified parameters.
@ -36,11 +40,13 @@ public class GroupListing {
* @param groupId Group Id
* @param type Group type
* @param protocol Protocol
* @param groupState Group state
*/
public GroupListing(String groupId, Optional<GroupType> type, String protocol) {
public GroupListing(String groupId, Optional<GroupType> type, String protocol, Optional<GroupState> groupState) {
this.groupId = groupId;
this.type = Objects.requireNonNull(type);
this.protocol = protocol;
this.groupState = groupState;
}
/**
@ -75,6 +81,19 @@ public class GroupListing {
return protocol;
}
/**
* The group state.
* <p>
* If the broker returns a group state which is not recognised, as might
* happen when talking to a broker with a later version, the state will be
* <code>Optional.of(GroupState.UNKNOWN)</code>.
*
* @return An Optional containing the state, if available.
*/
public Optional<GroupState> groupState() {
return groupState;
}
/**
* If the group is a simple consumer group or not.
*/
@ -88,12 +107,13 @@ public class GroupListing {
"groupId='" + groupId + '\'' +
", type=" + type.map(GroupType::toString).orElse("none") +
", protocol='" + protocol + '\'' +
", groupState=" + groupState.map(GroupState::toString).orElse("none") +
')';
}
@Override
public int hashCode() {
return Objects.hash(groupId, type, protocol);
return Objects.hash(groupId, type, protocol, groupState);
}
@Override
@ -103,6 +123,7 @@ public class GroupListing {
GroupListing that = (GroupListing) o;
return Objects.equals(groupId, that.groupId) &&
Objects.equals(type, that.type) &&
Objects.equals(protocol, that.protocol);
Objects.equals(protocol, that.protocol) &&
Objects.equals(groupState, that.groupState);
}
}

View File

@ -63,8 +63,8 @@ import org.apache.kafka.clients.admin.internals.RemoveMembersFromConsumerGroupHa
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
@ -72,7 +72,6 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.ShareGroupState;
import org.apache.kafka.common.TopicCollection;
import org.apache.kafka.common.TopicCollection.TopicIdCollection;
import org.apache.kafka.common.TopicCollection.TopicNameCollection;
@ -3596,8 +3595,13 @@ public class KafkaAdminClient extends AdminClient {
.stream()
.map(GroupType::toString)
.collect(Collectors.toList());
List<String> groupStates = options.groupStates()
.stream()
.map(GroupState::toString)
.collect(Collectors.toList());
return new ListGroupsRequest.Builder(new ListGroupsRequestData()
.setTypesFilter(groupTypes)
.setStatesFilter(groupStates)
);
}
@ -3610,10 +3614,17 @@ public class KafkaAdminClient extends AdminClient {
type = Optional.of(GroupType.parse(group.groupType()));
}
final String protocolType = group.protocolType();
final Optional<GroupState> groupState;
if (group.groupState() == null || group.groupState().isEmpty()) {
groupState = Optional.empty();
} else {
groupState = Optional.of(GroupState.parse(group.groupState()));
}
final GroupListing groupListing = new GroupListing(
groupId,
type,
protocolType
protocolType,
groupState
);
results.addListing(groupListing);
}
@ -3738,9 +3749,9 @@ public class KafkaAdminClient extends AdminClient {
runnable.call(new Call("listConsumerGroups", deadline, new ConstantNodeIdProvider(node.id())) {
@Override
ListGroupsRequest.Builder createRequest(int timeoutMs) {
List<String> states = options.states()
List<String> states = options.groupStates()
.stream()
.map(ConsumerGroupState::toString)
.map(GroupState::toString)
.collect(Collectors.toList());
List<String> groupTypes = options.types()
.stream()
@ -3756,17 +3767,17 @@ public class KafkaAdminClient extends AdminClient {
String protocolType = group.protocolType();
if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) {
final String groupId = group.groupId();
final Optional<ConsumerGroupState> state = group.groupState().isEmpty()
final Optional<GroupState> groupState = group.groupState().isEmpty()
? Optional.empty()
: Optional.of(ConsumerGroupState.parse(group.groupState()));
: Optional.of(GroupState.parse(group.groupState()));
final Optional<GroupType> type = group.groupType().isEmpty()
? Optional.empty()
: Optional.of(GroupType.parse(group.groupType()));
final ConsumerGroupListing groupListing = new ConsumerGroupListing(
groupId,
protocolType.isEmpty(),
state,
type
groupState,
type,
protocolType.isEmpty()
);
results.addListing(groupListing);
}
@ -3927,7 +3938,7 @@ public class KafkaAdminClient extends AdminClient {
ListGroupsRequest.Builder createRequest(int timeoutMs) {
List<String> states = options.states()
.stream()
.map(ShareGroupState::toString)
.map(GroupState::toString)
.collect(Collectors.toList());
List<String> types = Collections.singletonList(GroupType.SHARE.toString());
return new ListGroupsRequest.Builder(new ListGroupsRequestData()
@ -3938,9 +3949,9 @@ public class KafkaAdminClient extends AdminClient {
private void maybeAddShareGroup(ListGroupsResponseData.ListedGroup group) {
final String groupId = group.groupId();
final Optional<ShareGroupState> state = group.groupState().isEmpty()
final Optional<GroupState> state = group.groupState().isEmpty()
? Optional.empty()
: Optional.of(ShareGroupState.parse(group.groupState()));
: Optional.of(GroupState.parse(group.groupState()));
final ShareGroupListing groupListing = new ShareGroupListing(groupId, state);
results.addListing(groupListing);
}

View File

@ -18,12 +18,14 @@
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Options for {@link Admin#listConsumerGroups()}.
@ -33,17 +35,30 @@ import java.util.Set;
@InterfaceStability.Evolving
public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {
private Set<ConsumerGroupState> states = Collections.emptySet();
private Set<GroupState> groupStates = Collections.emptySet();
private Set<GroupType> types = Collections.emptySet();
/**
* If groupStates is set, only groups in these states will be returned by listGroups().
* Otherwise, all groups are returned.
* This operation is supported by brokers with version 2.6.0 or later.
*/
public ListConsumerGroupsOptions inGroupStates(Set<GroupState> groupStates) {
this.groupStates = (groupStates == null || groupStates.isEmpty()) ? Collections.emptySet() : Set.copyOf(groupStates);
return this;
}
/**
* If states is set, only groups in these states will be returned by listConsumerGroups().
* Otherwise, all groups are returned.
* This operation is supported by brokers with version 2.6.0 or later.
* @deprecated Since 4.0. Use {@link #inGroupStates(Set)}.
*/
@Deprecated
public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) {
this.states = (states == null || states.isEmpty()) ? Collections.emptySet() : new HashSet<>(states);
this.groupStates = (states == null || states.isEmpty())
? Collections.emptySet()
: states.stream().map(state -> GroupState.parse(state.name())).collect(Collectors.toSet());
return this;
}
@ -57,10 +72,19 @@ public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroup
}
/**
* Returns the list of States that are requested or empty if no states have been specified.
* Returns the list of group states that are requested or empty if no states have been specified.
*/
public Set<GroupState> groupStates() {
return groupStates;
}
/**
* Returns the list of States that are requested or empty if no states have been specified.
* @deprecated Since 4.0. Use {@link #inGroupStates(Set)}.
*/
@Deprecated
public Set<ConsumerGroupState> states() {
return states;
return groupStates.stream().map(groupState -> ConsumerGroupState.parse(groupState.name())).collect(Collectors.toSet());
}
/**

View File

@ -17,6 +17,7 @@
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.annotation.InterfaceStability;
@ -31,8 +32,19 @@ import java.util.Set;
@InterfaceStability.Evolving
public class ListGroupsOptions extends AbstractOptions<ListGroupsOptions> {
private Set<GroupState> groupStates = Collections.emptySet();
private Set<GroupType> types = Collections.emptySet();
/**
* If groupStates is set, only groups in these states will be returned by listGroups().
* Otherwise, all groups are returned.
* This operation is supported by brokers with version 2.6.0 or later.
*/
public ListGroupsOptions inGroupStates(Set<GroupState> groupStates) {
this.groupStates = (groupStates == null || groupStates.isEmpty()) ? Collections.emptySet() : Set.copyOf(groupStates);
return this;
}
/**
* If types is set, only groups of these types will be returned by listGroups().
* Otherwise, all groups are returned.
@ -42,6 +54,13 @@ public class ListGroupsOptions extends AbstractOptions<ListGroupsOptions> {
return this;
}
/**
* Returns the list of group states that are requested or empty if no states have been specified.
*/
public Set<GroupState> groupStates() {
return groupStates;
}
/**
* Returns the list of group types that are requested or empty if no types have been specified.
*/

View File

@ -17,7 +17,7 @@
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.ShareGroupState;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collections;
@ -32,12 +32,12 @@ import java.util.Set;
@InterfaceStability.Evolving
public class ListShareGroupsOptions extends AbstractOptions<ListShareGroupsOptions> {
private Set<ShareGroupState> states = Collections.emptySet();
private Set<GroupState> states = Collections.emptySet();
/**
* If states is set, only groups in these states will be returned. Otherwise, all groups are returned.
*/
public ListShareGroupsOptions inStates(Set<ShareGroupState> states) {
public ListShareGroupsOptions inStates(Set<GroupState> states) {
this.states = (states == null) ? Collections.emptySet() : new HashSet<>(states);
return this;
}
@ -45,7 +45,7 @@ public class ListShareGroupsOptions extends AbstractOptions<ListShareGroupsOptio
/**
* Return the list of States that are requested or empty if no states have been specified.
*/
public Set<ShareGroupState> states() {
public Set<GroupState> states() {
return states;
}
}

View File

@ -17,8 +17,8 @@
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.ShareGroupState;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.annotation.InterfaceStability;
@ -36,26 +36,26 @@ import java.util.stream.Collectors;
public class ShareGroupDescription {
private final String groupId;
private final Collection<MemberDescription> members;
private final ShareGroupState state;
private final GroupState groupState;
private final Node coordinator;
private final Set<AclOperation> authorizedOperations;
public ShareGroupDescription(String groupId,
Collection<MemberDescription> members,
ShareGroupState state,
GroupState groupState,
Node coordinator) {
this(groupId, members, state, coordinator, Collections.emptySet());
this(groupId, members, groupState, coordinator, Collections.emptySet());
}
public ShareGroupDescription(String groupId,
Collection<MemberDescription> members,
ShareGroupState state,
GroupState groupState,
Node coordinator,
Set<AclOperation> authorizedOperations) {
this.groupId = groupId == null ? "" : groupId;
this.members = members == null ? Collections.emptyList() :
Collections.unmodifiableList(new ArrayList<>(members));
this.state = state;
this.groupState = groupState;
this.coordinator = coordinator;
this.authorizedOperations = authorizedOperations;
}
@ -67,14 +67,14 @@ public class ShareGroupDescription {
final ShareGroupDescription that = (ShareGroupDescription) o;
return Objects.equals(groupId, that.groupId) &&
Objects.equals(members, that.members) &&
state == that.state &&
groupState == that.groupState &&
Objects.equals(coordinator, that.coordinator) &&
Objects.equals(authorizedOperations, that.authorizedOperations);
}
@Override
public int hashCode() {
return Objects.hash(groupId, members, state, coordinator, authorizedOperations);
return Objects.hash(groupId, members, groupState, coordinator, authorizedOperations);
}
/**
@ -92,10 +92,10 @@ public class ShareGroupDescription {
}
/**
* The share group state, or UNKNOWN if the state is too new for us to parse.
* The group state, or UNKNOWN if the state is too new for us to parse.
*/
public ShareGroupState state() {
return state;
public GroupState groupState() {
return groupState;
}
/**
@ -116,7 +116,7 @@ public class ShareGroupDescription {
public String toString() {
return "(groupId=" + groupId +
", members=" + members.stream().map(MemberDescription::toString).collect(Collectors.joining(",")) +
", state=" + state +
", groupState=" + groupState +
", coordinator=" + coordinator +
", authorizedOperations=" + authorizedOperations +
")";

View File

@ -17,7 +17,7 @@
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.ShareGroupState;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Objects;
@ -31,7 +31,7 @@ import java.util.Optional;
@InterfaceStability.Evolving
public class ShareGroupListing {
private final String groupId;
private final Optional<ShareGroupState> state;
private final Optional<GroupState> groupState;
/**
* Create an instance with the specified parameters.
@ -46,11 +46,11 @@ public class ShareGroupListing {
* Create an instance with the specified parameters.
*
* @param groupId Group Id
* @param state The state of the share group
* @param groupState The state of the share group
*/
public ShareGroupListing(String groupId, Optional<ShareGroupState> state) {
public ShareGroupListing(String groupId, Optional<GroupState> groupState) {
this.groupId = groupId;
this.state = Objects.requireNonNull(state);
this.groupState = Objects.requireNonNull(groupState);
}
/**
@ -63,21 +63,21 @@ public class ShareGroupListing {
/**
* The share group state.
*/
public Optional<ShareGroupState> state() {
return state;
public Optional<GroupState> groupState() {
return groupState;
}
@Override
public String toString() {
return "(" +
"groupId='" + groupId + '\'' +
", state=" + state +
", groupState=" + groupState +
')';
}
@Override
public int hashCode() {
return Objects.hash(groupId, state);
return Objects.hash(groupId, groupState);
}
@Override
@ -86,6 +86,6 @@ public class ShareGroupListing {
if (!(o instanceof ShareGroupListing)) return false;
ShareGroupListing that = (ShareGroupListing) o;
return Objects.equals(groupId, that.groupId) &&
Objects.equals(state, that.state);
Objects.equals(groupState, that.groupState);
}
}

View File

@ -21,7 +21,7 @@ import org.apache.kafka.clients.admin.MemberAssignment;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
@ -231,7 +231,7 @@ public class DescribeConsumerGroupsHandler implements AdminApiHandler<Coordinato
memberDescriptions,
describedGroup.assignorName(),
GroupType.CONSUMER,
ConsumerGroupState.parse(describedGroup.groupState()),
GroupState.parse(describedGroup.groupState()),
coordinator,
authorizedOperations
);
@ -286,7 +286,7 @@ public class DescribeConsumerGroupsHandler implements AdminApiHandler<Coordinato
memberDescriptions,
describedGroup.protocolData(),
GroupType.CLASSIC,
ConsumerGroupState.parse(describedGroup.groupState()),
GroupState.parse(describedGroup.groupState()),
coordinator,
authorizedOperations);
completed.put(groupIdKey, consumerGroupDescription);

View File

@ -19,8 +19,8 @@ package org.apache.kafka.clients.admin.internals;
import org.apache.kafka.clients.admin.MemberAssignment;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.admin.ShareGroupDescription;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.ShareGroupState;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.message.ShareGroupDescribeRequestData;
@ -129,7 +129,7 @@ public class DescribeShareGroupsHandler extends AdminApiHandler.Batched<Coordina
final ShareGroupDescription shareGroupDescription =
new ShareGroupDescription(groupIdKey.idValue,
memberDescriptions,
ShareGroupState.parse(describedGroup.groupState()),
GroupState.parse(describedGroup.groupState()),
coordinator,
authorizedOperations);
completed.put(groupIdKey, shareGroupDescription);
@ -184,7 +184,7 @@ public class DescribeShareGroupsHandler extends AdminApiHandler.Batched<Coordina
final ShareGroupDescription shareGroupDescription =
new ShareGroupDescription(groupId.idValue,
Collections.emptySet(),
ShareGroupState.DEAD,
GroupState.DEAD,
coordinator,
validAclOperations(describedGroup.authorizedOperations()));
completed.put(groupId, shareGroupDescription);

View File

@ -25,7 +25,9 @@ import java.util.stream.Collectors;
/**
* The consumer group state.
* @deprecated Since 4.0. Use {@link GroupState} instead.
*/
@Deprecated
public enum ConsumerGroupState {
UNKNOWN("Unknown"),
PREPARING_REBALANCE("PreparingRebalance"),

View File

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* The group state.
* <p>
* The following table shows the correspondence between the group states and types.
* <table>
* <thead>
* <tr><th>State</th><th>Classic group</th><th>Consumer group</th><th>Share group</th></tr>
* </thead>
* <tbody>
* <tr><td>UNKNOWN</td><td>Yes</td><td>Yes</td><td>Yes</td></tr>
* <tr><td>PREPARING_REBALANCE</td><td>Yes</td><td>Yes</td><td></td></tr>
* <tr><td>COMPLETING_REBALANCE</td><td>Yes</td><td>Yes</td><td></td></tr>
* <tr><td>STABLE</td><td>Yes</td><td>Yes</td><td>Yes</td></tr>
* <tr><td>DEAD</td><td>Yes</td><td>Yes</td><td>Yes</td></tr>
* <tr><td>EMPTY</td><td>Yes</td><td>Yes</td><td>Yes</td></tr>
* <tr><td>ASSIGNING</td><td></td><td>Yes</td><td></td></tr>
* <tr><td>RECONCILING</td><td></td><td>Yes</td><td></td></tr>
* </tbody>
* </table>
*/
@InterfaceStability.Evolving
public enum GroupState {
UNKNOWN("Unknown"),
PREPARING_REBALANCE("PreparingRebalance"),
COMPLETING_REBALANCE("CompletingRebalance"),
STABLE("Stable"),
DEAD("Dead"),
EMPTY("Empty"),
ASSIGNING("Assigning"),
RECONCILING("Reconciling");
private static final Map<String, GroupState> NAME_TO_ENUM = Arrays.stream(values())
.collect(Collectors.toMap(state -> state.name.toUpperCase(Locale.ROOT), Function.identity()));
private final String name;
GroupState(String name) {
this.name = name;
}
/**
* Case-insensitive group state lookup by string name.
*/
public static GroupState parse(String name) {
GroupState state = NAME_TO_ENUM.get(name.toUpperCase(Locale.ROOT));
return state == null ? UNKNOWN : state;
}
public static Set<GroupState> groupStatesForType(GroupType type) {
if (type == GroupType.CLASSIC) {
return Set.of(PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD, EMPTY);
} else if (type == GroupType.CONSUMER) {
return Set.of(PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD, EMPTY, ASSIGNING, RECONCILING);
} else if (type == GroupType.SHARE) {
return Set.of(STABLE, DEAD, EMPTY);
} else {
throw new IllegalArgumentException("Group type not known");
}
}
@Override
public String toString() {
return name;
}
}

View File

@ -1,56 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common;
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* The share group state.
*/
public enum ShareGroupState {
UNKNOWN("Unknown"),
STABLE("Stable"),
DEAD("Dead"),
EMPTY("Empty");
private static final Map<String, ShareGroupState> NAME_TO_ENUM = Arrays.stream(values())
.collect(Collectors.toMap(state -> state.name.toUpperCase(Locale.ROOT), Function.identity()));
private final String name;
ShareGroupState(String name) {
this.name = name;
}
/**
* Case-insensitive share group state lookup by string name.
*/
public static ShareGroupState parse(String name) {
ShareGroupState state = NAME_TO_ENUM.get(name.toUpperCase(Locale.ROOT));
return state == null ? UNKNOWN : state;
}
@Override
public String toString() {
return name;
}
}

View File

@ -18,6 +18,7 @@
package org.apache.kafka.clients.admin;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.junit.jupiter.api.Test;
@ -33,16 +34,16 @@ public class GroupListingTest {
@Test
public void testSimpleConsumerGroup() {
GroupListing gl = new GroupListing(GROUP_ID, Optional.of(GroupType.CLASSIC), "");
GroupListing gl = new GroupListing(GROUP_ID, Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.EMPTY));
assertTrue(gl.isSimpleConsumerGroup());
gl = new GroupListing(GROUP_ID, Optional.of(GroupType.CLASSIC), ConsumerProtocol.PROTOCOL_TYPE);
gl = new GroupListing(GROUP_ID, Optional.of(GroupType.CLASSIC), ConsumerProtocol.PROTOCOL_TYPE, Optional.of(GroupState.STABLE));
assertFalse(gl.isSimpleConsumerGroup());
gl = new GroupListing(GROUP_ID, Optional.of(GroupType.CONSUMER), "");
gl = new GroupListing(GROUP_ID, Optional.of(GroupType.CONSUMER), "", Optional.of(GroupState.EMPTY));
assertFalse(gl.isSimpleConsumerGroup());
gl = new GroupListing(GROUP_ID, Optional.empty(), "");
gl = new GroupListing(GROUP_ID, Optional.empty(), "", Optional.empty());
assertFalse(gl.isSimpleConsumerGroup());
}
}

View File

@ -30,14 +30,13 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.ClassicGroupState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.ShareGroupState;
import org.apache.kafka.common.TopicCollection;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionReplica;
@ -3134,8 +3133,8 @@ public class KafkaAdminClientTest {
assertEquals(2, listings.size());
List<GroupListing> expected = new ArrayList<>();
expected.add(new GroupListing("group-2", Optional.of(GroupType.CLASSIC), ""));
expected.add(new GroupListing("group-1", Optional.of(GroupType.CONSUMER), ConsumerProtocol.PROTOCOL_TYPE));
expected.add(new GroupListing("group-2", Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.EMPTY)));
expected.add(new GroupListing("group-1", Optional.of(GroupType.CONSUMER), ConsumerProtocol.PROTOCOL_TYPE, Optional.of(GroupState.STABLE)));
assertEquals(expected, listings);
assertEquals(0, result.errors().get().size());
}
@ -3163,7 +3162,7 @@ public class KafkaAdminClientTest {
assertEquals(1, listings.size());
List<GroupListing> expected = new ArrayList<>();
expected.add(new GroupListing("group-1", Optional.empty(), "any"));
expected.add(new GroupListing("group-1", Optional.empty(), "any", Optional.empty()));
assertEquals(expected, listings);
assertEquals(0, result.errors().get().size());
}
@ -3199,8 +3198,8 @@ public class KafkaAdminClientTest {
assertEquals(2, listing.size());
List<GroupListing> expected = new ArrayList<>();
expected.add(new GroupListing("group-2", Optional.of(GroupType.CONSUMER), ""));
expected.add(new GroupListing("group-1", Optional.of(GroupType.CONSUMER), ConsumerProtocol.PROTOCOL_TYPE));
expected.add(new GroupListing("group-2", Optional.of(GroupType.CONSUMER), "", Optional.of(GroupState.EMPTY)));
expected.add(new GroupListing("group-1", Optional.of(GroupType.CONSUMER), ConsumerProtocol.PROTOCOL_TYPE, Optional.of(GroupState.STABLE)));
assertEquals(expected, listing);
assertEquals(0, result.errors().get().size());
}
@ -3387,8 +3386,8 @@ public class KafkaAdminClientTest {
assertEquals(2, listings.size());
List<ConsumerGroupListing> expected = new ArrayList<>();
expected.add(new ConsumerGroupListing("group-2", true, Optional.of(ConsumerGroupState.EMPTY)));
expected.add(new ConsumerGroupListing("group-1", false, Optional.of(ConsumerGroupState.STABLE)));
expected.add(new ConsumerGroupListing("group-2", Optional.of(GroupState.EMPTY), true));
expected.add(new ConsumerGroupListing("group-1", Optional.of(GroupState.STABLE), false));
assertEquals(expected, listings);
assertEquals(0, result.errors().get().size());
}
@ -3403,7 +3402,7 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
env.kafkaClient().prepareResponseFrom(
expectListGroupsRequestWithFilters(singleton(ConsumerGroupState.STABLE.toString()), Collections.emptySet()),
expectListGroupsRequestWithFilters(singleton(GroupState.STABLE.toString()), Collections.emptySet()),
new ListGroupsResponse(new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(singletonList(
@ -3414,13 +3413,13 @@ public class KafkaAdminClientTest {
.setGroupType(GroupType.CLASSIC.toString())))),
env.cluster().nodeById(0));
final ListConsumerGroupsOptions options = new ListConsumerGroupsOptions().inStates(singleton(ConsumerGroupState.STABLE));
final ListConsumerGroupsOptions options = new ListConsumerGroupsOptions().inGroupStates(singleton(GroupState.STABLE));
final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(options);
Collection<ConsumerGroupListing> listings = result.valid().get();
assertEquals(1, listings.size());
List<ConsumerGroupListing> expected = new ArrayList<>();
expected.add(new ConsumerGroupListing("group-1", false, Optional.of(ConsumerGroupState.STABLE), Optional.of(GroupType.CLASSIC)));
expected.add(new ConsumerGroupListing("group-1", Optional.of(GroupState.STABLE), Optional.of(GroupType.CLASSIC), false));
assertEquals(expected, listings);
assertEquals(0, result.errors().get().size());
@ -3449,8 +3448,8 @@ public class KafkaAdminClientTest {
assertEquals(2, listings2.size());
List<ConsumerGroupListing> expected2 = new ArrayList<>();
expected2.add(new ConsumerGroupListing("group-2", true, Optional.of(ConsumerGroupState.EMPTY), Optional.of(GroupType.CONSUMER)));
expected2.add(new ConsumerGroupListing("group-1", false, Optional.of(ConsumerGroupState.STABLE), Optional.of(GroupType.CONSUMER)));
expected2.add(new ConsumerGroupListing("group-2", Optional.of(GroupState.EMPTY), Optional.of(GroupType.CONSUMER), true));
expected2.add(new ConsumerGroupListing("group-1", Optional.of(GroupState.STABLE), Optional.of(GroupType.CONSUMER), false));
assertEquals(expected2, listings2);
assertEquals(0, result.errors().get().size());
}
@ -3488,7 +3487,7 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareUnsupportedVersionResponse(
body -> body instanceof ListGroupsRequest);
options = new ListConsumerGroupsOptions().inStates(singleton(ConsumerGroupState.STABLE));
options = new ListConsumerGroupsOptions().inGroupStates(singleton(GroupState.STABLE));
result = env.adminClient().listConsumerGroups(options);
TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class);
}
@ -3507,23 +3506,23 @@ public class KafkaAdminClientTest {
// Check if we can list groups with older broker if we specify states and don't specify types.
env.kafkaClient().prepareResponseFrom(
expectListGroupsRequestWithFilters(singleton(ConsumerGroupState.STABLE.toString()), Collections.emptySet()),
expectListGroupsRequestWithFilters(singleton(GroupState.STABLE.toString()), Collections.emptySet()),
new ListGroupsResponse(new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(Collections.singletonList(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-1")
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setGroupState(ConsumerGroupState.STABLE.toString())))),
.setGroupState(GroupState.STABLE.toString())))),
env.cluster().nodeById(0));
ListConsumerGroupsOptions options = new ListConsumerGroupsOptions().inStates(singleton(ConsumerGroupState.STABLE));
ListConsumerGroupsOptions options = new ListConsumerGroupsOptions().inGroupStates(singleton(GroupState.STABLE));
ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(options);
Collection<ConsumerGroupListing> listing = result.all().get();
assertEquals(1, listing.size());
List<ConsumerGroupListing> expected = Collections.singletonList(
new ConsumerGroupListing("group-1", false, Optional.of(ConsumerGroupState.STABLE))
new ConsumerGroupListing("group-1", Optional.of(GroupState.STABLE), false)
);
assertEquals(expected, listing);
@ -4109,7 +4108,7 @@ public class KafkaAdminClientTest {
),
"range",
GroupType.CONSUMER,
ConsumerGroupState.STABLE,
GroupState.STABLE,
env.cluster().controller(),
Collections.emptySet()
));
@ -4129,7 +4128,7 @@ public class KafkaAdminClientTest {
),
"range",
GroupType.CLASSIC,
ConsumerGroupState.STABLE,
GroupState.STABLE,
env.cluster().controller(),
Collections.emptySet()
));
@ -5029,7 +5028,7 @@ public class KafkaAdminClientTest {
ShareGroupDescribeResponseData group0Data = new ShareGroupDescribeResponseData();
group0Data.groups().add(new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId(GROUP_ID)
.setGroupState(ShareGroupState.STABLE.toString())
.setGroupState(GroupState.STABLE.toString())
.setMembers(asList(memberOne, memberTwo)));
final List<TopicPartition> expectedTopicPartitions = new ArrayList<>();
@ -5044,7 +5043,7 @@ public class KafkaAdminClientTest {
new MemberAssignment(new HashSet<>(expectedTopicPartitions))));
data.groups().add(new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId(GROUP_ID)
.setGroupState(ShareGroupState.STABLE.toString())
.setGroupState(GroupState.STABLE.toString())
.setMembers(asList(memberOne, memberTwo)));
env.kafkaClient().prepareResponse(new ShareGroupDescribeResponse(data));
@ -5097,7 +5096,7 @@ public class KafkaAdminClientTest {
ShareGroupDescribeResponseData group0Data = new ShareGroupDescribeResponseData();
group0Data.groups().add(new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId(GROUP_ID)
.setGroupState(ShareGroupState.STABLE.toString())
.setGroupState(GroupState.STABLE.toString())
.setMembers(asList(
new ShareGroupDescribeResponseData.Member()
.setMemberId("0")
@ -5113,7 +5112,7 @@ public class KafkaAdminClientTest {
ShareGroupDescribeResponseData group1Data = new ShareGroupDescribeResponseData();
group1Data.groups().add(new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupId("group-1")
.setGroupState(ShareGroupState.STABLE.toString())
.setGroupState(GroupState.STABLE.toString())
.setMembers(asList(
new ShareGroupDescribeResponseData.Member()
.setMemberId("0")
@ -5231,7 +5230,7 @@ public class KafkaAdminClientTest {
Set<String> groupIds = new HashSet<>();
for (ShareGroupListing listing : listings) {
groupIds.add(listing.groupId());
assertTrue(listing.state().isPresent());
assertTrue(listing.groupState().isPresent());
}
assertEquals(Set.of("share-group-1", "share-group-2", "share-group-3", "share-group-4"), groupIds);
@ -5289,8 +5288,8 @@ public class KafkaAdminClientTest {
assertEquals(2, listings.size());
List<ShareGroupListing> expected = new ArrayList<>();
expected.add(new ShareGroupListing("share-group-1", Optional.of(ShareGroupState.STABLE)));
expected.add(new ShareGroupListing("share-group-2", Optional.of(ShareGroupState.EMPTY)));
expected.add(new ShareGroupListing("share-group-1", Optional.of(GroupState.STABLE)));
expected.add(new ShareGroupListing("share-group-2", Optional.of(GroupState.EMPTY)));
assertEquals(expected, listings);
assertEquals(0, result.errors().get().size());
}

View File

@ -21,6 +21,7 @@ import org.apache.kafka.clients.admin.internals.CoordinatorKey;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Metric;
@ -724,7 +725,7 @@ public class MockAdminClient extends AdminClient {
@Override
public synchronized ListGroupsResult listGroups(ListGroupsOptions options) {
KafkaFutureImpl<Collection<Object>> future = new KafkaFutureImpl<>();
future.complete(groupConfigs.keySet().stream().map(g -> new GroupListing(g, Optional.of(GroupType.CONSUMER), ConsumerProtocol.PROTOCOL_TYPE)).collect(Collectors.toList()));
future.complete(groupConfigs.keySet().stream().map(g -> new GroupListing(g, Optional.of(GroupType.CONSUMER), ConsumerProtocol.PROTOCOL_TYPE, Optional.of(GroupState.STABLE))).collect(Collectors.toList()));
return new ListGroupsResult(future);
}

View File

@ -18,9 +18,9 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.ShareGroupState;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
@ -1487,7 +1487,7 @@ public class RequestResponseTest {
.setGroupId("group")
.setErrorCode((short) 0)
.setErrorMessage(Errors.forCode((short) 0).message())
.setGroupState(ShareGroupState.EMPTY.toString())
.setGroupState(GroupState.EMPTY.toString())
.setMembers(new ArrayList<>(0))
))
.setThrottleTimeMs(1000);

View File

@ -20,7 +20,7 @@ import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownMemberIdException;
@ -297,11 +297,11 @@ public class MirrorCheckpointTask extends SourceTask {
for (String group : consumerGroups) {
try {
ConsumerGroupDescription consumerGroupDesc = consumerGroupsDesc.get(group).get();
ConsumerGroupState consumerGroupState = consumerGroupDesc.state();
GroupState consumerGroupState = consumerGroupDesc.groupState();
// sync offset to the target cluster only if the state of current consumer group is:
// (1) idle: because the consumer at target is not actively consuming the mirrored topic
// (2) dead: the new consumer that is recently created at source and never existed at target
if (consumerGroupState == ConsumerGroupState.EMPTY) {
if (consumerGroupState == GroupState.EMPTY) {
idleConsumerGroupsOffset.put(
group,
adminCall(

View File

@ -48,7 +48,7 @@ import org.apache.kafka.common.requests.DeleteRecordsRequest
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{ConsumerGroupState, ElectionType, GroupType, IsolationLevel, ShareGroupState, TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica, Uuid}
import org.apache.kafka.common.{ConsumerGroupState, ElectionType, GroupState, GroupType, IsolationLevel, TopicCollection, TopicPartition, TopicPartitionInfo, TopicPartitionReplica, Uuid}
import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
import org.apache.kafka.network.SocketServerConfigs
@ -1816,6 +1816,279 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17960"))
def testConsumerGroups(quorum: String, groupProtocol: String): Unit = {
val config = createConfig
client = Admin.create(config)
try {
// Verify that initially there are no consumer groups to list.
val list1 = client.listConsumerGroups()
assertEquals(0, list1.all().get().size())
assertEquals(0, list1.errors().get().size())
assertEquals(0, list1.valid().get().size())
val testTopicName = "test_topic"
val testTopicName1 = testTopicName + "1"
val testTopicName2 = testTopicName + "2"
val testNumPartitions = 2
client.createTopics(util.Arrays.asList(
new NewTopic(testTopicName, testNumPartitions, 1.toShort),
new NewTopic(testTopicName1, testNumPartitions, 1.toShort),
new NewTopic(testTopicName2, testNumPartitions, 1.toShort)
)).all().get()
waitForTopics(client, List(testTopicName, testTopicName1, testTopicName2), List())
val producer = createProducer()
try {
producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
} finally {
Utils.closeQuietly(producer, "producer")
}
val EMPTY_GROUP_INSTANCE_ID = ""
val testGroupId = "test_group_id"
val testClientId = "test_client_id"
val testInstanceId1 = "test_instance_id_1"
val testInstanceId2 = "test_instance_id_2"
val fakeGroupId = "fake_group_id"
def createProperties(groupInstanceId: String): Properties = {
val newConsumerConfig = new Properties(consumerConfig)
// We need to disable the auto commit because after the members got removed from group, the offset commit
// will cause the member rejoining and the test will be flaky (check ConsumerCoordinator#OffsetCommitResponseHandler)
newConsumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)
newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, testClientId)
if (groupInstanceId != EMPTY_GROUP_INSTANCE_ID) {
newConsumerConfig.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId)
}
newConsumerConfig
}
// contains two static members and one dynamic member
val groupInstanceSet = Set(testInstanceId1, testInstanceId2, EMPTY_GROUP_INSTANCE_ID)
val consumerSet = groupInstanceSet.map { groupInstanceId => createConsumer(configOverrides = createProperties(groupInstanceId))}
val topicSet = Set(testTopicName, testTopicName1, testTopicName2)
val latch = new CountDownLatch(consumerSet.size)
try {
def createConsumerThread[K,V](consumer: Consumer[K,V], topic: String): Thread = {
new Thread {
override def run : Unit = {
consumer.subscribe(Collections.singleton(topic))
try {
while (true) {
consumer.poll(JDuration.ofSeconds(5))
if (!consumer.assignment.isEmpty && latch.getCount > 0L)
latch.countDown()
consumer.commitSync()
}
} catch {
case _: InterruptException => // Suppress the output to stderr
}
}
}
}
// Start consumers in a thread that will subscribe to a new group.
val consumerThreads = consumerSet.zip(topicSet).map(zipped => createConsumerThread(zipped._1, zipped._2))
val groupType = if (groupProtocol.equalsIgnoreCase(GroupProtocol.CONSUMER.name)) GroupType.CONSUMER else GroupType.CLASSIC
try {
consumerThreads.foreach(_.start())
assertTrue(latch.await(30000, TimeUnit.MILLISECONDS))
// Test that we can list the new group.
TestUtils.waitUntilTrue(() => {
val matching = client.listConsumerGroups.all.get.asScala.filter(group =>
group.groupId == testGroupId &&
group.groupState.get == GroupState.STABLE)
matching.size == 1
}, s"Expected to be able to list $testGroupId")
TestUtils.waitUntilTrue(() => {
val options = new ListConsumerGroupsOptions().withTypes(Set(groupType).asJava)
val matching = client.listConsumerGroups(options).all.get.asScala.filter(group =>
group.groupId == testGroupId &&
group.groupState.get == GroupState.STABLE)
matching.size == 1
}, s"Expected to be able to list $testGroupId in group type $groupType")
TestUtils.waitUntilTrue(() => {
val options = new ListConsumerGroupsOptions().withTypes(Set(groupType).asJava)
.inGroupStates(Set(GroupState.STABLE).asJava)
val matching = client.listConsumerGroups(options).all.get.asScala.filter(group =>
group.groupId == testGroupId &&
group.groupState.get == GroupState.STABLE)
matching.size == 1
}, s"Expected to be able to list $testGroupId in group type $groupType and state Stable")
TestUtils.waitUntilTrue(() => {
val options = new ListConsumerGroupsOptions().inGroupStates(Set(GroupState.STABLE).asJava)
val matching = client.listConsumerGroups(options).all.get.asScala.filter(group =>
group.groupId == testGroupId &&
group.groupState.get == GroupState.STABLE)
matching.size == 1
}, s"Expected to be able to list $testGroupId in state Stable")
TestUtils.waitUntilTrue(() => {
val options = new ListConsumerGroupsOptions().inGroupStates(Set(GroupState.EMPTY).asJava)
val matching = client.listConsumerGroups(options).all.get.asScala.filter(
_.groupId == testGroupId)
matching.isEmpty
}, s"Expected to find zero groups")
val describeWithFakeGroupResult = client.describeConsumerGroups(Seq(testGroupId, fakeGroupId).asJava,
new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
assertEquals(2, describeWithFakeGroupResult.describedGroups().size())
// Test that we can get information about the test consumer group.
assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(testGroupId))
var testGroupDescription = describeWithFakeGroupResult.describedGroups().get(testGroupId).get()
assertEquals(testGroupId, testGroupDescription.groupId())
assertFalse(testGroupDescription.isSimpleConsumerGroup)
assertEquals(groupInstanceSet.size, testGroupDescription.members().size())
val members = testGroupDescription.members()
members.asScala.foreach(member => assertEquals(testClientId, member.clientId()))
val topicPartitionsByTopic = members.asScala.flatMap(_.assignment().topicPartitions().asScala).groupBy(_.topic())
topicSet.foreach { topic =>
val topicPartitions = topicPartitionsByTopic.getOrElse(topic, List.empty)
assertEquals(testNumPartitions, topicPartitions.size)
}
val expectedOperations = AclEntry.supportedOperations(ResourceType.GROUP)
assertEquals(expectedOperations, testGroupDescription.authorizedOperations())
// Test that the fake group is listed as dead.
assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(fakeGroupId))
val fakeGroupDescription = describeWithFakeGroupResult.describedGroups().get(fakeGroupId).get()
assertEquals(fakeGroupId, fakeGroupDescription.groupId())
assertEquals(0, fakeGroupDescription.members().size())
assertEquals("", fakeGroupDescription.partitionAssignor())
assertEquals(GroupState.DEAD, fakeGroupDescription.groupState())
assertEquals(expectedOperations, fakeGroupDescription.authorizedOperations())
// Test that all() returns 2 results
assertEquals(2, describeWithFakeGroupResult.all().get().size())
val testTopicPart0 = new TopicPartition(testTopicName, 0)
// Test listConsumerGroupOffsets
TestUtils.waitUntilTrue(() => {
val parts = client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get()
parts.containsKey(testTopicPart0) && (parts.get(testTopicPart0).offset() == 1)
}, s"Expected the offset for partition 0 to eventually become 1.")
// Test listConsumerGroupOffsets with requireStable true
val options = new ListConsumerGroupOffsetsOptions().requireStable(true)
var parts = client.listConsumerGroupOffsets(testGroupId, options)
.partitionsToOffsetAndMetadata().get()
assertTrue(parts.containsKey(testTopicPart0))
assertEquals(1, parts.get(testTopicPart0).offset())
// Test listConsumerGroupOffsets with listConsumerGroupOffsetsSpec
val groupSpecs = Collections.singletonMap(testGroupId,
new ListConsumerGroupOffsetsSpec().topicPartitions(Collections.singleton(new TopicPartition(testTopicName, 0))))
parts = client.listConsumerGroupOffsets(groupSpecs).partitionsToOffsetAndMetadata().get()
assertTrue(parts.containsKey(testTopicPart0))
assertEquals(1, parts.get(testTopicPart0).offset())
// Test listConsumerGroupOffsets with listConsumerGroupOffsetsSpec and requireStable option
parts = client.listConsumerGroupOffsets(groupSpecs, options).partitionsToOffsetAndMetadata().get()
assertTrue(parts.containsKey(testTopicPart0))
assertEquals(1, parts.get(testTopicPart0).offset())
// Test delete non-exist consumer instance
val invalidInstanceId = "invalid-instance-id"
var removeMembersResult = client.removeMembersFromConsumerGroup(testGroupId, new RemoveMembersFromConsumerGroupOptions(
Collections.singleton(new MemberToRemove(invalidInstanceId))
))
TestUtils.assertFutureExceptionTypeEquals(removeMembersResult.all, classOf[UnknownMemberIdException])
val firstMemberFuture = removeMembersResult.memberResult(new MemberToRemove(invalidInstanceId))
TestUtils.assertFutureExceptionTypeEquals(firstMemberFuture, classOf[UnknownMemberIdException])
// Test consumer group deletion
var deleteResult = client.deleteConsumerGroups(Seq(testGroupId, fakeGroupId).asJava)
assertEquals(2, deleteResult.deletedGroups().size())
// Deleting the fake group ID should get GroupIdNotFoundException.
assertTrue(deleteResult.deletedGroups().containsKey(fakeGroupId))
assertFutureExceptionTypeEquals(deleteResult.deletedGroups().get(fakeGroupId),
classOf[GroupIdNotFoundException])
// Deleting the real group ID should get GroupNotEmptyException
assertTrue(deleteResult.deletedGroups().containsKey(testGroupId))
assertFutureExceptionTypeEquals(deleteResult.deletedGroups().get(testGroupId),
classOf[GroupNotEmptyException])
// Test delete one correct static member
val removeOptions = new RemoveMembersFromConsumerGroupOptions(Collections.singleton(new MemberToRemove(testInstanceId1)))
removeOptions.reason("test remove")
removeMembersResult = client.removeMembersFromConsumerGroup(testGroupId, removeOptions)
assertNull(removeMembersResult.all().get())
val validMemberFuture = removeMembersResult.memberResult(new MemberToRemove(testInstanceId1))
assertNull(validMemberFuture.get())
val describeTestGroupResult = client.describeConsumerGroups(Seq(testGroupId).asJava,
new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
assertEquals(1, describeTestGroupResult.describedGroups().size())
testGroupDescription = describeTestGroupResult.describedGroups().get(testGroupId).get()
assertEquals(testGroupId, testGroupDescription.groupId)
assertFalse(testGroupDescription.isSimpleConsumerGroup)
assertEquals(consumerSet.size - 1, testGroupDescription.members().size())
// Delete all active members remaining (a static member + a dynamic member)
removeMembersResult = client.removeMembersFromConsumerGroup(testGroupId, new RemoveMembersFromConsumerGroupOptions())
assertNull(removeMembersResult.all().get())
// The group should contain no members now.
testGroupDescription = client.describeConsumerGroups(Seq(testGroupId).asJava,
new DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
.describedGroups().get(testGroupId).get()
assertTrue(testGroupDescription.members().isEmpty)
// Consumer group deletion on empty group should succeed
deleteResult = client.deleteConsumerGroups(Seq(testGroupId).asJava)
assertEquals(1, deleteResult.deletedGroups().size())
assertTrue(deleteResult.deletedGroups().containsKey(testGroupId))
assertNull(deleteResult.deletedGroups().get(testGroupId).get())
// Test alterConsumerGroupOffsets
val alterConsumerGroupOffsetsResult = client.alterConsumerGroupOffsets(testGroupId,
Collections.singletonMap(testTopicPart0, new OffsetAndMetadata(0L)))
assertNull(alterConsumerGroupOffsetsResult.all().get())
assertNull(alterConsumerGroupOffsetsResult.partitionResult(testTopicPart0).get())
// Verify alterConsumerGroupOffsets success
TestUtils.waitUntilTrue(() => {
val parts = client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get()
parts.containsKey(testTopicPart0) && (parts.get(testTopicPart0).offset() == 0)
}, s"Expected the offset for partition 0 to eventually become 0.")
} finally {
consumerThreads.foreach {
case consumerThread =>
consumerThread.interrupt()
consumerThread.join()
}
}
} finally {
consumerSet.zip(groupInstanceSet).foreach(zipped => Utils.closeQuietly(zipped._1, zipped._2))
}
} finally {
Utils.closeQuietly(client, "adminClient")
}
}
/**
* Test the consumer group APIs.
*/
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17960"))
def testConsumerGroupsDeprecatedConsumerGroupState(quorum: String, groupProtocol: String): Unit = {
val config = createConfig
client = Admin.create(config)
try {
@ -2203,10 +2476,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
groups.size() == 4
}, "Expected to find all groups")
val classicGroupListing = new GroupListing(classicGroupId, Optional.of(GroupType.CLASSIC), "consumer")
val consumerGroupListing = new GroupListing(consumerGroupId, Optional.of(GroupType.CONSUMER), "consumer")
val shareGroupListing = new GroupListing(shareGroupId, Optional.of(GroupType.SHARE), "share")
val simpleGroupListing = new GroupListing(simpleGroupId, Optional.of(GroupType.CLASSIC), "")
val classicGroupListing = new GroupListing(classicGroupId, Optional.of(GroupType.CLASSIC), "consumer", Optional.of(GroupState.STABLE))
val consumerGroupListing = new GroupListing(consumerGroupId, Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE))
val shareGroupListing = new GroupListing(shareGroupId, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE))
val simpleGroupListing = new GroupListing(simpleGroupId, Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.EMPTY))
var listGroupsResult = client.listGroups()
assertTrue(listGroupsResult.errors().get().isEmpty)
@ -2327,6 +2600,11 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
val producer = createProducer()
try {
// Verify that initially there are no share groups to list.
val list = client.listGroups()
assertEquals(0, list.all().get().size())
assertEquals(0, list.errors().get().size())
assertEquals(0, list.valid().get().size())
val list1 = client.listShareGroups()
assertEquals(0, list1.all().get().size())
assertEquals(0, list1.errors().get().size())
@ -2350,21 +2628,40 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
TestUtils.waitUntilTrue(() => {
client.listShareGroups.all.get.stream().filter(group =>
group.groupId == testGroupId &&
group.state.get == ShareGroupState.STABLE).count() == 1
group.groupState.get == GroupState.STABLE).count() == 1
}, s"Expected to be able to list $testGroupId")
TestUtils.waitUntilTrue(() => {
val options = new ListShareGroupsOptions().inStates(Collections.singleton(ShareGroupState.STABLE))
val options = new ListShareGroupsOptions().inStates(Collections.singleton(GroupState.STABLE))
client.listShareGroups(options).all.get.stream().filter(group =>
group.groupId == testGroupId &&
group.state.get == ShareGroupState.STABLE).count() == 1
group.groupState.get == GroupState.STABLE).count() == 1
}, s"Expected to be able to list $testGroupId in state Stable")
TestUtils.waitUntilTrue(() => {
val options = new ListShareGroupsOptions().inStates(Collections.singleton(ShareGroupState.EMPTY))
val options = new ListShareGroupsOptions().inStates(Collections.singleton(GroupState.EMPTY))
client.listShareGroups(options).all.get.stream().filter(_.groupId == testGroupId).count() == 0
}, s"Expected to find zero groups")
// listGroups is equivalent to listShareGroups so ensure that works too
TestUtils.waitUntilTrue(() => {
client.listGroups.all.get.stream().filter(group =>
group.groupId == testGroupId &&
group.groupState.get == GroupState.STABLE).count() == 1
}, s"Expected to be able to list $testGroupId")
TestUtils.waitUntilTrue(() => {
val options = new ListGroupsOptions().withTypes(Collections.singleton(GroupType.SHARE)).inGroupStates(Collections.singleton(GroupState.STABLE))
client.listGroups(options).all.get.stream().filter(group =>
group.groupId == testGroupId &&
group.groupState.get == GroupState.STABLE).count() == 1
}, s"Expected to be able to list $testGroupId in state Stable")
TestUtils.waitUntilTrue(() => {
val options = new ListGroupsOptions().withTypes(Collections.singleton(GroupType.SHARE)).inGroupStates(Collections.singleton(GroupState.EMPTY))
client.listGroups(options).all.get.stream().filter(_.groupId == testGroupId).count() == 0
}, s"Expected to find zero groups")
val describeWithFakeGroupResult = client.describeShareGroups(util.Arrays.asList(testGroupId, fakeGroupId),
new DescribeShareGroupsOptions().includeAuthorizedOperations(true))
assertEquals(2, describeWithFakeGroupResult.describedGroups().size())
@ -2393,7 +2690,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertEquals(fakeGroupId, fakeGroupDescription.groupId())
assertEquals(0, fakeGroupDescription.members().size())
assertEquals(ShareGroupState.DEAD, fakeGroupDescription.state())
assertEquals(GroupState.DEAD, fakeGroupDescription.groupState())
assertNull(fakeGroupDescription.authorizedOperations())
// Test that all() returns 2 results

View File

@ -20,7 +20,7 @@ import org.apache.kafka.common.test.api.ClusterInstance
import org.apache.kafka.common.test.api._
import org.apache.kafka.common.test.api.ClusterTestExtensions
import kafka.utils.TestUtils
import org.apache.kafka.common.ShareGroupState
import org.apache.kafka.common.GroupState
import org.apache.kafka.common.message.ShareGroupDescribeResponseData.DescribedGroup
import org.apache.kafka.common.message.{ShareGroupDescribeRequestData, ShareGroupDescribeResponseData, ShareGroupHeartbeatResponseData}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@ -114,7 +114,7 @@ class ShareGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCoord
val expected = List(
new DescribedGroup()
.setGroupId("grp-1")
.setGroupState(ShareGroupState.STABLE.toString)
.setGroupState(GroupState.STABLE.toString)
.setGroupEpoch(1)
.setAssignmentEpoch(1)
.setAssignorName("simple")

View File

@ -16,8 +16,8 @@
*/
package org.apache.kafka.coordinator.group.metrics;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.ShareGroupState;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.metrics.Metrics;
@ -116,19 +116,19 @@ public class GroupCoordinatorMetricsTest {
GroupCoordinatorMetrics.METRICS_GROUP,
"The number of share groups in empty state.",
"protocol", Group.GroupType.SHARE.toString(),
"state", ShareGroupState.EMPTY.toString()),
"state", GroupState.EMPTY.toString()),
metrics.metricName(
"group-count",
GroupCoordinatorMetrics.METRICS_GROUP,
"The number of share groups in stable state.",
"protocol", Group.GroupType.SHARE.toString(),
"state", ShareGroupState.STABLE.toString()),
"state", GroupState.STABLE.toString()),
metrics.metricName(
"group-count",
GroupCoordinatorMetrics.METRICS_GROUP,
"The number of share groups in dead state.",
"protocol", Group.GroupType.SHARE.toString(),
"state", ShareGroupState.DEAD.toString())
"state", GroupState.DEAD.toString())
));
try {

View File

@ -38,7 +38,7 @@ import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
@ -130,11 +130,12 @@ public class ConsumerGroupCommand {
}
}
static Set<ConsumerGroupState> consumerGroupStatesFromString(String input) {
Set<ConsumerGroupState> parsedStates = Arrays.stream(input.split(",")).map(s -> ConsumerGroupState.parse(s.trim())).collect(Collectors.toSet());
if (parsedStates.contains(ConsumerGroupState.UNKNOWN)) {
Collection<ConsumerGroupState> validStates = Arrays.stream(ConsumerGroupState.values()).filter(s -> s != ConsumerGroupState.UNKNOWN).collect(Collectors.toList());
throw new IllegalArgumentException("Invalid state list '" + input + "'. Valid states are: " + validStates.stream().map(ConsumerGroupState::toString).collect(Collectors.joining(", ")));
static Set<GroupState> groupStatesFromString(String input) {
Set<GroupState> parsedStates = Arrays.stream(input.split(",")).map(s -> GroupState.parse(s.trim())).collect(Collectors.toSet());
Set<GroupState> validStates = GroupState.groupStatesForType(GroupType.CONSUMER);
if (!validStates.containsAll(parsedStates)) {
throw new IllegalArgumentException("Invalid state list '" + input + "'. Valid states are: " +
validStates.stream().map(GroupState::toString).collect(Collectors.joining(", ")));
}
return parsedStates;
}
@ -204,7 +205,7 @@ public class ConsumerGroupCommand {
if (includeType || includeState) {
Set<GroupType> types = typeValues();
Set<ConsumerGroupState> states = stateValues();
Set<GroupState> states = stateValues();
List<ConsumerGroupListing> listings = listConsumerGroupsWithFilters(types, states);
printGroupInfo(listings, includeType, includeState);
@ -213,11 +214,11 @@ public class ConsumerGroupCommand {
}
}
private Set<ConsumerGroupState> stateValues() {
private Set<GroupState> stateValues() {
String stateValue = opts.options.valueOf(opts.stateOpt);
return (stateValue == null || stateValue.isEmpty())
? Collections.emptySet()
: consumerGroupStatesFromString(stateValue);
: groupStatesFromString(stateValue);
}
private Set<GroupType> typeValues() {
@ -230,7 +231,7 @@ public class ConsumerGroupCommand {
private void printGroupInfo(List<ConsumerGroupListing> groups, boolean includeType, boolean includeState) {
Function<ConsumerGroupListing, String> groupId = ConsumerGroupListing::groupId;
Function<ConsumerGroupListing, String> groupType = groupListing -> groupListing.type().orElse(GroupType.UNKNOWN).toString();
Function<ConsumerGroupListing, String> groupState = groupListing -> groupListing.state().orElse(ConsumerGroupState.UNKNOWN).toString();
Function<ConsumerGroupListing, String> groupState = groupListing -> groupListing.groupState().orElse(GroupState.UNKNOWN).toString();
OptionalInt maybeMax = groups.stream().mapToInt(groupListing -> Math.max(15, groupId.apply(groupListing).length())).max();
int maxGroupLen = maybeMax.orElse(15) + 10;
@ -270,26 +271,26 @@ public class ConsumerGroupCommand {
}
}
List<ConsumerGroupListing> listConsumerGroupsWithFilters(Set<GroupType> types, Set<ConsumerGroupState> states) throws ExecutionException, InterruptedException {
List<ConsumerGroupListing> listConsumerGroupsWithFilters(Set<GroupType> types, Set<GroupState> states) throws ExecutionException, InterruptedException {
ListConsumerGroupsOptions listConsumerGroupsOptions = withTimeoutMs(new ListConsumerGroupsOptions());
listConsumerGroupsOptions
.inStates(states)
.inGroupStates(states)
.withTypes(types);
ListConsumerGroupsResult result = adminClient.listConsumerGroups(listConsumerGroupsOptions);
return new ArrayList<>(result.all().get());
}
private boolean shouldPrintMemberState(String group, Optional<ConsumerGroupState> state, Optional<Integer> numRows) {
private boolean shouldPrintMemberState(String group, Optional<GroupState> state, Optional<Integer> numRows) {
// numRows contains the number of data rows, if any, compiled from the API call in the caller method.
// if it's undefined or 0, there is no relevant group information to display.
if (!numRows.isPresent()) {
if (numRows.isEmpty()) {
printError("The consumer group '" + group + "' does not exist.", Optional.empty());
return false;
}
int num = numRows.get();
ConsumerGroupState state0 = state.orElse(ConsumerGroupState.UNKNOWN);
GroupState state0 = state.orElse(GroupState.UNKNOWN);
switch (state0) {
case DEAD:
printError("Consumer group '" + group + "' does not exist.", Optional.empty());
@ -310,16 +311,16 @@ public class ConsumerGroupCommand {
throw new KafkaException("Expected a valid consumer group state, but found '" + state0 + "'.");
}
return !state0.equals(ConsumerGroupState.DEAD) && num > 0;
return !state0.equals(GroupState.DEAD) && num > 0;
}
private Optional<Integer> size(Optional<? extends Collection<?>> colOpt) {
return colOpt.map(Collection::size);
}
private void printOffsets(Map<String, Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>>> offsets) {
private void printOffsets(Map<String, Entry<Optional<GroupState>, Optional<Collection<PartitionAssignmentState>>>> offsets) {
offsets.forEach((groupId, tuple) -> {
Optional<ConsumerGroupState> state = tuple.getKey();
Optional<GroupState> state = tuple.getKey();
Optional<Collection<PartitionAssignmentState>> assignments = tuple.getValue();
if (shouldPrintMemberState(groupId, state, size(assignments))) {
@ -361,14 +362,14 @@ public class ConsumerGroupCommand {
return "\n%" + (-maxGroupLen) + "s %" + (-maxTopicLen) + "s %-10s %-15s %-15s %-15s %" + (-maxConsumerIdLen) + "s %" + (-maxHostLen) + "s %s";
}
private void printMembers(Map<String, Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>>> members, boolean verbose) {
private void printMembers(Map<String, Entry<Optional<GroupState>, Optional<Collection<MemberAssignmentState>>>> members, boolean verbose) {
members.forEach((groupId, tuple) -> {
Optional<ConsumerGroupState> state = tuple.getKey();
Optional<GroupState> groupState = tuple.getKey();
Optional<Collection<MemberAssignmentState>> assignments = tuple.getValue();
int maxGroupLen = 15, maxConsumerIdLen = 15, maxGroupInstanceIdLen = 17, maxHostLen = 15, maxClientIdLen = 15;
boolean includeGroupInstanceId = false;
if (shouldPrintMemberState(groupId, state, size(assignments))) {
if (shouldPrintMemberState(groupId, groupState, size(assignments))) {
// find proper columns width
if (assignments.isPresent()) {
for (MemberAssignmentState memberAssignment : assignments.get()) {
@ -425,16 +426,16 @@ public class ConsumerGroupCommand {
});
}
private void printStates(Map<String, GroupState> states) {
private void printStates(Map<String, GroupInformation> states) {
states.forEach((groupId, state) -> {
if (shouldPrintMemberState(groupId, Optional.of(state.state), Optional.of(1))) {
if (shouldPrintMemberState(groupId, Optional.of(state.groupState), Optional.of(1))) {
String coordinator = state.coordinator.host() + ":" + state.coordinator.port() + " (" + state.coordinator.idString() + ")";
int coordinatorColLen = Math.max(25, coordinator.length());
String format = "\n%" + -coordinatorColLen + "s %-25s %-20s %-15s %s";
System.out.printf(format, "GROUP", "COORDINATOR (ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS");
System.out.printf(format, state.group, coordinator, state.assignmentStrategy, state.state, state.numMembers);
System.out.printf(format, state.group, coordinator, state.assignmentStrategy, state.groupState, state.numMembers);
System.out.println();
}
});
@ -450,15 +451,15 @@ public class ConsumerGroupCommand {
long subActions = Stream.of(membersOptPresent, offsetsOptPresent, stateOptPresent).filter(x -> x).count();
if (subActions == 0 || offsetsOptPresent) {
TreeMap<String, Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>>> offsets
TreeMap<String, Entry<Optional<GroupState>, Optional<Collection<PartitionAssignmentState>>>> offsets
= collectGroupsOffsets(groupIds);
printOffsets(offsets);
} else if (membersOptPresent) {
TreeMap<String, Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>>> members
TreeMap<String, Entry<Optional<GroupState>, Optional<Collection<MemberAssignmentState>>>> members
= collectGroupsMembers(groupIds, opts.options.has(opts.verboseOpt));
printMembers(members, opts.options.has(opts.verboseOpt));
} else {
TreeMap<String, GroupState> states = collectGroupsState(groupIds);
TreeMap<String, GroupInformation> states = collectGroupsState(groupIds);
printStates(states);
}
}
@ -530,7 +531,7 @@ public class ConsumerGroupCommand {
consumerGroups.forEach((groupId, groupDescription) -> {
try {
String state = groupDescription.get().state().toString();
String state = groupDescription.get().groupState().toString();
switch (state) {
case "Empty":
case "Dead":
@ -689,19 +690,19 @@ public class ConsumerGroupCommand {
/**
* Returns the state of the specified consumer group and partition assignment states
*/
Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> collectGroupOffsets(String groupId) throws Exception {
Entry<Optional<GroupState>, Optional<Collection<PartitionAssignmentState>>> collectGroupOffsets(String groupId) throws Exception {
return collectGroupsOffsets(Collections.singletonList(groupId)).getOrDefault(groupId, new SimpleImmutableEntry<>(Optional.empty(), Optional.empty()));
}
/**
* Returns states of the specified consumer groups and partition assignment states
*/
TreeMap<String, Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>>> collectGroupsOffsets(Collection<String> groupIds) throws Exception {
TreeMap<String, Entry<Optional<GroupState>, Optional<Collection<PartitionAssignmentState>>>> collectGroupsOffsets(Collection<String> groupIds) throws Exception {
Map<String, ConsumerGroupDescription> consumerGroups = describeConsumerGroups(groupIds);
TreeMap<String, Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>>> groupOffsets = new TreeMap<>();
TreeMap<String, Entry<Optional<GroupState>, Optional<Collection<PartitionAssignmentState>>>> groupOffsets = new TreeMap<>();
consumerGroups.forEach((groupId, consumerGroup) -> {
ConsumerGroupState state = consumerGroup.state();
GroupState state = consumerGroup.groupState();
Map<TopicPartition, OffsetAndMetadata> committedOffsets = getCommittedOffsets(groupId);
// The admin client returns `null` as a value to indicate that there is not committed offset for a partition.
Function<TopicPartition, Optional<Long>> getPartitionOffset = tp -> Optional.ofNullable(committedOffsets.get(tp)).map(OffsetAndMetadata::offset);
@ -746,16 +747,16 @@ public class ConsumerGroupCommand {
return groupOffsets;
}
Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> collectGroupMembers(String groupId, boolean verbose) throws Exception {
Entry<Optional<GroupState>, Optional<Collection<MemberAssignmentState>>> collectGroupMembers(String groupId, boolean verbose) throws Exception {
return collectGroupsMembers(Collections.singleton(groupId), verbose).get(groupId);
}
TreeMap<String, Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>>> collectGroupsMembers(Collection<String> groupIds, boolean verbose) throws Exception {
TreeMap<String, Entry<Optional<GroupState>, Optional<Collection<MemberAssignmentState>>>> collectGroupsMembers(Collection<String> groupIds, boolean verbose) throws Exception {
Map<String, ConsumerGroupDescription> consumerGroups = describeConsumerGroups(groupIds);
TreeMap<String, Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>>> res = new TreeMap<>();
TreeMap<String, Entry<Optional<GroupState>, Optional<Collection<MemberAssignmentState>>>> res = new TreeMap<>();
consumerGroups.forEach((groupId, consumerGroup) -> {
ConsumerGroupState state = consumerGroup.state();
GroupState state = consumerGroup.groupState();
List<MemberAssignmentState> memberAssignmentStates = consumerGroup.members().stream().map(consumer ->
new MemberAssignmentState(
groupId,
@ -771,19 +772,19 @@ public class ConsumerGroupCommand {
return res;
}
GroupState collectGroupState(String groupId) throws Exception {
GroupInformation collectGroupState(String groupId) throws Exception {
return collectGroupsState(Collections.singleton(groupId)).get(groupId);
}
TreeMap<String, GroupState> collectGroupsState(Collection<String> groupIds) throws Exception {
TreeMap<String, GroupInformation> collectGroupsState(Collection<String> groupIds) throws Exception {
Map<String, ConsumerGroupDescription> consumerGroups = describeConsumerGroups(groupIds);
TreeMap<String, GroupState> res = new TreeMap<>();
TreeMap<String, GroupInformation> res = new TreeMap<>();
consumerGroups.forEach((groupId, groupDescription) ->
res.put(groupId, new GroupState(
res.put(groupId, new GroupInformation(
groupId,
groupDescription.coordinator(),
groupDescription.partitionAssignor(),
groupDescription.state(),
groupDescription.groupState(),
groupDescription.members().size()
)));
return res;

View File

@ -16,21 +16,21 @@
*/
package org.apache.kafka.tools.consumer.group;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.Node;
class GroupState {
class GroupInformation {
final String group;
final Node coordinator;
final String assignmentStrategy;
final ConsumerGroupState state;
final GroupState groupState;
final int numMembers;
GroupState(String group, Node coordinator, String assignmentStrategy, ConsumerGroupState state, int numMembers) {
GroupInformation(String group, Node coordinator, String assignmentStrategy, GroupState groupState, int numMembers) {
this.group = group;
this.coordinator = coordinator;
this.assignmentStrategy = assignmentStrategy;
this.state = state;
this.groupState = groupState;
this.numMembers = numMembers;
}
}

View File

@ -20,14 +20,15 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AbstractOptions;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeShareGroupsResult;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.ListGroupsOptions;
import org.apache.kafka.clients.admin.ListGroupsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListShareGroupsOptions;
import org.apache.kafka.clients.admin.ListShareGroupsResult;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.ShareGroupDescription;
import org.apache.kafka.clients.admin.ShareGroupListing;
import org.apache.kafka.common.ShareGroupState;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandLineUtils;
@ -89,14 +90,13 @@ public class ShareGroupCommand {
}
}
static Set<ShareGroupState> shareGroupStatesFromString(String input) {
Set<ShareGroupState> parsedStates =
Arrays.stream(input.split(",")).map(s -> ShareGroupState.parse(s.trim())).collect(Collectors.toSet());
if (parsedStates.contains(ShareGroupState.UNKNOWN)) {
Collection<ShareGroupState> validStates =
Arrays.stream(ShareGroupState.values()).filter(s -> s != ShareGroupState.UNKNOWN).collect(Collectors.toList());
static Set<GroupState> groupStatesFromString(String input) {
Set<GroupState> parsedStates =
Arrays.stream(input.split(",")).map(s -> GroupState.parse(s.trim())).collect(Collectors.toSet());
Set<GroupState> validStates = GroupState.groupStatesForType(GroupType.SHARE);
if (!validStates.containsAll(parsedStates)) {
throw new IllegalArgumentException("Invalid state list '" + input + "'. Valid states are: " +
validStates.stream().map(Object::toString).collect(Collectors.joining(", ")));
validStates.stream().map(GroupState::toString).collect(Collectors.joining(", ")));
}
return parsedStates;
}
@ -128,10 +128,10 @@ public class ShareGroupCommand {
public void listGroups() throws ExecutionException, InterruptedException {
if (opts.options.has(opts.stateOpt)) {
String stateValue = opts.options.valueOf(opts.stateOpt);
Set<ShareGroupState> states = (stateValue == null || stateValue.isEmpty())
Set<GroupState> states = (stateValue == null || stateValue.isEmpty())
? Collections.emptySet()
: shareGroupStatesFromString(stateValue);
List<ShareGroupListing> listings = listShareGroupsWithState(states);
: groupStatesFromString(stateValue);
List<GroupListing> listings = listShareGroupsInStates(states);
printGroupInfo(listings);
} else
@ -140,31 +140,32 @@ public class ShareGroupCommand {
List<String> listShareGroups() {
try {
ListShareGroupsResult result = adminClient.listShareGroups(withTimeoutMs(new ListShareGroupsOptions()));
Collection<ShareGroupListing> listings = result.all().get();
return listings.stream().map(ShareGroupListing::groupId).collect(Collectors.toList());
ListGroupsResult result = adminClient.listGroups(withTimeoutMs(new ListGroupsOptions()).withTypes(Set.of(GroupType.SHARE)));
Collection<GroupListing> listings = result.all().get();
return listings.stream().map(GroupListing::groupId).collect(Collectors.toList());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
List<ShareGroupListing> listShareGroupsWithState(Set<ShareGroupState> states) throws ExecutionException, InterruptedException {
ListShareGroupsOptions listShareGroupsOptions = withTimeoutMs(new ListShareGroupsOptions());
listShareGroupsOptions.inStates(states);
ListShareGroupsResult result = adminClient.listShareGroups(listShareGroupsOptions);
List<GroupListing> listShareGroupsInStates(Set<GroupState> states) throws ExecutionException, InterruptedException {
ListGroupsOptions listGroupsOptions = withTimeoutMs(new ListGroupsOptions());
listGroupsOptions.withTypes(Set.of(GroupType.SHARE));
listGroupsOptions.inGroupStates(states);
ListGroupsResult result = adminClient.listGroups(listGroupsOptions);
return new ArrayList<>(result.all().get());
}
private void printGroupInfo(List<ShareGroupListing> groups) {
private void printGroupInfo(List<GroupListing> groups) {
// find proper columns width
int maxGroupLen = 15;
for (ShareGroupListing group : groups) {
for (GroupListing group : groups) {
maxGroupLen = Math.max(maxGroupLen, group.groupId().length());
}
System.out.printf("%" + (-maxGroupLen) + "s %s\n", "GROUP", "STATE");
for (ShareGroupListing group : groups) {
for (GroupListing group : groups) {
String groupId = group.groupId();
String state = group.state().orElse(ShareGroupState.UNKNOWN).toString();
String state = group.groupState().orElse(GroupState.UNKNOWN).toString();
System.out.printf("%" + (-maxGroupLen) + "s %s\n", groupId, state);
}
}
@ -174,14 +175,14 @@ public class ShareGroupCommand {
*
* @return Whether the group detail should be printed
*/
public static boolean maybePrintEmptyGroupState(String group, ShareGroupState state, int numRows) {
if (state == ShareGroupState.DEAD) {
public static boolean maybePrintEmptyGroupState(String group, GroupState state, int numRows) {
if (state == GroupState.DEAD) {
printError("Share group '" + group + "' does not exist.", Optional.empty());
} else if (state == ShareGroupState.EMPTY) {
} else if (state == GroupState.EMPTY) {
System.err.println("\nShare group '" + group + "' has no active members.");
}
return !state.equals(ShareGroupState.DEAD) && numRows > 0;
return !state.equals(GroupState.DEAD) && numRows > 0;
}
public void describeGroups() throws ExecutionException, InterruptedException {
@ -233,9 +234,9 @@ public class ShareGroupCommand {
private void printOffsets(ShareGroupDescription description) throws ExecutionException, InterruptedException {
Map<TopicPartition, Long> offsets = getOffsets(description.members());
if (maybePrintEmptyGroupState(description.groupId(), description.state(), offsets.size())) {
if (maybePrintEmptyGroupState(description.groupId(), description.groupState(), offsets.size())) {
String fmt = printOffsetFormat(description, offsets);
System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "OFFSET");
System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "START-OFFSET");
for (Map.Entry<TopicPartition, Long> offset : offsets.entrySet()) {
System.out.printf(fmt, description.groupId(), offset.getKey().topic(), offset.getKey().partition(), offset.getValue());
@ -253,7 +254,7 @@ public class ShareGroupCommand {
}
private void printStates(ShareGroupDescription description) {
maybePrintEmptyGroupState(description.groupId(), description.state(), 1);
maybePrintEmptyGroupState(description.groupId(), description.groupState(), 1);
int groupLen = Math.max(15, description.groupId().length());
String coordinator = description.coordinator().host() + ":" + description.coordinator().port() + " (" + description.coordinator().idString() + ")";
@ -261,14 +262,14 @@ public class ShareGroupCommand {
String fmt = "%" + -groupLen + "s %" + -coordinatorLen + "s %-15s %s\n";
System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE", "#MEMBERS");
System.out.printf(fmt, description.groupId(), coordinator, description.state().toString(), description.members().size());
System.out.printf(fmt, description.groupId(), coordinator, description.groupState().toString(), description.members().size());
}
private void printMembers(ShareGroupDescription description) {
int groupLen = Math.max(15, description.groupId().length());
int maxConsumerIdLen = 15, maxHostLen = 15, maxClientIdLen = 15;
Collection<MemberDescription> members = description.members();
if (maybePrintEmptyGroupState(description.groupId(), description.state(), description.members().size())) {
if (maybePrintEmptyGroupState(description.groupId(), description.groupState(), description.members().size())) {
for (MemberDescription member : members) {
maxConsumerIdLen = Math.max(maxConsumerIdLen, member.consumerId().length());
maxHostLen = Math.max(maxHostLen, member.host().length());

View File

@ -27,6 +27,7 @@ import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
@ -198,9 +199,9 @@ public class GroupsCommandTest {
GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient);
ListGroupsResult result = AdminClientTestUtils.listGroupsResult(
new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer"),
new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer"),
new GroupListing("SG", Optional.of(GroupType.SHARE), "share")
new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer", Optional.of(GroupState.STABLE)),
new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE)),
new GroupListing("SG", Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE))
);
when(adminClient.listGroups()).thenReturn(result);
@ -225,9 +226,9 @@ public class GroupsCommandTest {
GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient);
ListGroupsResult result = AdminClientTestUtils.listGroupsResult(
new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer"),
new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer"),
new GroupListing("SG", Optional.of(GroupType.SHARE), "share")
new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer", Optional.of(GroupState.STABLE)),
new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE)),
new GroupListing("SG", Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE))
);
when(adminClient.listGroups()).thenReturn(result);
@ -251,9 +252,9 @@ public class GroupsCommandTest {
GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient);
ListGroupsResult result = AdminClientTestUtils.listGroupsResult(
new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer"),
new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer"),
new GroupListing("SG", Optional.of(GroupType.SHARE), "share")
new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer", Optional.of(GroupState.STABLE)),
new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE)),
new GroupListing("SG", Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE))
);
when(adminClient.listGroups()).thenReturn(result);
@ -276,9 +277,9 @@ public class GroupsCommandTest {
GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient);
ListGroupsResult result = AdminClientTestUtils.listGroupsResult(
new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer"),
new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer"),
new GroupListing("SG", Optional.of(GroupType.SHARE), "share")
new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer", Optional.of(GroupState.STABLE)),
new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE)),
new GroupListing("SG", Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE))
);
when(adminClient.listGroups()).thenReturn(result);
@ -302,9 +303,9 @@ public class GroupsCommandTest {
GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient);
ListGroupsResult result = AdminClientTestUtils.listGroupsResult(
new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer"),
new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer"),
new GroupListing("SG", Optional.of(GroupType.SHARE), "share")
new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer", Optional.of(GroupState.STABLE)),
new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE)),
new GroupListing("SG", Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE))
);
when(adminClient.listGroups()).thenReturn(result);
@ -327,9 +328,9 @@ public class GroupsCommandTest {
GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient);
ListGroupsResult result = AdminClientTestUtils.listGroupsResult(
new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer"),
new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer"),
new GroupListing("SG", Optional.of(GroupType.SHARE), "share")
new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer", Optional.of(GroupState.STABLE)),
new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE)),
new GroupListing("SG", Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE))
);
when(adminClient.listGroups()).thenReturn(result);
@ -352,8 +353,8 @@ public class GroupsCommandTest {
GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient);
ListGroupsResult result = AdminClientTestUtils.listGroupsResult(
new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer"),
new GroupListing("SG", Optional.of(GroupType.SHARE), "share")
new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE)),
new GroupListing("SG", Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE))
);
when(adminClient.listGroups()).thenReturn(result);

View File

@ -31,7 +31,7 @@ import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
@ -85,14 +85,14 @@ public class ConsumerGroupServiceTest {
ConsumerGroupCommand.ConsumerGroupService groupService = consumerGroupService(args);
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(GROUP)), any()))
.thenReturn(describeGroupsResult(ConsumerGroupState.STABLE));
.thenReturn(describeGroupsResult(GroupState.STABLE));
when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec()), any()))
.thenReturn(listGroupOffsetsResult(GROUP));
when(admin.listOffsets(offsetsArgMatcher(), any()))
.thenReturn(listOffsetsResult());
Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> statesAndAssignments = groupService.collectGroupOffsets(GROUP);
assertEquals(Optional.of(ConsumerGroupState.STABLE), statesAndAssignments.getKey());
Entry<Optional<GroupState>, Optional<Collection<PartitionAssignmentState>>> statesAndAssignments = groupService.collectGroupOffsets(GROUP);
assertEquals(Optional.of(GroupState.STABLE), statesAndAssignments.getKey());
assertTrue(statesAndAssignments.getValue().isPresent());
assertEquals(TOPIC_PARTITIONS.size(), statesAndAssignments.getValue().get().size());
@ -139,7 +139,7 @@ public class ConsumerGroupServiceTest {
true,
Collections.singleton(new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", new MemberAssignment(assignedTopicPartitions))),
RangeAssignor.class.getName(),
ConsumerGroupState.STABLE,
GroupState.STABLE,
new Node(1, "localhost", 9092));
Function<Collection<TopicPartition>, ArgumentMatcher<Map<TopicPartition, OffsetSpec>>> offsetsArgMatcher = expectedPartitions ->
@ -164,8 +164,8 @@ public class ConsumerGroupServiceTest {
)).thenReturn(new ListOffsetsResult(endOffsets.entrySet().stream().filter(e -> unassignedTopicPartitions.contains(e.getKey()))
.collect(Collectors.toMap(Entry::getKey, Entry::getValue))));
Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> statesAndAssignments = groupService.collectGroupOffsets(GROUP);
Optional<ConsumerGroupState> state = statesAndAssignments.getKey();
Entry<Optional<GroupState>, Optional<Collection<PartitionAssignmentState>>> statesAndAssignments = groupService.collectGroupOffsets(GROUP);
Optional<GroupState> state = statesAndAssignments.getKey();
Optional<Collection<PartitionAssignmentState>> assignments = statesAndAssignments.getValue();
Map<TopicPartition, Optional<Long>> returnedOffsets = assignments.map(results ->
@ -183,7 +183,7 @@ public class ConsumerGroupServiceTest {
expectedOffsets.put(testTopicPartition4, Optional.of(100L));
expectedOffsets.put(testTopicPartition5, Optional.empty());
assertEquals(Optional.of(ConsumerGroupState.STABLE), state);
assertEquals(Optional.of(GroupState.STABLE), state);
assertEquals(expectedOffsets, returnedOffsets);
verify(admin, times(1)).describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(GROUP)), any());
@ -203,7 +203,7 @@ public class ConsumerGroupServiceTest {
ConsumerGroupCommand.ConsumerGroupService groupService = consumerGroupService(args.toArray(new String[0]));
when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(GROUP)), any()))
.thenReturn(describeGroupsResult(ConsumerGroupState.DEAD));
.thenReturn(describeGroupsResult(GroupState.DEAD));
when(admin.describeTopics(ArgumentMatchers.eq(topicsWithoutPartitionsSpecified), any()))
.thenReturn(describeTopicsResult(topicsWithoutPartitionsSpecified));
when(admin.listOffsets(offsetsArgMatcher(), any()))
@ -227,7 +227,7 @@ public class ConsumerGroupServiceTest {
};
}
private DescribeConsumerGroupsResult describeGroupsResult(ConsumerGroupState groupState) {
private DescribeConsumerGroupsResult describeGroupsResult(GroupState groupState) {
MemberDescription member1 = new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", null);
ConsumerGroupDescription description = new ConsumerGroupDescription(GROUP,
true,

View File

@ -20,7 +20,7 @@ import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.protocol.Errors;
@ -56,8 +56,8 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CO
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.common.ConsumerGroupState.EMPTY;
import static org.apache.kafka.common.ConsumerGroupState.STABLE;
import static org.apache.kafka.common.GroupState.EMPTY;
import static org.apache.kafka.common.GroupState.STABLE;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@ -318,8 +318,8 @@ public class DeleteConsumerGroupsTest {
);
}
private boolean checkGroupState(ConsumerGroupCommand.ConsumerGroupService service, String groupId, ConsumerGroupState state) throws Exception {
return Objects.equals(service.collectGroupState(groupId).state, state);
private boolean checkGroupState(ConsumerGroupCommand.ConsumerGroupService service, String groupId, GroupState state) throws Exception {
return Objects.equals(service.collectGroupState(groupId).groupState, state);
}
private ConsumerGroupCommand.ConsumerGroupService getConsumerGroupService(String[] args) {

View File

@ -26,7 +26,7 @@ import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringDeserializer;
@ -113,8 +113,8 @@ public class DescribeConsumerGroupTest {
// note the group to be queried is a different (non-existing) group
ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", missingGroup})
) {
Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(missingGroup);
assertTrue(res.getKey().map(s -> s.equals(ConsumerGroupState.DEAD)).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false),
Entry<Optional<GroupState>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(missingGroup);
assertTrue(res.getKey().map(s -> s.equals(GroupState.DEAD)).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false),
"Expected the state to be 'Dead', with no members in the group '" + missingGroup + "'.");
}
}
@ -132,12 +132,12 @@ public class DescribeConsumerGroupTest {
// note the group to be queried is a different (non-existing) group
ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", missingGroup})
) {
Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(missingGroup, false);
assertTrue(res.getKey().map(s -> s.equals(ConsumerGroupState.DEAD)).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false),
Entry<Optional<GroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(missingGroup, false);
assertTrue(res.getKey().map(s -> s.equals(GroupState.DEAD)).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false),
"Expected the state to be 'Dead', with no members in the group '" + missingGroup + "'.");
Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res2 = service.collectGroupMembers(missingGroup, true);
assertTrue(res2.getKey().map(s -> s.equals(ConsumerGroupState.DEAD)).orElse(false) && res2.getValue().map(Collection::isEmpty).orElse(false),
Entry<Optional<GroupState>, Optional<Collection<MemberAssignmentState>>> res2 = service.collectGroupMembers(missingGroup, true);
assertTrue(res2.getKey().map(s -> s.equals(GroupState.DEAD)).orElse(false) && res2.getValue().map(Collection::isEmpty).orElse(false),
"Expected the state to be 'Dead', with no members in the group '" + missingGroup + "' (verbose option).");
}
}
@ -155,8 +155,8 @@ public class DescribeConsumerGroupTest {
// note the group to be queried is a different (non-existing) group
ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", missingGroup})
) {
GroupState state = service.collectGroupState(missingGroup);
assertTrue(Objects.equals(state.state, ConsumerGroupState.DEAD) && state.numMembers == 0 &&
GroupInformation state = service.collectGroupState(missingGroup);
assertTrue(Objects.equals(state.groupState, GroupState.DEAD) && state.numMembers == 0 &&
state.coordinator != null && clusterInstance.brokerIds().contains(state.coordinator.id()),
"Expected the state to be 'Dead', with no members in the group '" + missingGroup + "'."
);
@ -277,13 +277,13 @@ public class DescribeConsumerGroupTest {
ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group})
) {
TestUtils.waitForCondition(() -> {
Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> groupOffsets = service.collectGroupOffsets(group);
Optional<ConsumerGroupState> state = groupOffsets.getKey();
Entry<Optional<GroupState>, Optional<Collection<PartitionAssignmentState>>> groupOffsets = service.collectGroupOffsets(group);
Optional<GroupState> state = groupOffsets.getKey();
Optional<Collection<PartitionAssignmentState>> assignments = groupOffsets.getValue();
Predicate<PartitionAssignmentState> isGrp = s -> Objects.equals(s.group, group);
boolean res = state.map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) &&
boolean res = state.map(s -> s.equals(GroupState.STABLE)).orElse(false) &&
assignments.isPresent() &&
assignments.get().stream().filter(isGrp).count() == 1;
@ -322,7 +322,7 @@ public class DescribeConsumerGroupTest {
return consumerGroupDescription.members().size() == 1 && consumerGroupDescription.members().iterator().next().assignment().topicPartitions().size() == 1;
}, "Expected a 'Stable' group status, rows and valid member information for group " + group + ".");
Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(group, true);
Entry<Optional<GroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(group, true);
assertTrue(res.getValue().isPresent());
assertTrue(res.getValue().get().size() == 1 && res.getValue().get().iterator().next().assignment.size() == 1,
@ -344,8 +344,8 @@ public class DescribeConsumerGroupTest {
ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group})
) {
TestUtils.waitForCondition(() -> {
GroupState state = service.collectGroupState(group);
return Objects.equals(state.state, ConsumerGroupState.STABLE) &&
GroupInformation state = service.collectGroupState(group);
return Objects.equals(state.groupState, GroupState.STABLE) &&
state.numMembers == 1 &&
state.coordinator != null &&
clusterInstance.brokerIds().contains(state.coordinator.id());
@ -376,8 +376,8 @@ public class DescribeConsumerGroupTest {
try (ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group})) {
TestUtils.waitForCondition(() -> {
GroupState state = service.collectGroupState(group);
return Objects.equals(state.state, ConsumerGroupState.STABLE) &&
GroupInformation state = service.collectGroupState(group);
return Objects.equals(state.groupState, GroupState.STABLE) &&
state.numMembers == 1 &&
Objects.equals(state.assignmentStrategy, expectedName) &&
state.coordinator != null &&
@ -434,8 +434,8 @@ public class DescribeConsumerGroupTest {
ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group})
) {
TestUtils.waitForCondition(() -> {
Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(group);
return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false)
Entry<Optional<GroupState>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(group);
return res.getKey().map(s -> s.equals(GroupState.STABLE)).orElse(false)
&& res.getValue().map(c -> c.stream().anyMatch(assignment -> Objects.equals(assignment.group, group) && assignment.offset.isPresent())).orElse(false);
}, "Expected the group to initially become stable, and to find group in assignments after initial offset commit.");
@ -443,12 +443,12 @@ public class DescribeConsumerGroupTest {
protocolConsumerGroupExecutor.close();
TestUtils.waitForCondition(() -> {
Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> offsets = service.collectGroupOffsets(group);
Optional<ConsumerGroupState> state = offsets.getKey();
Entry<Optional<GroupState>, Optional<Collection<PartitionAssignmentState>>> offsets = service.collectGroupOffsets(group);
Optional<GroupState> state = offsets.getKey();
Optional<Collection<PartitionAssignmentState>> assignments = offsets.getValue();
List<PartitionAssignmentState> testGroupAssignments = assignments.get().stream().filter(a -> Objects.equals(a.group, group)).collect(Collectors.toList());
PartitionAssignmentState assignment = testGroupAssignments.get(0);
return state.map(s -> s.equals(ConsumerGroupState.EMPTY)).orElse(false) &&
return state.map(s -> s.equals(GroupState.EMPTY)).orElse(false) &&
testGroupAssignments.size() == 1 &&
assignment.consumerId.map(c -> c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && // the member should be gone
assignment.clientId.map(c -> c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) &&
@ -471,8 +471,8 @@ public class DescribeConsumerGroupTest {
ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group})
) {
TestUtils.waitForCondition(() -> {
Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(group, false);
return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false)
Entry<Optional<GroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(group, false);
return res.getKey().map(s -> s.equals(GroupState.STABLE)).orElse(false)
&& res.getValue().map(c -> c.stream().anyMatch(m -> Objects.equals(m.group, group))).orElse(false);
}, "Expected the group to initially become stable, and to find group in assignments after initial offset commit.");
@ -480,8 +480,8 @@ public class DescribeConsumerGroupTest {
protocolConsumerGroupExecutor.close();
TestUtils.waitForCondition(() -> {
Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(group, false);
return res.getKey().map(s -> s.equals(ConsumerGroupState.EMPTY)).orElse(false) && res.getValue().isPresent() && res.getValue().get().isEmpty();
Entry<Optional<GroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(group, false);
return res.getKey().map(s -> s.equals(GroupState.EMPTY)).orElse(false) && res.getValue().isPresent() && res.getValue().get().isEmpty();
}, "Expected no member in describe group members results for group '" + group + "'");
}
}
@ -500,8 +500,8 @@ public class DescribeConsumerGroupTest {
ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group})
) {
TestUtils.waitForCondition(() -> {
GroupState state = service.collectGroupState(group);
return Objects.equals(state.state, ConsumerGroupState.STABLE) &&
GroupInformation state = service.collectGroupState(group);
return Objects.equals(state.groupState, GroupState.STABLE) &&
state.numMembers == 1 &&
state.coordinator != null &&
clusterInstance.brokerIds().contains(state.coordinator.id());
@ -511,8 +511,8 @@ public class DescribeConsumerGroupTest {
protocolConsumerGroupExecutor.close();
TestUtils.waitForCondition(() -> {
GroupState state = service.collectGroupState(group);
return Objects.equals(state.state, ConsumerGroupState.EMPTY) && state.numMembers == 0;
GroupInformation state = service.collectGroupState(group);
return Objects.equals(state.groupState, GroupState.EMPTY) && state.numMembers == 0;
}, "Expected the group to become empty after the only member leaving.");
}
}
@ -556,8 +556,8 @@ public class DescribeConsumerGroupTest {
ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group})
) {
TestUtils.waitForCondition(() -> {
Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(group);
return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).isPresent() &&
Entry<Optional<GroupState>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(group);
return res.getKey().map(s -> s.equals(GroupState.STABLE)).isPresent() &&
res.getValue().isPresent() &&
res.getValue().get().stream().filter(s -> Objects.equals(s.group, group)).count() == 1 &&
res.getValue().get().stream().filter(x -> Objects.equals(x.group, group) && x.partition.isPresent()).count() == 1;
@ -579,8 +579,8 @@ public class DescribeConsumerGroupTest {
ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group})
) {
TestUtils.waitForCondition(() -> {
Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(group, false);
return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) &&
Entry<Optional<GroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(group, false);
return res.getKey().map(s -> s.equals(GroupState.STABLE)).orElse(false) &&
res.getValue().isPresent() &&
res.getValue().get().stream().filter(s -> Objects.equals(s.group, group)).count() == 2 &&
res.getValue().get().stream().filter(x -> Objects.equals(x.group, group) && x.numPartitions == 1).count() == 1 &&
@ -588,8 +588,8 @@ public class DescribeConsumerGroupTest {
res.getValue().get().stream().allMatch(s -> s.assignment.isEmpty());
}, "Expected rows for consumers with no assigned partitions in describe group results");
Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(group, true);
assertTrue(res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false)
Entry<Optional<GroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(group, true);
assertTrue(res.getKey().map(s -> s.equals(GroupState.STABLE)).orElse(false)
&& res.getValue().map(c -> c.stream().anyMatch(s -> !s.assignment.isEmpty())).orElse(false),
"Expected additional columns in verbose version of describe members");
}
@ -609,8 +609,8 @@ public class DescribeConsumerGroupTest {
ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group})
) {
TestUtils.waitForCondition(() -> {
GroupState state = service.collectGroupState(group);
return Objects.equals(state.state, ConsumerGroupState.STABLE) && state.numMembers == 2;
GroupInformation state = service.collectGroupState(group);
return Objects.equals(state.groupState, GroupState.STABLE) && state.numMembers == 2;
}, "Expected two consumers in describe group results");
}
}
@ -654,8 +654,8 @@ public class DescribeConsumerGroupTest {
ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group})
) {
TestUtils.waitForCondition(() -> {
Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(group);
return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) &&
Entry<Optional<GroupState>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(group);
return res.getKey().map(s -> s.equals(GroupState.STABLE)).orElse(false) &&
res.getValue().isPresent() &&
res.getValue().get().stream().filter(s -> Objects.equals(s.group, group)).count() == 2 &&
res.getValue().get().stream().filter(x -> Objects.equals(x.group, group) && x.partition.isPresent()).count() == 2 &&
@ -678,16 +678,16 @@ public class DescribeConsumerGroupTest {
ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group})
) {
TestUtils.waitForCondition(() -> {
Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(group, false);
return res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) &&
Entry<Optional<GroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(group, false);
return res.getKey().map(s -> s.equals(GroupState.STABLE)).orElse(false) &&
res.getValue().isPresent() &&
res.getValue().get().stream().filter(s -> Objects.equals(s.group, group)).count() == 2 &&
res.getValue().get().stream().filter(x -> Objects.equals(x.group, group) && x.numPartitions == 1).count() == 2 &&
res.getValue().get().stream().noneMatch(x -> Objects.equals(x.group, group) && x.numPartitions == 0);
}, "Expected two rows (one row per consumer) in describe group members results.");
Entry<Optional<ConsumerGroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(group, true);
assertTrue(res.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) && res.getValue().map(s -> s.stream().filter(x -> x.assignment.isEmpty()).count()).orElse(0L) == 0,
Entry<Optional<GroupState>, Optional<Collection<MemberAssignmentState>>> res = service.collectGroupMembers(group, true);
assertTrue(res.getKey().map(s -> s.equals(GroupState.STABLE)).orElse(false) && res.getValue().map(s -> s.stream().filter(x -> x.assignment.isEmpty()).count()).orElse(0L) == 0,
"Expected additional columns in verbose version of describe members");
}
}
@ -706,8 +706,8 @@ public class DescribeConsumerGroupTest {
ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group})
) {
TestUtils.waitForCondition(() -> {
GroupState state = service.collectGroupState(group);
return Objects.equals(state.state, ConsumerGroupState.STABLE) && Objects.equals(state.group, group) && state.numMembers == 2;
GroupInformation state = service.collectGroupState(group);
return Objects.equals(state.groupState, GroupState.STABLE) && Objects.equals(state.group, group) && state.numMembers == 2;
}, "Expected a stable group with two members in describe group state result.");
}
}
@ -726,8 +726,8 @@ public class DescribeConsumerGroupTest {
ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group})
) {
TestUtils.waitForCondition(() -> {
Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(group);
return res.getKey().map(s -> s.equals(ConsumerGroupState.EMPTY)).orElse(false)
Entry<Optional<GroupState>, Optional<Collection<PartitionAssignmentState>>> res = service.collectGroupOffsets(group);
return res.getKey().map(s -> s.equals(GroupState.EMPTY)).orElse(false)
&& res.getValue().isPresent() && res.getValue().get().stream().filter(s -> Objects.equals(s.group, group)).count() == 2;
}, "Expected a stable group with two members in describe group state result.");
}
@ -841,11 +841,11 @@ public class DescribeConsumerGroupTest {
ConsumerGroupCommand.ConsumerGroupService service = consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group})
) {
TestUtils.waitForCondition(() -> {
Entry<Optional<ConsumerGroupState>, Optional<Collection<PartitionAssignmentState>>> groupOffsets = service.collectGroupOffsets(group);
Entry<Optional<GroupState>, Optional<Collection<PartitionAssignmentState>>> groupOffsets = service.collectGroupOffsets(group);
Predicate<PartitionAssignmentState> isGrp = s -> Objects.equals(s.group, group);
boolean res = groupOffsets.getKey().map(s -> s.equals(ConsumerGroupState.STABLE)).orElse(false) &&
boolean res = groupOffsets.getKey().map(s -> s.equals(GroupState.STABLE)).orElse(false) &&
groupOffsets.getValue().isPresent() &&
groupOffsets.getValue().get().stream().filter(isGrp).count() == 1;

View File

@ -24,7 +24,7 @@ import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
@ -98,7 +98,7 @@ public class ListConsumerGroupTest {
try (AutoCloseable topicPartitionsConsumerGroupExecutor = consumerGroupClosable(topicPartitionsGroup, Collections.singleton(new TopicPartition(topic, 0)));
AutoCloseable topicConsumerGroupExecutor = consumerGroupClosable(GroupProtocol.CLASSIC, topicGroup, topic);
AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, protocolGroup, topic);
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--list"});
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--list"})
) {
Set<String> expectedGroups = set(Arrays.asList(topicPartitionsGroup, topicGroup, protocolGroup));
final AtomicReference<Set> foundGroups = new AtomicReference<>();
@ -131,50 +131,50 @@ public class ListConsumerGroupTest {
try (AutoCloseable topicPartitionsConsumerGroupExecutor = consumerGroupClosable(topicPartitionsGroup, Collections.singleton(new TopicPartition(topic, 0)));
AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, protocolGroup, topic);
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--list", "--state"});
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--list", "--state"})
) {
Set<ConsumerGroupListing> expectedListing = Set.of(
new ConsumerGroupListing(
topicPartitionsGroup,
true,
Optional.of(ConsumerGroupState.EMPTY),
Optional.of(GroupType.CLASSIC)
Optional.of(GroupState.EMPTY),
Optional.of(GroupType.CLASSIC),
true
),
new ConsumerGroupListing(
protocolGroup,
false,
Optional.of(ConsumerGroupState.STABLE),
Optional.of(GroupType.parse(groupProtocol.name()))
Optional.of(GroupState.STABLE),
Optional.of(GroupType.parse(groupProtocol.name())),
false
)
);
assertGroupListing(
service,
Collections.emptySet(),
EnumSet.allOf(ConsumerGroupState.class),
EnumSet.allOf(GroupState.class),
expectedListing
);
expectedListing = Set.of(
new ConsumerGroupListing(
protocolGroup,
false,
Optional.of(ConsumerGroupState.STABLE),
Optional.of(GroupType.parse(groupProtocol.name()))
Optional.of(GroupState.STABLE),
Optional.of(GroupType.parse(groupProtocol.name())),
false
)
);
assertGroupListing(
service,
Collections.emptySet(),
Set.of(ConsumerGroupState.STABLE),
Set.of(GroupState.STABLE),
expectedListing
);
assertGroupListing(
service,
Collections.emptySet(),
Set.of(ConsumerGroupState.PREPARING_REBALANCE),
Set.of(GroupState.PREPARING_REBALANCE),
Collections.emptySet()
);
}
@ -194,20 +194,20 @@ public class ListConsumerGroupTest {
try (AutoCloseable topicPartitionsConsumerGroupExecutor = consumerGroupClosable(topicPartitionsGroup, Collections.singleton(new TopicPartition(topic, 0)));
AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, protocolGroup, topic);
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--list", "--state"});
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--list", "--state"})
) {
Set<ConsumerGroupListing> expectedListing = Set.of(
new ConsumerGroupListing(
topicPartitionsGroup,
true,
Optional.of(ConsumerGroupState.EMPTY),
Optional.of(GroupType.CLASSIC)
Optional.of(GroupState.EMPTY),
Optional.of(GroupType.CLASSIC),
true
),
new ConsumerGroupListing(
protocolGroup,
false,
Optional.of(ConsumerGroupState.STABLE),
Optional.of(GroupType.CLASSIC)
Optional.of(GroupState.STABLE),
Optional.of(GroupType.CLASSIC),
false
)
);
@ -250,7 +250,7 @@ public class ListConsumerGroupTest {
try (AutoCloseable topicPartitionsConsumerGroupExecutor = consumerGroupClosable(topicPartitionsGroup, Collections.singleton(new TopicPartition(topic, 0)));
AutoCloseable topicConsumerGroupExecutor = consumerGroupClosable(GroupProtocol.CLASSIC, topicGroup, topic);
AutoCloseable protocolConsumerGroupExecutor = consumerGroupClosable(groupProtocol, protocolGroup, topic);
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--list"});
ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--list"})
) {
@ -258,21 +258,21 @@ public class ListConsumerGroupTest {
Set<ConsumerGroupListing> expectedListing = Set.of(
new ConsumerGroupListing(
topicPartitionsGroup,
true,
Optional.of(ConsumerGroupState.EMPTY),
Optional.of(GroupType.CLASSIC)
Optional.of(GroupState.EMPTY),
Optional.of(GroupType.CLASSIC),
true
),
new ConsumerGroupListing(
topicGroup,
false,
Optional.of(ConsumerGroupState.STABLE),
Optional.of(GroupType.CLASSIC)
Optional.of(GroupState.STABLE),
Optional.of(GroupType.CLASSIC),
false
),
new ConsumerGroupListing(
protocolGroup,
false,
Optional.of(ConsumerGroupState.STABLE),
Optional.of(GroupType.CONSUMER)
Optional.of(GroupState.STABLE),
Optional.of(GroupType.CONSUMER),
false
)
);
@ -288,9 +288,9 @@ public class ListConsumerGroupTest {
expectedListing = Set.of(
new ConsumerGroupListing(
protocolGroup,
false,
Optional.of(ConsumerGroupState.STABLE),
Optional.of(GroupType.CONSUMER)
Optional.of(GroupState.STABLE),
Optional.of(GroupType.CONSUMER),
false
)
);
@ -304,15 +304,15 @@ public class ListConsumerGroupTest {
expectedListing = Set.of(
new ConsumerGroupListing(
topicPartitionsGroup,
true,
Optional.of(ConsumerGroupState.EMPTY),
Optional.of(GroupType.CLASSIC)
Optional.of(GroupState.EMPTY),
Optional.of(GroupType.CLASSIC),
true
),
new ConsumerGroupListing(
topicGroup,
false,
Optional.of(ConsumerGroupState.STABLE),
Optional.of(GroupType.CLASSIC)
Optional.of(GroupState.STABLE),
Optional.of(GroupType.CLASSIC),
false
)
);
@ -550,18 +550,18 @@ public class ListConsumerGroupTest {
*
* @param service The service to list consumer groups.
* @param typeFilterSet Filters for group types, empty for no filter.
* @param stateFilterSet Filters for group states, empty for no filter.
* @param groupStateFilterSet Filters for group states, empty for no filter.
* @param expectedListing Expected consumer group listings.
*/
private static void assertGroupListing(
ConsumerGroupCommand.ConsumerGroupService service,
Set<GroupType> typeFilterSet,
Set<ConsumerGroupState> stateFilterSet,
Set<GroupState> groupStateFilterSet,
Set<ConsumerGroupListing> expectedListing
) throws Exception {
final AtomicReference<Set<ConsumerGroupListing>> foundListing = new AtomicReference<>();
TestUtils.waitForCondition(() -> {
foundListing.set(set(service.listConsumerGroupsWithFilters(set(typeFilterSet), set(stateFilterSet))));
foundListing.set(set(service.listConsumerGroupsWithFilters(set(typeFilterSet), set(groupStateFilterSet))));
return Objects.equals(set(expectedListing), foundListing.get());
}, () -> "Expected to show groups " + expectedListing + ", but found " + foundListing.get() + ".");
}
@ -614,29 +614,29 @@ public class ListConsumerGroupTest {
class ListConsumerGroupUnitTest {
@Test
public void testConsumerGroupStatesFromString() {
Set<ConsumerGroupState> result = ConsumerGroupCommand.consumerGroupStatesFromString("Stable");
Assertions.assertEquals(ListConsumerGroupTest.set(Collections.singleton(ConsumerGroupState.STABLE)), result);
Set<GroupState> result = ConsumerGroupCommand.groupStatesFromString("Stable");
Assertions.assertEquals(ListConsumerGroupTest.set(Collections.singleton(GroupState.STABLE)), result);
result = ConsumerGroupCommand.consumerGroupStatesFromString("Stable, PreparingRebalance");
Assertions.assertEquals(ListConsumerGroupTest.set(Arrays.asList(ConsumerGroupState.STABLE, ConsumerGroupState.PREPARING_REBALANCE)), result);
result = ConsumerGroupCommand.groupStatesFromString("Stable, PreparingRebalance");
Assertions.assertEquals(ListConsumerGroupTest.set(Arrays.asList(GroupState.STABLE, GroupState.PREPARING_REBALANCE)), result);
result = ConsumerGroupCommand.consumerGroupStatesFromString("Dead,CompletingRebalance,");
Assertions.assertEquals(ListConsumerGroupTest.set(Arrays.asList(ConsumerGroupState.DEAD, ConsumerGroupState.COMPLETING_REBALANCE)), result);
result = ConsumerGroupCommand.groupStatesFromString("Dead,CompletingRebalance,");
Assertions.assertEquals(ListConsumerGroupTest.set(Arrays.asList(GroupState.DEAD, GroupState.COMPLETING_REBALANCE)), result);
result = ConsumerGroupCommand.consumerGroupStatesFromString("stable");
Assertions.assertEquals(ListConsumerGroupTest.set(Collections.singletonList(ConsumerGroupState.STABLE)), result);
result = ConsumerGroupCommand.groupStatesFromString("stable");
Assertions.assertEquals(ListConsumerGroupTest.set(Collections.singletonList(GroupState.STABLE)), result);
result = ConsumerGroupCommand.consumerGroupStatesFromString("stable, assigning");
Assertions.assertEquals(ListConsumerGroupTest.set(Arrays.asList(ConsumerGroupState.STABLE, ConsumerGroupState.ASSIGNING)), result);
result = ConsumerGroupCommand.groupStatesFromString("stable, assigning");
Assertions.assertEquals(ListConsumerGroupTest.set(Arrays.asList(GroupState.STABLE, GroupState.ASSIGNING)), result);
result = ConsumerGroupCommand.consumerGroupStatesFromString("dead,reconciling,");
Assertions.assertEquals(ListConsumerGroupTest.set(Arrays.asList(ConsumerGroupState.DEAD, ConsumerGroupState.RECONCILING)), result);
result = ConsumerGroupCommand.groupStatesFromString("dead,reconciling,");
Assertions.assertEquals(ListConsumerGroupTest.set(Arrays.asList(GroupState.DEAD, GroupState.RECONCILING)), result);
Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumerGroupCommand.consumerGroupStatesFromString("bad, wrong"));
Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumerGroupCommand.groupStatesFromString("bad, wrong"));
Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumerGroupCommand.consumerGroupStatesFromString(" bad, Stable"));
Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumerGroupCommand.groupStatesFromString(" bad, Stable"));
Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumerGroupCommand.consumerGroupStatesFromString(" , ,"));
Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumerGroupCommand.groupStatesFromString(" , ,"));
}
@Test

View File

@ -25,7 +25,7 @@ import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
@ -834,10 +834,10 @@ public class ResetConsumerGroupOffsetTest {
private void awaitConsumerGroupInactive(ConsumerGroupCommand.ConsumerGroupService service,
String group) throws Exception {
TestUtils.waitForCondition(() -> {
ConsumerGroupState state = service.collectGroupState(group).state;
return Objects.equals(state, ConsumerGroupState.EMPTY) || Objects.equals(state, ConsumerGroupState.DEAD);
GroupState state = service.collectGroupState(group).groupState;
return Objects.equals(state, GroupState.EMPTY) || Objects.equals(state, GroupState.DEAD);
}, "Expected that consumer group is inactive. Actual state: " +
service.collectGroupState(group).state);
service.collectGroupState(group).groupState);
}
private void resetAndAssertOffsetsCommitted(ClusterInstance cluster,

View File

@ -18,18 +18,19 @@ package org.apache.kafka.tools.consumer.group;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeShareGroupsResult;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListGroupsOptions;
import org.apache.kafka.clients.admin.ListGroupsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListShareGroupsOptions;
import org.apache.kafka.clients.admin.ListShareGroupsResult;
import org.apache.kafka.clients.admin.MemberAssignment;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.admin.ShareGroupDescription;
import org.apache.kafka.clients.admin.ShareGroupListing;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.ShareGroupState;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.consumer.group.ShareGroupCommand.ShareGroupService;
@ -66,12 +67,12 @@ public class ShareGroupCommandTest {
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--list"};
Admin adminClient = mock(KafkaAdminClient.class);
ListShareGroupsResult result = mock(ListShareGroupsResult.class);
ListGroupsResult result = mock(ListGroupsResult.class);
when(result.all()).thenReturn(KafkaFuture.completedFuture(Arrays.asList(
new ShareGroupListing(firstGroup, Optional.of(ShareGroupState.STABLE)),
new ShareGroupListing(secondGroup, Optional.of(ShareGroupState.EMPTY))
new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)),
new GroupListing(secondGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.EMPTY))
)));
when(adminClient.listShareGroups(any(ListShareGroupsOptions.class))).thenReturn(result);
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(result);
ShareGroupService service = getShareGroupService(cgcArgs, adminClient);
Set<String> expectedGroups = new HashSet<>(Arrays.asList(firstGroup, secondGroup));
@ -94,7 +95,7 @@ public class ShareGroupCommandTest {
Collections.singletonList(new MemberDescription("memid1", "clId1", "host1", new MemberAssignment(
Collections.singleton(new TopicPartition("topic1", 0))
))),
ShareGroupState.STABLE,
GroupState.STABLE,
new Node(0, "host1", 9090));
resultMap.put(firstGroup, exp);
@ -133,11 +134,11 @@ public class ShareGroupCommandTest {
@Test
public void testPrintEmptyGroupState() {
assertFalse(ShareGroupService.maybePrintEmptyGroupState("group", ShareGroupState.EMPTY, 0));
assertFalse(ShareGroupService.maybePrintEmptyGroupState("group", ShareGroupState.DEAD, 0));
assertFalse(ShareGroupService.maybePrintEmptyGroupState("group", ShareGroupState.STABLE, 0));
assertTrue(ShareGroupService.maybePrintEmptyGroupState("group", ShareGroupState.STABLE, 1));
assertTrue(ShareGroupService.maybePrintEmptyGroupState("group", ShareGroupState.UNKNOWN, 1));
assertFalse(ShareGroupService.maybePrintEmptyGroupState("group", GroupState.EMPTY, 0));
assertFalse(ShareGroupService.maybePrintEmptyGroupState("group", GroupState.DEAD, 0));
assertFalse(ShareGroupService.maybePrintEmptyGroupState("group", GroupState.STABLE, 0));
assertTrue(ShareGroupService.maybePrintEmptyGroupState("group", GroupState.STABLE, 1));
assertTrue(ShareGroupService.maybePrintEmptyGroupState("group", GroupState.UNKNOWN, 1));
}
@Test
@ -155,59 +156,61 @@ public class ShareGroupCommandTest {
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--list", "--state"};
Admin adminClient = mock(KafkaAdminClient.class);
ListShareGroupsResult resultWithAllStates = mock(ListShareGroupsResult.class);
ListGroupsResult resultWithAllStates = mock(ListGroupsResult.class);
when(resultWithAllStates.all()).thenReturn(KafkaFuture.completedFuture(Arrays.asList(
new ShareGroupListing(firstGroup, Optional.of(ShareGroupState.STABLE)),
new ShareGroupListing(secondGroup, Optional.of(ShareGroupState.EMPTY))
new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)),
new GroupListing(secondGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.EMPTY))
)));
when(adminClient.listShareGroups(any(ListShareGroupsOptions.class))).thenReturn(resultWithAllStates);
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(resultWithAllStates);
ShareGroupService service = getShareGroupService(cgcArgs, adminClient);
Set<ShareGroupListing> expectedListing = new HashSet<>(Arrays.asList(
new ShareGroupListing(firstGroup, Optional.of(ShareGroupState.STABLE)),
new ShareGroupListing(secondGroup, Optional.of(ShareGroupState.EMPTY))));
Set<GroupListing> expectedListing = new HashSet<>(Arrays.asList(
new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)),
new GroupListing(secondGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.EMPTY))));
final Set[] foundListing = new Set[]{Collections.emptySet()};
TestUtils.waitForCondition(() -> {
foundListing[0] = new HashSet<>(service.listShareGroupsWithState(new HashSet<>(Arrays.asList(ShareGroupState.values()))));
foundListing[0] = new HashSet<>(service.listShareGroupsInStates(new HashSet<>(Arrays.asList(GroupState.values()))));
return Objects.equals(expectedListing, foundListing[0]);
}, "Expected to show groups " + expectedListing + ", but found " + foundListing[0]);
ListShareGroupsResult resultWithStableState = mock(ListShareGroupsResult.class);
ListGroupsResult resultWithStableState = mock(ListGroupsResult.class);
when(resultWithStableState.all()).thenReturn(KafkaFuture.completedFuture(Collections.singletonList(
new ShareGroupListing(firstGroup, Optional.of(ShareGroupState.STABLE))
new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE))
)));
when(adminClient.listShareGroups(any(ListShareGroupsOptions.class))).thenReturn(resultWithStableState);
Set<ShareGroupListing> expectedListingStable = Collections.singleton(
new ShareGroupListing(firstGroup, Optional.of(ShareGroupState.STABLE)));
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(resultWithStableState);
Set<GroupListing> expectedListingStable = Collections.singleton(
new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)));
foundListing[0] = Collections.emptySet();
TestUtils.waitForCondition(() -> {
foundListing[0] = new HashSet<>(service.listShareGroupsWithState(Collections.singleton(ShareGroupState.STABLE)));
foundListing[0] = new HashSet<>(service.listShareGroupsInStates(Collections.singleton(GroupState.STABLE)));
return Objects.equals(expectedListingStable, foundListing[0]);
}, "Expected to show groups " + expectedListingStable + ", but found " + foundListing[0]);
service.close();
}
@Test
public void testShareGroupStatesFromString() {
Set<ShareGroupState> result = ShareGroupCommand.shareGroupStatesFromString("Stable");
assertEquals(Collections.singleton(ShareGroupState.STABLE), result);
public void testGroupStatesFromString() {
Set<GroupState> result = ShareGroupCommand.groupStatesFromString("Stable");
assertEquals(Collections.singleton(GroupState.STABLE), result);
result = ShareGroupCommand.shareGroupStatesFromString("stable");
assertEquals(new HashSet<>(Collections.singletonList(ShareGroupState.STABLE)), result);
result = ShareGroupCommand.groupStatesFromString("stable");
assertEquals(new HashSet<>(Collections.singletonList(GroupState.STABLE)), result);
result = ShareGroupCommand.shareGroupStatesFromString("dead");
assertEquals(new HashSet<>(Collections.singletonList(ShareGroupState.DEAD)), result);
result = ShareGroupCommand.groupStatesFromString("dead");
assertEquals(new HashSet<>(Collections.singletonList(GroupState.DEAD)), result);
result = ShareGroupCommand.shareGroupStatesFromString("empty");
assertEquals(new HashSet<>(Collections.singletonList(ShareGroupState.EMPTY)), result);
result = ShareGroupCommand.groupStatesFromString("empty");
assertEquals(new HashSet<>(Collections.singletonList(GroupState.EMPTY)), result);
assertThrows(IllegalArgumentException.class, () -> ShareGroupCommand.shareGroupStatesFromString("bad, wrong"));
assertThrows(IllegalArgumentException.class, () -> ShareGroupCommand.groupStatesFromString("assigning"));
assertThrows(IllegalArgumentException.class, () -> ShareGroupCommand.shareGroupStatesFromString(" bad, Stable"));
assertThrows(IllegalArgumentException.class, () -> ShareGroupCommand.groupStatesFromString("bad, wrong"));
assertThrows(IllegalArgumentException.class, () -> ShareGroupCommand.shareGroupStatesFromString(" , ,"));
assertThrows(IllegalArgumentException.class, () -> ShareGroupCommand.groupStatesFromString(" bad, Stable"));
assertThrows(IllegalArgumentException.class, () -> ShareGroupCommand.groupStatesFromString(" , ,"));
}
ShareGroupService getShareGroupService(String[] args, Admin adminClient) {