MINOR: remove stream simple benchmark suite (#8353)

Reviewers: Matthias J. Sax <matthias@confluent.io>
Boyang Chen 2020-04-14 09:49:03 -07:00 committed by John Roesler
parent d2c332a1be
commit 25a1ed4cb5
5 changed files with 0 additions and 1344 deletions


@@ -1,752 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.perf;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static java.time.Duration.ofMillis;
import static java.time.Duration.ofSeconds;
import static java.time.Instant.ofEpochMilli;
/**
* Class that provides support for a series of benchmarks. It is usually driven by
* tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py.
* If run manually through the main() function below, you must do the following:
* 1. Have ZooKeeper and a Kafka broker set up
* 2. Run the loading step first, e.g.: SimpleBenchmark propFileName load-one numRecords keySkew valueSize
* 3. Run the stream processing step second, e.g.: SimpleBenchmark propFileName streamcount numRecords keySkew valueSize
* Note that what changed is the 2nd parameter: a "load-one"/"load-two" test name runs a loading phase, while any
* other test name accepted by run() below runs an actual benchmark.
*
* Note that run() below executes exactly one named test per invocation; running a whole group of benchmarks
* (e.g., "all") is orchestrated by tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py, which needs
* exact synchronization between tests (e.g., you wouldn't want one instance to run "streamcount" while another
* is still running "consume").
*/
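/*
 * A minimal sketch of the properties file passed as propFileName when running manually. Only
 * bootstrap.servers is strictly required by main() below; the other keys mirror what the system-test
 * service renders, and the values shown are illustrative assumptions rather than recommended settings:
 *
 *   bootstrap.servers=localhost:9092
 *   state.dir=/tmp/kafka-streams-benchmark
 *   num.stream.threads=1
 */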
public class SimpleBenchmark {
private static final String LOADING_PRODUCER_CLIENT_ID = "simple-benchmark-loading-producer";
private static final String SOURCE_TOPIC_ONE = "simpleBenchmarkSourceTopic1";
private static final String SOURCE_TOPIC_TWO = "simpleBenchmarkSourceTopic2";
private static final String SINK_TOPIC = "simpleBenchmarkSinkTopic";
private static final String YAHOO_CAMPAIGNS_TOPIC = "yahooCampaigns";
private static final String YAHOO_EVENTS_TOPIC = "yahooEvents";
private static final ValueJoiner<byte[], byte[], byte[]> VALUE_JOINER = new ValueJoiner<byte[], byte[], byte[]>() {
@Override
public byte[] apply(final byte[] value1, final byte[] value2) {
// dummy joiner in order to have as little join overhead as possible
if (value1 != null) {
return value1;
} else if (value2 != null) {
return value2;
} else {
return new byte[100];
}
}
};
private static final Serde<byte[]> BYTE_SERDE = Serdes.ByteArray();
private static final Serde<Integer> INTEGER_SERDE = Serdes.Integer();
long processedBytes = 0L;
int processedRecords = 0;
private static final long POLL_MS = 500L;
private static final long COMMIT_INTERVAL_MS = 30000L;
private static final int MAX_POLL_RECORDS = 1000;
/* ----------- benchmark variables that are hard-coded ----------- */
private static final int KEY_SPACE_SIZE = 10000;
private static final long STREAM_STREAM_JOIN_WINDOW = 10000L;
private static final long AGGREGATE_WINDOW_SIZE = 1000L;
private static final long AGGREGATE_WINDOW_ADVANCE = 500L;
private static final int SOCKET_SIZE_BYTES = 1024 * 1024;
// the following numbers are based on empirical results and should only
// be considered for updates when perf results have significantly changed;
// with at least 10 million records, we run for at most 3 minutes
private static final int MAX_WAIT_MS = 3 * 60 * 1000;
/* ----------- benchmark variables that can be specified ----------- */
final String testName;
final int numRecords;
final Properties props;
private final int valueSize;
private final double keySkew;
/* ----------- ----------------------------------------- ----------- */
private SimpleBenchmark(final Properties props,
final String testName,
final int numRecords,
final double keySkew,
final int valueSize) {
super();
this.props = props;
this.testName = testName;
this.keySkew = keySkew;
this.valueSize = valueSize;
this.numRecords = numRecords;
}
private void run() {
switch (testName) {
// loading phases
case "load-one":
produce(LOADING_PRODUCER_CLIENT_ID, SOURCE_TOPIC_ONE, numRecords, keySkew, valueSize);
break;
case "load-two":
produce(LOADING_PRODUCER_CLIENT_ID, SOURCE_TOPIC_ONE, numRecords, keySkew, valueSize);
produce(LOADING_PRODUCER_CLIENT_ID, SOURCE_TOPIC_TWO, numRecords, keySkew, valueSize);
break;
// testing phases
case "consume":
consume(SOURCE_TOPIC_ONE);
break;
case "consumeproduce":
consumeAndProduce(SOURCE_TOPIC_ONE);
break;
case "streamcount":
countStreamsNonWindowed(SOURCE_TOPIC_ONE);
break;
case "streamcountwindowed":
countStreamsWindowed(SOURCE_TOPIC_ONE);
break;
case "streamprocess":
processStream(SOURCE_TOPIC_ONE);
break;
case "streamprocesswithsink":
processStreamWithSink(SOURCE_TOPIC_ONE);
break;
case "streamprocesswithstatestore":
processStreamWithStateStore(SOURCE_TOPIC_ONE);
break;
case "streamprocesswithwindowstore":
processStreamWithWindowStore(SOURCE_TOPIC_ONE);
break;
case "streamtablejoin":
streamTableJoin(SOURCE_TOPIC_ONE, SOURCE_TOPIC_TWO);
break;
case "streamstreamjoin":
streamStreamJoin(SOURCE_TOPIC_ONE, SOURCE_TOPIC_TWO);
break;
case "tabletablejoin":
tableTableJoin(SOURCE_TOPIC_ONE, SOURCE_TOPIC_TWO);
break;
case "yahoo":
yahooBenchmark(YAHOO_CAMPAIGNS_TOPIC, YAHOO_EVENTS_TOPIC);
break;
default:
throw new RuntimeException("Unknown test name " + testName);
}
}
public static void main(final String[] args) throws IOException {
if (args.length < 5) {
System.err.println("Not enough parameters are provided; expecting propFileName, testName, numRecords, keySkew, valueSize");
System.exit(1);
}
final String propFileName = args[0];
final String testName = args[1].toLowerCase(Locale.ROOT);
final int numRecords = Integer.parseInt(args[2]);
final double keySkew = Double.parseDouble(args[3]); // 0d means even distribution
final int valueSize = Integer.parseInt(args[4]);
final Properties props = Utils.loadProps(propFileName);
final String kafka = props.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
if (kafka == null) {
System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
System.exit(1);
}
// Note: this output is needed for automated tests and must not be removed
System.out.println("StreamsTest instance started");
System.out.println("testName=" + testName);
System.out.println("streamsProperties=" + props);
System.out.println("numRecords=" + numRecords);
System.out.println("keySkew=" + keySkew);
System.out.println("valueSize=" + valueSize);
final SimpleBenchmark benchmark = new SimpleBenchmark(props, testName, numRecords, keySkew, valueSize);
benchmark.run();
}
public void setStreamProperties(final String applicationId) {
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
props.put(StreamsConfig.CLIENT_ID_CONFIG, "simple-benchmark");
props.put(StreamsConfig.POLL_MS_CONFIG, POLL_MS);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());
// the socket buffer needs to be large, especially when running in AWS with
// high latency. If running locally, the default is fine.
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
// improve producer throughput
props.put(ProducerConfig.LINGER_MS_CONFIG, 5000);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 128 * 1024);
}
private Properties setProduceConsumeProperties(final String clientId) {
final Properties clientProps = new Properties();
clientProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, props.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
clientProps.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
// the socket buffer needs to be large, especially when running in AWS with
// high latency. If running locally, the default is fine.
clientProps.put(ProducerConfig.LINGER_MS_CONFIG, 5000);
clientProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 128 * 1024);
clientProps.put(ProducerConfig.SEND_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
clientProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
clientProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
clientProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
clientProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
clientProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// the socket buffer needs to be large, especially when running in AWS with
// high latency. If running locally, the default is fine.
clientProps.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
clientProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
return clientProps;
}
void resetStats() {
processedRecords = 0;
processedBytes = 0L;
}
/**
* Produce values to a topic
* @param clientId String specifying client ID
* @param topic Topic to produce to
* @param numRecords Number of records to produce
* @param keySkew Key zipf distribution skewness
* @param valueSize Size of value in bytes
*/
private void produce(final String clientId,
final String topic,
final int numRecords,
final double keySkew,
final int valueSize) {
final Properties props = setProduceConsumeProperties(clientId);
final ZipfGenerator keyGen = new ZipfGenerator(KEY_SPACE_SIZE, keySkew);
try (final KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(props)) {
final byte[] value = new byte[valueSize];
// fill with some random values to increase entropy. Some devices
// like SSDs do compression, and if the array is all zeros
// the measured performance will be unrealistically good.
new Random(System.currentTimeMillis()).nextBytes(value);
for (int i = 0; i < numRecords; i++) {
producer.send(new ProducerRecord<>(topic, keyGen.next(), value));
}
}
}
private void consumeAndProduce(final String topic) {
final Properties consumerProps = setProduceConsumeProperties("simple-benchmark-consumer");
final Properties producerProps = setProduceConsumeProperties("simple-benchmark-producer");
final long startTime = System.currentTimeMillis();
try (final KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps);
final KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(producerProps)) {
final List<TopicPartition> partitions = getAllPartitions(consumer, topic);
consumer.assign(partitions);
consumer.seekToBeginning(partitions);
while (true) {
final ConsumerRecords<Integer, byte[]> records = consumer.poll(ofMillis(POLL_MS));
if (records.isEmpty()) {
if (processedRecords == numRecords) {
break;
}
} else {
for (final ConsumerRecord<Integer, byte[]> record : records) {
producer.send(new ProducerRecord<>(SINK_TOPIC, record.key(), record.value()));
processedRecords++;
processedBytes += record.value().length + Integer.SIZE;
if (processedRecords == numRecords) {
break;
}
}
}
if (processedRecords == numRecords) {
break;
}
}
}
final long endTime = System.currentTimeMillis();
printResults("ConsumerProducer Performance [records/latency/rec-sec/MB-sec read]: ", endTime - startTime);
}
private void consume(final String topic) {
final Properties consumerProps = setProduceConsumeProperties("simple-benchmark-consumer");
final long startTime = System.currentTimeMillis();
try (final KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
final List<TopicPartition> partitions = getAllPartitions(consumer, topic);
consumer.assign(partitions);
consumer.seekToBeginning(partitions);
while (true) {
final ConsumerRecords<Integer, byte[]> records = consumer.poll(ofMillis(POLL_MS));
if (records.isEmpty()) {
if (processedRecords == numRecords) {
break;
}
} else {
for (final ConsumerRecord<Integer, byte[]> record : records) {
processedRecords++;
processedBytes += record.value().length + Integer.SIZE;
if (processedRecords == numRecords) {
break;
}
}
}
if (processedRecords == numRecords) {
break;
}
}
}
final long endTime = System.currentTimeMillis();
printResults("Consumer Performance [records/latency/rec-sec/MB-sec read]: ", endTime - startTime);
}
private void processStream(final String topic) {
final CountDownLatch latch = new CountDownLatch(1);
setStreamProperties("simple-benchmark-streams-source");
final StreamsBuilder builder = new StreamsBuilder();
builder.stream(topic, Consumed.with(INTEGER_SERDE, BYTE_SERDE)).peek(new CountDownAction(latch));
final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
runGenericBenchmark(streams, "Streams Source Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
}
private void processStreamWithSink(final String topic) {
final CountDownLatch latch = new CountDownLatch(1);
setStreamProperties("simple-benchmark-streams-source-sink");
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Integer, byte[]> source = builder.stream(topic);
source.peek(new CountDownAction(latch)).to(SINK_TOPIC);
final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
runGenericBenchmark(streams, "Streams SourceSink Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
}
private void processStreamWithStateStore(final String topic) {
final CountDownLatch latch = new CountDownLatch(1);
setStreamProperties("simple-benchmark-streams-with-store");
final StreamsBuilder builder = new StreamsBuilder();
final StoreBuilder<KeyValueStore<Integer, byte[]>> storeBuilder =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("store"), INTEGER_SERDE, BYTE_SERDE);
builder.addStateStore(storeBuilder.withCachingEnabled());
final KStream<Integer, byte[]> source = builder.stream(topic);
source.peek(new CountDownAction(latch)).process(new ProcessorSupplier<Integer, byte[]>() {
@Override
public Processor<Integer, byte[]> get() {
return new AbstractProcessor<Integer, byte[]>() {
KeyValueStore<Integer, byte[]> store;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
super.init(context);
store = (KeyValueStore<Integer, byte[]>) context.getStateStore("store");
}
@Override
public void process(final Integer key, final byte[] value) {
store.get(key);
store.put(key, value);
}
};
}
}, "store");
final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
runGenericBenchmark(streams, "Streams Stateful Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
}
private void processStreamWithWindowStore(final String topic) {
final CountDownLatch latch = new CountDownLatch(1);
setStreamProperties("simple-benchmark-streams-with-store");
final StreamsBuilder builder = new StreamsBuilder();
final StoreBuilder<WindowStore<Integer, byte[]>> storeBuilder = Stores.windowStoreBuilder(
Stores.persistentWindowStore(
"store",
ofMillis(AGGREGATE_WINDOW_SIZE * 3),
ofMillis(AGGREGATE_WINDOW_SIZE),
false
),
INTEGER_SERDE,
BYTE_SERDE
);
builder.addStateStore(storeBuilder.withCachingEnabled());
final KStream<Integer, byte[]> source = builder.stream(topic);
source.peek(new CountDownAction(latch)).process(new ProcessorSupplier<Integer, byte[]>() {
@Override
public Processor<Integer, byte[]> get() {
return new AbstractProcessor<Integer, byte[]>() {
WindowStore<Integer, byte[]> store;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
super.init(context);
store = (WindowStore<Integer, byte[]>) context.getStateStore("store");
}
@Override
public void process(final Integer key, final byte[] value) {
final long timestamp = context().timestamp();
final KeyValueIterator<Windowed<Integer>, byte[]> iter = store.fetch(key - 10, key + 10, ofEpochMilli(timestamp - 1000L), ofEpochMilli(timestamp));
while (iter.hasNext()) {
iter.next();
}
iter.close();
store.put(key, value, timestamp);
}
};
}
}, "store");
final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
runGenericBenchmark(streams, "Streams Stateful Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
}
/**
* Measure the performance of a simple aggregate like count.
* Counts the occurrence of numbers (note that normally people count words, this
* example counts numbers).
*/
private void countStreamsNonWindowed(final String sourceTopic) {
final CountDownLatch latch = new CountDownLatch(1);
setStreamProperties("simple-benchmark-nonwindowed-count");
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Integer, byte[]> input = builder.stream(sourceTopic);
input.peek(new CountDownAction(latch))
.groupByKey()
.count();
final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
runGenericBenchmark(streams, "Streams Count Performance [records/latency/rec-sec/MB-sec counted]: ", latch);
}
/**
* Measure the performance of a simple windowed aggregate like count.
* Counts the occurrence of numbers per time window (note that normally people count words, this
* example counts numbers).
*/
private void countStreamsWindowed(final String sourceTopic) {
final CountDownLatch latch = new CountDownLatch(1);
setStreamProperties("simple-benchmark-windowed-count");
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Integer, byte[]> input = builder.stream(sourceTopic);
input.peek(new CountDownAction(latch))
.groupByKey()
.windowedBy(TimeWindows.of(ofMillis(AGGREGATE_WINDOW_SIZE)).advanceBy(ofMillis(AGGREGATE_WINDOW_ADVANCE)))
.count();
final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
runGenericBenchmark(streams, "Streams Count Windowed Performance [records/latency/rec-sec/MB-sec counted]: ", latch);
}
/**
* Measure the performance of a KStream-KTable left join. The setup is such that each
* KStream record joins to exactly one element in the KTable
*/
private void streamTableJoin(final String kStreamTopic, final String kTableTopic) {
final CountDownLatch latch = new CountDownLatch(1);
setStreamProperties("simple-benchmark-stream-table-join");
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Integer, byte[]> input1 = builder.stream(kStreamTopic);
final KTable<Integer, byte[]> input2 = builder.table(kTableTopic);
input1.leftJoin(input2, VALUE_JOINER).foreach(new CountDownAction(latch));
final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
// run benchmark
runGenericBenchmark(streams, "Streams KStreamKTable LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
}
/**
* Measure the performance of a KStream-KStream left join. The setup is such that each
* KStream record joins to exactly one element in the other KStream
*/
private void streamStreamJoin(final String kStreamTopic1, final String kStreamTopic2) {
final CountDownLatch latch = new CountDownLatch(1);
setStreamProperties("simple-benchmark-stream-stream-join");
final StreamsBuilder builder = new StreamsBuilder();
final KStream<Integer, byte[]> input1 = builder.stream(kStreamTopic1);
final KStream<Integer, byte[]> input2 = builder.stream(kStreamTopic2);
input1.leftJoin(input2, VALUE_JOINER, JoinWindows.of(ofMillis(STREAM_STREAM_JOIN_WINDOW))).foreach(new CountDownAction(latch));
final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
// run benchmark
runGenericBenchmark(streams, "Streams KStreamKStream LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
}
/**
* Measure the performance of a KTable-KTable left join. The setup is such that each
* KTable record joins to exactly one element in the other KTable
*/
private void tableTableJoin(final String kTableTopic1, final String kTableTopic2) {
final CountDownLatch latch = new CountDownLatch(1);
// setup join
setStreamProperties("simple-benchmark-table-table-join");
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Integer, byte[]> input1 = builder.table(kTableTopic1);
final KTable<Integer, byte[]> input2 = builder.table(kTableTopic2);
input1.leftJoin(input2, VALUE_JOINER).toStream().foreach(new CountDownAction(latch));
final KafkaStreams streams = createKafkaStreamsWithExceptionHandler(builder, props);
// run benchmark
runGenericBenchmark(streams, "Streams KTableKTable LeftJoin Performance [records/latency/rec-sec/MB-sec joined]: ", latch);
}
void printResults(final String nameOfBenchmark, final long latency) {
System.out.println(nameOfBenchmark +
processedRecords + "/" +
latency + "/" +
recordsPerSec(latency, processedRecords) + "/" +
megabytesPerSec(latency, processedBytes));
}
void runGenericBenchmark(final KafkaStreams streams, final String nameOfBenchmark, final CountDownLatch latch) {
streams.start();
final long startTime = System.currentTimeMillis();
long endTime = startTime;
while (latch.getCount() > 0 && (endTime - startTime < MAX_WAIT_MS)) {
try {
latch.await(1000, TimeUnit.MILLISECONDS);
} catch (final InterruptedException ex) {
Thread.interrupted();
}
endTime = System.currentTimeMillis();
}
streams.close();
printResults(nameOfBenchmark, endTime - startTime);
}
private class CountDownAction implements ForeachAction<Integer, byte[]> {
private final CountDownLatch latch;
CountDownAction(final CountDownLatch latch) {
this.latch = latch;
}
@Override
public void apply(final Integer key, final byte[] value) {
processedRecords++;
processedBytes += Integer.SIZE + value.length;
if (processedRecords == numRecords) {
this.latch.countDown();
}
}
}
private KafkaStreams createKafkaStreamsWithExceptionHandler(final StreamsBuilder builder, final Properties props) {
final KafkaStreams streamsClient = new KafkaStreams(builder.build(), props);
streamsClient.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(final Thread t, final Throwable e) {
System.out.println("FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
streamsClient.close(ofSeconds(30));
}
});
return streamsClient;
}
private double megabytesPerSec(final long time, final long processedBytes) {
return (processedBytes / 1024.0 / 1024.0) / (time / 1000.0);
}
private double recordsPerSec(final long time, final int numRecords) {
return numRecords / (time / 1000.0);
}
private List<TopicPartition> getAllPartitions(final KafkaConsumer<?, ?> consumer, final String... topics) {
final ArrayList<TopicPartition> partitions = new ArrayList<>();
for (final String topic : topics) {
for (final PartitionInfo info : consumer.partitionsFor(topic)) {
partitions.add(new TopicPartition(info.topic(), info.partition()));
}
}
return partitions;
}
private void yahooBenchmark(final String campaignsTopic, final String eventsTopic) {
final YahooBenchmark benchmark = new YahooBenchmark(this, campaignsTopic, eventsTopic);
benchmark.run();
}
private class ZipfGenerator {
final private Random rand = new Random(System.currentTimeMillis());
final private int size;
final private double skew;
private double bottom = 0.0d;
ZipfGenerator(final int size, final double skew) {
this.size = size;
this.skew = skew;
for (int i = 1; i < size; i++) {
this.bottom += 1.0d / Math.pow(i, this.skew);
}
}
int next() {
if (skew == 0.0d) {
return rand.nextInt(size);
} else {
int rank;
double dice;
double frequency;
rank = rand.nextInt(size);
frequency = (1.0d / Math.pow(rank, this.skew)) / this.bottom;
dice = rand.nextDouble();
while (!(dice < frequency)) {
rank = rand.nextInt(size);
frequency = (1.0d / Math.pow(rank, this.skew)) / this.bottom;
dice = rand.nextDouble();
}
return rank;
}
}
}
}


@@ -1,306 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.perf;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
/**
* A basic DSL and data generation that emulates the behavior of the Yahoo Benchmark
* https://yahooeng.tumblr.com/post/135321837876/benchmarking-streaming-computation-engines-at
* Thanks to Michael Armbrust for providing the initial code for this benchmark in his blog:
* https://databricks.com/blog/2017/06/06/simple-super-fast-streaming-engine-apache-spark.html
*/
public class YahooBenchmark {
private final SimpleBenchmark parent;
private final String campaignsTopic;
private final String eventsTopic;
static class ProjectedEvent {
/* attributes need to be public for serializer to work */
/* main attributes */
String eventType;
String adID;
/* other attributes */
long eventTime;
/* not used
public String userID = UUID.randomUUID().toString();
public String pageID = UUID.randomUUID().toString();
public String addType = "banner78";
public String ipAddress = "1.2.3.4";
*/
}
static class CampaignAd {
/* attributes need to be public for serializer to work */
String adID;
String campaignID;
}
@SuppressWarnings("WeakerAccess")
public YahooBenchmark(final SimpleBenchmark parent, final String campaignsTopic, final String eventsTopic) {
this.parent = parent;
this.campaignsTopic = campaignsTopic;
this.eventsTopic = eventsTopic;
}
// just for Yahoo benchmark
private boolean maybeSetupPhaseCampaigns(final String topic,
final String clientId,
final boolean skipIfAllTests,
final int numCampaigns,
final int adsPerCampaign,
final List<String> ads) {
parent.resetStats();
// initialize topics
System.out.println("Initializing topic " + topic);
final Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, parent.props.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
try (final KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
for (int c = 0; c < numCampaigns; c++) {
final String campaignID = UUID.randomUUID().toString();
for (int a = 0; a < adsPerCampaign; a++) {
final String adId = UUID.randomUUID().toString();
final String concat = adId + ":" + campaignID;
producer.send(new ProducerRecord<>(topic, adId, concat));
ads.add(adId);
parent.processedRecords++;
parent.processedBytes += concat.length() + adId.length();
}
}
}
return true;
}
// just for Yahoo benchmark
private void maybeSetupPhaseEvents(final String topic,
final String clientId,
final int numRecords,
final List<String> ads) {
parent.resetStats();
final String[] eventTypes = new String[]{"view", "click", "purchase"};
final Random rand = new Random(System.currentTimeMillis());
System.out.println("Initializing topic " + topic);
final Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, parent.props.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
final long startTime = System.currentTimeMillis();
try (final KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
final ProjectedEvent event = new ProjectedEvent();
final Map<String, Object> serdeProps = new HashMap<>();
final Serializer<ProjectedEvent> projectedEventSerializer = new JsonPOJOSerializer<>();
serdeProps.put("JsonPOJOClass", ProjectedEvent.class);
projectedEventSerializer.configure(serdeProps, false);
for (int i = 0; i < numRecords; i++) {
event.eventType = eventTypes[rand.nextInt(eventTypes.length - 1)];
event.adID = ads.get(rand.nextInt(ads.size() - 1));
event.eventTime = System.currentTimeMillis();
final byte[] value = projectedEventSerializer.serialize(topic, event);
producer.send(new ProducerRecord<>(topic, event.adID, value));
parent.processedRecords++;
parent.processedBytes += value.length + event.adID.length();
}
}
final long endTime = System.currentTimeMillis();
parent.printResults("Producer Performance [records/latency/rec-sec/MB-sec write]: ", endTime - startTime);
}
public void run() {
final int numCampaigns = 100;
final int adsPerCampaign = 10;
final List<String> ads = new ArrayList<>(numCampaigns * adsPerCampaign);
maybeSetupPhaseCampaigns(campaignsTopic, "simple-benchmark-produce-campaigns", false, numCampaigns, adsPerCampaign, ads);
maybeSetupPhaseEvents(eventsTopic, "simple-benchmark-produce-events", parent.numRecords, ads);
final CountDownLatch latch = new CountDownLatch(1);
parent.setStreamProperties("simple-benchmark-yahoo" + new Random().nextInt());
final KafkaStreams streams = createYahooBenchmarkStreams(parent.props, campaignsTopic, eventsTopic, latch, parent.numRecords);
parent.runGenericBenchmark(streams, "Streams Yahoo Performance [records/latency/rec-sec/MB-sec counted]: ", latch);
}
// Note: these are also in the streams example package, eventually use 1 file
private class JsonPOJOSerializer<T> implements Serializer<T> {
private final ObjectMapper objectMapper = new ObjectMapper();
/**
* Default constructor needed by Kafka
*/
@SuppressWarnings("WeakerAccess")
public JsonPOJOSerializer() {}
@Override
public byte[] serialize(final String topic, final T data) {
if (data == null) {
return null;
}
try {
return objectMapper.writeValueAsBytes(data);
} catch (final Exception e) {
throw new SerializationException("Error serializing JSON message", e);
}
}
}
// Note: these are also in the streams example package, eventually use 1 file
private class JsonPOJODeserializer<T> implements Deserializer<T> {
private final ObjectMapper objectMapper = new ObjectMapper();
private Class<T> tClass;
/**
* Default constructor needed by Kafka
*/
@SuppressWarnings("WeakerAccess")
public JsonPOJODeserializer() {}
@SuppressWarnings("unchecked")
@Override
public void configure(final Map<String, ?> props, final boolean isKey) {
tClass = (Class<T>) props.get("JsonPOJOClass");
}
@Override
public T deserialize(final String topic, final byte[] bytes) {
if (bytes == null) {
return null;
}
final T data;
try {
data = objectMapper.readValue(bytes, tClass);
} catch (final Exception e) {
throw new SerializationException(e);
}
return data;
}
}
private KafkaStreams createYahooBenchmarkStreams(final Properties streamConfig, final String campaignsTopic, final String eventsTopic,
final CountDownLatch latch, final int numRecords) {
final Map<String, Object> serdeProps = new HashMap<>();
final Serializer<ProjectedEvent> projectedEventSerializer = new JsonPOJOSerializer<>();
serdeProps.put("JsonPOJOClass", ProjectedEvent.class);
projectedEventSerializer.configure(serdeProps, false);
final Deserializer<ProjectedEvent> projectedEventDeserializer = new JsonPOJODeserializer<>();
serdeProps.put("JsonPOJOClass", ProjectedEvent.class);
projectedEventDeserializer.configure(serdeProps, false);
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, ProjectedEvent> kEvents = builder.stream(eventsTopic,
Consumed.with(Serdes.String(),
Serdes.serdeFrom(projectedEventSerializer, projectedEventDeserializer)));
final KTable<String, String> kCampaigns = builder.table(campaignsTopic, Consumed.with(Serdes.String(), Serdes.String()));
final KStream<String, ProjectedEvent> filteredEvents = kEvents
// use peek to detect when the last element is processed
.peek((key, value) -> {
parent.processedRecords++;
if (parent.processedRecords % 1000000 == 0) {
System.out.println("Processed " + parent.processedRecords);
}
if (parent.processedRecords >= numRecords) {
latch.countDown();
}
})
// only keep "view" events
.filter((key, value) -> value.eventType.equals("view"))
// select just a few of the columns
.mapValues(value -> {
final ProjectedEvent event = new ProjectedEvent();
event.adID = value.adID;
event.eventTime = value.eventTime;
event.eventType = value.eventType;
return event;
});
// deserialize the ad ID and campaign ID from the stored value in Kafka
final KTable<String, CampaignAd> deserCampaigns = kCampaigns.mapValues(value -> {
final String[] parts = value.split(":");
final CampaignAd cAdd = new CampaignAd();
cAdd.adID = parts[0];
cAdd.campaignID = parts[1];
return cAdd;
});
// join the events with the campaigns
final KStream<String, String> joined = filteredEvents.join(
deserCampaigns,
(value1, value2) -> value2.campaignID,
Joined.with(Serdes.String(), Serdes.serdeFrom(projectedEventSerializer, projectedEventDeserializer), null)
);
// key by campaign rather than by ad, as in the original benchmark
final KStream<String, String> keyedByCampaign = joined
.selectKey((key, value) -> value);
// calculate windowed counts
keyedByCampaign
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(Duration.ofMillis(10 * 1000)))
.count(Materialized.as("time-windows"));
return new KafkaStreams(builder.build(), streamConfig);
}
}


@@ -1,14 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


@@ -1,164 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.tests.test import Test
from ducktape.mark.resource import cluster
from ducktape.mark import parametrize, matrix
from kafkatest.tests.kafka_test import KafkaTest
from kafkatest.services.performance.streams_performance import StreamsSimpleBenchmarkService
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.version import DEV_BRANCH
STREAMS_SIMPLE_TESTS = ["streamprocess", "streamprocesswithsink", "streamprocesswithstatestore", "streamprocesswithwindowstore"]
STREAMS_COUNT_TESTS = ["streamcount", "streamcountwindowed"]
STREAMS_JOIN_TESTS = ["streamtablejoin", "streamstreamjoin", "tabletablejoin"]
NON_STREAMS_TESTS = ["consume", "consumeproduce"]
ALL_TEST = "all"
STREAMS_SIMPLE_TEST = "streams-simple"
STREAMS_COUNT_TEST = "streams-count"
STREAMS_JOIN_TEST = "streams-join"
class StreamsSimpleBenchmarkTest(Test):
"""
Simple benchmark of Kafka Streams.
"""
def __init__(self, test_context):
super(StreamsSimpleBenchmarkTest, self).__init__(test_context)
# these values could be updated in ad-hoc benchmarks
self.key_skew = 0
self.value_size = 1024
self.num_records = 10000000L
self.num_threads = 1
self.replication = 1
@cluster(num_nodes=12)
@matrix(test=["consume", "consumeproduce",
"streamprocess", "streamprocesswithsink", "streamprocesswithstatestore", "streamprocesswithwindowstore",
"streamcount", "streamcountwindowed",
"streamtablejoin", "streamstreamjoin", "tabletablejoin"],
scale=[1])
def test_simple_benchmark(self, test, scale):
"""
Run simple Kafka Streams benchmark
"""
self.driver = [None] * (scale + 1)
self.final = {}
#############
# SETUP PHASE
#############
self.zk = ZookeeperService(self.test_context, num_nodes=1)
self.zk.start()
self.kafka = KafkaService(self.test_context, num_nodes=scale, zk=self.zk, version=DEV_BRANCH, topics={
'simpleBenchmarkSourceTopic1' : { 'partitions': scale, 'replication-factor': self.replication },
'simpleBenchmarkSourceTopic2' : { 'partitions': scale, 'replication-factor': self.replication },
'simpleBenchmarkSinkTopic' : { 'partitions': scale, 'replication-factor': self.replication },
'yahooCampaigns' : { 'partitions': 20, 'replication-factor': self.replication },
'yahooEvents' : { 'partitions': 20, 'replication-factor': self.replication }
})
self.kafka.log_level = "INFO"
self.kafka.start()
load_test = ""
if test == ALL_TEST:
load_test = "load-two"
if test in STREAMS_JOIN_TESTS or test == STREAMS_JOIN_TEST:
load_test = "load-two"
if test in STREAMS_COUNT_TESTS or test == STREAMS_COUNT_TEST:
load_test = "load-one"
if test in STREAMS_SIMPLE_TESTS or test == STREAMS_SIMPLE_TEST:
load_test = "load-one"
if test in NON_STREAMS_TESTS:
load_test = "load-one"
################
# LOAD PHASE
################
self.load_driver = StreamsSimpleBenchmarkService(self.test_context,
self.kafka,
load_test,
self.num_threads,
self.num_records,
self.key_skew,
self.value_size)
self.load_driver.start()
self.load_driver.wait(3600) # wait at most 60 minutes
self.load_driver.stop()
if test == ALL_TEST:
for single_test in STREAMS_SIMPLE_TESTS + STREAMS_COUNT_TESTS + STREAMS_JOIN_TESTS:
self.execute(single_test, scale)
elif test == STREAMS_SIMPLE_TEST:
for single_test in STREAMS_SIMPLE_TESTS:
self.execute(single_test, scale)
elif test == STREAMS_COUNT_TEST:
for single_test in STREAMS_COUNT_TESTS:
self.execute(single_test, scale)
elif test == STREAMS_JOIN_TEST:
for single_test in STREAMS_JOIN_TESTS:
self.execute(single_test, scale)
else:
self.execute(test, scale)
return self.final
def execute(self, test, scale):
################
# RUN PHASE
################
for num in range(0, scale):
self.driver[num] = StreamsSimpleBenchmarkService(self.test_context,
self.kafka,
test,
self.num_threads,
self.num_records,
self.key_skew,
self.value_size)
self.driver[num].start()
#######################
# STOP + COLLECT PHASE
#######################
data = [None] * (scale)
for num in range(0, scale):
self.driver[num].wait()
self.driver[num].stop()
self.driver[num].node.account.ssh("grep Performance %s" % self.driver[num].STDOUT_FILE, allow_fail=False)
data[num] = self.driver[num].collect_data(self.driver[num].node, "")
self.driver[num].read_jmx_output_all_nodes()
for num in range(0, scale):
for key in data[num]:
self.final[key + "-" + str(num)] = data[num][key]
for key in sorted(self.driver[num].jmx_stats[0]):
self.logger.info("%s: %s" % (key, self.driver[num].jmx_stats[0][key]))
self.final[test + "-jmx-avg-" + str(num)] = self.driver[num].average_jmx_value
self.final[test + "-jmx-max-" + str(num)] = self.driver[num].maximum_jmx_value


@@ -1,108 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from kafkatest.services.monitor.jmx import JmxMixin
from kafkatest.services.streams import StreamsTestBaseService
from kafkatest.services.kafka import KafkaConfig
from kafkatest.services import streams_property
#
# Class used to start the simple Kafka Streams benchmark
#
class StreamsSimpleBenchmarkService(StreamsTestBaseService):
"""Base class for simple Kafka Streams benchmark"""
def __init__(self, test_context, kafka, test_name, num_threads, num_recs_or_wait_ms, key_skew, value_size):
super(StreamsSimpleBenchmarkService, self).__init__(test_context,
kafka,
"org.apache.kafka.streams.perf.SimpleBenchmark",
test_name,
num_recs_or_wait_ms,
key_skew,
value_size)
self.jmx_option = ""
if test_name.startswith('stream') or test_name.startswith('table'):
self.jmx_option = "stream-jmx"
JmxMixin.__init__(self,
num_nodes=1,
jmx_object_names=['kafka.streams:type=stream-thread-metrics,thread-id=simple-benchmark-StreamThread-%d' %(i+1) for i in range(num_threads)],
jmx_attributes=['process-latency-avg',
'process-rate',
'commit-latency-avg',
'commit-rate',
'poll-latency-avg',
'poll-rate'],
root=StreamsTestBaseService.PERSISTENT_ROOT)
if test_name.startswith('consume'):
self.jmx_option = "consumer-jmx"
JmxMixin.__init__(self,
num_nodes=1,
jmx_object_names=['kafka.consumer:type=consumer-fetch-manager-metrics,client-id=simple-benchmark-consumer'],
jmx_attributes=['records-consumed-rate'],
root=StreamsTestBaseService.PERSISTENT_ROOT)
self.num_threads = num_threads
def prop_file(self):
cfg = KafkaConfig(**{streams_property.STATE_DIR: self.PERSISTENT_ROOT,
streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
streams_property.NUM_THREADS: self.num_threads})
return cfg.render()
def start_cmd(self, node):
if self.jmx_option != "":
args = self.args.copy()
args['jmx_port'] = self.jmx_port
args['config_file'] = self.CONFIG_FILE
args['stdout'] = self.STDOUT_FILE
args['stderr'] = self.STDERR_FILE
args['pidfile'] = self.PID_FILE
args['log4j'] = self.LOG4J_CONFIG_FILE
args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
cmd = "( export JMX_PORT=%(jmx_port)s; export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
"INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
" %(config_file)s %(user_test_args1)s %(user_test_args2)s %(user_test_args3)s" \
" %(user_test_args4)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
else:
cmd = super(StreamsSimpleBenchmarkService, self).start_cmd(node)
return cmd
def start_node(self, node):
super(StreamsSimpleBenchmarkService, self).start_node(node)
if self.jmx_option != "":
self.start_jmx_tool(1, node)
def clean_node(self, node):
if self.jmx_option != "":
JmxMixin.clean_node(self, node)
super(StreamsSimpleBenchmarkService, self).clean_node(node)
def collect_data(self, node, tag = None):
# Collect the data and return it to the framework
output = node.account.ssh_capture("grep Performance %s" % self.STDOUT_FILE)
data = {}
for line in output:
parts = line.split(':')
data[tag + parts[0]] = parts[1]
return data
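# A sketch of what collect_data() returns, based on the "Performance" lines that
# SimpleBenchmark.printResults() writes to stdout (placeholders, not real measurements):
#
#   stdout line:  Consumer Performance [records/latency/rec-sec/MB-sec read]: <records>/<latencyMs>/<recPerSec>/<MBPerSec>
#   returned as:  {'Consumer Performance [records/latency/rec-sec/MB-sec read]': ' <records>/<latencyMs>/<recPerSec>/<MBPerSec>'}
#
# The benchmark test then appends the driver index ("-0", "-1", ...) to each key before storing it in self.final.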