KAFKA-14645: Use plugin classloader when retrieving connector plugin config definitions (#13148)

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Greg Harris <gharris1727@gmail.com>
This commit is contained in:
Chris Egerton 2023-01-30 12:06:02 -05:00 committed by GitHub
parent 72cfc994f5
commit 17559d581e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 61 additions and 44 deletions

View File

@ -797,12 +797,18 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
@Override @Override
public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) { public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {
List<ConfigKeyInfo> results = new ArrayList<>(); Plugins p = plugins();
ConfigDef configDefs; Class<?> pluginClass;
try { try {
Plugins p = plugins(); pluginClass = p.pluginClass(pluginName);
} catch (ClassNotFoundException cnfe) {
throw new NotFoundException("Unknown plugin " + pluginName + ".");
}
try (LoaderSwap loaderSwap = p.withClassLoader(pluginClass.getClassLoader())) {
Object plugin = p.newPlugin(pluginName); Object plugin = p.newPlugin(pluginName);
PluginType pluginType = PluginType.from(plugin.getClass()); PluginType pluginType = PluginType.from(plugin.getClass());
ConfigDef configDefs;
switch (pluginType) { switch (pluginType) {
case SINK: case SINK:
case SOURCE: case SOURCE:
@ -823,13 +829,14 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
default: default:
throw new BadRequestException("Invalid plugin type " + pluginType + ". Valid types are sink, source, converter, header_converter, transformation, predicate."); throw new BadRequestException("Invalid plugin type " + pluginType + ". Valid types are sink, source, converter, header_converter, transformation, predicate.");
} }
} catch (ClassNotFoundException cnfe) { List<ConfigKeyInfo> results = new ArrayList<>();
throw new NotFoundException("Unknown plugin " + pluginName + "."); for (ConfigDef.ConfigKey configKey : configDefs.configKeys().values()) {
results.add(AbstractHerder.convertConfigKey(configKey));
}
return results;
} catch (ClassNotFoundException e) {
throw new ConnectException("Failed to load plugin class or one of its dependencies", e);
} }
for (ConfigDef.ConfigKey configKey : configDefs.configKeys().values()) {
results.add(AbstractHerder.convertConfigKey(configKey));
}
return results;
} }
} }

View File

