From b7a6a8fd5f03144c2ff964f5531470e456dbb4ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Armando=20Garc=C3=ADa=20Sancio?= Date: Mon, 12 Jun 2023 07:25:42 -0700 Subject: [PATCH] KAFKA-15076; KRaft should prefer latest snapshot (#13834) If the KRaft listener is at offset 0, the start of the log, and KRaft has generated a snapshot, it should prefer the latest snapshot instead of having the listener read from the start of the log. This is implemented by having KafkaRaftClient send a Listener.handleLoadSnapshot event, if the Listener is at offset 0 and the KRaft partition has generated a snapshot. Reviewers: Jason Gustafson , David Arthur --- .../apache/kafka/raft/KafkaRaftClient.java | 7 ++- .../raft/KafkaRaftClientSnapshotTest.java | 53 ++++++++++++------- 2 files changed, 41 insertions(+), 19 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 94002f36d2a..fbb1117da25 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -308,7 +308,12 @@ public class KafkaRaftClient implements RaftClient { private void updateListenersProgress(long highWatermark) { for (ListenerContext listenerContext : listenerContexts.values()) { listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset -> { - if (nextExpectedOffset < log.startOffset() && nextExpectedOffset < highWatermark) { + // Send snapshot to the listener, if the listener is at the beginning of the log and there is a snapshot, + // or the listener is trying to read an offset for which there isn't a segment in the log. + if (nextExpectedOffset < highWatermark && + ((nextExpectedOffset == 0 && latestSnapshot().isPresent()) || + nextExpectedOffset < log.startOffset()) + ) { SnapshotReader snapshot = latestSnapshot().orElseThrow(() -> new IllegalStateException( String.format( "Snapshot expected since next offset of %s is %d, log start offset is %d and high-watermark is %d", diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java index 4e9a377d5fd..2de6853d2e1 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java @@ -37,6 +37,8 @@ import org.apache.kafka.snapshot.SnapshotReader; import org.apache.kafka.snapshot.RecordsSnapshotWriter; import org.apache.kafka.snapshot.SnapshotWriterReaderTest; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.nio.ByteBuffer; @@ -87,19 +89,24 @@ final public class KafkaRaftClientSnapshotTest { assertEquals(Optional.empty(), context.client.latestSnapshotId()); } - @Test - public void testLeaderListenerNotified() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testLeaderListenerNotified(boolean entireLog) throws Exception { int localId = 0; int otherNodeId = localId + 1; Set voters = Utils.mkSet(localId, otherNodeId); OffsetAndEpoch snapshotId = new OffsetAndEpoch(3, 1); - RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + RaftClientTestContext.Builder contextBuilder = new RaftClientTestContext.Builder(localId, voters) .appendToLog(snapshotId.epoch(), Arrays.asList("a", "b", "c")) .appendToLog(snapshotId.epoch(), Arrays.asList("d", "e", "f")) - .withEmptySnapshot(snapshotId) - .deleteBeforeSnapshot(snapshotId) - .build(); + .withEmptySnapshot(snapshotId); + + if (!entireLog) { + contextBuilder.deleteBeforeSnapshot(snapshotId); + } + + RaftClientTestContext context = contextBuilder.build(); context.becomeLeader(); int epoch = context.currentEpoch(); @@ -118,21 +125,26 @@ final public class KafkaRaftClientSnapshotTest { } } - @Test - public void testFollowerListenerNotified() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testFollowerListenerNotified(boolean entireLog) throws Exception { int localId = 0; int leaderId = localId + 1; Set voters = Utils.mkSet(localId, leaderId); int epoch = 2; OffsetAndEpoch snapshotId = new OffsetAndEpoch(3, 1); - RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + RaftClientTestContext.Builder contextBuilder = new RaftClientTestContext.Builder(localId, voters) .appendToLog(snapshotId.epoch(), Arrays.asList("a", "b", "c")) .appendToLog(snapshotId.epoch(), Arrays.asList("d", "e", "f")) .withEmptySnapshot(snapshotId) - .deleteBeforeSnapshot(snapshotId) - .withElectedLeader(epoch, leaderId) - .build(); + .withElectedLeader(epoch, leaderId); + + if (!entireLog) { + contextBuilder.deleteBeforeSnapshot(snapshotId); + } + + RaftClientTestContext context = contextBuilder.build(); // Advance the highWatermark long localLogEndOffset = context.log.endOffset().offset; @@ -155,21 +167,26 @@ final public class KafkaRaftClientSnapshotTest { } } - @Test - public void testSecondListenerNotified() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testSecondListenerNotified(boolean entireLog) throws Exception { int localId = 0; int leaderId = localId + 1; Set voters = Utils.mkSet(localId, leaderId); int epoch = 2; OffsetAndEpoch snapshotId = new OffsetAndEpoch(3, 1); - RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + RaftClientTestContext.Builder contextBuilder = new RaftClientTestContext.Builder(localId, voters) .appendToLog(snapshotId.epoch(), Arrays.asList("a", "b", "c")) .appendToLog(snapshotId.epoch(), Arrays.asList("d", "e", "f")) .withEmptySnapshot(snapshotId) - .deleteBeforeSnapshot(snapshotId) - .withElectedLeader(epoch, leaderId) - .build(); + .withElectedLeader(epoch, leaderId); + + if (!entireLog) { + contextBuilder.deleteBeforeSnapshot(snapshotId); + } + + RaftClientTestContext context = contextBuilder.build(); // Advance the highWatermark long localLogEndOffset = context.log.endOffset().offset;