Updated command-line options for VerifiableProducer. Extracted throughput logic to make it reusable.

This commit is contained in:
Geoff Anderson 2015-06-03 12:50:11 -07:00
parent 0a5de8e059
commit 9100417ce0
3 changed files with 162 additions and 109 deletions

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.tools;
/**
* This class helps producers throttle their maximum message throughput.
*
* The resulting average throughput will be approximately
* min(targetThroughput, maximumPossibleThroughput)
*
* To use, do this between successive send attempts:
* <pre>
* {@code
* if (throttler.shouldThrottle(...)) {
* throttler.throttle();
* }
* }
* </pre>
*/
public class MessageThroughputThrottler {
private static final long NS_PER_MS = 1000000L;
private static final long NS_PER_SEC = 1000 * NS_PER_MS;
private static final long MIN_SLEEP_NS = 2 * NS_PER_MS;
long sleepTimeNs;
long sleepDeficitNs = 0;
long targetThroughput = -1;
long startMs;
public MessageThroughputThrottler(long targetThroughput, long startMs) {
this.startMs = startMs;
this.targetThroughput = targetThroughput;
this.sleepTimeNs = NS_PER_SEC / targetThroughput;
}
public boolean shouldThrottle(long messageNum, long sendStartMs) {
if (this.targetThroughput <= 0) {
// No throttling in this case
return false;
}
float elapsedMs = (sendStartMs - startMs) / 1000.f;
return elapsedMs > 0 && (messageNum / elapsedMs) > this.targetThroughput;
}
public void throttle() {
// throttle message throughput by sleeping, on average,
// (1 / this.throughput) seconds between each sent message
sleepDeficitNs += sleepTimeNs;
// If enough sleep deficit has accumulated, sleep a little
if (sleepDeficitNs >= MIN_SLEEP_NS) {
long sleepMs = sleepDeficitNs / 1000000;
long sleepNs = sleepDeficitNs - sleepMs * 1000000;
long sleepStartNs = System.nanoTime();
try {
Thread.sleep(sleepMs, (int) sleepNs);
sleepDeficitNs = 0;
} catch (InterruptedException e) {
// If sleep is cut short, reduce deficit by the amount of
// time we actually spent sleeping
long sleepElapsedNs = System.nanoTime() - sleepStartNs;
if (sleepElapsedNs <= sleepDeficitNs) {
sleepDeficitNs -= sleepElapsedNs;
}
}
}
}
}

View File

