MINOR: Fix kafkatest advertised listeners (#17294)

Followup for #17146

Reviewers: Bill Bejeck <bbejeck@apache.org>
This commit is contained in:
Alyssa Huang 2024-09-30 05:51:49 -07:00 committed by José Armando García Sancio
parent edd77c1e25
commit 5c95a5da31
1 changed files with 15 additions and 10 deletions

View File

@ -280,7 +280,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.controller_quorum = None # will define below if necessary
self.isolated_controller_quorum = None # will define below if necessary
self.configured_for_zk_migration = False
self.dynamicRaftQuorum = False
# Set use_new_coordinator based on context and arguments.
default_use_new_coordinator = False
@ -761,7 +762,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
for port in self.port_mappings.values():
if port.open:
listeners.append(port.listener())
advertised_listeners.append(port.advertised_listener(node))
if (self.dynamicRaftQuorum and quorum.NodeQuorumInfo(self.quorum_info, node).has_controller_role) or \
port.name not in controller_listener_names:
advertised_listeners.append(port.advertised_listener(node))
protocol_map.append(port.listener_security_protocol())
controller_sec_protocol = self.isolated_controller_quorum.controller_security_protocol if self.isolated_controller_quorum \
else self.controller_security_protocol if self.quorum_info.has_brokers_and_controllers and not quorum.NodeQuorumInfo(self.quorum_info, node).has_controller_role \
@ -881,16 +884,18 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
# define controller.quorum.bootstrap.servers or controller.quorum.voters text
security_protocol_to_use = self.controller_quorum.controller_security_protocol
first_node_id = 1 if self.quorum_info.has_brokers_and_controllers else config_property.FIRST_CONTROLLER_ID
controller_quorum_bootstrap_servers = ','.join(["{}:{}".format(node.account.hostname,
config_property.FIRST_CONTROLLER_PORT +
KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use))
for node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]])
if self.dynamicRaftQuorum:
self.controller_quorum_bootstrap_servers = controller_quorum_bootstrap_servers
self.controller_quorum_bootstrap_servers = ','.join(["{}:{}".format(node.account.hostname,
config_property.FIRST_CONTROLLER_PORT +
KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use))
for node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]])
else:
self.controller_quorum_voters = ','.join(["%s@%s" % (self.controller_quorum.idx(node) + first_node_id - 1,
bootstrap_server)
for bootstrap_server in controller_quorum_bootstrap_servers.split(',')])
self.controller_quorum_voters = ','.join(["{}@{}:{}".format(self.controller_quorum.idx(node) +
first_node_id - 1,
node.account.hostname,
config_property.FIRST_CONTROLLER_PORT +
KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use))
for node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]])
# define controller.listener.names
self.controller_listener_names = ','.join(self.controller_listener_name_list(node))
# define sasl.mechanism.controller.protocol to match the isolated quorum if one exists