KAFKA-14676: Include all SASL configs in login cache key to ensure clients in a JVM can use different OAuth configs (#13211)

We currently cache login managers in static maps for both static JAAS config using system property and for JAAS config specified using Kafka config sasl.jaas.config. In addition to the JAAS config, the login manager callback handler is included in the key, but all other configs are ignored. This implementation is based on the assumption clients that require different logins (e.g. username/password) use different JAAS configs, because login properties are included in the JAAS config rather than as separate top-level configs. The OIDC support added in KIP-768 only allows configuration of token endpoint URL as a top-level config. This results in two clients in a JVM configured with different token endpoint URLs to incorrectly share a login.

This PR includes all SASL configs prefixed with sasl. to be included in the key so that logins are only shared if all the sasl configs are identical. 

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Kirk True <kirk@mustardgrain.com>
This commit is contained in:
Rajini Sivaram 2023-02-12 19:49:12 +00:00 committed by rajinisivaram
parent 2aeceff25a
commit 6a86e54c76
2 changed files with 55 additions and 5 deletions

View File

@ -99,14 +99,14 @@ public class LoginManager {
LoginManager loginManager;
Password jaasConfigValue = jaasContext.dynamicJaasConfig();
if (jaasConfigValue != null) {
LoginMetadata<Password> loginMetadata = new LoginMetadata<>(jaasConfigValue, loginClass, loginCallbackClass);
LoginMetadata<Password> loginMetadata = new LoginMetadata<>(jaasConfigValue, loginClass, loginCallbackClass, configs);
loginManager = DYNAMIC_INSTANCES.get(loginMetadata);
if (loginManager == null) {
loginManager = new LoginManager(jaasContext, saslMechanism, configs, loginMetadata);
DYNAMIC_INSTANCES.put(loginMetadata, loginManager);
}
} else {
LoginMetadata<String> loginMetadata = new LoginMetadata<>(jaasContext.name(), loginClass, loginCallbackClass);
LoginMetadata<String> loginMetadata = new LoginMetadata<>(jaasContext.name(), loginClass, loginCallbackClass, configs);
loginManager = STATIC_INSTANCES.get(loginMetadata);
if (loginManager == null) {
loginManager = new LoginManager(jaasContext, saslMechanism, configs, loginMetadata);
@ -198,17 +198,23 @@ public class LoginManager {
final T configInfo;
final Class<? extends Login> loginClass;
final Class<? extends AuthenticateCallbackHandler> loginCallbackClass;
final Map<String, Object> saslConfigs;
LoginMetadata(T configInfo, Class<? extends Login> loginClass,
Class<? extends AuthenticateCallbackHandler> loginCallbackClass) {
Class<? extends AuthenticateCallbackHandler> loginCallbackClass,
Map<String, ?> configs) {
this.configInfo = configInfo;
this.loginClass = loginClass;
this.loginCallbackClass = loginCallbackClass;
this.saslConfigs = new HashMap<>();
configs.entrySet().stream()
.filter(e -> e.getKey().startsWith("sasl."))
.forEach(e -> saslConfigs.put(e.getKey(), e.getValue())); // value may be null
}
@Override
public int hashCode() {
return Objects.hash(configInfo, loginClass, loginCallbackClass);
return Objects.hash(configInfo, loginClass, loginCallbackClass, saslConfigs);
}
@Override
@ -219,7 +225,8 @@ public class LoginManager {
LoginMetadata<?> loginMetadata = (LoginMetadata<?>) o;
return Objects.equals(configInfo, loginMetadata.configInfo) &&
Objects.equals(loginClass, loginMetadata.loginClass) &&
Objects.equals(loginCallbackClass, loginMetadata.loginCallbackClass);
Objects.equals(loginCallbackClass, loginMetadata.loginCallbackClass) &&
Objects.equals(saslConfigs, loginMetadata.saslConfigs);
}
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.security.authenticator;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.JaasContext;
@ -109,6 +110,48 @@ public class LoginManagerTest {
verifyLoginManagerRelease(staticScramLogin, 2, scramJaasContext, configs);
}
@Test
public void testLoginManagerWithDifferentConfigs() throws Exception {
Map<String, Object> configs1 = new HashMap<>();
configs1.put("sasl.jaas.config", dynamicPlainContext);
configs1.put("client.id", "client");
configs1.put(SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, "http://host1:1234");
Map<String, Object> configs2 = new HashMap<>(configs1);
configs2.put(SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, "http://host2:1234");
JaasContext dynamicContext = JaasContext.loadClientContext(configs1);
Map<String, Object> configs3 = new HashMap<>(configs1);
configs3.put("client.id", "client3");
LoginManager dynamicLogin1 = LoginManager.acquireLoginManager(dynamicContext, "PLAIN", DefaultLogin.class, configs1);
LoginManager dynamicLogin2 = LoginManager.acquireLoginManager(dynamicContext, "PLAIN", DefaultLogin.class, configs2);
assertEquals(dynamicPlainContext, dynamicLogin1.cacheKey());
assertEquals(dynamicPlainContext, dynamicLogin2.cacheKey());
assertNotSame(dynamicLogin1, dynamicLogin2);
assertSame(dynamicLogin1, LoginManager.acquireLoginManager(dynamicContext, "PLAIN", DefaultLogin.class, configs1));
assertSame(dynamicLogin2, LoginManager.acquireLoginManager(dynamicContext, "PLAIN", DefaultLogin.class, configs2));
assertSame(dynamicLogin1, LoginManager.acquireLoginManager(dynamicContext, "PLAIN", DefaultLogin.class, new HashMap<>(configs1)));
assertSame(dynamicLogin1, LoginManager.acquireLoginManager(dynamicContext, "PLAIN", DefaultLogin.class, configs3));
JaasContext staticContext = JaasContext.loadClientContext(Collections.emptyMap());
LoginManager staticLogin1 = LoginManager.acquireLoginManager(staticContext, "SCRAM-SHA-256", DefaultLogin.class, configs1);
LoginManager staticLogin2 = LoginManager.acquireLoginManager(staticContext, "SCRAM-SHA-256", DefaultLogin.class, configs2);
assertNotSame(staticLogin1, dynamicLogin1);
assertNotSame(staticLogin2, dynamicLogin2);
assertNotSame(staticLogin1, staticLogin2);
assertSame(staticLogin1, LoginManager.acquireLoginManager(staticContext, "SCRAM-SHA-256", DefaultLogin.class, configs1));
assertSame(staticLogin2, LoginManager.acquireLoginManager(staticContext, "SCRAM-SHA-256", DefaultLogin.class, configs2));
assertSame(staticLogin1, LoginManager.acquireLoginManager(staticContext, "SCRAM-SHA-256", DefaultLogin.class, new HashMap<>(configs1)));
assertSame(staticLogin1, LoginManager.acquireLoginManager(staticContext, "SCRAM-SHA-256", DefaultLogin.class, configs3));
verifyLoginManagerRelease(dynamicLogin1, 4, dynamicContext, configs1);
verifyLoginManagerRelease(dynamicLogin2, 2, dynamicContext, configs2);
verifyLoginManagerRelease(staticLogin1, 4, staticContext, configs1);
verifyLoginManagerRelease(staticLogin2, 2, staticContext, configs2);
}
private void verifyLoginManagerRelease(LoginManager loginManager, int acquireCount, JaasContext jaasContext,
Map<String, ?> configs) throws Exception {