KAFKA-2802: kafka streams system tests

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Geoff Anderson <geoff@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #930 from ymatsuda/streams_systest
Yasuhiro Matsuda 2016-02-23 12:14:26 -08:00 committed by Ewen Cheslack-Postava
parent 68af16ac15
commit 3358e1682f
11 changed files with 1343 additions and 0 deletions

@@ -52,6 +52,11 @@ do
CLASSPATH=$CLASSPATH:$file
done
for file in $base_dir/streams/build/dependant-libs-${SCALA_VERSION}/rocksdb*.jar;
do
CLASSPATH=$CLASSPATH:$file
done
for file in $base_dir/tools/build/libs/kafka-tools*.jar;
do
CLASSPATH=$CLASSPATH:$file

bin/streams-smoke-test.sh (new executable file, 23 lines)
@@ -0,0 +1,23 @@
#!/bin/sh
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
base_dir=$(dirname $0)
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/tools-log4j.properties"
fi
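# Example invocation (hypothetical local setup; arguments are: command kafka zookeeper stateDir):
#   bin/streams-smoke-test.sh process localhost:9092 localhost:2181 /tmp/kafka-streams-smoketest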
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.streams.smoketest.StreamsSmokeTest "$@"

@@ -541,6 +541,10 @@ project(':streams') {
jar {
dependsOn 'copyDependantLibs'
}
systemTestLibs {
dependsOn testJar
}
}
project(':streams:examples') {

@@ -0,0 +1,241 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.smoketest;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.TumblingWindows;
import org.apache.kafka.streams.kstream.UnlimitedWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.Windowed;
import java.io.File;
import java.util.Properties;
public class SmokeTestClient extends SmokeTestUtil {
private final String kafka;
private final String zookeeper;
private final File stateDir;
private KafkaStreams streams;
private Thread thread;
public SmokeTestClient(File stateDir, String kafka, String zookeeper) {
super();
this.stateDir = stateDir;
this.kafka = kafka;
this.zookeeper = zookeeper;
}
public void start() {
streams = createKafkaStreams(stateDir, kafka, zookeeper);
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
e.printStackTrace();
}
});
thread = new Thread() {
public void run() {
streams.start();
}
};
thread.start();
}
public void close() {
streams.close();
try {
thread.join();
} catch (Exception ex) {
// ignore
}
}
private static KafkaStreams createKafkaStreams(File stateDir, String kafka, String zookeeper) {
Properties props = new Properties();
props.put(StreamsConfig.JOB_ID_CONFIG, "SmokeTest");
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TestTimestampExtractor.class);
props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
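// Topology built below: read the "data" topic, echo every record to "echo", drop END markers,
// compute per-key min / max / sum / cnt over unlimited windows (written to topics of the same
// names), derive "dif" (max - min) and "avg" (sum / cnt) by joining the result tables, and
// count records per tumbling window into "wcnt" keyed by "<key>@<windowStart>".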
KStreamBuilder builder = new KStreamBuilder();
KStream<String, Integer> source = builder.stream(stringDeserializer, integerDeserializer, "data");
source.to("echo", stringSerializer, integerSerializer);
KStream<String, Integer> data = source.filter(new Predicate<String, Integer>() {
@Override
public boolean test(String key, Integer value) {
return value == null || value != END;
}
});
data.process(SmokeTestUtil.<Integer>printProcessorSupplier("data"));
// min
data.aggregateByKey(
new Initializer<Integer>() {
public Integer apply() {
return Integer.MAX_VALUE;
}
},
new Aggregator<String, Integer, Integer>() {
@Override
public Integer apply(String aggKey, Integer value, Integer aggregate) {
return (value < aggregate) ? value : aggregate;
}
},
UnlimitedWindows.of("uwin-min"),
stringSerializer,
integerSerializer,
stringDeserializer,
integerDeserializer
).toStream().map(
new Unwindow<String, Integer>()
).to("min", stringSerializer, integerSerializer);
KTable<String, Integer> minTable = builder.table(stringSerializer, integerSerializer, stringDeserializer, integerDeserializer, "min");
minTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("min"));
// max
data.aggregateByKey(
new Initializer<Integer>() {
public Integer apply() {
return Integer.MIN_VALUE;
}
},
new Aggregator<String, Integer, Integer>() {
@Override
public Integer apply(String aggKey, Integer value, Integer aggregate) {
return (value > aggregate) ? value : aggregate;
}
},
UnlimitedWindows.of("uwin-max"),
stringSerializer,
integerSerializer,
stringDeserializer,
integerDeserializer
).toStream().map(
new Unwindow<String, Integer>()
).to("max", stringSerializer, integerSerializer);
KTable<String, Integer> maxTable = builder.table(stringSerializer, integerSerializer, stringDeserializer, integerDeserializer, "max");
maxTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("max"));
// sum
data.aggregateByKey(
new Initializer<Long>() {
public Long apply() {
return 0L;
}
},
new Aggregator<String, Integer, Long>() {
@Override
public Long apply(String aggKey, Integer value, Long aggregate) {
return (long) value + aggregate;
}
},
UnlimitedWindows.of("win-sum"),
stringSerializer,
longSerializer,
stringDeserializer,
longDeserializer
).toStream().map(
new Unwindow<String, Long>()
).to("sum", stringSerializer, longSerializer);
KTable<String, Long> sumTable = builder.table(stringSerializer, longSerializer, stringDeserializer, longDeserializer, "sum");
sumTable.toStream().process(SmokeTestUtil.<Long>printProcessorSupplier("sum"));
// cnt
data.countByKey(
UnlimitedWindows.of("uwin-cnt"),
stringSerializer,
longSerializer,
stringDeserializer,
longDeserializer
).toStream().map(
new Unwindow<String, Long>()
).to("cnt", stringSerializer, longSerializer);
KTable<String, Long> cntTable = builder.table(stringSerializer, longSerializer, stringDeserializer, longDeserializer, "cnt");
cntTable.toStream().process(SmokeTestUtil.<Long>printProcessorSupplier("cnt"));
// dif
maxTable.join(minTable,
new ValueJoiner<Integer, Integer, Integer>() {
public Integer apply(Integer value1, Integer value2) {
return value1 - value2;
}
}
).to("dif", stringSerializer, integerSerializer);
// avg
sumTable.join(
cntTable,
new ValueJoiner<Long, Long, Double>() {
public Double apply(Long value1, Long value2) {
return (double) value1 / (double) value2;
}
}
).to("avg", stringSerializer, doubleSerializer);
// windowed count
data.countByKey(
TumblingWindows.of("tumbling-win-cnt").with(WINDOW_SIZE),
stringSerializer,
longSerializer,
stringDeserializer,
longDeserializer
).toStream().map(
new KeyValueMapper<Windowed<String>, Long, KeyValue<String, Long>>() {
@Override
public KeyValue<String, Long> apply(Windowed<String> key, Long value) {
return new KeyValue<>(key.value() + "@" + key.window().start(), value);
}
}
).to("wcnt", stringSerializer, longSerializer);
return new KafkaStreams(builder, props);
}
}

@@ -0,0 +1,495 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.smoketest;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
public class SmokeTestDriver extends SmokeTestUtil {
private static class ValueList {
public final String key;
private final int[] values;
private int index;
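// The key encodes the value range as "<min>-<max>", so the verifier can later recompute the
// expected min / max / sum / cnt for each key directly from the key (see getMin / getMax below).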
ValueList(int min, int max) {
this.key = min + "-" + max;
this.values = new int[max - min + 1];
for (int i = 0; i < this.values.length; i++) {
this.values[i] = min + i;
}
// We want to randomize the order of the data so that the processing order is not completely predictable.
// However, the values are also used as the record timestamps (TODO: separate data and timestamp),
// so we keep some correlation between time and order. Thus, the shuffling is done within a sliding window.
shuffle(this.values, 10);
this.index = 0;
}
int next() {
return (index < values.length) ? values[index++] : -1;
}
}
// This main() is not used by the system test. It is intended to be used for local debugging.
public static void main(String[] args) throws Exception {
final String kafka = "localhost:9092";
final String zookeeper = "localhost:2181";
final File stateDir = createDir("/tmp/kafka-streams-smoketest");
final int numKeys = 10;
final int maxRecordsPerKey = 500;
Thread driver = new Thread() {
public void run() {
try {
Map<String, Set<Integer>> allData = generate(kafka, numKeys, maxRecordsPerKey);
verify(kafka, allData, maxRecordsPerKey);
} catch (Exception ex) {
ex.printStackTrace();
}
}
};
SmokeTestClient streams1 = new SmokeTestClient(createDir(stateDir, "1"), kafka, zookeeper);
SmokeTestClient streams2 = new SmokeTestClient(createDir(stateDir, "2"), kafka, zookeeper);
SmokeTestClient streams3 = new SmokeTestClient(createDir(stateDir, "3"), kafka, zookeeper);
SmokeTestClient streams4 = new SmokeTestClient(createDir(stateDir, "4"), kafka, zookeeper);
System.out.println("starting the driver");
driver.start();
System.out.println("starting the first and second client");
streams1.start();
streams2.start();
sleep(10000);
System.out.println("starting the third client");
streams3.start();
System.out.println("closing the first client");
streams1.close();
System.out.println("closed the first client");
sleep(10000);
System.out.println("starting the forth client");
streams4.start();
driver.join();
System.out.println("driver stopped");
streams2.close();
streams3.close();
streams4.close();
System.out.println("shutdown");
}
public static Map<String, Set<Integer>> generate(String kafka, final int numKeys, final int maxRecordsPerKey) throws Exception {
Properties props = new Properties();
props.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
int numRecordsProduced = 0;
Map<String, Set<Integer>> allData = new HashMap<>();
ValueList[] data = new ValueList[numKeys];
for (int i = 0; i < numKeys; i++) {
data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
allData.put(data[i].key, new HashSet<Integer>());
}
Random rand = new Random();
int remaining = data.length;
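// Each iteration picks a random key that still has values left and sends its next value; once a
// key is exhausted, an END marker is sent for it and the key is swapped out of the active range.
// The 10 ms sleep throttles production to roughly 100 records per second.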
while (remaining > 0) {
int index = rand.nextInt(remaining);
String key = data[index].key;
int value = data[index].next();
if (value < 0) {
remaining--;
data[index] = data[remaining];
value = END;
}
ProducerRecord<byte[], byte[]> record =
new ProducerRecord<>("data", stringSerializer.serialize("", key), integerSerializer.serialize("", value));
producer.send(record);
if (value != END) {
numRecordsProduced++;
allData.get(key).add(value);
if (numRecordsProduced % 100 == 0)
System.out.println(numRecordsProduced + " records produced");
Thread.sleep(10);
}
}
producer.close();
return Collections.unmodifiableMap(allData);
}
private static void shuffle(int[] data, int windowSize) {
Random rand = new Random();
for (int i = 0; i < data.length; i++) {
// we shuffle data within windowSize
int j = rand.nextInt(Math.min(data.length - i, windowSize)) + i;
// swap
int tmp = data[i];
data[i] = data[j];
data[j] = tmp;
}
}
public static void verify(String kafka, Map<String, Set<Integer>> allData, int maxRecordsPerKey) {
Properties props = new Properties();
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
List<TopicPartition> partitions = getAllPartitions(consumer, "echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt");
consumer.assign(partitions);
consumer.seekToBeginning(partitions.toArray(new TopicPartition[partitions.size()]));
final int recordsGenerated = allData.size() * maxRecordsPerKey;
int recordsProcessed = 0;
HashMap<String, Integer> max = new HashMap<>();
HashMap<String, Integer> min = new HashMap<>();
HashMap<String, Integer> dif = new HashMap<>();
HashMap<String, Long> sum = new HashMap<>();
HashMap<String, Long> cnt = new HashMap<>();
HashMap<String, Double> avg = new HashMap<>();
HashMap<String, Long> wcnt = new HashMap<>();
HashSet<String> keys = new HashSet<>();
HashMap<String, Set<Integer>> received = new HashMap<>();
for (String key : allData.keySet()) {
keys.add(key);
received.put(key, new HashSet<Integer>());
}
int retryCount = 0;
int maxRetry = 240; // max two minutes (500ms * 240) (before we reach the end of records)
while (true) {
ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
if (records.isEmpty()) {
retryCount++;
if (retryCount > maxRetry) break;
} else {
retryCount = 0;
for (ConsumerRecord<byte[], byte[]> record : records) {
String key = stringDeserializer.deserialize("", record.key());
switch (record.topic()) {
case "echo":
Integer value = integerDeserializer.deserialize("", record.value());
if (value != null && value == END) {
keys.remove(key);
if (keys.isEmpty()) {
// we have reached the end of the records; lower maxRetry to 60 (at most 30 more seconds)
maxRetry = 60;
}
} else {
recordsProcessed++;
received.get(key).add(value);
}
break;
case "min":
min.put(key, integerDeserializer.deserialize("", record.value()));
break;
case "max":
max.put(key, integerDeserializer.deserialize("", record.value()));
break;
case "dif":
dif.put(key, integerDeserializer.deserialize("", record.value()));
break;
case "sum":
sum.put(key, longDeserializer.deserialize("", record.value()));
break;
case "cnt":
cnt.put(key, longDeserializer.deserialize("", record.value()));
break;
case "avg":
avg.put(key, doubleDeserializer.deserialize("", record.value()));
break;
case "wcnt":
wcnt.put(key, longDeserializer.deserialize("", record.value()));
break;
default:
System.out.println("unknown topic: " + record.topic());
}
}
}
}
System.out.println("-------------------");
System.out.println("Result Verification");
System.out.println("-------------------");
System.out.println("recordGenerated=" + recordsGenerated);
System.out.println("recordProcessed=" + recordsProcessed);
if (recordsProcessed > recordsGenerated) {
System.out.println("PROCESSED-MORE-THAN-GENERATED");
} else if (recordsProcessed < recordsGenerated) {
System.out.println("PROCESSED-LESS-THAN-GENERATED");
}
boolean success;
success = allData.equals(received);
if (success) {
System.out.println("ALL-RECORDS-DELIVERED");
} else {
int missedCount = 0;
for (Map.Entry<String, Set<Integer>> entry : allData.entrySet()) {
missedCount += entry.getValue().size() - received.get(entry.getKey()).size();
}
System.out.println("missedRecords=" + missedCount);
}
success &= verifyMin(min);
success &= verifyMax(max);
success &= verifyDif(dif);
success &= verifySum(sum);
success &= verifyCnt(cnt);
success &= verifyAvg(avg);
success &= verifyWCnt(wcnt);
System.out.println(success ? "SUCCESS" : "FAILURE");
}
private static boolean verifyMin(Map<String, Integer> map) {
boolean success = true;
if (map.isEmpty()) {
System.out.println("min is empty");
success = false;
} else {
System.out.println("verifying min");
for (Map.Entry<String, Integer> entry : map.entrySet()) {
int expected = getMin(entry.getKey());
if (expected != entry.getValue()) {
System.out.println("fail: key=" + entry.getKey() + " min=" + entry.getValue() + " expected=" + expected);
success = false;
}
}
}
return success;
}
private static boolean verifyMax(Map<String, Integer> map) {
boolean success = true;
if (map.isEmpty()) {
System.out.println("max is empty");
success = false;
} else {
System.out.println("verifying max");
for (Map.Entry<String, Integer> entry : map.entrySet()) {
int expected = getMax(entry.getKey());
if (expected != entry.getValue()) {
System.out.println("fail: key=" + entry.getKey() + " max=" + entry.getValue() + " expected=" + expected);
success = false;
}
}
}
return success;
}
private static boolean verifyDif(Map<String, Integer> map) {
boolean success = true;
if (map.isEmpty()) {
System.out.println("dif is empty");
success = false;
} else {
System.out.println("verifying dif");
for (Map.Entry<String, Integer> entry : map.entrySet()) {
int min = getMin(entry.getKey());
int max = getMax(entry.getKey());
int expected = max - min;
if (expected != entry.getValue()) {
System.out.println("fail: key=" + entry.getKey() + " dif=" + entry.getValue() + " expected=" + expected);
success = false;
}
}
}
return success;
}
private static boolean verifyCnt(Map<String, Long> map) {
boolean success = true;
if (map.isEmpty()) {
System.out.println("cnt is empty");
success = false;
} else {
System.out.println("verifying cnt");
for (Map.Entry<String, Long> entry : map.entrySet()) {
int min = getMin(entry.getKey());
int max = getMax(entry.getKey());
long expected = (max - min) + 1L;
if (expected != entry.getValue()) {
System.out.println("fail: key=" + entry.getKey() + " cnt=" + entry.getValue() + " expected=" + expected);
success = false;
}
}
}
return success;
}
private static boolean verifySum(Map<String, Long> map) {
boolean success = true;
if (map.isEmpty()) {
System.out.println("sum is empty");
success = false;
} else {
System.out.println("verifying sum");
for (Map.Entry<String, Long> entry : map.entrySet()) {
int min = getMin(entry.getKey());
int max = getMax(entry.getKey());
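// the values for a key are the consecutive integers min..max, so the expected sum is the
// arithmetic series (min + max) * (max - min + 1) / 2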
long expected = ((long) min + (long) max) * (max - min + 1L) / 2L;
if (expected != entry.getValue()) {
System.out.println("fail: key=" + entry.getKey() + " sum=" + entry.getValue() + " expected=" + expected);
success = false;
}
}
}
return success;
}
private static boolean verifyAvg(Map<String, Double> map) {
boolean success = true;
if (map.isEmpty()) {
System.out.println("avg is empty");
success = false;
} else {
System.out.println("verifying avg");
for (Map.Entry<String, Double> entry : map.entrySet()) {
int min = getMin(entry.getKey());
int max = getMax(entry.getKey());
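// for the consecutive integers min..max the expected average is simply (min + max) / 2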
double expected = ((long) min + (long) max) / 2.0;
if (expected != entry.getValue()) {
System.out.println("fail: key=" + entry.getKey() + " avg=" + entry.getValue() + " expected=" + expected);
success = false;
}
}
}
return success;
}
private static boolean verifyWCnt(Map<String, Long> map) {
boolean success = true;
if (map.isEmpty()) {
System.out.println("wcnt is empty");
success = false;
} else {
System.out.println("verifying wcnt");
for (Map.Entry<String, Long> entry : map.entrySet()) {
long minTime = getMinFromWKey(entry.getKey()) + START_TIME;
long maxTime = getMaxFromWKey(entry.getKey()) + START_TIME;
long winTime = getStartFromWKey(entry.getKey());
long expected = WINDOW_SIZE;
if (minTime > winTime) expected -= minTime - winTime;
if (maxTime < winTime + WINDOW_SIZE - 1) expected -= winTime + WINDOW_SIZE - 1 - maxTime;
if (expected != entry.getValue()) {
System.out.println("fail: key=" + entry.getKey() + " wcnt=" + entry.getValue() + " expected=" + expected);
success = false;
}
}
}
return success;
}
private static int getMin(String key) {
return Integer.parseInt(key.split("-")[0]);
}
private static int getMax(String key) {
return Integer.parseInt(key.split("-")[1]);
}
private static int getMinFromWKey(String key) {
return getMin(key.split("@")[0]);
}
private static int getMaxFromWKey(String key) {
return getMax(key.split("@")[0]);
}
private static long getStartFromWKey(String key) {
return Long.parseLong(key.split("@")[1]);
}
private static List<TopicPartition> getAllPartitions(KafkaConsumer<?, ?> consumer, String... topics) {
ArrayList<TopicPartition> partitions = new ArrayList<>();
for (String topic : topics) {
for (PartitionInfo info : consumer.partitionsFor(topic)) {
partitions.add(new TopicPartition(info.topic(), info.partition()));
}
}
return partitions;
}
}

@@ -0,0 +1,184 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.smoketest;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import java.io.File;
import java.util.Map;
public class SmokeTestUtil {
public final static int WINDOW_SIZE = 100;
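// synthetic base timestamp (~30 years of 365-day years in milliseconds, i.e. around the year
// 2000); TestTimestampExtractor adds each record's value to this base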
public final static long START_TIME = 60000L * 60 * 24 * 365 * 30;
public final static int END = Integer.MAX_VALUE;
public static <T> ProcessorSupplier<String, T> printProcessorSupplier(final String topic) {
return printProcessorSupplier(topic, false);
}
public static <T> ProcessorSupplier<String, T> printProcessorSupplier(final String topic, final boolean printOffset) {
return new ProcessorSupplier<String, T>() {
public Processor<String, T> get() {
return new Processor<String, T>() {
private int numRecordsProcessed = 0;
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
System.out.println("initializing processor: topic=" + topic + " taskId=" + context.taskId());
numRecordsProcessed = 0;
this.context = context;
}
@Override
public void process(String key, T value) {
if (printOffset) System.out.println(">>> " + context.offset());
numRecordsProcessed++;
if (numRecordsProcessed % 100 == 0) {
System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
}
}
@Override
public void punctuate(long timestamp) {
}
@Override
public void close() {
}
};
}
};
}
public static final class Unwindow<K, V> implements KeyValueMapper<Windowed<K>, V, KeyValue<K, V>> {
public KeyValue<K, V> apply(Windowed<K> winKey, V value) {
return new KeyValue<K, V>(winKey.value(), value);
}
}
public static Serializer<String> stringSerializer = new StringSerializer();
public static Deserializer<String> stringDeserializer = new StringDeserializer();
public static Serializer<Integer> integerSerializer = new IntegerSerializer();
public static Deserializer<Integer> integerDeserializer = new IntegerDeserializer();
public static Serializer<Long> longSerializer = new LongSerializer();
public static Deserializer<Long> longDeserializer = new LongDeserializer();
public static Serializer<Double> doubleSerializer = new Serializer<Double>() {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, Double data) {
if (data == null)
return null;
long bits = Double.doubleToLongBits(data);
return new byte[] {
(byte) (bits >>> 56),
(byte) (bits >>> 48),
(byte) (bits >>> 40),
(byte) (bits >>> 32),
(byte) (bits >>> 24),
(byte) (bits >>> 16),
(byte) (bits >>> 8),
(byte) bits
};
}
@Override
public void close() {
}
};
public static Deserializer<Double> doubleDeserializer = new Deserializer<Double>() {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public Double deserialize(String topic, byte[] data) {
if (data == null)
return null;
if (data.length != 8) {
throw new SerializationException("Size of data received by Deserializer is " +
"not 8");
}
long value = 0;
for (byte b : data) {
value <<= 8;
value |= b & 0xFF;
}
return Double.longBitsToDouble(value);
}
@Override
public void close() {
}
};
public static File createDir(String path) throws Exception {
File dir = new File(path);
dir.mkdir();
return dir;
}
public static File createDir(File parent, String child) throws Exception {
File dir = new File(parent, child);
dir.mkdir();
return dir;
}
public static void sleep(long duration) {
try {
Thread.sleep(duration);
} catch (Exception ex) {
//
}
}
}

@@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.smoketest;
import java.io.File;
import java.util.Map;
import java.util.Set;
/**
* Created by yasuhiro on 2/10/16.
*/
public class StreamsSmokeTest {
/**
* args ::= command kafka zookeeper stateDir
* command ::= "run" | "process" | "standalone"
*
* @param args
*/
public static void main(String[] args) throws Exception {
String command = args[0];
String kafka = args.length > 1 ? args[1] : null;
String zookeeper = args.length > 2 ? args[2] : null;
String stateDir = args.length > 3 ? args[3] : null;
System.out.println("StreamsSmokeTest instance started");
System.out.println("command=" + command);
System.out.println("kafka=" + kafka);
System.out.println("zookeeper=" + zookeeper);
System.out.println("stateDir=" + stateDir);
switch (command) {
case "standalone":
SmokeTestDriver.main(args);
break;
case "run":
// this starts the driver (data generation and result verification)
final int numKeys = 10;
final int maxRecordsPerKey = 500;
Map<String, Set<Integer>> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey);
SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
break;
case "process":
// this starts a KafkaStreams client
final SmokeTestClient client = new SmokeTestClient(new File(stateDir), kafka, zookeeper);
client.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
client.close();
}
});
break;
default:
System.out.println("unknown command: " + command);
}
}
}

@@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.smoketest;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;
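// For the "data" topic the record value is interpreted as an offset (in milliseconds) from
// SmokeTestUtil.START_TIME, making event time deterministic and reproducible across runs; all
// other topics fall back to wall-clock time.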
public class TestTimestampExtractor implements TimestampExtractor {
private final long base = SmokeTestUtil.START_TIME;
@Override
public long extract(ConsumerRecord<Object, Object> record) {
switch (record.topic()) {
case "data":
return base + (Integer) record.value();
default:
return System.currentTimeMillis();
}
}
}

@@ -0,0 +1,135 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.services.service import Service
from ducktape.utils.util import wait_until
from ducktape.errors import DucktapeError
from kafkatest.services.kafka.directory import kafka_dir
import signal, random, requests, os.path, json
class StreamsSmokeTestBaseService(Service):
"""Base class for Streams Smoke Test services providing some common settings and functionality"""
PERSISTENT_ROOT = "/mnt/streams"
# The log file contains normal log4j logs written using a file appender. stdout and stderr are handled separately
LOG_FILE = os.path.join(PERSISTENT_ROOT, "streams.log")
STDOUT_FILE = os.path.join(PERSISTENT_ROOT, "streams.stdout")
STDERR_FILE = os.path.join(PERSISTENT_ROOT, "streams.stderr")
LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
PID_FILE = os.path.join(PERSISTENT_ROOT, "streams.pid")
logs = {
"streams_log": {
"path": LOG_FILE,
"collect_default": True},
"streams_stdout": {
"path": STDOUT_FILE,
"collect_default": True},
"streams_stderr": {
"path": STDERR_FILE,
"collect_default": True},
}
def __init__(self, context, kafka, command):
super(StreamsSmokeTestBaseService, self).__init__(context, 1)
self.kafka = kafka
self.args = { 'command': command }
@property
def node(self):
return self.nodes[0]
def pids(self, node):
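# The pid file may not exist yet (e.g. before the service has been started), in which case
# there are no pids to report.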
try:
return [pid for pid in node.account.ssh_capture("cat " + self.PID_FILE, callback=int)]
except:
return []
def stop_node(self, node, clean_shutdown=True):
self.logger.info((clean_shutdown and "Cleanly" or "Forcibly") + " stopping Streams Smoke Test on " + str(node.account))
pids = self.pids(node)
sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
for pid in pids:
node.account.signal(pid, sig, allow_fail=True)
if clean_shutdown:
for pid in pids:
wait_until(lambda: not node.account.alive(pid), timeout_sec=60, err_msg="Streams Smoke Test process on " + str(node.account) + " took too long to exit")
node.account.ssh("rm -f " + self.PID_FILE, allow_fail=False)
def restart(self):
# We don't want to do any clean up here, just restart the process.
for node in self.nodes:
self.logger.info("Restarting Kafka Streams on " + str(node.account))
self.stop_node(node)
self.start_node(node)
def abortThenRestart(self):
# We don't want to do any clean up here, just abort then restart the process. The running service is killed immediately.
for node in self.nodes:
self.logger.info("Aborting Kafka Streams on " + str(node.account))
self.stop_node(node, False)
self.logger.info("Restarting Kafka Streams on " + str(node.account))
self.start_node(node)
def wait(self):
for node in self.nodes:
for pid in self.pids(node):
wait_until(lambda: not node.account.alive(pid), timeout_sec=120, err_msg="Streams Smoke Test process on " + str(node.account) + " took too long to exit")
def clean_node(self, node):
node.account.kill_process("streams", clean_shutdown=False, allow_fail=True)
node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
def start_cmd(self, node):
args = self.args.copy()
args['kafka'] = self.kafka.bootstrap_servers()
args['zk'] = self.kafka.zk.connect_setting()
args['state_dir'] = self.PERSISTENT_ROOT
args['stdout'] = self.STDOUT_FILE
args['stderr'] = self.STDERR_FILE
args['pidfile'] = self.PID_FILE
args['log4j'] = self.LOG4J_CONFIG_FILE
args['kafka_dir'] = kafka_dir(node)
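# The command below exports the log4j configuration, starts the smoke test in the background
# with stdout/stderr appended to the log files, and captures the background pid through file
# descriptor 3 into the pid file.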
cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
"/opt/%(kafka_dir)s/bin/streams-smoke-test.sh %(command)s %(kafka)s %(zk)s %(state_dir)s " \
" & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
return cmd
def start_node(self, node):
node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)
node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('tools_log4j.properties', log_file=self.LOG_FILE))
self.logger.info("Starting StreamsSmokeTest process on " + str(node.account))
with node.account.monitor_log(self.STDOUT_FILE) as monitor:
node.account.ssh(self.start_cmd(node))
monitor.wait_until('StreamsSmokeTest instance started', timeout_sec=15, err_msg="Never saw message indicating StreamsSmokeTest finished startup on " + str(node.account))
if len(self.pids(node)) == 0:
raise RuntimeError("No process ids recorded")
class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
def __init__(self, context, kafka):
super(StreamsSmokeTestDriverService, self).__init__(context, kafka, "run")
class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
def __init__(self, context, kafka):
super(StreamsSmokeTestJobRunnerService, self).__init__(context, kafka, "process")

@@ -0,0 +1,71 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from kafkatest.tests.kafka_test import KafkaTest
from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
from ducktape.utils.util import wait_until
import time
class StreamsBounceTest(KafkaTest):
"""
Simple bounce test of Kafka Streams.
"""
def __init__(self, test_context):
super(StreamsBounceTest, self).__init__(test_context, num_zk=1, num_brokers=2, topics={
'echo' : { 'partitions': 5, 'replication-factor': 2 },
'data' : { 'partitions': 5, 'replication-factor': 2 },
'min' : { 'partitions': 5, 'replication-factor': 2 },
'max' : { 'partitions': 5, 'replication-factor': 2 },
'sum' : { 'partitions': 5, 'replication-factor': 2 },
'dif' : { 'partitions': 5, 'replication-factor': 2 },
'cnt' : { 'partitions': 5, 'replication-factor': 2 },
'avg' : { 'partitions': 5, 'replication-factor': 2 },
'wcnt' : { 'partitions': 5, 'replication-factor': 2 }
})
self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
self.processor1 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
def test_bounce(self):
"""
Start a smoke test client, then abort (kill -9) and restart it a few times.
Ensure that all records are delivered.
"""
self.driver.start()
self.processor1.start()
time.sleep(15)
self.processor1.abortThenRestart()
time.sleep(15)
# enable this after we add change log partition replicas
#self.kafka.signal_leader("data")
time.sleep(15)
self.processor1.abortThenRestart()
self.driver.wait()
self.driver.stop()
self.processor1.stop()
node = self.driver.node
node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)

@@ -0,0 +1,73 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from kafkatest.tests.kafka_test import KafkaTest
from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
from ducktape.utils.util import wait_until
import time
class StreamsSmokeTest(KafkaTest):
"""
Simple test of Kafka Streams.
"""
def __init__(self, test_context):
super(StreamsSmokeTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
'echo' : { 'partitions': 5, 'replication-factor': 1 },
'data' : { 'partitions': 5, 'replication-factor': 1 },
'min' : { 'partitions': 5, 'replication-factor': 1 },
'max' : { 'partitions': 5, 'replication-factor': 1 },
'sum' : { 'partitions': 5, 'replication-factor': 1 },
'dif' : { 'partitions': 5, 'replication-factor': 1 },
'cnt' : { 'partitions': 5, 'replication-factor': 1 },
'avg' : { 'partitions': 5, 'replication-factor': 1 },
'wcnt' : { 'partitions': 5, 'replication-factor': 1 }
})
self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
self.processor1 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
self.processor2 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
self.processor3 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
self.processor4 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
def test_streams(self):
"""
Start a few smoke test clients, then repeatedly start a new one and cleanly stop a running one a few times.
Ensure that all results (stats on values computed by Kafka Streams) are correct.
"""
self.driver.start()
self.processor1.start()
self.processor2.start()
time.sleep(15)
self.processor3.start()
self.processor1.stop()
time.sleep(15)
self.processor4.start()
self.driver.wait()
self.driver.stop()
self.processor2.stop()
self.processor3.stop()
self.processor4.stop()
node = self.driver.node
node.account.ssh("grep SUCCESS %s" % self.driver.STDOUT_FILE, allow_fail=False)