diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index d1b1d3477cc..2bf20369090 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -166,7 +166,8 @@ import static org.apache.kafka.snapshot.Snapshots.BOOTSTRAP_SNAPSHOT_ID; */ @SuppressWarnings({ "ClassDataAbstractionCoupling", "ClassFanOutComplexity", "ParameterNumber", "NPathComplexity" }) public final class KafkaRaftClient implements RaftClient { - private static final int RETRY_BACKOFF_BASE_MS = 100; + // visible for testing + static final int RETRY_BACKOFF_BASE_MS = 50; private static final int MAX_NUMBER_OF_BATCHES = 10; public static final int MAX_FETCH_WAIT_MS = 500; public static final int MAX_BATCH_SIZE_BYTES = 8 * 1024 * 1024; @@ -1027,7 +1028,12 @@ public final class KafkaRaftClient implements RaftClient { // replica has failed multiple elections in succession. candidate.startBackingOff( currentTimeMs, - binaryExponentialElectionBackoffMs(candidate.retries()) + RaftUtil.binaryExponentialElectionBackoffMs( + quorumConfig.electionBackoffMaxMs(), + RETRY_BACKOFF_BASE_MS, + candidate.retries(), + random + ) ); } } else if (state instanceof ProspectiveState prospective) { @@ -1045,17 +1051,6 @@ public final class KafkaRaftClient implements RaftClient { } } - private int binaryExponentialElectionBackoffMs(int retries) { - if (retries <= 0) { - throw new IllegalArgumentException("Retries " + retries + " should be larger than zero"); - } - // upper limit exponential co-efficients at 20 to avoid overflow - return Math.min( - RETRY_BACKOFF_BASE_MS * random.nextInt(2 << Math.min(20, retries - 1)), - quorumConfig.electionBackoffMaxMs() - ); - } - private int strictExponentialElectionBackoffMs(int positionInSuccessors, int totalNumSuccessors) { if (positionInSuccessors == 0) { return 0; diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java index 12c48955b39..fea9846aa13 100644 --- a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java +++ b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java @@ -48,6 +48,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.Random; import java.util.function.Consumer; import java.util.function.UnaryOperator; import java.util.stream.Collectors; @@ -765,4 +766,18 @@ public class RaftUtil { data.topics().get(0).partitions().size() == 1 && data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); } + + static int binaryExponentialElectionBackoffMs(int backoffMaxMs, int backoffBaseMs, int retries, Random random) { + if (retries <= 0) { + throw new IllegalArgumentException("Retries " + retries + " should be larger than zero"); + } + // Takes minimum of the following: + // 1. exponential backoff calculation (maxes out at 102.4 seconds) + // 2. configurable electionBackoffMaxMs + jitter + // The jitter is added to prevent livelock of elections. + return Math.min( + backoffBaseMs * random.nextInt(1, 2 << Math.min(10, retries - 1)), + backoffMaxMs + random.nextInt(backoffBaseMs) + ); + } } diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index 79c255efbc8..b1205bb1cdf 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -1817,7 +1817,7 @@ public class KafkaRaftClientTest { context.client.poll(); assertTrue(candidate.isBackingOff()); assertEquals( - context.electionBackoffMaxMs, + context.electionBackoffMaxMs + exponentialFactor, candidate.remainingBackoffMs(context.time.milliseconds()) ); @@ -1826,7 +1826,7 @@ public class KafkaRaftClientTest { // Even though candidacy was rejected, local replica will backoff for jitter period // before transitioning to prospective and starting a new election. - context.time.sleep(context.electionBackoffMaxMs - 1); + context.time.sleep(context.electionBackoffMaxMs + exponentialFactor - 1); context.client.poll(); context.assertVotedCandidate(epoch, localId); diff --git a/raft/src/test/java/org/apache/kafka/raft/MockableRandom.java b/raft/src/test/java/org/apache/kafka/raft/MockableRandom.java index b487b160678..45cfd568d80 100644 --- a/raft/src/test/java/org/apache/kafka/raft/MockableRandom.java +++ b/raft/src/test/java/org/apache/kafka/raft/MockableRandom.java @@ -48,4 +48,9 @@ class MockableRandom extends Random { public int nextInt(int bound) { return nextIntFunction.apply(bound).orElse(super.nextInt(bound)); } + + @Override + public int nextInt(int origin, int bound) { + return nextIntFunction.apply(bound).orElse(super.nextInt(bound)); + } } diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java index 681bcaae8de..81485adca69 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java @@ -58,16 +58,24 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.Collections; +import java.util.List; +import java.util.Random; import java.util.stream.Stream; import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; +import static org.apache.kafka.raft.KafkaRaftClient.RETRY_BACKOFF_BASE_MS; +import static org.apache.kafka.raft.RaftUtil.binaryExponentialElectionBackoffMs; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; public class RaftUtilTest { @@ -569,6 +577,60 @@ public class RaftUtilTest { assertEquals(expectedJson, json.toString()); } + @ParameterizedTest + @ValueSource(ints = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}) + public void testExponentialBoundOfExponentialElectionBackoffMs(int retries) { + Random mockedRandom = Mockito.mock(Random.class); + int electionBackoffMaxMs = 1000; + + // test the bound of the method's first call to random.nextInt + binaryExponentialElectionBackoffMs(electionBackoffMaxMs, RETRY_BACKOFF_BASE_MS, retries, mockedRandom); + ArgumentCaptor nextIntCaptor = ArgumentCaptor.forClass(Integer.class); + Mockito.verify(mockedRandom).nextInt(Mockito.eq(1), nextIntCaptor.capture()); + int actualBound = nextIntCaptor.getValue(); + int expectedBound = (int) (2 * Math.pow(2, retries - 1)); + // after the 10th retry, the bound of the first call to random.nextInt will remain capped to + // (RETRY_BACKOFF_BASE_MS * 2 << 10)=2048 to prevent overflow + if (retries > 10) { + expectedBound = 2048; + } + assertEquals(expectedBound, actualBound, "Incorrect bound for retries=" + retries); + } + + // test that the return value of the method is capped to QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG + jitter + // any exponential >= (1000 + jitter)/(RETRY_BACKOFF_BASE_MS)=21 will result in this cap + @ParameterizedTest + @ValueSource(ints = {1, 2, 20, 21, 22, 2048}) + public void testExponentialElectionBackoffMsIsCapped(int exponential) { + Random mockedRandom = Mockito.mock(Random.class); + int electionBackoffMaxMs = 1000; + // this is the max bound of the method's first call to random.nextInt + int firstNextIntMaxBound = 2048; + + int jitterMs = 50; + Mockito.when(mockedRandom.nextInt(1, firstNextIntMaxBound)).thenReturn(exponential); + Mockito.when(mockedRandom.nextInt(RETRY_BACKOFF_BASE_MS)).thenReturn(jitterMs); + + int returnedBackoffMs = binaryExponentialElectionBackoffMs(electionBackoffMaxMs, RETRY_BACKOFF_BASE_MS, 11, mockedRandom); + + // verify nextInt was called on both expected bounds + ArgumentCaptor nextIntCaptor = ArgumentCaptor.forClass(Integer.class); + Mockito.verify(mockedRandom).nextInt(Mockito.eq(1), nextIntCaptor.capture()); + Mockito.verify(mockedRandom).nextInt(nextIntCaptor.capture()); + List allCapturedBounds = nextIntCaptor.getAllValues(); + assertEquals(firstNextIntMaxBound, allCapturedBounds.get(0)); + assertEquals(RETRY_BACKOFF_BASE_MS, allCapturedBounds.get(1)); + + // finally verify the backoff returned is capped to electionBackoffMaxMs + jitterMs + int backoffValueCap = electionBackoffMaxMs + jitterMs; + if (exponential < 21) { + assertEquals(RETRY_BACKOFF_BASE_MS * exponential, returnedBackoffMs); + assertTrue(returnedBackoffMs < backoffValueCap); + } else { + assertEquals(backoffValueCap, returnedBackoffMs); + } + } + private Records createRecords() { ByteBuffer allocate = ByteBuffer.allocate(1024);