KAFKA-18345; Prevent livelocked elections (#19658)
CI / build (push) Has been cancelled Details

Once a candidate reaches the retry limit, binaryExponentialElectionBackoffMs
becomes statistically likely to return electionBackoffMaxMs. This is an
issue because multiple replicas can get stuck starting elections at the
same cadence.

This change fixes that by adding a random jitter to the max election
backoff.

Reviewers: José Armando García Sancio <jsancio@apache.org>, TaiJuWu
 <tjwu1217@gmail.com>, Yung <yungyung7654321@gmail.com>
This commit is contained in:
Alyssa Huang 2025-05-12 13:23:18 -07:00 committed by José Armando García Sancio
parent 14fd498ed0
commit 3170e1130c
5 changed files with 92 additions and 15 deletions

View File

@ -166,7 +166,8 @@ import static org.apache.kafka.snapshot.Snapshots.BOOTSTRAP_SNAPSHOT_ID;
*/
@SuppressWarnings({ "ClassDataAbstractionCoupling", "ClassFanOutComplexity", "ParameterNumber", "NPathComplexity" })
public final class KafkaRaftClient<T> implements RaftClient<T> {
private static final int RETRY_BACKOFF_BASE_MS = 100;
// visible for testing
static final int RETRY_BACKOFF_BASE_MS = 50;
private static final int MAX_NUMBER_OF_BATCHES = 10;
public static final int MAX_FETCH_WAIT_MS = 500;
public static final int MAX_BATCH_SIZE_BYTES = 8 * 1024 * 1024;
@ -1027,7 +1028,12 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
// replica has failed multiple elections in succession.
candidate.startBackingOff(
currentTimeMs,
binaryExponentialElectionBackoffMs(candidate.retries())
RaftUtil.binaryExponentialElectionBackoffMs(
quorumConfig.electionBackoffMaxMs(),
RETRY_BACKOFF_BASE_MS,
candidate.retries(),
random
)
);
}
} else if (state instanceof ProspectiveState prospective) {
@ -1045,17 +1051,6 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
}
}
/**
 * Computes a randomized election backoff, in milliseconds, from the number of retries.
 *
 * <p>The result is {@code RETRY_BACKOFF_BASE_MS} multiplied by a value drawn from
 * {@code random.nextInt(2 << min(20, retries - 1))}, limited to at most
 * {@code quorumConfig.electionBackoffMaxMs()}.
 *
 * <p>NOTE(review): if {@code random} is a {@link java.util.Random}, the drawn value can be 0,
 * which makes the backoff 0 - confirm whether callers tolerate that.
 *
 * @param retries number of election retries so far; must be greater than zero
 * @return the backoff in milliseconds, never more than the configured election backoff maximum
 * @throws IllegalArgumentException if {@code retries} is zero or negative
 */
