KAFKA-15102: Add replication.policy.internal.topic.separator.enabled property to MirrorMaker 2 (KIP-949) (#14082)

Reviewers: Chris Egerton <chrise@aiven.io>
This commit is contained in:
Omnia G.H Ibrahim 2023-08-16 00:58:52 +01:00 committed by Chris Egerton
parent 228c3fe138
commit f9fde0eec1
No known key found for this signature in database
GPG Key ID: B90BFC8C4393F2F0
4 changed files with 95 additions and 4 deletions

View File

@ -33,8 +33,12 @@ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable
public static final String SEPARATOR_CONFIG = MirrorClientConfig.REPLICATION_POLICY_SEPARATOR;
public static final String SEPARATOR_DEFAULT = ".";
public static final String INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG = MirrorClientConfig.INTERNAL_TOPIC_SEPARATOR_ENABLED;
public static final Boolean INTERNAL_TOPIC_SEPARATOR_ENABLED_DEFAULT = true;
private String separator = SEPARATOR_DEFAULT;
private Pattern separatorPattern = Pattern.compile(Pattern.quote(SEPARATOR_DEFAULT));
private boolean isInternalTopicSeparatorEnabled = true;
@Override
public void configure(Map<String, ?> props) {
@ -42,6 +46,13 @@ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable
separator = (String) props.get(SEPARATOR_CONFIG);
log.info("Using custom remote topic separator: '{}'", separator);
separatorPattern = Pattern.compile(Pattern.quote(separator));
if (props.containsKey(INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG)) {
isInternalTopicSeparatorEnabled = Boolean.parseBoolean(props.get(INTERNAL_TOPIC_SEPARATOR_ENABLED_CONFIG).toString());
if (!isInternalTopicSeparatorEnabled) {
log.warn("Disabling custom topic separator for internal topics; will use '.' instead of '{}'", separator);
}
}
}
}
@ -71,17 +82,20 @@ public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable
}
}
private String internalSeparator() {
return isInternalTopicSeparatorEnabled ? separator : ".";
}
private String internalSuffix() {
return separator + "internal";
return internalSeparator() + "internal";
}
private String checkpointsTopicSuffix() {
return separator + "checkpoints" + internalSuffix();
return internalSeparator() + "checkpoints" + internalSuffix();
}
@Override
public String offsetSyncsTopic(String clusterAlias) {
return "mm2-offset-syncs" + separator + clusterAlias + internalSuffix();
return "mm2-offset-syncs" + internalSeparator() + clusterAlias + internalSuffix();
}
@Override

View File

