KAFKA-10661; Add new resigned state for graceful shutdown/initialization (#9531)

When initializing the raft state machine after shutting down as a leader, we were previously entering the "unattached" state, which means we have no leader and no voted candidate. This was a bug because it allowed a reinitialized leader to cast a vote for a candidate in the same epoch that it was already the leader of. This patch fixes the problem by introducing a new "resigned" state which allows us to retain the leader state so that we cannot change our vote and we will not accept additional appends.

This patch also revamps the shutdown logic to make use of the new "resigned" state. Previously we had a separate path in `KafkaRaftClient.poll` for the shutdown logic which resulted in some duplication. Instead now we incorporate shutdown behavior into each state's respective logic.

Finally, this patch changes the shutdown logic so that `EndQuorumEpoch` is only sent by resigning leaders. Previously we allowed this request to be sent by candidates as well.

Reviewers: dengziming <dengziming1993@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
Jason Gustafson 2020-11-09 12:52:28 -08:00 committed by GitHub
parent d61dc0c183
commit f49c6c203f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 687 additions and 206 deletions

View File

@ -70,16 +70,14 @@ public class EndQuorumEpochRequest extends AbstractRequest {
}
public static EndQuorumEpochRequestData singletonRequest(TopicPartition topicPartition,
int replicaId,
int leaderEpoch,
int leaderId,
List<Integer> preferredSuccessors) {
return singletonRequest(topicPartition, null, replicaId, leaderEpoch, leaderId, preferredSuccessors);
return singletonRequest(topicPartition, null, leaderEpoch, leaderId, preferredSuccessors);
}
public static EndQuorumEpochRequestData singletonRequest(TopicPartition topicPartition,
String clusterId,
int replicaId,
int leaderEpoch,
int leaderId,
List<Integer> preferredSuccessors) {
@ -91,7 +89,6 @@ public class EndQuorumEpochRequest extends AbstractRequest {
.setPartitions(Collections.singletonList(
new EndQuorumEpochRequestData.PartitionData()
.setPartitionIndex(topicPartition.partition())
.setReplicaId(replicaId)
.setLeaderEpoch(leaderEpoch)
.setLeaderId(leaderId)
.setPreferredSuccessors(preferredSuccessors))))

View File

@ -29,10 +29,8 @@
"versions": "0+", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "ReplicaId", "type": "int32", "versions": "0+",
"about": "The ID of the replica sending this request"},
{ "name": "LeaderId", "type": "int32", "versions": "0+",
"about": "The current leader ID or -1 if there is a vote in progress"},
"about": "The current leader ID that is resigning"},
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+",
"about": "The current epoch"},
{ "name": "PreferredSuccessors", "type": "[]int32", "versions": "0+",

View File

@ -163,9 +163,8 @@ class KafkaNetworkChannelTest {
BeginQuorumEpochRequest.singletonRequest(topicPartition, clusterId, leaderEpoch, leaderId)
case ApiKeys.END_QUORUM_EPOCH =>
val replicaId = 1
EndQuorumEpochRequest.singletonRequest(topicPartition, clusterId, replicaId,
leaderId, leaderEpoch, Collections.singletonList(2))
EndQuorumEpochRequest.singletonRequest(topicPartition, clusterId, leaderId,
leaderEpoch, Collections.singletonList(2))
case ApiKeys.VOTE =>
val lastEpoch = 4

View File

@ -578,7 +578,7 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.END_QUORUM_EPOCH =>
new EndQuorumEpochRequest.Builder(EndQuorumEpochRequest.singletonRequest(
tp, 10, 2, 5, Collections.singletonList(3)))
tp, 10, 5, Collections.singletonList(3)))
case ApiKeys.ALTER_ISR =>
new AlterIsrRequest.Builder(new AlterIsrRequestData())

View File

