diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndQuorumEpochRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/EndQuorumEpochRequest.java index 6aba5601585..017897f5aa2 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/EndQuorumEpochRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/EndQuorumEpochRequest.java @@ -70,16 +70,14 @@ public class EndQuorumEpochRequest extends AbstractRequest { } public static EndQuorumEpochRequestData singletonRequest(TopicPartition topicPartition, - int replicaId, int leaderEpoch, int leaderId, List preferredSuccessors) { - return singletonRequest(topicPartition, null, replicaId, leaderEpoch, leaderId, preferredSuccessors); + return singletonRequest(topicPartition, null, leaderEpoch, leaderId, preferredSuccessors); } public static EndQuorumEpochRequestData singletonRequest(TopicPartition topicPartition, String clusterId, - int replicaId, int leaderEpoch, int leaderId, List preferredSuccessors) { @@ -91,7 +89,6 @@ public class EndQuorumEpochRequest extends AbstractRequest { .setPartitions(Collections.singletonList( new EndQuorumEpochRequestData.PartitionData() .setPartitionIndex(topicPartition.partition()) - .setReplicaId(replicaId) .setLeaderEpoch(leaderEpoch) .setLeaderId(leaderId) .setPreferredSuccessors(preferredSuccessors)))) diff --git a/clients/src/main/resources/common/message/EndQuorumEpochRequest.json b/clients/src/main/resources/common/message/EndQuorumEpochRequest.json index e8ef7cae3ec..45dacde5fe9 100644 --- a/clients/src/main/resources/common/message/EndQuorumEpochRequest.json +++ b/clients/src/main/resources/common/message/EndQuorumEpochRequest.json @@ -29,10 +29,8 @@ "versions": "0+", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index." 
}, - { "name": "ReplicaId", "type": "int32", "versions": "0+", - "about": "The ID of the replica sending this request"}, { "name": "LeaderId", "type": "int32", "versions": "0+", - "about": "The current leader ID or -1 if there is a vote in progress"}, + "about": "The current leader ID that is resigning"}, { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "about": "The current epoch"}, { "name": "PreferredSuccessors", "type": "[]int32", "versions": "0+", diff --git a/core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala b/core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala index b95b00b9429..71697589650 100644 --- a/core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala +++ b/core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala @@ -163,9 +163,8 @@ class KafkaNetworkChannelTest { BeginQuorumEpochRequest.singletonRequest(topicPartition, clusterId, leaderEpoch, leaderId) case ApiKeys.END_QUORUM_EPOCH => - val replicaId = 1 - EndQuorumEpochRequest.singletonRequest(topicPartition, clusterId, replicaId, - leaderId, leaderEpoch, Collections.singletonList(2)) + EndQuorumEpochRequest.singletonRequest(topicPartition, clusterId, leaderId, + leaderEpoch, Collections.singletonList(2)) case ApiKeys.VOTE => val lastEpoch = 4 diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index b596a40f1e9..ed886363b17 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -578,7 +578,7 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.END_QUORUM_EPOCH => new EndQuorumEpochRequest.Builder(EndQuorumEpochRequest.singletonRequest( - tp, 10, 2, 5, Collections.singletonList(3))) + tp, 10, 5, Collections.singletonList(3))) case ApiKeys.ALTER_ISR => new AlterIsrRequest.Builder(new AlterIsrRequestData()) diff --git 
a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 029290f08ef..66553f66f33 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -434,6 +434,11 @@ public class KafkaRaftClient implements RaftClient { resetConnections(); } + private void transitionToResigned(List preferredSuccessors) { + quorum.transitionToResigned(preferredSuccessors); + resetConnections(); + } + private void transitionToVoted(int candidateId, int epoch) throws IOException { maybeResignLeadership(); quorum.transitionToVoted(epoch, candidateId); @@ -517,13 +522,17 @@ public class KafkaRaftClient implements RaftClient { final boolean voteGranted; if (quorum.isLeader()) { - logger.debug("Ignoring vote request {} with epoch {} since we are already leader on that epoch", + logger.debug("Rejecting vote request {} with epoch {} since we are already leader on that epoch", request, candidateEpoch); voteGranted = false; } else if (quorum.isCandidate()) { - logger.debug("Ignoring vote request {} with epoch {} since we are already candidate on that epoch", + logger.debug("Rejecting vote request {} with epoch {} since we are already candidate on that epoch", request, candidateEpoch); voteGranted = false; + } else if (quorum.isResigned()) { + logger.debug("Rejecting vote request {} with epoch {} since we have resigned as candidate/leader in this epoch", + request, candidateEpoch); + voteGranted = false; } else if (quorum.isFollower()) { FollowerState state = quorum.followerStateOrThrow(); logger.debug("Rejecting vote request {} with epoch {} since we already have a leader {} on that epoch", @@ -747,32 +756,27 @@ public class KafkaRaftClient implements RaftClient { request.topics().get(0).partitions().get(0); int requestEpoch = partitionRequest.leaderEpoch(); - int requestReplicaId = partitionRequest.replicaId(); + int requestLeaderId = 
partitionRequest.leaderId(); - Optional errorOpt = validateVoterOnlyRequest(requestReplicaId, requestEpoch); + Optional errorOpt = validateVoterOnlyRequest(requestLeaderId, requestEpoch); if (errorOpt.isPresent()) { return buildEndQuorumEpochResponse(errorOpt.get()); } - - OptionalInt requestLeaderId = optionalLeaderId(partitionRequest.leaderId()); - maybeTransition(requestLeaderId, requestEpoch, currentTimeMs); + maybeTransition(OptionalInt.of(requestLeaderId), requestEpoch, currentTimeMs); if (quorum.isFollower()) { FollowerState state = quorum.followerStateOrThrow(); - if (state.leaderId() == requestReplicaId) { + if (state.leaderId() == requestLeaderId) { List preferredSuccessors = partitionRequest.preferredSuccessors(); if (!preferredSuccessors.contains(quorum.localId)) { return buildEndQuorumEpochResponse(Errors.INCONSISTENT_VOTER_SET); } long electionBackoffMs = endEpochElectionBackoff(preferredSuccessors); + logger.debug("Overriding follower fetch timeout to {} after receiving " + + "EndQuorumEpoch request from leader {} in epoch {}", electionBackoffMs, + requestLeaderId, requestEpoch); state.overrideFetchTimeout(currentTimeMs, electionBackoffMs); } - } else if (quorum.isVoted()) { - VotedState state = quorum.votedStateOrThrow(); - if (state.votedId() == requestReplicaId) { - long electionBackoffMs = binaryExponentialElectionBackoffMs(1); - state.overrideElectionTimeout(currentTimeMs, electionBackoffMs); - } } return buildEndQuorumEpochResponse(Errors.NONE); } @@ -816,6 +820,8 @@ public class KafkaRaftClient implements RaftClient { if (handled.isPresent()) { return handled.get(); } else if (partitionError == Errors.NONE) { + ResignedState resignedState = quorum.resignedStateOrThrow(); + resignedState.acknowledgeResignation(responseMetadata.sourceId()); return true; } else { return handleUnexpectedError(partitionError, responseMetadata); @@ -1374,17 +1380,14 @@ public class KafkaRaftClient implements RaftClient { return 
connection.remainingRequestTimeMs(currentTimeMs); } - private EndQuorumEpochRequestData buildEndQuorumEpochRequest() { - List preferredSuccessors = quorum.isLeader() ? - quorum.leaderStateOrThrow().nonLeaderVotersByDescendingFetchOffset() : - Collections.emptyList(); - + private EndQuorumEpochRequestData buildEndQuorumEpochRequest( + ResignedState state + ) { return EndQuorumEpochRequest.singletonRequest( log.topicPartition(), - quorum.localId, quorum.epoch(), - quorum.leaderIdOrNil(), - preferredSuccessors + quorum.localId, + state.preferredSuccessors() ); } @@ -1457,40 +1460,6 @@ public class KafkaRaftClient implements RaftClient { return gracefulShutdown != null && !gracefulShutdown.isFinished(); } - private void pollShutdown(GracefulShutdown shutdown) throws IOException { - // Graceful shutdown allows a leader or candidate to resign its leadership without - // awaiting expiration of the election timeout. As soon as another leader is elected, - // the shutdown is considered complete. 
- - shutdown.update(); - if (shutdown.isFinished()) { - return; - } - - long currentTimeMs = shutdown.finishTimer.currentTimeMs(); - - if (quorum.remoteVoters().isEmpty() || quorum.hasRemoteLeader()) { - shutdown.complete(); - return; - } - - long pollTimeoutMs = shutdown.finishTimer.remainingMs(); - if (quorum.isLeader() || quorum.isCandidate()) { - long backoffMs = maybeSendRequests( - currentTimeMs, - quorum.remoteVoters(), - this::buildEndQuorumEpochRequest - ); - pollTimeoutMs = Math.min(backoffMs, pollTimeoutMs); - } - - List inboundMessages = channel.receive(pollTimeoutMs); - for (RaftMessage message : inboundMessages) { - handleInboundMessage(message, currentTimeMs); - currentTimeMs = time.milliseconds(); - } - } - private void appendBatch( LeaderState state, BatchAccumulator.CompletedBatch batch, @@ -1545,10 +1514,40 @@ public class KafkaRaftClient implements RaftClient { return timeUnitFlush; } + private long pollResigned(long currentTimeMs) throws IOException { + ResignedState state = quorum.resignedStateOrThrow(); + long endQuorumBackoffMs = maybeSendRequests( + currentTimeMs, + state.unackedVoters(), + () -> buildEndQuorumEpochRequest(state) + ); + + GracefulShutdown shutdown = this.shutdown.get(); + final long stateTimeoutMs; + if (shutdown != null) { + // If we are shutting down, then we will remain in the resigned state + // until either the shutdown expires or an election bumps the epoch + stateTimeoutMs = shutdown.remainingTimeMs(); + } else if (state.hasElectionTimeoutExpired(currentTimeMs)) { + transitionToCandidate(currentTimeMs); + stateTimeoutMs = 0L; + } else { + stateTimeoutMs = state.remainingElectionTimeMs(currentTimeMs); + } + + return Math.min(stateTimeoutMs, endQuorumBackoffMs); + } + private long pollLeader(long currentTimeMs) { LeaderState state = quorum.leaderStateOrThrow(); maybeFireHandleClaim(state); + GracefulShutdown shutdown = this.shutdown.get(); + if (shutdown != null) { + 
transitionToResigned(state.nonLeaderVotersByDescendingFetchOffset()); + return 0L; + } + long timeUntilFlush = maybeAppendBatches( state, currentTimeMs @@ -1563,9 +1562,34 @@ public class KafkaRaftClient implements RaftClient { return Math.min(timeUntilFlush, timeUntilSend); } + private long maybeSendVoteRequests( + CandidateState state, + long currentTimeMs + ) { + // Continue sending Vote requests as long as we still have a chance to win the election + if (!state.isVoteRejected()) { + return maybeSendRequests( + currentTimeMs, + state.unrecordedVoters(), + this::buildVoteRequest + ); + } + return Long.MAX_VALUE; + } + private long pollCandidate(long currentTimeMs) throws IOException { CandidateState state = quorum.candidateStateOrThrow(); - if (state.isBackingOff()) { + GracefulShutdown shutdown = this.shutdown.get(); + + if (shutdown != null) { + // If we happen to shutdown while we are a candidate, we will continue + // with the current election until one of the following conditions is met: + // 1) we are elected as leader (which allows us to resign) + // 2) another leader is elected + // 3) the shutdown timer expires + long minRequestBackoffMs = maybeSendVoteRequests(state, currentTimeMs); + return Math.min(shutdown.remainingTimeMs(), minRequestBackoffMs); + } else if (state.isBackingOff()) { if (state.isBackoffComplete(currentTimeMs)) { logger.info("Re-elect as candidate after election backoff has completed"); transitionToCandidate(currentTimeMs); @@ -1578,16 +1602,9 @@ public class KafkaRaftClient implements RaftClient { backoffDurationMs); state.startBackingOff(currentTimeMs, backoffDurationMs); return backoffDurationMs; - } else if (!state.isVoteRejected()) { - // Continue sending Vote requests as long as we still have a chance to win the election - long minRequestBackoffMs = maybeSendRequests( - currentTimeMs, - state.unrecordedVoters(), - this::buildVoteRequest - ); - return Math.min(minRequestBackoffMs, state.remainingElectionTimeMs(currentTimeMs)); } 
else { - return state.remainingElectionTimeMs(currentTimeMs); + long minRequestBackoffMs = maybeSendVoteRequests(state, currentTimeMs); + return Math.min(minRequestBackoffMs, state.remainingElectionTimeMs(currentTimeMs)); } } @@ -1601,7 +1618,12 @@ public class KafkaRaftClient implements RaftClient { } private long pollFollowerAsVoter(FollowerState state, long currentTimeMs) throws IOException { - if (state.hasFetchTimeoutExpired(currentTimeMs)) { + GracefulShutdown shutdown = this.shutdown.get(); + if (shutdown != null) { + // If we are a follower, then we can shutdown immediately. We want to + // skip the transition to candidate in any case. + return 0; + } else if (state.hasFetchTimeoutExpired(currentTimeMs)) { logger.info("Become candidate due to fetch timeout"); transitionToCandidate(currentTimeMs); return 0L; @@ -1636,7 +1658,6 @@ public class KafkaRaftClient implements RaftClient { state.leaderId(), this::buildFetchRequest ); - } return Math.min(backoffMs, state.remainingFetchTimeMs(currentTimeMs)); } @@ -1644,7 +1665,13 @@ public class KafkaRaftClient implements RaftClient { private long pollVoted(long currentTimeMs) throws IOException { VotedState state = quorum.votedStateOrThrow(); - if (state.hasElectionTimeoutExpired(currentTimeMs)) { + GracefulShutdown shutdown = this.shutdown.get(); + + if (shutdown != null) { + // If shutting down, then remain in this state until either the + // shutdown completes or an epoch bump forces another state transition + return shutdown.remainingTimeMs(); + } else if (state.hasElectionTimeoutExpired(currentTimeMs)) { transitionToCandidate(currentTimeMs); return 0L; } else { @@ -1662,7 +1689,12 @@ public class KafkaRaftClient implements RaftClient { } private long pollUnattachedAsVoter(UnattachedState state, long currentTimeMs) throws IOException { - if (state.hasElectionTimeoutExpired(currentTimeMs)) { + GracefulShutdown shutdown = this.shutdown.get(); + if (shutdown != null) { + // If shutting down, then remain in this 
state until either the + // shutdown completes or an epoch bump forces another state transition + return shutdown.remainingTimeMs(); + } else if (state.hasElectionTimeoutExpired(currentTimeMs)) { transitionToCandidate(currentTimeMs); return 0L; } else { @@ -1686,6 +1718,8 @@ public class KafkaRaftClient implements RaftClient { return pollVoted(currentTimeMs); } else if (quorum.isUnattached()) { return pollUnattached(currentTimeMs); + } else if (quorum.isResigned()) { + return pollResigned(currentTimeMs); } else { throw new IllegalStateException("Unexpected quorum state " + quorum); } @@ -1713,26 +1747,48 @@ public class KafkaRaftClient implements RaftClient { }); } + private boolean maybeCompleteShutdown(long currentTimeMs) { + GracefulShutdown shutdown = this.shutdown.get(); + if (shutdown == null) { + return false; + } + + shutdown.update(currentTimeMs); + if (shutdown.hasTimedOut()) { + shutdown.failWithTimeout(); + return true; + } + + if (quorum.isObserver() + || quorum.remoteVoters().isEmpty() + || quorum.hasRemoteLeader()) { + + shutdown.complete(); + return true; + } + + return false; + } + public void poll() throws IOException { - GracefulShutdown gracefulShutdown = shutdown.get(); - if (gracefulShutdown != null) { - pollShutdown(gracefulShutdown); - } else { - pollListeners(); + pollListeners(); - long currentTimeMs = time.milliseconds(); - long pollTimeoutMs = pollCurrentState(currentTimeMs); - kafkaRaftMetrics.updatePollStart(currentTimeMs); + long currentTimeMs = time.milliseconds(); + if (maybeCompleteShutdown(currentTimeMs)) { + return; + } - List inboundMessages = channel.receive(pollTimeoutMs); + long pollTimeoutMs = pollCurrentState(currentTimeMs); + kafkaRaftMetrics.updatePollStart(currentTimeMs); + List inboundMessages = channel.receive(pollTimeoutMs); + + currentTimeMs = time.milliseconds(); + kafkaRaftMetrics.updatePollEnd(currentTimeMs); + + for (RaftMessage message : inboundMessages) { + handleInboundMessage(message, currentTimeMs); 
currentTimeMs = time.milliseconds(); - kafkaRaftMetrics.updatePollEnd(currentTimeMs); - - for (RaftMessage message : inboundMessages) { - handleInboundMessage(message, currentTimeMs); - currentTimeMs = time.milliseconds(); - } } } @@ -1777,26 +1833,27 @@ public class KafkaRaftClient implements RaftClient { this.completeFuture = completeFuture; } - public void update() { - finishTimer.update(); - if (finishTimer.isExpired()) { - close(); - logger.warn("Graceful shutdown timed out after {}ms", finishTimer.timeoutMs()); - completeFuture.completeExceptionally( - new TimeoutException("Timeout expired before shutdown completed")); - } + public void update(long currentTimeMs) { + finishTimer.update(currentTimeMs); + } + + public boolean hasTimedOut() { + return finishTimer.isExpired(); } public boolean isFinished() { return completeFuture.isDone(); } - public boolean succeeded() { - return isFinished() && !failed(); + public long remainingTimeMs() { + return finishTimer.remainingMs(); } - public boolean failed() { - return completeFuture.isCompletedExceptionally(); + public void failWithTimeout() { + close(); + logger.warn("Graceful shutdown timed out after {}ms", finishTimer.timeoutMs()); + completeFuture.completeExceptionally( + new TimeoutException("Timeout expired before graceful shutdown completed")); } public void complete() { diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java index 720d82b586f..d1eea74dbb5 100644 --- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java +++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java @@ -21,7 +21,9 @@ import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import java.io.IOException; +import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Optional; import java.util.OptionalInt; import java.util.Random; @@ -29,10 +31,11 @@ import java.util.Set; import java.util.stream.Collectors; 
/** - * This class is responsible for managing the current state of this node and ensuring only - * valid state transitions. + * This class is responsible for managing the current state of this node and ensuring + * only valid state transitions. Below we define the possible state transitions and + * how they are triggered: * - * Unattached => + * Unattached|Resigned => * Unattached: After learning of a new election with a higher epoch * Voted: After granting a vote to a candidate * Candidate: After expiration of the election timeout @@ -49,15 +52,16 @@ import java.util.stream.Collectors; * * Leader => * Unattached: After learning of a new election with a higher epoch + * Resigned: When shutting down gracefully * * Follower => * Unattached: After learning of a new election with a higher epoch * Candidate: After expiration of the fetch timeout * Follower: After discovering a leader with a larger epoch * - * Observers follow a simpler state machine. The Voted/Candidate/Leader states - * are not possible for observers, so the only transitions that are possible are - * between Unattached and Follower. + * Observers follow a simpler state machine. The Voted/Candidate/Leader/Resigned + * states are not possible for observers, so the only transitions that are possible + * are between Unattached and Follower. * * Unattached => * Unattached: After learning of a new election with a higher epoch @@ -136,16 +140,19 @@ public class QuorumState { randomElectionTimeoutMs() ); } else if (election.isLeader(localId)) { - // If we were previously a leader, then we will start out as unattached - // in the same epoch. This protects the invariant that each record - // is uniquely identified by offset and epoch, which might otherwise - // be violated if unflushed data is lost after restarting. - initialState = new UnattachedState( + // If we were previously a leader, then we will start out as resigned + // in the same epoch. This serves two purposes: + // 1. 
It ensures that we cannot vote for another leader in the same epoch. + // 2. It protects the invariant that each record is uniquely identified by + // offset and epoch, which might otherwise be violated if unflushed data + // is lost after restarting. + initialState = new ResignedState( time, + localId, election.epoch, voters, - Optional.empty(), - randomElectionTimeoutMs() + randomElectionTimeoutMs(), + Collections.emptyList() ); } else if (election.isVotedCandidate(localId)) { initialState = new CandidateState( @@ -232,9 +239,28 @@ public class QuorumState { return !isVoter(); } + public void transitionToResigned(List preferredSuccessors) { + if (!isLeader()) { + throw new IllegalStateException("Invalid transition to Resigned state from " + state); + } + + // The Resigned state is a soft state which does not need to be persisted. + // A leader will always be re-initialized in this state. + int epoch = state.epoch(); + this.state = new ResignedState( + time, + localId, + epoch, + voters, + randomElectionTimeoutMs(), + preferredSuccessors + ); + log.info("Completed transition to {}", state); + } + /** - * Transition to the "unattached" state. This means we have found an epoch strictly larger - * than what is currently known, but wo do not yet know of an elected leader. + * Transition to the "unattached" state. This means we have found an epoch greater than + * or equal to the current epoch, but we do not yet know of the elected leader. 
*/ public void transitionToUnattached(int epoch) throws IOException { int currentEpoch = state.epoch(); @@ -434,6 +460,12 @@ public class QuorumState { throw new IllegalStateException("Expected to be Leader, but current state is " + state); } + public ResignedState resignedStateOrThrow() { + if (isResigned()) + return (ResignedState) state; + throw new IllegalStateException("Expected to be Resigned, but current state is " + state); + } + public CandidateState candidateStateOrThrow() { if (isCandidate()) return (CandidateState) state; @@ -461,6 +493,10 @@ public class QuorumState { return state instanceof LeaderState; } + public boolean isResigned() { + return state instanceof ResignedState; + } + public boolean isCandidate() { return state instanceof CandidateState; } diff --git a/raft/src/main/java/org/apache/kafka/raft/ResignedState.java b/raft/src/main/java/org/apache/kafka/raft/ResignedState.java new file mode 100644 index 00000000000..0dcb719a42e --- /dev/null +++ b/raft/src/main/java/org/apache/kafka/raft/ResignedState.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * This state represents a leader which has fenced itself either because it + * is shutting down or because it has encountered a soft failure of some sort. + * No writes are accepted in this state and we are not permitted to vote for + * any other candidate in this epoch. + * + * A resigned leader may initiate a new election by sending `EndQuorumEpoch` + * requests to all of the voters. This state tracks delivery of this request + * in order to prevent unnecessary retries. + * + * A voter will remain in the `Resigned` state until we either learn about + * another election, or our own election timeout expires and we become a + * Candidate. + */ +public class ResignedState implements EpochState { + private final int localId; + private final int epoch; + private final Set voters; + private final long electionTimeoutMs; + private final Set unackedVoters; + private final Timer electionTimer; + private final List preferredSuccessors; + + public ResignedState( + Time time, + int localId, + int epoch, + Set voters, + long electionTimeoutMs, + List preferredSuccessors + ) { + this.localId = localId; + this.epoch = epoch; + this.voters = voters; + this.unackedVoters = new HashSet<>(voters); + this.unackedVoters.remove(localId); + this.electionTimeoutMs = electionTimeoutMs; + this.electionTimer = time.timer(electionTimeoutMs); + this.preferredSuccessors = preferredSuccessors; + } + + @Override + public ElectionState election() { + return ElectionState.withElectedLeader(epoch, localId, voters); + } + + @Override + public int epoch() { + return epoch; + } + + /** + * Get the set of voters which have yet to acknowledge the resignation. 
+ * This node will send `EndQuorumEpoch` requests to this set until these + * voters acknowledge the request or we transition to another state. + * + * @return the set of unacknowledged voters + */ + public Set unackedVoters() { + return unackedVoters; + } + + /** + * Invoked after receiving a successful `EndQuorumEpoch` response. This + * is in order to prevent unnecessary retries. + * + * @param voterId the ID of the voter that sent the successful response + */ + public void acknowledgeResignation(int voterId) { + if (!voters.contains(voterId)) { + throw new IllegalArgumentException("Attempt to acknowledge delivery of `EndQuorumEpoch` " + + "by a non-voter " + voterId); + } + unackedVoters.remove(voterId); + } + + /** + * Check whether the timeout has expired. + * + * @param currentTimeMs current time in milliseconds + * @return true if the timeout has expired, false otherwise + */ + public boolean hasElectionTimeoutExpired(long currentTimeMs) { + electionTimer.update(currentTimeMs); + return electionTimer.isExpired(); + } + + /** + * Check the time remaining until the timeout expires. 
+ * + * @param currentTimeMs current time in milliseconds + * @return the duration in milliseconds from the current time before the timeout expires + */ + public long remainingElectionTimeMs(long currentTimeMs) { + electionTimer.update(currentTimeMs); + return electionTimer.remainingMs(); + } + + public List preferredSuccessors() { + return preferredSuccessors; + } + + @Override + public String name() { + return "Resigned"; + } + + @Override + public String toString() { + return "ResignedState(" + + "localId=" + localId + + ", epoch=" + epoch + + ", voters=" + voters + + ", electionTimeoutMs=" + electionTimeoutMs + + ", unackedVoters=" + unackedVoters + + ", preferredSuccessors=" + preferredSuccessors + + ')'; + } +} diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index 57b13822b0b..126a380d397 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -88,7 +88,57 @@ public class KafkaRaftClientTest { } @Test - public void testInitializeAsLeaderFromStateStore() throws Exception { + public void testRejectVotesFromSameEpochAfterResigningLeadership() throws Exception { + int localId = 0; + int remoteId = 1; + Set voters = Utils.mkSet(localId, remoteId); + int epoch = 2; + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + .updateRandom(random -> { + Mockito.doReturn(0).when(random).nextInt(DEFAULT_ELECTION_TIMEOUT_MS); + }) + .withElectedLeader(epoch, localId) + .build(); + + assertEquals(0L, context.log.endOffset().offset); + context.assertElectedLeader(epoch, localId); + + // Since we were the leader in epoch 2, we should ensure that we will not vote for any + // other voter in the same epoch, even if it has caught up to the same position. 
+ context.deliverRequest(context.voteRequest(epoch, remoteId, + context.log.lastFetchedEpoch(), context.log.endOffset().offset)); + context.client.poll(); + context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.of(localId), false); + } + + @Test + public void testRejectVotesFromSameEpochAfterResigningCandidacy() throws Exception { + int localId = 0; + int remoteId = 1; + Set voters = Utils.mkSet(localId, remoteId); + int epoch = 2; + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + .updateRandom(random -> { + Mockito.doReturn(0).when(random).nextInt(DEFAULT_ELECTION_TIMEOUT_MS); + }) + .withVotedCandidate(epoch, localId) + .build(); + + assertEquals(0L, context.log.endOffset().offset); + context.assertVotedCandidate(epoch, localId); + + // Since we were a candidate in epoch 2, we should ensure that we will not vote for any + // other voter in the same epoch, even if it has caught up to the same position. + context.deliverRequest(context.voteRequest(epoch, remoteId, + context.log.lastFetchedEpoch(), context.log.endOffset().offset)); + context.client.poll(); + context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.empty(), false); + } + + @Test - public void testInitializeAsLeaderFromStateStore() throws Exception { + public void testInitializeAsResignedLeaderFromStateStore() throws Exception { int localId = 0; Set voters = Utils.mkSet(localId, 1); int epoch = 2; @@ -100,14 +150,63 @@ public class KafkaRaftClientTest { .withElectedLeader(epoch, localId) .build(); + // The node will remain elected, but start up in a resigned state + // in which no additional writes are accepted. 
assertEquals(0L, context.log.endOffset().offset); - context.assertUnknownLeader(epoch); + context.assertElectedLeader(epoch, localId); + context.client.poll(); + assertEquals(Long.MAX_VALUE, context.client.scheduleAppend(epoch, Arrays.asList("a", "b"))); + + context.pollUntilSend(); + int correlationId = context.assertSentEndQuorumEpochRequest(epoch, 1); + context.deliverResponse(correlationId, 1, context.endEpochResponse(epoch, OptionalInt.of(localId))); + context.client.poll(); context.time.sleep(context.electionTimeoutMs); context.pollUntilSend(); + context.assertVotedCandidate(epoch + 1, localId); context.assertSentVoteRequest(epoch + 1, 0, 0L); } + @Test + public void testEndQuorumEpochRetriesWhileResigned() throws Exception { + int localId = 0; + int voter1 = 1; + int voter2 = 2; + Set voters = Utils.mkSet(localId, voter1, voter2); + int epoch = 19; + + // Start off as leader so that we will initialize in the Resigned state. + // Note that we intentionally set a request timeout which is smaller than + // the election timeout so that we can still be in the Resigned state and + // verify retry behavior. + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + .withElectionTimeoutMs(10000) + .withRequestTimeoutMs(5000) + .withElectedLeader(epoch, localId) + .build(); + + context.pollUntilSend(); + List requests = context.collectEndQuorumRequests(epoch, Utils.mkSet(voter1, voter2)); + assertEquals(2, requests.size()); + + // Respond to one of the requests so that we can verify that no additional + // request to this node is sent. 
+ RaftRequest.Outbound endEpochOutbound = requests.get(0); + context.deliverResponse(endEpochOutbound.correlationId, endEpochOutbound.destinationId(), + context.endEpochResponse(epoch, OptionalInt.of(localId))); + context.client.poll(); + assertEquals(Collections.emptyList(), context.channel.drainSendQueue()); + + // Now sleep for the request timeout and verify that we get only one + // retried request to the voter that hasn't responded yet. + int nonRespondedId = requests.get(1).destinationId(); + context.time.sleep(6000); + context.pollUntilSend(); + List retries = context.collectEndQuorumRequests(epoch, Utils.mkSet(nonRespondedId)); + assertEquals(1, retries.size()); + } + @Test public void testInitializeAsCandidateFromStateStore() throws Exception { int localId = 0; @@ -117,12 +216,11 @@ public class KafkaRaftClientTest { RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) .withVotedCandidate(2, localId) .build(); - + context.assertVotedCandidate(2, localId); assertEquals(0L, context.log.endOffset().offset); - // Send out vote requests.
- context.client.poll(); - + // The candidate will resume the election after reinitialization + context.pollUntilSend(); List voteRequests = context.collectVoteRequests(2, 0, 0); assertEquals(2, voteRequests.size()); } @@ -205,10 +303,10 @@ public class KafkaRaftClientTest { } @Test - public void testEndQuorumIgnoredIfAlreadyCandidate() throws Exception { + public void testEndQuorumIgnoredAsCandidateIfOlderEpoch() throws Exception { int localId = 0; int otherNodeId = 1; - int epoch = 2; + int epoch = 5; int jitterMs = 85; Set voters = Utils.mkSet(localId, otherNodeId); @@ -216,14 +314,19 @@ public class KafkaRaftClientTest { .updateRandom(random -> { Mockito.doReturn(jitterMs).when(random).nextInt(Mockito.anyInt()); }) - .withVotedCandidate(epoch, localId) + .withUnknownLeader(epoch - 1) .build(); - context.deliverRequest(context.endEpochRequest(epoch, OptionalInt.empty(), - otherNodeId, Collections.singletonList(context.localId))); + // Sleep a little to ensure that we become a candidate + context.time.sleep(context.electionTimeoutMs + jitterMs); + context.client.poll(); + context.assertVotedCandidate(epoch, localId); + + context.deliverRequest(context.endEpochRequest(epoch - 2, otherNodeId, + Collections.singletonList(context.localId))); context.client.poll(); - context.assertSentEndQuorumEpochResponse(Errors.NONE, epoch, OptionalInt.empty()); + context.assertSentEndQuorumEpochResponse(Errors.FENCED_LEADER_EPOCH, epoch, OptionalInt.empty()); // We should still be candidate until expiration of election timeout context.time.sleep(context.electionTimeoutMs + jitterMs - 1); @@ -242,21 +345,20 @@ public class KafkaRaftClientTest { } @Test - public void testEndQuorumIgnoredIfAlreadyLeader() throws Exception { + public void testEndQuorumIgnoredAsLeaderIfOlderEpoch() throws Exception { int localId = 0; int voter2 = localId + 1; int voter3 = localId + 2; - int epoch = 2; + int epoch = 7; Set voters = Utils.mkSet(localId, voter2, voter3); RaftClientTestContext context = 
RaftClientTestContext.initializeAsLeader(localId, voters, epoch); - // One of the voters may have sent EndEpoch as a candidate because it - // had not yet been notified that the local node was the leader. - context.deliverRequest(context.endEpochRequest(epoch, OptionalInt.empty(), voter2, Arrays.asList(context.localId, voter3))); + // One of the voters may have sent EndQuorumEpoch from an earlier epoch + context.deliverRequest(context.endEpochRequest(epoch - 2, voter2, Arrays.asList(context.localId, voter3))); context.client.poll(); - context.assertSentEndQuorumEpochResponse(Errors.NONE, epoch, OptionalInt.of(context.localId)); + context.assertSentEndQuorumEpochResponse(Errors.FENCED_LEADER_EPOCH, epoch, OptionalInt.of(context.localId)); // We should still be leader as long as fetch timeout has not expired context.time.sleep(context.fetchTimeoutMs - 1); @@ -264,27 +366,6 @@ public class KafkaRaftClientTest { context.assertElectedLeader(epoch, context.localId); } - @Test - public void testEndQuorumStartsNewElectionAfterBackoffIfReceivedFromVotedCandidate() throws Exception { - int localId = 0; - int otherNodeId = 1; - int epoch = 2; - Set voters = Utils.mkSet(localId, otherNodeId); - - RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .withVotedCandidate(epoch, otherNodeId) - .build(); - - context.deliverRequest(context.endEpochRequest(epoch, OptionalInt.empty(), - otherNodeId, Collections.singletonList(context.localId))); - context.client.poll(); - context.assertSentEndQuorumEpochResponse(Errors.NONE, epoch, OptionalInt.empty()); - - context.time.sleep(context.electionBackoffMaxMs); - context.client.poll(); - context.assertVotedCandidate(epoch + 1, context.localId); - } - @Test public void testEndQuorumStartsNewElectionImmediatelyIfFollowerUnattached() throws Exception { int localId = 0; @@ -297,7 +378,8 @@ public class KafkaRaftClientTest { .withUnknownLeader(epoch) .build(); - 
context.deliverRequest(context.endEpochRequest(epoch, OptionalInt.of(voter2), voter2, Arrays.asList(context.localId, voter3))); + context.deliverRequest(context.endEpochRequest(epoch, voter2, + Arrays.asList(context.localId, voter3))); context.client.poll(); context.assertSentEndQuorumEpochResponse(Errors.NONE, epoch, OptionalInt.of(voter2)); @@ -406,7 +488,8 @@ public class KafkaRaftClientTest { .withElectedLeader(leaderEpoch, oldLeaderId) .build(); - context.deliverRequest(context.endEpochRequest(leaderEpoch, OptionalInt.of(oldLeaderId), oldLeaderId, Collections.singletonList(context.localId))); + context.deliverRequest(context.endEpochRequest(leaderEpoch, oldLeaderId, + Collections.singletonList(context.localId))); context.client.poll(); context.assertSentEndQuorumEpochResponse(Errors.NONE, leaderEpoch, OptionalInt.of(oldLeaderId)); @@ -427,8 +510,8 @@ public class KafkaRaftClientTest { .withElectedLeader(leaderEpoch, oldLeaderId) .build(); - context.deliverRequest(context.endEpochRequest(leaderEpoch, - OptionalInt.of(oldLeaderId), oldLeaderId, Arrays.asList(preferredNextLeader, context.localId))); + context.deliverRequest(context.endEpochRequest(leaderEpoch, oldLeaderId, + Arrays.asList(preferredNextLeader, context.localId))); context.pollUntilSend(); context.assertSentEndQuorumEpochResponse(Errors.NONE, leaderEpoch, OptionalInt.of(oldLeaderId)); @@ -896,7 +979,7 @@ public class KafkaRaftClientTest { context.client.poll(); context.assertSentBeginQuorumEpochResponse(Errors.INCONSISTENT_VOTER_SET, epoch, OptionalInt.of(context.localId)); - context.deliverRequest(context.endEpochRequest(epoch, OptionalInt.of(context.localId), nonVoterId, Collections.singletonList(otherNodeId))); + context.deliverRequest(context.endEpochRequest(epoch, nonVoterId, Collections.singletonList(otherNodeId))); context.client.poll(); // The sent request has no context.localId as a preferable voter. 
@@ -1080,16 +1163,19 @@ public class KafkaRaftClientTest { int voter2 = localId + 1; int voter3 = localId + 2; int epoch = 5; - // This node initializes as a candidate Set voters = Utils.mkSet(voter1, voter2, voter3); RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .withVotedCandidate(epoch, voter1) + .withUnknownLeader(epoch - 1) .build(); - context.assertVotedCandidate(epoch, voter1); + context.assertUnknownLeader(epoch - 1); + + // Sleep a little to ensure that we become a candidate + context.time.sleep(context.electionTimeoutMs * 2); // Wait until the vote requests are inflight context.pollUntilSend(); + context.assertVotedCandidate(epoch, context.localId); List voteRequests = context.collectVoteRequests(epoch, 0, 0); assertEquals(2, voteRequests.size()); @@ -1199,10 +1285,10 @@ public class KafkaRaftClientTest { assertFalse(shutdownFuture.isDone()); // Send EndQuorumEpoch request to the other voter - context.client.poll(); + context.pollUntilSend(); assertTrue(context.client.isShuttingDown()); assertTrue(context.client.isRunning()); - context.assertSentEndQuorumEpochRequest(1, OptionalInt.of(context.localId), otherNodeId); + context.assertSentEndQuorumEpochRequest(1, otherNodeId); // We should still be able to handle vote requests during graceful shutdown // in order to help the new leader get elected @@ -1241,11 +1327,11 @@ public class KafkaRaftClientTest { assertTrue(context.client.isRunning()); // Send EndQuorumEpoch request to the close follower - context.client.poll(); + context.pollUntilSend(); assertTrue(context.client.isRunning()); - List endQuorumRequests = - context.collectEndQuorumRequests(1, OptionalInt.of(context.localId), Utils.mkSet(closeFollower, laggingFollower)); + List endQuorumRequests = context.collectEndQuorumRequests( + 1, Utils.mkSet(closeFollower, laggingFollower)); assertEquals(2, endQuorumRequests.size()); } @@ -1312,10 +1398,10 @@ public class KafkaRaftClientTest { 
assertFalse(shutdownFuture.isDone()); // Send EndQuorumEpoch request to the other vote - context.client.poll(); + context.pollUntilSend(); assertTrue(context.client.isRunning()); - context.assertSentEndQuorumEpochRequest(epoch, OptionalInt.of(context.localId), otherNodeId); + context.assertSentEndQuorumEpochRequest(epoch, otherNodeId); // The shutdown timeout is hit before we receive any requests or responses indicating an epoch bump context.time.sleep(shutdownTimeoutMs); @@ -1351,6 +1437,31 @@ public class KafkaRaftClientTest { assertNull(shutdownFuture.get()); } + @Test + public void testObserverGracefulShutdown() throws Exception { + int localId = 0; + int voter1 = 1; + int voter2 = 2; + Set voters = Utils.mkSet(voter1, voter2); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + .withUnknownLeader(5) + .build(); + context.client.poll(); + context.assertUnknownLeader(5); + + // Observer shutdown should complete immediately even if the + // current leader is unknown + CompletableFuture shutdownFuture = context.client.shutdown(5000); + assertTrue(context.client.isRunning()); + assertFalse(shutdownFuture.isDone()); + + context.client.poll(); + assertFalse(context.client.isRunning()); + assertTrue(shutdownFuture.isDone()); + assertNull(shutdownFuture.get()); + } + @Test public void testGracefulShutdownSingleMemberQuorum() throws IOException { int localId = 0; @@ -1663,12 +1774,14 @@ public class KafkaRaftClientTest { Set voters = Utils.mkSet(localId, otherNodeId); RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .withVotedCandidate(epoch, localId) + .withUnknownLeader(epoch - 1) .build(); + // Sleep a little to ensure that we become a candidate + context.time.sleep(context.electionTimeoutMs * 2); + context.pollUntilSend(); context.assertVotedCandidate(epoch, context.localId); - context.pollUntilSend(); int correlationId = context.assertSentVoteRequest(epoch, 0, 0L); VoteResponseData response = 
new VoteResponseData() .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code()); @@ -1689,7 +1802,7 @@ public class KafkaRaftClientTest { context.client.shutdown(5000); context.pollUntilSend(); - int correlationId = context.assertSentEndQuorumEpochRequest(epoch, OptionalInt.of(context.localId), otherNodeId); + int correlationId = context.assertSentEndQuorumEpochRequest(epoch, otherNodeId); EndQuorumEpochResponseData response = new EndQuorumEpochResponseData() .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code()); diff --git a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java index c2a1ba49275..a0e9e14d5ba 100644 --- a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java @@ -136,12 +136,13 @@ public class QuorumStateTest { } @Test - public void testInitializeAsCandidate() throws IOException { + public void testInitializeAsResignedCandidate() throws IOException { int node1 = 1; int node2 = 2; int epoch = 5; Set voters = Utils.mkSet(localId, node1, node2); - store.writeElectionState(ElectionState.withVotedCandidate(epoch, localId, voters)); + ElectionState election = ElectionState.withVotedCandidate(epoch, localId, voters); + store.writeElectionState(election); int jitterMs = 2500; Mockito.doReturn(jitterMs).when(random).nextInt(Mockito.anyInt()); @@ -153,6 +154,7 @@ public class QuorumStateTest { CandidateState candidateState = state.candidateStateOrThrow(); assertEquals(epoch, candidateState.epoch()); + assertEquals(election, candidateState.election()); assertEquals(Utils.mkSet(node1, node2), candidateState.unrecordedVoters()); assertEquals(Utils.mkSet(localId), candidateState.grantingVoters()); assertEquals(Collections.emptySet(), candidateState.rejectingVoters()); @@ -161,16 +163,17 @@ public class QuorumStateTest { } @Test - public void testInitializeAsLeader() throws IOException { + public void 
testInitializeAsResignedLeader() throws IOException { int node1 = 1; int node2 = 2; int epoch = 5; Set voters = Utils.mkSet(localId, node1, node2); - store.writeElectionState(ElectionState.withElectedLeader(epoch, localId, voters)); + ElectionState election = ElectionState.withElectedLeader(epoch, localId, voters); + store.writeElectionState(election); - // If we were previously a leader, we will start as unattached - // so that records are always uniquely defined by epoch and offset - // even accounting for the loss of unflushed data. + // If we were previously a leader, we will start as resigned in order to ensure + // a new leader gets elected. This ensures that records are always uniquely + // defined by epoch and offset even accounting for the loss of unflushed data. // The election timeout should be reset after we become a candidate again int jitterMs = 2500; @@ -181,10 +184,12 @@ public class QuorumStateTest { assertFalse(state.isLeader()); assertEquals(epoch, state.epoch()); - UnattachedState unattachedState = state.unattachedStateOrThrow(); - assertEquals(epoch, unattachedState.epoch()); + ResignedState resignedState = state.resignedStateOrThrow(); + assertEquals(epoch, resignedState.epoch()); + assertEquals(election, resignedState.election()); + assertEquals(Utils.mkSet(node1, node2), resignedState.unackedVoters()); assertEquals(electionTimeoutMs + jitterMs, - unattachedState.remainingElectionTimeMs(time.milliseconds())); + resignedState.remainingElectionTimeMs(time.milliseconds())); } @Test @@ -230,6 +235,23 @@ public class QuorumStateTest { candidate2.remainingElectionTimeMs(time.milliseconds())); } + @Test + public void testCandidateToResigned() throws IOException { + int node1 = 1; + int node2 = 2; + Set voters = Utils.mkSet(localId, node1, node2); + assertNull(store.readElectionState()); + + QuorumState state = initializeEmptyState(voters); + state.transitionToCandidate(); + assertTrue(state.isCandidate()); + assertEquals(1, state.epoch()); + + 
assertThrows(IllegalStateException.class, () -> + state.transitionToResigned(Collections.singletonList(localId))); + assertTrue(state.isCandidate()); + } + @Test public void testCandidateToLeader() throws IOException { Set voters = Utils.mkSet(localId); @@ -339,6 +361,27 @@ public class QuorumStateTest { assertEquals(1, state.epoch()); } + @Test + public void testLeaderToResigned() throws IOException { + Set voters = Utils.mkSet(localId); + assertNull(store.readElectionState()); + + QuorumState state = initializeEmptyState(voters); + state.initialize(new OffsetAndEpoch(0L, logEndEpoch)); + state.transitionToCandidate(); + state.transitionToLeader(0L); + assertTrue(state.isLeader()); + assertEquals(1, state.epoch()); + + state.transitionToResigned(Collections.singletonList(localId)); + assertTrue(state.isResigned()); + ResignedState resignedState = state.resignedStateOrThrow(); + assertEquals(ElectionState.withElectedLeader(1, localId, voters), + resignedState.election()); + assertEquals(1, resignedState.epoch()); + assertEquals(Collections.emptySet(), resignedState.unackedVoters()); + } + @Test public void testLeaderToCandidate() throws IOException { Set voters = Utils.mkSet(localId); @@ -434,7 +477,7 @@ public class QuorumStateTest { } @Test - public void testUnattachedToLeader() throws IOException { + public void testUnattachedToLeaderOrResigned() throws IOException { int leaderId = 1; int epoch = 5; Set voters = Utils.mkSet(localId, leaderId); @@ -443,6 +486,7 @@ public class QuorumStateTest { state.initialize(new OffsetAndEpoch(0L, logEndEpoch)); assertTrue(state.isUnattached()); assertThrows(IllegalStateException.class, () -> state.transitionToLeader(0L)); + assertThrows(IllegalStateException.class, () -> state.transitionToResigned(Collections.emptyList())); } @Test @@ -568,7 +612,7 @@ public class QuorumStateTest { } @Test - public void testVotedToLeader() throws IOException { + public void testVotedToInvalidLeaderOrResigned() throws IOException { int node1 = 
1; int node2 = 2; Set voters = Utils.mkSet(localId, node1, node2); @@ -576,6 +620,7 @@ public class QuorumStateTest { state.initialize(new OffsetAndEpoch(0L, logEndEpoch)); state.transitionToVoted(5, node1); assertThrows(IllegalStateException.class, () -> state.transitionToLeader(0)); + assertThrows(IllegalStateException.class, () -> state.transitionToResigned(Collections.emptyList())); } @Test @@ -721,7 +766,7 @@ public class QuorumStateTest { } @Test - public void testFollowerToLeader() throws IOException { + public void testFollowerToLeaderOrResigned() throws IOException { int node1 = 1; int node2 = 2; Set voters = Utils.mkSet(localId, node1, node2); @@ -729,6 +774,7 @@ public class QuorumStateTest { state.initialize(new OffsetAndEpoch(0L, logEndEpoch)); state.transitionToFollower(8, node2); assertThrows(IllegalStateException.class, () -> state.transitionToLeader(0)); + assertThrows(IllegalStateException.class, () -> state.transitionToResigned(Collections.emptyList())); } @Test diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index 4e9b133d437..4d9db4aaf90 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -44,6 +44,7 @@ import org.apache.kafka.common.requests.BeginQuorumEpochRequest; import org.apache.kafka.common.requests.BeginQuorumEpochResponse; import org.apache.kafka.common.requests.DescribeQuorumResponse; import org.apache.kafka.common.requests.EndQuorumEpochRequest; +import org.apache.kafka.common.requests.EndQuorumEpochResponse; import org.apache.kafka.common.requests.VoteRequest; import org.apache.kafka.common.requests.VoteResponse; import org.apache.kafka.common.utils.LogContext; @@ -85,7 +86,7 @@ final class RaftClientTestContext { final int electionTimeoutMs = Builder.DEFAULT_ELECTION_TIMEOUT_MS; final int electionFetchMaxWaitMs = 
Builder.FETCH_MAX_WAIT_MS; final int fetchTimeoutMs = Builder.FETCH_TIMEOUT_MS; - final int requestTimeoutMs = Builder.REQUEST_TIMEOUT_MS; + final int requestTimeoutMs = Builder.DEFAULT_REQUEST_TIMEOUT_MS; final int retryBackoffMs = Builder.RETRY_BACKOFF_MS; private final QuorumStateStore quorumStateStore; @@ -107,7 +108,7 @@ final class RaftClientTestContext { private static final int FETCH_MAX_WAIT_MS = 0; // fetch timeout is usually larger than election timeout private static final int FETCH_TIMEOUT_MS = 50000; - private static final int REQUEST_TIMEOUT_MS = 5000; + private static final int DEFAULT_REQUEST_TIMEOUT_MS = 5000; private static final int RETRY_BACKOFF_MS = 50; private static final int DEFAULT_APPEND_LINGER_MS = 0; @@ -118,6 +119,7 @@ final class RaftClientTestContext { private final Set voters; private final int localId; + private int requestTimeoutMs = DEFAULT_REQUEST_TIMEOUT_MS; private int electionTimeoutMs = DEFAULT_ELECTION_TIMEOUT_MS; private int appendLingerMs = DEFAULT_APPEND_LINGER_MS; private MemoryPool memoryPool = MemoryPool.NONE; @@ -163,6 +165,16 @@ final class RaftClientTestContext { return this; } + Builder withElectionTimeoutMs(int electionTimeoutMs) { + this.electionTimeoutMs = electionTimeoutMs; + return this; + } + + Builder withRequestTimeoutMs(int requestTimeoutMs) { + this.requestTimeoutMs = requestTimeoutMs; + return this; + } + RaftClientTestContext build() throws IOException { Metrics metrics = new Metrics(time); MockNetworkChannel channel = new MockNetworkChannel(); @@ -190,7 +202,7 @@ final class RaftClientTestContext { voterAddresses, ELECTION_BACKOFF_MAX_MS, RETRY_BACKOFF_MS, - REQUEST_TIMEOUT_MS, + requestTimeoutMs, FETCH_MAX_WAIT_MS, appendLingerMs, logContext, @@ -462,9 +474,9 @@ final class RaftClientTestContext { assertEquals(partitionError, Errors.forCode(partitionResponse.errorCode())); } - int assertSentEndQuorumEpochRequest(int epoch, OptionalInt leaderId, int destinationId) { + int 
assertSentEndQuorumEpochRequest(int epoch, int destinationId) { List endQuorumRequests = collectEndQuorumRequests( - epoch, leaderId, Collections.singleton(destinationId)); + epoch, Collections.singleton(destinationId)); assertEquals(1, endQuorumRequests.size()); return endQuorumRequests.get(0).correlationId(); } @@ -553,7 +565,7 @@ final class RaftClientTestContext { assertSentFetchResponse(1L, epoch); } - List collectEndQuorumRequests(int epoch, OptionalInt leaderId, Set destinationIdSet) { + List collectEndQuorumRequests(int epoch, Set destinationIdSet) { List endQuorumRequests = new ArrayList<>(); Set collectedDestinationIdSet = new HashSet<>(); for (RaftMessage raftMessage : channel.drainSendQueue()) { @@ -564,8 +576,7 @@ final class RaftClientTestContext { request.topics().get(0).partitions().get(0); assertEquals(epoch, partitionRequest.leaderEpoch()); - assertEquals(leaderId.orElse(-1), partitionRequest.leaderId()); - assertEquals(localId, partitionRequest.replicaId()); + assertEquals(localId, partitionRequest.leaderId()); RaftRequest.Outbound outboundRequest = (RaftRequest.Outbound) raftMessage; collectedDestinationIdSet.add(outboundRequest.destinationId()); @@ -626,16 +637,28 @@ final class RaftClientTestContext { return new InetSocketAddress("localhost", 9990 + id); } + EndQuorumEpochResponseData endEpochResponse( + int epoch, + OptionalInt leaderId + ) { + return EndQuorumEpochResponse.singletonResponse( + Errors.NONE, + metadataPartition, + Errors.NONE, + epoch, + leaderId.orElse(-1) + ); + } + EndQuorumEpochRequestData endEpochRequest( int epoch, - OptionalInt leaderId, - int replicaId, - List preferredSuccessors) { + int leaderId, + List preferredSuccessors + ) { return EndQuorumEpochRequest.singletonRequest( metadataPartition, - replicaId, epoch, - leaderId.orElse(-1), + leaderId, preferredSuccessors ); } diff --git a/raft/src/test/java/org/apache/kafka/raft/ResignedStateTest.java b/raft/src/test/java/org/apache/kafka/raft/ResignedStateTest.java new 
file mode 100644 index 00000000000..20e2e8493d3 --- /dev/null +++ b/raft/src/test/java/org/apache/kafka/raft/ResignedStateTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft; + +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Utils; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class ResignedStateTest { + + private final MockTime time = new MockTime(); + + @Test + public void testResignedState() { + int electionTimeoutMs = 5000; + int localId = 0; + int remoteId = 1; + int epoch = 5; + Set voters = Utils.mkSet(localId, remoteId); + + ResignedState state = new ResignedState( + time, + localId, + epoch, + voters, + electionTimeoutMs, + Collections.emptyList() + ); + + assertEquals(ElectionState.withElectedLeader(epoch, localId, voters), state.election()); + assertEquals(epoch, state.epoch()); + + assertEquals(Collections.singleton(remoteId), state.unackedVoters()); + 
state.acknowledgeResignation(remoteId); + assertEquals(Collections.emptySet(), state.unackedVoters()); + + assertEquals(electionTimeoutMs, state.remainingElectionTimeMs(time.milliseconds())); + assertFalse(state.hasElectionTimeoutExpired(time.milliseconds())); + time.sleep(electionTimeoutMs / 2); + assertEquals(electionTimeoutMs / 2, state.remainingElectionTimeMs(time.milliseconds())); + assertFalse(state.hasElectionTimeoutExpired(time.milliseconds())); + time.sleep(electionTimeoutMs / 2); + assertEquals(0, state.remainingElectionTimeMs(time.milliseconds())); + assertTrue(state.hasElectionTimeoutExpired(time.milliseconds())); + } + +}