diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh index 9c987746281..cb2556f3ece 100755 --- a/bin/kafka-run-class.sh +++ b/bin/kafka-run-class.sh @@ -52,6 +52,11 @@ do CLASSPATH=$CLASSPATH:$file done +for file in $base_dir/streams/build/dependant-libs-${SCALA_VERSION}/rocksdb*.jar; +do + CLASSPATH=$CLASSPATH:$file +done + for file in $base_dir/tools/build/libs/kafka-tools*.jar; do CLASSPATH=$CLASSPATH:$file diff --git a/bin/streams-smoke-test.sh b/bin/streams-smoke-test.sh new file mode 100755 index 00000000000..196990ef41c --- /dev/null +++ b/bin/streams-smoke-test.sh @@ -0,0 +1,23 @@ +#!/bin/sh +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +base_dir=$(dirname $0) + +if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then + export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/tools-log4j.properties" +fi + +exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.streams.smoketest.StreamsSmokeTest "$@" diff --git a/build.gradle b/build.gradle index 275e250dcef..feef93b9aec 100644 --- a/build.gradle +++ b/build.gradle @@ -541,6 +541,10 @@ project(':streams') { jar { dependsOn 'copyDependantLibs' } + + systemTestLibs { + dependsOn testJar + } } project(':streams:examples') { diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java new file mode 100644 index 00000000000..7f1b343bd55 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.smoketest; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Predicate; +import org.apache.kafka.streams.kstream.TumblingWindows; +import org.apache.kafka.streams.kstream.UnlimitedWindows; +import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.streams.kstream.Windowed; + +import java.io.File; +import java.util.Properties; + +public class SmokeTestClient extends SmokeTestUtil { + + private final String kafka; + private final String zookeeper; + private final File stateDir; + private KafkaStreams streams; + private Thread thread; + + public SmokeTestClient(File stateDir, String kafka, String zookeeper) { + super(); + this.stateDir = stateDir; + this.kafka = kafka; + this.zookeeper = zookeeper; + } + + public void start() { + streams = createKafkaStreams(stateDir, kafka, zookeeper); + streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + e.printStackTrace(); + } + }); + + thread = new Thread() { + public void run() { + streams.start(); + } + }; + thread.start(); + } + + public void close() { + streams.close(); + try { + thread.join(); + } catch (Exception ex) { + // ignore + } + } + + private static KafkaStreams createKafkaStreams(File stateDir, String kafka, String zookeeper) { + Properties props = new Properties(); + props.put(StreamsConfig.JOB_ID_CONFIG, "SmokeTest"); + props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString()); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); + props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper); + props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TestTimestampExtractor.class); + props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3); + props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2); + props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + KStreamBuilder builder = new KStreamBuilder(); + + KStream source = builder.stream(stringDeserializer, integerDeserializer, "data"); + + source.to("echo", stringSerializer, integerSerializer); + + KStream data = source.filter(new Predicate() { + @Override + public boolean test(String key, Integer value) { + return value == null || value != END; + } + }); + + data.process(SmokeTestUtil.printProcessorSupplier("data")); + + // min + data.aggregateByKey( + new Initializer() { + public Integer apply() { + return Integer.MAX_VALUE; + } + }, + new Aggregator() { + @Override + 
public Integer apply(String aggKey, Integer value, Integer aggregate) { + return (value < aggregate) ? value : aggregate; + } + }, + UnlimitedWindows.of("uwin-min"), + stringSerializer, + integerSerializer, + stringDeserializer, + integerDeserializer + ).toStream().map( + new Unwindow() + ).to("min", stringSerializer, integerSerializer); + + KTable minTable = builder.table(stringSerializer, integerSerializer, stringDeserializer, integerDeserializer, "min"); + minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min")); + + // max + data.aggregateByKey( + new Initializer() { + public Integer apply() { + return Integer.MIN_VALUE; + } + }, + new Aggregator() { + @Override + public Integer apply(String aggKey, Integer value, Integer aggregate) { + return (value > aggregate) ? value : aggregate; + } + }, + UnlimitedWindows.of("uwin-max"), + stringSerializer, + integerSerializer, + stringDeserializer, + integerDeserializer + ).toStream().map( + new Unwindow() + ).to("max", stringSerializer, integerSerializer); + + KTable maxTable = builder.table(stringSerializer, integerSerializer, stringDeserializer, integerDeserializer, "max"); + maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max")); + + // sum + data.aggregateByKey( + new Initializer() { + public Long apply() { + return 0L; + } + }, + new Aggregator() { + @Override + public Long apply(String aggKey, Integer value, Long aggregate) { + return (long) value + aggregate; + } + }, + UnlimitedWindows.of("win-sum"), + stringSerializer, + longSerializer, + stringDeserializer, + longDeserializer + ).toStream().map( + new Unwindow() + ).to("sum", stringSerializer, longSerializer); + + + KTable sumTable = builder.table(stringSerializer, longSerializer, stringDeserializer, longDeserializer, "sum"); + sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum")); + + // cnt + data.countByKey( + UnlimitedWindows.of("uwin-cnt"), + stringSerializer, + longSerializer, + stringDeserializer, + longDeserializer + ).toStream().map( + new Unwindow() + ).to("cnt", stringSerializer, longSerializer); + + KTable cntTable = builder.table(stringSerializer, longSerializer, stringDeserializer, longDeserializer, "cnt"); + cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt")); + + // dif + maxTable.join(minTable, + new ValueJoiner() { + public Integer apply(Integer value1, Integer value2) { + return value1 - value2; + } + } + ).to("dif", stringSerializer, integerSerializer); + + // avg + sumTable.join( + cntTable, + new ValueJoiner() { + public Double apply(Long value1, Long value2) { + return (double) value1 / (double) value2; + } + } + ).to("avg", stringSerializer, doubleSerializer); + + // windowed count + data.countByKey( + TumblingWindows.of("tumbling-win-cnt").with(WINDOW_SIZE), + stringSerializer, + longSerializer, + stringDeserializer, + longDeserializer + ).toStream().map( + new KeyValueMapper, Long, KeyValue>() { + @Override + public KeyValue apply(Windowed key, Long value) { + return new KeyValue<>(key.value() + "@" + key.window().start(), value); + } + } + ).to("wcnt", stringSerializer, longSerializer); + + return new KafkaStreams(builder, props); + } + +} diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java new file mode 100644 index 00000000000..e56a369024e --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java @@ -0,0 +1,495 @@ +/** + * 
Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.smoketest;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+
+public class SmokeTestDriver extends SmokeTestUtil {
+
+    private static class ValueList {
+        public final String key;
+        private final int[] values;
+        private int index;
+
+        ValueList(int min, int max) {
+            this.key = min + "-" + max;
+
+            this.values = new int[max - min + 1];
+            for (int i = 0; i < this.values.length; i++) {
+                this.values[i] = min + i;
+            }
+            // We want to randomize the order of the data so that the processing order is not completely predictable.
+            // However, each value is also used as the timestamp of its record. (TODO: separate data and timestamp.)
+            // To keep some correlation between time and order, the shuffling is done within a sliding window.
+            shuffle(this.values, 10);
+
+            this.index = 0;
+        }
+
+        int next() {
+            return (index < values.length) ? values[index++] : -1;
+        }
+    }
+
+    // This main() is not used by the system test. It is intended to be used for local debugging.
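+    // For local debugging it can be launched through the wrapper script added in this patch, e.g.
+    //
+    //     bin/streams-smoke-test.sh standalone
+    //
+    // which dispatches to this main() via StreamsSmokeTest. The broker address, ZooKeeper address
+    // and state directory are the hard-coded values below, so a broker and ZooKeeper are assumed
+    // to be running locally already.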
+    public static void main(String[] args) throws Exception {
+        final String kafka = "localhost:9092";
+        final String zookeeper = "localhost:2181";
+        final File stateDir = createDir("/tmp/kafka-streams-smoketest");
+
+        final int numKeys = 10;
+        final int maxRecordsPerKey = 500;
+
+        Thread driver = new Thread() {
+            public void run() {
+                try {
+                    Map<String, Set<Integer>> allData = generate(kafka, numKeys, maxRecordsPerKey);
+                    verify(kafka, allData, maxRecordsPerKey);
+                } catch (Exception ex) {
+                    ex.printStackTrace();
+                }
+            }
+        };
+
+        SmokeTestClient streams1 = new SmokeTestClient(createDir(stateDir, "1"), kafka, zookeeper);
+        SmokeTestClient streams2 = new SmokeTestClient(createDir(stateDir, "2"), kafka, zookeeper);
+        SmokeTestClient streams3 = new SmokeTestClient(createDir(stateDir, "3"), kafka, zookeeper);
+        SmokeTestClient streams4 = new SmokeTestClient(createDir(stateDir, "4"), kafka, zookeeper);
+
+        System.out.println("starting the driver");
+        driver.start();
+
+        System.out.println("starting the first and second client");
+        streams1.start();
+        streams2.start();
+
+        sleep(10000);
+
+        System.out.println("starting the third client");
+        streams3.start();
+
+        System.out.println("closing the first client");
+        streams1.close();
+        System.out.println("closed the first client");
+
+        sleep(10000);
+
+        System.out.println("starting the fourth client");
+        streams4.start();
+
+        driver.join();
+
+        System.out.println("driver stopped");
+
+        streams2.close();
+        streams3.close();
+        streams4.close();
+
+        System.out.println("shutdown");
+    }
+
+    public static Map<String, Set<Integer>> generate(String kafka, final int numKeys, final int maxRecordsPerKey) throws Exception {
+        Properties props = new Properties();
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+
+        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
+
+        int numRecordsProduced = 0;
+
+        Map<String, Set<Integer>> allData = new HashMap<>();
+        ValueList[] data = new ValueList[numKeys];
+        for (int i = 0; i < numKeys; i++) {
+            data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
+            allData.put(data[i].key, new HashSet<Integer>());
+        }
+        Random rand = new Random();
+
+        int remaining = data.length;
+
+        while (remaining > 0) {
+            int index = rand.nextInt(remaining);
+            String key = data[index].key;
+            int value = data[index].next();
+
+            if (value < 0) {
+                // this key is exhausted; swap in the last remaining key and send the END marker
+                remaining--;
+                data[index] = data[remaining];
+                value = END;
+            }
+
+            ProducerRecord<byte[], byte[]> record =
+                new ProducerRecord<>("data", stringSerializer.serialize("", key), integerSerializer.serialize("", value));
+
+            producer.send(record);
+
+            if (value != END) {
+                numRecordsProduced++;
+                allData.get(key).add(value);
+
+                if (numRecordsProduced % 100 == 0)
+                    System.out.println(numRecordsProduced + " records produced");
+
+                Thread.sleep(10);
+            }
+        }
+
+        producer.close();
+
+        return Collections.unmodifiableMap(allData);
+    }
+
+    private static void shuffle(int[] data, int windowSize) {
+        Random rand = new Random();
+        for (int i = 0; i < data.length; i++) {
+            // we shuffle data within windowSize
+            int j = rand.nextInt(Math.min(data.length - i, windowSize)) + i;
+
+            // swap
+            int tmp = data[i];
+            data[i] = data[j];
+            data[j] = tmp;
+        }
+    }
+
+    public static void verify(String kafka, Map<String, Set<Integer>> allData, int maxRecordsPerKey) {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + + KafkaConsumer consumer = new KafkaConsumer<>(props); + List partitions = getAllPartitions(consumer, "echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt"); + consumer.assign(partitions); + consumer.seekToBeginning(partitions.toArray(new TopicPartition[partitions.size()])); + + final int recordsGenerated = allData.size() * maxRecordsPerKey; + int recordsProcessed = 0; + + HashMap max = new HashMap<>(); + HashMap min = new HashMap<>(); + HashMap dif = new HashMap<>(); + HashMap sum = new HashMap<>(); + HashMap cnt = new HashMap<>(); + HashMap avg = new HashMap<>(); + HashMap wcnt = new HashMap<>(); + + HashSet keys = new HashSet<>(); + HashMap> received = new HashMap<>(); + for (String key : allData.keySet()) { + keys.add(key); + received.put(key, new HashSet()); + } + + int retryCount = 0; + int maxRetry = 240; // max two minutes (500ms * 240) (before we reach the end of records) + + while (true) { + ConsumerRecords records = consumer.poll(500); + if (records.isEmpty()) { + retryCount++; + if (retryCount > maxRetry) break; + } else { + retryCount = 0; + + for (ConsumerRecord record : records) { + String key = stringDeserializer.deserialize("", record.key()); + switch (record.topic()) { + case "echo": + Integer value = integerDeserializer.deserialize("", record.value()); + if (value != null && value == END) { + keys.remove(key); + if (keys.isEmpty()) { + // we reached the end of records, set retry to 60 (max 30 seconds) + maxRetry = 60; + } + } else { + recordsProcessed++; + received.get(key).add(value); + } + break; + case "min": + min.put(key, integerDeserializer.deserialize("", record.value())); + break; + case "max": + max.put(key, integerDeserializer.deserialize("", record.value())); + break; + case "dif": + dif.put(key, integerDeserializer.deserialize("", record.value())); + break; + case "sum": + sum.put(key, longDeserializer.deserialize("", record.value())); + break; + case "cnt": + cnt.put(key, longDeserializer.deserialize("", record.value())); + break; + case "avg": + avg.put(key, doubleDeserializer.deserialize("", record.value())); + break; + case "wcnt": + wcnt.put(key, longDeserializer.deserialize("", record.value())); + break; + default: + System.out.println("unknown topic: " + record.topic()); + } + } + } + } + + + System.out.println("-------------------"); + System.out.println("Result Verification"); + System.out.println("-------------------"); + System.out.println("recordGenerated=" + recordsGenerated); + System.out.println("recordProcessed=" + recordsProcessed); + + if (recordsProcessed > recordsGenerated) { + System.out.println("PROCESSED-MORE-THAN-GENERATED"); + } else if (recordsProcessed < recordsGenerated) { + System.out.println("PROCESSED-LESS-THAN-GENERATED"); + } + + boolean success; + success = allData.equals(received); + + if (success) { + System.out.println("ALL-RECORDS-DELIVERED"); + } else { + int missedCount = 0; + for (Map.Entry> entry : allData.entrySet()) { + missedCount += received.get(entry.getKey()).size(); + } + System.out.println("missedRecords=" + missedCount); + } + + success &= verifyMin(min); + success &= verifyMax(max); + success &= verifyDif(dif); + success &= verifySum(sum); + success &= verifyCnt(cnt); + success &= verifyAvg(avg); + success &= verifyWCnt(wcnt); + + System.out.println(success ? 
"SUCCESS" : "FAILURE"); + } + + private static boolean verifyMin(Map map) { + boolean success = true; + if (map.isEmpty()) { + System.out.println("min is empty"); + success = false; + } else { + System.out.println("verifying min"); + + for (Map.Entry entry : map.entrySet()) { + int expected = getMin(entry.getKey()); + if (expected != entry.getValue()) { + System.out.println("fail: key=" + entry.getKey() + " min=" + entry.getValue() + " expected=" + expected); + success = false; + } + } + } + return success; + } + + private static boolean verifyMax(Map map) { + boolean success = true; + if (map.isEmpty()) { + System.out.println("max is empty"); + success = false; + } else { + System.out.println("verifying max"); + + for (Map.Entry entry : map.entrySet()) { + int expected = getMax(entry.getKey()); + if (expected != entry.getValue()) { + System.out.println("fail: key=" + entry.getKey() + " max=" + entry.getValue() + " expected=" + expected); + success = false; + } + } + } + return success; + } + + private static boolean verifyDif(Map map) { + boolean success = true; + if (map.isEmpty()) { + System.out.println("dif is empty"); + success = false; + } else { + System.out.println("verifying dif"); + + for (Map.Entry entry : map.entrySet()) { + int min = getMin(entry.getKey()); + int max = getMax(entry.getKey()); + int expected = max - min; + if (expected != entry.getValue()) { + System.out.println("fail: key=" + entry.getKey() + " dif=" + entry.getValue() + " expected=" + expected); + success = false; + } + } + } + return success; + } + + private static boolean verifyCnt(Map map) { + boolean success = true; + if (map.isEmpty()) { + System.out.println("cnt is empty"); + success = false; + } else { + System.out.println("verifying cnt"); + + for (Map.Entry entry : map.entrySet()) { + int min = getMin(entry.getKey()); + int max = getMax(entry.getKey()); + long expected = (max - min) + 1L; + if (expected != entry.getValue()) { + System.out.println("fail: key=" + entry.getKey() + " cnt=" + entry.getValue() + " expected=" + expected); + success = false; + } + } + } + return success; + } + + private static boolean verifySum(Map map) { + boolean success = true; + if (map.isEmpty()) { + System.out.println("sum is empty"); + success = false; + } else { + System.out.println("verifying sum"); + + for (Map.Entry entry : map.entrySet()) { + int min = getMin(entry.getKey()); + int max = getMax(entry.getKey()); + long expected = ((long) min + (long) max) * (max - min + 1L) / 2L; + if (expected != entry.getValue()) { + System.out.println("fail: key=" + entry.getKey() + " sum=" + entry.getValue() + " expected=" + expected); + success = false; + } + } + } + return success; + } + + private static boolean verifyAvg(Map map) { + boolean success = true; + if (map.isEmpty()) { + System.out.println("avg is empty"); + success = false; + } else { + System.out.println("verifying avg"); + + for (Map.Entry entry : map.entrySet()) { + int min = getMin(entry.getKey()); + int max = getMax(entry.getKey()); + double expected = ((long) min + (long) max) / 2.0; + + if (expected != entry.getValue()) { + System.out.println("fail: key=" + entry.getKey() + " avg=" + entry.getValue() + " expected=" + expected); + success = false; + } + } + } + return success; + } + + private static boolean verifyWCnt(Map map) { + boolean success = true; + if (map.isEmpty()) { + System.out.println("wcnt is empty"); + success = false; + } else { + System.out.println("verifying wcnt"); + + for (Map.Entry entry : map.entrySet()) { + long minTime = 
getMinFromWKey(entry.getKey()) + START_TIME; + long maxTime = getMaxFromWKey(entry.getKey()) + START_TIME; + long winTime = getStartFromWKey(entry.getKey()); + + long expected = WINDOW_SIZE; + if (minTime > winTime) expected -= minTime - winTime; + if (maxTime < winTime + WINDOW_SIZE - 1) expected -= winTime + WINDOW_SIZE - 1 - maxTime; + + if (expected != entry.getValue()) { + System.out.println("fail: key=" + entry.getKey() + " wcnt=" + entry.getValue() + " expected=" + expected); + success = false; + } + } + } + return success; + } + + private static int getMin(String key) { + return Integer.parseInt(key.split("-")[0]); + } + + private static int getMax(String key) { + return Integer.parseInt(key.split("-")[1]); + } + + private static int getMinFromWKey(String key) { + return getMin(key.split("@")[0]); + } + + private static int getMaxFromWKey(String key) { + return getMax(key.split("@")[0]); + } + + private static long getStartFromWKey(String key) { + return Long.parseLong(key.split("@")[1]); + } + + private static List getAllPartitions(KafkaConsumer consumer, String... topics) { + ArrayList partitions = new ArrayList<>(); + + for (String topic : topics) { + for (PartitionInfo info : consumer.partitionsFor(topic)) { + partitions.add(new TopicPartition(info.topic(), info.partition())); + } + } + return partitions; + } + +} diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java new file mode 100644 index 00000000000..4a135990394 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.smoketest; + +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; + +import java.io.File; +import java.util.Map; + +public class SmokeTestUtil { + + public final static int WINDOW_SIZE = 100; + public final static long START_TIME = 60000L * 60 * 24 * 365 * 30; + public final static int END = Integer.MAX_VALUE; + + public static ProcessorSupplier printProcessorSupplier(final String topic) { + return printProcessorSupplier(topic, false); + } + + public static ProcessorSupplier printProcessorSupplier(final String topic, final boolean printOffset) { + return new ProcessorSupplier() { + public Processor get() { + return new Processor() { + private int numRecordsProcessed = 0; + private ProcessorContext context; + + @Override + public void init(ProcessorContext context) { + System.out.println("initializing processor: topic=" + topic + " taskId=" + context.taskId()); + numRecordsProcessed = 0; + this.context = context; + } + + @Override + public void process(String key, T value) { + if (printOffset) System.out.println(">>> " + context.offset()); + numRecordsProcessed++; + if (numRecordsProcessed % 100 == 0) { + System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic); + } + } + + @Override + public void punctuate(long timestamp) { + } + + @Override + public void close() { + } + }; + } + }; + } + + public static final class Unwindow implements KeyValueMapper, V, KeyValue> { + public KeyValue apply(Windowed winKey, V value) { + return new KeyValue(winKey.value(), value); + } + } + + public static Serializer stringSerializer = new StringSerializer(); + + public static Deserializer stringDeserializer = new StringDeserializer(); + + public static Serializer integerSerializer = new IntegerSerializer(); + + public static Deserializer integerDeserializer = new IntegerDeserializer(); + + public static Serializer longSerializer = new LongSerializer(); + + public static Deserializer longDeserializer = new LongDeserializer(); + + public static Serializer doubleSerializer = new Serializer() { + + @Override + public void configure(Map configs, boolean isKey) { + } + + @Override + public byte[] serialize(String topic, Double data) { + if (data == null) + return null; + + long bits = Double.doubleToLongBits(data); + return new byte[] { + (byte) (bits >>> 56), + (byte) (bits >>> 48), + (byte) (bits >>> 40), + (byte) (bits >>> 32), + (byte) (bits >>> 24), + (byte) (bits >>> 16), + (byte) (bits >>> 8), + (byte) bits + }; + } + + @Override + public void close() { + } + }; + + public static Deserializer doubleDeserializer = new Deserializer() { + + @Override + public void configure(Map configs, boolean isKey) { + } + + 
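+        // Reassembles the eight big-endian bytes written by doubleSerializer above back into a double.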
@Override
+        public Double deserialize(String topic, byte[] data) {
+            if (data == null)
+                return null;
+            if (data.length != 8) {
+                throw new SerializationException("Size of data received by Deserializer is not 8");
+            }
+
+            long value = 0;
+            for (byte b : data) {
+                value <<= 8;
+                value |= b & 0xFF;
+            }
+            return Double.longBitsToDouble(value);
+        }
+
+        @Override
+        public void close() {
+        }
+    };
+
+    public static File createDir(String path) throws Exception {
+        File dir = new File(path);
+
+        dir.mkdir();
+
+        return dir;
+    }
+
+    public static File createDir(File parent, String child) throws Exception {
+        File dir = new File(parent, child);
+
+        dir.mkdir();
+
+        return dir;
+    }
+
+    public static void sleep(long duration) {
+        try {
+            Thread.sleep(duration);
+        } catch (Exception ex) {
+            // ignore
+        }
+    }
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
new file mode 100644
index 00000000000..a6cd141f91d
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.smoketest;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Command line entry point for the Streams smoke test driver and clients.
+ */
+public class StreamsSmokeTest {
+
+    /**
+     * args ::= command kafka zookeeper stateDir
+     *   command := "run" | "process" | "standalone"
+     *
+     * @param args the command, followed by the Kafka bootstrap servers, the ZooKeeper connect string and the state directory
+     */
+    public static void main(String[] args) throws Exception {
+        String command = args[0];
+        String kafka = args.length > 1 ? args[1] : null;
+        String zookeeper = args.length > 2 ? args[2] : null;
+        String stateDir = args.length > 3 ?
args[3] : null; + + System.out.println("StreamsSmokeTest instance started"); + System.out.println("command=" + command); + System.out.println("kafka=" + kafka); + System.out.println("zookeeper=" + zookeeper); + System.out.println("stateDir=" + stateDir); + + switch (command) { + case "standalone": + SmokeTestDriver.main(args); + break; + case "run": + // this starts the driver (data generation and result verification) + final int numKeys = 10; + final int maxRecordsPerKey = 500; + Map> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey); + SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey); + break; + case "process": + // this starts a KafkaStreams client + final SmokeTestClient client = new SmokeTestClient(new File(stateDir), kafka, zookeeper); + client.start(); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + client.close(); + } + }); + break; + default: + System.out.println("unknown command: " + command); + } + } + +} diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java new file mode 100644 index 00000000000..04e264c2287 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.smoketest; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.streams.processor.TimestampExtractor; + +public class TestTimestampExtractor implements TimestampExtractor { + + private final long base = SmokeTestUtil.START_TIME; + + @Override + public long extract(ConsumerRecord record) { + switch (record.topic()) { + case "data": + return base + (Integer) record.value(); + default: + return System.currentTimeMillis(); + } + } + +} diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py new file mode 100644 index 00000000000..192a8d9fcfa --- /dev/null +++ b/tests/kafkatest/services/streams.py @@ -0,0 +1,135 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ducktape.services.service import Service +from ducktape.utils.util import wait_until +from ducktape.errors import DucktapeError + +from kafkatest.services.kafka.directory import kafka_dir +import signal, random, requests, os.path, json + +class StreamsSmokeTestBaseService(Service): + """Base class for Streams Smoke Test services providing some common settings and functionality""" + + PERSISTENT_ROOT = "/mnt/streams" + # The log file contains normal log4j logs written using a file appender. stdout and stderr are handled separately + LOG_FILE = os.path.join(PERSISTENT_ROOT, "streams.log") + STDOUT_FILE = os.path.join(PERSISTENT_ROOT, "streams.stdout") + STDERR_FILE = os.path.join(PERSISTENT_ROOT, "streams.stderr") + LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties") + PID_FILE = os.path.join(PERSISTENT_ROOT, "streams.pid") + + logs = { + "streams_log": { + "path": LOG_FILE, + "collect_default": True}, + "streams_stdout": { + "path": STDOUT_FILE, + "collect_default": True}, + "streams_stderr": { + "path": STDERR_FILE, + "collect_default": True}, + } + + def __init__(self, context, kafka, command): + super(StreamsSmokeTestBaseService, self).__init__(context, 1) + self.kafka = kafka + self.args = { 'command': command } + + @property + def node(self): + return self.nodes[0] + + def pids(self, node): + try: + return [pid for pid in node.account.ssh_capture("cat " + self.PID_FILE, callback=int)] + except: + return [] + + def stop_node(self, node, clean_shutdown=True): + self.logger.info((clean_shutdown and "Cleanly" or "Forcibly") + " stopping Streams Smoke Test on " + str(node.account)) + pids = self.pids(node) + sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL + + for pid in pids: + node.account.signal(pid, sig, allow_fail=True) + if clean_shutdown: + for pid in pids: + wait_until(lambda: not node.account.alive(pid), timeout_sec=60, err_msg="Streams Smoke Test process on " + str(node.account) + " took too long to exit") + + node.account.ssh("rm -f " + self.PID_FILE, allow_fail=False) + + def restart(self): + # We don't want to do any clean up here, just restart the process. + for node in self.nodes: + self.logger.info("Restarting Kafka Streams on " + str(node.account)) + self.stop_node(node) + self.start_node(node) + + def abortThenRestart(self): + # We don't want to do any clean up here, just abort then restart the process. The running service is killed immediately. 
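+        # Note: stop_node(node, False) sends SIGKILL, so the process gets no chance to shut down
+        # cleanly before it is started again.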
+ for node in self.nodes: + self.logger.info("Aborting Kafka Streams on " + str(node.account)) + self.stop_node(node, False) + self.logger.info("Restarting Kafka Streams on " + str(node.account)) + self.start_node(node) + + def wait(self): + for node in self.nodes: + for pid in self.pids(node): + wait_until(lambda: not node.account.alive(pid), timeout_sec=120, err_msg="Streams Smoke Test process on " + str(node.account) + " took too long to exit") + + def clean_node(self, node): + node.account.kill_process("streams", clean_shutdown=False, allow_fail=True) + node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False) + + def start_cmd(self, node): + args = self.args.copy() + args['kafka'] = self.kafka.bootstrap_servers() + args['zk'] = self.kafka.zk.connect_setting() + args['state_dir'] = self.PERSISTENT_ROOT + args['stdout'] = self.STDOUT_FILE + args['stderr'] = self.STDERR_FILE + args['pidfile'] = self.PID_FILE + args['log4j'] = self.LOG4J_CONFIG_FILE + args['kafka_dir'] = kafka_dir(node) + + cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \ + "/opt/%(kafka_dir)s/bin/streams-smoke-test.sh %(command)s %(kafka)s %(zk)s %(state_dir)s " \ + " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args + + return cmd + + def start_node(self, node): + node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False) + + node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('tools_log4j.properties', log_file=self.LOG_FILE)) + + self.logger.info("Starting StreamsSmokeTest process on " + str(node.account)) + with node.account.monitor_log(self.STDOUT_FILE) as monitor: + node.account.ssh(self.start_cmd(node)) + monitor.wait_until('StreamsSmokeTest instance started', timeout_sec=15, err_msg="Never saw message indicating StreamsSmokeTest finished startup on " + str(node.account)) + + if len(self.pids(node)) == 0: + raise RuntimeError("No process ids recorded") + + +class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService): + def __init__(self, context, kafka): + super(StreamsSmokeTestDriverService, self).__init__(context, kafka, "run") + +class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService): + def __init__(self, context, kafka): + super(StreamsSmokeTestJobRunnerService, self).__init__(context, kafka, "process") diff --git a/tests/kafkatest/tests/streams_bounce_test.py b/tests/kafkatest/tests/streams_bounce_test.py new file mode 100644 index 00000000000..176f0109181 --- /dev/null +++ b/tests/kafkatest/tests/streams_bounce_test.py @@ -0,0 +1,71 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from kafkatest.tests.kafka_test import KafkaTest +from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService +from ducktape.utils.util import wait_until +import time + +class StreamsBounceTest(KafkaTest): + """ + Simple test of Kafka Streams. + """ + + def __init__(self, test_context): + super(StreamsBounceTest, self).__init__(test_context, num_zk=1, num_brokers=2, topics={ + 'echo' : { 'partitions': 5, 'replication-factor': 2 }, + 'data' : { 'partitions': 5, 'replication-factor': 2 }, + 'min' : { 'partitions': 5, 'replication-factor': 2 }, + 'max' : { 'partitions': 5, 'replication-factor': 2 }, + 'sum' : { 'partitions': 5, 'replication-factor': 2 }, + 'dif' : { 'partitions': 5, 'replication-factor': 2 }, + 'cnt' : { 'partitions': 5, 'replication-factor': 2 }, + 'avg' : { 'partitions': 5, 'replication-factor': 2 }, + 'wcnt' : { 'partitions': 5, 'replication-factor': 2 } + }) + + self.driver = StreamsSmokeTestDriverService(test_context, self.kafka) + self.processor1 = StreamsSmokeTestJobRunnerService(test_context, self.kafka) + + def test_bounce(self): + """ + Start a smoke test client, then abort (kill -9) and restart it a few times. + Ensure that all records are delivered. + """ + + self.driver.start() + + self.processor1.start() + + time.sleep(15); + + self.processor1.abortThenRestart() + + time.sleep(15); + + # enable this after we add change log partition replicas + #self.kafka.signal_leader("data") + + time.sleep(15); + + self.processor1.abortThenRestart() + + self.driver.wait() + self.driver.stop() + + self.processor1.stop() + + node = self.driver.node + node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False) diff --git a/tests/kafkatest/tests/streams_smoke_test.py b/tests/kafkatest/tests/streams_smoke_test.py new file mode 100644 index 00000000000..28618372b2f --- /dev/null +++ b/tests/kafkatest/tests/streams_smoke_test.py @@ -0,0 +1,73 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from kafkatest.tests.kafka_test import KafkaTest +from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService +from ducktape.utils.util import wait_until +import time + +class StreamsSmokeTest(KafkaTest): + """ + Simple test of Kafka Streams. 
+    """
+
+    def __init__(self, test_context):
+        super(StreamsSmokeTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
+            'echo' : { 'partitions': 5, 'replication-factor': 1 },
+            'data' : { 'partitions': 5, 'replication-factor': 1 },
+            'min' : { 'partitions': 5, 'replication-factor': 1 },
+            'max' : { 'partitions': 5, 'replication-factor': 1 },
+            'sum' : { 'partitions': 5, 'replication-factor': 1 },
+            'dif' : { 'partitions': 5, 'replication-factor': 1 },
+            'cnt' : { 'partitions': 5, 'replication-factor': 1 },
+            'avg' : { 'partitions': 5, 'replication-factor': 1 },
+            'wcnt' : { 'partitions': 5, 'replication-factor': 1 }
+        })
+
+        self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
+        self.processor1 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
+        self.processor2 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
+        self.processor3 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
+        self.processor4 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
+
+    def test_streams(self):
+        """
+        Start a few smoke test clients, then repeatedly start a new one and cleanly stop a running one.
+        Ensure that all results (stats on values computed by Kafka Streams) are correct.
+        """
+
+        self.driver.start()
+
+        self.processor1.start()
+        self.processor2.start()
+
+        time.sleep(15)
+
+        self.processor3.start()
+        self.processor1.stop()
+
+        time.sleep(15)
+
+        self.processor4.start()
+
+        self.driver.wait()
+        self.driver.stop()
+
+        self.processor2.stop()
+        self.processor3.stop()
+        self.processor4.stop()
+
+        node = self.driver.node
+        node.account.ssh("grep SUCCESS %s" % self.driver.STDOUT_FILE, allow_fail=False)
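Assuming a working ducktape system-test environment, the new tests can be run individually by pointing ducktape at the added files, for example:

    ducktape tests/kafkatest/tests/streams_smoke_test.py
    ducktape tests/kafkatest/tests/streams_bounce_test.py

Each test passes when the driver's stdout contains SUCCESS (smoke test) or ALL-RECORDS-DELIVERED (bounce test), which is what the final grep in each test checks.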