KAFKA-8890: Make SSL context/engine configuration extensible (KIP-519) (#8338)

This commit is contained in:
maulin-vasavada 2020-04-08 07:20:32 -07:00 committed by GitHub
parent 833dc7725c
commit 9ba49b806a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 495 additions and 193 deletions

View File

@ -110,6 +110,9 @@ public class SslConfigs {
public static final String SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG = "ssl.secure.random.implementation";
public static final String SSL_SECURE_RANDOM_IMPLEMENTATION_DOC = "The SecureRandom PRNG implementation to use for SSL cryptography operations. ";
public static final String SSL_ENGINE_FACTORY_CLASS_CONFIG = "ssl.engine.factory.class";
public static final String SSL_ENGINE_FACTORY_CLASS_DOC = "The class of type org.apache.kafka.common.security.auth.SslEngineFactory to provide SSLEngine objects. Default value is org.apache.kafka.common.security.ssl.DefaultSslEngineFactory";
/**
* @deprecated As of 1.0.0. This field will be removed in a future major release.
*/
@ -136,7 +139,8 @@ public class SslConfigs {
.define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
.define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
.define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC)
.define(SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.LOW, SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_DOC);
.define(SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.LOW, SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_DOC)
.define(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, SslConfigs.SSL_ENGINE_FACTORY_CLASS_DOC);
}
public static final Set<String> RECONFIGURABLE_CONFIGS = Utils.mkSet(
@ -157,5 +161,6 @@ public class SslConfigs {
SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG,
SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG,
SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,
SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG);
SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG,
SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG);
}

View File

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.security.auth;
import org.apache.kafka.common.Configurable;
import javax.net.ssl.SSLEngine;
import java.io.Closeable;
import java.security.KeyStore;
import java.util.Map;
import java.util.Set;
/**
* Plugin interface for allowing creation of SSLEngine object in a custom way.
* Example: You want to use custom way to load your key material and trust material needed for SSLContext.
* However, keep in mind that this is complementary to the existing Java Security Provider's mechanism and not a competing
* solution.
*/
public interface SslEngineFactory extends Configurable, Closeable {
/**
* Create a new SSLEngine object to be used by the client.
*
* @param peerHost The peer host to use. This is used in client mode if endpoint validation is enabled.
* @param peerPort The peer port to use. This is a hint and not used for validation.
* @param endpointIdentification Endpoint identification algorithm for client mode.
* @return The new SSLEngine.
*/
SSLEngine createClientSslEngine(String peerHost, int peerPort, String endpointIdentification);
/**
* Create a new SSLEngine object to be used by the server.
*
* @param peerHost The peer host to use. This is a hint and not used for validation.
* @param peerPort The peer port to use. This is a hint and not used for validation.
* @return The new SSLEngine.
*/
SSLEngine createServerSslEngine(String peerHost, int peerPort);
/**
* Returns true if SSLEngine needs to be rebuilt. This method will be called when reconfiguration is triggered on
* {@link org.apache.kafka.common.security.ssl.SslFactory}. Based on the <i>nextConfigs</i>, this method will
* decide whether underlying SSLEngine object needs to be rebuilt. If this method returns true, the
* {@link org.apache.kafka.common.security.ssl.SslFactory} will re-create instance of this object and run other
* checks before deciding to use the new object for the <i>new incoming connection</i> requests.The existing connections
* are not impacted by this and will not see any changes done as part of reconfiguration.
*
* <pre>
* Example: If the implementation depends on the file based key material it can check if the file is updated
* compared to the previous/last-loaded timestamp and return true.
* </pre>
*
* @param nextConfigs The configuration we want to use.
* @return True only if the underlying SSLEngine object should be rebuilt.
*/
boolean shouldBeRebuilt(Map<String, Object> nextConfigs);
/**
* Returns the names of configs that may be reconfigured.
*/
Set<String> reconfigurableConfigs();
/**
* Returns keystore.
* @return
*/
KeyStore keystore();
/**
* Returns truststore.
* @return
*/
KeyStore truststore();
}

View File