@ -434,6 +434,11 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
resetConnections();
}
private void transitionToResigned(List<Integer> preferredSuccessors) {
quorum.transitionToResigned(preferredSuccessors);
resetConnections();
}
private void transitionToVoted(int candidateId, int epoch) throws IOException {
maybeResignLeadership();
quorum.transitionToVoted(epoch, candidateId);
@ -517,13 +522,17 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
final boolean voteGranted;
if (quorum.isLeader()) {
logger.debug("Ignoring vote request {} with epoch {} since we are already leader on that epoch",
logger.debug("Rejecting vote request {} with epoch {} since we are already leader on that epoch",
request, candidateEpoch);
voteGranted = false;
} else if (quorum.isCandidate()) {
logger.debug("Ignoring vote request {} with epoch {} since we are already candidate on that epoch",
logger.debug("Rejecting vote request {} with epoch {} since we are already candidate on that epoch",
request, candidateEpoch);
voteGranted = false;
} else if (quorum.isResigned()) {
logger.debug("Rejecting vote request {} with epoch {} since we have resigned as candidate/leader in this epoch",
request, candidateEpoch);
voteGranted = false;
} else if (quorum.isFollower()) {
FollowerState state = quorum.followerStateOrThrow();
logger.debug("Rejecting vote request {} with epoch {} since we already have a leader {} on that epoch",
@ -747,32 +756,27 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
request.topics().get(0).partitions().get(0);
int requestEpoch = partitionRequest.leaderEpoch();
int requestReplicaId = partitionRequest.replicaId();
int requestLeaderId = partitionRequest.leaderId();
Optional<Errors> errorOpt = validateVoterOnlyRequest(requestReplicaId, requestEpoch);
Optional<Errors> errorOpt = validateVoterOnlyRequest(requestLeaderId, requestEpoch);
if (errorOpt.isPresent()) {
return buildEndQuorumEpochResponse(errorOpt.get());
}
OptionalInt requestLeaderId = optionalLeaderId(partitionRequest.leaderId());
maybeTransition(requestLeaderId, requestEpoch, currentTimeMs);
maybeTransition(OptionalInt.of(requestLeaderId), requestEpoch, currentTimeMs);
if (quorum.isFollower()) {
FollowerState state = quorum.followerStateOrThrow();
if (state.leaderId() == requestReplicaId) {
if (state.leaderId() == requestLeaderId) {
List<Integer> preferredSuccessors = partitionRequest.preferredSuccessors();
if (!preferredSuccessors.contains(quorum.localId)) {
return buildEndQuorumEpochResponse(Errors.INCONSISTENT_VOTER_SET);
}
long electionBackoffMs = endEpochElectionBackoff(preferredSuccessors);
logger.debug("Overriding follower fetch timeout to {} after receiving " +
"EndQuorumEpoch request from leader {} in epoch {}", electionBackoffMs,
requestLeaderId, requestEpoch);
state.overrideFetchTimeout(currentTimeMs, electionBackoffMs);
}
} else if (quorum.isVoted()) {
VotedState state = quorum.votedStateOrThrow();
if (state.votedId() == requestReplicaId) {
long electionBackoffMs = binaryExponentialElectionBackoffMs(1);
state.overrideElectionTimeout(currentTimeMs, electionBackoffMs);
}
}
return buildEndQuorumEpochResponse(Errors.NONE);
}
@ -816,6 +820,8 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
if (handled.isPresent()) {
return handled.get();
} else if (partitionError == Errors.NONE) {
ResignedState resignedState = quorum.resignedStateOrThrow();
resignedState.acknowledgeResignation(responseMetadata.sourceId());
return true;
} else {
return handleUnexpectedError(partitionError, responseMetadata);
@ -1374,17 +1380,14 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
return connection.remainingRequestTimeMs(currentTimeMs);
}
private EndQuorumEpochRequestData buildEndQuorumEpochRequest() {
List<Integer> preferredSuccessors = quorum.isLeader() ?
quorum.leaderStateOrThrow().nonLeaderVotersByDescendingFetchOffset() :
Collections.emptyList();
private EndQuorumEpochRequestData buildEndQuorumEpochRequest(
ResignedState state
) {
return EndQuorumEpochRequest.singletonRequest(
log.topicPartition(),
quorum.localId,
quorum.epoch(),
quorum.leaderIdOrNil(),
preferredSuccessors
quorum.localId,
state.preferredSuccessors()
);
}
@ -1457,40 +1460,6 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
return gracefulShutdown != null && !gracefulShutdown.isFinished();
}
private void pollShutdown(GracefulShutdown shutdown) throws IOException {
// Graceful shutdown allows a leader or candidate to resign its leadership without
// awaiting expiration of the election timeout. As soon as another leader is elected,
// the shutdown is considered complete.
shutdown.update();
if (shutdown.isFinished()) {
return;
}
long currentTimeMs = shutdown.finishTimer.currentTimeMs();
if (quorum.remoteVoters().isEmpty() || quorum.hasRemoteLeader()) {
shutdown.complete();
return;
}
long pollTimeoutMs = shutdown.finishTimer.remainingMs();
if (quorum.isLeader() || quorum.isCandidate()) {
long backoffMs = maybeSendRequests(
currentTimeMs,
quorum.remoteVoters(),
this::buildEndQuorumEpochRequest
);
pollTimeoutMs = Math.min(backoffMs, pollTimeoutMs);
}
List<RaftMessage> inboundMessages = channel.receive(pollTimeoutMs);
for (RaftMessage message : inboundMessages) {
handleInboundMessage(message, currentTimeMs);
currentTimeMs = time.milliseconds();
}
}
private void appendBatch(
LeaderState state,
BatchAccumulator.CompletedBatch<T> batch,
@ -1545,10 +1514,40 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
return timeUnitFlush;
}
private long pollResigned(long currentTimeMs) throws IOException {
ResignedState state = quorum.resignedStateOrThrow();
long endQuorumBackoffMs = maybeSendRequests(
currentTimeMs,
state.unackedVoters(),
() -> buildEndQuorumEpochRequest(state)
);
GracefulShutdown shutdown = this.shutdown.get();
final long stateTimeoutMs;
if (shutdown != null) {
// If we are shutting down, then we will remain in the resigned state
// until either the shutdown expires or an election bumps the epoch
stateTimeoutMs = shutdown.remainingTimeMs();
} else if (state.hasElectionTimeoutExpired(currentTimeMs)) {
transitionToCandidate(currentTimeMs);
stateTimeoutMs = 0L;
} else {
stateTimeoutMs = state.remainingElectionTimeMs(currentTimeMs);
}
return Math.min(stateTimeoutMs, endQuorumBackoffMs);
}
private long pollLeader(long currentTimeMs) {
LeaderState state = quorum.leaderStateOrThrow();
maybeFireHandleClaim(state);
GracefulShutdown shutdown = this.shutdown.get();
if (shutdown != null) {
transitionToResigned(state.nonLeaderVotersByDescendingFetchOffset());
return 0L;
}
long timeUntilFlush = maybeAppendBatches(
state,
currentTimeMs
@ -1563,9 +1562,34 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
return Math.min(timeUntilFlush, timeUntilSend);
}
private long maybeSendVoteRequests(
CandidateState state,
long currentTimeMs
) {
// Continue sending Vote requests as long as we still have a chance to win the election
if (!state.isVoteRejected()) {
return maybeSendRequests(
currentTimeMs,
state.unrecordedVoters(),
this::buildVoteRequest
);
}
return Long.MAX_VALUE;
}
private long pollCandidate(long currentTimeMs) throws IOException {
CandidateState state = quorum.candidateStateOrThrow();
if (state.isBackingOff()) {
GracefulShutdown shutdown = this.shutdown.get();
if (shutdown != null) {
// If we happen to shutdown while we are a candidate, we will continue
// with the current election until one of the following conditions is met:
// 1) we are elected as leader (which allows us to resign)
// 2) another leader is elected
// 3) the shutdown timer expires
long minRequestBackoffMs = maybeSendVoteRequests(state, currentTimeMs);
return Math.min(shutdown.remainingTimeMs(), minRequestBackoffMs);
} else if (state.isBackingOff()) {
if (state.isBackoffComplete(currentTimeMs)) {
logger.info("Re-elect as candidate after election backoff has completed");
transitionToCandidate(currentTimeMs);
@ -1578,16 +1602,9 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
backoffDurationMs);
state.startBackingOff(currentTimeMs, backoffDurationMs);
return backoffDurationMs;
} else if (!state.isVoteRejected()) {
// Continue sending Vote requests as long as we still have a chance to win the election
long minRequestBackoffMs = maybeSendRequests(
currentTimeMs,
state.unrecordedVoters(),
this::buildVoteRequest
);
return Math.min(minRequestBackoffMs, state.remainingElectionTimeMs(currentTimeMs));
} else {
return state.remainingElectionTimeMs(currentTimeMs);
long minRequestBackoffMs = maybeSendVoteRequests(state, currentTimeMs);
return Math.min(minRequestBackoffMs, state.remainingElectionTimeMs(currentTimeMs));
}
}
@ -1601,7 +1618,12 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
}
private long pollFollowerAsVoter(FollowerState state, long currentTimeMs) throws IOException {
if (state.hasFetchTimeoutExpired(currentTimeMs)) {
GracefulShutdown shutdown = this.shutdown.get();
if (shutdown != null) {
// If we are a follower, then we can shutdown immediately. We want to
// skip the transition to candidate in any case.
return 0;
} else if (state.hasFetchTimeoutExpired(currentTimeMs)) {
logger.info("Become candidate due to fetch timeout");
transitionToCandidate(currentTimeMs);
return 0L;
@ -1636,7 +1658,6 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
state.leaderId(),
this::buildFetchRequest
);
}
return Math.min(backoffMs, state.remainingFetchTimeMs(currentTimeMs));
}
@ -1644,7 +1665,13 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
private long pollVoted(long currentTimeMs) throws IOException {
VotedState state = quorum.votedStateOrThrow();
if (state.hasElectionTimeoutExpired(currentTimeMs)) {
GracefulShutdown shutdown = this.shutdown.get();
if (shutdown != null) {
// If shutting down, then remain in this state until either the
// shutdown completes or an epoch bump forces another state transition
return shutdown.remainingTimeMs();
} else if (state.hasElectionTimeoutExpired(currentTimeMs)) {
transitionToCandidate(currentTimeMs);
return 0L;
} else {
@ -1662,7 +1689,12 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
}
private long pollUnattachedAsVoter(UnattachedState state, long currentTimeMs) throws IOException {
if (state.hasElectionTimeoutExpired(currentTimeMs)) {
GracefulShutdown shutdown = this.shutdown.get();
if (shutdown != null) {
// If shutting down, then remain in this state until either the
// shutdown completes or an epoch bump forces another state transition
return shutdown.remainingTimeMs();
} else if (state.hasElectionTimeoutExpired(currentTimeMs)) {
transitionToCandidate(currentTimeMs);
return 0L;
} else {
@ -1686,6 +1718,8 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
return pollVoted(currentTimeMs);
} else if (quorum.isUnattached()) {
return pollUnattached(currentTimeMs);
} else if (quorum.isResigned()) {
return pollResigned(currentTimeMs);
} else {
throw new IllegalStateException("Unexpected quorum state " + quorum);
}
@ -1713,26 +1747,48 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
});
}
private boolean maybeCompleteShutdown(long currentTimeMs) {
GracefulShutdown shutdown = this.shutdown.get();
if (shutdown == null) {
return false;
}
shutdown.update(currentTimeMs);
if (shutdown.hasTimedOut()) {
shutdown.failWithTimeout();
return true;
}
if (quorum.isObserver()
|| quorum.remoteVoters().isEmpty()
|| quorum.hasRemoteLeader()) {
shutdown.complete();
return true;
}
return false;
}
public void poll() throws IOException {
GracefulShutdown gracefulShutdown = shutdown.get();
if (gracefulShutdown != null) {
pollShutdown(gracefulShutdown);
} else {
pollListeners();
pollListeners();
long currentTimeMs = time.milliseconds();
long pollTimeoutMs = pollCurrentState(currentTimeMs);
kafkaRaftMetrics.updatePollStart(currentTimeMs);
long currentTimeMs = time.milliseconds();
if (maybeCompleteShutdown(currentTimeMs)) {
return;
}
List<RaftMessage> inboundMessages = channel.receive(pollTimeoutMs);
long pollTimeoutMs = pollCurrentState(currentTimeMs);
kafkaRaftMetrics.updatePollStart(currentTimeMs);
List<RaftMessage> inboundMessages = channel.receive(pollTimeoutMs);
currentTimeMs = time.milliseconds();
kafkaRaftMetrics.updatePollEnd(currentTimeMs);
for (RaftMessage message : inboundMessages) {
handleInboundMessage(message, currentTimeMs);
currentTimeMs = time.milliseconds();
kafkaRaftMetrics.updatePollEnd(currentTimeMs);
for (RaftMessage message : inboundMessages) {
handleInboundMessage(message, currentTimeMs);
currentTimeMs = time.milliseconds();
}
}
}
@ -1777,26 +1833,27 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
this.completeFuture = completeFuture;
}
public void update() {
finishTimer.update();
if (finishTimer.isExpired()) {
close();
logger.warn("Graceful shutdown timed out after {}ms", finishTimer.timeoutMs());
completeFuture.completeExceptionally(
new TimeoutException("Timeout expired before shutdown completed"));
}
public void update(long currentTimeMs) {
finishTimer.update(currentTimeMs);
}
public boolean hasTimedOut() {
return finishTimer.isExpired();
}
public boolean isFinished() {
return completeFuture.isDone();
}
public boolean succeeded() {
return isFinished() && !failed();
public long remainingTimeMs() {
return finishTimer.remainingMs();
}
public boolean failed() {
return completeFuture.isCompletedExceptionally();
public void failWithTimeout() {
close();
logger.warn("Graceful shutdown timed out after {}ms", finishTimer.timeoutMs());
completeFuture.completeExceptionally(
new TimeoutException("Timeout expired before graceful shutdown completed"));
}
public void complete() {

View File

@ -21,7 +21,9 @@ import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Random;
@ -29,10 +31,11 @@ import java.util.Set;
import java.util.stream.Collectors;
/**
* This class is responsible for managing the current state of this node and ensuring only
* valid state transitions.
* This class is responsible for managing the current state of this node and ensuring
* only valid state transitions. Below we define the possible state transitions and
* how they are triggered:
*
* Unattached =>
* Unattached|Resigned =>
* Unattached: After learning of a new election with a higher epoch
* Voted: After granting a vote to a candidate
* Candidate: After expiration of the election timeout
@ -49,15 +52,16 @@ import java.util.stream.Collectors;
*
* Leader =>
* Unattached: After learning of a new election with a higher epoch
* Resigned: When shutting down gracefully
*
* Follower =>
* Unattached: After learning of a new election with a higher epoch
* Candidate: After expiration of the fetch timeout
* Follower: After discovering a leader with a larger epoch
*
* Observers follow a simpler state machine. The Voted/Candidate/Leader states
* are not possible for observers, so the only transitions that are possible are
* between Unattached and Follower.
* Observers follow a simpler state machine. The Voted/Candidate/Leader/Resigned
* states are not possible for observers, so the only transitions that are possible
* are between Unattached and Follower.
*
* Unattached =>
* Unattached: After learning of a new election with a higher epoch
@ -136,16 +140,19 @@ public class QuorumState {
randomElectionTimeoutMs()
);
} else if (election.isLeader(localId)) {
// If we were previously a leader, then we will start out as unattached
// in the same epoch. This protects the invariant that each record
// is uniquely identified by offset and epoch, which might otherwise
// be violated if unflushed data is lost after restarting.
initialState = new UnattachedState(
// If we were previously a leader, then we will start out as resigned
// in the same epoch. This serves two purposes:
// 1. It ensures that we cannot vote for another leader in the same epoch.
// 2. It protects the invariant that each record is uniquely identified by
// offset and epoch, which might otherwise be violated if unflushed data
// is lost after restarting.
initialState = new ResignedState(
time,
localId,
election.epoch,
voters,
Optional.empty(),
randomElectionTimeoutMs()
randomElectionTimeoutMs(),
Collections.emptyList()
);
} else if (election.isVotedCandidate(localId)) {
initialState = new CandidateState(
@ -232,9 +239,28 @@ public class QuorumState {
return !isVoter();
}
public void transitionToResigned(List<Integer> preferredSuccessors) {
if (!isLeader()) {
throw new IllegalStateException("Invalid transition to Resigned state from " + state);
}
// The Resigned state is a soft state which does not need to be persisted.
// A leader will always be re-initialized in this state.
int epoch = state.epoch();
this.state = new ResignedState(
time,
localId,
epoch,
voters,
randomElectionTimeoutMs(),
preferredSuccessors
);
log.info("Completed transition to {}", state);
}
/**
* Transition to the "unattached" state. This means we have found an epoch strictly larger
* than what is currently known, but wo do not yet know of an elected leader.
* Transition to the "unattached" state. This means we have found an epoch greater than
* or equal to the current epoch, but wo do not yet know of the elected leader.
*/
public void transitionToUnattached(int epoch) throws IOException {
int currentEpoch = state.epoch();
@ -434,6 +460,12 @@ public class QuorumState {
throw new IllegalStateException("Expected to be Leader, but current state is " + state);
}
public ResignedState resignedStateOrThrow() {
if (isResigned())
return (ResignedState) state;
throw new IllegalStateException("Expected to be Resigned, but current state is " + state);
}
public CandidateState candidateStateOrThrow() {
if (isCandidate())
return (CandidateState) state;
@ -461,6 +493,10 @@ public class QuorumState {
return state instanceof LeaderState;
}
public boolean isResigned() {
return state instanceof ResignedState;
}
public boolean isCandidate() {
return state instanceof CandidateState;
}

View File

@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.raft;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* This state represents a leader which has fenced itself either because it
* is shutting down or because it has encountered a soft failure of some sort.
* No writes are accepted in this state and we are not permitted to vote for
* any other candidate in this epoch.
*
* A resigned leader may initiate a new election by sending `EndQuorumEpoch`
* requests to all of the voters. This state tracks delivery of this request
* in order to prevent unnecessary retries.
*
* A voter will remain in the `Resigned` state until we either learn about
* another election, or our own election timeout expires and we become a
* Candidate.
*/
public class ResignedState implements EpochState {
private final int localId;
private final int epoch;
private final Set<Integer> voters;
private final long electionTimeoutMs;
private final Set<Integer> unackedVoters;
private final Timer electionTimer;
private final List<Integer> preferredSuccessors;
public ResignedState(
Time time,
int localId,
int epoch,
Set<Integer> voters,
long electionTimeoutMs,
List<Integer> preferredSuccessors
) {
this.localId = localId;
this.epoch = epoch;
this.voters = voters;
this.unackedVoters = new HashSet<>(voters);
this.unackedVoters.remove(localId);
this.electionTimeoutMs = electionTimeoutMs;
this.electionTimer = time.timer(electionTimeoutMs);
this.preferredSuccessors = preferredSuccessors;
}
@Override
public ElectionState election() {
return ElectionState.withElectedLeader(epoch, localId, voters);
}
@Override
public int epoch() {
return epoch;
}
/**
* Get the set of voters which have yet to acknowledge the resignation.
* This node will send `EndQuorumEpoch` requests to this set until these
* voters acknowledge the request or we transition to another state.
*
* @return the set of unacknowledged voters
*/
public Set<Integer> unackedVoters() {
return unackedVoters;
}
/**
* Invoked after receiving a successful `EndQuorumEpoch` response. This
* is in order to prevent unnecessary retries.
*
* @param voterId the ID of the voter that send the successful response
*/
public void acknowledgeResignation(int voterId) {
if (!voters.contains(voterId)) {
throw new IllegalArgumentException("Attempt to acknowledge delivery of `EndQuorumEpoch` " +
"by a non-voter " + voterId);
}
unackedVoters.remove(voterId);
}
/**
* Check whether the timeout has expired.
*
* @param currentTimeMs current time in milliseconds
* @return true if the timeout has expired, false otherwise
*/
public boolean hasElectionTimeoutExpired(long currentTimeMs) {
electionTimer.update(currentTimeMs);
return electionTimer.isExpired();
}
/**
* Check the time remaining until the timeout expires.
*
* @param currentTimeMs current time in milliseconds
* @return the duration in milliseconds from the current time before the timeout expires
*/
public long remainingElectionTimeMs(long currentTimeMs) {
electionTimer.update(currentTimeMs);
return electionTimer.remainingMs();
}
public List<Integer> preferredSuccessors() {
return preferredSuccessors;
}
@Override
public String name() {
return "Resigned";
}
@Override
public String toString() {
return "ResignedState(" +
"localId=" + localId +
", epoch=" + epoch +
", voters=" + voters +
", electionTimeoutMs=" + electionTimeoutMs +
", unackedVoters=" + unackedVoters +
", preferredSuccessors=" + preferredSuccessors +
')';
}
}

View File

@ -88,7 +88,57 @@ public class KafkaRaftClientTest {
}
@Test
public void testInitializeAsLeaderFromStateStore() throws Exception {
public void testRejectVotesFromSameEpochAfterResigningLeadership() throws Exception {
int localId = 0;
int remoteId = 1;
Set<Integer> voters = Utils.mkSet(localId, remoteId);
int epoch = 2;
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
.updateRandom(random -> {
Mockito.doReturn(0).when(random).nextInt(DEFAULT_ELECTION_TIMEOUT_MS);
})
.withElectedLeader(epoch, localId)
.build();
assertEquals(0L, context.log.endOffset().offset);
context.assertElectedLeader(epoch, localId);
// Since we were the leader in epoch 2, we should ensure that we will not vote for any
// other voter in the same epoch, even if it has caught up to the same position.
context.deliverRequest(context.voteRequest(epoch, remoteId,
context.log.lastFetchedEpoch(), context.log.endOffset().offset));
context.client.poll();
context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.of(localId), false);
}
@Test
public void testRejectVotesFromSameEpochAfterResigningCandidacy() throws Exception {
int localId = 0;
int remoteId = 1;
Set<Integer> voters = Utils.mkSet(localId, remoteId);
int epoch = 2;
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
.updateRandom(random -> {
Mockito.doReturn(0).when(random).nextInt(DEFAULT_ELECTION_TIMEOUT_MS);
})
.withVotedCandidate(epoch, localId)
.build();
assertEquals(0L, context.log.endOffset().offset);
context.assertVotedCandidate(epoch, localId);
// Since we were the leader in epoch 2, we should ensure that we will not vote for any
// other voter in the same epoch, even if it has caught up to the same position.
context.deliverRequest(context.voteRequest(epoch, remoteId,
context.log.lastFetchedEpoch(), context.log.endOffset().offset));
context.client.poll();
context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.empty(), false);
}
@Test
public void testInitializeAsResignedLeaderFromStateStore() throws Exception {
int localId = 0;
Set<Integer> voters = Utils.mkSet(localId, 1);
int epoch = 2;
@ -100,14 +150,63 @@ public class KafkaRaftClientTest {
.withElectedLeader(epoch, localId)
.build();
// The node will remain elected, but start up in a resigned state
// in which no additional writes are accepted.
assertEquals(0L, context.log.endOffset().offset);
context.assertUnknownLeader(epoch);
context.assertElectedLeader(epoch, localId);
context.client.poll();
assertEquals(Long.MAX_VALUE, context.client.scheduleAppend(epoch, Arrays.asList("a", "b")));
context.pollUntilSend();
int correlationId = context.assertSentEndQuorumEpochRequest(epoch, 1);
context.deliverResponse(correlationId, 1, context.endEpochResponse(epoch, OptionalInt.of(localId)));
context.client.poll();
context.time.sleep(context.electionTimeoutMs);
context.pollUntilSend();
context.assertVotedCandidate(epoch + 1, localId);
context.assertSentVoteRequest(epoch + 1, 0, 0L);
}
@Test
public void testEndQuorumEpochRetriesWhileResigned() throws Exception {
int localId = 0;
int voter1 = 1;
int voter2 = 2;
Set<Integer> voters = Utils.mkSet(localId, voter1, voter2);
int epoch = 19;
// Start off as leader so that we will initialize in the Resigned state.
// Note that we intentionally set a request timeout which is smaller than
// the election timeout so that we can still in the Resigned state and
// verify retry behavior.
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
.withElectionTimeoutMs(10000)
.withRequestTimeoutMs(5000)
.withElectedLeader(epoch, localId)
.build();
context.pollUntilSend();
List<RaftRequest.Outbound> requests = context.collectEndQuorumRequests(epoch, Utils.mkSet(voter1, voter2));
assertEquals(2, requests.size());
// Respond to one of the requests so that we can verify that no additional
// request to this node is sent.
RaftRequest.Outbound endEpochOutbound = requests.get(0);
context.deliverResponse(endEpochOutbound.correlationId, endEpochOutbound.destinationId(),
context.endEpochResponse(epoch, OptionalInt.of(localId)));
context.client.poll();
assertEquals(Collections.emptyList(), context.channel.drainSendQueue());
// Now sleep for the request timeout and verify that we get only one
// retried request from the voter that hasn't responded yet.
int nonRespondedId = requests.get(1).destinationId();
context.time.sleep(6000);
context.pollUntilSend();
List<RaftRequest.Outbound> retries = context.collectEndQuorumRequests(epoch, Utils.mkSet(nonRespondedId));
assertEquals(1, retries.size());
}
@Test
public void testInitializeAsCandidateFromStateStore() throws Exception {
int localId = 0;
@ -117,12 +216,11 @@ public class KafkaRaftClientTest {
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
.withVotedCandidate(2, localId)
.build();
context.assertVotedCandidate(2, localId);
assertEquals(0L, context.log.endOffset().offset);
// Send out vote requests.
context.client.poll();
// The candidate will resume the election after reinitialization
context.pollUntilSend();
List<RaftRequest.Outbound> voteRequests = context.collectVoteRequests(2, 0, 0);
assertEquals(2, voteRequests.size());
}
@ -205,10 +303,10 @@ public class KafkaRaftClientTest {
}
@Test
public void testEndQuorumIgnoredIfAlreadyCandidate() throws Exception {
public void testEndQuorumIgnoredAsCandidateIfOlderEpoch() throws Exception {
int localId = 0;
int otherNodeId = 1;
int epoch = 2;
int epoch = 5;
int jitterMs = 85;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@ -216,14 +314,19 @@ public class KafkaRaftClientTest {
.updateRandom(random -> {
Mockito.doReturn(jitterMs).when(random).nextInt(Mockito.anyInt());
})
.withVotedCandidate(epoch, localId)
.withUnknownLeader(epoch - 1)
.build();
context.deliverRequest(context.endEpochRequest(epoch, OptionalInt.empty(),
otherNodeId, Collections.singletonList(context.localId)));
// Sleep a little to ensure that we become a candidate
context.time.sleep(context.electionTimeoutMs + jitterMs);
context.client.poll();
context.assertVotedCandidate(epoch, localId);
context.deliverRequest(context.endEpochRequest(epoch - 2, otherNodeId,
Collections.singletonList(context.localId)));
context.client.poll();
context.assertSentEndQuorumEpochResponse(Errors.NONE, epoch, OptionalInt.empty());
context.assertSentEndQuorumEpochResponse(Errors.FENCED_LEADER_EPOCH, epoch, OptionalInt.empty());
// We should still be candidate until expiration of election timeout
context.time.sleep(context.electionTimeoutMs + jitterMs - 1);
@ -242,21 +345,20 @@ public class KafkaRaftClientTest {
}
@Test
public void testEndQuorumIgnoredIfAlreadyLeader() throws Exception {
public void testEndQuorumIgnoredAsLeaderIfOlderEpoch() throws Exception {
int localId = 0;
int voter2 = localId + 1;
int voter3 = localId + 2;
int epoch = 2;
int epoch = 7;
Set<Integer> voters = Utils.mkSet(localId, voter2, voter3);
RaftClientTestContext context = RaftClientTestContext.initializeAsLeader(localId, voters, epoch);
// One of the voters may have sent EndEpoch as a candidate because it
// had not yet been notified that the local node was the leader.
context.deliverRequest(context.endEpochRequest(epoch, OptionalInt.empty(), voter2, Arrays.asList(context.localId, voter3)));
// One of the voters may have sent EndQuorumEpoch from an earlier epoch
context.deliverRequest(context.endEpochRequest(epoch - 2, voter2, Arrays.asList(context.localId, voter3)));
context.client.poll();
context.assertSentEndQuorumEpochResponse(Errors.NONE, epoch, OptionalInt.of(context.localId));
context.assertSentEndQuorumEpochResponse(Errors.FENCED_LEADER_EPOCH, epoch, OptionalInt.of(context.localId));
// We should still be leader as long as fetch timeout has not expired
context.time.sleep(context.fetchTimeoutMs - 1);
@ -264,27 +366,6 @@ public class KafkaRaftClientTest {
context.assertElectedLeader(epoch, context.localId);
}
@Test
public void testEndQuorumStartsNewElectionAfterBackoffIfReceivedFromVotedCandidate() throws Exception {
int localId = 0;
int otherNodeId = 1;
int epoch = 2;
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
.withVotedCandidate(epoch, otherNodeId)
.build();
context.deliverRequest(context.endEpochRequest(epoch, OptionalInt.empty(),
otherNodeId, Collections.singletonList(context.localId)));
context.client.poll();
context.assertSentEndQuorumEpochResponse(Errors.NONE, epoch, OptionalInt.empty());
context.time.sleep(context.electionBackoffMaxMs);
context.client.poll();
context.assertVotedCandidate(epoch + 1, context.localId);
}
@Test
public void testEndQuorumStartsNewElectionImmediatelyIfFollowerUnattached() throws Exception {
int localId = 0;
@ -297,7 +378,8 @@ public class KafkaRaftClientTest {
.withUnknownLeader(epoch)
.build();
context.deliverRequest(context.endEpochRequest(epoch, OptionalInt.of(voter2), voter2, Arrays.asList(context.localId, voter3)));
context.deliverRequest(context.endEpochRequest(epoch, voter2,
Arrays.asList(context.localId, voter3)));
context.client.poll();
context.assertSentEndQuorumEpochResponse(Errors.NONE, epoch, OptionalInt.of(voter2));
@ -406,7 +488,8 @@ public class KafkaRaftClientTest {
.withElectedLeader(leaderEpoch, oldLeaderId)
.build();
context.deliverRequest(context.endEpochRequest(leaderEpoch, OptionalInt.of(oldLeaderId), oldLeaderId, Collections.singletonList(context.localId)));
context.deliverRequest(context.endEpochRequest(leaderEpoch, oldLeaderId,
Collections.singletonList(context.localId)));
context.client.poll();
context.assertSentEndQuorumEpochResponse(Errors.NONE, leaderEpoch, OptionalInt.of(oldLeaderId));
@ -427,8 +510,8 @@ public class KafkaRaftClientTest {
.withElectedLeader(leaderEpoch, oldLeaderId)
.build();
context.deliverRequest(context.endEpochRequest(leaderEpoch,
OptionalInt.of(oldLeaderId), oldLeaderId, Arrays.asList(preferredNextLeader, context.localId)));
context.deliverRequest(context.endEpochRequest(leaderEpoch, oldLeaderId,
Arrays.asList(preferredNextLeader, context.localId)));
context.pollUntilSend();
context.assertSentEndQuorumEpochResponse(Errors.NONE, leaderEpoch, OptionalInt.of(oldLeaderId));
@ -896,7 +979,7 @@ public class KafkaRaftClientTest {
context.client.poll();
context.assertSentBeginQuorumEpochResponse(Errors.INCONSISTENT_VOTER_SET, epoch, OptionalInt.of(context.localId));
context.deliverRequest(context.endEpochRequest(epoch, OptionalInt.of(context.localId), nonVoterId, Collections.singletonList(otherNodeId)));
context.deliverRequest(context.endEpochRequest(epoch, nonVoterId, Collections.singletonList(otherNodeId)));
context.client.poll();
// The sent request has no context.localId as a preferable voter.
@ -1080,16 +1163,19 @@ public class KafkaRaftClientTest {
int voter2 = localId + 1;
int voter3 = localId + 2;
int epoch = 5;
// This node initializes as a candidate
Set<Integer> voters = Utils.mkSet(voter1, voter2, voter3);
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
.withVotedCandidate(epoch, voter1)
.withUnknownLeader(epoch - 1)
.build();
context.assertVotedCandidate(epoch, voter1);
context.assertUnknownLeader(epoch - 1);
// Sleep a little to ensure that we become a candidate
context.time.sleep(context.electionTimeoutMs * 2);
// Wait until the vote requests are inflight
context.pollUntilSend();
context.assertVotedCandidate(epoch, context.localId);
List<RaftRequest.Outbound> voteRequests = context.collectVoteRequests(epoch, 0, 0);
assertEquals(2, voteRequests.size());
@ -1199,10 +1285,10 @@ public class KafkaRaftClientTest {
assertFalse(shutdownFuture.isDone());
// Send EndQuorumEpoch request to the other voter
context.client.poll();
context.pollUntilSend();
assertTrue(context.client.isShuttingDown());
assertTrue(context.client.isRunning());
context.assertSentEndQuorumEpochRequest(1, OptionalInt.of(context.localId), otherNodeId);
context.assertSentEndQuorumEpochRequest(1, otherNodeId);
// We should still be able to handle vote requests during graceful shutdown
// in order to help the new leader get elected
@ -1241,11 +1327,11 @@ public class KafkaRaftClientTest {
assertTrue(context.client.isRunning());
// Send EndQuorumEpoch request to the close follower
context.client.poll();
context.pollUntilSend();
assertTrue(context.client.isRunning());
List<RaftRequest.Outbound> endQuorumRequests =
context.collectEndQuorumRequests(1, OptionalInt.of(context.localId), Utils.mkSet(closeFollower, laggingFollower));
List<RaftRequest.Outbound> endQuorumRequests = context.collectEndQuorumRequests(
1, Utils.mkSet(closeFollower, laggingFollower));
assertEquals(2, endQuorumRequests.size());
}
@ -1312,10 +1398,10 @@ public class KafkaRaftClientTest {
assertFalse(shutdownFuture.isDone());
// Send EndQuorumEpoch request to the other vote
context.client.poll();
context.pollUntilSend();
assertTrue(context.client.isRunning());
context.assertSentEndQuorumEpochRequest(epoch, OptionalInt.of(context.localId), otherNodeId);
context.assertSentEndQuorumEpochRequest(epoch, otherNodeId);
// The shutdown timeout is hit before we receive any requests or responses indicating an epoch bump
context.time.sleep(shutdownTimeoutMs);
@ -1351,6 +1437,31 @@ public class KafkaRaftClientTest {
assertNull(shutdownFuture.get());
}
@Test
public void testObserverGracefulShutdown() throws Exception {
int localId = 0;
int voter1 = 1;
int voter2 = 2;
Set<Integer> voters = Utils.mkSet(voter1, voter2);
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
.withUnknownLeader(5)
.build();
context.client.poll();
context.assertUnknownLeader(5);
// Observer shutdown should complete immediately even if the
// current leader is unknown
CompletableFuture<Void> shutdownFuture = context.client.shutdown(5000);
assertTrue(context.client.isRunning());
assertFalse(shutdownFuture.isDone());
context.client.poll();
assertFalse(context.client.isRunning());
assertTrue(shutdownFuture.isDone());
assertNull(shutdownFuture.get());
}
@Test
public void testGracefulShutdownSingleMemberQuorum() throws IOException {
int localId = 0;
@ -1663,12 +1774,14 @@ public class KafkaRaftClientTest {
Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
.withVotedCandidate(epoch, localId)
.withUnknownLeader(epoch - 1)
.build();
// Sleep a little to ensure that we become a candidate
context.time.sleep(context.electionTimeoutMs * 2);
context.pollUntilSend();
context.assertVotedCandidate(epoch, context.localId);
context.pollUntilSend();
int correlationId = context.assertSentVoteRequest(epoch, 0, 0L);
VoteResponseData response = new VoteResponseData()
.setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
@ -1689,7 +1802,7 @@ public class KafkaRaftClientTest {
context.client.shutdown(5000);
context.pollUntilSend();
int correlationId = context.assertSentEndQuorumEpochRequest(epoch, OptionalInt.of(context.localId), otherNodeId);
int correlationId = context.assertSentEndQuorumEpochRequest(epoch, otherNodeId);
EndQuorumEpochResponseData response = new EndQuorumEpochResponseData()
.setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());

View File

@ -136,12 +136,13 @@ public class QuorumStateTest {
}
@Test
public void testInitializeAsCandidate() throws IOException {
public void testInitializeAsResignedCandidate() throws IOException {
int node1 = 1;
int node2 = 2;
int epoch = 5;
Set<Integer> voters = Utils.mkSet(localId, node1, node2);
store.writeElectionState(ElectionState.withVotedCandidate(epoch, localId, voters));
ElectionState election = ElectionState.withVotedCandidate(epoch, localId, voters);
store.writeElectionState(election);
int jitterMs = 2500;
Mockito.doReturn(jitterMs).when(random).nextInt(Mockito.anyInt());
@ -153,6 +154,7 @@ public class QuorumStateTest {
CandidateState candidateState = state.candidateStateOrThrow();
assertEquals(epoch, candidateState.epoch());
assertEquals(election, candidateState.election());
assertEquals(Utils.mkSet(node1, node2), candidateState.unrecordedVoters());
assertEquals(Utils.mkSet(localId), candidateState.grantingVoters());
assertEquals(Collections.emptySet(), candidateState.rejectingVoters());
@ -161,16 +163,17 @@ public class QuorumStateTest {
}
@Test
public void testInitializeAsLeader() throws IOException {
public void testInitializeAsResignedLeader() throws IOException {
int node1 = 1;
int node2 = 2;
int epoch = 5;
Set<Integer> voters = Utils.mkSet(localId, node1, node2);
store.writeElectionState(ElectionState.withElectedLeader(epoch, localId, voters));
ElectionState election = ElectionState.withElectedLeader(epoch, localId, voters);
store.writeElectionState(election);
// If we were previously a leader, we will start as unattached
// so that records are always uniquely defined by epoch and offset
// even accounting for the loss of unflushed data.
// If we were previously a leader, we will start as resigned in order to ensure
// a new leader gets elected. This ensures that records are always uniquely
// defined by epoch and offset even accounting for the loss of unflushed data.
// The election timeout should be reset after we become a candidate again
int jitterMs = 2500;
@ -181,10 +184,12 @@ public class QuorumStateTest {
assertFalse(state.isLeader());
assertEquals(epoch, state.epoch());
UnattachedState unattachedState = state.unattachedStateOrThrow();
assertEquals(epoch, unattachedState.epoch());
ResignedState resignedState = state.resignedStateOrThrow();
assertEquals(epoch, resignedState.epoch());
assertEquals(election, resignedState.election());
assertEquals(Utils.mkSet(node1, node2), resignedState.unackedVoters());
assertEquals(electionTimeoutMs + jitterMs,
unattachedState.remainingElectionTimeMs(time.milliseconds()));
resignedState.remainingElectionTimeMs(time.milliseconds()));
}
@Test
@ -230,6 +235,23 @@ public class QuorumStateTest {
candidate2.remainingElectionTimeMs(time.milliseconds()));
}
@Test
public void testCandidateToResigned() throws IOException {
int node1 = 1;
int node2 = 2;
Set<Integer> voters = Utils.mkSet(localId, node1, node2);
assertNull(store.readElectionState());
QuorumState state = initializeEmptyState(voters);
state.transitionToCandidate();
assertTrue(state.isCandidate());
assertEquals(1, state.epoch());
assertThrows(IllegalStateException.class, () ->
state.transitionToResigned(Collections.singletonList(localId)));
assertTrue(state.isCandidate());
}
@Test
public void testCandidateToLeader() throws IOException {
Set<Integer> voters = Utils.mkSet(localId);
@ -339,6 +361,27 @@ public class QuorumStateTest {
assertEquals(1, state.epoch());
}
@Test
public void testLeaderToResigned() throws IOException {
Set<Integer> voters = Utils.mkSet(localId);
assertNull(store.readElectionState());
QuorumState state = initializeEmptyState(voters);
state.initialize(new OffsetAndEpoch(0L, logEndEpoch));
state.transitionToCandidate();
state.transitionToLeader(0L);
assertTrue(state.isLeader());
assertEquals(1, state.epoch());
state.transitionToResigned(Collections.singletonList(localId));
assertTrue(state.isResigned());
ResignedState resignedState = state.resignedStateOrThrow();
assertEquals(ElectionState.withElectedLeader(1, localId, voters),
resignedState.election());
assertEquals(1, resignedState.epoch());
assertEquals(Collections.emptySet(), resignedState.unackedVoters());
}
@Test
public void testLeaderToCandidate() throws IOException {
Set<Integer> voters = Utils.mkSet(localId);
@ -434,7 +477,7 @@ public class QuorumStateTest {
}
@Test
public void testUnattachedToLeader() throws IOException {
public void testUnattachedToLeaderOrResigned() throws IOException {
int leaderId = 1;
int epoch = 5;
Set<Integer> voters = Utils.mkSet(localId, leaderId);
@ -443,6 +486,7 @@ public class QuorumStateTest {
state.initialize(new OffsetAndEpoch(0L, logEndEpoch));
assertTrue(state.isUnattached());
assertThrows(IllegalStateException.class, () -> state.transitionToLeader(0L));
assertThrows(IllegalStateException.class, () -> state.transitionToResigned(Collections.emptyList()));
}
@Test
@ -568,7 +612,7 @@ public class QuorumStateTest {
}
@Test
public void testVotedToLeader() throws IOException {
public void testVotedToInvalidLeaderOrResigned() throws IOException {
int node1 = 1;
int node2 = 2;
Set<Integer> voters = Utils.mkSet(localId, node1, node2);
@ -576,6 +620,7 @@ public class QuorumStateTest {
state.initialize(new OffsetAndEpoch(0L, logEndEpoch));
state.transitionToVoted(5, node1);
assertThrows(IllegalStateException.class, () -> state.transitionToLeader(0));
assertThrows(IllegalStateException.class, () -> state.transitionToResigned(Collections.emptyList()));
}
@Test
@ -721,7 +766,7 @@ public class QuorumStateTest {
}
@Test
public void testFollowerToLeader() throws IOException {
public void testFollowerToLeaderOrResigned() throws IOException {
int node1 = 1;
int node2 = 2;
Set<Integer> voters = Utils.mkSet(localId, node1, node2);
@ -729,6 +774,7 @@ public class QuorumStateTest {
state.initialize(new OffsetAndEpoch(0L, logEndEpoch));
state.transitionToFollower(8, node2);
assertThrows(IllegalStateException.class, () -> state.transitionToLeader(0));
assertThrows(IllegalStateException.class, () -> state.transitionToResigned(Collections.emptyList()));
}
@Test

View File

@ -44,6 +44,7 @@ import org.apache.kafka.common.requests.BeginQuorumEpochRequest;
import org.apache.kafka.common.requests.BeginQuorumEpochResponse;
import org.apache.kafka.common.requests.DescribeQuorumResponse;
import org.apache.kafka.common.requests.EndQuorumEpochRequest;
import org.apache.kafka.common.requests.EndQuorumEpochResponse;
import org.apache.kafka.common.requests.VoteRequest;
import org.apache.kafka.common.requests.VoteResponse;
import org.apache.kafka.common.utils.LogContext;
@ -85,7 +86,7 @@ final class RaftClientTestContext {
final int electionTimeoutMs = Builder.DEFAULT_ELECTION_TIMEOUT_MS;
final int electionFetchMaxWaitMs = Builder.FETCH_MAX_WAIT_MS;
final int fetchTimeoutMs = Builder.FETCH_TIMEOUT_MS;
final int requestTimeoutMs = Builder.REQUEST_TIMEOUT_MS;
final int requestTimeoutMs = Builder.DEFAULT_REQUEST_TIMEOUT_MS;
final int retryBackoffMs = Builder.RETRY_BACKOFF_MS;
private final QuorumStateStore quorumStateStore;
@ -107,7 +108,7 @@ final class RaftClientTestContext {
private static final int FETCH_MAX_WAIT_MS = 0;
// fetch timeout is usually larger than election timeout
private static final int FETCH_TIMEOUT_MS = 50000;
private static final int REQUEST_TIMEOUT_MS = 5000;
private static final int DEFAULT_REQUEST_TIMEOUT_MS = 5000;
private static final int RETRY_BACKOFF_MS = 50;
private static final int DEFAULT_APPEND_LINGER_MS = 0;
@ -118,6 +119,7 @@ final class RaftClientTestContext {
private final Set<Integer> voters;
private final int localId;
private int requestTimeoutMs = DEFAULT_REQUEST_TIMEOUT_MS;
private int electionTimeoutMs = DEFAULT_ELECTION_TIMEOUT_MS;
private int appendLingerMs = DEFAULT_APPEND_LINGER_MS;
private MemoryPool memoryPool = MemoryPool.NONE;
@ -163,6 +165,16 @@ final class RaftClientTestContext {
return this;
}
Builder withElectionTimeoutMs(int electionTimeoutMs) {
this.electionTimeoutMs = electionTimeoutMs;
return this;
}
Builder withRequestTimeoutMs(int requestTimeoutMs) {
this.requestTimeoutMs = requestTimeoutMs;
return this;
}
RaftClientTestContext build() throws IOException {
Metrics metrics = new Metrics(time);
MockNetworkChannel channel = new MockNetworkChannel();
@ -190,7 +202,7 @@ final class RaftClientTestContext {
voterAddresses,
ELECTION_BACKOFF_MAX_MS,
RETRY_BACKOFF_MS,
REQUEST_TIMEOUT_MS,
requestTimeoutMs,
FETCH_MAX_WAIT_MS,
appendLingerMs,
logContext,
@ -462,9 +474,9 @@ final class RaftClientTestContext {
assertEquals(partitionError, Errors.forCode(partitionResponse.errorCode()));
}
int assertSentEndQuorumEpochRequest(int epoch, OptionalInt leaderId, int destinationId) {
int assertSentEndQuorumEpochRequest(int epoch, int destinationId) {
List<RaftRequest.Outbound> endQuorumRequests = collectEndQuorumRequests(
epoch, leaderId, Collections.singleton(destinationId));
epoch, Collections.singleton(destinationId));
assertEquals(1, endQuorumRequests.size());
return endQuorumRequests.get(0).correlationId();
}
@ -553,7 +565,7 @@ final class RaftClientTestContext {
assertSentFetchResponse(1L, epoch);
}
List<RaftRequest.Outbound> collectEndQuorumRequests(int epoch, OptionalInt leaderId, Set<Integer> destinationIdSet) {
List<RaftRequest.Outbound> collectEndQuorumRequests(int epoch, Set<Integer> destinationIdSet) {
List<RaftRequest.Outbound> endQuorumRequests = new ArrayList<>();
Set<Integer> collectedDestinationIdSet = new HashSet<>();
for (RaftMessage raftMessage : channel.drainSendQueue()) {
@ -564,8 +576,7 @@ final class RaftClientTestContext {
request.topics().get(0).partitions().get(0);
assertEquals(epoch, partitionRequest.leaderEpoch());
assertEquals(leaderId.orElse(-1), partitionRequest.leaderId());
assertEquals(localId, partitionRequest.replicaId());
assertEquals(localId, partitionRequest.leaderId());
RaftRequest.Outbound outboundRequest = (RaftRequest.Outbound) raftMessage;
collectedDestinationIdSet.add(outboundRequest.destinationId());
@ -626,16 +637,28 @@ final class RaftClientTestContext {
return new InetSocketAddress("localhost", 9990 + id);
}
EndQuorumEpochResponseData endEpochResponse(
int epoch,
OptionalInt leaderId
) {
return EndQuorumEpochResponse.singletonResponse(
Errors.NONE,
metadataPartition,
Errors.NONE,
epoch,
leaderId.orElse(-1)
);
}
EndQuorumEpochRequestData endEpochRequest(
int epoch,
OptionalInt leaderId,
int replicaId,
List<Integer> preferredSuccessors) {
int leaderId,
List<Integer> preferredSuccessors
) {
return EndQuorumEpochRequest.singletonRequest(
metadataPartition,
replicaId,
epoch,
leaderId.orElse(-1),
leaderId,
preferredSuccessors
);
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.raft;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
class ResignedStateTest {
private final MockTime time = new MockTime();
@Test
public void testResignedState() {
int electionTimeoutMs = 5000;
int localId = 0;
int remoteId = 1;
int epoch = 5;
Set<Integer> voters = Utils.mkSet(localId, remoteId);
ResignedState state = new ResignedState(
time,
localId,
epoch,
voters,
electionTimeoutMs,
Collections.emptyList()
);
assertEquals(ElectionState.withElectedLeader(epoch, localId, voters), state.election());
assertEquals(epoch, state.epoch());
assertEquals(Collections.singleton(remoteId), state.unackedVoters());
state.acknowledgeResignation(remoteId);
assertEquals(Collections.emptySet(), state.unackedVoters());
assertEquals(electionTimeoutMs, state.remainingElectionTimeMs(time.milliseconds()));
assertFalse(state.hasElectionTimeoutExpired(time.milliseconds()));
time.sleep(electionTimeoutMs / 2);
assertEquals(electionTimeoutMs / 2, state.remainingElectionTimeMs(time.milliseconds()));
assertFalse(state.hasElectionTimeoutExpired(time.milliseconds()));
time.sleep(electionTimeoutMs / 2);
assertEquals(0, state.remainingElectionTimeMs(time.milliseconds()));
assertTrue(state.hasElectionTimeoutExpired(time.milliseconds()));
}
}