KAFKA-18904: Add Admin#listConfigResources [2/N] (#19743)
CI / build (push) Waiting to run Details

* Add new functions `listConfigResources(Set<ConfigResource.Type>
configResourceTypes, ListConfigResourcesOptions options)` and
`listConfigResources()` to `Admin` interface.
  * New functions can list all kind of config resource types.
  * If input is a set with a type other than `CLIENT_METRICS` and
request version is 0, return `UnsupportedVersionException`.
* Deprecate functions
`listClientMetricsResources(ListClientMetricsResourcesOptions options)`
and `listClientMetricsResources()`.
* Deprecate classes `ListClientMetricsResourcesResult` and
`ClientMetricsResourceListing`.
* Change `ClientMetricsCommand` to use `listConfigResources`.
* Add integration tests to `PlaintextAdminIntegrationTest.java`.
* Add unit tests to `KafkaAdminClientTest.java`.

Reviewers: Andrew Schofield <aschofield@confluent.io>

---------

Signed-off-by: PoAn Yang <payang@apache.org>
This commit is contained in:
PoAn Yang 2025-05-22 10:05:35 -05:00 committed by GitHub
parent 239dce3e04
commit 30d7c71f09
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 695 additions and 342 deletions

View File

@ -1775,12 +1775,36 @@ public interface Admin extends AutoCloseable {
FenceProducersResult fenceProducers(Collection<String> transactionalIds,
FenceProducersOptions options);
/**
* List the configuration resources available in the cluster which matches config resource type.
* If no config resource types are specified, all configuration resources will be listed.
*
* @param configResourceTypes The set of configuration resource types to list.
* @param options The options to use when listing the configuration resources.
* @return The ListConfigurationResourcesResult.
*/
ListConfigResourcesResult listConfigResources(Set<ConfigResource.Type> configResourceTypes, ListConfigResourcesOptions options);
/**
* List all configuration resources available in the cluster with the default options.
* <p>
* This is a convenience method for {@link #listConfigResources(Set, ListConfigResourcesOptions)}
* with default options. See the overload for more details.
*
* @return The ListConfigurationResourcesResult.
*/
default ListConfigResourcesResult listConfigResources() {
return listConfigResources(Set.of(), new ListConfigResourcesOptions());
}
/**
* List the client metrics configuration resources available in the cluster.
*
* @param options The options to use when listing the client metrics resources.
* @return The ListClientMetricsResourcesResult.
* @deprecated Since 4.1. Use {@link #listConfigResources(Set, ListConfigResourcesOptions)} instead.
*/
@Deprecated(since = "4.1", forRemoval = true)
ListClientMetricsResourcesResult listClientMetricsResources(ListClientMetricsResourcesOptions options);
/**
@ -1790,7 +1814,9 @@ public interface Admin extends AutoCloseable {
* with default options. See the overload for more details.
*
* @return The ListClientMetricsResourcesResult.
* @deprecated Since 4.1. Use {@link #listConfigResources()} instead.
*/
@Deprecated(since = "4.1", forRemoval = true)
default ListClientMetricsResourcesResult listClientMetricsResources() {
return listClientMetricsResources(new ListClientMetricsResourcesOptions());
}

View File

@ -18,6 +18,7 @@ package org.apache.kafka.clients.admin;
import java.util.Objects;
@Deprecated(since = "4.1")
public class ClientMetricsResourceListing {
private final String name;

View File

@ -300,6 +300,12 @@ public class ForwardingAdmin implements Admin {
return delegate.fenceProducers(transactionalIds, options);
}
@Override
public ListConfigResourcesResult listConfigResources(Set<ConfigResource.Type> configResourceTypes, ListConfigResourcesOptions options) {
return delegate.listConfigResources(configResourceTypes, options);
}
@SuppressWarnings({"deprecation", "removal"})
@Override
public ListClientMetricsResourcesResult listClientMetricsResources(ListClientMetricsResourcesOptions options) {
return delegate.listClientMetricsResources(options);

View File

@ -19,6 +19,8 @@ package org.apache.kafka.clients.admin;
/**
* Options for {@link Admin#listClientMetricsResources()}.
* @deprecated Since 4.1. Use {@link ListConfigResourcesOptions} instead.
*/
@Deprecated(since = "4.1")
public class ListClientMetricsResourcesOptions extends AbstractOptions<ListClientMetricsResourcesOptions> {
}

View File

@ -25,7 +25,9 @@ import java.util.Collection;
/**
* The result of the {@link Admin#listClientMetricsResources()} call.
* <p>
* @deprecated Since 4.1. Use {@link ListConfigResourcesResult} instead.
*/
@Deprecated(since = "4.1")
public class ListClientMetricsResourcesResult {
private final KafkaFuture<Collection<ClientMetricsResourceListing>> future;

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
/**
* Options for {@link Admin#listConfigResources()}.
*/
public class ListConfigResourcesOptions extends AbstractOptions<ListConfigResourcesOptions> {
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import java.util.Collection;
/**
* The result of the {@link Admin#listConfigResources()} call.
* <p>
*/
public class ListConfigResourcesResult {
private final KafkaFuture<Collection<ConfigResource>> future;
ListConfigResourcesResult(KafkaFuture<Collection<ConfigResource>> future) {
this.future = future;
}
/**
* Returns a future that yields either an exception, or the full set of config resources.
*
* In the event of a failure, the future yields nothing but the first exception which
* occurred.
*/
public KafkaFuture<Collection<ConfigResource>> all() {
final KafkaFutureImpl<Collection<ConfigResource>> result = new KafkaFutureImpl<>();
future.whenComplete((resources, throwable) -> {
if (throwable != null) {
result.completeExceptionally(throwable);
} else {
result.complete(resources);
}
});
return result;
}
}

View File

@ -17,12 +17,14 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ListConfigResourcesRequestData;
import org.apache.kafka.common.message.ListConfigResourcesResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
import java.util.HashSet;
import java.util.Set;
public class ListConfigResourcesRequest extends AbstractRequest {
@ -36,6 +38,15 @@ public class ListConfigResourcesRequest extends AbstractRequest {
@Override
public ListConfigResourcesRequest build(short version) {
if (version == 0) {
// The v0 only supports CLIENT_METRICS resource type.
Set<Byte> resourceTypes = new HashSet<>(data.resourceTypes());
if (resourceTypes.size() != 1 || !resourceTypes.contains(ConfigResource.Type.CLIENT_METRICS.id())) {
throw new UnsupportedVersionException("The v0 ListConfigResources only supports CLIENT_METRICS");
}
// The v0 request does not have resource types field, so creating a new request data.
return new ListConfigResourcesRequest(new ListConfigResourcesRequestData(), version);
}
return new ListConfigResourcesRequest(data, version);
}

View File

@ -16,7 +16,6 @@
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.clients.admin.ClientMetricsResourceListing;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.message.ListConfigResourcesResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
@ -78,12 +77,4 @@ public class ListConfigResourcesResponse extends AbstractResponse {
)
).collect(Collectors.toList());
}
public Collection<ClientMetricsResourceListing> clientMetricsResources() {
return data.configResources()
.stream()
.filter(entry -> entry.resourceType() == ConfigResource.Type.CLIENT_METRICS.id())
.map(entry -> new ClientMetricsResourceListing(entry.resourceName()))
.collect(Collectors.toList());
}
}

View File

@ -163,17 +163,17 @@ public class AdminClientTestUtils {
return new ListConsumerGroupOffsetsResult(Collections.singletonMap(CoordinatorKey.byGroupId(group), future));
}
public static ListClientMetricsResourcesResult listClientMetricsResourcesResult(String... names) {
return new ListClientMetricsResourcesResult(
KafkaFuture.completedFuture(Arrays.stream(names)
.map(ClientMetricsResourceListing::new)
.collect(Collectors.toList())));
public static ListConfigResourcesResult listConfigResourcesResult(String... names) {
return new ListConfigResourcesResult(
KafkaFuture.completedFuture(Arrays.stream(names)
.map(name -> new ConfigResource(ConfigResource.Type.CLIENT_METRICS, name))
.collect(Collectors.toList())));
}
public static ListClientMetricsResourcesResult listClientMetricsResourcesResult(KafkaException exception) {
final KafkaFutureImpl<Collection<ClientMetricsResourceListing>> future = new KafkaFutureImpl<>();
public static ListConfigResourcesResult listConfigResourcesResult(KafkaException exception) {
final KafkaFutureImpl<Collection<ConfigResource>> future = new KafkaFutureImpl<>();
future.completeExceptionally(exception);
return new ListClientMetricsResourcesResult(future);
return new ListConfigResourcesResult(future);
}
public static ListShareGroupOffsetsResult createListShareGroupOffsetsResult(Map<String, KafkaFuture<Map<TopicPartition, OffsetAndMetadata>>> groupOffsets) {

View File

@ -10664,6 +10664,7 @@ public class KafkaAdminClientTest {
member.memberEpoch());
}
@SuppressWarnings({"deprecation", "removal"})
@Test
public void testListClientMetricsResources() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
@ -10697,6 +10698,7 @@ public class KafkaAdminClientTest {
}
}
@SuppressWarnings({"deprecation", "removal"})
@Test
public void testListClientMetricsResourcesEmpty() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
@ -10714,6 +10716,7 @@ public class KafkaAdminClientTest {
}
}
@SuppressWarnings({"deprecation", "removal"})
@Test
public void testListClientMetricsResourcesNotSupported() {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
@ -10729,6 +10732,70 @@ public class KafkaAdminClientTest {
}
}
@Test
public void testListConfigResources() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
List<ConfigResource> expected = List.of(
new ConfigResource(ConfigResource.Type.CLIENT_METRICS, "client-metrics"),
new ConfigResource(ConfigResource.Type.BROKER, "1"),
new ConfigResource(ConfigResource.Type.BROKER_LOGGER, "1"),
new ConfigResource(ConfigResource.Type.TOPIC, "topic"),
new ConfigResource(ConfigResource.Type.GROUP, "group")
);
ListConfigResourcesResponseData responseData =
new ListConfigResourcesResponseData().setErrorCode(Errors.NONE.code());
expected.forEach(c ->
responseData.configResources()
.add(new ListConfigResourcesResponseData
.ConfigResource()
.setResourceName(c.name())
.setResourceType(c.type().id())
)
);
env.kafkaClient().prepareResponse(
request -> request instanceof ListConfigResourcesRequest,
new ListConfigResourcesResponse(responseData));
ListConfigResourcesResult result = env.adminClient().listConfigResources();
assertEquals(expected.size(), result.all().get().size());
assertEquals(new HashSet<>(expected), new HashSet<>(result.all().get()));
}
}
@Test
public void testListConfigResourcesEmpty() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
ListConfigResourcesResponseData responseData =
new ListConfigResourcesResponseData().setErrorCode(Errors.NONE.code());
env.kafkaClient().prepareResponse(
request -> request instanceof ListConfigResourcesRequest,
new ListConfigResourcesResponse(responseData));
ListConfigResourcesResult result = env.adminClient().listConfigResources();
assertTrue(result.all().get().isEmpty());
}
}
@Test
public void testListConfigResourcesNotSupported() {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().prepareResponse(
request -> request instanceof ListConfigResourcesRequest,
new ListConfigResourcesResponse(new ListConfigResourcesResponseData()
.setErrorCode(Errors.UNSUPPORTED_VERSION.code())));
ListConfigResourcesResult result = env.adminClient().listConfigResources(
Set.of(ConfigResource.Type.UNKNOWN), new ListConfigResourcesOptions());
assertNotNull(result.all());
TestUtils.assertFutureThrows(UnsupportedVersionException.class, result.all());
}
}
@Test
public void testCallFailWithUnsupportedVersionExceptionDoesNotHaveConcurrentModificationException() throws InterruptedException {
Cluster cluster = mockCluster(1, 0);

View File

@ -1394,6 +1394,12 @@ public class MockAdminClient extends AdminClient {
throw new UnsupportedOperationException("Not implemented yet");
}
@Override
public ListConfigResourcesResult listConfigResources(Set<ConfigResource.Type> configResourceTypes, ListConfigResourcesOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
}
@SuppressWarnings("deprecation")
@Override
public ListClientMetricsResourcesResult listClientMetricsResources(ListClientMetricsResourcesOptions options) {
KafkaFutureImpl<Collection<ClientMetricsResourceListing>> future = new KafkaFutureImpl<>();

View File

@ -3637,7 +3637,10 @@ public class RequestResponseTest {
}
private ListConfigResourcesRequest createListConfigResourcesRequest(short version) {
return new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()).build(version);
return version == 0 ?
new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()
.setResourceTypes(List.of(ConfigResource.Type.CLIENT_METRICS.id()))).build(version) :
new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()).build(version);
}
private ListConfigResourcesResponse createListConfigResourcesResponse() {
@ -3951,4 +3954,25 @@ public class RequestResponseTest {
parseRequest(SASL_AUTHENTICATE, SASL_AUTHENTICATE.latestVersion(), accessor)).getMessage();
assertEquals("Error reading byte array of 32767 byte(s): only 3 byte(s) available", msg);
}
@Test
public void testListConfigResourcesRequestV0FailsWithConfigResourceTypeOtherThanClientMetrics() {
// One type which is not CLIENT_METRICS
Arrays.stream(ConfigResource.Type.values())
.filter(t -> t != ConfigResource.Type.CLIENT_METRICS)
.forEach(t -> {
ListConfigResourcesRequestData data = new ListConfigResourcesRequestData()
.setResourceTypes(List.of(t.id()));
assertThrows(UnsupportedVersionException.class, () -> new ListConfigResourcesRequest.Builder(data).build((short) 0));
});
// Multiple types with CLIENT_METRICS
Arrays.stream(ConfigResource.Type.values())
.filter(t -> t != ConfigResource.Type.CLIENT_METRICS)
.forEach(t -> {
ListConfigResourcesRequestData data = new ListConfigResourcesRequestData()
.setResourceTypes(List.of(t.id(), ConfigResource.Type.CLIENT_METRICS.id()));
assertThrows(UnsupportedVersionException.class, () -> new ListConfigResourcesRequest.Builder(data).build((short) 0));
});
}
}

View File

@ -3866,6 +3866,92 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
} finally client.close(time.Duration.ZERO)
}
@Test
def testListConfigResources(): Unit = {
client = createAdminClient
// Alter group and client metric config to add group and client metric config resource
val clientMetric = "client-metrics"
val group = "group"
val clientMetricResource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, clientMetric)
val groupResource = new ConfigResource(ConfigResource.Type.GROUP, group)
val alterResult = client.incrementalAlterConfigs(util.Map.of(
clientMetricResource,
util.Set.of(new AlterConfigOp(new ConfigEntry("interval.ms", "111"), AlterConfigOp.OpType.SET)),
groupResource,
util.Set.of(new AlterConfigOp(new ConfigEntry(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "50000"), AlterConfigOp.OpType.SET))
))
assertEquals(util.Set.of(clientMetricResource, groupResource), alterResult.values.keySet)
alterResult.all.get(15, TimeUnit.SECONDS)
ensureConsistentKRaftMetadata()
// non-specified config resource type retrieves all config resources
var configResources = client.listConfigResources().all().get()
assertEquals(9, configResources.size())
brokerServers.foreach(b => {
assertTrue(configResources.contains(new ConfigResource(ConfigResource.Type.BROKER, b.config.nodeId.toString)))
assertTrue(configResources.contains(new ConfigResource(ConfigResource.Type.BROKER_LOGGER, b.config.nodeId.toString)))
})
assertTrue(configResources.contains(new ConfigResource(ConfigResource.Type.TOPIC, Topic.GROUP_METADATA_TOPIC_NAME)))
assertTrue(configResources.contains(groupResource))
assertTrue(configResources.contains(clientMetricResource))
// BROKER config resource type retrieves only broker config resources
configResources = client.listConfigResources(util.Set.of(ConfigResource.Type.BROKER), new ListConfigResourcesOptions()).all().get()
assertEquals(3, configResources.size())
brokerServers.foreach(b => {
assertTrue(configResources.contains(new ConfigResource(ConfigResource.Type.BROKER, b.config.nodeId.toString)))
assertFalse(configResources.contains(new ConfigResource(ConfigResource.Type.BROKER_LOGGER, b.config.nodeId.toString)))
})
assertFalse(configResources.contains(new ConfigResource(ConfigResource.Type.TOPIC, Topic.GROUP_METADATA_TOPIC_NAME)))
assertFalse(configResources.contains(groupResource))
assertFalse(configResources.contains(clientMetricResource))
// BROKER_LOGGER config resource type retrieves only broker logger config resources
configResources = client.listConfigResources(util.Set.of(ConfigResource.Type.BROKER_LOGGER), new ListConfigResourcesOptions()).all().get()
assertEquals(3, configResources.size())
brokerServers.foreach(b => {
assertFalse(configResources.contains(new ConfigResource(ConfigResource.Type.BROKER, b.config.nodeId.toString)))
assertTrue(configResources.contains(new ConfigResource(ConfigResource.Type.BROKER_LOGGER, b.config.nodeId.toString)))
})
assertFalse(configResources.contains(new ConfigResource(ConfigResource.Type.TOPIC, Topic.GROUP_METADATA_TOPIC_NAME)))
assertFalse(configResources.contains(groupResource))
assertFalse(configResources.contains(clientMetricResource))
// TOPIC config resource type retrieves only topic config resources
configResources = client.listConfigResources(util.Set.of(ConfigResource.Type.TOPIC), new ListConfigResourcesOptions()).all().get()
assertEquals(1, configResources.size())
assertTrue(configResources.contains(new ConfigResource(ConfigResource.Type.TOPIC, Topic.GROUP_METADATA_TOPIC_NAME)))
// GROUP config resource type retrieves only group config resources
configResources = client.listConfigResources(util.Set.of(ConfigResource.Type.GROUP), new ListConfigResourcesOptions()).all().get()
assertEquals(1, configResources.size())
assertTrue(configResources.contains(groupResource))
// CLIENT_METRICS config resource type retrieves only client metric config resources
configResources = client.listConfigResources(util.Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions()).all().get()
assertEquals(1, configResources.size())
assertTrue(configResources.contains(clientMetricResource))
// UNKNOWN config resource type gets UNSUPPORTED_VERSION error
assertThrows(classOf[ExecutionException], () => {
client.listConfigResources(util.Set.of(ConfigResource.Type.UNKNOWN), new ListConfigResourcesOptions()).all().get()
})
}
@Test
@Timeout(30)
def testListConfigResourcesTimeoutMs(): Unit = {
client = createInvalidAdminClient()
try {
val timeoutOption = new ListConfigResourcesOptions().timeoutMs(0)
val exception = assertThrows(classOf[ExecutionException], () =>
client.listConfigResources(util.Set.of(), timeoutOption).all().get())
assertInstanceOf(classOf[TimeoutException], exception.getCause)
} finally client.close(time.Duration.ZERO)
}
/**
* Test that createTopics returns the dynamic configurations of the topics that were created.
*

View File

@ -11207,7 +11207,8 @@ class KafkaApisTest extends Logging {
@Test
def testListConfigResourcesV0(): Unit = {
val request = buildRequest(new ListConfigResourcesRequest.Builder(new ListConfigResourcesRequestData()).build(0))
val request = buildRequest(new ListConfigResourcesRequest.Builder(
new ListConfigResourcesRequestData().setResourceTypes(util.List.of(ConfigResource.Type.CLIENT_METRICS.id))).build(0))
metadataCache = mock(classOf[KRaftMetadataCache])
val resources = util.Set.of("client-metric1", "client-metric2")

View File

@ -110,6 +110,8 @@ import org.apache.kafka.clients.admin.FenceProducersOptions;
import org.apache.kafka.clients.admin.FenceProducersResult;
import org.apache.kafka.clients.admin.ListClientMetricsResourcesOptions;
import org.apache.kafka.clients.admin.ListClientMetricsResourcesResult;
import org.apache.kafka.clients.admin.ListConfigResourcesOptions;
import org.apache.kafka.clients.admin.ListConfigResourcesResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
@ -436,6 +438,12 @@ public class TestingMetricsInterceptingAdminClient extends AdminClient {
return adminDelegate.fenceProducers(transactionalIds, options);
}
@Override
public ListConfigResourcesResult listConfigResources(final Set<ConfigResource.Type> configResourceTypes, final ListConfigResourcesOptions options) {
return adminDelegate.listConfigResources(configResourceTypes, options);
}
@SuppressWarnings({"deprecation", "removal"})
@Override
public ListClientMetricsResourcesResult listClientMetricsResources(final ListClientMetricsResourcesOptions options) {
return adminDelegate.listClientMetricsResources(options);

View File

@ -21,9 +21,9 @@ import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsOptions;
import org.apache.kafka.clients.admin.ClientMetricsResourceListing;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ListConfigResourcesOptions;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.utils.Exit;
@ -43,6 +43,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -157,9 +158,11 @@ public class ClientMetricsCommand {
if (entityNameOpt.isPresent()) {
entities = Collections.singletonList(entityNameOpt.get());
} else {
Collection<ClientMetricsResourceListing> resources = adminClient.listClientMetricsResources()
.all().get(30, TimeUnit.SECONDS);
entities = resources.stream().map(ClientMetricsResourceListing::name).collect(Collectors.toList());
Collection<ConfigResource> resources = adminClient
.listConfigResources(Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions())
.all()
.get(30, TimeUnit.SECONDS);
entities = resources.stream().map(ConfigResource::name).toList();
}
for (String entity : entities) {
@ -170,9 +173,11 @@ public class ClientMetricsCommand {
}
public void listClientMetrics() throws Exception {
Collection<ClientMetricsResourceListing> resources = adminClient.listClientMetricsResources()
.all().get(30, TimeUnit.SECONDS);
String results = resources.stream().map(ClientMetricsResourceListing::name).collect(Collectors.joining("\n"));
Collection<ConfigResource> resources = adminClient
.listConfigResources(Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions())
.all()
.get(30, TimeUnit.SECONDS);
String results = resources.stream().map(ConfigResource::name).collect(Collectors.joining("\n"));
System.out.println(results);
}

View File

@ -24,7 +24,7 @@ import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.ListClientMetricsResourcesResult;
import org.apache.kafka.clients.admin.ListConfigResourcesResult;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Exit;
@ -254,8 +254,8 @@ public class ClientMetricsCommandTest {
Admin adminClient = mock(Admin.class);
ClientMetricsCommand.ClientMetricsService service = new ClientMetricsCommand.ClientMetricsService(adminClient);
ListClientMetricsResourcesResult result = AdminClientTestUtils.listClientMetricsResourcesResult(clientMetricsName);
when(adminClient.listClientMetricsResources()).thenReturn(result);
ListConfigResourcesResult result = AdminClientTestUtils.listConfigResourcesResult(clientMetricsName);
when(adminClient.listConfigResources(any(), any())).thenReturn(result);
ConfigResource cr = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, clientMetricsName);
Config cfg = new Config(Collections.singleton(new ConfigEntry("metrics", "org.apache.kafka.producer.")));
DescribeConfigsResult describeResult = AdminClientTestUtils.describeConfigsResult(cr, cfg);
@ -278,8 +278,8 @@ public class ClientMetricsCommandTest {
Admin adminClient = mock(Admin.class);
ClientMetricsCommand.ClientMetricsService service = new ClientMetricsCommand.ClientMetricsService(adminClient);
ListClientMetricsResourcesResult result = AdminClientTestUtils.listClientMetricsResourcesResult("one", "two");
when(adminClient.listClientMetricsResources()).thenReturn(result);
ListConfigResourcesResult result = AdminClientTestUtils.listConfigResourcesResult("one", "two");
when(adminClient.listConfigResources(any(), any())).thenReturn(result);
String capturedOutput = ToolsTestUtils.captureStandardOut(() -> {
try {
@ -296,8 +296,8 @@ public class ClientMetricsCommandTest {
Admin adminClient = mock(Admin.class);
ClientMetricsCommand.ClientMetricsService service = new ClientMetricsCommand.ClientMetricsService(adminClient);
ListClientMetricsResourcesResult result = AdminClientTestUtils.listClientMetricsResourcesResult(Errors.UNSUPPORTED_VERSION.exception());
when(adminClient.listClientMetricsResources()).thenReturn(result);
ListConfigResourcesResult result = AdminClientTestUtils.listConfigResourcesResult(Errors.UNSUPPORTED_VERSION.exception());
when(adminClient.listConfigResources(any(), any())).thenReturn(result);
assertThrows(ExecutionException.class, () -> service.listClientMetrics());
}