KAFKA-6246; Dynamic update of listeners and security configs (#4488)

Dynamic update of listeners as described in KIP-226. This includes:
  - Addition of new listeners with listener-prefixed security configs
  - Removal of existing listeners
  - Password encryption
  - sasl.jaas.config property for broker's JAAS config prefixed with listener and mechanism name
This commit is contained in:
Rajini Sivaram 2018-02-04 09:19:17 -08:00 committed by Jason Gustafson
parent f9b56d680b
commit 4019b21d60
41 changed files with 1573 additions and 362 deletions

View File

@ -50,6 +50,7 @@
<disallow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.common" exact-match="true" />
<allow pkg="org.apache.kafka.common.annotation" />
<allow pkg="org.apache.kafka.common.config" exact-match="true" />
<allow pkg="org.apache.kafka.common.internals" exact-match="true" />
<allow pkg="org.apache.kafka.test" />

View File

@ -16,6 +16,8 @@
*/
package org.apache.kafka.common;
import org.apache.kafka.common.config.ConfigException;
import java.util.Map;
import java.util.Set;
@ -33,9 +35,12 @@ public interface Reconfigurable extends Configurable {
* Validates the provided configuration. The provided map contains
* all configs including any reconfigurable configs that may be different
* from the initial configuration. Reconfiguration will be not performed
* if this method returns false or throws any exception.
* if this method throws any exception.
* @throws ConfigException if the provided configs are not valid. The exception
* message from ConfigException will be returned to the client in
* the AlterConfigs response.
*/
boolean validateReconfiguration(Map<String, ?> configs);
void validateReconfiguration(Map<String, ?> configs) throws ConfigException;
/**
* Reconfigures this instance with the given key-value pairs. The provided

View File

@ -204,6 +204,16 @@ public class AbstractConfig {
* put all the remaining keys with the prefix stripped and their parsed values in the result map.
*
* This is useful if one wants to allow prefixed configs to override default ones.
* <p>
* Two forms of prefixes are supported:
* <ul>
* <li>listener.name.{listenerName}.some.prop: If the provided prefix is `listener.name.{listenerName}.`,
* the key `some.prop` with the value parsed using the definition of `some.prop` is returned.</li>
* <li>listener.name.{listenerName}.{mechanism}.some.prop: If the provided prefix is `listener.name.{listenerName}.`,
* the key `{mechanism}.some.prop` with the value parsed using the definition of `some.prop` is returned.
* This is used to provide per-mechanism configs for a broker listener (e.g sasl.jaas.config)</li>
* </ul>
* </p>
*/
public Map<String, Object> valuesWithPrefixOverride(String prefix) {
Map<String, Object> result = new RecordingMap<>(values(), prefix, true);
@ -213,6 +223,12 @@ public class AbstractConfig {
ConfigDef.ConfigKey configKey = definition.configKeys().get(keyWithNoPrefix);
if (configKey != null)
result.put(keyWithNoPrefix, definition.parseValue(configKey, entry.getValue(), true));
else {
String keyWithNoSecondaryPrefix = keyWithNoPrefix.substring(keyWithNoPrefix.indexOf('.') + 1);
configKey = definition.configKeys().get(keyWithNoSecondaryPrefix);
if (configKey != null)
result.put(keyWithNoPrefix, definition.parseValue(configKey, entry.getValue(), true));
}
}
}
return result;

View File

@ -29,6 +29,9 @@ import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
import org.apache.kafka.common.security.token.delegation.DelegationTokenCache;
import org.apache.kafka.common.utils.Utils;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ChannelBuilders {
@ -105,9 +108,20 @@ public class ChannelBuilders {
case SASL_SSL:
case SASL_PLAINTEXT:
requireNonNullMode(mode, securityProtocol);
JaasContext jaasContext = JaasContext.load(contextType, listenerName, configs);
Map<String, JaasContext> jaasContexts;
if (mode == Mode.SERVER) {
List<String> enabledMechanisms = (List<String>) configs.get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG);
jaasContexts = new HashMap<>(enabledMechanisms.size());
for (String mechanism : enabledMechanisms)
jaasContexts.put(mechanism, JaasContext.loadServerContext(listenerName, mechanism, configs));
} else {
// Use server context for inter-broker client connections and client context for other clients
JaasContext jaasContext = contextType == JaasContext.Type.CLIENT ? JaasContext.loadClientContext(configs) :
JaasContext.loadServerContext(listenerName, clientSaslMechanism, configs);
jaasContexts = Collections.singletonMap(clientSaslMechanism, jaasContext);
}
channelBuilder = new SaslChannelBuilder(mode,
jaasContext,
jaasContexts,
securityProtocol,
listenerName,
isInterBrokerListener,

View File

@ -71,4 +71,8 @@ public final class ListenerName {
public String configPrefix() {
return CONFIG_STATIC_PREFIX + "." + value.toLowerCase(Locale.ROOT) + ".";
}
public String saslMechanismConfigPrefix(String saslMechanism) {
return configPrefix() + saslMechanism.toLowerCase(Locale.ROOT) + ".";
}
}

View File

@ -41,6 +41,7 @@ import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -55,18 +56,19 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
private final boolean isInterBrokerListener;
private final String clientSaslMechanism;
private final Mode mode;
private final JaasContext jaasContext;
private final Map<String, JaasContext> jaasContexts;
private final boolean handshakeRequestEnable;
private final CredentialCache credentialCache;
private final DelegationTokenCache tokenCache;
private final Map<String, LoginManager> loginManagers;
private final Map<String, Subject> subjects;
private LoginManager loginManager;
private SslFactory sslFactory;
private Map<String, ?> configs;
private KerberosShortNamer kerberosShortNamer;
public SaslChannelBuilder(Mode mode,
JaasContext jaasContext,
Map<String, JaasContext> jaasContexts,
SecurityProtocol securityProtocol,
ListenerName listenerName,
boolean isInterBrokerListener,
@ -75,7 +77,9 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
CredentialCache credentialCache,
DelegationTokenCache tokenCache) {
this.mode = mode;
this.jaasContext = jaasContext;
this.jaasContexts = jaasContexts;
this.loginManagers = new HashMap<>(jaasContexts.size());
this.subjects = new HashMap<>(jaasContexts.size());
this.securityProtocol = securityProtocol;
this.listenerName = listenerName;
this.isInterBrokerListener = isInterBrokerListener;
@ -89,14 +93,7 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
public void configure(Map<String, ?> configs) throws KafkaException {
try {
this.configs = configs;
boolean hasKerberos;
if (mode == Mode.SERVER) {
@SuppressWarnings("unchecked")
List<String> enabledMechanisms = (List<String>) this.configs.get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG);
hasKerberos = enabledMechanisms == null || enabledMechanisms.contains(SaslConfigs.GSSAPI_MECHANISM);
} else {
hasKerberos = clientSaslMechanism.equals(SaslConfigs.GSSAPI_MECHANISM);
}
boolean hasKerberos = jaasContexts.containsKey(SaslConfigs.GSSAPI_MECHANISM);
if (hasKerberos) {
String defaultRealm;
@ -110,8 +107,14 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
if (principalToLocalRules != null)
kerberosShortNamer = KerberosShortNamer.fromUnparsedRules(defaultRealm, principalToLocalRules);
}
this.loginManager = LoginManager.acquireLoginManager(jaasContext, hasKerberos, configs);
for (Map.Entry<String, JaasContext> entry : jaasContexts.entrySet()) {
String mechanism = entry.getKey();
// With static JAAS configuration, use KerberosLogin if Kerberos is enabled. With dynamic JAAS configuration,
// use KerberosLogin only for the LoginContext corresponding to GSSAPI
LoginManager loginManager = LoginManager.acquireLoginManager(entry.getValue(), mechanism, hasKerberos, configs);
loginManagers.put(mechanism, loginManager);
subjects.put(mechanism, loginManager.subject());
}
if (this.securityProtocol == SecurityProtocol.SASL_SSL) {
// Disable SSL client authentication as we are using SASL authentication
this.sslFactory = new SslFactory(mode, "none", isInterBrokerListener);
@ -129,11 +132,9 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
}
@Override
public boolean validateReconfiguration(Map<String, ?> configs) {
public void validateReconfiguration(Map<String, ?> configs) {
if (this.securityProtocol == SecurityProtocol.SASL_SSL)
return sslFactory.validateReconfiguration(configs);
else
return true;
sslFactory.validateReconfiguration(configs);
}
@Override
@ -154,11 +155,13 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
Socket socket = socketChannel.socket();
TransportLayer transportLayer = buildTransportLayer(id, key, socketChannel);
Authenticator authenticator;
if (mode == Mode.SERVER)
authenticator = buildServerAuthenticator(configs, id, transportLayer, loginManager.subject());
else
if (mode == Mode.SERVER) {
authenticator = buildServerAuthenticator(configs, id, transportLayer, subjects);
} else {
LoginManager loginManager = loginManagers.get(clientSaslMechanism);
authenticator = buildClientAuthenticator(configs, id, socket.getInetAddress().getHostName(),
loginManager.serviceName(), transportLayer, loginManager.subject());
}
return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize, memoryPool != null ? memoryPool : MemoryPool.NONE);
} catch (Exception e) {
log.info("Failed to create channel due to ", e);
@ -168,10 +171,9 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
@Override
public void close() {
if (loginManager != null) {
for (LoginManager loginManager : loginManagers.values())
loginManager.release();
loginManager = null;
}
loginManagers.clear();
}
private TransportLayer buildTransportLayer(String id, SelectionKey key, SocketChannel socketChannel) throws IOException {
@ -185,8 +187,8 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
// Visible to override for testing
protected SaslServerAuthenticator buildServerAuthenticator(Map<String, ?> configs, String id,
TransportLayer transportLayer, Subject subject) throws IOException {
return new SaslServerAuthenticator(configs, id, jaasContext, subject,
TransportLayer transportLayer, Map<String, Subject> subjects) throws IOException {
return new SaslServerAuthenticator(configs, id, jaasContexts, subjects,
kerberosShortNamer, credentialCache, listenerName, securityProtocol, transportLayer, tokenCache);
}
@ -198,8 +200,8 @@ public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurabl
}
// Package private for testing
LoginManager loginManager() {
return loginManager;
Map<String, LoginManager> loginManagers() {
return loginManagers;
}
private static String defaultKerberosRealm() throws ClassNotFoundException, NoSuchMethodException,

View File

@ -71,8 +71,8 @@ public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable
}
@Override
public boolean validateReconfiguration(Map<String, ?> configs) {
return sslFactory.validateReconfiguration(configs);
public void validateReconfiguration(Map<String, ?> configs) {
sslFactory.validateReconfiguration(configs);
}
@Override

View File

@ -41,57 +41,59 @@ public class JaasContext {
/**
* Returns an instance of this class.
*
* For contextType SERVER, the context will contain the default Configuration and the context name will be one of:
* The context will contain the configuration specified by the JAAS configuration property
* {@link SaslConfigs#SASL_JAAS_CONFIG} with prefix `listener.name.{listenerName}.{mechanism}.`
* with listenerName and mechanism in lower case. The context `KafkaServer` will be returned
* with a single login context entry loaded from the property.
* <p>
* If the property is not defined, the context will contain the default Configuration and
* the context name will be one of:
* <ol>
* <li>Lowercased listener name followed by a period and the string `KafkaServer`</li>
* <li>The string `KafkaServer`</li>
* </ol>
* If both are valid entries in the default JAAS configuration, the first option is chosen.
* </p>
*
* 1. Lowercased listener name followed by a period and the string `KafkaServer`
* 2. The string `KafkaServer`
* @throws IllegalArgumentException if listenerName or mechanism is not defined.
*/
public static JaasContext loadServerContext(ListenerName listenerName, String mechanism, Map<String, ?> configs) {
if (listenerName == null)
throw new IllegalArgumentException("listenerName should not be null for SERVER");
if (mechanism == null)
throw new IllegalArgumentException("mechanism should not be null for SERVER");
String globalContextName = GLOBAL_CONTEXT_NAME_SERVER;
String listenerContextName = listenerName.value().toLowerCase(Locale.ROOT) + "." + GLOBAL_CONTEXT_NAME_SERVER;
Password jaasConfigArgs = (Password) configs.get(mechanism.toLowerCase(Locale.ROOT) + "." + SaslConfigs.SASL_JAAS_CONFIG);
if (jaasConfigArgs == null && configs.containsKey(SaslConfigs.SASL_JAAS_CONFIG))
LOG.warn("Server config {} should be prefixed with SASL mechanism name, ignoring config", SaslConfigs.SASL_JAAS_CONFIG);
return load(Type.SERVER, listenerContextName, globalContextName, jaasConfigArgs);
}
/**
* Returns an instance of this class.
*
* If both are valid entries in the JAAS configuration, the first option is chosen.
*
* For contextType CLIENT, if JAAS configuration property @link SaslConfigs#SASL_JAAS_CONFIG} is specified,
* If JAAS configuration property @link SaslConfigs#SASL_JAAS_CONFIG} is specified,
* the configuration object is created by parsing the property value. Otherwise, the default Configuration
* is returned. The context name is always `KafkaClient`.
*
* @throws IllegalArgumentException if JAAS configuration property is specified for contextType SERVER, if
* listenerName is not defined for contextType SERVER of if listenerName is defined for contextType CLIENT.
*/
public static JaasContext load(JaasContext.Type contextType, ListenerName listenerName,
Map<String, ?> configs) {
String listenerContextName;
String globalContextName;
switch (contextType) {
case CLIENT:
if (listenerName != null)
throw new IllegalArgumentException("listenerName should be null for CLIENT");
globalContextName = GLOBAL_CONTEXT_NAME_CLIENT;
listenerContextName = null;
break;
case SERVER:
if (listenerName == null)
throw new IllegalArgumentException("listenerName should not be null for SERVER");
globalContextName = GLOBAL_CONTEXT_NAME_SERVER;
listenerContextName = listenerName.value().toLowerCase(Locale.ROOT) + "." + GLOBAL_CONTEXT_NAME_SERVER;
break;
default:
throw new IllegalArgumentException("Unexpected context type " + contextType);
}
return load(contextType, listenerContextName, globalContextName, configs);
public static JaasContext loadClientContext(Map<String, ?> configs) {
String globalContextName = GLOBAL_CONTEXT_NAME_CLIENT;
Password jaasConfigArgs = (Password) configs.get(SaslConfigs.SASL_JAAS_CONFIG);
return load(JaasContext.Type.CLIENT, null, globalContextName, jaasConfigArgs);
}
static JaasContext load(JaasContext.Type contextType, String listenerContextName,
String globalContextName, Map<String, ?> configs) {
Password jaasConfigArgs = (Password) configs.get(SaslConfigs.SASL_JAAS_CONFIG);
String globalContextName, Password jaasConfigArgs) {
if (jaasConfigArgs != null) {
if (contextType == JaasContext.Type.SERVER)
throw new IllegalArgumentException("JAAS config property not supported for server");
else {
JaasConfig jaasConfig = new JaasConfig(globalContextName, jaasConfigArgs.value());
AppConfigurationEntry[] clientModules = jaasConfig.getAppConfigurationEntry(globalContextName);
int numModules = clientModules == null ? 0 : clientModules.length;
if (numModules != 1)
throw new IllegalArgumentException("JAAS config property contains " + numModules + " login modules, should be 1 module");
AppConfigurationEntry[] contextModules = jaasConfig.getAppConfigurationEntry(globalContextName);
if (contextModules == null || contextModules.length == 0)
throw new IllegalArgumentException("JAAS config property does not contain any login modules");
else if (contextModules.length != 1)
throw new IllegalArgumentException("JAAS config property contains " + contextModules.length + " login modules, should be 1 module");
return new JaasContext(globalContextName, contextType, jaasConfig);
}
} else
return defaultContext(contextType, listenerContextName, globalContextName);
}

View File

@ -16,18 +16,16 @@
*/
package org.apache.kafka.common.security.authenticator;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class CredentialCache {
private final Map<String, Cache<? extends Object>> cacheMap = new HashMap<>();
private final ConcurrentHashMap<String, Cache<? extends Object>> cacheMap = new ConcurrentHashMap<>();
public <C> Cache<C> createCache(String mechanism, Class<C> credentialClass) {
Cache<C> cache = new Cache<C>(credentialClass);
cacheMap.put(mechanism, cache);
return cache;
Cache<C> oldCache = (Cache<C>) cacheMap.putIfAbsent(mechanism, cache);
return oldCache == null ? cache : oldCache;
}
@SuppressWarnings("unchecked")

View File

@ -64,7 +64,7 @@ public class LoginManager {
* shut it down when the broker or clients are closed. It's straightforward to do the former, but it's more
* complicated to do the latter without making the consumer API more complex.
*/
public static LoginManager acquireLoginManager(JaasContext jaasContext, boolean hasKerberos,
public static LoginManager acquireLoginManager(JaasContext jaasContext, String saslMechanism, boolean hasKerberos,
Map<String, ?> configs) throws IOException, LoginException {
synchronized (LoginManager.class) {
// SASL_JAAS_CONFIG is only supported by clients
@ -73,7 +73,7 @@ public class LoginManager {
if (jaasContext.type() == JaasContext.Type.CLIENT && jaasConfigValue != null) {
loginManager = DYNAMIC_INSTANCES.get(jaasConfigValue);
if (loginManager == null) {
loginManager = new LoginManager(jaasContext, hasKerberos, configs, jaasConfigValue);
loginManager = new LoginManager(jaasContext, saslMechanism.equals(SaslConfigs.GSSAPI_MECHANISM), configs, jaasConfigValue);
DYNAMIC_INSTANCES.put(jaasConfigValue, loginManager);
}
} else {

View File

@ -101,8 +101,8 @@ public class SaslServerAuthenticator implements Authenticator {
private final SecurityProtocol securityProtocol;
private final ListenerName listenerName;
private final String connectionId;
private final JaasContext jaasContext;
private final Subject subject;
private final Map<String, JaasContext> jaasContexts;
private final Map<String, Subject> subjects;
private final CredentialCache credentialCache;
private final TransportLayer transportLayer;
private final Set<String> enabledMechanisms;
@ -128,19 +128,17 @@ public class SaslServerAuthenticator implements Authenticator {
public SaslServerAuthenticator(Map<String, ?> configs,
String connectionId,
JaasContext jaasContext,
Subject subject,
Map<String, JaasContext> jaasContexts,
Map<String, Subject> subjects,
KerberosShortNamer kerberosNameParser,
CredentialCache credentialCache,
ListenerName listenerName,
SecurityProtocol securityProtocol,
TransportLayer transportLayer,
DelegationTokenCache tokenCache) throws IOException {
if (subject == null)
throw new IllegalArgumentException("subject cannot be null");
this.connectionId = connectionId;
this.jaasContext = jaasContext;
this.subject = subject;
this.jaasContexts = jaasContexts;
this.subjects = subjects;
this.credentialCache = credentialCache;
this.listenerName = listenerName;
this.securityProtocol = securityProtocol;
@ -154,6 +152,12 @@ public class SaslServerAuthenticator implements Authenticator {
if (enabledMechanisms == null || enabledMechanisms.isEmpty())
throw new IllegalArgumentException("No SASL mechanisms are enabled");
this.enabledMechanisms = new HashSet<>(enabledMechanisms);
for (String mechanism : enabledMechanisms) {
if (!jaasContexts.containsKey(mechanism))
throw new IllegalArgumentException("Jaas context not specified for SASL mechanism " + mechanism);
if (!subjects.containsKey(mechanism))
throw new IllegalArgumentException("Subject cannot be null for SASL mechanism " + mechanism);
}
// Note that the old principal builder does not support SASL, so we do not need to pass the
// authenticator or the transport layer
@ -162,8 +166,9 @@ public class SaslServerAuthenticator implements Authenticator {
private void createSaslServer(String mechanism) throws IOException {
this.saslMechanism = mechanism;
Subject subject = subjects.get(mechanism);
if (!ScramMechanism.isScram(mechanism))
callbackHandler = new SaslServerCallbackHandler(jaasContext);
callbackHandler = new SaslServerCallbackHandler(jaasContexts.get(mechanism));
else
callbackHandler = new ScramServerCallbackHandler(credentialCache.cache(mechanism, ScramCredential.class), tokenCache);
callbackHandler.configure(configs, Mode.SERVER, subject, saslMechanism);

View File

@ -76,9 +76,9 @@ public final class ScramCredentialUtils {
return props;
}
public static void createCache(CredentialCache cache, Collection<String> enabledMechanisms) {
public static void createCache(CredentialCache cache, Collection<String> mechanisms) {
for (String mechanism : ScramMechanism.mechanismNames()) {
if (enabledMechanisms.contains(mechanism))
if (mechanisms.contains(mechanism))
cache.createCache(mechanism, ScramCredential.class);
}
}

View File

@ -144,14 +144,13 @@ public class SslFactory implements Reconfigurable {
}
@Override
public boolean validateReconfiguration(Map<String, ?> configs) {
public void validateReconfiguration(Map<String, ?> configs) {
try {
SecurityStore newKeystore = maybeCreateNewKeystore(configs);
if (newKeystore != null)
createSSLContext(newKeystore);
return true;
} catch (Exception e) {
throw new KafkaException("Validation of dynamic config update failed", e);
throw new ConfigException("Validation of dynamic config update failed", e);
}
}
@ -163,7 +162,7 @@ public class SslFactory implements Reconfigurable {
this.sslContext = createSSLContext(newKeystore);
this.keystore = newKeystore;
} catch (Exception e) {
throw new KafkaException("Reconfiguration of SSL keystore failed", e);
throw new ConfigException("Reconfiguration of SSL keystore failed", e);
}
}
}

View File

@ -19,6 +19,7 @@ package org.apache.kafka.common.config;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.metrics.FakeMetricsReporter;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricsReporter;
@ -139,6 +140,54 @@ public class AbstractConfigTest {
assertFalse(config.unused().contains("ssl.key.password"));
}
@Test
public void testValuesWithSecondaryPrefix() {
String prefix = "listener.name.listener1.";
Password saslJaasConfig1 = new Password("test.myLoginModule1 required;");
Password saslJaasConfig2 = new Password("test.myLoginModule2 required;");
Password saslJaasConfig3 = new Password("test.myLoginModule3 required;");
Properties props = new Properties();
props.put("listener.name.listener1.test-mechanism.sasl.jaas.config", saslJaasConfig1.value());
props.put("test-mechanism.sasl.jaas.config", saslJaasConfig2.value());
props.put("sasl.jaas.config", saslJaasConfig3.value());
props.put("listener.name.listener1.gssapi.sasl.kerberos.kinit.cmd", "/usr/bin/kinit2");
props.put("listener.name.listener1.gssapi.sasl.kerberos.service.name", "testkafka");
props.put("listener.name.listener1.gssapi.sasl.kerberos.min.time.before.relogin", "60000");
props.put("ssl.provider", "TEST");
TestSecurityConfig config = new TestSecurityConfig(props);
Map<String, Object> valuesWithPrefixOverride = config.valuesWithPrefixOverride(prefix);
// prefix with mechanism overrides global
assertTrue(config.unused().contains("listener.name.listener1.test-mechanism.sasl.jaas.config"));
assertTrue(config.unused().contains("test-mechanism.sasl.jaas.config"));
assertEquals(saslJaasConfig1, valuesWithPrefixOverride.get("test-mechanism.sasl.jaas.config"));
assertEquals(saslJaasConfig3, valuesWithPrefixOverride.get("sasl.jaas.config"));
assertFalse(config.unused().contains("listener.name.listener1.test-mechanism.sasl.jaas.config"));
assertFalse(config.unused().contains("test-mechanism.sasl.jaas.config"));
assertFalse(config.unused().contains("sasl.jaas.config"));
// prefix with mechanism overrides default
assertFalse(config.unused().contains("sasl.kerberos.kinit.cmd"));
assertTrue(config.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.kinit.cmd"));
assertFalse(config.unused().contains("gssapi.sasl.kerberos.kinit.cmd"));
assertFalse(config.unused().contains("sasl.kerberos.kinit.cmd"));
assertEquals("/usr/bin/kinit2", valuesWithPrefixOverride.get("gssapi.sasl.kerberos.kinit.cmd"));
assertFalse(config.unused().contains("listener.name.listener1.sasl.kerberos.kinit.cmd"));
// prefix override for mechanism with no default
assertFalse(config.unused().contains("sasl.kerberos.service.name"));
assertTrue(config.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.service.name"));
assertFalse(config.unused().contains("gssapi.sasl.kerberos.service.name"));
assertFalse(config.unused().contains("sasl.kerberos.service.name"));
assertEquals("testkafka", valuesWithPrefixOverride.get("gssapi.sasl.kerberos.service.name"));
assertFalse(config.unused().contains("listener.name.listener1.gssapi.sasl.kerberos.service.name"));
// unset with no default
assertTrue(config.unused().contains("ssl.provider"));
assertNull(valuesWithPrefixOverride.get("gssapi.ssl.provider"));
assertTrue(config.unused().contains("ssl.provider"));
}
@Test
public void testValuesWithPrefixAllOrNothing() {
String prefix1 = "prefix1.";

View File

@ -26,9 +26,10 @@ import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class SaslChannelBuilderTest {
@ -37,20 +38,20 @@ public class SaslChannelBuilderTest {
public void testCloseBeforeConfigureIsIdempotent() {
SaslChannelBuilder builder = createChannelBuilder(SecurityProtocol.SASL_PLAINTEXT);
builder.close();
assertNull(builder.loginManager());
assertTrue(builder.loginManagers().isEmpty());
builder.close();
assertNull(builder.loginManager());
assertTrue(builder.loginManagers().isEmpty());
}
@Test
public void testCloseAfterConfigIsIdempotent() {
SaslChannelBuilder builder = createChannelBuilder(SecurityProtocol.SASL_PLAINTEXT);
builder.configure(new HashMap<String, Object>());
assertNotNull(builder.loginManager());
assertNotNull(builder.loginManagers().get("PLAIN"));
builder.close();
assertNull(builder.loginManager());
assertTrue(builder.loginManagers().isEmpty());
builder.close();
assertNull(builder.loginManager());
assertTrue(builder.loginManagers().isEmpty());
}
@Test
@ -61,17 +62,18 @@ public class SaslChannelBuilderTest {
builder.configure(Collections.singletonMap(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, "1"));
fail("Exception should have been thrown");
} catch (KafkaException e) {
assertNull(builder.loginManager());
assertTrue(builder.loginManagers().isEmpty());
}
builder.close();
assertNull(builder.loginManager());
assertTrue(builder.loginManagers().isEmpty());
}
private SaslChannelBuilder createChannelBuilder(SecurityProtocol securityProtocol) {
TestJaasConfig jaasConfig = new TestJaasConfig();
jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), new HashMap<String, Object>());
JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig);
return new SaslChannelBuilder(Mode.CLIENT, jaasContext, securityProtocol, new ListenerName("PLAIN"),
Map<String, JaasContext> jaasContexts = Collections.singletonMap("PLAIN", jaasContext);
return new SaslChannelBuilder(Mode.CLIENT, jaasContexts, securityProtocol, new ListenerName("PLAIN"),
false, "PLAIN", true, null, null);
}

View File

@ -188,8 +188,8 @@ public class JaasContextTest {
"KafkaServer { test.LoginModuleDefault required; };",
"plaintext.KafkaServer { test.LoginModuleOverride requisite; };"
));
JaasContext context = JaasContext.load(JaasContext.Type.SERVER, new ListenerName("plaintext"),
Collections.<String, Object>emptyMap());
JaasContext context = JaasContext.loadServerContext(new ListenerName("plaintext"),
"SOME-MECHANISM", Collections.<String, Object>emptyMap());
assertEquals("plaintext.KafkaServer", context.name());
assertEquals(JaasContext.Type.SERVER, context.type());
assertEquals(1, context.configurationEntries().size());
@ -203,8 +203,8 @@ public class JaasContextTest {
"KafkaServer { test.LoginModule required; };",
"other.KafkaServer { test.LoginModuleOther requisite; };"
));
JaasContext context = JaasContext.load(JaasContext.Type.SERVER, new ListenerName("plaintext"),
Collections.<String, Object>emptyMap());
JaasContext context = JaasContext.loadServerContext(new ListenerName("plaintext"),
"SOME-MECHANISM", Collections.<String, Object>emptyMap());
assertEquals("KafkaServer", context.name());
assertEquals(JaasContext.Type.SERVER, context.type());
assertEquals(1, context.configurationEntries().size());
@ -215,24 +215,13 @@ public class JaasContextTest {
@Test(expected = IllegalArgumentException.class)
public void testLoadForServerWithWrongListenerName() throws IOException {
writeConfiguration("Server", "test.LoginModule required;");
JaasContext.load(JaasContext.Type.SERVER, new ListenerName("plaintext"),
Collections.<String, Object>emptyMap());
}
/**
* ListenerName can only be used with Type.SERVER.
*/
@Test(expected = IllegalArgumentException.class)
public void testLoadForClientWithListenerName() {
JaasContext.load(JaasContext.Type.CLIENT, new ListenerName("foo"),
JaasContext.loadServerContext(new ListenerName("plaintext"), "SOME-MECHANISM",
Collections.<String, Object>emptyMap());
}
private AppConfigurationEntry configurationEntry(JaasContext.Type contextType, String jaasConfigProp) {
Map<String, Object> configs = new HashMap<>();
if (jaasConfigProp != null)
configs.put(SaslConfigs.SASL_JAAS_CONFIG, new Password(jaasConfigProp));
JaasContext context = JaasContext.load(contextType, null, contextType.name(), configs);
Password saslJaasConfig = jaasConfigProp == null ? null : new Password(jaasConfigProp);
JaasContext context = JaasContext.load(contextType, null, contextType.name(), saslJaasConfig);
List<AppConfigurationEntry> entries = context.configurationEntries();
assertEquals(1, entries.size());
return entries.get(0);

View File

@ -715,7 +715,7 @@ public class SaslAuthenticatorTest {
* property override is used during authentication.
*/
@Test
public void testDynamicJaasConfiguration() throws Exception {
public void testClientDynamicJaasConfiguration() throws Exception {
SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
saslServerConfigs.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, Arrays.asList("PLAIN"));
@ -756,6 +756,37 @@ public class SaslAuthenticatorTest {
}
}
/**
* Tests dynamic JAAS configuration property for SASL server. Invalid server credentials
* are set in the static JVM-wide configuration instance to ensure that the dynamic
* property override is used during authentication.
*/
@Test
public void testServerDynamicJaasConfiguration() throws Exception {
SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
saslClientConfigs.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
saslServerConfigs.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, Arrays.asList("PLAIN"));
Map<String, Object> serverOptions = new HashMap<>();
serverOptions.put("user_user1", "user1-secret");
serverOptions.put("user_user2", "user2-secret");
saslServerConfigs.put("listener.name.sasl_ssl.plain." + SaslConfigs.SASL_JAAS_CONFIG,
TestJaasConfig.jaasConfigProperty("PLAIN", serverOptions));
TestJaasConfig staticJaasConfig = new TestJaasConfig();
staticJaasConfig.createOrUpdateEntry(TestJaasConfig.LOGIN_CONTEXT_SERVER, PlainLoginModule.class.getName(),
Collections.<String, Object>emptyMap());
staticJaasConfig.setClientOptions("PLAIN", "user1", "user1-secret");
Configuration.setConfiguration(staticJaasConfig);
server = createEchoServer(securityProtocol);
// Check that 'user1' can connect with static Jaas config
createAndCheckClientConnection(securityProtocol, "1");
// Check that user 'user2' can also connect with a Jaas config override
saslClientConfigs.put(SaslConfigs.SASL_JAAS_CONFIG,
TestJaasConfig.jaasConfigProperty("PLAIN", "user2", "user2-secret"));
createAndCheckClientConnection(securityProtocol, "2");
}
@Test
public void testJaasConfigurationForListener() throws Exception {
SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
@ -986,18 +1017,19 @@ public class SaslAuthenticatorTest {
throws Exception {
final ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
final Map<String, ?> configs = Collections.emptyMap();
final JaasContext jaasContext = JaasContext.load(JaasContext.Type.SERVER, listenerName, configs);
final JaasContext jaasContext = JaasContext.loadServerContext(listenerName, saslMechanism, configs);
final Map<String, JaasContext> jaasContexts = Collections.singletonMap(saslMechanism, jaasContext);
boolean isScram = ScramMechanism.isScram(saslMechanism);
if (isScram)
ScramCredentialUtils.createCache(credentialCache, Arrays.asList(saslMechanism));
SaslChannelBuilder serverChannelBuilder = new SaslChannelBuilder(Mode.SERVER, jaasContext,
SaslChannelBuilder serverChannelBuilder = new SaslChannelBuilder(Mode.SERVER, jaasContexts,
securityProtocol, listenerName, false, saslMechanism, true, credentialCache, null) {
@Override
protected SaslServerAuthenticator buildServerAuthenticator(Map<String, ?> configs, String id,
TransportLayer transportLayer, Subject subject) throws IOException {
return new SaslServerAuthenticator(configs, id, jaasContext, subject, null,
TransportLayer transportLayer, Map<String, Subject> subjects) throws IOException {
return new SaslServerAuthenticator(configs, id, jaasContexts, subjects, null,
credentialCache, listenerName, securityProtocol, transportLayer, null) {
@Override
@ -1032,8 +1064,10 @@ public class SaslAuthenticatorTest {
final ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
final Map<String, ?> configs = Collections.emptyMap();
final JaasContext jaasContext = JaasContext.load(JaasContext.Type.CLIENT, null, configs);
SaslChannelBuilder clientChannelBuilder = new SaslChannelBuilder(Mode.CLIENT, jaasContext,
final JaasContext jaasContext = JaasContext.loadClientContext(configs);
final Map<String, JaasContext> jaasContexts = Collections.singletonMap(saslMechanism, jaasContext);
SaslChannelBuilder clientChannelBuilder = new SaslChannelBuilder(Mode.CLIENT, jaasContexts,
securityProtocol, listenerName, false, saslMechanism, true, null, null) {
@Override

View File

@ -51,7 +51,7 @@ public class SaslServerAuthenticatorTest {
TransportLayer transportLayer = EasyMock.mock(TransportLayer.class);
Map<String, ?> configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
Collections.singletonList(SCRAM_SHA_256.mechanismName()));
SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer);
SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer, SCRAM_SHA_256.mechanismName());
final Capture<ByteBuffer> size = EasyMock.newCapture();
EasyMock.expect(transportLayer.read(EasyMock.capture(size))).andAnswer(new IAnswer<Integer>() {
@ -72,7 +72,7 @@ public class SaslServerAuthenticatorTest {
TransportLayer transportLayer = EasyMock.mock(TransportLayer.class);
Map<String, ?> configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
Collections.singletonList(SCRAM_SHA_256.mechanismName()));
SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer);
SaslServerAuthenticator authenticator = setupAuthenticator(configs, transportLayer, SCRAM_SHA_256.mechanismName());
final RequestHeader header = new RequestHeader(ApiKeys.METADATA, (short) 0, "clientId", 13243);
final Struct headerStruct = header.toStruct();
@ -106,12 +106,13 @@ public class SaslServerAuthenticatorTest {
}
}
private SaslServerAuthenticator setupAuthenticator(Map<String, ?> configs, TransportLayer transportLayer) throws IOException {
private SaslServerAuthenticator setupAuthenticator(Map<String, ?> configs, TransportLayer transportLayer, String mechanism) throws IOException {
TestJaasConfig jaasConfig = new TestJaasConfig();
jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), new HashMap<String, Object>());
JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig);
Subject subject = new Subject();
return new SaslServerAuthenticator(configs, "node", jaasContext, subject, null, new CredentialCache(),
Map<String, JaasContext> jaasContexts = Collections.singletonMap(mechanism,
new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig));
Map<String, Subject> subjects = Collections.singletonMap(mechanism, new Subject());
return new SaslServerAuthenticator(configs, "node", jaasContexts, subjects, null, new CredentialCache(),
new ListenerName("ssl"), SecurityProtocol.SASL_SSL, transportLayer, new DelegationTokenCache(ScramMechanism.mechanismNames()));
}

View File

@ -54,6 +54,20 @@ public class TestJaasConfig extends Configuration {
return new Password(loginModule(mechanism) + " required username=" + username + " password=" + password + ";");
}
public static Password jaasConfigProperty(String mechanism, Map<String, Object> options) {
StringBuilder builder = new StringBuilder();
builder.append(loginModule(mechanism));
builder.append(" required");
for (Map.Entry<String, Object> option : options.entrySet()) {
builder.append(' ');
builder.append(option.getKey());
builder.append('=');
builder.append(option.getValue());
}
builder.append(';');
return new Password(builder.toString());
}
public void setClientOptions(String saslMechanism, String clientUsername, String clientPassword) {
Map<String, Object> options = new HashMap<>();
if (clientUsername != null)

View File

@ -54,11 +54,13 @@ object KafkaController extends Logging {
}
class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Time, metrics: Metrics, brokerInfo: BrokerInfo,
class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Time, metrics: Metrics, initialBrokerInfo: BrokerInfo,
tokenManager: DelegationTokenManager, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
this.logIdent = s"[Controller id=${config.brokerId}] "
@volatile private var brokerInfo = initialBrokerInfo
private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext = true, None)
val controllerContext = new ControllerContext
@ -77,6 +79,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
private val controllerChangeHandler = new ControllerChangeHandler(this, eventManager)
private val brokerChangeHandler = new BrokerChangeHandler(this, eventManager)
private val brokerModificationsHandlers: mutable.Map[Int, BrokerModificationsHandler] = mutable.Map.empty
private val topicChangeHandler = new TopicChangeHandler(this, eventManager)
private val topicDeletionHandler = new TopicDeletionHandler(this, eventManager)
private val partitionModificationsHandlers: mutable.Map[String, PartitionModificationsHandler] = mutable.Map.empty
@ -274,6 +277,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
zkClient.unregisterZNodeChangeHandler(partitionReassignmentHandler.path)
zkClient.unregisterZNodeChangeHandler(preferredReplicaElectionHandler.path)
zkClient.unregisterZNodeChildChangeHandler(logDirEventNotificationHandler.path)
unregisterBrokerModificationsHandler(brokerModificationsHandlers.keySet)
// reset topic deletion manager
topicDeletionManager.reset()
@ -360,6 +364,23 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
s"${newBrokers.mkString(",")}. Signaling restart of topic deletion for these topics")
topicDeletionManager.resumeDeletionForTopics(replicasForTopicsToBeDeleted.map(_.topic))
}
registerBrokerModificationsHandler(newBrokers)
}
private def registerBrokerModificationsHandler(brokerIds: Iterable[Int]): Unit = {
debug(s"Register BrokerModifications handler for $brokerIds")
brokerIds.foreach { brokerId =>
val brokerModificationsHandler = new BrokerModificationsHandler(this, eventManager, brokerId)
zkClient.registerZNodeChangeHandler(brokerModificationsHandler)
brokerModificationsHandlers.put(brokerId, brokerModificationsHandler)
}
}
private def unregisterBrokerModificationsHandler(brokerIds: Iterable[Int]): Unit = {
debug(s"Unregister BrokerModifications handler for $brokerIds")
brokerIds.foreach { brokerId =>
brokerModificationsHandlers.remove(brokerId).foreach(handler => zkClient.unregisterZNodeChangeHandler(handler.path))
}
}
/*
@ -374,6 +395,13 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
info(s"Removed $deadBrokersThatWereShuttingDown from list of shutting down brokers.")
val allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokers.toSet)
onReplicasBecomeOffline(allReplicasOnDeadBrokers)
unregisterBrokerModificationsHandler(deadBrokers)
}
private def onBrokerUpdate(updatedBrokers: Seq[Int]) {
info(s"Broker info update callback for ${updatedBrokers.mkString(",")}")
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
}
/**
@ -613,6 +641,8 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
controllerContext.partitionReplicaAssignment = mutable.Map.empty ++ zkClient.getReplicaAssignmentForTopics(controllerContext.allTopics.toSet)
controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicPartition, LeaderIsrAndControllerEpoch]
controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
// register broker modifications handlers
registerBrokerModificationsHandler(controllerContext.liveBrokers.map(_.id))
// update the leader and isr cache for all existing partitions from Zookeeper
updateLeaderAndIsrCache()
// start the channel manager
@ -1209,6 +1239,29 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
}
}
case object BrokerModifications extends ControllerEvent {
override def state: ControllerState = ControllerState.BrokerChange
override def process(): Unit = {
if (!isActive) return
val curBrokers = zkClient.getAllBrokersInCluster.toSet
val updatedBrokers = controllerContext.liveBrokers.filter { broker =>
val existingBroker = curBrokers.find(_.id == broker.id)
existingBroker match {
case Some(b) => broker.endPoints != b.endPoints
case None => false
}
}
if (updatedBrokers.nonEmpty) {
val updatedBrokerIdsSorted = updatedBrokers.map(_.id).toSeq.sorted
info(s"Updated brokers: $updatedBrokers")
controllerContext.liveBrokers = curBrokers // Update broker metadata
onBrokerUpdate(updatedBrokerIdsSorted)
}
}
}
case object TopicChange extends ControllerEvent {
override def state: ControllerState = ControllerState.TopicChange
@ -1458,7 +1511,17 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
class BrokerChangeHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
override val path: String = BrokerIdsZNode.path
override def handleChildChange(): Unit = eventManager.put(controller.BrokerChange)
override def handleChildChange(): Unit = {
eventManager.put(controller.BrokerChange)
}
}
class BrokerModificationsHandler(controller: KafkaController, eventManager: ControllerEventManager, brokerId: Int) extends ZNodeChangeHandler {
override val path: String = BrokerIdZNode.path(brokerId)
override def handleDataChange(): Unit = {
eventManager.put(controller.BrokerModifications)
}
}
class TopicChangeHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {

View File

@ -54,7 +54,6 @@ import scala.util.control.ControlThrowable
*/
class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time, val credentialProvider: CredentialProvider) extends Logging with KafkaMetricsGroup {
private val endpoints = config.listeners.map(l => l.listenerName -> l).toMap
private val maxQueuedRequests = config.queuedMaxRequests
private val maxConnectionsPerIp = config.maxConnectionsPerIp
@ -72,7 +71,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
private val processors = new ConcurrentHashMap[Int, Processor]()
private var nextProcessorId = 0
private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
private[network] val acceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
private var connectionQuotas: ConnectionQuotas = _
private var stoppedProcessingRequests = false
@ -82,7 +81,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
def startup() {
this.synchronized {
connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
createProcessors(config.numNetworkThreads)
createProcessors(config.numNetworkThreads, config.listeners)
}
newGauge("NetworkProcessorAvgIdlePercent",
@ -111,14 +110,17 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
info("Started " + acceptors.size + " acceptor threads")
}
private def createProcessors(newProcessorsPerListener: Int): Unit = synchronized {
private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap
private def createProcessors(newProcessorsPerListener: Int,
endpoints: Seq[EndPoint]): Unit = synchronized {
val sendBufferSize = config.socketSendBufferBytes
val recvBufferSize = config.socketReceiveBufferBytes
val brokerId = config.brokerId
val numProcessorThreads = config.numNetworkThreads
config.listeners.foreach { endpoint =>
endpoints.foreach { endpoint =>
val listenerName = endpoint.listenerName
val securityProtocol = endpoint.securityProtocol
val listenerProcessors = new ArrayBuffer[Processor]()
@ -130,12 +132,10 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
}
listenerProcessors.foreach(p => processors.put(p.id, p))
val acceptor = acceptors.getOrElseUpdate(endpoint, {
val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas)
KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()
acceptor.awaitStartup()
acceptor
})
acceptors.put(endpoint, acceptor)
acceptor.addProcessors(listenerProcessors)
}
}
@ -153,7 +153,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
def stopProcessingRequests() = {
info("Stopping socket server request processors")
this.synchronized {
acceptors.values.foreach(_.shutdown)
acceptors.asScala.values.foreach(_.shutdown)
processors.asScala.values.foreach(_.shutdown)
requestChannel.clear()
stoppedProcessingRequests = true
@ -161,12 +161,12 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
info("Stopped socket server request processors")
}
def resizeThreadPool(oldNumNetworkThreads: Int, newNumNetworkThreads: Int): Unit = {
def resizeThreadPool(oldNumNetworkThreads: Int, newNumNetworkThreads: Int): Unit = synchronized {
info(s"Resizing network thread pool size for each listener from $oldNumNetworkThreads to $newNumNetworkThreads")
if (newNumNetworkThreads > oldNumNetworkThreads)
createProcessors(newNumNetworkThreads - oldNumNetworkThreads)
createProcessors(newNumNetworkThreads - oldNumNetworkThreads, config.listeners)
else if (newNumNetworkThreads < oldNumNetworkThreads)
acceptors.values.foreach(_.removeProcessors(oldNumNetworkThreads - newNumNetworkThreads, requestChannel))
acceptors.asScala.values.foreach(_.removeProcessors(oldNumNetworkThreads - newNumNetworkThreads, requestChannel))
}
/**
@ -185,9 +185,22 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
def boundPort(listenerName: ListenerName): Int = {
try {
acceptors(endpoints(listenerName)).serverChannel.socket.getLocalPort
acceptors.get(endpoints(listenerName)).serverChannel.socket.getLocalPort
} catch {
case e: Exception => throw new KafkaException("Tried to check server's port before server was started or checked for port of non-existing protocol", e)
case e: Exception =>
throw new KafkaException("Tried to check server's port before server was started or checked for port of non-existing protocol", e)
}
}
def addListeners(listenersAdded: Seq[EndPoint]): Unit = synchronized {
info(s"Adding listeners for endpoints $listenersAdded")
createProcessors(config.numNetworkThreads, listenersAdded)
}
def removeListeners(listenersRemoved: Seq[EndPoint]): Unit = synchronized {
info(s"Removing listeners for endpoints $listenersRemoved")
listenersRemoved.foreach { endpoint =>
acceptors.asScala.remove(endpoint).foreach(_.shutdown())
}
}
@ -239,7 +252,7 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ
* Initiates a graceful shutdown by signaling to stop and waiting for the shutdown to complete
*/
def shutdown(): Unit = {
alive.set(false)
if (alive.getAndSet(false))
wakeup()
shutdownLatch.await()
}
@ -312,6 +325,11 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
toRemove.foreach(processor => requestChannel.removeProcessor(processor.id))
}
override def shutdown(): Unit = {
super.shutdown()
processors.foreach(_.shutdown())
}
/**
* Accept loop that checks for new connection attempts
*/

View File

@ -17,7 +17,7 @@
package kafka.security
import java.util.{List, Properties}
import java.util.{Collection, Properties}
import org.apache.kafka.common.security.authenticator.CredentialCache
import org.apache.kafka.common.security.scram.{ScramCredential, ScramCredentialUtils, ScramMechanism}
@ -25,10 +25,10 @@ import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.common.config.ConfigDef._
import org.apache.kafka.common.security.token.delegation.DelegationTokenCache
class CredentialProvider(saslEnabledMechanisms: List[String], val tokenCache: DelegationTokenCache) {
class CredentialProvider(scramMechanisms: Collection[String], val tokenCache: DelegationTokenCache) {
val credentialCache = new CredentialCache
ScramCredentialUtils.createCache(credentialCache, saslEnabledMechanisms)
ScramCredentialUtils.createCache(credentialCache, scramMechanisms)
def updateCredentials(username: String, config: Properties) {
for (mechanism <- ScramMechanism.values()) {

View File

@ -17,20 +17,22 @@
package kafka.server
import java.nio.charset.StandardCharsets
import java.util
import java.util.{Collections, Properties}
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.cluster.EndPoint
import kafka.log.{LogCleaner, LogConfig, LogManager}
import kafka.server.DynamicBrokerConfig._
import kafka.utils.{CoreUtils, Logging}
import kafka.utils.{CoreUtils, Logging, PasswordEncoder}
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.Reconfigurable
import org.apache.kafka.common.config.{ConfigDef, ConfigException, SslConfigs}
import org.apache.kafka.common.network.ListenerReconfigurable
import org.apache.kafka.common.metrics.MetricsReporter
import org.apache.kafka.common.utils.{Base64, Utils}
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
import org.apache.kafka.common.security.authenticator.LoginManager
import org.apache.kafka.common.utils.Utils
import scala.collection._
import scala.collection.JavaConverters._
@ -71,11 +73,7 @@ import scala.collection.JavaConverters._
*/
object DynamicBrokerConfig {
private val DynamicPasswordConfigs = Set(
SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
SslConfigs.SSL_KEY_PASSWORD_CONFIG
)
private val DynamicSecurityConfigs = SslConfigs.RECONFIGURABLE_CONFIGS.asScala
private[server] val DynamicSecurityConfigs = SslConfigs.RECONFIGURABLE_CONFIGS.asScala
val AllDynamicConfigs = mutable.Set[String]()
AllDynamicConfigs ++= DynamicSecurityConfigs
@ -83,11 +81,17 @@ object DynamicBrokerConfig {
AllDynamicConfigs ++= DynamicLogConfig.ReconfigurableConfigs
AllDynamicConfigs ++= DynamicThreadPool.ReconfigurableConfigs
AllDynamicConfigs ++= Set(KafkaConfig.MetricReporterClassesProp)
AllDynamicConfigs ++= DynamicListenerConfig.ReconfigurableConfigs
private val PerBrokerConfigs = DynamicSecurityConfigs
private val PerBrokerConfigs = DynamicSecurityConfigs ++
DynamicListenerConfig.ReconfigurableConfigs
val ListenerConfigRegex = """listener\.name\.[^.]*\.(.*)""".r
private[server] val DynamicPasswordConfigs = {
val passwordConfigs = KafkaConfig.configKeys.filter(_._2.`type` == ConfigDef.Type.PASSWORD).keySet
AllDynamicConfigs.intersect(passwordConfigs)
}
def brokerConfigSynonyms(name: String, matchListenerOverride: Boolean): List[String] = {
name match {
@ -123,11 +127,14 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]()
private val lock = new ReentrantReadWriteLock
private var currentConfig = kafkaConfig
private val dynamicConfigPasswordEncoder = maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
private[server] def initialize(zkClient: KafkaZkClient): Unit = {
val adminZkClient = new AdminZkClient(zkClient)
updateDefaultConfig(adminZkClient.fetchEntityConfig(ConfigType.Broker, ConfigEntityName.Default))
updateBrokerConfig(kafkaConfig.brokerId, adminZkClient.fetchEntityConfig(ConfigType.Broker, kafkaConfig.brokerId.toString))
val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, kafkaConfig.brokerId.toString)
val brokerConfig = maybeReEncodePasswords(props, adminZkClient)
updateBrokerConfig(kafkaConfig.brokerId, brokerConfig)
}
def addReconfigurables(kafkaServer: KafkaServer): Unit = {
@ -136,6 +143,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
addBrokerReconfigurable(kafkaServer.logManager.cleaner)
addReconfigurable(new DynamicLogConfig(kafkaServer.logManager))
addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer))
addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
}
def addReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) {
@ -187,22 +195,37 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
}
}
private def maybeCreatePasswordEncoder(secret: Option[Password]): Option[PasswordEncoder] = {
secret.map { secret =>
new PasswordEncoder(secret,
kafkaConfig.passwordEncoderKeyFactoryAlgorithm,
kafkaConfig.passwordEncoderCipherAlgorithm,
kafkaConfig.passwordEncoderKeyLength,
kafkaConfig.passwordEncoderIterations)
}
}
private def passwordEncoder: PasswordEncoder = {
dynamicConfigPasswordEncoder.getOrElse(throw new ConfigException("Password encoder secret not configured"))
}
private[server] def toPersistentProps(configProps: Properties, perBrokerConfig: Boolean): Properties = {
val props = configProps.clone().asInstanceOf[Properties]
// TODO (KAFKA-6246): encrypt passwords
def encodePassword(configName: String): Unit = {
val value = props.getProperty(configName)
if (value != null) {
if (!perBrokerConfig)
throw new ConfigException("Password config can be defined only at broker level")
props.setProperty(configName, Base64.encoder.encodeToString(value.getBytes(StandardCharsets.UTF_8)))
props.setProperty(configName, passwordEncoder.encode(new Password(value)))
}
}
DynamicPasswordConfigs.foreach(encodePassword)
props
}
private[server] def fromPersistentProps(persistentProps: Properties, perBrokerConfig: Boolean): Properties = {
private[server] def fromPersistentProps(persistentProps: Properties,
perBrokerConfig: Boolean): Properties = {
val props = persistentProps.clone().asInstanceOf[Properties]
// Remove all invalid configs from `props`
@ -219,17 +242,50 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
if (!perBrokerConfig)
removeInvalidProps(perBrokerConfigs(props), "Per-broker configs defined at default cluster level will be ignored")
// TODO (KAFKA-6246): encrypt passwords
def decodePassword(configName: String): Unit = {
val value = props.getProperty(configName)
if (value != null) {
props.setProperty(configName, new String(Base64.decoder.decode(value), StandardCharsets.UTF_8))
try {
props.setProperty(configName, passwordEncoder.decode(value).value)
} catch {
case e: Exception =>
error(s"Dynamic password config $configName could not be decoded, ignoring.", e)
props.remove(configName)
}
}
}
DynamicPasswordConfigs.foreach(decodePassword)
props
}
// If the secret has changed, password.encoder.old.secret contains the old secret that was used
// to encode the configs in ZK. Decode passwords using the old secret and update ZK with values
// encoded using the current secret. Ignore any errors during decoding since old secret may not
// have been removed during broker restart.
private def maybeReEncodePasswords(persistentProps: Properties, adminZkClient: AdminZkClient): Properties = {
val props = persistentProps.clone().asInstanceOf[Properties]
if (!props.asScala.keySet.exists(DynamicPasswordConfigs.contains)) {
maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderOldSecret).foreach { passwordDecoder =>
DynamicPasswordConfigs.foreach { configName =>
val value = props.getProperty(configName)
if (value != null) {
val decoded = try {
Some(passwordDecoder.decode(value).value)
} catch {
case _: Exception =>
debug(s"Dynamic password config $configName could not be decoded using old secret, new secret will be used.")
None
}
decoded.foreach { value => props.put(configName, passwordEncoder.encode(new Password(value))) }
}
}
adminZkClient.changeBrokerConfig(Seq(kafkaConfig.brokerId), props)
}
}
props
}
private[server] def validate(props: Properties, perBrokerConfig: Boolean): Unit = CoreUtils.inReadLock(lock) {
def checkInvalidProps(invalidPropNames: Set[String], errorMessage: String): Unit = {
if (invalidPropNames.nonEmpty)
@ -331,14 +387,18 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
newProps ++= staticBrokerConfigs
overrideProps(newProps, dynamicDefaultConfigs)
overrideProps(newProps, dynamicBrokerConfigs)
val newConfig = processReconfiguration(newProps, validateOnly = false)
val oldConfig = currentConfig
val (newConfig, brokerReconfigurablesToUpdate) = processReconfiguration(newProps, validateOnly = false)
if (newConfig ne currentConfig) {
currentConfig = newConfig
kafkaConfig.updateCurrentConfig(currentConfig)
kafkaConfig.updateCurrentConfig(newConfig)
// Process BrokerReconfigurable updates after current config is updated
brokerReconfigurablesToUpdate.foreach(_.reconfigure(oldConfig, newConfig))
}
}
private def processReconfiguration(newProps: Map[String, String], validateOnly: Boolean): KafkaConfig = {
private def processReconfiguration(newProps: Map[String, String], validateOnly: Boolean): (KafkaConfig, List[BrokerReconfigurable]) = {
val newConfig = new KafkaConfig(newProps.asJava, !validateOnly, None)
val updatedMap = updatedConfigs(newConfig.originalsFromThisConfig, currentConfig.originals)
if (updatedMap.nonEmpty) {
@ -352,16 +412,22 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
val newValues = newConfig.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix)
val updatedKeys = updatedConfigs(newValues, oldValues).keySet
if (needsReconfiguration(listenerReconfigurable.reconfigurableConfigs, updatedKeys))
processReconfigurable(listenerReconfigurable, newValues, customConfigs, validateOnly)
processReconfigurable(listenerReconfigurable, updatedKeys, newValues, customConfigs, validateOnly)
case reconfigurable =>
if (needsReconfiguration(reconfigurable.reconfigurableConfigs, updatedMap.keySet))
processReconfigurable(reconfigurable, newConfig.valuesFromThisConfig, customConfigs, validateOnly)
processReconfigurable(reconfigurable, updatedMap.keySet, newConfig.valuesFromThisConfig, customConfigs, validateOnly)
}
// BrokerReconfigurable updates are processed after config is updated. Only do the validation here.
val brokerReconfigurablesToUpdate = mutable.Buffer[BrokerReconfigurable]()
brokerReconfigurables.foreach { reconfigurable =>
if (needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava, updatedMap.keySet))
processBrokerReconfigurable(reconfigurable, currentConfig, newConfig, validateOnly)
if (needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava, updatedMap.keySet)) {
reconfigurable.validateReconfiguration(newConfig)
if (!validateOnly)
brokerReconfigurablesToUpdate += reconfigurable
}
newConfig
}
(newConfig, brokerReconfigurablesToUpdate.toList)
} catch {
case e: Exception =>
if (!validateOnly)
@ -370,7 +436,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
}
}
else
currentConfig
(currentConfig, List.empty)
}
private def needsReconfiguration(reconfigurableConfigs: util.Set[String], updatedKeys: Set[String]): Boolean = {
@ -378,27 +444,25 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
}
private def processReconfigurable(reconfigurable: Reconfigurable,
updatedConfigNames: Set[String],
allNewConfigs: util.Map[String, _],
newCustomConfigs: util.Map[String, Object],
validateOnly: Boolean): Unit = {
val newConfigs = new util.HashMap[String, Object]
allNewConfigs.asScala.foreach { case (k, v) => newConfigs.put(k, v.asInstanceOf[AnyRef]) }
newConfigs.putAll(newCustomConfigs)
if (validateOnly) {
if (!reconfigurable.validateReconfiguration(newConfigs))
throw new ConfigException("Validation of dynamic config update failed")
} else
reconfigurable.reconfigure(newConfigs)
try {
reconfigurable.validateReconfiguration(newConfigs)
} catch {
case e: ConfigException => throw e
case _: Exception =>
throw new ConfigException(s"Validation of dynamic config update of $updatedConfigNames failed with class ${reconfigurable.getClass}")
}
private def processBrokerReconfigurable(reconfigurable: BrokerReconfigurable,
oldConfig: KafkaConfig,
newConfig: KafkaConfig,
validateOnly: Boolean): Unit = {
if (validateOnly)
reconfigurable.validateReconfiguration(newConfig)
else
reconfigurable.reconfigure(oldConfig, newConfig)
if (!validateOnly) {
info(s"Reconfiguring $reconfigurable, updated configs: $updatedConfigNames custom configs: $newCustomConfigs")
reconfigurable.reconfigure(newConfigs)
}
}
}
@ -427,11 +491,10 @@ class DynamicLogConfig(logManager: LogManager) extends Reconfigurable with Loggi
DynamicLogConfig.ReconfigurableConfigs.asJava
}
override def validateReconfiguration(configs: util.Map[String, _]): Boolean = {
override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
// For update of topic config overrides, only config names and types are validated
// Names and types have already been validated. For consistency with topic config
// validation, no additional validation is performed.
true
}
override def reconfigure(configs: util.Map[String, _]): Unit = {
@ -538,7 +601,7 @@ class DynamicMetricsReporters(brokerId: Int, server: KafkaServer) extends Reconf
configs
}
override def validateReconfiguration(configs: util.Map[String, _]): Boolean = {
override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
val updatedMetricsReporters = metricsReporterClasses(configs)
// Ensure all the reporter classes can be loaded and have a default constructor
@ -548,10 +611,11 @@ class DynamicMetricsReporters(brokerId: Int, server: KafkaServer) extends Reconf
}
// Validate the new configuration using every reconfigurable reporter instance that is not being deleted
currentReporters.values.forall {
currentReporters.values.foreach {
case reporter: Reconfigurable =>
!updatedMetricsReporters.contains(reporter.getClass.getName) || reporter.validateReconfiguration(configs)
case _ => true
if (updatedMetricsReporters.contains(reporter.getClass.getName))
reporter.validateReconfiguration(configs)
case _ =>
}
}
@ -588,3 +652,103 @@ class DynamicMetricsReporters(brokerId: Int, server: KafkaServer) extends Reconf
configs.get(KafkaConfig.MetricReporterClassesProp).asInstanceOf[util.List[String]].asScala
}
}
object DynamicListenerConfig {
val ReconfigurableConfigs = Set(
// Listener configs
KafkaConfig.AdvertisedListenersProp,
KafkaConfig.ListenersProp,
KafkaConfig.ListenerSecurityProtocolMapProp,
// SSL configs
KafkaConfig.PrincipalBuilderClassProp,
KafkaConfig.SslProtocolProp,
KafkaConfig.SslProviderProp,
KafkaConfig.SslCipherSuitesProp,
KafkaConfig.SslEnabledProtocolsProp,
KafkaConfig.SslKeystoreTypeProp,
KafkaConfig.SslKeystoreLocationProp,
KafkaConfig.SslKeystorePasswordProp,
KafkaConfig.SslKeyPasswordProp,
KafkaConfig.SslTruststoreTypeProp,
KafkaConfig.SslTruststoreLocationProp,
KafkaConfig.SslTruststorePasswordProp,
KafkaConfig.SslKeyManagerAlgorithmProp,
KafkaConfig.SslTrustManagerAlgorithmProp,
KafkaConfig.SslEndpointIdentificationAlgorithmProp,
KafkaConfig.SslSecureRandomImplementationProp,
KafkaConfig.SslClientAuthProp,
// SASL configs
KafkaConfig.SaslMechanismInterBrokerProtocolProp,
KafkaConfig.SaslJaasConfigProp,
KafkaConfig.SaslEnabledMechanismsProp,
KafkaConfig.SaslKerberosServiceNameProp,
KafkaConfig.SaslKerberosKinitCmdProp,
KafkaConfig.SaslKerberosTicketRenewWindowFactorProp,
KafkaConfig.SaslKerberosTicketRenewJitterProp,
KafkaConfig.SaslKerberosMinTimeBeforeReloginProp,
KafkaConfig.SaslKerberosPrincipalToLocalRulesProp
)
}
class DynamicListenerConfig(server: KafkaServer) extends BrokerReconfigurable with Logging {
override def reconfigurableConfigs: Set[String] = {
DynamicListenerConfig.ReconfigurableConfigs
}
def validateReconfiguration(newConfig: KafkaConfig): Unit = {
def immutableListenerConfigs(kafkaConfig: KafkaConfig, prefix: String): Map[String, AnyRef] = {
newConfig.originals.asScala
.filterKeys(_.startsWith(prefix))
.filterKeys(k => !DynamicSecurityConfigs.contains(k))
}
val oldConfig = server.config
val newListeners = listenersToMap(newConfig.listeners)
val newAdvertisedListeners = listenersToMap(newConfig.advertisedListeners)
val oldListeners = listenersToMap(oldConfig.listeners)
if (!newAdvertisedListeners.keySet.subsetOf(newListeners.keySet))
throw new ConfigException(s"Advertised listeners '$newAdvertisedListeners' must be a subset of listeners '$newListeners'")
if (newListeners.keySet != newConfig.listenerSecurityProtocolMap.keySet)
throw new ConfigException(s"Listeners '$newListeners' and listener map '${newConfig.listenerSecurityProtocolMap}' don't match")
newListeners.keySet.intersect(oldListeners.keySet).foreach { listenerName =>
val prefix = listenerName.configPrefix
val newListenerProps = immutableListenerConfigs(newConfig, prefix)
val oldListenerProps = immutableListenerConfigs(oldConfig, prefix)
if (newListenerProps != oldListenerProps)
throw new ConfigException(s"Configs cannot be updated dynamically for existing listener $listenerName, " +
"restart broker or create a new listener for update")
if (oldConfig.listenerSecurityProtocolMap(listenerName) != newConfig.listenerSecurityProtocolMap(listenerName))
throw new ConfigException(s"Security protocol cannot be updated for existing listener $listenerName")
}
if (!newAdvertisedListeners.contains(newConfig.interBrokerListenerName))
throw new ConfigException(s"Advertised listener must be specified for inter-broker listener ${newConfig.interBrokerListenerName}")
}
def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
val newListeners = newConfig.listeners
val newListenerMap = listenersToMap(newListeners)
val oldListeners = oldConfig.listeners
val oldListenerMap = listenersToMap(oldListeners)
val listenersRemoved = oldListeners.filterNot(e => newListenerMap.contains(e.listenerName))
val listenersAdded = newListeners.filterNot(e => oldListenerMap.contains(e.listenerName))
// Clear SASL login cache to force re-login
if (listenersAdded.nonEmpty || listenersRemoved.nonEmpty)
LoginManager.closeAll()
server.socketServer.removeListeners(listenersRemoved)
if (listenersAdded.nonEmpty)
server.socketServer.addListeners(listenersAdded)
server.zkClient.updateBrokerInfoInZk(server.createBrokerInfo)
}
private def listenersToMap(listeners: Seq[EndPoint]): Map[ListenerName, EndPoint] =
listeners.map(e => (e.listenerName, e)).toMap
}

View File

@ -1291,7 +1291,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleSaslHandshakeRequest(request: RequestChannel.Request) {
sendResponseMaybeThrottle(request, _ => new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, config.saslEnabledMechanisms))
sendResponseMaybeThrottle(request, _ => new SaslHandshakeResponse(Errors.ILLEGAL_SASL_STATE, Collections.emptySet()))
}
def handleSaslAuthenticateRequest(request: RequestChannel.Request) {

View File

@ -223,6 +223,11 @@ object Defaults {
val DelegationTokenMaxLifeTimeMsDefault = 7 * 24 * 60 * 60 * 1000L
val DelegationTokenExpiryTimeMsDefault = 24 * 60 * 60 * 1000L
val DelegationTokenExpiryCheckIntervalMsDefault = 1 * 60 * 60 * 1000L
/** ********* Password encryption configuration for dynamic configs *********/
val PasswordEncoderCipherAlgorithm = "AES/CBC/PKCS5Padding"
val PasswordEncoderKeyLength = 128
val PasswordEncoderIterations = 4096
}
object KafkaConfig {
@ -412,6 +417,7 @@ object KafkaConfig {
/** ********* SASL Configuration ****************/
val SaslMechanismInterBrokerProtocolProp = "sasl.mechanism.inter.broker.protocol"
val SaslJaasConfigProp = SaslConfigs.SASL_JAAS_CONFIG
val SaslEnabledMechanismsProp = BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG
val SaslKerberosServiceNameProp = SaslConfigs.SASL_KERBEROS_SERVICE_NAME
val SaslKerberosKinitCmdProp = SaslConfigs.SASL_KERBEROS_KINIT_CMD
@ -426,6 +432,14 @@ object KafkaConfig {
val DelegationTokenExpiryTimeMsProp = "delegation.token.expiry.time.ms"
val DelegationTokenExpiryCheckIntervalMsProp = "delegation.token.expiry.check.interval.ms"
/** ********* Password encryption configuration for dynamic configs *********/
val PasswordEncoderSecretProp = "password.encoder.secret"
val PasswordEncoderOldSecretProp = "password.encoder.old.secret"
val PasswordEncoderKeyFactoryAlgorithmProp = "password.encoder.keyfactory.algorithm"
val PasswordEncoderCipherAlgorithmProp = "password.encoder.cipher.algorithm"
val PasswordEncoderKeyLengthProp = "password.encoder.key.length"
val PasswordEncoderIterationsProp = "password.encoder.iterations"
/* Documentation */
/** ********* Zookeeper Configuration ***********/
val ZkConnectDoc = "Zookeeper host string"
@ -689,6 +703,7 @@ object KafkaConfig {
/** ********* Sasl Configuration ****************/
val SaslMechanismInterBrokerProtocolDoc = "SASL mechanism used for inter-broker communication. Default is GSSAPI."
val SaslJaasConfigDoc = SaslConfigs.SASL_JAAS_CONFIG_DOC
val SaslEnabledMechanismsDoc = SaslConfigs.SASL_ENABLED_MECHANISMS_DOC
val SaslKerberosServiceNameDoc = SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC
val SaslKerberosKinitCmdDoc = SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC
@ -704,6 +719,16 @@ object KafkaConfig {
val DelegationTokenExpiryTimeMsDoc = "The token validity time in seconds before the token needs to be renewed. Default value 1 day."
val DelegationTokenExpiryCheckIntervalDoc = "Scan interval to remove expired delegation tokens."
/** ********* Password encryption configuration for dynamic configs *********/
val PasswordEncoderSecretDoc = "The secret used for encoding dynamically configured passwords for this broker."
val PasswordEncoderOldSecretDoc = "The old secret that was used for encoding dynamically configured passwords. " +
"This is required only when the secret is updated. If specified, all dynamically encoded passwords are " +
s"decoded using this old secret and re-encoded using $PasswordEncoderSecretProp when broker starts up."
val PasswordEncoderKeyFactoryAlgorithmDoc = "The SecretKeyFactory algorithm used for encoding dynamically configured passwords. " +
"Default is PBKDF2WithHmacSHA512 if available and PBKDF2WithHmacSHA1 otherwise."
val PasswordEncoderCipherAlgorithmDoc = "The Cipher algorithm used for encoding dynamically configured passwords."
val PasswordEncoderKeyLengthDoc = "The key length used for encoding dynamically configured passwords."
val PasswordEncoderIterationsDoc = "The iteration count used for encoding dynamically configured passwords."
private val configDef = {
import ConfigDef.Importance._
@ -898,6 +923,7 @@ object KafkaConfig {
/** ********* Sasl Configuration ****************/
.define(SaslMechanismInterBrokerProtocolProp, STRING, Defaults.SaslMechanismInterBrokerProtocol, MEDIUM, SaslMechanismInterBrokerProtocolDoc)
.define(SaslJaasConfigProp, PASSWORD, null, MEDIUM, SaslJaasConfigDoc)
.define(SaslEnabledMechanismsProp, LIST, Defaults.SaslEnabledMechanisms, MEDIUM, SaslEnabledMechanismsDoc)
.define(SaslKerberosServiceNameProp, STRING, null, MEDIUM, SaslKerberosServiceNameDoc)
.define(SaslKerberosKinitCmdProp, STRING, Defaults.SaslKerberosKinitCmd, MEDIUM, SaslKerberosKinitCmdDoc)
@ -911,6 +937,13 @@ object KafkaConfig {
.define(DelegationTokenExpiryTimeMsProp, LONG, Defaults.DelegationTokenExpiryTimeMsDefault, atLeast(1), MEDIUM, DelegationTokenExpiryTimeMsDoc)
.define(DelegationTokenExpiryCheckIntervalMsProp, LONG, Defaults.DelegationTokenExpiryCheckIntervalMsDefault, atLeast(1), LOW, DelegationTokenExpiryCheckIntervalDoc)
/** ********* Password encryption configuration for dynamic configs *********/
.define(PasswordEncoderSecretProp, PASSWORD, null, MEDIUM, PasswordEncoderSecretDoc)
.define(PasswordEncoderOldSecretProp, PASSWORD, null, MEDIUM, PasswordEncoderOldSecretDoc)
.define(PasswordEncoderKeyFactoryAlgorithmProp, STRING, null, LOW, PasswordEncoderKeyFactoryAlgorithmDoc)
.define(PasswordEncoderCipherAlgorithmProp, STRING, Defaults.PasswordEncoderCipherAlgorithm, LOW, PasswordEncoderCipherAlgorithmDoc)
.define(PasswordEncoderKeyLengthProp, INT, Defaults.PasswordEncoderKeyLength, atLeast(8), LOW, PasswordEncoderKeyLengthDoc)
.define(PasswordEncoderIterationsProp, INT, Defaults.PasswordEncoderIterations, atLeast(1024), LOW, PasswordEncoderIterationsDoc)
}
def configNames() = configDef.names().asScala.toList.sorted
@ -942,9 +975,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
def this(props: java.util.Map[_, _]) = this(props, true, None)
def this(props: java.util.Map[_, _], doLog: Boolean) = this(props, doLog, None)
private[server] val dynamicConfig = dynamicConfigOverride.getOrElse(new DynamicBrokerConfig(this))
// Cache the current config to avoid acquiring read lock to access from dynamicConfig
@volatile private var currentConfig = this
private[server] val dynamicConfig = dynamicConfigOverride.getOrElse(new DynamicBrokerConfig(this))
private[server] def updateCurrentConfig(newConfig: KafkaConfig): Unit = {
this.currentConfig = newConfig
@ -1076,8 +1109,6 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
val leaderImbalanceCheckIntervalSeconds = getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp)
def uncleanLeaderElectionEnable: java.lang.Boolean = getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp)
val (interBrokerListenerName, interBrokerSecurityProtocol) = getInterBrokerListenerNameAndSecurityProtocol
// We keep the user-provided String as `ApiVersion.apply` can choose a slightly different version (eg if `0.10.0`
// is passed, `0.10.0-IV0` may be picked)
val interBrokerProtocolVersionString = getString(KafkaConfig.InterBrokerProtocolVersionProp)
@ -1120,32 +1151,21 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
val metricSampleWindowMs = getLong(KafkaConfig.MetricSampleWindowMsProp)
val metricRecordingLevel = getString(KafkaConfig.MetricRecordingLevelProp)
/** ********* SSL Configuration **************/
val principalBuilderClass = getClass(KafkaConfig.PrincipalBuilderClassProp)
val sslProtocol = getString(KafkaConfig.SslProtocolProp)
val sslProvider = getString(KafkaConfig.SslProviderProp)
val sslEnabledProtocols = getList(KafkaConfig.SslEnabledProtocolsProp)
def sslKeystoreType = getString(KafkaConfig.SslKeystoreTypeProp)
def sslKeystoreLocation = getString(KafkaConfig.SslKeystoreLocationProp)
def sslKeystorePassword = getPassword(KafkaConfig.SslKeystorePasswordProp)
def sslKeyPassword = getPassword(KafkaConfig.SslKeyPasswordProp)
val sslTruststoreType = getString(KafkaConfig.SslTruststoreTypeProp)
val sslTruststoreLocation = getString(KafkaConfig.SslTruststoreLocationProp)
val sslTruststorePassword = getPassword(KafkaConfig.SslTruststorePasswordProp)
val sslKeyManagerAlgorithm = getString(KafkaConfig.SslKeyManagerAlgorithmProp)
val sslTrustManagerAlgorithm = getString(KafkaConfig.SslTrustManagerAlgorithmProp)
val sslClientAuth = getString(KafkaConfig.SslClientAuthProp)
val sslCipher = getList(KafkaConfig.SslCipherSuitesProp)
/** ********* SSL/SASL Configuration **************/
// Security configs may be overridden for listeners, so it is not safe to use the base values
// Hence the base SSL/SASL configs are not fields of KafkaConfig, listener configs should be
// retrieved using KafkaConfig#valuesWithPrefixOverride
private def saslEnabledMechanisms(listenerName: ListenerName): Set[String] = {
val value = valuesWithPrefixOverride(listenerName.configPrefix).get(KafkaConfig.SaslEnabledMechanismsProp)
if (value != null)
value.asInstanceOf[util.List[String]].asScala.toSet
else
Set.empty[String]
}
/** ********* Sasl Configuration **************/
val saslMechanismInterBrokerProtocol = getString(KafkaConfig.SaslMechanismInterBrokerProtocolProp)
val saslEnabledMechanisms = getList(KafkaConfig.SaslEnabledMechanismsProp)
val saslKerberosServiceName = getString(KafkaConfig.SaslKerberosServiceNameProp)
val saslKerberosKinitCmd = getString(KafkaConfig.SaslKerberosKinitCmdProp)
val saslKerberosTicketRenewWindowFactor = getDouble(KafkaConfig.SaslKerberosTicketRenewWindowFactorProp)
val saslKerberosTicketRenewJitter = getDouble(KafkaConfig.SaslKerberosTicketRenewJitterProp)
val saslKerberosMinTimeBeforeRelogin = getLong(KafkaConfig.SaslKerberosMinTimeBeforeReloginProp)
val saslKerberosPrincipalToLocalRules = getList(KafkaConfig.SaslKerberosPrincipalToLocalRulesProp)
def interBrokerListenerName = getInterBrokerListenerNameAndSecurityProtocol._1
def interBrokerSecurityProtocol = getInterBrokerListenerNameAndSecurityProtocol._2
def saslMechanismInterBrokerProtocol = getString(KafkaConfig.SaslMechanismInterBrokerProtocolProp)
val saslInterBrokerHandshakeRequestEnable = interBrokerProtocolVersion >= KAFKA_0_10_0_IV1
/** ********* DelegationToken Configuration **************/
@ -1155,6 +1175,14 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
val delegationTokenExpiryTimeMs = getLong(KafkaConfig.DelegationTokenExpiryTimeMsProp)
val delegationTokenExpiryCheckIntervalMs = getLong(KafkaConfig.DelegationTokenExpiryCheckIntervalMsProp)
/** ********* Password encryption configuration for dynamic configs *********/
def passwordEncoderSecret = Option(getPassword(KafkaConfig.PasswordEncoderSecretProp))
def passwordEncoderOldSecret = Option(getPassword(KafkaConfig.PasswordEncoderOldSecretProp))
def passwordEncoderCipherAlgorithm = getString(KafkaConfig.PasswordEncoderCipherAlgorithmProp)
def passwordEncoderKeyFactoryAlgorithm = Option(getString(KafkaConfig.PasswordEncoderKeyFactoryAlgorithmProp))
def passwordEncoderKeyLength = getInt(KafkaConfig.PasswordEncoderKeyLengthProp)
def passwordEncoderIterations = getInt(KafkaConfig.PasswordEncoderIterationsProp)
/** ********* Quota Configuration **************/
val producerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp)
val consumerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp)
@ -1170,9 +1198,6 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
val deleteTopicEnable = getBoolean(KafkaConfig.DeleteTopicEnableProp)
def compressionType = getString(KafkaConfig.CompressionTypeProp)
val listeners: Seq[EndPoint] = getListeners
val advertisedListeners: Seq[EndPoint] = getAdvertisedListeners
private[kafka] lazy val listenerSecurityProtocolMap = getListenerSecurityProtocolMap
def addReconfigurable(reconfigurable: Reconfigurable): Unit = {
dynamicConfig.addReconfigurable(reconfigurable)
@ -1203,7 +1228,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
// If the user did not define listeners but did define host or port, let's use them in backward compatible way
// If none of those are defined, we default to PLAINTEXT://:9092
private def getListeners: Seq[EndPoint] = {
def listeners: Seq[EndPoint] = {
Option(getString(KafkaConfig.ListenersProp)).map { listenerProp =>
CoreUtils.listenerListToEndPoints(listenerProp, listenerSecurityProtocolMap)
}.getOrElse(CoreUtils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port, listenerSecurityProtocolMap))
@ -1212,14 +1237,14 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
// If the user defined advertised listeners, we use those
// If he didn't but did define advertised host or port, we'll use those and fill in the missing value from regular host / port or defaults
// If none of these are defined, we'll use the listeners
private def getAdvertisedListeners: Seq[EndPoint] = {
def advertisedListeners: Seq[EndPoint] = {
val advertisedListenersProp = getString(KafkaConfig.AdvertisedListenersProp)
if (advertisedListenersProp != null)
CoreUtils.listenerListToEndPoints(advertisedListenersProp, listenerSecurityProtocolMap)
else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null)
CoreUtils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName + ":" + advertisedPort, listenerSecurityProtocolMap)
else
getListeners
listeners
}
private def getInterBrokerListenerNameAndSecurityProtocol: (ListenerName, SecurityProtocol) = {
@ -1248,7 +1273,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
}
}
private def getListenerSecurityProtocolMap: Map[ListenerName, SecurityProtocol] = {
def listenerSecurityProtocolMap: Map[ListenerName, SecurityProtocol] = {
getMap(KafkaConfig.ListenerSecurityProtocolMapProp, getString(KafkaConfig.ListenerSecurityProtocolMapProp))
.map { case (listenerName, protocolName) =>
ListenerName.normalised(listenerName) -> getSecurityProtocol(protocolName, KafkaConfig.ListenerSecurityProtocolMapProp)
@ -1295,7 +1320,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
val interBrokerUsesSasl = interBrokerSecurityProtocol == SecurityProtocol.SASL_PLAINTEXT || interBrokerSecurityProtocol == SecurityProtocol.SASL_SSL
require(!interBrokerUsesSasl || saslInterBrokerHandshakeRequestEnable || saslMechanismInterBrokerProtocol == SaslConfigs.GSSAPI_MECHANISM,
s"Only GSSAPI mechanism is supported for inter-broker communication with SASL when inter.broker.protocol.version is set to $interBrokerProtocolVersionString")
require(!interBrokerUsesSasl || saslEnabledMechanisms.contains(saslMechanismInterBrokerProtocol),
require(!interBrokerUsesSasl || saslEnabledMechanisms(interBrokerListenerName).contains(saslMechanismInterBrokerProtocol),
s"${KafkaConfig.SaslMechanismInterBrokerProtocolProp} must be included in ${KafkaConfig.SaslEnabledMechanismsProp} when SASL is used for inter-broker communication")
require(queuedMaxBytes <= 0 || queuedMaxBytes >= socketRequestMaxBytes,
s"${KafkaConfig.QueuedMaxBytesProp} must be larger or equal to ${KafkaConfig.SocketRequestMaxBytesProp}")

View File

@ -44,6 +44,7 @@ import org.apache.kafka.common.network._
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.scram.ScramMechanism
import org.apache.kafka.common.security.token.delegation.DelegationTokenCache
import org.apache.kafka.common.security.{JaasContext, JaasUtils}
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
@ -236,8 +237,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
logManager.startup()
metadataCache = new MetadataCache(config.brokerId)
tokenCache = new DelegationTokenCache(config.saslEnabledMechanisms)
credentialProvider = new CredentialProvider(config.saslEnabledMechanisms, tokenCache)
// Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
// This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
socketServer = new SocketServer(config, metrics, time, credentialProvider)
socketServer.startup()
@ -366,7 +369,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
zkClient.getClusterId.getOrElse(zkClient.createOrGetClusterId(CoreUtils.generateUuidAsBase64))
}
private def createBrokerInfo: BrokerInfo = {
private[server] def createBrokerInfo: BrokerInfo = {
val listeners = config.advertisedListeners.map { endpoint =>
if (endpoint.port == 0)
endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))

View File

@ -0,0 +1,175 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils
import java.nio.charset.StandardCharsets
import java.security.{AlgorithmParameters, NoSuchAlgorithmException, SecureRandom}
import java.security.spec.AlgorithmParameterSpec
import javax.crypto.{Cipher, SecretKeyFactory}
import javax.crypto.spec._
import kafka.utils.PasswordEncoder._
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.utils.Base64
import scala.collection.Map
object PasswordEncoder {
val KeyFactoryAlgorithmProp = "keyFactoryAlgorithm"
val CipherAlgorithmProp = "cipherAlgorithm"
val InitializationVectorProp = "initializationVector"
val KeyLengthProp = "keyLength"
val SaltProp = "salt"
val IterationsProp = "iterations"
val EncyrptedPasswordProp = "encryptedPassword"
val PasswordLengthProp = "passwordLength"
}
/**
* Password encoder and decoder implementation. Encoded passwords are persisted as a CSV map
* containing the encoded password in base64 and along with the properties used for encryption.
*
* @param secret The secret used for encoding and decoding
* @param keyFactoryAlgorithm Key factory algorithm if configured. By default, PBKDF2WithHmacSHA512 is
* used if available, PBKDF2WithHmacSHA1 otherwise.
* @param cipherAlgorithm Cipher algorithm used for encoding.
* @param keyLength Key length used for encoding. This should be valid for the specified algorithms.
* @param iterations Iteration count used for encoding.
*
* The provided `keyFactoryAlgorithm`, 'cipherAlgorithm`, `keyLength` and `iterations` are used for encoding passwords.
* The values used for encoding are stored along with the encoded password and the stored values are used for decoding.
*
*/
class PasswordEncoder(secret: Password,
keyFactoryAlgorithm: Option[String],
cipherAlgorithm: String,
keyLength: Int,
iterations: Int) extends Logging {
private val secureRandom = new SecureRandom
private val cipherParamsEncoder = cipherParamsInstance(cipherAlgorithm)
def encode(password: Password): String = {
val salt = new Array[Byte](256)
secureRandom.nextBytes(salt)
val cipher = Cipher.getInstance(cipherAlgorithm)
val keyFactory = secretKeyFactory(keyFactoryAlgorithm)
val keySpec = secretKeySpec(keyFactory, cipherAlgorithm, keyLength, salt, iterations)
cipher.init(Cipher.ENCRYPT_MODE, keySpec)
val encryptedPassword = cipher.doFinal(password.value.getBytes(StandardCharsets.UTF_8))
val encryptedMap = Map(
KeyFactoryAlgorithmProp -> keyFactory.getAlgorithm,
CipherAlgorithmProp -> cipherAlgorithm,
KeyLengthProp -> keyLength,
SaltProp -> base64Encode(salt),
IterationsProp -> iterations.toString,
EncyrptedPasswordProp -> base64Encode(encryptedPassword),
PasswordLengthProp -> password.value.length
) ++ cipherParamsEncoder.toMap(cipher.getParameters)
encryptedMap.map { case (k, v) => s"$k:$v" }.mkString(",")
}
def decode(encodedPassword: String): Password = {
val params = CoreUtils.parseCsvMap(encodedPassword)
val keyFactoryAlg = params(KeyFactoryAlgorithmProp)
val cipherAlg = params(CipherAlgorithmProp)
val keyLength = params(KeyLengthProp).toInt
val salt = base64Decode(params(SaltProp))
val iterations = params(IterationsProp).toInt
val encryptedPassword = base64Decode(params(EncyrptedPasswordProp))
val passwordLengthProp = params(PasswordLengthProp).toInt
val cipher = Cipher.getInstance(cipherAlg)
val keyFactory = secretKeyFactory(Some(keyFactoryAlg))
val keySpec = secretKeySpec(keyFactory, cipherAlg, keyLength, salt, iterations)
cipher.init(Cipher.DECRYPT_MODE, keySpec, cipherParamsEncoder.toParameterSpec(params))
val password = try {
val decrypted = cipher.doFinal(encryptedPassword)
new String(decrypted, StandardCharsets.UTF_8)
} catch {
case e: Exception => throw new ConfigException("Password could not be decoded", e)
}
if (password.length != passwordLengthProp) // Sanity check
throw new ConfigException("Password could not be decoded, sanity check of length failed")
new Password(password)
}
private def secretKeyFactory(keyFactoryAlg: Option[String]): SecretKeyFactory = {
keyFactoryAlg match {
case Some(algorithm) => SecretKeyFactory.getInstance(algorithm)
case None =>
try {
SecretKeyFactory.getInstance("PBKDF2WithHmacSHA512")
} catch {
case _: NoSuchAlgorithmException => SecretKeyFactory.getInstance("PBKDF2WithHmacSHA1")
}
}
}
private def secretKeySpec(keyFactory: SecretKeyFactory,
cipherAlg: String,
keyLength: Int,
salt: Array[Byte], iterations: Int): SecretKeySpec = {
val keySpec = new PBEKeySpec(secret.value.toCharArray, salt, iterations, keyLength)
val algorithm = if (cipherAlg.indexOf('/') > 0) cipherAlg.substring(0, cipherAlg.indexOf('/')) else cipherAlg
new SecretKeySpec(keyFactory.generateSecret(keySpec).getEncoded, algorithm)
}
private def base64Encode(bytes: Array[Byte]): String = Base64.encoder.encodeToString(bytes)
private[utils] def base64Decode(encoded: String): Array[Byte] = Base64.decoder.decode(encoded)
private def cipherParamsInstance(cipherAlgorithm: String): CipherParamsEncoder = {
val aesPattern = "AES/(.*)/.*".r
cipherAlgorithm match {
case aesPattern("GCM") => new GcmParamsEncoder
case _ => new IvParamsEncoder
}
}
private trait CipherParamsEncoder {
def toMap(cipher: AlgorithmParameters): Map[String, String]
def toParameterSpec(paramMap: Map[String, String]): AlgorithmParameterSpec
}
private class IvParamsEncoder extends CipherParamsEncoder {
def toMap(cipherParams: AlgorithmParameters): Map[String, String] = {
if (cipherParams != null) {
val ivSpec = cipherParams.getParameterSpec(classOf[IvParameterSpec])
Map(InitializationVectorProp -> base64Encode(ivSpec.getIV))
} else
throw new IllegalStateException("Could not determine initialization vector for cipher")
}
def toParameterSpec(paramMap: Map[String, String]): AlgorithmParameterSpec = {
new IvParameterSpec(base64Decode(paramMap(InitializationVectorProp)))
}
}
private class GcmParamsEncoder extends CipherParamsEncoder {
def toMap(cipherParams: AlgorithmParameters): Map[String, String] = {
if (cipherParams != null) {
val spec = cipherParams.getParameterSpec(classOf[GCMParameterSpec])
Map(InitializationVectorProp -> base64Encode(spec.getIV),
"authenticationTagLength" -> spec.getTLen.toString)
} else
throw new IllegalStateException("Could not determine initialization vector for cipher")
}
def toParameterSpec(paramMap: Map[String, String]): AlgorithmParameterSpec = {
new GCMParameterSpec(paramMap("authenticationTagLength").toInt, base64Decode(paramMap(InitializationVectorProp)))
}
}
}

View File

@ -82,6 +82,13 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
info(s"Registered broker ${brokerInfo.broker.id} at path $path with addresses: ${brokerInfo.broker.endPoints}")
}
def updateBrokerInfoInZk(brokerInfo: BrokerInfo): Unit = {
val brokerIdPath = brokerInfo.path
val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, ZkVersion.NoVersion)
retryRequestUntilConnected(setDataRequest)
info("Updated broker %d at path %s with addresses: %s".format(brokerInfo.broker.id, brokerIdPath, brokerInfo.broker.endPoints))
}
/**
* Gets topic partition states for the given partitions.
* @param partitions the partitions for which we want ot get states.

View File

@ -430,13 +430,13 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
assertEquals(KafkaConfig.ListenerSecurityProtocolMapProp, listenerSecurityProtocolMap.name)
assertFalse(listenerSecurityProtocolMap.isDefault)
assertFalse(listenerSecurityProtocolMap.isSensitive)
assertTrue(listenerSecurityProtocolMap.isReadOnly)
assertFalse(listenerSecurityProtocolMap.isReadOnly)
val truststorePassword = configs.get(brokerResource1).get(KafkaConfig.SslTruststorePasswordProp)
assertEquals(KafkaConfig.SslTruststorePasswordProp, truststorePassword.name)
assertNull(truststorePassword.value)
assertFalse(truststorePassword.isDefault)
assertTrue(truststorePassword.isSensitive)
assertTrue(truststorePassword.isReadOnly)
assertFalse(truststorePassword.isReadOnly)
val compressionType = configs.get(brokerResource1).get(KafkaConfig.CompressionTypeProp)
assertEquals(servers(1).config.compressionType.toString, compressionType.value)
assertEquals(KafkaConfig.CompressionTypeProp, compressionType.name)

View File

@ -59,12 +59,7 @@ trait SaslSetup {
case _ => false
})
if (hasKerberos) {
val (serverKeytabFile, clientKeytabFile) = maybeCreateEmptyKeytabFiles()
kdc = new MiniKdc(kdcConf, workDir)
kdc.start()
kdc.createPrincipal(serverKeytabFile, JaasTestUtils.KafkaServerPrincipalUnqualifiedName + "/localhost")
kdc.createPrincipal(clientKeytabFile,
JaasTestUtils.KafkaClientPrincipalUnqualifiedName, JaasTestUtils.KafkaClientPrincipalUnqualifiedName2)
initializeKerberos()
}
writeJaasConfigurationToFile(jaasSections)
val hasZk = jaasSections.exists(_.modules.exists {
@ -75,6 +70,15 @@ trait SaslSetup {
System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider")
}
protected def initializeKerberos(): Unit = {
val (serverKeytabFile, clientKeytabFile) = maybeCreateEmptyKeytabFiles()
kdc = new MiniKdc(kdcConf, workDir)
kdc.start()
kdc.createPrincipal(serverKeytabFile, JaasTestUtils.KafkaServerPrincipalUnqualifiedName + "/localhost")
kdc.createPrincipal(clientKeytabFile,
JaasTestUtils.KafkaClientPrincipalUnqualifiedName, JaasTestUtils.KafkaClientPrincipalUnqualifiedName2)
}
/** Return a tuple with the path to the server keytab file and client keytab file */
protected def maybeCreateEmptyKeytabFiles(): (File, File) = {
if (serverKeytabFile.isEmpty)

View File

@ -18,28 +18,29 @@
package kafka.server
import java.io.{Closeable, File, FileOutputStream, FileWriter}
import java.io.{Closeable, File, FileWriter}
import java.nio.file.{Files, StandardCopyOption}
import java.lang.management.ManagementFactory
import java.util
import java.util.{Collections, Properties}
import java.util.concurrent.{ConcurrentLinkedQueue, ExecutionException, TimeUnit}
import java.util.concurrent._
import javax.management.ObjectName
import kafka.admin.ConfigCommand
import kafka.api.SaslSetup
import kafka.log.LogConfig
import kafka.api.{KafkaSasl, SaslSetup}
import kafka.coordinator.group.OffsetConfig
import kafka.log.LogConfig
import kafka.message.ProducerCompressionCodec
import kafka.utils.{ShutdownableThread, TestUtils}
import kafka.utils._
import kafka.utils.Implicits._
import kafka.zk.{ConfigEntityChangeNotificationZNode, ZooKeeperTestHarness}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.ConfigEntry.{ConfigSource, ConfigSynonym}
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, Reconfigurable, TopicPartition}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.{ConfigException, ConfigResource, SslConfigs}
import org.apache.kafka.common.config.SslConfigs._
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.errors.{AuthenticationException, InvalidRequestException}
@ -55,6 +56,7 @@ import org.junit.{After, Before, Test}
import scala.collection._
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConverters._
import scala.collection.Seq
object DynamicBrokerReconfigurationTest {
val SecureInternal = "INTERNAL"
@ -65,12 +67,13 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
import DynamicBrokerReconfigurationTest._
private var servers = new ArrayBuffer[KafkaServer]
private val servers = new ArrayBuffer[KafkaServer]
private val numServers = 3
private val producers = new ArrayBuffer[KafkaProducer[String, String]]
private val consumers = new ArrayBuffer[KafkaConsumer[String, String]]
private val adminClients = new ArrayBuffer[AdminClient]()
private val clientThreads = new ArrayBuffer[ShutdownableThread]()
private val executors = new ArrayBuffer[ExecutorService]
private val topic = "testtopic"
private val kafkaClientSaslMechanism = "PLAIN"
@ -95,10 +98,13 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
props.put(KafkaConfig.ListenersProp, s"$SecureInternal://localhost:0, $SecureExternal://localhost:0")
props.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"$SecureInternal:SSL, $SecureExternal:SASL_SSL")
props.put(KafkaConfig.InterBrokerListenerNameProp, SecureInternal)
props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, "PLAIN")
props.put(KafkaConfig.ZkEnableSecureAclsProp, "true")
props.put(KafkaConfig.SaslEnabledMechanismsProp, kafkaServerSaslMechanisms.mkString(","))
props.put(KafkaConfig.LogSegmentBytesProp, "2000")
props.put(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, "10000000")
props.put(KafkaConfig.PasswordEncoderSecretProp, "dynamic-config-secret")
props.put(KafkaConfig.PasswordEncoderOldSecretProp, "old-dynamic-config-secret")
props ++= sslProperties1
addKeystoreWithListenerPrefix(sslProperties1, props, SecureInternal)
@ -127,8 +133,9 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
clientThreads.foreach(_.interrupt())
clientThreads.foreach(_.initiateShutdown())
clientThreads.foreach(_.join(5 * 1000))
producers.foreach(_.close())
consumers.foreach(_.close())
executors.foreach(_.shutdownNow())
producers.foreach(_.close(0, TimeUnit.MILLISECONDS))
consumers.foreach(_.close(0, TimeUnit.MILLISECONDS))
adminClients.foreach(_.close())
TestUtils.shutdownServers(servers)
super.tearDown()
@ -514,13 +521,217 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
stopAndVerifyProduceConsume(producerThread, consumerThread)
}
@Test
def testAdvertisedListenerUpdate(): Unit = {
val adminClient = adminClients.head
val externalAdminClient = createAdminClient(SecurityProtocol.SASL_SSL, SecureExternal)
// Ensure connections are made to brokers before external listener is made inaccessible
describeConfig(externalAdminClient)
// Update broker keystore for external listener to use invalid listener address
// any address other than localhost is sufficient to fail (either connection or host name verification failure)
val invalidHost = "192.168.0.1"
alterAdvertisedListener(adminClient, externalAdminClient, "localhost", invalidHost)
// Verify that producer connections fail since advertised listener is invalid
val bootstrap = bootstrapServers.replaceAll(invalidHost, "localhost") // allow bootstrap connection to succeed
val producer1 = createProducer(trustStoreFile1, retries = 0, bootstrap = bootstrap)
val sendFuture = verifyConnectionFailure(producer1)
alterAdvertisedListener(adminClient, externalAdminClient, invalidHost, "localhost")
// Verify that produce/consume work now
val producer = createProducer(trustStoreFile1, retries = 0)
val consumer = createConsumer("group2", trustStoreFile1, topic)
verifyProduceConsume(producer, consumer, 10, topic)
// Verify updating inter-broker listener
val props = new Properties
props.put(KafkaConfig.InterBrokerListenerNameProp, SecureExternal)
try {
reconfigureServers(props, perBrokerConfig = true, (KafkaConfig.InterBrokerListenerNameProp, SecureExternal))
fail("Inter-broker listener cannot be dynamically updated")
} catch {
case e: ExecutionException =>
assertTrue(s"Unexpected exception ${e.getCause}", e.getCause.isInstanceOf[InvalidRequestException])
servers.foreach(server => assertEquals(SecureInternal, server.config.interBrokerListenerName.value))
}
// Verify that the other send did not complete
verifyTimeout(sendFuture)
}
@Test
def testAddRemoveSslListener(): Unit = {
verifyAddListener("SSL", SecurityProtocol.SSL, Seq.empty)
// Restart servers and check secret rotation
servers.foreach(_.shutdown())
servers.foreach(_.awaitShutdown())
adminClients.foreach(_.close())
adminClients.clear()
// All passwords are currently encoded with password.encoder.secret. Encode with password.encoder.old.secret
// and update ZK. When each server is started, it should decode using password.encoder.old.secret and update
// ZK with newly encoded values using password.encoder.secret.
servers.foreach { server =>
val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, server.config.brokerId.toString)
val config = server.config
val secret = config.passwordEncoderSecret.getOrElse(throw new IllegalStateException("Password encoder secret not configured"))
val oldSecret = config.passwordEncoderOldSecret.getOrElse(throw new IllegalStateException("Password encoder old secret not configured"))
val passwordConfigs = props.asScala.filterKeys(DynamicBrokerConfig.DynamicPasswordConfigs.contains)
val passwordDecoder = new PasswordEncoder(secret,
config.passwordEncoderKeyFactoryAlgorithm,
config.passwordEncoderCipherAlgorithm,
config.passwordEncoderKeyLength,
config.passwordEncoderIterations)
val passwordEncoder = new PasswordEncoder(oldSecret,
config.passwordEncoderKeyFactoryAlgorithm,
config.passwordEncoderCipherAlgorithm,
config.passwordEncoderKeyLength,
config.passwordEncoderIterations)
passwordConfigs.foreach { case (name, value) =>
val decoded = passwordDecoder.decode(value).value
props.put(name, passwordEncoder.encode(new Password(decoded)))
}
val brokerId = server.config.brokerId
adminZkClient.changeBrokerConfig(Seq(brokerId), props)
val updatedProps = adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerId.toString)
passwordConfigs.foreach { case (name, value) => assertNotEquals(value, updatedProps.get(name)) }
server.startup()
TestUtils.retry(10000) {
val newProps = adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerId.toString)
passwordConfigs.foreach { case (name, value) => assertEquals(value, newProps.get(name)) }
}
}
verifyListener(SecurityProtocol.SSL, None)
createAdminClient(SecurityProtocol.SSL, SecureInternal)
verifyRemoveListener("SSL", SecurityProtocol.SSL, Seq.empty)
}
@Test
def testAddRemoveSaslListeners(): Unit = {
createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword)
createScramCredentials(zkConnect, JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword)
initializeKerberos()
//verifyAddListener("SASL_SSL", SecurityProtocol.SASL_SSL, Seq("SCRAM-SHA-512", "SCRAM-SHA-256", "PLAIN"))
verifyAddListener("SASL_PLAINTEXT", SecurityProtocol.SASL_PLAINTEXT, Seq("GSSAPI"))
//verifyRemoveListener("SASL_SSL", SecurityProtocol.SASL_SSL, Seq("SCRAM-SHA-512", "SCRAM-SHA-256", "PLAIN"))
verifyRemoveListener("SASL_PLAINTEXT", SecurityProtocol.SASL_PLAINTEXT, Seq("GSSAPI"))
}
private def verifyAddListener(listenerName: String, securityProtocol: SecurityProtocol,
saslMechanisms: Seq[String]): Unit = {
val config = servers.head.config
val existingListenerCount = config.listeners.size
val listeners = config.listeners
.map(e => s"${e.listenerName.value}://${e.host}:${e.port}")
.mkString(",") + s",$listenerName://localhost:0"
val listenerMap = config.listenerSecurityProtocolMap
.map { case (name, protocol) => s"${name.value}:${protocol.name}" }
.mkString(",") + s",$listenerName:${securityProtocol.name}"
val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, config.brokerId.toString)
props.put(KafkaConfig.ListenersProp, listeners)
props.put(KafkaConfig.ListenerSecurityProtocolMapProp, listenerMap)
securityProtocol match {
case SecurityProtocol.SSL =>
addListenerPropsSsl(listenerName, props)
case SecurityProtocol.SASL_PLAINTEXT =>
addListenerPropsSasl(listenerName, saslMechanisms, props)
case SecurityProtocol.SASL_SSL =>
addListenerPropsSasl(listenerName, saslMechanisms, props)
addListenerPropsSsl(listenerName, props)
case SecurityProtocol.PLAINTEXT => // no additional props
}
alterConfigs(adminClients.head, props, perBrokerConfig = true).all.get
TestUtils.waitUntilTrue(() => servers.forall(server => server.config.listeners.size == existingListenerCount + 1),
"Listener config not updated")
TestUtils.waitUntilTrue(() => servers.forall(server => {
try {
server.socketServer.boundPort(new ListenerName(listenerName)) > 0
} catch {
case _: Exception => false
}
}), "Listener not created")
if (saslMechanisms.nonEmpty)
saslMechanisms.foreach(mechanism => verifyListener(securityProtocol, Some(mechanism)))
else
verifyListener(securityProtocol, None)
}
private def verifyRemoveListener(listenerName: String, securityProtocol: SecurityProtocol,
saslMechanisms: Seq[String]): Unit = {
val saslMechanism = if (saslMechanisms.isEmpty) "" else saslMechanisms.head
val producer1 = createProducer(listenerName, securityProtocol, saslMechanism)
val consumer1 = createConsumer(listenerName, securityProtocol, saslMechanism,
s"remove-listener-group-$securityProtocol")
verifyProduceConsume(producer1, consumer1, numRecords = 10, topic)
// send another message to check consumer later
producer1.send(new ProducerRecord(topic, "key", "value")).get(100, TimeUnit.MILLISECONDS)
val config = servers.head.config
val existingListenerCount = config.listeners.size
val listeners = config.listeners
.filter(e => e.listenerName.value != securityProtocol.name)
.map(e => s"${e.listenerName.value}://${e.host}:${e.port}")
.mkString(",")
val listenerMap = config.listenerSecurityProtocolMap
.filterKeys(listenerName => listenerName.value != securityProtocol.name)
.map { case (listenerName, protocol) => s"${listenerName.value}:${protocol.name}" }
.mkString(",")
val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, config.brokerId.toString)
val listenerProps = props.asScala.keySet.filter(_.startsWith(new ListenerName(listenerName).configPrefix))
listenerProps.foreach(props.remove)
props.put(KafkaConfig.ListenersProp, listeners)
props.put(KafkaConfig.ListenerSecurityProtocolMapProp, listenerMap)
alterConfigs(adminClients.head, props, perBrokerConfig = true).all.get
TestUtils.waitUntilTrue(() => servers.forall(server => server.config.listeners.size == existingListenerCount - 1),
"Listeners not updated")
// Test that connections using deleted listener don't work
val producerFuture = verifyConnectionFailure(producer1)
val consumerFuture = verifyConnectionFailure(consumer1)
// Test that other listeners still work
val producer2 = createProducer(trustStoreFile1, retries = 0)
val consumer2 = createConsumer(s"remove-listener-group2-$securityProtocol", trustStoreFile1, topic, autoOffsetReset = "latest")
verifyProduceConsume(producer2, consumer2, numRecords = 10, topic)
// Verify that producer/consumer using old listener don't work
verifyTimeout(producerFuture)
verifyTimeout(consumerFuture)
}
private def verifyListener(securityProtocol: SecurityProtocol, saslMechanism: Option[String]): Unit = {
val mechanism = saslMechanism.getOrElse("")
val producer = createProducer(securityProtocol.name, securityProtocol, mechanism)
val consumer = createConsumer(securityProtocol.name, securityProtocol, mechanism,
s"add-listener-group-$securityProtocol-$mechanism")
verifyProduceConsume(producer, consumer, numRecords = 10, topic)
}
private def bootstrapServers: String = TestUtils.bootstrapServers(servers, new ListenerName(SecureExternal))
private def createProducer(trustStore: File, retries: Int,
clientId: String = "test-producer"): KafkaProducer[String, String] = {
val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(SecureExternal))
clientId: String = "test-producer",
bootstrap: String = bootstrapServers,
securityProtocol: SecurityProtocol = SecurityProtocol.SASL_SSL): KafkaProducer[String, String] = {
val propsOverride = new Properties
propsOverride.put(ProducerConfig.CLIENT_ID_CONFIG, clientId)
propsOverride.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS")
val producer = TestUtils.createNewProducer(
bootstrapServers,
bootstrap,
acks = -1,
retries = retries,
securityProtocol = SecurityProtocol.SASL_SSL,
@ -533,17 +744,72 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
producer
}
private def createConsumer(groupId: String, trustStore: File, topic: String = topic):KafkaConsumer[String, String] = {
val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(SecureExternal))
private def createConsumer(groupId: String, trustStore: File,
topic: String = topic,
bootstrap: String = bootstrapServers,
securityProtocol: SecurityProtocol = SecurityProtocol.SASL_SSL,
autoOffsetReset: String = "earliest"):KafkaConsumer[String, String] = {
val propsOverride = new Properties
propsOverride.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS")
val consumer = TestUtils.createNewConsumer(
bootstrapServers,
bootstrap,
groupId,
securityProtocol = SecurityProtocol.SASL_SSL,
autoOffsetReset = autoOffsetReset,
securityProtocol = securityProtocol,
trustStoreFile = Some(trustStore),
saslProperties = Some(clientSaslProps),
keyDeserializer = new StringDeserializer,
valueDeserializer = new StringDeserializer)
consumer.subscribe(Collections.singleton(topic))
if (autoOffsetReset == "latest") {
do {
consumer.poll(1)
} while (consumer.assignment.isEmpty)
}
consumers += consumer
consumer
}
private def clientProps(securityProtocol: SecurityProtocol, saslMechanism: String): Properties = {
val props = new Properties
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name)
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "HTTPS")
val saslProps = if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL) {
Some(kafkaClientSaslProperties(saslMechanism, dynamicJaasConfig = true))
} else
None
val securityProps = TestUtils.securityConfigs(Mode.CLIENT, securityProtocol,
Some(trustStoreFile1), "client", TestUtils.SslCertificateCn, saslProps)
props ++= securityProps
props
}
private def createProducer(listenerName: String, securityProtocol: SecurityProtocol,
saslMechanism: String): KafkaProducer[String, String] = {
val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(listenerName))
val producer = TestUtils.createNewProducer(bootstrapServers,
acks = -1, retries = 0,
securityProtocol = securityProtocol,
keySerializer = new StringSerializer,
valueSerializer = new StringSerializer,
props = Some(clientProps(securityProtocol, saslMechanism)))
producers += producer
producer
}
private def createConsumer(listenerName: String, securityProtocol: SecurityProtocol,
saslMechanism: String, group: String): KafkaConsumer[String, String] = {
val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(listenerName))
val consumer = TestUtils.createNewConsumer(bootstrapServers, group,
autoOffsetReset = "latest",
securityProtocol = securityProtocol,
keyDeserializer = new StringDeserializer,
valueDeserializer = new StringDeserializer,
props = Some(clientProps(securityProtocol, saslMechanism)))
consumer.subscribe(Collections.singleton(topic))
do {
consumer.poll(1)
} while (consumer.assignment.isEmpty)
consumers += consumer
consumer
}
@ -552,6 +818,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
val config = new util.HashMap[String, Object]
val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(listenerName))
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
config.put(AdminClientConfig.METADATA_MAX_AGE_CONFIG, "10")
val securityProps: util.Map[Object, Object] =
TestUtils.adminClientSecurityConfigs(securityProtocol, Some(trustStoreFile1), Some(clientSaslProps))
securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) }
@ -563,7 +830,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
private def verifyProduceConsume(producer: KafkaProducer[String, String],
consumer: KafkaConsumer[String, String],
numRecords: Int,
topic: String = topic): Unit = {
topic: String): Unit = {
val producerRecords = (1 to numRecords).map(i => new ProducerRecord(topic, s"key$i", s"value$i"))
producerRecords.map(producer.send).map(_.get(10, TimeUnit.SECONDS))
@ -639,6 +906,39 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
waitForConfig(s"$configPrefix$SSL_KEYSTORE_LOCATION_CONFIG", props.getProperty(SSL_KEYSTORE_LOCATION_CONFIG))
}
private def serverEndpoints(adminClient: AdminClient): String = {
val nodes = adminClient.describeCluster().nodes().get
nodes.asScala.map { node =>
s"${node.host}:${node.port}"
}.mkString(",")
}
private def alterAdvertisedListener(adminClient: AdminClient, externalAdminClient: AdminClient, oldHost: String, newHost: String): Unit = {
val configs = servers.map { server =>
val resource = new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)
val newListeners = server.config.advertisedListeners.map { e =>
if (e.listenerName.value == SecureExternal)
s"${e.listenerName.value}://$newHost:${server.boundPort(e.listenerName)}"
else
s"${e.listenerName.value}://${e.host}:${server.boundPort(e.listenerName)}"
}.mkString(",")
val configEntry = new ConfigEntry(KafkaConfig.AdvertisedListenersProp, newListeners)
(resource, new Config(Collections.singleton(configEntry)))
}.toMap.asJava
adminClient.alterConfigs(configs).all.get
servers.foreach { server =>
TestUtils.retry(10000) {
val externalListener = server.config.advertisedListeners.find(_.listenerName.value == SecureExternal)
.getOrElse(throw new IllegalStateException("External listener not found"))
assertTrue("Config not updated", externalListener.host == newHost)
}
}
val (endpoints, altered) = TestUtils.computeUntilTrue(serverEndpoints(externalAdminClient)) { endpoints =>
!endpoints.contains(oldHost)
}
assertTrue(s"Advertised listener update not propagated by controller: $endpoints", altered)
}
private def alterConfigs(adminClient: AdminClient, props: Properties, perBrokerConfig: Boolean): AlterConfigsResult = {
val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
val newConfig = new Config(configEntries)
@ -701,12 +1001,6 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
adminZkClient.changeBrokerConfig(brokers, keystoreProps)
}
private def waitForKeystore(sslProperties: Properties, maxWaitMs: Long = 10000): Unit = {
waitForConfig(new ListenerName(SecureExternal).configPrefix + SSL_KEYSTORE_LOCATION_CONFIG,
sslProperties.getProperty(SSL_KEYSTORE_LOCATION_CONFIG), maxWaitMs)
}
private def waitForConfig(propName: String, propValue: String, maxWaitMs: Long = 10000): Unit = {
servers.foreach { server => waitForConfigOnServer(server, propName, propValue, maxWaitMs) }
}
@ -774,6 +1068,64 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
}
}
private def verifyConnectionFailure(producer: KafkaProducer[String, String]): Future[_] = {
val executor = Executors.newSingleThreadExecutor
executors += executor
val future = executor.submit(new Runnable() {
def run() {
producer.send(new ProducerRecord(topic, "key", "value")).get
}
})
verifyTimeout(future)
future
}
private def verifyConnectionFailure(consumer: KafkaConsumer[String, String]): Future[_] = {
val executor = Executors.newSingleThreadExecutor
executors += executor
val future = executor.submit(new Runnable() {
def run() {
assertEquals(0, consumer.poll(100).count)
}
})
verifyTimeout(future)
future
}
private def verifyTimeout(future: Future[_]): Unit = {
try {
future.get(100, TimeUnit.MILLISECONDS)
fail("Operation should not have completed")
} catch {
case _: TimeoutException => // expected exception
}
}
private def addListenerPropsSsl(listenerName: String, props: Properties): Unit = {
val prefix = new ListenerName(listenerName).configPrefix
sslProperties1.keySet.asScala.foreach { name =>
val value = sslProperties1.get(name)
val valueStr = value match {
case password: Password => password.value
case list: util.List[_] => list.asScala.map(_.toString).mkString(",")
case _ => value.toString
}
props.put(s"$prefix$name", valueStr)
}
}
private def addListenerPropsSasl(listener: String, mechanisms: Seq[String], props: Properties): Unit = {
val listenerName = new ListenerName(listener)
val prefix = listenerName.configPrefix
props.put(prefix + KafkaConfig.SaslEnabledMechanismsProp, mechanisms.mkString(","))
props.put(prefix + KafkaConfig.SaslKerberosServiceNameProp, "kafka")
mechanisms.foreach { mechanism =>
val jaasSection = jaasSections(Seq(mechanism), None, KafkaSasl, "").head
val jaasConfig = jaasSection.modules.head.toString
props.put(listenerName.saslMechanismConfigPrefix(mechanism) + KafkaConfig.SaslJaasConfigProp, jaasConfig)
}
}
private class ProducerThread(clientId: String, retries: Int) extends ShutdownableThread(clientId, isInterruptible = false) {
private val producer = createProducer(trustStoreFile1, retries, clientId)
@volatile var sent = 0
@ -877,8 +1229,10 @@ class TestMetricsReporter extends MetricsReporter with Reconfigurable with Close
Set(PollingIntervalProp).asJava
}
override def validateReconfiguration(configs: util.Map[String, _]): Boolean = {
configs.get(PollingIntervalProp).toString.toInt > 0
override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
val pollingInterval = configs.get(PollingIntervalProp).toString
if (configs.get(PollingIntervalProp).toString.toInt <= 0)
throw new ConfigException(s"Invalid polling interval $pollingInterval")
}
override def reconfigure(configs: util.Map[String, _]): Unit = {

View File

@ -21,25 +21,23 @@ import java.util.Properties
import kafka.utils.JaasTestUtils
import kafka.utils.JaasTestUtils.JaasSection
import org.apache.kafka.common.network.ListenerName
class MultipleListenersWithAdditionalJaasContextTest extends MultipleListenersWithSameSecurityProtocolBaseTest {
import MultipleListenersWithSameSecurityProtocolBaseTest._
override def saslProperties(listenerName: ListenerName): Properties = {
listenerName.value match {
case SecureInternal => kafkaClientSaslProperties(Plain, dynamicJaasConfig = true)
case SecureExternal => kafkaClientSaslProperties(GssApi, dynamicJaasConfig = true)
case _ => throw new IllegalArgumentException(s"Unexpected listener name $listenerName")
}
override def staticJaasSections: Seq[JaasSection] = {
val (serverKeytabFile, _) = maybeCreateEmptyKeytabFiles()
JaasTestUtils.zkSections :+
JaasTestUtils.kafkaServerSection("secure_external.KafkaServer", kafkaServerSaslMechanisms(SecureExternal), Some(serverKeytabFile))
}
override def jaasSections: Seq[JaasSection] = {
val (serverKeytabFile, _) = maybeCreateEmptyKeytabFiles()
JaasTestUtils.zkSections ++ Seq(
JaasTestUtils.kafkaServerSection("secure_external.KafkaServer", Seq(GssApi), Some(serverKeytabFile)),
JaasTestUtils.kafkaServerSection("secure_internal.KafkaServer", Seq(Plain), None)
)
override protected def dynamicJaasSections: Properties = {
val props = new Properties
kafkaServerSaslMechanisms(SecureInternal).foreach { mechanism =>
addDynamicJaasSection(props, SecureInternal, mechanism,
JaasTestUtils.kafkaServerSection("secure_internal.KafkaServer", Seq(mechanism), None))
}
props
}
}

View File

@ -26,12 +26,9 @@ import org.apache.kafka.common.network.ListenerName
class MultipleListenersWithDefaultJaasContextTest extends MultipleListenersWithSameSecurityProtocolBaseTest {
import MultipleListenersWithSameSecurityProtocolBaseTest._
override def staticJaasSections: Seq[JaasSection] =
jaasSections(kafkaServerSaslMechanisms.values.flatMap(identity).toSeq, Some(kafkaClientSaslMechanism), Both)
override def saslProperties(listenerName: ListenerName): Properties =
kafkaClientSaslProperties(Plain, dynamicJaasConfig = true)
override def jaasSections: Seq[JaasSection] =
jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both)
override protected def dynamicJaasSections: Properties = new Properties
}

View File

@ -19,13 +19,13 @@
package kafka.server
import java.io.File
import java.util.{Collections, Properties}
import java.util.{Collections, Objects, Properties}
import java.util.concurrent.TimeUnit
import kafka.api.SaslSetup
import kafka.coordinator.group.OffsetConfig
import kafka.utils.JaasTestUtils.JaasSection
import kafka.utils.TestUtils
import kafka.utils.{JaasTestUtils, TestUtils}
import kafka.utils.Implicits._
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
@ -55,18 +55,20 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeep
private val trustStoreFile = File.createTempFile("truststore", ".jks")
private val servers = new ArrayBuffer[KafkaServer]
private val producers = mutable.Map[ListenerName, KafkaProducer[Array[Byte], Array[Byte]]]()
private val consumers = mutable.Map[ListenerName, KafkaConsumer[Array[Byte], Array[Byte]]]()
private val producers = mutable.Map[ClientMetadata, KafkaProducer[Array[Byte], Array[Byte]]]()
private val consumers = mutable.Map[ClientMetadata, KafkaConsumer[Array[Byte], Array[Byte]]]()
protected val kafkaClientSaslMechanism = Plain
protected val kafkaServerSaslMechanisms = List(GssApi, Plain)
protected val kafkaServerSaslMechanisms = Map(
SecureExternal -> Seq("SCRAM-SHA-256", GssApi),
SecureInternal -> Seq(Plain, "SCRAM-SHA-512"))
protected def saslProperties(listenerName: ListenerName): Properties
protected def jaasSections: Seq[JaasSection]
protected def staticJaasSections: Seq[JaasSection]
protected def dynamicJaasSections: Properties
@Before
override def setUp(): Unit = {
startSasl(jaasSections)
startSasl(staticJaasSections)
super.setUp()
// 2 brokers so that we can test that the data propagates correctly via UpdateMetadadaRequest
val numServers = 2
@ -82,8 +84,12 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeep
props.put(KafkaConfig.InterBrokerListenerNameProp, Internal)
props.put(KafkaConfig.ZkEnableSecureAclsProp, "true")
props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, kafkaClientSaslMechanism)
props.put(KafkaConfig.SaslEnabledMechanismsProp, kafkaServerSaslMechanisms.mkString(","))
props.put(s"${new ListenerName(SecureInternal).configPrefix}${KafkaConfig.SaslEnabledMechanismsProp}",
kafkaServerSaslMechanisms(SecureInternal).mkString(","))
props.put(s"${new ListenerName(SecureExternal).configPrefix}${KafkaConfig.SaslEnabledMechanismsProp}",
kafkaServerSaslMechanisms(SecureExternal).mkString(","))
props.put(KafkaConfig.SaslKerberosServiceNameProp, "kafka")
props ++= dynamicJaasSections
props ++= TestUtils.sslConfigs(Mode.SERVER, false, Some(trustStoreFile), s"server$brokerId")
@ -107,27 +113,38 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeep
TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, OffsetConfig.DefaultOffsetsTopicNumPartitions,
replicationFactor = 2, servers, servers.head.groupCoordinator.offsetsTopicConfigs)
createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword)
servers.head.config.listeners.foreach { endPoint =>
val listenerName = endPoint.listenerName
TestUtils.createTopic(zkClient, listenerName.value, 2, 2, servers)
val trustStoreFile =
if (TestUtils.usesSslTransportLayer(endPoint.securityProtocol)) Some(this.trustStoreFile)
else None
val saslProps =
if (TestUtils.usesSaslAuthentication(endPoint.securityProtocol)) Some(saslProperties(listenerName))
else None
val bootstrapServers = TestUtils.bootstrapServers(servers, listenerName)
producers(listenerName) = TestUtils.createNewProducer(bootstrapServers, acks = -1,
def addProducerConsumer(listenerName: ListenerName, mechanism: String, saslProps: Option[Properties]): Unit = {
val topic = s"${listenerName.value}${producers.size}"
TestUtils.createTopic(zkClient, topic, 2, 2, servers)
val clientMetadata = ClientMetadata(listenerName, mechanism, topic)
producers(clientMetadata) = TestUtils.createNewProducer(bootstrapServers, acks = -1,
securityProtocol = endPoint.securityProtocol, trustStoreFile = trustStoreFile, saslProperties = saslProps)
consumers(listenerName) = TestUtils.createNewConsumer(bootstrapServers, groupId = listenerName.value,
consumers(clientMetadata) = TestUtils.createNewConsumer(bootstrapServers, groupId = clientMetadata.toString,
securityProtocol = endPoint.securityProtocol, trustStoreFile = trustStoreFile, saslProperties = saslProps)
}
if (TestUtils.usesSaslAuthentication(endPoint.securityProtocol)) {
kafkaServerSaslMechanisms(endPoint.listenerName.value).foreach { mechanism =>
addProducerConsumer(listenerName, mechanism, Some(kafkaClientSaslProperties(mechanism, dynamicJaasConfig = true)))
}
} else {
addProducerConsumer(listenerName, "", saslProps = None)
}
}
}
@After
@ -145,18 +162,34 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeep
*/
@Test
def testProduceConsume(): Unit = {
producers.foreach { case (listenerName, producer) =>
val producerRecords = (1 to 10).map(i => new ProducerRecord(listenerName.value, s"key$i".getBytes,
producers.foreach { case (clientMetadata, producer) =>
val producerRecords = (1 to 10).map(i => new ProducerRecord(clientMetadata.topic, s"key$i".getBytes,
s"value$i".getBytes))
producerRecords.map(producer.send).map(_.get(10, TimeUnit.SECONDS))
val consumer = consumers(listenerName)
consumer.subscribe(Collections.singleton(listenerName.value))
val consumer = consumers(clientMetadata)
consumer.subscribe(Collections.singleton(clientMetadata.topic))
val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]
TestUtils.waitUntilTrue(() => {
records ++= consumer.poll(50).asScala
records.size == producerRecords.size
}, s"Consumed ${records.size} records until timeout instead of the expected ${producerRecords.size} records")
}, s"Consumed ${records.size} records until timeout instead of the expected ${producerRecords.size} records with mechanism ${clientMetadata.saslMechanism}")
}
}
protected def addDynamicJaasSection(props: Properties, listener: String, mechanism: String, jaasSection: JaasSection): Unit = {
val listenerName = new ListenerName(listener)
val prefix = listenerName.saslMechanismConfigPrefix(mechanism)
val jaasConfig = jaasSection.modules.head.toString
props.put(s"${prefix}${KafkaConfig.SaslJaasConfigProp}", jaasConfig)
}
case class ClientMetadata(val listenerName: ListenerName, val saslMechanism: String, topic: String) {
override def hashCode: Int = Objects.hash(listenerName, saslMechanism)
override def equals(obj: Any): Boolean = obj match {
case other: ClientMetadata => listenerName == other.listenerName && saslMechanism == other.saslMechanism && topic == other.topic
case _ => false
}
override def toString: String = s"${listenerName.value}:$saslMechanism:$topic"
}
}

View File

@ -87,13 +87,13 @@ class KafkaTest {
val config = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "ssl.keystore.password=keystore_password",
"--override", "ssl.key.password=key_password",
"--override", "ssl.truststore.password=truststore_password")))
assertEquals(Password.HIDDEN, config.sslKeyPassword.toString)
assertEquals(Password.HIDDEN, config.sslKeystorePassword.toString)
assertEquals(Password.HIDDEN, config.sslTruststorePassword.toString)
assertEquals(Password.HIDDEN, config.getPassword(KafkaConfig.SslKeyPasswordProp).toString)
assertEquals(Password.HIDDEN, config.getPassword(KafkaConfig.SslKeystorePasswordProp).toString)
assertEquals(Password.HIDDEN, config.getPassword(KafkaConfig.SslTruststorePasswordProp).toString)
assertEquals("key_password", config.sslKeyPassword.value)
assertEquals("keystore_password", config.sslKeystorePassword.value)
assertEquals("truststore_password", config.sslTruststorePassword.value)
assertEquals("key_password", config.getPassword(KafkaConfig.SslKeyPasswordProp).value)
assertEquals("keystore_password", config.getPassword(KafkaConfig.SslKeystorePasswordProp).value)
assertEquals("truststore_password", config.getPassword(KafkaConfig.SslTruststorePasswordProp).value)
}
def prepareDefaultConfig(): String = {

View File

@ -27,10 +27,10 @@ import kafka.zk.{AdminZkClient, KafkaZkClient, ZooKeeperTestHarness}
import org.apache.kafka.clients.admin._
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.internals.KafkaFutureImpl
import org.apache.kafka.common.{KafkaFuture, Node}
import org.apache.kafka.common.Node
import org.apache.kafka.common.security.scram.ScramCredentialUtils
import org.apache.kafka.common.utils.Sanitizer
import org.easymock.{EasyMock, IAnswer}
import org.easymock.EasyMock
import org.junit.Assert._
import org.junit.Test

View File

@ -38,6 +38,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.{AbstractRequest, ProduceRequest, RequestHeader}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.scram.ScramMechanism
import org.apache.kafka.common.utils.{LogContext, MockTime, Time}
import org.apache.log4j.Level
import org.junit.Assert._
@ -61,7 +62,7 @@ class SocketServerTest extends JUnitSuite {
props.put("connections.max.idle.ms", "60000")
val config = KafkaConfig.fromProps(props)
val metrics = new Metrics
val credentialProvider = new CredentialProvider(config.saslEnabledMechanisms, null)
val credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, null)
val localAddress = InetAddress.getLoopbackAddress
// Clean-up any metrics left around by previous tests
@ -406,7 +407,7 @@ class SocketServerTest extends JUnitSuite {
// the following sleep is necessary to reliably detect the connection close when we send data below
Thread.sleep(200L)
// make sure the sockets are open
server.acceptors.values.foreach(acceptor => assertFalse(acceptor.serverChannel.socket.isClosed))
server.acceptors.asScala.values.foreach(acceptor => assertFalse(acceptor.serverChannel.socket.isClosed))
// then shutdown the server
shutdownServerAndMetrics(server)

View File

@ -17,13 +17,18 @@
package kafka.server
import java.util
import java.util.Properties
import kafka.utils.TestUtils
import org.apache.kafka.common.Reconfigurable
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.config.{ConfigException, SslConfigs}
import org.junit.Assert._
import org.junit.Test
import scala.collection.JavaConverters._
class DynamicBrokerConfigTest {
@Test
@ -34,7 +39,7 @@ class DynamicBrokerConfigTest {
val config = KafkaConfig(props)
val dynamicConfig = config.dynamicConfig
assertSame(config, dynamicConfig.currentKafkaConfig)
assertEquals(oldKeystore, config.sslKeystoreLocation)
assertEquals(oldKeystore, config.values.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
assertEquals(oldKeystore,
config.valuesFromThisConfigWithPrefixOverride("listener.name.external.").get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
assertEquals(oldKeystore, config.originalsFromThisConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
@ -55,7 +60,7 @@ class DynamicBrokerConfigTest {
assertEquals(newKeystore,
config.originalsWithPrefix("listener.name.external.").get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
assertEquals(oldKeystore, config.sslKeystoreLocation)
assertEquals(oldKeystore, config.getString(KafkaConfig.SslKeystoreLocationProp))
assertEquals(oldKeystore, config.originals.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
assertEquals(oldKeystore, config.values.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
assertEquals(oldKeystore, config.originalsStrings.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
@ -75,38 +80,48 @@ class DynamicBrokerConfigTest {
origProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS")
val config = KafkaConfig(origProps)
def verifyConfigUpdateWithInvalidConfig(validProps: Map[String, String], invalidProps: Map[String, String]): Unit = {
val props = new Properties
validProps.foreach { case (k, v) => props.put(k, v) }
invalidProps.foreach { case (k, v) => props.put(k, v) }
// DynamicBrokerConfig#validate is used by AdminClient to validate the configs provided in
// in an AlterConfigs request. Validation should fail with an exception if any of the configs are invalid.
try {
config.dynamicConfig.validate(props, perBrokerConfig = true)
fail("Invalid config did not fail validation")
} catch {
case e: ConfigException => // expected exception
}
// DynamicBrokerConfig#updateBrokerConfig is used to update configs from ZooKeeper during
// startup and when configs are updated in ZK. Update should apply valid configs and ignore
// invalid ones.
config.dynamicConfig.updateBrokerConfig(0, props)
validProps.foreach { case (name, value) => assertEquals(value, config.originals.get(name)) }
invalidProps.keySet.foreach { name =>
assertEquals(origProps.get(name), config.originals.get(name))
}
}
val validProps = Map(s"listener.name.external.${SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG}" -> "ks.p12")
val securityPropsWithoutListenerPrefix = Map(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG -> "PKCS12")
verifyConfigUpdateWithInvalidConfig(validProps, securityPropsWithoutListenerPrefix)
val nonDynamicProps = Map(KafkaConfig.ZkConnectProp -> "somehost:2181")
verifyConfigUpdateWithInvalidConfig(validProps, nonDynamicProps)
val securityPropsWithoutListenerPrefix = Map(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG -> "PKCS12")
verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, securityPropsWithoutListenerPrefix)
val nonDynamicProps = Map(KafkaConfig.ZkConnectProp -> "somehost:2181")
verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, nonDynamicProps)
// Test update of configs with invalid type
val invalidProps = Map(KafkaConfig.LogCleanerThreadsProp -> "invalid")
verifyConfigUpdateWithInvalidConfig(validProps, invalidProps)
verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, invalidProps)
}
@Test
def testConfigUpdateWithReconfigurableValidationFailure(): Unit = {
val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
origProps.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, "100000000")
val config = KafkaConfig(origProps)
val validProps = Map.empty[String, String]
val invalidProps = Map(KafkaConfig.LogCleanerThreadsProp -> "20")
def validateLogCleanerConfig(configs: util.Map[String, _]): Unit = {
val cleanerThreads = configs.get(KafkaConfig.LogCleanerThreadsProp).toString.toInt
if (cleanerThreads <=0 || cleanerThreads >= 5)
throw new ConfigException(s"Invalid cleaner threads $cleanerThreads")
}
val reconfigurable = new Reconfigurable {
override def configure(configs: util.Map[String, _]): Unit = {}
override def reconfigurableConfigs(): util.Set[String] = Set(KafkaConfig.LogCleanerThreadsProp).asJava
override def validateReconfiguration(configs: util.Map[String, _]): Unit = validateLogCleanerConfig(configs)
override def reconfigure(configs: util.Map[String, _]): Unit = {}
}
config.dynamicConfig.addReconfigurable(reconfigurable)
verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, invalidProps)
config.dynamicConfig.removeReconfigurable(reconfigurable)
val brokerReconfigurable = new BrokerReconfigurable {
override def reconfigurableConfigs: collection.Set[String] = Set(KafkaConfig.LogCleanerThreadsProp)
override def validateReconfiguration(newConfig: KafkaConfig): Unit = validateLogCleanerConfig(newConfig.originals)
override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {}
}
config.dynamicConfig.addBrokerReconfigurable(brokerReconfigurable)
verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, invalidProps)
}
@Test
@ -151,4 +166,86 @@ class DynamicBrokerConfigTest {
assertEquals(oldValue, config.originals.get(name))
}
}
private def verifyConfigUpdateWithInvalidConfig(config: KafkaConfig,
origProps: Properties,
validProps: Map[String, String],
invalidProps: Map[String, String]): Unit = {
val props = new Properties
validProps.foreach { case (k, v) => props.put(k, v) }
invalidProps.foreach { case (k, v) => props.put(k, v) }
// DynamicBrokerConfig#validate is used by AdminClient to validate the configs provided in
// in an AlterConfigs request. Validation should fail with an exception if any of the configs are invalid.
try {
config.dynamicConfig.validate(props, perBrokerConfig = true)
fail("Invalid config did not fail validation")
} catch {
case e: ConfigException => // expected exception
}
// DynamicBrokerConfig#updateBrokerConfig is used to update configs from ZooKeeper during
// startup and when configs are updated in ZK. Update should apply valid configs and ignore
// invalid ones.
config.dynamicConfig.updateBrokerConfig(0, props)
validProps.foreach { case (name, value) => assertEquals(value, config.originals.get(name)) }
invalidProps.keySet.foreach { name =>
assertEquals(origProps.get(name), config.originals.get(name))
}
}
@Test
def testPasswordConfigEncryption(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
val configWithoutSecret = KafkaConfig(props)
props.put(KafkaConfig.PasswordEncoderSecretProp, "config-encoder-secret")
val configWithSecret = KafkaConfig(props)
val dynamicProps = new Properties
dynamicProps.put(KafkaConfig.SaslJaasConfigProp, "myLoginModule required;")
try {
configWithoutSecret.dynamicConfig.toPersistentProps(dynamicProps, perBrokerConfig = true)
} catch {
case e: ConfigException => // expected exception
}
val persistedProps = configWithSecret.dynamicConfig.toPersistentProps(dynamicProps, perBrokerConfig = true)
assertFalse("Password not encoded",
persistedProps.getProperty(KafkaConfig.SaslJaasConfigProp).contains("myLoginModule"))
val decodedProps = configWithSecret.dynamicConfig.fromPersistentProps(persistedProps, perBrokerConfig = true)
assertEquals("myLoginModule required;", decodedProps.getProperty(KafkaConfig.SaslJaasConfigProp))
}
@Test
def testPasswordConfigEncoderSecretChange(): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
props.put(KafkaConfig.SaslJaasConfigProp, "staticLoginModule required;")
props.put(KafkaConfig.PasswordEncoderSecretProp, "config-encoder-secret")
val config = KafkaConfig(props)
val dynamicProps = new Properties
dynamicProps.put(KafkaConfig.SaslJaasConfigProp, "dynamicLoginModule required;")
val persistedProps = config.dynamicConfig.toPersistentProps(dynamicProps, perBrokerConfig = true)
assertFalse("Password not encoded",
persistedProps.getProperty(KafkaConfig.SaslJaasConfigProp).contains("LoginModule"))
config.dynamicConfig.updateBrokerConfig(0, persistedProps)
assertEquals("dynamicLoginModule required;", config.values.get(KafkaConfig.SaslJaasConfigProp).asInstanceOf[Password].value)
// New config with same secret should use the dynamic password config
val newConfigWithSameSecret = KafkaConfig(props)
newConfigWithSameSecret.dynamicConfig.updateBrokerConfig(0, persistedProps)
assertEquals("dynamicLoginModule required;", newConfigWithSameSecret.values.get(KafkaConfig.SaslJaasConfigProp).asInstanceOf[Password].value)
// New config with new secret should use the dynamic password config if new and old secrets are configured in KafkaConfig
props.put(KafkaConfig.PasswordEncoderSecretProp, "new-encoder-secret")
props.put(KafkaConfig.PasswordEncoderOldSecretProp, "config-encoder-secret")
val newConfigWithNewAndOldSecret = KafkaConfig(props)
newConfigWithNewAndOldSecret.dynamicConfig.updateBrokerConfig(0, persistedProps)
assertEquals("dynamicLoginModule required;", newConfigWithSameSecret.values.get(KafkaConfig.SaslJaasConfigProp).asInstanceOf[Password].value)
// New config with new secret alone should revert to static password config since dynamic config cannot be decoded
props.put(KafkaConfig.PasswordEncoderSecretProp, "another-new-encoder-secret")
val newConfigWithNewSecret = KafkaConfig(props)
newConfigWithNewSecret.dynamicConfig.updateBrokerConfig(0, persistedProps)
assertEquals("staticLoginModule required;", newConfigWithNewSecret.values.get(KafkaConfig.SaslJaasConfigProp).asInstanceOf[Password].value)
}
}

View File

@ -674,12 +674,22 @@ class KafkaConfigTest {
case KafkaConfig.SaslKerberosTicketRenewJitterProp =>
case KafkaConfig.SaslKerberosMinTimeBeforeReloginProp =>
case KafkaConfig.SaslKerberosPrincipalToLocalRulesProp => // ignore string
case KafkaConfig.SaslJaasConfigProp =>
// Password encoder configs
case KafkaConfig.PasswordEncoderSecretProp =>
case KafkaConfig.PasswordEncoderOldSecretProp =>
case KafkaConfig.PasswordEncoderKeyFactoryAlgorithmProp =>
case KafkaConfig.PasswordEncoderCipherAlgorithmProp =>
case KafkaConfig.PasswordEncoderKeyLengthProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0")
case KafkaConfig.PasswordEncoderIterationsProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0")
//delegation token configs
case KafkaConfig.DelegationTokenMasterKeyProp => // ignore
case KafkaConfig.DelegationTokenMaxLifeTimeProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.DelegationTokenExpiryTimeMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.DelegationTokenExpiryCheckIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case _ => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
}
})

View File

@ -0,0 +1,127 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils
import javax.crypto.SecretKeyFactory
import kafka.server.Defaults
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.utils.Java
import org.junit.Assert._
import org.junit.Test
class PasswordEncoderTest {
@Test
def testEncodeDecode(): Unit = {
val encoder = new PasswordEncoder(new Password("password-encoder-secret"),
None,
Defaults.PasswordEncoderCipherAlgorithm,
Defaults.PasswordEncoderKeyLength,
Defaults.PasswordEncoderIterations)
val password = "test-password"
val encoded = encoder.encode(new Password(password))
val encodedMap = CoreUtils.parseCsvMap(encoded)
assertEquals("4096", encodedMap(PasswordEncoder.IterationsProp))
assertEquals("128", encodedMap(PasswordEncoder.KeyLengthProp))
val defaultKeyFactoryAlgorithm = try {
SecretKeyFactory.getInstance("PBKDF2WithHmacSHA512")
"PBKDF2WithHmacSHA512"
} catch {
case _: Exception => "PBKDF2WithHmacSHA1"
}
assertEquals(defaultKeyFactoryAlgorithm, encodedMap(PasswordEncoder.KeyFactoryAlgorithmProp))
assertEquals("AES/CBC/PKCS5Padding", encodedMap(PasswordEncoder.CipherAlgorithmProp))
verifyEncodedPassword(encoder, password, encoded)
}
@Test
def testEncoderConfigChange(): Unit = {
val encoder = new PasswordEncoder(new Password("password-encoder-secret"),
Some("PBKDF2WithHmacSHA1"),
"DES/CBC/PKCS5Padding",
64,
1024)
val password = "test-password"
val encoded = encoder.encode(new Password(password))
val encodedMap = CoreUtils.parseCsvMap(encoded)
assertEquals("1024", encodedMap(PasswordEncoder.IterationsProp))
assertEquals("64", encodedMap(PasswordEncoder.KeyLengthProp))
assertEquals("PBKDF2WithHmacSHA1", encodedMap(PasswordEncoder.KeyFactoryAlgorithmProp))
assertEquals("DES/CBC/PKCS5Padding", encodedMap(PasswordEncoder.CipherAlgorithmProp))
// Test that decoding works even if PasswordEncoder algorithm, iterations etc. are altered
val decoder = new PasswordEncoder(new Password("password-encoder-secret"),
Some("PBKDF2WithHmacSHA1"),
"AES/CBC/PKCS5Padding",
128,
2048)
assertEquals(password, decoder.decode(encoded).value)
// Test that decoding fails if secret is altered
val decoder2 = new PasswordEncoder(new Password("secret-2"),
Some("PBKDF2WithHmacSHA1"),
"AES/CBC/PKCS5Padding",
128,
1024)
try {
decoder2.decode(encoded)
} catch {
case e: ConfigException => // expected exception
}
}
@Test
def testEncodeDecodeAlgorithms(): Unit = {
def verifyEncodeDecode(keyFactoryAlg: Option[String], cipherAlg: String, keyLength: Int): Unit = {
val encoder = new PasswordEncoder(new Password("password-encoder-secret"),
keyFactoryAlg,
cipherAlg,
keyLength,
Defaults.PasswordEncoderIterations)
val password = "test-password"
val encoded = encoder.encode(new Password(password))
verifyEncodedPassword(encoder, password, encoded)
}
verifyEncodeDecode(keyFactoryAlg = None, "DES/CBC/PKCS5Padding", keyLength = 64)
verifyEncodeDecode(keyFactoryAlg = None, "DESede/CBC/PKCS5Padding", keyLength = 192)
verifyEncodeDecode(keyFactoryAlg = None, "AES/CBC/PKCS5Padding", keyLength = 128)
verifyEncodeDecode(keyFactoryAlg = None, "AES/CFB/PKCS5Padding", keyLength = 128)
verifyEncodeDecode(keyFactoryAlg = None, "AES/OFB/PKCS5Padding", keyLength = 128)
verifyEncodeDecode(keyFactoryAlg = Some("PBKDF2WithHmacSHA1"), Defaults.PasswordEncoderCipherAlgorithm, keyLength = 128)
if (Java.IS_JAVA8_COMPATIBLE) {
verifyEncodeDecode(keyFactoryAlg = None, "AES/GCM/PKCS5Padding", keyLength = 128)
verifyEncodeDecode(keyFactoryAlg = Some("PBKDF2WithHmacSHA256"), Defaults.PasswordEncoderCipherAlgorithm, keyLength = 128)
verifyEncodeDecode(keyFactoryAlg = Some("PBKDF2WithHmacSHA512"), Defaults.PasswordEncoderCipherAlgorithm, keyLength = 128)
}
}
private def verifyEncodedPassword(encoder: PasswordEncoder, password: String, encoded: String): Unit = {
val encodedMap = CoreUtils.parseCsvMap(encoded)
assertEquals(password.length.toString, encodedMap(PasswordEncoder.PasswordLengthProp))
assertNotNull("Invalid salt", encoder.base64Decode(encodedMap("salt")))
assertNotNull("Invalid encoding parameters", encoder.base64Decode(encodedMap(PasswordEncoder.InitializationVectorProp)))
assertNotNull("Invalid encoded password", encoder.base64Decode(encodedMap(PasswordEncoder.EncyrptedPasswordProp)))
assertEquals(password, encoder.decode(encoded).value)
}
}

View File

@ -718,7 +718,7 @@ object TestUtils extends Logging {
def deleteBrokersInZk(zkClient: KafkaZkClient, ids: Seq[Int]): Seq[Broker] = {
val brokers = ids.map(createBroker(_, "localhost", 6667, SecurityProtocol.PLAINTEXT))
brokers.foreach(b => zkClient.deletePath(BrokerIdsZNode.path + "/" + b))
ids.foreach(b => zkClient.deletePath(BrokerIdsZNode.path + "/" + b))
brokers
}