KAFKA-18863: Connect Multiversion Support (Versioned Connector Creation and related changes) (#17743)

Reviewers: Greg Harris <greg.harris@aiven.io>
This commit is contained in:
snehashisp 2025-02-27 04:42:34 +05:30 committed by GitHub
parent 36f19057e1
commit 9dc9973c1c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 403 additions and 212 deletions

View File

@ -43,6 +43,7 @@ import org.apache.kafka.connect.runtime.errors.ProcessingContext;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.Stage;
import org.apache.kafka.connect.runtime.errors.ToleranceType;
import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
@ -70,6 +71,7 @@ import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
@ -233,11 +235,12 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask<SourceRecord,
RetryWithToleranceOperator<SourceRecord> retryWithToleranceOperator,
StatusBackingStore statusBackingStore,
Executor closeExecutor,
Supplier<List<ErrorReporter<SourceRecord>>> errorReportersSupplier) {
Supplier<List<ErrorReporter<SourceRecord>>> errorReportersSupplier,
Function<ClassLoader, LoaderSwap> pluginLoaderSwapper) {
super(id, statusListener, initialState, loader, connectMetrics, errorMetrics,
retryWithToleranceOperator, transformationChain, errorReportersSupplier,
time, statusBackingStore);
time, statusBackingStore, pluginLoaderSwapper);
this.workerConfig = workerConfig;
this.task = task;
@ -491,11 +494,17 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask<SourceRecord,
RecordHeaders headers = retryWithToleranceOperator.execute(context, () -> convertHeaderFor(record), Stage.HEADER_CONVERTER, headerConverterPlugin.get().getClass());
byte[] key = retryWithToleranceOperator.execute(context, () -> keyConverterPlugin.get().fromConnectData(record.topic(), headers, record.keySchema(), record.key()),
Stage.KEY_CONVERTER, keyConverterPlugin.get().getClass());
byte[] key = retryWithToleranceOperator.execute(context, () -> {
try (LoaderSwap swap = pluginLoaderSwapper.apply(keyConverterPlugin.get().getClass().getClassLoader())) {
return keyConverterPlugin.get().fromConnectData(record.topic(), headers, record.keySchema(), record.key());
}
}, Stage.KEY_CONVERTER, keyConverterPlugin.get().getClass());
byte[] value = retryWithToleranceOperator.execute(context, () -> valueConverterPlugin.get().fromConnectData(record.topic(), headers, record.valueSchema(), record.value()),
Stage.VALUE_CONVERTER, valueConverterPlugin.get().getClass());
byte[] value = retryWithToleranceOperator.execute(context, () -> {
try (LoaderSwap swap = pluginLoaderSwapper.apply(valueConverterPlugin.get().getClass().getClassLoader())) {
return valueConverterPlugin.get().fromConnectData(record.topic(), headers, record.valueSchema(), record.value());
}
}, Stage.VALUE_CONVERTER, valueConverterPlugin.get().getClass());
if (context.failed()) {
return null;
@ -551,8 +560,11 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask<SourceRecord,
String topic = record.topic();
for (Header header : headers) {
String key = header.key();
byte[] rawHeader = headerConverterPlugin.get().fromConnectHeader(topic, key, header.schema(), header.value());
result.add(key, rawHeader);
try (LoaderSwap swap = pluginLoaderSwapper.apply(headerConverterPlugin.get().getClass().getClassLoader())) {
byte[] rawHeader = headerConverterPlugin.get().fromConnectHeader(topic, key, header.schema(), header.value());
result.add(key, rawHeader);
}
}
}
return result;

View File

