mirror of https://github.com/apache/kafka.git
KAFKA-17529 Remove blacklist from MM2 (#17202)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
e89fc4f5f5
commit
9e809be4ce
|
|
@ -20,7 +20,6 @@ import org.apache.kafka.common.config.AbstractConfig;
|
|||
import org.apache.kafka.common.config.ConfigDef;
|
||||
import org.apache.kafka.common.config.ConfigDef.Importance;
|
||||
import org.apache.kafka.common.config.ConfigDef.Type;
|
||||
import org.apache.kafka.common.utils.ConfigUtils;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.regex.Pattern;
|
||||
|
|
@ -29,7 +28,6 @@ import java.util.regex.Pattern;
|
|||
public class DefaultConfigPropertyFilter implements ConfigPropertyFilter {
|
||||
|
||||
public static final String CONFIG_PROPERTIES_EXCLUDE_CONFIG = "config.properties.exclude";
|
||||
public static final String CONFIG_PROPERTIES_EXCLUDE_ALIAS_CONFIG = "config.properties.blacklist";
|
||||
public static final String USE_DEFAULTS_FROM = "use.defaults.from";
|
||||
private static final String USE_DEFAULTS_FROM_DOC = "Which cluster's defaults (source or target) to use "
|
||||
+ "when syncing topic configurations that have default values.";
|
||||
|
|
@ -75,11 +73,6 @@ public class DefaultConfigPropertyFilter implements ConfigPropertyFilter {
|
|||
CONFIG_PROPERTIES_EXCLUDE_DEFAULT,
|
||||
Importance.HIGH,
|
||||
CONFIG_PROPERTIES_EXCLUDE_DOC)
|
||||
.define(CONFIG_PROPERTIES_EXCLUDE_ALIAS_CONFIG,
|
||||
Type.LIST,
|
||||
null,
|
||||
Importance.HIGH,
|
||||
"Deprecated. Use " + CONFIG_PROPERTIES_EXCLUDE_CONFIG + " instead.")
|
||||
.define(USE_DEFAULTS_FROM,
|
||||
Type.STRING,
|
||||
USE_DEFAULTS_FROM_DEFAULT,
|
||||
|
|
@ -88,8 +81,7 @@ public class DefaultConfigPropertyFilter implements ConfigPropertyFilter {
|
|||
|
||||
|
||||
ConfigPropertyFilterConfig(Map<String, ?> props) {
|
||||
super(DEF, ConfigUtils.translateDeprecatedConfigs(props, new String[][]{
|
||||
{CONFIG_PROPERTIES_EXCLUDE_CONFIG, CONFIG_PROPERTIES_EXCLUDE_ALIAS_CONFIG}}), false);
|
||||
super(DEF, props, false);
|
||||
}
|
||||
|
||||
Pattern excludePattern() {
|
||||
|
|
|
|||
|
|
@ -20,7 +20,6 @@ import org.apache.kafka.common.config.AbstractConfig;
|
|||
import org.apache.kafka.common.config.ConfigDef;
|
||||
import org.apache.kafka.common.config.ConfigDef.Importance;
|
||||
import org.apache.kafka.common.config.ConfigDef.Type;
|
||||
import org.apache.kafka.common.utils.ConfigUtils;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.regex.Pattern;
|
||||
|
|
@ -33,7 +32,6 @@ public class DefaultGroupFilter implements GroupFilter {
|
|||
public static final String GROUPS_INCLUDE_DEFAULT = ".*";
|
||||
|
||||
public static final String GROUPS_EXCLUDE_CONFIG = "groups.exclude";
|
||||
public static final String GROUPS_EXCLUDE_CONFIG_ALIAS = "groups.blacklist";
|
||||
|
||||
private static final String GROUPS_EXCLUDE_DOC = "List of consumer group names and/or regexes that should not be replicated.";
|
||||
public static final String GROUPS_EXCLUDE_DEFAULT = "console-consumer-.*, connect-.*, __.*";
|
||||
|
|
@ -73,16 +71,10 @@ public class DefaultGroupFilter implements GroupFilter {
|
|||
Type.LIST,
|
||||
GROUPS_EXCLUDE_DEFAULT,
|
||||
Importance.HIGH,
|
||||
GROUPS_EXCLUDE_DOC)
|
||||
.define(GROUPS_EXCLUDE_CONFIG_ALIAS,
|
||||
Type.LIST,
|
||||
null,
|
||||
Importance.HIGH,
|
||||
"Deprecated. Use " + GROUPS_EXCLUDE_CONFIG + " instead.");
|
||||
GROUPS_EXCLUDE_DOC);
|
||||
|
||||
GroupFilterConfig(Map<String, ?> props) {
|
||||
super(DEF, ConfigUtils.translateDeprecatedConfigs(props, new String[][]{
|
||||
{GROUPS_EXCLUDE_CONFIG, GROUPS_EXCLUDE_CONFIG_ALIAS}}), false);
|
||||
super(DEF, props, false);
|
||||
}
|
||||
|
||||
Pattern includePattern() {
|
||||
|
|
|
|||
|
|
@ -20,7 +20,6 @@ import org.apache.kafka.common.config.AbstractConfig;
|
|||
import org.apache.kafka.common.config.ConfigDef;
|
||||
import org.apache.kafka.common.config.ConfigDef.Importance;
|
||||
import org.apache.kafka.common.config.ConfigDef.Type;
|
||||
import org.apache.kafka.common.utils.ConfigUtils;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.regex.Pattern;
|
||||
|
|
@ -33,7 +32,6 @@ public class DefaultTopicFilter implements TopicFilter {
|
|||
public static final String TOPICS_INCLUDE_DEFAULT = ".*";
|
||||
|
||||
public static final String TOPICS_EXCLUDE_CONFIG = "topics.exclude";
|
||||
public static final String TOPICS_EXCLUDE_CONFIG_ALIAS = "topics.blacklist";
|
||||
private static final String TOPICS_EXCLUDE_DOC = "List of topics and/or regexes that should not be replicated.";
|
||||
public static final String TOPICS_EXCLUDE_DEFAULT = ".*[\\-\\.]internal, .*\\.replica, __.*";
|
||||
|
||||
|
|
@ -72,16 +70,10 @@ public class DefaultTopicFilter implements TopicFilter {
|
|||
Type.LIST,
|
||||
TOPICS_EXCLUDE_DEFAULT,
|
||||
Importance.HIGH,
|
||||
TOPICS_EXCLUDE_DOC)
|
||||
.define(TOPICS_EXCLUDE_CONFIG_ALIAS,
|
||||
Type.LIST,
|
||||
null,
|
||||
Importance.HIGH,
|
||||
"Deprecated. Use " + TOPICS_EXCLUDE_CONFIG + " instead.");
|
||||
TOPICS_EXCLUDE_DOC);
|
||||
|
||||
TopicFilterConfig(Map<String, ?> props) {
|
||||
super(DEF, ConfigUtils.translateDeprecatedConfigs(props, new String[][]{
|
||||
{TOPICS_EXCLUDE_CONFIG, TOPICS_EXCLUDE_CONFIG_ALIAS}}), false);
|
||||
super(DEF, props, false);
|
||||
}
|
||||
|
||||
Pattern includePattern() {
|
||||
|
|
|
|||
|
|
@ -17,7 +17,6 @@
|
|||
package org.apache.kafka.connect.mirror;
|
||||
|
||||
import org.apache.kafka.common.config.ConfigDef;
|
||||
import org.apache.kafka.common.utils.ConfigUtils;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.HashMap;
|
||||
|
|
@ -33,7 +32,6 @@ public class MirrorCheckpointConfig extends MirrorConnectorConfig {
|
|||
public static final String GROUPS_DEFAULT = DefaultGroupFilter.GROUPS_INCLUDE_DEFAULT;
|
||||
private static final String GROUPS_DOC = "Consumer groups to replicate. Supports comma-separated group IDs and regexes.";
|
||||
public static final String GROUPS_EXCLUDE = DefaultGroupFilter.GROUPS_EXCLUDE_CONFIG;
|
||||
public static final String GROUPS_EXCLUDE_ALIAS = DefaultGroupFilter.GROUPS_EXCLUDE_CONFIG_ALIAS;
|
||||
|
||||
public static final String GROUPS_EXCLUDE_DEFAULT = DefaultGroupFilter.GROUPS_EXCLUDE_DEFAULT;
|
||||
private static final String GROUPS_EXCLUDE_DOC = "Exclude groups. Supports comma-separated group IDs and regexes."
|
||||
|
|
@ -80,9 +78,7 @@ public class MirrorCheckpointConfig extends MirrorConnectorConfig {
|
|||
public static final String CHECKPOINTS_TARGET_CONSUMER_ROLE = "checkpoints-target-consumer";
|
||||
|
||||
public MirrorCheckpointConfig(Map<String, String> props) {
|
||||
super(CONNECTOR_CONFIG_DEF, ConfigUtils.translateDeprecatedConfigs(props, new String[][]{
|
||||
{GROUPS_EXCLUDE, GROUPS_EXCLUDE_ALIAS},
|
||||
}));
|
||||
super(CONNECTOR_CONFIG_DEF, props);
|
||||
}
|
||||
|
||||
public MirrorCheckpointConfig(ConfigDef configDef, Map<String, String> props) {
|
||||
|
|
@ -206,12 +202,6 @@ public class MirrorCheckpointConfig extends MirrorConnectorConfig {
|
|||
GROUPS_EXCLUDE_DEFAULT,
|
||||
ConfigDef.Importance.HIGH,
|
||||
GROUPS_EXCLUDE_DOC)
|
||||
.define(
|
||||
GROUPS_EXCLUDE_ALIAS,
|
||||
ConfigDef.Type.LIST,
|
||||
null,
|
||||
ConfigDef.Importance.HIGH,
|
||||
"Deprecated. Use " + GROUPS_EXCLUDE + " instead.")
|
||||
.define(
|
||||
GROUP_FILTER_CLASS,
|
||||
ConfigDef.Type.CLASS,
|
||||
|
|
|
|||
|
|
@ -18,7 +18,6 @@ package org.apache.kafka.connect.mirror;
|
|||
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.config.ConfigDef;
|
||||
import org.apache.kafka.common.utils.ConfigUtils;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
|
|
@ -40,13 +39,11 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
|
|||
public static final String TOPICS_DEFAULT = DefaultTopicFilter.TOPICS_INCLUDE_DEFAULT;
|
||||
private static final String TOPICS_DOC = "Topics to replicate. Supports comma-separated topic names and regexes.";
|
||||
public static final String TOPICS_EXCLUDE = DefaultTopicFilter.TOPICS_EXCLUDE_CONFIG;
|
||||
public static final String TOPICS_EXCLUDE_ALIAS = DefaultTopicFilter.TOPICS_EXCLUDE_CONFIG_ALIAS;
|
||||
public static final String TOPICS_EXCLUDE_DEFAULT = DefaultTopicFilter.TOPICS_EXCLUDE_DEFAULT;
|
||||
private static final String TOPICS_EXCLUDE_DOC = "Excluded topics. Supports comma-separated topic names and regexes."
|
||||
+ " Excludes take precedence over includes.";
|
||||
|
||||
public static final String CONFIG_PROPERTIES_EXCLUDE = DefaultConfigPropertyFilter.CONFIG_PROPERTIES_EXCLUDE_CONFIG;
|
||||
public static final String CONFIG_PROPERTIES_EXCLUDE_ALIAS = DefaultConfigPropertyFilter.CONFIG_PROPERTIES_EXCLUDE_ALIAS_CONFIG;
|
||||
public static final String CONFIG_PROPERTIES_EXCLUDE_DEFAULT = DefaultConfigPropertyFilter.CONFIG_PROPERTIES_EXCLUDE_DEFAULT;
|
||||
private static final String CONFIG_PROPERTIES_EXCLUDE_DOC = "Topic config properties that should not be replicated. Supports "
|
||||
+ "comma-separated property names and regexes.";
|
||||
|
|
@ -97,9 +94,7 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
|
|||
public static final String OFFSET_SYNCS_TARGET_ADMIN_ROLE = OFFSET_SYNCS_CLIENT_ROLE_PREFIX + "target-admin";
|
||||
|
||||
public MirrorSourceConfig(Map<String, String> props) {
|
||||
super(CONNECTOR_CONFIG_DEF, ConfigUtils.translateDeprecatedConfigs(props, new String[][]{
|
||||
{TOPICS_EXCLUDE, TOPICS_EXCLUDE_ALIAS},
|
||||
{CONFIG_PROPERTIES_EXCLUDE, CONFIG_PROPERTIES_EXCLUDE_ALIAS}}));
|
||||
super(CONNECTOR_CONFIG_DEF, props);
|
||||
}
|
||||
|
||||
public MirrorSourceConfig(ConfigDef configDef, Map<String, String> props) {
|
||||
|
|
@ -212,24 +207,12 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
|
|||
TOPICS_EXCLUDE_DEFAULT,
|
||||
ConfigDef.Importance.HIGH,
|
||||
TOPICS_EXCLUDE_DOC)
|
||||
.define(
|
||||
TOPICS_EXCLUDE_ALIAS,
|
||||
ConfigDef.Type.LIST,
|
||||
null,
|
||||
ConfigDef.Importance.HIGH,
|
||||
"Deprecated. Use " + TOPICS_EXCLUDE + " instead.")
|
||||
.define(
|
||||
CONFIG_PROPERTIES_EXCLUDE,
|
||||
ConfigDef.Type.LIST,
|
||||
CONFIG_PROPERTIES_EXCLUDE_DEFAULT,
|
||||
ConfigDef.Importance.HIGH,
|
||||
CONFIG_PROPERTIES_EXCLUDE_DOC)
|
||||
.define(
|
||||
CONFIG_PROPERTIES_EXCLUDE_ALIAS,
|
||||
ConfigDef.Type.LIST,
|
||||
null,
|
||||
ConfigDef.Importance.HIGH,
|
||||
"Deprecated. Use " + CONFIG_PROPERTIES_EXCLUDE + " instead.")
|
||||
.define(
|
||||
TOPIC_FILTER_CLASS,
|
||||
ConfigDef.Type.CLASS,
|
||||
|
|
|
|||
|
|
@ -171,54 +171,6 @@ public class MirrorMakerConfigTest {
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConfigBackwardsCompatibility() {
|
||||
MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
|
||||
"clusters", "a, b",
|
||||
"groups.blacklist", "group-7",
|
||||
"topics.blacklist", "topic3",
|
||||
"config.properties.blacklist", "property-3",
|
||||
"topic.filter.class", DefaultTopicFilter.class.getName()));
|
||||
SourceAndTarget sourceAndTarget = new SourceAndTarget("source", "target");
|
||||
Map<String, String> connectorProps = mirrorConfig.connectorBaseConfig(sourceAndTarget,
|
||||
MirrorSourceConnector.class);
|
||||
MirrorSourceConfig sourceConfig = new MirrorSourceConfig(connectorProps);
|
||||
DefaultTopicFilter.TopicFilterConfig filterConfig =
|
||||
new DefaultTopicFilter.TopicFilterConfig(connectorProps);
|
||||
|
||||
assertEquals(Collections.singletonList("topic3"), filterConfig.getList("topics.exclude"),
|
||||
"Topics exclude should be backwards compatible.");
|
||||
|
||||
assertEquals(Collections.singletonList("property-3"), sourceConfig.getList("config.properties.exclude"),
|
||||
"Config properties exclude should be backwards compatible.");
|
||||
|
||||
MirrorCheckpointConfig checkpointConfig = new MirrorCheckpointConfig(connectorProps);
|
||||
assertEquals(Collections.singletonList("group-7"), checkpointConfig.getList("groups.exclude"),
|
||||
"Groups exclude should be backwards compatible.");
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConfigBackwardsCompatibilitySourceTarget() {
|
||||
MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
|
||||
"clusters", "a, b",
|
||||
"source->target.topics.blacklist", "topic3",
|
||||
"source->target.groups.blacklist", "group-7",
|
||||
"topic.filter.class", DefaultTopicFilter.class.getName()));
|
||||
SourceAndTarget sourceAndTarget = new SourceAndTarget("source", "target");
|
||||
Map<String, String> connectorProps = mirrorConfig.connectorBaseConfig(sourceAndTarget,
|
||||
MirrorSourceConnector.class);
|
||||
MirrorCheckpointConfig connectorConfig = new MirrorCheckpointConfig(connectorProps);
|
||||
DefaultTopicFilter.TopicFilterConfig filterConfig =
|
||||
new DefaultTopicFilter.TopicFilterConfig(connectorProps);
|
||||
|
||||
assertEquals(Collections.singletonList("topic3"), filterConfig.getList("topics.exclude"),
|
||||
"Topics exclude should be backwards compatible.");
|
||||
|
||||
assertEquals(Collections.singletonList("group-7"), connectorConfig.getList("groups.exclude"),
|
||||
"Groups exclude should be backwards compatible.");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncludesTopicFilterProperties() {
|
||||
MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
|
||||
|
|
|
|||
|
|
@ -66,17 +66,6 @@ public class MirrorSourceConfigTest {
|
|||
"config.properties.exclude incorrectly included prop2");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConfigBackwardsCompatibility() {
|
||||
MirrorSourceConfig config = new MirrorSourceConfig(
|
||||
makeProps("config.properties.blacklist", "prop1",
|
||||
"topics.blacklist", "topic-1"));
|
||||
assertFalse(config.configPropertyFilter().shouldReplicateConfigProperty("prop1"));
|
||||
assertTrue(config.configPropertyFilter().shouldReplicateConfigProperty("prop2"));
|
||||
assertFalse(config.topicFilter().shouldReplicateTopic("topic-1"));
|
||||
assertTrue(config.topicFilter().shouldReplicateTopic("topic-2"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoTopics() {
|
||||
MirrorSourceConfig config = new MirrorSourceConfig(makeProps("topics", ""));
|
||||
|
|
|
|||
|
|
@ -49,6 +49,15 @@
|
|||
<li>The <code>add.source.alias.to.metrics</code> configuration was removed from <code>MirrorSourceConnector</code>.
|
||||
The source cluster alias is now always added to the metrics.
|
||||
</li>
|
||||
<li>The <code>config.properties.blacklist</code> was removed from the <code>org.apache.kafka.connect.mirror.MirrorSourceConfig</code>
|
||||
Please use <code>config.properties.exclude</code> instead.
|
||||
</li>
|
||||
<li>The <code>topics.blacklist</code> was removed from the <code>org.apache.kafka.connect.mirror.MirrorSourceConfig</code>
|
||||
Please use <code>topics.exclude</code> instead.
|
||||
</li>
|
||||
<li>The <code>groups.blacklist</code> was removed from the <code>org.apache.kafka.connect.mirror.MirrorSourceConfig</code>
|
||||
Please use <code>groups.exclude</code> instead.
|
||||
</li>
|
||||
</ul>
|
||||
</li>
|
||||
<li><b>Tools</b>
|
||||
|
|
|
|||
Loading…
Reference in New Issue