KAFKA-15078; KRaft leader replys with snapshot for offset 0 (#13845)

If the follower has an empty log, fetches with offset 0, it is more
efficient for the leader to reply with a snapshot id (redirect to
FETCH_SNAPSHOT) than for the follower to continue fetching from the log
segments.

Reviewers: David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>
This commit is contained in:
José Armando García Sancio 2023-06-28 14:21:11 -07:00 committed by GitHub
parent 2f71708955
commit 3a246b1aba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 57 additions and 2 deletions

View File

@ -1017,7 +1017,16 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
long fetchOffset = request.fetchOffset();
int lastFetchedEpoch = request.lastFetchedEpoch();
LeaderState<T> state = quorum.leaderStateOrThrow();
ValidOffsetAndEpoch validOffsetAndEpoch = log.validateOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
Optional<OffsetAndEpoch> latestSnapshotId = log.latestSnapshotId();
final ValidOffsetAndEpoch validOffsetAndEpoch;
if (fetchOffset == 0 && latestSnapshotId.isPresent()) {
// If the follower has an empty log and a snapshot exist, it is always more efficient
// to reply with a snapshot id (FETCH_SNAPSHOT) instead of fetching from the log segments.
validOffsetAndEpoch = ValidOffsetAndEpoch.snapshot(latestSnapshotId.get());
} else {
validOffsetAndEpoch = log.validateOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
}
final Records records;
if (validOffsetAndEpoch.kind() == ValidOffsetAndEpoch.Kind.VALID) {

View File

@ -303,7 +303,53 @@ final public class KafkaRaftClientSnapshotTest {
context.client.poll();
// Send Fetch request less than start offset
context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 0, epoch, 0));
context.deliverRequest(context.fetchRequest(epoch, otherNodeId, snapshotId.offset() - 2, snapshotId.epoch(), 0));
context.pollUntilResponse();
FetchResponseData.PartitionData partitionResponse = context.assertSentFetchPartitionResponse();
assertEquals(Errors.NONE, Errors.forCode(partitionResponse.errorCode()));
assertEquals(epoch, partitionResponse.currentLeader().leaderEpoch());
assertEquals(localId, partitionResponse.currentLeader().leaderId());
assertEquals(snapshotId.epoch(), partitionResponse.snapshotId().epoch());
assertEquals(snapshotId.offset(), partitionResponse.snapshotId().endOffset());
}
@Test
public void testFetchRequestOffsetAtZero() throws Exception {
// When the follower sends a FETCH request at offset 0, reply with snapshot id if it exists
int localId = 0;
int otherNodeId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
.withAppendLingerMs(1)
.build();
context.becomeLeader();
int epoch = context.currentEpoch();
List<String> appendRecords = Arrays.asList("a", "b", "c");
context.client.scheduleAppend(epoch, appendRecords);
context.time.sleep(context.appendLingerMs());
context.client.poll();
long localLogEndOffset = context.log.endOffset().offset;
assertTrue(
appendRecords.size() <= localLogEndOffset,
String.format("Record length = %s, log end offset = %s", appendRecords.size(), localLogEndOffset)
);
// Advance the highWatermark
context.advanceLocalLeaderHighWatermarkToLogEndOffset();
// Generate a snapshot at the LEO
OffsetAndEpoch snapshotId = new OffsetAndEpoch(localLogEndOffset, epoch);
try (SnapshotWriter<String> snapshot = context.client.createSnapshot(snapshotId, 0).get()) {
assertEquals(snapshotId, snapshot.snapshotId());
snapshot.freeze();
}
// Send Fetch request for offset 0
context.deliverRequest(context.fetchRequest(epoch, otherNodeId, 0, 0, 0));
context.pollUntilResponse();
FetchResponseData.PartitionData partitionResponse = context.assertSentFetchPartitionResponse();
assertEquals(Errors.NONE, Errors.forCode(partitionResponse.errorCode()));