@ -362,7 +362,7 @@ public class ConnectorConfig extends AbstractConfig {
* {@link Transformation transformations} and {@link Predicate predicates}
* as they are specified in the {@link #TRANSFORMS_CONFIG} and {@link #PREDICATES_CONFIG}
*/
public <R extends ConnectRecord<R>> List<TransformationStage<R>> transformationStages(ConnectorTaskId connectorTaskId, ConnectMetrics metrics) {
public <R extends ConnectRecord<R>> List<TransformationStage<R>> transformationStages(Plugins plugins, ConnectorTaskId connectorTaskId, ConnectMetrics metrics) {
final List<String> transformAliases = getList(TRANSFORMS_CONFIG);
final List<TransformationStage<R>> transformations = new ArrayList<>(transformAliases.size());
@ -370,8 +370,9 @@ public class ConnectorConfig extends AbstractConfig {
final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
try {
@SuppressWarnings("unchecked")
final Transformation<R> transformation = Utils.newInstance(getClass(prefix + "type"), Transformation.class);
final String typeConfig = prefix + "type";
final String versionConfig = prefix + WorkerConfig.PLUGIN_VERSION_SUFFIX;
@SuppressWarnings("unchecked") final Transformation<R> transformation = getTransformationOrPredicate(plugins, typeConfig, versionConfig);
Map<String, Object> configs = originalsWithPrefix(prefix);
Object predicateAlias = configs.remove(TransformationStage.PREDICATE_CONFIG);
Object negate = configs.remove(TransformationStage.NEGATE_CONFIG);
@ -379,13 +380,15 @@ public class ConnectorConfig extends AbstractConfig {
Plugin<Transformation<R>> transformationPlugin = metrics.wrap(transformation, connectorTaskId, alias);
if (predicateAlias != null) {
String predicatePrefix = PREDICATES_PREFIX + predicateAlias + ".";
final String predicateTypeConfig = predicatePrefix + "type";
final String predicateVersionConfig = predicatePrefix + WorkerConfig.PLUGIN_VERSION_SUFFIX;
@SuppressWarnings("unchecked")
Predicate<R> predicate = Utils.newInstance(getClass(predicatePrefix + "type"), Predicate.class);
Predicate<R> predicate = getTransformationOrPredicate(plugins, predicateTypeConfig, predicateVersionConfig);
predicate.configure(originalsWithPrefix(predicatePrefix));
Plugin<Predicate<R>> predicatePlugin = metrics.wrap(predicate, connectorTaskId, (String) predicateAlias);
transformations.add(new TransformationStage<>(predicatePlugin, negate != null && Boolean.parseBoolean(negate.toString()), transformationPlugin));
transformations.add(new TransformationStage<>(predicatePlugin, negate != null && Boolean.parseBoolean(negate.toString()), transformationPlugin, plugins.safeLoaderSwapper()));
} else {
transformations.add(new TransformationStage<>(transformationPlugin));
transformations.add(new TransformationStage<>(transformationPlugin, plugins.safeLoaderSwapper()));
}
} catch (Exception e) {
throw new ConnectException(e);
@ -395,6 +398,17 @@ public class ConnectorConfig extends AbstractConfig {
return transformations;
}
@SuppressWarnings("unchecked")
private <T> T getTransformationOrPredicate(Plugins plugins, String classConfig, String versionConfig) {
try {
VersionRange range = PluginUtils.connectorVersionRequirement(getString(versionConfig));
VersionRange connectorRange = PluginUtils.connectorVersionRequirement(getString(CONNECTOR_VERSION));
return (T) plugins.newPlugin(getClass(classConfig).getName(), range, plugins.pluginLoader(getString(CONNECTOR_CLASS_CONFIG), connectorRange));
} catch (Exception e) {
throw new ConnectException(e);
}
}
/**
* Returns an enriched {@link ConfigDef} building upon the {@code ConfigDef}, using the current configuration specified in {@code props} as an input.
* <p>

View File

@ -32,6 +32,7 @@ import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.ErrorReporter;
import org.apache.kafka.connect.runtime.errors.ProcessingContext;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTask.TransactionBoundary;
@ -57,6 +58,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
@ -101,11 +103,12 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
Executor closeExecutor,
Runnable preProducerCheck,
Runnable postProducerCheck,
Supplier<List<ErrorReporter<SourceRecord>>> errorReportersSupplier) {
Supplier<List<ErrorReporter<SourceRecord>>> errorReportersSupplier,
Function<ClassLoader, LoaderSwap> pluginLoaderSwapper) {
super(id, task, statusListener, initialState, configState, keyConverterPlugin, valueConverterPlugin, headerConverterPlugin, transformationChain,
buildTransactionContext(sourceConfig),
producer, admin, topicGroups, offsetReader, offsetWriter, offsetStore, workerConfig, connectMetrics, errorMetrics,
loader, time, retryWithToleranceOperator, statusBackingStore, closeExecutor, errorReportersSupplier);
loader, time, retryWithToleranceOperator, statusBackingStore, closeExecutor, errorReportersSupplier, pluginLoaderSwapper);
this.transactionOpen = false;
this.committableRecords = new LinkedHashMap<>();

View File

@ -20,9 +20,12 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import java.util.function.Function;
/**
* Wrapper for a {@link Transformation} and corresponding optional {@link Predicate}
* which applies the transformation when the {@link Predicate} is true (or false, according to {@code negate}).
@ -36,15 +39,18 @@ public class TransformationStage<R extends ConnectRecord<R>> implements AutoClos
private final Plugin<Predicate<R>> predicatePlugin;
private final Plugin<Transformation<R>> transformationPlugin;
private final boolean negate;
private final Function<ClassLoader, LoaderSwap> pluginLoaderSwapper;
TransformationStage(Plugin<Transformation<R>> transformationPlugin) {
this(null, false, transformationPlugin);
TransformationStage(Plugin<Transformation<R>> transformationPlugin, Function<ClassLoader, LoaderSwap> pluginLoaderSwapper) {
this(null, false, transformationPlugin, pluginLoaderSwapper);
}
TransformationStage(Plugin<Predicate<R>> predicatePlugin, boolean negate, Plugin<Transformation<R>> transformationPlugin) {
TransformationStage(Plugin<Predicate<R>> predicatePlugin, boolean negate, Plugin<Transformation<R>> transformationPlugin, Function<ClassLoader, LoaderSwap> pluginLoaderSwapper) {
this.predicatePlugin = predicatePlugin;
this.negate = negate;
this.transformationPlugin = transformationPlugin;
this.pluginLoaderSwapper = pluginLoaderSwapper;
}
public Class<? extends Transformation<R>> transformClass() {
@ -54,8 +60,17 @@ public class TransformationStage<R extends ConnectRecord<R>> implements AutoClos
}
public R apply(R record) {
if (predicatePlugin == null || predicatePlugin.get() == null || negate ^ predicatePlugin.get().test(record)) {
return transformationPlugin.get().apply(record);
Predicate<R> predicate = predicatePlugin != null ? predicatePlugin.get() : null;
boolean shouldTransform = predicate == null;
if (predicate != null) {
try (LoaderSwap swap = pluginLoaderSwapper.apply(predicate.getClass().getClassLoader())) {
shouldTransform = negate ^ predicate.test(record);
}
}
if (shouldTransform) {
try (LoaderSwap swap = pluginLoaderSwapper.apply(transformationPlugin.get().getClass().getClassLoader())) {
record = transformationPlugin.get().apply(record);
}
}
return record;
}

View File

@ -67,8 +67,10 @@ import org.apache.kafka.connect.runtime.errors.LogReporter;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter;
import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.runtime.isolation.PluginUtils;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
import org.apache.kafka.connect.runtime.isolation.VersionedPluginLoadingException;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
@ -99,6 +101,7 @@ import org.apache.kafka.connect.util.SinkUtils;
import org.apache.kafka.connect.util.TopicAdmin;
import org.apache.kafka.connect.util.TopicCreationGroup;
import org.apache.maven.artifact.versioning.InvalidVersionSpecificationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -314,32 +317,38 @@ public final class Worker {
final WorkerConnector workerConnector;
final String connClass = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
ClassLoader connectorLoader = plugins.connectorLoader(connClass);
try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
log.info("Creating connector {} of type {}", connName, connClass);
final Connector connector = plugins.newConnector(connClass);
final ConnectorConfig connConfig;
final CloseableOffsetStorageReader offsetReader;
final ConnectorOffsetBackingStore offsetStore;
if (ConnectUtils.isSinkConnector(connector)) {
connConfig = new SinkConnectorConfig(plugins, connProps);
offsetReader = null;
offsetStore = null;
} else {
SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connProps, config.topicCreationEnable());
connConfig = sourceConfig;
final ClassLoader connectorLoader;
// Set up the offset backing store for this connector instance
offsetStore = config.exactlyOnceSourceEnabled()
try {
connectorLoader = connectorClassLoader(connProps);
try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
log.info("Creating connector {} of type {}", connName, connClass);
final Connector connector = instantiateConnector(connProps);
final ConnectorConfig connConfig;
final CloseableOffsetStorageReader offsetReader;
final ConnectorOffsetBackingStore offsetStore;
if (ConnectUtils.isSinkConnector(connector)) {
connConfig = new SinkConnectorConfig(plugins, connProps);
offsetReader = null;
offsetStore = null;
} else {
SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connProps, config.topicCreationEnable());
connConfig = sourceConfig;
// Set up the offset backing store for this connector instance
offsetStore = config.exactlyOnceSourceEnabled()
? offsetStoreForExactlyOnceSourceConnector(sourceConfig, connName, connector, null)
: offsetStoreForRegularSourceConnector(sourceConfig, connName, connector, null);
offsetStore.configure(config);
offsetReader = new OffsetStorageReaderImpl(offsetStore, connName, internalKeyConverter, internalValueConverter);
}
workerConnector = new WorkerConnector(
offsetStore.configure(config);
offsetReader = new OffsetStorageReaderImpl(offsetStore, connName, internalKeyConverter, internalValueConverter);
}
workerConnector = new WorkerConnector(
connName, connector, connConfig, ctx, metrics, connectorStatusListener, offsetReader, offsetStore, connectorLoader);
log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connector.getClass());
workerConnector.transitionTo(initialState, onConnectorStateChange);
log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connector.getClass());
workerConnector.transitionTo(initialState, onConnectorStateChange);
}
} catch (Throwable t) {
log.error("Failed to start connector {}", connName, t);
connectorStatusListener.onFailure(connName, t);
@ -655,51 +664,51 @@ public final class Worker {
throw new ConnectException("Task already exists in this worker: " + id);
connectorStatusMetricsGroup.recordTaskAdded(id);
String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
ClassLoader connectorLoader = plugins.connectorLoader(connType);
try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
final ClassLoader connectorLoader;
try {
connectorLoader = connectorClassLoader(connProps);
try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
int maxTasks = connConfig.tasksMax();
int numTasks = configState.taskCount(id.connector());
checkTasksMax(id.connector(), numTasks, maxTasks, connConfig.enforceTasksMax());
int maxTasks = connConfig.tasksMax();
int numTasks = configState.taskCount(id.connector());
checkTasksMax(id.connector(), numTasks, maxTasks, connConfig.enforceTasksMax());
final TaskConfig taskConfig = new TaskConfig(taskProps);
final Class<? extends Task> taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
final Task task = plugins.newTask(taskClass);
log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName());
final TaskConfig taskConfig = new TaskConfig(taskProps);
final Class<? extends Task> taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
final Task task = plugins.newTask(taskClass);
log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName());
// By maintaining connector's specific class loader for this thread here, we first
// search for converters within the connector dependencies.
// If any of these aren't found, that means the connector didn't configure specific converters,
// so we should instantiate based upon the worker configuration
Converter keyConverter = plugins.newConverter(connConfig, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage
.CURRENT_CLASSLOADER);
Converter valueConverter = plugins.newConverter(connConfig, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.CURRENT_CLASSLOADER);
HeaderConverter headerConverter = plugins.newHeaderConverter(connConfig, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
ClassLoaderUsage.CURRENT_CLASSLOADER);
if (keyConverter == null) {
keyConverter = plugins.newConverter(config, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), id);
} else {
log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), id);
}
if (valueConverter == null) {
valueConverter = plugins.newConverter(config, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), id);
} else {
log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), id);
}
if (headerConverter == null) {
headerConverter = plugins.newHeaderConverter(config, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, ClassLoaderUsage
.PLUGINS);
log.info("Set up the header converter {} for task {} using the worker config", headerConverter.getClass(), id);
} else {
log.info("Set up the header converter {} for task {} using the connector config", headerConverter.getClass(), id);
}
workerTask = taskBuilder
// By maintaining connector's specific class loader for this thread here, we first
// search for converters within the connector dependencies.
// If any of these aren't found, that means the connector didn't configure specific converters,
// so we should instantiate based upon the worker configuration
Converter keyConverter = plugins.newConverter(connConfig, ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG);
Converter valueConverter = plugins.newConverter(connConfig, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG);
HeaderConverter headerConverter = plugins.newHeaderConverter(connConfig, ConnectorConfig.HEADER_CONVERTER_CLASS_CONFIG, ConnectorConfig.HEADER_CONVERTER_VERSION_CONFIG);
if (keyConverter == null) {
keyConverter = plugins.newConverter(config, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, WorkerConfig.KEY_CONVERTER_VERSION);
log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), id);
} else {
log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), id);
}
if (valueConverter == null) {
valueConverter = plugins.newConverter(config, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, WorkerConfig.VALUE_CONVERTER_VERSION);
log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), id);
} else {
log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), id);
}
if (headerConverter == null) {
headerConverter = plugins.newHeaderConverter(config, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, WorkerConfig.HEADER_CONVERTER_VERSION);
log.info("Set up the header converter {} for task {} using the worker config", headerConverter.getClass(), id);
} else {
log.info("Set up the header converter {} for task {} using the connector config", headerConverter.getClass(), id);
}
workerTask = taskBuilder
.withTask(task)
.withConnectorConfig(connConfig)
.withKeyConverterPlugin(metrics.wrap(keyConverter, id, true))
@ -708,7 +717,8 @@ public final class Worker {
.withClassloader(connectorLoader)
.build();
workerTask.initialize(taskConfig);
workerTask.initialize(taskConfig);
}
} catch (Throwable t) {
log.error("Failed to start task {}", id, t);
connectorStatusMetricsGroup.recordTaskRemoved(id);
@ -739,19 +749,17 @@ public final class Worker {
public KafkaFuture<Void> fenceZombies(String connName, int numTasks, Map<String, String> connProps) {
log.debug("Fencing out {} task producers for source connector {}", numTasks, connName);
try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) {
String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
ClassLoader connectorLoader = plugins.connectorLoader(connType);
try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
Class<? extends Connector> connectorClass = connectorClass(connProps);
ClassLoader classLoader = connectorClassLoader(connProps);
try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
final SourceConnectorConfig connConfig = new SourceConnectorConfig(plugins, connProps, config.topicCreationEnable());
final Class<? extends Connector> connClass = plugins.connectorClass(
connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
Map<String, Object> adminConfig = adminConfigs(
connName,
"connector-worker-adminclient-" + connName,
config,
connConfig,
connClass,
connectorClass,
connectorClientConfigOverridePolicy,
kafkaClusterId,
ConnectorType.SOURCE);
@ -1198,11 +1206,9 @@ public final class Worker {
* @param cb callback to invoke upon completion of the request
*/
public void connectorOffsets(String connName, Map<String, String> connectorConfig, Callback<ConnectorOffsets> cb) {
String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
Connector connector = instantiateConnector(connectorConfig);
ClassLoader connectorLoader = connectorClassLoader(connectorConfig);
try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
Connector connector = plugins.newConnector(connectorClassOrAlias);
if (ConnectUtils.isSinkConnector(connector)) {
log.debug("Fetching offsets for sink connector: {}", connName);
sinkConnectorOffsets(connName, connector, connectorConfig, cb);
@ -1213,6 +1219,43 @@ public final class Worker {
}
}
private Connector instantiateConnector(Map<String, String> connProps) throws ConnectException {
final String klass = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
final String version = connProps.get(ConnectorConfig.CONNECTOR_VERSION);
try {
return plugins.newConnector(klass, PluginUtils.connectorVersionRequirement(version));
} catch (InvalidVersionSpecificationException | VersionedPluginLoadingException e) {
throw new ConnectException(
String.format("Failed to instantiate class for connector %s, class %s", klass, connProps.get(ConnectorConfig.NAME_CONFIG)), e);
}
}
private ClassLoader connectorClassLoader(Map<String, String> connProps) throws ConnectException {
final String klass = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
final String version = connProps.get(ConnectorConfig.CONNECTOR_VERSION);
try {
return plugins.pluginLoader(klass, PluginUtils.connectorVersionRequirement(version));
} catch (InvalidVersionSpecificationException | VersionedPluginLoadingException e) {
throw new ConnectException(
String.format("Failed to get class loader for connector %s, class %s", klass, connProps.get(ConnectorConfig.NAME_CONFIG)), e);
}
}
private Class<? extends Connector> connectorClass(Map<String, String> connProps) throws ConnectException {
final String klass = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
final String version = connProps.get(ConnectorConfig.CONNECTOR_VERSION);
try {
return plugins.connectorClass(klass, PluginUtils.connectorVersionRequirement(version));
} catch (InvalidVersionSpecificationException | VersionedPluginLoadingException e) {
throw new ConnectException(
String.format("Failed to get class for connector %s, class %s", klass, connProps.get(ConnectorConfig.NAME_CONFIG)), e);
}
}
/**
* Get the current consumer group offsets for a sink connector.
* <p>
@ -1311,12 +1354,10 @@ public final class Worker {
*/
public void modifyConnectorOffsets(String connName, Map<String, String> connectorConfig,
Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb) {
String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias);
Connector connector;
final Connector connector = instantiateConnector(connectorConfig);
ClassLoader connectorLoader = connectorClassLoader(connectorConfig);
try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
connector = plugins.newConnector(connectorClassOrAlias);
if (ConnectUtils.isSinkConnector(connector)) {
log.debug("Modifying offsets for sink connector: {}", connName);
modifySinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb);
@ -1791,13 +1832,12 @@ public final class Worker {
Objects.requireNonNull(classLoader, "Classloader used by task cannot be null");
ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id);
final Class<? extends Connector> connectorClass = plugins.connectorClass(
connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
final Class<? extends Connector> connectorClass = connectorClass(connectorConfig.originalsStrings());
RetryWithToleranceOperator<T> retryWithToleranceOperator = new RetryWithToleranceOperator<>(connectorConfig.errorRetryTimeout(),
connectorConfig.errorMaxDelayInMillis(), connectorConfig.errorToleranceType(), Time.SYSTEM, errorHandlingMetrics);
TransformationChain<T, R> transformationChain = new TransformationChain<>(connectorConfig.<R>transformationStages(id, metrics), retryWithToleranceOperator);
TransformationChain<T, R> transformationChain = new TransformationChain<>(connectorConfig.<R>transformationStages(plugins, id, metrics), retryWithToleranceOperator);
log.info("Initializing: {}", transformationChain);
return doBuild(task, id, configState, statusListener, initialState,
@ -1862,7 +1902,7 @@ public final class Worker {
return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverterPlugin,
valueConverterPlugin, errorHandlingMetrics, headerConverterPlugin, transformationChain, consumer, classLoader, time,
retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore(),
() -> sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
() -> sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass), plugins.safeLoaderSwapper());
}
}
@ -1922,7 +1962,7 @@ public final class Worker {
return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverterPlugin, valueConverterPlugin, errorHandlingMetrics,
headerConverterPlugin, transformationChain, producer, topicAdmin, topicCreationGroups,
offsetReader, offsetWriter, offsetStore, config, configState, metrics, classLoader, time,
retryWithToleranceOperator, herder.statusBackingStore(), executor, () -> sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
retryWithToleranceOperator, herder.statusBackingStore(), executor, () -> sourceTaskReporters(id, sourceConfig, errorHandlingMetrics), plugins.safeLoaderSwapper());
}
}
@ -1987,7 +2027,7 @@ public final class Worker {
headerConverterPlugin, transformationChain, producer, topicAdmin, topicCreationGroups,
offsetReader, offsetWriter, offsetStore, config, configState, metrics, errorHandlingMetrics, classLoader, time, retryWithToleranceOperator,
herder.statusBackingStore(), sourceConfig, executor, preProducerCheck, postProducerCheck,
() -> sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
() -> sourceTaskReporters(id, sourceConfig, errorHandlingMetrics), plugins.safeLoaderSwapper());
}
}

View File

@ -47,6 +47,7 @@ import org.apache.kafka.connect.runtime.errors.ProcessingContext;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.Stage;
import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter;
import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.storage.ClusterConfigState;
@ -66,6 +67,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@ -122,9 +124,10 @@ class WorkerSinkTask extends WorkerTask<ConsumerRecord<byte[], byte[]>, SinkReco
RetryWithToleranceOperator<ConsumerRecord<byte[], byte[]>> retryWithToleranceOperator,
WorkerErrantRecordReporter workerErrantRecordReporter,
StatusBackingStore statusBackingStore,
Supplier<List<ErrorReporter<ConsumerRecord<byte[], byte[]>>>> errorReportersSupplier) {
Supplier<List<ErrorReporter<ConsumerRecord<byte[], byte[]>>>> errorReportersSupplier,
Function<ClassLoader, LoaderSwap> pluginLoaderSwapper) {
super(id, statusListener, initialState, loader, connectMetrics, errorMetrics,
retryWithToleranceOperator, transformationChain, errorReportersSupplier, time, statusBackingStore);
retryWithToleranceOperator, transformationChain, errorReportersSupplier, time, statusBackingStore, pluginLoaderSwapper);
this.workerConfig = workerConfig;
this.task = task;
@ -539,11 +542,17 @@ class WorkerSinkTask extends WorkerTask<ConsumerRecord<byte[], byte[]>, SinkReco
}
private SinkRecord convertAndTransformRecord(ProcessingContext<ConsumerRecord<byte[], byte[]>> context, final ConsumerRecord<byte[], byte[]> msg) {
SchemaAndValue keyAndSchema = retryWithToleranceOperator.execute(context, () -> keyConverterPlugin.get().toConnectData(msg.topic(), msg.headers(), msg.key()),
Stage.KEY_CONVERTER, keyConverterPlugin.get().getClass());
SchemaAndValue keyAndSchema = retryWithToleranceOperator.execute(context, () -> {
try (LoaderSwap swap = pluginLoaderSwapper.apply(keyConverterPlugin.get().getClass().getClassLoader())) {
return keyConverterPlugin.get().toConnectData(msg.topic(), msg.headers(), msg.key());
}
}, Stage.KEY_CONVERTER, keyConverterPlugin.get().getClass());
SchemaAndValue valueAndSchema = retryWithToleranceOperator.execute(context, () -> valueConverterPlugin.get().toConnectData(msg.topic(), msg.headers(), msg.value()),
Stage.VALUE_CONVERTER, valueConverterPlugin.get().getClass());
SchemaAndValue valueAndSchema = retryWithToleranceOperator.execute(context, () -> {
try (LoaderSwap swap = pluginLoaderSwapper.apply(valueConverterPlugin.get().getClass().getClassLoader())) {
return valueConverterPlugin.get().toConnectData(msg.topic(), msg.headers(), msg.value());
}
}, Stage.VALUE_CONVERTER, valueConverterPlugin.get().getClass());
Headers headers = retryWithToleranceOperator.execute(context, () -> convertHeadersFor(msg), Stage.HEADER_CONVERTER, headerConverterPlugin.get().getClass());
@ -580,8 +589,10 @@ class WorkerSinkTask extends WorkerTask<ConsumerRecord<byte[], byte[]>, SinkReco
if (recordHeaders != null) {
String topic = record.topic();
for (org.apache.kafka.common.header.Header recordHeader : recordHeaders) {
SchemaAndValue schemaAndValue = headerConverterPlugin.get().toConnectHeader(topic, recordHeader.key(), recordHeader.value());
result.add(recordHeader.key(), schemaAndValue);
try (LoaderSwap swap = pluginLoaderSwapper.apply(headerConverterPlugin.get().getClass().getClassLoader())) {
SchemaAndValue schemaAndValue = headerConverterPlugin.get().toConnectHeader(topic, recordHeader.key(), recordHeader.value());
result.add(recordHeader.key(), schemaAndValue);
}
}
}
return result;

View File

@ -28,6 +28,7 @@ import org.apache.kafka.connect.runtime.errors.ProcessingContext;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.Stage;
import org.apache.kafka.connect.runtime.errors.ToleranceType;
import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
@ -53,6 +54,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import static org.apache.kafka.connect.runtime.SubmittedRecords.CommittableOffsets;
@ -91,12 +93,13 @@ class WorkerSourceTask extends AbstractWorkerSourceTask {
RetryWithToleranceOperator<SourceRecord> retryWithToleranceOperator,
StatusBackingStore statusBackingStore,
Executor closeExecutor,
Supplier<List<ErrorReporter<SourceRecord>>> errorReportersSupplier) {
Supplier<List<ErrorReporter<SourceRecord>>> errorReportersSupplier,
Function<ClassLoader, LoaderSwap> pluginLoaderSwapper) {
super(id, task, statusListener, initialState, configState, keyConverterPlugin, valueConverterPlugin, headerConverterPlugin, transformationChain,
null, producer,
admin, topicGroups, offsetReader, offsetWriter, offsetStore, workerConfig, connectMetrics, errorMetrics, loader,
time, retryWithToleranceOperator, statusBackingStore, closeExecutor, errorReportersSupplier);
time, retryWithToleranceOperator, statusBackingStore, closeExecutor, errorReportersSupplier, pluginLoaderSwapper);
this.committableOffsets = CommittableOffsets.EMPTY;
this.submittedRecords = new SubmittedRecords();

View File

@ -33,6 +33,7 @@ import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.ErrorReporter;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.LoggingContext;
@ -44,6 +45,7 @@ import java.util.List;
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
/**
@ -78,6 +80,7 @@ abstract class WorkerTask<T, R extends ConnectRecord<R>> implements Runnable {
protected final RetryWithToleranceOperator<T> retryWithToleranceOperator;
protected final TransformationChain<T, R> transformationChain;
private final Supplier<List<ErrorReporter<T>>> errorReportersSupplier;
protected final Function<ClassLoader, LoaderSwap> pluginLoaderSwapper;
protected final PluginMetricsImpl pluginMetrics;
public WorkerTask(ConnectorTaskId id,
@ -90,7 +93,8 @@ abstract class WorkerTask<T, R extends ConnectRecord<R>> implements Runnable {
TransformationChain<T, R> transformationChain,
Supplier<List<ErrorReporter<T>>> errorReportersSupplier,
Time time,
StatusBackingStore statusBackingStore) {
StatusBackingStore statusBackingStore,
Function<ClassLoader, LoaderSwap> pluginLoaderSwapper) {
this.id = id;
this.taskMetricsGroup = new TaskMetricsGroup(this.id, connectMetrics, statusListener);
this.errorMetrics = errorMetrics;
@ -106,6 +110,7 @@ abstract class WorkerTask<T, R extends ConnectRecord<R>> implements Runnable {
this.errorReportersSupplier = errorReportersSupplier;
this.time = time;
this.statusBackingStore = statusBackingStore;
this.pluginLoaderSwapper = pluginLoaderSwapper;
this.pluginMetrics = connectMetrics.taskPluginMetrics(id);
}

View File

@ -549,7 +549,7 @@ public class Plugins {
}
private HeaderConverter newHeaderConverter(AbstractConfig config, String classPropertyName, String versionPropertyName, ClassLoaderUsage classLoaderUsage) {
if (!config.originals().containsKey(classPropertyName) && classLoaderUsage == ClassLoaderUsage.CURRENT_CLASSLOADER) {
if (config.getClass(classPropertyName) == null && classLoaderUsage == ClassLoaderUsage.CURRENT_CLASSLOADER) {
// This configuration does not define the Header Converter via the specified property name
return null;
}
@ -602,7 +602,7 @@ public class Plugins {
// if the config specifies the class name, use it, otherwise use the default which we can get from config.getClass
String classOrAlias = config.originalsStrings().get(classPropertyName);
if (classOrAlias == null) {
classOrAlias = config.getClass(classPropertyName).getName();
classOrAlias = config.getClass(classPropertyName) == null ? null : config.getClass(classPropertyName).getName();
}
try {
klass = pluginClass(delegatingLoader, classOrAlias, basePluginClass, range);

View File

@ -43,6 +43,7 @@ import org.apache.kafka.connect.runtime.errors.ProcessingContext;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.isolation.TestPlugins;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
@ -966,7 +967,7 @@ public class AbstractWorkerSourceTaskTest {
taskId, sourceTask, statusListener, TargetState.STARTED, configState, keyConverterPlugin, valueConverterPlugin, headerConverterPlugin, transformationChain,
workerTransactionContext, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter, offsetStore,
config, metrics, errorHandlingMetrics, plugins.delegatingLoader(), Time.SYSTEM, retryWithToleranceOperator,
statusBackingStore, Runnable::run, errorReportersSupplier) {
statusBackingStore, Runnable::run, errorReportersSupplier, TestPlugins.noOpLoaderSwap()) {
@Override
protected void prepareToInitializeTask() {
}

View File

@ -163,7 +163,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
props.put("transforms.a.type", SimpleTransformation.class.getName());
props.put("transforms.a.magic.number", "42");
final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props);
final List<TransformationStage<SinkRecord>> transformationStages = config.transformationStages(CONNECTOR_TASK_ID, METRICS);
final List<TransformationStage<SinkRecord>> transformationStages = config.transformationStages(MOCK_PLUGINS, CONNECTOR_TASK_ID, METRICS);
assertEquals(1, transformationStages.size());
final TransformationStage<SinkRecord> stage = transformationStages.get(0);
assertEquals(SimpleTransformation.class, stage.transformClass());
@ -192,7 +192,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
props.put("transforms.b.type", SimpleTransformation.class.getName());
props.put("transforms.b.magic.number", "84");
final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props);
final List<TransformationStage<SinkRecord>> transformationStages = config.transformationStages(CONNECTOR_TASK_ID, METRICS);
final List<TransformationStage<SinkRecord>> transformationStages = config.transformationStages(MOCK_PLUGINS, CONNECTOR_TASK_ID, METRICS);
assertEquals(2, transformationStages.size());
assertEquals(42, transformationStages.get(0).apply(DUMMY_RECORD).kafkaPartition().intValue());
assertEquals(84, transformationStages.get(1).apply(DUMMY_RECORD).kafkaPartition().intValue());
@ -293,7 +293,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
private void assertTransformationStageWithPredicate(Map<String, String> props, boolean expectedNegated) {
final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props);
final List<TransformationStage<SinkRecord>> transformationStages = config.transformationStages(CONNECTOR_TASK_ID, METRICS);
final List<TransformationStage<SinkRecord>> transformationStages = config.transformationStages(MOCK_PLUGINS, CONNECTOR_TASK_ID, METRICS);
assertEquals(1, transformationStages.size());
TransformationStage<SinkRecord> stage = transformationStages.get(0);

View File

@ -44,6 +44,7 @@ import org.apache.kafka.connect.runtime.errors.ToleranceType;
import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.isolation.TestPlugins;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
@ -427,7 +428,7 @@ public class ErrorHandlingTaskTest {
Plugin<Transformation<SinkRecord>> transformationPlugin = metrics.wrap(new FaultyPassthrough<SinkRecord>(), taskId, "");
TransformationChain<ConsumerRecord<byte[], byte[]>, SinkRecord> sinkTransforms =
new TransformationChain<>(singletonList(new TransformationStage<>(transformationPlugin)), retryWithToleranceOperator);
new TransformationChain<>(singletonList(new TransformationStage<>(transformationPlugin, TestPlugins.noOpLoaderSwap())), retryWithToleranceOperator);
Plugin<Converter> keyConverterPlugin = metrics.wrap(converter, taskId, true);
Plugin<Converter> valueConverterPlugin = metrics.wrap(converter, taskId, false);
@ -437,7 +438,7 @@ public class ErrorHandlingTaskTest {
ClusterConfigState.EMPTY, metrics, keyConverterPlugin, valueConverterPlugin, errorHandlingMetrics,
headerConverterPlugin, sinkTransforms, consumer, pluginLoader, time,
retryWithToleranceOperator, workerErrantRecordReporter,
statusBackingStore, () -> errorReporters);
statusBackingStore, () -> errorReporters, TestPlugins.noOpLoaderSwap());
}
private void createSourceTask(TargetState initialState, RetryWithToleranceOperator<SourceRecord> retryWithToleranceOperator, List<ErrorReporter<SourceRecord>> errorReporters) {
@ -463,7 +464,7 @@ public class ErrorHandlingTaskTest {
List<ErrorReporter<SourceRecord>> errorReporters, Converter converter) {
Plugin<Transformation<SourceRecord>> transformationPlugin = metrics.wrap(new FaultyPassthrough<SourceRecord>(), taskId, "");
TransformationChain<SourceRecord, SourceRecord> sourceTransforms = new TransformationChain<>(singletonList(
new TransformationStage<>(transformationPlugin)), retryWithToleranceOperator);
new TransformationStage<>(transformationPlugin, TestPlugins.noOpLoaderSwap())), retryWithToleranceOperator);
Plugin<Converter> keyConverterPlugin = metrics.wrap(converter, taskId, true);
Plugin<Converter> valueConverterPlugin = metrics.wrap(converter, taskId, false);
@ -476,7 +477,7 @@ public class ErrorHandlingTaskTest {
offsetReader, offsetWriter, offsetStore, workerConfig,
ClusterConfigState.EMPTY, metrics, pluginLoader, time,
retryWithToleranceOperator,
statusBackingStore, Runnable::run, () -> errorReporters));
statusBackingStore, Runnable::run, () -> errorReporters, TestPlugins.noOpLoaderSwap()));
}

View File

@ -36,6 +36,7 @@ import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.isolation.TestPlugins;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
@ -283,7 +284,7 @@ public class ExactlyOnceWorkerSourceTaskTest {
workerTask = new ExactlyOnceWorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverterPlugin, valueConverterPlugin, headerConverterPlugin,
transformationChain, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter, offsetStore,
config, clusterConfigState, metrics, errorHandlingMetrics, plugins.delegatingLoader(), time, RetryWithToleranceOperatorTest.noneOperator(), statusBackingStore,
sourceConfig, Runnable::run, preProducerCheck, postProducerCheck, Collections::emptyList);
sourceConfig, Runnable::run, preProducerCheck, postProducerCheck, Collections::emptyList, TestPlugins.noOpLoaderSwap());
}
@ParameterizedTest

View File

@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.connect.runtime.isolation.TestPlugins;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
@ -61,7 +62,9 @@ public class TransformationStageTest {
TransformationStage<SourceRecord> stage = new TransformationStage<>(
predicatePlugin,
negate,
transformationPlugin);
transformationPlugin,
TestPlugins.noOpLoaderSwap()
);
assertEquals(expectedResult, stage.apply(initial));

View File

@ -48,6 +48,7 @@ import org.apache.kafka.connect.runtime.errors.ProcessingContext;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.isolation.TestPlugins;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
@ -228,7 +229,7 @@ public class WorkerSinkTaskTest {
taskId, task, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, connectMetrics,
keyConverterPlugin, valueConverterPlugin, errorMetrics, headerConverterPlugin,
transformationChain, consumer, loader, time,
retryWithToleranceOperator, null, statusBackingStore, errorReportersSupplier);
retryWithToleranceOperator, null, statusBackingStore, errorReportersSupplier, TestPlugins.noOpLoaderSwap());
}
@AfterEach

View File

@ -35,6 +35,7 @@ import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.ProcessingContext;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.isolation.TestPlugins;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
@ -182,7 +183,7 @@ public class WorkerSinkTaskThreadedTest {
taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics, keyConverterPlugin,
valueConverterPlugin, errorHandlingMetrics, headerConverterPlugin, transformationChain,
consumer, pluginLoader, time, RetryWithToleranceOperatorTest.noneOperator(), null, statusBackingStore,
Collections::emptyList);
Collections::emptyList, TestPlugins.noOpLoaderSwap());
recordsReturned = 0;
}

View File

@ -38,6 +38,7 @@ import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.isolation.TestPlugins;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
@ -254,7 +255,7 @@ public class WorkerSourceTaskTest {
workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverterPlugin, valueConverterPlugin, errorHandlingMetrics, headerConverterPlugin,
transformationChain, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig),
offsetReader, offsetWriter, offsetStore, config, clusterConfigState, metrics, plugins.delegatingLoader(), Time.SYSTEM,
retryWithToleranceOperator, statusBackingStore, Runnable::run, Collections::emptyList);
retryWithToleranceOperator, statusBackingStore, Runnable::run, Collections::emptyList, TestPlugins.noOpLoaderSwap());
}
@ParameterizedTest

View File

@ -24,6 +24,7 @@ import org.apache.kafka.connect.runtime.WorkerTask.TaskMetricsGroup;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.ErrorReporter;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.isolation.TestPlugins;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.StatusBackingStore;
@ -299,7 +300,7 @@ public class WorkerTaskTest {
Supplier<List<ErrorReporter<Object>>> errorReporterSupplier,
Time time, StatusBackingStore statusBackingStore) {
super(id, statusListener, initialState, loader, connectMetrics, errorHandlingMetrics,
retryWithToleranceOperator, transformationChain, errorReporterSupplier, time, statusBackingStore);
retryWithToleranceOperator, transformationChain, errorReporterSupplier, time, statusBackingStore, TestPlugins.noOpLoaderSwap());
}
@Override

View File

@ -89,6 +89,7 @@ import org.apache.kafka.connect.util.FutureCallback;
import org.apache.kafka.connect.util.SinkUtils;
import org.apache.kafka.connect.util.TopicAdmin;
import org.apache.maven.artifact.versioning.VersionRange;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@ -322,7 +323,7 @@ public class WorkerTest {
// Create
mockKafkaClusterId();
mockConnectorIsolation(connectorClass, sourceConnector);
mockVersionedConnectorIsolation(connectorClass, null, sourceConnector);
mockExecutorRealSubmit(WorkerConnector.class);
worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
@ -358,7 +359,7 @@ public class WorkerTest {
verifyKafkaClusterId();
verifyConnectorIsolation(sourceConnector);
verifyVersionedConnectorIsolation(connectorClass, null, sourceConnector);
verifyExecutorSubmit();
verify(sourceConnector).initialize(any(ConnectorContext.class));
verify(sourceConnector).start(connectorProps);
@ -389,7 +390,8 @@ public class WorkerTest {
mockKafkaClusterId();
mockGenericIsolation();
when(plugins.newConnector(anyString())).thenThrow(exception);
when(plugins.pluginLoader(nonConnectorClass, null)).thenReturn(pluginLoader);
when(plugins.newConnector(nonConnectorClass, null)).thenThrow(exception);
worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
worker.herder = herder;
@ -414,7 +416,7 @@ public class WorkerTest {
assertStatistics(worker, 0, 0);
assertStartupStatistics(worker, 1, 1, 0, 0);
verify(plugins).newConnector(anyString());
verify(plugins).newConnector(nonConnectorClass, null);
verifyKafkaClusterId();
verifyGenericIsolation();
verify(connectorStatusListener).onFailure(eq(CONNECTOR_ID), any(ConnectException.class));
@ -426,7 +428,7 @@ public class WorkerTest {
setup(enableTopicCreation);
final String connectorAlias = "SampleSourceConnector";
mockKafkaClusterId();
mockConnectorIsolation(connectorAlias, sinkConnector);
mockVersionedConnectorIsolation(connectorAlias, null, sinkConnector);
mockExecutorRealSubmit(WorkerConnector.class);
connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorAlias);
@ -456,7 +458,7 @@ public class WorkerTest {
assertStartupStatistics(worker, 1, 0, 0, 0);
verifyKafkaClusterId();
verifyConnectorIsolation(sinkConnector);
verifyVersionedConnectorIsolation(connectorAlias, null, sinkConnector);
verifyExecutorSubmit();
verify(sinkConnector).initialize(any(ConnectorContext.class));
verify(sinkConnector).start(connectorProps);
@ -472,7 +474,7 @@ public class WorkerTest {
final String shortConnectorAlias = "WorkerTest";
mockKafkaClusterId();
mockConnectorIsolation(shortConnectorAlias, sinkConnector);
mockVersionedConnectorIsolation(shortConnectorAlias, null, sinkConnector);
mockExecutorRealSubmit(WorkerConnector.class);
connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, shortConnectorAlias);
@ -499,7 +501,7 @@ public class WorkerTest {
assertStatistics(worker, 0, 0);
verifyKafkaClusterId();
verifyConnectorIsolation(sinkConnector);
verifyVersionedConnectorIsolation(shortConnectorAlias, null, sinkConnector);
verify(sinkConnector).initialize(any(ConnectorContext.class));
verify(sinkConnector).start(connectorProps);
verify(connectorStatusListener).onStartup(CONNECTOR_ID);
@ -531,7 +533,7 @@ public class WorkerTest {
final String connectorClass = SampleSourceConnector.class.getName();
mockKafkaClusterId();
mockConnectorIsolation(connectorClass, sinkConnector);
mockVersionedConnectorIsolation(connectorClass, null, sinkConnector);
mockExecutorRealSubmit(WorkerConnector.class);
Map<String, String> taskProps = Collections.singletonMap("foo", "bar");
@ -584,7 +586,7 @@ public class WorkerTest {
assertStatistics(worker, 0, 0);
verifyKafkaClusterId();
verifyConnectorIsolation(sinkConnector);
verifyVersionedConnectorIsolation(connectorClass, null, sinkConnector);
verifyExecutorSubmit();
verify(sinkConnector).initialize(any(ConnectorContext.class));
verify(sinkConnector).start(connectorProps);
@ -601,10 +603,10 @@ public class WorkerTest {
public void testAddRemoveSourceTask(boolean enableTopicCreation) {
setup(enableTopicCreation);
mockKafkaClusterId();
mockTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, task);
mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter);
mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter);
mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, taskHeaderConverter);
mockVersionedTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, null, sourceConnector, task);
mockVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG, taskKeyConverter);
mockVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG, taskValueConverter);
mockVersionedTaskHeaderConverterFromConnector(taskHeaderConverter);
mockExecutorFakeSubmit(WorkerTask.class);
Map<String, String> origProps = Collections.singletonMap(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
@ -642,10 +644,10 @@ public class WorkerTest {
assertStatistics(worker, 0, 0);
verifyKafkaClusterId();
verifyTaskIsolation(task);
verifyTaskConverter(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG);
verifyTaskConverter(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG);
verifyTaskHeaderConverter();
verifyVersionedTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, null, task);
verifyVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG);
verifyVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG);
verifyVersionedTaskHeaderConverterFromConnector();
verifyExecutorSubmit();
}
@ -657,10 +659,10 @@ public class WorkerTest {
// Most of the other cases use source tasks; we make sure to get code coverage for sink tasks here as well
SinkTask task = mock(TestSinkTask.class);
mockKafkaClusterId();
mockTaskIsolation(SampleSinkConnector.class, TestSinkTask.class, task);
mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter);
mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter);
mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, taskHeaderConverter);
mockVersionedTaskIsolation(SampleSinkConnector.class, TestSinkTask.class, null, sinkConnector, task);
mockVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG, taskKeyConverter);
mockVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG, taskValueConverter);
mockVersionedTaskHeaderConverterFromConnector(taskHeaderConverter);
mockExecutorFakeSubmit(WorkerTask.class);
Map<String, String> origProps = Collections.singletonMap(TaskConfig.TASK_CLASS_CONFIG, TestSinkTask.class.getName());
@ -700,10 +702,10 @@ public class WorkerTest {
assertStatistics(worker, 0, 0);
verifyKafkaClusterId();
verifyTaskIsolation(task);
verifyTaskConverter(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG);
verifyTaskConverter(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG);
verifyTaskHeaderConverter();
verifyVersionedTaskIsolation(SampleSinkConnector.class, TestSinkTask.class, null, task);
verifyVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG);
verifyVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG);
verifyVersionedTaskHeaderConverterFromConnector();
verifyExecutorSubmit();
}
@ -729,10 +731,10 @@ public class WorkerTest {
config = new DistributedConfig(workerProps);
mockKafkaClusterId();
mockTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, task);
mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter);
mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter);
mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, taskHeaderConverter);
mockVersionedTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, null, sourceConnector, task);
mockVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG, taskKeyConverter);
mockVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG, taskValueConverter);
mockVersionedTaskHeaderConverterFromConnector(taskHeaderConverter);
mockExecutorFakeSubmit(WorkerTask.class);
Runnable preProducer = mock(Runnable.class);
@ -774,10 +776,10 @@ public class WorkerTest {
assertStatistics(worker, 0, 0);
verifyKafkaClusterId();
verifyTaskIsolation(task);
verifyTaskConverter(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG);
verifyTaskConverter(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG);
verifyTaskHeaderConverter();
verifyVersionedTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, null, task);
verifyVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG);
verifyVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG);
verifyVersionedTaskHeaderConverterFromConnector();
verifyExecutorSubmit();
}
@ -794,11 +796,10 @@ public class WorkerTest {
TaskConfig taskConfig = new TaskConfig(origProps);
mockKafkaClusterId();
mockTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, task);
// Expect that the worker will create converters and will find them using the current classloader ...
mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter);
mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter);
mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, taskHeaderConverter);
mockVersionedTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, null, sourceConnector, task);
mockVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG, taskKeyConverter);
mockVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG, taskValueConverter);
mockVersionedTaskHeaderConverterFromConnector(taskHeaderConverter);
mockExecutorFakeSubmit(WorkerTask.class);
@ -851,7 +852,7 @@ public class WorkerTest {
verify(instantiatedTask).initialize(taskConfig);
verify(herder, times(5)).taskStatus(TASK_ID);
verifyKafkaClusterId();
verifyTaskIsolation(task);
verifyVersionedTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, null, task);
verifyExecutorSubmit();
verify(instantiatedTask, atLeastOnce()).id();
verify(instantiatedTask).awaitStop(anyLong());
@ -860,9 +861,9 @@ public class WorkerTest {
// Called when we stop the worker
verify(instantiatedTask).loader();
verify(instantiatedTask).stop();
verifyTaskConverter(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG);
verifyTaskConverter(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG);
verifyTaskHeaderConverter();
verifyVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG);
verifyVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG);
verifyVersionedTaskHeaderConverterFromConnector();
}
@ParameterizedTest
@ -907,6 +908,7 @@ public class WorkerTest {
mockKafkaClusterId();
mockGenericIsolation();
when(plugins.pluginLoader(SampleSourceConnector.class.getName(), null)).thenReturn(pluginLoader);
worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
worker.herder = herder;
@ -935,14 +937,14 @@ public class WorkerTest {
mockFileConfigProvider();
mockKafkaClusterId();
mockTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, task);
mockVersionedTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, null, sourceConnector, task);
// Expect that the worker will create converters and will not initially find them using the current classloader ...
mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, null);
mockTaskConverter(ClassLoaderUsage.PLUGINS, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter);
mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, null);
mockTaskConverter(ClassLoaderUsage.PLUGINS, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter);
mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, null);
mockTaskHeaderConverter(ClassLoaderUsage.PLUGINS, taskHeaderConverter);
mockVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG, null);
mockVersionedTaskConverterFromWorker(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, WorkerConfig.KEY_CONVERTER_VERSION, taskKeyConverter);
mockVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG, null);
mockVersionedTaskConverterFromWorker(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, WorkerConfig.VALUE_CONVERTER_VERSION, taskValueConverter);
mockVersionedTaskHeaderConverterFromConnector(null);
mockVersionedTaskHeaderConverterFromWorker(taskHeaderConverter);
mockExecutorFakeSubmit(WorkerTask.class);
Map<String, String> origProps = Collections.singletonMap(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
@ -968,7 +970,13 @@ public class WorkerTest {
verify(constructedMockTask).awaitStop(anyLong());
verify(constructedMockTask).removeMetrics();
verifyKafkaClusterId();
verifyTaskIsolation(task);
verifyVersionedTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, null, task);
verifyVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG);
verifyVersionedTaskConverterFromWorker(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, WorkerConfig.KEY_CONVERTER_VERSION);
verifyVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG);
verifyVersionedTaskConverterFromWorker(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, WorkerConfig.VALUE_CONVERTER_VERSION);
verifyVersionedTaskHeaderConverterFromConnector();
verifyVersionedTaskHeaderConverterFromWorker();
verifyConverters();
verifyExecutorSubmit();
}
@ -985,14 +993,14 @@ public class WorkerTest {
TaskConfig taskConfig = new TaskConfig(origProps);
mockKafkaClusterId();
mockTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, task);
mockVersionedTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, null, sourceConnector, task);
// Expect that the worker will create converters and will not initially find them using the current classloader ...
mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, null);
mockTaskConverter(ClassLoaderUsage.PLUGINS, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter);
mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, null);
mockTaskConverter(ClassLoaderUsage.PLUGINS, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter);
mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, null);
mockTaskHeaderConverter(ClassLoaderUsage.PLUGINS, taskHeaderConverter);
mockVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG, null);
mockVersionedTaskConverterFromWorker(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, WorkerConfig.KEY_CONVERTER_VERSION, taskKeyConverter);
mockVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG, null);
mockVersionedTaskConverterFromWorker(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, WorkerConfig.VALUE_CONVERTER_VERSION, taskValueConverter);
mockVersionedTaskHeaderConverterFromConnector(null);
mockVersionedTaskHeaderConverterFromWorker(taskHeaderConverter);
mockExecutorFakeSubmit(WorkerTask.class);
worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService,
@ -1024,7 +1032,13 @@ public class WorkerTest {
verify(instantiatedTask).removeMetrics();
verifyKafkaClusterId();
verifyTaskIsolation(task);
verifyVersionedTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, null, task);
verifyVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG);
verifyVersionedTaskConverterFromWorker(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, WorkerConfig.KEY_CONVERTER_VERSION);
verifyVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG);
verifyVersionedTaskConverterFromWorker(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, WorkerConfig.VALUE_CONVERTER_VERSION);
verifyVersionedTaskHeaderConverterFromConnector();
verifyVersionedTaskHeaderConverterFromWorker();
verifyExecutorSubmit();
verifyStorage();
}
@ -1860,7 +1874,7 @@ public class WorkerTest {
@ParameterizedTest
@ValueSource(booleans = {true, false})
@SuppressWarnings("unchecked")
@SuppressWarnings({"unchecked", "rawtypes"})
public void testZombieFencing(boolean enableTopicCreation) {
setup(enableTopicCreation);
Admin admin = mock(Admin.class);
@ -1878,6 +1892,8 @@ public class WorkerTest {
mockKafkaClusterId();
mockGenericIsolation();
when(plugins.connectorClass(anyString(), any())).thenReturn((Class) sourceConnector.getClass());
when(plugins.pluginLoader(SampleSourceConnector.class.getName(), null)).thenReturn(pluginLoader);
worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService,
allConnectorClientConfigOverridePolicy, mockAdminConstructor);
@ -2087,7 +2103,8 @@ public class WorkerTest {
worker.start();
mockGenericIsolation();
when(plugins.newConnector(anyString())).thenReturn(sourceConnector);
when(plugins.newConnector(anyString(), any())).thenReturn(sourceConnector);
when(plugins.pluginLoader(SampleSourceConnector.class.getName(), null)).thenReturn(pluginLoader);
when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
when(sourceConnector.alterOffsets(eq(connectorProps), anyMap())).thenThrow(new UnsupportedOperationException("This connector doesn't " +
"support altering of offsets"));
@ -2679,7 +2696,7 @@ public class WorkerTest {
String connectorClass = SampleSourceConnector.class.getName();
connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorClass);
connectorProps.put(TASKS_MAX_ENFORCE_CONFIG, Boolean.toString(enforced));
mockConnectorIsolation(connectorClass, sourceConnector);
mockVersionedConnectorIsolation(connectorClass, null, sourceConnector);
mockExecutorRealSubmit(WorkerConnector.class);
@ -2851,10 +2868,10 @@ public class WorkerTest {
tasksMaxExceededMessage = failureCaptor.getValue().getMessage();
} else {
mockTaskIsolation(SampleSinkConnector.class, TestSinkTask.class, task);
mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter);
mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter);
mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, taskHeaderConverter);
mockVersionedTaskIsolation(SampleSinkConnector.class, TestSinkTask.class, null, sinkConnector, task);
mockVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG, taskKeyConverter);
mockVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG, taskValueConverter);
mockVersionedTaskHeaderConverterFromConnector(taskHeaderConverter);
mockExecutorFakeSubmit(WorkerTask.class);
assertTrue(worker.startSinkTask(
@ -2964,8 +2981,20 @@ public class WorkerTest {
.thenReturn(returning);
}
private void verifyTaskConverter(String converterClassConfig) {
verify(plugins).newConverter(any(AbstractConfig.class), eq(converterClassConfig), eq(ClassLoaderUsage.CURRENT_CLASSLOADER));
private void mockVersionedTaskConverterFromConnector(String converterClassConfig, String converterVersionConfig, Converter returning) {
when(plugins.newConverter(any(ConnectorConfig.class), eq(converterClassConfig), eq(converterVersionConfig))).thenReturn(returning);
}
private void verifyVersionedTaskConverterFromConnector(String converterClassConfig, String converterVersionConfig) {
verify(plugins).newConverter(any(ConnectorConfig.class), eq(converterClassConfig), eq(converterVersionConfig));
}
private void mockVersionedTaskConverterFromWorker(String converterClassConfig, String converterVersionConfig, Converter returning) {
when(plugins.newConverter(any(WorkerConfig.class), eq(converterClassConfig), eq(converterVersionConfig))).thenReturn(returning);
}
private void verifyVersionedTaskConverterFromWorker(String converterClassConfig, String converterVersionConfig) {
verify(plugins).newConverter(any(WorkerConfig.class), eq(converterClassConfig), eq(converterVersionConfig));
}
private void mockTaskHeaderConverter(ClassLoaderUsage classLoaderUsage, HeaderConverter returning) {
@ -2977,8 +3006,25 @@ public class WorkerTest {
verify(plugins).newHeaderConverter(any(AbstractConfig.class), eq(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG), eq(ClassLoaderUsage.CURRENT_CLASSLOADER));
}
private void mockVersionedTaskHeaderConverterFromConnector(HeaderConverter returning) {
when(plugins.newHeaderConverter(any(ConnectorConfig.class), eq(ConnectorConfig.HEADER_CONVERTER_CLASS_CONFIG), eq(ConnectorConfig.HEADER_CONVERTER_VERSION_CONFIG)))
.thenReturn(returning);
}
private void verifyVersionedTaskHeaderConverterFromConnector() {
verify(plugins).newHeaderConverter(any(ConnectorConfig.class), eq(ConnectorConfig.HEADER_CONVERTER_CLASS_CONFIG), eq(ConnectorConfig.HEADER_CONVERTER_VERSION_CONFIG));
}
private void mockVersionedTaskHeaderConverterFromWorker(HeaderConverter returning) {
when(plugins.newHeaderConverter(any(WorkerConfig.class), eq(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG), eq(WorkerConfig.HEADER_CONVERTER_VERSION)))
.thenReturn(returning);
}
private void verifyVersionedTaskHeaderConverterFromWorker() {
verify(plugins).newHeaderConverter(any(WorkerConfig.class), eq(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG), eq(WorkerConfig.HEADER_CONVERTER_VERSION));
}
private void mockGenericIsolation() {
when(plugins.connectorLoader(anyString())).thenReturn(pluginLoader);
when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
}
@ -2993,12 +3039,26 @@ public class WorkerTest {
when(connector.version()).thenReturn("1.0");
}
private void mockVersionedConnectorIsolation(String connectorClass, VersionRange range, Connector connector) {
mockGenericIsolation();
when(plugins.pluginLoader(connectorClass, range)).thenReturn(pluginLoader);
when(plugins.newConnector(connectorClass, range)).thenReturn(connector);
when(connector.version()).thenReturn(range == null ? "unknown" : range.toString());
}
private void verifyConnectorIsolation(Connector connector) {
verifyGenericIsolation();
verify(plugins).newConnector(anyString());
verify(connector, atLeastOnce()).version();
}
private void verifyVersionedConnectorIsolation(String connectorClass, VersionRange range, Connector connector) {
verifyGenericIsolation();
verify(plugins).pluginLoader(connectorClass, range);
verify(plugins).newConnector(connectorClass, range);
verify(connector, atLeastOnce()).version();
}
private void mockTaskIsolation(Class<? extends Connector> connector, Class<? extends Task> taskClass, Task task) {
mockGenericIsolation();
doReturn(connector).when(plugins).connectorClass(connector.getName());
@ -3006,6 +3066,15 @@ public class WorkerTest {
when(task.version()).thenReturn("1.0");
}
@SuppressWarnings({"unchecked", "rawtypes"})
private void mockVersionedTaskIsolation(Class<? extends Connector> connectorClass, Class<? extends Task> taskClass, VersionRange range, Connector connector, Task task) {
mockGenericIsolation();
when(plugins.pluginLoader(connectorClass.getName(), range)).thenReturn(pluginLoader);
when(plugins.connectorClass(connectorClass.getName(), range)).thenReturn((Class) connectorClass);
when(plugins.newTask(taskClass)).thenReturn(task);
when(task.version()).thenReturn(range == null ? "unknown" : range.toString());
}
private void verifyTaskIsolation(Task task) {
verifyGenericIsolation();
verify(plugins).connectorClass(anyString());
@ -3013,6 +3082,14 @@ public class WorkerTest {
verify(task).version();
}
private void verifyVersionedTaskIsolation(Class<? extends Connector> connectorClass, Class<? extends Task> taskClass, VersionRange range, Task task) {
verifyGenericIsolation();
verify(plugins).pluginLoader(connectorClass.getName(), range);
verify(plugins).connectorClass(connectorClass.getName(), range);
verify(plugins).newTask(taskClass);
verify(task).version();
}
private void mockExecutorRealSubmit(Class<? extends Runnable> runnableClass) {
// This test expects the runnable to be executed, so have the isolated runnable pass-through.
// Requires using the Worker constructor without the mocked executorService

View File

@ -20,6 +20,7 @@ import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.runtime.distributed.ExtendedAssignment;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.isolation.TestPlugins;
import org.apache.kafka.connect.storage.AppliedConnectorConfig;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.transforms.Transformation;
@ -195,7 +196,8 @@ public class WorkerTestUtils {
TransformationStage<R> stage = new TransformationStage<>(
predicatePlugin,
false,
transformationPlugin);
transformationPlugin,
TestPlugins.noOpLoaderSwap());
TransformationChain<T, R> realTransformationChainRetriableException = new TransformationChain(List.of(stage), toleranceOperator);
TransformationChain<T, R> transformationChainRetriableException = Mockito.spy(realTransformationChainRetriableException);
return transformationChainRetriableException;

View File

@ -69,7 +69,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -197,16 +196,9 @@ public class PluginsTest {
props.remove(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG);
createConfig();
// Because it's not explicitly set on the supplied configuration, the logic to use the current classloader for the connector
// will exit immediately, and so this method always returns null
HeaderConverter headerConverter = plugins.newHeaderConverter(config,
WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
ClassLoaderUsage.CURRENT_CLASSLOADER);
assertNull(headerConverter);
// But we should always find it (or the worker's default) when using the plugins classloader ...
headerConverter = plugins.newHeaderConverter(config,
WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
ClassLoaderUsage.PLUGINS);
ClassLoaderUsage.CURRENT_CLASSLOADER);
assertNotNull(headerConverter);
assertInstanceOf(SimpleHeaderConverter.class, headerConverter);
}

View File

@ -39,6 +39,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.jar.Attributes;
import java.util.jar.JarEntry;
@ -374,6 +375,12 @@ public class TestPlugins {
.collect(Collectors.toList());
}
public static Function<ClassLoader, LoaderSwap> noOpLoaderSwap() {
return classLoader -> {
return new LoaderSwap(Thread.currentThread().getContextClassLoader());
};
}
private static TestPlugin[] defaultPlugins() {
return Arrays.stream(TestPlugin.values())
.filter(TestPlugin::includeByDefault)

View File

@ -519,7 +519,7 @@ public class TopicCreationTest {
topicCreation.addTopic(FOO_TOPIC);
assertFalse(topicCreation.isTopicCreationRequired(FOO_TOPIC));
List<TransformationStage<SourceRecord>> transformationStages = sourceConfig.transformationStages(CONNECTOR_TASK_ID, METRICS);
List<TransformationStage<SourceRecord>> transformationStages = sourceConfig.transformationStages(MOCK_PLUGINS, CONNECTOR_TASK_ID, METRICS);
assertEquals(1, transformationStages.size());
TransformationStage<SourceRecord> xform = transformationStages.get(0);
SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, null, Schema.INT8_SCHEMA, 42));
@ -626,7 +626,7 @@ public class TopicCreationTest {
assertEquals(barPartitions, barTopicSpec.numPartitions());
assertEquals(barTopicProps, barTopicSpec.configs());
List<TransformationStage<SourceRecord>> transformationStages = sourceConfig.transformationStages(CONNECTOR_TASK_ID, METRICS);
List<TransformationStage<SourceRecord>> transformationStages = sourceConfig.transformationStages(MOCK_PLUGINS, CONNECTOR_TASK_ID, METRICS);
assertEquals(2, transformationStages.size());
TransformationStage<SourceRecord> castXForm = transformationStages.get(0);