From 9dc9973c1ca53218449f311e21478848f2d1fa92 Mon Sep 17 00:00:00 2001 From: snehashisp Date: Thu, 27 Feb 2025 04:42:34 +0530 Subject: [PATCH] KAFKA-18863: Connect Multiversion Support (Versioned Connector Creation and related changes) (#17743) Reviewers: Greg Harris --- .../runtime/AbstractWorkerSourceTask.java | 28 ++- .../connect/runtime/ConnectorConfig.java | 26 ++- .../runtime/ExactlyOnceWorkerSourceTask.java | 7 +- .../connect/runtime/TransformationStage.java | 25 +- .../apache/kafka/connect/runtime/Worker.java | 206 ++++++++++------- .../kafka/connect/runtime/WorkerSinkTask.java | 27 ++- .../connect/runtime/WorkerSourceTask.java | 7 +- .../kafka/connect/runtime/WorkerTask.java | 7 +- .../connect/runtime/isolation/Plugins.java | 4 +- .../runtime/AbstractWorkerSourceTaskTest.java | 3 +- .../connect/runtime/ConnectorConfigTest.java | 6 +- .../runtime/ErrorHandlingTaskTest.java | 9 +- .../ExactlyOnceWorkerSourceTaskTest.java | 3 +- .../runtime/TransformationStageTest.java | 5 +- .../connect/runtime/WorkerSinkTaskTest.java | 3 +- .../runtime/WorkerSinkTaskThreadedTest.java | 3 +- .../connect/runtime/WorkerSourceTaskTest.java | 3 +- .../kafka/connect/runtime/WorkerTaskTest.java | 3 +- .../kafka/connect/runtime/WorkerTest.java | 215 ++++++++++++------ .../connect/runtime/WorkerTestUtils.java | 4 +- .../runtime/isolation/PluginsTest.java | 10 +- .../runtime/isolation/TestPlugins.java | 7 + .../kafka/connect/util/TopicCreationTest.java | 4 +- 23 files changed, 403 insertions(+), 212 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java index d1675d5455a..683eb3abed0 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java @@ -43,6 +43,7 @@ import org.apache.kafka.connect.runtime.errors.ProcessingContext; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.runtime.errors.Stage; import org.apache.kafka.connect.runtime.errors.ToleranceType; +import org.apache.kafka.connect.runtime.isolation.LoaderSwap; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; import org.apache.kafka.connect.source.SourceTaskContext; @@ -70,6 +71,7 @@ import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.function.Supplier; import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG; @@ -233,11 +235,12 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask retryWithToleranceOperator, StatusBackingStore statusBackingStore, Executor closeExecutor, - Supplier>> errorReportersSupplier) { + Supplier>> errorReportersSupplier, + Function pluginLoaderSwapper) { super(id, statusListener, initialState, loader, connectMetrics, errorMetrics, retryWithToleranceOperator, transformationChain, errorReportersSupplier, - time, statusBackingStore); + time, statusBackingStore, pluginLoaderSwapper); this.workerConfig = workerConfig; this.task = task; @@ -491,11 +494,17 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask convertHeaderFor(record), Stage.HEADER_CONVERTER, headerConverterPlugin.get().getClass()); - byte[] key = retryWithToleranceOperator.execute(context, () -> keyConverterPlugin.get().fromConnectData(record.topic(), headers, record.keySchema(), record.key()), - Stage.KEY_CONVERTER, keyConverterPlugin.get().getClass()); + byte[] key = retryWithToleranceOperator.execute(context, () -> { + try (LoaderSwap swap = pluginLoaderSwapper.apply(keyConverterPlugin.get().getClass().getClassLoader())) { + return keyConverterPlugin.get().fromConnectData(record.topic(), headers, record.keySchema(), record.key()); + } + }, Stage.KEY_CONVERTER, keyConverterPlugin.get().getClass()); - byte[] value = retryWithToleranceOperator.execute(context, () -> valueConverterPlugin.get().fromConnectData(record.topic(), headers, record.valueSchema(), record.value()), - Stage.VALUE_CONVERTER, valueConverterPlugin.get().getClass()); + byte[] value = retryWithToleranceOperator.execute(context, () -> { + try (LoaderSwap swap = pluginLoaderSwapper.apply(valueConverterPlugin.get().getClass().getClassLoader())) { + return valueConverterPlugin.get().fromConnectData(record.topic(), headers, record.valueSchema(), record.value()); + } + }, Stage.VALUE_CONVERTER, valueConverterPlugin.get().getClass()); if (context.failed()) { return null; @@ -551,8 +560,11 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask> List> transformationStages(ConnectorTaskId connectorTaskId, ConnectMetrics metrics) { + public > List> transformationStages(Plugins plugins, ConnectorTaskId connectorTaskId, ConnectMetrics metrics) { final List transformAliases = getList(TRANSFORMS_CONFIG); final List> transformations = new ArrayList<>(transformAliases.size()); @@ -370,8 +370,9 @@ public class ConnectorConfig extends AbstractConfig { final String prefix = TRANSFORMS_CONFIG + "." + alias + "."; try { - @SuppressWarnings("unchecked") - final Transformation transformation = Utils.newInstance(getClass(prefix + "type"), Transformation.class); + final String typeConfig = prefix + "type"; + final String versionConfig = prefix + WorkerConfig.PLUGIN_VERSION_SUFFIX; + @SuppressWarnings("unchecked") final Transformation transformation = getTransformationOrPredicate(plugins, typeConfig, versionConfig); Map configs = originalsWithPrefix(prefix); Object predicateAlias = configs.remove(TransformationStage.PREDICATE_CONFIG); Object negate = configs.remove(TransformationStage.NEGATE_CONFIG); @@ -379,13 +380,15 @@ public class ConnectorConfig extends AbstractConfig { Plugin> transformationPlugin = metrics.wrap(transformation, connectorTaskId, alias); if (predicateAlias != null) { String predicatePrefix = PREDICATES_PREFIX + predicateAlias + "."; + final String predicateTypeConfig = predicatePrefix + "type"; + final String predicateVersionConfig = predicatePrefix + WorkerConfig.PLUGIN_VERSION_SUFFIX; @SuppressWarnings("unchecked") - Predicate predicate = Utils.newInstance(getClass(predicatePrefix + "type"), Predicate.class); + Predicate predicate = getTransformationOrPredicate(plugins, predicateTypeConfig, predicateVersionConfig); predicate.configure(originalsWithPrefix(predicatePrefix)); Plugin> predicatePlugin = metrics.wrap(predicate, connectorTaskId, (String) predicateAlias); - transformations.add(new TransformationStage<>(predicatePlugin, negate != null && Boolean.parseBoolean(negate.toString()), transformationPlugin)); + transformations.add(new TransformationStage<>(predicatePlugin, negate != null && Boolean.parseBoolean(negate.toString()), transformationPlugin, plugins.safeLoaderSwapper())); } else { - transformations.add(new TransformationStage<>(transformationPlugin)); + transformations.add(new TransformationStage<>(transformationPlugin, plugins.safeLoaderSwapper())); } } catch (Exception e) { throw new ConnectException(e); @@ -395,6 +398,17 @@ public class ConnectorConfig extends AbstractConfig { return transformations; } + @SuppressWarnings("unchecked") + private T getTransformationOrPredicate(Plugins plugins, String classConfig, String versionConfig) { + try { + VersionRange range = PluginUtils.connectorVersionRequirement(getString(versionConfig)); + VersionRange connectorRange = PluginUtils.connectorVersionRequirement(getString(CONNECTOR_VERSION)); + return (T) plugins.newPlugin(getClass(classConfig).getName(), range, plugins.pluginLoader(getString(CONNECTOR_CLASS_CONFIG), connectorRange)); + } catch (Exception e) { + throw new ConnectException(e); + } + } + /** * Returns an enriched {@link ConfigDef} building upon the {@code ConfigDef}, using the current configuration specified in {@code props} as an input. *

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java index 706a3e4b9d5..fafbdbbc3f4 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java @@ -32,6 +32,7 @@ import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics; import org.apache.kafka.connect.runtime.errors.ErrorReporter; import org.apache.kafka.connect.runtime.errors.ProcessingContext; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; +import org.apache.kafka.connect.runtime.isolation.LoaderSwap; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; import org.apache.kafka.connect.source.SourceTask.TransactionBoundary; @@ -57,6 +58,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.function.Supplier; @@ -101,11 +103,12 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask { Executor closeExecutor, Runnable preProducerCheck, Runnable postProducerCheck, - Supplier>> errorReportersSupplier) { + Supplier>> errorReportersSupplier, + Function pluginLoaderSwapper) { super(id, task, statusListener, initialState, configState, keyConverterPlugin, valueConverterPlugin, headerConverterPlugin, transformationChain, buildTransactionContext(sourceConfig), producer, admin, topicGroups, offsetReader, offsetWriter, offsetStore, workerConfig, connectMetrics, errorMetrics, - loader, time, retryWithToleranceOperator, statusBackingStore, closeExecutor, errorReportersSupplier); + loader, time, retryWithToleranceOperator, statusBackingStore, closeExecutor, errorReportersSupplier, pluginLoaderSwapper); this.transactionOpen = false; this.committableRecords = new LinkedHashMap<>(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java index 90b3559866a..a86c4878ab3 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java @@ -20,9 +20,12 @@ package org.apache.kafka.connect.runtime; import org.apache.kafka.common.internals.Plugin; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.runtime.isolation.LoaderSwap; import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.predicates.Predicate; +import java.util.function.Function; + /** * Wrapper for a {@link Transformation} and corresponding optional {@link Predicate} * which applies the transformation when the {@link Predicate} is true (or false, according to {@code negate}). @@ -36,15 +39,18 @@ public class TransformationStage> implements AutoClos private final Plugin> predicatePlugin; private final Plugin> transformationPlugin; private final boolean negate; + private final Function pluginLoaderSwapper; - TransformationStage(Plugin> transformationPlugin) { - this(null, false, transformationPlugin); + + TransformationStage(Plugin> transformationPlugin, Function pluginLoaderSwapper) { + this(null, false, transformationPlugin, pluginLoaderSwapper); } - TransformationStage(Plugin> predicatePlugin, boolean negate, Plugin> transformationPlugin) { + TransformationStage(Plugin> predicatePlugin, boolean negate, Plugin> transformationPlugin, Function pluginLoaderSwapper) { this.predicatePlugin = predicatePlugin; this.negate = negate; this.transformationPlugin = transformationPlugin; + this.pluginLoaderSwapper = pluginLoaderSwapper; } public Class> transformClass() { @@ -54,8 +60,17 @@ public class TransformationStage> implements AutoClos } public R apply(R record) { - if (predicatePlugin == null || predicatePlugin.get() == null || negate ^ predicatePlugin.get().test(record)) { - return transformationPlugin.get().apply(record); + Predicate predicate = predicatePlugin != null ? predicatePlugin.get() : null; + boolean shouldTransform = predicate == null; + if (predicate != null) { + try (LoaderSwap swap = pluginLoaderSwapper.apply(predicate.getClass().getClassLoader())) { + shouldTransform = negate ^ predicate.test(record); + } + } + if (shouldTransform) { + try (LoaderSwap swap = pluginLoaderSwapper.apply(transformationPlugin.get().getClass().getClassLoader())) { + record = transformationPlugin.get().apply(record); + } } return record; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index e01a1adef45..a435d281a7c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -67,8 +67,10 @@ import org.apache.kafka.connect.runtime.errors.LogReporter; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter; import org.apache.kafka.connect.runtime.isolation.LoaderSwap; +import org.apache.kafka.connect.runtime.isolation.PluginUtils; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage; +import org.apache.kafka.connect.runtime.isolation.VersionedPluginLoadingException; import org.apache.kafka.connect.runtime.rest.RestServer; import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset; import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets; @@ -99,6 +101,7 @@ import org.apache.kafka.connect.util.SinkUtils; import org.apache.kafka.connect.util.TopicAdmin; import org.apache.kafka.connect.util.TopicCreationGroup; +import org.apache.maven.artifact.versioning.InvalidVersionSpecificationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -314,32 +317,38 @@ public final class Worker { final WorkerConnector workerConnector; final String connClass = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); - ClassLoader connectorLoader = plugins.connectorLoader(connClass); - try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) { - log.info("Creating connector {} of type {}", connName, connClass); - final Connector connector = plugins.newConnector(connClass); - final ConnectorConfig connConfig; - final CloseableOffsetStorageReader offsetReader; - final ConnectorOffsetBackingStore offsetStore; - if (ConnectUtils.isSinkConnector(connector)) { - connConfig = new SinkConnectorConfig(plugins, connProps); - offsetReader = null; - offsetStore = null; - } else { - SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connProps, config.topicCreationEnable()); - connConfig = sourceConfig; + final ClassLoader connectorLoader; - // Set up the offset backing store for this connector instance - offsetStore = config.exactlyOnceSourceEnabled() + try { + connectorLoader = connectorClassLoader(connProps); + try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) { + log.info("Creating connector {} of type {}", connName, connClass); + final Connector connector = instantiateConnector(connProps); + + final ConnectorConfig connConfig; + final CloseableOffsetStorageReader offsetReader; + final ConnectorOffsetBackingStore offsetStore; + + if (ConnectUtils.isSinkConnector(connector)) { + connConfig = new SinkConnectorConfig(plugins, connProps); + offsetReader = null; + offsetStore = null; + } else { + SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connProps, config.topicCreationEnable()); + connConfig = sourceConfig; + + // Set up the offset backing store for this connector instance + offsetStore = config.exactlyOnceSourceEnabled() ? offsetStoreForExactlyOnceSourceConnector(sourceConfig, connName, connector, null) : offsetStoreForRegularSourceConnector(sourceConfig, connName, connector, null); - offsetStore.configure(config); - offsetReader = new OffsetStorageReaderImpl(offsetStore, connName, internalKeyConverter, internalValueConverter); - } - workerConnector = new WorkerConnector( + offsetStore.configure(config); + offsetReader = new OffsetStorageReaderImpl(offsetStore, connName, internalKeyConverter, internalValueConverter); + } + workerConnector = new WorkerConnector( connName, connector, connConfig, ctx, metrics, connectorStatusListener, offsetReader, offsetStore, connectorLoader); - log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connector.getClass()); - workerConnector.transitionTo(initialState, onConnectorStateChange); + log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connector.getClass()); + workerConnector.transitionTo(initialState, onConnectorStateChange); + } } catch (Throwable t) { log.error("Failed to start connector {}", connName, t); connectorStatusListener.onFailure(connName, t); @@ -655,51 +664,51 @@ public final class Worker { throw new ConnectException("Task already exists in this worker: " + id); connectorStatusMetricsGroup.recordTaskAdded(id); - String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); - ClassLoader connectorLoader = plugins.connectorLoader(connType); - try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) { - final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps); + final ClassLoader connectorLoader; + try { + connectorLoader = connectorClassLoader(connProps); + try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) { + final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps); - int maxTasks = connConfig.tasksMax(); - int numTasks = configState.taskCount(id.connector()); - checkTasksMax(id.connector(), numTasks, maxTasks, connConfig.enforceTasksMax()); + int maxTasks = connConfig.tasksMax(); + int numTasks = configState.taskCount(id.connector()); + checkTasksMax(id.connector(), numTasks, maxTasks, connConfig.enforceTasksMax()); - final TaskConfig taskConfig = new TaskConfig(taskProps); - final Class taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class); - final Task task = plugins.newTask(taskClass); - log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName()); + final TaskConfig taskConfig = new TaskConfig(taskProps); + final Class taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class); + final Task task = plugins.newTask(taskClass); + log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName()); - // By maintaining connector's specific class loader for this thread here, we first - // search for converters within the connector dependencies. - // If any of these aren't found, that means the connector didn't configure specific converters, - // so we should instantiate based upon the worker configuration - Converter keyConverter = plugins.newConverter(connConfig, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage - .CURRENT_CLASSLOADER); - Converter valueConverter = plugins.newConverter(connConfig, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.CURRENT_CLASSLOADER); - HeaderConverter headerConverter = plugins.newHeaderConverter(connConfig, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, - ClassLoaderUsage.CURRENT_CLASSLOADER); - if (keyConverter == null) { - keyConverter = plugins.newConverter(config, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS); - log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), id); - } else { - log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), id); - } - if (valueConverter == null) { - valueConverter = plugins.newConverter(config, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS); - log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), id); - } else { - log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), id); - } - if (headerConverter == null) { - headerConverter = plugins.newHeaderConverter(config, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, ClassLoaderUsage - .PLUGINS); - log.info("Set up the header converter {} for task {} using the worker config", headerConverter.getClass(), id); - } else { - log.info("Set up the header converter {} for task {} using the connector config", headerConverter.getClass(), id); - } - workerTask = taskBuilder + // By maintaining connector's specific class loader for this thread here, we first + // search for converters within the connector dependencies. + // If any of these aren't found, that means the connector didn't configure specific converters, + // so we should instantiate based upon the worker configuration + Converter keyConverter = plugins.newConverter(connConfig, ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG); + Converter valueConverter = plugins.newConverter(connConfig, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG); + HeaderConverter headerConverter = plugins.newHeaderConverter(connConfig, ConnectorConfig.HEADER_CONVERTER_CLASS_CONFIG, ConnectorConfig.HEADER_CONVERTER_VERSION_CONFIG); + + if (keyConverter == null) { + keyConverter = plugins.newConverter(config, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, WorkerConfig.KEY_CONVERTER_VERSION); + log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), id); + } else { + log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), id); + } + if (valueConverter == null) { + valueConverter = plugins.newConverter(config, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, WorkerConfig.VALUE_CONVERTER_VERSION); + log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), id); + } else { + log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), id); + } + if (headerConverter == null) { + headerConverter = plugins.newHeaderConverter(config, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, WorkerConfig.HEADER_CONVERTER_VERSION); + log.info("Set up the header converter {} for task {} using the worker config", headerConverter.getClass(), id); + } else { + log.info("Set up the header converter {} for task {} using the connector config", headerConverter.getClass(), id); + } + + workerTask = taskBuilder .withTask(task) .withConnectorConfig(connConfig) .withKeyConverterPlugin(metrics.wrap(keyConverter, id, true)) @@ -708,7 +717,8 @@ public final class Worker { .withClassloader(connectorLoader) .build(); - workerTask.initialize(taskConfig); + workerTask.initialize(taskConfig); + } } catch (Throwable t) { log.error("Failed to start task {}", id, t); connectorStatusMetricsGroup.recordTaskRemoved(id); @@ -739,19 +749,17 @@ public final class Worker { public KafkaFuture fenceZombies(String connName, int numTasks, Map connProps) { log.debug("Fencing out {} task producers for source connector {}", numTasks, connName); try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) { - String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); - ClassLoader connectorLoader = plugins.connectorLoader(connType); - try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) { + Class connectorClass = connectorClass(connProps); + ClassLoader classLoader = connectorClassLoader(connProps); + try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) { final SourceConnectorConfig connConfig = new SourceConnectorConfig(plugins, connProps, config.topicCreationEnable()); - final Class connClass = plugins.connectorClass( - connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG)); Map adminConfig = adminConfigs( connName, "connector-worker-adminclient-" + connName, config, connConfig, - connClass, + connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE); @@ -1198,11 +1206,9 @@ public final class Worker { * @param cb callback to invoke upon completion of the request */ public void connectorOffsets(String connName, Map connectorConfig, Callback cb) { - String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); - ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias); - + Connector connector = instantiateConnector(connectorConfig); + ClassLoader connectorLoader = connectorClassLoader(connectorConfig); try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) { - Connector connector = plugins.newConnector(connectorClassOrAlias); if (ConnectUtils.isSinkConnector(connector)) { log.debug("Fetching offsets for sink connector: {}", connName); sinkConnectorOffsets(connName, connector, connectorConfig, cb); @@ -1213,6 +1219,43 @@ public final class Worker { } } + private Connector instantiateConnector(Map connProps) throws ConnectException { + + final String klass = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); + final String version = connProps.get(ConnectorConfig.CONNECTOR_VERSION); + + try { + return plugins.newConnector(klass, PluginUtils.connectorVersionRequirement(version)); + } catch (InvalidVersionSpecificationException | VersionedPluginLoadingException e) { + throw new ConnectException( + String.format("Failed to instantiate class for connector %s, class %s", klass, connProps.get(ConnectorConfig.NAME_CONFIG)), e); + } + } + + private ClassLoader connectorClassLoader(Map connProps) throws ConnectException { + final String klass = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); + final String version = connProps.get(ConnectorConfig.CONNECTOR_VERSION); + + try { + return plugins.pluginLoader(klass, PluginUtils.connectorVersionRequirement(version)); + } catch (InvalidVersionSpecificationException | VersionedPluginLoadingException e) { + throw new ConnectException( + String.format("Failed to get class loader for connector %s, class %s", klass, connProps.get(ConnectorConfig.NAME_CONFIG)), e); + } + } + + private Class connectorClass(Map connProps) throws ConnectException { + final String klass = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); + final String version = connProps.get(ConnectorConfig.CONNECTOR_VERSION); + + try { + return plugins.connectorClass(klass, PluginUtils.connectorVersionRequirement(version)); + } catch (InvalidVersionSpecificationException | VersionedPluginLoadingException e) { + throw new ConnectException( + String.format("Failed to get class for connector %s, class %s", klass, connProps.get(ConnectorConfig.NAME_CONFIG)), e); + } + } + /** * Get the current consumer group offsets for a sink connector. *

@@ -1311,12 +1354,10 @@ public final class Worker { */ public void modifyConnectorOffsets(String connName, Map connectorConfig, Map, Map> offsets, Callback cb) { - String connectorClassOrAlias = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); - ClassLoader connectorLoader = plugins.connectorLoader(connectorClassOrAlias); - Connector connector; + final Connector connector = instantiateConnector(connectorConfig); + ClassLoader connectorLoader = connectorClassLoader(connectorConfig); try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) { - connector = plugins.newConnector(connectorClassOrAlias); if (ConnectUtils.isSinkConnector(connector)) { log.debug("Modifying offsets for sink connector: {}", connName); modifySinkConnectorOffsets(connName, connector, connectorConfig, offsets, connectorLoader, cb); @@ -1791,13 +1832,12 @@ public final class Worker { Objects.requireNonNull(classLoader, "Classloader used by task cannot be null"); ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id); - final Class connectorClass = plugins.connectorClass( - connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG)); + final Class connectorClass = connectorClass(connectorConfig.originalsStrings()); RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator<>(connectorConfig.errorRetryTimeout(), connectorConfig.errorMaxDelayInMillis(), connectorConfig.errorToleranceType(), Time.SYSTEM, errorHandlingMetrics); - TransformationChain transformationChain = new TransformationChain<>(connectorConfig.transformationStages(id, metrics), retryWithToleranceOperator); + TransformationChain transformationChain = new TransformationChain<>(connectorConfig.transformationStages(plugins, id, metrics), retryWithToleranceOperator); log.info("Initializing: {}", transformationChain); return doBuild(task, id, configState, statusListener, initialState, @@ -1862,7 +1902,7 @@ public final class Worker { return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverterPlugin, valueConverterPlugin, errorHandlingMetrics, headerConverterPlugin, transformationChain, consumer, classLoader, time, retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore(), - () -> sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass)); + () -> sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass), plugins.safeLoaderSwapper()); } } @@ -1922,7 +1962,7 @@ public final class Worker { return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverterPlugin, valueConverterPlugin, errorHandlingMetrics, headerConverterPlugin, transformationChain, producer, topicAdmin, topicCreationGroups, offsetReader, offsetWriter, offsetStore, config, configState, metrics, classLoader, time, - retryWithToleranceOperator, herder.statusBackingStore(), executor, () -> sourceTaskReporters(id, sourceConfig, errorHandlingMetrics)); + retryWithToleranceOperator, herder.statusBackingStore(), executor, () -> sourceTaskReporters(id, sourceConfig, errorHandlingMetrics), plugins.safeLoaderSwapper()); } } @@ -1987,7 +2027,7 @@ public final class Worker { headerConverterPlugin, transformationChain, producer, topicAdmin, topicCreationGroups, offsetReader, offsetWriter, offsetStore, config, configState, metrics, errorHandlingMetrics, classLoader, time, retryWithToleranceOperator, herder.statusBackingStore(), sourceConfig, executor, preProducerCheck, postProducerCheck, - () -> sourceTaskReporters(id, sourceConfig, errorHandlingMetrics)); + () -> sourceTaskReporters(id, sourceConfig, errorHandlingMetrics), plugins.safeLoaderSwapper()); } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index c050c61fb5f..4b8256115ed 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -47,6 +47,7 @@ import org.apache.kafka.connect.runtime.errors.ProcessingContext; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.runtime.errors.Stage; import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter; +import org.apache.kafka.connect.runtime.isolation.LoaderSwap; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; import org.apache.kafka.connect.storage.ClusterConfigState; @@ -66,6 +67,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Function; import java.util.function.Supplier; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -122,9 +124,10 @@ class WorkerSinkTask extends WorkerTask, SinkReco RetryWithToleranceOperator> retryWithToleranceOperator, WorkerErrantRecordReporter workerErrantRecordReporter, StatusBackingStore statusBackingStore, - Supplier>>> errorReportersSupplier) { + Supplier>>> errorReportersSupplier, + Function pluginLoaderSwapper) { super(id, statusListener, initialState, loader, connectMetrics, errorMetrics, - retryWithToleranceOperator, transformationChain, errorReportersSupplier, time, statusBackingStore); + retryWithToleranceOperator, transformationChain, errorReportersSupplier, time, statusBackingStore, pluginLoaderSwapper); this.workerConfig = workerConfig; this.task = task; @@ -539,11 +542,17 @@ class WorkerSinkTask extends WorkerTask, SinkReco } private SinkRecord convertAndTransformRecord(ProcessingContext> context, final ConsumerRecord msg) { - SchemaAndValue keyAndSchema = retryWithToleranceOperator.execute(context, () -> keyConverterPlugin.get().toConnectData(msg.topic(), msg.headers(), msg.key()), - Stage.KEY_CONVERTER, keyConverterPlugin.get().getClass()); + SchemaAndValue keyAndSchema = retryWithToleranceOperator.execute(context, () -> { + try (LoaderSwap swap = pluginLoaderSwapper.apply(keyConverterPlugin.get().getClass().getClassLoader())) { + return keyConverterPlugin.get().toConnectData(msg.topic(), msg.headers(), msg.key()); + } + }, Stage.KEY_CONVERTER, keyConverterPlugin.get().getClass()); - SchemaAndValue valueAndSchema = retryWithToleranceOperator.execute(context, () -> valueConverterPlugin.get().toConnectData(msg.topic(), msg.headers(), msg.value()), - Stage.VALUE_CONVERTER, valueConverterPlugin.get().getClass()); + SchemaAndValue valueAndSchema = retryWithToleranceOperator.execute(context, () -> { + try (LoaderSwap swap = pluginLoaderSwapper.apply(valueConverterPlugin.get().getClass().getClassLoader())) { + return valueConverterPlugin.get().toConnectData(msg.topic(), msg.headers(), msg.value()); + } + }, Stage.VALUE_CONVERTER, valueConverterPlugin.get().getClass()); Headers headers = retryWithToleranceOperator.execute(context, () -> convertHeadersFor(msg), Stage.HEADER_CONVERTER, headerConverterPlugin.get().getClass()); @@ -580,8 +589,10 @@ class WorkerSinkTask extends WorkerTask, SinkReco if (recordHeaders != null) { String topic = record.topic(); for (org.apache.kafka.common.header.Header recordHeader : recordHeaders) { - SchemaAndValue schemaAndValue = headerConverterPlugin.get().toConnectHeader(topic, recordHeader.key(), recordHeader.value()); - result.add(recordHeader.key(), schemaAndValue); + try (LoaderSwap swap = pluginLoaderSwapper.apply(headerConverterPlugin.get().getClass().getClassLoader())) { + SchemaAndValue schemaAndValue = headerConverterPlugin.get().toConnectHeader(topic, recordHeader.key(), recordHeader.value()); + result.add(recordHeader.key(), schemaAndValue); + } } } return result; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index a92ebe0fc39..0806e887735 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -28,6 +28,7 @@ import org.apache.kafka.connect.runtime.errors.ProcessingContext; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.runtime.errors.Stage; import org.apache.kafka.connect.runtime.errors.ToleranceType; +import org.apache.kafka.connect.runtime.isolation.LoaderSwap; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; import org.apache.kafka.connect.storage.CloseableOffsetStorageReader; @@ -53,6 +54,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.function.Supplier; import static org.apache.kafka.connect.runtime.SubmittedRecords.CommittableOffsets; @@ -91,12 +93,13 @@ class WorkerSourceTask extends AbstractWorkerSourceTask { RetryWithToleranceOperator retryWithToleranceOperator, StatusBackingStore statusBackingStore, Executor closeExecutor, - Supplier>> errorReportersSupplier) { + Supplier>> errorReportersSupplier, + Function pluginLoaderSwapper) { super(id, task, statusListener, initialState, configState, keyConverterPlugin, valueConverterPlugin, headerConverterPlugin, transformationChain, null, producer, admin, topicGroups, offsetReader, offsetWriter, offsetStore, workerConfig, connectMetrics, errorMetrics, loader, - time, retryWithToleranceOperator, statusBackingStore, closeExecutor, errorReportersSupplier); + time, retryWithToleranceOperator, statusBackingStore, closeExecutor, errorReportersSupplier, pluginLoaderSwapper); this.committableOffsets = CommittableOffsets.EMPTY; this.submittedRecords = new SubmittedRecords(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java index ff0ecfce276..fa28a4e7b0e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java @@ -33,6 +33,7 @@ import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics; import org.apache.kafka.connect.runtime.errors.ErrorReporter; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; +import org.apache.kafka.connect.runtime.isolation.LoaderSwap; import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.LoggingContext; @@ -44,6 +45,7 @@ import java.util.List; import java.util.Locale; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.function.Supplier; /** @@ -78,6 +80,7 @@ abstract class WorkerTask> implements Runnable { protected final RetryWithToleranceOperator retryWithToleranceOperator; protected final TransformationChain transformationChain; private final Supplier>> errorReportersSupplier; + protected final Function pluginLoaderSwapper; protected final PluginMetricsImpl pluginMetrics; public WorkerTask(ConnectorTaskId id, @@ -90,7 +93,8 @@ abstract class WorkerTask> implements Runnable { TransformationChain transformationChain, Supplier>> errorReportersSupplier, Time time, - StatusBackingStore statusBackingStore) { + StatusBackingStore statusBackingStore, + Function pluginLoaderSwapper) { this.id = id; this.taskMetricsGroup = new TaskMetricsGroup(this.id, connectMetrics, statusListener); this.errorMetrics = errorMetrics; @@ -106,6 +110,7 @@ abstract class WorkerTask> implements Runnable { this.errorReportersSupplier = errorReportersSupplier; this.time = time; this.statusBackingStore = statusBackingStore; + this.pluginLoaderSwapper = pluginLoaderSwapper; this.pluginMetrics = connectMetrics.taskPluginMetrics(id); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java index 8be45e773b3..98f33ea582b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java @@ -549,7 +549,7 @@ public class Plugins { } private HeaderConverter newHeaderConverter(AbstractConfig config, String classPropertyName, String versionPropertyName, ClassLoaderUsage classLoaderUsage) { - if (!config.originals().containsKey(classPropertyName) && classLoaderUsage == ClassLoaderUsage.CURRENT_CLASSLOADER) { + if (config.getClass(classPropertyName) == null && classLoaderUsage == ClassLoaderUsage.CURRENT_CLASSLOADER) { // This configuration does not define the Header Converter via the specified property name return null; } @@ -602,7 +602,7 @@ public class Plugins { // if the config specifies the class name, use it, otherwise use the default which we can get from config.getClass String classOrAlias = config.originalsStrings().get(classPropertyName); if (classOrAlias == null) { - classOrAlias = config.getClass(classPropertyName).getName(); + classOrAlias = config.getClass(classPropertyName) == null ? null : config.getClass(classPropertyName).getName(); } try { klass = pluginClass(delegatingLoader, classOrAlias, basePluginClass, range); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java index 780cd19d093..704a1f1ff40 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java @@ -43,6 +43,7 @@ import org.apache.kafka.connect.runtime.errors.ProcessingContext; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest; import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.kafka.connect.runtime.isolation.TestPlugins; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; @@ -966,7 +967,7 @@ public class AbstractWorkerSourceTaskTest { taskId, sourceTask, statusListener, TargetState.STARTED, configState, keyConverterPlugin, valueConverterPlugin, headerConverterPlugin, transformationChain, workerTransactionContext, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter, offsetStore, config, metrics, errorHandlingMetrics, plugins.delegatingLoader(), Time.SYSTEM, retryWithToleranceOperator, - statusBackingStore, Runnable::run, errorReportersSupplier) { + statusBackingStore, Runnable::run, errorReportersSupplier, TestPlugins.noOpLoaderSwap()) { @Override protected void prepareToInitializeTask() { } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java index 2c01c4b4fbc..5253bcb47da 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java @@ -163,7 +163,7 @@ public class ConnectorConfigTest> { props.put("transforms.a.type", SimpleTransformation.class.getName()); props.put("transforms.a.magic.number", "42"); final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props); - final List> transformationStages = config.transformationStages(CONNECTOR_TASK_ID, METRICS); + final List> transformationStages = config.transformationStages(MOCK_PLUGINS, CONNECTOR_TASK_ID, METRICS); assertEquals(1, transformationStages.size()); final TransformationStage stage = transformationStages.get(0); assertEquals(SimpleTransformation.class, stage.transformClass()); @@ -192,7 +192,7 @@ public class ConnectorConfigTest> { props.put("transforms.b.type", SimpleTransformation.class.getName()); props.put("transforms.b.magic.number", "84"); final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props); - final List> transformationStages = config.transformationStages(CONNECTOR_TASK_ID, METRICS); + final List> transformationStages = config.transformationStages(MOCK_PLUGINS, CONNECTOR_TASK_ID, METRICS); assertEquals(2, transformationStages.size()); assertEquals(42, transformationStages.get(0).apply(DUMMY_RECORD).kafkaPartition().intValue()); assertEquals(84, transformationStages.get(1).apply(DUMMY_RECORD).kafkaPartition().intValue()); @@ -293,7 +293,7 @@ public class ConnectorConfigTest> { private void assertTransformationStageWithPredicate(Map props, boolean expectedNegated) { final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props); - final List> transformationStages = config.transformationStages(CONNECTOR_TASK_ID, METRICS); + final List> transformationStages = config.transformationStages(MOCK_PLUGINS, CONNECTOR_TASK_ID, METRICS); assertEquals(1, transformationStages.size()); TransformationStage stage = transformationStages.get(0); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java index 035b30216da..70edfb0f598 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java @@ -44,6 +44,7 @@ import org.apache.kafka.connect.runtime.errors.ToleranceType; import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter; import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.kafka.connect.runtime.isolation.TestPlugins; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.sink.SinkRecord; @@ -427,7 +428,7 @@ public class ErrorHandlingTaskTest { Plugin> transformationPlugin = metrics.wrap(new FaultyPassthrough(), taskId, ""); TransformationChain, SinkRecord> sinkTransforms = - new TransformationChain<>(singletonList(new TransformationStage<>(transformationPlugin)), retryWithToleranceOperator); + new TransformationChain<>(singletonList(new TransformationStage<>(transformationPlugin, TestPlugins.noOpLoaderSwap())), retryWithToleranceOperator); Plugin keyConverterPlugin = metrics.wrap(converter, taskId, true); Plugin valueConverterPlugin = metrics.wrap(converter, taskId, false); @@ -437,7 +438,7 @@ public class ErrorHandlingTaskTest { ClusterConfigState.EMPTY, metrics, keyConverterPlugin, valueConverterPlugin, errorHandlingMetrics, headerConverterPlugin, sinkTransforms, consumer, pluginLoader, time, retryWithToleranceOperator, workerErrantRecordReporter, - statusBackingStore, () -> errorReporters); + statusBackingStore, () -> errorReporters, TestPlugins.noOpLoaderSwap()); } private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator, List> errorReporters) { @@ -463,7 +464,7 @@ public class ErrorHandlingTaskTest { List> errorReporters, Converter converter) { Plugin> transformationPlugin = metrics.wrap(new FaultyPassthrough(), taskId, ""); TransformationChain sourceTransforms = new TransformationChain<>(singletonList( - new TransformationStage<>(transformationPlugin)), retryWithToleranceOperator); + new TransformationStage<>(transformationPlugin, TestPlugins.noOpLoaderSwap())), retryWithToleranceOperator); Plugin keyConverterPlugin = metrics.wrap(converter, taskId, true); Plugin valueConverterPlugin = metrics.wrap(converter, taskId, false); @@ -476,7 +477,7 @@ public class ErrorHandlingTaskTest { offsetReader, offsetWriter, offsetStore, workerConfig, ClusterConfigState.EMPTY, metrics, pluginLoader, time, retryWithToleranceOperator, - statusBackingStore, Runnable::run, () -> errorReporters)); + statusBackingStore, Runnable::run, () -> errorReporters, TestPlugins.noOpLoaderSwap())); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java index 24103aefd97..a6375398d29 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java @@ -36,6 +36,7 @@ import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest; import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.kafka.connect.runtime.isolation.TestPlugins; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; @@ -283,7 +284,7 @@ public class ExactlyOnceWorkerSourceTaskTest { workerTask = new ExactlyOnceWorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverterPlugin, valueConverterPlugin, headerConverterPlugin, transformationChain, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter, offsetStore, config, clusterConfigState, metrics, errorHandlingMetrics, plugins.delegatingLoader(), time, RetryWithToleranceOperatorTest.noneOperator(), statusBackingStore, - sourceConfig, Runnable::run, preProducerCheck, postProducerCheck, Collections::emptyList); + sourceConfig, Runnable::run, preProducerCheck, postProducerCheck, Collections::emptyList, TestPlugins.noOpLoaderSwap()); } @ParameterizedTest diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java index 88ed326457a..e2791a63f7b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.connect.runtime; import org.apache.kafka.common.internals.Plugin; +import org.apache.kafka.connect.runtime.isolation.TestPlugins; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.predicates.Predicate; @@ -61,7 +62,9 @@ public class TransformationStageTest { TransformationStage stage = new TransformationStage<>( predicatePlugin, negate, - transformationPlugin); + transformationPlugin, + TestPlugins.noOpLoaderSwap() + ); assertEquals(expectedResult, stage.apply(initial)); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index 0750b0855cb..2607ee8b03b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -48,6 +48,7 @@ import org.apache.kafka.connect.runtime.errors.ProcessingContext; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest; import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; +import org.apache.kafka.connect.runtime.isolation.TestPlugins; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.sink.SinkRecord; @@ -228,7 +229,7 @@ public class WorkerSinkTaskTest { taskId, task, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, connectMetrics, keyConverterPlugin, valueConverterPlugin, errorMetrics, headerConverterPlugin, transformationChain, consumer, loader, time, - retryWithToleranceOperator, null, statusBackingStore, errorReportersSupplier); + retryWithToleranceOperator, null, statusBackingStore, errorReportersSupplier, TestPlugins.noOpLoaderSwap()); } @AfterEach diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java index 15ea638ef02..74021118098 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java @@ -35,6 +35,7 @@ import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics; import org.apache.kafka.connect.runtime.errors.ProcessingContext; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest; import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; +import org.apache.kafka.connect.runtime.isolation.TestPlugins; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.sink.SinkRecord; @@ -182,7 +183,7 @@ public class WorkerSinkTaskThreadedTest { taskId, sinkTask, statusListener, initialState, workerConfig, ClusterConfigState.EMPTY, metrics, keyConverterPlugin, valueConverterPlugin, errorHandlingMetrics, headerConverterPlugin, transformationChain, consumer, pluginLoader, time, RetryWithToleranceOperatorTest.noneOperator(), null, statusBackingStore, - Collections::emptyList); + Collections::emptyList, TestPlugins.noOpLoaderSwap()); recordsReturned = 0; } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 6d6ab60c4a7..23fb3618f81 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -38,6 +38,7 @@ import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest; import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.kafka.connect.runtime.isolation.TestPlugins; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; @@ -254,7 +255,7 @@ public class WorkerSourceTaskTest { workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverterPlugin, valueConverterPlugin, errorHandlingMetrics, headerConverterPlugin, transformationChain, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter, offsetStore, config, clusterConfigState, metrics, plugins.delegatingLoader(), Time.SYSTEM, - retryWithToleranceOperator, statusBackingStore, Runnable::run, Collections::emptyList); + retryWithToleranceOperator, statusBackingStore, Runnable::run, Collections::emptyList, TestPlugins.noOpLoaderSwap()); } @ParameterizedTest diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java index c8c8cc49d05..eae9c96998b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.connect.runtime.WorkerTask.TaskMetricsGroup; import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics; import org.apache.kafka.connect.runtime.errors.ErrorReporter; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; +import org.apache.kafka.connect.runtime.isolation.TestPlugins; import org.apache.kafka.connect.sink.SinkTask; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.storage.StatusBackingStore; @@ -299,7 +300,7 @@ public class WorkerTaskTest { Supplier>> errorReporterSupplier, Time time, StatusBackingStore statusBackingStore) { super(id, statusListener, initialState, loader, connectMetrics, errorHandlingMetrics, - retryWithToleranceOperator, transformationChain, errorReporterSupplier, time, statusBackingStore); + retryWithToleranceOperator, transformationChain, errorReporterSupplier, time, statusBackingStore, TestPlugins.noOpLoaderSwap()); } @Override diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index b4ad23b37a9..2f7af629f06 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -89,6 +89,7 @@ import org.apache.kafka.connect.util.FutureCallback; import org.apache.kafka.connect.util.SinkUtils; import org.apache.kafka.connect.util.TopicAdmin; +import org.apache.maven.artifact.versioning.VersionRange; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -322,7 +323,7 @@ public class WorkerTest { // Create mockKafkaClusterId(); - mockConnectorIsolation(connectorClass, sourceConnector); + mockVersionedConnectorIsolation(connectorClass, null, sourceConnector); mockExecutorRealSubmit(WorkerConnector.class); worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); @@ -358,7 +359,7 @@ public class WorkerTest { verifyKafkaClusterId(); - verifyConnectorIsolation(sourceConnector); + verifyVersionedConnectorIsolation(connectorClass, null, sourceConnector); verifyExecutorSubmit(); verify(sourceConnector).initialize(any(ConnectorContext.class)); verify(sourceConnector).start(connectorProps); @@ -389,7 +390,8 @@ public class WorkerTest { mockKafkaClusterId(); mockGenericIsolation(); - when(plugins.newConnector(anyString())).thenThrow(exception); + when(plugins.pluginLoader(nonConnectorClass, null)).thenReturn(pluginLoader); + when(plugins.newConnector(nonConnectorClass, null)).thenThrow(exception); worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); worker.herder = herder; @@ -414,7 +416,7 @@ public class WorkerTest { assertStatistics(worker, 0, 0); assertStartupStatistics(worker, 1, 1, 0, 0); - verify(plugins).newConnector(anyString()); + verify(plugins).newConnector(nonConnectorClass, null); verifyKafkaClusterId(); verifyGenericIsolation(); verify(connectorStatusListener).onFailure(eq(CONNECTOR_ID), any(ConnectException.class)); @@ -426,7 +428,7 @@ public class WorkerTest { setup(enableTopicCreation); final String connectorAlias = "SampleSourceConnector"; mockKafkaClusterId(); - mockConnectorIsolation(connectorAlias, sinkConnector); + mockVersionedConnectorIsolation(connectorAlias, null, sinkConnector); mockExecutorRealSubmit(WorkerConnector.class); connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorAlias); @@ -456,7 +458,7 @@ public class WorkerTest { assertStartupStatistics(worker, 1, 0, 0, 0); verifyKafkaClusterId(); - verifyConnectorIsolation(sinkConnector); + verifyVersionedConnectorIsolation(connectorAlias, null, sinkConnector); verifyExecutorSubmit(); verify(sinkConnector).initialize(any(ConnectorContext.class)); verify(sinkConnector).start(connectorProps); @@ -472,7 +474,7 @@ public class WorkerTest { final String shortConnectorAlias = "WorkerTest"; mockKafkaClusterId(); - mockConnectorIsolation(shortConnectorAlias, sinkConnector); + mockVersionedConnectorIsolation(shortConnectorAlias, null, sinkConnector); mockExecutorRealSubmit(WorkerConnector.class); connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, shortConnectorAlias); @@ -499,7 +501,7 @@ public class WorkerTest { assertStatistics(worker, 0, 0); verifyKafkaClusterId(); - verifyConnectorIsolation(sinkConnector); + verifyVersionedConnectorIsolation(shortConnectorAlias, null, sinkConnector); verify(sinkConnector).initialize(any(ConnectorContext.class)); verify(sinkConnector).start(connectorProps); verify(connectorStatusListener).onStartup(CONNECTOR_ID); @@ -531,7 +533,7 @@ public class WorkerTest { final String connectorClass = SampleSourceConnector.class.getName(); mockKafkaClusterId(); - mockConnectorIsolation(connectorClass, sinkConnector); + mockVersionedConnectorIsolation(connectorClass, null, sinkConnector); mockExecutorRealSubmit(WorkerConnector.class); Map taskProps = Collections.singletonMap("foo", "bar"); @@ -584,7 +586,7 @@ public class WorkerTest { assertStatistics(worker, 0, 0); verifyKafkaClusterId(); - verifyConnectorIsolation(sinkConnector); + verifyVersionedConnectorIsolation(connectorClass, null, sinkConnector); verifyExecutorSubmit(); verify(sinkConnector).initialize(any(ConnectorContext.class)); verify(sinkConnector).start(connectorProps); @@ -601,10 +603,10 @@ public class WorkerTest { public void testAddRemoveSourceTask(boolean enableTopicCreation) { setup(enableTopicCreation); mockKafkaClusterId(); - mockTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, task); - mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter); - mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter); - mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, taskHeaderConverter); + mockVersionedTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, null, sourceConnector, task); + mockVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG, taskKeyConverter); + mockVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG, taskValueConverter); + mockVersionedTaskHeaderConverterFromConnector(taskHeaderConverter); mockExecutorFakeSubmit(WorkerTask.class); Map origProps = Collections.singletonMap(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); @@ -642,10 +644,10 @@ public class WorkerTest { assertStatistics(worker, 0, 0); verifyKafkaClusterId(); - verifyTaskIsolation(task); - verifyTaskConverter(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG); - verifyTaskConverter(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG); - verifyTaskHeaderConverter(); + verifyVersionedTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, null, task); + verifyVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG); + verifyVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG); + verifyVersionedTaskHeaderConverterFromConnector(); verifyExecutorSubmit(); } @@ -657,10 +659,10 @@ public class WorkerTest { // Most of the other cases use source tasks; we make sure to get code coverage for sink tasks here as well SinkTask task = mock(TestSinkTask.class); mockKafkaClusterId(); - mockTaskIsolation(SampleSinkConnector.class, TestSinkTask.class, task); - mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter); - mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter); - mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, taskHeaderConverter); + mockVersionedTaskIsolation(SampleSinkConnector.class, TestSinkTask.class, null, sinkConnector, task); + mockVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG, taskKeyConverter); + mockVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG, taskValueConverter); + mockVersionedTaskHeaderConverterFromConnector(taskHeaderConverter); mockExecutorFakeSubmit(WorkerTask.class); Map origProps = Collections.singletonMap(TaskConfig.TASK_CLASS_CONFIG, TestSinkTask.class.getName()); @@ -700,10 +702,10 @@ public class WorkerTest { assertStatistics(worker, 0, 0); verifyKafkaClusterId(); - verifyTaskIsolation(task); - verifyTaskConverter(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG); - verifyTaskConverter(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG); - verifyTaskHeaderConverter(); + verifyVersionedTaskIsolation(SampleSinkConnector.class, TestSinkTask.class, null, task); + verifyVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG); + verifyVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG); + verifyVersionedTaskHeaderConverterFromConnector(); verifyExecutorSubmit(); } @@ -729,10 +731,10 @@ public class WorkerTest { config = new DistributedConfig(workerProps); mockKafkaClusterId(); - mockTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, task); - mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter); - mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter); - mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, taskHeaderConverter); + mockVersionedTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, null, sourceConnector, task); + mockVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG, taskKeyConverter); + mockVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG, taskValueConverter); + mockVersionedTaskHeaderConverterFromConnector(taskHeaderConverter); mockExecutorFakeSubmit(WorkerTask.class); Runnable preProducer = mock(Runnable.class); @@ -774,10 +776,10 @@ public class WorkerTest { assertStatistics(worker, 0, 0); verifyKafkaClusterId(); - verifyTaskIsolation(task); - verifyTaskConverter(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG); - verifyTaskConverter(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG); - verifyTaskHeaderConverter(); + verifyVersionedTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, null, task); + verifyVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG); + verifyVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG); + verifyVersionedTaskHeaderConverterFromConnector(); verifyExecutorSubmit(); } @@ -794,11 +796,10 @@ public class WorkerTest { TaskConfig taskConfig = new TaskConfig(origProps); mockKafkaClusterId(); - mockTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, task); - // Expect that the worker will create converters and will find them using the current classloader ... - mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter); - mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter); - mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, taskHeaderConverter); + mockVersionedTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, null, sourceConnector, task); + mockVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG, taskKeyConverter); + mockVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG, taskValueConverter); + mockVersionedTaskHeaderConverterFromConnector(taskHeaderConverter); mockExecutorFakeSubmit(WorkerTask.class); @@ -851,7 +852,7 @@ public class WorkerTest { verify(instantiatedTask).initialize(taskConfig); verify(herder, times(5)).taskStatus(TASK_ID); verifyKafkaClusterId(); - verifyTaskIsolation(task); + verifyVersionedTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, null, task); verifyExecutorSubmit(); verify(instantiatedTask, atLeastOnce()).id(); verify(instantiatedTask).awaitStop(anyLong()); @@ -860,9 +861,9 @@ public class WorkerTest { // Called when we stop the worker verify(instantiatedTask).loader(); verify(instantiatedTask).stop(); - verifyTaskConverter(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG); - verifyTaskConverter(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG); - verifyTaskHeaderConverter(); + verifyVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG); + verifyVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG); + verifyVersionedTaskHeaderConverterFromConnector(); } @ParameterizedTest @@ -907,6 +908,7 @@ public class WorkerTest { mockKafkaClusterId(); mockGenericIsolation(); + when(plugins.pluginLoader(SampleSourceConnector.class.getName(), null)).thenReturn(pluginLoader); worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy); worker.herder = herder; @@ -935,14 +937,14 @@ public class WorkerTest { mockFileConfigProvider(); mockKafkaClusterId(); - mockTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, task); + mockVersionedTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, null, sourceConnector, task); // Expect that the worker will create converters and will not initially find them using the current classloader ... - mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, null); - mockTaskConverter(ClassLoaderUsage.PLUGINS, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter); - mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, null); - mockTaskConverter(ClassLoaderUsage.PLUGINS, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter); - mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, null); - mockTaskHeaderConverter(ClassLoaderUsage.PLUGINS, taskHeaderConverter); + mockVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG, null); + mockVersionedTaskConverterFromWorker(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, WorkerConfig.KEY_CONVERTER_VERSION, taskKeyConverter); + mockVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG, null); + mockVersionedTaskConverterFromWorker(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, WorkerConfig.VALUE_CONVERTER_VERSION, taskValueConverter); + mockVersionedTaskHeaderConverterFromConnector(null); + mockVersionedTaskHeaderConverterFromWorker(taskHeaderConverter); mockExecutorFakeSubmit(WorkerTask.class); Map origProps = Collections.singletonMap(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); @@ -968,7 +970,13 @@ public class WorkerTest { verify(constructedMockTask).awaitStop(anyLong()); verify(constructedMockTask).removeMetrics(); verifyKafkaClusterId(); - verifyTaskIsolation(task); + verifyVersionedTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, null, task); + verifyVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG); + verifyVersionedTaskConverterFromWorker(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, WorkerConfig.KEY_CONVERTER_VERSION); + verifyVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG); + verifyVersionedTaskConverterFromWorker(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, WorkerConfig.VALUE_CONVERTER_VERSION); + verifyVersionedTaskHeaderConverterFromConnector(); + verifyVersionedTaskHeaderConverterFromWorker(); verifyConverters(); verifyExecutorSubmit(); } @@ -985,14 +993,14 @@ public class WorkerTest { TaskConfig taskConfig = new TaskConfig(origProps); mockKafkaClusterId(); - mockTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, task); + mockVersionedTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, null, sourceConnector, task); // Expect that the worker will create converters and will not initially find them using the current classloader ... - mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, null); - mockTaskConverter(ClassLoaderUsage.PLUGINS, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter); - mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, null); - mockTaskConverter(ClassLoaderUsage.PLUGINS, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter); - mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, null); - mockTaskHeaderConverter(ClassLoaderUsage.PLUGINS, taskHeaderConverter); + mockVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG, null); + mockVersionedTaskConverterFromWorker(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, WorkerConfig.KEY_CONVERTER_VERSION, taskKeyConverter); + mockVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG, null); + mockVersionedTaskConverterFromWorker(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, WorkerConfig.VALUE_CONVERTER_VERSION, taskValueConverter); + mockVersionedTaskHeaderConverterFromConnector(null); + mockVersionedTaskHeaderConverterFromWorker(taskHeaderConverter); mockExecutorFakeSubmit(WorkerTask.class); worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService, @@ -1024,7 +1032,13 @@ public class WorkerTest { verify(instantiatedTask).removeMetrics(); verifyKafkaClusterId(); - verifyTaskIsolation(task); + verifyVersionedTaskIsolation(SampleSourceConnector.class, TestSourceTask.class, null, task); + verifyVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG); + verifyVersionedTaskConverterFromWorker(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, WorkerConfig.KEY_CONVERTER_VERSION); + verifyVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG); + verifyVersionedTaskConverterFromWorker(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, WorkerConfig.VALUE_CONVERTER_VERSION); + verifyVersionedTaskHeaderConverterFromConnector(); + verifyVersionedTaskHeaderConverterFromWorker(); verifyExecutorSubmit(); verifyStorage(); } @@ -1860,7 +1874,7 @@ public class WorkerTest { @ParameterizedTest @ValueSource(booleans = {true, false}) - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "rawtypes"}) public void testZombieFencing(boolean enableTopicCreation) { setup(enableTopicCreation); Admin admin = mock(Admin.class); @@ -1878,6 +1892,8 @@ public class WorkerTest { mockKafkaClusterId(); mockGenericIsolation(); + when(plugins.connectorClass(anyString(), any())).thenReturn((Class) sourceConnector.getClass()); + when(plugins.pluginLoader(SampleSourceConnector.class.getName(), null)).thenReturn(pluginLoader); worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService, allConnectorClientConfigOverridePolicy, mockAdminConstructor); @@ -2087,7 +2103,8 @@ public class WorkerTest { worker.start(); mockGenericIsolation(); - when(plugins.newConnector(anyString())).thenReturn(sourceConnector); + when(plugins.newConnector(anyString(), any())).thenReturn(sourceConnector); + when(plugins.pluginLoader(SampleSourceConnector.class.getName(), null)).thenReturn(pluginLoader); when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg()); when(sourceConnector.alterOffsets(eq(connectorProps), anyMap())).thenThrow(new UnsupportedOperationException("This connector doesn't " + "support altering of offsets")); @@ -2679,7 +2696,7 @@ public class WorkerTest { String connectorClass = SampleSourceConnector.class.getName(); connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorClass); connectorProps.put(TASKS_MAX_ENFORCE_CONFIG, Boolean.toString(enforced)); - mockConnectorIsolation(connectorClass, sourceConnector); + mockVersionedConnectorIsolation(connectorClass, null, sourceConnector); mockExecutorRealSubmit(WorkerConnector.class); @@ -2851,10 +2868,10 @@ public class WorkerTest { tasksMaxExceededMessage = failureCaptor.getValue().getMessage(); } else { - mockTaskIsolation(SampleSinkConnector.class, TestSinkTask.class, task); - mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter); - mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter); - mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, taskHeaderConverter); + mockVersionedTaskIsolation(SampleSinkConnector.class, TestSinkTask.class, null, sinkConnector, task); + mockVersionedTaskConverterFromConnector(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, ConnectorConfig.KEY_CONVERTER_VERSION_CONFIG, taskKeyConverter); + mockVersionedTaskConverterFromConnector(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, ConnectorConfig.VALUE_CONVERTER_VERSION_CONFIG, taskValueConverter); + mockVersionedTaskHeaderConverterFromConnector(taskHeaderConverter); mockExecutorFakeSubmit(WorkerTask.class); assertTrue(worker.startSinkTask( @@ -2964,8 +2981,20 @@ public class WorkerTest { .thenReturn(returning); } - private void verifyTaskConverter(String converterClassConfig) { - verify(plugins).newConverter(any(AbstractConfig.class), eq(converterClassConfig), eq(ClassLoaderUsage.CURRENT_CLASSLOADER)); + private void mockVersionedTaskConverterFromConnector(String converterClassConfig, String converterVersionConfig, Converter returning) { + when(plugins.newConverter(any(ConnectorConfig.class), eq(converterClassConfig), eq(converterVersionConfig))).thenReturn(returning); + } + + private void verifyVersionedTaskConverterFromConnector(String converterClassConfig, String converterVersionConfig) { + verify(plugins).newConverter(any(ConnectorConfig.class), eq(converterClassConfig), eq(converterVersionConfig)); + } + + private void mockVersionedTaskConverterFromWorker(String converterClassConfig, String converterVersionConfig, Converter returning) { + when(plugins.newConverter(any(WorkerConfig.class), eq(converterClassConfig), eq(converterVersionConfig))).thenReturn(returning); + } + + private void verifyVersionedTaskConverterFromWorker(String converterClassConfig, String converterVersionConfig) { + verify(plugins).newConverter(any(WorkerConfig.class), eq(converterClassConfig), eq(converterVersionConfig)); } private void mockTaskHeaderConverter(ClassLoaderUsage classLoaderUsage, HeaderConverter returning) { @@ -2977,8 +3006,25 @@ public class WorkerTest { verify(plugins).newHeaderConverter(any(AbstractConfig.class), eq(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG), eq(ClassLoaderUsage.CURRENT_CLASSLOADER)); } + private void mockVersionedTaskHeaderConverterFromConnector(HeaderConverter returning) { + when(plugins.newHeaderConverter(any(ConnectorConfig.class), eq(ConnectorConfig.HEADER_CONVERTER_CLASS_CONFIG), eq(ConnectorConfig.HEADER_CONVERTER_VERSION_CONFIG))) + .thenReturn(returning); + } + + private void verifyVersionedTaskHeaderConverterFromConnector() { + verify(plugins).newHeaderConverter(any(ConnectorConfig.class), eq(ConnectorConfig.HEADER_CONVERTER_CLASS_CONFIG), eq(ConnectorConfig.HEADER_CONVERTER_VERSION_CONFIG)); + } + + private void mockVersionedTaskHeaderConverterFromWorker(HeaderConverter returning) { + when(plugins.newHeaderConverter(any(WorkerConfig.class), eq(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG), eq(WorkerConfig.HEADER_CONVERTER_VERSION))) + .thenReturn(returning); + } + + private void verifyVersionedTaskHeaderConverterFromWorker() { + verify(plugins).newHeaderConverter(any(WorkerConfig.class), eq(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG), eq(WorkerConfig.HEADER_CONVERTER_VERSION)); + } + private void mockGenericIsolation() { - when(plugins.connectorLoader(anyString())).thenReturn(pluginLoader); when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap); } @@ -2993,12 +3039,26 @@ public class WorkerTest { when(connector.version()).thenReturn("1.0"); } + private void mockVersionedConnectorIsolation(String connectorClass, VersionRange range, Connector connector) { + mockGenericIsolation(); + when(plugins.pluginLoader(connectorClass, range)).thenReturn(pluginLoader); + when(plugins.newConnector(connectorClass, range)).thenReturn(connector); + when(connector.version()).thenReturn(range == null ? "unknown" : range.toString()); + } + private void verifyConnectorIsolation(Connector connector) { verifyGenericIsolation(); verify(plugins).newConnector(anyString()); verify(connector, atLeastOnce()).version(); } + private void verifyVersionedConnectorIsolation(String connectorClass, VersionRange range, Connector connector) { + verifyGenericIsolation(); + verify(plugins).pluginLoader(connectorClass, range); + verify(plugins).newConnector(connectorClass, range); + verify(connector, atLeastOnce()).version(); + } + private void mockTaskIsolation(Class connector, Class taskClass, Task task) { mockGenericIsolation(); doReturn(connector).when(plugins).connectorClass(connector.getName()); @@ -3006,6 +3066,15 @@ public class WorkerTest { when(task.version()).thenReturn("1.0"); } + @SuppressWarnings({"unchecked", "rawtypes"}) + private void mockVersionedTaskIsolation(Class connectorClass, Class taskClass, VersionRange range, Connector connector, Task task) { + mockGenericIsolation(); + when(plugins.pluginLoader(connectorClass.getName(), range)).thenReturn(pluginLoader); + when(plugins.connectorClass(connectorClass.getName(), range)).thenReturn((Class) connectorClass); + when(plugins.newTask(taskClass)).thenReturn(task); + when(task.version()).thenReturn(range == null ? "unknown" : range.toString()); + } + private void verifyTaskIsolation(Task task) { verifyGenericIsolation(); verify(plugins).connectorClass(anyString()); @@ -3013,6 +3082,14 @@ public class WorkerTest { verify(task).version(); } + private void verifyVersionedTaskIsolation(Class connectorClass, Class taskClass, VersionRange range, Task task) { + verifyGenericIsolation(); + verify(plugins).pluginLoader(connectorClass.getName(), range); + verify(plugins).connectorClass(connectorClass.getName(), range); + verify(plugins).newTask(taskClass); + verify(task).version(); + } + private void mockExecutorRealSubmit(Class runnableClass) { // This test expects the runnable to be executed, so have the isolated runnable pass-through. // Requires using the Worker constructor without the mocked executorService diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java index d4cac9484e6..c91fa1017ce 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.internals.Plugin; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.runtime.distributed.ExtendedAssignment; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; +import org.apache.kafka.connect.runtime.isolation.TestPlugins; import org.apache.kafka.connect.storage.AppliedConnectorConfig; import org.apache.kafka.connect.storage.ClusterConfigState; import org.apache.kafka.connect.transforms.Transformation; @@ -195,7 +196,8 @@ public class WorkerTestUtils { TransformationStage stage = new TransformationStage<>( predicatePlugin, false, - transformationPlugin); + transformationPlugin, + TestPlugins.noOpLoaderSwap()); TransformationChain realTransformationChainRetriableException = new TransformationChain(List.of(stage), toleranceOperator); TransformationChain transformationChainRetriableException = Mockito.spy(realTransformationChainRetriableException); return transformationChainRetriableException; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java index 55a3445a331..ca4c29931d0 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java @@ -69,7 +69,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -197,16 +196,9 @@ public class PluginsTest { props.remove(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG); createConfig(); - // Because it's not explicitly set on the supplied configuration, the logic to use the current classloader for the connector - // will exit immediately, and so this method always returns null HeaderConverter headerConverter = plugins.newHeaderConverter(config, - WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, - ClassLoaderUsage.CURRENT_CLASSLOADER); - assertNull(headerConverter); - // But we should always find it (or the worker's default) when using the plugins classloader ... - headerConverter = plugins.newHeaderConverter(config, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, - ClassLoaderUsage.PLUGINS); + ClassLoaderUsage.CURRENT_CLASSLOADER); assertNotNull(headerConverter); assertInstanceOf(SimpleHeaderConverter.class, headerConverter); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java index adb2c2418d5..0e472a3ebe0 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java @@ -39,6 +39,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.function.Function; import java.util.function.Predicate; import java.util.jar.Attributes; import java.util.jar.JarEntry; @@ -374,6 +375,12 @@ public class TestPlugins { .collect(Collectors.toList()); } + public static Function noOpLoaderSwap() { + return classLoader -> { + return new LoaderSwap(Thread.currentThread().getContextClassLoader()); + }; + } + private static TestPlugin[] defaultPlugins() { return Arrays.stream(TestPlugin.values()) .filter(TestPlugin::includeByDefault) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicCreationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicCreationTest.java index 29a9426e337..ca358f18f43 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicCreationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicCreationTest.java @@ -519,7 +519,7 @@ public class TopicCreationTest { topicCreation.addTopic(FOO_TOPIC); assertFalse(topicCreation.isTopicCreationRequired(FOO_TOPIC)); - List> transformationStages = sourceConfig.transformationStages(CONNECTOR_TASK_ID, METRICS); + List> transformationStages = sourceConfig.transformationStages(MOCK_PLUGINS, CONNECTOR_TASK_ID, METRICS); assertEquals(1, transformationStages.size()); TransformationStage xform = transformationStages.get(0); SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, null, Schema.INT8_SCHEMA, 42)); @@ -626,7 +626,7 @@ public class TopicCreationTest { assertEquals(barPartitions, barTopicSpec.numPartitions()); assertEquals(barTopicProps, barTopicSpec.configs()); - List> transformationStages = sourceConfig.transformationStages(CONNECTOR_TASK_ID, METRICS); + List> transformationStages = sourceConfig.transformationStages(MOCK_PLUGINS, CONNECTOR_TASK_ID, METRICS); assertEquals(2, transformationStages.size()); TransformationStage castXForm = transformationStages.get(0);