KAFKA-14797: Emit offset sync when offset translation lag would exceed max.offset.lag (#13367)

Reviewers: Chris Egerton <chrise@aiven.io>
Author: Greg Harris, 2023-03-21 06:31:08 -07:00 (committed by GitHub)
parent 52a4917988
commit 897ced12ee
4 changed files with 52 additions and 41 deletions

connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java

@@ -305,7 +305,6 @@ public class MirrorSourceTask extends SourceTask {
     static class PartitionState {
         long previousUpstreamOffset = -1L;
         long previousDownstreamOffset = -1L;
-        long lastSyncUpstreamOffset = -1L;
         long lastSyncDownstreamOffset = -1L;
         long maxOffsetLag;
         boolean shouldSyncOffsets;
@@ -316,13 +315,14 @@ public class MirrorSourceTask extends SourceTask {
         // true if we should emit an offset sync
         boolean update(long upstreamOffset, long downstreamOffset) {
-            long upstreamStep = upstreamOffset - lastSyncUpstreamOffset;
-            long downstreamTargetOffset = lastSyncDownstreamOffset + upstreamStep;
-            if (lastSyncDownstreamOffset == -1L
-                    || downstreamOffset - downstreamTargetOffset >= maxOffsetLag
-                    || upstreamOffset - previousUpstreamOffset != 1L
-                    || downstreamOffset < previousDownstreamOffset) {
-                lastSyncUpstreamOffset = upstreamOffset;
+            // Emit an offset sync if any of the following conditions are true
+            boolean noPreviousSyncThisLifetime = lastSyncDownstreamOffset == -1L;
+            // the OffsetSync::translateDownstream method will translate this offset 1 past the last sync, so add 1.
+            // TODO: share common implementation to enforce this relationship
+            boolean translatedOffsetTooStale = downstreamOffset - (lastSyncDownstreamOffset + 1) >= maxOffsetLag;
+            boolean skippedUpstreamRecord = upstreamOffset - previousUpstreamOffset != 1L;
+            boolean truncatedDownstreamTopic = downstreamOffset < previousDownstreamOffset;
+            if (noPreviousSyncThisLifetime || translatedOffsetTooStale || skippedUpstreamRecord || truncatedDownstreamTopic) {
                 lastSyncDownstreamOffset = downstreamOffset;
                 shouldSyncOffsets = true;
             }
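Read on its own, the refactor turns the old compound if-condition into four named predicates. A minimal standalone sketch of the same decision, with the PartitionState fields passed in as parameters (the method name shouldEmitSync is ours for illustration; it is not part of the patch):

    // Sketch of the refactored sync decision, restated outside the diff.
    static boolean shouldEmitSync(long upstreamOffset, long downstreamOffset,
                                  long previousUpstreamOffset, long previousDownstreamOffset,
                                  long lastSyncDownstreamOffset, long maxOffsetLag) {
        // 1. No sync emitted yet in this task's lifetime (field still at its -1 sentinel).
        boolean noPreviousSyncThisLifetime = lastSyncDownstreamOffset == -1L;
        // 2. OffsetSync::translateDownstream lands consumers at lastSyncDownstreamOffset + 1,
        //    so a consumer would re-read downstreamOffset - (lastSyncDownstreamOffset + 1)
        //    records; emit a sync before that redelivery reaches maxOffsetLag.
        boolean translatedOffsetTooStale = downstreamOffset - (lastSyncDownstreamOffset + 1) >= maxOffsetLag;
        // 3. Upstream offsets are not contiguous (records were skipped or filtered).
        boolean skippedUpstreamRecord = upstreamOffset - previousUpstreamOffset != 1L;
        // 4. Downstream offsets moved backwards (e.g. the target topic was recreated).
        boolean truncatedDownstreamTopic = downstreamOffset < previousDownstreamOffset;
        return noPreviousSyncThisLifetime || translatedOffsetTooStale
                || skippedUpstreamRecord || truncatedDownstreamTopic;
    }

Because a sync is emitted as soon as redelivery would reach maxOffsetLag, steady-state redelivery stays strictly below maxOffsetLag; the integration-test assertions below rely on exactly that bound.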

connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java

@@ -99,11 +99,13 @@ public class MirrorSourceTaskTest {
         partitionState.reset();
         assertFalse(partitionState.update(3, 152), "no sync");
         partitionState.reset();
-        assertFalse(partitionState.update(4, 153), "no sync");
+        assertTrue(partitionState.update(4, 153), "one past target offset");
         partitionState.reset();
         assertFalse(partitionState.update(5, 154), "no sync");
         partitionState.reset();
-        assertTrue(partitionState.update(6, 205), "one past target offset");
+        assertFalse(partitionState.update(6, 203), "no sync");
+        partitionState.reset();
+        assertTrue(partitionState.update(7, 204), "one past target offset");
         partitionState.reset();
         assertTrue(partitionState.update(2, 206), "upstream reset");
         partitionState.reset();
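The rewritten assertions follow from the "+1" adjustment in translatedOffsetTooStale. A worked trace, assuming maxOffsetLag is 50 and the most recent sync before this excerpt landed at downstream offset 102 (both values inferred from the visible assertions rather than shown in the diff):

    // update(4, 153): 153 - (102 + 1) = 50 >= 50 -> sync    ("one past target offset")
    // update(5, 154): 154 - (153 + 1) =  0 <  50 -> no sync
    // update(6, 203): 203 - (153 + 1) = 49 <  50 -> no sync
    // update(7, 204): 204 - (153 + 1) = 50 >= 50 -> sync    ("one past target offset")
    // update(2, 206): upstream 2 after 7 is non-contiguous  -> sync ("upstream reset")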

connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java

@@ -18,7 +18,6 @@ package org.apache.kafka.connect.mirror.integration;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.TopicPartition;
@@ -188,8 +187,8 @@ public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrationBaseTest {
         }
     }

-    @Test
-    public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedException {
+    @Override
+    public void testOneWayReplicationWithOffsetSyncs(int offsetLagMax) throws InterruptedException {
         produceMessages(primary, "test-topic-1");
         String consumerGroupName = "consumer-group-testOneWayReplicationWithAutoOffsetSync";
         Map<String, Object> consumerProps = new HashMap<String, Object>() {{
@@ -206,6 +205,7 @@ public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrationBaseTest {
         // enable automated consumer group offset sync
         mm2Props.put("sync.group.offsets.enabled", "true");
         mm2Props.put("sync.group.offsets.interval.seconds", "1");
+        mm2Props.put("offset.lag.max", Integer.toString(offsetLagMax));

         // one way replication from primary to backup
         mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false");
@@ -221,14 +221,9 @@ public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrationBaseTest {
                 consumerProps, "test-topic-1")) {
             waitForConsumerGroupFullSync(backup, Collections.singletonList("test-topic-1"),
-                    consumerGroupName, NUM_RECORDS_PRODUCED);
+                    consumerGroupName, NUM_RECORDS_PRODUCED, offsetLagMax);

-            ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-            // the size of consumer record should be zero, because the offsets of the same consumer group
-            // have been automatically synchronized from primary to backup by the background job, so no
-            // more records to consume from the replicated topic by the same consumer group at backup cluster
-            assertEquals(0, records.count(), "consumer record size is not zero");
+            assertDownstreamRedeliveriesBoundedByMaxLag(backupConsumer, offsetLagMax);
         }

         // now create a new topic in primary cluster
@@ -251,11 +246,9 @@ public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrationBaseTest {
                 "group.id", consumerGroupName), "test-topic-1", "test-topic-2")) {
             waitForConsumerGroupFullSync(backup, Arrays.asList("test-topic-1", "test-topic-2"),
-                    consumerGroupName, NUM_RECORDS_PRODUCED);
+                    consumerGroupName, NUM_RECORDS_PRODUCED, offsetLagMax);

-            ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-            // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
-            assertEquals(0, records.count(), "consumer record size is not zero");
+            assertDownstreamRedeliveriesBoundedByMaxLag(backupConsumer, offsetLagMax);
         }

         assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
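The net effect in both blocks: the exact zero-redelivery check becomes a bound derived from offset.lag.max. A condensed restatement, with values taken from the base-class diff below (where this method is driven with OFFSET_LAG_MAX = 10 and with 0):

    // offset.lag.max = 0:  every replicated record forces a sync, so the synced group
    //                      offset reaches the log end and the old assertEquals(0, ...)
    //                      zero-redelivery behavior is preserved.
    // offset.lag.max = 10: syncs may trail by up to 9 records per partition, so the
    //                      downstream consumer may re-read at most 9 records.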

connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java

@@ -90,6 +90,7 @@ public class MirrorConnectorsIntegrationBaseTest {
     protected static final int NUM_RECORDS_PER_PARTITION = 10;
     protected static final int NUM_PARTITIONS = 10;
     protected static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS * NUM_RECORDS_PER_PARTITION;
+    protected static final int OFFSET_LAG_MAX = 10;
     protected static final int RECORD_TRANSFER_DURATION_MS = 30_000;
     protected static final int CHECKPOINT_DURATION_MS = 20_000;
     private static final int RECORD_CONSUME_DURATION_MS = 20_000;
@@ -416,6 +417,15 @@ public class MirrorConnectorsIntegrationBaseTest {
     @Test
     public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedException {
+        testOneWayReplicationWithOffsetSyncs(OFFSET_LAG_MAX);
+    }
+
+    @Test
+    public void testOneWayReplicationWithFrequentOffsetSyncs() throws InterruptedException {
+        testOneWayReplicationWithOffsetSyncs(0);
+    }
+
+    public void testOneWayReplicationWithOffsetSyncs(int offsetLagMax) throws InterruptedException {
         produceMessages(primary, "test-topic-1");
         String consumerGroupName = "consumer-group-testOneWayReplicationWithAutoOffsetSync";
         Map<String, Object> consumerProps = new HashMap<String, Object>() {{
@@ -432,6 +442,7 @@ public class MirrorConnectorsIntegrationBaseTest {
         // enable automated consumer group offset sync
         mm2Props.put("sync.group.offsets.enabled", "true");
         mm2Props.put("sync.group.offsets.interval.seconds", "1");
+        mm2Props.put("offset.lag.max", Integer.toString(offsetLagMax));

         // one way replication from primary to backup
         mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false");
@@ -448,14 +459,9 @@ public class MirrorConnectorsIntegrationBaseTest {
                 consumerProps, "primary.test-topic-1")) {
             waitForConsumerGroupFullSync(backup, Collections.singletonList("primary.test-topic-1"),
-                    consumerGroupName, NUM_RECORDS_PRODUCED);
+                    consumerGroupName, NUM_RECORDS_PRODUCED, offsetLagMax);

-            ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-            // the size of consumer record should be zero, because the offsets of the same consumer group
-            // have been automatically synchronized from primary to backup by the background job, so no
-            // more records to consume from the replicated topic by the same consumer group at backup cluster
-            assertEquals(0, records.count(), "consumer record size is not zero");
+            assertDownstreamRedeliveriesBoundedByMaxLag(backupConsumer, offsetLagMax);
         }

         // now create a new topic in primary cluster
@@ -478,11 +484,9 @@ public class MirrorConnectorsIntegrationBaseTest {
                 "group.id", consumerGroupName), "primary.test-topic-1", "primary.test-topic-2")) {
             waitForConsumerGroupFullSync(backup, Arrays.asList("primary.test-topic-1", "primary.test-topic-2"),
-                    consumerGroupName, NUM_RECORDS_PRODUCED);
+                    consumerGroupName, NUM_RECORDS_PRODUCED, offsetLagMax);

-            ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-            // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
-            assertEquals(0, records.count(), "consumer record size is not zero");
+            assertDownstreamRedeliveriesBoundedByMaxLag(backupConsumer, offsetLagMax);
         }

         assertMonotonicCheckpoints(backup, PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal");
@@ -608,13 +612,14 @@ public class MirrorConnectorsIntegrationBaseTest {
         warmUpConsumer(consumerProps);
         mm2Props.put("sync.group.offsets.enabled", "true");
         mm2Props.put("sync.group.offsets.interval.seconds", "1");
+        mm2Props.put("offset.lag.max", Integer.toString(OFFSET_LAG_MAX));
         mm2Config = new MirrorMakerConfig(mm2Props);
         waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
         produceMessages(primary, "test-topic-1");
         try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
             waitForConsumingAllRecords(primaryConsumer, NUM_RECORDS_PRODUCED);
         }
-        waitForConsumerGroupFullSync(backup, Collections.singletonList(remoteTopic), consumerGroupName, NUM_RECORDS_PRODUCED);
+        waitForConsumerGroupFullSync(backup, Collections.singletonList(remoteTopic), consumerGroupName, NUM_RECORDS_PRODUCED, OFFSET_LAG_MAX);
         restartMirrorMakerConnectors(backup, CONNECTOR_LIST);
         assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
         Thread.sleep(5000);
@@ -622,7 +627,7 @@ public class MirrorConnectorsIntegrationBaseTest {
         try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
             waitForConsumingAllRecords(primaryConsumer, NUM_RECORDS_PRODUCED);
         }
-        waitForConsumerGroupFullSync(backup, Collections.singletonList(remoteTopic), consumerGroupName, 2 * NUM_RECORDS_PRODUCED);
+        waitForConsumerGroupFullSync(backup, Collections.singletonList(remoteTopic), consumerGroupName, 2 * NUM_RECORDS_PRODUCED, OFFSET_LAG_MAX);
         assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
     }
@@ -800,7 +805,11 @@ public class MirrorConnectorsIntegrationBaseTest {
      * offsets are eventually synced to the expected offset numbers
      */
     protected static <T> void waitForConsumerGroupFullSync(
-            EmbeddedConnectCluster connect, List<String> topics, String consumerGroupId, int numRecords
+            EmbeddedConnectCluster connect,
+            List<String> topics,
+            String consumerGroupId,
+            int numRecords,
+            int offsetLagMax
     ) throws InterruptedException {
         int expectedRecords = numRecords * topics.size();
         Map<String, Object> consumerProps = new HashMap<>();
@@ -834,7 +843,7 @@ public class MirrorConnectorsIntegrationBaseTest {
             for (TopicPartition tp : tps) {
                 assertTrue(consumerGroupOffsets.containsKey(tp),
                         "TopicPartition " + tp + " does not have translated offsets");
-                assertTrue(consumerGroupOffsets.get(tp).offset() > lastOffset.get(tp),
+                assertTrue(consumerGroupOffsets.get(tp).offset() > lastOffset.get(tp) - offsetLagMax,
                         "TopicPartition " + tp + " does not have fully-translated offsets");
                 assertTrue(consumerGroupOffsets.get(tp).offset() <= endOffsets.get(tp).offset(),
                         "TopicPartition " + tp + " has downstream offsets beyond the log end, this would lead to negative lag metrics");
@@ -870,6 +879,15 @@ public class MirrorConnectorsIntegrationBaseTest {
         }
     }

+    protected static void assertDownstreamRedeliveriesBoundedByMaxLag(Consumer<byte[], byte[]> targetConsumer, int offsetLagMax) {
+        ConsumerRecords<byte[], byte[]> records = targetConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+        // After a full sync, there should be at most offset.lag.max records per partition consumed by both upstream and downstream consumers.
+        for (TopicPartition tp : records.partitions()) {
+            int count = records.records(tp).size();
+            assertTrue(count < offsetLagMax, "downstream consumer is re-reading more than " + offsetLagMax + " records from " + tp);
+        }
+    }
+
     /*
      * make sure the consumer to consume expected number of records
      */
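Two details worth noting about the helper added above. The strict inequality is intentional: a sync is emitted as soon as redelivery would reach offset.lag.max, so steady-state redelivery is at most offset.lag.max - 1 records per partition. And the offsetLagMax = 0 variant still passes on a clean run, because ConsumerRecords::partitions only contains partitions that actually returned data:

    // offsetLagMax = 10: up to 9 re-read records per partition pass; 10 or more fail.
    // offsetLagMax = 0:  any re-read record fails; an empty poll passes vacuously,
    //                    since records.partitions() omits partitions with no records.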
@@ -903,8 +921,6 @@ public class MirrorConnectorsIntegrationBaseTest {
         mm2Props.put("offset.storage.replication.factor", "1");
         mm2Props.put("status.storage.replication.factor", "1");
         mm2Props.put("replication.factor", "1");
-        // Sync offsets as soon as possible to ensure the final record in a finite test has its offset translated.
-        mm2Props.put("offset.lag.max", "0");
         mm2Props.put(PRIMARY_CLUSTER_ALIAS + ".offset.flush.interval.ms", "5000");
         mm2Props.put(BACKUP_CLUSTER_ALIAS + ".offset.flush.interval.ms", "5000");
         return mm2Props;