KAFKA-9057: Backport KAFKA-8819 and KAFKA-8340 to 1.0 (#7580)

Includes fixes from PR-7315 (KAFKA-8819 and KAFKA-8340), but omits ConfigProvider and Configurable test cases and plugins, and replaces Java 8 language features with suitable Java 7 features. Also addressed the following merge conflicts from the `1.1` PR:

* Remove test cases made incompatible by 1.1 Converter refactor
* Adding PluginsTest file which did not exist before
* Removing Converter::configure test case
* Removing HeaderConverter test plugin and test cases
* Removing Converter Types test cases
* Removing classloader control test cases

Signed-off-by: Greg Harris <gregh@confluent.io>
Author: Greg Harris <gregh@confluent.io>
Reviewer: Randall Hauch <rhauch@gmail.com>
This commit is contained in:
Greg Harris 2019-10-22 16:13:20 -07:00 committed by Randall Hauch
parent f40b2597ab
commit a580826ae9
14 changed files with 1046 additions and 17 deletions

View File

@ -303,6 +303,7 @@
<subpackage name="isolation">
<allow pkg="com.fasterxml.jackson" />
<allow pkg="org.apache.maven.artifact.versioning" />
<allow pkg="javax.tools" />
</subpackage>
</subpackage>

View File

@ -366,10 +366,10 @@ public class Worker {
final WorkerTask workerTask;
ClassLoader savedLoader = plugins.currentThreadLoader();
try {
final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
String connType = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
ClassLoader connectorLoader = plugins.delegatingLoader().connectorLoader(connType);
savedLoader = Plugins.compareAndSwapLoaders(connectorLoader);
final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
final TaskConfig taskConfig = new TaskConfig(taskProps);
final Class<? extends Task> taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
final Task task = plugins.newTask(taskClass);

View File

@ -97,13 +97,32 @@ public class DelegatingClassLoader extends URLClassLoader {
return connectorLoader(connector.getClass().getName());
}
/**
* Retrieve the PluginClassLoader associated with a plugin class
* @param name The fully qualified class name of the plugin
* @return the PluginClassLoader that should be used to load this, or null if the plugin is not isolated.
*/
public PluginClassLoader pluginClassLoader(String name) {
if (!PluginUtils.shouldLoadInIsolation(name)) {
return null;
}
SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name);
if (inner == null) {
return null;
}
ClassLoader pluginLoader = inner.get(inner.lastKey());
return pluginLoader instanceof PluginClassLoader
? (PluginClassLoader) pluginLoader
: null;
}
public ClassLoader connectorLoader(String connectorClassOrAlias) {
log.debug("Getting plugin class loader for connector: '{}'", connectorClassOrAlias);
String fullName = aliases.containsKey(connectorClassOrAlias)
? aliases.get(connectorClassOrAlias)
: connectorClassOrAlias;
SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(fullName);
if (inner == null) {
PluginClassLoader classLoader = pluginClassLoader(fullName);
if (classLoader == null) {
log.error(
"Plugin class loader for connector: '{}' was not found. Returning: {}",
connectorClassOrAlias,
@ -111,7 +130,7 @@ public class DelegatingClassLoader extends URLClassLoader {
);
return this;
}
return inner.get(inner.lastKey());
return classLoader;
}
private static PluginClassLoader newPluginClassLoader(
@ -302,19 +321,11 @@ public class DelegatingClassLoader extends URLClassLoader {
@Override
protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
if (!PluginUtils.shouldLoadInIsolation(name)) {
// There are no paths in this classloader, will attempt to load with the parent.
return super.loadClass(name, resolve);
}
String fullName = aliases.containsKey(name) ? aliases.get(name) : name;
SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(fullName);
if (inner != null) {
ClassLoader pluginLoader = inner.get(inner.lastKey());
PluginClassLoader pluginLoader = pluginClassLoader(fullName);
if (pluginLoader != null) {
log.trace("Retrieving loaded class '{}' from '{}'", fullName, pluginLoader);
return pluginLoader instanceof PluginClassLoader
? ((PluginClassLoader) pluginLoader).loadClass(fullName, resolve)
: super.loadClass(fullName, resolve);
return pluginLoader.loadClass(fullName, resolve);
}
return super.loadClass(fullName, resolve);

View File

@ -63,13 +63,37 @@ public class Plugins {
}
protected static <T> T newPlugin(Class<T> klass) {
// KAFKA-8340: The thread classloader is used during static initialization and must be
// set to the plugin's classloader during instantiation
ClassLoader savedLoader = compareAndSwapLoaders(klass.getClassLoader());
try {
return Utils.newInstance(klass);
} catch (Throwable t) {
throw new ConnectException("Instantiation error", t);
} finally {
compareAndSwapLoaders(savedLoader);
}
}
@SuppressWarnings("unchecked")
protected <U> Class<? extends U> pluginClassFromConfig(
AbstractConfig config,
String propertyName,
Class<U> pluginClass,
Collection<PluginDesc<U>> plugins
) {
Class<?> klass = config.getClass(propertyName);
if (pluginClass.isAssignableFrom(klass)) {
return (Class<? extends U>) klass;
}
throw new ConnectException(
"Failed to find any class that implements " + pluginClass.getSimpleName()
+ " for the config "
+ propertyName + ", available classes are: "
+ pluginNames(plugins)
);
}
protected static <T> T newConfiguredPlugin(AbstractConfig config, Class<T> klass) {
T plugin = Utils.newInstance(klass);
if (plugin instanceof Configurable) {
@ -204,7 +228,12 @@ public class Plugins {
+ pluginNames(delegatingLoader.converters())
);
}
return config != null ? newConfiguredPlugin(config, klass) : newPlugin(klass);
ClassLoader savedLoader = compareAndSwapLoaders(klass.getClassLoader());
try {
return config != null ? newConfiguredPlugin(config, klass) : newPlugin(klass);
} finally {
compareAndSwapLoaders(savedLoader);
}
}
public <R extends ConnectRecord<R>> Transformation<R> newTranformations(

View File

@ -0,0 +1,196 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime.isolation;
import java.util.Map.Entry;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.storage.Converter;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
public class PluginsTest {
private Plugins plugins;
private Map<String, String> props;
private AbstractConfig config;
@Before
public void setup() {
Map<String, String> pluginProps = new HashMap<>();
// Set up the plugins with some test plugins to test isolation
pluginProps.put(WorkerConfig.PLUGIN_PATH_CONFIG, Utils.join(TestPlugins.pluginPath(), ","));
plugins = new Plugins(pluginProps);
props = new HashMap<>(pluginProps);
props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
props.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
props.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
createConfig();
}
protected void createConfig() {
this.config = new TestableWorkerConfig(props);
}
@Test(expected = ExceptionInInitializerError.class)
public void shouldThrowIfPluginThrows() {
TestPlugins.assertAvailable();
props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, TestPlugins.ALWAYS_THROW_EXCEPTION);
ClassLoader classLoader = plugins.delegatingLoader().pluginClassLoader(TestPlugins.ALWAYS_THROW_EXCEPTION);
ClassLoader savedLoader = Plugins.compareAndSwapLoaders(classLoader);
try {
createConfig();
} finally {
Plugins.compareAndSwapLoaders(savedLoader);
}
}
@Test
public void shouldShareStaticValuesBetweenSamePlugin() {
// Plugins are not isolated from other instances of their own class.
TestPlugins.assertAvailable();
Converter firstPlugin = plugins.newConverter(
TestPlugins.ALIASED_STATIC_FIELD,
config
);
assertInstanceOf(SamplingTestPlugin.class, firstPlugin, "Cannot collect samples");
Converter secondPlugin = plugins.newConverter(
TestPlugins.ALIASED_STATIC_FIELD,
config
);
assertInstanceOf(SamplingTestPlugin.class, secondPlugin, "Cannot collect samples");
assertSame(
((SamplingTestPlugin) firstPlugin).otherSamples(),
((SamplingTestPlugin) secondPlugin).otherSamples()
);
}
@Test
public void newPluginShouldServiceLoadWithPluginClassLoader() {
TestPlugins.assertAvailable();
Converter plugin = plugins.newConverter(
TestPlugins.SERVICE_LOADER,
config
);
assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples");
Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin).flatten();
// Assert that the service loaded subclass is found in both environments
assertTrue(samples.containsKey("ServiceLoadedSubclass.static"));
assertTrue(samples.containsKey("ServiceLoadedSubclass.dynamic"));
assertPluginClassLoaderAlwaysActive(samples);
}
@Test
public void newPluginShouldInstantiateWithPluginClassLoader() {
TestPlugins.assertAvailable();
Converter plugin = plugins.newConverter(
TestPlugins.SERVICE_LOADER,
config
);
assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples");
Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin).flatten();
assertPluginClassLoaderAlwaysActive(samples);
}
@Test(expected = ConfigException.class)
public void shouldFailToFindConverterInCurrentClassloader() {
TestPlugins.assertAvailable();
props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, TestPlugins.SAMPLING_CONVERTER);
createConfig();
}
public static void assertPluginClassLoaderAlwaysActive(Map<String, SamplingTestPlugin> samples) {
for (Entry<String, SamplingTestPlugin> e : samples.entrySet()) {
String sampleName = "\"" + e.getKey() + "\" (" + e.getValue() + ")";
assertInstanceOf(
PluginClassLoader.class,
e.getValue().staticClassloader(),
sampleName + " has incorrect static classloader"
);
assertInstanceOf(
PluginClassLoader.class,
e.getValue().classloader(),
sampleName + " has incorrect dynamic classloader"
);
}
}
public static void assertInstanceOf(Class<?> expected, Object actual, String message) {
assertTrue(
"Expected an instance of " + expected.getSimpleName() + ", found " + actual + " instead: " + message,
expected.isInstance(actual)
);
}
public static class TestableWorkerConfig extends WorkerConfig {
public TestableWorkerConfig(Map<String, String> props) {
super(WorkerConfig.baseConfigDef(), props);
}
}
public static class TestConverter implements Converter, Configurable {
public Map<String, ?> configs;
public ConfigDef config() {
return null;
}
@Override
public void configure(Map<String, ?> configs) {
this.configs = configs;
new JsonConverter().configure(configs, true); // requires the `converter.type` config be set
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
this.configs = configs;
}
@Override
public byte[] fromConnectData(String topic, Schema schema, Object value) {
return new byte[0];
}
@Override
public SchemaAndValue toConnectData(String topic, byte[] value) {
return null;
}
}
}

View File

@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime.isolation;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
/**
* Base class for plugins so we can sample information about their initialization
*/
public abstract class SamplingTestPlugin {
/**
* @return the ClassLoader used to statically initialize this plugin class
*/
public abstract ClassLoader staticClassloader();
/**
* @return the ClassLoader used to initialize this plugin instance
*/
public abstract ClassLoader classloader();
/**
* @return a group of other SamplingTestPlugin instances known by this plugin
* This should only return direct children, and not reference this instance directly
*/
public Map<String, SamplingTestPlugin> otherSamples() {
return Collections.emptyMap();
}
/**
* @return a flattened list of child samples including this entry keyed as "this"
*/
public Map<String, SamplingTestPlugin> flatten() {
Map<String, SamplingTestPlugin> out = new HashMap<>();
Map<String, SamplingTestPlugin> otherSamples = otherSamples();
if (otherSamples != null) {
for (Entry<String, SamplingTestPlugin> child : otherSamples.entrySet()) {
for (Entry<String, SamplingTestPlugin> flattened : child.getValue().flatten().entrySet()) {
String key = child.getKey();
if (flattened.getKey().length() > 0) {
key += "." + flattened.getKey();
}
out.put(key, flattened.getValue());
}
}
}
out.put("", this);
return out;
}
/**
* Log the parent method call as a child sample.
* Stores only the last invocation of each method if there are multiple invocations.
* @param samples The collection of samples to which this method call should be added
*/
public void logMethodCall(Map<String, SamplingTestPlugin> samples) {
StackTraceElement[] stackTraces = Thread.currentThread().getStackTrace();
if (stackTraces.length < 2) {
return;
}
// 0 is inside getStackTrace
// 1 is this method
// 2 is our caller method
StackTraceElement caller = stackTraces[2];
samples.put(caller.getMethodName(), new MethodCallSample(
caller,
Thread.currentThread().getContextClassLoader(),
getClass().getClassLoader()
));
}
public static class MethodCallSample extends SamplingTestPlugin {
private final StackTraceElement caller;
private final ClassLoader staticClassLoader;
private final ClassLoader dynamicClassLoader;
public MethodCallSample(
StackTraceElement caller,
ClassLoader staticClassLoader,
ClassLoader dynamicClassLoader
) {
this.caller = caller;
this.staticClassLoader = staticClassLoader;
this.dynamicClassLoader = dynamicClassLoader;
}
@Override
public ClassLoader staticClassloader() {
return staticClassLoader;
}
@Override
public ClassLoader classloader() {
return dynamicClassLoader;
}
@Override
public String toString() {
return caller.toString();
}
}
}

View File

@ -0,0 +1,271 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime.isolation;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.net.URL;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.jar.Attributes;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
import javax.tools.JavaCompiler;
import javax.tools.StandardJavaFileManager;
import javax.tools.ToolProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Utility class for constructing test plugins for Connect.
*
* <p>Plugins are built from their source under resources/test-plugins/ and placed into temporary
* jar files that are deleted when the process exits.
*
* <p>To add a plugin, create the source files in the resource tree, and edit this class to build
* that plugin during initialization. For example, the plugin class {@literal package.Class} should
* be placed in {@literal resources/test-plugins/something/package/Class.java} and loaded using
* {@code createPluginJar("something")}. The class name, contents, and plugin directory can take
* any value you need for testing.
*
* <p>To use this class in your tests, make sure to first call
* {@link TestPlugins#assertAvailable()} to verify that the plugins initialized correctly.
* Otherwise, exceptions during the plugin build are not propagated, and may invalidate your test.
* You can access the list of plugin jars for assembling a {@literal plugin.path}, and reference
* the names of the different plugins directly via the exposed constants.
*/
public class TestPlugins {
/**
* Class name of a plugin which will always throw an exception during loading
*/
public static final String ALWAYS_THROW_EXCEPTION = "test.plugins.AlwaysThrowException";
/**
* Class name of a plugin which samples information about its initialization.
*/
public static final String ALIASED_STATIC_FIELD = "test.plugins.AliasedStaticField";
/**
* Class name of a {@link org.apache.kafka.connect.storage.Converter}
* which samples information about its method calls.
*/
public static final String SAMPLING_CONVERTER = "test.plugins.SamplingConverter";
/**
* Class name of a plugin which uses a {@link java.util.ServiceLoader}
* to load internal classes, and samples information about their initialization.
*/
public static final String SERVICE_LOADER = "test.plugins.ServiceLoaderPlugin";
private static final Logger log = LoggerFactory.getLogger(TestPlugins.class);
private static final Map<String, File> PLUGIN_JARS;
private static final Throwable INITIALIZATION_EXCEPTION;
static {
Throwable err = null;
HashMap<String, File> pluginJars = new HashMap<>();
try {
pluginJars.put(ALWAYS_THROW_EXCEPTION, createPluginJar("always-throw-exception"));
pluginJars.put(ALIASED_STATIC_FIELD, createPluginJar("aliased-static-field"));
pluginJars.put(SAMPLING_CONVERTER, createPluginJar("sampling-converter"));
pluginJars.put(SERVICE_LOADER, createPluginJar("service-loader"));
} catch (Throwable e) {
log.error("Could not set up plugin test jars", e);
err = e;
}
PLUGIN_JARS = Collections.unmodifiableMap(pluginJars);
INITIALIZATION_EXCEPTION = err;
}
/**
* Ensure that the test plugin JARs were assembled without error before continuing.
* @throws AssertionError if any plugin failed to load, or no plugins were loaded.
*/
public static void assertAvailable() throws AssertionError {
if (INITIALIZATION_EXCEPTION != null) {
throw new AssertionError("TestPlugins did not initialize completely",
INITIALIZATION_EXCEPTION);
}
if (PLUGIN_JARS.isEmpty()) {
throw new AssertionError("No test plugins loaded");
}
}
/**
* A list of jar files containing test plugins
* @return A list of plugin jar filenames
*/
public static List<String> pluginPath() {
List<String> out = new ArrayList<>();
for (File f : PLUGIN_JARS.values()) {
out.add(f.getPath());
}
return out;
}
/**
* Get all of the classes that were successfully built by this class
* @return A list of plugin class names
*/
public static List<String> pluginClasses() {
return new ArrayList<>(PLUGIN_JARS.keySet());
}
private static File createPluginJar(String resourceDir) throws IOException {
Path inputDir = resourceDirectoryPath("test-plugins/" + resourceDir);
Path binDir = Files.createTempDirectory(resourceDir + ".bin.");
compileJavaSources(inputDir, binDir);
File jarFile = Files.createTempFile(resourceDir + ".", ".jar").toFile();
try (JarOutputStream jar = openJarFile(jarFile)) {
writeJar(jar, inputDir);
writeJar(jar, binDir);
}
removeDirectory(binDir);
jarFile.deleteOnExit();
return jarFile;
}
private static Path resourceDirectoryPath(String resourceDir) throws IOException {
URL resource = Thread.currentThread()
.getContextClassLoader()
.getResource(resourceDir);
if (resource == null) {
throw new IOException("Could not find test plugin resource: " + resourceDir);
}
File file = new File(resource.getFile());
if (!file.isDirectory()) {
throw new IOException("Resource is not a directory: " + resourceDir);
}
if (!file.canRead()) {
throw new IOException("Resource directory is not readable: " + resourceDir);
}
return file.toPath();
}
private static JarOutputStream openJarFile(File jarFile) throws IOException {
Manifest manifest = new Manifest();
manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
return new JarOutputStream(new FileOutputStream(jarFile), manifest);
}
private static void removeDirectory(Path binDir) throws IOException {
Files.walkFileTree(binDir, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
if (!file.toFile().delete()) {
log.info("Could not delete " + file);
}
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) {
if (!dir.toFile().delete()) {
log.info("Could not delete " + dir);
}
return FileVisitResult.CONTINUE;
}
});
}
/**
* Compile a directory of .java source files into .class files
* .class files are placed into the same directory as their sources.
*
* <p>Dependencies between source files in this directory are resolved against one another
* and the classes present in the test environment.
* See https://stackoverflow.com/questions/1563909/ for more information.
* Additional dependencies in your plugins should be added as test scope to :connect:runtime.
* @param sourceDir Directory containing java source files
* @throws IOException if the files cannot be compiled
*/
private static void compileJavaSources(Path sourceDir, Path binDir) throws IOException {
JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
final List<File> sourceFiles = new ArrayList<>();
Files.walkFileTree(sourceDir, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
if (file.toFile().getName().endsWith(".java")) {
sourceFiles.add(file.toFile());
}
return FileVisitResult.CONTINUE;
}
});
StringWriter writer = new StringWriter();
List<String> options = Arrays.asList(
"-d", binDir.toString() // Write class output to a different directory.
);
try (StandardJavaFileManager fileManager = compiler.getStandardFileManager(null, null, null)) {
boolean success = compiler.getTask(
writer,
fileManager,
null,
options,
null,
fileManager.getJavaFileObjectsFromFiles(sourceFiles)
).call();
if (!success) {
throw new RuntimeException("Failed to compile test plugin:\n" + writer);
}
}
}
private static void writeJar(JarOutputStream jar, Path inputDir) throws IOException {
final List<Path> paths = new ArrayList<>();
Files.walkFileTree(inputDir, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
if (!file.toFile().getName().endsWith(".java")) {
paths.add(file);
}
return FileVisitResult.CONTINUE;
}
});
for (Path path : paths) {
try (InputStream in = new BufferedInputStream(new FileInputStream(path.toFile()))) {
jar.putNextEntry(new JarEntry(
inputDir.relativize(path)
.toFile()
.getPath()
.replace(File.separator, "/")
));
byte[] buffer = new byte[1024];
for (int count; (count = in.read(buffer)) != -1; ) {
jar.write(buffer, 0, count);
}
jar.closeEntry();
}
}
}
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package test.plugins;
import java.util.Map;
import java.util.HashMap;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
/**
* Samples data about its initialization environment for later analysis
* Samples are shared between instances of the same class in a static variable
*/
public class AliasedStaticField extends SamplingTestPlugin implements Converter {
private static final Map<String, SamplingTestPlugin> SAMPLES;
private static final ClassLoader STATIC_CLASS_LOADER;
private final ClassLoader classloader;
static {
SAMPLES = new HashMap<>();
STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader();
}
{
classloader = Thread.currentThread().getContextClassLoader();
}
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
}
@Override
public byte[] fromConnectData(final String topic, final Schema schema, final Object value) {
return new byte[0];
}
@Override
public SchemaAndValue toConnectData(final String topic, final byte[] value) {
return null;
}
@Override
public ClassLoader staticClassloader() {
return STATIC_CLASS_LOADER;
}
@Override
public ClassLoader classloader() {
return classloader;
}
@Override
public Map<String, SamplingTestPlugin> otherSamples() {
return SAMPLES;
}
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package test.plugins;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
import org.apache.kafka.connect.storage.Converter;
/**
* Unconditionally throw an exception during static initialization.
*/
public class AlwaysThrowException implements Converter {
static {
setup();
}
public static void setup() {
throw new RuntimeException("I always throw an exception");
}
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
}
@Override
public byte[] fromConnectData(final String topic, final Schema schema, final Object value) {
return new byte[0];
}
@Override
public SchemaAndValue toConnectData(final String topic, final byte[] value) {
return null;
}
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package test.plugins;
import java.util.Map;
import java.util.HashMap;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
/**
* Samples data about its initialization environment for later analysis
*/
public class SamplingConverter extends SamplingTestPlugin implements Converter {
private static final ClassLoader STATIC_CLASS_LOADER;
private final ClassLoader classloader;
private Map<String, SamplingTestPlugin> samples;
static {
STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader();
}
{
samples = new HashMap<>();
classloader = Thread.currentThread().getContextClassLoader();
}
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
logMethodCall(samples);
}
@Override
public byte[] fromConnectData(final String topic, final Schema schema, final Object value) {
logMethodCall(samples);
return new byte[0];
}
@Override
public SchemaAndValue toConnectData(final String topic, final byte[] value) {
logMethodCall(samples);
return null;
}
@Override
public ClassLoader staticClassloader() {
return STATIC_CLASS_LOADER;
}
@Override
public ClassLoader classloader() {
return classloader;
}
@Override
public Map<String, SamplingTestPlugin> otherSamples() {
return samples;
}
}

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
test.plugins.ServiceLoadedSubclass

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package test.plugins;
import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
/**
* Superclass for service loaded classes
*/
public class ServiceLoadedClass extends SamplingTestPlugin {
private static final ClassLoader STATIC_CLASS_LOADER;
private final ClassLoader classloader;
static {
STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader();
}
{
classloader = Thread.currentThread().getContextClassLoader();
}
@Override
public ClassLoader staticClassloader() {
return STATIC_CLASS_LOADER;
}
@Override
public ClassLoader classloader() {
return classloader;
}
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package test.plugins;
/**
* Instance of a service loaded class
*/
public class ServiceLoadedSubclass extends ServiceLoadedClass {
private static final ClassLoader STATIC_CLASS_LOADER;
private final ClassLoader classloader;
static {
STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader();
}
{
classloader = Thread.currentThread().getContextClassLoader();
}
@Override
public ClassLoader staticClassloader() {
return STATIC_CLASS_LOADER;
}
@Override
public ClassLoader classloader() {
return classloader;
}
}

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package test.plugins;
import java.util.Map;
import java.util.HashMap;
import java.util.ServiceLoader;
import java.util.Iterator;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
/**
* Samples data about its initialization environment for later analysis
*/
public class ServiceLoaderPlugin extends SamplingTestPlugin implements Converter {
private static final ClassLoader STATIC_CLASS_LOADER;
private static final Map<String, SamplingTestPlugin> SAMPLES;
private final ClassLoader classloader;
static {
STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader();
SAMPLES = new HashMap<>();
Iterator<ServiceLoadedClass> it = ServiceLoader.load(ServiceLoadedClass.class).iterator();
while (it.hasNext()) {
ServiceLoadedClass loaded = it.next();
SAMPLES.put(loaded.getClass().getSimpleName() + ".static", loaded);
}
}
{
classloader = Thread.currentThread().getContextClassLoader();
Iterator<ServiceLoadedClass> it = ServiceLoader.load(ServiceLoadedClass.class).iterator();
while (it.hasNext()) {
ServiceLoadedClass loaded = it.next();
SAMPLES.put(loaded.getClass().getSimpleName() + ".dynamic", loaded);
}
}
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
}
@Override
public byte[] fromConnectData(final String topic, final Schema schema, final Object value) {
return new byte[0];
}
@Override
public SchemaAndValue toConnectData(final String topic, final byte[] value) {
return null;
}
@Override
public ClassLoader staticClassloader() {
return STATIC_CLASS_LOADER;
}
@Override
public ClassLoader classloader() {
return classloader;
}
@Override
public Map<String, SamplingTestPlugin> otherSamples() {
return SAMPLES;
}
}