KAFKA-14654: Connector classes should statically initialize with plugin classloader (#13165)

Reviewers: Chaitanya Mukka <chaitanya.mvs2007@gmail.com>, Chris Egerton <chrise@aiven.io>
This commit is contained in:
Greg Harris 2023-05-25 07:22:44 -07:00 committed by GitHub
parent fe303b9c3e
commit dc00832b96
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 252 additions and 45 deletions

View File

@ -400,7 +400,7 @@ public class DelegatingClassLoader extends URLClassLoader {
Collection<PluginDesc<T>> result = new ArrayList<>();
for (Class<? extends T> plugin : plugins) {
if (PluginUtils.isConcrete(plugin)) {
try {
try (LoaderSwap loaderSwap = withClassLoader(loader)) {
result.add(pluginDesc(plugin, versionFor(plugin), loader));
} catch (ReflectiveOperationException | LinkageError e) {
log.error("Failed to discover {}: Unable to instantiate {}{}", klass.getSimpleName(), plugin.getSimpleName(), reflectiveErrorDescription(e), e);
@ -419,11 +419,10 @@ public class DelegatingClassLoader extends URLClassLoader {
@SuppressWarnings("unchecked")
private <T> Collection<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) {
ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);
Collection<PluginDesc<T>> result = new ArrayList<>();
try {
ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
for (Iterator<T> iterator = serviceLoader.iterator(); iterator.hasNext(); ) {
ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
for (Iterator<T> iterator = serviceLoader.iterator(); iterator.hasNext(); ) {
try (LoaderSwap loaderSwap = withClassLoader(loader)) {
T pluginImpl;
try {
pluginImpl = iterator.next();
@ -434,8 +433,6 @@ public class DelegatingClassLoader extends URLClassLoader {
result.add(pluginDesc((Class<? extends T>) pluginImpl.getClass(),
versionFor(pluginImpl), loader));
}
} finally {
Plugins.compareAndSwapLoaders(savedLoader);
}
return result;
}
@ -473,6 +470,16 @@ public class DelegatingClassLoader extends URLClassLoader {
}
}
public LoaderSwap withClassLoader(ClassLoader loader) {
ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);
try {
return new LoaderSwap(savedLoader);
} catch (Throwable t) {
Plugins.compareAndSwapLoaders(savedLoader);
throw t;
}
}
@Override
protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
String fullName = aliases.getOrDefault(name, name);

View File

@ -30,6 +30,7 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
@ -295,7 +296,7 @@ public class PluginsTest {
// Assert that the service loaded subclass is found in both environments
assertTrue(samples.containsKey("ServiceLoadedSubclass.static"));
assertTrue(samples.containsKey("ServiceLoadedSubclass.dynamic"));
assertPluginClassLoaderAlwaysActive(samples);
assertPluginClassLoaderAlwaysActive(plugin);
}
@Test
@ -306,9 +307,7 @@ public class PluginsTest {
Converter.class
);
assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples");
Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin).flatten();
assertPluginClassLoaderAlwaysActive(samples);
assertPluginClassLoaderAlwaysActive(plugin);
}
@Test
@ -334,7 +333,7 @@ public class PluginsTest {
assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples");
Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin).flatten();
assertTrue(samples.containsKey("configure"));
assertPluginClassLoaderAlwaysActive(samples);
assertPluginClassLoaderAlwaysActive(plugin);
}
@Test
@ -357,7 +356,7 @@ public class PluginsTest {
assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples");
Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin).flatten();
assertTrue(samples.containsKey("configure"));
assertPluginClassLoaderAlwaysActive(samples);
assertPluginClassLoaderAlwaysActive(plugin);
}
@Test
@ -377,7 +376,17 @@ public class PluginsTest {
assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples");
Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin).flatten();
assertTrue(samples.containsKey("configure")); // HeaderConverter::configure was called
assertPluginClassLoaderAlwaysActive(samples);
assertPluginClassLoaderAlwaysActive(plugin);
}
@Test
public void newConnectorShouldInstantiateWithPluginClassLoader() {
Connector plugin = plugins.newConnector(TestPlugin.SAMPLING_CONNECTOR.className());
assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples");
Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin).flatten();
assertTrue(samples.containsKey("<init>")); // constructor was called
assertPluginClassLoaderAlwaysActive(plugin);
}
@Test
@ -393,7 +402,7 @@ public class PluginsTest {
assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples");
Map<String, SamplingTestPlugin> samples = ((SamplingTestPlugin) plugin).flatten();
assertTrue(samples.containsKey("configure")); // Configurable::configure was called
assertPluginClassLoaderAlwaysActive(samples);
assertPluginClassLoaderAlwaysActive(plugin);
}
@Test
@ -460,19 +469,23 @@ public class PluginsTest {
converter.toConnectData(null, null).value());
}
public static void assertPluginClassLoaderAlwaysActive(Map<String, SamplingTestPlugin> samples) {
for (Entry<String, SamplingTestPlugin> e : samples.entrySet()) {
String sampleName = "\"" + e.getKey() + "\" (" + e.getValue() + ")";
assertInstanceOf(
PluginClassLoader.class,
e.getValue().staticClassloader(),
sampleName + " has incorrect static classloader"
);
assertInstanceOf(
PluginClassLoader.class,
e.getValue().classloader(),
sampleName + " has incorrect dynamic classloader"
);
public static void assertPluginClassLoaderAlwaysActive(Object plugin) {
assertInstanceOf(SamplingTestPlugin.class, plugin, "Cannot collect samples");
for (SamplingTestPlugin instance : ((SamplingTestPlugin) plugin).allInstances()) {
Map<String, SamplingTestPlugin> samples = instance.flatten();
for (Entry<String, SamplingTestPlugin> e : samples.entrySet()) {
String sampleName = "\"" + e.getKey() + "\" (" + e.getValue() + ")";
assertInstanceOf(
PluginClassLoader.class,
e.getValue().staticClassloader(),
sampleName + " has incorrect static classloader"
);
assertInstanceOf(
PluginClassLoader.class,
e.getValue().classloader(),
sampleName + " has incorrect dynamic classloader"
);
}
}
}

View File

@ -19,36 +19,44 @@ package org.apache.kafka.connect.runtime.isolation;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
/**
* Base class for plugins so we can sample information about their initialization
*/
public abstract class SamplingTestPlugin {
public interface SamplingTestPlugin {
/**
* @return the ClassLoader used to statically initialize this plugin class
*/
public abstract ClassLoader staticClassloader();
ClassLoader staticClassloader();
/**
* @return the ClassLoader used to initialize this plugin instance
*/
public abstract ClassLoader classloader();
ClassLoader classloader();
/**
* @return All known instances of this class, including this instance.
*/
default List<SamplingTestPlugin> allInstances() {
return Collections.singletonList(this);
}
/**
* @return a group of other SamplingTestPlugin instances known by this plugin
* This should only return direct children, and not reference this instance directly
*/
public Map<String, SamplingTestPlugin> otherSamples() {
default Map<String, SamplingTestPlugin> otherSamples() {
return Collections.emptyMap();
}
/**
* @return a flattened list of child samples including this entry keyed as "this"
*/
public Map<String, SamplingTestPlugin> flatten() {
default Map<String, SamplingTestPlugin> flatten() {
Map<String, SamplingTestPlugin> out = new HashMap<>();
Map<String, SamplingTestPlugin> otherSamples = otherSamples();
if (otherSamples != null) {
@ -71,7 +79,7 @@ public abstract class SamplingTestPlugin {
* Stores only the last invocation of each method if there are multiple invocations.
* @param samples The collection of samples to which this method call should be added
*/
public void logMethodCall(Map<String, SamplingTestPlugin> samples) {
default void logMethodCall(Map<String, SamplingTestPlugin> samples) {
StackTraceElement[] stackTraces = Thread.currentThread().getStackTrace();
if (stackTraces.length < 2) {
return;
@ -88,7 +96,7 @@ public abstract class SamplingTestPlugin {
));
}
public static class MethodCallSample extends SamplingTestPlugin {
class MethodCallSample implements SamplingTestPlugin {
private final StackTraceElement caller;
private final ClassLoader staticClassLoader;

View File

@ -94,6 +94,11 @@ public class TestPlugins {
* which samples information about its method calls.
*/
SAMPLING_CONFIG_PROVIDER("sampling-config-provider", "test.plugins.SamplingConfigProvider"),
/**
* A {@link org.apache.kafka.connect.sink.SinkConnector}
* which samples information about its method calls.
*/
SAMPLING_CONNECTOR("sampling-connector", "test.plugins.SamplingConnector"),
/**
* A plugin which uses a {@link java.util.ServiceLoader}
* to load internal classes, and samples information about their initialization.

View File

@ -30,7 +30,7 @@ import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
* <p>Samples data about its initialization environment for later analysis.
* Samples are shared between instances of the same class in a static variable.
*/
public class AliasedStaticField extends SamplingTestPlugin implements Converter {
public class AliasedStaticField implements SamplingTestPlugin, Converter {
private static final Map<String, SamplingTestPlugin> SAMPLES;
private static final ClassLoader STATIC_CLASS_LOADER;

View File

@ -20,7 +20,6 @@ package test.plugins;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
import org.apache.kafka.connect.storage.Converter;
/**

View File

@ -21,7 +21,6 @@ import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
import org.apache.kafka.connect.storage.Converter;
/**

View File

@ -21,7 +21,6 @@ import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
import org.apache.kafka.connect.runtime.isolation.TestPlugins;
import org.apache.kafka.connect.storage.Converter;

View File

@ -17,9 +17,13 @@
package test.plugins;
import java.util.Collections;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.Map;
import java.util.HashMap;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.config.ConfigChangeCallback;
@ -34,14 +38,16 @@ import org.apache.kafka.connect.storage.HeaderConverter;
* See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
* <p>Samples data about its initialization environment for later analysis.
*/
public class SamplingConfigProvider extends SamplingTestPlugin implements ConfigProvider {
public final class SamplingConfigProvider implements SamplingTestPlugin, ConfigProvider {
private static final ClassLoader STATIC_CLASS_LOADER;
private static List<SamplingTestPlugin> instances;
private final ClassLoader classloader;
private Map<String, SamplingTestPlugin> samples;
static {
STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader();
instances = Collections.synchronizedList(new ArrayList<>());
}
{
@ -61,6 +67,11 @@ public class SamplingConfigProvider extends SamplingTestPlugin implements Config
return null;
}
public SamplingConfigProvider() {
logMethodCall(samples);
instances.add(this);
}
@Override
public void subscribe(String path, Set<String> keys, ConfigChangeCallback callback) {
logMethodCall(samples);
@ -100,4 +111,10 @@ public class SamplingConfigProvider extends SamplingTestPlugin implements Config
public Map<String, SamplingTestPlugin> otherSamples() {
return samples;
}
@Override
public List<SamplingTestPlugin> allInstances() {
return instances;
}
}

View File

@ -17,8 +17,12 @@
package test.plugins;
import java.util.Collections;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
@ -30,14 +34,16 @@ import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
* See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
* <p>Samples data about its initialization environment for later analysis.
*/
public class SamplingConfigurable extends SamplingTestPlugin implements Converter, Configurable {
public final class SamplingConfigurable implements SamplingTestPlugin, Converter, Configurable {
private static final ClassLoader STATIC_CLASS_LOADER;
private static List<SamplingTestPlugin> instances;
private final ClassLoader classloader;
private Map<String, SamplingTestPlugin> samples;
static {
STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader();
instances = Collections.synchronizedList(new ArrayList<>());
}
{
@ -45,6 +51,11 @@ public class SamplingConfigurable extends SamplingTestPlugin implements Converte
classloader = Thread.currentThread().getContextClassLoader();
}
public SamplingConfigurable() {
logMethodCall(samples);
instances.add(this);
}
@Override
public void configure(final Map<String, ?> configs) {
logMethodCall(samples);
@ -78,4 +89,9 @@ public class SamplingConfigurable extends SamplingTestPlugin implements Converte
public Map<String, SamplingTestPlugin> otherSamples() {
return samples;
}
@Override
public List<SamplingTestPlugin> allInstances() {
return instances;
}
}

View File

@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package test.plugins;
import java.io.IOException;
import java.util.Collections;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
/**
* Fake plugin class for testing classloading isolation.
* See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
* <p>Samples data about its initialization environment for later analysis.
*/
public final class SamplingConnector extends SinkConnector implements SamplingTestPlugin {
private static final ClassLoader STATIC_CLASS_LOADER;
private static List<SamplingTestPlugin> instances;
private final ClassLoader classloader;
private Map<String, SamplingTestPlugin> samples;
static {
STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader();
instances = Collections.synchronizedList(new ArrayList<>());
}
{
samples = new HashMap<>();
classloader = Thread.currentThread().getContextClassLoader();
}
public SamplingConnector() {
logMethodCall(samples);
instances.add(this);
}
@Override
public void start(Map<String, String> props) {
logMethodCall(samples);
}
@Override
public Class<? extends Task> taskClass() {
logMethodCall(samples);
return null;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
logMethodCall(samples);
return null;
}
@Override
public void stop() {
logMethodCall(samples);
}
@Override
public ConfigDef config() {
logMethodCall(samples);
return null;
}
@Override
public String version() {
logMethodCall(samples);
return "1.0.0";
}
@Override
public ClassLoader staticClassloader() {
return STATIC_CLASS_LOADER;
}
@Override
public ClassLoader classloader() {
return classloader;
}
@Override
public Map<String, SamplingTestPlugin> otherSamples() {
return samples;
}
@Override
public List<SamplingTestPlugin> allInstances() {
return instances;
}
}

View File

@ -17,8 +17,12 @@
package test.plugins;
import java.util.Collections;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;
@ -29,14 +33,16 @@ import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
* See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
* <p>Samples data about its initialization environment for later analysis.
*/
public class SamplingConverter extends SamplingTestPlugin implements Converter {
public final class SamplingConverter implements SamplingTestPlugin, Converter {
private static final ClassLoader STATIC_CLASS_LOADER;
private static List<SamplingTestPlugin> instances;
private final ClassLoader classloader;
private Map<String, SamplingTestPlugin> samples;
static {
STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader();
instances = Collections.synchronizedList(new ArrayList<>());
}
{
@ -44,6 +50,11 @@ public class SamplingConverter extends SamplingTestPlugin implements Converter {
classloader = Thread.currentThread().getContextClassLoader();
}
public SamplingConverter() {
logMethodCall(samples);
instances.add(this);
}
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
logMethodCall(samples);
@ -75,4 +86,9 @@ public class SamplingConverter extends SamplingTestPlugin implements Converter {
public Map<String, SamplingTestPlugin> otherSamples() {
return samples;
}
@Override
public List<SamplingTestPlugin> allInstances() {
return instances;
}
}

View File

@ -17,8 +17,12 @@
package test.plugins;
import java.util.Collections;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
@ -31,14 +35,16 @@ import org.apache.kafka.connect.storage.HeaderConverter;
* See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
* <p>Samples data about its initialization environment for later analysis.
*/
public class SamplingHeaderConverter extends SamplingTestPlugin implements HeaderConverter {
public final class SamplingHeaderConverter implements SamplingTestPlugin, HeaderConverter {
private static final ClassLoader STATIC_CLASS_LOADER;
private static List<SamplingTestPlugin> instances;
private final ClassLoader classloader;
private Map<String, SamplingTestPlugin> samples;
static {
STATIC_CLASS_LOADER = Thread.currentThread().getContextClassLoader();
instances = Collections.synchronizedList(new ArrayList<>());
}
{
@ -46,6 +52,11 @@ public class SamplingHeaderConverter extends SamplingTestPlugin implements Heade
classloader = Thread.currentThread().getContextClassLoader();
}
public SamplingHeaderConverter() {
logMethodCall(samples);
instances.add(this);
}
@Override
public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) {
logMethodCall(samples);
@ -88,4 +99,9 @@ public class SamplingHeaderConverter extends SamplingTestPlugin implements Heade
public Map<String, SamplingTestPlugin> otherSamples() {
return samples;
}
@Override
public List<SamplingTestPlugin> allInstances() {
return instances;
}
}

View File

@ -22,7 +22,7 @@ import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
/**
* Superclass for service loaded classes
*/
public class ServiceLoadedClass extends SamplingTestPlugin {
public class ServiceLoadedClass implements SamplingTestPlugin {
private static final ClassLoader STATIC_CLASS_LOADER;
private final ClassLoader classloader;

View File

@ -31,7 +31,7 @@ import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
* See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}.
* <p>Samples data about its initialization environment for later analysis.
*/
public class ServiceLoaderPlugin extends SamplingTestPlugin implements Converter {
public class ServiceLoaderPlugin implements SamplingTestPlugin, Converter {
private static final ClassLoader STATIC_CLASS_LOADER;
private static final Map<String, SamplingTestPlugin> SAMPLES;