diff --git a/bin/kafka-groups.sh b/bin/kafka-groups.sh new file mode 100755 index 00000000000..9c84746e962 --- /dev/null +++ b/bin/kafka-groups.sh @@ -0,0 +1,17 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.GroupsCommand "$@" diff --git a/bin/windows/kafka-groups.bat b/bin/windows/kafka-groups.bat new file mode 100644 index 00000000000..de079ed1568 --- /dev/null +++ b/bin/windows/kafka-groups.bat @@ -0,0 +1,17 @@ +@echo off +rem Licensed to the Apache Software Foundation (ASF) under one or more +rem contributor license agreements. See the NOTICE file distributed with +rem this work for additional information regarding copyright ownership. +rem The ASF licenses this file to You under the Apache License, Version 2.0 +rem (the "License"); you may not use this file except in compliance with +rem the License. You may obtain a copy of the License at +rem +rem http://www.apache.org/licenses/LICENSE-2.0 +rem +rem Unless required by applicable law or agreed to in writing, software +rem distributed under the License is distributed on an "AS IS" BASIS, +rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +rem See the License for the specific language governing permissions and +rem limitations under the License. + +"%~dp0kafka-run-class.bat" org.apache.kafka.tools.GroupsCommand %* diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index 6470042754c..d6695566bc2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -1022,6 +1022,26 @@ public interface Admin extends AutoCloseable { return deleteConsumerGroupOffsets(groupId, partitions, new DeleteConsumerGroupOffsetsOptions()); } + /** + * List the groups available in the cluster with the default options. + * + *

