KAFKA-15069: Refactor plugin scanning logic into ReflectionScanner (#13821)

Reviewers: Chris Egerton <chrise@aiven.io>
This commit is contained in:
Greg Harris 2023-07-06 10:22:28 -07:00 committed by GitHub
parent 4149e31cad
commit 1b925e9ee7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 751 additions and 416 deletions

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime.isolation;
import java.net.URL;
import java.security.AccessController;
import java.security.PrivilegedAction;
/**
* Factory for {@link DelegatingClassLoader} and {@link PluginClassLoader} instances.
* Used for mocking classloader initialization in tests.
*/
public class ClassLoaderFactory implements PluginClassLoaderFactory {
public DelegatingClassLoader newDelegatingClassLoader(ClassLoader parent) {
return AccessController.doPrivileged(
(PrivilegedAction<DelegatingClassLoader>) () -> new DelegatingClassLoader(parent)
);
}
public PluginClassLoader newPluginClassLoader(URL pluginLocation, URL[] urls, ClassLoader parent) {
return AccessController.doPrivileged(
(PrivilegedAction<PluginClassLoader>) () -> new PluginClassLoader(pluginLocation, urls, parent)
);
}
}

View File

@ -16,49 +16,15 @@
*/ */
package org.apache.kafka.connect.runtime.isolation; package org.apache.kafka.connect.runtime.isolation;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.reflections.Configuration;
import org.reflections.Reflections;
import org.reflections.ReflectionsException;
import org.reflections.scanners.SubTypesScanner;
import org.reflections.util.ClasspathHelper;
import org.reflections.util.ConfigurationBuilder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.MalformedURLException;
import java.net.URL; import java.net.URL;
import java.net.URLClassLoader; import java.net.URLClassLoader;
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.sql.Driver;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -75,11 +41,9 @@ import java.util.concurrent.ConcurrentMap;
*/ */
public class DelegatingClassLoader extends URLClassLoader { public class DelegatingClassLoader extends URLClassLoader {
private static final Logger log = LoggerFactory.getLogger(DelegatingClassLoader.class); private static final Logger log = LoggerFactory.getLogger(DelegatingClassLoader.class);
public static final String UNDEFINED_VERSION = "undefined";
private final ConcurrentMap<String, SortedMap<PluginDesc<?>, ClassLoader>> pluginLoaders; private final ConcurrentMap<String, SortedMap<PluginDesc<?>, ClassLoader>> pluginLoaders;
private final ConcurrentMap<String, String> aliases; private final ConcurrentMap<String, String> aliases;
private final List<Path> pluginLocations;
// Although this classloader does not load classes directly but rather delegates loading to a // Although this classloader does not load classes directly but rather delegates loading to a
// PluginClassLoader or its parent through its base class, because of the use of inheritance in // PluginClassLoader or its parent through its base class, because of the use of inheritance in
@ -89,19 +53,18 @@ public class DelegatingClassLoader extends URLClassLoader {
ClassLoader.registerAsParallelCapable(); ClassLoader.registerAsParallelCapable();
} }
public DelegatingClassLoader(List<Path> pluginLocations, ClassLoader parent) { public DelegatingClassLoader(ClassLoader parent) {
super(new URL[0], parent); super(new URL[0], parent);
this.pluginLocations = pluginLocations;
this.pluginLoaders = new ConcurrentHashMap<>(); this.pluginLoaders = new ConcurrentHashMap<>();
this.aliases = new ConcurrentHashMap<>(); this.aliases = new ConcurrentHashMap<>();
} }
public DelegatingClassLoader(List<Path> pluginLocations) { public DelegatingClassLoader() {
// Use as parent the classloader that loaded this class. In most cases this will be the // Use as parent the classloader that loaded this class. In most cases this will be the
// System classloader. But this choice here provides additional flexibility in managed // System classloader. But this choice here provides additional flexibility in managed
// environments that control classloading differently (OSGi, Spring and others) and don't // environments that control classloading differently (OSGi, Spring and others) and don't
// depend on the System classloader to load Connect's classes. // depend on the System classloader to load Connect's classes.
this(pluginLocations, DelegatingClassLoader.class.getClassLoader()); this(DelegatingClassLoader.class.getClassLoader());
} }
/** /**
@ -136,240 +99,7 @@ public class DelegatingClassLoader extends URLClassLoader {
return classLoader; return classLoader;
} }
// VisibleForTesting public void installDiscoveredPlugins(PluginScanResult scanResult) {
PluginClassLoader newPluginClassLoader(
final URL pluginLocation,
final URL[] urls,
final ClassLoader parent
) {
return AccessController.doPrivileged(
(PrivilegedAction<PluginClassLoader>) () -> new PluginClassLoader(pluginLocation, urls, parent)
);
}
public PluginScanResult initLoaders() {
List<PluginScanResult> results = new ArrayList<>();
for (Path pluginLocation : pluginLocations) {
try {
results.add(registerPlugin(pluginLocation));
} catch (InvalidPathException | MalformedURLException e) {
log.error("Invalid path in plugin path: {}. Ignoring.", pluginLocation, e);
} catch (IOException e) {
log.error("Could not get listing for plugin path: {}. Ignoring.", pluginLocation, e);
}
}
// Finally add parent/system loader.
results.add(scanUrlsAndAddPlugins(
getParent(),
ClasspathHelper.forJavaClassPath().toArray(new URL[0])
));
PluginScanResult scanResult = new PluginScanResult(results);
installDiscoveredPlugins(scanResult);
return scanResult;
}
private PluginScanResult registerPlugin(Path pluginLocation)
throws IOException {
log.info("Loading plugin from: {}", pluginLocation);
List<URL> pluginUrls = new ArrayList<>();
for (Path path : PluginUtils.pluginUrls(pluginLocation)) {
pluginUrls.add(path.toUri().toURL());
}
URL[] urls = pluginUrls.toArray(new URL[0]);
if (log.isDebugEnabled()) {
log.debug("Loading plugin urls: {}", Arrays.toString(urls));
}
PluginClassLoader loader = newPluginClassLoader(
pluginLocation.toUri().toURL(),
urls,
this
);
return scanUrlsAndAddPlugins(loader, urls);
}
private PluginScanResult scanUrlsAndAddPlugins(
ClassLoader loader,
URL[] urls
) {
PluginScanResult plugins = scanPluginPath(loader, urls);
log.info("Registered loader: {}", loader);
loadJdbcDrivers(loader);
return plugins;
}
private void loadJdbcDrivers(final ClassLoader loader) {
// Apply here what java.sql.DriverManager does to discover and register classes
// implementing the java.sql.Driver interface.
AccessController.doPrivileged(
(PrivilegedAction<Void>) () -> {
ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(
Driver.class,
loader
);
Iterator<Driver> driversIterator = loadedDrivers.iterator();
try {
while (driversIterator.hasNext()) {
Driver driver = driversIterator.next();
log.debug(
"Registered java.sql.Driver: {} to java.sql.DriverManager",
driver
);
}
} catch (Throwable t) {
log.debug(
"Ignoring java.sql.Driver classes listed in resources but not"
+ " present in class loader's classpath: ",
t
);
}
return null;
}
);
}
private PluginScanResult scanPluginPath(
ClassLoader loader,
URL[] urls
) {
ConfigurationBuilder builder = new ConfigurationBuilder();
builder.setClassLoaders(new ClassLoader[]{loader});
builder.addUrls(urls);
builder.setScanners(new SubTypesScanner());
builder.useParallelExecutor();
Reflections reflections = new InternalReflections(builder);
return new PluginScanResult(
getPluginDesc(reflections, SinkConnector.class, loader),
getPluginDesc(reflections, SourceConnector.class, loader),
getPluginDesc(reflections, Converter.class, loader),
getPluginDesc(reflections, HeaderConverter.class, loader),
getTransformationPluginDesc(loader, reflections),
getPredicatePluginDesc(loader, reflections),
getServiceLoaderPluginDesc(ConfigProvider.class, loader),
getServiceLoaderPluginDesc(ConnectRestExtension.class, loader),
getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader)
);
}
@SuppressWarnings({"unchecked"})
private SortedSet<PluginDesc<Predicate<?>>> getPredicatePluginDesc(ClassLoader loader, Reflections reflections) {
return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) getPluginDesc(reflections, Predicate.class, loader);
}
@SuppressWarnings({"unchecked"})
private SortedSet<PluginDesc<Transformation<?>>> getTransformationPluginDesc(ClassLoader loader, Reflections reflections) {
return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) getPluginDesc(reflections, Transformation.class, loader);
}
private <T> SortedSet<PluginDesc<T>> getPluginDesc(
Reflections reflections,
Class<T> klass,
ClassLoader loader
) {
Set<Class<? extends T>> plugins;
try {
plugins = reflections.getSubTypesOf(klass);
} catch (ReflectionsException e) {
log.debug("Reflections scanner could not find any classes for URLs: " +
reflections.getConfiguration().getUrls(), e);
return Collections.emptySortedSet();
}
SortedSet<PluginDesc<T>> result = new TreeSet<>();
for (Class<? extends T> pluginKlass : plugins) {
if (!PluginUtils.isConcrete(pluginKlass)) {
log.debug("Skipping {} as it is not concrete implementation", pluginKlass);
continue;
}
if (pluginKlass.getClassLoader() != loader) {
log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading",
pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), loader);
continue;
}
try (LoaderSwap loaderSwap = withClassLoader(loader)) {
result.add(pluginDesc(pluginKlass, versionFor(pluginKlass), loader));
} catch (ReflectiveOperationException | LinkageError e) {
log.error("Failed to discover {}: Unable to instantiate {}{}", klass.getSimpleName(), pluginKlass.getSimpleName(), reflectiveErrorDescription(e), e);
}
}
return result;
}
@SuppressWarnings({"rawtypes", "unchecked"})
private <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, ClassLoader loader) {
return new PluginDesc(plugin, version, loader);
}
@SuppressWarnings("unchecked")
private <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) {
SortedSet<PluginDesc<T>> result = new TreeSet<>();
ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
for (Iterator<T> iterator = serviceLoader.iterator(); iterator.hasNext(); ) {
try (LoaderSwap loaderSwap = withClassLoader(loader)) {
T pluginImpl;
try {
pluginImpl = iterator.next();
} catch (ServiceConfigurationError t) {
log.error("Failed to discover {}{}", klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
continue;
}
Class<? extends T> pluginKlass = (Class<? extends T>) pluginImpl.getClass();
if (pluginKlass.getClassLoader() != loader) {
log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading",
pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), loader);
continue;
}
result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), loader));
}
}
return result;
}
private static <T> String versionFor(T pluginImpl) {
try {
if (pluginImpl instanceof Versioned) {
return ((Versioned) pluginImpl).version();
}
} catch (Throwable t) {
log.error("Failed to get plugin version for " + pluginImpl.getClass(), t);
}
return UNDEFINED_VERSION;
}
public static <T> String versionFor(Class<? extends T> pluginKlass) throws ReflectiveOperationException {
// Unconditionally use the default constructor to create an instance to assert that
// the constructor exists and can complete successfully.
T pluginImpl = pluginKlass.getDeclaredConstructor().newInstance();
return versionFor(pluginImpl);
}
private static String reflectiveErrorDescription(Throwable t) {
if (t instanceof NoSuchMethodException) {
return ": Plugin class must have a no-args constructor, and cannot be a non-static inner class";
} else if (t instanceof SecurityException) {
return ": Security settings must allow reflective instantiation of plugin classes";
} else if (t instanceof IllegalAccessException) {
return ": Plugin class default constructor must be public";
} else if (t instanceof ExceptionInInitializerError) {
return ": Failed to statically initialize plugin class";
} else if (t instanceof InvocationTargetException) {
return ": Failed to invoke plugin constructor";
} else {
return "";
}
}
public LoaderSwap withClassLoader(ClassLoader loader) {
ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);
try {
return new LoaderSwap(savedLoader);
} catch (Throwable t) {
Plugins.compareAndSwapLoaders(savedLoader);
throw t;
}
}
private void installDiscoveredPlugins(PluginScanResult scanResult) {
pluginLoaders.putAll(computePluginLoaders(scanResult)); pluginLoaders.putAll(computePluginLoaders(scanResult));
for (String pluginClassName : pluginLoaders.keySet()) { for (String pluginClassName : pluginLoaders.keySet()) {
log.info("Added plugin '{}'", pluginClassName); log.info("Added plugin '{}'", pluginClassName);
@ -399,25 +129,4 @@ public class DelegatingClassLoader extends URLClassLoader {
.put(pluginDesc, pluginDesc.loader())); .put(pluginDesc, pluginDesc.loader()));
return pluginLoaders; return pluginLoaders;
} }
private static class InternalReflections extends Reflections {
public InternalReflections(Configuration configuration) {
super(configuration);
}
// When Reflections is used for parallel scans, it has a bug where it propagates ReflectionsException
// as RuntimeException. Override the scan behavior to emulate the singled-threaded logic.
@Override
protected void scan(URL url) {
try {
super.scan(url);
} catch (ReflectionsException e) {
Logger log = Reflections.log;
if (log != null && log.isWarnEnabled()) {
log.warn("could not create Vfs.Dir from url. ignoring the exception and continuing", e);
}
}
}
}
} }

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime.isolation;
import java.net.URL;
/**
* Factory for {@link PluginClassLoader} instances.
* Used for mocking classloader initialization in tests.
*/
public interface PluginClassLoaderFactory {
PluginClassLoader newPluginClassLoader(URL pluginLocation, URL[] urls, ClassLoader parent);
}

View File

@ -22,6 +22,7 @@ import org.apache.maven.artifact.versioning.DefaultArtifactVersion;
import java.util.Objects; import java.util.Objects;
public class PluginDesc<T> implements Comparable<PluginDesc<T>> { public class PluginDesc<T> implements Comparable<PluginDesc<T>> {
public static final String UNDEFINED_VERSION = "undefined";
private final Class<? extends T> klass; private final Class<? extends T> klass;
private final String name; private final String name;
private final String version; private final String version;

View File

@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime.isolation;
import org.apache.kafka.connect.components.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.sql.Driver;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
/**
* Superclass for plugin discovery implementations.
*
* <p>Callers of this class should use {@link #discoverPlugins(Set)} to discover plugins which are present in the
* passed-in {@link PluginSource} instances.
*
* <p>Implementors of this class should implement {@link #scanPlugins(PluginSource)}, in order to scan a single source.
* The returned {@link PluginScanResult} should contain only plugins which are loadable from the passed-in source.
* The superclass has some common functionality which is usable in subclasses, and handles merging multiple results.
*
* <p>Implementations of this class must be thread-safe, but may have side effects on the provided {@link ClassLoader}
* instances and plugin classes which may not be thread safe. This depends on the thread safety of the plugin
* implementations, due to the necessity of initializing and instantiate plugin classes to evaluate their versions.
*/
public abstract class PluginScanner {
private static final Logger log = LoggerFactory.getLogger(PluginScanner.class);
/**
* Entry point for plugin scanning. Discovers plugins present in any of the provided plugin sources.
* <p>See the implementation-specific documentation for the conditions for a plugin to appear in this result.
* @param sources to scan for contained plugins
* @return A {@link PluginScanResult} containing all plugins which this scanning implementation could discover.
*/
public PluginScanResult discoverPlugins(Set<PluginSource> sources) {
long startMs = System.currentTimeMillis();
List<PluginScanResult> results = new ArrayList<>();
for (PluginSource source : sources) {
results.add(scanUrlsAndAddPlugins(source));
}
long endMs = System.currentTimeMillis();
log.info("Scanning plugins with {} took {} ms", getClass().getSimpleName(), endMs - startMs);
return new PluginScanResult(results);
}
private PluginScanResult scanUrlsAndAddPlugins(PluginSource source) {
log.info("Loading plugin from: {}", source.location());
if (log.isDebugEnabled()) {
log.debug("Loading plugin urls: {}", Arrays.toString(source.urls()));
}
PluginScanResult plugins = scanPlugins(source);
log.info("Registered loader: {}", source.loader());
loadJdbcDrivers(source.loader());
return plugins;
}
/**
* Implementation-specific strategy for scanning a single {@link PluginSource}.
* @param source A single source to scan for plugins.
* @return A {@link PluginScanResult} containing all plugins which this scanning implementation could discover.
*/
protected abstract PluginScanResult scanPlugins(PluginSource source);
private void loadJdbcDrivers(final ClassLoader loader) {
// Apply here what java.sql.DriverManager does to discover and register classes
// implementing the java.sql.Driver interface.
AccessController.doPrivileged(
(PrivilegedAction<Void>) () -> {
ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(
Driver.class,
loader
);
Iterator<Driver> driversIterator = loadedDrivers.iterator();
try {
while (driversIterator.hasNext()) {
Driver driver = driversIterator.next();
log.debug(
"Registered java.sql.Driver: {} to java.sql.DriverManager",
driver
);
}
} catch (Throwable t) {
log.debug(
"Ignoring java.sql.Driver classes listed in resources but not"
+ " present in class loader's classpath: ",
t
);
}
return null;
}
);
}
@SuppressWarnings({"rawtypes", "unchecked"})
protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, ClassLoader loader) {
return new PluginDesc(plugin, version, loader);
}
@SuppressWarnings("unchecked")
protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) {
SortedSet<PluginDesc<T>> result = new TreeSet<>();
ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
for (Iterator<T> iterator = serviceLoader.iterator(); iterator.hasNext(); ) {
try (LoaderSwap loaderSwap = withClassLoader(loader)) {
T pluginImpl;
try {
pluginImpl = iterator.next();
} catch (ServiceConfigurationError t) {
log.error("Failed to discover {}{}", klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
continue;
}
Class<? extends T> pluginKlass = (Class<? extends T>) pluginImpl.getClass();
if (pluginKlass.getClassLoader() != loader) {
log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading",
pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), loader);
continue;
}
result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), loader));
}
}
return result;
}
protected static <T> String versionFor(T pluginImpl) {
try {
if (pluginImpl instanceof Versioned) {
return ((Versioned) pluginImpl).version();
}
} catch (Throwable t) {
log.error("Failed to get plugin version for " + pluginImpl.getClass(), t);
}
return PluginDesc.UNDEFINED_VERSION;
}
protected static String reflectiveErrorDescription(Throwable t) {
if (t instanceof NoSuchMethodException) {
return ": Plugin class must have a no-args constructor, and cannot be a non-static inner class";
} else if (t instanceof SecurityException) {
return ": Security settings must allow reflective instantiation of plugin classes";
} else if (t instanceof IllegalAccessException) {
return ": Plugin class default constructor must be public";
} else if (t instanceof ExceptionInInitializerError) {
return ": Failed to statically initialize plugin class";
} else if (t instanceof InvocationTargetException) {
return ": Failed to invoke plugin constructor";
} else {
return "";
}
}
protected LoaderSwap withClassLoader(ClassLoader loader) {
ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);
try {
return new LoaderSwap(savedLoader);
} catch (Throwable t) {
Plugins.compareAndSwapLoaders(savedLoader);
throw t;
}
}
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime.isolation;
import java.net.URL;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Objects;
public class PluginSource {
private final Path location;
private final ClassLoader loader;
private final URL[] urls;
public PluginSource(Path location, ClassLoader loader, URL[] urls) {
this.location = location;
this.loader = loader;
this.urls = urls;
}
public Path location() {
return location;
}
public ClassLoader loader() {
return loader;
}
public URL[] urls() {
return urls;
}
public boolean isolated() {
return loader instanceof PluginClassLoader;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PluginSource that = (PluginSource) o;
return Objects.equals(location, that.location) && loader.equals(that.loader) && Arrays.equals(urls, that.urls);
}
@Override
public int hashCode() {
int result = Objects.hash(location, loader);
result = 31 * result + Arrays.hashCode(urls);
return result;
}
}

