From cea01af125a33b81f973a96501fe41ca9d698197 Mon Sep 17 00:00:00 2001
From: Rajini Sivaram
Date: Fri, 29 Apr 2016 09:41:12 -0700
Subject: [PATCH] KAFKA-2693: Ducktape tests for SASL/PLAIN and multiple
 mechanisms

Run a sanity test with SASL/PLAIN and a couple of replication tests with
SASL/PLAIN and multiple mechanisms.

Author: Rajini Sivaram

Reviewers: Ismael Juma, Ewen Cheslack-Postava

Closes #1282 from rajinisivaram/KAFKA-2693
---
 .../sanity_checks/test_console_consumer.py    |   5 +-
 tests/kafkatest/services/kafka/kafka.py       |  14 ++-
 .../services/kafka/templates/kafka.properties |   5 +-
 .../services/security/security_config.py      |  27 +++--
 .../templates/{gssapi_jaas.conf => jaas.conf} | 103 ++++++++++--------
 .../kafkatest/tests/core/replication_test.py  |   7 +-
 6 files changed, 97 insertions(+), 64 deletions(-)
 rename tests/kafkatest/services/security/templates/{gssapi_jaas.conf => jaas.conf} (61%)

diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py
index 139c74acfa4..d6a152a10bc 100644
--- a/tests/kafkatest/sanity_checks/test_console_consumer.py
+++ b/tests/kafkatest/sanity_checks/test_console_consumer.py
@@ -45,11 +45,14 @@ class ConsoleConsumerTest(Test):
         self.zk.start()
 
     @parametrize(security_protocol='PLAINTEXT', new_consumer=False)
+    @parametrize(security_protocol='SASL_SSL', sasl_mechanism='PLAIN')
     @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
-    def test_lifecycle(self, security_protocol, new_consumer=True):
+    def test_lifecycle(self, security_protocol, new_consumer=True, sasl_mechanism='GSSAPI'):
         """Check that console consumer starts/stops properly, and that we are capturing log output."""
 
         self.kafka.security_protocol = security_protocol
+        self.kafka.client_sasl_mechanism = sasl_mechanism
+        self.kafka.interbroker_sasl_mechanism = sasl_mechanism
         self.kafka.start()
 
         self.consumer.security_protocol = security_protocol
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 33ece3502f8..a74bb004c81 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -63,7 +63,8 @@ class KafkaService(JmxMixin, Service):
     }
 
     def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT,
-                 sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, authorizer_class_name=None, topics=None, version=TRUNK, quota_config=None, jmx_object_names=None,
+                 client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI,
+                 authorizer_class_name=None, topics=None, version=TRUNK, quota_config=None, jmx_object_names=None,
                  jmx_attributes=[], zk_connect_timeout=5000):
         """
         :type context
@@ -78,7 +79,8 @@ class KafkaService(JmxMixin, Service):
 
         self.security_protocol = security_protocol
         self.interbroker_security_protocol = interbroker_security_protocol
-        self.sasl_mechanism = sasl_mechanism
+        self.client_sasl_mechanism = client_sasl_mechanism
+        self.interbroker_sasl_mechanism = interbroker_sasl_mechanism
         self.topics = topics
         self.minikdc = None
         self.authorizer_class_name = authorizer_class_name
@@ -108,7 +110,9 @@ class KafkaService(JmxMixin, Service):
 
     @property
     def security_config(self):
-        return SecurityConfig(self.security_protocol, self.interbroker_security_protocol, zk_sasl = self.zk.zk_sasl , sasl_mechanism=self.sasl_mechanism)
+        return SecurityConfig(self.security_protocol, self.interbroker_security_protocol,
+                              zk_sasl = self.zk.zk_sasl,
+                              client_sasl_mechanism=self.client_sasl_mechanism, interbroker_sasl_mechanism=self.interbroker_sasl_mechanism)
 
     def open_port(self, protocol):
         self.port_mappings[protocol] = self.port_mappings[protocol]._replace(open=True)
@@ -163,9 +167,7 @@ class KafkaService(JmxMixin, Service):
         # TODO - clean up duplicate configuration logic
         prop_file = cfg.render()
         prop_file += self.render('kafka.properties', node=node, broker_id=self.idx(node),
-                                 security_config=self.security_config,
-                                 interbroker_security_protocol=self.interbroker_security_protocol,
-                                 sasl_mechanism=self.sasl_mechanism)
+                                 security_config=self.security_config)
        return prop_file
 
     def start_cmd(self, node):
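Note (not part of the diff): with the constructor change above, a ducktape test can now request different SASL mechanisms for client and inter-broker traffic when it constructs the service. A minimal sketch, assuming the usual ducktape test_context and ZookeeperService wiring; the node counts, topic name and mechanism choices are illustrative only:

    from kafkatest.services.zookeeper import ZookeeperService
    from kafkatest.services.kafka import KafkaService

    def start_mixed_mechanism_cluster(test_context):
        # Clients authenticate with SASL/PLAIN while brokers keep using GSSAPI
        # (Kerberos) between themselves; the broker enables both mechanisms.
        zk = ZookeeperService(test_context, num_nodes=1)
        kafka = KafkaService(test_context, num_nodes=3, zk=zk,
                             security_protocol="SASL_SSL",
                             interbroker_security_protocol="SASL_SSL",
                             client_sasl_mechanism="PLAIN",
                             interbroker_sasl_mechanism="GSSAPI",
                             topics={"test_topic": {"partitions": 3, "replication-factor": 3}})
        zk.start()
        kafka.start()
        return kafka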
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties
index a718ee2eefd..1e4f17c0d3c 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -50,7 +50,7 @@ quota.producer.bytes.per.second.overrides={{ quota_config.quota_producer_bytes_per_second_overrides }}
 quota.consumer.bytes.per.second.overrides={{ quota_config.quota_consumer_bytes_per_second_overrides }}
 {% endif %}
 
-security.inter.broker.protocol={{ interbroker_security_protocol }}
+security.inter.broker.protocol={{ security_config.interbroker_security_protocol }}
 
 ssl.keystore.location=/mnt/security/test.keystore.jks
 ssl.keystore.password=test-ks-passwd
@@ -59,7 +59,8 @@ ssl.keystore.type=JKS
 ssl.truststore.location=/mnt/security/test.truststore.jks
 ssl.truststore.password=test-ts-passwd
 ssl.truststore.type=JKS
-sasl.mechanism={{ sasl_mechanism }}
+sasl.mechanism.inter.broker.protocol={{ security_config.interbroker_sasl_mechanism }}
+sasl.enabled.mechanisms={{ ",".join(security_config.enabled_sasl_mechanisms) }}
 sasl.kerberos.service.name=kafka
 {% if authorizer_class_name is not none %}
 ssl.client.auth=required
diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py
index 1bbabd2359b..d7cc3c0a017 100644
--- a/tests/kafkatest/services/security/security_config.py
+++ b/tests/kafkatest/services/security/security_config.py
@@ -75,7 +75,9 @@ class SecurityConfig(TemplateRenderer):
 
     ssl_stores = Keytool.generate_keystore_truststore('.')
 
-    def __init__(self, security_protocol=None, interbroker_security_protocol=None, sasl_mechanism=SASL_MECHANISM_GSSAPI, zk_sasl=False, template_props=""):
+    def __init__(self, security_protocol=None, interbroker_security_protocol=None,
+                 client_sasl_mechanism=SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SASL_MECHANISM_GSSAPI,
+                 zk_sasl=False, template_props=""):
         """
         Initialize the security properties for the node and copy
         keystore and truststore to the remote node if the transport protocol
@@ -104,13 +106,14 @@ class SecurityConfig(TemplateRenderer):
             'ssl.key.password' : SecurityConfig.ssl_stores['ssl.key.password'],
             'ssl.truststore.location' : SecurityConfig.TRUSTSTORE_PATH,
             'ssl.truststore.password' : SecurityConfig.ssl_stores['ssl.truststore.password'],
-            'sasl.mechanism' : sasl_mechanism,
+            'sasl.mechanism' : client_sasl_mechanism,
+            'sasl.mechanism.inter.broker.protocol' : interbroker_sasl_mechanism,
             'sasl.kerberos.service.name' : 'kafka'
         }
 
     def client_config(self, template_props=""):
-        return SecurityConfig(self.security_protocol, sasl_mechanism=self.sasl_mechanism, template_props=template_props)
+        return SecurityConfig(self.security_protocol, client_sasl_mechanism=self.client_sasl_mechanism, template_props=template_props)
 
     def setup_node(self, node):
         if self.has_ssl:
@@ -120,13 +123,15 @@ class SecurityConfig(TemplateRenderer):
 
         if self.has_sasl:
             node.account.ssh("mkdir -p %s" % SecurityConfig.CONFIG_DIR, allow_fail=False)
-            jaas_conf_file = self.sasl_mechanism.lower() + "_jaas.conf"
+            jaas_conf_file = "jaas.conf"
             java_version = node.account.ssh_capture("java -version")
             if any('IBM' in line for line in java_version):
                 is_ibm_jdk = True
             else:
                 is_ibm_jdk = False
-            jaas_conf = self.render(jaas_conf_file, node=node, is_ibm_jdk=is_ibm_jdk)
+            jaas_conf = self.render(jaas_conf_file, node=node, is_ibm_jdk=is_ibm_jdk,
+                                    client_sasl_mechanism=self.client_sasl_mechanism,
+                                    enabled_sasl_mechanisms=self.enabled_sasl_mechanisms)
             node.account.create_file(SecurityConfig.JAAS_CONF_PATH, jaas_conf)
             if self.has_sasl_kerberos:
                 node.account.scp_to(MiniKdc.LOCAL_KEYTAB_FILE, SecurityConfig.KEYTAB_PATH)
@@ -159,12 +164,20 @@ class SecurityConfig(TemplateRenderer):
         return self.properties['security.protocol']
 
     @property
-    def sasl_mechanism(self):
+    def client_sasl_mechanism(self):
         return self.properties['sasl.mechanism']
 
+    @property
+    def interbroker_sasl_mechanism(self):
+        return self.properties['sasl.mechanism.inter.broker.protocol']
+
+    @property
+    def enabled_sasl_mechanisms(self):
+        return set([self.client_sasl_mechanism, self.interbroker_sasl_mechanism])
+
     @property
     def has_sasl_kerberos(self):
-        return self.has_sasl and self.sasl_mechanism == SecurityConfig.SASL_MECHANISM_GSSAPI
+        return self.has_sasl and (SecurityConfig.SASL_MECHANISM_GSSAPI in self.enabled_sasl_mechanisms)
 
     @property
     def kafka_opts(self):
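For reference, a minimal sketch (again, not part of the patch) of how the two mechanism arguments combine in SecurityConfig. The string literals stand in for whichever mechanisms a test chooses, and since the class builds SSL stores at import time via Keytool, running this presumes a driver host where that tooling is available:

    from kafkatest.services.security.security_config import SecurityConfig

    # Client and inter-broker mechanisms are tracked separately; the broker
    # enables the union of the two, and Kerberos artifacts are still provisioned
    # as long as GSSAPI is enabled for either role.
    config = SecurityConfig(security_protocol=SecurityConfig.SASL_SSL,
                            interbroker_security_protocol=SecurityConfig.SASL_SSL,
                            client_sasl_mechanism="PLAIN",
                            interbroker_sasl_mechanism="GSSAPI")
    assert config.enabled_sasl_mechanisms == set(["PLAIN", "GSSAPI"])
    assert config.has_sasl_kerberos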
diff --git a/tests/kafkatest/services/security/templates/gssapi_jaas.conf b/tests/kafkatest/services/security/templates/jaas.conf
similarity index 61%
rename from tests/kafkatest/services/security/templates/gssapi_jaas.conf
rename to tests/kafkatest/services/security/templates/jaas.conf
index 6a629d9ec85..fbfa8af49c2 100644
--- a/tests/kafkatest/services/security/templates/gssapi_jaas.conf
+++ b/tests/kafkatest/services/security/templates/jaas.conf
@@ -11,76 +11,85 @@
  * specific language governing permissions and limitations under the License.
  */
-{% if is_ibm_jdk %}
 KafkaClient {
+{% if client_sasl_mechanism == "GSSAPI" %}
+{% if is_ibm_jdk %}
     com.ibm.security.auth.module.Krb5LoginModule required debug=false
     credsType=both
     useKeytab="file:/mnt/security/keytab"
     principal="client@EXAMPLE.COM";
+{% else %}
+    com.sun.security.auth.module.Krb5LoginModule required debug=false
+    doNotPrompt=true
+    useKeyTab=true
+    storeKey=true
+    keyTab="/mnt/security/keytab"
+    principal="client@EXAMPLE.COM";
+{% endif %}
+{% elif client_sasl_mechanism == "PLAIN" %}
+    org.apache.kafka.common.security.plain.PlainLoginModule required
+    username="client"
+    password="client-secret";
+{% endif %}
+
 };
 
 KafkaServer {
+{% if "GSSAPI" in enabled_sasl_mechanisms %}
+{% if is_ibm_jdk %}
     com.ibm.security.auth.module.Krb5LoginModule required debug=false
     credsType=both
     useKeytab="file:/mnt/security/keytab"
     principal="kafka/{{ node.account.hostname }}@EXAMPLE.COM";
+{% else %}
+    com.sun.security.auth.module.Krb5LoginModule required debug=false
+    doNotPrompt=true
+    useKeyTab=true
+    storeKey=true
+    keyTab="/mnt/security/keytab"
+    principal="kafka/{{ node.account.hostname }}@EXAMPLE.COM";
+{% endif %}
+{% endif %}
+{% if "PLAIN" in enabled_sasl_mechanisms %}
+    org.apache.kafka.common.security.plain.PlainLoginModule required
+    username="kafka"
+    password="kafka-secret"
+    user_client="client-secret"
+    user_kafka="kafka-secret";
+{% endif %}
 };
+
 {% if zk_sasl %}
 Client {
+{% if is_ibm_jdk %}
     com.ibm.security.auth.module.Krb5LoginModule required debug=false
     credsType=both
     useKeytab="file:/mnt/security/keytab"
     principal="zkclient@EXAMPLE.COM";
-};
-
-Server {
-    com.ibm.security.auth.module.Krb5LoginModule required debug=false
-    credsType=both
-    useKeyTab="file:/mnt/security/keytab"
-    principal="zookeeper/{{ node.account.hostname }}@EXAMPLE.COM";
-};
-{% endif %}
 {% else %}
-
-KafkaClient {
-    com.sun.security.auth.module.Krb5LoginModule required debug=false
-    doNotPrompt=true
-    useKeyTab=true
-    storeKey=true
-    keyTab="/mnt/security/keytab"
-    principal="client@EXAMPLE.COM";
-};
-
-KafkaServer {
-    com.sun.security.auth.module.Krb5LoginModule required debug=false
-    doNotPrompt=true
-    useKeyTab=true
-    storeKey=true
-    keyTab="/mnt/security/keytab"
-    principal="kafka/{{ node.account.hostname }}@EXAMPLE.COM";
-};
-
-{% if zk_sasl %}
-Client {
-    com.sun.security.auth.module.Krb5LoginModule required
-    useKeyTab=true
-    keyTab="/mnt/security/keytab"
-    storeKey=true
-    useTicketCache=false
-    principal="zkclient@EXAMPLE.COM";
+    com.sun.security.auth.module.Krb5LoginModule required
+    useKeyTab=true
+    keyTab="/mnt/security/keytab"
+    storeKey=true
+    useTicketCache=false
+    principal="zkclient@EXAMPLE.COM";
+{% endif %}
 };
 
 Server {
-    com.sun.security.auth.module.Krb5LoginModule required
-    useKeyTab=true
-    keyTab="/mnt/security/keytab"
-    storeKey=true
-    useTicketCache=false
-    principal="zookeeper/{{ node.account.hostname }}@EXAMPLE.COM";
+{% if is_ibm_jdk %}
+    com.ibm.security.auth.module.Krb5LoginModule required debug=false
+    credsType=both
+    useKeyTab="file:/mnt/security/keytab"
+    principal="zookeeper/{{ node.account.hostname }}@EXAMPLE.COM";
+{% else %}
+    com.sun.security.auth.module.Krb5LoginModule required
+    useKeyTab=true
+    keyTab="/mnt/security/keytab"
+    storeKey=true
+    useTicketCache=false
+    principal="zookeeper/{{ node.account.hostname }}@EXAMPLE.COM";
+{% endif %}
 };
 {% endif %}
-{% endif %}
-
-
-
diff --git a/tests/kafkatest/tests/core/replication_test.py b/tests/kafkatest/tests/core/replication_test.py
index 7b360abb16e..8e9474aec27 100644
--- a/tests/kafkatest/tests/core/replication_test.py
+++ b/tests/kafkatest/tests/core/replication_test.py
@@ -128,7 +128,10 @@ class ReplicationTest(ProduceConsumeValidateTest):
     @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
             broker_type=["controller"],
             security_protocol=["PLAINTEXT", "SASL_SSL"])
-    def test_replication_with_broker_failure(self, failure_mode, security_protocol, broker_type):
+    @matrix(failure_mode=["hard_bounce"],
+            broker_type=["leader"],
+            security_protocol=["SASL_SSL"], client_sasl_mechanism=["PLAIN"], interbroker_sasl_mechanism=["PLAIN", "GSSAPI"])
+    def test_replication_with_broker_failure(self, failure_mode, security_protocol, broker_type, client_sasl_mechanism="GSSAPI", interbroker_sasl_mechanism="GSSAPI"):
         """Replication tests.
         These tests verify that replication provides simple durability guarantees by checking that data acked by
         brokers is still available for consumption in the face of various failure scenarios.
@@ -144,6 +147,8 @@ class ReplicationTest(ProduceConsumeValidateTest):
 
         self.kafka.security_protocol = security_protocol
         self.kafka.interbroker_security_protocol = security_protocol
+        self.kafka.client_sasl_mechanism = client_sasl_mechanism
+        self.kafka.interbroker_sasl_mechanism = interbroker_sasl_mechanism
         new_consumer = False if self.kafka.security_protocol == "PLAINTEXT" else True
         self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
         self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int)
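For clarity, the new @matrix block above expands to exactly two additional runs on top of the existing controller-failure matrix. Spelled out (illustrative dicts, not code that ships with this patch):

    extra_runs = [
        dict(failure_mode="hard_bounce", broker_type="leader", security_protocol="SASL_SSL",
             client_sasl_mechanism="PLAIN", interbroker_sasl_mechanism="PLAIN"),
        dict(failure_mode="hard_bounce", broker_type="leader", security_protocol="SASL_SSL",
             client_sasl_mechanism="PLAIN", interbroker_sasl_mechanism="GSSAPI"),
    ]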