KAFKA-18345; Prevent livelocked elections (#19658)

Once the retry limit is reached, it becomes statistically likely that
binaryExponentialElectionBackoffMs returns electionBackoffMaxMs. This is
an issue because multiple replicas can get stuck starting elections at
the same cadence.

This change fixes that by adding a random jitter to the max election
backoff.

Reviewers: José Armando García Sancio <jsancio@apache.org>, TaiJuWu
 <tjwu1217@gmail.com>, Yung <yungyung7654321@gmail.com>
This commit is contained in:
Alyssa Huang 2025-05-12 13:23:18 -07:00 committed by José Armando García Sancio
parent 6da199903e
commit ab1a4053ad
4 changed files with 124 additions and 15 deletions

View File

@ -149,8 +149,9 @@ import static org.apache.kafka.raft.RaftUtil.hasValidTopicPartition;
* as FileRecords, but we use {@link UnalignedRecords} in FetchSnapshotResponse because the records
* are not necessarily offset-aligned.
*/
final public class KafkaRaftClient<T> implements RaftClient<T> {
private static final int RETRY_BACKOFF_BASE_MS = 100;
public final class KafkaRaftClient<T> implements RaftClient<T> {
// visible for testing
static final int RETRY_BACKOFF_BASE_MS = 50;
public static final int MAX_FETCH_WAIT_MS = 500;
public static final int MAX_BATCH_SIZE_BYTES = 8 * 1024 * 1024;
public static final int MAX_FETCH_SIZE_BYTES = MAX_BATCH_SIZE_BYTES;
@ -738,7 +739,12 @@ final public class KafkaRaftClient<T> implements RaftClient<T> {
state.startBackingOff(
currentTimeMs,
binaryExponentialElectionBackoffMs(state.retries())
RaftUtil.binaryExponentialElectionBackoffMs(
quorumConfig.electionBackoffMaxMs(),
RETRY_BACKOFF_BASE_MS,
state.retries(),
random
)
);
}
}
@ -752,15 +758,6 @@ final public class KafkaRaftClient<T> implements RaftClient<T> {
}
}
/**
 * Computes a randomized election backoff that grows with the number of retries and is
 * capped at {@code quorumConfig.electionBackoffMaxMs()}.
 *
 * The uncapped value is {@code RETRY_BACKOFF_BASE_MS} multiplied by a random integer drawn
 * from {@code [0, 2 << min(20, retries - 1))}, so it may be zero. The cap itself carries no
 * randomness: whenever the random term exceeds the cap, exactly the configured maximum is
 * returned.
 *
 * @param retries number of election retries so far; must be greater than zero
 * @return the backoff in milliseconds, at most {@code quorumConfig.electionBackoffMaxMs()}
 * @throws IllegalArgumentException if {@code retries} is zero or negative
 */
private int binaryExponentialElectionBackoffMs(int retries) {
if (retries <= 0) {
throw new IllegalArgumentException("Retries " + retries + " should be larger than zero");
}
// Cap the shift at 20 so the bound passed to nextInt is at most 2 << 20; this keeps the
// int multiplication by RETRY_BACKOFF_BASE_MS from overflowing.
return Math.min(RETRY_BACKOFF_BASE_MS * random.nextInt(2 << Math.min(20, retries - 1)),
quorumConfig.electionBackoffMaxMs());
}
private int strictExponentialElectionBackoffMs(int positionInSuccessors, int totalNumSuccessors) {
if (positionInSuccessors <= 0 || positionInSuccessors >= totalNumSuccessors) {
throw new IllegalArgumentException("Position " + positionInSuccessors + " should be larger than zero" +
@ -2180,7 +2177,12 @@ final public class KafkaRaftClient<T> implements RaftClient<T> {
}
return state.remainingBackoffMs(currentTimeMs);
} else if (state.hasElectionTimeoutExpired(currentTimeMs)) {
long backoffDurationMs = binaryExponentialElectionBackoffMs(state.retries());
long backoffDurationMs = RaftUtil.binaryExponentialElectionBackoffMs(
quorumConfig.electionBackoffMaxMs(),
RETRY_BACKOFF_BASE_MS,
state.retries(),
random
);
logger.info("Election has timed out, backing off for {}ms before becoming a candidate again",
backoffDurationMs);
state.startBackingOff(currentTimeMs, backoffDurationMs);

View File

@ -33,6 +33,7 @@ import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import java.util.Collections;
import java.util.Random;
import java.util.function.Consumer;
import static java.util.Collections.singletonList;
@ -162,4 +163,18 @@ public class RaftUtil {
data.topics().get(0).partitions().size() == 1 &&
data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition();
}
/**
 * Computes a randomized election backoff in milliseconds.
 *
 * The result is the minimum of:
 * <ol>
 *   <li>a binary exponential backoff:
 *       {@code backoffBaseMs * (1 + random.nextInt(2 << min(10, retries - 1)))}; with a
 *       {@code backoffBaseMs} of 50ms this maxes out at 102.4 seconds (50ms * 2048)</li>
 *   <li>the cap plus jitter: {@code backoffMaxMs + random.nextInt(backoffBaseMs)}</li>
 * </ol>
 * The jitter on the cap prevents livelocked elections: without it, replicas that have all
 * reached the cap would keep retrying at exactly the same cadence.
 *
 * Exactly two values are drawn from {@code random}, in the order listed above. Both terms
 * are computed with {@code long} arithmetic and the result is clamped to
 * {@link Integer#MAX_VALUE}, so large arguments cannot wrap around to a bogus negative
 * backoff.
 *
 * @param backoffMaxMs the configured maximum election backoff, before jitter
 * @param backoffBaseMs the base backoff unit; also the exclusive upper bound of the jitter
 * @param retries number of election retries so far; must be greater than zero
 * @param random source of randomness for the exponential multiplier and the jitter
 * @return the backoff duration in milliseconds
 * @throws IllegalArgumentException if {@code retries} is zero or negative
 */
static int binaryExponentialElectionBackoffMs(int backoffMaxMs, int backoffBaseMs, int retries, Random random) {
    if (retries <= 0) {
        throw new IllegalArgumentException("Retries " + retries + " should be larger than zero");
    }
    // The shift is capped at 10, so the multiplier lies in [1, 2048].
    long exponentialBackoffMs = (long) backoffBaseMs * (1 + random.nextInt(2 << Math.min(10, retries - 1)));
    // Drawn second: jitter in [0, backoffBaseMs) added on top of the configured maximum.
    long jitteredMaxBackoffMs = (long) backoffMaxMs + random.nextInt(backoffBaseMs);
    return (int) Math.min(Integer.MAX_VALUE, Math.min(exponentialBackoffMs, jitteredMaxBackoffMs));
}
}

View File

@ -843,7 +843,7 @@ public class KafkaRaftClientTest {
context.assertVotedCandidate(epoch, localId);
// After backoff, we will become a candidate again
context.time.sleep(context.electionBackoffMaxMs);
context.time.sleep(context.electionBackoffMaxMs + jitterMs);
context.client.poll();
context.assertVotedCandidate(epoch + 1, localId);
}
@ -1364,6 +1364,7 @@ public class KafkaRaftClientTest {
context.time.sleep(2 * context.electionTimeoutMs());
context.pollUntilRequest();
context.assertVotedCandidate(epoch, localId);
CandidateState candidate = context.client.quorum().candidateStateOrThrow();
// Quorum size is two. If the other member rejects, then we need to schedule a revote.
RaftRequest.Outbound request = context.assertSentVoteRequest(epoch, 0, 0L, 1);
@ -1374,13 +1375,18 @@ public class KafkaRaftClientTest {
);
context.client.poll();
assertTrue(candidate.isBackingOff());
assertEquals(
context.electionBackoffMaxMs + exponentialFactor,
candidate.remainingBackoffMs(context.time.milliseconds())
);
// All nodes have rejected our candidacy, but we should still remember that we had voted
context.assertVotedCandidate(epoch, localId);
// Even though our candidacy was rejected, we will backoff for jitter period
// before we bump the epoch and start a new election.
context.time.sleep(context.electionBackoffMaxMs - 1);
context.time.sleep(context.electionBackoffMaxMs + exponentialFactor - 1);
context.client.poll();
context.assertVotedCandidate(epoch, localId);

View File

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.raft;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import java.util.List;
import java.util.Random;
import static org.apache.kafka.raft.KafkaRaftClient.RETRY_BACKOFF_BASE_MS;
import static org.apache.kafka.raft.RaftUtil.binaryExponentialElectionBackoffMs;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
 * Tests for RaftUtil.binaryExponentialElectionBackoffMs, driven by a mocked Random so that
 * both the bounds requested from the random source and the values it yields are controlled.
 */
public class RaftUtilTest {

    /**
     * Checks the bound passed to the first random.nextInt call: it doubles with every retry
     * and stops growing after the 10th retry, where it stays at 2 << 10 = 2048.
     */
    @ParameterizedTest
    @ValueSource(ints = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13})
    public void testExponentialBoundOfExponentialElectionBackoffMs(int retries) {
        final int electionBackoffMaxMs = 1000;
        final Random random = Mockito.mock(Random.class);

        binaryExponentialElectionBackoffMs(electionBackoffMaxMs, RETRY_BACKOFF_BASE_MS, retries, random);

        // Two draws are expected; the first one is the exponential multiplier.
        ArgumentCaptor<Integer> boundCaptor = ArgumentCaptor.forClass(Integer.class);
        Mockito.verify(random, Mockito.times(2)).nextInt(boundCaptor.capture());
        List<Integer> requestedBounds = boundCaptor.getAllValues();

        // 2 * 2^(retries - 1) == 2^retries, capped at 2048 once retries exceeds 10
        int expectedBound = retries > 10 ? 2048 : 1 << retries;
        int actualBound = requestedBounds.get(0);
        assertEquals(expectedBound, actualBound, "Incorrect bound for retries=" + retries);
    }

    /**
     * Checks that the returned backoff never exceeds electionBackoffMaxMs + jitter.
     * With a cap of 1000 + 50 and a base of RETRY_BACKOFF_BASE_MS, any exponential draw
     * >= (1000 + jitter) / RETRY_BACKOFF_BASE_MS - 1 = 20 hits the cap.
     *
     * NOTE: the stubbed values 2048 and 50 equal the requested bounds, which a real
     * Random.nextInt(bound) would never return; the mock is used purely to steer the
     * arithmetic.
     */
    @ParameterizedTest
    @ValueSource(ints = {1, 2, 19, 20, 21, 2048})
    public void testExponentialElectionBackoffMsIsCapped(int exponential) {
        final int electionBackoffMaxMs = 1000;
        final int jitterMs = 50;
        // largest bound the method can request for its first nextInt call
        final int exponentialBound = 2048;

        final Random random = Mockito.mock(Random.class);
        Mockito.when(random.nextInt(exponentialBound)).thenReturn(exponential);
        Mockito.when(random.nextInt(RETRY_BACKOFF_BASE_MS)).thenReturn(jitterMs);

        int backoffMs = binaryExponentialElectionBackoffMs(electionBackoffMaxMs, RETRY_BACKOFF_BASE_MS, 11, random);

        // both draws must have happened, exponential bound first and jitter bound second
        ArgumentCaptor<Integer> boundCaptor = ArgumentCaptor.forClass(Integer.class);
        Mockito.verify(random, Mockito.times(2)).nextInt(boundCaptor.capture());
        List<Integer> requestedBounds = boundCaptor.getAllValues();
        assertEquals(exponentialBound, requestedBounds.get(0));
        assertEquals(RETRY_BACKOFF_BASE_MS, requestedBounds.get(1));

        // the result is either the exponential term or the jittered cap, whichever is smaller
        int cappedBackoffMs = electionBackoffMaxMs + jitterMs;
        if (exponential >= 20) {
            assertEquals(cappedBackoffMs, backoffMs);
        } else {
            assertEquals(RETRY_BACKOFF_BASE_MS * (exponential + 1), backoffMs);
            assertTrue(backoffMs < cappedBackoffMs);
        }
    }
}