KAFKA-15076; KRaft should prefer latest snapshot (#13834)

If the KRaft listener is at offset 0, the start of the log, and KRaft has generated a snapshot, it should prefer the latest snapshot instead of having the listener read from the start of the log.

This is implemented by having KafkaRaftClient send a Listener.handleLoadSnapshot event, if the Listener is at offset 0 and the KRaft partition has generated a snapshot.

Reviewers: Jason Gustafson <jason@confluent.io>, David Arthur <mumrah@gmail.com>
This commit is contained in:
José Armando García Sancio 2023-06-12 07:25:42 -07:00 committed by GitHub
parent c0cb8dd4bc
commit b7a6a8fd5f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 41 additions and 19 deletions

View File

@ -308,7 +308,12 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
private void updateListenersProgress(long highWatermark) {
for (ListenerContext listenerContext : listenerContexts.values()) {
listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset -> {
if (nextExpectedOffset < log.startOffset() && nextExpectedOffset < highWatermark) {
// Send snapshot to the listener, if the listener is at the beginning of the log and there is a snapshot,
// or the listener is trying to read an offset for which there isn't a segment in the log.
if (nextExpectedOffset < highWatermark &&
((nextExpectedOffset == 0 && latestSnapshot().isPresent()) ||
nextExpectedOffset < log.startOffset())
) {
SnapshotReader<T> snapshot = latestSnapshot().orElseThrow(() -> new IllegalStateException(
String.format(
"Snapshot expected since next offset of %s is %d, log start offset is %d and high-watermark is %d",

View File

@ -37,6 +37,8 @@ import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.RecordsSnapshotWriter;
import org.apache.kafka.snapshot.SnapshotWriterReaderTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -87,19 +89,24 @@ final public class KafkaRaftClientSnapshotTest {
assertEquals(Optional.empty(), context.client.latestSnapshotId());
}
@Test
public void testLeaderListenerNotified() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testLeaderListenerNotified(boolean entireLog) throws Exception {
int localId = 0;
int otherNodeId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
OffsetAndEpoch snapshotId = new OffsetAndEpoch(3, 1);
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
RaftClientTestContext.Builder contextBuilder = new RaftClientTestContext.Builder(localId, voters)
.appendToLog(snapshotId.epoch(), Arrays.asList("a", "b", "c"))
.appendToLog(snapshotId.epoch(), Arrays.asList("d", "e", "f"))
.withEmptySnapshot(snapshotId)
.deleteBeforeSnapshot(snapshotId)
.build();
.withEmptySnapshot(snapshotId);
if (!entireLog) {
contextBuilder.deleteBeforeSnapshot(snapshotId);
}
RaftClientTestContext context = contextBuilder.build();
context.becomeLeader();
int epoch = context.currentEpoch();
@ -118,21 +125,26 @@ final public class KafkaRaftClientSnapshotTest {
}
}
@Test
public void testFollowerListenerNotified() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testFollowerListenerNotified(boolean entireLog) throws Exception {
int localId = 0;
int leaderId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, leaderId);
int epoch = 2;
OffsetAndEpoch snapshotId = new OffsetAndEpoch(3, 1);
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
RaftClientTestContext.Builder contextBuilder = new RaftClientTestContext.Builder(localId, voters)
.appendToLog(snapshotId.epoch(), Arrays.asList("a", "b", "c"))
.appendToLog(snapshotId.epoch(), Arrays.asList("d", "e", "f"))
.withEmptySnapshot(snapshotId)
.deleteBeforeSnapshot(snapshotId)
.withElectedLeader(epoch, leaderId)
.build();
.withElectedLeader(epoch, leaderId);
if (!entireLog) {
contextBuilder.deleteBeforeSnapshot(snapshotId);
}
RaftClientTestContext context = contextBuilder.build();
// Advance the highWatermark
long localLogEndOffset = context.log.endOffset().offset;
@ -155,21 +167,26 @@ final public class KafkaRaftClientSnapshotTest {
}
}
@Test
public void testSecondListenerNotified() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSecondListenerNotified(boolean entireLog) throws Exception {
int localId = 0;
int leaderId = localId + 1;
Set<Integer> voters = Utils.mkSet(localId, leaderId);
int epoch = 2;
OffsetAndEpoch snapshotId = new OffsetAndEpoch(3, 1);
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
RaftClientTestContext.Builder contextBuilder = new RaftClientTestContext.Builder(localId, voters)
.appendToLog(snapshotId.epoch(), Arrays.asList("a", "b", "c"))
.appendToLog(snapshotId.epoch(), Arrays.asList("d", "e", "f"))
.withEmptySnapshot(snapshotId)
.deleteBeforeSnapshot(snapshotId)
.withElectedLeader(epoch, leaderId)
.build();
.withElectedLeader(epoch, leaderId);
if (!entireLog) {
contextBuilder.deleteBeforeSnapshot(snapshotId);
}
RaftClientTestContext context = contextBuilder.build();
// Advance the highWatermark
long localLogEndOffset = context.log.endOffset().offset;