KAFKA-5516: Formatting verifiable producer/consumer output in a similar fashion

Author: ppatierno <ppatierno@live.com>
Author: Paolo Patierno <ppatierno@live.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #3434 from ppatierno/verifiable-consumer-producer
This commit is contained in:
ppatierno 2017-08-07 08:10:42 -07:00 committed by Ewen Cheslack-Postava
parent 2010aa067f
commit f15cdc73dd
2 changed files with 253 additions and 144 deletions

View File

@ -18,6 +18,7 @@ package org.apache.kafka.tools;
import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonSerializer; import com.fasterxml.jackson.databind.JsonSerializer;
@ -256,6 +257,7 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
} }
} }
@JsonPropertyOrder({ "timestamp", "name" })
private static abstract class ConsumerEvent { private static abstract class ConsumerEvent {
private final long timestamp = System.currentTimeMillis(); private final long timestamp = System.currentTimeMillis();
@ -345,6 +347,7 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
} }
} }
@JsonPropertyOrder({ "timestamp", "name", "key", "value", "topic", "partition", "offset" })
public static class RecordData extends ConsumerEvent { public static class RecordData extends ConsumerEvent {
private final ConsumerRecord<String, String> record; private final ConsumerRecord<String, String> record;

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.kafka.tools; package org.apache.kafka.tools;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.Producer;
@ -30,8 +32,6 @@ import java.io.FileInputStream;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties; import java.util.Properties;
import static net.sourceforge.argparse4j.impl.Arguments.store; import static net.sourceforge.argparse4j.impl.Arguments.store;
@ -40,6 +40,7 @@ import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser; import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException; import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace; import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Exit;
/** /**
@ -57,8 +58,9 @@ import org.apache.kafka.common.utils.Exit;
*/ */
public class VerifiableProducer { public class VerifiableProducer {
String topic; private final ObjectMapper mapper = new ObjectMapper();
private Producer<String, String> producer; private final String topic;
private final Producer<String, String> producer;
// If maxMessages < 0, produce until the process is killed externally // If maxMessages < 0, produce until the process is killed externally
private long maxMessages = -1; private long maxMessages = -1;
@ -69,22 +71,21 @@ public class VerifiableProducer {
private long numSent = 0; private long numSent = 0;
// Throttle message throughput if this is set >= 0 // Throttle message throughput if this is set >= 0
private long throughput; private final long throughput;
// Hook to trigger producing thread to stop sending messages // Hook to trigger producing thread to stop sending messages
private boolean stopProducing = false; private boolean stopProducing = false;
// Prefix (plus a dot separator) added to every value produced by verifiable producer // Prefix (plus a dot separator) added to every value produced by verifiable producer
// if null, then values are produced without a prefix // if null, then values are produced without a prefix
private Integer valuePrefix; private final Integer valuePrefix;
public VerifiableProducer( public VerifiableProducer(KafkaProducer<String, String> producer, String topic, int throughput, int maxMessages, Integer valuePrefix) {
Properties producerProps, String topic, int throughput, int maxMessages, Integer valuePrefix) {
this.topic = topic; this.topic = topic;
this.throughput = throughput; this.throughput = throughput;
this.maxMessages = maxMessages; this.maxMessages = maxMessages;
this.producer = new KafkaProducer<>(producerProps); this.producer = producer;
this.valuePrefix = valuePrefix; this.valuePrefix = valuePrefix;
} }
@ -134,7 +135,7 @@ public class VerifiableProducer {
.type(Integer.class) .type(Integer.class)
.choices(0, 1, -1) .choices(0, 1, -1)
.metavar("ACKS") .metavar("ACKS")
.help("Acks required on each produced message. See Kafka docs on request.required.acks for details."); .help("Acks required on each produced message. See Kafka docs on acks for details.");
parser.addArgument("--producer.config") parser.addArgument("--producer.config")
.action(store()) .action(store())
@ -157,9 +158,9 @@ public class VerifiableProducer {
/** /**
* Read a properties file from the given path * Read a properties file from the given path
* @param filename The path of the file to read * @param filename The path of the file to read
* *
* Note: this duplication of org.apache.kafka.common.utils.Utils.loadProps is unfortunate * Note: this duplication of org.apache.kafka.common.utils.Utils.loadProps is unfortunate
* but *intentional*. In order to use VerifiableProducer in compatibility and upgrade tests, * but *intentional*. In order to use VerifiableProducer in compatibility and upgrade tests,
* we use VerifiableProducer from the development tools package, and run it against 0.8.X.X kafka jars. * we use VerifiableProducer from the development tools package, and run it against 0.8.X.X kafka jars.
* Since this method is not in Utils in the 0.8.X.X jars, we have to cheat a bit and duplicate. * Since this method is not in Utils in the 0.8.X.X jars, we have to cheat a bit and duplicate.
*/ */
@ -170,63 +171,50 @@ public class VerifiableProducer {
} }
return props; return props;
} }
/** Construct a VerifiableProducer object from command-line arguments. */ /** Construct a VerifiableProducer object from command-line arguments. */
public static VerifiableProducer createFromArgs(String[] args) { public static VerifiableProducer createFromArgs(ArgumentParser parser, String[] args) throws ArgumentParserException {
ArgumentParser parser = argParser(); Namespace res = parser.parseArgs(args);
VerifiableProducer producer = null;
try { int maxMessages = res.getInt("maxMessages");
Namespace res; String topic = res.getString("topic");
res = parser.parseArgs(args); int throughput = res.getInt("throughput");
String configFile = res.getString("producer.config");
Integer valuePrefix = res.getInt("valuePrefix");
int maxMessages = res.getInt("maxMessages"); Properties producerProps = new Properties();
String topic = res.getString("topic"); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList"));
int throughput = res.getInt("throughput"); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
String configFile = res.getString("producer.config"); "org.apache.kafka.common.serialization.StringSerializer");
Integer valuePrefix = res.getInt("valuePrefix"); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
Properties producerProps = new Properties(); producerProps.put(ProducerConfig.ACKS_CONFIG, Integer.toString(res.getInt("acks")));
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList")); // No producer retries
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, producerProps.put(ProducerConfig.RETRIES_CONFIG, "0");
"org.apache.kafka.common.serialization.StringSerializer"); if (configFile != null) {
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, try {
"org.apache.kafka.common.serialization.StringSerializer"); producerProps.putAll(loadProps(configFile));
producerProps.put(ProducerConfig.ACKS_CONFIG, Integer.toString(res.getInt("acks"))); } catch (IOException e) {
// No producer retries throw new ArgumentParserException(e.getMessage(), parser);
producerProps.put("retries", "0");
if (configFile != null) {
try {
producerProps.putAll(loadProps(configFile));
} catch (IOException e) {
throw new ArgumentParserException(e.getMessage(), parser);
}
}
producer = new VerifiableProducer(producerProps, topic, throughput, maxMessages, valuePrefix);
} catch (ArgumentParserException e) {
if (args.length == 0) {
parser.printHelp();
Exit.exit(0);
} else {
parser.handleError(e);
Exit.exit(1);
} }
} }
return producer; StringSerializer serializer = new StringSerializer();
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps, serializer, serializer);
return new VerifiableProducer(producer, topic, throughput, maxMessages, valuePrefix);
} }
/** Produce a message with given key and value. */ /** Produce a message with given key and value. */
public void send(String key, String value) { public void send(String key, String value) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value); ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
numSent++; numSent++;
try { try {
producer.send(record, new PrintInfoCallback(key, value)); producer.send(record, new PrintInfoCallback(key, value));
} catch (Exception e) { } catch (Exception e) {
synchronized (System.out) { synchronized (System.out) {
System.out.println(errorString(e, key, value, System.currentTimeMillis())); printJson(new FailedSend(key, value, topic, e));
} }
} }
} }
@ -242,66 +230,174 @@ public class VerifiableProducer {
/** Close the producer to flush any remaining messages. */ /** Close the producer to flush any remaining messages. */
public void close() { public void close() {
producer.close(); producer.close();
System.out.println(shutdownString()); printJson(new ShutdownComplete());
} }
String shutdownString() { @JsonPropertyOrder({ "timestamp", "name" })
Map<String, Object> data = new HashMap<>(); private static abstract class ProducerEvent {
data.put("name", "shutdown_complete"); private final long timestamp = System.currentTimeMillis();
return toJsonString(data);
}
String startupString() { @JsonProperty
Map<String, Object> data = new HashMap<>(); public abstract String name();
data.put("name", "startup_complete");
return toJsonString(data);
}
/** @JsonProperty
* Return JSON string encapsulating basic information about the exception, as well public long timestamp() {
* as the key and value which triggered the exception. return timestamp;
*/ }
String errorString(Exception e, String key, String value, Long nowMs) { }
assert e != null : "Expected non-null exception.";
private static class StartupComplete extends ProducerEvent {
Map<String, Object> errorData = new HashMap<>();
errorData.put("name", "producer_send_error"); @Override
public String name() {
errorData.put("time_ms", nowMs); return "startup_complete";
errorData.put("exception", e.getClass().toString()); }
errorData.put("message", e.getMessage()); }
errorData.put("topic", this.topic);
errorData.put("key", key); private static class ShutdownComplete extends ProducerEvent {
errorData.put("value", value);
@Override
return toJsonString(errorData); public String name() {
} return "shutdown_complete";
}
String successString(RecordMetadata recordMetadata, String key, String value, Long nowMs) { }
assert recordMetadata != null : "Expected non-null recordMetadata object.";
private static class SuccessfulSend extends ProducerEvent {
Map<String, Object> successData = new HashMap<>();
successData.put("name", "producer_send_success"); private String key;
private String value;
successData.put("time_ms", nowMs); private RecordMetadata recordMetadata;
successData.put("topic", this.topic);
successData.put("partition", recordMetadata.partition()); public SuccessfulSend(String key, String value, RecordMetadata recordMetadata) {
successData.put("offset", recordMetadata.offset()); assert recordMetadata != null : "Expected non-null recordMetadata object.";
successData.put("key", key); this.key = key;
successData.put("value", value); this.value = value;
this.recordMetadata = recordMetadata;
return toJsonString(successData); }
}
@Override
private String toJsonString(Map<String, Object> data) { public String name() {
String json; return "producer_send_success";
try { }
ObjectMapper mapper = new ObjectMapper();
json = mapper.writeValueAsString(data); @JsonProperty
} catch (JsonProcessingException e) { public String key() {
json = "Bad data can't be written as json: " + e.getMessage(); return key;
}
@JsonProperty
public String value() {
return value;
}
@JsonProperty
public String topic() {
return recordMetadata.topic();
}
@JsonProperty
public int partition() {
return recordMetadata.partition();
}
@JsonProperty
public long offset() {
return recordMetadata.offset();
}
}
private static class FailedSend extends ProducerEvent {
private String topic;
private String key;
private String value;
private Exception exception;
public FailedSend(String key, String value, String topic, Exception exception) {
assert exception != null : "Expected non-null exception.";
this.key = key;
this.value = value;
this.topic = topic;
this.exception = exception;
}
@Override
public String name() {
return "producer_send_error";
}
@JsonProperty
public String key() {
return key;
}
@JsonProperty
public String value() {
return value;
}
@JsonProperty
public String topic() {
return topic;
}
@JsonProperty
public String exception() {
return exception.getClass().toString();
}
@JsonProperty
public String message() {
return exception.getMessage();
}
}
private static class ToolData extends ProducerEvent {
private long sent;
private long acked;
private long targetThroughput;
private double avgThroughput;
public ToolData(long sent, long acked, long targetThroughput, double avgThroughput) {
this.sent = sent;
this.acked = acked;
this.targetThroughput = targetThroughput;
this.avgThroughput = avgThroughput;
}
@Override
public String name() {
return "tool_data";
}
@JsonProperty
public long sent() {
return this.sent;
}
@JsonProperty
public long acked() {
return this.acked;
}
@JsonProperty("target_throughput")
public long targetThroughput() {
return this.targetThroughput;
}
@JsonProperty("avg_throughput")
public double avgThroughput() {
return this.avgThroughput;
}
}
private void printJson(Object data) {
try {
System.out.println(mapper.writeValueAsString(data));
} catch (JsonProcessingException e) {
System.out.println("Bad data can't be written as json: " + e.getMessage());
} }
return json;
} }
/** Callback which prints errors to stdout when the producer fails to send. */ /** Callback which prints errors to stdout when the producer fails to send. */
@ -319,54 +415,27 @@ public class VerifiableProducer {
synchronized (System.out) { synchronized (System.out) {
if (e == null) { if (e == null) {
VerifiableProducer.this.numAcked++; VerifiableProducer.this.numAcked++;
System.out.println(successString(recordMetadata, this.key, this.value, System.currentTimeMillis())); printJson(new SuccessfulSend(this.key, this.value, recordMetadata));
} else { } else {
System.out.println(errorString(e, this.key, this.value, System.currentTimeMillis())); printJson(new FailedSend(this.key, this.value, topic, e));
} }
} }
} }
} }
public static void main(String[] args) throws IOException { public void run(ThroughputThrottler throttler) {
final VerifiableProducer producer = createFromArgs(args); printJson(new StartupComplete());
final long startMs = System.currentTimeMillis(); // negative maxMessages (-1) means "infinite"
boolean infinite = producer.maxMessages < 0; long maxMessages = (this.maxMessages < 0) ? Long.MAX_VALUE : this.maxMessages;
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Trigger main thread to stop producing messages
producer.stopProducing = true;
// Flush any remaining messages
producer.close();
// Print a summary
long stopMs = System.currentTimeMillis();
double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs));
Map<String, Object> data = new HashMap<>();
data.put("name", "tool_data");
data.put("sent", producer.numSent);
data.put("acked", producer.numAcked);
data.put("target_throughput", producer.throughput);
data.put("avg_throughput", avgThroughput);
System.out.println(producer.toJsonString(data));
}
});
ThroughputThrottler throttler = new ThroughputThrottler(producer.throughput, startMs);
System.out.println(producer.startupString());
long maxMessages = infinite ? Long.MAX_VALUE : producer.maxMessages;
for (long i = 0; i < maxMessages; i++) { for (long i = 0; i < maxMessages; i++) {
if (producer.stopProducing) { if (this.stopProducing) {
break; break;
} }
long sendStartMs = System.currentTimeMillis(); long sendStartMs = System.currentTimeMillis();
producer.send(null, producer.getValue(i)); this.send(null, this.getValue(i));
if (throttler.shouldThrottle(i, sendStartMs)) { if (throttler.shouldThrottle(i, sendStartMs)) {
throttler.throttle(); throttler.throttle();
@ -374,4 +443,41 @@ public class VerifiableProducer {
} }
} }
public static void main(String[] args) {
ArgumentParser parser = argParser();
if (args.length == 0) {
parser.printHelp();
Exit.exit(0);
}
try {
final VerifiableProducer producer = createFromArgs(parser, args);
final long startMs = System.currentTimeMillis();
ThroughputThrottler throttler = new ThroughputThrottler(producer.throughput, startMs);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Trigger main thread to stop producing messages
producer.stopProducing = true;
// Flush any remaining messages
producer.close();
// Print a summary
long stopMs = System.currentTimeMillis();
double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs));
producer.printJson(new ToolData(producer.numSent, producer.numAcked, producer.throughput, avgThroughput));
}
});
producer.run(throttler);
} catch (ArgumentParserException e) {
parser.handleError(e);
Exit.exit(1);
}
}
} }