KAFKA-17546 Admin.listGroups and kafka-groups.sh (#17626)

This implements the kafka-groups.sh tool and Admin.listGroups method defined in KIP-1043.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Andrew Schofield 2024-11-01 21:37:04 +00:00 committed by GitHub
parent b864a66439
commit 3d9f88daf3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 1465 additions and 0 deletions

17
bin/kafka-groups.sh Executable file
View File

@ -0,0 +1,17 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.GroupsCommand "$@"

View File

@ -0,0 +1,17 @@
@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
"%~dp0kafka-run-class.bat" org.apache.kafka.tools.GroupsCommand %*

View File

@ -1022,6 +1022,26 @@ public interface Admin extends AutoCloseable {
return deleteConsumerGroupOffsets(groupId, partitions, new DeleteConsumerGroupOffsetsOptions());
}
/**
* List the groups available in the cluster with the default options.
*
* <p>This is a convenience method for {@link #listGroups(ListGroupsOptions)} with default options.
* See the overload for more details.
*
* @return The ListGroupsResult.
*/
default ListGroupsResult listGroups() {
return listGroups(new ListGroupsOptions());
}
/**
* List the groups available in the cluster.
*
* @param options The options to use when listing the groups.
* @return The ListGroupsResult.
*/
ListGroupsResult listGroups(ListGroupsOptions options);
/**
* Elect a replica as leader for topic partitions.
* <p>

View File

@ -309,6 +309,11 @@ public class ForwardingAdmin implements Admin {
return delegate.listShareGroups(options);
}
@Override
public ListGroupsResult listGroups(ListGroupsOptions options) {
return delegate.listGroups(options);
}
@Override
public void registerMetricForSubscription(KafkaMetric metric) {
throw new UnsupportedOperationException();

View File

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.GroupType;
import java.util.Objects;
import java.util.Optional;
/**
* A listing of a group in the cluster.
*/
public class GroupListing {
private final String groupId;
private final Optional<GroupType> type;
private final String protocol;
/**
* Create an instance with the specified parameters.
*
* @param groupId Group Id
* @param type Group type
* @param protocol Protocol
*/
public GroupListing(String groupId, Optional<GroupType> type, String protocol) {
this.groupId = groupId;
this.type = Objects.requireNonNull(type);
this.protocol = protocol;
}
/**
* The group Id.
*
* @return Group Id
*/
public String groupId() {
return groupId;
}
/**
* The type of the group.
* <p>
* If the broker returns a group type which is not recognised, as might
* happen when talking to a broker with a later version, the type will be
* <code>Optional.of(GroupType.UNKNOWN)</code>. If the broker is earlier than version 2.6.0,
* the group type will not be available, and the type will be <code>Optional.empty()</code>.
*
* @return An Optional containing the type, if available
*/
public Optional<GroupType> type() {
return type;
}
/**
* The protocol of the group.
*
* @return The protocol
*/
public String protocol() {
return protocol;
}
/**
* If the group is a simple consumer group or not.
*/
public boolean isSimpleConsumerGroup() {
return type.filter(gt -> gt == GroupType.CLASSIC).isPresent() && protocol.isEmpty();
}
@Override
public String toString() {
return "(" +
"groupId='" + groupId + '\'' +
", type=" + type.map(GroupType::toString).orElse("none") +
", protocol='" + protocol + '\'' +
')';
}
@Override
public int hashCode() {
return Objects.hash(groupId, type, protocol);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof GroupListing)) return false;
GroupListing that = (GroupListing) o;
return Objects.equals(groupId, that.groupId) &&
Objects.equals(type, that.type) &&
Objects.equals(protocol, that.protocol);
}
}

View File

