mirror of https://github.com/apache/kafka.git
KAFKA-2276; KIP-25 initial patch
Initial patch for KIP-25 Note that to install ducktape, do *not* use pip to install ducktape. Instead: ``` $ git clone gitgithub.com:confluentinc/ducktape.git $ cd ducktape $ python setup.py install ``` Author: Geoff Anderson <geoff@confluent.io> Author: Geoff <granders@gmail.com> Author: Liquan Pei <liquanpei@gmail.com> Reviewers: Ewen, Gwen, Jun, Guozhang Closes #70 from granders/KAFKA-2276 and squashes the following commits:a62fb6c
[Geoff Anderson] fixed checkstyle errorsa70f0f8
[Geoff Anderson] Merged in upstream trunk.8b62019
[Geoff Anderson] Merged in upstream trunk.47b7b64
[Geoff Anderson] Created separate tools jar so that the clients package does not pull in dependencies on the Jackson JSON tools or argparse4j.a9e6a14
[Geoff Anderson] Merged in upstream changesd18db7b
[Geoff Anderson] fixed :rat errors (needed to add licenses)321fdf8
[Geoff Anderson] Ignore tests/ and vagrant/ directories when running rat build task795fc75
[Geoff Anderson] Merged in changes from upstream trunk.1d93f06
[Geoff Anderson] Updated provisioning to use java 7 in light of KAFKA-23162ea4e29
[Geoff Anderson] Tweaked README, changed default log collection behavior on VerifiableProducer0eb6fdc
[Geoff Anderson] Merged in system-tests69dd7be
[Geoff Anderson] Merged in trunk4034dd6
[Geoff Anderson] Merged in upstream trunkede6450
[Geoff] Merge pull request #4 from confluentinc/move_muckrake7751545
[Geoff Anderson] Corrected license headerse6d532f
[Geoff Anderson] java 7 -> java 68c61e2d
[Geoff Anderson] Reverted jdk back to 6f14c507
[Geoff Anderson] Removed mode = "test" from Vagrantfile and Vagrantfile.local examples. Updated testing README to clarify aws setup.98b7253
[Geoff Anderson] Updated consumer tests to pre-populate kafka logse6a41f1
[Geoff Anderson] removed stray printlnb15b24f
[Geoff Anderson] leftover KafkaBenchmark in super call0f75187
[Geoff Anderson] Rmoved stray allow_fail. kafka_benchmark_test -> benchmark_testf469f84
[Geoff Anderson] Tweaked readme, added example Vagrantfile.local3d73857
[Geoff Anderson] Merged downstream changes42dcdb1
[Geoff Anderson] Tweaked behavior of stop_node, clean_node to generally fail fast7f7c3e0
[Geoff Anderson] Updated setup.py for kafkatestc60125c
[Geoff Anderson] TestEndToEndLatency -> EndToEndLatency4f476fe
[Geoff Anderson] Moved aws scripts to vagrant directory5af88fc
[Geoff Anderson] Updated README to include aws quickstarte5edf03
[Geoff Anderson] Updated example aws Vagrantfile.local96533c3
[Geoff] Update aws-access-keys-commands25a413d
[Geoff] Update aws-example-Vagrantfile.local884b20e
[Geoff Anderson] Moved a bunch of files to kafkatest directoryfc7c81c
[Geoff Anderson] added setup.py632be12
[Geoff] Merge pull request #3 from confluentinc/verbose-client51a94fd
[Geoff Anderson] Use argparse4j instead of joptsimple. ThroughputThrottler now has more intuitive behavior when targetThroughput is 0.a80a428
[Geoff Anderson] Added shell program for VerifiableProducer.d586fb0
[Geoff Anderson] Updated comments to reflect that throttler is not message-specific6842ed1
[Geoff Anderson] left out a file from last commit1228eef
[Geoff Anderson] Renamed throttler9100417
[Geoff Anderson] Updated command-line options for VerifiableProducer. Extracted throughput logic to make it reusable.0a5de8e
[Geoff Anderson] Fixed checkstyle errors. Changed name to VerifiableProducer. Added synchronization for thread safety on println statements.475423b
[Geoff Anderson] Convert class to string before adding to json object.bc009f2
[Geoff Anderson] Got rid of VerboseProducer in core (moved to clients)c0526fe
[Geoff Anderson] Updates per review comments.8b4b1f2
[Geoff Anderson] Minor updates to VerboseProducer2777712
[Geoff Anderson] Added some metadata to producer output.da94b8c
[Geoff Anderson] Added number of messages option.07cd1c6
[Geoff Anderson] Added simple producer which prints status of produced messages to stdout.a278988
[Geoff Anderson] fixed typosf1914c3
[Liquan Pei] Merge pull request #2 from confluentinc/system_tests81e4156
[Liquan Pei] Bootstrap Kafka system tests
This commit is contained in:
parent
f4101ab3fc
commit
e43c9aff92
|
@ -29,3 +29,8 @@ config/server-*
|
|||
config/zookeeper-*
|
||||
core/data/*
|
||||
gradle/wrapper/*
|
||||
|
||||
results
|
||||
tests/results
|
||||
.ducktape
|
||||
tests/.ducktape
|
||||
|
|
|
@ -31,6 +31,7 @@ ram_megabytes = 1280
|
|||
# EC2
|
||||
ec2_access_key = ENV['AWS_ACCESS_KEY']
|
||||
ec2_secret_key = ENV['AWS_SECRET_KEY']
|
||||
ec2_session_token = ENV['AWS_SESSION_TOKEN']
|
||||
ec2_keypair_name = nil
|
||||
ec2_keypair_file = nil
|
||||
|
||||
|
@ -50,6 +51,24 @@ if File.exists?(local_config_file) then
|
|||
eval(File.read(local_config_file), binding, "Vagrantfile.local")
|
||||
end
|
||||
|
||||
# This is a horrible hack to work around bad interactions between
|
||||
# vagrant-hostmanager and vagrant-aws/vagrant's implementation. Hostmanager
|
||||
# wants to update the /etc/hosts entries, but tries to do so even on nodes that
|
||||
# aren't up (e.g. even when all nodes are stopped and you run vagrant
|
||||
# destroy). Because of the way the underlying code in vagrant works, it still
|
||||
# tries to communicate with the node and has to wait for a very long
|
||||
# timeout. This modifies the update to check for hosts that are not created or
|
||||
# stopped, skipping the update in that case since it's impossible to update
|
||||
# nodes in that state.
|
||||
Object.const_get("VagrantPlugins").const_get("HostManager").const_get("HostsFile").class_eval do
|
||||
alias_method :old_update_guest, :update_guest
|
||||
def update_guest(machine)
|
||||
state_id = machine.state.id
|
||||
return if state_id == :not_created || state_id == :stopped
|
||||
old_update_guest(machine)
|
||||
end
|
||||
end
|
||||
|
||||
# TODO(ksweeney): RAM requirements are not empirical and can probably be significantly lowered.
|
||||
Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
|
||||
config.hostmanager.enabled = true
|
||||
|
@ -85,13 +104,31 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
|
|||
override.vm.box = "dummy"
|
||||
override.vm.box_url = "https://github.com/mitchellh/vagrant-aws/raw/master/dummy.box"
|
||||
|
||||
override.hostmanager.ignore_private_ip = true
|
||||
cached_addresses = {}
|
||||
# Use a custom resolver that SSH's into the machine and finds the IP address
|
||||
# directly. This lets us get at the private IP address directly, avoiding
|
||||
# some issues with using the default IP resolver, which uses the public IP
|
||||
# address.
|
||||
override.hostmanager.ip_resolver = proc do |vm, resolving_vm|
|
||||
if !cached_addresses.has_key?(vm.name)
|
||||
state_id = vm.state.id
|
||||
if state_id != :not_created && state_id != :stopped && vm.communicate.ready?
|
||||
vm.communicate.execute("/sbin/ifconfig eth0 | grep 'inet addr' | tail -n 1 | egrep -o '[0-9\.]+' | head -n 1 2>&1") do |type, contents|
|
||||
cached_addresses[vm.name] = contents.split("\n").first[/(\d+\.\d+\.\d+\.\d+)/, 1]
|
||||
end
|
||||
else
|
||||
cached_addresses[vm.name] = nil
|
||||
end
|
||||
end
|
||||
cached_addresses[vm.name]
|
||||
end
|
||||
|
||||
override.ssh.username = ec2_user
|
||||
override.ssh.private_key_path = ec2_keypair_file
|
||||
|
||||
aws.access_key_id = ec2_access_key
|
||||
aws.secret_access_key = ec2_secret_key
|
||||
aws.session_token = ec2_session_token
|
||||
aws.keypair_name = ec2_keypair_name
|
||||
|
||||
aws.region = ec2_region
|
||||
|
|
|
@ -65,6 +65,16 @@ do
|
|||
CLASSPATH=$CLASSPATH:$file
|
||||
done
|
||||
|
||||
for file in $base_dir/tools/build/libs/kafka-tools*.jar;
|
||||
do
|
||||
CLASSPATH=$CLASSPATH:$file
|
||||
done
|
||||
|
||||
for file in $base_dir/tools/build/dependant-libs-${SCALA_VERSION}*/*.jar;
|
||||
do
|
||||
CLASSPATH=$CLASSPATH:$file
|
||||
done
|
||||
|
||||
# classpath addition for release
|
||||
for file in $base_dir/libs/*.jar;
|
||||
do
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
#!/bin/bash
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
|
||||
export KAFKA_HEAP_OPTS="-Xmx512M"
|
||||
fi
|
||||
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.clients.tools.VerifiableProducer $@
|
60
build.gradle
60
build.gradle
|
@ -204,20 +204,20 @@ for ( sv in ['2_10_5', '2_11_7'] ) {
|
|||
}
|
||||
}
|
||||
|
||||
tasks.create(name: "jarAll", dependsOn: ['jar_core_2_10_5', 'jar_core_2_11_7', 'clients:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar', 'log4j-appender:jar']) {
|
||||
tasks.create(name: "jarAll", dependsOn: ['jar_core_2_10_5', 'jar_core_2_11_7', 'clients:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar', 'log4j-appender:jar', 'tools:jar']) {
|
||||
}
|
||||
|
||||
tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_10_5', 'srcJar_2_11_7', 'clients:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar', 'log4j-appender:srcJar']) { }
|
||||
tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_10_5', 'srcJar_2_11_7', 'clients:srcJar', 'examples:srcJar', 'contrib:hadoop-consumer:srcJar', 'contrib:hadoop-producer:srcJar', 'log4j-appender:srcJar', 'tools:srcJar']) { }
|
||||
|
||||
tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_10_5', 'docsJar_2_11_7', 'clients:docsJar', 'examples:docsJar', 'contrib:hadoop-consumer:docsJar', 'contrib:hadoop-producer:docsJar', 'log4j-appender:docsJar']) { }
|
||||
tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_10_5', 'docsJar_2_11_7', 'clients:docsJar', 'examples:docsJar', 'contrib:hadoop-consumer:docsJar', 'contrib:hadoop-producer:docsJar', 'log4j-appender:docsJar', 'tools:docsJar']) { }
|
||||
|
||||
tasks.create(name: "testAll", dependsOn: ['test_core_2_10_5', 'test_core_2_11_7', 'clients:test', 'log4j-appender:test']) {
|
||||
tasks.create(name: "testAll", dependsOn: ['test_core_2_10_5', 'test_core_2_11_7', 'clients:test', 'log4j-appender:test', 'tools:test']) {
|
||||
}
|
||||
|
||||
tasks.create(name: "releaseTarGzAll", dependsOn: ['releaseTarGz_2_10_5', 'releaseTarGz_2_11_7']) {
|
||||
}
|
||||
|
||||
tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_7', 'clients:uploadArchives', 'examples:uploadArchives', 'contrib:hadoop-consumer:uploadArchives', 'contrib:hadoop-producer:uploadArchives', 'log4j-appender:uploadArchives']) {
|
||||
tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_7', 'clients:uploadArchives', 'examples:uploadArchives', 'contrib:hadoop-consumer:uploadArchives', 'contrib:hadoop-producer:uploadArchives', 'log4j-appender:uploadArchives', 'tools:uploadArchives']) {
|
||||
}
|
||||
|
||||
project(':core') {
|
||||
|
@ -413,6 +413,56 @@ project(':clients') {
|
|||
test.dependsOn('checkstyleMain', 'checkstyleTest')
|
||||
}
|
||||
|
||||
project(':tools') {
|
||||
apply plugin: 'checkstyle'
|
||||
archivesBaseName = "kafka-tools"
|
||||
|
||||
dependencies {
|
||||
compile project(':clients')
|
||||
compile 'net.sourceforge.argparse4j:argparse4j:0.5.0'
|
||||
compile 'com.fasterxml.jackson.core:jackson-databind:2.5.4'
|
||||
compile "$slf4jlog4j"
|
||||
|
||||
testCompile 'junit:junit:4.6'
|
||||
testCompile project(path: ':clients', configuration: 'archives')
|
||||
}
|
||||
|
||||
task testJar(type: Jar) {
|
||||
classifier = 'test'
|
||||
from sourceSets.test.output
|
||||
}
|
||||
|
||||
test {
|
||||
testLogging {
|
||||
events "passed", "skipped", "failed"
|
||||
exceptionFormat = 'full'
|
||||
}
|
||||
}
|
||||
|
||||
javadoc {
|
||||
include "**/org/apache/kafka/tools/*"
|
||||
}
|
||||
|
||||
tasks.create(name: "copyDependantLibs", type: Copy) {
|
||||
from (configurations.testRuntime) {
|
||||
include('slf4j-log4j12*')
|
||||
}
|
||||
from (configurations.runtime) {
|
||||
exclude('kafka-clients*')
|
||||
}
|
||||
into "$buildDir/dependant-libs-${scalaVersion}"
|
||||
}
|
||||
|
||||
jar {
|
||||
dependsOn 'copyDependantLibs'
|
||||
}
|
||||
|
||||
checkstyle {
|
||||
configFile = new File(rootDir, "checkstyle/checkstyle.xml")
|
||||
}
|
||||
test.dependsOn('checkstyleMain', 'checkstyleTest')
|
||||
}
|
||||
|
||||
project(':log4j-appender') {
|
||||
apply plugin: 'checkstyle'
|
||||
archivesBaseName = "kafka-log4j-appender"
|
||||
|
|
|
@ -92,6 +92,8 @@
|
|||
<subpackage name="tools">
|
||||
<allow pkg="org.apache.kafka.clients.producer" />
|
||||
<allow pkg="org.apache.kafka.clients.consumer" />
|
||||
<allow pkg="com.fasterxml.jackson" />
|
||||
<allow pkg="net.sourceforge.argparse4j" />
|
||||
</subpackage>
|
||||
</subpackage>
|
||||
|
||||
|
|
|
@ -17,13 +17,14 @@
|
|||
|
||||
package kafka.tools
|
||||
|
||||
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, KafkaProducer}
|
||||
import java.util.{Arrays, Properties}
|
||||
|
||||
import kafka.consumer._
|
||||
import java.util.Properties
|
||||
import java.util.Arrays
|
||||
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
|
||||
|
||||
import scala.Option.option2Iterable
|
||||
|
||||
object TestEndToEndLatency {
|
||||
object EndToEndLatency {
|
||||
def main(args: Array[String]) {
|
||||
if (args.length != 6) {
|
||||
System.err.println("USAGE: java " + getClass().getName + " broker_list zookeeper_connect topic num_messages consumer_fetch_max_wait producer_acks")
|
|
@ -115,9 +115,9 @@ object ProducerPerformance extends Logging {
|
|||
.defaultsTo(0)
|
||||
val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
|
||||
val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
|
||||
"set, the csv metrics will be outputed here")
|
||||
"set, the csv metrics will be output here")
|
||||
.withRequiredArg
|
||||
.describedAs("metrics dictory")
|
||||
.describedAs("metrics directory")
|
||||
.ofType(classOf[java.lang.String])
|
||||
val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.")
|
||||
|
||||
|
|
|
@ -14,4 +14,5 @@
|
|||
// limitations under the License.
|
||||
|
||||
apply from: file('scala.gradle')
|
||||
include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients', 'log4j-appender'
|
||||
include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients', 'tools', 'log4j-appender'
|
||||
|
||||
|
|
|
@ -0,0 +1,11 @@
|
|||
Vagrantfile.local
|
||||
|
||||
.idea/
|
||||
|
||||
*.pyc
|
||||
*.ipynb
|
||||
|
||||
.DS_Store
|
||||
|
||||
.ducktape
|
||||
results/
|
|
@ -0,0 +1,150 @@
|
|||
System Integration & Performance Testing
|
||||
========================================
|
||||
|
||||
This directory contains Kafka system integration and performance tests.
|
||||
[ducktape](https://github.com/confluentinc/ducktape) is used to run the tests.
|
||||
(ducktape is a distributed testing framework which provides test runner,
|
||||
result reporter and utilities to pull up and tear down services.)
|
||||
|
||||
Local Quickstart
|
||||
----------------
|
||||
This quickstart will help you run the Kafka system tests on your local machine.
|
||||
|
||||
* Install Virtual Box from [https://www.virtualbox.org/](https://www.virtualbox.org/) (run `$ vboxmanage --version` to check if it's installed).
|
||||
* Install Vagrant >= 1.6.4 from [http://www.vagrantup.com/](http://www.vagrantup.com/) (run `vagrant --version` to check if it's installed).
|
||||
* Install Vagrant Plugins:
|
||||
|
||||
# Required
|
||||
$ vagrant plugin install vagrant-hostmanager vagrant-cachier
|
||||
|
||||
* Build a specific branch of Kafka
|
||||
|
||||
$ cd kafka
|
||||
$ git checkout $BRANCH
|
||||
$ gradle
|
||||
$ ./gradlew jar
|
||||
|
||||
* Setup a testing cluster with Vagrant. Configure your Vagrant setup by creating the file
|
||||
`Vagrantfile.local` in the directory of your Kafka checkout. For testing purposes,
|
||||
`num_brokers` and `num_kafka` should be 0, and `num_workers` should be set high enough
|
||||
to run all of you tests. An example resides in kafka/vagrant/system-test-Vagrantfile.local
|
||||
|
||||
# Example Vagrantfile.local for use on local machine
|
||||
# Vagrantfile.local should reside in the base Kafka directory
|
||||
num_zookeepers = 0
|
||||
num_kafka = 0
|
||||
num_workers = 9
|
||||
|
||||
* Bring up the cluster (note that the initial provisioning process can be slow since it involves
|
||||
installing dependencies and updates on every vm.):
|
||||
|
||||
$ vagrant up
|
||||
|
||||
* Install ducktape:
|
||||
|
||||
$ pip install ducktape
|
||||
|
||||
* Run the system tests using ducktape:
|
||||
|
||||
$ cd tests
|
||||
$ ducktape kafkatest/tests
|
||||
|
||||
* If you make changes to your Kafka checkout, you'll need to rebuild and resync to your Vagrant cluster:
|
||||
|
||||
$ cd kafka
|
||||
$ ./gradlew jar
|
||||
$ vagrant rsync # Re-syncs build output to cluster
|
||||
|
||||
EC2 Quickstart
|
||||
--------------
|
||||
This quickstart will help you run the Kafka system tests on EC2. In this setup, all logic is run
|
||||
on EC2 and none on your local machine.
|
||||
|
||||
There are a lot of steps here, but the basic goals are to create one distinguished EC2 instance that
|
||||
will be our "test driver", and to set up the security groups and iam role so that the test driver
|
||||
can create, destroy, and run ssh commands on any number of "workers".
|
||||
|
||||
As a convention, we'll use "kafkatest" in most names, but you can use whatever name you want.
|
||||
|
||||
Preparation
|
||||
-----------
|
||||
In these steps, we will create an IAM role which has permission to create and destroy EC2 instances,
|
||||
set up a keypair used for ssh access to the test driver and worker machines, and create a security group to allow the test driver and workers to all communicate via TCP.
|
||||
|
||||
* [Create an IAM role](http://docs.aws.amazon.com/IAM/latest/UserGuide/Using_SettingUpUser.html#Using_CreateUser_console). We'll give this role the ability to launch or kill additional EC2 machines.
|
||||
- Create role "kafkatest-master"
|
||||
- Role type: Amazon EC2
|
||||
- Attach policy: AmazonEC2FullAccess (this will allow our test-driver to create and destroy EC2 instances)
|
||||
|
||||
* If you haven't already, [set up a keypair to use for SSH access](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html). For the purpose
|
||||
of this quickstart, let's say the keypair name is kafkatest, and you've saved the private key in kafktest.pem
|
||||
|
||||
* Next, create a security group called "kafkatest".
|
||||
- After creating the group, inbound rules: allow SSH on port 22 from anywhere; also, allow access on all ports (0-65535) from other machines in the kafkatest group.
|
||||
|
||||
Create the Test Driver
|
||||
----------------------
|
||||
* Launch a new test driver machine
|
||||
- OS: Ubuntu server is recommended
|
||||
- Instance type: t2.medium is easily enough since this machine is just a driver
|
||||
- Instance details: Most defaults are fine.
|
||||
- IAM role -> kafkatest-master
|
||||
- Tagging the instance with a useful name is recommended.
|
||||
- Security group -> 'kafkatest'
|
||||
|
||||
|
||||
* Once the machine is started, upload the SSH key to your test driver:
|
||||
|
||||
$ scp -i /path/to/kafkatest.pem \
|
||||
/path/to/kafkatest.pem ubuntu@public.hostname.amazonaws.com:kafkatest.pem
|
||||
|
||||
* Grab the public hostname/IP (available for example by navigating to your EC2 dashboard and viewing running instances) of your test driver and SSH into it:
|
||||
|
||||
$ ssh -i /path/to/kafkatest.pem ubuntu@public.hostname.amazonaws.com
|
||||
|
||||
Set Up the Test Driver
|
||||
----------------------
|
||||
The following steps assume you have ssh'd into
|
||||
the test driver machine.
|
||||
|
||||
* Start by making sure you're up to date, and install git and ducktape:
|
||||
|
||||
$ sudo apt-get update && sudo apt-get -y upgrade && sudo apt-get install -y git
|
||||
$ pip install ducktape
|
||||
|
||||
* Get Kafka:
|
||||
|
||||
$ git clone https://git-wip-us.apache.org/repos/asf/kafka.git kafka
|
||||
|
||||
* Install some dependencies:
|
||||
|
||||
$ cd kafka
|
||||
$ kafka/vagrant/aws/aws-init.sh
|
||||
$ . ~/.bashrc
|
||||
|
||||
* An example Vagrantfile.local has been created by aws-init.sh which looks something like:
|
||||
|
||||
# Vagrantfile.local
|
||||
ec2_instance_type = "..." # Pick something appropriate for your
|
||||
# test. Note that the default m3.medium has
|
||||
# a small disk.
|
||||
num_zookeepers = 0
|
||||
num_kafka = 0
|
||||
num_workers = 9
|
||||
ec2_keypair_name = 'kafkatest'
|
||||
ec2_keypair_file = '/home/ubuntu/kafkatest.pem'
|
||||
ec2_security_groups = ['kafkatest']
|
||||
ec2_region = 'us-west-2'
|
||||
ec2_ami = "ami-29ebb519"
|
||||
|
||||
* Start up the instances (note we have found bringing up machines in parallel can cause errors on aws):
|
||||
|
||||
$ vagrant up --provider=aws --no-provision --no-parallel && vagrant provision
|
||||
|
||||
* Now you should be able to run tests:
|
||||
|
||||
$ cd kafka/tests
|
||||
$ ducktape kafkatest/tests
|
||||
|
||||
* To halt your workers without destroying persistent state, run `vagrant halt`. Run `vagrant destroy -f` to destroy all traces of your workers.
|
||||
|
|
@ -0,0 +1,16 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
# see kafka.server.KafkaConfig for additional details and defaults
|
||||
|
|
@ -0,0 +1,15 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
# see kafka.server.KafkaConfig for additional details and defaults
|
|
@ -0,0 +1,146 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from ducktape.services.background_thread import BackgroundThreadService
|
||||
|
||||
|
||||
def is_int(msg):
|
||||
"""Default method used to check whether text pulled from console consumer is a message.
|
||||
|
||||
return int or None
|
||||
"""
|
||||
try:
|
||||
return int(msg)
|
||||
except:
|
||||
return None
|
||||
|
||||
|
||||
"""
|
||||
0.8.2.1 ConsoleConsumer options
|
||||
|
||||
The console consumer is a tool that reads data from Kafka and outputs it to standard output.
|
||||
Option Description
|
||||
------ -----------
|
||||
--blacklist <blacklist> Blacklist of topics to exclude from
|
||||
consumption.
|
||||
--consumer.config <config file> Consumer config properties file.
|
||||
--csv-reporter-enabled If set, the CSV metrics reporter will
|
||||
be enabled
|
||||
--delete-consumer-offsets If specified, the consumer path in
|
||||
zookeeper is deleted when starting up
|
||||
--formatter <class> The name of a class to use for
|
||||
formatting kafka messages for
|
||||
display. (default: kafka.tools.
|
||||
DefaultMessageFormatter)
|
||||
--from-beginning If the consumer does not already have
|
||||
an established offset to consume
|
||||
from, start with the earliest
|
||||
message present in the log rather
|
||||
than the latest message.
|
||||
--max-messages <Integer: num_messages> The maximum number of messages to
|
||||
consume before exiting. If not set,
|
||||
consumption is continual.
|
||||
--metrics-dir <metrics dictory> If csv-reporter-enable is set, and
|
||||
this parameter isset, the csv
|
||||
metrics will be outputed here
|
||||
--property <prop>
|
||||
--skip-message-on-error If there is an error when processing a
|
||||
message, skip it instead of halt.
|
||||
--topic <topic> The topic id to consume on.
|
||||
--whitelist <whitelist> Whitelist of topics to include for
|
||||
consumption.
|
||||
--zookeeper <urls> REQUIRED: The connection string for
|
||||
the zookeeper connection in the form
|
||||
host:port. Multiple URLS can be
|
||||
given to allow fail-over.
|
||||
"""
|
||||
|
||||
|
||||
class ConsoleConsumer(BackgroundThreadService):
|
||||
logs = {
|
||||
"consumer_log": {
|
||||
"path": "/mnt/consumer.log",
|
||||
"collect_default": True}
|
||||
}
|
||||
|
||||
def __init__(self, context, num_nodes, kafka, topic, message_validator=is_int, from_beginning=True, consumer_timeout_ms=None):
|
||||
"""
|
||||
Args:
|
||||
context: standard context
|
||||
num_nodes: number of nodes to use (this should be 1)
|
||||
kafka: kafka service
|
||||
topic: consume from this topic
|
||||
message_validator: function which returns message or None
|
||||
from_beginning: consume from beginning if True, else from the end
|
||||
consumer_timeout_ms: corresponds to consumer.timeout.ms. consumer process ends if time between
|
||||
successively consumed messages exceeds this timeout. Setting this and
|
||||
waiting for the consumer to stop is a pretty good way to consume all messages
|
||||
in a topic.
|
||||
"""
|
||||
super(ConsoleConsumer, self).__init__(context, num_nodes)
|
||||
self.kafka = kafka
|
||||
self.args = {
|
||||
'topic': topic,
|
||||
}
|
||||
|
||||
self.consumer_timeout_ms = consumer_timeout_ms
|
||||
|
||||
self.from_beginning = from_beginning
|
||||
self.message_validator = message_validator
|
||||
self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)}
|
||||
|
||||
@property
|
||||
def start_cmd(self):
|
||||
args = self.args.copy()
|
||||
args.update({'zk_connect': self.kafka.zk.connect_setting()})
|
||||
cmd = "/opt/kafka/bin/kafka-console-consumer.sh --topic %(topic)s --zookeeper %(zk_connect)s" \
|
||||
" --consumer.config /mnt/console_consumer.properties" % args
|
||||
|
||||
if self.from_beginning:
|
||||
cmd += " --from-beginning"
|
||||
|
||||
cmd += " 2>> /mnt/consumer.log | tee -a /mnt/consumer.log &"
|
||||
return cmd
|
||||
|
||||
def _worker(self, idx, node):
|
||||
# form config file
|
||||
if self.consumer_timeout_ms is not None:
|
||||
prop_file = self.render('console_consumer.properties', consumer_timeout_ms=self.consumer_timeout_ms)
|
||||
else:
|
||||
prop_file = self.render('console_consumer.properties')
|
||||
|
||||
self.logger.info("console_consumer.properties:")
|
||||
self.logger.info(prop_file)
|
||||
node.account.create_file("/mnt/console_consumer.properties", prop_file)
|
||||
|
||||
# Run and capture output
|
||||
cmd = self.start_cmd
|
||||
self.logger.debug("Console consumer %d command: %s", idx, cmd)
|
||||
for line in node.account.ssh_capture(cmd):
|
||||
msg = line.strip()
|
||||
msg = self.message_validator(msg)
|
||||
if msg is not None:
|
||||
self.logger.debug("consumed a message: " + str(msg))
|
||||
self.messages_consumed[idx].append(msg)
|
||||
|
||||
def start_node(self, node):
|
||||
super(ConsoleConsumer, self).start_node(node)
|
||||
|
||||
def stop_node(self, node):
|
||||
node.account.kill_process("java", allow_fail=False)
|
||||
|
||||
def clean_node(self, node):
|
||||
node.account.ssh("rm -rf /mnt/console_consumer.properties /mnt/consumer.log", allow_fail=False)
|
||||
|
|
@ -0,0 +1,227 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from ducktape.services.service import Service
|
||||
|
||||
import json
|
||||
import re
|
||||
import signal
|
||||
import time
|
||||
|
||||
|
||||
class KafkaService(Service):
|
||||
|
||||
logs = {
|
||||
"kafka_log": {
|
||||
"path": "/mnt/kafka.log",
|
||||
"collect_default": True},
|
||||
"kafka_data": {
|
||||
"path": "/mnt/kafka-logs",
|
||||
"collect_default": False}
|
||||
}
|
||||
|
||||
def __init__(self, context, num_nodes, zk, topics=None):
|
||||
"""
|
||||
:type context
|
||||
:type zk: ZookeeperService
|
||||
:type topics: dict
|
||||
"""
|
||||
super(KafkaService, self).__init__(context, num_nodes)
|
||||
self.zk = zk
|
||||
self.topics = topics
|
||||
|
||||
def start(self):
|
||||
super(KafkaService, self).start()
|
||||
|
||||
# Create topics if necessary
|
||||
if self.topics is not None:
|
||||
for topic, topic_cfg in self.topics.items():
|
||||
if topic_cfg is None:
|
||||
topic_cfg = {}
|
||||
|
||||
topic_cfg["topic"] = topic
|
||||
self.create_topic(topic_cfg)
|
||||
|
||||
def start_node(self, node):
|
||||
props_file = self.render('kafka.properties', node=node, broker_id=self.idx(node))
|
||||
self.logger.info("kafka.properties:")
|
||||
self.logger.info(props_file)
|
||||
node.account.create_file("/mnt/kafka.properties", props_file)
|
||||
|
||||
cmd = "/opt/kafka/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log & echo $! > /mnt/kafka.pid"
|
||||
self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd))
|
||||
node.account.ssh(cmd)
|
||||
time.sleep(5)
|
||||
if len(self.pids(node)) == 0:
|
||||
raise Exception("No process ids recorded on node %s" % str(node))
|
||||
|
||||
def pids(self, node):
|
||||
"""Return process ids associated with running processes on the given node."""
|
||||
try:
|
||||
return [pid for pid in node.account.ssh_capture("cat /mnt/kafka.pid", callback=int)]
|
||||
except:
|
||||
return []
|
||||
|
||||
def signal_node(self, node, sig=signal.SIGTERM):
|
||||
pids = self.pids(node)
|
||||
for pid in pids:
|
||||
node.account.signal(pid, sig)
|
||||
|
||||
def signal_leader(self, topic, partition=0, sig=signal.SIGTERM):
|
||||
leader = self.leader(topic, partition)
|
||||
self.signal_node(leader, sig)
|
||||
|
||||
def stop_node(self, node, clean_shutdown=True):
|
||||
pids = self.pids(node)
|
||||
sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
|
||||
|
||||
for pid in pids:
|
||||
node.account.signal(pid, sig, allow_fail=False)
|
||||
|
||||
node.account.ssh("rm -f /mnt/kafka.pid", allow_fail=False)
|
||||
|
||||
def clean_node(self, node):
|
||||
node.account.ssh("rm -rf /mnt/kafka-logs /mnt/kafka.properties /mnt/kafka.log /mnt/kafka.pid", allow_fail=False)
|
||||
|
||||
def create_topic(self, topic_cfg):
|
||||
node = self.nodes[0] # any node is fine here
|
||||
self.logger.info("Creating topic %s with settings %s", topic_cfg["topic"], topic_cfg)
|
||||
|
||||
cmd = "/opt/kafka/bin/kafka-topics.sh --zookeeper %(zk_connect)s --create "\
|
||||
"--topic %(topic)s --partitions %(partitions)d --replication-factor %(replication)d" % {
|
||||
'zk_connect': self.zk.connect_setting(),
|
||||
'topic': topic_cfg.get("topic"),
|
||||
'partitions': topic_cfg.get('partitions', 1),
|
||||
'replication': topic_cfg.get('replication-factor', 1)
|
||||
}
|
||||
|
||||
if "configs" in topic_cfg.keys() and topic_cfg["configs"] is not None:
|
||||
for config_name, config_value in topic_cfg["configs"].items():
|
||||
cmd += " --config %s=%s" % (config_name, str(config_value))
|
||||
|
||||
self.logger.info("Running topic creation command...\n%s" % cmd)
|
||||
node.account.ssh(cmd)
|
||||
|
||||
time.sleep(1)
|
||||
self.logger.info("Checking to see if topic was properly created...\n%s" % cmd)
|
||||
for line in self.describe_topic(topic_cfg["topic"]).split("\n"):
|
||||
self.logger.info(line)
|
||||
|
||||
def describe_topic(self, topic):
|
||||
node = self.nodes[0]
|
||||
cmd = "/opt/kafka/bin/kafka-topics.sh --zookeeper %s --topic %s --describe" % \
|
||||
(self.zk.connect_setting(), topic)
|
||||
output = ""
|
||||
for line in node.account.ssh_capture(cmd):
|
||||
output += line
|
||||
return output
|
||||
|
||||
def verify_reassign_partitions(self, reassignment):
|
||||
"""Run the reassign partitions admin tool in "verify" mode
|
||||
"""
|
||||
node = self.nodes[0]
|
||||
json_file = "/tmp/" + str(time.time()) + "_reassign.json"
|
||||
|
||||
# reassignment to json
|
||||
json_str = json.dumps(reassignment)
|
||||
json_str = json.dumps(json_str)
|
||||
|
||||
# create command
|
||||
cmd = "echo %s > %s && " % (json_str, json_file)
|
||||
cmd += "/opt/kafka/bin/kafka-reassign-partitions.sh "\
|
||||
"--zookeeper %(zk_connect)s "\
|
||||
"--reassignment-json-file %(reassignment_file)s "\
|
||||
"--verify" % {'zk_connect': self.zk.connect_setting(),
|
||||
'reassignment_file': json_file}
|
||||
cmd += " && sleep 1 && rm -f %s" % json_file
|
||||
|
||||
# send command
|
||||
self.logger.info("Verifying parition reassignment...")
|
||||
self.logger.debug(cmd)
|
||||
output = ""
|
||||
for line in node.account.ssh_capture(cmd):
|
||||
output += line
|
||||
|
||||
self.logger.debug(output)
|
||||
|
||||
if re.match(".*is in progress.*", output) is not None:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def execute_reassign_partitions(self, reassignment):
|
||||
"""Run the reassign partitions admin tool in "verify" mode
|
||||
"""
|
||||
node = self.nodes[0]
|
||||
json_file = "/tmp/" + str(time.time()) + "_reassign.json"
|
||||
|
||||
# reassignment to json
|
||||
json_str = json.dumps(reassignment)
|
||||
json_str = json.dumps(json_str)
|
||||
|
||||
# create command
|
||||
cmd = "echo %s > %s && " % (json_str, json_file)
|
||||
cmd += "/opt/kafka/bin/kafka-reassign-partitions.sh "\
|
||||
"--zookeeper %(zk_connect)s "\
|
||||
"--reassignment-json-file %(reassignment_file)s "\
|
||||
"--execute" % {'zk_connect': self.zk.connect_setting(),
|
||||
'reassignment_file': json_file}
|
||||
cmd += " && sleep 1 && rm -f %s" % json_file
|
||||
|
||||
# send command
|
||||
self.logger.info("Executing parition reassignment...")
|
||||
self.logger.debug(cmd)
|
||||
output = ""
|
||||
for line in node.account.ssh_capture(cmd):
|
||||
output += line
|
||||
|
||||
self.logger.debug("Verify partition reassignment:")
|
||||
self.logger.debug(output)
|
||||
|
||||
def restart_node(self, node, wait_sec=0, clean_shutdown=True):
|
||||
"""Restart the given node, waiting wait_sec in between stopping and starting up again."""
|
||||
self.stop_node(node, clean_shutdown)
|
||||
time.sleep(wait_sec)
|
||||
self.start_node(node)
|
||||
|
||||
def leader(self, topic, partition=0):
|
||||
""" Get the leader replica for the given topic and partition.
|
||||
"""
|
||||
cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.ZooKeeperMainWrapper -server %s " \
|
||||
% self.zk.connect_setting()
|
||||
cmd += "get /brokers/topics/%s/partitions/%d/state" % (topic, partition)
|
||||
self.logger.debug(cmd)
|
||||
|
||||
node = self.nodes[0]
|
||||
self.logger.debug("Querying zookeeper to find leader replica for topic %s: \n%s" % (cmd, topic))
|
||||
partition_state = None
|
||||
for line in node.account.ssh_capture(cmd):
|
||||
match = re.match("^({.+})$", line)
|
||||
if match is not None:
|
||||
partition_state = match.groups()[0]
|
||||
break
|
||||
|
||||
if partition_state is None:
|
||||
raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition))
|
||||
|
||||
partition_state = json.loads(partition_state)
|
||||
self.logger.info(partition_state)
|
||||
|
||||
leader_idx = int(partition_state["leader"])
|
||||
self.logger.info("Leader for topic %s and partition %d is now: %d" % (topic, partition, leader_idx))
|
||||
return self.get_node(leader_idx)
|
||||
|
||||
def bootstrap_servers(self):
|
||||
return ','.join([node.account.hostname + ":9092" for node in self.nodes])
|
|
@ -0,0 +1,163 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from ducktape.services.background_thread import BackgroundThreadService
|
||||
|
||||
|
||||
class PerformanceService(BackgroundThreadService):
|
||||
def __init__(self, context, num_nodes):
|
||||
super(PerformanceService, self).__init__(context, num_nodes)
|
||||
self.results = [None] * self.num_nodes
|
||||
self.stats = [[] for x in range(self.num_nodes)]
|
||||
|
||||
|
||||
class ProducerPerformanceService(PerformanceService):
|
||||
def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, settings={}, intermediate_stats=False):
|
||||
super(ProducerPerformanceService, self).__init__(context, num_nodes)
|
||||
self.kafka = kafka
|
||||
self.args = {
|
||||
'topic': topic,
|
||||
'num_records': num_records,
|
||||
'record_size': record_size,
|
||||
'throughput': throughput
|
||||
}
|
||||
self.settings = settings
|
||||
self.intermediate_stats = intermediate_stats
|
||||
|
||||
def _worker(self, idx, node):
|
||||
args = self.args.copy()
|
||||
args.update({'bootstrap_servers': self.kafka.bootstrap_servers()})
|
||||
cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance "\
|
||||
"%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s" % args
|
||||
|
||||
for key,value in self.settings.items():
|
||||
cmd += " %s=%s" % (str(key), str(value))
|
||||
self.logger.debug("Producer performance %d command: %s", idx, cmd)
|
||||
|
||||
def parse_stats(line):
|
||||
parts = line.split(',')
|
||||
return {
|
||||
'records': int(parts[0].split()[0]),
|
||||
'records_per_sec': float(parts[1].split()[0]),
|
||||
'mbps': float(parts[1].split('(')[1].split()[0]),
|
||||
'latency_avg_ms': float(parts[2].split()[0]),
|
||||
'latency_max_ms': float(parts[3].split()[0]),
|
||||
'latency_50th_ms': float(parts[4].split()[0]),
|
||||
'latency_95th_ms': float(parts[5].split()[0]),
|
||||
'latency_99th_ms': float(parts[6].split()[0]),
|
||||
'latency_999th_ms': float(parts[7].split()[0]),
|
||||
}
|
||||
last = None
|
||||
for line in node.account.ssh_capture(cmd):
|
||||
self.logger.debug("Producer performance %d: %s", idx, line.strip())
|
||||
if self.intermediate_stats:
|
||||
try:
|
||||
self.stats[idx-1].append(parse_stats(line))
|
||||
except:
|
||||
# Sometimes there are extraneous log messages
|
||||
pass
|
||||
last = line
|
||||
try:
|
||||
self.results[idx-1] = parse_stats(last)
|
||||
except:
|
||||
self.logger.error("Bad last line: %s", last)
|
||||
|
||||
|
||||
class ConsumerPerformanceService(PerformanceService):
|
||||
def __init__(self, context, num_nodes, kafka, topic, num_records, throughput, threads=1, settings={}):
|
||||
super(ConsumerPerformanceService, self).__init__(context, num_nodes)
|
||||
self.kafka = kafka
|
||||
self.args = {
|
||||
'topic': topic,
|
||||
'num_records': num_records,
|
||||
'throughput': throughput,
|
||||
'threads': threads,
|
||||
}
|
||||
self.settings = settings
|
||||
|
||||
def _worker(self, idx, node):
|
||||
args = self.args.copy()
|
||||
args.update({'zk_connect': self.kafka.zk.connect_setting()})
|
||||
cmd = "/opt/kafka/bin/kafka-consumer-perf-test.sh "\
|
||||
"--topic %(topic)s --messages %(num_records)d --zookeeper %(zk_connect)s" % args
|
||||
for key,value in self.settings.items():
|
||||
cmd += " %s=%s" % (str(key), str(value))
|
||||
self.logger.debug("Consumer performance %d command: %s", idx, cmd)
|
||||
last = None
|
||||
for line in node.account.ssh_capture(cmd):
|
||||
self.logger.debug("Consumer performance %d: %s", idx, line.strip())
|
||||
last = line
|
||||
# Parse and save the last line's information
|
||||
parts = last.split(',')
|
||||
|
||||
self.results[idx-1] = {
|
||||
'total_mb': float(parts[2]),
|
||||
'mbps': float(parts[3]),
|
||||
'records_per_sec': float(parts[5]),
|
||||
}
|
||||
|
||||
|
||||
class EndToEndLatencyService(PerformanceService):
|
||||
def __init__(self, context, num_nodes, kafka, topic, num_records, consumer_fetch_max_wait=100, acks=1):
|
||||
super(EndToEndLatencyService, self).__init__(context, num_nodes)
|
||||
self.kafka = kafka
|
||||
self.args = {
|
||||
'topic': topic,
|
||||
'num_records': num_records,
|
||||
'consumer_fetch_max_wait': consumer_fetch_max_wait,
|
||||
'acks': acks
|
||||
}
|
||||
|
||||
def _worker(self, idx, node):
|
||||
args = self.args.copy()
|
||||
args.update({
|
||||
'zk_connect': self.kafka.zk.connect_setting(),
|
||||
'bootstrap_servers': self.kafka.bootstrap_servers(),
|
||||
})
|
||||
cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.EndToEndLatency "\
|
||||
"%(bootstrap_servers)s %(zk_connect)s %(topic)s %(num_records)d "\
|
||||
"%(consumer_fetch_max_wait)d %(acks)d" % args
|
||||
self.logger.debug("End-to-end latency %d command: %s", idx, cmd)
|
||||
results = {}
|
||||
for line in node.account.ssh_capture(cmd):
|
||||
self.logger.debug("End-to-end latency %d: %s", idx, line.strip())
|
||||
if line.startswith("Avg latency:"):
|
||||
results['latency_avg_ms'] = float(line.split()[2])
|
||||
if line.startswith("Percentiles"):
|
||||
results['latency_50th_ms'] = float(line.split()[3][:-1])
|
||||
results['latency_99th_ms'] = float(line.split()[6][:-1])
|
||||
results['latency_999th_ms'] = float(line.split()[9])
|
||||
self.results[idx-1] = results
|
||||
|
||||
|
||||
def parse_performance_output(summary):
|
||||
parts = summary.split(',')
|
||||
results = {
|
||||
'records': int(parts[0].split()[0]),
|
||||
'records_per_sec': float(parts[1].split()[0]),
|
||||
'mbps': float(parts[1].split('(')[1].split()[0]),
|
||||
'latency_avg_ms': float(parts[2].split()[0]),
|
||||
'latency_max_ms': float(parts[3].split()[0]),
|
||||
'latency_50th_ms': float(parts[4].split()[0]),
|
||||
'latency_95th_ms': float(parts[5].split()[0]),
|
||||
'latency_99th_ms': float(parts[6].split()[0]),
|
||||
'latency_999th_ms': float(parts[7].split()[0]),
|
||||
}
|
||||
# To provide compatibility with ConsumerPerformanceService
|
||||
results['total_mb'] = results['mbps'] * (results['records'] / results['records_per_sec'])
|
||||
results['rate_mbps'] = results['mbps']
|
||||
results['rate_mps'] = results['records_per_sec']
|
||||
|
||||
return results
|
|
@ -0,0 +1,19 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
# see kafka.server.KafkaConfig for additional details and defaults
|
||||
|
||||
{% if consumer_timeout_ms is defined %}
|
||||
consumer.timeout.ms={{ consumer_timeout_ms }}
|
||||
{% endif %}
|
|
@ -0,0 +1,121 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
# see kafka.server.KafkaConfig for additional details and defaults
|
||||
|
||||
############################# Server Basics #############################
|
||||
|
||||
# The id of the broker. This must be set to a unique integer for each broker.
|
||||
broker.id={{ broker_id }}
|
||||
|
||||
############################# Socket Server Settings #############################
|
||||
|
||||
# The port the socket server listens on
|
||||
port=9092
|
||||
|
||||
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
|
||||
#host.name=localhost
|
||||
|
||||
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
|
||||
# value for "host.name" if configured. Otherwise, it will use the value returned from
|
||||
# java.net.InetAddress.getCanonicalHostName().
|
||||
advertised.host.name={{ node.account.hostname }}
|
||||
|
||||
# The port to publish to ZooKeeper for clients to use. If this is not set,
|
||||
# it will publish the same port that the broker binds to.
|
||||
#advertised.port=<port accessible by clients>
|
||||
|
||||
# The number of threads handling network requests
|
||||
num.network.threads=3
|
||||
|
||||
# The number of threads doing disk I/O
|
||||
num.io.threads=8
|
||||
|
||||
# The send buffer (SO_SNDBUF) used by the socket server
|
||||
socket.send.buffer.bytes=102400
|
||||
|
||||
# The receive buffer (SO_RCVBUF) used by the socket server
|
||||
socket.receive.buffer.bytes=65536
|
||||
|
||||
# The maximum size of a request that the socket server will accept (protection against OOM)
|
||||
socket.request.max.bytes=104857600
|
||||
|
||||
|
||||
############################# Log Basics #############################
|
||||
|
||||
# A comma seperated list of directories under which to store log files
|
||||
log.dirs=/mnt/kafka-logs
|
||||
|
||||
# The default number of log partitions per topic. More partitions allow greater
|
||||
# parallelism for consumption, but this will also result in more files across
|
||||
# the brokers.
|
||||
num.partitions=1
|
||||
|
||||
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
|
||||
# This value is recommended to be increased for installations with data dirs located in RAID array.
|
||||
num.recovery.threads.per.data.dir=1
|
||||
|
||||
############################# Log Flush Policy #############################
|
||||
|
||||
# Messages are immediately written to the filesystem but by default we only fsync() to sync
|
||||
# the OS cache lazily. The following configurations control the flush of data to disk.
|
||||
# There are a few important trade-offs here:
|
||||
# 1. Durability: Unflushed data may be lost if you are not using replication.
|
||||
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
|
||||
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
|
||||
# The settings below allow one to configure the flush policy to flush data after a period of time or
|
||||
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
|
||||
|
||||
# The number of messages to accept before forcing a flush of data to disk
|
||||
#log.flush.interval.messages=10000
|
||||
|
||||
# The maximum amount of time a message can sit in a log before we force a flush
|
||||
#log.flush.interval.ms=1000
|
||||
|
||||
############################# Log Retention Policy #############################
|
||||
|
||||
# The following configurations control the disposal of log segments. The policy can
|
||||
# be set to delete segments after a period of time, or after a given size has accumulated.
|
||||
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
|
||||
# from the end of the log.
|
||||
|
||||
# The minimum age of a log file to be eligible for deletion
|
||||
log.retention.hours=168
|
||||
|
||||
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
|
||||
# segments don't drop below log.retention.bytes.
|
||||
#log.retention.bytes=1073741824
|
||||
|
||||
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
|
||||
log.segment.bytes=1073741824
|
||||
|
||||
# The interval at which log segments are checked to see if they can be deleted according
|
||||
# to the retention policies
|
||||
log.retention.check.interval.ms=300000
|
||||
|
||||
# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
|
||||
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
|
||||
log.cleaner.enable=false
|
||||
|
||||
############################# Zookeeper #############################
|
||||
|
||||
# Zookeeper connection string (see zookeeper docs for details).
|
||||
# This is a comma separated host:port pairs, each corresponding to a zk
|
||||
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
|
||||
# You can also append an optional chroot string to the urls to specify the
|
||||
# root directory for all kafka znodes.
|
||||
zookeeper.connect={{ zk.connect_setting() }}
|
||||
|
||||
# Timeout in ms for connecting to zookeeper
|
||||
zookeeper.connection.timeout.ms=2000
|
|
@ -0,0 +1,25 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
# see kafka.server.KafkaConfig for additional details and defaults
|
||||
|
||||
dataDir=/mnt/zookeeper
|
||||
clientPort=2181
|
||||
maxClientCnxns=0
|
||||
initLimit=5
|
||||
syncLimit=2
|
||||
quorumListenOnAllIPs=true
|
||||
{% for node in nodes %}
|
||||
server.{{ loop.index }}={{ node.account.hostname }}:2888:3888
|
||||
{% endfor %}
|
|
@ -0,0 +1,107 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from ducktape.services.background_thread import BackgroundThreadService
|
||||
|
||||
import json
|
||||
|
||||
|
||||
class VerifiableProducer(BackgroundThreadService):
|
||||
|
||||
logs = {
|
||||
"producer_log": {
|
||||
"path": "/mnt/producer.log",
|
||||
"collect_default": False}
|
||||
}
|
||||
|
||||
def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput=100000):
|
||||
super(VerifiableProducer, self).__init__(context, num_nodes)
|
||||
|
||||
self.kafka = kafka
|
||||
self.topic = topic
|
||||
self.max_messages = max_messages
|
||||
self.throughput = throughput
|
||||
|
||||
self.acked_values = []
|
||||
self.not_acked_values = []
|
||||
|
||||
def _worker(self, idx, node):
|
||||
cmd = self.start_cmd
|
||||
self.logger.debug("VerifiableProducer %d command: %s" % (idx, cmd))
|
||||
|
||||
for line in node.account.ssh_capture(cmd):
|
||||
line = line.strip()
|
||||
|
||||
data = self.try_parse_json(line)
|
||||
if data is not None:
|
||||
|
||||
with self.lock:
|
||||
if data["name"] == "producer_send_error":
|
||||
data["node"] = idx
|
||||
self.not_acked_values.append(int(data["value"]))
|
||||
|
||||
elif data["name"] == "producer_send_success":
|
||||
self.acked_values.append(int(data["value"]))
|
||||
|
||||
@property
|
||||
def start_cmd(self):
|
||||
cmd = "/opt/kafka/bin/kafka-verifiable-producer.sh" \
|
||||
" --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers())
|
||||
if self.max_messages > 0:
|
||||
cmd += " --max-messages %s" % str(self.max_messages)
|
||||
if self.throughput > 0:
|
||||
cmd += " --throughput %s" % str(self.throughput)
|
||||
|
||||
cmd += " 2>> /mnt/producer.log | tee -a /mnt/producer.log &"
|
||||
return cmd
|
||||
|
||||
@property
|
||||
def acked(self):
|
||||
with self.lock:
|
||||
return self.acked_values
|
||||
|
||||
@property
|
||||
def not_acked(self):
|
||||
with self.lock:
|
||||
return self.not_acked_values
|
||||
|
||||
@property
|
||||
def num_acked(self):
|
||||
with self.lock:
|
||||
return len(self.acked_values)
|
||||
|
||||
@property
|
||||
def num_not_acked(self):
|
||||
with self.lock:
|
||||
return len(self.not_acked_values)
|
||||
|
||||
def stop_node(self, node):
|
||||
node.account.kill_process("VerifiableProducer", allow_fail=False)
|
||||
# block until the corresponding thread exits
|
||||
if len(self.worker_threads) >= self.idx(node):
|
||||
# Need to guard this because stop is preemptively called before the worker threads are added and started
|
||||
self.worker_threads[self.idx(node) - 1].join()
|
||||
|
||||
def clean_node(self, node):
|
||||
node.account.ssh("rm -rf /mnt/producer.log", allow_fail=False)
|
||||
|
||||
def try_parse_json(self, string):
|
||||
"""Try to parse a string as json. Return None if not parseable."""
|
||||
try:
|
||||
record = json.loads(string)
|
||||
return record
|
||||
except ValueError:
|
||||
self.logger.debug("Could not parse as json: %s" % str(string))
|
||||
return None
|
|
@ -0,0 +1,64 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
from ducktape.services.service import Service
|
||||
|
||||
import time
|
||||
|
||||
|
||||
class ZookeeperService(Service):
|
||||
|
||||
logs = {
|
||||
"zk_log": {
|
||||
"path": "/mnt/zk.log",
|
||||
"collect_default": True}
|
||||
}
|
||||
|
||||
def __init__(self, context, num_nodes):
|
||||
"""
|
||||
:type context
|
||||
"""
|
||||
super(ZookeeperService, self).__init__(context, num_nodes)
|
||||
|
||||
def start_node(self, node):
|
||||
idx = self.idx(node)
|
||||
self.logger.info("Starting ZK node %d on %s", idx, node.account.hostname)
|
||||
|
||||
node.account.ssh("mkdir -p /mnt/zookeeper")
|
||||
node.account.ssh("echo %d > /mnt/zookeeper/myid" % idx)
|
||||
|
||||
config_file = self.render('zookeeper.properties')
|
||||
self.logger.info("zookeeper.properties:")
|
||||
self.logger.info(config_file)
|
||||
node.account.create_file("/mnt/zookeeper.properties", config_file)
|
||||
|
||||
node.account.ssh(
|
||||
"/opt/kafka/bin/zookeeper-server-start.sh /mnt/zookeeper.properties 1>> %(path)s 2>> %(path)s &"
|
||||
% self.logs["zk_log"])
|
||||
|
||||
time.sleep(5) # give it some time to start
|
||||
|
||||
def stop_node(self, node):
|
||||
idx = self.idx(node)
|
||||
self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname))
|
||||
node.account.kill_process("zookeeper", allow_fail=False)
|
||||
|
||||
def clean_node(self, node):
|
||||
self.logger.info("Cleaning ZK node %d on %s", self.idx(node), node.account.hostname)
|
||||
node.account.ssh("rm -rf /mnt/zookeeper /mnt/zookeeper.properties /mnt/zk.log", allow_fail=False)
|
||||
|
||||
def connect_setting(self):
|
||||
return ','.join([node.account.hostname + ':2181' for node in self.nodes])
|
|
@ -0,0 +1,15 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
# see kafka.server.KafkaConfig for additional details and defaults
|
|
@ -0,0 +1,274 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from ducktape.services.service import Service
|
||||
|
||||
from kafkatest.tests.kafka_test import KafkaTest
|
||||
from kafkatest.services.performance import ProducerPerformanceService, ConsumerPerformanceService, EndToEndLatencyService
|
||||
|
||||
|
||||
class Benchmark(KafkaTest):
|
||||
'''A benchmark of Kafka producer/consumer performance. This replicates the test
|
||||
run here:
|
||||
https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
|
||||
'''
|
||||
def __init__(self, test_context):
|
||||
super(Benchmark, self).__init__(test_context, num_zk=1, num_brokers=3, topics={
|
||||
'test-rep-one' : { 'partitions': 6, 'replication-factor': 1 },
|
||||
'test-rep-three' : { 'partitions': 6, 'replication-factor': 3 }
|
||||
})
|
||||
|
||||
if True:
|
||||
# Works on both aws and local
|
||||
self.msgs = 1000000
|
||||
self.msgs_default = 1000000
|
||||
else:
|
||||
# Can use locally on Vagrant VMs, but may use too much memory for aws
|
||||
self.msgs = 50000000
|
||||
self.msgs_default = 50000000
|
||||
|
||||
self.msgs_large = 10000000
|
||||
self.msg_size_default = 100
|
||||
self.batch_size = 8*1024
|
||||
self.buffer_memory = 64*1024*1024
|
||||
self.msg_sizes = [10, 100, 1000, 10000, 100000]
|
||||
self.target_data_size = 128*1024*1024
|
||||
self.target_data_size_gb = self.target_data_size/float(1024*1024*1024)
|
||||
|
||||
def test_single_producer_no_replication(self):
|
||||
self.logger.info("BENCHMARK: Single producer, no replication")
|
||||
self.perf = ProducerPerformanceService(
|
||||
self.test_context, 1, self.kafka,
|
||||
topic="test-rep-one", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
|
||||
settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
|
||||
)
|
||||
self.perf.run()
|
||||
data = compute_throughput(self.perf)
|
||||
self.logger.info("Single producer, no replication: %s", str(data))
|
||||
return data
|
||||
|
||||
def test_single_producer_replication(self):
|
||||
self.logger.info("BENCHMARK: Single producer, async 3x replication")
|
||||
self.perf = ProducerPerformanceService(
|
||||
self.test_context, 1, self.kafka,
|
||||
topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
|
||||
settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
|
||||
)
|
||||
self.perf.run()
|
||||
data = compute_throughput(self.perf)
|
||||
self.logger.info("Single producer, async 3x replication: %s" % str(data))
|
||||
return data
|
||||
|
||||
def test_single_producer_sync(self):
|
||||
self.logger.info("BENCHMARK: Single producer, sync 3x replication")
|
||||
self.perf = ProducerPerformanceService(
|
||||
self.test_context, 1, self.kafka,
|
||||
topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
|
||||
settings={'acks':-1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
|
||||
)
|
||||
self.perf.run()
|
||||
|
||||
data = compute_throughput(self.perf)
|
||||
self.logger.info("Single producer, sync 3x replication: %s" % data)
|
||||
return data
|
||||
|
||||
def test_three_producers_async(self):
|
||||
self.logger.info("BENCHMARK: Three producers, async 3x replication")
|
||||
self.perf = ProducerPerformanceService(
|
||||
self.test_context, 3, self.kafka,
|
||||
topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
|
||||
settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
|
||||
)
|
||||
self.perf.run()
|
||||
|
||||
data = compute_throughput(self.perf)
|
||||
self.logger.info("Three producers, async 3x replication: %s" % data)
|
||||
return data
|
||||
|
||||
def test_multiple_message_size(self):
|
||||
# TODO this would be a great place to use parametrization
|
||||
self.perfs = {}
|
||||
for msg_size in self.msg_sizes:
|
||||
self.logger.info("BENCHMARK: Message size %d (%f GB total, single producer, async 3x replication)", msg_size, self.target_data_size_gb)
|
||||
# Always generate the same total amount of data
|
||||
nrecords = int(self.target_data_size / msg_size)
|
||||
self.perfs["perf-" + str(msg_size)] = ProducerPerformanceService(
|
||||
self.test_context, 1, self.kafka,
|
||||
topic="test-rep-three", num_records=nrecords, record_size=msg_size, throughput=-1,
|
||||
settings={'acks': 1, 'batch.size': self.batch_size, 'buffer.memory': self.buffer_memory}
|
||||
)
|
||||
|
||||
self.msg_size_perf = {}
|
||||
for msg_size in self.msg_sizes:
|
||||
perf = self.perfs["perf-" + str(msg_size)]
|
||||
perf.run()
|
||||
self.msg_size_perf[msg_size] = perf
|
||||
|
||||
summary = ["Message size:"]
|
||||
data = {}
|
||||
for msg_size in self.msg_sizes:
|
||||
datum = compute_throughput(self.msg_size_perf[msg_size])
|
||||
summary.append(" %d: %s" % (msg_size, datum))
|
||||
data[msg_size] = datum
|
||||
self.logger.info("\n".join(summary))
|
||||
return data
|
||||
|
||||
def test_long_term_throughput(self):
|
||||
self.logger.info("BENCHMARK: Long production")
|
||||
self.perf = ProducerPerformanceService(
|
||||
self.test_context, 1, self.kafka,
|
||||
topic="test-rep-three", num_records=self.msgs_large, record_size=self.msg_size_default, throughput=-1,
|
||||
settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory},
|
||||
intermediate_stats=True
|
||||
)
|
||||
self.perf.run()
|
||||
|
||||
summary = ["Throughput over long run, data > memory:"]
|
||||
data = {}
|
||||
# FIXME we should be generating a graph too
|
||||
# Try to break it into 5 blocks, but fall back to a smaller number if
|
||||
# there aren't even 5 elements
|
||||
block_size = max(len(self.perf.stats[0]) / 5, 1)
|
||||
nblocks = len(self.perf.stats[0]) / block_size
|
||||
for i in range(nblocks):
|
||||
subset = self.perf.stats[0][i*block_size:min((i+1)*block_size, len(self.perf.stats[0]))]
|
||||
if len(subset) == 0:
|
||||
summary.append(" Time block %d: (empty)" % i)
|
||||
data[i] = None
|
||||
else:
|
||||
records_per_sec = sum([stat['records_per_sec'] for stat in subset])/float(len(subset))
|
||||
mb_per_sec = sum([stat['mbps'] for stat in subset])/float(len(subset))
|
||||
|
||||
summary.append(" Time block %d: %f rec/sec (%f MB/s)" % (i, records_per_sec, mb_per_sec))
|
||||
data[i] = throughput(records_per_sec, mb_per_sec)
|
||||
|
||||
self.logger.info("\n".join(summary))
|
||||
return data
|
||||
|
||||
def test_end_to_end_latency(self):
|
||||
self.logger.info("BENCHMARK: End to end latency")
|
||||
self.perf = EndToEndLatencyService(
|
||||
self.test_context, 1, self.kafka,
|
||||
topic="test-rep-three", num_records=10000
|
||||
)
|
||||
self.perf.run()
|
||||
|
||||
data = latency(self.perf.results[0]['latency_50th_ms'], self.perf.results[0]['latency_99th_ms'], self.perf.results[0]['latency_999th_ms'])
|
||||
self.logger.info("End-to-end latency: %s" % str(data))
|
||||
return data
|
||||
|
||||
def test_producer_and_consumer(self):
|
||||
self.logger.info("BENCHMARK: Producer + Consumer")
|
||||
self.producer = ProducerPerformanceService(
|
||||
self.test_context, 1, self.kafka,
|
||||
topic="test-rep-three", num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
|
||||
settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
|
||||
)
|
||||
|
||||
self.consumer = ConsumerPerformanceService(
|
||||
self.test_context, 1, self.kafka,
|
||||
topic="test-rep-three", num_records=self.msgs_default, throughput=-1, threads=1
|
||||
)
|
||||
|
||||
Service.run_parallel(self.producer, self.consumer)
|
||||
|
||||
data = {
|
||||
"producer": compute_throughput(self.producer),
|
||||
"consumer": compute_throughput(self.consumer)
|
||||
}
|
||||
summary = [
|
||||
"Producer + consumer:",
|
||||
str(data)]
|
||||
self.logger.info("\n".join(summary))
|
||||
return data
|
||||
|
||||
def test_single_consumer(self):
|
||||
topic = "test-rep-three"
|
||||
|
||||
self.producer = ProducerPerformanceService(
|
||||
self.test_context, 1, self.kafka,
|
||||
topic=topic, num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
|
||||
settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
|
||||
)
|
||||
self.producer.run()
|
||||
|
||||
# All consumer tests use the messages from the first benchmark, so
|
||||
# they'll get messages of the default message size
|
||||
self.logger.info("BENCHMARK: Single consumer")
|
||||
self.perf = ConsumerPerformanceService(
|
||||
self.test_context, 1, self.kafka,
|
||||
topic=topic, num_records=self.msgs_default, throughput=-1, threads=1
|
||||
)
|
||||
self.perf.run()
|
||||
|
||||
data = compute_throughput(self.perf)
|
||||
self.logger.info("Single consumer: %s" % data)
|
||||
return data
|
||||
|
||||
def test_three_consumers(self):
|
||||
topic = "test-rep-three"
|
||||
|
||||
self.producer = ProducerPerformanceService(
|
||||
self.test_context, 1, self.kafka,
|
||||
topic=topic, num_records=self.msgs_default, record_size=self.msg_size_default, throughput=-1,
|
||||
settings={'acks':1, 'batch.size':self.batch_size, 'buffer.memory':self.buffer_memory}
|
||||
)
|
||||
self.producer.run()
|
||||
|
||||
self.logger.info("BENCHMARK: Three consumers")
|
||||
self.perf = ConsumerPerformanceService(
|
||||
self.test_context, 3, self.kafka,
|
||||
topic="test-rep-three", num_records=self.msgs_default, throughput=-1, threads=1
|
||||
)
|
||||
self.perf.run()
|
||||
|
||||
data = compute_throughput(self.perf)
|
||||
self.logger.info("Three consumers: %s", data)
|
||||
return data
|
||||
|
||||
|
||||
def throughput(records_per_sec, mb_per_sec):
|
||||
"""Helper method to ensure uniform representation of throughput data"""
|
||||
return {
|
||||
"records_per_sec": records_per_sec,
|
||||
"mb_per_sec": mb_per_sec
|
||||
}
|
||||
|
||||
|
||||
def latency(latency_50th_ms, latency_99th_ms, latency_999th_ms):
|
||||
"""Helper method to ensure uniform representation of latency data"""
|
||||
return {
|
||||
"latency_50th_ms": latency_50th_ms,
|
||||
"latency_99th_ms": latency_99th_ms,
|
||||
"latency_999th_ms": latency_999th_ms
|
||||
}
|
||||
|
||||
|
||||
def compute_throughput(perf):
|
||||
"""Helper method for computing throughput after running a performance service."""
|
||||
aggregate_rate = sum([r['records_per_sec'] for r in perf.results])
|
||||
aggregate_mbps = sum([r['mbps'] for r in perf.results])
|
||||
|
||||
return throughput(aggregate_rate, aggregate_mbps)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,45 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from ducktape.tests.test import Test
|
||||
|
||||
|
||||
from kafkatest.services.zookeeper import ZookeeperService
|
||||
from kafkatest.services.kafka import KafkaService
|
||||
|
||||
|
||||
class KafkaTest(Test):
|
||||
"""
|
||||
Helper class that manages setting up a Kafka cluster. Use this if the
|
||||
default settings for Kafka are sufficient for your test; any customization
|
||||
needs to be done manually. Your run() method should call tearDown and
|
||||
setUp. The Zookeeper and Kafka services are available as the fields
|
||||
KafkaTest.zk and KafkaTest.kafka.
|
||||
"""
|
||||
def __init__(self, test_context, num_zk, num_brokers, topics=None):
|
||||
super(KafkaTest, self).__init__(test_context)
|
||||
self.num_zk = num_zk
|
||||
self.num_brokers = num_brokers
|
||||
self.topics = topics
|
||||
|
||||
self.zk = ZookeeperService(test_context, self.num_zk)
|
||||
|
||||
self.kafka = KafkaService(
|
||||
test_context, self.num_brokers,
|
||||
self.zk, topics=self.topics)
|
||||
|
||||
def setUp(self):
|
||||
self.zk.start()
|
||||
self.kafka.start()
|
|
@ -0,0 +1,165 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from ducktape.tests.test import Test
|
||||
from ducktape.utils.util import wait_until
|
||||
|
||||
from kafkatest.services.zookeeper import ZookeeperService
|
||||
from kafkatest.services.kafka import KafkaService
|
||||
from kafkatest.services.verifiable_producer import VerifiableProducer
|
||||
from kafkatest.services.console_consumer import ConsoleConsumer
|
||||
|
||||
import signal
|
||||
import time
|
||||
|
||||
|
||||
class ReplicationTest(Test):
|
||||
"""Replication tests.
|
||||
These tests verify that replication provides simple durability guarantees by checking that data acked by
|
||||
brokers is still available for consumption in the face of various failure scenarios."""
|
||||
|
||||
def __init__(self, test_context):
|
||||
""":type test_context: ducktape.tests.test.TestContext"""
|
||||
super(ReplicationTest, self).__init__(test_context=test_context)
|
||||
|
||||
self.topic = "test_topic"
|
||||
self.zk = ZookeeperService(test_context, num_nodes=1)
|
||||
self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
|
||||
"partitions": 3,
|
||||
"replication-factor": 3,
|
||||
"min.insync.replicas": 2}
|
||||
})
|
||||
self.producer_throughput = 10000
|
||||
self.num_producers = 1
|
||||
self.num_consumers = 1
|
||||
|
||||
def setUp(self):
|
||||
self.zk.start()
|
||||
self.kafka.start()
|
||||
|
||||
def min_cluster_size(self):
|
||||
"""Override this since we're adding services outside of the constructor"""
|
||||
return super(ReplicationTest, self).min_cluster_size() + self.num_producers + self.num_consumers
|
||||
|
||||
def run_with_failure(self, failure):
|
||||
"""This is the top-level test template.
|
||||
|
||||
The steps are:
|
||||
Produce messages in the background while driving some failure condition
|
||||
When done driving failures, immediately stop producing
|
||||
Consume all messages
|
||||
Validate that messages acked by brokers were consumed
|
||||
|
||||
Note that consuming is a bit tricky, at least with console consumer. The goal is to consume all messages
|
||||
(foreach partition) in the topic. In this case, waiting for the last message may cause the consumer to stop
|
||||
too soon since console consumer is consuming multiple partitions from a single thread and therefore we lose
|
||||
ordering guarantees.
|
||||
|
||||
Waiting on a count of consumed messages can be unreliable: if we stop consuming when num_consumed == num_acked,
|
||||
we might exit early if some messages are duplicated (though not an issue here since producer retries==0)
|
||||
|
||||
Therefore rely here on the consumer.timeout.ms setting which times out on the interval between successively
|
||||
consumed messages. Since we run the producer to completion before running the consumer, this is a reliable
|
||||
indicator that nothing is left to consume.
|
||||
|
||||
"""
|
||||
self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
|
||||
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=3000)
|
||||
|
||||
# Produce in a background thread while driving broker failures
|
||||
self.producer.start()
|
||||
if not wait_until(lambda: self.producer.num_acked > 5, timeout_sec=5):
|
||||
raise RuntimeError("Producer failed to start in a reasonable amount of time.")
|
||||
failure()
|
||||
self.producer.stop()
|
||||
|
||||
self.acked = self.producer.acked
|
||||
self.not_acked = self.producer.not_acked
|
||||
self.logger.info("num not acked: %d" % self.producer.num_not_acked)
|
||||
self.logger.info("num acked: %d" % self.producer.num_acked)
|
||||
|
||||
# Consume all messages
|
||||
self.consumer.start()
|
||||
self.consumer.wait()
|
||||
self.consumed = self.consumer.messages_consumed[1]
|
||||
self.logger.info("num consumed: %d" % len(self.consumed))
|
||||
|
||||
# Check produced vs consumed
|
||||
success, msg = self.validate()
|
||||
|
||||
if not success:
|
||||
self.mark_for_collect(self.producer)
|
||||
|
||||
assert success, msg
|
||||
|
||||
def clean_shutdown(self):
|
||||
"""Discover leader node for our topic and shut it down cleanly."""
|
||||
self.kafka.signal_leader(self.topic, partition=0, sig=signal.SIGTERM)
|
||||
|
||||
def hard_shutdown(self):
|
||||
"""Discover leader node for our topic and shut it down with a hard kill."""
|
||||
self.kafka.signal_leader(self.topic, partition=0, sig=signal.SIGKILL)
|
||||
|
||||
def clean_bounce(self):
|
||||
"""Chase the leader of one partition and restart it cleanly."""
|
||||
for i in range(5):
|
||||
prev_leader_node = self.kafka.leader(topic=self.topic, partition=0)
|
||||
self.kafka.restart_node(prev_leader_node, wait_sec=5, clean_shutdown=True)
|
||||
|
||||
def hard_bounce(self):
|
||||
"""Chase the leader and restart it cleanly."""
|
||||
for i in range(5):
|
||||
prev_leader_node = self.kafka.leader(topic=self.topic, partition=0)
|
||||
self.kafka.restart_node(prev_leader_node, wait_sec=5, clean_shutdown=False)
|
||||
|
||||
# Wait long enough for previous leader to probably be awake again
|
||||
time.sleep(6)
|
||||
|
||||
def validate(self):
|
||||
"""Check that produced messages were consumed."""
|
||||
|
||||
success = True
|
||||
msg = ""
|
||||
|
||||
if len(set(self.consumed)) != len(self.consumed):
|
||||
# There are duplicates. This is ok, so report it but don't fail the test
|
||||
msg += "There are duplicate messages in the log\n"
|
||||
|
||||
if not set(self.consumed).issuperset(set(self.acked)):
|
||||
# Every acked message must appear in the logs. I.e. consumed messages must be superset of acked messages.
|
||||
acked_minus_consumed = set(self.producer.acked) - set(self.consumed)
|
||||
success = False
|
||||
msg += "At least one acked message did not appear in the consumed messages. acked_minus_consumed: " + str(acked_minus_consumed)
|
||||
|
||||
if not success:
|
||||
# Collect all the data logs if there was a failure
|
||||
self.mark_for_collect(self.kafka)
|
||||
|
||||
return success, msg
|
||||
|
||||
def test_clean_shutdown(self):
|
||||
self.run_with_failure(self.clean_shutdown)
|
||||
|
||||
def test_hard_shutdown(self):
|
||||
self.run_with_failure(self.hard_shutdown)
|
||||
|
||||
def test_clean_bounce(self):
|
||||
self.run_with_failure(self.clean_bounce)
|
||||
|
||||
def test_hard_bounce(self):
|
||||
self.run_with_failure(self.hard_bounce)
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,27 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
# see kafka.server.KafkaConfig for additional details and defaults
|
||||
|
||||
from setuptools import find_packages, setup
|
||||
|
||||
setup(name="kafkatest",
|
||||
version="0.8.3-SNAPSHOT",
|
||||
description="Apache Kafka System Tests",
|
||||
author="Apache Kafka",
|
||||
platforms=["any"],
|
||||
license="apache2.0",
|
||||
packages=find_packages(),
|
||||
requires=["ducktape(>=0.2.0)"]
|
||||
)
|
|
@ -19,10 +19,6 @@ import org.apache.kafka.clients.producer.*;
|
|||
|
||||
public class ProducerPerformance {
|
||||
|
||||
private static final long NS_PER_MS = 1000000L;
|
||||
private static final long NS_PER_SEC = 1000 * NS_PER_MS;
|
||||
private static final long MIN_SLEEP_NS = 2 * NS_PER_MS;
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
if (args.length < 4) {
|
||||
System.err.println("USAGE: java " + ProducerPerformance.class.getName() +
|
||||
|
@ -51,31 +47,17 @@ public class ProducerPerformance {
|
|||
byte[] payload = new byte[recordSize];
|
||||
Arrays.fill(payload, (byte) 1);
|
||||
ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topicName, payload);
|
||||
long sleepTime = NS_PER_SEC / throughput;
|
||||
long sleepDeficitNs = 0;
|
||||
Stats stats = new Stats(numRecords, 5000);
|
||||
long start = System.currentTimeMillis();
|
||||
long startMs = System.currentTimeMillis();
|
||||
|
||||
ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);
|
||||
for (int i = 0; i < numRecords; i++) {
|
||||
long sendStart = System.currentTimeMillis();
|
||||
Callback cb = stats.nextCompletion(sendStart, payload.length, stats);
|
||||
long sendStartMs = System.currentTimeMillis();
|
||||
Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats);
|
||||
producer.send(record, cb);
|
||||
|
||||
/*
|
||||
* Maybe sleep a little to control throughput. Sleep time can be a bit inaccurate for times < 1 ms so
|
||||
* instead of sleeping each time instead wait until a minimum sleep time accumulates (the "sleep deficit")
|
||||
* and then make up the whole deficit in one longer sleep.
|
||||
*/
|
||||
if (throughput > 0) {
|
||||
float elapsed = (sendStart - start) / 1000.f;
|
||||
if (elapsed > 0 && i / elapsed > throughput) {
|
||||
sleepDeficitNs += sleepTime;
|
||||
if (sleepDeficitNs >= MIN_SLEEP_NS) {
|
||||
long sleepMs = sleepDeficitNs / 1000000;
|
||||
long sleepNs = sleepDeficitNs - sleepMs * 1000000;
|
||||
Thread.sleep(sleepMs, (int) sleepNs);
|
||||
sleepDeficitNs = 0;
|
||||
}
|
||||
}
|
||||
if (throttler.shouldThrottle(i, sendStartMs)) {
|
||||
throttler.throttle();
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,118 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.clients.tools;
|
||||
|
||||
|
||||
/**
|
||||
* This class helps producers throttle throughput.
|
||||
*
|
||||
* If targetThroughput >= 0, the resulting average throughput will be approximately
|
||||
* min(targetThroughput, maximumPossibleThroughput). If targetThroughput < 0,
|
||||
* no throttling will occur.
|
||||
*
|
||||
* To use, do this between successive send attempts:
|
||||
* <pre>
|
||||
* {@code
|
||||
* if (throttler.shouldThrottle(...)) {
|
||||
* throttler.throttle();
|
||||
* }
|
||||
* }
|
||||
* </pre>
|
||||
*
|
||||
* Note that this can be used to throttle message throughput or data throughput.
|
||||
*/
|
||||
public class ThroughputThrottler {
|
||||
|
||||
private static final long NS_PER_MS = 1000000L;
|
||||
private static final long NS_PER_SEC = 1000 * NS_PER_MS;
|
||||
private static final long MIN_SLEEP_NS = 2 * NS_PER_MS;
|
||||
|
||||
long sleepTimeNs;
|
||||
long sleepDeficitNs = 0;
|
||||
long targetThroughput = -1;
|
||||
long startMs;
|
||||
|
||||
/**
|
||||
* @param targetThroughput Can be messages/sec or bytes/sec
|
||||
* @param startMs When the very first message is sent
|
||||
*/
|
||||
public ThroughputThrottler(long targetThroughput, long startMs) {
|
||||
this.startMs = startMs;
|
||||
this.targetThroughput = targetThroughput;
|
||||
this.sleepTimeNs = targetThroughput > 0 ?
|
||||
NS_PER_SEC / targetThroughput :
|
||||
Long.MAX_VALUE;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param amountSoFar bytes produced so far if you want to throttle data throughput, or
|
||||
* messages produced so far if you want to throttle message throughput.
|
||||
* @param sendStartMs timestamp of the most recently sent message
|
||||
* @return
|
||||
*/
|
||||
public boolean shouldThrottle(long amountSoFar, long sendStartMs) {
|
||||
if (this.targetThroughput < 0) {
|
||||
// No throttling in this case
|
||||
return false;
|
||||
}
|
||||
|
||||
float elapsedMs = (sendStartMs - startMs) / 1000.f;
|
||||
return elapsedMs > 0 && (amountSoFar / elapsedMs) > this.targetThroughput;
|
||||
}
|
||||
|
||||
/**
|
||||
* Occasionally blocks for small amounts of time to achieve targetThroughput.
|
||||
*
|
||||
* Note that if targetThroughput is 0, this will block extremely aggressively.
|
||||
*/
|
||||
public void throttle() {
|
||||
if (targetThroughput == 0) {
|
||||
try {
|
||||
Thread.sleep(Long.MAX_VALUE);
|
||||
} catch (InterruptedException e) {
|
||||
// do nothing
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// throttle throughput by sleeping, on average,
|
||||
// (1 / this.throughput) seconds between "things sent"
|
||||
sleepDeficitNs += sleepTimeNs;
|
||||
|
||||
// If enough sleep deficit has accumulated, sleep a little
|
||||
if (sleepDeficitNs >= MIN_SLEEP_NS) {
|
||||
long sleepMs = sleepDeficitNs / 1000000;
|
||||
long sleepNs = sleepDeficitNs - sleepMs * 1000000;
|
||||
|
||||
long sleepStartNs = System.nanoTime();
|
||||
try {
|
||||
Thread.sleep(sleepMs, (int) sleepNs);
|
||||
sleepDeficitNs = 0;
|
||||
} catch (InterruptedException e) {
|
||||
// If sleep is cut short, reduce deficit by the amount of
|
||||
// time we actually spent sleeping
|
||||
long sleepElapsedNs = System.nanoTime() - sleepStartNs;
|
||||
if (sleepElapsedNs <= sleepDeficitNs) {
|
||||
sleepDeficitNs -= sleepElapsedNs;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,307 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.clients.tools;
|
||||
|
||||
import org.apache.kafka.clients.producer.Callback;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.Producer;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.clients.producer.RecordMetadata;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import static net.sourceforge.argparse4j.impl.Arguments.store;
|
||||
|
||||
import net.sourceforge.argparse4j.ArgumentParsers;
|
||||
import net.sourceforge.argparse4j.inf.ArgumentParser;
|
||||
import net.sourceforge.argparse4j.inf.ArgumentParserException;
|
||||
import net.sourceforge.argparse4j.inf.Namespace;
|
||||
|
||||
/**
|
||||
* Primarily intended for use with system testing, this producer prints metadata
|
||||
* in the form of JSON to stdout on each "send" request. For example, this helps
|
||||
* with end-to-end correctness tests by making externally visible which messages have been
|
||||
* acked and which have not.
|
||||
*
|
||||
* When used as a command-line tool, it produces increasing integers. It will produce a
|
||||
* fixed number of messages unless the default max-messages -1 is used, in which case
|
||||
* it produces indefinitely.
|
||||
*
|
||||
* If logging is left enabled, log output on stdout can be easily ignored by checking
|
||||
* whether a given line is valid JSON.
|
||||
*/
|
||||
public class VerifiableProducer {
|
||||
|
||||
String topic;
|
||||
private Producer<String, String> producer;
|
||||
// If maxMessages < 0, produce until the process is killed externally
|
||||
private long maxMessages = -1;
|
||||
|
||||
// Number of messages for which acks were received
|
||||
private long numAcked = 0;
|
||||
|
||||
// Number of send attempts
|
||||
private long numSent = 0;
|
||||
|
||||
// Throttle message throughput if this is set >= 0
|
||||
private long throughput;
|
||||
|
||||
// Hook to trigger producing thread to stop sending messages
|
||||
private boolean stopProducing = false;
|
||||
|
||||
public VerifiableProducer(
|
||||
Properties producerProps, String topic, int throughput, int maxMessages) {
|
||||
|
||||
this.topic = topic;
|
||||
this.throughput = throughput;
|
||||
this.maxMessages = maxMessages;
|
||||
this.producer = new KafkaProducer<String, String>(producerProps);
|
||||
}
|
||||
|
||||
/** Get the command-line argument parser. */
|
||||
private static ArgumentParser argParser() {
|
||||
ArgumentParser parser = ArgumentParsers
|
||||
.newArgumentParser("verifiable-producer")
|
||||
.defaultHelp(true)
|
||||
.description("This tool produces increasing integers to the specified topic and prints JSON metadata to stdout on each \"send\" request, making externally visible which messages have been acked and which have not.");
|
||||
|
||||
parser.addArgument("--topic")
|
||||
.action(store())
|
||||
.required(true)
|
||||
.type(String.class)
|
||||
.metavar("TOPIC")
|
||||
.help("Produce messages to this topic.");
|
||||
|
||||
parser.addArgument("--broker-list")
|
||||
.action(store())
|
||||
.required(true)
|
||||
.type(String.class)
|
||||
.metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
|
||||
.dest("brokerList")
|
||||
.help("Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
|
||||
|
||||
parser.addArgument("--max-messages")
|
||||
.action(store())
|
||||
.required(false)
|
||||
.setDefault(-1)
|
||||
.type(Integer.class)
|
||||
.metavar("MAX-MESSAGES")
|
||||
.dest("maxMessages")
|
||||
.help("Produce this many messages. If -1, produce messages until the process is killed externally.");
|
||||
|
||||
parser.addArgument("--throughput")
|
||||
.action(store())
|
||||
.required(false)
|
||||
.setDefault(-1)
|
||||
.type(Integer.class)
|
||||
.metavar("THROUGHPUT")
|
||||
.help("If set >= 0, throttle maximum message throughput to *approximately* THROUGHPUT messages/sec.");
|
||||
|
||||
parser.addArgument("--acks")
|
||||
.action(store())
|
||||
.required(false)
|
||||
.setDefault(-1)
|
||||
.type(Integer.class)
|
||||
.choices(0, 1, -1)
|
||||
.metavar("ACKS")
|
||||
.help("Acks required on each produced message. See Kafka docs on request.required.acks for details.");
|
||||
|
||||
return parser;
|
||||
}
|
||||
|
||||
/** Construct a VerifiableProducer object from command-line arguments. */
|
||||
public static VerifiableProducer createFromArgs(String[] args) {
|
||||
ArgumentParser parser = argParser();
|
||||
VerifiableProducer producer = null;
|
||||
|
||||
try {
|
||||
Namespace res;
|
||||
res = parser.parseArgs(args);
|
||||
|
||||
int maxMessages = res.getInt("maxMessages");
|
||||
String topic = res.getString("topic");
|
||||
int throughput = res.getInt("throughput");
|
||||
|
||||
Properties producerProps = new Properties();
|
||||
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList"));
|
||||
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
|
||||
"org.apache.kafka.common.serialization.StringSerializer");
|
||||
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
|
||||
"org.apache.kafka.common.serialization.StringSerializer");
|
||||
producerProps.put(ProducerConfig.ACKS_CONFIG, Integer.toString(res.getInt("acks")));
|
||||
// No producer retries
|
||||
producerProps.put("retries", "0");
|
||||
|
||||
producer = new VerifiableProducer(producerProps, topic, throughput, maxMessages);
|
||||
} catch (ArgumentParserException e) {
|
||||
if (args.length == 0) {
|
||||
parser.printHelp();
|
||||
System.exit(0);
|
||||
} else {
|
||||
parser.handleError(e);
|
||||
System.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
return producer;
|
||||
}
|
||||
|
||||
/** Produce a message with given key and value. */
|
||||
public void send(String key, String value) {
|
||||
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value);
|
||||
numSent++;
|
||||
try {
|
||||
producer.send(record, new PrintInfoCallback(key, value));
|
||||
} catch (Exception e) {
|
||||
|
||||
synchronized (System.out) {
|
||||
System.out.println(errorString(e, key, value, System.currentTimeMillis()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Close the producer to flush any remaining messages. */
|
||||
public void close() {
|
||||
producer.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return JSON string encapsulating basic information about the exception, as well
|
||||
* as the key and value which triggered the exception.
|
||||
*/
|
||||
String errorString(Exception e, String key, String value, Long nowMs) {
|
||||
assert e != null : "Expected non-null exception.";
|
||||
|
||||
Map<String, Object> errorData = new HashMap<>();
|
||||
errorData.put("class", this.getClass().toString());
|
||||
errorData.put("name", "producer_send_error");
|
||||
|
||||
errorData.put("time_ms", nowMs);
|
||||
errorData.put("exception", e.getClass().toString());
|
||||
errorData.put("message", e.getMessage());
|
||||
errorData.put("topic", this.topic);
|
||||
errorData.put("key", key);
|
||||
errorData.put("value", value);
|
||||
|
||||
return toJsonString(errorData);
|
||||
}
|
||||
|
||||
String successString(RecordMetadata recordMetadata, String key, String value, Long nowMs) {
|
||||
assert recordMetadata != null : "Expected non-null recordMetadata object.";
|
||||
|
||||
Map<String, Object> successData = new HashMap<>();
|
||||
successData.put("class", this.getClass().toString());
|
||||
successData.put("name", "producer_send_success");
|
||||
|
||||
successData.put("time_ms", nowMs);
|
||||
successData.put("topic", this.topic);
|
||||
successData.put("partition", recordMetadata.partition());
|
||||
successData.put("offset", recordMetadata.offset());
|
||||
successData.put("key", key);
|
||||
successData.put("value", value);
|
||||
|
||||
return toJsonString(successData);
|
||||
}
|
||||
|
||||
private String toJsonString(Map<String, Object> data) {
|
||||
String json;
|
||||
try {
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
json = mapper.writeValueAsString(data);
|
||||
} catch (JsonProcessingException e) {
|
||||
json = "Bad data can't be written as json: " + e.getMessage();
|
||||
}
|
||||
return json;
|
||||
}
|
||||
|
||||
/** Callback which prints errors to stdout when the producer fails to send. */
|
||||
private class PrintInfoCallback implements Callback {
|
||||
|
||||
private String key;
|
||||
private String value;
|
||||
|
||||
PrintInfoCallback(String key, String value) {
|
||||
this.key = key;
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
|
||||
synchronized (System.out) {
|
||||
if (e == null) {
|
||||
VerifiableProducer.this.numAcked++;
|
||||
System.out.println(successString(recordMetadata, this.key, this.value, System.currentTimeMillis()));
|
||||
} else {
|
||||
System.out.println(errorString(e, this.key, this.value, System.currentTimeMillis()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws IOException {
|
||||
|
||||
final VerifiableProducer producer = createFromArgs(args);
|
||||
final long startMs = System.currentTimeMillis();
|
||||
boolean infinite = producer.maxMessages < 0;
|
||||
|
||||
Runtime.getRuntime().addShutdownHook(new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
// Trigger main thread to stop producing messages
|
||||
producer.stopProducing = true;
|
||||
|
||||
// Flush any remaining messages
|
||||
producer.close();
|
||||
|
||||
// Print a summary
|
||||
long stopMs = System.currentTimeMillis();
|
||||
double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs));
|
||||
|
||||
Map<String, Object> data = new HashMap<>();
|
||||
data.put("class", producer.getClass().toString());
|
||||
data.put("name", "tool_data");
|
||||
data.put("sent", producer.numSent);
|
||||
data.put("acked", producer.numAcked);
|
||||
data.put("target_throughput", producer.throughput);
|
||||
data.put("avg_throughput", avgThroughput);
|
||||
|
||||
System.out.println(producer.toJsonString(data));
|
||||
}
|
||||
});
|
||||
|
||||
ThroughputThrottler throttler = new ThroughputThrottler(producer.throughput, startMs);
|
||||
for (int i = 0; i < producer.maxMessages || infinite; i++) {
|
||||
if (producer.stopProducing) {
|
||||
break;
|
||||
}
|
||||
long sendStartMs = System.currentTimeMillis();
|
||||
producer.send(null, String.format("%d", i));
|
||||
|
||||
if (throttler.shouldThrottle(i, sendStartMs)) {
|
||||
throttler.throttle();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
# see kafka.server.KafkaConfig for additional details and defaults
|
||||
|
||||
if [ -z "$AWS_IAM" ];then
|
||||
echo "Warning: AWS_IAM is not set"
|
||||
fi
|
||||
|
||||
export AWS_ACCESS_KEY=`curl -s http://169.254.169.254/latest/meta-data/iam/security-credentials/$AWS_IAM | grep AccessKeyId | awk -F\" '{ print $4 }'`
|
||||
export AWS_SECRET_KEY=`curl -s http://169.254.169.254/latest/meta-data/iam/security-credentials/$AWS_IAM | grep SecretAccessKey | awk -F\" '{ print $4 }'`
|
||||
export AWS_SESSION_TOKEN=`curl -s http://169.254.169.254/latest/meta-data/iam/security-credentials/$AWS_IAM | grep Token | awk -F\" '{ print $4 }'`
|
||||
|
||||
if [ -z "$AWS_ACCESS_KEY" ]; then
|
||||
echo "Failed to populate environment variables AWS_ACCESS_KEY, AWS_SECRET_KEY, and AWS_SESSION_TOKEN."
|
||||
echo "AWS_IAM is currently $AWS_IAM. Double-check that this is correct. If not set, add this command to your .bashrc file:"
|
||||
echo "export AWS_IAM=<my_aws_iam> # put this into your ~/.bashrc"
|
||||
fi
|
|
@ -0,0 +1,28 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
# see kafka.server.KafkaConfig for additional details and defaults
|
||||
|
||||
# Use this template Vagrantfile.local for running system tests on aws
|
||||
# To use it, move it to the base kafka directory and rename
|
||||
# it to Vagrantfile.local, and adjust variables as needed.
|
||||
ec3_instance_type = "m3.medium"
|
||||
num_zookeepers = 0
|
||||
num_brokers = 0
|
||||
num_workers = 9
|
||||
ec2_keypair_name = kafkatest
|
||||
ec2_keypair_file = ../kafkatest.pem
|
||||
ec2_security_groups = ['kafkatest']
|
||||
ec2_region = 'us-west-2'
|
||||
ec2_ami = "ami-29ebb519"
|
|
@ -0,0 +1,73 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
# see kafka.server.KafkaConfig for additional details and defaults
|
||||
|
||||
#!/bin/bash
|
||||
|
||||
# This script can be used to set up a driver machine on aws from which you will run tests
|
||||
# or bring up your mini Kafka cluster.
|
||||
|
||||
# Install dependencies
|
||||
sudo apt-get install -y maven openjdk-6-jdk build-essential \
|
||||
ruby-dev zlib1g-dev realpath python-setuptools
|
||||
|
||||
base_dir=`dirname $0`/../..
|
||||
|
||||
if [ -z `which vagrant` ]; then
|
||||
echo "Installing vagrant..."
|
||||
wget https://dl.bintray.com/mitchellh/vagrant/vagrant_1.7.2_x86_64.deb
|
||||
sudo dpkg -i vagrant_1.7.2_x86_64.deb
|
||||
rm -f vagrant_1.7.2_x86_64.deb
|
||||
fi
|
||||
|
||||
# Install necessary vagrant plugins
|
||||
# Note: Do NOT install vagrant-cachier since it doesn't work on AWS and only
|
||||
# adds log noise
|
||||
vagrant_plugins="vagrant-aws vagrant-hostmanager"
|
||||
existing=`vagrant plugin list`
|
||||
for plugin in $vagrant_plugins; do
|
||||
echo $existing | grep $plugin > /dev/null
|
||||
if [ $? != 0 ]; then
|
||||
vagrant plugin install $plugin
|
||||
fi
|
||||
done
|
||||
|
||||
# Create Vagrantfile.local as a convenience
|
||||
if [ ! -e "$base_dir/Vagrantfile.local" ]; then
|
||||
cp $base_dir/aws/aws-example-Vagrantfile.local $base_dir/Vagrantfile.local
|
||||
fi
|
||||
|
||||
gradle="gradle-2.2.1"
|
||||
if [ -z `which gradle` ] && [ ! -d $base_dir/$gradle ]; then
|
||||
if [ ! -e $gradle-bin.zip ]; then
|
||||
wget https://services.gradle.org/distributions/$gradle-bin.zip
|
||||
fi
|
||||
unzip $gradle-bin.zip
|
||||
rm -rf $gradle-bin.zip
|
||||
mv $gradle $base_dir/$gradle
|
||||
fi
|
||||
|
||||
# Ensure aws access keys are in the environment when we use a EC2 driver machine
|
||||
LOCAL_HOSTNAME=$(hostname -d)
|
||||
if [[ ${LOCAL_HOSTNAME} =~ .*\.compute\.internal ]]; then
|
||||
grep "AWS ACCESS KEYS" ~/.bashrc > /dev/null
|
||||
if [ $? != 0 ]; then
|
||||
echo "# --- AWS ACCESS KEYS ---" >> ~/.bashrc
|
||||
echo ". `realpath $base_dir/aws/aws-access-keys-commands`" >> ~/.bashrc
|
||||
echo "# -----------------------" >> ~/.bashrc
|
||||
source ~/.bashrc
|
||||
fi
|
||||
fi
|
||||
|
|
@ -41,3 +41,12 @@ chmod a+rw /opt
|
|||
if [ ! -e /opt/kafka ]; then
|
||||
ln -s /vagrant /opt/kafka
|
||||
fi
|
||||
|
||||
# For EC2 nodes, we want to use /mnt, which should have the local disk. On local
|
||||
# VMs, we can just create it if it doesn't exist and use it like we'd use
|
||||
# /tmp. Eventually, we'd like to also support more directories, e.g. when EC2
|
||||
# instances have multiple local disks.
|
||||
if [ ! -e /mnt ]; then
|
||||
mkdir /mnt
|
||||
fi
|
||||
chmod a+rwx /mnt
|
||||
|
|
|
@ -0,0 +1,22 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
# see kafka.server.KafkaConfig for additional details and defaults
|
||||
|
||||
# Use this example Vagrantfile.local for running system tests
|
||||
# To use it, move it to the base kafka directory and rename
|
||||
# it to Vagrantfile.local
|
||||
num_zookeepers = 0
|
||||
num_brokers = 0
|
||||
num_workers = 9
|
Loading…
Reference in New Issue