mirror of https://github.com/apache/kafka.git
KAFKA-17335 Lack of default for URL encoding configuration for OAuth causes NPE (#16990)
AccessTokenRetrieverFactory uses the value of sasl.oauthbearer.header.urlencode provided by the user, or null if no value was provided for that configuration. When the HttpAccessTokenRetriever is created the JVM attempts to unbox the value into a boolean, a NullPointerException is thrown. The fix is to explicitly check the Boolean, and if it's null, use Boolean.FALSE. Reviewers: bachmanity1 <81428651+bachmanity1@users.noreply.github.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
a19792fbd7
commit
a87b501a47
|
|
@ -17,12 +17,15 @@
|
|||
|
||||
package org.apache.kafka.common.security.oauthbearer.internals.secured;
|
||||
|
||||
import org.apache.kafka.common.config.SaslConfigs;
|
||||
|
||||
import java.net.URL;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.net.ssl.SSLSocketFactory;
|
||||
|
||||
import static org.apache.kafka.common.config.SaslConfigs.DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE;
|
||||
import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_CONNECT_TIMEOUT_MS;
|
||||
import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_READ_TIMEOUT_MS;
|
||||
import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MAX_MS;
|
||||
|
|
@ -70,6 +73,8 @@ public class AccessTokenRetrieverFactory {
|
|||
if (jou.shouldCreateSSLSocketFactory(tokenEndpointUrl))
|
||||
sslSocketFactory = jou.createSSLSocketFactory();
|
||||
|
||||
boolean urlencodeHeader = validateUrlencodeHeader(cu);
|
||||
|
||||
return new HttpAccessTokenRetriever(clientId,
|
||||
clientSecret,
|
||||
scope,
|
||||
|
|
@ -79,8 +84,27 @@ public class AccessTokenRetrieverFactory {
|
|||
cu.validateLong(SASL_LOGIN_RETRY_BACKOFF_MAX_MS),
|
||||
cu.validateInteger(SASL_LOGIN_CONNECT_TIMEOUT_MS, false),
|
||||
cu.validateInteger(SASL_LOGIN_READ_TIMEOUT_MS, false),
|
||||
cu.get(SASL_OAUTHBEARER_HEADER_URLENCODE));
|
||||
urlencodeHeader);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* In some cases, the incoming {@link Map} doesn't contain a value for
|
||||
* {@link SaslConfigs#SASL_OAUTHBEARER_HEADER_URLENCODE}. Returning {@code null} from {@link Map#get(Object)}
|
||||
* will cause a {@link NullPointerException} when it is later unboxed.
|
||||
*
|
||||
* <p/>
|
||||
*
|
||||
* This utility method ensures that we have a non-{@code null} value to use in the
|
||||
* {@link HttpAccessTokenRetriever} constructor.
|
||||
*/
|
||||
static boolean validateUrlencodeHeader(ConfigurationUtils configurationUtils) {
|
||||
Boolean urlencodeHeader = configurationUtils.validateBoolean(SASL_OAUTHBEARER_HEADER_URLENCODE, false);
|
||||
|
||||
if (urlencodeHeader != null)
|
||||
return urlencodeHeader;
|
||||
else
|
||||
return DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -72,17 +72,17 @@ public class ConfigurationUtils {
|
|||
try {
|
||||
file = new File(url.toURI().getRawPath()).getAbsoluteFile();
|
||||
} catch (URISyntaxException e) {
|
||||
throw new ConfigException(name, url.toString(), String.format("The OAuth configuration option %s contains a URL (%s) that is malformed: %s", name, url, e.getMessage()));
|
||||
throw new ConfigException(String.format("The OAuth configuration option %s contains a URL (%s) that is malformed: %s", name, url, e.getMessage()));
|
||||
}
|
||||
|
||||
if (!file.exists())
|
||||
throw new ConfigException(name, file, String.format("The OAuth configuration option %s contains a file (%s) that doesn't exist", name, file));
|
||||
throw new ConfigException(String.format("The OAuth configuration option %s contains a file (%s) that doesn't exist", name, file));
|
||||
|
||||
if (!file.canRead())
|
||||
throw new ConfigException(name, file, String.format("The OAuth configuration option %s contains a file (%s) that doesn't have read permission", name, file));
|
||||
throw new ConfigException(String.format("The OAuth configuration option %s contains a file (%s) that doesn't have read permission", name, file));
|
||||
|
||||
if (file.isDirectory())
|
||||
throw new ConfigException(name, file, String.format("The OAuth configuration option %s references a directory (%s), not a file", name, file));
|
||||
throw new ConfigException(String.format("The OAuth configuration option %s references a directory (%s), not a file", name, file));
|
||||
|
||||
return file.toPath();
|
||||
}
|
||||
|
|
@ -104,7 +104,7 @@ public class ConfigurationUtils {
|
|||
|
||||
if (value == null) {
|
||||
if (isRequired)
|
||||
throw new ConfigException(name, null, String.format("The OAuth configuration option %s must be non-null", name));
|
||||
throw new ConfigException(String.format("The OAuth configuration option %s must be non-null", name));
|
||||
else
|
||||
return null;
|
||||
}
|
||||
|
|
@ -137,13 +137,13 @@ public class ConfigurationUtils {
|
|||
|
||||
if (value == null) {
|
||||
if (isRequired)
|
||||
throw new ConfigException(name, null, String.format("The OAuth configuration option %s must be non-null", name));
|
||||
throw new ConfigException(String.format("The OAuth configuration option %s must be non-null", name));
|
||||
else
|
||||
return null;
|
||||
}
|
||||
|
||||
if (min != null && value < min)
|
||||
throw new ConfigException(name, value, String.format("The OAuth configuration option %s value must be at least %s", name, min));
|
||||
throw new ConfigException(String.format("The OAuth configuration option %s value must be at least %s", name, min));
|
||||
|
||||
return value;
|
||||
}
|
||||
|
|
@ -167,18 +167,18 @@ public class ConfigurationUtils {
|
|||
try {
|
||||
url = new URL(value);
|
||||
} catch (MalformedURLException e) {
|
||||
throw new ConfigException(name, value, String.format("The OAuth configuration option %s contains a URL (%s) that is malformed: %s", name, value, e.getMessage()));
|
||||
throw new ConfigException(String.format("The OAuth configuration option %s contains a URL (%s) that is malformed: %s", name, value, e.getMessage()));
|
||||
}
|
||||
|
||||
String protocol = url.getProtocol();
|
||||
|
||||
if (protocol == null || protocol.trim().isEmpty())
|
||||
throw new ConfigException(name, value, String.format("The OAuth configuration option %s contains a URL (%s) that is missing the protocol", name, value));
|
||||
throw new ConfigException(String.format("The OAuth configuration option %s contains a URL (%s) that is missing the protocol", name, value));
|
||||
|
||||
protocol = protocol.toLowerCase(Locale.ROOT);
|
||||
|
||||
if (!(protocol.equals("http") || protocol.equals("https") || protocol.equals("file")))
|
||||
throw new ConfigException(name, value, String.format("The OAuth configuration option %s contains a URL (%s) that contains an invalid protocol (%s); only \"http\", \"https\", and \"file\" protocol are supported", name, value, protocol));
|
||||
throw new ConfigException(String.format("The OAuth configuration option %s contains a URL (%s) that contains an invalid protocol (%s); only \"http\", \"https\", and \"file\" protocol are supported", name, value, protocol));
|
||||
|
||||
return url;
|
||||
}
|
||||
|
|
@ -209,6 +209,15 @@ public class ConfigurationUtils {
|
|||
return value;
|
||||
}
|
||||
|
||||
public Boolean validateBoolean(String name, boolean isRequired) {
|
||||
Boolean value = get(name);
|
||||
|
||||
if (value == null && isRequired)
|
||||
throw new ConfigException(String.format("The OAuth configuration option %s must be non-null", name));
|
||||
|
||||
return value;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T> T get(String name) {
|
||||
T value = (T) configs.get(prefix + name);
|
||||
|
|
|
|||
|
|
@ -20,11 +20,17 @@ package org.apache.kafka.common.security.oauthbearer.internals.secured;
|
|||
import org.apache.kafka.common.config.ConfigException;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.apache.kafka.common.config.SaslConfigs.DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE;
|
||||
import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_HEADER_URLENCODE;
|
||||
import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
|
|
@ -64,4 +70,21 @@ public class AccessTokenRetrieverFactoryTest extends OAuthBearerTest {
|
|||
assertThrowsWithMessage(ConfigException.class, () -> AccessTokenRetrieverFactory.create(configs, jaasConfig), "that doesn't exist");
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("urlencodeHeaderSupplier")
|
||||
public void testUrlencodeHeader(Map<String, Object> configs, boolean expectedValue) {
|
||||
ConfigurationUtils cu = new ConfigurationUtils(configs);
|
||||
boolean actualValue = AccessTokenRetrieverFactory.validateUrlencodeHeader(cu);
|
||||
assertEquals(expectedValue, actualValue);
|
||||
}
|
||||
|
||||
private static Stream<Arguments> urlencodeHeaderSupplier() {
|
||||
return Stream.of(
|
||||
Arguments.of(Collections.emptyMap(), DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE),
|
||||
Arguments.of(Collections.singletonMap(SASL_OAUTHBEARER_HEADER_URLENCODE, null), DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE),
|
||||
Arguments.of(Collections.singletonMap(SASL_OAUTHBEARER_HEADER_URLENCODE, true), true),
|
||||
Arguments.of(Collections.singletonMap(SASL_OAUTHBEARER_HEADER_URLENCODE, false), false)
|
||||
);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue