KAFKA-19624: Improve consistency of command-line arguments for consumer performance tests (#20385)

resolves https://issues.apache.org/jira/browse/KAFKA-19624

Reviewers: @brandboat, @AndrewJSchofield, @m1a2st
This commit is contained in:
ally heev 2025-09-23 14:31:40 +05:30 committed by GitHub
parent 71efb89290
commit dbe9d34e47
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 459 additions and 121 deletions

View File

@ -40,7 +40,7 @@ class ConsumerPerformanceService(PerformanceService):
"socket-buffer-size", "The size of the tcp RECV size."
"new-consumer", "Use the new consumer implementation."
"consumer.config", "Consumer config properties file."
"command-config", "Config properties file."
"""
# Root directory for persistent output
@ -83,10 +83,14 @@ class ConsumerPerformanceService(PerformanceService):
def args(self, version):
"""Dictionary of arguments used to start the Consumer Performance script."""
args = {
'topic': self.topic,
'messages': self.messages
'topic': self.topic
}
if version.supports_command_config():
args['num-records'] = self.messages
else:
args['messages'] = self.messages
if version < V_2_5_0:
args['broker-list'] = self.kafka.bootstrap_servers(self.security_config.security_protocol)
else:
@ -115,7 +119,10 @@ class ConsumerPerformanceService(PerformanceService):
for key, value in self.args(node.version).items():
cmd += " --%s %s" % (key, value)
cmd += " --consumer.config %s" % ConsumerPerformanceService.CONFIG_FILE
if node.version.supports_command_config():
cmd += " --command-config %s" % ConsumerPerformanceService.CONFIG_FILE
else:
cmd += " --consumer.config %s" % ConsumerPerformanceService.CONFIG_FILE
for key, value in self.settings.items():
cmd += " %s=%s" % (str(key), str(value))

View File

@ -33,7 +33,7 @@ class ShareConsumerPerformanceService(PerformanceService):
"socket-buffer-size", "The size of the tcp RECV size."
"consumer.config", "Consumer config properties file."
"command-config", "Config properties file."
"""
# Root directory for persistent output
@ -73,16 +73,20 @@ class ShareConsumerPerformanceService(PerformanceService):
for node in self.nodes:
node.version = version
def args(self):
def args(self, version):
"""Dictionary of arguments used to start the Share Consumer Performance script."""
args = {
'topic': self.topic,
'messages': self.messages,
'bootstrap-server': self.kafka.bootstrap_servers(self.security_config.security_protocol),
'group': self.group,
'timeout': self.timeout
}
if version.supports_command_config():
args['num-records'] = self.messages
else:
args['messages'] = self.messages
if self.fetch_size is not None:
args['fetch-size'] = self.fetch_size
@ -97,10 +101,13 @@ class ShareConsumerPerformanceService(PerformanceService):
cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
cmd += " export KAFKA_LOG4J_OPTS=\"%s%s\";" % (get_log4j_config_param(node), get_log4j_config_for_tools(node))
cmd += " %s" % self.path.script("kafka-share-consumer-perf-test.sh", node)
for key, value in self.args().items():
for key, value in self.args(node.version).items():
cmd += " --%s %s" % (key, value)
cmd += " --consumer.config %s" % ShareConsumerPerformanceService.CONFIG_FILE
if node.version.supports_command_config():
cmd += " --command-config %s" % ShareConsumerPerformanceService.CONFIG_FILE
else:
cmd += " --consumer.config %s" % ShareConsumerPerformanceService.CONFIG_FILE
for key, value in self.settings.items():
cmd += " %s=%s" % (str(key), str(value))

View File

@ -48,6 +48,7 @@ import joptsimple.OptionException;
import joptsimple.OptionSpec;
import static joptsimple.util.RegexMatcher.regex;
import static org.apache.kafka.server.util.CommandLineUtils.parseKeyValueArgs;
public class ConsumerPerformance {
private static final Logger LOG = LoggerFactory.getLogger(ConsumerPerformance.class);
@ -61,7 +62,7 @@ public class ConsumerPerformance {
try {
LOG.info("Starting consumer...");
ConsumerPerfOptions options = new ConsumerPerfOptions(args);
AtomicLong totalMessagesRead = new AtomicLong(0);
AtomicLong totalRecordsRead = new AtomicLong(0);
AtomicLong totalBytesRead = new AtomicLong(0);
AtomicLong joinTimeMs = new AtomicLong(0);
AtomicLong joinTimeMsInSingleRound = new AtomicLong(0);
@ -71,14 +72,14 @@ public class ConsumerPerformance {
try (Consumer<byte[], byte[]> consumer = consumerCreator.apply(options.props())) {
long bytesRead = 0L;
long messagesRead = 0L;
long recordsRead = 0L;
long lastBytesRead = 0L;
long lastMessagesRead = 0L;
long lastRecordsRead = 0L;
long currentTimeMs = System.currentTimeMillis();
long joinStartMs = currentTimeMs;
long startMs = currentTimeMs;
consume(consumer, options, totalMessagesRead, totalBytesRead, joinTimeMs,
bytesRead, messagesRead, lastBytesRead, lastMessagesRead,
consume(consumer, options, totalRecordsRead, totalBytesRead, joinTimeMs,
bytesRead, recordsRead, lastBytesRead, lastRecordsRead,
joinStartMs, joinTimeMsInSingleRound);
long endMs = System.currentTimeMillis();
@ -92,12 +93,12 @@ public class ConsumerPerformance {
options.dateFormat().format(endMs),
totalMbRead,
totalMbRead / elapsedSec,
totalMessagesRead.get(),
totalMessagesRead.get() / elapsedSec,
totalRecordsRead.get(),
totalRecordsRead.get() / elapsedSec,
joinTimeMs.get(),
fetchTimeInMs,
totalMbRead / (fetchTimeInMs / 1000.0),
totalMessagesRead.get() / (fetchTimeInMs / 1000.0)
totalRecordsRead.get() / (fetchTimeInMs / 1000.0)
);
}
@ -122,16 +123,16 @@ public class ConsumerPerformance {
private static void consume(Consumer<byte[], byte[]> consumer,
ConsumerPerfOptions options,
AtomicLong totalMessagesRead,
AtomicLong totalRecordsRead,
AtomicLong totalBytesRead,
AtomicLong joinTimeMs,
long bytesRead,
long messagesRead,
long recordsRead,
long lastBytesRead,
long lastMessagesRead,
long lastRecordsRead,
long joinStartMs,
AtomicLong joinTimeMsInSingleRound) {
long numMessages = options.numMessages();
long numRecords = options.numRecords();
long recordFetchTimeoutMs = options.recordFetchTimeoutMs();
long reportingIntervalMs = options.reportingIntervalMs();
boolean showDetailedStats = options.showDetailedStats();
@ -149,55 +150,55 @@ public class ConsumerPerformance {
long lastReportTimeMs = currentTimeMs;
long lastConsumedTimeMs = currentTimeMs;
while (messagesRead < numMessages && currentTimeMs - lastConsumedTimeMs <= recordFetchTimeoutMs) {
while (recordsRead < numRecords && currentTimeMs - lastConsumedTimeMs <= recordFetchTimeoutMs) {
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
currentTimeMs = System.currentTimeMillis();
if (!records.isEmpty())
lastConsumedTimeMs = currentTimeMs;
for (ConsumerRecord<byte[], byte[]> record : records) {
messagesRead += 1;
recordsRead += 1;
if (record.key() != null)
bytesRead += record.key().length;
if (record.value() != null)
bytesRead += record.value().length;
if (currentTimeMs - lastReportTimeMs >= reportingIntervalMs) {
if (showDetailedStats)
printConsumerProgress(0, bytesRead, lastBytesRead, messagesRead, lastMessagesRead,
printConsumerProgress(0, bytesRead, lastBytesRead, recordsRead, lastRecordsRead,
lastReportTimeMs, currentTimeMs, dateFormat, joinTimeMsInSingleRound.get());
joinTimeMsInSingleRound = new AtomicLong(0);
lastReportTimeMs = currentTimeMs;
lastMessagesRead = messagesRead;
lastRecordsRead = recordsRead;
lastBytesRead = bytesRead;
}
}
}
if (messagesRead < numMessages)
System.out.printf("WARNING: Exiting before consuming the expected number of messages: timeout (%d ms) exceeded. " +
if (recordsRead < numRecords)
System.out.printf("WARNING: Exiting before consuming the expected number of records: timeout (%d ms) exceeded. " +
"You can use the --timeout option to increase the timeout.%n", recordFetchTimeoutMs);
totalMessagesRead.set(messagesRead);
totalRecordsRead.set(recordsRead);
totalBytesRead.set(bytesRead);
}
protected static void printConsumerProgress(int id,
long bytesRead,
long lastBytesRead,
long messagesRead,
long lastMessagesRead,
long recordsRead,
long lastRecordsRead,
long startMs,
long endMs,
SimpleDateFormat dateFormat,
long joinTimeMsInSingleRound) {
printBasicProgress(id, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, dateFormat);
printExtendedProgress(bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, joinTimeMsInSingleRound);
printBasicProgress(id, bytesRead, lastBytesRead, recordsRead, lastRecordsRead, startMs, endMs, dateFormat);
printExtendedProgress(bytesRead, lastBytesRead, recordsRead, lastRecordsRead, startMs, endMs, joinTimeMsInSingleRound);
System.out.println();
}
private static void printBasicProgress(int id,
long bytesRead,
long lastBytesRead,
long messagesRead,
long lastMessagesRead,
long recordsRead,
long lastRecordsRead,
long startMs,
long endMs,
SimpleDateFormat dateFormat) {
@ -205,25 +206,25 @@ public class ConsumerPerformance {
double totalMbRead = (bytesRead * 1.0) / (1024 * 1024);
double intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024);
double intervalMbPerSec = 1000.0 * intervalMbRead / elapsedMs;
double intervalMessagesPerSec = ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0;
double intervalRecordsPerSec = ((recordsRead - lastRecordsRead) / elapsedMs) * 1000.0;
System.out.printf("%s, %d, %.4f, %.4f, %d, %.4f", dateFormat.format(endMs), id,
totalMbRead, intervalMbPerSec, messagesRead, intervalMessagesPerSec);
totalMbRead, intervalMbPerSec, recordsRead, intervalRecordsPerSec);
}
private static void printExtendedProgress(long bytesRead,
long lastBytesRead,
long messagesRead,
long lastMessagesRead,
long recordsRead,
long lastRecordsRead,
long startMs,
long endMs,
long joinTimeMsInSingleRound) {
long fetchTimeMs = endMs - startMs - joinTimeMsInSingleRound;
double intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024);
long intervalMessagesRead = messagesRead - lastMessagesRead;
long intervalRecordsRead = recordsRead - lastRecordsRead;
double intervalMbPerSec = (fetchTimeMs <= 0) ? 0.0 : 1000.0 * intervalMbRead / fetchTimeMs;
double intervalMessagesPerSec = (fetchTimeMs <= 0) ? 0.0 : 1000.0 * intervalMessagesRead / fetchTimeMs;
double intervalRecordsPerSec = (fetchTimeMs <= 0) ? 0.0 : 1000.0 * intervalRecordsRead / fetchTimeMs;
System.out.printf(", %d, %d, %.4f, %.4f", joinTimeMsInSingleRound,
fetchTimeMs, intervalMbPerSec, intervalMessagesPerSec);
fetchTimeMs, intervalMbPerSec, intervalRecordsPerSec);
}
public static class ConsumerPerfRebListener implements ConsumerRebalanceListener {
@ -256,13 +257,18 @@ public class ConsumerPerformance {
private final OptionSpec<String> includeOpt;
private final OptionSpec<String> groupIdOpt;
private final OptionSpec<Integer> fetchSizeOpt;
private final OptionSpec<String> commandPropertiesOpt;
private final OptionSpec<Void> resetBeginningOffsetOpt;
private final OptionSpec<Integer> socketBufferSizeOpt;
@Deprecated(since = "4.2", forRemoval = true)
private final OptionSpec<String> consumerConfigOpt;
private final OptionSpec<String> commandConfigOpt;
private final OptionSpec<Void> printMetricsOpt;
private final OptionSpec<Void> showDetailedStatsOpt;
private final OptionSpec<Long> recordFetchTimeoutOpt;
@Deprecated(since = "4.2", forRemoval = true)
private final OptionSpec<Long> numMessagesOpt;
private final OptionSpec<Long> numRecordsOpt;
private final OptionSpec<Long> reportingIntervalOpt;
private final OptionSpec<String> dateFormatOpt;
private final OptionSpec<Void> hideHeaderOpt;
@ -291,26 +297,41 @@ public class ConsumerPerformance {
.describedAs("size")
.ofType(Integer.class)
.defaultsTo(1024 * 1024);
commandPropertiesOpt = parser.accepts("command-property", "Kafka consumer related configuration properties like client.id. " +
"These configs take precedence over those passed via --command-config or --consumer.config.")
.withRequiredArg()
.describedAs("prop1=val1")
.ofType(String.class);
resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " +
"offset to consume from, start with the latest message present in the log rather than the earliest message.");
"offset to consume from, start with the latest record present in the log rather than the earliest record.");
socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.")
.withRequiredArg()
.describedAs("size")
.ofType(Integer.class)
.defaultsTo(2 * 1024 * 1024);
consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.")
consumerConfigOpt = parser.accepts("consumer.config", "(DEPRECATED) Consumer config properties file. " +
"This option will be removed in a future version. Use --command-config instead.")
.withRequiredArg()
.describedAs("config file")
.ofType(String.class);
commandConfigOpt = parser.accepts("command-config", "Config properties file.")
.withRequiredArg()
.describedAs("config file")
.ofType(String.class);
printMetricsOpt = parser.accepts("print-metrics", "Print out the metrics.");
showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting " +
"interval as configured by reporting-interval");
"interval as configured by reporting-interval.");
recordFetchTimeoutOpt = parser.accepts("timeout", "The maximum allowed time in milliseconds between returned records.")
.withOptionalArg()
.describedAs("milliseconds")
.ofType(Long.class)
.defaultsTo(10_000L);
numMessagesOpt = parser.accepts("messages", "REQUIRED: The number of messages to consume.")
numMessagesOpt = parser.accepts("messages", "(DEPRECATED) The number of records to consume. " +
"This option will be removed in a future version. Use --num-records instead.")
.withRequiredArg()
.describedAs("count")
.ofType(Long.class);
numRecordsOpt = parser.accepts("num-records", "REQUIRED: The number of records to consume.")
.withRequiredArg()
.describedAs("count")
.ofType(Long.class);
@ -326,7 +347,7 @@ public class ConsumerPerformance {
.describedAs("date format")
.ofType(String.class)
.defaultsTo("yyyy-MM-dd HH:mm:ss:SSS");
hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats");
hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats.");
try {
options = parser.parse(args);
} catch (OptionException e) {
@ -335,8 +356,19 @@ public class ConsumerPerformance {
}
if (options != null) {
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is used to verify the consumer performance.");
CommandLineUtils.checkRequiredArgs(parser, options, numMessagesOpt);
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt);
CommandLineUtils.checkOneOfArgs(parser, options, topicOpt, includeOpt);
CommandLineUtils.checkOneOfArgs(parser, options, numMessagesOpt, numRecordsOpt);
CommandLineUtils.checkInvalidArgs(parser, options, consumerConfigOpt, commandConfigOpt);
if (options.has(numMessagesOpt)) {
System.out.println("Warning: --messages is deprecated. Use --num-records instead.");
}
if (options.has(consumerConfigOpt)) {
System.out.println("Warning: --consumer.config is deprecated. Use --command-config instead.");
}
}
}
@ -348,10 +380,23 @@ public class ConsumerPerformance {
return options.valueOf(bootstrapServerOpt);
}
private Properties readProps(List<String> commandProperties, String commandConfigFile) throws IOException {
Properties props = commandConfigFile != null
? Utils.loadProps(commandConfigFile)
: new Properties();
props.putAll(parseKeyValueArgs(commandProperties));
return props;
}
public Properties props() throws IOException {
Properties props = (options.has(consumerConfigOpt))
? Utils.loadProps(options.valueOf(consumerConfigOpt))
: new Properties();
List<String> commandProperties = options.valuesOf(commandPropertiesOpt);
String commandConfigFile;
if (options.has(consumerConfigOpt)) {
commandConfigFile = options.valueOf(consumerConfigOpt);
} else {
commandConfigFile = options.valueOf(commandConfigOpt);
}
Properties props = readProps(commandProperties, commandConfigFile);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostsAndPorts());
props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt));
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, options.valueOf(socketBufferSizeOpt).toString());
@ -378,8 +423,10 @@ public class ConsumerPerformance {
: Optional.empty();
}
public long numMessages() {
return options.valueOf(numMessagesOpt);
public long numRecords() {
return options.has(numMessagesOpt)
? options.valueOf(numMessagesOpt)
: options.valueOf(numRecordsOpt);
}
public long reportingIntervalMs() {

View File

@ -55,6 +55,7 @@ import joptsimple.OptionException;
import joptsimple.OptionSpec;
import static joptsimple.util.RegexMatcher.regex;
import static org.apache.kafka.server.util.CommandLineUtils.parseKeyValueArgs;
public class ShareConsumerPerformance {
private static final Logger LOG = LoggerFactory.getLogger(ShareConsumerPerformance.class);
@ -67,7 +68,7 @@ public class ShareConsumerPerformance {
try {
LOG.info("Starting share consumer/consumers...");
ShareConsumerPerfOptions options = new ShareConsumerPerfOptions(args);
AtomicLong totalMessagesRead = new AtomicLong(0);
AtomicLong totalRecordsRead = new AtomicLong(0);
AtomicLong totalBytesRead = new AtomicLong(0);
if (!options.hideHeader())
@ -78,7 +79,7 @@ public class ShareConsumerPerformance {
shareConsumers.add(shareConsumerCreator.apply(options.props()));
}
long startMs = System.currentTimeMillis();
consume(shareConsumers, options, totalMessagesRead, totalBytesRead, startMs);
consume(shareConsumers, options, totalRecordsRead, totalBytesRead, startMs);
long endMs = System.currentTimeMillis();
List<Map<MetricName, ? extends Metric>> shareConsumersMetrics = new ArrayList<>();
@ -93,7 +94,7 @@ public class ShareConsumerPerformance {
// Print final stats for share group.
double elapsedSec = (endMs - startMs) / 1_000.0;
long fetchTimeInMs = endMs - startMs;
printStats(totalBytesRead.get(), totalMessagesRead.get(), elapsedSec, fetchTimeInMs, startMs, endMs,
printStats(totalBytesRead.get(), totalRecordsRead.get(), elapsedSec, fetchTimeInMs, startMs, endMs,
options.dateFormat(), -1);
shareConsumersMetrics.forEach(ToolsUtils::printMetrics);
@ -113,15 +114,15 @@ public class ShareConsumerPerformance {
private static void consume(List<ShareConsumer<byte[], byte[]>> shareConsumers,
ShareConsumerPerfOptions options,
AtomicLong totalMessagesRead,
AtomicLong totalRecordsRead,
AtomicLong totalBytesRead,
long startMs) throws ExecutionException, InterruptedException {
long numMessages = options.numMessages();
long numRecords = options.numRecords();
long recordFetchTimeoutMs = options.recordFetchTimeoutMs();
shareConsumers.forEach(shareConsumer -> shareConsumer.subscribe(options.topic()));
// Now start the benchmark.
AtomicLong messagesRead = new AtomicLong(0);
AtomicLong recordsRead = new AtomicLong(0);
AtomicLong bytesRead = new AtomicLong(0);
List<ShareConsumerConsumption> shareConsumersConsumptionDetails = new ArrayList<>();
@ -133,7 +134,7 @@ public class ShareConsumerPerformance {
ShareConsumerConsumption shareConsumerConsumption = new ShareConsumerConsumption(0, 0);
futures.add(executorService.submit(() -> {
try {
consumeMessagesForSingleShareConsumer(shareConsumers.get(index), messagesRead, bytesRead, options,
consumeRecordsForSingleShareConsumer(shareConsumers.get(index), recordsRead, bytesRead, options,
shareConsumerConsumption, index + 1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
@ -171,22 +172,22 @@ public class ShareConsumerPerformance {
// Print stats for share consumer.
double elapsedSec = (endMs - startMs) / 1_000.0;
long fetchTimeInMs = endMs - startMs;
long messagesReadByConsumer = shareConsumersConsumptionDetails.get(index).messagesConsumed();
long recordsReadByConsumer = shareConsumersConsumptionDetails.get(index).recordsConsumed();
long bytesReadByConsumer = shareConsumersConsumptionDetails.get(index).bytesConsumed();
printStats(bytesReadByConsumer, messagesReadByConsumer, elapsedSec, fetchTimeInMs, startMs, endMs, options.dateFormat(), index + 1);
printStats(bytesReadByConsumer, recordsReadByConsumer, elapsedSec, fetchTimeInMs, startMs, endMs, options.dateFormat(), index + 1);
}
}
if (messagesRead.get() < numMessages) {
System.out.printf("WARNING: Exiting before consuming the expected number of messages: timeout (%d ms) exceeded. " +
if (recordsRead.get() < numRecords) {
System.out.printf("WARNING: Exiting before consuming the expected number of records: timeout (%d ms) exceeded. " +
"You can use the --timeout option to increase the timeout.%n", recordFetchTimeoutMs);
}
totalMessagesRead.set(messagesRead.get());
totalRecordsRead.set(recordsRead.get());
totalBytesRead.set(bytesRead.get());
}
private static void consumeMessagesForSingleShareConsumer(ShareConsumer<byte[], byte[]> shareConsumer,
AtomicLong totalMessagesRead,
private static void consumeRecordsForSingleShareConsumer(ShareConsumer<byte[], byte[]> shareConsumer,
AtomicLong totalRecordsRead,
AtomicLong totalBytesRead,
ShareConsumerPerfOptions options,
ShareConsumerConsumption shareConsumerConsumption,
@ -197,17 +198,17 @@ public class ShareConsumerPerformance {
long lastReportTimeMs = currentTimeMs;
long lastBytesRead = 0L;
long lastMessagesRead = 0L;
long messagesReadByConsumer = 0L;
long lastRecordsRead = 0L;
long recordsReadByConsumer = 0L;
long bytesReadByConsumer = 0L;
while (totalMessagesRead.get() < options.numMessages() && currentTimeMs - lastConsumedTimeMs <= options.recordFetchTimeoutMs()) {
while (totalRecordsRead.get() < options.numRecords() && currentTimeMs - lastConsumedTimeMs <= options.recordFetchTimeoutMs()) {
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(100));
currentTimeMs = System.currentTimeMillis();
if (!records.isEmpty())
lastConsumedTimeMs = currentTimeMs;
for (ConsumerRecord<byte[], byte[]> record : records) {
messagesReadByConsumer += 1;
totalMessagesRead.addAndGet(1);
recordsReadByConsumer += 1;
totalRecordsRead.addAndGet(1);
if (record.key() != null) {
bytesReadByConsumer += record.key().length;
totalBytesRead.addAndGet(record.key().length);
@ -218,13 +219,13 @@ public class ShareConsumerPerformance {
}
if (currentTimeMs - lastReportTimeMs >= options.reportingIntervalMs()) {
if (options.showDetailedStats())
printShareConsumerProgress(bytesReadByConsumer, lastBytesRead, messagesReadByConsumer, lastMessagesRead,
printShareConsumerProgress(bytesReadByConsumer, lastBytesRead, recordsReadByConsumer, lastRecordsRead,
lastReportTimeMs, currentTimeMs, dateFormat, index);
lastReportTimeMs = currentTimeMs;
lastMessagesRead = messagesReadByConsumer;
lastRecordsRead = recordsReadByConsumer;
lastBytesRead = bytesReadByConsumer;
}
shareConsumerConsumption.updateMessagesConsumed(messagesReadByConsumer);
shareConsumerConsumption.updateRecordsConsumed(recordsReadByConsumer);
shareConsumerConsumption.updateBytesConsumed(bytesReadByConsumer);
}
}
@ -232,8 +233,8 @@ public class ShareConsumerPerformance {
protected static void printShareConsumerProgress(long bytesRead,
long lastBytesRead,
long messagesRead,
long lastMessagesRead,
long recordsRead,
long lastRecordsRead,
long startMs,
long endMs,
SimpleDateFormat dateFormat,
@ -242,18 +243,18 @@ public class ShareConsumerPerformance {
double totalMbRead = (bytesRead * 1.0) / (1024 * 1024);
double intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024);
double intervalMbPerSec = 1000.0 * intervalMbRead / elapsedMs;
double intervalMessagesPerSec = ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0;
double intervalRecordsPerSec = ((recordsRead - lastRecordsRead) / elapsedMs) * 1000.0;
long fetchTimeMs = endMs - startMs;
System.out.printf("%s, %s, %.4f, %.4f, %.4f, %d, %d for share consumer %d", dateFormat.format(startMs), dateFormat.format(endMs),
totalMbRead, intervalMbPerSec, intervalMessagesPerSec, messagesRead, fetchTimeMs, index);
totalMbRead, intervalMbPerSec, intervalRecordsPerSec, recordsRead, fetchTimeMs, index);
System.out.println();
}
// Prints stats for both share consumer and share group. For share group, index is -1. For share consumer,
// index is >= 1.
private static void printStats(long bytesRead,
long messagesRead,
long recordsRead,
double elapsedSec,
long fetchTimeInMs,
long startMs,
@ -268,8 +269,8 @@ public class ShareConsumerPerformance {
dateFormat.format(endMs),
totalMbRead,
totalMbRead / elapsedSec,
messagesRead / elapsedSec,
messagesRead,
recordsRead / elapsedSec,
recordsRead,
fetchTimeInMs
);
return;
@ -279,8 +280,8 @@ public class ShareConsumerPerformance {
dateFormat.format(endMs),
totalMbRead,
totalMbRead / elapsedSec,
messagesRead / elapsedSec,
messagesRead,
recordsRead / elapsedSec,
recordsRead,
fetchTimeInMs
);
}
@ -290,12 +291,17 @@ public class ShareConsumerPerformance {
private final OptionSpec<String> topicOpt;
private final OptionSpec<String> groupIdOpt;
private final OptionSpec<Integer> fetchSizeOpt;
private final OptionSpec<String> commandPropertiesOpt;
private final OptionSpec<Integer> socketBufferSizeOpt;
@Deprecated(since = "4.2", forRemoval = true)
private final OptionSpec<String> consumerConfigOpt;
private final OptionSpec<String> commandConfigOpt;
private final OptionSpec<Void> printMetricsOpt;
private final OptionSpec<Void> showDetailedStatsOpt;
private final OptionSpec<Long> recordFetchTimeoutOpt;
@Deprecated(since = "4.2", forRemoval = true)
private final OptionSpec<Long> numMessagesOpt;
private final OptionSpec<Long> numRecordsOpt;
private final OptionSpec<Long> reportingIntervalOpt;
private final OptionSpec<String> dateFormatOpt;
private final OptionSpec<Void> hideHeaderOpt;
@ -322,24 +328,39 @@ public class ShareConsumerPerformance {
.describedAs("size")
.ofType(Integer.class)
.defaultsTo(1024 * 1024);
commandPropertiesOpt = parser.accepts("command-property", "Kafka share consumer related configuration properties like client.id. " +
"These configs take precedence over those passed via --command-config or --consumer.config.")
.withRequiredArg()
.describedAs("prop1=val1")
.ofType(String.class);
socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.")
.withRequiredArg()
.describedAs("size")
.ofType(Integer.class)
.defaultsTo(2 * 1024 * 1024);
consumerConfigOpt = parser.accepts("consumer.config", "Share consumer config properties file.")
consumerConfigOpt = parser.accepts("consumer.config", "(DEPRECATED) Share consumer config properties file. " +
"This option will be removed in a future version. Use --command-config instead.")
.withRequiredArg()
.describedAs("config file")
.ofType(String.class);
commandConfigOpt = parser.accepts("command-config", "Config properties file.")
.withRequiredArg()
.describedAs("config file")
.ofType(String.class);
printMetricsOpt = parser.accepts("print-metrics", "Print out the metrics.");
showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting " +
"interval as configured by reporting-interval");
"interval as configured by reporting-interval.");
recordFetchTimeoutOpt = parser.accepts("timeout", "The maximum allowed time in milliseconds between returned records.")
.withOptionalArg()
.describedAs("milliseconds")
.ofType(Long.class)
.defaultsTo(10_000L);
numMessagesOpt = parser.accepts("messages", "REQUIRED: The number of messages to consume.")
numMessagesOpt = parser.accepts("messages", "(DEPRECATED) The number of records to consume. " +
"This option will be removed in a future version. Use --num-records instead.")
.withRequiredArg()
.describedAs("count")
.ofType(Long.class);
numRecordsOpt = parser.accepts("num-records", "REQUIRED: The number of records to consume.")
.withRequiredArg()
.describedAs("count")
.ofType(Long.class);
@ -355,7 +376,7 @@ public class ShareConsumerPerformance {
.describedAs("date format")
.ofType(String.class)
.defaultsTo("yyyy-MM-dd HH:mm:ss:SSS");
hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats");
hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats.");
numThreadsOpt = parser.accepts("threads", "The number of share consumers to use for sharing the load.")
.withRequiredArg()
.describedAs("count")
@ -371,7 +392,18 @@ public class ShareConsumerPerformance {
}
if (options != null) {
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is used to verify the share consumer performance.");
CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt);
CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, bootstrapServerOpt);
CommandLineUtils.checkOneOfArgs(parser, options, numMessagesOpt, numRecordsOpt);
CommandLineUtils.checkInvalidArgs(parser, options, consumerConfigOpt, commandConfigOpt);
if (options.has(numMessagesOpt)) {
System.out.println("Warning: --messages is deprecated. Use --num-records instead.");
}
if (options.has(consumerConfigOpt)) {
System.out.println("Warning: --consumer.config is deprecated. Use --command-config instead.");
}
}
}
@ -383,10 +415,23 @@ public class ShareConsumerPerformance {
return options.valueOf(bootstrapServerOpt);
}
public Properties props() throws IOException {
Properties props = (options.has(consumerConfigOpt))
? Utils.loadProps(options.valueOf(consumerConfigOpt))
private Properties readProps(List<String> commandProperties, String commandConfigFile) throws IOException {
Properties props = commandConfigFile != null
? Utils.loadProps(commandConfigFile)
: new Properties();
props.putAll(parseKeyValueArgs(commandProperties));
return props;
}
public Properties props() throws IOException {
List<String> commandProperties = options.valuesOf(commandPropertiesOpt);
String commandConfigFile;
if (options.has(consumerConfigOpt)) {
commandConfigFile = options.valueOf(consumerConfigOpt);
} else {
commandConfigFile = options.valueOf(commandConfigOpt);
}
Properties props = readProps(commandProperties, commandConfigFile);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostsAndPorts());
props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt));
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, options.valueOf(socketBufferSizeOpt).toString());
@ -403,8 +448,10 @@ public class ShareConsumerPerformance {
return Set.of(options.valueOf(topicOpt));
}
public long numMessages() {
return options.valueOf(numMessagesOpt);
public long numRecords() {
return options.has(numMessagesOpt)
? options.valueOf(numMessagesOpt)
: options.valueOf(numRecordsOpt);
}
public int threads() {
@ -439,26 +486,26 @@ public class ShareConsumerPerformance {
}
}
// Helper class to know the final messages and bytes consumer by share consumer at the end of consumption.
// Helper class to know the final records and bytes consumed by share consumer at the end of consumption.
private static class ShareConsumerConsumption {
private long messagesConsumed;
private long recordsConsumed;
private long bytesConsumed;
public ShareConsumerConsumption(long messagesConsumed, long bytesConsumed) {
this.messagesConsumed = messagesConsumed;
public ShareConsumerConsumption(long recordsConsumed, long bytesConsumed) {
this.recordsConsumed = recordsConsumed;
this.bytesConsumed = bytesConsumed;
}
public long messagesConsumed() {
return messagesConsumed;
public long recordsConsumed() {
return recordsConsumed;
}
public long bytesConsumed() {
return bytesConsumed;
}
public void updateMessagesConsumed(long messagesConsumed) {
this.messagesConsumed = messagesConsumed;
public void updateRecordsConsumed(long recordsConsumed) {
this.recordsConsumed = recordsConsumed;
}
public void updateBytesConsumed(long bytesConsumed) {

View File

@ -74,7 +74,7 @@ public class ConsumerPerformanceTest {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--messages", "10",
"--num-records", "10",
"--print-metrics"
};
@ -82,7 +82,56 @@ public class ConsumerPerformanceTest {
assertEquals("localhost:9092", config.brokerHostsAndPorts());
assertTrue(config.topic().get().contains("test"));
assertEquals(10, config.numMessages());
assertEquals(10, config.numRecords());
}
@Test
public void testBootstrapServerNotPresent() {
String[] args = new String[]{
"--topic", "test"
};
String err = ToolsTestUtils.captureStandardErr(() ->
new ConsumerPerformance.ConsumerPerfOptions(args));
assertTrue(err.contains("Missing required argument \"[bootstrap-server]\""));
}
@Test
public void testNumOfRecordsNotPresent() {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test"
};
String err = ToolsTestUtils.captureStandardErr(() ->
new ConsumerPerformance.ConsumerPerfOptions(args));
assertTrue(err.contains("Exactly one of the following arguments is required:"));
}
@Test
public void testMessagesDeprecated() {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--messages", "10"
};
ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args);
assertEquals(10, config.numRecords());
}
@Test
public void testNumOfRecordsWithMessagesPresent() {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--messages", "10",
"--num-records", "20"
};
String err = ToolsTestUtils.captureStandardErr(() ->
new ConsumerPerformance.ConsumerPerfOptions(args));
assertTrue(err.contains("Exactly one of the following arguments is required"));
}
@Test
@ -90,7 +139,7 @@ public class ConsumerPerformanceTest {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--messages", "10",
"--num-records", "10",
"--new-consumer"
};
@ -104,14 +153,14 @@ public class ConsumerPerformanceTest {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--include", "test.*",
"--messages", "10"
"--num-records", "10"
};
ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args);
assertEquals("localhost:9092", config.brokerHostsAndPorts());
assertTrue(config.include().get().toString().contains("test.*"));
assertEquals(10, config.numMessages());
assertEquals(10, config.numRecords());
}
@Test
@ -120,7 +169,7 @@ public class ConsumerPerformanceTest {
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--include", "test.*",
"--messages", "10"
"--num-records", "10"
};
String err = ToolsTestUtils.captureStandardErr(() -> new ConsumerPerformance.ConsumerPerfOptions(args));
@ -132,7 +181,7 @@ public class ConsumerPerformanceTest {
public void testConfigWithoutTopicAndInclude() {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--messages", "10"
"--num-records", "10"
};
String err = ToolsTestUtils.captureStandardErr(() -> new ConsumerPerformance.ConsumerPerfOptions(args));
@ -141,8 +190,10 @@ public class ConsumerPerformanceTest {
}
@Test
public void testClientIdOverride() throws IOException {
File tempFile = Files.createFile(tempDir.resolve("test_consumer_config.conf")).toFile();
public void testCommandProperty() throws IOException {
Path configPath = tempDir.resolve("test_command_property_consumer_perf.conf");
Files.deleteIfExists(configPath);
File tempFile = Files.createFile(configPath).toFile();
try (PrintWriter output = new PrintWriter(Files.newOutputStream(tempFile.toPath()))) {
output.println("client.id=consumer-1");
output.flush();
@ -151,7 +202,54 @@ public class ConsumerPerformanceTest {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--messages", "10",
"--num-records", "10",
"--command-property", "client.id=consumer-2",
"--command-config", tempFile.getAbsolutePath(),
"--command-property", "prop=val"
};
ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args);
assertEquals("consumer-2", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
assertEquals("val", config.props().getProperty("prop"));
}
@Test
public void testClientIdOverride() throws IOException {
Path configPath = tempDir.resolve("test_client_id_override_consumer_perf.conf");
Files.deleteIfExists(configPath);
File tempFile = Files.createFile(configPath).toFile();
try (PrintWriter output = new PrintWriter(Files.newOutputStream(tempFile.toPath()))) {
output.println("client.id=consumer-1");
output.flush();
}
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--num-records", "10",
"--command-config", tempFile.getAbsolutePath()
};
ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args);
assertEquals("consumer-1", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
}
@Test
public void testConsumerConfigDeprecated() throws IOException {
Path configPath = tempDir.resolve("test_consumer_config_deprecated_consumer_perf.conf");
Files.deleteIfExists(configPath);
File tempFile = Files.createFile(configPath).toFile();
try (PrintWriter output = new PrintWriter(Files.newOutputStream(tempFile.toPath()))) {
output.println("client.id=consumer-1");
output.flush();
}
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--num-records", "10",
"--consumer.config", tempFile.getAbsolutePath()
};
@ -160,12 +258,28 @@ public class ConsumerPerformanceTest {
assertEquals("consumer-1", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
}
@Test
public void testCommandConfigWithConsumerConfigPresent() {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--num-records", "10",
"--consumer.config", "some-path",
"--command-config", "some-path"
};
String err = ToolsTestUtils.captureStandardErr(() ->
new ConsumerPerformance.ConsumerPerfOptions(args));
assertTrue(err.contains(String.format("Option \"%s\" can't be used with option \"%s\"",
"[consumer.config]", "[command-config]")));
}
@Test
public void testDefaultClientId() throws IOException {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--messages", "10"
"--num-records", "10"
};
ConsumerPerformance.ConsumerPerfOptions config = new ConsumerPerformance.ConsumerPerfOptions(args);
@ -178,7 +292,7 @@ public class ConsumerPerformanceTest {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--messages", "0",
"--num-records", "0",
"--print-metrics"
};

View File

@ -67,7 +67,7 @@ public class ShareConsumerPerformanceTest {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--messages", "10",
"--num-records", "10",
"--print-metrics"
};
@ -75,7 +75,57 @@ public class ShareConsumerPerformanceTest {
assertEquals("localhost:9092", config.brokerHostsAndPorts());
assertTrue(config.topic().contains("test"));
assertEquals(10, config.numMessages());
assertEquals(10, config.numRecords());
}
@Test
public void testBootstrapServerNotPresent() {
String[] args = new String[]{
"--topic", "test"
};
String err = ToolsTestUtils.captureStandardErr(() ->
new ShareConsumerPerformance.ShareConsumerPerfOptions(args));
assertTrue(err.contains("Missing required argument \"[bootstrap-server]\""));
}
@Test
public void testNumOfRecordsNotPresent() {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test"
};
String err = ToolsTestUtils.captureStandardErr(() ->
new ShareConsumerPerformance.ShareConsumerPerfOptions(args));
assertTrue(err.contains("Exactly one of the following arguments is required:"));
}
@Test
public void testMessagesDeprecated() {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--messages", "10"
};
ShareConsumerPerformance.ShareConsumerPerfOptions config =
new ShareConsumerPerformance.ShareConsumerPerfOptions(args);
assertEquals(10, config.numRecords());
}
@Test
public void testNumOfRecordsWithMessagesPresent() {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--messages", "10",
"--num-records", "20"
};
String err = ToolsTestUtils.captureStandardErr(() ->
new ShareConsumerPerformance.ShareConsumerPerfOptions(args));
assertTrue(err.contains("Exactly one of the following arguments is required"));
}
@Test
@ -83,7 +133,7 @@ public class ShareConsumerPerformanceTest {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--messages", "10",
"--num-records", "10",
"--new-share-consumer"
};
@ -92,9 +142,36 @@ public class ShareConsumerPerformanceTest {
assertTrue(err.contains("new-share-consumer is not a recognized option"));
}
@Test
public void testCommandProperty() throws IOException {
Path configPath = tempDir.resolve("test_command_property_share_consumer_perf.conf");
Files.deleteIfExists(configPath);
File tempFile = Files.createFile(configPath).toFile();
try (PrintWriter output = new PrintWriter(Files.newOutputStream(tempFile.toPath()))) {
output.println("client.id=consumer-1");
output.flush();
}
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--num-records", "10",
"--command-property", "client.id=consumer-2",
"--command-config", tempFile.getAbsolutePath(),
"--command-property", "prop=val"
};
ShareConsumerPerformance.ShareConsumerPerfOptions config = new ShareConsumerPerformance.ShareConsumerPerfOptions(args);
assertEquals("consumer-2", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
assertEquals("val", config.props().getProperty("prop"));
}
@Test
public void testClientIdOverride() throws IOException {
File tempFile = Files.createFile(tempDir.resolve("test_share_consumer_config.conf")).toFile();
Path configPath = tempDir.resolve("test_client_id_override_share_consumer_perf.conf");
Files.deleteIfExists(configPath);
File tempFile = Files.createFile(configPath).toFile();
try (PrintWriter output = new PrintWriter(Files.newOutputStream(tempFile.toPath()))) {
output.println("client.id=share-consumer-1");
output.flush();
@ -103,8 +180,8 @@ public class ShareConsumerPerformanceTest {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--messages", "10",
"--consumer.config", tempFile.getAbsolutePath()
"--num-records", "10",
"--command-config", tempFile.getAbsolutePath()
};
ShareConsumerPerformance.ShareConsumerPerfOptions config = new ShareConsumerPerformance.ShareConsumerPerfOptions(args);
@ -112,12 +189,51 @@ public class ShareConsumerPerformanceTest {
assertEquals("share-consumer-1", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
}
@Test
public void testConsumerConfigDeprecated() throws IOException {
Path configPath = tempDir.resolve("test_consumer_config_deprecated_share_consumer_perf.conf");
Files.deleteIfExists(configPath);
File tempFile = Files.createFile(configPath).toFile();
try (PrintWriter output = new PrintWriter(Files.newOutputStream(tempFile.toPath()))) {
output.println("client.id=share-consumer-1");
output.flush();
}
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--num-records", "10",
"--consumer.config", tempFile.getAbsolutePath()
};
ShareConsumerPerformance.ShareConsumerPerfOptions config =
new ShareConsumerPerformance.ShareConsumerPerfOptions(args);
assertEquals("share-consumer-1", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
}
@Test
public void testCommandConfigWithConsumerConfigPresent() {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--num-records", "10",
"--consumer.config", "some-path",
"--command-config", "some-path"
};
String err = ToolsTestUtils.captureStandardErr(() ->
new ShareConsumerPerformance.ShareConsumerPerfOptions(args));
assertTrue(err.contains(String.format("Option \"%s\" can't be used with option \"%s\"",
"[consumer.config]", "[command-config]")));
}
@Test
public void testDefaultClientId() throws IOException {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--messages", "10"
"--num-records", "10"
};
ShareConsumerPerformance.ShareConsumerPerfOptions config = new ShareConsumerPerformance.ShareConsumerPerfOptions(args);
@ -130,7 +246,7 @@ public class ShareConsumerPerformanceTest {
String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--messages", "0",
"--num-records", "0",
"--print-metrics"
};