mirror of https://github.com/apache/kafka.git
Added number of messages option.
This commit is contained in:
parent
07cd1c66a9
commit
da94b8cbe7
|
@ -27,10 +27,11 @@ public class VerboseProducer {
|
||||||
String sync;
|
String sync;
|
||||||
private Properties producerProps = new Properties();
|
private Properties producerProps = new Properties();
|
||||||
private Producer<String, String> producer;
|
private Producer<String, String> producer;
|
||||||
|
private int numMessages;
|
||||||
|
|
||||||
public VerboseProducer(String[] args) throws IOException {
|
public VerboseProducer(String[] args) throws IOException {
|
||||||
this.configureParser();
|
this.configureParser();
|
||||||
this.configure(args);
|
this.parseCommandLineArgs(args);
|
||||||
this.producer = new KafkaProducer<String, String>(producerProps);
|
this.producer = new KafkaProducer<String, String>(producerProps);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -49,36 +50,44 @@ public class VerboseProducer {
|
||||||
.ofType(String.class);
|
.ofType(String.class);
|
||||||
commandLineOptions.put("broker-list", brokerListOpt);
|
commandLineOptions.put("broker-list", brokerListOpt);
|
||||||
|
|
||||||
|
|
||||||
|
ArgumentAcceptingOptionSpec<String> numMessagesOpt = commandLineParser.accepts("num-messages", "REQUIRED: The number of messages to produce.")
|
||||||
|
.withRequiredArg()
|
||||||
|
.describedAs("num-messages")
|
||||||
|
.ofType(String.class);
|
||||||
|
commandLineOptions.put("num-messages", numMessagesOpt);
|
||||||
|
|
||||||
// syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
|
// syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
|
||||||
// val producerRequestRequiredAcks = options.valueOf(producerRequestRequiredAcksOpt).intValue()
|
// val producerRequestRequiredAcks = options.valueOf(producerRequestRequiredAcksOpt).intValue()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
OptionSpecBuilder helpOpt
|
OptionSpecBuilder helpOpt
|
||||||
= commandLineParser.accepts("help", "Print this message.");
|
= commandLineParser.accepts("help", "Print this message.");
|
||||||
commandLineOptions.put("help", helpOpt);
|
commandLineOptions.put("help", helpOpt);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Validate command-line arguments and parse into properties object. */
|
/** Validate command-line arguments and parse into properties object. */
|
||||||
public void configure(String[] args) throws IOException {
|
public void parseCommandLineArgs(String[] args) throws IOException {
|
||||||
|
|
||||||
OptionSpec[] requiredArgs = new OptionSpec[]{commandLineOptions.get("topic"),
|
OptionSpec[] requiredArgs = new OptionSpec[]{commandLineOptions.get("topic"),
|
||||||
commandLineOptions.get("broker-list")};
|
commandLineOptions.get("broker-list"),
|
||||||
|
commandLineOptions.get("num-messages")};
|
||||||
if(args.length == 0) {
|
if(args.length == 0) {
|
||||||
CommandLineUtils.printUsageAndDie(
|
CommandLineUtils.printUsageAndDie(
|
||||||
commandLineParser, "Read data from standard input and publish it to Kafka.");
|
commandLineParser, "Read data from standard input and publish it to Kafka.");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parse and validate
|
|
||||||
OptionSet options = commandLineParser.parse(args);
|
OptionSet options = commandLineParser.parse(args);
|
||||||
if (options.has(commandLineOptions.get("help"))) {
|
if (options.has(commandLineOptions.get("help"))) {
|
||||||
commandLineParser.printHelpOn(System.out);
|
commandLineParser.printHelpOn(System.out);
|
||||||
System.exit(0);
|
System.exit(0);
|
||||||
}
|
}
|
||||||
checkRequiredArgs(commandLineParser, options, requiredArgs);
|
checkRequiredArgs(commandLineParser, options, requiredArgs);
|
||||||
|
|
||||||
|
this.numMessages = Integer.parseInt((String) options.valueOf("num-messages"));
|
||||||
|
this.topic = (String) options.valueOf("topic");
|
||||||
|
|
||||||
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf("broker-list"));
|
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf("broker-list"));
|
||||||
this.topic = (String) options.valueOf("topic");
|
|
||||||
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
|
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
|
||||||
"org.apache.kafka.common.serialization.StringSerializer");
|
"org.apache.kafka.common.serialization.StringSerializer");
|
||||||
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
|
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
|
||||||
|
@ -154,10 +163,10 @@ public class VerboseProducer {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws IOException {
|
public static void main(String[] args) throws IOException {
|
||||||
|
|
||||||
VerboseProducer producer = new VerboseProducer(args);
|
VerboseProducer producer = new VerboseProducer(args);
|
||||||
|
|
||||||
for (int i = 0; i < 20; i++) {
|
for (int i = 0; i < producer.numMessages; i++) {
|
||||||
producer.send(String.format("%d", i));
|
producer.send(String.format("%d", i));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue