KAFKA-5362: Add EOS system tests for Streams API

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Damian Guy <damian.guy@gmail.com>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3201 from mjsax/kafka-5362-add-eos-system-tests-for-streams-api
Matthias J. Sax 2017-06-08 14:08:54 -07:00 committed by Guozhang Wang
parent 21194a63ed
commit ba07d828c5
7 changed files with 813 additions and 2 deletions


@@ -197,6 +197,10 @@
       <allow pkg="org.I0Itec.zkclient" />
     </subpackage>
+    <subpackage name="test">
+      <allow pkg="kafka.admin" />
+    </subpackage>
     <subpackage name="state">
       <allow pkg="org.rocksdb" />
     </subpackage>


@@ -548,7 +548,7 @@ public class StreamThread extends Thread {
             timerStartedMs = time.milliseconds();
             // try to fetch some records if necessary
-            final ConsumerRecords<byte[], byte[]> records = pollRequests(pollTimeMs);
+            final ConsumerRecords<byte[], byte[]> records = pollRequests();
             if (records != null && !records.isEmpty() && !activeTasks.isEmpty()) {
                 streamsMetrics.pollTimeSensor.record(computeLatency(), timerStartedMs);
                 addRecordsToTasks(records);
@@ -573,7 +573,7 @@ public class StreamThread extends Thread {
      * Get the next batch of records by polling.
      * @return Next batch of records or null if no records available.
      */
-    private ConsumerRecords<byte[], byte[]> pollRequests(final long pollTimeMs) {
+    private ConsumerRecords<byte[], byte[]> pollRequests() {
         ConsumerRecords<byte[], byte[]> records = null;
         try {


@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.io.File;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class EosTestClient extends SmokeTestUtil {
static final String APP_ID = "EosTest";
private final String kafka;
private final File stateDir;
private KafkaStreams streams;
private boolean uncaughtException;
EosTestClient(final File stateDir, final String kafka) {
super();
this.stateDir = stateDir;
this.kafka = kafka;
}
private boolean isRunning = true;
public void start() {
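// Register the shutdown hook first, then loop until it flips isRunning:
// (re)create the KafkaStreams instance, and if the uncaught-exception handler
// fired, close it so a fresh instance is started on the next pass.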
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
isRunning = false;
streams.close(5, TimeUnit.SECONDS);
// do not remove these printouts since they are needed for health scripts
if (!uncaughtException) {
System.out.println("EOS-TEST-CLIENT-CLOSED");
}
}
}));
while (isRunning) {
if (streams == null) {
uncaughtException = false;
streams = createKafkaStreams(stateDir, kafka);
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(final Thread t, final Throwable e) {
System.out.println("EOS-TEST-CLIENT-EXCEPTION");
e.printStackTrace();
uncaughtException = true;
}
});
streams.start();
}
if (uncaughtException) {
streams.close(5, TimeUnit.SECONDS);
streams = null;
}
sleep(1000);
}
}
private static KafkaStreams createKafkaStreams(final File stateDir,
final String kafka) {
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
final KStreamBuilder builder = new KStreamBuilder();
final KStream<String, Integer> data = builder.stream("data");
data.to("echo");
data.process(SmokeTestUtil.printProcessorSupplier("data"));
final KGroupedStream<String, Integer> groupedData = data.groupByKey();
// min
groupedData
.aggregate(
new Initializer<Integer>() {
@Override
public Integer apply() {
return Integer.MAX_VALUE;
}
},
new Aggregator<String, Integer, Integer>() {
@Override
public Integer apply(final String aggKey,
final Integer value,
final Integer aggregate) {
return (value < aggregate) ? value : aggregate;
}
},
intSerde,
"min")
.to(stringSerde, intSerde, "min");
// sum
groupedData.aggregate(
new Initializer<Long>() {
@Override
public Long apply() {
return 0L;
}
},
new Aggregator<String, Integer, Long>() {
@Override
public Long apply(final String aggKey,
final Integer value,
final Long aggregate) {
return (long) value + aggregate;
}
},
longSerde,
"sum")
.to(stringSerde, longSerde, "sum");
return new KafkaStreams(builder, props);
}
}
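For readers unfamiliar with the topology above, the following self-contained sketch (hypothetical, not part of this commit; the class name EosAggregationExample and the sample values are made up) illustrates the per-key outputs the "min" and "sum" aggregations are expected to produce. Because caching is disabled (CACHE_MAX_BYTES_BUFFERING_CONFIG is 0), every input record yields one update in each output topic:

// Hypothetical illustration only; mirrors the initializers and aggregators defined above.
public class EosAggregationExample {
    public static void main(final String[] args) {
        final int[] inputForKeyA = {5, 3, 7};   // values observed for key "A" on topic "data"
        int min = Integer.MAX_VALUE;            // initializer of the "min" aggregation
        long sum = 0L;                          // initializer of the "sum" aggregation
        for (final int value : inputForKeyA) {
            min = Math.min(min, value);
            sum += value;
            // With caching disabled, each input record forwards one update downstream,
            // so the "min" topic sees 5, 3, 3 and the "sum" topic sees 5, 8, 15 for key "A".
            System.out.println("min=" + min + ", sum=" + sum);
        }
    }
}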


@@ -0,0 +1,470 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import kafka.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
public class EosTestDriver extends SmokeTestUtil {
private static final int MAX_NUMBER_OF_KEYS = 100;
private static final long MAX_IDLE_TIME_MS = 300000L;
private static boolean isRunning = true;
static void generate(final String kafka) throws Exception {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
isRunning = false;
}
});
final Properties producerProps = new Properties();
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
final KafkaProducer<String, Integer> producer = new KafkaProducer<>(producerProps);
final Random rand = new Random(System.currentTimeMillis());
int numRecordsProduced = 0;
while (isRunning) {
final String key = "" + rand.nextInt(MAX_NUMBER_OF_KEYS);
final int value = rand.nextInt(10000);
final ProducerRecord<String, Integer> record = new ProducerRecord<>("data", key, value);
producer.send(record, new Callback() {
@Override
public void onCompletion(final RecordMetadata metadata, final Exception exception) {
if (exception != null) {
exception.printStackTrace();
Exit.exit(1);
}
}
});
numRecordsProduced++;
if (numRecordsProduced % 1000 == 0) {
System.out.println(numRecordsProduced + " records produced");
}
Utils.sleep(rand.nextInt(50));
}
producer.close();
System.out.println(numRecordsProduced + " records produced");
}
public static void verify(final String kafka) {
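// Verification flow: wait for the Streams application to shut down, record the committed
// offsets of the "data" input topic, re-read all output with read_committed isolation,
// then check the "echo", "min", and "sum" topics against a recomputation from the input.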
ensureStreamsApplicationDown(kafka);
final Map<TopicPartition, Long> committedOffsets = getCommittedOffsets(kafka);
final Properties props = new Properties();
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
final List<TopicPartition> partitions = getAllPartitions(consumer, "data", "echo", "min", "sum");
consumer.assign(partitions);
consumer.seekToBeginning(partitions);
final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition
= getOutputRecords(consumer, committedOffsets);
truncate("data", recordPerTopicPerPartition, committedOffsets);
verifyMin(recordPerTopicPerPartition.get("data"), recordPerTopicPerPartition.get("min"));
verifySum(recordPerTopicPerPartition.get("data"), recordPerTopicPerPartition.get("sum"));
verifyAllTransactionFinished(consumer, kafka);
// do not modify: required test output
System.out.println("ALL-RECORDS-DELIVERED");
} catch (final Exception e) {
e.printStackTrace(System.err);
System.out.println("FAILED");
}
}
private static void ensureStreamsApplicationDown(final String kafka) {
AdminClient adminClient = null;
try {
adminClient = AdminClient.createSimplePlaintext(kafka);
final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
while (!adminClient.describeConsumerGroup(EosTestClient.APP_ID, 10000).consumers().get().isEmpty()) {
if (System.currentTimeMillis() > maxWaitTime) {
throw new RuntimeException("Streams application not down after 30 seconds.");
}
sleep(1000);
}
} finally {
if (adminClient != null) {
adminClient.close();
}
}
}
private static Map<TopicPartition, Long> getCommittedOffsets(final String kafka) {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(ConsumerConfig.GROUP_ID_CONFIG, EosTestClient.APP_ID);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "OffsetsClient");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
final Map<TopicPartition, Long> committedOffsets = new HashMap<>();
try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
final Set<String> topics = new HashSet<>();
topics.add("data");
consumer.subscribe(topics);
consumer.poll(0);
final Set<TopicPartition> partitions = new HashSet<>();
for (final String topic : topics) {
for (final PartitionInfo partition : consumer.partitionsFor(topic)) {
partitions.add(new TopicPartition(partition.topic(), partition.partition()));
}
}
for (final TopicPartition tp : partitions) {
final long offset = consumer.position(tp);
committedOffsets.put(tp, offset);
}
}
return committedOffsets;
}
private static Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> getOutputRecords(final KafkaConsumer<byte[], byte[]> consumer,
final Map<TopicPartition, Long> committedOffsets) {
final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition = new HashMap<>();
long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
boolean allRecordsReceived = false;
while (!allRecordsReceived && System.currentTimeMillis() < maxWaitTime) {
final ConsumerRecords<byte[], byte[]> receivedRecords = consumer.poll(500);
for (final ConsumerRecord<byte[], byte[]> record : receivedRecords) {
maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
addRecord(record, recordPerTopicPerPartition);
}
if (receivedRecords.count() > 0) {
allRecordsReceived =
receivedAllRecords(
recordPerTopicPerPartition.get("data"),
recordPerTopicPerPartition.get("echo"),
committedOffsets);
}
}
if (!allRecordsReceived) {
throw new RuntimeException("FAIL: did not receive all records after 30 sec idle time.");
}
return recordPerTopicPerPartition;
}
private static void addRecord(final ConsumerRecord<byte[], byte[]> record,
final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition) {
final String topic = record.topic();
final TopicPartition partition = new TopicPartition(topic, record.partition());
if ("data".equals(topic)
|| "echo".equals(topic)
|| "min".equals(topic)
|| "sum".equals(topic)) {
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> topicRecordsPerPartition
= recordPerTopicPerPartition.get(topic);
if (topicRecordsPerPartition == null) {
topicRecordsPerPartition = new HashMap<>();
recordPerTopicPerPartition.put(topic, topicRecordsPerPartition);
}
List<ConsumerRecord<byte[], byte[]>> records = topicRecordsPerPartition.get(partition);
if (records == null) {
records = new ArrayList<>();
topicRecordsPerPartition.put(partition, records);
}
records.add(record);
} else {
throw new RuntimeException("FAIL: received data from unexpected topic: " + record);
}
}
private static boolean receivedAllRecords(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> supersetExpectedRecords,
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> receivedRecords,
final Map<TopicPartition, Long> committedOffsets) {
if (supersetExpectedRecords == null
|| receivedRecords == null
|| supersetExpectedRecords.keySet().size() < committedOffsets.keySet().size()
|| receivedRecords.keySet().size() < committedOffsets.keySet().size()) {
return false;
}
for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : receivedRecords.entrySet()) {
final TopicPartition tp = partitionRecords.getKey();
final int numberOfReceivedRecords = partitionRecords.getValue().size();
final Long committed = committedOffsets.get(new TopicPartition("data", tp.partition()));
if (committed != null) {
if (numberOfReceivedRecords < committed) {
return false;
}
} else if (numberOfReceivedRecords > 0) {
throw new RuntimeException("Result verification failed for partition " + tp
+ ". No offset was committed but we received " + numberOfReceivedRecords + " records.");
}
}
final StringDeserializer stringDeserializer = new StringDeserializer();
final IntegerDeserializer integerDeserializer = new IntegerDeserializer();
for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : receivedRecords.entrySet()) {
try {
final TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition());
final Iterator<ConsumerRecord<byte[], byte[]>> expectedRecords = supersetExpectedRecords.get(inputTopicPartition).iterator();
for (final ConsumerRecord<byte[], byte[]> receivedRecord : partitionRecords.getValue()) {
final ConsumerRecord<byte[], byte[]> expected = expectedRecords.next();
final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
final int receivedValue = integerDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
final String expectedKey = stringDeserializer.deserialize(expected.topic(), expected.key());
final int expectedValue = integerDeserializer.deserialize(expected.topic(), expected.value());
if (!receivedKey.equals(expectedKey) || receivedValue != expectedValue) {
throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + expectedKey + "," + expectedValue + "> but was <" + receivedKey + "," + receivedValue + ">");
}
}
} catch (final NullPointerException | NoSuchElementException e) {
return false;
}
}
return true;
}
private static void truncate(final String topic,
final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition,
final Map<TopicPartition, Long> committedOffsets) {
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> topicRecords = recordPerTopicPerPartition.get(topic);
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> truncatedTopicRecords = recordPerTopicPerPartition.get(topic);
for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : topicRecords.entrySet()) {
final TopicPartition tp = partitionRecords.getKey();
final Long committed = committedOffsets.get(new TopicPartition("data", tp.partition()));
truncatedTopicRecords.put(tp, partitionRecords.getValue().subList(0, committed != null ? committed.intValue() : 0));
}
recordPerTopicPerPartition.put(topic, truncatedTopicRecords);
}
private static void verifyMin(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition,
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> minPerTopicPerPartition) {
final StringDeserializer stringDeserializer = new StringDeserializer();
final IntegerDeserializer integerDeserializer = new IntegerDeserializer();
final HashMap<String, Integer> currentMinPerKey = new HashMap<>();
for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : minPerTopicPerPartition.entrySet()) {
try {
final TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition());
final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = inputPerTopicPerPartition.get(inputTopicPartition).iterator();
for (final ConsumerRecord<byte[], byte[]> receivedRecord : partitionRecords.getValue()) {
final ConsumerRecord<byte[], byte[]> input = inputRecords.next();
final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
final int receivedValue = integerDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
final String key = stringDeserializer.deserialize(input.topic(), input.key());
final Integer value = integerDeserializer.deserialize(input.topic(), input.value());
Integer min = currentMinPerKey.get(key);
if (min == null) {
min = value;
}
min = Math.min(min, value);
currentMinPerKey.put(key, min);
if (!receivedKey.equals(key) || receivedValue != min) {
throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + value + "> but was <" + receivedKey + "," + receivedValue + ">");
}
}
} catch (final NullPointerException e) {
System.err.println(inputPerTopicPerPartition);
e.printStackTrace(System.err);
throw e;
}
}
}
private static void verifySum(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition,
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> sumPerTopicPerPartition) {
final StringDeserializer stringDeserializer = new StringDeserializer();
final IntegerDeserializer integerDeserializer = new IntegerDeserializer();
final LongDeserializer longDeserializer = new LongDeserializer();
final HashMap<String, Long> currentSumPerKey = new HashMap<>();
for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : sumPerTopicPerPartition.entrySet()) {
try {
final TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition());
final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = inputPerTopicPerPartition.get(inputTopicPartition).iterator();
for (final ConsumerRecord<byte[], byte[]> receivedRecord : partitionRecords.getValue()) {
final ConsumerRecord<byte[], byte[]> input = inputRecords.next();
final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
final long receivedValue = longDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
final String key = stringDeserializer.deserialize(input.topic(), input.key());
final Integer value = integerDeserializer.deserialize(input.topic(), input.value());
Long sum = currentSumPerKey.get(key);
if (sum == null) {
sum = 0L;
}
sum += value;
currentSumPerKey.put(key, sum);
if (!receivedKey.equals(key) || receivedValue != sum) {
throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + value + "> but was <" + receivedKey + "," + receivedValue + ">");
}
}
} catch (final NullPointerException e) {
System.err.println(inputPerTopicPerPartition);
e.printStackTrace(System.err);
throw e;
}
}
}
private static void verifyAllTransactionFinished(final KafkaConsumer<byte[], byte[]> consumer,
final String kafka) {
final List<TopicPartition> partitions = getAllPartitions(consumer, "echo", "min", "sum");
consumer.assign(partitions);
consumer.seekToEnd(partitions);
consumer.poll(0);
final Properties producerProps = new Properties();
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "VerifyProducer");
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
try (final KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
for (final TopicPartition tp : partitions) {
final ProducerRecord<String, String> record = new ProducerRecord<>(tp.topic(), tp.partition(), "key", "value");
producer.send(record, new Callback() {
@Override
public void onCompletion(final RecordMetadata metadata, final Exception exception) {
if (exception != null) {
exception.printStackTrace();
Exit.exit(1);
}
}
});
}
}
final StringDeserializer stringDeserializer = new StringDeserializer();
final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
while (!partitions.isEmpty() && System.currentTimeMillis() < maxWaitTime) {
final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
for (final ConsumerRecord<byte[], byte[]> record : records) {
final String topic = record.topic();
final TopicPartition tp = new TopicPartition(topic, record.partition());
try {
final String key = stringDeserializer.deserialize(topic, record.key());
final String value = stringDeserializer.deserialize(topic, record.value());
if (!("key".equals(key) && "value".equals(value) && partitions.remove(tp))) {
throw new RuntimeException("Post transactions verification failed. Received unexpected verification record: " +
"Expected record <'key','value'> from one of " + partitions + " but got"
+ " <" + key + "," + value + "> [" + record.topic() + ", " + record.partition() + "]");
}
} catch (final SerializationException e) {
throw new RuntimeException("Post transactions verification failed. Received unexpected verification record: " +
"Expected record <'key','value'> from one of " + partitions + " but got " + record, e);
}
}
}
if (!partitions.isEmpty()) {
throw new RuntimeException("Could not read all verification records. Did not receive any new record within the last 30 sec.");
}
}
private static List<TopicPartition> getAllPartitions(final KafkaConsumer<?, ?> consumer,
final String... topics) {
final ArrayList<TopicPartition> partitions = new ArrayList<>();
for (final String topic : topics) {
for (final PartitionInfo info : consumer.partitionsFor(topic)) {
partitions.add(new TopicPartition(info.topic(), info.partition()));
}
}
return partitions;
}
}


@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import java.io.File;
public class StreamsEosTest {
/**
 * args ::= kafka stateDir command
 * command := "run" | "process" | "verify"
 */
public static void main(final String[] args) throws Exception {
final String kafka = args[0];
final String stateDir = args.length > 1 ? args[1] : null;
final String command = args.length > 2 ? args[2] : null;
System.out.println("StreamsTest instance started");
System.out.println("command=" + command);
System.out.println("kafka=" + kafka);
System.out.println("stateDir=" + stateDir);
if (command == null || stateDir == null) {
System.exit(-1);
}
switch (command) {
case "run":
EosTestDriver.generate(kafka);
break;
case "process":
final EosTestClient client = new EosTestClient(new File(stateDir), kafka);
client.start();
break;
case "verify":
EosTestDriver.verify(kafka);
break;
default:
System.out.println("unknown command: " + command);
}
}
}
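As a usage sketch (hypothetical; the broker address, state directory, and runner class name below are made up, and the Ducktape services in the next file pass the same positional arguments), the entry point can be driven like this:

package org.apache.kafka.streams.tests;

// Hypothetical local runner, not part of this commit.
public class StreamsEosTestLocalRunner {
    public static void main(final String[] args) throws Exception {
        // Positional arguments: kafka bootstrap servers, state directory, command.
        // "run" generates input, "process" starts an EosTestClient, "verify" checks the output.
        StreamsEosTest.main(new String[] {"localhost:9092", "/tmp/eos-test-state", "process"});
    }
}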


@@ -150,6 +150,16 @@ class StreamsSmokeTestBaseService(StreamsTestBaseService):
                                                          command)
+class StreamsEosTestBaseService(StreamsTestBaseService):
+    """Base class for Streams EOS Test services providing some common settings and functionality"""
+    def __init__(self, test_context, kafka, command):
+        super(StreamsEosTestBaseService, self).__init__(test_context,
+                                                        kafka,
+                                                        "org.apache.kafka.streams.tests.StreamsEosTest",
+                                                        command)
 class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
     def __init__(self, test_context, kafka):
         super(StreamsSmokeTestDriverService, self).__init__(test_context, kafka, "run")
@@ -160,6 +170,21 @@ class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
         super(StreamsSmokeTestJobRunnerService, self).__init__(test_context, kafka, "process")
+class StreamsEosTestDriverService(StreamsEosTestBaseService):
+    def __init__(self, test_context, kafka):
+        super(StreamsEosTestDriverService, self).__init__(test_context, kafka, "run")
+class StreamsEosTestJobRunnerService(StreamsEosTestBaseService):
+    def __init__(self, test_context, kafka):
+        super(StreamsEosTestJobRunnerService, self).__init__(test_context, kafka, "process")
+class StreamsEosTestVerifyRunnerService(StreamsEosTestBaseService):
+    def __init__(self, test_context, kafka):
+        super(StreamsEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify")
 class StreamsSmokeTestShutdownDeadlockService(StreamsSmokeTestBaseService):
     def __init__(self, test_context, kafka):
         super(StreamsSmokeTestShutdownDeadlockService, self).__init__(test_context, kafka, "close-deadlock-test")


@@ -0,0 +1,105 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.mark.resource import cluster
from kafkatest.tests.kafka_test import KafkaTest
from kafkatest.services.streams import StreamsEosTestDriverService, StreamsEosTestJobRunnerService, StreamsEosTestVerifyRunnerService
import time
class StreamsEosTest(KafkaTest):
"""
Test of Kafka Streams exactly-once semantics
"""
def __init__(self, test_context):
super(StreamsEosTest, self).__init__(test_context, num_zk=1, num_brokers=3, topics={
'data' : { 'partitions': 5, 'replication-factor': 2 },
'echo' : { 'partitions': 5, 'replication-factor': 2 },
'min' : { 'partitions': 5, 'replication-factor': 2 },
'sum' : { 'partitions': 5, 'replication-factor': 2 }
})
self.driver = StreamsEosTestDriverService(test_context, self.kafka)
self.processor1 = StreamsEosTestJobRunnerService(test_context, self.kafka)
self.processor2 = StreamsEosTestJobRunnerService(test_context, self.kafka)
self.verifier = StreamsEosTestVerifyRunnerService(test_context, self.kafka)
@cluster(num_nodes=8)
def test_rebalance(self):
"""
Starts and stops two test clients a few times.
Ensure that all records are delivered exactly-once.
"""
self.driver.start()
self.processor1.start()
time.sleep(30)
self.processor2.start()
time.sleep(30)
self.processor1.stop()
time.sleep(30)
self.processor1.start()
time.sleep(30)
self.processor2.stop()
time.sleep(30)
self.driver.stop()
self.processor1.stop()
self.processor2.stop()
self.verifier.start()
self.verifier.wait()
self.verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.verifier.STDOUT_FILE, allow_fail=False)
@cluster(num_nodes=8)
def test_failure_and_recovery(self):
"""
Starts two test clients, then abort (kill -9) and restart them a few times.
Ensure that all records are delivered exactly-once.
"""
self.driver.start()
self.processor1.start()
self.processor2.start()
time.sleep(30)
self.processor1.abortThenRestart()
time.sleep(30)
self.processor1.abortThenRestart()
time.sleep(30)
self.processor2.abortThenRestart()
time.sleep(30)
self.driver.stop()
self.processor1.stop()
self.processor2.stop()
self.verifier.start()
self.verifier.wait()
self.verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.verifier.STDOUT_FILE, allow_fail=False)