KAFKA-18984: Reset interval.ms By Using kafka-client-metrics.sh (#19213)

kafka-client-metrics.sh cannot reset the interval using `--interval=`.

Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Parker Chang 2025-03-24 18:53:49 +08:00 committed by Luke Chen
parent 33fa572a1c
commit 4844bc3067
2 changed files with 67 additions and 8 deletions

View File

@ -119,7 +119,7 @@ public class ClientMetricsCommand {
String entityName = opts.hasGenerateNameOption() ? Uuid.randomUuid().toString() : opts.name().get(); String entityName = opts.hasGenerateNameOption() ? Uuid.randomUuid().toString() : opts.name().get();
Map<String, String> configsToBeSet = new HashMap<>(); Map<String, String> configsToBeSet = new HashMap<>();
opts.interval().map(intervalVal -> configsToBeSet.put("interval.ms", intervalVal.toString())); opts.interval().map(intervalVal -> configsToBeSet.put("interval.ms", intervalVal));
opts.metrics().map(metricslist -> configsToBeSet.put("metrics", String.join(",", metricslist))); opts.metrics().map(metricslist -> configsToBeSet.put("metrics", String.join(",", metricslist)));
opts.match().map(matchlist -> configsToBeSet.put("match", String.join(",", matchlist))); opts.match().map(matchlist -> configsToBeSet.put("match", String.join(",", matchlist)));
@ -210,7 +210,7 @@ public class ClientMetricsCommand {
private final OptionSpecBuilder generateNameOpt; private final OptionSpecBuilder generateNameOpt;
private final ArgumentAcceptingOptionSpec<Integer> intervalOpt; private final ArgumentAcceptingOptionSpec<String> intervalOpt;
private final ArgumentAcceptingOptionSpec<String> matchOpt; private final ArgumentAcceptingOptionSpec<String> matchOpt;
@ -237,12 +237,13 @@ public class ClientMetricsCommand {
.describedAs("name") .describedAs("name")
.ofType(String.class); .ofType(String.class);
generateNameOpt = parser.accepts("generate-name", "Generate a UUID to use as the name."); generateNameOpt = parser.accepts("generate-name", "Generate a UUID to use as the name.");
intervalOpt = parser.accepts("interval", "The metrics push interval in milliseconds.") String nl = System.lineSeparator();
intervalOpt = parser.accepts("interval", "The metrics push interval in milliseconds." + nl + "Leave empty to reset the interval.")
.withRequiredArg() .withRequiredArg()
.describedAs("push interval") .describedAs("push interval")
.ofType(java.lang.Integer.class); .ofType(String.class);
String nl = System.lineSeparator();
String[] matchSelectors = new String[] { String[] matchSelectors = new String[] {
"client_id", "client_instance_id", "client_software_name", "client_id", "client_instance_id", "client_software_name",
@ -329,7 +330,7 @@ public class ClientMetricsCommand {
return valuesAsOption(metricsOpt); return valuesAsOption(metricsOpt);
} }
public Optional<Integer> interval() { public Optional<String> interval() {
return valueAsOption(intervalOpt); return valueAsOption(intervalOpt);
} }
@ -362,6 +363,18 @@ public class ClientMetricsCommand {
if (has(alterOpt)) { if (has(alterOpt)) {
if ((isNamePresent && has(generateNameOpt)) || (!isNamePresent && !has(generateNameOpt))) if ((isNamePresent && has(generateNameOpt)) || (!isNamePresent && !has(generateNameOpt)))
throw new IllegalArgumentException("One of --name or --generate-name must be specified with --alter."); throw new IllegalArgumentException("One of --name or --generate-name must be specified with --alter.");
interval().ifPresent(intervalStr -> {
if (!intervalStr.isEmpty()) {
try {
Integer.parseInt(intervalStr);
} catch (NumberFormatException e) {
throw new IllegalArgumentException(
"Invalid interval value. Enter an integer, or leave empty to reset.");
}
}
});
} }
if (has(deleteOpt) && !isNamePresent) if (has(deleteOpt) && !isNamePresent)

View File

@ -19,6 +19,7 @@ package org.apache.kafka.tools;
import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientTestUtils; import org.apache.kafka.clients.admin.AdminClientTestUtils;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsResult; import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.ConfigEntry;
@ -29,11 +30,15 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Exit;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.api.Assertions.fail;
@ -114,6 +119,14 @@ public class ClientMetricsCommandTest {
} }
@Test
public void testOptionsAlterInvalidInterval() {
Exception exception = assertThrows(IllegalArgumentException.class, () -> new ClientMetricsCommand.ClientMetricsCommandOptions(
new String[]{"--bootstrap-server", bootstrapServer, "--alter", "--name", clientMetricsName,
"--interval", "abc"}));
assertEquals("Invalid interval value. Enter an integer, or leave empty to reset.", exception.getMessage());
}
@Test @Test
public void testAlter() { public void testAlter() {
Admin adminClient = mock(Admin.class); Admin adminClient = mock(Admin.class);
@ -156,6 +169,39 @@ public class ClientMetricsCommandTest {
assertTrue(capturedOutput.contains("Altered client metrics config")); assertTrue(capturedOutput.contains("Altered client metrics config"));
} }
@Test
public void testAlterResetConfigs() {
Admin adminClient = mock(Admin.class);
ClientMetricsCommand.ClientMetricsService service = new ClientMetricsCommand.ClientMetricsService(adminClient);
AlterConfigsResult result = AdminClientTestUtils.alterConfigsResult(new ConfigResource(ConfigResource.Type.CLIENT_METRICS, clientMetricsName));
@SuppressWarnings("unchecked")
final ArgumentCaptor<Map<ConfigResource, Collection<AlterConfigOp>>> configCaptor = ArgumentCaptor.forClass(Map.class);
when(adminClient.incrementalAlterConfigs(configCaptor.capture(), any())).thenReturn(result);
String capturedOutput = ToolsTestUtils.captureStandardOut(() -> {
try {
service.alterClientMetrics(new ClientMetricsCommand.ClientMetricsCommandOptions(
new String[]{"--bootstrap-server", bootstrapServer, "--alter",
"--name", clientMetricsName, "--metrics", "",
"--interval", "", "--match", ""}));
} catch (Throwable t) {
fail(t);
}
});
Map<ConfigResource, Collection<AlterConfigOp>> alteredConfigOps = configCaptor.getValue();
assertNotNull(alteredConfigOps, "alteredConfigOps should not be null");
assertEquals(1, alteredConfigOps.size(), "Should have exactly one ConfigResource");
assertEquals(3, alteredConfigOps.values().iterator().next().size(), "Should have exactly 3 operations");
for (Collection<AlterConfigOp> operations : alteredConfigOps.values()) {
for (AlterConfigOp op : operations) {
assertEquals(AlterConfigOp.OpType.DELETE, op.opType(),
"Expected DELETE operation for config: " + op.configEntry().name());
}
}
assertTrue(capturedOutput.contains("Altered client metrics config for " + clientMetricsName + "."));
}
@Test @Test
public void testDelete() { public void testDelete() {
Admin adminClient = mock(Admin.class); Admin adminClient = mock(Admin.class);