mirror of https://github.com/apache/kafka.git
KAFKA-2460; Fix capitalisation in SSL classes
Author: Ismael Juma <ismael@juma.me.uk> Reviewers: Sriharsha Chintalapani <harsha@hortonworks.com> Closes #355 from ijuma/kafka-2460-fix-capitalisation-in-ssl-classes
This commit is contained in:
parent
6f2f1f9843
commit
16f194b20a
|
@ -17,7 +17,7 @@ import org.apache.kafka.common.config.AbstractConfig;
|
|||
import org.apache.kafka.common.config.ConfigDef;
|
||||
import org.apache.kafka.common.config.ConfigDef.Importance;
|
||||
import org.apache.kafka.common.config.ConfigDef.Type;
|
||||
import org.apache.kafka.common.config.SSLConfigs;
|
||||
import org.apache.kafka.common.config.SslConfigs;
|
||||
import org.apache.kafka.common.config.SaslConfigs;
|
||||
import org.apache.kafka.common.serialization.Deserializer;
|
||||
|
||||
|
@ -287,21 +287,21 @@ public class ConsumerConfig extends AbstractConfig {
|
|||
Importance.HIGH,
|
||||
VALUE_DESERIALIZER_CLASS_DOC)
|
||||
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
|
||||
.define(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SSLConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
|
||||
.define(SSLConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SSLConfigs.SSL_PROTOCOL_DOC)
|
||||
.define(SSLConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SSLConfigs.SSL_PROVIDER_DOC, false)
|
||||
.define(SSLConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SSLConfigs.SSL_CIPHER_SUITES_DOC, false)
|
||||
.define(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SSLConfigs.DEFAULT_ENABLED_PROTOCOLS, Importance.MEDIUM, SSLConfigs.SSL_ENABLED_PROTOCOLS_DOC)
|
||||
.define(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SSLConfigs.SSL_KEYSTORE_TYPE_DOC)
|
||||
.define(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
|
||||
.define(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
|
||||
.define(SSLConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEY_PASSWORD_DOC, false)
|
||||
.define(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SSLConfigs.SSL_TRUSTSTORE_TYPE_DOC)
|
||||
.define(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
|
||||
.define(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
|
||||
.define(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
|
||||
.define(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
|
||||
.define(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
|
||||
.define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
|
||||
.define(SslConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC)
|
||||
.define(SslConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC, false)
|
||||
.define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC, false)
|
||||
.define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)
|
||||
.define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC)
|
||||
.define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
|
||||
.define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
|
||||
.define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC, false)
|
||||
.define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC)
|
||||
.define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
|
||||
.define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
|
||||
.define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
|
||||
.define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
|
||||
.define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
|
||||
.define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, Type.STRING, Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
|
||||
.define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
|
||||
.define(SaslConfigs.SASL_KAFKA_SERVER_REALM, Type.STRING, Importance.LOW, SaslConfigs.SASL_KAFKA_SERVER_DOC, false)
|
||||
|
|
|
@ -22,7 +22,7 @@ import java.util.Properties;
|
|||
|
||||
import org.apache.kafka.clients.CommonClientConfigs;
|
||||
import org.apache.kafka.common.config.AbstractConfig;
|
||||
import org.apache.kafka.common.config.SSLConfigs;
|
||||
import org.apache.kafka.common.config.SslConfigs;
|
||||
import org.apache.kafka.common.config.SaslConfigs;
|
||||
import org.apache.kafka.common.config.ConfigDef;
|
||||
import org.apache.kafka.common.config.ConfigDef.Importance;
|
||||
|
@ -265,21 +265,21 @@ public class ProducerConfig extends AbstractConfig {
|
|||
.define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
|
||||
.define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC)
|
||||
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
|
||||
.define(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SSLConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
|
||||
.define(SSLConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SSLConfigs.SSL_PROTOCOL_DOC)
|
||||
.define(SSLConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SSLConfigs.SSL_PROVIDER_DOC, false)
|
||||
.define(SSLConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SSLConfigs.SSL_CIPHER_SUITES_DOC, false)
|
||||
.define(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SSLConfigs.DEFAULT_ENABLED_PROTOCOLS, Importance.MEDIUM, SSLConfigs.SSL_ENABLED_PROTOCOLS_DOC)
|
||||
.define(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SSLConfigs.SSL_KEYSTORE_TYPE_DOC)
|
||||
.define(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
|
||||
.define(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
|
||||
.define(SSLConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEY_PASSWORD_DOC, false)
|
||||
.define(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SSLConfigs.SSL_TRUSTSTORE_TYPE_DOC)
|
||||
.define(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
|
||||
.define(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
|
||||
.define(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
|
||||
.define(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
|
||||
.define(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
|
||||
.define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
|
||||
.define(SslConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC)
|
||||
.define(SslConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC, false)
|
||||
.define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC, false)
|
||||
.define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)
|
||||
.define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC)
|
||||
.define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
|
||||
.define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
|
||||
.define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC, false)
|
||||
.define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC)
|
||||
.define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
|
||||
.define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
|
||||
.define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
|
||||
.define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
|
||||
.define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
|
||||
.define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, Type.STRING, Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
|
||||
.define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
|
||||
.define(SaslConfigs.SASL_KAFKA_SERVER_REALM, Type.STRING, Importance.LOW, SaslConfigs.SASL_KAFKA_SERVER_DOC, false)
|
||||
|
|
|
@ -16,7 +16,7 @@ package org.apache.kafka.common.config;
|
|||
import javax.net.ssl.KeyManagerFactory;
|
||||
import javax.net.ssl.TrustManagerFactory;
|
||||
|
||||
public class SSLConfigs {
|
||||
public class SslConfigs {
|
||||
/*
|
||||
* NOTE: DO NOT CHANGE EITHER CONFIG NAMES AS THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
|
||||
*/
|
||||
|
@ -43,7 +43,7 @@ public class SSLConfigs {
|
|||
public static final String SSL_ENABLED_PROTOCOLS_CONFIG = "ssl.enabled.protocols";
|
||||
public static final String SSL_ENABLED_PROTOCOLS_DOC = "The list of protocols enabled for SSL connections. "
|
||||
+ "TLSv1.2, TLSv1.1 and TLSv1 are enabled by default.";
|
||||
public static final String DEFAULT_ENABLED_PROTOCOLS = "TLSv1.2,TLSv1.1,TLSv1";
|
||||
public static final String DEFAULT_SSL_ENABLED_PROTOCOLS = "TLSv1.2,TLSv1.1,TLSv1";
|
||||
|
||||
public static final String SSL_KEYSTORE_TYPE_CONFIG = "ssl.keystore.type";
|
||||
public static final String SSL_KEYSTORE_TYPE_DOC = "The file format of the key store file. "
|
|
@ -36,7 +36,7 @@ public class ChannelBuilders {
|
|||
switch (securityProtocol) {
|
||||
case SSL:
|
||||
requireNonNullMode(mode, securityProtocol);
|
||||
channelBuilder = new SSLChannelBuilder(mode);
|
||||
channelBuilder = new SslChannelBuilder(mode);
|
||||
break;
|
||||
case SASL_SSL:
|
||||
case SASL_PLAINTEXT:
|
||||
|
|
|
@ -15,15 +15,14 @@ package org.apache.kafka.common.network;
|
|||
import java.nio.channels.SelectionKey;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.common.config.SslConfigs;
|
||||
import org.apache.kafka.common.security.auth.PrincipalBuilder;
|
||||
import org.apache.kafka.common.config.SSLConfigs;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.common.KafkaException;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
||||
public class PlaintextChannelBuilder implements ChannelBuilder {
|
||||
private static final Logger log = LoggerFactory.getLogger(PlaintextChannelBuilder.class);
|
||||
private PrincipalBuilder principalBuilder;
|
||||
|
@ -32,7 +31,7 @@ public class PlaintextChannelBuilder implements ChannelBuilder {
|
|||
public void configure(Map<String, ?> configs) throws KafkaException {
|
||||
try {
|
||||
this.configs = configs;
|
||||
this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class<?>) configs.get(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG));
|
||||
this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class<?>) configs.get(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG));
|
||||
this.principalBuilder.configure(this.configs);
|
||||
} catch (Exception e) {
|
||||
throw new KafkaException(e);
|
||||
|
|
|
@ -25,9 +25,9 @@ import org.apache.kafka.common.security.kerberos.KerberosNameParser;
|
|||
import org.apache.kafka.common.security.kerberos.LoginManager;
|
||||
import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
|
||||
import org.apache.kafka.common.security.authenticator.SaslServerAuthenticator;
|
||||
import org.apache.kafka.common.security.ssl.SSLFactory;
|
||||
import org.apache.kafka.common.security.ssl.SslFactory;
|
||||
import org.apache.kafka.common.protocol.SecurityProtocol;
|
||||
import org.apache.kafka.common.config.SSLConfigs;
|
||||
import org.apache.kafka.common.config.SslConfigs;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.common.KafkaException;
|
||||
|
||||
|
@ -43,7 +43,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
|
|||
|
||||
private LoginManager loginManager;
|
||||
private PrincipalBuilder principalBuilder;
|
||||
private SSLFactory sslFactory;
|
||||
private SslFactory sslFactory;
|
||||
private Map<String, ?> configs;
|
||||
private KerberosNameParser kerberosNameParser;
|
||||
|
||||
|
@ -57,7 +57,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
|
|||
try {
|
||||
this.configs = configs;
|
||||
this.loginManager = LoginManager.acquireLoginManager(loginType, configs);
|
||||
this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class<?>) configs.get(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG));
|
||||
this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class<?>) configs.get(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG));
|
||||
this.principalBuilder.configure(configs);
|
||||
|
||||
String defaultRealm;
|
||||
|
@ -69,7 +69,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
|
|||
kerberosNameParser = new KerberosNameParser(defaultRealm, (List<String>) configs.get(SaslConfigs.AUTH_TO_LOCAL));
|
||||
|
||||
if (this.securityProtocol == SecurityProtocol.SASL_SSL) {
|
||||
this.sslFactory = new SSLFactory(mode);
|
||||
this.sslFactory = new SslFactory(mode);
|
||||
this.sslFactory.configure(this.configs);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
@ -102,8 +102,8 @@ public class SaslChannelBuilder implements ChannelBuilder {
|
|||
|
||||
protected TransportLayer buildTransportLayer(String id, SelectionKey key, SocketChannel socketChannel) throws IOException {
|
||||
if (this.securityProtocol == SecurityProtocol.SASL_SSL) {
|
||||
return SSLTransportLayer.create(id, key,
|
||||
sslFactory.createSSLEngine(socketChannel.socket().getInetAddress().getHostName(),
|
||||
return SslTransportLayer.create(id, key,
|
||||
sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(),
|
||||
socketChannel.socket().getPort()));
|
||||
} else {
|
||||
return new PlaintextTransportLayer(key);
|
||||
|
|
|
@ -18,30 +18,30 @@ import java.nio.channels.SocketChannel;
|
|||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.common.security.auth.PrincipalBuilder;
|
||||
import org.apache.kafka.common.security.ssl.SSLFactory;
|
||||
import org.apache.kafka.common.config.SSLConfigs;
|
||||
import org.apache.kafka.common.security.ssl.SslFactory;
|
||||
import org.apache.kafka.common.config.SslConfigs;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.common.KafkaException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class SSLChannelBuilder implements ChannelBuilder {
|
||||
private static final Logger log = LoggerFactory.getLogger(SSLChannelBuilder.class);
|
||||
private SSLFactory sslFactory;
|
||||
public class SslChannelBuilder implements ChannelBuilder {
|
||||
private static final Logger log = LoggerFactory.getLogger(SslChannelBuilder.class);
|
||||
private SslFactory sslFactory;
|
||||
private PrincipalBuilder principalBuilder;
|
||||
private Mode mode;
|
||||
private Map<String, ?> configs;
|
||||
|
||||
public SSLChannelBuilder(Mode mode) {
|
||||
public SslChannelBuilder(Mode mode) {
|
||||
this.mode = mode;
|
||||
}
|
||||
|
||||
public void configure(Map<String, ?> configs) throws KafkaException {
|
||||
try {
|
||||
this.configs = configs;
|
||||
this.sslFactory = new SSLFactory(mode);
|
||||
this.sslFactory = new SslFactory(mode);
|
||||
this.sslFactory.configure(this.configs);
|
||||
this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class<?>) configs.get(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG));
|
||||
this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class<?>) configs.get(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG));
|
||||
this.principalBuilder.configure(this.configs);
|
||||
} catch (Exception e) {
|
||||
throw new KafkaException(e);
|
||||
|
@ -51,7 +51,7 @@ public class SSLChannelBuilder implements ChannelBuilder {
|
|||
public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
|
||||
KafkaChannel channel = null;
|
||||
try {
|
||||
SSLTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key);
|
||||
SslTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key);
|
||||
Authenticator authenticator = new DefaultAuthenticator();
|
||||
authenticator.configure(transportLayer, this.principalBuilder, this.configs);
|
||||
channel = new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
|
||||
|
@ -66,10 +66,10 @@ public class SSLChannelBuilder implements ChannelBuilder {
|
|||
this.principalBuilder.close();
|
||||
}
|
||||
|
||||
protected SSLTransportLayer buildTransportLayer(SSLFactory sslFactory, String id, SelectionKey key) throws IOException {
|
||||
protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key) throws IOException {
|
||||
SocketChannel socketChannel = (SocketChannel) key.channel();
|
||||
return SSLTransportLayer.create(id, key,
|
||||
sslFactory.createSSLEngine(socketChannel.socket().getInetAddress().getHostName(),
|
||||
return SslTransportLayer.create(id, key,
|
||||
sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(),
|
||||
socketChannel.socket().getPort()));
|
||||
}
|
||||
}
|
|
@ -43,9 +43,8 @@ import org.slf4j.LoggerFactory;
|
|||
/*
|
||||
* Transport layer for SSL communication
|
||||
*/
|
||||
|
||||
public class SSLTransportLayer implements TransportLayer {
|
||||
private static final Logger log = LoggerFactory.getLogger(SSLTransportLayer.class);
|
||||
public class SslTransportLayer implements TransportLayer {
|
||||
private static final Logger log = LoggerFactory.getLogger(SslTransportLayer.class);
|
||||
private final String channelId;
|
||||
private final SSLEngine sslEngine;
|
||||
private final SelectionKey key;
|
||||
|
@ -61,15 +60,15 @@ public class SSLTransportLayer implements TransportLayer {
|
|||
private ByteBuffer appReadBuffer;
|
||||
private ByteBuffer emptyBuf = ByteBuffer.allocate(0);
|
||||
|
||||
public static SSLTransportLayer create(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException {
|
||||
public static SslTransportLayer create(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException {
|
||||
// Disable renegotiation by default until we have fixed the known issues with the existing implementation
|
||||
SSLTransportLayer transportLayer = new SSLTransportLayer(channelId, key, sslEngine, false);
|
||||
SslTransportLayer transportLayer = new SslTransportLayer(channelId, key, sslEngine, false);
|
||||
transportLayer.startHandshake();
|
||||
return transportLayer;
|
||||
}
|
||||
|
||||
// Prefer `create`, only use this in tests
|
||||
SSLTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine, boolean enableRenegotiation) throws IOException {
|
||||
SslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine, boolean enableRenegotiation) throws IOException {
|
||||
this.channelId = channelId;
|
||||
this.key = key;
|
||||
this.socketChannel = (SocketChannel) key.channel();
|
|
@ -74,7 +74,8 @@ public interface TransportLayer extends ScatteringByteChannel, GatheringByteChan
|
|||
boolean hasPendingWrites();
|
||||
|
||||
/**
|
||||
* Returns `SSLSession.getPeerPrincipal` if SSLTransportLayer is used and `KakfaPrincipal.ANONYMOUS` otherwise.
|
||||
* Returns `SSLSession.getPeerPrincipal()` if this is a SslTransportLayer and there is an authenticated peer,
|
||||
* `KafkaPrincipal.ANONYMOUS` is returned otherwise.
|
||||
*/
|
||||
Principal peerPrincipal() throws IOException;
|
||||
|
||||
|
|
|
@ -18,7 +18,7 @@ package org.apache.kafka.common.security.ssl;
|
|||
|
||||
import org.apache.kafka.common.Configurable;
|
||||
import org.apache.kafka.common.KafkaException;
|
||||
import org.apache.kafka.common.config.SSLConfigs;
|
||||
import org.apache.kafka.common.config.SslConfigs;
|
||||
import org.apache.kafka.common.network.Mode;
|
||||
|
||||
import javax.net.ssl.*;
|
||||
|
@ -30,7 +30,7 @@ import java.security.KeyStore;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class SSLFactory implements Configurable {
|
||||
public class SslFactory implements Configurable {
|
||||
|
||||
private String protocol;
|
||||
private String provider;
|
||||
|
@ -47,29 +47,29 @@ public class SSLFactory implements Configurable {
|
|||
private boolean wantClientAuth;
|
||||
private final Mode mode;
|
||||
|
||||
public SSLFactory(Mode mode) {
|
||||
public SslFactory(Mode mode) {
|
||||
this.mode = mode;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(Map<String, ?> configs) throws KafkaException {
|
||||
this.protocol = (String) configs.get(SSLConfigs.SSL_PROTOCOL_CONFIG);
|
||||
this.provider = (String) configs.get(SSLConfigs.SSL_PROVIDER_CONFIG);
|
||||
this.protocol = (String) configs.get(SslConfigs.SSL_PROTOCOL_CONFIG);
|
||||
this.provider = (String) configs.get(SslConfigs.SSL_PROVIDER_CONFIG);
|
||||
|
||||
|
||||
List<String> cipherSuitesList = (List<String>) configs.get(SSLConfigs.SSL_CIPHER_SUITES_CONFIG);
|
||||
List<String> cipherSuitesList = (List<String>) configs.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG);
|
||||
if (cipherSuitesList != null)
|
||||
this.cipherSuites = cipherSuitesList.toArray(new String[cipherSuitesList.size()]);
|
||||
|
||||
List<String> enabledProtocolsList = (List<String>) configs.get(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG);
|
||||
List<String> enabledProtocolsList = (List<String>) configs.get(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG);
|
||||
if (enabledProtocolsList != null)
|
||||
this.enabledProtocols = enabledProtocolsList.toArray(new String[enabledProtocolsList.size()]);
|
||||
|
||||
String endpointIdentification = (String) configs.get(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
|
||||
String endpointIdentification = (String) configs.get(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
|
||||
if (endpointIdentification != null)
|
||||
this.endpointIdentification = endpointIdentification;
|
||||
|
||||
String clientAuthConfig = (String) configs.get(SSLConfigs.SSL_CLIENT_AUTH_CONFIG);
|
||||
String clientAuthConfig = (String) configs.get(SslConfigs.SSL_CLIENT_AUTH_CONFIG);
|
||||
if (clientAuthConfig != null) {
|
||||
if (clientAuthConfig.equals("required"))
|
||||
this.needClientAuth = true;
|
||||
|
@ -77,17 +77,17 @@ public class SSLFactory implements Configurable {
|
|||
this.wantClientAuth = true;
|
||||
}
|
||||
|
||||
this.kmfAlgorithm = (String) configs.get(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG);
|
||||
this.tmfAlgorithm = (String) configs.get(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG);
|
||||
this.kmfAlgorithm = (String) configs.get(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG);
|
||||
this.tmfAlgorithm = (String) configs.get(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG);
|
||||
|
||||
createKeystore((String) configs.get(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG),
|
||||
(String) configs.get(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG),
|
||||
(String) configs.get(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG),
|
||||
(String) configs.get(SSLConfigs.SSL_KEY_PASSWORD_CONFIG));
|
||||
createKeystore((String) configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG),
|
||||
(String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG),
|
||||
(String) configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG),
|
||||
(String) configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG));
|
||||
|
||||
createTruststore((String) configs.get(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG),
|
||||
(String) configs.get(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG),
|
||||
(String) configs.get(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
|
||||
createTruststore((String) configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG),
|
||||
(String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG),
|
||||
(String) configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
|
||||
try {
|
||||
this.sslContext = createSSLContext();
|
||||
} catch (Exception e) {
|
||||
|
@ -122,7 +122,7 @@ public class SSLFactory implements Configurable {
|
|||
return sslContext;
|
||||
}
|
||||
|
||||
public SSLEngine createSSLEngine(String peerHost, int peerPort) {
|
||||
public SSLEngine createSslEngine(String peerHost, int peerPort) {
|
||||
SSLEngine sslEngine = sslContext.createSSLEngine(peerHost, peerPort);
|
||||
if (cipherSuites != null) sslEngine.setEnabledCipherSuites(cipherSuites);
|
||||
if (enabledProtocols != null) sslEngine.setEnabledProtocols(enabledProtocols);
|
|
@ -18,7 +18,7 @@ package org.apache.kafka.clients.producer;
|
|||
|
||||
import org.apache.kafka.common.KafkaException;
|
||||
import org.apache.kafka.common.serialization.ByteArraySerializer;
|
||||
import org.apache.kafka.common.config.SSLConfigs;
|
||||
import org.apache.kafka.common.config.SslConfigs;
|
||||
import org.apache.kafka.clients.CommonClientConfigs;
|
||||
import org.apache.kafka.test.MockMetricsReporter;
|
||||
import org.apache.kafka.test.MockSerializer;
|
||||
|
@ -59,7 +59,7 @@ public class KafkaProducerTest {
|
|||
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
|
||||
configs.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
|
||||
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL);
|
||||
configs.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
|
||||
configs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
|
||||
final int oldInitCount = MockSerializer.INIT_COUNT.get();
|
||||
final int oldCloseCount = MockSerializer.CLOSE_COUNT.get();
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
package org.apache.kafka.common.network;
|
||||
|
||||
import org.apache.kafka.common.protocol.SecurityProtocol;
|
||||
import org.apache.kafka.common.security.ssl.SSLFactory;
|
||||
import org.apache.kafka.common.security.ssl.SslFactory;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
import javax.net.ssl.SSLSocket;
|
||||
|
@ -38,14 +38,14 @@ class EchoServer extends Thread {
|
|||
private final List<Thread> threads;
|
||||
private final List<Socket> sockets;
|
||||
private SecurityProtocol protocol = SecurityProtocol.PLAINTEXT;
|
||||
private SSLFactory sslFactory;
|
||||
private SslFactory sslFactory;
|
||||
private final AtomicBoolean renegotiate = new AtomicBoolean();
|
||||
|
||||
public EchoServer(Map<String, ?> configs) throws Exception {
|
||||
this.protocol = configs.containsKey("security.protocol") ?
|
||||
SecurityProtocol.valueOf((String) configs.get("security.protocol")) : SecurityProtocol.PLAINTEXT;
|
||||
if (protocol == SecurityProtocol.SSL) {
|
||||
this.sslFactory = new SSLFactory(Mode.SERVER);
|
||||
this.sslFactory = new SslFactory(Mode.SERVER);
|
||||
this.sslFactory.configure(configs);
|
||||
SSLContext sslContext = this.sslFactory.sslContext();
|
||||
this.serverSocket = sslContext.getServerSocketFactory().createServerSocket(0);
|
||||
|
|
|
@ -24,7 +24,7 @@ import java.nio.ByteBuffer;
|
|||
import java.util.*;
|
||||
|
||||
import org.apache.kafka.common.metrics.Metrics;
|
||||
import org.apache.kafka.common.config.SSLConfigs;
|
||||
import org.apache.kafka.common.config.SslConfigs;
|
||||
import org.apache.kafka.common.utils.MockTime;
|
||||
import org.apache.kafka.common.utils.Time;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
|
@ -49,7 +49,7 @@ public class SelectorTest {
|
|||
@Before
|
||||
public void setup() throws Exception {
|
||||
Map<String, Object> configs = new HashMap<String, Object>();
|
||||
configs.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
|
||||
configs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
|
||||
this.server = new EchoServer(configs);
|
||||
this.server.start();
|
||||
this.time = new MockTime();
|
||||
|
|
|
@ -26,10 +26,10 @@ import java.io.IOException;
|
|||
import java.net.InetSocketAddress;
|
||||
|
||||
import org.apache.kafka.common.metrics.Metrics;
|
||||
import org.apache.kafka.common.config.SSLConfigs;
|
||||
import org.apache.kafka.common.security.ssl.SSLFactory;
|
||||
import org.apache.kafka.common.config.SslConfigs;
|
||||
import org.apache.kafka.common.security.ssl.SslFactory;
|
||||
import org.apache.kafka.common.utils.MockTime;
|
||||
import org.apache.kafka.test.TestSSLUtils;
|
||||
import org.apache.kafka.test.TestSslUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -37,7 +37,7 @@ import org.junit.Test;
|
|||
/**
|
||||
* A set of tests for the selector. These use a test harness that runs a simple socket server that echos back responses.
|
||||
*/
|
||||
public class SSLSelectorTest extends SelectorTest {
|
||||
public class SslSelectorTest extends SelectorTest {
|
||||
|
||||
private Metrics metrics;
|
||||
private Map<String, Object> sslClientConfigs;
|
||||
|
@ -46,15 +46,15 @@ public class SSLSelectorTest extends SelectorTest {
|
|||
public void setup() throws Exception {
|
||||
File trustStoreFile = File.createTempFile("truststore", ".jks");
|
||||
|
||||
Map<String, Object> sslServerConfigs = TestSSLUtils.createSSLConfig(false, true, Mode.SERVER, trustStoreFile, "server");
|
||||
sslServerConfigs.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
|
||||
Map<String, Object> sslServerConfigs = TestSslUtils.createSslConfig(false, true, Mode.SERVER, trustStoreFile, "server");
|
||||
sslServerConfigs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
|
||||
this.server = new EchoServer(sslServerConfigs);
|
||||
this.server.start();
|
||||
this.time = new MockTime();
|
||||
sslClientConfigs = TestSSLUtils.createSSLConfig(false, false, Mode.SERVER, trustStoreFile, "client");
|
||||
sslClientConfigs.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
|
||||
sslClientConfigs = TestSslUtils.createSslConfig(false, false, Mode.SERVER, trustStoreFile, "client");
|
||||
sslClientConfigs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
|
||||
|
||||
this.channelBuilder = new SSLChannelBuilder(Mode.CLIENT);
|
||||
this.channelBuilder = new SslChannelBuilder(Mode.CLIENT);
|
||||
this.channelBuilder.configure(sslClientConfigs);
|
||||
this.metrics = new Metrics();
|
||||
this.selector = new Selector(5000, metrics, time, "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);
|
||||
|
@ -73,12 +73,12 @@ public class SSLSelectorTest extends SelectorTest {
|
|||
*/
|
||||
@Test
|
||||
public void testRenegotiation() throws Exception {
|
||||
ChannelBuilder channelBuilder = new SSLChannelBuilder(Mode.CLIENT) {
|
||||
ChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT) {
|
||||
@Override
|
||||
protected SSLTransportLayer buildTransportLayer(SSLFactory sslFactory, String id, SelectionKey key) throws IOException {
|
||||
protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key) throws IOException {
|
||||
SocketChannel socketChannel = (SocketChannel) key.channel();
|
||||
SSLTransportLayer transportLayer = new SSLTransportLayer(id, key,
|
||||
sslFactory.createSSLEngine(socketChannel.socket().getInetAddress().getHostName(), socketChannel.socket().getPort()),
|
||||
SslTransportLayer transportLayer = new SslTransportLayer(id, key,
|
||||
sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(), socketChannel.socket().getPort()),
|
||||
true);
|
||||
transportLayer.startHandshake();
|
||||
return transportLayer;
|
|
@ -38,12 +38,12 @@ import javax.net.ssl.SSLContext;
|
|||
import javax.net.ssl.SSLEngine;
|
||||
|
||||
import org.apache.kafka.common.KafkaException;
|
||||
import org.apache.kafka.common.config.SSLConfigs;
|
||||
import org.apache.kafka.common.security.ssl.SSLFactory;
|
||||
import org.apache.kafka.common.config.SslConfigs;
|
||||
import org.apache.kafka.common.security.ssl.SslFactory;
|
||||
import org.apache.kafka.common.metrics.Metrics;
|
||||
import org.apache.kafka.common.utils.MockTime;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.test.TestSSLUtils;
|
||||
import org.apache.kafka.test.TestSslUtils;
|
||||
import org.apache.kafka.test.TestUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
@ -52,12 +52,11 @@ import org.junit.Test;
|
|||
/**
|
||||
* Tests for the SSL transport layer. These use a test harness that runs a simple socket server that echos back responses.
|
||||
*/
|
||||
|
||||
public class SSLTransportLayerTest {
|
||||
public class SslTransportLayerTest {
|
||||
|
||||
private static final int BUFFER_SIZE = 4 * 1024;
|
||||
|
||||
private SSLEchoServer server;
|
||||
private SslEchoServer server;
|
||||
private Selector selector;
|
||||
private ChannelBuilder channelBuilder;
|
||||
private CertStores serverCertStores;
|
||||
|
@ -73,7 +72,7 @@ public class SSLTransportLayerTest {
|
|||
sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores);
|
||||
sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores);
|
||||
|
||||
this.channelBuilder = new SSLChannelBuilder(Mode.CLIENT);
|
||||
this.channelBuilder = new SslChannelBuilder(Mode.CLIENT);
|
||||
this.channelBuilder.configure(sslClientConfigs);
|
||||
this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);
|
||||
}
|
||||
|
@ -94,7 +93,7 @@ public class SSLTransportLayerTest {
|
|||
public void testValidEndpointIdentification() throws Exception {
|
||||
String node = "0";
|
||||
createEchoServer(sslServerConfigs);
|
||||
sslClientConfigs.put(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS");
|
||||
sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS");
|
||||
createSelector(sslClientConfigs);
|
||||
InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
|
||||
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
|
||||
|
@ -111,9 +110,9 @@ public class SSLTransportLayerTest {
|
|||
public void testInvalidEndpointIdentification() throws Exception {
|
||||
String node = "0";
|
||||
String serverHost = InetAddress.getLocalHost().getHostAddress();
|
||||
server = new SSLEchoServer(sslServerConfigs, serverHost);
|
||||
server = new SslEchoServer(sslServerConfigs, serverHost);
|
||||
server.start();
|
||||
sslClientConfigs.put(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS");
|
||||
sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS");
|
||||
createSelector(sslClientConfigs);
|
||||
InetSocketAddress addr = new InetSocketAddress(serverHost, server.port);
|
||||
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
|
||||
|
@ -129,9 +128,9 @@ public class SSLTransportLayerTest {
|
|||
public void testEndpointIdentificationDisabled() throws Exception {
|
||||
String node = "0";
|
||||
String serverHost = InetAddress.getLocalHost().getHostAddress();
|
||||
server = new SSLEchoServer(sslServerConfigs, serverHost);
|
||||
server = new SslEchoServer(sslServerConfigs, serverHost);
|
||||
server.start();
|
||||
sslClientConfigs.remove(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
|
||||
sslClientConfigs.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
|
||||
createSelector(sslClientConfigs);
|
||||
InetSocketAddress addr = new InetSocketAddress(serverHost, server.port);
|
||||
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
|
||||
|
@ -146,7 +145,7 @@ public class SSLTransportLayerTest {
|
|||
@Test
|
||||
public void testClientAuthenticationRequiredValidProvided() throws Exception {
|
||||
String node = "0";
|
||||
sslServerConfigs.put(SSLConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
|
||||
sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
|
||||
createEchoServer(sslServerConfigs);
|
||||
createSelector(sslClientConfigs);
|
||||
InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
|
||||
|
@ -163,7 +162,7 @@ public class SSLTransportLayerTest {
|
|||
public void testClientAuthenticationRequiredUntrustedProvided() throws Exception {
|
||||
String node = "0";
|
||||
sslServerConfigs = serverCertStores.getUntrustingConfig();
|
||||
sslServerConfigs.put(SSLConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
|
||||
sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
|
||||
createEchoServer(sslServerConfigs);
|
||||
createSelector(sslClientConfigs);
|
||||
InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
|
||||
|
@ -179,12 +178,12 @@ public class SSLTransportLayerTest {
|
|||
@Test
|
||||
public void testClientAuthenticationRequiredNotProvided() throws Exception {
|
||||
String node = "0";
|
||||
sslServerConfigs.put(SSLConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
|
||||
sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required");
|
||||
createEchoServer(sslServerConfigs);
|
||||
|
||||
sslClientConfigs.remove(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
|
||||
sslClientConfigs.remove(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
|
||||
sslClientConfigs.remove(SSLConfigs.SSL_KEY_PASSWORD_CONFIG);
|
||||
sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
|
||||
sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
|
||||
sslClientConfigs.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG);
|
||||
createSelector(sslClientConfigs);
|
||||
InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
|
||||
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
|
||||
|
@ -200,7 +199,7 @@ public class SSLTransportLayerTest {
|
|||
public void testClientAuthenticationDisabledUntrustedProvided() throws Exception {
|
||||
String node = "0";
|
||||
sslServerConfigs = serverCertStores.getUntrustingConfig();
|
||||
sslServerConfigs.put(SSLConfigs.SSL_CLIENT_AUTH_CONFIG, "none");
|
||||
sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "none");
|
||||
createEchoServer(sslServerConfigs);
|
||||
createSelector(sslClientConfigs);
|
||||
InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
|
||||
|
@ -216,12 +215,12 @@ public class SSLTransportLayerTest {
|
|||
@Test
|
||||
public void testClientAuthenticationDisabledNotProvided() throws Exception {
|
||||
String node = "0";
|
||||
sslServerConfigs.put(SSLConfigs.SSL_CLIENT_AUTH_CONFIG, "none");
|
||||
sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "none");
|
||||
createEchoServer(sslServerConfigs);
|
||||
|
||||
sslClientConfigs.remove(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
|
||||
sslClientConfigs.remove(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
|
||||
sslClientConfigs.remove(SSLConfigs.SSL_KEY_PASSWORD_CONFIG);
|
||||
sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
|
||||
sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
|
||||
sslClientConfigs.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG);
|
||||
createSelector(sslClientConfigs);
|
||||
InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
|
||||
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
|
||||
|
@ -236,7 +235,7 @@ public class SSLTransportLayerTest {
|
|||
@Test
|
||||
public void testClientAuthenticationRequestedValidProvided() throws Exception {
|
||||
String node = "0";
|
||||
sslServerConfigs.put(SSLConfigs.SSL_CLIENT_AUTH_CONFIG, "requested");
|
||||
sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "requested");
|
||||
createEchoServer(sslServerConfigs);
|
||||
createSelector(sslClientConfigs);
|
||||
InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
|
||||
|
@ -252,12 +251,12 @@ public class SSLTransportLayerTest {
|
|||
@Test
|
||||
public void testClientAuthenticationRequestedNotProvided() throws Exception {
|
||||
String node = "0";
|
||||
sslServerConfigs.put(SSLConfigs.SSL_CLIENT_AUTH_CONFIG, "requested");
|
||||
sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "requested");
|
||||
createEchoServer(sslServerConfigs);
|
||||
|
||||
sslClientConfigs.remove(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
|
||||
sslClientConfigs.remove(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
|
||||
sslClientConfigs.remove(SSLConfigs.SSL_KEY_PASSWORD_CONFIG);
|
||||
sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
|
||||
sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
|
||||
sslClientConfigs.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG);
|
||||
createSelector(sslClientConfigs);
|
||||
InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
|
||||
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
|
||||
|
@ -270,9 +269,9 @@ public class SSLTransportLayerTest {
|
|||
*/
|
||||
@Test
|
||||
public void testInvalidTruststorePassword() throws Exception {
|
||||
SSLChannelBuilder channelBuilder = new SSLChannelBuilder(Mode.CLIENT);
|
||||
SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT);
|
||||
try {
|
||||
sslClientConfigs.put(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "invalid");
|
||||
sslClientConfigs.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "invalid");
|
||||
channelBuilder.configure(sslClientConfigs);
|
||||
fail("SSL channel configured with invalid truststore password");
|
||||
} catch (KafkaException e) {
|
||||
|
@ -285,9 +284,9 @@ public class SSLTransportLayerTest {
|
|||
*/
|
||||
@Test
|
||||
public void testInvalidKeystorePassword() throws Exception {
|
||||
SSLChannelBuilder channelBuilder = new SSLChannelBuilder(Mode.CLIENT);
|
||||
SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT);
|
||||
try {
|
||||
sslClientConfigs.put(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "invalid");
|
||||
sslClientConfigs.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "invalid");
|
||||
channelBuilder.configure(sslClientConfigs);
|
||||
fail("SSL channel configured with invalid keystore password");
|
||||
} catch (KafkaException e) {
|
||||
|
@ -302,7 +301,7 @@ public class SSLTransportLayerTest {
|
|||
@Test
|
||||
public void testInvalidKeyPassword() throws Exception {
|
||||
String node = "0";
|
||||
sslServerConfigs.put(SSLConfigs.SSL_KEY_PASSWORD_CONFIG, "invalid");
|
||||
sslServerConfigs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "invalid");
|
||||
createEchoServer(sslServerConfigs);
|
||||
createSelector(sslClientConfigs);
|
||||
InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
|
||||
|
@ -317,10 +316,10 @@ public class SSLTransportLayerTest {
|
|||
@Test
|
||||
public void testUnsupportedTLSVersion() throws Exception {
|
||||
String node = "0";
|
||||
sslServerConfigs.put(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.2"));
|
||||
sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.2"));
|
||||
createEchoServer(sslServerConfigs);
|
||||
|
||||
sslClientConfigs.put(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.1"));
|
||||
sslClientConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.1"));
|
||||
createSelector(sslClientConfigs);
|
||||
InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
|
||||
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
|
||||
|
@ -335,10 +334,10 @@ public class SSLTransportLayerTest {
|
|||
public void testUnsupportedCiphers() throws Exception {
|
||||
String node = "0";
|
||||
String[] cipherSuites = SSLContext.getDefault().getDefaultSSLParameters().getCipherSuites();
|
||||
sslServerConfigs.put(SSLConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuites[0]));
|
||||
sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuites[0]));
|
||||
createEchoServer(sslServerConfigs);
|
||||
|
||||
sslClientConfigs.put(SSLConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuites[1]));
|
||||
sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuites[1]));
|
||||
createSelector(sslClientConfigs);
|
||||
InetSocketAddress addr = new InetSocketAddress("localhost", server.port);
|
||||
selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE);
|
||||
|
@ -427,7 +426,7 @@ public class SSLTransportLayerTest {
|
|||
}
|
||||
|
||||
private void createEchoServer(Map<String, Object> sslServerConfigs) throws Exception {
|
||||
server = new SSLEchoServer(sslServerConfigs, "localhost");
|
||||
server = new SslEchoServer(sslServerConfigs, "localhost");
|
||||
server.start();
|
||||
}
|
||||
|
||||
|
@ -437,14 +436,14 @@ public class SSLTransportLayerTest {
|
|||
|
||||
private void createSelector(Map<String, Object> sslClientConfigs, final Integer netReadBufSize, final Integer netWriteBufSize, final Integer appBufSize) {
|
||||
|
||||
this.channelBuilder = new SSLChannelBuilder(Mode.CLIENT) {
|
||||
this.channelBuilder = new SslChannelBuilder(Mode.CLIENT) {
|
||||
|
||||
@Override
|
||||
protected SSLTransportLayer buildTransportLayer(SSLFactory sslFactory, String id, SelectionKey key) throws IOException {
|
||||
protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key) throws IOException {
|
||||
SocketChannel socketChannel = (SocketChannel) key.channel();
|
||||
SSLEngine sslEngine = sslFactory.createSSLEngine(socketChannel.socket().getInetAddress().getHostName(),
|
||||
SSLEngine sslEngine = sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(),
|
||||
socketChannel.socket().getPort());
|
||||
TestSSLTransportLayer transportLayer = new TestSSLTransportLayer(id, key, sslEngine, netReadBufSize, netWriteBufSize, appBufSize);
|
||||
TestSslTransportLayer transportLayer = new TestSslTransportLayer(id, key, sslEngine, netReadBufSize, netWriteBufSize, appBufSize);
|
||||
transportLayer.startHandshake();
|
||||
return transportLayer;
|
||||
}
|
||||
|
@ -463,15 +462,15 @@ public class SSLTransportLayerTest {
|
|||
String name = server ? "server" : "client";
|
||||
Mode mode = server ? Mode.SERVER : Mode.CLIENT;
|
||||
File truststoreFile = File.createTempFile(name + "TS", ".jks");
|
||||
sslConfig = TestSSLUtils.createSSLConfig(!server, true, mode, truststoreFile, name);
|
||||
sslConfig.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
|
||||
sslConfig = TestSslUtils.createSslConfig(!server, true, mode, truststoreFile, name);
|
||||
sslConfig.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
|
||||
}
|
||||
|
||||
private Map<String, Object> getTrustingConfig(CertStores truststoreConfig) {
|
||||
Map<String, Object> config = new HashMap<String, Object>(sslConfig);
|
||||
config.put(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreConfig.sslConfig.get(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
|
||||
config.put(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststoreConfig.sslConfig.get(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
|
||||
config.put(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, truststoreConfig.sslConfig.get(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG));
|
||||
config.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
|
||||
config.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
|
||||
config.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG));
|
||||
return config;
|
||||
}
|
||||
|
||||
|
@ -485,13 +484,13 @@ public class SSLTransportLayerTest {
|
|||
* code path. The overridden buffer size starts with a small value and increases in size when the buffer
|
||||
* size is retrieved to handle overflow/underflow, until the actual session buffer size is reached.
|
||||
*/
|
||||
private static class TestSSLTransportLayer extends SSLTransportLayer {
|
||||
private static class TestSslTransportLayer extends SslTransportLayer {
|
||||
|
||||
private final ResizeableBufferSize netReadBufSize;
|
||||
private final ResizeableBufferSize netWriteBufSize;
|
||||
private final ResizeableBufferSize appBufSize;
|
||||
|
||||
public TestSSLTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine,
|
||||
public TestSslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine,
|
||||
Integer netReadBufSize, Integer netWriteBufSize, Integer appBufSize) throws IOException {
|
||||
super(channelId, key, sslEngine, false);
|
||||
this.netReadBufSize = new ResizeableBufferSize(netReadBufSize);
|
||||
|
@ -538,18 +537,18 @@ public class SSLTransportLayerTest {
|
|||
}
|
||||
|
||||
// Non-blocking EchoServer implementation that uses SSLTransportLayer
|
||||
private class SSLEchoServer extends Thread {
|
||||
private class SslEchoServer extends Thread {
|
||||
private final int port;
|
||||
private final ServerSocketChannel serverSocketChannel;
|
||||
private final List<SocketChannel> newChannels;
|
||||
private final List<SocketChannel> socketChannels;
|
||||
private final AcceptorThread acceptorThread;
|
||||
private SSLFactory sslFactory;
|
||||
private SslFactory sslFactory;
|
||||
private final Selector selector;
|
||||
private final ConcurrentLinkedQueue<NetworkSend> inflightSends = new ConcurrentLinkedQueue<NetworkSend>();
|
||||
|
||||
public SSLEchoServer(Map<String, ?> configs, String serverHost) throws Exception {
|
||||
this.sslFactory = new SSLFactory(Mode.SERVER);
|
||||
public SslEchoServer(Map<String, ?> configs, String serverHost) throws Exception {
|
||||
this.sslFactory = new SslFactory(Mode.SERVER);
|
||||
this.sslFactory.configure(configs);
|
||||
serverSocketChannel = ServerSocketChannel.open();
|
||||
serverSocketChannel.configureBlocking(false);
|
||||
|
@ -557,7 +556,7 @@ public class SSLTransportLayerTest {
|
|||
this.port = serverSocketChannel.socket().getLocalPort();
|
||||
this.socketChannels = Collections.synchronizedList(new ArrayList<SocketChannel>());
|
||||
this.newChannels = Collections.synchronizedList(new ArrayList<SocketChannel>());
|
||||
SSLChannelBuilder channelBuilder = new SSLChannelBuilder(Mode.SERVER);
|
||||
SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.SERVER);
|
||||
channelBuilder.configure(sslServerConfigs);
|
||||
this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", new LinkedHashMap<String, String>(), channelBuilder);
|
||||
setName("echoserver");
|
|
@ -17,7 +17,7 @@ import javax.net.ssl.*;
|
|||
import java.io.File;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.test.TestSSLUtils;
|
||||
import org.apache.kafka.test.TestSslUtils;
|
||||
import org.apache.kafka.common.network.Mode;
|
||||
|
||||
import org.junit.Test;
|
||||
|
@ -31,16 +31,16 @@ import static org.junit.Assert.assertTrue;
|
|||
* A set of tests for the selector over ssl. These use a test harness that runs a simple socket server that echos back responses.
|
||||
*/
|
||||
|
||||
public class SSLFactoryTest {
|
||||
public class SslFactoryTest {
|
||||
|
||||
@Test
|
||||
public void testSSLFactoryConfiguration() throws Exception {
|
||||
public void testSslFactoryConfiguration() throws Exception {
|
||||
File trustStoreFile = File.createTempFile("truststore", ".jks");
|
||||
Map<String, Object> serverSSLConfig = TestSSLUtils.createSSLConfig(false, true, Mode.SERVER, trustStoreFile, "server");
|
||||
SSLFactory sslFactory = new SSLFactory(Mode.SERVER);
|
||||
sslFactory.configure(serverSSLConfig);
|
||||
Map<String, Object> serverSslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, trustStoreFile, "server");
|
||||
SslFactory sslFactory = new SslFactory(Mode.SERVER);
|
||||
sslFactory.configure(serverSslConfig);
|
||||
//host and port are hints
|
||||
SSLEngine engine = sslFactory.createSSLEngine("localhost", 0);
|
||||
SSLEngine engine = sslFactory.createSslEngine("localhost", 0);
|
||||
assertNotNull(engine);
|
||||
String[] expectedProtocols = {"TLSv1.2"};
|
||||
assertArrayEquals(expectedProtocols, engine.getEnabledProtocols());
|
||||
|
@ -50,11 +50,11 @@ public class SSLFactoryTest {
|
|||
@Test
|
||||
public void testClientMode() throws Exception {
|
||||
File trustStoreFile = File.createTempFile("truststore", ".jks");
|
||||
Map<String, Object> clientSSLConfig = TestSSLUtils.createSSLConfig(false, true, Mode.CLIENT, trustStoreFile, "client");
|
||||
SSLFactory sslFactory = new SSLFactory(Mode.CLIENT);
|
||||
sslFactory.configure(clientSSLConfig);
|
||||
Map<String, Object> clientSslConfig = TestSslUtils.createSslConfig(false, true, Mode.CLIENT, trustStoreFile, "client");
|
||||
SslFactory sslFactory = new SslFactory(Mode.CLIENT);
|
||||
sslFactory.configure(clientSslConfig);
|
||||
//host and port are hints
|
||||
SSLEngine engine = sslFactory.createSSLEngine("localhost", 0);
|
||||
SSLEngine engine = sslFactory.createSslEngine("localhost", 0);
|
||||
assertTrue(engine.getUseClientMode());
|
||||
}
|
||||
|
|
@ -17,7 +17,7 @@
|
|||
|
||||
package org.apache.kafka.test;
|
||||
|
||||
import org.apache.kafka.common.config.SSLConfigs;
|
||||
import org.apache.kafka.common.config.SslConfigs;
|
||||
import org.apache.kafka.common.network.Mode;
|
||||
import org.apache.kafka.clients.CommonClientConfigs;
|
||||
|
||||
|
@ -53,8 +53,7 @@ import java.util.Map;
|
|||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
|
||||
|
||||
public class TestSSLUtils {
|
||||
public class TestSslUtils {
|
||||
|
||||
/**
|
||||
* Create a self-signed X.509 Certificate.
|
||||
|
@ -177,33 +176,33 @@ public class TestSSLUtils {
|
|||
return certs;
|
||||
}
|
||||
|
||||
public static Map<String, Object> createSSLConfig(Mode mode, File keyStoreFile, String password, String keyPassword,
|
||||
public static Map<String, Object> createSslConfig(Mode mode, File keyStoreFile, String password, String keyPassword,
|
||||
File trustStoreFile, String trustStorePassword) {
|
||||
Map<String, Object> sslConfigs = new HashMap<String, Object>();
|
||||
Map<String, Object> sslConfigs = new HashMap<>();
|
||||
sslConfigs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); // kafka security protocol
|
||||
sslConfigs.put(SSLConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); // protocol to create SSLContext
|
||||
sslConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); // protocol to create SSLContext
|
||||
|
||||
if (mode == Mode.SERVER || (mode == Mode.CLIENT && keyStoreFile != null)) {
|
||||
sslConfigs.put(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreFile.getPath());
|
||||
sslConfigs.put(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
|
||||
sslConfigs.put(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, TrustManagerFactory.getDefaultAlgorithm());
|
||||
sslConfigs.put(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, password);
|
||||
sslConfigs.put(SSLConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword);
|
||||
sslConfigs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreFile.getPath());
|
||||
sslConfigs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");
|
||||
sslConfigs.put(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, TrustManagerFactory.getDefaultAlgorithm());
|
||||
sslConfigs.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, password);
|
||||
sslConfigs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword);
|
||||
}
|
||||
|
||||
sslConfigs.put(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreFile.getPath());
|
||||
sslConfigs.put(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword);
|
||||
sslConfigs.put(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
|
||||
sslConfigs.put(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, TrustManagerFactory.getDefaultAlgorithm());
|
||||
sslConfigs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreFile.getPath());
|
||||
sslConfigs.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword);
|
||||
sslConfigs.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
|
||||
sslConfigs.put(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, TrustManagerFactory.getDefaultAlgorithm());
|
||||
|
||||
List<String> enabledProtocols = new ArrayList<String>();
|
||||
List<String> enabledProtocols = new ArrayList<>();
|
||||
enabledProtocols.add("TLSv1.2");
|
||||
sslConfigs.put(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, enabledProtocols);
|
||||
sslConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, enabledProtocols);
|
||||
|
||||
return sslConfigs;
|
||||
}
|
||||
|
||||
public static Map<String, Object> createSSLConfig(boolean useClientCert, boolean trustStore, Mode mode, File trustStoreFile, String certAlias)
|
||||
public static Map<String, Object> createSslConfig(boolean useClientCert, boolean trustStore, Mode mode, File trustStoreFile, String certAlias)
|
||||
throws IOException, GeneralSecurityException {
|
||||
Map<String, X509Certificate> certs = new HashMap<String, X509Certificate>();
|
||||
File keyStoreFile;
|
||||
|
@ -235,7 +234,7 @@ public class TestSSLUtils {
|
|||
createTrustStore(trustStoreFile.getPath(), trustStorePassword, certs);
|
||||
}
|
||||
|
||||
Map<String, Object> sslConfig = createSSLConfig(mode, keyStoreFile, password,
|
||||
Map<String, Object> sslConfig = createSslConfig(mode, keyStoreFile, password,
|
||||
password, trustStoreFile, trustStorePassword);
|
||||
return sslConfig;
|
||||
}
|
|
@ -20,7 +20,7 @@ package org.apache.kafka.copycat.runtime.distributed;
|
|||
import org.apache.kafka.clients.CommonClientConfigs;
|
||||
import org.apache.kafka.common.config.AbstractConfig;
|
||||
import org.apache.kafka.common.config.ConfigDef;
|
||||
import org.apache.kafka.common.config.SSLConfigs;
|
||||
import org.apache.kafka.common.config.SslConfigs;
|
||||
import org.apache.kafka.common.config.SaslConfigs;
|
||||
|
||||
import java.util.Map;
|
||||
|
@ -139,21 +139,21 @@ public class DistributedHerderConfig extends AbstractConfig {
|
|||
ConfigDef.Importance.LOW,
|
||||
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
|
||||
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, ConfigDef.Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, ConfigDef.Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
|
||||
.define(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, ConfigDef.Type.CLASS, SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, ConfigDef.Importance.LOW, SSLConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
|
||||
.define(SSLConfigs.SSL_PROTOCOL_CONFIG, ConfigDef.Type.STRING, SSLConfigs.DEFAULT_SSL_PROTOCOL, ConfigDef.Importance.MEDIUM, SSLConfigs.SSL_PROTOCOL_DOC)
|
||||
.define(SSLConfigs.SSL_PROVIDER_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, SSLConfigs.SSL_PROVIDER_DOC, false)
|
||||
.define(SSLConfigs.SSL_CIPHER_SUITES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.LOW, SSLConfigs.SSL_CIPHER_SUITES_DOC, false)
|
||||
.define(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, ConfigDef.Type.LIST, SSLConfigs.DEFAULT_ENABLED_PROTOCOLS, ConfigDef.Importance.MEDIUM, SSLConfigs.SSL_ENABLED_PROTOCOLS_DOC)
|
||||
.define(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SSLConfigs.DEFAULT_SSL_KEYSTORE_TYPE, ConfigDef.Importance.MEDIUM, SSLConfigs.SSL_KEYSTORE_TYPE_DOC)
|
||||
.define(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SSLConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
|
||||
.define(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SSLConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
|
||||
.define(SSLConfigs.SSL_KEY_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SSLConfigs.SSL_KEY_PASSWORD_DOC, false)
|
||||
.define(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, ConfigDef.Importance.MEDIUM, SSLConfigs.SSL_TRUSTSTORE_TYPE_DOC)
|
||||
.define(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
|
||||
.define(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
|
||||
.define(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, ConfigDef.Importance.LOW, SSLConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
|
||||
.define(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, ConfigDef.Importance.LOW, SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
|
||||
.define(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.LOW, SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
|
||||
.define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, ConfigDef.Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, ConfigDef.Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
|
||||
.define(SslConfigs.SSL_PROTOCOL_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC)
|
||||
.define(SslConfigs.SSL_PROVIDER_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC, false)
|
||||
.define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC, false)
|
||||
.define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, ConfigDef.Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)
|
||||
.define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC)
|
||||
.define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
|
||||
.define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
|
||||
.define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC, false)
|
||||
.define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC)
|
||||
.define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
|
||||
.define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
|
||||
.define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
|
||||
.define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
|
||||
.define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
|
||||
.define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
|
||||
.define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
|
||||
.define(SaslConfigs.SASL_KAFKA_SERVER_REALM, ConfigDef.Type.STRING, ConfigDef.Importance.LOW, SaslConfigs.SASL_KAFKA_SERVER_DOC, false)
|
||||
|
|
|
@ -24,7 +24,7 @@ import kafka.common.{TopicAndPartition, ErrorMapping}
|
|||
import kafka.message.{MessageSet, ByteBufferMessageSet}
|
||||
import kafka.api.ApiUtils._
|
||||
import org.apache.kafka.common.KafkaException
|
||||
import org.apache.kafka.common.network.{SSLTransportLayer, TransportLayer, Send, MultiSend}
|
||||
import org.apache.kafka.common.network.{Send, MultiSend}
|
||||
|
||||
import scala.collection._
|
||||
|
||||
|
|
|
@ -17,9 +17,6 @@
|
|||
|
||||
package kafka.cluster
|
||||
|
||||
import kafka.utils.CoreUtils._
|
||||
import kafka.utils.Json
|
||||
import kafka.api.ApiUtils._
|
||||
import java.nio.ByteBuffer
|
||||
|
||||
import kafka.common.{BrokerEndPointNotAvailableException, BrokerNotAvailableException, KafkaException}
|
||||
|
@ -29,8 +26,7 @@ import org.apache.kafka.common.protocol.SecurityProtocol
|
|||
/**
|
||||
* A Kafka broker.
|
||||
* A broker has an id and a collection of end-points.
|
||||
* Each end-point is (host, port,protocolType).
|
||||
* Currently the only protocol type is PlainText but we will add SSL and Kerberos in the future.
|
||||
* Each end-point is (host, port, protocolType).
|
||||
*/
|
||||
object Broker {
|
||||
|
||||
|
|
|
@ -28,7 +28,7 @@ import kafka.utils.CoreUtils
|
|||
import org.apache.kafka.clients.CommonClientConfigs
|
||||
import org.apache.kafka.common.config.SaslConfigs
|
||||
|
||||
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, SSLConfigs}
|
||||
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, SslConfigs}
|
||||
import org.apache.kafka.common.metrics.MetricsReporter
|
||||
import org.apache.kafka.common.protocol.SecurityProtocol
|
||||
import org.apache.kafka.common.security.auth.PrincipalBuilder
|
||||
|
@ -156,17 +156,17 @@ object Defaults {
|
|||
val MetricReporterClasses = ""
|
||||
|
||||
/** ********* SSL configuration ***********/
|
||||
val PrincipalBuilderClass = SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS
|
||||
val SSLProtocol = SSLConfigs.DEFAULT_SSL_PROTOCOL
|
||||
val SSLEnabledProtocols = SSLConfigs.DEFAULT_ENABLED_PROTOCOLS
|
||||
val SSLKeystoreType = SSLConfigs.DEFAULT_SSL_KEYSTORE_TYPE
|
||||
val SSLTruststoreType = SSLConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
|
||||
val SSLKeyManagerAlgorithm = SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM
|
||||
val SSLTrustManagerAlgorithm = SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM
|
||||
val SSLClientAuthRequired = "required"
|
||||
val SSLClientAuthRequested = "requested"
|
||||
val SSLClientAuthNone = "none"
|
||||
val SSLClientAuth = SSLClientAuthNone
|
||||
val PrincipalBuilderClass = SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS
|
||||
val SslProtocol = SslConfigs.DEFAULT_SSL_PROTOCOL
|
||||
val SslEnabledProtocols = SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS
|
||||
val SslKeystoreType = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
|
||||
val SslTruststoreType = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
|
||||
val SslKeyManagerAlgorithm = SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM
|
||||
val SslTrustManagerAlgorithm = SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM
|
||||
val SslClientAuthRequired = "required"
|
||||
val SslClientAuthRequested = "requested"
|
||||
val SslClientAuthNone = "none"
|
||||
val SslClientAuth = SslClientAuthNone
|
||||
|
||||
/** ********* Sasl configuration ***********/
|
||||
val SaslKerberosKinitCmd = SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD
|
||||
|
@ -305,22 +305,22 @@ object KafkaConfig {
|
|||
val MetricReporterClassesProp: String = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG
|
||||
|
||||
/** ********* SSL Configuration ****************/
|
||||
val PrincipalBuilderClassProp = SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG
|
||||
val SSLProtocolProp = SSLConfigs.SSL_PROTOCOL_CONFIG
|
||||
val SSLProviderProp = SSLConfigs.SSL_PROVIDER_CONFIG
|
||||
val SSLCipherSuitesProp = SSLConfigs.SSL_CIPHER_SUITES_CONFIG
|
||||
val SSLEnabledProtocolsProp = SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG
|
||||
val SSLKeystoreTypeProp = SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG
|
||||
val SSLKeystoreLocationProp = SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG
|
||||
val SSLKeystorePasswordProp = SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG
|
||||
val SSLKeyPasswordProp = SSLConfigs.SSL_KEY_PASSWORD_CONFIG
|
||||
val SSLTruststoreTypeProp = SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG
|
||||
val SSLTruststoreLocationProp = SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG
|
||||
val SSLTruststorePasswordProp = SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG
|
||||
val SSLKeyManagerAlgorithmProp = SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG
|
||||
val SSLTrustManagerAlgorithmProp = SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG
|
||||
val SSLEndpointIdentificationAlgorithmProp = SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG
|
||||
val SSLClientAuthProp = SSLConfigs.SSL_CLIENT_AUTH_CONFIG
|
||||
val PrincipalBuilderClassProp = SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG
|
||||
val SslProtocolProp = SslConfigs.SSL_PROTOCOL_CONFIG
|
||||
val SslProviderProp = SslConfigs.SSL_PROVIDER_CONFIG
|
||||
val SslCipherSuitesProp = SslConfigs.SSL_CIPHER_SUITES_CONFIG
|
||||
val SslEnabledProtocolsProp = SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG
|
||||
val SslKeystoreTypeProp = SslConfigs.SSL_KEYSTORE_TYPE_CONFIG
|
||||
val SslKeystoreLocationProp = SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG
|
||||
val SslKeystorePasswordProp = SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG
|
||||
val SslKeyPasswordProp = SslConfigs.SSL_KEY_PASSWORD_CONFIG
|
||||
val SslTruststoreTypeProp = SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG
|
||||
val SslTruststoreLocationProp = SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG
|
||||
val SslTruststorePasswordProp = SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG
|
||||
val SslKeyManagerAlgorithmProp = SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG
|
||||
val SslTrustManagerAlgorithmProp = SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG
|
||||
val SslEndpointIdentificationAlgorithmProp = SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG
|
||||
val SslClientAuthProp = SslConfigs.SSL_CLIENT_AUTH_CONFIG
|
||||
|
||||
/** ********* SASL Configuration ****************/
|
||||
val SaslKerberosServiceNameProp = SaslConfigs.SASL_KERBEROS_SERVICE_NAME
|
||||
|
@ -480,22 +480,22 @@ object KafkaConfig {
|
|||
val MetricReporterClassesDoc = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC
|
||||
|
||||
/** ********* SSL Configuration ****************/
|
||||
val PrincipalBuilderClassDoc = SSLConfigs.PRINCIPAL_BUILDER_CLASS_DOC
|
||||
val SSLProtocolDoc = SSLConfigs.SSL_PROTOCOL_DOC
|
||||
val SSLProviderDoc = SSLConfigs.SSL_PROVIDER_DOC
|
||||
val SSLCipherSuitesDoc = SSLConfigs.SSL_CIPHER_SUITES_DOC
|
||||
val SSLEnabledProtocolsDoc = SSLConfigs.SSL_ENABLED_PROTOCOLS_DOC
|
||||
val SSLKeystoreTypeDoc = SSLConfigs.SSL_KEYSTORE_TYPE_DOC
|
||||
val SSLKeystoreLocationDoc = SSLConfigs.SSL_KEYSTORE_LOCATION_DOC
|
||||
val SSLKeystorePasswordDoc = SSLConfigs.SSL_KEYSTORE_PASSWORD_DOC
|
||||
val SSLKeyPasswordDoc = SSLConfigs.SSL_KEY_PASSWORD_DOC
|
||||
val SSLTruststoreTypeDoc = SSLConfigs.SSL_TRUSTSTORE_TYPE_DOC
|
||||
val SSLTruststorePasswordDoc = SSLConfigs.SSL_TRUSTSTORE_PASSWORD_DOC
|
||||
val SSLTruststoreLocationDoc = SSLConfigs.SSL_TRUSTSTORE_LOCATION_DOC
|
||||
val SSLKeyManagerAlgorithmDoc = SSLConfigs.SSL_KEYMANAGER_ALGORITHM_DOC
|
||||
val SSLTrustManagerAlgorithmDoc = SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC
|
||||
val SSLEndpointIdentificationAlgorithmDoc = SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC
|
||||
val SSLClientAuthDoc = SSLConfigs.SSL_CLIENT_AUTH_DOC
|
||||
val PrincipalBuilderClassDoc = SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC
|
||||
val SslProtocolDoc = SslConfigs.SSL_PROTOCOL_DOC
|
||||
val SslProviderDoc = SslConfigs.SSL_PROVIDER_DOC
|
||||
val SslCipherSuitesDoc = SslConfigs.SSL_CIPHER_SUITES_DOC
|
||||
val SslEnabledProtocolsDoc = SslConfigs.SSL_ENABLED_PROTOCOLS_DOC
|
||||
val SslKeystoreTypeDoc = SslConfigs.SSL_KEYSTORE_TYPE_DOC
|
||||
val SslKeystoreLocationDoc = SslConfigs.SSL_KEYSTORE_LOCATION_DOC
|
||||
val SslKeystorePasswordDoc = SslConfigs.SSL_KEYSTORE_PASSWORD_DOC
|
||||
val SslKeyPasswordDoc = SslConfigs.SSL_KEY_PASSWORD_DOC
|
||||
val SslTruststoreTypeDoc = SslConfigs.SSL_TRUSTSTORE_TYPE_DOC
|
||||
val SslTruststorePasswordDoc = SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC
|
||||
val SslTruststoreLocationDoc = SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC
|
||||
val SslKeyManagerAlgorithmDoc = SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC
|
||||
val SslTrustManagerAlgorithmDoc = SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC
|
||||
val SslEndpointIdentificationAlgorithmDoc = SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC
|
||||
val SslClientAuthDoc = SslConfigs.SSL_CLIENT_AUTH_DOC
|
||||
|
||||
/** ********* Sasl Configuration ****************/
|
||||
val SaslKerberosServiceNameDoc = SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC
|
||||
|
@ -645,21 +645,21 @@ object KafkaConfig {
|
|||
|
||||
/** ********* SSL Configuration ****************/
|
||||
.define(PrincipalBuilderClassProp, CLASS, Defaults.PrincipalBuilderClass, MEDIUM, PrincipalBuilderClassDoc)
|
||||
.define(SSLProtocolProp, STRING, Defaults.SSLProtocol, MEDIUM, SSLProtocolDoc)
|
||||
.define(SSLProviderProp, STRING, MEDIUM, SSLProviderDoc, false)
|
||||
.define(SSLEnabledProtocolsProp, LIST, Defaults.SSLEnabledProtocols, MEDIUM, SSLEnabledProtocolsDoc)
|
||||
.define(SSLKeystoreTypeProp, STRING, Defaults.SSLKeystoreType, MEDIUM, SSLKeystoreTypeDoc)
|
||||
.define(SSLKeystoreLocationProp, STRING, MEDIUM, SSLKeystoreLocationDoc, false)
|
||||
.define(SSLKeystorePasswordProp, STRING, MEDIUM, SSLKeystorePasswordDoc, false)
|
||||
.define(SSLKeyPasswordProp, STRING, MEDIUM, SSLKeyPasswordDoc, false)
|
||||
.define(SSLTruststoreTypeProp, STRING, Defaults.SSLTruststoreType, MEDIUM, SSLTruststoreTypeDoc)
|
||||
.define(SSLTruststoreLocationProp, STRING, MEDIUM, SSLTruststoreLocationDoc, false)
|
||||
.define(SSLTruststorePasswordProp, STRING, MEDIUM, SSLTruststorePasswordDoc, false)
|
||||
.define(SSLKeyManagerAlgorithmProp, STRING, Defaults.SSLKeyManagerAlgorithm, MEDIUM, SSLKeyManagerAlgorithmDoc)
|
||||
.define(SSLTrustManagerAlgorithmProp, STRING, Defaults.SSLTrustManagerAlgorithm, MEDIUM, SSLTrustManagerAlgorithmDoc)
|
||||
.define(SSLEndpointIdentificationAlgorithmProp, STRING, LOW, SSLEndpointIdentificationAlgorithmDoc, false)
|
||||
.define(SSLClientAuthProp, STRING, Defaults.SSLClientAuth, in(Defaults.SSLClientAuthRequired, Defaults.SSLClientAuthRequested, Defaults.SSLClientAuthNone), MEDIUM, SSLClientAuthDoc)
|
||||
.define(SSLCipherSuitesProp, LIST, MEDIUM, SSLCipherSuitesDoc, false)
|
||||
.define(SslProtocolProp, STRING, Defaults.SslProtocol, MEDIUM, SslProtocolDoc)
|
||||
.define(SslProviderProp, STRING, MEDIUM, SslProviderDoc, false)
|
||||
.define(SslEnabledProtocolsProp, LIST, Defaults.SslEnabledProtocols, MEDIUM, SslEnabledProtocolsDoc)
|
||||
.define(SslKeystoreTypeProp, STRING, Defaults.SslKeystoreType, MEDIUM, SslKeystoreTypeDoc)
|
||||
.define(SslKeystoreLocationProp, STRING, MEDIUM, SslKeystoreLocationDoc, false)
|
||||
.define(SslKeystorePasswordProp, STRING, MEDIUM, SslKeystorePasswordDoc, false)
|
||||
.define(SslKeyPasswordProp, STRING, MEDIUM, SslKeyPasswordDoc, false)
|
||||
.define(SslTruststoreTypeProp, STRING, Defaults.SslTruststoreType, MEDIUM, SslTruststoreTypeDoc)
|
||||
.define(SslTruststoreLocationProp, STRING, MEDIUM, SslTruststoreLocationDoc, false)
|
||||
.define(SslTruststorePasswordProp, STRING, MEDIUM, SslTruststorePasswordDoc, false)
|
||||
.define(SslKeyManagerAlgorithmProp, STRING, Defaults.SslKeyManagerAlgorithm, MEDIUM, SslKeyManagerAlgorithmDoc)
|
||||
.define(SslTrustManagerAlgorithmProp, STRING, Defaults.SslTrustManagerAlgorithm, MEDIUM, SslTrustManagerAlgorithmDoc)
|
||||
.define(SslEndpointIdentificationAlgorithmProp, STRING, LOW, SslEndpointIdentificationAlgorithmDoc, false)
|
||||
.define(SslClientAuthProp, STRING, Defaults.SslClientAuth, in(Defaults.SslClientAuthRequired, Defaults.SslClientAuthRequested, Defaults.SslClientAuthNone), MEDIUM, SslClientAuthDoc)
|
||||
.define(SslCipherSuitesProp, LIST, MEDIUM, SslCipherSuitesDoc, false)
|
||||
|
||||
/** ********* Sasl Configuration ****************/
|
||||
.define(SaslKerberosServiceNameProp, STRING, MEDIUM, SaslKerberosServiceNameDoc, false)
|
||||
|
@ -815,20 +815,20 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
|
|||
|
||||
/** ********* SSL Configuration **************/
|
||||
val principalBuilderClass = getClass(KafkaConfig.PrincipalBuilderClassProp)
|
||||
val sslProtocol = getString(KafkaConfig.SSLProtocolProp)
|
||||
val sslProvider = getString(KafkaConfig.SSLProviderProp)
|
||||
val sslEnabledProtocols = getList(KafkaConfig.SSLEnabledProtocolsProp)
|
||||
val sslKeystoreType = getString(KafkaConfig.SSLKeystoreTypeProp)
|
||||
val sslKeystoreLocation = getString(KafkaConfig.SSLKeystoreLocationProp)
|
||||
val sslKeystorePassword = getString(KafkaConfig.SSLKeystorePasswordProp)
|
||||
val sslKeyPassword = getString(KafkaConfig.SSLKeyPasswordProp)
|
||||
val sslTruststoreType = getString(KafkaConfig.SSLTruststoreTypeProp)
|
||||
val sslTruststoreLocation = getString(KafkaConfig.SSLTruststoreLocationProp)
|
||||
val sslTruststorePassword = getString(KafkaConfig.SSLTruststorePasswordProp)
|
||||
val sslKeyManagerAlgorithm = getString(KafkaConfig.SSLKeyManagerAlgorithmProp)
|
||||
val sslTrustManagerAlgorithm = getString(KafkaConfig.SSLTrustManagerAlgorithmProp)
|
||||
val sslClientAuth = getString(KafkaConfig.SSLClientAuthProp)
|
||||
val sslCipher = getList(KafkaConfig.SSLCipherSuitesProp)
|
||||
val sslProtocol = getString(KafkaConfig.SslProtocolProp)
|
||||
val sslProvider = getString(KafkaConfig.SslProviderProp)
|
||||
val sslEnabledProtocols = getList(KafkaConfig.SslEnabledProtocolsProp)
|
||||
val sslKeystoreType = getString(KafkaConfig.SslKeystoreTypeProp)
|
||||
val sslKeystoreLocation = getString(KafkaConfig.SslKeystoreLocationProp)
|
||||
val sslKeystorePassword = getString(KafkaConfig.SslKeystorePasswordProp)
|
||||
val sslKeyPassword = getString(KafkaConfig.SslKeyPasswordProp)
|
||||
val sslTruststoreType = getString(KafkaConfig.SslTruststoreTypeProp)
|
||||
val sslTruststoreLocation = getString(KafkaConfig.SslTruststoreLocationProp)
|
||||
val sslTruststorePassword = getString(KafkaConfig.SslTruststorePasswordProp)
|
||||
val sslKeyManagerAlgorithm = getString(KafkaConfig.SslKeyManagerAlgorithmProp)
|
||||
val sslTrustManagerAlgorithm = getString(KafkaConfig.SslTrustManagerAlgorithmProp)
|
||||
val sslClientAuth = getString(KafkaConfig.SslClientAuthProp)
|
||||
val sslCipher = getList(KafkaConfig.SslCipherSuitesProp)
|
||||
|
||||
/** ********* Sasl Configuration **************/
|
||||
val saslKerberosServiceName = getString(KafkaConfig.SaslKerberosServiceNameProp)
|
||||
|
|
|
@ -30,10 +30,10 @@ import org.apache.kafka.clients.{ManualMetadataUpdater, NetworkClient, ClientReq
|
|||
import org.apache.kafka.common.network.{LoginType, Selectable, ChannelBuilders, NetworkReceive, Selector, Mode}
|
||||
import org.apache.kafka.common.requests.{ListOffsetResponse, FetchResponse, RequestSend, AbstractRequest, ListOffsetRequest}
|
||||
import org.apache.kafka.common.requests.{FetchRequest => JFetchRequest}
|
||||
import org.apache.kafka.common.security.ssl.SslFactory
|
||||
import org.apache.kafka.common.{Node, TopicPartition}
|
||||
import org.apache.kafka.common.metrics.Metrics
|
||||
import org.apache.kafka.common.protocol.{Errors, ApiKeys}
|
||||
import org.apache.kafka.common.security.ssl.SSLFactory
|
||||
import org.apache.kafka.common.utils.Time
|
||||
|
||||
import scala.collection.{JavaConverters, Map, mutable}
|
||||
|
|
|
@ -219,7 +219,7 @@ class SocketServerTest extends JUnitSuite {
|
|||
}
|
||||
|
||||
@Test
|
||||
def testSSLSocketServer(): Unit = {
|
||||
def testSslSocketServer(): Unit = {
|
||||
val trustStoreFile = File.createTempFile("truststore", ".jks")
|
||||
val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, enableSsl = true,
|
||||
trustStoreFile = Some(trustStoreFile))
|
||||
|
|
|
@ -491,21 +491,21 @@ class KafkaConfigTest {
|
|||
|
||||
//SSL Configs
|
||||
case KafkaConfig.PrincipalBuilderClassProp =>
|
||||
case KafkaConfig.SSLProtocolProp => // ignore string
|
||||
case KafkaConfig.SSLProviderProp => // ignore string
|
||||
case KafkaConfig.SSLEnabledProtocolsProp =>
|
||||
case KafkaConfig.SSLKeystoreTypeProp => // ignore string
|
||||
case KafkaConfig.SSLKeystoreLocationProp => // ignore string
|
||||
case KafkaConfig.SSLKeystorePasswordProp => // ignore string
|
||||
case KafkaConfig.SSLKeyPasswordProp => // ignore string
|
||||
case KafkaConfig.SSLTruststoreTypeProp => // ignore string
|
||||
case KafkaConfig.SSLTruststorePasswordProp => // ignore string
|
||||
case KafkaConfig.SSLTruststoreLocationProp => // ignore string
|
||||
case KafkaConfig.SSLKeyManagerAlgorithmProp =>
|
||||
case KafkaConfig.SSLTrustManagerAlgorithmProp =>
|
||||
case KafkaConfig.SSLClientAuthProp => // ignore string
|
||||
case KafkaConfig.SSLEndpointIdentificationAlgorithmProp => // ignore string
|
||||
case KafkaConfig.SSLCipherSuitesProp => // ignore string
|
||||
case KafkaConfig.SslProtocolProp => // ignore string
|
||||
case KafkaConfig.SslProviderProp => // ignore string
|
||||
case KafkaConfig.SslEnabledProtocolsProp =>
|
||||
case KafkaConfig.SslKeystoreTypeProp => // ignore string
|
||||
case KafkaConfig.SslKeystoreLocationProp => // ignore string
|
||||
case KafkaConfig.SslKeystorePasswordProp => // ignore string
|
||||
case KafkaConfig.SslKeyPasswordProp => // ignore string
|
||||
case KafkaConfig.SslTruststoreTypeProp => // ignore string
|
||||
case KafkaConfig.SslTruststorePasswordProp => // ignore string
|
||||
case KafkaConfig.SslTruststoreLocationProp => // ignore string
|
||||
case KafkaConfig.SslKeyManagerAlgorithmProp =>
|
||||
case KafkaConfig.SslTrustManagerAlgorithmProp =>
|
||||
case KafkaConfig.SslClientAuthProp => // ignore string
|
||||
case KafkaConfig.SslEndpointIdentificationAlgorithmProp => // ignore string
|
||||
case KafkaConfig.SslCipherSuitesProp => // ignore string
|
||||
|
||||
//Sasl Configs
|
||||
case KafkaConfig.SaslKerberosServiceNameProp => // ignore string
|
||||
|
|
|
@ -28,7 +28,9 @@ import charset.Charset
|
|||
|
||||
import kafka.security.auth.{Resource, Authorizer, Acl}
|
||||
import org.apache.kafka.common.protocol.SecurityProtocol
|
||||
import org.apache.kafka.common.security.ssl.SslFactory
|
||||
import org.apache.kafka.common.utils.Utils._
|
||||
import org.apache.kafka.test.TestSslUtils
|
||||
|
||||
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
|
||||
|
||||
|
@ -52,8 +54,6 @@ import org.apache.kafka.clients.producer.KafkaProducer
|
|||
import org.apache.kafka.clients.consumer.{RangeAssignor, KafkaConsumer}
|
||||
import org.apache.kafka.clients.CommonClientConfigs
|
||||
import org.apache.kafka.common.network.Mode
|
||||
import org.apache.kafka.common.security.ssl.SSLFactory
|
||||
import org.apache.kafka.test.TestSSLUtils
|
||||
|
||||
import scala.collection.Map
|
||||
import scala.collection.JavaConversions._
|
||||
|
@ -964,9 +964,9 @@ object TestUtils extends Logging {
|
|||
|
||||
val sslConfigs = {
|
||||
if (mode == Mode.SERVER)
|
||||
TestSSLUtils.createSSLConfig(true, true, mode, trustStore, certAlias)
|
||||
TestSslUtils.createSslConfig(true, true, mode, trustStore, certAlias)
|
||||
else
|
||||
TestSSLUtils.createSSLConfig(clientCert, false, mode, trustStore, certAlias)
|
||||
TestSslUtils.createSslConfig(clientCert, false, mode, trustStore, certAlias)
|
||||
}
|
||||
|
||||
val sslProps = new Properties()
|
||||
|
|
Loading…
Reference in New Issue