KAFKA-18904: kafka-configs.sh return resource doesn't exist message [3/N] (#19808)

* Return resource doesn't exist message when users try to describe a
non-existent resource in kafka-configs.sh and kafka-client-metrics.sh.
* For groups type, the command checks both existent groups and
non-existent groups but having dynamic config. If it cannot find a group
in both conditions, return resource doesn't exist message.

Reviewers: Lan Ding <53332773+DL1231@users.noreply.github.com>, Andrew
Schofield <aschofield@confluent.io>

---------

Signed-off-by: PoAn Yang <payang@apache.org>
This commit is contained in:
PoAn Yang 2025-05-29 12:15:31 -05:00 committed by GitHub
parent f42abe6db8
commit d1f1e5c8fd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 203 additions and 5 deletions

View File

@ -31,6 +31,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public class AdminClientTestUtils {
@ -163,6 +164,14 @@ public class AdminClientTestUtils {
return new ListConsumerGroupOffsetsResult(Collections.singletonMap(CoordinatorKey.byGroupId(group), future));
}
public static ListConfigResourcesResult listConfigResourcesResult(Map<ConfigResource.Type, Set<String>> resourceNames) {
Collection<ConfigResource> resources = resourceNames.entrySet().stream()
.flatMap(entry -> entry.getValue().stream()
.map(name -> new ConfigResource(entry.getKey(), name)))
.collect(Collectors.toList());
return new ListConfigResourcesResult(KafkaFuture.completedFuture(resources));
}
public static ListConfigResourcesResult listConfigResourcesResult(String... names) {
return new ListConfigResourcesResult(
KafkaFuture.completedFuture(Arrays.stream(names)

View File

@ -66,6 +66,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -1396,7 +1397,33 @@ public class MockAdminClient extends AdminClient {
@Override
public ListConfigResourcesResult listConfigResources(Set<ConfigResource.Type> configResourceTypes, ListConfigResourcesOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
KafkaFutureImpl<Collection<ConfigResource>> future = new KafkaFutureImpl<>();
Set<ConfigResource> configResources = new HashSet<>();
if (configResourceTypes.isEmpty() || configResourceTypes.contains(ConfigResource.Type.TOPIC)) {
allTopics.keySet().forEach(name -> configResources.add(new ConfigResource(ConfigResource.Type.TOPIC, name)));
}
if (configResourceTypes.isEmpty() || configResourceTypes.contains(ConfigResource.Type.BROKER)) {
for (int i = 0; i < brokers.size(); i++) {
configResources.add(new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(i)));
}
}
if (configResourceTypes.isEmpty() || configResourceTypes.contains(ConfigResource.Type.BROKER_LOGGER)) {
for (int i = 0; i < brokers.size(); i++) {
configResources.add(new ConfigResource(ConfigResource.Type.BROKER_LOGGER, String.valueOf(i)));
}
}
if (configResourceTypes.isEmpty() || configResourceTypes.contains(ConfigResource.Type.CLIENT_METRICS)) {
clientMetricsConfigs.keySet().forEach(name -> configResources.add(new ConfigResource(ConfigResource.Type.CLIENT_METRICS, name)));
}
if (configResourceTypes.isEmpty() || configResourceTypes.contains(ConfigResource.Type.GROUP)) {
groupConfigs.keySet().forEach(name -> configResources.add(new ConfigResource(ConfigResource.Type.GROUP, name)));
}
future.complete(configResources);
return new ListConfigResourcesResult(future);
}
@Override

View File

@ -21,7 +21,7 @@ import joptsimple._
import kafka.server.DynamicConfig
import kafka.utils.Implicits._
import kafka.utils.Logging
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism}
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListConfigResourcesOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{InvalidConfigurationException, UnsupportedVersionException}
import org.apache.kafka.common.internals.Topic
@ -342,6 +342,42 @@ object ConfigCommand extends Logging {
}
private def describeResourceConfig(adminClient: Admin, entityType: String, entityName: Option[String], describeAll: Boolean): Unit = {
if (!describeAll) {
entityName.foreach { name =>
entityType match {
case TopicType =>
Topic.validate(name)
if (!adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names.get.contains(name)) {
System.out.println(s"The ${entityType.dropRight(1)} '$name' doesn't exist and doesn't have dynamic config.")
return
}
case BrokerType | BrokerLoggerConfigType =>
if (adminClient.describeCluster.nodes.get.stream.anyMatch(_.idString == name)) {
// valid broker id
} else if (name == BrokerDefaultEntityName) {
// default broker configs
} else {
System.out.println(s"The ${entityType.dropRight(1)} '$name' doesn't exist and doesn't have dynamic config.")
return
}
case ClientMetricsType =>
if (adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions).all.get
.stream.noneMatch(_.name == name)) {
System.out.println(s"The ${entityType.dropRight(1)} '$name' doesn't exist and doesn't have dynamic config.")
return
}
case GroupType =>
if (adminClient.listGroups().all.get.stream.noneMatch(_.groupId() == name) &&
adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.GROUP), new ListConfigResourcesOptions).all.get
.stream.noneMatch(_.name == name)) {
System.out.println(s"The ${entityType.dropRight(1)} '$name' doesn't exist and doesn't have dynamic config.")
return
}
case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType")
}
}
}
val entities = entityName
.map(name => List(name))
.getOrElse(entityType match {
@ -350,9 +386,10 @@ object ConfigCommand extends Logging {
case BrokerType | BrokerLoggerConfigType =>
adminClient.describeCluster(new DescribeClusterOptions()).nodes().get().asScala.map(_.idString).toSeq :+ BrokerDefaultEntityName
case ClientMetricsType =>
adminClient.listClientMetricsResources().all().get().asScala.map(_.name).toSeq
adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions).all().get().asScala.map(_.name).toSeq
case GroupType =>
adminClient.listGroups().all.get.asScala.map(_.groupId).toSeq
adminClient.listGroups().all.get.asScala.map(_.groupId).toSet ++
adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.GROUP), new ListConfigResourcesOptions).all().get().asScala.map(_.name).toSet
case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType")
})

