diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index cfb8ae0d9d8..607a7244d2e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -20,7 +20,6 @@ import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.ConfigKey; import org.apache.kafka.common.config.ConfigDef.Type; -import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.errors.NotFoundException; @@ -229,10 +228,10 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con } @Override - public ConfigInfos validateConnectorConfig(Map connectorConfig) { - String connType = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); + public ConfigInfos validateConnectorConfig(Map connectorProps) { + String connType = connectorProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); if (connType == null) - throw new BadRequestException("Connector config " + connectorConfig + " contains no connector type"); + throw new BadRequestException("Connector config " + connectorProps + " contains no connector type"); List configValues = new ArrayList<>(); Map configKeys = new LinkedHashMap<>(); @@ -241,43 +240,26 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con Connector connector = getConnector(connType); ClassLoader savedLoader = plugins().compareAndSwapLoaders(connector); try { - // do basic connector validation (name, connector type, etc.) - ConfigDef basicConfigDef = (connector instanceof SourceConnector) - ? SourceConnectorConfig.configDef() - : SinkConnectorConfig.configDef(); + ConfigDef baseConfigDef = (connector instanceof SourceConnector) + ? SourceConnectorConfig.configDef() + : SinkConnectorConfig.configDef(); + ConfigDef enrichedConfigDef = ConnectorConfig.enrich(plugins(), baseConfigDef, connectorProps, false); Map validatedConnectorConfig = validateBasicConnectorConfig( connector, - basicConfigDef, - connectorConfig + enrichedConfigDef, + connectorProps ); configValues.addAll(validatedConnectorConfig.values()); - configKeys.putAll(basicConfigDef.configKeys()); - allGroups.addAll(basicConfigDef.groups()); - - ConnectorConfig connectorConfigToEnrich = (connector instanceof SourceConnector) - ? new SourceConnectorConfig(plugins(), connectorConfig) - : new SinkConnectorConfig(plugins(), connectorConfig); - final ConfigDef connectorConfigDef = connectorConfigToEnrich.enrich( - plugins(), - basicConfigDef, - connectorConfig, - false - ); - - // Override is required here after the enriched ConfigDef has been created successfully - configKeys.putAll(connectorConfigDef.configKeys()); - allGroups.addAll(connectorConfigDef.groups()); + configKeys.putAll(enrichedConfigDef.configKeys()); + allGroups.addAll(enrichedConfigDef.groups()); // do custom connector-specific validation - Config config = connector.validate(connectorConfig); + Config config = connector.validate(connectorProps); ConfigDef configDef = connector.config(); configKeys.putAll(configDef.configKeys()); allGroups.addAll(configDef.groups()); configValues.addAll(config.configValues()); return generateResult(connType, configKeys, configValues, new ArrayList<>(allGroups)); - } catch (ConfigException e) { - // Basic validation must have failed. Return the result. - return generateResult(connType, configKeys, configValues, new ArrayList<>(allGroups)); } finally { Plugins.compareAndSwapLoaders(savedLoader); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java index 869cfbd79d2..0f8c39088eb 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java @@ -164,7 +164,7 @@ public class ConnectorConfig extends AbstractConfig { *

* {@code requireFullConfig} specifies whether required config values that are missing should cause an exception to be thrown. */ - public ConfigDef enrich(Plugins plugins, ConfigDef baseConfigDef, Map props, boolean requireFullConfig) { + public static ConfigDef enrich(Plugins plugins, ConfigDef baseConfigDef, Map props, boolean requireFullConfig) { Object transformAliases = ConfigDef.parseType(TRANSFORMS_CONFIG, props.get(TRANSFORMS_CONFIG), Type.LIST); if (!(transformAliases instanceof List)) { return baseConfigDef; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java index 69154219ade..ab8fd01c035 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java @@ -23,7 +23,11 @@ import java.util.Map; public class SourceConnectorConfig extends ConnectorConfig { - private static ConfigDef config = configDef(); + private static ConfigDef config = ConnectorConfig.configDef(); + + public static ConfigDef configDef() { + return config; + } public SourceConnectorConfig(Plugins plugins, Map props) { super(plugins, config, props); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java index b9276af3cf0..c261ab6c53d 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java @@ -16,9 +16,17 @@ */ package org.apache.kafka.connect.runtime; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.runtime.isolation.PluginDesc; +import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; +import org.apache.kafka.connect.runtime.rest.errors.BadRequestException; import org.apache.kafka.connect.storage.ConfigBackingStore; import org.apache.kafka.connect.storage.StatusBackingStore; +import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.util.ConnectorTaskId; import org.easymock.Capture; import org.easymock.EasyMock; @@ -26,19 +34,29 @@ import org.easymock.EasyMockSupport; import org.easymock.IAnswer; import org.junit.Test; +import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; public class AbstractHerderTest extends EasyMockSupport { + private final Worker worker = strictMock(Worker.class); + private final String workerId = "workerId"; + private final int generation = 5; + private final String connector = "connector"; + private final Plugins plugins = strictMock(Plugins.class); + private final ClassLoader classLoader = strictMock(ClassLoader.class); @Test public void connectorStatus() { - Worker worker = null; - String workerId = "workerId"; - String connector = "connector"; - int generation = 5; ConnectorTaskId taskId = new ConnectorTaskId(connector, 0); ConfigBackingStore configStore = strictMock(ConfigBackingStore.class); @@ -79,7 +97,6 @@ public class AbstractHerderTest extends EasyMockSupport { @Test public void taskStatus() { - Worker worker = null; ConnectorTaskId taskId = new ConnectorTaskId("connector", 0); String workerId = "workerId"; @@ -117,4 +134,135 @@ public class AbstractHerderTest extends EasyMockSupport { verifyAll(); } + + + @Test(expected = BadRequestException.class) + public void testConfigValidationEmptyConfig() { + AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class); + replayAll(); + + herder.validateConnectorConfig(new HashMap()); + + verifyAll(); + } + + @Test() + public void testConfigValidationMissingName() { + AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class); + replayAll(); + + Map config = Collections.singletonMap(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestSourceConnector.class.getName()); + ConfigInfos result = herder.validateConnectorConfig(config); + + // We expect there to be errors due to the missing name and .... Note that these assertions depend heavily on + // the config fields for SourceConnectorConfig, but we expect these to change rarely. + assertEquals(TestSourceConnector.class.getName(), result.name()); + assertEquals(Arrays.asList(ConnectorConfig.COMMON_GROUP, ConnectorConfig.TRANSFORMS_GROUP), result.groups()); + assertEquals(2, result.errorCount()); + // Base connector config has 6 fields, connector's configs add 2 + assertEquals(8, result.values().size()); + // Missing name should generate an error + assertEquals(ConnectorConfig.NAME_CONFIG, result.values().get(0).configValue().name()); + assertEquals(1, result.values().get(0).configValue().errors().size()); + // "required" config from connector should generate an error + assertEquals("required", result.values().get(6).configValue().name()); + assertEquals(1, result.values().get(6).configValue().errors().size()); + + verifyAll(); + } + + @Test() + public void testConfigValidationTransformsExtendResults() { + AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class); + + // 2 transform aliases defined -> 2 plugin lookups + Set> transformations = new HashSet<>(); + transformations.add(new PluginDesc(SampleTransformation.class, "1.0", classLoader)); + EasyMock.expect(plugins.transformations()).andReturn(transformations).times(2); + + replayAll(); + + // Define 2 transformations. One has a class defined and so can get embedded configs, the other is missing + // class info that should generate an error. + Map config = new HashMap<>(); + config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestSourceConnector.class.getName()); + config.put(ConnectorConfig.NAME_CONFIG, "connector-name"); + config.put(ConnectorConfig.TRANSFORMS_CONFIG, "xformA,xformB"); + config.put(ConnectorConfig.TRANSFORMS_CONFIG + ".xformA.type", SampleTransformation.class.getName()); + config.put("required", "value"); // connector required config + ConfigInfos result = herder.validateConnectorConfig(config); + + // We expect there to be errors due to the missing name and .... Note that these assertions depend heavily on + // the config fields for SourceConnectorConfig, but we expect these to change rarely. + assertEquals(TestSourceConnector.class.getName(), result.name()); + // Each transform also gets its own group + List expectedGroups = Arrays.asList( + ConnectorConfig.COMMON_GROUP, + ConnectorConfig.TRANSFORMS_GROUP, + "Transforms: xformA", + "Transforms: xformB" + ); + assertEquals(expectedGroups, result.groups()); + assertEquals(2, result.errorCount()); + // Base connector config has 6 fields, connector's configs add 2, 2 type fields from the transforms, and + // 1 from the valid transformation's config + assertEquals(11, result.values().size()); + // Should get 2 type fields from the transforms, first adds its own config since it has a valid class + assertEquals("transforms.xformA.type", result.values().get(6).configValue().name()); + assertTrue(result.values().get(6).configValue().errors().isEmpty()); + assertEquals("transforms.xformA.subconfig", result.values().get(7).configValue().name()); + assertEquals("transforms.xformB.type", result.values().get(8).configValue().name()); + assertFalse(result.values().get(8).configValue().errors().isEmpty()); + + verifyAll(); + } + + private AbstractHerder createConfigValidationHerder(Class connectorClass) { + + + ConfigBackingStore configStore = strictMock(ConfigBackingStore.class); + StatusBackingStore statusStore = strictMock(StatusBackingStore.class); + + AbstractHerder herder = partialMockBuilder(AbstractHerder.class) + .withConstructor(Worker.class, String.class, StatusBackingStore.class, ConfigBackingStore.class) + .withArgs(worker, workerId, statusStore, configStore) + .addMockedMethod("generation") + .createMock(); + EasyMock.expect(herder.generation()).andStubReturn(generation); + + // Call to validateConnectorConfig + EasyMock.expect(worker.getPlugins()).andStubReturn(plugins); + final Connector connector; + try { + connector = connectorClass.newInstance(); + } catch (InstantiationException | IllegalAccessException e) { + throw new RuntimeException("Couldn't create connector", e); + } + EasyMock.expect(plugins.newConnector(connectorClass.getName())).andReturn(connector); + EasyMock.expect(plugins.compareAndSwapLoaders(connector)).andReturn(classLoader); + return herder; + } + + public static class SampleTransformation> implements Transformation { + @Override + public void configure(Map configs) { + + } + + @Override + public R apply(R record) { + return record; + } + + @Override + public ConfigDef config() { + return new ConfigDef() + .define("subconfig", ConfigDef.Type.STRING, "default", ConfigDef.Importance.LOW, "docs"); + } + + @Override + public void close() { + + } + } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TestSinkConnector.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TestSinkConnector.java new file mode 100644 index 00000000000..a6e3bb17bd8 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TestSinkConnector.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.sink.SinkConnector; + +import java.util.List; +import java.util.Map; + +public class TestSinkConnector extends SinkConnector { + + public static final String VERSION = "some great version"; + + @Override + public String version() { + return VERSION; + } + + @Override + public void start(Map props) { + + } + + @Override + public Class taskClass() { + return null; + } + + @Override + public List> taskConfigs(int maxTasks) { + return null; + } + + @Override + public void stop() { + + } + + @Override + public ConfigDef config() { + return new ConfigDef() + .define("required", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "required docs") + .define("optional", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "optional docs"); + } +} diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TestSourceConnector.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TestSourceConnector.java new file mode 100644 index 00000000000..5f754e218a8 --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TestSourceConnector.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.source.SourceConnector; + +import java.util.List; +import java.util.Map; + +public class TestSourceConnector extends SourceConnector { + + public static final String VERSION = "an entirely different version"; + + @Override + public String version() { + return VERSION; + } + + @Override + public void start(Map props) { + + } + + @Override + public Class taskClass() { + return null; + } + + @Override + public List> taskConfigs(int maxTasks) { + return null; + } + + @Override + public void stop() { + + } + + @Override + public ConfigDef config() { + return new ConfigDef() + .define("required", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "required docs") + .define("optional", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "optional docs"); + } +} diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index 18d83c55937..7834a891120 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -320,7 +320,7 @@ public class DistributedHerderTest { // config validation Connector connectorMock = PowerMock.createMock(Connector.class); - EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4); + EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3); EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef()); @@ -362,6 +362,11 @@ public class DistributedHerderTest { EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3); EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); + + EasyMock.expect(connectorMock.config()).andStubReturn(new ConfigDef()); + ConfigValue validatedValue = new ConfigValue("foo.bar"); + EasyMock.expect(connectorMock.validate(config)).andReturn(new Config(singletonList(validatedValue))); + EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader); // CONN2 creation should fail @@ -396,7 +401,7 @@ public class DistributedHerderTest { // config validation Connector connectorMock = PowerMock.createMock(Connector.class); - EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4); + EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3); EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); @@ -444,7 +449,7 @@ public class DistributedHerderTest { // config validation Connector connectorMock = PowerMock.createMock(SinkConnector.class); - EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4); + EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3); EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef()); @@ -1263,7 +1268,7 @@ public class DistributedHerderTest { // config validation Connector connectorMock = PowerMock.createMock(Connector.class); - EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(6); + EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(5); EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef()); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java index 2d0448eb500..538e5dbb762 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java @@ -30,6 +30,8 @@ import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.runtime.AbstractHerder; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.TestSinkConnector; +import org.apache.kafka.connect.runtime.TestSourceConnector; import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; import org.apache.kafka.connect.runtime.isolation.PluginDesc; import org.apache.kafka.connect.runtime.isolation.Plugins; @@ -493,76 +495,6 @@ public class ConnectorPluginsResourceTest { } } - public static class TestSinkConnector extends SinkConnector { - - static final String VERSION = "some great version"; - - @Override - public String version() { - return VERSION; - } - - @Override - public void start(Map props) { - - } - - @Override - public Class taskClass() { - return null; - } - - @Override - public List> taskConfigs(int maxTasks) { - return null; - } - - @Override - public void stop() { - - } - - @Override - public ConfigDef config() { - return null; - } - } - - public static class TestSourceConnector extends SourceConnector { - - static final String VERSION = "an entirely different version"; - - @Override - public String version() { - return VERSION; - } - - @Override - public void start(Map props) { - - } - - @Override - public Class taskClass() { - return null; - } - - @Override - public List> taskConfigs(int maxTasks) { - return null; - } - - @Override - public void stop() { - - } - - @Override - public ConfigDef config() { - return null; - } - } - /* Name here needs to be unique as we are testing the aliasing mechanism */ public static class ConnectorPluginsResourceTestConnector extends Connector { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java index 1c3dddbd43f..c58d70265d3 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java @@ -128,6 +128,7 @@ public class StandaloneHerderTest { @Test public void testCreateConnectorFailedBasicValidation() throws Exception { + // Basic validation should be performed and return an error, but should still evaluate the connector's config connector = PowerMock.createMock(BogusSourceConnector.class); Map config = connectorConfig(SourceSink.SOURCE); @@ -137,7 +138,11 @@ public class StandaloneHerderTest { EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3); EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); + EasyMock.expect(connectorMock.config()).andStubReturn(new ConfigDef()); + + ConfigValue validatedValue = new ConfigValue("foo.bar"); + EasyMock.expect(connectorMock.validate(config)).andReturn(new Config(singletonList(validatedValue))); EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader); createCallback.onCompletion(EasyMock.anyObject(), EasyMock.>isNull()); @@ -155,7 +160,7 @@ public class StandaloneHerderTest { connector = PowerMock.createMock(BogusSourceConnector.class); Connector connectorMock = PowerMock.createMock(Connector.class); - EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4); + EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3); EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); @@ -189,7 +194,7 @@ public class StandaloneHerderTest { Connector connectorMock = PowerMock.createMock(Connector.class); expectConfigValidation(connectorMock, true, config, config); - EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3); + EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(2); EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); // No new connector is created EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader); @@ -522,7 +527,7 @@ public class StandaloneHerderTest { ); ConfigDef configDef = new ConfigDef(); configDef.define(key, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, ""); - EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4); + EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3); EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); EasyMock.expect(worker.getPlugins()).andStubReturn(plugins); EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock); @@ -620,7 +625,7 @@ public class StandaloneHerderTest { Map... configs ) { // config validation - EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4); + EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3); EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader); if (shouldCreateConnector) { EasyMock.expect(worker.getPlugins()).andReturn(plugins);