diff --git a/raft/src/main/java/org/apache/kafka/raft/CandidateState.java b/raft/src/main/java/org/apache/kafka/raft/CandidateState.java index d66de84e284..d96d08cdf8e 100644 --- a/raft/src/main/java/org/apache/kafka/raft/CandidateState.java +++ b/raft/src/main/java/org/apache/kafka/raft/CandidateState.java @@ -30,15 +30,12 @@ public class CandidateState implements NomineeState { private final int localId; private final Uuid localDirectoryId; private final int epoch; - private final int retries; private final EpochElection epochElection; private final Optional highWatermark; private final int electionTimeoutMs; private final Timer electionTimer; - private final Timer backoffTimer; private final Logger log; - private boolean isBackingOff; /** * The lifetime of a candidate state is the following. * @@ -54,7 +51,6 @@ public class CandidateState implements NomineeState { int epoch, VoterSet voters, Optional highWatermark, - int retries, int electionTimeoutMs, LogContext logContext ) { @@ -73,28 +69,14 @@ public class CandidateState implements NomineeState { this.localDirectoryId = localDirectoryId; this.epoch = epoch; this.highWatermark = highWatermark; - this.retries = retries; - this.isBackingOff = false; this.electionTimeoutMs = electionTimeoutMs; this.electionTimer = time.timer(electionTimeoutMs); - this.backoffTimer = time.timer(0); this.log = logContext.logger(CandidateState.class); this.epochElection = new EpochElection(voters.voterKeys()); epochElection.recordVote(localId, true); } - /** - * Check if the candidate is backing off for the next election - */ - public boolean isBackingOff() { - return isBackingOff; - } - - public int retries() { - return retries; - } - @Override public EpochElection epochElection() { return epochElection; @@ -118,34 +100,12 @@ public class CandidateState implements NomineeState { return epochElection().recordVote(remoteNodeId, false); } - /** - * Record the current election has failed since we've either received sufficient rejecting voters or election timed out - */ - public void startBackingOff(long currentTimeMs, long backoffDurationMs) { - this.backoffTimer.update(currentTimeMs); - this.backoffTimer.reset(backoffDurationMs); - this.isBackingOff = true; - } - @Override public boolean hasElectionTimeoutExpired(long currentTimeMs) { electionTimer.update(currentTimeMs); return electionTimer.isExpired(); } - public boolean isBackoffComplete(long currentTimeMs) { - backoffTimer.update(currentTimeMs); - return backoffTimer.isExpired(); - } - - public long remainingBackoffMs(long currentTimeMs) { - if (!isBackingOff) { - throw new IllegalStateException("Candidate is not currently backing off"); - } - backoffTimer.update(currentTimeMs); - return backoffTimer.remainingMs(); - } - @Override public long remainingElectionTimeMs(long currentTimeMs) { electionTimer.update(currentTimeMs); @@ -201,12 +161,11 @@ public class CandidateState implements NomineeState { @Override public String toString() { return String.format( - "CandidateState(localId=%d, localDirectoryId=%s, epoch=%d, retries=%d, epochElection=%s, " + + "CandidateState(localId=%d, localDirectoryId=%s, epoch=%d, epochElection=%s, " + "highWatermark=%s, electionTimeoutMs=%d)", localId, localDirectoryId, epoch, - retries, epochElection(), highWatermark, electionTimeoutMs diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 5f2db26d26f..760d6b8c159 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -1018,26 +1018,13 @@ public final class KafkaRaftClient implements RaftClient { */ private void maybeHandleElectionLoss(NomineeState state, long currentTimeMs) { if (state instanceof CandidateState candidate) { - if (candidate.epochElection().isVoteRejected() && !candidate.isBackingOff()) { + if (candidate.epochElection().isVoteRejected()) { logger.info( - "Insufficient remaining votes to become leader. We will backoff before retrying election again. " + - "Current epoch election state is {}.", + "Insufficient remaining votes to become leader. Candidate will wait the remaining election " + + "timeout ({}) before transitioning back to Prospective. Current epoch election state is {}.", + candidate.remainingElectionTimeMs(currentTimeMs), candidate.epochElection() ); - // Go immediately to a random, exponential backoff. The backoff starts low to prevent - // needing to wait the entire election timeout when the vote result has already been - // determined. The randomness prevents the next election from being gridlocked with - // another nominee due to timing. The exponential aspect limits epoch churn when the - // replica has failed multiple elections in succession. - candidate.startBackingOff( - currentTimeMs, - RaftUtil.binaryExponentialElectionBackoffMs( - quorumConfig.electionBackoffMaxMs(), - RETRY_BACKOFF_BASE_MS, - candidate.retries(), - random - ) - ); } } else if (state instanceof ProspectiveState prospective) { if (prospective.epochElection().isVoteRejected()) { @@ -3149,13 +3136,6 @@ public final class KafkaRaftClient implements RaftClient { // 3) the shutdown timer expires long minRequestBackoffMs = maybeSendVoteRequests(state, currentTimeMs); return Math.min(shutdown.remainingTimeMs(), minRequestBackoffMs); - } else if (state.isBackingOff()) { - if (state.isBackoffComplete(currentTimeMs)) { - logger.info("Transition to prospective after election backoff has completed"); - transitionToProspective(currentTimeMs); - return 0L; - } - return state.remainingBackoffMs(currentTimeMs); } else if (state.hasElectionTimeoutExpired(currentTimeMs)) { logger.info("Election was not granted, transitioning to prospective"); transitionToProspective(currentTimeMs); diff --git a/raft/src/main/java/org/apache/kafka/raft/ProspectiveState.java b/raft/src/main/java/org/apache/kafka/raft/ProspectiveState.java index d66fe2c3e5f..911dcfef44e 100644 --- a/raft/src/main/java/org/apache/kafka/raft/ProspectiveState.java +++ b/raft/src/main/java/org/apache/kafka/raft/ProspectiveState.java @@ -37,7 +37,6 @@ public class ProspectiveState implements NomineeState { private final VoterSet voters; private final EpochElection epochElection; private final Optional highWatermark; - private final int retries; private final long electionTimeoutMs; private final Timer electionTimer; private final Logger log; @@ -60,7 +59,6 @@ public class ProspectiveState implements NomineeState { Optional votedKey, VoterSet voters, Optional highWatermark, - int retries, int electionTimeoutMs, LogContext logContext ) { @@ -71,7 +69,6 @@ public class ProspectiveState implements NomineeState { this.votedKey = votedKey; this.voters = voters; this.highWatermark = highWatermark; - this.retries = retries; this.electionTimeoutMs = electionTimeoutMs; this.electionTimer = time.timer(electionTimeoutMs); this.log = logContext.logger(ProspectiveState.class); @@ -89,10 +86,6 @@ public class ProspectiveState implements NomineeState { return epochElection; } - public int retries() { - return retries; - } - @Override public boolean recordGrantedVote(int remoteNodeId) { return epochElection().recordVote(remoteNodeId, true); @@ -160,11 +153,10 @@ public class ProspectiveState implements NomineeState { @Override public String toString() { return String.format( - "ProspectiveState(epoch=%d, leaderId=%s, retries=%d, votedKey=%s, epochElection=%s, " + + "ProspectiveState(epoch=%d, leaderId=%s, votedKey=%s, epochElection=%s, " + "electionTimeoutMs=%s, highWatermark=%s)", epoch, leaderId, - retries, votedKey, epochElection, electionTimeoutMs, diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java index d4b81bd3ce7..1462b824fab 100644 --- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java +++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java @@ -200,7 +200,6 @@ public class QuorumState { election.epoch(), partitionState.lastVoterSet(), Optional.empty(), - 1, randomElectionTimeoutMs(), logContext ); @@ -481,6 +480,9 @@ public class QuorumState { int epoch, ReplicaKey candidateKey ) { + // Verify the current state is prospective, this method should only be used to add voted state to + // prospective state. Transitions from other states to prospective use transitionToProspective instead. + prospectiveStateOrThrow(); int currentEpoch = state.epoch(); if (localId.isPresent() && candidateKey.id() == localId.getAsInt()) { throw new IllegalStateException( @@ -505,7 +507,6 @@ public class QuorumState { ); } - ProspectiveState prospectiveState = prospectiveStateOrThrow(); // Note that we reset the election timeout after voting for a candidate because we // know that the candidate has at least as good of a chance of getting elected as us durableTransitionTo( @@ -518,7 +519,6 @@ public class QuorumState { Optional.of(candidateKey), partitionState.lastVoterSet(), state.highWatermark(), - prospectiveState.retries(), randomElectionTimeoutMs(), logContext ) @@ -620,8 +620,6 @@ public class QuorumState { " is state " + state); } - int retries = isCandidate() ? candidateStateOrThrow().retries() + 1 : 1; - // Durable transition is not necessary since there is no change to the persisted electionState memoryTransitionTo( new ProspectiveState( @@ -633,7 +631,6 @@ public class QuorumState { votedKey(), partitionState.lastVoterSet(), state.highWatermark(), - retries, randomElectionTimeoutMs(), logContext ) @@ -646,8 +643,6 @@ public class QuorumState { int newEpoch = epoch() + 1; int electionTimeoutMs = randomElectionTimeoutMs(); - int retries = isProspective() ? prospectiveStateOrThrow().retries() : 1; - durableTransitionTo(new CandidateState( time, localIdOrThrow(), @@ -655,7 +650,6 @@ public class QuorumState { newEpoch, partitionState.lastVoterSet(), state.highWatermark(), - retries, electionTimeoutMs, logContext )); diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java index f0b1fb57123..caa087378c5 100644 --- a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java +++ b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java @@ -48,7 +48,6 @@ import java.net.InetSocketAddress; import java.util.Collection; import java.util.List; import java.util.Optional; -import java.util.Random; import java.util.function.Consumer; import java.util.function.UnaryOperator; import java.util.stream.Collectors; @@ -756,18 +755,4 @@ public class RaftUtil { data.topics().get(0).partitions().size() == 1 && data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); } - - static int binaryExponentialElectionBackoffMs(int backoffMaxMs, int backoffBaseMs, int retries, Random random) { - if (retries <= 0) { - throw new IllegalArgumentException("Retries " + retries + " should be larger than zero"); - } - // Takes minimum of the following: - // 1. exponential backoff calculation (maxes out at 102.4 seconds) - // 2. configurable electionBackoffMaxMs + jitter - // The jitter is added to prevent livelock of elections. - return Math.min( - backoffBaseMs * random.nextInt(1, 2 << Math.min(10, retries - 1)), - backoffMaxMs + random.nextInt(backoffBaseMs) - ); - } } diff --git a/raft/src/test/java/org/apache/kafka/raft/CandidateStateTest.java b/raft/src/test/java/org/apache/kafka/raft/CandidateStateTest.java index 4abfab9660c..1b3226064e5 100644 --- a/raft/src/test/java/org/apache/kafka/raft/CandidateStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/CandidateStateTest.java @@ -50,7 +50,6 @@ public class CandidateStateTest { epoch, voters, Optional.empty(), - 1, electionTimeoutMs, logContext ); diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index 51f784ccace..95cc3c3c4fb 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -1132,14 +1132,9 @@ class KafkaRaftClientTest { context.client.poll(); context.assertVotedCandidate(epoch, localId); - // Enter the backoff period + // After election timeout, replica will become prospective again context.time.sleep(1); context.client.poll(); - context.assertVotedCandidate(epoch, localId); - - // After backoff, replica will become prospective again - context.time.sleep(context.electionBackoffMaxMs); - context.client.poll(); assertTrue(context.client.quorum().isProspective()); } @@ -1775,15 +1770,15 @@ class KafkaRaftClientTest { @ParameterizedTest @ValueSource(booleans = { true, false }) - public void testCandidateBackoffElection(boolean withKip853Rpc) throws Exception { + public void testCandidateWaitsRestOfElectionTimeoutAfterElectionLoss(boolean withKip853Rpc) throws Exception { int localId = randomReplicaId(); int otherNodeId = localId + 1; int epoch = 1; - int exponentialFactor = 85; // set it large enough so that replica will bound on jitter + int jitter = 85; Set voters = Set.of(localId, otherNodeId); RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .updateRandom(r -> r.mockNextInt(exponentialFactor)) + .updateRandom(r -> r.mockNextInt(jitter)) .withKip853Rpc(withKip853Rpc) .build(); @@ -1793,12 +1788,11 @@ class KafkaRaftClientTest { context.pollUntilRequest(); context.assertVotedCandidate(epoch, localId); CandidateState candidate = context.client.quorum().candidateStateOrThrow(); - assertEquals(1, candidate.retries()); assertEquals( - context.electionTimeoutMs() + exponentialFactor, + context.electionTimeoutMs() + jitter, candidate.remainingElectionTimeMs(context.time.milliseconds()) ); - assertFalse(candidate.isBackingOff()); + assertFalse(candidate.epochElection().isVoteRejected()); // Quorum size is two. If the other member rejects, then the local replica will lose the election. RaftRequest.Outbound request = context.assertSentVoteRequest(epoch, 0, 0L, 1); @@ -1809,44 +1803,28 @@ class KafkaRaftClientTest { ); context.client.poll(); - assertTrue(candidate.isBackingOff()); - assertEquals( - context.electionBackoffMaxMs + exponentialFactor, - candidate.remainingBackoffMs(context.time.milliseconds()) - ); + assertTrue(candidate.epochElection().isVoteRejected()); // Election is lost, but local replica should still remember that it has voted context.assertVotedCandidate(epoch, localId); - // Even though candidacy was rejected, local replica will backoff for jitter period + // Even though candidacy was rejected, local replica will backoff for remaining election timeout // before transitioning to prospective and starting a new election. - context.time.sleep(context.electionBackoffMaxMs + exponentialFactor - 1); + context.time.sleep(context.electionTimeoutMs() + jitter - 1); context.client.poll(); context.assertVotedCandidate(epoch, localId); - // After jitter expires, become a prospective again + // After election timeout expires, become a prospective again context.time.sleep(1); context.client.poll(); assertTrue(context.client.quorum().isProspective()); ProspectiveState prospective = context.client.quorum().prospectiveStateOrThrow(); - assertEquals(2, prospective.retries()); context.pollUntilRequest(); - request = context.assertSentPreVoteRequest(epoch, 0, 0L, 1); + context.assertSentPreVoteRequest(epoch, 0, 0L, 1); assertEquals( - context.electionTimeoutMs() + exponentialFactor, + context.electionTimeoutMs() + jitter, prospective.remainingElectionTimeMs(context.time.milliseconds()) ); - - // After becoming candidate again, retries should be 2 - context.deliverResponse( - request.correlationId(), - request.destination(), - context.voteResponse(true, OptionalInt.empty(), 1) - ); - context.client.poll(); - context.assertVotedCandidate(epoch + 1, localId); - candidate = context.client.quorum().candidateStateOrThrow(); - assertEquals(2, candidate.retries()); } @ParameterizedTest @@ -1870,12 +1848,11 @@ class KafkaRaftClientTest { context.assertVotedCandidate(epoch, localId); context.assertSentVoteRequest(epoch, 0, 0L, 1); CandidateState candidate = context.client.quorum().candidateStateOrThrow(); - assertEquals(1, candidate.retries()); assertEquals( context.electionTimeoutMs() + jitter, candidate.remainingElectionTimeMs(context.time.milliseconds()) ); - assertFalse(candidate.isBackingOff()); + assertFalse(candidate.epochElection().isVoteRejected()); // If election times out, replica transition to prospective without any additional backoff context.time.sleep(candidate.remainingElectionTimeMs(context.time.milliseconds())); @@ -1883,7 +1860,6 @@ class KafkaRaftClientTest { assertTrue(context.client.quorum().isProspective()); ProspectiveState prospective = context.client.quorum().prospectiveStateOrThrow(); - assertEquals(2, prospective.retries()); context.pollUntilRequest(); context.assertSentPreVoteRequest(epoch, 0, 0L, 1); assertEquals( diff --git a/raft/src/test/java/org/apache/kafka/raft/MockableRandom.java b/raft/src/test/java/org/apache/kafka/raft/MockableRandom.java index 45cfd568d80..b487b160678 100644 --- a/raft/src/test/java/org/apache/kafka/raft/MockableRandom.java +++ b/raft/src/test/java/org/apache/kafka/raft/MockableRandom.java @@ -48,9 +48,4 @@ class MockableRandom extends Random { public int nextInt(int bound) { return nextIntFunction.apply(bound).orElse(super.nextInt(bound)); } - - @Override - public int nextInt(int origin, int bound) { - return nextIntFunction.apply(bound).orElse(super.nextInt(bound)); - } } diff --git a/raft/src/test/java/org/apache/kafka/raft/ProspectiveStateTest.java b/raft/src/test/java/org/apache/kafka/raft/ProspectiveStateTest.java index 2aa9e78a8d4..2f995cf8f53 100644 --- a/raft/src/test/java/org/apache/kafka/raft/ProspectiveStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/ProspectiveStateTest.java @@ -71,7 +71,6 @@ public class ProspectiveStateTest { votedKey, voters, Optional.empty(), - 1, electionTimeoutMs, logContext ); @@ -87,7 +86,6 @@ public class ProspectiveStateTest { Optional.empty(), voters, Optional.empty(), - 1, electionTimeoutMs, logContext ); diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java index 4b84046513b..89faf338721 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java @@ -59,22 +59,15 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; -import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.stream.Stream; -import static org.apache.kafka.raft.KafkaRaftClient.RETRY_BACKOFF_BASE_MS; -import static org.apache.kafka.raft.RaftUtil.binaryExponentialElectionBackoffMs; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; public class RaftUtilTest { @@ -628,60 +621,6 @@ public class RaftUtilTest { assertEquals(expectedJson, json.toString()); } - @ParameterizedTest - @ValueSource(ints = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}) - public void testExponentialBoundOfExponentialElectionBackoffMs(int retries) { - Random mockedRandom = Mockito.mock(Random.class); - int electionBackoffMaxMs = 1000; - - // test the bound of the method's first call to random.nextInt - binaryExponentialElectionBackoffMs(electionBackoffMaxMs, RETRY_BACKOFF_BASE_MS, retries, mockedRandom); - ArgumentCaptor nextIntCaptor = ArgumentCaptor.forClass(Integer.class); - Mockito.verify(mockedRandom).nextInt(Mockito.eq(1), nextIntCaptor.capture()); - int actualBound = nextIntCaptor.getValue(); - int expectedBound = (int) (2 * Math.pow(2, retries - 1)); - // after the 10th retry, the bound of the first call to random.nextInt will remain capped to - // (RETRY_BACKOFF_BASE_MS * 2 << 10)=2048 to prevent overflow - if (retries > 10) { - expectedBound = 2048; - } - assertEquals(expectedBound, actualBound, "Incorrect bound for retries=" + retries); - } - - // test that the return value of the method is capped to QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG + jitter - // any exponential >= (1000 + jitter)/(RETRY_BACKOFF_BASE_MS)=21 will result in this cap - @ParameterizedTest - @ValueSource(ints = {1, 2, 20, 21, 22, 2048}) - public void testExponentialElectionBackoffMsIsCapped(int exponential) { - Random mockedRandom = Mockito.mock(Random.class); - int electionBackoffMaxMs = 1000; - // this is the max bound of the method's first call to random.nextInt - int firstNextIntMaxBound = 2048; - - int jitterMs = 50; - Mockito.when(mockedRandom.nextInt(1, firstNextIntMaxBound)).thenReturn(exponential); - Mockito.when(mockedRandom.nextInt(RETRY_BACKOFF_BASE_MS)).thenReturn(jitterMs); - - int returnedBackoffMs = binaryExponentialElectionBackoffMs(electionBackoffMaxMs, RETRY_BACKOFF_BASE_MS, 11, mockedRandom); - - // verify nextInt was called on both expected bounds - ArgumentCaptor nextIntCaptor = ArgumentCaptor.forClass(Integer.class); - Mockito.verify(mockedRandom).nextInt(Mockito.eq(1), nextIntCaptor.capture()); - Mockito.verify(mockedRandom).nextInt(nextIntCaptor.capture()); - List allCapturedBounds = nextIntCaptor.getAllValues(); - assertEquals(firstNextIntMaxBound, allCapturedBounds.get(0)); - assertEquals(RETRY_BACKOFF_BASE_MS, allCapturedBounds.get(1)); - - // finally verify the backoff returned is capped to electionBackoffMaxMs + jitterMs - int backoffValueCap = electionBackoffMaxMs + jitterMs; - if (exponential < 21) { - assertEquals(RETRY_BACKOFF_BASE_MS * exponential, returnedBackoffMs); - assertTrue(returnedBackoffMs < backoffValueCap); - } else { - assertEquals(backoffValueCap, returnedBackoffMs); - } - } - private Records createRecords() { ByteBuffer allocate = ByteBuffer.allocate(1024);