KAFKA-18273: KIP-1099 verbose display share group options (#18259)

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
This commit is contained in:
Andrew Schofield 2025-01-02 09:12:52 +00:00 committed by GitHub
parent 4896d02ca7
commit 0344f8f5ae
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 740 additions and 224 deletions

View File

@ -38,24 +38,32 @@ public class ShareGroupDescription {
private final Collection<ShareMemberDescription> members;
private final GroupState groupState;
private final Node coordinator;
private final int groupEpoch;
private final int targetAssignmentEpoch;
private final Set<AclOperation> authorizedOperations;
public ShareGroupDescription(String groupId,
Collection<ShareMemberDescription> members,
GroupState groupState,
Node coordinator) {
this(groupId, members, groupState, coordinator, Collections.emptySet());
Node coordinator,
int groupEpoch,
int targetAssignmentEpoch) {
this(groupId, members, groupState, coordinator, groupEpoch, targetAssignmentEpoch, Collections.emptySet());
}
public ShareGroupDescription(String groupId,
Collection<ShareMemberDescription> members,
GroupState groupState,
Node coordinator,
int groupEpoch,
int targetAssignmentEpoch,
Set<AclOperation> authorizedOperations) {
this.groupId = groupId == null ? "" : groupId;
this.members = members == null ? Collections.emptyList() : List.copyOf(members);
this.groupState = groupState;
this.coordinator = coordinator;
this.groupEpoch = groupEpoch;
this.targetAssignmentEpoch = targetAssignmentEpoch;
this.authorizedOperations = authorizedOperations;
}
@ -68,12 +76,14 @@ public class ShareGroupDescription {
Objects.equals(members, that.members) &&
groupState == that.groupState &&
Objects.equals(coordinator, that.coordinator) &&
groupEpoch == that.groupEpoch &&
targetAssignmentEpoch == that.targetAssignmentEpoch &&
Objects.equals(authorizedOperations, that.authorizedOperations);
}
@Override
public int hashCode() {
return Objects.hash(groupId, members, groupState, coordinator, authorizedOperations);
return Objects.hash(groupId, members, groupState, coordinator, groupEpoch, targetAssignmentEpoch, authorizedOperations);
}
/**
@ -111,12 +121,28 @@ public class ShareGroupDescription {
return authorizedOperations;
}
/**
* The epoch of the share group.
*/
public int groupEpoch() {
return groupEpoch;
}
/**
* The epoch of the target assignment.
*/
public int targetAssignmentEpoch() {
return targetAssignmentEpoch;
}
@Override
public String toString() {
return "(groupId=" + groupId +
", members=" + members.stream().map(ShareMemberDescription::toString).collect(Collectors.joining(",")) +
", groupState=" + groupState +
", coordinator=" + coordinator +
", groupEpoch=" + groupEpoch +
", targetAssignmentEpoch=" + targetAssignmentEpoch +
", authorizedOperations=" + authorizedOperations +
")";
}

View File

@ -30,18 +30,21 @@ public class ShareMemberDescription {
private final String clientId;
private final String host;
private final ShareMemberAssignment assignment;
private final int memberEpoch;
public ShareMemberDescription(
String memberId,
String clientId,
String host,
ShareMemberAssignment assignment
ShareMemberAssignment assignment,
int memberEpoch
) {
this.memberId = memberId == null ? "" : memberId;
this.clientId = clientId == null ? "" : clientId;
this.host = host == null ? "" : host;
this.assignment = assignment == null ?
new ShareMemberAssignment(Collections.emptySet()) : assignment;
this.memberEpoch = memberEpoch;
}
@Override
@ -52,12 +55,13 @@ public class ShareMemberDescription {
return memberId.equals(that.memberId) &&
clientId.equals(that.clientId) &&
host.equals(that.host) &&
assignment.equals(that.assignment);
assignment.equals(that.assignment) &&
memberEpoch == that.memberEpoch;
}
@Override
public int hashCode() {
return Objects.hash(memberId, clientId, host, assignment);
return Objects.hash(memberId, clientId, host, assignment, memberEpoch);
}
/**
@ -88,11 +92,20 @@ public class ShareMemberDescription {
return assignment;
}
/**
* The epoch of the group member.
*/
public int memberEpoch() {
return memberEpoch;
}
@Override
public String toString() {
return "(memberId=" + memberId +
", clientId=" + clientId +
", host=" + host +
", assignment=" + assignment + ")";
", assignment=" + assignment +
", memberEpoch=" + memberEpoch +
")";
}
}

View File

@ -121,7 +121,8 @@ public class DescribeShareGroupsHandler extends AdminApiHandler.Batched<Coordina
groupMember.memberId(),
groupMember.clientId(),
groupMember.clientHost(),
new ShareMemberAssignment(convertAssignment(groupMember.assignment()))
new ShareMemberAssignment(convertAssignment(groupMember.assignment())),
groupMember.memberEpoch()
))
);
@ -130,6 +131,8 @@ public class DescribeShareGroupsHandler extends AdminApiHandler.Batched<Coordina
memberDescriptions,
GroupState.parse(describedGroup.groupState()),
coordinator,
describedGroup.groupEpoch(),
describedGroup.assignmentEpoch(),
authorizedOperations);
completed.put(groupIdKey, shareGroupDescription);
}

View File

@ -8703,7 +8703,8 @@ public class KafkaAdminClientTest {
return new ShareMemberDescription(member.memberId(),
member.clientId(),
member.clientHost(),
assignment);
assignment,
member.memberEpoch());
}
@Test

View File

@ -20,8 +20,10 @@ import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import java.util.Arrays;
import java.util.Locale;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* Represents the strategy for resetting offsets in share consumer groups when no previous offset is found
@ -148,12 +150,19 @@ public class ShareGroupAutoOffsetResetStrategy {
fromString(offsetStrategy);
} catch (Exception e) {
throw new ConfigException(name, value, "Invalid value `" + offsetStrategy + "` for configuration " +
name + ". The value must be either 'earliest', 'latest' or of the format 'by_duration:<PnDTnHnMn.nS.>'.");
name + ". The value must be either 'earliest', 'latest' or of the format 'by_duration:<PnDTnHnMn.nS>'.");
}
}
public String toString() {
return "[earliest, latest, by_duration:PnDTnHnMn.nS]";
String values = Arrays.stream(StrategyType.values())
.map(strategyType -> {
if (strategyType == StrategyType.BY_DURATION) {
return strategyType + ":PnDTnHnMn.nS";
}
return strategyType.toString();
}).collect(Collectors.joining(", "));
return "[" + values + "]";
}
}
}

View File

