mirror of https://github.com/apache/kafka.git

commit 51a94fd6ec (parent a80a4282ba)

Use argparse4j instead of joptsimple. ThroughputThrottler now has more intuitive behavior when targetThroughput is 0.
@@ -352,7 +352,7 @@ project(':clients') {
     compile "org.slf4j:slf4j-api:1.7.6"
     compile 'org.xerial.snappy:snappy-java:1.1.1.6'
     compile 'net.jpountz.lz4:lz4:1.2.0'
-    compile 'net.sf.jopt-simple:jopt-simple:4.8'
+    compile 'net.sourceforge.argparse4j:argparse4j:0.5.0'
     compile 'com.googlecode.json-simple:json-simple:1.1.1'

     testCompile 'com.novocode:junit-interface:0.9'
@@ -93,7 +93,7 @@
     <allow pkg="org.apache.kafka.clients.producer" />
     <allow pkg="org.apache.kafka.clients.consumer" />
     <allow pkg="org.json.simple" />
-    <allow pkg="joptsimple" />
+    <allow pkg="net.sourceforge.argparse4j" />
   </subpackage>
 </subpackage>
@@ -21,8 +21,9 @@ package org.apache.kafka.clients.tools;
 /**
  * This class helps producers throttle throughput.
  *
- * The resulting average throughput will be approximately
- * min(targetThroughput, maximumPossibleThroughput)
+ * If targetThroughput >= 0, the resulting average throughput will be approximately
+ * min(targetThroughput, maximumPossibleThroughput). If targetThroughput < 0,
+ * no throttling will occur.
  *
  * To use, do this between successive send attempts:
  * <pre>
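The <pre> block that follows in this Javadoc is truncated in the rendered diff. As a rough sketch of the intended call pattern (it mirrors the loop VerifiableProducer.main uses later in this patch; the sample values and the elided send call are placeholders, not code from the patch):

```java
// Sketch of the documented usage: check and throttle between successive send attempts.
long targetThroughput = 1000;  // sample value: cap at roughly 1000 sends/sec
long startMs = System.currentTimeMillis();
ThroughputThrottler throttler = new ThroughputThrottler(targetThroughput, startMs);
for (long sent = 0; sent < 10_000; sent++) {
    long sendStartMs = System.currentTimeMillis();
    // ... perform one send here ...
    if (throttler.shouldThrottle(sent, sendStartMs)) {
        throttler.throttle();
    }
}
```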
@@ -53,7 +54,9 @@ public class ThroughputThrottler {
     public ThroughputThrottler(long targetThroughput, long startMs) {
         this.startMs = startMs;
         this.targetThroughput = targetThroughput;
-        this.sleepTimeNs = NS_PER_SEC / targetThroughput;
+        this.sleepTimeNs = targetThroughput > 0 ?
+            NS_PER_SEC / targetThroughput :
+            Long.MAX_VALUE;
     }

     /**
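To make the ternary above concrete (sample numbers, not from the patch): with NS_PER_SEC equal to one billion, a positive target yields a fixed per-send sleep credit, while a non-positive target falls through to Long.MAX_VALUE instead of dividing by zero.

```java
// Illustrative arithmetic for the sleepTimeNs initialization above.
long NS_PER_SEC = 1_000_000_000L;
long target = 500;  // e.g. 500 messages/sec
long sleepTimeNs = target > 0 ? NS_PER_SEC / target : Long.MAX_VALUE;
// target = 500      -> sleepTimeNs = 2_000_000 ns (2 ms of sleep credit per send)
// target = 0 or -1  -> sleepTimeNs = Long.MAX_VALUE (0 blocks in throttle(); -1 never throttles)
```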
@@ -63,7 +66,7 @@ public class ThroughputThrottler {
      * @return
      */
     public boolean shouldThrottle(long amountSoFar, long sendStartMs) {
-        if (this.targetThroughput <= 0) {
+        if (this.targetThroughput < 0) {
             // No throttling in this case
             return false;
         }
@@ -72,7 +75,21 @@ public class ThroughputThrottler {
         return elapsedMs > 0 && (amountSoFar / elapsedMs) > this.targetThroughput;
     }

+    /**
+     * Occasionally blocks for small amounts of time to achieve targetThroughput.
+     *
+     * Note that if targetThroughput is 0, this will block extremely aggressively.
+     */
     public void throttle() {
+        if (targetThroughput == 0) {
+            try {
+                Thread.sleep(Long.MAX_VALUE);
+            } catch (InterruptedException e) {
+                // do nothing
+            }
+            return;
+        }
+
         // throttle throughput by sleeping, on average,
         // (1 / this.throughput) seconds between "things sent"
         sleepDeficitNs += sleepTimeNs;
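One consequence of the targetThroughput == 0 branch above, sketched in a standalone harness (an assumed illustration, not part of the patch): the indefinite Thread.sleep only ends when the sending thread is interrupted, at which point throttle() swallows the InterruptedException and returns.

```java
import org.apache.kafka.clients.tools.ThroughputThrottler;

// Standalone sketch: a thread parked in throttle() with targetThroughput == 0
// is released by interrupting it.
public class ThrottleInterruptSketch {
    public static void main(String[] args) throws InterruptedException {
        final ThroughputThrottler throttler =
                new ThroughputThrottler(0, System.currentTimeMillis());
        Thread sender = new Thread(new Runnable() {
            public void run() {
                throttler.throttle();  // blocks in Thread.sleep(Long.MAX_VALUE)
                System.out.println("throttle() returned after interrupt");
            }
        });
        sender.start();
        Thread.sleep(100);   // give the sender time to park
        sender.interrupt();  // throttle() catches InterruptedException and returns
    }
}
```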
@@ -27,15 +27,14 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.json.simple.JSONObject;

 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Properties;

-import joptsimple.ArgumentAcceptingOptionSpec;
-import joptsimple.OptionParser;
-import joptsimple.OptionSet;
-import joptsimple.OptionSpec;
-import joptsimple.OptionSpecBuilder;
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;

 /**
  * Primarily intended for use with system testing, this producer prints metadata
@@ -52,11 +51,7 @@ import joptsimple.OptionSpecBuilder;
  */
 public class VerifiableProducer {

-    OptionParser commandLineParser;
-    Map<String, OptionSpec<?>> commandLineOptions = new HashMap<String, OptionSpec<?>>();
-
     String topic;
-    private Properties producerProps = new Properties();
     private Producer<String, String> producer;
     // If maxMessages < 0, produce until the process is killed externally
     private long maxMessages = -1;
@@ -66,85 +61,114 @@ public class VerifiableProducer {

     // Number of send attempts
     private long numSent = 0;

+    // Throttle message throughput if this is set >= 0
     private long throughput;

-    /** Construct with command-line arguments */
-    public VerifiableProducer(String[] args) throws IOException {
-        this.configureParser();
-        this.parseCommandLineArgs(args);
+    // Hook to trigger producing thread to stop sending messages
+    private boolean stopProducing = false;
+
+    public VerifiableProducer(
+            Properties producerProps, String topic, int throughput, int maxMessages) {
+
+        this.topic = topic;
+        this.throughput = throughput;
+        this.maxMessages = maxMessages;
         this.producer = new KafkaProducer<String, String>(producerProps);
     }

-    /** Set up the command-line options. */
-    private void configureParser() {
-        this.commandLineParser = new OptionParser();
-        ArgumentAcceptingOptionSpec<String> topicOpt = commandLineParser.accepts("topic", "REQUIRED: The topic id to produce messages to.")
-                .withRequiredArg()
-                .required()
-                .describedAs("topic")
-                .ofType(String.class);
-        commandLineOptions.put("topic", topicOpt);
+    /** Get the command-line argument parser. */
+    private static ArgumentParser argParser() {
+        ArgumentParser parser = ArgumentParsers
+            .newArgumentParser("verifiable-producer")
+            .defaultHelp(true)
+            .description("This tool produces increasing integers to the specified topic and prints JSON metadata to stdout on each \"send\" request, making externally visible which messages have been acked and which have not.");

-        ArgumentAcceptingOptionSpec<String> brokerListOpt = commandLineParser.accepts("broker-list", "REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
-                .withRequiredArg()
-                .required()
-                .describedAs("broker-list")
-                .ofType(String.class);
-        commandLineOptions.put("broker-list", brokerListOpt);
+        parser.addArgument("--topic")
+            .action(store())
+            .required(true)
+            .type(String.class)
+            .metavar("TOPIC")
+            .help("Produce messages to this topic.");

-        ArgumentAcceptingOptionSpec<String> numMessagesOpt = commandLineParser.accepts("max-messages", "Produce this many messages. Default: -1, produces messages until the process is killed externally.")
-                .withOptionalArg()
-                .defaultsTo("-1")
-                .describedAs("max-messages")
-                .ofType(String.class);
-        commandLineOptions.put("max-messages", numMessagesOpt);
+        parser.addArgument("--broker-list")
+            .action(store())
+            .required(true)
+            .type(String.class)
+            .metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
+            .dest("brokerList")
+            .help("Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
+
+        parser.addArgument("--max-messages")
+            .action(store())
+            .required(false)
+            .setDefault(-1)
+            .type(Integer.class)
+            .metavar("MAX-MESSAGES")
+            .dest("maxMessages")
+            .help("Produce this many messages. If -1, produce messages until the process is killed externally.");

-        ArgumentAcceptingOptionSpec<String> throughputOpt = commandLineParser.accepts("throughput", "Average message throughput, in messages/sec. Default: -1, results in no throttling.")
-                .withOptionalArg()
-                .defaultsTo("-1")
-                .describedAs("throughput")
-                .ofType(String.class);
-        commandLineOptions.put("throughput", throughputOpt);
+        parser.addArgument("--throughput")
+            .action(store())
+            .required(false)
+            .setDefault(-1)
+            .type(Integer.class)
+            .metavar("THROUGHPUT")
+            .help("If set >= 0, throttle maximum message throughput to *approximately* THROUGHPUT messages/sec.");

-        ArgumentAcceptingOptionSpec<String> acksOpt = commandLineParser.accepts("acks", "number of acks required. Default: -1")
-                .withOptionalArg()
-                .defaultsTo("-1")
-                .describedAs("acks")
-                .ofType(String.class);
-        commandLineOptions.put("acks", acksOpt);
+        parser.addArgument("--acks")
+            .action(store())
+            .required(false)
+            .setDefault(-1)
+            .type(Integer.class)
+            .choices(0, 1, -1)
+            .metavar("ACKS")
+            .help("Acks required on each produced message. See Kafka docs on request.required.acks for details.");

-        OptionSpecBuilder helpOpt = commandLineParser.accepts("help", "Print this message.");
-        commandLineOptions.put("help", helpOpt);
+        return parser;
     }

-    /** Validate command-line arguments and parse into properties object. */
-    public void parseCommandLineArgs(String[] args) throws IOException {
+    /** Construct a VerifiableProducer object from command-line arguments. */
+    public static VerifiableProducer createFromArgs(String[] args) {
+        ArgumentParser parser = argParser();
+        VerifiableProducer producer = null;

-        OptionSet options = commandLineParser.parse(args);
-        if (options.has(commandLineOptions.get("help"))) {
-            commandLineParser.printHelpOn(System.out);
-            System.exit(0);
-        }
+        try {
+            Namespace res;
+            res = parser.parseArgs(args);
+            System.out.println(res);
+            System.out.println(res.getString("brokerList"));

-        this.maxMessages = Integer.parseInt((String) options.valueOf("max-messages"));
-        this.topic = (String) options.valueOf("topic");
-        this.throughput = Long.parseLong((String) options.valueOf("throughput"));

-        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf("broker-list"));
+            int maxMessages = res.getInt("maxMessages");
+            String topic = res.getString("topic");
+            int throughput = res.getInt("throughput");
+
+            Properties producerProps = new Properties();
+            producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList"));
             producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                               "org.apache.kafka.common.serialization.StringSerializer");
             producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                               "org.apache.kafka.common.serialization.StringSerializer");
-        producerProps.put(ProducerConfig.ACKS_CONFIG, options.valueOf("acks"));
+            producerProps.put(ProducerConfig.ACKS_CONFIG, Integer.toString(res.getInt("acks")));

             // No producer retries
             producerProps.put("retries", "0");

+            producer = new VerifiableProducer(producerProps, topic, throughput, maxMessages);
+        } catch (ArgumentParserException e) {
+            if (args.length == 0) {
+                parser.printHelp();
+                System.exit(0);
+            } else {
+                parser.handleError(e);
+                System.exit(1);
+            }
         }

-    /**
-     * Produce a message with given value and no key.
-     */
+        return producer;
+    }
+
+    /** Produce a message with given key and value. */
     public void send(String key, String value) {
         ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value);
         numSent++;
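For readers who have not used argparse4j before, here is a compact standalone sketch of the parse-and-lookup pattern the new argParser()/createFromArgs() code relies on. The class name and the reduced option set are illustrative only; the calls themselves mirror the ones used in the patch above.

```java
import static net.sourceforge.argparse4j.impl.Arguments.store;

import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;

// Standalone illustration of the argparse4j pattern used above (not code from the patch).
public class ArgparseSketch {
    public static void main(String[] args) {
        ArgumentParser parser = ArgumentParsers
            .newArgumentParser("verifiable-producer")
            .defaultHelp(true)
            .description("Example parser mirroring the options added in this patch.");

        parser.addArgument("--topic").action(store()).required(true)
              .type(String.class).metavar("TOPIC").help("Produce messages to this topic.");
        parser.addArgument("--throughput").action(store()).required(false)
              .setDefault(-1).type(Integer.class).metavar("THROUGHPUT")
              .help("Approximate cap on messages/sec; negative disables throttling.");

        try {
            // e.g. args = {"--topic", "test", "--throughput", "1000"}
            Namespace res = parser.parseArgs(args);
            String topic = res.getString("topic");      // dest defaults to the option name
            int throughput = res.getInt("throughput");
            System.out.println(topic + " @ " + throughput + " msg/s");
        } catch (ArgumentParserException e) {
            parser.handleError(e);                      // prints a usage message
            System.exit(1);
        }
    }
}
```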
@@ -158,7 +182,7 @@ public class VerifiableProducer {
         }
     }

-    /** Need to close the producer to flush any remaining messages if we're in async mode. */
+    /** Close the producer to flush any remaining messages. */
     public void close() {
         producer.close();
     }
@@ -199,9 +223,7 @@ public class VerifiableProducer {
         return obj.toJSONString();
     }

-    /**
-     * Callback which prints errors to stdout when the producer fails to send.
-     */
+    /** Callback which prints errors to stdout when the producer fails to send. */
     private class PrintInfoCallback implements Callback {

         private String key;
@@ -226,18 +248,22 @@ public class VerifiableProducer {

     public static void main(String[] args) throws IOException {

-        final VerifiableProducer producer = new VerifiableProducer(args);
+        final VerifiableProducer producer = createFromArgs(args);
         final long startMs = System.currentTimeMillis();
         boolean infinite = producer.maxMessages < 0;

         Runtime.getRuntime().addShutdownHook(new Thread() {
             @Override
             public void run() {
+                // Trigger main thread to stop producing messages
+                producer.stopProducing = true;
+
+                // Flush any remaining messages
                 producer.close();

+                // Print a summary
                 long stopMs = System.currentTimeMillis();
                 double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs));

                 JSONObject obj = new JSONObject();
                 obj.put("class", producer.getClass().toString());
                 obj.put("name", "tool_data");
@@ -251,11 +277,16 @@ public class VerifiableProducer {

         ThroughputThrottler throttler = new ThroughputThrottler(producer.throughput, startMs);
         for (int i = 0; i < producer.maxMessages || infinite; i++) {
+            if (producer.stopProducing) {
+                break;
+            }
             long sendStartMs = System.currentTimeMillis();
             producer.send(null, String.format("%d", i));

             if (throttler.shouldThrottle(i, sendStartMs)) {
                 throttler.throttle();
             }
         }
     }

 }
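The graceful-shutdown path added above rests on a simple pattern: the JVM shutdown hook flips stopProducing so the produce loop exits, then flushes the producer and prints a summary. A generic, self-contained sketch of that flag-plus-hook pattern follows (names are placeholders; making the flag volatile is this sketch's choice, the patch itself uses a plain boolean field):

```java
// Generic sketch of the stop-flag plus shutdown-hook pattern used by VerifiableProducer.
public class ShutdownFlagSketch {
    // volatile is this sketch's choice so the loop sees the hook's write promptly
    private static volatile boolean stopProducing = false;
    private static volatile long sent = 0;

    public static void main(String[] args) throws InterruptedException {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                stopProducing = true;                         // ask the main loop to stop
                System.out.println("summary: sent=" + sent);  // print a summary, like the patch
            }
        });

        while (!stopProducing) {   // stand-in for the bounded/infinite produce loop
            sent++;                // stand-in for producer.send(...)
            Thread.sleep(10);
        }
    }
}
```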