KAFKA-16714: kafka-share-groups.sh list and describe (#16835)

Introduces kafka-share-groups.sh for listing and describing share groups. The PR also contains the remaining options in the command parser and usage message in preparation of their implementation.

Reviewers:  Manikumar Reddy <manikumar.reddy@gmail.com>, Apoorv Mittal <apoorvmittal10@gmail.com>
This commit is contained in:
Andrew Schofield 2024-08-14 11:52:26 +01:00 committed by GitHub
parent 75bcb9eb42
commit d64f4b9cd0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 727 additions and 0 deletions

17
bin/kafka-share-groups.sh Executable file
View File

@ -0,0 +1,17 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.consumer.group.ShareGroupCommand "$@"

View File

@ -0,0 +1,17 @@
@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
"%~dp0kafka-run-class.bat" org.apache.kafka.tools.consumer.group.ShareGroupCommand %*

View File

@ -0,0 +1,284 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools.consumer.group;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AbstractOptions;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeShareGroupsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListShareGroupsOptions;
import org.apache.kafka.clients.admin.ListShareGroupsResult;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.ShareGroupDescription;
import org.apache.kafka.clients.admin.ShareGroupListing;
import org.apache.kafka.common.ShareGroupState;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandLineUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import joptsimple.OptionException;
public class ShareGroupCommand {
public static void main(String[] args) {
ShareGroupCommandOptions opts = new ShareGroupCommandOptions(args);
try {
opts.checkArgs();
CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to list all share groups, describe a share group, delete share group info, or reset share group offsets.");
// should have exactly one action
long actions = Stream.of(opts.listOpt, opts.describeOpt, opts.deleteOpt, opts.resetOffsetsOpt, opts.deleteOffsetsOpt).filter(opts.options::has).count();
if (actions != 1)
CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --list, --describe, --delete, --reset-offsets, --delete-offsets.");
run(opts);
} catch (OptionException e) {
CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage());
}
}
public static void run(ShareGroupCommandOptions opts) {
try (ShareGroupService shareGroupService = new ShareGroupService(opts, Collections.emptyMap())) {
if (opts.options.has(opts.listOpt)) {
shareGroupService.listGroups();
} else if (opts.options.has(opts.describeOpt)) {
shareGroupService.describeGroups();
} else if (opts.options.has(opts.deleteOpt)) {
throw new UnsupportedOperationException("--delete option is not yet implemented");
} else if (opts.options.has(opts.resetOffsetsOpt)) {
throw new UnsupportedOperationException("--reset-offsets option is not yet implemented");
} else if (opts.options.has(opts.deleteOffsetsOpt)) {
throw new UnsupportedOperationException("--delete-offsets option is not yet implemented");
}
} catch (IllegalArgumentException e) {
CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage());
} catch (Throwable e) {
printError("Executing share group command failed due to " + e.getMessage(), Optional.of(e));
}
}
static Set<ShareGroupState> shareGroupStatesFromString(String input) {
Set<ShareGroupState> parsedStates =
Arrays.stream(input.split(",")).map(s -> ShareGroupState.parse(s.trim())).collect(Collectors.toSet());
if (parsedStates.contains(ShareGroupState.UNKNOWN)) {
Collection<ShareGroupState> validStates =
Arrays.stream(ShareGroupState.values()).filter(s -> s != ShareGroupState.UNKNOWN).collect(Collectors.toList());
throw new IllegalArgumentException("Invalid state list '" + input + "'. Valid states are: " +
validStates.stream().map(Object::toString).collect(Collectors.joining(", ")));
}
return parsedStates;
}
public static void printError(String msg, Optional<Throwable> e) {
System.out.println("\nError: " + msg);
e.ifPresent(Throwable::printStackTrace);
}
// Visibility for testing
static class ShareGroupService implements AutoCloseable {
final ShareGroupCommandOptions opts;
private final Admin adminClient;
public ShareGroupService(ShareGroupCommandOptions opts, Map<String, String> configOverrides) {
this.opts = opts;
try {
this.adminClient = createAdminClient(configOverrides);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public ShareGroupService(ShareGroupCommandOptions opts, Admin adminClient) {
this.opts = opts;
this.adminClient = adminClient;
}
public void listGroups() throws ExecutionException, InterruptedException {
if (opts.options.has(opts.stateOpt)) {
String stateValue = opts.options.valueOf(opts.stateOpt);
Set<ShareGroupState> states = (stateValue == null || stateValue.isEmpty())
? Collections.emptySet()
: shareGroupStatesFromString(stateValue);
List<ShareGroupListing> listings = listShareGroupsWithState(states);
printGroupInfo(listings);
} else
listShareGroups().forEach(System.out::println);
}
List<String> listShareGroups() {
try {
ListShareGroupsResult result = adminClient.listShareGroups(withTimeoutMs(new ListShareGroupsOptions()));
Collection<ShareGroupListing> listings = result.all().get();
return listings.stream().map(ShareGroupListing::groupId).collect(Collectors.toList());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
List<ShareGroupListing> listShareGroupsWithState(Set<ShareGroupState> states) throws ExecutionException, InterruptedException {
ListShareGroupsOptions listShareGroupsOptions = withTimeoutMs(new ListShareGroupsOptions());
listShareGroupsOptions.inStates(states);
ListShareGroupsResult result = adminClient.listShareGroups(listShareGroupsOptions);
return new ArrayList<>(result.all().get());
}
private void printGroupInfo(List<ShareGroupListing> groups) {
// find proper columns width
int maxGroupLen = 15;
for (ShareGroupListing group : groups) {
maxGroupLen = Math.max(maxGroupLen, group.groupId().length());
}
System.out.printf("%" + (-maxGroupLen) + "s %s\n", "GROUP", "STATE");
for (ShareGroupListing group : groups) {
String groupId = group.groupId();
String state = group.state().orElse(ShareGroupState.UNKNOWN).toString();
System.out.printf("%" + (-maxGroupLen) + "s %s\n", groupId, state);
}
}
private void describeGroups() throws ExecutionException, InterruptedException {
String group = opts.options.valueOf(opts.groupOpt);
ShareGroupDescription description = getDescribeGroup(group);
if (description == null)
return;
if (opts.options.has(opts.membersOpt)) {
printMembers(description);
} else if (opts.options.has(opts.stateOpt)) {
printStates(description);
} else {
printOffsets(description);
}
}
ShareGroupDescription getDescribeGroup(String group) throws ExecutionException, InterruptedException {
DescribeShareGroupsResult result = adminClient.describeShareGroups(Collections.singletonList(group));
Map<String, ShareGroupDescription> descriptionMap = result.all().get();
if (descriptionMap.containsKey(group)) {
return descriptionMap.get(group);
}
return null;
}
Map<TopicPartition, Long> getOffsets(Collection<MemberDescription> members) throws ExecutionException, InterruptedException {
Set<TopicPartition> allTp = new HashSet<>();
for (MemberDescription memberDescription : members) {
allTp.addAll(memberDescription.assignment().topicPartitions());
}
// fetch latest and earliest offsets
Map<TopicPartition, OffsetSpec> earliest = new HashMap<>();
Map<TopicPartition, OffsetSpec> latest = new HashMap<>();
for (TopicPartition tp : allTp) {
earliest.put(tp, OffsetSpec.earliest());
latest.put(tp, OffsetSpec.latest());
}
// This call to obtain the earliest offsets will be replaced once adminClient.listShareGroupOffsets is implemented
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> earliestResult = adminClient.listOffsets(earliest).all().get();
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestResult = adminClient.listOffsets(latest).all().get();
Map<TopicPartition, Long> lag = new HashMap<>();
for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> tp : earliestResult.entrySet()) {
lag.put(tp.getKey(), latestResult.get(tp.getKey()).offset() - earliestResult.get(tp.getKey()).offset());
}
return lag;
}
private void printOffsets(ShareGroupDescription description) throws ExecutionException, InterruptedException {
Map<TopicPartition, Long> offsets = getOffsets(description.members());
if (offsets != null && !offsets.isEmpty()) {
String fmt = printOffsetFormat(description, offsets);
System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "OFFSET");
for (Map.Entry<TopicPartition, Long> offset : offsets.entrySet()) {
System.out.printf(fmt, description.groupId(), offset.getKey().topic(), offset.getKey().partition(), offset.getValue());
}
}
}
private static String printOffsetFormat(ShareGroupDescription description, Map<TopicPartition, Long> offsets) {
int groupLen = Math.max(15, description.groupId().length());
int maxTopicLen = 15;
for (TopicPartition topicPartition : offsets.keySet()) {
maxTopicLen = Math.max(maxTopicLen, topicPartition.topic().length());
}
return "%" + (-groupLen) + "s %" + (-maxTopicLen) + "s %-10s %s\n";
}
private void printStates(ShareGroupDescription description) {
int groupLen = Math.max(15, description.groupId().length());
String coordinator = description.coordinator().host() + ":" + description.coordinator().port() + " (" + description.coordinator().idString() + ")";
int coordinatorLen = Math.max(25, coordinator.length());
String fmt = "%" + -groupLen + "s %" + -coordinatorLen + "s %-15s %s\n";
System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE", "#MEMBERS");
System.out.printf(fmt, description.groupId(), coordinator, description.state().toString(), description.members().size());
}
private void printMembers(ShareGroupDescription description) {
int groupLen = Math.max(15, description.groupId().length());
int maxConsumerIdLen = 15, maxHostLen = 15, maxClientIdLen = 15;
Collection<MemberDescription> members = description.members();
for (MemberDescription member : members) {
maxConsumerIdLen = Math.max(maxConsumerIdLen, member.consumerId().length());
maxHostLen = Math.max(maxHostLen, member.host().length());
maxClientIdLen = Math.max(maxClientIdLen, member.clientId().length());
}
String fmt = "%" + -groupLen + "s %" + -maxConsumerIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %s\n";
System.out.printf(fmt, "GROUP", "CONSUMER-ID", "HOST", "CLIENT-ID", "ASSIGNMENT");
for (MemberDescription member : members) {
System.out.printf(fmt, description.groupId(), member.consumerId(), member.host(), member.clientId(),
member.assignment().topicPartitions().stream().map(part -> part.topic() + ":" + part.partition()).collect(Collectors.joining(",")));
}
}
public void close() {
adminClient.close();
}
protected Admin createAdminClient(Map<String, String> configOverrides) throws IOException {
Properties props = opts.options.has(opts.commandConfigOpt) ? Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) : new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt));
props.putAll(configOverrides);
return Admin.create(props);
}
private <T extends AbstractOptions<T>> T withTimeoutMs(T options) {
int t = opts.options.valueOf(opts.timeoutMsOpt).intValue();
return options.timeoutMs(t);
}
}
}

