KAFKA-6253: Improve sink connector topic regex validation

KAFKA-3073 added topic regex support for sink connectors. The addition requires that you only specify one of topics or topics.regex settings. This is being validated in one place, but not during submission of connectors. This PR adds validation at `AbstractHerder.validateConnectorConfig` and `WorkerConnector.initialize`.

This adds a test of the new behavior to `AbstractHerderTest`.

Author: Jeff Klukas <jeff@klukas.net>

Reviewers: Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #4251 from jklukas/connect-topics-validation

(cherry picked from commit eb3fef760e)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>
This commit is contained in:
Jeff Klukas 2018-02-05 09:46:07 -08:00 committed by Ewen Cheslack-Postava
parent 7723293582
commit dbd447f487
7 changed files with 92 additions and 47 deletions

View File

@ -256,9 +256,13 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
Connector connector = getConnector(connType);
ClassLoader savedLoader = plugins().compareAndSwapLoaders(connector);
try {
ConfigDef baseConfigDef = (connector instanceof SourceConnector)
? SourceConnectorConfig.configDef()
: SinkConnectorConfig.configDef();
ConfigDef baseConfigDef;
if (connector instanceof SourceConnector) {
baseConfigDef = SourceConnectorConfig.configDef();
} else {
baseConfigDef = SinkConnectorConfig.configDef();
SinkConnectorConfig.validate(connectorProps);
}
ConfigDef enrichedConfigDef = ConnectorConfig.enrich(plugins(), baseConfigDef, connectorProps, false);
Map<String, ConfigValue> validatedConnectorConfig = validateBasicConnectorConfig(
connector,

View File

@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.transforms.util.RegexValidator;
@ -34,7 +35,7 @@ public class SinkConnectorConfig extends ConnectorConfig {
public static final String TOPICS_DEFAULT = "";
private static final String TOPICS_DISPLAY = "Topics";
private static final String TOPICS_REGEX_CONFIG = SinkTask.TOPICS_REGEX_CONFIG;
public static final String TOPICS_REGEX_CONFIG = SinkTask.TOPICS_REGEX_CONFIG;
private static final String TOPICS_REGEX_DOC = "Regular expression giving topics to consume. " +
"Under the hood, the regex is compiled to a <code>java.util.regex.Pattern</code>. " +
"Only one of " + TOPICS_CONFIG + " or " + TOPICS_REGEX_CONFIG + " should be specified.";
@ -52,4 +53,34 @@ public class SinkConnectorConfig extends ConnectorConfig {
public SinkConnectorConfig(Plugins plugins, Map<String, String> props) {
super(plugins, config, props);
}
/**
* Throw an exception if the passed-in properties do not constitute a valid sink.
* @param props sink configuration properties
*/
public static void validate(Map<String, String> props) {
final boolean hasTopicsConfig = hasTopicsConfig(props);
final boolean hasTopicsRegexConfig = hasTopicsRegexConfig(props);
if (hasTopicsConfig && hasTopicsRegexConfig) {
throw new ConfigException(SinkTask.TOPICS_CONFIG + " and " + SinkTask.TOPICS_REGEX_CONFIG +
" are mutually exclusive options, but both are set.");
}
if (!hasTopicsConfig && !hasTopicsRegexConfig) {
throw new ConfigException("Must configure one of " +
SinkTask.TOPICS_CONFIG + " or " + SinkTask.TOPICS_REGEX_CONFIG);
}
}
public static boolean hasTopicsConfig(Map<String, String> props) {
String topicsStr = props.get(TOPICS_CONFIG);
return topicsStr != null && !topicsStr.trim().isEmpty();
}
public static boolean hasTopicsRegexConfig(Map<String, String> props) {
String topicsRegexStr = props.get(TOPICS_REGEX_CONFIG);
return topicsRegexStr != null && !topicsRegexStr.trim().isEmpty();
}
}

View File

@ -77,6 +77,9 @@ public class WorkerConnector {
try {
this.config = connectorConfig.originalsStrings();
log.debug("{} Initializing connector {} with config {}", this, connName, config);
if (isSinkConnector()) {
SinkConnectorConfig.validate(config);
}
connector.initialize(new ConnectorContext() {
@Override

View File

@ -25,7 +25,6 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
@ -265,27 +264,14 @@ class WorkerSinkTask extends WorkerTask {
* Initializes and starts the SinkTask.
*/
protected void initializeAndStart() {
String topicsStr = taskConfig.get(SinkTask.TOPICS_CONFIG);
boolean topicsStrPresent = topicsStr != null && !topicsStr.trim().isEmpty();
SinkConnectorConfig.validate(taskConfig);
String topicsRegexStr = taskConfig.get(SinkTask.TOPICS_REGEX_CONFIG);
boolean topicsRegexStrPresent = topicsRegexStr != null && !topicsRegexStr.trim().isEmpty();
if (topicsStrPresent && topicsRegexStrPresent) {
throw new ConfigException(SinkTask.TOPICS_CONFIG + " and " + SinkTask.TOPICS_REGEX_CONFIG +
" are mutually exclusive options, but both are set.");
}
if (!topicsStrPresent && !topicsRegexStrPresent) {
throw new ConfigException("Must configure one of " +
SinkTask.TOPICS_CONFIG + " or " + SinkTask.TOPICS_REGEX_CONFIG);
}
if (topicsStrPresent) {
String[] topics = topicsStr.split(",");
if (SinkConnectorConfig.hasTopicsConfig(taskConfig)) {
String[] topics = taskConfig.get(SinkTask.TOPICS_CONFIG).split(",");
consumer.subscribe(Arrays.asList(topics), new HandleRebalance());
log.debug("{} Initializing and starting task for topics {}", this, topics);
} else {
String topicsRegexStr = taskConfig.get(SinkTask.TOPICS_REGEX_CONFIG);
Pattern pattern = Pattern.compile(topicsRegexStr);
consumer.subscribe(pattern, new HandleRebalance());
log.debug("{} Initializing and starting task for topics regex {}", this, topicsRegexStr);

View File

@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
@ -183,6 +184,21 @@ public class AbstractHerderTest {
verifyAll();
}
@Test(expected = ConfigException.class)
public void testConfigValidationInvalidTopics() {
AbstractHerder herder = createConfigValidationHerder(TestSinkConnector.class);
replayAll();
Map<String, String> config = new HashMap();
config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestSinkConnector.class.getName());
config.put(SinkConnectorConfig.TOPICS_CONFIG, "topic1,topic2");
config.put(SinkConnectorConfig.TOPICS_REGEX_CONFIG, "topic.*");
herder.validateConnectorConfig(config);
verifyAll();
}
@Test()
public void testConfigValidationTransformsExtendResults() {
AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class);

View File

@ -355,7 +355,7 @@ public class DistributedHerderTest {
PowerMock.expectLastCall();
// config validation
Connector connectorMock = PowerMock.createMock(Connector.class);
Connector connectorMock = PowerMock.createMock(SourceConnector.class);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@ -398,7 +398,7 @@ public class DistributedHerderTest {
PowerMock.expectLastCall();
// config validation
Connector connectorMock = PowerMock.createMock(Connector.class);
Connector connectorMock = PowerMock.createMock(SourceConnector.class);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@ -443,7 +443,7 @@ public class DistributedHerderTest {
PowerMock.expectLastCall();
// config validation
Connector connectorMock = PowerMock.createMock(Connector.class);
Connector connectorMock = PowerMock.createMock(SourceConnector.class);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@ -1338,7 +1338,7 @@ public class DistributedHerderTest {
PowerMock.expectLastCall();
// config validation
Connector connectorMock = PowerMock.createMock(Connector.class);
Connector connectorMock = PowerMock.createMock(SourceConnector.class);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);

View File

@ -125,7 +125,8 @@ public class StandaloneHerderTest {
expectAdd(SourceSink.SOURCE);
Map<String, String> config = connectorConfig(SourceSink.SOURCE);
expectConfigValidation(config);
Connector connectorMock = PowerMock.createMock(SourceConnector.class);
expectConfigValidation(connectorMock, true, config);
PowerMock.replayAll();
@ -142,7 +143,7 @@ public class StandaloneHerderTest {
Map<String, String> config = connectorConfig(SourceSink.SOURCE);
config.remove(ConnectorConfig.NAME_CONFIG);
Connector connectorMock = PowerMock.createMock(Connector.class);
Connector connectorMock = PowerMock.createMock(SourceConnector.class);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@ -167,7 +168,7 @@ public class StandaloneHerderTest {
public void testCreateConnectorFailedCustomValidation() throws Exception {
connector = PowerMock.createMock(BogusSourceConnector.class);
Connector connectorMock = PowerMock.createMock(Connector.class);
Connector connectorMock = PowerMock.createMock(SourceConnector.class);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
@ -199,7 +200,7 @@ public class StandaloneHerderTest {
expectAdd(SourceSink.SOURCE);
Map<String, String> config = connectorConfig(SourceSink.SOURCE);
Connector connectorMock = PowerMock.createMock(Connector.class);
Connector connectorMock = PowerMock.createMock(SourceConnector.class);
expectConfigValidation(connectorMock, true, config, config);
EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(2);
@ -224,7 +225,8 @@ public class StandaloneHerderTest {
expectAdd(SourceSink.SINK);
Map<String, String> config = connectorConfig(SourceSink.SINK);
expectConfigValidation(config);
Connector connectorMock = PowerMock.createMock(SinkConnector.class);
expectConfigValidation(connectorMock, true, config);
PowerMock.replayAll();
herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback);
@ -238,7 +240,8 @@ public class StandaloneHerderTest {
expectAdd(SourceSink.SOURCE);
Map<String, String> config = connectorConfig(SourceSink.SOURCE);
expectConfigValidation(config);
Connector connectorMock = PowerMock.createMock(SourceConnector.class);
expectConfigValidation(connectorMock, true, config);
EasyMock.expect(statusBackingStore.getAll(CONNECTOR_NAME)).andReturn(Collections.<TaskStatus>emptyList());
statusBackingStore.put(new ConnectorStatus(CONNECTOR_NAME, AbstractStatus.State.DESTROYED, WORKER_ID, 0));
@ -270,7 +273,8 @@ public class StandaloneHerderTest {
expectAdd(SourceSink.SOURCE);
Map<String, String> config = connectorConfig(SourceSink.SOURCE);
expectConfigValidation(config);
Connector connectorMock = PowerMock.createMock(SourceConnector.class);
expectConfigValidation(connectorMock, true, config);
worker.stopConnector(CONNECTOR_NAME);
EasyMock.expectLastCall().andReturn(true);
@ -295,7 +299,8 @@ public class StandaloneHerderTest {
expectAdd(SourceSink.SOURCE);
Map<String, String> config = connectorConfig(SourceSink.SOURCE);
expectConfigValidation(config);
Connector connectorMock = PowerMock.createMock(SourceConnector.class);
expectConfigValidation(connectorMock, true, config);
worker.stopConnector(CONNECTOR_NAME);
EasyMock.expectLastCall().andReturn(true);
@ -326,7 +331,8 @@ public class StandaloneHerderTest {
expectAdd(SourceSink.SOURCE);
Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
expectConfigValidation(connectorConfig);
Connector connectorMock = PowerMock.createMock(SourceConnector.class);
expectConfigValidation(connectorMock, true, connectorConfig);
worker.stopAndAwaitTask(taskId);
EasyMock.expectLastCall();
@ -351,7 +357,8 @@ public class StandaloneHerderTest {
expectAdd(SourceSink.SOURCE);
Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
expectConfigValidation(connectorConfig);
Connector connectorMock = PowerMock.createMock(SourceConnector.class);
expectConfigValidation(connectorMock, true, connectorConfig);
worker.stopAndAwaitTask(taskId);
EasyMock.expectLastCall();
@ -381,7 +388,8 @@ public class StandaloneHerderTest {
expectAdd(SourceSink.SOURCE);
Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
expectConfigValidation(connectorConfig);
Connector connectorMock = PowerMock.createMock(SourceConnector.class);
expectConfigValidation(connectorMock, true, connectorConfig);
// herder.stop() should stop any running connectors and tasks even if destroyConnector was not invoked
expectStop();
@ -402,6 +410,7 @@ public class StandaloneHerderTest {
@Test
public void testAccessors() throws Exception {
Map<String, String> connConfig = connectorConfig(SourceSink.SOURCE);
System.out.println(connConfig);
Callback<Collection<String>> listConnectorsCb = PowerMock.createMock(Callback.class);
Callback<ConnectorInfo> connectorInfoCb = PowerMock.createMock(Callback.class);
@ -421,7 +430,8 @@ public class StandaloneHerderTest {
// Create connector
connector = PowerMock.createMock(BogusSourceConnector.class);
expectAdd(SourceSink.SOURCE);
expectConfigValidation(connConfig);
Connector connectorMock = PowerMock.createMock(SourceConnector.class);
expectConfigValidation(connector, true, connConfig);
// Validate accessors with 1 connector
listConnectorsCb.onCompletion(null, singleton(CONNECTOR_NAME));
@ -467,7 +477,7 @@ public class StandaloneHerderTest {
// Create
connector = PowerMock.createMock(BogusSourceConnector.class);
expectAdd(SourceSink.SOURCE);
Connector connectorMock = PowerMock.createMock(Connector.class);
Connector connectorMock = PowerMock.createMock(SourceConnector.class);
expectConfigValidation(connectorMock, true, connConfig);
// Should get first config
@ -526,7 +536,8 @@ public class StandaloneHerderTest {
Map<String, String> config = new HashMap<>();
config.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, BogusSinkConnector.class.getName());
Connector connectorMock = PowerMock.createMock(Connector.class);
config.put(SinkConnectorConfig.TOPICS_CONFIG, TOPICS_LIST_STR);
Connector connectorMock = PowerMock.createMock(SinkConnector.class);
String error = "This is an error in your config!";
List<String> errors = new ArrayList<>(singletonList(error));
String key = "foo.invalid.key";
@ -592,7 +603,7 @@ public class StandaloneHerderTest {
EasyMock.expect(herder.connectorTypeForClass(BogusSourceConnector.class.getName()))
.andReturn(ConnectorType.SOURCE).anyTimes();
EasyMock.expect(herder.connectorTypeForClass(BogusSinkConnector.class.getName()))
.andReturn(ConnectorType.SINK).anyTimes();
.andReturn(ConnectorType.SINK).anyTimes();
worker.isSinkConnector(CONNECTOR_NAME);
PowerMock.expectLastCall().andReturn(sourceSink == SourceSink.SINK);
}
@ -631,12 +642,6 @@ public class StandaloneHerderTest {
return generatedTaskProps;
}
private void expectConfigValidation(Map<String, String> ... configs) {
Connector connectorMock = PowerMock.createMock(Connector.class);
expectConfigValidation(connectorMock, true, configs);
}
private void expectConfigValidation(
Connector connectorMock,
boolean shouldCreateConnector,