KAFKA-9726: Add IdentityReplicationPolicy to MirrorMaker2 (#10652)

This new policy enables active/passive, one-way replication without renaming topics, similar to MM1. The implementation is described in KIP-382 (adopted), where it was originally proposed as "LegacyReplicationPolicy".

This enables operators to migrate from MM1 to MM2 without re-architecting their replication flows, and unlocks additional use-cases for MM2. For example, operators may wish to "upgrade" their Kafka clusters by mirroring everything to a completely new cluster. Previously, such a migration would have been difficult with either MM1 or MM2.

When using IdentityReplicationPolicy, operators should be aware that MM2 cannot detect cycles among replicated topics. A misconfigured topology may result in the same records being replicated back and forth in an infinite loop. However, we don't prevent this behavior, as some use-cases involve filtering records (via SMTs) to break such cycles.
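
For illustration, enabling the policy is a one-line configuration change. The sketch below builds the property map MM2 accepts, mirroring the integration test added in this commit; the cluster aliases and topic filter are examples, not defaults:

import java.util.HashMap;
import java.util.Map;

public class IdentityPolicyConfigSketch {
    public static void main(String[] args) {
        // Hypothetical one-way flow "primary" -> "backup"; aliases are examples.
        Map<String, String> mm2Props = new HashMap<>();
        mm2Props.put("clusters", "primary, backup");
        mm2Props.put("primary->backup.enabled", "true");
        mm2Props.put("backup->primary.enabled", "false");
        mm2Props.put("topics", ".*");
        // The one line that switches MM2 to identity (no-rename) replication:
        mm2Props.put("replication.policy.class",
                "org.apache.kafka.connect.mirror.IdentityReplicationPolicy");
        mm2Props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}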

Reviewers: Mickael Maison <mickael.maison@gmail.com>

Co-authored-by: Ryanne Dolan <rdolan@twitter.com>
Co-authored-by: Matthew de Detrich <mdedetrich@gmail.com>
Co-authored-by: Ivan Yurchenko <ivanyu@aiven.io>
Ryanne Dolan 2021-07-01 03:21:27 -05:00 committed by GitHub
parent b4e45cd0d2
commit 93f57370c6
11 changed files with 502 additions and 36 deletions


@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.mirror;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** IdentityReplicationPolicy does not rename remote topics. This is useful for migrating
* from legacy MM1, or for any use-case involving one-way replication.
*
* N.B. MirrorMaker is not able to prevent cycles when using this class, so take care that
* your replication topology is acyclic. If migrating from MirrorMaker v1, this will likely
* already be the case.
*/
public class IdentityReplicationPolicy extends DefaultReplicationPolicy {
private static final Logger log = LoggerFactory.getLogger(IdentityReplicationPolicy.class);
public static final String SOURCE_CLUSTER_ALIAS_CONFIG = "source.cluster.alias";
private String sourceClusterAlias = null;
@Override
public void configure(Map<String, ?> props) {
super.configure(props);
if (props.containsKey(SOURCE_CLUSTER_ALIAS_CONFIG)) {
sourceClusterAlias = (String) props.get(SOURCE_CLUSTER_ALIAS_CONFIG);
log.info("Using source cluster alias `{}`.", sourceClusterAlias);
}
}
/** Unlike DefaultReplicationPolicy, IdentityReplicationPolicy does not include the source
* cluster alias in the remote topic name. Instead, topic names are unchanged.
*
* In the special case of heartbeats, we defer to DefaultReplicationPolicy.
*/
@Override
public String formatRemoteTopic(String sourceClusterAlias, String topic) {
if (looksLikeHeartbeat(topic)) {
return super.formatRemoteTopic(sourceClusterAlias, topic);
} else {
return topic;
}
}
/** Unlike DefaultReplicationPolicy, IdentityReplicationPolicy cannot know the source of
* a remote topic based on its name alone. If `source.cluster.alias` is provided,
* `topicSource` will return that.
*
* In the special case of heartbeats, we defer to DefaultReplicationPolicy.
*/
@Override
public String topicSource(String topic) {
if (looksLikeHeartbeat(topic)) {
return super.topicSource(topic);
} else {
return sourceClusterAlias;
}
}
/** Since any topic may be a "remote topic", this just returns `topic`.
*
* In the special case of heartbeats, we defer to DefaultReplicationPolicy.
*/
@Override
public String upstreamTopic(String topic) {
if (looksLikeHeartbeat(topic)) {
return super.upstreamTopic(topic);
} else {
return topic;
}
}
private boolean looksLikeHeartbeat(String topic) {
return topic != null && topic.endsWith(MirrorClientConfig.HEARTBEATS_TOPIC);
}
}
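
To make the contract above concrete, here is a small standalone sketch; the expected outputs in the comments follow directly from the methods in this file:

import java.util.Collections;
import org.apache.kafka.connect.mirror.IdentityReplicationPolicy;

public class IdentityPolicyBehaviorSketch {
    public static void main(String[] args) {
        IdentityReplicationPolicy policy = new IdentityReplicationPolicy();
        policy.configure(Collections.singletonMap(
                IdentityReplicationPolicy.SOURCE_CLUSTER_ALIAS_CONFIG, "primary"));
        // Regular topics keep their names on the target cluster...
        System.out.println(policy.formatRemoteTopic("primary", "orders")); // orders
        // ...and their source is whatever alias was configured, not parsed from the name.
        System.out.println(policy.topicSource("orders")); // primary
        System.out.println(policy.upstreamTopic("orders")); // orders
        // Heartbeats defer to DefaultReplicationPolicy and are prefixed as usual.
        System.out.println(policy.formatRemoteTopic("primary", "heartbeats")); // primary.heartbeats
        System.out.println(policy.topicSource("primary.heartbeats")); // primary
    }
}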


@@ -192,6 +192,7 @@ public class MirrorClient implements AutoCloseable {
int countHopsForTopic(String topic, String sourceClusterAlias) {
int hops = 0;
Set<String> visited = new HashSet<>();
while (true) {
hops++;
String source = replicationPolicy.topicSource(topic);
@@ -201,6 +202,12 @@ public class MirrorClient implements AutoCloseable {
if (source.equals(sourceClusterAlias)) {
return hops;
}
if (visited.contains(source)) {
// Extra check for IdentityReplicationPolicy and similar impls that cannot prevent cycles.
// We assume we're stuck in a cycle and will never find sourceClusterAlias.
return -1;
}
visited.add(source);
topic = replicationPolicy.upstreamTopic(topic);
}
}
@@ -223,7 +230,8 @@ public class MirrorClient implements AutoCloseable {
Set<String> allSources(String topic) {
Set<String> sources = new HashSet<>();
String source = replicationPolicy.topicSource(topic);
-while (source != null) {
+while (source != null && !sources.contains(source)) {
// The extra Set.contains above is for ReplicationPolicies that cannot prevent cycles.
sources.add(source);
topic = replicationPolicy.upstreamTopic(topic);
source = replicationPolicy.topicSource(topic);
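
The failure mode these guards prevent is easiest to see with a stripped-down sketch. The stub policy below mimics an identity policy (source is always the configured alias, upstream never changes); the method names are local to the sketch, not the MirrorClient API:

import java.util.HashSet;
import java.util.Set;

public class HopCountSketch {
    // Identity-style policy stub: source is always "primary", upstream is unchanged.
    static String topicSource(String topic) { return "primary"; }
    static String upstreamTopic(String topic) { return topic; }

    static int countHops(String topic, String sourceClusterAlias) {
        int hops = 0;
        Set<String> visited = new HashSet<>();
        while (true) {
            hops++;
            String source = topicSource(topic);
            if (source.equals(sourceClusterAlias)) {
                return hops;
            }
            if (visited.contains(source)) {
                return -1; // no new source discovered; the alias will never be found
            }
            visited.add(source);
            topic = upstreamTopic(topic);
        }
    }

    public static void main(String[] args) {
        System.out.println(countHops("orders", "primary")); // 1
        System.out.println(countHops("orders", "other"));   // -1, rather than an infinite loop
    }
}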


@@ -45,7 +45,7 @@ public interface ReplicationPolicy {
*/
default String originalTopic(String topic) {
String upstream = upstreamTopic(topic);
-if (upstream == null) {
+if (upstream == null || upstream.equals(topic)) {
return topic;
} else {
return originalTopic(upstream);
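
A short sketch of the difference this guard makes (expected outputs in comments): IdentityReplicationPolicy's upstreamTopic returns its input unchanged, which previously would have recursed here forever.

import org.apache.kafka.connect.mirror.DefaultReplicationPolicy;
import org.apache.kafka.connect.mirror.IdentityReplicationPolicy;

public class OriginalTopicSketch {
    public static void main(String[] args) {
        // DefaultReplicationPolicy strips source prefixes one hop at a time.
        System.out.println(new DefaultReplicationPolicy().originalTopic("a.b.orders")); // orders
        // IdentityReplicationPolicy's upstreamTopic is the identity function, so the
        // new upstream.equals(topic) check stops the recursion after one step.
        System.out.println(new IdentityReplicationPolicy().originalTopic("orders")); // orders
    }
}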


@@ -37,7 +37,11 @@ public class MirrorClientTest {
List<String> topics;
FakeMirrorClient(List<String> topics) {
-super(null, new DefaultReplicationPolicy(), null);
+this(new DefaultReplicationPolicy(), topics);
}
FakeMirrorClient(ReplicationPolicy replicationPolicy, List<String> topics) {
super(null, replicationPolicy, null);
this.topics = topics;
}
@@ -131,6 +135,23 @@ public class MirrorClientTest {
assertFalse(sources.contains(null));
}
@Test
public void testIdentityReplicationUpstreamClusters() throws InterruptedException {
// IdentityReplicationPolicy treats heartbeats as a special case, so these should work as usual.
MirrorClient client = new FakeMirrorClient(identityReplicationPolicy("source"), Arrays.asList("topic1",
"topic2", "heartbeats", "source1.heartbeats", "source1.source2.heartbeats",
"source3.source4.source5.heartbeats"));
Set<String> sources = client.upstreamClusters();
assertTrue(sources.contains("source1"));
assertTrue(sources.contains("source2"));
assertTrue(sources.contains("source3"));
assertTrue(sources.contains("source4"));
assertTrue(sources.contains("source5"));
assertFalse(sources.contains(""));
assertFalse(sources.contains(null));
assertEquals(5, sources.size());
}
@Test
public void remoteTopicsTest() throws InterruptedException {
MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "topic3",
@@ -144,6 +165,20 @@ public class MirrorClientTest {
assertTrue(remoteTopics.contains("source3.source4.source5.topic6"));
}
@Test
public void testIdentityReplicationRemoteTopics() throws InterruptedException {
// IdentityReplicationPolicy should consider any topic to be remote.
MirrorClient client = new FakeMirrorClient(identityReplicationPolicy("source"), Arrays.asList(
"topic1", "topic2", "topic3", "heartbeats", "backup.heartbeats"));
Set<String> remoteTopics = client.remoteTopics();
assertTrue(remoteTopics.contains("topic1"));
assertTrue(remoteTopics.contains("topic2"));
assertTrue(remoteTopics.contains("topic3"));
// Heartbeats are treated as a special case
assertFalse(remoteTopics.contains("heartbeats"));
assertTrue(remoteTopics.contains("backup.heartbeats"));
}
@Test
public void remoteTopicsSeparatorTest() throws InterruptedException {
MirrorClient client = new FakeMirrorClient(Arrays.asList("topic1", "topic2", "topic3",
@@ -159,4 +194,25 @@ public class MirrorClientTest {
assertTrue(remoteTopics.contains("source3__source4__source5__topic6"));
}
@Test
public void testIdentityReplicationTopicSource() {
MirrorClient client = new FakeMirrorClient(
identityReplicationPolicy("primary"), Arrays.asList());
assertEquals("topic1", client.replicationPolicy()
.formatRemoteTopic("primary", "topic1"));
assertEquals("primary", client.replicationPolicy()
.topicSource("topic1"));
// Heartbeats are handled as a special case
assertEquals("backup.heartbeats", client.replicationPolicy()
.formatRemoteTopic("backup", "heartbeats"));
assertEquals("backup", client.replicationPolicy()
.topicSource("backup.heartbeats"));
}
private ReplicationPolicy identityReplicationPolicy(String source) {
IdentityReplicationPolicy policy = new IdentityReplicationPolicy();
policy.configure(Collections.singletonMap(
IdentityReplicationPolicy.SOURCE_CLUSTER_ALIAS_CONFIG, source));
return policy;
}
}


@@ -175,6 +175,7 @@ public class MirrorMakerConfig extends AbstractConfig {
props.putAll(stringsWithPrefix("header.converter"));
props.putAll(stringsWithPrefix("task"));
props.putAll(stringsWithPrefix("worker"));
props.putAll(stringsWithPrefix("replication.policy"));
// transform any expression like ${provider:path:key}, since the worker doesn't do so
props = transform(props);
@@ -203,6 +204,7 @@ public class MirrorMakerConfig extends AbstractConfig {
props.keySet().retainAll(MirrorConnectorConfig.CONNECTOR_CONFIG_DEF.names());
props.putAll(stringsWithPrefix(CONFIG_PROVIDERS_CONFIG));
props.putAll(stringsWithPrefix("replication.policy"));
Map<String, String> sourceClusterProps = clusterProps(sourceAndTarget.source());
// attrs non prefixed with producer|consumer|admin
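
A hedged sketch of what these two hunks enable: a top-level replication.policy.* setting now reaches the derived worker (and connector) configs. MirrorMakerConfig and SourceAndTarget are the real classes; the cluster aliases are examples.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.mirror.MirrorMakerConfig;
import org.apache.kafka.connect.mirror.SourceAndTarget;

public class PolicyPropagationSketch {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("clusters", "a, b");
        props.put("a->b.enabled", "true");
        props.put("replication.policy.class",
                "org.apache.kafka.connect.mirror.IdentityReplicationPolicy");
        MirrorMakerConfig config = new MirrorMakerConfig(props);
        // With the stringsWithPrefix("replication.policy") calls above, the policy
        // setting should appear in the derived worker config:
        System.out.println(config.workerConfig(new SourceAndTarget("a", "b"))
                .get("replication.policy.class"));
    }
}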


@@ -492,7 +492,12 @@ public class MirrorSourceConnector extends SourceConnector {
} else if (source.equals(sourceAndTarget.target())) {
return true;
} else {
-return isCycle(replicationPolicy.upstreamTopic(topic));
+String upstreamTopic = replicationPolicy.upstreamTopic(topic);
+if (upstreamTopic.equals(topic)) {
+// Extra check for IdentityReplicationPolicy and similar impls that don't prevent cycles.
+return false;
+}
+return isCycle(upstreamTopic);
}
}


@@ -77,10 +77,32 @@ public class MirrorSourceConnectorTest {
assertFalse(connector.shouldReplicateTopic("target.topic1"), "should not allow cycles");
assertFalse(connector.shouldReplicateTopic("target.source.topic1"), "should not allow cycles");
assertFalse(connector.shouldReplicateTopic("source.target.topic1"), "should not allow cycles");
assertFalse(connector.shouldReplicateTopic("target.source.target.topic1"), "should not allow cycles");
assertFalse(connector.shouldReplicateTopic("source.target.source.topic1"), "should not allow cycles");
assertTrue(connector.shouldReplicateTopic("topic1"), "should allow anything else");
assertTrue(connector.shouldReplicateTopic("source.topic1"), "should allow anything else");
}
@Test
public void testIdentityReplication() {
MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),
new IdentityReplicationPolicy(), x -> true, x -> true);
assertTrue(connector.shouldReplicateTopic("target.topic1"), "should allow cycles");
assertTrue(connector.shouldReplicateTopic("target.source.topic1"), "should allow cycles");
assertTrue(connector.shouldReplicateTopic("source.target.topic1"), "should allow cycles");
assertTrue(connector.shouldReplicateTopic("target.source.target.topic1"), "should allow cycles");
assertTrue(connector.shouldReplicateTopic("source.target.source.topic1"), "should allow cycles");
assertTrue(connector.shouldReplicateTopic("topic1"), "should allow normal topics");
assertTrue(connector.shouldReplicateTopic("othersource.topic1"), "should allow normal topics");
assertFalse(connector.shouldReplicateTopic("target.heartbeats"), "should not allow heartbeat cycles");
assertFalse(connector.shouldReplicateTopic("target.source.heartbeats"), "should not allow heartbeat cycles");
assertFalse(connector.shouldReplicateTopic("source.target.heartbeats"), "should not allow heartbeat cycles");
assertFalse(connector.shouldReplicateTopic("target.source.target.heartbeats"), "should not allow heartbeat cycles");
assertFalse(connector.shouldReplicateTopic("source.target.source.heartbeats"), "should not allow heartbeat cycles");
assertTrue(connector.shouldReplicateTopic("heartbeats"), "should allow heartbeat topics");
assertTrue(connector.shouldReplicateTopic("othersource.heartbeats"), "should allow heartbeat topics");
}
@Test
public void testAclFiltering() {
MirrorSourceConnector connector = new MirrorSourceConnector(new SourceAndTarget("source", "target"),


@@ -0,0 +1,263 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.mirror.integration;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.IdentityReplicationPolicy;
import org.apache.kafka.connect.mirror.MirrorClient;
import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
import org.apache.kafka.connect.mirror.MirrorMakerConfig;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Tag;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.BeforeEach;
/**
* Tests MM2 replication and failover logic for {@link IdentityReplicationPolicy}.
*
* <p>MM2 is configured with active/passive replication between two Kafka clusters with {@link IdentityReplicationPolicy}.
* Tests validate that records sent to the primary cluster arrive at the backup cluster. Then, a consumer group is
* migrated from the primary cluster to the backup cluster. Tests validate that consumer offsets
* are translated and replicated from the primary cluster to the backup cluster during this failover.
*/
@Tag("integration")
public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrationBaseTest {
@BeforeEach
public void startClusters() throws Exception {
super.startClusters(new HashMap<String, String>() {{
put("replication.policy.class", IdentityReplicationPolicy.class.getName());
put("topics", "test-topic-.*");
put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false");
put(PRIMARY_CLUSTER_ALIAS + "->" + BACKUP_CLUSTER_ALIAS + ".enabled", "true");
}});
}
@Test
public void testReplication() throws Exception {
produceMessages(primary, "test-topic-1");
String consumerGroupName = "consumer-group-testReplication";
Map<String, Object> consumerProps = new HashMap<String, Object>() {{
put("group.id", consumerGroupName);
put("auto.offset.reset", "latest");
}};
// warm up consumers before starting the connectors so we don't need to wait for discovery
warmUpConsumer(consumerProps);
mm2Config = new MirrorMakerConfig(mm2Props);
waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
waitUntilMirrorMakerIsRunning(primary, Collections.singletonList(MirrorHeartbeatConnector.class), mm2Config, BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS);
MirrorClient primaryClient = new MirrorClient(mm2Config.clientConfig(PRIMARY_CLUSTER_ALIAS));
MirrorClient backupClient = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS));
// make sure the topic is auto-created in the other cluster
waitForTopicCreated(primary, "test-topic-1");
waitForTopicCreated(backup, "test-topic-1");
assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, getTopicConfig(backup.kafka(), "test-topic-1", TopicConfig.CLEANUP_POLICY_CONFIG),
"topic config was not synced");
assertEquals(NUM_RECORDS_PRODUCED, primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count(),
"Records were not produced to primary cluster.");
assertEquals(NUM_RECORDS_PRODUCED, backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count(),
"Records were not replicated to backup cluster.");
assertTrue(primary.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0,
"Heartbeats were not emitted to primary cluster.");
assertTrue(backup.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0,
"Heartbeats were not emitted to backup cluster.");
assertTrue(backup.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "primary.heartbeats").count() > 0,
"Heartbeats were not replicated downstream to backup cluster.");
assertTrue(primary.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0,
"Heartbeats were not replicated downstream to primary cluster.");
assertTrue(backupClient.upstreamClusters().contains(PRIMARY_CLUSTER_ALIAS), "Did not find upstream primary cluster.");
assertEquals(1, backupClient.replicationHops(PRIMARY_CLUSTER_ALIAS), "Did not calculate replication hops correctly.");
assertTrue(backup.kafka().consume(1, CHECKPOINT_DURATION_MS, "primary.checkpoints.internal").count() > 0,
"Checkpoints were not emitted downstream to backup cluster.");
Map<TopicPartition, OffsetAndMetadata> backupOffsets = backupClient.remoteConsumerOffsets(consumerGroupName, PRIMARY_CLUSTER_ALIAS,
Duration.ofMillis(CHECKPOINT_DURATION_MS));
assertTrue(backupOffsets.containsKey(
new TopicPartition("test-topic-1", 0)), "Offsets not translated downstream to backup cluster. Found: " + backupOffsets);
// Failover consumer group to backup cluster.
try (Consumer<byte[], byte[]> primaryConsumer = backup.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) {
primaryConsumer.assign(backupOffsets.keySet());
backupOffsets.forEach(primaryConsumer::seek);
primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
primaryConsumer.commitAsync();
assertTrue(primaryConsumer.position(new TopicPartition("test-topic-1", 0)) > 0, "Consumer failed over to zero offset.");
assertTrue(primaryConsumer.position(
new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PRODUCED, "Consumer failed over beyond expected offset.");
}
primaryClient.close();
backupClient.close();
// create more matching topics
primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
// make sure the topic is auto-created in the other cluster
waitForTopicCreated(backup, "test-topic-2");
// only produce messages to the first partition
produceMessages(primary, "test-topic-2", 1);
// expect the total number of consumed messages to equal NUM_RECORDS_PER_PARTITION
assertEquals(NUM_RECORDS_PER_PARTITION, primary.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-2").count(),
"Records were not produced to primary cluster.");
assertEquals(NUM_RECORDS_PER_PARTITION, backup.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "test-topic-2").count(),
"New topic was not replicated to backup cluster.");
}
@Test
public void testReplicationWithEmptyPartition() throws Exception {
String consumerGroupName = "consumer-group-testReplicationWithEmptyPartition";
Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
// create topic
String topic = "test-topic-with-empty-partition";
primary.kafka().createTopic(topic, NUM_PARTITIONS);
// produce to all of the topic's partitions, except the last one
produceMessages(primary, topic, NUM_PARTITIONS - 1);
// consume before starting the connectors so we don't need to wait for discovery
int expectedRecords = NUM_RECORDS_PER_PARTITION * (NUM_PARTITIONS - 1);
try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, topic)) {
waitForConsumingAllRecords(primaryConsumer, expectedRecords);
}
// one way replication from primary to backup
mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false");
mm2Config = new MirrorMakerConfig(mm2Props);
waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
// sleep a few seconds to let MM2 finish replication so that the "end" consumer will consume some records
Thread.sleep(TimeUnit.SECONDS.toMillis(3));
// note that with IdentityReplicationPolicy, topics on the backup are NOT renamed to PRIMARY_CLUSTER_ALIAS + "." + topic
String backupTopic = topic;
// consume all records from backup cluster
try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps,
backupTopic)) {
waitForConsumingAllRecords(backupConsumer, expectedRecords);
}
try (Admin backupClient = backup.kafka().createAdminClient()) {
// retrieve the consumer group offset from backup cluster
Map<TopicPartition, OffsetAndMetadata> remoteOffsets =
backupClient.listConsumerGroupOffsets(consumerGroupName).partitionsToOffsetAndMetadata().get();
// pinpoint the offset of the last partition, which received no records
OffsetAndMetadata offset = remoteOffsets.get(new TopicPartition(backupTopic, NUM_PARTITIONS - 1));
// offset of the last partition should exist, but its value should be 0
assertNotNull(offset, "Offset of last partition was not replicated");
assertEquals(0, offset.offset(), "Offset of last partition is not zero");
}
}
@Test
public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedException {
produceMessages(primary, "test-topic-1");
String consumerGroupName = "consumer-group-testOneWayReplicationWithAutoOffsetSync";
Map<String, Object> consumerProps = new HashMap<String, Object>() {{
put("group.id", consumerGroupName);
put("auto.offset.reset", "earliest");
}};
// create consumers before starting the connectors so we don't need to wait for discovery
try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps,
"test-topic-1")) {
// we need to wait until all records are consumed so that MM2 replicates the expected offsets
waitForConsumingAllRecords(primaryConsumer, NUM_RECORDS_PRODUCED);
}
// enable automated consumer group offset sync
mm2Props.put("sync.group.offsets.enabled", "true");
mm2Props.put("sync.group.offsets.interval.seconds", "1");
// one way replication from primary to backup
mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false");
mm2Config = new MirrorMakerConfig(mm2Props);
waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
// make sure the topic is created in the other cluster
waitForTopicCreated(primary, "backup.test-topic-1");
waitForTopicCreated(backup, "test-topic-1");
// create a consumer on the backup cluster with the same consumer group ID to consume one topic
Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
consumerProps, "test-topic-1");
waitForConsumerGroupOffsetSync(backup, backupConsumer, Collections.singletonList("test-topic-1"),
consumerGroupName, NUM_RECORDS_PRODUCED);
ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
// the consumer should read zero records, because the offsets of the same consumer group
// have been automatically synchronized from primary to backup by the background job, so there are
// no more records to consume from the replicated topic by the same consumer group on the backup cluster
assertEquals(0, records.count(), "consumer record size is not zero");
// now create a new topic in primary cluster
primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
// make sure the topic is created in backup cluster
waitForTopicCreated(backup, "test-topic-2");
// produce some records to the new topic in primary cluster
produceMessages(primary, "test-topic-2");
// create a consumer on the primary cluster to consume the new topic
try (Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
"group.id", "consumer-group-1"), "test-topic-2")) {
// we need to wait until all records are consumed so that MM2 replicates the expected offsets
waitForConsumingAllRecords(consumer1, NUM_RECORDS_PRODUCED);
}
// create a consumer on the backup cluster with the same consumer group ID to consume the old and new topics
backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
"group.id", consumerGroupName), "test-topic-1", "test-topic-2");
waitForConsumerGroupOffsetSync(backup, backupConsumer, Arrays.asList("test-topic-1", "test-topic-2"),
consumerGroupName, NUM_RECORDS_PRODUCED);
records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
// similar reasoning as above: no more records for the same consumer group to consume on the backup cluster
assertEquals(0, records.count(), "consumer record size is not zero");
backupConsumer.close();
}
}


@@ -36,6 +36,7 @@ import org.apache.kafka.connect.mirror.MirrorSourceConnector;
import org.apache.kafka.connect.mirror.SourceAndTarget;
import org.apache.kafka.connect.mirror.Checkpoint;
import org.apache.kafka.connect.mirror.MirrorCheckpointConnector;
import org.apache.kafka.connect.mirror.ReplicationPolicy;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
import org.apache.kafka.connect.util.clusters.UngracefulShutdownException;
@@ -80,29 +81,29 @@ import static org.apache.kafka.connect.mirror.TestUtils.generateRecords;
public abstract class MirrorConnectorsIntegrationBaseTest {
private static final Logger log = LoggerFactory.getLogger(MirrorConnectorsIntegrationBaseTest.class);
-private static final int NUM_RECORDS_PER_PARTITION = 10;
-private static final int NUM_PARTITIONS = 10;
-private static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS * NUM_RECORDS_PER_PARTITION;
-private static final int RECORD_TRANSFER_DURATION_MS = 30_000;
-private static final int CHECKPOINT_DURATION_MS = 20_000;
+protected static final int NUM_RECORDS_PER_PARTITION = 10;
+protected static final int NUM_PARTITIONS = 10;
+protected static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS * NUM_RECORDS_PER_PARTITION;
+protected static final int RECORD_TRANSFER_DURATION_MS = 30_000;
+protected static final int CHECKPOINT_DURATION_MS = 20_000;
private static final int RECORD_CONSUME_DURATION_MS = 20_000;
private static final int OFFSET_SYNC_DURATION_MS = 30_000;
private static final int TOPIC_SYNC_DURATION_MS = 60_000;
private static final int REQUEST_TIMEOUT_DURATION_MS = 60_000;
private static final int NUM_WORKERS = 3;
-private static final Duration CONSUMER_POLL_TIMEOUT_MS = Duration.ofMillis(500);
+protected static final Duration CONSUMER_POLL_TIMEOUT_MS = Duration.ofMillis(500);
protected static final String PRIMARY_CLUSTER_ALIAS = "primary";
protected static final String BACKUP_CLUSTER_ALIAS = "backup";
-private static final List<Class<? extends Connector>> CONNECTOR_LIST =
+protected static final List<Class<? extends Connector>> CONNECTOR_LIST =
Arrays.asList(MirrorSourceConnector.class, MirrorCheckpointConnector.class, MirrorHeartbeatConnector.class);
private volatile boolean shuttingDown;
protected Map<String, String> mm2Props = new HashMap<>();
-private MirrorMakerConfig mm2Config;
-private EmbeddedConnectCluster primary;
-private EmbeddedConnectCluster backup;
+protected MirrorMakerConfig mm2Config;
+protected EmbeddedConnectCluster primary;
+protected EmbeddedConnectCluster backup;
-private Exit.Procedure exitProcedure;
+protected Exit.Procedure exitProcedure;
private Exit.Procedure haltProcedure;
protected Properties primaryBrokerProps = new Properties();
@@ -112,6 +113,14 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
@BeforeEach
public void startClusters() throws Exception {
startClusters(new HashMap<String, String>() {{
put("topics", "test-topic-.*, primary.test-topic-.*, backup.test-topic-.*");
put(PRIMARY_CLUSTER_ALIAS + "->" + BACKUP_CLUSTER_ALIAS + ".enabled", "true");
put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "true");
}});
}
public void startClusters(Map<String, String> additionalMM2Config) throws Exception {
shuttingDown = false;
exitProcedure = (code, message) -> {
if (shuttingDown) {
@@ -142,10 +151,11 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
primaryBrokerProps.put("auto.create.topics.enable", "false");
backupBrokerProps.put("auto.create.topics.enable", "false");
-mm2Props.putAll(basicMM2Config());
-mm2Config = new MirrorMakerConfig(mm2Props);
+mm2Props.putAll(basicMM2Config());
+mm2Props.putAll(additionalMM2Config);
+mm2Config = new MirrorMakerConfig(mm2Props);
primaryWorkerProps = mm2Config.workerConfig(new SourceAndTarget(BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS));
backupWorkerProps.putAll(mm2Config.workerConfig(new SourceAndTarget(PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS)));
@@ -174,7 +184,7 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
waitForTopicCreated(primary, "mm2-status.backup.internal");
waitForTopicCreated(primary, "mm2-offsets.backup.internal");
waitForTopicCreated(primary, "mm2-configs.backup.internal");
backup.start();
backup.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Workers of " + BACKUP_CLUSTER_ALIAS + "-connect-cluster did not start in time.");
@@ -377,9 +387,11 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
// sleep a few seconds to let MM2 finish replication so that the "end" consumer will consume some records
Thread.sleep(TimeUnit.SECONDS.toMillis(3));
String backupTopic = PRIMARY_CLUSTER_ALIAS + "." + topic;
// consume all records from backup cluster
-try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps,
-PRIMARY_CLUSTER_ALIAS + "." + topic)) {
+try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps,
+backupTopic)) {
waitForConsumingAllRecords(backupConsumer, expectedRecords);
}
@@ -389,7 +401,7 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
backupClient.listConsumerGroupOffsets(consumerGroupName).partitionsToOffsetAndMetadata().get();
// pinpoint the offset of the last partition, which received no records
-OffsetAndMetadata offset = remoteOffsets.get(new TopicPartition(PRIMARY_CLUSTER_ALIAS + "." + topic, NUM_PARTITIONS - 1));
+OffsetAndMetadata offset = remoteOffsets.get(new TopicPartition(backupTopic, NUM_PARTITIONS - 1));
// offset of the last partition should exist, but its value should be 0
assertNotNull(offset, "Offset of last partition was not replicated");
assertEquals(0, offset.offset(), "Offset of last partition is not zero");
@@ -482,6 +494,9 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
produceMessages(primary, "test-topic-1");
ReplicationPolicy replicationPolicy = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS)).replicationPolicy();
String remoteTopic = replicationPolicy.formatRemoteTopic(PRIMARY_CLUSTER_ALIAS, "test-topic-1");
// Check offsets are pushed to the checkpoint topic
Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
"auto.offset.reset", "earliest"), PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal");
@@ -489,13 +504,13 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(Duration.ofSeconds(1L));
for (ConsumerRecord<byte[], byte[]> record : records) {
Checkpoint checkpoint = Checkpoint.deserializeRecord(record);
-if ((PRIMARY_CLUSTER_ALIAS + ".test-topic-1").equals(checkpoint.topicPartition().topic())) {
+if (remoteTopic.equals(checkpoint.topicPartition().topic())) {
return true;
}
}
return false;
}, 30_000,
"Unable to find checkpoints for " + PRIMARY_CLUSTER_ALIAS + "test-topic-1"
"Unable to find checkpoints for " + PRIMARY_CLUSTER_ALIAS + ".test-topic-1"
);
// Ensure no offset-syncs topics have been created on the primary cluster
@@ -507,7 +522,7 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
/*
* launch the connectors on kafka connect cluster and check if they are running
*/
-private static void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster connectCluster,
+protected static void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster connectCluster,
List<Class<? extends Connector>> connectorClasses, MirrorMakerConfig mm2Config,
String primary, String backup) throws InterruptedException {
for (Class<? extends Connector> connector : connectorClasses) {
@@ -527,7 +542,7 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
/*
* wait for the topic created on the cluster
*/
-private static void waitForTopicCreated(EmbeddedConnectCluster cluster, String topicName) throws InterruptedException {
+protected static void waitForTopicCreated(EmbeddedConnectCluster cluster, String topicName) throws InterruptedException {
try (final Admin adminClient = cluster.kafka().createAdminClient()) {
waitForCondition(() -> adminClient.listTopics().names().get().contains(topicName), TOPIC_SYNC_DURATION_MS,
"Topic: " + topicName + " didn't get created on cluster: " + cluster.getName()
@@ -549,7 +564,7 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
/*
* retrieve the config value based on the input cluster, topic and config name
*/
-private static String getTopicConfig(EmbeddedKafkaCluster cluster, String topic, String configName) throws Exception {
+protected static String getTopicConfig(EmbeddedKafkaCluster cluster, String topic, String configName) throws Exception {
try (Admin client = cluster.createAdminClient()) {
Collection<ConfigResource> cr = Collections.singleton(
new ConfigResource(ConfigResource.Type.TOPIC, topic));
@@ -584,7 +599,7 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
* given consumer group, topics and expected number of records, make sure the consumer group
* offsets are eventually synced to the expected offset numbers
*/
-private static <T> void waitForConsumerGroupOffsetSync(EmbeddedConnectCluster connect,
+protected static <T> void waitForConsumerGroupOffsetSync(EmbeddedConnectCluster connect,
Consumer<T, T> consumer, List<String> topics, String consumerGroupId, int numRecords)
throws InterruptedException {
try (Admin adminClient = connect.kafka().createAdminClient()) {
@@ -614,7 +629,7 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
/*
* make sure the consumer consumes the expected number of records
*/
-private static <T> void waitForConsumingAllRecords(Consumer<T, T> consumer, int numExpectedRecords)
+protected static <T> void waitForConsumingAllRecords(Consumer<T, T> consumer, int numExpectedRecords)
throws InterruptedException {
final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
waitForCondition(() -> {
@@ -627,14 +642,11 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
/*
* MM2 config to use in integration tests
*/
-private static Map<String, String> basicMM2Config() {
+protected static Map<String, String> basicMM2Config() {
Map<String, String> mm2Props = new HashMap<>();
mm2Props.put("clusters", PRIMARY_CLUSTER_ALIAS + ", " + BACKUP_CLUSTER_ALIAS);
mm2Props.put("max.tasks", "10");
mm2Props.put("topics", "test-topic-.*, primary.test-topic-.*, backup.test-topic-.*");
mm2Props.put("groups", "consumer-group-.*");
mm2Props.put(PRIMARY_CLUSTER_ALIAS + "->" + BACKUP_CLUSTER_ALIAS + ".enabled", "true");
mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "true");
mm2Props.put("sync.topic.acls.enabled", "false");
mm2Props.put("emit.checkpoints.interval.seconds", "1");
mm2Props.put("emit.heartbeats.interval.seconds", "1");
@@ -672,7 +684,7 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
/*
* Generate some consumer activity on both clusters to ensure the checkpoint connector always starts promptly
*/
-private void warmUpConsumer(Map<String, Object> consumerProps) throws InterruptedException {
+protected void warmUpConsumer(Map<String, Object> consumerProps) throws InterruptedException {
Consumer<byte[], byte[]> dummyConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1");
dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
dummyConsumer.commitSync();


@@ -371,7 +371,7 @@ public class EmbeddedKafkaCluster {
+ brokers.length + ") for desired replication (" + replication + ")");
}
log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }",
log.info("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }",
topic, partitions, replication, topicConfig);
final NewTopic newTopic = new NewTopic(topic, partitions, (short) replication);
newTopic.configs(topicConfig);


@@ -83,7 +83,13 @@
understood by brokers of version 2.5 or higher, so you must upgrade your Kafka cluster to get the stronger semantics. Otherwise, you can just pass
in <code>new ConsumerGroupMetadata(consumerGroupId)</code> to work with older brokers. See <a href="https://cwiki.apache.org/confluence/x/zJONCg">KIP-732</a> for more details.
</li>
<li> The Connect-based MirrorMaker (MM2) includes changes to support <code>IdentityReplicationPolicy</code>, enabling replication without renaming topics.
The existing <code>DefaultReplicationPolicy</code> is still used by default, but identity replication can be enabled via the
<code>replication.policy.class</code> configuration property. This is especially useful for users migrating from the older MirrorMaker (MM1), or for
use-cases with simple one-way replication topologies where topic renaming is undesirable. Note that <code>IdentityReplicationPolicy</code>, unlike
<code>DefaultReplicationPolicy</code>, cannot prevent replication cycles based on topic names, so take care to avoid cycles when constructing your
replication topology.
</li>
</ul>
<h5><a id="upgrade_280_notable" href="#upgrade_280_notable">Notable changes in 2.8.0</a></h5>