@ -123,6 +123,10 @@ public class Plugins {
); );
} }
public Class<?> pluginClass(String classOrAlias) throws ClassNotFoundException {
return pluginClass(delegatingLoader, classOrAlias, Object.class);
}
public static ClassLoader compareAndSwapLoaders(ClassLoader loader) { public static ClassLoader compareAndSwapLoaders(ClassLoader loader) {
ClassLoader current = Thread.currentThread().getContextClassLoader(); ClassLoader current = Thread.currentThread().getContextClassLoader();
if (!current.equals(loader)) { if (!current.equals(loader)) {

View File

@ -67,6 +67,7 @@ import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.apache.kafka.connect.runtime.AbstractHerder.keysWithVariableValues; import static org.apache.kafka.connect.runtime.AbstractHerder.keysWithVariableValues;
@ -899,48 +900,51 @@ public class AbstractHerderTest {
} }
@Test @Test
public void testConnectorPluginConfig() throws Exception { public void testSinkConnectorPluginConfig() throws ClassNotFoundException {
testConnectorPluginConfig("sink", SampleSinkConnector::new, SampleSinkConnector::config);
}
@Test
public void testSourceConnectorPluginConfig() throws ClassNotFoundException {
testConnectorPluginConfig("source", SampleSourceConnector::new, SampleSourceConnector::config);
}
@Test
public void testConverterPluginConfig() throws ClassNotFoundException {
testConnectorPluginConfig("converter", SampleConverterWithHeaders::new, SampleConverterWithHeaders::config);
}
@Test
public void testHeaderConverterPluginConfig() throws ClassNotFoundException {
testConnectorPluginConfig("header-converter", SampleHeaderConverter::new, SampleHeaderConverter::config);
}
@Test
public void testPredicatePluginConfig() throws ClassNotFoundException {
testConnectorPluginConfig("predicate", SamplePredicate::new, SamplePredicate::config);
}
@Test
public void testTransformationPluginConfig() throws ClassNotFoundException {
testConnectorPluginConfig("transformation", SampleTransformation::new, SampleTransformation::config);
}
private <T> void testConnectorPluginConfig(String pluginName, Supplier<T> newPluginInstance, Function<T, ConfigDef> pluginConfig) throws ClassNotFoundException {
AbstractHerder herder = mock(AbstractHerder.class, withSettings() AbstractHerder herder = mock(AbstractHerder.class, withSettings()
.useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy) .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
.defaultAnswer(CALLS_REAL_METHODS)); .defaultAnswer(CALLS_REAL_METHODS));
when(plugins.newPlugin(anyString())).then(invocation -> { when(plugins.pluginClass(pluginName)).then(invocation -> newPluginInstance.get().getClass());
String name = invocation.getArgument(0); when(plugins.newPlugin(anyString())).then(invocation -> newPluginInstance.get());
switch (name) {
case "sink": return new SampleSinkConnector();
case "source": return new SampleSourceConnector();
case "converter": return new SampleConverterWithHeaders();
case "header-converter": return new SampleHeaderConverter();
case "predicate": return new SamplePredicate();
default: return new SampleTransformation<>();
}
});
when(herder.plugins()).thenReturn(plugins); when(herder.plugins()).thenReturn(plugins);
List<ConfigKeyInfo> sinkConnectorConfigs = herder.connectorPluginConfig("sink"); List<ConfigKeyInfo> configs = herder.connectorPluginConfig(pluginName);
assertNotNull(sinkConnectorConfigs); assertNotNull(configs);
assertEquals(new SampleSinkConnector().config().names().size(), sinkConnectorConfigs.size());
List<ConfigKeyInfo> sourceConnectorConfigs = herder.connectorPluginConfig("source"); ConfigDef expectedConfig = pluginConfig.apply(newPluginInstance.get());
assertNotNull(sourceConnectorConfigs); assertEquals(expectedConfig.names().size(), configs.size());
assertEquals(new SampleSourceConnector().config().names().size(), sourceConnectorConfigs.size()); // Make sure that we used the correct class loader when interacting with the plugin
verify(plugins).withClassLoader(newPluginInstance.get().getClass().getClassLoader());
List<ConfigKeyInfo> converterConfigs = herder.connectorPluginConfig("converter");
assertNotNull(converterConfigs);
assertEquals(new SampleConverterWithHeaders().config().names().size(), converterConfigs.size());
List<ConfigKeyInfo> headerConverterConfigs = herder.connectorPluginConfig("header-converter");
assertNotNull(headerConverterConfigs);
assertEquals(new SampleHeaderConverter().config().names().size(), headerConverterConfigs.size());
List<ConfigKeyInfo> predicateConfigs = herder.connectorPluginConfig("predicate");
assertNotNull(predicateConfigs);
assertEquals(new SamplePredicate().config().names().size(), predicateConfigs.size());
List<ConfigKeyInfo> transformationConfigs = herder.connectorPluginConfig("transformation");
assertNotNull(transformationConfigs);
assertEquals(new SampleTransformation<>().config().names().size(), transformationConfigs.size());
} }
@Test(expected = NotFoundException.class) @Test(expected = NotFoundException.class)
@ -950,17 +954,19 @@ public class AbstractHerderTest {
.useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy) .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
.defaultAnswer(CALLS_REAL_METHODS)); .defaultAnswer(CALLS_REAL_METHODS));
when(worker.getPlugins()).thenReturn(plugins); when(worker.getPlugins()).thenReturn(plugins);
when(plugins.newPlugin(anyString())).thenThrow(new ClassNotFoundException()); when(plugins.pluginClass(anyString())).thenThrow(new ClassNotFoundException());
herder.connectorPluginConfig(connName); herder.connectorPluginConfig(connName);
} }
@Test(expected = BadRequestException.class) @Test(expected = BadRequestException.class)
@SuppressWarnings({"rawtypes", "unchecked"})
public void testGetConnectorConfigDefWithInvalidPluginType() throws Exception { public void testGetConnectorConfigDefWithInvalidPluginType() throws Exception {
String connName = "AnotherPlugin"; String connName = "AnotherPlugin";
AbstractHerder herder = mock(AbstractHerder.class, withSettings() AbstractHerder herder = mock(AbstractHerder.class, withSettings()
.useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy) .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
.defaultAnswer(CALLS_REAL_METHODS)); .defaultAnswer(CALLS_REAL_METHODS));
when(worker.getPlugins()).thenReturn(plugins); when(worker.getPlugins()).thenReturn(plugins);
when(plugins.pluginClass(anyString())).thenReturn((Class) Object.class);
when(plugins.newPlugin(anyString())).thenReturn(new DirectoryConfigProvider()); when(plugins.newPlugin(anyString())).thenReturn(new DirectoryConfigProvider());
herder.connectorPluginConfig(connName); herder.connectorPluginConfig(connName);
} }