View File

@ -156,6 +156,12 @@ public class ClientMetricsCommand {
List<String> entities;
if (entityNameOpt.isPresent()) {
if (adminClient.listConfigResources(Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions())
.all().get(30, TimeUnit.SECONDS).stream()
.noneMatch(resource -> resource.name().equals(entityNameOpt.get()))) {
System.out.println("The client metric resource " + entityNameOpt.get() + " doesn't exist and doesn't have dynamic config.");
return;
}
entities = Collections.singletonList(entityNameOpt.get());
} else {
Collection<ConfigResource> resources = adminClient

View File

@ -35,6 +35,7 @@ import org.mockito.ArgumentCaptor;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -232,6 +233,10 @@ public class ClientMetricsCommandTest {
ClientMetricsCommand.ClientMetricsService service = new ClientMetricsCommand.ClientMetricsService(adminClient);
ConfigResource cr = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, clientMetricsName);
ListConfigResourcesResult listConfigResourcesResult = AdminClientTestUtils.listConfigResourcesResult(Map.of(
ConfigResource.Type.CLIENT_METRICS, Set.of(clientMetricsName)
));
when(adminClient.listConfigResources(any(), any())).thenReturn(listConfigResourcesResult);
Config cfg = new Config(Collections.singleton(new ConfigEntry("metrics", "org.apache.kafka.producer.")));
DescribeConfigsResult describeResult = AdminClientTestUtils.describeConfigsResult(cr, cfg);
when(adminClient.describeConfigs(any())).thenReturn(describeResult);
@ -249,6 +254,28 @@ public class ClientMetricsCommandTest {
assertTrue(capturedOutput.contains("metrics=org.apache.kafka.producer."));
}
@Test
public void testDescribeNonExistentClientMetric() {
Admin adminClient = mock(Admin.class);
ClientMetricsCommand.ClientMetricsService service = new ClientMetricsCommand.ClientMetricsService(adminClient);
ListConfigResourcesResult listConfigResourcesResult = AdminClientTestUtils.listConfigResourcesResult(Map.of(
ConfigResource.Type.CLIENT_METRICS, Set.of()
));
when(adminClient.listConfigResources(any(), any())).thenReturn(listConfigResourcesResult);
String capturedOutput = ToolsTestUtils.captureStandardOut(() -> {
try {
service.describeClientMetrics(new ClientMetricsCommand.ClientMetricsCommandOptions(
new String[]{"--bootstrap-server", bootstrapServer, "--describe",
"--name", clientMetricsName}));
} catch (Throwable t) {
fail(t);
}
});
assertTrue(capturedOutput.contains("The client metric resource " + clientMetricsName + " doesn't exist and doesn't have dynamic config."));
}
@Test
public void testDescribeAll() {
Admin adminClient = mock(Admin.class);

View File

@ -24,6 +24,7 @@ import org.apache.kafka.clients.admin.AlterConfigsOptions;
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
@ -37,6 +38,7 @@ import org.apache.kafka.test.TestUtils;
import org.mockito.Mockito;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -58,6 +60,8 @@ import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG;
import static org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
import static org.apache.kafka.server.config.ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG;
import static org.apache.kafka.server.config.ServerConfigs.MESSAGE_MAX_BYTES_CONFIG;
import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
@ -128,6 +132,58 @@ public class ConfigCommandIntegrationTest {
"--alter", "--add-config", "consumer.session.timeout.ms=50000"));
message = captureStandardOut(run(command));
assertEquals("Completed updating config for group group.", message);
// A non-existent group with dynamic configs can be described
command = Stream.concat(quorumArgs(), Stream.of(
"--entity-type", "groups",
"--describe"));
message = captureStandardOut(run(command));
assertTrue(message.contains("Dynamic configs for group group are:"));
assertTrue(message.contains("consumer.session.timeout.ms=50000 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.session.timeout.ms=50000}"));
command = Stream.concat(quorumArgs(), Stream.of(
"--entity-type", "groups",
"--entity-name", "group",
"--describe"));
message = captureStandardOut(run(command));
assertTrue(message.contains("Dynamic configs for group group are:"));
assertTrue(message.contains("consumer.session.timeout.ms=50000 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.session.timeout.ms=50000}"));
}
@ClusterTest(serverProperties = {
@ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
@ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
})
public void testDescribeGroupWithoutDynamicConfigs(ClusterInstance cluster) throws InterruptedException, ExecutionException {
cluster.createTopic("topic", 1, (short) 1);
try (Producer<byte[], byte[]> producer = cluster.producer();
org.apache.kafka.clients.consumer.Consumer<byte[], byte[]> consumer = cluster.consumer(Map.of(
"group.protocol", "consumer",
"group.id", "group"
))) {
producer.send(new org.apache.kafka.clients.producer.ProducerRecord<>("topic", "key".getBytes(), "value".getBytes())).get();
producer.flush();
consumer.subscribe(List.of("topic"));
consumer.poll(Duration.ofMillis(100));
TestUtils.waitForCondition(() -> {
Stream<String> command = Stream.concat(quorumArgs(), Stream.of(
"--entity-type", "groups",
"--describe"));
String message = captureStandardOut(run(command));
return message.contains("Dynamic configs for group group are:");
}, () -> "cannot describe group without dynamic groups");
TestUtils.waitForCondition(() -> {
Stream<String> command = Stream.concat(quorumArgs(), Stream.of(
"--entity-type", "groups",
"--entity-name", "group",
"--describe"));
String message = captureStandardOut(run(command));
return message.contains("Dynamic configs for group group are:");
}, () -> "cannot describe group without dynamic groups");
}
}
@ClusterTest
@ -145,6 +201,21 @@ public class ConfigCommandIntegrationTest {
"--alter", "--add-config", "metrics=org.apache"));
message = captureStandardOut(run(command));
assertEquals("Completed updating config for client-metric cm.", message);
command = Stream.concat(quorumArgs(), Stream.of(
"--entity-type", "client-metrics",
"--describe"));
message = captureStandardOut(run(command));
assertTrue(message.contains("Dynamic configs for client-metric cm are:"));
assertTrue(message.contains("metrics=org.apache sensitive=false synonyms={DYNAMIC_CLIENT_METRICS_CONFIG:metrics=org.apache}"));
command = Stream.concat(quorumArgs(), Stream.of(
"--entity-type", "client-metrics",
"--entity-name", "cm",
"--describe"));
message = captureStandardOut(run(command));
assertTrue(message.contains("Dynamic configs for client-metric cm are:"));
assertTrue(message.contains("metrics=org.apache sensitive=false synonyms={DYNAMIC_CLIENT_METRICS_CONFIG:metrics=org.apache}"));
}
@ClusterTest
@ -471,6 +542,27 @@ public class ConfigCommandIntegrationTest {
}
}
@ClusterTest
public void testDescribeNonExistentConfigResource() {
Map<String, String> configResourceTypeAndNames = Map.of(
"brokers", "3",
"broker-loggers", "3",
"topics", "non-existent",
"groups", "non-existent",
"client-metrics", "non-existent");
configResourceTypeAndNames.forEach((type, name) -> {
Stream<String> command = Stream.concat(quorumArgs(), Stream.of(
"--entity-type", type,
"--entity-name", name,
"--describe"));
String message = captureStandardOut(run(command));
assertTrue(
message.contains("The " + type.substring(0, type.length() - 1) + " '" + name + "' doesn't exist and doesn't have dynamic config."),
"The config resource type " + type + " got unexpected result: " + message
);
});
}
private void assertNonZeroStatusExit(Stream<String> args, Consumer<String> checkErrOut) {
AtomicReference<Integer> exitStatus = new AtomicReference<>();
Exit.setExitProcedure((status, __) -> {