KAFKA-19498 Add include argument to ConsumerPerformance tool (#20221)

This patch adds the include argument to ConsumerPerformance tool.

ConsoleConsumer and ConsumerPerformance serve different purposes but
share common functionality for message consumption. Currently, there's
an inconsistency in their command-line interfaces:

- ConsoleConsumer supports an --include argument that allows users to
specify a regular expression pattern to filter topics for consumption
- ConsumerPerformance lacks this topic filtering capability, requiring
users to specify a single topic explicitly via --topic argument

This inconsistency creates two problems:

- Similar tools should provide similar topic selection capabilities for
better user experience
- Users cannot test consumer performance across multiple topics or
dynamically matching topic sets, making it difficult to test realistic
scenarios

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Federico Valeri 2025-08-24 22:15:37 +02:00 committed by GitHub
parent ecd5b4c157
commit f97b95c60a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 206 additions and 15 deletions

View File

@ -40,6 +40,10 @@
<li>
The <code>PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG</code> in <code>ProducerConfig</code> was deprecated and will be removed in Kafka 5.0. Please use the <code>PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE_CONFIG</code> instead.
</li>
<li>
The <code>ConsumerPerformance</code> command line tool has a new <code>include</code> option that is alternative to the <code>topic</code> option.
This new option allows to pass a regular expression specifying a list of topics to include for consumption, which is useful to test consumer performance across multiple topics or dynamically matching topic sets.
</li>
</ul>
<h4><a id="upgrade_4_1_0" href="#upgrade_4_1_0">Upgrading to 4.1.0</a></h4>

View File

@ -23,10 +23,12 @@ import org.apache.kafka.common.utils.Exit;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
@ -135,6 +137,29 @@ public class CommandLineUtils {
Exit.exit(1, message);
}
/**
* Check that exactly one of a set of mutually exclusive arguments is present.
*/
public static void checkOneOfArgs(OptionParser parser, OptionSet options, OptionSpec<?>... optionSpecs) {
if (optionSpecs == null || optionSpecs.length == 0) {
throw new IllegalArgumentException("At least one option must be provided");
}
int presentCount = 0;
for (OptionSpec<?> spec : optionSpecs) {
if (options.has(spec)) {
presentCount++;
}
}
if (presentCount != 1) {
printUsageAndExit(parser, "Exactly one of the following arguments is required: " +
Arrays.stream(optionSpecs)
.map(Object::toString)
.collect(Collectors.joining(", ")));
}
}
public static void printUsageAndExit(OptionParser parser, String message) {
System.err.println(message);
try {

View File

@ -16,6 +16,8 @@
*/
package org.apache.kafka.server.util;
import org.apache.kafka.common.utils.Exit;
import org.junit.jupiter.api.Test;
import java.util.List;
@ -26,9 +28,12 @@ import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class CommandLineUtilsTest {
@Test
@ -266,4 +271,105 @@ public class CommandLineUtilsTest {
() -> CommandLineUtils.initializeBootstrapProperties(createTestProps(),
Optional.of("127.0.0.2:9094"), Optional.of("127.0.0.3:9095"))).getMessage());
}
private OptionSpec<String> createMockOptionSpec(String name) {
OptionSpec<String> spec = mock(OptionSpec.class);
when(spec.toString()).thenReturn("[" + name.replaceAll("--", "") + "]");
return spec;
}
@Test
void testCheckOneOfArgsNoOptions() {
OptionParser parser = mock(OptionParser.class);
OptionSet options = mock(OptionSet.class);
IllegalArgumentException e = assertThrows(IllegalArgumentException.class, () ->
CommandLineUtils.checkOneOfArgs(parser, options)
);
assertEquals("At least one option must be provided", e.getMessage());
}
@Test
void testCheckOneOfArgsOnePresent() {
OptionParser parser = mock(OptionParser.class);
OptionSet options = mock(OptionSet.class);
OptionSpec<String> opt1 = createMockOptionSpec("--first-option");
OptionSpec<String> opt2 = createMockOptionSpec("--second-option");
OptionSpec<String> opt3 = createMockOptionSpec("--third-option");
when(options.has(opt1)).thenReturn(true);
when(options.has(opt2)).thenReturn(false);
when(options.has(opt3)).thenReturn(false);
assertDoesNotThrow(() ->
CommandLineUtils.checkOneOfArgs(parser, options, opt1, opt2, opt3)
);
when(options.has(opt1)).thenReturn(false);
when(options.has(opt2)).thenReturn(true);
assertDoesNotThrow(() ->
CommandLineUtils.checkOneOfArgs(parser, options, opt1, opt2, opt3)
);
when(options.has(opt2)).thenReturn(false);
when(options.has(opt3)).thenReturn(true);
assertDoesNotThrow(() ->
CommandLineUtils.checkOneOfArgs(parser, options, opt1, opt2, opt3)
);
}
@Test
void testCheckOneOfArgsNonePresent() {
Exit.setExitProcedure((code, message) -> {
throw new IllegalArgumentException(message);
});
OptionParser parser = mock(OptionParser.class);
OptionSet options = mock(OptionSet.class);
OptionSpec<String> opt1 = createMockOptionSpec("--first-option");
OptionSpec<String> opt2 = createMockOptionSpec("--second-option");
OptionSpec<String> opt3 = createMockOptionSpec("--third-option");
when(options.has(opt1)).thenReturn(false);
when(options.has(opt2)).thenReturn(false);
when(options.has(opt3)).thenReturn(false);
try {
IllegalArgumentException e = assertThrows(IllegalArgumentException.class,
() -> CommandLineUtils.checkOneOfArgs(parser, options, opt1, opt2, opt3));
assertEquals("Exactly one of the following arguments is required: " +
"[first-option], [second-option], [third-option]", e.getMessage());
} finally {
Exit.resetExitProcedure();
}
}
@Test
void testCheckOneOfArgsMultiplePresent() {
Exit.setExitProcedure((code, message) -> {
throw new IllegalArgumentException(message);
});
OptionParser parser = mock(OptionParser.class);
OptionSet options = mock(OptionSet.class);
OptionSpec<String> opt1 = createMockOptionSpec("--first-option");
OptionSpec<String> opt2 = createMockOptionSpec("--second-option");
OptionSpec<String> opt3 = createMockOptionSpec("--third-option");
when(options.has(opt1)).thenReturn(true);
when(options.has(opt2)).thenReturn(true);
when(options.has(opt3)).thenReturn(false);
try {
IllegalArgumentException e = assertThrows(IllegalArgumentException.class,
() -> CommandLineUtils.checkOneOfArgs(parser, options, opt1, opt2, opt3));
assertEquals("Exactly one of the following arguments is required: " +
"[first-option], [second-option], [third-option]", e.getMessage());
} finally {
Exit.resetExitProcedure();
}
}
}

View File

@ -37,11 +37,13 @@ import java.io.IOException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import joptsimple.OptionException;
import joptsimple.OptionSpec;
@ -134,8 +136,13 @@ public class ConsumerPerformance {
long reportingIntervalMs = options.reportingIntervalMs();
boolean showDetailedStats = options.showDetailedStats();
SimpleDateFormat dateFormat = options.dateFormat();
consumer.subscribe(options.topic(),
new ConsumerPerfRebListener(joinTimeMs, joinStartMs, joinTimeMsInSingleRound));
ConsumerPerfRebListener listener = new ConsumerPerfRebListener(joinTimeMs, joinStartMs, joinTimeMsInSingleRound);
if (options.topic().isPresent()) {
consumer.subscribe(options.topic().get(), listener);
} else {
consumer.subscribe(options.include().get(), listener);
}
// now start the benchmark
long currentTimeMs = System.currentTimeMillis();
@ -246,6 +253,7 @@ public class ConsumerPerformance {
protected static class ConsumerPerfOptions extends CommandDefaultOptions {
private final OptionSpec<String> bootstrapServerOpt;
private final OptionSpec<String> topicOpt;
private final OptionSpec<String> includeOpt;
private final OptionSpec<String> groupIdOpt;
private final OptionSpec<Integer> fetchSizeOpt;
private final OptionSpec<Void> resetBeginningOffsetOpt;
@ -265,10 +273,14 @@ public class ConsumerPerformance {
.withRequiredArg()
.describedAs("server to connect to")
.ofType(String.class);
topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
topicOpt = parser.accepts("topic", "The topic to consume from.")
.withRequiredArg()
.describedAs("topic")
.ofType(String.class);
includeOpt = parser.accepts("include", "Regular expression specifying list of topics to include for consumption.")
.withRequiredArg()
.describedAs("Java regex (String)")
.ofType(String.class);
groupIdOpt = parser.accepts("group", "The group id to consume on.")
.withRequiredArg()
.describedAs("gid")
@ -323,7 +335,8 @@ public class ConsumerPerformance {
}
if (options != null) {
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is used to verify the consumer performance.");
CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt);
CommandLineUtils.checkRequiredArgs(parser, options, numMessagesOpt);
CommandLineUtils.checkOneOfArgs(parser, options, topicOpt, includeOpt);
}
}
@ -353,8 +366,16 @@ public class ConsumerPerformance {
return props;
}
public Set<String> topic() {
return Set.of(options.valueOf(topicOpt));
public Optional<Collection<String>> topic() {
return options.has(topicOpt)
? Optional.of(List.of(options.valueOf(topicOpt)))
: Optional.empty();
}
public Optional<Pattern> include() {
return options.has(includeOpt)
? Optional.of(Pattern.compile(options.valueOf(includeOpt)))
: Optional.empty();
}
public long numMessages() {

View File

@ -25,10 +25,8 @@ import org.apache.kafka.server.util.CommandDefaultOptions;
import org.apache.kafka.server.util.CommandLineUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
@ -185,12 +183,8 @@ public final class ConsoleConsumerOptions extends CommandDefaultOptions {
}
private void checkRequiredArgs() {
List<Optional<String>> topicOrFilterArgs = new ArrayList<>(List.of(topicArg(), includedTopicsArg()));
topicOrFilterArgs.removeIf(Optional::isEmpty);
// user need to specify value for either --topic or --include options
if (topicOrFilterArgs.size() != 1) {
CommandLineUtils.printUsageAndExit(parser, "Exactly one of --include/--topic is required. ");
}
CommandLineUtils.checkOneOfArgs(parser, options, topicOpt, includeOpt);
if (partitionArg().isPresent()) {
if (!options.has(topicOpt)) {

View File

@ -75,7 +75,7 @@ public class ConsumerPerformanceTest {
ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args);
assertEquals("localhost:9092", config.brokerHostsAndPorts());
assertTrue(config.topic().contains("test"));
assertTrue(config.topic().get().contains("test"));
assertEquals(10, config.numMessages());
}
@ -93,6 +93,47 @@ public class ConsumerPerformanceTest {
assertTrue(err.contains("new-consumer is not a recognized option"));
}
@Test
public void testConfigWithInclude() {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--include", "test.*",
"--messages", "10"
};
ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args);
assertEquals("localhost:9092", config.brokerHostsAndPorts());
assertTrue(config.include().get().toString().contains("test.*"));
assertEquals(10, config.numMessages());
}
@Test
public void testConfigWithTopicAndInclude() {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--include", "test.*",
"--messages", "10"
};
String err = ToolsTestUtils.captureStandardErr(() -> new ConsumerPerformance.ConsumerPerfOptions(args));
assertTrue(err.contains("Exactly one of the following arguments is required: [topic], [include]"));
}
@Test
public void testConfigWithoutTopicAndInclude() {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--messages", "10"
};
String err = ToolsTestUtils.captureStandardErr(() -> new ConsumerPerformance.ConsumerPerfOptions(args));
assertTrue(err.contains("Exactly one of the following arguments is required: [topic], [include]"));
}
@Test
public void testClientIdOverride() throws IOException {
File tempFile = Files.createFile(tempDir.resolve("test_consumer_config.conf")).toFile();