Merge pull request #4 from confluentinc/move_muckrake

Move muckrake
This commit is contained in:
Geoff 2015-06-16 10:05:07 -07:00
commit ede6450840
28 changed files with 1063 additions and 457 deletions

5
Vagrantfile vendored
View File

@ -53,11 +53,6 @@ if File.exists?(local_config_file) then
eval(File.read(local_config_file), binding, "Vagrantfile.local")
end
if mode == "test"
num_zookeepers = 0
num_brokers = 0
end
# This is a horrible hack to work around bad interactions between
# vagrant-hostmanager and vagrant-aws/vagrant's implementation. Hostmanager
# wants to update the /etc/hosts entries, but tries to do so even on nodes that

View File

@ -136,10 +136,7 @@ public class VerifiableProducer {
try {
Namespace res;
res = parser.parseArgs(args);
System.out.println(res);
System.out.println(res.getString("brokerList"));
int maxMessages = res.getInt("maxMessages");
String topic = res.getString("topic");
int throughput = res.getInt("throughput");

View File

@ -17,13 +17,14 @@
package kafka.tools
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, KafkaProducer}
import java.util.{Arrays, Properties}
import kafka.consumer._
import java.util.Properties
import java.util.Arrays
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import scala.Option.option2Iterable
object TestEndToEndLatency {
object EndToEndLatency {
def main(args: Array[String]) {
if (args.length != 6) {
System.err.println("USAGE: java " + getClass().getName + " broker_list zookeeper_connect topic num_messages consumer_fetch_max_wait producer_acks")

View File

@ -2,43 +2,146 @@ System Integration & Performance Testing
========================================
This directory contains Kafka system integration and performance tests.
[Ducktape](https://github.com/confluentinc/ducktape) is used to run the tests.
[ducktape](https://github.com/confluentinc/ducktape) is used to run the tests.
(ducktape is a distributed testing framework which provides test runner,
result reporter and utilities to pull up and tear down services.)
Ducktape is a distributed testing framework which provides test runner,
result reporter and utilities to pull up and tear down services. It automatically
discovers tests from a directory and generate an HTML report for each run.
Local Quickstart
----------------
This quickstart will help you run the Kafka system tests on your local machine.
To run the tests:
* Install Virtual Box from [https://www.virtualbox.org/](https://www.virtualbox.org/) (run `$ vboxmanage --version` to check if it's installed).
* Install Vagrant >= 1.6.4 from [http://www.vagrantup.com/](http://www.vagrantup.com/) (run `vagrant --version` to check if it's installed).
* Install Vagrant Plugins:
1. Build a specific branch of Kafka
# Required
$ vagrant plugin install vagrant-hostmanager
* Build a specific branch of Kafka
$ cd kafka
$ git checkout $BRANCH
$ gradle
$ ./gradlew jar
2. Setup a testing cluster. You can use Vagrant to create a cluster of local
VMs or on EC2. Configure your Vagrant setup by creating the file
`Vagrantfile.local` in the directory of your Kafka checkout. At a minimum
, you *MUST* set `mode = "test"` and the value of `num_workers` high enough for
the test you're trying to run. If you run on AWS, you also need to set
enable_dns = true.
3. Bring up the cluster, making sure you have enough workers. For Vagrant,
use `vagrant up`. If you want to run on AWS, use `vagrant up
--provider=aws --no-parallel`.
4. Install ducktape:
* Setup a testing cluster with Vagrant. Configure your Vagrant setup by creating the file
`Vagrantfile.local` in the directory of your Kafka checkout. For testing purposes,
`num_brokers` and `num_kafka` should be 0, and `num_workers` should be set high enough
to run all of you tests. An example resides in kafka/vagrant/system-test-Vagrantfile.local
# Example Vagrantfile.local for use on local machine
# Vagrantfile.local should reside in the base Kafka directory
num_zookeepers = 0
num_kafka = 0
num_workers = 9
* Bring up the cluster (note that the initial provisioning process can be slow since it involves
installing dependencies and updates on every vm.):
$ vagrant up
* Install ducktape:
$ git clone https://github.com/confluentinc/ducktape
$ cd ducktape
$ pip install ducktape
5. Run the system tests using ducktape, you can view results in the `results`
directory.
* Run the system tests using ducktape:
$ cd tests
$ ducktape tests
6. To iterate/run again if you made any changes:
$ ducktape kafkatest/tests
* If you make changes to your Kafka checkout, you'll need to rebuild and resync to your Vagrant cluster:
$ cd kafka
$ ./gradlew jar
$ vagrant rsync # Re-syncs build output to cluster
EC2 Quickstart
--------------
This quickstart will help you run the Kafka system tests using Amazon EC2. As a convention, we'll use "kafkatest" in most names, but you can use whatever you want.
There are a lot of steps here, but the basic goals are to create one distinguished EC2 instance that
will be our "test driver", and to set up the security groups and iam role so that the test driver
can create, destroy, and run ssh commands on any number of "workers".
Preparation
-----------
In these steps, we will create an IAM role which has permission to create and destroy EC2 instances,
set up a keypair used for ssh access to the test driver and worker machines, and create a security group to allow the test driver and workers to all communicate via TCP.
* [Create an IAM role](http://docs.aws.amazon.com/IAM/latest/UserGuide/Using_SettingUpUser.html#Using_CreateUser_console). We'll give this role the ability to launch or kill additional EC2 machines.
- Create role "kafkatest-master"
- Role type: Amazon EC2
- Attach policy: AmazonEC2FullAccess (this will allow our test-driver to create and destroy EC2 instances)
* If you haven't already, [set up a keypair to use for SSH access](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html). For the purpose
of this quickstart, let's say the keypair name is kafkatest, and you've saved the private key in kafktest.pem
* Next, create a security group called "kafkatest".
- After creating the group, inbound rules: allow SSH on port 22 from anywhere; also, allow access on all ports (0-65535) from other machines in the kafkatest group.
Create the Test Driver
----------------------
* Launch a new test driver machine
- OS: Ubuntu server is recommended
- Instance type: t2.medium is easily enough since this machine is just a driver
- Instance details: Most defaults are fine.
- IAM role -> kafkatest-master
- Tagging the instance with a useful name is recommended.
- Security group -> 'kafkatest'
* Once the machine is started, upload the SSH key to your test driver:
$ scp -i /path/to/kafkatest.pem \
/path/to/kafkatest.pem ubuntu@public.hostname.amazonaws.com:kafkatest.pem
* Grab the public hostname/IP (available for example by navigating to your EC2 dashboard and viewing running instances) of your test driver and SSH into it:
$ ssh -i /path/to/kafkatest.pem ubuntu@public.hostname.amazonaws.com
Set Up the Test Driver
----------------------
The following steps assume you have ssh'd into
the test driver machine.
* Start by making sure you're up to date, and install git and ducktape:
$ sudo apt-get update && sudo apt-get -y upgrade && sudo apt-get install -y git
$ pip install ducktape
* Get Kafka:
$ git clone https://git-wip-us.apache.org/repos/asf/kafka.git kafka
* Install some dependencies:
$ cd kafka
$ kafka/vagrant/aws/aws-init.sh
$ . ~/.bashrc
* An example Vagrantfile.local has been created by aws-init.sh which looks something like:
# Vagrantfile.local
ec2_instance_type = "..." # Pick something appropriate for your
# test. Note that the default m3.medium has
# a small disk.
num_zookeepers = 0
num_kafka = 0
num_workers = 9
ec2_keypair_name = 'kafkatest'
ec2_keypair_file = '/home/ubuntu/kafkatest.pem'
ec2_security_groups = ['kafkatest']
ec2_region = 'us-west-2'
ec2_ami = "ami-29ebb519"
* Start up the instances (note we have found bringing up machines in parallel can cause errors on aws):
$ vagrant up --provider=aws --no-provision --no-parallel && vagrant provision
* Now you should be able to run tests:
$ cd kafka/tests
$ ducktape kafkatest/tests
* To halt your workers without destroying persistent state, run `vagrant halt`. Run `vagrant destroy -f` to destroy all traces of your workers.

View File

@ -1,3 +0,0 @@
export AWS_ACCESS_KEY=`curl -s http://169.254.169.254/latest/meta-data/iam/security-credentials/ducttape-master | grep AccessKeyId | awk -F\" '{ print $4 }'`
export AWS_SECRET_KEY=`curl -s http://169.254.169.254/latest/meta-data/iam/security-credentials/ducttape-master | grep SecretAccessKey | awk -F\" '{ print $4 }'`
export AWS_SESSION_TOKEN=`curl -s http://169.254.169.254/latest/meta-data/iam/security-credentials/ducttape-master | grep Token | awk -F\" '{ print $4 }'`

View File

@ -1,9 +0,0 @@
ec3_instance_type = "m3.medium"
enable_dns = true
mode = "test"
num_workers = 1
ec2_keypair_name =
ec2_keypair_file =
ec2_security_groups = ['ducttape-insecure']
ec2_region = 'us-west-2'
ec2_ami = "ami-29ebb519"

View File

@ -0,0 +1,146 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.services.background_thread import BackgroundThreadService
def is_int(msg):
"""Default method used to check whether text pulled from console consumer is a message.
return int or None
"""
try:
return int(msg)
except:
return None
"""
0.8.2.1 ConsoleConsumer options
The console consumer is a tool that reads data from Kafka and outputs it to standard output.
Option Description
------ -----------
--blacklist <blacklist> Blacklist of topics to exclude from
consumption.
--consumer.config <config file> Consumer config properties file.
--csv-reporter-enabled If set, the CSV metrics reporter will
be enabled
--delete-consumer-offsets If specified, the consumer path in
zookeeper is deleted when starting up
--formatter <class> The name of a class to use for
formatting kafka messages for
display. (default: kafka.tools.
DefaultMessageFormatter)
--from-beginning If the consumer does not already have
an established offset to consume
from, start with the earliest
message present in the log rather
than the latest message.
--max-messages <Integer: num_messages> The maximum number of messages to
consume before exiting. If not set,
consumption is continual.
--metrics-dir <metrics dictory> If csv-reporter-enable is set, and
this parameter isset, the csv
metrics will be outputed here
--property <prop>
--skip-message-on-error If there is an error when processing a
message, skip it instead of halt.
--topic <topic> The topic id to consume on.
--whitelist <whitelist> Whitelist of topics to include for
consumption.
--zookeeper <urls> REQUIRED: The connection string for
the zookeeper connection in the form
host:port. Multiple URLS can be
given to allow fail-over.
"""
class ConsoleConsumer(BackgroundThreadService):
logs = {
"consumer_log": {
"path": "/mnt/consumer.log",
"collect_default": True}
}
def __init__(self, context, num_nodes, kafka, topic, message_validator=is_int, from_beginning=True, consumer_timeout_ms=None):
"""
Args:
context: standard context
num_nodes: number of nodes to use (this should be 1)
kafka: kafka service
topic: consume from this topic
message_validator: function which returns message or None
from_beginning: consume from beginning if True, else from the end
consumer_timeout_ms: corresponds to consumer.timeout.ms. consumer process ends if time between
successively consumed messages exceeds this timeout. Setting this and
waiting for the consumer to stop is a pretty good way to consume all messages
in a topic.
"""
super(ConsoleConsumer, self).__init__(context, num_nodes)
self.kafka = kafka
self.args = {
'topic': topic,
}
self.consumer_timeout_ms = consumer_timeout_ms
self.from_beginning = from_beginning
self.message_validator = message_validator
self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)}
@property
def start_cmd(self):
args = self.args.copy()
args.update({'zk_connect': self.kafka.zk.connect_setting()})
cmd = "/opt/kafka/bin/kafka-console-consumer.sh --topic %(topic)s --zookeeper %(zk_connect)s" \
" --consumer.config /mnt/console_consumer.properties" % args
if self.from_beginning:
cmd += " --from-beginning"
cmd += " 2>> /mnt/consumer.log | tee -a /mnt/consumer.log &"
return cmd
def _worker(self, idx, node):
# form config file
if self.consumer_timeout_ms is not None:
prop_file = self.render('console_consumer.properties', consumer_timeout_ms=self.consumer_timeout_ms)
else:
prop_file = self.render('console_consumer.properties')
self.logger.info("console_consumer.properties:")
self.logger.info(prop_file)
node.account.create_file("/mnt/console_consumer.properties", prop_file)
# Run and capture output
cmd = self.start_cmd
self.logger.debug("Console consumer %d command: %s", idx, cmd)
for line in node.account.ssh_capture(cmd):
msg = line.strip()
msg = self.message_validator(msg)
if msg is not None:
self.logger.debug("consumed a message: " + str(msg))
self.messages_consumed[idx].append(msg)
def start_node(self, node):
super(ConsoleConsumer, self).start_node(node)
def stop_node(self, node):
node.account.kill_process("java", allow_fail=False)
def clean_node(self, node):
node.account.ssh("rm -rf /mnt/console_consumer.properties /mnt/consumer.log", allow_fail=False)

View File

@ -1,10 +1,11 @@
# Copyright 2014 Confluent Inc.
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
@ -13,32 +14,37 @@
# limitations under the License.
from ducktape.services.service import Service
import time, re, json
import json
import re
import signal
import time
class KafkaService(Service):
def __init__(self, service_context, zk, topics=None):
logs = {
"kafka_log": {
"path": "/mnt/kafka.log",
"collect_default": True},
"kafka_data": {
"path": "/mnt/kafka-logs",
"collect_default": False}
}
def __init__(self, context, num_nodes, zk, topics=None):
"""
:type service_context ducktape.services.service.ServiceContext
:type context
:type zk: ZookeeperService
:type topics: dict
"""
super(KafkaService, self).__init__(service_context)
super(KafkaService, self).__init__(context, num_nodes)
self.zk = zk
self.topics = topics
def start(self):
super(KafkaService, self).start()
# Start all nodes in this Kafka service
for idx, node in enumerate(self.nodes, 1):
self.logger.info("Starting Kafka node %d on %s", idx, node.account.hostname)
self._stop_and_clean(node, allow_fail=True)
self.start_node(node)
# wait for start up
time.sleep(6)
# Create topics if necessary
if self.topics is not None:
for topic, topic_cfg in self.topics.items():
@ -48,6 +54,47 @@ class KafkaService(Service):
topic_cfg["topic"] = topic
self.create_topic(topic_cfg)
def start_node(self, node):
props_file = self.render('kafka.properties', node=node, broker_id=self.idx(node))
self.logger.info("kafka.properties:")
self.logger.info(props_file)
node.account.create_file("/mnt/kafka.properties", props_file)
cmd = "/opt/kafka/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log & echo $! > /mnt/kafka.pid"
self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd))
node.account.ssh(cmd)
time.sleep(5)
if len(self.pids(node)) == 0:
raise Exception("No process ids recorded on node %s" % str(node))
def pids(self, node):
"""Return process ids associated with running processes on the given node."""
try:
return [pid for pid in node.account.ssh_capture("cat /mnt/kafka.pid", callback=int)]
except:
return []
def signal_node(self, node, sig=signal.SIGTERM):
pids = self.pids(node)
for pid in pids:
node.account.signal(pid, sig)
def signal_leader(self, topic, partition=0, sig=signal.SIGTERM):
leader = self.leader(topic, partition)
self.signal_node(leader, sig)
def stop_node(self, node, clean_shutdown=True):
pids = self.pids(node)
sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
for pid in pids:
node.account.signal(pid, sig, allow_fail=False)
node.account.ssh("rm -f /mnt/kafka.pid", allow_fail=False)
def clean_node(self, node):
node.account.ssh("rm -rf /mnt/kafka-logs /mnt/kafka.properties /mnt/kafka.log /mnt/kafka.pid", allow_fail=False)
def create_topic(self, topic_cfg):
node = self.nodes[0] # any node is fine here
self.logger.info("Creating topic %s with settings %s", topic_cfg["topic"], topic_cfg)
@ -143,45 +190,13 @@ class KafkaService(Service):
self.logger.debug("Verify partition reassignment:")
self.logger.debug(output)
def stop(self):
"""If the service left any running processes or data, clean them up."""
super(KafkaService, self).stop()
for idx, node in enumerate(self.nodes, 1):
self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname))
self._stop_and_clean(node, allow_fail=True)
node.free()
def _stop_and_clean(self, node, allow_fail=False):
node.account.ssh("/opt/kafka/bin/kafka-server-stop.sh", allow_fail=allow_fail)
time.sleep(5) # the stop script doesn't wait
node.account.ssh("rm -rf /mnt/kafka-logs /mnt/kafka.properties /mnt/kafka.log")
def stop_node(self, node, clean_shutdown=True, allow_fail=True):
node.account.kill_process("kafka", clean_shutdown, allow_fail)
def start_node(self, node, config=None):
if config is None:
template = open('templates/kafka.properties').read()
template_params = {
'broker_id': self.idx(node),
'hostname': node.account.hostname,
'zk_connect': self.zk.connect_setting()
}
config = template % template_params
node.account.create_file("/mnt/kafka.properties", config)
cmd = "/opt/kafka/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log &"
self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd))
node.account.ssh(cmd)
def restart_node(self, node, wait_sec=0, clean_shutdown=True):
self.stop_node(node, clean_shutdown, allow_fail=True)
"""Restart the given node, waiting wait_sec in between stopping and starting up again."""
self.stop_node(node, clean_shutdown)
time.sleep(wait_sec)
self.start_node(node)
def get_leader_node(self, topic, partition=0):
def leader(self, topic, partition=0):
""" Get the leader replica for the given topic and partition.
"""
cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.ZooKeeperMainWrapper -server %s " \

View File

@ -1,10 +1,11 @@
# Copyright 2014 Confluent Inc.
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
@ -12,48 +13,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.services.service import Service
import threading
from ducktape.services.background_thread import BackgroundThreadService
class PerformanceService(Service):
def __init__(self, service_context):
super(PerformanceService, self).__init__(service_context)
def start(self):
super(PerformanceService, self).start()
self.worker_threads = []
self.results = [None] * len(self.nodes)
self.stats = [[] for x in range(len(self.nodes))]
for idx,node in enumerate(self.nodes,1):
self.logger.info("Running %s node %d on %s", self.__class__.__name__, idx, node.account.hostname)
worker = threading.Thread(
name=self.__class__.__name__ + "-worker-" + str(idx),
target=self._worker,
args=(idx,node)
)
worker.daemon = True
worker.start()
self.worker_threads.append(worker)
def wait(self):
super(PerformanceService, self).wait()
for idx,worker in enumerate(self.worker_threads,1):
self.logger.debug("Waiting for %s worker %d to finish", self.__class__.__name__, idx)
worker.join()
self.worker_threads = None
def stop(self):
super(PerformanceService, self).stop()
assert self.worker_threads is None, "%s.stop should only be called after wait" % self.__class__.__name__
for idx,node in enumerate(self.nodes,1):
self.logger.debug("Stopping %s node %d on %s", self.__class__.__name__, idx, node.account.hostname)
node.free()
class PerformanceService(BackgroundThreadService):
def __init__(self, context, num_nodes):
super(PerformanceService, self).__init__(context, num_nodes)
self.results = [None] * self.num_nodes
self.stats = [[] for x in range(self.num_nodes)]
class ProducerPerformanceService(PerformanceService):
def __init__(self, service_context, kafka, topic, num_records, record_size, throughput, settings={}, intermediate_stats=False):
super(ProducerPerformanceService, self).__init__(service_context)
def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, settings={}, intermediate_stats=False):
super(ProducerPerformanceService, self).__init__(context, num_nodes)
self.kafka = kafka
self.args = {
'topic': topic,
@ -73,6 +45,7 @@ class ProducerPerformanceService(PerformanceService):
for key,value in self.settings.items():
cmd += " %s=%s" % (str(key), str(value))
self.logger.debug("Producer performance %d command: %s", idx, cmd)
def parse_stats(line):
parts = line.split(',')
return {
@ -103,8 +76,8 @@ class ProducerPerformanceService(PerformanceService):
class ConsumerPerformanceService(PerformanceService):
def __init__(self, service_context, kafka, topic, num_records, throughput, threads=1, settings={}):
super(ConsumerPerformanceService, self).__init__(service_context)
def __init__(self, context, num_nodes, kafka, topic, num_records, throughput, threads=1, settings={}):
super(ConsumerPerformanceService, self).__init__(context, num_nodes)
self.kafka = kafka
self.args = {
'topic': topic,
@ -128,16 +101,17 @@ class ConsumerPerformanceService(PerformanceService):
last = line
# Parse and save the last line's information
parts = last.split(',')
self.results[idx-1] = {
'total_mb': float(parts[3]),
'mbps': float(parts[4]),
'records_per_sec': float(parts[6]),
'total_mb': float(parts[2]),
'mbps': float(parts[3]),
'records_per_sec': float(parts[5]),
}
class EndToEndLatencyService(PerformanceService):
def __init__(self, service_context, kafka, topic, num_records, consumer_fetch_max_wait=100, acks=1):
super(EndToEndLatencyService, self).__init__(service_context)
def __init__(self, context, num_nodes, kafka, topic, num_records, consumer_fetch_max_wait=100, acks=1):
super(EndToEndLatencyService, self).__init__(context, num_nodes)
self.kafka = kafka
self.args = {
'topic': topic,
@ -152,7 +126,7 @@ class EndToEndLatencyService(PerformanceService):
'zk_connect': self.kafka.zk.connect_setting(),
'bootstrap_servers': self.kafka.bootstrap_servers(),
})
cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.TestEndToEndLatency "\
cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.EndToEndLatency "\
"%(bootstrap_servers)s %(zk_connect)s %(topic)s %(num_records)d "\
"%(consumer_fetch_max_wait)d %(acks)d" % args
self.logger.debug("End-to-end latency %d command: %s", idx, cmd)

View File

@ -0,0 +1,3 @@
{% if consumer_timeout_ms is defined %}
consumer.timeout.ms={{ consumer_timeout_ms }}
{% endif %}

View File

@ -17,7 +17,7 @@
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=%(broker_id)d
broker.id={{ broker_id }}
############################# Socket Server Settings #############################
@ -30,7 +30,7 @@ port=9092
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name=%(hostname)s
advertised.host.name={{ node.account.hostname }}
# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
@ -115,7 +115,7 @@ log.cleaner.enable=false
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=%(zk_connect)s
zookeeper.connect={{ zk.connect_setting() }}
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=2000

View File

@ -0,0 +1,9 @@
dataDir=/mnt/zookeeper
clientPort=2181
maxClientCnxns=0
initLimit=5
syncLimit=2
quorumListenOnAllIPs=true
{% for node in nodes %}
server.{{ loop.index }}={{ node.account.hostname }}:2888:3888
{% endfor %}

View File

@ -0,0 +1,109 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.services.background_thread import BackgroundThreadService
import json
class VerifiableProducer(BackgroundThreadService):
logs = {
"producer_log": {
"path": "/mnt/producer.log",
"collect_default": True}
}
def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput=100000):
super(VerifiableProducer, self).__init__(context, num_nodes)
self.kafka = kafka
self.topic = topic
self.max_messages = max_messages
self.throughput = throughput
self.acked_values = []
self.not_acked_values = []
def _worker(self, idx, node):
cmd = self.start_cmd
self.logger.debug("VerifiableProducer %d command: %s" % (idx, cmd))
for line in node.account.ssh_capture(cmd):
line = line.strip()
data = self.try_parse_json(line)
if data is not None:
self.logger.debug("VerifiableProducer: " + str(data))
with self.lock:
if data["name"] == "producer_send_error":
data["node"] = idx
self.not_acked_values.append(int(data["value"]))
elif data["name"] == "producer_send_success":
self.acked_values.append(int(data["value"]))
@property
def start_cmd(self):
cmd = "/opt/kafka/bin/kafka-verifiable-producer.sh" \
" --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers())
if self.max_messages > 0:
cmd += " --max-messages %s" % str(self.max_messages)
if self.throughput > 0:
cmd += " --throughput %s" % str(self.throughput)
cmd += " 2>> /mnt/producer.log | tee -a /mnt/producer.log &"
return cmd
@property
def acked(self):
with self.lock:
return self.acked_values
@property
def not_acked(self):
with self.lock:
return self.not_acked_values
@property
def num_acked(self):
with self.lock:
return len(self.acked_values)
@property
def num_not_acked(self):
with self.lock:
return len(self.not_acked_values)
def stop_node(self, node):
node.account.kill_process("VerifiableProducer", allow_fail=False)
# block until the corresponding thread exits
if len(self.worker_threads) >= self.idx(node):
# Need to guard this because stop is preemptively called before the worker threads are added and started
self.worker_threads[self.idx(node) - 1].join()
def clean_node(self, node):
node.account.ssh("rm -rf /mnt/producer.log", allow_fail=False)
def try_parse_json(self, string):
"""Try to parse a string as json. Return None if not parseable."""
try:
record = json.loads(string)
return record
except ValueError:
self.logger.debug("Could not parse as json: %s" % str(string))
return None

View File

@ -0,0 +1,64 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.services.service import Service
import time
class ZookeeperService(Service):
logs = {
"zk_log": {
"path": "/mnt/zk.log",
"collect_default": True}
}
def __init__(self, context, num_nodes):
"""
:type context
"""
super(ZookeeperService, self).__init__(context, num_nodes)
def start_node(self, node):
idx = self.idx(node)
self.logger.info("Starting ZK node %d on %s", idx, node.account.hostname)
node.account.ssh("mkdir -p /mnt/zookeeper")
node.account.ssh("echo %d > /mnt/zookeeper/myid" % idx)
config_file = self.render('zookeeper.properties')
self.logger.info("zookeeper.properties:")
self.logger.info(config_file)
node.account.create_file("/mnt/zookeeper.properties", config_file)
node.account.ssh(
"/opt/kafka/bin/zookeeper-server-start.sh /mnt/zookeeper.properties 1>> %(path)s 2>> %(path)s &"
% self.logs["zk_log"])
time.sleep(5) # give it some time to start
def stop_node(self, node):
idx = self.idx(node)
self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname))
node.account.kill_process("zookeeper", allow_fail=False)
def clean_node(self, node):
self.logger.info("Cleaning ZK node %d on %s", self.idx(node), node.account.hostname)
node.account.ssh("rm -rf /mnt/zookeeper /mnt/zookeeper.properties /mnt/zk.log", allow_fail=False)
def connect_setting(self):
return ','.join([node.account.hostname + ':2181' for node in self.nodes])

