KAFKA-10895: Attempt to prevent JAAS config from being overwritten for basic auth filter in Connect (#9806)

If a connector, converter, etc. invokes [Configuration::setConfiguration](https://docs.oracle.com/javase/8/docs/api/javax/security/auth/login/Configuration.html#setConfiguration-javax.security.auth.login.Configuration-), it will cause the Connect basic auth filter to use that JAAS config instead of the one configured at startup with the `-Djava.security.auth.login.config` JVM property. This can cause requests to the worker to succeed initially but start to be rejected after the JVM's global JAAS config is altered.

To alleviate this the current PR instructs the Connect Worker to cache the JVM's global JAAS configuration at the beginning (as soon as the `BasicAuthSecurityRestExtension` class is loaded), and use that for all future authentication. 

Existing tests for the JAAS basic auth filter are modified to work with the new internal logic. The `testEmptyCredentialsFile` test is corrected to actually operate on an empty credentials file (instead of a non-existent credentials file, which it currently operates on). A new test is added to ensure that, even if the global JAAS config is overwritten, the basic auth filter will use the first one it could find.

Reviewers: Greg Harris <gregh@confluent.io>, Konstantine Karantasis <k.karantasis@gmail.com>
This commit is contained in:
Chris Egerton 2021-01-12 00:20:57 -05:00 committed by GitHub
parent aedb53a4e6
commit 7455b70102
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 190 additions and 110 deletions

View File

@ -23,6 +23,7 @@ import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.security.auth.login.Configuration;
import java.io.IOException;
import java.util.Map;
@ -61,10 +62,14 @@ public class BasicAuthSecurityRestExtension implements ConnectRestExtension {
private static final Logger log = LoggerFactory.getLogger(BasicAuthSecurityRestExtension.class);
// Capture the JVM's global JAAS configuration as soon as possible, as it may be altered later
// by connectors, converters, other REST extensions, etc.
private static final Configuration CONFIGURATION = Configuration.getConfiguration();
@Override
public void register(ConnectRestExtensionContext restPluginContext) {
log.trace("Registering JAAS basic auth filter");
restPluginContext.configurable().register(JaasBasicAuthFilter.class);
restPluginContext.configurable().register(new JaasBasicAuthFilter(CONFIGURATION));
log.trace("Finished registering JAAS basic auth filter");
}

View File

@ -20,6 +20,7 @@ package org.apache.kafka.connect.rest.basic.auth.extension;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
import javax.security.auth.login.Configuration;
import javax.ws.rs.HttpMethod;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
@ -49,6 +50,13 @@ public class JaasBasicAuthFilter implements ContainerRequestFilter {
static final String AUTHORIZATION = "Authorization";
// Package-private for testing
final Configuration configuration;
public JaasBasicAuthFilter(Configuration configuration) {
this.configuration = configuration;
}
@Override
public void filter(ContainerRequestContext requestContext) throws IOException {
if (isInternalTaskConfigRequest(requestContext)) {
@ -58,9 +66,11 @@ public class JaasBasicAuthFilter implements ContainerRequestFilter {
try {
log.debug("Authenticating request");
LoginContext loginContext =
new LoginContext(CONNECT_LOGIN_MODULE, new BasicAuthCallBackHandler(
requestContext.getHeaderString(AUTHORIZATION)));
LoginContext loginContext = new LoginContext(
CONNECT_LOGIN_MODULE,
null,
new BasicAuthCallBackHandler(requestContext.getHeaderString(AUTHORIZATION)),
configuration);
loginContext.login();
} catch (LoginException | ConfigException e) {
// Log at debug here in order to avoid polluting log files whenever someone mistypes their credentials

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.rest.basic.auth.extension;
import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import javax.security.auth.login.Configuration;
import javax.ws.rs.core.Configurable;
import static org.junit.Assert.assertNotEquals;
public class BasicAuthSecurityRestExtensionTest {
Configuration priorConfiguration;
@Before
public void setup() {
priorConfiguration = Configuration.getConfiguration();
}
@After
public void tearDown() {
Configuration.setConfiguration(priorConfiguration);
}
@Test
@SuppressWarnings("unchecked")
public void testJaasConfigurationNotOverwritten() {
Capture<JaasBasicAuthFilter> jaasFilter = EasyMock.newCapture();
Configurable<? extends Configurable<?>> configurable = EasyMock.mock(Configurable.class);
EasyMock.expect(configurable.register(EasyMock.capture(jaasFilter))).andReturn(null);
ConnectRestExtensionContext context = EasyMock.mock(ConnectRestExtensionContext.class);
EasyMock.expect(context.configurable()).andReturn((Configurable) configurable);
EasyMock.replay(configurable, context);
BasicAuthSecurityRestExtension extension = new BasicAuthSecurityRestExtension();
Configuration overwrittenConfiguration = EasyMock.mock(Configuration.class);
Configuration.setConfiguration(overwrittenConfiguration);
extension.register(context);
assertNotEquals(
"Overwritten JAAS configuration should not be used by basic auth REST extension",
overwrittenConfiguration,
jaasFilter.getValue().configuration
);
}
}

View File

@ -23,15 +23,11 @@ import javax.security.auth.callback.ChoiceCallback;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.UriInfo;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.security.authenticator.TestJaasConfig;
import org.apache.kafka.connect.errors.ConnectException;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.MockStrict;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.modules.junit4.PowerMockRunner;
@ -41,136 +37,129 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.security.auth.login.Configuration;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.core.Response;
import static org.junit.Assert.assertThrows;
import static org.powermock.api.easymock.PowerMock.replayAll;
import static org.easymock.EasyMock.replay;
@RunWith(PowerMockRunner.class)
@PowerMockIgnore("javax.*")
public class JaasBasicAuthFilterTest {
@MockStrict
private ContainerRequestContext requestContext;
private JaasBasicAuthFilter jaasBasicAuthFilter = new JaasBasicAuthFilter();
private String previousJaasConfig;
private Configuration previousConfiguration;
@MockStrict
private UriInfo uriInfo;
@Before
public void setup() {
EasyMock.reset(requestContext);
}
@After
public void tearDown() {
if (previousJaasConfig != null) {
System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, previousJaasConfig);
}
Configuration.setConfiguration(previousConfiguration);
}
private static final String LOGIN_MODULE =
"org.apache.kafka.connect.rest.basic.auth.extension.PropertyFileLoginModule";
@Test
public void testSuccess() throws IOException {
File credentialFile = File.createTempFile("credential", ".properties");
credentialFile.deleteOnExit();
List<String> lines = new ArrayList<>();
lines.add("user=password");
lines.add("user1=password1");
Files.write(credentialFile.toPath(), lines, StandardCharsets.UTF_8);
setupJaasConfig("KafkaConnect", credentialFile.getPath(), true);
setMock("Basic", "user", "password", false);
jaasBasicAuthFilter.filter(requestContext);
}
@Test
public void testBadCredential() throws IOException {
setMock("Basic", "user1", "password", true);
jaasBasicAuthFilter.filter(requestContext);
}
@Test
public void testBadPassword() throws IOException {
setMock("Basic", "user", "password1", true);
jaasBasicAuthFilter.filter(requestContext);
}
@Test
public void testUnknownBearer() throws IOException {
setMock("Unknown", "user", "password", true);
jaasBasicAuthFilter.filter(requestContext);
}
@Test
public void testUnknownLoginModule() throws IOException {
setupJaasConfig("KafkaConnect1", "/tmp/testcrednetial", true);
Configuration.setConfiguration(null);
setMock("Basic", "user", "password", true);
jaasBasicAuthFilter.filter(requestContext);
}
@Test
public void testUnknownCredentialsFile() throws IOException {
setupJaasConfig("KafkaConnect", "/tmp/testcrednetial", true);
Configuration.setConfiguration(null);
setMock("Basic", "user", "password", true);
File credentialFile = setupPropertyLoginFile(true);
JaasBasicAuthFilter jaasBasicAuthFilter = setupJaasFilter("KafkaConnect", credentialFile.getPath());
ContainerRequestContext requestContext = setMock("Basic", "user", "password", false);
jaasBasicAuthFilter.filter(requestContext);
}
@Test
public void testEmptyCredentialsFile() throws IOException {
File jaasConfigFile = File.createTempFile("ks-jaas-", ".conf");
jaasConfigFile.deleteOnExit();
System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasConfigFile.getPath());
setupJaasConfig("KafkaConnect", "", true);
Configuration.setConfiguration(null);
setMock("Basic", "user", "password", true);
File credentialFile = setupPropertyLoginFile(false);
JaasBasicAuthFilter jaasBasicAuthFilter = setupJaasFilter("KafkaConnect", credentialFile.getPath());
ContainerRequestContext requestContext = setMock("Basic", "user", "password", false);
jaasBasicAuthFilter.filter(requestContext);
EasyMock.verify(requestContext);
}
@Test
public void testBadCredential() throws IOException {
File credentialFile = setupPropertyLoginFile(true);
JaasBasicAuthFilter jaasBasicAuthFilter = setupJaasFilter("KafkaConnect", credentialFile.getPath());
ContainerRequestContext requestContext = setMock("Basic", "user1", "password", true);
jaasBasicAuthFilter.filter(requestContext);
EasyMock.verify(requestContext);
}
@Test
public void testBadPassword() throws IOException {
File credentialFile = setupPropertyLoginFile(true);
JaasBasicAuthFilter jaasBasicAuthFilter = setupJaasFilter("KafkaConnect", credentialFile.getPath());
ContainerRequestContext requestContext = setMock("Basic", "user", "password1", true);
jaasBasicAuthFilter.filter(requestContext);
EasyMock.verify(requestContext);
}
@Test
public void testUnknownBearer() throws IOException {
File credentialFile = setupPropertyLoginFile(true);
JaasBasicAuthFilter jaasBasicAuthFilter = setupJaasFilter("KafkaConnect", credentialFile.getPath());
ContainerRequestContext requestContext = setMock("Unknown", "user", "password", true);
jaasBasicAuthFilter.filter(requestContext);
EasyMock.verify(requestContext);
}
@Test
public void testUnknownLoginModule() throws IOException {
File credentialFile = setupPropertyLoginFile(true);
JaasBasicAuthFilter jaasBasicAuthFilter = setupJaasFilter("KafkaConnect1", credentialFile.getPath());
ContainerRequestContext requestContext = setMock("Basic", "user", "password", true);
jaasBasicAuthFilter.filter(requestContext);
EasyMock.verify(requestContext);
}
@Test
public void testUnknownCredentialsFile() throws IOException {
JaasBasicAuthFilter jaasBasicAuthFilter = setupJaasFilter("KafkaConnect", "/tmp/testcrednetial");
ContainerRequestContext requestContext = setMock("Basic", "user", "password", true);
jaasBasicAuthFilter.filter(requestContext);
EasyMock.verify(requestContext);
}
@Test
public void testNoFileOption() throws IOException {
File jaasConfigFile = File.createTempFile("ks-jaas-", ".conf");
jaasConfigFile.deleteOnExit();
System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasConfigFile.getPath());
setupJaasConfig("KafkaConnect", "", false);
Configuration.setConfiguration(null);
setMock("Basic", "user", "password", true);
JaasBasicAuthFilter jaasBasicAuthFilter = setupJaasFilter("KafkaConnect", null);
ContainerRequestContext requestContext = setMock("Basic", "user", "password", true);
jaasBasicAuthFilter.filter(requestContext);
EasyMock.verify(requestContext);
}
@Test
public void testPostWithoutAppropriateCredential() throws IOException {
EasyMock.expect(requestContext.getMethod()).andReturn(HttpMethod.POST);
EasyMock.expect(requestContext.getUriInfo()).andReturn(uriInfo);
UriInfo uriInfo = EasyMock.strictMock(UriInfo.class);
EasyMock.expect(uriInfo.getPath()).andReturn("connectors/connName/tasks");
PowerMock.replayAll();
ContainerRequestContext requestContext = EasyMock.strictMock(ContainerRequestContext.class);
EasyMock.expect(requestContext.getMethod()).andReturn(HttpMethod.POST);
EasyMock.expect(requestContext.getUriInfo()).andReturn(uriInfo);
replay(uriInfo, requestContext);
File credentialFile = setupPropertyLoginFile(true);
JaasBasicAuthFilter jaasBasicAuthFilter = setupJaasFilter("KafkaConnect1", credentialFile.getPath());
jaasBasicAuthFilter.filter(requestContext);
EasyMock.verify(requestContext);
}
@Test
public void testPostNotChangingConnectorTask() throws IOException {
UriInfo uriInfo = EasyMock.strictMock(UriInfo.class);
EasyMock.expect(uriInfo.getPath()).andReturn("local:randomport/connectors/connName");
ContainerRequestContext requestContext = EasyMock.strictMock(ContainerRequestContext.class);
EasyMock.expect(requestContext.getMethod()).andReturn(HttpMethod.POST);
EasyMock.expect(requestContext.getUriInfo()).andReturn(uriInfo);
EasyMock.expect(uriInfo.getPath()).andReturn("local:randomport/connectors/connName");
String authHeader = "Basic" + Base64.getEncoder().encodeToString(("user" + ":" + "password").getBytes());
EasyMock.expect(requestContext.getHeaderString(JaasBasicAuthFilter.AUTHORIZATION))
.andReturn(authHeader);
requestContext.abortWith(EasyMock.anyObject(Response.class));
EasyMock.expectLastCall();
PowerMock.replayAll();
replay(uriInfo, requestContext);
File credentialFile = setupPropertyLoginFile(true);
JaasBasicAuthFilter jaasBasicAuthFilter = setupJaasFilter("KafkaConnect", credentialFile.getPath());
jaasBasicAuthFilter.filter(requestContext);
EasyMock.verify(requestContext);
}
@ -193,7 +182,8 @@ public class JaasBasicAuthFilterTest {
return authorization + " " + Base64.getEncoder().encodeToString((username + ":" + password).getBytes());
}
private void setMock(String authorization, String username, String password, boolean exceptionCase) {
private ContainerRequestContext setMock(String authorization, String username, String password, boolean exceptionCase) {
ContainerRequestContext requestContext = EasyMock.strictMock(ContainerRequestContext.class);
EasyMock.expect(requestContext.getMethod()).andReturn(HttpMethod.GET);
EasyMock.expect(requestContext.getHeaderString(JaasBasicAuthFilter.AUTHORIZATION))
.andReturn(authHeader(authorization, username, password));
@ -201,23 +191,29 @@ public class JaasBasicAuthFilterTest {
requestContext.abortWith(EasyMock.anyObject(Response.class));
EasyMock.expectLastCall();
}
replayAll();
replay(requestContext);
return requestContext;
}
private void setupJaasConfig(String loginModule, String credentialFilePath, boolean includeFileOptions) throws IOException {
File jaasConfigFile = File.createTempFile("ks-jaas-", ".conf");
jaasConfigFile.deleteOnExit();
previousJaasConfig = System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasConfigFile.getPath());
List<String> lines;
lines = new ArrayList<>();
lines.add(loginModule + " { org.apache.kafka.connect.rest.basic.auth.extension.PropertyFileLoginModule required ");
if (includeFileOptions) {
lines.add("file=\"" + credentialFilePath + "\"");
private File setupPropertyLoginFile(boolean includeUsers) throws IOException {
File credentialFile = File.createTempFile("credential", ".properties");
credentialFile.deleteOnExit();
if (includeUsers) {
List<String> lines = new ArrayList<>();
lines.add("user=password");
lines.add("user1=password1");
Files.write(credentialFile.toPath(), lines, StandardCharsets.UTF_8);
}
lines.add(";};");
Files.write(jaasConfigFile.toPath(), lines, StandardCharsets.UTF_8);
previousConfiguration = Configuration.getConfiguration();
Configuration.setConfiguration(null);
return credentialFile;
}
private JaasBasicAuthFilter setupJaasFilter(String name, String credentialFilePath) {
TestJaasConfig configuration = new TestJaasConfig();
Map<String, Object> moduleOptions = credentialFilePath != null
? Collections.singletonMap("file", credentialFilePath)
: Collections.emptyMap();
configuration.addEntry(name, LOGIN_MODULE, moduleOptions);
return new JaasBasicAuthFilter(configuration);
}
}