diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java index 2aa47b45de3..da1cb278665 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java @@ -18,6 +18,7 @@ package org.apache.kafka.connect.mirror; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.ValidString; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.KafkaMetricsContext; import org.apache.kafka.common.metrics.MetricsReporter; @@ -77,6 +78,7 @@ public class MirrorConnectorConfig extends AbstractConfig { public static final String ENABLED = "enabled"; private static final String ENABLED_DOC = "Whether to replicate source->target."; public static final String SOURCE_CLUSTER_ALIAS = "source.cluster.alias"; + public static final String SOURCE_CLUSTER_ALIAS_DEFAULT = "source"; private static final String SOURCE_CLUSTER_ALIAS_DOC = "Alias of source cluster"; public static final String TARGET_CLUSTER_ALIAS = "target.cluster.alias"; public static final String TARGET_CLUSTER_ALIAS_DEFAULT = "target"; @@ -202,6 +204,10 @@ public class MirrorConnectorConfig extends AbstractConfig { private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote partition can be before it is resynced."; public static final long OFFSET_LAG_MAX_DEFAULT = 100L; + private static final String OFFSET_SYNCS_TOPIC_LOCATION = "offset-syncs.topic.location"; + private static final String OFFSET_SYNCS_TOPIC_LOCATION_DEFAULT = SOURCE_CLUSTER_ALIAS_DEFAULT; + private static final String OFFSET_SYNCS_TOPIC_LOCATION_DOC = "The location (source/target) of the offset-syncs topic."; + protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX; protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX; protected static final String SOURCE_PREFIX = MirrorMakerConfig.SOURCE_PREFIX; @@ -281,6 +287,26 @@ public class MirrorConnectorConfig extends AbstractConfig { return props; } + Map targetProducerConfig() { + Map props = new HashMap<>(); + props.putAll(originalsWithPrefix(TARGET_CLUSTER_PREFIX)); + props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names()); + props.putAll(originalsWithPrefix(PRODUCER_CLIENT_PREFIX)); + props.putAll(originalsWithPrefix(TARGET_PREFIX + PRODUCER_CLIENT_PREFIX)); + return props; + } + + Map targetConsumerConfig() { + Map props = new HashMap<>(); + props.putAll(originalsWithPrefix(TARGET_CLUSTER_PREFIX)); + props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names()); + props.putAll(originalsWithPrefix(CONSUMER_CLIENT_PREFIX)); + props.putAll(originalsWithPrefix(TARGET_PREFIX + CONSUMER_CLIENT_PREFIX)); + props.put(ENABLE_AUTO_COMMIT_CONFIG, "false"); + props.putIfAbsent(AUTO_OFFSET_RESET_CONFIG, "earliest"); + return props; + } + Map sourceAdminConfig() { Map props = new HashMap<>(); props.putAll(originalsWithPrefix(SOURCE_CLUSTER_PREFIX)); @@ -314,8 +340,33 @@ public class MirrorConnectorConfig extends AbstractConfig { } String offsetSyncsTopic() { + String otherClusterAlias = SOURCE_CLUSTER_ALIAS_DEFAULT.equals(offsetSyncsTopicLocation()) + ? targetClusterAlias() + : sourceClusterAlias(); // ".internal" suffix ensures this doesn't get replicated - return "mm2-offset-syncs." + targetClusterAlias() + ".internal"; + return "mm2-offset-syncs." + otherClusterAlias + ".internal"; + } + + String offsetSyncsTopicLocation() { + return getString(OFFSET_SYNCS_TOPIC_LOCATION); + } + + Map offsetSyncsTopicAdminConfig() { + return SOURCE_CLUSTER_ALIAS_DEFAULT.equals(offsetSyncsTopicLocation()) + ? sourceAdminConfig() + : targetAdminConfig(); + } + + Map offsetSyncsTopicProducerConfig() { + return SOURCE_CLUSTER_ALIAS_DEFAULT.equals(offsetSyncsTopicLocation()) + ? sourceProducerConfig() + : targetProducerConfig(); + } + + Map offsetSyncsTopicConsumerConfig() { + return SOURCE_CLUSTER_ALIAS_DEFAULT.equals(offsetSyncsTopicLocation()) + ? sourceConsumerConfig() + : targetConsumerConfig(); } String heartbeatsTopic() { @@ -654,6 +705,13 @@ public class MirrorConnectorConfig extends AbstractConfig { OFFSET_LAG_MAX_DEFAULT, ConfigDef.Importance.LOW, OFFSET_LAG_MAX_DOC) + .define( + OFFSET_SYNCS_TOPIC_LOCATION, + ConfigDef.Type.STRING, + OFFSET_SYNCS_TOPIC_LOCATION_DEFAULT, + ValidString.in(SOURCE_CLUSTER_ALIAS_DEFAULT, TARGET_CLUSTER_ALIAS_DEFAULT), + ConfigDef.Importance.LOW, + OFFSET_SYNCS_TOPIC_LOCATION_DOC) .define( CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, ConfigDef.Type.LIST, diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java index 91a40a638cb..e1db62a7b6c 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java @@ -304,7 +304,7 @@ public class MirrorSourceConnector extends SourceConnector { } private void createOffsetSyncsTopic() { - MirrorUtils.createSinglePartitionCompactedTopic(config.offsetSyncsTopic(), config.offsetSyncsTopicReplicationFactor(), config.sourceAdminConfig()); + MirrorUtils.createSinglePartitionCompactedTopic(config.offsetSyncsTopic(), config.offsetSyncsTopicReplicationFactor(), config.offsetSyncsTopicAdminConfig()); } void computeAndCreateTopicPartitions() throws ExecutionException, InterruptedException { diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java index e01aadf4785..fb5c844417e 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java @@ -91,7 +91,7 @@ public class MirrorSourceTask extends SourceTask { partitionStates = new HashMap<>(); offsetSyncsTopic = config.offsetSyncsTopic(); consumer = MirrorUtils.newConsumer(config.sourceConsumerConfig()); - offsetProducer = MirrorUtils.newProducer(config.sourceProducerConfig()); + offsetProducer = MirrorUtils.newProducer(config.offsetSyncsTopicProducerConfig()); Set taskTopicPartitions = config.taskTopicPartitions(); Map topicPartitionOffsets = loadOffsets(taskTopicPartitions); consumer.assign(topicPartitionOffsets.keySet()); diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java index fff1abd1cf0..600dda46f31 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java @@ -35,7 +35,7 @@ class OffsetSyncStore implements AutoCloseable { private TopicPartition offsetSyncTopicPartition; OffsetSyncStore(MirrorConnectorConfig config) { - consumer = new KafkaConsumer<>(config.sourceConsumerConfig(), + consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(), new ByteArrayDeserializer(), new ByteArrayDeserializer()); offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0); consumer.assign(Collections.singleton(offsetSyncTopicPartition)); diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java index 7abe30def6d..c7f629edd95 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.connect.mirror; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; import org.junit.jupiter.api.Test; import java.util.Arrays; @@ -30,6 +31,7 @@ import java.util.HashSet; import static org.apache.kafka.connect.mirror.TestUtils.makeProps; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertEquals; public class MirrorConnectorConfigTest { @@ -262,4 +264,67 @@ public class MirrorConnectorConfigTest { assertEquals(expectedAdminProps, connectorAdminProps, prefix + " source connector admin props not matching"); } + @Test + public void testOffsetSyncsTopic() { + // Invalid location + Map connectorProps = makeProps("offset-syncs.topic.location", "something"); + assertThrows(ConfigException.class, () -> new MirrorConnectorConfig(connectorProps)); + + connectorProps.put("offset-syncs.topic.location", "source"); + MirrorConnectorConfig config = new MirrorConnectorConfig(connectorProps); + assertEquals("mm2-offset-syncs.target2.internal", config.offsetSyncsTopic()); + connectorProps.put("offset-syncs.topic.location", "target"); + config = new MirrorConnectorConfig(connectorProps); + assertEquals("mm2-offset-syncs.source1.internal", config.offsetSyncsTopic()); + // Default to source + connectorProps.remove("offset-syncs.topic.location"); + config = new MirrorConnectorConfig(connectorProps); + assertEquals("mm2-offset-syncs.target2.internal", config.offsetSyncsTopic()); + } + + @Test + public void testConsumerConfigsForOffsetSyncsTopic() { + Map connectorProps = makeProps( + "source.consumer.max.partition.fetch.bytes", "1", + "target.consumer.heartbeat.interval.ms", "1", + "consumer.max.poll.interval.ms", "1", + "fetch.min.bytes", "1" + ); + MirrorConnectorConfig config = new MirrorConnectorConfig(connectorProps); + assertEquals(config.sourceConsumerConfig(), config.offsetSyncsTopicConsumerConfig()); + connectorProps.put("offset-syncs.topic.location", "target"); + config = new MirrorConnectorConfig(connectorProps); + assertEquals(config.targetConsumerConfig(), config.offsetSyncsTopicConsumerConfig()); + } + + @Test + public void testProducerConfigsForOffsetSyncsTopic() { + Map connectorProps = makeProps( + "source.producer.batch.size", "1", + "target.producer.acks", "1", + "producer.max.poll.interval.ms", "1", + "fetch.min.bytes", "1" + ); + MirrorConnectorConfig config = new MirrorConnectorConfig(connectorProps); + assertEquals(config.sourceProducerConfig(), config.offsetSyncsTopicProducerConfig()); + connectorProps.put("offset-syncs.topic.location", "target"); + config = new MirrorConnectorConfig(connectorProps); + assertEquals(config.targetProducerConfig(), config.offsetSyncsTopicProducerConfig()); + } + + @Test + public void testAdminConfigsForOffsetSyncsTopic() { + Map connectorProps = makeProps( + "source.admin.request.timeout.ms", "1", + "target.admin.send.buffer.bytes", "1", + "admin.reconnect.backoff.max.ms", "1", + "retries", "123" + ); + MirrorConnectorConfig config = new MirrorConnectorConfig(connectorProps); + assertEquals(config.sourceAdminConfig(), config.offsetSyncsTopicAdminConfig()); + connectorProps.put("offset-syncs.topic.location", "target"); + config = new MirrorConnectorConfig(connectorProps); + assertEquals(config.targetAdminConfig(), config.offsetSyncsTopicAdminConfig()); + } + } diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java index 841eedc33c6..b82b26305e4 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.DescribeConfigsResult; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.config.ConfigResource; @@ -33,6 +34,7 @@ import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector; import org.apache.kafka.connect.mirror.MirrorMakerConfig; import org.apache.kafka.connect.mirror.MirrorSourceConnector; import org.apache.kafka.connect.mirror.SourceAndTarget; +import org.apache.kafka.connect.mirror.Checkpoint; import org.apache.kafka.connect.mirror.MirrorCheckpointConnector; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; @@ -57,6 +59,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertNotNull; import org.junit.jupiter.api.Test; @@ -243,6 +246,7 @@ public abstract class MirrorConnectorsIntegrationBaseTest { // make sure the topic is auto-created in the other cluster waitForTopicCreated(primary, "backup.test-topic-1"); waitForTopicCreated(backup, "primary.test-topic-1"); + waitForTopicCreated(primary, "mm2-offset-syncs.backup.internal"); assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, getTopicConfig(backup.kafka(), "primary.test-topic-1", TopicConfig.CLEANUP_POLICY_CONFIG), "topic config was not synced"); @@ -461,7 +465,45 @@ public abstract class MirrorConnectorsIntegrationBaseTest { assertEquals(0, records.count(), "consumer record size is not zero"); backupConsumer.close(); } - + + @Test + public void testOffsetSyncsTopicsOnTarget() throws Exception { + // move offset-syncs topics to target + mm2Props.put(PRIMARY_CLUSTER_ALIAS + "->" + BACKUP_CLUSTER_ALIAS + ".offset-syncs.topic.location", "target"); + // one way replication from primary to backup + mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false"); + + mm2Config = new MirrorMakerConfig(mm2Props); + + waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS); + + // Ensure the offset syncs topic is created in the target cluster + waitForTopicCreated(backup.kafka(), "mm2-offset-syncs." + PRIMARY_CLUSTER_ALIAS + ".internal"); + + produceMessages(primary, "test-topic-1"); + + // Check offsets are pushed to the checkpoint topic + Consumer backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap( + "auto.offset.reset", "earliest"), PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal"); + waitForCondition(() -> { + ConsumerRecords records = backupConsumer.poll(Duration.ofSeconds(1L)); + for (ConsumerRecord record : records) { + Checkpoint checkpoint = Checkpoint.deserializeRecord(record); + if ((PRIMARY_CLUSTER_ALIAS + ".test-topic-1").equals(checkpoint.topicPartition().topic())) { + return true; + } + } + return false; + }, 30_000, + "Unable to find checkpoints for " + PRIMARY_CLUSTER_ALIAS + "test-topic-1" + ); + + // Ensure no offset-syncs topics have been created on the primary cluster + Set primaryTopics = primary.kafka().createAdminClient().listTopics().names().get(); + assertFalse(primaryTopics.contains("mm2-offset-syncs." + PRIMARY_CLUSTER_ALIAS + ".internal")); + assertFalse(primaryTopics.contains("mm2-offset-syncs." + BACKUP_CLUSTER_ALIAS + ".internal")); + } + /* * launch the connectors on kafka connect cluster and check if they are running */ @@ -640,4 +682,18 @@ public abstract class MirrorConnectorsIntegrationBaseTest { dummyConsumer.commitSync(); dummyConsumer.close(); } + + /* + * wait for the topic created on the cluster + */ + private static void waitForTopicCreated(EmbeddedKafkaCluster cluster, String topicName) throws InterruptedException { + try (final Admin adminClient = cluster.createAdminClient()) { + waitForCondition(() -> { + Set topics = adminClient.listTopics().names().get(); + return topics.contains(topicName); + }, OFFSET_SYNC_DURATION_MS, + "Topic: " + topicName + " didn't get created in the cluster" + ); + } + } }