KAFKA-10572 mirror-maker config changes for KIP-629 (#9429)

Author: Xavier Léauté <xavier@confluent.io>
Reviewer: Randall Hauch <rhauch@gmail.com>
This commit is contained in:
Xavier Léauté 2020-10-20 07:15:14 -07:00 committed by GitHub
parent c7f19aba37
commit 60d002f9eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 238 additions and 125 deletions

View File

@ -29,7 +29,7 @@ specific topics or groups:
A->B.groups = group-1, group-2
By default, all topics and consumer groups are replicated (except
blacklisted ones), across all enabled replication flows. Each
excluded ones), across all enabled replication flows. Each
replication flow must be explicitly enabled to begin replication:
A->B.enabled = true

View File

@ -20,58 +20,67 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.utils.ConfigUtils;
import java.util.Map;
import java.util.regex.Pattern;
/** Uses a blacklist of property names or regexes. */
/** Filters excluded property names or regexes. */
public class DefaultConfigPropertyFilter implements ConfigPropertyFilter {
public static final String CONFIG_PROPERTIES_BLACKLIST_CONFIG = "config.properties.blacklist";
private static final String CONFIG_PROPERTIES_BLACKLIST_DOC = "List of topic configuration properties and/or regexes "
public static final String CONFIG_PROPERTIES_EXCLUDE_CONFIG = "config.properties.exclude";
public static final String CONFIG_PROPERTIES_EXCLUDE_ALIAS_CONFIG = "config.properties.blacklist";
private static final String CONFIG_PROPERTIES_EXCLUDE_DOC = "List of topic configuration properties and/or regexes "
+ "that should not be replicated.";
public static final String CONFIG_PROPERTIES_BLACKLIST_DEFAULT = "follower\\.replication\\.throttled\\.replicas, "
public static final String CONFIG_PROPERTIES_EXCLUDE_DEFAULT = "follower\\.replication\\.throttled\\.replicas, "
+ "leader\\.replication\\.throttled\\.replicas, "
+ "message\\.timestamp\\.difference\\.max\\.ms, "
+ "message\\.timestamp\\.type, "
+ "unclean\\.leader\\.election\\.enable, "
+ "min\\.insync\\.replicas";
private Pattern blacklistPattern = MirrorUtils.compilePatternList(CONFIG_PROPERTIES_BLACKLIST_DEFAULT);
private Pattern excludePattern = MirrorUtils.compilePatternList(CONFIG_PROPERTIES_EXCLUDE_DEFAULT);
@Override
public void configure(Map<String, ?> props) {
ConfigPropertyFilterConfig config = new ConfigPropertyFilterConfig(props);
blacklistPattern = config.blacklistPattern();
excludePattern = config.excludePattern();
}
@Override
public void close() {
}
private boolean blacklisted(String prop) {
return blacklistPattern != null && blacklistPattern.matcher(prop).matches();
private boolean excluded(String prop) {
return excludePattern != null && excludePattern.matcher(prop).matches();
}
@Override
public boolean shouldReplicateConfigProperty(String prop) {
return !blacklisted(prop);
return !excluded(prop);
}
static class ConfigPropertyFilterConfig extends AbstractConfig {
static final ConfigDef DEF = new ConfigDef()
.define(CONFIG_PROPERTIES_BLACKLIST_CONFIG,
.define(CONFIG_PROPERTIES_EXCLUDE_CONFIG,
Type.LIST,
CONFIG_PROPERTIES_BLACKLIST_DEFAULT,
CONFIG_PROPERTIES_EXCLUDE_DEFAULT,
Importance.HIGH,
CONFIG_PROPERTIES_BLACKLIST_DOC);
CONFIG_PROPERTIES_EXCLUDE_DOC)
.define(CONFIG_PROPERTIES_EXCLUDE_ALIAS_CONFIG,
Type.LIST,
null,
Importance.HIGH,
"Deprecated. Use " + CONFIG_PROPERTIES_EXCLUDE_CONFIG + " instead.");
ConfigPropertyFilterConfig(Map<?, ?> props) {
super(DEF, props, false);
ConfigPropertyFilterConfig(Map<String, ?> props) {
super(DEF, ConfigUtils.translateDeprecatedConfigs(props, new String[][]{
{CONFIG_PROPERTIES_EXCLUDE_CONFIG, CONFIG_PROPERTIES_EXCLUDE_ALIAS_CONFIG}}), false);
}
Pattern blacklistPattern() {
return MirrorUtils.compilePatternList(getList(CONFIG_PROPERTIES_BLACKLIST_CONFIG));
Pattern excludePattern() {
return MirrorUtils.compilePatternList(getList(CONFIG_PROPERTIES_EXCLUDE_CONFIG));
}
}
}

View File

@ -20,72 +20,81 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.utils.ConfigUtils;
import java.util.Map;
import java.util.regex.Pattern;
/** Uses a whitelist and blacklist. */
/** Uses an include and exclude pattern. */
public class DefaultGroupFilter implements GroupFilter {
public static final String GROUPS_WHITELIST_CONFIG = "groups";
private static final String GROUPS_WHITELIST_DOC = "List of consumer group names and/or regexes to replicate.";
public static final String GROUPS_WHITELIST_DEFAULT = ".*";
public static final String GROUPS_INCLUDE_CONFIG = "groups";
private static final String GROUPS_INCLUDE_DOC = "List of consumer group names and/or regexes to replicate.";
public static final String GROUPS_INCLUDE_DEFAULT = ".*";
public static final String GROUPS_BLACKLIST_CONFIG = "groups.blacklist";
private static final String GROUPS_BLACKLIST_DOC = "List of consumer group names and/or regexes that should not be replicated.";
public static final String GROUPS_BLACKLIST_DEFAULT = "console-consumer-.*, connect-.*, __.*";
public static final String GROUPS_EXCLUDE_CONFIG = "groups.exclude";
public static final String GROUPS_EXCLUDE_CONFIG_ALIAS = "groups.blacklist";
private Pattern whitelistPattern;
private Pattern blacklistPattern;
private static final String GROUPS_EXCLUDE_DOC = "List of consumer group names and/or regexes that should not be replicated.";
public static final String GROUPS_EXCLUDE_DEFAULT = "console-consumer-.*, connect-.*, __.*";
private Pattern includePattern;
private Pattern excludePattern;
@Override
public void configure(Map<String, ?> props) {
GroupFilterConfig config = new GroupFilterConfig(props);
whitelistPattern = config.whitelistPattern();
blacklistPattern = config.blacklistPattern();
includePattern = config.includePattern();
excludePattern = config.excludePattern();
}
@Override
public void close() {
}
private boolean whitelisted(String group) {
return whitelistPattern != null && whitelistPattern.matcher(group).matches();
private boolean included(String group) {
return includePattern != null && includePattern.matcher(group).matches();
}
private boolean blacklisted(String group) {
return blacklistPattern != null && blacklistPattern.matcher(group).matches();
private boolean excluded(String group) {
return excludePattern != null && excludePattern.matcher(group).matches();
}
@Override
public boolean shouldReplicateGroup(String group) {
return whitelisted(group) && !blacklisted(group);
return included(group) && !excluded(group);
}
static class GroupFilterConfig extends AbstractConfig {
static final ConfigDef DEF = new ConfigDef()
.define(GROUPS_WHITELIST_CONFIG,
.define(GROUPS_INCLUDE_CONFIG,
Type.LIST,
GROUPS_WHITELIST_DEFAULT,
GROUPS_INCLUDE_DEFAULT,
Importance.HIGH,
GROUPS_WHITELIST_DOC)
.define(GROUPS_BLACKLIST_CONFIG,
GROUPS_INCLUDE_DOC)
.define(GROUPS_EXCLUDE_CONFIG,
Type.LIST,
GROUPS_BLACKLIST_DEFAULT,
GROUPS_EXCLUDE_DEFAULT,
Importance.HIGH,
GROUPS_BLACKLIST_DOC);
GROUPS_EXCLUDE_DOC)
.define(GROUPS_EXCLUDE_CONFIG_ALIAS,
Type.LIST,
null,
Importance.HIGH,
"Deprecated. Use " + GROUPS_EXCLUDE_CONFIG + " instead.");
GroupFilterConfig(Map<?, ?> props) {
super(DEF, props, false);
GroupFilterConfig(Map<String, ?> props) {
super(DEF, ConfigUtils.translateDeprecatedConfigs(props, new String[][]{
{GROUPS_EXCLUDE_CONFIG, GROUPS_EXCLUDE_CONFIG_ALIAS}}), false);
}
Pattern whitelistPattern() {
return MirrorUtils.compilePatternList(getList(GROUPS_WHITELIST_CONFIG));
Pattern includePattern() {
return MirrorUtils.compilePatternList(getList(GROUPS_INCLUDE_CONFIG));
}
Pattern blacklistPattern() {
return MirrorUtils.compilePatternList(getList(GROUPS_BLACKLIST_CONFIG));
Pattern excludePattern() {
return MirrorUtils.compilePatternList(getList(GROUPS_EXCLUDE_CONFIG));
}
}
}

View File

@ -20,72 +20,80 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.utils.ConfigUtils;
import java.util.Map;
import java.util.regex.Pattern;
/** Uses a whitelist and blacklist. */
/** Uses an include and exclude pattern. */
public class DefaultTopicFilter implements TopicFilter {
public static final String TOPICS_WHITELIST_CONFIG = "topics";
private static final String TOPICS_WHITELIST_DOC = "List of topics and/or regexes to replicate.";
public static final String TOPICS_WHITELIST_DEFAULT = ".*";
public static final String TOPICS_INCLUDE_CONFIG = "topics";
private static final String TOPICS_INCLUDE_DOC = "List of topics and/or regexes to replicate.";
public static final String TOPICS_INCLUDE_DEFAULT = ".*";
public static final String TOPICS_BLACKLIST_CONFIG = "topics.blacklist";
private static final String TOPICS_BLACKLIST_DOC = "List of topics and/or regexes that should not be replicated.";
public static final String TOPICS_BLACKLIST_DEFAULT = ".*[\\-\\.]internal, .*\\.replica, __.*";
public static final String TOPICS_EXCLUDE_CONFIG = "topics.exclude";
public static final String TOPICS_EXCLUDE_CONFIG_ALIAS = "topics.blacklist";
private static final String TOPICS_EXCLUDE_DOC = "List of topics and/or regexes that should not be replicated.";
public static final String TOPICS_EXCLUDE_DEFAULT = ".*[\\-\\.]internal, .*\\.replica, __.*";
private Pattern whitelistPattern;
private Pattern blacklistPattern;
private Pattern includePattern;
private Pattern excludePattern;
@Override
public void configure(Map<String, ?> props) {
TopicFilterConfig config = new TopicFilterConfig(props);
whitelistPattern = config.whitelistPattern();
blacklistPattern = config.blacklistPattern();
includePattern = config.includePattern();
excludePattern = config.excludePattern();
}
@Override
public void close() {
}
private boolean whitelisted(String topic) {
return whitelistPattern != null && whitelistPattern.matcher(topic).matches();
private boolean included(String topic) {
return includePattern != null && includePattern.matcher(topic).matches();
}
private boolean blacklisted(String topic) {
return blacklistPattern != null && blacklistPattern.matcher(topic).matches();
private boolean excluded(String topic) {
return excludePattern != null && excludePattern.matcher(topic).matches();
}
@Override
public boolean shouldReplicateTopic(String topic) {
return whitelisted(topic) && !blacklisted(topic);
return included(topic) && !excluded(topic);
}
static class TopicFilterConfig extends AbstractConfig {
static final ConfigDef DEF = new ConfigDef()
.define(TOPICS_WHITELIST_CONFIG,
.define(TOPICS_INCLUDE_CONFIG,
Type.LIST,
TOPICS_WHITELIST_DEFAULT,
TOPICS_INCLUDE_DEFAULT,
Importance.HIGH,
TOPICS_WHITELIST_DOC)
.define(TOPICS_BLACKLIST_CONFIG,
TOPICS_INCLUDE_DOC)
.define(TOPICS_EXCLUDE_CONFIG,
Type.LIST,
TOPICS_BLACKLIST_DEFAULT,
TOPICS_EXCLUDE_DEFAULT,
Importance.HIGH,
TOPICS_BLACKLIST_DOC);
TOPICS_EXCLUDE_DOC)
.define(TOPICS_EXCLUDE_CONFIG_ALIAS,
Type.LIST,
null,
Importance.HIGH,
"Deprecated. Use " + TOPICS_EXCLUDE_CONFIG + " instead.");
TopicFilterConfig(Map<?, ?> props) {
super(DEF, props, false);
TopicFilterConfig(Map<String, ?> props) {
super(DEF, ConfigUtils.translateDeprecatedConfigs(props, new String[][]{
{TOPICS_EXCLUDE_CONFIG, TOPICS_EXCLUDE_CONFIG_ALIAS}}), false);
}
Pattern whitelistPattern() {
return MirrorUtils.compilePatternList(getList(TOPICS_WHITELIST_CONFIG));
Pattern includePattern() {
return MirrorUtils.compilePatternList(getList(TOPICS_INCLUDE_CONFIG));
}
Pattern blacklistPattern() {
return MirrorUtils.compilePatternList(getList(TOPICS_BLACKLIST_CONFIG));
Pattern excludePattern() {
return MirrorUtils.compilePatternList(getList(TOPICS_EXCLUDE_CONFIG));
}
}
}

View File

@ -24,6 +24,7 @@ import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.utils.ConfigUtils;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
@ -90,23 +91,27 @@ public class MirrorConnectorConfig extends AbstractConfig {
public static final String REPLICATION_FACTOR = "replication.factor";
private static final String REPLICATION_FACTOR_DOC = "Replication factor for newly created remote topics.";
public static final int REPLICATION_FACTOR_DEFAULT = 2;
public static final String TOPICS = DefaultTopicFilter.TOPICS_WHITELIST_CONFIG;
public static final String TOPICS_DEFAULT = DefaultTopicFilter.TOPICS_WHITELIST_DEFAULT;
public static final String TOPICS = DefaultTopicFilter.TOPICS_INCLUDE_CONFIG;
public static final String TOPICS_DEFAULT = DefaultTopicFilter.TOPICS_INCLUDE_DEFAULT;
private static final String TOPICS_DOC = "Topics to replicate. Supports comma-separated topic names and regexes.";
public static final String TOPICS_BLACKLIST = DefaultTopicFilter.TOPICS_BLACKLIST_CONFIG;
public static final String TOPICS_BLACKLIST_DEFAULT = DefaultTopicFilter.TOPICS_BLACKLIST_DEFAULT;
private static final String TOPICS_BLACKLIST_DOC = "Blacklisted topics. Supports comma-separated topic names and regexes."
+ " Blacklists take precedence over whitelists.";
public static final String GROUPS = DefaultGroupFilter.GROUPS_WHITELIST_CONFIG;
public static final String GROUPS_DEFAULT = DefaultGroupFilter.GROUPS_WHITELIST_DEFAULT;
public static final String TOPICS_EXCLUDE = DefaultTopicFilter.TOPICS_EXCLUDE_CONFIG;
public static final String TOPICS_EXCLUDE_ALIAS = DefaultTopicFilter.TOPICS_EXCLUDE_CONFIG_ALIAS;
public static final String TOPICS_EXCLUDE_DEFAULT = DefaultTopicFilter.TOPICS_EXCLUDE_DEFAULT;
private static final String TOPICS_EXCLUDE_DOC = "Excluded topics. Supports comma-separated topic names and regexes."
+ " Excludes take precedence over includes.";
public static final String GROUPS = DefaultGroupFilter.GROUPS_INCLUDE_CONFIG;
public static final String GROUPS_DEFAULT = DefaultGroupFilter.GROUPS_INCLUDE_DEFAULT;
private static final String GROUPS_DOC = "Consumer groups to replicate. Supports comma-separated group IDs and regexes.";
public static final String GROUPS_BLACKLIST = DefaultGroupFilter.GROUPS_BLACKLIST_CONFIG;
public static final String GROUPS_BLACKLIST_DEFAULT = DefaultGroupFilter.GROUPS_BLACKLIST_DEFAULT;
private static final String GROUPS_BLACKLIST_DOC = "Blacklisted groups. Supports comma-separated group IDs and regexes."
+ " Blacklists take precedence over whitelists.";
public static final String CONFIG_PROPERTIES_BLACKLIST = DefaultConfigPropertyFilter.CONFIG_PROPERTIES_BLACKLIST_CONFIG;
public static final String CONFIG_PROPERTIES_BLACKLIST_DEFAULT = DefaultConfigPropertyFilter.CONFIG_PROPERTIES_BLACKLIST_DEFAULT;
private static final String CONFIG_PROPERTIES_BLACKLIST_DOC = "Topic config properties that should not be replicated. Supports "
public static final String GROUPS_EXCLUDE = DefaultGroupFilter.GROUPS_EXCLUDE_CONFIG;
public static final String GROUPS_EXCLUDE_ALIAS = DefaultGroupFilter.GROUPS_EXCLUDE_CONFIG_ALIAS;
public static final String GROUPS_EXCLUDE_DEFAULT = DefaultGroupFilter.GROUPS_EXCLUDE_DEFAULT;
private static final String GROUPS_EXCLUDE_DOC = "Exclude groups. Supports comma-separated group IDs and regexes."
+ " Excludes take precedence over includes.";
public static final String CONFIG_PROPERTIES_EXCLUDE = DefaultConfigPropertyFilter.CONFIG_PROPERTIES_EXCLUDE_CONFIG;
public static final String CONFIG_PROPERTIES_EXCLUDE_ALIAS = DefaultConfigPropertyFilter.CONFIG_PROPERTIES_EXCLUDE_ALIAS_CONFIG;
public static final String CONFIG_PROPERTIES_EXCLUDE_DEFAULT = DefaultConfigPropertyFilter.CONFIG_PROPERTIES_EXCLUDE_DEFAULT;
private static final String CONFIG_PROPERTIES_EXCLUDE_DOC = "Topic config properties that should not be replicated. Supports "
+ "comma-separated property names and regexes.";
public static final String HEARTBEATS_TOPIC_REPLICATION_FACTOR = "heartbeats.topic.replication.factor";
@ -206,7 +211,10 @@ public class MirrorConnectorConfig extends AbstractConfig {
protected static final String ADMIN_CLIENT_PREFIX = "admin.";
public MirrorConnectorConfig(Map<String, String> props) {
this(CONNECTOR_CONFIG_DEF, props);
this(CONNECTOR_CONFIG_DEF, ConfigUtils.translateDeprecatedConfigs(props, new String[][]{
{TOPICS_EXCLUDE, TOPICS_EXCLUDE_ALIAS},
{GROUPS_EXCLUDE, GROUPS_EXCLUDE_ALIAS},
{CONFIG_PROPERTIES_EXCLUDE, CONFIG_PROPERTIES_EXCLUDE_ALIAS}}));
}
protected MirrorConnectorConfig(ConfigDef configDef, Map<String, String> props) {
@ -438,11 +446,17 @@ public class MirrorConnectorConfig extends AbstractConfig {
ConfigDef.Importance.HIGH,
TOPICS_DOC)
.define(
TOPICS_BLACKLIST,
TOPICS_EXCLUDE,
ConfigDef.Type.LIST,
TOPICS_BLACKLIST_DEFAULT,
TOPICS_EXCLUDE_DEFAULT,
ConfigDef.Importance.HIGH,
TOPICS_BLACKLIST_DOC)
TOPICS_EXCLUDE_DOC)
.define(
TOPICS_EXCLUDE_ALIAS,
ConfigDef.Type.LIST,
null,
ConfigDef.Importance.HIGH,
"Deprecated. Use " + TOPICS_EXCLUDE + " instead.")
.define(
GROUPS,
ConfigDef.Type.LIST,
@ -450,17 +464,29 @@ public class MirrorConnectorConfig extends AbstractConfig {
ConfigDef.Importance.HIGH,
GROUPS_DOC)
.define(
GROUPS_BLACKLIST,
GROUPS_EXCLUDE,
ConfigDef.Type.LIST,
GROUPS_BLACKLIST_DEFAULT,
GROUPS_EXCLUDE_DEFAULT,
ConfigDef.Importance.HIGH,
GROUPS_BLACKLIST_DOC)
GROUPS_EXCLUDE_DOC)
.define(
CONFIG_PROPERTIES_BLACKLIST,
GROUPS_EXCLUDE_ALIAS,
ConfigDef.Type.LIST,
CONFIG_PROPERTIES_BLACKLIST_DEFAULT,
null,
ConfigDef.Importance.HIGH,
CONFIG_PROPERTIES_BLACKLIST_DOC)
"Deprecated. Use " + GROUPS_EXCLUDE + " instead.")
.define(
CONFIG_PROPERTIES_EXCLUDE,
ConfigDef.Type.LIST,
CONFIG_PROPERTIES_EXCLUDE_DEFAULT,
ConfigDef.Importance.HIGH,
CONFIG_PROPERTIES_EXCLUDE_DOC)
.define(
CONFIG_PROPERTIES_EXCLUDE_ALIAS,
ConfigDef.Type.LIST,
null,
ConfigDef.Importance.HIGH,
"Deprecated. Use " + CONFIG_PROPERTIES_EXCLUDE + " instead.")
.define(
TOPIC_FILTER_CLASS,
ConfigDef.Type.CLASS,

View File

@ -70,11 +70,25 @@ public class MirrorConnectorConfigTest {
@Test
public void testConfigPropertyMatching() {
MirrorConnectorConfig config = new MirrorConnectorConfig(
makeProps("config.properties.blacklist", "prop2"));
makeProps("config.properties.exclude", "prop2"));
assertTrue(config.configPropertyFilter().shouldReplicateConfigProperty("prop1"));
assertFalse(config.configPropertyFilter().shouldReplicateConfigProperty("prop2"));
}
@Test
public void testConfigBackwardsCompatibility() {
MirrorConnectorConfig config = new MirrorConnectorConfig(
makeProps("config.properties.blacklist", "prop1",
"groups.blacklist", "group-1",
"topics.blacklist", "topic-1"));
assertFalse(config.configPropertyFilter().shouldReplicateConfigProperty("prop1"));
assertTrue(config.configPropertyFilter().shouldReplicateConfigProperty("prop2"));
assertFalse(config.topicFilter().shouldReplicateTopic("topic-1"));
assertTrue(config.topicFilter().shouldReplicateTopic("topic-2"));
assertFalse(config.groupFilter().shouldReplicateGroup("group-1"));
assertTrue(config.groupFilter().shouldReplicateGroup("group-2"));
}
@Test
public void testNoTopics() {
MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps("topics", ""));

View File

@ -127,7 +127,7 @@ public class MirrorMakerConfigTest {
"topics", "topic-1",
"groups", "group-2",
"replication.policy.separator", "__",
"config.properties.blacklist", "property-3",
"config.properties.exclude", "property-3",
"metric.reporters", "FakeMetricsReporter",
"topic.filter.class", DefaultTopicFilter.class.getName(),
"xxx", "yyy"));
@ -137,12 +137,12 @@ public class MirrorMakerConfigTest {
MirrorConnectorConfig connectorConfig = new MirrorConnectorConfig(connectorProps);
assertEquals("Connector properties like tasks.max should be passed through to underlying Connectors.",
100, (int) connectorConfig.getInt("tasks.max"));
assertEquals("Topics whitelist should be passed through to underlying Connectors.",
assertEquals("Topics include should be passed through to underlying Connectors.",
Arrays.asList("topic-1"), connectorConfig.getList("topics"));
assertEquals("Groups whitelist should be passed through to underlying Connectors.",
assertEquals("Groups include should be passed through to underlying Connectors.",
Arrays.asList("group-2"), connectorConfig.getList("groups"));
assertEquals("Config properties blacklist should be passed through to underlying Connectors.",
Arrays.asList("property-3"), connectorConfig.getList("config.properties.blacklist"));
assertEquals("Config properties exclude should be passed through to underlying Connectors.",
Arrays.asList("property-3"), connectorConfig.getList("config.properties.exclude"));
assertEquals("Metrics reporters should be passed through to underlying Connectors.",
Arrays.asList("FakeMetricsReporter"), connectorConfig.getList("metric.reporters"));
assertEquals("Filters should be passed through to underlying Connectors.",
@ -153,12 +153,59 @@ public class MirrorMakerConfigTest {
connectorConfig.originals().containsKey("xxx"));
}
@Test
public void testConfigBackwardsCompatibility() {
MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
"clusters", "a, b",
"groups.blacklist", "group-7",
"topics.blacklist", "topic3",
"config.properties.blacklist", "property-3",
"topic.filter.class", DefaultTopicFilter.class.getName()));
SourceAndTarget sourceAndTarget = new SourceAndTarget("source", "target");
Map<String, String> connectorProps = mirrorConfig.connectorBaseConfig(sourceAndTarget,
MirrorSourceConnector.class);
MirrorConnectorConfig connectorConfig = new MirrorConnectorConfig(connectorProps);
DefaultTopicFilter.TopicFilterConfig filterConfig =
new DefaultTopicFilter.TopicFilterConfig(connectorProps);
assertEquals("Topics exclude should be backwards compatible.",
Arrays.asList("topic3"), filterConfig.getList("topics.exclude"));
assertEquals("Groups exclude should be backwards compatible.",
Arrays.asList("group-7"), connectorConfig.getList("groups.exclude"));
assertEquals("Config properties exclude should be backwards compatible.",
Arrays.asList("property-3"), connectorConfig.getList("config.properties.exclude"));
}
@Test
public void testConfigBackwardsCompatibilitySourceTarget() {
MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
"clusters", "a, b",
"source->target.topics.blacklist", "topic3",
"source->target.groups.blacklist", "group-7",
"topic.filter.class", DefaultTopicFilter.class.getName()));
SourceAndTarget sourceAndTarget = new SourceAndTarget("source", "target");
Map<String, String> connectorProps = mirrorConfig.connectorBaseConfig(sourceAndTarget,
MirrorSourceConnector.class);
MirrorConnectorConfig connectorConfig = new MirrorConnectorConfig(connectorProps);
DefaultTopicFilter.TopicFilterConfig filterConfig =
new DefaultTopicFilter.TopicFilterConfig(connectorProps);
assertEquals("Topics exclude should be backwards compatible.",
Arrays.asList("topic3"), filterConfig.getList("topics.exclude"));
assertEquals("Groups exclude should be backwards compatible.",
Arrays.asList("group-7"), connectorConfig.getList("groups.exclude"));
}
@Test
public void testIncludesTopicFilterProperties() {
MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
"clusters", "a, b",
"source->target.topics", "topic1, topic2",
"source->target.topics.blacklist", "topic3"));
"source->target.topics.exclude", "topic3"));
SourceAndTarget sourceAndTarget = new SourceAndTarget("source", "target");
Map<String, String> connectorProps = mirrorConfig.connectorBaseConfig(sourceAndTarget,
MirrorSourceConnector.class);
@ -166,8 +213,8 @@ public class MirrorMakerConfigTest {
new DefaultTopicFilter.TopicFilterConfig(connectorProps);
assertEquals("source->target.topics should be passed through to TopicFilters.",
Arrays.asList("topic1", "topic2"), filterConfig.getList("topics"));
assertEquals("source->target.topics.blacklist should be passed through to TopicFilters.",
Arrays.asList("topic3"), filterConfig.getList("topics.blacklist"));
assertEquals("source->target.topics.exclude should be passed through to TopicFilters.",
Arrays.asList("topic3"), filterConfig.getList("topics.exclude"));
}
@Test

View File

@ -129,7 +129,7 @@ public class MirrorSourceConnectorTest {
Config targetConfig = connector.targetConfig(config);
assertTrue("should replicate properties", targetConfig.entries().stream()
.anyMatch(x -> x.name().equals("name-1")));
assertFalse("should not replicate blacklisted properties", targetConfig.entries().stream()
assertFalse("should not replicate excluded properties", targetConfig.entries().stream()
.anyMatch(x -> x.name().equals("min.insync.replicas")));
}