@ -3490,6 +3490,141 @@ public class KafkaAdminClient extends AdminClient {
return new DescribeDelegationTokenResult(tokensFuture);
}
private static final class ListGroupsResults {
private final List<Throwable> errors;
private final HashMap<String, GroupListing> listings;
private final HashSet<Node> remaining;
private final KafkaFutureImpl<Collection<Object>> future;
ListGroupsResults(Collection<Node> leaders,
KafkaFutureImpl<Collection<Object>> future) {
this.errors = new ArrayList<>();
this.listings = new HashMap<>();
this.remaining = new HashSet<>(leaders);
this.future = future;
tryComplete();
}
synchronized void addError(Throwable throwable, Node node) {
ApiError error = ApiError.fromThrowable(throwable);
if (error.message() == null || error.message().isEmpty()) {
errors.add(error.error().exception("Error listing groups on " + node));
} else {
errors.add(error.error().exception("Error listing groups on " + node + ": " + error.message()));
}
}
synchronized void addListing(GroupListing listing) {
listings.put(listing.groupId(), listing);
}
synchronized void tryComplete(Node leader) {
remaining.remove(leader);
tryComplete();
}
private synchronized void tryComplete() {
if (remaining.isEmpty()) {
ArrayList<Object> results = new ArrayList<>(listings.values());
results.addAll(errors);
future.complete(results);
}
}
}
@Override
public ListGroupsResult listGroups(ListGroupsOptions options) {
final KafkaFutureImpl<Collection<Object>> all = new KafkaFutureImpl<>();
final long nowMetadata = time.milliseconds();
final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs());
runnable.call(new Call("findAllBrokers", deadline, new LeastLoadedNodeProvider()) {
@Override
MetadataRequest.Builder createRequest(int timeoutMs) {
return new MetadataRequest.Builder(new MetadataRequestData()
.setTopics(Collections.emptyList())
.setAllowAutoTopicCreation(true));
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
MetadataResponse metadataResponse = (MetadataResponse) abstractResponse;
Collection<Node> nodes = metadataResponse.brokers();
if (nodes.isEmpty())
throw new StaleMetadataException("Metadata fetch failed due to missing broker list");
HashSet<Node> allNodes = new HashSet<>(nodes);
final ListGroupsResults results = new ListGroupsResults(allNodes, all);
for (final Node node : allNodes) {
final long nowList = time.milliseconds();
runnable.call(new Call("listGroups", deadline, new ConstantNodeIdProvider(node.id())) {
@Override
ListGroupsRequest.Builder createRequest(int timeoutMs) {
List<String> groupTypes = options.types()
.stream()
.map(GroupType::toString)
.collect(Collectors.toList());
return new ListGroupsRequest.Builder(new ListGroupsRequestData()
.setTypesFilter(groupTypes)
);
}
private void maybeAddGroup(ListGroupsResponseData.ListedGroup group) {
final String groupId = group.groupId();
final Optional<GroupType> type;
if (group.groupType() == null || group.groupType().isEmpty()) {
type = Optional.empty();
} else {
type = Optional.of(GroupType.parse(group.groupType()));
}
final String protocolType = group.protocolType();
final GroupListing groupListing = new GroupListing(
groupId,
type,
protocolType
);
results.addListing(groupListing);
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
final ListGroupsResponse response = (ListGroupsResponse) abstractResponse;
synchronized (results) {
Errors error = Errors.forCode(response.data().errorCode());
if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.COORDINATOR_NOT_AVAILABLE) {
throw error.exception();
} else if (error != Errors.NONE) {
results.addError(error.exception(), node);
} else {
for (ListGroupsResponseData.ListedGroup group : response.data().groups()) {
maybeAddGroup(group);
}
}
results.tryComplete(node);
}
}
@Override
void handleFailure(Throwable throwable) {
synchronized (results) {
results.addError(throwable, node);
results.tryComplete(node);
}
}
}, nowList);
}
}
@Override
void handleFailure(Throwable throwable) {
KafkaException exception = new KafkaException("Failed to find brokers to send ListGroups", throwable);
all.complete(Collections.singletonList(exception));
}
}, nowMetadata);
return new ListGroupsResult(all);
}
@Override
public DescribeConsumerGroupsResult describeConsumerGroups(final Collection<String> groupIds,
final DescribeConsumerGroupsOptions options) {

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collections;
import java.util.Set;
/**
* Options for {@link Admin#listGroups()}.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class ListGroupsOptions extends AbstractOptions<ListGroupsOptions> {
private Set<GroupType> types = Collections.emptySet();
/**
* If types is set, only groups of these types will be returned by listGroups().
* Otherwise, all groups are returned.
*/
public ListGroupsOptions withTypes(Set<GroupType> types) {
this.types = (types == null || types.isEmpty()) ? Set.of() : Set.copyOf(types);
return this;
}
/**
* Returns the list of group types that are requested or empty if no types have been specified.
*/
public Set<GroupType> types() {
return types;
}
}

View File

@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
/**
* The result of the {@link Admin#listGroups()} call.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class ListGroupsResult {
private final KafkaFutureImpl<Collection<GroupListing>> all;
private final KafkaFutureImpl<Collection<GroupListing>> valid;
private final KafkaFutureImpl<Collection<Throwable>> errors;
ListGroupsResult(KafkaFuture<Collection<Object>> future) {
this.all = new KafkaFutureImpl<>();
this.valid = new KafkaFutureImpl<>();
this.errors = new KafkaFutureImpl<>();
future.thenApply(results -> {
ArrayList<Throwable> curErrors = new ArrayList<>();
ArrayList<GroupListing> curValid = new ArrayList<>();
for (Object resultObject : results) {
if (resultObject instanceof Throwable) {
curErrors.add((Throwable) resultObject);
} else {
curValid.add((GroupListing) resultObject);
}
}
List<GroupListing> validResult = Collections.unmodifiableList(curValid);
List<Throwable> errorsResult = Collections.unmodifiableList(curErrors);
if (!errorsResult.isEmpty()) {
all.completeExceptionally(errorsResult.get(0));
} else {
all.complete(validResult);
}
valid.complete(validResult);
errors.complete(errorsResult);
return null;
});
}
/**
* Returns a future that yields either an exception, or the full set of group listings.
* <p>
* In the event of a failure, the future yields nothing but the first exception which
* occurred.
*/
public KafkaFuture<Collection<GroupListing>> all() {
return all;
}
/**
* Returns a future which yields just the valid listings.
* <p>
* This future never fails with an error, no matter what happens. Errors are completely
* ignored. If nothing can be fetched, an empty collection is yielded.
* If there is an error, but some results can be returned, this future will yield
* those partial results. When using this future, it is a good idea to also check
* the errors future so that errors can be displayed and handled.
*/
public KafkaFuture<Collection<GroupListing>> valid() {
return valid;
}
/**
* Returns a future which yields just the errors which occurred.
* <p>
* If this future yields a non-empty collection, it is very likely that elements are
* missing from the valid() set.
* <p>
* This future itself never fails with an error. In the event of an error, this future
* will successfully yield a collection containing at least one exception.
*/
public KafkaFuture<Collection<Throwable>> errors() {
return errors;
}
}