View File

@ -0,0 +1,203 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools.consumer.group;
import org.apache.kafka.server.util.CommandDefaultOptions;
import org.apache.kafka.server.util.CommandLineUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import joptsimple.OptionSpec;
import static org.apache.kafka.tools.ToolsUtils.minus;
public class ShareGroupCommandOptions extends CommandDefaultOptions {
public static final Logger LOGGER = LoggerFactory.getLogger(ShareGroupCommandOptions.class);
public static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s) to connect to.";
public static final String GROUP_DOC = "The share group we wish to act on.";
public static final String TOPIC_DOC = "The topic whose share group information should be deleted or topic whose should be included in the reset offset process. " +
"When resetting offsets, partitions can be specified using this format: 'topic1:0,1,2', where 0,1,2 are the partitions to be included.";
public static final String ALL_TOPICS_DOC = "Consider all topics assigned to a share group in the 'reset-offsets' process.";
public static final String LIST_DOC = "List all share groups.";
public static final String DESCRIBE_DOC = "Describe share group, members and offset information.";
public static final String NL = System.lineSeparator();
public static final String DELETE_DOC = "Delete share group.";
public static final String TIMEOUT_MS_DOC = "The timeout that can be set for some use cases. For example, it can be used when describing the group " +
"to specify the maximum amount of time in milliseconds to wait before the group stabilizes.";
public static final String COMMAND_CONFIG_DOC = "Property file containing configs to be passed to Admin Client.";
public static final String RESET_OFFSETS_DOC = "Reset offsets of share group. Supports one share group at the time, and instances must be inactive." + NL +
"Has 2 execution options: --dry-run (the default) to plan which offsets to reset, and --execute to reset the offsets. " + NL +
"You must choose one of the following reset specifications: --to-datetime, --to-earliest, --to-latest." + NL +
"To define the scope use --all-topics or --topic.";
public static final String DRY_RUN_DOC = "Only show results without executing changes on share groups. Supported operations: reset-offsets.";
public static final String EXECUTE_DOC = "Execute operation. Supported operations: reset-offsets.";
public static final String RESET_TO_DATETIME_DOC = "Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'";
public static final String RESET_TO_EARLIEST_DOC = "Reset offsets to earliest offset.";
public static final String RESET_TO_LATEST_DOC = "Reset offsets to latest offset.";
public static final String MEMBERS_DOC = "Describe members of the group. This option may be used with the '--describe' option only.";
public static final String OFFSETS_DOC = "Describe the group and list all topic partitions in the group along with their offset information. " +
"This is the default sub-action and may be used with the '--describe' option only.";
public static final String STATE_DOC = "When specified with '--describe', includes the state of the group." + NL +
"When specified with '--list', it displays the state of all groups. It can also be used to list groups with specific states. " +
"Valid values are Empty, Stable and Dead.";
public static final String DELETE_OFFSETS_DOC = "Delete offsets of share group. Supports one share group at the time, and multiple topics.";
public final OptionSpec<String> bootstrapServerOpt;
public final OptionSpec<String> groupOpt;
public final OptionSpec<String> topicOpt;
public final OptionSpec<Void> allTopicsOpt;
public final OptionSpec<Void> listOpt;
public final OptionSpec<Void> describeOpt;
public final OptionSpec<Void> deleteOpt;
public final OptionSpec<Long> timeoutMsOpt;
public final OptionSpec<String> commandConfigOpt;
public final OptionSpec<Void> resetOffsetsOpt;
public final OptionSpec<Void> deleteOffsetsOpt;
public final OptionSpec<Void> dryRunOpt;
public final OptionSpec<Void> executeOpt;
public final OptionSpec<String> resetToDatetimeOpt;
public final OptionSpec<Void> resetToEarliestOpt;
public final OptionSpec<Void> resetToLatestOpt;
public final OptionSpec<Void> membersOpt;
public final OptionSpec<Void> offsetsOpt;
public final OptionSpec<String> stateOpt;
public final Set<OptionSpec<?>> allShareGroupLevelOpts;
public final Set<OptionSpec<?>> allResetOffsetScenarioOpts;
public final Set<OptionSpec<?>> allDeleteOffsetsOpts;
public ShareGroupCommandOptions(String[] args) {
super(args);
bootstrapServerOpt = parser.accepts("bootstrap-server", BOOTSTRAP_SERVER_DOC)
.withRequiredArg()
.describedAs("server to connect to")
.ofType(String.class);
groupOpt = parser.accepts("group", GROUP_DOC)
.withRequiredArg()
.describedAs("share group")
.ofType(String.class);
topicOpt = parser.accepts("topic", TOPIC_DOC)
.withRequiredArg()
.describedAs("topic")
.ofType(String.class);
allTopicsOpt = parser.accepts("all-topics", ALL_TOPICS_DOC);
listOpt = parser.accepts("list", LIST_DOC);
describeOpt = parser.accepts("describe", DESCRIBE_DOC);
deleteOpt = parser.accepts("delete", DELETE_DOC);
timeoutMsOpt = parser.accepts("timeout", TIMEOUT_MS_DOC)
.withRequiredArg()
.describedAs("timeout (ms)")
.ofType(Long.class)
.defaultsTo(5000L);
commandConfigOpt = parser.accepts("command-config", COMMAND_CONFIG_DOC)
.withRequiredArg()
.describedAs("command config property file")
.ofType(String.class);
resetOffsetsOpt = parser.accepts("reset-offsets", RESET_OFFSETS_DOC);
deleteOffsetsOpt = parser.accepts("delete-offsets", DELETE_OFFSETS_DOC);
dryRunOpt = parser.accepts("dry-run", DRY_RUN_DOC);
executeOpt = parser.accepts("execute", EXECUTE_DOC);
resetToDatetimeOpt = parser.accepts("to-datetime", RESET_TO_DATETIME_DOC)
.withRequiredArg()
.describedAs("datetime")
.ofType(String.class);
resetToEarliestOpt = parser.accepts("to-earliest", RESET_TO_EARLIEST_DOC);
resetToLatestOpt = parser.accepts("to-latest", RESET_TO_LATEST_DOC);
membersOpt = parser.accepts("members", MEMBERS_DOC)
.availableIf(describeOpt);
offsetsOpt = parser.accepts("offsets", OFFSETS_DOC)
.availableIf(describeOpt);
stateOpt = parser.accepts("state", STATE_DOC)
.availableIf(describeOpt, listOpt)
.withOptionalArg()
.ofType(String.class);
allShareGroupLevelOpts = new HashSet<>(Arrays.asList(listOpt, describeOpt, deleteOpt, resetOffsetsOpt));
allResetOffsetScenarioOpts = new HashSet<>(Arrays.asList(resetToDatetimeOpt, resetToEarliestOpt, resetToLatestOpt));
allDeleteOffsetsOpts = new HashSet<>(Arrays.asList(groupOpt, topicOpt));
options = parser.parse(args);
}
@SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"})
public void checkArgs() {
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to list, describe, reset and delete share groups.");
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt);
if (options.has(describeOpt)) {
if (!options.has(groupOpt))
CommandLineUtils.printUsageAndExit(parser,
"Option " + describeOpt + " takes the option: " + groupOpt);
List<OptionSpec<?>> mutuallyExclusiveOpts = Arrays.asList(membersOpt, offsetsOpt, stateOpt);
if (mutuallyExclusiveOpts.stream().mapToInt(o -> options.has(o) ? 1 : 0).sum() > 1) {
CommandLineUtils.printUsageAndExit(parser,
"Option " + describeOpt + " takes at most one of these options: " + mutuallyExclusiveOpts.stream().map(Object::toString).collect(Collectors.joining(", ")));
}
if (options.has(stateOpt) && options.valueOf(stateOpt) != null)
CommandLineUtils.printUsageAndExit(parser,
"Option " + describeOpt + " does not take a value for " + stateOpt);
} else {
if (options.has(timeoutMsOpt))
LOGGER.debug("Option " + timeoutMsOpt + " is applicable only when " + describeOpt + " is used.");
}
if (options.has(deleteOpt)) {
if (!options.has(groupOpt))
CommandLineUtils.printUsageAndExit(parser,
"Option " + deleteOpt + " takes the option: " + groupOpt);
if (options.has(topicOpt))
CommandLineUtils.printUsageAndExit(parser, "The consumer does not support topic-specific offset " +
"deletion from a share group.");
}
if (options.has(deleteOffsetsOpt)) {
if (!options.has(groupOpt) || !options.has(topicOpt))
CommandLineUtils.printUsageAndExit(parser,
"Option " + deleteOffsetsOpt + " takes the following options: " + allDeleteOffsetsOpts.stream().map(Object::toString).collect(Collectors.joining(", ")));
}
if (options.has(resetOffsetsOpt)) {
if (options.has(dryRunOpt) && options.has(executeOpt))
CommandLineUtils.printUsageAndExit(parser, "Option " + resetOffsetsOpt + " only accepts one of " + executeOpt + " and " + dryRunOpt);
if (!options.has(dryRunOpt) && !options.has(executeOpt)) {
CommandLineUtils.printUsageAndExit(parser, "Option " + resetOffsetsOpt + " takes the option: " + executeOpt + " or " + dryRunOpt);
}
if (!options.has(groupOpt))
CommandLineUtils.printUsageAndExit(parser,
"Option " + resetOffsetsOpt + " takes the option: " + groupOpt);
CommandLineUtils.checkInvalidArgs(parser, options, resetToDatetimeOpt, minus(allResetOffsetScenarioOpts, resetToDatetimeOpt));
CommandLineUtils.checkInvalidArgs(parser, options, resetToEarliestOpt, minus(allResetOffsetScenarioOpts, resetToEarliestOpt));
CommandLineUtils.checkInvalidArgs(parser, options, resetToLatestOpt, minus(allResetOffsetScenarioOpts, resetToLatestOpt));
}
CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, minus(allShareGroupLevelOpts, describeOpt, deleteOpt, resetOffsetsOpt));
CommandLineUtils.checkInvalidArgs(parser, options, topicOpt, minus(allShareGroupLevelOpts, deleteOpt, resetOffsetsOpt));
}
}

