KAFKA-4580; Use sasl.jaas.config for some system tests

Switched console_consumer, verifiable_consumer and verifiable_producer to use new sasl.jaas_config property instead of static JAAS configuration file when used with SASL_PLAINTEXT.

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #2323 from rajinisivaram/KAFKA-4580
This commit is contained in:
Rajini Sivaram 2017-01-17 18:42:55 +00:00 committed by Ismael Juma
parent 7a84b241ee
commit 3f6c4f63c9
5 changed files with 35 additions and 15 deletions

View File

@ -150,7 +150,8 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
# Add security properties to the config. If security protocol is not specified,
# use the default in the template properties.
self.security_config = self.kafka.security_config.client_config(prop_file)
self.security_config = self.kafka.security_config.client_config(prop_file, node)
self.security_config.setup_node(node)
prop_file += str(self.security_config)
return prop_file
@ -231,7 +232,6 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
prop_file = self.prop_file(node)
self.logger.info(prop_file)
node.account.create_file(ConsoleConsumer.CONFIG_FILE, prop_file)
self.security_config.setup_node(node)
# Create and upload log properties
log_config = self.render('tools_log4j.properties', log_file=ConsoleConsumer.LOG_FILE)

View File

@ -112,7 +112,7 @@ class SecurityConfig(TemplateRenderer):
def __init__(self, context, security_protocol=None, interbroker_security_protocol=None,
client_sasl_mechanism=SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SASL_MECHANISM_GSSAPI,
zk_sasl=False, template_props=""):
zk_sasl=False, template_props="", static_jaas_conf=True):
"""
Initialize the security properties for the node and copy
keystore and truststore to the remote node if the transport protocol
@ -143,6 +143,7 @@ class SecurityConfig(TemplateRenderer):
self.has_sasl = self.is_sasl(security_protocol) or self.is_sasl(interbroker_security_protocol) or zk_sasl
self.has_ssl = self.is_ssl(security_protocol) or self.is_ssl(interbroker_security_protocol)
self.zk_sasl = zk_sasl
self.static_jaas_conf = static_jaas_conf
self.properties = {
'security.protocol' : security_protocol,
'ssl.keystore.location' : SecurityConfig.KEYSTORE_PATH,
@ -156,8 +157,14 @@ class SecurityConfig(TemplateRenderer):
'sasl.kerberos.service.name' : 'kafka'
}
def client_config(self, template_props=""):
return SecurityConfig(self.context, self.security_protocol, client_sasl_mechanism=self.client_sasl_mechanism, template_props=template_props)
def client_config(self, template_props="", node=None):
# If node is not specified, use static jaas config which will be created later.
# Otherwise use static JAAS configuration files with SASL_SSL and sasl.jaas.config
# property with SASL_PLAINTEXT so that both code paths are tested by existing tests.
# Note that this is an artibtrary choice and it is possible to run all tests with
# either static or dynamic jaas config files if required.
static_jaas_conf = node is None or (self.has_sasl and self.has_ssl)
return SecurityConfig(self.context, self.security_protocol, client_sasl_mechanism=self.client_sasl_mechanism, template_props=template_props, static_jaas_conf=static_jaas_conf)
def setup_ssl(self, node):
node.account.ssh("mkdir -p %s" % SecurityConfig.CONFIG_DIR, allow_fail=False)
@ -175,8 +182,12 @@ class SecurityConfig(TemplateRenderer):
jaas_conf = self.render(jaas_conf_file, node=node, is_ibm_jdk=is_ibm_jdk,
SecurityConfig=SecurityConfig,
client_sasl_mechanism=self.client_sasl_mechanism,
enabled_sasl_mechanisms=self.enabled_sasl_mechanisms)
node.account.create_file(SecurityConfig.JAAS_CONF_PATH, jaas_conf)
enabled_sasl_mechanisms=self.enabled_sasl_mechanisms,
static_jaas_conf=self.static_jaas_conf)
if self.static_jaas_conf:
node.account.create_file(SecurityConfig.JAAS_CONF_PATH, jaas_conf)
else:
self.properties['sasl.jaas.config'] = jaas_conf.replace("\n", " \\\n")
if self.has_sasl_kerberos:
node.account.copy_to(MiniKdc.LOCAL_KEYTAB_FILE, SecurityConfig.KEYTAB_PATH)
node.account.copy_to(MiniKdc.LOCAL_KRB5CONF_FILE, SecurityConfig.KRB5CONF_PATH)
@ -251,7 +262,10 @@ class SecurityConfig(TemplateRenderer):
@property
def kafka_opts(self):
if self.has_sasl:
return "\"-Djava.security.auth.login.config=%s -Djava.security.krb5.conf=%s\"" % (SecurityConfig.JAAS_CONF_PATH, SecurityConfig.KRB5CONF_PATH)
if self.static_jaas_conf:
return "\"-Djava.security.auth.login.config=%s -Djava.security.krb5.conf=%s\"" % (SecurityConfig.JAAS_CONF_PATH, SecurityConfig.KRB5CONF_PATH)
else:
return "\"-Djava.security.krb5.conf=%s\"" % SecurityConfig.KRB5CONF_PATH
else:
return ""
@ -265,6 +279,8 @@ class SecurityConfig(TemplateRenderer):
"""
if self.security_protocol == SecurityConfig.PLAINTEXT:
return ""
if self.has_sasl and not self.static_jaas_conf and 'sasl.jaas.config' not in self.properties:
raise Exception("JAAS configuration property has not yet been initialized")
config_lines = (prefix + key + "=" + value for key, value in self.properties.iteritems())
# Extra blank lines ensure this can be appended/prepended safely
return "\n".join(itertools.chain([""], config_lines, [""]))

View File

@ -12,7 +12,9 @@
*/
{% if static_jaas_conf %}
KafkaClient {
{% endif %}
{% if client_sasl_mechanism == "GSSAPI" %}
{% if is_ibm_jdk %}
com.ibm.security.auth.module.Krb5LoginModule required debug=false
@ -37,6 +39,7 @@ KafkaClient {
password="{{ SecurityConfig.SCRAM_CLIENT_PASSWORD }}";
{% endif %}
{% if static_jaas_conf %}
};
KafkaServer {
@ -102,3 +105,4 @@ Server {
{% endif %}
};
{% endif %}
{% endif %}

View File

@ -148,8 +148,6 @@ class VerifiableConsumer(KafkaPathResolverMixin, BackgroundThreadService):
self.enable_autocommit = enable_autocommit
self.assignment_strategy = assignment_strategy
self.prop_file = ""
self.security_config = kafka.security_config.client_config(self.prop_file)
self.prop_file += str(self.security_config)
self.stop_timeout_sec = stop_timeout_sec
self.event_handlers = {}
@ -171,6 +169,9 @@ class VerifiableConsumer(KafkaPathResolverMixin, BackgroundThreadService):
node.account.create_file(VerifiableConsumer.LOG4J_CONFIG, log_config)
# Create and upload config file
self.security_config = self.kafka.security_config.client_config(self.prop_file, node)
self.security_config.setup_node(node)
self.prop_file += str(self.security_config)
self.logger.info("verifiable_consumer.properties:")
self.logger.info(self.prop_file)
node.account.create_file(VerifiableConsumer.CONFIG_FILE, self.prop_file)

View File

@ -83,10 +83,6 @@ class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService):
self.acks = acks
self.stop_timeout_sec = stop_timeout_sec
@property
def security_config(self):
return self.kafka.security_config.client_config()
def prop_file(self, node):
idx = self.idx(node)
prop_file = str(self.security_config)
@ -104,6 +100,10 @@ class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService):
log_config = self.render('tools_log4j.properties', log_file=VerifiableProducer.LOG_FILE)
node.account.create_file(VerifiableProducer.LOG4J_CONFIG, log_config)
# Configure security
self.security_config = self.kafka.security_config.client_config(node=node)
self.security_config.setup_node(node)
# Create and upload config file
producer_prop_file = self.prop_file(node)
if self.acks is not None:
@ -112,7 +112,6 @@ class VerifiableProducer(KafkaPathResolverMixin, BackgroundThreadService):
self.logger.info("verifiable_producer.properties:")
self.logger.info(producer_prop_file)
node.account.create_file(VerifiableProducer.CONFIG_FILE, producer_prop_file)
self.security_config.setup_node(node)
cmd = self.start_cmd(node, idx)
self.logger.debug("VerifiableProducer %d command: %s" % (idx, cmd))