KAFKA-2847; Remove principal builder class from client configs

Also mark `PrincipalBuilder` as `Unstable` and  tweak docs.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #542 from ijuma/kafka-2847-remove-principal-builder-class-from-client-configs
This commit is contained in:
Ismael Juma 2015-11-17 08:36:43 -08:00 committed by Jun Rao
parent f1169f1da8
commit 52d5e88393
11 changed files with 35 additions and 28 deletions

View File

@ -66,6 +66,7 @@
</subpackage>
<subpackage name="security">
<allow pkg="org.apache.kafka.common.annotation" />
<allow pkg="org.apache.kafka.common.network" />
<allow pkg="org.apache.kafka.common.config" />
</subpackage>

View File

@ -22,7 +22,9 @@ public class SslConfigs {
*/
public static final String PRINCIPAL_BUILDER_CLASS_CONFIG = "principal.builder.class";
public static final String PRINCIPAL_BUILDER_CLASS_DOC = "principal builder to generate a java Principal. This config is optional for client.";
public static final String PRINCIPAL_BUILDER_CLASS_DOC = "The fully qualified name of a class that implements the PrincipalBuilder interface, " +
"which is currently used to build the Principal for connections with the SSL SecurityProtocol. " +
"Default is DefaultPrincipalBuilder.";
public static final String DEFAULT_PRINCIPAL_BUILDER_CLASS = "org.apache.kafka.common.security.auth.DefaultPrincipalBuilder";
public static final String SSL_PROTOCOL_CONFIG = "ssl.protocol";
@ -97,8 +99,7 @@ public class SslConfigs {
+ " <li><code>ssl.client.auth=none</code> This means client authentication is not needed.";
public static void addClientSslSupport(ConfigDef config) {
config.define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, ConfigDef.Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, ConfigDef.Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
.define(SslConfigs.SSL_PROTOCOL_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC)
config.define(SslConfigs.SSL_PROTOCOL_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC)
.define(SslConfigs.SSL_PROVIDER_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC)
.define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, ConfigDef.Type.LIST, null, ConfigDef.Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC)
.define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, ConfigDef.Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)

View File

@ -13,7 +13,11 @@
package org.apache.kafka.common.network;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.security.auth.DefaultPrincipalBuilder;
import org.apache.kafka.common.security.auth.PrincipalBuilder;
import org.apache.kafka.common.utils.Utils;
import java.util.Map;
@ -57,6 +61,21 @@ public class ChannelBuilders {
return channelBuilder;
}
/**
* Returns a configured `PrincipalBuilder`.
*/
static PrincipalBuilder createPrincipalBuilder(Map<String, ?> configs) {
// this is a server-only config so it will always be null on the client
Class<?> principalBuilderClass = (Class<?>) configs.get(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG);
PrincipalBuilder principalBuilder;
if (principalBuilderClass == null)
principalBuilder = new DefaultPrincipalBuilder();
else
principalBuilder = (PrincipalBuilder) Utils.newInstance(principalBuilderClass);
principalBuilder.configure(configs);
return principalBuilder;
}
private static void requireNonNullMode(Mode mode, SecurityProtocol securityProtocol) {
if (mode == null)
throw new IllegalArgumentException("`mode` must be non-null if `securityProtocol` is `" + securityProtocol + "`");

View File

@ -15,9 +15,7 @@ package org.apache.kafka.common.network;
import java.nio.channels.SelectionKey;
import java.util.Map;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.PrincipalBuilder;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.KafkaException;
import org.slf4j.Logger;
@ -31,8 +29,7 @@ public class PlaintextChannelBuilder implements ChannelBuilder {
public void configure(Map<String, ?> configs) throws KafkaException {
try {
this.configs = configs;
this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class<?>) configs.get(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG));
this.principalBuilder.configure(this.configs);
principalBuilder = ChannelBuilders.createPrincipalBuilder(configs);
} catch (Exception e) {
throw new KafkaException(e);
}

View File

@ -20,15 +20,12 @@ import java.util.Map;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.security.auth.PrincipalBuilder;
import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
import org.apache.kafka.common.security.kerberos.LoginManager;
import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
import org.apache.kafka.common.security.authenticator.SaslServerAuthenticator;
import org.apache.kafka.common.security.ssl.SslFactory;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.KafkaException;
import org.slf4j.Logger;
@ -42,7 +39,6 @@ public class SaslChannelBuilder implements ChannelBuilder {
private final LoginType loginType;
private LoginManager loginManager;
private PrincipalBuilder principalBuilder;
private SslFactory sslFactory;
private Map<String, ?> configs;
private KerberosShortNamer kerberosShortNamer;
@ -57,8 +53,6 @@ public class SaslChannelBuilder implements ChannelBuilder {
try {
this.configs = configs;
this.loginManager = LoginManager.acquireLoginManager(loginType, configs);
this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class<?>) configs.get(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG));
this.principalBuilder.configure(configs);
String defaultRealm;
try {
@ -90,7 +84,8 @@ public class SaslChannelBuilder implements ChannelBuilder {
else
authenticator = new SaslClientAuthenticator(id, loginManager.subject(), loginManager.serviceName(),
socketChannel.socket().getInetAddress().getHostName());
authenticator.configure(transportLayer, this.principalBuilder, this.configs);
// Both authenticators don't use `PrincipalBuilder`, so we pass `null` for now. Reconsider if this changes.
authenticator.configure(transportLayer, null, this.configs);
return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
} catch (Exception e) {
log.info("Failed to create channel due to ", e);
@ -99,7 +94,6 @@ public class SaslChannelBuilder implements ChannelBuilder {
}
public void close() {
this.principalBuilder.close();
this.loginManager.release();
}

View File

@ -19,8 +19,6 @@ import java.util.Map;
import org.apache.kafka.common.security.auth.PrincipalBuilder;
import org.apache.kafka.common.security.ssl.SslFactory;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.KafkaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -41,8 +39,7 @@ public class SslChannelBuilder implements ChannelBuilder {
this.configs = configs;
this.sslFactory = new SslFactory(mode);
this.sslFactory.configure(this.configs);
this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class<?>) configs.get(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG));
this.principalBuilder.configure(this.configs);
this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs);
} catch (Exception e) {
throw new KafkaException(e);
}

View File

@ -17,6 +17,7 @@
package org.apache.kafka.common.security.auth;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.network.Authenticator;
import org.apache.kafka.common.KafkaException;
@ -28,6 +29,7 @@ import java.security.Principal;
/*
* PrincipalBuilder for Authenticator
*/
@InterfaceStability.Unstable
public interface PrincipalBuilder extends Configurable {
/**

View File

@ -18,7 +18,6 @@ package org.apache.kafka.clients.producer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.MockSerializer;
@ -54,12 +53,11 @@ public class KafkaProducerTest {
@Test
public void testSerializerClose() throws Exception {
Map<String, Object> configs = new HashMap<String, Object>();
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "testConstructorClose");
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
configs.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL);
configs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
final int oldInitCount = MockSerializer.INIT_COUNT.get();
final int oldCloseCount = MockSerializer.CLOSE_COUNT.get();

View File

@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
import java.util.*;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@ -48,8 +47,7 @@ public class SelectorTest {
@Before
public void setup() throws Exception {
Map<String, Object> configs = new HashMap<String, Object>();
configs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
Map<String, Object> configs = new HashMap<>();
this.server = new EchoServer(configs);
this.server.start();
this.time = new MockTime();

View File

@ -52,7 +52,6 @@ public class SslSelectorTest extends SelectorTest {
this.server.start();
this.time = new MockTime();
sslClientConfigs = TestSslUtils.createSslConfig(false, false, Mode.SERVER, trustStoreFile, "client");
sslClientConfigs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
this.channelBuilder = new SslChannelBuilder(Mode.CLIENT);
this.channelBuilder.configure(sslClientConfigs);

View File

@ -464,11 +464,12 @@ public class SslTransportLayerTest {
Mode mode = server ? Mode.SERVER : Mode.CLIENT;
File truststoreFile = File.createTempFile(name + "TS", ".jks");
sslConfig = TestSslUtils.createSslConfig(!server, true, mode, truststoreFile, name);
sslConfig.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
if (server)
sslConfig.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS));
}
private Map<String, Object> getTrustingConfig(CertStores truststoreConfig) {
Map<String, Object> config = new HashMap<String, Object>(sslConfig);
Map<String, Object> config = new HashMap<>(sslConfig);
config.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
config.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
config.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG));