From 3a246b1abab0cfa8050546f54c987af2ec6cdd4e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jos=C3=A9=20Armando=20Garc=C3=ADa=20Sancio?=
Date: Wed, 28 Jun 2023 14:21:11 -0700
Subject: [PATCH] KAFKA-15078; KRaft leader replies with snapshot for offset 0 (#13845)

If the follower has an empty log and fetches with offset 0, it is more
efficient for the leader to reply with a snapshot id (redirect to
FETCH_SNAPSHOT) than for the follower to continue fetching from the log
segments.

Reviewers: David Arthur, dengziming
---
 .../apache/kafka/raft/KafkaRaftClient.java    | 11 ++++-
 .../raft/KafkaRaftClientSnapshotTest.java     | 48 ++++++++++++++++++-
 2 files changed, 57 insertions(+), 2 deletions(-)

diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index fbb1117da25..40c6a695908 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -1017,7 +1017,16 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
         long fetchOffset = request.fetchOffset();
         int lastFetchedEpoch = request.lastFetchedEpoch();
         LeaderState<T> state = quorum.leaderStateOrThrow();
-        ValidOffsetAndEpoch validOffsetAndEpoch = log.validateOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
+
+        Optional<OffsetAndEpoch> latestSnapshotId = log.latestSnapshotId();
+        final ValidOffsetAndEpoch validOffsetAndEpoch;
+        if (fetchOffset == 0 && latestSnapshotId.isPresent()) {
+            // If the follower has an empty log and a snapshot exists, it is always more efficient
+            // to reply with a snapshot id (FETCH_SNAPSHOT) instead of fetching from the log segments.
+            validOffsetAndEpoch = ValidOffsetAndEpoch.snapshot(latestSnapshotId.get());
+        } else {
+            validOffsetAndEpoch = log.validateOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
+        }
 
         final Records records;
         if (validOffsetAndEpoch.kind() == ValidOffsetAndEpoch.Kind.VALID) {
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
index 2de6853d2e1..bc1d0ca21fc 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
@@ -303,7 +303,53 @@ final public class KafkaRaftClientSnapshotTest {
         context.client.poll();
 
         // Send Fetch request less than start offset
-        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 0, epoch, 0));
+        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, snapshotId.offset() - 2, snapshotId.epoch(), 0));
+        context.pollUntilResponse();
+        FetchResponseData.PartitionData partitionResponse = context.assertSentFetchPartitionResponse();
+        assertEquals(Errors.NONE, Errors.forCode(partitionResponse.errorCode()));
+        assertEquals(epoch, partitionResponse.currentLeader().leaderEpoch());
+        assertEquals(localId, partitionResponse.currentLeader().leaderId());
+        assertEquals(snapshotId.epoch(), partitionResponse.snapshotId().epoch());
+        assertEquals(snapshotId.offset(), partitionResponse.snapshotId().endOffset());
+    }
+
+    @Test
+    public void testFetchRequestOffsetAtZero() throws Exception {
+        // When the follower sends a FETCH request at offset 0, reply with the snapshot id if one exists
+        int localId = 0;
+        int otherNodeId = localId + 1;
+        Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withAppendLingerMs(1)
+            .build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        List<String> appendRecords = Arrays.asList("a", "b", "c");
+        context.client.scheduleAppend(epoch, appendRecords);
+        context.time.sleep(context.appendLingerMs());
+        context.client.poll();
+
+        long localLogEndOffset = context.log.endOffset().offset;
+        assertTrue(
+            appendRecords.size() <= localLogEndOffset,
+            String.format("Record length = %s, log end offset = %s", appendRecords.size(), localLogEndOffset)
+        );
+
+        // Advance the highWatermark
+        context.advanceLocalLeaderHighWatermarkToLogEndOffset();
+
+        // Generate a snapshot at the LEO
+        OffsetAndEpoch snapshotId = new OffsetAndEpoch(localLogEndOffset, epoch);
+        try (SnapshotWriter<String> snapshot = context.client.createSnapshot(snapshotId, 0).get()) {
+            assertEquals(snapshotId, snapshot.snapshotId());
+            snapshot.freeze();
+        }
+
+        // Send Fetch request for offset 0
+        context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 0, 0, 0));
         context.pollUntilResponse();
         FetchResponseData.PartitionData partitionResponse = context.assertSentFetchPartitionResponse();
         assertEquals(Errors.NONE, Errors.forCode(partitionResponse.errorCode()));
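
Note for reviewers: the new leader-side rule is easiest to read on its own. The sketch
below restates the decision outside of handleFetchRequest, assuming only the raft types
the patch already uses (ReplicatedLog, OffsetAndEpoch, ValidOffsetAndEpoch, Optional);
the resolveFetchPosition helper name is made up for illustration and is not part of
the patch.

    // Hypothetical helper mirroring the decision added to KafkaRaftClient.handleFetchRequest:
    // an offset-0 fetch (follower has an empty log) is redirected to the latest snapshot,
    // every other fetch goes through the normal offset/epoch validation.
    static ValidOffsetAndEpoch resolveFetchPosition(ReplicatedLog log, long fetchOffset, int lastFetchedEpoch) {
        Optional<OffsetAndEpoch> latestSnapshotId = log.latestSnapshotId();
        if (fetchOffset == 0 && latestSnapshotId.isPresent()) {
            // Redirect: the response carries the snapshot id and the follower follows up
            // with FETCH_SNAPSHOT instead of replaying the log segments one fetch at a time.
            return ValidOffsetAndEpoch.snapshot(latestSnapshotId.get());
        }
        return log.validateOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
    }

Nothing changes on the wire: the FETCH response already carries a snapshotId field (the
new test asserts on partitionResponse.snapshotId()), so the patch only changes when the
leader chooses to populate it.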