KAFKA-18419: KIP-891 Connect Multiversion Support (Transformation and Predicate Changes) (#17742)

Reviewers: Greg Harris <greg.harris@aiven.io>
This commit is contained in:
snehashisp 2025-01-07 01:48:45 +05:30 committed by GitHub
parent 23e77ed2d4
commit ad3369859c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 255 additions and 112 deletions

View File

@ -859,9 +859,11 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
addNullValuedErrors(connectorProps, validatedConnectorConfig);
ConfigInfos connectorConfigInfo = validateConnectorPluginSpecifiedConfigs(connectorProps, validatedConnectorConfig, enrichedConfigDef, connector, reportStage);
// the order of operations here is important, converter validations can add error messages to the connector config
// which are collected and converted to ConfigInfos in validateConnectorPluginSpecifiedConfigs
ConfigInfos converterConfigInfo = validateAllConverterConfigs(connectorProps, validatedConnectorConfig, connectorLoader, reportStage);
ConfigInfos clientOverrideInfo = validateClientOverrides(connectorProps, connectorType, connector.getClass(), reportStage, doLog);
ConfigInfos connectorConfigInfo = validateConnectorPluginSpecifiedConfigs(connectorProps, validatedConnectorConfig, enrichedConfigDef, connector, reportStage);
return mergeConfigInfos(connType,
connectorConfigInfo,

View File

@ -18,6 +18,7 @@
package org.apache.kafka.connect.runtime;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.isolation.VersionedPluginLoadingException;
@ -31,8 +32,8 @@ public class CachedConnectors {
private static final String LATEST_VERSION = "latest";
private final Map<String, Map<String, Connector>> connectors;
private final Map<String, Exception> invalidConnectors;
private final Map<String, Map<String, Exception>> invalidVersions;
private final Map<String, Throwable> invalidConnectors;
private final Map<String, Map<String, VersionedPluginLoadingException>> invalidVersions;
private final Plugins plugins;
public CachedConnectors(Plugins plugins) {
@ -42,14 +43,14 @@ public class CachedConnectors {
this.invalidVersions = new ConcurrentHashMap<>();
}
private void validate(String connectorName, VersionRange range) throws Exception {
private void validate(String connectorName, VersionRange range) throws ConnectException, VersionedPluginLoadingException {
if (invalidConnectors.containsKey(connectorName)) {
throw new Exception(invalidConnectors.get(connectorName));
throw new ConnectException(invalidConnectors.get(connectorName));
}
String version = range == null ? LATEST_VERSION : range.toString();
if (invalidVersions.containsKey(connectorName) && invalidVersions.get(connectorName).containsKey(version)) {
throw new Exception(invalidVersions.get(connectorName).get(version));
throw new VersionedPluginLoadingException(invalidVersions.get(connectorName).get(version).getMessage());
}
}

View File

@ -27,6 +27,7 @@ import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.errors.ToleranceType;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.PluginType;
import org.apache.kafka.connect.runtime.isolation.PluginUtils;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.isolation.PluginsRecommenders;
@ -274,17 +275,17 @@ public class ConnectorConfig extends AbstractConfig {
public static ConfigDef enrichedConfigDef(Plugins plugins, Map<String, String> connProps, WorkerConfig workerConfig) {
PluginsRecommenders recommender = new PluginsRecommenders(plugins);
ConverterDefaults keyConverterDefaults = converterDefaults(plugins, KEY_CONVERTER_CLASS_CONFIG,
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, WorkerConfig.KEY_CONVERTER_VERSION, connProps, workerConfig, Converter.class);
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, WorkerConfig.KEY_CONVERTER_VERSION, connProps, workerConfig, PluginType.CONVERTER);
ConverterDefaults valueConverterDefaults = converterDefaults(plugins, VALUE_CONVERTER_CLASS_CONFIG,
WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, WorkerConfig.VALUE_CONVERTER_VERSION, connProps, workerConfig, Converter.class);
WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, WorkerConfig.VALUE_CONVERTER_VERSION, connProps, workerConfig, PluginType.CONVERTER);
ConverterDefaults headerConverterDefaults = converterDefaults(plugins, HEADER_CONVERTER_CLASS_CONFIG,
WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, WorkerConfig.HEADER_CONVERTER_VERSION, connProps, workerConfig, HeaderConverter.class);
return configDef(plugins.latestVersion(connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)),
WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, WorkerConfig.HEADER_CONVERTER_VERSION, connProps, workerConfig, PluginType.HEADER_CONVERTER);
return configDef(plugins.latestVersion(connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG), PluginType.SINK, PluginType.SOURCE),
keyConverterDefaults, valueConverterDefaults, headerConverterDefaults, recommender);
}
public static ConfigDef enrichedConfigDef(Plugins plugins, String connectorClass) {
return configDef(plugins.latestVersion(connectorClass), CONVERTER_DEFAULTS, CONVERTER_DEFAULTS, CONVERTER_DEFAULTS, EMPTY_RECOMMENDER);
return configDef(plugins.latestVersion(connectorClass, PluginType.SINK, PluginType.SOURCE), CONVERTER_DEFAULTS, CONVERTER_DEFAULTS, CONVERTER_DEFAULTS, EMPTY_RECOMMENDER);
}
private static ConfigDef.CompositeValidator aliasValidator(String kind) {
@ -395,10 +396,9 @@ public class ConnectorConfig extends AbstractConfig {
* <p>
* {@code requireFullConfig} specifies whether required config values that are missing should cause an exception to be thrown.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public static ConfigDef enrich(Plugins plugins, ConfigDef baseConfigDef, Map<String, String> props, boolean requireFullConfig) {
ConfigDef newDef = new ConfigDef(baseConfigDef);
new EnrichablePlugin<Transformation<?>>("Transformation", TRANSFORMS_CONFIG, TRANSFORMS_GROUP, (Class) Transformation.class,
new EnrichablePlugin<Transformation<?>>("Transformation", TRANSFORMS_CONFIG, TRANSFORMS_GROUP, PluginType.TRANSFORMATION,
props, requireFullConfig) {
@Override
@ -417,8 +417,8 @@ public class ConnectorConfig extends AbstractConfig {
}
@Override
protected Stream<Map.Entry<String, ConfigDef.ConfigKey>> configDefsForClass(String typeConfig) {
return super.configDefsForClass(typeConfig)
protected Stream<Map.Entry<String, ConfigDef.ConfigKey>> configDefsForClass(String typeConfig, String versionConfig, Plugins plugins) {
return super.configDefsForClass(typeConfig, versionConfig, plugins)
.filter(entry -> {
// The implicit parameters mask any from the transformer with the same name
if (TransformationStage.PREDICATE_CONFIG.equals(entry.getKey())
@ -447,10 +447,16 @@ public class ConnectorConfig extends AbstractConfig {
"but there is no config '" + prefixedPredicate + "' defining a predicate to be negated.");
}
}
}.enrich(newDef);
@Override
protected ConfigDef.Recommender versionRecommender(String typeConfig) {
return new PluginsRecommenders(plugins).transformationPluginRecommender(typeConfig);
}
}.enrich(newDef, plugins);
new EnrichablePlugin<Predicate<?>>("Predicate", PREDICATES_CONFIG, PREDICATES_GROUP,
(Class) Predicate.class, props, requireFullConfig) {
PluginType.PREDICATE, props, requireFullConfig) {
@Override
protected Set<PluginDesc<Predicate<?>>> plugins() {
return plugins.predicates();
@ -460,7 +466,14 @@ public class ConnectorConfig extends AbstractConfig {
protected ConfigDef config(Predicate<?> predicate) {
return predicate.config();
}
}.enrich(newDef);
@Override
protected ConfigDef.Recommender versionRecommender(String typeConfig) {
return new PluginsRecommenders(plugins).predicatePluginRecommender(typeConfig);
}
}.enrich(newDef, plugins);
return newDef;
}
@ -471,7 +484,7 @@ public class ConnectorConfig extends AbstractConfig {
String workerConverterVersionConfig,
Map<String, String> connectorProps,
WorkerConfig workerConfig,
Class<T> converterType
PluginType converterType
) {
/*
if a converter is specified in the connector config it overrides the worker config for the corresponding converter
@ -510,34 +523,23 @@ public class ConnectorConfig extends AbstractConfig {
String version = null;
if (connectorConverter != null) {
version = fetchPluginVersion(plugins, connectorConverter, connectorVersion, connectorConverter);
version = fetchPluginVersion(plugins, connectorClass, connectorVersion, connectorConverter, converterType);
} else {
version = workerConfig.originalsStrings().get(workerConverterVersionConfig);
if (version == null) {
version = plugins.latestVersion(workerConverter);
version = plugins.latestVersion(workerConverter, converterType);
}
}
return new ConverterDefaults(type, version);
}
private static void updateKeyDefault(ConfigDef configDef, String versionConfigKey, String versionDefault) {
ConfigDef.ConfigKey key = configDef.configKeys().get(versionConfigKey);
if (key == null) {
return;
}
configDef.configKeys().put(versionConfigKey, new ConfigDef.ConfigKey(
versionConfigKey, key.type, versionDefault, key.validator, key.importance, key.documentation, key.group, key.orderInGroup, key.width, key.displayName, key.dependents, key.recommender, false
));
}
@SuppressWarnings("unchecked")
private static <T> String fetchPluginVersion(Plugins plugins, String connectorClass, String connectorVersion, String pluginName) {
if (pluginName == null) {
private static <T> String fetchPluginVersion(Plugins plugins, String connectorClass, String connectorVersion, String pluginName, PluginType pluginType) {
if (pluginName == null || connectorClass == null) {
return null;
}
try {
VersionRange range = PluginUtils.connectorVersionRequirement(connectorVersion);
return plugins.pluginVersion(pluginName, plugins.pluginLoader(connectorClass, range));
return plugins.pluginVersion(pluginName, plugins.pluginLoader(connectorClass, range), pluginType);
} catch (InvalidVersionSpecificationException | VersionedPluginLoadingException e) {
// these errors should be captured in other places, so we can ignore them here
log.warn("Failed to determine default plugin version for {}", connectorClass, e);
@ -559,24 +561,27 @@ public class ConnectorConfig extends AbstractConfig {
private final String aliasKind;
private final String aliasConfig;
private final String aliasGroup;
private final PluginType pluginType;
private final Class<T> baseClass;
private final Map<String, String> props;
private final boolean requireFullConfig;
@SuppressWarnings("unchecked")
public EnrichablePlugin(
String aliasKind,
String aliasConfig, String aliasGroup, Class<T> baseClass,
String aliasConfig, String aliasGroup, PluginType pluginType,
Map<String, String> props, boolean requireFullConfig) {
this.aliasKind = aliasKind;
this.aliasConfig = aliasConfig;
this.aliasGroup = aliasGroup;
this.baseClass = baseClass;
this.pluginType = pluginType;
this.baseClass = (Class<T>) pluginType.superClass();
this.props = props;
this.requireFullConfig = requireFullConfig;
}
/** Add the configs for this alias to the given {@code ConfigDef}. */
void enrich(ConfigDef newDef) {
void enrich(ConfigDef newDef, Plugins plugins) {
Object aliases = ConfigDef.parseType(aliasConfig, props.get(aliasConfig), Type.LIST);
if (!(aliases instanceof List)) {
return;
@ -593,12 +598,17 @@ public class ConnectorConfig extends AbstractConfig {
int orderInGroup = 0;
final String typeConfig = prefix + "type";
final String versionConfig = prefix + WorkerConfig.PLUGIN_VERSION_SUFFIX;
final String defaultVersion = fetchPluginVersion(plugins, props.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG),
props.get(ConnectorConfig.CONNECTOR_VERSION), props.get(typeConfig), pluginType);
// Add the class configuration
final ConfigDef.Validator typeValidator = ConfigDef.LambdaValidator.with(
(String name, Object value) -> {
validateProps(prefix);
// The value will be null if the class couldn't be found; no point in performing follow-up validation
if (value != null) {
getConfigDefFromConfigProvidingClass(typeConfig, (Class<?>) value);
getConfigDefFromPlugin(typeConfig, ((Class<?>) value).getName(), props.getOrDefault(versionConfig, defaultVersion), plugins);
}
},
() -> "valid configs for " + alias + " " + aliasKind.toLowerCase(Locale.ENGLISH));
@ -607,7 +617,25 @@ public class ConnectorConfig extends AbstractConfig {
baseClass.getSimpleName() + " type for " + alias,
Collections.emptyList(), new ClassRecommender());
final ConfigDef configDef = populateConfigDef(typeConfig);
// Add the version configuration
final ConfigDef.Validator versionValidator = (name, value) -> {
if (value != null) {
try {
getConfigDefFromPlugin(typeConfig, props.get(typeConfig), (String) value, plugins);
} catch (VersionedPluginLoadingException e) {
throw e;
} catch (Exception e) {
// ignore any other exception here as they are not related to version validation and
// will be captured in the validation of the class configuration
}
}
};
newDef.define(versionConfig, Type.STRING, defaultVersion, versionValidator, Importance.HIGH,
"Version of the '" + alias + "' " + aliasKind.toLowerCase(Locale.ENGLISH) + ".", group, orderInGroup++, Width.LONG,
baseClass.getSimpleName() + " version for " + alias,
Collections.emptyList(), versionRecommender(typeConfig));
final ConfigDef configDef = populateConfigDef(typeConfig, versionConfig, plugins);
if (configDef == null) continue;
newDef.embed(prefix, group, orderInGroup, configDef);
}
@ -621,10 +649,10 @@ public class ConnectorConfig extends AbstractConfig {
* Populates the ConfigDef according to the configs returned from {@code configs()} method of class
* named in the {@code ...type} parameter of the {@code props}.
*/
protected ConfigDef populateConfigDef(String typeConfig) {
protected ConfigDef populateConfigDef(String typeConfig, String versionConfig, Plugins plugins) {
final ConfigDef configDef = initialConfigDef();
try {
configDefsForClass(typeConfig)
configDefsForClass(typeConfig, versionConfig, plugins)
.forEach(entry -> configDef.define(entry.getValue()));
} catch (ConfigException e) {
if (requireFullConfig) {
@ -640,9 +668,11 @@ public class ConnectorConfig extends AbstractConfig {
* Return a stream of configs provided by the {@code configs()} method of class
* named in the {@code ...type} parameter of the {@code props}.
*/
protected Stream<Map.Entry<String, ConfigDef.ConfigKey>> configDefsForClass(String typeConfig) {
final Class<?> cls = (Class<?>) ConfigDef.parseType(typeConfig, props.get(typeConfig), Type.CLASS);
return getConfigDefFromConfigProvidingClass(typeConfig, cls)
protected Stream<Map.Entry<String, ConfigDef.ConfigKey>> configDefsForClass(String typeConfig, String versionConfig, Plugins plugins) {
if (props.get(typeConfig) == null) {
throw new ConfigException(typeConfig, null, "Not a " + baseClass.getSimpleName());
}
return getConfigDefFromPlugin(typeConfig, props.get(typeConfig), props.get(versionConfig), plugins)
.configKeys().entrySet().stream();
}
@ -651,30 +681,46 @@ public class ConnectorConfig extends AbstractConfig {
return new ConfigDef();
}
/**
* Return {@link ConfigDef} from {@code cls}, which is expected to be a non-null {@code Class<T>},
* by instantiating it and invoking {@link #config(T)}.
* @param key
* @param cls The subclass of the baseclass.
*/
ConfigDef getConfigDefFromConfigProvidingClass(String key, Class<?> cls) {
if (cls == null) {
throw new ConfigException(key, null, "Not a " + baseClass.getSimpleName());
@SuppressWarnings("unchecked")
ConfigDef getConfigDefFromPlugin(String key, String pluginClass, String version, Plugins plugins) {
String connectorClass = props.get(CONNECTOR_CLASS_CONFIG);
if (pluginClass == null || connectorClass == null) {
// if transformation class is null or connector class is null, we return empty as these validations are done in respective validators
return new ConfigDef();
}
VersionRange connectorVersionRange;
try {
connectorVersionRange = PluginUtils.connectorVersionRequirement(props.get(CONNECTOR_VERSION));
} catch (InvalidVersionSpecificationException e) {
// this should be caught in connector version validation
return new ConfigDef();
}
VersionRange pluginVersion;
try {
pluginVersion = PluginUtils.connectorVersionRequirement(version);
} catch (InvalidVersionSpecificationException e) {
throw new VersionedPluginLoadingException(e.getMessage());
}
// validate that the plugin class is a subclass of the base class
final Class<?> cls = (Class<?>) ConfigDef.parseType(key, props.get(key), Type.CLASS);
Utils.ensureConcreteSubclass(baseClass, cls);
T pluginInstance;
T plugin;
try {
pluginInstance = Utils.newInstance(cls, baseClass);
plugin = (T) plugins.newPlugin(pluginClass, pluginVersion, plugins.pluginLoader(connectorClass, connectorVersionRange));
} catch (VersionedPluginLoadingException e) {
throw e;
} catch (Exception e) {
throw new ConfigException(key, String.valueOf(cls), "Error getting config definition from " + baseClass.getSimpleName() + ": " + e.getMessage());
throw new ConfigException(key, pluginClass, "Error getting config definition from " + baseClass.getSimpleName() + ": " + e.getMessage());
}
ConfigDef configDef = config(pluginInstance);
ConfigDef configDef = config(plugin);
if (null == configDef) {
throw new ConnectException(
String.format(
"%s.config() must return a ConfigDef that is not null.",
cls.getName()
plugin.getClass().getName()
)
);
}
@ -694,6 +740,8 @@ public class ConnectorConfig extends AbstractConfig {
*/
protected abstract Set<PluginDesc<T>> plugins();
protected abstract ConfigDef.Recommender versionRecommender(String typeConfig);
/**
* Recommend bundled transformations or predicates.
*/
@ -741,7 +789,7 @@ public class ConnectorConfig extends AbstractConfig {
try {
PluginUtils.connectorVersionRequirement((String) value);
} catch (InvalidVersionSpecificationException e) {
throw new ConfigException(name, value, e.getMessage());
throw new VersionedPluginLoadingException(e.getMessage());
}
}
}

View File

@ -16,7 +16,6 @@
*/
package org.apache.kafka.connect.runtime.isolation;
import org.apache.maven.artifact.versioning.ArtifactVersion;
import org.apache.maven.artifact.versioning.DefaultArtifactVersion;
import org.apache.maven.artifact.versioning.VersionRange;
import org.slf4j.Logger;
@ -28,6 +27,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
@ -127,19 +127,7 @@ public class DelegatingClassLoader extends URLClassLoader {
return aliases.getOrDefault(classOrAlias, classOrAlias);
}
String latestVersion(String classOrAlias) {
if (classOrAlias == null) {
return null;
}
String fullName = aliases.getOrDefault(classOrAlias, classOrAlias);
SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(fullName);
if (inner == null) {
return null;
}
return inner.lastKey().version();
}
String versionInLocation(String classOrAlias, String location) {
PluginDesc<?> pluginDesc(String classOrAlias, String preferredLocation, Set<PluginType> allowedTypes) {
if (classOrAlias == null) {
return null;
}
@ -148,12 +136,17 @@ public class DelegatingClassLoader extends URLClassLoader {
if (inner == null) {
return null;
}
PluginDesc<?> result = null;
for (Map.Entry<PluginDesc<?>, ClassLoader> entry : inner.entrySet()) {
if (entry.getKey().location().equals(location)) {
return entry.getKey().version();
if (!allowedTypes.contains(entry.getKey().type())) {
continue;
}
result = entry.getKey();
if (result.location().equals(preferredLocation)) {
return result;
}
}
return null;
return result;
}
private ClassLoader findPluginLoader(
@ -170,7 +163,6 @@ public class DelegatingClassLoader extends URLClassLoader {
+ "Provided soft version: %s ", range));
}
ArtifactVersion version = null;
ClassLoader loader = null;
for (Map.Entry<PluginDesc<?>, ClassLoader> entry : loaders.entrySet()) {
// the entries should be in sorted order of versions so this should end up picking the latest version which matches the range
@ -227,19 +219,19 @@ public class DelegatingClassLoader extends URLClassLoader {
if (range == null) {
return plugin;
}
verifyClasspathVersionedPlugin(name, plugin, range);
verifyClasspathVersionedPlugin(fullName, plugin, range);
}
return plugin;
}
private void verifyClasspathVersionedPlugin(String name, Class<?> plugin, VersionRange range) throws VersionedPluginLoadingException {
private void verifyClasspathVersionedPlugin(String fullName, Class<?> plugin, VersionRange range) throws VersionedPluginLoadingException {
String pluginVersion;
SortedMap<PluginDesc<?>, ClassLoader> scannedPlugin = pluginLoaders.get(name);
SortedMap<PluginDesc<?>, ClassLoader> scannedPlugin = pluginLoaders.get(fullName);
if (scannedPlugin == null) {
throw new VersionedPluginLoadingException(String.format(
"Plugin %s is not part of Connect's plugin loading mechanism (ClassPath or Plugin Path)",
name
fullName
));
}
@ -255,7 +247,7 @@ public class DelegatingClassLoader extends URLClassLoader {
throw new VersionedPluginLoadingException(String.format(
"Plugin %s has multiple versions specified in class path, "
+ "only one version is allowed in class path for loading a plugin with version range",
name
fullName
));
} else if (classpathPlugins.isEmpty()) {
throw new VersionedPluginLoadingException("Invalid plugin found in classpath");
@ -264,7 +256,7 @@ public class DelegatingClassLoader extends URLClassLoader {
if (!range.containsVersion(new DefaultArtifactVersion(pluginVersion))) {
throw new VersionedPluginLoadingException(String.format(
"Plugin %s has version %s which does not match the required version range %s",
name,
fullName,
pluginVersion,
range
), Collections.singletonList(pluginVersion));

View File

@ -45,6 +45,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -265,15 +266,17 @@ public class Plugins {
};
}
public String latestVersion(String classOrAlias) {
return delegatingLoader.latestVersion(classOrAlias);
public String latestVersion(String classOrAlias, PluginType... allowedTypes) {
return pluginVersion(classOrAlias, null, allowedTypes);
}
public String pluginVersion(String classOrAlias, ClassLoader sourceLoader) {
if (!(sourceLoader instanceof PluginClassLoader)) {
return latestVersion(classOrAlias);
public String pluginVersion(String classOrAlias, ClassLoader sourceLoader, PluginType... allowedTypes) {
String location = (sourceLoader instanceof PluginClassLoader) ? ((PluginClassLoader) sourceLoader).location() : null;
PluginDesc<?> desc = delegatingLoader.pluginDesc(classOrAlias, location, new HashSet<>(Arrays.asList(allowedTypes)));
if (desc != null) {
return desc.version();
}
return delegatingLoader.versionInLocation(classOrAlias, ((PluginClassLoader) sourceLoader).location());
return null;
}
public DelegatingClassLoader delegatingLoader() {
@ -376,7 +379,7 @@ public class Plugins {
public Object newPlugin(String classOrAlias, VersionRange range, ClassLoader sourceLoader) throws ClassNotFoundException {
if (range == null && sourceLoader instanceof PluginClassLoader) {
sourceLoader.loadClass(classOrAlias);
return newPlugin(sourceLoader.loadClass(classOrAlias));
}
return newPlugin(classOrAlias, range);
}

View File

@ -19,10 +19,13 @@ package org.apache.kafka.connect.runtime.isolation;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -74,6 +77,14 @@ public class PluginsRecommenders {
return headerConverterPluginVersionRecommender;
}
public TransformationPluginRecommender transformationPluginRecommender(String classOrAlias) {
return new TransformationPluginRecommender(classOrAlias);
}
public PredicatePluginRecommender predicatePluginRecommender(String classOrAlias) {
return new PredicatePluginRecommender(classOrAlias);
}
public class ConnectorPluginVersionRecommender implements ConfigDef.Recommender {
@SuppressWarnings({"unchecked", "rawtypes"})
@ -195,4 +206,60 @@ public class PluginsRecommenders {
.map(PluginDesc::version).distinct().collect(Collectors.toList());
}
}
// Recommender for transformation and predicate plugins
public abstract class SMTPluginRecommender<T> implements ConfigDef.Recommender {
protected abstract Function<String, Set<PluginDesc<T>>> plugins();
protected final String classOrAliasConfig;
public SMTPluginRecommender(String classOrAliasConfig) {
this.classOrAliasConfig = classOrAliasConfig;
}
@Override
@SuppressWarnings({"rawtypes"})
public List<Object> validValues(String name, Map<String, Object> parsedConfig) {
if (plugins == null) {
return Collections.emptyList();
}
if (parsedConfig.get(classOrAliasConfig) == null) {
return Collections.emptyList();
}
Class classOrAlias = (Class) parsedConfig.get(classOrAliasConfig);
return plugins().apply(classOrAlias.getName())
.stream().map(PluginDesc::version).distinct().collect(Collectors.toList());
}
@Override
public boolean visible(String name, Map<String, Object> parsedConfig) {
return true;
}
}
public class TransformationPluginRecommender extends SMTPluginRecommender<Transformation<?>> {
public TransformationPluginRecommender(String classOrAliasConfig) {
super(classOrAliasConfig);
}
@Override
protected Function<String, Set<PluginDesc<Transformation<?>>>> plugins() {
return plugins::transformations;
}
}
public class PredicatePluginRecommender extends SMTPluginRecommender<Predicate<?>> {
public PredicatePluginRecommender(String classOrAliasConfig) {
super(classOrAliasConfig);
}
@Override
protected Function<String, Set<PluginDesc<Predicate<?>>>> plugins() {
return plugins::predicates;
}
}
}

View File

@ -62,6 +62,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
@ -559,12 +560,14 @@ public class AbstractHerderTest {
}
@Test
public void testConfigValidationTransformsExtendResults() {
@SuppressWarnings("rawtypes")
public void testConfigValidationTransformsExtendResults() throws ClassNotFoundException {
final Class<? extends Connector> connectorClass = SampleSourceConnector.class;
AbstractHerder herder = createConfigValidationHerder(connectorClass, noneConnectorClientConfigOverridePolicy);
// 2 transform aliases defined -> 2 plugin lookups
when(plugins.transformations()).thenReturn(Collections.singleton(transformationPluginDesc()));
Mockito.lenient().when(plugins.transformations()).thenReturn(Collections.singleton(transformationPluginDesc()));
Mockito.lenient().when(plugins.newPlugin(SampleTransformation.class.getName(), null, classLoader)).thenReturn(new SampleTransformation());
// Define 2 transformations. One has a class defined and so can get embedded configs, the other is missing
// class info that should generate an error.
@ -575,6 +578,7 @@ public class AbstractHerderTest {
config.put(ConnectorConfig.TRANSFORMS_CONFIG + ".xformA.type", SampleTransformation.class.getName());
config.put("required", "value"); // connector required config
ConfigInfos result = herder.validateConnectorConfig(config, s -> null, false);
assertEquals(herder.connectorType(config), ConnectorType.SOURCE);
// We expect there to be errors due to the missing name and .... Note that these assertions depend heavily on
@ -596,7 +600,7 @@ public class AbstractHerderTest {
assertEquals(1, result.errorCount());
Map<String, ConfigInfo> infos = result.values().stream()
.collect(Collectors.toMap(info -> info.configKey().name(), Function.identity()));
assertEquals(31, infos.size());
assertEquals(33, infos.size());
// Should get 2 type fields from the transforms, first adds its own config since it has a valid class
assertEquals("transforms.xformA.type",
infos.get("transforms.xformA.type").configValue().name());
@ -611,12 +615,15 @@ public class AbstractHerderTest {
}
@Test
public void testConfigValidationPredicatesExtendResults() {
@SuppressWarnings("rawtypes")
public void testConfigValidationPredicatesExtendResults() throws ClassNotFoundException {
final Class<? extends Connector> connectorClass = SampleSourceConnector.class;
AbstractHerder herder = createConfigValidationHerder(connectorClass, noneConnectorClientConfigOverridePolicy);
when(plugins.transformations()).thenReturn(Collections.singleton(transformationPluginDesc()));
when(plugins.predicates()).thenReturn(Collections.singleton(predicatePluginDesc()));
Mockito.lenient().when(plugins.transformations()).thenReturn(Collections.singleton(transformationPluginDesc()));
Mockito.lenient().when(plugins.predicates()).thenReturn(Collections.singleton(predicatePluginDesc()));
Mockito.lenient().when(plugins.newPlugin(SampleTransformation.class.getName(), null, classLoader)).thenReturn(new SampleTransformation());
Mockito.lenient().when(plugins.newPlugin(SamplePredicate.class.getName(), null, classLoader)).thenReturn(new SamplePredicate());
// Define 2 predicates. One has a class defined and so can get embedded configs, the other is missing
// class info that should generate an error.
@ -653,7 +660,7 @@ public class AbstractHerderTest {
assertEquals(1, result.errorCount());
Map<String, ConfigInfo> infos = result.values().stream()
.collect(Collectors.toMap(info -> info.configKey().name(), Function.identity()));
assertEquals(33, infos.size());
assertEquals(36, infos.size());
// Should get 2 type fields from the transforms, first adds its own config since it has a valid class
assertEquals("transforms.xformA.type", infos.get("transforms.xformA.type").configValue().name());
assertTrue(infos.get("transforms.xformA.type").configValue().errors().isEmpty());

View File

@ -41,6 +41,8 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ConnectorConfigTest<R extends ConnectRecord<R>> {
@ -455,13 +457,19 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
}
@Test
public void testEnrichedConfigDef() {
@SuppressWarnings("rawtypes")
public void testEnrichedConfigDef() throws ClassNotFoundException {
String alias = "hdt";
String prefix = ConnectorConfig.TRANSFORMS_CONFIG + "." + alias + ".";
Map<String, String> props = new HashMap<>();
props.put(ConnectorConfig.TRANSFORMS_CONFIG, alias);
props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestConnector.class.getName());
props.put(prefix + "type", HasDuplicateConfigTransformation.class.getName());
ConfigDef def = ConnectorConfig.enrich(MOCK_PLUGINS, new ConfigDef(), props, false);
Plugins mockPlugins = mock(Plugins.class);
when(mockPlugins.newPlugin(HasDuplicateConfigTransformation.class.getName(),
null, (ClassLoader) null)).thenReturn(new HasDuplicateConfigTransformation());
when(mockPlugins.transformations()).thenReturn(Collections.emptySet());
ConfigDef def = ConnectorConfig.enrich(mockPlugins, new ConfigDef(), props, false);
assertEnrichedConfigDef(def, prefix, HasDuplicateConfigTransformation.MUST_EXIST_KEY, ConfigDef.Type.BOOLEAN);
assertEnrichedConfigDef(def, prefix, TransformationStage.PREDICATE_CONFIG, ConfigDef.Type.STRING);
assertEnrichedConfigDef(def, prefix, TransformationStage.NEGATE_CONFIG, ConfigDef.Type.BOOLEAN);

View File

@ -29,12 +29,16 @@ import org.apache.kafka.connect.transforms.ReplaceField;
import org.apache.kafka.connect.transforms.SetSchemaMetadata;
import org.apache.kafka.connect.transforms.TimestampConverter;
import org.apache.kafka.connect.transforms.TimestampRouter;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.ValueToKey;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Tests that transformations' configs can be composed with ConnectorConfig during its construction, ensuring no
* conflicting fields or other issues.
@ -42,8 +46,19 @@ import java.util.HashMap;
* This test appears here simply because it requires both connect-runtime and connect-transforms and connect-runtime
* already depends on connect-transforms.
*/
@SuppressWarnings("rawtypes")
public class TransformationConfigTest {
private Plugins setupMockPlugins(Transformation transformation) {
Plugins plugins = mock(Plugins.class);
try {
when(plugins.newPlugin(transformation.getClass().getName(), null, (ClassLoader) null)).thenReturn(transformation);
} catch (ClassNotFoundException e) {
// Shouldn't happen since we're mocking the plugins
}
return plugins;
}
@Test
public void testEmbeddedConfigCast() {
// Validate that we can construct a Connector config containing the extended config for the transform
@ -54,7 +69,7 @@ public class TransformationConfigTest {
connProps.put("transforms.example.type", Cast.Value.class.getName());
connProps.put("transforms.example.spec", "int8");
Plugins plugins = null; // Safe when we're only constructing the config
Plugins plugins = setupMockPlugins(new Cast.Value());
new ConnectorConfig(plugins, connProps);
}
@ -68,7 +83,7 @@ public class TransformationConfigTest {
connProps.put("transforms.example.type", ExtractField.Value.class.getName());
connProps.put("transforms.example.field", "field");
Plugins plugins = null; // Safe when we're only constructing the config
Plugins plugins = setupMockPlugins(new ExtractField.Value());
new ConnectorConfig(plugins, connProps);
}
@ -81,7 +96,7 @@ public class TransformationConfigTest {
connProps.put("transforms", "example");
connProps.put("transforms.example.type", Flatten.Value.class.getName());
Plugins plugins = null; // Safe when we're only constructing the config
Plugins plugins = setupMockPlugins(new Flatten.Value());
new ConnectorConfig(plugins, connProps);
}
@ -95,7 +110,7 @@ public class TransformationConfigTest {
connProps.put("transforms.example.type", HoistField.Value.class.getName());
connProps.put("transforms.example.field", "field");
Plugins plugins = null; // Safe when we're only constructing the config
Plugins plugins = setupMockPlugins(new HoistField.Value());
new ConnectorConfig(plugins, connProps);
}
@ -108,7 +123,7 @@ public class TransformationConfigTest {
connProps.put("transforms", "example");
connProps.put("transforms.example.type", InsertField.Value.class.getName());
Plugins plugins = null; // Safe when we're only constructing the config
Plugins plugins = setupMockPlugins(new InsertField.Value());
new ConnectorConfig(plugins, connProps);
}
@ -123,7 +138,7 @@ public class TransformationConfigTest {
connProps.put("transforms.example.fields", "field");
connProps.put("transforms.example.replacement", "nothing");
Plugins plugins = null; // Safe when we're only constructing the config
Plugins plugins = setupMockPlugins(new MaskField.Value());
new ConnectorConfig(plugins, connProps);
}
@ -138,7 +153,7 @@ public class TransformationConfigTest {
connProps.put("transforms.example.regex", "(.*)");
connProps.put("transforms.example.replacement", "prefix-$1");
Plugins plugins = null; // Safe when we're only constructing the config
Plugins plugins = setupMockPlugins(new RegexRouter());
new ConnectorConfig(plugins, connProps);
}
@ -151,7 +166,7 @@ public class TransformationConfigTest {
connProps.put("transforms", "example");
connProps.put("transforms.example.type", ReplaceField.Value.class.getName());
Plugins plugins = null; // Safe when we're only constructing the config
Plugins plugins = setupMockPlugins(new ReplaceField.Value());
new ConnectorConfig(plugins, connProps);
}
@ -164,7 +179,7 @@ public class TransformationConfigTest {
connProps.put("transforms", "example");
connProps.put("transforms.example.type", SetSchemaMetadata.Value.class.getName());
Plugins plugins = null; // Safe when we're only constructing the config
Plugins plugins = setupMockPlugins(new SetSchemaMetadata.Value());
new ConnectorConfig(plugins, connProps);
}
@ -178,7 +193,7 @@ public class TransformationConfigTest {
connProps.put("transforms.example.type", TimestampConverter.Value.class.getName());
connProps.put("transforms.example.target.type", "unix");
Plugins plugins = null; // Safe when we're only constructing the config
Plugins plugins = setupMockPlugins(new TimestampConverter.Value());
new ConnectorConfig(plugins, connProps);
}
@ -191,7 +206,7 @@ public class TransformationConfigTest {
connProps.put("transforms", "example");
connProps.put("transforms.example.type", TimestampRouter.class.getName());
Plugins plugins = null; // Safe when we're only constructing the config
Plugins plugins = setupMockPlugins(new TimestampRouter());
new ConnectorConfig(plugins, connProps);
}
@ -205,7 +220,7 @@ public class TransformationConfigTest {
connProps.put("transforms.example.type", ValueToKey.class.getName());
connProps.put("transforms.example.fields", "field");
Plugins plugins = null; // Safe when we're only constructing the config
Plugins plugins = setupMockPlugins(new ValueToKey());
new ConnectorConfig(plugins, connProps);
}