HOTFIX: fix broken Streams upgrade system test (#13654)

Reviewers: Victoria Xia <victoria.xia@confluent.io>, John Roesler <john@confluent.io>
This commit is contained in:
Matthias J. Sax 2023-05-08 14:24:11 -07:00 committed by GitHub
parent 7634eee262
commit b40a7fc037
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 19 additions and 27 deletions

View File

@ -19,7 +19,6 @@ package org.apache.kafka.streams.tests;
import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
import java.util.Random;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
@ -71,7 +70,7 @@ public class StreamsUpgradeTest {
final Properties config = new Properties();
config.setProperty(
StreamsConfig.APPLICATION_ID_CONFIG,
"StreamsUpgradeTest-" + new Random().nextLong());
"StreamsUpgradeTest");
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties);

View File

@ -19,7 +19,6 @@ package org.apache.kafka.streams.tests;
import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
import java.util.Random;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
@ -71,7 +70,7 @@ public class StreamsUpgradeTest {
final Properties config = new Properties();
config.setProperty(
StreamsConfig.APPLICATION_ID_CONFIG,
"StreamsUpgradeTest-" + new Random().nextLong());
"StreamsUpgradeTest");
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties);

View File

@ -19,7 +19,6 @@ package org.apache.kafka.streams.tests;
import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
import java.util.Random;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
@ -71,7 +70,7 @@ public class StreamsUpgradeTest {
final Properties config = new Properties();
config.setProperty(
StreamsConfig.APPLICATION_ID_CONFIG,
"StreamsUpgradeTest-" + new Random().nextLong());
"StreamsUpgradeTest");
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties);

View File

@ -19,7 +19,6 @@ package org.apache.kafka.streams.tests;
import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
import java.util.Random;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
@ -71,7 +70,7 @@ public class StreamsUpgradeTest {
final Properties config = new Properties();
config.setProperty(
StreamsConfig.APPLICATION_ID_CONFIG,
"StreamsUpgradeTest-" + new Random().nextLong());
"StreamsUpgradeTest");
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties);

View File

@ -19,7 +19,6 @@ package org.apache.kafka.streams.tests;
import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
import java.util.Random;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
@ -71,7 +70,7 @@ public class StreamsUpgradeTest {
final Properties config = new Properties();
config.setProperty(
StreamsConfig.APPLICATION_ID_CONFIG,
"StreamsUpgradeTest-" + new Random().nextLong());
"StreamsUpgradeTest");
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);

View File

@ -19,7 +19,6 @@ package org.apache.kafka.streams.tests;
import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
import java.util.Random;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
@ -73,7 +72,7 @@ public class StreamsUpgradeTest {
final Properties config = new Properties();
config.setProperty(
StreamsConfig.APPLICATION_ID_CONFIG,
"StreamsUpgradeTest-" + new Random().nextLong());
"StreamsUpgradeTest");
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);

View File

@ -19,7 +19,6 @@ package org.apache.kafka.streams.tests;
import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
import java.util.Random;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
@ -73,7 +72,7 @@ public class StreamsUpgradeTest {
final Properties config = new Properties();
config.setProperty(
StreamsConfig.APPLICATION_ID_CONFIG,
"StreamsUpgradeTest-" + new Random().nextLong());
"StreamsUpgradeTest");
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);

View File

@ -30,7 +30,6 @@ import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import java.util.Properties;
import java.util.Random;
import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
@ -73,7 +72,7 @@ public class StreamsUpgradeTest {
final Properties config = new Properties();
config.setProperty(
StreamsConfig.APPLICATION_ID_CONFIG,
"StreamsUpgradeTest-" + new Random().nextLong());
"StreamsUpgradeTest");
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);

View File

@ -30,7 +30,6 @@ import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import java.util.Properties;
import java.util.Random;
import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
@ -73,7 +72,7 @@ public class StreamsUpgradeTest {
final Properties config = new Properties();
config.setProperty(
StreamsConfig.APPLICATION_ID_CONFIG,
"StreamsUpgradeTest-" + new Random().nextLong());
"StreamsUpgradeTest");
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties);

View File

@ -38,9 +38,13 @@ broker_upgrade_versions = [str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1),
str(DEV_BRANCH)]
metadata_1_versions = [str(LATEST_0_10_0)]
metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
fk_join_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8),
str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3)]
metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1),
str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8),
str(LATEST_3_0)]
# upgrading from version (2.4...3.0) is broken and only fixed later in 3.1
# we cannot test two bounce rolling upgrade because we know it's broken
# instead we add version 2.4...3.0 to the `metadata_2_versions` upgrade list
fk_join_versions = [str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3)]
"""
After each release one should first check that the released version has been uploaded to

View File

@ -107,7 +107,8 @@ class KafkaVersion(LooseVersion):
return self >= V_2_8_0
def supports_fk_joins(self):
return hasattr(self, "version") and self >= V_2_4_0
# while we support FK joins since 2.4, rolling upgrade is broken in older versions and only fixed in 3.1
return hasattr(self, "version") and self >= V_3_1_2
def get_version(node=None):
"""Return the version attached to the given node.
@ -119,7 +120,7 @@ def get_version(node=None):
return DEV_BRANCH
DEV_BRANCH = KafkaVersion("dev")
DEV_VERSION = KafkaVersion("3.5.0-SNAPSHOT")
DEV_VERSION = KafkaVersion("3.6.0-SNAPSHOT")
LATEST_METADATA_VERSION = "3.3"
@ -249,7 +250,3 @@ LATEST_3_4 = V_3_4_0
# 3.5.x versions
V_3_5_0 = KafkaVersion("3.5.0")
LATEST_3_5 = V_3_5_0
# 3.6.x versions
V_3_6_0 = KafkaVersion("3.6.0")
LATEST_3_6 = V_3_6_0