This is a convenience method for {@link #listGroups(ListGroupsOptions)} with default options. + * See the overload for more details. + * + * @return The ListGroupsResult. + */ + default ListGroupsResult listGroups() { + return listGroups(new ListGroupsOptions()); + } + + /** + * List the groups available in the cluster. + * + * @param options The options to use when listing the groups. + * @return The ListGroupsResult. + */ + ListGroupsResult listGroups(ListGroupsOptions options); + /** * Elect a replica as leader for topic partitions. *

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java index ef34367de86..87e350c5e7a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java @@ -309,6 +309,11 @@ public class ForwardingAdmin implements Admin { return delegate.listShareGroups(options); } + @Override + public ListGroupsResult listGroups(ListGroupsOptions options) { + return delegate.listGroups(options); + } + @Override public void registerMetricForSubscription(KafkaMetric metric) { throw new UnsupportedOperationException(); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/GroupListing.java b/clients/src/main/java/org/apache/kafka/clients/admin/GroupListing.java new file mode 100644 index 00000000000..0ee2a211e70 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/GroupListing.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.GroupType; + +import java.util.Objects; +import java.util.Optional; + +/** + * A listing of a group in the cluster. + */ +public class GroupListing { + private final String groupId; + private final Optional type; + private final String protocol; + + /** + * Create an instance with the specified parameters. + * + * @param groupId Group Id + * @param type Group type + * @param protocol Protocol + */ + public GroupListing(String groupId, Optional type, String protocol) { + this.groupId = groupId; + this.type = Objects.requireNonNull(type); + this.protocol = protocol; + } + + /** + * The group Id. + * + * @return Group Id + */ + public String groupId() { + return groupId; + } + + /** + * The type of the group. + *

+ * If the broker returns a group type which is not recognised, as might + * happen when talking to a broker with a later version, the type will be + * Optional.of(GroupType.UNKNOWN). If the broker is earlier than version 2.6.0, + * the group type will not be available, and the type will be Optional.empty(). + * + * @return An Optional containing the type, if available + */ + public Optional type() { + return type; + } + + /** + * The protocol of the group. + * + * @return The protocol + */ + public String protocol() { + return protocol; + } + + /** + * If the group is a simple consumer group or not. + */ + public boolean isSimpleConsumerGroup() { + return type.filter(gt -> gt == GroupType.CLASSIC).isPresent() && protocol.isEmpty(); + } + + @Override + public String toString() { + return "(" + + "groupId='" + groupId + '\'' + + ", type=" + type.map(GroupType::toString).orElse("none") + + ", protocol='" + protocol + '\'' + + ')'; + } + + @Override + public int hashCode() { + return Objects.hash(groupId, type, protocol); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof GroupListing)) return false; + GroupListing that = (GroupListing) o; + return Objects.equals(groupId, that.groupId) && + Objects.equals(type, that.type) && + Objects.equals(protocol, that.protocol); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 269ffd1099b..30fdac4687d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -3490,6 +3490,141 @@ public class KafkaAdminClient extends AdminClient { return new DescribeDelegationTokenResult(tokensFuture); } + private static final class ListGroupsResults { + private final List errors; + private final HashMap listings; + private final HashSet remaining; + private final KafkaFutureImpl> future; + + ListGroupsResults(Collection leaders, + KafkaFutureImpl> future) { + this.errors = new ArrayList<>(); + this.listings = new HashMap<>(); + this.remaining = new HashSet<>(leaders); + this.future = future; + tryComplete(); + } + + synchronized void addError(Throwable throwable, Node node) { + ApiError error = ApiError.fromThrowable(throwable); + if (error.message() == null || error.message().isEmpty()) { + errors.add(error.error().exception("Error listing groups on " + node)); + } else { + errors.add(error.error().exception("Error listing groups on " + node + ": " + error.message())); + } + } + + synchronized void addListing(GroupListing listing) { + listings.put(listing.groupId(), listing); + } + + synchronized void tryComplete(Node leader) { + remaining.remove(leader); + tryComplete(); + } + + private synchronized void tryComplete() { + if (remaining.isEmpty()) { + ArrayList results = new ArrayList<>(listings.values()); + results.addAll(errors); + future.complete(results); + } + } + } + + @Override + public ListGroupsResult listGroups(ListGroupsOptions options) { + final KafkaFutureImpl> all = new KafkaFutureImpl<>(); + final long nowMetadata = time.milliseconds(); + final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs()); + runnable.call(new Call("findAllBrokers", deadline, new LeastLoadedNodeProvider()) { + @Override + MetadataRequest.Builder createRequest(int timeoutMs) { + return new MetadataRequest.Builder(new MetadataRequestData() + .setTopics(Collections.emptyList()) + .setAllowAutoTopicCreation(true)); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + MetadataResponse metadataResponse = (MetadataResponse) abstractResponse; + Collection nodes = metadataResponse.brokers(); + if (nodes.isEmpty()) + throw new StaleMetadataException("Metadata fetch failed due to missing broker list"); + + HashSet allNodes = new HashSet<>(nodes); + final ListGroupsResults results = new ListGroupsResults(allNodes, all); + + for (final Node node : allNodes) { + final long nowList = time.milliseconds(); + runnable.call(new Call("listGroups", deadline, new ConstantNodeIdProvider(node.id())) { + @Override + ListGroupsRequest.Builder createRequest(int timeoutMs) { + List groupTypes = options.types() + .stream() + .map(GroupType::toString) + .collect(Collectors.toList()); + return new ListGroupsRequest.Builder(new ListGroupsRequestData() + .setTypesFilter(groupTypes) + ); + } + + private void maybeAddGroup(ListGroupsResponseData.ListedGroup group) { + final String groupId = group.groupId(); + final Optional type; + if (group.groupType() == null || group.groupType().isEmpty()) { + type = Optional.empty(); + } else { + type = Optional.of(GroupType.parse(group.groupType())); + } + final String protocolType = group.protocolType(); + final GroupListing groupListing = new GroupListing( + groupId, + type, + protocolType + ); + results.addListing(groupListing); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + final ListGroupsResponse response = (ListGroupsResponse) abstractResponse; + synchronized (results) { + Errors error = Errors.forCode(response.data().errorCode()); + if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.COORDINATOR_NOT_AVAILABLE) { + throw error.exception(); + } else if (error != Errors.NONE) { + results.addError(error.exception(), node); + } else { + for (ListGroupsResponseData.ListedGroup group : response.data().groups()) { + maybeAddGroup(group); + } + } + results.tryComplete(node); + } + } + + @Override + void handleFailure(Throwable throwable) { + synchronized (results) { + results.addError(throwable, node); + results.tryComplete(node); + } + } + }, nowList); + } + } + + @Override + void handleFailure(Throwable throwable) { + KafkaException exception = new KafkaException("Failed to find brokers to send ListGroups", throwable); + all.complete(Collections.singletonList(exception)); + } + }, nowMetadata); + + return new ListGroupsResult(all); + } + @Override public DescribeConsumerGroupsResult describeConsumerGroups(final Collection groupIds, final DescribeConsumerGroupsOptions options) { diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsOptions.java new file mode 100644 index 00000000000..042c88dc80f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsOptions.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.GroupType; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Collections; +import java.util.Set; + +/** + * Options for {@link Admin#listGroups()}. + *

+ * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class ListGroupsOptions extends AbstractOptions { + + private Set types = Collections.emptySet(); + + /** + * If types is set, only groups of these types will be returned by listGroups(). + * Otherwise, all groups are returned. + */ + public ListGroupsOptions withTypes(Set types) { + this.types = (types == null || types.isEmpty()) ? Set.of() : Set.copyOf(types); + return this; + } + + /** + * Returns the list of group types that are requested or empty if no types have been specified. + */ + public Set types() { + return types; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsResult.java new file mode 100644 index 00000000000..b19c3e38e9c --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsResult.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.internals.KafkaFutureImpl; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * The result of the {@link Admin#listGroups()} call. + *

+ * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class ListGroupsResult { + private final KafkaFutureImpl> all; + private final KafkaFutureImpl> valid; + private final KafkaFutureImpl> errors; + + ListGroupsResult(KafkaFuture> future) { + this.all = new KafkaFutureImpl<>(); + this.valid = new KafkaFutureImpl<>(); + this.errors = new KafkaFutureImpl<>(); + future.thenApply(results -> { + ArrayList curErrors = new ArrayList<>(); + ArrayList curValid = new ArrayList<>(); + for (Object resultObject : results) { + if (resultObject instanceof Throwable) { + curErrors.add((Throwable) resultObject); + } else { + curValid.add((GroupListing) resultObject); + } + } + List validResult = Collections.unmodifiableList(curValid); + List errorsResult = Collections.unmodifiableList(curErrors); + if (!errorsResult.isEmpty()) { + all.completeExceptionally(errorsResult.get(0)); + } else { + all.complete(validResult); + } + valid.complete(validResult); + errors.complete(errorsResult); + return null; + }); + } + + /** + * Returns a future that yields either an exception, or the full set of group listings. + *

+ * In the event of a failure, the future yields nothing but the first exception which + * occurred. + */ + public KafkaFuture> all() { + return all; + } + + /** + * Returns a future which yields just the valid listings. + *

+ * This future never fails with an error, no matter what happens. Errors are completely + * ignored. If nothing can be fetched, an empty collection is yielded. + * If there is an error, but some results can be returned, this future will yield + * those partial results. When using this future, it is a good idea to also check + * the errors future so that errors can be displayed and handled. + */ + public KafkaFuture> valid() { + return valid; + } + + /** + * Returns a future which yields just the errors which occurred. + *

+ * If this future yields a non-empty collection, it is very likely that elements are + * missing from the valid() set. + *

+ * This future itself never fails with an error. In the event of an error, this future + * will successfully yield a collection containing at least one exception. + */ + public KafkaFuture> errors() { + return errors; + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java index 6a79299d3b2..1a549a6fdec 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java @@ -140,6 +140,16 @@ public class AdminClientTestUtils { .collect(Collectors.toMap(Map.Entry::getKey, e -> KafkaFuture.completedFuture(e.getValue())))); } + public static ListGroupsResult listGroupsResult(GroupListing... groups) { + return new ListGroupsResult( + KafkaFuture.completedFuture(Arrays.stream(groups) + .collect(Collectors.toList()))); + } + + public static ListGroupsResult listGroupsResult(KafkaException exception) { + return new ListGroupsResult(KafkaFuture.completedFuture(Collections.singleton(exception))); + } + public static ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult(Map> offsets) { Map>> resultMap = offsets.entrySet().stream() .collect(Collectors.toMap(e -> CoordinatorKey.byGroupId(e.getKey()), diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/GroupListingTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/GroupListingTest.java new file mode 100644 index 00000000000..7a8279be34a --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/admin/GroupListingTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.common.GroupType; + +import org.junit.jupiter.api.Test; + +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class GroupListingTest { + + private static final String GROUP_ID = "mygroup"; + + @Test + public void testSimpleConsumerGroup() { + GroupListing gl = new GroupListing(GROUP_ID, Optional.of(GroupType.CLASSIC), ""); + assertTrue(gl.isSimpleConsumerGroup()); + + gl = new GroupListing(GROUP_ID, Optional.of(GroupType.CLASSIC), ConsumerProtocol.PROTOCOL_TYPE); + assertFalse(gl.isSimpleConsumerGroup()); + + gl = new GroupListing(GROUP_ID, Optional.of(GroupType.CONSUMER), ""); + assertFalse(gl.isSimpleConsumerGroup()); + + gl = new GroupListing(GROUP_ID, Optional.empty(), ""); + assertFalse(gl.isSimpleConsumerGroup()); + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 0fb6280b61c..a864503e1c9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -2934,6 +2934,266 @@ public class KafkaAdminClientTest { } } + @Test + public void testListGroups() throws Exception { + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0), + AdminClientConfig.RETRIES_CONFIG, "2")) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + // Empty metadata response should be retried + env.kafkaClient().prepareResponse( + RequestTestUtils.metadataResponse( + Collections.emptyList(), + env.cluster().clusterResource().clusterId(), + -1, + Collections.emptyList())); + + env.kafkaClient().prepareResponse( + RequestTestUtils.metadataResponse( + env.cluster().nodes(), + env.cluster().clusterResource().clusterId(), + env.cluster().controller().id(), + Collections.emptyList())); + + env.kafkaClient().prepareResponseFrom( + new ListGroupsResponse( + new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(asList( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-1") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupType(GroupType.CONSUMER.toString()) + .setGroupState("Stable"), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-connect-1") + .setProtocolType("connector") + .setGroupType(GroupType.CLASSIC.toString()) + .setGroupState("Stable") + ))), + env.cluster().nodeById(0)); + + // handle retriable errors + env.kafkaClient().prepareResponseFrom( + new ListGroupsResponse( + new ListGroupsResponseData() + .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) + .setGroups(Collections.emptyList()) + ), + env.cluster().nodeById(1)); + env.kafkaClient().prepareResponseFrom( + new ListGroupsResponse( + new ListGroupsResponseData() + .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()) + .setGroups(Collections.emptyList()) + ), + env.cluster().nodeById(1)); + env.kafkaClient().prepareResponseFrom( + new ListGroupsResponse( + new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(asList( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-2") + .setProtocolType("anyproto") + .setGroupType(GroupType.CLASSIC.toString()) + .setGroupState("Stable"), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-connect-2") + .setProtocolType("connector") + .setGroupType(GroupType.CLASSIC.toString()) + .setGroupState("Stable") + ))), + env.cluster().nodeById(1)); + + env.kafkaClient().prepareResponseFrom( + new ListGroupsResponse( + new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(asList( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-3") + .setProtocolType("share") + .setGroupType(GroupType.SHARE.toString()) + .setGroupState("Stable"), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-connect-3") + .setProtocolType("connector") + .setGroupType(GroupType.CLASSIC.toString()) + .setGroupState("Stable") + ))), + env.cluster().nodeById(2)); + + // fatal error + env.kafkaClient().prepareResponseFrom( + new ListGroupsResponse( + new ListGroupsResponseData() + .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()) + .setGroups(Collections.emptyList())), + env.cluster().nodeById(3)); + + final ListGroupsResult result = env.adminClient().listGroups(); + TestUtils.assertFutureError(result.all(), UnknownServerException.class); + + Collection listings = result.valid().get(); + assertEquals(6, listings.size()); + + Set groupIds = new HashSet<>(); + for (GroupListing listing : listings) { + groupIds.add(listing.groupId()); + } + + assertEquals(Set.of("group-1", "group-connect-1", "group-2", "group-connect-2", "group-3", "group-connect-3"), groupIds); + assertEquals(1, result.errors().get().size()); + } + } + + @Test + public void testListGroupsMetadataFailure() throws Exception { + final Cluster cluster = mockCluster(3, 0); + final Time time = new MockTime(); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster, + AdminClientConfig.RETRIES_CONFIG, "0")) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + // Empty metadata causes the request to fail since we have no list of brokers + // to send the ListGroups requests to + env.kafkaClient().prepareResponse( + RequestTestUtils.metadataResponse( + Collections.emptyList(), + env.cluster().clusterResource().clusterId(), + -1, + Collections.emptyList())); + + final ListGroupsResult result = env.adminClient().listGroups(); + TestUtils.assertFutureError(result.all(), KafkaException.class); + } + } + + @Test + public void testListGroupsEmptyProtocol() throws Exception { + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + + env.kafkaClient().prepareResponseFrom( + new ListGroupsResponse(new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(asList( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-1") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupType(GroupType.CONSUMER.toString()) + .setGroupState("Stable"), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-2") + .setGroupType(GroupType.CLASSIC.toString()) + .setGroupState("Empty")))), + env.cluster().nodeById(0)); + + final ListGroupsOptions options = new ListGroupsOptions(); + final ListGroupsResult result = env.adminClient().listGroups(options); + Collection listings = result.valid().get(); + + assertEquals(2, listings.size()); + List expected = new ArrayList<>(); + expected.add(new GroupListing("group-2", Optional.of(GroupType.CLASSIC), "")); + expected.add(new GroupListing("group-1", Optional.of(GroupType.CONSUMER), ConsumerProtocol.PROTOCOL_TYPE)); + assertEquals(expected, listings); + assertEquals(0, result.errors().get().size()); + } + } + + @Test + public void testListGroupsEmptyGroupType() throws Exception { + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + + env.kafkaClient().prepareResponseFrom( + new ListGroupsResponse(new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(List.of( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-1") + .setProtocolType("any")))), + env.cluster().nodeById(0)); + + final ListGroupsOptions options = new ListGroupsOptions(); + final ListGroupsResult result = env.adminClient().listGroups(options); + Collection listings = result.valid().get(); + + assertEquals(1, listings.size()); + List expected = new ArrayList<>(); + expected.add(new GroupListing("group-1", Optional.empty(), "any")); + assertEquals(expected, listings); + assertEquals(0, result.errors().get().size()); + } + } + + @Test + public void testListGroupsWithTypes() throws Exception { + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + // Test with list group options. + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + + env.kafkaClient().prepareResponseFrom( + expectListGroupsRequestWithFilters(Collections.emptySet(), singleton(GroupType.CONSUMER.toString())), + new ListGroupsResponse(new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(asList( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-1") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState("Stable") + .setGroupType(GroupType.CONSUMER.toString()), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-2") + .setGroupState("Empty") + .setGroupType(GroupType.CONSUMER.toString())))), + env.cluster().nodeById(0)); + + final ListGroupsOptions options = new ListGroupsOptions().withTypes(singleton(GroupType.CONSUMER)); + final ListGroupsResult result = env.adminClient().listGroups(options); + Collection listing = result.valid().get(); + + assertEquals(2, listing.size()); + List expected = new ArrayList<>(); + expected.add(new GroupListing("group-2", Optional.of(GroupType.CONSUMER), "")); + expected.add(new GroupListing("group-1", Optional.of(GroupType.CONSUMER), ConsumerProtocol.PROTOCOL_TYPE)); + assertEquals(expected, listing); + assertEquals(0, result.errors().get().size()); + } + } + + @Test + public void testListGroupsWithTypesOlderBrokerVersion() throws Exception { + ApiVersion listGroupV4 = new ApiVersion() + .setApiKey(ApiKeys.LIST_GROUPS.id) + .setMinVersion((short) 0) + .setMaxVersion((short) 4); + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(listGroupV4))); + + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + + // Check that we cannot set a type filter with an older broker. + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + env.kafkaClient().prepareUnsupportedVersionResponse(request -> + request instanceof ListGroupsRequest && !((ListGroupsRequest) request).data().typesFilter().isEmpty() + ); + + ListGroupsOptions options = new ListGroupsOptions().withTypes(singleton(GroupType.CLASSIC)); + ListGroupsResult result = env.adminClient().listGroups(options); + TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class); + } + } + @Test public void testListConsumerGroups() throws Exception { try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0), diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index 44ea2bfe8e9..8b6e1267c2f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -19,7 +19,9 @@ package org.apache.kafka.clients.admin; import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo; import org.apache.kafka.clients.admin.internals.CoordinatorKey; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; import org.apache.kafka.common.ElectionType; +import org.apache.kafka.common.GroupType; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; @@ -721,6 +723,13 @@ public class MockAdminClient extends AdminClient { return new DescribeDelegationTokenResult(future); } + @Override + public synchronized ListGroupsResult listGroups(ListGroupsOptions options) { + KafkaFutureImpl> future = new KafkaFutureImpl<>(); + future.complete(groupConfigs.keySet().stream().map(g -> new GroupListing(g, Optional.of(GroupType.CONSUMER), ConsumerProtocol.PROTOCOL_TYPE)).collect(Collectors.toList())); + return new ListGroupsResult(future); + } + @Override public synchronized DescribeConsumerGroupsResult describeConsumerGroups(Collection groupIds, DescribeConsumerGroupsOptions options) { throw new UnsupportedOperationException("Not implemented yet"); diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java index 793392b6086..e1cb7411f80 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java @@ -100,6 +100,8 @@ import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec; import org.apache.kafka.clients.admin.ListConsumerGroupsOptions; import org.apache.kafka.clients.admin.ListConsumerGroupsResult; +import org.apache.kafka.clients.admin.ListGroupsOptions; +import org.apache.kafka.clients.admin.ListGroupsResult; import org.apache.kafka.clients.admin.ListOffsetsOptions; import org.apache.kafka.clients.admin.ListOffsetsResult; import org.apache.kafka.clients.admin.ListPartitionReassignmentsOptions; @@ -268,6 +270,11 @@ public class TestingMetricsInterceptingAdminClient extends AdminClient { return adminDelegate.describeDelegationToken(options); } + @Override + public ListGroupsResult listGroups(final ListGroupsOptions options) { + return adminDelegate.listGroups(options); + } + @Override public DescribeConsumerGroupsResult describeConsumerGroups(final Collection groupIds, final DescribeConsumerGroupsOptions options) { return adminDelegate.describeConsumerGroups(groupIds, options); diff --git a/tools/src/main/java/org/apache/kafka/tools/GroupsCommand.java b/tools/src/main/java/org/apache/kafka/tools/GroupsCommand.java new file mode 100644 index 00000000000..8f5046d6b8a --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/GroupsCommand.java @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.GroupListing; +import org.apache.kafka.common.GroupType; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import joptsimple.ArgumentAcceptingOptionSpec; +import joptsimple.OptionSpec; +import joptsimple.OptionSpecBuilder; + +public class GroupsCommand { + private static final Logger LOG = LoggerFactory.getLogger(GroupsCommand.class); + + public static void main(String... args) { + Exit.exit(mainNoExit(args)); + } + + static int mainNoExit(String... args) { + try { + execute(args); + return 0; + } catch (Throwable e) { + System.err.println(e.getMessage()); + System.err.println(Utils.stackTrace(e)); + return 1; + } + } + + static void execute(String... args) throws Exception { + GroupsCommandOptions opts = new GroupsCommandOptions(args); + + Properties config = opts.commandConfig(); + config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.bootstrapServer()); + + int exitCode = 0; + try (GroupsService service = new GroupsService(config)) { + if (opts.hasListOption()) { + service.listGroups(opts); + } + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause != null) { + printException(cause); + } else { + printException(e); + } + exitCode = 1; + } catch (Throwable t) { + printException(t); + exitCode = 1; + } finally { + Exit.exit(exitCode); + } + } + + public static class GroupsService implements AutoCloseable { + private final Admin adminClient; + + public GroupsService(Properties config) { + this.adminClient = Admin.create(config); + } + + // Visible for testing + GroupsService(Admin adminClient) { + this.adminClient = adminClient; + } + + public void listGroups(GroupsCommandOptions opts) throws Exception { + Collection resources = adminClient.listGroups() + .all().get(30, TimeUnit.SECONDS); + printGroupDetails(resources, opts.groupType(), opts.protocol(), opts.hasConsumerOption(), opts.hasShareOption()); + } + + private void printGroupDetails(Collection groups, + Optional groupTypeFilter, + Optional protocolFilter, + boolean consumerGroupFilter, + boolean shareGroupFilter) { + List> lineItems = new ArrayList<>(); + int maxLen = 20; + for (GroupListing group : groups) { + if (combinedFilter(group, groupTypeFilter, protocolFilter, consumerGroupFilter, shareGroupFilter)) { + List lineItem = new ArrayList<>(); + lineItem.add(group.groupId()); + lineItem.add(group.type().map(GroupType::toString).orElse("")); + lineItem.add(group.protocol()); + for (String item : lineItem) { + if (item != null) { + maxLen = Math.max(maxLen, item.length()); + } + } + lineItems.add(lineItem); + } + } + + String fmt = "%" + (-maxLen) + "s"; + String header = fmt + " " + fmt + " " + fmt; + System.out.printf(header, "GROUP", "TYPE", "PROTOCOL"); + System.out.println(); + for (List item : lineItems) { + for (String atom : item) { + System.out.printf(fmt + " ", atom); + } + System.out.println(); + } + } + + private boolean combinedFilter(GroupListing group, + Optional groupTypeFilter, + Optional protocolFilter, + boolean consumerGroupFilter, + boolean shareGroupFilter) { + boolean pass = true; + Optional groupType = group.type(); + String protocol = group.protocol(); + + if (groupTypeFilter.isPresent()) { + pass = groupType.filter(gt -> gt == groupTypeFilter.get()).isPresent() + && protocolFilter.map(protocol::equals).orElse(true); + } else if (protocolFilter.isPresent()) { + pass = protocol.equals(protocolFilter.get()); + } else if (consumerGroupFilter) { + pass = protocol.equals("consumer") || protocol.isEmpty() || groupType.filter(gt -> gt == GroupType.CONSUMER).isPresent(); + } else if (shareGroupFilter) { + pass = groupType.filter(gt -> gt == GroupType.SHARE).isPresent(); + } + return pass; + } + + @Override + public void close() throws Exception { + adminClient.close(); + } + } + + private static void printException(Throwable e) { + System.out.println("Error while executing groups command : " + e.getMessage()); + LOG.error(Utils.stackTrace(e)); + } + + public static final class GroupsCommandOptions extends CommandDefaultOptions { + private final ArgumentAcceptingOptionSpec bootstrapServerOpt; + + private final ArgumentAcceptingOptionSpec commandConfigOpt; + + private final OptionSpecBuilder listOpt; + + private final ArgumentAcceptingOptionSpec groupTypeOpt; + + private final ArgumentAcceptingOptionSpec protocolOpt; + + private final OptionSpecBuilder consumerOpt; + + private final OptionSpecBuilder shareOpt; + + public GroupsCommandOptions(String[] args) { + super(args); + bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: The Kafka server to connect to.") + .withRequiredArg() + .describedAs("server to connect to") + .required() + .ofType(String.class); + commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.") + .withRequiredArg() + .describedAs("command config property file") + .ofType(String.class); + + listOpt = parser.accepts("list", "List the groups."); + + groupTypeOpt = parser.accepts("group-type", "Filter the groups based on group type. " + + "Valid types are: 'classic', 'consumer' and 'share'.") + .withRequiredArg() + .describedAs("type") + .ofType(String.class); + + protocolOpt = parser.accepts("protocol", "Filter the groups based on protocol type.") + .withRequiredArg() + .describedAs("protocol") + .ofType(String.class); + + consumerOpt = parser.accepts("consumer", "Filter the groups to show all kinds of consumer groups, including classic and simple consumer groups. " + + "This matches group type 'consumer', and group type 'classic' where the protocol type is 'consumer' or empty."); + shareOpt = parser.accepts("share", "Filter the groups to show share groups."); + + options = parser.parse(args); + + checkArgs(); + } + + public Boolean has(OptionSpec builder) { + return options.has(builder); + } + + public Optional valueAsOption(OptionSpec option) { + return valueAsOption(option, Optional.empty()); + } + + public Optional valueAsOption(OptionSpec option, Optional defaultValue) { + if (has(option)) { + return Optional.of(options.valueOf(option)); + } else { + return defaultValue; + } + } + + public String bootstrapServer() { + return options.valueOf(bootstrapServerOpt); + } + + public Properties commandConfig() throws IOException { + if (has(commandConfigOpt)) { + return Utils.loadProps(options.valueOf(commandConfigOpt)); + } else { + return new Properties(); + } + } + + public Optional groupType() { + return valueAsOption(groupTypeOpt).map(GroupType::parse).filter(gt -> gt != GroupType.UNKNOWN); + } + + public Optional protocol() { + return valueAsOption(protocolOpt); + } + + public boolean hasConsumerOption() { + return has(consumerOpt); + } + + public boolean hasListOption() { + return has(listOpt); + } + + public boolean hasShareOption() { + return has(shareOpt); + } + + public void checkArgs() { + if (args.length == 0) + CommandLineUtils.printUsageAndExit(parser, "This tool helps to list groups of all types."); + + CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to list groups of all types."); + + // should have exactly one action + long actions = Stream.of(listOpt).filter(options::has).count(); + if (actions != 1) + CommandLineUtils.printUsageAndExit(parser, "Command must include exactly one action: --list."); + + if (has(groupTypeOpt)) { + if (groupType().isEmpty()) { + throw new IllegalArgumentException("--group-type must be a valid group type."); + } + } + + // check invalid args + CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt, groupTypeOpt, protocolOpt, shareOpt); + CommandLineUtils.checkInvalidArgs(parser, options, shareOpt, consumerOpt, groupTypeOpt, protocolOpt); + } + } +} diff --git a/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java new file mode 100644 index 00000000000..cdea5583653 --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/tools/GroupsCommandTest.java @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientTestUtils; +import org.apache.kafka.clients.admin.GroupListing; +import org.apache.kafka.clients.admin.ListGroupsResult; +import org.apache.kafka.common.GroupType; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.Exit; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Optional; +import java.util.concurrent.ExecutionException; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class GroupsCommandTest { + + private final String bootstrapServer = "localhost:9092"; + private final ToolsTestUtils.MockExitProcedure exitProcedure = new ToolsTestUtils.MockExitProcedure(); + + @BeforeEach + public void setupExitProcedure() { + Exit.setExitProcedure(exitProcedure); + } + + @AfterEach + public void resetExitProcedure() { + Exit.resetExitProcedure(); + } + + @Test + public void testOptionsNoActionFails() { + assertInitializeInvalidOptionsExitCode(1, + new String[] {"--bootstrap-server", bootstrapServer}); + } + + @Test + public void testOptionsListSucceeds() { + GroupsCommand.GroupsCommandOptions opts = new GroupsCommand.GroupsCommandOptions( + new String[] {"--bootstrap-server", bootstrapServer, "--list"}); + assertTrue(opts.hasListOption()); + } + + @Test + public void testOptionsListConsumerFilterSucceeds() { + GroupsCommand.GroupsCommandOptions opts = new GroupsCommand.GroupsCommandOptions( + new String[] {"--bootstrap-server", bootstrapServer, "--list", "--consumer"}); + assertTrue(opts.hasListOption()); + assertTrue(opts.hasConsumerOption()); + } + + @Test + public void testOptionsListShareFilterSucceeds() { + GroupsCommand.GroupsCommandOptions opts = new GroupsCommand.GroupsCommandOptions( + new String[] {"--bootstrap-server", bootstrapServer, "--list", "--share"}); + assertTrue(opts.hasListOption()); + assertTrue(opts.hasShareOption()); + } + + @Test + public void testOptionsListProtocolFilterSucceeds() { + GroupsCommand.GroupsCommandOptions opts = new GroupsCommand.GroupsCommandOptions( + new String[] {"--bootstrap-server", bootstrapServer, "--list", "--protocol", "anyproto"}); + assertTrue(opts.hasListOption()); + assertTrue(opts.protocol().isPresent()); + assertEquals("anyproto", opts.protocol().get()); + } + + @Test + public void testOptionsListTypeFilterSucceeds() { + GroupsCommand.GroupsCommandOptions opts = new GroupsCommand.GroupsCommandOptions( + new String[] {"--bootstrap-server", bootstrapServer, "--list", "--group-type", "share"}); + assertTrue(opts.hasListOption()); + assertTrue(opts.groupType().isPresent()); + assertEquals(GroupType.SHARE, opts.groupType().get()); + } + + @Test + public void testOptionsListInvalidTypeFilterFails() { + assertInitializeInvalidOptionsExitCode(1, + new String[] {"--bootstrap-server", bootstrapServer, "--list", "--group-type", "invalid"}); + } + + @Test + public void testOptionsListProtocolAndTypeFiltersSucceeds() { + GroupsCommand.GroupsCommandOptions opts = new GroupsCommand.GroupsCommandOptions( + new String[] {"--bootstrap-server", bootstrapServer, "--list", "--protocol", "anyproto", "--group-type", "share"}); + assertTrue(opts.hasListOption()); + assertTrue(opts.protocol().isPresent()); + assertEquals("anyproto", opts.protocol().get()); + assertTrue(opts.groupType().isPresent()); + assertEquals(GroupType.SHARE, opts.groupType().get()); + } + + @Test + public void testOptionsListConsumerAndShareFilterFails() { + assertInitializeInvalidOptionsExitCode(1, + new String[] {"--bootstrap-server", bootstrapServer, "--list", "--consumer", "--share"}); + } + + @Test + public void testOptionsListConsumerAndProtocolFilterFails() { + assertInitializeInvalidOptionsExitCode(1, + new String[] {"--bootstrap-server", bootstrapServer, "--list", "--consumer", "--protocol", "anyproto"}); + } + + @Test + public void testOptionsListConsumerAndTypeFilterFails() { + assertInitializeInvalidOptionsExitCode(1, + new String[] {"--bootstrap-server", bootstrapServer, "--list", "--consumer", "--group-type", "share"}); + } + + @Test + public void testOptionsListShareAndProtocolFilterFails() { + assertInitializeInvalidOptionsExitCode(1, + new String[] {"--bootstrap-server", bootstrapServer, "--list", "--share", "--protocol", "anyproto"}); + } + + @Test + public void testOptionsListShareAndTypeFilterFails() { + assertInitializeInvalidOptionsExitCode(1, + new String[] {"--bootstrap-server", bootstrapServer, "--list", "--share", "--group-type", "classic"}); + } + + @Test + public void testListGroupsEmpty() { + Admin adminClient = mock(Admin.class); + GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient); + + ListGroupsResult result = AdminClientTestUtils.listGroupsResult(); + when(adminClient.listGroups()).thenReturn(result); + + String capturedOutput = ToolsTestUtils.captureStandardOut(() -> { + try { + service.listGroups(new GroupsCommand.GroupsCommandOptions( + new String[]{"--bootstrap-server", bootstrapServer, "--list"} + )); + } catch (Throwable t) { + fail(t); + } + }); + assertCapturedListOutput(capturedOutput); + } + + @Test + public void testListGroups() { + Admin adminClient = mock(Admin.class); + GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient); + + ListGroupsResult result = AdminClientTestUtils.listGroupsResult( + new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer"), + new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer"), + new GroupListing("SG", Optional.of(GroupType.SHARE), "share") + ); + when(adminClient.listGroups()).thenReturn(result); + + String capturedOutput = ToolsTestUtils.captureStandardOut(() -> { + try { + service.listGroups(new GroupsCommand.GroupsCommandOptions( + new String[]{"--bootstrap-server", bootstrapServer, "--list"} + )); + } catch (Throwable t) { + fail(t); + } + }); + assertCapturedListOutput(capturedOutput, + new String[]{"CGclassic", "Classic", "consumer"}, + new String[]{"CGconsumer", "Consumer", "consumer"}, + new String[]{"SG", "Share", "share"}); + } + + @Test + public void testListGroupsConsumerFilter() { + Admin adminClient = mock(Admin.class); + GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient); + + ListGroupsResult result = AdminClientTestUtils.listGroupsResult( + new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer"), + new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer"), + new GroupListing("SG", Optional.of(GroupType.SHARE), "share") + ); + when(adminClient.listGroups()).thenReturn(result); + + String capturedOutput = ToolsTestUtils.captureStandardOut(() -> { + try { + service.listGroups(new GroupsCommand.GroupsCommandOptions( + new String[]{"--bootstrap-server", bootstrapServer, "--list", "--consumer"} + )); + } catch (Throwable t) { + fail(t); + } + }); + assertCapturedListOutput(capturedOutput, + new String[]{"CGclassic", "Classic", "consumer"}, + new String[]{"CGconsumer", "Consumer", "consumer"}); + } + + @Test + public void testListGroupsShareFilter() { + Admin adminClient = mock(Admin.class); + GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient); + + ListGroupsResult result = AdminClientTestUtils.listGroupsResult( + new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer"), + new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer"), + new GroupListing("SG", Optional.of(GroupType.SHARE), "share") + ); + when(adminClient.listGroups()).thenReturn(result); + + String capturedOutput = ToolsTestUtils.captureStandardOut(() -> { + try { + service.listGroups(new GroupsCommand.GroupsCommandOptions( + new String[]{"--bootstrap-server", bootstrapServer, "--list", "--share"} + )); + } catch (Throwable t) { + fail(t); + } + }); + assertCapturedListOutput(capturedOutput, + new String[]{"SG", "Share", "share"}); + } + + @Test + public void testListGroupsProtocolFilter() { + Admin adminClient = mock(Admin.class); + GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient); + + ListGroupsResult result = AdminClientTestUtils.listGroupsResult( + new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer"), + new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer"), + new GroupListing("SG", Optional.of(GroupType.SHARE), "share") + ); + when(adminClient.listGroups()).thenReturn(result); + + String capturedOutput = ToolsTestUtils.captureStandardOut(() -> { + try { + service.listGroups(new GroupsCommand.GroupsCommandOptions( + new String[]{"--bootstrap-server", bootstrapServer, "--list", "--protocol", "consumer"} + )); + } catch (Throwable t) { + fail(t); + } + }); + assertCapturedListOutput(capturedOutput, + new String[]{"CGclassic", "Classic", "consumer"}, + new String[]{"CGconsumer", "Consumer", "consumer"}); + } + + @Test + public void testListGroupsTypeFilter() { + Admin adminClient = mock(Admin.class); + GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient); + + ListGroupsResult result = AdminClientTestUtils.listGroupsResult( + new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer"), + new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer"), + new GroupListing("SG", Optional.of(GroupType.SHARE), "share") + ); + when(adminClient.listGroups()).thenReturn(result); + + String capturedOutput = ToolsTestUtils.captureStandardOut(() -> { + try { + service.listGroups(new GroupsCommand.GroupsCommandOptions( + new String[]{"--bootstrap-server", bootstrapServer, "--list", "--group-type", "share"} + )); + } catch (Throwable t) { + fail(t); + } + }); + assertCapturedListOutput(capturedOutput, + new String[]{"SG", "Share", "share"}); + } + + @Test + public void testListGroupsProtocolAndTypeFilter() { + Admin adminClient = mock(Admin.class); + GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient); + + ListGroupsResult result = AdminClientTestUtils.listGroupsResult( + new GroupListing("CGclassic", Optional.of(GroupType.CLASSIC), "consumer"), + new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer"), + new GroupListing("SG", Optional.of(GroupType.SHARE), "share") + ); + when(adminClient.listGroups()).thenReturn(result); + + String capturedOutput = ToolsTestUtils.captureStandardOut(() -> { + try { + service.listGroups(new GroupsCommand.GroupsCommandOptions( + new String[]{"--bootstrap-server", bootstrapServer, "--list", "--protocol", "consumer", "--group-type", "classic"} + )); + } catch (Throwable t) { + fail(t); + } + }); + assertCapturedListOutput(capturedOutput, + new String[]{"CGclassic", "Classic", "consumer"}); + } + + @Test + public void testListGroupsProtocolAndTypeFilterNoMatch() { + Admin adminClient = mock(Admin.class); + GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient); + + ListGroupsResult result = AdminClientTestUtils.listGroupsResult( + new GroupListing("CGconsumer", Optional.of(GroupType.CONSUMER), "consumer"), + new GroupListing("SG", Optional.of(GroupType.SHARE), "share") + ); + when(adminClient.listGroups()).thenReturn(result); + + String capturedOutput = ToolsTestUtils.captureStandardOut(() -> { + try { + service.listGroups(new GroupsCommand.GroupsCommandOptions( + new String[]{"--bootstrap-server", bootstrapServer, "--list", "--protocol", "consumer", "--group-type", "classic"} + )); + } catch (Throwable t) { + fail(t); + } + }); + assertCapturedListOutput(capturedOutput); + } + + @Test + public void testListGroupsFailsWithException() { + Admin adminClient = mock(Admin.class); + GroupsCommand.GroupsService service = new GroupsCommand.GroupsService(adminClient); + + ListGroupsResult result = AdminClientTestUtils.listGroupsResult(Errors.COORDINATOR_NOT_AVAILABLE.exception()); + when(adminClient.listGroups()).thenReturn(result); + + assertThrows(ExecutionException.class, () -> service.listGroups(new GroupsCommand.GroupsCommandOptions( + new String[]{"--bootstrap-server", bootstrapServer, "--list"} + ))); + } + + private void assertInitializeInvalidOptionsExitCode(int expected, String[] options) { + Exit.setExitProcedure((exitCode, message) -> { + assertEquals(expected, exitCode); + throw new RuntimeException(); + }); + try { + assertThrows(RuntimeException.class, () -> new GroupsCommand.GroupsCommandOptions(options)); + } finally { + Exit.resetExitProcedure(); + } + } + + private void assertCapturedListOutput(String capturedOutput, String[]... expectedLines) { + String[] capturedLines = capturedOutput.split("\n"); + assertEquals(expectedLines.length + 1, capturedLines.length); + assertEquals("GROUP,TYPE,PROTOCOL", String.join(",", capturedLines[0].split(" +"))); + int i = 1; + for (String[] line : expectedLines) { + assertEquals(String.join(",", line), String.join(",", capturedLines[i++].split(" +"))); + } + } +}