KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests. (#10602)

Also adjusted the acceptable recovery lag to stabilize Streams tests.

Reviewers: Justine Olshan <jolshan@confluent.io>, Matthias J. Sax <mjsax@apache.org>, John Roesler <vvcephei@apache.org>
Kamal Chandraprakash 2021-08-05 04:01:10 +05:30 committed by GitHub
parent 06034938c4
commit a103c95a31
16 changed files with 1299 additions and 11 deletions
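
The recovery-lag adjustment works by pinning acceptable.recovery.lag to Long.MAX_VALUE in the Python system-test services (see the streams.py hunks below), so the KIP-441 high-availability assignor treats every instance as already caught up and places active tasks immediately instead of scheduling warmup replicas. For illustration only, the sketch below shows the equivalent override in a plain Java Streams client; the application id, bootstrap address, and pass-through topology are invented for the example and are not part of this commit.

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    public class RecoveryLagOverrideSketch {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "recovery-lag-sketch");   // hypothetical app id
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");      // hypothetical broker
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            // Long.MAX_VALUE makes every standby/active candidate look "caught up",
            // so the assignor skips probing rebalances with warmup replicas. This is
            // the same value the system tests set as the string "9223372036854775807".
            props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, Long.MAX_VALUE);

            final StreamsBuilder builder = new StreamsBuilder();
            builder.stream("data").to("echo");  // trivial stand-in topology for the sketch

            final KafkaStreams streams = new KafkaStreams(builder.build(), props);
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
            streams.start();
        }
    }
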


@@ -1842,6 +1842,7 @@ project(':streams') {
            ':streams:upgrade-system-tests-25:test',
            ':streams:upgrade-system-tests-26:test',
            ':streams:upgrade-system-tests-27:test',
+           ':streams:upgrade-system-tests-28:test',
            ':streams:examples:test'
        ]
    )
@@ -2141,6 +2142,19 @@ project(':streams:upgrade-system-tests-27') {
  }
}

+project(':streams:upgrade-system-tests-28') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-28"
+
+  dependencies {
+    testImplementation libs.kafkaStreams_28
+    testRuntimeOnly libs.junitJupiter
+  }
+
+  systemTestLibs {
+    dependsOn testJar
+  }
+}
+
project(':jmh-benchmarks') {
  apply plugin: 'com.github.johnrengelman.shadow'


@@ -96,6 +96,7 @@ versions += [
  kafka_25: "2.5.1",
  kafka_26: "2.6.2",
  kafka_27: "2.7.1",
+  kafka_28: "2.8.0",
  lz4: "1.7.1",
  mavenArtifact: "3.8.1",
  metrics: "2.2.0",
@@ -169,6 +170,7 @@ libs += [
  kafkaStreams_25: "org.apache.kafka:kafka-streams:$versions.kafka_25",
  kafkaStreams_26: "org.apache.kafka:kafka-streams:$versions.kafka_26",
  kafkaStreams_27: "org.apache.kafka:kafka-streams:$versions.kafka_27",
+  kafkaStreams_28: "org.apache.kafka:kafka-streams:$versions.kafka_28",
  log4j: "log4j:log4j:$versions.log4j",
  lz4: "org.lz4:lz4-java:$versions.lz4",
  metrics: "com.yammer.metrics:metrics-core:$versions.metrics",


@@ -51,5 +51,6 @@ include 'clients',
        'streams:upgrade-system-tests-25',
        'streams:upgrade-system-tests-26',
        'streams:upgrade-system-tests-27',
+        'streams:upgrade-system-tests-28',
        'tools',
        'trogdor'


@@ -67,7 +67,7 @@ public class StreamsUpgradeTest {
            @Override
            public void init(final ProcessorContext context) {
-                System.out.println("[2.6] initializing processor: topic=data taskId=" + context.taskId());
+                System.out.println("[2.7] initializing processor: topic=data taskId=" + context.taskId());
                numRecordsProcessed = 0;
            }


@@ -0,0 +1,299 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.time.Duration;
import java.time.Instant;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
public class SmokeTestClient extends SmokeTestUtil {
private final String name;
private KafkaStreams streams;
private boolean uncaughtException = false;
private boolean started;
private volatile boolean closed;
private static void addShutdownHook(final String name, final Runnable runnable) {
if (name != null) {
Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable));
} else {
Runtime.getRuntime().addShutdownHook(new Thread(runnable));
}
}
private static File tempDirectory() {
final String prefix = "kafka-";
final File file;
try {
file = Files.createTempDirectory(prefix).toFile();
} catch (final IOException ex) {
throw new RuntimeException("Failed to create a temp dir", ex);
}
file.deleteOnExit();
addShutdownHook("delete-temp-file-shutdown-hook", () -> {
try {
Utils.delete(file);
} catch (final IOException e) {
System.out.println("Error deleting " + file.getAbsolutePath());
e.printStackTrace(System.out);
}
});
return file;
}
public SmokeTestClient(final String name) {
this.name = name;
}
public boolean started() {
return started;
}
public boolean closed() {
return closed;
}
public void start(final Properties streamsProperties) {
final Topology build = getTopology();
streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
final CountDownLatch countDownLatch = new CountDownLatch(1);
streams.setStateListener((newState, oldState) -> {
System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
started = true;
countDownLatch.countDown();
}
if (newState == KafkaStreams.State.NOT_RUNNING) {
closed = true;
}
});
streams.setUncaughtExceptionHandler(e -> {
System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
System.out.println(name + ": FATAL: An unexpected exception is encountered: " + e);
e.printStackTrace(System.out);
uncaughtException = true;
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
});
addShutdownHook("streams-shutdown-hook", this::close);
streams.start();
try {
if (!countDownLatch.await(1, TimeUnit.MINUTES)) {
System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute");
}
} catch (final InterruptedException e) {
System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e);
e.printStackTrace(System.out);
}
System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED");
System.out.println(name + " started at " + Instant.now());
}
public void closeAsync() {
streams.close(Duration.ZERO);
}
public void close() {
final boolean closed = streams.close(Duration.ofMinutes(1));
if (closed && !uncaughtException) {
System.out.println(name + ": SMOKE-TEST-CLIENT-CLOSED");
} else if (closed) {
System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
} else {
System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't close");
}
}
private Properties getStreamsConfig(final Properties props) {
final Properties fullProps = new Properties(props);
fullProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
fullProps.put(StreamsConfig.CLIENT_ID_CONFIG, "SmokeTest-" + name);
fullProps.put(StreamsConfig.STATE_DIR_CONFIG, tempDirectory().getAbsolutePath());
fullProps.putAll(props);
return fullProps;
}
public Topology getTopology() {
final StreamsBuilder builder = new StreamsBuilder();
final Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde);
final KStream<String, Integer> source = builder.stream("data", stringIntConsumed);
source.filterNot((k, v) -> k.equals("flush"))
.to("echo", Produced.with(stringSerde, intSerde));
final KStream<String, Integer> data = source.filter((key, value) -> value == null || value != END);
data.process(SmokeTestUtil.printProcessorSupplier("data", name));
// min
final KGroupedStream<String, Integer> groupedData = data.groupByKey(Grouped.with(stringSerde, intSerde));
final KTable<Windowed<String>, Integer> minAggregation = groupedData
.windowedBy(TimeWindows.of(Duration.ofDays(1)).grace(Duration.ofMinutes(1)))
.aggregate(
() -> Integer.MAX_VALUE,
(aggKey, value, aggregate) -> (value < aggregate) ? value : aggregate,
Materialized
.<String, Integer, WindowStore<Bytes, byte[]>>as("uwin-min")
.withValueSerde(intSerde)
.withRetention(Duration.ofHours(25))
);
streamify(minAggregation, "min-raw");
streamify(minAggregation.suppress(untilWindowCloses(BufferConfig.unbounded())), "min-suppressed");
minAggregation
.toStream(new Unwindow<>())
.filterNot((k, v) -> k.equals("flush"))
.to("min", Produced.with(stringSerde, intSerde));
final KTable<Windowed<String>, Integer> smallWindowSum = groupedData
.windowedBy(TimeWindows.of(Duration.ofSeconds(2)).advanceBy(Duration.ofSeconds(1)).grace(Duration.ofSeconds(30)))
.reduce((l, r) -> l + r);
streamify(smallWindowSum, "sws-raw");
streamify(smallWindowSum.suppress(untilWindowCloses(BufferConfig.unbounded())), "sws-suppressed");
final KTable<String, Integer> minTable = builder.table(
"min",
Consumed.with(stringSerde, intSerde),
Materialized.as("minStoreName"));
minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min", name));
// max
groupedData
.windowedBy(TimeWindows.of(Duration.ofDays(2)))
.aggregate(
() -> Integer.MIN_VALUE,
(aggKey, value, aggregate) -> (value > aggregate) ? value : aggregate,
Materialized.<String, Integer, WindowStore<Bytes, byte[]>>as("uwin-max").withValueSerde(intSerde))
.toStream(new Unwindow<>())
.filterNot((k, v) -> k.equals("flush"))
.to("max", Produced.with(stringSerde, intSerde));
final KTable<String, Integer> maxTable = builder.table(
"max",
Consumed.with(stringSerde, intSerde),
Materialized.as("maxStoreName"));
maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max", name));
// sum
groupedData
.windowedBy(TimeWindows.of(Duration.ofDays(2)))
.aggregate(
() -> 0L,
(aggKey, value, aggregate) -> (long) value + aggregate,
Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("win-sum").withValueSerde(longSerde))
.toStream(new Unwindow<>())
.filterNot((k, v) -> k.equals("flush"))
.to("sum", Produced.with(stringSerde, longSerde));
final Consumed<String, Long> stringLongConsumed = Consumed.with(stringSerde, longSerde);
final KTable<String, Long> sumTable = builder.table("sum", stringLongConsumed);
sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum", name));
// cnt
groupedData
.windowedBy(TimeWindows.of(Duration.ofDays(2)))
.count(Materialized.as("uwin-cnt"))
.toStream(new Unwindow<>())
.filterNot((k, v) -> k.equals("flush"))
.to("cnt", Produced.with(stringSerde, longSerde));
final KTable<String, Long> cntTable = builder.table(
"cnt",
Consumed.with(stringSerde, longSerde),
Materialized.as("cntStoreName"));
cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt", name));
// dif
maxTable
.join(
minTable,
(value1, value2) -> value1 - value2)
.toStream()
.filterNot((k, v) -> k.equals("flush"))
.to("dif", Produced.with(stringSerde, intSerde));
// avg
sumTable
.join(
cntTable,
(value1, value2) -> (double) value1 / (double) value2)
.toStream()
.filterNot((k, v) -> k.equals("flush"))
.to("avg", Produced.with(stringSerde, doubleSerde));
// test repartition
final Agg agg = new Agg();
cntTable.groupBy(agg.selector(), Grouped.with(stringSerde, longSerde))
.aggregate(agg.init(), agg.adder(), agg.remover(),
Materialized.<String, Long>as(Stores.inMemoryKeyValueStore("cntByCnt"))
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long()))
.toStream()
.to("tagg", Produced.with(stringSerde, longSerde));
return builder.build();
}
private static void streamify(final KTable<Windowed<String>, Integer> windowedTable, final String topic) {
windowedTable
.toStream()
.filterNot((k, v) -> k.key().equals("flush"))
.map((key, value) -> new KeyValue<>(key.toString(), value))
.to(topic, Produced.with(stringSerde, intSerde));
}
}


@@ -0,0 +1,622 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.Collections.emptyMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
public class SmokeTestDriver extends SmokeTestUtil {
private static final String[] TOPICS = {
"data",
"echo",
"max",
"min", "min-suppressed", "min-raw",
"dif",
"sum",
"sws-raw", "sws-suppressed",
"cnt",
"avg",
"tagg"
};
private static final int MAX_RECORD_EMPTY_RETRIES = 30;
private static class ValueList {
public final String key;
private final int[] values;
private int index;
ValueList(final int min, final int max) {
key = min + "-" + max;
values = new int[max - min + 1];
for (int i = 0; i < values.length; i++) {
values[i] = min + i;
}
// We want to randomize the order of data to test not completely predictable processing order
// However, values are also use as a timestamp of the record. (TODO: separate data and timestamp)
// We keep some correlation of time and order. Thus, the shuffling is done with a sliding window
shuffle(values, 10);
index = 0;
}
int next() {
return (index < values.length) ? values[index++] : -1;
}
}
public static String[] topics() {
return Arrays.copyOf(TOPICS, TOPICS.length);
}
static void generatePerpetually(final String kafka,
final int numKeys,
final int maxRecordsPerKey) {
final Properties producerProps = generatorProperties(kafka);
int numRecordsProduced = 0;
final ValueList[] data = new ValueList[numKeys];
for (int i = 0; i < numKeys; i++) {
data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
}
final Random rand = new Random();
try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
while (true) {
final int index = rand.nextInt(numKeys);
final String key = data[index].key;
final int value = data[index].next();
final ProducerRecord<byte[], byte[]> record =
new ProducerRecord<>(
"data",
stringSerde.serializer().serialize("", key),
intSerde.serializer().serialize("", value)
);
producer.send(record);
numRecordsProduced++;
if (numRecordsProduced % 100 == 0) {
System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
}
Utils.sleep(2);
}
}
}
public static Map<String, Set<Integer>> generate(final String kafka,
final int numKeys,
final int maxRecordsPerKey,
final Duration timeToSpend) {
final Properties producerProps = generatorProperties(kafka);
int numRecordsProduced = 0;
final Map<String, Set<Integer>> allData = new HashMap<>();
final ValueList[] data = new ValueList[numKeys];
for (int i = 0; i < numKeys; i++) {
data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
allData.put(data[i].key, new HashSet<>());
}
final Random rand = new Random();
int remaining = data.length;
final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;
List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
while (remaining > 0) {
final int index = rand.nextInt(remaining);
final String key = data[index].key;
final int value = data[index].next();
if (value < 0) {
remaining--;
data[index] = data[remaining];
} else {
final ProducerRecord<byte[], byte[]> record =
new ProducerRecord<>(
"data",
stringSerde.serializer().serialize("", key),
intSerde.serializer().serialize("", value)
);
producer.send(record, new TestCallback(record, needRetry));
numRecordsProduced++;
allData.get(key).add(value);
if (numRecordsProduced % 100 == 0) {
System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
}
Utils.sleep(Math.max(recordPauseTime, 2));
}
}
producer.flush();
int remainingRetries = 5;
while (!needRetry.isEmpty()) {
final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
for (final ProducerRecord<byte[], byte[]> record : needRetry) {
System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
producer.send(record, new TestCallback(record, needRetry2));
}
producer.flush();
needRetry = needRetry2;
if (--remainingRetries == 0 && !needRetry.isEmpty()) {
System.err.println("Failed to produce all records after multiple retries");
Exit.exit(1);
}
}
// now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
// all suppressed records.
final List<PartitionInfo> partitions = producer.partitionsFor("data");
for (final PartitionInfo partition : partitions) {
producer.send(new ProducerRecord<>(
partition.topic(),
partition.partition(),
System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
stringSerde.serializer().serialize("", "flush"),
intSerde.serializer().serialize("", 0)
));
}
}
return Collections.unmodifiableMap(allData);
}
private static Properties generatorProperties(final String kafka) {
final Properties producerProps = new Properties();
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
return producerProps;
}
private static class TestCallback implements Callback {
private final ProducerRecord<byte[], byte[]> originalRecord;
private final List<ProducerRecord<byte[], byte[]>> needRetry;
TestCallback(final ProducerRecord<byte[], byte[]> originalRecord,
final List<ProducerRecord<byte[], byte[]>> needRetry) {
this.originalRecord = originalRecord;
this.needRetry = needRetry;
}
@Override
public void onCompletion(final RecordMetadata metadata, final Exception exception) {
if (exception != null) {
if (exception instanceof TimeoutException) {
needRetry.add(originalRecord);
} else {
exception.printStackTrace();
Exit.exit(1);
}
}
}
}
private static void shuffle(final int[] data, @SuppressWarnings("SameParameterValue") final int windowSize) {
final Random rand = new Random();
for (int i = 0; i < data.length; i++) {
// we shuffle data within windowSize
final int j = rand.nextInt(Math.min(data.length - i, windowSize)) + i;
// swap
final int tmp = data[i];
data[i] = data[j];
data[j] = tmp;
}
}
public static class NumberDeserializer implements Deserializer<Number> {
@Override
public Number deserialize(final String topic, final byte[] data) {
final Number value;
switch (topic) {
case "data":
case "echo":
case "min":
case "min-raw":
case "min-suppressed":
case "sws-raw":
case "sws-suppressed":
case "max":
case "dif":
value = intSerde.deserializer().deserialize(topic, data);
break;
case "sum":
case "cnt":
case "tagg":
value = longSerde.deserializer().deserialize(topic, data);
break;
case "avg":
value = doubleSerde.deserializer().deserialize(topic, data);
break;
default:
throw new RuntimeException("unknown topic: " + topic);
}
return value;
}
}
public static VerificationResult verify(final String kafka,
final Map<String, Set<Integer>> inputs,
final int maxRecordsPerKey) {
final Properties props = new Properties();
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NumberDeserializer.class);
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
consumer.assign(partitions);
consumer.seekToBeginning(partitions);
final int recordsGenerated = inputs.size() * maxRecordsPerKey;
int recordsProcessed = 0;
final Map<String, AtomicInteger> processed =
Stream.of(TOPICS)
.collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
VerificationResult verificationResult = new VerificationResult(false, "no results yet");
int retry = 0;
final long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
final ConsumerRecords<String, Number> records = consumer.poll(Duration.ofSeconds(5));
if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
verificationResult = verifyAll(inputs, events, false);
if (verificationResult.passed()) {
break;
} else if (retry++ > MAX_RECORD_EMPTY_RETRIES) {
System.out.println(Instant.now() + " Didn't get any more results, verification hasn't passed, and out of retries.");
break;
} else {
System.out.println(Instant.now() + " Didn't get any more results, but verification hasn't passed (yet). Retrying..." + retry);
}
} else {
System.out.println(Instant.now() + " Get some more results from " + records.partitions() + ", resetting retry.");
retry = 0;
for (final ConsumerRecord<String, Number> record : records) {
final String key = record.key();
final String topic = record.topic();
processed.get(topic).incrementAndGet();
if (topic.equals("echo")) {
recordsProcessed++;
if (recordsProcessed % 100 == 0) {
System.out.println("Echo records processed = " + recordsProcessed);
}
}
events.computeIfAbsent(topic, t -> new HashMap<>())
.computeIfAbsent(key, k -> new LinkedList<>())
.add(record);
}
System.out.println(processed);
}
}
consumer.close();
final long finished = System.currentTimeMillis() - start;
System.out.println("Verification time=" + finished);
System.out.println("-------------------");
System.out.println("Result Verification");
System.out.println("-------------------");
System.out.println("recordGenerated=" + recordsGenerated);
System.out.println("recordProcessed=" + recordsProcessed);
if (recordsProcessed > recordsGenerated) {
System.out.println("PROCESSED-MORE-THAN-GENERATED");
} else if (recordsProcessed < recordsGenerated) {
System.out.println("PROCESSED-LESS-THAN-GENERATED");
}
boolean success;
final Map<String, Set<Number>> received =
events.get("echo")
.entrySet()
.stream()
.map(entry -> mkEntry(
entry.getKey(),
entry.getValue().stream().map(ConsumerRecord::value).collect(Collectors.toSet()))
)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
success = inputs.equals(received);
if (success) {
System.out.println("ALL-RECORDS-DELIVERED");
} else {
int missedCount = 0;
for (final Map.Entry<String, Set<Integer>> entry : inputs.entrySet()) {
missedCount += received.get(entry.getKey()).size();
}
System.out.println("missedRecords=" + missedCount);
}
// give it one more try if it's not already passing.
if (!verificationResult.passed()) {
verificationResult = verifyAll(inputs, events, true);
}
success &= verificationResult.passed();
System.out.println(verificationResult.result());
System.out.println(success ? "SUCCESS" : "FAILURE");
return verificationResult;
}
public static class VerificationResult {
private final boolean passed;
private final String result;
VerificationResult(final boolean passed, final String result) {
this.passed = passed;
this.result = result;
}
public boolean passed() {
return passed;
}
public String result() {
return result;
}
}
private static VerificationResult verifyAll(final Map<String, Set<Integer>> inputs,
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
final boolean printResults) {
final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
boolean pass;
try (final PrintStream resultStream = new PrintStream(byteArrayOutputStream)) {
pass = verifyTAgg(resultStream, inputs, events.get("tagg"), printResults);
pass &= verifySuppressed(resultStream, "min-suppressed", events, printResults);
pass &= verify(resultStream, "min-suppressed", inputs, events, windowedKey -> {
final String unwindowedKey = windowedKey.substring(1, windowedKey.length() - 1).replaceAll("@.*", "");
return getMin(unwindowedKey);
}, printResults);
pass &= verifySuppressed(resultStream, "sws-suppressed", events, printResults);
pass &= verify(resultStream, "min", inputs, events, SmokeTestDriver::getMin, printResults);
pass &= verify(resultStream, "max", inputs, events, SmokeTestDriver::getMax, printResults);
pass &= verify(resultStream, "dif", inputs, events, key -> getMax(key).intValue() - getMin(key).intValue(), printResults);
pass &= verify(resultStream, "sum", inputs, events, SmokeTestDriver::getSum, printResults);
pass &= verify(resultStream, "cnt", inputs, events, key1 -> getMax(key1).intValue() - getMin(key1).intValue() + 1L, printResults);
pass &= verify(resultStream, "avg", inputs, events, SmokeTestDriver::getAvg, printResults);
}
return new VerificationResult(pass, new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8));
}
private static boolean verify(final PrintStream resultStream,
final String topic,
final Map<String, Set<Integer>> inputData,
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
final Function<String, Number> keyToExpectation,
final boolean printResults) {
final Map<String, LinkedList<ConsumerRecord<String, Number>>> observedInputEvents = events.get("data");
final Map<String, LinkedList<ConsumerRecord<String, Number>>> outputEvents = events.getOrDefault(topic, emptyMap());
if (outputEvents.isEmpty()) {
resultStream.println(topic + " is empty");
return false;
} else {
resultStream.printf("verifying %s with %d keys%n", topic, outputEvents.size());
if (outputEvents.size() != inputData.size()) {
resultStream.printf("fail: resultCount=%d expectedCount=%s%n\tresult=%s%n\texpected=%s%n",
outputEvents.size(), inputData.size(), outputEvents.keySet(), inputData.keySet());
return false;
}
for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : outputEvents.entrySet()) {
final String key = entry.getKey();
final Number expected = keyToExpectation.apply(key);
final Number actual = entry.getValue().getLast().value();
if (!expected.equals(actual)) {
resultStream.printf("%s fail: key=%s actual=%s expected=%s%n", topic, key, actual, expected);
if (printResults) {
resultStream.printf("\t inputEvents=%n%s%n\t" +
"echoEvents=%n%s%n\tmaxEvents=%n%s%n\tminEvents=%n%s%n\tdifEvents=%n%s%n\tcntEvents=%n%s%n\ttaggEvents=%n%s%n",
indent("\t\t", observedInputEvents.get(key)),
indent("\t\t", events.getOrDefault("echo", emptyMap()).getOrDefault(key, new LinkedList<>())),
indent("\t\t", events.getOrDefault("max", emptyMap()).getOrDefault(key, new LinkedList<>())),
indent("\t\t", events.getOrDefault("min", emptyMap()).getOrDefault(key, new LinkedList<>())),
indent("\t\t", events.getOrDefault("dif", emptyMap()).getOrDefault(key, new LinkedList<>())),
indent("\t\t", events.getOrDefault("cnt", emptyMap()).getOrDefault(key, new LinkedList<>())),
indent("\t\t", events.getOrDefault("tagg", emptyMap()).getOrDefault(key, new LinkedList<>())));
if (!Utils.mkSet("echo", "max", "min", "dif", "cnt", "tagg").contains(topic))
resultStream.printf("%sEvents=%n%s%n", topic, indent("\t\t", entry.getValue()));
}
return false;
}
}
return true;
}
}
private static boolean verifySuppressed(final PrintStream resultStream,
@SuppressWarnings("SameParameterValue") final String topic,
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
final boolean printResults) {
resultStream.println("verifying suppressed " + topic);
final Map<String, LinkedList<ConsumerRecord<String, Number>>> topicEvents = events.getOrDefault(topic, emptyMap());
for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : topicEvents.entrySet()) {
if (entry.getValue().size() != 1) {
final String unsuppressedTopic = topic.replace("-suppressed", "-raw");
final String key = entry.getKey();
final String unwindowedKey = key.substring(1, key.length() - 1).replaceAll("@.*", "");
resultStream.printf("fail: key=%s%n\tnon-unique result:%n%s%n",
key,
indent("\t\t", entry.getValue()));
if (printResults)
resultStream.printf("\tresultEvents:%n%s%n\tinputEvents:%n%s%n",
indent("\t\t", events.get(unsuppressedTopic).get(key)),
indent("\t\t", events.get("data").get(unwindowedKey)));
return false;
}
}
return true;
}
private static String indent(@SuppressWarnings("SameParameterValue") final String prefix,
final Iterable<ConsumerRecord<String, Number>> list) {
final StringBuilder stringBuilder = new StringBuilder();
for (final ConsumerRecord<String, Number> record : list) {
stringBuilder.append(prefix).append(record).append('\n');
}
return stringBuilder.toString();
}
private static Long getSum(final String key) {
final int min = getMin(key).intValue();
final int max = getMax(key).intValue();
return ((long) min + max) * (max - min + 1L) / 2L;
}
private static Double getAvg(final String key) {
final int min = getMin(key).intValue();
final int max = getMax(key).intValue();
return ((long) min + max) / 2.0;
}
private static boolean verifyTAgg(final PrintStream resultStream,
final Map<String, Set<Integer>> allData,
final Map<String, LinkedList<ConsumerRecord<String, Number>>> taggEvents,
final boolean printResults) {
if (taggEvents == null) {
resultStream.println("tagg is missing");
return false;
} else if (taggEvents.isEmpty()) {
resultStream.println("tagg is empty");
return false;
} else {
resultStream.println("verifying tagg");
// generate expected answer
final Map<String, Long> expected = new HashMap<>();
for (final String key : allData.keySet()) {
final int min = getMin(key).intValue();
final int max = getMax(key).intValue();
final String cnt = Long.toString(max - min + 1L);
expected.put(cnt, expected.getOrDefault(cnt, 0L) + 1);
}
// check the result
for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : taggEvents.entrySet()) {
final String key = entry.getKey();
Long expectedCount = expected.remove(key);
if (expectedCount == null) {
expectedCount = 0L;
}
if (entry.getValue().getLast().value().longValue() != expectedCount) {
resultStream.println("fail: key=" + key + " tagg=" + entry.getValue() + " expected=" + expectedCount);
if (printResults)
resultStream.println("\t taggEvents: " + entry.getValue());
return false;
}
}
}
return true;
}
private static Number getMin(final String key) {
return Integer.parseInt(key.split("-")[0]);
}
private static Number getMax(final String key) {
return Integer.parseInt(key.split("-")[1]);
}
private static List<TopicPartition> getAllPartitions(final KafkaConsumer<?, ?> consumer, final String... topics) {
final List<TopicPartition> partitions = new ArrayList<>();
for (final String topic : topics) {
for (final PartitionInfo info : consumer.partitionsFor(topic)) {
partitions.add(new TopicPartition(info.topic(), info.partition()));
}
}
return partitions;
}
}


@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import java.time.Instant;
public class SmokeTestUtil {
final static int END = Integer.MAX_VALUE;
static ProcessorSupplier<Object, Object> printProcessorSupplier(final String topic) {
return printProcessorSupplier(topic, "");
}
static ProcessorSupplier<Object, Object> printProcessorSupplier(final String topic, final String name) {
return new ProcessorSupplier<Object, Object>() {
@Override
public Processor<Object, Object> get() {
return new AbstractProcessor<Object, Object>() {
private int numRecordsProcessed = 0;
private long smallestOffset = Long.MAX_VALUE;
private long largestOffset = Long.MIN_VALUE;
@Override
public void init(final ProcessorContext context) {
super.init(context);
System.out.println("[2.8] initializing processor: topic=" + topic + " taskId=" + context.taskId());
System.out.flush();
numRecordsProcessed = 0;
smallestOffset = Long.MAX_VALUE;
largestOffset = Long.MIN_VALUE;
}
@Override
public void process(final Object key, final Object value) {
numRecordsProcessed++;
if (numRecordsProcessed % 100 == 0) {
System.out.printf("%s: %s%n", name, Instant.now());
System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
}
if (smallestOffset > context().offset()) {
smallestOffset = context().offset();
}
if (largestOffset < context().offset()) {
largestOffset = context().offset();
}
}
@Override
public void close() {
System.out.printf("Close processor for task %s%n", context().taskId());
System.out.println("processed " + numRecordsProcessed + " records");
final long processed;
if (largestOffset >= smallestOffset) {
processed = 1L + largestOffset - smallestOffset;
} else {
processed = 0L;
}
System.out.println("offset " + smallestOffset + " to " + largestOffset + " -> processed " + processed);
System.out.flush();
}
};
}
};
}
public static final class Unwindow<K, V> implements KeyValueMapper<Windowed<K>, V, K> {
@Override
public K apply(final Windowed<K> winKey, final V value) {
return winKey.key();
}
}
public static class Agg {
KeyValueMapper<String, Long, KeyValue<String, Long>> selector() {
return (key, value) -> new KeyValue<>(value == null ? null : Long.toString(value), 1L);
}
public Initializer<Long> init() {
return () -> 0L;
}
Aggregator<String, Long, Long> adder() {
return (aggKey, value, aggregate) -> aggregate + value;
}
Aggregator<String, Long, Long> remover() {
return (aggKey, value, aggregate) -> aggregate - value;
}
}
public static Serde<String> stringSerde = Serdes.String();
public static Serde<Integer> intSerde = Serdes.Integer();
static Serde<Long> longSerde = Serdes.Long();
static Serde<Double> doubleSerde = Serdes.Double();
public static void sleep(final long duration) {
try {
Thread.sleep(duration);
} catch (final Exception ignore) { }
}
}


@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
import static org.apache.kafka.streams.tests.SmokeTestDriver.generatePerpetually;
public class StreamsSmokeTest {
/**
* args ::= kafka propFileName command disableAutoTerminate
* command := "run" | "process"
*
* @param args
*/
public static void main(final String[] args) throws IOException {
if (args.length < 2) {
System.err.println("StreamsSmokeTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter");
Exit.exit(1);
}
final String propFileName = args[0];
final String command = args[1];
final boolean disableAutoTerminate = args.length > 2;
final Properties streamsProperties = Utils.loadProps(propFileName);
final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
final String processingGuarantee = streamsProperties.getProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG);
if (kafka == null) {
System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
Exit.exit(1);
}
if ("process".equals(command)) {
if (!StreamsConfig.AT_LEAST_ONCE.equals(processingGuarantee) &&
!StreamsConfig.EXACTLY_ONCE.equals(processingGuarantee)) {
System.err.println("processingGuarantee must be either " + StreamsConfig.AT_LEAST_ONCE + " or " +
StreamsConfig.EXACTLY_ONCE);
Exit.exit(1);
}
}
System.out.println("StreamsTest instance started (StreamsSmokeTest)");
System.out.println("command=" + command);
System.out.println("props=" + streamsProperties);
System.out.println("disableAutoTerminate=" + disableAutoTerminate);
switch (command) {
case "run":
// this starts the driver (data generation and result verification)
final int numKeys = 10;
final int maxRecordsPerKey = 500;
if (disableAutoTerminate) {
generatePerpetually(kafka, numKeys, maxRecordsPerKey);
} else {
// slow down data production to span 30 seconds so that system tests have time to
// do their bounces, etc.
final Map<String, Set<Integer>> allData =
generate(kafka, numKeys, maxRecordsPerKey, Duration.ofSeconds(30));
SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
}
break;
case "process":
// this starts the stream processing app
new SmokeTestClient(UUID.randomUUID().toString()).start(streamsProperties);
break;
default:
System.out.println("unknown command: " + command);
}
}
}


@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import java.util.Properties;
public class StreamsUpgradeTest {
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 1) {
System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none");
}
final String propFileName = args[0];
final Properties streamsProperties = Utils.loadProps(propFileName);
System.out.println("StreamsTest instance started (StreamsUpgradeTest v2.8)");
System.out.println("props=" + streamsProperties);
final StreamsBuilder builder = new StreamsBuilder();
final KStream dataStream = builder.stream("data");
dataStream.process(printProcessorSupplier());
dataStream.to("echo");
final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);
final KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
streams.close();
System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
System.out.flush();
}));
}
private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
return () -> new AbstractProcessor<K, V>() {
private int numRecordsProcessed = 0;
@Override
public void init(final ProcessorContext context) {
System.out.println("[2.8] initializing processor: topic=data taskId=" + context.taskId());
numRecordsProcessed = 0;
}
@Override
public void process(final K key, final V value) {
numRecordsProcessed++;
if (numRecordsProcessed % 100 == 0) {
System.out.println("processed " + numRecordsProcessed + " records from topic=data");
}
}
@Override
public void close() {}
};
}
}


@@ -393,6 +393,9 @@ class StreamsEosTestBaseService(StreamsTestBaseService):
                      streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
                      streams_property.PROCESSING_GUARANTEE: self.PROCESSING_GUARANTEE}

+        # Long.MAX_VALUE lets us do the assignment without a warmup
+        properties['acceptable.recovery.lag'] = "9223372036854775807"
+
        cfg = KafkaConfig(**properties)
        return cfg.render()
@@ -472,6 +475,9 @@ class StreamsBrokerCompatibilityService(StreamsTestBaseService):
                      # the old broker (< 2.4) does not support configuration replication.factor=-1
                      "replication.factor": 1}

+        # Long.MAX_VALUE lets us do the assignment without a warmup
+        properties['acceptable.recovery.lag'] = "9223372036854775807"
+
        cfg = KafkaConfig(**properties)
        return cfg.render()
@@ -610,6 +616,9 @@ class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
        if self.UPGRADE_TO == "future_version":
            properties['test.future.metadata'] = "any_value"

+        # Long.MAX_VALUE lets us do the assignment without a warmup
+        properties['acceptable.recovery.lag'] = "9223372036854775807"
+
        cfg = KafkaConfig(**properties)
        return cfg.render()
@@ -656,6 +665,9 @@ class StreamsNamedRepartitionTopicService(StreamsTestBaseService):
        properties['aggregation.topic'] = self.AGGREGATION_TOPIC
        properties['add.operations'] = self.ADD_ADDITIONAL_OPS

+        # Long.MAX_VALUE lets us do the assignment without a warmup
+        properties['acceptable.recovery.lag'] = "9223372036854775807"
+
        cfg = KafkaConfig(**properties)
        return cfg.render()
@@ -677,8 +689,9 @@ class StaticMemberTestService(StreamsTestBaseService):
                      consumer_property.SESSION_TIMEOUT_MS: 60000}
        properties['input.topic'] = self.INPUT_TOPIC
-        # TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor
-        properties['internal.task.assignor.class'] = "org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor"
+        # Long.MAX_VALUE lets us do the assignment without a warmup
+        properties['acceptable.recovery.lag'] = "9223372036854775807"
+
        cfg = KafkaConfig(**properties)
        return cfg.render()
@@ -759,5 +772,8 @@ class CooperativeRebalanceUpgradeService(StreamsTestBaseService):
        properties['task.delimiter'] = self.TASK_DELIMITER
        properties['report.interval'] = self.REPORT_INTERVAL

+        # Long.MAX_VALUE lets us do the assignment without a warmup
+        properties['acceptable.recovery.lag'] = "9223372036854775807"
+
        cfg = KafkaConfig(**properties)
        return cfg.render()


@@ -26,7 +26,7 @@ from ducktape.tests.test import TestContext
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService, quorum
from ducktape.tests.test import Test
-from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, V_0_11_0_0, V_0_10_1_0, KafkaVersion
+from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, V_0_11_0_0, V_0_10_1_0, KafkaVersion

def get_broker_features(broker_version):
    features = {}
@@ -127,6 +127,7 @@ class ClientCompatibilityFeaturesTest(Test):
    @parametrize(broker_version=str(LATEST_2_5))
    @parametrize(broker_version=str(LATEST_2_6))
    @parametrize(broker_version=str(LATEST_2_7))
+    @parametrize(broker_version=str(LATEST_2_8))
    def run_compatibility_test(self, broker_version, metadata_quorum=quorum.zk):
        if self.zk:
            self.zk.start()


@@ -23,7 +23,7 @@ from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int_with_prefix
-from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, KafkaVersion
+from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, KafkaVersion

class ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest):
    """
@@ -70,6 +70,7 @@ class ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest):
    @parametrize(broker_version=str(LATEST_2_5))
    @parametrize(broker_version=str(LATEST_2_6))
    @parametrize(broker_version=str(LATEST_2_7))
+    @parametrize(broker_version=str(LATEST_2_8))
    def test_produce_consume(self, broker_version, metadata_quorum=quorum.zk):
        print("running producer_consumer_compat with broker_version = %s" % broker_version, flush=True)
        self.kafka.set_version(KafkaVersion(broker_version))


@@ -21,7 +21,7 @@ from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int
-from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, DEV_BRANCH, KafkaVersion
+from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, DEV_BRANCH, KafkaVersion

# Compatibility tests for moving to a new broker (e.g., 0.10.x) and using a mix of old and new clients (e.g., 0.9.x)
class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
@@ -53,6 +53,7 @@ class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
    @matrix(producer_version=[str(LATEST_2_5)], consumer_version=[str(LATEST_2_5)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
    @matrix(producer_version=[str(LATEST_2_6)], consumer_version=[str(LATEST_2_6)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
    @matrix(producer_version=[str(LATEST_2_7)], consumer_version=[str(LATEST_2_7)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
+    @matrix(producer_version=[str(LATEST_2_8)], consumer_version=[str(LATEST_2_8)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
    @matrix(producer_version=[str(LATEST_2_1)], consumer_version=[str(LATEST_2_1)], compression_types=[["zstd"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
    @matrix(producer_version=[str(LATEST_2_0)], consumer_version=[str(LATEST_2_0)], compression_types=[["snappy"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
    @matrix(producer_version=[str(LATEST_1_1)], consumer_version=[str(LATEST_1_1)], compression_types=[["lz4"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)


@@ -19,7 +19,7 @@ from ducktape.utils.util import wait_until
from kafkatest.services.kafka import config_property
from kafkatest.tests.end_to_end import EndToEndTest
-from kafkatest.version import LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, DEV_BRANCH, KafkaVersion
+from kafkatest.version import LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, DEV_BRANCH, KafkaVersion

class TestDowngrade(EndToEndTest):
    PARTITIONS = 3
@@ -79,6 +79,15 @@ class TestDowngrade(EndToEndTest):
                   timeout_sec=60, backoff_sec=1, err_msg="Replicas did not rejoin the ISR in a reasonable amount of time")

    @cluster(num_nodes=7)
+    @parametrize(version=str(LATEST_2_8), compression_types=["snappy"])
+    @parametrize(version=str(LATEST_2_8), compression_types=["zstd"], security_protocol="SASL_SSL")
+    @matrix(version=[str(LATEST_2_8)], compression_types=[["none"]], static_membership=[False, True])
+    @parametrize(version=str(LATEST_2_7), compression_types=["lz4"])
+    @parametrize(version=str(LATEST_2_7), compression_types=["zstd"], security_protocol="SASL_SSL")
+    @matrix(version=[str(LATEST_2_7)], compression_types=[["none"]], static_membership=[False, True])
+    @parametrize(version=str(LATEST_2_6), compression_types=["lz4"])
+    @parametrize(version=str(LATEST_2_6), compression_types=["zstd"], security_protocol="SASL_SSL")
+    @matrix(version=[str(LATEST_2_6)], compression_types=[["none"]], static_membership=[False, True])
    @matrix(version=[str(LATEST_2_5)], compression_types=[["none"]], static_membership=[False, True])
    @parametrize(version=str(LATEST_2_5), compression_types=["zstd"], security_protocol="SASL_SSL")
    # static membership was introduced with a buggy verifiable console consumer which


@@ -21,9 +21,11 @@ from ducktape.utils.util import wait_until
from kafkatest.services.kafka import KafkaService
from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.version import LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, DEV_VERSION, KafkaVersion
+from kafkatest.version import LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, DEV_VERSION, KafkaVersion

-smoke_test_versions = [str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), str(LATEST_2_5)]
+smoke_test_versions = [str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4),
+                       str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7),
+                       str(LATEST_2_8)]
dev_version = [str(DEV_VERSION)]

class StreamsUpgradeTest(Test):


@@ -25,13 +25,13 @@ from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmo
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.streams.utils import extract_generation_from_logs, extract_generation_id
from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \
-    LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, DEV_BRANCH, DEV_VERSION, KafkaVersion
+    LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, DEV_BRANCH, DEV_VERSION, KafkaVersion

# broker 0.10.0 is not compatible with newer Kafka Streams versions
# broker 0.10.1 and 0.10.2 do not support headers, as required by suppress() (since v2.2.1)
broker_upgrade_versions = [str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), \
                           str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3), \
-                           str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(DEV_BRANCH)]
+                           str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8), str(DEV_BRANCH)]

metadata_1_versions = [str(LATEST_0_10_0)]
metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]