@ -52,6 +52,14 @@ public class MirrorClientConfig extends AbstractConfig {
private static final String REPLICATION_POLICY_SEPARATOR_DOC = "Separator used in remote topic naming convention.";
public static final String REPLICATION_POLICY_SEPARATOR_DEFAULT =
DefaultReplicationPolicy.SEPARATOR_DEFAULT;
public static final String INTERNAL_TOPIC_SEPARATOR_ENABLED = "replication.policy.internal.topic.separator.enabled";
public static final String INTERNAL_TOPIC_SEPARATOR_ENABLED_DOC =
"Whether to use replication.policy.separator to control the names of topics used for checkpoints and offset syncs. " +
"By default, custom separators are used in these topic names; however, if upgrading MirrorMaker 2 from older versions " +
"that did not allow for these topic names to be customized, it may be necessary to set this property to 'false' in order " +
"to continue using the same names for those topics.";
public static final Boolean INTERNAL_TOPIC_SEPARATOR_ENABLED_DEFAULT =
DefaultReplicationPolicy.INTERNAL_TOPIC_SEPARATOR_ENABLED_DEFAULT;
public static final String ADMIN_CLIENT_PREFIX = "admin.";
public static final String CONSUMER_CLIENT_PREFIX = "consumer.";
@ -122,6 +130,12 @@ public class MirrorClientConfig extends AbstractConfig {
REPLICATION_POLICY_SEPARATOR_DEFAULT,
ConfigDef.Importance.LOW,
REPLICATION_POLICY_SEPARATOR_DOC)
.define(
INTERNAL_TOPIC_SEPARATOR_ENABLED,
ConfigDef.Type.BOOLEAN,
INTERNAL_TOPIC_SEPARATOR_ENABLED_DEFAULT,
ConfigDef.Importance.LOW,
INTERNAL_TOPIC_SEPARATOR_ENABLED_DOC)
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,

View File

@ -17,14 +17,25 @@
package org.apache.kafka.connect.mirror;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;
public class ReplicationPolicyTest {
private static final DefaultReplicationPolicy DEFAULT_REPLICATION_POLICY = new DefaultReplicationPolicy();
@BeforeEach
public void setUp() {
DEFAULT_REPLICATION_POLICY.configure(Collections.emptyMap());
}
@Test
public void testInternalTopic() {
// starts with '__'
@ -39,4 +50,45 @@ public class ReplicationPolicyTest {
// non-internal topic.
assertFalse(DEFAULT_REPLICATION_POLICY.isInternalTopic("mm2-offsets_CLUSTER_internal"));
}
@Test
public void offsetSyncsTopic_shouldBeEffectedByInternalTopicSeparatorEnabled() {
Map<String, Object> config = new HashMap<>();
config.put(MirrorClientConfig.REPLICATION_POLICY_SEPARATOR, "__");
config.put(MirrorClientConfig.INTERNAL_TOPIC_SEPARATOR_ENABLED, false);
DEFAULT_REPLICATION_POLICY.configure(config);
assertEquals("mm2-offset-syncs.CLUSTER.internal", DEFAULT_REPLICATION_POLICY.offsetSyncsTopic("CLUSTER"));
config.put(MirrorClientConfig.INTERNAL_TOPIC_SEPARATOR_ENABLED, true);
DEFAULT_REPLICATION_POLICY.configure(config);
assertEquals("mm2-offset-syncs__CLUSTER__internal", DEFAULT_REPLICATION_POLICY.offsetSyncsTopic("CLUSTER"));
}
@Test
public void checkpointsTopic_shouldBeEffectedByInternalTopicSeparatorEnabled() {
Map<String, Object> config = new HashMap<>();
config.put(MirrorClientConfig.REPLICATION_POLICY_SEPARATOR, "__");
config.put(MirrorClientConfig.INTERNAL_TOPIC_SEPARATOR_ENABLED, false);
DEFAULT_REPLICATION_POLICY.configure(config);
assertEquals("CLUSTER.checkpoints.internal", DEFAULT_REPLICATION_POLICY.checkpointsTopic("CLUSTER"));
config.put(MirrorClientConfig.INTERNAL_TOPIC_SEPARATOR_ENABLED, true);
DEFAULT_REPLICATION_POLICY.configure(config);
assertEquals("CLUSTER__checkpoints__internal", DEFAULT_REPLICATION_POLICY.checkpointsTopic("CLUSTER"));
}
@Test
public void heartbeatsTopic_shouldNotBeEffectedByInternalTopicSeparatorConfig() {
Map<String, Object> config = new HashMap<>();
config.put(MirrorClientConfig.REPLICATION_POLICY_SEPARATOR, "__");
config.put(MirrorClientConfig.INTERNAL_TOPIC_SEPARATOR_ENABLED, true);
assertEquals("heartbeats", DEFAULT_REPLICATION_POLICY.heartbeatsTopic());
config.put(MirrorClientConfig.INTERNAL_TOPIC_SEPARATOR_ENABLED, false);
DEFAULT_REPLICATION_POLICY.configure(config);
assertEquals("heartbeats", DEFAULT_REPLICATION_POLICY.heartbeatsTopic());
}
}

View File

@ -135,6 +135,11 @@ public class MirrorConnectorConfig extends AbstractConfig {
private static final String CONSUMER_POLL_TIMEOUT_MILLIS_DOC = "Timeout when polling source cluster.";
public static final long CONSUMER_POLL_TIMEOUT_MILLIS_DEFAULT = 1000L;
private static final String INTERNAL_TOPIC_SEPARATOR_ENABLED = MirrorClientConfig.INTERNAL_TOPIC_SEPARATOR_ENABLED;
private static final String INTERNAL_TOPIC_SEPARATOR_ENABLED_DOC = MirrorClientConfig.INTERNAL_TOPIC_SEPARATOR_ENABLED_DOC;
public static final Boolean INTERNAL_TOPIC_SEPARATOR_ENABLED_DEFAULT =
DefaultReplicationPolicy.INTERNAL_TOPIC_SEPARATOR_ENABLED_DEFAULT;
public static final String ADMIN_TASK_TIMEOUT_MILLIS = "admin.timeout.ms";
private static final String ADMIN_TASK_TIMEOUT_MILLIS_DOC = "Timeout for administrative tasks, e.g. detecting new topics.";
public static final long ADMIN_TASK_TIMEOUT_MILLIS_DEFAULT = 60000L;
@ -672,6 +677,12 @@ public class MirrorConnectorConfig extends AbstractConfig {
REPLICATION_POLICY_SEPARATOR_DEFAULT,
ConfigDef.Importance.LOW,
REPLICATION_POLICY_SEPARATOR_DOC)
.define(
INTERNAL_TOPIC_SEPARATOR_ENABLED,
ConfigDef.Type.BOOLEAN,
INTERNAL_TOPIC_SEPARATOR_ENABLED_DEFAULT,
ConfigDef.Importance.LOW,
INTERNAL_TOPIC_SEPARATOR_ENABLED_DOC)
.define(
REPLICATION_FACTOR,
ConfigDef.Type.INT,