diff --git a/config/connect-mirror-maker.properties b/config/connect-mirror-maker.properties
new file mode 100644
index 00000000000..16c1b791d3d
--- /dev/null
+++ b/config/connect-mirror-maker.properties
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see org.apache.kafka.clients.consumer.ConsumerConfig for more details
+
+# Sample MirrorMaker 2.0 top-level configuration file
+# Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties
+
+# specify any number of cluster aliases
+clusters = A, B, C
+
+# connection information for each cluster
+A.bootstrap.servers = A_host1:9092, A_host2:9092, A_host3:9092
+B.bootstrap.servers = B_host1:9092, B_host2:9092, B_host3:9092
+C.bootstrap.servers = C_host1:9092, C_host2:9092, C_host3:9092
+
+# enable and configure individual replication flows
+A->B.enabled = true
+A->B.topics = foo-.*
+B->C.enabled = true
+B->C.topics = bar-.*
+
+# customize as needed
+# replication.policy.separator = _
+# sync.topic.acls.enabled = false
+# emit.heartbeats.interval.seconds = 5
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
index 8767a62045e..4dea6ce80bd 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.source;
import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.List;
import java.util.Map;
@@ -88,7 +89,13 @@ public abstract class SourceTask implements Task {
/**
*
- * Commit an individual {@link SourceRecord} when the callback from the producer client is received, or if a record is filtered by a transformation.
+ * Commit an individual {@link SourceRecord} when the callback from the producer client is received. This method is
+ * also called when a record is filtered by a transformation, and thus will never be ACK'd by a broker.
+ *
+ *
+ * This is an alias for {@link #commitRecord(SourceRecord, RecordMetadata)} for backwards compatibility. The default
+ * implementation of {@link #commitRecord(SourceRecord, RecordMetadata)} just calls this method. It is not necessary
+ * to override both methods.
*
*
* SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
@@ -96,10 +103,37 @@ public abstract class SourceTask implements Task {
* in their own system.
*
*
- * @param record {@link SourceRecord} that was successfully sent via the producer.
+ * @param record {@link SourceRecord} that was successfully sent via the producer or filtered by a transformation
* @throws InterruptedException
+ * @see #commitRecord(SourceRecord, RecordMetadata)
*/
public void commitRecord(SourceRecord record) throws InterruptedException {
// This space intentionally left blank.
}
+
+ /**
+ * <p>
+ * Commit an individual {@link SourceRecord} when the callback from the producer client is received. This method is
+ * also called when a record is filtered by a transformation, and thus will never be ACK'd by a broker. In this case
+ * {@code metadata} will be null.
+ * </p>
+ * <p>
+ * SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
+ * automatically. This hook is provided for systems that also need to store offsets internally
+ * in their own system.
+ * </p>
+ * <p>
+ * The default implementation just calls {@link #commitRecord(SourceRecord)}, which is a nop by default. It is
+ * not necessary to implement both methods.
+ * </p>
+ *
+ * @param record {@link SourceRecord} that was successfully sent via the producer or filtered by a transformation
+ * @param metadata {@link RecordMetadata} record metadata returned from the broker, or null if the record was filtered
+ * @throws InterruptedException
+ */
+ public void commitRecord(SourceRecord record, RecordMetadata metadata)
+ throws InterruptedException {
+ // by default, just call other method for backwards compatibility
+ commitRecord(record);
+ }
}
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java
new file mode 100644
index 00000000000..74db7461ef0
--- /dev/null
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Checkpoint.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.nio.ByteBuffer;
+
+/** Checkpoint records emitted from MirrorCheckpointConnector. Encodes remote consumer group state. All fields are set once, in the constructor. */
+public class Checkpoint {
+    public static final String TOPIC_KEY = "topic";
+    public static final String PARTITION_KEY = "partition";
+    public static final String CONSUMER_GROUP_ID_KEY = "group";
+    public static final String UPSTREAM_OFFSET_KEY = "upstreamOffset";
+    public static final String DOWNSTREAM_OFFSET_KEY = "offset";
+    public static final String METADATA_KEY = "metadata";
+    public static final String VERSION_KEY = "version";
+    public static final short VERSION = 0; // version written by recordValue()
+
+    public static final Schema VALUE_SCHEMA_V0 = new Schema(
+            new Field(UPSTREAM_OFFSET_KEY, Type.INT64),
+            new Field(DOWNSTREAM_OFFSET_KEY, Type.INT64),
+            new Field(METADATA_KEY, Type.STRING));
+
+    public static final Schema KEY_SCHEMA = new Schema(
+            new Field(CONSUMER_GROUP_ID_KEY, Type.STRING),
+            new Field(TOPIC_KEY, Type.STRING),
+            new Field(PARTITION_KEY, Type.INT32));
+
+    public static final Schema HEADER_SCHEMA = new Schema(
+            new Field(VERSION_KEY, Type.INT16)); // the header precedes the value struct in a serialized value
+
+    private final String consumerGroupId;
+    private final TopicPartition topicPartition;
+    private final long upstreamOffset;
+    private final long downstreamOffset;
+    private final String metadata;
+
+    public Checkpoint(String consumerGroupId, TopicPartition topicPartition, long upstreamOffset,
+            long downstreamOffset, String metadata) {
+        this.consumerGroupId = consumerGroupId;
+        this.topicPartition = topicPartition;
+        this.upstreamOffset = upstreamOffset;
+        this.downstreamOffset = downstreamOffset;
+        this.metadata = metadata;
+    }
+
+    public String consumerGroupId() {
+        return consumerGroupId;
+    }
+
+    public TopicPartition topicPartition() {
+        return topicPartition;
+    }
+
+    public long upstreamOffset() {
+        return upstreamOffset;
+    }
+
+    public long downstreamOffset() {
+        return downstreamOffset;
+    }
+
+    public String metadata() {
+        return metadata;
+    }
+
+    public OffsetAndMetadata offsetAndMetadata() { // pairs the downstream offset with the metadata string
+        return new OffsetAndMetadata(downstreamOffset, metadata);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("Checkpoint{consumerGroupId=%s, topicPartition=%s, "
+            + "upstreamOffset=%d, downstreamOffset=%d, metadata=%s}",
+            consumerGroupId, topicPartition, upstreamOffset, downstreamOffset, metadata);
+    }
+
+    ByteBuffer serializeValue(short version) { // layout: header struct (version), then the value struct
+        Struct header = headerStruct(version);
+        Schema valueSchema = valueSchema(version);
+        Struct valueStruct = valueStruct(valueSchema);
+        ByteBuffer buffer = ByteBuffer.allocate(HEADER_SCHEMA.sizeOf(header) + valueSchema.sizeOf(valueStruct));
+        HEADER_SCHEMA.write(buffer, header);
+        valueSchema.write(buffer, valueStruct);
+        buffer.flip(); // switch from writing to reading before handing the buffer out
+        return buffer;
+    }
+
+    ByteBuffer serializeKey() { // the key carries no version header, only the KEY_SCHEMA struct
+        Struct struct = keyStruct();
+        ByteBuffer buffer = ByteBuffer.allocate(KEY_SCHEMA.sizeOf(struct));
+        KEY_SCHEMA.write(buffer, struct);
+        buffer.flip(); // switch from writing to reading before handing the buffer out
+        return buffer;
+    }
+
+    public static Checkpoint deserializeRecord(ConsumerRecord record) { // inverse of recordKey() / recordValue()
+        ByteBuffer value = ByteBuffer.wrap(record.value());
+        Struct header = HEADER_SCHEMA.read(value); // consumes the version header, leaving the value struct
+        short version = header.getShort(VERSION_KEY);
+        Schema valueSchema = valueSchema(version);
+        Struct valueStruct = valueSchema.read(value);
+        long upstreamOffset = valueStruct.getLong(UPSTREAM_OFFSET_KEY);
+        long downstreamOffset = valueStruct.getLong(DOWNSTREAM_OFFSET_KEY);
+        String metadata = valueStruct.getString(METADATA_KEY);
+        Struct keyStruct = KEY_SCHEMA.read(ByteBuffer.wrap(record.key()));
+        String group = keyStruct.getString(CONSUMER_GROUP_ID_KEY);
+        String topic = keyStruct.getString(TOPIC_KEY);
+        int partition = keyStruct.getInt(PARTITION_KEY);
+        return new Checkpoint(group, new TopicPartition(topic, partition), upstreamOffset,
+            downstreamOffset, metadata);
+    }
+
+    private static Schema valueSchema(short version) {
+        assert version == 0; // only checked when the JVM runs with -ea; otherwise any version maps to V0
+        return VALUE_SCHEMA_V0;
+    }
+
+    private Struct valueStruct(Schema schema) {
+        Struct struct = new Struct(schema);
+        struct.set(UPSTREAM_OFFSET_KEY, upstreamOffset);
+        struct.set(DOWNSTREAM_OFFSET_KEY, downstreamOffset);
+        struct.set(METADATA_KEY, metadata);
+        return struct;
+    }
+
+    private Struct keyStruct() {
+        Struct struct = new Struct(KEY_SCHEMA);
+        struct.set(CONSUMER_GROUP_ID_KEY, consumerGroupId);
+        struct.set(TOPIC_KEY, topicPartition.topic());
+        struct.set(PARTITION_KEY, topicPartition.partition());
+        return struct;
+    }
+
+    private Struct headerStruct(short version) {
+        Struct struct = new Struct(HEADER_SCHEMA);
+        struct.set(VERSION_KEY, version);
+        return struct;
+    }
+
+    Map connectPartition() { // same three fields as the record key, as a plain map
+        Map partition = new HashMap<>();
+        partition.put(CONSUMER_GROUP_ID_KEY, consumerGroupId);
+        partition.put(TOPIC_KEY, topicPartition.topic());
+        partition.put(PARTITION_KEY, topicPartition.partition());
+        return partition;
+    }
+
+    static String unwrapGroup(Map connectPartition) { // reads back the group stored by connectPartition()
+        return connectPartition.get(CONSUMER_GROUP_ID_KEY).toString();
+    }
+
+    byte[] recordKey() { // the buffer is allocated with KEY_SCHEMA.sizeOf(struct), so array() is the whole key
+        return serializeKey().array();
+    }
+
+    byte[] recordValue() { // always serialized with the current VERSION
+        return serializeValue(VERSION).array();
+    }
+}
+
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
new file mode 100644
index 00000000000..30d75348a8a
--- /dev/null
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/DefaultReplicationPolicy.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.Configurable;
+
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Defines remote topics like "us-west.topic1". The separator is customizable and defaults to a period. */
+public class DefaultReplicationPolicy implements ReplicationPolicy, Configurable {
+
+    private static final Logger log = LoggerFactory.getLogger(DefaultReplicationPolicy.class);
+
+    // In order to work with various metrics stores, we allow custom separators.
+    public static final String SEPARATOR_CONFIG = MirrorClientConfig.REPLICATION_POLICY_SEPARATOR;
+    public static final String SEPARATOR_DEFAULT = ".";
+
+    private String separator = SEPARATOR_DEFAULT;
+    private Pattern separatorPattern = Pattern.compile(Pattern.quote(SEPARATOR_DEFAULT));
+
+    /** Picks up a custom separator from {@code props} when the separator key is present. */
+    @Override
+    public void configure(Map props) {
+        if (!props.containsKey(SEPARATOR_CONFIG)) {
+            return; // nothing configured: keep the default separator
+        }
+        separator = (String) props.get(SEPARATOR_CONFIG);
+        log.info("Using custom remote topic separator: '{}'", separator);
+        separatorPattern = Pattern.compile(Pattern.quote(separator));
+    }
+
+    @Override
+    public String formatRemoteTopic(String sourceClusterAlias, String topic) {
+        return sourceClusterAlias + separator + topic;
+    }
+
+    /** Returns the part before the first separator, or null when splitting yields fewer than two parts. */
+    @Override
+    public String topicSource(String topic) {
+        String[] segments = separatorPattern.split(topic);
+        // fewer than two segments: this is not a remote topic
+        return segments.length < 2 ? null : segments[0];
+    }
+
+    @Override
+    public String upstreamTopic(String topic) {
+        String sourceAlias = topicSource(topic);
+        if (sourceAlias == null) {
+            return null; // not a remote topic, so there is no upstream name
+        }
+        // drop the leading source alias and the separator that follows it
+        int prefixLength = sourceAlias.length() + separator.length();
+        return topic.substring(prefixLength);
+    }
+}
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Heartbeat.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Heartbeat.java
new file mode 100644
index 00000000000..a34ce9efb32
--- /dev/null
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/Heartbeat.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.nio.ByteBuffer;
+
+/** Heartbeat message sent from MirrorHeartbeatTask to target cluster. Heartbeats are always replicated. All fields are set once, in the constructor. */
+public class Heartbeat {
+    public static final String SOURCE_CLUSTER_ALIAS_KEY = "sourceClusterAlias";
+    public static final String TARGET_CLUSTER_ALIAS_KEY = "targetClusterAlias";
+    public static final String TIMESTAMP_KEY = "timestamp";
+    public static final String VERSION_KEY = "version";
+    public static final short VERSION = 0; // version written by recordValue()
+
+    public static final Schema VALUE_SCHEMA_V0 = new Schema(
+            new Field(TIMESTAMP_KEY, Type.INT64));
+
+    public static final Schema KEY_SCHEMA = new Schema(
+            new Field(SOURCE_CLUSTER_ALIAS_KEY, Type.STRING),
+            new Field(TARGET_CLUSTER_ALIAS_KEY, Type.STRING));
+
+    public static final Schema HEADER_SCHEMA = new Schema(
+            new Field(VERSION_KEY, Type.INT16)); // the header precedes the value struct in a serialized value
+
+    private final String sourceClusterAlias;
+    private final String targetClusterAlias;
+    private final long timestamp;
+
+    public Heartbeat(String sourceClusterAlias, String targetClusterAlias, long timestamp) {
+        this.sourceClusterAlias = sourceClusterAlias;
+        this.targetClusterAlias = targetClusterAlias;
+        this.timestamp = timestamp;
+    }
+
+    public String sourceClusterAlias() {
+        return sourceClusterAlias;
+    }
+
+    public String targetClusterAlias() {
+        return targetClusterAlias;
+    }
+
+    public long timestamp() {
+        return timestamp;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("Heartbeat{sourceClusterAlias=%s, targetClusterAlias=%s, timestamp=%d}",
+            sourceClusterAlias, targetClusterAlias, timestamp);
+    }
+
+    ByteBuffer serializeValue(short version) { // layout: header struct (version), then the value struct
+        Schema valueSchema = valueSchema(version);
+        Struct header = headerStruct(version);
+        Struct value = valueStruct(valueSchema);
+        ByteBuffer buffer = ByteBuffer.allocate(HEADER_SCHEMA.sizeOf(header) + valueSchema.sizeOf(value));
+        HEADER_SCHEMA.write(buffer, header);
+        valueSchema.write(buffer, value);
+        buffer.flip(); // switch from writing to reading before handing the buffer out
+        return buffer;
+    }
+
+    ByteBuffer serializeKey() { // the key carries no version header, only the KEY_SCHEMA struct
+        Struct struct = keyStruct();
+        ByteBuffer buffer = ByteBuffer.allocate(KEY_SCHEMA.sizeOf(struct));
+        KEY_SCHEMA.write(buffer, struct);
+        buffer.flip(); // switch from writing to reading before handing the buffer out
+        return buffer;
+    }
+
+    public static Heartbeat deserializeRecord(ConsumerRecord record) { // inverse of recordKey() / recordValue()
+        ByteBuffer value = ByteBuffer.wrap(record.value());
+        Struct headerStruct = HEADER_SCHEMA.read(value); // consumes the version header, leaving the value struct
+        short version = headerStruct.getShort(VERSION_KEY);
+        Struct valueStruct = valueSchema(version).read(value);
+        long timestamp = valueStruct.getLong(TIMESTAMP_KEY);
+        Struct keyStruct = KEY_SCHEMA.read(ByteBuffer.wrap(record.key()));
+        String sourceClusterAlias = keyStruct.getString(SOURCE_CLUSTER_ALIAS_KEY);
+        String targetClusterAlias = keyStruct.getString(TARGET_CLUSTER_ALIAS_KEY);
+        return new Heartbeat(sourceClusterAlias, targetClusterAlias, timestamp);
+    }
+
+    private Struct headerStruct(short version) {
+        Struct struct = new Struct(HEADER_SCHEMA);
+        struct.set(VERSION_KEY, version);
+        return struct;
+    }
+
+    private Struct valueStruct(Schema schema) {
+        Struct struct = new Struct(schema);
+        struct.set(TIMESTAMP_KEY, timestamp);
+        return struct;
+    }
+
+    private Struct keyStruct() {
+        Struct struct = new Struct(KEY_SCHEMA);
+        struct.set(SOURCE_CLUSTER_ALIAS_KEY, sourceClusterAlias);
+        struct.set(TARGET_CLUSTER_ALIAS_KEY, targetClusterAlias);
+        return struct;
+    }
+
+    Map connectPartition() { // same two aliases as the record key, as a plain map
+        Map partition = new HashMap<>();
+        partition.put(SOURCE_CLUSTER_ALIAS_KEY, sourceClusterAlias);
+        partition.put(TARGET_CLUSTER_ALIAS_KEY, targetClusterAlias);
+        return partition;
+    }
+
+    byte[] recordKey() { // the buffer is allocated with KEY_SCHEMA.sizeOf(struct), so array() is the whole key
+        return serializeKey().array();
+    }
+
+    byte[] recordValue() { // always serialized with the current VERSION
+        return serializeValue(VERSION).array();
+    }
+
+    private static Schema valueSchema(short version) {
+        assert version == 0; // only checked when the JVM runs with -ea; otherwise any version maps to V0
+        return VALUE_SCHEMA_V0;
+    }
+}
+
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
new file mode 100644
index 00000000000..17d18ecb58a
--- /dev/null
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.protocol.types.SchemaException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Collections;
+import java.util.Collection;
+import java.util.stream.Collectors;
+import java.util.concurrent.ExecutionException;
+
+/** Interprets MM2's internal topics (checkpoints, heartbeats) on a given cluster.
+ * <p>
+ * Given a top-level "mm2.properties" configuration file, MirrorClients can be constructed
+ * for individual clusters as follows:
+ * </p>
+ * <pre>
+ * MirrorMakerConfig mmConfig = new MirrorMakerConfig(props);
+ * MirrorClientConfig mmClientConfig = mmConfig.clientConfig("some-cluster");
+ * MirrorClient mmClient = new MirrorClient(mmClientConfig);
+ * </pre>
+ */
+public class MirrorClient implements AutoCloseable {
+    private static final Logger log = LoggerFactory.getLogger(MirrorClient.class);
+
+    private final AdminClient adminClient;
+    private final ReplicationPolicy replicationPolicy;
+    private final Map consumerConfig;
+
+    public MirrorClient(Map props) {
+        this(new MirrorClientConfig(props));
+    }
+
+    public MirrorClient(MirrorClientConfig config) {
+        adminClient = AdminClient.create(config.adminConfig());
+        consumerConfig = config.consumerConfig();
+        replicationPolicy = config.replicationPolicy();
+    }
+
+    // for testing
+    MirrorClient(AdminClient adminClient, ReplicationPolicy replicationPolicy,
+            Map consumerConfig) {
+        this.adminClient = adminClient;
+        this.replicationPolicy = replicationPolicy;
+        this.consumerConfig = consumerConfig;
+    }
+
+    /** Close internal clients. */
+    public void close() {
+        adminClient.close();
+    }
+
+    /** Get the ReplicationPolicy instance used to interpret remote topics. This instance is constructed based on
+     * relevant configuration properties, including {@code replication.policy.class}. */
+    public ReplicationPolicy replicationPolicy() {
+        return replicationPolicy;
+    }
+
+    /** Compute shortest number of hops from an upstream source cluster.
+     * For example, given replication flow A->B->C, there are two hops from A to C.
+     * Returns -1 if upstream cluster is unreachable.
+     */
+    public int replicationHops(String upstreamClusterAlias) throws InterruptedException {
+        return heartbeatTopics().stream()
+            .map(x -> countHopsForTopic(x, upstreamClusterAlias))
+            .filter(x -> x != -1) // -1 marks a heartbeat topic that never passed through the requested cluster
+            .mapToInt(x -> x)
+            .min()
+            .orElse(-1);
+    }
+
+    /** Find all heartbeat topics on this cluster. Heartbeat topics are replicated from other clusters. */
+    public Set heartbeatTopics() throws InterruptedException {
+        return listTopics().stream()
+            .filter(this::isHeartbeatTopic)
+            .collect(Collectors.toSet());
+    }
+
+    /** Find all checkpoint topics on this cluster. */
+    public Set checkpointTopics() throws InterruptedException {
+        return listTopics().stream()
+            .filter(this::isCheckpointTopic)
+            .collect(Collectors.toSet());
+    }
+
+    /** Find upstream clusters, which may be multiple hops away, based on incoming heartbeats. */
+    public Set upstreamClusters() throws InterruptedException {
+        return listTopics().stream()
+            .filter(this::isHeartbeatTopic)
+            .flatMap(x -> allSources(x).stream())
+            .distinct()
+            .collect(Collectors.toSet());
+    }
+
+    /** Find all remote topics on this cluster. This does not include internal topics (heartbeats, checkpoints). */
+    public Set remoteTopics() throws InterruptedException {
+        return listTopics().stream()
+            .filter(this::isRemoteTopic)
+            .collect(Collectors.toSet());
+    }
+
+    /** Find all remote topics that have been replicated directly from the given source cluster. */
+    public Set remoteTopics(String source) throws InterruptedException {
+        return listTopics().stream()
+            .filter(this::isRemoteTopic)
+            .filter(x -> source.equals(replicationPolicy.topicSource(x)))
+            .distinct()
+            .collect(Collectors.toSet());
+    }
+
+    /** Translate a remote consumer group's offsets into corresponding local offsets. Topics are automatically
+     * renamed according to the ReplicationPolicy.
+     * @param consumerGroupId group ID of remote consumer group
+     * @param remoteClusterAlias alias of remote cluster
+     * @param timeout upper bound on the time spent polling the checkpoint topic
+     * @return map from topic-partition to the last checkpointed downstream offset read for the group
+     */
+    public Map remoteConsumerOffsets(String consumerGroupId, String remoteClusterAlias, Duration timeout) {
+        long deadline = System.currentTimeMillis() + timeout.toMillis();
+        Map offsets = new HashMap<>();
+        KafkaConsumer consumer = new KafkaConsumer<>(consumerConfig,
+            new ByteArrayDeserializer(), new ByteArrayDeserializer());
+        try {
+            // checkpoint topics are not "remote topics", as they are not replicated. So we don't need
+            // to use ReplicationPolicy to create the checkpoint topic here.
+            String checkpointTopic = remoteClusterAlias + MirrorClientConfig.CHECKPOINTS_TOPIC_SUFFIX;
+            List checkpointAssignment = Collections.singletonList(new TopicPartition(checkpointTopic, 0));
+            consumer.assign(checkpointAssignment);
+            consumer.seekToBeginning(checkpointAssignment);
+            while (System.currentTimeMillis() < deadline && !endOfStream(consumer, checkpointAssignment)) {
+                long remainingMs = Math.max(0L, deadline - System.currentTimeMillis()); // never poll past the deadline
+                ConsumerRecords records = consumer.poll(Duration.ofMillis(remainingMs));
+                for (ConsumerRecord record : records) {
+                    try {
+                        Checkpoint checkpoint = Checkpoint.deserializeRecord(record);
+                        if (checkpoint.consumerGroupId().equals(consumerGroupId)) {
+                            offsets.put(checkpoint.topicPartition(), checkpoint.offsetAndMetadata());
+                        }
+                    } catch (SchemaException e) {
+                        log.info("Could not deserialize record. Skipping.", e);
+                    }
+                }
+            }
+            log.info("Consumed {} checkpoint records for {} from {}.", offsets.size(),
+                consumerGroupId, checkpointTopic);
+        } finally {
+            consumer.close();
+        }
+        return offsets;
+    }
+
+    Set listTopics() throws InterruptedException {
+        try {
+            return adminClient.listTopics().names().get();
+        } catch (ExecutionException e) {
+            throw new KafkaException(e.getCause()); // rethrow the underlying cause, unchecked
+        }
+    }
+
+    int countHopsForTopic(String topic, String sourceClusterAlias) {
+        int hops = 0;
+        while (true) { // peel one source prefix per iteration until the alias matches or none is left
+            hops++;
+            String source = replicationPolicy.topicSource(topic);
+            if (source == null) {
+                return -1;
+            }
+            if (source.equals(sourceClusterAlias)) {
+                return hops;
+            }
+            topic = replicationPolicy.upstreamTopic(topic);
+        }
+    }
+
+    boolean isHeartbeatTopic(String topic) {
+        // heartbeats are replicated, so we must use ReplicationPolicy here
+        return MirrorClientConfig.HEARTBEATS_TOPIC.equals(replicationPolicy.originalTopic(topic));
+    }
+
+    boolean isCheckpointTopic(String topic) {
+        // checkpoints are not replicated, so we don't need to use ReplicationPolicy here
+        return topic.endsWith(MirrorClientConfig.CHECKPOINTS_TOPIC_SUFFIX);
+    }
+
+    boolean isRemoteTopic(String topic) {
+        return !replicationPolicy.isInternalTopic(topic)
+            && replicationPolicy.topicSource(topic) != null;
+    }
+
+    Set allSources(String topic) { // every source alias found while repeatedly stripping the topic's prefix
+        Set sources = new HashSet<>();
+        String source = replicationPolicy.topicSource(topic);
+        while (source != null) {
+            sources.add(source);
+            topic = replicationPolicy.upstreamTopic(topic);
+            source = replicationPolicy.topicSource(topic);
+        }
+        return sources;
+    }
+
+    private static boolean endOfStream(Consumer<?, ?> consumer, Collection<TopicPartition> assignments) {
+        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignments);
+        for (TopicPartition topicPartition : assignments) {
+            if (consumer.position(topicPartition) < endOffsets.get(topicPartition)) {
+                return false; // at least one partition still has unread records
+            }
+        }
+        return true;
+    }
+}
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
new file mode 100644
index 00000000000..0c163d87fb6
--- /dev/null
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClientConfig.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.clients.CommonClientConfigs;
+
+import java.util.Map;
+import java.util.HashMap;
+
+/** Configuration required for MirrorClient to talk to a given target cluster.
+ *  <p>
+ *  Generally, these properties come from an mm2.properties configuration file
+ *  (@see MirrorMakerConfig.clientConfig):
+ *  </p>
+ *  <pre>
+ *    MirrorMakerConfig mmConfig = new MirrorMakerConfig(props);
+ *    MirrorClientConfig mmClientConfig = mmConfig.clientConfig("some-cluster");
+ *  </pre>
+ *  <p>
+ *  In addition to the properties defined here, sub-configs are supported for Admin, Consumer, and Producer clients.
+ *  For example:
+ *  </p>
+ *  <pre>
+ *      bootstrap.servers = host1:9092
+ *      consumer.client.id = mm2-client
+ *      replication.policy.separator = __
+ *  </pre>
+ */
+public class MirrorClientConfig extends AbstractConfig {
+    public static final String REPLICATION_POLICY_CLASS = "replication.policy.class";
+    private static final String REPLICATION_POLICY_CLASS_DOC = "Class which defines the remote topic naming convention.";
+    public static final Class<?> REPLICATION_POLICY_CLASS_DEFAULT = DefaultReplicationPolicy.class;
+    public static final String REPLICATION_POLICY_SEPARATOR = "replication.policy.separator";
+    private static final String REPLICATION_POLICY_SEPARATOR_DOC = "Separator used in remote topic naming convention.";
+    public static final String REPLICATION_POLICY_SEPARATOR_DEFAULT =
+        DefaultReplicationPolicy.SEPARATOR_DEFAULT;
+
+    // prefixes of the per-client sub-configs, e.g. "consumer.client.id"
+    public static final String ADMIN_CLIENT_PREFIX = "admin.";
+    public static final String CONSUMER_CLIENT_PREFIX = "consumer.";
+    public static final String PRODUCER_CLIENT_PREFIX = "producer.";
+
+    static final String CHECKPOINTS_TOPIC_SUFFIX = ".checkpoints.internal"; // internal so not replicated
+    static final String HEARTBEATS_TOPIC = "heartbeats";
+
+    MirrorClientConfig(Map<?, ?> props) {
+        super(CONFIG_DEF, props, true);
+    }
+
+    /** Instance of the class configured under {@code replication.policy.class}. */
+    public ReplicationPolicy replicationPolicy() {
+        return getConfiguredInstance(REPLICATION_POLICY_CLASS, ReplicationPolicy.class);
+    }
+
+    /** Sub-config for Admin clients. */
+    public Map<String, Object> adminConfig() {
+        return clientConfig(ADMIN_CLIENT_PREFIX);
+    }
+
+    /** Sub-config for Consumer clients. */
+    public Map<String, Object> consumerConfig() {
+        return clientConfig(CONSUMER_CLIENT_PREFIX);
+    }
+
+    /** Sub-config for Producer clients. */
+    public Map<String, Object> producerConfig() {
+        return clientConfig(PRODUCER_CLIENT_PREFIX);
+    }
+
+    /**
+     * Builds the properties handed to an internal client: values with the given prefix override
+     * applied, restricted to the names defined in CLIENT_CONFIG_DEF, with null values dropped.
+     */
+    private Map<String, Object> clientConfig(String prefix) {
+        Map<String, Object> props = new HashMap<>();
+        props.putAll(valuesWithPrefixOverride(prefix));
+        props.keySet().retainAll(CLIENT_CONFIG_DEF.names());
+        props.entrySet().removeIf(x -> x.getValue() == null);
+        return props;
+    }
+
+    // Properties passed to internal Kafka clients
+    static final ConfigDef CLIENT_CONFIG_DEF = new ConfigDef()
+        .define(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+            Type.LIST,
+            null,
+            Importance.HIGH,
+            CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
+        // security support
+        .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
+            Type.STRING,
+            CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
+            Importance.MEDIUM,
+            CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+        .withClientSslSupport()
+        .withClientSaslSupport();
+
+    // Properties accepted by MirrorClientConfig itself
+    static final ConfigDef CONFIG_DEF = new ConfigDef()
+        .define(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+            Type.STRING,
+            null,
+            Importance.HIGH,
+            CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
+        .define(
+            REPLICATION_POLICY_CLASS,
+            ConfigDef.Type.CLASS,
+            REPLICATION_POLICY_CLASS_DEFAULT,
+            ConfigDef.Importance.LOW,
+            REPLICATION_POLICY_CLASS_DOC)
+        .define(
+            REPLICATION_POLICY_SEPARATOR,
+            ConfigDef.Type.STRING,
+            REPLICATION_POLICY_SEPARATOR_DEFAULT,
+            ConfigDef.Importance.LOW,
+            REPLICATION_POLICY_SEPARATOR_DOC)
+        .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
+            Type.STRING,
+            CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
+            Importance.MEDIUM,
+            CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+        .withClientSslSupport()
+        .withClientSaslSupport();
+}
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java
new file mode 100644
index 00000000000..f9343198545
--- /dev/null
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/RemoteClusterUtils.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import java.time.Duration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Convenience methods for multi-cluster environments. Wraps MirrorClient (@see MirrorClient).
+ *  <p>
+ *  Properties passed to these methods are used to construct internal Admin and Consumer clients.
+ *  Sub-configs like "admin.xyz" are also supported. For example:
+ *  </p>
+ *  <pre>
+ *      bootstrap.servers = host1:9092
+ *      consumer.client.id = mm2-client
+ *  </pre>
+ *  <p>
+ *  @see MirrorClientConfig for additional properties used by the internal MirrorClient.
+ *  </p>
+ */
+public final class RemoteClusterUtils {
+    private static final Logger log = LoggerFactory.getLogger(RemoteClusterUtils.class);
+
+    // utility class
+    private RemoteClusterUtils() {}
+
+    /** Find shortest number of hops from an upstream cluster.
+     *  Returns -1 if the cluster is unreachable */
+    public static int replicationHops(Map<String, Object> properties, String upstreamClusterAlias)
+            throws InterruptedException, TimeoutException {
+        // each call builds a short-lived MirrorClient and closes it before returning
+        try (MirrorClient client = new MirrorClient(properties)) {
+            return client.replicationHops(upstreamClusterAlias);
+        }
+    }
+
+    /** Find all heartbeat topics */
+    public static Set<String> heartbeatTopics(Map<String, Object> properties)
+            throws InterruptedException, TimeoutException {
+        try (MirrorClient client = new MirrorClient(properties)) {
+            return client.heartbeatTopics();
+        }
+    }
+
+    /** Find all checkpoint topics */
+    public static Set<String> checkpointTopics(Map<String, Object> properties)
+            throws InterruptedException, TimeoutException {
+        try (MirrorClient client = new MirrorClient(properties)) {
+            return client.checkpointTopics();
+        }
+    }
+
+    /** Find all upstream clusters */
+    public static Set<String> upstreamClusters(Map<String, Object> properties)
+            throws InterruptedException, TimeoutException {
+        try (MirrorClient client = new MirrorClient(properties)) {
+            return client.upstreamClusters();
+        }
+    }
+
+    /** Translate a remote consumer group's offsets into corresponding local offsets. Topics are automatically
+     *  renamed according to the ReplicationPolicy.
+     *  @param properties @see MirrorClientConfig
+     *  @param remoteClusterAlias alias of remote cluster
+     *  @param consumerGroupId group ID of remote consumer group
+     *  @param timeout timeout
+     */
+    public static Map<TopicPartition, OffsetAndMetadata> translateOffsets(Map<String, Object> properties,
+            String remoteClusterAlias, String consumerGroupId, Duration timeout)
+            throws InterruptedException, TimeoutException {
+        try (MirrorClient client = new MirrorClient(properties)) {
+            // note: the client method takes the group ID first, then the cluster alias
+            return client.remoteConsumerOffsets(consumerGroupId, remoteClusterAlias, timeout);
+        }
+    }
+}
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
new file mode 100644
index 00000000000..11f73f50cea
--- /dev/null
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/** Defines which topics are "remote topics", e.g. "us-west.topic1". */
+@InterfaceStability.Evolving
+public interface ReplicationPolicy {
+
+    /** Builds the remote topic name for a topic replicated from the given source cluster;
+     *  generally this should look like us-west.topic1.
+     */
+    String formatRemoteTopic(String sourceClusterAlias, String topic);
+
+    /** Alias of the cluster a remote topic was replicated from, e.g. "us-west" for "us-west.topic1".
+     *  Returns null if the topic is not a remote topic.
+     */
+    String topicSource(String topic);
+
+    /** Name of the topic on the immediate source cluster, e.g. "topic1" for "us-west.topic1".
+     *
+     *  A topic may have been replicated over several hops, in which case the returned
+     *  upstream topic is itself a remote topic.
+     *
+     *  Returns null if the topic is not a remote topic.
+     */
+    String upstreamTopic(String topic);
+
+    /** Name of the original source topic, found by following upstream topics until there is none.
+     *  Returns the topic itself if it is not a remote topic.
+     */
+    default String originalTopic(String topic) {
+        String current = topic;
+        String upstream = upstreamTopic(current);
+        while (upstream != null) {
+            current = upstream;
+            upstream = upstreamTopic(current);
+        }
+        return current;
+    }
+
+    /** Internal topics are never replicated. */
+    default boolean isInternalTopic(String topic) {
+        boolean internalSuffix = topic.endsWith(".internal") || topic.endsWith("-internal");
+        boolean internalPrefix = topic.startsWith("__") || topic.startsWith(".");
+        return internalSuffix || internalPrefix;
+    }
+}
diff --git a/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java
new file mode 100644
index 00000000000..f853dc40c16
--- /dev/null
+++ b/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/SourceAndTarget.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+/** Directional pair of clusters, where source is replicated to target. */
+public class SourceAndTarget {
+    private final String source;
+    private final String target;
+
+    public SourceAndTarget(String source, String target) {
+        this.source = source;
+        this.target = target;
+    }
+
+    /** Alias of the cluster being replicated from. */
+    public String source() {
+        return source;
+    }
+
+    /** Alias of the cluster being replicated to. */
+    public String target() {
+        return target;
+    }
+
+    /** Renders the pair in flow notation, e.g. "A->B". */
+    @Override
+    public String toString() {
+        return source + "->" + target;
+    }
+
+    @Override
+    public int hashCode() {
+        // equal pairs have equal fields and therefore equal string forms, so this stays
+        // consistent with equals()
+        return toString().hashCode();
+    }
+
+    /** Two pairs are equal when both their sources and their targets are equal. */
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        // Only another SourceAndTarget can be equal; comparing string forms against arbitrary
+        // objects would make "A->B" (a String) equal to the pair but not vice versa.
+        if (!(other instanceof SourceAndTarget)) {
+            return false;
+        }
+        SourceAndTarget that = (SourceAndTarget) other;
+        return java.util.Objects.equals(source, that.source)
+            && java.util.Objects.equals(target, that.target);
+    }
+}
+
diff --git a/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java b/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java
new file mode 100644
index 00000000000..c2536d5db85
--- /dev/null
+++ b/connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.Configurable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Arrays;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+/** Unit tests for MirrorClient's topic-name based helpers, run against an in-memory topic list. */
+public class MirrorClientTest {
+
+    /** MirrorClient whose topic listing is served from a fixed list and which uses DefaultReplicationPolicy. */
+    private static class FakeMirrorClient extends MirrorClient {
+
+        List<String> topics;
+
+        FakeMirrorClient(List<String> topics) {
+            super(null, new DefaultReplicationPolicy(), null);
+            this.topics = topics;
+        }
+
+        FakeMirrorClient() {
+            this(Collections.emptyList());
+        }
+
+        @Override
+        protected Set<String> listTopics() {
+            return new HashSet<>(topics);
+        }
+    }
+
+    @Test
+    public void testIsHeartbeatTopic() throws InterruptedException, TimeoutException {
+        MirrorClient client = new FakeMirrorClient();
+        assertTrue(client.isHeartbeatTopic("heartbeats"));
+        assertTrue(client.isHeartbeatTopic("source1.heartbeats"));
+        assertTrue(client.isHeartbeatTopic("source2.source1.heartbeats"));
+        assertFalse(client.isHeartbeatTopic("heartbeats!"));
+        assertFalse(client.isHeartbeatTopic("!heartbeats"));
+        assertFalse(client.isHeartbeatTopic("source1heartbeats"));
+        assertFalse(client.isHeartbeatTopic("source1-heartbeats"));
+    }
+
+    @Test
+    public void testIsCheckpointTopic() throws InterruptedException, TimeoutException {
+        MirrorClient client = new FakeMirrorClient();
+        assertTrue(client.isCheckpointTopic("source1.checkpoints.internal"));
+        assertFalse(client.isCheckpointTopic("checkpoints.internal"));
+        assertFalse(client.isCheckpointTopic("checkpoints-internal"));
+        assertFalse(client.isCheckpointTopic("checkpoints.internal!"));
+        assertFalse(client.isCheckpointTopic("!checkpoints.internal"));
+        assertFalse(client.isCheckpointTopic("source1checkpointsinternal"));
+    }
+
+    @Test
+    public void countHopsForTopicTest() throws InterruptedException, TimeoutException {
+        MirrorClient client = new FakeMirrorClient();
+        assertEquals(-1, client.countHopsForTopic("topic", "source"));
+        assertEquals(-1, client.countHopsForTopic("source", "source"));
+        assertEquals(-1, client.countHopsForTopic("sourcetopic", "source"));
+        assertEquals(-1, client.countHopsForTopic("source1.topic", "source2"));
+        assertEquals(1, client.countHopsForTopic("source1.topic", "source1"));
+        assertEquals(1, client.countHopsForTopic("source2.source1.topic", "source2"));
+        assertEquals(2, client.countHopsForTopic("source2.source1.topic", "source1"));
+        assertEquals(3, client.countHopsForTopic("source3.source2.source1.topic", "source1"));
+        assertEquals(-1, client.countHopsForTopic("source3.source2.source1.topic", "source4"));
+    }
+
+    @Test
+    public void heartbeatTopicsTest() throws InterruptedException, TimeoutException {
+        MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "heartbeats",
+            "source1.heartbeats", "source2.source1.heartbeats", "source3.heartbeats"));
+        Set<String> heartbeatTopics = client.heartbeatTopics();
+        // expected value goes first so a failure message reports expected/actual correctly
+        assertEquals(new HashSet<>(Arrays.asList("heartbeats", "source1.heartbeats",
+            "source2.source1.heartbeats", "source3.heartbeats")), heartbeatTopics);
+    }
+
+    @Test
+    public void checkpointsTopicsTest() throws InterruptedException, TimeoutException {
+        MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "checkpoints.internal",
+            "source1.checkpoints.internal", "source2.source1.checkpoints.internal", "source3.checkpoints.internal"));
+        Set<String> checkpointTopics = client.checkpointTopics();
+        assertEquals(new HashSet<>(Arrays.asList("source1.checkpoints.internal",
+            "source2.source1.checkpoints.internal", "source3.checkpoints.internal")), checkpointTopics);
+    }
+
+    @Test
+    public void replicationHopsTest() throws InterruptedException, TimeoutException {
+        MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "heartbeats",
+            "source1.heartbeats", "source1.source2.heartbeats", "source3.heartbeats"));
+        assertEquals(1, client.replicationHops("source1"));
+        assertEquals(2, client.replicationHops("source2"));
+        assertEquals(1, client.replicationHops("source3"));
+        assertEquals(-1, client.replicationHops("source4"));
+    }
+
+    @Test
+    public void upstreamClustersTest() throws InterruptedException {
+        MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "heartbeats",
+            "source1.heartbeats", "source1.source2.heartbeats", "source3.source4.source5.heartbeats"));
+        Set<String> sources = client.upstreamClusters();
+        assertTrue(sources.contains("source1"));
+        assertTrue(sources.contains("source2"));
+        assertTrue(sources.contains("source3"));
+        assertTrue(sources.contains("source4"));
+        assertTrue(sources.contains("source5"));
+        assertFalse(sources.contains("sourceX"));
+        assertFalse(sources.contains(""));
+        assertFalse(sources.contains(null));
+    }
+
+    @Test
+    public void remoteTopicsTest() throws InterruptedException {
+        MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "topic3",
+            "source1.topic4", "source1.source2.topic5", "source3.source4.source5.topic6"));
+        Set<String> remoteTopics = client.remoteTopics();
+        assertFalse(remoteTopics.contains("topic1"));
+        assertFalse(remoteTopics.contains("topic2"));
+        assertFalse(remoteTopics.contains("topic3"));
+        assertTrue(remoteTopics.contains("source1.topic4"));
+        assertTrue(remoteTopics.contains("source1.source2.topic5"));
+        assertTrue(remoteTopics.contains("source3.source4.source5.topic6"));
+    }
+
+    @Test
+    public void remoteTopicsSeparatorTest() throws InterruptedException {
+        MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "topic3",
+            "source1__topic4", "source1__source2__topic5", "source3__source4__source5__topic6"));
+        // reconfigure the policy so "__" rather than the default separator splits source from topic
+        ((Configurable) client.replicationPolicy()).configure(
+            Collections.singletonMap("replication.policy.separator", "__"));
+        Set<String> remoteTopics = client.remoteTopics();
+        assertFalse(remoteTopics.contains("topic1"));
+        assertFalse(remoteTopics.contains("topic2"));
+        assertFalse(remoteTopics.contains("topic3"));
+        assertTrue(remoteTopics.contains("source1__topic4"));
+        assertTrue(remoteTopics.contains("source1__source2__topic5"));
+        assertTrue(remoteTopics.contains("source3__source4__source5__topic6"));
+    }
+
+}
diff --git a/connect/mirror/README.md b/connect/mirror/README.md
new file mode 100644
index 00000000000..68e3536d94c
--- /dev/null
+++ b/connect/mirror/README.md
@@ -0,0 +1,222 @@
+
+# MirrorMaker 2.0
+
+MM2 leverages the Connect framework to replicate topics between Kafka
+clusters. MM2 includes several new features, including:
+
+ - both topics and consumer groups are replicated
+ - topic configuration and ACLs are replicated
+ - cross-cluster offsets are synchronized
+ - partitioning is preserved
+
+## Replication flows
+
+MM2 replicates topics and consumer groups from upstream source clusters
+to downstream target clusters. These directional flows are notated
+`A->B`.
+
+It's possible to create complex replication topologies based on these
+`source->target` flows, including:
+
+ - *fan-out*, e.g. `K->A, K->B, K->C`
+ - *aggregation*, e.g. `A->K, B->K, C->K`
+ - *active/active*, e.g. `A->B, B->A`
+
+Each replication flow can be configured independently, e.g. to replicate
+specific topics or groups:
+
+ A->B.topics = topic-1, topic-2
+ A->B.groups = group-1, group-2
+
+By default, all topics and consumer groups are replicated (except
+blacklisted ones), across all enabled replication flows. Each
+replication flow must be explicitly enabled to begin replication:
+
+ A->B.enabled = true
+ B->A.enabled = true
+
+## Starting an MM2 process
+
+You can run any number of MM2 processes as needed. Any MM2 processes
+which are configured to replicate the same Kafka clusters will find each
+other, share configuration, load balance, etc.
+
+To start an MM2 process, first specify Kafka cluster information in a
+configuration file as follows:
+
+ # mm2.properties
+ clusters = us-west, us-east
+ us-west.bootstrap.servers = host1:9092
+ us-east.bootstrap.servers = host2:9092
+
+You can list any number of clusters this way.
+
+Optionally, you can override default MirrorMaker properties:
+
+ topics = .*
+ groups = group1, group2
+ emit.checkpoints.interval.seconds = 10
+
+These will apply to all replication flows. You can also override default
+properties for specific clusters or replication flows:
+
+ # configure a specific cluster
+ us-west.offset.storage.topic = mm2-offsets
+
+ # configure a specific source->target replication flow
+ us-west->us-east.emit.heartbeats = false
+
+Next, enable individual replication flows as follows:
+
+ us-west->us-east.enabled = true # disabled by default
+
+Finally, launch one or more MirrorMaker processes with the `connect-mirror-maker.sh`
+script:
+
+ $ ./bin/connect-mirror-maker.sh mm2.properties
+
+## Multicluster environments
+
+MM2 supports replication between multiple Kafka clusters, whether in the
+same data center or across multiple data centers. A single MM2 cluster
+can span multiple data centers, but it is recommended to keep MM2's producers
+as close as possible to their target clusters. To do so, specify a subset
+of clusters for each MM2 node as follows:
+
+ # in west DC:
+ $ ./bin/connect-mirror-maker.sh mm2.properties --clusters west-1 west-2
+
+This signals to the node that the given clusters are nearby, and prevents the
+node from sending records or configuration to clusters in other data centers.
+
+### Example
+
+Say there are three data centers (west, east, north) with two Kafka
+clusters in each data center (west-1, west-2 etc). We can configure MM2
+for active/active replication within each data center, as well as cross data
+center replication (XDCR) as follows:
+
+ # mm2.properties
+ clusters: west-1, west-2, east-1, east-2, north-1, north-2
+
+ west-1.bootstrap.servers = ...
+ ---%<---
+
+ # active/active in west
+ west-1->west-2.enabled = true
+ west-2->west-1.enabled = true
+
+ # active/active in east
+ east-1->east-2.enabled = true
+ east-2->east-1.enabled = true
+
+ # active/active in north
+ north-1->north-2.enabled = true
+ north-2->north-1.enabled = true
+
+ # XDCR via west-1, east-1, north-1
+ west-1->east-1.enabled = true
+ west-1->north-1.enabled = true
+ east-1->west-1.enabled = true
+ east-1->north-1.enabled = true
+ north-1->west-1.enabled = true
+ north-1->east-1.enabled = true
+
+Then, launch MM2 in each data center as follows:
+
+ # in west:
+ $ ./bin/connect-mirror-maker.sh mm2.properties --clusters west-1 west-2
+
+ # in east:
+ $ ./bin/connect-mirror-maker.sh mm2.properties --clusters east-1 east-2
+
+ # in north:
+ $ ./bin/connect-mirror-maker.sh mm2.properties --clusters north-1 north-2
+
+With this configuration, records produced to any cluster will be replicated
+within the data center, as well as across to other data centers. By providing
+the `--clusters` parameter, we ensure that each node only produces records to
+nearby clusters.
+
+N.B. that the `--clusters` parameter is not technically required here. MM2 will work fine without it; however, throughput may suffer from "producer lag" between
+data centers, and you may incur unnecessary data transfer costs.
+
+## Shared configuration
+
+MM2 processes share configuration via their target Kafka clusters.
+For example, the following two processes would be racy:
+
+ # process1:
+ A->B.enabled = true
+ A->B.topics = foo
+
+ # process2:
+ A->B.enabled = true
+ A->B.topics = bar
+
+In this case, the two processes will share configuration via cluster `B`.
+Depending on which process is elected "leader", the result will be
+that either `foo` or `bar` is replicated -- but not both. For this reason,
+it is important to keep configuration consistent across flows to the same
+target cluster. In most cases, your entire organization should use a single
+MM2 configuration file.
+
+## Remote topics
+
+MM2 employs a naming convention to ensure that records from different
+clusters are not written to the same partition. By default, replicated
+topics are renamed based on "source cluster aliases":
+
+ topic-1 --> source.topic-1
+
+This can be customized by overriding the `replication.policy.separator`
+property (default is a period). If you need more control over how
+remote topics are defined, you can implement a custom `ReplicationPolicy`
+and override `replication.policy.class` (default is
+`DefaultReplicationPolicy`).
+
+## Monitoring an MM2 process
+
+MM2 is built on the Connect framework and inherits all of Connect's metrics, e.g.
+`source-record-poll-rate`. In addition, MM2 produces its own metrics under the
+`kafka.connect.mirror` metric group. Metrics are tagged with the following properties:
+
+ - *target*: alias of target cluster
+ - *source*: alias of source cluster
+ - *topic*: remote topic on target cluster
+ - *partition*: partition being replicated
+
+Metrics are tracked for each *remote* topic. The source cluster can be inferred
+from the topic name. For example, replicating `topic1` from `A->B` will yield metrics
+like:
+
+ - `target=B`
+ - `topic=A.topic1`
+ - `partition=1`
+
+The following metrics are emitted:
+
+ # MBean: kafka.connect.mirror:type=MirrorSourceConnector,target=([-.\w]+),topic=([-.\w]+),partition=([0-9]+)
+
+ record-count # number of records replicated source -> target
+ record-age-ms # age of records when they are replicated
+ record-age-ms-min
+ record-age-ms-max
+ record-age-ms-avg
+ replication-latency-ms # time it takes records to propagate source->target
+ replication-latency-ms-min
+ replication-latency-ms-max
+ replication-latency-ms-avg
+ byte-rate # average number of bytes/sec in replicated records
+
+
+ # MBean: kafka.connect.mirror:type=MirrorCheckpointConnector,source=([-.\w]+),target=([-.\w]+)
+
+ checkpoint-latency-ms # time it takes to replicate consumer offsets
+ checkpoint-latency-ms-min
+ checkpoint-latency-ms-max
+ checkpoint-latency-ms-avg
+
+These metrics do not discern between created-at and log-append timestamps.
+
+
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/ConfigPropertyFilter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/ConfigPropertyFilter.java
new file mode 100644
index 00000000000..ec6b3b91071
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/ConfigPropertyFilter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import java.util.Map;
+
+/** Defines which topic configuration properties should be replicated. */
+@InterfaceStability.Evolving
+public interface ConfigPropertyFilter extends Configurable, AutoCloseable {
+
+    /** Decides whether the topic configuration property with the given name should be replicated. */
+    boolean shouldReplicateConfigProperty(String prop);
+
+    /** Does nothing by default; declared without a throws clause so callers need not handle checked exceptions. */
+    @Override
+    default void close() {
+        //nop
+    }
+
+    /** Does nothing by default. */
+    @Override
+    default void configure(Map<String, ?> props) {
+        //nop
+    }
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java
new file mode 100644
index 00000000000..f51db1cbd7d
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultConfigPropertyFilter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * Uses a blacklist of property names or regexes.
+ *
+ * <p>Until {@link #configure(Map)} is called, the filter applies {@link #CONFIG_PROPERTIES_BLACKLIST_DEFAULT}.
+ */
+public class DefaultConfigPropertyFilter implements ConfigPropertyFilter {
+
+    public static final String CONFIG_PROPERTIES_BLACKLIST_CONFIG = "config.properties.blacklist";
+    private static final String CONFIG_PROPERTIES_BLACKLIST_DOC = "List of topic configuration properties and/or regexes "
+            + "that should not be replicated.";
+    public static final String CONFIG_PROPERTIES_BLACKLIST_DEFAULT = "follower\\.replication\\.throttled\\.replicas, "
+            + "leader\\.replication\\.throttled\\.replicas, "
+            + "message\\.timestamp\\.difference\\.max\\.ms, "
+            + "message\\.timestamp\\.type, "
+            + "unclean\\.leader\\.election\\.enable, "
+            + "min\\.insync\\.replicas";
+
+    // Seeded from the default blacklist so the filter already works before configure() runs.
+    private Pattern blacklistPattern = MirrorUtils.compilePatternList(CONFIG_PROPERTIES_BLACKLIST_DEFAULT);
+
+    /**
+     * Replaces the active blacklist with the one parsed from {@code props}.
+     *
+     * @param props filter configuration; the blacklist is read from {@code config.properties.blacklist}
+     */
+    @Override
+    public void configure(Map<String, ?> props) {
+        ConfigPropertyFilterConfig config = new ConfigPropertyFilterConfig(props);
+        blacklistPattern = config.blacklistPattern();
+    }
+
+    /** Nothing to release. */
+    @Override
+    public void close() {
+    }
+
+    // A null pattern blacklists nothing; otherwise the whole property name must match (Matcher.matches()).
+    private boolean blacklisted(String prop) {
+        return blacklistPattern != null && blacklistPattern.matcher(prop).matches();
+    }
+
+    /**
+     * Replicates every property that is not blacklisted.
+     *
+     * @param prop name of the topic configuration property
+     * @return {@code true} unless {@code prop} matches the blacklist
+     */
+    @Override
+    public boolean shouldReplicateConfigProperty(String prop) {
+        return !blacklisted(prop);
+    }
+
+    /** Declares and parses the single blacklist setting used by the enclosing filter. */
+    static class ConfigPropertyFilterConfig extends AbstractConfig {
+
+        static final ConfigDef DEF = new ConfigDef()
+            .define(CONFIG_PROPERTIES_BLACKLIST_CONFIG,
+                Type.LIST,
+                CONFIG_PROPERTIES_BLACKLIST_DEFAULT,
+                Importance.HIGH,
+                CONFIG_PROPERTIES_BLACKLIST_DOC);
+
+        ConfigPropertyFilterConfig(Map<?, ?> props) {
+            // Third argument is AbstractConfig's doLog flag; false is passed here.
+            super(DEF, props, false);
+        }
+
+        // Hands the parsed list entries to MirrorUtils.compilePatternList to build the Pattern.
+        Pattern blacklistPattern() {
+            return MirrorUtils.compilePatternList(getList(CONFIG_PROPERTIES_BLACKLIST_CONFIG));
+        }
+    }
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultGroupFilter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultGroupFilter.java
new file mode 100644
index 00000000000..acf5115236d
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultGroupFilter.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * Uses a whitelist and blacklist.
+ *
+ * <p>Both patterns are unset until {@link #configure(Map)} is called; until then no group is whitelisted, so
+ * {@link #shouldReplicateGroup(String)} returns {@code false}.
+ */
+public class DefaultGroupFilter implements GroupFilter {
+
+    public static final String GROUPS_WHITELIST_CONFIG = "groups";
+    private static final String GROUPS_WHITELIST_DOC = "List of consumer group names and/or regexes to replicate.";
+    public static final String GROUPS_WHITELIST_DEFAULT = ".*";
+
+    public static final String GROUPS_BLACKLIST_CONFIG = "groups.blacklist";
+    private static final String GROUPS_BLACKLIST_DOC = "List of consumer group names and/or regexes that should not be replicated.";
+    public static final String GROUPS_BLACKLIST_DEFAULT = "console-consumer-.*, connect-.*, __.*";
+
+    private Pattern whitelistPattern;
+    private Pattern blacklistPattern;
+
+    /**
+     * Parses the whitelist and blacklist from {@code props} and makes them the active patterns.
+     *
+     * @param props filter configuration; read from {@code groups} and {@code groups.blacklist}
+     */
+    @Override
+    public void configure(Map<String, ?> props) {
+        GroupFilterConfig config = new GroupFilterConfig(props);
+        whitelistPattern = config.whitelistPattern();
+        blacklistPattern = config.blacklistPattern();
+    }
+
+    /** Nothing to release. */
+    @Override
+    public void close() {
+    }
+
+    // A null pattern whitelists nothing; otherwise the whole group name must match (Matcher.matches()).
+    private boolean whitelisted(String group) {
+        return whitelistPattern != null && whitelistPattern.matcher(group).matches();
+    }
+
+    // A null pattern blacklists nothing; otherwise the whole group name must match (Matcher.matches()).
+    private boolean blacklisted(String group) {
+        return blacklistPattern != null && blacklistPattern.matcher(group).matches();
+    }
+
+    /**
+     * Replicates groups that match the whitelist and do not match the blacklist.
+     *
+     * @param group consumer group name
+     * @return {@code true} if {@code group} is whitelisted and not blacklisted
+     */
+    @Override
+    public boolean shouldReplicateGroup(String group) {
+        return whitelisted(group) && !blacklisted(group);
+    }
+
+    /** Declares and parses the whitelist and blacklist settings used by the enclosing filter. */
+    static class GroupFilterConfig extends AbstractConfig {
+
+        static final ConfigDef DEF = new ConfigDef()
+            .define(GROUPS_WHITELIST_CONFIG,
+                Type.LIST,
+                GROUPS_WHITELIST_DEFAULT,
+                Importance.HIGH,
+                GROUPS_WHITELIST_DOC)
+            .define(GROUPS_BLACKLIST_CONFIG,
+                Type.LIST,
+                GROUPS_BLACKLIST_DEFAULT,
+                Importance.HIGH,
+                GROUPS_BLACKLIST_DOC);
+
+        GroupFilterConfig(Map<?, ?> props) {
+            // Third argument is AbstractConfig's doLog flag; false is passed here.
+            super(DEF, props, false);
+        }
+
+        // Hands the parsed whitelist entries to MirrorUtils.compilePatternList to build the Pattern.
+        Pattern whitelistPattern() {
+            return MirrorUtils.compilePatternList(getList(GROUPS_WHITELIST_CONFIG));
+        }
+
+        // Hands the parsed blacklist entries to MirrorUtils.compilePatternList to build the Pattern.
+        Pattern blacklistPattern() {
+            return MirrorUtils.compilePatternList(getList(GROUPS_BLACKLIST_CONFIG));
+        }
+    }
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java
new file mode 100644
index 00000000000..308bdbfd8a8
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/DefaultTopicFilter.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * Uses a whitelist and blacklist.
+ *
+ * <p>Both patterns are unset until {@link #configure(Map)} is called; until then no topic is whitelisted, so
+ * {@link #shouldReplicateTopic(String)} returns {@code false}.
+ */
+public class DefaultTopicFilter implements TopicFilter {
+
+    public static final String TOPICS_WHITELIST_CONFIG = "topics";
+    private static final String TOPICS_WHITELIST_DOC = "List of topics and/or regexes to replicate.";
+    public static final String TOPICS_WHITELIST_DEFAULT = ".*";
+
+    public static final String TOPICS_BLACKLIST_CONFIG = "topics.blacklist";
+    private static final String TOPICS_BLACKLIST_DOC = "List of topics and/or regexes that should not be replicated.";
+    public static final String TOPICS_BLACKLIST_DEFAULT = ".*[\\-\\.]internal, .*\\.replica, __.*";
+
+    private Pattern whitelistPattern;
+    private Pattern blacklistPattern;
+
+    /**
+     * Parses the whitelist and blacklist from {@code props} and makes them the active patterns.
+     *
+     * @param props filter configuration; read from {@code topics} and {@code topics.blacklist}
+     */
+    @Override
+    public void configure(Map<String, ?> props) {
+        TopicFilterConfig config = new TopicFilterConfig(props);
+        whitelistPattern = config.whitelistPattern();
+        blacklistPattern = config.blacklistPattern();
+    }
+
+    /** Nothing to release. */
+    @Override
+    public void close() {
+    }
+
+    // A null pattern whitelists nothing; otherwise the whole topic name must match (Matcher.matches()).
+    private boolean whitelisted(String topic) {
+        return whitelistPattern != null && whitelistPattern.matcher(topic).matches();
+    }
+
+    // A null pattern blacklists nothing; otherwise the whole topic name must match (Matcher.matches()).
+    private boolean blacklisted(String topic) {
+        return blacklistPattern != null && blacklistPattern.matcher(topic).matches();
+    }
+
+    /**
+     * Replicates topics that match the whitelist and do not match the blacklist.
+     *
+     * @param topic topic name
+     * @return {@code true} if {@code topic} is whitelisted and not blacklisted
+     */
+    @Override
+    public boolean shouldReplicateTopic(String topic) {
+        return whitelisted(topic) && !blacklisted(topic);
+    }
+
+    /** Declares and parses the whitelist and blacklist settings used by the enclosing filter. */
+    static class TopicFilterConfig extends AbstractConfig {
+
+        static final ConfigDef DEF = new ConfigDef()
+            .define(TOPICS_WHITELIST_CONFIG,
+                Type.LIST,
+                TOPICS_WHITELIST_DEFAULT,
+                Importance.HIGH,
+                TOPICS_WHITELIST_DOC)
+            .define(TOPICS_BLACKLIST_CONFIG,
+                Type.LIST,
+                TOPICS_BLACKLIST_DEFAULT,
+                Importance.HIGH,
+                TOPICS_BLACKLIST_DOC);
+
+        TopicFilterConfig(Map<?, ?> props) {
+            // Third argument is AbstractConfig's doLog flag; false is passed here.
+            super(DEF, props, false);
+        }
+
+        // Hands the parsed whitelist entries to MirrorUtils.compilePatternList to build the Pattern.
+        Pattern whitelistPattern() {
+            return MirrorUtils.compilePatternList(getList(TOPICS_WHITELIST_CONFIG));
+        }
+
+        // Hands the parsed blacklist entries to MirrorUtils.compilePatternList to build the Pattern.
+        Pattern blacklistPattern() {
+            return MirrorUtils.compilePatternList(getList(TOPICS_BLACKLIST_CONFIG));
+        }
+    }
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/GroupFilter.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/GroupFilter.java
new file mode 100644
index 00000000000..0202dd5d2b3
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/GroupFilter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import java.util.Map;
+
+/**
+ * Defines which consumer groups should be replicated.
+ *
+ * <p>Implementations must provide {@link #shouldReplicateGroup(String)}; {@link #close()} and
+ * {@link #configure(Map)} default to no-ops.
+ */
+@InterfaceStability.Evolving
+public interface GroupFilter extends Configurable, AutoCloseable {
+
+    /**
+     * Decides whether a single consumer group is replicated.
+     *
+     * @param group consumer group name
+     * @return {@code true} if the group should be replicated, {@code false} to skip it
+     */
+    boolean shouldReplicateGroup(String group);
+
+    /** No-op by default. Declared without {@code throws}, unlike {@link AutoCloseable#close()}. */
+    @Override
+    default void close() {
+        // no-op
+    }
+
+    /**
+     * No-op by default.
+     *
+     * @param props configuration properties handed to this filter
+     */
+    @Override
+    default void configure(Map<String, ?> props) {
+        // no-op
+    }
+}
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
new file mode 100644
index 00000000000..a358584dff0
--- /dev/null
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ConsumerGroupListing;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.util.ConnectorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/** Replicate consumer group state between clusters. Emits checkpoint records.
+ *
+ * @see MirrorConnectorConfig for supported config properties.
+ */
+public class MirrorCheckpointConnector extends SourceConnector {
+
+    private static final Logger log = LoggerFactory.getLogger(MirrorCheckpointConnector.class);
+
+    private Scheduler scheduler;                // created in start(), closed in stop()
+    private MirrorConnectorConfig config;       // parsed from the props passed to start()
+    private GroupFilter groupFilter;            // obtained from config in start(), closed in stop()
+    private AdminClient sourceAdminClient;      // built from config.sourceAdminConfig(), closed in stop()
+    private SourceAndTarget sourceAndTarget;    // source and target cluster aliases taken from config
+    private String connectorName;               // config.connectorName(); used in the start() log lines
+    private List<String> knownConsumerGroups = Collections.emptyList(); // starts empty; NOTE(review): element type restored as String (group names) - confirm
+
+    /** Parses {@code props} and, when the connector is enabled, opens its clients and schedules group refreshes. */
+    @Override
+    public void start(Map<String, String> props) {
+        config = new MirrorConnectorConfig(props);
+        if (!config.enabled()) {
+            return; // disabled: nothing below is created, and stop() repeats this check before closing
+        }
+        connectorName = config.connectorName();
+        sourceAndTarget = new SourceAndTarget(config.sourceClusterAlias(), config.targetClusterAlias());
+        groupFilter = config.groupFilter();
+        sourceAdminClient = AdminClient.create(config.sourceAdminConfig());
+        scheduler = new Scheduler(MirrorCheckpointConnector.class, config.adminTimeout());
+        scheduler.execute(this::createInternalTopics, "creating internal topics");
+        scheduler.execute(this::loadInitialConsumerGroups, "loading initial consumer groups");
+        scheduler.scheduleRepeatingDelayed(this::refreshConsumerGroups, config.refreshGroupsInterval(), "refreshing consumer groups");
+        log.info("Started {} with {} consumer groups.", connectorName, knownConsumerGroups.size());
+        log.debug("Started {} with consumer groups: {}", connectorName, knownConsumerGroups);
+    }
+
+    /** Closes what start() opened; a disabled connector opened nothing, so there is nothing to close. */
+    @Override
+    public void stop() {
+        if (config.enabled()) {
+            Utils.closeQuietly(scheduler, "scheduler");
+            Utils.closeQuietly(groupFilter, "group filter");
+            Utils.closeQuietly(sourceAdminClient, "source admin client");
+        }
+    }
+
+    @Override // names the Task implementation paired with this connector
+    public Class<? extends Task> taskClass() {
+        return MirrorCheckpointTask.class;
+    }
+
+ // divide consumer groups among tasks
+ @Override
+ public List