View File

@ -0,0 +1,274 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.services.service import Service
from kafkatest.tests.kafka_test import KafkaTest
from kafkatest.services.performance import ProducerPerformanceService, ConsumerPerformanceService, EndToEndLatencyService
class Benchmark(KafkaTest):
'''A benchmark of Kafka producer/consumer performance. This replicates the test
run here:
https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
'''
def __init__(self, test_context):
super(Benchmark, self).__init__(test_context, num_zk=1, num_brokers=3, topics={
'test-rep-one' : { 'partitions': 6, 'replication-factor': 1 },
'test-rep-three' : { 'partitions': 6, 'replication-factor': 3 }
})
if True:
# Works on both aws and local
self.msgs = 1000000
self.msgs_default = 1000000
else:
# Can use locally on Vagrant VMs, but may use too much memory for aws
self.msgs = 50000000
self.msgs_default = 50000000
self.msgs_large = 10000000
self.msg_size_default = 100
self.batch_size = 8*1024
self.buffer_memory = 64*1024*1024
self.msg_sizes = [10, 100, 1000, 10000, 100000]
self.target_data_size = 128*1024*1024
self.target_data_size_gb = self.target_data_size/float(1024*1024*1024)
def test_single_producer_no_replication(self):
self.logger.info("BENCHMARK: Single producer, no replication")
self.perf = ProducerPerformanceService(
self.test_context, 1, self.kafka,
topic="test-rep-one", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
)
self.perf.run()
data = compute_throughput(self.perf)
self.logger.info("Single producer, no replication: %s", str(data))
return data
def test_single_producer_replication(self):
self.logger.info("BENCHMARK: Single producer, async 3x replication")
self.perf = ProducerPerformanceService(
self.test_context, 1, self.kafka,
topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
)
self.perf.run()
data = compute_throughput(self.perf)
self.logger.info("Single producer, async 3x replication: %s" % str(data))
return data
def test_single_producer_sync(self):
self.logger.info("BENCHMARK: Single producer, sync 3x replication")
self.perf = ProducerPerformanceService(
self.test_context, 1, self.kafka,
topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
settings={'acks':-1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
)
self.perf.run()
data = compute_throughput(self.perf)
self.logger.info("Single producer, sync 3x replication: %s" % data)
return data
def test_three_producers_async(self):
self.logger.info("BENCHMARK: Three producers, async 3x replication")
self.perf = ProducerPerformanceService(
self.test_context, 3, self.kafka,
topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
)
self.perf.run()
data = compute_throughput(self.perf)
self.logger.info("Three producers, async 3x replication: %s" % data)
return data
def test_multiple_message_size(self):
# TODO this would be a great place to use parametrization
self.perfs = {}
for msg_size in self.msg_sizes:
self.logger.info("BENCHMARK: Message size %d (%f GB total, single producer, async 3x replication)", msg_size, self.target_data_size_gb)
# Always generate the same total amount of data
nrecords = int(self.target_data_size / msg_size)
self.perfs["perf-" + str(msg_size)] = ProducerPerformanceService(
self.test_context, 1, self.kafka,
topic="test-rep-three", num_records=nrecords, record_size=msg_size, throughput=-1,
settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
)
self.msg_size_perf = {}
for msg_size in self.msg_sizes:
perf = self.perfs["perf-" + str(msg_size)]
perf.run()
self.msg_size_perf[msg_size] = perf
summary = ["Message size:"]
data = {}
for msg_size in self.msg_sizes:
datum = compute_throughput(self.msg_size_perf[msg_size])
summary.append(" %d: %s" % (msg_size, datum))
data[msg_size] = datum
self.logger.info("\n".join(summary))
return data
def test_long_term_throughput(self):
self.logger.info("BENCHMARK: Long production")
self.perf = ProducerPerformanceService(
self.test_context, 1, self.kafka,
topic="test-rep-three", num_records=self.msgs_large, record_size=self.msg_size_default, throughput=-1,
settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory},
intermediate_stats=True
)
self.perf.run()
summary = ["Throughput over long run, data > memory:"]
data = {}
# FIXME we should be generating a graph too
# Try to break it into 5 blocks, but fall back to a smaller number if
# there aren't even 5 elements
block_size = max(len(self.perf.stats[0]) / 5, 1)
nblocks = len(self.perf.stats[0]) / block_size
for i in range(nblocks):
subset = self.perf.stats[0][i*block_size:min((i+1)*block_size, len(self.perf.stats[0]))]
if len(subset) == 0:
summary.append(" Time block %d: (empty)" % i)
data[i] = None
else:
records_per_sec = sum([stat['records_per_sec'] for stat in subset])/float(len(subset))
mb_per_sec = sum([stat['mbps'] for stat in subset])/float(len(subset))
summary.append(" Time block %d: %f rec/sec (%f MB/s)" % (i, records_per_sec, mb_per_sec))
data[i] = throughput(records_per_sec, mb_per_sec)
self.logger.info("\n".join(summary))
return data
def test_end_to_end_latency(self):
self.logger.info("BENCHMARK: End to end latency")
self.perf = EndToEndLatencyService(
self.test_context, 1, self.kafka,
topic="test-rep-three", num_records=10000
)
self.perf.run()
data = latency(self.perf.results[0]['latency_50th_ms'], self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms'])
self.logger.info("End-to-end latency: %s" % str(data))
return data
def test_producer_and_consumer(self):
self.logger.info("BENCHMARK: Producer + Consumer")
self.producer = ProducerPerformanceService(
self.test_context, 1, self.kafka,
topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
)
self.consumer = ConsumerPerformanceService(
self.test_context, 1, self.kafka,
topic="test-rep-three", num_records=self.msgs_default, throughput=-1, threads=1
)
Service.run_parallel(self.producer, self.consumer)
data = {
"producer": compute_throughput(self.producer),
"consumer": compute_throughput(self.consumer)
}
summary = [
"Producer + consumer:",
str(data)]
self.logger.info("\n".join(summary))
return data
def test_single_consumer(self):
topic = "test-rep-three"
self.producer = ProducerPerformanceService(
self.test_context, 1, self.kafka,
topic=topic, num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
)
self.producer.run()
# All consumer tests use the messages from the first benchmark, so
# they'll get messages of the default message size
self.logger.info("BENCHMARK: Single consumer")
self.perf = ConsumerPerformanceService(
self.test_context, 1, self.kafka,
topic=topic, num_records=self.msgs_default, throughput=-1, threads=1
)
self.perf.run()
data = compute_throughput(self.perf)
self.logger.info("Single consumer: %s" % data)
return data
def test_three_consumers(self):
topic = "test-rep-three"
self.producer = ProducerPerformanceService(
self.test_context, 1, self.kafka,
topic=topic, num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
)
self.producer.run()
self.logger.info("BENCHMARK: Three consumers")
self.perf = ConsumerPerformanceService(
self.test_context, 3, self.kafka,
topic="test-rep-three", num_records=self.msgs_default, throughput=-1, threads=1
)
self.perf.run()
data = compute_throughput(self.perf)
self.logger.info("Three consumers: %s", data)
return data
def throughput(records_per_sec, mb_per_sec):
"""Helper method to ensure uniform representation of throughput data"""
return {
"records_per_sec": records_per_sec,
"mb_per_sec": mb_per_sec
}
def latency(latency_50th_ms, latency_99th_ms, latency_999th_ms):
"""Helper method to ensure uniform representation of latency data"""
return {
"latency_50th_ms": latency_50th_ms,
"latency_99th_ms": latency_99th_ms,
"latency_999th_ms": latency_999th_ms
}
def compute_throughput(perf):
"""Helper method for computing throughput after running a performance service."""
aggregate_rate = sum([r['records_per_sec'] for r in perf.results])
aggregate_mbps = sum([r['mbps'] for r in perf.results])
return throughput(aggregate_rate, aggregate_mbps)

View File

@ -1,10 +1,11 @@
# Copyright 2014 Confluent Inc.
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
@ -13,21 +14,19 @@
# limitations under the License.
from ducktape.tests.test import Test
from ducktape.services.service import ServiceContext
from services.zookeeper_service import ZookeeperService
from services.kafka_service import KafkaService
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
class KafkaTest(Test):
"""
Helper class that managest setting up a Kafka cluster. Use this if the
Helper class that manages setting up a Kafka cluster. Use this if the
default settings for Kafka are sufficient for your test; any customization
needs to be done manually. Your run() method should call tearDown and
setUp. The Zookeeper and Kafka services are available as the fields
KafkaTest.zk and KafkaTest.kafka.
"""
def __init__(self, test_context, num_zk, num_brokers, topics=None):
super(KafkaTest, self).__init__(test_context)
@ -35,17 +34,12 @@ class KafkaTest(Test):
self.num_brokers = num_brokers
self.topics = topics
def min_cluster_size(self):
return self.num_zk + self.num_brokers
self.zk = ZookeeperService(test_context, self.num_zk)
self.kafka = KafkaService(
test_context, self.num_brokers,
self.zk, topics=self.topics)
def setUp(self):
self.zk = ZookeeperService(ServiceContext(self.cluster, self.num_zk, self.logger))
self.kafka = KafkaService(
ServiceContext(self.cluster, self.num_brokers, self.logger),
self.zk, topics=self.topics)
self.zk.start()
self.kafka.start()
def tearDown(self):
self.kafka.stop()
self.zk.stop()
self.kafka.start()

View File

@ -0,0 +1,160 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.console_consumer import ConsoleConsumer
import signal
import time
class ReplicationTest(Test):
"""Replication tests.
These tests verify that replication provides simple durability guarantees by checking that data acked by
brokers is still available for consumption in the face of various failure scenarios."""
def __init__(self, test_context):
""":type test_context: ducktape.tests.test.TestContext"""
super(ReplicationTest, self).__init__(test_context=test_context)
self.topic = "test_topic"
self.zk = ZookeeperService(test_context, num_nodes=1)
self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
"partitions": 3,
"replication-factor": 3,
"min.insync.replicas": 2}
})
self.producer_throughput = 10000
self.num_producers = 1
self.num_consumers = 1
def setUp(self):
self.zk.start()
self.kafka.start()
def min_cluster_size(self):
"""Override this since we're adding services outside of the constructor"""
return super(ReplicationTest, self).min_cluster_size() + self.num_producers + self.num_consumers
def run_with_failure(self, failure):
"""This is the top-level test template.
The steps are:
Produce messages in the background while driving some failure condition
When done driving failures, immediately stop producing
Consume all messages
Validate that messages acked by brokers were consumed
Note that consuming is a bit tricky, at least with console consumer. The goal is to consume all messages
(foreach partition) in the topic. In this case, waiting for the last message may cause the consumer to stop
too soon since console consumer is consuming multiple partitions from a single thread and therefore we lose
ordering guarantees.
Waiting on a count of consumed messages can be unreliable: if we stop consuming when num_consumed == num_acked,
we might exit early if some messages are duplicated (though not an issue here since producer retries==0)
Therefore rely here on the consumer.timeout.ms setting which times out on the interval between successively
consumed messages. Since we run the producer to completion before running the consumer, this is a reliable
indicator that nothing is left to consume.
"""
self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=3000)
# Produce in a background thread while driving broker failures
self.producer.start()
if not wait_until(lambda: self.producer.num_acked > 5, timeout_sec=5):
raise RuntimeError("Producer failed to start in a reasonable amount of time.")
failure()
self.producer.stop()
self.acked = self.producer.acked
self.not_acked = self.producer.not_acked
self.logger.info("num not acked: %d" % self.producer.num_not_acked)
self.logger.info("num acked: %d" % self.producer.num_acked)
# Consume all messages
self.consumer.start()
self.consumer.wait()
self.consumed = self.consumer.messages_consumed[1]
self.logger.info("num consumed: %d" % len(self.consumed))
# Check produced vs consumed
self.validate()
def clean_shutdown(self):
"""Discover leader node for our topic and shut it down cleanly."""
self.kafka.signal_leader(self.topic, partition=0, sig=signal.SIGTERM)
def hard_shutdown(self):
"""Discover leader node for our topic and shut it down with a hard kill."""
self.kafka.signal_leader(self.topic, partition=0, sig=signal.SIGKILL)
def clean_bounce(self):
"""Chase the leader of one partition and restart it cleanly."""
for i in range(5):
prev_leader_node = self.kafka.leader(topic=self.topic, partition=0)
self.kafka.restart_node(prev_leader_node, wait_sec=5, clean_shutdown=True)
def hard_bounce(self):
"""Chase the leader and restart it cleanly."""
for i in range(5):
prev_leader_node = self.kafka.leader(topic=self.topic, partition=0)
self.kafka.restart_node(prev_leader_node, wait_sec=5, clean_shutdown=False)
# Wait long enough for previous leader to probably be awake again
time.sleep(6)
def validate(self):
"""Check that produced messages were consumed."""
success = True
msg = ""
if len(set(self.consumed)) != len(self.consumed):
# There are duplicates. This is ok, so report it but don't fail the test
msg += "There are duplicate messages in the log\n"
if not set(self.consumed).issuperset(set(self.acked)):
# Every acked message must appear in the logs. I.e. consumed messages must be superset of acked messages.
acked_minus_consumed = set(self.producer.acked) - set(self.consumed)
success = False
msg += "At least one acked message did not appear in the consumed messages. acked_minus_consumed: " + str(acked_minus_consumed)
if not success:
# Collect all the data logs if there was a failure
self.mark_for_collect(self.kafka)
assert success, msg
def test_clean_shutdown(self):
self.run_with_failure(self.clean_shutdown)
def test_hard_shutdown(self):
self.run_with_failure(self.hard_shutdown)
def test_clean_bounce(self):
self.run_with_failure(self.clean_bounce)
def test_hard_bounce(self):
self.run_with_failure(self.hard_bounce)

