diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index dc963497b11..87a951fe26e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -3511,27 +3511,29 @@ public class KafkaAdminClient extends AdminClient { } private void maybeAddGroup(ListGroupsResponseData.ListedGroup group) { - final String groupId = group.groupId(); - final Optional type; - if (group.groupType() == null || group.groupType().isEmpty()) { - type = Optional.empty(); - } else { - type = Optional.of(GroupType.parse(group.groupType())); + String protocolType = group.protocolType(); + if (options.protocolTypes().isEmpty() || options.protocolTypes().contains(protocolType)) { + final String groupId = group.groupId(); + final Optional type; + if (group.groupType() == null || group.groupType().isEmpty()) { + type = Optional.empty(); + } else { + type = Optional.of(GroupType.parse(group.groupType())); + } + final Optional groupState; + if (group.groupState() == null || group.groupState().isEmpty()) { + groupState = Optional.empty(); + } else { + groupState = Optional.of(GroupState.parse(group.groupState())); + } + final GroupListing groupListing = new GroupListing( + groupId, + type, + protocolType, + groupState + ); + results.addListing(groupListing); } - final String protocolType = group.protocolType(); - final Optional groupState; - if (group.groupState() == null || group.groupState().isEmpty()) { - groupState = Optional.empty(); - } else { - groupState = Optional.of(GroupState.parse(group.groupState())); - } - final GroupListing groupListing = new GroupListing( - groupId, - type, - protocolType, - groupState - ); - results.addListing(groupListing); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsOptions.java index e5d70133186..7d7083f46c5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsOptions.java @@ -46,6 +46,24 @@ public class ListGroupsOptions extends AbstractOptions { .withProtocolTypes(Set.of("", ConsumerProtocol.PROTOCOL_TYPE)); } + /** + * Only share groups will be returned by listGroups(). + * This operation sets a filter on group type which select share groups. + */ + public static ListGroupsOptions forShareGroups() { + return new ListGroupsOptions() + .withTypes(Set.of(GroupType.SHARE)); + } + + /** + * Only streams groups will be returned by listGroups(). + * This operation sets a filter on group type which select streams groups. + */ + public static ListGroupsOptions forStreamsGroups() { + return new ListGroupsOptions() + .withTypes(Set.of(GroupType.STREAMS)); + } + /** * If groupStates is set, only groups in these states will be returned by listGroups(). * Otherwise, all groups are returned. @@ -56,6 +74,10 @@ public class ListGroupsOptions extends AbstractOptions { return this; } + /** + * If protocol types is set, only groups of these protocol types will be returned by listGroups(). + * Otherwise, all groups are returned. + */ public ListGroupsOptions withProtocolTypes(Set protocolTypes) { this.protocolTypes = (protocolTypes == null || protocolTypes.isEmpty()) ? Set.of() : Set.copyOf(protocolTypes); return this; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java index 6dfbcca955a..84f7cc2a72d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.GroupType; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.ListGroupsRequestData; import org.apache.kafka.common.message.ListGroupsResponseData; @@ -24,6 +25,8 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Readable; import java.util.Collections; +import java.util.HashSet; +import java.util.List; /** * Possible error codes: @@ -50,8 +53,19 @@ public class ListGroupsRequest extends AbstractRequest { "v" + version + ", but we need v4 or newer to request groups by states."); } if (!data.typesFilter().isEmpty() && version < 5) { - throw new UnsupportedVersionException("The broker only supports ListGroups " + - "v" + version + ", but we need v5 or newer to request groups by type."); + // Types filter is supported by brokers with version 3.8.0 or later. Older brokers only support + // classic groups, so listing consumer groups on an older broker does not need to use a types filter. + // If the types filter is only for consumer and classic, or just classic groups, it can be safely omitted. + // This allows a modern admin client to list consumer groups on older brokers in a straightforward way. + HashSet typesCopy = new HashSet<>(data.typesFilter()); + boolean containedClassic = typesCopy.remove(GroupType.CLASSIC.toString()); + boolean containedConsumer = typesCopy.remove(GroupType.CONSUMER.toString()); + if (!typesCopy.isEmpty() || (!containedClassic && containedConsumer)) { + throw new UnsupportedVersionException("The broker only supports ListGroups " + + "v" + version + ", but we need v5 or newer to request groups by type. " + + "Requested group types: [" + String.join(", ", data.typesFilter()) + "]."); + } + return new ListGroupsRequest(data.duplicate().setTypesFilter(List.of()), version); } return new ListGroupsRequest(data, version); } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 80c36704243..3df76fffd36 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -3189,6 +3189,42 @@ public class KafkaAdminClientTest { } } + @Test + public void testListGroupsWithProtocolTypes() throws Exception { + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + // Test with list group options. + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + + env.kafkaClient().prepareResponseFrom( + expectListGroupsRequestWithFilters(Set.of(), Set.of()), + new ListGroupsResponse(new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(List.of( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-1") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState("Stable") + .setGroupType(GroupType.CONSUMER.toString()), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-2") + .setGroupState("Empty") + .setGroupType(GroupType.CONSUMER.toString())))), + env.cluster().nodeById(0)); + + final ListGroupsOptions options = new ListGroupsOptions().withProtocolTypes(Set.of("")); + final ListGroupsResult result = env.adminClient().listGroups(options); + Collection listing = result.valid().get(); + + assertEquals(1, listing.size()); + List expected = new ArrayList<>(); + expected.add(new GroupListing("group-2", Optional.of(GroupType.CONSUMER), "", Optional.of(GroupState.EMPTY))); + assertEquals(expected, listing); + assertEquals(0, result.errors().get().size()); + } + } + @Test public void testListGroupsWithTypes() throws Exception { try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { @@ -3227,15 +3263,13 @@ public class KafkaAdminClientTest { } @Test - public void testListGroupsWithTypesOlderBrokerVersion() { + public void testListGroupsWithTypesOlderBrokerVersion() throws Exception { ApiVersion listGroupV4 = new ApiVersion() .setApiKey(ApiKeys.LIST_GROUPS.id) .setMinVersion((short) 0) .setMaxVersion((short) 4); try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { - env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(listGroupV4))); - - env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(List.of(listGroupV4))); // Check that we cannot set a type filter with an older broker. env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); @@ -3243,9 +3277,44 @@ public class KafkaAdminClientTest { request instanceof ListGroupsRequest && !((ListGroupsRequest) request).data().typesFilter().isEmpty() ); - ListGroupsOptions options = new ListGroupsOptions().withTypes(singleton(GroupType.CLASSIC)); + ListGroupsOptions options = new ListGroupsOptions().withTypes(Set.of(GroupType.SHARE)); ListGroupsResult result = env.adminClient().listGroups(options); TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); + + // But a type filter which is just classic groups is permitted with an older broker, because they + // only know about classic groups so the types filter can be omitted. + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + + env.kafkaClient().prepareResponseFrom( + expectListGroupsRequestWithFilters(Set.of(), Set.of()), + new ListGroupsResponse(new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(List.of( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-1") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState(GroupState.STABLE.toString())))), + env.cluster().nodeById(0)); + + options = new ListGroupsOptions().withTypes(Set.of(GroupType.CLASSIC)); + result = env.adminClient().listGroups(options); + + Collection listing = result.all().get(); + assertEquals(1, listing.size()); + List expected = List.of( + new GroupListing("group-1", Optional.empty(), ConsumerProtocol.PROTOCOL_TYPE, Optional.of(GroupState.STABLE)) + ); + assertEquals(expected, listing); + + // But a type filter which is just consumer groups without classic groups is not permitted with an older broker. + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + env.kafkaClient().prepareUnsupportedVersionResponse(request -> + request instanceof ListGroupsRequest && !((ListGroupsRequest) request).data().typesFilter().isEmpty() + ); + + options = new ListGroupsOptions().withTypes(Set.of(GroupType.CONSUMER)); + result = env.adminClient().listGroups(options); + TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); } } @@ -3267,7 +3336,6 @@ public class KafkaAdminClientTest { } @Test - @SuppressWarnings("removal") public void testListConsumerGroups() throws Exception { try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0), AdminClientConfig.RETRIES_CONFIG, "2")) { @@ -3275,89 +3343,441 @@ public class KafkaAdminClientTest { // Empty metadata response should be retried env.kafkaClient().prepareResponse( - RequestTestUtils.metadataResponse( - Collections.emptyList(), - env.cluster().clusterResource().clusterId(), - -1, - Collections.emptyList())); + RequestTestUtils.metadataResponse( + List.of(), + env.cluster().clusterResource().clusterId(), + -1, + List.of())); env.kafkaClient().prepareResponse( - RequestTestUtils.metadataResponse( - env.cluster().nodes(), - env.cluster().clusterResource().clusterId(), - env.cluster().controller().id(), - Collections.emptyList())); + RequestTestUtils.metadataResponse( + env.cluster().nodes(), + env.cluster().clusterResource().clusterId(), + env.cluster().controller().id(), + List.of())); env.kafkaClient().prepareResponseFrom( - new ListGroupsResponse( - new ListGroupsResponseData() - .setErrorCode(Errors.NONE.code()) - .setGroups(asList( - new ListGroupsResponseData.ListedGroup() - .setGroupId("group-1") - .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) - .setGroupState("Stable"), - new ListGroupsResponseData.ListedGroup() - .setGroupId("group-connect-1") - .setProtocolType("connector") - .setGroupState("Stable") - ))), - env.cluster().nodeById(0)); + new ListGroupsResponse( + new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(List.of( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-1") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState("Stable"), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-connect-1") + .setProtocolType("connector") + .setGroupState("Stable") + ))), + env.cluster().nodeById(0)); // handle retriable errors env.kafkaClient().prepareResponseFrom( - new ListGroupsResponse( - new ListGroupsResponseData() - .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) - .setGroups(Collections.emptyList()) - ), - env.cluster().nodeById(1)); + new ListGroupsResponse( + new ListGroupsResponseData() + .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) + .setGroups(Collections.emptyList()) + ), + env.cluster().nodeById(1)); env.kafkaClient().prepareResponseFrom( - new ListGroupsResponse( - new ListGroupsResponseData() - .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()) - .setGroups(Collections.emptyList()) - ), - env.cluster().nodeById(1)); + new ListGroupsResponse( + new ListGroupsResponseData() + .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()) + .setGroups(Collections.emptyList()) + ), + env.cluster().nodeById(1)); env.kafkaClient().prepareResponseFrom( - new ListGroupsResponse( - new ListGroupsResponseData() - .setErrorCode(Errors.NONE.code()) - .setGroups(asList( - new ListGroupsResponseData.ListedGroup() - .setGroupId("group-2") - .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) - .setGroupState("Stable"), - new ListGroupsResponseData.ListedGroup() - .setGroupId("group-connect-2") - .setProtocolType("connector") - .setGroupState("Stable") - ))), - env.cluster().nodeById(1)); + new ListGroupsResponse( + new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(asList( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-2") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState("Stable"), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-connect-2") + .setProtocolType("connector") + .setGroupState("Stable") + ))), + env.cluster().nodeById(1)); env.kafkaClient().prepareResponseFrom( - new ListGroupsResponse( - new ListGroupsResponseData() - .setErrorCode(Errors.NONE.code()) - .setGroups(asList( - new ListGroupsResponseData.ListedGroup() - .setGroupId("group-3") - .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) - .setGroupState("Stable"), - new ListGroupsResponseData.ListedGroup() - .setGroupId("group-connect-3") - .setProtocolType("connector") - .setGroupState("Stable") - ))), - env.cluster().nodeById(2)); + new ListGroupsResponse( + new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(List.of( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-3") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState("Stable"), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-connect-3") + .setProtocolType("connector") + .setGroupState("Stable") + ))), + env.cluster().nodeById(2)); // fatal error env.kafkaClient().prepareResponseFrom( - new ListGroupsResponse( - new ListGroupsResponseData() - .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()) - .setGroups(Collections.emptyList())), - env.cluster().nodeById(3)); + new ListGroupsResponse( + new ListGroupsResponseData() + .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()) + .setGroups(Collections.emptyList())), + env.cluster().nodeById(3)); + + final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forConsumerGroups()); + TestUtils.assertFutureThrows(UnknownServerException.class, result.all()); + + Collection listings = result.valid().get(); + assertEquals(3, listings.size()); + + Set groupIds = new HashSet<>(); + for (GroupListing listing : listings) { + groupIds.add(listing.groupId()); + assertTrue(listing.groupState().isPresent()); + } + + assertEquals(Set.of("group-1", "group-2", "group-3"), groupIds); + assertEquals(1, result.errors().get().size()); + } + } + + @Test + public void testListConsumerGroupsMetadataFailure() throws Exception { + final Cluster cluster = mockCluster(3, 0); + final Time time = new MockTime(); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster, + AdminClientConfig.RETRIES_CONFIG, "0")) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + // Empty metadata causes the request to fail since we have no list of brokers + // to send the ListGroups requests to + env.kafkaClient().prepareResponse( + RequestTestUtils.metadataResponse( + List.of(), + env.cluster().clusterResource().clusterId(), + -1, + List.of())); + + final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forConsumerGroups()); + TestUtils.assertFutureThrows(KafkaException.class, result.all()); + } + } + + @Test + public void testListConsumerGroupsWithStates() throws Exception { + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + + env.kafkaClient().prepareResponseFrom( + new ListGroupsResponse(new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(List.of( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-1") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState("Stable"), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-2") + .setGroupState("Empty")))), + env.cluster().nodeById(0)); + + final ListGroupsOptions options = ListGroupsOptions.forConsumerGroups(); + final ListGroupsResult result = env.adminClient().listGroups(options); + Collection listings = result.valid().get(); + + assertEquals(2, listings.size()); + List expected = new ArrayList<>(); + expected.add(new GroupListing("group-2", Optional.empty(), "", Optional.of(GroupState.EMPTY))); + expected.add(new GroupListing("group-1", Optional.empty(), ConsumerProtocol.PROTOCOL_TYPE, Optional.of(GroupState.STABLE))); + assertEquals(expected, listings); + assertEquals(0, result.errors().get().size()); + } + } + + @Test + public void testListConsumerGroupsWithProtocolTypes() throws Exception { + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + // Test with a specific protocol type filter in list consumer group options. + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + + env.kafkaClient().prepareResponseFrom( + expectListGroupsRequestWithFilters(Set.of(), Set.of(GroupType.CONSUMER.toString(), GroupType.CLASSIC.toString())), + new ListGroupsResponse(new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(List.of( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-1") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState("Stable") + .setGroupType(GroupType.CONSUMER.toString()), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-2") + .setGroupState("Empty") + .setGroupType(GroupType.CONSUMER.toString())))), + env.cluster().nodeById(0)); + + final ListGroupsOptions options = ListGroupsOptions.forConsumerGroups().withProtocolTypes(Set.of(ConsumerProtocol.PROTOCOL_TYPE)); + final ListGroupsResult result = env.adminClient().listGroups(options); + Collection listings = result.valid().get(); + + assertEquals(1, listings.size()); + List expected = new ArrayList<>(); + expected.add(new GroupListing("group-1", Optional.of(GroupType.CONSUMER), ConsumerProtocol.PROTOCOL_TYPE, Optional.of(GroupState.STABLE))); + assertEquals(expected, listings); + assertEquals(0, result.errors().get().size()); + } + } + + @Test + public void testListConsumerGroupsWithTypes() throws Exception { + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + // Test with a specific state filter but no type filter in list consumer group options. + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + + env.kafkaClient().prepareResponseFrom( + expectListGroupsRequestWithFilters(Set.of(GroupState.STABLE.toString()), Set.of(GroupType.CONSUMER.toString(), GroupType.CLASSIC.toString())), + new ListGroupsResponse(new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(List.of( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-1") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState("Stable") + .setGroupType(GroupType.CLASSIC.toString())))), + env.cluster().nodeById(0)); + + final ListGroupsOptions options = ListGroupsOptions.forConsumerGroups().inGroupStates(Set.of(GroupState.STABLE)); + final ListGroupsResult result = env.adminClient().listGroups(options); + Collection listings = result.valid().get(); + + assertEquals(1, listings.size()); + List expected = new ArrayList<>(); + expected.add(new GroupListing("group-1", Optional.of(GroupType.CLASSIC), ConsumerProtocol.PROTOCOL_TYPE, Optional.of(GroupState.STABLE))); + assertEquals(expected, listings); + assertEquals(0, result.errors().get().size()); + + // Test with list consumer group options. + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + + env.kafkaClient().prepareResponseFrom( + expectListGroupsRequestWithFilters(Set.of(), Set.of(GroupType.CONSUMER.toString())), + new ListGroupsResponse(new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(List.of( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-1") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState("Stable") + .setGroupType(GroupType.CONSUMER.toString()), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-2") + .setGroupState("Empty") + .setGroupType(GroupType.CONSUMER.toString())))), + env.cluster().nodeById(0)); + + final ListGroupsOptions options2 = ListGroupsOptions.forConsumerGroups().withTypes(Set.of(GroupType.CONSUMER)); + final ListGroupsResult result2 = env.adminClient().listGroups(options2); + Collection listings2 = result2.valid().get(); + + assertEquals(2, listings2.size()); + List expected2 = new ArrayList<>(); + expected2.add(new GroupListing("group-2", Optional.of(GroupType.CONSUMER), "", Optional.of(GroupState.EMPTY))); + expected2.add(new GroupListing("group-1", Optional.of(GroupType.CONSUMER), ConsumerProtocol.PROTOCOL_TYPE, Optional.of(GroupState.STABLE))); + assertEquals(expected2, listings2); + assertEquals(0, result.errors().get().size()); + } + } + + @Test + public void testListConsumerGroupsWithStatesOlderBrokerVersion() throws Exception { + ApiVersion listGroupV3 = new ApiVersion() + .setApiKey(ApiKeys.LIST_GROUPS.id) + .setMinVersion((short) 0) + .setMaxVersion((short) 3); + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(List.of(listGroupV3))); + + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + + // Check we can list groups v3 with older broker if we don't specify states, and use just consumer group types which can be omitted. + env.kafkaClient().prepareResponseFrom( + new ListGroupsResponse(new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(List.of( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-1") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)))), + env.cluster().nodeById(0)); + + ListGroupsOptions options = ListGroupsOptions.forConsumerGroups(); + ListGroupsResult result = env.adminClient().listGroups(options); + Collection listing = result.all().get(); + assertEquals(1, listing.size()); + List expected = List.of(new GroupListing("group-1", Optional.empty(), ConsumerProtocol.PROTOCOL_TYPE, Optional.empty())); + assertEquals(expected, listing); + + // But we cannot set a state filter with older broker + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + + env.kafkaClient().prepareUnsupportedVersionResponse(request -> + request instanceof ListGroupsRequest && + !((ListGroupsRequest) request).data().statesFilter().isEmpty() + ); + + options = ListGroupsOptions.forConsumerGroups().inGroupStates(Set.of(GroupState.STABLE)); + result = env.adminClient().listGroups(options); + TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); + } + } + + @Test + public void testListConsumerGroupsWithTypesOlderBrokerVersion() throws Exception { + ApiVersion listGroupV4 = new ApiVersion() + .setApiKey(ApiKeys.LIST_GROUPS.id) + .setMinVersion((short) 0) + .setMaxVersion((short) 4); + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(List.of(listGroupV4))); + + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + + // Check if we can list groups v4 with older broker if we specify states and don't specify types. + env.kafkaClient().prepareResponseFrom( + expectListGroupsRequestWithFilters(Set.of(GroupState.STABLE.toString()), Set.of()), + new ListGroupsResponse(new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(List.of( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-1") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState(GroupState.STABLE.toString())))), + env.cluster().nodeById(0)); + + ListGroupsOptions options = ListGroupsOptions.forConsumerGroups().inGroupStates(Set.of(GroupState.STABLE)); + ListGroupsResult result = env.adminClient().listGroups(options); + + Collection listing = result.all().get(); + assertEquals(1, listing.size()); + List expected = List.of( + new GroupListing("group-1", Optional.empty(), ConsumerProtocol.PROTOCOL_TYPE, Optional.of(GroupState.STABLE)) + ); + assertEquals(expected, listing); + + // Check that we cannot set a type filter with an older broker. + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + // First attempt to build request will require v5 (type filter), but the broker only supports v4 + env.kafkaClient().prepareUnsupportedVersionResponse(request -> + request instanceof ListGroupsRequest && !((ListGroupsRequest) request).data().typesFilter().isEmpty() + ); + + options = ListGroupsOptions.forConsumerGroups().withTypes(Set.of(GroupType.SHARE)); + result = env.adminClient().listGroups(options); + TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); + } + } + + @Test + @SuppressWarnings("removal") + public void testListConsumerGroupsDeprecated() throws Exception { + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0), + AdminClientConfig.RETRIES_CONFIG, "2")) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + // Empty metadata response should be retried + env.kafkaClient().prepareResponse( + RequestTestUtils.metadataResponse( + List.of(), + env.cluster().clusterResource().clusterId(), + -1, + List.of())); + + env.kafkaClient().prepareResponse( + RequestTestUtils.metadataResponse( + env.cluster().nodes(), + env.cluster().clusterResource().clusterId(), + env.cluster().controller().id(), + List.of())); + + env.kafkaClient().prepareResponseFrom( + new ListGroupsResponse( + new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(List.of( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-1") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState("Stable"), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-connect-1") + .setProtocolType("connector") + .setGroupState("Stable") + ))), + env.cluster().nodeById(0)); + + // handle retriable errors + env.kafkaClient().prepareResponseFrom( + new ListGroupsResponse( + new ListGroupsResponseData() + .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()) + .setGroups(List.of()) + ), + env.cluster().nodeById(1)); + env.kafkaClient().prepareResponseFrom( + new ListGroupsResponse( + new ListGroupsResponseData() + .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()) + .setGroups(List.of()) + ), + env.cluster().nodeById(1)); + env.kafkaClient().prepareResponseFrom( + new ListGroupsResponse( + new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(List.of( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-2") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState("Stable"), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-connect-2") + .setProtocolType("connector") + .setGroupState("Stable") + ))), + env.cluster().nodeById(1)); + + env.kafkaClient().prepareResponseFrom( + new ListGroupsResponse( + new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(List.of( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-3") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState("Stable"), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-connect-3") + .setProtocolType("connector") + .setGroupState("Stable") + ))), + env.cluster().nodeById(2)); + + // fatal error + env.kafkaClient().prepareResponseFrom( + new ListGroupsResponse( + new ListGroupsResponseData() + .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()) + .setGroups(List.of())), + env.cluster().nodeById(3)); final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(); TestUtils.assertFutureThrows(UnknownServerException.class, result.all()); @@ -3378,22 +3798,22 @@ public class KafkaAdminClientTest { @Test @SuppressWarnings("removal") - public void testListConsumerGroupsMetadataFailure() throws Exception { + public void testListConsumerGroupsDeprecatedMetadataFailure() throws Exception { final Cluster cluster = mockCluster(3, 0); final Time time = new MockTime(); try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster, - AdminClientConfig.RETRIES_CONFIG, "0")) { + AdminClientConfig.RETRIES_CONFIG, "0")) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); // Empty metadata causes the request to fail since we have no list of brokers // to send the ListGroups requests to env.kafkaClient().prepareResponse( - RequestTestUtils.metadataResponse( - Collections.emptyList(), - env.cluster().clusterResource().clusterId(), - -1, - Collections.emptyList())); + RequestTestUtils.metadataResponse( + List.of(), + env.cluster().clusterResource().clusterId(), + -1, + List.of())); final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(); TestUtils.assertFutureThrows(KafkaException.class, result.all()); @@ -3402,7 +3822,7 @@ public class KafkaAdminClientTest { @Test @SuppressWarnings("removal") - public void testListConsumerGroupsWithStates() throws Exception { + public void testListConsumerGroupsDeprecatedWithStates() throws Exception { try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); @@ -3411,14 +3831,14 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareResponseFrom( new ListGroupsResponse(new ListGroupsResponseData() .setErrorCode(Errors.NONE.code()) - .setGroups(asList( - new ListGroupsResponseData.ListedGroup() - .setGroupId("group-1") - .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) - .setGroupState("Stable"), - new ListGroupsResponseData.ListedGroup() - .setGroupId("group-2") - .setGroupState("Empty")))), + .setGroups(List.of( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-1") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState("Stable"), + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-2") + .setGroupState("Empty")))), env.cluster().nodeById(0)); final ListConsumerGroupsOptions options = new ListConsumerGroupsOptions(); @@ -3436,7 +3856,7 @@ public class KafkaAdminClientTest { @Test @SuppressWarnings("removal") - public void testListConsumerGroupsWithTypes() throws Exception { + public void testListConsumerGroupsDeprecatedWithTypes() throws Exception { try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); @@ -3444,10 +3864,10 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); env.kafkaClient().prepareResponseFrom( - expectListGroupsRequestWithFilters(singleton(GroupState.STABLE.toString()), Collections.emptySet()), + expectListGroupsRequestWithFilters(Set.of(GroupState.STABLE.toString()), Set.of()), new ListGroupsResponse(new ListGroupsResponseData() .setErrorCode(Errors.NONE.code()) - .setGroups(singletonList( + .setGroups(List.of( new ListGroupsResponseData.ListedGroup() .setGroupId("group-1") .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) @@ -3455,7 +3875,7 @@ public class KafkaAdminClientTest { .setGroupType(GroupType.CLASSIC.toString())))), env.cluster().nodeById(0)); - final ListConsumerGroupsOptions options = new ListConsumerGroupsOptions().inGroupStates(singleton(GroupState.STABLE)); + final ListConsumerGroupsOptions options = new ListConsumerGroupsOptions().inGroupStates(Set.of(GroupState.STABLE)); final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(options); Collection listings = result.valid().get(); @@ -3469,10 +3889,10 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); env.kafkaClient().prepareResponseFrom( - expectListGroupsRequestWithFilters(Collections.emptySet(), singleton(GroupType.CONSUMER.toString())), + expectListGroupsRequestWithFilters(Set.of(), Set.of(GroupType.CONSUMER.toString())), new ListGroupsResponse(new ListGroupsResponseData() .setErrorCode(Errors.NONE.code()) - .setGroups(asList( + .setGroups(List.of( new ListGroupsResponseData.ListedGroup() .setGroupId("group-1") .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) @@ -3499,30 +3919,31 @@ public class KafkaAdminClientTest { @Test @SuppressWarnings("removal") - public void testListConsumerGroupsWithStatesOlderBrokerVersion() throws Exception { + public void testListConsumerGroupsDeprecatedWithStatesOlderBrokerVersion() throws Exception { ApiVersion listGroupV3 = new ApiVersion() - .setApiKey(ApiKeys.LIST_GROUPS.id) - .setMinVersion((short) 0) - .setMaxVersion((short) 3); + .setApiKey(ApiKeys.LIST_GROUPS.id) + .setMinVersion((short) 0) + .setMaxVersion((short) 3); try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { - env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(listGroupV3))); + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(List.of(listGroupV3))); env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); // Check we can list groups with older broker if we don't specify states env.kafkaClient().prepareResponseFrom( - new ListGroupsResponse(new ListGroupsResponseData() - .setErrorCode(Errors.NONE.code()) - .setGroups(Collections.singletonList( - new ListGroupsResponseData.ListedGroup() - .setGroupId("group-1") - .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)))), - env.cluster().nodeById(0)); + new ListGroupsResponse(new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(List.of( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-1") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)))), + env.cluster().nodeById(0)); + ListConsumerGroupsOptions options = new ListConsumerGroupsOptions(); ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(options); Collection listing = result.all().get(); assertEquals(1, listing.size()); - List expected = Collections.singletonList(new ConsumerGroupListing("group-1", false)); + List expected = List.of(new ConsumerGroupListing("group-1", false)); assertEquals(expected, listing); // But we cannot set a state filter with older broker @@ -3530,7 +3951,7 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareUnsupportedVersionResponse( body -> body instanceof ListGroupsRequest); - options = new ListConsumerGroupsOptions().inGroupStates(singleton(GroupState.STABLE)); + options = new ListConsumerGroupsOptions().inGroupStates(Set.of(GroupState.STABLE)); result = env.adminClient().listConsumerGroups(options); TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); } @@ -3538,34 +3959,34 @@ public class KafkaAdminClientTest { @Test @SuppressWarnings("removal") - public void testListConsumerGroupsWithTypesOlderBrokerVersion() throws Exception { + public void testListConsumerGroupsDeprecatedWithTypesOlderBrokerVersion() throws Exception { ApiVersion listGroupV4 = new ApiVersion() .setApiKey(ApiKeys.LIST_GROUPS.id) .setMinVersion((short) 0) .setMaxVersion((short) 4); try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { - env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(listGroupV4))); + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(List.of(listGroupV4))); env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); // Check if we can list groups with older broker if we specify states and don't specify types. env.kafkaClient().prepareResponseFrom( - expectListGroupsRequestWithFilters(singleton(GroupState.STABLE.toString()), Collections.emptySet()), + expectListGroupsRequestWithFilters(Set.of(GroupState.STABLE.toString()), Set.of()), new ListGroupsResponse(new ListGroupsResponseData() .setErrorCode(Errors.NONE.code()) - .setGroups(Collections.singletonList( + .setGroups(List.of( new ListGroupsResponseData.ListedGroup() .setGroupId("group-1") .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) .setGroupState(GroupState.STABLE.toString())))), env.cluster().nodeById(0)); - ListConsumerGroupsOptions options = new ListConsumerGroupsOptions().inGroupStates(singleton(GroupState.STABLE)); + ListConsumerGroupsOptions options = new ListConsumerGroupsOptions().inGroupStates(Set.of(GroupState.STABLE)); ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(options); Collection listing = result.all().get(); assertEquals(1, listing.size()); - List expected = Collections.singletonList( + List expected = List.of( new ConsumerGroupListing("group-1", Optional.of(GroupState.STABLE), false) ); assertEquals(expected, listing); @@ -3576,9 +3997,31 @@ public class KafkaAdminClientTest { request instanceof ListGroupsRequest && !((ListGroupsRequest) request).data().typesFilter().isEmpty() ); - options = new ListConsumerGroupsOptions().withTypes(singleton(GroupType.CLASSIC)); + options = new ListConsumerGroupsOptions().withTypes(Set.of(GroupType.SHARE)); result = env.adminClient().listConsumerGroups(options); TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); + + // But a type filter which is just classic groups is permitted with an older broker, because they + // only know about classic groups so the types filter can be omitted. + env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE)); + + env.kafkaClient().prepareResponseFrom( + expectListGroupsRequestWithFilters(Set.of(), Set.of()), + new ListGroupsResponse(new ListGroupsResponseData() + .setErrorCode(Errors.NONE.code()) + .setGroups(List.of( + new ListGroupsResponseData.ListedGroup() + .setGroupId("group-1") + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState(GroupState.STABLE.toString())))), + env.cluster().nodeById(0)); + + options = new ListConsumerGroupsOptions().withTypes(Set.of(GroupType.CLASSIC)); + result = env.adminClient().listConsumerGroups(options); + + listing = result.all().get(); + assertEquals(1, listing.size()); + assertEquals(expected, listing); } } @@ -6117,7 +6560,7 @@ public class KafkaAdminClientTest { .setGroups(Collections.emptyList())), env.cluster().nodeById(3)); - final ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.STREAMS))); + final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forStreamsGroups()); TestUtils.assertFutureThrows(UnknownServerException.class, result.all()); Collection listings = result.valid().get(); @@ -6152,7 +6595,7 @@ public class KafkaAdminClientTest { -1, Collections.emptyList())); - final ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.STREAMS))); + final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forStreamsGroups()); TestUtils.assertFutureThrows(KafkaException.class, result.all()); } } @@ -6180,7 +6623,7 @@ public class KafkaAdminClientTest { .setGroupState("NotReady")))), env.cluster().nodeById(0)); - final ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.STREAMS))); + final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forStreamsGroups()); Collection listings = result.valid().get(); assertEquals(2, listings.size()); @@ -6211,7 +6654,7 @@ public class KafkaAdminClientTest { new ListGroupsResponseData.ListedGroup() .setGroupId("streams-group-1")))), env.cluster().nodeById(0)); - ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.STREAMS))); + ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forStreamsGroups()); TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); } } @@ -6527,7 +6970,7 @@ public class KafkaAdminClientTest { .setGroups(Collections.emptyList())), env.cluster().nodeById(3)); - final ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.SHARE))); + final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forShareGroups()); TestUtils.assertFutureThrows(UnknownServerException.class, result.all()); Collection listings = result.valid().get(); @@ -6562,7 +7005,7 @@ public class KafkaAdminClientTest { -1, Collections.emptyList())); - final ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.SHARE))); + final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forShareGroups()); TestUtils.assertFutureThrows(KafkaException.class, result.all()); } } @@ -6590,7 +7033,7 @@ public class KafkaAdminClientTest { .setGroupState("Empty")))), env.cluster().nodeById(0)); - final ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.SHARE))); + final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forShareGroups()); Collection listings = result.valid().get(); assertEquals(2, listings.size()); @@ -6621,7 +7064,7 @@ public class KafkaAdminClientTest { new ListGroupsResponseData.ListedGroup() .setGroupId("share-group-1")))), env.cluster().nodeById(0)); - ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.SHARE))); + ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forShareGroups()); TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all()); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/ListGroupsOptionsTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/ListGroupsOptionsTest.java new file mode 100644 index 00000000000..360da83b8da --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/admin/ListGroupsOptionsTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.common.GroupState; +import org.apache.kafka.common.GroupType; + +import org.junit.jupiter.api.Test; + +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ListGroupsOptionsTest { + @Test + public void testForConsumerGroups() { + ListGroupsOptions options = ListGroupsOptions.forConsumerGroups(); + assertTrue(options.groupStates().isEmpty()); + assertEquals(Set.of(GroupType.CONSUMER, GroupType.CLASSIC), options.types()); + assertEquals(Set.of("", ConsumerProtocol.PROTOCOL_TYPE), options.protocolTypes()); + + options.inGroupStates(Set.of(GroupState.STABLE)); + options.withTypes(Set.of(GroupType.CONSUMER)); + options.withProtocolTypes(Set.of(ConsumerProtocol.PROTOCOL_TYPE)); + assertEquals(Set.of(GroupState.STABLE), options.groupStates()); + assertEquals(Set.of(GroupType.CONSUMER), options.types()); + assertEquals(Set.of(ConsumerProtocol.PROTOCOL_TYPE), options.protocolTypes()); + } + + @Test + public void testForShareGroups() { + ListGroupsOptions options = ListGroupsOptions.forShareGroups(); + assertTrue(options.groupStates().isEmpty()); + assertEquals(Set.of(GroupType.SHARE), options.types()); + assertTrue(options.protocolTypes().isEmpty()); + + options.inGroupStates(Set.of(GroupState.STABLE)); + options.withTypes(Set.of(GroupType.CONSUMER)); + options.withProtocolTypes(Set.of(ConsumerProtocol.PROTOCOL_TYPE)); + assertEquals(Set.of(GroupState.STABLE), options.groupStates()); + assertEquals(Set.of(GroupType.CONSUMER), options.types()); + assertEquals(Set.of(ConsumerProtocol.PROTOCOL_TYPE), options.protocolTypes()); + } + + @Test + public void testForStreamsGroups() { + ListGroupsOptions options = ListGroupsOptions.forStreamsGroups(); + assertTrue(options.groupStates().isEmpty()); + assertEquals(Set.of(GroupType.STREAMS), options.types()); + assertTrue(options.protocolTypes().isEmpty()); + + options.inGroupStates(Set.of(GroupState.STABLE)); + options.withTypes(Set.of(GroupType.CONSUMER)); + options.withProtocolTypes(Set.of(ConsumerProtocol.PROTOCOL_TYPE)); + assertEquals(Set.of(GroupState.STABLE), options.groupStates()); + assertEquals(Set.of(GroupType.CONSUMER), options.types()); + assertEquals(Set.of(ConsumerProtocol.PROTOCOL_TYPE), options.protocolTypes()); + } + + @Test + public void testGroupStates() { + ListGroupsOptions options = new ListGroupsOptions(); + assertTrue(options.groupStates().isEmpty()); + + options.inGroupStates(Set.of(GroupState.DEAD)); + assertEquals(Set.of(GroupState.DEAD), options.groupStates()); + + Set groupStates = Set.of(GroupState.values()); + options = new ListGroupsOptions().inGroupStates(groupStates); + assertEquals(groupStates, options.groupStates()); + } + + @Test + public void testProtocolTypes() { + ListGroupsOptions options = new ListGroupsOptions(); + assertTrue(options.protocolTypes().isEmpty()); + + options.withProtocolTypes(Set.of(ConsumerProtocol.PROTOCOL_TYPE)); + assertEquals(Set.of(ConsumerProtocol.PROTOCOL_TYPE), options.protocolTypes()); + + Set protocolTypes = Set.of("", "consumer", "share"); + options = new ListGroupsOptions().withProtocolTypes(protocolTypes); + assertEquals(protocolTypes, options.protocolTypes()); + } + + @Test + public void testTypes() { + ListGroupsOptions options = new ListGroupsOptions(); + assertTrue(options.types().isEmpty()); + + options.withTypes(Set.of(GroupType.CLASSIC)); + assertEquals(Set.of(GroupType.CLASSIC), options.types()); + + Set groupTypes = Set.of(GroupType.values()); + options = new ListGroupsOptions().withTypes(groupTypes); + assertEquals(groupTypes, options.types()); + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 0f6b71d8092..ee1fbbca2da 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -21,7 +21,7 @@ import joptsimple._ import kafka.server.DynamicConfig import kafka.utils.Implicits._ import kafka.utils.Logging -import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListGroupsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism} +import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism} import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.errors.{InvalidConfigurationException, UnsupportedVersionException} import org.apache.kafka.common.internals.Topic @@ -352,7 +352,7 @@ object ConfigCommand extends Logging { case ClientMetricsType => adminClient.listClientMetricsResources().all().get().asScala.map(_.name).toSeq case GroupType => - adminClient.listGroups(ListGroupsOptions.forConsumerGroups()).all.get.asScala.map(_.groupId).toSeq + adminClient.listGroups().all.get.asScala.map(_.groupId).toSeq case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType") }) diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java index 1b9c13a8f98..94d24fc34a9 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java @@ -177,10 +177,11 @@ public class ConsumerGroupCommand { @SuppressWarnings("Regexp") static Set consumerGroupTypesFromString(String input) { + Set validTypes = Set.of(GroupType.CLASSIC, GroupType.CONSUMER); Set parsedTypes = Stream.of(input.toLowerCase().split(",")).map(s -> GroupType.parse(s.trim())).collect(Collectors.toSet()); - if (parsedTypes.contains(GroupType.UNKNOWN)) { - List validTypes = Arrays.stream(GroupType.values()).filter(t -> t != GroupType.UNKNOWN).map(Object::toString).collect(Collectors.toList()); - throw new IllegalArgumentException("Invalid types list '" + input + "'. Valid types are: " + String.join(", ", validTypes)); + if (!validTypes.containsAll(parsedTypes)) { + throw new IllegalArgumentException("Invalid types list '" + input + "'. Valid types are: " + + String.join(", ", validTypes.stream().map(GroupType::toString).collect(Collectors.toSet()))); } return parsedTypes; } diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java index 11a9a020e5c..dcbf2510922 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java @@ -153,9 +153,8 @@ public class ShareGroupCommand { List listShareGroups() { try { - ListGroupsResult result = adminClient.listGroups(new ListGroupsOptions() - .timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue()) - .withTypes(Set.of(GroupType.SHARE))); + ListGroupsResult result = adminClient.listGroups(ListGroupsOptions.forShareGroups() + .timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())); Collection listings = result.all().get(); return listings.stream().map(GroupListing::groupId).collect(Collectors.toList()); } catch (InterruptedException | ExecutionException e) { @@ -165,9 +164,8 @@ public class ShareGroupCommand { List listDetailedShareGroups() { try { - ListGroupsResult result = adminClient.listGroups(new ListGroupsOptions() - .timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue()) - .withTypes(Set.of(GroupType.SHARE))); + ListGroupsResult result = adminClient.listGroups(ListGroupsOptions.forShareGroups() + .timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())); Collection listings = result.all().get(); return listings.stream().toList(); } catch (InterruptedException | ExecutionException e) { @@ -176,9 +174,8 @@ public class ShareGroupCommand { } List listShareGroupsInStates(Set states) throws ExecutionException, InterruptedException { - ListGroupsResult result = adminClient.listGroups(new ListGroupsOptions() + ListGroupsResult result = adminClient.listGroups(ListGroupsOptions.forShareGroups() .timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue()) - .withTypes(Set.of(GroupType.SHARE)) .inGroupStates(states)); return new ArrayList<>(result.all().get()); } diff --git a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java index 1a8be4104aa..0422786e7cb 100644 --- a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java @@ -136,9 +136,8 @@ public class StreamsGroupCommand { List listStreamsGroups() { try { - ListGroupsResult result = adminClient.listGroups(new ListGroupsOptions() - .timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue()) - .withTypes(Set.of(GroupType.STREAMS))); + ListGroupsResult result = adminClient.listGroups(ListGroupsOptions.forStreamsGroups() + .timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())); Collection listings = result.all().get(); return listings.stream().map(GroupListing::groupId).collect(Collectors.toList()); } catch (InterruptedException | ExecutionException e) { @@ -147,9 +146,8 @@ public class StreamsGroupCommand { } List listStreamsGroupsInStates(Set states) throws ExecutionException, InterruptedException { - ListGroupsResult result = adminClient.listGroups(new ListGroupsOptions() + ListGroupsResult result = adminClient.listGroups(ListGroupsOptions.forStreamsGroups() .timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue()) - .withTypes(Set.of(GroupType.STREAMS)) .inGroupStates(states)); return new ArrayList<>(result.all().get()); } diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java index dba06a3e098..fd50aa3b7d7 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java @@ -662,6 +662,10 @@ class ListConsumerGroupUnitTest { result = ConsumerGroupCommand.consumerGroupTypesFromString("Consumer, Classic"); Assertions.assertEquals(ListConsumerGroupTest.set(Arrays.asList(GroupType.CONSUMER, GroupType.CLASSIC)), result); + Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumerGroupCommand.consumerGroupTypesFromString("Share")); + + Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumerGroupCommand.consumerGroupTypesFromString("streams")); + Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumerGroupCommand.consumerGroupTypesFromString("bad, wrong")); Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumerGroupCommand.consumerGroupTypesFromString(" bad, generic"));