KAFKA-13988: Enable replicating from latest offset with MirrorMaker 2 (#14567)

Reviewers: hudeqi <1217150961@qq.com>, Federico Valeri <fedevaleri@gmail.com>, Greg Harris <gharris1727@gmail.com>
This commit is contained in:
Chris Egerton 2023-12-04 16:37:37 -05:00 committed by GitHub
parent 74be72a559
commit a83bc2d977
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 118 additions and 7 deletions

View File

@ -103,12 +103,8 @@ public class MirrorSourceTask extends SourceTask {
consumer = MirrorUtils.newConsumer(config.sourceConsumerConfig("replication-consumer"));
offsetProducer = MirrorUtils.newProducer(config.offsetSyncsTopicProducerConfig());
Set<TopicPartition> taskTopicPartitions = config.taskTopicPartitions();
Map<TopicPartition, Long> topicPartitionOffsets = loadOffsets(taskTopicPartitions);
consumer.assign(topicPartitionOffsets.keySet());
log.info("Starting with {} previously uncommitted partitions.", topicPartitionOffsets.entrySet().stream()
.filter(x -> x.getValue() == 0L).count());
log.trace("Seeking offsets: {}", topicPartitionOffsets);
topicPartitionOffsets.forEach(consumer::seek);
initializeConsumer(taskTopicPartitions);
log.info("{} replicating {} topic-partitions {}->{}: {}.", Thread.currentThread().getName(),
taskTopicPartitions.size(), sourceClusterAlias, config.targetClusterAlias(), taskTopicPartitions);
}
@ -266,7 +262,26 @@ public class MirrorSourceTask extends SourceTask {
// Looks up the last committed offset for one source partition from Connect's offset store.
// Returns the raw stored offset; "no committed offset" is represented by a null/negative
// value — presumably filtered by isUncommitted in the caller (TODO confirm in loadOffsets).
private Long loadOffset(TopicPartition topicPartition) {
Map<String, Object> wrappedPartition = MirrorUtils.wrapPartition(topicPartition, sourceClusterAlias);
Map<String, Object> wrappedOffset = context.offsetStorageReader().offset(wrappedPartition);
// NOTE(review): the two return lines below are the removed/added pair from the diff
// rendering (the "+ 1" variant was deleted); in the post-change source only the
// second, un-incremented return exists. The "+1" (seek position) is now applied
// later, in initializeConsumer.
return MirrorUtils.unwrapOffset(wrappedOffset) + 1;
return MirrorUtils.unwrapOffset(wrappedOffset);
}
// visible for testing
/**
 * Assigns the task's topic-partitions to the consumer and positions it for replication.
 *
 * <p>Partitions with a previously committed offset are sought to the record immediately
 * after that offset; partitions with no committed offset (null/negative) are left alone
 * so the consumer's {@code auto.offset.reset} policy decides where to start.
 *
 * @param taskTopicPartitions the source-cluster partitions this task replicates
 */
void initializeConsumer(Set<TopicPartition> taskTopicPartitions) {
    Map<TopicPartition, Long> committedOffsets = loadOffsets(taskTopicPartitions);
    consumer.assign(committedOffsets.keySet());
    long uncommittedCount = committedOffsets.values().stream()
            .filter(this::isUncommitted)
            .count();
    log.info("Starting with {} previously uncommitted partitions.", uncommittedCount);
    for (Map.Entry<TopicPartition, Long> entry : committedOffsets.entrySet()) {
        TopicPartition partition = entry.getKey();
        Long committed = entry.getValue();
        // Do not call seek on partitions that don't have an existing offset committed.
        if (isUncommitted(committed)) {
            log.trace("Skipping seeking offset for topicPartition: {}", partition);
            continue;
        }
        long nextOffsetToCommittedOffset = committed + 1L;
        log.trace("Seeking to offset {} for topicPartition: {}", nextOffsetToCommittedOffset, partition);
        consumer.seek(partition, nextOffsetToCommittedOffset);
    }
}
// visible for testing
@ -302,6 +317,10 @@ public class MirrorSourceTask extends SourceTask {
}
}
// A partition counts as "uncommitted" when no offset was found in the offset store
// (null) or the stored value is a negative sentinel.
private boolean isUncommitted(Long offset) {
    if (offset == null) {
        return true;
    }
    return offset < 0;
}
static class PartitionState {
long previousUpstreamOffset = -1L;
long previousDownstreamOffset = -1L;

View File

@ -31,25 +31,33 @@ import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.connect.mirror.MirrorSourceTask.PartitionState;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Semaphore;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verifyNoInteractions;
@ -214,6 +222,61 @@ public class MirrorSourceTaskTest {
}
}
@Test
// Verifies initializeConsumer() seeks only on partitions that already have a committed
// offset, and seeks to committedOffset + 1; partitions without a stored offset must not
// be sought (so auto.offset.reset can apply).
public void testSeekBehaviorDuringStart() {
// Setting up mock behavior.
@SuppressWarnings("unchecked")
KafkaConsumer<byte[], byte[]> mockConsumer = mock(KafkaConsumer.class);
SourceTaskContext mockSourceTaskContext = mock(SourceTaskContext.class);
OffsetStorageReader mockOffsetStorageReader = mock(OffsetStorageReader.class);
when(mockSourceTaskContext.offsetStorageReader()).thenReturn(mockOffsetStorageReader);
// Mix of partitions with ("previouslyReplicatedTopic*") and without ("newTopicToReplicate*")
// committed offsets.
Set<TopicPartition> topicPartitions = new HashSet<>(Arrays.asList(
new TopicPartition("previouslyReplicatedTopic", 8),
new TopicPartition("previouslyReplicatedTopic1", 0),
new TopicPartition("previouslyReplicatedTopic", 1),
new TopicPartition("newTopicToReplicate1", 1),
new TopicPartition("newTopicToReplicate1", 4),
new TopicPartition("newTopicToReplicate2", 0)
));
long arbitraryCommittedOffset = 4L;
// The task should resume from the record after the committed offset.
long offsetToSeek = arbitraryCommittedOffset + 1L;
when(mockOffsetStorageReader.offset(anyMap())).thenAnswer(testInvocation -> {
Map<String, Object> topicPartitionOffsetMap = testInvocation.getArgument(0);
String topicName = topicPartitionOffsetMap.get("topic").toString();
// Only return the offset for previously replicated topics.
// For others, there is no value set.
if (topicName.startsWith("previouslyReplicatedTopic")) {
topicPartitionOffsetMap.put("offset", arbitraryCommittedOffset);
}
return topicPartitionOffsetMap;
});
MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(mockConsumer, null, null,
new DefaultReplicationPolicy(), 50, null, null, null, null);
mirrorSourceTask.initialize(mockSourceTaskContext);
// Call test subject
mirrorSourceTask.initializeConsumer(topicPartitions);
// Verifications
// Ensure all the topic partitions are assigned to consumer
verify(mockConsumer, times(1)).assign(topicPartitions);
// Ensure seek is only called for previously committed topic partitions.
verify(mockConsumer, times(1))
.seek(new TopicPartition("previouslyReplicatedTopic", 8), offsetToSeek);
verify(mockConsumer, times(1))
.seek(new TopicPartition("previouslyReplicatedTopic", 1), offsetToSeek);
verify(mockConsumer, times(1))
.seek(new TopicPartition("previouslyReplicatedTopic1", 0), offsetToSeek);
// No seek (or any other call) may happen for the "newTopicToReplicate*" partitions.
verifyNoMoreInteractions(mockConsumer);
}
@Test
public void testCommitRecordWithNullMetadata() {
// Create a consumer mock

View File

@ -81,6 +81,7 @@ import org.junit.jupiter.api.Tag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@ -874,6 +875,34 @@ public class MirrorConnectorsIntegrationBaseTest {
}, 30000, "Topic configurations were not synced");
}
@Test
// Integration check for auto.offset.reset=latest: records produced BEFORE MM2 starts must
// not be replicated; records produced AFTER startup must all arrive on the backup cluster.
public void testReplicateFromLatest() throws Exception {
// populate topic with records that should not be replicated
String topic = "test-topic-1";
produceMessages(primaryProducer, topic, NUM_PARTITIONS);
// consume from the ends of topics when no committed offsets are found
mm2Props.put(PRIMARY_CLUSTER_ALIAS + ".consumer." + AUTO_OFFSET_RESET_CONFIG, "latest");
// one way replication from primary to backup
mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false");
mm2Config = new MirrorMakerConfig(mm2Props);
waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
// produce some more messages to the topic, now that MM2 is running and replication should be taking place
produceMessages(primaryProducer, topic, NUM_PARTITIONS);
String backupTopic = remoteTopicName(topic, PRIMARY_CLUSTER_ALIAS);
// wait for at least the expected number of records to be replicated to the backup cluster
backup.kafka().consume(NUM_PARTITIONS * NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, backupTopic);
// consume all records from backup cluster
ConsumerRecords<byte[], byte[]> replicatedRecords = backup.kafka().consumeAll(RECORD_TRANSFER_DURATION_MS, backupTopic);
// ensure that we only replicated the records produced after startup
// (exactly one batch per partition; the pre-startup batch would double this count)
replicatedRecords.partitions().forEach(topicPartition -> {
int replicatedCount = replicatedRecords.records(topicPartition).size();
assertEquals(NUM_RECORDS_PER_PARTITION, replicatedCount);
});
}
// Maps a source-cluster partition to its remote counterpart: the topic name is rewritten
// via remoteTopicName while the partition number is preserved.
private TopicPartition remoteTopicPartition(TopicPartition tp, String alias) {
    String remoteTopic = remoteTopicName(tp.topic(), alias);
    return new TopicPartition(remoteTopic, tp.partition());
}