From 6fd973b4a5987c6f829b1285d4f38b029798f186 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Tue, 10 Sep 2024 17:55:03 -0700 Subject: [PATCH] KAFKA-16331: Remove EOSv1 from Kafka Streams system tests (#17108) Reviewers: Chia-Ping Tsai , Bill Bejeck --- .../processor/internals/StreamThread.java | 4 +- .../tests/BrokerCompatibilityTest.java | 6 +-- .../kafka/streams/tests/StreamsEosTest.java | 8 +--- .../kafka/streams/tests/StreamsSmokeTest.java | 5 --- tests/kafkatest/services/streams.py | 19 ++++---- .../streams_broker_compatibility_test.py | 27 +----------- .../tests/streams/streams_eos_test.py | 44 +++++++++---------- .../tests/streams/streams_smoke_test.py | 2 +- 8 files changed, 39 insertions(+), 76 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index e4247070679..7ee4bb54edc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -742,9 +742,9 @@ public class StreamThread extends Thread implements ProcessingThread { errorMessage.startsWith("Broker unexpectedly doesn't support requireStable flag on version ")) { log.error("Shutting down because the Kafka cluster seems to be on a too old version. " + - "Setting {}=\"{}\"/\"{}\" requires broker version 2.5 or higher.", + "Setting {}=\"{}\" requires broker version 2.5 or higher.", StreamsConfig.PROCESSING_GUARANTEE_CONFIG, - StreamsConfig.EXACTLY_ONCE_V2, StreamsConfig.EXACTLY_ONCE_BETA); + StreamsConfig.EXACTLY_ONCE_V2); } failedStreamThreadSensor.record(); this.streamsUncaughtExceptionHandler.accept(new StreamsException(e), false); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java index 7f8d144be45..131d62cb5a4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java @@ -68,8 +68,8 @@ public class BrokerCompatibilityTest { streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-system-test-broker-compatibility"); streamsProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + streamsProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + streamsProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); streamsProperties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); streamsProperties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingMode); @@ -103,7 +103,7 @@ public class BrokerCompatibilityTest { System.out.println("start Kafka Streams"); streams.start(); - final boolean eosEnabled = processingMode.startsWith("exactly_once"); + final boolean eosEnabled = processingMode.equals("exactly_once_v2"); System.out.println("send data"); final Properties producerProperties = new Properties(); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java index 5ad0641b60f..62223d789f0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java @@ -29,7 +29,6 @@ public class StreamsEosTest { * args ::= kafka propFileName command * command := "run" | "process" | "verify" */ - @SuppressWarnings("deprecation") public static void main(final String[] args) throws IOException { if (args.length < 2) { System.err.println("StreamsEosTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter"); @@ -49,12 +48,9 @@ public class StreamsEosTest { } if ("process".equals(command) || "process-complex".equals(command)) { - if (!StreamsConfig.EXACTLY_ONCE.equals(processingGuarantee) && - !StreamsConfig.EXACTLY_ONCE_BETA.equals(processingGuarantee) && - !StreamsConfig.EXACTLY_ONCE_V2.equals(processingGuarantee)) { + if (!StreamsConfig.EXACTLY_ONCE_V2.equals(processingGuarantee)) { - System.err.println("processingGuarantee must be either " + StreamsConfig.EXACTLY_ONCE + " or " + - StreamsConfig.EXACTLY_ONCE_BETA + " or " + StreamsConfig.EXACTLY_ONCE_V2); + System.err.println("processingGuarantee must be " + StreamsConfig.EXACTLY_ONCE_V2); Exit.exit(1); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java index d87da749ee9..31f68e609c0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java @@ -38,7 +38,6 @@ public class StreamsSmokeTest { * * @param args */ - @SuppressWarnings("deprecation") public static void main(final String[] args) throws IOException { if (args.length < 2) { System.err.println("StreamsSmokeTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter"); @@ -60,14 +59,10 @@ public class StreamsSmokeTest { if ("process".equals(command)) { if (!StreamsConfig.AT_LEAST_ONCE.equals(processingGuarantee) && - !StreamsConfig.EXACTLY_ONCE.equals(processingGuarantee) && - !StreamsConfig.EXACTLY_ONCE_BETA.equals(processingGuarantee) && !StreamsConfig.EXACTLY_ONCE_V2.equals(processingGuarantee)) { System.err.println("processingGuarantee must be either " + StreamsConfig.AT_LEAST_ONCE + ", " + - StreamsConfig.EXACTLY_ONCE + ", or " + - StreamsConfig.EXACTLY_ONCE_BETA + ", or " + StreamsConfig.EXACTLY_ONCE_V2); Exit.exit(1); diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index 78fc98534ad..696e9f58f7f 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -384,17 +384,16 @@ class StreamsEosTestBaseService(StreamsTestBaseService): clean_node_enabled = True - def __init__(self, test_context, kafka, processing_guarantee, command): + def __init__(self, test_context, kafka, command): super(StreamsEosTestBaseService, self).__init__(test_context, kafka, "org.apache.kafka.streams.tests.StreamsEosTest", command) - self.PROCESSING_GUARANTEE = processing_guarantee def prop_file(self): properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT, streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(), - streams_property.PROCESSING_GUARANTEE: self.PROCESSING_GUARANTEE, + streams_property.PROCESSING_GUARANTEE: "exactly_once_v2", "acceptable.recovery.lag": "9223372036854775807", # enable a one-shot assignment "session.timeout.ms": "10000" # set back to 10s for tests. See KIP-735 } @@ -440,24 +439,24 @@ class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService): class StreamsEosTestDriverService(StreamsEosTestBaseService): def __init__(self, test_context, kafka): - super(StreamsEosTestDriverService, self).__init__(test_context, kafka, "not-required", "run") + super(StreamsEosTestDriverService, self).__init__(test_context, kafka, "run") class StreamsEosTestJobRunnerService(StreamsEosTestBaseService): - def __init__(self, test_context, kafka, processing_guarantee): - super(StreamsEosTestJobRunnerService, self).__init__(test_context, kafka, processing_guarantee, "process") + def __init__(self, test_context, kafka): + super(StreamsEosTestJobRunnerService, self).__init__(test_context, kafka, "process") class StreamsComplexEosTestJobRunnerService(StreamsEosTestBaseService): - def __init__(self, test_context, kafka, processing_guarantee): - super(StreamsComplexEosTestJobRunnerService, self).__init__(test_context, kafka, processing_guarantee, "process-complex") + def __init__(self, test_context, kafka): + super(StreamsComplexEosTestJobRunnerService, self).__init__(test_context, kafka, "process-complex") class StreamsEosTestVerifyRunnerService(StreamsEosTestBaseService): def __init__(self, test_context, kafka): - super(StreamsEosTestVerifyRunnerService, self).__init__(test_context, kafka, "not-required", "verify") + super(StreamsEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify") class StreamsComplexEosTestVerifyRunnerService(StreamsEosTestBaseService): def __init__(self, test_context, kafka): - super(StreamsComplexEosTestVerifyRunnerService, self).__init__(test_context, kafka, "not-required", "verify-complex") + super(StreamsComplexEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify-complex") class StreamsSmokeTestShutdownDeadlockService(StreamsSmokeTestBaseService): diff --git a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py index e70b9a667ec..e07f67b46d9 100644 --- a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py +++ b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py @@ -87,29 +87,6 @@ class StreamsBrokerCompatibility(Test): self.consumer.stop() self.kafka.stop() - @cluster(num_nodes=4) - @matrix(broker_version=[str(LATEST_0_11_0),str(LATEST_1_0),str(LATEST_1_1),str(LATEST_2_0), - str(LATEST_2_1),str(LATEST_2_2),str(LATEST_2_3),str(LATEST_2_4), - str(LATEST_2_5),str(LATEST_2_6),str(LATEST_2_7),str(LATEST_2_8), - str(LATEST_3_0),str(LATEST_3_1),str(LATEST_3_2),str(LATEST_3_3), - str(LATEST_3_4),str(LATEST_3_5),str(LATEST_3_6),str(LATEST_3_7), - str(LATEST_3_8)]) - def test_compatible_brokers_eos_alpha_enabled(self, broker_version): - self.kafka.set_version(KafkaVersion(broker_version)) - self.kafka.start() - - processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, "exactly_once") - processor.start() - - self.consumer.start() - - processor.wait() - - wait_until(lambda: self.consumer.total_consumed() > 0, timeout_sec=30, err_msg="Did expect to read a message but got none within 30 seconds.") - - self.consumer.stop() - self.kafka.stop() - @cluster(num_nodes=4) @matrix(broker_version=[str(LATEST_2_5),str(LATEST_2_6),str(LATEST_2_7),str(LATEST_2_8), str(LATEST_3_0),str(LATEST_3_1),str(LATEST_3_2),str(LATEST_3_3), @@ -167,9 +144,9 @@ class StreamsBrokerCompatibility(Test): with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor: with processor.node.account.monitor_log(processor.LOG_FILE) as log: processor.start() - log.wait_until('Shutting down because the Kafka cluster seems to be on a too old version. Setting processing\.guarantee="exactly_once_v2"/"exactly_once_beta" requires broker version 2\.5 or higher\.', + log.wait_until('Shutting down because the Kafka cluster seems to be on a too old version. Setting processing\.guarantee="exactly_once_v2" requires broker version 2\.5 or higher\.', timeout_sec=60, - err_msg="Never saw 'Shutting down because the Kafka cluster seems to be on a too old version. Setting `processing.guarantee=\"exactly_once_v2\"/\"exactly_once_beta\"` requires broker version 2.5 or higher.' log message " + str(processor.node.account)) + err_msg="Never saw 'Shutting down because the Kafka cluster seems to be on a too old version. Setting `processing.guarantee=\"exactly_once_v2\"` requires broker version 2.5 or higher.' log message " + str(processor.node.account)) monitor.wait_until('FATAL: An unexpected exception org.apache.kafka.common.errors.UnsupportedVersionException', timeout_sec=60, err_msg="Never saw 'FATAL: An unexpected exception org.apache.kafka.common.errors.UnsupportedVersionException' error message " + str(processor.node.account)) diff --git a/tests/kafkatest/tests/streams/streams_eos_test.py b/tests/kafkatest/tests/streams/streams_eos_test.py index c8272136958..b144358cf2c 100644 --- a/tests/kafkatest/tests/streams/streams_eos_test.py +++ b/tests/kafkatest/tests/streams/streams_eos_test.py @@ -39,21 +39,19 @@ class StreamsEosTest(KafkaTest): self.test_context = test_context @cluster(num_nodes=9) - @matrix(processing_guarantee=["exactly_once", "exactly_once_v2"], - metadata_quorum=[quorum.isolated_kraft]) - def test_rebalance_simple(self, processing_guarantee, metadata_quorum): - self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), - StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), - StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), + @matrix(metadata_quorum=[quorum.isolated_kraft]) + def test_rebalance_simple(self, metadata_quorum): + self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka), + StreamsEosTestJobRunnerService(self.test_context, self.kafka), + StreamsEosTestJobRunnerService(self.test_context, self.kafka), StreamsEosTestVerifyRunnerService(self.test_context, self.kafka)) @cluster(num_nodes=9) - @matrix(processing_guarantee=["exactly_once", "exactly_once_v2"], - metadata_quorum=[quorum.isolated_kraft]) - def test_rebalance_complex(self, processing_guarantee, metadata_quorum): - self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), - StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), - StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), + @matrix(metadata_quorum=[quorum.isolated_kraft]) + def test_rebalance_complex(self, metadata_quorum): + self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka), + StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka), + StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka), StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka)) def run_rebalance(self, processor1, processor2, processor3, verifier): @@ -83,21 +81,19 @@ class StreamsEosTest(KafkaTest): verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False) @cluster(num_nodes=9) - @matrix(processing_guarantee=["exactly_once", "exactly_once_v2"], - metadata_quorum=[quorum.isolated_kraft]) - def test_failure_and_recovery(self, processing_guarantee, metadata_quorum): - self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), - StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), - StreamsEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), + @matrix(metadata_quorum=[quorum.isolated_kraft]) + def test_failure_and_recovery(self, metadata_quorum): + self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka), + StreamsEosTestJobRunnerService(self.test_context, self.kafka), + StreamsEosTestJobRunnerService(self.test_context, self.kafka), StreamsEosTestVerifyRunnerService(self.test_context, self.kafka)) @cluster(num_nodes=9) - @matrix(processing_guarantee=["exactly_once", "exactly_once_v2"], - metadata_quorum=[quorum.isolated_kraft]) - def test_failure_and_recovery_complex(self, processing_guarantee, metadata_quorum): - self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), - StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), - StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, processing_guarantee), + @matrix(metadata_quorum=[quorum.isolated_kraft]) + def test_failure_and_recovery_complex(self, metadata_quorum): + self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka), + StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka), + StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka), StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka)) def run_failure_and_recovery(self, processor1, processor2, processor3, verifier): diff --git a/tests/kafkatest/tests/streams/streams_smoke_test.py b/tests/kafkatest/tests/streams/streams_smoke_test.py index 8a119f37d99..88846de8949 100644 --- a/tests/kafkatest/tests/streams/streams_smoke_test.py +++ b/tests/kafkatest/tests/streams/streams_smoke_test.py @@ -47,7 +47,7 @@ class StreamsSmokeTest(KafkaTest): self.driver = StreamsSmokeTestDriverService(test_context, self.kafka) @cluster(num_nodes=8) - @matrix(processing_guarantee=['exactly_once', 'exactly_once_v2', 'at_least_once'], + @matrix(processing_guarantee=['exactly_once_v2', 'at_least_once'], crash=[True, False], metadata_quorum=quorum.all_non_upgrade) def test_streams(self, processing_guarantee, crash, metadata_quorum=quorum.zk):