From 0344f8f5ae4314d0723e69921c7b54715964fae9 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Thu, 2 Jan 2025 09:12:52 +0000 Subject: [PATCH] KAFKA-18273: KIP-1099 verbose display share group options (#18259) Reviewers: Manikumar Reddy --- .../clients/admin/ShareGroupDescription.java | 32 +- .../clients/admin/ShareMemberDescription.java | 21 +- .../internals/DescribeShareGroupsHandler.java | 5 +- .../clients/admin/KafkaAdminClientTest.java | 3 +- .../ShareGroupAutoOffsetResetStrategy.java | 13 +- .../consumer/group/ShareGroupCommand.java | 252 ++++++--- .../group/ShareGroupCommandOptions.java | 109 ++-- .../consumer/group/ShareGroupCommandTest.java | 529 +++++++++++++++--- 8 files changed, 740 insertions(+), 224 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupDescription.java index 913667bcf47..469c23428eb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupDescription.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupDescription.java @@ -38,24 +38,32 @@ public class ShareGroupDescription { private final Collection members; private final GroupState groupState; private final Node coordinator; + private final int groupEpoch; + private final int targetAssignmentEpoch; private final Set authorizedOperations; public ShareGroupDescription(String groupId, Collection members, GroupState groupState, - Node coordinator) { - this(groupId, members, groupState, coordinator, Collections.emptySet()); + Node coordinator, + int groupEpoch, + int targetAssignmentEpoch) { + this(groupId, members, groupState, coordinator, groupEpoch, targetAssignmentEpoch, Collections.emptySet()); } public ShareGroupDescription(String groupId, Collection members, GroupState groupState, Node coordinator, + int groupEpoch, + int targetAssignmentEpoch, Set authorizedOperations) { this.groupId = groupId == null ? "" : groupId; this.members = members == null ? Collections.emptyList() : List.copyOf(members); this.groupState = groupState; this.coordinator = coordinator; + this.groupEpoch = groupEpoch; + this.targetAssignmentEpoch = targetAssignmentEpoch; this.authorizedOperations = authorizedOperations; } @@ -68,12 +76,14 @@ public class ShareGroupDescription { Objects.equals(members, that.members) && groupState == that.groupState && Objects.equals(coordinator, that.coordinator) && + groupEpoch == that.groupEpoch && + targetAssignmentEpoch == that.targetAssignmentEpoch && Objects.equals(authorizedOperations, that.authorizedOperations); } @Override public int hashCode() { - return Objects.hash(groupId, members, groupState, coordinator, authorizedOperations); + return Objects.hash(groupId, members, groupState, coordinator, groupEpoch, targetAssignmentEpoch, authorizedOperations); } /** @@ -111,12 +121,28 @@ public class ShareGroupDescription { return authorizedOperations; } + /** + * The epoch of the share group. + */ + public int groupEpoch() { + return groupEpoch; + } + + /** + * The epoch of the target assignment. + */ + public int targetAssignmentEpoch() { + return targetAssignmentEpoch; + } + @Override public String toString() { return "(groupId=" + groupId + ", members=" + members.stream().map(ShareMemberDescription::toString).collect(Collectors.joining(",")) + ", groupState=" + groupState + ", coordinator=" + coordinator + + ", groupEpoch=" + groupEpoch + + ", targetAssignmentEpoch=" + targetAssignmentEpoch + ", authorizedOperations=" + authorizedOperations + ")"; } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ShareMemberDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/ShareMemberDescription.java index 57f2d90ae86..5fb74d8b242 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ShareMemberDescription.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ShareMemberDescription.java @@ -30,18 +30,21 @@ public class ShareMemberDescription { private final String clientId; private final String host; private final ShareMemberAssignment assignment; + private final int memberEpoch; public ShareMemberDescription( String memberId, String clientId, String host, - ShareMemberAssignment assignment + ShareMemberAssignment assignment, + int memberEpoch ) { this.memberId = memberId == null ? "" : memberId; this.clientId = clientId == null ? "" : clientId; this.host = host == null ? "" : host; this.assignment = assignment == null ? new ShareMemberAssignment(Collections.emptySet()) : assignment; + this.memberEpoch = memberEpoch; } @Override @@ -52,12 +55,13 @@ public class ShareMemberDescription { return memberId.equals(that.memberId) && clientId.equals(that.clientId) && host.equals(that.host) && - assignment.equals(that.assignment); + assignment.equals(that.assignment) && + memberEpoch == that.memberEpoch; } @Override public int hashCode() { - return Objects.hash(memberId, clientId, host, assignment); + return Objects.hash(memberId, clientId, host, assignment, memberEpoch); } /** @@ -88,11 +92,20 @@ public class ShareMemberDescription { return assignment; } + /** + * The epoch of the group member. + */ + public int memberEpoch() { + return memberEpoch; + } + @Override public String toString() { return "(memberId=" + memberId + ", clientId=" + clientId + ", host=" + host + - ", assignment=" + assignment + ")"; + ", assignment=" + assignment + + ", memberEpoch=" + memberEpoch + + ")"; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java index a763a4255e6..1c79225b083 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java @@ -121,7 +121,8 @@ public class DescribeShareGroupsHandler extends AdminApiHandler.Batched'."); + name + ". The value must be either 'earliest', 'latest' or of the format 'by_duration:'."); } } public String toString() { - return "[earliest, latest, by_duration:PnDTnHnMn.nS]"; + String values = Arrays.stream(StrategyType.values()) + .map(strategyType -> { + if (strategyType == StrategyType.BY_DURATION) { + return strategyType + ":PnDTnHnMn.nS"; + } + return strategyType.toString(); + }).collect(Collectors.joining(", ")); + return "[" + values + "]"; } } } diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java index 8e774da0988..17bb993bc81 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java @@ -18,21 +18,24 @@ package org.apache.kafka.tools.consumer.group; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.Admin; -import org.apache.kafka.clients.admin.DescribeShareGroupsResult; +import org.apache.kafka.clients.admin.DescribeShareGroupsOptions; import org.apache.kafka.clients.admin.GroupListing; import org.apache.kafka.clients.admin.ListGroupsOptions; import org.apache.kafka.clients.admin.ListGroupsResult; import org.apache.kafka.clients.admin.ListOffsetsResult; import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.admin.ShareGroupDescription; +import org.apache.kafka.clients.admin.ShareMemberAssignment; import org.apache.kafka.clients.admin.ShareMemberDescription; import org.apache.kafka.common.GroupState; import org.apache.kafka.common.GroupType; +import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.server.util.CommandLineUtils; import java.io.IOException; +import java.util.AbstractMap.SimpleImmutableEntry; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -40,9 +43,11 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -186,103 +191,185 @@ public class ShareGroupCommand { } public void describeGroups() throws ExecutionException, InterruptedException { - String group = opts.options.valueOf(opts.groupOpt); - ShareGroupDescription description = getDescribeGroup(group); - if (description == null) - return; + Collection groupIds = opts.options.has(opts.allGroupsOpt) + ? listShareGroups() + : opts.options.valuesOf(opts.groupOpt); if (opts.options.has(opts.membersOpt)) { - printMembers(description); + TreeMap members = collectGroupsDescription(groupIds); + printMembers(members, opts.options.has(opts.verboseOpt)); } else if (opts.options.has(opts.stateOpt)) { - printStates(description); + TreeMap states = collectGroupsDescription(groupIds); + printStates(states, opts.options.has(opts.verboseOpt)); } else { - printOffsets(description); + TreeMap>> offsets + = collectGroupsOffsets(groupIds); + printOffsets(offsets); } } - ShareGroupDescription getDescribeGroup(String group) throws ExecutionException, InterruptedException { - DescribeShareGroupsResult result = adminClient.describeShareGroups(List.of(group)); - Map descriptionMap = result.all().get(); - if (descriptionMap.containsKey(group)) { - return descriptionMap.get(group); + Map describeShareGroups(Collection groupIds) throws ExecutionException, InterruptedException { + Map res = new HashMap<>(); + Map> stringKafkaFutureMap = adminClient.describeShareGroups( + groupIds, + new DescribeShareGroupsOptions().timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue()) + ).describedGroups(); + + for (Entry> e : stringKafkaFutureMap.entrySet()) { + res.put(e.getKey(), e.getValue().get()); } - return null; + return res; } - Map getOffsets(Collection members) throws ExecutionException, InterruptedException { - Set allTp = new HashSet<>(); - for (ShareMemberDescription memberDescription : members) { - allTp.addAll(memberDescription.assignment().topicPartitions()); - } - // fetch latest and earliest offsets - Map earliest = new HashMap<>(); - Map latest = new HashMap<>(); - - for (TopicPartition tp : allTp) { - earliest.put(tp, OffsetSpec.earliest()); - latest.put(tp, OffsetSpec.latest()); - } - // This call to obtain the earliest offsets will be replaced once adminClient.listShareGroupOffsets is implemented - Map earliestResult = adminClient.listOffsets(earliest).all().get(); - Map latestResult = adminClient.listOffsets(latest).all().get(); - - Map lag = new HashMap<>(); - for (Map.Entry tp : earliestResult.entrySet()) { - lag.put(tp.getKey(), latestResult.get(tp.getKey()).offset() - earliestResult.get(tp.getKey()).offset()); - } - return lag; + TreeMap collectGroupsDescription(Collection groupIds) throws ExecutionException, InterruptedException { + Map shareGroups = describeShareGroups(groupIds); + TreeMap res = new TreeMap<>(); + shareGroups.forEach(res::put); + return res; } - private void printOffsets(ShareGroupDescription description) throws ExecutionException, InterruptedException { - Map offsets = getOffsets(description.members()); - if (maybePrintEmptyGroupState(description.groupId(), description.groupState(), offsets.size())) { - String fmt = printOffsetFormat(description, offsets); - System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "START-OFFSET"); + TreeMap>> collectGroupsOffsets(Collection groupIds) throws ExecutionException, InterruptedException { + Map shareGroups = describeShareGroups(groupIds); + TreeMap>> groupOffsets = new TreeMap<>(); - for (Map.Entry offset : offsets.entrySet()) { - System.out.printf(fmt, description.groupId(), offset.getKey().topic(), offset.getKey().partition(), offset.getValue()); + shareGroups.forEach((groupId, shareGroup) -> { + Set allTp = new HashSet<>(); + for (ShareMemberDescription memberDescription : shareGroup.members()) { + allTp.addAll(memberDescription.assignment().topicPartitions()); } - } + + // Fetch latest and earliest offsets + Map earliest = new HashMap<>(); + Map latest = new HashMap<>(); + + for (TopicPartition tp : allTp) { + earliest.put(tp, OffsetSpec.earliest()); + latest.put(tp, OffsetSpec.latest()); + } + + // This call to obtain the earliest offsets will be replaced once adminClient.listShareGroupOffsets is implemented + try { + Map earliestResult = adminClient.listOffsets(earliest).all().get(); + Map latestResult = adminClient.listOffsets(latest).all().get(); + + Set partitionOffsets = new HashSet<>(); + + for (Entry tp : earliestResult.entrySet()) { + SharePartitionOffsetInformation partitionOffsetInfo = new SharePartitionOffsetInformation( + groupId, + tp.getKey().topic(), + tp.getKey().partition(), + latestResult.get(tp.getKey()).offset() - earliestResult.get(tp.getKey()).offset() + ); + partitionOffsets.add(partitionOffsetInfo); + } + groupOffsets.put(groupId, new SimpleImmutableEntry<>(shareGroup, partitionOffsets)); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + + return groupOffsets; } - private static String printOffsetFormat(ShareGroupDescription description, Map offsets) { - int groupLen = Math.max(15, description.groupId().length()); + private void printOffsets(TreeMap>> offsets) { + offsets.forEach((groupId, tuple) -> { + ShareGroupDescription description = tuple.getKey(); + Collection offsetsInfo = tuple.getValue(); + if (maybePrintEmptyGroupState(groupId, description.groupState(), offsetsInfo.size())) { + String fmt = printOffsetFormat(groupId, offsetsInfo); + System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "START-OFFSET"); + + for (SharePartitionOffsetInformation info : offsetsInfo) { + System.out.printf(fmt, + groupId, + info.topic, + info.partition, + info.offset + ); + } + System.out.println(); + } + }); + } + + private static String printOffsetFormat(String groupId, Collection offsetsInfo) { + int groupLen = Math.max(15, groupId.length()); int maxTopicLen = 15; - for (TopicPartition topicPartition : offsets.keySet()) { - maxTopicLen = Math.max(maxTopicLen, topicPartition.topic().length()); + for (SharePartitionOffsetInformation info : offsetsInfo) { + maxTopicLen = Math.max(maxTopicLen, info.topic.length()); } - return "%" + (-groupLen) + "s %" + (-maxTopicLen) + "s %-10s %s\n"; + return "\n%" + (-groupLen) + "s %" + (-maxTopicLen) + "s %-10s %s"; } - private void printStates(ShareGroupDescription description) { - maybePrintEmptyGroupState(description.groupId(), description.groupState(), 1); + private void printStates(Map descriptions, boolean verbose) { + descriptions.forEach((groupId, description) -> { + maybePrintEmptyGroupState(groupId, description.groupState(), 1); - int groupLen = Math.max(15, description.groupId().length()); - String coordinator = description.coordinator().host() + ":" + description.coordinator().port() + " (" + description.coordinator().idString() + ")"; - int coordinatorLen = Math.max(25, coordinator.length()); + int groupLen = Math.max(15, groupId.length()); + String coordinator = description.coordinator().host() + ":" + description.coordinator().port() + " (" + description.coordinator().idString() + ")"; + int coordinatorLen = Math.max(25, coordinator.length()); - String fmt = "%" + -groupLen + "s %" + -coordinatorLen + "s %-15s %s\n"; - System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE", "#MEMBERS"); - System.out.printf(fmt, description.groupId(), coordinator, description.groupState().toString(), description.members().size()); + if (verbose) { + String fmt = "\n%" + -groupLen + "s %" + -coordinatorLen + "s %-15s %-12s %-17s %s"; + System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE", "GROUP-EPOCH", "ASSIGNMENT-EPOCH", "#MEMBERS"); + System.out.printf(fmt, groupId, coordinator, description.groupState().toString(), + description.groupEpoch(), description.targetAssignmentEpoch(), description.members().size()); + } else { + String fmt = "\n%" + -groupLen + "s %" + -coordinatorLen + "s %-15s %s"; + System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE", "#MEMBERS"); + System.out.printf(fmt, groupId, coordinator, description.groupState().toString(), description.members().size()); + } + System.out.println(); + }); } - private void printMembers(ShareGroupDescription description) { - int groupLen = Math.max(15, description.groupId().length()); - int maxConsumerIdLen = 15, maxHostLen = 15, maxClientIdLen = 15; - Collection members = description.members(); - if (maybePrintEmptyGroupState(description.groupId(), description.groupState(), description.members().size())) { - for (ShareMemberDescription member : members) { - maxConsumerIdLen = Math.max(maxConsumerIdLen, member.consumerId().length()); - maxHostLen = Math.max(maxHostLen, member.host().length()); - maxClientIdLen = Math.max(maxClientIdLen, member.clientId().length()); - } + private void printMembers(TreeMap descriptions, boolean verbose) { + descriptions.forEach((groupId, description) -> { + int groupLen = Math.max(15, groupId.length()); + int maxConsumerIdLen = 15, maxHostLen = 15, maxClientIdLen = 15; + Collection members = description.members(); + if (maybePrintEmptyGroupState(groupId, description.groupState(), description.members().size())) { + for (ShareMemberDescription member : members) { + maxConsumerIdLen = Math.max(maxConsumerIdLen, member.consumerId().length()); + maxHostLen = Math.max(maxHostLen, member.host().length()); + maxClientIdLen = Math.max(maxClientIdLen, member.clientId().length()); + } - String fmt = "%" + -groupLen + "s %" + -maxConsumerIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %s\n"; - System.out.printf(fmt, "GROUP", "CONSUMER-ID", "HOST", "CLIENT-ID", "ASSIGNMENT"); - for (ShareMemberDescription member : members) { - System.out.printf(fmt, description.groupId(), member.consumerId(), member.host(), member.clientId(), - member.assignment().topicPartitions().stream().map(part -> part.topic() + ":" + part.partition()).collect(Collectors.joining(","))); + if (verbose) { + String fmt = "\n%" + -groupLen + "s %" + -maxConsumerIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %-13s %s"; + System.out.printf(fmt, "GROUP", "CONSUMER-ID", "HOST", "CLIENT-ID", "MEMBER-EPOCH", "ASSIGNMENT"); + for (ShareMemberDescription member : members) { + System.out.printf(fmt, groupId, member.consumerId(), member.host(), member.clientId(), member.memberEpoch(), getAssignmentString(member.assignment())); + } + } else { + String fmt = "\n%" + -groupLen + "s %" + -maxConsumerIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %s"; + System.out.printf(fmt, "GROUP", "CONSUMER-ID", "HOST", "CLIENT-ID", "ASSIGNMENT"); + for (ShareMemberDescription member : members) { + System.out.printf(fmt, groupId, member.consumerId(), member.host(), member.clientId(), getAssignmentString(member.assignment())); + } + } + System.out.println(); } - } + }); + } + + private String getAssignmentString(ShareMemberAssignment assignment) { + Map> grouped = new HashMap<>(); + assignment.topicPartitions().forEach(tp -> + grouped + .computeIfAbsent(tp.topic(), key -> new ArrayList<>()) + .add(tp) + ); + return grouped.entrySet().stream().map(entry -> { + String topicName = entry.getKey(); + List topicPartitions = entry.getValue(); + return topicPartitions + .stream() + .map(TopicPartition::partition) + .map(Object::toString) + .sorted() + .collect(Collectors.joining(",", topicName + ":", "")); + }).sorted().collect(Collectors.joining(";")); } public void close() { @@ -296,4 +383,23 @@ public class ShareGroupCommand { return Admin.create(props); } } + + static class SharePartitionOffsetInformation { + final String group; + final String topic; + final int partition; + final long offset; + + SharePartitionOffsetInformation( + String group, + String topic, + int partition, + long offset + ) { + this.group = group; + this.topic = topic; + this.partition = partition; + this.offset = offset; + } + } } diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java index c30480fccb9..f31f81a2f76 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java @@ -33,60 +33,66 @@ import joptsimple.OptionSpec; import static org.apache.kafka.tools.ToolsUtils.minus; public class ShareGroupCommandOptions extends CommandDefaultOptions { - public static final Logger LOGGER = LoggerFactory.getLogger(ShareGroupCommandOptions.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ShareGroupCommandOptions.class); - public static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s) to connect to."; - public static final String GROUP_DOC = "The share group we wish to act on."; - public static final String TOPIC_DOC = "The topic whose share group information should be deleted or topic whose should be included in the reset offset process. " + + private static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s) to connect to."; + private static final String GROUP_DOC = "The share group we wish to act on."; + private static final String TOPIC_DOC = "The topic whose share group information should be deleted or topic whose should be included in the reset offset process. " + "When resetting offsets, partitions can be specified using this format: 'topic1:0,1,2', where 0,1,2 are the partitions to be included."; - public static final String ALL_TOPICS_DOC = "Consider all topics assigned to a share group in the 'reset-offsets' process."; - public static final String LIST_DOC = "List all share groups."; - public static final String DESCRIBE_DOC = "Describe share group, members and offset information."; - public static final String NL = System.lineSeparator(); - public static final String DELETE_DOC = "Delete share group."; - public static final String TIMEOUT_MS_DOC = "The timeout that can be set for some use cases. For example, it can be used when describing the group " + + private static final String ALL_TOPICS_DOC = "Consider all topics assigned to a share group in the 'reset-offsets' process."; + private static final String LIST_DOC = "List all share groups."; + private static final String DESCRIBE_DOC = "Describe share group, members and offset information."; + private static final String ALL_GROUPS_DOC = "Apply to all share groups."; + private static final String NL = System.lineSeparator(); + private static final String DELETE_DOC = "Delete share group."; + private static final String TIMEOUT_MS_DOC = "The timeout that can be set for some use cases. For example, it can be used when describing the group " + "to specify the maximum amount of time in milliseconds to wait before the group stabilizes."; - public static final String COMMAND_CONFIG_DOC = "Property file containing configs to be passed to Admin Client."; - public static final String RESET_OFFSETS_DOC = "Reset offsets of share group. Supports one share group at the time, and instances must be inactive." + NL + + private static final String COMMAND_CONFIG_DOC = "Property file containing configs to be passed to Admin Client."; + private static final String RESET_OFFSETS_DOC = "Reset offsets of share group. Supports one share group at the time, and instances must be inactive." + NL + "Has 2 execution options: --dry-run (the default) to plan which offsets to reset, and --execute to reset the offsets. " + NL + "You must choose one of the following reset specifications: --to-datetime, --to-earliest, --to-latest." + NL + "To define the scope use --all-topics or --topic."; - public static final String DRY_RUN_DOC = "Only show results without executing changes on share groups. Supported operations: reset-offsets."; - public static final String EXECUTE_DOC = "Execute operation. Supported operations: reset-offsets."; - public static final String RESET_TO_DATETIME_DOC = "Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'"; - public static final String RESET_TO_EARLIEST_DOC = "Reset offsets to earliest offset."; - public static final String RESET_TO_LATEST_DOC = "Reset offsets to latest offset."; - public static final String MEMBERS_DOC = "Describe members of the group. This option may be used with the '--describe' option only."; - public static final String OFFSETS_DOC = "Describe the group and list all topic partitions in the group along with their offset information. " + + private static final String DRY_RUN_DOC = "Only show results without executing changes on share groups. Supported operations: reset-offsets."; + private static final String EXECUTE_DOC = "Execute operation. Supported operations: reset-offsets."; + private static final String RESET_TO_DATETIME_DOC = "Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'"; + private static final String RESET_TO_EARLIEST_DOC = "Reset offsets to earliest offset."; + private static final String RESET_TO_LATEST_DOC = "Reset offsets to latest offset."; + private static final String MEMBERS_DOC = "Describe members of the group. This option may be used with the '--describe' option only."; + private static final String OFFSETS_DOC = "Describe the group and list all topic partitions in the group along with their offset information. " + "This is the default sub-action and may be used with the '--describe' option only."; - public static final String STATE_DOC = "When specified with '--describe', includes the state of the group." + NL + + private static final String STATE_DOC = "When specified with '--describe', includes the state of the group." + NL + "When specified with '--list', it displays the state of all groups. It can also be used to list groups with specific states. " + "Valid values are Empty, Stable and Dead."; - public static final String DELETE_OFFSETS_DOC = "Delete offsets of share group. Supports one share group at the time, and multiple topics."; + private static final String VERBOSE_DOC = "Provide additional information, if any, when describing the group. This option may be used " + + "with the '--describe --state' and '--describe --members' options only."; + private static final String DELETE_OFFSETS_DOC = "Delete offsets of share group. Supports one share group at the time, and multiple topics."; - public final OptionSpec bootstrapServerOpt; - public final OptionSpec groupOpt; - public final OptionSpec topicOpt; - public final OptionSpec allTopicsOpt; - public final OptionSpec listOpt; - public final OptionSpec describeOpt; - public final OptionSpec deleteOpt; - public final OptionSpec timeoutMsOpt; - public final OptionSpec commandConfigOpt; - public final OptionSpec resetOffsetsOpt; - public final OptionSpec deleteOffsetsOpt; - public final OptionSpec dryRunOpt; - public final OptionSpec executeOpt; - public final OptionSpec resetToDatetimeOpt; - public final OptionSpec resetToEarliestOpt; - public final OptionSpec resetToLatestOpt; - public final OptionSpec membersOpt; - public final OptionSpec offsetsOpt; - public final OptionSpec stateOpt; + final OptionSpec bootstrapServerOpt; + final OptionSpec groupOpt; + final OptionSpec topicOpt; + final OptionSpec allTopicsOpt; + final OptionSpec listOpt; + final OptionSpec describeOpt; + final OptionSpec allGroupsOpt; + final OptionSpec deleteOpt; + final OptionSpec timeoutMsOpt; + final OptionSpec commandConfigOpt; + final OptionSpec resetOffsetsOpt; + final OptionSpec deleteOffsetsOpt; + final OptionSpec dryRunOpt; + final OptionSpec executeOpt; + final OptionSpec resetToDatetimeOpt; + final OptionSpec resetToEarliestOpt; + final OptionSpec resetToLatestOpt; + final OptionSpec membersOpt; + final OptionSpec offsetsOpt; + final OptionSpec stateOpt; + final OptionSpec verboseOpt; - public final Set> allShareGroupLevelOpts; - public final Set> allResetOffsetScenarioOpts; - public final Set> allDeleteOffsetsOpts; + final Set> allGroupSelectionScopeOpts; + final Set> allShareGroupLevelOpts; + final Set> allResetOffsetScenarioOpts; + final Set> allDeleteOffsetsOpts; public ShareGroupCommandOptions(String[] args) { super(args); @@ -106,6 +112,7 @@ public class ShareGroupCommandOptions extends CommandDefaultOptions { allTopicsOpt = parser.accepts("all-topics", ALL_TOPICS_DOC); listOpt = parser.accepts("list", LIST_DOC); describeOpt = parser.accepts("describe", DESCRIBE_DOC); + allGroupsOpt = parser.accepts("all-groups", ALL_GROUPS_DOC); deleteOpt = parser.accepts("delete", DELETE_DOC); timeoutMsOpt = parser.accepts("timeout", TIMEOUT_MS_DOC) .withRequiredArg() @@ -134,8 +141,12 @@ public class ShareGroupCommandOptions extends CommandDefaultOptions { .availableIf(describeOpt, listOpt) .withOptionalArg() .ofType(String.class); + verboseOpt = parser.accepts("verbose", VERBOSE_DOC) + .availableIf(membersOpt, stateOpt) + .availableUnless(listOpt); - allShareGroupLevelOpts = new HashSet<>(Arrays.asList(listOpt, describeOpt, deleteOpt, resetOffsetsOpt)); + allGroupSelectionScopeOpts = new HashSet<>(Arrays.asList(groupOpt, allGroupsOpt)); + allShareGroupLevelOpts = new HashSet<>(Arrays.asList(listOpt, describeOpt, deleteOpt, deleteOffsetsOpt, resetOffsetsOpt)); allResetOffsetScenarioOpts = new HashSet<>(Arrays.asList(resetToDatetimeOpt, resetToEarliestOpt, resetToLatestOpt)); allDeleteOffsetsOpts = new HashSet<>(Arrays.asList(groupOpt, topicOpt)); @@ -149,9 +160,9 @@ public class ShareGroupCommandOptions extends CommandDefaultOptions { CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt); if (options.has(describeOpt)) { - if (!options.has(groupOpt)) + if (!options.has(groupOpt) && !options.has(allGroupsOpt)) CommandLineUtils.printUsageAndExit(parser, - "Option " + describeOpt + " takes the option: " + groupOpt); + "Option " + describeOpt + " takes one of these options: " + allGroupSelectionScopeOpts.stream().map(Object::toString).collect(Collectors.joining(", "))); List> mutuallyExclusiveOpts = Arrays.asList(membersOpt, offsetsOpt, stateOpt); if (mutuallyExclusiveOpts.stream().mapToInt(o -> options.has(o) ? 1 : 0).sum() > 1) { CommandLineUtils.printUsageAndExit(parser, @@ -160,9 +171,6 @@ public class ShareGroupCommandOptions extends CommandDefaultOptions { if (options.has(stateOpt) && options.valueOf(stateOpt) != null) CommandLineUtils.printUsageAndExit(parser, "Option " + describeOpt + " does not take a value for " + stateOpt); - } else { - if (options.has(timeoutMsOpt)) - LOGGER.debug("Option " + timeoutMsOpt + " is applicable only when " + describeOpt + " is used."); } if (options.has(deleteOpt)) { @@ -197,7 +205,8 @@ public class ShareGroupCommandOptions extends CommandDefaultOptions { CommandLineUtils.checkInvalidArgs(parser, options, resetToLatestOpt, minus(allResetOffsetScenarioOpts, resetToLatestOpt)); } - CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, minus(allShareGroupLevelOpts, describeOpt, deleteOpt, resetOffsetsOpt)); + CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, minus(allGroupSelectionScopeOpts, groupOpt)); + CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, minus(allShareGroupLevelOpts, describeOpt, deleteOpt, deleteOffsetsOpt, resetOffsetsOpt)); CommandLineUtils.checkInvalidArgs(parser, options, topicOpt, minus(allShareGroupLevelOpts, deleteOpt, resetOffsetsOpt)); } } diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java index ce132effe09..11fd3bc35a9 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.tools.consumer.group; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.DescribeShareGroupsOptions; import org.apache.kafka.clients.admin.DescribeShareGroupsResult; import org.apache.kafka.clients.admin.GroupListing; import org.apache.kafka.clients.admin.KafkaAdminClient; @@ -32,33 +33,46 @@ import org.apache.kafka.common.GroupType; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.test.TestUtils; +import org.apache.kafka.tools.ToolsTestUtils; import org.apache.kafka.tools.consumer.group.ShareGroupCommand.ShareGroupService; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.mockito.ArgumentMatchers; +import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.stream.Stream; import joptsimple.OptionException; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class ShareGroupCommandTest { + private static final List> DESCRIBE_TYPE_OFFSETS = List.of(List.of(""), List.of("--offsets")); + private static final List> DESCRIBE_TYPE_MEMBERS = List.of(List.of("--members"), List.of("--members", "--verbose")); + private static final List> DESCRIBE_TYPE_STATE = List.of(List.of("--state"), List.of("--state", "--verbose")); + private static final List> DESCRIBE_TYPES = Stream.of(DESCRIBE_TYPE_OFFSETS, DESCRIBE_TYPE_MEMBERS, DESCRIBE_TYPE_STATE).flatMap(Collection::stream).toList(); @Test public void testListShareGroups() throws Exception { @@ -73,64 +87,401 @@ public class ShareGroupCommandTest { new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)), new GroupListing(secondGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.EMPTY)) ))); - when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(result); - ShareGroupService service = getShareGroupService(cgcArgs, adminClient); - Set expectedGroups = new HashSet<>(Arrays.asList(firstGroup, secondGroup)); - final Set[] foundGroups = new Set[]{Collections.emptySet()}; - TestUtils.waitForCondition(() -> { - foundGroups[0] = new HashSet<>(service.listShareGroups()); - return Objects.equals(expectedGroups, foundGroups[0]); - }, "Expected --list to show groups " + expectedGroups + ", but found " + foundGroups[0] + "."); - service.close(); + when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(result); + try (ShareGroupService service = getShareGroupService(cgcArgs, adminClient)) { + Set expectedGroups = new HashSet<>(Arrays.asList(firstGroup, secondGroup)); + + final Set[] foundGroups = new Set[]{Set.of()}; + TestUtils.waitForCondition(() -> { + foundGroups[0] = new HashSet<>(service.listShareGroups()); + return Objects.equals(expectedGroups, foundGroups[0]); + }, "Expected --list to show groups " + expectedGroups + ", but found " + foundGroups[0] + "."); + } } @Test - public void testDescribeShareGroups() throws Exception { - String firstGroup = "group1"; + public void testListShareGroupsWithStates() throws Exception { + String firstGroup = "first-group"; + String secondGroup = "second-group"; + String bootstrapServer = "localhost:9092"; + + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--list", "--state"}; Admin adminClient = mock(KafkaAdminClient.class); - DescribeShareGroupsResult result = mock(DescribeShareGroupsResult.class); - Map resultMap = new HashMap<>(); - ShareGroupDescription exp = new ShareGroupDescription( + ListGroupsResult resultWithAllStates = mock(ListGroupsResult.class); + when(resultWithAllStates.all()).thenReturn(KafkaFuture.completedFuture(Arrays.asList( + new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)), + new GroupListing(secondGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.EMPTY)) + ))); + when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(resultWithAllStates); + try (ShareGroupService service = getShareGroupService(cgcArgs, adminClient)) { + Set expectedListing = new HashSet<>(Arrays.asList( + new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)), + new GroupListing(secondGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.EMPTY)))); + + final Set[] foundListing = new Set[]{Set.of()}; + TestUtils.waitForCondition(() -> { + foundListing[0] = new HashSet<>(service.listShareGroupsInStates(new HashSet<>(Arrays.asList(GroupState.values())))); + return Objects.equals(expectedListing, foundListing[0]); + }, "Expected to show groups " + expectedListing + ", but found " + foundListing[0]); + + ListGroupsResult resultWithStableState = mock(ListGroupsResult.class); + when(resultWithStableState.all()).thenReturn(KafkaFuture.completedFuture(List.of( + new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)) + ))); + when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(resultWithStableState); + Set expectedListingStable = Set.of( + new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE))); + + foundListing[0] = Set.of(); + + TestUtils.waitForCondition(() -> { + foundListing[0] = new HashSet<>(service.listShareGroupsInStates(Set.of(GroupState.STABLE))); + return Objects.equals(expectedListingStable, foundListing[0]); + }, "Expected to show groups " + expectedListingStable + ", but found " + foundListing[0]); + } + } + + @Test + public void testDescribeOffsetsOfExistingGroup() throws Exception { + String firstGroup = "group1"; + String bootstrapServer = "localhost:9092"; + + for (List describeType : DESCRIBE_TYPE_OFFSETS) { + List cgcArgs = new ArrayList<>(List.of("--bootstrap-server", bootstrapServer, "--describe", "--group", firstGroup)); + cgcArgs.addAll(describeType); + Admin adminClient = mock(KafkaAdminClient.class); + DescribeShareGroupsResult describeShareGroupsResult = mock(DescribeShareGroupsResult.class); + ShareGroupDescription exp = new ShareGroupDescription( firstGroup, List.of(new ShareMemberDescription("memid1", "clId1", "host1", new ShareMemberAssignment( - Set.of(new TopicPartition("topic1", 0)) - ))), + Set.of(new TopicPartition("topic1", 0)) + ), 0)), GroupState.STABLE, - new Node(0, "host1", 9090)); - resultMap.put(firstGroup, exp); + new Node(0, "host1", 9090), 0, 0); + ListOffsetsResult resultOffsets = new ListOffsetsResult( + Map.of( + new TopicPartition("topic1", 0), + KafkaFuture.completedFuture(new ListOffsetsResult.ListOffsetsResultInfo(0, 0, Optional.empty())) + )); - when(result.all()).thenReturn(KafkaFuture.completedFuture(resultMap)); - when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection())).thenReturn(result); - ShareGroupService service = new ShareGroupService(null, adminClient); - assertEquals(exp, service.getDescribeGroup(firstGroup)); - service.close(); + when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(firstGroup, KafkaFuture.completedFuture(exp))); + when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(), any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult); + when(adminClient.listOffsets(ArgumentMatchers.anyMap())).thenReturn(resultOffsets); + try (ShareGroupService service = getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) { + TestUtils.waitForCondition(() -> { + Entry res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); + String[] lines = res.getKey().trim().split("\n"); + if (lines.length != 2 && !res.getValue().isEmpty()) { + return false; + } + + List expectedValues = List.of(firstGroup, "topic1", "0", "0"); + return checkArgsHeaderOutput(cgcArgs, lines[0]) && + Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues); + }, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + "."); + } + } } @Test - public void testDescribeShareGroupsGetOffsets() throws Exception { - Admin adminClient = mock(KafkaAdminClient.class); + public void testDescribeOffsetsOfAllExistingGroups() throws Exception { + String firstGroup = "group1"; + String secondGroup = "group2"; + String bootstrapServer = "localhost:9092"; - ListOffsetsResult startOffset = mock(ListOffsetsResult.class); - Map startOffsetResultMap = new HashMap<>(); - startOffsetResultMap.put(new TopicPartition("topic1", 0), new ListOffsetsResult.ListOffsetsResultInfo(10, -1, Optional.empty())); + for (List describeType : DESCRIBE_TYPE_OFFSETS) { + List cgcArgs = new ArrayList<>(List.of("--bootstrap-server", bootstrapServer, "--describe", "--all-groups")); + cgcArgs.addAll(describeType); + Admin adminClient = mock(KafkaAdminClient.class); + ListGroupsResult listGroupsResult = mock(ListGroupsResult.class); + GroupListing firstGroupListing = new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)); + GroupListing secondGroupListing = new GroupListing(secondGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)); + DescribeShareGroupsResult describeShareGroupsResult = mock(DescribeShareGroupsResult.class); + ShareGroupDescription exp1 = new ShareGroupDescription( + firstGroup, + List.of(new ShareMemberDescription("memid1", "clId1", "host1", new ShareMemberAssignment( + Set.of(new TopicPartition("topic1", 0)) + ), 0)), + GroupState.STABLE, + new Node(0, "host1", 9090), 0, 0); + ShareGroupDescription exp2 = new ShareGroupDescription( + secondGroup, + List.of(new ShareMemberDescription("memid1", "clId1", "host1", new ShareMemberAssignment( + Set.of(new TopicPartition("topic1", 0)) + ), 0)), + GroupState.STABLE, + new Node(0, "host1", 9090), 0, 0); + ListOffsetsResult resultOffsets = new ListOffsetsResult( + Map.of( + new TopicPartition("topic1", 0), + KafkaFuture.completedFuture(new ListOffsetsResult.ListOffsetsResultInfo(0, 0, Optional.empty())) + )); - ListOffsetsResult endOffset = mock(ListOffsetsResult.class); - Map endOffsetResultMap = new HashMap<>(); - endOffsetResultMap.put(new TopicPartition("topic1", 0), new ListOffsetsResult.ListOffsetsResultInfo(30, -1, Optional.empty())); + when(listGroupsResult.all()).thenReturn(KafkaFuture.completedFuture(List.of(firstGroupListing, secondGroupListing))); + when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(listGroupsResult); + when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(firstGroup, KafkaFuture.completedFuture(exp1), secondGroup, KafkaFuture.completedFuture(exp2))); + when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(), any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult); + when(adminClient.listOffsets(ArgumentMatchers.anyMap())).thenReturn(resultOffsets); + try (ShareGroupService service = getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) { + TestUtils.waitForCondition(() -> { + Entry res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); + String[] lines = res.getKey().trim().split("\n"); + if (lines.length != 2 && !res.getValue().isEmpty()) { + return false; + } - when(startOffset.all()).thenReturn(KafkaFuture.completedFuture(startOffsetResultMap)); - when(endOffset.all()).thenReturn(KafkaFuture.completedFuture(endOffsetResultMap)); + List expectedValues1 = List.of(firstGroup, "topic1", "0", "0"); + List expectedValues2 = List.of(secondGroup, "topic1", "0", "0"); + return checkArgsHeaderOutput(cgcArgs, lines[0]) && checkArgsHeaderOutput(cgcArgs, lines[3]) && + Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues1) && + Arrays.stream(lines[4].trim().split("\\s+")).toList().equals(expectedValues2); + }, "Expected 2 data rows and no error in describe results with describe type " + String.join(" ", describeType) + "."); + } + } + } - when(adminClient.listOffsets(ArgumentMatchers.anyMap())).thenReturn(startOffset, endOffset); + @Test + public void testDescribeStateOfExistingGroup() throws Exception { + String firstGroup = "group1"; + String bootstrapServer = "localhost:9092"; - ShareMemberDescription description = new ShareMemberDescription("", "", "", - new ShareMemberAssignment(Set.of(new TopicPartition("topic1", 0)))); - ShareGroupService service = new ShareGroupService(null, adminClient); - Map lags = service.getOffsets(List.of(description)); - assertEquals(1, lags.size()); - assertEquals(20, lags.get(new TopicPartition("topic1", 0))); - service.close(); + for (List describeType : DESCRIBE_TYPE_STATE) { + List cgcArgs = new ArrayList<>(List.of("--bootstrap-server", bootstrapServer, "--describe", "--group", firstGroup)); + cgcArgs.addAll(describeType); + Admin adminClient = mock(KafkaAdminClient.class); + DescribeShareGroupsResult describeShareGroupsResult = mock(DescribeShareGroupsResult.class); + ShareGroupDescription exp1 = new ShareGroupDescription( + firstGroup, + List.of(new ShareMemberDescription("memid1", "clId1", "host1", new ShareMemberAssignment( + Set.of(new TopicPartition("topic1", 0)) + ), 0)), + GroupState.STABLE, + new Node(0, "host1", 9090), 0, 0); + + when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(firstGroup, KafkaFuture.completedFuture(exp1))); + when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(), any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult); + try (ShareGroupService service = getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) { + TestUtils.waitForCondition(() -> { + Entry res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); + String[] lines = res.getKey().trim().split("\n"); + if (lines.length != 2 && !res.getValue().isEmpty()) { + return false; + } + + List expectedValues1; + if (describeType.contains("--verbose")) { + expectedValues1 = List.of(firstGroup, "host1:9090", "(0)", "Stable", "0", "0", "1"); + + } else { + expectedValues1 = List.of(firstGroup, "host1:9090", "(0)", "Stable", "1"); + } + return checkArgsHeaderOutput(cgcArgs, lines[0]) && + Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues1); + }, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + "."); + } + } + } + + @Test + public void testDescribeStatesOfAllExistingGroups() throws Exception { + String firstGroup = "group1"; + String secondGroup = "group2"; + String bootstrapServer = "localhost:9092"; + + for (List describeType : DESCRIBE_TYPE_STATE) { + List cgcArgs = new ArrayList<>(List.of("--bootstrap-server", bootstrapServer, "--describe", "--all-groups")); + cgcArgs.addAll(describeType); + Admin adminClient = mock(KafkaAdminClient.class); + ListGroupsResult listGroupsResult = mock(ListGroupsResult.class); + GroupListing firstGroupListing = new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)); + GroupListing secondGroupListing = new GroupListing(secondGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)); + DescribeShareGroupsResult describeShareGroupsResult = mock(DescribeShareGroupsResult.class); + ShareGroupDescription exp1 = new ShareGroupDescription( + firstGroup, + List.of(new ShareMemberDescription("memid1", "clId1", "host1", new ShareMemberAssignment( + Set.of(new TopicPartition("topic1", 0)) + ), 0)), + GroupState.STABLE, + new Node(0, "host1", 9090), 0, 0); + ShareGroupDescription exp2 = new ShareGroupDescription( + secondGroup, + List.of(new ShareMemberDescription("memid1", "clId1", "host1", new ShareMemberAssignment( + Set.of(new TopicPartition("topic1", 0)) + ), 0)), + GroupState.STABLE, + new Node(0, "host1", 9090), 0, 0); + + when(listGroupsResult.all()).thenReturn(KafkaFuture.completedFuture(List.of(firstGroupListing, secondGroupListing))); + when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(listGroupsResult); + when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(firstGroup, KafkaFuture.completedFuture(exp1), secondGroup, KafkaFuture.completedFuture(exp2))); + when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(), any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult); + try (ShareGroupService service = getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) { + TestUtils.waitForCondition(() -> { + Entry res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); + String[] lines = res.getKey().trim().split("\n"); + if (lines.length != 2 && !res.getValue().isEmpty()) { + return false; + } + + List expectedValues1; + List expectedValues2; + if (describeType.contains("--verbose")) { + expectedValues1 = List.of(firstGroup, "host1:9090", "(0)", "Stable", "0", "0", "1"); + expectedValues2 = List.of(secondGroup, "host1:9090", "(0)", "Stable", "0", "0", "1"); + + } else { + expectedValues1 = List.of(firstGroup, "host1:9090", "(0)", "Stable", "1"); + expectedValues2 = List.of(secondGroup, "host1:9090", "(0)", "Stable", "1"); + } + return checkArgsHeaderOutput(cgcArgs, lines[0]) && checkArgsHeaderOutput(cgcArgs, lines[3]) && + Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues1) && + Arrays.stream(lines[4].trim().split("\\s+")).toList().equals(expectedValues2); + }, "Expected 2 data rows and no error in describe results with describe type " + String.join(" ", describeType) + "."); + } + } + } + + @Test + public void testDescribeMembersOfExistingGroup() throws Exception { + String firstGroup = "group1"; + String bootstrapServer = "localhost:9092"; + + for (List describeType : DESCRIBE_TYPE_MEMBERS) { + List cgcArgs = new ArrayList<>(List.of("--bootstrap-server", bootstrapServer, "--describe", "--group", firstGroup)); + cgcArgs.addAll(describeType); + Admin adminClient = mock(KafkaAdminClient.class); + DescribeShareGroupsResult describeShareGroupsResult = mock(DescribeShareGroupsResult.class); + ShareGroupDescription exp1 = new ShareGroupDescription( + firstGroup, + List.of(new ShareMemberDescription("memid1", "clId1", "host1", new ShareMemberAssignment( + Set.of(new TopicPartition("topic1", 0), new TopicPartition("topic1", 1), new TopicPartition("topic2", 0)) + ), 0)), + GroupState.STABLE, + new Node(0, "host1", 9090), 0, 0); + + when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(firstGroup, KafkaFuture.completedFuture(exp1))); + when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(), any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult); + try (ShareGroupService service = getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) { + TestUtils.waitForCondition(() -> { + Entry res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); + String[] lines = res.getKey().trim().split("\n"); + if (lines.length != 2 && !res.getValue().isEmpty()) { + return false; + } + + List expectedValues1; + if (describeType.contains("--verbose")) { + expectedValues1 = List.of(firstGroup, "memid1", "host1", "clId1", "0", "topic1:0,1;topic2:0"); + + } else { + expectedValues1 = List.of(firstGroup, "memid1", "host1", "clId1", "topic1:0,1;topic2:0"); + } + return checkArgsHeaderOutput(cgcArgs, lines[0]) && + Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues1); + }, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + "."); + } + } + } + + @Test + public void testDescribeMembersOfAllExistingGroups() throws Exception { + String firstGroup = "group1"; + String secondGroup = "group2"; + String bootstrapServer = "localhost:9092"; + + for (List describeType : DESCRIBE_TYPE_MEMBERS) { + List cgcArgs = new ArrayList<>(List.of("--bootstrap-server", bootstrapServer, "--describe", "--all-groups")); + cgcArgs.addAll(describeType); + Admin adminClient = mock(KafkaAdminClient.class); + ListGroupsResult listGroupsResult = mock(ListGroupsResult.class); + GroupListing firstGroupListing = new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)); + GroupListing secondGroupListing = new GroupListing(secondGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)); + DescribeShareGroupsResult describeShareGroupsResult = mock(DescribeShareGroupsResult.class); + ShareGroupDescription exp1 = new ShareGroupDescription( + firstGroup, + List.of(new ShareMemberDescription("memid1", "clId1", "host1", new ShareMemberAssignment( + Set.of(new TopicPartition("topic1", 0), new TopicPartition("topic1", 1), new TopicPartition("topic2", 0)) + ), 0)), + GroupState.STABLE, + new Node(0, "host1", 9090), 0, 0); + ShareGroupDescription exp2 = new ShareGroupDescription( + secondGroup, + List.of(new ShareMemberDescription("memid1", "clId1", "host1", new ShareMemberAssignment( + Set.of(new TopicPartition("topic1", 0)) + ), 0)), + GroupState.STABLE, + new Node(0, "host1", 9090), 0, 0); + + when(listGroupsResult.all()).thenReturn(KafkaFuture.completedFuture(List.of(firstGroupListing, secondGroupListing))); + when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(listGroupsResult); + when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(firstGroup, KafkaFuture.completedFuture(exp1), secondGroup, KafkaFuture.completedFuture(exp2))); + when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(), any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult); + try (ShareGroupService service = getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) { + TestUtils.waitForCondition(() -> { + Entry res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); + String[] lines = res.getKey().trim().split("\n"); + if (lines.length != 2 && !res.getValue().isEmpty()) { + return false; + } + + List expectedValues1; + List expectedValues2; + if (describeType.contains("--verbose")) { + expectedValues1 = List.of(firstGroup, "memid1", "host1", "clId1", "0", "topic1:0,1;topic2:0"); + expectedValues2 = List.of(secondGroup, "memid1", "host1", "clId1", "0", "topic1:0"); + + } else { + expectedValues1 = List.of(firstGroup, "memid1", "host1", "clId1", "topic1:0,1;topic2:0"); + expectedValues2 = List.of(secondGroup, "memid1", "host1", "clId1", "topic1:0"); + } + return checkArgsHeaderOutput(cgcArgs, lines[0]) && checkArgsHeaderOutput(cgcArgs, lines[3]) && + Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues1) && + Arrays.stream(lines[4].trim().split("\\s+")).toList().equals(expectedValues2); + }, "Expected 2 data rows and no error in describe results with describe type " + String.join(" ", describeType) + "."); + } + } + } + + @Test + public void testDescribeNonexistentGroup() { + String missingGroup = "missing.group"; + String bootstrapServer = "localhost:9092"; + + for (List describeType : DESCRIBE_TYPES) { + // note the group to be queried is a different (non-existing) group + List cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServer, "--describe", "--group", missingGroup)); + cgcArgs.addAll(describeType); + Admin adminClient = mock(KafkaAdminClient.class); + DescribeShareGroupsResult describeShareGroupsResult = mock(DescribeShareGroupsResult.class); + KafkaFutureImpl missingGroupFuture = new KafkaFutureImpl<>(); + missingGroupFuture.completeExceptionally(new GroupIdNotFoundException("Group " + missingGroup + " not found.")); + when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(missingGroup, missingGroupFuture)); + when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(), any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult); + try (ShareGroupService service = getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) { + service.describeGroups(); + fail("Expected error was not detected for describe option '" + String.join(" ", describeType) + "'"); + } catch (ExecutionException ee) { + assertInstanceOf(GroupIdNotFoundException.class, ee.getCause()); + assertEquals("Group " + missingGroup + " not found.", ee.getCause().getMessage()); + } catch (Exception e) { + fail("Expected error was not detected for describe option '" + String.join(" ", describeType) + "'"); + } + } + } + + @Test + public void testDescribeShareGroupsInvalidVerboseOption() { + String bootstrapServer = "localhost:9092"; + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--describe", "--group", "group1", "--verbose"}; + assertThrows(OptionException.class, () -> getShareGroupService(cgcArgs, new MockAdminClient())); + } + + @Test + public void testDescribeShareGroupsOffsetsInvalidVerboseOption() { + String bootstrapServer = "localhost:9092"; + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--describe", "--group", "group1", "--offsets", "--verbose"}; + assertThrows(OptionException.class, () -> getShareGroupService(cgcArgs, new MockAdminClient())); } @Test @@ -145,65 +496,23 @@ public class ShareGroupCommandTest { @Test public void testListWithUnrecognizedOption() { String bootstrapServer = "localhost:9092"; - String[] cgcArgs = new String[]{"--frivolous-nonsense", "--bootstrap-server", bootstrapServer, "--list"}; + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--list", "--verbose"}; assertThrows(OptionException.class, () -> getShareGroupService(cgcArgs, new MockAdminClient())); } - @Test - public void testListShareGroupsWithStates() throws Exception { - String firstGroup = "first-group"; - String secondGroup = "second-group"; - String bootstrapServer = "localhost:9092"; - - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--list", "--state"}; - Admin adminClient = mock(KafkaAdminClient.class); - ListGroupsResult resultWithAllStates = mock(ListGroupsResult.class); - when(resultWithAllStates.all()).thenReturn(KafkaFuture.completedFuture(Arrays.asList( - new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)), - new GroupListing(secondGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.EMPTY)) - ))); - when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(resultWithAllStates); - ShareGroupService service = getShareGroupService(cgcArgs, adminClient); - Set expectedListing = new HashSet<>(Arrays.asList( - new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)), - new GroupListing(secondGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.EMPTY)))); - - final Set[] foundListing = new Set[]{Collections.emptySet()}; - TestUtils.waitForCondition(() -> { - foundListing[0] = new HashSet<>(service.listShareGroupsInStates(new HashSet<>(Arrays.asList(GroupState.values())))); - return Objects.equals(expectedListing, foundListing[0]); - }, "Expected to show groups " + expectedListing + ", but found " + foundListing[0]); - - ListGroupsResult resultWithStableState = mock(ListGroupsResult.class); - when(resultWithStableState.all()).thenReturn(KafkaFuture.completedFuture(Collections.singletonList( - new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE)) - ))); - when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(resultWithStableState); - Set expectedListingStable = Collections.singleton( - new GroupListing(firstGroup, Optional.of(GroupType.SHARE), "share", Optional.of(GroupState.STABLE))); - - foundListing[0] = Collections.emptySet(); - - TestUtils.waitForCondition(() -> { - foundListing[0] = new HashSet<>(service.listShareGroupsInStates(Collections.singleton(GroupState.STABLE))); - return Objects.equals(expectedListingStable, foundListing[0]); - }, "Expected to show groups " + expectedListingStable + ", but found " + foundListing[0]); - service.close(); - } - @Test public void testGroupStatesFromString() { Set result = ShareGroupCommand.groupStatesFromString("Stable"); - assertEquals(Collections.singleton(GroupState.STABLE), result); + assertEquals(Set.of(GroupState.STABLE), result); result = ShareGroupCommand.groupStatesFromString("stable"); - assertEquals(new HashSet<>(Collections.singletonList(GroupState.STABLE)), result); + assertEquals(Set.of(GroupState.STABLE), result); result = ShareGroupCommand.groupStatesFromString("dead"); - assertEquals(new HashSet<>(Collections.singletonList(GroupState.DEAD)), result); + assertEquals(Set.of(GroupState.DEAD), result); result = ShareGroupCommand.groupStatesFromString("empty"); - assertEquals(new HashSet<>(Collections.singletonList(GroupState.EMPTY)), result); + assertEquals(Set.of(GroupState.EMPTY), result); assertThrows(IllegalArgumentException.class, () -> ShareGroupCommand.groupStatesFromString("assigning")); @@ -218,4 +527,44 @@ public class ShareGroupCommandTest { ShareGroupCommandOptions opts = new ShareGroupCommandOptions(args); return new ShareGroupService(opts, adminClient); } + + private Runnable describeGroups(ShareGroupCommand.ShareGroupService service) { + return () -> Assertions.assertDoesNotThrow(service::describeGroups); + } + + private boolean checkArgsHeaderOutput(List args, String output) { + if (!output.contains("GROUP")) { + return false; + } + + if (args.contains("--members")) { + return checkMembersArgsHeaderOutput(output, args.contains("--verbose")); + } + + if (args.contains("--state")) { + return checkStateArgsHeaderOutput(output, args.contains("--verbose")); + } + + // --offsets or no arguments + return checkOffsetsArgsHeaderOutput(output); + } + + private boolean checkOffsetsArgsHeaderOutput(String output) { + List expectedKeys = List.of("GROUP", "TOPIC", "PARTITION", "START-OFFSET"); + return Arrays.stream(output.trim().split("\\s+")).toList().equals(expectedKeys); + } + + private boolean checkMembersArgsHeaderOutput(String output, boolean verbose) { + List expectedKeys = verbose ? + List.of("GROUP", "CONSUMER-ID", "HOST", "CLIENT-ID", "MEMBER-EPOCH", "ASSIGNMENT") : + List.of("GROUP", "CONSUMER-ID", "HOST", "CLIENT-ID", "ASSIGNMENT"); + return Arrays.stream(output.trim().split("\\s+")).toList().equals(expectedKeys); + } + + private boolean checkStateArgsHeaderOutput(String output, boolean verbose) { + List expectedKeys = verbose ? + List.of("GROUP", "COORDINATOR", "(ID)", "STATE", "GROUP-EPOCH", "ASSIGNMENT-EPOCH", "#MEMBERS") : + List.of("GROUP", "COORDINATOR", "(ID)", "STATE", "#MEMBERS"); + return Arrays.stream(output.trim().split("\\s+")).toList().equals(expectedKeys); + } }