From 16f194b20ad9795188f1d7781e7cbca1cd2a6a2d Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Sat, 24 Oct 2015 09:42:19 -0700 Subject: [PATCH] KAFKA-2460; Fix capitalisation in SSL classes Author: Ismael Juma Reviewers: Sriharsha Chintalapani Closes #355 from ijuma/kafka-2460-fix-capitalisation-in-ssl-classes --- .../clients/consumer/ConsumerConfig.java | 32 ++-- .../clients/producer/ProducerConfig.java | 32 ++-- .../{SSLConfigs.java => SslConfigs.java} | 4 +- .../kafka/common/network/ChannelBuilders.java | 2 +- .../network/PlaintextChannelBuilder.java | 5 +- .../common/network/SaslChannelBuilder.java | 14 +- ...nelBuilder.java => SslChannelBuilder.java} | 24 +-- ...sportLayer.java => SslTransportLayer.java} | 11 +- .../kafka/common/network/TransportLayer.java | 3 +- .../ssl/{SSLFactory.java => SslFactory.java} | 38 ++--- .../clients/producer/KafkaProducerTest.java | 4 +- .../kafka/common/network/EchoServer.java | 6 +- .../kafka/common/network/SelectorTest.java | 4 +- ...SelectorTest.java => SslSelectorTest.java} | 26 ++-- ...erTest.java => SslTransportLayerTest.java} | 109 +++++++------ ...SLFactoryTest.java => SslFactoryTest.java} | 22 +-- .../{TestSSLUtils.java => TestSslUtils.java} | 37 +++-- .../distributed/DistributedHerderConfig.java | 32 ++-- .../main/scala/kafka/api/FetchResponse.scala | 2 +- .../src/main/scala/kafka/cluster/Broker.scala | 6 +- .../main/scala/kafka/server/KafkaConfig.scala | 146 +++++++++--------- .../kafka/server/ReplicaFetcherThread.scala | 2 +- .../unit/kafka/network/SocketServerTest.scala | 2 +- .../unit/kafka/server/KafkaConfigTest.scala | 30 ++-- .../scala/unit/kafka/utils/TestUtils.scala | 8 +- 25 files changed, 297 insertions(+), 304 deletions(-) rename clients/src/main/java/org/apache/kafka/common/config/{SSLConfigs.java => SslConfigs.java} (98%) rename clients/src/main/java/org/apache/kafka/common/network/{SSLChannelBuilder.java => SslChannelBuilder.java} (79%) rename clients/src/main/java/org/apache/kafka/common/network/{SSLTransportLayer.java => SslTransportLayer.java} (98%) rename clients/src/main/java/org/apache/kafka/common/security/ssl/{SSLFactory.java => SslFactory.java} (86%) rename clients/src/test/java/org/apache/kafka/common/network/{SSLSelectorTest.java => SslSelectorTest.java} (86%) rename clients/src/test/java/org/apache/kafka/common/network/{SSLTransportLayerTest.java => SslTransportLayerTest.java} (86%) rename clients/src/test/java/org/apache/kafka/common/security/ssl/{SSLFactoryTest.java => SslFactoryTest.java} (71%) rename clients/src/test/java/org/apache/kafka/test/{TestSSLUtils.java => TestSslUtils.java} (90%) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 5cc04191e8d..14c54c2f197 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -17,7 +17,7 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; -import org.apache.kafka.common.config.SSLConfigs; +import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.serialization.Deserializer; @@ -287,21 +287,21 @@ public class ConsumerConfig extends AbstractConfig { Importance.HIGH, VALUE_DESERIALIZER_CLASS_DOC) .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC) - .define(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SSLConfigs.PRINCIPAL_BUILDER_CLASS_DOC) - .define(SSLConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SSLConfigs.SSL_PROTOCOL_DOC) - .define(SSLConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SSLConfigs.SSL_PROVIDER_DOC, false) - .define(SSLConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SSLConfigs.SSL_CIPHER_SUITES_DOC, false) - .define(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SSLConfigs.DEFAULT_ENABLED_PROTOCOLS, Importance.MEDIUM, SSLConfigs.SSL_ENABLED_PROTOCOLS_DOC) - .define(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SSLConfigs.SSL_KEYSTORE_TYPE_DOC) - .define(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEYSTORE_LOCATION_DOC, false) - .define(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEYSTORE_PASSWORD_DOC, false) - .define(SSLConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEY_PASSWORD_DOC, false) - .define(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SSLConfigs.SSL_TRUSTSTORE_TYPE_DOC) - .define(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false) - .define(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false) - .define(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_KEYMANAGER_ALGORITHM_DOC) - .define(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC) - .define(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false) + .define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC) + .define(SslConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC) + .define(SslConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC, false) + .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC, false) + .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC) + .define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC) + .define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC, false) + .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC, false) + .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC, false) + .define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC) + .define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false) + .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false) + .define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC) + .define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC) + .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false) .define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, Type.STRING, Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false) .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC) .define(SaslConfigs.SASL_KAFKA_SERVER_REALM, Type.STRING, Importance.LOW, SaslConfigs.SASL_KAFKA_SERVER_DOC, false) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index b3cfe7057d3..6d40b777b89 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -22,7 +22,7 @@ import java.util.Properties; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.common.config.AbstractConfig; -import org.apache.kafka.common.config.SSLConfigs; +import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; @@ -265,21 +265,21 @@ public class ProducerConfig extends AbstractConfig { .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_SERIALIZER_CLASS_DOC) .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC) .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC) - .define(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SSLConfigs.PRINCIPAL_BUILDER_CLASS_DOC) - .define(SSLConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SSLConfigs.SSL_PROTOCOL_DOC) - .define(SSLConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SSLConfigs.SSL_PROVIDER_DOC, false) - .define(SSLConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SSLConfigs.SSL_CIPHER_SUITES_DOC, false) - .define(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SSLConfigs.DEFAULT_ENABLED_PROTOCOLS, Importance.MEDIUM, SSLConfigs.SSL_ENABLED_PROTOCOLS_DOC) - .define(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SSLConfigs.SSL_KEYSTORE_TYPE_DOC) - .define(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEYSTORE_LOCATION_DOC, false) - .define(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEYSTORE_PASSWORD_DOC, false) - .define(SSLConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_KEY_PASSWORD_DOC, false) - .define(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SSLConfigs.SSL_TRUSTSTORE_TYPE_DOC) - .define(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false) - .define(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false) - .define(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_KEYMANAGER_ALGORITHM_DOC) - .define(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC) - .define(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false) + .define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC) + .define(SslConfigs.SSL_PROTOCOL_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC) + .define(SslConfigs.SSL_PROVIDER_CONFIG, Type.STRING, Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC, false) + .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Type.LIST, Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC, false) + .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC) + .define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC) + .define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC, false) + .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC, false) + .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC, false) + .define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC) + .define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false) + .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, Type.STRING, Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false) + .define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC) + .define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC) + .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, Type.STRING, Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false) .define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, Type.STRING, Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false) .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC) .define(SaslConfigs.SASL_KAFKA_SERVER_REALM, Type.STRING, Importance.LOW, SaslConfigs.SASL_KAFKA_SERVER_DOC, false) diff --git a/clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java similarity index 98% rename from clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java rename to clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java index 207a349ef42..60e1eb33541 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/SSLConfigs.java +++ b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java @@ -16,7 +16,7 @@ package org.apache.kafka.common.config; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.TrustManagerFactory; -public class SSLConfigs { +public class SslConfigs { /* * NOTE: DO NOT CHANGE EITHER CONFIG NAMES AS THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE. */ @@ -43,7 +43,7 @@ public class SSLConfigs { public static final String SSL_ENABLED_PROTOCOLS_CONFIG = "ssl.enabled.protocols"; public static final String SSL_ENABLED_PROTOCOLS_DOC = "The list of protocols enabled for SSL connections. " + "TLSv1.2, TLSv1.1 and TLSv1 are enabled by default."; - public static final String DEFAULT_ENABLED_PROTOCOLS = "TLSv1.2,TLSv1.1,TLSv1"; + public static final String DEFAULT_SSL_ENABLED_PROTOCOLS = "TLSv1.2,TLSv1.1,TLSv1"; public static final String SSL_KEYSTORE_TYPE_CONFIG = "ssl.keystore.type"; public static final String SSL_KEYSTORE_TYPE_DOC = "The file format of the key store file. " diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java index 1e5d8405045..03c663d0f10 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java @@ -36,7 +36,7 @@ public class ChannelBuilders { switch (securityProtocol) { case SSL: requireNonNullMode(mode, securityProtocol); - channelBuilder = new SSLChannelBuilder(mode); + channelBuilder = new SslChannelBuilder(mode); break; case SASL_SSL: case SASL_PLAINTEXT: diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java index a0281590856..bc1536aa98b 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java @@ -15,15 +15,14 @@ package org.apache.kafka.common.network; import java.nio.channels.SelectionKey; import java.util.Map; +import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.security.auth.PrincipalBuilder; -import org.apache.kafka.common.config.SSLConfigs; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.KafkaException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class PlaintextChannelBuilder implements ChannelBuilder { private static final Logger log = LoggerFactory.getLogger(PlaintextChannelBuilder.class); private PrincipalBuilder principalBuilder; @@ -32,7 +31,7 @@ public class PlaintextChannelBuilder implements ChannelBuilder { public void configure(Map configs) throws KafkaException { try { this.configs = configs; - this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class) configs.get(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG)); + this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class) configs.get(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG)); this.principalBuilder.configure(this.configs); } catch (Exception e) { throw new KafkaException(e); diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java index 53953c55048..148e5492589 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java @@ -25,9 +25,9 @@ import org.apache.kafka.common.security.kerberos.KerberosNameParser; import org.apache.kafka.common.security.kerberos.LoginManager; import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator; import org.apache.kafka.common.security.authenticator.SaslServerAuthenticator; -import org.apache.kafka.common.security.ssl.SSLFactory; +import org.apache.kafka.common.security.ssl.SslFactory; import org.apache.kafka.common.protocol.SecurityProtocol; -import org.apache.kafka.common.config.SSLConfigs; +import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.KafkaException; @@ -43,7 +43,7 @@ public class SaslChannelBuilder implements ChannelBuilder { private LoginManager loginManager; private PrincipalBuilder principalBuilder; - private SSLFactory sslFactory; + private SslFactory sslFactory; private Map configs; private KerberosNameParser kerberosNameParser; @@ -57,7 +57,7 @@ public class SaslChannelBuilder implements ChannelBuilder { try { this.configs = configs; this.loginManager = LoginManager.acquireLoginManager(loginType, configs); - this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class) configs.get(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG)); + this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class) configs.get(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG)); this.principalBuilder.configure(configs); String defaultRealm; @@ -69,7 +69,7 @@ public class SaslChannelBuilder implements ChannelBuilder { kerberosNameParser = new KerberosNameParser(defaultRealm, (List) configs.get(SaslConfigs.AUTH_TO_LOCAL)); if (this.securityProtocol == SecurityProtocol.SASL_SSL) { - this.sslFactory = new SSLFactory(mode); + this.sslFactory = new SslFactory(mode); this.sslFactory.configure(this.configs); } } catch (Exception e) { @@ -102,8 +102,8 @@ public class SaslChannelBuilder implements ChannelBuilder { protected TransportLayer buildTransportLayer(String id, SelectionKey key, SocketChannel socketChannel) throws IOException { if (this.securityProtocol == SecurityProtocol.SASL_SSL) { - return SSLTransportLayer.create(id, key, - sslFactory.createSSLEngine(socketChannel.socket().getInetAddress().getHostName(), + return SslTransportLayer.create(id, key, + sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(), socketChannel.socket().getPort())); } else { return new PlaintextTransportLayer(key); diff --git a/clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java similarity index 79% rename from clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java rename to clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java index 1dd1ecd11a2..8edd37e3075 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java @@ -18,30 +18,30 @@ import java.nio.channels.SocketChannel; import java.util.Map; import org.apache.kafka.common.security.auth.PrincipalBuilder; -import org.apache.kafka.common.security.ssl.SSLFactory; -import org.apache.kafka.common.config.SSLConfigs; +import org.apache.kafka.common.security.ssl.SslFactory; +import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.KafkaException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SSLChannelBuilder implements ChannelBuilder { - private static final Logger log = LoggerFactory.getLogger(SSLChannelBuilder.class); - private SSLFactory sslFactory; +public class SslChannelBuilder implements ChannelBuilder { + private static final Logger log = LoggerFactory.getLogger(SslChannelBuilder.class); + private SslFactory sslFactory; private PrincipalBuilder principalBuilder; private Mode mode; private Map configs; - public SSLChannelBuilder(Mode mode) { + public SslChannelBuilder(Mode mode) { this.mode = mode; } public void configure(Map configs) throws KafkaException { try { this.configs = configs; - this.sslFactory = new SSLFactory(mode); + this.sslFactory = new SslFactory(mode); this.sslFactory.configure(this.configs); - this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class) configs.get(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG)); + this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class) configs.get(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG)); this.principalBuilder.configure(this.configs); } catch (Exception e) { throw new KafkaException(e); @@ -51,7 +51,7 @@ public class SSLChannelBuilder implements ChannelBuilder { public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException { KafkaChannel channel = null; try { - SSLTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key); + SslTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key); Authenticator authenticator = new DefaultAuthenticator(); authenticator.configure(transportLayer, this.principalBuilder, this.configs); channel = new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize); @@ -66,10 +66,10 @@ public class SSLChannelBuilder implements ChannelBuilder { this.principalBuilder.close(); } - protected SSLTransportLayer buildTransportLayer(SSLFactory sslFactory, String id, SelectionKey key) throws IOException { + protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); - return SSLTransportLayer.create(id, key, - sslFactory.createSSLEngine(socketChannel.socket().getInetAddress().getHostName(), + return SslTransportLayer.create(id, key, + sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(), socketChannel.socket().getPort())); } } diff --git a/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java similarity index 98% rename from clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java rename to clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java index 813f0b122fb..e2d6b3bcb07 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java @@ -43,9 +43,8 @@ import org.slf4j.LoggerFactory; /* * Transport layer for SSL communication */ - -public class SSLTransportLayer implements TransportLayer { - private static final Logger log = LoggerFactory.getLogger(SSLTransportLayer.class); +public class SslTransportLayer implements TransportLayer { + private static final Logger log = LoggerFactory.getLogger(SslTransportLayer.class); private final String channelId; private final SSLEngine sslEngine; private final SelectionKey key; @@ -61,15 +60,15 @@ public class SSLTransportLayer implements TransportLayer { private ByteBuffer appReadBuffer; private ByteBuffer emptyBuf = ByteBuffer.allocate(0); - public static SSLTransportLayer create(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException { + public static SslTransportLayer create(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException { // Disable renegotiation by default until we have fixed the known issues with the existing implementation - SSLTransportLayer transportLayer = new SSLTransportLayer(channelId, key, sslEngine, false); + SslTransportLayer transportLayer = new SslTransportLayer(channelId, key, sslEngine, false); transportLayer.startHandshake(); return transportLayer; } // Prefer `create`, only use this in tests - SSLTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine, boolean enableRenegotiation) throws IOException { + SslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine, boolean enableRenegotiation) throws IOException { this.channelId = channelId; this.key = key; this.socketChannel = (SocketChannel) key.channel(); diff --git a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java index 591fb8ded0d..7459774587b 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java +++ b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java @@ -74,7 +74,8 @@ public interface TransportLayer extends ScatteringByteChannel, GatheringByteChan boolean hasPendingWrites(); /** - * Returns `SSLSession.getPeerPrincipal` if SSLTransportLayer is used and `KakfaPrincipal.ANONYMOUS` otherwise. + * Returns `SSLSession.getPeerPrincipal()` if this is a SslTransportLayer and there is an authenticated peer, + * `KafkaPrincipal.ANONYMOUS` is returned otherwise. */ Principal peerPrincipal() throws IOException; diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java similarity index 86% rename from clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java rename to clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java index c25993eb184..0984ba0cc79 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SSLFactory.java +++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java @@ -18,7 +18,7 @@ package org.apache.kafka.common.security.ssl; import org.apache.kafka.common.Configurable; import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.config.SSLConfigs; +import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.network.Mode; import javax.net.ssl.*; @@ -30,7 +30,7 @@ import java.security.KeyStore; import java.util.List; import java.util.Map; -public class SSLFactory implements Configurable { +public class SslFactory implements Configurable { private String protocol; private String provider; @@ -47,29 +47,29 @@ public class SSLFactory implements Configurable { private boolean wantClientAuth; private final Mode mode; - public SSLFactory(Mode mode) { + public SslFactory(Mode mode) { this.mode = mode; } @Override public void configure(Map configs) throws KafkaException { - this.protocol = (String) configs.get(SSLConfigs.SSL_PROTOCOL_CONFIG); - this.provider = (String) configs.get(SSLConfigs.SSL_PROVIDER_CONFIG); + this.protocol = (String) configs.get(SslConfigs.SSL_PROTOCOL_CONFIG); + this.provider = (String) configs.get(SslConfigs.SSL_PROVIDER_CONFIG); - List cipherSuitesList = (List) configs.get(SSLConfigs.SSL_CIPHER_SUITES_CONFIG); + List cipherSuitesList = (List) configs.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG); if (cipherSuitesList != null) this.cipherSuites = cipherSuitesList.toArray(new String[cipherSuitesList.size()]); - List enabledProtocolsList = (List) configs.get(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG); + List enabledProtocolsList = (List) configs.get(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG); if (enabledProtocolsList != null) this.enabledProtocols = enabledProtocolsList.toArray(new String[enabledProtocolsList.size()]); - String endpointIdentification = (String) configs.get(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG); + String endpointIdentification = (String) configs.get(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG); if (endpointIdentification != null) this.endpointIdentification = endpointIdentification; - String clientAuthConfig = (String) configs.get(SSLConfigs.SSL_CLIENT_AUTH_CONFIG); + String clientAuthConfig = (String) configs.get(SslConfigs.SSL_CLIENT_AUTH_CONFIG); if (clientAuthConfig != null) { if (clientAuthConfig.equals("required")) this.needClientAuth = true; @@ -77,17 +77,17 @@ public class SSLFactory implements Configurable { this.wantClientAuth = true; } - this.kmfAlgorithm = (String) configs.get(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG); - this.tmfAlgorithm = (String) configs.get(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG); + this.kmfAlgorithm = (String) configs.get(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG); + this.tmfAlgorithm = (String) configs.get(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG); - createKeystore((String) configs.get(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG), - (String) configs.get(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG), - (String) configs.get(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG), - (String) configs.get(SSLConfigs.SSL_KEY_PASSWORD_CONFIG)); + createKeystore((String) configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG), + (String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG), + (String) configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG), + (String) configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)); - createTruststore((String) configs.get(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG), - (String) configs.get(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG), - (String) configs.get(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)); + createTruststore((String) configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG), + (String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG), + (String) configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)); try { this.sslContext = createSSLContext(); } catch (Exception e) { @@ -122,7 +122,7 @@ public class SSLFactory implements Configurable { return sslContext; } - public SSLEngine createSSLEngine(String peerHost, int peerPort) { + public SSLEngine createSslEngine(String peerHost, int peerPort) { SSLEngine sslEngine = sslContext.createSSLEngine(peerHost, peerPort); if (cipherSuites != null) sslEngine.setEnabledCipherSuites(cipherSuites); if (enabledProtocols != null) sslEngine.setEnabledProtocols(enabledProtocols); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index b044cf462f3..b96a5f76800 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -18,7 +18,7 @@ package org.apache.kafka.clients.producer; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.kafka.common.config.SSLConfigs; +import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.test.MockMetricsReporter; import org.apache.kafka.test.MockSerializer; @@ -59,7 +59,7 @@ public class KafkaProducerTest { configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); configs.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL); - configs.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS)); + configs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS)); final int oldInitCount = MockSerializer.INIT_COUNT.get(); final int oldCloseCount = MockSerializer.CLOSE_COUNT.get(); diff --git a/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java index 4a6d30469b4..9354bfe50bf 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java +++ b/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java @@ -13,7 +13,7 @@ package org.apache.kafka.common.network; import org.apache.kafka.common.protocol.SecurityProtocol; -import org.apache.kafka.common.security.ssl.SSLFactory; +import org.apache.kafka.common.security.ssl.SslFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocket; @@ -38,14 +38,14 @@ class EchoServer extends Thread { private final List threads; private final List sockets; private SecurityProtocol protocol = SecurityProtocol.PLAINTEXT; - private SSLFactory sslFactory; + private SslFactory sslFactory; private final AtomicBoolean renegotiate = new AtomicBoolean(); public EchoServer(Map configs) throws Exception { this.protocol = configs.containsKey("security.protocol") ? SecurityProtocol.valueOf((String) configs.get("security.protocol")) : SecurityProtocol.PLAINTEXT; if (protocol == SecurityProtocol.SSL) { - this.sslFactory = new SSLFactory(Mode.SERVER); + this.sslFactory = new SslFactory(Mode.SERVER); this.sslFactory.configure(configs); SSLContext sslContext = this.sslFactory.sslContext(); this.serverSocket = sslContext.getServerSocketFactory().createServerSocket(0); diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index 683eeeeabab..8ce02988c9e 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -24,7 +24,7 @@ import java.nio.ByteBuffer; import java.util.*; import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.config.SSLConfigs; +import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -49,7 +49,7 @@ public class SelectorTest { @Before public void setup() throws Exception { Map configs = new HashMap(); - configs.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS)); + configs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS)); this.server = new EchoServer(configs); this.server.start(); this.time = new MockTime(); diff --git a/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java similarity index 86% rename from clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java rename to clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java index eee7531dfa4..94c5654e2e8 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java @@ -26,10 +26,10 @@ import java.io.IOException; import java.net.InetSocketAddress; import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.config.SSLConfigs; -import org.apache.kafka.common.security.ssl.SSLFactory; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.security.ssl.SslFactory; import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.test.TestSSLUtils; +import org.apache.kafka.test.TestSslUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -37,7 +37,7 @@ import org.junit.Test; /** * A set of tests for the selector. These use a test harness that runs a simple socket server that echos back responses. */ -public class SSLSelectorTest extends SelectorTest { +public class SslSelectorTest extends SelectorTest { private Metrics metrics; private Map sslClientConfigs; @@ -46,15 +46,15 @@ public class SSLSelectorTest extends SelectorTest { public void setup() throws Exception { File trustStoreFile = File.createTempFile("truststore", ".jks"); - Map sslServerConfigs = TestSSLUtils.createSSLConfig(false, true, Mode.SERVER, trustStoreFile, "server"); - sslServerConfigs.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS)); + Map sslServerConfigs = TestSslUtils.createSslConfig(false, true, Mode.SERVER, trustStoreFile, "server"); + sslServerConfigs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS)); this.server = new EchoServer(sslServerConfigs); this.server.start(); this.time = new MockTime(); - sslClientConfigs = TestSSLUtils.createSSLConfig(false, false, Mode.SERVER, trustStoreFile, "client"); - sslClientConfigs.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS)); + sslClientConfigs = TestSslUtils.createSslConfig(false, false, Mode.SERVER, trustStoreFile, "client"); + sslClientConfigs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS)); - this.channelBuilder = new SSLChannelBuilder(Mode.CLIENT); + this.channelBuilder = new SslChannelBuilder(Mode.CLIENT); this.channelBuilder.configure(sslClientConfigs); this.metrics = new Metrics(); this.selector = new Selector(5000, metrics, time, "MetricGroup", new LinkedHashMap(), channelBuilder); @@ -73,12 +73,12 @@ public class SSLSelectorTest extends SelectorTest { */ @Test public void testRenegotiation() throws Exception { - ChannelBuilder channelBuilder = new SSLChannelBuilder(Mode.CLIENT) { + ChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT) { @Override - protected SSLTransportLayer buildTransportLayer(SSLFactory sslFactory, String id, SelectionKey key) throws IOException { + protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); - SSLTransportLayer transportLayer = new SSLTransportLayer(id, key, - sslFactory.createSSLEngine(socketChannel.socket().getInetAddress().getHostName(), socketChannel.socket().getPort()), + SslTransportLayer transportLayer = new SslTransportLayer(id, key, + sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(), socketChannel.socket().getPort()), true); transportLayer.startHandshake(); return transportLayer; diff --git a/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java similarity index 86% rename from clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java rename to clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java index ebb59b5bf0d..91bd47c5509 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SSLTransportLayerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java @@ -38,12 +38,12 @@ import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.config.SSLConfigs; -import org.apache.kafka.common.security.ssl.SSLFactory; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.security.ssl.SslFactory; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.test.TestSSLUtils; +import org.apache.kafka.test.TestSslUtils; import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; @@ -52,12 +52,11 @@ import org.junit.Test; /** * Tests for the SSL transport layer. These use a test harness that runs a simple socket server that echos back responses. */ - -public class SSLTransportLayerTest { +public class SslTransportLayerTest { private static final int BUFFER_SIZE = 4 * 1024; - private SSLEchoServer server; + private SslEchoServer server; private Selector selector; private ChannelBuilder channelBuilder; private CertStores serverCertStores; @@ -73,7 +72,7 @@ public class SSLTransportLayerTest { sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores); sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores); - this.channelBuilder = new SSLChannelBuilder(Mode.CLIENT); + this.channelBuilder = new SslChannelBuilder(Mode.CLIENT); this.channelBuilder.configure(sslClientConfigs); this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", new LinkedHashMap(), channelBuilder); } @@ -94,7 +93,7 @@ public class SSLTransportLayerTest { public void testValidEndpointIdentification() throws Exception { String node = "0"; createEchoServer(sslServerConfigs); - sslClientConfigs.put(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS"); + sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS"); createSelector(sslClientConfigs); InetSocketAddress addr = new InetSocketAddress("localhost", server.port); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); @@ -111,9 +110,9 @@ public class SSLTransportLayerTest { public void testInvalidEndpointIdentification() throws Exception { String node = "0"; String serverHost = InetAddress.getLocalHost().getHostAddress(); - server = new SSLEchoServer(sslServerConfigs, serverHost); + server = new SslEchoServer(sslServerConfigs, serverHost); server.start(); - sslClientConfigs.put(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS"); + sslClientConfigs.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS"); createSelector(sslClientConfigs); InetSocketAddress addr = new InetSocketAddress(serverHost, server.port); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); @@ -129,9 +128,9 @@ public class SSLTransportLayerTest { public void testEndpointIdentificationDisabled() throws Exception { String node = "0"; String serverHost = InetAddress.getLocalHost().getHostAddress(); - server = new SSLEchoServer(sslServerConfigs, serverHost); + server = new SslEchoServer(sslServerConfigs, serverHost); server.start(); - sslClientConfigs.remove(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG); + sslClientConfigs.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG); createSelector(sslClientConfigs); InetSocketAddress addr = new InetSocketAddress(serverHost, server.port); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); @@ -146,7 +145,7 @@ public class SSLTransportLayerTest { @Test public void testClientAuthenticationRequiredValidProvided() throws Exception { String node = "0"; - sslServerConfigs.put(SSLConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); + sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); createEchoServer(sslServerConfigs); createSelector(sslClientConfigs); InetSocketAddress addr = new InetSocketAddress("localhost", server.port); @@ -163,7 +162,7 @@ public class SSLTransportLayerTest { public void testClientAuthenticationRequiredUntrustedProvided() throws Exception { String node = "0"; sslServerConfigs = serverCertStores.getUntrustingConfig(); - sslServerConfigs.put(SSLConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); + sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); createEchoServer(sslServerConfigs); createSelector(sslClientConfigs); InetSocketAddress addr = new InetSocketAddress("localhost", server.port); @@ -179,12 +178,12 @@ public class SSLTransportLayerTest { @Test public void testClientAuthenticationRequiredNotProvided() throws Exception { String node = "0"; - sslServerConfigs.put(SSLConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); + sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "required"); createEchoServer(sslServerConfigs); - sslClientConfigs.remove(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG); - sslClientConfigs.remove(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG); - sslClientConfigs.remove(SSLConfigs.SSL_KEY_PASSWORD_CONFIG); + sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG); + sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG); + sslClientConfigs.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG); createSelector(sslClientConfigs); InetSocketAddress addr = new InetSocketAddress("localhost", server.port); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); @@ -200,7 +199,7 @@ public class SSLTransportLayerTest { public void testClientAuthenticationDisabledUntrustedProvided() throws Exception { String node = "0"; sslServerConfigs = serverCertStores.getUntrustingConfig(); - sslServerConfigs.put(SSLConfigs.SSL_CLIENT_AUTH_CONFIG, "none"); + sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "none"); createEchoServer(sslServerConfigs); createSelector(sslClientConfigs); InetSocketAddress addr = new InetSocketAddress("localhost", server.port); @@ -216,12 +215,12 @@ public class SSLTransportLayerTest { @Test public void testClientAuthenticationDisabledNotProvided() throws Exception { String node = "0"; - sslServerConfigs.put(SSLConfigs.SSL_CLIENT_AUTH_CONFIG, "none"); + sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "none"); createEchoServer(sslServerConfigs); - sslClientConfigs.remove(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG); - sslClientConfigs.remove(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG); - sslClientConfigs.remove(SSLConfigs.SSL_KEY_PASSWORD_CONFIG); + sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG); + sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG); + sslClientConfigs.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG); createSelector(sslClientConfigs); InetSocketAddress addr = new InetSocketAddress("localhost", server.port); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); @@ -236,7 +235,7 @@ public class SSLTransportLayerTest { @Test public void testClientAuthenticationRequestedValidProvided() throws Exception { String node = "0"; - sslServerConfigs.put(SSLConfigs.SSL_CLIENT_AUTH_CONFIG, "requested"); + sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "requested"); createEchoServer(sslServerConfigs); createSelector(sslClientConfigs); InetSocketAddress addr = new InetSocketAddress("localhost", server.port); @@ -252,12 +251,12 @@ public class SSLTransportLayerTest { @Test public void testClientAuthenticationRequestedNotProvided() throws Exception { String node = "0"; - sslServerConfigs.put(SSLConfigs.SSL_CLIENT_AUTH_CONFIG, "requested"); + sslServerConfigs.put(SslConfigs.SSL_CLIENT_AUTH_CONFIG, "requested"); createEchoServer(sslServerConfigs); - sslClientConfigs.remove(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG); - sslClientConfigs.remove(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG); - sslClientConfigs.remove(SSLConfigs.SSL_KEY_PASSWORD_CONFIG); + sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG); + sslClientConfigs.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG); + sslClientConfigs.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG); createSelector(sslClientConfigs); InetSocketAddress addr = new InetSocketAddress("localhost", server.port); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); @@ -270,9 +269,9 @@ public class SSLTransportLayerTest { */ @Test public void testInvalidTruststorePassword() throws Exception { - SSLChannelBuilder channelBuilder = new SSLChannelBuilder(Mode.CLIENT); + SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT); try { - sslClientConfigs.put(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "invalid"); + sslClientConfigs.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "invalid"); channelBuilder.configure(sslClientConfigs); fail("SSL channel configured with invalid truststore password"); } catch (KafkaException e) { @@ -285,9 +284,9 @@ public class SSLTransportLayerTest { */ @Test public void testInvalidKeystorePassword() throws Exception { - SSLChannelBuilder channelBuilder = new SSLChannelBuilder(Mode.CLIENT); + SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT); try { - sslClientConfigs.put(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "invalid"); + sslClientConfigs.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "invalid"); channelBuilder.configure(sslClientConfigs); fail("SSL channel configured with invalid keystore password"); } catch (KafkaException e) { @@ -302,7 +301,7 @@ public class SSLTransportLayerTest { @Test public void testInvalidKeyPassword() throws Exception { String node = "0"; - sslServerConfigs.put(SSLConfigs.SSL_KEY_PASSWORD_CONFIG, "invalid"); + sslServerConfigs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "invalid"); createEchoServer(sslServerConfigs); createSelector(sslClientConfigs); InetSocketAddress addr = new InetSocketAddress("localhost", server.port); @@ -317,10 +316,10 @@ public class SSLTransportLayerTest { @Test public void testUnsupportedTLSVersion() throws Exception { String node = "0"; - sslServerConfigs.put(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.2")); + sslServerConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.2")); createEchoServer(sslServerConfigs); - sslClientConfigs.put(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.1")); + sslClientConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, Arrays.asList("TLSv1.1")); createSelector(sslClientConfigs); InetSocketAddress addr = new InetSocketAddress("localhost", server.port); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); @@ -335,10 +334,10 @@ public class SSLTransportLayerTest { public void testUnsupportedCiphers() throws Exception { String node = "0"; String[] cipherSuites = SSLContext.getDefault().getDefaultSSLParameters().getCipherSuites(); - sslServerConfigs.put(SSLConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuites[0])); + sslServerConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuites[0])); createEchoServer(sslServerConfigs); - sslClientConfigs.put(SSLConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuites[1])); + sslClientConfigs.put(SslConfigs.SSL_CIPHER_SUITES_CONFIG, Arrays.asList(cipherSuites[1])); createSelector(sslClientConfigs); InetSocketAddress addr = new InetSocketAddress("localhost", server.port); selector.connect(node, addr, BUFFER_SIZE, BUFFER_SIZE); @@ -427,7 +426,7 @@ public class SSLTransportLayerTest { } private void createEchoServer(Map sslServerConfigs) throws Exception { - server = new SSLEchoServer(sslServerConfigs, "localhost"); + server = new SslEchoServer(sslServerConfigs, "localhost"); server.start(); } @@ -437,14 +436,14 @@ public class SSLTransportLayerTest { private void createSelector(Map sslClientConfigs, final Integer netReadBufSize, final Integer netWriteBufSize, final Integer appBufSize) { - this.channelBuilder = new SSLChannelBuilder(Mode.CLIENT) { + this.channelBuilder = new SslChannelBuilder(Mode.CLIENT) { @Override - protected SSLTransportLayer buildTransportLayer(SSLFactory sslFactory, String id, SelectionKey key) throws IOException { + protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); - SSLEngine sslEngine = sslFactory.createSSLEngine(socketChannel.socket().getInetAddress().getHostName(), + SSLEngine sslEngine = sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(), socketChannel.socket().getPort()); - TestSSLTransportLayer transportLayer = new TestSSLTransportLayer(id, key, sslEngine, netReadBufSize, netWriteBufSize, appBufSize); + TestSslTransportLayer transportLayer = new TestSslTransportLayer(id, key, sslEngine, netReadBufSize, netWriteBufSize, appBufSize); transportLayer.startHandshake(); return transportLayer; } @@ -463,15 +462,15 @@ public class SSLTransportLayerTest { String name = server ? "server" : "client"; Mode mode = server ? Mode.SERVER : Mode.CLIENT; File truststoreFile = File.createTempFile(name + "TS", ".jks"); - sslConfig = TestSSLUtils.createSSLConfig(!server, true, mode, truststoreFile, name); - sslConfig.put(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS)); + sslConfig = TestSslUtils.createSslConfig(!server, true, mode, truststoreFile, name); + sslConfig.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS)); } private Map getTrustingConfig(CertStores truststoreConfig) { Map config = new HashMap(sslConfig); - config.put(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreConfig.sslConfig.get(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)); - config.put(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststoreConfig.sslConfig.get(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)); - config.put(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, truststoreConfig.sslConfig.get(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG)); + config.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)); + config.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)); + config.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG)); return config; } @@ -485,14 +484,14 @@ public class SSLTransportLayerTest { * code path. The overridden buffer size starts with a small value and increases in size when the buffer * size is retrieved to handle overflow/underflow, until the actual session buffer size is reached. */ - private static class TestSSLTransportLayer extends SSLTransportLayer { + private static class TestSslTransportLayer extends SslTransportLayer { private final ResizeableBufferSize netReadBufSize; private final ResizeableBufferSize netWriteBufSize; private final ResizeableBufferSize appBufSize; - public TestSSLTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine, - Integer netReadBufSize, Integer netWriteBufSize, Integer appBufSize) throws IOException { + public TestSslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine, + Integer netReadBufSize, Integer netWriteBufSize, Integer appBufSize) throws IOException { super(channelId, key, sslEngine, false); this.netReadBufSize = new ResizeableBufferSize(netReadBufSize); this.netWriteBufSize = new ResizeableBufferSize(netWriteBufSize); @@ -538,18 +537,18 @@ public class SSLTransportLayerTest { } // Non-blocking EchoServer implementation that uses SSLTransportLayer - private class SSLEchoServer extends Thread { + private class SslEchoServer extends Thread { private final int port; private final ServerSocketChannel serverSocketChannel; private final List newChannels; private final List socketChannels; private final AcceptorThread acceptorThread; - private SSLFactory sslFactory; + private SslFactory sslFactory; private final Selector selector; private final ConcurrentLinkedQueue inflightSends = new ConcurrentLinkedQueue(); - public SSLEchoServer(Map configs, String serverHost) throws Exception { - this.sslFactory = new SSLFactory(Mode.SERVER); + public SslEchoServer(Map configs, String serverHost) throws Exception { + this.sslFactory = new SslFactory(Mode.SERVER); this.sslFactory.configure(configs); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); @@ -557,7 +556,7 @@ public class SSLTransportLayerTest { this.port = serverSocketChannel.socket().getLocalPort(); this.socketChannels = Collections.synchronizedList(new ArrayList()); this.newChannels = Collections.synchronizedList(new ArrayList()); - SSLChannelBuilder channelBuilder = new SSLChannelBuilder(Mode.SERVER); + SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.SERVER); channelBuilder.configure(sslServerConfigs); this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", new LinkedHashMap(), channelBuilder); setName("echoserver"); diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/SSLFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java similarity index 71% rename from clients/src/test/java/org/apache/kafka/common/security/ssl/SSLFactoryTest.java rename to clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java index e90ec2bb6ab..b5710aa1b61 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/ssl/SSLFactoryTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java @@ -17,7 +17,7 @@ import javax.net.ssl.*; import java.io.File; import java.util.Map; -import org.apache.kafka.test.TestSSLUtils; +import org.apache.kafka.test.TestSslUtils; import org.apache.kafka.common.network.Mode; import org.junit.Test; @@ -31,16 +31,16 @@ import static org.junit.Assert.assertTrue; * A set of tests for the selector over ssl. These use a test harness that runs a simple socket server that echos back responses. */ -public class SSLFactoryTest { +public class SslFactoryTest { @Test - public void testSSLFactoryConfiguration() throws Exception { + public void testSslFactoryConfiguration() throws Exception { File trustStoreFile = File.createTempFile("truststore", ".jks"); - Map serverSSLConfig = TestSSLUtils.createSSLConfig(false, true, Mode.SERVER, trustStoreFile, "server"); - SSLFactory sslFactory = new SSLFactory(Mode.SERVER); - sslFactory.configure(serverSSLConfig); + Map serverSslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, trustStoreFile, "server"); + SslFactory sslFactory = new SslFactory(Mode.SERVER); + sslFactory.configure(serverSslConfig); //host and port are hints - SSLEngine engine = sslFactory.createSSLEngine("localhost", 0); + SSLEngine engine = sslFactory.createSslEngine("localhost", 0); assertNotNull(engine); String[] expectedProtocols = {"TLSv1.2"}; assertArrayEquals(expectedProtocols, engine.getEnabledProtocols()); @@ -50,11 +50,11 @@ public class SSLFactoryTest { @Test public void testClientMode() throws Exception { File trustStoreFile = File.createTempFile("truststore", ".jks"); - Map clientSSLConfig = TestSSLUtils.createSSLConfig(false, true, Mode.CLIENT, trustStoreFile, "client"); - SSLFactory sslFactory = new SSLFactory(Mode.CLIENT); - sslFactory.configure(clientSSLConfig); + Map clientSslConfig = TestSslUtils.createSslConfig(false, true, Mode.CLIENT, trustStoreFile, "client"); + SslFactory sslFactory = new SslFactory(Mode.CLIENT); + sslFactory.configure(clientSslConfig); //host and port are hints - SSLEngine engine = sslFactory.createSSLEngine("localhost", 0); + SSLEngine engine = sslFactory.createSslEngine("localhost", 0); assertTrue(engine.getUseClientMode()); } diff --git a/clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java similarity index 90% rename from clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java rename to clients/src/test/java/org/apache/kafka/test/TestSslUtils.java index b23169297e3..30bdb6d35c5 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestSslUtils.java @@ -17,7 +17,7 @@ package org.apache.kafka.test; -import org.apache.kafka.common.config.SSLConfigs; +import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.network.Mode; import org.apache.kafka.clients.CommonClientConfigs; @@ -53,8 +53,7 @@ import java.util.Map; import java.util.List; import java.util.ArrayList; - -public class TestSSLUtils { +public class TestSslUtils { /** * Create a self-signed X.509 Certificate. @@ -177,33 +176,33 @@ public class TestSSLUtils { return certs; } - public static Map createSSLConfig(Mode mode, File keyStoreFile, String password, String keyPassword, + public static Map createSslConfig(Mode mode, File keyStoreFile, String password, String keyPassword, File trustStoreFile, String trustStorePassword) { - Map sslConfigs = new HashMap(); + Map sslConfigs = new HashMap<>(); sslConfigs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); // kafka security protocol - sslConfigs.put(SSLConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); // protocol to create SSLContext + sslConfigs.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLSv1.2"); // protocol to create SSLContext if (mode == Mode.SERVER || (mode == Mode.CLIENT && keyStoreFile != null)) { - sslConfigs.put(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreFile.getPath()); - sslConfigs.put(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS"); - sslConfigs.put(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, TrustManagerFactory.getDefaultAlgorithm()); - sslConfigs.put(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, password); - sslConfigs.put(SSLConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword); + sslConfigs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreFile.getPath()); + sslConfigs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS"); + sslConfigs.put(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, TrustManagerFactory.getDefaultAlgorithm()); + sslConfigs.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, password); + sslConfigs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPassword); } - sslConfigs.put(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreFile.getPath()); - sslConfigs.put(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword); - sslConfigs.put(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS"); - sslConfigs.put(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, TrustManagerFactory.getDefaultAlgorithm()); + sslConfigs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreFile.getPath()); + sslConfigs.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword); + sslConfigs.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS"); + sslConfigs.put(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, TrustManagerFactory.getDefaultAlgorithm()); - List enabledProtocols = new ArrayList(); + List enabledProtocols = new ArrayList<>(); enabledProtocols.add("TLSv1.2"); - sslConfigs.put(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, enabledProtocols); + sslConfigs.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, enabledProtocols); return sslConfigs; } - public static Map createSSLConfig(boolean useClientCert, boolean trustStore, Mode mode, File trustStoreFile, String certAlias) + public static Map createSslConfig(boolean useClientCert, boolean trustStore, Mode mode, File trustStoreFile, String certAlias) throws IOException, GeneralSecurityException { Map certs = new HashMap(); File keyStoreFile; @@ -235,7 +234,7 @@ public class TestSSLUtils { createTrustStore(trustStoreFile.getPath(), trustStorePassword, certs); } - Map sslConfig = createSSLConfig(mode, keyStoreFile, password, + Map sslConfig = createSslConfig(mode, keyStoreFile, password, password, trustStoreFile, trustStorePassword); return sslConfig; } diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java index bd2ba56bfe3..1e09e8b1b3e 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderConfig.java @@ -20,7 +20,7 @@ package org.apache.kafka.copycat.runtime.distributed; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.config.SSLConfigs; +import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.config.SaslConfigs; import java.util.Map; @@ -139,21 +139,21 @@ public class DistributedHerderConfig extends AbstractConfig { ConfigDef.Importance.LOW, CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC) .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, ConfigDef.Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, ConfigDef.Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC) - .define(SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, ConfigDef.Type.CLASS, SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, ConfigDef.Importance.LOW, SSLConfigs.PRINCIPAL_BUILDER_CLASS_DOC) - .define(SSLConfigs.SSL_PROTOCOL_CONFIG, ConfigDef.Type.STRING, SSLConfigs.DEFAULT_SSL_PROTOCOL, ConfigDef.Importance.MEDIUM, SSLConfigs.SSL_PROTOCOL_DOC) - .define(SSLConfigs.SSL_PROVIDER_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, SSLConfigs.SSL_PROVIDER_DOC, false) - .define(SSLConfigs.SSL_CIPHER_SUITES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.LOW, SSLConfigs.SSL_CIPHER_SUITES_DOC, false) - .define(SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, ConfigDef.Type.LIST, SSLConfigs.DEFAULT_ENABLED_PROTOCOLS, ConfigDef.Importance.MEDIUM, SSLConfigs.SSL_ENABLED_PROTOCOLS_DOC) - .define(SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SSLConfigs.DEFAULT_SSL_KEYSTORE_TYPE, ConfigDef.Importance.MEDIUM, SSLConfigs.SSL_KEYSTORE_TYPE_DOC) - .define(SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SSLConfigs.SSL_KEYSTORE_LOCATION_DOC, false) - .define(SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SSLConfigs.SSL_KEYSTORE_PASSWORD_DOC, false) - .define(SSLConfigs.SSL_KEY_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SSLConfigs.SSL_KEY_PASSWORD_DOC, false) - .define(SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, ConfigDef.Importance.MEDIUM, SSLConfigs.SSL_TRUSTSTORE_TYPE_DOC) - .define(SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false) - .define(SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SSLConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false) - .define(SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, ConfigDef.Importance.LOW, SSLConfigs.SSL_KEYMANAGER_ALGORITHM_DOC) - .define(SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, ConfigDef.Importance.LOW, SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC) - .define(SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.LOW, SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false) + .define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, ConfigDef.Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, ConfigDef.Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC) + .define(SslConfigs.SSL_PROTOCOL_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC) + .define(SslConfigs.SSL_PROVIDER_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC, false) + .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC, false) + .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, ConfigDef.Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC) + .define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC) + .define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC, false) + .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC, false) + .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC, false) + .define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC) + .define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false) + .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false) + .define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC) + .define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC) + .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false) .define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false) .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC) .define(SaslConfigs.SASL_KAFKA_SERVER_REALM, ConfigDef.Type.STRING, ConfigDef.Importance.LOW, SaslConfigs.SASL_KAFKA_SERVER_DOC, false) diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala index aa15612bd5d..43ae38eec10 100644 --- a/core/src/main/scala/kafka/api/FetchResponse.scala +++ b/core/src/main/scala/kafka/api/FetchResponse.scala @@ -24,7 +24,7 @@ import kafka.common.{TopicAndPartition, ErrorMapping} import kafka.message.{MessageSet, ByteBufferMessageSet} import kafka.api.ApiUtils._ import org.apache.kafka.common.KafkaException -import org.apache.kafka.common.network.{SSLTransportLayer, TransportLayer, Send, MultiSend} +import org.apache.kafka.common.network.{Send, MultiSend} import scala.collection._ diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala index 79e16c167f6..42b76cd25dc 100755 --- a/core/src/main/scala/kafka/cluster/Broker.scala +++ b/core/src/main/scala/kafka/cluster/Broker.scala @@ -17,9 +17,6 @@ package kafka.cluster -import kafka.utils.CoreUtils._ -import kafka.utils.Json -import kafka.api.ApiUtils._ import java.nio.ByteBuffer import kafka.common.{BrokerEndPointNotAvailableException, BrokerNotAvailableException, KafkaException} @@ -29,8 +26,7 @@ import org.apache.kafka.common.protocol.SecurityProtocol /** * A Kafka broker. * A broker has an id and a collection of end-points. - * Each end-point is (host, port,protocolType). - * Currently the only protocol type is PlainText but we will add SSL and Kerberos in the future. + * Each end-point is (host, port, protocolType). */ object Broker { diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index d52b5c0bdc1..7aba1c92967 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -28,7 +28,7 @@ import kafka.utils.CoreUtils import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.common.config.SaslConfigs -import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, SSLConfigs} +import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, SslConfigs} import org.apache.kafka.common.metrics.MetricsReporter import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.security.auth.PrincipalBuilder @@ -156,17 +156,17 @@ object Defaults { val MetricReporterClasses = "" /** ********* SSL configuration ***********/ - val PrincipalBuilderClass = SSLConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS - val SSLProtocol = SSLConfigs.DEFAULT_SSL_PROTOCOL - val SSLEnabledProtocols = SSLConfigs.DEFAULT_ENABLED_PROTOCOLS - val SSLKeystoreType = SSLConfigs.DEFAULT_SSL_KEYSTORE_TYPE - val SSLTruststoreType = SSLConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE - val SSLKeyManagerAlgorithm = SSLConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM - val SSLTrustManagerAlgorithm = SSLConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM - val SSLClientAuthRequired = "required" - val SSLClientAuthRequested = "requested" - val SSLClientAuthNone = "none" - val SSLClientAuth = SSLClientAuthNone + val PrincipalBuilderClass = SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS + val SslProtocol = SslConfigs.DEFAULT_SSL_PROTOCOL + val SslEnabledProtocols = SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS + val SslKeystoreType = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE + val SslTruststoreType = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE + val SslKeyManagerAlgorithm = SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM + val SslTrustManagerAlgorithm = SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM + val SslClientAuthRequired = "required" + val SslClientAuthRequested = "requested" + val SslClientAuthNone = "none" + val SslClientAuth = SslClientAuthNone /** ********* Sasl configuration ***********/ val SaslKerberosKinitCmd = SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD @@ -305,22 +305,22 @@ object KafkaConfig { val MetricReporterClassesProp: String = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG /** ********* SSL Configuration ****************/ - val PrincipalBuilderClassProp = SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG - val SSLProtocolProp = SSLConfigs.SSL_PROTOCOL_CONFIG - val SSLProviderProp = SSLConfigs.SSL_PROVIDER_CONFIG - val SSLCipherSuitesProp = SSLConfigs.SSL_CIPHER_SUITES_CONFIG - val SSLEnabledProtocolsProp = SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG - val SSLKeystoreTypeProp = SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG - val SSLKeystoreLocationProp = SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG - val SSLKeystorePasswordProp = SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG - val SSLKeyPasswordProp = SSLConfigs.SSL_KEY_PASSWORD_CONFIG - val SSLTruststoreTypeProp = SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG - val SSLTruststoreLocationProp = SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG - val SSLTruststorePasswordProp = SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG - val SSLKeyManagerAlgorithmProp = SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG - val SSLTrustManagerAlgorithmProp = SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG - val SSLEndpointIdentificationAlgorithmProp = SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG - val SSLClientAuthProp = SSLConfigs.SSL_CLIENT_AUTH_CONFIG + val PrincipalBuilderClassProp = SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG + val SslProtocolProp = SslConfigs.SSL_PROTOCOL_CONFIG + val SslProviderProp = SslConfigs.SSL_PROVIDER_CONFIG + val SslCipherSuitesProp = SslConfigs.SSL_CIPHER_SUITES_CONFIG + val SslEnabledProtocolsProp = SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG + val SslKeystoreTypeProp = SslConfigs.SSL_KEYSTORE_TYPE_CONFIG + val SslKeystoreLocationProp = SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG + val SslKeystorePasswordProp = SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG + val SslKeyPasswordProp = SslConfigs.SSL_KEY_PASSWORD_CONFIG + val SslTruststoreTypeProp = SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG + val SslTruststoreLocationProp = SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG + val SslTruststorePasswordProp = SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG + val SslKeyManagerAlgorithmProp = SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG + val SslTrustManagerAlgorithmProp = SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG + val SslEndpointIdentificationAlgorithmProp = SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG + val SslClientAuthProp = SslConfigs.SSL_CLIENT_AUTH_CONFIG /** ********* SASL Configuration ****************/ val SaslKerberosServiceNameProp = SaslConfigs.SASL_KERBEROS_SERVICE_NAME @@ -480,22 +480,22 @@ object KafkaConfig { val MetricReporterClassesDoc = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC /** ********* SSL Configuration ****************/ - val PrincipalBuilderClassDoc = SSLConfigs.PRINCIPAL_BUILDER_CLASS_DOC - val SSLProtocolDoc = SSLConfigs.SSL_PROTOCOL_DOC - val SSLProviderDoc = SSLConfigs.SSL_PROVIDER_DOC - val SSLCipherSuitesDoc = SSLConfigs.SSL_CIPHER_SUITES_DOC - val SSLEnabledProtocolsDoc = SSLConfigs.SSL_ENABLED_PROTOCOLS_DOC - val SSLKeystoreTypeDoc = SSLConfigs.SSL_KEYSTORE_TYPE_DOC - val SSLKeystoreLocationDoc = SSLConfigs.SSL_KEYSTORE_LOCATION_DOC - val SSLKeystorePasswordDoc = SSLConfigs.SSL_KEYSTORE_PASSWORD_DOC - val SSLKeyPasswordDoc = SSLConfigs.SSL_KEY_PASSWORD_DOC - val SSLTruststoreTypeDoc = SSLConfigs.SSL_TRUSTSTORE_TYPE_DOC - val SSLTruststorePasswordDoc = SSLConfigs.SSL_TRUSTSTORE_PASSWORD_DOC - val SSLTruststoreLocationDoc = SSLConfigs.SSL_TRUSTSTORE_LOCATION_DOC - val SSLKeyManagerAlgorithmDoc = SSLConfigs.SSL_KEYMANAGER_ALGORITHM_DOC - val SSLTrustManagerAlgorithmDoc = SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC - val SSLEndpointIdentificationAlgorithmDoc = SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC - val SSLClientAuthDoc = SSLConfigs.SSL_CLIENT_AUTH_DOC + val PrincipalBuilderClassDoc = SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC + val SslProtocolDoc = SslConfigs.SSL_PROTOCOL_DOC + val SslProviderDoc = SslConfigs.SSL_PROVIDER_DOC + val SslCipherSuitesDoc = SslConfigs.SSL_CIPHER_SUITES_DOC + val SslEnabledProtocolsDoc = SslConfigs.SSL_ENABLED_PROTOCOLS_DOC + val SslKeystoreTypeDoc = SslConfigs.SSL_KEYSTORE_TYPE_DOC + val SslKeystoreLocationDoc = SslConfigs.SSL_KEYSTORE_LOCATION_DOC + val SslKeystorePasswordDoc = SslConfigs.SSL_KEYSTORE_PASSWORD_DOC + val SslKeyPasswordDoc = SslConfigs.SSL_KEY_PASSWORD_DOC + val SslTruststoreTypeDoc = SslConfigs.SSL_TRUSTSTORE_TYPE_DOC + val SslTruststorePasswordDoc = SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC + val SslTruststoreLocationDoc = SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC + val SslKeyManagerAlgorithmDoc = SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC + val SslTrustManagerAlgorithmDoc = SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC + val SslEndpointIdentificationAlgorithmDoc = SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC + val SslClientAuthDoc = SslConfigs.SSL_CLIENT_AUTH_DOC /** ********* Sasl Configuration ****************/ val SaslKerberosServiceNameDoc = SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC @@ -645,21 +645,21 @@ object KafkaConfig { /** ********* SSL Configuration ****************/ .define(PrincipalBuilderClassProp, CLASS, Defaults.PrincipalBuilderClass, MEDIUM, PrincipalBuilderClassDoc) - .define(SSLProtocolProp, STRING, Defaults.SSLProtocol, MEDIUM, SSLProtocolDoc) - .define(SSLProviderProp, STRING, MEDIUM, SSLProviderDoc, false) - .define(SSLEnabledProtocolsProp, LIST, Defaults.SSLEnabledProtocols, MEDIUM, SSLEnabledProtocolsDoc) - .define(SSLKeystoreTypeProp, STRING, Defaults.SSLKeystoreType, MEDIUM, SSLKeystoreTypeDoc) - .define(SSLKeystoreLocationProp, STRING, MEDIUM, SSLKeystoreLocationDoc, false) - .define(SSLKeystorePasswordProp, STRING, MEDIUM, SSLKeystorePasswordDoc, false) - .define(SSLKeyPasswordProp, STRING, MEDIUM, SSLKeyPasswordDoc, false) - .define(SSLTruststoreTypeProp, STRING, Defaults.SSLTruststoreType, MEDIUM, SSLTruststoreTypeDoc) - .define(SSLTruststoreLocationProp, STRING, MEDIUM, SSLTruststoreLocationDoc, false) - .define(SSLTruststorePasswordProp, STRING, MEDIUM, SSLTruststorePasswordDoc, false) - .define(SSLKeyManagerAlgorithmProp, STRING, Defaults.SSLKeyManagerAlgorithm, MEDIUM, SSLKeyManagerAlgorithmDoc) - .define(SSLTrustManagerAlgorithmProp, STRING, Defaults.SSLTrustManagerAlgorithm, MEDIUM, SSLTrustManagerAlgorithmDoc) - .define(SSLEndpointIdentificationAlgorithmProp, STRING, LOW, SSLEndpointIdentificationAlgorithmDoc, false) - .define(SSLClientAuthProp, STRING, Defaults.SSLClientAuth, in(Defaults.SSLClientAuthRequired, Defaults.SSLClientAuthRequested, Defaults.SSLClientAuthNone), MEDIUM, SSLClientAuthDoc) - .define(SSLCipherSuitesProp, LIST, MEDIUM, SSLCipherSuitesDoc, false) + .define(SslProtocolProp, STRING, Defaults.SslProtocol, MEDIUM, SslProtocolDoc) + .define(SslProviderProp, STRING, MEDIUM, SslProviderDoc, false) + .define(SslEnabledProtocolsProp, LIST, Defaults.SslEnabledProtocols, MEDIUM, SslEnabledProtocolsDoc) + .define(SslKeystoreTypeProp, STRING, Defaults.SslKeystoreType, MEDIUM, SslKeystoreTypeDoc) + .define(SslKeystoreLocationProp, STRING, MEDIUM, SslKeystoreLocationDoc, false) + .define(SslKeystorePasswordProp, STRING, MEDIUM, SslKeystorePasswordDoc, false) + .define(SslKeyPasswordProp, STRING, MEDIUM, SslKeyPasswordDoc, false) + .define(SslTruststoreTypeProp, STRING, Defaults.SslTruststoreType, MEDIUM, SslTruststoreTypeDoc) + .define(SslTruststoreLocationProp, STRING, MEDIUM, SslTruststoreLocationDoc, false) + .define(SslTruststorePasswordProp, STRING, MEDIUM, SslTruststorePasswordDoc, false) + .define(SslKeyManagerAlgorithmProp, STRING, Defaults.SslKeyManagerAlgorithm, MEDIUM, SslKeyManagerAlgorithmDoc) + .define(SslTrustManagerAlgorithmProp, STRING, Defaults.SslTrustManagerAlgorithm, MEDIUM, SslTrustManagerAlgorithmDoc) + .define(SslEndpointIdentificationAlgorithmProp, STRING, LOW, SslEndpointIdentificationAlgorithmDoc, false) + .define(SslClientAuthProp, STRING, Defaults.SslClientAuth, in(Defaults.SslClientAuthRequired, Defaults.SslClientAuthRequested, Defaults.SslClientAuthNone), MEDIUM, SslClientAuthDoc) + .define(SslCipherSuitesProp, LIST, MEDIUM, SslCipherSuitesDoc, false) /** ********* Sasl Configuration ****************/ .define(SaslKerberosServiceNameProp, STRING, MEDIUM, SaslKerberosServiceNameDoc, false) @@ -815,20 +815,20 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka /** ********* SSL Configuration **************/ val principalBuilderClass = getClass(KafkaConfig.PrincipalBuilderClassProp) - val sslProtocol = getString(KafkaConfig.SSLProtocolProp) - val sslProvider = getString(KafkaConfig.SSLProviderProp) - val sslEnabledProtocols = getList(KafkaConfig.SSLEnabledProtocolsProp) - val sslKeystoreType = getString(KafkaConfig.SSLKeystoreTypeProp) - val sslKeystoreLocation = getString(KafkaConfig.SSLKeystoreLocationProp) - val sslKeystorePassword = getString(KafkaConfig.SSLKeystorePasswordProp) - val sslKeyPassword = getString(KafkaConfig.SSLKeyPasswordProp) - val sslTruststoreType = getString(KafkaConfig.SSLTruststoreTypeProp) - val sslTruststoreLocation = getString(KafkaConfig.SSLTruststoreLocationProp) - val sslTruststorePassword = getString(KafkaConfig.SSLTruststorePasswordProp) - val sslKeyManagerAlgorithm = getString(KafkaConfig.SSLKeyManagerAlgorithmProp) - val sslTrustManagerAlgorithm = getString(KafkaConfig.SSLTrustManagerAlgorithmProp) - val sslClientAuth = getString(KafkaConfig.SSLClientAuthProp) - val sslCipher = getList(KafkaConfig.SSLCipherSuitesProp) + val sslProtocol = getString(KafkaConfig.SslProtocolProp) + val sslProvider = getString(KafkaConfig.SslProviderProp) + val sslEnabledProtocols = getList(KafkaConfig.SslEnabledProtocolsProp) + val sslKeystoreType = getString(KafkaConfig.SslKeystoreTypeProp) + val sslKeystoreLocation = getString(KafkaConfig.SslKeystoreLocationProp) + val sslKeystorePassword = getString(KafkaConfig.SslKeystorePasswordProp) + val sslKeyPassword = getString(KafkaConfig.SslKeyPasswordProp) + val sslTruststoreType = getString(KafkaConfig.SslTruststoreTypeProp) + val sslTruststoreLocation = getString(KafkaConfig.SslTruststoreLocationProp) + val sslTruststorePassword = getString(KafkaConfig.SslTruststorePasswordProp) + val sslKeyManagerAlgorithm = getString(KafkaConfig.SslKeyManagerAlgorithmProp) + val sslTrustManagerAlgorithm = getString(KafkaConfig.SslTrustManagerAlgorithmProp) + val sslClientAuth = getString(KafkaConfig.SslClientAuthProp) + val sslCipher = getList(KafkaConfig.SslCipherSuitesProp) /** ********* Sasl Configuration **************/ val saslKerberosServiceName = getString(KafkaConfig.SaslKerberosServiceNameProp) diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 06be5c233d6..62c02dd720b 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -30,10 +30,10 @@ import org.apache.kafka.clients.{ManualMetadataUpdater, NetworkClient, ClientReq import org.apache.kafka.common.network.{LoginType, Selectable, ChannelBuilders, NetworkReceive, Selector, Mode} import org.apache.kafka.common.requests.{ListOffsetResponse, FetchResponse, RequestSend, AbstractRequest, ListOffsetRequest} import org.apache.kafka.common.requests.{FetchRequest => JFetchRequest} +import org.apache.kafka.common.security.ssl.SslFactory import org.apache.kafka.common.{Node, TopicPartition} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.{Errors, ApiKeys} -import org.apache.kafka.common.security.ssl.SSLFactory import org.apache.kafka.common.utils.Time import scala.collection.{JavaConverters, Map, mutable} diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index b0cb97ee7c6..01f198e212b 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -219,7 +219,7 @@ class SocketServerTest extends JUnitSuite { } @Test - def testSSLSocketServer(): Unit = { + def testSslSocketServer(): Unit = { val trustStoreFile = File.createTempFile("truststore", ".jks") val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, enableSsl = true, trustStoreFile = Some(trustStoreFile)) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 4059dc29ece..d5ab2627edc 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -491,21 +491,21 @@ class KafkaConfigTest { //SSL Configs case KafkaConfig.PrincipalBuilderClassProp => - case KafkaConfig.SSLProtocolProp => // ignore string - case KafkaConfig.SSLProviderProp => // ignore string - case KafkaConfig.SSLEnabledProtocolsProp => - case KafkaConfig.SSLKeystoreTypeProp => // ignore string - case KafkaConfig.SSLKeystoreLocationProp => // ignore string - case KafkaConfig.SSLKeystorePasswordProp => // ignore string - case KafkaConfig.SSLKeyPasswordProp => // ignore string - case KafkaConfig.SSLTruststoreTypeProp => // ignore string - case KafkaConfig.SSLTruststorePasswordProp => // ignore string - case KafkaConfig.SSLTruststoreLocationProp => // ignore string - case KafkaConfig.SSLKeyManagerAlgorithmProp => - case KafkaConfig.SSLTrustManagerAlgorithmProp => - case KafkaConfig.SSLClientAuthProp => // ignore string - case KafkaConfig.SSLEndpointIdentificationAlgorithmProp => // ignore string - case KafkaConfig.SSLCipherSuitesProp => // ignore string + case KafkaConfig.SslProtocolProp => // ignore string + case KafkaConfig.SslProviderProp => // ignore string + case KafkaConfig.SslEnabledProtocolsProp => + case KafkaConfig.SslKeystoreTypeProp => // ignore string + case KafkaConfig.SslKeystoreLocationProp => // ignore string + case KafkaConfig.SslKeystorePasswordProp => // ignore string + case KafkaConfig.SslKeyPasswordProp => // ignore string + case KafkaConfig.SslTruststoreTypeProp => // ignore string + case KafkaConfig.SslTruststorePasswordProp => // ignore string + case KafkaConfig.SslTruststoreLocationProp => // ignore string + case KafkaConfig.SslKeyManagerAlgorithmProp => + case KafkaConfig.SslTrustManagerAlgorithmProp => + case KafkaConfig.SslClientAuthProp => // ignore string + case KafkaConfig.SslEndpointIdentificationAlgorithmProp => // ignore string + case KafkaConfig.SslCipherSuitesProp => // ignore string //Sasl Configs case KafkaConfig.SaslKerberosServiceNameProp => // ignore string diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index ca17c6b0fa2..5ad548d633a 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -28,7 +28,9 @@ import charset.Charset import kafka.security.auth.{Resource, Authorizer, Acl} import org.apache.kafka.common.protocol.SecurityProtocol +import org.apache.kafka.common.security.ssl.SslFactory import org.apache.kafka.common.utils.Utils._ +import org.apache.kafka.test.TestSslUtils import scala.collection.mutable.{ArrayBuffer, ListBuffer} @@ -52,8 +54,6 @@ import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.consumer.{RangeAssignor, KafkaConsumer} import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.common.network.Mode -import org.apache.kafka.common.security.ssl.SSLFactory -import org.apache.kafka.test.TestSSLUtils import scala.collection.Map import scala.collection.JavaConversions._ @@ -964,9 +964,9 @@ object TestUtils extends Logging { val sslConfigs = { if (mode == Mode.SERVER) - TestSSLUtils.createSSLConfig(true, true, mode, trustStore, certAlias) + TestSslUtils.createSslConfig(true, true, mode, trustStore, certAlias) else - TestSSLUtils.createSSLConfig(clientCert, false, mode, trustStore, certAlias) + TestSslUtils.createSslConfig(clientCert, false, mode, trustStore, certAlias) } val sslProps = new Properties()