From 7d6e5edf8eff946b04865263ad8ed176480d7c6e Mon Sep 17 00:00:00 2001 From: Kirk True Date: Thu, 12 Jun 2025 12:48:14 -0700 Subject: [PATCH] KAFKA-19153: Add OAuth integration tests (#19938) Adds a test dependency on [mock-oauth2-server](https://github.com/navikt/mock-oauth2-server/) for integration tests for OAuth layer. Also includes fixes for some regressions that were caught by the integration tests. Reviewers: Manikumar Reddy , Lianet Magrans --- build.gradle | 1 + .../ClientCredentialsJwtRetriever.java | 9 +- .../OAuthBearerValidatorCallbackHandler.java | 2 +- .../internals/secured/ConfigurationUtils.java | 12 + .../assertion/DefaultAssertionCreator.java | 4 +- .../JwtBearerJwtRetrieverTest.java | 5 +- .../api/ClientOAuthIntegrationTest.scala | 261 ++++++++++++++++++ gradle/dependencies.gradle | 2 + licenses/mock-oauth2-server-MIT | 21 ++ 9 files changed, 308 insertions(+), 9 deletions(-) create mode 100644 core/src/test/scala/integration/kafka/api/ClientOAuthIntegrationTest.scala create mode 100644 licenses/mock-oauth2-server-MIT diff --git a/build.gradle b/build.gradle index 767395ad32d..8e0f4393252 100644 --- a/build.gradle +++ b/build.gradle @@ -1060,6 +1060,7 @@ project(':core') { testImplementation libs.junitJupiter testImplementation libs.caffeine testImplementation testLog4j2Libs + testImplementation libs.mockOAuth2Server testRuntimeOnly runtimeTestLibs } diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/ClientCredentialsJwtRetriever.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/ClientCredentialsJwtRetriever.java index 627434f6d3c..4744fd91289 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/ClientCredentialsJwtRetriever.java +++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/ClientCredentialsJwtRetriever.java @@ -39,7 +39,10 @@ import javax.security.auth.login.AppConfigurationEntry; import static org.apache.kafka.common.config.SaslConfigs.DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE; import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_ID; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_SECRET; import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_HEADER_URLENCODE; +import static org.apache.kafka.common.config.SaslConfigs.SASL_OAUTHBEARER_SCOPE; import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.CLIENT_ID_CONFIG; import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.CLIENT_SECRET_CONFIG; import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler.SCOPE_CONFIG; @@ -173,8 +176,8 @@ public class ClientCredentialsJwtRetriever implements JwtRetriever { private String clientId() { return getValue( + SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_ID, CLIENT_ID_CONFIG, - "clientId", true, cu::validateString, jou::validateString @@ -183,8 +186,8 @@ public class ClientCredentialsJwtRetriever implements JwtRetriever { private String clientSecret() { return getValue( + SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_SECRET, CLIENT_SECRET_CONFIG, - "clientSecret", true, cu::validatePassword, jou::validateString @@ -193,8 +196,8 @@ public class ClientCredentialsJwtRetriever implements JwtRetriever { private String scope() { return getValue( + SASL_OAUTHBEARER_SCOPE, SCOPE_CONFIG, - "scope", false, cu::validateString, jou::validateString diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerValidatorCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerValidatorCallbackHandler.java index 6563d36b8b6..60fa8cdb678 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerValidatorCallbackHandler.java +++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerValidatorCallbackHandler.java @@ -170,7 +170,7 @@ public class OAuthBearerValidatorCallbackHandler implements AuthenticateCallback } private void checkConfigured() { - if (verificationKeyResolver == null || jwtValidator == null) + if (jwtValidator == null) throw new IllegalStateException(String.format("To use %s, first call the configure method", getClass().getSimpleName())); } } diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtils.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtils.java index a0819766a38..3eebecf8fde 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtils.java @@ -22,6 +22,9 @@ import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.File; import java.net.MalformedURLException; import java.net.URISyntaxException; @@ -47,6 +50,8 @@ import static org.apache.kafka.common.config.internals.BrokerSecurityConfigs.ALL public class ConfigurationUtils { + private static final Logger LOG = LoggerFactory.getLogger(ConfigurationUtils.class); + private final Map configs; private final String prefix; @@ -344,6 +349,13 @@ public class ConfigurationUtils { ((OAuthBearerConfigurable) o).configure(configs, saslMechanism, jaasConfigEntries); } catch (Exception e) { Utils.maybeCloseQuietly(o, "Instance of class " + o.getClass().getName() + " failed call to configure()"); + LOG.warn( + "The class {} defined in the {} configuration encountered an error on configure(): {}", + o.getClass().getName(), + configName, + e.getMessage(), + e + ); throw new ConfigException( String.format( "The class %s defined in the %s configuration encountered an error on configure(): %s", diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/assertion/DefaultAssertionCreator.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/assertion/DefaultAssertionCreator.java index db562fade87..52b9eb2fb53 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/assertion/DefaultAssertionCreator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/assertion/DefaultAssertionCreator.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.common.security.oauthbearer.internals.secured.assertion; -import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.security.oauthbearer.JwtRetrieverException; import org.apache.kafka.common.security.oauthbearer.internals.secured.CachedFile; import org.apache.kafka.common.utils.Utils; @@ -89,7 +89,7 @@ public class DefaultAssertionCreator implements AssertionCreator { return privateKey(contents.getBytes(StandardCharsets.UTF_8), passphrase); } catch (GeneralSecurityException | IOException e) { - throw new KafkaException("An error occurred generating the OAuth assertion private key from " + file.getPath(), e); + throw new JwtRetrieverException("An error occurred generating the OAuth assertion private key from " + file.getPath(), e); } } } diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/JwtBearerJwtRetrieverTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/JwtBearerJwtRetrieverTest.java index c466ac83689..4a4e567dedf 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/JwtBearerJwtRetrieverTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/JwtBearerJwtRetrieverTest.java @@ -17,7 +17,6 @@ package org.apache.kafka.common.security.oauthbearer; -import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.security.oauthbearer.internals.secured.OAuthBearerTest; @@ -95,7 +94,7 @@ public class JwtBearerJwtRetrieverTest extends OAuthBearerTest { List jaasConfigEntries = getJaasConfigEntries(); try (JwtBearerJwtRetriever jwtRetriever = new JwtBearerJwtRetriever()) { - KafkaException e = assertThrows(KafkaException.class, () -> jwtRetriever.configure(configs, OAUTHBEARER_MECHANISM, jaasConfigEntries)); + JwtRetrieverException e = assertThrows(JwtRetrieverException.class, () -> jwtRetriever.configure(configs, OAUTHBEARER_MECHANISM, jaasConfigEntries)); assertNotNull(e.getCause()); assertInstanceOf(GeneralSecurityException.class, e.getCause()); } @@ -144,7 +143,7 @@ public class JwtBearerJwtRetrieverTest extends OAuthBearerTest { List jaasConfigEntries = getJaasConfigEntries(); try (JwtBearerJwtRetriever jwtRetriever = new JwtBearerJwtRetriever()) { - KafkaException e = assertThrows(KafkaException.class, () -> jwtRetriever.configure(configs, OAUTHBEARER_MECHANISM, jaasConfigEntries)); + JwtRetrieverException e = assertThrows(JwtRetrieverException.class, () -> jwtRetriever.configure(configs, OAUTHBEARER_MECHANISM, jaasConfigEntries)); assertNotNull(e.getCause()); assertInstanceOf(IOException.class, e.getCause()); } diff --git a/core/src/test/scala/integration/kafka/api/ClientOAuthIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/ClientOAuthIntegrationTest.scala new file mode 100644 index 00000000000..22ab6f2673c --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/ClientOAuthIntegrationTest.scala @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package integration.kafka.api + +import com.nimbusds.jose.jwk.RSAKey +import kafka.api.{IntegrationTestHarness, SaslSetup} +import kafka.utils.TestInfoUtils +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.common.config.{ConfigException, SaslConfigs} +import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo} + +import java.util.{Base64, Collections, Properties} +import no.nav.security.mock.oauth2.{MockOAuth2Server, OAuth2Config} +import no.nav.security.mock.oauth2.token.{KeyProvider, OAuth2TokenProvider} +import org.apache.kafka.common.KafkaException +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.security.oauthbearer.{OAuthBearerLoginCallbackHandler, OAuthBearerLoginModule, OAuthBearerValidatorCallbackHandler} +import org.apache.kafka.common.utils.Utils +import org.apache.kafka.test.TestUtils +import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertThrows} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.MethodSource + +import java.io.File +import java.nio.ByteBuffer +import java.nio.channels.FileChannel +import java.nio.file.StandardOpenOption +import java.security.{KeyPairGenerator, PrivateKey} +import java.security.interfaces.RSAPublicKey +import java.util + +/** + * Integration tests for the consumer that cover basic usage as well as coordinator failure + */ +class ClientOAuthIntegrationTest extends IntegrationTestHarness with SaslSetup { + + override val brokerCount = 3 + + override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT + override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism)) + override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism)) + + protected def kafkaClientSaslMechanism = "OAUTHBEARER" + protected def kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism) + + val issuerId = "default" + var mockOAuthServer: MockOAuth2Server = _ + var privateKey: PrivateKey = _ + + @BeforeEach + override def setUp(testInfo: TestInfo): Unit = { + // Step 1: Generate the key pair dynamically. + val keyGen = KeyPairGenerator.getInstance("RSA") + keyGen.initialize(2048) + val keyPair = keyGen.generateKeyPair() + + privateKey = keyPair.getPrivate + + // Step 2: Create the RSA JWK from key pair. + val rsaJWK = new RSAKey.Builder(keyPair.getPublic.asInstanceOf[RSAPublicKey]) + .privateKey(privateKey) + .keyID("foo") + .build() + + // Step 3: Create the OAuth server using the keys just created + val keyProvider = new KeyProvider(Collections.singletonList(rsaJWK)) + val tokenProvider = new OAuth2TokenProvider(keyProvider) + val oauthConfig = new OAuth2Config(false, null, null, false, tokenProvider) + mockOAuthServer = new MockOAuth2Server(oauthConfig) + + mockOAuthServer.start() + val tokenEndpointUrl = mockOAuthServer.tokenEndpointUrl(issuerId).url().toString + val jwksUrl = mockOAuthServer.jwksUrl(issuerId).url().toString + System.setProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, s"$tokenEndpointUrl,$jwksUrl") + + val listenerNamePrefix = s"listener.name.${listenerName.value().toLowerCase}" + + serverConfig.setProperty(s"$listenerNamePrefix.oauthbearer.${SaslConfigs.SASL_JAAS_CONFIG}", s"${classOf[OAuthBearerLoginModule].getName} required ;") + serverConfig.setProperty(s"$listenerNamePrefix.oauthbearer.${SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE}", issuerId) + serverConfig.setProperty(s"$listenerNamePrefix.oauthbearer.${SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_URL}", jwksUrl) + serverConfig.setProperty(s"$listenerNamePrefix.oauthbearer.${BrokerSecurityConfigs.SASL_SERVER_CALLBACK_HANDLER_CLASS_CONFIG}", classOf[OAuthBearerValidatorCallbackHandler].getName) + + // create static config including client login context with credentials for JaasTestUtils 'client2' + startSasl(jaasSections(kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism))) + + // The superuser needs the configuration in setUp because it's used to create resources before the individual + // test methods are invoked. + superuserClientConfig.putAll(defaultClientCredentialsConfigs()) + + super.setUp(testInfo) + } + + @AfterEach + override def tearDown(): Unit = { + if (mockOAuthServer != null) + mockOAuthServer.shutdown() + + closeSasl() + super.tearDown() + + System.clearProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_FILES_CONFIG) + System.clearProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG) + } + + def defaultOAuthConfigs(): Properties = { + val tokenEndpointUrl = mockOAuthServer.tokenEndpointUrl(issuerId).url().toString + + val configs = new Properties() + configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name) + configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasClientLoginModule(kafkaClientSaslMechanism)) + configs.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, classOf[OAuthBearerLoginCallbackHandler].getName) + configs.put(SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, tokenEndpointUrl) + configs + } + + def defaultClientCredentialsConfigs(): Properties = { + val configs = defaultOAuthConfigs() + configs.put(SaslConfigs.SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_ID, "test-client") + configs.put(SaslConfigs.SASL_OAUTHBEARER_CLIENT_CREDENTIALS_CLIENT_SECRET, "test-secret") + configs + } + + def defaultJwtBearerConfigs(): Properties = { + val configs = defaultOAuthConfigs() + configs.put(SaslConfigs.SASL_JAAS_CONFIG, jaasClientLoginModule(kafkaClientSaslMechanism)) + configs.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, classOf[OAuthBearerLoginCallbackHandler].getName) + configs.put(SaslConfigs.SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS, "org.apache.kafka.common.security.oauthbearer.JwtBearerJwtRetriever") + configs + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) + @MethodSource(Array("getTestGroupProtocolParametersAll")) + def testBasicClientCredentials(groupProtocol: String): Unit = { + val configs = defaultClientCredentialsConfigs() + assertDoesNotThrow(() => createProducer(configOverrides = configs)) + assertDoesNotThrow(() => createConsumer(configOverrides = configs)) + assertDoesNotThrow(() => createAdminClient(configOverrides = configs)) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) + @MethodSource(Array("getTestGroupProtocolParametersAll")) + def testBasicJwtBearer(groupProtocol: String): Unit = { + val jwt = mockOAuthServer.issueToken(issuerId, "jdoe", "someaudience", Collections.singletonMap("scope", "test")) + val assertionFile = TestUtils.tempFile(jwt.serialize()) + System.setProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_FILES_CONFIG, assertionFile.getAbsolutePath) + + val configs = defaultJwtBearerConfigs() + configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_FILE, assertionFile.getAbsolutePath) + + assertDoesNotThrow(() => createProducer(configOverrides = configs)) + assertDoesNotThrow(() => createConsumer(configOverrides = configs)) + assertDoesNotThrow(() => createAdminClient(configOverrides = configs)) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) + @MethodSource(Array("getTestGroupProtocolParametersAll")) + def testBasicJwtBearer2(groupProtocol: String): Unit = { + val privateKeyFile = generatePrivateKeyFile() + System.setProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_FILES_CONFIG, privateKeyFile.getAbsolutePath) + + val configs = defaultJwtBearerConfigs() + configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_PRIVATE_KEY_FILE, privateKeyFile.getPath) + configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_CLAIM_AUD, "default") + configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_CLAIM_SUB, "kafka-client-test-sub") + configs.put(SaslConfigs.SASL_OAUTHBEARER_SCOPE, "default") + // configs.put(SaslConfigs.SASL_OAUTHBEARER_SUB_CLAIM_NAME, "aud") + + assertDoesNotThrow(() => createProducer(configOverrides = configs)) + assertDoesNotThrow(() => createConsumer(configOverrides = configs)) + assertDoesNotThrow(() => createAdminClient(configOverrides = configs)) + } + + @Disabled("KAFKA-19394: Failure in ConsumerNetworkThread.initializeResources() can cause hangs on AsyncKafkaConsumer.close()") + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) + @MethodSource(Array("getTestGroupProtocolParametersAll")) + def testJwtBearerWithMalformedAssertionFile(groupProtocol: String): Unit = { + // Create the assertion file, but fill it with non-JWT garbage. + val assertionFile = TestUtils.tempFile("CQEN*)Q#F)&)^#QNC") + System.setProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_FILES_CONFIG, assertionFile.getAbsolutePath) + + val configs = defaultJwtBearerConfigs() + configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_FILE, assertionFile.getAbsolutePath) + + assertThrows(classOf[KafkaException], () => createProducer(configOverrides = configs)) + assertThrows(classOf[KafkaException], () => createConsumer(configOverrides = configs)) + assertThrows(classOf[KafkaException], () => createAdminClient(configOverrides = configs)) + } + + @Disabled("KAFKA-19394: Failure in ConsumerNetworkThread.initializeResources() can cause hangs on AsyncKafkaConsumer.close()") + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) + @MethodSource(Array("getTestGroupProtocolParametersAll")) + def testJwtBearerWithEmptyAssertionFile(groupProtocol: String): Unit = { + // Create the assertion file, but leave it empty. + val assertionFile = TestUtils.tempFile() + System.setProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_FILES_CONFIG, assertionFile.getAbsolutePath) + + val configs = defaultJwtBearerConfigs() + configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_FILE, assertionFile.getAbsolutePath) + + assertThrows(classOf[KafkaException], () => createProducer(configOverrides = configs)) + assertThrows(classOf[KafkaException], () => createConsumer(configOverrides = configs)) + assertThrows(classOf[KafkaException], () => createAdminClient(configOverrides = configs)) + } + + @Disabled("KAFKA-19394: Failure in ConsumerNetworkThread.initializeResources() can cause hangs on AsyncKafkaConsumer.close()") + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) + @MethodSource(Array("getTestGroupProtocolParametersAll")) + def testJwtBearerWithMissingAssertionFile(groupProtocol: String): Unit = { + val missingFileName = "/this/does/not/exist.txt" + + val configs = defaultJwtBearerConfigs() + configs.put(SaslConfigs.SASL_OAUTHBEARER_ASSERTION_FILE, missingFileName) + + assertThrows(classOf[KafkaException], () => createProducer(configOverrides = configs)) + assertThrows(classOf[KafkaException], () => createConsumer(configOverrides = configs)) + assertThrows(classOf[KafkaException], () => createAdminClient(configOverrides = configs)) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) + @MethodSource(Array("getTestGroupProtocolParametersAll")) + def testUnsupportedJwtRetriever(groupProtocol: String): Unit = { + val className = "org.apache.kafka.common.security.oauthbearer.ThisIsNotARealJwtRetriever" + + val configs = defaultOAuthConfigs() + configs.put(SaslConfigs.SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS, className) + + assertThrows(classOf[ConfigException], () => createProducer(configOverrides = configs)) + assertThrows(classOf[ConfigException], () => createConsumer(configOverrides = configs)) + assertThrows(classOf[ConfigException], () => createAdminClient(configOverrides = configs)) + } + + def generatePrivateKeyFile(): File = { + val file = File.createTempFile("private-", ".key") + val bytes = Base64.getEncoder.encode(privateKey.getEncoded) + var channel: FileChannel = null + + try { + channel = FileChannel.open(file.toPath, util.EnumSet.of(StandardOpenOption.WRITE)) + Utils.writeFully(channel, ByteBuffer.wrap(bytes)) + } finally { + channel.close() + } + + file + } +} \ No newline at end of file diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index ba83624b0de..7d278545054 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -123,6 +123,7 @@ versions += [ slf4j: "1.7.36", snappy: "1.1.10.7", spotbugs: "4.8.6", + mockOAuth2Server: "2.2.1", zinc: "1.9.2", // When updating the zstd version, please do as well in docker/native/native-image-configs/resource-config.json // Also make sure the compression levels in org.apache.kafka.common.record.CompressionType are still valid @@ -224,6 +225,7 @@ libs += [ snappy: "org.xerial.snappy:snappy-java:$versions.snappy", spotbugs: "com.github.spotbugs:spotbugs-annotations:$versions.spotbugs", swaggerAnnotations: "io.swagger.core.v3:swagger-annotations:$swaggerVersion", + mockOAuth2Server: "no.nav.security:mock-oauth2-server:$versions.mockOAuth2Server", jfreechart: "jfreechart:jfreechart:$versions.jfreechart", mavenArtifact: "org.apache.maven:maven-artifact:$versions.mavenArtifact", zstd: "com.github.luben:zstd-jni:$versions.zstd", diff --git a/licenses/mock-oauth2-server-MIT b/licenses/mock-oauth2-server-MIT new file mode 100644 index 00000000000..ef1b1129b10 --- /dev/null +++ b/licenses/mock-oauth2-server-MIT @@ -0,0 +1,21 @@ +# The MIT License + +Copyright 2025 NAV (Arbeids- og velferdsdirektoratet) - The Norwegian Labour and Welfare Administration + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the "Software"), +to deal in the Software without restriction, including without limitation +the rights to use, copy, modify, merge, publish, distribute, sublicense, +and/or sell copies of the Software, and to permit persons to whom the +Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included +in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +USE OR OTHER DEALINGS IN THE SOFTWARE. \ No newline at end of file