diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 382c3884bec..19ef6ebead3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -155,7 +155,7 @@ import java.util.concurrent.atomic.AtomicReference;
* called test as described above.
*
* The broker will automatically detect failed processes in the test group by using a heartbeat mechanism. The
- * consumer will automatically ping the cluster periodically, which let's the cluster know that it is alive. As long as
+ * consumer will automatically ping the cluster periodically, which lets the cluster know that it is alive. As long as
* the consumer is able to do this it is considered alive and retains the right to consume from the partitions assigned
* to it. If it stops heartbeating for a period of time longer than session.timeout.ms then it will be
* considered dead and its partitions will be assigned to another process.
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 0078b00e6d5..1be4b23700e 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -17,11 +17,15 @@
package kafka.tools
+import java.util
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.kafka.common.TopicPartition
+
import scala.collection.JavaConversions._
import java.util.concurrent.atomic.AtomicLong
import java.nio.channels.ClosedByInterruptException
import org.apache.log4j.Logger
-import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import kafka.utils.CommandLineUtils
import java.util.{ Random, Properties }
@@ -58,7 +62,7 @@ object ConsumerPerformance {
val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](config.props)
consumer.subscribe(List(config.topic))
startMs = System.currentTimeMillis
- consume(consumer, config.numMessages, 1000, config, totalMessagesRead, totalBytesRead)
+ consume(consumer, List(config.topic), config.numMessages, 1000, config, totalMessagesRead, totalBytesRead)
endMs = System.currentTimeMillis
consumer.close()
} else {
@@ -93,18 +97,40 @@ object ConsumerPerformance {
}
}
- def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], count: Long, timeout: Long, config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong) {
+ def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], topics: List[String], count: Long, timeout: Long, config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong) {
var bytesRead = 0L
var messagesRead = 0L
- val startMs = System.currentTimeMillis
- var lastReportTime: Long = startMs
var lastBytesRead = 0L
var lastMessagesRead = 0L
- var lastConsumed = System.currentTimeMillis
- while(messagesRead < count && lastConsumed >= System.currentTimeMillis - timeout) {
+
+ // Wait for group join, metadata fetch, etc. to complete before starting the benchmark
+ val joinTimeout = 10000
+ val isAssigned = new AtomicBoolean(false)
+ consumer.subscribe(topics, new ConsumerRebalanceListener {
+ def onPartitionsAssigned(consumer: org.apache.kafka.clients.consumer.Consumer[_, _], partitions: util.Collection[TopicPartition]) {
+ isAssigned.set(true)
+ }
+ def onPartitionsRevoked(consumer: org.apache.kafka.clients.consumer.Consumer[_, _], partitions: util.Collection[TopicPartition]) {
+ isAssigned.set(false)
+ }})
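+ // poll() drives the consumer's group-join and rebalance machinery, so keep polling until the
+ // listener above reports an assignment (or give up after joinTimeout ms).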
+ val joinStart = System.currentTimeMillis()
+ while (!isAssigned.get()) {
+ if (System.currentTimeMillis() - joinStart >= joinTimeout) {
+ throw new Exception("Timed out waiting for initial group join.")
+ }
+ consumer.poll(100)
+ }
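+ // Rewind to the earliest available offsets so the benchmark measures a full read of the topic
+ // rather than resuming from previously committed offsets.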
+ consumer.seekToBeginning()
+
+ // Now start the benchmark
+ val startMs = System.currentTimeMillis
+ var lastReportTime: Long = startMs
+ var lastConsumedTime = System.currentTimeMillis
+
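+ // Stop once the requested message count has been read, or once no records have arrived for longer than timeout ms.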
+ while(messagesRead < count && System.currentTimeMillis() - lastConsumedTime <= timeout) {
val records = consumer.poll(100)
if(records.count() > 0)
- lastConsumed = System.currentTimeMillis
+ lastConsumedTime = System.currentTimeMillis
for(record <- records) {
messagesRead += 1
if(record.key != null)
@@ -121,6 +147,7 @@ object ConsumerPerformance {
}
}
}
+
totalMessagesRead.set(messagesRead)
totalBytesRead.set(bytesRead)
}
@@ -255,5 +282,4 @@ object ConsumerPerformance {
}
}
-
}
diff --git a/tests/kafkatest/services/performance.py b/tests/kafkatest/services/performance.py
deleted file mode 100644
index 34892e0b7eb..00000000000
--- a/tests/kafkatest/services/performance.py
+++ /dev/null
@@ -1,163 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.services.background_thread import BackgroundThreadService
-
-
-class PerformanceService(BackgroundThreadService):
- def __init__(self, context, num_nodes):
- super(PerformanceService, self).__init__(context, num_nodes)
- self.results = [None] * self.num_nodes
- self.stats = [[] for x in range(self.num_nodes)]
-
-
-class ProducerPerformanceService(PerformanceService):
- def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, settings={}, intermediate_stats=False):
- super(ProducerPerformanceService, self).__init__(context, num_nodes)
- self.kafka = kafka
- self.args = {
- 'topic': topic,
- 'num_records': num_records,
- 'record_size': record_size,
- 'throughput': throughput
- }
- self.settings = settings
- self.intermediate_stats = intermediate_stats
-
- def _worker(self, idx, node):
- args = self.args.copy()
- args.update({'bootstrap_servers': self.kafka.bootstrap_servers()})
- cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance "\
- "%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s" % args
-
- for key,value in self.settings.items():
- cmd += " %s=%s" % (str(key), str(value))
- self.logger.debug("Producer performance %d command: %s", idx, cmd)
-
- def parse_stats(line):
- parts = line.split(',')
- return {
- 'records': int(parts[0].split()[0]),
- 'records_per_sec': float(parts[1].split()[0]),
- 'mbps': float(parts[1].split('(')[1].split()[0]),
- 'latency_avg_ms': float(parts[2].split()[0]),
- 'latency_max_ms': float(parts[3].split()[0]),
- 'latency_50th_ms': float(parts[4].split()[0]),
- 'latency_95th_ms': float(parts[5].split()[0]),
- 'latency_99th_ms': float(parts[6].split()[0]),
- 'latency_999th_ms': float(parts[7].split()[0]),
- }
- last = None
- for line in node.account.ssh_capture(cmd):
- self.logger.debug("Producer performance %d: %s", idx, line.strip())
- if self.intermediate_stats:
- try:
- self.stats[idx-1].append(parse_stats(line))
- except:
- # Sometimes there are extraneous log messages
- pass
- last = line
- try:
- self.results[idx-1] = parse_stats(last)
- except:
- self.logger.error("Bad last line: %s", last)
-
-
-class ConsumerPerformanceService(PerformanceService):
- def __init__(self, context, num_nodes, kafka, topic, num_records, throughput, threads=1, settings={}):
- super(ConsumerPerformanceService, self).__init__(context, num_nodes)
- self.kafka = kafka
- self.args = {
- 'topic': topic,
- 'num_records': num_records,
- 'throughput': throughput,
- 'threads': threads,
- }
- self.settings = settings
-
- def _worker(self, idx, node):
- args = self.args.copy()
- args.update({'zk_connect': self.kafka.zk.connect_setting()})
- cmd = "/opt/kafka/bin/kafka-consumer-perf-test.sh "\
- "--topic %(topic)s --messages %(num_records)d --zookeeper %(zk_connect)s" % args
- for key,value in self.settings.items():
- cmd += " %s=%s" % (str(key), str(value))
- self.logger.debug("Consumer performance %d command: %s", idx, cmd)
- last = None
- for line in node.account.ssh_capture(cmd):
- self.logger.debug("Consumer performance %d: %s", idx, line.strip())
- last = line
- # Parse and save the last line's information
- parts = last.split(',')
-
- self.results[idx-1] = {
- 'total_mb': float(parts[2]),
- 'mbps': float(parts[3]),
- 'records_per_sec': float(parts[5]),
- }
-
-
-class EndToEndLatencyService(PerformanceService):
- def __init__(self, context, num_nodes, kafka, topic, num_records, consumer_fetch_max_wait=100, acks=1):
- super(EndToEndLatencyService, self).__init__(context, num_nodes)
- self.kafka = kafka
- self.args = {
- 'topic': topic,
- 'num_records': num_records,
- 'consumer_fetch_max_wait': consumer_fetch_max_wait,
- 'acks': acks
- }
-
- def _worker(self, idx, node):
- args = self.args.copy()
- args.update({
- 'zk_connect': self.kafka.zk.connect_setting(),
- 'bootstrap_servers': self.kafka.bootstrap_servers(),
- })
- cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.EndToEndLatency "\
- "%(bootstrap_servers)s %(topic)s %(num_records)d "\
- "%(acks)d 20" % args
- self.logger.debug("End-to-end latency %d command: %s", idx, cmd)
- results = {}
- for line in node.account.ssh_capture(cmd):
- self.logger.debug("End-to-end latency %d: %s", idx, line.strip())
- if line.startswith("Avg latency:"):
- results['latency_avg_ms'] = float(line.split()[2])
- if line.startswith("Percentiles"):
- results['latency_50th_ms'] = float(line.split()[3][:-1])
- results['latency_99th_ms'] = float(line.split()[6][:-1])
- results['latency_999th_ms'] = float(line.split()[9])
- self.results[idx-1] = results
-
-
-def parse_performance_output(summary):
- parts = summary.split(',')
- results = {
- 'records': int(parts[0].split()[0]),
- 'records_per_sec': float(parts[1].split()[0]),
- 'mbps': float(parts[1].split('(')[1].split()[0]),
- 'latency_avg_ms': float(parts[2].split()[0]),
- 'latency_max_ms': float(parts[3].split()[0]),
- 'latency_50th_ms': float(parts[4].split()[0]),
- 'latency_95th_ms': float(parts[5].split()[0]),
- 'latency_99th_ms': float(parts[6].split()[0]),
- 'latency_999th_ms': float(parts[7].split()[0]),
- }
- # To provide compatibility with ConsumerPerformanceService
- results['total_mb'] = results['mbps'] * (results['records'] / results['records_per_sec'])
- results['rate_mbps'] = results['mbps']
- results['rate_mps'] = results['records_per_sec']
-
- return results
diff --git a/tests/kafkatest/services/performance/__init__.py b/tests/kafkatest/services/performance/__init__.py
new file mode 100644
index 00000000000..a72e3b792bd
--- /dev/null
+++ b/tests/kafkatest/services/performance/__init__.py
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from performance import PerformanceService
+from end_to_end_latency import EndToEndLatencyService
+from producer_performance import ProducerPerformanceService
+from consumer_performance import ConsumerPerformanceService
\ No newline at end of file
diff --git a/tests/kafkatest/services/performance/consumer_performance.py b/tests/kafkatest/services/performance/consumer_performance.py
new file mode 100644
index 00000000000..ecaef43f14b
--- /dev/null
+++ b/tests/kafkatest/services/performance/consumer_performance.py
@@ -0,0 +1,147 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.services.performance import PerformanceService
+
+import os
+
+
+class ConsumerPerformanceService(PerformanceService):
+ """
+ See ConsumerPerformance.scala as the source of truth on these settings, but for reference:
+
+ "zookeeper" "The connection string for the zookeeper connection in the form host:port. Multiple URLS can
+ be given to allow fail-over. This option is only used with the old consumer."
+
+ "broker-list", "A broker list to use for connecting if using the new consumer."
+
+ "topic", "REQUIRED: The topic to consume from."
+
+ "group", "The group id to consume on."
+
+ "fetch-size", "The amount of data to fetch in a single request."
+
+ "from-latest", "If the consumer does not already have an establishedoffset to consume from,
+ start with the latest message present in the log rather than the earliest message."
+
+ "socket-buffer-size", "The size of the tcp RECV size."
+
+ "threads", "Number of processing threads."
+
+ "num-fetch-threads", "Number of fetcher threads. Defaults to 1"
+
+ "new-consumer", "Use the new consumer implementation."
+ """
+
+ # Root directory for persistent output
+ PERSISTENT_ROOT = "/mnt/consumer_performance"
+ LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs")
+ STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "consumer_performance.stdout")
+ LOG_FILE = os.path.join(LOG_DIR, "consumer_performance.log")
+ LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
+
+ logs = {
+ "consumer_performance_output": {
+ "path": STDOUT_CAPTURE,
+ "collect_default": True},
+
+ "consumer_performance_log": {
+ "path": LOG_FILE,
+ "collect_default": True}
+ }
+
+ def __init__(self, context, num_nodes, kafka, topic, messages, new_consumer=False, settings={}):
+ super(ConsumerPerformanceService, self).__init__(context, num_nodes)
+ self.kafka = kafka
+ self.topic = topic
+ self.messages = messages
+ self.new_consumer = new_consumer
+ self.settings = settings
+
+ # These less-frequently used settings can be updated manually after instantiation
+ self.fetch_size = None
+ self.socket_buffer_size = None
+ self.threads = None
+ self.num_fetch_threads = None
+ self.group = None
+ self.from_latest = None
+
+ @property
+ def args(self):
+ """Dictionary of arguments used to start the Consumer Performance script."""
+ args = {
+ 'topic': self.topic,
+ 'messages': self.messages,
+ }
+
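+ # --new-consumer and --from-latest are flag-style options; an empty string value makes start_cmd emit just the bare flag.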
+ if self.new_consumer:
+ args['new-consumer'] = ""
+ args['broker-list'] = self.kafka.bootstrap_servers()
+ else:
+ args['zookeeper'] = self.kafka.zk.connect_setting()
+
+ if self.fetch_size is not None:
+ args['fetch-size'] = self.fetch_size
+
+ if self.socket_buffer_size is not None:
+ args['socket-buffer-size'] = self.socket_buffer_size
+
+ if self.threads is not None:
+ args['threads'] = self.threads
+
+ if self.num_fetch_threads is not None:
+ args['num-fetch-threads'] = self.num_fetch_threads
+
+ if self.group is not None:
+ args['group'] = self.group
+
+ if self.from_latest:
+ args['from-latest'] = ""
+
+ return args
+
+ @property
+ def start_cmd(self):
+ cmd = "export LOG_DIR=%s;" % ConsumerPerformanceService.LOG_DIR
+ cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % ConsumerPerformanceService.LOG4J_CONFIG
+ cmd += " /opt/kafka/bin/kafka-consumer-perf-test.sh"
+ for key, value in self.args.items():
+ cmd += " --%s %s" % (key, value)
+
+ for key, value in self.settings.items():
+ cmd += " %s=%s" % (str(key), str(value))
+
+ cmd += " | tee %s" % ConsumerPerformanceService.STDOUT_CAPTURE
+ return cmd
+
+ def _worker(self, idx, node):
+ node.account.ssh("mkdir -p %s" % ConsumerPerformanceService.PERSISTENT_ROOT, allow_fail=False)
+
+ log_config = self.render('tools_log4j.properties', log_file=ConsumerPerformanceService.LOG_FILE)
+ node.account.create_file(ConsumerPerformanceService.LOG4J_CONFIG, log_config)
+
+ cmd = self.start_cmd
+ self.logger.debug("Consumer performance %d command: %s", idx, cmd)
+ last = None
+ for line in node.account.ssh_capture(cmd):
+ last = line
+ # Parse and save the last line's information
+ parts = last.split(',')
+
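+ # The indices below assume the tool's final comma-separated summary line reports total MB consumed at
+ # position 2, MB/sec at position 3, and messages/sec at position 5 (the same mapping the old
+ # performance.py service used).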
+ self.results[idx-1] = {
+ 'total_mb': float(parts[2]),
+ 'mbps': float(parts[3]),
+ 'records_per_sec': float(parts[5]),
+ }
diff --git a/tests/kafkatest/services/performance/end_to_end_latency.py b/tests/kafkatest/services/performance/end_to_end_latency.py
new file mode 100644
index 00000000000..4c61a93e8f2
--- /dev/null
+++ b/tests/kafkatest/services/performance/end_to_end_latency.py
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.services.performance import PerformanceService
+
+
+class EndToEndLatencyService(PerformanceService):
+
+ logs = {
+ "end_to_end_latency_log": {
+ "path": "/mnt/end-to-end-latency.log",
+ "collect_default": True},
+ }
+
+ def __init__(self, context, num_nodes, kafka, topic, num_records, consumer_fetch_max_wait=100, acks=1):
+ super(EndToEndLatencyService, self).__init__(context, num_nodes)
+ self.kafka = kafka
+ self.args = {
+ 'topic': topic,
+ 'num_records': num_records,
+ 'consumer_fetch_max_wait': consumer_fetch_max_wait,
+ 'acks': acks
+ }
+
+ def _worker(self, idx, node):
+ args = self.args.copy()
+ args.update({
+ 'zk_connect': self.kafka.zk.connect_setting(),
+ 'bootstrap_servers': self.kafka.bootstrap_servers(),
+ })
+
+ cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.EndToEndLatency "\
+ "%(bootstrap_servers)s %(topic)s %(num_records)d "\
+ "%(acks)d 20" % args
+
+ cmd += " | tee /mnt/end-to-end-latency.log"
+
+ self.logger.debug("End-to-end latency %d command: %s", idx, cmd)
+ results = {}
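+ # EndToEndLatency prints an "Avg latency:" line and a "Percentiles" line; the token positions below
+ # assume that output format.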
+ for line in node.account.ssh_capture(cmd):
+ if line.startswith("Avg latency:"):
+ results['latency_avg_ms'] = float(line.split()[2])
+ if line.startswith("Percentiles"):
+ results['latency_50th_ms'] = float(line.split()[3][:-1])
+ results['latency_99th_ms'] = float(line.split()[6][:-1])
+ results['latency_999th_ms'] = float(line.split()[9])
+ self.results[idx-1] = results
diff --git a/tests/kafkatest/services/performance/performance.py b/tests/kafkatest/services/performance/performance.py
new file mode 100644
index 00000000000..6d286f60fad
--- /dev/null
+++ b/tests/kafkatest/services/performance/performance.py
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.services.background_thread import BackgroundThreadService
+
+
+class PerformanceService(BackgroundThreadService):
+
+ def __init__(self, context, num_nodes):
+ super(PerformanceService, self).__init__(context, num_nodes)
+ self.results = [None] * self.num_nodes
+ self.stats = [[] for x in range(self.num_nodes)]
+
+ def clean_node(self, node):
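+ # Best-effort cleanup: kill any leftover perf-tool JVM on the node and wipe its /mnt scratch directory.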
+ node.account.kill_process("java", clean_shutdown=False, allow_fail=True)
+ node.account.ssh("rm -rf /mnt/*", allow_fail=False)
+
diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py
new file mode 100644
index 00000000000..c46a910f293
--- /dev/null
+++ b/tests/kafkatest/services/performance/producer_performance.py
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.services.performance import PerformanceService
+
+
+class ProducerPerformanceService(PerformanceService):
+
+ logs = {
+ "producer_performance_log": {
+ "path": "/mnt/producer-performance.log",
+ "collect_default": True},
+ }
+
+ def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, settings={}, intermediate_stats=False):
+ super(ProducerPerformanceService, self).__init__(context, num_nodes)
+ self.kafka = kafka
+ self.args = {
+ 'topic': topic,
+ 'num_records': num_records,
+ 'record_size': record_size,
+ 'throughput': throughput
+ }
+ self.settings = settings
+ self.intermediate_stats = intermediate_stats
+
+ def _worker(self, idx, node):
+ args = self.args.copy()
+ args.update({'bootstrap_servers': self.kafka.bootstrap_servers()})
+ cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance "\
+ "%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s"\
+ " | tee /mnt/producer-performance.log" % args
+
+ for key, value in self.settings.items():
+ cmd += " %s=%s" % (str(key), str(value))
+ self.logger.debug("Producer performance %d command: %s", idx, cmd)
+
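+ # ProducerPerformance prints stats as comma-separated lines ("N records sent, X records/sec (Y MB/sec),
+ # ..." followed by latency figures); parse_stats maps one such line into a dict.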
+ def parse_stats(line):
+ parts = line.split(',')
+ return {
+ 'records': int(parts[0].split()[0]),
+ 'records_per_sec': float(parts[1].split()[0]),
+ 'mbps': float(parts[1].split('(')[1].split()[0]),
+ 'latency_avg_ms': float(parts[2].split()[0]),
+ 'latency_max_ms': float(parts[3].split()[0]),
+ 'latency_50th_ms': float(parts[4].split()[0]),
+ 'latency_95th_ms': float(parts[5].split()[0]),
+ 'latency_99th_ms': float(parts[6].split()[0]),
+ 'latency_999th_ms': float(parts[7].split()[0]),
+ }
+ last = None
+ for line in node.account.ssh_capture(cmd):
+ if self.intermediate_stats:
+ try:
+ self.stats[idx-1].append(parse_stats(line))
+ except:
+ # Sometimes there are extraneous log messages
+ pass
+
+ last = line
+ try:
+ self.results[idx-1] = parse_stats(last)
+ except:
+ raise Exception("Unable to parse aggregate performance statistics on node %d: %s" % (idx, last))
diff --git a/tests/kafkatest/services/performance/templates/tools_log4j.properties b/tests/kafkatest/services/performance/templates/tools_log4j.properties
new file mode 100644
index 00000000000..ce30d527abc
--- /dev/null
+++ b/tests/kafkatest/services/performance/templates/tools_log4j.properties
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Define the root logger with appender file
+log4j.rootLogger = {{ log_level|default("INFO") }}, FILE
+
+log4j.appender.FILE=org.apache.log4j.FileAppender
+log4j.appender.FILE.File={{ log_file }}
+log4j.appender.FILE.ImmediateFlush=true
+log4j.appender.FILE.Threshold=debug
+# Set Append to false so each run overwrites the previous log file
+log4j.appender.FILE.Append=false
+log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
+log4j.appender.FILE.layout.conversionPattern=[%d] %p %m (%c)%n
\ No newline at end of file
diff --git a/tests/kafkatest/services/templates/tools_log4j.properties b/tests/kafkatest/services/templates/tools_log4j.properties
index e63e6d6004e..ce30d527abc 100644
--- a/tests/kafkatest/services/templates/tools_log4j.properties
+++ b/tests/kafkatest/services/templates/tools_log4j.properties
@@ -14,7 +14,7 @@
# limitations under the License.
# Define the root logger with appender file
-log4j.rootLogger = INFO, FILE
+log4j.rootLogger = {{ log_level|default("INFO") }}, FILE
log4j.appender.FILE=org.apache.log4j.FileAppender
log4j.appender.FILE.File={{ log_file }}
diff --git a/tests/kafkatest/tests/benchmark_test.py b/tests/kafkatest/tests/benchmark_test.py
index b01f27bc20c..02503ec44b0 100644
--- a/tests/kafkatest/tests/benchmark_test.py
+++ b/tests/kafkatest/tests/benchmark_test.py
@@ -14,136 +14,90 @@
# limitations under the License.
from ducktape.services.service import Service
+from ducktape.mark import parametrize
+from ducktape.mark import matrix
from kafkatest.tests.kafka_test import KafkaTest
-from kafkatest.services.performance import ProducerPerformanceService, ConsumerPerformanceService, EndToEndLatencyService
+from kafkatest.services.performance import ProducerPerformanceService, EndToEndLatencyService, ConsumerPerformanceService
+
+
+TOPIC_REP_ONE = "topic-replication-factor-one"
+TOPIC_REP_THREE = "topic-replication-factor-three"
+DEFAULT_RECORD_SIZE = 100 # bytes
class Benchmark(KafkaTest):
- '''A benchmark of Kafka producer/consumer performance. This replicates the test
+ """A benchmark of Kafka producer/consumer performance. This replicates the test
run here:
https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
- '''
+ """
def __init__(self, test_context):
super(Benchmark, self).__init__(test_context, num_zk=1, num_brokers=3, topics={
- 'test-rep-one' : { 'partitions': 6, 'replication-factor': 1 },
- 'test-rep-three' : { 'partitions': 6, 'replication-factor': 3 }
+ TOPIC_REP_ONE: {'partitions': 6, 'replication-factor': 1},
+ TOPIC_REP_THREE: {'partitions': 6, 'replication-factor': 3}
})
- if True:
- # Works on both aws and local
- self.msgs = 1000000
- self.msgs_default = 1000000
- else:
- # Can use locally on Vagrant VMs, but may use too much memory for aws
- self.msgs = 50000000
- self.msgs_default = 50000000
-
self.msgs_large = 10000000
- self.msg_size_default = 100
self.batch_size = 8*1024
self.buffer_memory = 64*1024*1024
self.msg_sizes = [10, 100, 1000, 10000, 100000]
self.target_data_size = 128*1024*1024
self.target_data_size_gb = self.target_data_size/float(1024*1024*1024)
- def test_single_producer_no_replication(self):
- self.logger.info("BENCHMARK: Single producer, no replication")
- self.perf = ProducerPerformanceService(
+ @parametrize(acks=1, topic=TOPIC_REP_ONE, num_producers=1, message_size=DEFAULT_RECORD_SIZE)
+ @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=1, message_size=DEFAULT_RECORD_SIZE)
+ @parametrize(acks=-1, topic=TOPIC_REP_THREE, num_producers=1, message_size=DEFAULT_RECORD_SIZE)
+ @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3, message_size=DEFAULT_RECORD_SIZE)
+ @matrix(acks=[1], topic=[TOPIC_REP_THREE], num_producers=[1], message_size=[10, 100, 1000, 10000, 100000])
+ def test_producer_throughput(self, acks, topic, num_producers, message_size):
+ """
+ Setup: 1 node zk + 3 node kafka cluster
+ Produce ~128MB worth of messages to a topic with 6 partitions. Required acks, topic replication factor,
+ and message size are varied depending on arguments injected into this test.
+
+ Collect and return aggregate throughput statistics after all messages have been acknowledged.
+ (This runs ProducerPerformance.java under the hood)
+ """
+ # Always generate the same total amount of data
+ nrecords = int(self.target_data_size / message_size)
+
+ self.producer = ProducerPerformanceService(
+ self.test_context, num_producers, self.kafka, topic=topic,
+ num_records=nrecords, record_size=message_size, throughput=-1,
+ settings={
+ 'acks': acks,
+ 'batch.size': self.batch_size,
+ 'buffer.memory': self.buffer_memory})
+ self.producer.run()
+ return compute_aggregate_throughput(self.producer)
+
+ def test_long_term_producer_throughput(self):
+ """
+ Setup: 1 node zk + 3 node kafka cluster
+ Produce 10e6 100 byte messages to a topic with 6 partitions, replication-factor 3, and acks=1.
+
+ Collect and return aggregate throughput statistics after all messages have been acknowledged.
+
+ (This runs ProducerPerformance.java under the hood)
+ """
+ self.producer = ProducerPerformanceService(
self.test_context, 1, self.kafka,
- topic="test-rep-one", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
- settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
- )
- self.perf.run()
- data = compute_throughput(self.perf)
- self.logger.info("Single producer, no replication: %s", str(data))
- return data
-
- def test_single_producer_replication(self):
- self.logger.info("BENCHMARK: Single producer, async 3x replication")
- self.perf = ProducerPerformanceService(
- self.test_context, 1, self.kafka,
- topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
- settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
- )
- self.perf.run()
- data = compute_throughput(self.perf)
- self.logger.info("Single producer, async 3x replication: %s" % str(data))
- return data
-
- def test_single_producer_sync(self):
- self.logger.info("BENCHMARK: Single producer, sync 3x replication")
- self.perf = ProducerPerformanceService(
- self.test_context, 1, self.kafka,
- topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
- settings={'acks':-1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
- )
- self.perf.run()
-
- data = compute_throughput(self.perf)
- self.logger.info("Single producer, sync 3x replication: %s" % data)
- return data
-
- def test_three_producers_async(self):
- self.logger.info("BENCHMARK: Three producers, async 3x replication")
- self.perf = ProducerPerformanceService(
- self.test_context, 3, self.kafka,
- topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
- settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
- )
- self.perf.run()
-
- data = compute_throughput(self.perf)
- self.logger.info("Three producers, async 3x replication: %s" % data)
- return data
-
- def test_multiple_message_size(self):
- # TODO this would be a great place to use parametrization
- self.perfs = {}
- for msg_size in self.msg_sizes:
- self.logger.info("BENCHMARK: Message size %d (%f GB total, single producer, async 3x replication)", msg_size, self.target_data_size_gb)
- # Always generate the same total amount of data
- nrecords = int(self.target_data_size / msg_size)
- self.perfs["perf-" + str(msg_size)] = ProducerPerformanceService(
- self.test_context, 1, self.kafka,
- topic="test-rep-three", num_records=nrecords, record_size=msg_size, throughput=-1,
- settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
- )
-
- self.msg_size_perf = {}
- for msg_size in self.msg_sizes:
- perf = self.perfs["perf-" + str(msg_size)]
- perf.run()
- self.msg_size_perf[msg_size] = perf
-
- summary = ["Message size:"]
- data = {}
- for msg_size in self.msg_sizes:
- datum = compute_throughput(self.msg_size_perf[msg_size])
- summary.append(" %d: %s" % (msg_size, datum))
- data[msg_size] = datum
- self.logger.info("\n".join(summary))
- return data
-
- def test_long_term_throughput(self):
- self.logger.info("BENCHMARK: Long production")
- self.perf = ProducerPerformanceService(
- self.test_context, 1, self.kafka,
- topic="test-rep-three", num_records=self.msgs_large, record_size=self.msg_size_default, throughput=-1,
- settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory},
+ topic=TOPIC_REP_THREE, num_records=self.msgs_large, record_size=DEFAULT_RECORD_SIZE,
+ throughput=-1, settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory},
intermediate_stats=True
)
- self.perf.run()
+ self.producer.run()
summary = ["Throughput over long run, data > memory:"]
data = {}
# FIXME we should be generating a graph too
# Try to break it into 5 blocks, but fall back to a smaller number if
# there aren't even 5 elements
- block_size = max(len(self.perf.stats[0]) / 5, 1)
- nblocks = len(self.perf.stats[0]) / block_size
+ block_size = max(len(self.producer.stats[0]) / 5, 1)
+ nblocks = len(self.producer.stats[0]) / block_size
+
for i in range(nblocks):
- subset = self.perf.stats[0][i*block_size:min((i+1)*block_size, len(self.perf.stats[0]))]
+ subset = self.producer.stats[0][i*block_size:min((i+1)*block_size, len(self.producer.stats[0]))]
if len(subset) == 0:
summary.append(" Time block %d: (empty)" % i)
data[i] = None
@@ -158,35 +112,48 @@ class Benchmark(KafkaTest):
return data
def test_end_to_end_latency(self):
+ """
+ Setup: 1 node zk + 3 node kafka cluster
+ Produce (acks = 1) and consume 10e3 messages to a topic with 6 partitions and replication-factor 3,
+ measuring the latency between production and consumption of each message.
+
+ Return aggregate latency statistics.
+
+ (Under the hood, this simply runs EndToEndLatency.scala)
+ """
self.logger.info("BENCHMARK: End to end latency")
self.perf = EndToEndLatencyService(
self.test_context, 1, self.kafka,
- topic="test-rep-three", num_records=10000
+ topic=TOPIC_REP_THREE, num_records=10000
)
self.perf.run()
+ return latency(self.perf.results[0]['latency_50th_ms'], self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms'])
- data = latency(self.perf.results[0]['latency_50th_ms'], self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms'])
- self.logger.info("End-to-end latency: %s" % str(data))
- return data
+ @matrix(new_consumer=[True, False])
+ def test_producer_and_consumer(self, new_consumer=False):
+ """
+ Setup: 1 node zk + 3 node kafka cluster
+ Concurrently produce and consume 10e6 messages with a single producer and a single consumer,
+ using new consumer if new_consumer == True
+
+ Return aggregate throughput statistics for both producer and consumer.
+
+ (Under the hood, this runs ProducerPerformance.java, and ConsumerPerformance.scala)
+ """
+ num_records = 10 * 1000 * 1000 # 10e6
- def test_producer_and_consumer(self):
- self.logger.info("BENCHMARK: Producer + Consumer")
self.producer = ProducerPerformanceService(
self.test_context, 1, self.kafka,
- topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
- settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
+ topic=TOPIC_REP_THREE, num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1,
+ settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
)
-
self.consumer = ConsumerPerformanceService(
- self.test_context, 1, self.kafka,
- topic="test-rep-three", num_records=self.msgs_default, throughput=-1, threads=1
- )
-
+ self.test_context, 1, self.kafka, topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records)
Service.run_parallel(self.producer, self.consumer)
data = {
- "producer": compute_throughput(self.producer),
- "consumer": compute_throughput(self.consumer)
+ "producer": compute_aggregate_throughput(self.producer),
+ "consumer": compute_aggregate_throughput(self.consumer)
}
summary = [
"Producer + consumer:",
@@ -194,49 +161,29 @@ class Benchmark(KafkaTest):
self.logger.info("\n".join(summary))
return data
- def test_single_consumer(self):
- topic = "test-rep-three"
+ @matrix(new_consumer=[True, False], num_consumers=[1])
+ def test_consumer_throughput(self, new_consumer, num_consumers):
+ """
+ Consume 10e6 100-byte messages with 1 or more consumers from a topic with 6 partitions
+ (using new consumer iff new_consumer == True), and report throughput.
+ """
+ num_records = 10 * 1000 * 1000 # 10e6
+ # seed kafka w/messages
self.producer = ProducerPerformanceService(
self.test_context, 1, self.kafka,
- topic=topic, num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
- settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
+ topic=TOPIC_REP_THREE, num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1,
+ settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
)
self.producer.run()
- # All consumer tests use the messages from the first benchmark, so
- # they'll get messages of the default message size
- self.logger.info("BENCHMARK: Single consumer")
- self.perf = ConsumerPerformanceService(
- self.test_context, 1, self.kafka,
- topic=topic, num_records=self.msgs_default, throughput=-1, threads=1
- )
- self.perf.run()
-
- data = compute_throughput(self.perf)
- self.logger.info("Single consumer: %s" % data)
- return data
-
- def test_three_consumers(self):
- topic = "test-rep-three"
-
- self.producer = ProducerPerformanceService(
- self.test_context, 1, self.kafka,
- topic=topic, num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
- settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
- )
- self.producer.run()
-
- self.logger.info("BENCHMARK: Three consumers")
- self.perf = ConsumerPerformanceService(
- self.test_context, 3, self.kafka,
- topic="test-rep-three", num_records=self.msgs_default, throughput=-1, threads=1
- )
- self.perf.run()
-
- data = compute_throughput(self.perf)
- self.logger.info("Three consumers: %s", data)
- return data
+ # consume
+ self.consumer = ConsumerPerformanceService(
+ self.test_context, num_consumers, self.kafka,
+ topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records)
+ self.consumer.group = "test-consumer-group"
+ self.consumer.run()
+ return compute_aggregate_throughput(self.consumer)
def throughput(records_per_sec, mb_per_sec):
@@ -256,19 +203,9 @@ def latency(latency_50th_ms, latency_99th_ms, latency_999th_ms):
}
-def compute_throughput(perf):
+def compute_aggregate_throughput(perf):
"""Helper method for computing throughput after running a performance service."""
aggregate_rate = sum([r['records_per_sec'] for r in perf.results])
aggregate_mbps = sum([r['mbps'] for r in perf.results])
return throughput(aggregate_rate, aggregate_mbps)
-
-
-
-
-
-
-
-
-
-
diff --git a/tools/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
index fd31c1a9fb6..1a9cf04cdcc 100644
--- a/tools/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
@@ -148,9 +148,9 @@ public class ProducerPerformance {
}
public void printTotal() {
- long ellapsed = System.currentTimeMillis() - start;
- double recsPerSec = 1000.0 * count / (double) ellapsed;
- double mbPerSec = 1000.0 * this.bytes / (double) ellapsed / (1024.0 * 1024.0);
+ long elapsed = System.currentTimeMillis() - start;
+ double recsPerSec = 1000.0 * count / (double) elapsed;
+ double mbPerSec = 1000.0 * this.bytes / (double) elapsed / (1024.0 * 1024.0);
int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999);
System.out.printf("%d records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.\n",
count,