mirror of https://github.com/apache/kafka.git
KAFKA-12958: add an invariant that notified leaders are never asked to load snapshot (#10932)
Track handleSnapshot calls and make sure it is never triggered on the leader node. Reviewers: Luke Chen <showuon@gmail.com>, José Armando García Sancio <jsancio@users.noreply.github.com>, Boyang Chen <bchen11@outlook.com>
This commit is contained in:
parent
fa685fa152
commit
10b1f73cd4
|
@ -38,6 +38,8 @@ public class ReplicatedCounter implements RaftClient.Listener<Integer> {
|
||||||
private OptionalInt claimedEpoch = OptionalInt.empty();
|
private OptionalInt claimedEpoch = OptionalInt.empty();
|
||||||
private long lastOffsetSnapshotted = -1;
|
private long lastOffsetSnapshotted = -1;
|
||||||
|
|
||||||
|
private int handleSnapshotCalls = 0;
|
||||||
|
|
||||||
public ReplicatedCounter(
|
public ReplicatedCounter(
|
||||||
int nodeId,
|
int nodeId,
|
||||||
RaftClient<Integer> client,
|
RaftClient<Integer> client,
|
||||||
|
@ -152,6 +154,7 @@ public class ReplicatedCounter implements RaftClient.Listener<Integer> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
lastOffsetSnapshotted = reader.lastContainedLogOffset();
|
lastOffsetSnapshotted = reader.lastContainedLogOffset();
|
||||||
|
handleSnapshotCalls += 1;
|
||||||
log.debug("Finished loading snapshot. Set value: {}", committed);
|
log.debug("Finished loading snapshot. Set value: {}", committed);
|
||||||
} finally {
|
} finally {
|
||||||
reader.close();
|
reader.close();
|
||||||
|
@ -170,5 +173,11 @@ public class ReplicatedCounter implements RaftClient.Listener<Integer> {
|
||||||
uncommitted = -1;
|
uncommitted = -1;
|
||||||
claimedEpoch = OptionalInt.empty();
|
claimedEpoch = OptionalInt.empty();
|
||||||
}
|
}
|
||||||
|
handleSnapshotCalls = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Use handleSnapshotCalls to verify leader is never asked to load snapshot */
|
||||||
|
public int handleSnapshotCalls() {
|
||||||
|
return handleSnapshotCalls;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -352,6 +352,7 @@ public class RaftEventSimulationTest {
|
||||||
scheduler.addInvariant(new MajorityReachedHighWatermark(cluster));
|
scheduler.addInvariant(new MajorityReachedHighWatermark(cluster));
|
||||||
scheduler.addInvariant(new SingleLeader(cluster));
|
scheduler.addInvariant(new SingleLeader(cluster));
|
||||||
scheduler.addInvariant(new SnapshotAtLogStart(cluster));
|
scheduler.addInvariant(new SnapshotAtLogStart(cluster));
|
||||||
|
scheduler.addInvariant(new LeaderNeverLoadSnapshot(cluster));
|
||||||
scheduler.addValidation(new ConsistentCommittedData(cluster));
|
scheduler.addValidation(new ConsistentCommittedData(cluster));
|
||||||
return scheduler;
|
return scheduler;
|
||||||
}
|
}
|
||||||
|
@ -1014,6 +1015,23 @@ public class RaftEventSimulationTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class LeaderNeverLoadSnapshot implements Invariant {
|
||||||
|
final Cluster cluster;
|
||||||
|
|
||||||
|
private LeaderNeverLoadSnapshot(Cluster cluster) {
|
||||||
|
this.cluster = cluster;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void verify() {
|
||||||
|
for (RaftNode raftNode : cluster.running()) {
|
||||||
|
if (raftNode.counter.isWritable()) {
|
||||||
|
assertEquals(0, raftNode.counter.handleSnapshotCalls());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Validating the committed data is expensive, so we do this as a {@link Validation}. We depend
|
* Validating the committed data is expensive, so we do this as a {@link Validation}. We depend
|
||||||
* on the following external invariants:
|
* on the following external invariants:
|
||||||
|
|
Loading…
Reference in New Issue