mirror of https://github.com/apache/kafka.git
Bootstrap Kafka system tests
This commit is contained in:
parent
5397d3c53e
commit
81e41562f3
|
@ -29,3 +29,8 @@ config/server-*
|
||||||
config/zookeeper-*
|
config/zookeeper-*
|
||||||
core/data/*
|
core/data/*
|
||||||
gradle/wrapper/*
|
gradle/wrapper/*
|
||||||
|
|
||||||
|
results
|
||||||
|
tests/results
|
||||||
|
.ducktape
|
||||||
|
tests/.ducktape
|
||||||
|
|
|
@ -20,16 +20,20 @@ require 'socket'
|
||||||
# Vagrantfile API/syntax version. Don't touch unless you know what you're doing!
|
# Vagrantfile API/syntax version. Don't touch unless you know what you're doing!
|
||||||
VAGRANTFILE_API_VERSION = "2"
|
VAGRANTFILE_API_VERSION = "2"
|
||||||
|
|
||||||
|
# Mode
|
||||||
|
mode = "kafka_cluster"
|
||||||
|
|
||||||
# General config
|
# General config
|
||||||
enable_dns = false
|
enable_dns = false
|
||||||
|
ram_megabytes = 1280
|
||||||
|
num_workers = 0 # Generic workers that get the code, but don't start any services
|
||||||
num_zookeepers = 1
|
num_zookeepers = 1
|
||||||
num_brokers = 3
|
num_brokers = 3
|
||||||
num_workers = 0 # Generic workers that get the code, but don't start any services
|
|
||||||
ram_megabytes = 1280
|
|
||||||
|
|
||||||
# EC2
|
# EC2
|
||||||
ec2_access_key = ENV['AWS_ACCESS_KEY']
|
ec2_access_key = ENV['AWS_ACCESS_KEY']
|
||||||
ec2_secret_key = ENV['AWS_SECRET_KEY']
|
ec2_secret_key = ENV['AWS_SECRET_KEY']
|
||||||
|
ec2_session_token = ENV['AWS_SESSION_TOKEN']
|
||||||
ec2_keypair_name = nil
|
ec2_keypair_name = nil
|
||||||
ec2_keypair_file = nil
|
ec2_keypair_file = nil
|
||||||
|
|
||||||
|
@ -49,6 +53,29 @@ if File.exists?(local_config_file) then
|
||||||
eval(File.read(local_config_file), binding, "Vagrantfile.local")
|
eval(File.read(local_config_file), binding, "Vagrantfile.local")
|
||||||
end
|
end
|
||||||
|
|
||||||
|
if mode == "test"
|
||||||
|
num_zookeepers = 0
|
||||||
|
num_brokers = 0
|
||||||
|
end
|
||||||
|
|
||||||
|
# This is a horrible hack to work around bad interactions between
|
||||||
|
# vagrant-hostmanager and vagrant-aws/vagrant's implementation. Hostmanager
|
||||||
|
# wants to update the /etc/hosts entries, but tries to do so even on nodes that
|
||||||
|
# aren't up (e.g. even when all nodes are stopped and you run vagrant
|
||||||
|
# destroy). Because of the way the underlying code in vagrant works, it still
|
||||||
|
# tries to communicate with the node and has to wait for a very long
|
||||||
|
# timeout. This modifies the update to check for hosts that are not created or
|
||||||
|
# stopped, skipping the update in that case since it's impossible to update
|
||||||
|
# nodes in that state.
|
||||||
|
Object.const_get("VagrantPlugins").const_get("HostManager").const_get("HostsFile").class_eval do
|
||||||
|
alias_method :old_update_guest, :update_guest
|
||||||
|
def update_guest(machine)
|
||||||
|
state_id = machine.state.id
|
||||||
|
return if state_id == :not_created || state_id == :stopped
|
||||||
|
old_update_guest(machine)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
# TODO(ksweeney): RAM requirements are not empirical and can probably be significantly lowered.
|
# TODO(ksweeney): RAM requirements are not empirical and can probably be significantly lowered.
|
||||||
Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
|
Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
|
||||||
config.hostmanager.enabled = true
|
config.hostmanager.enabled = true
|
||||||
|
@ -84,13 +111,31 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
|
||||||
override.vm.box = "dummy"
|
override.vm.box = "dummy"
|
||||||
override.vm.box_url = "https://github.com/mitchellh/vagrant-aws/raw/master/dummy.box"
|
override.vm.box_url = "https://github.com/mitchellh/vagrant-aws/raw/master/dummy.box"
|
||||||
|
|
||||||
override.hostmanager.ignore_private_ip = true
|
cached_addresses = {}
|
||||||
|
# Use a custom resolver that SSH's into the machine and finds the IP address
|
||||||
|
# directly. This lets us get at the private IP address directly, avoiding
|
||||||
|
# some issues with using the default IP resolver, which uses the public IP
|
||||||
|
# address.
|
||||||
|
override.hostmanager.ip_resolver = proc do |vm, resolving_vm|
|
||||||
|
if !cached_addresses.has_key?(vm.name)
|
||||||
|
state_id = vm.state.id
|
||||||
|
if state_id != :not_created && state_id != :stopped && vm.communicate.ready?
|
||||||
|
vm.communicate.execute("/sbin/ifconfig eth0 | grep 'inet addr' | tail -n 1 | egrep -o '[0-9\.]+' | head -n 1 2>&1") do |type, contents|
|
||||||
|
cached_addresses[vm.name] = contents.split("\n").first[/(\d+\.\d+\.\d+\.\d+)/, 1]
|
||||||
|
end
|
||||||
|
else
|
||||||
|
cached_addresses[vm.name] = nil
|
||||||
|
end
|
||||||
|
end
|
||||||
|
cached_addresses[vm.name]
|
||||||
|
end
|
||||||
|
|
||||||
override.ssh.username = ec2_user
|
override.ssh.username = ec2_user
|
||||||
override.ssh.private_key_path = ec2_keypair_file
|
override.ssh.private_key_path = ec2_keypair_file
|
||||||
|
|
||||||
aws.access_key_id = ec2_access_key
|
aws.access_key_id = ec2_access_key
|
||||||
aws.secret_access_key = ec2_secret_key
|
aws.secret_access_key = ec2_secret_key
|
||||||
|
aws.session_token = ec2_session_token
|
||||||
aws.keypair_name = ec2_keypair_name
|
aws.keypair_name = ec2_keypair_name
|
||||||
|
|
||||||
aws.region = ec2_region
|
aws.region = ec2_region
|
||||||
|
|
|
@ -0,0 +1,11 @@
|
||||||
|
Vagrantfile.local
|
||||||
|
|
||||||
|
.idea/
|
||||||
|
|
||||||
|
*.pyc
|
||||||
|
*.ipynb
|
||||||
|
|
||||||
|
.DS_Store
|
||||||
|
|
||||||
|
.ducktape
|
||||||
|
results/
|
|
@ -0,0 +1,44 @@
|
||||||
|
System Integration & Performance Testing
|
||||||
|
========================================
|
||||||
|
|
||||||
|
This directory contains Kafka system integration and performance tests.
|
||||||
|
[Ducktape](https://github.com/confluentinc/ducktape) is used to run the tests.
|
||||||
|
|
||||||
|
Ducktape is a distributed testing framework which provides test runner,
|
||||||
|
result reporter and utilities to pull up and tear down services. It automatically
|
||||||
|
discovers tests from a directory and generate an HTML report for each run.
|
||||||
|
|
||||||
|
To run the tests:
|
||||||
|
|
||||||
|
1. Build a specific branch of Kafka
|
||||||
|
|
||||||
|
$ cd kafka
|
||||||
|
$ git checkout $BRANCH
|
||||||
|
$ gradle
|
||||||
|
$ ./gradlew jar
|
||||||
|
|
||||||
|
2. Setup a testing cluster. You can use Vagrant to create a cluster of local
|
||||||
|
VMs or on EC2. Configure your Vagrant setup by creating the file
|
||||||
|
`Vagrantfile.local` in the directory of your Kafka checkout. At a minimum
|
||||||
|
, you *MUST* set `mode = "test"` and the value of `num_workers` high enough for
|
||||||
|
the test you're trying to run. If you run on AWS, you also need to set
|
||||||
|
enable_dns = true.
|
||||||
|
|
||||||
|
3. Bring up the cluster, making sure you have enough workers. For Vagrant,
|
||||||
|
use `vagrant up`. If you want to run on AWS, use `vagrant up
|
||||||
|
--provider=aws --no-parallel`.
|
||||||
|
4. Install ducktape:
|
||||||
|
|
||||||
|
$ git clone https://github.com/confluentinc/ducktape
|
||||||
|
$ cd ducktape
|
||||||
|
$ pip install ducktape
|
||||||
|
5. Run the system tests using ducktape, you can view results in the `results`
|
||||||
|
directory.
|
||||||
|
|
||||||
|
$ cd tests
|
||||||
|
$ ducktape tests
|
||||||
|
6. To iterate/run again if you made any changes:
|
||||||
|
|
||||||
|
$ cd kafka
|
||||||
|
$ ./gradlew jar
|
||||||
|
$ vagrant rsync # Re-syncs build output to cluster
|
|
@ -0,0 +1 @@
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
export AWS_ACCESS_KEY=`curl -s http://169.254.169.254/latest/meta-data/iam/security-credentials/ducttape-master | grep AccessKeyId | awk -F\" '{ print $4 }'`
|
||||||
|
export AWS_SECRET_KEY=`curl -s http://169.254.169.254/latest/meta-data/iam/security-credentials/ducttape-master | grep SecretAccessKey | awk -F\" '{ print $4 }'`
|
||||||
|
export AWS_SESSION_TOKEN=`curl -s http://169.254.169.254/latest/meta-data/iam/security-credentials/ducttape-master | grep Token | awk -F\" '{ print $4 }'`
|
|
@ -0,0 +1,9 @@
|
||||||
|
ec3_instance_type = "m3.medium"
|
||||||
|
enable_dns = true
|
||||||
|
mode = "test"
|
||||||
|
num_workers = 1
|
||||||
|
ec2_keypair_name =
|
||||||
|
ec2_keypair_file =
|
||||||
|
ec2_security_groups = ['ducttape-insecure']
|
||||||
|
ec2_region = 'us-west-2'
|
||||||
|
ec2_ami = "ami-29ebb519"
|
|
@ -0,0 +1,57 @@
|
||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
# This script should be run once on your aws test driver machine before
|
||||||
|
# attempting to run any ducktape tests
|
||||||
|
|
||||||
|
# Install dependencies
|
||||||
|
sudo apt-get install -y maven openjdk-6-jdk build-essential \
|
||||||
|
ruby-dev zlib1g-dev realpath python-setuptools
|
||||||
|
|
||||||
|
base_dir=`dirname $0`/..
|
||||||
|
|
||||||
|
if [ -z `which vagrant` ]; then
|
||||||
|
echo "Installing vagrant..."
|
||||||
|
wget https://dl.bintray.com/mitchellh/vagrant/vagrant_1.7.2_x86_64.deb
|
||||||
|
sudo dpkg -i vagrant_1.7.2_x86_64.deb
|
||||||
|
rm -f vagrant_1.7.2_x86_64.deb
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Install necessary vagrant plugins
|
||||||
|
# Note: Do NOT install vagrant-cachier since it doesn't work on AWS and only
|
||||||
|
# adds log noise
|
||||||
|
vagrant_plugins="vagrant-aws vagrant-hostmanager"
|
||||||
|
existing=`vagrant plugin list`
|
||||||
|
for plugin in $vagrant_plugins; do
|
||||||
|
echo $existing | grep $plugin > /dev/null
|
||||||
|
if [ $? != 0 ]; then
|
||||||
|
vagrant plugin install $plugin
|
||||||
|
fi
|
||||||
|
done
|
||||||
|
|
||||||
|
# Create Vagrantfile.local as a convenience
|
||||||
|
if [ ! -e "$base_dir/Vagrantfile.local" ]; then
|
||||||
|
cp $base_dir/aws/aws-example-Vagrantfile.local $base_dir/Vagrantfile.local
|
||||||
|
fi
|
||||||
|
|
||||||
|
gradle="gradle-2.2.1"
|
||||||
|
if [ -z `which gradle` ] && [ ! -d $base_dir/$gradle ]; then
|
||||||
|
if [ ! -e $gradle-bin.zip ]; then
|
||||||
|
wget https://services.gradle.org/distributions/$gradle-bin.zip
|
||||||
|
fi
|
||||||
|
unzip $gradle-bin.zip
|
||||||
|
rm -rf $gradle-bin.zip
|
||||||
|
mv $gradle $base_dir/$gradle
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Ensure aws access keys are in the environment when we use a EC2 driver machine
|
||||||
|
LOCAL_HOSTNAME=$(hostname -d)
|
||||||
|
if [[ ${LOCAL_HOSTNAME} =~ .*\.compute\.internal ]]; then
|
||||||
|
grep "AWS ACCESS KEYS" ~/.bashrc > /dev/null
|
||||||
|
if [ $? != 0 ]; then
|
||||||
|
echo "# --- AWS ACCESS KEYS ---" >> ~/.bashrc
|
||||||
|
echo ". `realpath $base_dir/aws/aws-access-keys-commands`" >> ~/.bashrc
|
||||||
|
echo "# -----------------------" >> ~/.bashrc
|
||||||
|
source ~/.bashrc
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
|
|
@ -0,0 +1,212 @@
|
||||||
|
# Copyright 2014 Confluent Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from ducktape.services.service import Service
|
||||||
|
import time, re, json
|
||||||
|
|
||||||
|
|
||||||
|
class KafkaService(Service):
|
||||||
|
def __init__(self, service_context, zk, topics=None):
|
||||||
|
"""
|
||||||
|
:type service_context ducktape.services.service.ServiceContext
|
||||||
|
:type zk: ZookeeperService
|
||||||
|
:type topics: dict
|
||||||
|
"""
|
||||||
|
super(KafkaService, self).__init__(service_context)
|
||||||
|
self.zk = zk
|
||||||
|
self.topics = topics
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
super(KafkaService, self).start()
|
||||||
|
|
||||||
|
# Start all nodes in this Kafka service
|
||||||
|
for idx, node in enumerate(self.nodes, 1):
|
||||||
|
self.logger.info("Starting Kafka node %d on %s", idx, node.account.hostname)
|
||||||
|
self._stop_and_clean(node, allow_fail=True)
|
||||||
|
self.start_node(node)
|
||||||
|
|
||||||
|
# wait for start up
|
||||||
|
time.sleep(6)
|
||||||
|
|
||||||
|
# Create topics if necessary
|
||||||
|
if self.topics is not None:
|
||||||
|
for topic, topic_cfg in self.topics.items():
|
||||||
|
if topic_cfg is None:
|
||||||
|
topic_cfg = {}
|
||||||
|
|
||||||
|
topic_cfg["topic"] = topic
|
||||||
|
self.create_topic(topic_cfg)
|
||||||
|
|
||||||
|
def create_topic(self, topic_cfg):
|
||||||
|
node = self.nodes[0] # any node is fine here
|
||||||
|
self.logger.info("Creating topic %s with settings %s", topic_cfg["topic"], topic_cfg)
|
||||||
|
|
||||||
|
cmd = "/opt/kafka/bin/kafka-topics.sh --zookeeper %(zk_connect)s --create "\
|
||||||
|
"--topic %(topic)s --partitions %(partitions)d --replication-factor %(replication)d" % {
|
||||||
|
'zk_connect': self.zk.connect_setting(),
|
||||||
|
'topic': topic_cfg.get("topic"),
|
||||||
|
'partitions': topic_cfg.get('partitions', 1),
|
||||||
|
'replication': topic_cfg.get('replication-factor', 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
if "configs" in topic_cfg.keys() and topic_cfg["configs"] is not None:
|
||||||
|
for config_name, config_value in topic_cfg["configs"].items():
|
||||||
|
cmd += " --config %s=%s" % (config_name, str(config_value))
|
||||||
|
|
||||||
|
self.logger.info("Running topic creation command...\n%s" % cmd)
|
||||||
|
node.account.ssh(cmd)
|
||||||
|
|
||||||
|
time.sleep(1)
|
||||||
|
self.logger.info("Checking to see if topic was properly created...\n%s" % cmd)
|
||||||
|
for line in self.describe_topic(topic_cfg["topic"]).split("\n"):
|
||||||
|
self.logger.info(line)
|
||||||
|
|
||||||
|
def describe_topic(self, topic):
|
||||||
|
node = self.nodes[0]
|
||||||
|
cmd = "/opt/kafka/bin/kafka-topics.sh --zookeeper %s --topic %s --describe" % \
|
||||||
|
(self.zk.connect_setting(), topic)
|
||||||
|
output = ""
|
||||||
|
for line in node.account.ssh_capture(cmd):
|
||||||
|
output += line
|
||||||
|
return output
|
||||||
|
|
||||||
|
def verify_reassign_partitions(self, reassignment):
|
||||||
|
"""Run the reassign partitions admin tool in "verify" mode
|
||||||
|
"""
|
||||||
|
node = self.nodes[0]
|
||||||
|
json_file = "/tmp/" + str(time.time()) + "_reassign.json"
|
||||||
|
|
||||||
|
# reassignment to json
|
||||||
|
json_str = json.dumps(reassignment)
|
||||||
|
json_str = json.dumps(json_str)
|
||||||
|
|
||||||
|
# create command
|
||||||
|
cmd = "echo %s > %s && " % (json_str, json_file)
|
||||||
|
cmd += "/opt/kafka/bin/kafka-reassign-partitions.sh "\
|
||||||
|
"--zookeeper %(zk_connect)s "\
|
||||||
|
"--reassignment-json-file %(reassignment_file)s "\
|
||||||
|
"--verify" % {'zk_connect': self.zk.connect_setting(),
|
||||||
|
'reassignment_file': json_file}
|
||||||
|
cmd += " && sleep 1 && rm -f %s" % json_file
|
||||||
|
|
||||||
|
# send command
|
||||||
|
self.logger.info("Verifying parition reassignment...")
|
||||||
|
self.logger.debug(cmd)
|
||||||
|
output = ""
|
||||||
|
for line in node.account.ssh_capture(cmd):
|
||||||
|
output += line
|
||||||
|
|
||||||
|
self.logger.debug(output)
|
||||||
|
|
||||||
|
if re.match(".*is in progress.*", output) is not None:
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
def execute_reassign_partitions(self, reassignment):
|
||||||
|
"""Run the reassign partitions admin tool in "verify" mode
|
||||||
|
"""
|
||||||
|
node = self.nodes[0]
|
||||||
|
json_file = "/tmp/" + str(time.time()) + "_reassign.json"
|
||||||
|
|
||||||
|
# reassignment to json
|
||||||
|
json_str = json.dumps(reassignment)
|
||||||
|
json_str = json.dumps(json_str)
|
||||||
|
|
||||||
|
# create command
|
||||||
|
cmd = "echo %s > %s && " % (json_str, json_file)
|
||||||
|
cmd += "/opt/kafka/bin/kafka-reassign-partitions.sh "\
|
||||||
|
"--zookeeper %(zk_connect)s "\
|
||||||
|
"--reassignment-json-file %(reassignment_file)s "\
|
||||||
|
"--execute" % {'zk_connect': self.zk.connect_setting(),
|
||||||
|
'reassignment_file': json_file}
|
||||||
|
cmd += " && sleep 1 && rm -f %s" % json_file
|
||||||
|
|
||||||
|
# send command
|
||||||
|
self.logger.info("Executing parition reassignment...")
|
||||||
|
self.logger.debug(cmd)
|
||||||
|
output = ""
|
||||||
|
for line in node.account.ssh_capture(cmd):
|
||||||
|
output += line
|
||||||
|
|
||||||
|
self.logger.debug("Verify partition reassignment:")
|
||||||
|
self.logger.debug(output)
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
"""If the service left any running processes or data, clean them up."""
|
||||||
|
super(KafkaService, self).stop()
|
||||||
|
|
||||||
|
for idx, node in enumerate(self.nodes, 1):
|
||||||
|
self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname))
|
||||||
|
self._stop_and_clean(node, allow_fail=True)
|
||||||
|
node.free()
|
||||||
|
|
||||||
|
def _stop_and_clean(self, node, allow_fail=False):
|
||||||
|
node.account.ssh("/opt/kafka/bin/kafka-server-stop.sh", allow_fail=allow_fail)
|
||||||
|
time.sleep(5) # the stop script doesn't wait
|
||||||
|
node.account.ssh("rm -rf /mnt/kafka-logs /mnt/kafka.properties /mnt/kafka.log")
|
||||||
|
|
||||||
|
def stop_node(self, node, clean_shutdown=True, allow_fail=True):
|
||||||
|
node.account.kill_process("kafka", clean_shutdown, allow_fail)
|
||||||
|
|
||||||
|
def start_node(self, node, config=None):
|
||||||
|
if config is None:
|
||||||
|
template = open('templates/kafka.properties').read()
|
||||||
|
template_params = {
|
||||||
|
'broker_id': self.idx(node),
|
||||||
|
'hostname': node.account.hostname,
|
||||||
|
'zk_connect': self.zk.connect_setting()
|
||||||
|
}
|
||||||
|
|
||||||
|
config = template % template_params
|
||||||
|
|
||||||
|
node.account.create_file("/mnt/kafka.properties", config)
|
||||||
|
cmd = "/opt/kafka/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log &"
|
||||||
|
self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd))
|
||||||
|
node.account.ssh(cmd)
|
||||||
|
|
||||||
|
def restart_node(self, node, wait_sec=0, clean_shutdown=True):
|
||||||
|
self.stop_node(node, clean_shutdown, allow_fail=True)
|
||||||
|
time.sleep(wait_sec)
|
||||||
|
self.start_node(node)
|
||||||
|
|
||||||
|
def get_leader_node(self, topic, partition=0):
|
||||||
|
""" Get the leader replica for the given topic and partition.
|
||||||
|
"""
|
||||||
|
cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.ZooKeeperMainWrapper -server %s " \
|
||||||
|
% self.zk.connect_setting()
|
||||||
|
cmd += "get /brokers/topics/%s/partitions/%d/state" % (topic, partition)
|
||||||
|
self.logger.debug(cmd)
|
||||||
|
|
||||||
|
node = self.nodes[0]
|
||||||
|
self.logger.debug("Querying zookeeper to find leader replica for topic %s: \n%s" % (cmd, topic))
|
||||||
|
partition_state = None
|
||||||
|
for line in node.account.ssh_capture(cmd):
|
||||||
|
match = re.match("^({.+})$", line)
|
||||||
|
if match is not None:
|
||||||
|
partition_state = match.groups()[0]
|
||||||
|
break
|
||||||
|
|
||||||
|
if partition_state is None:
|
||||||
|
raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition))
|
||||||
|
|
||||||
|
partition_state = json.loads(partition_state)
|
||||||
|
self.logger.info(partition_state)
|
||||||
|
|
||||||
|
leader_idx = int(partition_state["leader"])
|
||||||
|
self.logger.info("Leader for topic %s and partition %d is now: %d" % (topic, partition, leader_idx))
|
||||||
|
return self.get_node(leader_idx)
|
||||||
|
|
||||||
|
def bootstrap_servers(self):
|
||||||
|
return ','.join([node.account.hostname + ":9092" for node in self.nodes])
|
|
@ -0,0 +1,189 @@
|
||||||
|
# Copyright 2014 Confluent Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from ducktape.services.service import Service
|
||||||
|
import threading
|
||||||
|
|
||||||
|
|
||||||
|
class PerformanceService(Service):
|
||||||
|
def __init__(self, service_context):
|
||||||
|
super(PerformanceService, self).__init__(service_context)
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
super(PerformanceService, self).start()
|
||||||
|
self.worker_threads = []
|
||||||
|
self.results = [None] * len(self.nodes)
|
||||||
|
self.stats = [[] for x in range(len(self.nodes))]
|
||||||
|
for idx,node in enumerate(self.nodes,1):
|
||||||
|
self.logger.info("Running %s node %d on %s", self.__class__.__name__, idx, node.account.hostname)
|
||||||
|
worker = threading.Thread(
|
||||||
|
name=self.__class__.__name__ + "-worker-" + str(idx),
|
||||||
|
target=self._worker,
|
||||||
|
args=(idx,node)
|
||||||
|
)
|
||||||
|
worker.daemon = True
|
||||||
|
worker.start()
|
||||||
|
self.worker_threads.append(worker)
|
||||||
|
|
||||||
|
def wait(self):
|
||||||
|
super(PerformanceService, self).wait()
|
||||||
|
for idx,worker in enumerate(self.worker_threads,1):
|
||||||
|
self.logger.debug("Waiting for %s worker %d to finish", self.__class__.__name__, idx)
|
||||||
|
worker.join()
|
||||||
|
self.worker_threads = None
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
super(PerformanceService, self).stop()
|
||||||
|
assert self.worker_threads is None, "%s.stop should only be called after wait" % self.__class__.__name__
|
||||||
|
for idx,node in enumerate(self.nodes,1):
|
||||||
|
self.logger.debug("Stopping %s node %d on %s", self.__class__.__name__, idx, node.account.hostname)
|
||||||
|
node.free()
|
||||||
|
|
||||||
|
|
||||||
|
class ProducerPerformanceService(PerformanceService):
|
||||||
|
def __init__(self, service_context, kafka, topic, num_records, record_size, throughput, settings={}, intermediate_stats=False):
|
||||||
|
super(ProducerPerformanceService, self).__init__(service_context)
|
||||||
|
self.kafka = kafka
|
||||||
|
self.args = {
|
||||||
|
'topic': topic,
|
||||||
|
'num_records': num_records,
|
||||||
|
'record_size': record_size,
|
||||||
|
'throughput': throughput
|
||||||
|
}
|
||||||
|
self.settings = settings
|
||||||
|
self.intermediate_stats = intermediate_stats
|
||||||
|
|
||||||
|
def _worker(self, idx, node):
|
||||||
|
args = self.args.copy()
|
||||||
|
args.update({'bootstrap_servers': self.kafka.bootstrap_servers()})
|
||||||
|
cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance "\
|
||||||
|
"%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s" % args
|
||||||
|
|
||||||
|
for key,value in self.settings.items():
|
||||||
|
cmd += " %s=%s" % (str(key), str(value))
|
||||||
|
self.logger.debug("Producer performance %d command: %s", idx, cmd)
|
||||||
|
def parse_stats(line):
|
||||||
|
parts = line.split(',')
|
||||||
|
return {
|
||||||
|
'records': int(parts[0].split()[0]),
|
||||||
|
'records_per_sec': float(parts[1].split()[0]),
|
||||||
|
'mbps': float(parts[1].split('(')[1].split()[0]),
|
||||||
|
'latency_avg_ms': float(parts[2].split()[0]),
|
||||||
|
'latency_max_ms': float(parts[3].split()[0]),
|
||||||
|
'latency_50th_ms': float(parts[4].split()[0]),
|
||||||
|
'latency_95th_ms': float(parts[5].split()[0]),
|
||||||
|
'latency_99th_ms': float(parts[6].split()[0]),
|
||||||
|
'latency_999th_ms': float(parts[7].split()[0]),
|
||||||
|
}
|
||||||
|
last = None
|
||||||
|
for line in node.account.ssh_capture(cmd):
|
||||||
|
self.logger.debug("Producer performance %d: %s", idx, line.strip())
|
||||||
|
if self.intermediate_stats:
|
||||||
|
try:
|
||||||
|
self.stats[idx-1].append(parse_stats(line))
|
||||||
|
except:
|
||||||
|
# Sometimes there are extraneous log messages
|
||||||
|
pass
|
||||||
|
last = line
|
||||||
|
try:
|
||||||
|
self.results[idx-1] = parse_stats(last)
|
||||||
|
except:
|
||||||
|
self.logger.error("Bad last line: %s", last)
|
||||||
|
|
||||||
|
|
||||||
|
class ConsumerPerformanceService(PerformanceService):
|
||||||
|
def __init__(self, service_context, kafka, topic, num_records, throughput, threads=1, settings={}):
|
||||||
|
super(ConsumerPerformanceService, self).__init__(service_context)
|
||||||
|
self.kafka = kafka
|
||||||
|
self.args = {
|
||||||
|
'topic': topic,
|
||||||
|
'num_records': num_records,
|
||||||
|
'throughput': throughput,
|
||||||
|
'threads': threads,
|
||||||
|
}
|
||||||
|
self.settings = settings
|
||||||
|
|
||||||
|
def _worker(self, idx, node):
|
||||||
|
args = self.args.copy()
|
||||||
|
args.update({'zk_connect': self.kafka.zk.connect_setting()})
|
||||||
|
cmd = "/opt/kafka/bin/kafka-consumer-perf-test.sh "\
|
||||||
|
"--topic %(topic)s --messages %(num_records)d --zookeeper %(zk_connect)s" % args
|
||||||
|
for key,value in self.settings.items():
|
||||||
|
cmd += " %s=%s" % (str(key), str(value))
|
||||||
|
self.logger.debug("Consumer performance %d command: %s", idx, cmd)
|
||||||
|
last = None
|
||||||
|
for line in node.account.ssh_capture(cmd):
|
||||||
|
self.logger.debug("Consumer performance %d: %s", idx, line.strip())
|
||||||
|
last = line
|
||||||
|
# Parse and save the last line's information
|
||||||
|
parts = last.split(',')
|
||||||
|
self.results[idx-1] = {
|
||||||
|
'total_mb': float(parts[3]),
|
||||||
|
'mbps': float(parts[4]),
|
||||||
|
'records_per_sec': float(parts[6]),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class EndToEndLatencyService(PerformanceService):
|
||||||
|
def __init__(self, service_context, kafka, topic, num_records, consumer_fetch_max_wait=100, acks=1):
|
||||||
|
super(EndToEndLatencyService, self).__init__(service_context)
|
||||||
|
self.kafka = kafka
|
||||||
|
self.args = {
|
||||||
|
'topic': topic,
|
||||||
|
'num_records': num_records,
|
||||||
|
'consumer_fetch_max_wait': consumer_fetch_max_wait,
|
||||||
|
'acks': acks
|
||||||
|
}
|
||||||
|
|
||||||
|
def _worker(self, idx, node):
|
||||||
|
args = self.args.copy()
|
||||||
|
args.update({
|
||||||
|
'zk_connect': self.kafka.zk.connect_setting(),
|
||||||
|
'bootstrap_servers': self.kafka.bootstrap_servers(),
|
||||||
|
})
|
||||||
|
cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.TestEndToEndLatency "\
|
||||||
|
"%(bootstrap_servers)s %(zk_connect)s %(topic)s %(num_records)d "\
|
||||||
|
"%(consumer_fetch_max_wait)d %(acks)d" % args
|
||||||
|
self.logger.debug("End-to-end latency %d command: %s", idx, cmd)
|
||||||
|
results = {}
|
||||||
|
for line in node.account.ssh_capture(cmd):
|
||||||
|
self.logger.debug("End-to-end latency %d: %s", idx, line.strip())
|
||||||
|
if line.startswith("Avg latency:"):
|
||||||
|
results['latency_avg_ms'] = float(line.split()[2])
|
||||||
|
if line.startswith("Percentiles"):
|
||||||
|
results['latency_50th_ms'] = float(line.split()[3][:-1])
|
||||||
|
results['latency_99th_ms'] = float(line.split()[6][:-1])
|
||||||
|
results['latency_999th_ms'] = float(line.split()[9])
|
||||||
|
self.results[idx-1] = results
|
||||||
|
|
||||||
|
|
||||||
|
def parse_performance_output(summary):
|
||||||
|
parts = summary.split(',')
|
||||||
|
results = {
|
||||||
|
'records': int(parts[0].split()[0]),
|
||||||
|
'records_per_sec': float(parts[1].split()[0]),
|
||||||
|
'mbps': float(parts[1].split('(')[1].split()[0]),
|
||||||
|
'latency_avg_ms': float(parts[2].split()[0]),
|
||||||
|
'latency_max_ms': float(parts[3].split()[0]),
|
||||||
|
'latency_50th_ms': float(parts[4].split()[0]),
|
||||||
|
'latency_95th_ms': float(parts[5].split()[0]),
|
||||||
|
'latency_99th_ms': float(parts[6].split()[0]),
|
||||||
|
'latency_999th_ms': float(parts[7].split()[0]),
|
||||||
|
}
|
||||||
|
# To provide compatibility with ConsumerPerformanceService
|
||||||
|
results['total_mb'] = results['mbps'] * (results['records'] / results['records_per_sec'])
|
||||||
|
results['rate_mbps'] = results['mbps']
|
||||||
|
results['rate_mps'] = results['records_per_sec']
|
||||||
|
|
||||||
|
return results
|
|
@ -0,0 +1,75 @@
|
||||||
|
# Copyright 2014 Confluent Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from ducktape.services.service import Service
|
||||||
|
import time
|
||||||
|
|
||||||
|
|
||||||
|
class ZookeeperService(Service):
|
||||||
|
def __init__(self, service_context):
|
||||||
|
"""
|
||||||
|
:type service_context ducktape.services.service.ServiceContext
|
||||||
|
"""
|
||||||
|
super(ZookeeperService, self).__init__(service_context)
|
||||||
|
self.logs = {"zk_log": "/mnt/zk.log"}
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
super(ZookeeperService, self).start()
|
||||||
|
config = """
|
||||||
|
dataDir=/mnt/zookeeper
|
||||||
|
clientPort=2181
|
||||||
|
maxClientCnxns=0
|
||||||
|
initLimit=5
|
||||||
|
syncLimit=2
|
||||||
|
quorumListenOnAllIPs=true
|
||||||
|
"""
|
||||||
|
for idx, node in enumerate(self.nodes, 1):
|
||||||
|
template_params = { 'idx': idx, 'host': node.account.hostname }
|
||||||
|
config += "server.%(idx)d=%(host)s:2888:3888\n" % template_params
|
||||||
|
|
||||||
|
for idx, node in enumerate(self.nodes, 1):
|
||||||
|
self.logger.info("Starting ZK node %d on %s", idx, node.account.hostname)
|
||||||
|
self._stop_and_clean(node, allow_fail=True)
|
||||||
|
node.account.ssh("mkdir -p /mnt/zookeeper")
|
||||||
|
node.account.ssh("echo %d > /mnt/zookeeper/myid" % idx)
|
||||||
|
node.account.create_file("/mnt/zookeeper.properties", config)
|
||||||
|
node.account.ssh(
|
||||||
|
"/opt/kafka/bin/zookeeper-server-start.sh /mnt/zookeeper.properties 1>> %(zk_log)s 2>> %(zk_log)s &"
|
||||||
|
% self.logs)
|
||||||
|
time.sleep(5) # give it some time to start
|
||||||
|
|
||||||
|
def stop_node(self, node, allow_fail=True):
|
||||||
|
idx = self.idx(node)
|
||||||
|
self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname))
|
||||||
|
node.account.ssh("ps ax | grep -i 'zookeeper' | grep -v grep | awk '{print $1}' | xargs kill -SIGTERM",
|
||||||
|
allow_fail=allow_fail)
|
||||||
|
|
||||||
|
def clean_node(self, node, allow_fail=True):
|
||||||
|
node.account.ssh("rm -rf /mnt/zookeeper /mnt/zookeeper.properties /mnt/zk.log", allow_fail=allow_fail)
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
"""If the service left any running processes or data, clean them up."""
|
||||||
|
super(ZookeeperService, self).stop()
|
||||||
|
|
||||||
|
for idx, node in enumerate(self.nodes, 1):
|
||||||
|
self.stop_node(node, allow_fail=False)
|
||||||
|
self.clean_node(node)
|
||||||
|
node.free()
|
||||||
|
|
||||||
|
def _stop_and_clean(self, node, allow_fail=False):
|
||||||
|
self.stop_node(node, allow_fail)
|
||||||
|
self.clean_node(node, allow_fail)
|
||||||
|
|
||||||
|
def connect_setting(self):
|
||||||
|
return ','.join([node.account.hostname + ':2181' for node in self.nodes])
|
|
@ -0,0 +1,121 @@
|
||||||
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
# contributor license agreements. See the NOTICE file distributed with
|
||||||
|
# this work for additional information regarding copyright ownership.
|
||||||
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
# (the "License"); you may not use this file except in compliance with
|
||||||
|
# the License. You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
# see kafka.server.KafkaConfig for additional details and defaults
|
||||||
|
|
||||||
|
############################# Server Basics #############################
|
||||||
|
|
||||||
|
# The id of the broker. This must be set to a unique integer for each broker.
|
||||||
|
broker.id=%(broker_id)d
|
||||||
|
|
||||||
|
############################# Socket Server Settings #############################
|
||||||
|
|
||||||
|
# The port the socket server listens on
|
||||||
|
port=9092
|
||||||
|
|
||||||
|
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
|
||||||
|
#host.name=localhost
|
||||||
|
|
||||||
|
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
|
||||||
|
# value for "host.name" if configured. Otherwise, it will use the value returned from
|
||||||
|
# java.net.InetAddress.getCanonicalHostName().
|
||||||
|
advertised.host.name=%(hostname)s
|
||||||
|
|
||||||
|
# The port to publish to ZooKeeper for clients to use. If this is not set,
|
||||||
|
# it will publish the same port that the broker binds to.
|
||||||
|
#advertised.port=<port accessible by clients>
|
||||||
|
|
||||||
|
# The number of threads handling network requests
|
||||||
|
num.network.threads=3
|
||||||
|
|
||||||
|
# The number of threads doing disk I/O
|
||||||
|
num.io.threads=8
|
||||||
|
|
||||||
|
# The send buffer (SO_SNDBUF) used by the socket server
|
||||||
|
socket.send.buffer.bytes=102400
|
||||||
|
|
||||||
|
# The receive buffer (SO_RCVBUF) used by the socket server
|
||||||
|
socket.receive.buffer.bytes=65536
|
||||||
|
|
||||||
|
# The maximum size of a request that the socket server will accept (protection against OOM)
|
||||||
|
socket.request.max.bytes=104857600
|
||||||
|
|
||||||
|
|
||||||
|
############################# Log Basics #############################
|
||||||
|
|
||||||
|
# A comma seperated list of directories under which to store log files
|
||||||
|
log.dirs=/mnt/kafka-logs
|
||||||
|
|
||||||
|
# The default number of log partitions per topic. More partitions allow greater
|
||||||
|
# parallelism for consumption, but this will also result in more files across
|
||||||
|
# the brokers.
|
||||||
|
num.partitions=1
|
||||||
|
|
||||||
|
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
|
||||||
|
# This value is recommended to be increased for installations with data dirs located in RAID array.
|
||||||
|
num.recovery.threads.per.data.dir=1
|
||||||
|
|
||||||
|
############################# Log Flush Policy #############################
|
||||||
|
|
||||||
|
# Messages are immediately written to the filesystem but by default we only fsync() to sync
|
||||||
|
# the OS cache lazily. The following configurations control the flush of data to disk.
|
||||||
|
# There are a few important trade-offs here:
|
||||||
|
# 1. Durability: Unflushed data may be lost if you are not using replication.
|
||||||
|
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
|
||||||
|
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
|
||||||
|
# The settings below allow one to configure the flush policy to flush data after a period of time or
|
||||||
|
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
|
||||||
|
|
||||||
|
# The number of messages to accept before forcing a flush of data to disk
|
||||||
|
#log.flush.interval.messages=10000
|
||||||
|
|
||||||
|
# The maximum amount of time a message can sit in a log before we force a flush
|
||||||
|
#log.flush.interval.ms=1000
|
||||||
|
|
||||||
|
############################# Log Retention Policy #############################
|
||||||
|
|
||||||
|
# The following configurations control the disposal of log segments. The policy can
|
||||||
|
# be set to delete segments after a period of time, or after a given size has accumulated.
|
||||||
|
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
|
||||||
|
# from the end of the log.
|
||||||
|
|
||||||
|
# The minimum age of a log file to be eligible for deletion
|
||||||
|
log.retention.hours=168
|
||||||
|
|
||||||
|
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
|
||||||
|
# segments don't drop below log.retention.bytes.
|
||||||
|
#log.retention.bytes=1073741824
|
||||||
|
|
||||||
|
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
|
||||||
|
log.segment.bytes=1073741824
|
||||||
|
|
||||||
|
# The interval at which log segments are checked to see if they can be deleted according
|
||||||
|
# to the retention policies
|
||||||
|
log.retention.check.interval.ms=300000
|
||||||
|
|
||||||
|
# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
|
||||||
|
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
|
||||||
|
log.cleaner.enable=false
|
||||||
|
|
||||||
|
############################# Zookeeper #############################
|
||||||
|
|
||||||
|
# Zookeeper connection string (see zookeeper docs for details).
|
||||||
|
# This is a comma separated host:port pairs, each corresponding to a zk
|
||||||
|
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
|
||||||
|
# You can also append an optional chroot string to the urls to specify the
|
||||||
|
# root directory for all kafka znodes.
|
||||||
|
zookeeper.connect=%(zk_connect)s
|
||||||
|
|
||||||
|
# Timeout in ms for connecting to zookeeper
|
||||||
|
zookeeper.connection.timeout.ms=2000
|
|
@ -0,0 +1,193 @@
|
||||||
|
# Copyright 2014 Confluent Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from ducktape.services.service import Service
|
||||||
|
|
||||||
|
from tests.test import KafkaTest
|
||||||
|
from services.performance import ProducerPerformanceService, ConsumerPerformanceService, \
|
||||||
|
EndToEndLatencyService
|
||||||
|
|
||||||
|
|
||||||
|
class KafkaBenchmark(KafkaTest):
|
||||||
|
'''A benchmark of Kafka producer/consumer performance. This replicates the test
|
||||||
|
run here:
|
||||||
|
https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
|
||||||
|
'''
|
||||||
|
def __init__(self, test_context):
|
||||||
|
super(KafkaBenchmark, self).__init__(test_context, num_zk=1, num_brokers=3, topics={
|
||||||
|
'test-rep-one' : { 'partitions': 6, 'replication-factor': 1 },
|
||||||
|
'test-rep-three' : { 'partitions': 6, 'replication-factor': 3 }
|
||||||
|
})
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
msgs_default = 50000000
|
||||||
|
msgs_large = 100000000
|
||||||
|
msg_size_default = 100
|
||||||
|
batch_size = 8*1024
|
||||||
|
buffer_memory = 64*1024*1024
|
||||||
|
msg_sizes = [10, 100, 1000, 10000, 100000]
|
||||||
|
target_data_size = 1024*1024*1024
|
||||||
|
target_data_size_gb = target_data_size/float(1024*1024*1024)
|
||||||
|
# These settings will work in the default local Vagrant VMs, useful for testing
|
||||||
|
if False:
|
||||||
|
msgs_default = 1000000
|
||||||
|
msgs_large = 10000000
|
||||||
|
msg_size_default = 100
|
||||||
|
batch_size = 8*1024
|
||||||
|
buffer_memory = 64*1024*1024
|
||||||
|
msg_sizes = [10, 100, 1000, 10000, 100000]
|
||||||
|
target_data_size = 128*1024*1024
|
||||||
|
target_data_size_gb = target_data_size/float(1024*1024*1024)
|
||||||
|
|
||||||
|
# PRODUCER TESTS
|
||||||
|
|
||||||
|
self.logger.info("BENCHMARK: Single producer, no replication")
|
||||||
|
single_no_rep = ProducerPerformanceService(
|
||||||
|
self.service_context(1), self.kafka,
|
||||||
|
topic="test-rep-one", num_records=msgs_default, record_size=msg_size_default, throughput=-1,
|
||||||
|
settings={'acks':1, 'batch.size':batch_size, 'buffer.memory':buffer_memory}
|
||||||
|
)
|
||||||
|
single_no_rep.run()
|
||||||
|
|
||||||
|
self.logger.info("BENCHMARK: Single producer, async 3x replication")
|
||||||
|
single_rep_async = ProducerPerformanceService(
|
||||||
|
self.service_context(1), self.kafka,
|
||||||
|
topic="test-rep-three", num_records=msgs_default, record_size=msg_size_default, throughput=-1,
|
||||||
|
settings={'acks':1, 'batch.size':batch_size, 'buffer.memory':buffer_memory}
|
||||||
|
)
|
||||||
|
single_rep_async.run()
|
||||||
|
|
||||||
|
self.logger.info("BENCHMARK: Single producer, sync 3x replication")
|
||||||
|
single_rep_sync = ProducerPerformanceService(
|
||||||
|
self.service_context(1), self.kafka,
|
||||||
|
topic="test-rep-three", num_records=msgs_default, record_size=msg_size_default, throughput=-1,
|
||||||
|
settings={'acks':-1, 'batch.size':batch_size, 'buffer.memory':buffer_memory}
|
||||||
|
)
|
||||||
|
single_rep_sync.run()
|
||||||
|
|
||||||
|
self.logger.info("BENCHMARK: Three producers, async 3x replication")
|
||||||
|
three_rep_async = ProducerPerformanceService(
|
||||||
|
self.service_context(3), self.kafka,
|
||||||
|
topic="test-rep-three", num_records=msgs_default, record_size=msg_size_default, throughput=-1,
|
||||||
|
settings={'acks':1, 'batch.size':batch_size, 'buffer.memory':buffer_memory}
|
||||||
|
)
|
||||||
|
three_rep_async.run()
|
||||||
|
|
||||||
|
|
||||||
|
msg_size_perf = {}
|
||||||
|
for msg_size in msg_sizes:
|
||||||
|
self.logger.info("BENCHMARK: Message size %d (%f GB total, single producer, async 3x replication)", msg_size, target_data_size_gb)
|
||||||
|
# Always generate the same total amount of data
|
||||||
|
nrecords = int(target_data_size / msg_size)
|
||||||
|
perf = ProducerPerformanceService(
|
||||||
|
self.service_context(1), self.kafka,
|
||||||
|
topic="test-rep-three", num_records=nrecords, record_size=msg_size, throughput=-1,
|
||||||
|
settings={'acks':1, 'batch.size':batch_size, 'buffer.memory':buffer_memory}
|
||||||
|
)
|
||||||
|
perf.run()
|
||||||
|
msg_size_perf[msg_size] = perf
|
||||||
|
|
||||||
|
# CONSUMER TESTS
|
||||||
|
|
||||||
|
# All consumer tests use the messages from the first benchmark, so
|
||||||
|
# they'll get messages of the default message size
|
||||||
|
self.logger.info("BENCHMARK: Single consumer")
|
||||||
|
single_consumer = ConsumerPerformanceService(
|
||||||
|
self.service_context(1), self.kafka,
|
||||||
|
topic="test-rep-three", num_records=msgs_default, throughput=-1, threads=1
|
||||||
|
)
|
||||||
|
single_consumer.run()
|
||||||
|
|
||||||
|
self.logger.info("BENCHMARK: Three consumers")
|
||||||
|
three_consumers = ConsumerPerformanceService(
|
||||||
|
self.service_context(3), self.kafka,
|
||||||
|
topic="test-rep-three", num_records=msgs_default, throughput=-1, threads=1
|
||||||
|
)
|
||||||
|
three_consumers.run()
|
||||||
|
|
||||||
|
# PRODUCER + CONSUMER TEST
|
||||||
|
self.logger.info("BENCHMARK: Producer + Consumer")
|
||||||
|
pc_producer = ProducerPerformanceService(
|
||||||
|
self.service_context(1), self.kafka,
|
||||||
|
topic="test-rep-three", num_records=msgs_default, record_size=msg_size_default, throughput=-1,
|
||||||
|
settings={'acks':1, 'batch.size':batch_size, 'buffer.memory':buffer_memory}
|
||||||
|
)
|
||||||
|
pc_consumer = ConsumerPerformanceService(
|
||||||
|
self.service_context(1), self.kafka,
|
||||||
|
topic="test-rep-three", num_records=msgs_default, throughput=-1, threads=1
|
||||||
|
)
|
||||||
|
Service.run_parallel(pc_producer, pc_consumer)
|
||||||
|
|
||||||
|
# END TO END LATENCY TEST
|
||||||
|
self.logger.info("BENCHMARK: End to end latency")
|
||||||
|
e2e_latency = EndToEndLatencyService(
|
||||||
|
self.service_context(1), self.kafka,
|
||||||
|
topic="test-rep-three", num_records=10000
|
||||||
|
)
|
||||||
|
e2e_latency.run()
|
||||||
|
|
||||||
|
|
||||||
|
# LONG TERM THROUGHPUT TEST
|
||||||
|
|
||||||
|
# Because of how much space this ends up using, we clear out the
|
||||||
|
# existing cluster to start from a clean slate. This also keeps us from
|
||||||
|
# running out of space with limited disk space.
|
||||||
|
self.tearDown()
|
||||||
|
self.setUp()
|
||||||
|
self.logger.info("BENCHMARK: Long production")
|
||||||
|
throughput_perf = ProducerPerformanceService(
|
||||||
|
self.service_context(1), self.kafka,
|
||||||
|
topic="test-rep-three", num_records=msgs_large, record_size=msg_size_default, throughput=-1,
|
||||||
|
settings={'acks':1, 'batch.size':batch_size, 'buffer.memory':buffer_memory},
|
||||||
|
intermediate_stats=True
|
||||||
|
)
|
||||||
|
throughput_perf.run()
|
||||||
|
|
||||||
|
# Summarize, extracting just the key info. With multiple
|
||||||
|
# producers/consumers, we display the aggregate value
|
||||||
|
def throughput(perf):
|
||||||
|
aggregate_rate = sum([r['records_per_sec'] for r in perf.results])
|
||||||
|
aggregate_mbps = sum([r['mbps'] for r in perf.results])
|
||||||
|
return "%f rec/sec (%f MB/s)" % (aggregate_rate, aggregate_mbps)
|
||||||
|
self.logger.info("=================")
|
||||||
|
self.logger.info("BENCHMARK RESULTS")
|
||||||
|
self.logger.info("=================")
|
||||||
|
self.logger.info("Single producer, no replication: %s", throughput(single_no_rep))
|
||||||
|
self.logger.info("Single producer, async 3x replication: %s", throughput(single_rep_async))
|
||||||
|
self.logger.info("Single producer, sync 3x replication: %s", throughput(single_rep_sync))
|
||||||
|
self.logger.info("Three producers, async 3x replication: %s", throughput(three_rep_async))
|
||||||
|
self.logger.info("Message size:")
|
||||||
|
for msg_size in msg_sizes:
|
||||||
|
self.logger.info(" %d: %s", msg_size, throughput(msg_size_perf[msg_size]))
|
||||||
|
self.logger.info("Throughput over long run, data > memory:")
|
||||||
|
# FIXME we should be generating a graph too
|
||||||
|
# Try to break it into 5 blocks, but fall back to a smaller number if
|
||||||
|
# there aren't even 5 elements
|
||||||
|
block_size = max(len(throughput_perf.stats[0]) / 5, 1)
|
||||||
|
nblocks = len(throughput_perf.stats[0]) / block_size
|
||||||
|
for i in range(nblocks):
|
||||||
|
subset = throughput_perf.stats[0][i*block_size:min((i+1)*block_size,len(throughput_perf.stats[0]))]
|
||||||
|
if len(subset) == 0:
|
||||||
|
self.logger.info(" Time block %d: (empty)", i)
|
||||||
|
else:
|
||||||
|
self.logger.info(" Time block %d: %f rec/sec (%f MB/s)", i,
|
||||||
|
sum([stat['records_per_sec'] for stat in subset])/float(len(subset)),
|
||||||
|
sum([stat['mbps'] for stat in subset])/float(len(subset))
|
||||||
|
)
|
||||||
|
self.logger.info("Single consumer: %s", throughput(single_consumer))
|
||||||
|
self.logger.info("Three consumers: %s", throughput(three_consumers))
|
||||||
|
self.logger.info("Producer + consumer:")
|
||||||
|
self.logger.info(" Producer: %s", throughput(pc_producer))
|
||||||
|
self.logger.info(" Consumer: %s", throughput(pc_producer))
|
||||||
|
self.logger.info("End-to-end latency: median %f ms, 99%% %f ms, 99.9%% %f ms", e2e_latency.results[0]['latency_50th_ms'], e2e_latency.results[0]['latency_99th_ms'], e2e_latency.results[0]['latency_999th_ms'])
|
|
@ -0,0 +1,51 @@
|
||||||
|
# Copyright 2014 Confluent Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from ducktape.tests.test import Test
|
||||||
|
from ducktape.services.service import ServiceContext
|
||||||
|
|
||||||
|
from services.zookeeper_service import ZookeeperService
|
||||||
|
from services.kafka_service import KafkaService
|
||||||
|
|
||||||
|
|
||||||
|
class KafkaTest(Test):
|
||||||
|
"""
|
||||||
|
Helper class that managest setting up a Kafka cluster. Use this if the
|
||||||
|
default settings for Kafka are sufficient for your test; any customization
|
||||||
|
needs to be done manually. Your run() method should call tearDown and
|
||||||
|
setUp. The Zookeeper and Kafka services are available as the fields
|
||||||
|
KafkaTest.zk and KafkaTest.kafka.
|
||||||
|
|
||||||
|
|
||||||
|
"""
|
||||||
|
def __init__(self, test_context, num_zk, num_brokers, topics=None):
|
||||||
|
super(KafkaTest, self).__init__(test_context)
|
||||||
|
self.num_zk = num_zk
|
||||||
|
self.num_brokers = num_brokers
|
||||||
|
self.topics = topics
|
||||||
|
|
||||||
|
def min_cluster_size(self):
|
||||||
|
return self.num_zk + self.num_brokers
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.zk = ZookeeperService(ServiceContext(self.cluster, self.num_zk, self.logger))
|
||||||
|
self.kafka = KafkaService(
|
||||||
|
ServiceContext(self.cluster, self.num_brokers, self.logger),
|
||||||
|
self.zk, topics=self.topics)
|
||||||
|
self.zk.start()
|
||||||
|
self.kafka.start()
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
self.kafka.stop()
|
||||||
|
self.zk.stop()
|
|
@ -41,3 +41,12 @@ chmod a+rw /opt
|
||||||
if [ ! -e /opt/kafka ]; then
|
if [ ! -e /opt/kafka ]; then
|
||||||
ln -s /vagrant /opt/kafka
|
ln -s /vagrant /opt/kafka
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
# For EC2 nodes, we want to use /mnt, which should have the local disk. On local
|
||||||
|
# VMs, we can just create it if it doesn't exist and use it like we'd use
|
||||||
|
# /tmp. Eventually, we'd like to also support more directories, e.g. when EC2
|
||||||
|
# instances have multiple local disks.
|
||||||
|
if [ ! -e /mnt ]; then
|
||||||
|
mkdir /mnt
|
||||||
|
fi
|
||||||
|
chmod a+rwx /mnt
|
||||||
|
|
Loading…
Reference in New Issue