mirror of https://github.com/apache/kafka.git
commit ede6450840
@@ -53,11 +53,6 @@ if File.exists?(local_config_file) then
   eval(File.read(local_config_file), binding, "Vagrantfile.local")
 end
 
-if mode == "test"
-  num_zookeepers = 0
-  num_brokers = 0
-end
-
 # This is a horrible hack to work around bad interactions between
 # vagrant-hostmanager and vagrant-aws/vagrant's implementation. Hostmanager
 # wants to update the /etc/hosts entries, but tries to do so even on nodes that
@@ -136,10 +136,7 @@ public class VerifiableProducer {
         try {
            Namespace res;
            res = parser.parseArgs(args);
-            System.out.println(res);
-            System.out.println(res.getString("brokerList"));
-
 
            int maxMessages = res.getInt("maxMessages");
            String topic = res.getString("topic");
            int throughput = res.getInt("throughput");
@@ -17,13 +17,14 @@
 
 package kafka.tools
 
-import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, KafkaProducer}
+import java.util.{Arrays, Properties}
+
 import kafka.consumer._
-import java.util.Properties
-import java.util.Arrays
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+
+import scala.Option.option2Iterable
 
-object TestEndToEndLatency {
+object EndToEndLatency {
   def main(args: Array[String]) {
     if (args.length != 6) {
       System.err.println("USAGE: java " + getClass().getName + " broker_list zookeeper_connect topic num_messages consumer_fetch_max_wait producer_acks")
tests/README.md
@@ -2,43 +2,146 @@ System Integration & Performance Testing
 ========================================
 
 This directory contains Kafka system integration and performance tests.
-[Ducktape](https://github.com/confluentinc/ducktape) is used to run the tests.
-(ducktape is a distributed testing framework which provides test runner,
-result reporter and utilities to pull up and tear down services.)
+[ducktape](https://github.com/confluentinc/ducktape) is used to run the tests.
+
+Ducktape is a distributed testing framework which provides a test runner,
+a result reporter and utilities to pull up and tear down services. It automatically
+discovers tests from a directory and generates an HTML report for each run.
+
+Local Quickstart
+----------------
+This quickstart will help you run the Kafka system tests on your local machine.
+
-To run the tests:
+* Install Virtual Box from [https://www.virtualbox.org/](https://www.virtualbox.org/) (run `$ vboxmanage --version` to check if it's installed).
+* Install Vagrant >= 1.6.4 from [http://www.vagrantup.com/](http://www.vagrantup.com/) (run `vagrant --version` to check if it's installed).
+* Install Vagrant Plugins:
+
-1. Build a specific branch of Kafka
+        # Required
+        $ vagrant plugin install vagrant-hostmanager
+
+* Build a specific branch of Kafka
 
         $ cd kafka
         $ git checkout $BRANCH
         $ gradle
         $ ./gradlew jar
 
-2. Setup a testing cluster. You can use Vagrant to create a cluster of local
-   VMs or on EC2. Configure your Vagrant setup by creating the file
-   `Vagrantfile.local` in the directory of your Kafka checkout. At a minimum,
-   you *MUST* set `mode = "test"` and the value of `num_workers` high enough for
-   the test you're trying to run. If you run on AWS, you also need to set
-   enable_dns = true.
-
-3. Bring up the cluster, making sure you have enough workers. For Vagrant,
-   use `vagrant up`. If you want to run on AWS, use `vagrant up
-   --provider=aws --no-parallel`.
-4. Install ducktape:
+* Setup a testing cluster with Vagrant. Configure your Vagrant setup by creating the file
+  `Vagrantfile.local` in the directory of your Kafka checkout. For testing purposes,
+  `num_brokers` and `num_kafka` should be 0, and `num_workers` should be set high enough
+  to run all of your tests. An example resides in kafka/vagrant/system-test-Vagrantfile.local
+
+        # Example Vagrantfile.local for use on local machine
+        # Vagrantfile.local should reside in the base Kafka directory
+        num_zookeepers = 0
+        num_kafka = 0
+        num_workers = 9
+
+* Bring up the cluster (note that the initial provisioning process can be slow since it involves
+  installing dependencies and updates on every vm):
+
+        $ vagrant up
+
+* Install ducktape:
 
-        $ git clone https://github.com/confluentinc/ducktape
-        $ cd ducktape
         $ pip install ducktape
 
-5. Run the system tests using ducktape, you can view results in the `results`
-   directory.
+* Run the system tests using ducktape:
 
         $ cd tests
-        $ ducktape tests
-6. To iterate/run again if you made any changes:
+        $ ducktape kafkatest/tests
+
+* If you make changes to your Kafka checkout, you'll need to rebuild and resync to your Vagrant cluster:
 
         $ cd kafka
         $ ./gradlew jar
+        $ vagrant rsync # Re-syncs build output to cluster
 
+EC2 Quickstart
+--------------
+This quickstart will help you run the Kafka system tests using Amazon EC2. As a convention, we'll use "kafkatest" in most names, but you can use whatever you want.
+
+There are a lot of steps here, but the basic goals are to create one distinguished EC2 instance that
+will be our "test driver", and to set up the security groups and IAM role so that the test driver
+can create, destroy, and run ssh commands on any number of "workers".
+
+Preparation
+-----------
+In these steps, we will create an IAM role which has permission to create and destroy EC2 instances,
+set up a keypair used for ssh access to the test driver and worker machines, and create a security group to allow the test driver and workers to all communicate via TCP.
+
+* [Create an IAM role](http://docs.aws.amazon.com/IAM/latest/UserGuide/Using_SettingUpUser.html#Using_CreateUser_console). We'll give this role the ability to launch or kill additional EC2 machines.
+ - Create role "kafkatest-master"
+ - Role type: Amazon EC2
+ - Attach policy: AmazonEC2FullAccess (this will allow our test-driver to create and destroy EC2 instances)
+
+* If you haven't already, [set up a keypair to use for SSH access](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html). For the purpose
+of this quickstart, let's say the keypair name is kafkatest, and you've saved the private key in kafkatest.pem
+
+* Next, create a security group called "kafkatest".
+ - After creating the group, add inbound rules: allow SSH on port 22 from anywhere; also, allow access on all ports (0-65535) from other machines in the kafkatest group.
+
+Create the Test Driver
+----------------------
+* Launch a new test driver machine
+ - OS: Ubuntu server is recommended
+ - Instance type: t2.medium is more than enough since this machine is just a driver
+ - Instance details: Most defaults are fine.
+ - IAM role -> kafkatest-master
+ - Tagging the instance with a useful name is recommended.
+ - Security group -> 'kafkatest'
+
+* Once the machine is started, upload the SSH key to your test driver:
+
+        $ scp -i /path/to/kafkatest.pem \
+            /path/to/kafkatest.pem ubuntu@public.hostname.amazonaws.com:kafkatest.pem
+
+* Grab the public hostname/IP (available for example by navigating to your EC2 dashboard and viewing running instances) of your test driver and SSH into it:
+
+        $ ssh -i /path/to/kafkatest.pem ubuntu@public.hostname.amazonaws.com
+
+Set Up the Test Driver
+----------------------
+The following steps assume you have ssh'd into the test driver machine.
+
+* Start by making sure you're up to date, and install git and ducktape:
+
+        $ sudo apt-get update && sudo apt-get -y upgrade && sudo apt-get install -y git
+        $ pip install ducktape
+
+* Get Kafka:
+
+        $ git clone https://git-wip-us.apache.org/repos/asf/kafka.git kafka
+
+* Install some dependencies:
+
+        $ cd kafka
+        $ vagrant/aws/aws-init.sh
+        $ . ~/.bashrc
+
+* An example Vagrantfile.local has been created by aws-init.sh which looks something like:
+
+        # Vagrantfile.local
+        ec2_instance_type = "..."  # Pick something appropriate for your
+                                   # test. Note that the default m3.medium has
+                                   # a small disk.
+        num_zookeepers = 0
+        num_kafka = 0
+        num_workers = 9
+        ec2_keypair_name = 'kafkatest'
+        ec2_keypair_file = '/home/ubuntu/kafkatest.pem'
+        ec2_security_groups = ['kafkatest']
+        ec2_region = 'us-west-2'
+        ec2_ami = "ami-29ebb519"
+
+* Start up the instances (note we have found bringing up machines in parallel can cause errors on aws):
+
+        $ vagrant up --provider=aws --no-provision --no-parallel && vagrant provision
+
+* Now you should be able to run tests:
+
+        $ cd kafka/tests
+        $ ducktape kafkatest/tests
+
+* To halt your workers without destroying persistent state, run `vagrant halt`. Run `vagrant destroy -f` to destroy all traces of your workers.
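For orientation, here is a minimal sketch of the kind of test ducktape discovers under `kafkatest/tests`. The class name, topic name, and node counts are illustrative; the `Test` base class and the service start pattern follow the test files added later in this commit:

    from ducktape.tests.test import Test

    from kafkatest.services.zookeeper import ZookeeperService
    from kafkatest.services.kafka import KafkaService


    class ExampleTest(Test):
        """ducktape collects subclasses of Test and runs their test_* methods."""

        def __init__(self, test_context):
            super(ExampleTest, self).__init__(test_context=test_context)
            self.zk = ZookeeperService(test_context, num_nodes=1)
            self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
                                      topics={"example": {"partitions": 1, "replication-factor": 1}})

        def setUp(self):
            self.zk.start()
            self.kafka.start()

        def test_broker_is_running(self):
            # start() would have raised on failure; the pid file confirms a live broker process
            assert len(self.kafka.pids(self.kafka.nodes[0])) > 0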
@@ -1,3 +0,0 @@
export AWS_ACCESS_KEY=`curl -s http://169.254.169.254/latest/meta-data/iam/security-credentials/ducttape-master | grep AccessKeyId | awk -F\" '{ print $4 }'`
export AWS_SECRET_KEY=`curl -s http://169.254.169.254/latest/meta-data/iam/security-credentials/ducttape-master | grep SecretAccessKey | awk -F\" '{ print $4 }'`
export AWS_SESSION_TOKEN=`curl -s http://169.254.169.254/latest/meta-data/iam/security-credentials/ducttape-master | grep Token | awk -F\" '{ print $4 }'`
@@ -1,9 +0,0 @@
ec3_instance_type = "m3.medium"
enable_dns = true
mode = "test"
num_workers = 1
ec2_keypair_name =
ec2_keypair_file =
ec2_security_groups = ['ducttape-insecure']
ec2_region = 'us-west-2'
ec2_ami = "ami-29ebb519"
@@ -0,0 +1,146 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ducktape.services.background_thread import BackgroundThreadService


def is_int(msg):
    """Default method used to check whether text pulled from console consumer is a message.

    return int or None
    """
    try:
        return int(msg)
    except ValueError:
        return None


"""
0.8.2.1 ConsoleConsumer options

The console consumer is a tool that reads data from Kafka and outputs it to standard output.
Option                                  Description
------                                  -----------
--blacklist <blacklist>                 Blacklist of topics to exclude from
                                          consumption.
--consumer.config <config file>         Consumer config properties file.
--csv-reporter-enabled                  If set, the CSV metrics reporter will
                                          be enabled
--delete-consumer-offsets               If specified, the consumer path in
                                          zookeeper is deleted when starting up
--formatter <class>                     The name of a class to use for
                                          formatting kafka messages for
                                          display. (default: kafka.tools.
                                          DefaultMessageFormatter)
--from-beginning                        If the consumer does not already have
                                          an established offset to consume
                                          from, start with the earliest
                                          message present in the log rather
                                          than the latest message.
--max-messages <Integer: num_messages>  The maximum number of messages to
                                          consume before exiting. If not set,
                                          consumption is continual.
--metrics-dir <metrics directory>       If csv-reporter-enabled is set, and
                                          this parameter is set, the csv
                                          metrics will be output here
--property <prop>
--skip-message-on-error                 If there is an error when processing a
                                          message, skip it instead of halting.
--topic <topic>                         The topic id to consume on.
--whitelist <whitelist>                 Whitelist of topics to include for
                                          consumption.
--zookeeper <urls>                      REQUIRED: The connection string for
                                          the zookeeper connection in the form
                                          host:port. Multiple URLS can be
                                          given to allow fail-over.
"""


class ConsoleConsumer(BackgroundThreadService):

    logs = {
        "consumer_log": {
            "path": "/mnt/consumer.log",
            "collect_default": True}
    }

    def __init__(self, context, num_nodes, kafka, topic, message_validator=is_int, from_beginning=True, consumer_timeout_ms=None):
        """
        Args:
            context:              standard context
            num_nodes:            number of nodes to use (this should be 1)
            kafka:                kafka service
            topic:                consume from this topic
            message_validator:    function which returns message or None
            from_beginning:       consume from beginning if True, else from the end
            consumer_timeout_ms:  corresponds to consumer.timeout.ms. The consumer process ends if the time between
                                  successively consumed messages exceeds this timeout. Setting this and
                                  waiting for the consumer to stop is a pretty good way to consume all messages
                                  in a topic.
        """
        super(ConsoleConsumer, self).__init__(context, num_nodes)
        self.kafka = kafka
        self.args = {
            'topic': topic,
        }

        self.consumer_timeout_ms = consumer_timeout_ms

        self.from_beginning = from_beginning
        self.message_validator = message_validator
        self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)}

    @property
    def start_cmd(self):
        args = self.args.copy()
        args.update({'zk_connect': self.kafka.zk.connect_setting()})
        cmd = "/opt/kafka/bin/kafka-console-consumer.sh --topic %(topic)s --zookeeper %(zk_connect)s" \
            " --consumer.config /mnt/console_consumer.properties" % args

        if self.from_beginning:
            cmd += " --from-beginning"

        cmd += " 2>> /mnt/consumer.log | tee -a /mnt/consumer.log &"
        return cmd

    def _worker(self, idx, node):
        # form config file
        if self.consumer_timeout_ms is not None:
            prop_file = self.render('console_consumer.properties', consumer_timeout_ms=self.consumer_timeout_ms)
        else:
            prop_file = self.render('console_consumer.properties')

        self.logger.info("console_consumer.properties:")
        self.logger.info(prop_file)
        node.account.create_file("/mnt/console_consumer.properties", prop_file)

        # Run and capture output
        cmd = self.start_cmd
        self.logger.debug("Console consumer %d command: %s", idx, cmd)
        for line in node.account.ssh_capture(cmd):
            msg = line.strip()
            msg = self.message_validator(msg)
            if msg is not None:
                self.logger.debug("consumed a message: " + str(msg))
                self.messages_consumed[idx].append(msg)

    def start_node(self, node):
        super(ConsoleConsumer, self).start_node(node)

    def stop_node(self, node):
        node.account.kill_process("java", allow_fail=False)

    def clean_node(self, node):
        node.account.ssh("rm -rf /mnt/console_consumer.properties /mnt/consumer.log", allow_fail=False)
@@ -1,10 +1,11 @@
-# Copyright 2014 Confluent Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
 #
-#    http://www.apache.org/licenses/LICENSE-2.0
+#    http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
@@ -13,32 +14,37 @@
 # limitations under the License.
 
 from ducktape.services.service import Service
-import time, re, json
+
+import json
+import re
+import signal
+import time
 
 
 class KafkaService(Service):
-    def __init__(self, service_context, zk, topics=None):
+
+    logs = {
+        "kafka_log": {
+            "path": "/mnt/kafka.log",
+            "collect_default": True},
+        "kafka_data": {
+            "path": "/mnt/kafka-logs",
+            "collect_default": False}
+    }
+
+    def __init__(self, context, num_nodes, zk, topics=None):
         """
-        :type service_context ducktape.services.service.ServiceContext
+        :type context
         :type zk: ZookeeperService
        :type topics: dict
         """
-        super(KafkaService, self).__init__(service_context)
+        super(KafkaService, self).__init__(context, num_nodes)
         self.zk = zk
         self.topics = topics
 
     def start(self):
         super(KafkaService, self).start()
-
-        # Start all nodes in this Kafka service
-        for idx, node in enumerate(self.nodes, 1):
-            self.logger.info("Starting Kafka node %d on %s", idx, node.account.hostname)
-            self._stop_and_clean(node, allow_fail=True)
-            self.start_node(node)
-
-        # wait for start up
-        time.sleep(6)
 
         # Create topics if necessary
         if self.topics is not None:
             for topic, topic_cfg in self.topics.items():
@@ -48,6 +54,47 @@ class KafkaService(Service):
                 topic_cfg["topic"] = topic
                 self.create_topic(topic_cfg)
 
+    def start_node(self, node):
+        props_file = self.render('kafka.properties', node=node, broker_id=self.idx(node))
+        self.logger.info("kafka.properties:")
+        self.logger.info(props_file)
+        node.account.create_file("/mnt/kafka.properties", props_file)
+
+        cmd = "/opt/kafka/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log & echo $! > /mnt/kafka.pid"
+        self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd))
+        node.account.ssh(cmd)
+        time.sleep(5)
+        if len(self.pids(node)) == 0:
+            raise Exception("No process ids recorded on node %s" % str(node))
+
+    def pids(self, node):
+        """Return process ids associated with running processes on the given node."""
+        try:
+            return [pid for pid in node.account.ssh_capture("cat /mnt/kafka.pid", callback=int)]
+        except Exception:
+            return []
+
+    def signal_node(self, node, sig=signal.SIGTERM):
+        pids = self.pids(node)
+        for pid in pids:
+            node.account.signal(pid, sig)
+
+    def signal_leader(self, topic, partition=0, sig=signal.SIGTERM):
+        leader = self.leader(topic, partition)
+        self.signal_node(leader, sig)
+
+    def stop_node(self, node, clean_shutdown=True):
+        pids = self.pids(node)
+        sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
+
+        for pid in pids:
+            node.account.signal(pid, sig, allow_fail=False)
+
+        node.account.ssh("rm -f /mnt/kafka.pid", allow_fail=False)
+
+    def clean_node(self, node):
+        node.account.ssh("rm -rf /mnt/kafka-logs /mnt/kafka.properties /mnt/kafka.log /mnt/kafka.pid", allow_fail=False)
+
     def create_topic(self, topic_cfg):
         node = self.nodes[0]  # any node is fine here
         self.logger.info("Creating topic %s with settings %s", topic_cfg["topic"], topic_cfg)
@@ -143,45 +190,13 @@ class KafkaService(Service):
         self.logger.debug("Verify partition reassignment:")
         self.logger.debug(output)
 
-    def stop(self):
-        """If the service left any running processes or data, clean them up."""
-        super(KafkaService, self).stop()
-
-        for idx, node in enumerate(self.nodes, 1):
-            self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname))
-            self._stop_and_clean(node, allow_fail=True)
-            node.free()
-
-    def _stop_and_clean(self, node, allow_fail=False):
-        node.account.ssh("/opt/kafka/bin/kafka-server-stop.sh", allow_fail=allow_fail)
-        time.sleep(5)  # the stop script doesn't wait
-        node.account.ssh("rm -rf /mnt/kafka-logs /mnt/kafka.properties /mnt/kafka.log")
-
-    def stop_node(self, node, clean_shutdown=True, allow_fail=True):
-        node.account.kill_process("kafka", clean_shutdown, allow_fail)
-
-    def start_node(self, node, config=None):
-        if config is None:
-            template = open('templates/kafka.properties').read()
-            template_params = {
-                'broker_id': self.idx(node),
-                'hostname': node.account.hostname,
-                'zk_connect': self.zk.connect_setting()
-            }
-
-            config = template % template_params
-
-        node.account.create_file("/mnt/kafka.properties", config)
-        cmd = "/opt/kafka/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log &"
-        self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd))
-        node.account.ssh(cmd)
-
     def restart_node(self, node, wait_sec=0, clean_shutdown=True):
-        self.stop_node(node, clean_shutdown, allow_fail=True)
+        """Restart the given node, waiting wait_sec in between stopping and starting up again."""
+        self.stop_node(node, clean_shutdown)
         time.sleep(wait_sec)
         self.start_node(node)
 
-    def get_leader_node(self, topic, partition=0):
+    def leader(self, topic, partition=0):
         """ Get the leader replica for the given topic and partition.
         """
         cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.ZooKeeperMainWrapper -server %s " \
@@ -1,10 +1,11 @@
-# Copyright 2014 Confluent Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
 #
-#    http://www.apache.org/licenses/LICENSE-2.0
+#    http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
@@ -12,48 +13,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ducktape.services.service import Service
-import threading
+from ducktape.services.background_thread import BackgroundThreadService
 
 
-class PerformanceService(Service):
-    def __init__(self, service_context):
-        super(PerformanceService, self).__init__(service_context)
-
-    def start(self):
-        super(PerformanceService, self).start()
-        self.worker_threads = []
-        self.results = [None] * len(self.nodes)
-        self.stats = [[] for x in range(len(self.nodes))]
-        for idx,node in enumerate(self.nodes,1):
-            self.logger.info("Running %s node %d on %s", self.__class__.__name__, idx, node.account.hostname)
-            worker = threading.Thread(
-                name=self.__class__.__name__ + "-worker-" + str(idx),
-                target=self._worker,
-                args=(idx,node)
-            )
-            worker.daemon = True
-            worker.start()
-            self.worker_threads.append(worker)
-
-    def wait(self):
-        super(PerformanceService, self).wait()
-        for idx,worker in enumerate(self.worker_threads,1):
-            self.logger.debug("Waiting for %s worker %d to finish", self.__class__.__name__, idx)
-            worker.join()
-        self.worker_threads = None
-
-    def stop(self):
-        super(PerformanceService, self).stop()
-        assert self.worker_threads is None, "%s.stop should only be called after wait" % self.__class__.__name__
-        for idx,node in enumerate(self.nodes,1):
-            self.logger.debug("Stopping %s node %d on %s", self.__class__.__name__, idx, node.account.hostname)
-            node.free()
+class PerformanceService(BackgroundThreadService):
+    def __init__(self, context, num_nodes):
+        super(PerformanceService, self).__init__(context, num_nodes)
+        self.results = [None] * self.num_nodes
+        self.stats = [[] for x in range(self.num_nodes)]
 
 
 class ProducerPerformanceService(PerformanceService):
-    def __init__(self, service_context, kafka, topic, num_records, record_size, throughput, settings={}, intermediate_stats=False):
-        super(ProducerPerformanceService, self).__init__(service_context)
+    def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, settings={}, intermediate_stats=False):
+        super(ProducerPerformanceService, self).__init__(context, num_nodes)
         self.kafka = kafka
         self.args = {
             'topic': topic,
@@ -73,6 +45,7 @@ class ProducerPerformanceService(PerformanceService):
         for key,value in self.settings.items():
             cmd += " %s=%s" % (str(key), str(value))
+        self.logger.debug("Producer performance %d command: %s", idx, cmd)
 
         def parse_stats(line):
             parts = line.split(',')
             return {
@@ -103,8 +76,8 @@ class ProducerPerformanceService(PerformanceService):
 
 
 class ConsumerPerformanceService(PerformanceService):
-    def __init__(self, service_context, kafka, topic, num_records, throughput, threads=1, settings={}):
-        super(ConsumerPerformanceService, self).__init__(service_context)
+    def __init__(self, context, num_nodes, kafka, topic, num_records, throughput, threads=1, settings={}):
+        super(ConsumerPerformanceService, self).__init__(context, num_nodes)
         self.kafka = kafka
         self.args = {
             'topic': topic,
@@ -128,16 +101,17 @@ class ConsumerPerformanceService(PerformanceService):
             last = line
         # Parse and save the last line's information
         parts = last.split(',')
 
         self.results[idx-1] = {
-            'total_mb': float(parts[3]),
-            'mbps': float(parts[4]),
-            'records_per_sec': float(parts[6]),
+            'total_mb': float(parts[2]),
+            'mbps': float(parts[3]),
+            'records_per_sec': float(parts[5]),
         }
 
 
 class EndToEndLatencyService(PerformanceService):
-    def __init__(self, service_context, kafka, topic, num_records, consumer_fetch_max_wait=100, acks=1):
-        super(EndToEndLatencyService, self).__init__(service_context)
+    def __init__(self, context, num_nodes, kafka, topic, num_records, consumer_fetch_max_wait=100, acks=1):
+        super(EndToEndLatencyService, self).__init__(context, num_nodes)
         self.kafka = kafka
         self.args = {
             'topic': topic,
@@ -152,7 +126,7 @@ class EndToEndLatencyService(PerformanceService):
             'zk_connect': self.kafka.zk.connect_setting(),
             'bootstrap_servers': self.kafka.bootstrap_servers(),
         })
-        cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.TestEndToEndLatency "\
+        cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.EndToEndLatency "\
              "%(bootstrap_servers)s %(zk_connect)s %(topic)s %(num_records)d "\
              "%(consumer_fetch_max_wait)d %(acks)d" % args
         self.logger.debug("End-to-end latency %d command: %s", idx, cmd)
@@ -0,0 +1,3 @@
{% if consumer_timeout_ms is defined %}
consumer.timeout.ms={{ consumer_timeout_ms }}
{% endif %}
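This template is rendered by `ConsoleConsumer._worker` via `self.render(...)`. A standalone sketch of the equivalent rendering, assuming Jinja2 as the template engine (which the `{% %}`/`{{ }}` syntax implies):

    from jinja2 import Template

    template = Template(open("console_consumer.properties").read())
    print(template.render(consumer_timeout_ms=3000))  # -> consumer.timeout.ms=3000
    print(template.render())                          # -> empty: the 'is defined' guard drops the line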
@@ -17,7 +17,7 @@
 ############################# Server Basics #############################
 
 # The id of the broker. This must be set to a unique integer for each broker.
-broker.id=%(broker_id)d
+broker.id={{ broker_id }}
 
 ############################# Socket Server Settings #############################
@@ -30,7 +30,7 @@ port=9092
 # Hostname the broker will advertise to producers and consumers. If not set, it uses the
 # value for "host.name" if configured. Otherwise, it will use the value returned from
 # java.net.InetAddress.getCanonicalHostName().
-advertised.host.name=%(hostname)s
+advertised.host.name={{ node.account.hostname }}
 
 # The port to publish to ZooKeeper for clients to use. If this is not set,
 # it will publish the same port that the broker binds to.
@@ -115,7 +115,7 @@ log.cleaner.enable=false
 # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
 # You can also append an optional chroot string to the urls to specify the
 # root directory for all kafka znodes.
-zookeeper.connect=%(zk_connect)s
+zookeeper.connect={{ zk.connect_setting() }}
 
 # Timeout in ms for connecting to zookeeper
 zookeeper.connection.timeout.ms=2000
@@ -0,0 +1,9 @@
dataDir=/mnt/zookeeper
clientPort=2181
maxClientCnxns=0
initLimit=5
syncLimit=2
quorumListenOnAllIPs=true
{% for node in nodes %}
server.{{ loop.index }}={{ node.account.hostname }}:2888:3888
{% endfor %}
@@ -0,0 +1,109 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ducktape.services.background_thread import BackgroundThreadService

import json


class VerifiableProducer(BackgroundThreadService):

    logs = {
        "producer_log": {
            "path": "/mnt/producer.log",
            "collect_default": True}
    }

    def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput=100000):
        super(VerifiableProducer, self).__init__(context, num_nodes)

        self.kafka = kafka
        self.topic = topic
        self.max_messages = max_messages
        self.throughput = throughput

        self.acked_values = []
        self.not_acked_values = []

    def _worker(self, idx, node):
        cmd = self.start_cmd
        self.logger.debug("VerifiableProducer %d command: %s" % (idx, cmd))

        for line in node.account.ssh_capture(cmd):
            line = line.strip()

            data = self.try_parse_json(line)
            if data is not None:

                self.logger.debug("VerifiableProducer: " + str(data))

                with self.lock:
                    if data["name"] == "producer_send_error":
                        data["node"] = idx
                        self.not_acked_values.append(int(data["value"]))

                    elif data["name"] == "producer_send_success":
                        self.acked_values.append(int(data["value"]))

    @property
    def start_cmd(self):
        cmd = "/opt/kafka/bin/kafka-verifiable-producer.sh" \
              " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers())
        if self.max_messages > 0:
            cmd += " --max-messages %s" % str(self.max_messages)
        if self.throughput > 0:
            cmd += " --throughput %s" % str(self.throughput)

        cmd += " 2>> /mnt/producer.log | tee -a /mnt/producer.log &"
        return cmd

    @property
    def acked(self):
        with self.lock:
            return self.acked_values

    @property
    def not_acked(self):
        with self.lock:
            return self.not_acked_values

    @property
    def num_acked(self):
        with self.lock:
            return len(self.acked_values)

    @property
    def num_not_acked(self):
        with self.lock:
            return len(self.not_acked_values)

    def stop_node(self, node):
        node.account.kill_process("VerifiableProducer", allow_fail=False)
        # block until the corresponding thread exits
        if len(self.worker_threads) >= self.idx(node):
            # Need to guard this because stop is preemptively called before the worker threads are added and started
            self.worker_threads[self.idx(node) - 1].join()

    def clean_node(self, node):
        node.account.ssh("rm -rf /mnt/producer.log", allow_fail=False)

    def try_parse_json(self, string):
        """Try to parse a string as json. Return None if not parseable."""
        try:
            record = json.loads(string)
            return record
        except ValueError:
            self.logger.debug("Could not parse as json: %s" % str(string))
            return None
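A usage sketch for the producer, assuming a ducktape `context` and a running `KafkaService` named `kafka`; with `max_messages` set, the background worker exits once the producer process finishes:

    producer = VerifiableProducer(context, num_nodes=1, kafka=kafka, topic="test_topic",
                                  max_messages=1000, throughput=100)
    producer.start()
    producer.wait()
    print("acked: %d, errored: %d" % (producer.num_acked, producer.num_not_acked))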
@@ -0,0 +1,64 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from ducktape.services.service import Service

import time


class ZookeeperService(Service):

    logs = {
        "zk_log": {
            "path": "/mnt/zk.log",
            "collect_default": True}
    }

    def __init__(self, context, num_nodes):
        """
        :type context
        """
        super(ZookeeperService, self).__init__(context, num_nodes)

    def start_node(self, node):
        idx = self.idx(node)
        self.logger.info("Starting ZK node %d on %s", idx, node.account.hostname)

        node.account.ssh("mkdir -p /mnt/zookeeper")
        node.account.ssh("echo %d > /mnt/zookeeper/myid" % idx)

        config_file = self.render('zookeeper.properties')
        self.logger.info("zookeeper.properties:")
        self.logger.info(config_file)
        node.account.create_file("/mnt/zookeeper.properties", config_file)

        node.account.ssh(
            "/opt/kafka/bin/zookeeper-server-start.sh /mnt/zookeeper.properties 1>> %(path)s 2>> %(path)s &"
            % self.logs["zk_log"])

        time.sleep(5)  # give it some time to start

    def stop_node(self, node):
        idx = self.idx(node)
        self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname))
        node.account.kill_process("zookeeper", allow_fail=False)

    def clean_node(self, node):
        self.logger.info("Cleaning ZK node %d on %s", self.idx(node), node.account.hostname)
        node.account.ssh("rm -rf /mnt/zookeeper /mnt/zookeeper.properties /mnt/zk.log", allow_fail=False)

    def connect_setting(self):
        return ','.join([node.account.hostname + ':2181' for node in self.nodes])
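Usage sketch, assuming a ducktape `context` (hostnames are whatever the cluster assigns):

    zk = ZookeeperService(context, num_nodes=3)
    zk.start()                   # one ZooKeeper server per node, each with its own myid
    print(zk.connect_setting())  # e.g. "worker1:2181,worker2:2181,worker3:2181"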
@@ -0,0 +1,274 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ducktape.services.service import Service

from kafkatest.tests.kafka_test import KafkaTest
from kafkatest.services.performance import ProducerPerformanceService, ConsumerPerformanceService, EndToEndLatencyService


class Benchmark(KafkaTest):
    '''A benchmark of Kafka producer/consumer performance. This replicates the test
    run here:
    https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
    '''
    def __init__(self, test_context):
        super(Benchmark, self).__init__(test_context, num_zk=1, num_brokers=3, topics={
            'test-rep-one' : { 'partitions': 6, 'replication-factor': 1 },
            'test-rep-three' : { 'partitions': 6, 'replication-factor': 3 }
        })

        if True:
            # Works on both aws and local
            self.msgs = 1000000
            self.msgs_default = 1000000
        else:
            # Can use locally on Vagrant VMs, but may use too much memory for aws
            self.msgs = 50000000
            self.msgs_default = 50000000

        self.msgs_large = 10000000
        self.msg_size_default = 100
        self.batch_size = 8*1024
        self.buffer_memory = 64*1024*1024
        self.msg_sizes = [10, 100, 1000, 10000, 100000]
        self.target_data_size = 128*1024*1024
        self.target_data_size_gb = self.target_data_size/float(1024*1024*1024)

    def test_single_producer_no_replication(self):
        self.logger.info("BENCHMARK: Single producer, no replication")
        self.perf = ProducerPerformanceService(
            self.test_context, 1, self.kafka,
            topic="test-rep-one", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
            settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
        )
        self.perf.run()
        data = compute_throughput(self.perf)
        self.logger.info("Single producer, no replication: %s", str(data))
        return data

    def test_single_producer_replication(self):
        self.logger.info("BENCHMARK: Single producer, async 3x replication")
        self.perf = ProducerPerformanceService(
            self.test_context, 1, self.kafka,
            topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
            settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
        )
        self.perf.run()
        data = compute_throughput(self.perf)
        self.logger.info("Single producer, async 3x replication: %s" % str(data))
        return data

    def test_single_producer_sync(self):
        self.logger.info("BENCHMARK: Single producer, sync 3x replication")
        self.perf = ProducerPerformanceService(
            self.test_context, 1, self.kafka,
            topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
            settings={'acks':-1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
        )
        self.perf.run()

        data = compute_throughput(self.perf)
        self.logger.info("Single producer, sync 3x replication: %s" % data)
        return data

    def test_three_producers_async(self):
        self.logger.info("BENCHMARK: Three producers, async 3x replication")
        self.perf = ProducerPerformanceService(
            self.test_context, 3, self.kafka,
            topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
            settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
        )
        self.perf.run()

        data = compute_throughput(self.perf)
        self.logger.info("Three producers, async 3x replication: %s" % data)
        return data

    def test_multiple_message_size(self):
        # TODO this would be a great place to use parametrization
        self.perfs = {}
        for msg_size in self.msg_sizes:
            self.logger.info("BENCHMARK: Message size %d (%f GB total, single producer, async 3x replication)", msg_size, self.target_data_size_gb)
            # Always generate the same total amount of data
            nrecords = int(self.target_data_size / msg_size)
            self.perfs["perf-" + str(msg_size)] = ProducerPerformanceService(
                self.test_context, 1, self.kafka,
                topic="test-rep-three", num_records=nrecords, record_size=msg_size, throughput=-1,
                settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
            )

        self.msg_size_perf = {}
        for msg_size in self.msg_sizes:
            perf = self.perfs["perf-" + str(msg_size)]
            perf.run()
            self.msg_size_perf[msg_size] = perf

        summary = ["Message size:"]
        data = {}
        for msg_size in self.msg_sizes:
            datum = compute_throughput(self.msg_size_perf[msg_size])
            summary.append(" %d: %s" % (msg_size, datum))
            data[msg_size] = datum
        self.logger.info("\n".join(summary))
        return data

    def test_long_term_throughput(self):
        self.logger.info("BENCHMARK: Long production")
        self.perf = ProducerPerformanceService(
            self.test_context, 1, self.kafka,
            topic="test-rep-three", num_records=self.msgs_large, record_size=self.msg_size_default, throughput=-1,
            settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory},
            intermediate_stats=True
        )
        self.perf.run()

        summary = ["Throughput over long run, data > memory:"]
        data = {}
        # FIXME we should be generating a graph too
        # Try to break it into 5 blocks, but fall back to a smaller number if
        # there aren't even 5 elements
        block_size = max(len(self.perf.stats[0]) / 5, 1)
        nblocks = len(self.perf.stats[0]) / block_size
        for i in range(nblocks):
            subset = self.perf.stats[0][i*block_size:min((i+1)*block_size, len(self.perf.stats[0]))]
            if len(subset) == 0:
                summary.append(" Time block %d: (empty)" % i)
                data[i] = None
            else:
                records_per_sec = sum([stat['records_per_sec'] for stat in subset])/float(len(subset))
                mb_per_sec = sum([stat['mbps'] for stat in subset])/float(len(subset))

                summary.append(" Time block %d: %f rec/sec (%f MB/s)" % (i, records_per_sec, mb_per_sec))
                data[i] = throughput(records_per_sec, mb_per_sec)

        self.logger.info("\n".join(summary))
        return data

    def test_end_to_end_latency(self):
        self.logger.info("BENCHMARK: End to end latency")
        self.perf = EndToEndLatencyService(
            self.test_context, 1, self.kafka,
            topic="test-rep-three", num_records=10000
        )
        self.perf.run()

        data = latency(self.perf.results[0]['latency_50th_ms'], self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms'])
        self.logger.info("End-to-end latency: %s" % str(data))
        return data

    def test_producer_and_consumer(self):
        self.logger.info("BENCHMARK: Producer + Consumer")
        self.producer = ProducerPerformanceService(
            self.test_context, 1, self.kafka,
            topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
            settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
        )

        self.consumer = ConsumerPerformanceService(
            self.test_context, 1, self.kafka,
            topic="test-rep-three", num_records=self.msgs_default, throughput=-1, threads=1
        )

        Service.run_parallel(self.producer, self.consumer)

        data = {
            "producer": compute_throughput(self.producer),
            "consumer": compute_throughput(self.consumer)
        }
        summary = [
            "Producer + consumer:",
            str(data)]
        self.logger.info("\n".join(summary))
        return data

    def test_single_consumer(self):
        topic = "test-rep-three"

        self.producer = ProducerPerformanceService(
            self.test_context, 1, self.kafka,
            topic=topic, num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
            settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
        )
        self.producer.run()

        # All consumer tests use the messages from the first benchmark, so
        # they'll get messages of the default message size
        self.logger.info("BENCHMARK: Single consumer")
        self.perf = ConsumerPerformanceService(
            self.test_context, 1, self.kafka,
            topic=topic, num_records=self.msgs_default, throughput=-1, threads=1
        )
        self.perf.run()

        data = compute_throughput(self.perf)
        self.logger.info("Single consumer: %s" % data)
        return data

    def test_three_consumers(self):
        topic = "test-rep-three"

        self.producer = ProducerPerformanceService(
            self.test_context, 1, self.kafka,
            topic=topic, num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
            settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
        )
        self.producer.run()

        self.logger.info("BENCHMARK: Three consumers")
        self.perf = ConsumerPerformanceService(
            self.test_context, 3, self.kafka,
            topic="test-rep-three", num_records=self.msgs_default, throughput=-1, threads=1
        )
        self.perf.run()

        data = compute_throughput(self.perf)
        self.logger.info("Three consumers: %s", data)
        return data


def throughput(records_per_sec, mb_per_sec):
    """Helper method to ensure uniform representation of throughput data"""
    return {
        "records_per_sec": records_per_sec,
        "mb_per_sec": mb_per_sec
    }


def latency(latency_50th_ms, latency_99th_ms, latency_999th_ms):
    """Helper method to ensure uniform representation of latency data"""
    return {
        "latency_50th_ms": latency_50th_ms,
        "latency_99th_ms": latency_99th_ms,
        "latency_999th_ms": latency_999th_ms
    }


def compute_throughput(perf):
    """Helper method for computing throughput after running a performance service."""
    aggregate_rate = sum([r['records_per_sec'] for r in perf.results])
    aggregate_mbps = sum([r['mbps'] for r in perf.results])

    return throughput(aggregate_rate, aggregate_mbps)
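A quick standalone check of the aggregation helper at the bottom of benchmark.py; the fake results object stands in for a finished performance service:

    class FakePerf(object):
        # Stand-in for a finished PerformanceService run (two nodes' results)
        results = [{'records_per_sec': 400000.0, 'mbps': 38.0},
                   {'records_per_sec': 410000.0, 'mbps': 39.0}]

    print(compute_throughput(FakePerf()))
    # {'records_per_sec': 810000.0, 'mb_per_sec': 77.0} (key order may vary)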
@@ -1,10 +1,11 @@
-# Copyright 2014 Confluent Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
 #
-#    http://www.apache.org/licenses/LICENSE-2.0
+#    http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
@@ -13,21 +14,19 @@
 # limitations under the License.
 
 from ducktape.tests.test import Test
-from ducktape.services.service import ServiceContext
 
-from services.zookeeper_service import ZookeeperService
-from services.kafka_service import KafkaService
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
 
 
 class KafkaTest(Test):
     """
-    Helper class that managest setting up a Kafka cluster. Use this if the
+    Helper class that manages setting up a Kafka cluster. Use this if the
     default settings for Kafka are sufficient for your test; any customization
     needs to be done manually. Your run() method should call tearDown and
     setUp. The Zookeeper and Kafka services are available as the fields
     KafkaTest.zk and KafkaTest.kafka.
     """
     def __init__(self, test_context, num_zk, num_brokers, topics=None):
         super(KafkaTest, self).__init__(test_context)

@@ -35,17 +34,12 @@ class KafkaTest(Test):
         self.num_brokers = num_brokers
         self.topics = topics
 
-    def min_cluster_size(self):
-        return self.num_zk + self.num_brokers
+        self.zk = ZookeeperService(test_context, self.num_zk)
+
+        self.kafka = KafkaService(
+            test_context, self.num_brokers,
+            self.zk, topics=self.topics)
 
     def setUp(self):
-        self.zk = ZookeeperService(ServiceContext(self.cluster, self.num_zk, self.logger))
-        self.kafka = KafkaService(
-            ServiceContext(self.cluster, self.num_brokers, self.logger),
-            self.zk, topics=self.topics)
         self.zk.start()
         self.kafka.start()
-
-    def tearDown(self):
-        self.kafka.stop()
-        self.zk.stop()
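A sketch of a test built on the reworked KafkaTest; Benchmark above follows exactly this shape (the class and topic names here are illustrative):

    class ExampleKafkaTest(KafkaTest):
        def __init__(self, test_context):
            super(ExampleKafkaTest, self).__init__(test_context, num_zk=1, num_brokers=3,
                                                   topics={'my-topic': {'partitions': 6, 'replication-factor': 3}})

        def test_smoke(self):
            # setUp has already started self.zk and self.kafka and created 'my-topic'
            self.logger.info("Cluster is up with %d brokers", self.num_brokers)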
@@ -0,0 +1,160 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ducktape.tests.test import Test
from ducktape.utils.util import wait_until

from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.console_consumer import ConsoleConsumer

import signal
import time


class ReplicationTest(Test):
    """Replication tests.
    These tests verify that replication provides simple durability guarantees by checking that data acked by
    brokers is still available for consumption in the face of various failure scenarios."""

    def __init__(self, test_context):
        """:type test_context: ducktape.tests.test.TestContext"""
        super(ReplicationTest, self).__init__(test_context=test_context)

        self.topic = "test_topic"
        self.zk = ZookeeperService(test_context, num_nodes=1)
        self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
            "partitions": 3,
            "replication-factor": 3,
            "min.insync.replicas": 2}
        })
        self.producer_throughput = 10000
        self.num_producers = 1
        self.num_consumers = 1

    def setUp(self):
        self.zk.start()
        self.kafka.start()

    def min_cluster_size(self):
        """Override this since we're adding services outside of the constructor"""
        return super(ReplicationTest, self).min_cluster_size() + self.num_producers + self.num_consumers

    def run_with_failure(self, failure):
        """This is the top-level test template.

        The steps are:
            Produce messages in the background while driving some failure condition
            When done driving failures, immediately stop producing
            Consume all messages
            Validate that messages acked by brokers were consumed

        Note that consuming is a bit tricky, at least with console consumer. The goal is to consume all messages
        (foreach partition) in the topic. In this case, waiting for the last message may cause the consumer to stop
        too soon since console consumer is consuming multiple partitions from a single thread and therefore we lose
        ordering guarantees.

        Waiting on a count of consumed messages can be unreliable: if we stop consuming when num_consumed == num_acked,
        we might exit early if some messages are duplicated (though not an issue here since producer retries==0).

        Therefore we rely here on the consumer.timeout.ms setting, which times out on the interval between successively
        consumed messages. Since we run the producer to completion before running the consumer, this is a reliable
        indicator that nothing is left to consume.
        """
        self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
        self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=3000)

        # Produce in a background thread while driving broker failures
        self.producer.start()
        if not wait_until(lambda: self.producer.num_acked > 5, timeout_sec=5):
            raise RuntimeError("Producer failed to start in a reasonable amount of time.")
        failure()
        self.producer.stop()

        self.acked = self.producer.acked
        self.not_acked = self.producer.not_acked
        self.logger.info("num not acked: %d" % self.producer.num_not_acked)
        self.logger.info("num acked: %d" % self.producer.num_acked)

        # Consume all messages
        self.consumer.start()
        self.consumer.wait()
        self.consumed = self.consumer.messages_consumed[1]
        self.logger.info("num consumed: %d" % len(self.consumed))

        # Check produced vs consumed
        self.validate()

    def clean_shutdown(self):
        """Discover the leader node for our topic and shut it down cleanly."""
        self.kafka.signal_leader(self.topic, partition=0, sig=signal.SIGTERM)

    def hard_shutdown(self):
        """Discover the leader node for our topic and shut it down with a hard kill."""
        self.kafka.signal_leader(self.topic, partition=0, sig=signal.SIGKILL)

    def clean_bounce(self):
        """Chase the leader of one partition and restart it cleanly."""
        for i in range(5):
            prev_leader_node = self.kafka.leader(topic=self.topic, partition=0)
            self.kafka.restart_node(prev_leader_node, wait_sec=5, clean_shutdown=True)

    def hard_bounce(self):
        """Chase the leader of one partition and restart it with a hard kill."""
        for i in range(5):
            prev_leader_node = self.kafka.leader(topic=self.topic, partition=0)
            self.kafka.restart_node(prev_leader_node, wait_sec=5, clean_shutdown=False)

            # Wait long enough for the previous leader to probably be awake again
            time.sleep(6)

    def validate(self):
        """Check that produced messages were consumed."""

        success = True
        msg = ""

        if len(set(self.consumed)) != len(self.consumed):
            # There are duplicates. This is ok, so report it but don't fail the test
            msg += "There are duplicate messages in the log\n"

        if not set(self.consumed).issuperset(set(self.acked)):
            # Every acked message must appear in the logs, i.e. consumed messages must be a superset of acked messages.
            acked_minus_consumed = set(self.producer.acked) - set(self.consumed)
            success = False
            msg += "At least one acked message did not appear in the consumed messages. acked_minus_consumed: " + str(acked_minus_consumed)

        if not success:
            # Collect all the data logs if there was a failure
            self.mark_for_collect(self.kafka)

        assert success, msg

    def test_clean_shutdown(self):
        self.run_with_failure(self.clean_shutdown)

    def test_hard_shutdown(self):
        self.run_with_failure(self.hard_shutdown)

    def test_clean_bounce(self):
        self.run_with_failure(self.clean_bounce)

    def test_hard_bounce(self):
        self.run_with_failure(self.hard_bounce)
|
||||
|
||||
|
||||
|
|
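
The consume-to-quiescence idea in run_with_failure generalizes beyond ducktape. A minimal sketch, assuming a hypothetical non-blocking `poll_one()` that returns a message or `None` (not part of the test code above):

    import time

    def consume_until_idle(poll_one, idle_timeout_s=3.0):
        """Drain a topic by consuming until no message arrives for idle_timeout_s.

        Mirrors the consumer.timeout.ms trick above: once the producer has stopped,
        a long enough quiet period is a reliable sign every partition is drained.
        """
        consumed = []
        last_seen = time.time()
        while time.time() - last_seen < idle_timeout_s:
            record = poll_one()  # hypothetical helper: returns a message or None immediately
            if record is not None:
                consumed.append(record)
                last_seen = time.time()
            else:
                time.sleep(0.1)  # avoid spinning while the consumer is idle
        return consumed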
@ -1,75 +0,0 @@
# Copyright 2014 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ducktape.services.service import Service

import time


class ZookeeperService(Service):
    def __init__(self, service_context):
        """
        :type service_context: ducktape.services.service.ServiceContext
        """
        super(ZookeeperService, self).__init__(service_context)
        self.logs = {"zk_log": "/mnt/zk.log"}

    def start(self):
        super(ZookeeperService, self).start()
        config = """
dataDir=/mnt/zookeeper
clientPort=2181
maxClientCnxns=0
initLimit=5
syncLimit=2
quorumListenOnAllIPs=true
"""
        for idx, node in enumerate(self.nodes, 1):
            template_params = {'idx': idx, 'host': node.account.hostname}
            config += "server.%(idx)d=%(host)s:2888:3888\n" % template_params

        for idx, node in enumerate(self.nodes, 1):
            self.logger.info("Starting ZK node %d on %s", idx, node.account.hostname)
            self._stop_and_clean(node, allow_fail=True)
            node.account.ssh("mkdir -p /mnt/zookeeper")
            node.account.ssh("echo %d > /mnt/zookeeper/myid" % idx)
            node.account.create_file("/mnt/zookeeper.properties", config)
            node.account.ssh(
                "/opt/kafka/bin/zookeeper-server-start.sh /mnt/zookeeper.properties 1>> %(zk_log)s 2>> %(zk_log)s &"
                % self.logs)
            time.sleep(5)  # give it some time to start

    def stop_node(self, node, allow_fail=True):
        idx = self.idx(node)
        self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname))
        node.account.ssh("ps ax | grep -i 'zookeeper' | grep -v grep | awk '{print $1}' | xargs kill -SIGTERM",
                         allow_fail=allow_fail)

    def clean_node(self, node, allow_fail=True):
        node.account.ssh("rm -rf /mnt/zookeeper /mnt/zookeeper.properties /mnt/zk.log", allow_fail=allow_fail)

    def stop(self):
        """Stop every node, and clean up any processes or data the service left behind."""
        super(ZookeeperService, self).stop()

        for idx, node in enumerate(self.nodes, 1):
            self.stop_node(node, allow_fail=False)
            self.clean_node(node)
            node.free()

    def _stop_and_clean(self, node, allow_fail=False):
        self.stop_node(node, allow_fail)
        self.clean_node(node, allow_fail)

    def connect_setting(self):
        return ','.join([node.account.hostname + ':2181' for node in self.nodes])
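
For reference, the ensemble wiring that this deleted service performed boils down to rendering one server.N line per node plus a matching myid file. A standalone sketch, with a hypothetical host list rather than the service's node objects:

    def render_zk_config(hostnames, data_dir="/mnt/zookeeper", client_port=2181):
        """Render a zookeeper.properties for an ensemble of the given hosts."""
        lines = ["dataDir=%s" % data_dir,
                 "clientPort=%d" % client_port,
                 "maxClientCnxns=0",
                 "initLimit=5",
                 "syncLimit=2",
                 "quorumListenOnAllIPs=true"]
        # Each server.N entry must agree with the "N" written to that node's myid file
        for idx, host in enumerate(hostnames, 1):
            lines.append("server.%d=%s:2888:3888" % (idx, host))
        return "\n".join(lines) + "\n"

    # e.g. render_zk_config(["zk1", "zk2", "zk3"]); node 2 would then write "2" to /mnt/zookeeper/myid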
@ -0,0 +1,11 @@
from setuptools import find_packages, setup

setup(name="kafkatest",
      version="0.8.3-SNAPSHOT",
      description="Apache Kafka System Tests",
      author="Apache Kafka",
      platforms=["any"],
      license="apache2.0",
      packages=find_packages(),
      requires=["ducktape(>=0.2.0)"]
      )
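
With this file in place the test package installs in the usual setuptools ways, e.g. `python setup.py install` or `pip install -e .` from the `tests/` directory. Note that `requires` is informational metadata only; setuptools does not install ducktape from it (that would take `install_requires`).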
@ -1,193 +0,0 @@
# Copyright 2014 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ducktape.services.service import Service

from tests.test import KafkaTest
from services.performance import ProducerPerformanceService, ConsumerPerformanceService, \
    EndToEndLatencyService


class KafkaBenchmark(KafkaTest):
    '''A benchmark of Kafka producer/consumer performance. This replicates the test
    run here:
    https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
    '''
    def __init__(self, test_context):
        super(KafkaBenchmark, self).__init__(test_context, num_zk=1, num_brokers=3, topics={
            'test-rep-one': {'partitions': 6, 'replication-factor': 1},
            'test-rep-three': {'partitions': 6, 'replication-factor': 3}
        })

    def run(self):
        msgs_default = 50000000
        msgs_large = 100000000
        msg_size_default = 100
        batch_size = 8*1024
        buffer_memory = 64*1024*1024
        msg_sizes = [10, 100, 1000, 10000, 100000]
        target_data_size = 1024*1024*1024
        target_data_size_gb = target_data_size/float(1024*1024*1024)
        # These smaller settings will work in the default local Vagrant VMs, useful for testing
        if False:
            msgs_default = 1000000
            msgs_large = 10000000
            msg_size_default = 100
            batch_size = 8*1024
            buffer_memory = 64*1024*1024
            msg_sizes = [10, 100, 1000, 10000, 100000]
            target_data_size = 128*1024*1024
            target_data_size_gb = target_data_size/float(1024*1024*1024)

        # PRODUCER TESTS

        self.logger.info("BENCHMARK: Single producer, no replication")
        single_no_rep = ProducerPerformanceService(
            self.service_context(1), self.kafka,
            topic="test-rep-one", num_records=msgs_default, record_size=msg_size_default, throughput=-1,
            settings={'acks': 1, 'batch.size': batch_size, 'buffer.memory': buffer_memory}
        )
        single_no_rep.run()

        self.logger.info("BENCHMARK: Single producer, async 3x replication")
        single_rep_async = ProducerPerformanceService(
            self.service_context(1), self.kafka,
            topic="test-rep-three", num_records=msgs_default, record_size=msg_size_default, throughput=-1,
            settings={'acks': 1, 'batch.size': batch_size, 'buffer.memory': buffer_memory}
        )
        single_rep_async.run()

        self.logger.info("BENCHMARK: Single producer, sync 3x replication")
        single_rep_sync = ProducerPerformanceService(
            self.service_context(1), self.kafka,
            topic="test-rep-three", num_records=msgs_default, record_size=msg_size_default, throughput=-1,
            settings={'acks': -1, 'batch.size': batch_size, 'buffer.memory': buffer_memory}
        )
        single_rep_sync.run()

        self.logger.info("BENCHMARK: Three producers, async 3x replication")
        three_rep_async = ProducerPerformanceService(
            self.service_context(3), self.kafka,
            topic="test-rep-three", num_records=msgs_default, record_size=msg_size_default, throughput=-1,
            settings={'acks': 1, 'batch.size': batch_size, 'buffer.memory': buffer_memory}
        )
        three_rep_async.run()

        msg_size_perf = {}
        for msg_size in msg_sizes:
            self.logger.info("BENCHMARK: Message size %d (%f GB total, single producer, async 3x replication)", msg_size, target_data_size_gb)
            # Always generate the same total amount of data
            nrecords = int(target_data_size / msg_size)
            perf = ProducerPerformanceService(
                self.service_context(1), self.kafka,
                topic="test-rep-three", num_records=nrecords, record_size=msg_size, throughput=-1,
                settings={'acks': 1, 'batch.size': batch_size, 'buffer.memory': buffer_memory}
            )
            perf.run()
            msg_size_perf[msg_size] = perf

        # CONSUMER TESTS

        # All consumer tests use the messages from the first benchmark, so
        # they'll get messages of the default message size
        self.logger.info("BENCHMARK: Single consumer")
        single_consumer = ConsumerPerformanceService(
            self.service_context(1), self.kafka,
            topic="test-rep-three", num_records=msgs_default, throughput=-1, threads=1
        )
        single_consumer.run()

        self.logger.info("BENCHMARK: Three consumers")
        three_consumers = ConsumerPerformanceService(
            self.service_context(3), self.kafka,
            topic="test-rep-three", num_records=msgs_default, throughput=-1, threads=1
        )
        three_consumers.run()

        # PRODUCER + CONSUMER TEST
        self.logger.info("BENCHMARK: Producer + Consumer")
        pc_producer = ProducerPerformanceService(
            self.service_context(1), self.kafka,
            topic="test-rep-three", num_records=msgs_default, record_size=msg_size_default, throughput=-1,
            settings={'acks': 1, 'batch.size': batch_size, 'buffer.memory': buffer_memory}
        )
        pc_consumer = ConsumerPerformanceService(
            self.service_context(1), self.kafka,
            topic="test-rep-three", num_records=msgs_default, throughput=-1, threads=1
        )
        Service.run_parallel(pc_producer, pc_consumer)

        # END TO END LATENCY TEST
        self.logger.info("BENCHMARK: End to end latency")
        e2e_latency = EndToEndLatencyService(
            self.service_context(1), self.kafka,
            topic="test-rep-three", num_records=10000
        )
        e2e_latency.run()

        # LONG TERM THROUGHPUT TEST

        # Because of how much space this ends up using, we clear out the
        # existing cluster to start from a clean slate. This also keeps us from
        # running out of space with limited disk space.
        self.tearDown()
        self.setUp()
        self.logger.info("BENCHMARK: Long production")
        throughput_perf = ProducerPerformanceService(
            self.service_context(1), self.kafka,
            topic="test-rep-three", num_records=msgs_large, record_size=msg_size_default, throughput=-1,
            settings={'acks': 1, 'batch.size': batch_size, 'buffer.memory': buffer_memory},
            intermediate_stats=True
        )
        throughput_perf.run()

        # Summarize, extracting just the key info. With multiple
        # producers/consumers, we display the aggregate value
        def throughput(perf):
            aggregate_rate = sum([r['records_per_sec'] for r in perf.results])
            aggregate_mbps = sum([r['mbps'] for r in perf.results])
            return "%f rec/sec (%f MB/s)" % (aggregate_rate, aggregate_mbps)

        self.logger.info("=================")
        self.logger.info("BENCHMARK RESULTS")
        self.logger.info("=================")
        self.logger.info("Single producer, no replication: %s", throughput(single_no_rep))
        self.logger.info("Single producer, async 3x replication: %s", throughput(single_rep_async))
        self.logger.info("Single producer, sync 3x replication: %s", throughput(single_rep_sync))
        self.logger.info("Three producers, async 3x replication: %s", throughput(three_rep_async))
        self.logger.info("Message size:")
        for msg_size in msg_sizes:
            self.logger.info(" %d: %s", msg_size, throughput(msg_size_perf[msg_size]))
        self.logger.info("Throughput over long run, data > memory:")
        # FIXME we should be generating a graph too
        # Try to break it into 5 blocks, but fall back to a smaller number if
        # there aren't even 5 elements
        block_size = max(len(throughput_perf.stats[0]) / 5, 1)
        nblocks = len(throughput_perf.stats[0]) / block_size
        for i in range(nblocks):
            subset = throughput_perf.stats[0][i*block_size:min((i+1)*block_size, len(throughput_perf.stats[0]))]
            if len(subset) == 0:
                self.logger.info(" Time block %d: (empty)", i)
            else:
                self.logger.info(" Time block %d: %f rec/sec (%f MB/s)", i,
                                 sum([stat['records_per_sec'] for stat in subset])/float(len(subset)),
                                 sum([stat['mbps'] for stat in subset])/float(len(subset)))
        self.logger.info("Single consumer: %s", throughput(single_consumer))
        self.logger.info("Three consumers: %s", throughput(three_consumers))
        self.logger.info("Producer + consumer:")
        self.logger.info(" Producer: %s", throughput(pc_producer))
        self.logger.info(" Consumer: %s", throughput(pc_consumer))
        self.logger.info("End-to-end latency: median %f ms, 99%% %f ms, 99.9%% %f ms",
                         e2e_latency.results[0]['latency_50th_ms'],
                         e2e_latency.results[0]['latency_99th_ms'],
                         e2e_latency.results[0]['latency_999th_ms'])
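
The message-size sweep in this deleted benchmark holds total bytes constant rather than record count, so small-message runs produce many more records. A quick sanity check of what that yields, using the same constants as the code above:

    target_data_size = 1024 * 1024 * 1024  # 1 GB, as in the benchmark
    msg_sizes = [10, 100, 1000, 10000, 100000]

    for msg_size in msg_sizes:
        nrecords = int(target_data_size / msg_size)
        print("%6d-byte records -> %9d records (%.2f GB)"
              % (msg_size, nrecords, nrecords * msg_size / float(1024 ** 3)))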
@ -0,0 +1,13 @@
if [ -z "$AWS_IAM" ]; then
    echo "Warning: AWS_IAM is not set"
fi

export AWS_ACCESS_KEY=`curl -s http://169.254.169.254/latest/meta-data/iam/security-credentials/$AWS_IAM | grep AccessKeyId | awk -F\" '{ print $4 }'`
export AWS_SECRET_KEY=`curl -s http://169.254.169.254/latest/meta-data/iam/security-credentials/$AWS_IAM | grep SecretAccessKey | awk -F\" '{ print $4 }'`
export AWS_SESSION_TOKEN=`curl -s http://169.254.169.254/latest/meta-data/iam/security-credentials/$AWS_IAM | grep Token | awk -F\" '{ print $4 }'`

if [ -z "$AWS_ACCESS_KEY" ]; then
    echo "Failed to populate environment variables AWS_ACCESS_KEY, AWS_SECRET_KEY, and AWS_SESSION_TOKEN."
    echo "AWS_IAM is currently $AWS_IAM. Double-check that this is correct. If not set, add this command to your .bashrc file:"
    echo "export AWS_IAM=<my_aws_iam> # put this into your ~/.bashrc"
fi
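
The grep/awk pipeline above depends on the exact line formatting of the instance-metadata response; parsing it as JSON is sturdier. A sketch of the same lookup (Python 2 to match the era of these scripts; unlike the shell exports, environment changes made here only affect this process and its children):

    import json
    import os
    import urllib2

    iam_role = os.environ.get("AWS_IAM")
    if not iam_role:
        raise SystemExit("AWS_IAM is not set")

    # Same instance-metadata endpoint the shell script queries
    url = "http://169.254.169.254/latest/meta-data/iam/security-credentials/" + iam_role
    creds = json.load(urllib2.urlopen(url))

    os.environ["AWS_ACCESS_KEY"] = creds["AccessKeyId"]
    os.environ["AWS_SECRET_KEY"] = creds["SecretAccessKey"]
    os.environ["AWS_SESSION_TOKEN"] = creds["Token"]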
@ -0,0 +1,12 @@
# Use this template Vagrantfile.local for running system tests on aws
# To use it, move it to the base kafka directory and rename
# it to Vagrantfile.local, and adjust variables as needed.
ec2_instance_type = "m3.medium"
num_zookeepers = 0
num_brokers = 0
num_workers = 9
ec2_keypair_name = 'kafkatest'
ec2_keypair_file = '../kafkatest.pem'
ec2_security_groups = ['kafkatest']
ec2_region = 'us-west-2'
ec2_ami = "ami-29ebb519"
@ -1,13 +1,13 @@
#!/bin/bash

# This script should be run once on your aws test driver machine before
# attempting to run any ducktape tests
# This script can be used to set up a driver machine on aws from which you will run tests
# or bring up your mini Kafka cluster.

# Install dependencies
sudo apt-get install -y maven openjdk-6-jdk build-essential \
            ruby-dev zlib1g-dev realpath python-setuptools

base_dir=`dirname $0`/..
base_dir=`dirname $0`/../..

if [ -z `which vagrant` ]; then
    echo "Installing vagrant..."
@ -24,16 +24,16 @@ if [ -z `which javac` ]; then
    apt-get -y update

    # Try to share cache. See Vagrantfile for details
    mkdir -p /var/cache/oracle-jdk7-installer
    if [ -e "/tmp/oracle-jdk7-installer-cache/" ]; then
        find /tmp/oracle-jdk7-installer-cache/ -not -empty -exec cp '{}' /var/cache/oracle-jdk7-installer/ \;
    mkdir -p /var/cache/oracle-jdk6-installer
    if [ -e "/tmp/oracle-jdk6-installer-cache/" ]; then
        find /tmp/oracle-jdk6-installer-cache/ -not -empty -exec cp '{}' /var/cache/oracle-jdk6-installer/ \;
    fi

    /bin/echo debconf shared/accepted-oracle-license-v1-1 select true | /usr/bin/debconf-set-selections
    apt-get -y install oracle-java7-installer oracle-java7-set-default
    apt-get -y install oracle-java6-installer oracle-java6-set-default

    if [ -e "/tmp/oracle-jdk7-installer-cache/" ]; then
        cp -R /var/cache/oracle-jdk7-installer/* /tmp/oracle-jdk7-installer-cache
    if [ -e "/tmp/oracle-jdk6-installer-cache/" ]; then
        cp -R /var/cache/oracle-jdk6-installer/* /tmp/oracle-jdk6-installer-cache
    fi
fi
@ -0,0 +1,6 @@
# Use this example Vagrantfile.local for running system tests
# To use it, move it to the base kafka directory and rename
# it to Vagrantfile.local
num_zookeepers = 0
num_brokers = 0
num_workers = 9