KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication (#13429)

Reviewers: Daniel Urban <durban@cloudera.com>, Chris Egerton <chrise@aiven.io>
Greg Harris 2023-04-26 00:30:13 -07:00 committed by GitHub
parent ced1f62c1b
commit baf127a663
5 changed files with 516 additions and 50 deletions

MirrorCheckpointTask.java

@@ -42,6 +42,7 @@ import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.Collections;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.concurrent.ExecutionException;
import java.time.Duration;
@@ -67,20 +68,21 @@ public class MirrorCheckpointTask extends SourceTask {
private MirrorCheckpointMetrics metrics;
private Scheduler scheduler;
private Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset;
private Map<String, List<Checkpoint>> checkpointsPerConsumerGroup;
private Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup;
public MirrorCheckpointTask() {}
// for testing
MirrorCheckpointTask(String sourceClusterAlias, String targetClusterAlias,
ReplicationPolicy replicationPolicy, OffsetSyncStore offsetSyncStore,
Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset,
Map<String, List<Checkpoint>> checkpointsPerConsumerGroup) {
Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup) {
this.sourceClusterAlias = sourceClusterAlias;
this.targetClusterAlias = targetClusterAlias;
this.replicationPolicy = replicationPolicy;
this.offsetSyncStore = offsetSyncStore;
this.idleConsumerGroupsOffset = idleConsumerGroupsOffset;
this.checkpointsPerConsumerGroup = checkpointsPerConsumerGroup;
this.topicFilter = topic -> true;
}
@Override
@@ -167,9 +169,11 @@ public class MirrorCheckpointTask extends SourceTask {
private List<SourceRecord> sourceRecordsForGroup(String group) throws InterruptedException {
try {
long timestamp = System.currentTimeMillis();
List<Checkpoint> checkpoints = checkpointsForGroup(group);
checkpointsPerConsumerGroup.put(group, checkpoints);
return checkpoints.stream()
Map<TopicPartition, OffsetAndMetadata> upstreamGroupOffsets = listConsumerGroupOffsets(group);
Map<TopicPartition, Checkpoint> newCheckpoints = checkpointsForGroup(upstreamGroupOffsets, group);
Map<TopicPartition, Checkpoint> oldCheckpoints = checkpointsPerConsumerGroup.computeIfAbsent(group, ignored -> new HashMap<>());
oldCheckpoints.putAll(newCheckpoints);
return newCheckpoints.values().stream()
.map(x -> checkpointRecord(x, timestamp))
.collect(Collectors.toList());
} catch (ExecutionException e) {
@@ -178,13 +182,44 @@ public class MirrorCheckpointTask extends SourceTask {
}
}
private List<Checkpoint> checkpointsForGroup(String group) throws ExecutionException, InterruptedException {
return listConsumerGroupOffsets(group).entrySet().stream()
// for testing
Map<TopicPartition, Checkpoint> checkpointsForGroup(Map<TopicPartition, OffsetAndMetadata> upstreamGroupOffsets, String group) {
return upstreamGroupOffsets.entrySet().stream()
.filter(x -> shouldCheckpointTopic(x.getKey().topic())) // Only perform relevant checkpoints filtered by "topic filter"
.map(x -> checkpoint(group, x.getKey(), x.getValue()))
.flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty)) // do not emit checkpoints for partitions that don't have offset-syncs
.filter(x -> x.downstreamOffset() >= 0) // ignore offsets we cannot translate accurately
.collect(Collectors.toList());
.filter(this::checkpointIsMoreRecent) // do not emit checkpoints for partitions that have a later checkpoint
.collect(Collectors.toMap(Checkpoint::topicPartition, Function.identity()));
}
private boolean checkpointIsMoreRecent(Checkpoint checkpoint) {
Map<TopicPartition, Checkpoint> checkpoints = checkpointsPerConsumerGroup.get(checkpoint.consumerGroupId());
if (checkpoints == null) {
log.trace("Emitting {} (first for this group)", checkpoint);
return true;
}
Checkpoint lastCheckpoint = checkpoints.get(checkpoint.topicPartition());
if (lastCheckpoint == null) {
log.trace("Emitting {} (first for this partition)", checkpoint);
return true;
}
// Emit sync after a rewind of the upstream consumer group takes place (checkpoints can be non-monotonic)
if (checkpoint.upstreamOffset() < lastCheckpoint.upstreamOffset()) {
log.trace("Emitting {} (upstream offset rewind)", checkpoint);
return true;
}
// Or if the downstream offset is newer (force checkpoints to be monotonic)
if (checkpoint.downstreamOffset() > lastCheckpoint.downstreamOffset()) {
log.trace("Emitting {} (downstream offset advanced)", checkpoint);
return true;
}
if (checkpoint.downstreamOffset() != lastCheckpoint.downstreamOffset()) {
log.trace("Skipping {} (preventing downstream rewind)", checkpoint);
} else {
log.trace("Skipping {} (repeated checkpoint)", checkpoint);
}
return false;
}
private Map<TopicPartition, OffsetAndMetadata> listConsumerGroupOffsets(String group)
@@ -201,7 +236,7 @@ public class MirrorCheckpointTask extends SourceTask {
if (offsetAndMetadata != null) {
long upstreamOffset = offsetAndMetadata.offset();
OptionalLong downstreamOffset =
offsetSyncStore.translateDownstream(topicPartition, upstreamOffset);
offsetSyncStore.translateDownstream(group, topicPartition, upstreamOffset);
if (downstreamOffset.isPresent()) {
return Optional.of(new Checkpoint(group, renameTopicPartition(topicPartition),
upstreamOffset, downstreamOffset.getAsLong(), offsetAndMetadata.metadata()));
@@ -336,10 +371,10 @@ public class MirrorCheckpointTask extends SourceTask {
Map<String, Map<TopicPartition, OffsetAndMetadata>> getConvertedUpstreamOffset() {
Map<String, Map<TopicPartition, OffsetAndMetadata>> result = new HashMap<>();
for (Entry<String, List<Checkpoint>> entry : checkpointsPerConsumerGroup.entrySet()) {
for (Entry<String, Map<TopicPartition, Checkpoint>> entry : checkpointsPerConsumerGroup.entrySet()) {
String consumerId = entry.getKey();
Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = new HashMap<>();
for (Checkpoint checkpoint : entry.getValue()) {
for (Checkpoint checkpoint : entry.getValue().values()) {
convertedUpstreamOffset.put(checkpoint.topicPartition(), checkpoint.offsetAndMetadata());
}
result.put(consumerId, convertedUpstreamOffset);
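For illustration only (not part of this commit): a minimal, self-contained restatement of the emission rule that checkpointIsMoreRecent applies above, using plain longs instead of Checkpoint objects. The class and method names below are invented for the example.

final class CheckpointFilterSketch {

    /**
     * Decide whether a new (upstream, downstream) checkpoint should be emitted for a partition,
     * given the last emitted checkpoint as a {upstream, downstream} pair, or null if none exists yet.
     */
    static boolean shouldEmit(long[] last, long newUpstream, long newDownstream) {
        if (last == null) {
            return true;                      // first checkpoint for this group/partition
        }
        if (newUpstream < last[0]) {
            return true;                      // upstream consumer group rewound: follow it
        }
        if (newDownstream > last[1]) {
            return true;                      // downstream translation advanced: checkpoints stay monotonic
        }
        return false;                         // repeated checkpoint, or it would rewind the downstream offset
    }

    public static void main(String[] args) {
        long[] last = {150L, 251L};           // last emitted checkpoint: upstream 150 -> downstream 251
        System.out.println(shouldEmit(last, 150L, 251L)); // false (repeated checkpoint)
        System.out.println(shouldEmit(last, 160L, 249L)); // false (would rewind the downstream offset)
        System.out.println(shouldEmit(last, 160L, 252L)); // true  (downstream offset advanced)
        System.out.println(shouldEmit(last, 100L, 200L)); // true  (upstream offset rewind)
    }
}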

OffsetSyncStore.java

@@ -25,17 +25,46 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;
/** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
/**
* Used internally by MirrorMaker. Stores offset syncs and performs offset translation.
* <p>A limited number of offset syncs can be stored per TopicPartition, in a way which provides better translation
* later in the topic, closer to the live end of the topic.
* This maintains the following invariants for each topic-partition in the in-memory sync storage:
* <ul>
* <li>Invariant A: syncs[0] is the latest offset sync from the syncs topic</li>
* <li>Invariant B: For each i,j, i < j, syncs[i] != syncs[j]: syncs[i].upstream <= syncs[j].upstream + 2^j - 2^i</li>
* <li>Invariant C: For each i,j, i < j, syncs[i] != syncs[j]: syncs[i].upstream >= syncs[j].upstream + 2^(i-2)</li>
* <li>Invariant D: syncs[63] is the earliest offset sync from the syncs topic usable for translation</li>
* </ul>
* <p>The above invariants ensure that the store is kept updated upon receipt of each sync, and that distinct
* offset syncs are separated by approximately exponential space. They can be checked locally (by comparing all adjacent
* indexes) but hold globally (for all pairs of any distance). This allows updates to the store in linear time.
* <p>Offset translation uses the syncs[i] which most closely precedes the upstream consumer group's current offset.
* For a fixed in-memory state, translation of variable upstream offsets will be monotonic.
* For variable in-memory state, translation of a fixed upstream offset will not be monotonic.
* <p>Translation will be unavailable for all topic-partitions before an initial read-to-end of the offset syncs topic
* is complete. Translation will be unavailable after that if no syncs are present for a topic-partition, if replication
* started after the position of the consumer group, or if relevant offset syncs for the topic were potentially used
* for translation in an earlier generation of the sync store.
*/
class OffsetSyncStore implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(OffsetSyncStore.class);
// Store one offset sync for each bit of the topic offset.
// Visible for testing
static final int SYNCS_PER_PARTITION = Long.SIZE;
private final KafkaBasedLog<byte[], byte[]> backingStore;
private final Map<TopicPartition, OffsetSync> offsetSyncs = new ConcurrentHashMap<>();
private final Map<TopicPartition, OffsetSync[]> offsetSyncs = new ConcurrentHashMap<>();
private final TopicAdmin admin;
protected volatile boolean readToEnd = false;
@@ -99,16 +128,21 @@ class OffsetSyncStore implements AutoCloseable {
readToEnd = true;
}
OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstreamOffset) {
OptionalLong translateDownstream(String group, TopicPartition sourceTopicPartition, long upstreamOffset) {
if (!readToEnd) {
// If we have not read to the end of the syncs topic at least once, decline to translate any offsets.
// This prevents emitting stale offsets while initially reading the offset syncs topic.
log.debug("translateDownstream({},{},{}): Skipped (initial offset syncs read still in progress)",
group, sourceTopicPartition, upstreamOffset);
return OptionalLong.empty();
}
Optional<OffsetSync> offsetSync = latestOffsetSync(sourceTopicPartition);
Optional<OffsetSync> offsetSync = latestOffsetSync(sourceTopicPartition, upstreamOffset);
if (offsetSync.isPresent()) {
if (offsetSync.get().upstreamOffset() > upstreamOffset) {
// Offset is too far in the past to translate accurately
log.debug("translateDownstream({},{},{}): Skipped ({} is ahead of upstream consumer group {})",
group, sourceTopicPartition, upstreamOffset,
offsetSync.get(), upstreamOffset);
return OptionalLong.of(-1L);
}
// If the consumer group is ahead of the offset sync, we can translate the upstream offset only 1
@@ -124,8 +158,15 @@ class OffsetSyncStore implements AutoCloseable {
// vv
// target |-sg----r-----|
long upstreamStep = upstreamOffset == offsetSync.get().upstreamOffset() ? 0 : 1;
log.debug("translateDownstream({},{},{}): Translated {} (relative to {})",
group, sourceTopicPartition, upstreamOffset,
offsetSync.get().downstreamOffset() + upstreamStep,
offsetSync.get()
);
return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep);
} else {
log.debug("translateDownstream({},{},{}): Skipped (offset sync not found)",
group, sourceTopicPartition, upstreamOffset);
return OptionalLong.empty();
}
}
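For illustration only (not part of this commit): a self-contained sketch of the translation arithmetic used by translateDownstream above, assuming the initial read-to-end of the offset syncs topic has already completed (before that, the real method returns OptionalLong.empty()). The sample sync values echo the ones used in OffsetSyncStoreTest; the class and method names are invented for the example.

import java.util.OptionalLong;

final class TranslationSketch {

    /** Each sync is a {upstreamOffset, downstreamOffset} pair, ordered latest-first as in the store. */
    static OptionalLong translate(long[][] syncs, long groupUpstreamOffset) {
        for (long[] sync : syncs) {                          // scan from latest to earliest
            if (sync[0] <= groupUpstreamOffset) {
                // At the sync point the downstream offset is known exactly; strictly ahead of it,
                // only one more record (the synced one) is known to be replicated, hence +1.
                long step = groupUpstreamOffset == sync[0] ? 0 : 1;
                return OptionalLong.of(sync[1] + step);
            }
        }
        return OptionalLong.of(-1L);                         // older than every stored sync: not translatable
    }

    public static void main(String[] args) {
        long[][] syncs = {{150, 251}, {100, 200}};           // latest-first
        System.out.println(translate(syncs, 150));           // OptionalLong[251]
        System.out.println(translate(syncs, 175));           // OptionalLong[252]
        System.out.println(translate(syncs, 120));           // OptionalLong[201]
        System.out.println(translate(syncs, 5));             // OptionalLong[-1]
    }
}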
@@ -139,10 +180,147 @@ class OffsetSyncStore implements AutoCloseable {
protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
TopicPartition sourceTopicPartition = offsetSync.topicPartition();
offsetSyncs.put(sourceTopicPartition, offsetSync);
offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) ->
syncs == null ? createInitialSyncs(offsetSync) : updateExistingSyncs(syncs, offsetSync)
);
}
private Optional<OffsetSync> latestOffsetSync(TopicPartition topicPartition) {
return Optional.ofNullable(offsetSyncs.get(topicPartition));
private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
// Make a copy of the array before mutating it, so that readers do not see inconsistent data
// TODO: consider batching updates so that this copy can be performed less often for high-volume sync topics.
OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, SYNCS_PER_PARTITION);
updateSyncArray(mutableSyncs, offsetSync);
if (log.isTraceEnabled()) {
log.trace("New sync {} applied, new state is {}", offsetSync, offsetArrayToString(mutableSyncs));
}
return mutableSyncs;
}
private String offsetArrayToString(OffsetSync[] syncs) {
StringBuilder stateString = new StringBuilder();
stateString.append("[");
for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
if (i == 0 || syncs[i] != syncs[i - 1]) {
if (i != 0) {
stateString.append(",");
}
// Print only if the sync is interesting; a series of repeated syncs will be elided
stateString.append(syncs[i].upstreamOffset());
stateString.append(":");
stateString.append(syncs[i].downstreamOffset());
}
}
stateString.append("]");
return stateString.toString();
}
private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
OffsetSync[] syncs = new OffsetSync[SYNCS_PER_PARTITION];
clearSyncArray(syncs, firstSync);
return syncs;
}
private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
// If every element of the store is the same, then it satisfies invariants B and C trivially.
for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
syncs[i] = offsetSync;
}
}
private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
long upstreamOffset = offsetSync.upstreamOffset();
// While reading to the end of the topic, ensure that our earliest sync is later than
// any earlier sync that could have been used for translation, to preserve monotonicity
// If the upstream offset rewinds, all previous offsets are invalid, so overwrite them all.
if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {
clearSyncArray(syncs, offsetSync);
return;
}
OffsetSync replacement = offsetSync;
// The most-recently-discarded offset sync
// We track this since it may still be eligible for use in the syncs array at a later index
OffsetSync oldValue = syncs[0];
// Invariant A is always violated once a new sync appears.
// Repair Invariant A: the latest sync must always be updated
syncs[0] = replacement;
for (int current = 1; current < SYNCS_PER_PARTITION; current++) {
int previous = current - 1;
// We can potentially use oldValue instead of replacement, allowing us to keep more distinct values stored
// If oldValue is not recent, it should be expired from the store
boolean isRecent = invariantB(syncs[previous], oldValue, previous, current);
// Ensure that this value is sufficiently separated from the previous value
// We prefer to keep more recent syncs of similar precision (i.e. the value in replacement)
boolean separatedFromPrevious = invariantC(syncs[previous], oldValue, previous);
// Ensure that this value is sufficiently separated from the next value
// We prefer to keep existing syncs of lower precision (i.e. the value in syncs[next])
int next = current + 1;
boolean separatedFromNext = next >= SYNCS_PER_PARTITION || invariantC(oldValue, syncs[next], current);
// If this condition is false, oldValue will be expired from the store and lost forever.
if (isRecent && separatedFromPrevious && separatedFromNext) {
replacement = oldValue;
}
// The replacement variable always contains a value which satisfies the invariants for this index.
// This replacement may or may not be used, since the invariants could already be satisfied,
// and in that case, prefer to keep the existing tail of the syncs array rather than updating it.
assert invariantB(syncs[previous], replacement, previous, current);
assert invariantC(syncs[previous], replacement, previous);
// Test if changes to the previous index affected the invariant for this index
if (invariantB(syncs[previous], syncs[current], previous, current)) {
// Invariant B holds for syncs[current]: it must also hold for all later values
break;
} else {
// Invariant B violated for syncs[current]: sync is now too old and must be updated
// Repair Invariant B: swap in replacement, and save the old value for the next iteration
oldValue = syncs[current];
syncs[current] = replacement;
assert invariantB(syncs[previous], syncs[current], previous, current);
assert invariantC(syncs[previous], syncs[current], previous);
}
}
}
private boolean invariantB(OffsetSync iSync, OffsetSync jSync, int i, int j) {
long bound = jSync.upstreamOffset() + (1L << j) - (1L << i);
return iSync == jSync || bound < 0 || iSync.upstreamOffset() <= bound;
}
private boolean invariantC(OffsetSync iSync, OffsetSync jSync, int i) {
long bound = jSync.upstreamOffset() + (1L << Math.max(i - 2, 0));
return iSync == jSync || (bound >= 0 && iSync.upstreamOffset() >= bound);
}
private Optional<OffsetSync> latestOffsetSync(TopicPartition topicPartition, long upstreamOffset) {
return Optional.ofNullable(offsetSyncs.get(topicPartition))
.map(syncs -> lookupLatestSync(syncs, upstreamOffset));
}
private OffsetSync lookupLatestSync(OffsetSync[] syncs, long upstreamOffset) {
// linear search the syncs, effectively a binary search over the topic offsets
// Search from latest to earliest to find the sync that gives the best accuracy
for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
OffsetSync offsetSync = syncs[i];
if (offsetSync.upstreamOffset() <= upstreamOffset) {
return offsetSync;
}
}
return syncs[SYNCS_PER_PARTITION - 1];
}
// For testing
OffsetSync syncFor(TopicPartition topicPartition, int syncIdx) {
OffsetSync[] syncs = offsetSyncs.get(topicPartition);
if (syncs == null)
throw new IllegalArgumentException("No syncs present for " + topicPartition);
if (syncIdx >= syncs.length)
throw new IllegalArgumentException(
"Requested sync " + (syncIdx + 1) + " for " + topicPartition
+ " but there are only " + syncs.length + " syncs available for that topic partition"
);
return syncs[syncIdx];
}
}
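For illustration only (not part of this commit): a small self-contained check of the spacing that invariantB and invariantC above impose on two distinct stored syncs at indexes i < j. A distinct sync kept at index i must lie at least 2^max(i-2, 0) upstream offsets beyond the sync at index j and at most 2^j - 2^i beyond it, so the 64 slots (one per bit of the offset) can cover the full range of long offsets while keeping fine-grained syncs near the head of the array. The class name and sample values are invented for the example.

final class SyncSpacingSketch {

    // Mirrors invariantB for distinct syncs: syncs[i].upstream <= syncs[j].upstream + 2^j - 2^i.
    static boolean invariantB(long iUpstream, long jUpstream, int i, int j) {
        long bound = jUpstream + (1L << j) - (1L << i);
        return bound < 0 || iUpstream <= bound;              // a negative bound means overflow: effectively unbounded
    }

    // Mirrors invariantC for distinct syncs: syncs[i].upstream >= syncs[j].upstream + 2^max(i-2, 0).
    static boolean invariantC(long iUpstream, long jUpstream, int i) {
        long bound = jUpstream + (1L << Math.max(i - 2, 0));
        return bound >= 0 && iUpstream >= bound;
    }

    public static void main(String[] args) {
        // Suppose syncs[5] holds upstream offset 1000. A distinct sync kept at index 3 must then fall in
        // [1000 + 2^1, 1000 + 2^5 - 2^3] = [1002, 1024].
        int i = 3, j = 5;
        long jUpstream = 1000L;
        for (long candidate : new long[]{1001L, 1002L, 1024L, 1025L}) {
            boolean allowed = invariantC(candidate, jUpstream, i) && invariantB(candidate, jUpstream, i, j);
            System.out.println("syncs[3].upstream = " + candidate + " allowed: " + allowed);
        }
        // Prints false, true, true, false: 1001 is too close to the later sync, 1025 is too far ahead of it.
    }
}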

MirrorCheckpointTaskTest.java

@@ -16,12 +16,11 @@
*/
package org.apache.kafka.connect.mirror;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Collections;
import java.util.Optional;
import java.util.OptionalLong;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -31,6 +30,7 @@ import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class MirrorCheckpointTaskTest {
@@ -118,7 +118,7 @@ public class MirrorCheckpointTaskTest {
@Test
public void testSyncOffset() {
Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset = new HashMap<>();
Map<String, List<Checkpoint>> checkpointsPerConsumerGroup = new HashMap<>();
Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup = new HashMap<>();
String consumer1 = "consumer1";
String consumer2 = "consumer2";
@@ -147,16 +147,16 @@ public class MirrorCheckpointTaskTest {
// 'cpC2T2p0' denotes 'checkpoint' of topic2, partition 0 for consumer2
Checkpoint cpC2T2P0 = new Checkpoint(consumer2, new TopicPartition(topic2, 0), 100, 51, "metadata");
// 'checkpointListC1' denotes 'checkpoint' list for consumer1
List<Checkpoint> checkpointListC1 = new ArrayList<>();
checkpointListC1.add(cpC1T1P0);
// 'checkpointMapC1' denotes 'checkpoint' map for consumer1
Map<TopicPartition, Checkpoint> checkpointMapC1 = new HashMap<>();
checkpointMapC1.put(cpC1T1P0.topicPartition(), cpC1T1P0);
// 'checkpointListC2' denotes 'checkpoint' list for consumer2
List<Checkpoint> checkpointListC2 = new ArrayList<>();
checkpointListC2.add(cpC2T2P0);
// 'checkpointMapC2' denotes 'checkpoint' map for consumer2
Map<TopicPartition, Checkpoint> checkpointMapC2 = new HashMap<>();
checkpointMapC2.put(cpC2T2P0.topicPartition(), cpC2T2P0);
checkpointsPerConsumerGroup.put(consumer1, checkpointListC1);
checkpointsPerConsumerGroup.put(consumer2, checkpointListC2);
checkpointsPerConsumerGroup.put(consumer1, checkpointMapC1);
checkpointsPerConsumerGroup.put(consumer2, checkpointMapC2);
MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
new DefaultReplicationPolicy(), null, idleConsumerGroupsOffset, checkpointsPerConsumerGroup);
@@ -195,4 +195,52 @@ public class MirrorCheckpointTaskTest {
Optional<Checkpoint> checkpoint = mirrorCheckpointTask.checkpoint("g1", new TopicPartition("topic1", 0), null);
assertFalse(checkpoint.isPresent());
}
@Test
public void testCheckpointRecordsMonotonicIfStoreRewinds() {
OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
offsetSyncStore.start();
Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup = new HashMap<>();
MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), checkpointsPerConsumerGroup);
TopicPartition tp = new TopicPartition("topic1", 0);
TopicPartition targetTP = new TopicPartition("source1.topic1", 0);
long upstream = 11L;
long downstream = 4L;
// Emit syncs 0 and 1, and use the sync 1 to translate offsets and commit checkpoints
offsetSyncStore.sync(tp, upstream++, downstream++);
offsetSyncStore.sync(tp, upstream++, downstream++);
long consumerGroupOffset = upstream;
long expectedDownstreamOffset = downstream;
assertEquals(OptionalLong.of(expectedDownstreamOffset), offsetSyncStore.translateDownstream("g1", tp, consumerGroupOffset));
Map<TopicPartition, Checkpoint> checkpoints = assertCheckpointForTopic(mirrorCheckpointTask, tp, targetTP, consumerGroupOffset, true);
// the task normally does this, but simulate it here
checkpointsPerConsumerGroup.put("g1", checkpoints);
// Emit syncs 2-6 which will cause the store to drop sync 1, forcing translation to fall back to 0.
offsetSyncStore.sync(tp, upstream++, downstream++);
offsetSyncStore.sync(tp, upstream++, downstream++);
offsetSyncStore.sync(tp, upstream++, downstream++);
offsetSyncStore.sync(tp, upstream++, downstream++);
offsetSyncStore.sync(tp, upstream++, downstream++);
// The OffsetSyncStore will change its translation of the same offset
assertNotEquals(OptionalLong.of(expectedDownstreamOffset), offsetSyncStore.translateDownstream("g1", tp, consumerGroupOffset));
// But the task will filter this out and not emit a checkpoint
assertCheckpointForTopic(mirrorCheckpointTask, tp, targetTP, consumerGroupOffset, false);
// If then the upstream offset rewinds in the topic and is still translatable, a checkpoint will be emitted
// also rewinding the downstream offsets to match. This will not affect auto-synced groups, only checkpoints.
assertCheckpointForTopic(mirrorCheckpointTask, tp, targetTP, consumerGroupOffset - 1, true);
}
private Map<TopicPartition, Checkpoint> assertCheckpointForTopic(
MirrorCheckpointTask task, TopicPartition tp, TopicPartition remoteTp, long consumerGroupOffset, boolean truth
) {
Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets = Collections.singletonMap(tp, new OffsetAndMetadata(consumerGroupOffset));
Map<TopicPartition, Checkpoint> checkpoints = task.checkpointsForGroup(consumerGroupOffsets, "g1");
assertEquals(truth, checkpoints.containsKey(remoteTp), "should" + (truth ? "" : " not") + " emit offset sync");
return checkpoints;
}
}

OffsetSyncStoreTest.java

@@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test;
import java.util.OptionalLong;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class OffsetSyncStoreTest {
@@ -57,22 +58,22 @@ public class OffsetSyncStoreTest {
// Emit synced downstream offset without dead-reckoning
store.sync(tp, 100, 200);
assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150));
assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 150));
// Translate exact offsets
store.sync(tp, 150, 251);
assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150));
assertEquals(OptionalLong.of(251), store.translateDownstream(null, tp, 150));
// Use old offset (5) prior to any sync -> can't translate
assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5));
assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 5));
// Downstream offsets reset
store.sync(tp, 200, 10);
assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200));
assertEquals(OptionalLong.of(10), store.translateDownstream(null, tp, 200));
// Upstream offsets reset
store.sync(tp, 20, 20);
assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20));
assertEquals(OptionalLong.of(20), store.translateDownstream(null, tp, 20));
}
}
@@ -80,21 +81,21 @@
public void testNoTranslationIfStoreNotStarted() {
try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
// no offsets exist and store is not started
assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
// read a sync during startup
store.sync(tp, 100, 200);
assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
// After the store is started all offsets are visible
store.start();
assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0));
assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100));
assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200));
assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0));
assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100));
assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200));
}
}
@@ -102,7 +103,132 @@
public void testNoTranslationIfNoOffsetSync() {
try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
store.start();
assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
}
}
@Test
public void testPastOffsetTranslation() {
try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
long maxOffsetLag = 10;
int offset = 0;
for (; offset <= 1000; offset += maxOffsetLag) {
store.sync(tp, offset, offset);
assertSparseSyncInvariant(store, tp);
}
store.start();
// After starting but before seeing new offsets, only the latest startup offset can be translated
assertSparseSync(store, 1000, -1);
for (; offset <= 10000; offset += maxOffsetLag) {
store.sync(tp, offset, offset);
assertSparseSyncInvariant(store, tp);
}
// After seeing new offsets, we still cannot translate earlier than the latest startup offset
// Invariant D: the last sync from the initial read-to-end is still stored
assertSparseSync(store, 1000, -1);
// We can translate offsets between the latest startup offset and the latest offset with variable precision
// Older offsets are less precise and translation ends up farther apart
assertSparseSync(store, 4840, 1000);
assertSparseSync(store, 6760, 4840);
assertSparseSync(store, 8680, 6760);
assertSparseSync(store, 9160, 8680);
assertSparseSync(store, 9640, 9160);
assertSparseSync(store, 9880, 9640);
assertSparseSync(store, 9940, 9880);
assertSparseSync(store, 9970, 9940);
assertSparseSync(store, 9990, 9970);
assertSparseSync(store, 10000, 9990);
// Rewinding upstream offsets should clear all historical syncs
store.sync(tp, 1500, 11000);
assertSparseSyncInvariant(store, tp);
assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 1499));
assertEquals(OptionalLong.of(11000), store.translateDownstream(null, tp, 1500));
assertEquals(OptionalLong.of(11001), store.translateDownstream(null, tp, 2000));
}
}
@Test
public void testKeepMostDistinctSyncs() {
// Under normal operation, the incoming syncs will be regularly spaced and the store should keep a set of syncs
// which provide the best translation accuracy (expires as few syncs as possible)
// Each new sync should be added to the cache and expire at most one other sync from the cache
long iterations = 10000;
long maxStep = Long.MAX_VALUE / iterations;
// Test a variety of steps (corresponding to the offset.lag.max configuration)
for (long step = 1; step < maxStep; step = (step * 2) + 1) {
for (long firstOffset = 0; firstOffset < 30; firstOffset++) {
try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
int lastCount = 1;
store.start();
for (long offset = firstOffset; offset <= iterations; offset += step) {
store.sync(tp, offset, offset);
// Invariant A: the latest sync is present
assertEquals(offset, store.syncFor(tp, 0).upstreamOffset());
// Invariant D: the earliest sync is present
assertEquals(firstOffset, store.syncFor(tp, 63).upstreamOffset());
int count = countDistinctStoredSyncs(store, tp);
int diff = count - lastCount;
assertTrue(diff >= 0,
"Store expired too many syncs: " + diff + " after receiving offset " + offset);
lastCount = count;
}
}
}
}
}
private void assertSparseSync(FakeOffsetSyncStore store, long syncOffset, long previousOffset) {
assertEquals(OptionalLong.of(previousOffset == -1 ? previousOffset : previousOffset + 1), store.translateDownstream(null, tp, syncOffset - 1));
assertEquals(OptionalLong.of(syncOffset), store.translateDownstream(null, tp, syncOffset));
assertEquals(OptionalLong.of(syncOffset + 1), store.translateDownstream(null, tp, syncOffset + 1));
assertEquals(OptionalLong.of(syncOffset + 1), store.translateDownstream(null, tp, syncOffset + 2));
}
private int countDistinctStoredSyncs(FakeOffsetSyncStore store, TopicPartition topicPartition) {
int count = 1;
for (int i = 1; i < OffsetSyncStore.SYNCS_PER_PARTITION; i++) {
if (store.syncFor(topicPartition, i - 1) != store.syncFor(topicPartition, i)) {
count++;
}
}
return count;
}
private void assertSparseSyncInvariant(FakeOffsetSyncStore store, TopicPartition topicPartition) {
for (int j = 0; j < OffsetSyncStore.SYNCS_PER_PARTITION; j++) {
for (int i = 0; i < j; i++) {
long jUpstream = store.syncFor(topicPartition, j).upstreamOffset();
long iUpstream = store.syncFor(topicPartition, i).upstreamOffset();
if (jUpstream == iUpstream) {
continue;
}
int exponent = Math.max(i - 2, 0);
long iUpstreamLowerBound = jUpstream + (1L << exponent);
if (iUpstreamLowerBound < 0) {
continue;
}
assertTrue(
iUpstream >= iUpstreamLowerBound,
"Invariant C(" + i + "," + j + "): Upstream offset " + iUpstream + " at position " + i
+ " should be at least " + iUpstreamLowerBound
+ " (" + jUpstream + " + 2^" + exponent + ")"
);
long iUpstreamUpperBound = jUpstream + (1L << j) - (1L << i);
if (iUpstreamUpperBound < 0)
continue;
assertTrue(
iUpstream <= iUpstreamUpperBound,
"Invariant B(" + i + "," + j + "): Upstream offset " + iUpstream + " at position " + i
+ " should be no greater than " + iUpstreamUpperBound
+ " (" + jUpstream + " + 2^" + j + " - 2^" + i + ")"
);
}
}
}
}

MirrorConnectorsIntegrationBaseTest.java

@@ -359,6 +359,7 @@ public class MirrorConnectorsIntegrationBaseTest {
assertTrue(primaryConsumer.position(
new TopicPartition(reverseTopic1, 0)) <= NUM_RECORDS_PRODUCED, "Consumer failedback beyond expected downstream offset.");
}
}
// create more matching topics
@@ -511,7 +512,6 @@ public class MirrorConnectorsIntegrationBaseTest {
waitForConsumerGroupFullSync(backup, Arrays.asList(backupTopic1, remoteTopic2),
consumerGroupName, NUM_RECORDS_PRODUCED, offsetLagMax);
assertDownstreamRedeliveriesBoundedByMaxLag(backupConsumer, offsetLagMax);
}
@@ -657,6 +657,72 @@ public class MirrorConnectorsIntegrationBaseTest {
assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
}
@Test
public void testOffsetTranslationBehindReplicationFlow() throws InterruptedException {
String consumerGroupName = "consumer-group-lagging-behind";
Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
String remoteTopic = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
warmUpConsumer(consumerProps);
mm2Props.put("sync.group.offsets.enabled", "true");
mm2Props.put("sync.group.offsets.interval.seconds", "1");
mm2Props.put("offset.lag.max", Integer.toString(OFFSET_LAG_MAX));
mm2Config = new MirrorMakerConfig(mm2Props);
waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
// Produce a large number of records to the topic, all replicated within one MM2 lifetime.
int iterations = 100;
for (int i = 0; i < iterations; i++) {
produceMessages(primary, "test-topic-1");
}
waitForTopicCreated(backup, remoteTopic);
assertEquals(iterations * NUM_RECORDS_PRODUCED, backup.kafka().consume(iterations * NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, remoteTopic).count(),
"Records were not replicated to backup cluster.");
// Once the replication has finished, we spin up the upstream consumer and start slowly consuming records
ConsumerRecords<byte[], byte[]> allRecords = primary.kafka().consume(iterations * NUM_RECORDS_PRODUCED, RECORD_CONSUME_DURATION_MS, "test-topic-1");
MirrorClient backupClient = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS));
Map<TopicPartition, OffsetAndMetadata> initialCheckpoints = waitForCheckpointOnAllPartitions(
backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, remoteTopic);
Map<TopicPartition, OffsetAndMetadata> partialCheckpoints;
log.info("Initial checkpoints: {}", initialCheckpoints);
try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
primaryConsumer.commitSync(partialOffsets(allRecords, 0.9f));
partialCheckpoints = waitForNewCheckpointOnAllPartitions(
backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, remoteTopic, initialCheckpoints);
log.info("Partial checkpoints: {}", partialCheckpoints);
}
for (TopicPartition tp : initialCheckpoints.keySet()) {
assertTrue(initialCheckpoints.get(tp).offset() < partialCheckpoints.get(tp).offset(),
"Checkpoints should advance when the upstream consumer group advances");
}
assertMonotonicCheckpoints(backup, PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal");
Map<TopicPartition, OffsetAndMetadata> finalCheckpoints;
try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
primaryConsumer.commitSync(partialOffsets(allRecords, 0.1f));
finalCheckpoints = waitForNewCheckpointOnAllPartitions(
backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, remoteTopic, partialCheckpoints);
log.info("Final checkpoints: {}", finalCheckpoints);
}
for (TopicPartition tp : partialCheckpoints.keySet()) {
assertTrue(finalCheckpoints.get(tp).offset() < partialCheckpoints.get(tp).offset(),
"Checkpoints should rewind when the upstream consumer group rewinds");
}
}
private Map<TopicPartition, OffsetAndMetadata> partialOffsets(ConsumerRecords<byte[], byte[]> allRecords, double fraction) {
return allRecords.partitions()
.stream()
.collect(Collectors.toMap(Function.identity(), partition -> {
List<ConsumerRecord<byte[], byte[]>> records = allRecords.records(partition);
int index = (int) (records.size() * fraction);
return new OffsetAndMetadata(records.get(index).offset());
}));
}
@Test
public void testSyncTopicConfigs() throws InterruptedException {
mm2Config = new MirrorMakerConfig(mm2Props);
@@ -786,7 +852,6 @@ public class MirrorConnectorsIntegrationBaseTest {
}, 30000, "Topic configurations were not synced");
}
private TopicPartition remoteTopicPartition(TopicPartition tp, String alias) {
return new TopicPartition(remoteTopicName(tp.topic(), alias), tp.partition());
}
@@ -929,6 +994,13 @@ public class MirrorConnectorsIntegrationBaseTest {
private static Map<TopicPartition, OffsetAndMetadata> waitForCheckpointOnAllPartitions(
MirrorClient client, String consumerGroupName, String remoteClusterAlias, String topicName
) throws InterruptedException {
return waitForNewCheckpointOnAllPartitions(client, consumerGroupName, remoteClusterAlias, topicName, Collections.emptyMap());
}
protected static Map<TopicPartition, OffsetAndMetadata> waitForNewCheckpointOnAllPartitions(
MirrorClient client, String consumerGroupName, String remoteClusterAlias, String topicName,
Map<TopicPartition, OffsetAndMetadata> lastCheckpoint
) throws InterruptedException {
AtomicReference<Map<TopicPartition, OffsetAndMetadata>> ret = new AtomicReference<>();
waitForCondition(
@@ -936,9 +1008,13 @@ public class MirrorConnectorsIntegrationBaseTest {
Map<TopicPartition, OffsetAndMetadata> offsets = client.remoteConsumerOffsets(
consumerGroupName, remoteClusterAlias, Duration.ofMillis(3000));
for (int i = 0; i < NUM_PARTITIONS; i++) {
if (!offsets.containsKey(new TopicPartition(topicName, i))) {
TopicPartition tp = new TopicPartition(topicName, i);
if (!offsets.containsKey(tp)) {
log.info("Checkpoint is missing for {}: {}-{}", consumerGroupName, topicName, i);
return false;
} else if (lastCheckpoint.containsKey(tp) && lastCheckpoint.get(tp).equals(offsets.get(tp))) {
log.info("Checkpoint is the same as previous checkpoint");
return false;
}
}
ret.set(offsets);
@@ -998,9 +1074,12 @@ public class MirrorConnectorsIntegrationBaseTest {
for (TopicPartition tp : tps) {
assertTrue(consumerGroupOffsets.containsKey(tp),
"TopicPartition " + tp + " does not have translated offsets");
assertTrue(consumerGroupOffsets.get(tp).offset() > lastOffset.get(tp) - offsetLagMax,
"TopicPartition " + tp + " does not have fully-translated offsets");
assertTrue(consumerGroupOffsets.get(tp).offset() <= endOffsets.get(tp).offset(),
long offset = consumerGroupOffsets.get(tp).offset();
assertTrue(offset > lastOffset.get(tp) - offsetLagMax,
"TopicPartition " + tp + " does not have fully-translated offsets: "
+ offset + " is not close enough to " + lastOffset.get(tp)
+ " (strictly more than " + (lastOffset.get(tp) - offsetLagMax) + ")");
assertTrue(offset <= endOffsets.get(tp).offset(),
"TopicPartition " + tp + " has downstream offsets beyond the log end, this would lead to negative lag metrics");
}
return true;