KAFKA-18346 Fix e2e TestKRaftUpgrade for v3.3.2 (#18386)

Due to an issue with handling multiple log folders in Kafka 3.3.2 (see https://github.com/apache/kafka/pull/13130), which causes InconsistentBrokerMetadataException on broker startup, this end-to-end test must use a single folder for upgrade/downgrade scenarios involving 3.3.2.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai <chia7712@gmail.com>
Author: TaiJuWu
Date: 2025-01-15 20:37:55 +08:00
Parent: 66b1f00c0e
Commit: ceee1a732e
2 changed files with 14 additions and 4 deletions
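
Concretely, when the starting version is 3.3 the test pins both the data log dirs and the metadata log dir to a single folder via server_prop_overrides. A minimal sketch of that override list, lifted from the test diff below (the mount path and the config_property names are the ones used there):

# Sketch: force a single folder for 3.3.x by pointing log.dirs at one path and
# blanking metadata.log.dir so it is filtered out of the rendered properties file.
from kafkatest.services.kafka import config_property

single_folder_overrides = [
    [config_property.LOG_DIRS, "/mnt/kafka/kafka-metadata-logs"],
    [config_property.METADATA_LOG_DIR, ""],  # empty value -> dropped by KafkaService
]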


@@ -255,7 +255,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         :param jmx_attributes:
         :param int zk_connect_timeout:
         :param int zk_session_timeout:
-        :param list[list] server_prop_overrides: overrides for kafka.properties file
+        :param list[list] server_prop_overrides: overrides for kafka.properties file, if the second value is None or "", it will be filtered
             e.g: [["config1", "true"], ["config2", "1000"]]
         :param str zk_chroot:
         :param bool zk_client_secure: connect to Zookeeper over secure client port (TLS) when True
@@ -781,7 +781,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         #update template configs with test override configs
         configs.update(override_configs)
-        prop_file = self.render_configs(configs)
+        filtered_configs = {k: v for k, v in configs.items() if v not in [None, ""]}
+        prop_file = self.render_configs(filtered_configs)
         return prop_file

     def render_configs(self, configs):
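
As a standalone sketch of the filtering added above (assuming configs is a plain dict of property names to values), entries whose value is None or "" are dropped before the properties file is rendered:

# Hypothetical input dict; the keys are only for illustration.
configs = {
    "log.dirs": "/mnt/kafka/kafka-metadata-logs",
    "metadata.log.dir": "",   # blanked by the test override
    "some.other.prop": None,
}
filtered_configs = {k: v for k, v in configs.items() if v not in [None, ""]}
# filtered_configs == {"log.dirs": "/mnt/kafka/kafka-metadata-logs"}

With metadata.log.dir filtered out of the rendered file, the broker falls back to keeping the metadata log under log.dirs, which is what gives the 3.3.2 nodes a single folder.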


@@ -17,7 +17,7 @@ from ducktape.mark import parametrize, matrix
 from ducktape.mark.resource import cluster
 from ducktape.utils.util import wait_until
 from kafkatest.services.console_consumer import ConsoleConsumer
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import config_property, KafkaService
 from kafkatest.services.kafka.quorum import isolated_kraft, combined_kraft
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
@@ -138,6 +138,14 @@ class TestKRaftUpgrade(ProduceConsumeValidateTest):
         - Perform rolling downgrade.
         - Finally, validate that every message acked by the producer was consumed by the consumer.
         """
+        # Due to a compatibility issue with version 3.3, we need to use a single folder. Using multiple
+        # folders will cause the broker to throw InconsistentBrokerMetadataException during startup.
+        # See https://github.com/apache/kafka/pull/13130
+        server_prop_overrides = None
+        if starting_kafka_version == str(LATEST_3_3):
+            server_prop_overrides = [[config_property.LOG_DIRS, "/mnt/kafka/kafka-metadata-logs"], [config_property.METADATA_LOG_DIR, ""]]
         fromKafkaVersion = KafkaVersion(starting_kafka_version)
         self.kafka = KafkaService(self.test_context,
                                   num_nodes=3,
@@ -145,7 +153,8 @@ class TestKRaftUpgrade(ProduceConsumeValidateTest):
                                   version=fromKafkaVersion,
                                   topics={self.topic: {"partitions": self.partitions,
                                                        "replication-factor": self.replication_factor,
-                                                       'configs': {"min.insync.replicas": 2}}})
+                                                       'configs': {"min.insync.replicas": 2}}},
+                                  server_prop_overrides=server_prop_overrides)
         self.kafka.start()
         self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka,
                                            self.topic, throughput=self.producer_throughput,