mirror of https://github.com/apache/kafka.git
KAFKA-13546: Do not fail connector validation if default topic creation group is explicitly specified (#11615)
Reviewers: Chris Egerton <fearthecellos@gmail.com>
This commit is contained in:
parent
78038bca66
commit
a02c8d336a
|
@ -30,6 +30,8 @@ import java.util.List;
|
|||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.apache.kafka.connect.runtime.SourceConnectorConfig.ExactlyOnceSupportLevel.REQUESTED;
|
||||
import static org.apache.kafka.connect.runtime.SourceConnectorConfig.ExactlyOnceSupportLevel.REQUIRED;
|
||||
|
@ -48,6 +50,8 @@ import static org.apache.kafka.common.utils.Utils.enumOptions;
|
|||
|
||||
public class SourceConnectorConfig extends ConnectorConfig {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(SourceConnectorConfig.class);
|
||||
|
||||
protected static final String TOPIC_CREATION_GROUP = "Topic Creation";
|
||||
|
||||
public static final String TOPIC_CREATION_PREFIX = "topic.creation.";
|
||||
|
@ -223,6 +227,13 @@ public class SourceConnectorConfig extends ConnectorConfig {
|
|||
topicCreationGroups.addAll((List<?>) aliases);
|
||||
}
|
||||
|
||||
//Remove "topic.creation.groups" config if its present and the value is "default"
|
||||
if (topicCreationGroups.contains(DEFAULT_TOPIC_CREATION_GROUP)) {
|
||||
log.warn("'{}' topic creation group always exists and does not need to be listed explicitly",
|
||||
DEFAULT_TOPIC_CREATION_GROUP);
|
||||
topicCreationGroups.removeAll(Collections.singleton(DEFAULT_TOPIC_CREATION_GROUP));
|
||||
}
|
||||
|
||||
ConfigDef newDef = new ConfigDef(baseConfigDef);
|
||||
String defaultGroupPrefix = TOPIC_CREATION_PREFIX + DEFAULT_TOPIC_CREATION_GROUP + ".";
|
||||
short defaultGroupReplicationFactor = defaultGroupConfig.getShort(defaultGroupPrefix + REPLICATION_FACTOR_CONFIG);
|
||||
|
|
|
@ -33,6 +33,7 @@ import static org.apache.kafka.common.config.TopicConfig.RETENTION_MS_CONFIG;
|
|||
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
|
||||
import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG;
|
||||
import static org.apache.kafka.connect.runtime.ConnectorConfigTest.MOCK_PLUGINS;
|
||||
import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_GROUPS_CONFIG;
|
||||
import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_GROUP;
|
||||
import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
|
||||
import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
|
||||
|
@ -47,6 +48,8 @@ import static org.junit.Assert.assertTrue;
|
|||
public class SourceConnectorConfigTest {
|
||||
|
||||
private static final String FOO_CONNECTOR = "foo-source";
|
||||
private static final String TOPIC_CREATION_GROUP_1 = "group1";
|
||||
private static final String TOPIC_CREATION_GROUP_2 = "group2";
|
||||
private static final short DEFAULT_REPLICATION_FACTOR = -1;
|
||||
private static final int DEFAULT_PARTITIONS = -1;
|
||||
|
||||
|
@ -64,6 +67,16 @@ public class SourceConnectorConfigTest {
|
|||
return props;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldNotFailWithExplicitlySpecifiedDefaultTopicCreationGroup() {
|
||||
Map<String, String> props = defaultConnectorProps();
|
||||
props.put(TOPIC_CREATION_GROUPS_CONFIG, String.join(",", DEFAULT_TOPIC_CREATION_GROUP,
|
||||
TOPIC_CREATION_GROUP_1, TOPIC_CREATION_GROUP_2));
|
||||
props.put(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG, "1");
|
||||
props.put(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG, "1");
|
||||
SourceConnectorConfig config = new SourceConnectorConfig(MOCK_PLUGINS, props, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void noTopicCreation() {
|
||||
Map<String, String> props = defaultConnectorProps();
|
||||
|
|
Loading…
Reference in New Issue