mirror of https://github.com/apache/kafka.git
MINOR: Clean up system tests based on new defaults (#17113)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
72e16cb9e1
commit
2ff81f087a
|
@ -29,7 +29,6 @@ class KafkaConfig(dict):
|
||||||
config_property.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS: str(10*1024*1024), # 10 MB
|
config_property.METADATA_LOG_BYTES_BETWEEN_SNAPSHOTS: str(10*1024*1024), # 10 MB
|
||||||
config_property.METADATA_LOG_RETENTION_BYTES: str(10*1024*1024), # 10 MB
|
config_property.METADATA_LOG_RETENTION_BYTES: str(10*1024*1024), # 10 MB
|
||||||
config_property.METADATA_LOG_SEGMENT_MS: str(1*60*1000), # one minute
|
config_property.METADATA_LOG_SEGMENT_MS: str(1*60*1000), # one minute
|
||||||
config_property.NEW_GROUP_COORDINATOR_ENABLE: False
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, **kwargs):
|
||||||
|
|
|
@ -279,17 +279,15 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
||||||
self.controller_quorum = None # will define below if necessary
|
self.controller_quorum = None # will define below if necessary
|
||||||
self.isolated_controller_quorum = None # will define below if necessary
|
self.isolated_controller_quorum = None # will define below if necessary
|
||||||
self.configured_for_zk_migration = False
|
self.configured_for_zk_migration = False
|
||||||
|
|
||||||
# Set use_new_coordinator based on context and arguments.
|
# Set use_new_coordinator based on context and arguments.
|
||||||
# The new group coordinator is used by default in kraft mode.
|
# If not specified, the default config is used.
|
||||||
default_use_new_coordinator = self.quorum_info.using_kraft and version == DEV_BRANCH
|
|
||||||
|
|
||||||
if use_new_coordinator is None:
|
if use_new_coordinator is None:
|
||||||
arg_name = 'use_new_coordinator'
|
arg_name = 'use_new_coordinator'
|
||||||
if context.injected_args is not None:
|
if context.injected_args is not None:
|
||||||
use_new_coordinator = context.injected_args.get(arg_name)
|
use_new_coordinator = context.injected_args.get(arg_name)
|
||||||
if use_new_coordinator is None:
|
if use_new_coordinator is None:
|
||||||
use_new_coordinator = context.globals.get(arg_name, default_use_new_coordinator)
|
use_new_coordinator = context.globals.get(arg_name)
|
||||||
|
|
||||||
# Assign the determined value.
|
# Assign the determined value.
|
||||||
self.use_new_coordinator = use_new_coordinator
|
self.use_new_coordinator = use_new_coordinator
|
||||||
|
@ -413,12 +411,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
||||||
self.interbroker_sasl_mechanism = interbroker_sasl_mechanism
|
self.interbroker_sasl_mechanism = interbroker_sasl_mechanism
|
||||||
self._security_config = None
|
self._security_config = None
|
||||||
|
|
||||||
# When the new group coordinator is enabled, the new consumer rebalance
|
|
||||||
# protocol is enabled too.
|
|
||||||
rebalance_protocols = "classic"
|
|
||||||
if self.use_new_coordinator:
|
|
||||||
rebalance_protocols = "classic,consumer"
|
|
||||||
|
|
||||||
for node in self.nodes:
|
for node in self.nodes:
|
||||||
node_quorum_info = quorum.NodeQuorumInfo(self.quorum_info, node)
|
node_quorum_info = quorum.NodeQuorumInfo(self.quorum_info, node)
|
||||||
|
|
||||||
|
@ -431,9 +423,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
||||||
}
|
}
|
||||||
kraft_broker_configs = {
|
kraft_broker_configs = {
|
||||||
config_property.PORT: config_property.FIRST_BROKER_PORT,
|
config_property.PORT: config_property.FIRST_BROKER_PORT,
|
||||||
config_property.NODE_ID: self.idx(node),
|
config_property.NODE_ID: self.idx(node)
|
||||||
config_property.NEW_GROUP_COORDINATOR_ENABLE: use_new_coordinator,
|
|
||||||
config_property.GROUP_COORDINATOR_REBALANCE_PROTOCOLS: rebalance_protocols
|
|
||||||
}
|
}
|
||||||
kraft_broker_plus_zk_configs = kraft_broker_configs.copy()
|
kraft_broker_plus_zk_configs = kraft_broker_configs.copy()
|
||||||
kraft_broker_plus_zk_configs.update(zk_broker_configs)
|
kraft_broker_plus_zk_configs.update(zk_broker_configs)
|
||||||
|
@ -794,9 +784,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
||||||
else:
|
else:
|
||||||
override_configs[config_property.ZOOKEEPER_SSL_CLIENT_ENABLE] = 'false'
|
override_configs[config_property.ZOOKEEPER_SSL_CLIENT_ENABLE] = 'false'
|
||||||
|
|
||||||
if self.use_new_coordinator:
|
if self.use_new_coordinator is not None:
|
||||||
override_configs[config_property.NEW_GROUP_COORDINATOR_ENABLE] = 'true'
|
override_configs[config_property.NEW_GROUP_COORDINATOR_ENABLE] = str(self.use_new_coordinator)
|
||||||
override_configs[config_property.GROUP_COORDINATOR_REBALANCE_PROTOCOLS] = 'classic,consumer'
|
|
||||||
|
|
||||||
for prop in self.server_prop_overrides:
|
for prop in self.server_prop_overrides:
|
||||||
override_configs[prop[0]] = prop[1]
|
override_configs[prop[0]] = prop[1]
|
||||||
|
|
Loading…
Reference in New Issue