mirror of https://github.com/apache/kafka.git
KAFKA-7896; Add sasl.jaas.config/sasl.mechanism props to the log4j kafka appender
This patch adds 2 props to the log4j kafka appender that get put directly into the sasl properties passed to the producer: - ClientJaasConf: This property sets sasl.jaas.config - SaslMechanim: This property sets sasl.mechanism Author: Rohan Desai <desai.p.rohan@gmail.com> Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com> Closes #6216 from rodesai/add-kafka-appender-security-props
This commit is contained in:
parent
d2575f03a3
commit
08036fa4b1
|
@ -41,6 +41,8 @@ import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CL
|
||||||
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
|
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
|
||||||
import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
|
import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
|
||||||
import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
|
import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
|
||||||
|
import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
|
||||||
|
import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM;
|
||||||
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
|
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
|
||||||
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG;
|
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG;
|
||||||
import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_TYPE_CONFIG;
|
import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_TYPE_CONFIG;
|
||||||
|
@ -63,7 +65,9 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
|
||||||
private String sslKeystoreLocation;
|
private String sslKeystoreLocation;
|
||||||
private String sslKeystorePassword;
|
private String sslKeystorePassword;
|
||||||
private String saslKerberosServiceName;
|
private String saslKerberosServiceName;
|
||||||
|
private String saslMechanism;
|
||||||
private String clientJaasConfPath;
|
private String clientJaasConfPath;
|
||||||
|
private String clientJaasConf;
|
||||||
private String kerb5ConfPath;
|
private String kerb5ConfPath;
|
||||||
private Integer maxBlockMs;
|
private Integer maxBlockMs;
|
||||||
|
|
||||||
|
@ -210,6 +214,22 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
|
||||||
return clientJaasConfPath;
|
return clientJaasConfPath;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setSaslMechanism(String saslMechanism) {
|
||||||
|
this.saslMechanism = saslMechanism;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getSaslMechanism() {
|
||||||
|
return this.saslMechanism;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setClientJaasConf(final String clientJaasConf) {
|
||||||
|
this.clientJaasConf = clientJaasConf;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getClientJaasConf() {
|
||||||
|
return this.clientJaasConf;
|
||||||
|
}
|
||||||
|
|
||||||
public String getKerb5ConfPath() {
|
public String getKerb5ConfPath() {
|
||||||
return kerb5ConfPath;
|
return kerb5ConfPath;
|
||||||
}
|
}
|
||||||
|
@ -257,9 +277,15 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
|
||||||
if (securityProtocol != null && securityProtocol.contains("SASL") && saslKerberosServiceName != null && clientJaasConfPath != null) {
|
if (securityProtocol != null && securityProtocol.contains("SASL") && saslKerberosServiceName != null && clientJaasConfPath != null) {
|
||||||
props.put(SASL_KERBEROS_SERVICE_NAME, saslKerberosServiceName);
|
props.put(SASL_KERBEROS_SERVICE_NAME, saslKerberosServiceName);
|
||||||
System.setProperty("java.security.auth.login.config", clientJaasConfPath);
|
System.setProperty("java.security.auth.login.config", clientJaasConfPath);
|
||||||
if (kerb5ConfPath != null) {
|
}
|
||||||
System.setProperty("java.security.krb5.conf", kerb5ConfPath);
|
if (kerb5ConfPath != null) {
|
||||||
}
|
System.setProperty("java.security.krb5.conf", kerb5ConfPath);
|
||||||
|
}
|
||||||
|
if (saslMechanism != null) {
|
||||||
|
props.put(SASL_MECHANISM, saslMechanism);
|
||||||
|
}
|
||||||
|
if (clientJaasConf != null) {
|
||||||
|
props.put(SASL_JAAS_CONFIG, clientJaasConf);
|
||||||
}
|
}
|
||||||
if (maxBlockMs != null) {
|
if (maxBlockMs != null) {
|
||||||
props.put(MAX_BLOCK_MS_CONFIG, maxBlockMs);
|
props.put(MAX_BLOCK_MS_CONFIG, maxBlockMs);
|
||||||
|
|
|
@ -16,9 +16,14 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.kafka.log4jappender;
|
package org.apache.kafka.log4jappender;
|
||||||
|
|
||||||
|
import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
|
import static org.hamcrest.CoreMatchers.hasItem;
|
||||||
|
import static org.hamcrest.CoreMatchers.not;
|
||||||
|
|
||||||
import org.apache.kafka.clients.producer.MockProducer;
|
import org.apache.kafka.clients.producer.MockProducer;
|
||||||
import org.apache.kafka.clients.producer.RecordMetadata;
|
import org.apache.kafka.clients.producer.RecordMetadata;
|
||||||
import org.apache.kafka.common.config.ConfigException;
|
import org.apache.kafka.common.config.ConfigException;
|
||||||
|
import org.apache.kafka.common.config.SaslConfigs;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.apache.log4j.PropertyConfigurator;
|
import org.apache.log4j.PropertyConfigurator;
|
||||||
import org.apache.log4j.helpers.LogLog;
|
import org.apache.log4j.helpers.LogLog;
|
||||||
|
@ -77,6 +82,45 @@ public class KafkaLog4jAppenderTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetSaslMechanism() {
|
||||||
|
Properties props = getLog4jConfig(false);
|
||||||
|
props.put("log4j.appender.KAFKA.SaslMechanism", "PLAIN");
|
||||||
|
PropertyConfigurator.configure(props);
|
||||||
|
|
||||||
|
MockKafkaLog4jAppender mockKafkaLog4jAppender = getMockKafkaLog4jAppender();
|
||||||
|
Assert.assertThat(
|
||||||
|
mockKafkaLog4jAppender.getProducerProperties().getProperty(SaslConfigs.SASL_MECHANISM),
|
||||||
|
equalTo("PLAIN"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSaslMechanismNotSet() {
|
||||||
|
testProducerPropertyNotSet(SaslConfigs.SASL_MECHANISM);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetJaasConfig() {
|
||||||
|
Properties props = getLog4jConfig(false);
|
||||||
|
props.put("log4j.appender.KAFKA.ClientJaasConf", "jaas-config");
|
||||||
|
PropertyConfigurator.configure(props);
|
||||||
|
|
||||||
|
MockKafkaLog4jAppender mockKafkaLog4jAppender = getMockKafkaLog4jAppender();
|
||||||
|
Assert.assertThat(
|
||||||
|
mockKafkaLog4jAppender.getProducerProperties().getProperty(SaslConfigs.SASL_JAAS_CONFIG),
|
||||||
|
equalTo("jaas-config"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testJaasConfigNotSet() {
|
||||||
|
testProducerPropertyNotSet(SaslConfigs.SASL_JAAS_CONFIG);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testProducerPropertyNotSet(String name) {
|
||||||
|
PropertyConfigurator.configure(getLog4jConfig(false));
|
||||||
|
MockKafkaLog4jAppender mockKafkaLog4jAppender = getMockKafkaLog4jAppender();
|
||||||
|
Assert.assertThat(mockKafkaLog4jAppender.getProducerProperties().stringPropertyNames(), not(hasItem(name)));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testLog4jAppends() {
|
public void testLog4jAppends() {
|
||||||
|
|
|
@ -29,8 +29,11 @@ public class MockKafkaLog4jAppender extends KafkaLog4jAppender {
|
||||||
private MockProducer<byte[], byte[]> mockProducer =
|
private MockProducer<byte[], byte[]> mockProducer =
|
||||||
new MockProducer<>(false, new MockSerializer(), new MockSerializer());
|
new MockProducer<>(false, new MockSerializer(), new MockSerializer());
|
||||||
|
|
||||||
|
private Properties producerProperties;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Producer<byte[], byte[]> getKafkaProducer(Properties props) {
|
protected Producer<byte[], byte[]> getKafkaProducer(Properties props) {
|
||||||
|
producerProperties = props;
|
||||||
return mockProducer;
|
return mockProducer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -49,4 +52,8 @@ public class MockKafkaLog4jAppender extends KafkaLog4jAppender {
|
||||||
List<ProducerRecord<byte[], byte[]>> getHistory() {
|
List<ProducerRecord<byte[], byte[]>> getHistory() {
|
||||||
return mockProducer.history();
|
return mockProducer.history();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Properties getProducerProperties() {
|
||||||
|
return producerProperties;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue