diff --git a/build.gradle b/build.gradle index 53b7a269834..da9edc9956d 100644 --- a/build.gradle +++ b/build.gradle @@ -1533,6 +1533,18 @@ project(':streams:upgrade-system-tests-24') { } } +project(':streams:upgrade-system-tests-25') { + archivesBaseName = "kafka-streams-upgrade-system-tests-25" + + dependencies { + testCompile libs.kafkaStreams_25 + } + + systemTestLibs { + dependsOn testJar + } +} + project(':jmh-benchmarks') { apply plugin: 'com.github.johnrengelman.shadow' diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 400b8ad06a8..307e4373b29 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -95,6 +95,7 @@ versions += [ kafka_22: "2.2.2", kafka_23: "2.3.1", kafka_24: "2.4.1", + kafka_25: "2.5.0", lz4: "1.7.1", mavenArtifact: "3.6.3", metrics: "2.2.0", @@ -164,6 +165,7 @@ libs += [ kafkaStreams_22: "org.apache.kafka:kafka-streams:$versions.kafka_22", kafkaStreams_23: "org.apache.kafka:kafka-streams:$versions.kafka_23", kafkaStreams_24: "org.apache.kafka:kafka-streams:$versions.kafka_24", + kafkaStreams_25: "org.apache.kafka:kafka-streams:$versions.kafka_25", log4j: "log4j:log4j:$versions.log4j", lz4: "org.lz4:lz4-java:$versions.lz4", metrics: "com.yammer.metrics:metrics-core:$versions.metrics", diff --git a/settings.gradle b/settings.gradle index e9f04a2aa7b..25fda5b815f 100644 --- a/settings.gradle +++ b/settings.gradle @@ -42,4 +42,5 @@ include 'clients', 'streams:upgrade-system-tests-22', 'streams:upgrade-system-tests-23', 'streams:upgrade-system-tests-24', + 'streams:upgrade-system-tests-25', 'tools' diff --git a/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index d8e355b2e98..d710c054d85 100644 --- a/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-0102/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -33,7 +33,7 @@ public class StreamsUpgradeTest { @SuppressWarnings("unchecked") public static void main(final String[] args) throws Exception { if (args.length < 1) { - System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); + System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); } final String propFileName = args[0]; diff --git a/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index a1187578a62..b56423334b9 100644 --- a/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-0110/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -33,7 +33,7 @@ public class StreamsUpgradeTest { @SuppressWarnings("unchecked") public static void main(final String[] args) throws Exception { if (args.length < 1) { - System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); + System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); } final String propFileName = args[0]; diff --git a/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index f69162c7d80..8b2649f85e7 100644 --- a/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-10/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -33,7 +33,7 @@ public class StreamsUpgradeTest { @SuppressWarnings("unchecked") public static void main(final String[] args) throws Exception { if (args.length < 1) { - System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); + System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); } final String propFileName = args[0]; diff --git a/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index ca284f22bfb..62e4f747655 100644 --- a/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-11/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -33,7 +33,7 @@ public class StreamsUpgradeTest { @SuppressWarnings("unchecked") public static void main(final String[] args) throws Exception { if (args.length < 1) { - System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); + System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); } final String propFileName = args[0]; diff --git a/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index f66c7a42d71..f1980d86746 100644 --- a/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-20/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -32,7 +32,7 @@ public class StreamsUpgradeTest { @SuppressWarnings("unchecked") public static void main(final String[] args) throws Exception { if (args.length < 1) { - System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); + System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); } final String propFileName = args[0]; diff --git a/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 83b68cc1460..844339db140 100644 --- a/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -32,7 +32,7 @@ public class StreamsUpgradeTest { @SuppressWarnings("unchecked") public static void main(final String[] args) throws Exception { if (args.length < 1) { - System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); + System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); } final String propFileName = args[0]; diff --git a/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 6428ec6e4de..18e2c378542 100644 --- a/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -32,7 +32,7 @@ public class StreamsUpgradeTest { @SuppressWarnings("unchecked") public static void main(final String[] args) throws Exception { if (args.length < 1) { - System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); + System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); } final String propFileName = args[0]; diff --git a/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 83ef0564a5b..346124a24c0 100644 --- a/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -32,7 +32,7 @@ public class StreamsUpgradeTest { @SuppressWarnings("unchecked") public static void main(final String[] args) throws Exception { if (args.length < 1) { - System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); + System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); } final String propFileName = args[0]; diff --git a/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java new file mode 100644 index 00000000000..a24a6315b77 --- /dev/null +++ b/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; + +import java.util.Properties; + +public class StreamsUpgradeTest { + + @SuppressWarnings("unchecked") + public static void main(final String[] args) throws Exception { + if (args.length < 1) { + System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none"); + } + final String propFileName = args[0]; + + final Properties streamsProperties = Utils.loadProps(propFileName); + + System.out.println("StreamsTest instance started (StreamsUpgradeTest v2.5)"); + System.out.println("props=" + streamsProperties); + + final StreamsBuilder builder = new StreamsBuilder(); + final KStream dataStream = builder.stream("data"); + dataStream.process(printProcessorSupplier()); + dataStream.to("echo"); + + final Properties config = new Properties(); + config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + config.putAll(streamsProperties); + + final KafkaStreams streams = new KafkaStreams(builder.build(), config); + streams.start(); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + streams.close(); + System.out.println("UPGRADE-TEST-CLIENT-CLOSED"); + System.out.flush(); + })); + } + + private static ProcessorSupplier printProcessorSupplier() { + return () -> new AbstractProcessor() { + private int numRecordsProcessed = 0; + + @Override + public void init(final ProcessorContext context) { + System.out.println("[2.5] initializing processor: topic=data taskId=" + context.taskId()); + numRecordsProcessed = 0; + } + + @Override + public void process(final K key, final V value) { + numRecordsProcessed++; + if (numRecordsProcessed % 100 == 0) { + System.out.println("processed " + numRecordsProcessed + " records from topic=data"); + } + } + + @Override + public void close() {} + }; + } +} diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile index 714f4bb3359..3405b5a743d 100644 --- a/tests/docker/Dockerfile +++ b/tests/docker/Dockerfile @@ -58,6 +58,7 @@ RUN mkdir -p "/opt/kafka-2.1.1" && chmod a+rw /opt/kafka-2.1.1 && curl -s "$KAFK RUN mkdir -p "/opt/kafka-2.2.2" && chmod a+rw /opt/kafka-2.2.2 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.2.2" RUN mkdir -p "/opt/kafka-2.3.1" && chmod a+rw /opt/kafka-2.3.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.3.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.3.1" RUN mkdir -p "/opt/kafka-2.4.1" && chmod a+rw /opt/kafka-2.4.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.4.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.4.1" +RUN mkdir -p "/opt/kafka-2.5.0" && chmod a+rw /opt/kafka-2.5.0 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.5.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.5.0" # Streams test dependencies RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.0.1-test.jar" -o /opt/kafka-0.10.0.1/libs/kafka-streams-0.10.0.1-test.jar @@ -71,6 +72,7 @@ RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.1.1-test.jar" -o /opt/kafka-2.1.1/lib RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.2.2-test.jar" -o /opt/kafka-2.2.2/libs/kafka-streams-2.2.2-test.jar RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.3.1-test.jar" -o /opt/kafka-2.3.1/libs/kafka-streams-2.3.1-test.jar RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.4.1-test.jar" -o /opt/kafka-2.4.1/libs/kafka-streams-2.4.1-test.jar +RUN curl -s "$KAFKA_MIRROR/kafka-streams-2.5.0-test.jar" -o /opt/kafka-2.5.0/libs/kafka-streams-2.5.0-test.jar # The version of Kibosh to use for testing. # If you update this, also update vagrant/base.sh diff --git a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py index f6c1a9d9958..af92e6fbced 100644 --- a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py +++ b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py @@ -21,7 +21,7 @@ from kafkatest.services.streams import StreamsBrokerCompatibilityService from kafkatest.services.verifiable_consumer import VerifiableConsumer from kafkatest.services.zookeeper import ZookeeperService from kafkatest.version import LATEST_0_11_0, LATEST_0_10_2, LATEST_0_10_1, LATEST_0_10_0, LATEST_1_0, LATEST_1_1, \ - LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, KafkaVersion + LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, KafkaVersion class StreamsBrokerCompatibility(Test): @@ -85,6 +85,7 @@ class StreamsBrokerCompatibility(Test): self.consumer.stop() self.kafka.stop() + @parametrize(broker_version=str(LATEST_2_5)) @parametrize(broker_version=str(LATEST_2_4)) @parametrize(broker_version=str(LATEST_2_3)) @parametrize(broker_version=str(LATEST_2_2)) diff --git a/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py b/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py index dace3e9ac72..ae9f8c2f806 100644 --- a/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py @@ -20,7 +20,7 @@ from kafkatest.services.kafka import KafkaService from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.zookeeper import ZookeeperService from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \ - LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, DEV_BRANCH, DEV_VERSION, KafkaVersion + LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, DEV_BRANCH, DEV_VERSION, KafkaVersion from kafkatest.services.streams import CooperativeRebalanceUpgradeService from kafkatest.tests.streams.utils import verify_stopped, stop_processors, verify_running diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py index f60aabb76fe..662f141ce9b 100644 --- a/tests/kafkatest/tests/streams/streams_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -25,18 +25,18 @@ from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmo from kafkatest.services.zookeeper import ZookeeperService from kafkatest.tests.streams.utils import extract_generation_from_logs from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \ - LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, DEV_BRANCH, DEV_VERSION, KafkaVersion + LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, DEV_BRANCH, DEV_VERSION, KafkaVersion # broker 0.10.0 is not compatible with newer Kafka Streams versions broker_upgrade_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), \ - str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), str(DEV_BRANCH)] + str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), str(LATEST_2_5), str(DEV_BRANCH)] metadata_1_versions = [str(LATEST_0_10_0)] metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)] # once 0.10.1.2 is available backward_compatible_metadata_2_versions # can be replaced with metadata_2_versions backward_compatible_metadata_2_versions = [str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)] -metadata_3_or_higher_versions = [str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), str(DEV_VERSION)] +metadata_3_or_higher_versions = [str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), str(LATEST_2_5), str(DEV_VERSION)] """ After each release one should first check that the released version has been uploaded to diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index bc9251c00e2..e9758d215e4 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -143,3 +143,7 @@ LATEST_2_3 = V_2_3_1 V_2_4_0 = KafkaVersion("2.4.0") V_2_4_1 = KafkaVersion("2.4.1") LATEST_2_4 = V_2_4_1 + +# 2.5.x versions +V_2_5_0 = KafkaVersion("2.5.0") +LATEST_2_5 = V_2_5_0 diff --git a/vagrant/base.sh b/vagrant/base.sh index bfdaf02165a..e68f95a4fcc 100755 --- a/vagrant/base.sh +++ b/vagrant/base.sh @@ -140,6 +140,8 @@ get_kafka 2.3.1 2.12 chmod a+rw /opt/kafka-2.3.1 get_kafka 2.4.1 2.12 chmod a+rw /opt/kafka-2.4.1 +get_kafka 2.5.0 2.12 +chmod a+rw /opt/kafka-2.5.0 # For EC2 nodes, we want to use /mnt, which should have the local disk. On local # VMs, we can just create it if it doesn't exist and use it like we'd use