KAFKA-15093: Add 3.4.0 and 3.5.0 to core upgrade and compatibility system tests (#13859)

Reviewers: Luke Chen <showuon@gmail.com>, Christo Lolov  <christololov@gmail.com>
This commit is contained in:
Mickael Maison 2023-07-12 10:36:57 +02:00 committed by GitHub
parent 354db26b95
commit b584e91036
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 23 additions and 5 deletions

View File

@ -28,7 +28,7 @@ from kafkatest.services.kafka import KafkaService, quorum
from ducktape.tests.test import Test from ducktape.tests.test import Test
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, \ from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, \
LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, \ LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, \
LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, V_0_11_0_0, V_0_10_1_0, KafkaVersion LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, V_0_11_0_0, V_0_10_1_0, KafkaVersion
def get_broker_features(broker_version): def get_broker_features(broker_version):
features = {} features = {}
@ -136,6 +136,8 @@ class ClientCompatibilityFeaturesTest(Test):
@parametrize(broker_version=str(LATEST_3_1)) @parametrize(broker_version=str(LATEST_3_1))
@parametrize(broker_version=str(LATEST_3_2)) @parametrize(broker_version=str(LATEST_3_2))
@parametrize(broker_version=str(LATEST_3_3)) @parametrize(broker_version=str(LATEST_3_3))
@parametrize(broker_version=str(LATEST_3_4))
@parametrize(broker_version=str(LATEST_3_5))
def run_compatibility_test(self, broker_version, metadata_quorum=quorum.zk): def run_compatibility_test(self, broker_version, metadata_quorum=quorum.zk):
if self.zk: if self.zk:
self.zk.start() self.zk.start()

View File

@ -25,7 +25,7 @@ from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int_with_prefix from kafkatest.utils import is_int_with_prefix
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, \ from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, \
LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, \ LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, \
LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, KafkaVersion LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, KafkaVersion
class ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest): class ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest):
""" """
@ -77,6 +77,8 @@ class ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest):
@parametrize(broker_version=str(LATEST_3_1)) @parametrize(broker_version=str(LATEST_3_1))
@parametrize(broker_version=str(LATEST_3_2)) @parametrize(broker_version=str(LATEST_3_2))
@parametrize(broker_version=str(LATEST_3_3)) @parametrize(broker_version=str(LATEST_3_3))
@parametrize(broker_version=str(LATEST_3_4))
@parametrize(broker_version=str(LATEST_3_5))
def test_produce_consume(self, broker_version, metadata_quorum=quorum.zk): def test_produce_consume(self, broker_version, metadata_quorum=quorum.zk):
print("running producer_consumer_compat with broker_version = %s" % broker_version, flush=True) print("running producer_consumer_compat with broker_version = %s" % broker_version, flush=True)
self.kafka.set_version(KafkaVersion(broker_version)) self.kafka.set_version(KafkaVersion(broker_version))

View File

@ -23,7 +23,7 @@ from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int from kafkatest.utils import is_int
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, \ from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, \
LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, \ LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, \
LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, DEV_BRANCH, KafkaVersion LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, DEV_BRANCH, KafkaVersion
# Compatibility tests for moving to a new broker (e.g., 0.10.x) and using a mix of old and new clients (e.g., 0.9.x) # Compatibility tests for moving to a new broker (e.g., 0.10.x) and using a mix of old and new clients (e.g., 0.9.x)
class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest): class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
@ -60,6 +60,8 @@ class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
@matrix(producer_version=[str(LATEST_3_1)], consumer_version=[str(LATEST_3_1)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) @matrix(producer_version=[str(LATEST_3_1)], consumer_version=[str(LATEST_3_1)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_3_2)], consumer_version=[str(LATEST_3_2)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) @matrix(producer_version=[str(LATEST_3_2)], consumer_version=[str(LATEST_3_2)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_3_3)], consumer_version=[str(LATEST_3_3)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) @matrix(producer_version=[str(LATEST_3_3)], consumer_version=[str(LATEST_3_3)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_3_4)], consumer_version=[str(LATEST_3_4)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_3_5)], consumer_version=[str(LATEST_3_5)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_2_1)], consumer_version=[str(LATEST_2_1)], compression_types=[["zstd"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) @matrix(producer_version=[str(LATEST_2_1)], consumer_version=[str(LATEST_2_1)], compression_types=[["zstd"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_2_0)], consumer_version=[str(LATEST_2_0)], compression_types=[["snappy"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) @matrix(producer_version=[str(LATEST_2_0)], consumer_version=[str(LATEST_2_0)], compression_types=[["snappy"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
@matrix(producer_version=[str(LATEST_1_1)], consumer_version=[str(LATEST_1_1)], compression_types=[["lz4"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade) @matrix(producer_version=[str(LATEST_1_1)], consumer_version=[str(LATEST_1_1)], compression_types=[["lz4"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)

View File

@ -20,7 +20,7 @@ from ducktape.utils.util import wait_until
from kafkatest.services.kafka import config_property from kafkatest.services.kafka import config_property
from kafkatest.tests.end_to_end import EndToEndTest from kafkatest.tests.end_to_end import EndToEndTest
from kafkatest.version import LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, \ from kafkatest.version import LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, \
LATEST_2_6, LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, DEV_BRANCH, KafkaVersion LATEST_2_6, LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, DEV_BRANCH, KafkaVersion
class TestDowngrade(EndToEndTest): class TestDowngrade(EndToEndTest):
PARTITIONS = 3 PARTITIONS = 3
@ -80,6 +80,12 @@ class TestDowngrade(EndToEndTest):
timeout_sec=60, backoff_sec=1, err_msg="Replicas did not rejoin the ISR in a reasonable amount of time") timeout_sec=60, backoff_sec=1, err_msg="Replicas did not rejoin the ISR in a reasonable amount of time")
@cluster(num_nodes=7) @cluster(num_nodes=7)
@parametrize(version=str(LATEST_3_5), compression_types=["snappy"])
@parametrize(version=str(LATEST_3_5), compression_types=["zstd"], security_protocol="SASL_SSL")
@matrix(version=[str(LATEST_3_5)], compression_types=[["none"]], static_membership=[False, True])
@parametrize(version=str(LATEST_3_4), compression_types=["snappy"])
@parametrize(version=str(LATEST_3_4), compression_types=["zstd"], security_protocol="SASL_SSL")
@matrix(version=[str(LATEST_3_4)], compression_types=[["none"]], static_membership=[False, True])
@parametrize(version=str(LATEST_3_3), compression_types=["snappy"]) @parametrize(version=str(LATEST_3_3), compression_types=["snappy"])
@parametrize(version=str(LATEST_3_3), compression_types=["zstd"], security_protocol="SASL_SSL") @parametrize(version=str(LATEST_3_3), compression_types=["zstd"], security_protocol="SASL_SSL")
@matrix(version=[str(LATEST_3_3)], compression_types=[["none"]], static_membership=[False, True]) @matrix(version=[str(LATEST_3_3)], compression_types=[["none"]], static_membership=[False, True])

View File

@ -26,7 +26,7 @@ from kafkatest.utils import is_int
from kafkatest.utils.remote_account import java_version from kafkatest.utils.remote_account import java_version
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, \ from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, \
LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, \ LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, \
LATEST_2_6, LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, V_0_11_0_0, V_2_8_0, V_3_0_0, \ LATEST_2_6, LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, V_0_11_0_0, V_2_8_0, V_3_0_0, \
DEV_BRANCH, KafkaVersion DEV_BRANCH, KafkaVersion
from kafkatest.services.kafka.util import new_jdk_not_supported from kafkatest.services.kafka.util import new_jdk_not_supported
@ -94,6 +94,12 @@ class TestUpgrade(ProduceConsumeValidateTest):
self.wait_until_rejoin() self.wait_until_rejoin()
@cluster(num_nodes=6) @cluster(num_nodes=6)
@parametrize(from_kafka_version=str(LATEST_3_5), to_message_format_version=None, compression_types=["none"])
@parametrize(from_kafka_version=str(LATEST_3_5), to_message_format_version=None, compression_types=["lz4"])
@parametrize(from_kafka_version=str(LATEST_3_5), to_message_format_version=None, compression_types=["snappy"])
@parametrize(from_kafka_version=str(LATEST_3_4), to_message_format_version=None, compression_types=["none"])
@parametrize(from_kafka_version=str(LATEST_3_4), to_message_format_version=None, compression_types=["lz4"])
@parametrize(from_kafka_version=str(LATEST_3_4), to_message_format_version=None, compression_types=["snappy"])
@parametrize(from_kafka_version=str(LATEST_3_3), to_message_format_version=None, compression_types=["none"]) @parametrize(from_kafka_version=str(LATEST_3_3), to_message_format_version=None, compression_types=["none"])
@parametrize(from_kafka_version=str(LATEST_3_3), to_message_format_version=None, compression_types=["lz4"]) @parametrize(from_kafka_version=str(LATEST_3_3), to_message_format_version=None, compression_types=["lz4"])
@parametrize(from_kafka_version=str(LATEST_3_3), to_message_format_version=None, compression_types=["snappy"]) @parametrize(from_kafka_version=str(LATEST_3_3), to_message_format_version=None, compression_types=["snappy"])