From c28f46459ac26693b3639fca26c0961283c2ee65 Mon Sep 17 00:00:00 2001 From: Alyssa Huang Date: Mon, 12 May 2025 13:23:18 -0700 Subject: [PATCH] KAFKA-18345; Prevent livelocked elections (#19658) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit At the retry limit binaryExponentialElectionBackoffMs it becomes statistically likely that the exponential backoff returned electionBackoffMaxMs. This is an issue as multiple replicas can get stuck starting elections at the same cadence. This change fixes that by added a random jitter to the max election backoff. Reviewers: José Armando García Sancio , TaiJuWu , Yung --- .../apache/kafka/raft/KafkaRaftClient.java | 21 +++---- .../java/org/apache/kafka/raft/RaftUtil.java | 15 +++++ .../kafka/raft/KafkaRaftClientTest.java | 4 +- .../org/apache/kafka/raft/MockableRandom.java | 5 ++ .../org/apache/kafka/raft/RaftUtilTest.java | 61 +++++++++++++++++++ 5 files changed, 91 insertions(+), 15 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 40864bb08c9..5f2db26d26f 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -168,7 +168,8 @@ import static org.apache.kafka.snapshot.Snapshots.BOOTSTRAP_SNAPSHOT_ID; */ @SuppressWarnings({ "ClassDataAbstractionCoupling", "ClassFanOutComplexity", "ParameterNumber", "NPathComplexity", "JavaNCSS" }) public final class KafkaRaftClient implements RaftClient { - private static final int RETRY_BACKOFF_BASE_MS = 100; + // visible for testing + static final int RETRY_BACKOFF_BASE_MS = 50; private static final int MAX_NUMBER_OF_BATCHES = 10; public static final int MAX_FETCH_WAIT_MS = 500; public static final int MAX_BATCH_SIZE_BYTES = 8 * 1024 * 1024; @@ -1030,7 +1031,12 @@ public final class KafkaRaftClient implements RaftClient { // replica has failed multiple elections in succession. candidate.startBackingOff( currentTimeMs, - binaryExponentialElectionBackoffMs(candidate.retries()) + RaftUtil.binaryExponentialElectionBackoffMs( + quorumConfig.electionBackoffMaxMs(), + RETRY_BACKOFF_BASE_MS, + candidate.retries(), + random + ) ); } } else if (state instanceof ProspectiveState prospective) { @@ -1048,17 +1054,6 @@ public final class KafkaRaftClient implements RaftClient { } } - private int binaryExponentialElectionBackoffMs(int retries) { - if (retries <= 0) { - throw new IllegalArgumentException("Retries " + retries + " should be larger than zero"); - } - // upper limit exponential co-efficients at 20 to avoid overflow - return Math.min( - RETRY_BACKOFF_BASE_MS * random.nextInt(2 << Math.min(20, retries - 1)), - quorumConfig.electionBackoffMaxMs() - ); - } - private int strictExponentialElectionBackoffMs(int positionInSuccessors, int totalNumSuccessors) { if (positionInSuccessors == 0) { return 0; diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java index caa087378c5..f0b1fb57123 100644 --- a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java +++ b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java @@ -48,6 +48,7 @@ import java.net.InetSocketAddress; import java.util.Collection; import java.util.List; import java.util.Optional; +import java.util.Random; import java.util.function.Consumer; import java.util.function.UnaryOperator; import java.util.stream.Collectors; @@ -755,4 +756,18 @@ public class RaftUtil { data.topics().get(0).partitions().size() == 1 && data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition(); } + + static int binaryExponentialElectionBackoffMs(int backoffMaxMs, int backoffBaseMs, int retries, Random random) { + if (retries <= 0) { + throw new IllegalArgumentException("Retries " + retries + " should be larger than zero"); + } + // Takes minimum of the following: + // 1. exponential backoff calculation (maxes out at 102.4 seconds) + // 2. configurable electionBackoffMaxMs + jitter + // The jitter is added to prevent livelock of elections. + return Math.min( + backoffBaseMs * random.nextInt(1, 2 << Math.min(10, retries - 1)), + backoffMaxMs + random.nextInt(backoffBaseMs) + ); + } } diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index a1e92f6afcf..51f784ccace 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -1811,7 +1811,7 @@ class KafkaRaftClientTest { context.client.poll(); assertTrue(candidate.isBackingOff()); assertEquals( - context.electionBackoffMaxMs, + context.electionBackoffMaxMs + exponentialFactor, candidate.remainingBackoffMs(context.time.milliseconds()) ); @@ -1820,7 +1820,7 @@ class KafkaRaftClientTest { // Even though candidacy was rejected, local replica will backoff for jitter period // before transitioning to prospective and starting a new election. - context.time.sleep(context.electionBackoffMaxMs - 1); + context.time.sleep(context.electionBackoffMaxMs + exponentialFactor - 1); context.client.poll(); context.assertVotedCandidate(epoch, localId); diff --git a/raft/src/test/java/org/apache/kafka/raft/MockableRandom.java b/raft/src/test/java/org/apache/kafka/raft/MockableRandom.java index b487b160678..45cfd568d80 100644 --- a/raft/src/test/java/org/apache/kafka/raft/MockableRandom.java +++ b/raft/src/test/java/org/apache/kafka/raft/MockableRandom.java @@ -48,4 +48,9 @@ class MockableRandom extends Random { public int nextInt(int bound) { return nextIntFunction.apply(bound).orElse(super.nextInt(bound)); } + + @Override + public int nextInt(int origin, int bound) { + return nextIntFunction.apply(bound).orElse(super.nextInt(bound)); + } } diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java index 89faf338721..4b84046513b 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java @@ -59,15 +59,22 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.stream.Stream; +import static org.apache.kafka.raft.KafkaRaftClient.RETRY_BACKOFF_BASE_MS; +import static org.apache.kafka.raft.RaftUtil.binaryExponentialElectionBackoffMs; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; public class RaftUtilTest { @@ -621,6 +628,60 @@ public class RaftUtilTest { assertEquals(expectedJson, json.toString()); } + @ParameterizedTest + @ValueSource(ints = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}) + public void testExponentialBoundOfExponentialElectionBackoffMs(int retries) { + Random mockedRandom = Mockito.mock(Random.class); + int electionBackoffMaxMs = 1000; + + // test the bound of the method's first call to random.nextInt + binaryExponentialElectionBackoffMs(electionBackoffMaxMs, RETRY_BACKOFF_BASE_MS, retries, mockedRandom); + ArgumentCaptor nextIntCaptor = ArgumentCaptor.forClass(Integer.class); + Mockito.verify(mockedRandom).nextInt(Mockito.eq(1), nextIntCaptor.capture()); + int actualBound = nextIntCaptor.getValue(); + int expectedBound = (int) (2 * Math.pow(2, retries - 1)); + // after the 10th retry, the bound of the first call to random.nextInt will remain capped to + // (RETRY_BACKOFF_BASE_MS * 2 << 10)=2048 to prevent overflow + if (retries > 10) { + expectedBound = 2048; + } + assertEquals(expectedBound, actualBound, "Incorrect bound for retries=" + retries); + } + + // test that the return value of the method is capped to QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG + jitter + // any exponential >= (1000 + jitter)/(RETRY_BACKOFF_BASE_MS)=21 will result in this cap + @ParameterizedTest + @ValueSource(ints = {1, 2, 20, 21, 22, 2048}) + public void testExponentialElectionBackoffMsIsCapped(int exponential) { + Random mockedRandom = Mockito.mock(Random.class); + int electionBackoffMaxMs = 1000; + // this is the max bound of the method's first call to random.nextInt + int firstNextIntMaxBound = 2048; + + int jitterMs = 50; + Mockito.when(mockedRandom.nextInt(1, firstNextIntMaxBound)).thenReturn(exponential); + Mockito.when(mockedRandom.nextInt(RETRY_BACKOFF_BASE_MS)).thenReturn(jitterMs); + + int returnedBackoffMs = binaryExponentialElectionBackoffMs(electionBackoffMaxMs, RETRY_BACKOFF_BASE_MS, 11, mockedRandom); + + // verify nextInt was called on both expected bounds + ArgumentCaptor nextIntCaptor = ArgumentCaptor.forClass(Integer.class); + Mockito.verify(mockedRandom).nextInt(Mockito.eq(1), nextIntCaptor.capture()); + Mockito.verify(mockedRandom).nextInt(nextIntCaptor.capture()); + List allCapturedBounds = nextIntCaptor.getAllValues(); + assertEquals(firstNextIntMaxBound, allCapturedBounds.get(0)); + assertEquals(RETRY_BACKOFF_BASE_MS, allCapturedBounds.get(1)); + + // finally verify the backoff returned is capped to electionBackoffMaxMs + jitterMs + int backoffValueCap = electionBackoffMaxMs + jitterMs; + if (exponential < 21) { + assertEquals(RETRY_BACKOFF_BASE_MS * exponential, returnedBackoffMs); + assertTrue(returnedBackoffMs < backoffValueCap); + } else { + assertEquals(backoffValueCap, returnedBackoffMs); + } + } + private Records createRecords() { ByteBuffer allocate = ByteBuffer.allocate(1024);