mirror of https://github.com/apache/kafka.git
MINOR: Add 3.0 and 3.1 to broker and client compatibility tests (#11701)
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
This commit is contained in:
parent
265d3199ec
commit
110fae2f59
|
@ -26,7 +26,7 @@ from ducktape.tests.test import TestContext
|
||||||
from kafkatest.services.zookeeper import ZookeeperService
|
from kafkatest.services.zookeeper import ZookeeperService
|
||||||
from kafkatest.services.kafka import KafkaService, quorum
|
from kafkatest.services.kafka import KafkaService, quorum
|
||||||
from ducktape.tests.test import Test
|
from ducktape.tests.test import Test
|
||||||
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, V_0_11_0_0, V_0_10_1_0, KafkaVersion
|
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, V_0_11_0_0, V_0_10_1_0, KafkaVersion
|
||||||
|
|
||||||
def get_broker_features(broker_version):
|
def get_broker_features(broker_version):
|
||||||
features = {}
|
features = {}
|
||||||
|
@ -128,6 +128,8 @@ class ClientCompatibilityFeaturesTest(Test):
|
||||||
@parametrize(broker_version=str(LATEST_2_6))
|
@parametrize(broker_version=str(LATEST_2_6))
|
||||||
@parametrize(broker_version=str(LATEST_2_7))
|
@parametrize(broker_version=str(LATEST_2_7))
|
||||||
@parametrize(broker_version=str(LATEST_2_8))
|
@parametrize(broker_version=str(LATEST_2_8))
|
||||||
|
@parametrize(broker_version=str(LATEST_3_0))
|
||||||
|
@parametrize(broker_version=str(LATEST_3_1))
|
||||||
def run_compatibility_test(self, broker_version, metadata_quorum=quorum.zk):
|
def run_compatibility_test(self, broker_version, metadata_quorum=quorum.zk):
|
||||||
if self.zk:
|
if self.zk:
|
||||||
self.zk.start()
|
self.zk.start()
|
||||||
|
|
|
@ -23,7 +23,7 @@ from kafkatest.services.verifiable_producer import VerifiableProducer
|
||||||
from kafkatest.services.console_consumer import ConsoleConsumer
|
from kafkatest.services.console_consumer import ConsoleConsumer
|
||||||
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
|
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
|
||||||
from kafkatest.utils import is_int_with_prefix
|
from kafkatest.utils import is_int_with_prefix
|
||||||
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, KafkaVersion
|
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, KafkaVersion
|
||||||
|
|
||||||
class ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest):
|
class ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest):
|
||||||
"""
|
"""
|
||||||
|
@ -71,6 +71,8 @@ class ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest):
|
||||||
@parametrize(broker_version=str(LATEST_2_6))
|
@parametrize(broker_version=str(LATEST_2_6))
|
||||||
@parametrize(broker_version=str(LATEST_2_7))
|
@parametrize(broker_version=str(LATEST_2_7))
|
||||||
@parametrize(broker_version=str(LATEST_2_8))
|
@parametrize(broker_version=str(LATEST_2_8))
|
||||||
|
@parametrize(broker_version=str(LATEST_3_0))
|
||||||
|
@parametrize(broker_version=str(LATEST_3_1))
|
||||||
def test_produce_consume(self, broker_version, metadata_quorum=quorum.zk):
|
def test_produce_consume(self, broker_version, metadata_quorum=quorum.zk):
|
||||||
print("running producer_consumer_compat with broker_version = %s" % broker_version, flush=True)
|
print("running producer_consumer_compat with broker_version = %s" % broker_version, flush=True)
|
||||||
self.kafka.set_version(KafkaVersion(broker_version))
|
self.kafka.set_version(KafkaVersion(broker_version))
|
||||||
|
|
|
@ -21,7 +21,7 @@ from kafkatest.services.verifiable_producer import VerifiableProducer
|
||||||
from kafkatest.services.zookeeper import ZookeeperService
|
from kafkatest.services.zookeeper import ZookeeperService
|
||||||
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
|
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
|
||||||
from kafkatest.utils import is_int
|
from kafkatest.utils import is_int
|
||||||
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, DEV_BRANCH, KafkaVersion
|
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, DEV_BRANCH, KafkaVersion
|
||||||
|
|
||||||
# Compatibility tests for moving to a new broker (e.g., 0.10.x) and using a mix of old and new clients (e.g., 0.9.x)
|
# Compatibility tests for moving to a new broker (e.g., 0.10.x) and using a mix of old and new clients (e.g., 0.9.x)
|
||||||
class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
|
class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
|
||||||
|
@ -54,6 +54,8 @@ class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
|
||||||
@matrix(producer_version=[str(LATEST_2_6)], consumer_version=[str(LATEST_2_6)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
|
@matrix(producer_version=[str(LATEST_2_6)], consumer_version=[str(LATEST_2_6)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
|
||||||
@matrix(producer_version=[str(LATEST_2_7)], consumer_version=[str(LATEST_2_7)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
|
@matrix(producer_version=[str(LATEST_2_7)], consumer_version=[str(LATEST_2_7)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
|
||||||
@matrix(producer_version=[str(LATEST_2_8)], consumer_version=[str(LATEST_2_8)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
|
@matrix(producer_version=[str(LATEST_2_8)], consumer_version=[str(LATEST_2_8)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
|
||||||
|
@matrix(producer_version=[str(LATEST_3_0)], consumer_version=[str(LATEST_3_0)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
|
||||||
|
@matrix(producer_version=[str(LATEST_3_1)], consumer_version=[str(LATEST_3_1)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
|
||||||
@matrix(producer_version=[str(LATEST_2_1)], consumer_version=[str(LATEST_2_1)], compression_types=[["zstd"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
|
@matrix(producer_version=[str(LATEST_2_1)], consumer_version=[str(LATEST_2_1)], compression_types=[["zstd"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
|
||||||
@matrix(producer_version=[str(LATEST_2_0)], consumer_version=[str(LATEST_2_0)], compression_types=[["snappy"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
|
@matrix(producer_version=[str(LATEST_2_0)], consumer_version=[str(LATEST_2_0)], compression_types=[["snappy"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
|
||||||
@matrix(producer_version=[str(LATEST_1_1)], consumer_version=[str(LATEST_1_1)], compression_types=[["lz4"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
|
@matrix(producer_version=[str(LATEST_1_1)], consumer_version=[str(LATEST_1_1)], compression_types=[["lz4"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
|
||||||
|
|
|
@ -19,7 +19,7 @@ from ducktape.utils.util import wait_until
|
||||||
|
|
||||||
from kafkatest.services.kafka import config_property
|
from kafkatest.services.kafka import config_property
|
||||||
from kafkatest.tests.end_to_end import EndToEndTest
|
from kafkatest.tests.end_to_end import EndToEndTest
|
||||||
from kafkatest.version import LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, DEV_BRANCH, KafkaVersion
|
from kafkatest.version import LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, DEV_BRANCH, KafkaVersion
|
||||||
|
|
||||||
class TestDowngrade(EndToEndTest):
|
class TestDowngrade(EndToEndTest):
|
||||||
PARTITIONS = 3
|
PARTITIONS = 3
|
||||||
|
@ -79,6 +79,12 @@ class TestDowngrade(EndToEndTest):
|
||||||
timeout_sec=60, backoff_sec=1, err_msg="Replicas did not rejoin the ISR in a reasonable amount of time")
|
timeout_sec=60, backoff_sec=1, err_msg="Replicas did not rejoin the ISR in a reasonable amount of time")
|
||||||
|
|
||||||
@cluster(num_nodes=7)
|
@cluster(num_nodes=7)
|
||||||
|
@parametrize(version=str(LATEST_3_1), compression_types=["snappy"])
|
||||||
|
@parametrize(version=str(LATEST_3_1), compression_types=["zstd"], security_protocol="SASL_SSL")
|
||||||
|
@matrix(version=[str(LATEST_3_1)], compression_types=[["none"]], static_membership=[False, True])
|
||||||
|
@parametrize(version=str(LATEST_3_0), compression_types=["snappy"])
|
||||||
|
@parametrize(version=str(LATEST_3_0), compression_types=["zstd"], security_protocol="SASL_SSL")
|
||||||
|
@matrix(version=[str(LATEST_3_0)], compression_types=[["none"]], static_membership=[False, True])
|
||||||
@parametrize(version=str(LATEST_2_8), compression_types=["snappy"])
|
@parametrize(version=str(LATEST_2_8), compression_types=["snappy"])
|
||||||
@parametrize(version=str(LATEST_2_8), compression_types=["zstd"], security_protocol="SASL_SSL")
|
@parametrize(version=str(LATEST_2_8), compression_types=["zstd"], security_protocol="SASL_SSL")
|
||||||
@matrix(version=[str(LATEST_2_8)], compression_types=[["none"]], static_membership=[False, True])
|
@matrix(version=[str(LATEST_2_8)], compression_types=[["none"]], static_membership=[False, True])
|
||||||
|
|
|
@ -24,7 +24,7 @@ from kafkatest.services.zookeeper import ZookeeperService
|
||||||
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
|
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
|
||||||
from kafkatest.utils import is_int
|
from kafkatest.utils import is_int
|
||||||
from kafkatest.utils.remote_account import java_version
|
from kafkatest.utils.remote_account import java_version
|
||||||
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, V_0_11_0_0, V_2_8_0, V_3_0_0, DEV_BRANCH, KafkaVersion
|
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, V_0_11_0_0, V_2_8_0, V_3_0_0, DEV_BRANCH, KafkaVersion
|
||||||
from kafkatest.services.kafka.util import new_jdk_not_supported
|
from kafkatest.services.kafka.util import new_jdk_not_supported
|
||||||
|
|
||||||
class TestUpgrade(ProduceConsumeValidateTest):
|
class TestUpgrade(ProduceConsumeValidateTest):
|
||||||
|
@ -91,6 +91,12 @@ class TestUpgrade(ProduceConsumeValidateTest):
|
||||||
self.wait_until_rejoin()
|
self.wait_until_rejoin()
|
||||||
|
|
||||||
@cluster(num_nodes=6)
|
@cluster(num_nodes=6)
|
||||||
|
@parametrize(from_kafka_version=str(LATEST_3_1), to_message_format_version=None, compression_types=["none"])
|
||||||
|
@parametrize(from_kafka_version=str(LATEST_3_1), to_message_format_version=None, compression_types=["lz4"])
|
||||||
|
@parametrize(from_kafka_version=str(LATEST_3_1), to_message_format_version=None, compression_types=["snappy"])
|
||||||
|
@parametrize(from_kafka_version=str(LATEST_3_0), to_message_format_version=None, compression_types=["none"])
|
||||||
|
@parametrize(from_kafka_version=str(LATEST_3_0), to_message_format_version=None, compression_types=["lz4"])
|
||||||
|
@parametrize(from_kafka_version=str(LATEST_3_0), to_message_format_version=None, compression_types=["snappy"])
|
||||||
@parametrize(from_kafka_version=str(LATEST_2_8), to_message_format_version=None, compression_types=["none"])
|
@parametrize(from_kafka_version=str(LATEST_2_8), to_message_format_version=None, compression_types=["none"])
|
||||||
@parametrize(from_kafka_version=str(LATEST_2_8), to_message_format_version=None, compression_types=["lz4"])
|
@parametrize(from_kafka_version=str(LATEST_2_8), to_message_format_version=None, compression_types=["lz4"])
|
||||||
@parametrize(from_kafka_version=str(LATEST_2_8), to_message_format_version=None, compression_types=["snappy"])
|
@parametrize(from_kafka_version=str(LATEST_2_8), to_message_format_version=None, compression_types=["snappy"])
|
||||||
|
|
Loading…
Reference in New Issue