KAFKA-13444: Fix OAuthCompatibilityTool help and add SSL options (#11486)

Reviewers: Jun Rao <junrao@gmail.com>
This commit is contained in:
Kirk True 2021-11-15 15:45:18 -08:00 committed by GitHub
parent a3ab7d5b42
commit ec29b62e92
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 244 additions and 136 deletions

View File

@ -45,6 +45,44 @@ import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SUB_CL
import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SUB_CLAIM_NAME_DOC; import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SUB_CLAIM_NAME_DOC;
import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL; import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL;
import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL_DOC; import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL_DOC;
import static org.apache.kafka.common.config.SslConfigs.SSL_CIPHER_SUITES_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_CIPHER_SUITES_DOC;
import static org.apache.kafka.common.config.SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_ENABLED_PROTOCOLS_DOC;
import static org.apache.kafka.common.config.SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC;
import static org.apache.kafka.common.config.SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_ENGINE_FACTORY_CLASS_DOC;
import static org.apache.kafka.common.config.SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC;
import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_DOC;
import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_KEY_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_KEY_DOC;
import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_DOC;
import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_DOC;
import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_TYPE_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_TYPE_DOC;
import static org.apache.kafka.common.config.SslConfigs.SSL_KEY_PASSWORD_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_KEY_PASSWORD_DOC;
import static org.apache.kafka.common.config.SslConfigs.SSL_PROTOCOL_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_PROTOCOL_DOC;
import static org.apache.kafka.common.config.SslConfigs.SSL_PROVIDER_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_PROVIDER_DOC;
import static org.apache.kafka.common.config.SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_DOC;
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC;
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_DOC;
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC;
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC;
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG;
import static org.apache.kafka.common.config.SslConfigs.SSL_TRUSTSTORE_TYPE_DOC;
import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_ID_CONFIG; import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_ID_CONFIG;
import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_ID_DOC; import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_ID_DOC;
import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_SECRET_CONFIG; import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.CLIENT_SECRET_CONFIG;
@ -52,123 +90,47 @@ import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLo
import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.SCOPE_CONFIG; import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.SCOPE_CONFIG;
import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.SCOPE_DOC; import static org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler.SCOPE_DOC;
import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import net.sourceforge.argparse4j.ArgumentParsers; import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.Argument;
import net.sourceforge.argparse4j.inf.ArgumentParser; import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException; import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace; import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.security.oauthbearer.secured.AccessTokenRetriever; import org.apache.kafka.common.security.oauthbearer.secured.AccessTokenRetriever;
import org.apache.kafka.common.security.oauthbearer.secured.AccessTokenRetrieverFactory; import org.apache.kafka.common.security.oauthbearer.secured.AccessTokenRetrieverFactory;
import org.apache.kafka.common.security.oauthbearer.secured.AccessTokenValidator; import org.apache.kafka.common.security.oauthbearer.secured.AccessTokenValidator;
import org.apache.kafka.common.security.oauthbearer.secured.AccessTokenValidatorFactory; import org.apache.kafka.common.security.oauthbearer.secured.AccessTokenValidatorFactory;
import org.apache.kafka.common.security.oauthbearer.secured.CloseableVerificationKeyResolver; import org.apache.kafka.common.security.oauthbearer.secured.CloseableVerificationKeyResolver;
import org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.secured.VerificationKeyResolverFactory; import org.apache.kafka.common.security.oauthbearer.secured.VerificationKeyResolverFactory;
import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Exit;
public class OAuthCompatibilityTool { public class OAuthCompatibilityTool {
public static void main(String[] args) { public static void main(String[] args) {
String description = String.format( ArgsHandler argsHandler = new ArgsHandler();
"This tool is used to verify OAuth/OIDC provider compatibility.%n%n" +
"To use, first export KAFKA_OPTS with Java system properties that match%n" +
"your OAuth/OIDC configuration. Next, run the following script to%n" +
"execute the test:%n%n" +
" ./bin/kafka-run-class.sh %s" +
"%n%n" +
"Please refer to the following source files for OAuth/OIDC client and%n" +
"broker configuration options:" +
"%n%n" +
" %s%n" +
" %s",
OAuthCompatibilityTool.class.getName(),
SaslConfigs.class.getName(),
OAuthBearerLoginCallbackHandler.class.getName());
ArgumentParser parser = ArgumentParsers
.newArgumentParser("oauth-compatibility-test")
.defaultHelp(true)
.description(description);
parser.addArgument("--connect-timeout-ms")
.type(Integer.class)
.dest("connectTimeoutMs")
.help(SASL_LOGIN_CONNECT_TIMEOUT_MS_DOC);
parser.addArgument("--read-timeout-ms")
.type(Integer.class)
.dest("readTimeoutMs")
.help(SASL_LOGIN_READ_TIMEOUT_MS_DOC);
parser.addArgument("--login-retry-backoff-ms")
.type(Long.class)
.dest("loginRetryBackoffMs")
.help(SASL_LOGIN_RETRY_BACKOFF_MS_DOC);
parser.addArgument("--login-retry-backoff-max-ms")
.type(Long.class)
.dest("loginRetryBackoffMax")
.help(SASL_LOGIN_RETRY_BACKOFF_MAX_MS_DOC);
parser.addArgument("--scope-claim-name")
.dest("scopeClaimName")
.help(SASL_OAUTHBEARER_SCOPE_CLAIM_NAME_DOC);
parser.addArgument("--sub-claim-name")
.dest("subClaimName")
.help(SASL_OAUTHBEARER_SUB_CLAIM_NAME_DOC);
parser.addArgument("--token-endpoint-url")
.dest("tokenEndpointUrl")
.help(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL_DOC);
parser.addArgument("--jwks-endpoint-url")
.dest("jwksEndpointUrl")
.help(SASL_OAUTHBEARER_JWKS_ENDPOINT_URL_DOC);
parser.addArgument("--jwks-endpoint-refresh-ms")
.type(Long.class)
.dest("jwksEndpointRefreshMs")
.help(SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS_DOC);
parser.addArgument("--jwks-endpoint-retry-backoff-max-ms")
.type(Long.class)
.dest("jwksEndpointRetryBackoffMaxMs")
.help(SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS_DOC);
parser.addArgument("--jwks-endpoint-retry-backoff-ms")
.type(Long.class)
.dest("jwksEndpointRetryBackoffMs")
.help(SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS_DOC);
parser.addArgument("--clock-skew-seconds")
.type(Integer.class)
.dest("clockSkewSeconds")
.help(SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS_DOC);
parser.addArgument("--expected-audience")
.dest("expectedAudience")
.help(SASL_OAUTHBEARER_EXPECTED_AUDIENCE_DOC);
parser.addArgument("--expected-issuer")
.dest("expectedIssuer")
.help(SASL_OAUTHBEARER_EXPECTED_ISSUER_DOC);
parser.addArgument("--client-id")
.dest("clientId")
.help(CLIENT_ID_DOC);
parser.addArgument("--client-secret")
.dest("clientSecret")
.help(CLIENT_SECRET_DOC);
parser.addArgument("--scope")
.dest("scope")
.help(SCOPE_DOC);
Namespace namespace; Namespace namespace;
try { try {
namespace = parser.parseArgs(args); namespace = argsHandler.parseArgs(args);
} catch (ArgumentParserException e) { } catch (ArgumentParserException e) {
parser.handleError(e);
Exit.exit(1); Exit.exit(1);
return; return;
} }
Map<String, ?> configs = getConfigs(namespace); ConfigHandler configHandler = new ConfigHandler(namespace);
Map<String, Object> jaasConfigs = getJaasConfigs(namespace);
Map<String, ?> configs = configHandler.getConfigs();
Map<String, Object> jaasConfigs = configHandler.getJaasOptions();
try { try {
String accessToken; String accessToken;
@ -208,71 +170,217 @@ public class OAuthCompatibilityTool {
if (t instanceof ConfigException) { if (t instanceof ConfigException) {
System.out.printf("%n"); System.out.printf("%n");
parser.printHelp(); argsHandler.parser.printHelp();
} }
Exit.exit(1); Exit.exit(1);
} }
} }
private static Map<String, ?> getConfigs(Namespace namespace) {
Map<String, Object> c = new HashMap<>(); private static class ArgsHandler {
maybeAddInt(namespace, "connectTimeoutMs", c, SASL_LOGIN_CONNECT_TIMEOUT_MS);
maybeAddInt(namespace, "readTimeoutMs", c, SASL_LOGIN_READ_TIMEOUT_MS); private static final String DESCRIPTION = String.format(
maybeAddLong(namespace, "loginRetryBackoffMs", c, SASL_LOGIN_RETRY_BACKOFF_MS); "This tool is used to verify OAuth/OIDC provider compatibility.%n%n" +
maybeAddLong(namespace, "loginRetryBackoffMax", c, SASL_LOGIN_RETRY_BACKOFF_MAX_MS); "Run the following script to determine the configuration options:%n%n" +
maybeAddString(namespace, "scopeClaimName", c, SASL_OAUTHBEARER_SCOPE_CLAIM_NAME); " ./bin/kafka-run-class.sh %s --help",
maybeAddString(namespace, "subClaimName", c, SASL_OAUTHBEARER_SUB_CLAIM_NAME); OAuthCompatibilityTool.class.getName());
maybeAddString(namespace, "tokenEndpointUrl", c, SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL);
maybeAddString(namespace, "jwksEndpointUrl", c, SASL_OAUTHBEARER_JWKS_ENDPOINT_URL); private final ArgumentParser parser;
maybeAddLong(namespace, "jwksEndpdointRefreshMs", c, SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS);
maybeAddLong(namespace, "jwksEndpdointRetryBackoffMaxMs", c, SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS); private ArgsHandler() {
maybeAddLong(namespace, "jwksEndpdointRetryBackoffMs", c, SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS); this.parser = ArgumentParsers
maybeAddInt(namespace, "clockSkewSeconds", c, SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS); .newArgumentParser("oauth-compatibility-tool")
maybeAddStringList(namespace, "expectedAudience", c, SASL_OAUTHBEARER_EXPECTED_AUDIENCE); .defaultHelp(true)
maybeAddString(namespace, "expectedIssuer", c, SASL_OAUTHBEARER_EXPECTED_ISSUER); .description(DESCRIPTION);
}
private Namespace parseArgs(String[] args) throws ArgumentParserException {
// SASL/OAuth
addArgument(SASL_LOGIN_CONNECT_TIMEOUT_MS, SASL_LOGIN_CONNECT_TIMEOUT_MS_DOC, Integer.class);
addArgument(SASL_LOGIN_READ_TIMEOUT_MS, SASL_LOGIN_READ_TIMEOUT_MS_DOC, Integer.class);
addArgument(SASL_LOGIN_RETRY_BACKOFF_MAX_MS, SASL_LOGIN_RETRY_BACKOFF_MAX_MS_DOC, Long.class);
addArgument(SASL_LOGIN_RETRY_BACKOFF_MS, SASL_LOGIN_RETRY_BACKOFF_MS_DOC, Long.class);
addArgument(SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS, SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS_DOC, Integer.class);
addArgument(SASL_OAUTHBEARER_EXPECTED_AUDIENCE, SASL_OAUTHBEARER_EXPECTED_AUDIENCE_DOC)
.action(Arguments.append());
addArgument(SASL_OAUTHBEARER_EXPECTED_ISSUER, SASL_OAUTHBEARER_EXPECTED_ISSUER_DOC);
addArgument(SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS, SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS_DOC, Long.class);
addArgument(SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS, SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS_DOC, Long.class);
addArgument(SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS, SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS_DOC, Long.class);
addArgument(SASL_OAUTHBEARER_JWKS_ENDPOINT_URL, SASL_OAUTHBEARER_JWKS_ENDPOINT_URL_DOC);
addArgument(SASL_OAUTHBEARER_SCOPE_CLAIM_NAME, SASL_OAUTHBEARER_SCOPE_CLAIM_NAME_DOC);
addArgument(SASL_OAUTHBEARER_SUB_CLAIM_NAME, SASL_OAUTHBEARER_SUB_CLAIM_NAME_DOC);
addArgument(SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL_DOC);
// SSL
addArgument(SSL_CIPHER_SUITES_CONFIG, SSL_CIPHER_SUITES_DOC)
.action(Arguments.append());
addArgument(SSL_ENABLED_PROTOCOLS_CONFIG, SSL_ENABLED_PROTOCOLS_DOC)
.action(Arguments.append());
addArgument(SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC);
addArgument(SSL_ENGINE_FACTORY_CLASS_CONFIG, SSL_ENGINE_FACTORY_CLASS_DOC);
addArgument(SSL_KEYMANAGER_ALGORITHM_CONFIG, SSL_KEYMANAGER_ALGORITHM_DOC);
addArgument(SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, SSL_KEYSTORE_CERTIFICATE_CHAIN_DOC);
addArgument(SSL_KEYSTORE_KEY_CONFIG, SSL_KEYSTORE_KEY_DOC);
addArgument(SSL_KEYSTORE_LOCATION_CONFIG, SSL_KEYSTORE_LOCATION_DOC);
addArgument(SSL_KEYSTORE_PASSWORD_CONFIG, SSL_KEYSTORE_PASSWORD_DOC);
addArgument(SSL_KEYSTORE_TYPE_CONFIG, SSL_KEYSTORE_TYPE_DOC);
addArgument(SSL_KEY_PASSWORD_CONFIG, SSL_KEY_PASSWORD_DOC);
addArgument(SSL_PROTOCOL_CONFIG, SSL_PROTOCOL_DOC);
addArgument(SSL_PROVIDER_CONFIG, SSL_PROVIDER_DOC);
addArgument(SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG, SSL_SECURE_RANDOM_IMPLEMENTATION_DOC);
addArgument(SSL_TRUSTMANAGER_ALGORITHM_CONFIG, SSL_TRUSTMANAGER_ALGORITHM_DOC);
addArgument(SSL_TRUSTSTORE_CERTIFICATES_CONFIG, SSL_TRUSTSTORE_CERTIFICATES_DOC);
addArgument(SSL_TRUSTSTORE_LOCATION_CONFIG, SSL_TRUSTSTORE_LOCATION_DOC);
addArgument(SSL_TRUSTSTORE_PASSWORD_CONFIG, SSL_TRUSTSTORE_PASSWORD_DOC);
addArgument(SSL_TRUSTSTORE_TYPE_CONFIG, SSL_TRUSTSTORE_TYPE_DOC);
// JAAS options...
addArgument(CLIENT_ID_CONFIG, CLIENT_ID_DOC);
addArgument(CLIENT_SECRET_CONFIG, CLIENT_SECRET_DOC);
addArgument(SCOPE_CONFIG, SCOPE_DOC);
try {
return parser.parseArgs(args);
} catch (ArgumentParserException e) {
parser.handleError(e);
throw e;
}
}
private Argument addArgument(String option, String help) {
return addArgument(option, help, String.class);
}
private Argument addArgument(String option, String help, Class<?> clazz) {
// Change foo.bar into --foo.bar.
String name = "--" + option;
return parser.addArgument(name)
.type(clazz)
.metavar(option)
.dest(option)
.help(help);
}
}
private static class ConfigHandler {
private final Namespace namespace;
private ConfigHandler(Namespace namespace) {
this.namespace = namespace;
}
private Map<String, ?> getConfigs() {
Map<String, Object> m = new HashMap<>();
// SASL/OAuth
maybeAddInt(m, SASL_LOGIN_CONNECT_TIMEOUT_MS);
maybeAddInt(m, SASL_LOGIN_READ_TIMEOUT_MS);
maybeAddLong(m, SASL_LOGIN_RETRY_BACKOFF_MS);
maybeAddLong(m, SASL_LOGIN_RETRY_BACKOFF_MAX_MS);
maybeAddString(m, SASL_OAUTHBEARER_SCOPE_CLAIM_NAME);
maybeAddString(m, SASL_OAUTHBEARER_SUB_CLAIM_NAME);
maybeAddString(m, SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL);
maybeAddString(m, SASL_OAUTHBEARER_JWKS_ENDPOINT_URL);
maybeAddLong(m, SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS);
maybeAddLong(m, SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS);
maybeAddLong(m, SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS);
maybeAddInt(m, SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS);
maybeAddStringList(m, SASL_OAUTHBEARER_EXPECTED_AUDIENCE);
maybeAddString(m, SASL_OAUTHBEARER_EXPECTED_ISSUER);
// This here is going to fill in all the defaults for the values we don't specify... // This here is going to fill in all the defaults for the values we don't specify...
ConfigDef cd = new ConfigDef(); ConfigDef cd = new ConfigDef();
SaslConfigs.addClientSaslSupport(cd); SaslConfigs.addClientSaslSupport(cd);
AbstractConfig config = new AbstractConfig(cd, c); SslConfigs.addClientSslSupport(cd);
AbstractConfig config = new AbstractConfig(cd, m);
return config.values(); return config.values();
} }
private static void maybeAddInt(Namespace namespace, String namespaceKey, Map<String, Object> configs, String configsKey) { private Map<String, Object> getJaasOptions() {
Integer value = namespace.getInt(namespaceKey); Map<String, Object> m = new HashMap<>();
if (value != null) // SASL/OAuth
configs.put(configsKey, value); maybeAddString(m, CLIENT_ID_CONFIG);
maybeAddString(m, CLIENT_SECRET_CONFIG);
maybeAddString(m, SCOPE_CONFIG);
// SSL
maybeAddStringList(m, SSL_CIPHER_SUITES_CONFIG);
maybeAddStringList(m, SSL_ENABLED_PROTOCOLS_CONFIG);
maybeAddString(m, SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
maybeAddClass(m, SSL_ENGINE_FACTORY_CLASS_CONFIG);
maybeAddString(m, SSL_KEYMANAGER_ALGORITHM_CONFIG);
maybeAddPassword(m, SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG);
maybeAddPassword(m, SSL_KEYSTORE_KEY_CONFIG);
maybeAddString(m, SSL_KEYSTORE_LOCATION_CONFIG);
maybeAddPassword(m, SSL_KEYSTORE_PASSWORD_CONFIG);
maybeAddString(m, SSL_KEYSTORE_TYPE_CONFIG);
maybeAddPassword(m, SSL_KEY_PASSWORD_CONFIG);
maybeAddString(m, SSL_PROTOCOL_CONFIG);
maybeAddString(m, SSL_PROVIDER_CONFIG);
maybeAddString(m, SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG);
maybeAddString(m, SSL_TRUSTMANAGER_ALGORITHM_CONFIG);
maybeAddPassword(m, SSL_TRUSTSTORE_CERTIFICATES_CONFIG);
maybeAddString(m, SSL_TRUSTSTORE_LOCATION_CONFIG);
maybeAddPassword(m, SSL_TRUSTSTORE_PASSWORD_CONFIG);
maybeAddString(m, SSL_TRUSTSTORE_TYPE_CONFIG);
return m;
} }
private static void maybeAddLong(Namespace namespace, String namespaceKey, Map<String, Object> configs, String configsKey) { private void maybeAddInt(Map<String, Object> m, String option) {
Long value = namespace.getLong(namespaceKey); Integer value = namespace.getInt(option);
if (value != null) if (value != null)
configs.put(configsKey, value); m.put(option, value);
} }
private static void maybeAddString(Namespace namespace, String namespaceKey, Map<String, Object> configs, String configsKey) { private void maybeAddLong(Map<String, Object> m, String option) {
String value = namespace.getString(namespaceKey); Long value = namespace.getLong(option);
if (value != null) if (value != null)
configs.put(configsKey, value); m.put(option, value);
} }
private static void maybeAddStringList(Namespace namespace, String namespaceKey, Map<String, Object> configs, String configsKey) { private void maybeAddString(Map<String, Object> m, String option) {
String value = namespace.getString(namespaceKey); String value = namespace.getString(option);
if (value != null) if (value != null)
configs.put(configsKey, Arrays.asList(value.split(","))); m.put(option, value);
}
private void maybeAddPassword(Map<String, Object> m, String option) {
String value = namespace.getString(option);
if (value != null)
m.put(option, new Password(value));
}
private void maybeAddClass(Map<String, Object> m, String option) {
String value = namespace.getString(option);
if (value != null) {
try {
m.put(option, Class.forName(value));
} catch (ClassNotFoundException e) {
throw new KafkaException("Could not find class for " + option, e);
}
}
}
private void maybeAddStringList(Map<String, Object> m, String option) {
List<String> value = namespace.getList(option);
if (value != null)
m.put(option, value);
} }
private static Map<String, Object> getJaasConfigs(Namespace namespace) {
Map<String, Object> c = new HashMap<>();
c.put(CLIENT_ID_CONFIG, namespace.getString("clientId"));
c.put(CLIENT_SECRET_CONFIG, namespace.getString("clientSecret"));
c.put(SCOPE_CONFIG, namespace.getString("scope"));
return c;
} }
} }