From a580826ae91a4ae61a87a1aa817ee81f1a34cda2 Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Tue, 22 Oct 2019 16:13:20 -0700 Subject: [PATCH] KAFKA-9057: Backport KAFKA-8819 and KAFKA-8340 to 1.0 (#7580) Includes fixes from PR-7315 (KAFKA-8819 and KAFKA-8340), but omits ConfigProvider and Configurable test cases and plugins, and replaces Java 8 language features with suitable Java 7 features. Also addressed the following merge conflicts from the `1.1` PR: * Remove test cases made incompatible by 1.1 Converter refactor * Adding PluginsTest file which did not exist before * Removing Converter::configure test case * Removing HeaderConverter test plugin and test cases * Removing Converter Types test cases * Removing classloader control test cases Signed-off-by: Greg Harris Author: Greg Harris Reviewer: Randall Hauch --- checkstyle/import-control.xml | 1 + .../apache/kafka/connect/runtime/Worker.java | 4 +- .../isolation/DelegatingClassLoader.java | 39 ++- .../connect/runtime/isolation/Plugins.java | 31 +- .../runtime/isolation/PluginsTest.java | 196 +++++++++++++ .../runtime/isolation/SamplingTestPlugin.java | 122 ++++++++ .../runtime/isolation/TestPlugins.java | 271 ++++++++++++++++++ .../test/plugins/AliasedStaticField.java | 75 +++++ .../test/plugins/AlwaysThrowException.java | 53 ++++ .../test/plugins/SamplingConverter.java | 76 +++++ .../services/test.plugins.ServiceLoadedClass | 16 ++ .../test/plugins/ServiceLoadedClass.java | 48 ++++ .../test/plugins/ServiceLoadedSubclass.java | 46 +++ .../test/plugins/ServiceLoaderPlugin.java | 85 ++++++ 14 files changed, 1046 insertions(+), 17 deletions(-) create mode 100644 connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java create mode 100644 connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SamplingTestPlugin.java create mode 100644 connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java create mode 100644 connect/runtime/src/test/resources/test-plugins/aliased-static-field/test/plugins/AliasedStaticField.java create mode 100644 connect/runtime/src/test/resources/test-plugins/always-throw-exception/test/plugins/AlwaysThrowException.java create mode 100644 connect/runtime/src/test/resources/test-plugins/sampling-converter/test/plugins/SamplingConverter.java create mode 100644 connect/runtime/src/test/resources/test-plugins/service-loader/META-INF/services/test.plugins.ServiceLoadedClass create mode 100644 connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoadedClass.java create mode 100644 connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoadedSubclass.java create mode 100644 connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoaderPlugin.java diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index ddb13bc4d09..76744cf154b 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -303,6 +303,7 @@ + diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index c6e2e173834..597cc53b046 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -366,10 +366,10 @@ public class Worker { final WorkerTask workerTask; ClassLoader savedLoader = plugins.currentThreadLoader(); try { - final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps); - String connType = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG); + String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); ClassLoader connectorLoader = plugins.delegatingLoader().connectorLoader(connType); savedLoader = Plugins.compareAndSwapLoaders(connectorLoader); + final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps); final TaskConfig taskConfig = new TaskConfig(taskProps); final Class taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class); final Task task = plugins.newTask(taskClass); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java index 11f2bd0f5a1..cceb468bd54 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java @@ -97,13 +97,32 @@ public class DelegatingClassLoader extends URLClassLoader { return connectorLoader(connector.getClass().getName()); } + /** + * Retrieve the PluginClassLoader associated with a plugin class + * @param name The fully qualified class name of the plugin + * @return the PluginClassLoader that should be used to load this, or null if the plugin is not isolated. + */ + public PluginClassLoader pluginClassLoader(String name) { + if (!PluginUtils.shouldLoadInIsolation(name)) { + return null; + } + SortedMap, ClassLoader> inner = pluginLoaders.get(name); + if (inner == null) { + return null; + } + ClassLoader pluginLoader = inner.get(inner.lastKey()); + return pluginLoader instanceof PluginClassLoader + ? (PluginClassLoader) pluginLoader + : null; + } + public ClassLoader connectorLoader(String connectorClassOrAlias) { log.debug("Getting plugin class loader for connector: '{}'", connectorClassOrAlias); String fullName = aliases.containsKey(connectorClassOrAlias) ? aliases.get(connectorClassOrAlias) : connectorClassOrAlias; - SortedMap, ClassLoader> inner = pluginLoaders.get(fullName); - if (inner == null) { + PluginClassLoader classLoader = pluginClassLoader(fullName); + if (classLoader == null) { log.error( "Plugin class loader for connector: '{}' was not found. Returning: {}", connectorClassOrAlias, @@ -111,7 +130,7 @@ public class DelegatingClassLoader extends URLClassLoader { ); return this; } - return inner.get(inner.lastKey()); + return classLoader; } private static PluginClassLoader newPluginClassLoader( @@ -302,19 +321,11 @@ public class DelegatingClassLoader extends URLClassLoader { @Override protected Class loadClass(String name, boolean resolve) throws ClassNotFoundException { - if (!PluginUtils.shouldLoadInIsolation(name)) { - // There are no paths in this classloader, will attempt to load with the parent. - return super.loadClass(name, resolve); - } - String fullName = aliases.containsKey(name) ? aliases.get(name) : name; - SortedMap, ClassLoader> inner = pluginLoaders.get(fullName); - if (inner != null) { - ClassLoader pluginLoader = inner.get(inner.lastKey()); + PluginClassLoader pluginLoader = pluginClassLoader(fullName); + if (pluginLoader != null) { log.trace("Retrieving loaded class '{}' from '{}'", fullName, pluginLoader); - return pluginLoader instanceof PluginClassLoader - ? ((PluginClassLoader) pluginLoader).loadClass(fullName, resolve) - : super.loadClass(fullName, resolve); + return pluginLoader.loadClass(fullName, resolve); } return super.loadClass(fullName, resolve); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java index f7a5553126f..5fa48c15ee1 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java @@ -63,13 +63,37 @@ public class Plugins { } protected static T newPlugin(Class klass) { + // KAFKA-8340: The thread classloader is used during static initialization and must be + // set to the plugin's classloader during instantiation + ClassLoader savedLoader = compareAndSwapLoaders(klass.getClassLoader()); try { return Utils.newInstance(klass); } catch (Throwable t) { throw new ConnectException("Instantiation error", t); + } finally { + compareAndSwapLoaders(savedLoader); } } + @SuppressWarnings("unchecked") + protected Class pluginClassFromConfig( + AbstractConfig config, + String propertyName, + Class pluginClass, + Collection> plugins + ) { + Class klass = config.getClass(propertyName); + if (pluginClass.isAssignableFrom(klass)) { + return (Class) klass; + } + throw new ConnectException( + "Failed to find any class that implements " + pluginClass.getSimpleName() + + " for the config " + + propertyName + ", available classes are: " + + pluginNames(plugins) + ); + } + protected static T newConfiguredPlugin(AbstractConfig config, Class klass) { T plugin = Utils.newInstance(klass); if (plugin instanceof Configurable) { @@ -204,7 +228,12 @@ public class Plugins { + pluginNames(delegatingLoader.converters()) ); } - return config != null ? newConfiguredPlugin(config, klass) : newPlugin(klass); + ClassLoader savedLoader = compareAndSwapLoaders(klass.getClassLoader()); + try { + return config != null ? newConfiguredPlugin(config, klass) : newPlugin(klass); + } finally { + compareAndSwapLoaders(savedLoader); + } } public > Transformation newTranformations( diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java new file mode 100644 index 00000000000..c2aa60b89ca --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.connect.runtime.isolation; + +import java.util.Map.Entry; +import org.apache.kafka.common.Configurable; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.storage.Converter; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +public class PluginsTest { + + private Plugins plugins; + private Map props; + private AbstractConfig config; + + @Before + public void setup() { + Map pluginProps = new HashMap<>(); + + // Set up the plugins with some test plugins to test isolation + pluginProps.put(WorkerConfig.PLUGIN_PATH_CONFIG, Utils.join(TestPlugins.pluginPath(), ",")); + plugins = new Plugins(pluginProps); + props = new HashMap<>(pluginProps); + props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, TestConverter.class.getName()); + props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, TestConverter.class.getName()); + props.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, TestConverter.class.getName()); + props.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, TestConverter.class.getName()); + + createConfig(); + } + + protected void createConfig() { + this.config = new TestableWorkerConfig(props); + } + + @Test(expected = ExceptionInInitializerError.class) + public void shouldThrowIfPluginThrows() { + TestPlugins.assertAvailable(); + props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, TestPlugins.ALWAYS_THROW_EXCEPTION); + ClassLoader classLoader = plugins.delegatingLoader().pluginClassLoader(TestPlugins.ALWAYS_THROW_EXCEPTION); + ClassLoader savedLoader = Plugins.compareAndSwapLoaders(classLoader); + try { + createConfig(); + } finally { + Plugins.compareAndSwapLoaders(savedLoader); + } + } + + @Test + public void shouldShareStaticValuesBetweenSamePlugin() { + // Plugins are not isolated from other instances of their own class. + TestPlugins.assertAvailable(); + + Converter firstPlugin = plugins.newConverter( + TestPlugins.ALIASED_STATIC_FIELD, + config + ); + + assertInstanceOf(SamplingTestPlugin.class, firstPlugin, "Cannot collect samples"); + + Converter secondPlugin = plugins.newConverter( + TestPlugins.ALIASED_STATIC_FIELD, + config + ); + + assertInstanceOf(SamplingTestPlugin.class, secondPlugin, "Cannot collect samples"); + assertSame( + ((SamplingTestPlugin) firstPlugin).otherSamples(), + ((SamplingTestPlugin) secondPlugin).otherSamples() + ); + } + + @Test + public void newPluginShouldServiceLoadWithPluginClassLoader() { + TestPlugins.assertAvailable(); + Converter plugin = plugins.newConverter( + TestPlugins.SERVICE_LOADER, + config + ); + + assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples"); + Map samples = ((SamplingTestPlugin) plugin).flatten(); + // Assert that the service loaded subclass is found in both environments + assertTrue(samples.containsKey("ServiceLoadedSubclass.static")); + assertTrue(samples.containsKey("ServiceLoadedSubclass.dynamic")); + assertPluginClassLoaderAlwaysActive(samples); + } + + @Test + public void newPluginShouldInstantiateWithPluginClassLoader() { + TestPlugins.assertAvailable(); + Converter plugin = plugins.newConverter( + TestPlugins.SERVICE_LOADER, + config + ); + + assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples"); + Map samples = ((SamplingTestPlugin) plugin).flatten(); + assertPluginClassLoaderAlwaysActive(samples); + } + + @Test(expected = ConfigException.class) + public void shouldFailToFindConverterInCurrentClassloader() { + TestPlugins.assertAvailable(); + props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, TestPlugins.SAMPLING_CONVERTER); + createConfig(); + } + + public static void assertPluginClassLoaderAlwaysActive(Map samples) { + for (Entry e : samples.entrySet()) { + String sampleName = "\"" + e.getKey() + "\" (" + e.getValue() + ")"; + assertInstanceOf( + PluginClassLoader.class, + e.getValue().staticClassloader(), + sampleName + " has incorrect static classloader" + ); + assertInstanceOf( + PluginClassLoader.class, + e.getValue().classloader(), + sampleName + " has incorrect dynamic classloader" + ); + } + } + + public static void assertInstanceOf(Class expected, Object actual, String message) { + assertTrue( + "Expected an instance of " + expected.getSimpleName() + ", found " + actual + " instead: " + message, + expected.isInstance(actual) + ); + } + + public static class TestableWorkerConfig extends WorkerConfig { + public TestableWorkerConfig(Map props) { + super(WorkerConfig.baseConfigDef(), props); + } + } + + public static class TestConverter implements Converter, Configurable { + public Map configs; + + public ConfigDef config() { + return null; + } + + @Override + public void configure(Map configs) { + this.configs = configs; + new JsonConverter().configure(configs, true); // requires the `converter.type` config be set + } + + @Override + public void configure(Map configs, boolean isKey) { + this.configs = configs; + } + + @Override + public byte[] fromConnectData(String topic, Schema schema, Object value) { + return new byte[0]; + } + + @Override + public SchemaAndValue toConnectData(String topic, byte[] value) { + return null; + } + } +} \ No newline at end of file diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SamplingTestPlugin.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SamplingTestPlugin.java new file mode 100644 index 00000000000..bcf8881898e --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SamplingTestPlugin.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.connect.runtime.isolation; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +/** + * Base class for plugins so we can sample information about their initialization + */ +public abstract class SamplingTestPlugin { + + /** + * @return the ClassLoader used to statically initialize this plugin class + */ + public abstract ClassLoader staticClassloader(); + + /** + * @return the ClassLoader used to initialize this plugin instance + */ + public abstract ClassLoader classloader(); + + /** + * @return a group of other SamplingTestPlugin instances known by this plugin + * This should only return direct children, and not reference this instance directly + */ + public Map otherSamples() { + return Collections.emptyMap(); + } + + /** + * @return a flattened list of child samples including this entry keyed as "this" + */ + public Map flatten() { + Map out = new HashMap<>(); + Map otherSamples = otherSamples(); + if (otherSamples != null) { + for (Entry child : otherSamples.entrySet()) { + for (Entry flattened : child.getValue().flatten().entrySet()) { + String key = child.getKey(); + if (flattened.getKey().length() > 0) { + key += "." + flattened.getKey(); + } + out.put(key, flattened.getValue()); + } + } + } + out.put("", this); + return out; + } + + /** + * Log the parent method call as a child sample. + * Stores only the last invocation of each method if there are multiple invocations. + * @param samples The collection of samples to which this method call should be added + */ + public void logMethodCall(Map samples) { + StackTraceElement[] stackTraces = Thread.currentThread().getStackTrace(); + if (stackTraces.length < 2) { + return; + } + // 0 is inside getStackTrace + // 1 is this method + // 2 is our caller method + StackTraceElement caller = stackTraces[2]; + + samples.put(caller.getMethodName(), new MethodCallSample( + caller, + Thread.currentThread().getContextClassLoader(), + getClass().getClassLoader() + )); + } + + public static class MethodCallSample extends SamplingTestPlugin { + + private final StackTraceElement caller; + private final ClassLoader staticClassLoader; + private final ClassLoader dynamicClassLoader; + + public MethodCallSample( + StackTraceElement caller, + ClassLoader staticClassLoader, + ClassLoader dynamicClassLoader + ) { + this.caller = caller; + this.staticClassLoader = staticClassLoader; + this.dynamicClassLoader = dynamicClassLoader; + } + + @Override + public ClassLoader staticClassloader() { + return staticClassLoader; + } + + @Override + public ClassLoader classloader() { + return dynamicClassLoader; + } + + @Override + public String toString() { + return caller.toString(); + } + } +} diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java new file mode 100644 index 00000000000..e8b9a50f49c --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.connect.runtime.isolation; + +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.StringWriter; +import java.net.URL; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.jar.Attributes; +import java.util.jar.JarEntry; +import java.util.jar.JarOutputStream; +import java.util.jar.Manifest; +import javax.tools.JavaCompiler; +import javax.tools.StandardJavaFileManager; +import javax.tools.ToolProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class for constructing test plugins for Connect. + * + *

Plugins are built from their source under resources/test-plugins/ and placed into temporary + * jar files that are deleted when the process exits. + * + *

To add a plugin, create the source files in the resource tree, and edit this class to build + * that plugin during initialization. For example, the plugin class {@literal package.Class} should + * be placed in {@literal resources/test-plugins/something/package/Class.java} and loaded using + * {@code createPluginJar("something")}. The class name, contents, and plugin directory can take + * any value you need for testing. + * + *

To use this class in your tests, make sure to first call + * {@link TestPlugins#assertAvailable()} to verify that the plugins initialized correctly. + * Otherwise, exceptions during the plugin build are not propagated, and may invalidate your test. + * You can access the list of plugin jars for assembling a {@literal plugin.path}, and reference + * the names of the different plugins directly via the exposed constants. + */ +public class TestPlugins { + + /** + * Class name of a plugin which will always throw an exception during loading + */ + public static final String ALWAYS_THROW_EXCEPTION = "test.plugins.AlwaysThrowException"; + /** + * Class name of a plugin which samples information about its initialization. + */ + public static final String ALIASED_STATIC_FIELD = "test.plugins.AliasedStaticField"; + /** + * Class name of a {@link org.apache.kafka.connect.storage.Converter} + * which samples information about its method calls. + */ + public static final String SAMPLING_CONVERTER = "test.plugins.SamplingConverter"; + /** + * Class name of a plugin which uses a {@link java.util.ServiceLoader} + * to load internal classes, and samples information about their initialization. + */ + public static final String SERVICE_LOADER = "test.plugins.ServiceLoaderPlugin"; + + private static final Logger log = LoggerFactory.getLogger(TestPlugins.class); + private static final Map PLUGIN_JARS; + private static final Throwable INITIALIZATION_EXCEPTION; + + static { + Throwable err = null; + HashMap pluginJars = new HashMap<>(); + try { + pluginJars.put(ALWAYS_THROW_EXCEPTION, createPluginJar("always-throw-exception")); + pluginJars.put(ALIASED_STATIC_FIELD, createPluginJar("aliased-static-field")); + pluginJars.put(SAMPLING_CONVERTER, createPluginJar("sampling-converter")); + pluginJars.put(SERVICE_LOADER, createPluginJar("service-loader")); + } catch (Throwable e) { + log.error("Could not set up plugin test jars", e); + err = e; + } + PLUGIN_JARS = Collections.unmodifiableMap(pluginJars); + INITIALIZATION_EXCEPTION = err; + } + + /** + * Ensure that the test plugin JARs were assembled without error before continuing. + * @throws AssertionError if any plugin failed to load, or no plugins were loaded. + */ + public static void assertAvailable() throws AssertionError { + if (INITIALIZATION_EXCEPTION != null) { + throw new AssertionError("TestPlugins did not initialize completely", + INITIALIZATION_EXCEPTION); + } + if (PLUGIN_JARS.isEmpty()) { + throw new AssertionError("No test plugins loaded"); + } + } + + /** + * A list of jar files containing test plugins + * @return A list of plugin jar filenames + */ + public static List pluginPath() { + List out = new ArrayList<>(); + for (File f : PLUGIN_JARS.values()) { + out.add(f.getPath()); + } + return out; + } + + /** + * Get all of the classes that were successfully built by this class + * @return A list of plugin class names + */ + public static List pluginClasses() { + return new ArrayList<>(PLUGIN_JARS.keySet()); + } + + private static File createPluginJar(String resourceDir) throws IOException { + Path inputDir = resourceDirectoryPath("test-plugins/" + resourceDir); + Path binDir = Files.createTempDirectory(resourceDir + ".bin."); + compileJavaSources(inputDir, binDir); + File jarFile = Files.createTempFile(resourceDir + ".", ".jar").toFile(); + try (JarOutputStream jar = openJarFile(jarFile)) { + writeJar(jar, inputDir); + writeJar(jar, binDir); + } + removeDirectory(binDir); + jarFile.deleteOnExit(); + return jarFile; + } + + private static Path resourceDirectoryPath(String resourceDir) throws IOException { + URL resource = Thread.currentThread() + .getContextClassLoader() + .getResource(resourceDir); + if (resource == null) { + throw new IOException("Could not find test plugin resource: " + resourceDir); + } + File file = new File(resource.getFile()); + if (!file.isDirectory()) { + throw new IOException("Resource is not a directory: " + resourceDir); + } + if (!file.canRead()) { + throw new IOException("Resource directory is not readable: " + resourceDir); + } + return file.toPath(); + } + + private static JarOutputStream openJarFile(File jarFile) throws IOException { + Manifest manifest = new Manifest(); + manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0"); + return new JarOutputStream(new FileOutputStream(jarFile), manifest); + } + + private static void removeDirectory(Path binDir) throws IOException { + Files.walkFileTree(binDir, new SimpleFileVisitor() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { + if (!file.toFile().delete()) { + log.info("Could not delete " + file); + } + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult postVisitDirectory(Path dir, IOException exc) { + if (!dir.toFile().delete()) { + log.info("Could not delete " + dir); + } + return FileVisitResult.CONTINUE; + } + }); + } + + /** + * Compile a directory of .java source files into .class files + * .class files are placed into the same directory as their sources. + * + *

Dependencies between source files in this directory are resolved against one another + * and the classes present in the test environment. + * See https://stackoverflow.com/questions/1563909/ for more information. + * Additional dependencies in your plugins should be added as test scope to :connect:runtime. + * @param sourceDir Directory containing java source files + * @throws IOException if the files cannot be compiled + */ + private static void compileJavaSources(Path sourceDir, Path binDir) throws IOException { + JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); + final List sourceFiles = new ArrayList<>(); + Files.walkFileTree(sourceDir, new SimpleFileVisitor() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { + if (file.toFile().getName().endsWith(".java")) { + sourceFiles.add(file.toFile()); + } + return FileVisitResult.CONTINUE; + } + }); + + StringWriter writer = new StringWriter(); + List options = Arrays.asList( + "-d", binDir.toString() // Write class output to a different directory. + ); + + try (StandardJavaFileManager fileManager = compiler.getStandardFileManager(null, null, null)) { + boolean success = compiler.getTask( + writer, + fileManager, + null, + options, + null, + fileManager.getJavaFileObjectsFromFiles(sourceFiles) + ).call(); + if (!success) { + throw new RuntimeException("Failed to compile test plugin:\n" + writer); + } + } + } + + private static void writeJar(JarOutputStream jar, Path inputDir) throws IOException { + final List paths = new ArrayList<>(); + Files.walkFileTree(inputDir, new SimpleFileVisitor() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { + if (!file.toFile().getName().endsWith(".java")) { + paths.add(file); + } + return FileVisitResult.CONTINUE; + } + }); + for (Path path : paths) { + try (InputStream in = new BufferedInputStream(new FileInputStream(path.toFile()))) { + jar.putNextEntry(new JarEntry( + inputDir.relativize(path) + .toFile() + .getPath() + .replace(File.separator, "/") + )); + byte[] buffer = new byte[1024]; + for (int count; (count = in.read(buffer)) != -1; ) { + jar.write(buffer, 0, count); + } + jar.closeEntry(); + } + } + } + +} diff --git a/connect/runtime/src/test/resources/test-plugins/aliased-static-field/test/plugins/AliasedStaticField.java b/connect/runtime/src/test/resources/test-plugins/aliased-static-field/test/plugins/AliasedStaticField.java new file mode 100644 index 00000000000..d865f4e91ba --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/aliased-static-field/test/plugins/AliasedStaticField.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.plugins; + +import java.util.Map; +import java.util.HashMap; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin; + +/** + * Samples data about its initialization environment for later analysis + * Samples are shared between instances of the same class in a static variable + */ +public class AliasedStaticField extends SamplingTestPlugin implements Converter { + + private static final Map SAMPLES; + private static final ClassLoader STATIC_CLASS_LOADER; + private final ClassLoader classloader; + + static { + SAMPLES = new HashMap<>(); + STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader(); + } + + { + classloader = Thread.currentThread().getContextClassLoader(); + } + + @Override + public void configure(final Map configs, final boolean isKey) { + + } + + @Override + public byte[] fromConnectData(final String topic, final Schema schema, final Object value) { + return new byte[0]; + } + + @Override + public SchemaAndValue toConnectData(final String topic, final byte[] value) { + return null; + } + + @Override + public ClassLoader staticClassloader() { + return STATIC_CLASS_LOADER; + } + + @Override + public ClassLoader classloader() { + return classloader; + } + + @Override + public Map otherSamples() { + return SAMPLES; + } +} diff --git a/connect/runtime/src/test/resources/test-plugins/always-throw-exception/test/plugins/AlwaysThrowException.java b/connect/runtime/src/test/resources/test-plugins/always-throw-exception/test/plugins/AlwaysThrowException.java new file mode 100644 index 00000000000..858f3ed5eac --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/always-throw-exception/test/plugins/AlwaysThrowException.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.plugins; + +import java.util.Map; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin; +import org.apache.kafka.connect.storage.Converter; + +/** + * Unconditionally throw an exception during static initialization. + */ +public class AlwaysThrowException implements Converter { + + static { + setup(); + } + + public static void setup() { + throw new RuntimeException("I always throw an exception"); + } + + @Override + public void configure(final Map configs, final boolean isKey) { + + } + + @Override + public byte[] fromConnectData(final String topic, final Schema schema, final Object value) { + return new byte[0]; + } + + @Override + public SchemaAndValue toConnectData(final String topic, final byte[] value) { + return null; + } +} diff --git a/connect/runtime/src/test/resources/test-plugins/sampling-converter/test/plugins/SamplingConverter.java b/connect/runtime/src/test/resources/test-plugins/sampling-converter/test/plugins/SamplingConverter.java new file mode 100644 index 00000000000..39109a1d4e5 --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/sampling-converter/test/plugins/SamplingConverter.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.plugins; + +import java.util.Map; +import java.util.HashMap; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin; + +/** + * Samples data about its initialization environment for later analysis + */ +public class SamplingConverter extends SamplingTestPlugin implements Converter { + + private static final ClassLoader STATIC_CLASS_LOADER; + private final ClassLoader classloader; + private Map samples; + + static { + STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader(); + } + + { + samples = new HashMap<>(); + classloader = Thread.currentThread().getContextClassLoader(); + } + + @Override + public void configure(final Map configs, final boolean isKey) { + logMethodCall(samples); + } + + @Override + public byte[] fromConnectData(final String topic, final Schema schema, final Object value) { + logMethodCall(samples); + return new byte[0]; + } + + @Override + public SchemaAndValue toConnectData(final String topic, final byte[] value) { + logMethodCall(samples); + return null; + } + + @Override + public ClassLoader staticClassloader() { + return STATIC_CLASS_LOADER; + } + + @Override + public ClassLoader classloader() { + return classloader; + } + + @Override + public Map otherSamples() { + return samples; + } +} diff --git a/connect/runtime/src/test/resources/test-plugins/service-loader/META-INF/services/test.plugins.ServiceLoadedClass b/connect/runtime/src/test/resources/test-plugins/service-loader/META-INF/services/test.plugins.ServiceLoadedClass new file mode 100644 index 00000000000..b8db8656487 --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/service-loader/META-INF/services/test.plugins.ServiceLoadedClass @@ -0,0 +1,16 @@ + # Licensed to the Apache Software Foundation (ASF) under one or more + # contributor license agreements. See the NOTICE file distributed with + # this work for additional information regarding copyright ownership. + # The ASF licenses this file to You under the Apache License, Version 2.0 + # (the "License"); you may not use this file except in compliance with + # the License. You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + +test.plugins.ServiceLoadedSubclass \ No newline at end of file diff --git a/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoadedClass.java b/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoadedClass.java new file mode 100644 index 00000000000..98677ed43d6 --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoadedClass.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.plugins; + +import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin; + +/** + * Superclass for service loaded classes + */ +public class ServiceLoadedClass extends SamplingTestPlugin { + + private static final ClassLoader STATIC_CLASS_LOADER; + private final ClassLoader classloader; + + static { + STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader(); + } + + { + classloader = Thread.currentThread().getContextClassLoader(); + } + + @Override + public ClassLoader staticClassloader() { + return STATIC_CLASS_LOADER; + } + + @Override + public ClassLoader classloader() { + return classloader; + } + +} diff --git a/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoadedSubclass.java b/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoadedSubclass.java new file mode 100644 index 00000000000..cfc6b6f9cfc --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoadedSubclass.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.plugins; + +/** + * Instance of a service loaded class + */ +public class ServiceLoadedSubclass extends ServiceLoadedClass { + + private static final ClassLoader STATIC_CLASS_LOADER; + private final ClassLoader classloader; + + static { + STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader(); + } + + { + classloader = Thread.currentThread().getContextClassLoader(); + } + + @Override + public ClassLoader staticClassloader() { + return STATIC_CLASS_LOADER; + } + + @Override + public ClassLoader classloader() { + return classloader; + } + +} diff --git a/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoaderPlugin.java b/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoaderPlugin.java new file mode 100644 index 00000000000..e6371baf56a --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoaderPlugin.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.plugins; + +import java.util.Map; +import java.util.HashMap; +import java.util.ServiceLoader; +import java.util.Iterator; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin; + +/** + * Samples data about its initialization environment for later analysis + */ +public class ServiceLoaderPlugin extends SamplingTestPlugin implements Converter { + + private static final ClassLoader STATIC_CLASS_LOADER; + private static final Map SAMPLES; + private final ClassLoader classloader; + + static { + STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader(); + SAMPLES = new HashMap<>(); + Iterator it = ServiceLoader.load(ServiceLoadedClass.class).iterator(); + while (it.hasNext()) { + ServiceLoadedClass loaded = it.next(); + SAMPLES.put(loaded.getClass().getSimpleName() + ".static", loaded); + } + } + + { + classloader = Thread.currentThread().getContextClassLoader(); + Iterator it = ServiceLoader.load(ServiceLoadedClass.class).iterator(); + while (it.hasNext()) { + ServiceLoadedClass loaded = it.next(); + SAMPLES.put(loaded.getClass().getSimpleName() + ".dynamic", loaded); + } + } + + @Override + public void configure(final Map configs, final boolean isKey) { + } + + @Override + public byte[] fromConnectData(final String topic, final Schema schema, final Object value) { + return new byte[0]; + } + + @Override + public SchemaAndValue toConnectData(final String topic, final byte[] value) { + return null; + } + + @Override + public ClassLoader staticClassloader() { + return STATIC_CLASS_LOADER; + } + + @Override + public ClassLoader classloader() { + return classloader; + } + + @Override + public Map otherSamples() { + return SAMPLES; + } +}