KAFKA-9779: Add Stream system test for 2.5 release (#8378)

Reviewer: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
Boyang Chen 2020-04-15 15:59:03 -07:00 committed by GitHub
parent f7d2b1baf7
commit df41713d64
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 123 additions and 13 deletions

View File

@ -1533,6 +1533,18 @@ project(':streams:upgrade-system-tests-24') {
} }
} }
project(':streams:upgrade-system-tests-25') {
archivesBaseName = "kafka-streams-upgrade-system-tests-25"
dependencies {
testCompile libs.kafkaStreams_25
}
systemTestLibs {
dependsOn testJar
}
}
project(':jmh-benchmarks') { project(':jmh-benchmarks') {
apply plugin: 'com.github.johnrengelman.shadow' apply plugin: 'com.github.johnrengelman.shadow'

View File

@ -95,6 +95,7 @@ versions += [
kafka_22: "2.2.2", kafka_22: "2.2.2",
kafka_23: "2.3.1", kafka_23: "2.3.1",
kafka_24: "2.4.1", kafka_24: "2.4.1",
kafka_25: "2.5.0",
lz4: "1.7.1", lz4: "1.7.1",
mavenArtifact: "3.6.3", mavenArtifact: "3.6.3",
metrics: "2.2.0", metrics: "2.2.0",
@ -164,6 +165,7 @@ libs += [
kafkaStreams_22: "org.apache.kafka:kafka-streams:$versions.kafka_22", kafkaStreams_22: "org.apache.kafka:kafka-streams:$versions.kafka_22",
kafkaStreams_23: "org.apache.kafka:kafka-streams:$versions.kafka_23", kafkaStreams_23: "org.apache.kafka:kafka-streams:$versions.kafka_23",
kafkaStreams_24: "org.apache.kafka:kafka-streams:$versions.kafka_24", kafkaStreams_24: "org.apache.kafka:kafka-streams:$versions.kafka_24",
kafkaStreams_25: "org.apache.kafka:kafka-streams:$versions.kafka_25",
log4j: "log4j:log4j:$versions.log4j", log4j: "log4j:log4j:$versions.log4j",
lz4: "org.lz4:lz4-java:$versions.lz4", lz4: "org.lz4:lz4-java:$versions.lz4",
metrics: "com.yammer.metrics:metrics-core:$versions.metrics", metrics: "com.yammer.metrics:metrics-core:$versions.metrics",

View File

@ -42,4 +42,5 @@ include 'clients',
'streams:upgrade-system-tests-22', 'streams:upgrade-system-tests-22',
'streams:upgrade-system-tests-23', 'streams:upgrade-system-tests-23',
'streams:upgrade-system-tests-24', 'streams:upgrade-system-tests-24',
'streams:upgrade-system-tests-25',
'tools' 'tools'

View File

@ -33,7 +33,7 @@ public class StreamsUpgradeTest {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
if (args.length < 1) { if (args.length < 1) {
System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none");
} }
final String propFileName = args[0]; final String propFileName = args[0];

View File

@ -33,7 +33,7 @@ public class StreamsUpgradeTest {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
if (args.length < 1) { if (args.length < 1) {
System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none");
} }
final String propFileName = args[0]; final String propFileName = args[0];

View File

@ -33,7 +33,7 @@ public class StreamsUpgradeTest {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
if (args.length < 1) { if (args.length < 1) {
System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none");
} }
final String propFileName = args[0]; final String propFileName = args[0];

View File

@ -33,7 +33,7 @@ public class StreamsUpgradeTest {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
if (args.length < 1) { if (args.length < 1) {
System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none");
} }
final String propFileName = args[0]; final String propFileName = args[0];

View File

@ -32,7 +32,7 @@ public class StreamsUpgradeTest {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
if (args.length < 1) { if (args.length < 1) {
System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none");
} }
final String propFileName = args[0]; final String propFileName = args[0];

View File

@ -32,7 +32,7 @@ public class StreamsUpgradeTest {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
if (args.length < 1) { if (args.length < 1) {
System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none");
} }
final String propFileName = args[0]; final String propFileName = args[0];

View File

@ -32,7 +32,7 @@ public class StreamsUpgradeTest {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
if (args.length < 1) { if (args.length < 1) {
System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none");
} }
final String propFileName = args[0]; final String propFileName = args[0];

View File

@ -32,7 +32,7 @@ public class StreamsUpgradeTest {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
if (args.length < 1) { if (args.length < 1) {
System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none");
} }
final String propFileName = args[0]; final String propFileName = args[0];

View File

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.tests;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import java.util.Properties;
public class StreamsUpgradeTest {
@SuppressWarnings("unchecked")
public static void main(final String[] args) throws Exception {
if (args.length < 1) {
System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none");
}
final String propFileName = args[0];
final Properties streamsProperties = Utils.loadProps(propFileName);
System.out.println("StreamsTest instance started (StreamsUpgradeTest v2.5)");
System.out.println("props=" + streamsProperties);
final StreamsBuilder builder = new StreamsBuilder();
final KStream dataStream = builder.stream("data");
dataStream.process(printProcessorSupplier());
dataStream.to("echo");
final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);
final KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
streams.close();
System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
System.out.flush();
}));
}
private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
return () -> new AbstractProcessor<K, V>() {
private int numRecordsProcessed = 0;
@Override
public void init(final ProcessorContext context) {
System.out.println("[2.5] initializing processor: topic=data taskId=" + context.taskId());
numRecordsProcessed = 0;
}
@Override
public void process(final K key, final V value) {
numRecordsProcessed++;
if (numRecordsProcessed % 100 == 0) {
System.out.println("processed " + numRecordsProcessed + " records from topic=data");
}
}
@Override
public void close() {}
};
}
}

View File

@ -58,6 +58,7 @@ RUN mkdir -p "/opt/kafka-2.1.1" && chmod a+rw /opt/kafka-2.1.1 && curl -s "$KAFK
RUN mkdir -p "/opt/kafka-2.2.2" && chmod a+rw /opt/kafka-2.2.2 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.2.2" RUN mkdir -p "/opt/kafka-2.2.2" && chmod a+rw /opt/kafka-2.2.2 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.2.2"
RUN mkdir -p "/opt/kafka-2.3.1" && chmod a+rw /opt/kafka-2.3.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.3.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.3.1" RUN mkdir -p "/opt/kafka-2.3.1" && chmod a+rw /opt/kafka-2.3.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.3.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.3.1"
RUN mkdir -p "/opt/kafka-2.4.1" && chmod a+rw /opt/kafka-2.4.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.4.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.4.1" RUN mkdir -p "/opt/kafka-2.4.1" && chmod a+rw /opt/kafka-2.4.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.4.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.4.1"
RUN mkdir -p "/opt/kafka-2.5.0" && chmod a+rw /opt/kafka-2.5.0 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.5.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.5.0"
# Streams test dependencies # Streams test dependencies
RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.0.1-test.jar" -o /opt/kafka-0.10.0.1/libs/kafka-streams-0.10.0.1-test.jar RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.0.1-test.jar" -o /opt/kafka-0.10.0.1/libs/kafka-streams-0.10.0.1-test.jar
@ -71,6 +72,7 @@ RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.1.1-test.jar" -o /opt/kafka-2.1.1/lib
RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.2.2-test.jar" -o /opt/kafka-2.2.2/libs/kafka-streams-2.2.2-test.jar RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.2.2-test.jar" -o /opt/kafka-2.2.2/libs/kafka-streams-2.2.2-test.jar
RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.3.1-test.jar" -o /opt/kafka-2.3.1/libs/kafka-streams-2.3.1-test.jar RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.3.1-test.jar" -o /opt/kafka-2.3.1/libs/kafka-streams-2.3.1-test.jar
RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.4.1-test.jar" -o /opt/kafka-2.4.1/libs/kafka-streams-2.4.1-test.jar RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.4.1-test.jar" -o /opt/kafka-2.4.1/libs/kafka-streams-2.4.1-test.jar
RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.5.0-test.jar" -o /opt/kafka-2.5.0/libs/kafka-streams-2.5.0-test.jar
# The version of Kibosh to use for testing. # The version of Kibosh to use for testing.
# If you update this, also update vagrant/base.sh # If you update this, also update vagrant/base.sh

View File

@ -21,7 +21,7 @@ from kafkatest.services.streams import StreamsBrokerCompatibilityService
from kafkatest.services.verifiable_consumer import VerifiableConsumer from kafkatest.services.verifiable_consumer import VerifiableConsumer
from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.version import LATEST_0_11_0, LATEST_0_10_2, LATEST_0_10_1, LATEST_0_10_0, LATEST_1_0, LATEST_1_1, \ from kafkatest.version import LATEST_0_11_0, LATEST_0_10_2, LATEST_0_10_1, LATEST_0_10_0, LATEST_1_0, LATEST_1_1, \
LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, KafkaVersion LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, KafkaVersion
class StreamsBrokerCompatibility(Test): class StreamsBrokerCompatibility(Test):
@ -85,6 +85,7 @@ class StreamsBrokerCompatibility(Test):
self.consumer.stop() self.consumer.stop()
self.kafka.stop() self.kafka.stop()
@parametrize(broker_version=str(LATEST_2_5))
@parametrize(broker_version=str(LATEST_2_4)) @parametrize(broker_version=str(LATEST_2_4))
@parametrize(broker_version=str(LATEST_2_3)) @parametrize(broker_version=str(LATEST_2_3))
@parametrize(broker_version=str(LATEST_2_2)) @parametrize(broker_version=str(LATEST_2_2))

View File

@ -20,7 +20,7 @@ from kafkatest.services.kafka import KafkaService
from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \ from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \
LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, DEV_BRANCH, DEV_VERSION, KafkaVersion LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, DEV_BRANCH, DEV_VERSION, KafkaVersion
from kafkatest.services.streams import CooperativeRebalanceUpgradeService from kafkatest.services.streams import CooperativeRebalanceUpgradeService
from kafkatest.tests.streams.utils import verify_stopped, stop_processors, verify_running from kafkatest.tests.streams.utils import verify_stopped, stop_processors, verify_running

View File

@ -25,18 +25,18 @@ from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmo
from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.streams.utils import extract_generation_from_logs from kafkatest.tests.streams.utils import extract_generation_from_logs
from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \ from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \
LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, DEV_BRANCH, DEV_VERSION, KafkaVersion LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, DEV_BRANCH, DEV_VERSION, KafkaVersion
# broker 0.10.0 is not compatible with newer Kafka Streams versions # broker 0.10.0 is not compatible with newer Kafka Streams versions
broker_upgrade_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), \ broker_upgrade_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), \
str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), str(DEV_BRANCH)] str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), str(LATEST_2_5), str(DEV_BRANCH)]
metadata_1_versions = [str(LATEST_0_10_0)] metadata_1_versions = [str(LATEST_0_10_0)]
metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)] metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
# once 0.10.1.2 is available backward_compatible_metadata_2_versions # once 0.10.1.2 is available backward_compatible_metadata_2_versions
# can be replaced with metadata_2_versions # can be replaced with metadata_2_versions
backward_compatible_metadata_2_versions = [str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)] backward_compatible_metadata_2_versions = [str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
metadata_3_or_higher_versions = [str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), str(DEV_VERSION)] metadata_3_or_higher_versions = [str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), str(LATEST_2_5), str(DEV_VERSION)]
""" """
After each release one should first check that the released version has been uploaded to After each release one should first check that the released version has been uploaded to