View File

@ -16,11 +16,14 @@
*/ */
package org.apache.kafka.connect.runtime.isolation; package org.apache.kafka.connect.runtime.isolation;
import org.reflections.util.ClasspathHelper;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Modifier; import java.lang.reflect.Modifier;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.DirectoryStream; import java.nio.file.DirectoryStream;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.InvalidPathException; import java.nio.file.InvalidPathException;
@ -325,6 +328,34 @@ public class PluginUtils {
return Arrays.asList(archives.toArray(new Path[0])); return Arrays.asList(archives.toArray(new Path[0]));
} }
public static Set<PluginSource> pluginSources(List<Path> pluginLocations, ClassLoader classLoader, PluginClassLoaderFactory factory) {
Set<PluginSource> pluginSources = new HashSet<>();
for (Path pluginLocation : pluginLocations) {
try {
List<URL> pluginUrls = new ArrayList<>();
for (Path path : pluginUrls(pluginLocation)) {
pluginUrls.add(path.toUri().toURL());
}
URL[] urls = pluginUrls.toArray(new URL[0]);
PluginClassLoader loader = factory.newPluginClassLoader(
pluginLocation.toUri().toURL(),
urls,
classLoader
);
pluginSources.add(new PluginSource(pluginLocation, loader, urls));
} catch (InvalidPathException | MalformedURLException e) {
log.error("Invalid path in plugin path: {}. Ignoring.", pluginLocation, e);
} catch (IOException e) {
log.error("Could not get listing for plugin path: {}. Ignoring.", pluginLocation, e);
}
}
URL[] classpathUrls = ClasspathHelper.forJavaClassPath().toArray(new URL[0]);
pluginSources.add(new PluginSource(null, classLoader.getParent(), classpathUrls));
return pluginSources;
}
/** /**
* Return the simple class name of a plugin as {@code String}. * Return the simple class name of a plugin as {@code String}.
* *

View File

@ -38,8 +38,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.nio.file.Path; import java.nio.file.Path;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
@ -59,22 +57,22 @@ public class Plugins {
private final PluginScanResult scanResult; private final PluginScanResult scanResult;
public Plugins(Map<String, String> props) { public Plugins(Map<String, String> props) {
this(props, Plugins.class.getClassLoader()); this(props, Plugins.class.getClassLoader(), new ClassLoaderFactory());
} }
// VisibleForTesting // VisibleForTesting
Plugins(Map<String, String> props, ClassLoader parent) { Plugins(Map<String, String> props, ClassLoader parent, ClassLoaderFactory factory) {
String pluginPath = WorkerConfig.pluginPath(props); String pluginPath = WorkerConfig.pluginPath(props);
List<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath); List<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath);
delegatingLoader = newDelegatingClassLoader(pluginLocations, parent); delegatingLoader = factory.newDelegatingClassLoader(parent);
scanResult = delegatingLoader.initLoaders(); Set<PluginSource> pluginSources = PluginUtils.pluginSources(pluginLocations, delegatingLoader, factory);
scanResult = initLoaders(pluginSources);
} }
// VisibleForTesting private PluginScanResult initLoaders(Set<PluginSource> pluginSources) {
protected DelegatingClassLoader newDelegatingClassLoader(final List<Path> pluginLocations, ClassLoader parent) { PluginScanResult reflectiveScanResult = new ReflectionScanner().discoverPlugins(pluginSources);
return AccessController.doPrivileged( delegatingLoader.installDiscoveredPlugins(reflectiveScanResult);
(PrivilegedAction<DelegatingClassLoader>) () -> new DelegatingClassLoader(pluginLocations, parent) return reflectiveScanResult;
);
} }
private static <T> String pluginNames(Collection<PluginDesc<T>> plugins) { private static <T> String pluginNames(Collection<PluginDesc<T>> plugins) {

View File

@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime.isolation;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.reflections.Configuration;
import org.reflections.Reflections;
import org.reflections.ReflectionsException;
import org.reflections.scanners.SubTypesScanner;
import org.reflections.util.ConfigurationBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URL;
import java.util.Collections;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
/**
* A {@link PluginScanner} implementation which uses reflection and {@link ServiceLoader} to discover plugins.
* <p>This implements the legacy discovery strategy, which uses a combination of reflection and service loading in
* order to discover plugins. Specifically, a plugin appears in the scan result if all the following conditions are true:
* <ul>
* <li>The class is concrete</li>
* <li>The class is public</li>
* <li>The class has a no-args constructor</li>
* <li>The no-args constructor is public</li>
* <li>Static initialization of the class completes without throwing an exception</li>
* <li>The no-args constructor completes without throwing an exception</li>
* <li>One of the following is true:
* <ul>
* <li>Is a subclass of {@link SinkConnector}, {@link SourceConnector}, {@link Converter},
* {@link HeaderConverter}, {@link Transformation}, or {@link Predicate}</li>
* <li>Is a subclass of {@link ConfigProvider}, {@link ConnectRestExtension}, or
* {@link ConnectorClientConfigOverridePolicy}, and has a {@link ServiceLoader} compatible
* manifest file or module declaration</li>
* </ul>
* </li>
* </ul>
* <p>Note: This scanner has a runtime proportional to the number of overall classes in the passed-in
* {@link PluginSource} objects, which may be significant for plugins with large dependencies.
*/
public class ReflectionScanner extends PluginScanner {
private static final Logger log = LoggerFactory.getLogger(ReflectionScanner.class);
public static <T> String versionFor(Class<? extends T> pluginKlass) throws ReflectiveOperationException {
T pluginImpl = pluginKlass.getDeclaredConstructor().newInstance();
return versionFor(pluginImpl);
}
@Override
protected PluginScanResult scanPlugins(PluginSource source) {
ClassLoader loader = source.loader();
ConfigurationBuilder builder = new ConfigurationBuilder();
builder.setClassLoaders(new ClassLoader[]{loader});
builder.addUrls(source.urls());
builder.setScanners(new SubTypesScanner());
builder.useParallelExecutor();
Reflections reflections = new InternalReflections(builder);
return new PluginScanResult(
getPluginDesc(reflections, SinkConnector.class, loader),
getPluginDesc(reflections, SourceConnector.class, loader),
getPluginDesc(reflections, Converter.class, loader),
getPluginDesc(reflections, HeaderConverter.class, loader),
getTransformationPluginDesc(loader, reflections),
getPredicatePluginDesc(loader, reflections),
getServiceLoaderPluginDesc(ConfigProvider.class, loader),
getServiceLoaderPluginDesc(ConnectRestExtension.class, loader),
getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader)
);
}
@SuppressWarnings({"unchecked"})
private SortedSet<PluginDesc<Predicate<?>>> getPredicatePluginDesc(ClassLoader loader, Reflections reflections) {
return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) getPluginDesc(reflections, Predicate.class, loader);
}
@SuppressWarnings({"unchecked"})
private SortedSet<PluginDesc<Transformation<?>>> getTransformationPluginDesc(ClassLoader loader, Reflections reflections) {
return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) getPluginDesc(reflections, Transformation.class, loader);
}
private <T> SortedSet<PluginDesc<T>> getPluginDesc(
Reflections reflections,
Class<T> klass,
ClassLoader loader
) {
Set<Class<? extends T>> plugins;
try {
plugins = reflections.getSubTypesOf(klass);
} catch (ReflectionsException e) {
log.debug("Reflections scanner could not find any classes for URLs: " +
reflections.getConfiguration().getUrls(), e);
return Collections.emptySortedSet();
}
SortedSet<PluginDesc<T>> result = new TreeSet<>();
for (Class<? extends T> pluginKlass : plugins) {
if (!PluginUtils.isConcrete(pluginKlass)) {
log.debug("Skipping {} as it is not concrete implementation", pluginKlass);
continue;
}
if (pluginKlass.getClassLoader() != loader) {
log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading",
pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), loader);
continue;
}
try (LoaderSwap loaderSwap = withClassLoader(loader)) {
result.add(pluginDesc(pluginKlass, versionFor(pluginKlass), loader));
} catch (ReflectiveOperationException | LinkageError e) {
log.error("Failed to discover {}: Unable to instantiate {}{}", klass.getSimpleName(), pluginKlass.getSimpleName(), reflectiveErrorDescription(e), e);
}
}
return result;
}
private static class InternalReflections extends Reflections {
public InternalReflections(Configuration configuration) {
super(configuration);
}
// When Reflections is used for parallel scans, it has a bug where it propagates ReflectionsException
// as RuntimeException. Override the scan behavior to emulate the singled-threaded logic.
@Override
protected void scan(URL url) {
try {
super.scan(url);
} catch (ReflectionsException e) {
Logger log = Reflections.log;
if (log != null && log.isWarnEnabled()) {
log.warn("could not create Vfs.Dir from url. ignoring the exception and continuing", e);
}
}
}
}
}

