KAFKA-16860; [2/2] Introduce group.version feature flag (#16149)

This patch updates the system tests to correctly enable the new consumer protocol/coordinator in the tests requiring them.

I went with the simplest approach for now. Long term, I think that we should refactor the tests to better handle features and non-production features.

I got a successful run of the consumer system tests with this patch combined with https://github.com/apache/kafka/pull/16120: https://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/system-test-kafka-branch-builder--1717155071--dajac--KAFKA-16860-2--29028ae0dd/2024-05-31--001./2024-05-31--001./report.html.

Reviewers: Justine Olshan <jolshan@confluent.io>
This commit is contained in:
David Jacot 2024-05-31 21:49:26 +02:00 committed by GitHub
parent ba61ff0cd9
commit 190dd79457
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 18 additions and 1 deletions

View File

@ -70,6 +70,9 @@ DELEGATION_TOKEN_SECRET_KEY="delegation.token.secret.key"
SASL_ENABLED_MECHANISMS="sasl.enabled.mechanisms"
NEW_GROUP_COORDINATOR_ENABLE="group.coordinator.new.enable"
GROUP_COORDINATOR_REBALANCE_PROTOCOLS="group.coordinator.rebalance.protocols"
UNSTABLE_FEATURE_VERSIONS_ENABLE="unstable.feature.versions.enable"
"""
From KafkaConfig.scala

View File

@ -409,6 +409,12 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.interbroker_sasl_mechanism = interbroker_sasl_mechanism
self._security_config = None
# When the new group coordinator is enabled, the new consumer rebalance
# protocol is enabled too.
rebalance_protocols = "classic"
if self.use_new_coordinator:
rebalance_protocols = "classic,consumer"
for node in self.nodes:
node_quorum_info = quorum.NodeQuorumInfo(self.quorum_info, node)
@ -422,7 +428,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
kraft_broker_configs = {
config_property.PORT: config_property.FIRST_BROKER_PORT,
config_property.NODE_ID: self.idx(node),
config_property.NEW_GROUP_COORDINATOR_ENABLE: use_new_coordinator
config_property.UNSTABLE_FEATURE_VERSIONS_ENABLE: use_new_coordinator,
config_property.NEW_GROUP_COORDINATOR_ENABLE: use_new_coordinator,
config_property.GROUP_COORDINATOR_REBALANCE_PROTOCOLS: rebalance_protocols
}
kraft_broker_plus_zk_configs = kraft_broker_configs.copy()
kraft_broker_plus_zk_configs.update(zk_broker_configs)
@ -781,7 +789,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
override_configs[config_property.ZOOKEEPER_SSL_CLIENT_ENABLE] = 'false'
if self.use_new_coordinator:
override_configs[config_property.UNSTABLE_FEATURE_VERSIONS_ENABLE] = 'true'
override_configs[config_property.NEW_GROUP_COORDINATOR_ENABLE] = 'true'
override_configs[config_property.GROUP_COORDINATOR_REBALANCE_PROTOCOLS] = 'classic,consumer'
for prop in self.server_prop_overrides:
override_configs[prop[0]] = prop[1]
@ -884,6 +894,10 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
# format log directories if necessary
kafka_storage_script = self.path.script("kafka-storage.sh", node)
cmd = "%s format --ignore-formatted --config %s --cluster-id %s" % (kafka_storage_script, KafkaService.CONFIG_FILE, config_property.CLUSTER_ID)
if self.use_new_coordinator:
cmd += " -f group.version=1"
self.logger.info("Running log directory format command...\n%s" % cmd)
node.account.ssh(cmd)