From d1f1e5c8fdffbd4bd2ce0807d4d03b1f0e6b9988 Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Thu, 29 May 2025 12:15:31 -0500 Subject: [PATCH] KAFKA-18904: kafka-configs.sh return resource doesn't exist message [3/N] (#19808) * Return resource doesn't exist message when users try to describe a non-existent resource in kafka-configs.sh and kafka-client-metrics.sh. * For groups type, the command checks both existent groups and non-existent groups but having dynamic config. If it cannot find a group in both conditions, return resource doesn't exist message. Reviewers: Lan Ding <53332773+DL1231@users.noreply.github.com>, Andrew Schofield --------- Signed-off-by: PoAn Yang --- .../clients/admin/AdminClientTestUtils.java | 9 ++ .../kafka/clients/admin/MockAdminClient.java | 29 +++++- .../scala/kafka/admin/ConfigCommand.scala | 45 ++++++++- .../kafka/tools/ClientMetricsCommand.java | 6 ++ .../kafka/tools/ClientMetricsCommandTest.java | 27 ++++++ .../tools/ConfigCommandIntegrationTest.java | 92 +++++++++++++++++++ 6 files changed, 203 insertions(+), 5 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java index 36e7571d8dd..dc43a9f2bd4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java @@ -31,6 +31,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; public class AdminClientTestUtils { @@ -163,6 +164,14 @@ public class AdminClientTestUtils { return new ListConsumerGroupOffsetsResult(Collections.singletonMap(CoordinatorKey.byGroupId(group), future)); } + public static ListConfigResourcesResult listConfigResourcesResult(Map> resourceNames) { + Collection resources = resourceNames.entrySet().stream() + .flatMap(entry -> entry.getValue().stream() + .map(name -> new ConfigResource(entry.getKey(), name))) + .collect(Collectors.toList()); + return new ListConfigResourcesResult(KafkaFuture.completedFuture(resources)); + } + public static ListConfigResourcesResult listConfigResourcesResult(String... names) { return new ListConfigResourcesResult( KafkaFuture.completedFuture(Arrays.stream(names) diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index 0f0eaf51f8a..48874f1a1b2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -66,6 +66,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -1396,7 +1397,33 @@ public class MockAdminClient extends AdminClient { @Override public ListConfigResourcesResult listConfigResources(Set configResourceTypes, ListConfigResourcesOptions options) { - throw new UnsupportedOperationException("Not implemented yet"); + KafkaFutureImpl> future = new KafkaFutureImpl<>(); + Set configResources = new HashSet<>(); + if (configResourceTypes.isEmpty() || configResourceTypes.contains(ConfigResource.Type.TOPIC)) { + allTopics.keySet().forEach(name -> configResources.add(new ConfigResource(ConfigResource.Type.TOPIC, name))); + } + + if (configResourceTypes.isEmpty() || configResourceTypes.contains(ConfigResource.Type.BROKER)) { + for (int i = 0; i < brokers.size(); i++) { + configResources.add(new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(i))); + } + } + + if (configResourceTypes.isEmpty() || configResourceTypes.contains(ConfigResource.Type.BROKER_LOGGER)) { + for (int i = 0; i < brokers.size(); i++) { + configResources.add(new ConfigResource(ConfigResource.Type.BROKER_LOGGER, String.valueOf(i))); + } + } + + if (configResourceTypes.isEmpty() || configResourceTypes.contains(ConfigResource.Type.CLIENT_METRICS)) { + clientMetricsConfigs.keySet().forEach(name -> configResources.add(new ConfigResource(ConfigResource.Type.CLIENT_METRICS, name))); + } + + if (configResourceTypes.isEmpty() || configResourceTypes.contains(ConfigResource.Type.GROUP)) { + groupConfigs.keySet().forEach(name -> configResources.add(new ConfigResource(ConfigResource.Type.GROUP, name))); + } + future.complete(configResources); + return new ListConfigResourcesResult(future); } @Override diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 8fb37665c3a..f004b9956c8 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -21,7 +21,7 @@ import joptsimple._ import kafka.server.DynamicConfig import kafka.utils.Implicits._ import kafka.utils.Logging -import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism} +import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListConfigResourcesOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism} import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.errors.{InvalidConfigurationException, UnsupportedVersionException} import org.apache.kafka.common.internals.Topic @@ -342,6 +342,42 @@ object ConfigCommand extends Logging { } private def describeResourceConfig(adminClient: Admin, entityType: String, entityName: Option[String], describeAll: Boolean): Unit = { + if (!describeAll) { + entityName.foreach { name => + entityType match { + case TopicType => + Topic.validate(name) + if (!adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names.get.contains(name)) { + System.out.println(s"The ${entityType.dropRight(1)} '$name' doesn't exist and doesn't have dynamic config.") + return + } + case BrokerType | BrokerLoggerConfigType => + if (adminClient.describeCluster.nodes.get.stream.anyMatch(_.idString == name)) { + // valid broker id + } else if (name == BrokerDefaultEntityName) { + // default broker configs + } else { + System.out.println(s"The ${entityType.dropRight(1)} '$name' doesn't exist and doesn't have dynamic config.") + return + } + case ClientMetricsType => + if (adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions).all.get + .stream.noneMatch(_.name == name)) { + System.out.println(s"The ${entityType.dropRight(1)} '$name' doesn't exist and doesn't have dynamic config.") + return + } + case GroupType => + if (adminClient.listGroups().all.get.stream.noneMatch(_.groupId() == name) && + adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.GROUP), new ListConfigResourcesOptions).all.get + .stream.noneMatch(_.name == name)) { + System.out.println(s"The ${entityType.dropRight(1)} '$name' doesn't exist and doesn't have dynamic config.") + return + } + case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType") + } + } + } + val entities = entityName .map(name => List(name)) .getOrElse(entityType match { @@ -350,9 +386,10 @@ object ConfigCommand extends Logging { case BrokerType | BrokerLoggerConfigType => adminClient.describeCluster(new DescribeClusterOptions()).nodes().get().asScala.map(_.idString).toSeq :+ BrokerDefaultEntityName case ClientMetricsType => - adminClient.listClientMetricsResources().all().get().asScala.map(_.name).toSeq + adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions).all().get().asScala.map(_.name).toSeq case GroupType => - adminClient.listGroups().all.get.asScala.map(_.groupId).toSeq + adminClient.listGroups().all.get.asScala.map(_.groupId).toSet ++ + adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.GROUP), new ListConfigResourcesOptions).all().get().asScala.map(_.name).toSet case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType") }) diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java b/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java index 8dcb5e5a750..6bd0f29d33d 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java @@ -156,6 +156,12 @@ public class ClientMetricsCommand { List entities; if (entityNameOpt.isPresent()) { + if (adminClient.listConfigResources(Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions()) + .all().get(30, TimeUnit.SECONDS).stream() + .noneMatch(resource -> resource.name().equals(entityNameOpt.get()))) { + System.out.println("The client metric resource " + entityNameOpt.get() + " doesn't exist and doesn't have dynamic config."); + return; + } entities = Collections.singletonList(entityNameOpt.get()); } else { Collection resources = adminClient diff --git a/tools/src/test/java/org/apache/kafka/tools/ClientMetricsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/ClientMetricsCommandTest.java index 2fcf082f0a0..78ef5542372 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ClientMetricsCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ClientMetricsCommandTest.java @@ -35,6 +35,7 @@ import org.mockito.ArgumentCaptor; import java.util.Collection; import java.util.Collections; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutionException; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -232,6 +233,10 @@ public class ClientMetricsCommandTest { ClientMetricsCommand.ClientMetricsService service = new ClientMetricsCommand.ClientMetricsService(adminClient); ConfigResource cr = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, clientMetricsName); + ListConfigResourcesResult listConfigResourcesResult = AdminClientTestUtils.listConfigResourcesResult(Map.of( + ConfigResource.Type.CLIENT_METRICS, Set.of(clientMetricsName) + )); + when(adminClient.listConfigResources(any(), any())).thenReturn(listConfigResourcesResult); Config cfg = new Config(Collections.singleton(new ConfigEntry("metrics", "org.apache.kafka.producer."))); DescribeConfigsResult describeResult = AdminClientTestUtils.describeConfigsResult(cr, cfg); when(adminClient.describeConfigs(any())).thenReturn(describeResult); @@ -249,6 +254,28 @@ public class ClientMetricsCommandTest { assertTrue(capturedOutput.contains("metrics=org.apache.kafka.producer.")); } + @Test + public void testDescribeNonExistentClientMetric() { + Admin adminClient = mock(Admin.class); + ClientMetricsCommand.ClientMetricsService service = new ClientMetricsCommand.ClientMetricsService(adminClient); + + ListConfigResourcesResult listConfigResourcesResult = AdminClientTestUtils.listConfigResourcesResult(Map.of( + ConfigResource.Type.CLIENT_METRICS, Set.of() + )); + when(adminClient.listConfigResources(any(), any())).thenReturn(listConfigResourcesResult); + + String capturedOutput = ToolsTestUtils.captureStandardOut(() -> { + try { + service.describeClientMetrics(new ClientMetricsCommand.ClientMetricsCommandOptions( + new String[]{"--bootstrap-server", bootstrapServer, "--describe", + "--name", clientMetricsName})); + } catch (Throwable t) { + fail(t); + } + }); + assertTrue(capturedOutput.contains("The client metric resource " + clientMetricsName + " doesn't exist and doesn't have dynamic config.")); + } + @Test public void testDescribeAll() { Admin adminClient = mock(Admin.class); diff --git a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java index a5c91a632f7..25da40ebf75 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.clients.admin.AlterConfigsOptions; import org.apache.kafka.clients.admin.AlterConfigsResult; import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.InvalidConfigurationException; import org.apache.kafka.common.errors.UnsupportedVersionException; @@ -37,6 +38,7 @@ import org.apache.kafka.test.TestUtils; import org.mockito.Mockito; +import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -58,6 +60,8 @@ import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_ import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG; import static org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG; import static org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG; import static org.apache.kafka.server.config.ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG; import static org.apache.kafka.server.config.ServerConfigs.MESSAGE_MAX_BYTES_CONFIG; import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG; @@ -128,6 +132,58 @@ public class ConfigCommandIntegrationTest { "--alter", "--add-config", "consumer.session.timeout.ms=50000")); message = captureStandardOut(run(command)); assertEquals("Completed updating config for group group.", message); + + // A non-existent group with dynamic configs can be described + command = Stream.concat(quorumArgs(), Stream.of( + "--entity-type", "groups", + "--describe")); + message = captureStandardOut(run(command)); + assertTrue(message.contains("Dynamic configs for group group are:")); + assertTrue(message.contains("consumer.session.timeout.ms=50000 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.session.timeout.ms=50000}")); + + command = Stream.concat(quorumArgs(), Stream.of( + "--entity-type", "groups", + "--entity-name", "group", + "--describe")); + message = captureStandardOut(run(command)); + assertTrue(message.contains("Dynamic configs for group group are:")); + assertTrue(message.contains("consumer.session.timeout.ms=50000 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.session.timeout.ms=50000}")); + } + + @ClusterTest(serverProperties = { + @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") + }) + public void testDescribeGroupWithoutDynamicConfigs(ClusterInstance cluster) throws InterruptedException, ExecutionException { + cluster.createTopic("topic", 1, (short) 1); + + try (Producer producer = cluster.producer(); + org.apache.kafka.clients.consumer.Consumer consumer = cluster.consumer(Map.of( + "group.protocol", "consumer", + "group.id", "group" + ))) { + producer.send(new org.apache.kafka.clients.producer.ProducerRecord<>("topic", "key".getBytes(), "value".getBytes())).get(); + producer.flush(); + consumer.subscribe(List.of("topic")); + consumer.poll(Duration.ofMillis(100)); + + TestUtils.waitForCondition(() -> { + Stream command = Stream.concat(quorumArgs(), Stream.of( + "--entity-type", "groups", + "--describe")); + String message = captureStandardOut(run(command)); + return message.contains("Dynamic configs for group group are:"); + }, () -> "cannot describe group without dynamic groups"); + + TestUtils.waitForCondition(() -> { + Stream command = Stream.concat(quorumArgs(), Stream.of( + "--entity-type", "groups", + "--entity-name", "group", + "--describe")); + String message = captureStandardOut(run(command)); + return message.contains("Dynamic configs for group group are:"); + }, () -> "cannot describe group without dynamic groups"); + } } @ClusterTest @@ -145,6 +201,21 @@ public class ConfigCommandIntegrationTest { "--alter", "--add-config", "metrics=org.apache")); message = captureStandardOut(run(command)); assertEquals("Completed updating config for client-metric cm.", message); + + command = Stream.concat(quorumArgs(), Stream.of( + "--entity-type", "client-metrics", + "--describe")); + message = captureStandardOut(run(command)); + assertTrue(message.contains("Dynamic configs for client-metric cm are:")); + assertTrue(message.contains("metrics=org.apache sensitive=false synonyms={DYNAMIC_CLIENT_METRICS_CONFIG:metrics=org.apache}")); + + command = Stream.concat(quorumArgs(), Stream.of( + "--entity-type", "client-metrics", + "--entity-name", "cm", + "--describe")); + message = captureStandardOut(run(command)); + assertTrue(message.contains("Dynamic configs for client-metric cm are:")); + assertTrue(message.contains("metrics=org.apache sensitive=false synonyms={DYNAMIC_CLIENT_METRICS_CONFIG:metrics=org.apache}")); } @ClusterTest @@ -471,6 +542,27 @@ public class ConfigCommandIntegrationTest { } } + @ClusterTest + public void testDescribeNonExistentConfigResource() { + Map configResourceTypeAndNames = Map.of( + "brokers", "3", + "broker-loggers", "3", + "topics", "non-existent", + "groups", "non-existent", + "client-metrics", "non-existent"); + configResourceTypeAndNames.forEach((type, name) -> { + Stream command = Stream.concat(quorumArgs(), Stream.of( + "--entity-type", type, + "--entity-name", name, + "--describe")); + String message = captureStandardOut(run(command)); + assertTrue( + message.contains("The " + type.substring(0, type.length() - 1) + " '" + name + "' doesn't exist and doesn't have dynamic config."), + "The config resource type " + type + " got unexpected result: " + message + ); + }); + } + private void assertNonZeroStatusExit(Stream args, Consumer checkErrOut) { AtomicReference exitStatus = new AtomicReference<>(); Exit.setExitProcedure((status, __) -> {