diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtension.java b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtension.java index 6ef77f0fb74..8c41762d76b 100644 --- a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtension.java +++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtension.java @@ -18,6 +18,7 @@ package org.apache.kafka.connect.rest.basic.auth.extension; import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.rest.ConnectRestExtension; import org.apache.kafka.connect.rest.ConnectRestExtensionContext; import org.slf4j.Logger; @@ -26,6 +27,7 @@ import org.slf4j.LoggerFactory; import javax.security.auth.login.Configuration; import java.io.IOException; import java.util.Map; +import java.util.function.Supplier; /** * Provides the ability to authenticate incoming BasicAuth credentials using the configured JAAS {@link @@ -62,14 +64,38 @@ public class BasicAuthSecurityRestExtension implements ConnectRestExtension { private static final Logger log = LoggerFactory.getLogger(BasicAuthSecurityRestExtension.class); + private static final Supplier CONFIGURATION = initializeConfiguration(Configuration::getConfiguration); + // Capture the JVM's global JAAS configuration as soon as possible, as it may be altered later // by connectors, converters, other REST extensions, etc. - private static final Configuration CONFIGURATION = Configuration.getConfiguration(); + static Supplier initializeConfiguration(Supplier configurationSupplier) { + try { + Configuration configuration = configurationSupplier.get(); + return () -> configuration; + } catch (Exception e) { + // We have to be careful not to throw anything here as this static block gets executed during plugin scanning and any exceptions will + // cause the worker to fail during startup, even if it's not configured to use the basic auth extension. + return () -> { + throw new ConnectException("Failed to retrieve JAAS configuration", e); + }; + } + } + + private final Supplier configuration; + + public BasicAuthSecurityRestExtension() { + this(CONFIGURATION); + } + + // For testing + BasicAuthSecurityRestExtension(Supplier configuration) { + this.configuration = configuration; + } @Override public void register(ConnectRestExtensionContext restPluginContext) { log.trace("Registering JAAS basic auth filter"); - restPluginContext.configurable().register(new JaasBasicAuthFilter(CONFIGURATION)); + restPluginContext.configurable().register(new JaasBasicAuthFilter(configuration.get())); log.trace("Finished registering JAAS basic auth filter"); } @@ -80,7 +106,8 @@ public class BasicAuthSecurityRestExtension implements ConnectRestExtension { @Override public void configure(Map configs) { - + // If we failed to retrieve a JAAS configuration during startup, throw that exception now + configuration.get(); } @Override diff --git a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtensionTest.java b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtensionTest.java index 2d809de0f89..a0ec4bba553 100644 --- a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtensionTest.java +++ b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtensionTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.connect.rest.basic.auth.extension; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.rest.ConnectRestExtensionContext; import org.easymock.Capture; import org.easymock.EasyMock; @@ -27,7 +28,15 @@ import org.junit.jupiter.api.Test; import javax.security.auth.login.Configuration; import javax.ws.rs.core.Configurable; +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; + +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; public class BasicAuthSecurityRestExtensionTest { @@ -63,4 +72,44 @@ public class BasicAuthSecurityRestExtensionTest { assertNotEquals(overwrittenConfiguration, jaasFilter.getValue().configuration, "Overwritten JAAS configuration should not be used by basic auth REST extension"); } + + @Test + public void testBadJaasConfigInitialization() { + SecurityException jaasConfigurationException = new SecurityException(new IOException("Bad JAAS config is bad")); + Supplier configuration = BasicAuthSecurityRestExtension.initializeConfiguration(() -> { + throw jaasConfigurationException; + }); + + ConnectException thrownException = assertThrows(ConnectException.class, configuration::get); + assertEquals(jaasConfigurationException, thrownException.getCause()); + } + + @Test + public void testGoodJaasConfigInitialization() { + AtomicBoolean configurationInitializerEvaluated = new AtomicBoolean(false); + Configuration mockConfiguration = EasyMock.mock(Configuration.class); + Supplier configuration = BasicAuthSecurityRestExtension.initializeConfiguration(() -> { + configurationInitializerEvaluated.set(true); + return mockConfiguration; + }); + + assertTrue(configurationInitializerEvaluated.get()); + assertEquals(mockConfiguration, configuration.get()); + } + + @Test + public void testBadJaasConfigExtensionSetup() { + SecurityException jaasConfigurationException = new SecurityException(new IOException("Bad JAAS config is bad")); + Supplier configuration = () -> { + throw jaasConfigurationException; + }; + + BasicAuthSecurityRestExtension extension = new BasicAuthSecurityRestExtension(configuration); + + Exception thrownException = assertThrows(Exception.class, () -> extension.configure(Collections.emptyMap())); + assertEquals(jaasConfigurationException, thrownException); + + thrownException = assertThrows(Exception.class, () -> extension.register(EasyMock.mock(ConnectRestExtensionContext.class))); + assertEquals(jaasConfigurationException, thrownException); + } }