KAFKA-12379: Allow configuring the location of the offset-syncs topic with MirrorMaker2 (#10221)

This commit implements KIP-716. It introduces a new setting `offset-syncs.topic.location` that allows specifying where the offset-syncs topic is created.

Reviewers: Tom Bentley <tbentley@redhat.com>, Edoardo Comar <ecomar@uk.ibm.com>
This commit is contained in:
Mickael Maison 2021-06-29 21:33:59 +01:00 committed by GitHub
parent d3ec9f940c
commit 3c4be0b57a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 184 additions and 5 deletions

View File

@ -18,6 +18,7 @@ package org.apache.kafka.connect.mirror;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.ValidString;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
@ -77,6 +78,7 @@ public class MirrorConnectorConfig extends AbstractConfig {
public static final String ENABLED = "enabled";
private static final String ENABLED_DOC = "Whether to replicate source->target.";
public static final String SOURCE_CLUSTER_ALIAS = "source.cluster.alias";
public static final String SOURCE_CLUSTER_ALIAS_DEFAULT = "source";
private static final String SOURCE_CLUSTER_ALIAS_DOC = "Alias of source cluster";
public static final String TARGET_CLUSTER_ALIAS = "target.cluster.alias";
public static final String TARGET_CLUSTER_ALIAS_DEFAULT = "target";
@ -202,6 +204,10 @@ public class MirrorConnectorConfig extends AbstractConfig {
private static final String OFFSET_LAG_MAX_DOC = "How out-of-sync a remote partition can be before it is resynced.";
public static final long OFFSET_LAG_MAX_DEFAULT = 100L;
// Config key selecting which cluster hosts the offset-syncs topic.
private static final String OFFSET_SYNCS_TOPIC_LOCATION = "offset-syncs.topic.location";
// Defaults to the same literal as SOURCE_CLUSTER_ALIAS_DEFAULT ("source").
private static final String OFFSET_SYNCS_TOPIC_LOCATION_DEFAULT = SOURCE_CLUSTER_ALIAS_DEFAULT;
// Human-readable description registered alongside the key in the ConfigDef.
private static final String OFFSET_SYNCS_TOPIC_LOCATION_DOC = "The location (source/target) of the offset-syncs topic.";
protected static final String SOURCE_CLUSTER_PREFIX = MirrorMakerConfig.SOURCE_CLUSTER_PREFIX;
protected static final String TARGET_CLUSTER_PREFIX = MirrorMakerConfig.TARGET_CLUSTER_PREFIX;
protected static final String SOURCE_PREFIX = MirrorMakerConfig.SOURCE_PREFIX;
@ -281,6 +287,26 @@ public class MirrorConnectorConfig extends AbstractConfig {
return props;
}
/**
 * Builds the producer properties for the target cluster.
 *
 * Starts from the target-cluster-prefixed originals restricted to the names
 * known to MirrorClientConfig.CLIENT_CONFIG_DEF, then layers the generic
 * producer overrides and finally the target-specific producer overrides on
 * top (later entries win).
 *
 * @return a new mutable map of producer properties
 */
Map<String, Object> targetProducerConfig() {
    Map<String, Object> producerProps = new HashMap<>(originalsWithPrefix(TARGET_CLUSTER_PREFIX));
    // Drop anything that is not a recognised client-level setting.
    producerProps.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());

    Map<String, Object> genericOverrides = originalsWithPrefix(PRODUCER_CLIENT_PREFIX);
    producerProps.putAll(genericOverrides);

    Map<String, Object> targetOverrides = originalsWithPrefix(TARGET_PREFIX + PRODUCER_CLIENT_PREFIX);
    producerProps.putAll(targetOverrides);

    return producerProps;
}
/**
 * Builds the consumer properties for the target cluster.
 *
 * Starts from the target-cluster-prefixed originals restricted to the names
 * known to MirrorClientConfig.CLIENT_CONFIG_DEF, then layers the generic
 * consumer overrides and the target-specific consumer overrides on top.
 * ENABLE_AUTO_COMMIT_CONFIG is always set to "false"; AUTO_OFFSET_RESET_CONFIG
 * falls back to "earliest" only when no value was supplied.
 *
 * @return a new mutable map of consumer properties
 */
Map<String, Object> targetConsumerConfig() {
    Map<String, Object> consumerProps = new HashMap<>(originalsWithPrefix(TARGET_CLUSTER_PREFIX));
    // Drop anything that is not a recognised client-level setting.
    consumerProps.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());

    Map<String, Object> genericOverrides = originalsWithPrefix(CONSUMER_CLIENT_PREFIX);
    consumerProps.putAll(genericOverrides);

    Map<String, Object> targetOverrides = originalsWithPrefix(TARGET_PREFIX + CONSUMER_CLIENT_PREFIX);
    consumerProps.putAll(targetOverrides);

    // Unconditional override, applied after all user-supplied values.
    consumerProps.put(ENABLE_AUTO_COMMIT_CONFIG, "false");
    // Only a default: an explicit user value is kept.
    consumerProps.putIfAbsent(AUTO_OFFSET_RESET_CONFIG, "earliest");

    return consumerProps;
}
Map<String, Object> sourceAdminConfig() {
Map<String, Object> props = new HashMap<>();
props.putAll(originalsWithPrefix(SOURCE_CLUSTER_PREFIX));
@ -314,8 +340,33 @@ public class MirrorConnectorConfig extends AbstractConfig {
}
/**
 * Returns the name of the offset-syncs topic.
 *
 * The topic name embeds the alias of the cluster that does NOT host the
 * topic: the target alias when the topic lives on the source cluster, and the
 * source alias when it lives on the target cluster.
 *
 * @return the topic name, of the form "mm2-offset-syncs.&lt;alias&gt;.internal"
 */
String offsetSyncsTopic() {
    String otherClusterAlias = SOURCE_CLUSTER_ALIAS_DEFAULT.equals(offsetSyncsTopicLocation())
            ? targetClusterAlias()
            : sourceClusterAlias();
    // ".internal" suffix ensures this doesn't get replicated
    return "mm2-offset-syncs." + otherClusterAlias + ".internal";
}
/**
 * Reads the configured value of the OFFSET_SYNCS_TOPIC_LOCATION setting.
 *
 * @return the configured location string
 */
String offsetSyncsTopicLocation() {
    final String location = getString(OFFSET_SYNCS_TOPIC_LOCATION);
    return location;
}
/**
 * Selects the admin client properties for the cluster hosting the
 * offset-syncs topic.
 *
 * @return sourceAdminConfig() when the configured location equals
 *         SOURCE_CLUSTER_ALIAS_DEFAULT, otherwise targetAdminConfig()
 */
Map<String, Object> offsetSyncsTopicAdminConfig() {
    final String location = offsetSyncsTopicLocation();
    if (SOURCE_CLUSTER_ALIAS_DEFAULT.equals(location)) {
        return sourceAdminConfig();
    }
    return targetAdminConfig();
}
/**
 * Selects the producer properties for the cluster hosting the offset-syncs
 * topic.
 *
 * @return sourceProducerConfig() when the configured location equals
 *         SOURCE_CLUSTER_ALIAS_DEFAULT, otherwise targetProducerConfig()
 */
Map<String, Object> offsetSyncsTopicProducerConfig() {
    final String location = offsetSyncsTopicLocation();
    if (SOURCE_CLUSTER_ALIAS_DEFAULT.equals(location)) {
        return sourceProducerConfig();
    }
    return targetProducerConfig();
}
/**
 * Selects the consumer properties for the cluster hosting the offset-syncs
 * topic.
 *
 * @return sourceConsumerConfig() when the configured location equals
 *         SOURCE_CLUSTER_ALIAS_DEFAULT, otherwise targetConsumerConfig()
 */
Map<String, Object> offsetSyncsTopicConsumerConfig() {
    final String location = offsetSyncsTopicLocation();
    if (SOURCE_CLUSTER_ALIAS_DEFAULT.equals(location)) {
        return sourceConsumerConfig();
    }
    return targetConsumerConfig();
}
String heartbeatsTopic() {
@ -654,6 +705,13 @@ public class MirrorConnectorConfig extends AbstractConfig {
OFFSET_LAG_MAX_DEFAULT,
ConfigDef.Importance.LOW,
OFFSET_LAG_MAX_DOC)
.define(
OFFSET_SYNCS_TOPIC_LOCATION,
ConfigDef.Type.STRING,
OFFSET_SYNCS_TOPIC_LOCATION_DEFAULT,
ValidString.in(SOURCE_CLUSTER_ALIAS_DEFAULT, TARGET_CLUSTER_ALIAS_DEFAULT),
ConfigDef.Importance.LOW,
OFFSET_SYNCS_TOPIC_LOCATION_DOC)
.define(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
ConfigDef.Type.LIST,

View File

@ -304,7 +304,7 @@ public class MirrorSourceConnector extends SourceConnector {
}
/**
 * Creates the offset-syncs topic through MirrorUtils.createSinglePartitionCompactedTopic,
 * using the admin client properties of whichever cluster is configured to
 * host the topic (config.offsetSyncsTopicAdminConfig()).
 */
private void createOffsetSyncsTopic() {
    MirrorUtils.createSinglePartitionCompactedTopic(
            config.offsetSyncsTopic(),
            config.offsetSyncsTopicReplicationFactor(),
            config.offsetSyncsTopicAdminConfig());
}
void computeAndCreateTopicPartitions() throws ExecutionException, InterruptedException {

View File

@ -91,7 +91,7 @@ public class MirrorSourceTask extends SourceTask {
partitionStates = new HashMap<>();
offsetSyncsTopic = config.offsetSyncsTopic();
consumer = MirrorUtils.newConsumer(config.sourceConsumerConfig());
offsetProducer = MirrorUtils.newProducer(config.sourceProducerConfig());
offsetProducer = MirrorUtils.newProducer(config.offsetSyncsTopicProducerConfig());
Set<TopicPartition> taskTopicPartitions = config.taskTopicPartitions();
Map<TopicPartition, Long> topicPartitionOffsets = loadOffsets(taskTopicPartitions);
consumer.assign(topicPartitionOffsets.keySet());

View File

@ -35,7 +35,7 @@ class OffsetSyncStore implements AutoCloseable {
private TopicPartition offsetSyncTopicPartition;
OffsetSyncStore(MirrorConnectorConfig config) {
consumer = new KafkaConsumer<>(config.sourceConsumerConfig(),
consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
new ByteArrayDeserializer(), new ByteArrayDeserializer());
offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);
consumer.assign(Collections.singleton(offsetSyncTopicPartition));

View File

@ -18,6 +18,7 @@ package org.apache.kafka.connect.mirror;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
@ -30,6 +31,7 @@ import java.util.HashSet;
import static org.apache.kafka.connect.mirror.TestUtils.makeProps;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class MirrorConnectorConfigTest {
@ -262,4 +264,67 @@ public class MirrorConnectorConfigTest {
assertEquals(expectedAdminProps, connectorAdminProps, prefix + " source connector admin props not matching");
}
/**
 * Checks validation of the offset-syncs topic location setting and the topic
 * name derived from it for the "source", "target" and defaulted cases.
 */
@Test
public void testOffsetSyncsTopic() {
    final String locationKey = "offset-syncs.topic.location";

    // A value outside the accepted set is rejected when the config is built.
    Map<String, String> props = makeProps(locationKey, "something");
    assertThrows(ConfigException.class, () -> new MirrorConnectorConfig(props));

    props.put(locationKey, "source");
    MirrorConnectorConfig onSource = new MirrorConnectorConfig(props);
    assertEquals("mm2-offset-syncs.target2.internal", onSource.offsetSyncsTopic());

    props.put(locationKey, "target");
    MirrorConnectorConfig onTarget = new MirrorConnectorConfig(props);
    assertEquals("mm2-offset-syncs.source1.internal", onTarget.offsetSyncsTopic());

    // With the key absent the result matches the "source" case.
    props.remove(locationKey);
    MirrorConnectorConfig defaulted = new MirrorConnectorConfig(props);
    assertEquals("mm2-offset-syncs.target2.internal", defaulted.offsetSyncsTopic());
}
/**
 * Checks that the offset-syncs consumer properties equal the source consumer
 * properties by default and the target consumer properties once the location
 * is set to "target".
 */
@Test
public void testConsumerConfigsForOffsetSyncsTopic() {
    Map<String, String> props = makeProps(
            "source.consumer.max.partition.fetch.bytes", "1",
            "target.consumer.heartbeat.interval.ms", "1",
            "consumer.max.poll.interval.ms", "1",
            "fetch.min.bytes", "1"
    );

    // No location configured: expect the source-side settings.
    MirrorConnectorConfig defaulted = new MirrorConnectorConfig(props);
    Map<String, Object> expectedOnSource = defaulted.sourceConsumerConfig();
    assertEquals(expectedOnSource, defaulted.offsetSyncsTopicConsumerConfig());

    props.put("offset-syncs.topic.location", "target");
    MirrorConnectorConfig onTarget = new MirrorConnectorConfig(props);
    Map<String, Object> expectedOnTarget = onTarget.targetConsumerConfig();
    assertEquals(expectedOnTarget, onTarget.offsetSyncsTopicConsumerConfig());
}
/**
 * Checks that the offset-syncs producer properties equal the source producer
 * properties by default and the target producer properties once the location
 * is set to "target".
 */
@Test
public void testProducerConfigsForOffsetSyncsTopic() {
    Map<String, String> props = makeProps(
            "source.producer.batch.size", "1",
            "target.producer.acks", "1",
            "producer.max.poll.interval.ms", "1",
            "fetch.min.bytes", "1"
    );

    // No location configured: expect the source-side settings.
    MirrorConnectorConfig defaulted = new MirrorConnectorConfig(props);
    Map<String, Object> expectedOnSource = defaulted.sourceProducerConfig();
    assertEquals(expectedOnSource, defaulted.offsetSyncsTopicProducerConfig());

    props.put("offset-syncs.topic.location", "target");
    MirrorConnectorConfig onTarget = new MirrorConnectorConfig(props);
    Map<String, Object> expectedOnTarget = onTarget.targetProducerConfig();
    assertEquals(expectedOnTarget, onTarget.offsetSyncsTopicProducerConfig());
}
/**
 * Checks that the offset-syncs admin properties equal the source admin
 * properties by default and the target admin properties once the location is
 * set to "target".
 */
@Test
public void testAdminConfigsForOffsetSyncsTopic() {
    Map<String, String> props = makeProps(
            "source.admin.request.timeout.ms", "1",
            "target.admin.send.buffer.bytes", "1",
            "admin.reconnect.backoff.max.ms", "1",
            "retries", "123"
    );

    // No location configured: expect the source-side settings.
    MirrorConnectorConfig defaulted = new MirrorConnectorConfig(props);
    Map<String, Object> expectedOnSource = defaulted.sourceAdminConfig();
    assertEquals(expectedOnSource, defaulted.offsetSyncsTopicAdminConfig());

    props.put("offset-syncs.topic.location", "target");
    MirrorConnectorConfig onTarget = new MirrorConnectorConfig(props);
    Map<String, Object> expectedOnTarget = onTarget.targetAdminConfig();
    assertEquals(expectedOnTarget, onTarget.offsetSyncsTopicAdminConfig());
}
}

View File

@ -21,6 +21,7 @@ import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.config.ConfigResource;
@ -33,6 +34,7 @@ import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
import org.apache.kafka.connect.mirror.MirrorMakerConfig;
import org.apache.kafka.connect.mirror.MirrorSourceConnector;
import org.apache.kafka.connect.mirror.SourceAndTarget;
import org.apache.kafka.connect.mirror.Checkpoint;
import org.apache.kafka.connect.mirror.MirrorCheckpointConnector;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
@ -57,6 +59,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import org.junit.jupiter.api.Test;
@ -243,6 +246,7 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
// make sure the topic is auto-created in the other cluster
waitForTopicCreated(primary, "backup.test-topic-1");
waitForTopicCreated(backup, "primary.test-topic-1");
waitForTopicCreated(primary, "mm2-offset-syncs.backup.internal");
assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, getTopicConfig(backup.kafka(), "primary.test-topic-1", TopicConfig.CLEANUP_POLICY_CONFIG),
"topic config was not synced");
@ -461,7 +465,45 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
assertEquals(0, records.count(), "consumer record size is not zero");
backupConsumer.close();
}
/**
 * Runs one-way primary->backup replication with the offset-syncs topic
 * located on the target, then checks that:
 * the offset-syncs topic appears on the backup cluster, a checkpoint for the
 * replicated test topic is emitted on the backup cluster, and no offset-syncs
 * topic exists on the primary cluster.
 *
 * The consumer and admin client opened here are closed via
 * try-with-resources so they do not outlive the test.
 */
@Test
public void testOffsetSyncsTopicsOnTarget() throws Exception {
    // move offset-syncs topics to target
    mm2Props.put(PRIMARY_CLUSTER_ALIAS + "->" + BACKUP_CLUSTER_ALIAS + ".offset-syncs.topic.location", "target");
    // one way replication from primary to backup
    mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false");

    mm2Config = new MirrorMakerConfig(mm2Props);

    waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);

    // Ensure the offset syncs topic is created in the target cluster
    waitForTopicCreated(backup.kafka(), "mm2-offset-syncs." + PRIMARY_CLUSTER_ALIAS + ".internal");

    produceMessages(primary, "test-topic-1");

    // Name of the replicated topic on the backup cluster; also used in the failure message.
    final String replicatedTopic = PRIMARY_CLUSTER_ALIAS + ".test-topic-1";

    // Check offsets are pushed to the checkpoint topic
    try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
            "auto.offset.reset", "earliest"), PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal")) {
        waitForCondition(() -> {
            ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(Duration.ofSeconds(1L));
            for (ConsumerRecord<byte[], byte[]> record : records) {
                Checkpoint checkpoint = Checkpoint.deserializeRecord(record);
                if (replicatedTopic.equals(checkpoint.topicPartition().topic())) {
                    return true;
                }
            }
            return false;
        }, 30_000,
            "Unable to find checkpoints for " + replicatedTopic
        );
    }

    // Ensure no offset-syncs topics have been created on the primary cluster
    try (Admin adminClient = primary.kafka().createAdminClient()) {
        Set<String> primaryTopics = adminClient.listTopics().names().get();
        assertFalse(primaryTopics.contains("mm2-offset-syncs." + PRIMARY_CLUSTER_ALIAS + ".internal"));
        assertFalse(primaryTopics.contains("mm2-offset-syncs." + BACKUP_CLUSTER_ALIAS + ".internal"));
    }
}
/*
* launch the connectors on kafka connect cluster and check if they are running
*/
@ -640,4 +682,18 @@ public abstract class MirrorConnectorsIntegrationBaseTest {
dummyConsumer.commitSync();
dummyConsumer.close();
}
/*
 * Poll the cluster's topic listing until the given topic name appears,
 * for at most OFFSET_SYNC_DURATION_MS. The admin client is closed on exit.
 */
private static void waitForTopicCreated(EmbeddedKafkaCluster cluster, String topicName) throws InterruptedException {
    final String failureMessage = "Topic: " + topicName + " didn't get created in the cluster";
    try (final Admin admin = cluster.createAdminClient()) {
        waitForCondition(
            () -> admin.listTopics().names().get().contains(topicName),
            OFFSET_SYNC_DURATION_MS,
            failureMessage
        );
    }
}
}