mirror of https://github.com/apache/kafka.git
KAFKA-16275: Update kraft_upgrade_test.py to support KIP-848’s group protocol config (#15626)
Added a new optional group_protocol parameter to the test methods, then passed that down to the methods involved.
Unfortunately, because the new consumer can only be used with the new coordinator, this required a new @matrix block instead of adding the group_protocol=["classic", "consumer"] to the existing blocks 😢
Reviewers: Walker Carlson <wcarlson@apache.org>
This commit is contained in:
parent
21479a31bd
commit
e95e91a062
|
@ -17,12 +17,12 @@ from ducktape.mark import parametrize, matrix
|
||||||
from ducktape.mark.resource import cluster
|
from ducktape.mark.resource import cluster
|
||||||
from ducktape.utils.util import wait_until
|
from ducktape.utils.util import wait_until
|
||||||
from kafkatest.services.console_consumer import ConsoleConsumer
|
from kafkatest.services.console_consumer import ConsoleConsumer
|
||||||
from kafkatest.services.kafka import KafkaService
|
from kafkatest.services.kafka import KafkaService, consumer_group
|
||||||
from kafkatest.services.kafka.quorum import isolated_kraft, combined_kraft
|
from kafkatest.services.kafka.quorum import isolated_kraft, combined_kraft
|
||||||
from kafkatest.services.verifiable_producer import VerifiableProducer
|
from kafkatest.services.verifiable_producer import VerifiableProducer
|
||||||
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
|
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
|
||||||
from kafkatest.utils import is_int
|
from kafkatest.utils import is_int
|
||||||
from kafkatest.version import LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, \
|
from kafkatest.version import LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_7, \
|
||||||
DEV_BRANCH, KafkaVersion, LATEST_STABLE_METADATA_VERSION
|
DEV_BRANCH, KafkaVersion, LATEST_STABLE_METADATA_VERSION
|
||||||
|
|
||||||
#
|
#
|
||||||
|
@ -74,7 +74,7 @@ class TestKRaftUpgrade(ProduceConsumeValidateTest):
|
||||||
self.logger.info("Changing metadata.version to %s" % LATEST_STABLE_METADATA_VERSION)
|
self.logger.info("Changing metadata.version to %s" % LATEST_STABLE_METADATA_VERSION)
|
||||||
self.kafka.upgrade_metadata_version(LATEST_STABLE_METADATA_VERSION)
|
self.kafka.upgrade_metadata_version(LATEST_STABLE_METADATA_VERSION)
|
||||||
|
|
||||||
def run_upgrade(self, from_kafka_version):
|
def run_upgrade(self, from_kafka_version, group_protocol):
|
||||||
"""Test upgrade of Kafka broker cluster from various versions to the current version
|
"""Test upgrade of Kafka broker cluster from various versions to the current version
|
||||||
|
|
||||||
from_kafka_version is a Kafka version to upgrade from.
|
from_kafka_version is a Kafka version to upgrade from.
|
||||||
|
@ -101,7 +101,8 @@ class TestKRaftUpgrade(ProduceConsumeValidateTest):
|
||||||
version=KafkaVersion(from_kafka_version))
|
version=KafkaVersion(from_kafka_version))
|
||||||
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
|
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka,
|
||||||
self.topic, new_consumer=True, consumer_timeout_ms=30000,
|
self.topic, new_consumer=True, consumer_timeout_ms=30000,
|
||||||
message_validator=is_int, version=KafkaVersion(from_kafka_version))
|
message_validator=is_int, version=KafkaVersion(from_kafka_version),
|
||||||
|
consumer_properties=consumer_group.maybe_set_group_protocol(group_protocol))
|
||||||
self.run_produce_consume_validate(core_test_action=lambda: self.perform_version_change(from_kafka_version))
|
self.run_produce_consume_validate(core_test_action=lambda: self.perform_version_change(from_kafka_version))
|
||||||
cluster_id = self.kafka.cluster_id()
|
cluster_id = self.kafka.cluster_id()
|
||||||
assert cluster_id is not None
|
assert cluster_id is not None
|
||||||
|
@ -112,13 +113,21 @@ class TestKRaftUpgrade(ProduceConsumeValidateTest):
|
||||||
@matrix(from_kafka_version=[str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(DEV_BRANCH)],
|
@matrix(from_kafka_version=[str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(DEV_BRANCH)],
|
||||||
use_new_coordinator=[True, False],
|
use_new_coordinator=[True, False],
|
||||||
metadata_quorum=[combined_kraft])
|
metadata_quorum=[combined_kraft])
|
||||||
def test_combined_mode_upgrade(self, from_kafka_version, metadata_quorum, use_new_coordinator=False):
|
@matrix(from_kafka_version=[str(LATEST_3_7), str(DEV_BRANCH)],
|
||||||
self.run_upgrade(from_kafka_version)
|
use_new_coordinator=[True],
|
||||||
|
metadata_quorum=[combined_kraft],
|
||||||
|
group_protocol=consumer_group.all_group_protocols)
|
||||||
|
def test_combined_mode_upgrade(self, from_kafka_version, metadata_quorum, use_new_coordinator=False, group_protocol=None):
|
||||||
|
self.run_upgrade(from_kafka_version, group_protocol)
|
||||||
|
|
||||||
@cluster(num_nodes=8)
|
@cluster(num_nodes=8)
|
||||||
@matrix(from_kafka_version=[str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(DEV_BRANCH)],
|
@matrix(from_kafka_version=[str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(DEV_BRANCH)],
|
||||||
use_new_coordinator=[True, False],
|
use_new_coordinator=[True, False],
|
||||||
metadata_quorum=[isolated_kraft])
|
metadata_quorum=[isolated_kraft])
|
||||||
def test_isolated_mode_upgrade(self, from_kafka_version, metadata_quorum, use_new_coordinator=False):
|
@matrix(from_kafka_version=[str(LATEST_3_7), str(DEV_BRANCH)],
|
||||||
self.run_upgrade(from_kafka_version)
|
use_new_coordinator=[True],
|
||||||
|
metadata_quorum=[isolated_kraft],
|
||||||
|
group_protocol=consumer_group.all_group_protocols)
|
||||||
|
def test_isolated_mode_upgrade(self, from_kafka_version, metadata_quorum, use_new_coordinator=False, group_protocol=None):
|
||||||
|
self.run_upgrade(from_kafka_version, group_protocol)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue