mirror of https://github.com/apache/kafka.git
KAFKA-10131: Remove use_zk_connection flag from ducktape (#9274)
Reviewers: Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
parent
ee68b999c4
commit
ebd64b5d55
|
@ -254,7 +254,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
|||
def alive(self, node):
|
||||
return len(self.pids(node)) > 0
|
||||
|
||||
def start(self, add_principals="", use_zk_to_create_topic=True):
|
||||
def start(self, add_principals=""):
|
||||
if self.zk_client_secure and not self.zk.zk_client_secure_port:
|
||||
raise Exception("Unable to start Kafka: TLS to Zookeeper requested but Zookeeper secure port not enabled")
|
||||
self.open_port(self.security_protocol)
|
||||
|
@ -281,7 +281,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
|||
topic_cfg = {}
|
||||
|
||||
topic_cfg["topic"] = topic
|
||||
self.create_topic(topic_cfg, use_zk_to_create_topic=use_zk_to_create_topic)
|
||||
self.create_topic(topic_cfg)
|
||||
|
||||
def _ensure_zk_chroot(self):
|
||||
self.logger.info("Ensuring zk_chroot %s exists", self.zk_chroot)
|
||||
|
@ -445,25 +445,61 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
|||
clean_shutdown=False, allow_fail=True)
|
||||
node.account.ssh("sudo rm -rf -- %s" % KafkaService.PERSISTENT_ROOT, allow_fail=False)
|
||||
|
||||
def _kafka_topics_cmd(self, node, use_zk_connection=True):
|
||||
def _kafka_topics_cmd(self, node, force_use_zk_connection):
|
||||
"""
|
||||
Returns kafka-topics.sh command path with jaas configuration and krb5 environment variable
|
||||
set. If Admin client is not going to be used, don't set the environment variable.
|
||||
"""
|
||||
kafka_topic_script = self.path.script("kafka-topics.sh", node)
|
||||
skip_security_settings = use_zk_connection or not node.version.topic_command_supports_bootstrap_server()
|
||||
skip_security_settings = force_use_zk_connection or not self.all_nodes_topic_command_supports_bootstrap_server()
|
||||
return kafka_topic_script if skip_security_settings else \
|
||||
"KAFKA_OPTS='-D%s -D%s' %s" % (KafkaService.JAAS_CONF_PROPERTY, KafkaService.KRB5_CONF, kafka_topic_script)
|
||||
|
||||
def _kafka_topics_cmd_config(self, node, use_zk_connection=True):
|
||||
def _kafka_topics_cmd_config(self, node, force_use_zk_connection):
|
||||
"""
|
||||
Return --command-config parameter to the kafka-topics.sh command. The config parameter specifies
|
||||
the security settings that AdminClient uses to connect to a secure kafka server.
|
||||
"""
|
||||
skip_command_config = use_zk_connection or not node.version.topic_command_supports_bootstrap_server()
|
||||
skip_command_config = force_use_zk_connection or not self.all_nodes_topic_command_supports_bootstrap_server()
|
||||
return "" if skip_command_config else " --command-config <(echo '%s')" % (self.security_config.client_config())
|
||||
|
||||
def create_topic(self, topic_cfg, node=None, use_zk_to_create_topic=True):
|
||||
def all_nodes_topic_command_supports_bootstrap_server(self):
|
||||
for node in self.nodes:
|
||||
if not node.version.topic_command_supports_bootstrap_server():
|
||||
return False
|
||||
return True
|
||||
|
||||
def all_nodes_topic_command_supports_if_not_exists_with_bootstrap_server(self):
|
||||
for node in self.nodes:
|
||||
if not node.version.topic_command_supports_if_not_exists_with_bootstrap_server():
|
||||
return False
|
||||
return True
|
||||
|
||||
def all_nodes_configs_command_uses_bootstrap_server(self):
|
||||
for node in self.nodes:
|
||||
if not node.version.kafka_configs_command_uses_bootstrap_server():
|
||||
return False
|
||||
return True
|
||||
|
||||
def all_nodes_configs_command_uses_bootstrap_server_scram(self):
|
||||
for node in self.nodes:
|
||||
if not node.version.kafka_configs_command_uses_bootstrap_server_scram():
|
||||
return False
|
||||
return True
|
||||
|
||||
def all_nodes_acl_command_supports_bootstrap_server(self):
|
||||
for node in self.nodes:
|
||||
if not node.version.acl_command_supports_bootstrap_server():
|
||||
return False
|
||||
return True
|
||||
|
||||
def all_nodes_reassign_partitions_command_supports_bootstrap_server(self):
|
||||
for node in self.nodes:
|
||||
if not node.version.reassign_partitions_command_supports_bootstrap_server():
|
||||
return False
|
||||
return True
|
||||
|
||||
def create_topic(self, topic_cfg, node=None):
|
||||
"""Run the admin tool create topic command.
|
||||
Specifying node is optional, and may be done if for different kafka nodes have different versions,
|
||||
and we care where command gets run.
|
||||
|
@ -475,12 +511,13 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
|||
self.logger.info("Creating topic %s with settings %s",
|
||||
topic_cfg["topic"], topic_cfg)
|
||||
|
||||
use_zk_connection = topic_cfg.get('if-not-exists', False) or use_zk_to_create_topic
|
||||
force_use_zk_connection = not self.all_nodes_topic_command_supports_bootstrap_server() or\
|
||||
(topic_cfg.get('if-not-exists', False) and not self.all_nodes_topic_command_supports_if_not_exists_with_bootstrap_server())
|
||||
|
||||
cmd = fix_opts_for_new_jvm(node)
|
||||
cmd += "%(kafka_topics_cmd)s %(connection_string)s --create --topic %(topic)s " % {
|
||||
'kafka_topics_cmd': self._kafka_topics_cmd(node, use_zk_connection),
|
||||
'connection_string': self._topic_command_connect_setting(node, use_zk_connection),
|
||||
'kafka_topics_cmd': self._kafka_topics_cmd(node, force_use_zk_connection),
|
||||
'connection_string': self._topic_command_connect_setting(node, force_use_zk_connection),
|
||||
'topic': topic_cfg.get("topic"),
|
||||
}
|
||||
if 'replica-assignment' in topic_cfg:
|
||||
|
@ -500,12 +537,12 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
|||
for config_name, config_value in topic_cfg["configs"].items():
|
||||
cmd += " --config %s=%s" % (config_name, str(config_value))
|
||||
|
||||
cmd += self._kafka_topics_cmd_config(node, use_zk_connection)
|
||||
cmd += self._kafka_topics_cmd_config(node, force_use_zk_connection)
|
||||
|
||||
self.logger.info("Running topic creation command...\n%s" % cmd)
|
||||
node.account.ssh(cmd)
|
||||
|
||||
def delete_topic(self, topic, node=None, use_zk_to_delete_topic=False):
|
||||
def delete_topic(self, topic, node=None):
|
||||
"""
|
||||
Delete a topic with the topics command
|
||||
:param topic:
|
||||
|
@ -516,22 +553,27 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
|||
node = self.nodes[0]
|
||||
self.logger.info("Deleting topic %s" % topic)
|
||||
|
||||
force_use_zk_connection = not self.all_nodes_topic_command_supports_bootstrap_server()
|
||||
|
||||
cmd = fix_opts_for_new_jvm(node)
|
||||
cmd += "%s %s --topic %s --delete %s" % \
|
||||
(self._kafka_topics_cmd(node=node, use_zk_connection=use_zk_to_delete_topic),
|
||||
self._topic_command_connect_setting(node=node, use_zk_connection=use_zk_to_delete_topic),
|
||||
topic, self._kafka_topics_cmd_config(node=node, use_zk_connection=use_zk_to_delete_topic))
|
||||
(self._kafka_topics_cmd(node, force_use_zk_connection),
|
||||
self._topic_command_connect_setting(node, force_use_zk_connection),
|
||||
topic, self._kafka_topics_cmd_config(node, force_use_zk_connection))
|
||||
self.logger.info("Running topic delete command...\n%s" % cmd)
|
||||
node.account.ssh(cmd)
|
||||
|
||||
def describe_topic(self, topic, node=None, use_zk_to_describe_topic=True):
|
||||
def describe_topic(self, topic, node=None):
|
||||
if node is None:
|
||||
node = self.nodes[0]
|
||||
|
||||
force_use_zk_connection = not self.all_nodes_topic_command_supports_bootstrap_server()
|
||||
|
||||
cmd = fix_opts_for_new_jvm(node)
|
||||
cmd += "%s %s --topic %s --describe %s" % \
|
||||
(self._kafka_topics_cmd(node=node, use_zk_connection=use_zk_to_describe_topic),
|
||||
self._topic_command_connect_setting(node=node, use_zk_connection=use_zk_to_describe_topic),
|
||||
topic, self._kafka_topics_cmd_config(node=node, use_zk_connection=use_zk_to_describe_topic))
|
||||
(self._kafka_topics_cmd(node, force_use_zk_connection),
|
||||
self._topic_command_connect_setting(node, force_use_zk_connection),
|
||||
topic, self._kafka_topics_cmd_config(node, force_use_zk_connection))
|
||||
|
||||
self.logger.info("Running topic describe command...\n%s" % cmd)
|
||||
output = ""
|
||||
|
@ -539,14 +581,16 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
|||
output += line
|
||||
return output
|
||||
|
||||
def list_topics(self, node=None, use_zk_to_list_topic=True):
|
||||
def list_topics(self, node=None):
|
||||
if node is None:
|
||||
node = self.nodes[0]
|
||||
|
||||
force_use_zk_connection = not self.all_nodes_topic_command_supports_bootstrap_server()
|
||||
|
||||
cmd = fix_opts_for_new_jvm(node)
|
||||
cmd += "%s %s --list %s" % (self._kafka_topics_cmd(node, use_zk_to_list_topic),
|
||||
self._topic_command_connect_setting(node, use_zk_to_list_topic),
|
||||
self._kafka_topics_cmd_config(node, use_zk_to_list_topic))
|
||||
cmd += "%s %s --list %s" % (self._kafka_topics_cmd(node, force_use_zk_connection),
|
||||
self._topic_command_connect_setting(node, force_use_zk_connection),
|
||||
self._kafka_topics_cmd_config(node, force_use_zk_connection))
|
||||
for line in node.account.ssh_capture(cmd):
|
||||
if not line.startswith("SLF4J"):
|
||||
yield line.rstrip()
|
||||
|
@ -578,7 +622,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
|||
|
||||
def _connect_setting_kafka_configs(self, node):
|
||||
# Use this for everything related to kafka-configs except User SCRAM Credentials
|
||||
if node.version.kafka_configs_command_uses_bootstrap_server():
|
||||
if self.all_nodes_configs_command_uses_bootstrap_server():
|
||||
return "--bootstrap-server %s --command-config <(echo '%s')" % (self.bootstrap_servers(self.security_protocol),
|
||||
self.security_config.client_config())
|
||||
else:
|
||||
|
@ -586,13 +630,32 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
|||
|
||||
def _connect_setting_kafka_configs_scram(self, node):
|
||||
# Use this for kafka-configs when operating on User SCRAM Credentials
|
||||
if node.version.kafka_configs_command_uses_bootstrap_server_scram():
|
||||
if self.all_nodes_configs_command_uses_bootstrap_server_scram():
|
||||
return "--bootstrap-server %s --command-config <(echo '%s')" %\
|
||||
(self.bootstrap_servers(self.security_protocol),
|
||||
self.security_config.client_config(use_inter_broker_mechanism_for_client = True))
|
||||
else:
|
||||
return "--zookeeper %s %s" % (self.zk_connect_setting(), self.zk.zkTlsConfigFileOption())
|
||||
|
||||
def kafka_acls_cmd(self, node, force_use_zk_connection):
|
||||
"""
|
||||
Returns kafka-acls.sh command path with jaas configuration and krb5 environment variable
|
||||
set. If Admin client is not going to be used, don't set the environment variable.
|
||||
"""
|
||||
kafka_acls_script = self.path.script("kafka-acls.sh", node)
|
||||
skip_security_settings = force_use_zk_connection or not self.all_nodes_acl_command_supports_bootstrap_server()
|
||||
return kafka_acls_script if skip_security_settings else \
|
||||
"KAFKA_OPTS='-D%s -D%s' %s" % (KafkaService.JAAS_CONF_PROPERTY, KafkaService.KRB5_CONF, kafka_acls_script)
|
||||
|
||||
def run_cli_tool(self, node, cmd):
|
||||
output = ""
|
||||
self.logger.debug(cmd)
|
||||
for line in node.account.ssh_capture(cmd):
|
||||
if not line.startswith("SLF4J"):
|
||||
output += line
|
||||
self.logger.debug(output)
|
||||
return output
|
||||
|
||||
def parse_describe_topic(self, topic_description):
|
||||
"""Parse output of kafka-topics.sh --describe (or describe_topic() method above), which is a string of form
|
||||
PartitionCount:2\tReplicationFactor:2\tConfigs:
|
||||
|
@ -624,7 +687,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
|||
|
||||
|
||||
def _connect_setting_reassign_partitions(self, node):
|
||||
if node.version.reassign_partitions_command_supports_bootstrap_server():
|
||||
if self.all_nodes_reassign_partitions_command_supports_bootstrap_server():
|
||||
return "--bootstrap-server %s " % self.bootstrap_servers(self.security_protocol)
|
||||
else:
|
||||
return "--zookeeper %s " % self.zk_connect_setting()
|
||||
|
@ -748,37 +811,68 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
|||
self.stop_node(node, clean_shutdown, timeout_sec)
|
||||
self.start_node(node, timeout_sec)
|
||||
|
||||
def _describe_topic_line_for_partition(self, partition, describe_topic_output):
|
||||
# Lines look like this: Topic: test_topic Partition: 0 Leader: 3 Replicas: 3,2 Isr: 3,2
|
||||
grep_for = "Partition: %i\t" % (partition) # be sure to include trailing tab, otherwise 1 might match 10 (for example)
|
||||
found_lines = [line for line in describe_topic_output.splitlines() if grep_for in line]
|
||||
return None if not found_lines else found_lines[0]
|
||||
|
||||
def isr_idx_list(self, topic, partition=0):
|
||||
""" Get in-sync replica list the given topic and partition.
|
||||
"""
|
||||
self.logger.debug("Querying zookeeper to find in-sync replicas for topic %s and partition %d" % (topic, partition))
|
||||
zk_path = "/brokers/topics/%s/partitions/%d/state" % (topic, partition)
|
||||
partition_state = self.zk.query(zk_path, chroot=self.zk_chroot)
|
||||
node = self.nodes[0]
|
||||
if not self.all_nodes_topic_command_supports_bootstrap_server():
|
||||
self.logger.debug("Querying zookeeper to find in-sync replicas for topic %s and partition %d" % (topic, partition))
|
||||
zk_path = "/brokers/topics/%s/partitions/%d/state" % (topic, partition)
|
||||
partition_state = self.zk.query(zk_path, chroot=self.zk_chroot)
|
||||
|
||||
if partition_state is None:
|
||||
raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition))
|
||||
if partition_state is None:
|
||||
raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition))
|
||||
|
||||
partition_state = json.loads(partition_state)
|
||||
self.logger.info(partition_state)
|
||||
partition_state = json.loads(partition_state)
|
||||
self.logger.info(partition_state)
|
||||
|
||||
isr_idx_list = partition_state["isr"]
|
||||
else:
|
||||
self.logger.debug("Querying Kafka Admin API to find in-sync replicas for topic %s and partition %d" % (topic, partition))
|
||||
describe_output = self.describe_topic(topic, node)
|
||||
self.logger.debug(describe_output)
|
||||
requested_partition_line = self._describe_topic_line_for_partition(partition, describe_output)
|
||||
# e.g. Topic: test_topic Partition: 0 Leader: 3 Replicas: 3,2 Isr: 3,2
|
||||
if not requested_partition_line:
|
||||
raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition))
|
||||
isr_csv = requested_partition_line.split()[9] # 10th column from above
|
||||
isr_idx_list = [int(i) for i in isr_csv.split(",")]
|
||||
|
||||
isr_idx_list = partition_state["isr"]
|
||||
self.logger.info("Isr for topic %s and partition %d is now: %s" % (topic, partition, isr_idx_list))
|
||||
return isr_idx_list
|
||||
|
||||
def replicas(self, topic, partition=0):
|
||||
""" Get the assigned replicas for the given topic and partition.
|
||||
"""
|
||||
self.logger.debug("Querying zookeeper to find assigned replicas for topic %s and partition %d" % (topic, partition))
|
||||
zk_path = "/brokers/topics/%s" % (topic)
|
||||
assignment = self.zk.query(zk_path, chroot=self.zk_chroot)
|
||||
node = self.nodes[0]
|
||||
if not self.all_nodes_topic_command_supports_bootstrap_server():
|
||||
self.logger.debug("Querying zookeeper to find assigned replicas for topic %s and partition %d" % (topic, partition))
|
||||
zk_path = "/brokers/topics/%s" % (topic)
|
||||
assignment = self.zk.query(zk_path, chroot=self.zk_chroot)
|
||||
|
||||
if assignment is None:
|
||||
raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition))
|
||||
if assignment is None:
|
||||
raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition))
|
||||
|
||||
assignment = json.loads(assignment)
|
||||
self.logger.info(assignment)
|
||||
assignment = json.loads(assignment)
|
||||
self.logger.info(assignment)
|
||||
|
||||
replicas = assignment["partitions"][str(partition)]
|
||||
replicas = assignment["partitions"][str(partition)]
|
||||
else:
|
||||
self.logger.debug("Querying Kafka Admin API to find replicas for topic %s and partition %d" % (topic, partition))
|
||||
describe_output = self.describe_topic(topic, node)
|
||||
self.logger.debug(describe_output)
|
||||
requested_partition_line = self._describe_topic_line_for_partition(partition, describe_output)
|
||||
# e.g. Topic: test_topic Partition: 0 Leader: 3 Replicas: 3,2 Isr: 3,2
|
||||
if not requested_partition_line:
|
||||
raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition))
|
||||
isr_csv = requested_partition_line.split()[7] # 8th column from above
|
||||
replicas = [int(i) for i in isr_csv.split(",")]
|
||||
|
||||
self.logger.info("Assigned replicas for topic %s and partition %d is now: %s" % (topic, partition, replicas))
|
||||
return [self.get_node(replica) for replica in replicas]
|
||||
|
@ -786,17 +880,29 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
|||
def leader(self, topic, partition=0):
|
||||
""" Get the leader replica for the given topic and partition.
|
||||
"""
|
||||
self.logger.debug("Querying zookeeper to find leader replica for topic %s and partition %d" % (topic, partition))
|
||||
zk_path = "/brokers/topics/%s/partitions/%d/state" % (topic, partition)
|
||||
partition_state = self.zk.query(zk_path, chroot=self.zk_chroot)
|
||||
node = self.nodes[0]
|
||||
if not self.all_nodes_topic_command_supports_bootstrap_server():
|
||||
self.logger.debug("Querying zookeeper to find leader replica for topic %s and partition %d" % (topic, partition))
|
||||
zk_path = "/brokers/topics/%s/partitions/%d/state" % (topic, partition)
|
||||
partition_state = self.zk.query(zk_path, chroot=self.zk_chroot)
|
||||
|
||||
if partition_state is None:
|
||||
raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition))
|
||||
if partition_state is None:
|
||||
raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition))
|
||||
|
||||
partition_state = json.loads(partition_state)
|
||||
self.logger.info(partition_state)
|
||||
partition_state = json.loads(partition_state)
|
||||
self.logger.info(partition_state)
|
||||
|
||||
leader_idx = int(partition_state["leader"])
|
||||
else:
|
||||
self.logger.debug("Querying Kafka Admin API to find leader for topic %s and partition %d" % (topic, partition))
|
||||
describe_output = self.describe_topic(topic, node)
|
||||
self.logger.debug(describe_output)
|
||||
requested_partition_line = self._describe_topic_line_for_partition(partition, describe_output)
|
||||
# e.g. Topic: test_topic Partition: 0 Leader: 3 Replicas: 3,2 Isr: 3,2
|
||||
if not requested_partition_line:
|
||||
raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition))
|
||||
leader_idx = int(requested_partition_line.split()[5]) # 6th column from above
|
||||
|
||||
leader_idx = int(partition_state["leader"])
|
||||
self.logger.info("Leader for topic %s and partition %d is now: %d" % (topic, partition, leader_idx))
|
||||
return self.get_node(leader_idx)
|
||||
|
||||
|
@ -840,13 +946,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
|||
(consumer_group_script,
|
||||
self.bootstrap_servers(self.security_protocol),
|
||||
command_config)
|
||||
output = ""
|
||||
self.logger.debug(cmd)
|
||||
for line in node.account.ssh_capture(cmd):
|
||||
if not line.startswith("SLF4J"):
|
||||
output += line
|
||||
self.logger.debug(output)
|
||||
return output
|
||||
return self.run_cli_tool(node, cmd)
|
||||
|
||||
def describe_consumer_group(self, group, node=None, command_config=None):
|
||||
""" Describe a consumer group.
|
||||
|
@ -877,12 +977,12 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
|
|||
def zk_connect_setting(self):
|
||||
return self.zk.connect_setting(self.zk_chroot, self.zk_client_secure)
|
||||
|
||||
def _topic_command_connect_setting(self, node, use_zk_connection=True):
|
||||
def _topic_command_connect_setting(self, node, force_use_zk_connection):
|
||||
"""
|
||||
Checks if --bootstrap-server config is supported, if yes then returns a string with
|
||||
bootstrap server, otherwise returns zookeeper connection string.
|
||||
"""
|
||||
if node.version.topic_command_supports_bootstrap_server() and not use_zk_connection:
|
||||
if not force_use_zk_connection and self.all_nodes_topic_command_supports_bootstrap_server():
|
||||
connection_setting = "--bootstrap-server %s" % (self.bootstrap_servers(self.security_protocol))
|
||||
else:
|
||||
connection_setting = "--zookeeper %s" % (self.zk_connect_setting())
|
||||
|
|
|
@ -14,62 +14,140 @@
|
|||
# limitations under the License.
|
||||
|
||||
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
|
||||
from kafkatest.services.kafka.util import fix_opts_for_new_jvm
|
||||
|
||||
|
||||
class ACLs(KafkaPathResolverMixin):
|
||||
def __init__(self, context):
|
||||
self.context = context
|
||||
|
||||
def set_acls(self, protocol, kafka, topic, group):
|
||||
node = kafka.nodes[0]
|
||||
setting = kafka.zk_connect_setting()
|
||||
|
||||
def set_acls(self, protocol, kafka, topic, group, force_use_zk_connection=False):
|
||||
# Set server ACLs
|
||||
kafka_principal = "User:CN=systemtest" if protocol == "SSL" else "User:kafka"
|
||||
self.acls_command(node, ACLs.add_cluster_acl(setting, kafka_principal))
|
||||
self.acls_command(node, ACLs.broker_read_acl(setting, "*", kafka_principal))
|
||||
self.add_cluster_acl(kafka, kafka_principal, force_use_zk_connection=force_use_zk_connection)
|
||||
self.add_read_acl(kafka, kafka_principal, "*", force_use_zk_connection=force_use_zk_connection)
|
||||
|
||||
# Set client ACLs
|
||||
client_principal = "User:CN=systemtest" if protocol == "SSL" else "User:client"
|
||||
self.acls_command(node, ACLs.produce_acl(setting, topic, client_principal))
|
||||
self.acls_command(node, ACLs.consume_acl(setting, topic, group, client_principal))
|
||||
self.add_produce_acl(kafka, client_principal, topic, force_use_zk_connection=force_use_zk_connection)
|
||||
self.add_consume_acl(kafka, client_principal, topic, group, force_use_zk_connection=force_use_zk_connection)
|
||||
|
||||
def acls_command(self, node, properties):
|
||||
cmd = "%s %s" % (self.path.script("kafka-acls.sh", node), properties)
|
||||
node.account.ssh(cmd)
|
||||
def _acl_command_connect_setting(self, kafka, node, force_use_zk_connection):
|
||||
"""
|
||||
Checks if --bootstrap-server config is supported, if yes then returns a string with
|
||||
bootstrap server, otherwise returns authorizer properties for zookeeper connection.
|
||||
"""
|
||||
if not force_use_zk_connection and kafka.all_nodes_acl_command_supports_bootstrap_server():
|
||||
connection_setting = "--bootstrap-server %s" % (kafka.bootstrap_servers(kafka.security_protocol))
|
||||
else:
|
||||
connection_setting = "--authorizer-properties zookeeper.connect=%s" % (kafka.zk_connect_setting())
|
||||
|
||||
@staticmethod
|
||||
def add_cluster_acl(zk_connect, principal="User:kafka"):
|
||||
return "--authorizer-properties zookeeper.connect=%(zk_connect)s --add --cluster " \
|
||||
"--operation=ClusterAction --allow-principal=%(principal)s " % {
|
||||
'zk_connect': zk_connect,
|
||||
'principal': principal
|
||||
}
|
||||
return connection_setting
|
||||
|
||||
@staticmethod
|
||||
def broker_read_acl(zk_connect, topic, principal="User:kafka"):
|
||||
return "--authorizer-properties zookeeper.connect=%(zk_connect)s --add --topic=%(topic)s " \
|
||||
"--operation=Read --allow-principal=%(principal)s " % {
|
||||
'zk_connect': zk_connect,
|
||||
def _kafka_acls_cmd_config(self, kafka, node, force_use_zk_connection):
|
||||
"""
|
||||
Return --command-config parameter to the kafka-acls.sh command. The config parameter specifies
|
||||
the security settings that AdminClient uses to connect to a secure kafka server.
|
||||
"""
|
||||
skip_command_config = force_use_zk_connection or not kafka.all_nodes_acl_command_supports_bootstrap_server()
|
||||
return "" if skip_command_config else " --command-config <(echo '%s')" % (kafka.security_config.client_config())
|
||||
|
||||
def _acl_cmd_prefix(self, kafka, node, force_use_zk_connection):
|
||||
"""
|
||||
:param node: Node to use when determining connection settings
|
||||
:param force_use_zk_connection: forces the use of ZooKeeper when true, otherwise AdminClient is used when available
|
||||
:return command prefix for running kafka-acls
|
||||
"""
|
||||
cmd = fix_opts_for_new_jvm(node)
|
||||
cmd += "%s %s %s" % (
|
||||
kafka.kafka_acls_cmd(node, force_use_zk_connection),
|
||||
self._acl_command_connect_setting(kafka, node, force_use_zk_connection),
|
||||
self._kafka_acls_cmd_config(kafka, node, force_use_zk_connection))
|
||||
return cmd
|
||||
|
||||
def _add_acl_on_topic(self, kafka, principal, topic, operation_flag, node, force_use_zk_connection):
|
||||
"""
|
||||
:param principal: principal for which ACL is created
|
||||
:param topic: topic for which ACL is created
|
||||
:param operation_flag: type of ACL created (e.g. --producer, --consumer, --operation=Read)
|
||||
:param node: Node to use when determining connection settings
|
||||
:param force_use_zk_connection: forces the use of ZooKeeper when true, otherwise AdminClient is used when available
|
||||
"""
|
||||
cmd = "%(cmd_prefix)s --add --topic=%(topic)s %(operation_flag)s --allow-principal=%(principal)s" % {
|
||||
'cmd_prefix': self._acl_cmd_prefix(kafka, node, force_use_zk_connection),
|
||||
'topic': topic,
|
||||
'operation_flag': operation_flag,
|
||||
'principal': principal
|
||||
}
|
||||
kafka.run_cli_tool(node, cmd)
|
||||
|
||||
@staticmethod
|
||||
def produce_acl(zk_connect, topic, principal="User:client"):
|
||||
return "--authorizer-properties zookeeper.connect=%(zk_connect)s --add --topic=%(topic)s " \
|
||||
"--producer --allow-principal=%(principal)s " % {
|
||||
'zk_connect': zk_connect,
|
||||
'topic': topic,
|
||||
def add_cluster_acl(self, kafka, principal, force_use_zk_connection=False):
|
||||
"""
|
||||
:param kafka: Kafka cluster upon which ClusterAction ACL is created
|
||||
:param principal: principal for which ClusterAction ACL is created
|
||||
:param node: Node to use when determining connection settings
|
||||
:param force_use_zk_connection: forces the use of ZooKeeper when true, otherwise AdminClient is used when available.
|
||||
This is necessary for the case where we are bootstrapping ACLs before Kafka is started or before authorizer is enabled
|
||||
"""
|
||||
node = kafka.nodes[0]
|
||||
|
||||
force_use_zk_connection = force_use_zk_connection or not kafka.all_nodes_acl_command_supports_bootstrap_server()
|
||||
|
||||
cmd = "%(cmd_prefix)s --add --cluster --operation=ClusterAction --allow-principal=%(principal)s" % {
|
||||
'cmd_prefix': self._acl_cmd_prefix(kafka, node, force_use_zk_connection),
|
||||
'principal': principal
|
||||
}
|
||||
kafka.run_cli_tool(node, cmd)
|
||||
|
||||
@staticmethod
|
||||
def consume_acl(zk_connect, topic, group, principal="User:client"):
|
||||
return "--authorizer-properties zookeeper.connect=%(zk_connect)s --add --topic=%(topic)s " \
|
||||
"--group=%(group)s --consumer --allow-principal=%(principal)s " % {
|
||||
'zk_connect': zk_connect,
|
||||
def add_read_acl(self, kafka, principal, topic, force_use_zk_connection=False):
|
||||
"""
|
||||
:param kafka: Kafka cluster upon which Read ACL is created
|
||||
:param principal: principal for which Read ACL is created
|
||||
:param topic: topic for which Read ACL is created
|
||||
:param node: Node to use when determining connection settings
|
||||
:param force_use_zk_connection: forces the use of ZooKeeper when true, otherwise AdminClient is used when available.
|
||||
This is necessary for the case where we are bootstrapping ACLs before Kafka is started or before authorizer is enabled
|
||||
"""
|
||||
node = kafka.nodes[0]
|
||||
|
||||
force_use_zk_connection = force_use_zk_connection or not kafka.all_nodes_acl_command_supports_bootstrap_server()
|
||||
|
||||
self._add_acl_on_topic(kafka, principal, topic, "--operation=Read", node, force_use_zk_connection)
|
||||
|
||||
def add_produce_acl(self, kafka, principal, topic, force_use_zk_connection=False):
|
||||
"""
|
||||
:param kafka: Kafka cluster upon which Producer ACL is created
|
||||
:param principal: principal for which Producer ACL is created
|
||||
:param topic: topic for which Producer ACL is created
|
||||
:param node: Node to use when determining connection settings
|
||||
:param force_use_zk_connection: forces the use of ZooKeeper when true, otherwise AdminClient is used when available.
|
||||
This is necessary for the case where we are bootstrapping ACLs before Kafka is started or before authorizer is enabled
|
||||
"""
|
||||
node = kafka.nodes[0]
|
||||
|
||||
force_use_zk_connection = force_use_zk_connection or not kafka.all_nodes_acl_command_supports_bootstrap_server()
|
||||
|
||||
self._add_acl_on_topic(kafka, principal, topic, "--producer", node, force_use_zk_connection)
|
||||
|
||||
def add_consume_acl(self, kafka, principal, topic, group, force_use_zk_connection=False):
|
||||
"""
|
||||
:param kafka: Kafka cluster upon which Consumer ACL is created
|
||||
:param principal: principal for which Consumer ACL is created
|
||||
:param topic: topic for which Consumer ACL is created
|
||||
:param group: consumewr group for which Consumer ACL is created
|
||||
:param node: Node to use when determining connection settings
|
||||
:param force_use_zk_connection: forces the use of ZooKeeper when true, otherwise AdminClient is used when available.
|
||||
This is necessary for the case where we are bootstrapping ACLs before Kafka is started or before authorizer is enabled
|
||||
"""
|
||||
node = kafka.nodes[0]
|
||||
|
||||
force_use_zk_connection = force_use_zk_connection or not kafka.all_nodes_acl_command_supports_bootstrap_server()
|
||||
|
||||
cmd = "%(cmd_prefix)s --add --topic=%(topic)s --group=%(group)s --consumer --allow-principal=%(principal)s" % {
|
||||
'cmd_prefix': self._acl_cmd_prefix(kafka, node, force_use_zk_connection),
|
||||
'topic': topic,
|
||||
'group': group,
|
||||
'principal': principal
|
||||
}
|
||||
kafka.run_cli_tool(node, cmd)
|
||||
|
||||
|
|
|
@ -203,13 +203,16 @@ class SecurityConfig(TemplateRenderer):
|
|||
# If node is not specified, use static jaas config which will be created later.
|
||||
# Otherwise use static JAAS configuration files with SASL_SSL and sasl.jaas.config
|
||||
# property with SASL_PLAINTEXT so that both code paths are tested by existing tests.
|
||||
# Note that this is an artibtrary choice and it is possible to run all tests with
|
||||
# Note that this is an arbitrary choice and it is possible to run all tests with
|
||||
# either static or dynamic jaas config files if required.
|
||||
static_jaas_conf = node is None or (self.has_sasl and self.has_ssl)
|
||||
if use_inter_broker_mechanism_for_client:
|
||||
client_sasl_mechanism_to_use = self.interbroker_sasl_mechanism
|
||||
else:
|
||||
client_sasl_mechanism_to_use = self.client_sasl_mechanism
|
||||
# csv is supported here, but client configs only supports a single mechanism,
|
||||
# so arbitrarily take the first one defined in case it has multiple values
|
||||
client_sasl_mechanism_to_use = self.client_sasl_mechanism.split(',')[0].strip()
|
||||
|
||||
return SecurityConfig(self.context, self.security_protocol,
|
||||
client_sasl_mechanism=client_sasl_mechanism_to_use,
|
||||
template_props=template_props,
|
||||
|
@ -297,7 +300,8 @@ class SecurityConfig(TemplateRenderer):
|
|||
self.export_kafka_opts_for_admin_client_as_broker())
|
||||
|
||||
def maybe_create_scram_credentials(self, node, connect, path, mechanism, user_name, password, kafka_opts_for_admin_client_as_broker = ""):
|
||||
if self.has_sasl and self.is_sasl_scram(mechanism):
|
||||
# we only need to create these credentials when the client and broker mechanisms are both SASL/SCRAM
|
||||
if self.has_sasl and self.is_sasl_scram(mechanism) and self.is_sasl_scram(self.interbroker_sasl_mechanism):
|
||||
cmd = "%s %s %s --entity-name %s --entity-type users --alter --add-config %s=[password=%s]" % \
|
||||
(kafka_opts_for_admin_client_as_broker, path.script("kafka-configs.sh", node), connect,
|
||||
user_name, mechanism, password)
|
||||
|
|
|
@ -72,9 +72,10 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
|
|||
|
||||
def set_authorizer_and_bounce(self, client_protocol, broker_protocol, authorizer_class_name = KafkaService.ACL_AUTHORIZER):
|
||||
self.kafka.authorizer_class_name = authorizer_class_name
|
||||
self.acls.set_acls(client_protocol, self.kafka, self.topic, self.group)
|
||||
self.acls.set_acls(broker_protocol, self.kafka, self.topic, self.group)
|
||||
self.bounce()
|
||||
# Force use of direct ZooKeeper access due to SecurityDisabledException: No Authorizer is configured on the broker.
|
||||
self.acls.set_acls(client_protocol, self.kafka, self.topic, self.group, force_use_zk_connection=True)
|
||||
self.acls.set_acls(broker_protocol, self.kafka, self.topic, self.group, force_use_zk_connection=True)
|
||||
self.bounce() # enables the authorizer
|
||||
|
||||
def open_secured_port(self, client_protocol):
|
||||
self.kafka.security_protocol = client_protocol
|
||||
|
|
|
@ -67,8 +67,9 @@ class SecurityTest(EndToEndTest):
|
|||
with hostname verification failure. Hence clients are expected to fail with LEADER_NOT_AVAILABLE.
|
||||
"""
|
||||
|
||||
# Start Kafka with valid hostnames in the certs' SANs so that we can create the test topic via the admin client
|
||||
SecurityConfig.ssl_stores = TestSslStores(self.test_context.local_scratch_dir,
|
||||
valid_hostname=False)
|
||||
valid_hostname=True)
|
||||
|
||||
self.create_zookeeper()
|
||||
self.zk.start()
|
||||
|
@ -77,6 +78,10 @@ class SecurityTest(EndToEndTest):
|
|||
interbroker_security_protocol=interbroker_security_protocol)
|
||||
self.kafka.start()
|
||||
|
||||
# now set the certs to have invalid hostnames so we can run the actual test
|
||||
SecurityConfig.ssl_stores.valid_hostname = False
|
||||
self.kafka.restart_cluster()
|
||||
|
||||
# We need more verbose logging to catch the expected errors
|
||||
self.create_and_start_clients(log_level="DEBUG")
|
||||
|
||||
|
|
|
@ -96,7 +96,8 @@ class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest):
|
|||
# set acls
|
||||
if self.is_secure:
|
||||
self.kafka.authorizer_class_name = KafkaService.ACL_AUTHORIZER
|
||||
self.acls.set_acls(security_protocol, self.kafka, self.topic, self.group)
|
||||
# Force use of direct ZooKeeper access because Kafka is not yet started
|
||||
self.acls.set_acls(security_protocol, self.kafka, self.topic, self.group, force_use_zk_connection=True)
|
||||
|
||||
if self.no_sasl:
|
||||
self.kafka.start()
|
||||
|
|
|
@ -69,9 +69,7 @@ class ZookeeperTlsEncryptOnlyTest(ProduceConsumeValidateTest):
|
|||
self.zk.start()
|
||||
self.kafka.security_protocol = self.kafka.interbroker_security_protocol = "PLAINTEXT"
|
||||
|
||||
# Cannot use --zookeeper because kafka-topics.sh is unable to connect to a TLS-enabled ZooKeeper quorum,
|
||||
# so indicate that topics should be created via the Admin client
|
||||
self.kafka.start(use_zk_to_create_topic=False)
|
||||
self.kafka.start()
|
||||
|
||||
self.perform_produce_consume_validation()
|
||||
|
||||
|
|
|
@ -52,9 +52,15 @@ class KafkaVersion(LooseVersion):
|
|||
def supports_named_listeners(self):
|
||||
return self >= V_0_10_2_0
|
||||
|
||||
def acl_command_supports_bootstrap_server(self):
|
||||
return self >= V_2_1_0
|
||||
|
||||
def topic_command_supports_bootstrap_server(self):
|
||||
return self >= V_2_3_0
|
||||
|
||||
def topic_command_supports_if_not_exists_with_bootstrap_server(self):
|
||||
return self >= V_2_6_0
|
||||
|
||||
def supports_tls_to_zookeeper(self):
|
||||
# indicate if KIP-515 is available
|
||||
return self >= V_2_5_0
|
||||
|
|
Loading…
Reference in New Issue