diff --git a/bin/kafka-share-groups.sh b/bin/kafka-share-groups.sh new file mode 100755 index 00000000000..1712874b057 --- /dev/null +++ b/bin/kafka-share-groups.sh @@ -0,0 +1,17 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.consumer.group.ShareGroupCommand "$@" diff --git a/bin/windows/kafka-share-groups.bat b/bin/windows/kafka-share-groups.bat new file mode 100644 index 00000000000..117b1ecd30d --- /dev/null +++ b/bin/windows/kafka-share-groups.bat @@ -0,0 +1,17 @@ +@echo off +rem Licensed to the Apache Software Foundation (ASF) under one or more +rem contributor license agreements. See the NOTICE file distributed with +rem this work for additional information regarding copyright ownership. +rem The ASF licenses this file to You under the Apache License, Version 2.0 +rem (the "License"); you may not use this file except in compliance with +rem the License. You may obtain a copy of the License at +rem +rem http://www.apache.org/licenses/LICENSE-2.0 +rem +rem Unless required by applicable law or agreed to in writing, software +rem distributed under the License is distributed on an "AS IS" BASIS, +rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +rem See the License for the specific language governing permissions and +rem limitations under the License. + +"%~dp0kafka-run-class.bat" org.apache.kafka.tools.consumer.group.ShareGroupCommand %* diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java new file mode 100644 index 00000000000..c6bdfcfbfc4 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumer.group; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.AbstractOptions; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.DescribeShareGroupsResult; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.ListShareGroupsOptions; +import org.apache.kafka.clients.admin.ListShareGroupsResult; +import org.apache.kafka.clients.admin.MemberDescription; +import org.apache.kafka.clients.admin.OffsetSpec; +import org.apache.kafka.clients.admin.ShareGroupDescription; +import org.apache.kafka.clients.admin.ShareGroupListing; +import org.apache.kafka.common.ShareGroupState; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.CommandLineUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import joptsimple.OptionException; + +public class ShareGroupCommand { + + public static void main(String[] args) { + ShareGroupCommandOptions opts = new ShareGroupCommandOptions(args); + try { + opts.checkArgs(); + CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to list all share groups, describe a share group, delete share group info, or reset share group offsets."); + + // should have exactly one action + long actions = Stream.of(opts.listOpt, opts.describeOpt, opts.deleteOpt, opts.resetOffsetsOpt, opts.deleteOffsetsOpt).filter(opts.options::has).count(); + if (actions != 1) + CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --list, --describe, --delete, --reset-offsets, --delete-offsets."); + + run(opts); + } catch (OptionException e) { + CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage()); + } + } + + public static void run(ShareGroupCommandOptions opts) { + try (ShareGroupService shareGroupService = new ShareGroupService(opts, Collections.emptyMap())) { + if (opts.options.has(opts.listOpt)) { + shareGroupService.listGroups(); + } else if (opts.options.has(opts.describeOpt)) { + shareGroupService.describeGroups(); + } else if (opts.options.has(opts.deleteOpt)) { + throw new UnsupportedOperationException("--delete option is not yet implemented"); + } else if (opts.options.has(opts.resetOffsetsOpt)) { + throw new UnsupportedOperationException("--reset-offsets option is not yet implemented"); + } else if (opts.options.has(opts.deleteOffsetsOpt)) { + throw new UnsupportedOperationException("--delete-offsets option is not yet implemented"); + } + } catch (IllegalArgumentException e) { + CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage()); + } catch (Throwable e) { + printError("Executing share group command failed due to " + e.getMessage(), Optional.of(e)); + } + } + + static Set shareGroupStatesFromString(String input) { + Set parsedStates = + Arrays.stream(input.split(",")).map(s -> ShareGroupState.parse(s.trim())).collect(Collectors.toSet()); + if (parsedStates.contains(ShareGroupState.UNKNOWN)) { + Collection validStates = + Arrays.stream(ShareGroupState.values()).filter(s -> s != ShareGroupState.UNKNOWN).collect(Collectors.toList()); + throw new IllegalArgumentException("Invalid state list '" + input + "'. Valid states are: " + + validStates.stream().map(Object::toString).collect(Collectors.joining(", "))); + } + return parsedStates; + } + + public static void printError(String msg, Optional e) { + System.out.println("\nError: " + msg); + e.ifPresent(Throwable::printStackTrace); + } + + // Visibility for testing + static class ShareGroupService implements AutoCloseable { + final ShareGroupCommandOptions opts; + private final Admin adminClient; + + public ShareGroupService(ShareGroupCommandOptions opts, Map configOverrides) { + this.opts = opts; + try { + this.adminClient = createAdminClient(configOverrides); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public ShareGroupService(ShareGroupCommandOptions opts, Admin adminClient) { + this.opts = opts; + this.adminClient = adminClient; + } + + public void listGroups() throws ExecutionException, InterruptedException { + if (opts.options.has(opts.stateOpt)) { + String stateValue = opts.options.valueOf(opts.stateOpt); + Set states = (stateValue == null || stateValue.isEmpty()) + ? Collections.emptySet() + : shareGroupStatesFromString(stateValue); + List listings = listShareGroupsWithState(states); + + printGroupInfo(listings); + } else + listShareGroups().forEach(System.out::println); + } + + List listShareGroups() { + try { + ListShareGroupsResult result = adminClient.listShareGroups(withTimeoutMs(new ListShareGroupsOptions())); + Collection listings = result.all().get(); + return listings.stream().map(ShareGroupListing::groupId).collect(Collectors.toList()); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + List listShareGroupsWithState(Set states) throws ExecutionException, InterruptedException { + ListShareGroupsOptions listShareGroupsOptions = withTimeoutMs(new ListShareGroupsOptions()); + listShareGroupsOptions.inStates(states); + ListShareGroupsResult result = adminClient.listShareGroups(listShareGroupsOptions); + return new ArrayList<>(result.all().get()); + } + + private void printGroupInfo(List groups) { + // find proper columns width + int maxGroupLen = 15; + for (ShareGroupListing group : groups) { + maxGroupLen = Math.max(maxGroupLen, group.groupId().length()); + } + System.out.printf("%" + (-maxGroupLen) + "s %s\n", "GROUP", "STATE"); + for (ShareGroupListing group : groups) { + String groupId = group.groupId(); + String state = group.state().orElse(ShareGroupState.UNKNOWN).toString(); + System.out.printf("%" + (-maxGroupLen) + "s %s\n", groupId, state); + } + } + + private void describeGroups() throws ExecutionException, InterruptedException { + String group = opts.options.valueOf(opts.groupOpt); + ShareGroupDescription description = getDescribeGroup(group); + if (description == null) + return; + if (opts.options.has(opts.membersOpt)) { + printMembers(description); + } else if (opts.options.has(opts.stateOpt)) { + printStates(description); + } else { + printOffsets(description); + } + } + + ShareGroupDescription getDescribeGroup(String group) throws ExecutionException, InterruptedException { + DescribeShareGroupsResult result = adminClient.describeShareGroups(Collections.singletonList(group)); + Map descriptionMap = result.all().get(); + if (descriptionMap.containsKey(group)) { + return descriptionMap.get(group); + } + return null; + } + + Map getOffsets(Collection members) throws ExecutionException, InterruptedException { + Set allTp = new HashSet<>(); + for (MemberDescription memberDescription : members) { + allTp.addAll(memberDescription.assignment().topicPartitions()); + } + // fetch latest and earliest offsets + Map earliest = new HashMap<>(); + Map latest = new HashMap<>(); + + for (TopicPartition tp : allTp) { + earliest.put(tp, OffsetSpec.earliest()); + latest.put(tp, OffsetSpec.latest()); + } + // This call to obtain the earliest offsets will be replaced once adminClient.listShareGroupOffsets is implemented + Map earliestResult = adminClient.listOffsets(earliest).all().get(); + Map latestResult = adminClient.listOffsets(latest).all().get(); + + Map lag = new HashMap<>(); + for (Map.Entry tp : earliestResult.entrySet()) { + lag.put(tp.getKey(), latestResult.get(tp.getKey()).offset() - earliestResult.get(tp.getKey()).offset()); + } + return lag; + } + + private void printOffsets(ShareGroupDescription description) throws ExecutionException, InterruptedException { + Map offsets = getOffsets(description.members()); + if (offsets != null && !offsets.isEmpty()) { + String fmt = printOffsetFormat(description, offsets); + System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "OFFSET"); + + for (Map.Entry offset : offsets.entrySet()) { + System.out.printf(fmt, description.groupId(), offset.getKey().topic(), offset.getKey().partition(), offset.getValue()); + } + } + } + + private static String printOffsetFormat(ShareGroupDescription description, Map offsets) { + int groupLen = Math.max(15, description.groupId().length()); + int maxTopicLen = 15; + for (TopicPartition topicPartition : offsets.keySet()) { + maxTopicLen = Math.max(maxTopicLen, topicPartition.topic().length()); + } + return "%" + (-groupLen) + "s %" + (-maxTopicLen) + "s %-10s %s\n"; + } + + private void printStates(ShareGroupDescription description) { + int groupLen = Math.max(15, description.groupId().length()); + String coordinator = description.coordinator().host() + ":" + description.coordinator().port() + " (" + description.coordinator().idString() + ")"; + int coordinatorLen = Math.max(25, coordinator.length()); + + String fmt = "%" + -groupLen + "s %" + -coordinatorLen + "s %-15s %s\n"; + System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE", "#MEMBERS"); + System.out.printf(fmt, description.groupId(), coordinator, description.state().toString(), description.members().size()); + } + + private void printMembers(ShareGroupDescription description) { + int groupLen = Math.max(15, description.groupId().length()); + int maxConsumerIdLen = 15, maxHostLen = 15, maxClientIdLen = 15; + Collection members = description.members(); + for (MemberDescription member : members) { + maxConsumerIdLen = Math.max(maxConsumerIdLen, member.consumerId().length()); + maxHostLen = Math.max(maxHostLen, member.host().length()); + maxClientIdLen = Math.max(maxClientIdLen, member.clientId().length()); + } + + String fmt = "%" + -groupLen + "s %" + -maxConsumerIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %s\n"; + System.out.printf(fmt, "GROUP", "CONSUMER-ID", "HOST", "CLIENT-ID", "ASSIGNMENT"); + for (MemberDescription member : members) { + System.out.printf(fmt, description.groupId(), member.consumerId(), member.host(), member.clientId(), + member.assignment().topicPartitions().stream().map(part -> part.topic() + ":" + part.partition()).collect(Collectors.joining(","))); + } + } + + public void close() { + adminClient.close(); + } + + protected Admin createAdminClient(Map configOverrides) throws IOException { + Properties props = opts.options.has(opts.commandConfigOpt) ? Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) : new Properties(); + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt)); + props.putAll(configOverrides); + return Admin.create(props); + } + + private > T withTimeoutMs(T options) { + int t = opts.options.valueOf(opts.timeoutMsOpt).intValue(); + return options.timeoutMs(t); + } + } +} diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java new file mode 100644 index 00000000000..c30480fccb9 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumer.group; + +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import joptsimple.OptionSpec; + +import static org.apache.kafka.tools.ToolsUtils.minus; + +public class ShareGroupCommandOptions extends CommandDefaultOptions { + public static final Logger LOGGER = LoggerFactory.getLogger(ShareGroupCommandOptions.class); + + public static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s) to connect to."; + public static final String GROUP_DOC = "The share group we wish to act on."; + public static final String TOPIC_DOC = "The topic whose share group information should be deleted or topic whose should be included in the reset offset process. " + + "When resetting offsets, partitions can be specified using this format: 'topic1:0,1,2', where 0,1,2 are the partitions to be included."; + public static final String ALL_TOPICS_DOC = "Consider all topics assigned to a share group in the 'reset-offsets' process."; + public static final String LIST_DOC = "List all share groups."; + public static final String DESCRIBE_DOC = "Describe share group, members and offset information."; + public static final String NL = System.lineSeparator(); + public static final String DELETE_DOC = "Delete share group."; + public static final String TIMEOUT_MS_DOC = "The timeout that can be set for some use cases. For example, it can be used when describing the group " + + "to specify the maximum amount of time in milliseconds to wait before the group stabilizes."; + public static final String COMMAND_CONFIG_DOC = "Property file containing configs to be passed to Admin Client."; + public static final String RESET_OFFSETS_DOC = "Reset offsets of share group. Supports one share group at the time, and instances must be inactive." + NL + + "Has 2 execution options: --dry-run (the default) to plan which offsets to reset, and --execute to reset the offsets. " + NL + + "You must choose one of the following reset specifications: --to-datetime, --to-earliest, --to-latest." + NL + + "To define the scope use --all-topics or --topic."; + public static final String DRY_RUN_DOC = "Only show results without executing changes on share groups. Supported operations: reset-offsets."; + public static final String EXECUTE_DOC = "Execute operation. Supported operations: reset-offsets."; + public static final String RESET_TO_DATETIME_DOC = "Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'"; + public static final String RESET_TO_EARLIEST_DOC = "Reset offsets to earliest offset."; + public static final String RESET_TO_LATEST_DOC = "Reset offsets to latest offset."; + public static final String MEMBERS_DOC = "Describe members of the group. This option may be used with the '--describe' option only."; + public static final String OFFSETS_DOC = "Describe the group and list all topic partitions in the group along with their offset information. " + + "This is the default sub-action and may be used with the '--describe' option only."; + public static final String STATE_DOC = "When specified with '--describe', includes the state of the group." + NL + + "When specified with '--list', it displays the state of all groups. It can also be used to list groups with specific states. " + + "Valid values are Empty, Stable and Dead."; + public static final String DELETE_OFFSETS_DOC = "Delete offsets of share group. Supports one share group at the time, and multiple topics."; + + public final OptionSpec bootstrapServerOpt; + public final OptionSpec groupOpt; + public final OptionSpec topicOpt; + public final OptionSpec allTopicsOpt; + public final OptionSpec listOpt; + public final OptionSpec describeOpt; + public final OptionSpec deleteOpt; + public final OptionSpec timeoutMsOpt; + public final OptionSpec commandConfigOpt; + public final OptionSpec resetOffsetsOpt; + public final OptionSpec deleteOffsetsOpt; + public final OptionSpec dryRunOpt; + public final OptionSpec executeOpt; + public final OptionSpec resetToDatetimeOpt; + public final OptionSpec resetToEarliestOpt; + public final OptionSpec resetToLatestOpt; + public final OptionSpec membersOpt; + public final OptionSpec offsetsOpt; + public final OptionSpec stateOpt; + + public final Set> allShareGroupLevelOpts; + public final Set> allResetOffsetScenarioOpts; + public final Set> allDeleteOffsetsOpts; + + public ShareGroupCommandOptions(String[] args) { + super(args); + + bootstrapServerOpt = parser.accepts("bootstrap-server", BOOTSTRAP_SERVER_DOC) + .withRequiredArg() + .describedAs("server to connect to") + .ofType(String.class); + groupOpt = parser.accepts("group", GROUP_DOC) + .withRequiredArg() + .describedAs("share group") + .ofType(String.class); + topicOpt = parser.accepts("topic", TOPIC_DOC) + .withRequiredArg() + .describedAs("topic") + .ofType(String.class); + allTopicsOpt = parser.accepts("all-topics", ALL_TOPICS_DOC); + listOpt = parser.accepts("list", LIST_DOC); + describeOpt = parser.accepts("describe", DESCRIBE_DOC); + deleteOpt = parser.accepts("delete", DELETE_DOC); + timeoutMsOpt = parser.accepts("timeout", TIMEOUT_MS_DOC) + .withRequiredArg() + .describedAs("timeout (ms)") + .ofType(Long.class) + .defaultsTo(5000L); + commandConfigOpt = parser.accepts("command-config", COMMAND_CONFIG_DOC) + .withRequiredArg() + .describedAs("command config property file") + .ofType(String.class); + resetOffsetsOpt = parser.accepts("reset-offsets", RESET_OFFSETS_DOC); + deleteOffsetsOpt = parser.accepts("delete-offsets", DELETE_OFFSETS_DOC); + dryRunOpt = parser.accepts("dry-run", DRY_RUN_DOC); + executeOpt = parser.accepts("execute", EXECUTE_DOC); + resetToDatetimeOpt = parser.accepts("to-datetime", RESET_TO_DATETIME_DOC) + .withRequiredArg() + .describedAs("datetime") + .ofType(String.class); + resetToEarliestOpt = parser.accepts("to-earliest", RESET_TO_EARLIEST_DOC); + resetToLatestOpt = parser.accepts("to-latest", RESET_TO_LATEST_DOC); + membersOpt = parser.accepts("members", MEMBERS_DOC) + .availableIf(describeOpt); + offsetsOpt = parser.accepts("offsets", OFFSETS_DOC) + .availableIf(describeOpt); + stateOpt = parser.accepts("state", STATE_DOC) + .availableIf(describeOpt, listOpt) + .withOptionalArg() + .ofType(String.class); + + allShareGroupLevelOpts = new HashSet<>(Arrays.asList(listOpt, describeOpt, deleteOpt, resetOffsetsOpt)); + allResetOffsetScenarioOpts = new HashSet<>(Arrays.asList(resetToDatetimeOpt, resetToEarliestOpt, resetToLatestOpt)); + allDeleteOffsetsOpts = new HashSet<>(Arrays.asList(groupOpt, topicOpt)); + + options = parser.parse(args); + } + + @SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"}) + public void checkArgs() { + CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to list, describe, reset and delete share groups."); + + CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt); + + if (options.has(describeOpt)) { + if (!options.has(groupOpt)) + CommandLineUtils.printUsageAndExit(parser, + "Option " + describeOpt + " takes the option: " + groupOpt); + List> mutuallyExclusiveOpts = Arrays.asList(membersOpt, offsetsOpt, stateOpt); + if (mutuallyExclusiveOpts.stream().mapToInt(o -> options.has(o) ? 1 : 0).sum() > 1) { + CommandLineUtils.printUsageAndExit(parser, + "Option " + describeOpt + " takes at most one of these options: " + mutuallyExclusiveOpts.stream().map(Object::toString).collect(Collectors.joining(", "))); + } + if (options.has(stateOpt) && options.valueOf(stateOpt) != null) + CommandLineUtils.printUsageAndExit(parser, + "Option " + describeOpt + " does not take a value for " + stateOpt); + } else { + if (options.has(timeoutMsOpt)) + LOGGER.debug("Option " + timeoutMsOpt + " is applicable only when " + describeOpt + " is used."); + } + + if (options.has(deleteOpt)) { + if (!options.has(groupOpt)) + CommandLineUtils.printUsageAndExit(parser, + "Option " + deleteOpt + " takes the option: " + groupOpt); + if (options.has(topicOpt)) + CommandLineUtils.printUsageAndExit(parser, "The consumer does not support topic-specific offset " + + "deletion from a share group."); + } + + if (options.has(deleteOffsetsOpt)) { + if (!options.has(groupOpt) || !options.has(topicOpt)) + CommandLineUtils.printUsageAndExit(parser, + "Option " + deleteOffsetsOpt + " takes the following options: " + allDeleteOffsetsOpts.stream().map(Object::toString).collect(Collectors.joining(", "))); + } + + if (options.has(resetOffsetsOpt)) { + if (options.has(dryRunOpt) && options.has(executeOpt)) + CommandLineUtils.printUsageAndExit(parser, "Option " + resetOffsetsOpt + " only accepts one of " + executeOpt + " and " + dryRunOpt); + + if (!options.has(dryRunOpt) && !options.has(executeOpt)) { + CommandLineUtils.printUsageAndExit(parser, "Option " + resetOffsetsOpt + " takes the option: " + executeOpt + " or " + dryRunOpt); + } + + if (!options.has(groupOpt)) + CommandLineUtils.printUsageAndExit(parser, + "Option " + resetOffsetsOpt + " takes the option: " + groupOpt); + + CommandLineUtils.checkInvalidArgs(parser, options, resetToDatetimeOpt, minus(allResetOffsetScenarioOpts, resetToDatetimeOpt)); + CommandLineUtils.checkInvalidArgs(parser, options, resetToEarliestOpt, minus(allResetOffsetScenarioOpts, resetToEarliestOpt)); + CommandLineUtils.checkInvalidArgs(parser, options, resetToLatestOpt, minus(allResetOffsetScenarioOpts, resetToLatestOpt)); + } + + CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, minus(allShareGroupLevelOpts, describeOpt, deleteOpt, resetOffsetsOpt)); + CommandLineUtils.checkInvalidArgs(parser, options, topicOpt, minus(allShareGroupLevelOpts, deleteOpt, resetOffsetsOpt)); + } +} diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java new file mode 100644 index 00000000000..86345cc75b4 --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumer.group; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.DescribeShareGroupsResult; +import org.apache.kafka.clients.admin.KafkaAdminClient; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.ListShareGroupsOptions; +import org.apache.kafka.clients.admin.ListShareGroupsResult; +import org.apache.kafka.clients.admin.MemberAssignment; +import org.apache.kafka.clients.admin.MemberDescription; +import org.apache.kafka.clients.admin.MockAdminClient; +import org.apache.kafka.clients.admin.ShareGroupDescription; +import org.apache.kafka.clients.admin.ShareGroupListing; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.ShareGroupState; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.test.TestUtils; +import org.apache.kafka.tools.consumer.group.ShareGroupCommand.ShareGroupService; + +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentMatchers; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +import joptsimple.OptionException; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ShareGroupCommandTest { + + @Test + public void testListShareGroups() throws Exception { + String firstGroup = "first-group"; + String secondGroup = "second-group"; + String bootstrapServer = "localhost:9092"; + + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--list"}; + Admin adminClient = mock(KafkaAdminClient.class); + ListShareGroupsResult result = mock(ListShareGroupsResult.class); + when(result.all()).thenReturn(KafkaFuture.completedFuture(Arrays.asList( + new ShareGroupListing(firstGroup, Optional.of(ShareGroupState.STABLE)), + new ShareGroupListing(secondGroup, Optional.of(ShareGroupState.EMPTY)) + ))); + when(adminClient.listShareGroups(any(ListShareGroupsOptions.class))).thenReturn(result); + ShareGroupService service = getShareGroupService(cgcArgs, adminClient); + Set expectedGroups = new HashSet<>(Arrays.asList(firstGroup, secondGroup)); + + final Set[] foundGroups = new Set[]{Collections.emptySet()}; + TestUtils.waitForCondition(() -> { + foundGroups[0] = new HashSet<>(service.listShareGroups()); + return Objects.equals(expectedGroups, foundGroups[0]); + }, "Expected --list to show groups " + expectedGroups + ", but found " + foundGroups[0] + "."); + service.close(); + } + + @Test + public void testDescribeShareGroups() throws Exception { + String firstGroup = "group1"; + Admin adminClient = mock(KafkaAdminClient.class); + DescribeShareGroupsResult result = mock(DescribeShareGroupsResult.class); + Map resultMap = new HashMap<>(); + ShareGroupDescription exp = new ShareGroupDescription( + firstGroup, + Collections.singletonList(new MemberDescription("memid1", "clId1", "host1", new MemberAssignment( + Collections.singleton(new TopicPartition("topic1", 0)) + ))), + ShareGroupState.STABLE, + new Node(0, "host1", 9090)); + resultMap.put(firstGroup, exp); + + when(result.all()).thenReturn(KafkaFuture.completedFuture(resultMap)); + when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection())).thenReturn(result); + ShareGroupService service = new ShareGroupService(null, adminClient); + assertEquals(exp, service.getDescribeGroup(firstGroup)); + service.close(); + } + + @Test + public void testDescribeShareGroupsGetOffsets() throws Exception { + Admin adminClient = mock(KafkaAdminClient.class); + + ListOffsetsResult startOffset = mock(ListOffsetsResult.class); + Map startOffsetResultMap = new HashMap<>(); + startOffsetResultMap.put(new TopicPartition("topic1", 0), new ListOffsetsResult.ListOffsetsResultInfo(10, -1, Optional.empty())); + + ListOffsetsResult endOffset = mock(ListOffsetsResult.class); + Map endOffsetResultMap = new HashMap<>(); + endOffsetResultMap.put(new TopicPartition("topic1", 0), new ListOffsetsResult.ListOffsetsResultInfo(30, -1, Optional.empty())); + + when(startOffset.all()).thenReturn(KafkaFuture.completedFuture(startOffsetResultMap)); + when(endOffset.all()).thenReturn(KafkaFuture.completedFuture(endOffsetResultMap)); + + when(adminClient.listOffsets(ArgumentMatchers.anyMap())).thenReturn(startOffset, endOffset); + + MemberDescription description = new MemberDescription("", "", "", + new MemberAssignment(Collections.singleton(new TopicPartition("topic1", 0)))); + ShareGroupService service = new ShareGroupService(null, adminClient); + Map lags = service.getOffsets(Collections.singletonList(description)); + assertEquals(1, lags.size()); + assertEquals(20, lags.get(new TopicPartition("topic1", 0))); + service.close(); + } + + @Test + public void testListWithUnrecognizedOption() { + String bootstrapServer = "localhost:9092"; + String[] cgcArgs = new String[]{"--frivolous-nonsense", "--bootstrap-server", bootstrapServer, "--list"}; + assertThrows(OptionException.class, () -> getShareGroupService(cgcArgs, new MockAdminClient())); + } + + @Test + public void testListShareGroupsWithStates() throws Exception { + String firstGroup = "first-group"; + String secondGroup = "second-group"; + String bootstrapServer = "localhost:9092"; + + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--list", "--state"}; + Admin adminClient = mock(KafkaAdminClient.class); + ListShareGroupsResult resultWithAllStates = mock(ListShareGroupsResult.class); + when(resultWithAllStates.all()).thenReturn(KafkaFuture.completedFuture(Arrays.asList( + new ShareGroupListing(firstGroup, Optional.of(ShareGroupState.STABLE)), + new ShareGroupListing(secondGroup, Optional.of(ShareGroupState.EMPTY)) + ))); + when(adminClient.listShareGroups(any(ListShareGroupsOptions.class))).thenReturn(resultWithAllStates); + ShareGroupService service = getShareGroupService(cgcArgs, adminClient); + Set expectedListing = new HashSet<>(Arrays.asList( + new ShareGroupListing(firstGroup, Optional.of(ShareGroupState.STABLE)), + new ShareGroupListing(secondGroup, Optional.of(ShareGroupState.EMPTY)))); + + final Set[] foundListing = new Set[]{Collections.emptySet()}; + TestUtils.waitForCondition(() -> { + foundListing[0] = new HashSet<>(service.listShareGroupsWithState(new HashSet<>(Arrays.asList(ShareGroupState.values())))); + return Objects.equals(expectedListing, foundListing[0]); + }, "Expected to show groups " + expectedListing + ", but found " + foundListing[0]); + + ListShareGroupsResult resultWithStableState = mock(ListShareGroupsResult.class); + when(resultWithStableState.all()).thenReturn(KafkaFuture.completedFuture(Collections.singletonList( + new ShareGroupListing(firstGroup, Optional.of(ShareGroupState.STABLE)) + ))); + when(adminClient.listShareGroups(any(ListShareGroupsOptions.class))).thenReturn(resultWithStableState); + Set expectedListingStable = Collections.singleton( + new ShareGroupListing(firstGroup, Optional.of(ShareGroupState.STABLE))); + + foundListing[0] = Collections.emptySet(); + + TestUtils.waitForCondition(() -> { + foundListing[0] = new HashSet<>(service.listShareGroupsWithState(Collections.singleton(ShareGroupState.STABLE))); + return Objects.equals(expectedListingStable, foundListing[0]); + }, "Expected to show groups " + expectedListingStable + ", but found " + foundListing[0]); + service.close(); + } + + @Test + public void testShareGroupStatesFromString() { + Set result = ShareGroupCommand.shareGroupStatesFromString("Stable"); + assertEquals(Collections.singleton(ShareGroupState.STABLE), result); + + result = ShareGroupCommand.shareGroupStatesFromString("stable"); + assertEquals(new HashSet<>(Collections.singletonList(ShareGroupState.STABLE)), result); + + result = ShareGroupCommand.shareGroupStatesFromString("dead"); + assertEquals(new HashSet<>(Collections.singletonList(ShareGroupState.DEAD)), result); + + result = ShareGroupCommand.shareGroupStatesFromString("empty"); + assertEquals(new HashSet<>(Collections.singletonList(ShareGroupState.EMPTY)), result); + + assertThrows(IllegalArgumentException.class, () -> ShareGroupCommand.shareGroupStatesFromString("bad, wrong")); + + assertThrows(IllegalArgumentException.class, () -> ShareGroupCommand.shareGroupStatesFromString(" bad, Stable")); + + assertThrows(IllegalArgumentException.class, () -> ShareGroupCommand.shareGroupStatesFromString(" , ,")); + } + + ShareGroupService getShareGroupService(String[] args, Admin adminClient) { + ShareGroupCommandOptions opts = new ShareGroupCommandOptions(args); + return new ShareGroupService(opts, adminClient); + } +}