(topicName, payload);
- long sleepTime = NS_PER_SEC / throughput;
- long sleepDeficitNs = 0;
Stats stats = new Stats(numRecords, 5000);
- long start = System.currentTimeMillis();
+ long startMs = System.currentTimeMillis();
+
+ ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);
for (int i = 0; i < numRecords; i++) {
- long sendStart = System.currentTimeMillis();
- Callback cb = stats.nextCompletion(sendStart, payload.length, stats);
+ long sendStartMs = System.currentTimeMillis();
+ Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats);
producer.send(record, cb);
- /*
- * Maybe sleep a little to control throughput. Sleep time can be a bit inaccurate for times < 1 ms so
- * instead of sleeping each time instead wait until a minimum sleep time accumulates (the "sleep deficit")
- * and then make up the whole deficit in one longer sleep.
- */
- if (throughput > 0) {
- float elapsed = (sendStart - start) / 1000.f;
- if (elapsed > 0 && i / elapsed > throughput) {
- sleepDeficitNs += sleepTime;
- if (sleepDeficitNs >= MIN_SLEEP_NS) {
- long sleepMs = sleepDeficitNs / 1000000;
- long sleepNs = sleepDeficitNs - sleepMs * 1000000;
- Thread.sleep(sleepMs, (int) sleepNs);
- sleepDeficitNs = 0;
- }
- }
+ if (throttler.shouldThrottle(i, sendStartMs)) {
+ throttler.throttle();
}
}
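
To see why the patch batches sleeps into a "deficit" instead of sleeping after every send, here is a small worked example of the arithmetic. The class name and the 5000 messages/sec target are assumptions for illustration; the constants mirror those defined in ThroughputThrottler below.

```java
public class SleepDeficitMath {
    public static void main(String[] args) {
        // Thread.sleep is unreliable below ~1 ms, so per-message pauses are
        // accumulated and paid off in one longer sleep. Numbers are illustrative.
        long nsPerSec = 1000000000L;
        long minSleepNs = 2 * 1000000L;                  // 2 ms, matching MIN_SLEEP_NS below
        long targetThroughput = 5000;                    // assumed target, messages/sec
        long sleepTimeNs = nsPerSec / targetThroughput;  // 200,000 ns owed per send
        long sendsPerSleep = minSleepNs / sleepTimeNs;   // deficit crosses 2 ms after 10 sends
        System.out.printf("owe %d ns per send; one 2 ms sleep every %d sends%n",
                sleepTimeNs, sendsPerSleep);
    }
}
```

At high target rates the per-send debt is far below what Thread.sleep can honor accurately, so deferring it until at least 2 ms has accumulated keeps the average rate correct without relying on sub-millisecond sleeps.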
diff --git a/tools/src/main/java/org/apache/kafka/clients/tools/ThroughputThrottler.java b/tools/src/main/java/org/apache/kafka/clients/tools/ThroughputThrottler.java
new file mode 100644
index 00000000000..06c443f576c
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/clients/tools/ThroughputThrottler.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.tools;
+
+
+/**
+ * This class helps producers throttle throughput.
+ *
+ * If targetThroughput >= 0, the resulting average throughput will be approximately
+ * min(targetThroughput, maximumPossibleThroughput). If targetThroughput < 0,
+ * no throttling will occur.
+ *
+ * To use, do this between successive send attempts:
+ *
+ * {@code
+ * if (throttler.shouldThrottle(...)) {
+ * throttler.throttle();
+ * }
+ * }
+ *
+ *
+ * Note that this can be used to throttle message throughput or data throughput.
+ */
+public class ThroughputThrottler {
+
+ private static final long NS_PER_MS = 1000000L;
+ private static final long NS_PER_SEC = 1000 * NS_PER_MS;
+ private static final long MIN_SLEEP_NS = 2 * NS_PER_MS;
+
+ long sleepTimeNs;
+ long sleepDeficitNs = 0;
+ long targetThroughput = -1;
+ long startMs;
+
+ /**
+ * @param targetThroughput Can be messages/sec or bytes/sec
+ * @param startMs When the very first message is sent
+ */
+ public ThroughputThrottler(long targetThroughput, long startMs) {
+ this.startMs = startMs;
+ this.targetThroughput = targetThroughput;
+ this.sleepTimeNs = targetThroughput > 0 ?
+ NS_PER_SEC / targetThroughput :
+ Long.MAX_VALUE;
+ }
+
+ /**
+ * @param amountSoFar bytes produced so far if you want to throttle data throughput, or
+ * messages produced so far if you want to throttle message throughput.
+ * @param sendStartMs timestamp of the most recently sent message
+ * @return true if throttling should be applied before the next send
+ */
+ public boolean shouldThrottle(long amountSoFar, long sendStartMs) {
+ if (this.targetThroughput < 0) {
+ // No throttling in this case
+ return false;
+ }
+
+ float elapsedSec = (sendStartMs - startMs) / 1000.f;
+ return elapsedSec > 0 && (amountSoFar / elapsedSec) > this.targetThroughput;
+ }
+
+ /**
+ * Occasionally blocks for small amounts of time to achieve targetThroughput.
+ *
+ * Note that if targetThroughput is 0, this will block indefinitely (or until the thread is interrupted).
+ */
+ public void throttle() {
+ if (targetThroughput == 0) {
+ try {
+ Thread.sleep(Long.MAX_VALUE);
+ } catch (InterruptedException e) {
+ // do nothing
+ }
+ return;
+ }
+
+ // Throttle by sleeping, on average, (1 / targetThroughput) seconds
+ // between "things sent"
+ sleepDeficitNs += sleepTimeNs;
+
+ // If enough sleep deficit has accumulated, sleep a little
+ if (sleepDeficitNs >= MIN_SLEEP_NS) {
+ long sleepMs = sleepDeficitNs / NS_PER_MS;
+ long sleepNs = sleepDeficitNs - sleepMs * NS_PER_MS;
+
+ long sleepStartNs = System.nanoTime();
+ try {
+ Thread.sleep(sleepMs, (int) sleepNs);
+ sleepDeficitNs = 0;
+ } catch (InterruptedException e) {
+ // If sleep is cut short, reduce deficit by the amount of
+ // time we actually spent sleeping
+ long sleepElapsedNs = System.nanoTime() - sleepStartNs;
+ if (sleepElapsedNs <= sleepDeficitNs) {
+ sleepDeficitNs -= sleepElapsedNs;
+ }
+ }
+ }
+ }
+}
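
For reference, a minimal sketch of the two-call protocol the class javadoc prescribes: check the observed average rate with shouldThrottle, and pay down the sleep deficit with throttle. The throttler API is exactly as defined in the patch; the doSend helper and the message count are hypothetical stand-ins for real producer work.

```java
import org.apache.kafka.clients.tools.ThroughputThrottler;

public class ThrottlerExample {
    public static void main(String[] args) {
        long targetThroughput = 100;  // messages/sec; a value < 0 disables throttling
        long startMs = System.currentTimeMillis();
        ThroughputThrottler throttler = new ThroughputThrottler(targetThroughput, startMs);

        for (int i = 0; i < 1000; i++) {
            long sendStartMs = System.currentTimeMillis();
            doSend(i);  // hypothetical stand-in for producer.send(...)

            // shouldThrottle compares the running average (i / elapsed seconds)
            // against the target; throttle() then sleeps only once enough
            // deficit (>= 2 ms) has accumulated.
            if (throttler.shouldThrottle(i, sendStartMs)) {
                throttler.throttle();
            }
        }
    }

    private static void doSend(int i) {
        // placeholder for real work, e.g. sending message i to Kafka
    }
}
```

The same pattern works for byte throughput: pass bytes-sent-so-far as the first argument to shouldThrottle and construct the throttler with a bytes/sec target.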
diff --git a/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java
new file mode 100644
index 00000000000..b04876f8fc7
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.tools;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+
+/**
+ * Primarily intended for use with system testing, this producer prints metadata
+ * in the form of JSON to stdout on each "send" request. For example, this helps
+ * with end-to-end correctness tests by making externally visible which messages have been
+ * acked and which have not.
+ *
+ * When used as a command-line tool, it produces increasing integers. It will produce a
+ * fixed number of messages unless the default max-messages -1 is used, in which case
+ * it produces indefinitely.
+ *
+ * If logging is left enabled, log output on stdout can be easily ignored by checking
+ * whether a given line is valid JSON.
+ */
+public class VerifiableProducer {
+
+ String topic;
+ private Producer<String, String> producer;
+ // If maxMessages < 0, produce until the process is killed externally
+ private long maxMessages = -1;
+
+ // Number of messages for which acks were received
+ private long numAcked = 0;
+
+ // Number of send attempts
+ private long numSent = 0;
+
+ // Throttle message throughput if this is set >= 0
+ private long throughput;
+
+ // Hook to trigger producing thread to stop sending messages
+ private boolean stopProducing = false;
+
+ public VerifiableProducer(
+ Properties producerProps, String topic, int throughput, int maxMessages) {
+
+ this.topic = topic;
+ this.throughput = throughput;
+ this.maxMessages = maxMessages;
+ this.producer = new KafkaProducer<String, String>(producerProps);
+ }
+
+ /** Get the command-line argument parser. */
+ private static ArgumentParser argParser() {
+ ArgumentParser parser = ArgumentParsers
+ .newArgumentParser("verifiable-producer")
+ .defaultHelp(true)
+ .description("This tool produces increasing integers to the specified topic and prints JSON metadata to stdout on each \"send\" request, making externally visible which messages have been acked and which have not.");
+
+ parser.addArgument("--topic")
+ .action(store())
+ .required(true)
+ .type(String.class)
+ .metavar("TOPIC")
+ .help("Produce messages to this topic.");
+
+ parser.addArgument("--broker-list")
+ .action(store())
+ .required(true)
+ .type(String.class)
+ .metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
+ .dest("brokerList")
+ .help("Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
+
+ parser.addArgument("--max-messages")
+ .action(store())
+ .required(false)
+ .setDefault(-1)
+ .type(Integer.class)
+ .metavar("MAX-MESSAGES")
+ .dest("maxMessages")
+ .help("Produce this many messages. If -1, produce messages until the process is killed externally.");
+
+ parser.addArgument("--throughput")
+ .action(store())
+ .required(false)
+ .setDefault(-1)
+ .type(Integer.class)
+ .metavar("THROUGHPUT")
+ .help("If set >= 0, throttle maximum message throughput to *approximately* THROUGHPUT messages/sec.");
+
+ parser.addArgument("--acks")
+ .action(store())
+ .required(false)
+ .setDefault(-1)
+ .type(Integer.class)
+ .choices(0, 1, -1)
+ .metavar("ACKS")
+ .help("Acks required on each produced message. See Kafka docs on request.required.acks for details.");
+
+ return parser;
+ }
+
+ /** Construct a VerifiableProducer object from command-line arguments. */
+ public static VerifiableProducer createFromArgs(String[] args) {
+ ArgumentParser parser = argParser();
+ VerifiableProducer producer = null;
+
+ try {
+ Namespace res;
+ res = parser.parseArgs(args);
+
+ int maxMessages = res.getInt("maxMessages");
+ String topic = res.getString("topic");
+ int throughput = res.getInt("throughput");
+
+ Properties producerProps = new Properties();
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList"));
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.StringSerializer");
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.StringSerializer");
+ producerProps.put(ProducerConfig.ACKS_CONFIG, Integer.toString(res.getInt("acks")));
+ // No producer retries
+ producerProps.put("retries", "0");
+
+ producer = new VerifiableProducer(producerProps, topic, throughput, maxMessages);
+ } catch (ArgumentParserException e) {
+ if (args.length == 0) {
+ parser.printHelp();
+ System.exit(0);
+ } else {
+ parser.handleError(e);
+ System.exit(1);
+ }
+ }
+
+ return producer;
+ }
+
+ /** Produce a message with given key and value. */
+ public void send(String key, String value) {
+ ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value);
+ numSent++;
+ try {
+ producer.send(record, new PrintInfoCallback(key, value));
+ } catch (Exception e) {
+ synchronized (System.out) {
+ System.out.println(errorString(e, key, value, System.currentTimeMillis()));
+ }
+ }
+ }
+
+ /** Close the producer to flush any remaining messages. */
+ public void close() {
+ producer.close();
+ }
+
+ /**
+ * Return JSON string encapsulating basic information about the exception, as well
+ * as the key and value which triggered the exception.
+ */
+ String errorString(Exception e, String key, String value, Long nowMs) {
+ assert e != null : "Expected non-null exception.";
+
+ Map<String, Object> errorData = new HashMap<>();
+ errorData.put("class", this.getClass().toString());
+ errorData.put("name", "producer_send_error");
+
+ errorData.put("time_ms", nowMs);
+ errorData.put("exception", e.getClass().toString());
+ errorData.put("message", e.getMessage());
+ errorData.put("topic", this.topic);
+ errorData.put("key", key);
+ errorData.put("value", value);
+
+ return toJsonString(errorData);
+ }
+
+ String successString(RecordMetadata recordMetadata, String key, String value, Long nowMs) {
+ assert recordMetadata != null : "Expected non-null recordMetadata object.";
+
+ Map<String, Object> successData = new HashMap<>();
+ successData.put("class", this.getClass().toString());
+ successData.put("name", "producer_send_success");
+
+ successData.put("time_ms", nowMs);
+ successData.put("topic", this.topic);
+ successData.put("partition", recordMetadata.partition());
+ successData.put("offset", recordMetadata.offset());
+ successData.put("key", key);
+ successData.put("value", value);
+
+ return toJsonString(successData);
+ }
+
+ private String toJsonString(Map<String, Object> data) {
+ String json;
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ json = mapper.writeValueAsString(data);
+ } catch (JsonProcessingException e) {
+ json = "Bad data can't be written as json: " + e.getMessage();
+ }
+ return json;
+ }
+
+ /** Callback which prints errors to stdout when the producer fails to send. */
+ private class PrintInfoCallback implements Callback {
+
+ private String key;
+ private String value;
+
+ PrintInfoCallback(String key, String value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
+ synchronized (System.out) {
+ if (e == null) {
+ VerifiableProducer.this.numAcked++;
+ System.out.println(successString(recordMetadata, this.key, this.value, System.currentTimeMillis()));
+ } else {
+ System.out.println(errorString(e, this.key, this.value, System.currentTimeMillis()));
+ }
+ }
+ }
+ }
+
+ public static void main(String[] args) throws IOException {
+
+ final VerifiableProducer producer = createFromArgs(args);
+ final long startMs = System.currentTimeMillis();
+ boolean infinite = producer.maxMessages < 0;
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ // Trigger main thread to stop producing messages
+ producer.stopProducing = true;
+
+ // Flush any remaining messages
+ producer.close();
+
+ // Print a summary
+ long stopMs = System.currentTimeMillis();
+ double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs));
+
+ Map<String, Object> data = new HashMap<>();
+ data.put("class", producer.getClass().toString());
+ data.put("name", "tool_data");
+ data.put("sent", producer.numSent);
+ data.put("acked", producer.numAcked);
+ data.put("target_throughput", producer.throughput);
+ data.put("avg_throughput", avgThroughput);
+
+ System.out.println(producer.toJsonString(data));
+ }
+ });
+
+ ThroughputThrottler throttler = new ThroughputThrottler(producer.throughput, startMs);
+ for (long i = 0; i < producer.maxMessages || infinite; i++) {
+ if (producer.stopProducing) {
+ break;
+ }
+ long sendStartMs = System.currentTimeMillis();
+ producer.send(null, String.format("%d", i));
+
+ if (throttler.shouldThrottle(i, sendStartMs)) {
+ throttler.throttle();
+ }
+ }
+ }
+
+}
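
For reference, a minimal sketch of driving VerifiableProducer programmatically, mirroring what main() above does. The topic name, broker address, and message counts are placeholder assumptions; createFromArgs, send, and close are the public methods defined in the patch.

```java
import org.apache.kafka.clients.tools.ThroughputThrottler;
import org.apache.kafka.clients.tools.VerifiableProducer;

public class VerifiableProducerExample {
    public static void main(String[] args) {
        // Placeholder topic/broker values; these flags are exactly the ones
        // declared in argParser() above.
        String[] toolArgs = {
            "--topic", "test-topic",
            "--broker-list", "localhost:9092",
            "--max-messages", "100",
            "--throughput", "10",
            "--acks", "-1"
        };
        VerifiableProducer producer = VerifiableProducer.createFromArgs(toolArgs);

        long startMs = System.currentTimeMillis();
        ThroughputThrottler throttler = new ThroughputThrottler(10, startMs);
        // maxMessages is private, so this driver tracks its own count (100 here).
        for (long i = 0; i < 100; i++) {
            long sendStartMs = System.currentTimeMillis();
            producer.send(null, String.format("%d", i));  // one JSON line per ack/error
            if (throttler.shouldThrottle(i, sendStartMs)) {
                throttler.throttle();
            }
        }
        producer.close();  // flushes outstanding sends
    }
}
```

Each acknowledged send prints one JSON line with the fields built in successString above (name, time_ms, topic, partition, offset, key, value), which is what downstream system tests parse to decide which messages were acked.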
diff --git a/vagrant/aws/aws-access-keys-commands b/vagrant/aws/aws-access-keys-commands
new file mode 100644
index 00000000000..9c923f86f3f
--- /dev/null
+++ b/vagrant/aws/aws-access-keys-commands
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if [ -z "$AWS_IAM" ];then
+ echo "Warning: AWS_IAM is not set"
+fi
+
+credentials_json=`curl -s http://169.254.169.254/latest/meta-data/iam/security-credentials/$AWS_IAM`
+export AWS_ACCESS_KEY=`echo "$credentials_json" | grep AccessKeyId | awk -F\" '{ print $4 }'`
+export AWS_SECRET_KEY=`echo "$credentials_json" | grep SecretAccessKey | awk -F\" '{ print $4 }'`
+export AWS_SESSION_TOKEN=`echo "$credentials_json" | grep Token | awk -F\" '{ print $4 }'`
+
+if [ -z "$AWS_ACCESS_KEY" ]; then
+ echo "Failed to populate environment variables AWS_ACCESS_KEY, AWS_SECRET_KEY, and AWS_SESSION_TOKEN."
+ echo "AWS_IAM is currently $AWS_IAM. Double-check that this is correct. If not set, add this command to your .bashrc file:"
+ echo "export AWS_IAM= # put this into your ~/.bashrc"
+fi
diff --git a/vagrant/aws/aws-example-Vagrantfile.local b/vagrant/aws/aws-example-Vagrantfile.local
new file mode 100644
index 00000000000..c3b075b9420
--- /dev/null
+++ b/vagrant/aws/aws-example-Vagrantfile.local
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Use this template Vagrantfile.local for running system tests on AWS.
+# To use it, move it to the base kafka directory, rename it to
+# Vagrantfile.local, and adjust the variables below as needed.
+ec2_instance_type = "m3.medium"
+num_zookeepers = 0
+num_brokers = 0
+num_workers = 9
+ec2_keypair_name = "kafkatest"
+ec2_keypair_file = "../kafkatest.pem"
+ec2_security_groups = ['kafkatest']
+ec2_region = 'us-west-2'
+ec2_ami = "ami-29ebb519"
diff --git a/vagrant/aws/aws-init.sh b/vagrant/aws/aws-init.sh
new file mode 100755
index 00000000000..61519280db3
--- /dev/null
+++ b/vagrant/aws/aws-init.sh
@@ -0,0 +1,72 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This script sets up a driver machine on AWS from which you can run tests
+# or bring up a mini Kafka cluster.
+
+# Install dependencies
+sudo apt-get install -y maven openjdk-6-jdk build-essential \
+ ruby-dev zlib1g-dev realpath python-setuptools
+
+base_dir=`dirname $0`/../..
+
+if [ -z "`which vagrant`" ]; then
+ echo "Installing vagrant..."
+ wget https://dl.bintray.com/mitchellh/vagrant/vagrant_1.7.2_x86_64.deb
+ sudo dpkg -i vagrant_1.7.2_x86_64.deb
+ rm -f vagrant_1.7.2_x86_64.deb
+fi
+
+# Install necessary vagrant plugins
+# Note: Do NOT install vagrant-cachier since it doesn't work on AWS and only
+# adds log noise
+vagrant_plugins="vagrant-aws vagrant-hostmanager"
+existing=`vagrant plugin list`
+for plugin in $vagrant_plugins; do
+ echo $existing | grep $plugin > /dev/null
+ if [ $? != 0 ]; then
+ vagrant plugin install $plugin
+ fi
+done
+
+# Create Vagrantfile.local as a convenience
+if [ ! -e "$base_dir/Vagrantfile.local" ]; then
+ cp $base_dir/vagrant/aws/aws-example-Vagrantfile.local $base_dir/Vagrantfile.local
+fi
+
+gradle="gradle-2.2.1"
+if [ -z "`which gradle`" ] && [ ! -d $base_dir/$gradle ]; then
+ if [ ! -e $gradle-bin.zip ]; then
+ wget https://services.gradle.org/distributions/$gradle-bin.zip
+ fi
+ unzip $gradle-bin.zip
+ rm -f $gradle-bin.zip
+ mv $gradle $base_dir/$gradle
+fi
+
+# Ensure AWS access keys are in the environment when using an EC2 driver machine
+LOCAL_HOSTNAME=$(hostname -d)
+if [[ ${LOCAL_HOSTNAME} =~ .*\.compute\.internal ]]; then
+ grep "AWS ACCESS KEYS" ~/.bashrc > /dev/null
+ if [ $? != 0 ]; then
+ echo "# --- AWS ACCESS KEYS ---" >> ~/.bashrc
+ echo ". `realpath $base_dir/aws/aws-access-keys-commands`" >> ~/.bashrc
+ echo "# -----------------------" >> ~/.bashrc
+ source ~/.bashrc
+ fi
+fi
+
diff --git a/vagrant/base.sh b/vagrant/base.sh
index 6f28dfed678..133f10a9562 100644
--- a/vagrant/base.sh
+++ b/vagrant/base.sh
@@ -41,3 +41,12 @@ chmod a+rw /opt
if [ ! -e /opt/kafka ]; then
ln -s /vagrant /opt/kafka
fi
+
+# For EC2 nodes, we want to use /mnt, which should have the local disk. On local
+# VMs, we can just create it if it doesn't exist and use it like we'd use
+# /tmp. Eventually, we'd like to also support more directories, e.g. when EC2
+# instances have multiple local disks.
+if [ ! -e /mnt ]; then
+ mkdir /mnt
+fi
+chmod a+rwx /mnt
diff --git a/vagrant/system-test-Vagrantfile.local b/vagrant/system-test-Vagrantfile.local
new file mode 100644
index 00000000000..7f280a4653c
--- /dev/null
+++ b/vagrant/system-test-Vagrantfile.local
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Use this example Vagrantfile.local for running system tests.
+# To use it, move it to the base kafka directory and rename it
+# to Vagrantfile.local.
+num_zookeepers = 0
+num_brokers = 0
+num_workers = 9