KAFKA-4702: Parametrize streams benchmarks to run at scale

Author: Eno Thereska <eno.thereska@gmail.com>
Author: Eno Thereska <eno@confluent.io>
Author: Ubuntu <ubuntu@ip-172-31-22-146.us-west-2.compute.internal>

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #2478 from enothereska/minor-benchmark-args
Authored by Eno Thereska on 2017-02-08 13:06:09 -08:00; committed by Guozhang Wang
parent c7c113af63
commit 13a82b48ca
4 changed files with 355 additions and 205 deletions

streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java

@@ -51,18 +51,32 @@ import org.apache.kafka.test.TestUtils;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.Properties;
import java.util.Random;
/**
* Class that provides support for a series of benchmarks. It is usually driven by
* tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py.
* If run manually through the main() function below, you must do the following:
* 1. Have ZK and a Kafka broker set up
* 2. Run the loading step first: SimpleBenchmark localhost:9092 /tmp/statedir numRecords true "all"
* 3. Run the stream processing step second: SimpleBenchmark localhost:9092 /tmp/statedir numRecords false "all"
* Note that the only change is the 4th parameter: "true" indicates a load phase, while "false" indicates
* a real run.
*/
public class SimpleBenchmark {
private final String kafka;
private final File stateDir;
private final Boolean loadPhase;
private final String testName;
private static final String ALL_TESTS = "all";
private static final String SOURCE_TOPIC = "simpleBenchmarkSourceTopic";
private static final String SINK_TOPIC = "simpleBenchmarkSinkTopic";
private static final String COUNT_TOPIC = "countTopic";
private static final String JOIN_TOPIC_1_PREFIX = "joinSourceTopic1";
private static final String JOIN_TOPIC_2_PREFIX = "joinSourceTopic2";
private static final ValueJoiner VALUE_JOINER = new ValueJoiner<byte[], byte[], byte[]>() {
@@ -83,7 +97,7 @@ public class SimpleBenchmark {
};
private static int numRecords;
private static Integer endKey;
private static int processedRecords = 0;
private static final int KEY_SIZE = 8;
private static final int VALUE_SIZE = 100;
private static final int RECORD_SIZE = KEY_SIZE + VALUE_SIZE;
@@ -91,17 +105,80 @@ public class SimpleBenchmark {
private static final Serde<byte[]> BYTE_SERDE = Serdes.ByteArray();
private static final Serde<Integer> INTEGER_SERDE = Serdes.Integer();
public SimpleBenchmark(File stateDir, String kafka) {
public SimpleBenchmark(final File stateDir, final String kafka, final Boolean loadPhase, final String testName) {
super();
this.stateDir = stateDir;
this.kafka = kafka;
this.loadPhase = loadPhase;
this.testName = testName;
}
private void run() throws Exception {
switch (testName) {
case ALL_TESTS:
// producer performance
produce(SOURCE_TOPIC);
// consumer performance
consume(SOURCE_TOPIC);
// simple stream performance source->process
processStream(SOURCE_TOPIC);
// simple stream performance source->sink
processStreamWithSink(SOURCE_TOPIC);
// simple stream performance source->store
processStreamWithStateStore(SOURCE_TOPIC);
// simple stream performance source->cache->store
processStreamWithCachedStateStore(SOURCE_TOPIC);
// simple aggregation
count(COUNT_TOPIC);
// simple streams performance KSTREAM-KTABLE join
kStreamKTableJoin(JOIN_TOPIC_1_PREFIX + "KStreamKTable", JOIN_TOPIC_2_PREFIX + "KStreamKTable");
// simple streams performance KSTREAM-KSTREAM join
kStreamKStreamJoin(JOIN_TOPIC_1_PREFIX + "KStreamKStream", JOIN_TOPIC_2_PREFIX + "KStreamKStream");
// simple streams performance KTABLE-KTABLE join
kTableKTableJoin(JOIN_TOPIC_1_PREFIX + "KTableKTable", JOIN_TOPIC_2_PREFIX + "KTableKTable");
break;
case "produce":
produce(SOURCE_TOPIC, VALUE_SIZE, "simple-benchmark-produce", numRecords, true, numRecords, true);
break;
case "consume":
consume(SOURCE_TOPIC);
break;
case "count":
count(COUNT_TOPIC);
break;
case "processstream":
processStream(SOURCE_TOPIC);
break;
case "processstreamwithsink":
processStreamWithSink(SOURCE_TOPIC);
break;
case "processstreamwithstatestore":
processStreamWithStateStore(SOURCE_TOPIC);
break;
case "processstreamwithcachedstatestore":
processStreamWithCachedStateStore(SOURCE_TOPIC);
break;
case "kstreamktablejoin":
kStreamKTableJoin(JOIN_TOPIC_1_PREFIX + "KStreamKTable", JOIN_TOPIC_2_PREFIX + "KStreamKTable");
break;
case "kstreamkstreamjoin":
kStreamKStreamJoin(JOIN_TOPIC_1_PREFIX + "KStreamKStream", JOIN_TOPIC_2_PREFIX + "KStreamKStream");
break;
case "ktablektablejoin":
kTableKTableJoin(JOIN_TOPIC_1_PREFIX + "KTableKTable", JOIN_TOPIC_2_PREFIX + "KTableKTable");
break;
default:
throw new Exception("Unknown test name " + testName);
}
}
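For orientation: main() below reaches this switch by constructing a single SimpleBenchmark and calling run(). A hedged sketch of driving one benchmark through the new positional arguments (host, path, and record count illustrative):

// Equivalent of the command line:
//   SimpleBenchmark localhost:9092 /tmp/statedir 1000000 false count
// main() parses these as kafka, stateDir, numRecords, loadPhase, testName
// and dispatches to count(COUNT_TOPIC) via run().
SimpleBenchmark.main(new String[] {
    "localhost:9092", "/tmp/statedir", "1000000", "false", "count"
});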
public static void main(String[] args) throws Exception {
String kafka = args.length > 0 ? args[0] : "localhost:9092";
String stateDirStr = args.length > 1 ? args[1] : TestUtils.tempDirectory().getAbsolutePath();
numRecords = args.length > 2 ? Integer.parseInt(args[2]) : 10000000;
endKey = numRecords - 1;
boolean loadPhase = args.length > 3 ? Boolean.parseBoolean(args[3]) : false;
String testName = args.length > 4 ? args[4].toLowerCase(Locale.ROOT) : ALL_TESTS;
final File stateDir = new File(stateDirStr);
stateDir.mkdir();
@@ -113,30 +190,14 @@ public class SimpleBenchmark {
System.out.println("kafka=" + kafka);
System.out.println("stateDir=" + stateDir);
System.out.println("numRecords=" + numRecords);
System.out.println("loadPhase=" + loadPhase);
System.out.println("testName=" + testName);
SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka);
// producer performance
benchmark.produce(SOURCE_TOPIC, VALUE_SIZE, "simple-benchmark-produce", numRecords, true, numRecords, true);
// consumer performance
benchmark.consume(SOURCE_TOPIC);
// simple stream performance source->process
benchmark.processStream(SOURCE_TOPIC);
// simple stream performance source->sink
benchmark.processStreamWithSink(SOURCE_TOPIC);
// simple stream performance source->store
benchmark.processStreamWithStateStore(SOURCE_TOPIC);
// simple stream performance source->cache->store
benchmark.processStreamWithCachedStateStore(SOURCE_TOPIC);
// simple streams performance KSTREAM-KTABLE join
benchmark.kStreamKTableJoin(JOIN_TOPIC_1_PREFIX + "KStreamKTable", JOIN_TOPIC_2_PREFIX + "KStreamKTable");
// simple streams performance KSTREAM-KSTREAM join
benchmark.kStreamKStreamJoin(JOIN_TOPIC_1_PREFIX + "KStreamKStream", JOIN_TOPIC_2_PREFIX + "KStreamKStream");
// simple streams performance KTABLE-KTABLE join
benchmark.kTableKTableJoin(JOIN_TOPIC_1_PREFIX + "KTableKTable", JOIN_TOPIC_2_PREFIX + "KTableKTable");
SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka, loadPhase, testName);
benchmark.run();
}
private Properties setJoinProperties(final String applicationId) {
private Properties setStreamProperties(final String applicationId) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
@@ -148,25 +209,85 @@ public class SimpleBenchmark {
return props;
}
private Properties setProduceConsumeProperties(final String clientId) {
Properties props = new Properties();
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
return props;
}
private boolean maybeSetupPhase(final String topic, final String clientId,
final boolean skipIfAllTests) throws Exception {
processedRecords = 0;
// initialize topics
if (loadPhase) {
if (skipIfAllTests) {
// if we run all tests, the produce test will have already loaded the data
if (testName.equals(ALL_TESTS)) {
// Skipping loading phase since previously loaded
return true;
}
}
System.out.println("Initializing topic " + topic);
// WARNING: The keys must be sequential, i.e., unique, otherwise the logic for when this test
// stops will not work (in createCountStreams)
produce(topic, VALUE_SIZE, clientId, numRecords, true, numRecords, false);
return true;
}
return false;
}
private KafkaStreams createCountStreams(Properties streamConfig, String topic, final CountDownLatch latch) {
final KStreamBuilder builder = new KStreamBuilder();
final KStream<Integer, byte[]> input = builder.stream(topic);
input.groupByKey()
.count("tmpStoreName").foreach(new CountDownAction(latch));
return new KafkaStreams(builder, streamConfig);
}
/**
* Measure the performance of a simple aggregate like count.
* Counts the occurrences of numbers (the classic example counts words; this
* one counts numbers).
* @param countTopic Topic where numbers are stored
* @throws Exception
*/
public void count(String countTopic) throws Exception {
if (maybeSetupPhase(countTopic, "simple-benchmark-produce-count", false)) {
return;
}
CountDownLatch latch = new CountDownLatch(1);
Properties props = setStreamProperties("simple-benchmark-count");
final KafkaStreams streams = createCountStreams(props, countTopic, latch);
runGenericBenchmark(streams, "Streams Count Performance [records/latency/rec-sec/MB-sec counted]: ", latch);
}
/**
* Measure the performance of a KStream-KTable left join. The setup is such that each
* KStream record joins to exactly one element in the KTable
*/
public void kStreamKTableJoin(String kStreamTopic, String kTableTopic) throws Exception {
CountDownLatch latch = new CountDownLatch(numRecords);
if (maybeSetupPhase(kStreamTopic, "simple-benchmark-produce-kstream", false)) {
maybeSetupPhase(kTableTopic, "simple-benchmark-produce-ktable", false);
return;
}
// initialize topics
System.out.println("Initializing kStreamTopic " + kStreamTopic);
produce(kStreamTopic, VALUE_SIZE, "simple-benchmark-produce-kstream", numRecords, false, numRecords, false);
System.out.println("Initializing kTableTopic " + kTableTopic);
produce(kTableTopic, VALUE_SIZE, "simple-benchmark-produce-ktable", numRecords, true, numRecords, false);
CountDownLatch latch = new CountDownLatch(1);
// setup join
Properties props = setJoinProperties("simple-benchmark-kstream-ktable-join");
Properties props = setStreamProperties("simple-benchmark-kstream-ktable-join");
final KafkaStreams streams = createKafkaStreamsKStreamKTableJoin(props, kStreamTopic, kTableTopic, latch);
// run benchmark
runJoinBenchmark(streams, "Streams KStreamKTable LeftJoin Performance [MB/s joined]: ", latch);
runGenericBenchmark(streams, "Streams KStreamKTable LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
}
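The body of createKafkaStreamsKStreamKTableJoin is elided in this view. Given the VALUE_JOINER and CountDownAction members defined in this class, a plausible sketch of the topology it builds (against the pre-1.0 KStreamBuilder API; the serde overloads and store name are assumptions, not the committed code):

// Hedged sketch: KStream-KTable left join whose ForeachAction trips the
// latch once all records have been joined.
final KStreamBuilder builder = new KStreamBuilder();
final KStream<Integer, byte[]> stream = builder.stream(INTEGER_SERDE, BYTE_SERDE, kStreamTopic);
final KTable<Integer, byte[]> table = builder.table(INTEGER_SERDE, BYTE_SERDE, kTableTopic, "ktable-store");
stream.leftJoin(table, VALUE_JOINER).foreach(new CountDownAction(latch));
return new KafkaStreams(builder, streamConfig);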
/**
@@ -174,20 +295,19 @@ public class SimpleBenchmark {
* KStream record joins to exactly one element in the other KStream
*/
public void kStreamKStreamJoin(String kStreamTopic1, String kStreamTopic2) throws Exception {
CountDownLatch latch = new CountDownLatch(numRecords);
if (maybeSetupPhase(kStreamTopic1, "simple-benchmark-produce-kstream-topic1", false)) {
maybeSetupPhase(kStreamTopic2, "simple-benchmark-produce-kstream-topic2", false);
return;
}
// initialize topics
System.out.println("Initializing kStreamTopic " + kStreamTopic1);
produce(kStreamTopic1, VALUE_SIZE, "simple-benchmark-produce-kstream-topic1", numRecords, true, numRecords, false);
System.out.println("Initializing kStreamTopic " + kStreamTopic2);
produce(kStreamTopic2, VALUE_SIZE, "simple-benchmark-produce-kstream-topic2", numRecords, true, numRecords, false);
CountDownLatch latch = new CountDownLatch(1);
// setup join
Properties props = setJoinProperties("simple-benchmark-kstream-kstream-join");
Properties props = setStreamProperties("simple-benchmark-kstream-kstream-join");
final KafkaStreams streams = createKafkaStreamsKStreamKStreamJoin(props, kStreamTopic1, kStreamTopic2, latch);
// run benchmark
runJoinBenchmark(streams, "Streams KStreamKStream LeftJoin Performance [MB/s joined]: ", latch);
runGenericBenchmark(streams, "Streams KStreamKStream LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
}
/**
@@ -195,23 +315,29 @@ public class SimpleBenchmark {
* KTable record joins to exactly one element in the other KTable
*/
public void kTableKTableJoin(String kTableTopic1, String kTableTopic2) throws Exception {
CountDownLatch latch = new CountDownLatch(numRecords);
// initialize topics
System.out.println("Initializing kTableTopic " + kTableTopic1);
produce(kTableTopic1, VALUE_SIZE, "simple-benchmark-produce-ktable-topic1", numRecords, true, numRecords, false);
System.out.println("Initializing kTableTopic " + kTableTopic2);
produce(kTableTopic2, VALUE_SIZE, "simple-benchmark-produce-ktable-topic2", numRecords, true, numRecords, false);
if (maybeSetupPhase(kTableTopic1, "simple-benchmark-produce-ktable-topic1", false)) {
maybeSetupPhase(kTableTopic2, "simple-benchmark-produce-ktable-topic2", false);
return;
}
CountDownLatch latch = new CountDownLatch(1);
// setup join
Properties props = setJoinProperties("simple-benchmark-ktable-ktable-join");
Properties props = setStreamProperties("simple-benchmark-ktable-ktable-join");
final KafkaStreams streams = createKafkaStreamsKTableKTableJoin(props, kTableTopic1, kTableTopic2, latch);
// run benchmark
runJoinBenchmark(streams, "Streams KTableKTable LeftJoin Performance [MB/s joined]: ", latch);
runGenericBenchmark(streams, "Streams KTableKTable LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
}
private void runJoinBenchmark(final KafkaStreams streams, final String nameOfBenchmark, final CountDownLatch latch) {
private void printResults(final String nameOfBenchmark, final long latency) {
System.out.println(nameOfBenchmark +
processedRecords + "/" +
latency + "/" +
recordsPerSec(latency, processedRecords) + "/" +
megaBytePerSec(latency, processedRecords, RECORD_SIZE));
}
private void runGenericBenchmark(final KafkaStreams streams, final String nameOfBenchmark, final CountDownLatch latch) {
streams.start();
long startTime = System.currentTimeMillis();
@@ -224,20 +350,12 @@ public class SimpleBenchmark {
}
}
long endTime = System.currentTimeMillis();
System.out.println(nameOfBenchmark + megaBytePerSec(endTime - startTime, numRecords, KEY_SIZE + VALUE_SIZE));
printResults(nameOfBenchmark, endTime - startTime);
streams.close();
}
public void processStream(String topic) {
CountDownLatch latch = new CountDownLatch(1);
final KafkaStreams streams = createKafkaStreams(topic, stateDir, kafka, latch);
private long startStreamsThread(final KafkaStreams streams, final CountDownLatch latch) throws Exception {
Thread thread = new Thread() {
public void run() {
streams.start();
@@ -257,96 +375,73 @@ public class SimpleBenchmark {
long endTime = System.currentTimeMillis();
System.out.println("Streams Performance [MB/sec read]: " + megaBytePerSec(endTime - startTime));
streams.close();
try {
thread.join();
} catch (Exception ex) {
// ignore
}
return endTime - startTime;
}
public void processStream(final String topic) throws Exception {
if (maybeSetupPhase(topic, "simple-benchmark-process-stream-load", true)) {
return;
}
public void processStreamWithSink(String topic) {
CountDownLatch latch = new CountDownLatch(1);
final KafkaStreams streams = createKafkaStreamsWithSink(topic, stateDir, kafka, latch);
final KafkaStreams streams = createKafkaStreams(topic, latch);
long latency = startStreamsThread(streams, latch);
Thread thread = new Thread() {
public void run() {
streams.start();
}
};
thread.start();
long startTime = System.currentTimeMillis();
while (latch.getCount() > 0) {
try {
latch.await();
} catch (InterruptedException ex) {
Thread.interrupted();
}
printResults("Streams Performance [records/latency/rec-sec/MB-sec source]: ", latency);
}
long endTime = System.currentTimeMillis();
System.out.println("Streams Performance [MB/sec read+write]: " + megaBytePerSec(endTime - startTime));
streams.close();
try {
thread.join();
} catch (Exception ex) {
// ignore
}
public void processStreamWithSink(String topic) throws Exception {
if (maybeSetupPhase(topic, "simple-benchmark-process-stream-with-sink-load", true)) {
return;
}
private void internalProcessStreamWithStore(final KafkaStreams streams, final CountDownLatch latch,
final String message) {
Thread thread = new Thread() {
public void run() {
streams.start();
}
};
thread.start();
long startTime = System.currentTimeMillis();
while (latch.getCount() > 0) {
try {
latch.await();
} catch (InterruptedException ex) {
Thread.interrupted();
}
}
long endTime = System.currentTimeMillis();
System.out.println(message + megaBytePerSec(endTime - startTime));
streams.close();
try {
thread.join();
} catch (Exception ex) {
// ignore
}
}
public void processStreamWithStateStore(String topic) {
CountDownLatch latch = new CountDownLatch(1);
final KafkaStreams streams = createKafkaStreamsWithSink(topic, latch);
long latency = startStreamsThread(streams, latch);
final KafkaStreams streams = createKafkaStreamsWithStateStore(topic, stateDir, kafka, latch, false);
internalProcessStreamWithStore(streams, latch, "Streams Performance [MB/sec read+store]: ");
printResults("Streams Performance [records/latency/rec-sec/MB-sec source+sink]: ", latency);
}
public void processStreamWithCachedStateStore(String topic) {
public void processStreamWithStateStore(String topic) throws Exception {
if (maybeSetupPhase(topic, "simple-benchmark-process-stream-with-state-store-load", true)) {
return;
}
CountDownLatch latch = new CountDownLatch(1);
final KafkaStreams streams = createKafkaStreamsWithStateStore(topic, latch, false);
long latency = startStreamsThread(streams, latch);
printResults("Streams Performance [records/latency/rec-sec/MB-sec source+store]: ", latency);
final KafkaStreams streams = createKafkaStreamsWithStateStore(topic, stateDir, kafka, latch, true);
internalProcessStreamWithStore(streams, latch, "Streams Performance [MB/sec read+cache+store]: ");
}
public void processStreamWithCachedStateStore(String topic) throws Exception {
if (maybeSetupPhase(topic, "simple-benchmark-process-stream-with-cached-state-store-load", true)) {
return;
}
CountDownLatch latch = new CountDownLatch(1);
final KafkaStreams streams = createKafkaStreamsWithStateStore(topic, latch, true);
long latency = startStreamsThread(streams, latch);
printResults("Streams Performance [records/latency/rec-sec/MB-sec source+cache+store]: ", latency);
}
public void produce(String topic) throws Exception {
// loading phase does not make sense for producer
if (loadPhase) {
return;
}
produce(topic, VALUE_SIZE, "simple-benchmark-produce", numRecords, true, numRecords, true);
}
/**
* Produce values to a topic
* @param topic Topic to produce to
@@ -358,17 +453,18 @@ public class SimpleBenchmark {
* @param printStats if true, print stats on how long producing took; if false, don't. False is useful
* when this produce step is part of another benchmark that produces its own stats
*/
public void produce(String topic, int valueSizeBytes, String clientId, int numRecords, boolean sequential,
private void produce(String topic, int valueSizeBytes, String clientId, int numRecords, boolean sequential,
int upperRange, boolean printStats) throws Exception {
if (sequential) {
if (upperRange < numRecords) throw new Exception("UpperRange must be >= numRecords");
}
Properties props = new Properties();
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
if (!sequential) {
System.out.println("WARNING: You are using non-sequential keys. If your tests' exit logic expects to see a final key, random keys may not work.");
}
Properties props = setProduceConsumeProperties(clientId);
int key = 0;
Random rand = new Random();
KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(props);
@@ -387,17 +483,22 @@ public class SimpleBenchmark {
long endTime = System.currentTimeMillis();
if (printStats)
System.out.println("Producer Performance [MB/sec write]: " + megaBytePerSec(endTime - startTime, numRecords, KEY_SIZE + valueSizeBytes));
if (printStats) {
System.out.println("Producer Performance [records/latency/rec-sec/MB-sec write]: " +
numRecords + "/" +
(endTime - startTime) + "/" +
recordsPerSec(endTime - startTime, numRecords) + "/" +
megaBytePerSec(endTime - startTime, numRecords, KEY_SIZE + valueSizeBytes));
}
}
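The middle of produce(), the send loop itself, is elided above. Based on the surrounding declarations (key, rand, producer, upperRange), the loop plausibly looks like this (a sketch, not the committed body):

// Sequential mode counts keys up from 0, so the final key numRecords - 1
// is guaranteed to appear; the processors' exit logic depends on that.
// Random mode draws keys from [0, upperRange).
final byte[] value = new byte[valueSizeBytes];
for (int i = 0; i < numRecords; i++) {
    producer.send(new ProducerRecord<>(topic, sequential ? key++ : rand.nextInt(upperRange), value));
}
producer.close();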
public void consume(String topic) {
Properties props = new Properties();
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "simple-benchmark-consumer");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
public void consume(String topic) throws Exception {
int consumedRecords = 0;
if (maybeSetupPhase(topic, "simple-benchmark-consumer-load", true)) {
return;
}
Properties props = setProduceConsumeProperties("simple-benchmark-consumer");
KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(props);
@@ -412,31 +513,34 @@ public class SimpleBenchmark {
while (true) {
ConsumerRecords<Integer, byte[]> records = consumer.poll(500);
if (records.isEmpty()) {
if (endKey.equals(key))
if (consumedRecords == numRecords)
break;
} else {
for (ConsumerRecord<Integer, byte[]> record : records) {
consumedRecords++;
Integer recKey = record.key();
if (key == null || key < recKey)
key = recKey;
if (consumedRecords == numRecords)
break;
}
}
if (consumedRecords == numRecords)
break;
}
long endTime = System.currentTimeMillis();
consumer.close();
System.out.println("Consumer Performance [MB/sec read]: " + megaBytePerSec(endTime - startTime));
System.out.println("Consumer Performance [records/latency/rec-sec/MB-sec read]: " +
consumedRecords + "/" +
(endTime - startTime) + "/" +
recordsPerSec(endTime - startTime, consumedRecords) + "/" +
megaBytePerSec(endTime - startTime, consumedRecords, RECORD_SIZE));
}
private KafkaStreams createKafkaStreams(String topic, File stateDir, String kafka, final CountDownLatch latch) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams");
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
private KafkaStreams createKafkaStreams(String topic, final CountDownLatch latch) {
Properties props = setStreamProperties("simple-benchmark-streams");
KStreamBuilder builder = new KStreamBuilder();
@@ -453,7 +557,8 @@ public class SimpleBenchmark {
@Override
public void process(Integer key, byte[] value) {
if (endKey.equals(key)) {
processedRecords++;
if (processedRecords == numRecords) {
latch.countDown();
}
}
@@ -472,13 +577,8 @@ public class SimpleBenchmark {
return new KafkaStreams(builder, props);
}
private KafkaStreams createKafkaStreamsWithSink(String topic, File stateDir, String kafka, final CountDownLatch latch) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams-with-sink");
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
private KafkaStreams createKafkaStreamsWithSink(String topic, final CountDownLatch latch) {
final Properties props = setStreamProperties("simple-benchmark-streams-with-sink");
KStreamBuilder builder = new KStreamBuilder();
@@ -495,7 +595,8 @@ public class SimpleBenchmark {
@Override
public void process(Integer key, byte[] value) {
if (endKey.equals(key)) {
processedRecords++;
if (processedRecords == numRecords) {
latch.countDown();
}
}
@@ -514,16 +615,19 @@ public class SimpleBenchmark {
return new KafkaStreams(builder, props);
}
private class CountDownAction<K, V> implements ForeachAction<K, V> {
private class CountDownAction<V> implements ForeachAction<Integer, V> {
private CountDownLatch latch;
CountDownAction(final CountDownLatch latch) {
this.latch = latch;
}
@Override
public void apply(K key, V value) {
public void apply(Integer key, V value) {
processedRecords++;
if (processedRecords == numRecords) {
this.latch.countDown();
}
}
}
private KafkaStreams createKafkaStreamsKStreamKTableJoin(Properties streamConfig, String kStreamTopic,
String kTableTopic, final CountDownLatch latch) {
@@ -562,15 +666,10 @@ public class SimpleBenchmark {
return new KafkaStreams(builder, streamConfig);
}
private KafkaStreams createKafkaStreamsWithStateStore(String topic, File stateDir, String kafka,
private KafkaStreams createKafkaStreamsWithStateStore(String topic,
final CountDownLatch latch,
boolean enableCaching) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams-with-store" + enableCaching);
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Properties props = setStreamProperties("simple-benchmark-streams-with-store" + enableCaching);
KStreamBuilder builder = new KStreamBuilder();
@@ -596,8 +695,8 @@ public class SimpleBenchmark {
@Override
public void process(Integer key, byte[] value) {
store.put(key, value);
if (endKey.equals(key)) {
processedRecords++;
if (processedRecords == numRecords) {
latch.countDown();
}
}
@@ -616,14 +715,15 @@ public class SimpleBenchmark {
return new KafkaStreams(builder, props);
}
private double megaBytePerSec(long time) {
return (double) (RECORD_SIZE * numRecords / 1024 / 1024) / ((double) time / 1000);
}
private double megaBytePerSec(long time, int numRecords, int recordSizeBytes) {
return (double) (recordSizeBytes * numRecords / 1024 / 1024) / ((double) time / 1000);
}
private double recordsPerSec(long time, int numRecords) {
return (double) numRecords / ((double) time / 1000);
}
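As a quick sanity check of these formulas — note that megaBytePerSec performs integer division before the cast, so small byte totals truncate — a standalone example with illustrative numbers:

// 1,000,000 records of 108 bytes (KEY_SIZE + VALUE_SIZE) in 10,000 ms.
public class ThroughputCheck {
    public static void main(String[] args) {
        final int numRecords = 1000000;
        final int recordSizeBytes = 108;
        final long timeMs = 10000;
        // integer division happens first, exactly as in megaBytePerSec above
        final double mbPerSec = (double) (recordSizeBytes * numRecords / 1024 / 1024)
                / ((double) timeMs / 1000);
        final double recPerSec = (double) numRecords / ((double) timeMs / 1000);
        System.out.println(recPerSec + " rec/sec, " + mbPerSec + " MB/sec");
        // prints: 100000.0 rec/sec, 10.2 MB/sec
    }
}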
private List<TopicPartition> getAllPartitions(KafkaConsumer<?, ?> consumer, String... topics) {
ArrayList<TopicPartition> partitions = new ArrayList<>();

tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py

@@ -13,41 +13,86 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.tests.test import Test
from ducktape.mark.resource import cluster
from ducktape.mark import parametrize, matrix
from kafkatest.tests.kafka_test import KafkaTest
from kafkatest.services.performance.streams_performance import StreamsSimpleBenchmarkService
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.version import DEV_BRANCH
class StreamsSimpleBenchmarkTest(KafkaTest):
class StreamsSimpleBenchmarkTest(Test):
"""
Simple benchmark of Kafka Streams.
"""
def __init__(self, test_context):
super(StreamsSimpleBenchmarkTest, self).__init__(test_context, num_zk=1, num_brokers=1,topics={
'simpleBenchmarkSourceTopic' : { 'partitions': 1, 'replication-factor': 1 },
'simpleBenchmarkSinkTopic' : { 'partitions': 1, 'replication-factor': 1 },
'joinSourceTopic1KStreamKStream' : { 'partitions': 1, 'replication-factor': 1 },
'joinSourceTopic2KStreamKStream' : { 'partitions': 1, 'replication-factor': 1 },
'joinSourceTopic1KStreamKTable' : { 'partitions': 1, 'replication-factor': 1 },
'joinSourceTopic2KStreamKTable' : { 'partitions': 1, 'replication-factor': 1 },
'joinSourceTopic1KTableKTable' : { 'partitions': 1, 'replication-factor': 1 },
'joinSourceTopic2KTableKTable' : { 'partitions': 1, 'replication-factor': 1 }
})
super(StreamsSimpleBenchmarkTest, self).__init__(test_context)
self.num_records = 2000000L
self.replication = 1
self.driver = StreamsSimpleBenchmarkService(test_context, self.kafka, 1000000L)
@cluster(num_nodes=3)
def test_simple_benchmark(self):
@cluster(num_nodes=9)
@matrix(test=["all"], scale=[1])
def test_simple_benchmark(self, test, scale):
"""
Run simple Kafka Streams benchmark
"""
self.driver = [None] * (scale + 1)
node = [None] * (scale)
data = [None] * (scale)
self.driver.start()
self.driver.wait()
self.driver.stop()
node = self.driver.node
node.account.ssh("grep Performance %s" % self.driver.STDOUT_FILE, allow_fail=False)
#############
# SETUP PHASE
#############
self.zk = ZookeeperService(self.test_context, num_nodes=1)
self.zk.start()
self.kafka = KafkaService(self.test_context, num_nodes=scale, zk=self.zk, version=DEV_BRANCH, topics={
'simpleBenchmarkSourceTopic' : { 'partitions': scale, 'replication-factor': self.replication },
'countTopic' : { 'partitions': scale, 'replication-factor': self.replication },
'simpleBenchmarkSinkTopic' : { 'partitions': scale, 'replication-factor': self.replication },
'joinSourceTopic1KStreamKStream' : { 'partitions': scale, 'replication-factor': self.replication },
'joinSourceTopic2KStreamKStream' : { 'partitions': scale, 'replication-factor': self.replication },
'joinSourceTopic1KStreamKTable' : { 'partitions': scale, 'replication-factor': self.replication },
'joinSourceTopic2KStreamKTable' : { 'partitions': scale, 'replication-factor': self.replication },
'joinSourceTopic1KTableKTable' : { 'partitions': scale, 'replication-factor': self.replication },
'joinSourceTopic2KTableKTable' : { 'partitions': scale, 'replication-factor': self.replication }
})
self.kafka.start()
return self.driver.collect_data(node)
################
# LOAD PHASE
################
self.load_driver = StreamsSimpleBenchmarkService(self.test_context, self.kafka,
self.num_records * scale, "true", test)
self.load_driver.start()
self.load_driver.wait()
self.load_driver.stop()
################
# RUN PHASE
################
for num in range(0, scale):
self.driver[num] = StreamsSimpleBenchmarkService(self.test_context, self.kafka,
self.num_records/(scale), "false", test)
self.driver[num].start()
#######################
# STOP + COLLECT PHASE
#######################
for num in range(0, scale):
self.driver[num].wait()
self.driver[num].stop()
node[num] = self.driver[num].node
node[num].account.ssh("grep Performance %s" % self.driver[num].STDOUT_FILE, allow_fail=False)
data[num] = self.driver[num].collect_data(node[num], "" )
final = {}
for num in range(0, scale):
for key in data[num]:
final[key + str(num)] = data[num][key]
return final

tests/kafkatest/services/performance/streams_performance.py

@@ -22,17 +22,19 @@ from kafkatest.services.streams import StreamsTestBaseService
class StreamsSimpleBenchmarkService(StreamsTestBaseService):
"""Base class for simple Kafka Streams benchmark"""
def __init__(self, test_context, kafka, numrecs):
def __init__(self, test_context, kafka, numrecs, load_phase, test_name):
super(StreamsSimpleBenchmarkService, self).__init__(test_context,
kafka,
"org.apache.kafka.streams.perf.SimpleBenchmark",
numrecs)
numrecs,
load_phase,
test_name)
def collect_data(self, node):
def collect_data(self, node, tag = None):
# Collect the data and return it to the framework
output = node.account.ssh_capture("grep Performance %s" % self.STDOUT_FILE)
data = {}
for line in output:
parts = line.split(':')
data[parts[0]] = float(parts[1])
data[tag + parts[0]] = parts[1]
return data

tests/kafkatest/services/streams.py

@@ -45,11 +45,14 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
"collect_default": True},
}
def __init__(self, test_context, kafka, streams_class_name, user_test_args):
def __init__(self, test_context, kafka, streams_class_name, user_test_args, user_test_args1, user_test_args2):
super(StreamsTestBaseService, self).__init__(test_context, 1)
self.kafka = kafka
self.args = {'streams_class_name': streams_class_name,
'user_test_args': user_test_args}
'user_test_args': user_test_args,
'user_test_args1': user_test_args1,
'user_test_args2': user_test_args2}
self.log_level = "DEBUG"
@property
def node(self):
@@ -118,7 +121,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
"INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
" %(kafka)s %(state_dir)s %(user_test_args)s" \
" %(kafka)s %(state_dir)s %(user_test_args)s %(user_test_args1)s %(user_test_args2)s" \
" & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
return cmd
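For clarity, the three user_test_args placeholders line up with SimpleBenchmark's positional arguments — user_test_args carries numrecs, user_test_args1 the load-phase flag, and user_test_args2 the test name — so (with an illustrative broker and state path)

%(kafka)s %(state_dir)s %(user_test_args)s %(user_test_args1)s %(user_test_args2)s

renders as something like

worker2:9092 /mnt/streams 2000000 true all

i.e. bootstrap servers, state directory, numRecords, loadPhase, testName, in the order main() parses them.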