Moved a bunch of files to kafkatest directory

This commit is contained in:
Geoff Anderson 2015-06-10 18:02:11 -07:00
parent fc7c81c1f6
commit 884b20e3a7
16 changed files with 18 additions and 945 deletions

View File

@ -1,91 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.tools
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, KafkaProducer}
import kafka.consumer._
import java.util.Properties
import java.util.Arrays
import scala.Option.option2Iterable
object TestEndToEndLatency {
def main(args: Array[String]) {
if (args.length != 6) {
System.err.println("USAGE: java " + getClass().getName + " broker_list zookeeper_connect topic num_messages consumer_fetch_max_wait producer_acks")
System.exit(1)
}
val brokerList = args(0)
val zkConnect = args(1)
val topic = args(2)
val numMessages = args(3).toInt
val consumerFetchMaxWait = args(4).toInt
val producerAcks = args(5).toInt
val consumerProps = new Properties()
consumerProps.put("group.id", topic)
consumerProps.put("auto.commit.enable", "false")
consumerProps.put("auto.offset.reset", "largest")
consumerProps.put("zookeeper.connect", zkConnect)
consumerProps.put("fetch.wait.max.ms", consumerFetchMaxWait.toString)
consumerProps.put("socket.timeout.ms", 1201000.toString)
val config = new ConsumerConfig(consumerProps)
val connector = Consumer.create(config)
val stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).head.head
val iter = stream.iterator
val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0")
producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
producerProps.put(ProducerConfig.ACKS_CONFIG, producerAcks.toString)
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
// make sure the consumer fetcher has started before sending data since otherwise
// the consumption from the tail will skip the first message and hence be blocked
Thread.sleep(5000)
val message = "hello there beautiful".getBytes
var totalTime = 0.0
val latencies = new Array[Long](numMessages)
for (i <- 0 until numMessages) {
val begin = System.nanoTime
producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, message))
val received = iter.next
val elapsed = System.nanoTime - begin
// poor man's progress bar
if (i % 1000 == 0)
println(i + "\t" + elapsed / 1000.0 / 1000.0)
totalTime += elapsed
latencies(i) = (elapsed / 1000 / 1000)
}
println("Avg latency: %.4f ms\n".format(totalTime / numMessages / 1000.0 / 1000.0))
Arrays.sort(latencies)
val p50 = latencies((latencies.length * 0.5).toInt)
val p99 = latencies((latencies.length * 0.99).toInt)
val p999 = latencies((latencies.length * 0.999).toInt)
println("Percentiles: 50th = %d, 99th = %d, 99.9th = %d".format(p50, p99, p999))
producer.close()
connector.commitOffsets(true)
connector.shutdown()
System.exit(0)
}
}

View File

@ -27,16 +27,22 @@ To run the tests:
3. Bring up the cluster, making sure you have enough workers. For Vagrant,
use `vagrant up`. If you want to run on AWS, use `vagrant up
--provider=aws --no-parallel`.
Note that the initial provisioning process can be quite slow since it involves
installing dependencies and updates on every vm.
4. Install ducktape:
$ git clone https://github.com/confluentinc/ducktape
$ cd ducktape
$ pip install ducktape
$ python setup.py install
5. Run the system tests using ducktape, you can view results in the `results`
directory.
$ cd tests
$ ducktape tests
6. To iterate/run again if you made any changes:
$ cd kafka

View File

@ -1 +0,0 @@

View File

@ -119,6 +119,9 @@ class ConsoleConsumer(BackgroundThreadService):
prop_file = self.render('console_consumer.properties', consumer_timeout_ms=self.consumer_timeout_ms)
else:
prop_file = self.render('console_consumer.properties')
self.logger.info("console_consumer.properties:")
self.logger.info(prop_file)
node.account.create_file("/mnt/console_consumer.properties", prop_file)
# Run and capture output

View File

@ -55,6 +55,8 @@ class KafkaService(Service):
def start_node(self, node):
props_file = self.render('kafka.properties', node=node, broker_id=self.idx(node))
self.logger.info("kafka.properties:")
self.logger.info(props_file)
node.account.create_file("/mnt/kafka.properties", props_file)
cmd = "/opt/kafka/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log & echo $! > /mnt/kafka.pid"

View File

