KAFKA-3684: SinkConnectorConfig does not return topics in config validation.

Author: Liquan Pei <liquanpei@gmail.com>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1356 from Ishiihara/bug-fix-validate
This commit is contained in:
Liquan Pei 2016-05-09 17:37:17 -07:00 committed by Ewen Cheslack-Postava
parent 18226ff0be
commit 9575e93070
4 changed files with 14 additions and 12 deletions

View File

@ -241,7 +241,7 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
connectorConfigDef = SinkConnectorConfig.configDef(); connectorConfigDef = SinkConnectorConfig.configDef();
} }
List<ConfigValue> connectorConfigValues = connectorConfigDef.validate(connectorConfig); List<ConfigValue> connectorConfigValues = connectorConfigDef.validate(connectorConfig);
Config config = connector.validate(connectorConfig); Config config = connector.validate(connectorConfig);
ConfigDef configDef = connector.config(); ConfigDef configDef = connector.config();
Map<String, ConfigKey> configKeys = configDef.configKeys(); Map<String, ConfigKey> configKeys = configDef.configKeys();

View File

@ -60,17 +60,11 @@ public class ConnectorConfig extends AbstractConfig {
private static final String TASK_MAX_DISPLAY = "Tasks max"; private static final String TASK_MAX_DISPLAY = "Tasks max";
protected static ConfigDef config;
static {
config = new ConfigDef()
.define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC, COMMON_GROUP, 1, Width.MEDIUM, NAME_DISPLAY)
.define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, 2, Width.LONG, CONNECTOR_CLASS_DISPLAY)
.define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY);
}
public static ConfigDef configDef() { public static ConfigDef configDef() {
return config; return new ConfigDef()
.define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC, COMMON_GROUP, 1, Width.MEDIUM, NAME_DISPLAY)
.define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, 2, Width.LONG, CONNECTOR_CLASS_DISPLAY)
.define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY);
} }
public ConnectorConfig() { public ConnectorConfig() {
@ -78,7 +72,7 @@ public class ConnectorConfig extends AbstractConfig {
} }
public ConnectorConfig(Map<String, String> props) { public ConnectorConfig(Map<String, String> props) {
super(config, props); super(configDef(), props);
} }
public ConnectorConfig(ConfigDef subClassConfig, Map<String, String> props) { public ConnectorConfig(ConfigDef subClassConfig, Map<String, String> props) {

View File

@ -40,6 +40,10 @@ public class SinkConnectorConfig extends ConnectorConfig {
this(new HashMap<String, String>()); this(new HashMap<String, String>());
} }
public static ConfigDef configDef() {
return config;
}
public SinkConnectorConfig(Map<String, String> props) { public SinkConnectorConfig(Map<String, String> props) {
super(config, props); super(config, props);
} }

View File

@ -17,10 +17,14 @@
package org.apache.kafka.connect.runtime; package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.config.ConfigDef;
import java.util.Map; import java.util.Map;
public class SourceConnectorConfig extends ConnectorConfig { public class SourceConnectorConfig extends ConnectorConfig {
private static ConfigDef config = configDef();
public SourceConnectorConfig(Map<String, String> props) { public SourceConnectorConfig(Map<String, String> props) {
super(config, props); super(config, props);
} }