KAFKA-16254: Allow MM2 to fully disable offset sync feature (#15999)

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chris Egerton <fearthecellos@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Igor Soarez <soarez@apple.com>
This commit is contained in:
Omnia Ibrahim 2024-07-09 11:38:03 +01:00 committed by GitHub
parent 67e6859632
commit 0781151794
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 638 additions and 279 deletions

View File

@ -20,11 +20,11 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.ConfigUtils;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class MirrorCheckpointConfig extends MirrorConnectorConfig {
protected static final String REFRESH_GROUPS = "refresh.groups";
protected static final String EMIT_CHECKPOINTS = "emit.checkpoints";
protected static final String SYNC_GROUP_OFFSETS = "sync.group.offsets";
@ -73,11 +73,11 @@ public class MirrorCheckpointConfig extends MirrorConnectorConfig {
public static final String GROUP_FILTER_CLASS = "group.filter.class";
private static final String GROUP_FILTER_CLASS_DOC = "GroupFilter to use. Selects consumer groups to replicate.";
public static final Class<?> GROUP_FILTER_CLASS_DEFAULT = DefaultGroupFilter.class;
public static final String OFFSET_SYNCS_SOURCE_CONSUMER_ROLE = "offset-syncs-source-consumer";
public static final String OFFSET_SYNCS_TARGET_CONSUMER_ROLE = "offset-syncs-target-consumer";
public static final String OFFSET_SYNCS_SOURCE_CONSUMER_ROLE = OFFSET_SYNCS_CLIENT_ROLE_PREFIX + "source-consumer";
public static final String OFFSET_SYNCS_TARGET_CONSUMER_ROLE = OFFSET_SYNCS_CLIENT_ROLE_PREFIX + "target-consumer";
public static final String OFFSET_SYNCS_SOURCE_ADMIN_ROLE = OFFSET_SYNCS_CLIENT_ROLE_PREFIX + "source-admin";
public static final String OFFSET_SYNCS_TARGET_ADMIN_ROLE = OFFSET_SYNCS_CLIENT_ROLE_PREFIX + "target-admin";
public static final String CHECKPOINTS_TARGET_CONSUMER_ROLE = "checkpoints-target-consumer";
public static final String OFFSET_SYNCS_SOURCE_ADMIN_ROLE = "offset-syncs-source-admin";
public static final String OFFSET_SYNCS_TARGET_ADMIN_ROLE = "offset-syncs-target-admin";
public MirrorCheckpointConfig(Map<String, String> props) {
super(CONNECTOR_CONFIG_DEF, ConfigUtils.translateDeprecatedConfigs(props, new String[][]{
@ -166,6 +166,26 @@ public class MirrorCheckpointConfig extends MirrorConnectorConfig {
return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
}
/**
 * Performs checkpoint-specific cross-config validation that ConfigDef cannot express.
 *
 * @param configs raw connector configuration
 * @return map of config name to error message; empty when the configuration is valid
 */
public static Map<String, String> validate(Map<String, String> configs) {
    Map<String, String> errors = new HashMap<>();
    // A disabled connector never runs, so its settings need no cross-validation.
    boolean connectorEnabled = !"false".equals(configs.getOrDefault(ENABLED, "true"));
    if (!connectorEnabled) {
        return errors;
    }
    // Emitting checkpoints is the whole point of this connector.
    if ("false".equals(configs.get(EMIT_CHECKPOINTS_ENABLED))) {
        errors.putIfAbsent(EMIT_CHECKPOINTS_ENABLED, "MirrorCheckpointConnector can't run with " +
                EMIT_CHECKPOINTS_ENABLED + " set to false");
    }
    // Checkpoints are derived from offset syncs, so those must be emitted too.
    if ("false".equals(configs.get(EMIT_OFFSET_SYNCS_ENABLED))) {
        errors.putIfAbsent(EMIT_OFFSET_SYNCS_ENABLED, "MirrorCheckpointConnector can't run without offset syncs");
    }
    return errors;
}
private static ConfigDef defineCheckpointConfig(ConfigDef baseConfig) {
return baseConfig
.define(

View File

@ -20,7 +20,9 @@ import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Task;
@ -106,6 +108,23 @@ public class MirrorCheckpointConnector extends SourceConnector {
Utils.closeQuietly(targetAdminClient, "target admin client");
}
@Override
public Config validate(Map<String, String> connectorConfigs) {
    // Start from the framework-level validation, then layer checkpoint-specific
    // errors from MirrorCheckpointConfig.validate onto the matching ConfigValue
    // entries (creating entries for properties the base validation didn't emit).
    List<ConfigValue> configValues = super.validate(connectorConfigs).configValues();
    Map<String, String> checkpointErrors = MirrorCheckpointConfig.validate(connectorConfigs);
    for (Map.Entry<String, String> error : checkpointErrors.entrySet()) {
        String name = error.getKey();
        ConfigValue target = null;
        for (ConfigValue candidate : configValues) {
            if (candidate.name().equals(name)) {
                target = candidate;
                break;
            }
        }
        if (target == null) {
            // Property absent from the base result; add a fresh entry for it.
            target = new ConfigValue(name);
            configValues.add(target);
        }
        target.addErrorMessage(error.getValue());
    }
    return new Config(configValues);
}
@Override
public Class<? extends Task> taskClass() {
return MirrorCheckpointTask.class;

View File

@ -110,9 +110,19 @@ public abstract class MirrorConnectorConfig extends AbstractConfig {
public static final String TOPIC_FILTER_CLASS_DOC = "TopicFilter to use. Selects topics to replicate.";
public static final Class<?> TOPIC_FILTER_CLASS_DEFAULT = DefaultTopicFilter.class;
public static final String OFFSET_SYNCS_TOPIC_LOCATION = "offset-syncs.topic.location";
public static final String OFFSET_SYNCS_TOPIC_CONFIG_PREFIX = "offset-syncs.topic.";
public static final String OFFSET_SYNCS_TOPIC_LOCATION = OFFSET_SYNCS_TOPIC_CONFIG_PREFIX + "location";
public static final String OFFSET_SYNCS_TOPIC_LOCATION_DEFAULT = SOURCE_CLUSTER_ALIAS_DEFAULT;
public static final String OFFSET_SYNCS_TOPIC_LOCATION_DOC = "The location (source/target) of the offset-syncs topic.";
public static final String EMIT_OFFSET_SYNCS_ENABLED = "emit.offset-syncs" + ENABLED_SUFFIX;
public static final String EMIT_OFFSET_SYNCS_ENABLED_DOC = "Whether to store the new offset of the replicated records in offset-syncs topic or not. " +
"MirrorCheckpointConnector will not be able to sync group offsets or emit checkpoints if emit.checkpoints.enabled and/or sync.group.offsets.enabled are enabled while " +
EMIT_OFFSET_SYNCS_ENABLED + " is disabled.";
public static final boolean EMIT_OFFSET_SYNCS_ENABLED_DEFAULT = true;
public static final String OFFSET_SYNCS_CLIENT_ROLE_PREFIX = "offset-syncs-";
public static final String TASK_INDEX = "task.index";
private final ReplicationPolicy replicationPolicy;
@ -316,8 +326,7 @@ public abstract class MirrorConnectorConfig extends AbstractConfig {
true,
ConfigDef.Importance.LOW,
CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_DOC
)
.withClientSslSupport()
).withClientSslSupport()
.withClientSaslSupport();
public static void main(String[] args) {

View File

@ -51,7 +51,7 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
private static final String CONFIG_PROPERTIES_EXCLUDE_DOC = "Topic config properties that should not be replicated. Supports "
+ "comma-separated property names and regexes.";
public static final String OFFSET_SYNCS_TOPIC_REPLICATION_FACTOR = "offset-syncs.topic.replication.factor";
public static final String OFFSET_SYNCS_TOPIC_REPLICATION_FACTOR = OFFSET_SYNCS_TOPIC_CONFIG_PREFIX + "replication.factor";
public static final String OFFSET_SYNCS_TOPIC_REPLICATION_FACTOR_DOC = "Replication factor for offset-syncs topic.";
public static final short OFFSET_SYNCS_TOPIC_REPLICATION_FACTOR_DEFAULT = 3;
@ -109,10 +109,10 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
+ "Metrics have the target, topic and partition tags. When this setting is enabled, it adds the source tag. "
+ "This configuration will be removed in Kafka 4.0 and the default behavior will be to always have the source tag.";
public static final boolean ADD_SOURCE_ALIAS_TO_METRICS_DEFAULT = false;
public static final String OFFSET_SYNCS_SOURCE_PRODUCER_ROLE = "offset-syncs-source-producer";
public static final String OFFSET_SYNCS_TARGET_PRODUCER_ROLE = "offset-syncs-target-producer";
public static final String OFFSET_SYNCS_SOURCE_ADMIN_ROLE = "offset-syncs-source-admin";
public static final String OFFSET_SYNCS_TARGET_ADMIN_ROLE = "offset-syncs-target-admin";
public static final String OFFSET_SYNCS_SOURCE_PRODUCER_ROLE = OFFSET_SYNCS_CLIENT_ROLE_PREFIX + "source-producer";
public static final String OFFSET_SYNCS_TARGET_PRODUCER_ROLE = OFFSET_SYNCS_CLIENT_ROLE_PREFIX + "target-producer";
public static final String OFFSET_SYNCS_SOURCE_ADMIN_ROLE = OFFSET_SYNCS_CLIENT_ROLE_PREFIX + "source-admin";
public static final String OFFSET_SYNCS_TARGET_ADMIN_ROLE = OFFSET_SYNCS_CLIENT_ROLE_PREFIX + "target-admin";
public MirrorSourceConfig(Map<String, String> props) {
super(CONNECTOR_CONFIG_DEF, ConfigUtils.translateDeprecatedConfigs(props, new String[][]{
@ -220,6 +220,10 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
return getBoolean(ADD_SOURCE_ALIAS_TO_METRICS);
}
// Whether this source connector should write offset sync records to the
// offset-syncs topic (EMIT_OFFSET_SYNCS_ENABLED, default true). When false,
// no offset-syncs producer/topic resources need to be created.
boolean emitOffsetSyncsEnabled() {
    return getBoolean(EMIT_OFFSET_SYNCS_ENABLED);
}
private static ConfigDef defineSourceConfig(ConfigDef baseConfig) {
return baseConfig
.define(
@ -343,7 +347,14 @@ public class MirrorSourceConfig extends MirrorConnectorConfig {
ConfigDef.Type.BOOLEAN,
ADD_SOURCE_ALIAS_TO_METRICS_DEFAULT,
ConfigDef.Importance.LOW,
ADD_SOURCE_ALIAS_TO_METRICS_DOC);
ADD_SOURCE_ALIAS_TO_METRICS_DOC)
.define(
EMIT_OFFSET_SYNCS_ENABLED,
ConfigDef.Type.BOOLEAN,
EMIT_OFFSET_SYNCS_ENABLED_DEFAULT,
ConfigDef.Importance.LOW,
EMIT_OFFSET_SYNCS_ENABLED_DOC
);
}
protected static final ConfigDef CONNECTOR_CONFIG_DEF = defineSourceConfig(new ConfigDef(BASE_CONNECTOR_CONFIG_DEF));

View File

@ -72,6 +72,8 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.OFFSET_SYNCS_CLIENT_ROLE_PREFIX;
import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.OFFSET_SYNCS_TOPIC_CONFIG_PREFIX;
import static org.apache.kafka.connect.mirror.MirrorSourceConfig.SYNC_TOPIC_ACLS_ENABLED;
import static org.apache.kafka.connect.mirror.MirrorUtils.SOURCE_CLUSTER_KEY;
import static org.apache.kafka.connect.mirror.MirrorUtils.TOPIC_KEY;
@ -104,7 +106,6 @@ public class MirrorSourceConnector extends SourceConnector {
private int replicationFactor;
private Admin sourceAdminClient;
private Admin targetAdminClient;
private Admin offsetSyncsAdminClient;
private volatile boolean useIncrementalAlterConfigs;
public MirrorSourceConnector() {
@ -161,7 +162,7 @@ public class MirrorSourceConnector extends SourceConnector {
sourceAdminClient = config.forwardingAdmin(config.sourceAdminConfig("replication-source-admin"));
targetAdminClient = config.forwardingAdmin(config.targetAdminConfig("replication-target-admin"));
useIncrementalAlterConfigs = !config.useIncrementalAlterConfigs().equals(MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIGS);
offsetSyncsAdminClient = config.forwardingAdmin(config.offsetSyncsTopicAdminConfig());
scheduler = new Scheduler(getClass(), config.entityLabel(), config.adminTimeout());
scheduler.execute(this::createOffsetSyncsTopic, "creating upstream offset-syncs topic");
scheduler.execute(this::loadTopicPartitions, "loading initial set of topic-partitions");
@ -187,7 +188,6 @@ public class MirrorSourceConnector extends SourceConnector {
Utils.closeQuietly(configPropertyFilter, "config property filter");
Utils.closeQuietly(sourceAdminClient, "source admin client");
Utils.closeQuietly(targetAdminClient, "target admin client");
Utils.closeQuietly(offsetSyncsAdminClient, "offset syncs admin client");
log.info("Stopping {} took {} ms.", connectorName, System.currentTimeMillis() - start);
}
@ -234,6 +234,30 @@ public class MirrorSourceConnector extends SourceConnector {
@Override
public org.apache.kafka.common.config.Config validate(Map<String, String> props) {
    // Run the framework validation first, then append connector-specific checks
    // (exactly-once support and offset-syncs feature consistency) to the same
    // mutable list of ConfigValue results before wrapping them in a Config.
    List<ConfigValue> configValues = super.validate(props).configValues();
    validateExactlyOnceConfigs(props, configValues);
    validateEmitOffsetSyncConfigs(props, configValues);
    return new org.apache.kafka.common.config.Config(configValues);
}
/**
 * Flags a contradiction where the user disabled offset syncs but still supplied
 * offset-syncs client or topic configuration overrides.
 */
private static void validateEmitOffsetSyncConfigs(Map<String, String> props, List<ConfigValue> configValues) {
    // Did the user configure any offset-syncs client roles or topic settings?
    boolean offsetSyncsConfigured = false;
    for (String conf : props.keySet()) {
        if (conf.startsWith(OFFSET_SYNCS_CLIENT_ROLE_PREFIX) || conf.startsWith(OFFSET_SYNCS_TOPIC_CONFIG_PREFIX)) {
            offsetSyncsConfigured = true;
            break;
        }
    }
    boolean emitDisabled = "false".equals(props.get(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED));
    if (emitDisabled && offsetSyncsConfigured) {
        // Attach the error to the EMIT_OFFSET_SYNCS_ENABLED entry, creating one
        // if the base validation did not produce it.
        ConfigValue emitOffsetSyncs = null;
        for (ConfigValue prop : configValues) {
            if (MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED.equals(prop.name())) {
                emitOffsetSyncs = prop;
                break;
            }
        }
        if (emitOffsetSyncs == null) {
            emitOffsetSyncs = new ConfigValue(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED);
            configValues.add(emitOffsetSyncs);
        }
        emitOffsetSyncs.addErrorMessage("MirrorSourceConnector can't setup offset-syncs feature while " +
                MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED + " set to false");
    }
}
private void validateExactlyOnceConfigs(Map<String, String> props, List<ConfigValue> configValues) {
if ("required".equals(props.get(EXACTLY_ONCE_SUPPORT_CONFIG))) {
if (!consumerUsesReadCommitted(props)) {
ConfigValue exactlyOnceSupport = configValues.stream()
@ -256,7 +280,6 @@ public class MirrorSourceConnector extends SourceConnector {
);
}
}
return new org.apache.kafka.common.config.Config(configValues);
}
@Override
@ -430,11 +453,15 @@ public class MirrorSourceConnector extends SourceConnector {
}
private void createOffsetSyncsTopic() {
MirrorUtils.createSinglePartitionCompactedTopic(
config.offsetSyncsTopic(),
config.offsetSyncsTopicReplicationFactor(),
offsetSyncsAdminClient
);
if (config.emitOffsetSyncsEnabled()) {
try (Admin offsetSyncsAdminClient = config.forwardingAdmin(config.offsetSyncsTopicAdminConfig())) {
MirrorUtils.createSinglePartitionCompactedTopic(
config.offsetSyncsTopic(),
config.offsetSyncsTopicReplicationFactor(),
offsetSyncsAdminClient
);
}
}
}
void computeAndCreateTopicPartitions() throws ExecutionException, InterruptedException {

View File

@ -19,8 +19,6 @@ package org.apache.kafka.connect.mirror;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
@ -38,9 +36,6 @@ import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -52,57 +47,41 @@ public class MirrorSourceTask extends SourceTask {
private static final Logger log = LoggerFactory.getLogger(MirrorSourceTask.class);
private static final int MAX_OUTSTANDING_OFFSET_SYNCS = 10;
private KafkaConsumer<byte[], byte[]> consumer;
private KafkaProducer<byte[], byte[]> offsetProducer;
private String sourceClusterAlias;
private String offsetSyncsTopic;
private Duration pollTimeout;
private long maxOffsetLag;
private Map<TopicPartition, PartitionState> partitionStates;
private ReplicationPolicy replicationPolicy;
private MirrorSourceMetrics metrics;
private boolean stopping = false;
private final Map<TopicPartition, OffsetSync> delayedOffsetSyncs = new LinkedHashMap<>();
private final Map<TopicPartition, OffsetSync> pendingOffsetSyncs = new LinkedHashMap<>();
private Semaphore outstandingOffsetSyncs;
private Semaphore consumerAccess;
private OffsetSyncWriter offsetSyncWriter;
public MirrorSourceTask() {}
// for testing
MirrorSourceTask(KafkaConsumer<byte[], byte[]> consumer, MirrorSourceMetrics metrics, String sourceClusterAlias,
ReplicationPolicy replicationPolicy, long maxOffsetLag, KafkaProducer<byte[], byte[]> producer,
Semaphore outstandingOffsetSyncs, Map<TopicPartition, PartitionState> partitionStates,
String offsetSyncsTopic) {
ReplicationPolicy replicationPolicy,
OffsetSyncWriter offsetSyncWriter) {
this.consumer = consumer;
this.metrics = metrics;
this.sourceClusterAlias = sourceClusterAlias;
this.replicationPolicy = replicationPolicy;
this.maxOffsetLag = maxOffsetLag;
consumerAccess = new Semaphore(1);
this.offsetProducer = producer;
this.outstandingOffsetSyncs = outstandingOffsetSyncs;
this.partitionStates = partitionStates;
this.offsetSyncsTopic = offsetSyncsTopic;
this.offsetSyncWriter = offsetSyncWriter;
}
@Override
public void start(Map<String, String> props) {
MirrorSourceTaskConfig config = new MirrorSourceTaskConfig(props);
pendingOffsetSyncs.clear();
outstandingOffsetSyncs = new Semaphore(MAX_OUTSTANDING_OFFSET_SYNCS);
consumerAccess = new Semaphore(1); // let one thread at a time access the consumer
sourceClusterAlias = config.sourceClusterAlias();
metrics = config.metrics();
pollTimeout = config.consumerPollTimeout();
maxOffsetLag = config.maxOffsetLag();
replicationPolicy = config.replicationPolicy();
partitionStates = new HashMap<>();
offsetSyncsTopic = config.offsetSyncsTopic();
if (config.emitOffsetSyncsEnabled()) {
offsetSyncWriter = new OffsetSyncWriter(config);
}
consumer = MirrorUtils.newConsumer(config.sourceConsumerConfig("replication-consumer"));
offsetProducer = MirrorUtils.newProducer(config.offsetSyncsTopicProducerConfig());
Set<TopicPartition> taskTopicPartitions = config.taskTopicPartitions();
initializeConsumer(taskTopicPartitions);
@ -112,12 +91,15 @@ public class MirrorSourceTask extends SourceTask {
@Override
public void commit() {
// Offset syncs which were not emitted immediately due to their offset spacing should be sent periodically
// This ensures that low-volume topics aren't left with persistent lag at the end of the topic
promoteDelayedOffsetSyncs();
// Publish any offset syncs that we've queued up, but have not yet been able to publish
// (likely because we previously reached our limit for number of outstanding syncs)
firePendingOffsetSyncs();
// Handle delayed and pending offset syncs only when offsetSyncWriter is available
if (offsetSyncWriter != null) {
// Offset syncs which were not emitted immediately due to their offset spacing should be sent periodically
// This ensures that low-volume topics aren't left with persistent lag at the end of the topic
offsetSyncWriter.promoteDelayedOffsetSyncs();
// Publish any offset syncs that we've queued up, but have not yet been able to publish
// (likely because we previously reached our limit for number of outstanding syncs)
offsetSyncWriter.firePendingOffsetSyncs();
}
}
@Override
@ -131,7 +113,7 @@ public class MirrorSourceTask extends SourceTask {
log.warn("Interrupted waiting for access to consumer. Will try closing anyway.");
}
Utils.closeQuietly(consumer, "source consumer");
Utils.closeQuietly(offsetProducer, "offset producer");
Utils.closeQuietly(offsetSyncWriter, "offset sync writer");
Utils.closeQuietly(metrics, "metrics");
log.info("Stopping {} took {} ms.", Thread.currentThread().getName(), System.currentTimeMillis() - start);
}
@ -197,79 +179,16 @@ public class MirrorSourceTask extends SourceTask {
long latency = System.currentTimeMillis() - record.timestamp();
metrics.countRecord(topicPartition);
metrics.replicationLatency(topicPartition, latency);
TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
long downstreamOffset = metadata.offset();
maybeQueueOffsetSyncs(sourceTopicPartition, upstreamOffset, downstreamOffset);
// We may be able to immediately publish an offset sync that we've queued up here
firePendingOffsetSyncs();
}
// updates partition state and queues up OffsetSync if necessary
private void maybeQueueOffsetSyncs(TopicPartition topicPartition, long upstreamOffset,
long downstreamOffset) {
PartitionState partitionState =
partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag));
OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset);
if (partitionState.update(upstreamOffset, downstreamOffset)) {
// Queue this sync for an immediate send, as downstream state is sufficiently stale
synchronized (this) {
delayedOffsetSyncs.remove(topicPartition);
pendingOffsetSyncs.put(topicPartition, offsetSync);
}
partitionState.reset();
} else {
// Queue this sync to be delayed until the next periodic offset commit
synchronized (this) {
delayedOffsetSyncs.put(topicPartition, offsetSync);
}
// Queue offset syncs only when offsetWriter is available
if (offsetSyncWriter != null) {
TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
long downstreamOffset = metadata.offset();
offsetSyncWriter.maybeQueueOffsetSyncs(sourceTopicPartition, upstreamOffset, downstreamOffset);
// We may be able to immediately publish an offset sync that we've queued up here
offsetSyncWriter.firePendingOffsetSyncs();
}
}
private synchronized void promoteDelayedOffsetSyncs() {
pendingOffsetSyncs.putAll(delayedOffsetSyncs);
delayedOffsetSyncs.clear();
}
private void firePendingOffsetSyncs() {
while (true) {
OffsetSync pendingOffsetSync;
synchronized (this) {
Iterator<OffsetSync> syncIterator = pendingOffsetSyncs.values().iterator();
if (!syncIterator.hasNext()) {
// Nothing to sync
log.trace("No more pending offset syncs");
return;
}
pendingOffsetSync = syncIterator.next();
if (!outstandingOffsetSyncs.tryAcquire()) {
// Too many outstanding syncs
log.trace("Too many in-flight offset syncs; will try to send remaining offset syncs later");
return;
}
syncIterator.remove();
}
// Publish offset sync outside of synchronized block; we may have to
// wait for producer metadata to update before Producer::send returns
sendOffsetSync(pendingOffsetSync);
log.trace("Dispatched offset sync for {}", pendingOffsetSync.topicPartition());
}
}
// sends OffsetSync record to internal offsets topic
private void sendOffsetSync(OffsetSync offsetSync) {
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(offsetSyncsTopic, 0,
offsetSync.recordKey(), offsetSync.recordValue());
offsetProducer.send(record, (x, e) -> {
if (e != null) {
log.error("Failure sending offset sync.", e);
} else {
log.trace("Sync'd offsets for {}: {}=={}", offsetSync.topicPartition(),
offsetSync.upstreamOffset(), offsetSync.downstreamOffset());
}
outstandingOffsetSyncs.release();
});
}
private Map<TopicPartition, Long> loadOffsets(Set<TopicPartition> topicPartitions) {
return topicPartitions.stream().collect(Collectors.toMap(x -> x, this::loadOffset));
@ -336,38 +255,4 @@ public class MirrorSourceTask extends SourceTask {
private boolean isUncommitted(Long offset) {
return offset == null || offset < 0;
}
static class PartitionState {
long previousUpstreamOffset = -1L;
long previousDownstreamOffset = -1L;
long lastSyncDownstreamOffset = -1L;
long maxOffsetLag;
boolean shouldSyncOffsets;
PartitionState(long maxOffsetLag) {
this.maxOffsetLag = maxOffsetLag;
}
// true if we should emit an offset sync
boolean update(long upstreamOffset, long downstreamOffset) {
// Emit an offset sync if any of the following conditions are true
boolean noPreviousSyncThisLifetime = lastSyncDownstreamOffset == -1L;
// the OffsetSync::translateDownstream method will translate this offset 1 past the last sync, so add 1.
// TODO: share common implementation to enforce this relationship
boolean translatedOffsetTooStale = downstreamOffset - (lastSyncDownstreamOffset + 1) >= maxOffsetLag;
boolean skippedUpstreamRecord = upstreamOffset - previousUpstreamOffset != 1L;
boolean truncatedDownstreamTopic = downstreamOffset < previousDownstreamOffset;
if (noPreviousSyncThisLifetime || translatedOffsetTooStale || skippedUpstreamRecord || truncatedDownstreamTopic) {
lastSyncDownstreamOffset = downstreamOffset;
shouldSyncOffsets = true;
}
previousUpstreamOffset = upstreamOffset;
previousDownstreamOffset = downstreamOffset;
return shouldSyncOffsets;
}
void reset() {
shouldSyncOffsets = false;
}
}
}

View File

@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.mirror;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Semaphore;
/**
 * Used internally by MirrorMaker to write translated offsets into the offset-syncs topic,
 * with some buffering logic to limit the number of in-flight records.
 *
 * <p>Thread-safety: the two sync queues are guarded by {@code this}; the producer send
 * happens outside the lock, bounded by {@code outstandingOffsetSyncs} permits.
 */
class OffsetSyncWriter implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(OffsetSyncWriter.class);
    // Upper bound on produced-but-unacknowledged sync records.
    private static final int MAX_OUTSTANDING_OFFSET_SYNCS = 10;
    // Syncs deferred until the next periodic commit (low-urgency); guarded by `this`.
    private final Map<TopicPartition, OffsetSync> delayedOffsetSyncs = new LinkedHashMap<>();
    // Syncs queued for immediate send once a permit is available; guarded by `this`.
    private final Map<TopicPartition, OffsetSync> pendingOffsetSyncs = new LinkedHashMap<>();
    // Released by the producer callback; limits in-flight sends to MAX_OUTSTANDING_OFFSET_SYNCS.
    private final Semaphore outstandingOffsetSyncs;
    private final KafkaProducer<byte[], byte[]> offsetProducer;
    private final String offsetSyncsTopic;
    // Max downstream-offset staleness tolerated before a sync is promoted to immediate send.
    private final long maxOffsetLag;
    // Per-partition emission state deciding whether a sync is immediate or delayed.
    private final Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();

    public OffsetSyncWriter(MirrorSourceTaskConfig config) {
        outstandingOffsetSyncs = new Semaphore(MAX_OUTSTANDING_OFFSET_SYNCS);
        offsetSyncsTopic = config.offsetSyncsTopic();
        offsetProducer = MirrorUtils.newProducer(config.offsetSyncsTopicProducerConfig());
        maxOffsetLag = config.maxOffsetLag();
    }

    // Visible for testing
    public OffsetSyncWriter(KafkaProducer<byte[], byte[]> producer,
                            String offsetSyncsTopic,
                            Semaphore outstandingOffsetSyncs,
                            long maxOffsetLag) {
        this.offsetProducer = producer;
        this.offsetSyncsTopic = offsetSyncsTopic;
        this.outstandingOffsetSyncs = outstandingOffsetSyncs;
        this.maxOffsetLag = maxOffsetLag;
    }

    // Closes only the producer; queued-but-unsent syncs are dropped.
    public void close() {
        Utils.closeQuietly(offsetProducer, "offset producer");
    }

    public long maxOffsetLag() {
        return maxOffsetLag;
    }

    public Map<TopicPartition, PartitionState> partitionStates() {
        return this.partitionStates;
    }

    // sends OffsetSync record to internal offsets topic; the semaphore permit
    // acquired in firePendingOffsetSyncs is released in the send callback,
    // whether or not the send succeeded (failures are logged, not retried here).
    private void sendOffsetSync(OffsetSync offsetSync) {
        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(offsetSyncsTopic, 0,
                offsetSync.recordKey(), offsetSync.recordValue());
        offsetProducer.send(record, (x, e) -> {
            if (e != null) {
                LOG.error("Failure sending offset sync.", e);
            } else {
                LOG.trace("Sync'd offsets for {}: {}=={}", offsetSync.topicPartition(),
                        offsetSync.upstreamOffset(), offsetSync.downstreamOffset());
            }
            outstandingOffsetSyncs.release();
        });
    }

    // Drains the pending queue, sending one sync per available permit; stops when
    // the queue is empty or all permits are in use.
    void firePendingOffsetSyncs() {
        while (true) {
            OffsetSync pendingOffsetSync;
            synchronized (this) {
                Iterator<OffsetSync> syncIterator = pendingOffsetSyncs.values().iterator();
                if (!syncIterator.hasNext()) {
                    // Nothing to sync
                    LOG.trace("No more pending offset syncs");
                    return;
                }
                pendingOffsetSync = syncIterator.next();
                if (!outstandingOffsetSyncs.tryAcquire()) {
                    // Too many outstanding syncs
                    LOG.trace("Too many in-flight offset syncs; will try to send remaining offset syncs later");
                    return;
                }
                syncIterator.remove();
            }
            // Publish offset sync outside of synchronized block; we may have to
            // wait for producer metadata to update before Producer::send returns
            sendOffsetSync(pendingOffsetSync);
            LOG.trace("Dispatched offset sync for {}", pendingOffsetSync.topicPartition());
        }
    }

    // Moves all delayed syncs into the pending queue, typically at commit time,
    // so low-volume partitions don't accumulate persistent translation lag.
    synchronized void promoteDelayedOffsetSyncs() {
        pendingOffsetSyncs.putAll(delayedOffsetSyncs);
        delayedOffsetSyncs.clear();
    }

    // updates partition state and queues up OffsetSync if necessary
    void maybeQueueOffsetSyncs(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) {
        PartitionState partitionState =
                partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag));
        OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset);
        if (partitionState.update(upstreamOffset, downstreamOffset)) {
            // Queue this sync for an immediate send, as downstream state is sufficiently stale
            synchronized (this) {
                delayedOffsetSyncs.remove(topicPartition);
                pendingOffsetSyncs.put(topicPartition, offsetSync);
            }
            partitionState.reset();
        } else {
            // Queue this sync to be delayed until the next periodic offset commit
            synchronized (this) {
                delayedOffsetSyncs.put(topicPartition, offsetSync);
            }
        }
    }

    // visible for testing
    protected Map<TopicPartition, OffsetSync> getDelayedOffsetSyncs() {
        return delayedOffsetSyncs;
    }

    // visible for testing
    protected Map<TopicPartition, OffsetSync> getPendingOffsetSyncs() {
        return pendingOffsetSyncs;
    }

    // Tracks per-partition offsets to decide when a sync must be emitted immediately.
    static class PartitionState {
        long previousUpstreamOffset = -1L;
        long previousDownstreamOffset = -1L;
        long lastSyncDownstreamOffset = -1L;
        long maxOffsetLag;
        // Latched true once any emit condition has fired; cleared by reset().
        boolean shouldSyncOffsets;

        PartitionState(long maxOffsetLag) {
            this.maxOffsetLag = maxOffsetLag;
        }

        // true if we should emit an offset sync
        boolean update(long upstreamOffset, long downstreamOffset) {
            // Emit an offset sync if any of the following conditions are true
            boolean noPreviousSyncThisLifetime = lastSyncDownstreamOffset == -1L;
            // the OffsetSync::translateDownstream method will translate this offset 1 past the last sync, so add 1.
            // TODO: share common implementation to enforce this relationship
            boolean translatedOffsetTooStale = downstreamOffset - (lastSyncDownstreamOffset + 1) >= maxOffsetLag;
            boolean skippedUpstreamRecord = upstreamOffset - previousUpstreamOffset != 1L;
            boolean truncatedDownstreamTopic = downstreamOffset < previousDownstreamOffset;
            if (noPreviousSyncThisLifetime || translatedOffsetTooStale || skippedUpstreamRecord || truncatedDownstreamTopic) {
                lastSyncDownstreamOffset = downstreamOffset;
                shouldSyncOffsets = true;
            }
            previousUpstreamOffset = upstreamOffset;
            previousDownstreamOffset = downstreamOffset;
            return shouldSyncOffsets;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof PartitionState)) return false;
            PartitionState that = (PartitionState) o;
            return previousUpstreamOffset == that.previousUpstreamOffset &&
                    previousDownstreamOffset == that.previousDownstreamOffset &&
                    lastSyncDownstreamOffset == that.lastSyncDownstreamOffset &&
                    maxOffsetLag == that.maxOffsetLag &&
                    shouldSyncOffsets == that.shouldSyncOffsets;
        }

        @Override
        public int hashCode() {
            return Objects.hash(previousUpstreamOffset, previousDownstreamOffset, lastSyncDownstreamOffset, maxOffsetLag, shouldSyncOffsets);
        }

        void reset() {
            shouldSyncOffsets = false;
        }
    }
}

View File

@ -17,10 +17,12 @@
package org.apache.kafka.connect.mirror;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -95,4 +97,36 @@ public class MirrorCheckpointConfigTest {
"source1->target2|ConnectorName|" + MirrorCheckpointConfig.OFFSET_SYNCS_TARGET_CONSUMER_ROLE,
offsetSyncsTopicTargetConsumerConfig.get("client.id"));
}
@Test
public void testSkipValidationIfConnectorDisabled() {
    // When the connector itself is disabled, validation is a no-op even if
    // checkpoint-related settings would otherwise be rejected.
    Map<String, String> disabledWithoutCheckpoints = makeProps(
            MirrorConnectorConfig.ENABLED, "false",
            MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED, "false",
            MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED, "false");
    assertTrue(MirrorCheckpointConfig.validate(disabledWithoutCheckpoints).isEmpty());

    Map<String, String> disabledWithoutOffsetSyncs = makeProps(
            MirrorConnectorConfig.ENABLED, "false",
            MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED, "true",
            MirrorCheckpointConfig.EMIT_OFFSET_SYNCS_ENABLED, "false");
    assertTrue(MirrorCheckpointConfig.validate(disabledWithoutOffsetSyncs).isEmpty());
}
// When the connector is enabled, validate() must flag inconsistent feature
// combinations. Fixes from review: the last makeProps call passed
// EMIT_CHECKPOINTS_ENABLED twice (duplicate key removed), and assertEquals
// arguments are now in JUnit's (expected, actual) order.
@Test
public void testValidateIfConnectorEnabled() {
    // Both checkpoint emission and group-offset syncing disabled: the
    // connector would do nothing, so EMIT_CHECKPOINTS_ENABLED is flagged.
    Map<String, String> configValues = MirrorCheckpointConfig.validate(makeProps(
            MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED, "false",
            MirrorCheckpointConfig.SYNC_GROUP_OFFSETS_ENABLED, "false"));
    assertEquals(Collections.singleton(MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED), configValues.keySet());

    // Checkpoints requested but offset syncs (their data source) disabled:
    // EMIT_OFFSET_SYNCS_ENABLED is flagged.
    configValues = MirrorCheckpointConfig.validate(makeProps(MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED, "true",
            MirrorCheckpointConfig.EMIT_OFFSET_SYNCS_ENABLED, "false"));
    assertEquals(Utils.mkSet(MirrorCheckpointConfig.EMIT_OFFSET_SYNCS_ENABLED), configValues.keySet());

    // Checkpoints and offset syncs both enabled: valid, no errors expected.
    configValues = MirrorCheckpointConfig.validate(makeProps(MirrorCheckpointConfig.EMIT_CHECKPOINTS_ENABLED, "true",
            MirrorCheckpointConfig.EMIT_OFFSET_SYNCS_ENABLED, "true"));
    assertTrue(configValues.isEmpty());
}
}

View File

@ -59,11 +59,10 @@ public class MirrorCheckpointConnectorTest {
Set<String> knownConsumerGroups = new HashSet<>();
knownConsumerGroups.add(CONSUMER_GROUP);
// MirrorCheckpointConnector as minimum to run taskConfig()
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(knownConsumerGroups,
config);
List<Map<String, String>> output = connector.taskConfigs(1);
// expect no task will be created
List<Map<String, String>> output = new MirrorCheckpointConnector(knownConsumerGroups, config).taskConfigs(1);
assertEquals(0, output.size(), "MirrorCheckpointConnector not disabled");
}

View File

@ -19,7 +19,6 @@ package org.apache.kafka.connect.mirror;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
@ -28,13 +27,12 @@ import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.mirror.MirrorSourceTask.PartitionState;
import org.apache.kafka.connect.mirror.OffsetSyncWriter.PartitionState;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import java.util.ArrayList;
import java.util.Arrays;
@ -45,18 +43,17 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Semaphore;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.doReturn;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@ -71,10 +68,8 @@ public class MirrorSourceTaskTest {
headers.add("header2", new byte[]{'p', 'q', 'r', 's', 't'});
ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<>("topic1", 2, 3L, 4L,
TimestampType.CREATE_TIME, 5, 6, key, value, headers, Optional.empty());
@SuppressWarnings("unchecked")
KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(null, null, "cluster7",
new DefaultReplicationPolicy(), 50, producer, null, null, null);
new DefaultReplicationPolicy(), null);
SourceRecord sourceRecord = mirrorSourceTask.convertRecord(consumerRecord);
assertEquals("cluster7.topic1", sourceRecord.topic(),
"Failure on cluster7.topic1 consumerRecord serde");
@ -96,7 +91,7 @@ public class MirrorSourceTaskTest {
@Test
public void testOffsetSync() {
MirrorSourceTask.PartitionState partitionState = new MirrorSourceTask.PartitionState(50);
OffsetSyncWriter.PartitionState partitionState = new OffsetSyncWriter.PartitionState(50);
assertTrue(partitionState.update(0, 100), "always emit offset sync on first update");
assertTrue(partitionState.shouldSyncOffsets, "should sync offsets");
@ -132,7 +127,7 @@ public class MirrorSourceTaskTest {
@Test
public void testZeroOffsetSync() {
MirrorSourceTask.PartitionState partitionState = new MirrorSourceTask.PartitionState(0);
OffsetSyncWriter.PartitionState partitionState = new OffsetSyncWriter.PartitionState(0);
// if max offset lag is zero, should always emit offset syncs
assertTrue(partitionState.update(0, 100), "zeroOffsetSync downStreamOffset 100 is incorrect");
@ -186,8 +181,6 @@ public class MirrorSourceTaskTest {
@SuppressWarnings("unchecked")
KafkaConsumer<byte[], byte[]> consumer = mock(KafkaConsumer.class);
@SuppressWarnings("unchecked")
KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
when(consumer.poll(any())).thenReturn(consumerRecords);
MirrorSourceMetrics metrics = mock(MirrorSourceMetrics.class);
@ -195,7 +188,7 @@ public class MirrorSourceTaskTest {
String sourceClusterName = "cluster1";
ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy();
MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, metrics, sourceClusterName,
replicationPolicy, 50, producer, null, null, null);
replicationPolicy, null);
List<SourceRecord> sourceRecords = mirrorSourceTask.poll();
assertEquals(2, sourceRecords.size());
@ -255,7 +248,7 @@ public class MirrorSourceTaskTest {
});
MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(mockConsumer, null, null,
new DefaultReplicationPolicy(), 50, null, null, null, null);
new DefaultReplicationPolicy(), null);
mirrorSourceTask.initialize(mockSourceTaskContext);
// Call test subject
@ -296,21 +289,20 @@ public class MirrorSourceTaskTest {
String sourceClusterName = "cluster1";
ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy();
MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, metrics, sourceClusterName,
replicationPolicy, 50, producer, null, null, null);
replicationPolicy, null);
SourceRecord sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, 0, 0, System.currentTimeMillis(),
TimestampType.CREATE_TIME, key1.length, value1.length, key1, value1, headers, Optional.empty()));
// Expect that commitRecord will not throw an exception
mirrorSourceTask.commitRecord(sourceRecord, null);
verifyNoInteractions(producer);
}
@Test
public void testSendSyncEvent() {
byte[] recordKey = "key".getBytes();
byte[] recordValue = "value".getBytes();
int maxOffsetLag = 50;
long maxOffsetLag = 50;
int recordPartition = 0;
int recordOffset = 0;
int metadataOffset = 100;
@ -322,15 +314,16 @@ public class MirrorSourceTaskTest {
@SuppressWarnings("unchecked")
KafkaConsumer<byte[], byte[]> consumer = mock(KafkaConsumer.class);
@SuppressWarnings("unchecked")
KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
MirrorSourceMetrics metrics = mock(MirrorSourceMetrics.class);
Semaphore outstandingOffsetSyncs = new Semaphore(1);
PartitionState partitionState = new PartitionState(maxOffsetLag);
Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
OffsetSyncWriter offsetSyncWriter = mock(OffsetSyncWriter.class);
when(offsetSyncWriter.maxOffsetLag()).thenReturn(maxOffsetLag);
doNothing().when(offsetSyncWriter).firePendingOffsetSyncs();
doNothing().when(offsetSyncWriter).promoteDelayedOffsetSyncs();
MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, metrics, sourceClusterName,
replicationPolicy, maxOffsetLag, producer, outstandingOffsetSyncs, partitionStates, topicName);
replicationPolicy, offsetSyncWriter);
SourceRecord sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition,
recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length,
@ -339,101 +332,17 @@ public class MirrorSourceTaskTest {
TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(sourceRecord.sourcePartition());
partitionStates.put(sourceTopicPartition, partitionState);
RecordMetadata recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition);
ArgumentCaptor<Callback> producerCallback = ArgumentCaptor.forClass(Callback.class);
when(producer.send(any(), producerCallback.capture())).thenAnswer(mockInvocation -> {
producerCallback.getValue().onCompletion(null, null);
return null;
});
doNothing().when(offsetSyncWriter).maybeQueueOffsetSyncs(eq(sourceTopicPartition), eq((long) recordOffset), eq(recordMetadata.offset()));
mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
// We should have dispatched this sync to the producer
verify(producer, times(1)).send(any(), any());
verify(offsetSyncWriter, times(1)).maybeQueueOffsetSyncs(eq(sourceTopicPartition), eq((long) recordOffset), eq(recordMetadata.offset()));
verify(offsetSyncWriter, times(1)).firePendingOffsetSyncs();
mirrorSourceTask.commit();
// No more syncs should take place; we've been able to publish all of them so far
verify(producer, times(1)).send(any(), any());
recordOffset = 2;
metadataOffset = 102;
recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition);
sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition,
recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length,
recordValue.length, recordKey, recordValue, headers, Optional.empty()));
// Do not release outstanding sync semaphore
doReturn(null).when(producer).send(any(), producerCallback.capture());
mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
// We should have dispatched this sync to the producer
verify(producer, times(2)).send(any(), any());
mirrorSourceTask.commit();
// No more syncs should take place; we've been able to publish all of them so far
verify(producer, times(2)).send(any(), any());
// Do not send sync event
recordOffset = 4;
metadataOffset = 104;
recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition);
sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition,
recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length,
recordValue.length, recordKey, recordValue, headers, Optional.empty()));
mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
mirrorSourceTask.commit();
// We should not have dispatched any more syncs to the producer; there were too many already in flight
verify(producer, times(2)).send(any(), any());
// Now the in-flight sync has been ack'd
producerCallback.getValue().onCompletion(null, null);
mirrorSourceTask.commit();
// We should dispatch the offset sync that was queued but previously not sent to the producer now
verify(producer, times(3)).send(any(), any());
// Ack the latest sync immediately
producerCallback.getValue().onCompletion(null, null);
// Should send sync event
recordOffset = 6;
metadataOffset = 106;
recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition);
sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition,
recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length,
recordValue.length, recordKey, recordValue, headers, Optional.empty()));
mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
// We should have dispatched this sync to the producer
verify(producer, times(4)).send(any(), any());
// Ack the latest sync immediately
producerCallback.getValue().onCompletion(null, null);
mirrorSourceTask.commit();
// No more syncs should take place; we've been able to publish all of them so far
verify(producer, times(4)).send(any(), any());
// Don't skip the upstream record, so that the offset.lag.max determines whether the offset is emitted.
recordOffset = 7;
metadataOffset = 107;
recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition);
sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition,
recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length,
recordValue.length, recordKey, recordValue, headers, Optional.empty()));
mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
// We should not have dispatched any more syncs to the producer; this sync was within offset.lag.max of the previous one.
verify(producer, times(4)).send(any(), any());
mirrorSourceTask.commit();
// We should dispatch the offset sync that was delayed until the next periodic offset commit.
verify(producer, times(5)).send(any(), any());
// Ack the latest sync immediately
producerCallback.getValue().onCompletion(null, null);
mirrorSourceTask.commit();
// No more syncs should take place; we've been able to publish all of them so far
verify(producer, times(5)).send(any(), any());
verify(offsetSyncWriter, times(1)).promoteDelayedOffsetSyncs();
verify(offsetSyncWriter, times(2)).firePendingOffsetSyncs();
}
private void compareHeaders(List<Header> expectedHeaders, List<org.apache.kafka.connect.header.Header> taskHeaders) {

View File

@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.mirror;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import java.util.Map;
import java.util.concurrent.Semaphore;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
/**
 * Unit tests for OffsetSyncWriter's queueing, firing, and promotion of offset syncs.
 *
 * Review fixes: assertEquals calls now pass arguments in JUnit's (expected, actual)
 * order, and the redundant local {@code producer} mocks that shadowed the class-level
 * field have been removed (JUnit creates a fresh test instance — and thus a fresh
 * mock — per test method, so the field is equivalent).
 */
public class OffsetSyncWriterTest {
    String topicName = "topic";
    @SuppressWarnings("unchecked")
    KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
    TopicPartition topicPartition = new TopicPartition(topicName, 0);

    @Test
    public void testMaybeQueueOffsetSyncs() {
        int maxOffsetLag = 2;

        Semaphore outstandingOffsetSyncs = new Semaphore(1);
        OffsetSyncWriter offsetSyncWriter = new OffsetSyncWriter(producer, topicName, outstandingOffsetSyncs, maxOffsetLag);

        // The first sync for a partition is always queued as pending, never delayed.
        offsetSyncWriter.maybeQueueOffsetSyncs(topicPartition, 0, 1);
        assertFalse(offsetSyncWriter.getDelayedOffsetSyncs().containsKey(topicPartition));
        assertTrue(offsetSyncWriter.getPendingOffsetSyncs().containsKey(topicPartition));
        assertEquals(1, offsetSyncWriter.partitionStates().get(topicPartition).lastSyncDownstreamOffset);

        // A subsequent sync within offset.lag.max is delayed instead, and the
        // last-synced downstream offset is left unchanged.
        offsetSyncWriter.maybeQueueOffsetSyncs(topicPartition, 1, 2);
        assertTrue(offsetSyncWriter.getDelayedOffsetSyncs().containsKey(topicPartition));
        assertEquals(1, offsetSyncWriter.partitionStates().get(topicPartition).lastSyncDownstreamOffset);
    }

    @Test
    public void testFirePendingOffsetSyncs() {
        int maxOffsetLag = 1;

        Semaphore outstandingOffsetSyncs = new Semaphore(1);
        OffsetSyncWriter offsetSyncWriter = new OffsetSyncWriter(producer, topicName, outstandingOffsetSyncs, maxOffsetLag);

        offsetSyncWriter.maybeQueueOffsetSyncs(topicPartition, 0, 100);
        assertEquals(100, offsetSyncWriter.partitionStates().get(topicPartition).lastSyncDownstreamOffset);

        offsetSyncWriter.firePendingOffsetSyncs();

        // NOTE(review): this stub is installed after the first dispatch above, so
        // the captured callback is never invoked for it and the semaphore permit
        // stays acquired — which is what keeps the next sync "in flight" below.
        ArgumentCaptor<Callback> producerCallback = ArgumentCaptor.forClass(Callback.class);
        when(producer.send(any(), producerCallback.capture())).thenAnswer(mockInvocation -> {
            producerCallback.getValue().onCompletion(null, null);
            return null;
        });

        // We should have dispatched this sync to the producer
        verify(producer, times(1)).send(any(), any());

        offsetSyncWriter.maybeQueueOffsetSyncs(topicPartition, 2, 102);
        assertEquals(102, offsetSyncWriter.partitionStates().get(topicPartition).lastSyncDownstreamOffset);
        offsetSyncWriter.firePendingOffsetSyncs();

        // in-flight offset syncs; will not try to send remaining offset syncs immediately
        verifyNoMoreInteractions(producer);
    }

    @Test
    public void testPromoteDelayedOffsetSyncs() {
        int maxOffsetLag = 50;

        Semaphore outstandingOffsetSyncs = new Semaphore(1);
        OffsetSyncWriter offsetSyncWriter = new OffsetSyncWriter(producer, topicName, outstandingOffsetSyncs, maxOffsetLag);

        // The second sync is within offset.lag.max of the first, so it is delayed;
        // promotion must move it into the pending queue.
        offsetSyncWriter.maybeQueueOffsetSyncs(topicPartition, 0, 100);
        offsetSyncWriter.maybeQueueOffsetSyncs(topicPartition, 1, 101);
        offsetSyncWriter.promoteDelayedOffsetSyncs();

        assertTrue(offsetSyncWriter.getDelayedOffsetSyncs().isEmpty());
        Map<TopicPartition, OffsetSync> pendingOffsetSyncs = offsetSyncWriter.getPendingOffsetSyncs();
        assertEquals(1, pendingOffsetSyncs.size());
        assertEquals(1, pendingOffsetSyncs.get(topicPartition).upstreamOffset());
        assertEquals(101, pendingOffsetSyncs.get(topicPartition).downstreamOffset());
    }
}

View File

@ -17,6 +17,7 @@
package org.apache.kafka.connect.mirror.integration;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.utils.Utils;
@ -44,10 +45,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
@ -55,10 +58,12 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
import static org.apache.kafka.connect.mirror.MirrorMaker.CONNECTOR_CLASSES;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Tag("integration")
public class DedicatedMirrorIntegrationTest {
@ -188,6 +193,75 @@ public class DedicatedMirrorIntegrationTest {
}
}
// Verifies that with emit.offset-syncs.enabled=false MM2 still replicates data
// (source + heartbeat connectors run) but never creates the offset-syncs
// internal topic on the source cluster.
@Test
public void testClusterWithEmitOffsetDisabled() throws Exception {
    Properties brokerProps = new Properties();
    EmbeddedKafkaCluster clusterA = startKafkaCluster("A", 1, brokerProps);
    EmbeddedKafkaCluster clusterB = startKafkaCluster("B", 1, brokerProps);
    try (Admin adminB = clusterB.createAdminClient()) {
        // Cluster aliases
        final String a = "A";
        final String b = "B";
        final String ab = a + "->" + b;
        final String testTopicPrefix = "test-topic-";

        // Plain HashMap instead of double-brace initialization, which creates an
        // anonymous subclass that captures a reference to the enclosing test instance.
        Map<String, String> mmProps = new HashMap<>();
        mmProps.put("dedicated.mode.enable.internal.rest", "false");
        mmProps.put("listeners", "http://localhost:0");
        // Refresh topics very frequently to quickly pick up on topics that are created
        // after the MM2 nodes are brought up during testing
        mmProps.put("refresh.topics.interval.seconds", "1");
        mmProps.put("clusters", String.join(", ", a, b));
        mmProps.put(a + ".bootstrap.servers", clusterA.bootstrapServers());
        mmProps.put(b + ".bootstrap.servers", clusterB.bootstrapServers());
        mmProps.put(ab + ".enabled", "true");
        mmProps.put(ab + ".topics", "^" + testTopicPrefix + ".*");
        mmProps.put("replication.factor", "1");
        mmProps.put("checkpoints.topic.replication.factor", "1");
        mmProps.put("heartbeats.topic.replication.factor", "1");
        // The feature under test: disable offset-sync emission entirely.
        mmProps.put("emit.offset-syncs.enabled", "false");
        mmProps.put("status.storage.replication.factor", "1");
        mmProps.put("offset.storage.replication.factor", "1");
        mmProps.put("config.storage.replication.factor", "1");

        // Bring up a single-node cluster; only the source and heartbeat
        // connectors are expected since offset syncs (and thus checkpoints)
        // are disabled.
        final MirrorMaker mm = startMirrorMaker("no-offset-syncing", mmProps);
        final SourceAndTarget sourceAndTarget = new SourceAndTarget(a, b);
        awaitMirrorMakerStart(mm, sourceAndTarget, Arrays.asList(MirrorSourceConnector.class, MirrorHeartbeatConnector.class));

        // wait for mirror source and heartbeat connectors to start a task
        awaitConnectorTasksStart(mm, MirrorHeartbeatConnector.class, sourceAndTarget);

        final int numMessages = 10;
        String topic = testTopicPrefix + "1";

        // Create the topic on cluster A
        clusterA.createTopic(topic, 1);
        // and wait for MirrorMaker to create it on cluster B
        awaitTopicCreation(b, adminB, a + "." + topic);

        // wait for source connector to start a task
        awaitConnectorTasksStart(mm, MirrorSourceConnector.class, sourceAndTarget);

        // Write data to the topic on cluster A
        writeToTopic(clusterA, topic, numMessages);
        // and wait for MirrorMaker to copy it to cluster B
        awaitTopicContent(clusterB, b, a + "." + topic, numMessages);

        // The offset-syncs topic must never have been created on cluster A.
        List<TopicDescription> offsetSyncTopic = clusterA.describeTopics("mm2-offset-syncs.B.internal").values()
                .stream()
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
        assertTrue(offsetSyncTopic.isEmpty());
    }
}
/**
* Test that a multi-node dedicated cluster is able to dynamically detect new topics at runtime
* and reconfigure its connectors and their tasks to replicate those topics correctly.
@ -320,11 +394,14 @@ public class DedicatedMirrorIntegrationTest {
cluster.produce(topic, Integer.toString(i));
}
}
// Convenience overload: waits for every standard MM2 connector class
// (CONNECTOR_CLASSES) to reach the running state for the given replication flow.
private void awaitMirrorMakerStart(final MirrorMaker mm, final SourceAndTarget sourceAndTarget) throws InterruptedException {
    awaitMirrorMakerStart(mm, sourceAndTarget, CONNECTOR_CLASSES);
}
private void awaitMirrorMakerStart(final MirrorMaker mm, final SourceAndTarget sourceAndTarget, List<Class<?>> connectorClasses) throws InterruptedException {
waitForCondition(() -> {
try {
return CONNECTOR_CLASSES.stream().allMatch(
return connectorClasses.stream().allMatch(
connectorClazz -> isConnectorRunningForMirrorMaker(connectorClazz, mm, sourceAndTarget));
} catch (Exception ex) {
log.error("Something unexpected occurred. Unable to check for startup status for mirror maker {}", mm, ex);

View File

@ -24,6 +24,7 @@ import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@ -43,6 +44,7 @@ import org.apache.kafka.connect.mirror.Checkpoint;
import org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter;
import org.apache.kafka.connect.mirror.MirrorCheckpointConnector;
import org.apache.kafka.connect.mirror.MirrorClient;
import org.apache.kafka.connect.mirror.MirrorConnectorConfig;
import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
import org.apache.kafka.connect.mirror.MirrorMakerConfig;
import org.apache.kafka.connect.mirror.MirrorSourceConnector;
@ -72,6 +74,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
@ -85,6 +88,8 @@ import java.util.function.LongUnaryOperator;
import java.util.stream.Collectors;
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.OFFSET_SYNCS_CLIENT_ROLE_PREFIX;
import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.OFFSET_SYNCS_TOPIC_CONFIG_PREFIX;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -534,6 +539,49 @@ public class MirrorConnectorsIntegrationBaseTest {
assertMonotonicCheckpoints(backup, PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal");
}
// With emit.offset.syncs disabled, replication must still work end to end while
// the offset-syncs internal topic is never created on the primary cluster.
// Review fix: the MirrorClient instances are now closed in a finally block so
// they no longer leak when an assertion in the middle of the test fails.
@Test
public void testReplicationWithoutOffsetSyncWillNotCreateOffsetSyncsTopic() throws Exception {
    produceMessages(primaryProducer, "test-topic-1");
    String backupTopic1 = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
    if (replicateBackupToPrimary) {
        produceMessages(backupProducer, "test-topic-1");
    }

    String consumerGroupName = "consumer-group-testReplication";
    Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
    // warm up consumers before starting the connectors, so we don't need to wait for discovery
    warmUpConsumer(consumerProps);

    // Strip any offset-syncs client/topic overrides and disable the feature itself.
    mm2Props.keySet().removeIf(prop -> prop.startsWith(OFFSET_SYNCS_CLIENT_ROLE_PREFIX) || prop.startsWith(OFFSET_SYNCS_TOPIC_CONFIG_PREFIX));
    mm2Props.put(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED, "false");

    mm2Config = new MirrorMakerConfig(mm2Props);
    // Only the source and heartbeat connectors are expected to run.
    waitUntilMirrorMakerIsRunning(backup, Arrays.asList(MirrorSourceConnector.class, MirrorHeartbeatConnector.class), mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);

    // NOTE(review): these clients are not otherwise used in this test; they are
    // kept (construction may validate client configs) but now closed reliably.
    MirrorClient primaryClient = new MirrorClient(mm2Config.clientConfig(PRIMARY_CLUSTER_ALIAS));
    MirrorClient backupClient = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS));
    try {
        // make sure the topic is auto-created in the other cluster
        waitForTopicCreated(backup, backupTopic1);

        assertEquals(NUM_RECORDS_PRODUCED, primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count(),
                "Records were not produced to primary cluster.");
        assertEquals(NUM_RECORDS_PRODUCED, backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, backupTopic1).count(),
                "Records were not replicated to backup cluster.");
        assertEquals(NUM_RECORDS_PRODUCED, backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count(),
                "Records were not produced to backup cluster.");

        // The offset-syncs topic must not exist on the primary cluster.
        List<TopicDescription> offsetSyncTopic = primary.kafka().describeTopics("mm2-offset-syncs.backup.internal").values()
                .stream()
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
        assertTrue(offsetSyncTopic.isEmpty());
    } finally {
        primaryClient.close();
        backupClient.close();
    }
}
@Test
public void testOffsetSyncsTopicsOnTarget() throws Exception {
// move offset-syncs topics to target