diff --git a/bin/kafka-share-consumer-perf-test.sh b/bin/kafka-share-consumer-perf-test.sh new file mode 100755 index 00000000000..8e490ca02c4 --- /dev/null +++ b/bin/kafka-share-consumer-perf-test.sh @@ -0,0 +1,20 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then + export KAFKA_HEAP_OPTS="-Xmx512M" +fi +exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.ShareConsumerPerformance "$@" diff --git a/bin/windows/kafka-share-consumer-perf-test.bat b/bin/windows/kafka-share-consumer-perf-test.bat new file mode 100644 index 00000000000..5bfcb1ed11b --- /dev/null +++ b/bin/windows/kafka-share-consumer-perf-test.bat @@ -0,0 +1,20 @@ +@echo off +rem Licensed to the Apache Software Foundation (ASF) under one or more +rem contributor license agreements. See the NOTICE file distributed with +rem this work for additional information regarding copyright ownership. +rem The ASF licenses this file to You under the Apache License, Version 2.0 +rem (the "License"); you may not use this file except in compliance with +rem the License. You may obtain a copy of the License at +rem +rem http://www.apache.org/licenses/LICENSE-2.0 +rem +rem Unless required by applicable law or agreed to in writing, software +rem distributed under the License is distributed on an "AS IS" BASIS, +rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +rem See the License for the specific language governing permissions and +rem limitations under the License. + +SetLocal +set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M +"%~dp0kafka-run-class.bat" org.apache.kafka.tools.ShareConsumerPerformance %* +EndLocal diff --git a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java new file mode 100644 index 00000000000..ff2a679e96b --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java @@ -0,0 +1,456 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaShareConsumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import joptsimple.OptionException; +import joptsimple.OptionSpec; + +import static joptsimple.util.RegexMatcher.regex; + +public class ShareConsumerPerformance { + private static final Logger LOG = LoggerFactory.getLogger(ShareConsumerPerformance.class); + + public static void main(String[] args) { + try { + LOG.info("Starting share consumer/consumers..."); + ShareConsumerPerfOptions options = new ShareConsumerPerfOptions(args); + AtomicLong totalMessagesRead = new AtomicLong(0); + AtomicLong totalBytesRead = new AtomicLong(0); + + if (!options.hideHeader()) + printHeader(); + + List> shareConsumers = new ArrayList<>(); + for (int i = 0; i < options.threads(); i++) { + shareConsumers.add(new KafkaShareConsumer<>(options.props())); + } + long startMs = System.currentTimeMillis(); + consume(shareConsumers, options, totalMessagesRead, totalBytesRead, startMs); + long endMs = System.currentTimeMillis(); + + List> shareConsumersMetrics = new ArrayList<>(); + if (options.printMetrics()) { + shareConsumers.forEach(shareConsumer -> shareConsumersMetrics.add(shareConsumer.metrics())); + } + shareConsumers.forEach(shareConsumer -> { + Map> val = shareConsumer.commitSync(); + shareConsumer.close(Duration.ofMillis(500)); + }); + + // Print final stats for share group. + double elapsedSec = (endMs - startMs) / 1_000.0; + long fetchTimeInMs = endMs - startMs; + printStats(totalBytesRead.get(), totalMessagesRead.get(), elapsedSec, fetchTimeInMs, startMs, endMs, + options.dateFormat(), -1); + + shareConsumersMetrics.forEach(ToolsUtils::printMetrics); + + } catch (Throwable e) { + System.err.println(e.getMessage()); + System.err.println(Utils.stackTrace(e)); + Exit.exit(1); + } + } + + protected static void printHeader() { + String newFieldsInHeader = ", fetch.time.ms"; + System.out.printf("start.time, end.time, data.consumed.in.MB, MB.sec, nMsg.sec, data.consumed.in.nMsg%s%n", newFieldsInHeader); + } + + private static void consume(List> shareConsumers, + ShareConsumerPerfOptions options, + AtomicLong totalMessagesRead, + AtomicLong totalBytesRead, + long startMs) { + long numMessages = options.numMessages(); + long recordFetchTimeoutMs = options.recordFetchTimeoutMs(); + shareConsumers.forEach(shareConsumer -> shareConsumer.subscribe(options.topic())); + + // Now start the benchmark. + AtomicLong messagesRead = new AtomicLong(0); + AtomicLong bytesRead = new AtomicLong(0); + List shareConsumersConsumptionDetails = new ArrayList<>(); + + + ExecutorService executorService = Executors.newFixedThreadPool(shareConsumers.size()); + for (int i = 0; i < shareConsumers.size(); i++) { + final int index = i; + ShareConsumerConsumption shareConsumerConsumption = new ShareConsumerConsumption(0, 0); + executorService.submit(() -> { + try { + consumeMessagesForSingleShareConsumer(shareConsumers.get(index), messagesRead, bytesRead, options, + shareConsumerConsumption, index + 1); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + shareConsumersConsumptionDetails.add(shareConsumerConsumption); + } + LOG.debug("Shutting down of thread pool is started"); + executorService.shutdown(); + + try { + // Wait a while for existing tasks to terminate. + // Adding 100 ms to the timeout so all the threads can finish before we reach this part of code. + if (!executorService.awaitTermination(recordFetchTimeoutMs + 100, TimeUnit.MILLISECONDS)) { + LOG.debug("Shutting down of thread pool could not be completed. It will retry cancelling the tasks using shutdownNow."); + executorService.shutdownNow(); // Cancel currently executing tasks + // Wait a while for tasks to respond to being cancelled + if (!executorService.awaitTermination(recordFetchTimeoutMs + 100, TimeUnit.MILLISECONDS)) + LOG.debug("Shutting down of thread pool could not be completed even after retrying cancellation of the tasks using shutdownNow."); + } + } catch (InterruptedException e) { + // (Re-)Cancel if current thread also interrupted + LOG.warn("Encountered InterruptedException while shutting down thread pool. It will retry cancelling the tasks using shutdownNow."); + executorService.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + + if (options.showShareConsumerStats()) { + long endMs = System.currentTimeMillis(); + for (int index = 0; index < shareConsumersConsumptionDetails.size(); index++) { + // Print stats for share consumer. + double elapsedSec = (endMs - startMs) / 1_000.0; + long fetchTimeInMs = endMs - startMs; + long messagesReadByConsumer = shareConsumersConsumptionDetails.get(index).messagesConsumed(); + long bytesReadByConsumer = shareConsumersConsumptionDetails.get(index).bytesConsumed(); + printStats(bytesReadByConsumer, messagesReadByConsumer, elapsedSec, fetchTimeInMs, startMs, endMs, options.dateFormat(), index + 1); + } + } + + if (messagesRead.get() < numMessages) { + System.out.printf("WARNING: Exiting before consuming the expected number of messages: timeout (%d ms) exceeded. " + + "You can use the --timeout option to increase the timeout.%n", recordFetchTimeoutMs); + } + totalMessagesRead.set(messagesRead.get()); + totalBytesRead.set(bytesRead.get()); + } + + private static void consumeMessagesForSingleShareConsumer(KafkaShareConsumer shareConsumer, + AtomicLong totalMessagesRead, + AtomicLong totalBytesRead, + ShareConsumerPerfOptions options, + ShareConsumerConsumption shareConsumerConsumption, + int index) throws InterruptedException { + SimpleDateFormat dateFormat = options.dateFormat(); + long currentTimeMs = System.currentTimeMillis(); + long lastConsumedTimeMs = currentTimeMs; + long lastReportTimeMs = currentTimeMs; + + long lastBytesRead = 0L; + long lastMessagesRead = 0L; + long messagesReadByConsumer = 0L; + long bytesReadByConsumer = 0L; + while (totalMessagesRead.get() < options.numMessages() && currentTimeMs - lastConsumedTimeMs <= options.recordFetchTimeoutMs()) { + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(100)); + currentTimeMs = System.currentTimeMillis(); + if (!records.isEmpty()) + lastConsumedTimeMs = currentTimeMs; + for (ConsumerRecord record : records) { + messagesReadByConsumer += 1; + totalMessagesRead.addAndGet(1); + if (record.key() != null) { + bytesReadByConsumer += record.key().length; + totalBytesRead.addAndGet(record.key().length); + } + if (record.value() != null) { + bytesReadByConsumer += record.value().length; + totalBytesRead.addAndGet(record.value().length); + } + if (currentTimeMs - lastReportTimeMs >= options.reportingIntervalMs()) { + if (options.showDetailedStats()) + printShareConsumerProgress(bytesReadByConsumer, lastBytesRead, messagesReadByConsumer, lastMessagesRead, + lastReportTimeMs, currentTimeMs, dateFormat, index); + lastReportTimeMs = currentTimeMs; + lastMessagesRead = messagesReadByConsumer; + lastBytesRead = bytesReadByConsumer; + } + shareConsumerConsumption.updateMessagesConsumed(messagesReadByConsumer); + shareConsumerConsumption.updateBytesConsumed(bytesReadByConsumer); + } + } + } + + protected static void printShareConsumerProgress(long bytesRead, + long lastBytesRead, + long messagesRead, + long lastMessagesRead, + long startMs, + long endMs, + SimpleDateFormat dateFormat, + int index) { + double elapsedMs = endMs - startMs; + double totalMbRead = (bytesRead * 1.0) / (1024 * 1024); + double intervalMbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024); + double intervalMbPerSec = 1000.0 * intervalMbRead / elapsedMs; + double intervalMessagesPerSec = ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0; + long fetchTimeMs = endMs - startMs; + + System.out.printf("%s, %s, %.4f, %.4f, %.4f, %d, %d for share consumer %d", dateFormat.format(startMs), dateFormat.format(endMs), + totalMbRead, intervalMbPerSec, intervalMessagesPerSec, messagesRead, fetchTimeMs, index); + System.out.println(); + } + + // Prints stats for both share consumer and share group. For share group, index is -1. For share consumer, + // index is >= 1. + private static void printStats(long bytesRead, + long messagesRead, + double elapsedSec, + long fetchTimeInMs, + long startMs, + long endMs, + SimpleDateFormat dateFormat, + int index) { + double totalMbRead = (bytesRead * 1.0) / (1024 * 1024); + if (index != -1) { + System.out.printf("Share consumer %s consumption metrics- %s, %s, %.4f, %.4f, %.4f, %d, %d%n", + index, + dateFormat.format(startMs), + dateFormat.format(endMs), + totalMbRead, + totalMbRead / elapsedSec, + messagesRead / elapsedSec, + messagesRead, + fetchTimeInMs + ); + return; + } + System.out.printf("%s, %s, %.4f, %.4f, %.4f, %d, %d%n", + dateFormat.format(startMs), + dateFormat.format(endMs), + totalMbRead, + totalMbRead / elapsedSec, + messagesRead / elapsedSec, + messagesRead, + fetchTimeInMs + ); + } + + protected static class ShareConsumerPerfOptions extends CommandDefaultOptions { + private final OptionSpec bootstrapServerOpt; + private final OptionSpec topicOpt; + private final OptionSpec groupIdOpt; + private final OptionSpec fetchSizeOpt; + private final OptionSpec socketBufferSizeOpt; + private final OptionSpec consumerConfigOpt; + private final OptionSpec printMetricsOpt; + private final OptionSpec showDetailedStatsOpt; + private final OptionSpec recordFetchTimeoutOpt; + private final OptionSpec numMessagesOpt; + private final OptionSpec reportingIntervalOpt; + private final OptionSpec dateFormatOpt; + private final OptionSpec hideHeaderOpt; + private final OptionSpec numThreadsOpt; + private final OptionSpec showShareConsumerStatsOpt; + + public ShareConsumerPerfOptions(String[] args) { + super(args); + bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to.") + .withRequiredArg() + .describedAs("server to connect to") + .ofType(String.class); + topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.") + .withRequiredArg() + .describedAs("topic") + .ofType(String.class); + groupIdOpt = parser.accepts("group", "The group id to consume on.") + .withRequiredArg() + .describedAs("gid") + .defaultsTo("perf-share-consumer") + .ofType(String.class); + fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.") + .withRequiredArg() + .describedAs("size") + .ofType(Integer.class) + .defaultsTo(1024 * 1024); + socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.") + .withRequiredArg() + .describedAs("size") + .ofType(Integer.class) + .defaultsTo(2 * 1024 * 1024); + consumerConfigOpt = parser.accepts("consumer.config", "Share consumer config properties file.") + .withRequiredArg() + .describedAs("config file") + .ofType(String.class); + printMetricsOpt = parser.accepts("print-metrics", "Print out the metrics."); + showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting " + + "interval as configured by reporting-interval"); + recordFetchTimeoutOpt = parser.accepts("timeout", "The maximum allowed time in milliseconds between returned records.") + .withOptionalArg() + .describedAs("milliseconds") + .ofType(Long.class) + .defaultsTo(10_000L); + numMessagesOpt = parser.accepts("messages", "REQUIRED: The number of messages to send or consume") + .withRequiredArg() + .describedAs("count") + .ofType(Long.class); + reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in milliseconds at which to print progress info.") + .withRequiredArg() + .withValuesConvertedBy(regex("^\\d+$")) + .describedAs("interval_ms") + .ofType(Long.class) + .defaultsTo(5_000L); + dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. " + + "See java.text.SimpleDateFormat for options.") + .withRequiredArg() + .describedAs("date format") + .ofType(String.class) + .defaultsTo("yyyy-MM-dd HH:mm:ss:SSS"); + hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats"); + numThreadsOpt = parser.accepts("threads", "The number of share consumers to use for sharing the load.") + .withRequiredArg() + .describedAs("count") + .ofType(Integer.class) + .defaultsTo(1); + showShareConsumerStatsOpt = parser.accepts("show-consumer-stats", "If set, stats are reported for each share " + + "consumer depending on the no. of threads."); + try { + options = parser.parse(args); + } catch (OptionException e) { + CommandLineUtils.printUsageAndExit(parser, e.getMessage()); + return; + } + if (options != null) { + CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is used to verify the share consumer performance."); + CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt); + } + } + + public boolean printMetrics() { + return options.has(printMetricsOpt); + } + + public String brokerHostsAndPorts() { + return options.valueOf(bootstrapServerOpt); + } + + public Properties props() throws IOException { + Properties props = (options.has(consumerConfigOpt)) + ? Utils.loadProps(options.valueOf(consumerConfigOpt)) + : new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostsAndPorts()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt)); + props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, options.valueOf(socketBufferSizeOpt).toString()); + props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, options.valueOf(fetchSizeOpt).toString()); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + props.put(ConsumerConfig.CHECK_CRCS_CONFIG, "false"); + if (props.getProperty(ConsumerConfig.CLIENT_ID_CONFIG) == null) + props.put(ConsumerConfig.CLIENT_ID_CONFIG, "perf-share-consumer-client"); + return props; + } + + public Set topic() { + return Collections.singleton(options.valueOf(topicOpt)); + } + + public long numMessages() { + return options.valueOf(numMessagesOpt); + } + + public int threads() { + return options.valueOf(numThreadsOpt); + } + + public boolean showShareConsumerStats() { + return options.has(showShareConsumerStatsOpt); + } + + public long reportingIntervalMs() { + long value = options.valueOf(reportingIntervalOpt); + if (value <= 0) + throw new IllegalArgumentException("Reporting interval must be greater than 0."); + return value; + } + + public boolean showDetailedStats() { + return options.has(showDetailedStatsOpt); + } + + public SimpleDateFormat dateFormat() { + return new SimpleDateFormat(options.valueOf(dateFormatOpt)); + } + + public boolean hideHeader() { + return options.has(hideHeaderOpt); + } + + public long recordFetchTimeoutMs() { + return options.valueOf(recordFetchTimeoutOpt); + } + } + + // Helper class to know the final messages and bytes consumer by share consumer at the end of consumption. + private static class ShareConsumerConsumption { + private long messagesConsumed; + private long bytesConsumed; + + public ShareConsumerConsumption(long messagesConsumed, long bytesConsumed) { + this.messagesConsumed = messagesConsumed; + this.bytesConsumed = bytesConsumed; + } + + public long messagesConsumed() { + return messagesConsumed; + } + + public long bytesConsumed() { + return bytesConsumed; + } + + public void updateMessagesConsumed(long messagesConsumed) { + this.messagesConsumed = messagesConsumed; + } + + public void updateBytesConsumed(long bytesConsumed) { + this.bytesConsumed = bytesConsumed; + } + } +} diff --git a/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java new file mode 100644 index 00000000000..853d79a7e3b --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/tools/ShareConsumerPerformanceTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.utils.Exit; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.nio.file.Files; +import java.nio.file.Path; +import java.text.SimpleDateFormat; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ShareConsumerPerformanceTest { + private final ToolsTestUtils.MockExitProcedure exitProcedure = new ToolsTestUtils.MockExitProcedure(); + private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS"); + + @TempDir + static Path tempDir; + + @BeforeEach + public void beforeEach() { + Exit.setExitProcedure(exitProcedure); + } + + @AfterEach + public void afterEach() { + Exit.resetExitProcedure(); + } + + @Test + public void testHeaderMatchBody() { + testHeaderMatchContent(2, + () -> ShareConsumerPerformance.printShareConsumerProgress(1024 * 1024, 0, 1, 0, 0, 1, dateFormat, 1)); + } + + @Test + public void testConfigBootStrapServer() { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--messages", "10", + "--print-metrics" + }; + + ShareConsumerPerformance.ShareConsumerPerfOptions config = new ShareConsumerPerformance.ShareConsumerPerfOptions(args); + + assertEquals("localhost:9092", config.brokerHostsAndPorts()); + assertTrue(config.topic().contains("test")); + assertEquals(10, config.numMessages()); + } + + @Test + public void testConfigWithUnrecognizedOption() { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--messages", "10", + "--new-share-consumer" + }; + + String err = ToolsTestUtils.captureStandardErr(() -> new ShareConsumerPerformance.ShareConsumerPerfOptions(args)); + + assertTrue(err.contains("new-share-consumer is not a recognized option")); + } + + @Test + public void testClientIdOverride() throws IOException { + File tempFile = Files.createFile(tempDir.resolve("test_share_consumer_config.conf")).toFile(); + try (PrintWriter output = new PrintWriter(Files.newOutputStream(tempFile.toPath()))) { + output.println("client.id=share-consumer-1"); + output.flush(); + } + + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--messages", "10", + "--consumer.config", tempFile.getAbsolutePath() + }; + + ShareConsumerPerformance.ShareConsumerPerfOptions config = new ShareConsumerPerformance.ShareConsumerPerfOptions(args); + + assertEquals("share-consumer-1", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG)); + } + + @Test + public void testDefaultClientId() throws IOException { + String[] args = new String[]{ + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--messages", "10" + }; + + ShareConsumerPerformance.ShareConsumerPerfOptions config = new ShareConsumerPerformance.ShareConsumerPerfOptions(args); + + assertEquals("perf-share-consumer-client", config.props().getProperty(ConsumerConfig.CLIENT_ID_CONFIG)); + } + + private void testHeaderMatchContent(int expectedOutputLineCount, Runnable runnable) { + String out = ToolsTestUtils.captureStandardOut(() -> { + ShareConsumerPerformance.printHeader(); + runnable.run(); + }); + + String[] contents = out.split("\n"); + assertEquals(expectedOutputLineCount, contents.length); + String header = contents[0]; + String body = contents[1]; + + assertEquals(header.split(",\\s").length, body.split(",\\s").length); + } + +}