@ -101,11 +101,6 @@ class ConsumerPerformanceService(PerformanceService):
# Parse and save the last line's information
parts = last.split(',')
print "=" * 20
print "ConsumerPerformanceService data:"
print parts
print "-" * 20
self.results[idx-1] = {
'total_mb': float(parts[3]),
'mbps': float(parts[4]),

View File

@ -37,7 +37,11 @@ class ZookeeperService(Service):
node.account.ssh("mkdir -p /mnt/zookeeper")
node.account.ssh("echo %d > /mnt/zookeeper/myid" % idx)
node.account.create_file("/mnt/zookeeper.properties", self.render('zookeeper.properties'))
config_file = self.render('zookeeper.properties')
self.logger.info("zookeeper.properties:")
self.logger.info(config_file)
node.account.create_file("/mnt/zookeeper.properties", config_file)
node.account.ssh(
"/opt/kafka/bin/zookeeper-server-start.sh /mnt/zookeeper.properties 1>> %(path)s 2>> %(path)s &"

View File

@ -238,13 +238,9 @@ def latency(latency_50th_ms, latency_99th_ms, latency_999th_ms):
def compute_throughput(perf):
print "=" * 20
print perf.results
print "-" * 20
"""Helper method for computing throughput after running a performance service."""
aggregate_rate = sum([r['records_per_sec'] for r in perf.results])
aggregate_mbps = sum([r['mb_per_sec'] for r in perf.results])
aggregate_mbps = sum([r['mbps'] for r in perf.results])
return throughput(aggregate_rate, aggregate_mbps)

View File

@ -1,212 +0,0 @@
# Copyright 2014 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.services.service import Service
import time, re, json
class KafkaService(Service):
def __init__(self, service_context, zk, topics=None):
"""
:type service_context ducktape.services.service.ServiceContext
:type zk: ZookeeperService
:type topics: dict
"""
super(KafkaService, self).__init__(service_context)
self.zk = zk
self.topics = topics
def start(self):
super(KafkaService, self).start()
# Start all nodes in this Kafka service
for idx, node in enumerate(self.nodes, 1):
self.logger.info("Starting Kafka node %d on %s", idx, node.account.hostname)
self._stop_and_clean(node, allow_fail=True)
self.start_node(node)
# wait for start up
time.sleep(6)
# Create topics if necessary
if self.topics is not None:
for topic, topic_cfg in self.topics.items():
if topic_cfg is None:
topic_cfg = {}
topic_cfg["topic"] = topic
self.create_topic(topic_cfg)
def create_topic(self, topic_cfg):
node = self.nodes[0] # any node is fine here
self.logger.info("Creating topic %s with settings %s", topic_cfg["topic"], topic_cfg)
cmd = "/opt/kafka/bin/kafka-topics.sh --zookeeper %(zk_connect)s --create "\
"--topic %(topic)s --partitions %(partitions)d --replication-factor %(replication)d" % {
'zk_connect': self.zk.connect_setting(),
'topic': topic_cfg.get("topic"),
'partitions': topic_cfg.get('partitions', 1),
'replication': topic_cfg.get('replication-factor', 1)
}
if "configs" in topic_cfg.keys() and topic_cfg["configs"] is not None:
for config_name, config_value in topic_cfg["configs"].items():
cmd += " --config %s=%s" % (config_name, str(config_value))
self.logger.info("Running topic creation command...\n%s" % cmd)
node.account.ssh(cmd)
time.sleep(1)
self.logger.info("Checking to see if topic was properly created...\n%s" % cmd)
for line in self.describe_topic(topic_cfg["topic"]).split("\n"):
self.logger.info(line)
def describe_topic(self, topic):
node = self.nodes[0]
cmd = "/opt/kafka/bin/kafka-topics.sh --zookeeper %s --topic %s --describe" % \
(self.zk.connect_setting(), topic)
output = ""
for line in node.account.ssh_capture(cmd):
output += line
return output
def verify_reassign_partitions(self, reassignment):
"""Run the reassign partitions admin tool in "verify" mode
"""
node = self.nodes[0]
json_file = "/tmp/" + str(time.time()) + "_reassign.json"
# reassignment to json
json_str = json.dumps(reassignment)
json_str = json.dumps(json_str)
# create command
cmd = "echo %s > %s && " % (json_str, json_file)
cmd += "/opt/kafka/bin/kafka-reassign-partitions.sh "\
"--zookeeper %(zk_connect)s "\
"--reassignment-json-file %(reassignment_file)s "\
"--verify" % {'zk_connect': self.zk.connect_setting(),
'reassignment_file': json_file}
cmd += " && sleep 1 && rm -f %s" % json_file
# send command
self.logger.info("Verifying parition reassignment...")
self.logger.debug(cmd)
output = ""
for line in node.account.ssh_capture(cmd):
output += line
self.logger.debug(output)
if re.match(".*is in progress.*", output) is not None:
return False
return True
def execute_reassign_partitions(self, reassignment):
"""Run the reassign partitions admin tool in "verify" mode
"""
node = self.nodes[0]
json_file = "/tmp/" + str(time.time()) + "_reassign.json"
# reassignment to json
json_str = json.dumps(reassignment)
json_str = json.dumps(json_str)
# create command
cmd = "echo %s > %s && " % (json_str, json_file)
cmd += "/opt/kafka/bin/kafka-reassign-partitions.sh "\
"--zookeeper %(zk_connect)s "\
"--reassignment-json-file %(reassignment_file)s "\
"--execute" % {'zk_connect': self.zk.connect_setting(),
'reassignment_file': json_file}
cmd += " && sleep 1 && rm -f %s" % json_file
# send command
self.logger.info("Executing parition reassignment...")
self.logger.debug(cmd)
output = ""
for line in node.account.ssh_capture(cmd):
output += line
self.logger.debug("Verify partition reassignment:")
self.logger.debug(output)
def stop(self):
"""If the service left any running processes or data, clean them up."""
super(KafkaService, self).stop()
for idx, node in enumerate(self.nodes, 1):
self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname))
self._stop_and_clean(node, allow_fail=True)
node.free()
def _stop_and_clean(self, node, allow_fail=False):
node.account.ssh("/opt/kafka/bin/kafka-server-stop.sh", allow_fail=allow_fail)
time.sleep(5) # the stop script doesn't wait
node.account.ssh("rm -rf /mnt/kafka-logs /mnt/kafka.properties /mnt/kafka.log")
def stop_node(self, node, clean_shutdown=True, allow_fail=True):
node.account.kill_process("kafka", clean_shutdown, allow_fail)
def start_node(self, node, config=None):
if config is None:
template = open('templates/kafka.properties').read()
template_params = {
'broker_id': self.idx(node),
'hostname': node.account.hostname,
'zk_connect': self.zk.connect_setting()
}
config = template % template_params
node.account.create_file("/mnt/kafka.properties", config)
cmd = "/opt/kafka/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log &"
self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd))
node.account.ssh(cmd)
def restart_node(self, node, wait_sec=0, clean_shutdown=True):
self.stop_node(node, clean_shutdown, allow_fail=True)
time.sleep(wait_sec)
self.start_node(node)
def get_leader_node(self, topic, partition=0):
""" Get the leader replica for the given topic and partition.
"""
cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.ZooKeeperMainWrapper -server %s " \
% self.zk.connect_setting()
cmd += "get /brokers/topics/%s/partitions/%d/state" % (topic, partition)
self.logger.debug(cmd)
node = self.nodes[0]
self.logger.debug("Querying zookeeper to find leader replica for topic %s: \n%s" % (cmd, topic))
partition_state = None
for line in node.account.ssh_capture(cmd):
match = re.match("^({.+})$", line)
if match is not None:
partition_state = match.groups()[0]
break
if partition_state is None:
raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition))
partition_state = json.loads(partition_state)
self.logger.info(partition_state)
leader_idx = int(partition_state["leader"])
self.logger.info("Leader for topic %s and partition %d is now: %d" % (topic, partition, leader_idx))
return self.get_node(leader_idx)
def bootstrap_servers(self):
return ','.join([node.account.hostname + ":9092" for node in self.nodes])

