diff --git a/bin/kafka-client-metrics.sh b/bin/kafka-client-metrics.sh new file mode 100755 index 00000000000..f4a1adaaf0e --- /dev/null +++ b/bin/kafka-client-metrics.sh @@ -0,0 +1,17 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.ClientMetricsCommand "$@" diff --git a/bin/windows/kafka-client-metrics.bat b/bin/windows/kafka-client-metrics.bat new file mode 100644 index 00000000000..bd9bcdf4264 --- /dev/null +++ b/bin/windows/kafka-client-metrics.bat @@ -0,0 +1,17 @@ +@echo off +rem Licensed to the Apache Software Foundation (ASF) under one or more +rem contributor license agreements. See the NOTICE file distributed with +rem this work for additional information regarding copyright ownership. +rem The ASF licenses this file to You under the Apache License, Version 2.0 +rem (the "License"); you may not use this file except in compliance with +rem the License. You may obtain a copy of the License at +rem +rem http://www.apache.org/licenses/LICENSE-2.0 +rem +rem Unless required by applicable law or agreed to in writing, software +rem distributed under the License is distributed on an "AS IS" BASIS, +rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +rem See the License for the specific language governing permissions and +rem limitations under the License. + +"%~dp0kafka-run-class.bat" org.apache.kafka.tools.ClientMetricsCommand %* diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java index a3f4e5fa5d6..8c9244e23d8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.clients.admin; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.stream.Collectors; @@ -101,6 +103,16 @@ public class AdminClientTestUtils { return new AlterConfigsResult(futures); } + /** Helper to create a DescribeConfigsResult instance for a given ConfigResource. + * DescribeConfigsResult's constructor is only accessible from within the + * admin package. + */ + public static DescribeConfigsResult describeConfigsResult(ConfigResource cr, Config config) { + KafkaFutureImpl future = new KafkaFutureImpl<>(); + future.complete(config); + return new DescribeConfigsResult(Collections.singletonMap(cr, future)); + } + /** * Helper to create a CreatePartitionsResult instance for a given Throwable. * CreatePartitionsResult's constructor is only accessible from within the @@ -141,6 +153,19 @@ public class AdminClientTestUtils { return new ListConsumerGroupOffsetsResult(Collections.singletonMap(CoordinatorKey.byGroupId(group), future)); } + public static ListClientMetricsResourcesResult listClientMetricsResourcesResult(String... names) { + return new ListClientMetricsResourcesResult( + KafkaFuture.completedFuture(Arrays.stream(names) + .map(name -> new ClientMetricsResourceListing(name)) + .collect(Collectors.toList()))); + } + + public static ListClientMetricsResourcesResult listClientMetricsResourcesResult(KafkaException exception) { + final KafkaFutureImpl> future = new KafkaFutureImpl<>(); + future.completeExceptionally(exception); + return new ListClientMetricsResourcesResult(future); + } + /** * Helper to create a KafkaAdminClient with a custom HostResolver accessible to tests outside this package. */ diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java b/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java new file mode 100644 index 00000000000..7710ebf0105 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java @@ -0,0 +1,377 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools; + +import joptsimple.ArgumentAcceptingOptionSpec; +import joptsimple.OptionSpec; +import joptsimple.OptionSpecBuilder; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.AlterConfigsOptions; +import org.apache.kafka.clients.admin.ClientMetricsResourceListing; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class ClientMetricsCommand { + private static final Logger LOG = LoggerFactory.getLogger(ClientMetricsCommand.class); + + public static void main(String... args) { + Exit.exit(mainNoExit(args)); + } + + static int mainNoExit(String... args) { + try { + execute(args); + return 0; + } catch (Throwable e) { + System.err.println(e.getMessage()); + System.err.println(Utils.stackTrace(e)); + return 1; + } + } + + static void execute(String... args) throws Exception { + ClientMetricsCommandOptions opts = new ClientMetricsCommandOptions(args); + + Properties config = opts.commandConfig(); + config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.bootstrapServer()); + + ClientMetricsService service = new ClientMetricsService(config); + int exitCode = 0; + try { + if (opts.hasAlterOption()) { + service.alterClientMetrics(opts); + } else if (opts.hasDescribeOption()) { + service.describeClientMetrics(opts); + } else if (opts.hasDeleteOption()) { + service.deleteClientMetrics(opts); + } else if (opts.hasListOption()) { + service.listClientMetrics(opts); + } + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause != null) { + printException(cause); + } else { + printException(e); + } + exitCode = 1; + } catch (Throwable t) { + printException(t); + exitCode = 1; + } finally { + service.close(); + Exit.exit(exitCode); + } + } + + public static class ClientMetricsService implements AutoCloseable { + private Admin adminClient; + + public ClientMetricsService(Properties config) { + this.adminClient = Admin.create(config); + } + + // Visible for testing + ClientMetricsService(Admin adminClient) { + this.adminClient = adminClient; + } + + public void alterClientMetrics(ClientMetricsCommandOptions opts) throws Exception { + String entityName = opts.hasGenerateNameOption() ? Uuid.randomUuid().toString() : opts.name().get(); + + Map configsToBeSet = new HashMap<>(); + opts.interval().map(intervalVal -> configsToBeSet.put("interval.ms", intervalVal.toString())); + opts.metrics().map(metricslist -> configsToBeSet.put("metrics", metricslist.stream().collect(Collectors.joining(",")))); + opts.match().map(matchlist -> configsToBeSet.put("match", matchlist.stream().collect(Collectors.joining(",")))); + + ConfigResource configResource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, entityName); + AlterConfigsOptions alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false); + Collection alterEntries = configsToBeSet.entrySet().stream() + .map(entry -> new AlterConfigOp(new ConfigEntry(entry.getKey(), entry.getValue()), + entry.getValue().isEmpty() ? AlterConfigOp.OpType.DELETE : AlterConfigOp.OpType.SET)) + .collect(Collectors.toList()); + adminClient.incrementalAlterConfigs(Collections.singletonMap(configResource, alterEntries), alterOptions).all() + .get(30, TimeUnit.SECONDS); + + System.out.println("Altered client metrics config for " + entityName + "."); + } + + public void deleteClientMetrics(ClientMetricsCommandOptions opts) throws Exception { + String entityName = opts.name().get(); + Collection oldConfigs = getClientMetricsConfig(entityName); + + ConfigResource configResource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, entityName); + AlterConfigsOptions alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false); + Collection alterEntries = oldConfigs.stream() + .map(entry -> new AlterConfigOp(entry, AlterConfigOp.OpType.DELETE)).collect(Collectors.toList()); + adminClient.incrementalAlterConfigs(Collections.singletonMap(configResource, alterEntries), alterOptions) + .all().get(30, TimeUnit.SECONDS); + + System.out.println("Deleted client metrics config for " + entityName + "."); + } + + public void describeClientMetrics(ClientMetricsCommandOptions opts) throws Exception { + Optional entityNameOpt = opts.name(); + + List entities; + if (entityNameOpt.isPresent()) { + entities = Collections.singletonList(entityNameOpt.get()); + } else { + Collection resources = adminClient.listClientMetricsResources() + .all().get(30, TimeUnit.SECONDS); + entities = resources.stream().map(ClientMetricsResourceListing::name).collect(Collectors.toList()); + } + + for (String entity : entities) { + System.out.println("Client metrics configs for " + entity + " are:"); + getClientMetricsConfig(entity) + .forEach(entry -> System.out.println(" " + entry.name() + "=" + entry.value())); + } + } + + public void listClientMetrics(ClientMetricsCommandOptions opts) throws Exception { + Collection resources = adminClient.listClientMetricsResources() + .all().get(30, TimeUnit.SECONDS); + String results = resources.stream().map(ClientMetricsResourceListing::name).collect(Collectors.joining("\n")); + System.out.println(results); + } + + private Collection getClientMetricsConfig(String entityName) throws Exception { + ConfigResource configResource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, entityName); + Map result = adminClient.describeConfigs(Collections.singleton(configResource)) + .all().get(30, TimeUnit.SECONDS); + return result.get(configResource).entries(); + } + + @Override + public void close() throws Exception { + adminClient.close(); + } + } + + private static void printException(Throwable e) { + System.out.println("Error while executing client metrics command : " + e.getMessage()); + LOG.error(Utils.stackTrace(e)); + } + + public static final class ClientMetricsCommandOptions extends CommandDefaultOptions { + private final ArgumentAcceptingOptionSpec bootstrapServerOpt; + + private final ArgumentAcceptingOptionSpec commandConfigOpt; + + private final OptionSpecBuilder alterOpt; + + private final OptionSpecBuilder deleteOpt; + + private final OptionSpecBuilder describeOpt; + + private final OptionSpecBuilder listOpt; + + private final ArgumentAcceptingOptionSpec nameOpt; + + private final OptionSpecBuilder generateNameOpt; + + private final ArgumentAcceptingOptionSpec intervalOpt; + + private final String nl; + + private final ArgumentAcceptingOptionSpec matchOpt; + + private final ArgumentAcceptingOptionSpec metricsOpt; + + public ClientMetricsCommandOptions(String[] args) { + super(args); + bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: The Kafka server to connect to.") + .withRequiredArg() + .describedAs("server to connect to") + .ofType(String.class); + commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.") + .withRequiredArg() + .describedAs("command config property file") + .ofType(String.class); + + alterOpt = parser.accepts("alter", "Alter the configuration for the client metrics resource."); + deleteOpt = parser.accepts("delete", "Delete the configuration for the client metrics resource."); + describeOpt = parser.accepts("describe", "List configurations for the client metrics resource."); + listOpt = parser.accepts("list", "List the client metrics resources."); + + nameOpt = parser.accepts("name", "Name of client metrics configuration resource.") + .withRequiredArg() + .describedAs("name") + .ofType(String.class); + generateNameOpt = parser.accepts("generate-name", "Generate a UUID to use as the name."); + intervalOpt = parser.accepts("interval", "The metrics push interval in milliseconds.") + .withRequiredArg() + .describedAs("push interval") + .ofType(java.lang.Integer.class); + + nl = System.getProperty("line.separator"); + + String[] matchSelectors = new String[] { + "client_id", "client_instance_id", "client_software_name", + "client_software_version", "client_source_address", "client_source_port" + }; + String matchSelectorNames = Arrays.stream(matchSelectors).map(config -> "\t" + config).collect(Collectors.joining(nl)); + matchOpt = parser.accepts("match", "Matching selector 'k1=v1,k2=v2'. The following is a list of valid selector names: " + + nl + matchSelectorNames) + .withRequiredArg() + .describedAs("k1=v1,k2=v2") + .ofType(String.class) + .withValuesSeparatedBy(','); + metricsOpt = parser.accepts("metrics", "Telemetry metric name prefixes 'm1,m2'.") + .withRequiredArg() + .describedAs("m1,m2") + .ofType(String.class) + .withValuesSeparatedBy(','); + + options = parser.parse(args); + + checkArgs(); + } + + public Boolean has(OptionSpec builder) { + return options.has(builder); + } + + public Optional valueAsOption(OptionSpec option) { + return valueAsOption(option, Optional.empty()); + } + + public Optional> valuesAsOption(OptionSpec option) { + return valuesAsOption(option, Optional.empty()); + } + + public Optional valueAsOption(OptionSpec option, Optional defaultValue) { + if (has(option)) { + return Optional.of(options.valueOf(option)); + } else { + return defaultValue; + } + } + + public Optional> valuesAsOption(OptionSpec option, Optional> defaultValue) { + return options.has(option) ? Optional.of(options.valuesOf(option)) : defaultValue; + } + + public String bootstrapServer() { + return options.valueOf(bootstrapServerOpt); + } + + public Properties commandConfig() throws IOException { + if (has(commandConfigOpt)) { + return Utils.loadProps(options.valueOf(commandConfigOpt)); + } else { + return new Properties(); + } + } + + public boolean hasAlterOption() { + return has(alterOpt); + } + + public boolean hasDeleteOption() { + return has(deleteOpt); + } + + public boolean hasDescribeOption() { + return has(describeOpt); + } + + public boolean hasListOption() { + return has(listOpt); + } + + public Optional name() { + return valueAsOption(nameOpt); + } + + public boolean hasGenerateNameOption() { + return has(generateNameOpt); + } + + public Optional> metrics() { + return valuesAsOption(metricsOpt); + } + + public Optional interval() { + return valueAsOption(intervalOpt); + } + + public Optional> match() { + return valuesAsOption(matchOpt); + } + + public void checkArgs() { + if (args.length == 0) + CommandLineUtils.printUsageAndExit(parser, "This tool helps to manipulate and describe client metrics configurations."); + + CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to manipulate and describe client metrics configurations."); + + // should have exactly one action + long actions = + Arrays.asList(alterOpt, deleteOpt, describeOpt, listOpt) + .stream().filter(options::has) + .count(); + if (actions != 1) + CommandLineUtils.printUsageAndExit(parser, "Command must include exactly one action: --alter, --delete, --describe or --list."); + + // check required args + if (!has(bootstrapServerOpt)) + throw new IllegalArgumentException("--bootstrap-server must be specified."); + + // check invalid args + CommandLineUtils.checkInvalidArgs(parser, options, deleteOpt, generateNameOpt, intervalOpt, matchOpt, metricsOpt); + CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, generateNameOpt, intervalOpt, matchOpt, metricsOpt); + CommandLineUtils.checkInvalidArgs(parser, options, listOpt, nameOpt, generateNameOpt, intervalOpt, matchOpt, metricsOpt); + + boolean isNamePresent = has(nameOpt); + + if (has(alterOpt)) { + if ((isNamePresent && has(generateNameOpt)) || (!isNamePresent && !has(generateNameOpt))) + throw new IllegalArgumentException("One of --name or --generate-name must be specified with --alter."); + } + + if (has(deleteOpt) && !isNamePresent) + throw new IllegalArgumentException("A client metrics resource name must be specified with --delete."); + } + } +} \ No newline at end of file diff --git a/tools/src/test/java/org/apache/kafka/tools/ClientMetricsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/ClientMetricsCommandTest.java new file mode 100644 index 00000000000..ce71674819b --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/tools/ClientMetricsCommandTest.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import kafka.utils.Exit; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientTestUtils; +import org.apache.kafka.clients.admin.AlterConfigsResult; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.DescribeConfigsResult; +import org.apache.kafka.clients.admin.ListClientMetricsResourcesResult; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.protocol.Errors; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ClientMetricsCommandTest { + private String bootstrapServer = "localhost:9092"; + private String clientMetricsName = "cm"; + + @Test + public void testOptionsNoActionFails() { + assertInitializeInvalidOptionsExitCode(1, + new String[] {"--bootstrap-server", bootstrapServer}); + } + + @Test + public void testOptionsListSucceeds() { + ClientMetricsCommand.ClientMetricsCommandOptions opts = new ClientMetricsCommand.ClientMetricsCommandOptions( + new String[] {"--bootstrap-server", bootstrapServer, "--list"}); + assertTrue(opts.hasListOption()); + } + + @Test + public void testOptionsDescribeNoNameSucceeds() { + ClientMetricsCommand.ClientMetricsCommandOptions opts = new ClientMetricsCommand.ClientMetricsCommandOptions( + new String[] {"--bootstrap-server", bootstrapServer, "--describe"}); + assertTrue(opts.hasDescribeOption()); + } + + @Test + public void testOptionsDescribeWithNameSucceeds() { + ClientMetricsCommand.ClientMetricsCommandOptions opts = new ClientMetricsCommand.ClientMetricsCommandOptions( + new String[] {"--bootstrap-server", bootstrapServer, "--describe", "--name", clientMetricsName}); + assertTrue(opts.hasDescribeOption()); + } + + @Test + public void testOptionsDeleteNoNameFails() { + assertInitializeInvalidOptionsExitCode(1, + new String[] {"--bootstrap-server", bootstrapServer, "--delete"}); + } + + @Test + public void testOptionsDeleteWithNameSucceeds() { + ClientMetricsCommand.ClientMetricsCommandOptions opts = new ClientMetricsCommand.ClientMetricsCommandOptions( + new String[] {"--bootstrap-server", bootstrapServer, "--delete", "--name", clientMetricsName}); + assertTrue(opts.hasDeleteOption()); + } + + @Test + public void testOptionsAlterNoNameFails() { + assertInitializeInvalidOptionsExitCode(1, + new String[] {"--bootstrap-server", bootstrapServer, "--alter"}); + } + + @Test + public void testOptionsAlterGenerateNameSucceeds() { + ClientMetricsCommand.ClientMetricsCommandOptions opts = new ClientMetricsCommand.ClientMetricsCommandOptions( + new String[] {"--bootstrap-server", bootstrapServer, "--alter", "--generate-name"}); + assertTrue(opts.hasAlterOption()); + } + + @Test + public void testOptionsAlterWithNameSucceeds() { + ClientMetricsCommand.ClientMetricsCommandOptions opts = new ClientMetricsCommand.ClientMetricsCommandOptions( + new String[] {"--bootstrap-server", bootstrapServer, "--alter", "--name", clientMetricsName}); + assertTrue(opts.hasAlterOption()); + } + + @Test + public void testOptionsAlterAllOptionsSucceeds() { + ClientMetricsCommand.ClientMetricsCommandOptions opts = new ClientMetricsCommand.ClientMetricsCommandOptions( + new String[] {"--bootstrap-server", bootstrapServer, "--alter", "--name", clientMetricsName, + "--interval", "1000", "--match", "client_id=abc", "--metrics", "org.apache.kafka."}); + assertTrue(opts.hasAlterOption()); + + } + + @Test + public void testAlter() { + Admin adminClient = mock(Admin.class); + ClientMetricsCommand.ClientMetricsService service = new ClientMetricsCommand.ClientMetricsService(adminClient); + + AlterConfigsResult result = AdminClientTestUtils.alterConfigsResult(new ConfigResource(ConfigResource.Type.CLIENT_METRICS, clientMetricsName)); + when(adminClient.incrementalAlterConfigs(any(), any())).thenReturn(result); + + String capturedOutput = ToolsTestUtils.captureStandardOut(() -> { + try { + service.alterClientMetrics(new ClientMetricsCommand.ClientMetricsCommandOptions( + new String[]{"--bootstrap-server", bootstrapServer, "--alter", + "--name", clientMetricsName, "--metrics", "org.apache.kafka.producer.", + "--interval", "5000", "--match", "client_id=CLIENT1"})); + } catch (Throwable t) { + fail(t); + } + }); + assertTrue(capturedOutput.contains("Altered client metrics config for " + clientMetricsName + ".")); + } + + @Test + public void testAlterGenerateName() { + Admin adminClient = mock(Admin.class); + ClientMetricsCommand.ClientMetricsService service = new ClientMetricsCommand.ClientMetricsService(adminClient); + + AlterConfigsResult result = AdminClientTestUtils.alterConfigsResult(new ConfigResource(ConfigResource.Type.CLIENT_METRICS, "whatever")); + when(adminClient.incrementalAlterConfigs(any(), any())).thenReturn(result); + + String capturedOutput = ToolsTestUtils.captureStandardOut(() -> { + try { + service.alterClientMetrics(new ClientMetricsCommand.ClientMetricsCommandOptions( + new String[]{"--bootstrap-server", bootstrapServer, "--alter", + "--generate-name", "--metrics", "org.apache.kafka.producer.", + "--interval", "5000", "--match", "client_id=CLIENT1"})); + } catch (Throwable t) { + fail(t); + } + }); + assertTrue(capturedOutput.contains("Altered client metrics config")); + } + + @Test + public void testDelete() { + Admin adminClient = mock(Admin.class); + ClientMetricsCommand.ClientMetricsService service = new ClientMetricsCommand.ClientMetricsService(adminClient); + + ConfigResource cr = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, clientMetricsName); + Config cfg = new Config(Collections.singleton(new ConfigEntry("metrics", "org.apache.kafka.producer."))); + DescribeConfigsResult describeResult = AdminClientTestUtils.describeConfigsResult(cr, cfg); + when(adminClient.describeConfigs(any())).thenReturn(describeResult); + AlterConfigsResult alterResult = AdminClientTestUtils.alterConfigsResult(cr); + when(adminClient.incrementalAlterConfigs(any(), any())).thenReturn(alterResult); + + String capturedOutput = ToolsTestUtils.captureStandardOut(() -> { + try { + service.deleteClientMetrics(new ClientMetricsCommand.ClientMetricsCommandOptions( + new String[]{"--bootstrap-server", bootstrapServer, "--delete", + "--name", clientMetricsName})); + } catch (Throwable t) { + fail(t); + } + }); + assertTrue(capturedOutput.contains("Deleted client metrics config for " + clientMetricsName + ".")); + } + + @Test + public void testDescribe() { + Admin adminClient = mock(Admin.class); + ClientMetricsCommand.ClientMetricsService service = new ClientMetricsCommand.ClientMetricsService(adminClient); + + ConfigResource cr = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, clientMetricsName); + Config cfg = new Config(Collections.singleton(new ConfigEntry("metrics", "org.apache.kafka.producer."))); + DescribeConfigsResult describeResult = AdminClientTestUtils.describeConfigsResult(cr, cfg); + when(adminClient.describeConfigs(any())).thenReturn(describeResult); + + String capturedOutput = ToolsTestUtils.captureStandardOut(() -> { + try { + service.describeClientMetrics(new ClientMetricsCommand.ClientMetricsCommandOptions( + new String[]{"--bootstrap-server", bootstrapServer, "--describe", + "--name", clientMetricsName})); + } catch (Throwable t) { + fail(t); + } + }); + assertTrue(capturedOutput.contains("Client metrics configs for " + clientMetricsName + " are:")); + assertTrue(capturedOutput.contains("metrics=org.apache.kafka.producer.")); + } + + @Test + public void testDescribeAll() { + Admin adminClient = mock(Admin.class); + ClientMetricsCommand.ClientMetricsService service = new ClientMetricsCommand.ClientMetricsService(adminClient); + + ListClientMetricsResourcesResult result = AdminClientTestUtils.listClientMetricsResourcesResult(clientMetricsName); + when(adminClient.listClientMetricsResources()).thenReturn(result); + ConfigResource cr = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, clientMetricsName); + Config cfg = new Config(Collections.singleton(new ConfigEntry("metrics", "org.apache.kafka.producer."))); + DescribeConfigsResult describeResult = AdminClientTestUtils.describeConfigsResult(cr, cfg); + when(adminClient.describeConfigs(any())).thenReturn(describeResult); + + String capturedOutput = ToolsTestUtils.captureStandardOut(() -> { + try { + service.describeClientMetrics(new ClientMetricsCommand.ClientMetricsCommandOptions( + new String[]{"--bootstrap-server", bootstrapServer, "--describe"})); + } catch (Throwable t) { + fail(t); + } + }); + assertTrue(capturedOutput.contains("Client metrics configs for " + clientMetricsName + " are:")); + assertTrue(capturedOutput.contains("metrics=org.apache.kafka.producer.")); + } + + @Test + public void testList() { + Admin adminClient = mock(Admin.class); + ClientMetricsCommand.ClientMetricsService service = new ClientMetricsCommand.ClientMetricsService(adminClient); + + ListClientMetricsResourcesResult result = AdminClientTestUtils.listClientMetricsResourcesResult("one", "two"); + when(adminClient.listClientMetricsResources()).thenReturn(result); + + String capturedOutput = ToolsTestUtils.captureStandardOut(() -> { + try { + service.listClientMetrics(new ClientMetricsCommand.ClientMetricsCommandOptions( + new String[]{"--bootstrap-server", bootstrapServer, "--list"})); + } catch (Throwable t) { + fail(t); + } + }); + assertEquals("one,two", Arrays.stream(capturedOutput.split("\n")).collect(Collectors.joining(","))); + } + + @Test + public void testListFailsWithUnsupportedVersionException() { + Admin adminClient = mock(Admin.class); + ClientMetricsCommand.ClientMetricsService service = new ClientMetricsCommand.ClientMetricsService(adminClient); + + ListClientMetricsResourcesResult result = AdminClientTestUtils.listClientMetricsResourcesResult(Errors.UNSUPPORTED_VERSION.exception()); + when(adminClient.listClientMetricsResources()).thenReturn(result); + + assertThrows(ExecutionException.class, + () -> service.listClientMetrics(new ClientMetricsCommand.ClientMetricsCommandOptions( + new String[] {"--bootstrap-server", bootstrapServer, "--list"}))); + } + + private void assertInitializeInvalidOptionsExitCode(int expected, String[] options) { + Exit.setExitProcedure((exitCode, message) -> { + assertEquals(expected, exitCode); + throw new RuntimeException(); + }); + try { + assertThrows(RuntimeException.class, () -> new ClientMetricsCommand.ClientMetricsCommandOptions(options)); + } finally { + Exit.resetExitProcedure(); + } + } +}