KAFKA-17897: Deprecate Admin.listConsumerGroups [2/N] (#19508)
CI / build (push) Waiting to run Details

Admin.listConsumerGroups() was able to use the early versions of
ListGroups RPC with the version used dependent upon the filters the user
specified. Admin.listGroups(ListGroupsOptions.forConsumerGroups())
inadvertently required ListGroups v5 because it always set a types
filter. This patch handles the UnsupportedVersionException and winds
back the complexity of the request unless the user has specified filters
which demand a higher version.

It also adds ListGroupsOptions.forShareGroups() and forStreamsGroups().
The usability of Admin.listGroups() is much improved as a result.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, PoAn Yang
 <payang@apache.org>
This commit is contained in:
Andrew Schofield 2025-05-09 08:38:16 +01:00 committed by GitHub
parent 1ccaddaa70
commit 70c0aca4b7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 761 additions and 166 deletions

View File

@ -3511,27 +3511,29 @@ public class KafkaAdminClient extends AdminClient {
}
private void maybeAddGroup(ListGroupsResponseData.ListedGroup group) {
final String groupId = group.groupId();
final Optional<GroupType> type;
if (group.groupType() == null || group.groupType().isEmpty()) {
type = Optional.empty();
} else {
type = Optional.of(GroupType.parse(group.groupType()));
String protocolType = group.protocolType();
if (options.protocolTypes().isEmpty() || options.protocolTypes().contains(protocolType)) {
final String groupId = group.groupId();
final Optional<GroupType> type;
if (group.groupType() == null || group.groupType().isEmpty()) {
type = Optional.empty();
} else {
type = Optional.of(GroupType.parse(group.groupType()));
}
final Optional<GroupState> groupState;
if (group.groupState() == null || group.groupState().isEmpty()) {
groupState = Optional.empty();
} else {
groupState = Optional.of(GroupState.parse(group.groupState()));
}
final GroupListing groupListing = new GroupListing(
groupId,
type,
protocolType,
groupState
);
results.addListing(groupListing);
}
final String protocolType = group.protocolType();
final Optional<GroupState> groupState;
if (group.groupState() == null || group.groupState().isEmpty()) {
groupState = Optional.empty();
} else {
groupState = Optional.of(GroupState.parse(group.groupState()));
}
final GroupListing groupListing = new GroupListing(
groupId,
type,
protocolType,
groupState
);
results.addListing(groupListing);
}
@Override

View File

@ -46,6 +46,24 @@ public class ListGroupsOptions extends AbstractOptions<ListGroupsOptions> {
.withProtocolTypes(Set.of("", ConsumerProtocol.PROTOCOL_TYPE));
}
/**
* Only share groups will be returned by listGroups().
* This operation sets a filter on group type which select share groups.
*/
public static ListGroupsOptions forShareGroups() {
return new ListGroupsOptions()
.withTypes(Set.of(GroupType.SHARE));
}
/**
* Only streams groups will be returned by listGroups().
* This operation sets a filter on group type which select streams groups.
*/
public static ListGroupsOptions forStreamsGroups() {
return new ListGroupsOptions()
.withTypes(Set.of(GroupType.STREAMS));
}
/**
* If groupStates is set, only groups in these states will be returned by listGroups().
* Otherwise, all groups are returned.
@ -56,6 +74,10 @@ public class ListGroupsOptions extends AbstractOptions<ListGroupsOptions> {
return this;
}
/**
* If protocol types is set, only groups of these protocol types will be returned by listGroups().
* Otherwise, all groups are returned.
*/
public ListGroupsOptions withProtocolTypes(Set<String> protocolTypes) {
this.protocolTypes = (protocolTypes == null || protocolTypes.isEmpty()) ? Set.of() : Set.copyOf(protocolTypes);
return this;

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
@ -24,6 +25,8 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
/**
* Possible error codes:
@ -50,8 +53,19 @@ public class ListGroupsRequest extends AbstractRequest {
"v" + version + ", but we need v4 or newer to request groups by states.");
}
if (!data.typesFilter().isEmpty() && version < 5) {
throw new UnsupportedVersionException("The broker only supports ListGroups " +
"v" + version + ", but we need v5 or newer to request groups by type.");
// Types filter is supported by brokers with version 3.8.0 or later. Older brokers only support
// classic groups, so listing consumer groups on an older broker does not need to use a types filter.
// If the types filter is only for consumer and classic, or just classic groups, it can be safely omitted.
// This allows a modern admin client to list consumer groups on older brokers in a straightforward way.
HashSet<String> typesCopy = new HashSet<>(data.typesFilter());
boolean containedClassic = typesCopy.remove(GroupType.CLASSIC.toString());
boolean containedConsumer = typesCopy.remove(GroupType.CONSUMER.toString());
if (!typesCopy.isEmpty() || (!containedClassic && containedConsumer)) {
throw new UnsupportedVersionException("The broker only supports ListGroups " +
"v" + version + ", but we need v5 or newer to request groups by type. " +
"Requested group types: [" + String.join(", ", data.typesFilter()) + "].");
}
return new ListGroupsRequest(data.duplicate().setTypesFilter(List.of()), version);
}
return new ListGroupsRequest(data, version);
}

View File

@ -3189,6 +3189,42 @@ public class KafkaAdminClientTest {
}
}
@Test
public void testListGroupsWithProtocolTypes() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
// Test with list group options.
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
env.kafkaClient().prepareResponseFrom(
expectListGroupsRequestWithFilters(Set.of(), Set.of()),
new ListGroupsResponse(new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(List.of(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-1")
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setGroupState("Stable")
.setGroupType(GroupType.CONSUMER.toString()),
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-2")
.setGroupState("Empty")
.setGroupType(GroupType.CONSUMER.toString())))),
env.cluster().nodeById(0));
final ListGroupsOptions options = new ListGroupsOptions().withProtocolTypes(Set.of(""));
final ListGroupsResult result = env.adminClient().listGroups(options);
Collection<GroupListing> listing = result.valid().get();
assertEquals(1, listing.size());
List<GroupListing> expected = new ArrayList<>();
expected.add(new GroupListing("group-2", Optional.of(GroupType.CONSUMER), "", Optional.of(GroupState.EMPTY)));
assertEquals(expected, listing);
assertEquals(0, result.errors().get().size());
}
}
@Test
public void testListGroupsWithTypes() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
@ -3227,15 +3263,13 @@ public class KafkaAdminClientTest {
}
@Test
public void testListGroupsWithTypesOlderBrokerVersion() {
public void testListGroupsWithTypesOlderBrokerVersion() throws Exception {
ApiVersion listGroupV4 = new ApiVersion()
.setApiKey(ApiKeys.LIST_GROUPS.id)
.setMinVersion((short) 0)
.setMaxVersion((short) 4);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(listGroupV4)));
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(List.of(listGroupV4)));
// Check that we cannot set a type filter with an older broker.
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
@ -3243,9 +3277,44 @@ public class KafkaAdminClientTest {
request instanceof ListGroupsRequest && !((ListGroupsRequest) request).data().typesFilter().isEmpty()
);
ListGroupsOptions options = new ListGroupsOptions().withTypes(singleton(GroupType.CLASSIC));
ListGroupsOptions options = new ListGroupsOptions().withTypes(Set.of(GroupType.SHARE));
ListGroupsResult result = env.adminClient().listGroups(options);
TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all());
// But a type filter which is just classic groups is permitted with an older broker, because they
// only know about classic groups so the types filter can be omitted.
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
env.kafkaClient().prepareResponseFrom(
expectListGroupsRequestWithFilters(Set.of(), Set.of()),
new ListGroupsResponse(new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(List.of(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-1")
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setGroupState(GroupState.STABLE.toString())))),
env.cluster().nodeById(0));
options = new ListGroupsOptions().withTypes(Set.of(GroupType.CLASSIC));
result = env.adminClient().listGroups(options);
Collection<GroupListing> listing = result.all().get();
assertEquals(1, listing.size());
List<GroupListing> expected = List.of(
new GroupListing("group-1", Optional.empty(), ConsumerProtocol.PROTOCOL_TYPE, Optional.of(GroupState.STABLE))
);
assertEquals(expected, listing);
// But a type filter which is just consumer groups without classic groups is not permitted with an older broker.
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
env.kafkaClient().prepareUnsupportedVersionResponse(request ->
request instanceof ListGroupsRequest && !((ListGroupsRequest) request).data().typesFilter().isEmpty()
);
options = new ListGroupsOptions().withTypes(Set.of(GroupType.CONSUMER));
result = env.adminClient().listGroups(options);
TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all());
}
}
@ -3267,7 +3336,6 @@ public class KafkaAdminClientTest {
}
@Test
@SuppressWarnings("removal")
public void testListConsumerGroups() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0),
AdminClientConfig.RETRIES_CONFIG, "2")) {
@ -3275,89 +3343,441 @@ public class KafkaAdminClientTest {
// Empty metadata response should be retried
env.kafkaClient().prepareResponse(
RequestTestUtils.metadataResponse(
Collections.emptyList(),
env.cluster().clusterResource().clusterId(),
-1,
Collections.emptyList()));
RequestTestUtils.metadataResponse(
List.of(),
env.cluster().clusterResource().clusterId(),
-1,
List.of()));
env.kafkaClient().prepareResponse(
RequestTestUtils.metadataResponse(
env.cluster().nodes(),
env.cluster().clusterResource().clusterId(),
env.cluster().controller().id(),
Collections.emptyList()));
RequestTestUtils.metadataResponse(
env.cluster().nodes(),
env.cluster().clusterResource().clusterId(),
env.cluster().controller().id(),
List.of()));
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(asList(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-1")
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setGroupState("Stable"),
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-connect-1")
.setProtocolType("connector")
.setGroupState("Stable")
))),
env.cluster().nodeById(0));
new ListGroupsResponse(
new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(List.of(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-1")
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setGroupState("Stable"),
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-connect-1")
.setProtocolType("connector")
.setGroupState("Stable")
))),
env.cluster().nodeById(0));
// handle retriable errors
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
new ListGroupsResponseData()
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
.setGroups(Collections.emptyList())
),
env.cluster().nodeById(1));
new ListGroupsResponse(
new ListGroupsResponseData()
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
.setGroups(Collections.emptyList())
),
env.cluster().nodeById(1));
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
new ListGroupsResponseData()
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
.setGroups(Collections.emptyList())
),
env.cluster().nodeById(1));
new ListGroupsResponse(
new ListGroupsResponseData()
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
.setGroups(Collections.emptyList())
),
env.cluster().nodeById(1));
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(asList(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-2")
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setGroupState("Stable"),
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-connect-2")
.setProtocolType("connector")
.setGroupState("Stable")
))),
env.cluster().nodeById(1));
new ListGroupsResponse(
new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(asList(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-2")
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setGroupState("Stable"),
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-connect-2")
.setProtocolType("connector")
.setGroupState("Stable")
))),
env.cluster().nodeById(1));
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(asList(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-3")
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setGroupState("Stable"),
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-connect-3")
.setProtocolType("connector")
.setGroupState("Stable")
))),
env.cluster().nodeById(2));
new ListGroupsResponse(
new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(List.of(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-3")
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setGroupState("Stable"),
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-connect-3")
.setProtocolType("connector")
.setGroupState("Stable")
))),
env.cluster().nodeById(2));
// fatal error
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
new ListGroupsResponseData()
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
.setGroups(Collections.emptyList())),
env.cluster().nodeById(3));
new ListGroupsResponse(
new ListGroupsResponseData()
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
.setGroups(Collections.emptyList())),
env.cluster().nodeById(3));
final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forConsumerGroups());
TestUtils.assertFutureThrows(UnknownServerException.class, result.all());
Collection<GroupListing> listings = result.valid().get();
assertEquals(3, listings.size());
Set<String> groupIds = new HashSet<>();
for (GroupListing listing : listings) {
groupIds.add(listing.groupId());
assertTrue(listing.groupState().isPresent());
}
assertEquals(Set.of("group-1", "group-2", "group-3"), groupIds);
assertEquals(1, result.errors().get().size());
}
}
@Test
public void testListConsumerGroupsMetadataFailure() throws Exception {
final Cluster cluster = mockCluster(3, 0);
final Time time = new MockTime();
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
AdminClientConfig.RETRIES_CONFIG, "0")) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
// Empty metadata causes the request to fail since we have no list of brokers
// to send the ListGroups requests to
env.kafkaClient().prepareResponse(
RequestTestUtils.metadataResponse(
List.of(),
env.cluster().clusterResource().clusterId(),
-1,
List.of()));
final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forConsumerGroups());
TestUtils.assertFutureThrows(KafkaException.class, result.all());
}
}
@Test
public void testListConsumerGroupsWithStates() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(List.of(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-1")
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setGroupState("Stable"),
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-2")
.setGroupState("Empty")))),
env.cluster().nodeById(0));
final ListGroupsOptions options = ListGroupsOptions.forConsumerGroups();
final ListGroupsResult result = env.adminClient().listGroups(options);
Collection<GroupListing> listings = result.valid().get();
assertEquals(2, listings.size());
List<GroupListing> expected = new ArrayList<>();
expected.add(new GroupListing("group-2", Optional.empty(), "", Optional.of(GroupState.EMPTY)));
expected.add(new GroupListing("group-1", Optional.empty(), ConsumerProtocol.PROTOCOL_TYPE, Optional.of(GroupState.STABLE)));
assertEquals(expected, listings);
assertEquals(0, result.errors().get().size());
}
}
@Test
public void testListConsumerGroupsWithProtocolTypes() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
// Test with a specific protocol type filter in list consumer group options.
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
env.kafkaClient().prepareResponseFrom(
expectListGroupsRequestWithFilters(Set.of(), Set.of(GroupType.CONSUMER.toString(), GroupType.CLASSIC.toString())),
new ListGroupsResponse(new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(List.of(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-1")
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setGroupState("Stable")
.setGroupType(GroupType.CONSUMER.toString()),
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-2")
.setGroupState("Empty")
.setGroupType(GroupType.CONSUMER.toString())))),
env.cluster().nodeById(0));
final ListGroupsOptions options = ListGroupsOptions.forConsumerGroups().withProtocolTypes(Set.of(ConsumerProtocol.PROTOCOL_TYPE));
final ListGroupsResult result = env.adminClient().listGroups(options);
Collection<GroupListing> listings = result.valid().get();
assertEquals(1, listings.size());
List<GroupListing> expected = new ArrayList<>();
expected.add(new GroupListing("group-1", Optional.of(GroupType.CONSUMER), ConsumerProtocol.PROTOCOL_TYPE, Optional.of(GroupState.STABLE)));
assertEquals(expected, listings);
assertEquals(0, result.errors().get().size());
}
}
@Test
public void testListConsumerGroupsWithTypes() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
// Test with a specific state filter but no type filter in list consumer group options.
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
env.kafkaClient().prepareResponseFrom(
expectListGroupsRequestWithFilters(Set.of(GroupState.STABLE.toString()), Set.of(GroupType.CONSUMER.toString(), GroupType.CLASSIC.toString())),
new ListGroupsResponse(new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(List.of(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-1")
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setGroupState("Stable")
.setGroupType(GroupType.CLASSIC.toString())))),
env.cluster().nodeById(0));
final ListGroupsOptions options = ListGroupsOptions.forConsumerGroups().inGroupStates(Set.of(GroupState.STABLE));
final ListGroupsResult result = env.adminClient().listGroups(options);
Collection<GroupListing> listings = result.valid().get();
assertEquals(1, listings.size());
List<GroupListing> expected = new ArrayList<>();
expected.add(new GroupListing("group-1", Optional.of(GroupType.CLASSIC), ConsumerProtocol.PROTOCOL_TYPE, Optional.of(GroupState.STABLE)));
assertEquals(expected, listings);
assertEquals(0, result.errors().get().size());
// Test with list consumer group options.
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
env.kafkaClient().prepareResponseFrom(
expectListGroupsRequestWithFilters(Set.of(), Set.of(GroupType.CONSUMER.toString())),
new ListGroupsResponse(new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(List.of(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-1")
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setGroupState("Stable")
.setGroupType(GroupType.CONSUMER.toString()),
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-2")
.setGroupState("Empty")
.setGroupType(GroupType.CONSUMER.toString())))),
env.cluster().nodeById(0));
final ListGroupsOptions options2 = ListGroupsOptions.forConsumerGroups().withTypes(Set.of(GroupType.CONSUMER));
final ListGroupsResult result2 = env.adminClient().listGroups(options2);
Collection<GroupListing> listings2 = result2.valid().get();
assertEquals(2, listings2.size());
List<GroupListing> expected2 = new ArrayList<>();
expected2.add(new GroupListing("group-2", Optional.of(GroupType.CONSUMER), "", Optional.of(GroupState.EMPTY)));
expected2.add(new GroupListing("group-1", Optional.of(GroupType.CONSUMER), ConsumerProtocol.PROTOCOL_TYPE, Optional.of(GroupState.STABLE)));
assertEquals(expected2, listings2);
assertEquals(0, result.errors().get().size());
}
}
@Test
public void testListConsumerGroupsWithStatesOlderBrokerVersion() throws Exception {
ApiVersion listGroupV3 = new ApiVersion()
.setApiKey(ApiKeys.LIST_GROUPS.id)
.setMinVersion((short) 0)
.setMaxVersion((short) 3);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(List.of(listGroupV3)));
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
// Check we can list groups v3 with older broker if we don't specify states, and use just consumer group types which can be omitted.
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(List.of(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-1")
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)))),
env.cluster().nodeById(0));
ListGroupsOptions options = ListGroupsOptions.forConsumerGroups();
ListGroupsResult result = env.adminClient().listGroups(options);
Collection<GroupListing> listing = result.all().get();
assertEquals(1, listing.size());
List<GroupListing> expected = List.of(new GroupListing("group-1", Optional.empty(), ConsumerProtocol.PROTOCOL_TYPE, Optional.empty()));
assertEquals(expected, listing);
// But we cannot set a state filter with older broker
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
env.kafkaClient().prepareUnsupportedVersionResponse(request ->
request instanceof ListGroupsRequest &&
!((ListGroupsRequest) request).data().statesFilter().isEmpty()
);
options = ListGroupsOptions.forConsumerGroups().inGroupStates(Set.of(GroupState.STABLE));
result = env.adminClient().listGroups(options);
TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all());
}
}
@Test
public void testListConsumerGroupsWithTypesOlderBrokerVersion() throws Exception {
ApiVersion listGroupV4 = new ApiVersion()
.setApiKey(ApiKeys.LIST_GROUPS.id)
.setMinVersion((short) 0)
.setMaxVersion((short) 4);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(List.of(listGroupV4)));
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
// Check if we can list groups v4 with older broker if we specify states and don't specify types.
env.kafkaClient().prepareResponseFrom(
expectListGroupsRequestWithFilters(Set.of(GroupState.STABLE.toString()), Set.of()),
new ListGroupsResponse(new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(List.of(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-1")
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setGroupState(GroupState.STABLE.toString())))),
env.cluster().nodeById(0));
ListGroupsOptions options = ListGroupsOptions.forConsumerGroups().inGroupStates(Set.of(GroupState.STABLE));
ListGroupsResult result = env.adminClient().listGroups(options);
Collection<GroupListing> listing = result.all().get();
assertEquals(1, listing.size());
List<GroupListing> expected = List.of(
new GroupListing("group-1", Optional.empty(), ConsumerProtocol.PROTOCOL_TYPE, Optional.of(GroupState.STABLE))
);
assertEquals(expected, listing);
// Check that we cannot set a type filter with an older broker.
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
// First attempt to build request will require v5 (type filter), but the broker only supports v4
env.kafkaClient().prepareUnsupportedVersionResponse(request ->
request instanceof ListGroupsRequest && !((ListGroupsRequest) request).data().typesFilter().isEmpty()
);
options = ListGroupsOptions.forConsumerGroups().withTypes(Set.of(GroupType.SHARE));
result = env.adminClient().listGroups(options);
TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all());
}
}
@Test
@SuppressWarnings("removal")
public void testListConsumerGroupsDeprecated() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0),
AdminClientConfig.RETRIES_CONFIG, "2")) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
// Empty metadata response should be retried
env.kafkaClient().prepareResponse(
RequestTestUtils.metadataResponse(
List.of(),
env.cluster().clusterResource().clusterId(),
-1,
List.of()));
env.kafkaClient().prepareResponse(
RequestTestUtils.metadataResponse(
env.cluster().nodes(),
env.cluster().clusterResource().clusterId(),
env.cluster().controller().id(),
List.of()));
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(List.of(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-1")
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setGroupState("Stable"),
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-connect-1")
.setProtocolType("connector")
.setGroupState("Stable")
))),
env.cluster().nodeById(0));
// handle retriable errors
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
new ListGroupsResponseData()
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
.setGroups(List.of())
),
env.cluster().nodeById(1));
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
new ListGroupsResponseData()
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
.setGroups(List.of())
),
env.cluster().nodeById(1));
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(List.of(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-2")
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setGroupState("Stable"),
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-connect-2")
.setProtocolType("connector")
.setGroupState("Stable")
))),
env.cluster().nodeById(1));
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(List.of(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-3")
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setGroupState("Stable"),
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-connect-3")
.setProtocolType("connector")
.setGroupState("Stable")
))),
env.cluster().nodeById(2));
// fatal error
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(
new ListGroupsResponseData()
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
.setGroups(List.of())),
env.cluster().nodeById(3));
final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups();
TestUtils.assertFutureThrows(UnknownServerException.class, result.all());
@ -3378,22 +3798,22 @@ public class KafkaAdminClientTest {
@Test
@SuppressWarnings("removal")
public void testListConsumerGroupsMetadataFailure() throws Exception {
public void testListConsumerGroupsDeprecatedMetadataFailure() throws Exception {
final Cluster cluster = mockCluster(3, 0);
final Time time = new MockTime();
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
AdminClientConfig.RETRIES_CONFIG, "0")) {
AdminClientConfig.RETRIES_CONFIG, "0")) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
// Empty metadata causes the request to fail since we have no list of brokers
// to send the ListGroups requests to
env.kafkaClient().prepareResponse(
RequestTestUtils.metadataResponse(
Collections.emptyList(),
env.cluster().clusterResource().clusterId(),
-1,
Collections.emptyList()));
RequestTestUtils.metadataResponse(
List.of(),
env.cluster().clusterResource().clusterId(),
-1,
List.of()));
final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups();
TestUtils.assertFutureThrows(KafkaException.class, result.all());
@ -3402,7 +3822,7 @@ public class KafkaAdminClientTest {
@Test
@SuppressWarnings("removal")
public void testListConsumerGroupsWithStates() throws Exception {
public void testListConsumerGroupsDeprecatedWithStates() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
@ -3411,14 +3831,14 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(asList(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-1")
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setGroupState("Stable"),
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-2")
.setGroupState("Empty")))),
.setGroups(List.of(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-1")
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setGroupState("Stable"),
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-2")
.setGroupState("Empty")))),
env.cluster().nodeById(0));
final ListConsumerGroupsOptions options = new ListConsumerGroupsOptions();
@ -3436,7 +3856,7 @@ public class KafkaAdminClientTest {
@Test
@SuppressWarnings("removal")
public void testListConsumerGroupsWithTypes() throws Exception {
public void testListConsumerGroupsDeprecatedWithTypes() throws Exception {
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
@ -3444,10 +3864,10 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
env.kafkaClient().prepareResponseFrom(
expectListGroupsRequestWithFilters(singleton(GroupState.STABLE.toString()), Collections.emptySet()),
expectListGroupsRequestWithFilters(Set.of(GroupState.STABLE.toString()), Set.of()),
new ListGroupsResponse(new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(singletonList(
.setGroups(List.of(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-1")
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
@ -3455,7 +3875,7 @@ public class KafkaAdminClientTest {
.setGroupType(GroupType.CLASSIC.toString())))),
env.cluster().nodeById(0));
final ListConsumerGroupsOptions options = new ListConsumerGroupsOptions().inGroupStates(singleton(GroupState.STABLE));
final ListConsumerGroupsOptions options = new ListConsumerGroupsOptions().inGroupStates(Set.of(GroupState.STABLE));
final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(options);
Collection<ConsumerGroupListing> listings = result.valid().get();
@ -3469,10 +3889,10 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
env.kafkaClient().prepareResponseFrom(
expectListGroupsRequestWithFilters(Collections.emptySet(), singleton(GroupType.CONSUMER.toString())),
expectListGroupsRequestWithFilters(Set.of(), Set.of(GroupType.CONSUMER.toString())),
new ListGroupsResponse(new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(asList(
.setGroups(List.of(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-1")
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
@ -3499,30 +3919,31 @@ public class KafkaAdminClientTest {
@Test
@SuppressWarnings("removal")
public void testListConsumerGroupsWithStatesOlderBrokerVersion() throws Exception {
public void testListConsumerGroupsDeprecatedWithStatesOlderBrokerVersion() throws Exception {
ApiVersion listGroupV3 = new ApiVersion()
.setApiKey(ApiKeys.LIST_GROUPS.id)
.setMinVersion((short) 0)
.setMaxVersion((short) 3);
.setApiKey(ApiKeys.LIST_GROUPS.id)
.setMinVersion((short) 0)
.setMaxVersion((short) 3);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(listGroupV3)));
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(List.of(listGroupV3)));
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
// Check we can list groups with older broker if we don't specify states
env.kafkaClient().prepareResponseFrom(
new ListGroupsResponse(new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(Collections.singletonList(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-1")
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)))),
env.cluster().nodeById(0));
new ListGroupsResponse(new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(List.of(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-1")
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)))),
env.cluster().nodeById(0));
ListConsumerGroupsOptions options = new ListConsumerGroupsOptions();
ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(options);
Collection<ConsumerGroupListing> listing = result.all().get();
assertEquals(1, listing.size());
List<ConsumerGroupListing> expected = Collections.singletonList(new ConsumerGroupListing("group-1", false));
List<ConsumerGroupListing> expected = List.of(new ConsumerGroupListing("group-1", false));
assertEquals(expected, listing);
// But we cannot set a state filter with older broker
@ -3530,7 +3951,7 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareUnsupportedVersionResponse(
body -> body instanceof ListGroupsRequest);
options = new ListConsumerGroupsOptions().inGroupStates(singleton(GroupState.STABLE));
options = new ListConsumerGroupsOptions().inGroupStates(Set.of(GroupState.STABLE));
result = env.adminClient().listConsumerGroups(options);
TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all());
}
@ -3538,34 +3959,34 @@ public class KafkaAdminClientTest {
@Test
@SuppressWarnings("removal")
public void testListConsumerGroupsWithTypesOlderBrokerVersion() throws Exception {
public void testListConsumerGroupsDeprecatedWithTypesOlderBrokerVersion() throws Exception {
ApiVersion listGroupV4 = new ApiVersion()
.setApiKey(ApiKeys.LIST_GROUPS.id)
.setMinVersion((short) 0)
.setMaxVersion((short) 4);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(listGroupV4)));
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(List.of(listGroupV4)));
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
// Check if we can list groups with older broker if we specify states and don't specify types.
env.kafkaClient().prepareResponseFrom(
expectListGroupsRequestWithFilters(singleton(GroupState.STABLE.toString()), Collections.emptySet()),
expectListGroupsRequestWithFilters(Set.of(GroupState.STABLE.toString()), Set.of()),
new ListGroupsResponse(new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(Collections.singletonList(
.setGroups(List.of(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-1")
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setGroupState(GroupState.STABLE.toString())))),
env.cluster().nodeById(0));
ListConsumerGroupsOptions options = new ListConsumerGroupsOptions().inGroupStates(singleton(GroupState.STABLE));
ListConsumerGroupsOptions options = new ListConsumerGroupsOptions().inGroupStates(Set.of(GroupState.STABLE));
ListConsumerGroupsResult result = env.adminClient().listConsumerGroups(options);
Collection<ConsumerGroupListing> listing = result.all().get();
assertEquals(1, listing.size());
List<ConsumerGroupListing> expected = Collections.singletonList(
List<ConsumerGroupListing> expected = List.of(
new ConsumerGroupListing("group-1", Optional.of(GroupState.STABLE), false)
);
assertEquals(expected, listing);
@ -3576,9 +3997,31 @@ public class KafkaAdminClientTest {
request instanceof ListGroupsRequest && !((ListGroupsRequest) request).data().typesFilter().isEmpty()
);
options = new ListConsumerGroupsOptions().withTypes(singleton(GroupType.CLASSIC));
options = new ListConsumerGroupsOptions().withTypes(Set.of(GroupType.SHARE));
result = env.adminClient().listConsumerGroups(options);
TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all());
// But a type filter which is just classic groups is permitted with an older broker, because they
// only know about classic groups so the types filter can be omitted.
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
env.kafkaClient().prepareResponseFrom(
expectListGroupsRequestWithFilters(Set.of(), Set.of()),
new ListGroupsResponse(new ListGroupsResponseData()
.setErrorCode(Errors.NONE.code())
.setGroups(List.of(
new ListGroupsResponseData.ListedGroup()
.setGroupId("group-1")
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
.setGroupState(GroupState.STABLE.toString())))),
env.cluster().nodeById(0));
options = new ListConsumerGroupsOptions().withTypes(Set.of(GroupType.CLASSIC));
result = env.adminClient().listConsumerGroups(options);
listing = result.all().get();
assertEquals(1, listing.size());
assertEquals(expected, listing);
}
}
@ -6117,7 +6560,7 @@ public class KafkaAdminClientTest {
.setGroups(Collections.emptyList())),
env.cluster().nodeById(3));
final ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.STREAMS)));
final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forStreamsGroups());
TestUtils.assertFutureThrows(UnknownServerException.class, result.all());
Collection<GroupListing> listings = result.valid().get();
@ -6152,7 +6595,7 @@ public class KafkaAdminClientTest {
-1,
Collections.emptyList()));
final ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.STREAMS)));
final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forStreamsGroups());
TestUtils.assertFutureThrows(KafkaException.class, result.all());
}
}
@ -6180,7 +6623,7 @@ public class KafkaAdminClientTest {
.setGroupState("NotReady")))),
env.cluster().nodeById(0));
final ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.STREAMS)));
final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forStreamsGroups());
Collection<GroupListing> listings = result.valid().get();
assertEquals(2, listings.size());
@ -6211,7 +6654,7 @@ public class KafkaAdminClientTest {
new ListGroupsResponseData.ListedGroup()
.setGroupId("streams-group-1")))),
env.cluster().nodeById(0));
ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.STREAMS)));
ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forStreamsGroups());
TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all());
}
}
@ -6527,7 +6970,7 @@ public class KafkaAdminClientTest {
.setGroups(Collections.emptyList())),
env.cluster().nodeById(3));
final ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.SHARE)));
final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forShareGroups());
TestUtils.assertFutureThrows(UnknownServerException.class, result.all());
Collection<GroupListing> listings = result.valid().get();
@ -6562,7 +7005,7 @@ public class KafkaAdminClientTest {
-1,
Collections.emptyList()));
final ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.SHARE)));
final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forShareGroups());
TestUtils.assertFutureThrows(KafkaException.class, result.all());
}
}
@ -6590,7 +7033,7 @@ public class KafkaAdminClientTest {
.setGroupState("Empty")))),
env.cluster().nodeById(0));
final ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.SHARE)));
final ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forShareGroups());
Collection<GroupListing> listings = result.valid().get();
assertEquals(2, listings.size());
@ -6621,7 +7064,7 @@ public class KafkaAdminClientTest {
new ListGroupsResponseData.ListedGroup()
.setGroupId("share-group-1")))),
env.cluster().nodeById(0));
ListGroupsResult result = env.adminClient().listGroups(new ListGroupsOptions().withTypes(Set.of(GroupType.SHARE)));
ListGroupsResult result = env.adminClient().listGroups(ListGroupsOptions.forShareGroups());
TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all());
}
}

View File

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.junit.jupiter.api.Test;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ListGroupsOptionsTest {
@Test
public void testForConsumerGroups() {
ListGroupsOptions options = ListGroupsOptions.forConsumerGroups();
assertTrue(options.groupStates().isEmpty());
assertEquals(Set.of(GroupType.CONSUMER, GroupType.CLASSIC), options.types());
assertEquals(Set.of("", ConsumerProtocol.PROTOCOL_TYPE), options.protocolTypes());
options.inGroupStates(Set.of(GroupState.STABLE));
options.withTypes(Set.of(GroupType.CONSUMER));
options.withProtocolTypes(Set.of(ConsumerProtocol.PROTOCOL_TYPE));
assertEquals(Set.of(GroupState.STABLE), options.groupStates());
assertEquals(Set.of(GroupType.CONSUMER), options.types());
assertEquals(Set.of(ConsumerProtocol.PROTOCOL_TYPE), options.protocolTypes());
}
@Test
public void testForShareGroups() {
ListGroupsOptions options = ListGroupsOptions.forShareGroups();
assertTrue(options.groupStates().isEmpty());
assertEquals(Set.of(GroupType.SHARE), options.types());
assertTrue(options.protocolTypes().isEmpty());
options.inGroupStates(Set.of(GroupState.STABLE));
options.withTypes(Set.of(GroupType.CONSUMER));
options.withProtocolTypes(Set.of(ConsumerProtocol.PROTOCOL_TYPE));
assertEquals(Set.of(GroupState.STABLE), options.groupStates());
assertEquals(Set.of(GroupType.CONSUMER), options.types());
assertEquals(Set.of(ConsumerProtocol.PROTOCOL_TYPE), options.protocolTypes());
}
@Test
public void testForStreamsGroups() {
ListGroupsOptions options = ListGroupsOptions.forStreamsGroups();
assertTrue(options.groupStates().isEmpty());
assertEquals(Set.of(GroupType.STREAMS), options.types());
assertTrue(options.protocolTypes().isEmpty());
options.inGroupStates(Set.of(GroupState.STABLE));
options.withTypes(Set.of(GroupType.CONSUMER));
options.withProtocolTypes(Set.of(ConsumerProtocol.PROTOCOL_TYPE));
assertEquals(Set.of(GroupState.STABLE), options.groupStates());
assertEquals(Set.of(GroupType.CONSUMER), options.types());
assertEquals(Set.of(ConsumerProtocol.PROTOCOL_TYPE), options.protocolTypes());
}
@Test
public void testGroupStates() {
ListGroupsOptions options = new ListGroupsOptions();
assertTrue(options.groupStates().isEmpty());
options.inGroupStates(Set.of(GroupState.DEAD));
assertEquals(Set.of(GroupState.DEAD), options.groupStates());
Set<GroupState> groupStates = Set.of(GroupState.values());
options = new ListGroupsOptions().inGroupStates(groupStates);
assertEquals(groupStates, options.groupStates());
}
@Test
public void testProtocolTypes() {
ListGroupsOptions options = new ListGroupsOptions();
assertTrue(options.protocolTypes().isEmpty());
options.withProtocolTypes(Set.of(ConsumerProtocol.PROTOCOL_TYPE));
assertEquals(Set.of(ConsumerProtocol.PROTOCOL_TYPE), options.protocolTypes());
Set<String> protocolTypes = Set.of("", "consumer", "share");
options = new ListGroupsOptions().withProtocolTypes(protocolTypes);
assertEquals(protocolTypes, options.protocolTypes());
}
@Test
public void testTypes() {
ListGroupsOptions options = new ListGroupsOptions();
assertTrue(options.types().isEmpty());
options.withTypes(Set.of(GroupType.CLASSIC));
assertEquals(Set.of(GroupType.CLASSIC), options.types());
Set<GroupType> groupTypes = Set.of(GroupType.values());
options = new ListGroupsOptions().withTypes(groupTypes);
assertEquals(groupTypes, options.types());
}
}

View File

@ -21,7 +21,7 @@ import joptsimple._
import kafka.server.DynamicConfig
import kafka.utils.Implicits._
import kafka.utils.Logging
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListGroupsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism}
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{InvalidConfigurationException, UnsupportedVersionException}
import org.apache.kafka.common.internals.Topic
@ -352,7 +352,7 @@ object ConfigCommand extends Logging {
case ClientMetricsType =>
adminClient.listClientMetricsResources().all().get().asScala.map(_.name).toSeq
case GroupType =>
adminClient.listGroups(ListGroupsOptions.forConsumerGroups()).all.get.asScala.map(_.groupId).toSeq
adminClient.listGroups().all.get.asScala.map(_.groupId).toSeq
case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType")
})

View File

@ -177,10 +177,11 @@ public class ConsumerGroupCommand {
@SuppressWarnings("Regexp")
static Set<GroupType> consumerGroupTypesFromString(String input) {
Set<GroupType> validTypes = Set.of(GroupType.CLASSIC, GroupType.CONSUMER);
Set<GroupType> parsedTypes = Stream.of(input.toLowerCase().split(",")).map(s -> GroupType.parse(s.trim())).collect(Collectors.toSet());
if (parsedTypes.contains(GroupType.UNKNOWN)) {
List<String> validTypes = Arrays.stream(GroupType.values()).filter(t -> t != GroupType.UNKNOWN).map(Object::toString).collect(Collectors.toList());
throw new IllegalArgumentException("Invalid types list '" + input + "'. Valid types are: " + String.join(", ", validTypes));
if (!validTypes.containsAll(parsedTypes)) {
throw new IllegalArgumentException("Invalid types list '" + input + "'. Valid types are: " +
String.join(", ", validTypes.stream().map(GroupType::toString).collect(Collectors.toSet())));
}
return parsedTypes;
}

View File

@ -153,9 +153,8 @@ public class ShareGroupCommand {
List<String> listShareGroups() {
try {
ListGroupsResult result = adminClient.listGroups(new ListGroupsOptions()
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
.withTypes(Set.of(GroupType.SHARE)));
ListGroupsResult result = adminClient.listGroups(ListGroupsOptions.forShareGroups()
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue()));
Collection<GroupListing> listings = result.all().get();
return listings.stream().map(GroupListing::groupId).collect(Collectors.toList());
} catch (InterruptedException | ExecutionException e) {
@ -165,9 +164,8 @@ public class ShareGroupCommand {
List<GroupListing> listDetailedShareGroups() {
try {
ListGroupsResult result = adminClient.listGroups(new ListGroupsOptions()
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
.withTypes(Set.of(GroupType.SHARE)));
ListGroupsResult result = adminClient.listGroups(ListGroupsOptions.forShareGroups()
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue()));
Collection<GroupListing> listings = result.all().get();
return listings.stream().toList();
} catch (InterruptedException | ExecutionException e) {
@ -176,9 +174,8 @@ public class ShareGroupCommand {
}
List<GroupListing> listShareGroupsInStates(Set<GroupState> states) throws ExecutionException, InterruptedException {
ListGroupsResult result = adminClient.listGroups(new ListGroupsOptions()
ListGroupsResult result = adminClient.listGroups(ListGroupsOptions.forShareGroups()
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
.withTypes(Set.of(GroupType.SHARE))
.inGroupStates(states));
return new ArrayList<>(result.all().get());
}

View File

@ -136,9 +136,8 @@ public class StreamsGroupCommand {
List<String> listStreamsGroups() {
try {
ListGroupsResult result = adminClient.listGroups(new ListGroupsOptions()
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
.withTypes(Set.of(GroupType.STREAMS)));
ListGroupsResult result = adminClient.listGroups(ListGroupsOptions.forStreamsGroups()
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue()));
Collection<GroupListing> listings = result.all().get();
return listings.stream().map(GroupListing::groupId).collect(Collectors.toList());
} catch (InterruptedException | ExecutionException e) {
@ -147,9 +146,8 @@ public class StreamsGroupCommand {
}
List<GroupListing> listStreamsGroupsInStates(Set<GroupState> states) throws ExecutionException, InterruptedException {
ListGroupsResult result = adminClient.listGroups(new ListGroupsOptions()
ListGroupsResult result = adminClient.listGroups(ListGroupsOptions.forStreamsGroups()
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
.withTypes(Set.of(GroupType.STREAMS))
.inGroupStates(states));
return new ArrayList<>(result.all().get());
}

View File

@ -662,6 +662,10 @@ class ListConsumerGroupUnitTest {
result = ConsumerGroupCommand.consumerGroupTypesFromString("Consumer, Classic");
Assertions.assertEquals(ListConsumerGroupTest.set(Arrays.asList(GroupType.CONSUMER, GroupType.CLASSIC)), result);
Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumerGroupCommand.consumerGroupTypesFromString("Share"));
Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumerGroupCommand.consumerGroupTypesFromString("streams"));
Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumerGroupCommand.consumerGroupTypesFromString("bad, wrong"));
Assertions.assertThrows(IllegalArgumentException.class, () -> ConsumerGroupCommand.consumerGroupTypesFromString(" bad, generic"));