KAFKA-18006 Add 3.9.0 to end-to-end test (core, client) (#17797)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
TengYao Chi 2024-11-15 00:24:24 +08:00 committed by GitHub
parent 1834030107
commit e9cd9c9811
12 changed files with 36 additions and 21 deletions

View File

@@ -94,6 +94,7 @@ RUN mkdir -p "/opt/kafka-3.5.2" && chmod a+rw /opt/kafka-3.5.2 && curl -s "$KAFK
 RUN mkdir -p "/opt/kafka-3.6.2" && chmod a+rw /opt/kafka-3.6.2 && curl -s "$KAFKA_MIRROR/kafka_2.12-3.6.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-3.6.2"
 RUN mkdir -p "/opt/kafka-3.7.1" && chmod a+rw /opt/kafka-3.7.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-3.7.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-3.7.1"
 RUN mkdir -p "/opt/kafka-3.8.1" && chmod a+rw /opt/kafka-3.8.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-3.8.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-3.8.1"
+RUN mkdir -p "/opt/kafka-3.9.0" && chmod a+rw /opt/kafka-3.9.0 && curl -s "$KAFKA_MIRROR/kafka_2.12-3.9.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-3.9.0"
 # Streams test dependencies
@@ -114,6 +115,7 @@ RUN curl -s "$KAFKA_MIRROR/kafka-streams-3.5.2-test.jar" -o /opt/kafka-3.5.2/lib
 RUN curl -s "$KAFKA_MIRROR/kafka-streams-3.6.2-test.jar" -o /opt/kafka-3.6.2/libs/kafka-streams-3.6.2-test.jar
 RUN curl -s "$KAFKA_MIRROR/kafka-streams-3.7.1-test.jar" -o /opt/kafka-3.7.1/libs/kafka-streams-3.7.1-test.jar
 RUN curl -s "$KAFKA_MIRROR/kafka-streams-3.8.1-test.jar" -o /opt/kafka-3.8.1/libs/kafka-streams-3.8.1-test.jar
+RUN curl -s "$KAFKA_MIRROR/kafka-streams-3.9.0-test.jar" -o /opt/kafka-3.9.0/libs/kafka-streams-3.9.0-test.jar
 # To ensure the Kafka cluster starts successfully under JDK 17, we need to update the Zookeeper
 # client from version 3.4.x to 3.5.7 in Kafka versions 2.1.1, 2.2.2, and 2.3.1, as the older Zookeeper
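
Every release the system tests exercise has to exist in two places: as a LATEST_* constant in kafkatest/version.py (shown further down in this commit) and as an /opt/kafka-&lt;version&gt; install baked into this image. A minimal sketch of a consistency check, assuming the /opt/kafka-&lt;version&gt; layout above (the version list here is illustrative, not the full set):

import os

# Illustrative subset; a real check would mirror the full version list.
EXPECTED_VERSIONS = ["3.7.1", "3.8.1", "3.9.0"]

def check_installs(root="/opt"):
    # Fail fast if the image is missing any versioned Kafka install.
    missing = [v for v in EXPECTED_VERSIONS
               if not os.path.isdir(os.path.join(root, "kafka-%s" % v))]
    if missing:
        raise RuntimeError("missing Kafka installs: %s" % missing)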

View File

@@ -23,7 +23,7 @@ from ducktape.cluster.remoteaccount import RemoteCommandError
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
 from kafkatest.services.security.security_config import SecurityConfig
-from kafkatest.version import LATEST_3_8
+from kafkatest.version import LATEST_3_9
 class ZookeeperService(KafkaPathResolverMixin, Service):
@@ -43,9 +43,9 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
             "collect_default": True}
     }
-    # After 4.0, zookeeper service is removed from source code. Using LATEST_3_8 for compatibility test cases.
+    # After 4.0, zookeeper service is removed from source code. Using LATEST_3_9 for compatibility test cases.
     def __init__(self, context, num_nodes, zk_sasl = False, zk_client_port = True, zk_client_secure_port = False,
-                 zk_tls_encrypt_only = False, version=LATEST_3_8):
+                 zk_tls_encrypt_only = False, version=LATEST_3_9):
         """
         :type context
         """
@@ -187,7 +187,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
         chroot_path = ('' if chroot is None else chroot) + path
-        kafka_run_class = self.path.script("kafka-run-class.sh", LATEST_3_8)
+        kafka_run_class = self.path.script("kafka-run-class.sh", LATEST_3_9)
         cmd = "%s %s -server %s %s get %s" % \
               (kafka_run_class, self.java_cli_class_name(), self.connect_setting(force_tls=self.zk_client_secure_port),
                self.zkTlsConfigFileOption(True),
@@ -212,7 +212,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
         chroot_path = ('' if chroot is None else chroot) + path
-        kafka_run_class = self.path.script("kafka-run-class.sh", LATEST_3_8)
+        kafka_run_class = self.path.script("kafka-run-class.sh", LATEST_3_9)
         cmd = "%s %s -server %s %s ls %s" % \
               (kafka_run_class, self.java_cli_class_name(), self.connect_setting(force_tls=self.zk_client_secure_port),
                self.zkTlsConfigFileOption(True),
@@ -240,7 +240,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
         chroot_path = ('' if chroot is None else chroot) + path
-        kafka_run_class = self.path.script("kafka-run-class.sh", LATEST_3_8)
+        kafka_run_class = self.path.script("kafka-run-class.sh", LATEST_3_9)
         if recursive:
             op = "deleteall"
         else:
@@ -262,7 +262,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
         chroot_path = ('' if chroot is None else chroot) + path
-        kafka_run_class = self.path.script("kafka-run-class.sh", LATEST_3_8)
+        kafka_run_class = self.path.script("kafka-run-class.sh", LATEST_3_9)
         cmd = "%s %s -server %s %s create %s '%s'" % \
               (kafka_run_class, self.java_cli_class_name(), self.connect_setting(force_tls=self.zk_client_secure_port),
                self.zkTlsConfigFileOption(True),
@@ -276,7 +276,7 @@ class ZookeeperService(KafkaPathResolverMixin, Service):
         Describe the default user using the ConfigCommand CLI
         """
-        kafka_run_class = self.path.script("kafka-run-class.sh", LATEST_3_8)
+        kafka_run_class = self.path.script("kafka-run-class.sh", LATEST_3_9)
         cmd = "%s kafka.admin.ConfigCommand --zookeeper %s %s --describe --entity-type users --entity-default" % \
               (kafka_run_class, self.connect_setting(force_tls=self.zk_client_secure_port),
                self.zkTlsConfigFileOption())
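
Trunk (4.0) no longer carries ZooKeeper in the source tree, so ZookeeperService pins its import and every kafka-run-class.sh invocation to the newest 3.x release; this commit simply moves that pin from LATEST_3_8 to LATEST_3_9. A minimal usage sketch under that assumption (test_context is supplied by a ducktape test):

from kafkatest.services.zookeeper import ZookeeperService

def start_zk(test_context):
    # version defaults to LATEST_3_9 after this change, so the service
    # runs the 3.9.0 scripts even when the brokers under test run trunk.
    zk = ZookeeperService(test_context, num_nodes=1)
    zk.start()
    return zk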

View File

@@ -28,7 +28,7 @@ from kafkatest.services.kafka import KafkaService, quorum
 from ducktape.tests.test import Test
 from kafkatest.version import DEV_BRANCH, \
     LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \
-    LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, KafkaVersion
+    LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, LATEST_3_9, KafkaVersion
 def get_broker_features(broker_version):
     features = {}
@@ -124,6 +124,7 @@ class ClientCompatibilityFeaturesTest(Test):
     @parametrize(broker_version=str(LATEST_3_6))
     @parametrize(broker_version=str(LATEST_3_7))
     @parametrize(broker_version=str(LATEST_3_8))
+    @parametrize(broker_version=str(LATEST_3_9))
     def run_compatibility_test(self, broker_version, metadata_quorum=quorum.zk):
         if self.zk:
             self.zk.start()

View File

@@ -25,7 +25,7 @@ from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 from kafkatest.utils import is_int_with_prefix
 from kafkatest.version import DEV_BRANCH, \
     LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \
-    LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, KafkaVersion
+    LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, LATEST_3_9, KafkaVersion
 class ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest):
     """
@@ -75,6 +75,7 @@ class ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest):
     @parametrize(broker_version=str(LATEST_3_6))
     @parametrize(broker_version=str(LATEST_3_7))
     @parametrize(broker_version=str(LATEST_3_8))
+    @parametrize(broker_version=str(LATEST_3_9))
     def test_produce_consume(self, broker_version, metadata_quorum=quorum.zk):
         print("running producer_consumer_compat with broker_version = %s" % broker_version, flush=True)
         self.kafka.set_version(KafkaVersion(broker_version))

View File

@@ -20,7 +20,7 @@ from ducktape.mark.resource import cluster
 from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
 from kafkatest.services.kafka import TopicPartition, quorum, consumer_group
 from kafkatest.version import LATEST_2_1, LATEST_2_3, LATEST_2_4, LATEST_2_5, \
-    LATEST_3_2, LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, DEV_BRANCH, KafkaVersion
+    LATEST_3_2, LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, LATEST_3_9, DEV_BRANCH, KafkaVersion
 class ConsumerProtocolMigrationTest(VerifiableConsumerTest):
     """
@@ -42,7 +42,7 @@ class ConsumerProtocolMigrationTest(VerifiableConsumerTest):
     COOPERATIVE_STICKEY = "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"
     all_consumer_versions = [LATEST_2_1, LATEST_2_3, LATEST_2_4, LATEST_2_5, \
-                             LATEST_3_2, LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, DEV_BRANCH]
+                             LATEST_3_2, LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, LATEST_3_9, DEV_BRANCH]
     consumer_versions_supporting_range_assignnor = [str(v) for v in all_consumer_versions]
     consumer_versions_supporting_static_membership = [str(v) for v in all_consumer_versions if v >= LATEST_2_3]
     consumer_versions_supporting_cooperative_sticky_assignor = [str(v) for v in all_consumer_versions if v >= LATEST_2_4]
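
KafkaVersion values are ordered, so once LATEST_3_9 joins all_consumer_versions the filtered comprehensions above pick it up automatically. A quick sanity sketch of that reasoning:

from kafkatest.version import LATEST_2_3, LATEST_2_4, LATEST_3_9

# 3.9 clears both version cutoffs, so it lands in the static-membership
# and cooperative-sticky lists as well as the range-assignor list.
assert LATEST_3_9 >= LATEST_2_3
assert LATEST_3_9 >= LATEST_2_4
assert str(LATEST_3_9) == "3.9.0"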

View File

@@ -23,7 +23,7 @@ from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 from kafkatest.utils import is_int
 from kafkatest.version import LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, \
     LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, \
-    LATEST_3_7, LATEST_3_8, DEV_BRANCH, KafkaVersion
+    LATEST_3_7, LATEST_3_8, LATEST_3_9, DEV_BRANCH, KafkaVersion
 # Compatibility tests for moving to a new broker (e.g., 0.10.x) and using a mix of old and new clients (e.g., 0.9.x)
 class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
@@ -64,6 +64,7 @@ class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
     @matrix(producer_version=[str(LATEST_3_6)], consumer_version=[str(LATEST_3_6)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
     @matrix(producer_version=[str(LATEST_3_7)], consumer_version=[str(LATEST_3_7)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
     @matrix(producer_version=[str(LATEST_3_8)], consumer_version=[str(LATEST_3_8)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
+    @matrix(producer_version=[str(LATEST_3_9)], consumer_version=[str(LATEST_3_9)], compression_types=[["none"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
     @matrix(producer_version=[str(LATEST_2_1)], consumer_version=[str(LATEST_2_1)], compression_types=[["zstd"]], timestamp_type=[str("CreateTime")], metadata_quorum=quorum.all_non_upgrade)
     def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=True, timestamp_type=None, metadata_quorum=quorum.zk):
         if not new_consumer and metadata_quorum != quorum.zk:

View File

@@ -21,7 +21,7 @@ from kafkatest.services.kafka import config_property
 from kafkatest.tests.end_to_end import EndToEndTest
 from kafkatest.version import LATEST_2_4, LATEST_2_5, \
     LATEST_2_6, LATEST_2_7, LATEST_2_8, LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, \
-    LATEST_3_6, LATEST_3_7, LATEST_3_8, DEV_BRANCH, KafkaVersion
+    LATEST_3_6, LATEST_3_7, LATEST_3_8, LATEST_3_9, DEV_BRANCH, KafkaVersion
 class TestDowngrade(EndToEndTest):
     PARTITIONS = 3
@@ -81,8 +81,12 @@ class TestDowngrade(EndToEndTest):
                    timeout_sec=60, backoff_sec=1, err_msg="Replicas did not rejoin the ISR in a reasonable amount of time")
     @cluster(num_nodes=7)
+    @parametrize(version=str(LATEST_3_9), compression_types=["snappy"])
+    @parametrize(version=str(LATEST_3_9), compression_types=["zstd"], security_protocol="SASL_SSL")
+    @matrix(version=[str(LATEST_3_9)], compression_types=[["none"]], static_membership=[False, True])
     @parametrize(version=str(LATEST_3_8), compression_types=["snappy"])
     @parametrize(version=str(LATEST_3_8), compression_types=["zstd"], security_protocol="SASL_SSL")
     @matrix(version=[str(LATEST_3_8)], compression_types=[["none"]], static_membership=[False, True])
     @parametrize(version=str(LATEST_3_7), compression_types=["snappy"])
     @parametrize(version=str(LATEST_3_7), compression_types=["zstd"], security_protocol="SASL_SSL")
     @matrix(version=[str(LATEST_3_7)], compression_types=[["none"]], static_membership=[False, True])
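
Each release in the downgrade test gets the same three decorators: a snappy run, a zstd run over SASL_SSL, and an uncompressed @matrix row crossed with static membership. A sketch of the cross product that @matrix row implies for 3.9, assuming ducktape's documented behavior of generating one test case per combination:

from itertools import product

# Mirrors @matrix(version=[str(LATEST_3_9)], compression_types=[["none"]],
# static_membership=[False, True]) -> two generated cases.
cases = list(product(["3.9.0"], [["none"]], [False, True]))
assert cases == [("3.9.0", ["none"], False), ("3.9.0", ["none"], True)]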

View File

@@ -23,7 +23,7 @@ from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
 from kafkatest.utils import is_int
 from kafkatest.version import LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, \
-    LATEST_3_6, LATEST_3_7, LATEST_3_8, DEV_BRANCH, KafkaVersion, LATEST_STABLE_METADATA_VERSION
+    LATEST_3_6, LATEST_3_7, LATEST_3_8, LATEST_3_9, DEV_BRANCH, KafkaVersion, LATEST_STABLE_METADATA_VERSION
 #
 # Test upgrading between different KRaft versions.
@@ -109,13 +109,13 @@ class TestKRaftUpgrade(ProduceConsumeValidateTest):
         assert self.kafka.check_protocol_errors(self)
     @cluster(num_nodes=5)
-    @matrix(from_kafka_version=[str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(LATEST_3_6), str(LATEST_3_7), str(LATEST_3_8), str(DEV_BRANCH)],
+    @matrix(from_kafka_version=[str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(LATEST_3_6), str(LATEST_3_7), str(LATEST_3_8), str(LATEST_3_9), str(DEV_BRANCH)],
             metadata_quorum=[combined_kraft])
     def test_combined_mode_upgrade(self, from_kafka_version, metadata_quorum, use_new_coordinator=False):
         self.run_upgrade(from_kafka_version)
     @cluster(num_nodes=8)
-    @matrix(from_kafka_version=[str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(LATEST_3_6), str(LATEST_3_7), str(LATEST_3_8), str(DEV_BRANCH)],
+    @matrix(from_kafka_version=[str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(LATEST_3_6), str(LATEST_3_7), str(LATEST_3_8), str(LATEST_3_9), str(DEV_BRANCH)],
            metadata_quorum=[isolated_kraft])
     def test_isolated_mode_upgrade(self, from_kafka_version, metadata_quorum, use_new_coordinator=False):
         self.run_upgrade(from_kafka_version)

View File

@@ -21,7 +21,7 @@ from kafkatest.services.transactional_message_copier import TransactionalMessage
 from kafkatest.utils import is_int
 from kafkatest.utils.transactions_utils import create_and_start_copiers
 from kafkatest.version import LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, \
-    LATEST_3_6, LATEST_3_7, LATEST_3_8, DEV_BRANCH, KafkaVersion, LATEST_STABLE_METADATA_VERSION
+    LATEST_3_6, LATEST_3_7, LATEST_3_8, LATEST_3_9, DEV_BRANCH, KafkaVersion, LATEST_STABLE_METADATA_VERSION
 from ducktape.tests.test import Test
 from ducktape.mark import matrix
@@ -178,7 +178,7 @@ class TransactionsMixedVersionsTest(Test):
     @cluster(num_nodes=8)
     @matrix(
-        old_kafka_version=[str(LATEST_3_8), str(LATEST_3_7), str(LATEST_3_6), str(LATEST_3_5), str(LATEST_3_4), str(LATEST_3_3), str(LATEST_3_2), str(LATEST_3_1)],
+        old_kafka_version=[str(LATEST_3_9), str(LATEST_3_8), str(LATEST_3_7), str(LATEST_3_6), str(LATEST_3_5), str(LATEST_3_4), str(LATEST_3_3), str(LATEST_3_2), str(LATEST_3_1)],
         metadata_quorum=[isolated_kraft],
         use_new_coordinator=[False],
         group_protocol=[None]

View File

@@ -21,7 +21,7 @@ from kafkatest.services.transactional_message_copier import TransactionalMessage
 from kafkatest.utils import is_int
 from kafkatest.utils.transactions_utils import create_and_start_copiers
 from kafkatest.version import LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, \
-    LATEST_3_6, LATEST_3_7, LATEST_3_8, DEV_BRANCH, KafkaVersion, LATEST_STABLE_METADATA_VERSION
+    LATEST_3_6, LATEST_3_7, LATEST_3_8, LATEST_3_9, DEV_BRANCH, KafkaVersion, LATEST_STABLE_METADATA_VERSION
 from ducktape.tests.test import Test
 from ducktape.mark import matrix
@@ -199,7 +199,7 @@ class TransactionsUpgradeTest(Test):
     @cluster(num_nodes=10)
     @matrix(
-        from_kafka_version=[str(LATEST_3_8), str(LATEST_3_7), str(LATEST_3_6), str(LATEST_3_5), str(LATEST_3_4), str(LATEST_3_3), str(LATEST_3_2), str(LATEST_3_1)],
+        from_kafka_version=[str(LATEST_3_9), str(LATEST_3_8), str(LATEST_3_7), str(LATEST_3_6), str(LATEST_3_5), str(LATEST_3_4), str(LATEST_3_3), str(LATEST_3_2), str(LATEST_3_1)],
         metadata_quorum=[isolated_kraft],
         use_new_coordinator=[False],
         group_protocol=[None]

View File

@@ -209,6 +209,10 @@ V_3_8_0 = KafkaVersion("3.8.0")
 V_3_8_1 = KafkaVersion("3.8.1")
 LATEST_3_8 = V_3_8_1
+# 3.9.x version
+V_3_9_0 = KafkaVersion("3.9.0")
+LATEST_3_9 = V_3_9_0
 # 4.0.x version
 V_4_0_0 = KafkaVersion("4.0.0")
 LATEST_4_0 = V_4_0_0
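
The convention in this file: every release gets a V_x_y_z constant, and every minor line gets a LATEST_x_y alias pointing at its newest patch release (hence LATEST_3_8 = V_3_8_1 above). A sketch of how the alias would move if a hypothetical 3.9.1 shipped; 3.9.0 is the only 3.9.x release in this commit:

from kafkatest.version import KafkaVersion

# Hypothetical future patch release, shown only to illustrate the
# aliasing convention; it does not exist in this commit.
V_3_9_1 = KafkaVersion("3.9.1")
LATEST_3_9 = V_3_9_1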

View File

@@ -148,6 +148,8 @@ get_kafka 3.7.1 2.12
 chmod a+rw /opt/kafka-3.7.1
 get_kafka 3.8.1 2.12
 chmod a+rw /opt/kafka-3.8.1
+get_kafka 3.9.0 2.12
+chmod a+rw /opt/kafka-3.9.0
 # For EC2 nodes, we want to use /mnt, which should have the local disk. On local
 # VMs, we can just create it if it doesn't exist and use it like we'd use