diff --git a/tests/kafkatest/tests/streams/streams_smoke_test.py b/tests/kafkatest/tests/streams/streams_smoke_test.py index 88846de8949..51f5420f62e 100644 --- a/tests/kafkatest/tests/streams/streams_smoke_test.py +++ b/tests/kafkatest/tests/streams/streams_smoke_test.py @@ -49,8 +49,8 @@ class StreamsSmokeTest(KafkaTest): @cluster(num_nodes=8) @matrix(processing_guarantee=['exactly_once_v2', 'at_least_once'], crash=[True, False], - metadata_quorum=quorum.all_non_upgrade) - def test_streams(self, processing_guarantee, crash, metadata_quorum=quorum.zk): + metadata_quorum=[quorum.combined_kraft]) + def test_streams(self, processing_guarantee, crash, metadata_quorum): processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee) processor2 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee) processor3 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee) diff --git a/tests/kafkatest/tests/streams/streams_static_membership_test.py b/tests/kafkatest/tests/streams/streams_static_membership_test.py index 2249f6cc345..3a2f41d3c34 100644 --- a/tests/kafkatest/tests/streams/streams_static_membership_test.py +++ b/tests/kafkatest/tests/streams/streams_static_membership_test.py @@ -19,7 +19,6 @@ from ducktape.tests.test import Test from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.streams import StaticMemberTestService from kafkatest.services.verifiable_producer import VerifiableProducer -from kafkatest.services.zookeeper import ZookeeperService from kafkatest.tests.streams.utils import verify_stopped, stop_processors, verify_running, extract_generation_from_logs, extract_generation_id class StreamsStaticMembershipTest(Test): @@ -39,13 +38,8 @@ class StreamsStaticMembershipTest(Test): self.input_topic: {'partitions': 18}, } - self.zookeeper = ( - ZookeeperService(self.test_context, 1) - if quorum.for_test(self.test_context) == quorum.zk - else None - ) self.kafka = KafkaService(self.test_context, num_nodes=3, - zk=self.zookeeper, topics=self.topics, controller_num_nodes_override=1) + zk=None, topics=self.topics, controller_num_nodes_override=1) self.producer = VerifiableProducer(self.test_context, 1, @@ -57,8 +51,6 @@ class StreamsStaticMembershipTest(Test): @cluster(num_nodes=8) @matrix(metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[True, False]) def test_rolling_bounces_will_not_trigger_rebalance_under_static_membership(self, metadata_quorum, use_new_coordinator=False): - if self.zookeeper: - self.zookeeper.start() self.kafka.start() numThreads = 3 @@ -104,8 +96,6 @@ class StreamsStaticMembershipTest(Test): self.producer.stop() self.kafka.stop(timeout_sec=120) - if self.zookeeper: - self.zookeeper.stop() def verify_processing(self, processors): for processor in processors: diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py index 7b0ff2f3413..2b37e0c2a4f 100644 --- a/tests/kafkatest/tests/streams/streams_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -18,28 +18,22 @@ import time from ducktape.mark import matrix, ignore from ducktape.mark.resource import cluster from ducktape.tests.test import Test -from ducktape.utils.util import wait_until -from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService, \ StreamsUpgradeTestJobRunnerService -from kafkatest.services.zookeeper import ZookeeperService from kafkatest.tests.streams.utils import extract_generation_from_logs, extract_generation_id -from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \ +from kafkatest.version import LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \ LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \ LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, DEV_BRANCH, DEV_VERSION, \ KafkaVersion # broker 0.10.0 is not compatible with newer Kafka Streams versions # broker 0.10.1 and 0.10.2 do not support headers, as required by suppress() (since v2.2.1) -broker_upgrade_versions = [str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), - str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3), - str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), - str(LATEST_2_8), str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), +broker_upgrade_versions = [str(LATEST_2_8), str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(LATEST_3_6), str(LATEST_3_7), str(LATEST_3_8), str(DEV_BRANCH)] -metadata_1_versions = [str(LATEST_0_10_0)] -metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), +metadata_2_versions = [str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8), str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3)] # upgrading from version (2.4...3.3) is broken and only fixed later in 3.3.3 (unreleased) and 3.4.0 @@ -111,102 +105,10 @@ class StreamsUpgradeTest(Test): node.version = KafkaVersion(to_version) self.kafka.start_node(node) - @ignore @cluster(num_nodes=6) - @matrix(from_version=broker_upgrade_versions, to_version=broker_upgrade_versions) - def test_upgrade_downgrade_brokers(self, from_version, to_version): - """ - Start a smoke test client then perform rolling upgrades on the broker. - """ - - if from_version == to_version: - return - - self.replication = 3 - self.num_kafka_nodes = 3 - self.partitions = 1 - self.isr = 2 - self.topics = { - 'echo' : { 'partitions': self.partitions, 'replication-factor': self.replication, - 'configs': {"min.insync.replicas": self.isr}}, - 'data' : { 'partitions': self.partitions, 'replication-factor': self.replication, - 'configs': {"min.insync.replicas": self.isr} }, - 'min' : { 'partitions': self.partitions, 'replication-factor': self.replication, - 'configs': {"min.insync.replicas": self.isr} }, - 'max' : { 'partitions': self.partitions, 'replication-factor': self.replication, - 'configs': {"min.insync.replicas": self.isr} }, - 'sum' : { 'partitions': self.partitions, 'replication-factor': self.replication, - 'configs': {"min.insync.replicas": self.isr} }, - 'dif' : { 'partitions': self.partitions, 'replication-factor': self.replication, - 'configs': {"min.insync.replicas": self.isr} }, - 'cnt' : { 'partitions': self.partitions, 'replication-factor': self.replication, - 'configs': {"min.insync.replicas": self.isr} }, - 'avg' : { 'partitions': self.partitions, 'replication-factor': self.replication, - 'configs': {"min.insync.replicas": self.isr} }, - 'wcnt' : { 'partitions': self.partitions, 'replication-factor': self.replication, - 'configs': {"min.insync.replicas": self.isr} }, - 'tagg' : { 'partitions': self.partitions, 'replication-factor': self.replication, - 'configs': {"min.insync.replicas": self.isr} } - } - - # Setup phase - self.zk = ZookeeperService(self.test_context, num_nodes=1) - self.zk.start() - - # number of nodes needs to be >= 3 for the smoke test - self.kafka = KafkaService(self.test_context, num_nodes=self.num_kafka_nodes, - zk=self.zk, version=KafkaVersion(from_version), topics=self.topics) - self.kafka.start() - - # allow some time for topics to be created - wait_until(lambda: self.confirm_topics_on_all_brokers(set(self.topics.keys())), - timeout_sec=60, - err_msg="Broker did not create all topics in 60 seconds ") - - self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka) - - processor = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, "at_least_once") - - with self.driver.node.account.monitor_log(self.driver.STDOUT_FILE) as driver_monitor: - self.driver.start() - - with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor: - processor.start() - monitor.wait_until(self.processed_data_msg, - timeout_sec=60, - err_msg="Never saw output '%s' on " % self.processed_data_msg + str(processor.node)) - - connected_message = "Discovered group coordinator" - with processor.node.account.monitor_log(processor.LOG_FILE) as log_monitor: - with processor.node.account.monitor_log(processor.STDOUT_FILE) as stdout_monitor: - self.perform_broker_upgrade(to_version) - - log_monitor.wait_until(connected_message, - timeout_sec=120, - err_msg=("Never saw output '%s' on " % connected_message) + str(processor.node.account)) - - stdout_monitor.wait_until(self.processed_data_msg, - timeout_sec=60, - err_msg="Never saw output '%s' on" % self.processed_data_msg + str(processor.node.account)) - - # SmokeTestDriver allows up to 6 minutes to consume all - # records for the verification step so this timeout is set to - # 6 minutes (360 seconds) for consuming of verification records - # and a very conservative additional 2 minutes (120 seconds) to process - # the records in the verification step - driver_monitor.wait_until('ALL-RECORDS-DELIVERED\|PROCESSED-MORE-THAN-GENERATED', - timeout_sec=480, - err_msg="Never saw output '%s' on" % 'ALL-RECORDS-DELIVERED|PROCESSED-MORE-THAN-GENERATED' + str(self.driver.node.account)) - - self.driver.stop() - processor.stop() - processor.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False) - - @cluster(num_nodes=6) - @matrix(from_version=metadata_1_versions) - @matrix(from_version=metadata_2_versions) - @matrix(from_version=fk_join_versions) - def test_rolling_upgrade_with_2_bounces(self, from_version): + @matrix(from_version=metadata_2_versions, metadata_quorum=[quorum.combined_kraft]) + @matrix(from_version=fk_join_versions, metadata_quorum=[quorum.combined_kraft]) + def test_rolling_upgrade_with_2_bounces(self, from_version, metadata_quorum): """ This test verifies that the cluster successfully upgrades despite changes in the metadata and FK join protocols. @@ -245,7 +147,8 @@ class StreamsUpgradeTest(Test): self.stop_and_await() @cluster(num_nodes=6) - def test_version_probing_upgrade(self): + @matrix(metadata_quorum=[quorum.combined_kraft]) + def test_version_probing_upgrade(self, metadata_quorum): """ Starts 3 KafkaStreams instances, and upgrades one-by-one to "future version" """ @@ -272,8 +175,8 @@ class StreamsUpgradeTest(Test): self.stop_and_await() @cluster(num_nodes=6) - @matrix(from_version=[str(LATEST_3_2), str(DEV_VERSION)], upgrade=[True, False]) - def test_upgrade_downgrade_state_updater(self, from_version, upgrade): + @matrix(from_version=[str(LATEST_3_2), str(DEV_VERSION)], upgrade=[True, False], metadata_quorum=[quorum.combined_kraft]) + def test_upgrade_downgrade_state_updater(self, from_version, upgrade, metadata_quorum): """ Starts 3 KafkaStreams instances, and enables / disables state restoration for the instances in a rolling bounce. @@ -312,10 +215,7 @@ class StreamsUpgradeTest(Test): self.stop_and_await() def set_up_services(self): - self.zk = ZookeeperService(self.test_context, num_nodes=1) - self.zk.start() - - self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, topics=self.topics) + self.kafka = KafkaService(self.test_context, num_nodes=1, zk=None, topics=self.topics) self.kafka.start() self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)