KAFKA-3077: Enable KafkaLog4jAppender to work with SASL enabled brokers

Author: Ashish Singh <asingh@cloudera.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #740 from SinghAsDev/KAFKA-3077
This commit is contained in:
Ashish Singh 2016-01-11 23:14:14 -08:00 committed by Ewen Cheslack-Postava
parent c9114488b3
commit 2adeb214b1
1 changed files with 40 additions and 2 deletions

View File

@ -24,6 +24,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.helpers.LogLog;
@ -51,6 +52,7 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
private static final String SSL_KEYSTORE_TYPE = SslConfigs.SSL_KEYSTORE_TYPE_CONFIG;
private static final String SSL_KEYSTORE_LOCATION = SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG;
private static final String SSL_KEYSTORE_PASSWORD = SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG;
private static final String SASL_KERBEROS_SERVICE_NAME = SaslConfigs.SASL_KERBEROS_SERVICE_NAME;
private String brokerList = null;
private String topic = null;
@ -61,6 +63,9 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
private String sslKeystoreType = null;
private String sslKeystoreLocation = null;
private String sslKeystorePassword = null;
private String saslKerberosServiceName = null;
private String clientJaasConfPath = null;
private String kerb5ConfPath = null;
private int retries = 0;
private int requiredNumAcks = Integer.MAX_VALUE;
@ -155,6 +160,18 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
this.sslKeystoreLocation = sslKeystoreLocation;
}
public void setSaslKerberosServiceName(String saslKerberosServiceName) {
this.saslKerberosServiceName = saslKerberosServiceName;
}
public void setClientJaasConfPath(String clientJaasConfPath) {
this.clientJaasConfPath = clientJaasConfPath;
}
public void setKerb5ConfPath(String kerb5ConfPath) {
this.kerb5ConfPath = kerb5ConfPath;
}
public String getSslKeystoreLocation() {
return sslKeystoreLocation;
}
@ -167,6 +184,18 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
return sslKeystorePassword;
}
public String getSaslKerberosServiceName() {
return saslKerberosServiceName;
}
public String getClientJaasConfPath() {
return clientJaasConfPath;
}
public String getKerb5ConfPath() {
return kerb5ConfPath;
}
@Override
public void activateOptions() {
// check for config parameter validity
@ -183,9 +212,11 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
props.put(ACKS_CONFIG, Integer.toString(requiredNumAcks));
if (retries > 0)
props.put(RETRIES_CONFIG, retries);
if (securityProtocol != null && sslTruststoreLocation != null &&
sslTruststorePassword != null) {
if (securityProtocol != null) {
props.put(SECURITY_PROTOCOL, securityProtocol);
}
if (securityProtocol != null && securityProtocol.contains("SSL") && sslTruststoreLocation != null &&
sslTruststorePassword != null) {
props.put(SSL_TRUSTSTORE_LOCATION, sslTruststoreLocation);
props.put(SSL_TRUSTSTORE_PASSWORD, sslTruststorePassword);
@ -196,6 +227,13 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
props.put(SSL_KEYSTORE_PASSWORD, sslKeystorePassword);
}
}
if (securityProtocol != null && securityProtocol.contains("SASL") && saslKerberosServiceName != null && clientJaasConfPath != null) {
props.put(SASL_KERBEROS_SERVICE_NAME, saslKerberosServiceName);
System.setProperty("java.security.auth.login.config", clientJaasConfPath);
if (kerb5ConfPath != null) {
System.setProperty("java.security.krb5.conf", kerb5ConfPath);
}
}
props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");