View File

@ -1,75 +0,0 @@
# Copyright 2014 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.services.service import Service
import time
class ZookeeperService(Service):
def __init__(self, service_context):
"""
:type service_context ducktape.services.service.ServiceContext
"""
super(ZookeeperService, self).__init__(service_context)
self.logs = {"zk_log": "/mnt/zk.log"}
def start(self):
super(ZookeeperService, self).start()
config = """
dataDir=/mnt/zookeeper
clientPort=2181
maxClientCnxns=0
initLimit=5
syncLimit=2
quorumListenOnAllIPs=true
"""
for idx, node in enumerate(self.nodes, 1):
template_params = { 'idx': idx, 'host': node.account.hostname }
config += "server.%(idx)d=%(host)s:2888:3888\n" % template_params
for idx, node in enumerate(self.nodes, 1):
self.logger.info("Starting ZK node %d on %s", idx, node.account.hostname)
self._stop_and_clean(node, allow_fail=True)
node.account.ssh("mkdir -p /mnt/zookeeper")
node.account.ssh("echo %d > /mnt/zookeeper/myid" % idx)
node.account.create_file("/mnt/zookeeper.properties", config)
node.account.ssh(
"/opt/kafka/bin/zookeeper-server-start.sh /mnt/zookeeper.properties 1>> %(zk_log)s 2>> %(zk_log)s &"
% self.logs)
time.sleep(5) # give it some time to start
def stop_node(self, node, allow_fail=True):
idx = self.idx(node)
self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname))
node.account.ssh("ps ax | grep -i 'zookeeper' | grep -v grep | awk '{print $1}' | xargs kill -SIGTERM",
allow_fail=allow_fail)
def clean_node(self, node, allow_fail=True):
node.account.ssh("rm -rf /mnt/zookeeper /mnt/zookeeper.properties /mnt/zk.log", allow_fail=allow_fail)
def stop(self):
"""If the service left any running processes or data, clean them up."""
super(ZookeeperService, self).stop()
for idx, node in enumerate(self.nodes, 1):
self.stop_node(node, allow_fail=False)
self.clean_node(node)
node.free()
def _stop_and_clean(self, node, allow_fail=False):
self.stop_node(node, allow_fail)
self.clean_node(node, allow_fail)
def connect_setting(self):
return ','.join([node.account.hostname + ':2181' for node in self.nodes])

