KAFKA-17609 Migrate broker compatibility test from ZK to KRaft (#17603)

Reviewers: Matthias J. Sax <matthias@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Bill Bejeck 2024-10-30 16:51:06 -04:00 committed by GitHub
parent d7135b2a5b
commit 29881782c8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 15 additions and 52 deletions

4
Vagrantfile vendored
View File

@ -55,8 +55,8 @@ ec2_iam_instance_profile_name = nil
ebs_volume_type = 'gp3'
jdk_major = '8'
jdk_full = '8u202-linux-x64'
jdk_major = '11'
jdk_full = '11.0.2-linux-x64'
local_config_file = File.join(File.dirname(__FILE__), "Vagrantfile.local")
if File.exists?(local_config_file) then

View File

@ -18,23 +18,16 @@ from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
from kafkatest.services.kafka import KafkaService
from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.streams import StreamsBrokerCompatibilityService
from kafkatest.services.verifiable_consumer import VerifiableConsumer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.version import LATEST_1_0, LATEST_1_1, \
LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \
LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, KafkaVersion
from kafkatest.version import LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, KafkaVersion
class StreamsBrokerCompatibility(Test):
"""
These tests validates that
- Streams works for older brokers 0.11 (or newer)
- Streams w/ EOS-alpha works for older brokers 0.11 (or newer)
- Streams w/ EOS-v2 works for older brokers 2.5 (or newer)
- Streams fails fast for older brokers 0.10.0, 0.10.2, and 0.10.1
- Streams w/ EOS-v2 fails fast for older brokers 2.4 or older
"""
input = "brokerCompatibilitySourceTopic"
@ -42,10 +35,9 @@ class StreamsBrokerCompatibility(Test):
def __init__(self, test_context):
super(StreamsBrokerCompatibility, self).__init__(test_context=test_context)
self.zk = ZookeeperService(test_context, num_nodes=1)
self.kafka = KafkaService(test_context,
num_nodes=1,
zk=self.zk,
zk=None,
topics={
self.input: {'partitions': 1, 'replication-factor': 1},
self.output: {'partitions': 1, 'replication-factor': 1}
@ -60,17 +52,14 @@ class StreamsBrokerCompatibility(Test):
self.output,
"stream-broker-compatibility-verify-consumer")
def setUp(self):
self.zk.start()
@cluster(num_nodes=4)
@matrix(broker_version=[str(LATEST_1_0),str(LATEST_1_1),str(LATEST_2_0),str(LATEST_2_1),
str(LATEST_2_2),str(LATEST_2_3),str(LATEST_2_4),str(LATEST_2_5),
str(LATEST_2_6),str(LATEST_2_7),str(LATEST_2_8),str(LATEST_3_0),
str(LATEST_3_1),str(LATEST_3_2),str(LATEST_3_3),str(LATEST_3_4),
str(LATEST_3_5),str(LATEST_3_6),str(LATEST_3_7),str(LATEST_3_8)])
def test_compatible_brokers_eos_disabled(self, broker_version):
@matrix(broker_version=[str(LATEST_3_0),str(LATEST_3_1),str(LATEST_3_2),str(LATEST_3_3),
str(LATEST_3_4),str(LATEST_3_5),str(LATEST_3_6),str(LATEST_3_7),
str(LATEST_3_8)],
metadata_quorum=[quorum.combined_kraft]
)
def test_compatible_brokers_eos_disabled(self, broker_version, metadata_quorum):
self.kafka.set_version(KafkaVersion(broker_version))
self.kafka.start()
@ -87,11 +76,11 @@ class StreamsBrokerCompatibility(Test):
self.kafka.stop()
@cluster(num_nodes=4)
@matrix(broker_version=[str(LATEST_2_5),str(LATEST_2_6),str(LATEST_2_7),str(LATEST_2_8),
str(LATEST_3_0),str(LATEST_3_1),str(LATEST_3_2),str(LATEST_3_3),
@matrix(broker_version=[str(LATEST_3_0),str(LATEST_3_1),str(LATEST_3_2),str(LATEST_3_3),
str(LATEST_3_4),str(LATEST_3_5),str(LATEST_3_6),str(LATEST_3_7),
str(LATEST_3_8)])
def test_compatible_brokers_eos_v2_enabled(self, broker_version):
str(LATEST_3_8)],
metadata_quorum=[quorum.combined_kraft])
def test_compatible_brokers_eos_v2_enabled(self, broker_version, metadata_quorum):
self.kafka.set_version(KafkaVersion(broker_version))
self.kafka.start()
@ -106,29 +95,3 @@ class StreamsBrokerCompatibility(Test):
self.consumer.stop()
self.kafka.stop()
@cluster(num_nodes=4)
@parametrize(broker_version=str(LATEST_2_4))
@parametrize(broker_version=str(LATEST_2_3))
@parametrize(broker_version=str(LATEST_2_2))
@parametrize(broker_version=str(LATEST_2_1))
@parametrize(broker_version=str(LATEST_2_0))
@parametrize(broker_version=str(LATEST_1_1))
@parametrize(broker_version=str(LATEST_1_0))
def test_fail_fast_on_incompatible_brokers_if_eos_v2_enabled(self, broker_version):
self.kafka.set_version(KafkaVersion(broker_version))
self.kafka.start()
processor = StreamsBrokerCompatibilityService(self.test_context, self.kafka, "exactly_once_v2")
with processor.node.account.monitor_log(processor.STDERR_FILE) as monitor:
with processor.node.account.monitor_log(processor.LOG_FILE) as log:
processor.start()
log.wait_until('Shutting down because the Kafka cluster seems to be on a too old version. Setting processing\.guarantee="exactly_once_v2" requires broker version 2\.5 or higher\.',
timeout_sec=60,
err_msg="Never saw 'Shutting down because the Kafka cluster seems to be on a too old version. Setting `processing.guarantee=\"exactly_once_v2\"` requires broker version 2.5 or higher.' log message " + str(processor.node.account))
monitor.wait_until('FATAL: An unexpected exception org.apache.kafka.common.errors.UnsupportedVersionException',
timeout_sec=60,
err_msg="Never saw 'FATAL: An unexpected exception org.apache.kafka.common.errors.UnsupportedVersionException' error message " + str(processor.node.account))
self.kafka.stop()