KAFKA-2489: add benchmark for new consumer

ewencp
The changes here are smaller than they look - mostly refactoring/cleanup.

- ConsumerPerformanceService: added new_consumer flag, and exposed more command-line settings
- benchmark.py: refactored to use `parametrize` and `matrix` - this reduced some amount of repeated code
- benchmark.py: added consumer performance tests with new consumer (using `parametrize`)
- benchmark.py: added more detailed test descriptions
- performance.py: broke into separate files

Author: Geoff Anderson <geoff@confluent.io>

Reviewers: Ewen Cheslack-Postava, Jason Gustafson, Gwen Shapira

Closes #179 from granders/KAFKA-2489-benchmark-new-consumer
This commit is contained in:
Geoff Anderson 2015-09-08 17:59:49 -07:00 committed by Gwen Shapira
parent f8902981c6
commit b8b1bca440
12 changed files with 498 additions and 342 deletions

View File

@ -155,7 +155,7 @@ import java.util.concurrent.atomic.AtomicReference;
* called <i>test</i> as described above.
* <p>
* The broker will automatically detect failed processes in the <i>test</i> group by using a heartbeat mechanism. The
* consumer will automatically ping the cluster periodically, which let's the cluster know that it is alive. As long as
* consumer will automatically ping the cluster periodically, which lets the cluster know that it is alive. As long as
* the consumer is able to do this it is considered alive and retains the right to consume from the partitions assigned
* to it. If it stops heartbeating for a period of time longer than <code>session.timeout.ms</code> then it will be
* considered dead and its partitions will be assigned to another process.

View File

@ -17,11 +17,15 @@
package kafka.tools
import java.util
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConversions._
import java.util.concurrent.atomic.AtomicLong
import java.nio.channels.ClosedByInterruptException
import org.apache.log4j.Logger
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import kafka.utils.CommandLineUtils
import java.util.{ Random, Properties }
@ -58,7 +62,7 @@ object ConsumerPerformance {
val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](config.props)
consumer.subscribe(List(config.topic))
startMs = System.currentTimeMillis
consume(consumer, config.numMessages, 1000, config, totalMessagesRead, totalBytesRead)
consume(consumer, List(config.topic), config.numMessages, 1000, config, totalMessagesRead, totalBytesRead)
endMs = System.currentTimeMillis
consumer.close()
} else {
@ -93,18 +97,40 @@ object ConsumerPerformance {
}
}
def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], count: Long, timeout: Long, config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong) {
def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], topics: List[String], count: Long, timeout: Long, config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong) {
var bytesRead = 0L
var messagesRead = 0L
val startMs = System.currentTimeMillis
var lastReportTime: Long = startMs
var lastBytesRead = 0L
var lastMessagesRead = 0L
var lastConsumed = System.currentTimeMillis
while(messagesRead < count && lastConsumed >= System.currentTimeMillis - timeout) {
// Wait for group join, metadata fetch, etc
val joinTimeout = 10000
val isAssigned = new AtomicBoolean(false)
consumer.subscribe(topics, new ConsumerRebalanceListener {
def onPartitionsAssigned(consumer: org.apache.kafka.clients.consumer.Consumer[_, _], partitions: util.Collection[TopicPartition]) {
isAssigned.set(true)
}
def onPartitionsRevoked(consumer: org.apache.kafka.clients.consumer.Consumer[_, _], partitions: util.Collection[TopicPartition]) {
isAssigned.set(false)
}})
val joinStart = System.currentTimeMillis()
while (!isAssigned.get()) {
if (System.currentTimeMillis() - joinStart >= joinTimeout) {
throw new Exception("Timed out waiting for initial group join.")
}
consumer.poll(100)
}
consumer.seekToBeginning()
// Now start the benchmark
val startMs = System.currentTimeMillis
var lastReportTime: Long = startMs
var lastConsumedTime = System.currentTimeMillis
while(messagesRead < count && System.currentTimeMillis() - lastConsumedTime <= timeout) {
val records = consumer.poll(100)
if(records.count() > 0)
lastConsumed = System.currentTimeMillis
lastConsumedTime = System.currentTimeMillis
for(record <- records) {
messagesRead += 1
if(record.key != null)
@ -121,6 +147,7 @@ object ConsumerPerformance {
}
}
}
totalMessagesRead.set(messagesRead)
totalBytesRead.set(bytesRead)
}
@ -255,5 +282,4 @@ object ConsumerPerformance {
}
}
}