private int binaryExponentialElectionBackoffMs(int retries) {
// reject non-positive retry counts: the shift below is computed from retries - 1
if (retries <= 0) {
throw new IllegalArgumentException("Retries " + retries + " should be larger than zero");
}
// cap the exponent at 20 so the product with RETRY_BACKOFF_BASE_MS cannot overflow an int
return Math.min(
RETRY_BACKOFF_BASE_MS * random.nextInt(2 << Math.min(20, retries - 1)),
quorumConfig.electionBackoffMaxMs()
);
}
private int strictExponentialElectionBackoffMs(int positionInSuccessors, int totalNumSuccessors) {
if (positionInSuccessors == 0) {
return 0;

View File

@ -48,6 +48,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
@ -765,4 +766,18 @@ public class RaftUtil {
data.topics().get(0).partitions().size() == 1 &&
data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition();
}
/**
 * Computes a randomized binary exponential election backoff, in milliseconds.
 *
 * <p>The result is the smaller of:
 * <ol>
 *   <li>{@code backoffBaseMs} multiplied by a random factor drawn from
 *       {@code [1, 2 << min(10, retries - 1))}, i.e. at most {@code backoffBaseMs * 2047};</li>
 *   <li>{@code backoffMaxMs} plus a random jitter drawn from {@code [0, backoffBaseMs)}.</li>
 * </ol>
 * The jitter on the cap prevents livelocked elections: without it, replicas that have all
 * reached the cap would keep starting elections at the same cadence.
 *
 * <p>Both candidates are computed with {@code long} arithmetic and the result is clamped to
 * {@link Integer#MAX_VALUE}, so a very large {@code backoffMaxMs} or {@code backoffBaseMs}
 * cannot wrap around into a negative backoff.
 *
 * @param backoffMaxMs  configured maximum election backoff in milliseconds
 * @param backoffBaseMs base backoff unit in milliseconds; also the exclusive upper bound of the
 *                      jitter, so it must be positive (it is passed to {@code Random.nextInt(int)})
 * @param retries       number of election retries so far; must be greater than zero
 * @param random        source of randomness; queried first for the exponential factor and then
 *                      for the jitter
 * @return the backoff in milliseconds
 * @throws IllegalArgumentException if {@code retries} is zero or negative
 */
static int binaryExponentialElectionBackoffMs(int backoffMaxMs, int backoffBaseMs, int retries, Random random) {
    if (retries <= 0) {
        throw new IllegalArgumentException("Retries " + retries + " should be larger than zero");
    }
    // The exponent is capped at 10, so the factor is always within [1, 2047].
    int exponentialFactor = random.nextInt(1, 2 << Math.min(10, retries - 1));
    int jitterMs = random.nextInt(backoffBaseMs);

    // Widen to long before multiplying/adding so neither candidate can overflow an int.
    long exponentialBackoffMs = (long) backoffBaseMs * exponentialFactor;
    long jitteredMaxBackoffMs = (long) backoffMaxMs + jitterMs;

    return (int) Math.min(Math.min(exponentialBackoffMs, jitteredMaxBackoffMs), Integer.MAX_VALUE);
}
}

View File

@ -1817,7 +1817,7 @@ public class KafkaRaftClientTest {
context.client.poll();
assertTrue(candidate.isBackingOff());
assertEquals(
context.electionBackoffMaxMs,
context.electionBackoffMaxMs + exponentialFactor,
candidate.remainingBackoffMs(context.time.milliseconds())
);
@ -1826,7 +1826,7 @@ public class KafkaRaftClientTest {
// Even though candidacy was rejected, local replica will backoff for jitter period
// before transitioning to prospective and starting a new election.
context.time.sleep(context.electionBackoffMaxMs - 1);
context.time.sleep(context.electionBackoffMaxMs + exponentialFactor - 1);
context.client.poll();
context.assertVotedCandidate(epoch, localId);

View File

@ -48,4 +48,9 @@ class MockableRandom extends Random {
public int nextInt(int bound) {
return nextIntFunction.apply(bound).orElse(super.nextInt(bound));
}
/**
 * Returns the value that {@code nextIntFunction} yields for {@code bound} when one is present;
 * otherwise delegates to the real generator.
 *
 * @param origin inclusive lower bound of the fallback value
 * @param bound  exclusive upper bound; also the key passed to {@code nextIntFunction}
 * @return the mocked value if present, else a random value in {@code [origin, bound)}
 */
@Override
public int nextInt(int origin, int bound) {
    // Fall back to the two-argument overload so the result is never below origin.
    return nextIntFunction.apply(bound).orElse(super.nextInt(origin, bound));
}
}

View File

@ -58,16 +58,24 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.stream.Stream;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.apache.kafka.raft.KafkaRaftClient.RETRY_BACKOFF_BASE_MS;
import static org.apache.kafka.raft.RaftUtil.binaryExponentialElectionBackoffMs;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class RaftUtilTest {
@ -569,6 +577,60 @@ public class RaftUtilTest {
assertEquals(expectedJson, json.toString());
}
/**
 * Checks the upper bound that the method passes to its first {@code random.nextInt} call
 * for a range of retry counts.
 */
@ParameterizedTest
@ValueSource(ints = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13})
public void testExponentialBoundOfExponentialElectionBackoffMs(int retries) {
    int electionBackoffMaxMs = 1000;
    Random mockedRandom = Mockito.mock(Random.class);
    ArgumentCaptor<Integer> boundCaptor = ArgumentCaptor.forClass(Integer.class);

    binaryExponentialElectionBackoffMs(electionBackoffMaxMs, RETRY_BACKOFF_BASE_MS, retries, mockedRandom);

    // capture the bound of the method's first call to random.nextInt
    Mockito.verify(mockedRandom).nextInt(Mockito.eq(1), boundCaptor.capture());
    int actualBound = boundCaptor.getValue();

    // the bound doubles with every retry; after the 10th retry it stays capped at
    // 2 << 10 = 2048 to prevent overflow
    int expectedBound = retries > 10
        ? 2048
        : (int) (2 * Math.pow(2, retries - 1));

    assertEquals(expectedBound, actualBound, "Incorrect bound for retries=" + retries);
}
/**
 * Checks that the returned backoff is capped to QUORUM_ELECTION_BACKOFF_MAX_MS_CONFIG + jitter.
 * Any exponential >= (1000 + jitter) / RETRY_BACKOFF_BASE_MS = 21 results in this cap.
 */
@ParameterizedTest
@ValueSource(ints = {1, 2, 20, 21, 22, 2048})
public void testExponentialElectionBackoffMsIsCapped(int exponential) {
    int electionBackoffMaxMs = 1000;
    int jitterMs = 50;
    // largest bound the method can pass to its first random.nextInt call
    int firstNextIntMaxBound = 2048;

    Random mockedRandom = Mockito.mock(Random.class);
    Mockito.when(mockedRandom.nextInt(1, firstNextIntMaxBound)).thenReturn(exponential);
    Mockito.when(mockedRandom.nextInt(RETRY_BACKOFF_BASE_MS)).thenReturn(jitterMs);

    int returnedBackoffMs = binaryExponentialElectionBackoffMs(
        electionBackoffMaxMs,
        RETRY_BACKOFF_BASE_MS,
        11,
        mockedRandom
    );

    // both nextInt overloads must have been called with the expected bounds
    ArgumentCaptor<Integer> boundCaptor = ArgumentCaptor.forClass(Integer.class);
    Mockito.verify(mockedRandom).nextInt(Mockito.eq(1), boundCaptor.capture());
    Mockito.verify(mockedRandom).nextInt(boundCaptor.capture());
    List<Integer> capturedBounds = boundCaptor.getAllValues();
    assertEquals(firstNextIntMaxBound, capturedBounds.get(0));
    assertEquals(RETRY_BACKOFF_BASE_MS, capturedBounds.get(1));

    // the backoff must never exceed electionBackoffMaxMs + jitterMs
    int backoffValueCap = electionBackoffMaxMs + jitterMs;
    if (exponential >= 21) {
        assertEquals(backoffValueCap, returnedBackoffMs);
    } else {
        assertEquals(RETRY_BACKOFF_BASE_MS * exponential, returnedBackoffMs);
        assertTrue(returnedBackoffMs < backoffValueCap);
    }
}
private Records createRecords() {
ByteBuffer allocate = ByteBuffer.allocate(1024);