From 8b4b1f2aa9681632ef65aa92dfd3066cd7d62851 Mon Sep 17 00:00:00 2001 From: Geoff Anderson Date: Fri, 29 May 2015 16:38:32 -0700 Subject: [PATCH] Minor updates to VerboseProducer --- .../scala/kafka/tools/VerboseProducer.java | 26 ++++++++++++------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/kafka/tools/VerboseProducer.java b/core/src/main/scala/kafka/tools/VerboseProducer.java index 2ef26fee9cb..97214e14511 100644 --- a/core/src/main/scala/kafka/tools/VerboseProducer.java +++ b/core/src/main/scala/kafka/tools/VerboseProducer.java @@ -19,6 +19,11 @@ import joptsimple.OptionSpec; import joptsimple.OptionSpecBuilder; import kafka.utils.CommandLineUtils; +/** + * Primarily intended for use with system testing, this producer produces a fixed number of + * increasing integers and prints metadata in the form of JSON to stdout for each failed or + * successful produce attempt. + */ public class VerboseProducer { OptionParser commandLineParser; Map> commandLineOptions = new HashMap>(); @@ -51,15 +56,11 @@ public class VerboseProducer { commandLineOptions.put("broker-list", brokerListOpt); - ArgumentAcceptingOptionSpec numMessagesOpt = commandLineParser.accepts("num-messages", "REQUIRED: The number of messages to produce.") + ArgumentAcceptingOptionSpec numMessagesOpt = commandLineParser.accepts("num-messages", "REQUIRED: The number of messages to produce.") .withRequiredArg() .describedAs("num-messages") - .ofType(String.class); - commandLineOptions.put("num-messages", numMessagesOpt); - - // syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.") -// val producerRequestRequiredAcks = options.valueOf(producerRequestRequiredAcksOpt).intValue() - + .ofType(Integer.class); + commandLineOptions.put("num-messages", numMessagesOpt); OptionSpecBuilder helpOpt = commandLineParser.accepts("help", "Print this message."); @@ -84,7 +85,7 @@ public class VerboseProducer { } checkRequiredArgs(commandLineParser, options, requiredArgs); - this.numMessages = Integer.parseInt((String) options.valueOf("num-messages")); + this.numMessages = (Integer) options.valueOf("num-messages"); this.topic = (String) options.valueOf("topic"); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf("broker-list")); @@ -115,7 +116,12 @@ public class VerboseProducer { */ public void send(String value) { ProducerRecord record = new ProducerRecord(topic, value); - producer.send(record, new PrintInfoCallback(value)); + try { + producer.send(record, new PrintInfoCallback(value)); + } + catch (Exception e) { + System.out.println(errorString(e, value)); + } } /** Need to close the producer to flush any remaining messages if we're in async mode. */ @@ -176,4 +182,4 @@ public class VerboseProducer { producer.close(); } -} \ No newline at end of file +}