KAFKA-12459; Use property testing library for raft event simulation tests (#10323)

This patch changes the raft simulation tests to use jqwik, which is a property testing library. This provides two main benefits:

- It simplifies the randomization of test parameters. Currently the tests use a fixed set of `Random` seeds, which means that most builds are doing redundant work. We get more value by letting each build test different parameterizations.
- It makes it easier to reproduce failures. Whenever a test fails, jqwik reports the random seed that produced the failure. A developer can then modify the `@Property` annotation to use that specific seed in order to reproduce the failure (see the sketch below).
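
For illustration, a minimal jqwik property looks roughly like the sketch below (the class and method names here are hypothetical and not part of this patch): parameters annotated with `@ForAll` are generated on every try, and a reported seed can later be pinned on the `@Property` annotation to replay a failing case.

```java
import java.util.Random;

import net.jqwik.api.ForAll;
import net.jqwik.api.Property;
import net.jqwik.api.constraints.IntRange;

import static org.junit.jupiter.api.Assertions.assertTrue;

class SimulationPropertySketch {

    // jqwik generates fresh values for every try and prints the seed when a try fails.
    @Property(tries = 100)
    void quorumSizeIsAlwaysValid(
        @ForAll Random random,
        @ForAll @IntRange(min = 1, max = 5) int numVoters
    ) {
        // Stand-in for driving the simulated cluster with `random`:
        // here we only assert a trivial invariant over the generated inputs.
        assertTrue(numVoters >= 1 && numVoters <= 5);
    }

    // To replay a failure, pin the reported seed and disable shrinking, e.g.:
    // @Property(tries = 1, seed = "-590031835267299290", shrinking = ShrinkingMode.OFF)
}
```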

This patch also includes an optimization for `MockLog.earliestSnapshotId`: the lookup now uses `NavigableMap.firstEntry()` instead of catching the `NoSuchElementException` thrown by `firstKey()` when no snapshot exists, which dramatically reduces the time to run the simulation tests.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai <chia7712@gmail.com>, José Armando García Sancio <jsancio@gmail.com>, David Jacot <djacot@confluent.io>
Jason Gustafson committed 2021-03-17 19:20:07 -07:00 (via GitHub)
commit 8ef1619f3e (parent 9adfac2803)
7 changed files with 265 additions and 347 deletions

.gitignore

@@ -57,5 +57,6 @@ jmh-benchmarks/generated
 jmh-benchmarks/src/main/generated
 streams/src/generated
 raft/src/generated
+raft/.jqwik-database
 core/src/generated
 metadata/src/generated

NOTICE

@@ -6,3 +6,7 @@ The Apache Software Foundation (https://www.apache.org/).
 
 This distribution has a binary dependency on jersey, which is available under the CDDL
 License. The source code of jersey can be found at https://github.com/jersey/jersey/.
+
+This distribution has a binary test dependency on jqwik, which is available under
+the Eclipse Public License 2.0. The source code can be found at
+https://github.com/jlink/jqwik.

build.gradle

@@ -1255,6 +1255,7 @@ project(':raft') {
     testImplementation project(':clients').sourceSets.test.output
     testImplementation libs.junitJupiter
     testImplementation libs.mockitoCore
+    testImplementation libs.jqwik
     testRuntimeOnly libs.slf4jlog4j
   }
@@ -1308,6 +1309,12 @@ project(':raft') {
     }
   }

+  test {
+    useJUnitPlatform {
+      includeEngines 'jqwik', 'junit-jupiter'
+    }
+  }
+
   clean.doFirst {
     delete "$buildDir/kafka/"
   }

checkstyle/import-control.xml

@@ -404,6 +404,7 @@
     <allow pkg="org.apache.kafka.metalog" />
     <allow pkg="org.apache.kafka.test"/>
     <allow pkg="com.fasterxml.jackson" />
+    <allow pkg="net.jqwik"/>
   </subpackage>

   <subpackage name="snapshot">

gradle/dependencies.gradle

@@ -81,6 +81,7 @@ versions += [
   jfreechart: "1.0.0",
   jopt: "5.0.4",
   junit: "5.7.1",
+  jqwik: "1.5.0",
   kafka_0100: "0.10.0.1",
   kafka_0101: "0.10.1.1",
   kafka_0102: "0.10.2.2",
@@ -153,6 +154,7 @@ libs += [
   junitJupiter: "org.junit.jupiter:junit-jupiter:$versions.junit",
   junitJupiterApi: "org.junit.jupiter:junit-jupiter-api:$versions.junit",
   junitVintageEngine: "org.junit.vintage:junit-vintage-engine:$versions.junit",
+  jqwik: "net.jqwik:jqwik:$versions.jqwik",
   hamcrest: "org.hamcrest:hamcrest:$versions.hamcrest",
   kafkaStreams_0100: "org.apache.kafka:kafka-streams:$versions.kafka_0100",
   kafkaStreams_0101: "org.apache.kafka:kafka-streams:$versions.kafka_0101",

raft/src/test/java/org/apache/kafka/raft/MockLog.java

@@ -38,8 +38,8 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.NavigableMap;
-import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalInt;
@@ -404,20 +404,14 @@ public class MockLog implements ReplicatedLog {

     @Override
     public Optional<OffsetAndEpoch> latestSnapshotId() {
-        try {
-            return Optional.of(snapshots.lastKey());
-        } catch (NoSuchElementException e) {
-            return Optional.empty();
-        }
+        return Optional.ofNullable(snapshots.lastEntry())
+            .map(Map.Entry::getKey);
     }

     @Override
     public Optional<OffsetAndEpoch> earliestSnapshotId() {
-        try {
-            return Optional.of(snapshots.firstKey());
-        } catch (NoSuchElementException e) {
-            return Optional.empty();
-        }
+        return Optional.ofNullable(snapshots.firstEntry())
+            .map(Map.Entry::getKey);
     }

     @Override

raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java

@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.raft;

+import net.jqwik.api.ForAll;
+import net.jqwik.api.Property;
+import net.jqwik.api.constraints.IntRange;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.memory.MemoryPool;
@@ -32,7 +35,6 @@ import org.apache.kafka.raft.MockLog.LogBatch;
 import org.apache.kafka.raft.MockLog.LogEntry;
 import org.apache.kafka.raft.internals.BatchMemoryPool;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;

 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -59,9 +61,45 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;

+/**
+ * The simulation testing framework provides a way to verify quorum behavior under
+ * different conditions. It is similar to system testing in that the test involves
+ * independently executing nodes, but there are several important differences:
+ *
+ * 1. Simulation behavior is deterministic provided an initial random seed. This
+ *    makes it easy to reproduce and debug test failures.
+ * 2. The simulation uses an in-memory message router instead of a real network.
+ *    Not only is this much cheaper and faster, it provides an easy way to create
+ *    flaky network conditions or even network partitions without losing the
+ *    simulation determinism.
+ * 3. Similarly, persistent state is stored in memory. We can nevertheless simulate
+ *    different kinds of failures, such as the loss of unflushed data after a hard
+ *    node restart using {@link MockLog}.
+ *
+ * The framework uses a single event scheduler in order to provide deterministic
+ * executions. Each test is setup as a specific scenario with a variable number of
+ * voters and observers. Much like system tests, there is typically a warmup
+ * period, followed by some cluster event (such as a node failure), and then some
+ * logic to validate behavior after recovery.
+ *
+ * If any of the tests fail on a particular seed, the easiest way to reproduce
+ * the failure is to change the `@Property` annotation to specify the failing seed.
+ * For example:
+ *
+ * <pre>
+ * {@code
+ * @Property(tries = 1, seed = "-590031835267299290", shrinking = ShrinkingMode.OFF)
+ * }
+ * </pre>
+ *
+ * (Note that we disable parameter shrinking since it is not too useful for simulation
+ * failures and this allows us to isolate a single execution, which makes the logging
+ * more useful if enabled.)
+ */
 @Tag("integration")
 public class RaftEventSimulationTest {
-    private static final TopicPartition METADATA_PARTITION = new TopicPartition("__cluster_metadata", 0);
+    private static final TopicPartition METADATA_PARTITION = new TopicPartition("@metadata", 0);
     private static final int ELECTION_TIMEOUT_MS = 1000;
     private static final int ELECTION_JITTER_MS = 100;
     private static final int FETCH_TIMEOUT_MS = 3000;
@@ -70,335 +108,200 @@ public class RaftEventSimulationTest {
     private static final int FETCH_MAX_WAIT_MS = 100;
     private static final int LINGER_MS = 0;

-    @Test
-    public void testInitialLeaderElectionQuorumSizeOne() {
-        testInitialLeaderElection(new QuorumConfig(1));
-    }
-
-    @Test
-    public void testInitialLeaderElectionQuorumSizeTwo() {
-        testInitialLeaderElection(new QuorumConfig(2));
-    }
-
-    @Test
-    public void testInitialLeaderElectionQuorumSizeThree() {
-        testInitialLeaderElection(new QuorumConfig(3));
-    }
-
-    @Test
-    public void testInitialLeaderElectionQuorumSizeFour() {
-        testInitialLeaderElection(new QuorumConfig(4));
-    }
-
-    @Test
-    public void testInitialLeaderElectionQuorumSizeFive() {
-        testInitialLeaderElection(new QuorumConfig(5));
-    }
-
-    private void testInitialLeaderElection(QuorumConfig config) {
-        for (int seed = 0; seed < 100; seed++) {
-            Cluster cluster = new Cluster(config, seed);
-            MessageRouter router = new MessageRouter(cluster);
-            EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
-
-            cluster.startAll();
-            schedulePolling(scheduler, cluster, 3, 5);
-            scheduler.schedule(router::deliverAll, 0, 2, 1);
-            scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
-            scheduler.runUntil(cluster::hasConsistentLeader);
-            scheduler.runUntil(() -> cluster.allReachedHighWatermark(10));
-        }
-    }
-
-    @Test
-    public void testElectionAfterLeaderFailureQuorumSizeThree() {
-        testElectionAfterLeaderFailure(new QuorumConfig(3, 0));
-    }
-
-    @Test
-    public void testElectionAfterLeaderFailureQuorumSizeThreeAndTwoObservers() {
-        testElectionAfterLeaderFailure(new QuorumConfig(3, 1));
-    }
-
-    @Test
-    public void testElectionAfterLeaderFailureQuorumSizeFour() {
-        testElectionAfterLeaderFailure(new QuorumConfig(4, 0));
-    }
-
-    @Test
-    public void testElectionAfterLeaderFailureQuorumSizeFourAndTwoObservers() {
-        testElectionAfterLeaderFailure(new QuorumConfig(4, 2));
-    }
-
-    @Test
-    public void testElectionAfterLeaderFailureQuorumSizeFive() {
-        testElectionAfterLeaderFailure(new QuorumConfig(5, 0));
-    }
-
-    @Test
-    public void testElectionAfterLeaderFailureQuorumSizeFiveAndThreeObservers() {
-        testElectionAfterLeaderFailure(new QuorumConfig(5, 3));
-    }
-
-    private void testElectionAfterLeaderFailure(QuorumConfig config) {
-        checkElectionAfterLeaderShutdown(config, false);
-    }
-
-    @Test
-    public void testElectionAfterLeaderGracefulShutdownQuorumSizeThree() {
-        checkElectionAfterLeaderGracefulShutdown(new QuorumConfig(3, 0));
-    }
-
-    @Test
-    public void testElectionAfterLeaderGracefulShutdownQuorumSizeThreeAndTwoObservers() {
-        checkElectionAfterLeaderGracefulShutdown(new QuorumConfig(3, 2));
-    }
-
-    @Test
-    public void testElectionAfterLeaderGracefulShutdownQuorumSizeFour() {
-        checkElectionAfterLeaderGracefulShutdown(new QuorumConfig(4, 0));
-    }
-
-    @Test
-    public void testElectionAfterLeaderGracefulShutdownQuorumSizeFourAndTwoObservers() {
-        checkElectionAfterLeaderGracefulShutdown(new QuorumConfig(4, 2));
-    }
-
-    @Test
-    public void testElectionAfterLeaderGracefulShutdownQuorumSizeFive() {
-        checkElectionAfterLeaderGracefulShutdown(new QuorumConfig(5, 0));
-    }
-
-    @Test
-    public void testElectionAfterLeaderGracefulShutdownQuorumSizeFiveAndThreeObservers() {
-        checkElectionAfterLeaderGracefulShutdown(new QuorumConfig(5, 3));
-    }
-
-    private void checkElectionAfterLeaderGracefulShutdown(QuorumConfig config) {
-        checkElectionAfterLeaderShutdown(config, true);
-    }
-
-    private void checkElectionAfterLeaderShutdown(QuorumConfig config, boolean isGracefulShutdown) {
-        for (int seed = 0; seed < 100; seed++) {
-            Cluster cluster = new Cluster(config, seed);
-            MessageRouter router = new MessageRouter(cluster);
-            EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
-
-            // Seed the cluster with some data
-            cluster.startAll();
-            schedulePolling(scheduler, cluster, 3, 5);
-            scheduler.schedule(router::deliverAll, 0, 2, 1);
-            scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
-            scheduler.runUntil(cluster::hasConsistentLeader);
-            scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10));
-
-            // Shutdown the leader and write some more data. We can verify the new leader has been elected
-            // by verifying that the high watermark can still advance.
-            int leaderId = cluster.latestLeader().getAsInt();
-            if (isGracefulShutdown) {
-                cluster.shutdown(leaderId);
-            } else {
-                cluster.kill(leaderId);
-            }
-
-            scheduler.runUntil(() -> cluster.allReachedHighWatermark(20));
-            long highWatermark = cluster.maxHighWatermarkReached();
-
-            // Restart the node and verify it catches up
-            cluster.start(leaderId);
-            scheduler.runUntil(() -> cluster.allReachedHighWatermark(highWatermark + 10));
-        }
-    }
-
-    @Test
-    public void testRecoveryAfterAllNodesFailQuorumSizeThree() {
-        checkRecoveryAfterAllNodesFail(new QuorumConfig(3));
-    }
-
-    @Test
-    public void testRecoveryAfterAllNodesFailQuorumSizeFour() {
-        checkRecoveryAfterAllNodesFail(new QuorumConfig(4));
-    }
-
-    @Test
-    public void testRecoveryAfterAllNodesFailQuorumSizeFive() {
-        checkRecoveryAfterAllNodesFail(new QuorumConfig(5));
-    }
-
-    private void checkRecoveryAfterAllNodesFail(QuorumConfig config) {
-        for (int seed = 0; seed < 100; seed++) {
-            Cluster cluster = new Cluster(config, seed);
-            MessageRouter router = new MessageRouter(cluster);
-            EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
-
-            // Seed the cluster with some data
-            cluster.startAll();
-            schedulePolling(scheduler, cluster, 3, 5);
-            scheduler.schedule(router::deliverAll, 0, 2, 1);
-            scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
-            scheduler.runUntil(cluster::hasConsistentLeader);
-            scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10));
-            long highWatermark = cluster.maxHighWatermarkReached();
-
-            // We kill all of the nodes. Then we bring back a majority and verify that
-            // they are able to elect a leader and continue making progress
-            cluster.killAll();
-
-            Iterator<Integer> nodeIdsIterator = cluster.nodes().iterator();
-            for (int i = 0; i < cluster.majoritySize(); i++) {
-                Integer nodeId = nodeIdsIterator.next();
-                cluster.start(nodeId);
-            }
-
-            scheduler.runUntil(() -> cluster.allReachedHighWatermark(highWatermark + 10));
-        }
-    }
-
-    @Test
-    public void testElectionAfterLeaderNetworkPartitionQuorumSizeThree() {
-        checkElectionAfterLeaderNetworkPartition(new QuorumConfig(3));
-    }
-
-    @Test
-    public void testElectionAfterLeaderNetworkPartitionQuorumSizeThreeAndTwoObservers() {
-        checkElectionAfterLeaderNetworkPartition(new QuorumConfig(3, 2));
-    }
-
-    @Test
-    public void testElectionAfterLeaderNetworkPartitionQuorumSizeFour() {
-        checkElectionAfterLeaderNetworkPartition(new QuorumConfig(4));
-    }
-
-    @Test
-    public void testElectionAfterLeaderNetworkPartitionQuorumSizeFourAndTwoObservers() {
-        checkElectionAfterLeaderNetworkPartition(new QuorumConfig(4, 2));
-    }
-
-    @Test
-    public void testElectionAfterLeaderNetworkPartitionQuorumSizeFive() {
-        checkElectionAfterLeaderNetworkPartition(new QuorumConfig(5));
-    }
-
-    @Test
-    public void testElectionAfterLeaderNetworkPartitionQuorumSizeFiveAndThreeObservers() {
-        checkElectionAfterLeaderNetworkPartition(new QuorumConfig(5, 3));
-    }
-
-    private void checkElectionAfterLeaderNetworkPartition(QuorumConfig config) {
-        for (int seed = 0; seed < 100; seed++) {
-            Cluster cluster = new Cluster(config, seed);
-            MessageRouter router = new MessageRouter(cluster);
-            EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
-
-            // Seed the cluster with some data
-            cluster.startAll();
-            schedulePolling(scheduler, cluster, 3, 5);
-            scheduler.schedule(router::deliverAll, 0, 2, 2);
-            scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
-            scheduler.runUntil(cluster::hasConsistentLeader);
-            scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10));
-
-            // The leader gets partitioned off. We can verify the new leader has been elected
-            // by writing some data and ensuring that it gets replicated
-            int leaderId = cluster.latestLeader().getAsInt();
-            router.filter(leaderId, new DropAllTraffic());
-
-            Set<Integer> nonPartitionedNodes = new HashSet<>(cluster.nodes());
-            nonPartitionedNodes.remove(leaderId);
-
-            scheduler.runUntil(() -> cluster.allReachedHighWatermark(20, nonPartitionedNodes));
-        }
-    }
-
-    @Test
-    public void testElectionAfterMultiNodeNetworkPartitionQuorumSizeFive() {
-        checkElectionAfterMultiNodeNetworkPartition(new QuorumConfig(5));
-    }
-
-    @Test
-    public void testElectionAfterMultiNodeNetworkPartitionQuorumSizeFiveAndTwoObservers() {
-        checkElectionAfterMultiNodeNetworkPartition(new QuorumConfig(5, 2));
-    }
-
-    private void checkElectionAfterMultiNodeNetworkPartition(QuorumConfig config) {
-        for (int seed = 0; seed < 100; seed++) {
-            Cluster cluster = new Cluster(config, seed);
-            MessageRouter router = new MessageRouter(cluster);
-            EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
-
-            // Seed the cluster with some data
-            cluster.startAll();
-            schedulePolling(scheduler, cluster, 3, 5);
-            scheduler.schedule(router::deliverAll, 0, 2, 2);
-            scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
-            scheduler.runUntil(cluster::hasConsistentLeader);
-            scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10));
-
-            // Partition the nodes into two sets. Nodes are reachable within each set,
-            // but the two sets cannot communicate with each other. We should be able
-            // to make progress even if an election is needed in the larger set.
-            router.filter(0, new DropOutboundRequestsFrom(Utils.mkSet(2, 3, 4)));
-            router.filter(1, new DropOutboundRequestsFrom(Utils.mkSet(2, 3, 4)));
-            router.filter(2, new DropOutboundRequestsFrom(Utils.mkSet(0, 1)));
-            router.filter(3, new DropOutboundRequestsFrom(Utils.mkSet(0, 1)));
-            router.filter(4, new DropOutboundRequestsFrom(Utils.mkSet(0, 1)));
-
-            scheduler.runUntil(() -> cluster.anyReachedHighWatermark(20));
-
-            long minorityHighWatermark = cluster.maxHighWatermarkReached(Utils.mkSet(0, 1));
-            long majorityHighWatermark = cluster.maxHighWatermarkReached(Utils.mkSet(2, 3, 4));
-            assertTrue(majorityHighWatermark > minorityHighWatermark);
-
-            // Now restore the partition and verify everyone catches up
-            router.filter(0, new PermitAllTraffic());
-            router.filter(1, new PermitAllTraffic());
-            router.filter(2, new PermitAllTraffic());
-            router.filter(3, new PermitAllTraffic());
-            router.filter(4, new PermitAllTraffic());
-
-            scheduler.runUntil(() -> cluster.allReachedHighWatermark(30));
-        }
-    }
-
-    @Test
-    public void testBackToBackLeaderFailuresQuorumSizeThree() {
-        checkBackToBackLeaderFailures(new QuorumConfig(3));
-    }
-
-    @Test
-    public void testBackToBackLeaderFailuresQuorumSizeFiveAndTwoObservers() {
-        checkBackToBackLeaderFailures(new QuorumConfig(5, 2));
-    }
-
-    private void checkBackToBackLeaderFailures(QuorumConfig config) {
-        for (int seed = 0; seed < 100; seed++) {
-            Cluster cluster = new Cluster(config, seed);
-            MessageRouter router = new MessageRouter(cluster);
-            EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
-
-            // Seed the cluster with some data
-            cluster.startAll();
-            schedulePolling(scheduler, cluster, 3, 5);
-            scheduler.schedule(router::deliverAll, 0, 2, 5);
-            scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
-            scheduler.runUntil(cluster::hasConsistentLeader);
-            scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10));
-
-            int leaderId = cluster.latestLeader().getAsInt();
-            router.filter(leaderId, new DropAllTraffic());
-            scheduler.runUntil(() -> cluster.latestLeader().isPresent() && cluster.latestLeader().getAsInt() != leaderId);
-
-            // As soon as we have a new leader, restore traffic to the old leader and partition the new leader
-            int newLeaderId = cluster.latestLeader().getAsInt();
-            router.filter(leaderId, new PermitAllTraffic());
-            router.filter(newLeaderId, new DropAllTraffic());
-
-            // Verify now that we can make progress
-            long targetHighWatermark = cluster.maxHighWatermarkReached() + 10;
-            scheduler.runUntil(() -> cluster.anyReachedHighWatermark(targetHighWatermark));
-        }
-    }
+    @Property(tries = 100)
+    void canElectInitialLeader(
+        @ForAll Random random,
+        @ForAll @IntRange(min = 1, max = 5) int numVoters,
+        @ForAll @IntRange(min = 0, max = 5) int numObservers
+    ) {
+        Cluster cluster = new Cluster(numVoters, numObservers, random);
+        MessageRouter router = new MessageRouter(cluster);
+        EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
+
+        cluster.startAll();
+        schedulePolling(scheduler, cluster, 3, 5);
+        scheduler.schedule(router::deliverAll, 0, 2, 1);
+        scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
+        scheduler.runUntil(cluster::hasConsistentLeader);
+        scheduler.runUntil(() -> cluster.allReachedHighWatermark(10));
+    }
+
+    @Property(tries = 100)
+    void canElectNewLeaderAfterOldLeaderFailure(
+        @ForAll Random random,
+        @ForAll @IntRange(min = 3, max = 5) int numVoters,
+        @ForAll @IntRange(min = 0, max = 5) int numObservers,
+        @ForAll boolean isGracefulShutdown
+    ) {
+        Cluster cluster = new Cluster(numVoters, numObservers, random);
+        MessageRouter router = new MessageRouter(cluster);
+        EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
+
+        // Seed the cluster with some data
+        cluster.startAll();
+        schedulePolling(scheduler, cluster, 3, 5);
+        scheduler.schedule(router::deliverAll, 0, 2, 1);
+        scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
+        scheduler.runUntil(cluster::hasConsistentLeader);
+        scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10));
+
+        // Shutdown the leader and write some more data. We can verify the new leader has been elected
+        // by verifying that the high watermark can still advance.
+        int leaderId = cluster.latestLeader().orElseThrow(() ->
+            new AssertionError("Failed to find current leader")
+        );
+
+        if (isGracefulShutdown) {
+            cluster.shutdown(leaderId);
+        } else {
+            cluster.kill(leaderId);
+        }
+
+        scheduler.runUntil(() -> cluster.allReachedHighWatermark(20));
+        long highWatermark = cluster.maxHighWatermarkReached();
+
+        // Restart the node and verify it catches up
+        cluster.start(leaderId);
+        scheduler.runUntil(() -> cluster.allReachedHighWatermark(highWatermark + 10));
+    }
+
+    @Property(tries = 100)
+    void canRecoverAfterAllNodesKilled(
+        @ForAll Random random,
+        @ForAll @IntRange(min = 1, max = 5) int numVoters,
+        @ForAll @IntRange(min = 0, max = 5) int numObservers
+    ) {
+        Cluster cluster = new Cluster(numVoters, numObservers, random);
+        MessageRouter router = new MessageRouter(cluster);
+        EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
+
+        // Seed the cluster with some data
+        cluster.startAll();
+        schedulePolling(scheduler, cluster, 3, 5);
+        scheduler.schedule(router::deliverAll, 0, 2, 1);
+        scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
+        scheduler.runUntil(cluster::hasConsistentLeader);
+        scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10));
+        long highWatermark = cluster.maxHighWatermarkReached();
+
+        // We kill all of the nodes. Then we bring back a majority and verify that
+        // they are able to elect a leader and continue making progress
+        cluster.killAll();
+
+        Iterator<Integer> nodeIdsIterator = cluster.nodes().iterator();
+        for (int i = 0; i < cluster.majoritySize(); i++) {
+            Integer nodeId = nodeIdsIterator.next();
+            cluster.start(nodeId);
+        }
+
+        scheduler.runUntil(() -> cluster.allReachedHighWatermark(highWatermark + 10));
+    }
+
+    @Property(tries = 100)
+    void canElectNewLeaderAfterOldLeaderPartitionedAway(
+        @ForAll Random random,
+        @ForAll @IntRange(min = 3, max = 5) int numVoters,
+        @ForAll @IntRange(min = 0, max = 5) int numObservers
+    ) {
+        Cluster cluster = new Cluster(numVoters, numObservers, random);
+        MessageRouter router = new MessageRouter(cluster);
+        EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
+
+        // Seed the cluster with some data
+        cluster.startAll();
+        schedulePolling(scheduler, cluster, 3, 5);
+        scheduler.schedule(router::deliverAll, 0, 2, 2);
+        scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
+        scheduler.runUntil(cluster::hasConsistentLeader);
+        scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10));
+
+        // The leader gets partitioned off. We can verify the new leader has been elected
+        // by writing some data and ensuring that it gets replicated
+        int leaderId = cluster.latestLeader().orElseThrow(() ->
+            new AssertionError("Failed to find current leader")
+        );
+
+        router.filter(leaderId, new DropAllTraffic());
+
+        Set<Integer> nonPartitionedNodes = new HashSet<>(cluster.nodes());
+        nonPartitionedNodes.remove(leaderId);
+
+        scheduler.runUntil(() -> cluster.allReachedHighWatermark(20, nonPartitionedNodes));
+    }
+
+    @Property(tries = 100)
+    void canMakeProgressIfMajorityIsReachable(
+        @ForAll Random random,
+        @ForAll @IntRange(min = 0, max = 3) int numObservers
+    ) {
+        int numVoters = 5;
+        Cluster cluster = new Cluster(numVoters, numObservers, random);
+        MessageRouter router = new MessageRouter(cluster);
+        EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
+
+        // Seed the cluster with some data
+        cluster.startAll();
+        schedulePolling(scheduler, cluster, 3, 5);
+        scheduler.schedule(router::deliverAll, 0, 2, 2);
+        scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
+        scheduler.runUntil(cluster::hasConsistentLeader);
+        scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10));
+
+        // Partition the nodes into two sets. Nodes are reachable within each set,
+        // but the two sets cannot communicate with each other. We should be able
+        // to make progress even if an election is needed in the larger set.
+        router.filter(0, new DropOutboundRequestsFrom(Utils.mkSet(2, 3, 4)));
+        router.filter(1, new DropOutboundRequestsFrom(Utils.mkSet(2, 3, 4)));
+        router.filter(2, new DropOutboundRequestsFrom(Utils.mkSet(0, 1)));
+        router.filter(3, new DropOutboundRequestsFrom(Utils.mkSet(0, 1)));
+        router.filter(4, new DropOutboundRequestsFrom(Utils.mkSet(0, 1)));
+
+        scheduler.runUntil(() -> cluster.anyReachedHighWatermark(20));
+
+        long minorityHighWatermark = cluster.maxHighWatermarkReached(Utils.mkSet(0, 1));
+        long majorityHighWatermark = cluster.maxHighWatermarkReached(Utils.mkSet(2, 3, 4));
+        assertTrue(majorityHighWatermark > minorityHighWatermark);
+
+        // Now restore the partition and verify everyone catches up
+        router.filter(0, new PermitAllTraffic());
+        router.filter(1, new PermitAllTraffic());
+        router.filter(2, new PermitAllTraffic());
+        router.filter(3, new PermitAllTraffic());
+        router.filter(4, new PermitAllTraffic());
+
+        scheduler.runUntil(() -> cluster.allReachedHighWatermark(30));
+    }
+
+    @Property(tries = 100)
+    void canMakeProgressAfterBackToBackLeaderFailures(
+        @ForAll Random random,
+        @ForAll @IntRange(min = 3, max = 5) int numVoters,
+        @ForAll @IntRange(min = 0, max = 5) int numObservers
+    ) {
+        Cluster cluster = new Cluster(numVoters, numObservers, random);
+        MessageRouter router = new MessageRouter(cluster);
+        EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
+
+        // Seed the cluster with some data
+        cluster.startAll();
+        schedulePolling(scheduler, cluster, 3, 5);
+        scheduler.schedule(router::deliverAll, 0, 2, 5);
+        scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
+        scheduler.runUntil(cluster::hasConsistentLeader);
+        scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10));
+
+        int leaderId = cluster.latestLeader().getAsInt();
+        router.filter(leaderId, new DropAllTraffic());
+        scheduler.runUntil(() -> cluster.latestLeader().isPresent() && cluster.latestLeader().getAsInt() != leaderId);
+
+        // As soon as we have a new leader, restore traffic to the old leader and partition the new leader
+        int newLeaderId = cluster.latestLeader().getAsInt();
+        router.filter(leaderId, new PermitAllTraffic());
+        router.filter(newLeaderId, new DropAllTraffic());
+
+        // Verify now that we can make progress
+        long targetHighWatermark = cluster.maxHighWatermarkReached() + 10;
+        scheduler.runUntil(() -> cluster.anyReachedHighWatermark(targetHighWatermark));
+    }

     private EventScheduler schedulerWithDefaultInvariants(Cluster cluster) {
@@ -495,6 +398,8 @@ public class RaftEventSimulationTest {
     }

     private static class EventScheduler {
+        private static final int MAX_ITERATIONS = 500000;
+
         final AtomicInteger eventIdGenerator = new AtomicInteger(0);
         final PriorityQueue<Event> queue = new PriorityQueue<>();
         final Random random;
@@ -525,9 +430,15 @@
         }

         void runUntil(Supplier<Boolean> exitCondition) {
-            while (!exitCondition.get()) {
-                if (queue.isEmpty())
-                    throw new IllegalStateException("Event queue exhausted before condition was satisfied");
+            for (int iteration = 0; iteration < MAX_ITERATIONS; iteration++) {
+                if (exitCondition.get()) {
+                    break;
+                }
+
+                if (queue.isEmpty()) {
+                    throw new IllegalStateException("Event queue exhausted before condition was satisfied");
+                }
+
                 Event event = queue.poll();
                 long delayMs = Math.max(event.deadlineMs - time.milliseconds(), 0);
                 time.sleep(delayMs);
@@ -535,25 +446,13 @@
                 invariants.forEach(Invariant::verify);
             }

+            assertTrue(exitCondition.get(), "Simulation condition was not satisfied after "
+                + MAX_ITERATIONS + " iterations");
+
             validations.forEach(Validation::validate);
         }
     }

-    private static class QuorumConfig {
-        final int numVoters;
-        final int numObservers;
-
-        private QuorumConfig(int numVoters, int numObservers) {
-            this.numVoters = numVoters;
-            this.numObservers = numObservers;
-        }
-
-        private QuorumConfig(int numVoters) {
-            this(numVoters, 0);
-        }
-    }
-
     private static class PersistentState {
         final MockQuorumStateStore store = new MockQuorumStateStore();
         final MockLog log = new MockLog(METADATA_PARTITION);
@@ -568,16 +467,16 @@
         final Map<Integer, PersistentState> nodes = new HashMap<>();
         final Map<Integer, RaftNode> running = new HashMap<>();

-        private Cluster(QuorumConfig config, int randomSeed) {
-            this.random = new Random(randomSeed);
+        private Cluster(int numVoters, int numObservers, Random random) {
+            this.random = random;

             int nodeId = 0;
-            for (; nodeId < config.numVoters; nodeId++) {
+            for (; nodeId < numVoters; nodeId++) {
                 voters.add(nodeId);
                 nodes.put(nodeId, new PersistentState());
             }

-            for (; nodeId < config.numVoters + config.numObservers; nodeId++) {
+            for (; nodeId < numVoters + numObservers; nodeId++) {
                 nodes.put(nodeId, new PersistentState());
             }
         }
@@ -606,12 +505,12 @@
         boolean anyReachedHighWatermark(long offset) {
             return running.values().stream()
-                .anyMatch(node -> node.client.quorum().highWatermark().map(hw -> hw.offset).orElse(0L) > offset);
+                .anyMatch(node -> node.highWatermark() > offset);
         }

         long maxHighWatermarkReached() {
             return running.values().stream()
-                .map(node -> node.client.quorum().highWatermark().map(hw -> hw.offset).orElse(0L))
+                .map(RaftNode::highWatermark)
                 .max(Long::compareTo)
                 .orElse(0L);
         }
@@ -619,20 +518,19 @@
         long maxHighWatermarkReached(Set<Integer> nodeIds) {
             return running.values().stream()
                 .filter(node -> nodeIds.contains(node.nodeId))
-                .map(node -> node.client.quorum().highWatermark().map(hw -> hw.offset).orElse(0L))
+                .map(RaftNode::highWatermark)
                 .max(Long::compareTo)
                 .orElse(0L);
         }

         boolean allReachedHighWatermark(long offset, Set<Integer> nodeIds) {
             return nodeIds.stream()
-                .allMatch(nodeId -> running.get(nodeId).client.quorum().highWatermark().map(hw -> hw.offset)
-                    .orElse(0L) > offset);
+                .allMatch(nodeId -> running.get(nodeId).highWatermark() > offset);
         }

         boolean allReachedHighWatermark(long offset) {
             return running.values().stream()
-                .allMatch(node -> node.client.quorum().highWatermark().map(hw -> hw.offset).orElse(0L) > offset);
+                .allMatch(node -> node.highWatermark() > offset);
         }

         OptionalInt latestLeader() {
@@ -835,6 +733,17 @@
                 throw new RuntimeException("Uncaught exception during poll of node " + nodeId, e);
             }
         }
+
+        long highWatermark() {
+            return client.quorum().highWatermark()
+                .map(hw -> hw.offset)
+                .orElse(0L);
+        }
+
+        @Override
+        public String toString() {
+            return "Node(id=" + nodeId + ", hw=" + highWatermark() + ")";
+        }
     }

     private static class InflightRequest {