KAFKA-13582: TestVerifiableProducer.test_multiple_kraft_security_protocols fails (#11664)

KRaft brokers always use the first controller listener, so if there is not also a colocated KRaft controller on the node be sure to only publish one controller listener in `controller.listener.names` even when the inter-controller listener name differs.  System tests were failing due to unnecessarily publishing a second entry in `controller.listener.names` for a broker-only config and not also publishing a mapping for it in `listener.security.protocol.map`.  Removing the unnecessary entry in `controller.listener.names` solves the problem.

Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
Ron Dagostino 2022-01-10 14:54:26 -05:00 committed by GitHub
parent aaa546df7a
commit 1785e1223e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 11 additions and 6 deletions

View File

@ -677,7 +677,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
advertised_listeners = []
protocol_map = []
controller_listener_names = self.controller_listener_name_list()
controller_listener_names = self.controller_listener_name_list(node)
for port in self.port_mappings.values():
if port.open:
@ -758,12 +758,17 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
KafkaService.STDOUT_STDERR_CAPTURE)
return cmd
def controller_listener_name_list(self):
def controller_listener_name_list(self, node):
if self.quorum_info.using_zk:
return []
broker_to_controller_listener_name = self.controller_listener_name(self.controller_quorum.controller_security_protocol)
return [broker_to_controller_listener_name] if (self.controller_quorum.intercontroller_security_protocol == self.controller_quorum.controller_security_protocol) \
else [broker_to_controller_listener_name, self.controller_listener_name(self.controller_quorum.intercontroller_security_protocol)]
# Brokers always use the first controller listener, so include a second, inter-controller listener if and only if:
# 1) the node is a controller node
# 2) the inter-controller listener name differs from the broker-to-controller listener name
return [broker_to_controller_listener_name, self.controller_listener_name(self.controller_quorum.intercontroller_security_protocol)] \
if (quorum.NodeQuorumInfo(self.quorum_info, node).has_controller_role and
self.controller_quorum.intercontroller_security_protocol != self.controller_quorum.controller_security_protocol) \
else [broker_to_controller_listener_name]
def start_node(self, node, timeout_sec=60):
if node not in self.nodes_to_start:
@ -772,7 +777,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.node_quorum_info = quorum.NodeQuorumInfo(self.quorum_info, node)
if self.quorum_info.has_controllers:
for controller_listener in self.controller_listener_name_list():
for controller_listener in self.controller_listener_name_list(node):
if self.node_quorum_info.has_controller_role:
self.open_port(controller_listener)
else: # co-located case where node doesn't have a controller
@ -793,7 +798,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
KafkaService.SECURITY_PROTOCOLS.index(security_protocol_to_use))
for node in self.controller_quorum.nodes[:self.controller_quorum.num_nodes_controller_role]])
# define controller.listener.names
self.controller_listener_names = ','.join(self.controller_listener_name_list())
self.controller_listener_names = ','.join(self.controller_listener_name_list(node))
# define sasl.mechanism.controller.protocol to match remote quorum if one exists
if self.remote_controller_quorum:
self.controller_sasl_mechanism = self.remote_controller_quorum.controller_sasl_mechanism