View File

@ -19,7 +19,6 @@ package org.apache.kafka.connect.runtime.rest.entities;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
import org.apache.kafka.connect.runtime.isolation.PluginDesc; import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.PluginType; import org.apache.kafka.connect.runtime.isolation.PluginType;
@ -91,7 +90,7 @@ public class PluginInfo {
public static final class NoVersionFilter { public static final class NoVersionFilter {
// This method is used by Jackson to filter the version field for plugins that don't have a version // This method is used by Jackson to filter the version field for plugins that don't have a version
public boolean equals(Object obj) { public boolean equals(Object obj) {
return DelegatingClassLoader.UNDEFINED_VERSION.equals(obj); return PluginDesc.UNDEFINED_VERSION.equals(obj);
} }
// Dummy hashCode method to not fail compilation because of equals() method // Dummy hashCode method to not fail compilation because of equals() method

View File

@ -14,112 +14,101 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.kafka.connect.runtime.isolation; package org.apache.kafka.connect.runtime.isolation;
import org.junit.Rule; import org.apache.kafka.connect.sink.SinkConnector;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.nio.file.Files; import java.net.MalformedURLException;
import java.nio.file.Path; import java.net.URL;
import java.util.Collections; import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class DelegatingClassLoaderTest { public class DelegatingClassLoaderTest {
@Rule public PluginClassLoader parent;
public TemporaryFolder pluginDir = new TemporaryFolder(); public PluginClassLoader pluginLoader;
public DelegatingClassLoader classLoader;
public PluginDesc<SinkConnector> pluginDesc;
public PluginScanResult scanResult;
@Test // Arbitrary values, their contents is not meaningful.
public void testLoadingUnloadedPluginClass() { public static final String ARBITRARY = "arbitrary";
DelegatingClassLoader classLoader = new DelegatingClassLoader( public static final Class<?> ARBITRARY_CLASS = org.mockito.Mockito.class;
Collections.emptyList(), public static final URL ARBITRARY_URL;
DelegatingClassLoader.class.getClassLoader()
); static {
classLoader.initLoaders(); try {
for (String pluginClassName : TestPlugins.pluginClasses()) { ARBITRARY_URL = new URL("jar:file://" + ARBITRARY + "!/" + ARBITRARY);
assertThrows(ClassNotFoundException.class, () -> classLoader.loadClass(pluginClassName)); } catch (MalformedURLException e) {
throw new RuntimeException(e);
} }
} }
@Test @Before
public void testLoadingPluginClass() throws ClassNotFoundException { @SuppressWarnings({"unchecked"})
DelegatingClassLoader classLoader = new DelegatingClassLoader( public void setUp() {
TestPlugins.pluginPath(), parent = mock(PluginClassLoader.class);
DelegatingClassLoader.class.getClassLoader() pluginLoader = mock(PluginClassLoader.class);
classLoader = new DelegatingClassLoader(parent);
SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>();
// Lie to the DCL that this arbitrary class is a connector, since all real connector classes we have access to
// are forced to be non-isolated by PluginUtils.shouldLoadInIsolation.
pluginDesc = new PluginDesc<>((Class<? extends SinkConnector>) ARBITRARY_CLASS, null, pluginLoader);
assertTrue(PluginUtils.shouldLoadInIsolation(pluginDesc.className()));
sinkConnectors.add(pluginDesc);
scanResult = new PluginScanResult(
sinkConnectors,
Collections.emptySortedSet(),
Collections.emptySortedSet(),
Collections.emptySortedSet(),
Collections.emptySortedSet(),
Collections.emptySortedSet(),
Collections.emptySortedSet(),
Collections.emptySortedSet(),
Collections.emptySortedSet()
); );
classLoader.initLoaders();
for (String pluginClassName : TestPlugins.pluginClasses()) {
assertNotNull(classLoader.loadClass(pluginClassName));
assertNotNull(classLoader.pluginClassLoader(pluginClassName));
}
} }
@Test @Test
public void testLoadingInvalidUberJar() throws Exception { public void testEmptyConnectorLoader() {
pluginDir.newFile("invalid.jar"); assertSame(classLoader, classLoader.connectorLoader(ARBITRARY));
DelegatingClassLoader classLoader = new DelegatingClassLoader(
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()),
DelegatingClassLoader.class.getClassLoader()
);
classLoader.initLoaders();
} }
@Test @Test
public void testLoadingPluginDirContainsInvalidJarsOnly() throws Exception { @SuppressWarnings({"unchecked", "rawtypes"})
pluginDir.newFolder("my-plugin"); public void testEmptyLoadClass() throws ClassNotFoundException {
pluginDir.newFile("my-plugin/invalid.jar"); when(parent.loadClass(ARBITRARY, false)).thenReturn((Class) ARBITRARY_CLASS);
assertSame(ARBITRARY_CLASS, classLoader.loadClass(ARBITRARY, false));
DelegatingClassLoader classLoader = new DelegatingClassLoader(
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()),
DelegatingClassLoader.class.getClassLoader()
);
classLoader.initLoaders();
} }
@Test @Test
public void testLoadingNoPlugins() { public void testEmptyGetResource() {
DelegatingClassLoader classLoader = new DelegatingClassLoader( when(parent.getResource(ARBITRARY)).thenReturn(ARBITRARY_URL);
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()), assertSame(ARBITRARY_URL, classLoader.getResource(ARBITRARY));
DelegatingClassLoader.class.getClassLoader()
);
classLoader.initLoaders();
} }
@Test @Test
public void testLoadingPluginDirEmpty() throws Exception { public void testInitializedConnectorLoader() {
pluginDir.newFolder("my-plugin"); classLoader.installDiscoveredPlugins(scanResult);
assertSame(pluginLoader, classLoader.connectorLoader(PluginUtils.prunedName(pluginDesc)));
DelegatingClassLoader classLoader = new DelegatingClassLoader( assertSame(pluginLoader, classLoader.connectorLoader(PluginUtils.simpleName(pluginDesc)));
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()), assertSame(pluginLoader, classLoader.connectorLoader(pluginDesc.className()));
DelegatingClassLoader.class.getClassLoader()
);
classLoader.initLoaders();
} }
@Test @Test
public void testLoadingMixOfValidAndInvalidPlugins() throws Exception { @SuppressWarnings({"unchecked", "rawtypes"})
pluginDir.newFile("invalid.jar"); public void testInitializedLoadClass() throws ClassNotFoundException {
pluginDir.newFolder("my-plugin"); classLoader.installDiscoveredPlugins(scanResult);
pluginDir.newFile("my-plugin/invalid.jar"); String className = pluginDesc.className();
Path pluginPath = this.pluginDir.getRoot().toPath(); when(pluginLoader.loadClass(className, false)).thenReturn((Class) ARBITRARY_CLASS);
assertSame(ARBITRARY_CLASS, classLoader.loadClass(className, false));
for (Path source : TestPlugins.pluginPath()) {
Files.copy(source, pluginPath.resolve(source.getFileName()));
}
DelegatingClassLoader classLoader = new DelegatingClassLoader(
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()),
DelegatingClassLoader.class.getClassLoader()
);
classLoader.initLoaders();
for (String pluginClassName : TestPlugins.pluginClasses()) {
assertNotNull(classLoader.loadClass(pluginClassName));
assertNotNull(classLoader.pluginClassLoader(pluginClassName));
}
} }
} }

View File

@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime.isolation;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
public class PluginScannerTest {
@Rule
public TemporaryFolder pluginDir = new TemporaryFolder();
@Test
public void testLoadingUnloadedPluginClass() {
DelegatingClassLoader classLoader = initClassLoader(
Collections.emptyList()
);
for (String pluginClassName : TestPlugins.pluginClasses()) {
assertThrows(ClassNotFoundException.class, () -> classLoader.loadClass(pluginClassName));
}
}
@Test
public void testLoadingPluginClass() throws ClassNotFoundException {
DelegatingClassLoader classLoader = initClassLoader(
TestPlugins.pluginPath()
);
for (String pluginClassName : TestPlugins.pluginClasses()) {
assertNotNull(classLoader.loadClass(pluginClassName));
assertNotNull(classLoader.pluginClassLoader(pluginClassName));
}
}
@Test
public void testLoadingInvalidUberJar() throws Exception {
pluginDir.newFile("invalid.jar");
initClassLoader(
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
);
}
@Test
public void testLoadingPluginDirContainsInvalidJarsOnly() throws Exception {
pluginDir.newFolder("my-plugin");
pluginDir.newFile("my-plugin/invalid.jar");
initClassLoader(
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
);
}
@Test
public void testLoadingNoPlugins() {
initClassLoader(
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
);
}
@Test
public void testLoadingPluginDirEmpty() throws Exception {
pluginDir.newFolder("my-plugin");
initClassLoader(
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
);
}
@Test
public void testLoadingMixOfValidAndInvalidPlugins() throws Exception {
pluginDir.newFile("invalid.jar");
pluginDir.newFolder("my-plugin");
pluginDir.newFile("my-plugin/invalid.jar");
Path pluginPath = this.pluginDir.getRoot().toPath();
for (Path source : TestPlugins.pluginPath()) {
Files.copy(source, pluginPath.resolve(source.getFileName()));
}
DelegatingClassLoader classLoader = initClassLoader(
Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath())
);
for (String pluginClassName : TestPlugins.pluginClasses()) {
assertNotNull(classLoader.loadClass(pluginClassName));
assertNotNull(classLoader.pluginClassLoader(pluginClassName));
}
}
private DelegatingClassLoader initClassLoader(List<Path> pluginLocations) {
ClassLoaderFactory factory = new ClassLoaderFactory();
DelegatingClassLoader classLoader = factory.newDelegatingClassLoader(DelegatingClassLoader.class.getClassLoader());
Set<PluginSource> pluginSources = PluginUtils.pluginSources(pluginLocations, classLoader, factory);
PluginScanResult scanResult = new ReflectionScanner().discoverPlugins(pluginSources);
classLoader.installDiscoveredPlugins(scanResult);
return classLoader;
}
}

View File

@ -498,7 +498,7 @@ public class PluginsTest {
WorkerConfig.PLUGIN_PATH_CONFIG, WorkerConfig.PLUGIN_PATH_CONFIG,
TestPlugins.pluginPathJoined(childResource) TestPlugins.pluginPathJoined(childResource)
); );
plugins = new Plugins(pluginProps, parent); plugins = new Plugins(pluginProps, parent, new ClassLoaderFactory());
Converter converter = plugins.newPlugin( Converter converter = plugins.newPlugin(
className, className,

View File

@ -24,12 +24,10 @@ import java.lang.management.ManagementFactory;
import java.lang.management.MonitorInfo; import java.lang.management.MonitorInfo;
import java.lang.management.ThreadInfo; import java.lang.management.ThreadInfo;
import java.net.URL; import java.net.URL;
import java.nio.file.Path;
import java.security.AccessController; import java.security.AccessController;
import java.security.PrivilegedAction; import java.security.PrivilegedAction;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.BrokenBarrierException;
@ -79,15 +77,7 @@ public class SynchronizationTest {
+ "." + testName.getMethodName() + "-"; + "." + testName.getMethodName() + "-";
dclBreakpoint = new Breakpoint<>(); dclBreakpoint = new Breakpoint<>();
pclBreakpoint = new Breakpoint<>(); pclBreakpoint = new Breakpoint<>();
plugins = new Plugins(pluginProps) { plugins = new Plugins(pluginProps, Plugins.class.getClassLoader(), new SynchronizedClassLoaderFactory());
@Override
protected DelegatingClassLoader newDelegatingClassLoader(List<Path> pluginLocations, ClassLoader parent) {
return AccessController.doPrivileged(
(PrivilegedAction<DelegatingClassLoader>) () ->
new SynchronizedDelegatingClassLoader(pluginLocations, parent)
);
}
};
exec = new ThreadPoolExecutor( exec = new ThreadPoolExecutor(
2, 2,
2, 2,
@ -167,25 +157,35 @@ public class SynchronizationTest {
} }
} }
private class SynchronizedClassLoaderFactory extends ClassLoaderFactory {
@Override
public DelegatingClassLoader newDelegatingClassLoader(ClassLoader parent) {
return AccessController.doPrivileged(
(PrivilegedAction<DelegatingClassLoader>) () ->
new SynchronizedDelegatingClassLoader(parent)
);
}
@Override
public PluginClassLoader newPluginClassLoader(
URL pluginLocation,
URL[] urls,
ClassLoader parent
) {
return AccessController.doPrivileged(
(PrivilegedAction<PluginClassLoader>) () ->
new SynchronizedPluginClassLoader(pluginLocation, urls, parent)
);
}
}
private class SynchronizedDelegatingClassLoader extends DelegatingClassLoader { private class SynchronizedDelegatingClassLoader extends DelegatingClassLoader {
{ {
ClassLoader.registerAsParallelCapable(); ClassLoader.registerAsParallelCapable();
} }
public SynchronizedDelegatingClassLoader(List<Path> pluginLocations, ClassLoader parent) { public SynchronizedDelegatingClassLoader(ClassLoader parent) {
super(pluginLocations, parent); super(parent);
}
@Override
protected PluginClassLoader newPluginClassLoader(
URL pluginLocation,
URL[] urls,
ClassLoader parent
) {
return AccessController.doPrivileged(
(PrivilegedAction<PluginClassLoader>) () ->
new SynchronizedPluginClassLoader(pluginLocation, urls, parent)
);
} }
@Override @Override

View File

@ -16,7 +16,7 @@
*/ */
package org.apache.kafka.connect.runtime.rest.entities; package org.apache.kafka.connect.runtime.rest.entities;
import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader; import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.junit.Test; import org.junit.Test;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
@ -32,6 +32,6 @@ public class PluginInfoTest {
assertFalse(filter.equals("1.0")); assertFalse(filter.equals("1.0"));
assertFalse(filter.equals(new Object())); assertFalse(filter.equals(new Object()));
assertFalse(filter.equals(null)); assertFalse(filter.equals(null));
assertTrue(filter.equals(DelegatingClassLoader.UNDEFINED_VERSION)); assertTrue(filter.equals(PluginDesc.UNDEFINED_VERSION));
} }
} }

View File

@ -33,11 +33,11 @@ import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.SampleSinkConnector; import org.apache.kafka.connect.runtime.SampleSinkConnector;
import org.apache.kafka.connect.runtime.SampleSourceConnector; import org.apache.kafka.connect.runtime.SampleSourceConnector;
import org.apache.kafka.connect.runtime.distributed.DistributedHerder; import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.isolation.PluginDesc; import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.PluginType; import org.apache.kafka.connect.runtime.isolation.PluginType;
import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.isolation.ReflectionScanner;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo; import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
@ -453,7 +453,7 @@ public class ConnectorPluginsResourceTest {
public MockConnectorPluginDesc(Class<T> klass) throws Exception { public MockConnectorPluginDesc(Class<T> klass) throws Exception {
super( super(
klass, klass,
DelegatingClassLoader.versionFor(klass), ReflectionScanner.versionFor(klass),
new MockPluginClassLoader(null, new URL[0]) new MockPluginClassLoader(null, new URL[0])
); );
} }