From 29881782c842b3d8ecbd2215a12956f3536d4e31 Mon Sep 17 00:00:00 2001 From: Bill Bejeck Date: Wed, 30 Oct 2024 16:51:06 -0400 Subject: [PATCH] KAFKA-17609 Migrate broker compatibility test from ZK to KRaft (#17603) Reviewers: Matthias J. Sax , Chia-Ping Tsai --- Vagrantfile | 4 +- .../streams_broker_compatibility_test.py | 63 ++++--------------- 2 files changed, 15 insertions(+), 52 deletions(-) diff --git a/Vagrantfile b/Vagrantfile index a053be28d01..3f64a4a9659 100644 --- a/Vagrantfile +++ b/Vagrantfile @@ -55,8 +55,8 @@ ec2_iam_instance_profile_name = nil ebs_volume_type = 'gp3' -jdk_major = '8' -jdk_full = '8u202-linux-x64' +jdk_major = '11' +jdk_full = '11.0.2-linux-x64' local_config_file = File.join(File.dirname(__FILE__), "Vagrantfile.local") if File.exists?(local_config_file) then diff --git a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py index 6ed8d7f42ce..953ce2263d4 100644 --- a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py +++ b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py @@ -18,23 +18,16 @@ from ducktape.mark import matrix from ducktape.mark.resource import cluster from ducktape.tests.test import Test from ducktape.utils.util import wait_until -from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.streams import StreamsBrokerCompatibilityService from kafkatest.services.verifiable_consumer import VerifiableConsumer -from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.version import LATEST_1_0, LATEST_1_1, \ - LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \ - LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, KafkaVersion +from kafkatest.version import LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, KafkaVersion class StreamsBrokerCompatibility(Test): """ These tests validates that - - Streams works for older brokers 0.11 (or newer) - - Streams w/ EOS-alpha works for older brokers 0.11 (or newer) - Streams w/ EOS-v2 works for older brokers 2.5 (or newer) - - Streams fails fast for older brokers 0.10.0, 0.10.2, and 0.10.1 - - Streams w/ EOS-v2 fails fast for older brokers 2.4 or older """ input = "brokerCompatibilitySourceTopic" @@ -42,10 +35,9 @@ class StreamsBrokerCompatibility(Test): def __init__(self, test_context): super(StreamsBrokerCompatibility, self).__init__(test_context=test_context) - self.zk = ZookeeperService(test_context, num_nodes=1) self.kafka = KafkaService(test_context, num_nodes=1, - zk=self.zk, + zk=None, topics={ self.input: {'partitions': 1, 'replication-factor': 1}, self.output: {'partitions': 1, 'replication-factor': 1} @@ -60,17 +52,14 @@ class StreamsBrokerCompatibility(Test): self.output, "stream-broker-compatibility-verify-consumer") - def setUp(self): - self.zk.start() - @cluster(num_nodes=4) - @matrix(broker_version=[str(LATEST_1_0),str(LATEST_1_1),str(LATEST_2_0),str(LATEST_2_1), - str(LATEST_2_2),str(LATEST_2_3),str(LATEST_2_4),str(LATEST_2_5), - str(LATEST_2_6),str(LATEST_2_7),str(LATEST_2_8),str(LATEST_3_0), - str(LATEST_3_1),str(LATEST_3_2),str(LATEST_3_3),str(LATEST_3_4), - str(LATEST_3_5),str(LATEST_3_6),str(LATEST_3_7),str(LATEST_3_8)]) - def test_compatible_brokers_eos_disabled(self, broker_version): + @matrix(broker_version=[str(LATEST_3_0),str(LATEST_3_1),str(LATEST_3_2),str(LATEST_3_3), + str(LATEST_3_4),str(LATEST_3_5),str(LATEST_3_6),str(LATEST_3_7), + str(LATEST_3_8)], + metadata_quorum=[quorum.combined_kraft] + ) + def test_compatible_brokers_eos_disabled(self, broker_version, metadata_quorum): self.kafka.set_version(KafkaVersion(broker_version)) self.kafka.start() @@ -87,11 +76,11 @@ class StreamsBrokerCompatibility(Test): self.kafka.stop() @cluster(num_nodes=4) - @matrix(broker_version=[str(LATEST_2_5),str(LATEST_2_6),str(LATEST_2_7),str(LATEST_2_8), - str(LATEST_3_0),str(LATEST_3_1),str(LATEST_3_2),str(LATEST_3_3), + @matrix(broker_version=[str(LATEST_3_0),str(LATEST_3_1),str(LATEST_3_2),str(LATEST_3_3), str(LATEST_3_4),str(LATEST_3_5),str(LATEST_3_6),str(LATEST_3_7), - str(LATEST_3_8)]) - def test_compatible_brokers_eos_v2_enabled(self, broker_version): + str(LATEST_3_8)], + metadata_quorum=[quorum.combined_kraft]) + def test_compatible_brokers_eos_v2_enabled(self, broker_version, metadata_quorum): self.kafka.set_version(KafkaVersion(broker_version)) self.kafka.start() @@ -106,29 +95,3 @@ class StreamsBrokerCompatibility(Test): self.consumer.stop() self.kafka.stop() - - @cluster(num_nodes=4) - @parametrize(broker_version=str(LATEST_2_4)) - @parametrize(broker_version=str(LATEST_2_3)) - @parametrize(broker_version=str(LATEST_2_2)) - @parametrize(broker_version=str(LATEST_2_1)) - @parametrize(broker_version=str(LATEST_2_0)) - @parametrize(broker_version=str(LATEST_1_1)) - @parametrize(broker_version=str(LATEST_1_0)) - def test_fail_fast_on_incompatible_brokers_if_eos_v2_enabled(self, broker_version): - self.kafka.set_version(KafkaVersion(broker_version)) - self.kafka.start() - - processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, "exactly_once_v2") - - with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor: - with processor.node.account.monitor_log(processor.LOG_FILE) as log: - processor.start() - log.wait_until('Shutting down because the Kafka cluster seems to be on a too old version. Setting processing\.guarantee="exactly_once_v2" requires broker version 2\.5 or higher\.', - timeout_sec=60, - err_msg="Never saw 'Shutting down because the Kafka cluster seems to be on a too old version. Setting `processing.guarantee=\"exactly_once_v2\"` requires broker version 2.5 or higher.' log message " + str(processor.node.account)) - monitor.wait_until('FATAL: An unexpected exception org.apache.kafka.common.errors.UnsupportedVersionException', - timeout_sec=60, - err_msg="Never saw 'FATAL: An unexpected exception org.apache.kafka.common.errors.UnsupportedVersionException' error message " + str(processor.node.account)) - - self.kafka.stop()