View File

@ -1,163 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.services.background_thread import BackgroundThreadService
class PerformanceService(BackgroundThreadService):
def __init__(self, context, num_nodes):
super(PerformanceService, self).__init__(context, num_nodes)
self.results = [None] * self.num_nodes
self.stats = [[] for x in range(self.num_nodes)]
class ProducerPerformanceService(PerformanceService):
def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, settings={}, intermediate_stats=False):
super(ProducerPerformanceService, self).__init__(context, num_nodes)
self.kafka = kafka
self.args = {
'topic': topic,
'num_records': num_records,
'record_size': record_size,
'throughput': throughput
}
self.settings = settings
self.intermediate_stats = intermediate_stats
def _worker(self, idx, node):
args = self.args.copy()
args.update({'bootstrap_servers': self.kafka.bootstrap_servers()})
cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance "\
"%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s" % args
for key,value in self.settings.items():
cmd += " %s=%s" % (str(key), str(value))
self.logger.debug("Producer performance %d command: %s", idx, cmd)
def parse_stats(line):
parts = line.split(',')
return {
'records': int(parts[0].split()[0]),
'records_per_sec': float(parts[1].split()[0]),
'mbps': float(parts[1].split('(')[1].split()[0]),
'latency_avg_ms': float(parts[2].split()[0]),
'latency_max_ms': float(parts[3].split()[0]),
'latency_50th_ms': float(parts[4].split()[0]),
'latency_95th_ms': float(parts[5].split()[0]),
'latency_99th_ms': float(parts[6].split()[0]),
'latency_999th_ms': float(parts[7].split()[0]),
}
last = None
for line in node.account.ssh_capture(cmd):
self.logger.debug("Producer performance %d: %s", idx, line.strip())
if self.intermediate_stats:
try:
self.stats[idx-1].append(parse_stats(line))
except:
# Sometimes there are extraneous log messages
pass
last = line
try:
self.results[idx-1] = parse_stats(last)
except:
self.logger.error("Bad last line: %s", last)
class ConsumerPerformanceService(PerformanceService):
def __init__(self, context, num_nodes, kafka, topic, num_records, throughput, threads=1, settings={}):
super(ConsumerPerformanceService, self).__init__(context, num_nodes)
self.kafka = kafka
self.args = {
'topic': topic,
'num_records': num_records,
'throughput': throughput,
'threads': threads,
}
self.settings = settings
def _worker(self, idx, node):
args = self.args.copy()
args.update({'zk_connect': self.kafka.zk.connect_setting()})
cmd = "/opt/kafka/bin/kafka-consumer-perf-test.sh "\
"--topic %(topic)s --messages %(num_records)d --zookeeper %(zk_connect)s" % args
for key,value in self.settings.items():
cmd += " %s=%s" % (str(key), str(value))
self.logger.debug("Consumer performance %d command: %s", idx, cmd)
last = None
for line in node.account.ssh_capture(cmd):
self.logger.debug("Consumer performance %d: %s", idx, line.strip())
last = line
# Parse and save the last line's information
parts = last.split(',')
self.results[idx-1] = {
'total_mb': float(parts[2]),
'mbps': float(parts[3]),
'records_per_sec': float(parts[5]),
}
class EndToEndLatencyService(PerformanceService):
def __init__(self, context, num_nodes, kafka, topic, num_records, consumer_fetch_max_wait=100, acks=1):
super(EndToEndLatencyService, self).__init__(context, num_nodes)
self.kafka = kafka
self.args = {
'topic': topic,
'num_records': num_records,
'consumer_fetch_max_wait': consumer_fetch_max_wait,
'acks': acks
}
def _worker(self, idx, node):
args = self.args.copy()
args.update({
'zk_connect': self.kafka.zk.connect_setting(),
'bootstrap_servers': self.kafka.bootstrap_servers(),
})
cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.EndToEndLatency "\
"%(bootstrap_servers)s %(topic)s %(num_records)d "\
"%(acks)d 20" % args
self.logger.debug("End-to-end latency %d command: %s", idx, cmd)
results = {}
for line in node.account.ssh_capture(cmd):
self.logger.debug("End-to-end latency %d: %s", idx, line.strip())
if line.startswith("Avg latency:"):
results['latency_avg_ms'] = float(line.split()[2])
if line.startswith("Percentiles"):
results['latency_50th_ms'] = float(line.split()[3][:-1])
results['latency_99th_ms'] = float(line.split()[6][:-1])
results['latency_999th_ms'] = float(line.split()[9])
self.results[idx-1] = results
def parse_performance_output(summary):
parts = summary.split(',')
results = {
'records': int(parts[0].split()[0]),
'records_per_sec': float(parts[1].split()[0]),
'mbps': float(parts[1].split('(')[1].split()[0]),
'latency_avg_ms': float(parts[2].split()[0]),
'latency_max_ms': float(parts[3].split()[0]),
'latency_50th_ms': float(parts[4].split()[0]),
'latency_95th_ms': float(parts[5].split()[0]),
'latency_99th_ms': float(parts[6].split()[0]),
'latency_999th_ms': float(parts[7].split()[0]),
}
# To provide compatibility with ConsumerPerformanceService
results['total_mb'] = results['mbps'] * (results['records'] / results['records_per_sec'])
results['rate_mbps'] = results['mbps']
results['rate_mps'] = results['records_per_sec']
return results