11
tests/setup.py Normal file
View File

@ -0,0 +1,11 @@
from setuptools import find_packages, setup
setup(name="kafkatest",
version="0.8.3-SNAPSHOT",
description="Apache Kafka System Tests",
author="Apache Kafka",
platforms=["any"],
license="apache2.0",
packages=find_packages(),
requires=["ducktape(>=0.2.0)"]
)

View File

@ -1,193 +0,0 @@
# Copyright 2014 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.services.service import Service
from tests.test import KafkaTest
from services.performance import ProducerPerformanceService, ConsumerPerformanceService, \
EndToEndLatencyService
class KafkaBenchmark(KafkaTest):
'''A benchmark of Kafka producer/consumer performance. This replicates the test
run here:
https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
'''
def __init__(self, test_context):
super(KafkaBenchmark, self).__init__(test_context, num_zk=1, num_brokers=3, topics={
'test-rep-one' : { 'partitions': 6, 'replication-factor': 1 },
'test-rep-three' : { 'partitions': 6, 'replication-factor': 3 }
})
def run(self):
msgs_default = 50000000
msgs_large = 100000000
msg_size_default = 100
batch_size = 8*1024
buffer_memory = 64*1024*1024
msg_sizes = [10, 100, 1000, 10000, 100000]
target_data_size = 1024*1024*1024
target_data_size_gb = target_data_size/float(1024*1024*1024)
# These settings will work in the default local Vagrant VMs, useful for testing
if False:
msgs_default = 1000000
msgs_large = 10000000
msg_size_default = 100
batch_size = 8*1024
buffer_memory = 64*1024*1024
msg_sizes = [10, 100, 1000, 10000, 100000]
target_data_size = 128*1024*1024
target_data_size_gb = target_data_size/float(1024*1024*1024)
# PRODUCER TESTS
self.logger.info("BENCHMARK: Single producer, no replication")
single_no_rep = ProducerPerformanceService(
self.service_context(1), self.kafka,
topic="test-rep-one", num_records=msgs_default, record_size=msg_size_default, throughput=-1,
settings={'acks':1, 'batch.size':batch_size, 'buffer.memory':buffer_memory}
)
single_no_rep.run()
self.logger.info("BENCHMARK: Single producer, async 3x replication")
single_rep_async = ProducerPerformanceService(
self.service_context(1), self.kafka,
topic="test-rep-three", num_records=msgs_default, record_size=msg_size_default, throughput=-1,
settings={'acks':1, 'batch.size':batch_size, 'buffer.memory':buffer_memory}
)
single_rep_async.run()
self.logger.info("BENCHMARK: Single producer, sync 3x replication")
single_rep_sync = ProducerPerformanceService(
self.service_context(1), self.kafka,
topic="test-rep-three", num_records=msgs_default, record_size=msg_size_default, throughput=-1,
settings={'acks':-1, 'batch.size':batch_size, 'buffer.memory':buffer_memory}
)
single_rep_sync.run()
self.logger.info("BENCHMARK: Three producers, async 3x replication")
three_rep_async = ProducerPerformanceService(
self.service_context(3), self.kafka,
topic="test-rep-three", num_records=msgs_default, record_size=msg_size_default, throughput=-1,
settings={'acks':1, 'batch.size':batch_size, 'buffer.memory':buffer_memory}
)
three_rep_async.run()
msg_size_perf = {}
for msg_size in msg_sizes:
self.logger.info("BENCHMARK: Message size %d (%f GB total, single producer, async 3x replication)", msg_size, target_data_size_gb)
# Always generate the same total amount of data
nrecords = int(target_data_size / msg_size)
perf = ProducerPerformanceService(
self.service_context(1), self.kafka,
topic="test-rep-three", num_records=nrecords, record_size=msg_size, throughput=-1,
settings={'acks':1, 'batch.size':batch_size, 'buffer.memory':buffer_memory}
)
perf.run()
msg_size_perf[msg_size] = perf
# CONSUMER TESTS
# All consumer tests use the messages from the first benchmark, so
# they'll get messages of the default message size
self.logger.info("BENCHMARK: Single consumer")
single_consumer = ConsumerPerformanceService(
self.service_context(1), self.kafka,
topic="test-rep-three", num_records=msgs_default, throughput=-1, threads=1
)
single_consumer.run()
self.logger.info("BENCHMARK: Three consumers")
three_consumers = ConsumerPerformanceService(
self.service_context(3), self.kafka,
topic="test-rep-three", num_records=msgs_default, throughput=-1, threads=1
)
three_consumers.run()
# PRODUCER + CONSUMER TEST
self.logger.info("BENCHMARK: Producer + Consumer")
pc_producer = ProducerPerformanceService(
self.service_context(1), self.kafka,
topic="test-rep-three", num_records=msgs_default, record_size=msg_size_default, throughput=-1,
settings={'acks':1, 'batch.size':batch_size, 'buffer.memory':buffer_memory}
)
pc_consumer = ConsumerPerformanceService(
self.service_context(1), self.kafka,
topic="test-rep-three", num_records=msgs_default, throughput=-1, threads=1
)
Service.run_parallel(pc_producer, pc_consumer)
# END TO END LATENCY TEST
self.logger.info("BENCHMARK: End to end latency")
e2e_latency = EndToEndLatencyService(
self.service_context(1), self.kafka,
topic="test-rep-three", num_records=10000
)
e2e_latency.run()
# LONG TERM THROUGHPUT TEST
# Because of how much space this ends up using, we clear out the
# existing cluster to start from a clean slate. This also keeps us from
# running out of space with limited disk space.
self.tearDown()
self.setUp()
self.logger.info("BENCHMARK: Long production")
throughput_perf = ProducerPerformanceService(
self.service_context(1), self.kafka,
topic="test-rep-three", num_records=msgs_large, record_size=msg_size_default, throughput=-1,
settings={'acks':1, 'batch.size':batch_size, 'buffer.memory':buffer_memory},
intermediate_stats=True
)
throughput_perf.run()
# Summarize, extracting just the key info. With multiple
# producers/consumers, we display the aggregate value
def throughput(perf):
aggregate_rate = sum([r['records_per_sec'] for r in perf.results])
aggregate_mbps = sum([r['mbps'] for r in perf.results])
return "%f rec/sec (%f MB/s)" % (aggregate_rate, aggregate_mbps)
self.logger.info("=================")
self.logger.info("BENCHMARK RESULTS")
self.logger.info("=================")
self.logger.info("Single producer, no replication: %s", throughput(single_no_rep))
self.logger.info("Single producer, async 3x replication: %s", throughput(single_rep_async))
self.logger.info("Single producer, sync 3x replication: %s", throughput(single_rep_sync))
self.logger.info("Three producers, async 3x replication: %s", throughput(three_rep_async))
self.logger.info("Message size:")
for msg_size in msg_sizes:
self.logger.info(" %d: %s", msg_size, throughput(msg_size_perf[msg_size]))
self.logger.info("Throughput over long run, data > memory:")
# FIXME we should be generating a graph too
# Try to break it into 5 blocks, but fall back to a smaller number if
# there aren't even 5 elements
block_size = max(len(throughput_perf.stats[0]) / 5, 1)
nblocks = len(throughput_perf.stats[0]) / block_size
for i in range(nblocks):
subset = throughput_perf.stats[0][i*block_size:min((i+1)*block_size,len(throughput_perf.stats[0]))]
if len(subset) == 0:
self.logger.info(" Time block %d: (empty)", i)
else:
self.logger.info(" Time block %d: %f rec/sec (%f MB/s)", i,
sum([stat['records_per_sec'] for stat in subset])/float(len(subset)),
sum([stat['mbps'] for stat in subset])/float(len(subset))
)
self.logger.info("Single consumer: %s", throughput(single_consumer))
self.logger.info("Three consumers: %s", throughput(three_consumers))
self.logger.info("Producer + consumer:")
self.logger.info(" Producer: %s", throughput(pc_producer))
self.logger.info(" Consumer: %s", throughput(pc_producer))
self.logger.info("End-to-end latency: median %f ms, 99%% %f ms, 99.9%% %f ms", e2e_latency.results[0]['latency_50th_ms'], e2e_latency.results[0]['latency_99th_ms'], e2e_latency.results[0]['latency_999th_ms'])

