KAFKA-18345; Wait the entire election timeout on election loss (#19747)
CI / build (push) Waiting to run Details

Replaces the exponential backoff used by the candidate state after losing
an election with waiting for the rest of the election timeout. There is no
need for an exponential backoff when the election timeout already provides
a natural throttle and is randomized.

Reviewers: José Armando García Sancio <jsancio@apache.org>, TaiJuWu
 <tjwu1217@gmail.com>
This commit is contained in:
Alyssa Huang 2025-05-23 12:35:49 -04:00 committed by GitHub
parent af4d048da6
commit 97db06689b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 22 additions and 205 deletions

View File

@ -30,15 +30,12 @@ public class CandidateState implements NomineeState {
private final int localId;
private final Uuid localDirectoryId;
private final int epoch;
private final int retries;
private final EpochElection epochElection;
private final Optional<LogOffsetMetadata> highWatermark;
private final int electionTimeoutMs;
private final Timer electionTimer;
private final Timer backoffTimer;
private final Logger log;
private boolean isBackingOff;
/**
* The lifetime of a candidate state is the following.
*
@ -54,7 +51,6 @@ public class CandidateState implements NomineeState {
int epoch,
VoterSet voters,
Optional<LogOffsetMetadata> highWatermark,
int retries,
int electionTimeoutMs,
LogContext logContext
) {
@ -73,28 +69,14 @@ public class CandidateState implements NomineeState {
this.localDirectoryId = localDirectoryId;
this.epoch = epoch;
this.highWatermark = highWatermark;
this.retries = retries;
this.isBackingOff = false;
this.electionTimeoutMs = electionTimeoutMs;
this.electionTimer = time.timer(electionTimeoutMs);
this.backoffTimer = time.timer(0);
this.log = logContext.logger(CandidateState.class);
this.epochElection = new EpochElection(voters.voterKeys());
epochElection.recordVote(localId, true);
}
/**
 * Check if the candidate is backing off for the next election.
 *
 * @return the current value of the {@code isBackingOff} flag; the constructor initializes it to
 *         {@code false} and {@code startBackingOff} sets it to {@code true}
 */
public boolean isBackingOff() {
return isBackingOff;
}
/**
 * @return the {@code retries} value that was passed to the constructor
 */
public int retries() {
return retries;
}
@Override
public EpochElection epochElection() {
return epochElection;
@ -118,34 +100,12 @@ public class CandidateState implements NomineeState {
return epochElection().recordVote(remoteNodeId, false);
}
/**
 * Record the current election has failed since we've either received sufficient rejecting voters or election timed out
 *
 * @param currentTimeMs the time, in milliseconds, the backoff timer is updated to before it is reset
 * @param backoffDurationMs the duration, in milliseconds, the backoff timer is reset to
 */
public void startBackingOff(long currentTimeMs, long backoffDurationMs) {
// Bring the timer up to the current time first, then restart it with the backoff duration
this.backoffTimer.update(currentTimeMs);
this.backoffTimer.reset(backoffDurationMs);
// From here on isBackingOff() reports true
this.isBackingOff = true;
}
/**
 * Updates the election timer with the given time and reports whether it is expired.
 *
 * @param currentTimeMs the time, in milliseconds, the election timer is updated to
 * @return the result of {@code electionTimer.isExpired()} after the update
 */
@Override
public boolean hasElectionTimeoutExpired(long currentTimeMs) {
electionTimer.update(currentTimeMs);
return electionTimer.isExpired();
}
/**
 * Updates the backoff timer with the given time and reports whether it is expired.
 *
 * Note that this method does not consult the {@code isBackingOff} flag; it only inspects the timer.
 *
 * @param currentTimeMs the time, in milliseconds, the backoff timer is updated to
 * @return the result of {@code backoffTimer.isExpired()} after the update
 */
public boolean isBackoffComplete(long currentTimeMs) {
backoffTimer.update(currentTimeMs);
return backoffTimer.isExpired();
}
/**
 * Returns the time left on the backoff timer.
 *
 * @param currentTimeMs the time, in milliseconds, the backoff timer is updated to
 * @return the result of {@code backoffTimer.remainingMs()} after the update
 * @throws IllegalStateException if the candidate is not currently backing off
 */
public long remainingBackoffMs(long currentTimeMs) {
// Only meaningful once the isBackingOff flag has been set
if (!isBackingOff) {
throw new IllegalStateException("Candidate is not currently backing off");
}
backoffTimer.update(currentTimeMs);
return backoffTimer.remainingMs();
}
@Override
public long remainingElectionTimeMs(long currentTimeMs) {
electionTimer.update(currentTimeMs);
@ -201,12 +161,11 @@ public class CandidateState implements NomineeState {
@Override
public String toString() {
return String.format(
"CandidateState(localId=%d, localDirectoryId=%s, epoch=%d, retries=%d, epochElection=%s, " +
"CandidateState(localId=%d, localDirectoryId=%s, epoch=%d, epochElection=%s, " +
"highWatermark=%s, electionTimeoutMs=%d)",
localId,
localDirectoryId,
epoch,
retries,
epochElection(),
highWatermark,
electionTimeoutMs

View File

@ -1018,26 +1018,13 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
*/
private void maybeHandleElectionLoss(NomineeState state, long currentTimeMs) {
if (state instanceof CandidateState candidate) {
if (candidate.epochElection().isVoteRejected() && !candidate.isBackingOff()) {
if (candidate.epochElection().isVoteRejected()) {
logger.info(
"Insufficient remaining votes to become leader. We will backoff before retrying election again. " +
"Current epoch election state is {}.",
"Insufficient remaining votes to become leader. Candidate will wait the remaining election " +
"timeout ({}) before transitioning back to Prospective. Current epoch election state is {}.",
candidate.remainingElectionTimeMs(currentTimeMs),
candidate.epochElection()
);
// Go immediately to a random, exponential backoff. The backoff starts low to prevent
// needing to wait the entire election timeout when the vote result has already been
// determined. The randomness prevents the next election from being gridlocked with
// another nominee due to timing. The exponential aspect limits epoch churn when the
// replica has failed multiple elections in succession.
candidate.startBackingOff(
currentTimeMs,
RaftUtil.binaryExponentialElectionBackoffMs(
quorumConfig.electionBackoffMaxMs(),
RETRY_BACKOFF_BASE_MS,
candidate.retries(),
random
)
);
}
} else if (state instanceof ProspectiveState prospective) {
if (prospective.epochElection().isVoteRejected()) {
@ -3149,13 +3136,6 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
// 3) the shutdown timer expires
long minRequestBackoffMs = maybeSendVoteRequests(state, currentTimeMs);
return Math.min(shutdown.remainingTimeMs(), minRequestBackoffMs);
} else if (state.isBackingOff()) {
if (state.isBackoffComplete(currentTimeMs)) {
logger.info("Transition to prospective after election backoff has completed");
transitionToProspective(currentTimeMs);
return 0L;
}
return state.remainingBackoffMs(currentTimeMs);
} else if (state.hasElectionTimeoutExpired(currentTimeMs)) {
logger.info("Election was not granted, transitioning to prospective");
transitionToProspective(currentTimeMs);

View File

@ -37,7 +37,6 @@ public class ProspectiveState implements NomineeState {
private final VoterSet voters;
private final EpochElection epochElection;
private final Optional<LogOffsetMetadata> highWatermark;
private final int retries;
private final long electionTimeoutMs;
private final Timer electionTimer;
private final Logger log;
@ -60,7 +59,6 @@ public class ProspectiveState implements NomineeState {
Optional<ReplicaKey> votedKey,
VoterSet voters,
Optional<LogOffsetMetadata> highWatermark,
int retries,
int electionTimeoutMs,
LogContext logContext
) {
@ -71,7 +69,6 @@ public class ProspectiveState implements NomineeState {
this.votedKey = votedKey;
this.voters = voters;
this.highWatermark = highWatermark;
this.retries = retries;
this.electionTimeoutMs = electionTimeoutMs;
this.electionTimer = time.timer(electionTimeoutMs);
this.log = logContext.logger(ProspectiveState.class);
@ -89,10 +86,6 @@ public class ProspectiveState implements NomineeState {
return epochElection;
}
/**
 * @return the {@code retries} value that was passed to the constructor
 */
public int retries() {
return retries;
}
@Override
public boolean recordGrantedVote(int remoteNodeId) {
return epochElection().recordVote(remoteNodeId, true);
@ -160,11 +153,10 @@ public class ProspectiveState implements NomineeState {
@Override
public String toString() {
return String.format(
"ProspectiveState(epoch=%d, leaderId=%s, retries=%d, votedKey=%s, epochElection=%s, " +
"ProspectiveState(epoch=%d, leaderId=%s, votedKey=%s, epochElection=%s, " +
"electionTimeoutMs=%s, highWatermark=%s)",
epoch,
leaderId,
retries,
votedKey,
epochElection,
electionTimeoutMs,

View File

@ -200,7 +200,6 @@ public class QuorumState {
election.epoch(),
partitionState.lastVoterSet(),
Optional.empty(),
1,
randomElectionTimeoutMs(),
logContext
);
@ -481,6 +480,9 @@ public class QuorumState {
int epoch,
ReplicaKey candidateKey
) {
// Verify that the current state is prospective. This method should only be used to add voted state to
// a prospective state; transitions from other states to prospective use transitionToProspective instead.
prospectiveStateOrThrow();
int currentEpoch = state.epoch();
if (localId.isPresent() && candidateKey.id() == localId.getAsInt()) {
throw new IllegalStateException(
@ -505,7 +507,6 @@ public class QuorumState {
);
}
ProspectiveState prospectiveState = prospectiveStateOrThrow();
// Note that we reset the election timeout after voting for a candidate because we
// know that the candidate has at least as good of a chance of getting elected as us
durableTransitionTo(
@ -518,7 +519,6 @@ public class QuorumState {
Optional.of(candidateKey),
partitionState.lastVoterSet(),
state.highWatermark(),
prospectiveState.retries(),
randomElectionTimeoutMs(),
logContext
)
@ -620,8 +620,6 @@ public class QuorumState {
" is state " + state);
}
int retries = isCandidate() ? candidateStateOrThrow().retries() + 1 : 1;
// Durable transition is not necessary since there is no change to the persisted electionState
memoryTransitionTo(
new ProspectiveState(
@ -633,7 +631,6 @@ public class QuorumState {
votedKey(),
partitionState.lastVoterSet(),
state.highWatermark(),
retries,
randomElectionTimeoutMs(),
logContext
)
@ -646,8 +643,6 @@ public class QuorumState {
int newEpoch = epoch() + 1;
int electionTimeoutMs = randomElectionTimeoutMs();
int retries = isProspective() ? prospectiveStateOrThrow().retries() : 1;
durableTransitionTo(new CandidateState(
time,
localIdOrThrow(),
@ -655,7 +650,6 @@ public class QuorumState {
newEpoch,
partitionState.lastVoterSet(),
state.highWatermark(),
retries,
electionTimeoutMs,
logContext
));

View File

@ -48,7 +48,6 @@ import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
@ -756,18 +755,4 @@ public class RaftUtil {
data.topics().get(0).partitions().size() == 1 &&
data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition();
}
/**
 * Computes a randomized election backoff, in milliseconds.
 *
 * The result is the smaller of:
 * 1. an exponential term: {@code backoffBaseMs} times a random factor drawn from
 *    {@code [1, 2 << min(10, retries - 1))}
 * 2. a capped term: {@code backoffMaxMs} plus a random jitter drawn from {@code [0, backoffBaseMs)}
 * The jitter is added to prevent livelock of elections.
 *
 * @param backoffMaxMs the configured maximum backoff, before jitter
 * @param backoffBaseMs the base unit of the exponential term and the exclusive bound of the jitter
 * @param retries the number of election attempts so far; must be at least 1
 * @param random the source of randomness; the exponential factor is drawn first, then the jitter
 * @return the smaller of the exponential term and the jittered maximum
 * @throws IllegalArgumentException if {@code retries} is not larger than zero
 */
static int binaryExponentialElectionBackoffMs(int backoffMaxMs, int backoffBaseMs, int retries, Random random) {
    if (retries < 1) {
        throw new IllegalArgumentException("Retries " + retries + " should be larger than zero");
    }

    // The shift is capped at 10 so the random factor never exceeds 2047
    int cappedShift = Math.min(10, retries - 1);
    int exponentialBackoffMs = backoffBaseMs * random.nextInt(1, 2 << cappedShift);
    int jitteredMaxMs = backoffMaxMs + random.nextInt(backoffBaseMs);

    return exponentialBackoffMs < jitteredMaxMs ? exponentialBackoffMs : jitteredMaxMs;
}
}

View File

@ -50,7 +50,6 @@ public class CandidateStateTest {
epoch,
voters,
Optional.empty(),
1,
electionTimeoutMs,
logContext
);

View File

@ -1132,14 +1132,9 @@ class KafkaRaftClientTest {
context.client.poll();
context.assertVotedCandidate(epoch, localId);
// Enter the backoff period
// After election timeout, replica will become prospective again
context.time.sleep(1);
context.client.poll();
context.assertVotedCandidate(epoch, localId);
// After backoff, replica will become prospective again
context.time.sleep(context.electionBackoffMaxMs);
context.client.poll();
assertTrue(context.client.quorum().isProspective());
}
@ -1775,15 +1770,15 @@ class KafkaRaftClientTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testCandidateBackoffElection(boolean withKip853Rpc) throws Exception {
public void testCandidateWaitsRestOfElectionTimeoutAfterElectionLoss(boolean withKip853Rpc) throws Exception {
int localId = randomReplicaId();
int otherNodeId = localId + 1;
int epoch = 1;
int exponentialFactor = 85; // set it large enough so that replica will bound on jitter
int jitter = 85;
Set<Integer> voters = Set.of(localId, otherNodeId);
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
.updateRandom(r -> r.mockNextInt(exponentialFactor))
.updateRandom(r -> r.mockNextInt(jitter))
.withKip853Rpc(withKip853Rpc)
.build();
@ -1793,12 +1788,11 @@ class KafkaRaftClientTest {
context.pollUntilRequest();
context.assertVotedCandidate(epoch, localId);
CandidateState candidate = context.client.quorum().candidateStateOrThrow();
assertEquals(1, candidate.retries());
assertEquals(
context.electionTimeoutMs() + exponentialFactor,
context.electionTimeoutMs() + jitter,
candidate.remainingElectionTimeMs(context.time.milliseconds())
);
assertFalse(candidate.isBackingOff());
assertFalse(candidate.epochElection().isVoteRejected());
// Quorum size is two. If the other member rejects, then the local replica will lose the election.
RaftRequest.Outbound request = context.assertSentVoteRequest(epoch, 0, 0L, 1);
@ -1809,44 +1803,28 @@ class KafkaRaftClientTest {
);
context.client.poll();
assertTrue(candidate.isBackingOff());
assertEquals(
context.electionBackoffMaxMs + exponentialFactor,
candidate.remainingBackoffMs(context.time.milliseconds())
);
assertTrue(candidate.epochElection().isVoteRejected());
// Election is lost, but local replica should still remember that it has voted
context.assertVotedCandidate(epoch, localId);
// Even though candidacy was rejected, local replica will backoff for jitter period
// Even though candidacy was rejected, local replica will back off for the remaining election timeout
// before transitioning to prospective and starting a new election.
context.time.sleep(context.electionBackoffMaxMs + exponentialFactor - 1);
context.time.sleep(context.electionTimeoutMs() + jitter - 1);
context.client.poll();
context.assertVotedCandidate(epoch, localId);
// After jitter expires, become a prospective again
// After election timeout expires, become a prospective again
context.time.sleep(1);
context.client.poll();
assertTrue(context.client.quorum().isProspective());
ProspectiveState prospective = context.client.quorum().prospectiveStateOrThrow();
assertEquals(2, prospective.retries());
context.pollUntilRequest();
request = context.assertSentPreVoteRequest(epoch, 0, 0L, 1);
context.assertSentPreVoteRequest(epoch, 0, 0L, 1);
assertEquals(
context.electionTimeoutMs() + exponentialFactor,
context.electionTimeoutMs() + jitter,
prospective.remainingElectionTimeMs(context.time.milliseconds())
);
// After becoming candidate again, retries should be 2
context.deliverResponse(
request.correlationId(),
request.destination(),
context.voteResponse(true, OptionalInt.empty(), 1)
);
context.client.poll();
context.assertVotedCandidate(epoch + 1, localId);
candidate = context.client.quorum().candidateStateOrThrow();
assertEquals(2, candidate.retries());
}
@ParameterizedTest
@ -1870,12 +1848,11 @@ class KafkaRaftClientTest {
context.assertVotedCandidate(epoch, localId);
context.assertSentVoteRequest(epoch, 0, 0L, 1);
CandidateState candidate = context.client.quorum().candidateStateOrThrow();
assertEquals(1, candidate.retries());
assertEquals(
context.electionTimeoutMs() + jitter,
candidate.remainingElectionTimeMs(context.time.milliseconds())
);
assertFalse(candidate.isBackingOff());
assertFalse(candidate.epochElection().isVoteRejected());
// If election times out, replica transition to prospective without any additional backoff
context.time.sleep(candidate.remainingElectionTimeMs(context.time.milliseconds()));
@ -1883,7 +1860,6 @@ class KafkaRaftClientTest {
assertTrue(context.client.quorum().isProspective());
ProspectiveState prospective = context.client.quorum().prospectiveStateOrThrow();
assertEquals(2, prospective.retries());
context.pollUntilRequest();
context.assertSentPreVoteRequest(epoch, 0, 0L, 1);
assertEquals(

View File

@ -48,9 +48,4 @@ class MockableRandom extends Random {
// Returns the value produced by nextIntFunction for this bound when one is present; otherwise
// returns a value drawn from the superclass generator.
public int nextInt(int bound) {
// NOTE: the argument to orElse is evaluated before orElse is invoked, so super.nextInt(bound)
// runs on every call, even when nextIntFunction supplies a value
return nextIntFunction.apply(bound).orElse(super.nextInt(bound));
}
/**
 * Returns the value produced by {@code nextIntFunction} for {@code bound} when one is present;
 * otherwise returns a value drawn from the superclass generator in {@code [origin, bound)}.
 *
 * The fallback is evaluated before {@code orElse} is invoked, so the superclass generator is
 * advanced on every call, matching the single-argument overload.
 *
 * @param origin the inclusive lower bound used by the fallback
 * @param bound the exclusive upper bound; also the key passed to {@code nextIntFunction}
 * @return the value from {@code nextIntFunction} if present, else a value in {@code [origin, bound)}
 */
@Override
public int nextInt(int origin, int bound) {
    // Delegate to the two-argument superclass method so the fallback honors origin;
    // super.nextInt(bound) could return values below origin
    return nextIntFunction.apply(bound).orElse(super.nextInt(origin, bound));
}
}

View File

@ -71,7 +71,6 @@ public class ProspectiveStateTest {
votedKey,
voters,
Optional.empty(),
1,
electionTimeoutMs,
logContext
);
@ -87,7 +86,6 @@ public class ProspectiveStateTest {
Optional.empty(),
voters,
Optional.empty(),
1,
electionTimeoutMs,
logContext
);

View File

@ -59,22 +59,15 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Stream;
import static org.apache.kafka.raft.KafkaRaftClient.RETRY_BACKOFF_BASE_MS;
import static org.apache.kafka.raft.RaftUtil.binaryExponentialElectionBackoffMs;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class RaftUtilTest {
@ -628,60 +621,6 @@ public class RaftUtilTest {
assertEquals(expectedJson, json.toString());
}
// Verifies the upper bound that binaryExponentialElectionBackoffMs passes to its first
// random.nextInt(origin, bound) call for each retry count.
@ParameterizedTest
@ValueSource(ints = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13})
public void testExponentialBoundOfExponentialElectionBackoffMs(int retries) {
Random mockedRandom = Mockito.mock(Random.class);
int electionBackoffMaxMs = 1000;
// test the bound of the method's first call to random.nextInt
binaryExponentialElectionBackoffMs(electionBackoffMaxMs, RETRY_BACKOFF_BASE_MS, retries, mockedRandom);
ArgumentCaptor<Integer> nextIntCaptor = ArgumentCaptor.forClass(Integer.class);
// the origin argument must be 1; the bound argument is captured for inspection
Mockito.verify(mockedRandom).nextInt(Mockito.eq(1), nextIntCaptor.capture());
int actualBound = nextIntCaptor.getValue();
// the expected bound doubles with each retry, i.e. 2^retries
int expectedBound = (int) (2 * Math.pow(2, retries - 1));
// after the 10th retry, the bound of the first call to random.nextInt will remain capped to
// (2 << 10) = 2048 to prevent overflow
if (retries > 10) {
expectedBound = 2048;
}
assertEquals(expectedBound, actualBound, "Incorrect bound for retries=" + retries);
}
// test that the return value of the method is capped to QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG + jitter
// any exponential >= (1000 + jitter)/(RETRY_BACKOFF_BASE_MS)=21 will result in this cap
@ParameterizedTest
@ValueSource(ints = {1, 2, 20, 21, 22, 2048})
public void testExponentialElectionBackoffMsIsCapped(int exponential) {
Random mockedRandom = Mockito.mock(Random.class);
int electionBackoffMaxMs = 1000;
// this is the max bound of the method's first call to random.nextInt
int firstNextIntMaxBound = 2048;
int jitterMs = 50;
// stub both random draws: the exponential factor first, then the jitter
Mockito.when(mockedRandom.nextInt(1, firstNextIntMaxBound)).thenReturn(exponential);
Mockito.when(mockedRandom.nextInt(RETRY_BACKOFF_BASE_MS)).thenReturn(jitterMs);
// retries=11 is used so that the first nextInt bound is the capped value stubbed above
int returnedBackoffMs = binaryExponentialElectionBackoffMs(electionBackoffMaxMs, RETRY_BACKOFF_BASE_MS, 11, mockedRandom);
// verify nextInt was called on both expected bounds
ArgumentCaptor<Integer> nextIntCaptor = ArgumentCaptor.forClass(Integer.class);
Mockito.verify(mockedRandom).nextInt(Mockito.eq(1), nextIntCaptor.capture());
Mockito.verify(mockedRandom).nextInt(nextIntCaptor.capture());
// the shared captor holds the two-argument call's bound first, then the one-argument call's bound
List<Integer> allCapturedBounds = nextIntCaptor.getAllValues();
assertEquals(firstNextIntMaxBound, allCapturedBounds.get(0));
assertEquals(RETRY_BACKOFF_BASE_MS, allCapturedBounds.get(1));
// finally verify the backoff returned is capped to electionBackoffMaxMs + jitterMs
int backoffValueCap = electionBackoffMaxMs + jitterMs;
if (exponential < 21) {
// below the cap the exponential term is returned unchanged
assertEquals(RETRY_BACKOFF_BASE_MS * exponential, returnedBackoffMs);
assertTrue(returnedBackoffMs < backoffValueCap);
} else {
assertEquals(backoffValueCap, returnedBackoffMs);
}
}
private Records createRecords() {
ByteBuffer allocate = ByteBuffer.allocate(1024);