@ -19,10 +19,6 @@ import org.apache.kafka.clients.producer.*;
public class ProducerPerformance { public class ProducerPerformance {
private static final long NS_PER_MS = 1000000L;
private static final long NS_PER_SEC = 1000 * NS_PER_MS;
private static final long MIN_SLEEP_NS = 2 * NS_PER_MS;
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
if (args.length < 4) { if (args.length < 4) {
System.err.println("USAGE: java " + ProducerPerformance.class.getName() + System.err.println("USAGE: java " + ProducerPerformance.class.getName() +
@ -51,31 +47,17 @@ public class ProducerPerformance {
byte[] payload = new byte[recordSize]; byte[] payload = new byte[recordSize];
Arrays.fill(payload, (byte) 1); Arrays.fill(payload, (byte) 1);
ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topicName, payload); ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topicName, payload);
long sleepTime = NS_PER_SEC / throughput;
long sleepDeficitNs = 0;
Stats stats = new Stats(numRecords, 5000); Stats stats = new Stats(numRecords, 5000);
long start = System.currentTimeMillis(); long startMs = System.currentTimeMillis();
MessageThroughputThrottler throttler = new MessageThroughputThrottler(throughput, startMs);
for (int i = 0; i < numRecords; i++) { for (int i = 0; i < numRecords; i++) {
long sendStart = System.currentTimeMillis(); long sendStartMs = System.currentTimeMillis();
Callback cb = stats.nextCompletion(sendStart, payload.length, stats); Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats);
producer.send(record, cb); producer.send(record, cb);
/* if (throttler.shouldThrottle(i, sendStartMs)) {
* Maybe sleep a little to control throughput. Sleep time can be a bit inaccurate for times < 1 ms so throttler.throttle();
* instead of sleeping each time instead wait until a minimum sleep time accumulates (the "sleep deficit")
* and then make up the whole deficit in one longer sleep.
*/
if (throughput > 0) {
float elapsed = (sendStart - start) / 1000.f;
if (elapsed > 0 && i / elapsed > throughput) {
sleepDeficitNs += sleepTime;
if (sleepDeficitNs >= MIN_SLEEP_NS) {
long sleepMs = sleepDeficitNs / 1000000;
long sleepNs = sleepDeficitNs - sleepMs * 1000000;
Thread.sleep(sleepMs, (int) sleepNs);
sleepDeficitNs = 0;
}
}
} }
} }

View File

@ -43,23 +43,29 @@ import joptsimple.OptionSpecBuilder;
* with end-to-end correctness tests by making externally visible which messages have been * with end-to-end correctness tests by making externally visible which messages have been
* acked and which have not. * acked and which have not.
* *
* When used as a command-line tool, it produces a fixed number of increasing integers. * When used as a command-line tool, it produces increasing integers. It will produce a
* fixed number of messages unless the default max-messages -1 is used, in which case
* it produces indefinitely.
*
* If logging is left enabled, log output on stdout can be easily ignored by checking * If logging is left enabled, log output on stdout can be easily ignored by checking
* whether a given line is valid JSON. * whether a given line is valid JSON.
*/ */
public class VerifiableProducer { public class VerifiableProducer {
private static final long NS_PER_MS = 1000000L;
private static final long NS_PER_SEC = 1000 * NS_PER_MS;
private static final long MIN_SLEEP_NS = 2 * NS_PER_MS;
OptionParser commandLineParser; OptionParser commandLineParser;
Map<String, OptionSpec<?>> commandLineOptions = new HashMap<String, OptionSpec<?>>(); Map<String, OptionSpec<?>> commandLineOptions = new HashMap<String, OptionSpec<?>>();
String topic; String topic;
private Properties producerProps = new Properties(); private Properties producerProps = new Properties();
private Producer<String, String> producer; private Producer<String, String> producer;
private int numMessages; // If maxMessages < 0, produce until the process is killed externally
private long maxMessages = -1;
// Number of messages for which acks were received
private long numAcked = 0;
// Number of send attempts
private long numSent = 0;
private long throughput; private long throughput;
/** Construct with command-line arguments */ /** Construct with command-line arguments */
@ -74,29 +80,40 @@ public class VerifiableProducer {
this.commandLineParser = new OptionParser(); this.commandLineParser = new OptionParser();
ArgumentAcceptingOptionSpec<String> topicOpt = commandLineParser.accepts("topic", "REQUIRED: The topic id to produce messages to.") ArgumentAcceptingOptionSpec<String> topicOpt = commandLineParser.accepts("topic", "REQUIRED: The topic id to produce messages to.")
.withRequiredArg() .withRequiredArg()
.required()
.describedAs("topic") .describedAs("topic")
.ofType(String.class); .ofType(String.class);
commandLineOptions.put("topic", topicOpt); commandLineOptions.put("topic", topicOpt);
ArgumentAcceptingOptionSpec<String> brokerListOpt = commandLineParser.accepts("broker-list", "REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.") ArgumentAcceptingOptionSpec<String> brokerListOpt = commandLineParser.accepts("broker-list", "REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
.withRequiredArg() .withRequiredArg()
.required()
.describedAs("broker-list") .describedAs("broker-list")
.ofType(String.class); .ofType(String.class);
commandLineOptions.put("broker-list", brokerListOpt); commandLineOptions.put("broker-list", brokerListOpt);
ArgumentAcceptingOptionSpec<Integer> numMessagesOpt = commandLineParser.accepts("num-messages", "REQUIRED: The number of messages to produce.") ArgumentAcceptingOptionSpec<String> numMessagesOpt = commandLineParser.accepts("max-messages", "Produce this many messages. Default: -1, produces messages until the process is killed externally.")
.withRequiredArg() .withOptionalArg()
.describedAs("num-messages") .defaultsTo("-1")
.ofType(Integer.class); .describedAs("max-messages")
commandLineOptions.put("num-messages", numMessagesOpt); .ofType(String.class);
commandLineOptions.put("max-messages", numMessagesOpt);
ArgumentAcceptingOptionSpec<Long> throughputOpt = commandLineParser.accepts("throughput", "REQUIRED: Average message throughput, in messages/sec.") ArgumentAcceptingOptionSpec<String> throughputOpt = commandLineParser.accepts("throughput", "Average message throughput, in messages/sec. Default: -1, results in no throttling.")
.withRequiredArg() .withOptionalArg()
.defaultsTo("-1")
.describedAs("throughput") .describedAs("throughput")
.ofType(Long.class); .ofType(String.class);
commandLineOptions.put("throughput", throughputOpt); commandLineOptions.put("throughput", throughputOpt);
ArgumentAcceptingOptionSpec<String> acksOpt = commandLineParser.accepts("acks", "number of acks required. Default: -1")
.withOptionalArg()
.defaultsTo("-1")
.describedAs("acks")
.ofType(String.class);
commandLineOptions.put("acks", acksOpt);
OptionSpecBuilder helpOpt = commandLineParser.accepts("help", "Print this message."); OptionSpecBuilder helpOpt = commandLineParser.accepts("help", "Print this message.");
commandLineOptions.put("help", helpOpt); commandLineOptions.put("help", helpOpt);
} }
@ -104,50 +121,33 @@ public class VerifiableProducer {
/** Validate command-line arguments and parse into properties object. */ /** Validate command-line arguments and parse into properties object. */
public void parseCommandLineArgs(String[] args) throws IOException { public void parseCommandLineArgs(String[] args) throws IOException {
OptionSpec[] requiredArgs = new OptionSpec[]{commandLineOptions.get("topic"),
commandLineOptions.get("broker-list"),
commandLineOptions.get("num-messages"),
commandLineOptions.get("throughput")};
OptionSet options = commandLineParser.parse(args); OptionSet options = commandLineParser.parse(args);
if (options.has(commandLineOptions.get("help"))) { if (options.has(commandLineOptions.get("help"))) {
commandLineParser.printHelpOn(System.out); commandLineParser.printHelpOn(System.out);
System.exit(0); System.exit(0);
} }
checkRequiredArgs(commandLineParser, options, requiredArgs);
this.numMessages = (Integer) options.valueOf("num-messages"); this.maxMessages = Integer.parseInt((String) options.valueOf("max-messages"));
this.topic = (String) options.valueOf("topic"); this.topic = (String) options.valueOf("topic");
this.throughput = (Long) options.valueOf("throughput"); this.throughput = Long.parseLong((String) options.valueOf("throughput"));
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf("broker-list")); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf("broker-list"));
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer"); "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer"); "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.ACKS_CONFIG, "-1"); producerProps.put(ProducerConfig.ACKS_CONFIG, options.valueOf("acks"));
// No producer retries // No producer retries
producerProps.put("retries", "0"); producerProps.put("retries", "0");
} }
private static void checkRequiredArgs(
OptionParser parser, OptionSet options, OptionSpec[] required) throws IOException
{
for (OptionSpec arg : required) {
if (!options.has(arg)) {
System.err.println("Missing required argument \"" + arg + "\"");
parser.printHelpOn(System.err);
System.exit(1);
}
}
}
/** /**
* Produce a message with given value and no key. * Produce a message with given value and no key.
*/ */
public void send(String key, String value) { public void send(String key, String value) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value); ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value);
numSent++;
try { try {
producer.send(record, new PrintInfoCallback(key, value)); producer.send(record, new PrintInfoCallback(key, value));
} catch (Exception e) { } catch (Exception e) {
@ -215,6 +215,7 @@ public class VerifiableProducer {
public void onCompletion(RecordMetadata recordMetadata, Exception e) { public void onCompletion(RecordMetadata recordMetadata, Exception e) {
synchronized (System.out) { synchronized (System.out) {
if (e == null) { if (e == null) {
VerifiableProducer.this.numAcked++;
System.out.println(successString(recordMetadata, this.key, this.value, System.currentTimeMillis())); System.out.println(successString(recordMetadata, this.key, this.value, System.currentTimeMillis()));
} else { } else {
System.out.println(errorString(e, this.key, this.value, System.currentTimeMillis())); System.out.println(errorString(e, this.key, this.value, System.currentTimeMillis()));
@ -225,55 +226,36 @@ public class VerifiableProducer {
public static void main(String[] args) throws IOException { public static void main(String[] args) throws IOException {
VerifiableProducer producer = new VerifiableProducer(args); final VerifiableProducer producer = new VerifiableProducer(args);
final long startMs = System.currentTimeMillis();
boolean infinite = producer.maxMessages < 0;
long sleepTimeNs = NS_PER_SEC / producer.throughput; Runtime.getRuntime().addShutdownHook(new Thread() {
long sleepDeficitNs = 0; @Override
long startMs = System.currentTimeMillis(); public void run() {
for (int i = 0; i < producer.numMessages; i++) {
long sendStartMs = System.currentTimeMillis();
producer.send(null, String.format("%d", i));
// throttle message throughput by sleeping, on average,
// (NS_PER_SEC / producer.throughput) nanoseconds between each sent message
if (producer.throughput > 0) {
float elapsedMs = (sendStartMs - startMs) / 1000.f;
if (elapsedMs > 0 && i / elapsedMs > producer.throughput) {
sleepDeficitNs += sleepTimeNs;
// If enough sleep deficit has accumulated, sleep a little
if (sleepDeficitNs >= MIN_SLEEP_NS) {
long sleepMs = sleepDeficitNs / 1000000;
long sleepNs = sleepDeficitNs - sleepMs * 1000000;
long sleepStartNs = System.nanoTime();
try {
Thread.sleep(sleepMs, (int) sleepNs);
sleepDeficitNs = 0;
} catch (InterruptedException e) {
// If sleep is cut short, reduce deficit by the amount of
// time we actually spent sleeping
long sleepElapsedNs = System.nanoTime() - sleepStartNs;
if (sleepElapsedNs <= sleepDeficitNs) {
sleepDeficitNs -= sleepElapsedNs;
}
}
}
}
}
}
producer.close(); producer.close();
long stopMs = System.currentTimeMillis(); long stopMs = System.currentTimeMillis();
double avgThroughput = 1000 * ((producer.numMessages) / (double) (stopMs - startMs)); double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs));
JSONObject obj = new JSONObject(); JSONObject obj = new JSONObject();
obj.put("class", producer.getClass().toString()); obj.put("class", producer.getClass().toString());
obj.put("name", "tool_data"); obj.put("name", "tool_data");
obj.put("sent", producer.numSent);
obj.put("acked", producer.numAcked);
obj.put("target_throughput", producer.throughput); obj.put("target_throughput", producer.throughput);
obj.put("avg_throughput", avgThroughput); obj.put("avg_throughput", avgThroughput);
System.out.println(obj.toJSONString()); System.out.println(obj.toJSONString());
} }
});
MessageThroughputThrottler throttler = new MessageThroughputThrottler(producer.throughput, startMs);
for (int i = 0; i < producer.maxMessages || infinite; i++) {
long sendStartMs = System.currentTimeMillis();
producer.send(null, String.format("%d", i));
if (throttler.shouldThrottle(i, sendStartMs)) {
throttler.throttle();
}
}
}
} }