KAFKA-5475: Connector config validation should include fields for defined transformation aliases

Author: Ewen Cheslack-Postava <me@ewencp.org>

Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Jason Gustafson <jason@confluent.io>

Closes #3399 from ewencp/kafka-5475-validation-transformations

(cherry picked from commit 96587f4b1f)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>
This commit is contained in:
Ewen Cheslack-Postava 2017-06-21 14:20:48 -07:00
parent 9aef218f0d
commit 1d65f15f2b
9 changed files with 313 additions and 115 deletions

View File

@ -20,7 +20,6 @@ import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.ConfigKey;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.errors.NotFoundException;
@ -229,10 +228,10 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
}
@Override
public ConfigInfos validateConnectorConfig(Map<String, String> connectorConfig) {
String connType = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
public ConfigInfos validateConnectorConfig(Map<String, String> connectorProps) {
String connType = connectorProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
if (connType == null)
throw new BadRequestException("Connector config " + connectorConfig + " contains no connector type");
throw new BadRequestException("Connector config " + connectorProps + " contains no connector type");
List<ConfigValue> configValues = new ArrayList<>();
Map<String, ConfigKey> configKeys = new LinkedHashMap<>();
@ -241,43 +240,26 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
Connector connector = getConnector(connType);
ClassLoader savedLoader = plugins().compareAndSwapLoaders(connector);
try {
// do basic connector validation (name, connector type, etc.)
ConfigDef basicConfigDef = (connector instanceof SourceConnector)
ConfigDef baseConfigDef = (connector instanceof SourceConnector)
? SourceConnectorConfig.configDef()
: SinkConnectorConfig.configDef();
ConfigDef enrichedConfigDef = ConnectorConfig.enrich(plugins(), baseConfigDef, connectorProps, false);
Map<String, ConfigValue> validatedConnectorConfig = validateBasicConnectorConfig(
connector,
basicConfigDef,
connectorConfig
enrichedConfigDef,
connectorProps
);
configValues.addAll(validatedConnectorConfig.values());
configKeys.putAll(basicConfigDef.configKeys());
allGroups.addAll(basicConfigDef.groups());
ConnectorConfig connectorConfigToEnrich = (connector instanceof SourceConnector)
? new SourceConnectorConfig(plugins(), connectorConfig)
: new SinkConnectorConfig(plugins(), connectorConfig);
final ConfigDef connectorConfigDef = connectorConfigToEnrich.enrich(
plugins(),
basicConfigDef,
connectorConfig,
false
);
// Override is required here after the enriched ConfigDef has been created successfully
configKeys.putAll(connectorConfigDef.configKeys());
allGroups.addAll(connectorConfigDef.groups());
configKeys.putAll(enrichedConfigDef.configKeys());
allGroups.addAll(enrichedConfigDef.groups());
// do custom connector-specific validation
Config config = connector.validate(connectorConfig);
Config config = connector.validate(connectorProps);
ConfigDef configDef = connector.config();
configKeys.putAll(configDef.configKeys());
allGroups.addAll(configDef.groups());
configValues.addAll(config.configValues());
return generateResult(connType, configKeys, configValues, new ArrayList<>(allGroups));
} catch (ConfigException e) {
// Basic validation must have failed. Return the result.
return generateResult(connType, configKeys, configValues, new ArrayList<>(allGroups));
} finally {
Plugins.compareAndSwapLoaders(savedLoader);
}

View File

@ -164,7 +164,7 @@ public class ConnectorConfig extends AbstractConfig {
* <p>
* {@code requireFullConfig} specifies whether required config values that are missing should cause an exception to be thrown.
*/
public ConfigDef enrich(Plugins plugins, ConfigDef baseConfigDef, Map<String, String> props, boolean requireFullConfig) {
public static ConfigDef enrich(Plugins plugins, ConfigDef baseConfigDef, Map<String, String> props, boolean requireFullConfig) {
Object transformAliases = ConfigDef.parseType(TRANSFORMS_CONFIG, props.get(TRANSFORMS_CONFIG), Type.LIST);
if (!(transformAliases instanceof List)) {
return baseConfigDef;

View File

@ -23,7 +23,11 @@ import java.util.Map;
public class SourceConnectorConfig extends ConnectorConfig {
private static ConfigDef config = configDef();
private static ConfigDef config = ConnectorConfig.configDef();
public static ConfigDef configDef() {
return config;
}
public SourceConnectorConfig(Plugins plugins, Map<String, String> props) {
super(plugins, config, props);

View File

@ -16,9 +16,17 @@
*/
package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.easymock.Capture;
import org.easymock.EasyMock;
@ -26,19 +34,29 @@ import org.easymock.EasyMockSupport;
import org.easymock.IAnswer;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class AbstractHerderTest extends EasyMockSupport {
private final Worker worker = strictMock(Worker.class);
private final String workerId = "workerId";
private final int generation = 5;
private final String connector = "connector";
private final Plugins plugins = strictMock(Plugins.class);
private final ClassLoader classLoader = strictMock(ClassLoader.class);
@Test
public void connectorStatus() {
Worker worker = null;
String workerId = "workerId";
String connector = "connector";
int generation = 5;
ConnectorTaskId taskId = new ConnectorTaskId(connector, 0);
ConfigBackingStore configStore = strictMock(ConfigBackingStore.class);
@ -79,7 +97,6 @@ public class AbstractHerderTest extends EasyMockSupport {
@Test
public void taskStatus() {
Worker worker = null;
ConnectorTaskId taskId = new ConnectorTaskId("connector", 0);
String workerId = "workerId";
@ -117,4 +134,135 @@ public class AbstractHerderTest extends EasyMockSupport {
verifyAll();
}
@Test(expected = BadRequestException.class)
public void testConfigValidationEmptyConfig() {
AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class);
replayAll();
herder.validateConnectorConfig(new HashMap<String, String>());
verifyAll();
}
@Test()
public void testConfigValidationMissingName() {
AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class);
replayAll();
Map<String, String> config = Collections.singletonMap(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestSourceConnector.class.getName());
ConfigInfos result = herder.validateConnectorConfig(config);
// We expect there to be errors due to the missing name and .... Note that these assertions depend heavily on
// the config fields for SourceConnectorConfig, but we expect these to change rarely.
assertEquals(TestSourceConnector.class.getName(), result.name());
assertEquals(Arrays.asList(ConnectorConfig.COMMON_GROUP, ConnectorConfig.TRANSFORMS_GROUP), result.groups());
assertEquals(2, result.errorCount());
// Base connector config has 6 fields, connector's configs add 2
assertEquals(8, result.values().size());
// Missing name should generate an error
assertEquals(ConnectorConfig.NAME_CONFIG, result.values().get(0).configValue().name());
assertEquals(1, result.values().get(0).configValue().errors().size());
// "required" config from connector should generate an error
assertEquals("required", result.values().get(6).configValue().name());
assertEquals(1, result.values().get(6).configValue().errors().size());
verifyAll();
}
@Test()
public void testConfigValidationTransformsExtendResults() {
AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class);
// 2 transform aliases defined -> 2 plugin lookups
Set<PluginDesc<Transformation>> transformations = new HashSet<>();
transformations.add(new PluginDesc<Transformation>(SampleTransformation.class, "1.0", classLoader));
EasyMock.expect(plugins.transformations()).andReturn(transformations).times(2);
replayAll();
// Define 2 transformations. One has a class defined and so can get embedded configs, the other is missing
// class info that should generate an error.
Map<String, String> config = new HashMap<>();
config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestSourceConnector.class.getName());
config.put(ConnectorConfig.NAME_CONFIG, "connector-name");
config.put(ConnectorConfig.TRANSFORMS_CONFIG, "xformA,xformB");
config.put(ConnectorConfig.TRANSFORMS_CONFIG + ".xformA.type", SampleTransformation.class.getName());
config.put("required", "value"); // connector required config
ConfigInfos result = herder.validateConnectorConfig(config);
// We expect there to be errors due to the missing name and .... Note that these assertions depend heavily on
// the config fields for SourceConnectorConfig, but we expect these to change rarely.
assertEquals(TestSourceConnector.class.getName(), result.name());
// Each transform also gets its own group
List<String> expectedGroups = Arrays.asList(
ConnectorConfig.COMMON_GROUP,
ConnectorConfig.TRANSFORMS_GROUP,
"Transforms: xformA",
"Transforms: xformB"
);
assertEquals(expectedGroups, result.groups());
assertEquals(2, result.errorCount());
// Base connector config has 6 fields, connector's configs add 2, 2 type fields from the transforms, and
// 1 from the valid transformation's config
assertEquals(11, result.values().size());
// Should get 2 type fields from the transforms, first adds its own config since it has a valid class
assertEquals("transforms.xformA.type", result.values().get(6).configValue().name());
assertTrue(result.values().get(6).configValue().errors().isEmpty());
assertEquals("transforms.xformA.subconfig", result.values().get(7).configValue().name());
assertEquals("transforms.xformB.type", result.values().get(8).configValue().name());
assertFalse(result.values().get(8).configValue().errors().isEmpty());
verifyAll();
}
private AbstractHerder createConfigValidationHerder(Class<? extends Connector> connectorClass) {
ConfigBackingStore configStore = strictMock(ConfigBackingStore.class);
StatusBackingStore statusStore = strictMock(StatusBackingStore.class);
AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
.withConstructor(Worker.class, String.class, StatusBackingStore.class, ConfigBackingStore.class)
.withArgs(worker, workerId, statusStore, configStore)
.addMockedMethod("generation")
.createMock();
EasyMock.expect(herder.generation()).andStubReturn(generation);
// Call to validateConnectorConfig
EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
final Connector connector;
try {
connector = connectorClass.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
throw new RuntimeException("Couldn't create connector", e);
}
EasyMock.expect(plugins.newConnector(connectorClass.getName())).andReturn(connector);
EasyMock.expect(plugins.compareAndSwapLoaders(connector)).andReturn(classLoader);
return herder;
}
public static class SampleTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public R apply(R record) {
return record;
}
@Override
public ConfigDef config() {
return new ConfigDef()
.define("subconfig", ConfigDef.Type.STRING, "default", ConfigDef.Importance.LOW, "docs");
}
@Override
public void close() {
}
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
import java.util.List;
import java.util.Map;
public class TestSinkConnector extends SinkConnector {
public static final String VERSION = "some great version";
@Override
public String version() {
return VERSION;
}
@Override
public void start(Map<String, String> props) {
}
@Override
public Class<? extends Task> taskClass() {
return null;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
return null;
}
@Override
public void stop() {
}
@Override
public ConfigDef config() {
return new ConfigDef()
.define("required", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "required docs")
.define("optional", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "optional docs");
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import java.util.List;
import java.util.Map;
public class TestSourceConnector extends SourceConnector {
public static final String VERSION = "an entirely different version";
@Override
public String version() {
return VERSION;
}
@Override
public void start(Map<String, String> props) {
}
@Override
public Class<? extends Task> taskClass() {
return null;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
return null;
}
@Override
public void stop() {
}
@Override
public ConfigDef config() {
return new ConfigDef()
.define("required", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "required docs")
.define("optional", ConfigDef.Type.STRING, "defaultVal", ConfigDef.Importance.HIGH, "optional docs");
}
}

View File

@ -320,7 +320,7 @@ public class DistributedHerderTest {
// config validation
Connector connectorMock = PowerMock.createMock(Connector.class);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef());
@ -362,6 +362,11 @@ public class DistributedHerderTest {
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
EasyMock.expect(connectorMock.config()).andStubReturn(new ConfigDef());
ConfigValue validatedValue = new ConfigValue("foo.bar");
EasyMock.expect(connectorMock.validate(config)).andReturn(new Config(singletonList(validatedValue)));
EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);
// CONN2 creation should fail
@ -396,7 +401,7 @@ public class DistributedHerderTest {
// config validation
Connector connectorMock = PowerMock.createMock(Connector.class);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@ -444,7 +449,7 @@ public class DistributedHerderTest {
// config validation
Connector connectorMock = PowerMock.createMock(SinkConnector.class);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef());
@ -1263,7 +1268,7 @@ public class DistributedHerderTest {
// config validation
Connector connectorMock = PowerMock.createMock(Connector.class);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(6);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(5);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef());

View File

@ -30,6 +30,8 @@ import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.runtime.AbstractHerder;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.TestSinkConnector;
import org.apache.kafka.connect.runtime.TestSourceConnector;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.Plugins;
@ -493,76 +495,6 @@ public class ConnectorPluginsResourceTest {
}
}
public static class TestSinkConnector extends SinkConnector {
static final String VERSION = "some great version";
@Override
public String version() {
return VERSION;
}
@Override
public void start(Map<String, String> props) {
}
@Override
public Class<? extends Task> taskClass() {
return null;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
return null;
}
@Override
public void stop() {
}
@Override
public ConfigDef config() {
return null;
}
}
public static class TestSourceConnector extends SourceConnector {
static final String VERSION = "an entirely different version";
@Override
public String version() {
return VERSION;
}
@Override
public void start(Map<String, String> props) {
}
@Override
public Class<? extends Task> taskClass() {
return null;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
return null;
}
@Override
public void stop() {
}
@Override
public ConfigDef config() {
return null;
}
}
/* Name here needs to be unique as we are testing the aliasing mechanism */
public static class ConnectorPluginsResourceTestConnector extends Connector {

View File

@ -128,6 +128,7 @@ public class StandaloneHerderTest {
@Test
public void testCreateConnectorFailedBasicValidation() throws Exception {
// Basic validation should be performed and return an error, but should still evaluate the connector's config
connector = PowerMock.createMock(BogusSourceConnector.class);
Map<String, String> config = connectorConfig(SourceSink.SOURCE);
@ -137,7 +138,11 @@ public class StandaloneHerderTest {
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
EasyMock.expect(connectorMock.config()).andStubReturn(new ConfigDef());
ConfigValue validatedValue = new ConfigValue("foo.bar");
EasyMock.expect(connectorMock.validate(config)).andReturn(new Config(singletonList(validatedValue)));
EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);
createCallback.onCompletion(EasyMock.<BadRequestException>anyObject(), EasyMock.<Herder.Created<ConnectorInfo>>isNull());
@ -155,7 +160,7 @@ public class StandaloneHerderTest {
connector = PowerMock.createMock(BogusSourceConnector.class);
Connector connectorMock = PowerMock.createMock(Connector.class);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@ -189,7 +194,7 @@ public class StandaloneHerderTest {
Connector connectorMock = PowerMock.createMock(Connector.class);
expectConfigValidation(connectorMock, true, config, config);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(2);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
// No new connector is created
EasyMock.expect(Plugins.compareAndSwapLoaders(delegatingLoader)).andReturn(pluginLoader);
@ -522,7 +527,7 @@ public class StandaloneHerderTest {
);
ConfigDef configDef = new ConfigDef();
configDef.define(key, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "");
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@ -620,7 +625,7 @@ public class StandaloneHerderTest {
Map<String, String>... configs
) {
// config validation
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(4);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
if (shouldCreateConnector) {
EasyMock.expect(worker.getPlugins()).andReturn(plugins);