Added number of messages option.

This commit is contained in:
Geoff Anderson 2015-05-27 14:07:20 -07:00
parent 07cd1c66a9
commit da94b8cbe7
1 changed files with 18 additions and 9 deletions

View File

@ -27,10 +27,11 @@ public class VerboseProducer {
String sync; String sync;
private Properties producerProps = new Properties(); private Properties producerProps = new Properties();
private Producer<String, String> producer; private Producer<String, String> producer;
private int numMessages;
public VerboseProducer(String[] args) throws IOException { public VerboseProducer(String[] args) throws IOException {
this.configureParser(); this.configureParser();
this.configure(args); this.parseCommandLineArgs(args);
this.producer = new KafkaProducer<String, String>(producerProps); this.producer = new KafkaProducer<String, String>(producerProps);
} }
@ -49,36 +50,44 @@ public class VerboseProducer {
.ofType(String.class); .ofType(String.class);
commandLineOptions.put("broker-list", brokerListOpt); commandLineOptions.put("broker-list", brokerListOpt);
ArgumentAcceptingOptionSpec<String> numMessagesOpt = commandLineParser.accepts("num-messages", "REQUIRED: The number of messages to produce.")
.withRequiredArg()
.describedAs("num-messages")
.ofType(String.class);
commandLineOptions.put("num-messages", numMessagesOpt);
// syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.") // syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
// val producerRequestRequiredAcks = options.valueOf(producerRequestRequiredAcksOpt).intValue() // val producerRequestRequiredAcks = options.valueOf(producerRequestRequiredAcksOpt).intValue()
OptionSpecBuilder helpOpt OptionSpecBuilder helpOpt
= commandLineParser.accepts("help", "Print this message."); = commandLineParser.accepts("help", "Print this message.");
commandLineOptions.put("help", helpOpt); commandLineOptions.put("help", helpOpt);
} }
/** Validate command-line arguments and parse into properties object. */ /** Validate command-line arguments and parse into properties object. */
public void configure(String[] args) throws IOException { public void parseCommandLineArgs(String[] args) throws IOException {
OptionSpec[] requiredArgs = new OptionSpec[]{commandLineOptions.get("topic"), OptionSpec[] requiredArgs = new OptionSpec[]{commandLineOptions.get("topic"),
commandLineOptions.get("broker-list")}; commandLineOptions.get("broker-list"),
commandLineOptions.get("num-messages")};
if(args.length == 0) { if(args.length == 0) {
CommandLineUtils.printUsageAndDie( CommandLineUtils.printUsageAndDie(
commandLineParser, "Read data from standard input and publish it to Kafka."); commandLineParser, "Read data from standard input and publish it to Kafka.");
} }
// Parse and validate
OptionSet options = commandLineParser.parse(args); OptionSet options = commandLineParser.parse(args);
if (options.has(commandLineOptions.get("help"))) { if (options.has(commandLineOptions.get("help"))) {
commandLineParser.printHelpOn(System.out); commandLineParser.printHelpOn(System.out);
System.exit(0); System.exit(0);
} }
checkRequiredArgs(commandLineParser, options, requiredArgs); checkRequiredArgs(commandLineParser, options, requiredArgs);
this.numMessages = Integer.parseInt((String) options.valueOf("num-messages"));
this.topic = (String) options.valueOf("topic");
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf("broker-list")); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf("broker-list"));
this.topic = (String) options.valueOf("topic");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer"); "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
@ -154,10 +163,10 @@ public class VerboseProducer {
} }
public static void main(String[] args) throws IOException { public static void main(String[] args) throws IOException {
VerboseProducer producer = new VerboseProducer(args); VerboseProducer producer = new VerboseProducer(args);
for (int i = 0; i < 20; i++) { for (int i = 0; i < producer.numMessages; i++) {
producer.send(String.format("%d", i)); producer.send(String.format("%d", i));
} }