View File

@ -140,6 +140,16 @@ public class AdminClientTestUtils {
.collect(Collectors.toMap(Map.Entry::getKey, e -> KafkaFuture.completedFuture(e.getValue()))));
}
public static ListGroupsResult listGroupsResult(GroupListing... groups) {
return new ListGroupsResult(
KafkaFuture.completedFuture(Arrays.stream(groups)
.collect(Collectors.toList())));
}
public static ListGroupsResult listGroupsResult(KafkaException exception) {
return new ListGroupsResult(KafkaFuture.completedFuture(Collections.singleton(exception)));
}
public static ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult(Map<String, Map<TopicPartition, OffsetAndMetadata>> offsets) {
Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> resultMap = offsets.entrySet().stream()
.collect(Collectors.toMap(e -> CoordinatorKey.byGroupId(e.getKey()),

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.GroupType;
import org.junit.jupiter.api.Test;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class GroupListingTest {
private static final String GROUP_ID = "mygroup";
@Test
public void testSimpleConsumerGroup() {
GroupListing gl = new GroupListing(GROUP_ID, Optional.of(GroupType.CLASSIC), "");
assertTrue(gl.isSimpleConsumerGroup());
gl = new GroupListing(GROUP_ID, Optional.of(GroupType.CLASSIC), ConsumerProtocol.PROTOCOL_TYPE);
assertFalse(gl.isSimpleConsumerGroup());
gl = new GroupListing(GROUP_ID, Optional.of(GroupType.CONSUMER), "");
assertFalse(gl.isSimpleConsumerGroup());
gl = new GroupListing(GROUP_ID, Optional.empty(), "");
assertFalse(gl.isSimpleConsumerGroup());
}
}

View File

@ -2934,6 +2934,266 @@ public class KafkaAdminClientTest {
}
}
@Test
public void testListGroups() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0),
AdminClientConfig.RETRIES_CONFIG, "2")) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
// Empty metadata response should be retried
env.kafkaClient().prepareResponse(
RequestTestUtils.metadataResponse(
Collections.emptyList(),
env.cluster().clusterResource().clusterId(),
-1,
Collections.emptyList()));
env.kafkaClient().prepareResponse(
RequestTestUtils.metadataResponse(
env.cluster().nodes(),
env.cluster().clusterResource().clusterId(),
env.cluster().controller().id(),
Collections.emptyList()));
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(asList(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-1")
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setGroupType(GroupType.CONSUMER.toString())
.setGroupState("Stable"),
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-connect-1")
.setProtocolType("connector")
.setGroupType(GroupType.CLASSIC.toString())
.setGroupState("Stable")
))),
env.cluster().nodeById(0));
// handle retriable errors
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
new ListGroupsResponseData()
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
.setGroups(Collections.emptyList())
),
env.cluster().nodeById(1));
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
new ListGroupsResponseData()
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
.setGroups(Collections.emptyList())
),
env.cluster().nodeById(1));
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(asList(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-2")
.setProtocolType("anyproto")
.setGroupType(GroupType.CLASSIC.toString())
.setGroupState("Stable"),
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-connect-2")
.setProtocolType("connector")
.setGroupType(GroupType.CLASSIC.toString())
.setGroupState("Stable")
))),
env.cluster().nodeById(1));
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(asList(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-3")
.setProtocolType("share")
.setGroupType(GroupType.SHARE.toString())
.setGroupState("Stable"),
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-connect-3")
.setProtocolType("connector")
.setGroupType(GroupType.CLASSIC.toString())
.setGroupState("Stable")
))),
env.cluster().nodeById(2));
// fatal error
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
new ListGroupsResponseData()
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
.setGroups(Collections.emptyList())),
env.cluster().nodeById(3));
final ListGroupsResult result = env.adminClient().listGroups();
TestUtils.assertFutureError(result.all(), UnknownServerException.class);
Collection<GroupListing> listings = result.valid().get();
assertEquals(6, listings.size());
Set<String> groupIds = new HashSet<>();
for (GroupListing listing : listings) {
groupIds.add(listing.groupId());
}
assertEquals(Set.of("group-1", "group-connect-1", "group-2", "group-connect-2", "group-3", "group-connect-3"), groupIds);
assertEquals(1, result.errors().get().size());
}
}
@Test
public void testListGroupsMetadataFailure() throws Exception {
final Cluster cluster = mockCluster(3, 0);
final Time time = new MockTime();
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
AdminClientConfig.RETRIES_CONFIG, "0")) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
// Empty metadata causes the request to fail since we have no list of brokers
// to send the ListGroups requests to
env.kafkaClient().prepareResponse(
RequestTestUtils.metadataResponse(
Collections.emptyList(),
env.cluster().clusterResource().clusterId(),
-1,
Collections.emptyList()));
final ListGroupsResult result = env.adminClient().listGroups();
TestUtils.assertFutureError(result.all(), KafkaException.class);
}
}
@Test
public void testListGroupsEmptyProtocol() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(asList(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-1")
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setGroupType(GroupType.CONSUMER.toString())
.setGroupState("Stable"),
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-2")
.setGroupType(GroupType.CLASSIC.toString())
.setGroupState("Empty")))),
env.cluster().nodeById(0));
final ListGroupsOptions options = new ListGroupsOptions();
final ListGroupsResult result = env.adminClient().listGroups(options);
Collection<GroupListing> listings = result.valid().get();
assertEquals(2, listings.size());
List<GroupListing> expected = new ArrayList<>();
expected.add(new GroupListing("group-2", Optional.of(GroupType.CLASSIC), ""));
expected.add(new GroupListing("group-1", Optional.of(GroupType.CONSUMER), ConsumerProtocol.PROTOCOL_TYPE));
assertEquals(expected, listings);
assertEquals(0, result.errors().get().size());
}
}
@Test
public void testListGroupsEmptyGroupType() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(List.of(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-1")
.setProtocolType("any")))),
env.cluster().nodeById(0));
final ListGroupsOptions options = new ListGroupsOptions();
final ListGroupsResult result = env.adminClient().listGroups(options);
Collection<GroupListing> listings = result.valid().get();
assertEquals(1, listings.size());
List<GroupListing> expected = new ArrayList<>();
expected.add(new GroupListing("group-1", Optional.empty(), "any"));
assertEquals(expected, listings);
assertEquals(0, result.errors().get().size());
}
}
@Test
public void testListGroupsWithTypes() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
// Test with list group options.
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
env.kafkaClient().prepareResponseFrom(
expectListGroupsRequestWithFilters(Collections.emptySet(), singleton(GroupType.CONSUMER.toString())),
new ListGroupsResponse(new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(asList(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-1")
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setGroupState("Stable")
.setGroupType(GroupType.CONSUMER.toString()),
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-2")
.setGroupState("Empty")
.setGroupType(GroupType.CONSUMER.toString())))),
env.cluster().nodeById(0));
final ListGroupsOptions options = new ListGroupsOptions().withTypes(singleton(GroupType.CONSUMER));
final ListGroupsResult result = env.adminClient().listGroups(options);
Collection<GroupListing> listing = result.valid().get();
assertEquals(2, listing.size());
List<GroupListing> expected = new ArrayList<>();
expected.add(new GroupListing("group-2", Optional.of(GroupType.CONSUMER), ""));
expected.add(new GroupListing("group-1", Optional.of(GroupType.CONSUMER), ConsumerProtocol.PROTOCOL_TYPE));
assertEquals(expected, listing);
assertEquals(0, result.errors().get().size());
}
}
@Test
public void testListGroupsWithTypesOlderBrokerVersion() throws Exception {
ApiVersion listGroupV4 = new ApiVersion()
.setApiKey(ApiKeys.LIST_GROUPS.id)
.setMinVersion((short) 0)
.setMaxVersion((short) 4);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(listGroupV4)));
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
// Check that we cannot set a type filter with an older broker.
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
env.kafkaClient().prepareUnsupportedVersionResponse(request ->
request instanceof ListGroupsRequest && !((ListGroupsRequest) request).data().typesFilter().isEmpty()
);
ListGroupsOptions options = new ListGroupsOptions().withTypes(singleton(GroupType.CLASSIC));
ListGroupsResult result = env.adminClient().listGroups(options);
TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class);
}
}
@Test
public void testListConsumerGroups() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0),

