From b40a7fc037bb1543c3355fad9c71570f770f5177 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Mon, 8 May 2023 14:24:11 -0700 Subject: [PATCH] HOTFIX: fix broken Streams upgrade system test (#13654) Reviewers: Victoria Xia , John Roesler --- .../apache/kafka/streams/tests/StreamsUpgradeTest.java | 3 +-- .../apache/kafka/streams/tests/StreamsUpgradeTest.java | 3 +-- .../apache/kafka/streams/tests/StreamsUpgradeTest.java | 3 +-- .../apache/kafka/streams/tests/StreamsUpgradeTest.java | 3 +-- .../apache/kafka/streams/tests/StreamsUpgradeTest.java | 3 +-- .../apache/kafka/streams/tests/StreamsUpgradeTest.java | 3 +-- .../apache/kafka/streams/tests/StreamsUpgradeTest.java | 3 +-- .../apache/kafka/streams/tests/StreamsUpgradeTest.java | 3 +-- .../apache/kafka/streams/tests/StreamsUpgradeTest.java | 3 +-- tests/kafkatest/tests/streams/streams_upgrade_test.py | 10 +++++++--- tests/kafkatest/version.py | 9 +++------ 11 files changed, 19 insertions(+), 27 deletions(-) diff --git a/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 9d08663d9b3..06a36326903 100644 --- a/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -19,7 +19,6 @@ package org.apache.kafka.streams.tests; import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde; import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde; -import java.util.Random; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; @@ -71,7 +70,7 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty( StreamsConfig.APPLICATION_ID_CONFIG, - "StreamsUpgradeTest-" + new Random().nextLong()); + "StreamsUpgradeTest"); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); diff --git a/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 69c46de37af..efa32fc8be2 100644 --- a/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -19,7 +19,6 @@ package org.apache.kafka.streams.tests; import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde; import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde; -import java.util.Random; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; @@ -71,7 +70,7 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty( StreamsConfig.APPLICATION_ID_CONFIG, - "StreamsUpgradeTest-" + new Random().nextLong()); + "StreamsUpgradeTest"); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); diff --git a/streams/upgrade-system-tests-26/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-26/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 0844552134a..77a7cbbc3c0 100644 --- a/streams/upgrade-system-tests-26/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-26/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -19,7 +19,6 @@ package org.apache.kafka.streams.tests; import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde; import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde; -import java.util.Random; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; @@ -71,7 +70,7 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty( StreamsConfig.APPLICATION_ID_CONFIG, - "StreamsUpgradeTest-" + new Random().nextLong()); + "StreamsUpgradeTest"); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); diff --git a/streams/upgrade-system-tests-27/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-27/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 32d8d9408f5..bf2e979815b 100644 --- a/streams/upgrade-system-tests-27/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-27/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -19,7 +19,6 @@ package org.apache.kafka.streams.tests; import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde; import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde; -import java.util.Random; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; @@ -71,7 +70,7 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty( StreamsConfig.APPLICATION_ID_CONFIG, - "StreamsUpgradeTest-" + new Random().nextLong()); + "StreamsUpgradeTest"); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); diff --git a/streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index db17d73bcba..79235915132 100644 --- a/streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -19,7 +19,6 @@ package org.apache.kafka.streams.tests; import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde; import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde; -import java.util.Random; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; @@ -71,7 +70,7 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty( StreamsConfig.APPLICATION_ID_CONFIG, - "StreamsUpgradeTest-" + new Random().nextLong()); + "StreamsUpgradeTest"); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.putAll(streamsProperties); diff --git a/streams/upgrade-system-tests-30/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-30/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 0751516d76c..4951380d979 100644 --- a/streams/upgrade-system-tests-30/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-30/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -19,7 +19,6 @@ package org.apache.kafka.streams.tests; import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde; import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde; -import java.util.Random; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; @@ -73,7 +72,7 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty( StreamsConfig.APPLICATION_ID_CONFIG, - "StreamsUpgradeTest-" + new Random().nextLong()); + "StreamsUpgradeTest"); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.putAll(streamsProperties); diff --git a/streams/upgrade-system-tests-31/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-31/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 311d30ba400..a622a33b8c9 100644 --- a/streams/upgrade-system-tests-31/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-31/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -19,7 +19,6 @@ package org.apache.kafka.streams.tests; import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde; import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde; -import java.util.Random; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; @@ -73,7 +72,7 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty( StreamsConfig.APPLICATION_ID_CONFIG, - "StreamsUpgradeTest-" + new Random().nextLong()); + "StreamsUpgradeTest"); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.putAll(streamsProperties); diff --git a/streams/upgrade-system-tests-32/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-32/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 7419896a0bf..c5a27974704 100644 --- a/streams/upgrade-system-tests-32/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-32/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -30,7 +30,6 @@ import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.api.Record; import java.util.Properties; -import java.util.Random; import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde; import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde; @@ -73,7 +72,7 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty( StreamsConfig.APPLICATION_ID_CONFIG, - "StreamsUpgradeTest-" + new Random().nextLong()); + "StreamsUpgradeTest"); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.putAll(streamsProperties); diff --git a/streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 60b8305bc35..4beab16fb38 100644 --- a/streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -30,7 +30,6 @@ import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.api.Record; import java.util.Properties; -import java.util.Random; import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde; import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde; @@ -73,7 +72,7 @@ public class StreamsUpgradeTest { final Properties config = new Properties(); config.setProperty( StreamsConfig.APPLICATION_ID_CONFIG, - "StreamsUpgradeTest-" + new Random().nextLong()); + "StreamsUpgradeTest"); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.putAll(streamsProperties); diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py index cf276364ea4..9039d9d7473 100644 --- a/tests/kafkatest/tests/streams/streams_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -38,9 +38,13 @@ broker_upgrade_versions = [str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), str(DEV_BRANCH)] metadata_1_versions = [str(LATEST_0_10_0)] -metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)] -fk_join_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8), - str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3)] +metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), + str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8), + str(LATEST_3_0)] +# upgrading from version (2.4...3.0) is broken and only fixed later in 3.1 +# we cannot test two bounce rolling upgrade because we know it's broken +# instead we add version 2.4...3.0 to the `metadata_2_versions` upgrade list +fk_join_versions = [str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3)] """ After each release one should first check that the released version has been uploaded to diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index c31adc256ba..967674b551c 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -107,7 +107,8 @@ class KafkaVersion(LooseVersion): return self >= V_2_8_0 def supports_fk_joins(self): - return hasattr(self, "version") and self >= V_2_4_0 + # while we support FK joins since 2.4, rolling upgrade is broken in older versions and only fixed in 3.1 + return hasattr(self, "version") and self >= V_3_1_2 def get_version(node=None): """Return the version attached to the given node. @@ -119,7 +120,7 @@ def get_version(node=None): return DEV_BRANCH DEV_BRANCH = KafkaVersion("dev") -DEV_VERSION = KafkaVersion("3.5.0-SNAPSHOT") +DEV_VERSION = KafkaVersion("3.6.0-SNAPSHOT") LATEST_METADATA_VERSION = "3.3" @@ -249,7 +250,3 @@ LATEST_3_4 = V_3_4_0 # 3.5.x versions V_3_5_0 = KafkaVersion("3.5.0") LATEST_3_5 = V_3_5_0 - -# 3.6.x versions -V_3_6_0 = KafkaVersion("3.6.0") -LATEST_3_6 = V_3_6_0