View File

@ -0,0 +1,19 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from performance import PerformanceService
from end_to_end_latency import EndToEndLatencyService
from producer_performance import ProducerPerformanceService
from consumer_performance import ConsumerPerformanceService

View File

@ -0,0 +1,147 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from kafkatest.services.performance import PerformanceService
import os
class ConsumerPerformanceService(PerformanceService):
"""
See ConsumerPerformance.scala as the source of truth on these settings, but for reference:
"zookeeper" "The connection string for the zookeeper connection in the form host:port. Multiple URLS can
be given to allow fail-over. This option is only used with the old consumer."
"broker-list", "A broker list to use for connecting if using the new consumer."
"topic", "REQUIRED: The topic to consume from."
"group", "The group id to consume on."
"fetch-size", "The amount of data to fetch in a single request."
"from-latest", "If the consumer does not already have an establishedoffset to consume from,
start with the latest message present in the log rather than the earliest message."
"socket-buffer-size", "The size of the tcp RECV size."
"threads", "Number of processing threads."
"num-fetch-threads", "Number of fetcher threads. Defaults to 1"
"new-consumer", "Use the new consumer implementation."
"""
# Root directory for persistent output
PERSISTENT_ROOT = "/mnt/consumer_performance"
LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs")
STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "consumer_performance.stdout")
LOG_FILE = os.path.join(LOG_DIR, "consumer_performance.log")
LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
logs = {
"consumer_performance_output": {
"path": STDOUT_CAPTURE,
"collect_default": True},
"consumer_performance_log": {
"path": LOG_FILE,
"collect_default": True}
}
def __init__(self, context, num_nodes, kafka, topic, messages, new_consumer=False, settings={}):
super(ConsumerPerformanceService, self).__init__(context, num_nodes)
self.kafka = kafka
self.topic = topic
self.messages = messages
self.new_consumer = new_consumer
self.settings = settings
# These less-frequently used settings can be updated manually after instantiation
self.fetch_size = None
self.socket_buffer_size = None
self.threads = None
self.num_fetch_threads = None
self.group = None
self.from_latest = None
@property
def args(self):
"""Dictionary of arguments used to start the Consumer Performance script."""
args = {
'topic': self.topic,
'messages': self.messages,
}
if self.new_consumer:
args['new-consumer'] = ""
args['broker-list'] = self.kafka.bootstrap_servers()
else:
args['zookeeper'] = self.kafka.zk.connect_setting()
if self.fetch_size is not None:
args['fetch-size'] = self.fetch_size
if self.socket_buffer_size is not None:
args['socket-buffer-size'] = self.socket_buffer_size
if self.threads is not None:
args['threads'] = self.threads
if self.num_fetch_threads is not None:
args['num-fetch-threads'] = self.num_fetch_threads
if self.group is not None:
args['group'] = self.group
if self.from_latest:
args['from-latest'] = ""
return args
@property
def start_cmd(self):
cmd = "export LOG_DIR=%s;" % ConsumerPerformanceService.LOG_DIR
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % ConsumerPerformanceService.LOG4J_CONFIG
cmd += " /opt/kafka/bin/kafka-consumer-perf-test.sh"
for key, value in self.args.items():
cmd += " --%s %s" % (key, value)
for key, value in self.settings.items():
cmd += " %s=%s" % (str(key), str(value))
cmd += " | tee %s" % ConsumerPerformanceService.STDOUT_CAPTURE
return cmd
def _worker(self, idx, node):
node.account.ssh("mkdir -p %s" % ConsumerPerformanceService.PERSISTENT_ROOT, allow_fail=False)
log_config = self.render('tools_log4j.properties', log_file=ConsumerPerformanceService.LOG_FILE)
node.account.create_file(ConsumerPerformanceService.LOG4J_CONFIG, log_config)
cmd = self.start_cmd
self.logger.debug("Consumer performance %d command: %s", idx, cmd)
last = None
for line in node.account.ssh_capture(cmd):
last = line
# Parse and save the last line's information
parts = last.split(',')
self.results[idx-1] = {
'total_mb': float(parts[2]),
'mbps': float(parts[3]),
'records_per_sec': float(parts[5]),
}

