MINOR: Refactor DelegatingClassLoader to emit immutable PluginScanResult (#13771)

Reviewers: Chris Egerton <chrise@aiven.io>
This commit is contained in:
Greg Harris 2023-06-06 06:37:57 -07:00 committed by GitHub
parent 9ebe395c57
commit c8cb85274e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 356 additions and 218 deletions

View File

@ -18,7 +18,6 @@ package org.apache.kafka.connect.runtime.isolation;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.sink.SinkConnector;
@ -48,10 +47,11 @@ import java.security.PrivilegedAction;
import java.sql.Driver;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;
import java.util.Set;
@ -79,15 +79,6 @@ public class DelegatingClassLoader extends URLClassLoader {
private final ConcurrentMap<String, SortedMap<PluginDesc<?>, ClassLoader>> pluginLoaders;
private final ConcurrentMap<String, String> aliases;
private final SortedSet<PluginDesc<SinkConnector>> sinkConnectors;
private final SortedSet<PluginDesc<SourceConnector>> sourceConnectors;
private final SortedSet<PluginDesc<Converter>> converters;
private final SortedSet<PluginDesc<HeaderConverter>> headerConverters;
private final SortedSet<PluginDesc<Transformation<?>>> transformations;
private final SortedSet<PluginDesc<Predicate<?>>> predicates;
private final SortedSet<PluginDesc<ConfigProvider>> configProviders;
private final SortedSet<PluginDesc<ConnectRestExtension>> restExtensions;
private final SortedSet<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies;
private final List<Path> pluginLocations;
// Although this classloader does not load classes directly but rather delegates loading to a
@ -103,15 +94,6 @@ public class DelegatingClassLoader extends URLClassLoader {
this.pluginLocations = pluginLocations;
this.pluginLoaders = new ConcurrentHashMap<>();
this.aliases = new ConcurrentHashMap<>();
this.sinkConnectors = new TreeSet<>();
this.sourceConnectors = new TreeSet<>();
this.converters = new TreeSet<>();
this.headerConverters = new TreeSet<>();
this.transformations = new TreeSet<>();
this.predicates = new TreeSet<>();
this.configProviders = new TreeSet<>();
this.restExtensions = new TreeSet<>();
this.connectorClientConfigPolicies = new TreeSet<>();
}
public DelegatingClassLoader(List<Path> pluginLocations) {
@ -122,49 +104,6 @@ public class DelegatingClassLoader extends URLClassLoader {
this(pluginLocations, DelegatingClassLoader.class.getClassLoader());
}
@SuppressWarnings({"unchecked", "rawtypes"})
public Set<PluginDesc<Connector>> connectors() {
Set<PluginDesc<Connector>> connectors = new TreeSet<>((Set) sinkConnectors);
connectors.addAll((Set) sourceConnectors);
return connectors;
}
public Set<PluginDesc<SinkConnector>> sinkConnectors() {
return sinkConnectors;
}
public Set<PluginDesc<SourceConnector>> sourceConnectors() {
return sourceConnectors;
}
public Set<PluginDesc<Converter>> converters() {
return converters;
}
public Set<PluginDesc<HeaderConverter>> headerConverters() {
return headerConverters;
}
public Set<PluginDesc<Transformation<?>>> transformations() {
return transformations;
}
public Set<PluginDesc<Predicate<?>>> predicates() {
return predicates;
}
public Set<PluginDesc<ConfigProvider>> configProviders() {
return configProviders;
}
public Set<PluginDesc<ConnectRestExtension>> restExtensions() {
return restExtensions;
}
public Set<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies() {
return connectorClientConfigPolicies;
}
/**
* Retrieve the PluginClassLoader associated with a plugin class
* @param name The fully qualified class name of the plugin
@ -208,24 +147,11 @@ public class DelegatingClassLoader extends URLClassLoader {
);
}
private <T> void addPlugins(Collection<PluginDesc<T>> plugins, ClassLoader loader) {
for (PluginDesc<T> plugin : plugins) {
String pluginClassName = plugin.className();
SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(pluginClassName);
if (inner == null) {
inner = new TreeMap<>();
pluginLoaders.put(pluginClassName, inner);
// TODO: once versioning is enabled this line should be moved outside this if branch
log.info("Added plugin '{}'", pluginClassName);
}
inner.put(plugin, loader);
}
}
protected void initLoaders() {
public PluginScanResult initLoaders() {
List<PluginScanResult> results = new ArrayList<>();
for (Path pluginLocation : pluginLocations) {
try {
registerPlugin(pluginLocation);
results.add(registerPlugin(pluginLocation));
} catch (InvalidPathException | MalformedURLException e) {
log.error("Invalid path in plugin path: {}. Ignoring.", pluginLocation, e);
} catch (IOException e) {
@ -233,14 +159,16 @@ public class DelegatingClassLoader extends URLClassLoader {
}
}
// Finally add parent/system loader.
scanUrlsAndAddPlugins(
results.add(scanUrlsAndAddPlugins(
getParent(),
ClasspathHelper.forJavaClassPath().toArray(new URL[0])
);
addAllAliases();
));
PluginScanResult scanResult = new PluginScanResult(results);
installDiscoveredPlugins(scanResult);
return scanResult;
}
private void registerPlugin(Path pluginLocation)
private PluginScanResult registerPlugin(Path pluginLocation)
throws IOException {
log.info("Loading plugin from: {}", pluginLocation);
List<URL> pluginUrls = new ArrayList<>();
@ -256,37 +184,17 @@ public class DelegatingClassLoader extends URLClassLoader {
urls,
this
);
scanUrlsAndAddPlugins(loader, urls);
return scanUrlsAndAddPlugins(loader, urls);
}
private void scanUrlsAndAddPlugins(
private PluginScanResult scanUrlsAndAddPlugins(
ClassLoader loader,
URL[] urls
) {
PluginScanResult plugins = scanPluginPath(loader, urls);
log.info("Registered loader: {}", loader);
if (!plugins.isEmpty()) {
addPlugins(plugins.sinkConnectors(), loader);
sinkConnectors.addAll(plugins.sinkConnectors());
addPlugins(plugins.sourceConnectors(), loader);
sourceConnectors.addAll(plugins.sourceConnectors());
addPlugins(plugins.converters(), loader);
converters.addAll(plugins.converters());
addPlugins(plugins.headerConverters(), loader);
headerConverters.addAll(plugins.headerConverters());
addPlugins(plugins.transformations(), loader);
transformations.addAll(plugins.transformations());
addPlugins(plugins.predicates(), loader);
predicates.addAll(plugins.predicates());
addPlugins(plugins.configProviders(), loader);
configProviders.addAll(plugins.configProviders());
addPlugins(plugins.restExtensions(), loader);
restExtensions.addAll(plugins.restExtensions());
addPlugins(plugins.connectorClientConfigPolicies(), loader);
connectorClientConfigPolicies.addAll(plugins.connectorClientConfigPolicies());
}
loadJdbcDrivers(loader);
return plugins;
}
private void loadJdbcDrivers(final ClassLoader loader) {
@ -344,16 +252,16 @@ public class DelegatingClassLoader extends URLClassLoader {
}
@SuppressWarnings({"unchecked"})
private Collection<PluginDesc<Predicate<?>>> getPredicatePluginDesc(ClassLoader loader, Reflections reflections) {
return (Collection<PluginDesc<Predicate<?>>>) (Collection<?>) getPluginDesc(reflections, Predicate.class, loader);
private SortedSet<PluginDesc<Predicate<?>>> getPredicatePluginDesc(ClassLoader loader, Reflections reflections) {
return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) getPluginDesc(reflections, Predicate.class, loader);
}
@SuppressWarnings({"unchecked"})
private Collection<PluginDesc<Transformation<?>>> getTransformationPluginDesc(ClassLoader loader, Reflections reflections) {
return (Collection<PluginDesc<Transformation<?>>>) (Collection<?>) getPluginDesc(reflections, Transformation.class, loader);
private SortedSet<PluginDesc<Transformation<?>>> getTransformationPluginDesc(ClassLoader loader, Reflections reflections) {
return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) getPluginDesc(reflections, Transformation.class, loader);
}
private <T> Collection<PluginDesc<T>> getPluginDesc(
private <T> SortedSet<PluginDesc<T>> getPluginDesc(
Reflections reflections,
Class<T> klass,
ClassLoader loader
@ -364,10 +272,10 @@ public class DelegatingClassLoader extends URLClassLoader {
} catch (ReflectionsException e) {
log.debug("Reflections scanner could not find any classes for URLs: " +
reflections.getConfiguration().getUrls(), e);
return Collections.emptyList();
return Collections.emptySortedSet();
}
Collection<PluginDesc<T>> result = new ArrayList<>();
SortedSet<PluginDesc<T>> result = new TreeSet<>();
for (Class<? extends T> pluginKlass : plugins) {
if (!PluginUtils.isConcrete(pluginKlass)) {
log.debug("Skipping {} as it is not concrete implementation", pluginKlass);
@ -393,8 +301,8 @@ public class DelegatingClassLoader extends URLClassLoader {
}
@SuppressWarnings("unchecked")
private <T> Collection<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) {
Collection<PluginDesc<T>> result = new ArrayList<>();
private <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) {
SortedSet<PluginDesc<T>> result = new TreeSet<>();
ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
for (Iterator<T> iterator = serviceLoader.iterator(); iterator.hasNext(); ) {
try (LoaderSwap loaderSwap = withClassLoader(loader)) {
@ -461,6 +369,17 @@ public class DelegatingClassLoader extends URLClassLoader {
}
}
private void installDiscoveredPlugins(PluginScanResult scanResult) {
pluginLoaders.putAll(computePluginLoaders(scanResult));
for (String pluginClassName : pluginLoaders.keySet()) {
log.info("Added plugin '{}'", pluginClassName);
}
aliases.putAll(PluginUtils.computeAliases(scanResult));
for (Map.Entry<String, String> alias : aliases.entrySet()) {
log.info("Added alias '{}' to plugin '{}'", alias.getKey(), alias.getValue());
}
}
@Override
protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
String fullName = aliases.getOrDefault(name, name);
@ -473,35 +392,12 @@ public class DelegatingClassLoader extends URLClassLoader {
return super.loadClass(fullName, resolve);
}
private void addAllAliases() {
addAliases(connectors());
addAliases(converters);
addAliases(headerConverters);
addAliases(transformations);
addAliases(predicates);
addAliases(restExtensions);
addAliases(connectorClientConfigPolicies);
}
private <S> void addAliases(Collection<PluginDesc<S>> plugins) {
for (PluginDesc<S> plugin : plugins) {
if (PluginUtils.isAliasUnique(plugin, plugins)) {
String simple = PluginUtils.simpleName(plugin);
String pruned = PluginUtils.prunedName(plugin);
aliases.put(simple, plugin.className());
if (simple.equals(pruned)) {
log.info("Added alias '{}' to plugin '{}'", simple, plugin.className());
} else {
aliases.put(pruned, plugin.className());
log.info(
"Added aliases '{}' and '{}' to plugin '{}'",
simple,
pruned,
plugin.className()
);
}
}
}
private static Map<String, SortedMap<PluginDesc<?>, ClassLoader>> computePluginLoaders(PluginScanResult plugins) {
Map<String, SortedMap<PluginDesc<?>, ClassLoader>> pluginLoaders = new HashMap<>();
plugins.forEach(pluginDesc ->
pluginLoaders.computeIfAbsent(pluginDesc.className(), k -> new TreeMap<>())
.put(pluginDesc, pluginDesc.loader()));
return pluginLoaders;
}
private static class InternalReflections extends Reflections {

View File

@ -29,6 +29,7 @@ public class PluginDesc<T> implements Comparable<PluginDesc<T>> {
private final PluginType type;
private final String typeName;
private final String location;
private final ClassLoader loader;
public PluginDesc(Class<? extends T> klass, String version, ClassLoader loader) {
this.klass = klass;
@ -40,6 +41,7 @@ public class PluginDesc<T> implements Comparable<PluginDesc<T>> {
this.location = loader instanceof PluginClassLoader
? ((PluginClassLoader) loader).location()
: "classpath";
this.loader = loader;
}
@Override
@ -83,6 +85,10 @@ public class PluginDesc<T> implements Comparable<PluginDesc<T>> {
return location;
}
public ClassLoader loader() {
return loader;
}
@Override
public boolean equals(Object o) {
if (this == o) {
@ -105,6 +111,9 @@ public class PluginDesc<T> implements Comparable<PluginDesc<T>> {
@Override
public int compareTo(PluginDesc<T> other) {
int nameComp = name.compareTo(other.name);
return nameComp != 0 ? nameComp : encodedVersion.compareTo(other.encodedVersion);
int versionComp = encodedVersion.compareTo(other.encodedVersion);
// isolated plugins appear after classpath plugins when they have identical versions.
int isolatedComp = Boolean.compare(other.loader instanceof PluginClassLoader, loader instanceof PluginClassLoader);
return nameComp != 0 ? nameComp : (versionComp != 0 ? versionComp : isolatedComp);
}
}

View File

@ -27,32 +27,35 @@ import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import java.util.Arrays;
import java.util.Collection;
import java.util.SortedSet;
import java.util.List;
import java.util.TreeSet;
import java.util.function.Consumer;
import java.util.function.Function;
public class PluginScanResult {
private final Collection<PluginDesc<SinkConnector>> sinkConnectors;
private final Collection<PluginDesc<SourceConnector>> sourceConnectors;
private final Collection<PluginDesc<Converter>> converters;
private final Collection<PluginDesc<HeaderConverter>> headerConverters;
private final Collection<PluginDesc<Transformation<?>>> transformations;
private final Collection<PluginDesc<Predicate<?>>> predicates;
private final Collection<PluginDesc<ConfigProvider>> configProviders;
private final Collection<PluginDesc<ConnectRestExtension>> restExtensions;
private final Collection<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies;
private final SortedSet<PluginDesc<SinkConnector>> sinkConnectors;
private final SortedSet<PluginDesc<SourceConnector>> sourceConnectors;
private final SortedSet<PluginDesc<Converter>> converters;
private final SortedSet<PluginDesc<HeaderConverter>> headerConverters;
private final SortedSet<PluginDesc<Transformation<?>>> transformations;
private final SortedSet<PluginDesc<Predicate<?>>> predicates;
private final SortedSet<PluginDesc<ConfigProvider>> configProviders;
private final SortedSet<PluginDesc<ConnectRestExtension>> restExtensions;
private final SortedSet<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies;
private final List<Collection<?>> allPlugins;
private final List<SortedSet<? extends PluginDesc<?>>> allPlugins;
public PluginScanResult(
Collection<PluginDesc<SinkConnector>> sinkConnectors,
Collection<PluginDesc<SourceConnector>> sourceConnectors,
Collection<PluginDesc<Converter>> converters,
Collection<PluginDesc<HeaderConverter>> headerConverters,
Collection<PluginDesc<Transformation<?>>> transformations,
Collection<PluginDesc<Predicate<?>>> predicates,
Collection<PluginDesc<ConfigProvider>> configProviders,
Collection<PluginDesc<ConnectRestExtension>> restExtensions,
Collection<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies
SortedSet<PluginDesc<SinkConnector>> sinkConnectors,
SortedSet<PluginDesc<SourceConnector>> sourceConnectors,
SortedSet<PluginDesc<Converter>> converters,
SortedSet<PluginDesc<HeaderConverter>> headerConverters,
SortedSet<PluginDesc<Transformation<?>>> transformations,
SortedSet<PluginDesc<Predicate<?>>> predicates,
SortedSet<PluginDesc<ConfigProvider>> configProviders,
SortedSet<PluginDesc<ConnectRestExtension>> restExtensions,
SortedSet<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies
) {
this.sinkConnectors = sinkConnectors;
this.sourceConnectors = sourceConnectors;
@ -64,49 +67,78 @@ public class PluginScanResult {
this.restExtensions = restExtensions;
this.connectorClientConfigPolicies = connectorClientConfigPolicies;
this.allPlugins =
Arrays.asList(sinkConnectors, sourceConnectors, converters, headerConverters, transformations, configProviders,
connectorClientConfigPolicies);
Arrays.asList(sinkConnectors, sourceConnectors, converters, headerConverters, transformations, predicates,
configProviders, restExtensions, connectorClientConfigPolicies);
}
public Collection<PluginDesc<SinkConnector>> sinkConnectors() {
/**
* Merge one or more {@link PluginScanResult results} into one result object
*/
public PluginScanResult(List<PluginScanResult> results) {
this(
merge(results, PluginScanResult::sinkConnectors),
merge(results, PluginScanResult::sourceConnectors),
merge(results, PluginScanResult::converters),
merge(results, PluginScanResult::headerConverters),
merge(results, PluginScanResult::transformations),
merge(results, PluginScanResult::predicates),
merge(results, PluginScanResult::configProviders),
merge(results, PluginScanResult::restExtensions),
merge(results, PluginScanResult::connectorClientConfigPolicies)
);
}
private static <R extends Comparable<R>> SortedSet<R> merge(List<PluginScanResult> results, Function<PluginScanResult, SortedSet<R>> accessor) {
SortedSet<R> merged = new TreeSet<>();
for (PluginScanResult element : results) {
merged.addAll(accessor.apply(element));
}
return merged;
}
public SortedSet<PluginDesc<SinkConnector>> sinkConnectors() {
return sinkConnectors;
}
public Collection<PluginDesc<SourceConnector>> sourceConnectors() {
public SortedSet<PluginDesc<SourceConnector>> sourceConnectors() {
return sourceConnectors;
}
public Collection<PluginDesc<Converter>> converters() {
public SortedSet<PluginDesc<Converter>> converters() {
return converters;
}
public Collection<PluginDesc<HeaderConverter>> headerConverters() {
public SortedSet<PluginDesc<HeaderConverter>> headerConverters() {
return headerConverters;
}
public Collection<PluginDesc<Transformation<?>>> transformations() {
public SortedSet<PluginDesc<Transformation<?>>> transformations() {
return transformations;
}
public Collection<PluginDesc<Predicate<?>>> predicates() {
public SortedSet<PluginDesc<Predicate<?>>> predicates() {
return predicates;
}
public Collection<PluginDesc<ConfigProvider>> configProviders() {
public SortedSet<PluginDesc<ConfigProvider>> configProviders() {
return configProviders;
}
public Collection<PluginDesc<ConnectRestExtension>> restExtensions() {
public SortedSet<PluginDesc<ConnectRestExtension>> restExtensions() {
return restExtensions;
}
public Collection<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies() {
public SortedSet<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies() {
return connectorClientConfigPolicies;
}
public void forEach(Consumer<PluginDesc<?>> consumer) {
allPlugins.forEach(plugins -> plugins.forEach(consumer));
}
public boolean isEmpty() {
boolean isEmpty = true;
for (Collection<?> plugins : allPlugins) {
for (SortedSet<?> plugins : allPlugins) {
isEmpty = isEmpty && plugins.isEmpty();
}
return isEmpty;

View File

@ -28,13 +28,14 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;
@ -352,31 +353,6 @@ public class PluginUtils {
}
}
/**
* Verify whether a given plugin's alias matches another alias in a collection of plugins.
*
* @param alias the plugin descriptor to test for alias matching.
* @param plugins the collection of plugins to test against.
* @param <U> the plugin type.
* @return false if a match was found in the collection, otherwise true.
*/
public static <U> boolean isAliasUnique(
PluginDesc<U> alias,
Collection<PluginDesc<U>> plugins
) {
boolean matched = false;
for (PluginDesc<U> plugin : plugins) {
if (simpleName(alias).equals(simpleName(plugin))
|| prunedName(alias).equals(prunedName(plugin))) {
if (matched) {
return false;
}
matched = true;
}
}
return true;
}
private static String prunePluginName(PluginDesc<?> plugin, String suffix) {
String simple = plugin.pluginClass().getSimpleName();
int pos = simple.lastIndexOf(suffix);
@ -386,6 +362,25 @@ public class PluginUtils {
return simple;
}
public static Map<String, String> computeAliases(PluginScanResult scanResult) {
Map<String, Set<String>> aliasCollisions = new HashMap<>();
scanResult.forEach(pluginDesc -> {
aliasCollisions.computeIfAbsent(simpleName(pluginDesc), ignored -> new HashSet<>()).add(pluginDesc.className());
aliasCollisions.computeIfAbsent(prunedName(pluginDesc), ignored -> new HashSet<>()).add(pluginDesc.className());
});
Map<String, String> aliases = new HashMap<>();
for (Map.Entry<String, Set<String>> entry : aliasCollisions.entrySet()) {
String alias = entry.getKey();
Set<String> classNames = entry.getValue();
if (classNames.size() == 1) {
aliases.put(alias, classNames.stream().findAny().get());
} else {
log.warn("Ignoring ambiguous alias '{}' since it refers to multiple distinct plugins {}", alias, classNames);
}
}
return aliases;
}
private static class DirectoryEntry {
final DirectoryStream<Path> stream;
final Iterator<Path> iterator;

View File

@ -45,6 +45,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
public class Plugins {
@ -55,6 +56,7 @@ public class Plugins {
private static final Logger log = LoggerFactory.getLogger(Plugins.class);
private final DelegatingClassLoader delegatingLoader;
private final PluginScanResult scanResult;
public Plugins(Map<String, String> props) {
this(props, Plugins.class.getClassLoader());
@ -65,7 +67,7 @@ public class Plugins {
String pluginPath = WorkerConfig.pluginPath(props);
List<Path> pluginLocations = PluginUtils.pluginLocations(pluginPath);
delegatingLoader = newDelegatingClassLoader(pluginLocations, parent);
delegatingLoader.initLoaders();
scanResult = delegatingLoader.initLoaders();
}
// VisibleForTesting
@ -194,32 +196,39 @@ public class Plugins {
return delegatingLoader.connectorLoader(connectorClassOrAlias);
}
@SuppressWarnings({"unchecked", "rawtypes"})
public Set<PluginDesc<Connector>> connectors() {
Set<PluginDesc<Connector>> connectors = new TreeSet<>((Set) sinkConnectors());
connectors.addAll((Set) sourceConnectors());
return connectors;
}
public Set<PluginDesc<SinkConnector>> sinkConnectors() {
return delegatingLoader.sinkConnectors();
return scanResult.sinkConnectors();
}
public Set<PluginDesc<SourceConnector>> sourceConnectors() {
return delegatingLoader.sourceConnectors();
return scanResult.sourceConnectors();
}
public Set<PluginDesc<Converter>> converters() {
return delegatingLoader.converters();
return scanResult.converters();
}
public Set<PluginDesc<HeaderConverter>> headerConverters() {
return delegatingLoader.headerConverters();
return scanResult.headerConverters();
}
public Set<PluginDesc<Transformation<?>>> transformations() {
return delegatingLoader.transformations();
return scanResult.transformations();
}
public Set<PluginDesc<Predicate<?>>> predicates() {
return delegatingLoader.predicates();
return scanResult.predicates();
}
public Set<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies() {
return delegatingLoader.connectorClientConfigPolicies();
return scanResult.connectorClientConfigPolicies();
}
public Object newPlugin(String classOrAlias) throws ClassNotFoundException {
@ -242,7 +251,7 @@ public class Plugins {
);
} catch (ClassNotFoundException e) {
List<PluginDesc<? extends Connector>> matches = new ArrayList<>();
Set<PluginDesc<Connector>> connectors = delegatingLoader.connectors();
Set<PluginDesc<Connector>> connectors = connectors();
for (PluginDesc<? extends Connector> plugin : connectors) {
Class<?> pluginClass = plugin.pluginClass();
String simpleName = pluginClass.getSimpleName();
@ -300,7 +309,7 @@ public class Plugins {
// Attempt to load first with the current classloader, and plugins as a fallback.
// Note: we can't use config.getConfiguredInstance because Converter doesn't implement Configurable, and even if it did
// we have to remove the property prefixes before calling config(...) and we still always want to call Converter.config.
klass = pluginClassFromConfig(config, classPropertyName, Converter.class, delegatingLoader.converters());
klass = pluginClassFromConfig(config, classPropertyName, Converter.class, scanResult.converters());
break;
case PLUGINS:
// Attempt to load with the plugin class loader, which uses the current classloader as a fallback
@ -311,7 +320,7 @@ public class Plugins {
throw new ConnectException(
"Failed to find any class that implements Converter and which name matches "
+ converterClassOrAlias + ", available converters are: "
+ pluginNames(delegatingLoader.converters())
+ pluginNames(scanResult.converters())
);
}
break;
@ -383,7 +392,7 @@ public class Plugins {
// Attempt to load first with the current classloader, and plugins as a fallback.
// Note: we can't use config.getConfiguredInstance because we have to remove the property prefixes
// before calling config(...)
klass = pluginClassFromConfig(config, classPropertyName, HeaderConverter.class, delegatingLoader.headerConverters());
klass = pluginClassFromConfig(config, classPropertyName, HeaderConverter.class, scanResult.headerConverters());
break;
case PLUGINS:
// Attempt to load with the plugin class loader, which uses the current classloader as a fallback.
@ -400,7 +409,7 @@ public class Plugins {
"Failed to find any class that implements HeaderConverter and which name matches "
+ converterClassOrAlias
+ ", available header converters are: "
+ pluginNames(delegatingLoader.headerConverters())
+ pluginNames(scanResult.headerConverters())
);
}
}
@ -432,7 +441,7 @@ public class Plugins {
switch (classLoaderUsage) {
case CURRENT_CLASSLOADER:
// Attempt to load first with the current classloader, and plugins as a fallback.
klass = pluginClassFromConfig(config, classPropertyName, ConfigProvider.class, delegatingLoader.configProviders());
klass = pluginClassFromConfig(config, classPropertyName, ConfigProvider.class, scanResult.configProviders());
break;
case PLUGINS:
// Attempt to load with the plugin class loader, which uses the current classloader as a fallback
@ -443,7 +452,7 @@ public class Plugins {
throw new ConnectException(
"Failed to find any class that implements ConfigProvider and which name matches "
+ configProviderClassOrAlias + ", available ConfigProviders are: "
+ pluginNames(delegatingLoader.configProviders())
+ pluginNames(scanResult.configProviders())
);
}
break;

View File

@ -22,6 +22,7 @@ import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.junit.Before;
import org.junit.Test;
@ -223,6 +224,20 @@ public class PluginDescTest {
);
assertNewer(transformDescPluginPath, transformDescClasspath);
PluginDesc<Predicate> predicateDescPluginPath = new PluginDesc<>(
Predicate.class,
regularVersion,
pluginLoader
);
PluginDesc<Predicate> predicateDescClasspath = new PluginDesc<>(
Predicate.class,
regularVersion,
systemLoader
);
assertNewer(predicateDescPluginPath, predicateDescClasspath);
}
private static <T> void assertPluginDesc(

View File

@ -16,6 +16,17 @@
*/
package org.apache.kafka.connect.runtime.isolation;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.tools.MockSinkConnector;
import org.apache.kafka.connect.tools.MockSourceConnector;
import org.apache.kafka.connect.transforms.Transformation;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@ -28,7 +39,11 @@ import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -490,6 +505,173 @@ public class PluginUtilsTest {
assertUrls(expectedUrls, PluginUtils.pluginUrls(pluginPath));
}
@Test
public void testNonCollidingAliases() {
SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>();
sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, MockSinkConnector.class.getClassLoader()));
SortedSet<PluginDesc<SourceConnector>> sourceConnectors = new TreeSet<>();
sourceConnectors.add(new PluginDesc<>(MockSourceConnector.class, null, MockSourceConnector.class.getClassLoader()));
SortedSet<PluginDesc<Converter>> converters = new TreeSet<>();
converters.add(new PluginDesc<>(CollidingConverter.class, null, CollidingConverter.class.getClassLoader()));
PluginScanResult result = new PluginScanResult(
sinkConnectors,
sourceConnectors,
converters,
Collections.emptySortedSet(),
Collections.emptySortedSet(),
Collections.emptySortedSet(),
Collections.emptySortedSet(),
Collections.emptySortedSet(),
Collections.emptySortedSet()
);
Map<String, String> aliases = PluginUtils.computeAliases(result);
Map<String, String> actualAliases = PluginUtils.computeAliases(result);
Map<String, String> expectedAliases = new HashMap<>();
expectedAliases.put("MockSinkConnector", MockSinkConnector.class.getName());
expectedAliases.put("MockSink", MockSinkConnector.class.getName());
expectedAliases.put("MockSourceConnector", MockSourceConnector.class.getName());
expectedAliases.put("MockSource", MockSourceConnector.class.getName());
expectedAliases.put("CollidingConverter", CollidingConverter.class.getName());
expectedAliases.put("Colliding", CollidingConverter.class.getName());
assertEquals(expectedAliases, actualAliases);
}
@Test
public void testMultiVersionAlias() {
SortedSet<PluginDesc<SinkConnector>> sinkConnectors = new TreeSet<>();
// distinct versions don't cause an alias collision (the class name is the same)
sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, MockSinkConnector.class.getClassLoader()));
sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, "1.0", MockSinkConnector.class.getClassLoader()));
assertEquals(2, sinkConnectors.size());
PluginScanResult result = new PluginScanResult(
sinkConnectors,
Collections.emptySortedSet(),
Collections.emptySortedSet(),
Collections.emptySortedSet(),
Collections.emptySortedSet(),
Collections.emptySortedSet(),
Collections.emptySortedSet(),
Collections.emptySortedSet(),
Collections.emptySortedSet()
);
Map<String, String> actualAliases = PluginUtils.computeAliases(result);
Map<String, String> expectedAliases = new HashMap<>();
expectedAliases.put("MockSinkConnector", MockSinkConnector.class.getName());
expectedAliases.put("MockSink", MockSinkConnector.class.getName());
assertEquals(expectedAliases, actualAliases);
}
@Test
public void testCollidingPrunedAlias() {
SortedSet<PluginDesc<Converter>> converters = new TreeSet<>();
converters.add(new PluginDesc<>(CollidingConverter.class, null, CollidingConverter.class.getClassLoader()));
SortedSet<PluginDesc<HeaderConverter>> headerConverters = new TreeSet<>();
headerConverters.add(new PluginDesc<>(CollidingHeaderConverter.class, null, CollidingHeaderConverter.class.getClassLoader()));
PluginScanResult result = new PluginScanResult(
Collections.emptySortedSet(),
Collections.emptySortedSet(),
converters,
headerConverters,
Collections.emptySortedSet(),
Collections.emptySortedSet(),
Collections.emptySortedSet(),
Collections.emptySortedSet(),
Collections.emptySortedSet()
);
Map<String, String> actualAliases = PluginUtils.computeAliases(result);
Map<String, String> expectedAliases = new HashMap<>();
expectedAliases.put("CollidingConverter", CollidingConverter.class.getName());
expectedAliases.put("CollidingHeaderConverter", CollidingHeaderConverter.class.getName());
assertEquals(expectedAliases, actualAliases);
}
@SuppressWarnings("unchecked")
@Test
public void testCollidingSimpleAlias() {
SortedSet<PluginDesc<Converter>> converters = new TreeSet<>();
converters.add(new PluginDesc<>(CollidingConverter.class, null, CollidingConverter.class.getClassLoader()));
SortedSet<PluginDesc<Transformation<?>>> transformations = new TreeSet<>();
transformations.add(new PluginDesc<>((Class<? extends Transformation<?>>) (Class<?>) Colliding.class, null, Colliding.class.getClassLoader()));
PluginScanResult result = new PluginScanResult(
Collections.emptySortedSet(),
Collections.emptySortedSet(),
converters,
Collections.emptySortedSet(),
transformations,
Collections.emptySortedSet(),
Collections.emptySortedSet(),
Collections.emptySortedSet(),
Collections.emptySortedSet()
);
Map<String, String> actualAliases = PluginUtils.computeAliases(result);
Map<String, String> expectedAliases = new HashMap<>();
expectedAliases.put("CollidingConverter", CollidingConverter.class.getName());
assertEquals(expectedAliases, actualAliases);
}
public static class CollidingConverter implements Converter {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public byte[] fromConnectData(String topic, Schema schema, Object value) {
return new byte[0];
}
@Override
public SchemaAndValue toConnectData(String topic, byte[] value) {
return null;
}
}
public static class CollidingHeaderConverter implements HeaderConverter {
@Override
public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) {
return null;
}
@Override
public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) {
return new byte[0];
}
@Override
public ConfigDef config() {
return null;
}
@Override
public void close() throws IOException {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
public static class Colliding<R extends ConnectRecord<R>> implements Transformation<R> {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public R apply(R record) {
return null;
}
@Override
public ConfigDef config() {
return null;
}
@Override
public void close() {
}
}
private void createBasicDirectoryLayout() throws IOException {
Files.createDirectories(pluginPath.resolve("connectorA"));
Files.createDirectories(pluginPath.resolve("connectorB/deps"));