KAFKA-16275: Update kraft_upgrade_test.py to support KIP-848’s group protocol config (#15626)

Added a new optional group_protocol parameter to the test methods, then passed that down to the methods involved.

Unfortunately, because the new consumer can only be used with the new coordinator, this required a new @matrix block instead of adding the group_protocol=["classic", "consumer"] to the existing blocks 😢

Reviewers: Walker Carlson <wcarlson@apache.org>
This commit is contained in:
Kirk True 2024-04-03 10:12:51 -07:00 committed by GitHub
parent 21479a31bd
commit e95e91a062
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 19 additions and 10 deletions

View File

@ -17,12 +17,12 @@ from ducktape.mark import parametrize, matrix
from ducktape.mark.resource import cluster from ducktape.mark.resource import cluster
from ducktape.utils.util import wait_until from ducktape.utils.util import wait_until
from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.kafka import KafkaService from kafkatest.services.kafka import KafkaService, consumer_group
from kafkatest.services.kafka.quorum import isolated_kraft, combined_kraft from kafkatest.services.kafka.quorum import isolated_kraft, combined_kraft
from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int from kafkatest.utils import is_int
from kafkatest.version import LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, \ from kafkatest.version import LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_7, \
DEV_BRANCH, KafkaVersion, LATEST_STABLE_METADATA_VERSION DEV_BRANCH, KafkaVersion, LATEST_STABLE_METADATA_VERSION
# #
@ -74,7 +74,7 @@ class TestKRaftUpgrade(ProduceConsumeValidateTest):
self.logger.info("Changing metadata.version to %s" % LATEST_STABLE_METADATA_VERSION) self.logger.info("Changing metadata.version to %s" % LATEST_STABLE_METADATA_VERSION)
self.kafka.upgrade_metadata_version(LATEST_STABLE_METADATA_VERSION) self.kafka.upgrade_metadata_version(LATEST_STABLE_METADATA_VERSION)
def run_upgrade(self, from_kafka_version): def run_upgrade(self, from_kafka_version, group_protocol):
"""Test upgrade of Kafka broker cluster from various versions to the current version """Test upgrade of Kafka broker cluster from various versions to the current version
from_kafka_version is a Kafka version to upgrade from. from_kafka_version is a Kafka version to upgrade from.
@ -101,7 +101,8 @@ class TestKRaftUpgrade(ProduceConsumeValidateTest):
version=KafkaVersion(from_kafka_version)) version=KafkaVersion(from_kafka_version))
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
self.topic, new_consumer=True, consumer_timeout_ms=30000, self.topic, new_consumer=True, consumer_timeout_ms=30000,
message_validator=is_int, version=KafkaVersion(from_kafka_version)) message_validator=is_int, version=KafkaVersion(from_kafka_version),
consumer_properties=consumer_group.maybe_set_group_protocol(group_protocol))
self.run_produce_consume_validate(core_test_action=lambda: self.perform_version_change(from_kafka_version)) self.run_produce_consume_validate(core_test_action=lambda: self.perform_version_change(from_kafka_version))
cluster_id = self.kafka.cluster_id() cluster_id = self.kafka.cluster_id()
assert cluster_id is not None assert cluster_id is not None
@ -112,13 +113,21 @@ class TestKRaftUpgrade(ProduceConsumeValidateTest):
@matrix(from_kafka_version=[str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(DEV_BRANCH)], @matrix(from_kafka_version=[str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(DEV_BRANCH)],
use_new_coordinator=[True, False], use_new_coordinator=[True, False],
metadata_quorum=[combined_kraft]) metadata_quorum=[combined_kraft])
def test_combined_mode_upgrade(self, from_kafka_version, metadata_quorum, use_new_coordinator=False): @matrix(from_kafka_version=[str(LATEST_3_7), str(DEV_BRANCH)],
self.run_upgrade(from_kafka_version) use_new_coordinator=[True],
metadata_quorum=[combined_kraft],
group_protocol=consumer_group.all_group_protocols)
def test_combined_mode_upgrade(self, from_kafka_version, metadata_quorum, use_new_coordinator=False, group_protocol=None):
self.run_upgrade(from_kafka_version, group_protocol)
@cluster(num_nodes=8) @cluster(num_nodes=8)
@matrix(from_kafka_version=[str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(DEV_BRANCH)], @matrix(from_kafka_version=[str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(DEV_BRANCH)],
use_new_coordinator=[True, False], use_new_coordinator=[True, False],
metadata_quorum=[isolated_kraft]) metadata_quorum=[isolated_kraft])
def test_isolated_mode_upgrade(self, from_kafka_version, metadata_quorum, use_new_coordinator=False): @matrix(from_kafka_version=[str(LATEST_3_7), str(DEV_BRANCH)],
self.run_upgrade(from_kafka_version) use_new_coordinator=[True],
metadata_quorum=[isolated_kraft],
group_protocol=consumer_group.all_group_protocols)
def test_isolated_mode_upgrade(self, from_kafka_version, metadata_quorum, use_new_coordinator=False, group_protocol=None):
self.run_upgrade(from_kafka_version, group_protocol)