View File

@ -143,3 +143,7 @@ LATEST_2_3 = V_2_3_1
V_2_4_0 = KafkaVersion("2.4.0") V_2_4_0 = KafkaVersion("2.4.0")
V_2_4_1 = KafkaVersion("2.4.1") V_2_4_1 = KafkaVersion("2.4.1")
LATEST_2_4 = V_2_4_1 LATEST_2_4 = V_2_4_1
# 2.5.x versions
V_2_5_0 = KafkaVersion("2.5.0")
LATEST_2_5 = V_2_5_0

View File

@ -140,6 +140,8 @@ get_kafka 2.3.1 2.12
chmod a+rw /opt/kafka-2.3.1 chmod a+rw /opt/kafka-2.3.1
get_kafka 2.4.1 2.12 get_kafka 2.4.1 2.12
chmod a+rw /opt/kafka-2.4.1 chmod a+rw /opt/kafka-2.4.1
get_kafka 2.5.0 2.12
chmod a+rw /opt/kafka-2.5.0
# For EC2 nodes, we want to use /mnt, which should have the local disk. On local # For EC2 nodes, we want to use /mnt, which should have the local disk. On local
# VMs, we can just create it if it doesn't exist and use it like we'd use # VMs, we can just create it if it doesn't exist and use it like we'd use