@ -18,21 +18,24 @@ package org.apache.kafka.tools.consumer.group;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeShareGroupsResult;
import org.apache.kafka.clients.admin.DescribeShareGroupsOptions;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.ListGroupsOptions;
import org.apache.kafka.clients.admin.ListGroupsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.ShareGroupDescription;
import org.apache.kafka.clients.admin.ShareMemberAssignment;
import org.apache.kafka.clients.admin.ShareMemberDescription;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandLineUtils;
import java.io.IOException;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -40,9 +43,11 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -186,103 +191,185 @@ public class ShareGroupCommand {
}
public void describeGroups() throws ExecutionException, InterruptedException {
String group = opts.options.valueOf(opts.groupOpt);
ShareGroupDescription description = getDescribeGroup(group);
if (description == null)
return;
Collection<String> groupIds = opts.options.has(opts.allGroupsOpt)
? listShareGroups()
: opts.options.valuesOf(opts.groupOpt);
if (opts.options.has(opts.membersOpt)) {
printMembers(description);
TreeMap<String, ShareGroupDescription> members = collectGroupsDescription(groupIds);
printMembers(members, opts.options.has(opts.verboseOpt));
} else if (opts.options.has(opts.stateOpt)) {
printStates(description);
TreeMap<String, ShareGroupDescription> states = collectGroupsDescription(groupIds);
printStates(states, opts.options.has(opts.verboseOpt));
} else {
printOffsets(description);
TreeMap<String, Entry<ShareGroupDescription, Collection<SharePartitionOffsetInformation>>> offsets
= collectGroupsOffsets(groupIds);
printOffsets(offsets);
}
}
ShareGroupDescription getDescribeGroup(String group) throws ExecutionException, InterruptedException {
DescribeShareGroupsResult result = adminClient.describeShareGroups(List.of(group));
Map<String, ShareGroupDescription> descriptionMap = result.all().get();
if (descriptionMap.containsKey(group)) {
return descriptionMap.get(group);
Map<String, ShareGroupDescription> describeShareGroups(Collection<String> groupIds) throws ExecutionException, InterruptedException {
Map<String, ShareGroupDescription> res = new HashMap<>();
Map<String, KafkaFuture<ShareGroupDescription>> stringKafkaFutureMap = adminClient.describeShareGroups(
groupIds,
new DescribeShareGroupsOptions().timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
).describedGroups();
for (Entry<String, KafkaFuture<ShareGroupDescription>> e : stringKafkaFutureMap.entrySet()) {
res.put(e.getKey(), e.getValue().get());
}
return null;
return res;
}
Map<TopicPartition, Long> getOffsets(Collection<ShareMemberDescription> members) throws ExecutionException, InterruptedException {
Set<TopicPartition> allTp = new HashSet<>();
for (ShareMemberDescription memberDescription : members) {
allTp.addAll(memberDescription.assignment().topicPartitions());
}
// fetch latest and earliest offsets
Map<TopicPartition, OffsetSpec> earliest = new HashMap<>();
Map<TopicPartition, OffsetSpec> latest = new HashMap<>();
for (TopicPartition tp : allTp) {
earliest.put(tp, OffsetSpec.earliest());
latest.put(tp, OffsetSpec.latest());
}
// This call to obtain the earliest offsets will be replaced once adminClient.listShareGroupOffsets is implemented
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> earliestResult = adminClient.listOffsets(earliest).all().get();
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestResult = adminClient.listOffsets(latest).all().get();
Map<TopicPartition, Long> lag = new HashMap<>();
for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> tp : earliestResult.entrySet()) {
lag.put(tp.getKey(), latestResult.get(tp.getKey()).offset() - earliestResult.get(tp.getKey()).offset());
}
return lag;
TreeMap<String, ShareGroupDescription> collectGroupsDescription(Collection<String> groupIds) throws ExecutionException, InterruptedException {
Map<String, ShareGroupDescription> shareGroups = describeShareGroups(groupIds);
TreeMap<String, ShareGroupDescription> res = new TreeMap<>();
shareGroups.forEach(res::put);
return res;
}
private void printOffsets(ShareGroupDescription description) throws ExecutionException, InterruptedException {
Map<TopicPartition, Long> offsets = getOffsets(description.members());
if (maybePrintEmptyGroupState(description.groupId(), description.groupState(), offsets.size())) {
String fmt = printOffsetFormat(description, offsets);
System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "START-OFFSET");
TreeMap<String, Entry<ShareGroupDescription, Collection<SharePartitionOffsetInformation>>> collectGroupsOffsets(Collection<String> groupIds) throws ExecutionException, InterruptedException {
Map<String, ShareGroupDescription> shareGroups = describeShareGroups(groupIds);
TreeMap<String, Entry<ShareGroupDescription, Collection<SharePartitionOffsetInformation>>> groupOffsets = new TreeMap<>();
for (Map.Entry<TopicPartition, Long> offset : offsets.entrySet()) {
System.out.printf(fmt, description.groupId(), offset.getKey().topic(), offset.getKey().partition(), offset.getValue());
shareGroups.forEach((groupId, shareGroup) -> {
Set<TopicPartition> allTp = new HashSet<>();
for (ShareMemberDescription memberDescription : shareGroup.members()) {
allTp.addAll(memberDescription.assignment().topicPartitions());
}
}
// Fetch latest and earliest offsets
Map<TopicPartition, OffsetSpec> earliest = new HashMap<>();
Map<TopicPartition, OffsetSpec> latest = new HashMap<>();
for (TopicPartition tp : allTp) {
earliest.put(tp, OffsetSpec.earliest());
latest.put(tp, OffsetSpec.latest());
}
// This call to obtain the earliest offsets will be replaced once adminClient.listShareGroupOffsets is implemented
try {
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> earliestResult = adminClient.listOffsets(earliest).all().get();
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestResult = adminClient.listOffsets(latest).all().get();
Set<SharePartitionOffsetInformation> partitionOffsets = new HashSet<>();
for (Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> tp : earliestResult.entrySet()) {
SharePartitionOffsetInformation partitionOffsetInfo = new SharePartitionOffsetInformation(
groupId,
tp.getKey().topic(),
tp.getKey().partition(),
latestResult.get(tp.getKey()).offset() - earliestResult.get(tp.getKey()).offset()
);
partitionOffsets.add(partitionOffsetInfo);
}
groupOffsets.put(groupId, new SimpleImmutableEntry<>(shareGroup, partitionOffsets));
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
return groupOffsets;
}
private static String printOffsetFormat(ShareGroupDescription description, Map<TopicPartition, Long> offsets) {
int groupLen = Math.max(15, description.groupId().length());
private void printOffsets(TreeMap<String, Entry<ShareGroupDescription, Collection<SharePartitionOffsetInformation>>> offsets) {
offsets.forEach((groupId, tuple) -> {
ShareGroupDescription description = tuple.getKey();
Collection<SharePartitionOffsetInformation> offsetsInfo = tuple.getValue();
if (maybePrintEmptyGroupState(groupId, description.groupState(), offsetsInfo.size())) {
String fmt = printOffsetFormat(groupId, offsetsInfo);
System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "START-OFFSET");
for (SharePartitionOffsetInformation info : offsetsInfo) {
System.out.printf(fmt,
groupId,
info.topic,
info.partition,
info.offset
);
}
System.out.println();
}
});
}
private static String printOffsetFormat(String groupId, Collection<SharePartitionOffsetInformation> offsetsInfo) {
int groupLen = Math.max(15, groupId.length());
int maxTopicLen = 15;
for (TopicPartition topicPartition : offsets.keySet()) {
maxTopicLen = Math.max(maxTopicLen, topicPartition.topic().length());
for (SharePartitionOffsetInformation info : offsetsInfo) {
maxTopicLen = Math.max(maxTopicLen, info.topic.length());
}
return "%" + (-groupLen) + "s %" + (-maxTopicLen) + "s %-10s %s\n";
return "\n%" + (-groupLen) + "s %" + (-maxTopicLen) + "s %-10s %s";
}
private void printStates(ShareGroupDescription description) {
maybePrintEmptyGroupState(description.groupId(), description.groupState(), 1);
private void printStates(Map<String, ShareGroupDescription> descriptions, boolean verbose) {
descriptions.forEach((groupId, description) -> {
maybePrintEmptyGroupState(groupId, description.groupState(), 1);
int groupLen = Math.max(15, description.groupId().length());
String coordinator = description.coordinator().host() + ":" + description.coordinator().port() + " (" + description.coordinator().idString() + ")";
int coordinatorLen = Math.max(25, coordinator.length());
int groupLen = Math.max(15, groupId.length());
String coordinator = description.coordinator().host() + ":" + description.coordinator().port() + " (" + description.coordinator().idString() + ")";
int coordinatorLen = Math.max(25, coordinator.length());
String fmt = "%" + -groupLen + "s %" + -coordinatorLen + "s %-15s %s\n";
System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE", "#MEMBERS");
System.out.printf(fmt, description.groupId(), coordinator, description.groupState().toString(), description.members().size());
if (verbose) {
String fmt = "\n%" + -groupLen + "s %" + -coordinatorLen + "s %-15s %-12s %-17s %s";
System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE", "GROUP-EPOCH", "ASSIGNMENT-EPOCH", "#MEMBERS");
System.out.printf(fmt, groupId, coordinator, description.groupState().toString(),
description.groupEpoch(), description.targetAssignmentEpoch(), description.members().size());
} else {
String fmt = "\n%" + -groupLen + "s %" + -coordinatorLen + "s %-15s %s";
System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE", "#MEMBERS");
System.out.printf(fmt, groupId, coordinator, description.groupState().toString(), description.members().size());
}
System.out.println();
});
}
private void printMembers(ShareGroupDescription description) {
int groupLen = Math.max(15, description.groupId().length());
int maxConsumerIdLen = 15, maxHostLen = 15, maxClientIdLen = 15;
Collection<ShareMemberDescription> members = description.members();
if (maybePrintEmptyGroupState(description.groupId(), description.groupState(), description.members().size())) {
for (ShareMemberDescription member : members) {
maxConsumerIdLen = Math.max(maxConsumerIdLen, member.consumerId().length());
maxHostLen = Math.max(maxHostLen, member.host().length());
maxClientIdLen = Math.max(maxClientIdLen, member.clientId().length());
}
private void printMembers(TreeMap<String, ShareGroupDescription> descriptions, boolean verbose) {
descriptions.forEach((groupId, description) -> {
int groupLen = Math.max(15, groupId.length());
int maxConsumerIdLen = 15, maxHostLen = 15, maxClientIdLen = 15;
Collection<ShareMemberDescription> members = description.members();
if (maybePrintEmptyGroupState(groupId, description.groupState(), description.members().size())) {
for (ShareMemberDescription member : members) {
maxConsumerIdLen = Math.max(maxConsumerIdLen, member.consumerId().length());
maxHostLen = Math.max(maxHostLen, member.host().length());
maxClientIdLen = Math.max(maxClientIdLen, member.clientId().length());
}
String fmt = "%" + -groupLen + "s %" + -maxConsumerIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %s\n";
System.out.printf(fmt, "GROUP", "CONSUMER-ID", "HOST", "CLIENT-ID", "ASSIGNMENT");
for (ShareMemberDescription member : members) {
System.out.printf(fmt, description.groupId(), member.consumerId(), member.host(), member.clientId(),
member.assignment().topicPartitions().stream().map(part -> part.topic() + ":" + part.partition()).collect(Collectors.joining(",")));
if (verbose) {
String fmt = "\n%" + -groupLen + "s %" + -maxConsumerIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %-13s %s";
System.out.printf(fmt, "GROUP", "CONSUMER-ID", "HOST", "CLIENT-ID", "MEMBER-EPOCH", "ASSIGNMENT");
for (ShareMemberDescription member : members) {
System.out.printf(fmt, groupId, member.consumerId(), member.host(), member.clientId(), member.memberEpoch(), getAssignmentString(member.assignment()));
}
} else {
String fmt = "\n%" + -groupLen + "s %" + -maxConsumerIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %s";
System.out.printf(fmt, "GROUP", "CONSUMER-ID", "HOST", "CLIENT-ID", "ASSIGNMENT");
for (ShareMemberDescription member : members) {
System.out.printf(fmt, groupId, member.consumerId(), member.host(), member.clientId(), getAssignmentString(member.assignment()));
}
}
System.out.println();
}
}
});
}
private String getAssignmentString(ShareMemberAssignment assignment) {
Map<String, List<TopicPartition>> grouped = new HashMap<>();
assignment.topicPartitions().forEach(tp ->
grouped
.computeIfAbsent(tp.topic(), key -> new ArrayList<>())
.add(tp)
);
return grouped.entrySet().stream().map(entry -> {
String topicName = entry.getKey();
List<TopicPartition> topicPartitions = entry.getValue();
return topicPartitions
.stream()
.map(TopicPartition::partition)
.map(Object::toString)
.sorted()
.collect(Collectors.joining(",", topicName + ":", ""));
}).sorted().collect(Collectors.joining(";"));
}
public void close() {
@ -296,4 +383,23 @@ public class ShareGroupCommand {
return Admin.create(props);
}
}
static class SharePartitionOffsetInformation {
final String group;
final String topic;
final int partition;
final long offset;
SharePartitionOffsetInformation(
String group,
String topic,
int partition,
long offset
) {
this.group = group;
this.topic = topic;
this.partition = partition;
this.offset = offset;
}
}
}

View File

@ -33,60 +33,66 @@ import joptsimple.OptionSpec;
import static org.apache.kafka.tools.ToolsUtils.minus;
public class ShareGroupCommandOptions extends CommandDefaultOptions {
public static final Logger LOGGER = LoggerFactory.getLogger(ShareGroupCommandOptions.class);
private static final Logger LOGGER = LoggerFactory.getLogger(ShareGroupCommandOptions.class);
public static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s) to connect to.";
public static final String GROUP_DOC = "The share group we wish to act on.";
public static final String TOPIC_DOC = "The topic whose share group information should be deleted or topic whose should be included in the reset offset process. " +
private static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s) to connect to.";
private static final String GROUP_DOC = "The share group we wish to act on.";
private static final String TOPIC_DOC = "The topic whose share group information should be deleted or topic whose should be included in the reset offset process. " +
"When resetting offsets, partitions can be specified using this format: 'topic1:0,1,2', where 0,1,2 are the partitions to be included.";
public static final String ALL_TOPICS_DOC = "Consider all topics assigned to a share group in the 'reset-offsets' process.";
public static final String LIST_DOC = "List all share groups.";
public static final String DESCRIBE_DOC = "Describe share group, members and offset information.";
public static final String NL = System.lineSeparator();
public static final String DELETE_DOC = "Delete share group.";
public static final String TIMEOUT_MS_DOC = "The timeout that can be set for some use cases. For example, it can be used when describing the group " +
private static final String ALL_TOPICS_DOC = "Consider all topics assigned to a share group in the 'reset-offsets' process.";
private static final String LIST_DOC = "List all share groups.";
private static final String DESCRIBE_DOC = "Describe share group, members and offset information.";
private static final String ALL_GROUPS_DOC = "Apply to all share groups.";
private static final String NL = System.lineSeparator();
private static final String DELETE_DOC = "Delete share group.";
private static final String TIMEOUT_MS_DOC = "The timeout that can be set for some use cases. For example, it can be used when describing the group " +
"to specify the maximum amount of time in milliseconds to wait before the group stabilizes.";
public static final String COMMAND_CONFIG_DOC = "Property file containing configs to be passed to Admin Client.";
public static final String RESET_OFFSETS_DOC = "Reset offsets of share group. Supports one share group at the time, and instances must be inactive." + NL +
private static final String COMMAND_CONFIG_DOC = "Property file containing configs to be passed to Admin Client.";
private static final String RESET_OFFSETS_DOC = "Reset offsets of share group. Supports one share group at the time, and instances must be inactive." + NL +
"Has 2 execution options: --dry-run (the default) to plan which offsets to reset, and --execute to reset the offsets. " + NL +
"You must choose one of the following reset specifications: --to-datetime, --to-earliest, --to-latest." + NL +
"To define the scope use --all-topics or --topic.";
public static final String DRY_RUN_DOC = "Only show results without executing changes on share groups. Supported operations: reset-offsets.";
public static final String EXECUTE_DOC = "Execute operation. Supported operations: reset-offsets.";
public static final String RESET_TO_DATETIME_DOC = "Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'";
public static final String RESET_TO_EARLIEST_DOC = "Reset offsets to earliest offset.";
public static final String RESET_TO_LATEST_DOC = "Reset offsets to latest offset.";
public static final String MEMBERS_DOC = "Describe members of the group. This option may be used with the '--describe' option only.";
public static final String OFFSETS_DOC = "Describe the group and list all topic partitions in the group along with their offset information. " +
private static final String DRY_RUN_DOC = "Only show results without executing changes on share groups. Supported operations: reset-offsets.";
private static final String EXECUTE_DOC = "Execute operation. Supported operations: reset-offsets.";
private static final String RESET_TO_DATETIME_DOC = "Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'";
private static final String RESET_TO_EARLIEST_DOC = "Reset offsets to earliest offset.";
private static final String RESET_TO_LATEST_DOC = "Reset offsets to latest offset.";
private static final String MEMBERS_DOC = "Describe members of the group. This option may be used with the '--describe' option only.";
private static final String OFFSETS_DOC = "Describe the group and list all topic partitions in the group along with their offset information. " +
"This is the default sub-action and may be used with the '--describe' option only.";
public static final String STATE_DOC = "When specified with '--describe', includes the state of the group." + NL +
private static final String STATE_DOC = "When specified with '--describe', includes the state of the group." + NL +
"When specified with '--list', it displays the state of all groups. It can also be used to list groups with specific states. " +
"Valid values are Empty, Stable and Dead.";
public static final String DELETE_OFFSETS_DOC = "Delete offsets of share group. Supports one share group at the time, and multiple topics.";
private static final String VERBOSE_DOC = "Provide additional information, if any, when describing the group. This option may be used " +
"with the '--describe --state' and '--describe --members' options only.";
private static final String DELETE_OFFSETS_DOC = "Delete offsets of share group. Supports one share group at the time, and multiple topics.";
public final OptionSpec<String> bootstrapServerOpt;
public final OptionSpec<String> groupOpt;
public final OptionSpec<String> topicOpt;
public final OptionSpec<Void> allTopicsOpt;
public final OptionSpec<Void> listOpt;
public final OptionSpec<Void> describeOpt;
public final OptionSpec<Void> deleteOpt;
public final OptionSpec<Long> timeoutMsOpt;
public final OptionSpec<String> commandConfigOpt;
public final OptionSpec<Void> resetOffsetsOpt;
public final OptionSpec<Void> deleteOffsetsOpt;
public final OptionSpec<Void> dryRunOpt;
public final OptionSpec<Void> executeOpt;
public final OptionSpec<String> resetToDatetimeOpt;
public final OptionSpec<Void> resetToEarliestOpt;
public final OptionSpec<Void> resetToLatestOpt;
public final OptionSpec<Void> membersOpt;
public final OptionSpec<Void> offsetsOpt;
public final OptionSpec<String> stateOpt;
final OptionSpec<String> bootstrapServerOpt;
final OptionSpec<String> groupOpt;
final OptionSpec<String> topicOpt;
final OptionSpec<Void> allTopicsOpt;
final OptionSpec<Void> listOpt;
final OptionSpec<Void> describeOpt;
final OptionSpec<Void> allGroupsOpt;
final OptionSpec<Void> deleteOpt;
final OptionSpec<Long> timeoutMsOpt;
final OptionSpec<String> commandConfigOpt;
final OptionSpec<Void> resetOffsetsOpt;
final OptionSpec<Void> deleteOffsetsOpt;
final OptionSpec<Void> dryRunOpt;
final OptionSpec<Void> executeOpt;
final OptionSpec<String> resetToDatetimeOpt;
final OptionSpec<Void> resetToEarliestOpt;
final OptionSpec<Void> resetToLatestOpt;
final OptionSpec<Void> membersOpt;
final OptionSpec<Void> offsetsOpt;
final OptionSpec<String> stateOpt;
final OptionSpec<Void> verboseOpt;
public final Set<OptionSpec<?>> allShareGroupLevelOpts;
public final Set<OptionSpec<?>> allResetOffsetScenarioOpts;
public final Set<OptionSpec<?>> allDeleteOffsetsOpts;
final Set<OptionSpec<?>> allGroupSelectionScopeOpts;
final Set<OptionSpec<?>> allShareGroupLevelOpts;
final Set<OptionSpec<?>> allResetOffsetScenarioOpts;
final Set<OptionSpec<?>> allDeleteOffsetsOpts;
public ShareGroupCommandOptions(String[] args) {
super(args);
@ -106,6 +112,7 @@ public class ShareGroupCommandOptions extends CommandDefaultOptions {
allTopicsOpt = parser.accepts("all-topics", ALL_TOPICS_DOC);
listOpt = parser.accepts("list", LIST_DOC);
describeOpt = parser.accepts("describe", DESCRIBE_DOC);
allGroupsOpt = parser.accepts("all-groups", ALL_GROUPS_DOC);
deleteOpt = parser.accepts("delete", DELETE_DOC);
timeoutMsOpt = parser.accepts("timeout", TIMEOUT_MS_DOC)
.withRequiredArg()
@ -134,8 +141,12 @@ public class ShareGroupCommandOptions extends CommandDefaultOptions {
.availableIf(describeOpt, listOpt)
.withOptionalArg()
.ofType(String.class);
verboseOpt = parser.accepts("verbose", VERBOSE_DOC)
.availableIf(membersOpt, stateOpt)
.availableUnless(listOpt);
allShareGroupLevelOpts = new HashSet<>(Arrays.asList(listOpt, describeOpt, deleteOpt, resetOffsetsOpt));
allGroupSelectionScopeOpts = new HashSet<>(Arrays.asList(groupOpt, allGroupsOpt));
allShareGroupLevelOpts = new HashSet<>(Arrays.asList(listOpt, describeOpt, deleteOpt, deleteOffsetsOpt, resetOffsetsOpt));
allResetOffsetScenarioOpts = new HashSet<>(Arrays.asList(resetToDatetimeOpt, resetToEarliestOpt, resetToLatestOpt));
allDeleteOffsetsOpts = new HashSet<>(Arrays.asList(groupOpt, topicOpt));
@ -149,9 +160,9 @@ public class ShareGroupCommandOptions extends CommandDefaultOptions {
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt);
if (options.has(describeOpt)) {
if (!options.has(groupOpt))
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
CommandLineUtils.printUsageAndExit(parser,
"Option " + describeOpt + " takes the option: " + groupOpt);
"Option " + describeOpt + " takes one of these options: " + allGroupSelectionScopeOpts.stream().map(Object::toString).collect(Collectors.joining(", ")));
List<OptionSpec<?>> mutuallyExclusiveOpts = Arrays.asList(membersOpt, offsetsOpt, stateOpt);
if (mutuallyExclusiveOpts.stream().mapToInt(o -> options.has(o) ? 1 : 0).sum() > 1) {
CommandLineUtils.printUsageAndExit(parser,
@ -160,9 +171,6 @@ public class ShareGroupCommandOptions extends CommandDefaultOptions {
if (options.has(stateOpt) && options.valueOf(stateOpt) != null)
CommandLineUtils.printUsageAndExit(parser,
"Option " + describeOpt + " does not take a value for " + stateOpt);
} else {
if (options.has(timeoutMsOpt))
LOGGER.debug("Option " + timeoutMsOpt + " is applicable only when " + describeOpt + " is used.");
}
if (options.has(deleteOpt)) {
@ -197,7 +205,8 @@ public class ShareGroupCommandOptions extends CommandDefaultOptions {
CommandLineUtils.checkInvalidArgs(parser, options, resetToLatestOpt, minus(allResetOffsetScenarioOpts, resetToLatestOpt));
}
CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, minus(allShareGroupLevelOpts, describeOpt, deleteOpt, resetOffsetsOpt));
CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, minus(allGroupSelectionScopeOpts, groupOpt));
CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, minus(allShareGroupLevelOpts, describeOpt, deleteOpt, deleteOffsetsOpt, resetOffsetsOpt));
CommandLineUtils.checkInvalidArgs(parser, options, topicOpt, minus(allShareGroupLevelOpts, deleteOpt, resetOffsetsOpt));
}
}

View File

@ -17,6 +17,7 @@
package org.apache.kafka.tools.consumer.group;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeShareGroupsOptions;
import org.apache.kafka.clients.admin.DescribeShareGroupsResult;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.KafkaAdminClient;
@ -32,33 +33,46 @@ import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.ToolsTestUtils;
import org.apache.kafka.tools.consumer.group.ShareGroupCommand.ShareGroupService;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
import joptsimple.OptionException;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ShareGroupCommandTest {
private static final List<List<String>> DESCRIBE_TYPE_OFFSETS = List.of(List.of(""), List.of("--offsets"));
private static final List<List<String>> DESCRIBE_TYPE_MEMBERS = List.of(List.of("--members"), List.of("--members", "--verbose"));
private static final List<List<String>> DESCRIBE_TYPE_STATE = List.of(List.of("--state"), List.of("--state", "--verbose"));
private static final List<List<String>> DESCRIBE_TYPES = Stream.of(DESCRIBE_TYPE_OFFSETS, DESCRIBE_TYPE_MEMBERS, DESCRIBE_TYPE_STATE).flatMap(Collection::stream).toList();
@Test
public void testListShareGroups() throws Exception {
@ -73,64 +87,401 @@ public class ShareGroupCommandTest {
new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)),
new GroupListing(secondGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.EMPTY))
)));
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(result);
ShareGroupService service = getShareGroupService(cgcArgs, adminClient);
Set<String> expectedGroups = new HashSet<>(Arrays.asList(firstGroup, secondGroup));
final Set[] foundGroups = new Set[]{Collections.emptySet()};
TestUtils.waitForCondition(() -> {
foundGroups[0] = new HashSet<>(service.listShareGroups());
return Objects.equals(expectedGroups, foundGroups[0]);
}, "Expected --list to show groups " + expectedGroups + ", but found " + foundGroups[0] + ".");
service.close();
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(result);
try (ShareGroupService service = getShareGroupService(cgcArgs, adminClient)) {
Set<String> expectedGroups = new HashSet<>(Arrays.asList(firstGroup, secondGroup));
final Set[] foundGroups = new Set[]{Set.of()};
TestUtils.waitForCondition(() -> {
foundGroups[0] = new HashSet<>(service.listShareGroups());
return Objects.equals(expectedGroups, foundGroups[0]);
}, "Expected --list to show groups " + expectedGroups + ", but found " + foundGroups[0] + ".");
}
}
@Test
public void testDescribeShareGroups() throws Exception {
String firstGroup = "group1";
public void testListShareGroupsWithStates() throws Exception {
String firstGroup = "first-group";
String secondGroup = "second-group";
String bootstrapServer = "localhost:9092";
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--list", "--state"};
Admin adminClient = mock(KafkaAdminClient.class);
DescribeShareGroupsResult result = mock(DescribeShareGroupsResult.class);
Map<String, ShareGroupDescription> resultMap = new HashMap<>();
ShareGroupDescription exp = new ShareGroupDescription(
ListGroupsResult resultWithAllStates = mock(ListGroupsResult.class);
when(resultWithAllStates.all()).thenReturn(KafkaFuture.completedFuture(Arrays.asList(
new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)),
new GroupListing(secondGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.EMPTY))
)));
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(resultWithAllStates);
try (ShareGroupService service = getShareGroupService(cgcArgs, adminClient)) {
Set<GroupListing> expectedListing = new HashSet<>(Arrays.asList(
new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)),
new GroupListing(secondGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.EMPTY))));
final Set[] foundListing = new Set[]{Set.of()};
TestUtils.waitForCondition(() -> {
foundListing[0] = new HashSet<>(service.listShareGroupsInStates(new HashSet<>(Arrays.asList(GroupState.values()))));
return Objects.equals(expectedListing, foundListing[0]);
}, "Expected to show groups " + expectedListing + ", but found " + foundListing[0]);
ListGroupsResult resultWithStableState = mock(ListGroupsResult.class);
when(resultWithStableState.all()).thenReturn(KafkaFuture.completedFuture(List.of(
new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE))
)));
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(resultWithStableState);
Set<GroupListing> expectedListingStable = Set.of(
new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)));
foundListing[0] = Set.of();
TestUtils.waitForCondition(() -> {
foundListing[0] = new HashSet<>(service.listShareGroupsInStates(Set.of(GroupState.STABLE)));
return Objects.equals(expectedListingStable, foundListing[0]);
}, "Expected to show groups " + expectedListingStable + ", but found " + foundListing[0]);
}
}
@Test
public void testDescribeOffsetsOfExistingGroup() throws Exception {
String firstGroup = "group1";
String bootstrapServer = "localhost:9092";
for (List<String> describeType : DESCRIBE_TYPE_OFFSETS) {
List<String> cgcArgs = new ArrayList<>(List.of("--bootstrap-server", bootstrapServer, "--describe", "--group", firstGroup));
cgcArgs.addAll(describeType);
Admin adminClient = mock(KafkaAdminClient.class);
DescribeShareGroupsResult describeShareGroupsResult = mock(DescribeShareGroupsResult.class);
ShareGroupDescription exp = new ShareGroupDescription(
firstGroup,
List.of(new ShareMemberDescription("memid1", "clId1", "host1", new ShareMemberAssignment(
Set.of(new TopicPartition("topic1", 0))
))),
Set.of(new TopicPartition("topic1", 0))
), 0)),
GroupState.STABLE,
new Node(0, "host1", 9090));
resultMap.put(firstGroup, exp);
new Node(0, "host1", 9090), 0, 0);
ListOffsetsResult resultOffsets = new ListOffsetsResult(
Map.of(
new TopicPartition("topic1", 0),
KafkaFuture.completedFuture(new ListOffsetsResult.ListOffsetsResultInfo(0, 0, Optional.empty()))
));
when(result.all()).thenReturn(KafkaFuture.completedFuture(resultMap));
when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection())).thenReturn(result);
ShareGroupService service = new ShareGroupService(null, adminClient);
assertEquals(exp, service.getDescribeGroup(firstGroup));
service.close();
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(firstGroup, KafkaFuture.completedFuture(exp)));
when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(), any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
when(adminClient.listOffsets(ArgumentMatchers.anyMap())).thenReturn(resultOffsets);
try (ShareGroupService service = getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) {
TestUtils.waitForCondition(() -> {
Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
String[] lines = res.getKey().trim().split("\n");
if (lines.length != 2 && !res.getValue().isEmpty()) {
return false;
}
List<String> expectedValues = List.of(firstGroup, "topic1", "0", "0");
return checkArgsHeaderOutput(cgcArgs, lines[0]) &&
Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues);
}, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + ".");
}
}
}
@Test
public void testDescribeShareGroupsGetOffsets() throws Exception {
Admin adminClient = mock(KafkaAdminClient.class);
public void testDescribeOffsetsOfAllExistingGroups() throws Exception {
String firstGroup = "group1";
String secondGroup = "group2";
String bootstrapServer = "localhost:9092";
ListOffsetsResult startOffset = mock(ListOffsetsResult.class);
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> startOffsetResultMap = new HashMap<>();
startOffsetResultMap.put(new TopicPartition("topic1", 0), new ListOffsetsResult.ListOffsetsResultInfo(10, -1, Optional.empty()));
for (List<String> describeType : DESCRIBE_TYPE_OFFSETS) {
List<String> cgcArgs = new ArrayList<>(List.of("--bootstrap-server", bootstrapServer, "--describe", "--all-groups"));
cgcArgs.addAll(describeType);
Admin adminClient = mock(KafkaAdminClient.class);
ListGroupsResult listGroupsResult = mock(ListGroupsResult.class);
GroupListing firstGroupListing = new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE));
GroupListing secondGroupListing = new GroupListing(secondGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE));
DescribeShareGroupsResult describeShareGroupsResult = mock(DescribeShareGroupsResult.class);
ShareGroupDescription exp1 = new ShareGroupDescription(
firstGroup,
List.of(new ShareMemberDescription("memid1", "clId1", "host1", new ShareMemberAssignment(
Set.of(new TopicPartition("topic1", 0))
), 0)),
GroupState.STABLE,
new Node(0, "host1", 9090), 0, 0);
ShareGroupDescription exp2 = new ShareGroupDescription(
secondGroup,
List.of(new ShareMemberDescription("memid1", "clId1", "host1", new ShareMemberAssignment(
Set.of(new TopicPartition("topic1", 0))
), 0)),
GroupState.STABLE,
new Node(0, "host1", 9090), 0, 0);
ListOffsetsResult resultOffsets = new ListOffsetsResult(
Map.of(
new TopicPartition("topic1", 0),
KafkaFuture.completedFuture(new ListOffsetsResult.ListOffsetsResultInfo(0, 0, Optional.empty()))
));
ListOffsetsResult endOffset = mock(ListOffsetsResult.class);
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsetResultMap = new HashMap<>();
endOffsetResultMap.put(new TopicPartition("topic1", 0), new ListOffsetsResult.ListOffsetsResultInfo(30, -1, Optional.empty()));
when(listGroupsResult.all()).thenReturn(KafkaFuture.completedFuture(List.of(firstGroupListing, secondGroupListing)));
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(listGroupsResult);
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(firstGroup, KafkaFuture.completedFuture(exp1), secondGroup, KafkaFuture.completedFuture(exp2)));
when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(), any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
when(adminClient.listOffsets(ArgumentMatchers.anyMap())).thenReturn(resultOffsets);
try (ShareGroupService service = getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) {
TestUtils.waitForCondition(() -> {
Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
String[] lines = res.getKey().trim().split("\n");
if (lines.length != 2 && !res.getValue().isEmpty()) {
return false;
}
when(startOffset.all()).thenReturn(KafkaFuture.completedFuture(startOffsetResultMap));
when(endOffset.all()).thenReturn(KafkaFuture.completedFuture(endOffsetResultMap));
List<String> expectedValues1 = List.of(firstGroup, "topic1", "0", "0");
List<String> expectedValues2 = List.of(secondGroup, "topic1", "0", "0");
return checkArgsHeaderOutput(cgcArgs, lines[0]) && checkArgsHeaderOutput(cgcArgs, lines[3]) &&
Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues1) &&
Arrays.stream(lines[4].trim().split("\\s+")).toList().equals(expectedValues2);
}, "Expected 2 data rows and no error in describe results with describe type " + String.join(" ", describeType) + ".");
}
}
}
when(adminClient.listOffsets(ArgumentMatchers.anyMap())).thenReturn(startOffset, endOffset);
@Test
public void testDescribeStateOfExistingGroup() throws Exception {
String firstGroup = "group1";
String bootstrapServer = "localhost:9092";
ShareMemberDescription description = new ShareMemberDescription("", "", "",
new ShareMemberAssignment(Set.of(new TopicPartition("topic1", 0))));
ShareGroupService service = new ShareGroupService(null, adminClient);
Map<TopicPartition, Long> lags = service.getOffsets(List.of(description));
assertEquals(1, lags.size());
assertEquals(20, lags.get(new TopicPartition("topic1", 0)));
service.close();
for (List<String> describeType : DESCRIBE_TYPE_STATE) {
List<String> cgcArgs = new ArrayList<>(List.of("--bootstrap-server", bootstrapServer, "--describe", "--group", firstGroup));
cgcArgs.addAll(describeType);
Admin adminClient = mock(KafkaAdminClient.class);
DescribeShareGroupsResult describeShareGroupsResult = mock(DescribeShareGroupsResult.class);
ShareGroupDescription exp1 = new ShareGroupDescription(
firstGroup,
List.of(new ShareMemberDescription("memid1", "clId1", "host1", new ShareMemberAssignment(
Set.of(new TopicPartition("topic1", 0))
), 0)),
GroupState.STABLE,
new Node(0, "host1", 9090), 0, 0);
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(firstGroup, KafkaFuture.completedFuture(exp1)));
when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(), any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
try (ShareGroupService service = getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) {
TestUtils.waitForCondition(() -> {
Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
String[] lines = res.getKey().trim().split("\n");
if (lines.length != 2 && !res.getValue().isEmpty()) {
return false;
}
List<String> expectedValues1;
if (describeType.contains("--verbose")) {
expectedValues1 = List.of(firstGroup, "host1:9090", "(0)", "Stable", "0", "0", "1");
} else {
expectedValues1 = List.of(firstGroup, "host1:9090", "(0)", "Stable", "1");
}
return checkArgsHeaderOutput(cgcArgs, lines[0]) &&
Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues1);
}, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + ".");
}
}
}
@Test
public void testDescribeStatesOfAllExistingGroups() throws Exception {
String firstGroup = "group1";
String secondGroup = "group2";
String bootstrapServer = "localhost:9092";
for (List<String> describeType : DESCRIBE_TYPE_STATE) {
List<String> cgcArgs = new ArrayList<>(List.of("--bootstrap-server", bootstrapServer, "--describe", "--all-groups"));
cgcArgs.addAll(describeType);
Admin adminClient = mock(KafkaAdminClient.class);
ListGroupsResult listGroupsResult = mock(ListGroupsResult.class);
GroupListing firstGroupListing = new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE));
GroupListing secondGroupListing = new GroupListing(secondGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE));
DescribeShareGroupsResult describeShareGroupsResult = mock(DescribeShareGroupsResult.class);
ShareGroupDescription exp1 = new ShareGroupDescription(
firstGroup,
List.of(new ShareMemberDescription("memid1", "clId1", "host1", new ShareMemberAssignment(
Set.of(new TopicPartition("topic1", 0))
), 0)),
GroupState.STABLE,
new Node(0, "host1", 9090), 0, 0);
ShareGroupDescription exp2 = new ShareGroupDescription(
secondGroup,
List.of(new ShareMemberDescription("memid1", "clId1", "host1", new ShareMemberAssignment(
Set.of(new TopicPartition("topic1", 0))
), 0)),
GroupState.STABLE,
new Node(0, "host1", 9090), 0, 0);
when(listGroupsResult.all()).thenReturn(KafkaFuture.completedFuture(List.of(firstGroupListing, secondGroupListing)));
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(listGroupsResult);
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(firstGroup, KafkaFuture.completedFuture(exp1), secondGroup, KafkaFuture.completedFuture(exp2)));
when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(), any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
try (ShareGroupService service = getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) {
TestUtils.waitForCondition(() -> {
Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
String[] lines = res.getKey().trim().split("\n");
if (lines.length != 2 && !res.getValue().isEmpty()) {
return false;
}
List<String> expectedValues1;
List<String> expectedValues2;
if (describeType.contains("--verbose")) {
expectedValues1 = List.of(firstGroup, "host1:9090", "(0)", "Stable", "0", "0", "1");
expectedValues2 = List.of(secondGroup, "host1:9090", "(0)", "Stable", "0", "0", "1");
} else {
expectedValues1 = List.of(firstGroup, "host1:9090", "(0)", "Stable", "1");
expectedValues2 = List.of(secondGroup, "host1:9090", "(0)", "Stable", "1");
}
return checkArgsHeaderOutput(cgcArgs, lines[0]) && checkArgsHeaderOutput(cgcArgs, lines[3]) &&
Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues1) &&
Arrays.stream(lines[4].trim().split("\\s+")).toList().equals(expectedValues2);
}, "Expected 2 data rows and no error in describe results with describe type " + String.join(" ", describeType) + ".");
}
}
}
@Test
public void testDescribeMembersOfExistingGroup() throws Exception {
String firstGroup = "group1";
String bootstrapServer = "localhost:9092";
for (List<String> describeType : DESCRIBE_TYPE_MEMBERS) {
List<String> cgcArgs = new ArrayList<>(List.of("--bootstrap-server", bootstrapServer, "--describe", "--group", firstGroup));
cgcArgs.addAll(describeType);
Admin adminClient = mock(KafkaAdminClient.class);
DescribeShareGroupsResult describeShareGroupsResult = mock(DescribeShareGroupsResult.class);
ShareGroupDescription exp1 = new ShareGroupDescription(
firstGroup,
List.of(new ShareMemberDescription("memid1", "clId1", "host1", new ShareMemberAssignment(
Set.of(new TopicPartition("topic1", 0), new TopicPartition("topic1", 1), new TopicPartition("topic2", 0))
), 0)),
GroupState.STABLE,
new Node(0, "host1", 9090), 0, 0);
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(firstGroup, KafkaFuture.completedFuture(exp1)));
when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(), any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
try (ShareGroupService service = getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) {
TestUtils.waitForCondition(() -> {
Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
String[] lines = res.getKey().trim().split("\n");
if (lines.length != 2 && !res.getValue().isEmpty()) {
return false;
}
List<String> expectedValues1;
if (describeType.contains("--verbose")) {
expectedValues1 = List.of(firstGroup, "memid1", "host1", "clId1", "0", "topic1:0,1;topic2:0");
} else {
expectedValues1 = List.of(firstGroup, "memid1", "host1", "clId1", "topic1:0,1;topic2:0");
}
return checkArgsHeaderOutput(cgcArgs, lines[0]) &&
Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues1);
}, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + ".");
}
}
}
@Test
public void testDescribeMembersOfAllExistingGroups() throws Exception {
String firstGroup = "group1";
String secondGroup = "group2";
String bootstrapServer = "localhost:9092";
for (List<String> describeType : DESCRIBE_TYPE_MEMBERS) {
List<String> cgcArgs = new ArrayList<>(List.of("--bootstrap-server", bootstrapServer, "--describe", "--all-groups"));
cgcArgs.addAll(describeType);
Admin adminClient = mock(KafkaAdminClient.class);
ListGroupsResult listGroupsResult = mock(ListGroupsResult.class);
GroupListing firstGroupListing = new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE));
GroupListing secondGroupListing = new GroupListing(secondGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE));
DescribeShareGroupsResult describeShareGroupsResult = mock(DescribeShareGroupsResult.class);
ShareGroupDescription exp1 = new ShareGroupDescription(
firstGroup,
List.of(new ShareMemberDescription("memid1", "clId1", "host1", new ShareMemberAssignment(
Set.of(new TopicPartition("topic1", 0), new TopicPartition("topic1", 1), new TopicPartition("topic2", 0))
), 0)),
GroupState.STABLE,
new Node(0, "host1", 9090), 0, 0);
ShareGroupDescription exp2 = new ShareGroupDescription(
secondGroup,
List.of(new ShareMemberDescription("memid1", "clId1", "host1", new ShareMemberAssignment(
Set.of(new TopicPartition("topic1", 0))
), 0)),
GroupState.STABLE,
new Node(0, "host1", 9090), 0, 0);
when(listGroupsResult.all()).thenReturn(KafkaFuture.completedFuture(List.of(firstGroupListing, secondGroupListing)));
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(listGroupsResult);
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(firstGroup, KafkaFuture.completedFuture(exp1), secondGroup, KafkaFuture.completedFuture(exp2)));
when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(), any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
try (ShareGroupService service = getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) {
TestUtils.waitForCondition(() -> {
Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
String[] lines = res.getKey().trim().split("\n");
if (lines.length != 2 && !res.getValue().isEmpty()) {
return false;
}
List<String> expectedValues1;
List<String> expectedValues2;
if (describeType.contains("--verbose")) {
expectedValues1 = List.of(firstGroup, "memid1", "host1", "clId1", "0", "topic1:0,1;topic2:0");
expectedValues2 = List.of(secondGroup, "memid1", "host1", "clId1", "0", "topic1:0");
} else {
expectedValues1 = List.of(firstGroup, "memid1", "host1", "clId1", "topic1:0,1;topic2:0");
expectedValues2 = List.of(secondGroup, "memid1", "host1", "clId1", "topic1:0");
}
return checkArgsHeaderOutput(cgcArgs, lines[0]) && checkArgsHeaderOutput(cgcArgs, lines[3]) &&
Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues1) &&
Arrays.stream(lines[4].trim().split("\\s+")).toList().equals(expectedValues2);
}, "Expected 2 data rows and no error in describe results with describe type " + String.join(" ", describeType) + ".");
}
}
}
@Test
public void testDescribeNonexistentGroup() {
String missingGroup = "missing.group";
String bootstrapServer = "localhost:9092";
for (List<String> describeType : DESCRIBE_TYPES) {
// note the group to be queried is a different (non-existing) group
List<String> cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServer, "--describe", "--group", missingGroup));
cgcArgs.addAll(describeType);
Admin adminClient = mock(KafkaAdminClient.class);
DescribeShareGroupsResult describeShareGroupsResult = mock(DescribeShareGroupsResult.class);
KafkaFutureImpl<ShareGroupDescription> missingGroupFuture = new KafkaFutureImpl<>();
missingGroupFuture.completeExceptionally(new GroupIdNotFoundException("Group " + missingGroup + " not found."));
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(missingGroup, missingGroupFuture));
when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(), any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
try (ShareGroupService service = getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) {
service.describeGroups();
fail("Expected error was not detected for describe option '" + String.join(" ", describeType) + "'");
} catch (ExecutionException ee) {
assertInstanceOf(GroupIdNotFoundException.class, ee.getCause());
assertEquals("Group " + missingGroup + " not found.", ee.getCause().getMessage());
} catch (Exception e) {
fail("Expected error was not detected for describe option '" + String.join(" ", describeType) + "'");
}
}
}
@Test
public void testDescribeShareGroupsInvalidVerboseOption() {
String bootstrapServer = "localhost:9092";
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--describe", "--group", "group1", "--verbose"};
assertThrows(OptionException.class, () -> getShareGroupService(cgcArgs, new MockAdminClient()));
}
@Test
public void testDescribeShareGroupsOffsetsInvalidVerboseOption() {
String bootstrapServer = "localhost:9092";
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--describe", "--group", "group1", "--offsets", "--verbose"};
assertThrows(OptionException.class, () -> getShareGroupService(cgcArgs, new MockAdminClient()));
}
@Test
@ -145,65 +496,23 @@ public class ShareGroupCommandTest {
@Test
public void testListWithUnrecognizedOption() {
String bootstrapServer = "localhost:9092";
String[] cgcArgs = new String[]{"--frivolous-nonsense", "--bootstrap-server", bootstrapServer, "--list"};
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--list", "--verbose"};
assertThrows(OptionException.class, () -> getShareGroupService(cgcArgs, new MockAdminClient()));
}
@Test
public void testListShareGroupsWithStates() throws Exception {
String firstGroup = "first-group";
String secondGroup = "second-group";
String bootstrapServer = "localhost:9092";
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--list", "--state"};
Admin adminClient = mock(KafkaAdminClient.class);
ListGroupsResult resultWithAllStates = mock(ListGroupsResult.class);
when(resultWithAllStates.all()).thenReturn(KafkaFuture.completedFuture(Arrays.asList(
new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)),
new GroupListing(secondGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.EMPTY))
)));
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(resultWithAllStates);
ShareGroupService service = getShareGroupService(cgcArgs, adminClient);
Set<GroupListing> expectedListing = new HashSet<>(Arrays.asList(
new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)),
new GroupListing(secondGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.EMPTY))));
final Set[] foundListing = new Set[]{Collections.emptySet()};
TestUtils.waitForCondition(() -> {
foundListing[0] = new HashSet<>(service.listShareGroupsInStates(new HashSet<>(Arrays.asList(GroupState.values()))));
return Objects.equals(expectedListing, foundListing[0]);
}, "Expected to show groups " + expectedListing + ", but found " + foundListing[0]);
ListGroupsResult resultWithStableState = mock(ListGroupsResult.class);
when(resultWithStableState.all()).thenReturn(KafkaFuture.completedFuture(Collections.singletonList(
new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE))
)));
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(resultWithStableState);
Set<GroupListing> expectedListingStable = Collections.singleton(
new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)));
foundListing[0] = Collections.emptySet();
TestUtils.waitForCondition(() -> {
foundListing[0] = new HashSet<>(service.listShareGroupsInStates(Collections.singleton(GroupState.STABLE)));
return Objects.equals(expectedListingStable, foundListing[0]);
}, "Expected to show groups " + expectedListingStable + ", but found " + foundListing[0]);
service.close();
}
@Test
public void testGroupStatesFromString() {
Set<GroupState> result = ShareGroupCommand.groupStatesFromString("Stable");
assertEquals(Collections.singleton(GroupState.STABLE), result);
assertEquals(Set.of(GroupState.STABLE), result);
result = ShareGroupCommand.groupStatesFromString("stable");
assertEquals(new HashSet<>(Collections.singletonList(GroupState.STABLE)), result);
assertEquals(Set.of(GroupState.STABLE), result);
result = ShareGroupCommand.groupStatesFromString("dead");
assertEquals(new HashSet<>(Collections.singletonList(GroupState.DEAD)), result);
assertEquals(Set.of(GroupState.DEAD), result);
result = ShareGroupCommand.groupStatesFromString("empty");
assertEquals(new HashSet<>(Collections.singletonList(GroupState.EMPTY)), result);
assertEquals(Set.of(GroupState.EMPTY), result);
assertThrows(IllegalArgumentException.class, () -> ShareGroupCommand.groupStatesFromString("assigning"));
@ -218,4 +527,44 @@ public class ShareGroupCommandTest {
ShareGroupCommandOptions opts = new ShareGroupCommandOptions(args);
return new ShareGroupService(opts, adminClient);
}
private Runnable describeGroups(ShareGroupCommand.ShareGroupService service) {
return () -> Assertions.assertDoesNotThrow(service::describeGroups);
}
private boolean checkArgsHeaderOutput(List<String> args, String output) {
if (!output.contains("GROUP")) {
return false;
}
if (args.contains("--members")) {
return checkMembersArgsHeaderOutput(output, args.contains("--verbose"));
}
if (args.contains("--state")) {
return checkStateArgsHeaderOutput(output, args.contains("--verbose"));
}
// --offsets or no arguments
return checkOffsetsArgsHeaderOutput(output);
}
private boolean checkOffsetsArgsHeaderOutput(String output) {
List<String> expectedKeys = List.of("GROUP", "TOPIC", "PARTITION", "START-OFFSET");
return Arrays.stream(output.trim().split("\\s+")).toList().equals(expectedKeys);
}
private boolean checkMembersArgsHeaderOutput(String output, boolean verbose) {
List<String> expectedKeys = verbose ?
List.of("GROUP", "CONSUMER-ID", "HOST", "CLIENT-ID", "MEMBER-EPOCH", "ASSIGNMENT") :
List.of("GROUP", "CONSUMER-ID", "HOST", "CLIENT-ID", "ASSIGNMENT");
return Arrays.stream(output.trim().split("\\s+")).toList().equals(expectedKeys);
}
private boolean checkStateArgsHeaderOutput(String output, boolean verbose) {
List<String> expectedKeys = verbose ?
List.of("GROUP", "COORDINATOR", "(ID)", "STATE", "GROUP-EPOCH", "ASSIGNMENT-EPOCH", "#MEMBERS") :
List.of("GROUP", "COORDINATOR", "(ID)", "STATE", "#MEMBERS");
return Arrays.stream(output.trim().split("\\s+")).toList().equals(expectedKeys);
}
}