View File

@ -0,0 +1,59 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from kafkatest.services.performance import PerformanceService
class EndToEndLatencyService(PerformanceService):
logs = {
"end_to_end_latency_log": {
"path": "/mnt/end-to-end-latency.log",
"collect_default": True},
}
def __init__(self, context, num_nodes, kafka, topic, num_records, consumer_fetch_max_wait=100, acks=1):
super(EndToEndLatencyService, self).__init__(context, num_nodes)
self.kafka = kafka
self.args = {
'topic': topic,
'num_records': num_records,
'consumer_fetch_max_wait': consumer_fetch_max_wait,
'acks': acks
}
def _worker(self, idx, node):
args = self.args.copy()
args.update({
'zk_connect': self.kafka.zk.connect_setting(),
'bootstrap_servers': self.kafka.bootstrap_servers(),
})
cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.EndToEndLatency "\
"%(bootstrap_servers)s %(topic)s %(num_records)d "\
"%(acks)d 20" % args
cmd += " | tee /mnt/end-to-end-latency.log"
self.logger.debug("End-to-end latency %d command: %s", idx, cmd)
results = {}
for line in node.account.ssh_capture(cmd):
if line.startswith("Avg latency:"):
results['latency_avg_ms'] = float(line.split()[2])
if line.startswith("Percentiles"):
results['latency_50th_ms'] = float(line.split()[3][:-1])
results['latency_99th_ms'] = float(line.split()[6][:-1])
results['latency_999th_ms'] = float(line.split()[9])
self.results[idx-1] = results

View File

@ -0,0 +1,29 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.services.background_thread import BackgroundThreadService
class PerformanceService(BackgroundThreadService):
def __init__(self, context, num_nodes):
super(PerformanceService, self).__init__(context, num_nodes)
self.results = [None] * self.num_nodes
self.stats = [[] for x in range(self.num_nodes)]
def clean_node(self, node):
node.account.kill_process("java", clean_shutdown=False, allow_fail=True)
node.account.ssh("rm -rf /mnt/*", allow_fail=False)

