Use argparse4j instead of joptsimple. ThroughputThrottler now has more intuitive behavior when targetThroughput is 0.

Geoff Anderson 2015-06-04 13:55:02 -07:00
parent a80a4282ba
commit 51a94fd6ec
4 changed files with 140 additions and 92 deletions

View File

@@ -352,7 +352,7 @@ project(':clients') {
     compile "org.slf4j:slf4j-api:1.7.6"
     compile 'org.xerial.snappy:snappy-java:1.1.1.6'
     compile 'net.jpountz.lz4:lz4:1.2.0'
-    compile 'net.sf.jopt-simple:jopt-simple:4.8'
+    compile 'net.sourceforge.argparse4j:argparse4j:0.5.0'
     compile 'com.googlecode.json-simple:json-simple:1.1.1'
     testCompile 'com.novocode:junit-interface:0.9'

View File

@@ -93,7 +93,7 @@
       <allow pkg="org.apache.kafka.clients.producer" />
       <allow pkg="org.apache.kafka.clients.consumer" />
       <allow pkg="org.json.simple" />
-      <allow pkg="joptsimple" />
+      <allow pkg="net.sourceforge.argparse4j" />
     </subpackage>
   </subpackage>

View File

@@ -21,8 +21,9 @@ package org.apache.kafka.clients.tools;
 /**
  * This class helps producers throttle throughput.
  *
- * The resulting average throughput will be approximately
- * min(targetThroughput, maximumPossibleThroughput)
+ * If targetThroughput >= 0, the resulting average throughput will be approximately
+ * min(targetThroughput, maximumPossibleThroughput). If targetThroughput < 0,
+ * no throttling will occur.
  *
  * To use, do this between successive send attempts:
  * <pre>
@ -53,7 +54,9 @@ public class ThroughputThrottler {
public ThroughputThrottler(long targetThroughput, long startMs) { public ThroughputThrottler(long targetThroughput, long startMs) {
this.startMs = startMs; this.startMs = startMs;
this.targetThroughput = targetThroughput; this.targetThroughput = targetThroughput;
this.sleepTimeNs = NS_PER_SEC / targetThroughput; this.sleepTimeNs = targetThroughput > 0 ?
NS_PER_SEC / targetThroughput :
Long.MAX_VALUE;
} }
/** /**
@@ -63,7 +66,7 @@ public class ThroughputThrottler {
      * @return
      */
     public boolean shouldThrottle(long amountSoFar, long sendStartMs) {
-        if (this.targetThroughput <= 0) {
+        if (this.targetThroughput < 0) {
             // No throttling in this case
             return false;
         }
@@ -72,7 +75,21 @@ public class ThroughputThrottler {
         return elapsedMs > 0 && (amountSoFar / elapsedMs) > this.targetThroughput;
     }

+    /**
+     * Occasionally blocks for small amounts of time to achieve targetThroughput.
+     *
+     * Note that if targetThroughput is 0, this will block extremely aggressively.
+     */
     public void throttle() {
+        if (targetThroughput == 0) {
+            try {
+                Thread.sleep(Long.MAX_VALUE);
+            } catch (InterruptedException e) {
+                // do nothing
+            }
+            return;
+        }
+
         // throttle throughput by sleeping, on average,
         // (1 / this.throughput) seconds between "things sent"
         sleepDeficitNs += sleepTimeNs;
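
For context, the send-loop pattern the updated javadoc refers to ("do this between successive send attempts") looks roughly like the sketch below. It is illustrative only: sendOneMessage() is a hypothetical stand-in for the caller's send logic, while the constructor, shouldThrottle, and throttle signatures are the ones shown in the hunks above.

```java
// Sketch: throttle a sender to approximately 100 messages/sec.
long startMs = System.currentTimeMillis();
ThroughputThrottler throttler = new ThroughputThrottler(100, startMs);

for (long sent = 0; sent < 1000; sent++) {
    long sendStartMs = System.currentTimeMillis();
    sendOneMessage();  // hypothetical send call
    if (throttler.shouldThrottle(sent, sendStartMs)) {
        // Sleeps just long enough to keep the average rate near the target
        throttler.throttle();
    }
}
```

With the new semantics, a negative target disables throttling entirely, while a target of 0 makes throttle() block indefinitely, effectively stopping the sender.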

View File

@@ -27,15 +27,14 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.json.simple.JSONObject;

 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Properties;

-import joptsimple.ArgumentAcceptingOptionSpec;
-import joptsimple.OptionParser;
-import joptsimple.OptionSet;
-import joptsimple.OptionSpec;
-import joptsimple.OptionSpecBuilder;
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;

 /**
  * Primarily intended for use with system testing, this producer prints metadata
@@ -51,12 +50,8 @@ import joptsimple.OptionSpecBuilder;
  * whether a given line is valid JSON.
  */
 public class VerifiableProducer {
-    OptionParser commandLineParser;
-    Map<String, OptionSpec<?>> commandLineOptions = new HashMap<String, OptionSpec<?>>();
-
     String topic;
-    private Properties producerProps = new Properties();
     private Producer<String, String> producer;

     // If maxMessages < 0, produce until the process is killed externally
     private long maxMessages = -1;
@@ -66,85 +61,114 @@ public class VerifiableProducer {
     // Number of send attempts
     private long numSent = 0;

+    // Throttle message throughput if this is set >= 0
     private long throughput;

+    // Hook to trigger producing thread to stop sending messages
+    private boolean stopProducing = false;

-    /** Construct with command-line arguments */
-    public VerifiableProducer(String[] args) throws IOException {
-        this.configureParser();
-        this.parseCommandLineArgs(args);
+    public VerifiableProducer(
+            Properties producerProps, String topic, int throughput, int maxMessages) {
+
+        this.topic = topic;
+        this.throughput = throughput;
+        this.maxMessages = maxMessages;
         this.producer = new KafkaProducer<String, String>(producerProps);
     }

-    /** Set up the command-line options. */
-    private void configureParser() {
-        this.commandLineParser = new OptionParser();
-
-        ArgumentAcceptingOptionSpec<String> topicOpt = commandLineParser.accepts("topic", "REQUIRED: The topic id to produce messages to.")
-                .withRequiredArg()
-                .required()
-                .describedAs("topic")
-                .ofType(String.class);
-        commandLineOptions.put("topic", topicOpt);
-
-        ArgumentAcceptingOptionSpec<String> brokerListOpt = commandLineParser.accepts("broker-list", "REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
-                .withRequiredArg()
-                .required()
-                .describedAs("broker-list")
-                .ofType(String.class);
-        commandLineOptions.put("broker-list", brokerListOpt);
-
-        ArgumentAcceptingOptionSpec<String> numMessagesOpt = commandLineParser.accepts("max-messages", "Produce this many messages. Default: -1, produces messages until the process is killed externally.")
-                .withOptionalArg()
-                .defaultsTo("-1")
-                .describedAs("max-messages")
-                .ofType(String.class);
-        commandLineOptions.put("max-messages", numMessagesOpt);
-
-        ArgumentAcceptingOptionSpec<String> throughputOpt = commandLineParser.accepts("throughput", "Average message throughput, in messages/sec. Default: -1, results in no throttling.")
-                .withOptionalArg()
-                .defaultsTo("-1")
-                .describedAs("throughput")
-                .ofType(String.class);
-        commandLineOptions.put("throughput", throughputOpt);
-
-        ArgumentAcceptingOptionSpec<String> acksOpt = commandLineParser.accepts("acks", "number of acks required. Default: -1")
-                .withOptionalArg()
-                .defaultsTo("-1")
-                .describedAs("acks")
-                .ofType(String.class);
-        commandLineOptions.put("acks", acksOpt);
-
-        OptionSpecBuilder helpOpt = commandLineParser.accepts("help", "Print this message.");
-        commandLineOptions.put("help", helpOpt);
+    /** Get the command-line argument parser. */
+    private static ArgumentParser argParser() {
+        ArgumentParser parser = ArgumentParsers
+                .newArgumentParser("verifiable-producer")
+                .defaultHelp(true)
+                .description("This tool produces increasing integers to the specified topic and prints JSON metadata to stdout on each \"send\" request, making externally visible which messages have been acked and which have not.");
+
+        parser.addArgument("--topic")
+                .action(store())
+                .required(true)
+                .type(String.class)
+                .metavar("TOPIC")
+                .help("Produce messages to this topic.");
+
+        parser.addArgument("--broker-list")
+                .action(store())
+                .required(true)
+                .type(String.class)
+                .metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
+                .dest("brokerList")
+                .help("Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
+
+        parser.addArgument("--max-messages")
+                .action(store())
+                .required(false)
+                .setDefault(-1)
+                .type(Integer.class)
+                .metavar("MAX-MESSAGES")
+                .dest("maxMessages")
+                .help("Produce this many messages. If -1, produce messages until the process is killed externally.");
+
+        parser.addArgument("--throughput")
+                .action(store())
+                .required(false)
+                .setDefault(-1)
+                .type(Integer.class)
+                .metavar("THROUGHPUT")
+                .help("If set >= 0, throttle maximum message throughput to *approximately* THROUGHPUT messages/sec.");
+
+        parser.addArgument("--acks")
+                .action(store())
+                .required(false)
+                .setDefault(-1)
+                .type(Integer.class)
+                .choices(0, 1, -1)
+                .metavar("ACKS")
+                .help("Acks required on each produced message. See Kafka docs on request.required.acks for details.");
+
+        return parser;
     }

-    /** Validate command-line arguments and parse into properties object. */
-    public void parseCommandLineArgs(String[] args) throws IOException {
-
-        OptionSet options = commandLineParser.parse(args);
-        if (options.has(commandLineOptions.get("help"))) {
-            commandLineParser.printHelpOn(System.out);
-            System.exit(0);
-        }
-        this.maxMessages = Integer.parseInt((String) options.valueOf("max-messages"));
-        this.topic = (String) options.valueOf("topic");
-        this.throughput = Long.parseLong((String) options.valueOf("throughput"));
-
-        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf("broker-list"));
-        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-                "org.apache.kafka.common.serialization.StringSerializer");
-        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-                "org.apache.kafka.common.serialization.StringSerializer");
-        producerProps.put(ProducerConfig.ACKS_CONFIG, options.valueOf("acks"));
-        // No producer retries
-        producerProps.put("retries", "0");
+    /** Construct a VerifiableProducer object from command-line arguments. */
+    public static VerifiableProducer createFromArgs(String[] args) {
+        ArgumentParser parser = argParser();
+        VerifiableProducer producer = null;
+
+        try {
+            Namespace res;
+            res = parser.parseArgs(args);
+            System.out.println(res);
+            System.out.println(res.getString("brokerList"));
+
+            int maxMessages = res.getInt("maxMessages");
+            String topic = res.getString("topic");
+            int throughput = res.getInt("throughput");
+
+            Properties producerProps = new Properties();
+            producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList"));
+            producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                    "org.apache.kafka.common.serialization.StringSerializer");
+            producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                    "org.apache.kafka.common.serialization.StringSerializer");
+            producerProps.put(ProducerConfig.ACKS_CONFIG, Integer.toString(res.getInt("acks")));
+            // No producer retries
+            producerProps.put("retries", "0");
+
+            producer = new VerifiableProducer(producerProps, topic, throughput, maxMessages);
+        } catch (ArgumentParserException e) {
+            if (args.length == 0) {
+                parser.printHelp();
+                System.exit(0);
+            } else {
+                parser.handleError(e);
+                System.exit(1);
+            }
+        }
+
+        return producer;
     }

-    /**
-     * Produce a message with given value and no key.
-     */
+    /** Produce a message with given key and value. */
     public void send(String key, String value) {
         ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value);
         numSent++;
@@ -158,7 +182,7 @@ public class VerifiableProducer {
         }
     }

-    /** Need to close the producer to flush any remaining messages if we're in async mode. */
+    /** Close the producer to flush any remaining messages. */
    public void close() {
        producer.close();
    }
@@ -199,9 +223,7 @@ public class VerifiableProducer {
         return obj.toJSONString();
     }

-    /**
-     * Callback which prints errors to stdout when the producer fails to send.
-     */
+    /** Callback which prints errors to stdout when the producer fails to send. */
     private class PrintInfoCallback implements Callback {

         private String key;
@@ -226,18 +248,22 @@ public class VerifiableProducer {
     public static void main(String[] args) throws IOException {

-        final VerifiableProducer producer = new VerifiableProducer(args);
+        final VerifiableProducer producer = createFromArgs(args);
         final long startMs = System.currentTimeMillis();
         boolean infinite = producer.maxMessages < 0;

         Runtime.getRuntime().addShutdownHook(new Thread() {
             @Override
             public void run() {
+                // Trigger main thread to stop producing messages
+                producer.stopProducing = true;
+                // Flush any remaining messages
                 producer.close();
+                // Print a summary
                 long stopMs = System.currentTimeMillis();
                 double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs));

                 JSONObject obj = new JSONObject();
                 obj.put("class", producer.getClass().toString());
                 obj.put("name", "tool_data");
@@ -248,14 +274,19 @@ public class VerifiableProducer {
                 System.out.println(obj.toJSONString());
             }
         });

         ThroughputThrottler throttler = new ThroughputThrottler(producer.throughput, startMs);
         for (int i = 0; i < producer.maxMessages || infinite; i++) {
+            if (producer.stopProducing) {
+                break;
+            }
+
             long sendStartMs = System.currentTimeMillis();
             producer.send(null, String.format("%d", i));
+
             if (throttler.shouldThrottle(i, sendStartMs)) {
                 throttler.throttle();
             }
         }
     }
 }
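
As a usage note, here is a minimal sketch of the two construction paths this commit introduces: the argv-style path that main() now uses, and the new direct constructor. The broker address, topic name, and counts below are placeholder values, not part of the commit.

```java
// Sketch: construct the refactored VerifiableProducer.
// "localhost:9092" and "test-topic" are placeholders.
String[] args = {
    "--topic", "test-topic",
    "--broker-list", "localhost:9092",
    "--max-messages", "1000",
    "--throughput", "100"
};
VerifiableProducer fromArgs = VerifiableProducer.createFromArgs(args);

// Or bypass argument parsing and supply the producer config directly.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
VerifiableProducer direct =
        new VerifiableProducer(props, "test-topic", 100 /* throughput */, 1000 /* maxMessages */);
```

Either path yields a producer whose send loop in main() is throttled by ThroughputThrottler as shown earlier.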