View File

@ -1,189 +0,0 @@
# Copyright 2014 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.services.service import Service
import threading
class PerformanceService(Service):
def __init__(self, service_context):
super(PerformanceService, self).__init__(service_context)
def start(self):
super(PerformanceService, self).start()
self.worker_threads = []
self.results = [None] * len(self.nodes)
self.stats = [[] for x in range(len(self.nodes))]
for idx,node in enumerate(self.nodes,1):
self.logger.info("Running %s node %d on %s", self.__class__.__name__, idx, node.account.hostname)
worker = threading.Thread(
name=self.__class__.__name__ + "-worker-" + str(idx),
target=self._worker,
args=(idx,node)
)
worker.daemon = True
worker.start()
self.worker_threads.append(worker)
def wait(self):
super(PerformanceService, self).wait()
for idx,worker in enumerate(self.worker_threads,1):
self.logger.debug("Waiting for %s worker %d to finish", self.__class__.__name__, idx)
worker.join()
self.worker_threads = None
def stop(self):
super(PerformanceService, self).stop()
assert self.worker_threads is None, "%s.stop should only be called after wait" % self.__class__.__name__
for idx,node in enumerate(self.nodes,1):
self.logger.debug("Stopping %s node %d on %s", self.__class__.__name__, idx, node.account.hostname)
node.free()
class ProducerPerformanceService(PerformanceService):
def __init__(self, service_context, kafka, topic, num_records, record_size, throughput, settings={}, intermediate_stats=False):
super(ProducerPerformanceService, self).__init__(service_context)
self.kafka = kafka
self.args = {
'topic': topic,
'num_records': num_records,
'record_size': record_size,
'throughput': throughput
}
self.settings = settings
self.intermediate_stats = intermediate_stats
def _worker(self, idx, node):
args = self.args.copy()
args.update({'bootstrap_servers': self.kafka.bootstrap_servers()})
cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance "\
"%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s" % args
for key,value in self.settings.items():
cmd += " %s=%s" % (str(key), str(value))
self.logger.debug("Producer performance %d command: %s", idx, cmd)
def parse_stats(line):
parts = line.split(',')
return {
'records': int(parts[0].split()[0]),
'records_per_sec': float(parts[1].split()[0]),
'mbps': float(parts[1].split('(')[1].split()[0]),
'latency_avg_ms': float(parts[2].split()[0]),
'latency_max_ms': float(parts[3].split()[0]),
'latency_50th_ms': float(parts[4].split()[0]),
'latency_95th_ms': float(parts[5].split()[0]),
'latency_99th_ms': float(parts[6].split()[0]),
'latency_999th_ms': float(parts[7].split()[0]),
}
last = None
for line in node.account.ssh_capture(cmd):
self.logger.debug("Producer performance %d: %s", idx, line.strip())
if self.intermediate_stats:
try:
self.stats[idx-1].append(parse_stats(line))
except:
# Sometimes there are extraneous log messages
pass
last = line
try:
self.results[idx-1] = parse_stats(last)
except:
self.logger.error("Bad last line: %s", last)
class ConsumerPerformanceService(PerformanceService):
def __init__(self, service_context, kafka, topic, num_records, throughput, threads=1, settings={}):
super(ConsumerPerformanceService, self).__init__(service_context)
self.kafka = kafka
self.args = {
'topic': topic,
'num_records': num_records,
'throughput': throughput,
'threads': threads,
}
self.settings = settings
def _worker(self, idx, node):
args = self.args.copy()
args.update({'zk_connect': self.kafka.zk.connect_setting()})
cmd = "/opt/kafka/bin/kafka-consumer-perf-test.sh "\
"--topic %(topic)s --messages %(num_records)d --zookeeper %(zk_connect)s" % args
for key,value in self.settings.items():
cmd += " %s=%s" % (str(key), str(value))
self.logger.debug("Consumer performance %d command: %s", idx, cmd)
last = None
for line in node.account.ssh_capture(cmd):
self.logger.debug("Consumer performance %d: %s", idx, line.strip())
last = line
# Parse and save the last line's information
parts = last.split(',')
self.results[idx-1] = {
'total_mb': float(parts[3]),
'mbps': float(parts[4]),
'records_per_sec': float(parts[6]),
}
class EndToEndLatencyService(PerformanceService):
def __init__(self, service_context, kafka, topic, num_records, consumer_fetch_max_wait=100, acks=1):
super(EndToEndLatencyService, self).__init__(service_context)
self.kafka = kafka
self.args = {
'topic': topic,
'num_records': num_records,
'consumer_fetch_max_wait': consumer_fetch_max_wait,
'acks': acks
}
def _worker(self, idx, node):
args = self.args.copy()
args.update({
'zk_connect': self.kafka.zk.connect_setting(),
'bootstrap_servers': self.kafka.bootstrap_servers(),
})
cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.TestEndToEndLatency "\
"%(bootstrap_servers)s %(zk_connect)s %(topic)s %(num_records)d "\
"%(consumer_fetch_max_wait)d %(acks)d" % args
self.logger.debug("End-to-end latency %d command: %s", idx, cmd)
results = {}
for line in node.account.ssh_capture(cmd):
self.logger.debug("End-to-end latency %d: %s", idx, line.strip())
if line.startswith("Avg latency:"):
results['latency_avg_ms'] = float(line.split()[2])
if line.startswith("Percentiles"):
results['latency_50th_ms'] = float(line.split()[3][:-1])
results['latency_99th_ms'] = float(line.split()[6][:-1])
results['latency_999th_ms'] = float(line.split()[9])
self.results[idx-1] = results
def parse_performance_output(summary):
parts = summary.split(',')
results = {
'records': int(parts[0].split()[0]),
'records_per_sec': float(parts[1].split()[0]),
'mbps': float(parts[1].split('(')[1].split()[0]),
'latency_avg_ms': float(parts[2].split()[0]),
'latency_max_ms': float(parts[3].split()[0]),
'latency_50th_ms': float(parts[4].split()[0]),
'latency_95th_ms': float(parts[5].split()[0]),
'latency_99th_ms': float(parts[6].split()[0]),
'latency_999th_ms': float(parts[7].split()[0]),
}
# To provide compatibility with ConsumerPerformanceService
results['total_mb'] = results['mbps'] * (results['records'] / results['records_per_sec'])
results['rate_mbps'] = results['mbps']
results['rate_mps'] = results['records_per_sec']
return results