View File

@ -0,0 +1,76 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from kafkatest.services.performance import PerformanceService
class ProducerPerformanceService(PerformanceService):
logs = {
"producer_performance_log": {
"path": "/mnt/producer-performance.log",
"collect_default": True},
}
def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, settings={}, intermediate_stats=False):
super(ProducerPerformanceService, self).__init__(context, num_nodes)
self.kafka = kafka
self.args = {
'topic': topic,
'num_records': num_records,
'record_size': record_size,
'throughput': throughput
}
self.settings = settings
self.intermediate_stats = intermediate_stats
def _worker(self, idx, node):
args = self.args.copy()
args.update({'bootstrap_servers': self.kafka.bootstrap_servers()})
cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance "\
"%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s"\
" | tee /mnt/producer-performance.log" % args
for key, value in self.settings.items():
cmd += " %s=%s" % (str(key), str(value))
self.logger.debug("Producer performance %d command: %s", idx, cmd)
def parse_stats(line):
parts = line.split(',')
return {
'records': int(parts[0].split()[0]),
'records_per_sec': float(parts[1].split()[0]),
'mbps': float(parts[1].split('(')[1].split()[0]),
'latency_avg_ms': float(parts[2].split()[0]),
'latency_max_ms': float(parts[3].split()[0]),
'latency_50th_ms': float(parts[4].split()[0]),
'latency_95th_ms': float(parts[5].split()[0]),
'latency_99th_ms': float(parts[6].split()[0]),
'latency_999th_ms': float(parts[7].split()[0]),
}
last = None
for line in node.account.ssh_capture(cmd):
if self.intermediate_stats:
try:
self.stats[idx-1].append(parse_stats(line))
except:
# Sometimes there are extraneous log messages
pass
last = line
try:
self.results[idx-1] = parse_stats(last)
except:
raise Exception("Unable to parse aggregate performance statistics on node %d: %s" % (idx, last))

View File

@ -0,0 +1,26 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Define the root logger with appender file
log4j.rootLogger = {{ log_level|default("INFO") }}, FILE
log4j.appender.FILE=org.apache.log4j.FileAppender
log4j.appender.FILE.File={{ log_file }}
log4j.appender.FILE.ImmediateFlush=true
log4j.appender.FILE.Threshold=debug
# Set the append to false, overwrite
log4j.appender.FILE.Append=false
log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
log4j.appender.FILE.layout.conversionPattern=[%d] %p %m (%c)%n

View File

@ -14,7 +14,7 @@
# limitations under the License.
# Define the root logger with appender file
log4j.rootLogger = INFO, FILE
log4j.rootLogger = {{ log_level|default("INFO") }}, FILE
log4j.appender.FILE=org.apache.log4j.FileAppender
log4j.appender.FILE.File={{ log_file }}

View File

