KAFKA-17078: Add SecurityManagerCompatibility shim (#16522) (#19221)

Backport for 3.9 for https://github.com/apache/kafka/pull/16522

The reason to do this is that this is necessary in order for 3.9 to support Java 24.

Please see https://lists.apache.org/thread/6k942pphowd28dh9gn6xbnngk6nxs3n0 where it is being discussed whether to do this.

Co-authored-by: Greg Harris <greg.harris@aiven.io>

Reviewers: Luke Chen <showuon@gmail.com>, Greg Harris <greg.harris@aiven.io>
This commit is contained in:
Stig Døssing 2025-04-07 04:39:24 +02:00 committed by GitHub
parent 0f12ea4c08
commit 392a56423e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 805 additions and 69 deletions

View File

@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.internals;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.security.PrivilegedAction;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import javax.security.auth.Subject;
/**
* This strategy combines the functionality of the {@link LegacyStrategy}, {@link ModernStrategy}, and
* {@link UnsupportedStrategy} strategies to provide the legacy APIs as long as they are present and not degraded.
* If the legacy APIs are missing or degraded, this falls back to the modern APIs.
*/
class CompositeStrategy implements SecurityManagerCompatibility {
private static final Logger log = LoggerFactory.getLogger(CompositeStrategy.class);
static final CompositeStrategy INSTANCE = new CompositeStrategy(ReflectiveStrategy.Loader.forName());
private final SecurityManagerCompatibility fallbackStrategy;
private final AtomicReference<SecurityManagerCompatibility> activeStrategy;
// Visible for testing
CompositeStrategy(ReflectiveStrategy.Loader loader) {
SecurityManagerCompatibility initial;
SecurityManagerCompatibility fallback = null;
try {
initial = new LegacyStrategy(loader);
try {
fallback = new ModernStrategy(loader);
// This is expected for JRE 18+
log.debug("Loaded legacy SecurityManager methods, will fall back to modern methods after UnsupportedOperationException");
} catch (NoSuchMethodException | ClassNotFoundException ex) {
// This is expected for JRE <= 17
log.debug("Unable to load modern Subject methods, relying only on legacy methods", ex);
}
} catch (ClassNotFoundException | NoSuchMethodException e) {
try {
initial = new ModernStrategy(loader);
// This is expected for JREs after the removal takes place.
log.debug("Unable to load legacy SecurityManager methods, relying only on modern methods", e);
} catch (NoSuchMethodException | ClassNotFoundException ex) {
initial = new UnsupportedStrategy(e, ex);
// This is not expected in normal use, only in test environments.
log.error("Unable to load legacy SecurityManager methods", e);
log.error("Unable to load modern Subject methods", ex);
}
}
Objects.requireNonNull(initial, "initial strategy must be defined");
activeStrategy = new AtomicReference<>(initial);
fallbackStrategy = fallback;
}
private <T> T performAction(Function<SecurityManagerCompatibility, T> action) {
SecurityManagerCompatibility active = activeStrategy.get();
try {
return action.apply(active);
} catch (UnsupportedOperationException e) {
// If we chose a fallback strategy during loading, switch to it and retry this operation.
if (active != fallbackStrategy && fallbackStrategy != null) {
if (activeStrategy.compareAndSet(active, fallbackStrategy)) {
log.debug("Using fallback strategy after encountering degraded legacy method", e);
}
return action.apply(fallbackStrategy);
}
// If we're already using the fallback strategy, then there's nothing to do to handle these exceptions.
throw e;
}
}
@Override
public <T> T doPrivileged(PrivilegedAction<T> action) {
return performAction(compatibility -> compatibility.doPrivileged(action));
}
@Override
public Subject current() {
return performAction(SecurityManagerCompatibility::current);
}
@Override
public <T> T callAs(Subject subject, Callable<T> action) throws CompletionException {
return performAction(compatibility -> compatibility.callAs(subject, action));
}
}

View File

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.internals;
import java.lang.reflect.Method;
import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionException;
import javax.security.auth.Subject;
/**
* This class implements reflective access to the deprecated-for-removal methods of AccessController and Subject.
* <p>Instantiating this class may fail if any of the required classes or methods are not found.
* Method invocations for this class may fail with {@link UnsupportedOperationException} if all methods are found,
* but the operation is not permitted to be invoked.
* <p>This class is expected to be instantiable in JRE >=8 until the removal finally takes place.
*/
@SuppressWarnings("unchecked")
class LegacyStrategy implements SecurityManagerCompatibility {
private final Method doPrivileged;
private final Method getContext;
private final Method getSubject;
private final Method doAs;
// Visible for testing
LegacyStrategy(ReflectiveStrategy.Loader loader) throws ClassNotFoundException, NoSuchMethodException {
Class<?> accessController = loader.loadClass("java.security.AccessController");
doPrivileged = accessController.getDeclaredMethod("doPrivileged", PrivilegedAction.class);
getContext = accessController.getDeclaredMethod("getContext");
Class<?> accessControlContext = loader.loadClass("java.security.AccessControlContext");
Class<?> subject = loader.loadClass(Subject.class.getName());
getSubject = subject.getDeclaredMethod("getSubject", accessControlContext);
// Note that the Subject class isn't deprecated or removed, so reference it as an argument type.
// This allows for mocking out the method implementation while still accepting Subject instances as arguments.
doAs = subject.getDeclaredMethod("doAs", Subject.class, PrivilegedExceptionAction.class);
}
@Override
public <T> T doPrivileged(PrivilegedAction<T> action) {
return (T) ReflectiveStrategy.invoke(doPrivileged, null, action);
}
/**
* @return the result of AccessController.getContext(), of type AccessControlContext
*/
private Object getContext() {
return ReflectiveStrategy.invoke(getContext, null);
}
/**
* @param context The current AccessControlContext
* @return The result of Subject.getSubject(AccessControlContext)
*/
private Subject getSubject(Object context) {
return (Subject) ReflectiveStrategy.invoke(getSubject, null, context);
}
@Override
public Subject current() {
return getSubject(getContext());
}
/**
* @return The result of Subject.doAs(Subject, PrivilegedExceptionAction)
*/
private <T> T doAs(Subject subject, PrivilegedExceptionAction<T> action) throws PrivilegedActionException {
return (T) ReflectiveStrategy.invokeChecked(doAs, PrivilegedActionException.class, null, subject, action);
}
@Override
public <T> T callAs(Subject subject, Callable<T> callable) throws CompletionException {
try {
return doAs(subject, callable::call);
} catch (PrivilegedActionException e) {
throw new CompletionException(e.getCause());
}
}
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.internals;
import java.lang.reflect.Method;
import java.security.PrivilegedAction;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionException;
import javax.security.auth.Subject;
/**
* This class implements reflective access to the methods of Subject added to replace deprecated methods.
* <p>Instantiating this class may fail if any of the required classes or methods are not found.
* Method invocations for this class may fail with {@link UnsupportedOperationException} if all methods are found,
* but the operation is not permitted to be invoked.
* <p>This class is expected to be instantiable in JRE >= 18. At the time of writing, these methods do not have
* a sunset date, and are expected to be available past the removal of the SecurityManager.
*/
@SuppressWarnings("unchecked")
class ModernStrategy implements SecurityManagerCompatibility {
private final Method current;
private final Method callAs;
// Visible for testing
ModernStrategy(ReflectiveStrategy.Loader loader) throws NoSuchMethodException, ClassNotFoundException {
Class<?> subject = loader.loadClass(Subject.class.getName());
current = subject.getDeclaredMethod("current");
// Note that the Subject class isn't deprecated or removed, so reference it as an argument type.
// This allows for mocking out the method implementation while still accepting Subject instances as arguments.
callAs = subject.getDeclaredMethod("callAs", Subject.class, Callable.class);
}
@Override
public <T> T doPrivileged(PrivilegedAction<T> action) {
// This is intentionally a pass-through
return action.run();
}
@Override
public Subject current() {
return (Subject) ReflectiveStrategy.invoke(current, null);
}
@Override
public <T> T callAs(Subject subject, Callable<T> action) throws CompletionException {
return (T) ReflectiveStrategy.invokeChecked(callAs, CompletionException.class, null, subject, action);
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.internals;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
/**
* Utility methods for strategies which use reflection to access methods without requiring them at compile-time.
*/
class ReflectiveStrategy {
static Object invoke(Method method, Object obj, Object... args) {
try {
return method.invoke(obj, args);
} catch (IllegalAccessException e) {
throw new UnsupportedOperationException(e);
} catch (InvocationTargetException e) {
Throwable cause = e.getCause();
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else {
throw new RuntimeException(cause);
}
}
}
static <T extends Exception> Object invokeChecked(Method method, Class<T> ex, Object obj, Object... args) throws T {
try {
return method.invoke(obj, args);
} catch (IllegalAccessException e) {
throw new UnsupportedOperationException(e);
} catch (InvocationTargetException e) {
Throwable cause = e.getCause();
if (ex.isInstance(cause)) {
throw ex.cast(cause);
} else if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else {
throw new RuntimeException(cause);
}
}
}
/**
* Interface to allow mocking out classloading infrastructure. This is used to test reflective operations.
*/
interface Loader {
Class<?> loadClass(String className) throws ClassNotFoundException;
static Loader forName() {
return className -> Class.forName(className, true, Loader.class.getClassLoader());
}
}
}

View File

@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.internals;
import java.security.PrivilegedAction;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionException;
import javax.security.auth.Subject;
/**
* This is a compatibility class to provide dual-support for JREs with and without SecurityManager support.
* <p>Users should call {@link #get()} to retrieve a singleton instance, and call instance methods
* {@link #doPrivileged(PrivilegedAction)}, {@link #current()}, and {@link #callAs(Subject, Callable)}.
* <p>This class's motivation and expected behavior is defined in
* <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1006%3A+Remove+SecurityManager+Support">KIP-1006</a>
*/
public interface SecurityManagerCompatibility {
/**
* @return an implementation of this interface which conforms to the functionality available in the current JRE.
*/
static SecurityManagerCompatibility get() {
return CompositeStrategy.INSTANCE;
}
/**
* Performs the specified {@code PrivilegedAction} with privileges
* enabled. The action is performed with <i>all</i> of the permissions
* possessed by the caller's protection domain.
*
* <p> If the action's {@code run} method throws an (unchecked)
* exception, it will propagate through this method.
*
* <p> Note that any DomainCombiner associated with the current
* AccessControlContext will be ignored while the action is performed.
*
* @param <T> the type of the value returned by the PrivilegedAction's
* {@code run} method.
*
* @param action the action to be performed.
*
* @return the value returned by the action's {@code run} method.
*
* @exception NullPointerException if the action is {@code null}
* @see java.security.AccessController#doPrivileged(PrivilegedAction)
*/
<T> T doPrivileged(PrivilegedAction<T> action);
/**
* Returns the current subject.
* <p>
* The current subject is installed by the {@link #callAs} method.
* When {@code callAs(subject, action)} is called, {@code action} is
* executed with {@code subject} as its current subject which can be
* retrieved by this method. After {@code action} is finished, the current
* subject is reset to its previous value. The current
* subject is {@code null} before the first call of {@code callAs()}.
*
* @return the current subject, or {@code null} if a current subject is
* not installed or the current subject is set to {@code null}.
* @see #callAs(Subject, Callable)
* @see Subject#current()
* @see Subject#callAs(Subject, Callable)
*/
Subject current();
/**
* Executes a {@code Callable} with {@code subject} as the
* current subject.
*
* @param subject the {@code Subject} that the specified {@code action}
* will run as. This parameter may be {@code null}.
* @param action the code to be run with {@code subject} as its current
* subject. Must not be {@code null}.
* @param <T> the type of value returned by the {@code call} method
* of {@code action}
* @return the value returned by the {@code call} method of {@code action}
* @throws NullPointerException if {@code action} is {@code null}
* @throws CompletionException if {@code action.call()} throws an exception.
* The cause of the {@code CompletionException} is set to the exception
* thrown by {@code action.call()}.
* @see #current()
* @see Subject#current()
* @see Subject#callAs(Subject, Callable)
*/
<T> T callAs(Subject subject, Callable<T> action) throws CompletionException;
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.internals;
import java.security.PrivilegedAction;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionException;
import javax.security.auth.Subject;
/**
* This is a fallback strategy to use if no other strategies are available.
* <p>This is used to improve control flow and provide detailed error messages in unusual situations.
*/
class UnsupportedStrategy implements SecurityManagerCompatibility {
private final Throwable e1;
private final Throwable e2;
UnsupportedStrategy(Throwable e1, Throwable e2) {
this.e1 = e1;
this.e2 = e2;
}
private UnsupportedOperationException createException(String message) {
UnsupportedOperationException e = new UnsupportedOperationException(message);
e.addSuppressed(e1);
e.addSuppressed(e2);
return e;
}
@Override
public <T> T doPrivileged(PrivilegedAction<T> action) {
throw createException("Unable to find suitable AccessController#doPrivileged implementation");
}
@Override
public Subject current() {
throw createException("Unable to find suitable Subject#getCurrent or Subject#current implementation");
}
@Override
public <T> T callAs(Subject subject, Callable<T> action) throws CompletionException {
throw createException("Unable to find suitable Subject#doAs or Subject#callAs implementation");
}
}

View File

@ -23,6 +23,7 @@ import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.errors.IllegalSaslStateException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.internals.SecurityManagerCompatibility;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.message.SaslAuthenticateRequestData;
@ -59,8 +60,6 @@ import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.security.Principal;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
@ -70,6 +69,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletionException;
import javax.security.auth.Subject;
import javax.security.sasl.Sasl;
@ -215,7 +215,7 @@ public class SaslClientAuthenticator implements Authenticator {
// visible for testing
SaslClient createSaslClient() {
try {
return Subject.doAs(subject, (PrivilegedExceptionAction<SaslClient>) () -> {
return SecurityManagerCompatibility.get().callAs(subject, () -> {
String[] mechs = {mechanism};
log.debug("Creating SaslClient: client={};service={};serviceHostname={};mechs={}",
clientPrincipalName, servicePrincipal, host, Arrays.toString(mechs));
@ -225,7 +225,7 @@ public class SaslClientAuthenticator implements Authenticator {
}
return retvalSaslClient;
});
} catch (PrivilegedActionException e) {
} catch (CompletionException e) {
throw new SaslAuthenticationException("Failed to create SaslClient with mechanism " + mechanism, e.getCause());
}
}
@ -533,8 +533,8 @@ public class SaslClientAuthenticator implements Authenticator {
if (isInitial && !saslClient.hasInitialResponse())
return saslToken;
else
return Subject.doAs(subject, (PrivilegedExceptionAction<byte[]>) () -> saslClient.evaluateChallenge(saslToken));
} catch (PrivilegedActionException e) {
return SecurityManagerCompatibility.get().callAs(subject, () -> saslClient.evaluateChallenge(saslToken));
} catch (CompletionException e) {
String error = "An error: (" + e + ") occurred when evaluating SASL token received from the Kafka Broker.";
KerberosError kerberosError = KerberosError.fromException(e);
// Try to provide hints to use about what went wrong so they can fix their configuration.
@ -545,7 +545,7 @@ public class SaslClientAuthenticator implements Authenticator {
" Users must configure FQDN of kafka brokers when authenticating using SASL and" +
" `socketChannel.socket().getInetAddress().getHostName()` must match the hostname in `principal/hostname@realm`";
}
//Unwrap the SaslException inside `PrivilegedActionException`
//Unwrap the SaslException
Throwable cause = e.getCause();
// Treat transient Kerberos errors as non-fatal SaslExceptions that are processed as I/O exceptions
// and all other failures as fatal SaslAuthenticationException.

View File

@ -17,13 +17,13 @@
package org.apache.kafka.common.security.authenticator;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.internals.SecurityManagerCompatibility;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.auth.SaslExtensions;
import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
import org.apache.kafka.common.security.scram.ScramExtensionsCallback;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import java.security.AccessController;
import java.util.List;
import java.util.Map;
@ -55,7 +55,7 @@ public class SaslClientCallbackHandler implements AuthenticateCallbackHandler {
@Override
public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
Subject subject = Subject.getSubject(AccessController.getContext());
Subject subject = SecurityManagerCompatibility.get().current();
for (Callback callback : callbacks) {
if (callback instanceof NameCallback) {
NameCallback nc = (NameCallback) callback;

View File

@ -25,6 +25,7 @@ import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.SecurityManagerCompatibility;
import org.apache.kafka.common.message.SaslAuthenticateResponseData;
import org.apache.kafka.common.message.SaslHandshakeResponseData;
import org.apache.kafka.common.network.Authenticator;
@ -73,8 +74,6 @@ import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
@ -82,6 +81,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletionException;
import java.util.function.Function;
import javax.net.ssl.SSLSession;
@ -205,12 +205,12 @@ public class SaslServerAuthenticator implements Authenticator {
saslServer = createSaslKerberosServer(callbackHandler, configs, subject);
} else {
try {
saslServer = Subject.doAs(subject, (PrivilegedExceptionAction<SaslServer>) () ->
saslServer = SecurityManagerCompatibility.get().callAs(subject, () ->
Sasl.createSaslServer(saslMechanism, "kafka", serverAddress().getHostName(), configs, callbackHandler));
if (saslServer == null) {
throw new SaslException("Kafka Server failed to create a SaslServer to interact with a client during session authentication with server mechanism " + saslMechanism);
}
} catch (PrivilegedActionException e) {
} catch (CompletionException e) {
throw new SaslException("Kafka Server failed to create a SaslServer to interact with a client during session authentication with server mechanism " + saslMechanism, e.getCause());
}
}
@ -231,9 +231,9 @@ public class SaslServerAuthenticator implements Authenticator {
LOG.debug("Creating SaslServer for {} with mechanism {}", kerberosName, saslMechanism);
try {
return Subject.doAs(subject, (PrivilegedExceptionAction<SaslServer>) () ->
return SecurityManagerCompatibility.get().callAs(subject, () ->
Sasl.createSaslServer(saslMechanism, servicePrincipalName, serviceHostname, configs, saslServerCallbackHandler));
} catch (PrivilegedActionException e) {
} catch (CompletionException e) {
throw new SaslException("Kafka Server failed to create a SaslServer to interact with a client during session authentication", e.getCause());
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.security.oauthbearer.internals;
import org.apache.kafka.common.internals.SecurityManagerCompatibility;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.auth.SaslExtensions;
import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
@ -27,7 +28,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.security.AccessController;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
@ -83,7 +83,7 @@ public class OAuthBearerSaslClientCallbackHandler implements AuthenticateCallbac
if (callback instanceof OAuthBearerTokenCallback)
handleCallback((OAuthBearerTokenCallback) callback);
else if (callback instanceof SaslExtensionsCallback)
handleCallback((SaslExtensionsCallback) callback, Subject.getSubject(AccessController.getContext()));
handleCallback((SaslExtensionsCallback) callback, SecurityManagerCompatibility.get().current());
else
throw new UnsupportedCallbackException(callback);
}
@ -97,7 +97,7 @@ public class OAuthBearerSaslClientCallbackHandler implements AuthenticateCallbac
private void handleCallback(OAuthBearerTokenCallback callback) throws IOException {
if (callback.token() != null)
throw new IllegalArgumentException("Callback had a token already");
Subject subject = Subject.getSubject(AccessController.getContext());
Subject subject = SecurityManagerCompatibility.get().current();
Set<OAuthBearerToken> privateCredentials = subject != null
? subject.getPrivateCredentials(OAuthBearerToken.class)
: Collections.emptySet();

View File

@ -0,0 +1,252 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.internals;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledForJreRange;
import org.junit.jupiter.api.condition.JRE;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionException;
import javax.security.auth.Subject;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class SecurityManagerCompatibilityTest {
@EnabledForJreRange(min = JRE.JAVA_8, max = JRE.JAVA_22)
@Test
public void testLegacyStrategyLoadable() throws ClassNotFoundException, NoSuchMethodException {
new LegacyStrategy(ReflectiveStrategy.Loader.forName());
}
@EnabledForJreRange(min = JRE.JAVA_18)
@Test
public void testModernStrategyLoadable() throws ClassNotFoundException, NoSuchMethodException {
new ModernStrategy(ReflectiveStrategy.Loader.forName());
}
@Test
public void testCompositeStrategyLoadable() {
new CompositeStrategy(ReflectiveStrategy.Loader.forName());
}
@Test
public void testDefaultStrategyLoadable() {
assertNotNull(SecurityManagerCompatibility.get());
}
@Test
public void testDefaultStrategyDoPrivilegedReturn() {
Object object = new Object();
Object returned = SecurityManagerCompatibility.get().doPrivileged(() -> object);
assertSame(object, returned);
}
@Test
public void testDefaultStrategyDoPrivilegedThrow() {
assertThrows(RuntimeException.class, () ->
SecurityManagerCompatibility.get().doPrivileged(() -> {
throw new RuntimeException();
})
);
}
@Test
public void testDefaultStrategyCurrentNull() {
Subject current = SecurityManagerCompatibility.get().current();
assertNull(current);
}
@Test
public void testDefaultStrategyCallAsReturn() {
Subject subject = new Subject();
Object object = new Object();
Object returned = SecurityManagerCompatibility.get().callAs(subject, () -> object);
assertSame(object, returned);
}
@Test
public void testDefaultStrategyCallAsCurrent() {
Subject subject = new Subject();
Subject returned = SecurityManagerCompatibility.get().callAs(subject, SecurityManagerCompatibility.get()::current);
assertSame(subject, returned);
}
@Test
public void testLegacyStrategyThrowsWhenSecurityManagerRemoved() {
ReflectiveStrategy.Loader loader = simulateSecurityManagerRemoval();
assertThrows(ClassNotFoundException.class, () -> new LegacyStrategy(loader));
}
@EnabledForJreRange(min = JRE.JAVA_18)
@Test
public void testModernStrategyLoadableWhenSecurityManagerRemoved() throws ClassNotFoundException, NoSuchMethodException {
ReflectiveStrategy.Loader loader = simulateSecurityManagerRemoval();
new ModernStrategy(loader);
}
@Test
public void testCompositeStrategyLoadableWhenSecurityManagerRemoved() {
ReflectiveStrategy.Loader loader = simulateSecurityManagerRemoval();
new CompositeStrategy(loader);
}
@Test
public void testLegacyStrategyCurrentThrowsWhenSecurityManagerUnsupported() throws ClassNotFoundException, NoSuchMethodException {
ReflectiveStrategy.Loader loader = simulateMethodsThrowUnsupportedOperationExceptions();
SecurityManagerCompatibility legacy = new LegacyStrategy(loader);
assertThrows(UnsupportedOperationException.class, legacy::current);
}
@Test
public void testLegacyStrategyCallAsThrowsWhenSecurityManagerUnsupported() throws ClassNotFoundException, NoSuchMethodException {
ReflectiveStrategy.Loader loader = simulateMethodsThrowUnsupportedOperationExceptions();
SecurityManagerCompatibility legacy = new LegacyStrategy(loader);
assertThrows(UnsupportedOperationException.class, () -> legacy.callAs(null, () -> null));
}
@Test
public void testCompositeStrategyDoPrivilegedWhenSecurityManagerUnsupported() {
ReflectiveStrategy.Loader loader = simulateMethodsThrowUnsupportedOperationExceptions();
CompositeStrategy composite = new CompositeStrategy(loader);
Object object = new Object();
Object returned = composite.doPrivileged(() -> object);
assertSame(object, returned);
}
@Test
public void testCompositeStrategyCurrentWhenSecurityManagerUnsupported() {
ReflectiveStrategy.Loader loader = simulateMethodsThrowUnsupportedOperationExceptions();
CompositeStrategy composite = new CompositeStrategy(loader);
Object returned = composite.current();
assertNull(returned);
}
@Test
public void testCompositeStrategyCallAsWhenSecurityManagerUnsupported() {
ReflectiveStrategy.Loader loader = simulateMethodsThrowUnsupportedOperationExceptions();
CompositeStrategy composite = new CompositeStrategy(loader);
Subject subject = new Subject();
Subject returned = composite.callAs(subject, composite::current);
assertSame(subject, returned);
}
private ReflectiveStrategy.Loader simulateSecurityManagerRemoval() {
return name -> {
if (name.equals("java.security.AccessController")) {
throw new ClassNotFoundException();
} else {
return ReflectiveStrategy.Loader.forName().loadClass(name);
}
};
}
private ReflectiveStrategy.Loader simulateMethodsThrowUnsupportedOperationExceptions() {
// WARNING: These assertions are here to prevent warnings about unused methods.
// These methods are used reflectively, and can't be removed.
assertThrows(UnsupportedOperationException.class, () -> UnsupportedOperations.doPrivileged(null));
assertThrows(UnsupportedOperationException.class, () -> UnsupportedOperations.getSubject(null));
assertThrows(UnsupportedOperationException.class, () -> UnsupportedOperations.doAs(null, null));
assertNull(UnsupportedOperations.current());
assertNull(UnsupportedOperations.callAs(null, () -> null));
return name -> {
switch (name) {
case "java.security.AccessController":
case "javax.security.auth.Subject":
return UnsupportedOperations.class;
case "java.security.AccessControlContext":
return UnsupportedOperations.DummyContext.class;
default:
return ReflectiveStrategy.Loader.forName().loadClass(name);
}
};
}
/**
* This is a class meant to stand-in for the AccessController, and Subject classes.
* It simulates a scenario where all legacy methods throw UnsupportedOperationException, and only the modern
* methods are functional.
*/
public static class UnsupportedOperations {
/**
* This class stands-in for the AccessControlContext in the mocked signatures below, because we can't have a
* compile-time dependency on the real class. This needs no methods and is just a dummy class.
*/
public static class DummyContext {
}
private static final ThreadLocal<Subject> ACTIVE_SUBJECT = new ThreadLocal<>();
/**
* Copy of AccessController#doPrivileged
*/
public static <T> void doPrivileged(PrivilegedAction<T> ignored) {
throw new UnsupportedOperationException();
}
/**
* Copy of AccessController#getContext
*/
public static DummyContext getContext() {
throw new UnsupportedOperationException();
}
/**
* Copy of Subject#getSubject
*/
public static void getSubject(DummyContext ignored) {
throw new UnsupportedOperationException();
}
/**
* Copy of Subject#doAs
*/
public static <T> void doAs(Subject ignored1, PrivilegedExceptionAction<T> ignored2) {
throw new UnsupportedOperationException();
}
/**
* Copy of Subject#current
*/
public static Subject current() {
return ACTIVE_SUBJECT.get();
}
/**
* Copy of Subject#callAs
*/
public static <T> T callAs(Subject subject, Callable<T> action) throws CompletionException {
Subject previous = ACTIVE_SUBJECT.get();
ACTIVE_SUBJECT.set(subject);
try {
return action.call();
} catch (Throwable e) {
throw new CompletionException(e);
} finally {
ACTIVE_SUBJECT.set(previous);
}
}
}
}

View File

@ -16,16 +16,15 @@
*/
package org.apache.kafka.common.security.oauthbearer;
import org.apache.kafka.common.internals.SecurityManagerCompatibility;
import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletionException;
import javax.security.auth.Subject;
import javax.security.auth.callback.Callback;
@ -66,8 +65,8 @@ public class OAuthBearerSaslClientCallbackHandlerTest {
@Test
public void testWithZeroTokens() {
OAuthBearerSaslClientCallbackHandler handler = createCallbackHandler();
PrivilegedActionException e = assertThrows(PrivilegedActionException.class, () -> Subject.doAs(new Subject(),
(PrivilegedExceptionAction<Void>) () -> {
CompletionException e = assertThrows(CompletionException.class, () -> SecurityManagerCompatibility.get().callAs(new Subject(),
() -> {
OAuthBearerTokenCallback callback = new OAuthBearerTokenCallback();
handler.handle(new Callback[] {callback});
return null;
@ -77,11 +76,11 @@ public class OAuthBearerSaslClientCallbackHandlerTest {
}
@Test()
public void testWithPotentiallyMultipleTokens() throws Exception {
public void testWithPotentiallyMultipleTokens() {
OAuthBearerSaslClientCallbackHandler handler = createCallbackHandler();
Subject.doAs(new Subject(), (PrivilegedExceptionAction<Void>) () -> {
SecurityManagerCompatibility.get().callAs(new Subject(), () -> {
final int maxTokens = 4;
final Set<Object> privateCredentials = Subject.getSubject(AccessController.getContext())
final Set<Object> privateCredentials = SecurityManagerCompatibility.get().current()
.getPrivateCredentials();
privateCredentials.clear();
for (int num = 1; num <= maxTokens; ++num) {

View File

@ -16,9 +16,9 @@
*/
package org.apache.kafka.connect.runtime.isolation;
import org.apache.kafka.common.internals.SecurityManagerCompatibility;
import java.net.URL;
import java.security.AccessController;
import java.security.PrivilegedAction;
/**
* Factory for {@link DelegatingClassLoader} and {@link PluginClassLoader} instances.
@ -27,14 +27,14 @@ import java.security.PrivilegedAction;
public class ClassLoaderFactory implements PluginClassLoaderFactory {
public DelegatingClassLoader newDelegatingClassLoader(ClassLoader parent) {
return AccessController.doPrivileged(
(PrivilegedAction<DelegatingClassLoader>) () -> new DelegatingClassLoader(parent)
return SecurityManagerCompatibility.get().doPrivileged(
() -> new DelegatingClassLoader(parent)
);
}
public PluginClassLoader newPluginClassLoader(URL pluginLocation, URL[] urls, ClassLoader parent) {
return AccessController.doPrivileged(
(PrivilegedAction<PluginClassLoader>) () -> new PluginClassLoader(pluginLocation, urls, parent)
return SecurityManagerCompatibility.get().doPrivileged(
() -> new PluginClassLoader(pluginLocation, urls, parent)
);
}

View File

@ -16,14 +16,13 @@
*/
package org.apache.kafka.connect.runtime.isolation;
import org.apache.kafka.common.internals.SecurityManagerCompatibility;
import org.apache.kafka.connect.components.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.sql.Driver;
import java.util.ArrayList;
import java.util.Arrays;
@ -93,8 +92,8 @@ public abstract class PluginScanner {
private void loadJdbcDrivers(final ClassLoader loader) {
// Apply here what java.sql.DriverManager does to discover and register classes
// implementing the java.sql.Driver interface.
AccessController.doPrivileged(
(PrivilegedAction<Void>) () -> {
SecurityManagerCompatibility.get().doPrivileged(
() -> {
ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(
Driver.class,
loader

View File

@ -22,6 +22,7 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.internals.SecurityManagerCompatibility;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.storage.Converter;
@ -40,8 +41,6 @@ import java.lang.management.ManagementFactory;
import java.lang.management.MonitorInfo;
import java.lang.management.ThreadInfo;
import java.net.URL;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
@ -161,9 +160,8 @@ public class SynchronizationTest {
private class SynchronizedClassLoaderFactory extends ClassLoaderFactory {
@Override
public DelegatingClassLoader newDelegatingClassLoader(ClassLoader parent) {
return AccessController.doPrivileged(
(PrivilegedAction<DelegatingClassLoader>) () ->
new SynchronizedDelegatingClassLoader(parent, dclBreakpoint)
return SecurityManagerCompatibility.get().doPrivileged(
() -> new SynchronizedDelegatingClassLoader(parent, dclBreakpoint)
);
}
@ -173,9 +171,8 @@ public class SynchronizationTest {
URL[] urls,
ClassLoader parent
) {
return AccessController.doPrivileged(
(PrivilegedAction<PluginClassLoader>) () ->
new SynchronizedPluginClassLoader(pluginLocation, urls, parent, pclBreakpoint)
return SecurityManagerCompatibility.get().doPrivileged(
() -> new SynchronizedPluginClassLoader(pluginLocation, urls, parent, pclBreakpoint)
);
}
}

View File

@ -32,6 +32,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.internals.SecurityManagerCompatibility;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Quota;
@ -98,7 +99,6 @@ import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.security.PrivilegedAction;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
@ -330,20 +330,16 @@ public class RemoteLogManager implements Closeable {
}
}
@SuppressWarnings("removal")
RemoteStorageManager createRemoteStorageManager() {
return java.security.AccessController.doPrivileged(new PrivilegedAction<RemoteStorageManager>() {
private final String classPath = rlmConfig.remoteStorageManagerClassPath();
public RemoteStorageManager run() {
return SecurityManagerCompatibility.get().doPrivileged(() -> {
final String classPath = rlmConfig.remoteStorageManagerClassPath();
if (classPath != null && !classPath.trim().isEmpty()) {
ChildFirstClassLoader classLoader = new ChildFirstClassLoader(classPath, this.getClass().getClassLoader());
RemoteStorageManager delegate = createDelegate(classLoader, rlmConfig.remoteStorageManagerClassName());
return new ClassLoaderAwareRemoteStorageManager(delegate, classLoader);
return (RemoteStorageManager) new ClassLoaderAwareRemoteStorageManager(delegate, classLoader);
} else {
return createDelegate(this.getClass().getClassLoader(), rlmConfig.remoteStorageManagerClassName());
}
}
});
}
@ -353,20 +349,16 @@ public class RemoteLogManager implements Closeable {
remoteLogStorageManager.configure(rsmProps);
}
@SuppressWarnings("removal")
RemoteLogMetadataManager createRemoteLogMetadataManager() {
return java.security.AccessController.doPrivileged(new PrivilegedAction<RemoteLogMetadataManager>() {
private final String classPath = rlmConfig.remoteLogMetadataManagerClassPath();
public RemoteLogMetadataManager run() {
return SecurityManagerCompatibility.get().doPrivileged(() -> {
final String classPath = rlmConfig.remoteLogMetadataManagerClassPath();
if (classPath != null && !classPath.trim().isEmpty()) {
ClassLoader classLoader = new ChildFirstClassLoader(classPath, this.getClass().getClassLoader());
RemoteLogMetadataManager delegate = createDelegate(classLoader, rlmConfig.remoteLogMetadataManagerClassName());
return new ClassLoaderAwareRemoteLogMetadataManager(delegate, classLoader);
return (RemoteLogMetadataManager) new ClassLoaderAwareRemoteLogMetadataManager(delegate, classLoader);
} else {
return createDelegate(this.getClass().getClassLoader(), rlmConfig.remoteLogMetadataManagerClassName());
}
}
});
}