View File

@ -1,75 +0,0 @@
# Copyright 2014 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.services.service import Service
import time
class ZookeeperService(Service):
def __init__(self, service_context):
"""
:type service_context ducktape.services.service.ServiceContext
"""
super(ZookeeperService, self).__init__(service_context)
self.logs = {"zk_log": "/mnt/zk.log"}
def start(self):
super(ZookeeperService, self).start()
config = """
dataDir=/mnt/zookeeper
clientPort=2181
maxClientCnxns=0
initLimit=5
syncLimit=2
quorumListenOnAllIPs=true
"""
for idx, node in enumerate(self.nodes, 1):
template_params = { 'idx': idx, 'host': node.account.hostname }
config += "server.%(idx)d=%(host)s:2888:3888\n" % template_params
for idx, node in enumerate(self.nodes, 1):
self.logger.info("Starting ZK node %d on %s", idx, node.account.hostname)
self._stop_and_clean(node, allow_fail=True)
node.account.ssh("mkdir -p /mnt/zookeeper")
node.account.ssh("echo %d > /mnt/zookeeper/myid" % idx)
node.account.create_file("/mnt/zookeeper.properties", config)
node.account.ssh(
"/opt/kafka/bin/zookeeper-server-start.sh /mnt/zookeeper.properties 1>> %(zk_log)s 2>> %(zk_log)s &"
% self.logs)
time.sleep(5) # give it some time to start
def stop_node(self, node, allow_fail=True):
idx = self.idx(node)
self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname))
node.account.ssh("ps ax | grep -i 'zookeeper' | grep -v grep | awk '{print $1}' | xargs kill -SIGTERM",
allow_fail=allow_fail)
def clean_node(self, node, allow_fail=True):
node.account.ssh("rm -rf /mnt/zookeeper /mnt/zookeeper.properties /mnt/zk.log", allow_fail=allow_fail)
def stop(self):
"""If the service left any running processes or data, clean them up."""
super(ZookeeperService, self).stop()
for idx, node in enumerate(self.nodes, 1):
self.stop_node(node, allow_fail=False)
self.clean_node(node)
node.free()
def _stop_and_clean(self, node, allow_fail=False):
self.stop_node(node, allow_fail)
self.clean_node(node, allow_fail)
def connect_setting(self):
return ','.join([node.account.hostname + ':2181' for node in self.nodes])

