From b8b1bca44095df0481fc6bb0d7ea5c06686b9337 Mon Sep 17 00:00:00 2001 From: Geoff Anderson Date: Tue, 8 Sep 2015 17:59:49 -0700 Subject: [PATCH] KAFKA-2489: add benchmark for new consumer ewencp The changes here are smaller than they look - mostly refactoring/cleanup. - ConsumerPerformanceService: added new_consumer flag, and exposed more command-line settings - benchmark.py: refactored to use `parametrize` and `matrix` - this reduced some amount of repeated code - benchmark.py: added consumer performance tests with new consumer (using `parametrize`) - benchmark.py: added more detailed test descriptions - performance.py: broke into separate files Author: Geoff Anderson Reviewers: Ewen Cheslack-Postava, Jason Gustafson, Gwen Shapira Closes #179 from granders/KAFKA-2489-benchmark-new-consumer --- .../kafka/clients/consumer/KafkaConsumer.java | 2 +- .../kafka/tools/ConsumerPerformance.scala | 44 ++- tests/kafkatest/services/performance.py | 163 ----------- .../services/performance/__init__.py | 19 ++ .../performance/consumer_performance.py | 147 ++++++++++ .../performance/end_to_end_latency.py | 59 ++++ .../services/performance/performance.py | 29 ++ .../performance/producer_performance.py | 76 +++++ .../templates/tools_log4j.properties | 26 ++ .../services/templates/tools_log4j.properties | 2 +- tests/kafkatest/tests/benchmark_test.py | 267 +++++++----------- .../clients/tools/ProducerPerformance.java | 6 +- 12 files changed, 498 insertions(+), 342 deletions(-) delete mode 100644 tests/kafkatest/services/performance.py create mode 100644 tests/kafkatest/services/performance/__init__.py create mode 100644 tests/kafkatest/services/performance/consumer_performance.py create mode 100644 tests/kafkatest/services/performance/end_to_end_latency.py create mode 100644 tests/kafkatest/services/performance/performance.py create mode 100644 tests/kafkatest/services/performance/producer_performance.py create mode 100644 tests/kafkatest/services/performance/templates/tools_log4j.properties diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 382c3884bec..19ef6ebead3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -155,7 +155,7 @@ import java.util.concurrent.atomic.AtomicReference; * called test as described above. *

* The broker will automatically detect failed processes in the test group by using a heartbeat mechanism. The - * consumer will automatically ping the cluster periodically, which let's the cluster know that it is alive. As long as + * consumer will automatically ping the cluster periodically, which lets the cluster know that it is alive. As long as * the consumer is able to do this it is considered alive and retains the right to consume from the partitions assigned * to it. If it stops heartbeating for a period of time longer than session.timeout.ms then it will be * considered dead and its partitions will be assigned to another process. diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala index 0078b00e6d5..1be4b23700e 100644 --- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala @@ -17,11 +17,15 @@ package kafka.tools +import java.util + +import org.apache.kafka.common.TopicPartition + import scala.collection.JavaConversions._ import java.util.concurrent.atomic.AtomicLong import java.nio.channels.ClosedByInterruptException import org.apache.log4j.Logger -import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer} import org.apache.kafka.common.serialization.ByteArrayDeserializer import kafka.utils.CommandLineUtils import java.util.{ Random, Properties } @@ -58,7 +62,7 @@ object ConsumerPerformance { val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](config.props) consumer.subscribe(List(config.topic)) startMs = System.currentTimeMillis - consume(consumer, config.numMessages, 1000, config, totalMessagesRead, totalBytesRead) + consume(consumer, List(config.topic), config.numMessages, 1000, config, totalMessagesRead, totalBytesRead) endMs = System.currentTimeMillis consumer.close() } else { @@ -93,18 +97,40 @@ object ConsumerPerformance { } } - def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], count: Long, timeout: Long, config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong) { + def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], topics: List[String], count: Long, timeout: Long, config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong) { var bytesRead = 0L var messagesRead = 0L - val startMs = System.currentTimeMillis - var lastReportTime: Long = startMs var lastBytesRead = 0L var lastMessagesRead = 0L - var lastConsumed = System.currentTimeMillis - while(messagesRead < count && lastConsumed >= System.currentTimeMillis - timeout) { + + // Wait for group join, metadata fetch, etc + val joinTimeout = 10000 + val isAssigned = new AtomicBoolean(false) + consumer.subscribe(topics, new ConsumerRebalanceListener { + def onPartitionsAssigned(consumer: org.apache.kafka.clients.consumer.Consumer[_, _], partitions: util.Collection[TopicPartition]) { + isAssigned.set(true) + } + def onPartitionsRevoked(consumer: org.apache.kafka.clients.consumer.Consumer[_, _], partitions: util.Collection[TopicPartition]) { + isAssigned.set(false) + }}) + val joinStart = System.currentTimeMillis() + while (!isAssigned.get()) { + if (System.currentTimeMillis() - joinStart >= joinTimeout) { + throw new Exception("Timed out waiting for initial group join.") + } + consumer.poll(100) + } + consumer.seekToBeginning() + + // Now start the benchmark + val startMs = System.currentTimeMillis + var 
lastReportTime: Long = startMs + var lastConsumedTime = System.currentTimeMillis + + while(messagesRead < count && System.currentTimeMillis() - lastConsumedTime <= timeout) { val records = consumer.poll(100) if(records.count() > 0) - lastConsumed = System.currentTimeMillis + lastConsumedTime = System.currentTimeMillis for(record <- records) { messagesRead += 1 if(record.key != null) @@ -121,6 +147,7 @@ object ConsumerPerformance { } } } + totalMessagesRead.set(messagesRead) totalBytesRead.set(bytesRead) } @@ -255,5 +282,4 @@ object ConsumerPerformance { } } - } diff --git a/tests/kafkatest/services/performance.py b/tests/kafkatest/services/performance.py deleted file mode 100644 index 34892e0b7eb..00000000000 --- a/tests/kafkatest/services/performance.py +++ /dev/null @@ -1,163 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ducktape.services.background_thread import BackgroundThreadService - - -class PerformanceService(BackgroundThreadService): - def __init__(self, context, num_nodes): - super(PerformanceService, self).__init__(context, num_nodes) - self.results = [None] * self.num_nodes - self.stats = [[] for x in range(self.num_nodes)] - - -class ProducerPerformanceService(PerformanceService): - def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, settings={}, intermediate_stats=False): - super(ProducerPerformanceService, self).__init__(context, num_nodes) - self.kafka = kafka - self.args = { - 'topic': topic, - 'num_records': num_records, - 'record_size': record_size, - 'throughput': throughput - } - self.settings = settings - self.intermediate_stats = intermediate_stats - - def _worker(self, idx, node): - args = self.args.copy() - args.update({'bootstrap_servers': self.kafka.bootstrap_servers()}) - cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance "\ - "%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s" % args - - for key,value in self.settings.items(): - cmd += " %s=%s" % (str(key), str(value)) - self.logger.debug("Producer performance %d command: %s", idx, cmd) - - def parse_stats(line): - parts = line.split(',') - return { - 'records': int(parts[0].split()[0]), - 'records_per_sec': float(parts[1].split()[0]), - 'mbps': float(parts[1].split('(')[1].split()[0]), - 'latency_avg_ms': float(parts[2].split()[0]), - 'latency_max_ms': float(parts[3].split()[0]), - 'latency_50th_ms': float(parts[4].split()[0]), - 'latency_95th_ms': float(parts[5].split()[0]), - 'latency_99th_ms': float(parts[6].split()[0]), - 'latency_999th_ms': float(parts[7].split()[0]), - } - last = None - for line in node.account.ssh_capture(cmd): - self.logger.debug("Producer performance %d: %s", idx, line.strip()) - if self.intermediate_stats: - try: 
- self.stats[idx-1].append(parse_stats(line)) - except: - # Sometimes there are extraneous log messages - pass - last = line - try: - self.results[idx-1] = parse_stats(last) - except: - self.logger.error("Bad last line: %s", last) - - -class ConsumerPerformanceService(PerformanceService): - def __init__(self, context, num_nodes, kafka, topic, num_records, throughput, threads=1, settings={}): - super(ConsumerPerformanceService, self).__init__(context, num_nodes) - self.kafka = kafka - self.args = { - 'topic': topic, - 'num_records': num_records, - 'throughput': throughput, - 'threads': threads, - } - self.settings = settings - - def _worker(self, idx, node): - args = self.args.copy() - args.update({'zk_connect': self.kafka.zk.connect_setting()}) - cmd = "/opt/kafka/bin/kafka-consumer-perf-test.sh "\ - "--topic %(topic)s --messages %(num_records)d --zookeeper %(zk_connect)s" % args - for key,value in self.settings.items(): - cmd += " %s=%s" % (str(key), str(value)) - self.logger.debug("Consumer performance %d command: %s", idx, cmd) - last = None - for line in node.account.ssh_capture(cmd): - self.logger.debug("Consumer performance %d: %s", idx, line.strip()) - last = line - # Parse and save the last line's information - parts = last.split(',') - - self.results[idx-1] = { - 'total_mb': float(parts[2]), - 'mbps': float(parts[3]), - 'records_per_sec': float(parts[5]), - } - - -class EndToEndLatencyService(PerformanceService): - def __init__(self, context, num_nodes, kafka, topic, num_records, consumer_fetch_max_wait=100, acks=1): - super(EndToEndLatencyService, self).__init__(context, num_nodes) - self.kafka = kafka - self.args = { - 'topic': topic, - 'num_records': num_records, - 'consumer_fetch_max_wait': consumer_fetch_max_wait, - 'acks': acks - } - - def _worker(self, idx, node): - args = self.args.copy() - args.update({ - 'zk_connect': self.kafka.zk.connect_setting(), - 'bootstrap_servers': self.kafka.bootstrap_servers(), - }) - cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.EndToEndLatency "\ - "%(bootstrap_servers)s %(topic)s %(num_records)d "\ - "%(acks)d 20" % args - self.logger.debug("End-to-end latency %d command: %s", idx, cmd) - results = {} - for line in node.account.ssh_capture(cmd): - self.logger.debug("End-to-end latency %d: %s", idx, line.strip()) - if line.startswith("Avg latency:"): - results['latency_avg_ms'] = float(line.split()[2]) - if line.startswith("Percentiles"): - results['latency_50th_ms'] = float(line.split()[3][:-1]) - results['latency_99th_ms'] = float(line.split()[6][:-1]) - results['latency_999th_ms'] = float(line.split()[9]) - self.results[idx-1] = results - - -def parse_performance_output(summary): - parts = summary.split(',') - results = { - 'records': int(parts[0].split()[0]), - 'records_per_sec': float(parts[1].split()[0]), - 'mbps': float(parts[1].split('(')[1].split()[0]), - 'latency_avg_ms': float(parts[2].split()[0]), - 'latency_max_ms': float(parts[3].split()[0]), - 'latency_50th_ms': float(parts[4].split()[0]), - 'latency_95th_ms': float(parts[5].split()[0]), - 'latency_99th_ms': float(parts[6].split()[0]), - 'latency_999th_ms': float(parts[7].split()[0]), - } - # To provide compatibility with ConsumerPerformanceService - results['total_mb'] = results['mbps'] * (results['records'] / results['records_per_sec']) - results['rate_mbps'] = results['mbps'] - results['rate_mps'] = results['records_per_sec'] - - return results diff --git a/tests/kafkatest/services/performance/__init__.py b/tests/kafkatest/services/performance/__init__.py new file 
mode 100644 index 00000000000..a72e3b792bd --- /dev/null +++ b/tests/kafkatest/services/performance/__init__.py @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from performance import PerformanceService +from end_to_end_latency import EndToEndLatencyService +from producer_performance import ProducerPerformanceService +from consumer_performance import ConsumerPerformanceService \ No newline at end of file diff --git a/tests/kafkatest/services/performance/consumer_performance.py b/tests/kafkatest/services/performance/consumer_performance.py new file mode 100644 index 00000000000..ecaef43f14b --- /dev/null +++ b/tests/kafkatest/services/performance/consumer_performance.py @@ -0,0 +1,147 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from kafkatest.services.performance import PerformanceService + +import os + + +class ConsumerPerformanceService(PerformanceService): + """ + See ConsumerPerformance.scala as the source of truth on these settings, but for reference: + + "zookeeper" "The connection string for the zookeeper connection in the form host:port. Multiple URLS can + be given to allow fail-over. This option is only used with the old consumer." + + "broker-list", "A broker list to use for connecting if using the new consumer." + + "topic", "REQUIRED: The topic to consume from." + + "group", "The group id to consume on." + + "fetch-size", "The amount of data to fetch in a single request." + + "from-latest", "If the consumer does not already have an establishedoffset to consume from, + start with the latest message present in the log rather than the earliest message." + + "socket-buffer-size", "The size of the tcp RECV size." + + "threads", "Number of processing threads." + + "num-fetch-threads", "Number of fetcher threads. Defaults to 1" + + "new-consumer", "Use the new consumer implementation." 
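
[Editorial sketch, not part of the patch: a minimal example of how a test is expected to drive this service with the new consumer enabled. It mirrors the constructor and attributes added below; `test_context` and `kafka` stand in for the usual ducktape test context and Kafka service fixture, and the message count is illustrative.]

    # With new_consumer=True the generated command uses --new-consumer and --broker-list;
    # with the default (False) it falls back to --zookeeper, i.e. the old consumer.
    consumer = ConsumerPerformanceService(
        test_context, 1, kafka,
        topic="topic-replication-factor-three",
        messages=10 * 1000 * 1000,
        new_consumer=True)
    consumer.group = "test-consumer-group"   # less-common options are set after instantiation
    consumer.run()
    records_per_sec = consumer.results[0]['records_per_sec']
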
+ """ + + # Root directory for persistent output + PERSISTENT_ROOT = "/mnt/consumer_performance" + LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs") + STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "consumer_performance.stdout") + LOG_FILE = os.path.join(LOG_DIR, "consumer_performance.log") + LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties") + + logs = { + "consumer_performance_output": { + "path": STDOUT_CAPTURE, + "collect_default": True}, + + "consumer_performance_log": { + "path": LOG_FILE, + "collect_default": True} + } + + def __init__(self, context, num_nodes, kafka, topic, messages, new_consumer=False, settings={}): + super(ConsumerPerformanceService, self).__init__(context, num_nodes) + self.kafka = kafka + self.topic = topic + self.messages = messages + self.new_consumer = new_consumer + self.settings = settings + + # These less-frequently used settings can be updated manually after instantiation + self.fetch_size = None + self.socket_buffer_size = None + self.threads = None + self.num_fetch_threads = None + self.group = None + self.from_latest = None + + @property + def args(self): + """Dictionary of arguments used to start the Consumer Performance script.""" + args = { + 'topic': self.topic, + 'messages': self.messages, + } + + if self.new_consumer: + args['new-consumer'] = "" + args['broker-list'] = self.kafka.bootstrap_servers() + else: + args['zookeeper'] = self.kafka.zk.connect_setting() + + if self.fetch_size is not None: + args['fetch-size'] = self.fetch_size + + if self.socket_buffer_size is not None: + args['socket-buffer-size'] = self.socket_buffer_size + + if self.threads is not None: + args['threads'] = self.threads + + if self.num_fetch_threads is not None: + args['num-fetch-threads'] = self.num_fetch_threads + + if self.group is not None: + args['group'] = self.group + + if self.from_latest: + args['from-latest'] = "" + + return args + + @property + def start_cmd(self): + cmd = "export LOG_DIR=%s;" % ConsumerPerformanceService.LOG_DIR + cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % ConsumerPerformanceService.LOG4J_CONFIG + cmd += " /opt/kafka/bin/kafka-consumer-perf-test.sh" + for key, value in self.args.items(): + cmd += " --%s %s" % (key, value) + + for key, value in self.settings.items(): + cmd += " %s=%s" % (str(key), str(value)) + + cmd += " | tee %s" % ConsumerPerformanceService.STDOUT_CAPTURE + return cmd + + def _worker(self, idx, node): + node.account.ssh("mkdir -p %s" % ConsumerPerformanceService.PERSISTENT_ROOT, allow_fail=False) + + log_config = self.render('tools_log4j.properties', log_file=ConsumerPerformanceService.LOG_FILE) + node.account.create_file(ConsumerPerformanceService.LOG4J_CONFIG, log_config) + + cmd = self.start_cmd + self.logger.debug("Consumer performance %d command: %s", idx, cmd) + last = None + for line in node.account.ssh_capture(cmd): + last = line + # Parse and save the last line's information + parts = last.split(',') + + self.results[idx-1] = { + 'total_mb': float(parts[2]), + 'mbps': float(parts[3]), + 'records_per_sec': float(parts[5]), + } diff --git a/tests/kafkatest/services/performance/end_to_end_latency.py b/tests/kafkatest/services/performance/end_to_end_latency.py new file mode 100644 index 00000000000..4c61a93e8f2 --- /dev/null +++ b/tests/kafkatest/services/performance/end_to_end_latency.py @@ -0,0 +1,59 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from kafkatest.services.performance import PerformanceService + + +class EndToEndLatencyService(PerformanceService): + + logs = { + "end_to_end_latency_log": { + "path": "/mnt/end-to-end-latency.log", + "collect_default": True}, + } + + def __init__(self, context, num_nodes, kafka, topic, num_records, consumer_fetch_max_wait=100, acks=1): + super(EndToEndLatencyService, self).__init__(context, num_nodes) + self.kafka = kafka + self.args = { + 'topic': topic, + 'num_records': num_records, + 'consumer_fetch_max_wait': consumer_fetch_max_wait, + 'acks': acks + } + + def _worker(self, idx, node): + args = self.args.copy() + args.update({ + 'zk_connect': self.kafka.zk.connect_setting(), + 'bootstrap_servers': self.kafka.bootstrap_servers(), + }) + + cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.EndToEndLatency "\ + "%(bootstrap_servers)s %(topic)s %(num_records)d "\ + "%(acks)d 20" % args + + cmd += " | tee /mnt/end-to-end-latency.log" + + self.logger.debug("End-to-end latency %d command: %s", idx, cmd) + results = {} + for line in node.account.ssh_capture(cmd): + if line.startswith("Avg latency:"): + results['latency_avg_ms'] = float(line.split()[2]) + if line.startswith("Percentiles"): + results['latency_50th_ms'] = float(line.split()[3][:-1]) + results['latency_99th_ms'] = float(line.split()[6][:-1]) + results['latency_999th_ms'] = float(line.split()[9]) + self.results[idx-1] = results diff --git a/tests/kafkatest/services/performance/performance.py b/tests/kafkatest/services/performance/performance.py new file mode 100644 index 00000000000..6d286f60fad --- /dev/null +++ b/tests/kafkatest/services/performance/performance.py @@ -0,0 +1,29 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from ducktape.services.background_thread import BackgroundThreadService + + +class PerformanceService(BackgroundThreadService): + + def __init__(self, context, num_nodes): + super(PerformanceService, self).__init__(context, num_nodes) + self.results = [None] * self.num_nodes + self.stats = [[] for x in range(self.num_nodes)] + + def clean_node(self, node): + node.account.kill_process("java", clean_shutdown=False, allow_fail=True) + node.account.ssh("rm -rf /mnt/*", allow_fail=False) + diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py new file mode 100644 index 00000000000..c46a910f293 --- /dev/null +++ b/tests/kafkatest/services/performance/producer_performance.py @@ -0,0 +1,76 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from kafkatest.services.performance import PerformanceService + + +class ProducerPerformanceService(PerformanceService): + + logs = { + "producer_performance_log": { + "path": "/mnt/producer-performance.log", + "collect_default": True}, + } + + def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, settings={}, intermediate_stats=False): + super(ProducerPerformanceService, self).__init__(context, num_nodes) + self.kafka = kafka + self.args = { + 'topic': topic, + 'num_records': num_records, + 'record_size': record_size, + 'throughput': throughput + } + self.settings = settings + self.intermediate_stats = intermediate_stats + + def _worker(self, idx, node): + args = self.args.copy() + args.update({'bootstrap_servers': self.kafka.bootstrap_servers()}) + cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance "\ + "%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s"\ + " | tee /mnt/producer-performance.log" % args + + for key, value in self.settings.items(): + cmd += " %s=%s" % (str(key), str(value)) + self.logger.debug("Producer performance %d command: %s", idx, cmd) + + def parse_stats(line): + parts = line.split(',') + return { + 'records': int(parts[0].split()[0]), + 'records_per_sec': float(parts[1].split()[0]), + 'mbps': float(parts[1].split('(')[1].split()[0]), + 'latency_avg_ms': float(parts[2].split()[0]), + 'latency_max_ms': float(parts[3].split()[0]), + 'latency_50th_ms': float(parts[4].split()[0]), + 'latency_95th_ms': float(parts[5].split()[0]), + 'latency_99th_ms': float(parts[6].split()[0]), + 'latency_999th_ms': float(parts[7].split()[0]), + } + last = None + for line in node.account.ssh_capture(cmd): + if self.intermediate_stats: + try: + self.stats[idx-1].append(parse_stats(line)) + except: + # Sometimes there are extraneous log messages + pass + + last = line + try: + self.results[idx-1] = parse_stats(last) + except: 
+ raise Exception("Unable to parse aggregate performance statistics on node %d: %s" % (idx, last)) diff --git a/tests/kafkatest/services/performance/templates/tools_log4j.properties b/tests/kafkatest/services/performance/templates/tools_log4j.properties new file mode 100644 index 00000000000..ce30d527abc --- /dev/null +++ b/tests/kafkatest/services/performance/templates/tools_log4j.properties @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Define the root logger with appender file +log4j.rootLogger = {{ log_level|default("INFO") }}, FILE + +log4j.appender.FILE=org.apache.log4j.FileAppender +log4j.appender.FILE.File={{ log_file }} +log4j.appender.FILE.ImmediateFlush=true +log4j.appender.FILE.Threshold=debug +# Set the append to false, overwrite +log4j.appender.FILE.Append=false +log4j.appender.FILE.layout=org.apache.log4j.PatternLayout +log4j.appender.FILE.layout.conversionPattern=[%d] %p %m (%c)%n \ No newline at end of file diff --git a/tests/kafkatest/services/templates/tools_log4j.properties b/tests/kafkatest/services/templates/tools_log4j.properties index e63e6d6004e..ce30d527abc 100644 --- a/tests/kafkatest/services/templates/tools_log4j.properties +++ b/tests/kafkatest/services/templates/tools_log4j.properties @@ -14,7 +14,7 @@ # limitations under the License. # Define the root logger with appender file -log4j.rootLogger = INFO, FILE +log4j.rootLogger = {{ log_level|default("INFO") }}, FILE log4j.appender.FILE=org.apache.log4j.FileAppender log4j.appender.FILE.File={{ log_file }} diff --git a/tests/kafkatest/tests/benchmark_test.py b/tests/kafkatest/tests/benchmark_test.py index b01f27bc20c..02503ec44b0 100644 --- a/tests/kafkatest/tests/benchmark_test.py +++ b/tests/kafkatest/tests/benchmark_test.py @@ -14,136 +14,90 @@ # limitations under the License. from ducktape.services.service import Service +from ducktape.mark import parametrize +from ducktape.mark import matrix from kafkatest.tests.kafka_test import KafkaTest -from kafkatest.services.performance import ProducerPerformanceService, ConsumerPerformanceService, EndToEndLatencyService +from kafkatest.services.performance import ProducerPerformanceService, EndToEndLatencyService, ConsumerPerformanceService + + +TOPIC_REP_ONE = "topic-replication-factor-one" +TOPIC_REP_THREE = "topic-replication-factor-three" +DEFAULT_RECORD_SIZE = 100 # bytes class Benchmark(KafkaTest): - '''A benchmark of Kafka producer/consumer performance. This replicates the test + """A benchmark of Kafka producer/consumer performance. 
This replicates the test run here: https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines - ''' + """ def __init__(self, test_context): super(Benchmark, self).__init__(test_context, num_zk=1, num_brokers=3, topics={ - 'test-rep-one' : { 'partitions': 6, 'replication-factor': 1 }, - 'test-rep-three' : { 'partitions': 6, 'replication-factor': 3 } + TOPIC_REP_ONE: {'partitions': 6, 'replication-factor': 1}, + TOPIC_REP_THREE: {'partitions': 6, 'replication-factor': 3} }) - if True: - # Works on both aws and local - self.msgs = 1000000 - self.msgs_default = 1000000 - else: - # Can use locally on Vagrant VMs, but may use too much memory for aws - self.msgs = 50000000 - self.msgs_default = 50000000 - self.msgs_large = 10000000 - self.msg_size_default = 100 self.batch_size = 8*1024 self.buffer_memory = 64*1024*1024 self.msg_sizes = [10, 100, 1000, 10000, 100000] self.target_data_size = 128*1024*1024 self.target_data_size_gb = self.target_data_size/float(1024*1024*1024) - def test_single_producer_no_replication(self): - self.logger.info("BENCHMARK: Single producer, no replication") - self.perf = ProducerPerformanceService( + @parametrize(acks=1, topic=TOPIC_REP_ONE, num_producers=1, message_size=DEFAULT_RECORD_SIZE) + @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=1, message_size=DEFAULT_RECORD_SIZE) + @parametrize(acks=-1, topic=TOPIC_REP_THREE, num_producers=1, message_size=DEFAULT_RECORD_SIZE) + @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3, message_size=DEFAULT_RECORD_SIZE) + @matrix(acks=[1], topic=[TOPIC_REP_THREE], num_producers=[1], message_size=[10, 100, 1000, 10000, 100000]) + def test_producer_throughput(self, acks, topic, num_producers, message_size): + """ + Setup: 1 node zk + 3 node kafka cluster + Produce ~128MB worth of messages to a topic with 6 partitions. Required acks, topic replication factor, + and message size are varied depending on arguments injected into this test. + + Collect and return aggregate throughput statistics after all messages have been acknowledged. + (This runs ProducerPerformance.java under the hood) + """ + # Always generate the same total amount of data + nrecords = int(self.target_data_size / message_size) + + self.producer = ProducerPerformanceService( + self.test_context, num_producers, self.kafka, topic=topic, + num_records=nrecords, record_size=message_size, throughput=-1, + settings={ + 'acks': acks, + 'batch.size': self.batch_size, + 'buffer.memory': self.buffer_memory}) + self.producer.run() + return compute_aggregate_throughput(self.producer) + + def test_long_term_producer_throughput(self): + """ + Setup: 1 node zk + 3 node kafka cluster + Produce 10e6 100 byte messages to a topic with 6 partitions, replication-factor 3, and acks=1. + + Collect and return aggregate throughput statistics after all messages have been acknowledged. 
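
[Editorial sketch, not part of the patch: the commit refactors these tests around ducktape's @parametrize and @matrix decorators. As I understand their semantics, each @parametrize contributes one test case with exactly the keyword arguments given, while @matrix expands to the cartesian product of the listed values. A toy illustration with made-up names:]

    from ducktape.mark import matrix, parametrize

    class DecoratorExample(object):
        @parametrize(x=1, y="a")            # one case: (x=1, y="a")
        @matrix(x=[1, 2], y=["a", "b"])     # four cases: (1,"a"), (1,"b"), (2,"a"), (2,"b")
        def test_example(self, x, y):
            return x, y
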
+ + (This runs ProducerPerformance.java under the hood) + """ + self.producer = ProducerPerformanceService( self.test_context, 1, self.kafka, - topic="test-rep-one", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1, - settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory} - ) - self.perf.run() - data = compute_throughput(self.perf) - self.logger.info("Single producer, no replication: %s", str(data)) - return data - - def test_single_producer_replication(self): - self.logger.info("BENCHMARK: Single producer, async 3x replication") - self.perf = ProducerPerformanceService( - self.test_context, 1, self.kafka, - topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1, - settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory} - ) - self.perf.run() - data = compute_throughput(self.perf) - self.logger.info("Single producer, async 3x replication: %s" % str(data)) - return data - - def test_single_producer_sync(self): - self.logger.info("BENCHMARK: Single producer, sync 3x replication") - self.perf = ProducerPerformanceService( - self.test_context, 1, self.kafka, - topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1, - settings={'acks':-1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory} - ) - self.perf.run() - - data = compute_throughput(self.perf) - self.logger.info("Single producer, sync 3x replication: %s" % data) - return data - - def test_three_producers_async(self): - self.logger.info("BENCHMARK: Three producers, async 3x replication") - self.perf = ProducerPerformanceService( - self.test_context, 3, self.kafka, - topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1, - settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory} - ) - self.perf.run() - - data = compute_throughput(self.perf) - self.logger.info("Three producers, async 3x replication: %s" % data) - return data - - def test_multiple_message_size(self): - # TODO this would be a great place to use parametrization - self.perfs = {} - for msg_size in self.msg_sizes: - self.logger.info("BENCHMARK: Message size %d (%f GB total, single producer, async 3x replication)", msg_size, self.target_data_size_gb) - # Always generate the same total amount of data - nrecords = int(self.target_data_size / msg_size) - self.perfs["perf-" + str(msg_size)] = ProducerPerformanceService( - self.test_context, 1, self.kafka, - topic="test-rep-three", num_records=nrecords, record_size=msg_size, throughput=-1, - settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory} - ) - - self.msg_size_perf = {} - for msg_size in self.msg_sizes: - perf = self.perfs["perf-" + str(msg_size)] - perf.run() - self.msg_size_perf[msg_size] = perf - - summary = ["Message size:"] - data = {} - for msg_size in self.msg_sizes: - datum = compute_throughput(self.msg_size_perf[msg_size]) - summary.append(" %d: %s" % (msg_size, datum)) - data[msg_size] = datum - self.logger.info("\n".join(summary)) - return data - - def test_long_term_throughput(self): - self.logger.info("BENCHMARK: Long production") - self.perf = ProducerPerformanceService( - self.test_context, 1, self.kafka, - topic="test-rep-three", num_records=self.msgs_large, record_size=self.msg_size_default, throughput=-1, - settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}, + 
topic=TOPIC_REP_THREE, num_records=self.msgs_large, record_size=DEFAULT_RECORD_SIZE, + throughput=-1, settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}, intermediate_stats=True ) - self.perf.run() + self.producer.run() summary = ["Throughput over long run, data > memory:"] data = {} # FIXME we should be generating a graph too # Try to break it into 5 blocks, but fall back to a smaller number if # there aren't even 5 elements - block_size = max(len(self.perf.stats[0]) / 5, 1) - nblocks = len(self.perf.stats[0]) / block_size + block_size = max(len(self.producer.stats[0]) / 5, 1) + nblocks = len(self.producer.stats[0]) / block_size + for i in range(nblocks): - subset = self.perf.stats[0][i*block_size:min((i+1)*block_size, len(self.perf.stats[0]))] + subset = self.producer.stats[0][i*block_size:min((i+1)*block_size, len(self.producer.stats[0]))] if len(subset) == 0: summary.append(" Time block %d: (empty)" % i) data[i] = None @@ -158,35 +112,48 @@ class Benchmark(KafkaTest): return data def test_end_to_end_latency(self): + """ + Setup: 1 node zk + 3 node kafka cluster + Produce (acks = 1) and consume 10e3 messages to a topic with 6 partitions and replication-factor 3, + measuring the latency between production and consumption of each message. + + Return aggregate latency statistics. + + (Under the hood, this simply runs EndToEndLatency.scala) + """ self.logger.info("BENCHMARK: End to end latency") self.perf = EndToEndLatencyService( self.test_context, 1, self.kafka, - topic="test-rep-three", num_records=10000 + topic=TOPIC_REP_THREE, num_records=10000 ) self.perf.run() + return latency(self.perf.results[0]['latency_50th_ms'], self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms']) - data = latency(self.perf.results[0]['latency_50th_ms'], self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms']) - self.logger.info("End-to-end latency: %s" % str(data)) - return data + @matrix(new_consumer=[True, False]) + def test_producer_and_consumer(self, new_consumer=False): + """ + Setup: 1 node zk + 3 node kafka cluster + Concurrently produce and consume 10e6 messages with a single producer and a single consumer, + using new consumer if new_consumer == True + + Return aggregate throughput statistics for both producer and consumer. 
+ + (Under the hood, this runs ProducerPerformance.java, and ConsumerPerformance.scala) + """ + num_records = 10 * 1000 * 1000 # 10e6 - def test_producer_and_consumer(self): - self.logger.info("BENCHMARK: Producer + Consumer") self.producer = ProducerPerformanceService( self.test_context, 1, self.kafka, - topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1, - settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory} + topic=TOPIC_REP_THREE, num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1, + settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory} ) - self.consumer = ConsumerPerformanceService( - self.test_context, 1, self.kafka, - topic="test-rep-three", num_records=self.msgs_default, throughput=-1, threads=1 - ) - + self.test_context, 1, self.kafka, topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records) Service.run_parallel(self.producer, self.consumer) data = { - "producer": compute_throughput(self.producer), - "consumer": compute_throughput(self.consumer) + "producer": compute_aggregate_throughput(self.producer), + "consumer": compute_aggregate_throughput(self.consumer) } summary = [ "Producer + consumer:", @@ -194,49 +161,29 @@ class Benchmark(KafkaTest): self.logger.info("\n".join(summary)) return data - def test_single_consumer(self): - topic = "test-rep-three" + @matrix(new_consumer=[True, False], num_consumers=[1]) + def test_consumer_throughput(self, new_consumer, num_consumers): + """ + Consume 10e6 100-byte messages with 1 or more consumers from a topic with 6 partitions + (using new consumer iff new_consumer == True), and report throughput. + """ + num_records = 10 * 1000 * 1000 # 10e6 + # seed kafka w/messages self.producer = ProducerPerformanceService( self.test_context, 1, self.kafka, - topic=topic, num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1, - settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory} + topic=TOPIC_REP_THREE, num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1, + settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory} ) self.producer.run() - # All consumer tests use the messages from the first benchmark, so - # they'll get messages of the default message size - self.logger.info("BENCHMARK: Single consumer") - self.perf = ConsumerPerformanceService( - self.test_context, 1, self.kafka, - topic=topic, num_records=self.msgs_default, throughput=-1, threads=1 - ) - self.perf.run() - - data = compute_throughput(self.perf) - self.logger.info("Single consumer: %s" % data) - return data - - def test_three_consumers(self): - topic = "test-rep-three" - - self.producer = ProducerPerformanceService( - self.test_context, 1, self.kafka, - topic=topic, num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1, - settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory} - ) - self.producer.run() - - self.logger.info("BENCHMARK: Three consumers") - self.perf = ConsumerPerformanceService( - self.test_context, 3, self.kafka, - topic="test-rep-three", num_records=self.msgs_default, throughput=-1, threads=1 - ) - self.perf.run() - - data = compute_throughput(self.perf) - self.logger.info("Three consumers: %s", data) - return data + # consume + self.consumer = ConsumerPerformanceService( + self.test_context, num_consumers, self.kafka, + topic=TOPIC_REP_THREE, 
new_consumer=new_consumer, messages=num_records) + self.consumer.group = "test-consumer-group" + self.consumer.run() + return compute_aggregate_throughput(self.consumer) def throughput(records_per_sec, mb_per_sec): @@ -256,19 +203,9 @@ def latency(latency_50th_ms, latency_99th_ms, latency_999th_ms): } -def compute_throughput(perf): +def compute_aggregate_throughput(perf): """Helper method for computing throughput after running a performance service.""" aggregate_rate = sum([r['records_per_sec'] for r in perf.results]) aggregate_mbps = sum([r['mbps'] for r in perf.results]) return throughput(aggregate_rate, aggregate_mbps) - - - - - - - - - - diff --git a/tools/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java index fd31c1a9fb6..1a9cf04cdcc 100644 --- a/tools/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java @@ -148,9 +148,9 @@ public class ProducerPerformance { } public void printTotal() { - long ellapsed = System.currentTimeMillis() - start; - double recsPerSec = 1000.0 * count / (double) ellapsed; - double mbPerSec = 1000.0 * this.bytes / (double) ellapsed / (1024.0 * 1024.0); + long elapsed = System.currentTimeMillis() - start; + double recsPerSec = 1000.0 * count / (double) elapsed; + double mbPerSec = 1000.0 * this.bytes / (double) elapsed / (1024.0 * 1024.0); int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999); System.out.printf("%d records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.\n", count,
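
[Editorial sketch, not part of the patch: the aggregate line printed by printTotal() above is what parse_stats in producer_performance.py consumes. A worked example with an illustrative, made-up summary line showing how each field maps onto the results dictionary:]

    # Illustrative line in the format "%d records sent, %f records/sec (%.2f MB/sec), ..."
    line = ("1000000 records sent, 322580.6 records/sec (30.76 MB/sec), 4.5 ms avg latency, "
            "120.0 ms max latency, 3 ms 50th, 8 ms 95th, 12 ms 99th, 25 ms 99.9th.")

    parts = line.split(',')
    stats = {
        'records': int(parts[0].split()[0]),               # 1000000
        'records_per_sec': float(parts[1].split()[0]),     # 322580.6
        'mbps': float(parts[1].split('(')[1].split()[0]),  # 30.76
        'latency_avg_ms': float(parts[2].split()[0]),      # 4.5
        'latency_max_ms': float(parts[3].split()[0]),      # 120.0
        'latency_50th_ms': float(parts[4].split()[0]),     # 3.0
        'latency_95th_ms': float(parts[5].split()[0]),     # 8.0
        'latency_99th_ms': float(parts[6].split()[0]),     # 12.0
        'latency_999th_ms': float(parts[7].split()[0]),    # 25.0
    }
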