View File

@ -0,0 +1,206 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools.consumer.group;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeShareGroupsResult;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListShareGroupsOptions;
import org.apache.kafka.clients.admin.ListShareGroupsResult;
import org.apache.kafka.clients.admin.MemberAssignment;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.admin.ShareGroupDescription;
import org.apache.kafka.clients.admin.ShareGroupListing;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.ShareGroupState;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.consumer.group.ShareGroupCommand.ShareGroupService;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import joptsimple.OptionException;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ShareGroupCommandTest {
@Test
public void testListShareGroups() throws Exception {
String firstGroup = "first-group";
String secondGroup = "second-group";
String bootstrapServer = "localhost:9092";
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--list"};
Admin adminClient = mock(KafkaAdminClient.class);
ListShareGroupsResult result = mock(ListShareGroupsResult.class);
when(result.all()).thenReturn(KafkaFuture.completedFuture(Arrays.asList(
new ShareGroupListing(firstGroup, Optional.of(ShareGroupState.STABLE)),
new ShareGroupListing(secondGroup, Optional.of(ShareGroupState.EMPTY))
)));
when(adminClient.listShareGroups(any(ListShareGroupsOptions.class))).thenReturn(result);
ShareGroupService service = getShareGroupService(cgcArgs, adminClient);
Set<String> expectedGroups = new HashSet<>(Arrays.asList(firstGroup, secondGroup));
final Set[] foundGroups = new Set[]{Collections.emptySet()};
TestUtils.waitForCondition(() -> {
foundGroups[0] = new HashSet<>(service.listShareGroups());
return Objects.equals(expectedGroups, foundGroups[0]);
}, "Expected --list to show groups " + expectedGroups + ", but found " + foundGroups[0] + ".");
service.close();
}
@Test
public void testDescribeShareGroups() throws Exception {
String firstGroup = "group1";
Admin adminClient = mock(KafkaAdminClient.class);
DescribeShareGroupsResult result = mock(DescribeShareGroupsResult.class);
Map<String, ShareGroupDescription> resultMap = new HashMap<>();
ShareGroupDescription exp = new ShareGroupDescription(
firstGroup,
Collections.singletonList(new MemberDescription("memid1", "clId1", "host1", new MemberAssignment(
Collections.singleton(new TopicPartition("topic1", 0))
))),
ShareGroupState.STABLE,
new Node(0, "host1", 9090));
resultMap.put(firstGroup, exp);
when(result.all()).thenReturn(KafkaFuture.completedFuture(resultMap));
when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection())).thenReturn(result);
ShareGroupService service = new ShareGroupService(null, adminClient);
assertEquals(exp, service.getDescribeGroup(firstGroup));
service.close();
}
@Test
public void testDescribeShareGroupsGetOffsets() throws Exception {
Admin adminClient = mock(KafkaAdminClient.class);
ListOffsetsResult startOffset = mock(ListOffsetsResult.class);
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> startOffsetResultMap = new HashMap<>();
startOffsetResultMap.put(new TopicPartition("topic1", 0), new ListOffsetsResult.ListOffsetsResultInfo(10, -1, Optional.empty()));
ListOffsetsResult endOffset = mock(ListOffsetsResult.class);
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsetResultMap = new HashMap<>();
endOffsetResultMap.put(new TopicPartition("topic1", 0), new ListOffsetsResult.ListOffsetsResultInfo(30, -1, Optional.empty()));
when(startOffset.all()).thenReturn(KafkaFuture.completedFuture(startOffsetResultMap));
when(endOffset.all()).thenReturn(KafkaFuture.completedFuture(endOffsetResultMap));
when(adminClient.listOffsets(ArgumentMatchers.anyMap())).thenReturn(startOffset, endOffset);
MemberDescription description = new MemberDescription("", "", "",
new MemberAssignment(Collections.singleton(new TopicPartition("topic1", 0))));
ShareGroupService service = new ShareGroupService(null, adminClient);
Map<TopicPartition, Long> lags = service.getOffsets(Collections.singletonList(description));
assertEquals(1, lags.size());
assertEquals(20, lags.get(new TopicPartition("topic1", 0)));
service.close();
}
@Test
public void testListWithUnrecognizedOption() {
String bootstrapServer = "localhost:9092";
String[] cgcArgs = new String[]{"--frivolous-nonsense", "--bootstrap-server", bootstrapServer, "--list"};
assertThrows(OptionException.class, () -> getShareGroupService(cgcArgs, new MockAdminClient()));
}
@Test
public void testListShareGroupsWithStates() throws Exception {
String firstGroup = "first-group";
String secondGroup = "second-group";
String bootstrapServer = "localhost:9092";
String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--list", "--state"};
Admin adminClient = mock(KafkaAdminClient.class);
ListShareGroupsResult resultWithAllStates = mock(ListShareGroupsResult.class);
when(resultWithAllStates.all()).thenReturn(KafkaFuture.completedFuture(Arrays.asList(
new ShareGroupListing(firstGroup, Optional.of(ShareGroupState.STABLE)),
new ShareGroupListing(secondGroup, Optional.of(ShareGroupState.EMPTY))
)));
when(adminClient.listShareGroups(any(ListShareGroupsOptions.class))).thenReturn(resultWithAllStates);
ShareGroupService service = getShareGroupService(cgcArgs, adminClient);
Set<ShareGroupListing> expectedListing = new HashSet<>(Arrays.asList(
new ShareGroupListing(firstGroup, Optional.of(ShareGroupState.STABLE)),
new ShareGroupListing(secondGroup, Optional.of(ShareGroupState.EMPTY))));
final Set[] foundListing = new Set[]{Collections.emptySet()};
TestUtils.waitForCondition(() -> {
foundListing[0] = new HashSet<>(service.listShareGroupsWithState(new HashSet<>(Arrays.asList(ShareGroupState.values()))));
return Objects.equals(expectedListing, foundListing[0]);
}, "Expected to show groups " + expectedListing + ", but found " + foundListing[0]);
ListShareGroupsResult resultWithStableState = mock(ListShareGroupsResult.class);
when(resultWithStableState.all()).thenReturn(KafkaFuture.completedFuture(Collections.singletonList(
new ShareGroupListing(firstGroup, Optional.of(ShareGroupState.STABLE))
)));
when(adminClient.listShareGroups(any(ListShareGroupsOptions.class))).thenReturn(resultWithStableState);
Set<ShareGroupListing> expectedListingStable = Collections.singleton(
new ShareGroupListing(firstGroup, Optional.of(ShareGroupState.STABLE)));
foundListing[0] = Collections.emptySet();
TestUtils.waitForCondition(() -> {
foundListing[0] = new HashSet<>(service.listShareGroupsWithState(Collections.singleton(ShareGroupState.STABLE)));
return Objects.equals(expectedListingStable, foundListing[0]);
}, "Expected to show groups " + expectedListingStable + ", but found " + foundListing[0]);
service.close();
}
@Test
public void testShareGroupStatesFromString() {
Set<ShareGroupState> result = ShareGroupCommand.shareGroupStatesFromString("Stable");
assertEquals(Collections.singleton(ShareGroupState.STABLE), result);
result = ShareGroupCommand.shareGroupStatesFromString("stable");
assertEquals(new HashSet<>(Collections.singletonList(ShareGroupState.STABLE)), result);
result = ShareGroupCommand.shareGroupStatesFromString("dead");
assertEquals(new HashSet<>(Collections.singletonList(ShareGroupState.DEAD)), result);
result = ShareGroupCommand.shareGroupStatesFromString("empty");
assertEquals(new HashSet<>(Collections.singletonList(ShareGroupState.EMPTY)), result);
assertThrows(IllegalArgumentException.class, () -> ShareGroupCommand.shareGroupStatesFromString("bad, wrong"));
assertThrows(IllegalArgumentException.class, () -> ShareGroupCommand.shareGroupStatesFromString(" bad, Stable"));
assertThrows(IllegalArgumentException.class, () -> ShareGroupCommand.shareGroupStatesFromString(" , ,"));
}
ShareGroupService getShareGroupService(String[] args, Admin adminClient) {
ShareGroupCommandOptions opts = new ShareGroupCommandOptions(args);
return new ShareGroupService(opts, adminClient);
}
}