KAFKA-12558: Do not prematurely mutate internal partition state in Mirror Maker 2 (#11818)

Reviewers: Greg Harris <greg.harris@aiven.io>, Chris Egerton <chrise@aiven.io>
Author: emilnkrastev
Date: 2023-01-10 16:46:25 +02:00 (committed by GitHub)
Parent: def8d724c8
Commit: 6e7e2e08a9
3 changed files with 156 additions and 11 deletions
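This change fixes a lost-offset-sync bug in MirrorMaker 2. Previously, PartitionState.update() mutated the partition's sync bookkeeping as soon as it decided a sync was due, before the corresponding OffsetSync record was actually produced. When sendOffsetSync() then dropped the record because the outstandingOffsetSyncs semaphore could not be acquired, the state already claimed the sync had happened, and the sync was silently lost. After this patch, sendOffsetSync() reports whether a record was really handed to the producer, and PartitionState keeps a sticky shouldSyncOffsets flag that only reset() clears, so a dropped sync is retried on a later commit. In outline, the commit path now looks like this (a simplified sketch of the flow the diff below introduces, reached from MirrorSourceTask.commitRecord()):

    PartitionState partitionState =
        partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag));
    if (partitionState.update(upstreamOffset, downstreamOffset)) {
        // sendOffsetSync() returns false when the semaphore limiting
        // in-flight syncs cannot be acquired; the pending flag then survives.
        if (sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset)) {
            partitionState.reset(); // clear shouldSyncOffsets only after a real send
        }
    }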

File: org/apache/kafka/connect/mirror/MirrorSourceTask.java

@@ -69,7 +69,9 @@ public class MirrorSourceTask extends SourceTask {
     // for testing
     MirrorSourceTask(KafkaConsumer<byte[], byte[]> consumer, MirrorSourceMetrics metrics, String sourceClusterAlias,
-                    ReplicationPolicy replicationPolicy, long maxOffsetLag, KafkaProducer<byte[], byte[]> producer) {
+                    ReplicationPolicy replicationPolicy, long maxOffsetLag, KafkaProducer<byte[], byte[]> producer,
+                    Semaphore outstandingOffsetSyncs, Map<TopicPartition, PartitionState> partitionStates,
+                    String offsetSyncsTopic) {
         this.consumer = consumer;
         this.metrics = metrics;
         this.sourceClusterAlias = sourceClusterAlias;
@@ -77,6 +79,9 @@ public class MirrorSourceTask extends SourceTask {
         this.maxOffsetLag = maxOffsetLag;
         consumerAccess = new Semaphore(1);
         this.offsetProducer = producer;
+        this.outstandingOffsetSyncs = outstandingOffsetSyncs;
+        this.partitionStates = partitionStates;
+        this.offsetSyncsTopic = offsetSyncsTopic;
     }

     @Override
@@ -198,16 +203,18 @@ public class MirrorSourceTask extends SourceTask {
         PartitionState partitionState =
             partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag));
         if (partitionState.update(upstreamOffset, downstreamOffset)) {
-            sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset);
+            if (sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset)) {
+                partitionState.reset();
+            }
         }
     }

     // sends OffsetSync record upstream to internal offsets topic
-    private void sendOffsetSync(TopicPartition topicPartition, long upstreamOffset,
+    private boolean sendOffsetSync(TopicPartition topicPartition, long upstreamOffset,
                                 long downstreamOffset) {
         if (!outstandingOffsetSyncs.tryAcquire()) {
             // Too many outstanding offset syncs.
-            return;
+            return false;
         }
         OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset);
         ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(offsetSyncsTopic, 0,
@@ -221,6 +228,7 @@ public class MirrorSourceTask extends SourceTask {
             }
             outstandingOffsetSyncs.release();
         });
+        return true;
     }

     private Map<TopicPartition, Long> loadOffsets(Set<TopicPartition> topicPartitions) {
@@ -272,6 +280,7 @@ public class MirrorSourceTask extends SourceTask {
         long lastSyncUpstreamOffset = -1L;
         long lastSyncDownstreamOffset = -1L;
         long maxOffsetLag;
+        boolean shouldSyncOffsets;

         PartitionState(long maxOffsetLag) {
             this.maxOffsetLag = maxOffsetLag;
@@ -279,7 +288,6 @@ public class MirrorSourceTask extends SourceTask {
         // true if we should emit an offset sync
         boolean update(long upstreamOffset, long downstreamOffset) {
-            boolean shouldSyncOffsets = false;
             long upstreamStep = upstreamOffset - lastSyncUpstreamOffset;
             long downstreamTargetOffset = lastSyncDownstreamOffset + upstreamStep;
             if (lastSyncDownstreamOffset == -1L
@@ -294,5 +302,9 @@ public class MirrorSourceTask extends SourceTask {
             previousDownstreamOffset = downstreamOffset;
             return shouldSyncOffsets;
         }
+
+        void reset() {
+            shouldSyncOffsets = false;
+        }
     }
 }
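The gating pattern used by sendOffsetSync() above is worth seeing in isolation: try to acquire a semaphore before an asynchronous send, release it in the completion callback, and tell the caller whether the send was attempted at all. A self-contained sketch of that pattern (GatedSender and transport are illustrative names, not Kafka classes):

    import java.util.concurrent.Semaphore;
    import java.util.function.BiConsumer;

    // Caps the number of in-flight sends; trySend() reports whether the
    // payload was actually handed off, so callers can retry later.
    public class GatedSender<T> {
        private final Semaphore outstanding;
        private final BiConsumer<T, Runnable> transport; // (payload, onComplete)

        public GatedSender(int maxOutstanding, BiConsumer<T, Runnable> transport) {
            this.outstanding = new Semaphore(maxOutstanding);
            this.transport = transport;
        }

        public boolean trySend(T payload) {
            if (!outstanding.tryAcquire()) {
                return false; // too many outstanding sends; caller keeps its pending state
            }
            // The transport must invoke onComplete exactly once when the send finishes.
            transport.accept(payload, outstanding::release);
            return true;
        }
    }

MirrorSourceTask follows exactly this shape: the producer callback releases outstandingOffsetSyncs, and a false return from sendOffsetSync() leaves the partition's shouldSyncOffsets flag set.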

File: org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java

@@ -19,26 +19,37 @@ package org.apache.kafka.connect.mirror;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.mirror.MirrorSourceTask.PartitionState;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;

 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.Semaphore;

 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.verifyNoInteractions;
@@ -56,7 +67,7 @@ public class MirrorSourceTaskTest {
         @SuppressWarnings("unchecked")
         KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
         MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(null, null, "cluster7",
-                new DefaultReplicationPolicy(), 50, producer);
+                new DefaultReplicationPolicy(), 50, producer, null, null, null);
         SourceRecord sourceRecord = mirrorSourceTask.convertRecord(consumerRecord);
         assertEquals("cluster7.topic1", sourceRecord.topic(),
                 "Failure on cluster7.topic1 consumerRecord serde");
@@ -81,15 +92,33 @@ public class MirrorSourceTaskTest {
         MirrorSourceTask.PartitionState partitionState = new MirrorSourceTask.PartitionState(50);

         assertTrue(partitionState.update(0, 100), "always emit offset sync on first update");
+        assertTrue(partitionState.shouldSyncOffsets, "should sync offsets");
+        partitionState.reset();
+        assertFalse(partitionState.shouldSyncOffsets, "shouldSyncOffsets should be false after reset");
         assertTrue(partitionState.update(2, 102), "upstream offset skipped -> resync");
+        partitionState.reset();
         assertFalse(partitionState.update(3, 152), "no sync");
+        partitionState.reset();
         assertFalse(partitionState.update(4, 153), "no sync");
+        partitionState.reset();
         assertFalse(partitionState.update(5, 154), "no sync");
+        partitionState.reset();
         assertTrue(partitionState.update(6, 205), "one past target offset");
+        partitionState.reset();
         assertTrue(partitionState.update(2, 206), "upstream reset");
+        partitionState.reset();
         assertFalse(partitionState.update(3, 207), "no sync");
+        partitionState.reset();
         assertTrue(partitionState.update(4, 3), "downstream reset");
+        partitionState.reset();
         assertFalse(partitionState.update(5, 4), "no sync");
+        assertTrue(partitionState.update(7, 6), "sync");
+        assertTrue(partitionState.update(7, 6), "sync");
+        assertTrue(partitionState.update(8, 7), "sync");
+        assertTrue(partitionState.update(10, 57), "sync");
+        partitionState.reset();
+        assertFalse(partitionState.update(11, 58), "no sync");
+        assertFalse(partitionState.shouldSyncOffsets, "shouldSyncOffsets should remain false after a no-sync update");
     }

     @Test
@@ -98,15 +127,32 @@ public class MirrorSourceTaskTest {
         // if max offset lag is zero, should always emit offset syncs
         assertTrue(partitionState.update(0, 100), "zeroOffsetSync downStreamOffset 100 is incorrect");
+        assertTrue(partitionState.shouldSyncOffsets, "should sync offsets");
+        partitionState.reset();
+        assertFalse(partitionState.shouldSyncOffsets, "shouldSyncOffsets should be false after reset");
         assertTrue(partitionState.update(2, 102), "zeroOffsetSync downStreamOffset 102 is incorrect");
+        partitionState.reset();
         assertTrue(partitionState.update(3, 153), "zeroOffsetSync downStreamOffset 153 is incorrect");
+        partitionState.reset();
         assertTrue(partitionState.update(4, 154), "zeroOffsetSync downStreamOffset 154 is incorrect");
+        partitionState.reset();
         assertTrue(partitionState.update(5, 155), "zeroOffsetSync downStreamOffset 155 is incorrect");
+        partitionState.reset();
         assertTrue(partitionState.update(6, 207), "zeroOffsetSync downStreamOffset 207 is incorrect");
+        partitionState.reset();
         assertTrue(partitionState.update(2, 208), "zeroOffsetSync downStreamOffset 208 is incorrect");
+        partitionState.reset();
         assertTrue(partitionState.update(3, 209), "zeroOffsetSync downStreamOffset 209 is incorrect");
+        partitionState.reset();
         assertTrue(partitionState.update(4, 3), "zeroOffsetSync downStreamOffset 3 is incorrect");
+        partitionState.reset();
         assertTrue(partitionState.update(5, 4), "zeroOffsetSync downStreamOffset 4 is incorrect");
+        assertTrue(partitionState.update(7, 6), "zeroOffsetSync downStreamOffset 6 is incorrect");
+        assertTrue(partitionState.update(7, 6), "zeroOffsetSync downStreamOffset 6 is incorrect");
+        assertTrue(partitionState.update(8, 7), "zeroOffsetSync downStreamOffset 7 is incorrect");
+        assertTrue(partitionState.update(10, 57), "zeroOffsetSync downStreamOffset 57 is incorrect");
+        partitionState.reset();
+        assertTrue(partitionState.update(11, 58), "zeroOffsetSync downStreamOffset 58 is incorrect");
     }

     @Test
@@ -140,7 +186,7 @@ public class MirrorSourceTaskTest {
         String sourceClusterName = "cluster1";
         ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy();
         MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, metrics, sourceClusterName,
-                replicationPolicy, 50, producer);
+                replicationPolicy, 50, producer, null, null, null);

         List<SourceRecord> sourceRecords = mirrorSourceTask.poll();
         assertEquals(2, sourceRecords.size());
@@ -186,7 +232,7 @@ public class MirrorSourceTaskTest {
         String sourceClusterName = "cluster1";
         ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy();
         MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, metrics, sourceClusterName,
-                replicationPolicy, 50, producer);
+                replicationPolicy, 50, producer, null, null, null);

         SourceRecord sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, 0, 0, System.currentTimeMillis(),
                 TimestampType.CREATE_TIME, key1.length, value1.length, key1, value1, headers, Optional.empty()));
@@ -196,6 +242,91 @@ public class MirrorSourceTaskTest {
         verifyNoInteractions(producer);
     }

+    @Test
+    public void testSendSyncEvent() {
+        byte[] recordKey = "key".getBytes();
+        byte[] recordValue = "value".getBytes();
+        int maxOffsetLag = 50;
+        int recordPartition = 0;
+        int recordOffset = 0;
+        int metadataOffset = 100;
+        String topicName = "topic";
+        String sourceClusterName = "sourceCluster";
+        RecordHeaders headers = new RecordHeaders();
+        ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy();
+
+        @SuppressWarnings("unchecked")
+        KafkaConsumer<byte[], byte[]> consumer = mock(KafkaConsumer.class);
+        @SuppressWarnings("unchecked")
+        KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
+        MirrorSourceMetrics metrics = mock(MirrorSourceMetrics.class);
+        Semaphore outstandingOffsetSyncs = new Semaphore(1);
+        PartitionState partitionState = new PartitionState(maxOffsetLag);
+        Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
+
+        MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, metrics, sourceClusterName,
+                replicationPolicy, maxOffsetLag, producer, outstandingOffsetSyncs, partitionStates, topicName);
+        SourceRecord sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition,
+                recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length,
+                recordValue.length, recordKey, recordValue, headers, Optional.empty()));
+
+        TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(sourceRecord.sourcePartition());
+        partitionStates.put(sourceTopicPartition, partitionState);
+        RecordMetadata recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition);
+
+        // First sync: the producer callback completes immediately, releasing the semaphore.
+        ArgumentCaptor<Callback> producerCallback = ArgumentCaptor.forClass(Callback.class);
+        when(producer.send(any(), producerCallback.capture())).thenAnswer(mockInvocation -> {
+            producerCallback.getValue().onCompletion(null, null);
+            return null;
+        });
+        mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+        verify(producer, times(1)).send(any(), any());
+
+        // Second sync: the callback never fires, so the outstanding sync semaphore is not released.
+        recordOffset = 2;
+        metadataOffset = 102;
+        recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition);
+        sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition,
+                recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length,
+                recordValue.length, recordKey, recordValue, headers, Optional.empty()));
+        doReturn(null).when(producer).send(any(), producerCallback.capture());
+        mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+        verify(producer, times(2)).send(any(), any());
+
+        // Third sync: the semaphore is still taken, so no sync event can be sent.
+        recordOffset = 4;
+        metadataOffset = 104;
+        recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition);
+        sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition,
+                recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length,
+                recordValue.length, recordKey, recordValue, headers, Optional.empty()));
+        mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+        verify(producer, times(2)).send(any(), any());
+
+        // Fourth sync: completing the pending callback releases the semaphore, so the sync should be sent.
+        recordOffset = 5;
+        metadataOffset = 150;
+        recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition);
+        sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition,
+                recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length,
+                recordValue.length, recordKey, recordValue, headers, Optional.empty()));
+        producerCallback.getValue().onCompletion(null, null);
+        mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+        verify(producer, times(3)).send(any(), any());
+    }
+
     private void compareHeaders(List<Header> expectedHeaders, List<org.apache.kafka.connect.header.Header> taskHeaders) {
         assertEquals(expectedHeaders.size(), taskHeaders.size());
         for (int i = 0; i < expectedHeaders.size(); i++) {
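Note the back-to-back assertTrue calls for update(7, 6), update(8, 7), and update(10, 57) in the tests above, with no reset() in between: once update() decides a sync is needed, the flag stays set across further updates until reset() is called after a successful send. A minimal model of that semantics (StickyFlag is an illustrative name, not part of the patch):

    // Models PartitionState's new shouldSyncOffsets behavior.
    class StickyFlag {
        private boolean pending;

        boolean update(boolean syncNeededNow) {
            if (syncNeededNow) {
                pending = true; // set when a sync is required ...
            }
            return pending;     // ... and reported until explicitly cleared
        }

        void reset() {
            pending = false;    // called only after the sync was actually sent
        }
    }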

File: MirrorConnectorsIntegrationBaseTest.java

@@ -305,8 +305,10 @@ public class MirrorConnectorsIntegrationBaseTest {
         Map<TopicPartition, OffsetAndMetadata> backupOffsets = backupClient.remoteConsumerOffsets(consumerGroupName, PRIMARY_CLUSTER_ALIAS,
             Duration.ofMillis(CHECKPOINT_DURATION_MS));
-        assertTrue(backupOffsets.containsKey(
-            new TopicPartition("primary.test-topic-1", 0)), "Offsets not translated downstream to backup cluster. Found: " + backupOffsets);
+        for (int i = 0; i < NUM_PARTITIONS; i++) {
+            assertTrue(backupOffsets.containsKey(new TopicPartition("primary.test-topic-1", i)),
+                "Offsets not translated downstream to backup cluster. Found: " + backupOffsets);
+        }

         // Failover consumer group to backup cluster.
         try (Consumer<byte[], byte[]> primaryConsumer = backup.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) {