View File

@ -19,7 +19,9 @@ package org.apache.kafka.clients.admin;
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
import org.apache.kafka.clients.admin.internals.CoordinatorKey;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
@ -721,6 +723,13 @@ public class MockAdminClient extends AdminClient {
return new DescribeDelegationTokenResult(future);
}
@Override
public synchronized ListGroupsResult listGroups(ListGroupsOptions options) {
KafkaFutureImpl<Collection<Object>> future = new KafkaFutureImpl<>();
future.complete(groupConfigs.keySet().stream().map(g -> new GroupListing(g, Optional.of(GroupType.CONSUMER), ConsumerProtocol.PROTOCOL_TYPE)).collect(Collectors.toList()));
return new ListGroupsResult(future);
}
@Override
public synchronized DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds, DescribeConsumerGroupsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");

View File

@ -100,6 +100,8 @@ import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListConsumerGroupsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.apache.kafka.clients.admin.ListGroupsOptions;
import org.apache.kafka.clients.admin.ListGroupsResult;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListPartitionReassignmentsOptions;
@ -268,6 +270,11 @@ public class TestingMetricsInterceptingAdminClient extends AdminClient {
return adminDelegate.describeDelegationToken(options);
}
@Override
public ListGroupsResult listGroups(final ListGroupsOptions options) {
return adminDelegate.listGroups(options);
}
@Override
public DescribeConsumerGroupsResult describeConsumerGroups(final Collection<String> groupIds, final DescribeConsumerGroupsOptions options) {
return adminDelegate.describeConsumerGroups(groupIds, options);

View File

@ -0,0 +1,295 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandDefaultOptions;
import org.apache.kafka.server.util.CommandLineUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
public class GroupsCommand {
private static final Logger LOG = LoggerFactory.getLogger(GroupsCommand.class);
public static void main(String... args) {
Exit.exit(mainNoExit(args));
}
static int mainNoExit(String... args) {
try {
execute(args);
return 0;
} catch (Throwable e) {
System.err.println(e.getMessage());
System.err.println(Utils.stackTrace(e));
return 1;
}
}
static void execute(String... args) throws Exception {
GroupsCommandOptions opts = new GroupsCommandOptions(args);
Properties config = opts.commandConfig();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.bootstrapServer());
int exitCode = 0;
try (GroupsService service = new GroupsService(config)) {
if (opts.hasListOption()) {
service.listGroups(opts);
}
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause != null) {
printException(cause);
} else {
printException(e);
}
exitCode = 1;
} catch (Throwable t) {
printException(t);
exitCode = 1;
} finally {
Exit.exit(exitCode);
}
}
public static class GroupsService implements AutoCloseable {
private final Admin adminClient;
public GroupsService(Properties config) {
this.adminClient = Admin.create(config);
}
// Visible for testing
GroupsService(Admin adminClient) {
this.adminClient = adminClient;
}
public void listGroups(GroupsCommandOptions opts) throws Exception {
Collection<GroupListing> resources = adminClient.listGroups()
.all().get(30, TimeUnit.SECONDS);
printGroupDetails(resources, opts.groupType(), opts.protocol(), opts.hasConsumerOption(), opts.hasShareOption());
}
private void printGroupDetails(Collection<GroupListing> groups,
Optional<GroupType> groupTypeFilter,
Optional<String> protocolFilter,
boolean consumerGroupFilter,
boolean shareGroupFilter) {
List<List<String>> lineItems = new ArrayList<>();
int maxLen = 20;
for (GroupListing group : groups) {
if (combinedFilter(group, groupTypeFilter, protocolFilter, consumerGroupFilter, shareGroupFilter)) {
List<String> lineItem = new ArrayList<>();
lineItem.add(group.groupId());
lineItem.add(group.type().map(GroupType::toString).orElse(""));
lineItem.add(group.protocol());
for (String item : lineItem) {
if (item != null) {
maxLen = Math.max(maxLen, item.length());
}
}
lineItems.add(lineItem);
}
}
String fmt = "%" + (-maxLen) + "s";
String header = fmt + " " + fmt + " " + fmt;
System.out.printf(header, "GROUP", "TYPE", "PROTOCOL");
System.out.println();
for (List<String> item : lineItems) {
for (String atom : item) {
System.out.printf(fmt + " ", atom);
}
System.out.println();
}
}
private boolean combinedFilter(GroupListing group,
Optional<GroupType> groupTypeFilter,
Optional<String> protocolFilter,
boolean consumerGroupFilter,
boolean shareGroupFilter) {
boolean pass = true;
Optional<GroupType> groupType = group.type();
String protocol = group.protocol();
if (groupTypeFilter.isPresent()) {
pass = groupType.filter(gt -> gt == groupTypeFilter.get()).isPresent()
&& protocolFilter.map(protocol::equals).orElse(true);
} else if (protocolFilter.isPresent()) {
pass = protocol.equals(protocolFilter.get());
} else if (consumerGroupFilter) {
pass = protocol.equals("consumer") || protocol.isEmpty() || groupType.filter(gt -> gt == GroupType.CONSUMER).isPresent();
} else if (shareGroupFilter) {
pass = groupType.filter(gt -> gt == GroupType.SHARE).isPresent();
}
return pass;
}
@Override
public void close() throws Exception {
adminClient.close();
}
}
private static void printException(Throwable e) {
System.out.println("Error while executing groups command : " + e.getMessage());
LOG.error(Utils.stackTrace(e));
}
public static final class GroupsCommandOptions extends CommandDefaultOptions {
private final ArgumentAcceptingOptionSpec<String> bootstrapServerOpt;
private final ArgumentAcceptingOptionSpec<String> commandConfigOpt;
private final OptionSpecBuilder listOpt;
private final ArgumentAcceptingOptionSpec<String> groupTypeOpt;
private final ArgumentAcceptingOptionSpec<String> protocolOpt;
private final OptionSpecBuilder consumerOpt;
private final OptionSpecBuilder shareOpt;
public GroupsCommandOptions(String[] args) {
super(args);
bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: The Kafka server to connect to.")
.withRequiredArg()
.describedAs("server to connect to")
.required()
.ofType(String.class);
commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.")
.withRequiredArg()
.describedAs("command config property file")
.ofType(String.class);
listOpt = parser.accepts("list", "List the groups.");
groupTypeOpt = parser.accepts("group-type", "Filter the groups based on group type. "
+ "Valid types are: 'classic', 'consumer' and 'share'.")
.withRequiredArg()
.describedAs("type")
.ofType(String.class);
protocolOpt = parser.accepts("protocol", "Filter the groups based on protocol type.")
.withRequiredArg()
.describedAs("protocol")
.ofType(String.class);
consumerOpt = parser.accepts("consumer", "Filter the groups to show all kinds of consumer groups, including classic and simple consumer groups. "
+ "This matches group type 'consumer', and group type 'classic' where the protocol type is 'consumer' or empty.");
shareOpt = parser.accepts("share", "Filter the groups to show share groups.");
options = parser.parse(args);
checkArgs();
}
public Boolean has(OptionSpec<?> builder) {
return options.has(builder);
}
public <A> Optional<A> valueAsOption(OptionSpec<A> option) {
return valueAsOption(option, Optional.empty());
}
public <A> Optional<A> valueAsOption(OptionSpec<A> option, Optional<A> defaultValue) {
if (has(option)) {
return Optional.of(options.valueOf(option));
} else {
return defaultValue;
}
}
public String bootstrapServer() {
return options.valueOf(bootstrapServerOpt);
}
public Properties commandConfig() throws IOException {
if (has(commandConfigOpt)) {
return Utils.loadProps(options.valueOf(commandConfigOpt));
} else {
return new Properties();
}
}
public Optional<GroupType> groupType() {
return valueAsOption(groupTypeOpt).map(GroupType::parse).filter(gt -> gt != GroupType.UNKNOWN);
}
public Optional<String> protocol() {
return valueAsOption(protocolOpt);
}
public boolean hasConsumerOption() {
return has(consumerOpt);
}
public boolean hasListOption() {
return has(listOpt);
}
public boolean hasShareOption() {
return has(shareOpt);
}
public void checkArgs() {
if (args.length == 0)
CommandLineUtils.printUsageAndExit(parser, "This tool helps to list groups of all types.");
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to list groups of all types.");
// should have exactly one action
long actions = Stream.of(listOpt).filter(options::has).count();
if (actions != 1)
CommandLineUtils.printUsageAndExit(parser, "Command must include exactly one action: --list.");
if (has(groupTypeOpt)) {
if (groupType().isEmpty()) {
throw new IllegalArgumentException("--group-type must be a valid group type.");
}
}
// check invalid args
CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt, groupTypeOpt, protocolOpt, shareOpt);
CommandLineUtils.checkInvalidArgs(parser, options, shareOpt, consumerOpt, groupTypeOpt, protocolOpt);
}
}
}