View File

@ -0,0 +1,13 @@
if [ -z "$AWS_IAM" ];then
echo "Warning: AWS_IAM is not set"
fi
export AWS_ACCESS_KEY=`curl -s http://169.254.169.254/latest/meta-data/iam/security-credentials/$AWS_IAM | grep AccessKeyId | awk -F\" '{ print $4 }'`
export AWS_SECRET_KEY=`curl -s http://169.254.169.254/latest/meta-data/iam/security-credentials/$AWS_IAM | grep SecretAccessKey | awk -F\" '{ print $4 }'`
export AWS_SESSION_TOKEN=`curl -s http://169.254.169.254/latest/meta-data/iam/security-credentials/$AWS_IAM | grep Token | awk -F\" '{ print $4 }'`
if [ -z "$AWS_ACCESS_KEY" ]; then
echo "Failed to populate environment variables AWS_ACCESS_KEY, AWS_SECRET_KEY, and AWS_SESSION_TOKEN."
echo "AWS_IAM is currently $AWS_IAM. Double-check that this is correct. If not set, add this command to your .bashrc file:"
echo "export AWS_IAM=<my_aws_iam> # put this into your ~/.bashrc"
fi

View File

@ -0,0 +1,12 @@
# Use this template Vagrantfile.local for running system tests on aws
# To use it, move it to the base kafka directory and rename
# it to Vagrantfile.local, and adjust variables as needed.
ec3_instance_type = "m3.medium"
num_zookeepers = 0
num_brokers = 0
num_workers = 9
ec2_keypair_name = kafkatest
ec2_keypair_file = ../kafkatest.pem
ec2_security_groups = ['kafkatest']
ec2_region = 'us-west-2'
ec2_ami = "ami-29ebb519"

