diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 7a10b79c10a..baef1e9541e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -51,18 +51,32 @@ import org.apache.kafka.test.TestUtils;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Locale;
 import java.util.concurrent.CountDownLatch;
 import java.util.Properties;
 import java.util.Random;
 
+/**
+ * Class that provides support for a series of benchmarks. It is usually driven by
+ * tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py.
+ * If run manually through the main() function below, you must do the following:
+ * 1. Have ZK and a Kafka broker set up
+ * 2. Run the loading step first: SimpleBenchmark localhost:9092 /tmp/statedir numRecords true "all"
+ * 3. Run the stream processing step second: SimpleBenchmark localhost:9092 /tmp/statedir numRecords false "all"
+ * Note that the only difference is the 4th parameter, which changes from "true" (indicating a load
+ * phase) to "false" (indicating a real run).
+ */
 public class SimpleBenchmark {
 
     private final String kafka;
     private final File stateDir;
-
+    private final Boolean loadPhase;
+    private final String testName;
+    private static final String ALL_TESTS = "all";
     private static final String SOURCE_TOPIC = "simpleBenchmarkSourceTopic";
     private static final String SINK_TOPIC = "simpleBenchmarkSinkTopic";
 
+    private static final String COUNT_TOPIC = "countTopic";
+
     private static final String JOIN_TOPIC_1_PREFIX = "joinSourceTopic1";
     private static final String JOIN_TOPIC_2_PREFIX = "joinSourceTopic2";
     private static final ValueJoiner VALUE_JOINER = new ValueJoiner() {
@@ -83,7 +97,7 @@ public class SimpleBenchmark {
     };
 
     private static int numRecords;
-    private static Integer endKey;
+    private static int processedRecords = 0;
     private static final int KEY_SIZE = 8;
     private static final int VALUE_SIZE = 100;
     private static final int RECORD_SIZE = KEY_SIZE + VALUE_SIZE;
@@ -91,17 +105,80 @@ public class SimpleBenchmark {
     private static final Serde BYTE_SERDE = Serdes.ByteArray();
     private static final Serde INTEGER_SERDE = Serdes.Integer();
 
-    public SimpleBenchmark(File stateDir, String kafka) {
+    public SimpleBenchmark(final File stateDir, final String kafka, final Boolean loadPhase, final String testName) {
         super();
         this.stateDir = stateDir;
         this.kafka = kafka;
+        this.loadPhase = loadPhase;
+        this.testName = testName;
+    }
+
+    private void run() throws Exception {
+        switch (testName) {
+            case ALL_TESTS:
+                // producer performance
+                produce(SOURCE_TOPIC);
+                // consumer performance
+                consume(SOURCE_TOPIC);
+                // simple stream performance source->process
+                processStream(SOURCE_TOPIC);
+                // simple stream performance source->sink
+                processStreamWithSink(SOURCE_TOPIC);
+                // simple stream performance source->store
+                processStreamWithStateStore(SOURCE_TOPIC);
+                // simple stream performance source->cache->store
+                processStreamWithCachedStateStore(SOURCE_TOPIC);
+                // simple aggregation
+                count(COUNT_TOPIC);
+                // simple streams performance KSTREAM-KTABLE join
+                kStreamKTableJoin(JOIN_TOPIC_1_PREFIX + "KStreamKTable", JOIN_TOPIC_2_PREFIX + "KStreamKTable");
+                // simple streams performance KSTREAM-KSTREAM join
+                kStreamKStreamJoin(JOIN_TOPIC_1_PREFIX + "KStreamKStream", JOIN_TOPIC_2_PREFIX + "KStreamKStream");
+                // simple streams performance KTABLE-KTABLE join
+                kTableKTableJoin(JOIN_TOPIC_1_PREFIX + "KTableKTable", JOIN_TOPIC_2_PREFIX + "KTableKTable");
+                break;
+            case "produce":
+                produce(SOURCE_TOPIC, VALUE_SIZE, "simple-benchmark-produce", numRecords, true, numRecords, true);
+                break;
+            case "consume":
+                consume(SOURCE_TOPIC);
+                break;
+            case "count":
+                count(COUNT_TOPIC);
+                break;
+            case "processstream":
+                processStream(SOURCE_TOPIC);
+                break;
+            case "processstreamwithsink":
+                processStreamWithSink(SOURCE_TOPIC);
+                break;
+            case "processstreamwithstatestore":
+                processStreamWithStateStore(SOURCE_TOPIC);
+                break;
+            case "processstreamwithcachedstatestore":
+                processStreamWithCachedStateStore(SOURCE_TOPIC);
+                break;
+            case "kstreamktablejoin":
+                kStreamKTableJoin(JOIN_TOPIC_1_PREFIX + "KStreamKTable", JOIN_TOPIC_2_PREFIX + "KStreamKTable");
+                break;
+            case "kstreamkstreamjoin":
+                kStreamKStreamJoin(JOIN_TOPIC_1_PREFIX + "KStreamKStream", JOIN_TOPIC_2_PREFIX + "KStreamKStream");
+                break;
+            case "ktablektablejoin":
+                kTableKTableJoin(JOIN_TOPIC_1_PREFIX + "KTableKTable", JOIN_TOPIC_2_PREFIX + "KTableKTable");
+                break;
+            default:
+                throw new Exception("Unknown test name " + testName);
+        }
     }
 
     public static void main(String[] args) throws Exception {
         String kafka = args.length > 0 ? args[0] : "localhost:9092";
         String stateDirStr = args.length > 1 ? args[1] : TestUtils.tempDirectory().getAbsolutePath();
         numRecords = args.length > 2 ? Integer.parseInt(args[2]) : 10000000;
-        endKey = numRecords - 1;
+        boolean loadPhase = args.length > 3 ? Boolean.parseBoolean(args[3]) : false;
+        String testName = args.length > 4 ? args[4].toLowerCase(Locale.ROOT) : ALL_TESTS;
 
         final File stateDir = new File(stateDirStr);
         stateDir.mkdir();
@@ -113,30 +190,14 @@ public class SimpleBenchmark {
         System.out.println("kafka=" + kafka);
         System.out.println("stateDir=" + stateDir);
         System.out.println("numRecords=" + numRecords);
+        System.out.println("loadPhase=" + loadPhase);
+        System.out.println("testName=" + testName);
 
-        SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka);
-
-        // producer performance
-        benchmark.produce(SOURCE_TOPIC, VALUE_SIZE, "simple-benchmark-produce", numRecords, true, numRecords, true);
-        // consumer performance
-        benchmark.consume(SOURCE_TOPIC);
-        // simple stream performance source->process
-        benchmark.processStream(SOURCE_TOPIC);
-        // simple stream performance source->sink
-        benchmark.processStreamWithSink(SOURCE_TOPIC);
-        // simple stream performance source->store
-        benchmark.processStreamWithStateStore(SOURCE_TOPIC);
-        // simple stream performance source->cache->store
-        benchmark.processStreamWithCachedStateStore(SOURCE_TOPIC);
-        // simple streams performance KSTREAM-KTABLE join
-        benchmark.kStreamKTableJoin(JOIN_TOPIC_1_PREFIX + "KStreamKTable", JOIN_TOPIC_2_PREFIX + "KStreamKTable");
-        // simple streams performance KSTREAM-KSTREAM join
-        benchmark.kStreamKStreamJoin(JOIN_TOPIC_1_PREFIX + "KStreamKStream", JOIN_TOPIC_2_PREFIX + "KStreamKStream");
-        // simple streams performance KTABLE-KTABLE join
-        benchmark.kTableKTableJoin(JOIN_TOPIC_1_PREFIX + "KTableKTable", JOIN_TOPIC_2_PREFIX + "KTableKTable");
+        SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka, loadPhase, testName);
+        benchmark.run();
     }
 
-    private Properties setJoinProperties(final String applicationId) {
+    private Properties setStreamProperties(final String applicationId) {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
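The two-phase flow described in the class javadoc can also be scripted against main() directly; a minimal sketch (the wrapper class is hypothetical, not part of this patch, and assumes ZK and a broker are already running on localhost:9092):

    // Hypothetical driver for the manual two-phase run described in the javadoc above.
    public class SimpleBenchmarkDriver {
        public static void main(String[] args) throws Exception {
            // 1) load phase: 4th argument "true" populates the topics and exits
            SimpleBenchmark.main(new String[]{"localhost:9092", "/tmp/statedir", "1000000", "true", "all"});
            // 2) benchmark phase: same arguments, 4th argument flips to "false"
            SimpleBenchmark.main(new String[]{"localhost:9092", "/tmp/statedir", "1000000", "false", "all"});
        }
    }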
@@ -148,25 +209,85 @@ public class SimpleBenchmark {
         return props;
     }
 
+    private Properties setProduceConsumeProperties(final String clientId) {
+        Properties props = new Properties();
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        return props;
+    }
+
+    private boolean maybeSetupPhase(final String topic, final String clientId,
+                                    final boolean skipIfAllTests) throws Exception {
+        processedRecords = 0;
+        // initialize topics
+        if (loadPhase) {
+            if (skipIfAllTests) {
+                // if we run all tests, the produce test will have already loaded the data
+                if (testName.equals(ALL_TESTS)) {
+                    // skip the loading phase since the data was previously loaded
+                    return true;
+                }
+            }
+            System.out.println("Initializing topic " + topic);
+            // WARNING: the keys must be sequential (i.e., unique); otherwise the logic that decides
+            // when this test stops (in createCountStreams) will not work
+            produce(topic, VALUE_SIZE, clientId, numRecords, true, numRecords, false);
+            return true;
+        }
+        return false;
+    }
+
+    private KafkaStreams createCountStreams(Properties streamConfig, String topic, final CountDownLatch latch) {
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream input = builder.stream(topic);
+
+        input.groupByKey()
+            .count("tmpStoreName").foreach(new CountDownAction(latch));
+
+        return new KafkaStreams(builder, streamConfig);
+    }
+
+    /**
+     * Measure the performance of a simple aggregate such as count.
+     * Counts the occurrences of numbers (the canonical example counts words; this one counts numbers).
+     * @param countTopic Topic where numbers are stored
+     * @throws Exception
+     */
+    public void count(String countTopic) throws Exception {
+        if (maybeSetupPhase(countTopic, "simple-benchmark-produce-count", false)) {
+            return;
+        }
+
+        CountDownLatch latch = new CountDownLatch(1);
+        Properties props = setStreamProperties("simple-benchmark-count");
+        final KafkaStreams streams = createCountStreams(props, countTopic, latch);
+        runGenericBenchmark(streams, "Streams Count Performance [records/latency/rec-sec/MB-sec counted]: ", latch);
+    }
+
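+    // Note on termination: CountDownAction (defined further down in this file) increments the shared
+    // processedRecords counter on every count update and opens the single-count latch once it reaches
+    // numRecords. With repeated keys, downstream count updates may be coalesced (e.g. by caching), so
+    // fewer than numRecords updates could reach the foreach and the latch would never open; hence the
+    // sequential, unique keys required by maybeSetupPhase() above.
+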
     /**
     * Measure the performance of a KStream-KTable left join. The setup is such that each
     * KStream record joins to exactly one element in the KTable
     */
     public void kStreamKTableJoin(String kStreamTopic, String kTableTopic) throws Exception {
-        CountDownLatch latch = new CountDownLatch(numRecords);
+        if (maybeSetupPhase(kStreamTopic, "simple-benchmark-produce-kstream", false)) {
+            maybeSetupPhase(kTableTopic, "simple-benchmark-produce-ktable", false);
+            return;
+        }
 
-        // initialize topics
-        System.out.println("Initializing kStreamTopic " + kStreamTopic);
-        produce(kStreamTopic, VALUE_SIZE, "simple-benchmark-produce-kstream", numRecords, false, numRecords, false);
-        System.out.println("Initializing kTableTopic " + kTableTopic);
-        produce(kTableTopic, VALUE_SIZE, "simple-benchmark-produce-ktable", numRecords, true, numRecords, false);
+        CountDownLatch latch = new CountDownLatch(1);
 
         // setup join
-        Properties props = setJoinProperties("simple-benchmark-kstream-ktable-join");
+        Properties props = setStreamProperties("simple-benchmark-kstream-ktable-join");
         final KafkaStreams streams = createKafkaStreamsKStreamKTableJoin(props, kStreamTopic, kTableTopic, latch);
 
         // run benchmark
-        runJoinBenchmark(streams, "Streams KStreamKTable LeftJoin Performance [MB/s joined]: ", latch);
+        runGenericBenchmark(streams, "Streams KStreamKTable LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
     }
 
     /**
@@ -174,20 +295,19 @@ public class SimpleBenchmark {
      * KStream record joins to exactly one element in the other KStream
      */
     public void kStreamKStreamJoin(String kStreamTopic1, String kStreamTopic2) throws Exception {
-        CountDownLatch latch = new CountDownLatch(numRecords);
+        if (maybeSetupPhase(kStreamTopic1, "simple-benchmark-produce-kstream-topic1", false)) {
+            maybeSetupPhase(kStreamTopic2, "simple-benchmark-produce-kstream-topic2", false);
+            return;
+        }
 
-        // initialize topics
-        System.out.println("Initializing kStreamTopic " + kStreamTopic1);
-        produce(kStreamTopic1, VALUE_SIZE, "simple-benchmark-produce-kstream-topic1", numRecords, true, numRecords, false);
-        System.out.println("Initializing kStreamTopic " + kStreamTopic2);
-        produce(kStreamTopic2, VALUE_SIZE, "simple-benchmark-produce-kstream-topic2", numRecords, true, numRecords, false);
+        CountDownLatch latch = new CountDownLatch(1);
 
         // setup join
-        Properties props = setJoinProperties("simple-benchmark-kstream-kstream-join");
+        Properties props = setStreamProperties("simple-benchmark-kstream-kstream-join");
         final KafkaStreams streams = createKafkaStreamsKStreamKStreamJoin(props, kStreamTopic1, kStreamTopic2, latch);
 
         // run benchmark
-        runJoinBenchmark(streams, "Streams KStreamKStream LeftJoin Performance [MB/s joined]: ", latch);
+        runGenericBenchmark(streams, "Streams KStreamKStream LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
     }
 
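+    // Note: each join benchmark previously created its latch with one count per record
+    // (new CountDownLatch(numRecords)); all three now go through runGenericBenchmark() with a
+    // single-count latch that is opened once the static processedRecords counter reaches
+    // numRecords (the join topologies wire this up in code not shown in these hunks).
+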
     /**
@@ -195,23 +315,29 @@ public class SimpleBenchmark {
      * KTable record joins to exactly one element in the other KTable
      */
     public void kTableKTableJoin(String kTableTopic1, String kTableTopic2) throws Exception {
-        CountDownLatch latch = new CountDownLatch(numRecords);
-
-        // initialize topics
-        System.out.println("Initializing kTableTopic " + kTableTopic1);
-        produce(kTableTopic1, VALUE_SIZE, "simple-benchmark-produce-ktable-topic1", numRecords, true, numRecords, false);
-        System.out.println("Initializing kTableTopic " + kTableTopic2);
-        produce(kTableTopic2, VALUE_SIZE, "simple-benchmark-produce-ktable-topic2", numRecords, true, numRecords, false);
+        if (maybeSetupPhase(kTableTopic1, "simple-benchmark-produce-ktable-topic1", false)) {
+            maybeSetupPhase(kTableTopic2, "simple-benchmark-produce-ktable-topic2", false);
+            return;
+        }
 
+        CountDownLatch latch = new CountDownLatch(1);
 
         // setup join
-        Properties props = setJoinProperties("simple-benchmark-ktable-ktable-join");
+        Properties props = setStreamProperties("simple-benchmark-ktable-ktable-join");
         final KafkaStreams streams = createKafkaStreamsKTableKTableJoin(props, kTableTopic1, kTableTopic2, latch);
 
         // run benchmark
-        runJoinBenchmark(streams, "Streams KTableKTable LeftJoin Performance [MB/s joined]: ", latch);
+        runGenericBenchmark(streams, "Streams KTableKTable LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
     }
 
-    private void runJoinBenchmark(final KafkaStreams streams, final String nameOfBenchmark, final CountDownLatch latch) {
+    private void printResults(final String nameOfBenchmark, final long latency) {
+        System.out.println(nameOfBenchmark +
+            processedRecords + "/" +
+            latency + "/" +
+            recordsPerSec(latency, processedRecords) + "/" +
+            megaBytePerSec(latency, processedRecords, RECORD_SIZE));
+    }
+
+    private void runGenericBenchmark(final KafkaStreams streams, final String nameOfBenchmark, final CountDownLatch latch) {
         streams.start();
 
         long startTime = System.currentTimeMillis();
@@ -224,20 +350,12 @@ public class SimpleBenchmark {
             }
         }
         long endTime = System.currentTimeMillis();
-
-
-        System.out.println(nameOfBenchmark + megaBytePerSec(endTime - startTime, numRecords, KEY_SIZE + VALUE_SIZE));
+        printResults(nameOfBenchmark, endTime - startTime);
 
         streams.close();
     }
 
-
-
-    public void processStream(String topic) {
-        CountDownLatch latch = new CountDownLatch(1);
-
-        final KafkaStreams streams = createKafkaStreams(topic, stateDir, kafka, latch);
-
+    private long startStreamsThread(final KafkaStreams streams, final CountDownLatch latch) throws Exception {
         Thread thread = new Thread() {
             public void run() {
                 streams.start();
             }
         };
         thread.start();
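For reference, the four fields printed by printResults() decode as follows; a worked example using the same arithmetic as recordsPerSec()/megaBytePerSec() (illustrative numbers):

    // 10000000 records of RECORD_SIZE = 108 bytes processed in 100000 ms prints:
    //   "<benchmark name>: 10000000/100000/100000.0/10.3"
    // where rec-sec = 10000000 / (100000 / 1000.0)                     = 100000.0
    // and   MB-sec  = (108 * 10000000 / 1024 / 1024) / (100000 / 1000.0) = 10.3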
@@ -257,96 +375,73 @@ public class SimpleBenchmark {
 
         long endTime = System.currentTimeMillis();
 
-        System.out.println("Streams Performance [MB/sec read]: " + megaBytePerSec(endTime - startTime));
-
         streams.close();
         try {
             thread.join();
         } catch (Exception ex) {
             // ignore
         }
+
+        return endTime - startTime;
     }
 
-    public void processStreamWithSink(String topic) {
+    public void processStream(final String topic) throws Exception {
+        if (maybeSetupPhase(topic, "simple-benchmark-process-stream-load", true)) {
+            return;
+        }
+
         CountDownLatch latch = new CountDownLatch(1);
-        final KafkaStreams streams = createKafkaStreamsWithSink(topic, stateDir, kafka, latch);
+        final KafkaStreams streams = createKafkaStreams(topic, latch);
+        long latency = startStreamsThread(streams, latch);
 
-        Thread thread = new Thread() {
-            public void run() {
-                streams.start();
-            }
-        };
-        thread.start();
-
-        long startTime = System.currentTimeMillis();
-
-        while (latch.getCount() > 0) {
-            try {
-                latch.await();
-            } catch (InterruptedException ex) {
-                Thread.interrupted();
-            }
-        }
-
-        long endTime = System.currentTimeMillis();
-
-        System.out.println("Streams Performance [MB/sec read+write]: " + megaBytePerSec(endTime - startTime));
-
-        streams.close();
-        try {
-            thread.join();
-        } catch (Exception ex) {
-            // ignore
-        }
+        printResults("Streams Performance [records/latency/rec-sec/MB-sec source]: ", latency);
     }
 
-    private void internalProcessStreamWithStore(final KafkaStreams streams, final CountDownLatch latch,
-                                                final String message) {
-        Thread thread = new Thread() {
-            public void run() {
-                streams.start();
-            }
-        };
-        thread.start();
-
-        long startTime = System.currentTimeMillis();
-
-        while (latch.getCount() > 0) {
-            try {
-                latch.await();
-            } catch (InterruptedException ex) {
-                Thread.interrupted();
-            }
+    public void processStreamWithSink(String topic) throws Exception {
+        if (maybeSetupPhase(topic, "simple-benchmark-process-stream-with-sink-load", true)) {
+            return;
         }
-        long endTime = System.currentTimeMillis();
-
-        System.out.println(message + megaBytePerSec(endTime - startTime));
-
-        streams.close();
-        try {
-            thread.join();
-        } catch (Exception ex) {
-            // ignore
-        }
-    }
 
-    public void processStreamWithStateStore(String topic) {
         CountDownLatch latch = new CountDownLatch(1);
+        final KafkaStreams streams = createKafkaStreamsWithSink(topic, latch);
+        long latency = startStreamsThread(streams, latch);
 
-        final KafkaStreams streams = createKafkaStreamsWithStateStore(topic, stateDir, kafka, latch, false);
-        internalProcessStreamWithStore(streams, latch, "Streams Performance [MB/sec read+store]: ");
+        printResults("Streams Performance [records/latency/rec-sec/MB-sec source+sink]: ", latency);
     }
 
-    public void processStreamWithCachedStateStore(String topic) {
+    public void processStreamWithStateStore(String topic) throws Exception {
+        if (maybeSetupPhase(topic, "simple-benchmark-process-stream-with-state-store-load", true)) {
+            return;
+        }
+
         CountDownLatch latch = new CountDownLatch(1);
+        final KafkaStreams streams = createKafkaStreamsWithStateStore(topic, latch, false);
+        long latency = startStreamsThread(streams, latch);
+        printResults("Streams Performance [records/latency/rec-sec/MB-sec source+store]: ", latency);
 
-        final KafkaStreams streams = createKafkaStreamsWithStateStore(topic, stateDir, kafka, latch, true);
-
-        internalProcessStreamWithStore(streams, latch, "Streams Performance [MB/sec read+cache+store]: ");
     }
 
+    public void processStreamWithCachedStateStore(String topic) throws Exception {
+        if (maybeSetupPhase(topic, "simple-benchmark-process-stream-with-cached-state-store-load", true)) {
+            return;
+        }
+
+        CountDownLatch latch = new CountDownLatch(1);
+        final KafkaStreams streams = createKafkaStreamsWithStateStore(topic, latch, true);
+        long latency = startStreamsThread(streams, latch);
+        printResults("Streams Performance [records/latency/rec-sec/MB-sec source+cache+store]: ", latency);
+    }
+
+    public void produce(String topic) throws Exception {
+        // a loading phase does not make sense for the producer
+        if (loadPhase) {
+            return;
+        }
+        produce(topic, VALUE_SIZE, "simple-benchmark-produce", numRecords, true, numRecords, true);
+    }
+
     /**
      * Produce values to a topic
      * @param topic Topic to produce to
@@ -358,17 +453,18 @@ public class SimpleBenchmark {
      * @param printStats if True, print stats on how long producing took. If False, don't print stats. False can be used
      *                   when this produce step is part of another benchmark that produces its own stats
      */
-    public void produce(String topic, int valueSizeBytes, String clientId, int numRecords, boolean sequential,
+    private void produce(String topic, int valueSizeBytes, String clientId, int numRecords, boolean sequential,
                         int upperRange, boolean printStats) throws Exception {
+
         if (sequential) {
             if (upperRange < numRecords) throw new Exception("UpperRange must be >= numRecords");
         }
-        Properties props = new Properties();
-        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        if (!sequential) {
+            System.out.println("WARNING: You are using non-sequential keys. If your test's exit logic expects to see a final key, random keys may not work.");
+        }
+        Properties props = setProduceConsumeProperties(clientId);
+
         int key = 0;
         Random rand = new Random();
         KafkaProducer producer = new KafkaProducer<>(props);
@@ -387,17 +483,22 @@ public class SimpleBenchmark {
 
         long endTime = System.currentTimeMillis();
 
-        if (printStats)
-            System.out.println("Producer Performance [MB/sec write]: " + megaBytePerSec(endTime - startTime, numRecords, KEY_SIZE + valueSizeBytes));
+        if (printStats) {
+            System.out.println("Producer Performance [records/latency/rec-sec/MB-sec write]: " +
+                numRecords + "/" +
+                (endTime - startTime) + "/" +
+                recordsPerSec(endTime - startTime, numRecords) + "/" +
+                megaBytePerSec(endTime - startTime, numRecords, KEY_SIZE + valueSizeBytes));
+        }
     }
 
-    public void consume(String topic) {
-        Properties props = new Properties();
-        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "simple-benchmark-consumer");
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+    public void consume(String topic) throws Exception {
+        int consumedRecords = 0;
+        if (maybeSetupPhase(topic, "simple-benchmark-consumer-load", true)) {
+            return;
+        }
+
+        Properties props = setProduceConsumeProperties("simple-benchmark-consumer");
 
         KafkaConsumer consumer = new KafkaConsumer<>(props);
@@ -412,31 +513,34 @@ public class SimpleBenchmark {
         while (true) {
             ConsumerRecords records = consumer.poll(500);
             if (records.isEmpty()) {
-                if (endKey.equals(key))
+                if (consumedRecords == numRecords)
                     break;
             } else {
                 for (ConsumerRecord record : records) {
+                    consumedRecords++;
                     Integer recKey = record.key();
-                    if (key == null || key < recKey) key = recKey;
+                    if (consumedRecords == numRecords)
+                        break;
                 }
             }
+            if (consumedRecords == numRecords)
+                break;
         }
 
         long endTime = System.currentTimeMillis();
 
         consumer.close();
-        System.out.println("Consumer Performance [MB/sec read]: " + megaBytePerSec(endTime - startTime));
+        System.out.println("Consumer Performance [records/latency/rec-sec/MB-sec read]: " +
+            consumedRecords + "/" +
+            (endTime - startTime) + "/" +
+            recordsPerSec(endTime - startTime, consumedRecords) + "/" +
+            megaBytePerSec(endTime - startTime, consumedRecords, RECORD_SIZE));
     }
 
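+    // Note: the consume loop now exits after exactly numRecords records; the inner break stops
+    // mid-batch, the outer check stops polling, and the isEmpty branch covers the case where the
+    // final poll drained everything. Unlike the old endKey check, this also works when keys are
+    // not sequential.
+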
"simple-benchmark-streams"); - props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString()); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); - props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + private KafkaStreams createKafkaStreams(String topic, final CountDownLatch latch) { + Properties props = setStreamProperties("simple-benchmark-streams"); KStreamBuilder builder = new KStreamBuilder(); @@ -453,7 +557,8 @@ public class SimpleBenchmark { @Override public void process(Integer key, byte[] value) { - if (endKey.equals(key)) { + processedRecords++; + if (processedRecords == numRecords) { latch.countDown(); } } @@ -472,13 +577,8 @@ public class SimpleBenchmark { return new KafkaStreams(builder, props); } - private KafkaStreams createKafkaStreamsWithSink(String topic, File stateDir, String kafka, final CountDownLatch latch) { - Properties props = new Properties(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams-with-sink"); - props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString()); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); - props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + private KafkaStreams createKafkaStreamsWithSink(String topic, final CountDownLatch latch) { + final Properties props = setStreamProperties("simple-benchmark-streams-with-sink"); KStreamBuilder builder = new KStreamBuilder(); @@ -495,7 +595,8 @@ public class SimpleBenchmark { @Override public void process(Integer key, byte[] value) { - if (endKey.equals(key)) { + processedRecords++; + if (processedRecords == numRecords) { latch.countDown(); } } @@ -514,14 +615,17 @@ public class SimpleBenchmark { return new KafkaStreams(builder, props); } - private class CountDownAction implements ForeachAction { + private class CountDownAction implements ForeachAction { private CountDownLatch latch; CountDownAction(final CountDownLatch latch) { this.latch = latch; } @Override - public void apply(K key, V value) { - this.latch.countDown(); + public void apply(Integer key, V value) { + processedRecords++; + if (processedRecords == numRecords) { + this.latch.countDown(); + } } } @@ -562,15 +666,10 @@ public class SimpleBenchmark { return new KafkaStreams(builder, streamConfig); } - private KafkaStreams createKafkaStreamsWithStateStore(String topic, File stateDir, String kafka, + private KafkaStreams createKafkaStreamsWithStateStore(String topic, final CountDownLatch latch, boolean enableCaching) { - Properties props = new Properties(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams-with-store" + enableCaching); - props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString()); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); - props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + Properties props = setStreamProperties("simple-benchmark-streams-with-store" + enableCaching); KStreamBuilder builder = new KStreamBuilder(); @@ -596,8 +695,8 @@ public class SimpleBenchmark { @Override public void process(Integer key, byte[] value) { store.put(key, value); - - if (endKey.equals(key)) { + processedRecords++; + if (processedRecords == numRecords) { latch.countDown(); } } @@ -616,14 +715,15 @@ public class SimpleBenchmark { return new KafkaStreams(builder, props); } - private double megaBytePerSec(long time) { - return (double) 
 (RECORD_SIZE * numRecords / 1024 / 1024) / ((double) time / 1000);
-    }
 
     private double megaBytePerSec(long time, int numRecords, int recordSizeBytes) {
         return (double) (recordSizeBytes * numRecords / 1024 / 1024) / ((double) time / 1000);
     }
 
+    private double recordsPerSec(long time, int numRecords) {
+        return (double) numRecords / ((double) time / 1000);
+    }
+
     private List getAllPartitions(KafkaConsumer consumer, String... topics) {
         ArrayList partitions = new ArrayList<>();
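The kafkatest services below grep the benchmark's stdout for "Performance" and split each line on ':'. Given the printResults()/produce() formats above, a representative line (illustrative values) is:

    Producer Performance [records/latency/rec-sec/MB-sec write]: 10000000/100000/100000.0/10.3

so parts[0] becomes the metric name and parts[1] the slash-separated values.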
diff --git a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
index ebd69a6597b..6af7f11c03b 100644
--- a/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
+++ b/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py
@@ -13,41 +13,86 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from ducktape.tests.test import Test
 from ducktape.mark.resource import cluster
-
+from ducktape.mark import parametrize, matrix
 from kafkatest.tests.kafka_test import KafkaTest
+
 from kafkatest.services.performance.streams_performance import StreamsSimpleBenchmarkService
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.version import DEV_BRANCH
 
-
-class StreamsSimpleBenchmarkTest(KafkaTest):
+class StreamsSimpleBenchmarkTest(Test):
     """
     Simple benchmark of Kafka Streams.
     """
 
     def __init__(self, test_context):
-        super(StreamsSimpleBenchmarkTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
-            'simpleBenchmarkSourceTopic' : { 'partitions': 1, 'replication-factor': 1 },
-            'simpleBenchmarkSinkTopic' : { 'partitions': 1, 'replication-factor': 1 },
-            'joinSourceTopic1KStreamKStream' : { 'partitions': 1, 'replication-factor': 1 },
-            'joinSourceTopic2KStreamKStream' : { 'partitions': 1, 'replication-factor': 1 },
-            'joinSourceTopic1KStreamKTable' : { 'partitions': 1, 'replication-factor': 1 },
-            'joinSourceTopic2KStreamKTable' : { 'partitions': 1, 'replication-factor': 1 },
-            'joinSourceTopic1KTableKTable' : { 'partitions': 1, 'replication-factor': 1 },
-            'joinSourceTopic2KTableKTable' : { 'partitions': 1, 'replication-factor': 1 }
-        })
+        super(StreamsSimpleBenchmarkTest, self).__init__(test_context)
+        self.num_records = 2000000L
+        self.replication = 1
 
-        self.driver = StreamsSimpleBenchmarkService(test_context, self.kafka, 1000000L)
-
-    @cluster(num_nodes=3)
-    def test_simple_benchmark(self):
+    @cluster(num_nodes=9)
+    @matrix(test=["all"], scale=[1])
+    def test_simple_benchmark(self, test, scale):
         """
         Run simple Kafka Streams benchmark
         """
+        self.driver = [None] * (scale + 1)
+        node = [None] * scale
+        data = [None] * scale
 
-        self.driver.start()
-        self.driver.wait()
-        self.driver.stop()
-        node = self.driver.node
-        node.account.ssh("grep Performance %s" % self.driver.STDOUT_FILE, allow_fail=False)
+        #############
+        # SETUP PHASE
+        #############
+        self.zk = ZookeeperService(self.test_context, num_nodes=1)
+        self.zk.start()
+        self.kafka = KafkaService(self.test_context, num_nodes=scale, zk=self.zk, version=DEV_BRANCH, topics={
+            'simpleBenchmarkSourceTopic' : { 'partitions': scale, 'replication-factor': self.replication },
+            'countTopic' : { 'partitions': scale, 'replication-factor': self.replication },
+            'simpleBenchmarkSinkTopic' : { 'partitions': scale, 'replication-factor': self.replication },
+            'joinSourceTopic1KStreamKStream' : { 'partitions': scale, 'replication-factor': self.replication },
+            'joinSourceTopic2KStreamKStream' : { 'partitions': scale, 'replication-factor': self.replication },
+            'joinSourceTopic1KStreamKTable' : { 'partitions': scale, 'replication-factor': self.replication },
+            'joinSourceTopic2KStreamKTable' : { 'partitions': scale, 'replication-factor': self.replication },
+            'joinSourceTopic1KTableKTable' : { 'partitions': scale, 'replication-factor': self.replication },
+            'joinSourceTopic2KTableKTable' : { 'partitions': scale, 'replication-factor': self.replication }
+        })
+        self.kafka.start()
+
+        ################
+        # LOAD PHASE
+        ################
+        self.load_driver = StreamsSimpleBenchmarkService(self.test_context, self.kafka,
+                                                         self.num_records * scale, "true", test)
+        self.load_driver.start()
+        self.load_driver.wait()
+        self.load_driver.stop()
 
-        return self.driver.collect_data(node)
+        ################
+        # RUN PHASE
+        ################
+        for num in range(0, scale):
+            self.driver[num] = StreamsSimpleBenchmarkService(self.test_context, self.kafka,
+                                                             self.num_records / scale, "false", test)
+            self.driver[num].start()
+
+        #######################
+        # STOP + COLLECT PHASE
+        #######################
+        for num in range(0, scale):
+            self.driver[num].wait()
+            self.driver[num].stop()
+            node[num] = self.driver[num].node
+            node[num].account.ssh("grep Performance %s" % self.driver[num].STDOUT_FILE, allow_fail=False)
+            data[num] = self.driver[num].collect_data(node[num], "")
+
+        final = {}
+        for num in range(0, scale):
+            for key in data[num]:
+                final[key + str(num)] = data[num][key]
+
+        return final
diff --git a/tests/kafkatest/services/performance/streams_performance.py b/tests/kafkatest/services/performance/streams_performance.py
index 4ccc3b29438..8cedb51a218 100644
--- a/tests/kafkatest/services/performance/streams_performance.py
+++ b/tests/kafkatest/services/performance/streams_performance.py
@@ -22,17 +22,19 @@ from kafkatest.services.streams import StreamsTestBaseService
 class StreamsSimpleBenchmarkService(StreamsTestBaseService):
     """Base class for simple Kafka Streams benchmark"""
 
-    def __init__(self, test_context, kafka, numrecs):
+    def __init__(self, test_context, kafka, numrecs, load_phase, test_name):
         super(StreamsSimpleBenchmarkService, self).__init__(test_context,
                                                             kafka,
                                                             "org.apache.kafka.streams.perf.SimpleBenchmark",
-                                                            numrecs)
+                                                            numrecs,
+                                                            load_phase,
+                                                            test_name)
 
-    def collect_data(self, node):
+    def collect_data(self, node, tag=""):
         # Collect the data and return it to the framework
         output = node.account.ssh_capture("grep Performance %s" % self.STDOUT_FILE)
         data = {}
         for line in output:
             parts = line.split(':')
-            data[parts[0]] = float(parts[1])
+            data[tag + parts[0]] = parts[1]
         return data
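With collect_data() keyed this way, the test above suffixes each driver's index onto the metric name; for scale=2 the merged dict returned by test_simple_benchmark() would contain keys like (illustrative):

    'Consumer Performance [records/latency/rec-sec/MB-sec read]0'
    'Consumer Performance [records/latency/rec-sec/MB-sec read]1'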
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index a0fa412213b..9e66c78da7f 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -45,11 +45,14 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
             "collect_default": True},
     }
 
-    def __init__(self, test_context, kafka, streams_class_name, user_test_args):
+    def __init__(self, test_context, kafka, streams_class_name, user_test_args, user_test_args1, user_test_args2):
         super(StreamsTestBaseService, self).__init__(test_context, 1)
         self.kafka = kafka
         self.args = {'streams_class_name': streams_class_name,
-                     'user_test_args': user_test_args}
+                     'user_test_args': user_test_args,
+                     'user_test_args1': user_test_args1,
+                     'user_test_args2': user_test_args2}
+        self.log_level = "DEBUG"
 
     @property
     def node(self):
@@ -118,7 +121,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
         cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
               "INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
-              " %(kafka)s %(state_dir)s %(user_test_args)s" \
+              " %(kafka)s %(state_dir)s %(user_test_args)s %(user_test_args1)s %(user_test_args2)s" \
              " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
 
         return cmd
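For reference, with the two extra arguments appended, the rendered command looks roughly like this (bootstrap servers, state dir, and values are illustrative; %(kafka_run_class)s resolves to the kafka-run-class.sh wrapper via KafkaPathResolverMixin):

    INCLUDE_TEST_JARS=true kafka-run-class.sh org.apache.kafka.streams.perf.SimpleBenchmark \
        <bootstrap-servers> <state-dir> 2000000 false all

i.e. user_test_args carries numRecords, user_test_args1 the load-phase flag, and user_test_args2 the test name, matching the positional parsing in SimpleBenchmark.main().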