View File

@ -0,0 +1,381 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientTestUtils;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.ListGroupsResult;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Exit;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class GroupsCommandTest {
private final String bootstrapServer = "localhost:9092";
private final ToolsTestUtils.MockExitProcedure exitProcedure = new ToolsTestUtils.MockExitProcedure();
@BeforeEach
public void setupExitProcedure() {
Exit.setExitProcedure(exitProcedure);
}
@AfterEach
public void resetExitProcedure() {
Exit.resetExitProcedure();
}
@Test
public void testOptionsNoActionFails() {
assertInitializeInvalidOptionsExitCode(1,
new String[] {"--bootstrap-server", bootstrapServer});
}
@Test
public void testOptionsListSucceeds() {
GroupsCommand.GroupsCommandOptions opts = new GroupsCommand.GroupsCommandOptions(
new String[] {"--bootstrap-server", bootstrapServer, "--list"});
assertTrue(opts.hasListOption());
}
@Test
public void testOptionsListConsumerFilterSucceeds() {
GroupsCommand.GroupsCommandOptions opts = new GroupsCommand.GroupsCommandOptions(
new String[] {"--bootstrap-server", bootstrapServer, "--list", "--consumer"});
assertTrue(opts.hasListOption());
assertTrue(opts.hasConsumerOption());
}
@Test
public void testOptionsListShareFilterSucceeds() {
GroupsCommand.GroupsCommandOptions opts = new GroupsCommand.GroupsCommandOptions(
new String[] {"--bootstrap-server", bootstrapServer, "--list", "--share"});
assertTrue(opts.hasListOption());
assertTrue(opts.hasShareOption());
}
@Test
public void testOptionsListProtocolFilterSucceeds() {
GroupsCommand.GroupsCommandOptions opts = new GroupsCommand.GroupsCommandOptions(
new String[] {"--bootstrap-server", bootstrapServer, "--list", "--protocol", "anyproto"});
assertTrue(opts.hasListOption());
assertTrue(opts.protocol().isPresent());
assertEquals("anyproto", opts.protocol().get());
}
@Test
public void testOptionsListTypeFilterSucceeds() {
GroupsCommand.GroupsCommandOptions opts = new GroupsCommand.GroupsCommandOptions(
new String[] {"--bootstrap-server", bootstrapServer, "--list", "--group-type", "share"});
assertTrue(opts.hasListOption());
assertTrue(opts.groupType().isPresent());
assertEquals(GroupType.SHARE, opts.groupType().get());
}
@Test
public void testOptionsListInvalidTypeFilterFails() {
assertInitializeInvalidOptionsExitCode(1,
new String[] {"--bootstrap-server", bootstrapServer, "--list", "--group-type", "invalid"});
}
@Test
public void testOptionsListProtocolAndTypeFiltersSucceeds() {
GroupsCommand.GroupsCommandOptions opts = new GroupsCommand.GroupsCommandOptions(
new String[] {"--bootstrap-server", bootstrapServer, "--list", "--protocol", "anyproto", "--group-type", "share"});
assertTrue(opts.hasListOption());
assertTrue(opts.protocol().isPresent());
assertEquals("anyproto", opts.protocol().get());
assertTrue(opts.groupType().isPresent());
assertEquals(GroupType.SHARE, opts.groupType().get());
}
@Test
public void testOptionsListConsumerAndShareFilterFails() {
assertInitializeInvalidOptionsExitCode(1,
new String[] {"--bootstrap-server", bootstrapServer, "--list", "--consumer", "--share"});
}
@Test
public void testOptionsListConsumerAndProtocolFilterFails() {
assertInitializeInvalidOptionsExitCode(1,
new String[] {"--bootstrap-server", bootstrapServer, "--list", "--consumer", "--protocol", "anyproto"});
}
@Test
public void testOptionsListConsumerAndTypeFilterFails() {
assertInitializeInvalidOptionsExitCode(1,
new String[] {"--bootstrap-server", bootstrapServer, "--list", "--consumer", "--group-type", "share"});
}
@Test
public void testOptionsListShareAndProtocolFilterFails() {
assertInitializeInvalidOptionsExitCode(1,
new String[] {"--bootstrap-server", bootstrapServer, "--list", "--share", "--protocol", "anyproto"});
}
@Test
public void testOptionsListShareAndTypeFilterFails() {
assertInitializeInvalidOptionsExitCode(1,
new String[] {"--bootstrap-server", bootstrapServer, "--list", "--share", "--group-type", "classic"});
}
@Test
public void testListGroupsEmpty() {
Admin adminClient = mock(Admin.class);
GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient);
ListGroupsResult result = AdminClientTestUtils.listGroupsResult();
when(adminClient.listGroups()).thenReturn(result);
String capturedOutput = ToolsTestUtils.captureStandardOut(() -> {
try {
service.listGroups(new GroupsCommand.GroupsCommandOptions(
new String[]{"--bootstrap-server", bootstrapServer, "--list"}
));
} catch (Throwable t) {
fail(t);
}
});
assertCapturedListOutput(capturedOutput);
}
@Test
public void testListGroups() {
Admin adminClient = mock(Admin.class);
GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient);
ListGroupsResult result = AdminClientTestUtils.listGroupsResult(
new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer"),
new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer"),
new GroupListing("SG", Optional.of(GroupType.SHARE), "share")
);
when(adminClient.listGroups()).thenReturn(result);
String capturedOutput = ToolsTestUtils.captureStandardOut(() -> {
try {
service.listGroups(new GroupsCommand.GroupsCommandOptions(
new String[]{"--bootstrap-server", bootstrapServer, "--list"}
));
} catch (Throwable t) {
fail(t);
}
});
assertCapturedListOutput(capturedOutput,
new String[]{"CGclassic", "Classic", "consumer"},
new String[]{"CGconsumer", "Consumer", "consumer"},
new String[]{"SG", "Share", "share"});
}
@Test
public void testListGroupsConsumerFilter() {
Admin adminClient = mock(Admin.class);
GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient);
ListGroupsResult result = AdminClientTestUtils.listGroupsResult(
new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer"),
new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer"),
new GroupListing("SG", Optional.of(GroupType.SHARE), "share")
);
when(adminClient.listGroups()).thenReturn(result);
String capturedOutput = ToolsTestUtils.captureStandardOut(() -> {
try {
service.listGroups(new GroupsCommand.GroupsCommandOptions(
new String[]{"--bootstrap-server", bootstrapServer, "--list", "--consumer"}
));
} catch (Throwable t) {
fail(t);
}
});
assertCapturedListOutput(capturedOutput,
new String[]{"CGclassic", "Classic", "consumer"},
new String[]{"CGconsumer", "Consumer", "consumer"});
}
@Test
public void testListGroupsShareFilter() {
Admin adminClient = mock(Admin.class);
GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient);
ListGroupsResult result = AdminClientTestUtils.listGroupsResult(
new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer"),
new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer"),
new GroupListing("SG", Optional.of(GroupType.SHARE), "share")
);
when(adminClient.listGroups()).thenReturn(result);
String capturedOutput = ToolsTestUtils.captureStandardOut(() -> {
try {
service.listGroups(new GroupsCommand.GroupsCommandOptions(
new String[]{"--bootstrap-server", bootstrapServer, "--list", "--share"}
));
} catch (Throwable t) {
fail(t);
}
});
assertCapturedListOutput(capturedOutput,
new String[]{"SG", "Share", "share"});
}
@Test
public void testListGroupsProtocolFilter() {
Admin adminClient = mock(Admin.class);
GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient);
ListGroupsResult result = AdminClientTestUtils.listGroupsResult(
new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer"),
new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer"),
new GroupListing("SG", Optional.of(GroupType.SHARE), "share")
);
when(adminClient.listGroups()).thenReturn(result);
String capturedOutput = ToolsTestUtils.captureStandardOut(() -> {
try {
service.listGroups(new GroupsCommand.GroupsCommandOptions(
new String[]{"--bootstrap-server", bootstrapServer, "--list", "--protocol", "consumer"}
));
} catch (Throwable t) {
fail(t);
}
});
assertCapturedListOutput(capturedOutput,
new String[]{"CGclassic", "Classic", "consumer"},
new String[]{"CGconsumer", "Consumer", "consumer"});
}
@Test
public void testListGroupsTypeFilter() {
Admin adminClient = mock(Admin.class);
GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient);
ListGroupsResult result = AdminClientTestUtils.listGroupsResult(
new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer"),
new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer"),
new GroupListing("SG", Optional.of(GroupType.SHARE), "share")
);
when(adminClient.listGroups()).thenReturn(result);
String capturedOutput = ToolsTestUtils.captureStandardOut(() -> {
try {
service.listGroups(new GroupsCommand.GroupsCommandOptions(
new String[]{"--bootstrap-server", bootstrapServer, "--list", "--group-type", "share"}
));
} catch (Throwable t) {
fail(t);
}
});
assertCapturedListOutput(capturedOutput,
new String[]{"SG", "Share", "share"});
}
@Test
public void testListGroupsProtocolAndTypeFilter() {
Admin adminClient = mock(Admin.class);
GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient);
ListGroupsResult result = AdminClientTestUtils.listGroupsResult(
new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer"),
new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer"),
new GroupListing("SG", Optional.of(GroupType.SHARE), "share")
);
when(adminClient.listGroups()).thenReturn(result);
String capturedOutput = ToolsTestUtils.captureStandardOut(() -> {
try {
service.listGroups(new GroupsCommand.GroupsCommandOptions(
new String[]{"--bootstrap-server", bootstrapServer, "--list", "--protocol", "consumer", "--group-type", "classic"}
));
} catch (Throwable t) {
fail(t);
}
});
assertCapturedListOutput(capturedOutput,
new String[]{"CGclassic", "Classic", "consumer"});
}
@Test
public void testListGroupsProtocolAndTypeFilterNoMatch() {
Admin adminClient = mock(Admin.class);
GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient);
ListGroupsResult result = AdminClientTestUtils.listGroupsResult(
new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer"),
new GroupListing("SG", Optional.of(GroupType.SHARE), "share")
);
when(adminClient.listGroups()).thenReturn(result);
String capturedOutput = ToolsTestUtils.captureStandardOut(() -> {
try {
service.listGroups(new GroupsCommand.GroupsCommandOptions(
new String[]{"--bootstrap-server", bootstrapServer, "--list", "--protocol", "consumer", "--group-type", "classic"}
));
} catch (Throwable t) {
fail(t);
}
});
assertCapturedListOutput(capturedOutput);
}
@Test
public void testListGroupsFailsWithException() {
Admin adminClient = mock(Admin.class);
GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient);
ListGroupsResult result = AdminClientTestUtils.listGroupsResult(Errors.COORDINATOR_NOT_AVAILABLE.exception());
when(adminClient.listGroups()).thenReturn(result);
assertThrows(ExecutionException.class, () -> service.listGroups(new GroupsCommand.GroupsCommandOptions(
new String[]{"--bootstrap-server", bootstrapServer, "--list"}
)));
}
private void assertInitializeInvalidOptionsExitCode(int expected, String[] options) {
Exit.setExitProcedure((exitCode, message) -> {
assertEquals(expected, exitCode);
throw new RuntimeException();
});
try {
assertThrows(RuntimeException.class, () -> new GroupsCommand.GroupsCommandOptions(options));
} finally {
Exit.resetExitProcedure();
}
}
private void assertCapturedListOutput(String capturedOutput, String[]... expectedLines) {
String[] capturedLines = capturedOutput.split("\n");
assertEquals(expectedLines.length + 1, capturedLines.length);
assertEquals("GROUP,TYPE,PROTOCOL", String.join(",", capturedLines[0].split(" +")));
int i = 1;
for (String[] line : expectedLines) {
assertEquals(String.join(",", line), String.join(",", capturedLines[i++].split(" +")));
}
}
}