mirror of https://github.com/apache/kafka.git
KAFKA-19153: Add OAuth integration tests (#19938)
Adds a test dependency on [mock-oauth2-server](https://github.com/navikt/mock-oauth2-server/) for integration tests for OAuth layer. Also includes fixes for some regressions that were caught by the integration tests. Reviewers: Manikumar Reddy <manikumar@confluent.io>, Lianet Magrans <lmagrans@confluent.io>
This commit is contained in:
parent
5f55e97a22
commit
923b6c3fea
|
@ -1060,6 +1060,7 @@ project(':core') {
|
||||||
testImplementation libs.junitJupiter
|
testImplementation libs.junitJupiter
|
||||||
testImplementation libs.caffeine
|
testImplementation libs.caffeine
|
||||||
testImplementation testLog4j2Libs
|
testImplementation testLog4j2Libs
|
||||||
|
testImplementation libs.mockOAuth2Server
|
||||||
|
|
||||||
testRuntimeOnly runtimeTestLibs
|
testRuntimeOnly runtimeTestLibs
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,7 +39,10 @@ import javax.security.auth.login.AppConfigurationEntry;
|
||||||
|
|
||||||
import static org.apache.kafka.common.config.SaslConfigs.DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE;
|
import static org.apache.kafka.common.config.SaslConfigs.DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE;
|
||||||
import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
|
import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
|
||||||
|
import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_ID;
|
||||||
|
import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_SECRET;
|
||||||
import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_HEADER_URLENCODE;
|
import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_HEADER_URLENCODE;
|
||||||
|
import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SCOPE;
|
||||||
import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.CLIENT_ID_CONFIG;
|
import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.CLIENT_ID_CONFIG;
|
||||||
import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.CLIENT_SECRET_CONFIG;
|
import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.CLIENT_SECRET_CONFIG;
|
||||||
import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.SCOPE_CONFIG;
|
import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.SCOPE_CONFIG;
|
||||||
|
@ -173,8 +176,8 @@ public class ClientCredentialsJwtRetriever implements JwtRetriever {
|
||||||
|
|
||||||
private String clientId() {
|
private String clientId() {
|
||||||
return getValue(
|
return getValue(
|
||||||
|
SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_ID,
|
||||||
CLIENT_ID_CONFIG,
|
CLIENT_ID_CONFIG,
|
||||||
"clientId",
|
|
||||||
true,
|
true,
|
||||||
cu::validateString,
|
cu::validateString,
|
||||||
jou::validateString
|
jou::validateString
|
||||||
|
@ -183,8 +186,8 @@ public class ClientCredentialsJwtRetriever implements JwtRetriever {
|
||||||
|
|
||||||
private String clientSecret() {
|
private String clientSecret() {
|
||||||
return getValue(
|
return getValue(
|
||||||
|
SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_SECRET,
|
||||||
CLIENT_SECRET_CONFIG,
|
CLIENT_SECRET_CONFIG,
|
||||||
"clientSecret",
|
|
||||||
true,
|
true,
|
||||||
cu::validatePassword,
|
cu::validatePassword,
|
||||||
jou::validateString
|
jou::validateString
|
||||||
|
@ -193,8 +196,8 @@ public class ClientCredentialsJwtRetriever implements JwtRetriever {
|
||||||
|
|
||||||
private String scope() {
|
private String scope() {
|
||||||
return getValue(
|
return getValue(
|
||||||
|
SASL_OAUTHBEARER_SCOPE,
|
||||||
SCOPE_CONFIG,
|
SCOPE_CONFIG,
|
||||||
"scope",
|
|
||||||
false,
|
false,
|
||||||
cu::validateString,
|
cu::validateString,
|
||||||
jou::validateString
|
jou::validateString
|
||||||
|
|
|
@ -170,7 +170,7 @@ public class OAuthBearerValidatorCallbackHandler implements AuthenticateCallback
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkConfigured() {
|
private void checkConfigured() {
|
||||||
if (verificationKeyResolver == null || jwtValidator == null)
|
if (jwtValidator == null)
|
||||||
throw new IllegalStateException(String.format("To use %s, first call the configure method", getClass().getSimpleName()));
|
throw new IllegalStateException(String.format("To use %s, first call the configure method", getClass().getSimpleName()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,9 @@ import org.apache.kafka.common.config.types.Password;
|
||||||
import org.apache.kafka.common.network.ListenerName;
|
import org.apache.kafka.common.network.ListenerName;
|
||||||
import org.apache.kafka.common.utils.Utils;
|
import org.apache.kafka.common.utils.Utils;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.net.MalformedURLException;
|
import java.net.MalformedURLException;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
|
@ -47,6 +50,8 @@ import static org.apache.kafka.common.config.internals.BrokerSecurityConfigs.ALL
|
||||||
|
|
||||||
public class ConfigurationUtils {
|
public class ConfigurationUtils {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(ConfigurationUtils.class);
|
||||||
|
|
||||||
private final Map<String, ?> configs;
|
private final Map<String, ?> configs;
|
||||||
|
|
||||||
private final String prefix;
|
private final String prefix;
|
||||||
|
@ -344,6 +349,13 @@ public class ConfigurationUtils {
|
||||||
((OAuthBearerConfigurable) o).configure(configs, saslMechanism, jaasConfigEntries);
|
((OAuthBearerConfigurable) o).configure(configs, saslMechanism, jaasConfigEntries);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
Utils.maybeCloseQuietly(o, "Instance of class " + o.getClass().getName() + " failed call to configure()");
|
Utils.maybeCloseQuietly(o, "Instance of class " + o.getClass().getName() + " failed call to configure()");
|
||||||
|
LOG.warn(
|
||||||
|
"The class {} defined in the {} configuration encountered an error on configure(): {}",
|
||||||
|
o.getClass().getName(),
|
||||||
|
configName,
|
||||||
|
e.getMessage(),
|
||||||
|
e
|
||||||
|
);
|
||||||
throw new ConfigException(
|
throw new ConfigException(
|
||||||
String.format(
|
String.format(
|
||||||
"The class %s defined in the %s configuration encountered an error on configure(): %s",
|
"The class %s defined in the %s configuration encountered an error on configure(): %s",
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.kafka.common.security.oauthbearer.internals.secured.assertion;
|
package org.apache.kafka.common.security.oauthbearer.internals.secured.assertion;
|
||||||
|
|
||||||
import org.apache.kafka.common.KafkaException;
|
import org.apache.kafka.common.security.oauthbearer.JwtRetrieverException;
|
||||||
import org.apache.kafka.common.security.oauthbearer.internals.secured.CachedFile;
|
import org.apache.kafka.common.security.oauthbearer.internals.secured.CachedFile;
|
||||||
import org.apache.kafka.common.utils.Utils;
|
import org.apache.kafka.common.utils.Utils;
|
||||||
|
|
||||||
|
@ -89,7 +89,7 @@ public class DefaultAssertionCreator implements AssertionCreator {
|
||||||
|
|
||||||
return privateKey(contents.getBytes(StandardCharsets.UTF_8), passphrase);
|
return privateKey(contents.getBytes(StandardCharsets.UTF_8), passphrase);
|
||||||
} catch (GeneralSecurityException | IOException e) {
|
} catch (GeneralSecurityException | IOException e) {
|
||||||
throw new KafkaException("An error occurred generating the OAuth assertion private key from " + file.getPath(), e);
|
throw new JwtRetrieverException("An error occurred generating the OAuth assertion private key from " + file.getPath(), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,6 @@
|
||||||
|
|
||||||
package org.apache.kafka.common.security.oauthbearer;
|
package org.apache.kafka.common.security.oauthbearer;
|
||||||
|
|
||||||
import org.apache.kafka.common.KafkaException;
|
|
||||||
import org.apache.kafka.common.config.SaslConfigs;
|
import org.apache.kafka.common.config.SaslConfigs;
|
||||||
import org.apache.kafka.common.security.oauthbearer.internals.secured.OAuthBearerTest;
|
import org.apache.kafka.common.security.oauthbearer.internals.secured.OAuthBearerTest;
|
||||||
|
|
||||||
|
@ -95,7 +94,7 @@ public class JwtBearerJwtRetrieverTest extends OAuthBearerTest {
|
||||||
List<AppConfigurationEntry> jaasConfigEntries = getJaasConfigEntries();
|
List<AppConfigurationEntry> jaasConfigEntries = getJaasConfigEntries();
|
||||||
|
|
||||||
try (JwtBearerJwtRetriever jwtRetriever = new JwtBearerJwtRetriever()) {
|
try (JwtBearerJwtRetriever jwtRetriever = new JwtBearerJwtRetriever()) {
|
||||||
KafkaException e = assertThrows(KafkaException.class, () -> jwtRetriever.configure(configs, OAUTHBEARER_MECHANISM, jaasConfigEntries));
|
JwtRetrieverException e = assertThrows(JwtRetrieverException.class, () -> jwtRetriever.configure(configs, OAUTHBEARER_MECHANISM, jaasConfigEntries));
|
||||||
assertNotNull(e.getCause());
|
assertNotNull(e.getCause());
|
||||||
assertInstanceOf(GeneralSecurityException.class, e.getCause());
|
assertInstanceOf(GeneralSecurityException.class, e.getCause());
|
||||||
}
|
}
|
||||||
|
@ -144,7 +143,7 @@ public class JwtBearerJwtRetrieverTest extends OAuthBearerTest {
|
||||||
List<AppConfigurationEntry> jaasConfigEntries = getJaasConfigEntries();
|
List<AppConfigurationEntry> jaasConfigEntries = getJaasConfigEntries();
|
||||||
|
|
||||||
try (JwtBearerJwtRetriever jwtRetriever = new JwtBearerJwtRetriever()) {
|
try (JwtBearerJwtRetriever jwtRetriever = new JwtBearerJwtRetriever()) {
|
||||||
KafkaException e = assertThrows(KafkaException.class, () -> jwtRetriever.configure(configs, OAUTHBEARER_MECHANISM, jaasConfigEntries));
|
JwtRetrieverException e = assertThrows(JwtRetrieverException.class, () -> jwtRetriever.configure(configs, OAUTHBEARER_MECHANISM, jaasConfigEntries));
|
||||||
assertNotNull(e.getCause());
|
assertNotNull(e.getCause());
|
||||||
assertInstanceOf(IOException.class, e.getCause());
|
assertInstanceOf(IOException.class, e.getCause());
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,261 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package integration.kafka.api
|
||||||
|
|
||||||
|
import com.nimbusds.jose.jwk.RSAKey
|
||||||
|
import kafka.api.{IntegrationTestHarness, SaslSetup}
|
||||||
|
import kafka.utils.TestInfoUtils
|
||||||
|
import org.apache.kafka.clients.CommonClientConfigs
|
||||||
|
import org.apache.kafka.common.config.{ConfigException, SaslConfigs}
|
||||||
|
import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo}
|
||||||
|
|
||||||
|
import java.util.{Base64, Collections, Properties}
|
||||||
|
import no.nav.security.mock.oauth2.{MockOAuth2Server, OAuth2Config}
|
||||||
|
import no.nav.security.mock.oauth2.token.{KeyProvider, OAuth2TokenProvider}
|
||||||
|
import org.apache.kafka.common.KafkaException
|
||||||
|
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
|
||||||
|
import org.apache.kafka.common.security.auth.SecurityProtocol
|
||||||
|
import org.apache.kafka.common.security.oauthbearer.{OAuthBearerLoginCallbackHandler, OAuthBearerLoginModule, OAuthBearerValidatorCallbackHandler}
|
||||||
|
import org.apache.kafka.common.utils.Utils
|
||||||
|
import org.apache.kafka.test.TestUtils
|
||||||
|
import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertThrows}
|
||||||
|
import org.junit.jupiter.params.ParameterizedTest
|
||||||
|
import org.junit.jupiter.params.provider.MethodSource
|
||||||
|
|
||||||
|
import java.io.File
|
||||||
|
import java.nio.ByteBuffer
|
||||||
|
import java.nio.channels.FileChannel
|
||||||
|
import java.nio.file.StandardOpenOption
|
||||||
|
import java.security.{KeyPairGenerator, PrivateKey}
|
||||||
|
import java.security.interfaces.RSAPublicKey
|
||||||
|
import java.util
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Integration tests for the consumer that cover basic usage as well as coordinator failure
|
||||||
|
*/
|
||||||
|
class ClientOAuthIntegrationTest extends IntegrationTestHarness with SaslSetup {
|
||||||
|
|
||||||
|
override val brokerCount = 3
|
||||||
|
|
||||||
|
override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
|
||||||
|
override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
|
||||||
|
override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
|
||||||
|
|
||||||
|
protected def kafkaClientSaslMechanism = "OAUTHBEARER"
|
||||||
|
protected def kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
|
||||||
|
|
||||||
|
val issuerId = "default"
|
||||||
|
var mockOAuthServer: MockOAuth2Server = _
|
||||||
|
var privateKey: PrivateKey = _
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
override def setUp(testInfo: TestInfo): Unit = {
|
||||||
|
// Step 1: Generate the key pair dynamically.
|
||||||
|
val keyGen = KeyPairGenerator.getInstance("RSA")
|
||||||
|
keyGen.initialize(2048)
|
||||||
|
val keyPair = keyGen.generateKeyPair()
|
||||||
|
|
||||||
|
privateKey = keyPair.getPrivate
|
||||||
|
|
||||||
|
// Step 2: Create the RSA JWK from key pair.
|
||||||
|
val rsaJWK = new RSAKey.Builder(keyPair.getPublic.asInstanceOf[RSAPublicKey])
|
||||||
|
.privateKey(privateKey)
|
||||||
|
.keyID("foo")
|
||||||
|
.build()
|
||||||
|
|
||||||
|
// Step 3: Create the OAuth server using the keys just created
|
||||||
|
val keyProvider = new KeyProvider(Collections.singletonList(rsaJWK))
|
||||||
|
val tokenProvider = new OAuth2TokenProvider(keyProvider)
|
||||||
|
val oauthConfig = new OAuth2Config(false, null, null, false, tokenProvider)
|
||||||
|
mockOAuthServer = new MockOAuth2Server(oauthConfig)
|
||||||
|
|
||||||
|
mockOAuthServer.start()
|
||||||
|
val tokenEndpointUrl = mockOAuthServer.tokenEndpointUrl(issuerId).url().toString
|
||||||
|
val jwksUrl = mockOAuthServer.jwksUrl(issuerId).url().toString
|
||||||
|
System.setProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, s"$tokenEndpointUrl,$jwksUrl")
|
||||||
|
|
||||||
|
val listenerNamePrefix = s"listener.name.${listenerName.value().toLowerCase}"
|
||||||
|
|
||||||
|
serverConfig.setProperty(s"$listenerNamePrefix.oauthbearer.${SaslConfigs.SASL_JAAS_CONFIG}", s"${classOf[OAuthBearerLoginModule].getName} required ;")
|
||||||
|
serverConfig.setProperty(s"$listenerNamePrefix.oauthbearer.${SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE}", issuerId)
|
||||||
|
serverConfig.setProperty(s"$listenerNamePrefix.oauthbearer.${SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_URL}", jwksUrl)
|
||||||
|
serverConfig.setProperty(s"$listenerNamePrefix.oauthbearer.${BrokerSecurityConfigs.SASL_SERVER_CALLBACK_HANDLER_CLASS_CONFIG}", classOf[OAuthBearerValidatorCallbackHandler].getName)
|
||||||
|
|
||||||
|
// create static config including client login context with credentials for JaasTestUtils 'client2'
|
||||||
|
startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism)))
|
||||||
|
|
||||||
|
// The superuser needs the configuration in setUp because it's used to create resources before the individual
|
||||||
|
// test methods are invoked.
|
||||||
|
superuserClientConfig.putAll(defaultClientCredentialsConfigs())
|
||||||
|
|
||||||
|
super.setUp(testInfo)
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterEach
|
||||||
|
override def tearDown(): Unit = {
|
||||||
|
if (mockOAuthServer != null)
|
||||||
|
mockOAuthServer.shutdown()
|
||||||
|
|
||||||
|
closeSasl()
|
||||||
|
super.tearDown()
|
||||||
|
|
||||||
|
System.clearProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_FILES_CONFIG)
|
||||||
|
System.clearProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG)
|
||||||
|
}
|
||||||
|
|
||||||
|
def defaultOAuthConfigs(): Properties = {
|
||||||
|
val tokenEndpointUrl = mockOAuthServer.tokenEndpointUrl(issuerId).url().toString
|
||||||
|
|
||||||
|
val configs = new Properties()
|
||||||
|
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name)
|
||||||
|
configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasClientLoginModule(kafkaClientSaslMechanism))
|
||||||
|
configs.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, classOf[OAuthBearerLoginCallbackHandler].getName)
|
||||||
|
configs.put(SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, tokenEndpointUrl)
|
||||||
|
configs
|
||||||
|
}
|
||||||
|
|
||||||
|
def defaultClientCredentialsConfigs(): Properties = {
|
||||||
|
val configs = defaultOAuthConfigs()
|
||||||
|
configs.put(SaslConfigs.SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_ID, "test-client")
|
||||||
|
configs.put(SaslConfigs.SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_SECRET, "test-secret")
|
||||||
|
configs
|
||||||
|
}
|
||||||
|
|
||||||
|
def defaultJwtBearerConfigs(): Properties = {
|
||||||
|
val configs = defaultOAuthConfigs()
|
||||||
|
configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasClientLoginModule(kafkaClientSaslMechanism))
|
||||||
|
configs.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, classOf[OAuthBearerLoginCallbackHandler].getName)
|
||||||
|
configs.put(SaslConfigs.SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS, "org.apache.kafka.common.security.oauthbearer.JwtBearerJwtRetriever")
|
||||||
|
configs
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
|
||||||
|
@MethodSource(Array("getTestGroupProtocolParametersAll"))
|
||||||
|
def testBasicClientCredentials(groupProtocol: String): Unit = {
|
||||||
|
val configs = defaultClientCredentialsConfigs()
|
||||||
|
assertDoesNotThrow(() => createProducer(configOverrides = configs))
|
||||||
|
assertDoesNotThrow(() => createConsumer(configOverrides = configs))
|
||||||
|
assertDoesNotThrow(() => createAdminClient(configOverrides = configs))
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
|
||||||
|
@MethodSource(Array("getTestGroupProtocolParametersAll"))
|
||||||
|
def testBasicJwtBearer(groupProtocol: String): Unit = {
|
||||||
|
val jwt = mockOAuthServer.issueToken(issuerId, "jdoe", "someaudience", Collections.singletonMap("scope", "test"))
|
||||||
|
val assertionFile = TestUtils.tempFile(jwt.serialize())
|
||||||
|
System.setProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_FILES_CONFIG, assertionFile.getAbsolutePath)
|
||||||
|
|
||||||
|
val configs = defaultJwtBearerConfigs()
|
||||||
|
configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_FILE, assertionFile.getAbsolutePath)
|
||||||
|
|
||||||
|
assertDoesNotThrow(() => createProducer(configOverrides = configs))
|
||||||
|
assertDoesNotThrow(() => createConsumer(configOverrides = configs))
|
||||||
|
assertDoesNotThrow(() => createAdminClient(configOverrides = configs))
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
|
||||||
|
@MethodSource(Array("getTestGroupProtocolParametersAll"))
|
||||||
|
def testBasicJwtBearer2(groupProtocol: String): Unit = {
|
||||||
|
val privateKeyFile = generatePrivateKeyFile()
|
||||||
|
System.setProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_FILES_CONFIG, privateKeyFile.getAbsolutePath)
|
||||||
|
|
||||||
|
val configs = defaultJwtBearerConfigs()
|
||||||
|
configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_PRIVATE_KEY_FILE, privateKeyFile.getPath)
|
||||||
|
configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_CLAIM_AUD, "default")
|
||||||
|
configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_CLAIM_SUB, "kafka-client-test-sub")
|
||||||
|
configs.put(SaslConfigs.SASL_OAUTHBEARER_SCOPE, "default")
|
||||||
|
// configs.put(SaslConfigs.SASL_OAUTHBEARER_SUB_CLAIM_NAME, "aud")
|
||||||
|
|
||||||
|
assertDoesNotThrow(() => createProducer(configOverrides = configs))
|
||||||
|
assertDoesNotThrow(() => createConsumer(configOverrides = configs))
|
||||||
|
assertDoesNotThrow(() => createAdminClient(configOverrides = configs))
|
||||||
|
}
|
||||||
|
|
||||||
|
@Disabled("KAFKA-19394: Failure in ConsumerNetworkThread.initializeResources() can cause hangs on AsyncKafkaConsumer.close()")
|
||||||
|
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
|
||||||
|
@MethodSource(Array("getTestGroupProtocolParametersAll"))
|
||||||
|
def testJwtBearerWithMalformedAssertionFile(groupProtocol: String): Unit = {
|
||||||
|
// Create the assertion file, but fill it with non-JWT garbage.
|
||||||
|
val assertionFile = TestUtils.tempFile("CQEN*)Q#F)&)^#QNC")
|
||||||
|
System.setProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_FILES_CONFIG, assertionFile.getAbsolutePath)
|
||||||
|
|
||||||
|
val configs = defaultJwtBearerConfigs()
|
||||||
|
configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_FILE, assertionFile.getAbsolutePath)
|
||||||
|
|
||||||
|
assertThrows(classOf[KafkaException], () => createProducer(configOverrides = configs))
|
||||||
|
assertThrows(classOf[KafkaException], () => createConsumer(configOverrides = configs))
|
||||||
|
assertThrows(classOf[KafkaException], () => createAdminClient(configOverrides = configs))
|
||||||
|
}
|
||||||
|
|
||||||
|
@Disabled("KAFKA-19394: Failure in ConsumerNetworkThread.initializeResources() can cause hangs on AsyncKafkaConsumer.close()")
|
||||||
|
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
|
||||||
|
@MethodSource(Array("getTestGroupProtocolParametersAll"))
|
||||||
|
def testJwtBearerWithEmptyAssertionFile(groupProtocol: String): Unit = {
|
||||||
|
// Create the assertion file, but leave it empty.
|
||||||
|
val assertionFile = TestUtils.tempFile()
|
||||||
|
System.setProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_FILES_CONFIG, assertionFile.getAbsolutePath)
|
||||||
|
|
||||||
|
val configs = defaultJwtBearerConfigs()
|
||||||
|
configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_FILE, assertionFile.getAbsolutePath)
|
||||||
|
|
||||||
|
assertThrows(classOf[KafkaException], () => createProducer(configOverrides = configs))
|
||||||
|
assertThrows(classOf[KafkaException], () => createConsumer(configOverrides = configs))
|
||||||
|
assertThrows(classOf[KafkaException], () => createAdminClient(configOverrides = configs))
|
||||||
|
}
|
||||||
|
|
||||||
|
@Disabled("KAFKA-19394: Failure in ConsumerNetworkThread.initializeResources() can cause hangs on AsyncKafkaConsumer.close()")
|
||||||
|
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
|
||||||
|
@MethodSource(Array("getTestGroupProtocolParametersAll"))
|
||||||
|
def testJwtBearerWithMissingAssertionFile(groupProtocol: String): Unit = {
|
||||||
|
val missingFileName = "/this/does/not/exist.txt"
|
||||||
|
|
||||||
|
val configs = defaultJwtBearerConfigs()
|
||||||
|
configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_FILE, missingFileName)
|
||||||
|
|
||||||
|
assertThrows(classOf[KafkaException], () => createProducer(configOverrides = configs))
|
||||||
|
assertThrows(classOf[KafkaException], () => createConsumer(configOverrides = configs))
|
||||||
|
assertThrows(classOf[KafkaException], () => createAdminClient(configOverrides = configs))
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
|
||||||
|
@MethodSource(Array("getTestGroupProtocolParametersAll"))
|
||||||
|
def testUnsupportedJwtRetriever(groupProtocol: String): Unit = {
|
||||||
|
val className = "org.apache.kafka.common.security.oauthbearer.ThisIsNotARealJwtRetriever"
|
||||||
|
|
||||||
|
val configs = defaultOAuthConfigs()
|
||||||
|
configs.put(SaslConfigs.SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS, className)
|
||||||
|
|
||||||
|
assertThrows(classOf[ConfigException], () => createProducer(configOverrides = configs))
|
||||||
|
assertThrows(classOf[ConfigException], () => createConsumer(configOverrides = configs))
|
||||||
|
assertThrows(classOf[ConfigException], () => createAdminClient(configOverrides = configs))
|
||||||
|
}
|
||||||
|
|
||||||
|
def generatePrivateKeyFile(): File = {
|
||||||
|
val file = File.createTempFile("private-", ".key")
|
||||||
|
val bytes = Base64.getEncoder.encode(privateKey.getEncoded)
|
||||||
|
var channel: FileChannel = null
|
||||||
|
|
||||||
|
try {
|
||||||
|
channel = FileChannel.open(file.toPath, util.EnumSet.of(StandardOpenOption.WRITE))
|
||||||
|
Utils.writeFully(channel, ByteBuffer.wrap(bytes))
|
||||||
|
} finally {
|
||||||
|
channel.close()
|
||||||
|
}
|
||||||
|
|
||||||
|
file
|
||||||
|
}
|
||||||
|
}
|
|
@ -123,6 +123,7 @@ versions += [
|
||||||
slf4j: "1.7.36",
|
slf4j: "1.7.36",
|
||||||
snappy: "1.1.10.7",
|
snappy: "1.1.10.7",
|
||||||
spotbugs: "4.8.6",
|
spotbugs: "4.8.6",
|
||||||
|
mockOAuth2Server: "2.2.1",
|
||||||
zinc: "1.9.2",
|
zinc: "1.9.2",
|
||||||
// When updating the zstd version, please do as well in docker/native/native-image-configs/resource-config.json
|
// When updating the zstd version, please do as well in docker/native/native-image-configs/resource-config.json
|
||||||
// Also make sure the compression levels in org.apache.kafka.common.record.CompressionType are still valid
|
// Also make sure the compression levels in org.apache.kafka.common.record.CompressionType are still valid
|
||||||
|
@ -224,6 +225,7 @@ libs += [
|
||||||
snappy: "org.xerial.snappy:snappy-java:$versions.snappy",
|
snappy: "org.xerial.snappy:snappy-java:$versions.snappy",
|
||||||
spotbugs: "com.github.spotbugs:spotbugs-annotations:$versions.spotbugs",
|
spotbugs: "com.github.spotbugs:spotbugs-annotations:$versions.spotbugs",
|
||||||
swaggerAnnotations: "io.swagger.core.v3:swagger-annotations:$swaggerVersion",
|
swaggerAnnotations: "io.swagger.core.v3:swagger-annotations:$swaggerVersion",
|
||||||
|
mockOAuth2Server: "no.nav.security:mock-oauth2-server:$versions.mockOAuth2Server",
|
||||||
jfreechart: "jfreechart:jfreechart:$versions.jfreechart",
|
jfreechart: "jfreechart:jfreechart:$versions.jfreechart",
|
||||||
mavenArtifact: "org.apache.maven:maven-artifact:$versions.mavenArtifact",
|
mavenArtifact: "org.apache.maven:maven-artifact:$versions.mavenArtifact",
|
||||||
zstd: "com.github.luben:zstd-jni:$versions.zstd",
|
zstd: "com.github.luben:zstd-jni:$versions.zstd",
|
||||||
|
|
|
@ -0,0 +1,21 @@
|
||||||
|
# The MIT License
|
||||||
|
|
||||||
|
Copyright 2025 NAV (Arbeids- og velferdsdirektoratet) - The Norwegian Labour and Welfare Administration
|
||||||
|
|
||||||
|
Permission is hereby granted, free of charge, to any person obtaining
|
||||||
|
a copy of this software and associated documentation files (the "Software"),
|
||||||
|
to deal in the Software without restriction, including without limitation
|
||||||
|
the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||||
|
and/or sell copies of the Software, and to permit persons to whom the
|
||||||
|
Software is furnished to do so, subject to the following conditions:
|
||||||
|
|
||||||
|
The above copyright notice and this permission notice shall be included
|
||||||
|
in all copies or substantial portions of the Software.
|
||||||
|
|
||||||
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||||
|
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||||
|
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
|
||||||
|
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
|
||||||
|
DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
|
||||||
|
OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
||||||
|
USE OR OTHER DEALINGS IN THE SOFTWARE.
|
Loading…
Reference in New Issue