View File

@ -1,121 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=%(broker_id)d
############################# Socket Server Settings #############################
# The port the socket server listens on
port=9092
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
#host.name=localhost
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name=%(hostname)s
# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>
# The number of threads handling network requests
num.network.threads=3
# The number of threads doing disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=65536
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma seperated list of directories under which to store log files
log.dirs=/mnt/kafka-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=%(zk_connect)s
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=2000

View File

@ -1,193 +0,0 @@
# Copyright 2014 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.services.service import Service
from tests.test import KafkaTest
from services.performance import ProducerPerformanceService, ConsumerPerformanceService, \
EndToEndLatencyService
class KafkaBenchmark(KafkaTest):
'''A benchmark of Kafka producer/consumer performance. This replicates the test
run here:
https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
'''
def __init__(self, test_context):
super(KafkaBenchmark, self).__init__(test_context, num_zk=1, num_brokers=3, topics={
'test-rep-one' : { 'partitions': 6, 'replication-factor': 1 },
'test-rep-three' : { 'partitions': 6, 'replication-factor': 3 }
})
def run(self):
msgs_default = 50000000
msgs_large = 100000000
msg_size_default = 100
batch_size = 8*1024
buffer_memory = 64*1024*1024
msg_sizes = [10, 100, 1000, 10000, 100000]
target_data_size = 1024*1024*1024
target_data_size_gb = target_data_size/float(1024*1024*1024)
# These settings will work in the default local Vagrant VMs, useful for testing
if False:
msgs_default = 1000000
msgs_large = 10000000
msg_size_default = 100
batch_size = 8*1024
buffer_memory = 64*1024*1024
msg_sizes = [10, 100, 1000, 10000, 100000]
target_data_size = 128*1024*1024
target_data_size_gb = target_data_size/float(1024*1024*1024)
# PRODUCER TESTS
self.logger.info("BENCHMARK: Single producer, no replication")
single_no_rep = ProducerPerformanceService(
self.service_context(1), self.kafka,
topic="test-rep-one", num_records=msgs_default, record_size=msg_size_default, throughput=-1,
settings={'acks':1, 'batch.size':batch_size, 'buffer.memory':buffer_memory}
)
single_no_rep.run()
self.logger.info("BENCHMARK: Single producer, async 3x replication")
single_rep_async = ProducerPerformanceService(
self.service_context(1), self.kafka,
topic="test-rep-three", num_records=msgs_default, record_size=msg_size_default, throughput=-1,
settings={'acks':1, 'batch.size':batch_size, 'buffer.memory':buffer_memory}
)
single_rep_async.run()
self.logger.info("BENCHMARK: Single producer, sync 3x replication")
single_rep_sync = ProducerPerformanceService(
self.service_context(1), self.kafka,
topic="test-rep-three", num_records=msgs_default, record_size=msg_size_default, throughput=-1,
settings={'acks':-1, 'batch.size':batch_size, 'buffer.memory':buffer_memory}
)
single_rep_sync.run()
self.logger.info("BENCHMARK: Three producers, async 3x replication")
three_rep_async = ProducerPerformanceService(
self.service_context(3), self.kafka,
topic="test-rep-three", num_records=msgs_default, record_size=msg_size_default, throughput=-1,
settings={'acks':1, 'batch.size':batch_size, 'buffer.memory':buffer_memory}
)
three_rep_async.run()
msg_size_perf = {}
for msg_size in msg_sizes:
self.logger.info("BENCHMARK: Message size %d (%f GB total, single producer, async 3x replication)", msg_size, target_data_size_gb)
# Always generate the same total amount of data
nrecords = int(target_data_size / msg_size)
perf = ProducerPerformanceService(
self.service_context(1), self.kafka,
topic="test-rep-three", num_records=nrecords, record_size=msg_size, throughput=-1,
settings={'acks':1, 'batch.size':batch_size, 'buffer.memory':buffer_memory}
)
perf.run()
msg_size_perf[msg_size] = perf
# CONSUMER TESTS
# All consumer tests use the messages from the first benchmark, so
# they'll get messages of the default message size
self.logger.info("BENCHMARK: Single consumer")
single_consumer = ConsumerPerformanceService(
self.service_context(1), self.kafka,
topic="test-rep-three", num_records=msgs_default, throughput=-1, threads=1
)
single_consumer.run()
self.logger.info("BENCHMARK: Three consumers")
three_consumers = ConsumerPerformanceService(
self.service_context(3), self.kafka,
topic="test-rep-three", num_records=msgs_default, throughput=-1, threads=1
)
three_consumers.run()
# PRODUCER + CONSUMER TEST
self.logger.info("BENCHMARK: Producer + Consumer")
pc_producer = ProducerPerformanceService(
self.service_context(1), self.kafka,
topic="test-rep-three", num_records=msgs_default, record_size=msg_size_default, throughput=-1,
settings={'acks':1, 'batch.size':batch_size, 'buffer.memory':buffer_memory}
)
pc_consumer = ConsumerPerformanceService(
self.service_context(1), self.kafka,
topic="test-rep-three", num_records=msgs_default, throughput=-1, threads=1
)
Service.run_parallel(pc_producer, pc_consumer)
# END TO END LATENCY TEST
self.logger.info("BENCHMARK: End to end latency")
e2e_latency = EndToEndLatencyService(
self.service_context(1), self.kafka,
topic="test-rep-three", num_records=10000
)
e2e_latency.run()
# LONG TERM THROUGHPUT TEST
# Because of how much space this ends up using, we clear out the
# existing cluster to start from a clean slate. This also keeps us from
# running out of space with limited disk space.
self.tearDown()
self.setUp()
self.logger.info("BENCHMARK: Long production")
throughput_perf = ProducerPerformanceService(
self.service_context(1), self.kafka,
topic="test-rep-three", num_records=msgs_large, record_size=msg_size_default, throughput=-1,
settings={'acks':1, 'batch.size':batch_size, 'buffer.memory':buffer_memory},
intermediate_stats=True
)
throughput_perf.run()
# Summarize, extracting just the key info. With multiple
# producers/consumers, we display the aggregate value
def throughput(perf):
aggregate_rate = sum([r['records_per_sec'] for r in perf.results])
aggregate_mbps = sum([r['mbps'] for r in perf.results])
return "%f rec/sec (%f MB/s)" % (aggregate_rate, aggregate_mbps)
self.logger.info("=================")
self.logger.info("BENCHMARK RESULTS")
self.logger.info("=================")
self.logger.info("Single producer, no replication: %s", throughput(single_no_rep))
self.logger.info("Single producer, async 3x replication: %s", throughput(single_rep_async))
self.logger.info("Single producer, sync 3x replication: %s", throughput(single_rep_sync))
self.logger.info("Three producers, async 3x replication: %s", throughput(three_rep_async))
self.logger.info("Message size:")
for msg_size in msg_sizes:
self.logger.info(" %d: %s", msg_size, throughput(msg_size_perf[msg_size]))
self.logger.info("Throughput over long run, data > memory:")
# FIXME we should be generating a graph too
# Try to break it into 5 blocks, but fall back to a smaller number if
# there aren't even 5 elements
block_size = max(len(throughput_perf.stats[0]) / 5, 1)
nblocks = len(throughput_perf.stats[0]) / block_size
for i in range(nblocks):
subset = throughput_perf.stats[0][i*block_size:min((i+1)*block_size,len(throughput_perf.stats[0]))]
if len(subset) == 0:
self.logger.info(" Time block %d: (empty)", i)
else:
self.logger.info(" Time block %d: %f rec/sec (%f MB/s)", i,
sum([stat['records_per_sec'] for stat in subset])/float(len(subset)),
sum([stat['mbps'] for stat in subset])/float(len(subset))
)
self.logger.info("Single consumer: %s", throughput(single_consumer))
self.logger.info("Three consumers: %s", throughput(three_consumers))
self.logger.info("Producer + consumer:")
self.logger.info(" Producer: %s", throughput(pc_producer))
self.logger.info(" Consumer: %s", throughput(pc_producer))
self.logger.info("End-to-end latency: median %f ms, 99%% %f ms, 99.9%% %f ms", e2e_latency.results[0]['latency_50th_ms'], e2e_latency.results[0]['latency_99th_ms'], e2e_latency.results[0]['latency_999th_ms'])

View File

@ -1,51 +0,0 @@
# Copyright 2014 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.tests.test import Test
from ducktape.services.service import ServiceContext
from services.zookeeper_service import ZookeeperService
from services.kafka_service import KafkaService
class KafkaTest(Test):
"""
Helper class that managest setting up a Kafka cluster. Use this if the
default settings for Kafka are sufficient for your test; any customization
needs to be done manually. Your run() method should call tearDown and
setUp. The Zookeeper and Kafka services are available as the fields
KafkaTest.zk and KafkaTest.kafka.
"""
def __init__(self, test_context, num_zk, num_brokers, topics=None):
super(KafkaTest, self).__init__(test_context)
self.num_zk = num_zk
self.num_brokers = num_brokers
self.topics = topics
def min_cluster_size(self):
return self.num_zk + self.num_brokers
def setUp(self):
self.zk = ZookeeperService(ServiceContext(self.cluster, self.num_zk, self.logger))
self.kafka = KafkaService(
ServiceContext(self.cluster, self.num_brokers, self.logger),
self.zk, topics=self.topics)
self.zk.start()
self.kafka.start()
def tearDown(self):
self.kafka.stop()
self.zk.stop()