@ -20,8 +20,9 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.SslClientAuth;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.security.auth.SslEngineFactory;
import org.apache.kafka.common.utils.SecurityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -36,34 +37,77 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.SecureRandom;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.SecureRandom;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
public class SslEngineBuilder {
private static final Logger log = LoggerFactory.getLogger(SslEngineBuilder.class);
public final class DefaultSslEngineFactory implements SslEngineFactory {
private final Map<String, ?> configs;
private final String protocol;
private final String provider;
private final String kmfAlgorithm;
private final String tmfAlgorithm;
private final SecurityStore keystore;
private final SecurityStore truststore;
private final String[] cipherSuites;
private final String[] enabledProtocols;
private final SecureRandom secureRandomImplementation;
private final SSLContext sslContext;
private final SslClientAuth sslClientAuth;
private static final Logger log = LoggerFactory.getLogger(DefaultSslEngineFactory.class);
private Map<String, ?> configs;
private String protocol;
private String provider;
private String kmfAlgorithm;
private String tmfAlgorithm;
private SecurityStore keystore;
private SecurityStore truststore;
private String[] cipherSuites;
private String[] enabledProtocols;
private SecureRandom secureRandomImplementation;
private SSLContext sslContext;
private SslClientAuth sslClientAuth;
@Override
public SSLEngine createClientSslEngine(String peerHost, int peerPort, String endpointIdentification) {
return createSslEngine(Mode.CLIENT, peerHost, peerPort, endpointIdentification);
}
@Override
public SSLEngine createServerSslEngine(String peerHost, int peerPort) {
return createSslEngine(Mode.SERVER, peerHost, peerPort, null);
}
@Override
public boolean shouldBeRebuilt(Map<String, Object> nextConfigs) {
if (!nextConfigs.equals(configs)) {
return true;
}
if (truststore != null && truststore.modified()) {
return true;
}
if (keystore != null && keystore.modified()) {
return true;
}
return false;
}
@Override
public Set<String> reconfigurableConfigs() {
return SslConfigs.RECONFIGURABLE_CONFIGS;
}
@Override
public KeyStore keystore() {
return this.keystore != null ? this.keystore.get() : null;
}
@Override
public KeyStore truststore() {
return this.truststore != null ? this.truststore.get() : null;
}
@SuppressWarnings("unchecked")
SslEngineBuilder(Map<String, ?> configs) {
@Override
public void configure(Map<String, ?> configs) {
this.configs = Collections.unmodifiableMap(configs);
this.protocol = (String) configs.get(SslConfigs.SSL_PROTOCOL_CONFIG);
this.provider = (String) configs.get(SslConfigs.SSL_PROVIDER_CONFIG);
@ -104,6 +148,44 @@ public class SslEngineBuilder {
this.sslContext = createSSLContext();
}
@Override
public void close() throws IOException {
this.sslContext = null;
}
//For Test only
public SSLContext sslContext() {
return this.sslContext;
}
private SSLEngine createSslEngine(Mode mode, String peerHost, int peerPort, String endpointIdentification) {
SSLEngine sslEngine = sslContext.createSSLEngine(peerHost, peerPort);
if (cipherSuites != null) sslEngine.setEnabledCipherSuites(cipherSuites);
if (enabledProtocols != null) sslEngine.setEnabledProtocols(enabledProtocols);
if (mode == Mode.SERVER) {
sslEngine.setUseClientMode(false);
switch (sslClientAuth) {
case REQUIRED:
sslEngine.setNeedClientAuth(true);
break;
case REQUESTED:
sslEngine.setWantClientAuth(true);
break;
case NONE:
break;
}
sslEngine.setUseClientMode(false);
} else {
sslEngine.setUseClientMode(true);
SSLParameters sslParams = sslEngine.getSSLParameters();
// SSLParameters#setEndpointIdentificationAlgorithm enables endpoint validation
// only in client mode. Hence, validation is enabled only for clients.
sslParams.setEndpointIdentificationAlgorithm(endpointIdentification);
sslEngine.setSSLParameters(sslParams);
}
return sslEngine;
}
private static SslClientAuth createSslClientAuth(String key) {
SslClientAuth auth = SslClientAuth.forConfig(key);
if (auth != null) {
@ -141,7 +223,7 @@ public class SslEngineBuilder {
this.kmfAlgorithm : KeyManagerFactory.getDefaultAlgorithm();
KeyManagerFactory kmf = KeyManagerFactory.getInstance(kmfAlgorithm);
if (keystore != null) {
KeyStore ks = keystore.load();
KeyStore ks = keystore.get();
Password keyPassword = keystore.keyPassword != null ? keystore.keyPassword : keystore.password;
kmf.init(ks, keyPassword.value().toCharArray());
} else {
@ -152,7 +234,7 @@ public class SslEngineBuilder {
String tmfAlgorithm = this.tmfAlgorithm != null ? this.tmfAlgorithm : TrustManagerFactory.getDefaultAlgorithm();
TrustManagerFactory tmf = TrustManagerFactory.getInstance(tmfAlgorithm);
KeyStore ts = truststore == null ? null : truststore.load();
KeyStore ts = truststore == null ? null : truststore.get();
tmf.init(ts);
sslContext.init(keyManagers, tmf.getTrustManagers(), this.secureRandomImplementation);
@ -184,80 +266,6 @@ public class SslEngineBuilder {
return null;
}
@SuppressWarnings("unchecked")
Map<String, Object> configs() {
return (Map<String, Object>) configs;
}
public SecurityStore keystore() {
return keystore;
}
public SecurityStore truststore() {
return truststore;
}
/**
* Create a new SSLEngine object.
*
* @param mode Whether to use client or server mode.
* @param peerHost The peer host to use. This is used in client mode if endpoint validation is enabled.
* @param peerPort The peer port to use. This is a hint and not used for validation.
* @param endpointIdentification Endpoint identification algorithm for client mode.
* @return The new SSLEngine.
*/
public SSLEngine createSslEngine(Mode mode, String peerHost, int peerPort, String endpointIdentification) {
SSLEngine sslEngine = sslContext.createSSLEngine(peerHost, peerPort);
if (cipherSuites != null) sslEngine.setEnabledCipherSuites(cipherSuites);
if (enabledProtocols != null) sslEngine.setEnabledProtocols(enabledProtocols);
if (mode == Mode.SERVER) {
sslEngine.setUseClientMode(false);
switch (sslClientAuth) {
case REQUIRED:
sslEngine.setNeedClientAuth(true);
break;
case REQUESTED:
sslEngine.setWantClientAuth(true);
break;
case NONE:
break;
}
sslEngine.setUseClientMode(false);
} else {
sslEngine.setUseClientMode(true);
SSLParameters sslParams = sslEngine.getSSLParameters();
// SSLParameters#setEndpointIdentificationAlgorithm enables endpoint validation
// only in client mode. Hence, validation is enabled only for clients.
sslParams.setEndpointIdentificationAlgorithm(endpointIdentification);
sslEngine.setSSLParameters(sslParams);
}
return sslEngine;
}
public SSLContext sslContext() {
return sslContext;
}
/**
* Returns true if this SslEngineBuilder needs to be rebuilt.
*
* @param nextConfigs The configuration we want to use.
* @return True only if this builder should be rebuilt.
*/
public boolean shouldBeRebuilt(Map<String, Object> nextConfigs) {
if (!nextConfigs.equals(configs)) {
return true;
}
if (truststore != null && truststore.modified()) {
return true;
}
if (keystore != null && keystore.modified()) {
return true;
}
return false;
}
// package access for testing
static class SecurityStore {
private final String type;
@ -265,6 +273,7 @@ public class SslEngineBuilder {
private final Password password;
private final Password keyPassword;
private final Long fileLastModifiedMs;
private final KeyStore keyStore;
SecurityStore(String type, String path, Password password, Password keyPassword) {
Objects.requireNonNull(type, "type must not be null");
@ -273,6 +282,11 @@ public class SslEngineBuilder {
this.password = password;
this.keyPassword = keyPassword;
fileLastModifiedMs = lastModifiedMs(path);
this.keyStore = load();
}
KeyStore get() {
return keyStore;
}
/**
@ -281,7 +295,7 @@ public class SslEngineBuilder {
* @throws KafkaException if the file could not be read or if the keystore could not be loaded
* using the specified configs (e.g. if the password or keystore type is invalid)
*/
KeyStore load() {
private KeyStore load() {
try (InputStream in = Files.newInputStream(Paths.get(path))) {
KeyStore ks = KeyStore.getInstance(type);
// If a password is not set access to the truststore is still available, but integrity checking is disabled.

View File

@ -19,15 +19,14 @@ package org.apache.kafka.common.security.ssl;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Reconfigurable;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SecurityConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.security.auth.SslEngineFactory;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLException;
@ -55,7 +54,8 @@ public class SslFactory implements Reconfigurable {
private final String clientAuthConfigOverride;
private final boolean keystoreVerifiableUsingTruststore;
private String endpointIdentification;
private SslEngineBuilder sslEngineBuilder;
private SslEngineFactory sslEngineFactory;
private Map<String, Object> sslEngineFactoryConfig;
public SslFactory(Mode mode) {
this(mode, null, false);
@ -80,19 +80,16 @@ public class SslFactory implements Reconfigurable {
@Override
public void configure(Map<String, ?> configs) throws KafkaException {
if (sslEngineBuilder != null) {
if (sslEngineFactory != null) {
throw new IllegalStateException("SslFactory was already configured.");
}
this.endpointIdentification = (String) configs.get(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
Map<String, Object> nextConfigs = new HashMap<>();
copyMapEntries(nextConfigs, configs, SslConfigs.NON_RECONFIGURABLE_CONFIGS);
copyMapEntries(nextConfigs, configs, SslConfigs.RECONFIGURABLE_CONFIGS);
copyMapEntry(nextConfigs, configs, SecurityConfig.SECURITY_PROVIDERS_CONFIG);
Map<String, Object> nextConfigs = new HashMap<>(configs);
if (clientAuthConfigOverride != null) {
nextConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, clientAuthConfigOverride);
}
SslEngineBuilder builder = new SslEngineBuilder(nextConfigs);
SslEngineFactory builder = instantiateSslEngineFactory(nextConfigs);
if (keystoreVerifiableUsingTruststore) {
try {
SslEngineValidator.validate(builder, builder);
@ -101,68 +98,83 @@ public class SslFactory implements Reconfigurable {
"can't connect to a server SSLEngine created with those settings.", e);
}
}
this.sslEngineBuilder = builder;
this.sslEngineFactory = builder;
}
@Override
public Set<String> reconfigurableConfigs() {
return SslConfigs.RECONFIGURABLE_CONFIGS;
return sslEngineFactory.reconfigurableConfigs();
}
@Override
public void validateReconfiguration(Map<String, ?> newConfigs) {
createNewSslEngineBuilder(newConfigs);
createNewSslEngineFactory(newConfigs);
}
@Override
public void reconfigure(Map<String, ?> newConfigs) throws KafkaException {
SslEngineBuilder newSslEngineBuilder = createNewSslEngineBuilder(newConfigs);
if (newSslEngineBuilder != this.sslEngineBuilder) {
this.sslEngineBuilder = newSslEngineBuilder;
SslEngineFactory newSslEngineFactory = createNewSslEngineFactory(newConfigs);
if (newSslEngineFactory != this.sslEngineFactory) {
this.sslEngineFactory = newSslEngineFactory;
log.info("Created new {} SSL engine builder with keystore {} truststore {}", mode,
newSslEngineBuilder.keystore(), newSslEngineBuilder.truststore());
newSslEngineFactory.keystore(), newSslEngineFactory.truststore());
}
}
private SslEngineBuilder createNewSslEngineBuilder(Map<String, ?> newConfigs) {
if (sslEngineBuilder == null) {
private SslEngineFactory instantiateSslEngineFactory(Map<String, Object> configs) {
@SuppressWarnings("unchecked")
Class<? extends SslEngineFactory> sslEngineFactoryClass =
(Class<? extends SslEngineFactory>) configs.get(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG);
SslEngineFactory sslEngineFactory;
if (sslEngineFactoryClass == null) {
sslEngineFactory = new DefaultSslEngineFactory();
} else {
sslEngineFactory = Utils.newInstance(sslEngineFactoryClass);
}
sslEngineFactory.configure(configs);
this.sslEngineFactoryConfig = configs;
return sslEngineFactory;
}
private SslEngineFactory createNewSslEngineFactory(Map<String, ?> newConfigs) {
if (sslEngineFactory == null) {
throw new IllegalStateException("SslFactory has not been configured.");
}
Map<String, Object> nextConfigs = new HashMap<>(sslEngineBuilder.configs());
copyMapEntries(nextConfigs, newConfigs, SslConfigs.RECONFIGURABLE_CONFIGS);
Map<String, Object> nextConfigs = new HashMap<>(sslEngineFactoryConfig);
copyMapEntries(nextConfigs, newConfigs, reconfigurableConfigs());
if (clientAuthConfigOverride != null) {
nextConfigs.put(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, clientAuthConfigOverride);
}
if (!sslEngineBuilder.shouldBeRebuilt(nextConfigs)) {
return sslEngineBuilder;
if (!sslEngineFactory.shouldBeRebuilt(nextConfigs)) {
return sslEngineFactory;
}
try {
SslEngineBuilder newSslEngineBuilder = new SslEngineBuilder(nextConfigs);
if (sslEngineBuilder.keystore() == null) {
if (newSslEngineBuilder.keystore() != null) {
SslEngineFactory newSslEngineFactory = instantiateSslEngineFactory(nextConfigs);
if (sslEngineFactory.keystore() == null) {
if (newSslEngineFactory.keystore() != null) {
throw new ConfigException("Cannot add SSL keystore to an existing listener for " +
"which no keystore was configured.");
}
} else {
if (newSslEngineBuilder.keystore() == null) {
if (newSslEngineFactory.keystore() == null) {
throw new ConfigException("Cannot remove the SSL keystore from an existing listener for " +
"which a keystore was configured.");
}
if (!CertificateEntries.create(sslEngineBuilder.keystore().load()).equals(
CertificateEntries.create(newSslEngineBuilder.keystore().load()))) {
if (!CertificateEntries.create(sslEngineFactory.keystore()).equals(
CertificateEntries.create(newSslEngineFactory.keystore()))) {
throw new ConfigException("Keystore DistinguishedName or SubjectAltNames do not match");
}
}
if (sslEngineBuilder.truststore() == null && newSslEngineBuilder.truststore() != null) {
if (sslEngineFactory.truststore() == null && newSslEngineFactory.truststore() != null) {
throw new ConfigException("Cannot add SSL truststore to an existing listener for which no " +
"truststore was configured.");
}
if (keystoreVerifiableUsingTruststore) {
if (sslEngineBuilder.truststore() != null || sslEngineBuilder.keystore() != null) {
SslEngineValidator.validate(sslEngineBuilder, newSslEngineBuilder);
if (sslEngineFactory.truststore() != null || sslEngineFactory.keystore() != null) {
SslEngineValidator.validate(sslEngineFactory, newSslEngineFactory);
}
}
return newSslEngineBuilder;
return newSslEngineFactory;
} catch (Exception e) {
log.debug("Validation of dynamic config update of SSLFactory failed.", e);
throw new ConfigException("Validation of dynamic config update of SSLFactory failed: " + e);
@ -170,19 +182,18 @@ public class SslFactory implements Reconfigurable {
}
public SSLEngine createSslEngine(String peerHost, int peerPort) {
if (sslEngineBuilder == null) {
if (sslEngineFactory == null) {
throw new IllegalStateException("SslFactory has not been configured.");
}
return sslEngineBuilder.createSslEngine(mode, peerHost, peerPort, endpointIdentification);
if (mode == Mode.SERVER) {
return sslEngineFactory.createServerSslEngine(peerHost, peerPort);
} else {
return sslEngineFactory.createClientSslEngine(peerHost, peerPort, endpointIdentification);
}
}
@Deprecated
public SSLContext sslContext() {
return sslEngineBuilder.sslContext();
}
public SslEngineBuilder sslEngineBuilder() {
return sslEngineBuilder;
public SslEngineFactory sslEngineFactory() {
return sslEngineFactory;
}
/**
@ -275,17 +286,21 @@ public class SslFactory implements Reconfigurable {
private ByteBuffer appBuffer;
private ByteBuffer netBuffer;
static void validate(SslEngineBuilder oldEngineBuilder,
SslEngineBuilder newEngineBuilder) throws SSLException {
static void validate(SslEngineFactory oldEngineBuilder,
SslEngineFactory newEngineBuilder) throws SSLException {
validate(createSslEngineForValidation(oldEngineBuilder, Mode.SERVER),
createSslEngineForValidation(newEngineBuilder, Mode.CLIENT));
validate(createSslEngineForValidation(newEngineBuilder, Mode.SERVER),
createSslEngineForValidation(oldEngineBuilder, Mode.CLIENT));
}
private static SSLEngine createSslEngineForValidation(SslEngineBuilder sslEngineBuilder, Mode mode) {
private static SSLEngine createSslEngineForValidation(SslEngineFactory sslEngineFactory, Mode mode) {
// Use empty hostname, disable hostname verification
return sslEngineBuilder.createSslEngine(mode, "", 0, "");
if (mode == Mode.SERVER) {
return sslEngineFactory.createServerSslEngine("", 0);
} else {
return sslEngineFactory.createClientSslEngine("", 0, "");
}
}
static void validate(SSLEngine clientEngine, SSLEngine serverEngine) throws SSLException {

View File

@ -17,6 +17,7 @@
package org.apache.kafka.common.network;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory;
import org.apache.kafka.common.security.ssl.SslFactory;
import javax.net.ssl.SSLContext;
@ -50,7 +51,7 @@ class EchoServer extends Thread {
case SSL:
this.sslFactory = new SslFactory(Mode.SERVER);
this.sslFactory.configure(configs);
SSLContext sslContext = this.sslFactory.sslEngineBuilder().sslContext();
SSLContext sslContext = ((DefaultSslEngineFactory) this.sslFactory.sslEngineFactory()).sslContext();
this.serverSocket = sslContext.getServerSocketFactory().createServerSocket(0);
break;
case PLAINTEXT:

View File

@ -22,14 +22,15 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.security.TestSecurityConfig;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.TestSecurityConfig;
import org.apache.kafka.common.security.ssl.SslFactory;
import org.apache.kafka.common.utils.Java;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestSslUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
@ -53,8 +54,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -1138,6 +1139,64 @@ public class SslTransportLayerTest {
NetworkTestUtils.checkClientConnection(newClientSelector, "3", 100, 10);
}
/**
* Tests if client can plugin customize ssl.engine.factory
*/
@Test
public void testCustomClientSslEngineFactory() throws Exception {
String node = "0";
sslClientConfigs.put(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, TestSslUtils.TestSslEngineFactory.class);
server = createEchoServer(SecurityProtocol.SSL);
createSelector(sslClientConfigs);
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.checkClientConnection(selector, node, 100, 10);
}
/**
* Tests if server can plugin customize ssl.engine.factory
*/
@Test
public void testCustomServerSslEngineFactory() throws Exception {
String node = "0";
sslServerConfigs.put(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, TestSslUtils.TestSslEngineFactory.class);
server = createEchoServer(SecurityProtocol.SSL);
createSelector(sslClientConfigs);
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.checkClientConnection(selector, node, 100, 10);
}
/**
* Tests if client and server both can plugin customize ssl.engine.factory and talk to each other!
*/
@Test
public void testCustomClientAndServerSslEngineFactory() throws Exception {
String node = "0";
sslClientConfigs.put(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, TestSslUtils.TestSslEngineFactory.class);
sslServerConfigs.put(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, TestSslUtils.TestSslEngineFactory.class);
server = createEchoServer(SecurityProtocol.SSL);
createSelector(sslClientConfigs);
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
NetworkTestUtils.checkClientConnection(selector, node, 100, 10);
}
/**
* Tests invalid ssl.engine.factory plugin class
*/
@Test(expected = KafkaException.class)
public void testInvalidSslEngineFactory() throws Exception {
sslClientConfigs.put(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, String.class);
createSelector(sslClientConfigs);
}
private void verifyInvalidReconfigure(ListenerReconfigurable reconfigurable,
Map<String, Object> invalidConfigs, String errorMessage) {
try {

View File

@ -33,6 +33,7 @@ import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SecurityConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.security.auth.SslEngineFactory;
import org.apache.kafka.common.security.ssl.mock.TestKeyManagerFactory;
import org.apache.kafka.common.security.ssl.mock.TestProviderCreator;
import org.apache.kafka.common.security.ssl.mock.TestTrustManagerFactory;
@ -99,7 +100,7 @@ public class SslFactoryTest {
serverSslConfig.put(SecurityConfig.SECURITY_PROVIDERS_CONFIG, testProviderCreator.getClass().getName());
SslFactory sslFactory = new SslFactory(Mode.SERVER);
sslFactory.configure(serverSslConfig);
assertNotNull("SslEngineBuilder not created", sslFactory.sslEngineBuilder());
assertNotNull("SslEngineFactory not created", sslFactory.sslEngineFactory());
Security.removeProvider(testProviderCreator.getProvider().getName());
}
@ -167,54 +168,54 @@ public class SslFactoryTest {
.build();
SslFactory sslFactory = new SslFactory(Mode.SERVER);
sslFactory.configure(sslConfig);
SslEngineBuilder sslEngineBuilder = sslFactory.sslEngineBuilder();
assertNotNull("SslEngineBuilder not created", sslEngineBuilder);
SslEngineFactory sslEngineFactory = sslFactory.sslEngineFactory();
assertNotNull("SslEngineFactory not created", sslEngineFactory);
// Verify that SslEngineBuilder is not recreated on reconfigure() if config and
// Verify that SslEngineFactory is not recreated on reconfigure() if config and
// file are not changed
sslFactory.reconfigure(sslConfig);
assertSame("SslEngineBuilder recreated unnecessarily",
sslEngineBuilder, sslFactory.sslEngineBuilder());
assertSame("SslEngineFactory recreated unnecessarily",
sslEngineFactory, sslFactory.sslEngineFactory());
// Verify that the SslEngineBuilder is recreated on reconfigure() if config is changed
// Verify that the SslEngineFactory is recreated on reconfigure() if config is changed
trustStoreFile = File.createTempFile("truststore", ".jks");
sslConfig = sslConfigsBuilder(Mode.SERVER)
.createNewTrustStore(trustStoreFile)
.build();
sslFactory.reconfigure(sslConfig);
assertNotSame("SslEngineBuilder not recreated",
sslEngineBuilder, sslFactory.sslEngineBuilder());
sslEngineBuilder = sslFactory.sslEngineBuilder();
assertNotSame("SslEngineFactory not recreated",
sslEngineFactory, sslFactory.sslEngineFactory());
sslEngineFactory = sslFactory.sslEngineFactory();
// Verify that builder is recreated on reconfigure() if config is not changed, but truststore file was modified
trustStoreFile.setLastModified(System.currentTimeMillis() + 10000);
sslFactory.reconfigure(sslConfig);
assertNotSame("SslEngineBuilder not recreated",
sslEngineBuilder, sslFactory.sslEngineBuilder());
sslEngineBuilder = sslFactory.sslEngineBuilder();
assertNotSame("SslEngineFactory not recreated",
sslEngineFactory, sslFactory.sslEngineFactory());
sslEngineFactory = sslFactory.sslEngineFactory();
// Verify that builder is recreated on reconfigure() if config is not changed, but keystore file was modified
File keyStoreFile = new File((String) sslConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG));
keyStoreFile.setLastModified(System.currentTimeMillis() + 10000);
sslFactory.reconfigure(sslConfig);
assertNotSame("SslEngineBuilder not recreated",
sslEngineBuilder, sslFactory.sslEngineBuilder());
sslEngineBuilder = sslFactory.sslEngineBuilder();
assertNotSame("SslEngineFactory not recreated",
sslEngineFactory, sslFactory.sslEngineFactory());
sslEngineFactory = sslFactory.sslEngineFactory();
// Verify that builder is recreated after validation on reconfigure() if config is not changed, but keystore file was modified
keyStoreFile.setLastModified(System.currentTimeMillis() + 15000);
sslFactory.validateReconfiguration(sslConfig);
sslFactory.reconfigure(sslConfig);
assertNotSame("SslEngineBuilder not recreated",
sslEngineBuilder, sslFactory.sslEngineBuilder());
sslEngineBuilder = sslFactory.sslEngineBuilder();
assertNotSame("SslEngineFactory not recreated",
sslEngineFactory, sslFactory.sslEngineFactory());
sslEngineFactory = sslFactory.sslEngineFactory();
// Verify that the builder is not recreated if modification time cannot be determined
keyStoreFile.setLastModified(System.currentTimeMillis() + 20000);
Files.delete(keyStoreFile.toPath());
sslFactory.reconfigure(sslConfig);
assertSame("SslEngineBuilder recreated unnecessarily",
sslEngineBuilder, sslFactory.sslEngineBuilder());
assertSame("SslEngineFactory recreated unnecessarily",
sslEngineFactory, sslFactory.sslEngineFactory());
}
@Test
@ -228,10 +229,10 @@ public class SslFactoryTest {
sslConfig.remove(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG);
SslFactory sslFactory = new SslFactory(Mode.SERVER);
sslFactory.configure(sslConfig);
SSLContext sslContext = sslFactory.sslEngineBuilder().sslContext();
SSLContext sslContext = ((DefaultSslEngineFactory) sslFactory.sslEngineFactory()).sslContext();
assertNotNull("SSL context not created", sslContext);
assertSame("SSL context recreated unnecessarily", sslContext,
sslFactory.sslEngineBuilder().sslContext());
((DefaultSslEngineFactory) sslFactory.sslEngineFactory()).sslContext());
assertFalse(sslFactory.createSslEngine("localhost", 0).getUseClientMode());
Map<String, Object> sslConfig2 = sslConfigsBuilder(Mode.SERVER)
@ -256,10 +257,10 @@ public class SslFactoryTest {
sslConfig.remove(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG);
SslFactory sslFactory = new SslFactory(Mode.SERVER);
sslFactory.configure(sslConfig);
SSLContext sslContext = sslFactory.sslEngineBuilder().sslContext();
SSLContext sslContext = ((DefaultSslEngineFactory) sslFactory.sslEngineFactory()).sslContext();
assertNotNull("SSL context not created", sslContext);
assertSame("SSL context recreated unnecessarily", sslContext,
sslFactory.sslEngineBuilder().sslContext());
((DefaultSslEngineFactory) sslFactory.sslEngineFactory()).sslContext());
assertFalse(sslFactory.createSslEngine("localhost", 0).getUseClientMode());
File newTrustStoreFile = File.createTempFile("truststore", ".jks");
@ -271,7 +272,7 @@ public class SslFactoryTest {
sslConfig.remove(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG);
sslFactory.reconfigure(sslConfig);
assertNotSame("SSL context not recreated", sslContext,
sslFactory.sslEngineBuilder().sslContext());
((DefaultSslEngineFactory) sslFactory.sslEngineFactory()).sslContext());
sslConfig = sslConfigsBuilder(Mode.SERVER)
.createNewTrustStore(newTrustStoreFile)
@ -292,7 +293,7 @@ public class SslFactoryTest {
.build();
SslFactory sslFactory = new SslFactory(Mode.SERVER);
sslFactory.configure(serverSslConfig);
assertNotNull("SslEngineBuilder not created", sslFactory.sslEngineBuilder());
assertNotNull("SslEngineFactory not created", sslFactory.sslEngineFactory());
}
@Test
@ -355,20 +356,69 @@ public class SslFactoryTest {
.createNewTrustStore(File.createTempFile("truststore", ".jks"))
.cn("Another CN")
.build();
KeyStore ks1 = sslKeyStore(serverSslConfig).load();
KeyStore ks2 = sslKeyStore(serverSslConfig).load();
KeyStore ks1 = sslKeyStore(serverSslConfig).get();
KeyStore ks2 = sslKeyStore(serverSslConfig).get();
assertEquals(SslFactory.CertificateEntries.create(ks1), SslFactory.CertificateEntries.create(ks2));
// Use different alias name, validation should succeed
ks2.setCertificateEntry("another", ks1.getCertificate("localhost"));
assertEquals(SslFactory.CertificateEntries.create(ks1), SslFactory.CertificateEntries.create(ks2));
KeyStore ks3 = sslKeyStore(newCnConfig).load();
KeyStore ks3 = sslKeyStore(newCnConfig).get();
assertNotEquals(SslFactory.CertificateEntries.create(ks1), SslFactory.CertificateEntries.create(ks3));
}
private SslEngineBuilder.SecurityStore sslKeyStore(Map<String, Object> sslConfig) {
return new SslEngineBuilder.SecurityStore(
/**
* Tests client side ssl.engine.factory configuration is used when specified
*/
@Test
public void testClientSpecifiedSslEngineFactoryUsed() throws Exception {
File trustStoreFile = File.createTempFile("truststore", ".jks");
Map<String, Object> clientSslConfig = sslConfigsBuilder(Mode.CLIENT)
.createNewTrustStore(trustStoreFile)
.useClientCert(false)
.build();
clientSslConfig.put(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, TestSslUtils.TestSslEngineFactory.class);
SslFactory sslFactory = new SslFactory(Mode.CLIENT);
sslFactory.configure(clientSslConfig);
assertTrue("SslEngineFactory must be of expected type",
sslFactory.sslEngineFactory() instanceof TestSslUtils.TestSslEngineFactory);
}
/**
* Tests server side ssl.engine.factory configuration is used when specified
*/
@Test
public void testServerSpecifiedSslEngineFactoryUsed() throws Exception {
File trustStoreFile = File.createTempFile("truststore", ".jks");
Map<String, Object> serverSslConfig = sslConfigsBuilder(Mode.SERVER)
.createNewTrustStore(trustStoreFile)
.useClientCert(false)
.build();
serverSslConfig.put(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, TestSslUtils.TestSslEngineFactory.class);
SslFactory sslFactory = new SslFactory(Mode.SERVER);
sslFactory.configure(serverSslConfig);
assertTrue("SslEngineFactory must be of expected type",
sslFactory.sslEngineFactory() instanceof TestSslUtils.TestSslEngineFactory);
}
/**
* Tests invalid ssl.engine.factory configuration
*/
@Test(expected = ClassCastException.class)
public void testInvalidSslEngineFactory() throws Exception {
File trustStoreFile = File.createTempFile("truststore", ".jks");
Map<String, Object> clientSslConfig = sslConfigsBuilder(Mode.CLIENT)
.createNewTrustStore(trustStoreFile)
.useClientCert(false)
.build();
clientSslConfig.put(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, String.class);
SslFactory sslFactory = new SslFactory(Mode.CLIENT);
sslFactory.configure(clientSslConfig);
}
private DefaultSslEngineFactory.SecurityStore sslKeyStore(Map<String, Object> sslConfig) {
return new DefaultSslEngineFactory.SecurityStore(
(String) sslConfig.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG),
(String) sslConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG),
(Password) sslConfig.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG),

View File

@ -27,6 +27,7 @@ import java.io.OutputStream;
import java.math.BigInteger;
import java.net.InetAddress;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManagerFactory;
import java.nio.file.Files;
@ -44,6 +45,8 @@ import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.security.auth.SslEngineFactory;
import org.apache.kafka.common.security.ssl.DefaultSslEngineFactory;
import org.bouncycastle.asn1.DEROctetString;
import org.bouncycastle.asn1.x500.X500Name;
import org.bouncycastle.asn1.x509.AlgorithmIdentifier;
@ -68,6 +71,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
public class TestSslUtils {
@ -360,4 +364,49 @@ public class TestSslUtils {
return sslConfigs;
}
}
public static final class TestSslEngineFactory implements SslEngineFactory {
DefaultSslEngineFactory defaultSslEngineFactory = new DefaultSslEngineFactory();
@Override
public SSLEngine createClientSslEngine(String peerHost, int peerPort, String endpointIdentification) {
return defaultSslEngineFactory.createClientSslEngine(peerHost, peerPort, endpointIdentification);
}
@Override
public SSLEngine createServerSslEngine(String peerHost, int peerPort) {
return defaultSslEngineFactory.createServerSslEngine(peerHost, peerPort);
}
@Override
public boolean shouldBeRebuilt(Map<String, Object> nextConfigs) {
return defaultSslEngineFactory.shouldBeRebuilt(nextConfigs);
}
@Override
public Set<String> reconfigurableConfigs() {
return defaultSslEngineFactory.reconfigurableConfigs();
}
@Override
public KeyStore keystore() {
return defaultSslEngineFactory.keystore();
}
@Override
public KeyStore truststore() {
return defaultSslEngineFactory.truststore();
}
@Override
public void close() throws IOException {
defaultSslEngineFactory.close();
}
@Override
public void configure(Map<String, ?> configs) {
defaultSslEngineFactory.configure(configs);
}
}
}

View File

@ -829,6 +829,7 @@ object DynamicListenerConfig {
KafkaConfig.SslEndpointIdentificationAlgorithmProp,
KafkaConfig.SslSecureRandomImplementationProp,
KafkaConfig.SslClientAuthProp,
KafkaConfig.SslEngineFactoryClassProp,
// SASL configs
KafkaConfig.SaslMechanismInterBrokerProtocolProp,

View File

@ -520,6 +520,7 @@ object KafkaConfig {
val SslSecureRandomImplementationProp = SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG
val SslClientAuthProp = BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG
val SslPrincipalMappingRulesProp = BrokerSecurityConfigs.SSL_PRINCIPAL_MAPPING_RULES_CONFIG
var SslEngineFactoryClassProp = SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG
/** ********* SASL Configuration ****************/
val SaslMechanismInterBrokerProtocolProp = "sasl.mechanism.inter.broker.protocol"
@ -910,6 +911,7 @@ object KafkaConfig {
val SslSecureRandomImplementationDoc = SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_DOC
val SslClientAuthDoc = BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC
val SslPrincipalMappingRulesDoc = BrokerSecurityConfigs.SSL_PRINCIPAL_MAPPING_RULES_DOC
val SslEngineFactoryClassDoc = SslConfigs.SSL_ENGINE_FACTORY_CLASS_DOC
/** ********* Sasl Configuration ****************/
val SaslMechanismInterBrokerProtocolDoc = "SASL mechanism used for inter-broker communication. Default is GSSAPI."
@ -1173,6 +1175,7 @@ object KafkaConfig {
.define(SslClientAuthProp, STRING, Defaults.SslClientAuthentication, in(Defaults.SslClientAuthenticationValidValues:_*), MEDIUM, SslClientAuthDoc)
.define(SslCipherSuitesProp, LIST, Collections.emptyList(), MEDIUM, SslCipherSuitesDoc)
.define(SslPrincipalMappingRulesProp, STRING, Defaults.SslPrincipalMappingRules, LOW, SslPrincipalMappingRulesDoc)
.define(SslEngineFactoryClassProp, CLASS, null, LOW, SslEngineFactoryClassDoc)
/** ********* Sasl Configuration ****************/
.define(SaslMechanismInterBrokerProtocolProp, STRING, Defaults.SaslMechanismInterBrokerProtocol, MEDIUM, SaslMechanismInterBrokerProtocolDoc)

View File

@ -32,23 +32,24 @@ import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
import static org.apache.kafka.common.config.SaslConfigs.SASL_KERBEROS_SERVICE_NAME;
import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM;
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_TYPE_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG;
import static org.apache.kafka.common.config.SaslConfigs.SASL_KERBEROS_SERVICE_NAME;
import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_TYPE_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG;
/**
* A log4j appender that produces log messages to Kafka
@ -70,6 +71,7 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
private String clientJaasConf;
private String kerb5ConfPath;
private Integer maxBlockMs;
private String sslEngineFactoryClass;
private int retries = Integer.MAX_VALUE;
private int requiredNumAcks = 1;
@ -242,6 +244,14 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
this.maxBlockMs = maxBlockMs;
}
public String getSslEngineFactoryClass() {
return sslEngineFactoryClass;
}
public void setSslEngineFactoryClass(String sslEngineFactoryClass) {
this.sslEngineFactoryClass = sslEngineFactoryClass;
}
@Override
public void activateOptions() {
// check for config parameter validity
@ -262,18 +272,25 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
if (securityProtocol != null) {
props.put(SECURITY_PROTOCOL_CONFIG, securityProtocol);
}
if (securityProtocol != null && securityProtocol.contains("SSL") && sslTruststoreLocation != null &&
sslTruststorePassword != null) {
if (securityProtocol != null && (securityProtocol.contains("SSL") || securityProtocol.contains("SASL"))) {
if (sslEngineFactoryClass != null) {
props.put(SSL_ENGINE_FACTORY_CLASS_CONFIG, sslEngineFactoryClass);
}
}
if (securityProtocol != null && securityProtocol.contains("SSL") && sslTruststoreLocation != null && sslTruststorePassword != null) {
props.put(SSL_TRUSTSTORE_LOCATION_CONFIG, sslTruststoreLocation);
props.put(SSL_TRUSTSTORE_PASSWORD_CONFIG, sslTruststorePassword);
if (sslKeystoreType != null && sslKeystoreLocation != null &&
sslKeystorePassword != null) {
sslKeystorePassword != null) {
props.put(SSL_KEYSTORE_TYPE_CONFIG, sslKeystoreType);
props.put(SSL_KEYSTORE_LOCATION_CONFIG, sslKeystoreLocation);
props.put(SSL_KEYSTORE_PASSWORD_CONFIG, sslKeystorePassword);
}
}
if (securityProtocol != null && securityProtocol.contains("SASL") && saslKerberosServiceName != null && clientJaasConfPath != null) {
props.put(SASL_KERBEROS_SERVICE_NAME, saslKerberosServiceName);
System.setProperty("java.security.auth.login.config", clientJaasConfPath);