KAFKA-3078: Add ducktape tests for KafkaLog4jAppender producing to SASL enabled Kafka cluster

Note that KAFKA-3077 will be required to run these tests.

Author: Ashish Singh <asingh@cloudera.com>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #747 from SinghAsDev/KAFKA-3078
This commit is contained in:
Ashish Singh 2016-01-11 23:15:42 -08:00 committed by Ewen Cheslack-Postava
parent 2adeb214b1
commit 3e5afbfa0d
3 changed files with 56 additions and 9 deletions

View File

@ -49,10 +49,18 @@ class KafkaLog4jAppender(BackgroundThreadService):
if self.max_messages > 0: if self.max_messages > 0:
cmd += " --max-messages %s" % str(self.max_messages) cmd += " --max-messages %s" % str(self.max_messages)
if self.security_protocol == SecurityConfig.SSL: if self.security_protocol != SecurityConfig.PLAINTEXT:
cmd += " --security-protocol SSL" cmd += " --security-protocol %s" % str(self.security_protocol)
if self.security_protocol == SecurityConfig.SSL or self.security_protocol == SecurityConfig.SASL_SSL:
cmd += " --ssl-truststore-location %s" % str(SecurityConfig.TRUSTSTORE_PATH) cmd += " --ssl-truststore-location %s" % str(SecurityConfig.TRUSTSTORE_PATH)
cmd += " --ssl-truststore-password %s" % str(SecurityConfig.ssl_stores['ssl.truststore.password']) cmd += " --ssl-truststore-password %s" % str(SecurityConfig.ssl_stores['ssl.truststore.password'])
if self.security_protocol == SecurityConfig.SASL_PLAINTEXT or \
self.security_protocol == SecurityConfig.SASL_SSL or \
self.security_protocol == SecurityConfig.SASL_MECHANISM_GSSAPI or \
self.security_protocol == SecurityConfig.SASL_MECHANISM_PLAIN:
cmd += " --sasl-kerberos-service-name %s" % str('kafka')
cmd += " --client-jaas-conf-path %s" % str(SecurityConfig.JAAS_CONF_PATH)
cmd += " --kerb5-conf-path %s" % str(SecurityConfig.KRB5CONF_PATH)
cmd += " 2>> /mnt/kafka_log4j_appender.log | tee -a /mnt/kafka_log4j_appender.log &" cmd += " 2>> /mnt/kafka_log4j_appender.log | tee -a /mnt/kafka_log4j_appender.log &"
return cmd return cmd

View File

@ -35,6 +35,7 @@ class Log4jAppenderTest(Test):
super(Log4jAppenderTest, self).__init__(test_context) super(Log4jAppenderTest, self).__init__(test_context)
self.num_zk = 1 self.num_zk = 1
self.num_brokers = 1 self.num_brokers = 1
self.messages_received_count = 0
self.topics = { self.topics = {
TOPIC: {'partitions': 1, 'replication-factor': 1} TOPIC: {'partitions': 1, 'replication-factor': 1}
} }
@ -56,13 +57,20 @@ class Log4jAppenderTest(Test):
security_protocol=security_protocol) security_protocol=security_protocol)
self.appender.start() self.appender.start()
def custom_message_validator(self, msg):
if msg and "INFO : org.apache.kafka.tools.VerifiableLog4jAppender" in msg:
self.logger.debug("Received message: %s" % msg)
self.messages_received_count += 1
def start_consumer(self, security_protocol): def start_consumer(self, security_protocol):
enable_new_consumer = security_protocol == SecurityConfig.SSL enable_new_consumer = security_protocol != SecurityConfig.PLAINTEXT
self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC, self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC,
consumer_timeout_ms=1000, new_consumer=enable_new_consumer) consumer_timeout_ms=1000, new_consumer=enable_new_consumer,
message_validator=self.custom_message_validator)
self.consumer.start() self.consumer.start()
@matrix(security_protocol=['PLAINTEXT', 'SSL']) @matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
def test_log4j_appender(self, security_protocol='PLAINTEXT'): def test_log4j_appender(self, security_protocol='PLAINTEXT'):
""" """
Tests if KafkaLog4jAppender is producing to Kafka topic Tests if KafkaLog4jAppender is producing to Kafka topic
@ -79,8 +87,7 @@ class Log4jAppenderTest(Test):
timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start") timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start")
# Verify consumed messages count # Verify consumed messages count
expected_lines_count = MAX_MESSAGES * 2 # two times to account for new lines introduced by log4j wait_until(lambda: self.messages_received_count == MAX_MESSAGES, timeout_sec=10,
wait_until(lambda: len(self.consumer.messages_consumed[1]) == expected_lines_count, timeout_sec=10,
err_msg="Timed out waiting to consume expected number of messages.") err_msg="Timed out waiting to consume expected number of messages.")
self.consumer.stop() self.consumer.stop()

View File

@ -21,6 +21,7 @@ import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser; import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException; import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace; import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator; import org.apache.log4j.PropertyConfigurator;
@ -96,7 +97,7 @@ public class VerifiableLog4jAppender {
.required(false) .required(false)
.setDefault("PLAINTEXT") .setDefault("PLAINTEXT")
.type(String.class) .type(String.class)
.choices("PLAINTEXT", "SSL") .choices("PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL")
.metavar("SECURITY-PROTOCOL") .metavar("SECURITY-PROTOCOL")
.dest("securityProtocol") .dest("securityProtocol")
.help("Security protocol to be used while communicating with Kafka brokers."); .help("Security protocol to be used while communicating with Kafka brokers.");
@ -124,6 +125,30 @@ public class VerifiableLog4jAppender {
.metavar("CONFIG_FILE") .metavar("CONFIG_FILE")
.help("Log4jAppender config properties file."); .help("Log4jAppender config properties file.");
parser.addArgument("--sasl-kerberos-service-name")
.action(store())
.required(false)
.type(String.class)
.metavar("SASL-KERBEROS-SERVICE-NAME")
.dest("saslKerberosServiceName")
.help("Name of sasl kerberos service.");
parser.addArgument("--client-jaas-conf-path")
.action(store())
.required(false)
.type(String.class)
.metavar("CLIENT-JAAS-CONF-PATH")
.dest("clientJaasConfPath")
.help("Path of JAAS config file of Kafka client.");
parser.addArgument("--kerb5-conf-path")
.action(store())
.required(false)
.type(String.class)
.metavar("KERB5-CONF-PATH")
.dest("kerb5ConfPath")
.help("Path of Kerb5 config file.");
return parser; return parser;
} }
@ -171,11 +196,18 @@ public class VerifiableLog4jAppender {
props.setProperty("log4j.appender.KAFKA.RequiredNumAcks", res.getString("acks")); props.setProperty("log4j.appender.KAFKA.RequiredNumAcks", res.getString("acks"));
props.setProperty("log4j.appender.KAFKA.SyncSend", "true"); props.setProperty("log4j.appender.KAFKA.SyncSend", "true");
final String securityProtocol = res.getString("securityProtocol"); final String securityProtocol = res.getString("securityProtocol");
if (securityProtocol != null && securityProtocol.equals("SSL")) { if (securityProtocol != null && !securityProtocol.equals(SecurityProtocol.PLAINTEXT.toString())) {
props.setProperty("log4j.appender.KAFKA.SecurityProtocol", securityProtocol); props.setProperty("log4j.appender.KAFKA.SecurityProtocol", securityProtocol);
}
if (securityProtocol != null && securityProtocol.contains("SSL")) {
props.setProperty("log4j.appender.KAFKA.SslTruststoreLocation", res.getString("sslTruststoreLocation")); props.setProperty("log4j.appender.KAFKA.SslTruststoreLocation", res.getString("sslTruststoreLocation"));
props.setProperty("log4j.appender.KAFKA.SslTruststorePassword", res.getString("sslTruststorePassword")); props.setProperty("log4j.appender.KAFKA.SslTruststorePassword", res.getString("sslTruststorePassword"));
} }
if (securityProtocol != null && securityProtocol.contains("SASL")) {
props.setProperty("log4j.appender.KAFKA.SaslKerberosServiceName", res.getString("saslKerberosServiceName"));
props.setProperty("log4j.appender.KAFKA.clientJaasConfPath", res.getString("clientJaasConfPath"));
props.setProperty("log4j.appender.KAFKA.kerb5ConfPath", res.getString("kerb5ConfPath"));
}
props.setProperty("log4j.logger.kafka.log4j", "INFO, KAFKA"); props.setProperty("log4j.logger.kafka.log4j", "INFO, KAFKA");
if (configFile != null) { if (configFile != null) {