Minor updates to VerboseProducer

This commit is contained in:
Geoff Anderson 2015-05-29 16:38:32 -07:00
parent 2777712b39
commit 8b4b1f2aa9
1 changed files with 16 additions and 10 deletions

View File

@ -19,6 +19,11 @@ import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
import kafka.utils.CommandLineUtils;
/**
* Primarily intended for use with system testing, this producer produces a fixed number of
* increasing integers and prints metadata in the form of JSON to stdout for each failed or
* successful produce attempt.
*/
public class VerboseProducer {
OptionParser commandLineParser;
Map<String, OptionSpec<?>> commandLineOptions = new HashMap<String, OptionSpec<?>>();
@ -51,15 +56,11 @@ public class VerboseProducer {
commandLineOptions.put("broker-list", brokerListOpt);
ArgumentAcceptingOptionSpec<String> numMessagesOpt = commandLineParser.accepts("num-messages", "REQUIRED: The number of messages to produce.")
ArgumentAcceptingOptionSpec<Integer> numMessagesOpt = commandLineParser.accepts("num-messages", "REQUIRED: The number of messages to produce.")
.withRequiredArg()
.describedAs("num-messages")
.ofType(String.class);
commandLineOptions.put("num-messages", numMessagesOpt);
// syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
// val producerRequestRequiredAcks = options.valueOf(producerRequestRequiredAcksOpt).intValue()
.ofType(Integer.class);
commandLineOptions.put("num-messages", numMessagesOpt);
OptionSpecBuilder helpOpt
= commandLineParser.accepts("help", "Print this message.");
@ -84,7 +85,7 @@ public class VerboseProducer {
}
checkRequiredArgs(commandLineParser, options, requiredArgs);
this.numMessages = Integer.parseInt((String) options.valueOf("num-messages"));
this.numMessages = (Integer) options.valueOf("num-messages");
this.topic = (String) options.valueOf("topic");
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf("broker-list"));
@ -115,7 +116,12 @@ public class VerboseProducer {
*/
public void send(String value) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, value);
producer.send(record, new PrintInfoCallback(value));
try {
producer.send(record, new PrintInfoCallback(value));
}
catch (Exception e) {
System.out.println(errorString(e, value));
}
}
/** Need to close the producer to flush any remaining messages if we're in async mode. */
@ -176,4 +182,4 @@ public class VerboseProducer {
producer.close();
}
}
}