@ -14,136 +14,90 @@
# limitations under the License.
from ducktape.services.service import Service
from ducktape.mark import parametrize
from ducktape.mark import matrix
from kafkatest.tests.kafka_test import KafkaTest
from kafkatest.services.performance import ProducerPerformanceService, ConsumerPerformanceService, EndToEndLatencyService
from kafkatest.services.performance import ProducerPerformanceService, EndToEndLatencyService, ConsumerPerformanceService
TOPIC_REP_ONE = "topic-replication-factor-one"
TOPIC_REP_THREE = "topic-replication-factor-three"
DEFAULT_RECORD_SIZE = 100 # bytes
class Benchmark(KafkaTest):
'''A benchmark of Kafka producer/consumer performance. This replicates the test
"""A benchmark of Kafka producer/consumer performance. This replicates the test
run here:
https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
'''
"""
def __init__(self, test_context):
super(Benchmark, self).__init__(test_context, num_zk=1, num_brokers=3, topics={
'test-rep-one' : { 'partitions': 6, 'replication-factor': 1 },
'test-rep-three' : { 'partitions': 6, 'replication-factor': 3 }
TOPIC_REP_ONE: {'partitions': 6, 'replication-factor': 1},
TOPIC_REP_THREE: {'partitions': 6, 'replication-factor': 3}
})
if True:
# Works on both aws and local
self.msgs = 1000000
self.msgs_default = 1000000
else:
# Can use locally on Vagrant VMs, but may use too much memory for aws
self.msgs = 50000000
self.msgs_default = 50000000
self.msgs_large = 10000000
self.msg_size_default = 100
self.batch_size = 8*1024
self.buffer_memory = 64*1024*1024
self.msg_sizes = [10, 100, 1000, 10000, 100000]
self.target_data_size = 128*1024*1024
self.target_data_size_gb = self.target_data_size/float(1024*1024*1024)
def test_single_producer_no_replication(self):
self.logger.info("BENCHMARK: Single producer, no replication")
self.perf = ProducerPerformanceService(
self.test_context, 1, self.kafka,
topic="test-rep-one", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
)
self.perf.run()
data = compute_throughput(self.perf)
self.logger.info("Single producer, no replication: %s", str(data))
return data
@parametrize(acks=1, topic=TOPIC_REP_ONE, num_producers=1, message_size=DEFAULT_RECORD_SIZE)
@parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=1, message_size=DEFAULT_RECORD_SIZE)
@parametrize(acks=-1, topic=TOPIC_REP_THREE, num_producers=1, message_size=DEFAULT_RECORD_SIZE)
@parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3, message_size=DEFAULT_RECORD_SIZE)
@matrix(acks=[1], topic=[TOPIC_REP_THREE], num_producers=[1], message_size=[10, 100, 1000, 10000, 100000])
def test_producer_throughput(self, acks, topic, num_producers, message_size):
"""
Setup: 1 node zk + 3 node kafka cluster
Produce ~128MB worth of messages to a topic with 6 partitions. Required acks, topic replication factor,
and message size are varied depending on arguments injected into this test.
def test_single_producer_replication(self):
self.logger.info("BENCHMARK: Single producer, async 3x replication")
self.perf = ProducerPerformanceService(
self.test_context, 1, self.kafka,
topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
)
self.perf.run()
data = compute_throughput(self.perf)
self.logger.info("Single producer, async 3x replication: %s" % str(data))
return data
def test_single_producer_sync(self):
self.logger.info("BENCHMARK: Single producer, sync 3x replication")
self.perf = ProducerPerformanceService(
self.test_context, 1, self.kafka,
topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
settings={'acks':-1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
)
self.perf.run()
data = compute_throughput(self.perf)
self.logger.info("Single producer, sync 3x replication: %s" % data)
return data
def test_three_producers_async(self):
self.logger.info("BENCHMARK: Three producers, async 3x replication")
self.perf = ProducerPerformanceService(
self.test_context, 3, self.kafka,
topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
)
self.perf.run()
data = compute_throughput(self.perf)
self.logger.info("Three producers, async 3x replication: %s" % data)
return data
def test_multiple_message_size(self):
# TODO this would be a great place to use parametrization
self.perfs = {}
for msg_size in self.msg_sizes:
self.logger.info("BENCHMARK: Message size %d (%f GB total, single producer, async 3x replication)", msg_size, self.target_data_size_gb)
Collect and return aggregate throughput statistics after all messages have been acknowledged.
(This runs ProducerPerformance.java under the hood)
"""
# Always generate the same total amount of data
nrecords = int(self.target_data_size / msg_size)
self.perfs["perf-" + str(msg_size)] = ProducerPerformanceService(
nrecords = int(self.target_data_size / message_size)
self.producer = ProducerPerformanceService(
self.test_context, num_producers, self.kafka, topic=topic,
num_records=nrecords, record_size=message_size, throughput=-1,
settings={
'acks': acks,
'batch.size': self.batch_size,
'buffer.memory': self.buffer_memory})
self.producer.run()
return compute_aggregate_throughput(self.producer)
def test_long_term_producer_throughput(self):
"""
Setup: 1 node zk + 3 node kafka cluster
Produce 10e6 100 byte messages to a topic with 6 partitions, replication-factor 3, and acks=1.
Collect and return aggregate throughput statistics after all messages have been acknowledged.
(This runs ProducerPerformance.java under the hood)
"""
self.producer = ProducerPerformanceService(
self.test_context, 1, self.kafka,
topic="test-rep-three", num_records=nrecords, record_size=msg_size, throughput=-1,
settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
)
self.msg_size_perf = {}
for msg_size in self.msg_sizes:
perf = self.perfs["perf-" + str(msg_size)]
perf.run()
self.msg_size_perf[msg_size] = perf
summary = ["Message size:"]
data = {}
for msg_size in self.msg_sizes:
datum = compute_throughput(self.msg_size_perf[msg_size])
summary.append(" %d: %s" % (msg_size, datum))
data[msg_size] = datum
self.logger.info("\n".join(summary))
return data
def test_long_term_throughput(self):
self.logger.info("BENCHMARK: Long production")
self.perf = ProducerPerformanceService(
self.test_context, 1, self.kafka,
topic="test-rep-three", num_records=self.msgs_large, record_size=self.msg_size_default, throughput=-1,
settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory},
topic=TOPIC_REP_THREE, num_records=self.msgs_large, record_size=DEFAULT_RECORD_SIZE,
throughput=-1, settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory},
intermediate_stats=True
)
self.perf.run()
self.producer.run()
summary = ["Throughput over long run, data > memory:"]
data = {}
# FIXME we should be generating a graph too
# Try to break it into 5 blocks, but fall back to a smaller number if
# there aren't even 5 elements
block_size = max(len(self.perf.stats[0]) / 5, 1)
nblocks = len(self.perf.stats[0]) / block_size
block_size = max(len(self.producer.stats[0]) / 5, 1)
nblocks = len(self.producer.stats[0]) / block_size
for i in range(nblocks):
subset = self.perf.stats[0][i*block_size:min((i+1)*block_size, len(self.perf.stats[0]))]
subset = self.producer.stats[0][i*block_size:min((i+1)*block_size, len(self.producer.stats[0]))]
if len(subset) == 0:
summary.append(" Time block %d: (empty)" % i)
data[i] = None
@ -158,35 +112,48 @@ class Benchmark(KafkaTest):
return data
def test_end_to_end_latency(self):
"""
Setup: 1 node zk + 3 node kafka cluster
Produce (acks = 1) and consume 10e3 messages to a topic with 6 partitions and replication-factor 3,
measuring the latency between production and consumption of each message.
Return aggregate latency statistics.
(Under the hood, this simply runs EndToEndLatency.scala)
"""
self.logger.info("BENCHMARK: End to end latency")
self.perf = EndToEndLatencyService(
self.test_context, 1, self.kafka,
topic="test-rep-three", num_records=10000
topic=TOPIC_REP_THREE, num_records=10000
)
self.perf.run()
return latency(self.perf.results[0]['latency_50th_ms'], self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms'])
data = latency(self.perf.results[0]['latency_50th_ms'], self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms'])
self.logger.info("End-to-end latency: %s" % str(data))
return data
@matrix(new_consumer=[True, False])
def test_producer_and_consumer(self, new_consumer=False):
"""
Setup: 1 node zk + 3 node kafka cluster
Concurrently produce and consume 10e6 messages with a single producer and a single consumer,
using new consumer if new_consumer == True
Return aggregate throughput statistics for both producer and consumer.
(Under the hood, this runs ProducerPerformance.java, and ConsumerPerformance.scala)
"""
num_records = 10 * 1000 * 1000 # 10e6
def test_producer_and_consumer(self):
self.logger.info("BENCHMARK: Producer + Consumer")
self.producer = ProducerPerformanceService(
self.test_context, 1, self.kafka,
topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
topic=TOPIC_REP_THREE, num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1,
settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
)
self.consumer = ConsumerPerformanceService(
self.test_context, 1, self.kafka,
topic="test-rep-three", num_records=self.msgs_default, throughput=-1, threads=1
)
self.test_context, 1, self.kafka, topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records)
Service.run_parallel(self.producer, self.consumer)
data = {
"producer": compute_throughput(self.producer),
"consumer": compute_throughput(self.consumer)
"producer": compute_aggregate_throughput(self.producer),
"consumer": compute_aggregate_throughput(self.consumer)
}
summary = [
"Producer + consumer:",
@ -194,49 +161,29 @@ class Benchmark(KafkaTest):
self.logger.info("\n".join(summary))
return data
def test_single_consumer(self):
topic = "test-rep-three"
@matrix(new_consumer=[True, False], num_consumers=[1])
def test_consumer_throughput(self, new_consumer, num_consumers):
"""
Consume 10e6 100-byte messages with 1 or more consumers from a topic with 6 partitions
(using new consumer iff new_consumer == True), and report throughput.
"""
num_records = 10 * 1000 * 1000 # 10e6
# seed kafka w/messages
self.producer = ProducerPerformanceService(
self.test_context, 1, self.kafka,
topic=topic, num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
topic=TOPIC_REP_THREE, num_records=num_records, record_size=DEFAULT_RECORD_SIZE, throughput=-1,
settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
)
self.producer.run()
# All consumer tests use the messages from the first benchmark, so
# they'll get messages of the default message size
self.logger.info("BENCHMARK: Single consumer")
self.perf = ConsumerPerformanceService(
self.test_context, 1, self.kafka,
topic=topic, num_records=self.msgs_default, throughput=-1, threads=1
)
self.perf.run()
data = compute_throughput(self.perf)
self.logger.info("Single consumer: %s" % data)
return data
def test_three_consumers(self):
topic = "test-rep-three"
self.producer = ProducerPerformanceService(
self.test_context, 1, self.kafka,
topic=topic, num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
)
self.producer.run()
self.logger.info("BENCHMARK: Three consumers")
self.perf = ConsumerPerformanceService(
self.test_context, 3, self.kafka,
topic="test-rep-three", num_records=self.msgs_default, throughput=-1, threads=1
)
self.perf.run()
data = compute_throughput(self.perf)
self.logger.info("Three consumers: %s", data)
return data
# consume
self.consumer = ConsumerPerformanceService(
self.test_context, num_consumers, self.kafka,
topic=TOPIC_REP_THREE, new_consumer=new_consumer, messages=num_records)
self.consumer.group = "test-consumer-group"
self.consumer.run()
return compute_aggregate_throughput(self.consumer)
def throughput(records_per_sec, mb_per_sec):
@ -256,19 +203,9 @@ def latency(latency_50th_ms, latency_99th_ms, latency_999th_ms):
}
def compute_throughput(perf):
def compute_aggregate_throughput(perf):
"""Helper method for computing throughput after running a performance service."""
aggregate_rate = sum([r['records_per_sec'] for r in perf.results])
aggregate_mbps = sum([r['mbps'] for r in perf.results])
return throughput(aggregate_rate, aggregate_mbps)

View File

@ -148,9 +148,9 @@ public class ProducerPerformance {
}
public void printTotal() {
long ellapsed = System.currentTimeMillis() - start;
double recsPerSec = 1000.0 * count / (double) ellapsed;
double mbPerSec = 1000.0 * this.bytes / (double) ellapsed / (1024.0 * 1024.0);
long elapsed = System.currentTimeMillis() - start;
double recsPerSec = 1000.0 * count / (double) elapsed;
double mbPerSec = 1000.0 * this.bytes / (double) elapsed / (1024.0 * 1024.0);
int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999);
System.out.printf("%d records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.\n",
count,