mirror of https://github.com/apache/kafka.git
Minor updates to VerboseProducer
This commit is contained in:
parent
2777712b39
commit
8b4b1f2aa9
|
@ -19,6 +19,11 @@ import joptsimple.OptionSpec;
|
|||
import joptsimple.OptionSpecBuilder;
|
||||
import kafka.utils.CommandLineUtils;
|
||||
|
||||
/**
|
||||
* Primarily intended for use with system testing, this producer produces a fixed number of
|
||||
* increasing integers and prints metadata in the form of JSON to stdout for each failed or
|
||||
* successful produce attempt.
|
||||
*/
|
||||
public class VerboseProducer {
|
||||
OptionParser commandLineParser;
|
||||
Map<String, OptionSpec<?>> commandLineOptions = new HashMap<String, OptionSpec<?>>();
|
||||
|
@ -51,15 +56,11 @@ public class VerboseProducer {
|
|||
commandLineOptions.put("broker-list", brokerListOpt);
|
||||
|
||||
|
||||
ArgumentAcceptingOptionSpec<String> numMessagesOpt = commandLineParser.accepts("num-messages", "REQUIRED: The number of messages to produce.")
|
||||
ArgumentAcceptingOptionSpec<Integer> numMessagesOpt = commandLineParser.accepts("num-messages", "REQUIRED: The number of messages to produce.")
|
||||
.withRequiredArg()
|
||||
.describedAs("num-messages")
|
||||
.ofType(String.class);
|
||||
commandLineOptions.put("num-messages", numMessagesOpt);
|
||||
|
||||
// syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
|
||||
// val producerRequestRequiredAcks = options.valueOf(producerRequestRequiredAcksOpt).intValue()
|
||||
|
||||
.ofType(Integer.class);
|
||||
commandLineOptions.put("num-messages", numMessagesOpt);
|
||||
|
||||
OptionSpecBuilder helpOpt
|
||||
= commandLineParser.accepts("help", "Print this message.");
|
||||
|
@ -84,7 +85,7 @@ public class VerboseProducer {
|
|||
}
|
||||
checkRequiredArgs(commandLineParser, options, requiredArgs);
|
||||
|
||||
this.numMessages = Integer.parseInt((String) options.valueOf("num-messages"));
|
||||
this.numMessages = (Integer) options.valueOf("num-messages");
|
||||
this.topic = (String) options.valueOf("topic");
|
||||
|
||||
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf("broker-list"));
|
||||
|
@ -115,7 +116,12 @@ public class VerboseProducer {
|
|||
*/
|
||||
public void send(String value) {
|
||||
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, value);
|
||||
producer.send(record, new PrintInfoCallback(value));
|
||||
try {
|
||||
producer.send(record, new PrintInfoCallback(value));
|
||||
}
|
||||
catch (Exception e) {
|
||||
System.out.println(errorString(e, value));
|
||||
}
|
||||
}
|
||||
|
||||
/** Need to close the producer to flush any remaining messages if we're in async mode. */
|
||||
|
@ -176,4 +182,4 @@ public class VerboseProducer {
|
|||
|
||||
producer.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue