diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java index cedabe5a828..cf0a5a1ba7f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java @@ -18,7 +18,6 @@ package org.apache.kafka.connect.runtime.isolation; import org.apache.kafka.common.config.provider.ConfigProvider; import org.apache.kafka.connect.components.Versioned; -import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.rest.ConnectRestExtension; import org.apache.kafka.connect.sink.SinkConnector; @@ -48,10 +47,11 @@ import java.security.PrivilegedAction; import java.sql.Driver; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.ServiceConfigurationError; import java.util.ServiceLoader; import java.util.Set; @@ -79,15 +79,6 @@ public class DelegatingClassLoader extends URLClassLoader { private final ConcurrentMap, ClassLoader>> pluginLoaders; private final ConcurrentMap aliases; - private final SortedSet> sinkConnectors; - private final SortedSet> sourceConnectors; - private final SortedSet> converters; - private final SortedSet> headerConverters; - private final SortedSet>> transformations; - private final SortedSet>> predicates; - private final SortedSet> configProviders; - private final SortedSet> restExtensions; - private final SortedSet> connectorClientConfigPolicies; private final List pluginLocations; // Although this classloader does not load classes directly but rather delegates loading to a @@ -103,15 +94,6 @@ public class DelegatingClassLoader extends URLClassLoader { this.pluginLocations = pluginLocations; this.pluginLoaders = new ConcurrentHashMap<>(); this.aliases = new ConcurrentHashMap<>(); - this.sinkConnectors = new TreeSet<>(); - this.sourceConnectors = new TreeSet<>(); - this.converters = new TreeSet<>(); - this.headerConverters = new TreeSet<>(); - this.transformations = new TreeSet<>(); - this.predicates = new TreeSet<>(); - this.configProviders = new TreeSet<>(); - this.restExtensions = new TreeSet<>(); - this.connectorClientConfigPolicies = new TreeSet<>(); } public DelegatingClassLoader(List pluginLocations) { @@ -122,49 +104,6 @@ public class DelegatingClassLoader extends URLClassLoader { this(pluginLocations, DelegatingClassLoader.class.getClassLoader()); } - @SuppressWarnings({"unchecked", "rawtypes"}) - public Set> connectors() { - Set> connectors = new TreeSet<>((Set) sinkConnectors); - connectors.addAll((Set) sourceConnectors); - return connectors; - } - - public Set> sinkConnectors() { - return sinkConnectors; - } - - public Set> sourceConnectors() { - return sourceConnectors; - } - - public Set> converters() { - return converters; - } - - public Set> headerConverters() { - return headerConverters; - } - - public Set>> transformations() { - return transformations; - } - - public Set>> predicates() { - return predicates; - } - - public Set> configProviders() { - return configProviders; - } - - public Set> restExtensions() { - return restExtensions; - } - - public Set> connectorClientConfigPolicies() { - return connectorClientConfigPolicies; - } - /** * Retrieve the PluginClassLoader associated with a plugin class * @param name The fully qualified class name of the plugin @@ -208,24 +147,11 @@ public class DelegatingClassLoader extends URLClassLoader { ); } - private void addPlugins(Collection> plugins, ClassLoader loader) { - for (PluginDesc plugin : plugins) { - String pluginClassName = plugin.className(); - SortedMap, ClassLoader> inner = pluginLoaders.get(pluginClassName); - if (inner == null) { - inner = new TreeMap<>(); - pluginLoaders.put(pluginClassName, inner); - // TODO: once versioning is enabled this line should be moved outside this if branch - log.info("Added plugin '{}'", pluginClassName); - } - inner.put(plugin, loader); - } - } - - protected void initLoaders() { + public PluginScanResult initLoaders() { + List results = new ArrayList<>(); for (Path pluginLocation : pluginLocations) { try { - registerPlugin(pluginLocation); + results.add(registerPlugin(pluginLocation)); } catch (InvalidPathException | MalformedURLException e) { log.error("Invalid path in plugin path: {}. Ignoring.", pluginLocation, e); } catch (IOException e) { @@ -233,14 +159,16 @@ public class DelegatingClassLoader extends URLClassLoader { } } // Finally add parent/system loader. - scanUrlsAndAddPlugins( + results.add(scanUrlsAndAddPlugins( getParent(), ClasspathHelper.forJavaClassPath().toArray(new URL[0]) - ); - addAllAliases(); + )); + PluginScanResult scanResult = new PluginScanResult(results); + installDiscoveredPlugins(scanResult); + return scanResult; } - private void registerPlugin(Path pluginLocation) + private PluginScanResult registerPlugin(Path pluginLocation) throws IOException { log.info("Loading plugin from: {}", pluginLocation); List pluginUrls = new ArrayList<>(); @@ -256,37 +184,17 @@ public class DelegatingClassLoader extends URLClassLoader { urls, this ); - scanUrlsAndAddPlugins(loader, urls); + return scanUrlsAndAddPlugins(loader, urls); } - private void scanUrlsAndAddPlugins( + private PluginScanResult scanUrlsAndAddPlugins( ClassLoader loader, URL[] urls ) { PluginScanResult plugins = scanPluginPath(loader, urls); log.info("Registered loader: {}", loader); - if (!plugins.isEmpty()) { - addPlugins(plugins.sinkConnectors(), loader); - sinkConnectors.addAll(plugins.sinkConnectors()); - addPlugins(plugins.sourceConnectors(), loader); - sourceConnectors.addAll(plugins.sourceConnectors()); - addPlugins(plugins.converters(), loader); - converters.addAll(plugins.converters()); - addPlugins(plugins.headerConverters(), loader); - headerConverters.addAll(plugins.headerConverters()); - addPlugins(plugins.transformations(), loader); - transformations.addAll(plugins.transformations()); - addPlugins(plugins.predicates(), loader); - predicates.addAll(plugins.predicates()); - addPlugins(plugins.configProviders(), loader); - configProviders.addAll(plugins.configProviders()); - addPlugins(plugins.restExtensions(), loader); - restExtensions.addAll(plugins.restExtensions()); - addPlugins(plugins.connectorClientConfigPolicies(), loader); - connectorClientConfigPolicies.addAll(plugins.connectorClientConfigPolicies()); - } - loadJdbcDrivers(loader); + return plugins; } private void loadJdbcDrivers(final ClassLoader loader) { @@ -344,16 +252,16 @@ public class DelegatingClassLoader extends URLClassLoader { } @SuppressWarnings({"unchecked"}) - private Collection>> getPredicatePluginDesc(ClassLoader loader, Reflections reflections) { - return (Collection>>) (Collection) getPluginDesc(reflections, Predicate.class, loader); + private SortedSet>> getPredicatePluginDesc(ClassLoader loader, Reflections reflections) { + return (SortedSet>>) (SortedSet) getPluginDesc(reflections, Predicate.class, loader); } @SuppressWarnings({"unchecked"}) - private Collection>> getTransformationPluginDesc(ClassLoader loader, Reflections reflections) { - return (Collection>>) (Collection) getPluginDesc(reflections, Transformation.class, loader); + private SortedSet>> getTransformationPluginDesc(ClassLoader loader, Reflections reflections) { + return (SortedSet>>) (SortedSet) getPluginDesc(reflections, Transformation.class, loader); } - private Collection> getPluginDesc( + private SortedSet> getPluginDesc( Reflections reflections, Class klass, ClassLoader loader @@ -364,10 +272,10 @@ public class DelegatingClassLoader extends URLClassLoader { } catch (ReflectionsException e) { log.debug("Reflections scanner could not find any classes for URLs: " + reflections.getConfiguration().getUrls(), e); - return Collections.emptyList(); + return Collections.emptySortedSet(); } - Collection> result = new ArrayList<>(); + SortedSet> result = new TreeSet<>(); for (Class pluginKlass : plugins) { if (!PluginUtils.isConcrete(pluginKlass)) { log.debug("Skipping {} as it is not concrete implementation", pluginKlass); @@ -393,8 +301,8 @@ public class DelegatingClassLoader extends URLClassLoader { } @SuppressWarnings("unchecked") - private Collection> getServiceLoaderPluginDesc(Class klass, ClassLoader loader) { - Collection> result = new ArrayList<>(); + private SortedSet> getServiceLoaderPluginDesc(Class klass, ClassLoader loader) { + SortedSet> result = new TreeSet<>(); ServiceLoader serviceLoader = ServiceLoader.load(klass, loader); for (Iterator iterator = serviceLoader.iterator(); iterator.hasNext(); ) { try (LoaderSwap loaderSwap = withClassLoader(loader)) { @@ -461,6 +369,17 @@ public class DelegatingClassLoader extends URLClassLoader { } } + private void installDiscoveredPlugins(PluginScanResult scanResult) { + pluginLoaders.putAll(computePluginLoaders(scanResult)); + for (String pluginClassName : pluginLoaders.keySet()) { + log.info("Added plugin '{}'", pluginClassName); + } + aliases.putAll(PluginUtils.computeAliases(scanResult)); + for (Map.Entry alias : aliases.entrySet()) { + log.info("Added alias '{}' to plugin '{}'", alias.getKey(), alias.getValue()); + } + } + @Override protected Class loadClass(String name, boolean resolve) throws ClassNotFoundException { String fullName = aliases.getOrDefault(name, name); @@ -473,35 +392,12 @@ public class DelegatingClassLoader extends URLClassLoader { return super.loadClass(fullName, resolve); } - private void addAllAliases() { - addAliases(connectors()); - addAliases(converters); - addAliases(headerConverters); - addAliases(transformations); - addAliases(predicates); - addAliases(restExtensions); - addAliases(connectorClientConfigPolicies); - } - - private void addAliases(Collection> plugins) { - for (PluginDesc plugin : plugins) { - if (PluginUtils.isAliasUnique(plugin, plugins)) { - String simple = PluginUtils.simpleName(plugin); - String pruned = PluginUtils.prunedName(plugin); - aliases.put(simple, plugin.className()); - if (simple.equals(pruned)) { - log.info("Added alias '{}' to plugin '{}'", simple, plugin.className()); - } else { - aliases.put(pruned, plugin.className()); - log.info( - "Added aliases '{}' and '{}' to plugin '{}'", - simple, - pruned, - plugin.className() - ); - } - } - } + private static Map, ClassLoader>> computePluginLoaders(PluginScanResult plugins) { + Map, ClassLoader>> pluginLoaders = new HashMap<>(); + plugins.forEach(pluginDesc -> + pluginLoaders.computeIfAbsent(pluginDesc.className(), k -> new TreeMap<>()) + .put(pluginDesc, pluginDesc.loader())); + return pluginLoaders; } private static class InternalReflections extends Reflections { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java index 62a7d6cd65d..c2829c60273 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java @@ -29,6 +29,7 @@ public class PluginDesc implements Comparable> { private final PluginType type; private final String typeName; private final String location; + private final ClassLoader loader; public PluginDesc(Class klass, String version, ClassLoader loader) { this.klass = klass; @@ -40,6 +41,7 @@ public class PluginDesc implements Comparable> { this.location = loader instanceof PluginClassLoader ? ((PluginClassLoader) loader).location() : "classpath"; + this.loader = loader; } @Override @@ -83,6 +85,10 @@ public class PluginDesc implements Comparable> { return location; } + public ClassLoader loader() { + return loader; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -105,6 +111,9 @@ public class PluginDesc implements Comparable> { @Override public int compareTo(PluginDesc other) { int nameComp = name.compareTo(other.name); - return nameComp != 0 ? nameComp : encodedVersion.compareTo(other.encodedVersion); + int versionComp = encodedVersion.compareTo(other.encodedVersion); + // isolated plugins appear after classpath plugins when they have identical versions. + int isolatedComp = Boolean.compare(other.loader instanceof PluginClassLoader, loader instanceof PluginClassLoader); + return nameComp != 0 ? nameComp : (versionComp != 0 ? versionComp : isolatedComp); } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java index 565fe6dc67d..ae015ed3507 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java @@ -27,32 +27,35 @@ import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.predicates.Predicate; import java.util.Arrays; -import java.util.Collection; +import java.util.SortedSet; import java.util.List; +import java.util.TreeSet; +import java.util.function.Consumer; +import java.util.function.Function; public class PluginScanResult { - private final Collection> sinkConnectors; - private final Collection> sourceConnectors; - private final Collection> converters; - private final Collection> headerConverters; - private final Collection>> transformations; - private final Collection>> predicates; - private final Collection> configProviders; - private final Collection> restExtensions; - private final Collection> connectorClientConfigPolicies; + private final SortedSet> sinkConnectors; + private final SortedSet> sourceConnectors; + private final SortedSet> converters; + private final SortedSet> headerConverters; + private final SortedSet>> transformations; + private final SortedSet>> predicates; + private final SortedSet> configProviders; + private final SortedSet> restExtensions; + private final SortedSet> connectorClientConfigPolicies; - private final List> allPlugins; + private final List>> allPlugins; public PluginScanResult( - Collection> sinkConnectors, - Collection> sourceConnectors, - Collection> converters, - Collection> headerConverters, - Collection>> transformations, - Collection>> predicates, - Collection> configProviders, - Collection> restExtensions, - Collection> connectorClientConfigPolicies + SortedSet> sinkConnectors, + SortedSet> sourceConnectors, + SortedSet> converters, + SortedSet> headerConverters, + SortedSet>> transformations, + SortedSet>> predicates, + SortedSet> configProviders, + SortedSet> restExtensions, + SortedSet> connectorClientConfigPolicies ) { this.sinkConnectors = sinkConnectors; this.sourceConnectors = sourceConnectors; @@ -64,49 +67,78 @@ public class PluginScanResult { this.restExtensions = restExtensions; this.connectorClientConfigPolicies = connectorClientConfigPolicies; this.allPlugins = - Arrays.asList(sinkConnectors, sourceConnectors, converters, headerConverters, transformations, configProviders, - connectorClientConfigPolicies); + Arrays.asList(sinkConnectors, sourceConnectors, converters, headerConverters, transformations, predicates, + configProviders, restExtensions, connectorClientConfigPolicies); } - public Collection> sinkConnectors() { + /** + * Merge one or more {@link PluginScanResult results} into one result object + */ + public PluginScanResult(List results) { + this( + merge(results, PluginScanResult::sinkConnectors), + merge(results, PluginScanResult::sourceConnectors), + merge(results, PluginScanResult::converters), + merge(results, PluginScanResult::headerConverters), + merge(results, PluginScanResult::transformations), + merge(results, PluginScanResult::predicates), + merge(results, PluginScanResult::configProviders), + merge(results, PluginScanResult::restExtensions), + merge(results, PluginScanResult::connectorClientConfigPolicies) + ); + } + + private static > SortedSet merge(List results, Function> accessor) { + SortedSet merged = new TreeSet<>(); + for (PluginScanResult element : results) { + merged.addAll(accessor.apply(element)); + } + return merged; + } + + public SortedSet> sinkConnectors() { return sinkConnectors; } - public Collection> sourceConnectors() { + public SortedSet> sourceConnectors() { return sourceConnectors; } - public Collection> converters() { + public SortedSet> converters() { return converters; } - public Collection> headerConverters() { + public SortedSet> headerConverters() { return headerConverters; } - public Collection>> transformations() { + public SortedSet>> transformations() { return transformations; } - public Collection>> predicates() { + public SortedSet>> predicates() { return predicates; } - public Collection> configProviders() { + public SortedSet> configProviders() { return configProviders; } - public Collection> restExtensions() { + public SortedSet> restExtensions() { return restExtensions; } - public Collection> connectorClientConfigPolicies() { + public SortedSet> connectorClientConfigPolicies() { return connectorClientConfigPolicies; } + public void forEach(Consumer> consumer) { + allPlugins.forEach(plugins -> plugins.forEach(consumer)); + } + public boolean isEmpty() { boolean isEmpty = true; - for (Collection plugins : allPlugins) { + for (SortedSet plugins : allPlugins) { isEmpty = isEmpty && plugins.isEmpty(); } return isEmpty; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java index e7e0271d16b..3592e0ec96c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java @@ -28,13 +28,14 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Set; import java.util.TreeSet; import java.util.regex.Pattern; @@ -352,31 +353,6 @@ public class PluginUtils { } } - /** - * Verify whether a given plugin's alias matches another alias in a collection of plugins. - * - * @param alias the plugin descriptor to test for alias matching. - * @param plugins the collection of plugins to test against. - * @param the plugin type. - * @return false if a match was found in the collection, otherwise true. - */ - public static boolean isAliasUnique( - PluginDesc alias, - Collection> plugins - ) { - boolean matched = false; - for (PluginDesc plugin : plugins) { - if (simpleName(alias).equals(simpleName(plugin)) - || prunedName(alias).equals(prunedName(plugin))) { - if (matched) { - return false; - } - matched = true; - } - } - return true; - } - private static String prunePluginName(PluginDesc plugin, String suffix) { String simple = plugin.pluginClass().getSimpleName(); int pos = simple.lastIndexOf(suffix); @@ -386,6 +362,25 @@ public class PluginUtils { return simple; } + public static Map computeAliases(PluginScanResult scanResult) { + Map> aliasCollisions = new HashMap<>(); + scanResult.forEach(pluginDesc -> { + aliasCollisions.computeIfAbsent(simpleName(pluginDesc), ignored -> new HashSet<>()).add(pluginDesc.className()); + aliasCollisions.computeIfAbsent(prunedName(pluginDesc), ignored -> new HashSet<>()).add(pluginDesc.className()); + }); + Map aliases = new HashMap<>(); + for (Map.Entry> entry : aliasCollisions.entrySet()) { + String alias = entry.getKey(); + Set classNames = entry.getValue(); + if (classNames.size() == 1) { + aliases.put(alias, classNames.stream().findAny().get()); + } else { + log.warn("Ignoring ambiguous alias '{}' since it refers to multiple distinct plugins {}", alias, classNames); + } + } + return aliases; + } + private static class DirectoryEntry { final DirectoryStream stream; final Iterator iterator; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java index cc763ae8dbc..a43f0f226a2 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java @@ -45,6 +45,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeSet; public class Plugins { @@ -55,6 +56,7 @@ public class Plugins { private static final Logger log = LoggerFactory.getLogger(Plugins.class); private final DelegatingClassLoader delegatingLoader; + private final PluginScanResult scanResult; public Plugins(Map props) { this(props, Plugins.class.getClassLoader()); @@ -65,7 +67,7 @@ public class Plugins { String pluginPath = WorkerConfig.pluginPath(props); List pluginLocations = PluginUtils.pluginLocations(pluginPath); delegatingLoader = newDelegatingClassLoader(pluginLocations, parent); - delegatingLoader.initLoaders(); + scanResult = delegatingLoader.initLoaders(); } // VisibleForTesting @@ -194,32 +196,39 @@ public class Plugins { return delegatingLoader.connectorLoader(connectorClassOrAlias); } + @SuppressWarnings({"unchecked", "rawtypes"}) + public Set> connectors() { + Set> connectors = new TreeSet<>((Set) sinkConnectors()); + connectors.addAll((Set) sourceConnectors()); + return connectors; + } + public Set> sinkConnectors() { - return delegatingLoader.sinkConnectors(); + return scanResult.sinkConnectors(); } public Set> sourceConnectors() { - return delegatingLoader.sourceConnectors(); + return scanResult.sourceConnectors(); } public Set> converters() { - return delegatingLoader.converters(); + return scanResult.converters(); } public Set> headerConverters() { - return delegatingLoader.headerConverters(); + return scanResult.headerConverters(); } public Set>> transformations() { - return delegatingLoader.transformations(); + return scanResult.transformations(); } public Set>> predicates() { - return delegatingLoader.predicates(); + return scanResult.predicates(); } public Set> connectorClientConfigPolicies() { - return delegatingLoader.connectorClientConfigPolicies(); + return scanResult.connectorClientConfigPolicies(); } public Object newPlugin(String classOrAlias) throws ClassNotFoundException { @@ -242,7 +251,7 @@ public class Plugins { ); } catch (ClassNotFoundException e) { List> matches = new ArrayList<>(); - Set> connectors = delegatingLoader.connectors(); + Set> connectors = connectors(); for (PluginDesc plugin : connectors) { Class pluginClass = plugin.pluginClass(); String simpleName = pluginClass.getSimpleName(); @@ -300,7 +309,7 @@ public class Plugins { // Attempt to load first with the current classloader, and plugins as a fallback. // Note: we can't use config.getConfiguredInstance because Converter doesn't implement Configurable, and even if it did // we have to remove the property prefixes before calling config(...) and we still always want to call Converter.config. - klass = pluginClassFromConfig(config, classPropertyName, Converter.class, delegatingLoader.converters()); + klass = pluginClassFromConfig(config, classPropertyName, Converter.class, scanResult.converters()); break; case PLUGINS: // Attempt to load with the plugin class loader, which uses the current classloader as a fallback @@ -311,7 +320,7 @@ public class Plugins { throw new ConnectException( "Failed to find any class that implements Converter and which name matches " + converterClassOrAlias + ", available converters are: " - + pluginNames(delegatingLoader.converters()) + + pluginNames(scanResult.converters()) ); } break; @@ -383,7 +392,7 @@ public class Plugins { // Attempt to load first with the current classloader, and plugins as a fallback. // Note: we can't use config.getConfiguredInstance because we have to remove the property prefixes // before calling config(...) - klass = pluginClassFromConfig(config, classPropertyName, HeaderConverter.class, delegatingLoader.headerConverters()); + klass = pluginClassFromConfig(config, classPropertyName, HeaderConverter.class, scanResult.headerConverters()); break; case PLUGINS: // Attempt to load with the plugin class loader, which uses the current classloader as a fallback. @@ -400,7 +409,7 @@ public class Plugins { "Failed to find any class that implements HeaderConverter and which name matches " + converterClassOrAlias + ", available header converters are: " - + pluginNames(delegatingLoader.headerConverters()) + + pluginNames(scanResult.headerConverters()) ); } } @@ -432,7 +441,7 @@ public class Plugins { switch (classLoaderUsage) { case CURRENT_CLASSLOADER: // Attempt to load first with the current classloader, and plugins as a fallback. - klass = pluginClassFromConfig(config, classPropertyName, ConfigProvider.class, delegatingLoader.configProviders()); + klass = pluginClassFromConfig(config, classPropertyName, ConfigProvider.class, scanResult.configProviders()); break; case PLUGINS: // Attempt to load with the plugin class loader, which uses the current classloader as a fallback @@ -443,7 +452,7 @@ public class Plugins { throw new ConnectException( "Failed to find any class that implements ConfigProvider and which name matches " + configProviderClassOrAlias + ", available ConfigProviders are: " - + pluginNames(delegatingLoader.configProviders()) + + pluginNames(scanResult.configProviders()) ); } break; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java index 72a2493e7f0..affd7df268c 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.transforms.Transformation; +import org.apache.kafka.connect.transforms.predicates.Predicate; import org.junit.Before; import org.junit.Test; @@ -223,6 +224,20 @@ public class PluginDescTest { ); assertNewer(transformDescPluginPath, transformDescClasspath); + + PluginDesc predicateDescPluginPath = new PluginDesc<>( + Predicate.class, + regularVersion, + pluginLoader + ); + + PluginDesc predicateDescClasspath = new PluginDesc<>( + Predicate.class, + regularVersion, + systemLoader + ); + + assertNewer(predicateDescPluginPath, predicateDescClasspath); } private static void assertPluginDesc( diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java index 19766989fcf..3e4c219b0ab 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java @@ -16,6 +16,17 @@ */ package org.apache.kafka.connect.runtime.isolation; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.source.SourceConnector; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.HeaderConverter; +import org.apache.kafka.connect.tools.MockSinkConnector; +import org.apache.kafka.connect.tools.MockSourceConnector; +import org.apache.kafka.connect.transforms.Transformation; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -28,7 +39,11 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -490,6 +505,173 @@ public class PluginUtilsTest { assertUrls(expectedUrls, PluginUtils.pluginUrls(pluginPath)); } + @Test + public void testNonCollidingAliases() { + SortedSet> sinkConnectors = new TreeSet<>(); + sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, MockSinkConnector.class.getClassLoader())); + SortedSet> sourceConnectors = new TreeSet<>(); + sourceConnectors.add(new PluginDesc<>(MockSourceConnector.class, null, MockSourceConnector.class.getClassLoader())); + SortedSet> converters = new TreeSet<>(); + converters.add(new PluginDesc<>(CollidingConverter.class, null, CollidingConverter.class.getClassLoader())); + PluginScanResult result = new PluginScanResult( + sinkConnectors, + sourceConnectors, + converters, + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet() + ); + Map aliases = PluginUtils.computeAliases(result); + Map actualAliases = PluginUtils.computeAliases(result); + Map expectedAliases = new HashMap<>(); + expectedAliases.put("MockSinkConnector", MockSinkConnector.class.getName()); + expectedAliases.put("MockSink", MockSinkConnector.class.getName()); + expectedAliases.put("MockSourceConnector", MockSourceConnector.class.getName()); + expectedAliases.put("MockSource", MockSourceConnector.class.getName()); + expectedAliases.put("CollidingConverter", CollidingConverter.class.getName()); + expectedAliases.put("Colliding", CollidingConverter.class.getName()); + assertEquals(expectedAliases, actualAliases); + } + + @Test + public void testMultiVersionAlias() { + SortedSet> sinkConnectors = new TreeSet<>(); + // distinct versions don't cause an alias collision (the class name is the same) + sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, MockSinkConnector.class.getClassLoader())); + sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, "1.0", MockSinkConnector.class.getClassLoader())); + assertEquals(2, sinkConnectors.size()); + PluginScanResult result = new PluginScanResult( + sinkConnectors, + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet() + ); + Map actualAliases = PluginUtils.computeAliases(result); + Map expectedAliases = new HashMap<>(); + expectedAliases.put("MockSinkConnector", MockSinkConnector.class.getName()); + expectedAliases.put("MockSink", MockSinkConnector.class.getName()); + assertEquals(expectedAliases, actualAliases); + } + + @Test + public void testCollidingPrunedAlias() { + SortedSet> converters = new TreeSet<>(); + converters.add(new PluginDesc<>(CollidingConverter.class, null, CollidingConverter.class.getClassLoader())); + SortedSet> headerConverters = new TreeSet<>(); + headerConverters.add(new PluginDesc<>(CollidingHeaderConverter.class, null, CollidingHeaderConverter.class.getClassLoader())); + PluginScanResult result = new PluginScanResult( + Collections.emptySortedSet(), + Collections.emptySortedSet(), + converters, + headerConverters, + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet() + ); + Map actualAliases = PluginUtils.computeAliases(result); + Map expectedAliases = new HashMap<>(); + expectedAliases.put("CollidingConverter", CollidingConverter.class.getName()); + expectedAliases.put("CollidingHeaderConverter", CollidingHeaderConverter.class.getName()); + assertEquals(expectedAliases, actualAliases); + } + + @SuppressWarnings("unchecked") + @Test + public void testCollidingSimpleAlias() { + SortedSet> converters = new TreeSet<>(); + converters.add(new PluginDesc<>(CollidingConverter.class, null, CollidingConverter.class.getClassLoader())); + SortedSet>> transformations = new TreeSet<>(); + transformations.add(new PluginDesc<>((Class>) (Class) Colliding.class, null, Colliding.class.getClassLoader())); + PluginScanResult result = new PluginScanResult( + Collections.emptySortedSet(), + Collections.emptySortedSet(), + converters, + Collections.emptySortedSet(), + transformations, + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet(), + Collections.emptySortedSet() + ); + Map actualAliases = PluginUtils.computeAliases(result); + Map expectedAliases = new HashMap<>(); + expectedAliases.put("CollidingConverter", CollidingConverter.class.getName()); + assertEquals(expectedAliases, actualAliases); + } + + public static class CollidingConverter implements Converter { + @Override + public void configure(Map configs, boolean isKey) { + } + + @Override + public byte[] fromConnectData(String topic, Schema schema, Object value) { + return new byte[0]; + } + + @Override + public SchemaAndValue toConnectData(String topic, byte[] value) { + return null; + } + } + + public static class CollidingHeaderConverter implements HeaderConverter { + + @Override + public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) { + return null; + } + + @Override + public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) { + return new byte[0]; + } + + @Override + public ConfigDef config() { + return null; + } + + @Override + public void close() throws IOException { + } + + @Override + public void configure(Map configs) { + } + } + + public static class Colliding> implements Transformation { + + @Override + public void configure(Map configs) { + } + + @Override + public R apply(R record) { + return null; + } + + @Override + public ConfigDef config() { + return null; + } + + @Override + public void close() { + } + } + private void createBasicDirectoryLayout() throws IOException { Files.createDirectories(pluginPath.resolve("connectorA")); Files.createDirectories(pluginPath.resolve("connectorB/deps"));