KAFKA-5807 - Check Connector.config() and Transformation.config() returns a valid ConfigDef

Little back story on this. Was helping a user over email. This could be much easier to debug if we assume that the connector developer might not return valid configs. For example Intellij will generate a stub that returns a null. This was the case that inspired this JIRA.

Author: Jeremy Custenborder <jcustenborder@gmail.com>

Reviewers: Jason Gustafson <jason@confluent.io>, Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #3762 from jcustenborder/KAFKA-5807
This commit is contained in:
Jeremy Custenborder 2018-05-22 13:53:15 -07:00 committed by Ewen Cheslack-Postava
parent 5be47a2f26
commit 7ecfbab92f
3 changed files with 35 additions and 2 deletions

View File

@ -19,6 +19,7 @@ package org.apache.kafka.connect.connector;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.errors.ConnectException;
import java.util.List;
import java.util.Map;
@ -130,13 +131,18 @@ public abstract class Connector {
*/
public Config validate(Map<String, String> connectorConfigs) {
ConfigDef configDef = config();
if (null == configDef) {
throw new ConnectException(
String.format("%s.config() must return a ConfigDef that is not null.", this.getClass().getName())
);
}
List<ConfigValue> configValues = configDef.validate(connectorConfigs);
return new Config(configValues);
}
/**
* Define the configuration for the connector.
* @return The ConfigDef for this connector.
* @return The ConfigDef for this connector; may not be null.
*/
public abstract ConfigDef config();
}

View File

@ -271,7 +271,23 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
// do custom connector-specific validation
Config config = connector.validate(connectorProps);
if (null == config) {
throw new BadRequestException(
String.format(
"%s.validate() must return a Config that is not null.",
connector.getClass().getName()
)
);
}
ConfigDef configDef = connector.config();
if (null == configDef) {
throw new BadRequestException(
String.format(
"%s.config() must return a ConfigDef that is not null.",
connector.getClass().getName()
)
);
}
configKeys.putAll(configDef.configKeys());
allGroups.addAll(configDef.groups());
configValues.addAll(config.configValues());

View File

@ -227,11 +227,22 @@ public class ConnectorConfig extends AbstractConfig {
if (transformationCls == null || !Transformation.class.isAssignableFrom(transformationCls)) {
throw new ConfigException(key, String.valueOf(transformationCls), "Not a Transformation");
}
Transformation transformation;
try {
return (transformationCls.asSubclass(Transformation.class).newInstance()).config();
transformation = transformationCls.asSubclass(Transformation.class).newInstance();
} catch (Exception e) {
throw new ConfigException(key, String.valueOf(transformationCls), "Error getting config definition from Transformation: " + e.getMessage());
}
ConfigDef configDef = transformation.config();
if (null == configDef) {
throw new ConnectException(
String.format(
"%s.config() must return a ConfigDef that is not null.",
transformationCls.getName()
)
);
}
return configDef;
}
/**