From 9ab689f7d45458e92883019f54fdcf9a2d9ce456 Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Mon, 6 Feb 2023 04:53:58 -0500 Subject: [PATCH] KAFKA-14610: Publish Mirror Maker 2 offset syncs in task commit() method (#13181) Reviewers: Mickael Maison, Greg Harris --- .../connect/mirror/MirrorSourceTask.java | 62 ++++++++++++++----- .../connect/mirror/MirrorSourceTaskTest.java | 31 ++++++++-- 2 files changed, 71 insertions(+), 22 deletions(-) diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java index 5635eb7189d..09de13aff34 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java @@ -36,6 +36,8 @@ import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.Map; import java.util.HashMap; import java.util.List; @@ -62,6 +64,7 @@ public class MirrorSourceTask extends SourceTask { private ReplicationPolicy replicationPolicy; private MirrorSourceMetrics metrics; private boolean stopping = false; + private final Map<TopicPartition, OffsetSync> pendingOffsetSyncs = new LinkedHashMap<>(); private Semaphore outstandingOffsetSyncs; private Semaphore consumerAccess; @@ -87,6 +90,7 @@ public class MirrorSourceTask extends SourceTask { @Override public void start(Map props) { MirrorSourceTaskConfig config = new MirrorSourceTaskConfig(props); + pendingOffsetSyncs.clear(); outstandingOffsetSyncs = new Semaphore(MAX_OUTSTANDING_OFFSET_SYNCS); consumerAccess = new Semaphore(1); // let one thread at a time access the consumer sourceClusterAlias = config.sourceClusterAlias(); @@ -111,7 +115,9 @@ public class MirrorSourceTask extends SourceTask { @Override public void commit() { - // nop + // Publish any offset syncs that we've queued up, 
but have not yet been able to publish + // (likely because we previously reached our limit for number of outstanding syncs) + firePendingOffsetSyncs(); } @Override @@ -194,41 +200,63 @@ public class MirrorSourceTask extends SourceTask { TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition()); long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset()); long downstreamOffset = metadata.offset(); - maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset); + maybeQueueOffsetSyncs(sourceTopicPartition, upstreamOffset, downstreamOffset); + // We may be able to immediately publish an offset sync that we've queued up here + firePendingOffsetSyncs(); } - // updates partition state and sends OffsetSync if necessary - private void maybeSyncOffsets(TopicPartition topicPartition, long upstreamOffset, - long downstreamOffset) { + // updates partition state and queues up OffsetSync if necessary + private void maybeQueueOffsetSyncs(TopicPartition topicPartition, long upstreamOffset, + long downstreamOffset) { PartitionState partitionState = partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag)); if (partitionState.update(upstreamOffset, downstreamOffset)) { - if (sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset)) { - partitionState.reset(); + OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset); + synchronized (this) { + pendingOffsetSyncs.put(topicPartition, offsetSync); } + partitionState.reset(); } } - // sends OffsetSync record upstream to internal offsets topic - private boolean sendOffsetSync(TopicPartition topicPartition, long upstreamOffset, - long downstreamOffset) { - if (!outstandingOffsetSyncs.tryAcquire()) { - // Too many outstanding offset syncs. 
- return false; + private void firePendingOffsetSyncs() { + while (true) { + OffsetSync pendingOffsetSync; + synchronized (this) { + Iterator<OffsetSync> syncIterator = pendingOffsetSyncs.values().iterator(); + if (!syncIterator.hasNext()) { + // Nothing to sync + log.trace("No more pending offset syncs"); + return; + } + pendingOffsetSync = syncIterator.next(); + if (!outstandingOffsetSyncs.tryAcquire()) { + // Too many outstanding syncs + log.trace("Too many in-flight offset syncs; will try to send remaining offset syncs later"); + return; + } + syncIterator.remove(); + } + // Publish offset sync outside of synchronized block; we may have to + // wait for producer metadata to update before Producer::send returns + sendOffsetSync(pendingOffsetSync); + log.trace("Dispatched offset sync for {}", pendingOffsetSync.topicPartition()); } - OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset); + } + + // sends OffsetSync record to internal offsets topic + private void sendOffsetSync(OffsetSync offsetSync) { ProducerRecord record = new ProducerRecord<>(offsetSyncsTopic, 0, offsetSync.recordKey(), offsetSync.recordValue()); offsetProducer.send(record, (x, e) -> { if (e != null) { log.error("Failure sending offset sync.", e); } else { - log.trace("Sync'd offsets for {}: {}=={}", topicPartition, - upstreamOffset, downstreamOffset); + log.trace("Sync'd offsets for {}: {}=={}", offsetSync.topicPartition(), + offsetSync.upstreamOffset(), offsetSync.downstreamOffset()); } outstandingOffsetSyncs.release(); }); - return true; } private Map loadOffsets(Set topicPartitions) { diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java index 9dfcf807ed2..b309df79fd9 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java +++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java @@ -283,7 +283,11 @@ public class MirrorSourceTaskTest { }); mirrorSourceTask.commitRecord(sourceRecord, recordMetadata); + // We should have dispatched this sync to the producer + verify(producer, times(1)).send(any(), any()); + mirrorSourceTask.commit(); + // No more syncs should take place; we've been able to publish all of them so far verify(producer, times(1)).send(any(), any()); recordOffset = 2; @@ -297,7 +301,11 @@ public class MirrorSourceTaskTest { doReturn(null).when(producer).send(any(), producerCallback.capture()); mirrorSourceTask.commitRecord(sourceRecord, recordMetadata); + // We should have dispatched this sync to the producer + verify(producer, times(2)).send(any(), any()); + mirrorSourceTask.commit(); + // No more syncs should take place; we've been able to publish all of them so far verify(producer, times(2)).send(any(), any()); // Do not send sync event @@ -309,22 +317,35 @@ public class MirrorSourceTaskTest { recordValue.length, recordKey, recordValue, headers, Optional.empty())); mirrorSourceTask.commitRecord(sourceRecord, recordMetadata); + mirrorSourceTask.commit(); + // We should not have dispatched any more syncs to the producer; there were too many already in flight verify(producer, times(2)).send(any(), any()); + // Now the in-flight sync has been ack'd + producerCallback.getValue().onCompletion(null, null); + mirrorSourceTask.commit(); + // We should dispatch the offset sync that was queued but previously not sent to the producer now + verify(producer, times(3)).send(any(), any()); + + // Ack the latest sync immediately + producerCallback.getValue().onCompletion(null, null); + // Should send sync event - recordOffset = 5; - metadataOffset = 150; + recordOffset = 6; + metadataOffset = 106; recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition); sourceRecord = mirrorSourceTask.convertRecord(new 
ConsumerRecord<>(topicName, recordPartition, recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length, recordValue.length, recordKey, recordValue, headers, Optional.empty())); - producerCallback.getValue().onCompletion(null, null); - mirrorSourceTask.commitRecord(sourceRecord, recordMetadata); + // We should have dispatched this sync to the producer + verify(producer, times(4)).send(any(), any()); - verify(producer, times(3)).send(any(), any()); + mirrorSourceTask.commit(); + // No more syncs should take place; we've been able to publish all of them so far + verify(producer, times(4)).send(any(), any()); } private void compareHeaders(List
expectedHeaders, List taskHeaders) {