KAFKA-14610: Publish Mirror Maker 2 offset syncs in task commit() method (#13181)

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Greg Harris <gharris1727@gmail.com>
Author: Chris Egerton (committed by GitHub)
Date: 2023-02-06 04:53:58 -05:00
Commit: 9ab689f7d4 (parent: 4a7fedd46a)
2 changed files with 71 additions and 22 deletions
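For context before the diff: the change relies on two SourceTask hooks. commitRecord() fires once per record acknowledged by the producer, while commit() is invoked periodically by the Connect framework as part of offset flushes, which makes it a natural place to retry work that could not be published earlier. The minimal sketch below is not part of the patch; class and method names such as QueueingSourceTaskSketch and publishPending() are purely illustrative of that shape.

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

// Hypothetical task showing the hook shape only: commitRecord() queues follow-up work and
// tries to publish it right away; commit() retries whatever could not be published, since
// the framework keeps invoking it even when no new records are being acknowledged.
public class QueueingSourceTaskSketch extends SourceTask {
    private final Queue<String> pendingWork = new ConcurrentLinkedQueue<>();

    @Override
    public String version() { return "sketch"; }

    @Override
    public void start(Map<String, String> props) { }

    @Override
    public List<SourceRecord> poll() { return Collections.emptyList(); }

    @Override
    public void commitRecord(SourceRecord record, RecordMetadata metadata) {
        // Remember what needs to be published for this acknowledged record
        pendingWork.add(record.topic() + "@" + metadata.offset());
        publishPending();   // may publish nothing if a downstream limit is currently reached
    }

    @Override
    public void commit() {
        publishPending();   // periodic retry path for anything still queued
    }

    private void publishPending() {
        String work;
        while ((work = pendingWork.poll()) != null) {
            // publish the dequeued item to an internal topic here
        }
    }

    @Override
    public void stop() { }
}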

MirrorSourceTask.java

@@ -36,6 +36,8 @@ import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.List;
@@ -62,6 +64,7 @@ public class MirrorSourceTask extends SourceTask {
     private ReplicationPolicy replicationPolicy;
     private MirrorSourceMetrics metrics;
     private boolean stopping = false;
+    private final Map<TopicPartition, OffsetSync> pendingOffsetSyncs = new LinkedHashMap<>();
     private Semaphore outstandingOffsetSyncs;
     private Semaphore consumerAccess;
@@ -87,6 +90,7 @@ public class MirrorSourceTask extends SourceTask {
     @Override
     public void start(Map<String, String> props) {
         MirrorSourceTaskConfig config = new MirrorSourceTaskConfig(props);
+        pendingOffsetSyncs.clear();
         outstandingOffsetSyncs = new Semaphore(MAX_OUTSTANDING_OFFSET_SYNCS);
         consumerAccess = new Semaphore(1); // let one thread at a time access the consumer
         sourceClusterAlias = config.sourceClusterAlias();
@@ -111,7 +115,9 @@ public class MirrorSourceTask extends SourceTask {
     @Override
     public void commit() {
-        // nop
+        // Publish any offset syncs that we've queued up, but have not yet been able to publish
+        // (likely because we previously reached our limit for number of outstanding syncs)
+        firePendingOffsetSyncs();
     }
     @Override
@@ -194,41 +200,63 @@ public class MirrorSourceTask extends SourceTask {
         TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
         long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
         long downstreamOffset = metadata.offset();
-        maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset);
+        maybeQueueOffsetSyncs(sourceTopicPartition, upstreamOffset, downstreamOffset);
+        // We may be able to immediately publish an offset sync that we've queued up here
+        firePendingOffsetSyncs();
     }
-    // updates partition state and sends OffsetSync if necessary
-    private void maybeSyncOffsets(TopicPartition topicPartition, long upstreamOffset,
-                                  long downstreamOffset) {
+    // updates partition state and queues up OffsetSync if necessary
+    private void maybeQueueOffsetSyncs(TopicPartition topicPartition, long upstreamOffset,
+                                       long downstreamOffset) {
         PartitionState partitionState =
                 partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag));
         if (partitionState.update(upstreamOffset, downstreamOffset)) {
-            if (sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset)) {
-                partitionState.reset();
+            OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset);
+            synchronized (this) {
+                pendingOffsetSyncs.put(topicPartition, offsetSync);
             }
+            partitionState.reset();
         }
     }
+    private void firePendingOffsetSyncs() {
+        while (true) {
+            OffsetSync pendingOffsetSync;
+            synchronized (this) {
+                Iterator<OffsetSync> syncIterator = pendingOffsetSyncs.values().iterator();
+                if (!syncIterator.hasNext()) {
+                    // Nothing to sync
+                    log.trace("No more pending offset syncs");
+                    return;
+                }
+                pendingOffsetSync = syncIterator.next();
+                if (!outstandingOffsetSyncs.tryAcquire()) {
+                    // Too many outstanding syncs
+                    log.trace("Too many in-flight offset syncs; will try to send remaining offset syncs later");
+                    return;
+                }
+                syncIterator.remove();
+            }
+            // Publish offset sync outside of synchronized block; we may have to
+            // wait for producer metadata to update before Producer::send returns
+            sendOffsetSync(pendingOffsetSync);
+            log.trace("Dispatched offset sync for {}", pendingOffsetSync.topicPartition());
+        }
+    }
-    // sends OffsetSync record upstream to internal offsets topic
-    private boolean sendOffsetSync(TopicPartition topicPartition, long upstreamOffset,
-                                   long downstreamOffset) {
-        if (!outstandingOffsetSyncs.tryAcquire()) {
-            // Too many outstanding offset syncs.
-            return false;
-        }
-        OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset);
+    // sends OffsetSync record to internal offsets topic
+    private void sendOffsetSync(OffsetSync offsetSync) {
         ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(offsetSyncsTopic, 0,
                 offsetSync.recordKey(), offsetSync.recordValue());
         offsetProducer.send(record, (x, e) -> {
             if (e != null) {
                 log.error("Failure sending offset sync.", e);
             } else {
-                log.trace("Sync'd offsets for {}: {}=={}", topicPartition,
-                        upstreamOffset, downstreamOffset);
+                log.trace("Sync'd offsets for {}: {}=={}", offsetSync.topicPartition(),
+                        offsetSync.upstreamOffset(), offsetSync.downstreamOffset());
             }
             outstandingOffsetSyncs.release();
         });
-        return true;
     }
     private Map<TopicPartition, Long> loadOffsets(Set<TopicPartition> topicPartitions) {
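The core of the change above is a bounded, deduplicating queue: pendingOffsetSyncs keeps at most one queued sync per partition (a later sync for the same partition overwrites the earlier one), the outstandingOffsetSyncs semaphore caps how many syncs may be in flight at once, and firePendingOffsetSyncs() drains the queue until it is empty or the cap is hit, called both right after queueing and from commit(). Below is a self-contained sketch of that pattern, with a plain callback standing in for the offset-syncs producer; the class BoundedPendingSender and its method names are hypothetical, not Kafka APIs. A usage example appears after the second file's diff.

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;

// Standalone sketch of the queue-and-drain pattern: keep only the newest pending value per key,
// and never allow more than maxInFlight unacknowledged sends at a time.
public class BoundedPendingSender<K, V> {
    private final Map<K, V> pending = new LinkedHashMap<>();
    private final Semaphore inFlight;
    private final BiConsumer<V, Runnable> sender; // must run the Runnable once the send is acked

    public BoundedPendingSender(int maxInFlight, BiConsumer<V, Runnable> sender) {
        this.inFlight = new Semaphore(maxInFlight);
        this.sender = sender;
    }

    // Called from the record-ack path: remember only the latest value for this key.
    public synchronized void queue(K key, V value) {
        pending.put(key, value);
    }

    // Called both right after queueing and periodically (the stand-in for commit()):
    // drain as many pending entries as the in-flight limit allows.
    public void drain() {
        while (true) {
            V next;
            synchronized (this) {
                Iterator<V> it = pending.values().iterator();
                if (!it.hasNext()) {
                    return;              // nothing left to send
                }
                next = it.next();
                if (!inFlight.tryAcquire()) {
                    return;              // too many sends outstanding; retry on the next drain()
                }
                it.remove();
            }
            // Send outside the lock; release the permit once the send is acknowledged.
            sender.accept(next, inFlight::release);
        }
    }
}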

MirrorSourceTaskTest.java

@@ -283,7 +283,11 @@ public class MirrorSourceTaskTest {
         });
         mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+        // We should have dispatched this sync to the producer
+        verify(producer, times(1)).send(any(), any());
+        mirrorSourceTask.commit();
+        // No more syncs should take place; we've been able to publish all of them so far
         verify(producer, times(1)).send(any(), any());
         recordOffset = 2;
@@ -297,7 +301,11 @@ public class MirrorSourceTaskTest {
         doReturn(null).when(producer).send(any(), producerCallback.capture());
         mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+        // We should have dispatched this sync to the producer
+        verify(producer, times(2)).send(any(), any());
+        mirrorSourceTask.commit();
+        // No more syncs should take place; we've been able to publish all of them so far
         verify(producer, times(2)).send(any(), any());
         // Do not send sync event
@@ -309,22 +317,35 @@ public class MirrorSourceTaskTest {
                 recordValue.length, recordKey, recordValue, headers, Optional.empty()));
         mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+        mirrorSourceTask.commit();
+        // We should not have dispatched any more syncs to the producer; there were too many already in flight
         verify(producer, times(2)).send(any(), any());
+        // Now the in-flight sync has been ack'd
+        producerCallback.getValue().onCompletion(null, null);
+        mirrorSourceTask.commit();
+        // We should dispatch the offset sync that was queued but previously not sent to the producer now
+        verify(producer, times(3)).send(any(), any());
+        // Ack the latest sync immediately
+        producerCallback.getValue().onCompletion(null, null);
         // Should send sync event
-        recordOffset = 5;
-        metadataOffset = 150;
+        recordOffset = 6;
+        metadataOffset = 106;
         recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition);
         sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition,
                 recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length,
                 recordValue.length, recordKey, recordValue, headers, Optional.empty()));
-        producerCallback.getValue().onCompletion(null, null);
         mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
-        verify(producer, times(3)).send(any(), any());
+        // We should have dispatched this sync to the producer
+        verify(producer, times(4)).send(any(), any());
+        mirrorSourceTask.commit();
+        // No more syncs should take place; we've been able to publish all of them so far
+        verify(producer, times(4)).send(any(), any());
     }
     private void compareHeaders(List<Header> expectedHeaders, List<org.apache.kafka.connect.header.Header> taskHeaders) {
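The updated test follows a pattern that can be reproduced against the BoundedPendingSender sketch above: fill the in-flight limit, check that additional work is only queued, acknowledge one send, and check that the next drain (the stand-in for commit()) dispatches the queued item. A hypothetical JUnit 5 example, assuming the sketch class above:

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.ArrayList;
import java.util.List;

import org.junit.jupiter.api.Test;

public class BoundedPendingSenderSketchTest {

    @Test
    public void queuedValueIsSentOnceAnInFlightSendIsAcked() {
        List<Runnable> pendingAcks = new ArrayList<>();
        List<String> sent = new ArrayList<>();
        // In-flight limit of 1; the "producer" just records the value and its ack hook.
        BoundedPendingSender<String, String> sender = new BoundedPendingSender<>(1, (value, ack) -> {
            sent.add(value);
            pendingAcks.add(ack);
        });

        sender.queue("partition-0", "sync-1");
        sender.drain();
        assertEquals(1, sent.size());          // first sync goes out immediately

        sender.queue("partition-0", "sync-2");
        sender.drain();
        assertEquals(1, sent.size());          // at the limit: second sync stays queued

        pendingAcks.get(0).run();              // ack the in-flight send, freeing a permit
        sender.drain();                        // analogous to the periodic commit() call
        assertEquals(2, sent.size());          // the queued sync is now dispatched
        assertEquals("sync-2", sent.get(1));
    }
}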