From 52d5e88393630ce6f817bd003c7c787e36e31277 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 17 Nov 2015 08:36:43 -0800 Subject: [PATCH] KAFKA-2847; Remove principal builder class from client configs Also mark `PrincipalBuilder` as `Unstable` and tweak docs. Author: Ismael Juma Reviewers: Jun Rao Closes #542 from ijuma/kafka-2847-remove-principal-builder-class-from-client-configs --- checkstyle/import-control.xml | 1 + .../kafka/common/config/SslConfigs.java | 7 ++++--- .../kafka/common/network/ChannelBuilders.java | 19 +++++++++++++++++++ .../network/PlaintextChannelBuilder.java | 5 +---- .../common/network/SaslChannelBuilder.java | 10 ++-------- .../common/network/SslChannelBuilder.java | 5 +---- .../security/auth/PrincipalBuilder.java | 2 ++ .../clients/producer/KafkaProducerTest.java | 4 +--- .../kafka/common/network/SelectorTest.java | 4 +--- .../kafka/common/network/SslSelectorTest.java | 1 - .../common/network/SslTransportLayerTest.java | 5 +++-- 11 files changed, 35 insertions(+), 28 deletions(-) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 204bc60bfa5..e221dce633b 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -66,6 +66,7 @@ + diff --git a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java index ae4667a8a4d..a893b754cce 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java +++ b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java @@ -22,7 +22,9 @@ public class SslConfigs { */ public static final String PRINCIPAL_BUILDER_CLASS_CONFIG = "principal.builder.class"; - public static final String PRINCIPAL_BUILDER_CLASS_DOC = "principal builder to generate a java Principal. This config is optional for client."; + public static final String PRINCIPAL_BUILDER_CLASS_DOC = "The fully qualified name of a class that implements the PrincipalBuilder interface, " + + "which is currently used to build the Principal for connections with the SSL SecurityProtocol. " + + "Default is DefaultPrincipalBuilder."; public static final String DEFAULT_PRINCIPAL_BUILDER_CLASS = "org.apache.kafka.common.security.auth.DefaultPrincipalBuilder"; public static final String SSL_PROTOCOL_CONFIG = "ssl.protocol"; @@ -97,8 +99,7 @@ public class SslConfigs { + "
  • ssl.client.auth=none This means client authentication is not needed."; public static void addClientSslSupport(ConfigDef config) { - config.define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, ConfigDef.Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, ConfigDef.Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC) - .define(SslConfigs.SSL_PROTOCOL_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC) + config.define(SslConfigs.SSL_PROTOCOL_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC) .define(SslConfigs.SSL_PROVIDER_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC) .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, ConfigDef.Type.LIST, null, ConfigDef.Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC) .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, ConfigDef.Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC) diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java index 03c663d0f10..669f269f91c 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java @@ -13,7 +13,11 @@ package org.apache.kafka.common.network; +import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.protocol.SecurityProtocol; +import org.apache.kafka.common.security.auth.DefaultPrincipalBuilder; +import org.apache.kafka.common.security.auth.PrincipalBuilder; +import org.apache.kafka.common.utils.Utils; import java.util.Map; @@ -57,6 +61,21 @@ public class ChannelBuilders { return channelBuilder; } + /** + * Returns a configured `PrincipalBuilder`. + */ + static PrincipalBuilder createPrincipalBuilder(Map configs) { + // this is a server-only config so it will always be null on the client + Class principalBuilderClass = (Class) configs.get(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG); + PrincipalBuilder principalBuilder; + if (principalBuilderClass == null) + principalBuilder = new DefaultPrincipalBuilder(); + else + principalBuilder = (PrincipalBuilder) Utils.newInstance(principalBuilderClass); + principalBuilder.configure(configs); + return principalBuilder; + } + private static void requireNonNullMode(Mode mode, SecurityProtocol securityProtocol) { if (mode == null) throw new IllegalArgumentException("`mode` must be non-null if `securityProtocol` is `" + securityProtocol + "`"); diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java index bc1536aa98b..f0af9351ae4 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java @@ -15,9 +15,7 @@ package org.apache.kafka.common.network; import java.nio.channels.SelectionKey; import java.util.Map; -import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.security.auth.PrincipalBuilder; -import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.KafkaException; import org.slf4j.Logger; @@ -31,8 +29,7 @@ public class PlaintextChannelBuilder implements ChannelBuilder { public void configure(Map configs) throws KafkaException { try { this.configs = configs; - this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class) configs.get(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG)); - this.principalBuilder.configure(this.configs); + principalBuilder = ChannelBuilders.createPrincipalBuilder(configs); } catch (Exception e) { throw new KafkaException(e); } diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java index 75e3fcad4f3..86ac779a4d6 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java @@ -20,15 +20,12 @@ import java.util.Map; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.security.JaasUtils; -import org.apache.kafka.common.security.auth.PrincipalBuilder; import org.apache.kafka.common.security.kerberos.KerberosShortNamer; import org.apache.kafka.common.security.kerberos.LoginManager; import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator; import org.apache.kafka.common.security.authenticator.SaslServerAuthenticator; import org.apache.kafka.common.security.ssl.SslFactory; import org.apache.kafka.common.protocol.SecurityProtocol; -import org.apache.kafka.common.config.SslConfigs; -import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.KafkaException; import org.slf4j.Logger; @@ -42,7 +39,6 @@ public class SaslChannelBuilder implements ChannelBuilder { private final LoginType loginType; private LoginManager loginManager; - private PrincipalBuilder principalBuilder; private SslFactory sslFactory; private Map configs; private KerberosShortNamer kerberosShortNamer; @@ -57,8 +53,6 @@ public class SaslChannelBuilder implements ChannelBuilder { try { this.configs = configs; this.loginManager = LoginManager.acquireLoginManager(loginType, configs); - this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class) configs.get(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG)); - this.principalBuilder.configure(configs); String defaultRealm; try { @@ -90,7 +84,8 @@ public class SaslChannelBuilder implements ChannelBuilder { else authenticator = new SaslClientAuthenticator(id, loginManager.subject(), loginManager.serviceName(), socketChannel.socket().getInetAddress().getHostName()); - authenticator.configure(transportLayer, this.principalBuilder, this.configs); + // Both authenticators don't use `PrincipalBuilder`, so we pass `null` for now. Reconsider if this changes. + authenticator.configure(transportLayer, null, this.configs); return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize); } catch (Exception e) { log.info("Failed to create channel due to ", e); @@ -99,7 +94,6 @@ public class SaslChannelBuilder implements ChannelBuilder { } public void close() { - this.principalBuilder.close(); this.loginManager.release(); } diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java index 9a7ba0c42ea..b546174fdf1 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java @@ -19,8 +19,6 @@ import java.util.Map; import org.apache.kafka.common.security.auth.PrincipalBuilder; import org.apache.kafka.common.security.ssl.SslFactory; -import org.apache.kafka.common.config.SslConfigs; -import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.KafkaException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,8 +39,7 @@ public class SslChannelBuilder implements ChannelBuilder { this.configs = configs; this.sslFactory = new SslFactory(mode); this.sslFactory.configure(this.configs); - this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class) configs.get(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG)); - this.principalBuilder.configure(this.configs); + this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs); } catch (Exception e) { throw new KafkaException(e); } diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java b/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java index 99b6d21aff3..75e18555e33 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.security.auth; +import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.network.TransportLayer; import org.apache.kafka.common.network.Authenticator; import org.apache.kafka.common.KafkaException; @@ -28,6 +29,7 @@ import java.security.Principal; /* * PrincipalBuilder for Authenticator */ +@InterfaceStability.Unstable public interface PrincipalBuilder extends Configurable { /** diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index b96a5f76800..11302254da7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -18,7 +18,6 @@ package org.apache.kafka.clients.producer; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.test.MockMetricsReporter; import org.apache.kafka.test.MockSerializer; @@ -54,12 +53,11 @@ public class KafkaProducerTest { @Test public void testSerializerClose() throws Exception { - Map configs = new HashMap(); + Map configs = new HashMap<>(); configs.put(ProducerConfig.CLIENT_ID_CONFIG, "testConstructorClose"); configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); configs.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL); - configs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS)); final int oldInitCount = MockSerializer.INIT_COUNT.get(); final int oldCloseCount = MockSerializer.CLOSE_COUNT.get(); diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index 8ce02988c9e..18fd080a2ef 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -24,7 +24,6 @@ import java.nio.ByteBuffer; import java.util.*; import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -48,8 +47,7 @@ public class SelectorTest { @Before public void setup() throws Exception { - Map configs = new HashMap(); - configs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS)); + Map configs = new HashMap<>(); this.server = new EchoServer(configs); this.server.start(); this.time = new MockTime(); diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java index 94c5654e2e8..a442ea00049 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java @@ -52,7 +52,6 @@ public class SslSelectorTest extends SelectorTest { this.server.start(); this.time = new MockTime(); sslClientConfigs = TestSslUtils.createSslConfig(false, false, Mode.SERVER, trustStoreFile, "client"); - sslClientConfigs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS)); this.channelBuilder = new SslChannelBuilder(Mode.CLIENT); this.channelBuilder.configure(sslClientConfigs); diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java index 282ff8b9cf7..2b5d26b50bd 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java @@ -464,11 +464,12 @@ public class SslTransportLayerTest { Mode mode = server ? Mode.SERVER : Mode.CLIENT; File truststoreFile = File.createTempFile(name + "TS", ".jks"); sslConfig = TestSslUtils.createSslConfig(!server, true, mode, truststoreFile, name); - sslConfig.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS)); + if (server) + sslConfig.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS)); } private Map getTrustingConfig(CertStores truststoreConfig) { - Map config = new HashMap(sslConfig); + Map config = new HashMap<>(sslConfig); config.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)); config.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)); config.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG));