KAFKA-19153: Add OAuth integration tests (#19938)

Adds a test dependency on
[mock-oauth2-server](https://github.com/navikt/mock-oauth2-server/) for
integration tests for OAuth layer. Also includes fixes for some
regressions that were caught by the integration tests.

Reviewers: Manikumar Reddy <manikumar@confluent.io>, Lianet Magrans
 <lmagrans@confluent.io>
This commit is contained in:
Kirk True 2025-06-12 12:48:14 -07:00 committed by GitHub
parent 0b2e410d61
commit 7d6e5edf8e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 308 additions and 9 deletions

View File

@ -1060,6 +1060,7 @@ project(':core') {
testImplementation libs.junitJupiter
testImplementation libs.caffeine
testImplementation testLog4j2Libs
testImplementation libs.mockOAuth2Server
testRuntimeOnly runtimeTestLibs
}

View File

@ -39,7 +39,10 @@ import javax.security.auth.login.AppConfigurationEntry;
import static org.apache.kafka.common.config.SaslConfigs.DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE;
import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_ID;
import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_SECRET;
import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_HEADER_URLENCODE;
import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SCOPE;
import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.CLIENT_ID_CONFIG;
import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.CLIENT_SECRET_CONFIG;
import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.SCOPE_CONFIG;
@ -173,8 +176,8 @@ public class ClientCredentialsJwtRetriever implements JwtRetriever {
private String clientId() {
return getValue(
SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_ID,
CLIENT_ID_CONFIG,
"clientId",
true,
cu::validateString,
jou::validateString
@ -183,8 +186,8 @@ public class ClientCredentialsJwtRetriever implements JwtRetriever {
private String clientSecret() {
return getValue(
SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_SECRET,
CLIENT_SECRET_CONFIG,
"clientSecret",
true,
cu::validatePassword,
jou::validateString
@ -193,8 +196,8 @@ public class ClientCredentialsJwtRetriever implements JwtRetriever {
private String scope() {
return getValue(
SASL_OAUTHBEARER_SCOPE,
SCOPE_CONFIG,
"scope",
false,
cu::validateString,
jou::validateString

View File

@ -170,7 +170,7 @@ public class OAuthBearerValidatorCallbackHandler implements AuthenticateCallback
}
private void checkConfigured() {
if (verificationKeyResolver == null || jwtValidator == null)
if (jwtValidator == null)
throw new IllegalStateException(String.format("To use %s, first call the configure method", getClass().getSimpleName()));
}
}

View File

@ -22,6 +22,9 @@ import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
@ -47,6 +50,8 @@ import static org.apache.kafka.common.config.internals.BrokerSecurityConfigs.ALL
public class ConfigurationUtils {
private static final Logger LOG = LoggerFactory.getLogger(ConfigurationUtils.class);
private final Map<String, ?> configs;
private final String prefix;
@ -344,6 +349,13 @@ public class ConfigurationUtils {
((OAuthBearerConfigurable) o).configure(configs, saslMechanism, jaasConfigEntries);
} catch (Exception e) {
Utils.maybeCloseQuietly(o, "Instance of class " + o.getClass().getName() + " failed call to configure()");
LOG.warn(
"The class {} defined in the {} configuration encountered an error on configure(): {}",
o.getClass().getName(),
configName,
e.getMessage(),
e
);
throw new ConfigException(
String.format(
"The class %s defined in the %s configuration encountered an error on configure(): %s",

View File

@ -16,7 +16,7 @@
*/
package org.apache.kafka.common.security.oauthbearer.internals.secured.assertion;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.security.oauthbearer.JwtRetrieverException;
import org.apache.kafka.common.security.oauthbearer.internals.secured.CachedFile;
import org.apache.kafka.common.utils.Utils;
@ -89,7 +89,7 @@ public class DefaultAssertionCreator implements AssertionCreator {
return privateKey(contents.getBytes(StandardCharsets.UTF_8), passphrase);
} catch (GeneralSecurityException | IOException e) {
throw new KafkaException("An error occurred generating the OAuth assertion private key from " + file.getPath(), e);
throw new JwtRetrieverException("An error occurred generating the OAuth assertion private key from " + file.getPath(), e);
}
}
}

View File

@ -17,7 +17,6 @@
package org.apache.kafka.common.security.oauthbearer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.oauthbearer.internals.secured.OAuthBearerTest;
@ -95,7 +94,7 @@ public class JwtBearerJwtRetrieverTest extends OAuthBearerTest {
List<AppConfigurationEntry> jaasConfigEntries = getJaasConfigEntries();
try (JwtBearerJwtRetriever jwtRetriever = new JwtBearerJwtRetriever()) {
KafkaException e = assertThrows(KafkaException.class, () -> jwtRetriever.configure(configs, OAUTHBEARER_MECHANISM, jaasConfigEntries));
JwtRetrieverException e = assertThrows(JwtRetrieverException.class, () -> jwtRetriever.configure(configs, OAUTHBEARER_MECHANISM, jaasConfigEntries));
assertNotNull(e.getCause());
assertInstanceOf(GeneralSecurityException.class, e.getCause());
}
@ -144,7 +143,7 @@ public class JwtBearerJwtRetrieverTest extends OAuthBearerTest {
List<AppConfigurationEntry> jaasConfigEntries = getJaasConfigEntries();
try (JwtBearerJwtRetriever jwtRetriever = new JwtBearerJwtRetriever()) {
KafkaException e = assertThrows(KafkaException.class, () -> jwtRetriever.configure(configs, OAUTHBEARER_MECHANISM, jaasConfigEntries));
JwtRetrieverException e = assertThrows(JwtRetrieverException.class, () -> jwtRetriever.configure(configs, OAUTHBEARER_MECHANISM, jaasConfigEntries));
assertNotNull(e.getCause());
assertInstanceOf(IOException.class, e.getCause());
}

View File

@ -0,0 +1,261 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package integration.kafka.api
import com.nimbusds.jose.jwk.RSAKey
import kafka.api.{IntegrationTestHarness, SaslSetup}
import kafka.utils.TestInfoUtils
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.config.{ConfigException, SaslConfigs}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo}
import java.util.{Base64, Collections, Properties}
import no.nav.security.mock.oauth2.{MockOAuth2Server, OAuth2Config}
import no.nav.security.mock.oauth2.token.{KeyProvider, OAuth2TokenProvider}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.oauthbearer.{OAuthBearerLoginCallbackHandler, OAuthBearerLoginModule, OAuthBearerValidatorCallbackHandler}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertThrows}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import java.io.File
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.StandardOpenOption
import java.security.{KeyPairGenerator, PrivateKey}
import java.security.interfaces.RSAPublicKey
import java.util
/**
* Integration tests for the consumer that cover basic usage as well as coordinator failure
*/
class ClientOAuthIntegrationTest extends IntegrationTestHarness with SaslSetup {
override val brokerCount = 3
override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
protected def kafkaClientSaslMechanism = "OAUTHBEARER"
protected def kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
val issuerId = "default"
var mockOAuthServer: MockOAuth2Server = _
var privateKey: PrivateKey = _
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
// Step 1: Generate the key pair dynamically.
val keyGen = KeyPairGenerator.getInstance("RSA")
keyGen.initialize(2048)
val keyPair = keyGen.generateKeyPair()
privateKey = keyPair.getPrivate
// Step 2: Create the RSA JWK from key pair.
val rsaJWK = new RSAKey.Builder(keyPair.getPublic.asInstanceOf[RSAPublicKey])
.privateKey(privateKey)
.keyID("foo")
.build()
// Step 3: Create the OAuth server using the keys just created
val keyProvider = new KeyProvider(Collections.singletonList(rsaJWK))
val tokenProvider = new OAuth2TokenProvider(keyProvider)
val oauthConfig = new OAuth2Config(false, null, null, false, tokenProvider)
mockOAuthServer = new MockOAuth2Server(oauthConfig)
mockOAuthServer.start()
val tokenEndpointUrl = mockOAuthServer.tokenEndpointUrl(issuerId).url().toString
val jwksUrl = mockOAuthServer.jwksUrl(issuerId).url().toString
System.setProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, s"$tokenEndpointUrl,$jwksUrl")
val listenerNamePrefix = s"listener.name.${listenerName.value().toLowerCase}"
serverConfig.setProperty(s"$listenerNamePrefix.oauthbearer.${SaslConfigs.SASL_JAAS_CONFIG}", s"${classOf[OAuthBearerLoginModule].getName} required ;")
serverConfig.setProperty(s"$listenerNamePrefix.oauthbearer.${SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE}", issuerId)
serverConfig.setProperty(s"$listenerNamePrefix.oauthbearer.${SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_URL}", jwksUrl)
serverConfig.setProperty(s"$listenerNamePrefix.oauthbearer.${BrokerSecurityConfigs.SASL_SERVER_CALLBACK_HANDLER_CLASS_CONFIG}", classOf[OAuthBearerValidatorCallbackHandler].getName)
// create static config including client login context with credentials for JaasTestUtils 'client2'
startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism)))
// The superuser needs the configuration in setUp because it's used to create resources before the individual
// test methods are invoked.
superuserClientConfig.putAll(defaultClientCredentialsConfigs())
super.setUp(testInfo)
}
@AfterEach
override def tearDown(): Unit = {
if (mockOAuthServer != null)
mockOAuthServer.shutdown()
closeSasl()
super.tearDown()
System.clearProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_FILES_CONFIG)
System.clearProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG)
}
def defaultOAuthConfigs(): Properties = {
val tokenEndpointUrl = mockOAuthServer.tokenEndpointUrl(issuerId).url().toString
val configs = new Properties()
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name)
configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasClientLoginModule(kafkaClientSaslMechanism))
configs.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, classOf[OAuthBearerLoginCallbackHandler].getName)
configs.put(SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, tokenEndpointUrl)
configs
}
def defaultClientCredentialsConfigs(): Properties = {
val configs = defaultOAuthConfigs()
configs.put(SaslConfigs.SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_ID, "test-client")
configs.put(SaslConfigs.SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_SECRET, "test-secret")
configs
}
def defaultJwtBearerConfigs(): Properties = {
val configs = defaultOAuthConfigs()
configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasClientLoginModule(kafkaClientSaslMechanism))
configs.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, classOf[OAuthBearerLoginCallbackHandler].getName)
configs.put(SaslConfigs.SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS, "org.apache.kafka.common.security.oauthbearer.JwtBearerJwtRetriever")
configs
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
def testBasicClientCredentials(groupProtocol: String): Unit = {
val configs = defaultClientCredentialsConfigs()
assertDoesNotThrow(() => createProducer(configOverrides = configs))
assertDoesNotThrow(() => createConsumer(configOverrides = configs))
assertDoesNotThrow(() => createAdminClient(configOverrides = configs))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
def testBasicJwtBearer(groupProtocol: String): Unit = {
val jwt = mockOAuthServer.issueToken(issuerId, "jdoe", "someaudience", Collections.singletonMap("scope", "test"))
val assertionFile = TestUtils.tempFile(jwt.serialize())
System.setProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_FILES_CONFIG, assertionFile.getAbsolutePath)
val configs = defaultJwtBearerConfigs()
configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_FILE, assertionFile.getAbsolutePath)
assertDoesNotThrow(() => createProducer(configOverrides = configs))
assertDoesNotThrow(() => createConsumer(configOverrides = configs))
assertDoesNotThrow(() => createAdminClient(configOverrides = configs))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
def testBasicJwtBearer2(groupProtocol: String): Unit = {
val privateKeyFile = generatePrivateKeyFile()
System.setProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_FILES_CONFIG, privateKeyFile.getAbsolutePath)
val configs = defaultJwtBearerConfigs()
configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_PRIVATE_KEY_FILE, privateKeyFile.getPath)
configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_CLAIM_AUD, "default")
configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_CLAIM_SUB, "kafka-client-test-sub")
configs.put(SaslConfigs.SASL_OAUTHBEARER_SCOPE, "default")
// configs.put(SaslConfigs.SASL_OAUTHBEARER_SUB_CLAIM_NAME, "aud")
assertDoesNotThrow(() => createProducer(configOverrides = configs))
assertDoesNotThrow(() => createConsumer(configOverrides = configs))
assertDoesNotThrow(() => createAdminClient(configOverrides = configs))
}
@Disabled("KAFKA-19394: Failure in ConsumerNetworkThread.initializeResources() can cause hangs on AsyncKafkaConsumer.close()")
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
def testJwtBearerWithMalformedAssertionFile(groupProtocol: String): Unit = {
// Create the assertion file, but fill it with non-JWT garbage.
val assertionFile = TestUtils.tempFile("CQEN*)Q#F)&)^#QNC")
System.setProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_FILES_CONFIG, assertionFile.getAbsolutePath)
val configs = defaultJwtBearerConfigs()
configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_FILE, assertionFile.getAbsolutePath)
assertThrows(classOf[KafkaException], () => createProducer(configOverrides = configs))
assertThrows(classOf[KafkaException], () => createConsumer(configOverrides = configs))
assertThrows(classOf[KafkaException], () => createAdminClient(configOverrides = configs))
}
@Disabled("KAFKA-19394: Failure in ConsumerNetworkThread.initializeResources() can cause hangs on AsyncKafkaConsumer.close()")
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
def testJwtBearerWithEmptyAssertionFile(groupProtocol: String): Unit = {
// Create the assertion file, but leave it empty.
val assertionFile = TestUtils.tempFile()
System.setProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_FILES_CONFIG, assertionFile.getAbsolutePath)
val configs = defaultJwtBearerConfigs()
configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_FILE, assertionFile.getAbsolutePath)
assertThrows(classOf[KafkaException], () => createProducer(configOverrides = configs))
assertThrows(classOf[KafkaException], () => createConsumer(configOverrides = configs))
assertThrows(classOf[KafkaException], () => createAdminClient(configOverrides = configs))
}
@Disabled("KAFKA-19394: Failure in ConsumerNetworkThread.initializeResources() can cause hangs on AsyncKafkaConsumer.close()")
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
def testJwtBearerWithMissingAssertionFile(groupProtocol: String): Unit = {
val missingFileName = "/this/does/not/exist.txt"
val configs = defaultJwtBearerConfigs()
configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_FILE, missingFileName)
assertThrows(classOf[KafkaException], () => createProducer(configOverrides = configs))
assertThrows(classOf[KafkaException], () => createConsumer(configOverrides = configs))
assertThrows(classOf[KafkaException], () => createAdminClient(configOverrides = configs))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
def testUnsupportedJwtRetriever(groupProtocol: String): Unit = {
val className = "org.apache.kafka.common.security.oauthbearer.ThisIsNotARealJwtRetriever"
val configs = defaultOAuthConfigs()
configs.put(SaslConfigs.SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS, className)
assertThrows(classOf[ConfigException], () => createProducer(configOverrides = configs))
assertThrows(classOf[ConfigException], () => createConsumer(configOverrides = configs))
assertThrows(classOf[ConfigException], () => createAdminClient(configOverrides = configs))
}
def generatePrivateKeyFile(): File = {
val file = File.createTempFile("private-", ".key")
val bytes = Base64.getEncoder.encode(privateKey.getEncoded)
var channel: FileChannel = null
try {
channel = FileChannel.open(file.toPath, util.EnumSet.of(StandardOpenOption.WRITE))
Utils.writeFully(channel, ByteBuffer.wrap(bytes))
} finally {
channel.close()
}
file
}
}

View File

@ -123,6 +123,7 @@ versions += [
slf4j: "1.7.36",
snappy: "1.1.10.7",
spotbugs: "4.8.6",
mockOAuth2Server: "2.2.1",
zinc: "1.9.2",
// When updating the zstd version, please do as well in docker/native/native-image-configs/resource-config.json
// Also make sure the compression levels in org.apache.kafka.common.record.CompressionType are still valid
@ -224,6 +225,7 @@ libs += [
snappy: "org.xerial.snappy:snappy-java:$versions.snappy",
spotbugs: "com.github.spotbugs:spotbugs-annotations:$versions.spotbugs",
swaggerAnnotations: "io.swagger.core.v3:swagger-annotations:$swaggerVersion",
mockOAuth2Server: "no.nav.security:mock-oauth2-server:$versions.mockOAuth2Server",
jfreechart: "jfreechart:jfreechart:$versions.jfreechart",
mavenArtifact: "org.apache.maven:maven-artifact:$versions.mavenArtifact",
zstd: "com.github.luben:zstd-jni:$versions.zstd",

View File

@ -0,0 +1,21 @@
# The MIT License
Copyright 2025 NAV (Arbeids- og velferdsdirektoratet) - The Norwegian Labour and Welfare Administration
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included
in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
USE OR OTHER DEALINGS IN THE SOFTWARE.