mirror of https://github.com/apache/kafka.git
KAFKA-8340, KAFKA-8819: Use PluginClassLoader while statically initializing plugins (#7315)
Added plugin isolation unit tests for various scenarios, with a `TestPlugins` class that compiles and builds multiple test plugins without them being on the classpath and verifies that the Plugins and DelegatingClassLoader behave properly. These initially failed for several cases, but now pass since the issues have been fixed. KAFKA-8340 and KAFKA-8819 are closely related, and this fix corrects the problems reported in both issues. Author: Greg Harris <gregh@confluent.io> Reviewers: Chris Egerton <chrise@confluent.io>, Magesh Nandakumar <mageshn@confluent.io>, Konstantine Karantasis <konstantine@confluent.io>, Randall Hauch <rhauch@gmail.com>
This commit is contained in:
parent
3e30bf5439
commit
ff68b60429
|
@ -378,6 +378,7 @@
|
|||
<subpackage name="isolation">
|
||||
<allow pkg="com.fasterxml.jackson" />
|
||||
<allow pkg="org.apache.maven.artifact.versioning" />
|
||||
<allow pkg="javax.tools" />
|
||||
</subpackage>
|
||||
|
||||
<subpackage name="distributed">
|
||||
|
|
|
@ -423,10 +423,10 @@ public class Worker {
|
|||
connectorStatusMetricsGroup.recordTaskAdded(id);
|
||||
ClassLoader savedLoader = plugins.currentThreadLoader();
|
||||
try {
|
||||
final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
|
||||
String connType = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
|
||||
String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
|
||||
ClassLoader connectorLoader = plugins.delegatingLoader().connectorLoader(connType);
|
||||
savedLoader = Plugins.compareAndSwapLoaders(connectorLoader);
|
||||
final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
|
||||
final TaskConfig taskConfig = new TaskConfig(taskProps);
|
||||
final Class<? extends Task> taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
|
||||
final Task task = plugins.newTask(taskClass);
|
||||
|
|
|
@ -132,6 +132,25 @@ public class DelegatingClassLoader extends URLClassLoader {
|
|||
return connectorClientConfigPolicies;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve the PluginClassLoader associated with a plugin class
|
||||
* @param name The fully qualified class name of the plugin
|
||||
* @return the PluginClassLoader that should be used to load this, or null if the plugin is not isolated.
|
||||
*/
|
||||
public PluginClassLoader pluginClassLoader(String name) {
|
||||
if (!PluginUtils.shouldLoadInIsolation(name)) {
|
||||
return null;
|
||||
}
|
||||
SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name);
|
||||
if (inner == null) {
|
||||
return null;
|
||||
}
|
||||
ClassLoader pluginLoader = inner.get(inner.lastKey());
|
||||
return pluginLoader instanceof PluginClassLoader
|
||||
? (PluginClassLoader) pluginLoader
|
||||
: null;
|
||||
}
|
||||
|
||||
public ClassLoader connectorLoader(Connector connector) {
|
||||
return connectorLoader(connector.getClass().getName());
|
||||
}
|
||||
|
@ -141,8 +160,8 @@ public class DelegatingClassLoader extends URLClassLoader {
|
|||
String fullName = aliases.containsKey(connectorClassOrAlias)
|
||||
? aliases.get(connectorClassOrAlias)
|
||||
: connectorClassOrAlias;
|
||||
SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(fullName);
|
||||
if (inner == null) {
|
||||
PluginClassLoader classLoader = pluginClassLoader(fullName);
|
||||
if (classLoader == null) {
|
||||
log.error(
|
||||
"Plugin class loader for connector: '{}' was not found. Returning: {}",
|
||||
connectorClassOrAlias,
|
||||
|
@ -150,7 +169,7 @@ public class DelegatingClassLoader extends URLClassLoader {
|
|||
);
|
||||
return this;
|
||||
}
|
||||
return inner.get(inner.lastKey());
|
||||
return classLoader;
|
||||
}
|
||||
|
||||
private static PluginClassLoader newPluginClassLoader(
|
||||
|
@ -338,10 +357,16 @@ public class DelegatingClassLoader extends URLClassLoader {
|
|||
|
||||
@SuppressWarnings("unchecked")
|
||||
private <T> Collection<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) {
|
||||
ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
|
||||
ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);
|
||||
Collection<PluginDesc<T>> result = new ArrayList<>();
|
||||
for (T pluginImpl : serviceLoader) {
|
||||
result.add(new PluginDesc<>((Class<? extends T>) pluginImpl.getClass(), versionFor(pluginImpl), loader));
|
||||
try {
|
||||
ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
|
||||
for (T pluginImpl : serviceLoader) {
|
||||
result.add(new PluginDesc<>((Class<? extends T>) pluginImpl.getClass(),
|
||||
versionFor(pluginImpl), loader));
|
||||
}
|
||||
} finally {
|
||||
Plugins.compareAndSwapLoaders(savedLoader);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
@ -357,19 +382,11 @@ public class DelegatingClassLoader extends URLClassLoader {
|
|||
|
||||
@Override
|
||||
protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
|
||||
if (!PluginUtils.shouldLoadInIsolation(name)) {
|
||||
// There are no paths in this classloader, will attempt to load with the parent.
|
||||
return super.loadClass(name, resolve);
|
||||
}
|
||||
|
||||
String fullName = aliases.containsKey(name) ? aliases.get(name) : name;
|
||||
SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(fullName);
|
||||
if (inner != null) {
|
||||
ClassLoader pluginLoader = inner.get(inner.lastKey());
|
||||
PluginClassLoader pluginLoader = pluginClassLoader(fullName);
|
||||
if (pluginLoader != null) {
|
||||
log.trace("Retrieving loaded class '{}' from '{}'", fullName, pluginLoader);
|
||||
return pluginLoader instanceof PluginClassLoader
|
||||
? ((PluginClassLoader) pluginLoader).loadClass(fullName, resolve)
|
||||
: super.loadClass(fullName, resolve);
|
||||
return pluginLoader.loadClass(fullName, resolve);
|
||||
}
|
||||
|
||||
return super.loadClass(fullName, resolve);
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
package org.apache.kafka.connect.runtime.isolation;
|
||||
|
||||
import org.apache.kafka.common.Configurable;
|
||||
import org.apache.kafka.common.KafkaException;
|
||||
import org.apache.kafka.common.config.AbstractConfig;
|
||||
import org.apache.kafka.common.config.provider.ConfigProvider;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
|
@ -72,13 +71,37 @@ public class Plugins {
|
|||
}
|
||||
|
||||
protected static <T> T newPlugin(Class<T> klass) {
|
||||
// KAFKA-8340: The thread classloader is used during static initialization and must be
|
||||
// set to the plugin's classloader during instantiation
|
||||
ClassLoader savedLoader = compareAndSwapLoaders(klass.getClassLoader());
|
||||
try {
|
||||
return Utils.newInstance(klass);
|
||||
} catch (Throwable t) {
|
||||
throw new ConnectException("Instantiation error", t);
|
||||
} finally {
|
||||
compareAndSwapLoaders(savedLoader);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected <U> Class<? extends U> pluginClassFromConfig(
|
||||
AbstractConfig config,
|
||||
String propertyName,
|
||||
Class<U> pluginClass,
|
||||
Collection<PluginDesc<U>> plugins
|
||||
) {
|
||||
Class<?> klass = config.getClass(propertyName);
|
||||
if (pluginClass.isAssignableFrom(klass)) {
|
||||
return (Class<? extends U>) klass;
|
||||
}
|
||||
throw new ConnectException(
|
||||
"Failed to find any class that implements " + pluginClass.getSimpleName()
|
||||
+ " for the config "
|
||||
+ propertyName + ", available classes are: "
|
||||
+ pluginNames(plugins)
|
||||
);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected static <U> Class<? extends U> pluginClass(
|
||||
DelegatingClassLoader loader,
|
||||
|
@ -215,18 +238,17 @@ public class Plugins {
|
|||
// it does not represent an internal converter (which has a default available)
|
||||
return null;
|
||||
}
|
||||
Converter plugin = null;
|
||||
Class<? extends Converter> klass = null;
|
||||
switch (classLoaderUsage) {
|
||||
case CURRENT_CLASSLOADER:
|
||||
// Attempt to load first with the current classloader, and plugins as a fallback.
|
||||
// Note: we can't use config.getConfiguredInstance because Converter doesn't implement Configurable, and even if it did
|
||||
// we have to remove the property prefixes before calling config(...) and we still always want to call Converter.config.
|
||||
plugin = getInstance(config, classPropertyName, Converter.class);
|
||||
klass = pluginClassFromConfig(config, classPropertyName, Converter.class, delegatingLoader.converters());
|
||||
break;
|
||||
case PLUGINS:
|
||||
// Attempt to load with the plugin class loader, which uses the current classloader as a fallback
|
||||
String converterClassOrAlias = config.getClass(classPropertyName).getName();
|
||||
Class<? extends Converter> klass;
|
||||
try {
|
||||
klass = pluginClass(delegatingLoader, converterClassOrAlias, Converter.class);
|
||||
} catch (ClassNotFoundException e) {
|
||||
|
@ -236,11 +258,10 @@ public class Plugins {
|
|||
+ pluginNames(delegatingLoader.converters())
|
||||
);
|
||||
}
|
||||
plugin = newPlugin(klass);
|
||||
break;
|
||||
}
|
||||
if (plugin == null) {
|
||||
throw new ConnectException("Unable to instantiate the Converter specified in '" + classPropertyName + "'");
|
||||
if (klass == null) {
|
||||
throw new ConnectException("Unable to initialize the Converter specified in '" + classPropertyName + "'");
|
||||
}
|
||||
|
||||
// Determine whether this is a key or value converter based upon the supplied property name ...
|
||||
|
@ -257,7 +278,7 @@ public class Plugins {
|
|||
// Have to override schemas.enable from true to false for internal JSON converters
|
||||
// Don't have to warn the user about anything since all deprecation warnings take place in the
|
||||
// WorkerConfig class
|
||||
if (plugin instanceof JsonConverter && isInternalConverter(classPropertyName)) {
|
||||
if (JsonConverter.class.isAssignableFrom(klass) && isInternalConverter(classPropertyName)) {
|
||||
// If they haven't explicitly specified values for internal.key.converter.schemas.enable
|
||||
// or internal.value.converter.schemas.enable, we can safely default them to false
|
||||
if (!converterConfig.containsKey(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG)) {
|
||||
|
@ -265,7 +286,14 @@ public class Plugins {
|
|||
}
|
||||
}
|
||||
|
||||
plugin.configure(converterConfig, isKeyConverter);
|
||||
Converter plugin;
|
||||
ClassLoader savedLoader = compareAndSwapLoaders(klass.getClassLoader());
|
||||
try {
|
||||
plugin = newPlugin(klass);
|
||||
plugin.configure(converterConfig, isKeyConverter);
|
||||
} finally {
|
||||
compareAndSwapLoaders(savedLoader);
|
||||
}
|
||||
return plugin;
|
||||
}
|
||||
|
||||
|
@ -280,7 +308,7 @@ public class Plugins {
|
|||
* @throws ConnectException if the {@link HeaderConverter} implementation class could not be found
|
||||
*/
|
||||
public HeaderConverter newHeaderConverter(AbstractConfig config, String classPropertyName, ClassLoaderUsage classLoaderUsage) {
|
||||
HeaderConverter plugin = null;
|
||||
Class<? extends HeaderConverter> klass = null;
|
||||
switch (classLoaderUsage) {
|
||||
case CURRENT_CLASSLOADER:
|
||||
if (!config.originals().containsKey(classPropertyName)) {
|
||||
|
@ -290,13 +318,12 @@ public class Plugins {
|
|||
// Attempt to load first with the current classloader, and plugins as a fallback.
|
||||
// Note: we can't use config.getConfiguredInstance because we have to remove the property prefixes
|
||||
// before calling config(...)
|
||||
plugin = getInstance(config, classPropertyName, HeaderConverter.class);
|
||||
klass = pluginClassFromConfig(config, classPropertyName, HeaderConverter.class, delegatingLoader.headerConverters());
|
||||
break;
|
||||
case PLUGINS:
|
||||
// Attempt to load with the plugin class loader, which uses the current classloader as a fallback.
|
||||
// Note that there will always be at least a default header converter for the worker
|
||||
String converterClassOrAlias = config.getClass(classPropertyName).getName();
|
||||
Class<? extends HeaderConverter> klass;
|
||||
try {
|
||||
klass = pluginClass(
|
||||
delegatingLoader,
|
||||
|
@ -311,17 +338,24 @@ public class Plugins {
|
|||
+ pluginNames(delegatingLoader.headerConverters())
|
||||
);
|
||||
}
|
||||
plugin = newPlugin(klass);
|
||||
}
|
||||
if (plugin == null) {
|
||||
throw new ConnectException("Unable to instantiate the Converter specified in '" + classPropertyName + "'");
|
||||
if (klass == null) {
|
||||
throw new ConnectException("Unable to initialize the HeaderConverter specified in '" + classPropertyName + "'");
|
||||
}
|
||||
|
||||
String configPrefix = classPropertyName + ".";
|
||||
Map<String, Object> converterConfig = config.originalsWithPrefix(configPrefix);
|
||||
converterConfig.put(ConverterConfig.TYPE_CONFIG, ConverterType.HEADER.getName());
|
||||
log.debug("Configuring the header converter with configuration keys:{}{}", System.lineSeparator(), converterConfig.keySet());
|
||||
plugin.configure(converterConfig);
|
||||
|
||||
HeaderConverter plugin;
|
||||
ClassLoader savedLoader = compareAndSwapLoaders(klass.getClassLoader());
|
||||
try {
|
||||
plugin = newPlugin(klass);
|
||||
plugin.configure(converterConfig);
|
||||
} finally {
|
||||
compareAndSwapLoaders(savedLoader);
|
||||
}
|
||||
return plugin;
|
||||
}
|
||||
|
||||
|
@ -332,16 +366,15 @@ public class Plugins {
|
|||
// This configuration does not define the config provider via the specified property name
|
||||
return null;
|
||||
}
|
||||
ConfigProvider plugin = null;
|
||||
Class<? extends ConfigProvider> klass = null;
|
||||
switch (classLoaderUsage) {
|
||||
case CURRENT_CLASSLOADER:
|
||||
// Attempt to load first with the current classloader, and plugins as a fallback.
|
||||
plugin = getInstance(config, classPropertyName, ConfigProvider.class);
|
||||
klass = pluginClassFromConfig(config, classPropertyName, ConfigProvider.class, delegatingLoader.configProviders());
|
||||
break;
|
||||
case PLUGINS:
|
||||
// Attempt to load with the plugin class loader, which uses the current classloader as a fallback
|
||||
String configProviderClassOrAlias = originalConfig.get(classPropertyName);
|
||||
Class<? extends ConfigProvider> klass;
|
||||
try {
|
||||
klass = pluginClass(delegatingLoader, configProviderClassOrAlias, ConfigProvider.class);
|
||||
} catch (ClassNotFoundException e) {
|
||||
|
@ -351,17 +384,24 @@ public class Plugins {
|
|||
+ pluginNames(delegatingLoader.configProviders())
|
||||
);
|
||||
}
|
||||
plugin = newPlugin(klass);
|
||||
break;
|
||||
}
|
||||
if (plugin == null) {
|
||||
throw new ConnectException("Unable to instantiate the ConfigProvider specified in '" + classPropertyName + "'");
|
||||
if (klass == null) {
|
||||
throw new ConnectException("Unable to initialize the ConfigProvider specified in '" + classPropertyName + "'");
|
||||
}
|
||||
|
||||
// Configure the ConfigProvider
|
||||
String configPrefix = providerPrefix + ".param.";
|
||||
Map<String, Object> configProviderConfig = config.originalsWithPrefix(configPrefix);
|
||||
plugin.configure(configProviderConfig);
|
||||
|
||||
ConfigProvider plugin;
|
||||
ClassLoader savedLoader = compareAndSwapLoaders(klass.getClassLoader());
|
||||
try {
|
||||
plugin = newPlugin(klass);
|
||||
plugin.configure(configProviderConfig);
|
||||
} finally {
|
||||
compareAndSwapLoaders(savedLoader);
|
||||
}
|
||||
return plugin;
|
||||
}
|
||||
|
||||
|
@ -395,42 +435,25 @@ public class Plugins {
|
|||
+ "name matches %s", pluginKlass, klassName);
|
||||
throw new ConnectException(msg);
|
||||
}
|
||||
plugin = newPlugin(klass);
|
||||
if (plugin == null) {
|
||||
throw new ConnectException("Unable to instantiate '" + klassName + "'");
|
||||
}
|
||||
if (plugin instanceof Versioned) {
|
||||
Versioned versionedPlugin = (Versioned) plugin;
|
||||
if (versionedPlugin.version() == null || versionedPlugin.version().trim().isEmpty()) {
|
||||
throw new ConnectException("Version not defined for '" + klassName + "'");
|
||||
ClassLoader savedLoader = compareAndSwapLoaders(klass.getClassLoader());
|
||||
try {
|
||||
plugin = newPlugin(klass);
|
||||
if (plugin instanceof Versioned) {
|
||||
Versioned versionedPlugin = (Versioned) plugin;
|
||||
if (versionedPlugin.version() == null || versionedPlugin.version().trim()
|
||||
.isEmpty()) {
|
||||
throw new ConnectException("Version not defined for '" + klassName + "'");
|
||||
}
|
||||
}
|
||||
}
|
||||
if (plugin instanceof Configurable) {
|
||||
((Configurable) plugin).configure(config.originals());
|
||||
if (plugin instanceof Configurable) {
|
||||
((Configurable) plugin).configure(config.originals());
|
||||
}
|
||||
} finally {
|
||||
compareAndSwapLoaders(savedLoader);
|
||||
}
|
||||
return plugin;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get an instance of the give class specified by the given configuration key.
|
||||
*
|
||||
* @param key The configuration key for the class
|
||||
* @param t The interface the class should implement
|
||||
* @return A instance of the class
|
||||
*/
|
||||
private <T> T getInstance(AbstractConfig config, String key, Class<T> t) {
|
||||
Class<?> c = config.getClass(key);
|
||||
if (c == null) {
|
||||
return null;
|
||||
}
|
||||
// Instantiate the class, but we don't know if the class extends the supplied type
|
||||
Object o = Utils.newInstance(c);
|
||||
if (!t.isInstance(o)) {
|
||||
throw new KafkaException(c.getName() + " is not an instance of " + t.getName());
|
||||
}
|
||||
return t.cast(o);
|
||||
}
|
||||
|
||||
public <R extends ConnectRecord<R>> Transformation<R> newTranformations(
|
||||
String transformationClassOrAlias
|
||||
) {
|
||||
|
|
|
@ -17,8 +17,10 @@
|
|||
|
||||
package org.apache.kafka.connect.runtime.isolation;
|
||||
|
||||
import java.util.Collections;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
|
@ -38,4 +40,25 @@ public class DelegatingClassLoaderTest {
|
|||
DelegatingClassLoader.serviceLoaderManifestForPlugin("META-INF/services/org.apache.kafka.connect.transforms.Transformation"));
|
||||
assertFalse(DelegatingClassLoader.serviceLoaderManifestForPlugin("resource/version.properties"));
|
||||
}
|
||||
|
||||
@Test(expected = ClassNotFoundException.class)
|
||||
public void testLoadingUnloadedPluginClass() throws ClassNotFoundException {
|
||||
TestPlugins.assertAvailable();
|
||||
DelegatingClassLoader classLoader = new DelegatingClassLoader(Collections.emptyList());
|
||||
classLoader.initLoaders();
|
||||
for (String pluginClassName : TestPlugins.pluginClasses()) {
|
||||
classLoader.loadClass(pluginClassName);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLoadingPluginClass() throws ClassNotFoundException {
|
||||
TestPlugins.assertAvailable();
|
||||
DelegatingClassLoader classLoader = new DelegatingClassLoader(TestPlugins.pluginPath());
|
||||
classLoader.initLoaders();
|
||||
for (String pluginClassName : TestPlugins.pluginClasses()) {
|
||||
assertNotNull(classLoader.loadClass(pluginClassName));
|
||||
assertNotNull(classLoader.pluginClassLoader(pluginClassName));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,11 +17,16 @@
|
|||
|
||||
package org.apache.kafka.connect.runtime.isolation;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map.Entry;
|
||||
import org.apache.kafka.common.Configurable;
|
||||
import org.apache.kafka.common.config.AbstractConfig;
|
||||
import org.apache.kafka.common.config.ConfigDef;
|
||||
import org.apache.kafka.common.config.ConfigException;
|
||||
import org.apache.kafka.common.config.provider.ConfigProvider;
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.SchemaAndValue;
|
||||
import org.apache.kafka.connect.errors.ConnectException;
|
||||
import org.apache.kafka.connect.json.JsonConverter;
|
||||
import org.apache.kafka.connect.json.JsonConverterConfig;
|
||||
import org.apache.kafka.connect.rest.ConnectRestExtension;
|
||||
|
@ -34,7 +39,6 @@ import org.apache.kafka.connect.storage.ConverterType;
|
|||
import org.apache.kafka.connect.storage.HeaderConverter;
|
||||
import org.apache.kafka.connect.storage.SimpleHeaderConverter;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -45,31 +49,26 @@ import java.util.Map;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class PluginsTest {
|
||||
|
||||
private static Map<String, String> pluginProps;
|
||||
private static Plugins plugins;
|
||||
private Plugins plugins;
|
||||
private Map<String, String> props;
|
||||
private AbstractConfig config;
|
||||
private TestConverter converter;
|
||||
private TestHeaderConverter headerConverter;
|
||||
private TestInternalConverter internalConverter;
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeAll() {
|
||||
pluginProps = new HashMap<>();
|
||||
|
||||
// Set up the plugins to have no additional plugin directories.
|
||||
// This won't allow us to test classpath isolation, but it will allow us to test some of the utility methods.
|
||||
pluginProps.put(WorkerConfig.PLUGIN_PATH_CONFIG, "");
|
||||
plugins = new Plugins(pluginProps);
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Before
|
||||
public void setup() {
|
||||
Map<String, String> pluginProps = new HashMap<>();
|
||||
|
||||
// Set up the plugins with some test plugins to test isolation
|
||||
pluginProps.put(WorkerConfig.PLUGIN_PATH_CONFIG, String.join(",", TestPlugins.pluginPath()));
|
||||
plugins = new Plugins(pluginProps);
|
||||
props = new HashMap<>(pluginProps);
|
||||
props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
|
||||
props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
|
||||
|
@ -185,6 +184,186 @@ public class PluginsTest {
|
|||
assertTrue(headerConverter instanceof SimpleHeaderConverter);
|
||||
}
|
||||
|
||||
@Test(expected = ConnectException.class)
|
||||
public void shouldThrowIfPluginThrows() {
|
||||
TestPlugins.assertAvailable();
|
||||
|
||||
plugins.newPlugin(
|
||||
TestPlugins.ALWAYS_THROW_EXCEPTION,
|
||||
new AbstractConfig(new ConfigDef(), Collections.emptyMap()),
|
||||
Converter.class
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldShareStaticValuesBetweenSamePlugin() {
|
||||
// Plugins are not isolated from other instances of their own class.
|
||||
TestPlugins.assertAvailable();
|
||||
Converter firstPlugin = plugins.newPlugin(
|
||||
TestPlugins.ALIASED_STATIC_FIELD,
|
||||
new AbstractConfig(new ConfigDef(), Collections.emptyMap()),
|
||||
Converter.class
|
||||
);
|
||||
|
||||
assertInstanceOf(SamplingTestPlugin.class, firstPlugin, "Cannot collect samples");
|
||||
|
||||
Converter secondPlugin = plugins.newPlugin(
|
||||
TestPlugins.ALIASED_STATIC_FIELD,
|
||||
new AbstractConfig(new ConfigDef(), Collections.emptyMap()),
|
||||
Converter.class
|
||||
);
|
||||
|
||||
assertInstanceOf(SamplingTestPlugin.class, secondPlugin, "Cannot collect samples");
|
||||
assertSame(
|
||||
((SamplingTestPlugin) firstPlugin).otherSamples(),
|
||||
((SamplingTestPlugin) secondPlugin).otherSamples()
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void newPluginShouldServiceLoadWithPluginClassLoader() {
|
||||
TestPlugins.assertAvailable();
|
||||
Converter plugin = plugins.newPlugin(
|
||||
TestPlugins.SERVICE_LOADER,
|
||||
new AbstractConfig(new ConfigDef(), Collections.emptyMap()),
|
||||
Converter.class
|
||||
);
|
||||
|
||||
assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples");
|
||||
Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin).flatten();
|
||||
// Assert that the service loaded subclass is found in both environments
|
||||
assertTrue(samples.containsKey("ServiceLoadedSubclass.static"));
|
||||
assertTrue(samples.containsKey("ServiceLoadedSubclass.dynamic"));
|
||||
assertPluginClassLoaderAlwaysActive(samples);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void newPluginShouldInstantiateWithPluginClassLoader() {
|
||||
TestPlugins.assertAvailable();
|
||||
Converter plugin = plugins.newPlugin(
|
||||
TestPlugins.ALIASED_STATIC_FIELD,
|
||||
new AbstractConfig(new ConfigDef(), Collections.emptyMap()),
|
||||
Converter.class
|
||||
);
|
||||
|
||||
assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples");
|
||||
Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin).flatten();
|
||||
assertPluginClassLoaderAlwaysActive(samples);
|
||||
}
|
||||
|
||||
@Test(expected = ConfigException.class)
|
||||
public void shouldFailToFindConverterInCurrentClassloader() {
|
||||
TestPlugins.assertAvailable();
|
||||
props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, TestPlugins.SAMPLING_CONVERTER);
|
||||
createConfig();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void newConverterShouldConfigureWithPluginClassLoader() {
|
||||
TestPlugins.assertAvailable();
|
||||
props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, TestPlugins.SAMPLING_CONVERTER);
|
||||
ClassLoader classLoader = plugins.delegatingLoader().pluginClassLoader(TestPlugins.SAMPLING_CONVERTER);
|
||||
ClassLoader savedLoader = Plugins.compareAndSwapLoaders(classLoader);
|
||||
createConfig();
|
||||
Plugins.compareAndSwapLoaders(savedLoader);
|
||||
|
||||
Converter plugin = plugins.newConverter(
|
||||
config,
|
||||
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
|
||||
ClassLoaderUsage.PLUGINS
|
||||
);
|
||||
|
||||
assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples");
|
||||
Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin).flatten();
|
||||
assertTrue(samples.containsKey("configure"));
|
||||
assertPluginClassLoaderAlwaysActive(samples);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void newConfigProviderShouldConfigureWithPluginClassLoader() {
|
||||
TestPlugins.assertAvailable();
|
||||
String providerPrefix = "some.provider";
|
||||
props.put(providerPrefix + ".class", TestPlugins.SAMPLING_CONFIG_PROVIDER);
|
||||
|
||||
PluginClassLoader classLoader = plugins.delegatingLoader().pluginClassLoader(TestPlugins.SAMPLING_CONFIG_PROVIDER);
|
||||
assertNotNull(classLoader);
|
||||
ClassLoader savedLoader = Plugins.compareAndSwapLoaders(classLoader);
|
||||
createConfig();
|
||||
Plugins.compareAndSwapLoaders(savedLoader);
|
||||
|
||||
ConfigProvider plugin = plugins.newConfigProvider(
|
||||
config,
|
||||
providerPrefix,
|
||||
ClassLoaderUsage.PLUGINS
|
||||
);
|
||||
|
||||
assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples");
|
||||
Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin).flatten();
|
||||
assertTrue(samples.containsKey("configure"));
|
||||
assertPluginClassLoaderAlwaysActive(samples);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void newHeaderConverterShouldConfigureWithPluginClassLoader() {
|
||||
TestPlugins.assertAvailable();
|
||||
props.put(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, TestPlugins.SAMPLING_HEADER_CONVERTER);
|
||||
ClassLoader classLoader = plugins.delegatingLoader().pluginClassLoader(TestPlugins.SAMPLING_HEADER_CONVERTER);
|
||||
ClassLoader savedLoader = Plugins.compareAndSwapLoaders(classLoader);
|
||||
createConfig();
|
||||
Plugins.compareAndSwapLoaders(savedLoader);
|
||||
|
||||
HeaderConverter plugin = plugins.newHeaderConverter(
|
||||
config,
|
||||
WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
|
||||
ClassLoaderUsage.PLUGINS
|
||||
);
|
||||
|
||||
assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples");
|
||||
Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin).flatten();
|
||||
assertTrue(samples.containsKey("configure")); // HeaderConverter::configure was called
|
||||
assertPluginClassLoaderAlwaysActive(samples);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void newPluginsShouldConfigureWithPluginClassLoader() {
|
||||
TestPlugins.assertAvailable();
|
||||
List<Configurable> configurables = plugins.newPlugins(
|
||||
Collections.singletonList(TestPlugins.SAMPLING_CONFIGURABLE),
|
||||
config,
|
||||
Configurable.class
|
||||
);
|
||||
assertEquals(1, configurables.size());
|
||||
Configurable plugin = configurables.get(0);
|
||||
|
||||
assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples");
|
||||
Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin).flatten();
|
||||
assertTrue(samples.containsKey("configure")); // Configurable::configure was called
|
||||
assertPluginClassLoaderAlwaysActive(samples);
|
||||
}
|
||||
|
||||
public static void assertPluginClassLoaderAlwaysActive(Map<String, SamplingTestPlugin> samples) {
|
||||
for (Entry<String, SamplingTestPlugin> e : samples.entrySet()) {
|
||||
String sampleName = "\"" + e.getKey() + "\" (" + e.getValue() + ")";
|
||||
assertInstanceOf(
|
||||
PluginClassLoader.class,
|
||||
e.getValue().staticClassloader(),
|
||||
sampleName + " has incorrect static classloader"
|
||||
);
|
||||
assertInstanceOf(
|
||||
PluginClassLoader.class,
|
||||
e.getValue().classloader(),
|
||||
sampleName + " has incorrect dynamic classloader"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public static void assertInstanceOf(Class<?> expected, Object actual, String message) {
|
||||
assertTrue(
|
||||
"Expected an instance of " + expected.getSimpleName() + ", found " + actual + " instead: " + message,
|
||||
expected.isInstance(actual)
|
||||
);
|
||||
}
|
||||
|
||||
protected void instantiateAndConfigureConverter(String configPropName, ClassLoaderUsage classLoaderUsage) {
|
||||
converter = (TestConverter) plugins.newConverter(config, configPropName, classLoaderUsage);
|
||||
assertNotNull(converter);
|
||||
|
|
|
@ -0,0 +1,122 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.connect.runtime.isolation;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
/**
|
||||
* Base class for plugins so we can sample information about their initialization
|
||||
*/
|
||||
public abstract class SamplingTestPlugin {
|
||||
|
||||
/**
|
||||
* @return the ClassLoader used to statically initialize this plugin class
|
||||
*/
|
||||
public abstract ClassLoader staticClassloader();
|
||||
|
||||
/**
|
||||
* @return the ClassLoader used to initialize this plugin instance
|
||||
*/
|
||||
public abstract ClassLoader classloader();
|
||||
|
||||
/**
|
||||
* @return a group of other SamplingTestPlugin instances known by this plugin
|
||||
* This should only return direct children, and not reference this instance directly
|
||||
*/
|
||||
public Map<String, SamplingTestPlugin> otherSamples() {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return a flattened list of child samples including this entry keyed as "this"
|
||||
*/
|
||||
public Map<String, SamplingTestPlugin> flatten() {
|
||||
Map<String, SamplingTestPlugin> out = new HashMap<>();
|
||||
Map<String, SamplingTestPlugin> otherSamples = otherSamples();
|
||||
if (otherSamples != null) {
|
||||
for (Entry<String, SamplingTestPlugin> child : otherSamples.entrySet()) {
|
||||
for (Entry<String, SamplingTestPlugin> flattened : child.getValue().flatten().entrySet()) {
|
||||
String key = child.getKey();
|
||||
if (flattened.getKey().length() > 0) {
|
||||
key += "." + flattened.getKey();
|
||||
}
|
||||
out.put(key, flattened.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
out.put("", this);
|
||||
return out;
|
||||
}
|
||||
|
||||
/**
|
||||
* Log the parent method call as a child sample.
|
||||
* Stores only the last invocation of each method if there are multiple invocations.
|
||||
* @param samples The collection of samples to which this method call should be added
|
||||
*/
|
||||
public void logMethodCall(Map<String, SamplingTestPlugin> samples) {
|
||||
StackTraceElement[] stackTraces = Thread.currentThread().getStackTrace();
|
||||
if (stackTraces.length < 2) {
|
||||
return;
|
||||
}
|
||||
// 0 is inside getStackTrace
|
||||
// 1 is this method
|
||||
// 2 is our caller method
|
||||
StackTraceElement caller = stackTraces[2];
|
||||
|
||||
samples.put(caller.getMethodName(), new MethodCallSample(
|
||||
caller,
|
||||
Thread.currentThread().getContextClassLoader(),
|
||||
getClass().getClassLoader()
|
||||
));
|
||||
}
|
||||
|
||||
public static class MethodCallSample extends SamplingTestPlugin {
|
||||
|
||||
private final StackTraceElement caller;
|
||||
private final ClassLoader staticClassLoader;
|
||||
private final ClassLoader dynamicClassLoader;
|
||||
|
||||
public MethodCallSample(
|
||||
StackTraceElement caller,
|
||||
ClassLoader staticClassLoader,
|
||||
ClassLoader dynamicClassLoader
|
||||
) {
|
||||
this.caller = caller;
|
||||
this.staticClassLoader = staticClassLoader;
|
||||
this.dynamicClassLoader = dynamicClassLoader;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClassLoader staticClassloader() {
|
||||
return staticClassLoader;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClassLoader classloader() {
|
||||
return dynamicClassLoader;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return caller.toString();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,267 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.connect.runtime.isolation;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.StringWriter;
|
||||
import java.net.URL;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.jar.Attributes;
|
||||
import java.util.jar.JarEntry;
|
||||
import java.util.jar.JarOutputStream;
|
||||
import java.util.jar.Manifest;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.tools.JavaCompiler;
|
||||
import javax.tools.StandardJavaFileManager;
|
||||
import javax.tools.ToolProvider;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Utility class for constructing test plugins for Connect.
|
||||
*
|
||||
* <p>Plugins are built from their source under resources/test-plugins/ and placed into temporary
|
||||
* jar files that are deleted when the process exits.
|
||||
*
|
||||
* <p>To add a plugin, create the source files in the resource tree, and edit this class to build
|
||||
* that plugin during initialization. For example, the plugin class {@literal package.Class} should
|
||||
* be placed in {@literal resources/test-plugins/something/package/Class.java} and loaded using
|
||||
* {@code createPluginJar("something")}. The class name, contents, and plugin directory can take
|
||||
* any value you need for testing.
|
||||
*
|
||||
* <p>To use this class in your tests, make sure to first call
|
||||
* {@link TestPlugins#assertAvailable()} to verify that the plugins initialized correctly.
|
||||
* Otherwise, exceptions during the plugin build are not propagated, and may invalidate your test.
|
||||
* You can access the list of plugin jars for assembling a {@literal plugin.path}, and reference
|
||||
* the names of the different plugins directly via the exposed constants.
|
||||
*/
|
||||
public class TestPlugins {
|
||||
|
||||
/**
|
||||
* Class name of a plugin which will always throw an exception during loading
|
||||
*/
|
||||
public static final String ALWAYS_THROW_EXCEPTION = "test.plugins.AlwaysThrowException";
|
||||
/**
|
||||
* Class name of a plugin which samples information about its initialization.
|
||||
*/
|
||||
public static final String ALIASED_STATIC_FIELD = "test.plugins.AliasedStaticField";
|
||||
/**
|
||||
* Class name of a {@link org.apache.kafka.connect.storage.Converter}
|
||||
* which samples information about its method calls.
|
||||
*/
|
||||
public static final String SAMPLING_CONVERTER = "test.plugins.SamplingConverter";
|
||||
/**
|
||||
* Class name of a {@link org.apache.kafka.common.Configurable}
|
||||
* which samples information about its method calls.
|
||||
*/
|
||||
public static final String SAMPLING_CONFIGURABLE = "test.plugins.SamplingConfigurable";
|
||||
/**
|
||||
* Class name of a {@link org.apache.kafka.connect.storage.HeaderConverter}
|
||||
* which samples information about its method calls.
|
||||
*/
|
||||
public static final String SAMPLING_HEADER_CONVERTER = "test.plugins.SamplingHeaderConverter";
|
||||
/**
|
||||
* Class name of a {@link org.apache.kafka.common.config.provider.ConfigProvider}
|
||||
* which samples information about its method calls.
|
||||
*/
|
||||
public static final String SAMPLING_CONFIG_PROVIDER = "test.plugins.SamplingConfigProvider";
|
||||
/**
|
||||
* Class name of a plugin which uses a {@link java.util.ServiceLoader}
|
||||
* to load internal classes, and samples information about their initialization.
|
||||
*/
|
||||
public static final String SERVICE_LOADER = "test.plugins.ServiceLoaderPlugin";
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(TestPlugins.class);
|
||||
private static final Map<String, File> PLUGIN_JARS;
|
||||
private static final Throwable INITIALIZATION_EXCEPTION;
|
||||
|
||||
static {
|
||||
Throwable err = null;
|
||||
HashMap<String, File> pluginJars = new HashMap<>();
|
||||
try {
|
||||
pluginJars.put(ALWAYS_THROW_EXCEPTION, createPluginJar("always-throw-exception"));
|
||||
pluginJars.put(ALIASED_STATIC_FIELD, createPluginJar("aliased-static-field"));
|
||||
pluginJars.put(SAMPLING_CONVERTER, createPluginJar("sampling-converter"));
|
||||
pluginJars.put(SAMPLING_CONFIGURABLE, createPluginJar("sampling-configurable"));
|
||||
pluginJars.put(SAMPLING_HEADER_CONVERTER, createPluginJar("sampling-header-converter"));
|
||||
pluginJars.put(SAMPLING_CONFIG_PROVIDER, createPluginJar("sampling-config-provider"));
|
||||
pluginJars.put(SERVICE_LOADER, createPluginJar("service-loader"));
|
||||
} catch (Throwable e) {
|
||||
log.error("Could not set up plugin test jars", e);
|
||||
err = e;
|
||||
}
|
||||
PLUGIN_JARS = Collections.unmodifiableMap(pluginJars);
|
||||
INITIALIZATION_EXCEPTION = err;
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure that the test plugin JARs were assembled without error before continuing.
|
||||
* @throws AssertionError if any plugin failed to load, or no plugins were loaded.
|
||||
*/
|
||||
public static void assertAvailable() throws AssertionError {
|
||||
if (INITIALIZATION_EXCEPTION != null) {
|
||||
throw new AssertionError("TestPlugins did not initialize completely",
|
||||
INITIALIZATION_EXCEPTION);
|
||||
}
|
||||
if (PLUGIN_JARS.isEmpty()) {
|
||||
throw new AssertionError("No test plugins loaded");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A list of jar files containing test plugins
|
||||
* @return A list of plugin jar filenames
|
||||
*/
|
||||
public static List<String> pluginPath() {
|
||||
return PLUGIN_JARS.values()
|
||||
.stream()
|
||||
.map(File::getPath)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all of the classes that were successfully built by this class
|
||||
* @return A list of plugin class names
|
||||
*/
|
||||
public static List<String> pluginClasses() {
|
||||
return new ArrayList<>(PLUGIN_JARS.keySet());
|
||||
}
|
||||
|
||||
private static File createPluginJar(String resourceDir) throws IOException {
|
||||
Path inputDir = resourceDirectoryPath("test-plugins/" + resourceDir);
|
||||
Path binDir = Files.createTempDirectory(resourceDir + ".bin.");
|
||||
compileJavaSources(inputDir, binDir);
|
||||
File jarFile = Files.createTempFile(resourceDir + ".", ".jar").toFile();
|
||||
try (JarOutputStream jar = openJarFile(jarFile)) {
|
||||
writeJar(jar, inputDir);
|
||||
writeJar(jar, binDir);
|
||||
}
|
||||
removeDirectory(binDir);
|
||||
jarFile.deleteOnExit();
|
||||
return jarFile;
|
||||
}
|
||||
|
||||
private static Path resourceDirectoryPath(String resourceDir) throws IOException {
|
||||
URL resource = Thread.currentThread()
|
||||
.getContextClassLoader()
|
||||
.getResource(resourceDir);
|
||||
if (resource == null) {
|
||||
throw new IOException("Could not find test plugin resource: " + resourceDir);
|
||||
}
|
||||
File file = new File(resource.getFile());
|
||||
if (!file.isDirectory()) {
|
||||
throw new IOException("Resource is not a directory: " + resourceDir);
|
||||
}
|
||||
if (!file.canRead()) {
|
||||
throw new IOException("Resource directory is not readable: " + resourceDir);
|
||||
}
|
||||
return file.toPath();
|
||||
}
|
||||
|
||||
private static JarOutputStream openJarFile(File jarFile) throws IOException {
|
||||
Manifest manifest = new Manifest();
|
||||
manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
|
||||
return new JarOutputStream(new FileOutputStream(jarFile), manifest);
|
||||
}
|
||||
|
||||
private static void removeDirectory(Path binDir) throws IOException {
|
||||
List<File> classFiles = Files.walk(binDir)
|
||||
.sorted(Comparator.reverseOrder())
|
||||
.map(Path::toFile)
|
||||
.collect(Collectors.toList());
|
||||
for (File classFile : classFiles) {
|
||||
if (!classFile.delete()) {
|
||||
throw new IOException("Could not delete: " + classFile);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Compile a directory of .java source files into .class files
|
||||
* .class files are placed into the same directory as their sources.
|
||||
*
|
||||
* <p>Dependencies between source files in this directory are resolved against one another
|
||||
* and the classes present in the test environment.
|
||||
* See https://stackoverflow.com/questions/1563909/ for more information.
|
||||
* Additional dependencies in your plugins should be added as test scope to :connect:runtime.
|
||||
* @param sourceDir Directory containing java source files
|
||||
* @throws IOException if the files cannot be compiled
|
||||
*/
|
||||
private static void compileJavaSources(Path sourceDir, Path binDir) throws IOException {
|
||||
JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
|
||||
List<File> sourceFiles = Files.walk(sourceDir)
|
||||
.filter(Files::isRegularFile)
|
||||
.filter(path -> path.toFile().getName().endsWith(".java"))
|
||||
.map(Path::toFile)
|
||||
.collect(Collectors.toList());
|
||||
StringWriter writer = new StringWriter();
|
||||
List<String> options = Arrays.asList(
|
||||
"-d", binDir.toString() // Write class output to a different directory.
|
||||
);
|
||||
|
||||
try (StandardJavaFileManager fileManager = compiler.getStandardFileManager(null, null, null)) {
|
||||
boolean success = compiler.getTask(
|
||||
writer,
|
||||
fileManager,
|
||||
null,
|
||||
options,
|
||||
null,
|
||||
fileManager.getJavaFileObjectsFromFiles(sourceFiles)
|
||||
).call();
|
||||
if (!success) {
|
||||
throw new RuntimeException("Failed to compile test plugin:\n" + writer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void writeJar(JarOutputStream jar, Path inputDir) throws IOException {
|
||||
List<Path> paths = Files.walk(inputDir)
|
||||
.filter(Files::isRegularFile)
|
||||
.filter(path -> !path.toFile().getName().endsWith(".java"))
|
||||
.collect(Collectors.toList());
|
||||
for (Path path : paths) {
|
||||
try (InputStream in = new BufferedInputStream(new FileInputStream(path.toFile()))) {
|
||||
jar.putNextEntry(new JarEntry(
|
||||
inputDir.relativize(path)
|
||||
.toFile()
|
||||
.getPath()
|
||||
.replace(File.separator, "/")
|
||||
));
|
||||
byte[] buffer = new byte[1024];
|
||||
for (int count; (count = in.read(buffer)) != -1; ) {
|
||||
jar.write(buffer, 0, count);
|
||||
}
|
||||
jar.closeEntry();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,75 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package test.plugins;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.SchemaAndValue;
|
||||
import org.apache.kafka.connect.storage.Converter;
|
||||
import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
|
||||
|
||||
/**
|
||||
* Samples data about its initialization environment for later analysis
|
||||
* Samples are shared between instances of the same class in a static variable
|
||||
*/
|
||||
public class AliasedStaticField extends SamplingTestPlugin implements Converter {
|
||||
|
||||
private static final Map<String, SamplingTestPlugin> SAMPLES;
|
||||
private static final ClassLoader STATIC_CLASS_LOADER;
|
||||
private final ClassLoader classloader;
|
||||
|
||||
static {
|
||||
SAMPLES = new HashMap<>();
|
||||
STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader();
|
||||
}
|
||||
|
||||
{
|
||||
classloader = Thread.currentThread().getContextClassLoader();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(final Map<String, ?> configs, final boolean isKey) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] fromConnectData(final String topic, final Schema schema, final Object value) {
|
||||
return new byte[0];
|
||||
}
|
||||
|
||||
@Override
|
||||
public SchemaAndValue toConnectData(final String topic, final byte[] value) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClassLoader staticClassloader() {
|
||||
return STATIC_CLASS_LOADER;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClassLoader classloader() {
|
||||
return classloader;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, SamplingTestPlugin> otherSamples() {
|
||||
return SAMPLES;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,53 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package test.plugins;
|
||||
|
||||
import java.util.Map;
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.SchemaAndValue;
|
||||
import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
|
||||
import org.apache.kafka.connect.storage.Converter;
|
||||
|
||||
/**
|
||||
* Unconditionally throw an exception during static initialization.
|
||||
*/
|
||||
public class AlwaysThrowException implements Converter {
|
||||
|
||||
static {
|
||||
setup();
|
||||
}
|
||||
|
||||
public static void setup() {
|
||||
throw new RuntimeException("I always throw an exception");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(final Map<String, ?> configs, final boolean isKey) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] fromConnectData(final String topic, final Schema schema, final Object value) {
|
||||
return new byte[0];
|
||||
}
|
||||
|
||||
@Override
|
||||
public SchemaAndValue toConnectData(final String topic, final byte[] value) {
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,16 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
test.plugins.SamplingConfigProvider
|
|
@ -0,0 +1,101 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package test.plugins;
|
||||
|
||||
import java.util.Set;
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
import org.apache.kafka.common.config.provider.ConfigProvider;
|
||||
import org.apache.kafka.common.config.ConfigData;
|
||||
import org.apache.kafka.common.config.ConfigChangeCallback;
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.SchemaAndValue;
|
||||
import org.apache.kafka.connect.storage.Converter;
|
||||
import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
|
||||
import org.apache.kafka.connect.storage.HeaderConverter;
|
||||
|
||||
/**
|
||||
* Samples data about its initialization environment for later analysis
|
||||
*/
|
||||
public class SamplingConfigProvider extends SamplingTestPlugin implements ConfigProvider {
|
||||
|
||||
private static final ClassLoader STATIC_CLASS_LOADER;
|
||||
private final ClassLoader classloader;
|
||||
private Map<String, SamplingTestPlugin> samples;
|
||||
|
||||
static {
|
||||
STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader();
|
||||
}
|
||||
|
||||
{
|
||||
samples = new HashMap<>();
|
||||
classloader = Thread.currentThread().getContextClassLoader();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConfigData get(String path) {
|
||||
logMethodCall(samples);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConfigData get(String path, Set<String> keys) {
|
||||
logMethodCall(samples);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void subscribe(String path, Set<String> keys, ConfigChangeCallback callback) {
|
||||
logMethodCall(samples);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unsubscribe(String path, Set<String> keys, ConfigChangeCallback callback) {
|
||||
logMethodCall(samples);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unsubscribeAll() {
|
||||
logMethodCall(samples);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(final Map<String, ?> configs) {
|
||||
logMethodCall(samples);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
logMethodCall(samples);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClassLoader staticClassloader() {
|
||||
return STATIC_CLASS_LOADER;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClassLoader classloader() {
|
||||
return classloader;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, SamplingTestPlugin> otherSamples() {
|
||||
return samples;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,79 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package test.plugins;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
import org.apache.kafka.common.Configurable;
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.SchemaAndValue;
|
||||
import org.apache.kafka.connect.storage.Converter;
|
||||
import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
|
||||
|
||||
/**
|
||||
* Samples data about its initialization environment for later analysis
|
||||
*/
|
||||
public class SamplingConfigurable extends SamplingTestPlugin implements Converter, Configurable {
|
||||
|
||||
private static final ClassLoader STATIC_CLASS_LOADER;
|
||||
private final ClassLoader classloader;
|
||||
private Map<String, SamplingTestPlugin> samples;
|
||||
|
||||
static {
|
||||
STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader();
|
||||
}
|
||||
|
||||
{
|
||||
samples = new HashMap<>();
|
||||
classloader = Thread.currentThread().getContextClassLoader();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(final Map<String, ?> configs) {
|
||||
logMethodCall(samples);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(final Map<String, ?> configs, final boolean isKey) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] fromConnectData(final String topic, final Schema schema, final Object value) {
|
||||
return new byte[0];
|
||||
}
|
||||
|
||||
@Override
|
||||
public SchemaAndValue toConnectData(final String topic, final byte[] value) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClassLoader staticClassloader() {
|
||||
return STATIC_CLASS_LOADER;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClassLoader classloader() {
|
||||
return classloader;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, SamplingTestPlugin> otherSamples() {
|
||||
return samples;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,76 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package test.plugins;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.SchemaAndValue;
|
||||
import org.apache.kafka.connect.storage.Converter;
|
||||
import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
|
||||
|
||||
/**
|
||||
* Samples data about its initialization environment for later analysis
|
||||
*/
|
||||
public class SamplingConverter extends SamplingTestPlugin implements Converter {
|
||||
|
||||
private static final ClassLoader STATIC_CLASS_LOADER;
|
||||
private final ClassLoader classloader;
|
||||
private Map<String, SamplingTestPlugin> samples;
|
||||
|
||||
static {
|
||||
STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader();
|
||||
}
|
||||
|
||||
{
|
||||
samples = new HashMap<>();
|
||||
classloader = Thread.currentThread().getContextClassLoader();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(final Map<String, ?> configs, final boolean isKey) {
|
||||
logMethodCall(samples);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] fromConnectData(final String topic, final Schema schema, final Object value) {
|
||||
logMethodCall(samples);
|
||||
return new byte[0];
|
||||
}
|
||||
|
||||
@Override
|
||||
public SchemaAndValue toConnectData(final String topic, final byte[] value) {
|
||||
logMethodCall(samples);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClassLoader staticClassloader() {
|
||||
return STATIC_CLASS_LOADER;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClassLoader classloader() {
|
||||
return classloader;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, SamplingTestPlugin> otherSamples() {
|
||||
return samples;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,89 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package test.plugins;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
import org.apache.kafka.common.config.ConfigDef;
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.SchemaAndValue;
|
||||
import org.apache.kafka.connect.storage.Converter;
|
||||
import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
|
||||
import org.apache.kafka.connect.storage.HeaderConverter;
|
||||
|
||||
/**
|
||||
* Samples data about its initialization environment for later analysis
|
||||
*/
|
||||
public class SamplingHeaderConverter extends SamplingTestPlugin implements HeaderConverter {
|
||||
|
||||
private static final ClassLoader STATIC_CLASS_LOADER;
|
||||
private final ClassLoader classloader;
|
||||
private Map<String, SamplingTestPlugin> samples;
|
||||
|
||||
static {
|
||||
STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader();
|
||||
}
|
||||
|
||||
{
|
||||
samples = new HashMap<>();
|
||||
classloader = Thread.currentThread().getContextClassLoader();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) {
|
||||
logMethodCall(samples);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) {
|
||||
logMethodCall(samples);
|
||||
return new byte[0];
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConfigDef config() {
|
||||
logMethodCall(samples);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(final Map<String, ?> configs) {
|
||||
logMethodCall(samples);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
logMethodCall(samples);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClassLoader staticClassloader() {
|
||||
return STATIC_CLASS_LOADER;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClassLoader classloader() {
|
||||
return classloader;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, SamplingTestPlugin> otherSamples() {
|
||||
return samples;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,16 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
test.plugins.ServiceLoadedSubclass
|
|
@ -0,0 +1,48 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package test.plugins;
|
||||
|
||||
import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
|
||||
|
||||
/**
|
||||
* Superclass for service loaded classes
|
||||
*/
|
||||
public class ServiceLoadedClass extends SamplingTestPlugin {
|
||||
|
||||
private static final ClassLoader STATIC_CLASS_LOADER;
|
||||
private final ClassLoader classloader;
|
||||
|
||||
static {
|
||||
STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader();
|
||||
}
|
||||
|
||||
{
|
||||
classloader = Thread.currentThread().getContextClassLoader();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClassLoader staticClassloader() {
|
||||
return STATIC_CLASS_LOADER;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClassLoader classloader() {
|
||||
return classloader;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,46 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package test.plugins;
|
||||
|
||||
/**
|
||||
* Instance of a service loaded class
|
||||
*/
|
||||
public class ServiceLoadedSubclass extends ServiceLoadedClass {
|
||||
|
||||
private static final ClassLoader STATIC_CLASS_LOADER;
|
||||
private final ClassLoader classloader;
|
||||
|
||||
static {
|
||||
STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader();
|
||||
}
|
||||
|
||||
{
|
||||
classloader = Thread.currentThread().getContextClassLoader();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClassLoader staticClassloader() {
|
||||
return STATIC_CLASS_LOADER;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClassLoader classloader() {
|
||||
return classloader;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,85 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package test.plugins;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
import java.util.ServiceLoader;
|
||||
import java.util.Iterator;
|
||||
import org.apache.kafka.connect.data.Schema;
|
||||
import org.apache.kafka.connect.data.SchemaAndValue;
|
||||
import org.apache.kafka.connect.storage.Converter;
|
||||
import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
|
||||
|
||||
/**
|
||||
* Samples data about its initialization environment for later analysis
|
||||
*/
|
||||
public class ServiceLoaderPlugin extends SamplingTestPlugin implements Converter {
|
||||
|
||||
private static final ClassLoader STATIC_CLASS_LOADER;
|
||||
private static final Map<String, SamplingTestPlugin> SAMPLES;
|
||||
private final ClassLoader classloader;
|
||||
|
||||
static {
|
||||
STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader();
|
||||
SAMPLES = new HashMap<>();
|
||||
Iterator<ServiceLoadedClass> it = ServiceLoader.load(ServiceLoadedClass.class).iterator();
|
||||
while (it.hasNext()) {
|
||||
ServiceLoadedClass loaded = it.next();
|
||||
SAMPLES.put(loaded.getClass().getSimpleName() + ".static", loaded);
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
classloader = Thread.currentThread().getContextClassLoader();
|
||||
Iterator<ServiceLoadedClass> it = ServiceLoader.load(ServiceLoadedClass.class).iterator();
|
||||
while (it.hasNext()) {
|
||||
ServiceLoadedClass loaded = it.next();
|
||||
SAMPLES.put(loaded.getClass().getSimpleName() + ".dynamic", loaded);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(final Map<String, ?> configs, final boolean isKey) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] fromConnectData(final String topic, final Schema schema, final Object value) {
|
||||
return new byte[0];
|
||||
}
|
||||
|
||||
@Override
|
||||
public SchemaAndValue toConnectData(final String topic, final byte[] value) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClassLoader staticClassloader() {
|
||||
return STATIC_CLASS_LOADER;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClassLoader classloader() {
|
||||
return classloader;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, SamplingTestPlugin> otherSamples() {
|
||||
return SAMPLES;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue