Use argparse4j instead of joptsimple. ThroughputThrottler now has more intuitive behavior when targetThroughput is 0.

Geoff Anderson 2015-06-04 13:55:02 -07:00
parent a80a4282ba
commit 51a94fd6ec
4 changed files with 140 additions and 92 deletions
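To make the new semantics concrete, here is a minimal illustrative sketch (not part of the commit; the numeric values are made up) of how ThroughputThrottler now treats the three ranges of targetThroughput, based on the changes below:

    // ThroughputThrottler lives in org.apache.kafka.clients.tools
    long startMs = System.currentTimeMillis();

    // targetThroughput > 0: throttle sends to roughly 1000 messages/sec
    ThroughputThrottler limited = new ThroughputThrottler(1000, startMs);

    // targetThroughput < 0: shouldThrottle() always returns false, so sends are never throttled
    ThroughputThrottler unthrottled = new ThroughputThrottler(-1, startMs);

    // targetThroughput == 0: previously the constructor divided by zero (NS_PER_SEC / 0);
    // now throttle() simply blocks (effectively forever), i.e. throughput of ~0
    ThroughputThrottler blocking = new ThroughputThrottler(0, startMs);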

View File

@@ -352,7 +352,7 @@ project(':clients') {
compile "org.slf4j:slf4j-api:1.7.6"
compile 'org.xerial.snappy:snappy-java:1.1.1.6'
compile 'net.jpountz.lz4:lz4:1.2.0'
compile 'net.sf.jopt-simple:jopt-simple:4.8'
compile 'net.sourceforge.argparse4j:argparse4j:0.5.0'
compile 'com.googlecode.json-simple:json-simple:1.1.1'
testCompile 'com.novocode:junit-interface:0.9'

View File

@@ -93,7 +93,7 @@
<allow pkg="org.apache.kafka.clients.producer" />
<allow pkg="org.apache.kafka.clients.consumer" />
<allow pkg="org.json.simple" />
<allow pkg="joptsimple" />
<allow pkg="net.sourceforge.argparse4j" />
</subpackage>
</subpackage>

View File

@@ -21,8 +21,9 @@ package org.apache.kafka.clients.tools;
/**
* This class helps producers throttle throughput.
*
* The resulting average throughput will be approximately
* min(targetThroughput, maximumPossibleThroughput)
* If targetThroughput >= 0, the resulting average throughput will be approximately
* min(targetThroughput, maximumPossibleThroughput). If targetThroughput < 0,
* no throttling will occur.
*
* To use, do this between successive send attempts:
* <pre>
@@ -53,7 +54,9 @@ public class ThroughputThrottler {
public ThroughputThrottler(long targetThroughput, long startMs) {
this.startMs = startMs;
this.targetThroughput = targetThroughput;
this.sleepTimeNs = NS_PER_SEC / targetThroughput;
this.sleepTimeNs = targetThroughput > 0 ?
NS_PER_SEC / targetThroughput :
Long.MAX_VALUE;
}
/**
@@ -63,7 +66,7 @@ public class ThroughputThrottler {
* @return true if throttling should be applied
*/
public boolean shouldThrottle(long amountSoFar, long sendStartMs) {
if (this.targetThroughput <= 0) {
if (this.targetThroughput < 0) {
// No throttling in this case
return false;
}
@@ -72,7 +75,21 @@ public class ThroughputThrottler {
return elapsedMs > 0 && (amountSoFar / elapsedMs) > this.targetThroughput;
}
/**
* Occasionally blocks for small amounts of time to achieve targetThroughput.
*
* Note that if targetThroughput is 0, this will block extremely aggressively.
*/
public void throttle() {
if (targetThroughput == 0) {
try {
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
// do nothing
}
return;
}
// throttle throughput by sleeping, on average,
// (1 / this.throughput) seconds between "things sent"
sleepDeficitNs += sleepTimeNs;
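As a usage note (mirroring the loop added to VerifiableProducer.main() further down), the intended call pattern between successive sends looks roughly like this; messageCount and sendOneMessage() are placeholders:

    ThroughputThrottler throttler = new ThroughputThrottler(targetThroughput, System.currentTimeMillis());
    for (long sent = 0; sent < messageCount; sent++) {
        long sendStartMs = System.currentTimeMillis();
        sendOneMessage();  // placeholder for the actual send call
        if (throttler.shouldThrottle(sent, sendStartMs)) {
            throttler.throttle();  // sleeps just long enough to stay near targetThroughput
        }
    }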

View File

@@ -27,15 +27,14 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.json.simple.JSONObject;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
import static net.sourceforge.argparse4j.impl.Arguments.store;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
/**
* Primarily intended for use with system testing, this producer prints metadata
@@ -51,12 +50,8 @@ import joptsimple.OptionSpecBuilder;
* whether a given line is valid JSON.
*/
public class VerifiableProducer {
OptionParser commandLineParser;
Map<String, OptionSpec<?>> commandLineOptions = new HashMap<String, OptionSpec<?>>();
String topic;
private Properties producerProps = new Properties();
private Producer<String, String> producer;
// If maxMessages < 0, produce until the process is killed externally
private long maxMessages = -1;
@@ -66,85 +61,114 @@ public class VerifiableProducer {
// Number of send attempts
private long numSent = 0;
// Throttle message throughput if this is set >= 0
private long throughput;
// Hook to trigger producing thread to stop sending messages
private boolean stopProducing = false;
/** Construct with command-line arguments */
public VerifiableProducer(String[] args) throws IOException {
this.configureParser();
this.parseCommandLineArgs(args);
public VerifiableProducer(
Properties producerProps, String topic, int throughput, int maxMessages) {
this.topic = topic;
this.throughput = throughput;
this.maxMessages = maxMessages;
this.producer = new KafkaProducer<String, String>(producerProps);
}
/** Set up the command-line options. */
private void configureParser() {
this.commandLineParser = new OptionParser();
ArgumentAcceptingOptionSpec<String> topicOpt = commandLineParser.accepts("topic", "REQUIRED: The topic id to produce messages to.")
.withRequiredArg()
.required()
.describedAs("topic")
.ofType(String.class);
commandLineOptions.put("topic", topicOpt);
ArgumentAcceptingOptionSpec<String> brokerListOpt = commandLineParser.accepts("broker-list", "REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
.withRequiredArg()
.required()
.describedAs("broker-list")
.ofType(String.class);
commandLineOptions.put("broker-list", brokerListOpt);
ArgumentAcceptingOptionSpec<String> numMessagesOpt = commandLineParser.accepts("max-messages", "Produce this many messages. Default: -1, produces messages until the process is killed externally.")
.withOptionalArg()
.defaultsTo("-1")
.describedAs("max-messages")
.ofType(String.class);
commandLineOptions.put("max-messages", numMessagesOpt);
ArgumentAcceptingOptionSpec<String> throughputOpt = commandLineParser.accepts("throughput", "Average message throughput, in messages/sec. Default: -1, results in no throttling.")
.withOptionalArg()
.defaultsTo("-1")
.describedAs("throughput")
.ofType(String.class);
commandLineOptions.put("throughput", throughputOpt);
/** Get the command-line argument parser. */
private static ArgumentParser argParser() {
ArgumentParser parser = ArgumentParsers
.newArgumentParser("verifiable-producer")
.defaultHelp(true)
.description("This tool produces increasing integers to the specified topic and prints JSON metadata to stdout on each \"send\" request, making externally visible which messages have been acked and which have not.");
ArgumentAcceptingOptionSpec<String> acksOpt = commandLineParser.accepts("acks", "number of acks required. Default: -1")
.withOptionalArg()
.defaultsTo("-1")
.describedAs("acks")
.ofType(String.class);
commandLineOptions.put("acks", acksOpt);
OptionSpecBuilder helpOpt = commandLineParser.accepts("help", "Print this message.");
commandLineOptions.put("help", helpOpt);
parser.addArgument("--topic")
.action(store())
.required(true)
.type(String.class)
.metavar("TOPIC")
.help("Produce messages to this topic.");
parser.addArgument("--broker-list")
.action(store())
.required(true)
.type(String.class)
.metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
.dest("brokerList")
.help("Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
parser.addArgument("--max-messages")
.action(store())
.required(false)
.setDefault(-1)
.type(Integer.class)
.metavar("MAX-MESSAGES")
.dest("maxMessages")
.help("Produce this many messages. If -1, produce messages until the process is killed externally.");
parser.addArgument("--throughput")
.action(store())
.required(false)
.setDefault(-1)
.type(Integer.class)
.metavar("THROUGHPUT")
.help("If set >= 0, throttle maximum message throughput to *approximately* THROUGHPUT messages/sec.");
parser.addArgument("--acks")
.action(store())
.required(false)
.setDefault(-1)
.type(Integer.class)
.choices(0, 1, -1)
.metavar("ACKS")
.help("Acks required on each produced message. See Kafka docs on request.required.acks for details.");
return parser;
}
/** Validate command-line arguments and parse into properties object. */
public void parseCommandLineArgs(String[] args) throws IOException {
/** Construct a VerifiableProducer object from command-line arguments. */
public static VerifiableProducer createFromArgs(String[] args) {
ArgumentParser parser = argParser();
VerifiableProducer producer = null;
try {
Namespace res;
res = parser.parseArgs(args);
System.out.println(res);
System.out.println(res.getString("brokerList"));
int maxMessages = res.getInt("maxMessages");
String topic = res.getString("topic");
int throughput = res.getInt("throughput");
OptionSet options = commandLineParser.parse(args);
if (options.has(commandLineOptions.get("help"))) {
commandLineParser.printHelpOn(System.out);
System.exit(0);
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList"));
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.ACKS_CONFIG, Integer.toString(res.getInt("acks")));
// No producer retries
producerProps.put("retries", "0");
producer = new VerifiableProducer(producerProps, topic, throughput, maxMessages);
} catch (ArgumentParserException e) {
if (args.length == 0) {
parser.printHelp();
System.exit(0);
} else {
parser.handleError(e);
System.exit(1);
}
}
this.maxMessages = Integer.parseInt((String) options.valueOf("max-messages"));
this.topic = (String) options.valueOf("topic");
this.throughput = Long.parseLong((String) options.valueOf("throughput"));
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf("broker-list"));
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.ACKS_CONFIG, options.valueOf("acks"));
// No producer retries
producerProps.put("retries", "0");
return producer;
}
/**
* Produce a message with given value and no key.
*/
/** Produce a message with given key and value. */
public void send(String key, String value) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value);
numSent++;
@@ -158,7 +182,7 @@ public class VerifiableProducer {
}
}
/** Need to close the producer to flush any remaining messages if we're in async mode. */
/** Close the producer to flush any remaining messages. */
public void close() {
producer.close();
}
@@ -199,9 +223,7 @@ public class VerifiableProducer {
return obj.toJSONString();
}
/**
* Callback which prints errors to stdout when the producer fails to send.
*/
/** Callback which prints errors to stdout when the producer fails to send. */
private class PrintInfoCallback implements Callback {
private String key;
@@ -226,18 +248,22 @@ public class VerifiableProducer {
public static void main(String[] args) throws IOException {
final VerifiableProducer producer = new VerifiableProducer(args);
final VerifiableProducer producer = createFromArgs(args);
final long startMs = System.currentTimeMillis();
boolean infinite = producer.maxMessages < 0;
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Trigger main thread to stop producing messages
producer.stopProducing = true;
// Flush any remaining messages
producer.close();
// Print a summary
long stopMs = System.currentTimeMillis();
double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs));
JSONObject obj = new JSONObject();
obj.put("class", producer.getClass().toString());
obj.put("name", "tool_data");
@@ -248,14 +274,19 @@ public class VerifiableProducer {
System.out.println(obj.toJSONString());
}
});
ThroughputThrottler throttler = new ThroughputThrottler(producer.throughput, startMs);
for (int i = 0; i < producer.maxMessages || infinite; i++) {
if (producer.stopProducing) {
break;
}
long sendStartMs = System.currentTimeMillis();
producer.send(null, String.format("%d", i));
if (throttler.shouldThrottle(i, sendStartMs)) {
throttler.throttle();
}
}
}
}
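For completeness, a hedged sketch of using the refactored class programmatically instead of through main(); the broker address, topic name, and message counts below are hypothetical:

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // hypothetical broker
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");

    // topic "test", ~100 messages/sec, stop after 1000 messages (all hypothetical values)
    VerifiableProducer producer = new VerifiableProducer(props, "test", 100, 1000);
    producer.send(null, "0");  // same key/value form used by the loop in main()
    producer.close();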