View File

@ -1,13 +1,13 @@
#!/bin/bash
# This script should be run once on your aws test driver machine before
# attempting to run any ducktape tests
# This script can be used to set up a driver machine on aws from which you will run tests
# or bring up your mini Kafka cluster.
# Install dependencies
sudo apt-get install -y maven openjdk-6-jdk build-essential \
ruby-dev zlib1g-dev realpath python-setuptools
base_dir=`dirname $0`/..
base_dir=`dirname $0`/../..
if [ -z `which vagrant` ]; then
echo "Installing vagrant..."

View File

@ -24,16 +24,16 @@ if [ -z `which javac` ]; then
apt-get -y update
# Try to share cache. See Vagrantfile for details
mkdir -p /var/cache/oracle-jdk7-installer
if [ -e "/tmp/oracle-jdk7-installer-cache/" ]; then
find /tmp/oracle-jdk7-installer-cache/ -not -empty -exec cp '{}' /var/cache/oracle-jdk7-installer/ \;
mkdir -p /var/cache/oracle-jdk6-installer
if [ -e "/tmp/oracle-jdk6-installer-cache/" ]; then
find /tmp/oracle-jdk6-installer-cache/ -not -empty -exec cp '{}' /var/cache/oracle-jdk6-installer/ \;
fi
/bin/echo debconf shared/accepted-oracle-license-v1-1 select true | /usr/bin/debconf-set-selections
apt-get -y install oracle-java7-installer oracle-java7-set-default
apt-get -y install oracle-java6-installer oracle-java6-set-default
if [ -e "/tmp/oracle-jdk7-installer-cache/" ]; then
cp -R /var/cache/oracle-jdk7-installer/* /tmp/oracle-jdk7-installer-cache
if [ -e "/tmp/oracle-jdk6-installer-cache/" ]; then
cp -R /var/cache/oracle-jdk6-installer/* /tmp/oracle-jdk6-installer-cache
fi
fi

View File

@ -0,0 +1,6 @@
# Use this example Vagrantfile.local for running system tests
# To use it, move it to the base kafka directory and rename
# it to Vagrantfile.local
num_zookeepers = 0
num_brokers = 0
num_workers = 9