From dc00832b965c29fbbb4148cc680fadfc3f28642a Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Thu, 25 May 2023 07:22:44 -0700 Subject: [PATCH] KAFKA-14654: Connector classes should statically initialize with plugin classloader (#13165) Reviewers: Chaitanya Mukka , Chris Egerton --- .../isolation/DelegatingClassLoader.java | 21 ++-- .../runtime/isolation/PluginsTest.java | 55 +++++---- .../runtime/isolation/SamplingTestPlugin.java | 22 ++-- .../runtime/isolation/TestPlugins.java | 5 + .../test/plugins/AliasedStaticField.java | 2 +- .../test/plugins/AlwaysThrowException.java | 1 - .../test/plugins/ThingOne.java | 1 - .../test/plugins/ThingTwo.java | 1 - .../test/plugins/SamplingConfigProvider.java | 19 ++- .../test/plugins/SamplingConfigurable.java | 18 ++- .../test/plugins/SamplingConnector.java | 112 ++++++++++++++++++ .../test/plugins/SamplingConverter.java | 18 ++- .../test/plugins/SamplingHeaderConverter.java | 18 ++- .../test/plugins/ServiceLoadedClass.java | 2 +- .../test/plugins/ServiceLoaderPlugin.java | 2 +- 15 files changed, 252 insertions(+), 45 deletions(-) create mode 100644 connect/runtime/src/test/resources/test-plugins/sampling-connector/test/plugins/SamplingConnector.java diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java index df6006f197f..b996b04f94b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java @@ -400,7 +400,7 @@ public class DelegatingClassLoader extends URLClassLoader { Collection> result = new ArrayList<>(); for (Class plugin : plugins) { if (PluginUtils.isConcrete(plugin)) { - try { + try (LoaderSwap loaderSwap = withClassLoader(loader)) { result.add(pluginDesc(plugin, versionFor(plugin), loader)); } catch (ReflectiveOperationException | LinkageError e) { log.error("Failed to discover {}: Unable to instantiate {}{}", klass.getSimpleName(), plugin.getSimpleName(), reflectiveErrorDescription(e), e); @@ -419,11 +419,10 @@ public class DelegatingClassLoader extends URLClassLoader { @SuppressWarnings("unchecked") private Collection> getServiceLoaderPluginDesc(Class klass, ClassLoader loader) { - ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader); Collection> result = new ArrayList<>(); - try { - ServiceLoader serviceLoader = ServiceLoader.load(klass, loader); - for (Iterator iterator = serviceLoader.iterator(); iterator.hasNext(); ) { + ServiceLoader serviceLoader = ServiceLoader.load(klass, loader); + for (Iterator iterator = serviceLoader.iterator(); iterator.hasNext(); ) { + try (LoaderSwap loaderSwap = withClassLoader(loader)) { T pluginImpl; try { pluginImpl = iterator.next(); @@ -434,8 +433,6 @@ public class DelegatingClassLoader extends URLClassLoader { result.add(pluginDesc((Class) pluginImpl.getClass(), versionFor(pluginImpl), loader)); } - } finally { - Plugins.compareAndSwapLoaders(savedLoader); } return result; } @@ -473,6 +470,16 @@ public class DelegatingClassLoader extends URLClassLoader { } } + public LoaderSwap withClassLoader(ClassLoader loader) { + ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader); + try { + return new LoaderSwap(savedLoader); + } catch (Throwable t) { + Plugins.compareAndSwapLoaders(savedLoader); + throw t; + } + } + @Override protected Class loadClass(String name, boolean resolve) throws ClassNotFoundException { String fullName = aliases.getOrDefault(name, name); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java index ca01b89bf77..a95695e451c 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.provider.ConfigProvider; +import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.errors.ConnectException; @@ -295,7 +296,7 @@ public class PluginsTest { // Assert that the service loaded subclass is found in both environments assertTrue(samples.containsKey("ServiceLoadedSubclass.static")); assertTrue(samples.containsKey("ServiceLoadedSubclass.dynamic")); - assertPluginClassLoaderAlwaysActive(samples); + assertPluginClassLoaderAlwaysActive(plugin); } @Test @@ -306,9 +307,7 @@ public class PluginsTest { Converter.class ); - assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples"); - Map samples = ((SamplingTestPlugin) plugin).flatten(); - assertPluginClassLoaderAlwaysActive(samples); + assertPluginClassLoaderAlwaysActive(plugin); } @Test @@ -334,7 +333,7 @@ public class PluginsTest { assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples"); Map samples = ((SamplingTestPlugin) plugin).flatten(); assertTrue(samples.containsKey("configure")); - assertPluginClassLoaderAlwaysActive(samples); + assertPluginClassLoaderAlwaysActive(plugin); } @Test @@ -357,7 +356,7 @@ public class PluginsTest { assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples"); Map samples = ((SamplingTestPlugin) plugin).flatten(); assertTrue(samples.containsKey("configure")); - assertPluginClassLoaderAlwaysActive(samples); + assertPluginClassLoaderAlwaysActive(plugin); } @Test @@ -377,7 +376,17 @@ public class PluginsTest { assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples"); Map samples = ((SamplingTestPlugin) plugin).flatten(); assertTrue(samples.containsKey("configure")); // HeaderConverter::configure was called - assertPluginClassLoaderAlwaysActive(samples); + assertPluginClassLoaderAlwaysActive(plugin); + } + + @Test + public void newConnectorShouldInstantiateWithPluginClassLoader() { + Connector plugin = plugins.newConnector(TestPlugin.SAMPLING_CONNECTOR.className()); + + assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples"); + Map samples = ((SamplingTestPlugin) plugin).flatten(); + assertTrue(samples.containsKey("")); // constructor was called + assertPluginClassLoaderAlwaysActive(plugin); } @Test @@ -393,7 +402,7 @@ public class PluginsTest { assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples"); Map samples = ((SamplingTestPlugin) plugin).flatten(); assertTrue(samples.containsKey("configure")); // Configurable::configure was called - assertPluginClassLoaderAlwaysActive(samples); + assertPluginClassLoaderAlwaysActive(plugin); } @Test @@ -460,19 +469,23 @@ public class PluginsTest { converter.toConnectData(null, null).value()); } - public static void assertPluginClassLoaderAlwaysActive(Map samples) { - for (Entry e : samples.entrySet()) { - String sampleName = "\"" + e.getKey() + "\" (" + e.getValue() + ")"; - assertInstanceOf( - PluginClassLoader.class, - e.getValue().staticClassloader(), - sampleName + " has incorrect static classloader" - ); - assertInstanceOf( - PluginClassLoader.class, - e.getValue().classloader(), - sampleName + " has incorrect dynamic classloader" - ); + public static void assertPluginClassLoaderAlwaysActive(Object plugin) { + assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples"); + for (SamplingTestPlugin instance : ((SamplingTestPlugin) plugin).allInstances()) { + Map samples = instance.flatten(); + for (Entry e : samples.entrySet()) { + String sampleName = "\"" + e.getKey() + "\" (" + e.getValue() + ")"; + assertInstanceOf( + PluginClassLoader.class, + e.getValue().staticClassloader(), + sampleName + " has incorrect static classloader" + ); + assertInstanceOf( + PluginClassLoader.class, + e.getValue().classloader(), + sampleName + " has incorrect dynamic classloader" + ); + } } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SamplingTestPlugin.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SamplingTestPlugin.java index bcf8881898e..8b664227a19 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SamplingTestPlugin.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SamplingTestPlugin.java @@ -19,36 +19,44 @@ package org.apache.kafka.connect.runtime.isolation; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; /** * Base class for plugins so we can sample information about their initialization */ -public abstract class SamplingTestPlugin { +public interface SamplingTestPlugin { /** * @return the ClassLoader used to statically initialize this plugin class */ - public abstract ClassLoader staticClassloader(); + ClassLoader staticClassloader(); /** * @return the ClassLoader used to initialize this plugin instance */ - public abstract ClassLoader classloader(); + ClassLoader classloader(); + + /** + * @return All known instances of this class, including this instance. + */ + default List allInstances() { + return Collections.singletonList(this); + } /** * @return a group of other SamplingTestPlugin instances known by this plugin * This should only return direct children, and not reference this instance directly */ - public Map otherSamples() { + default Map otherSamples() { return Collections.emptyMap(); } /** * @return a flattened list of child samples including this entry keyed as "this" */ - public Map flatten() { + default Map flatten() { Map out = new HashMap<>(); Map otherSamples = otherSamples(); if (otherSamples != null) { @@ -71,7 +79,7 @@ public abstract class SamplingTestPlugin { * Stores only the last invocation of each method if there are multiple invocations. * @param samples The collection of samples to which this method call should be added */ - public void logMethodCall(Map samples) { + default void logMethodCall(Map samples) { StackTraceElement[] stackTraces = Thread.currentThread().getStackTrace(); if (stackTraces.length < 2) { return; @@ -88,7 +96,7 @@ public abstract class SamplingTestPlugin { )); } - public static class MethodCallSample extends SamplingTestPlugin { + class MethodCallSample implements SamplingTestPlugin { private final StackTraceElement caller; private final ClassLoader staticClassLoader; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java index 1b018f1194a..f8d6db2ffb6 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java @@ -94,6 +94,11 @@ public class TestPlugins { * which samples information about its method calls. */ SAMPLING_CONFIG_PROVIDER("sampling-config-provider", "test.plugins.SamplingConfigProvider"), + /** + * A {@link org.apache.kafka.connect.sink.SinkConnector} + * which samples information about its method calls. + */ + SAMPLING_CONNECTOR("sampling-connector", "test.plugins.SamplingConnector"), /** * A plugin which uses a {@link java.util.ServiceLoader} * to load internal classes, and samples information about their initialization. diff --git a/connect/runtime/src/test/resources/test-plugins/aliased-static-field/test/plugins/AliasedStaticField.java b/connect/runtime/src/test/resources/test-plugins/aliased-static-field/test/plugins/AliasedStaticField.java index df77f300a7e..818a10a44bc 100644 --- a/connect/runtime/src/test/resources/test-plugins/aliased-static-field/test/plugins/AliasedStaticField.java +++ b/connect/runtime/src/test/resources/test-plugins/aliased-static-field/test/plugins/AliasedStaticField.java @@ -30,7 +30,7 @@ import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin; *

Samples data about its initialization environment for later analysis. * Samples are shared between instances of the same class in a static variable. */ -public class AliasedStaticField extends SamplingTestPlugin implements Converter { +public class AliasedStaticField implements SamplingTestPlugin, Converter { private static final Map SAMPLES; private static final ClassLoader STATIC_CLASS_LOADER; diff --git a/connect/runtime/src/test/resources/test-plugins/always-throw-exception/test/plugins/AlwaysThrowException.java b/connect/runtime/src/test/resources/test-plugins/always-throw-exception/test/plugins/AlwaysThrowException.java index cbbce91579c..f977a4ed5e1 100644 --- a/connect/runtime/src/test/resources/test-plugins/always-throw-exception/test/plugins/AlwaysThrowException.java +++ b/connect/runtime/src/test/resources/test-plugins/always-throw-exception/test/plugins/AlwaysThrowException.java @@ -20,7 +20,6 @@ package test.plugins; import java.util.Map; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; -import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin; import org.apache.kafka.connect.storage.Converter; /** diff --git a/connect/runtime/src/test/resources/test-plugins/multiple-plugins-in-jar/test/plugins/ThingOne.java b/connect/runtime/src/test/resources/test-plugins/multiple-plugins-in-jar/test/plugins/ThingOne.java index 955b370e4f6..dd5f101ac58 100644 --- a/connect/runtime/src/test/resources/test-plugins/multiple-plugins-in-jar/test/plugins/ThingOne.java +++ b/connect/runtime/src/test/resources/test-plugins/multiple-plugins-in-jar/test/plugins/ThingOne.java @@ -21,7 +21,6 @@ import java.nio.charset.StandardCharsets; import java.util.Map; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; -import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin; import org.apache.kafka.connect.storage.Converter; /** diff --git a/connect/runtime/src/test/resources/test-plugins/multiple-plugins-in-jar/test/plugins/ThingTwo.java b/connect/runtime/src/test/resources/test-plugins/multiple-plugins-in-jar/test/plugins/ThingTwo.java index 9a0a0e6dee3..ac265ce9823 100644 --- a/connect/runtime/src/test/resources/test-plugins/multiple-plugins-in-jar/test/plugins/ThingTwo.java +++ b/connect/runtime/src/test/resources/test-plugins/multiple-plugins-in-jar/test/plugins/ThingTwo.java @@ -21,7 +21,6 @@ import java.nio.charset.StandardCharsets; import java.util.Map; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; -import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin; import org.apache.kafka.connect.runtime.isolation.TestPlugins; import org.apache.kafka.connect.storage.Converter; diff --git a/connect/runtime/src/test/resources/test-plugins/sampling-config-provider/test/plugins/SamplingConfigProvider.java b/connect/runtime/src/test/resources/test-plugins/sampling-config-provider/test/plugins/SamplingConfigProvider.java index f2417d3de58..b0a79a18ef2 100644 --- a/connect/runtime/src/test/resources/test-plugins/sampling-config-provider/test/plugins/SamplingConfigProvider.java +++ b/connect/runtime/src/test/resources/test-plugins/sampling-config-provider/test/plugins/SamplingConfigProvider.java @@ -17,9 +17,13 @@ package test.plugins; +import java.util.Collections; +import java.util.ArrayList; +import java.util.List; import java.util.Set; import java.util.Map; import java.util.HashMap; + import org.apache.kafka.common.config.provider.ConfigProvider; import org.apache.kafka.common.config.ConfigData; import org.apache.kafka.common.config.ConfigChangeCallback; @@ -34,14 +38,16 @@ import org.apache.kafka.connect.storage.HeaderConverter; * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}. *

Samples data about its initialization environment for later analysis. */ -public class SamplingConfigProvider extends SamplingTestPlugin implements ConfigProvider { +public final class SamplingConfigProvider implements SamplingTestPlugin, ConfigProvider { private static final ClassLoader STATIC_CLASS_LOADER; + private static List instances; private final ClassLoader classloader; private Map samples; static { STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader(); + instances = Collections.synchronizedList(new ArrayList<>()); } { @@ -61,6 +67,11 @@ public class SamplingConfigProvider extends SamplingTestPlugin implements Config return null; } + public SamplingConfigProvider() { + logMethodCall(samples); + instances.add(this); + } + @Override public void subscribe(String path, Set keys, ConfigChangeCallback callback) { logMethodCall(samples); @@ -100,4 +111,10 @@ public class SamplingConfigProvider extends SamplingTestPlugin implements Config public Map otherSamples() { return samples; } + + + @Override + public List allInstances() { + return instances; + } } diff --git a/connect/runtime/src/test/resources/test-plugins/sampling-configurable/test/plugins/SamplingConfigurable.java b/connect/runtime/src/test/resources/test-plugins/sampling-configurable/test/plugins/SamplingConfigurable.java index 610ae6b724b..3e8f4652761 100644 --- a/connect/runtime/src/test/resources/test-plugins/sampling-configurable/test/plugins/SamplingConfigurable.java +++ b/connect/runtime/src/test/resources/test-plugins/sampling-configurable/test/plugins/SamplingConfigurable.java @@ -17,8 +17,12 @@ package test.plugins; +import java.util.Collections; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.HashMap; + import org.apache.kafka.common.Configurable; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; @@ -30,14 +34,16 @@ import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin; * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}. *

Samples data about its initialization environment for later analysis. */ -public class SamplingConfigurable extends SamplingTestPlugin implements Converter, Configurable { +public final class SamplingConfigurable implements SamplingTestPlugin, Converter, Configurable { private static final ClassLoader STATIC_CLASS_LOADER; + private static List instances; private final ClassLoader classloader; private Map samples; static { STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader(); + instances = Collections.synchronizedList(new ArrayList<>()); } { @@ -45,6 +51,11 @@ public class SamplingConfigurable extends SamplingTestPlugin implements Converte classloader = Thread.currentThread().getContextClassLoader(); } + public SamplingConfigurable() { + logMethodCall(samples); + instances.add(this); + } + @Override public void configure(final Map configs) { logMethodCall(samples); @@ -78,4 +89,9 @@ public class SamplingConfigurable extends SamplingTestPlugin implements Converte public Map otherSamples() { return samples; } + + @Override + public List allInstances() { + return instances; + } } diff --git a/connect/runtime/src/test/resources/test-plugins/sampling-connector/test/plugins/SamplingConnector.java b/connect/runtime/src/test/resources/test-plugins/sampling-connector/test/plugins/SamplingConnector.java new file mode 100644 index 00000000000..263b41ef50c --- /dev/null +++ b/connect/runtime/src/test/resources/test-plugins/sampling-connector/test/plugins/SamplingConnector.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.plugins; + +import java.io.IOException; +import java.util.Collections; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.HashMap; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin; + +/** + * Fake plugin class for testing classloading isolation. + * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}. + *

Samples data about its initialization environment for later analysis. + */ +public final class SamplingConnector extends SinkConnector implements SamplingTestPlugin { + + private static final ClassLoader STATIC_CLASS_LOADER; + private static List instances; + private final ClassLoader classloader; + private Map samples; + + static { + STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader(); + instances = Collections.synchronizedList(new ArrayList<>()); + } + + { + samples = new HashMap<>(); + classloader = Thread.currentThread().getContextClassLoader(); + } + + public SamplingConnector() { + logMethodCall(samples); + instances.add(this); + } + + @Override + public void start(Map props) { + logMethodCall(samples); + } + + @Override + public Class taskClass() { + logMethodCall(samples); + return null; + } + + @Override + public List> taskConfigs(int maxTasks) { + logMethodCall(samples); + return null; + } + + @Override + public void stop() { + logMethodCall(samples); + } + + @Override + public ConfigDef config() { + logMethodCall(samples); + return null; + } + + @Override + public String version() { + logMethodCall(samples); + return "1.0.0"; + } + + @Override + public ClassLoader staticClassloader() { + return STATIC_CLASS_LOADER; + } + + @Override + public ClassLoader classloader() { + return classloader; + } + + @Override + public Map otherSamples() { + return samples; + } + + @Override + public List allInstances() { + return instances; + } +} diff --git a/connect/runtime/src/test/resources/test-plugins/sampling-converter/test/plugins/SamplingConverter.java b/connect/runtime/src/test/resources/test-plugins/sampling-converter/test/plugins/SamplingConverter.java index 217e6efc776..2ed6a34edc8 100644 --- a/connect/runtime/src/test/resources/test-plugins/sampling-converter/test/plugins/SamplingConverter.java +++ b/connect/runtime/src/test/resources/test-plugins/sampling-converter/test/plugins/SamplingConverter.java @@ -17,8 +17,12 @@ package test.plugins; +import java.util.Collections; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.HashMap; + import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.storage.Converter; @@ -29,14 +33,16 @@ import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin; * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}. *

Samples data about its initialization environment for later analysis. */ -public class SamplingConverter extends SamplingTestPlugin implements Converter { +public final class SamplingConverter implements SamplingTestPlugin, Converter { private static final ClassLoader STATIC_CLASS_LOADER; + private static List instances; private final ClassLoader classloader; private Map samples; static { STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader(); + instances = Collections.synchronizedList(new ArrayList<>()); } { @@ -44,6 +50,11 @@ public class SamplingConverter extends SamplingTestPlugin implements Converter { classloader = Thread.currentThread().getContextClassLoader(); } + public SamplingConverter() { + logMethodCall(samples); + instances.add(this); + } + @Override public void configure(final Map configs, final boolean isKey) { logMethodCall(samples); @@ -75,4 +86,9 @@ public class SamplingConverter extends SamplingTestPlugin implements Converter { public Map otherSamples() { return samples; } + + @Override + public List allInstances() { + return instances; + } } diff --git a/connect/runtime/src/test/resources/test-plugins/sampling-header-converter/test/plugins/SamplingHeaderConverter.java b/connect/runtime/src/test/resources/test-plugins/sampling-header-converter/test/plugins/SamplingHeaderConverter.java index 914c6faac64..7f6df858f77 100644 --- a/connect/runtime/src/test/resources/test-plugins/sampling-header-converter/test/plugins/SamplingHeaderConverter.java +++ b/connect/runtime/src/test/resources/test-plugins/sampling-header-converter/test/plugins/SamplingHeaderConverter.java @@ -17,8 +17,12 @@ package test.plugins; +import java.util.Collections; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.HashMap; + import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; @@ -31,14 +35,16 @@ import org.apache.kafka.connect.storage.HeaderConverter; * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}. *

Samples data about its initialization environment for later analysis. */ -public class SamplingHeaderConverter extends SamplingTestPlugin implements HeaderConverter { +public final class SamplingHeaderConverter implements SamplingTestPlugin, HeaderConverter { private static final ClassLoader STATIC_CLASS_LOADER; + private static List instances; private final ClassLoader classloader; private Map samples; static { STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader(); + instances = Collections.synchronizedList(new ArrayList<>()); } { @@ -46,6 +52,11 @@ public class SamplingHeaderConverter extends SamplingTestPlugin implements Heade classloader = Thread.currentThread().getContextClassLoader(); } + public SamplingHeaderConverter() { + logMethodCall(samples); + instances.add(this); + } + @Override public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) { logMethodCall(samples); @@ -88,4 +99,9 @@ public class SamplingHeaderConverter extends SamplingTestPlugin implements Heade public Map otherSamples() { return samples; } + + @Override + public List allInstances() { + return instances; + } } diff --git a/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoadedClass.java b/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoadedClass.java index 98677ed43d6..354514a9385 100644 --- a/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoadedClass.java +++ b/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoadedClass.java @@ -22,7 +22,7 @@ import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin; /** * Superclass for service loaded classes */ -public class ServiceLoadedClass extends SamplingTestPlugin { +public class ServiceLoadedClass implements SamplingTestPlugin { private static final ClassLoader STATIC_CLASS_LOADER; private final ClassLoader classloader; diff --git a/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoaderPlugin.java b/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoaderPlugin.java index 6c50dcc888f..263479b1e30 100644 --- a/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoaderPlugin.java +++ b/connect/runtime/src/test/resources/test-plugins/service-loader/test/plugins/ServiceLoaderPlugin.java @@ -31,7 +31,7 @@ import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin; * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}. *

Samples data about its initialization environment for later analysis. */ -public class ServiceLoaderPlugin extends SamplingTestPlugin implements Converter { +public class ServiceLoaderPlugin implements SamplingTestPlugin, Converter { private static final ClassLoader STATIC_CLASS_LOADER; private static final Map SAMPLES;