KAFKA-2562: update kafka scripts to use new tools/code

Updated  kafka-producer-perf-test.sh to use org.apache.kafka.clients.tools.ProducerPerformance.
Updated build.gradle to add kafka-tools-0.9.0.0-SNAPSHOT.jar to kafka/libs  folder.

Author: Manikumar reddy O <manikumar.reddy@gmail.com>

Reviewers: Gwen Shapira, Ismael Juma

Closes #242 from omkreddy/KAFKA-2562
This commit is contained in:
Manikumar reddy O 2015-10-30 15:30:34 -07:00 committed by Gwen Shapira
parent d50499a0e0
commit 1cc44830b9
5 changed files with 113 additions and 42 deletions

View File

@ -17,4 +17,4 @@
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ProducerPerformance $@
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance $@

View File

@ -16,5 +16,5 @@ rem limitations under the License.
SetLocal
set KAFKA_HEAP_OPTS=-Xmx512M
%~dp0kafka-run-class.bat kafka.tools.ProducerPerformance %*
%~dp0kafka-run-class.bat org.apache.kafka.tools.ProducerPerformance %*
EndLocal

View File

@ -254,7 +254,7 @@ project(':core') {
dependencies {
compile project(':clients')
compile project(':log4j-appender')
compile "$slf4jlog4j"
compile "org.scala-lang:scala-library:$scalaVersion"
compile 'org.apache.zookeeper:zookeeper:3.4.6'
compile 'com.101tec:zkclient:0.6'
@ -318,6 +318,9 @@ project(':core') {
from(configurations.runtime) { into("libs/") }
from(configurations.archives.artifacts.files) { into("libs/") }
from(project.siteDocsTar) { into("site-docs/") }
from(project(':log4j-appender').jar) { into("libs/") }
from(project(':tools').jar) { into("libs/") }
from(project(':tools').configurations.runtime) { into("libs/") }
}
jar {

View File

@ -53,7 +53,7 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
'kafka_directory': kafka_dir(node)
})
cmd = "JMX_PORT=%(jmx_port)d /opt/%(kafka_directory)s/bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance " \
"%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s client.id=%(client_id)s" % args
"--topic %(topic)s --num-records %(num_records)d --record-size %(record_size)d --throughput %(throughput)d --producer-props bootstrap.servers=%(bootstrap_servers)s client.id=%(client_id)s" % args
self.security_config.setup_node(node)
if self.security_protocol == SecurityConfig.SSL:

View File

@ -3,67 +3,135 @@
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.tools;
import static net.sourceforge.argparse4j.impl.Arguments.store;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.clients.producer.*;
public class ProducerPerformance {
public static void main(String[] args) throws Exception {
if (args.length < 4) {
System.err.println("USAGE: java " + ProducerPerformance.class.getName() +
" topic_name num_records record_size target_records_sec [prop_name=prop_value]*");
System.exit(1);
}
ArgumentParser parser = argParser();
/* parse args */
String topicName = args[0];
long numRecords = Long.parseLong(args[1]);
int recordSize = Integer.parseInt(args[2]);
int throughput = Integer.parseInt(args[3]);
try {
Namespace res = parser.parseArgs(args);
Properties props = new Properties();
for (int i = 4; i < args.length; i++) {
String[] pieces = args[i].split("=");
if (pieces.length != 2)
throw new IllegalArgumentException("Invalid property: " + args[i]);
props.put(pieces[0], pieces[1]);
}
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
/* parse args */
String topicName = res.getString("topic");
long numRecords = res.getLong("numRecords");
int recordSize = res.getInt("recordSize");
int throughput = res.getInt("throughput");
List<String> producerProps = res.getList("producerConfig");
/* setup perf test */
byte[] payload = new byte[recordSize];
Arrays.fill(payload, (byte) 1);
ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topicName, payload);
Stats stats = new Stats(numRecords, 5000);
long startMs = System.currentTimeMillis();
Properties props = new Properties();
if (producerProps != null)
for (String prop : producerProps) {
String[] pieces = prop.split("=");
if (pieces.length != 2)
throw new IllegalArgumentException("Invalid property: " + prop);
props.put(pieces[0], pieces[1]);
}
ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);
for (int i = 0; i < numRecords; i++) {
long sendStartMs = System.currentTimeMillis();
Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats);
producer.send(record, cb);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
if (throttler.shouldThrottle(i, sendStartMs)) {
throttler.throttle();
/* setup perf test */
byte[] payload = new byte[recordSize];
Arrays.fill(payload, (byte) 1);
ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topicName, payload);
Stats stats = new Stats(numRecords, 5000);
long startMs = System.currentTimeMillis();
ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);
for (int i = 0; i < numRecords; i++) {
long sendStartMs = System.currentTimeMillis();
Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats);
producer.send(record, cb);
if (throttler.shouldThrottle(i, sendStartMs)) {
throttler.throttle();
}
}
/* print final results */
producer.close();
stats.printTotal();
} catch (ArgumentParserException e) {
if (args.length == 0) {
parser.printHelp();
System.exit(0);
} else {
parser.handleError(e);
System.exit(1);
}
}
/* print final results */
producer.close();
stats.printTotal();
}
/** Get the command-line argument parser. */
private static ArgumentParser argParser() {
ArgumentParser parser = ArgumentParsers
.newArgumentParser("producer-performance")
.defaultHelp(true)
.description("This tool is used to verify the producer performance.");
parser.addArgument("--topic")
.action(store())
.required(true)
.type(String.class)
.metavar("TOPIC")
.help("produce messages to this topic");
parser.addArgument("--num-records")
.action(store())
.required(true)
.type(Long.class)
.metavar("NUM-RECORDS")
.dest("numRecords")
.help("number of messages to produce");
parser.addArgument("--record-size")
.action(store())
.required(true)
.type(Integer.class)
.metavar("RECORD-SIZE")
.dest("recordSize")
.help("message size in bytes");
parser.addArgument("--throughput")
.action(store())
.required(true)
.type(Integer.class)
.metavar("THROUGHPUT")
.help("throttle maximum message throughput to *approximately* THROUGHPUT messages/sec");
parser.addArgument("--producer-props")
.nargs("+")
.required(true)
.metavar("PROP-NAME=PROP-VALUE")
.type(String.class)
.dest("producerConfig")
.help("kafka producer related configuaration properties like bootstrap.servers,client.id etc..");
return parser;
}
private static class Stats {