From 58dd76817e3d54f1baabd51929d900eb3e462d68 Mon Sep 17 00:00:00 2001 From: Bill Bejeck Date: Wed, 30 Oct 2024 12:31:47 -0400 Subject: [PATCH] KAFKA-17609:[2/4]Convert system tests to kraft part 2 (#17321) * Part 2 of 4 converting system tests to use KRaft Reviewers: Matthias Sax --- .../streams_cooperative_rebalance_upgrade_test.py | 15 ++++++--------- tests/kafkatest/tests/streams/streams_eos_test.py | 8 ++++---- .../streams_named_repartition_topic_test.py | 14 ++------------ 3 files changed, 12 insertions(+), 25 deletions(-) diff --git a/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py b/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py index b4125837d5c..992ad587923 100644 --- a/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py @@ -17,9 +17,8 @@ import time from ducktape.mark import matrix from ducktape.mark.resource import cluster from ducktape.tests.test import Test -from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.verifiable_producer import VerifiableProducer -from kafkatest.services.zookeeper import ZookeeperService from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \ LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3 from kafkatest.services.streams import CooperativeRebalanceUpgradeService @@ -45,7 +44,7 @@ class StreamsCooperativeRebalanceUpgradeTest(Test): second_bounce_phase = "second_bounce_phase-" # !!CAUTION!!: THIS LIST OF VERSIONS IS FIXED, NO VERSIONS MUST BE ADDED - streams_eager_rebalance_upgrade_versions = [str(LATEST_0_10_0), str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), + streams_eager_rebalance_upgrade_versions = [str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3)] @@ -56,9 +55,9 @@ class StreamsCooperativeRebalanceUpgradeTest(Test): self.sink_topic: {'partitions': 9} } - self.zookeeper = ZookeeperService(self.test_context, num_nodes=1) self.kafka = KafkaService(self.test_context, num_nodes=3, - zk=self.zookeeper, topics=self.topics) + zk=None, topics=self.topics, + controller_num_nodes_override=1) self.producer = VerifiableProducer(self.test_context, 1, @@ -68,9 +67,8 @@ class StreamsCooperativeRebalanceUpgradeTest(Test): acks=1) @cluster(num_nodes=8) - @matrix(upgrade_from_version=streams_eager_rebalance_upgrade_versions) - def test_upgrade_to_cooperative_rebalance(self, upgrade_from_version): - self.zookeeper.start() + @matrix(upgrade_from_version=streams_eager_rebalance_upgrade_versions, metadata_quorum=[quorum.combined_kraft]) + def test_upgrade_to_cooperative_rebalance(self, upgrade_from_version, metadata_quorum): self.kafka.start() processor1 = CooperativeRebalanceUpgradeService(self.test_context, self.kafka) @@ -138,7 +136,6 @@ class StreamsCooperativeRebalanceUpgradeTest(Test): self.producer.stop() self.kafka.stop() - self.zookeeper.stop() def maybe_upgrade_rolling_bounce_and_verify(self, processors, diff --git a/tests/kafkatest/tests/streams/streams_eos_test.py b/tests/kafkatest/tests/streams/streams_eos_test.py index b144358cf2c..73e6be737d0 100644 --- a/tests/kafkatest/tests/streams/streams_eos_test.py +++ b/tests/kafkatest/tests/streams/streams_eos_test.py @@ -39,7 +39,7 @@ class StreamsEosTest(KafkaTest): self.test_context = test_context @cluster(num_nodes=9) - @matrix(metadata_quorum=[quorum.isolated_kraft]) + @matrix(metadata_quorum=[quorum.combined_kraft]) def test_rebalance_simple(self, metadata_quorum): self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka), StreamsEosTestJobRunnerService(self.test_context, self.kafka), @@ -47,7 +47,7 @@ class StreamsEosTest(KafkaTest): StreamsEosTestVerifyRunnerService(self.test_context, self.kafka)) @cluster(num_nodes=9) - @matrix(metadata_quorum=[quorum.isolated_kraft]) + @matrix(metadata_quorum=[quorum.combined_kraft]) def test_rebalance_complex(self, metadata_quorum): self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka), StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka), @@ -81,7 +81,7 @@ class StreamsEosTest(KafkaTest): verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False) @cluster(num_nodes=9) - @matrix(metadata_quorum=[quorum.isolated_kraft]) + @matrix(metadata_quorum=[quorum.combined_kraft]) def test_failure_and_recovery(self, metadata_quorum): self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka), StreamsEosTestJobRunnerService(self.test_context, self.kafka), @@ -89,7 +89,7 @@ class StreamsEosTest(KafkaTest): StreamsEosTestVerifyRunnerService(self.test_context, self.kafka)) @cluster(num_nodes=9) - @matrix(metadata_quorum=[quorum.isolated_kraft]) + @matrix(metadata_quorum=[quorum.combined_kraft]) def test_failure_and_recovery_complex(self, metadata_quorum): self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka), StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka), diff --git a/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py b/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py index ff32cab24f3..26beaca3aea 100644 --- a/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py +++ b/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py @@ -19,7 +19,6 @@ from ducktape.tests.test import Test from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.streams import StreamsNamedRepartitionTopicService from kafkatest.services.verifiable_producer import VerifiableProducer -from kafkatest.services.zookeeper import ZookeeperService from kafkatest.tests.streams.utils import verify_stopped, stop_processors, verify_running class StreamsNamedRepartitionTopicTest(Test): @@ -41,13 +40,8 @@ class StreamsNamedRepartitionTopicTest(Test): self.aggregation_topic: {'partitions': 6} } - self.zookeeper = ( - ZookeeperService(self.test_context, 1) - if quorum.for_test(self.test_context) == quorum.zk - else None - ) self.kafka = KafkaService(self.test_context, num_nodes=3, - zk=self.zookeeper, topics=self.topics, controller_num_nodes_override=1) + zk=None, topics=self.topics, controller_num_nodes_override=1) self.producer = VerifiableProducer(self.test_context, 1, @@ -57,10 +51,8 @@ class StreamsNamedRepartitionTopicTest(Test): acks=1) @cluster(num_nodes=8) - @matrix(metadata_quorum=[quorum.isolated_kraft]) + @matrix(metadata_quorum=[quorum.combined_kraft]) def test_upgrade_topology_with_named_repartition_topic(self, metadata_quorum): - if self.zookeeper: - self.zookeeper.start() self.kafka.start() processor1 = StreamsNamedRepartitionTopicService(self.test_context, self.kafka) @@ -91,8 +83,6 @@ class StreamsNamedRepartitionTopicTest(Test): self.producer.stop() self.kafka.stop() - if self.zookeeper: - self.zookeeper.stop() def verify_processing(self, processors): for processor in processors: