Merge pull request #3 from confluentinc/verbose-client

Verbose client
Geoff 2015-06-04 15:22:44 -07:00
commit 632be12d23
7 changed files with 443 additions and 27 deletions

@@ -0,0 +1,20 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.clients.tools.VerifiableProducer $@
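The tool is meant to be driven by an external test harness. Below is a minimal sketch of doing that from Java; the script filename is an assumption, since this diff view omits file paths, and only flags defined later in this patch are used:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class RunVerifiableProducer {
    public static void main(String[] args) throws IOException, InterruptedException {
        // Hypothetical install path for the wrapper script above.
        ProcessBuilder pb = new ProcessBuilder(
                "bin/kafka-verifiable-producer.sh",
                "--topic", "test-topic",
                "--broker-list", "localhost:9092",
                "--max-messages", "1000",
                "--throughput", "100");
        pb.redirectErrorStream(true);
        Process p = pb.start();

        // The producer reports each send attempt as one JSON object on stdout.
        try (BufferedReader r = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
            String line;
            while ((line = r.readLine()) != null) {
                System.out.println(line);
            }
        }
        p.waitFor();
    }
}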

@@ -352,6 +352,8 @@ project(':clients') {
     compile "org.slf4j:slf4j-api:1.7.6"
     compile 'org.xerial.snappy:snappy-java:1.1.1.6'
     compile 'net.jpountz.lz4:lz4:1.2.0'
+    compile 'net.sourceforge.argparse4j:argparse4j:0.5.0'
+    compile 'com.googlecode.json-simple:json-simple:1.1.1'
     testCompile 'com.novocode:junit-interface:0.9'
     testRuntime "$slf4jlog4j"

@@ -92,6 +92,8 @@
   <subpackage name="tools">
     <allow pkg="org.apache.kafka.clients.producer" />
     <allow pkg="org.apache.kafka.clients.consumer" />
+    <allow pkg="org.json.simple" />
+    <allow pkg="net.sourceforge.argparse4j" />
   </subpackage>
 </subpackage>

@@ -19,10 +19,6 @@ import org.apache.kafka.clients.producer.*;
 public class ProducerPerformance {
     private static final long NS_PER_MS = 1000000L;
     private static final long NS_PER_SEC = 1000 * NS_PER_MS;
-    private static final long MIN_SLEEP_NS = 2 * NS_PER_MS;
     public static void main(String[] args) throws Exception {
         if (args.length < 4) {
             System.err.println("USAGE: java " + ProducerPerformance.class.getName() +
@@ -51,31 +47,17 @@ public class ProducerPerformance {
         byte[] payload = new byte[recordSize];
         Arrays.fill(payload, (byte) 1);
         ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topicName, payload);
-        long sleepTime = NS_PER_SEC / throughput;
-        long sleepDeficitNs = 0;
         Stats stats = new Stats(numRecords, 5000);
-        long start = System.currentTimeMillis();
+        long startMs = System.currentTimeMillis();
+        ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);
         for (int i = 0; i < numRecords; i++) {
-            long sendStart = System.currentTimeMillis();
-            Callback cb = stats.nextCompletion(sendStart, payload.length, stats);
+            long sendStartMs = System.currentTimeMillis();
+            Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats);
             producer.send(record, cb);
-            /*
-             * Maybe sleep a little to control throughput. Sleep time can be a bit inaccurate for times < 1 ms so
-             * instead of sleeping each time instead wait until a minimum sleep time accumulates (the "sleep deficit")
-             * and then make up the whole deficit in one longer sleep.
-             */
-            if (throughput > 0) {
-                float elapsed = (sendStart - start) / 1000.f;
-                if (elapsed > 0 && i / elapsed > throughput) {
-                    sleepDeficitNs += sleepTime;
-                    if (sleepDeficitNs >= MIN_SLEEP_NS) {
-                        long sleepMs = sleepDeficitNs / 1000000;
-                        long sleepNs = sleepDeficitNs - sleepMs * 1000000;
-                        Thread.sleep(sleepMs, (int) sleepNs);
-                        sleepDeficitNs = 0;
-                    }
-                }
-            }
+            if (throttler.shouldThrottle(i, sendStartMs)) {
+                throttler.throttle();
+            }
         }
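The refactor lifts the inline "sleep deficit" logic into the new ThroughputThrottler class (next file) without changing its arithmetic. A short numeric illustration of why the deficit exists, using the constants from this patch (variable names here are for illustration only):

// Thread.sleep() is inaccurate below ~1 ms, so instead of sleeping after every
// send, each throttled send adds its per-send allowance to a running deficit,
// which is repaid in one longer sleep once it reaches MIN_SLEEP_NS (2 ms).
long targetThroughput = 1000000L;                   // e.g. 1M messages/sec
long sleepTimeNs = 1000000000L / targetThroughput;  // 1000 ns owed per send
long minSleepNs = 2 * 1000000L;                     // 2 ms threshold
long sendsPerSleep = minSleepNs / sleepTimeNs;      // == 2000 sends per sleep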

@@ -0,0 +1,118 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.tools;

/**
 * This class helps producers throttle throughput.
 *
 * If targetThroughput >= 0, the resulting average throughput will be approximately
 * min(targetThroughput, maximumPossibleThroughput). If targetThroughput < 0,
 * no throttling will occur.
 *
 * To use, do this between successive send attempts:
 * <pre>
 * {@code
 *   if (throttler.shouldThrottle(...)) {
 *       throttler.throttle();
 *   }
 * }
 * </pre>
 *
 * Note that this can be used to throttle message throughput or data throughput.
 */
public class ThroughputThrottler {

    private static final long NS_PER_MS = 1000000L;
    private static final long NS_PER_SEC = 1000 * NS_PER_MS;
    private static final long MIN_SLEEP_NS = 2 * NS_PER_MS;

    long sleepTimeNs;
    long sleepDeficitNs = 0;
    long targetThroughput = -1;
    long startMs;

    /**
     * @param targetThroughput Can be messages/sec or bytes/sec
     * @param startMs          When the very first message is sent
     */
    public ThroughputThrottler(long targetThroughput, long startMs) {
        this.startMs = startMs;
        this.targetThroughput = targetThroughput;
        this.sleepTimeNs = targetThroughput > 0 ?
                           NS_PER_SEC / targetThroughput :
                           Long.MAX_VALUE;
    }

    /**
     * @param amountSoFar bytes produced so far if you want to throttle data throughput, or
     *                    messages produced so far if you want to throttle message throughput.
     * @param sendStartMs timestamp of the most recently sent message
     * @return true if the rate so far exceeds targetThroughput, i.e. the caller should throttle()
     */
    public boolean shouldThrottle(long amountSoFar, long sendStartMs) {
        if (this.targetThroughput < 0) {
            // No throttling in this case
            return false;
        }

        // Elapsed time is in seconds (ms / 1000), so the ratio below is a per-second rate
        float elapsedSec = (sendStartMs - startMs) / 1000.f;
        return elapsedSec > 0 && (amountSoFar / elapsedSec) > this.targetThroughput;
    }

    /**
     * Occasionally blocks for small amounts of time to achieve targetThroughput.
     *
     * Note that if targetThroughput is 0, this will block extremely aggressively.
     */
    public void throttle() {
        if (targetThroughput == 0) {
            try {
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException e) {
                // do nothing
            }
            return;
        }

        // Throttle throughput by sleeping, on average,
        // (1 / targetThroughput) seconds between "things sent"
        sleepDeficitNs += sleepTimeNs;

        // If enough sleep deficit has accumulated, sleep a little
        if (sleepDeficitNs >= MIN_SLEEP_NS) {
            long sleepMs = sleepDeficitNs / NS_PER_MS;
            long sleepNs = sleepDeficitNs - sleepMs * NS_PER_MS;

            long sleepStartNs = System.nanoTime();
            try {
                Thread.sleep(sleepMs, (int) sleepNs);
                sleepDeficitNs = 0;
            } catch (InterruptedException e) {
                // If sleep is cut short, reduce deficit by the amount of
                // time we actually spent sleeping
                long sleepElapsedNs = System.nanoTime() - sleepStartNs;
                if (sleepElapsedNs <= sleepDeficitNs) {
                    sleepDeficitNs -= sleepElapsedNs;
                }
            }
        }
    }
}
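A quick way to sanity-check the deficit scheme is to throttle a no-op loop and compare the achieved rate against the target. This harness is a sketch for illustration, not part of the commit:

import org.apache.kafka.clients.tools.ThroughputThrottler;

public class ThrottlerDemo {
    public static void main(String[] args) {
        long target = 1000;   // "things" per second; messages or bytes both work
        long startMs = System.currentTimeMillis();
        ThroughputThrottler throttler = new ThroughputThrottler(target, startMs);

        long ops = 5000;      // should take roughly 5 seconds at the target rate
        for (long i = 0; i < ops; i++) {
            // A real caller would send a message here.
            if (throttler.shouldThrottle(i, System.currentTimeMillis())) {
                throttler.throttle();
            }
        }

        double elapsedSec = (System.currentTimeMillis() - startMs) / 1000.0;
        System.out.printf("achieved %.1f/sec against a target of %d/sec%n",
                          ops / elapsedSec, target);
    }
}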

@@ -0,0 +1,292 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.tools;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.json.simple.JSONObject;

import java.io.IOException;
import java.util.Properties;

import static net.sourceforge.argparse4j.impl.Arguments.store;

import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;

/**
 * Primarily intended for use with system testing, this producer prints metadata
 * in the form of JSON to stdout on each "send" request. For example, this helps
 * with end-to-end correctness tests by making externally visible which messages have been
 * acked and which have not.
 *
 * When used as a command-line tool, it produces increasing integers. It will produce a
 * fixed number of messages unless max-messages is -1 (the default), in which case
 * it produces indefinitely.
 *
 * If logging is left enabled, log output on stdout can be easily ignored by checking
 * whether a given line is valid JSON.
 */
public class VerifiableProducer {

    String topic;
    private Producer<String, String> producer;

    // If maxMessages < 0, produce until the process is killed externally
    private long maxMessages = -1;

    // Number of messages for which acks were received
    private long numAcked = 0;

    // Number of send attempts
    private long numSent = 0;

    // Throttle message throughput if this is set >= 0
    private long throughput;

    // Hook to trigger producing thread to stop sending messages
    private boolean stopProducing = false;

    public VerifiableProducer(Properties producerProps, String topic, int throughput, int maxMessages) {
        this.topic = topic;
        this.throughput = throughput;
        this.maxMessages = maxMessages;
        this.producer = new KafkaProducer<String, String>(producerProps);
    }

    /** Get the command-line argument parser. */
    private static ArgumentParser argParser() {
        ArgumentParser parser = ArgumentParsers
            .newArgumentParser("verifiable-producer")
            .defaultHelp(true)
            .description("This tool produces increasing integers to the specified topic and prints JSON metadata to stdout on each \"send\" request, making externally visible which messages have been acked and which have not.");

        parser.addArgument("--topic")
            .action(store())
            .required(true)
            .type(String.class)
            .metavar("TOPIC")
            .help("Produce messages to this topic.");

        parser.addArgument("--broker-list")
            .action(store())
            .required(true)
            .type(String.class)
            .metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
            .dest("brokerList")
            .help("Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");

        parser.addArgument("--max-messages")
            .action(store())
            .required(false)
            .setDefault(-1)
            .type(Integer.class)
            .metavar("MAX-MESSAGES")
            .dest("maxMessages")
            .help("Produce this many messages. If -1, produce messages until the process is killed externally.");

        parser.addArgument("--throughput")
            .action(store())
            .required(false)
            .setDefault(-1)
            .type(Integer.class)
            .metavar("THROUGHPUT")
            .help("If set >= 0, throttle maximum message throughput to *approximately* THROUGHPUT messages/sec.");

        parser.addArgument("--acks")
            .action(store())
            .required(false)
            .setDefault(-1)
            .type(Integer.class)
            .choices(0, 1, -1)
            .metavar("ACKS")
            .help("Acks required on each produced message. See Kafka docs on request.required.acks for details.");

        return parser;
    }

    /** Construct a VerifiableProducer object from command-line arguments. */
    public static VerifiableProducer createFromArgs(String[] args) {
        ArgumentParser parser = argParser();
        VerifiableProducer producer = null;

        try {
            Namespace res = parser.parseArgs(args);

            int maxMessages = res.getInt("maxMessages");
            String topic = res.getString("topic");
            int throughput = res.getInt("throughput");

            Properties producerProps = new Properties();
            producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList"));
            producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                              "org.apache.kafka.common.serialization.StringSerializer");
            producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                              "org.apache.kafka.common.serialization.StringSerializer");
            producerProps.put(ProducerConfig.ACKS_CONFIG, Integer.toString(res.getInt("acks")));
            // No producer retries
            producerProps.put("retries", "0");

            producer = new VerifiableProducer(producerProps, topic, throughput, maxMessages);
        } catch (ArgumentParserException e) {
            if (args.length == 0) {
                parser.printHelp();
                System.exit(0);
            } else {
                parser.handleError(e);
                System.exit(1);
            }
        }

        return producer;
    }

    /** Produce a message with given key and value. */
    public void send(String key, String value) {
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value);
        numSent++;
        try {
            producer.send(record, new PrintInfoCallback(key, value));
        } catch (Exception e) {
            synchronized (System.out) {
                System.out.println(errorString(e, key, value, System.currentTimeMillis()));
            }
        }
    }

    /** Close the producer to flush any remaining messages. */
    public void close() {
        producer.close();
    }

    /**
     * Return JSON string encapsulating basic information about the exception, as well
     * as the key and value which triggered the exception.
     */
    String errorString(Exception e, String key, String value, Long nowMs) {
        assert e != null : "Expected non-null exception.";

        JSONObject obj = new JSONObject();
        obj.put("class", this.getClass().toString());
        obj.put("name", "producer_send_error");
        obj.put("time_ms", nowMs);
        obj.put("exception", e.getClass().toString());
        obj.put("message", e.getMessage());
        obj.put("topic", this.topic);
        obj.put("key", key);
        obj.put("value", value);
        return obj.toJSONString();
    }

    String successString(RecordMetadata recordMetadata, String key, String value, Long nowMs) {
        assert recordMetadata != null : "Expected non-null recordMetadata object.";

        JSONObject obj = new JSONObject();
        obj.put("class", this.getClass().toString());
        obj.put("name", "producer_send_success");
        obj.put("time_ms", nowMs);
        obj.put("topic", this.topic);
        obj.put("partition", recordMetadata.partition());
        obj.put("offset", recordMetadata.offset());
        obj.put("key", key);
        obj.put("value", value);
        return obj.toJSONString();
    }

    /** Callback which prints the result of each send, success or error, to stdout as JSON. */
    private class PrintInfoCallback implements Callback {

        private String key;
        private String value;

        PrintInfoCallback(String key, String value) {
            this.key = key;
            this.value = value;
        }

        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            synchronized (System.out) {
                if (e == null) {
                    VerifiableProducer.this.numAcked++;
                    System.out.println(successString(recordMetadata, this.key, this.value, System.currentTimeMillis()));
                } else {
                    System.out.println(errorString(e, this.key, this.value, System.currentTimeMillis()));
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        final VerifiableProducer producer = createFromArgs(args);
        final long startMs = System.currentTimeMillis();
        boolean infinite = producer.maxMessages < 0;

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                // Trigger main thread to stop producing messages
                producer.stopProducing = true;

                // Flush any remaining messages
                producer.close();

                // Print a summary
                long stopMs = System.currentTimeMillis();
                double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs));

                JSONObject obj = new JSONObject();
                obj.put("class", producer.getClass().toString());
                obj.put("name", "tool_data");
                obj.put("sent", producer.numSent);
                obj.put("acked", producer.numAcked);
                obj.put("target_throughput", producer.throughput);
                obj.put("avg_throughput", avgThroughput);
                System.out.println(obj.toJSONString());
            }
        });

        ThroughputThrottler throttler = new ThroughputThrottler(producer.throughput, startMs);
        for (int i = 0; i < producer.maxMessages || infinite; i++) {
            if (producer.stopProducing) {
                break;
            }
            long sendStartMs = System.currentTimeMillis();
            producer.send(null, String.format("%d", i));

            if (throttler.shouldThrottle(i, sendStartMs)) {
                throttler.throttle();
            }
        }
    }
}
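Because every event is a single JSON object per stdout line, and non-JSON log lines are meant to be skipped, a downstream checker can tally acks with the json-simple dependency added in this patch. A sketch, with the class name and stdin plumbing invented for illustration:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

public class AckCounter {
    public static void main(String[] args) throws Exception {
        JSONParser parser = new JSONParser();
        long acked = 0;
        long failed = 0;

        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        String line;
        while ((line = in.readLine()) != null) {
            Object parsed;
            try {
                parsed = parser.parse(line);
            } catch (ParseException e) {
                continue;  // not valid JSON: a stray log line, ignore it
            }
            if (!(parsed instanceof JSONObject)) {
                continue;
            }
            Object name = ((JSONObject) parsed).get("name");
            if ("producer_send_success".equals(name)) {
                acked++;
            } else if ("producer_send_error".equals(name)) {
                failed++;
            }
        }
        System.out.println("acked=" + acked + " failed=" + failed);
    }
}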

@@ -115,9 +115,9 @@ object ProducerPerformance extends Logging {
       .defaultsTo(0)
     val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
     val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
-      "set, the csv metrics will be outputed here")
+      "set, the csv metrics will be output here")
       .withRequiredArg
-      .describedAs("metrics dictory")
+      .describedAs("metrics directory")
       .ofType(classOf[java.lang.String])
     val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.")