KAFKA-16536; Use BeginQuorumEpoch as leader heartbeat (#16399)

With KIP-853, the leader's endpoint is sent to the other voters using the BeginQuorumEpoch RPC. The remote replicas do not store the leader's endpoint, so the leader needs to resend it whenever a voter restarts.

This change accomplishes that by resending the BeginQuorumEpoch request periodically, as a heartbeat. The period is set to half of the fetch timeout so that a restarting voter learns the leader's endpoint before its fetch timeout expires and it transitions to the candidate state.
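As a rough sketch of the timing relationship this relies on (variable names and the 2000 ms value are illustrative; the /2 heartbeat factor comes from this patch, and the 1.5x factor from the existing CHECK_QUORUM_TIMEOUT_FACTOR):

public class HeartbeatTimingSketch {
    public static void main(String[] args) {
        int fetchTimeoutMs = 2_000;                              // example value only
        int beginQuorumEpochTimeoutMs = fetchTimeoutMs / 2;      // 1_000 ms heartbeat period
        int checkQuorumTimeoutMs = (int) (fetchTimeoutMs * 1.5); // 3_000 ms resign deadline
        // A voter transitions to candidate only after fetchTimeoutMs without hearing
        // from the leader, so a heartbeat every fetchTimeoutMs / 2 reaches a restarted
        // voter before its fetch timeout can expire.
        System.out.printf(
            "heartbeat=%dms fetch=%dms checkQuorum=%dms%n",
            beginQuorumEpochTimeoutMs, fetchTimeoutMs, checkQuorumTimeoutMs
        );
    }
}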

Reviewers: José Armando García Sancio <jsancio@apache.org>
Author: Alyssa Huang (2024-06-28 07:27:30 -07:00, committed by GitHub)
Parent: e57cbe0346
Commit: b0054f3a2f
6 changed files with 108 additions and 17 deletions

KafkaRaftClient.java

@@ -2319,6 +2319,28 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
return timeUntilDrain;
}
private long maybeSendBeginQuorumEpochRequests(
LeaderState<T> state,
long currentTimeMs
) {
long timeUntilNextBeginQuorumSend = state.timeUntilBeginQuorumEpochTimerExpires(currentTimeMs);
if (timeUntilNextBeginQuorumSend == 0) {
VoterSet lastVoterSet = partitionState.lastVoterSet();
timeUntilNextBeginQuorumSend = maybeSendRequests(
currentTimeMs,
lastVoterSet.voterNodes(lastVoterSet.voterIds().stream().filter(id -> id != quorum.localIdOrThrow()), channel.listenerName()),
this::buildBeginQuorumEpochRequest
);
state.resetBeginQuorumEpochTimer(currentTimeMs);
logger.trace(
"Attempted to send BeginQuorumEpochRequest as heartbeat to all voters. " +
"Request can be retried in {} ms",
timeUntilNextBeginQuorumSend
);
}
return timeUntilNextBeginQuorumSend;
}
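The method above derives its cadence from org.apache.kafka.common.utils.Timer. A minimal, self-contained sketch of the update/remainingMs/reset cycle it uses (MockTime ships with the clients test artifact; the 1_000 ms period is illustrative):

import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Timer;

public class TimerSketch {
    public static void main(String[] args) {
        MockTime time = new MockTime();
        Timer timer = time.timer(0);             // starts expired, like beginQuorumEpochTimer

        timer.update(time.milliseconds());
        if (timer.remainingMs() == 0) {
            // ... this is where the BeginQuorumEpoch requests would go out ...
            timer.reset(1_000);                  // re-arm for the next heartbeat period
        }

        time.sleep(400);
        timer.update(time.milliseconds());
        System.out.println(timer.remainingMs()); // 600: time left before the next heartbeat
    }
}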
private long pollResigned(long currentTimeMs) {
ResignedState state = quorum.resignedStateOrThrow();
long endQuorumBackoffMs = maybeSendRequests(
@@ -2360,15 +2382,12 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
currentTimeMs
);
-long timeUntilSend = maybeSendRequests(
-currentTimeMs,
-partitionState
-.lastVoterSet()
-.voterNodes(state.nonAcknowledgingVoters().stream(), channel.listenerName()),
-this::buildBeginQuorumEpochRequest
+long timeUntilNextBeginQuorumSend = maybeSendBeginQuorumEpochRequests(
+state,
+currentTimeMs
);
-return Math.min(timeUntilFlush, Math.min(timeUntilSend, timeUntilCheckQuorumExpires));
+return Math.min(timeUntilFlush, Math.min(timeUntilNextBeginQuorumSend, timeUntilCheckQuorumExpires));
}
private long maybeSendVoteRequests(

LeaderState.java

@@ -68,6 +68,8 @@ public class LeaderState<T> implements EpochState {
private final Set<Integer> fetchedVoters = new HashSet<>();
private final Timer checkQuorumTimer;
private final int checkQuorumTimeoutMs;
private final Timer beginQuorumEpochTimer;
private final int beginQuorumEpochTimeoutMs;
// This is volatile because resignation can be requested from an external thread.
private volatile boolean resignRequested = false;
@@ -102,6 +104,18 @@ public class LeaderState<T> implements EpochState {
// Use 1.5x the fetch timeout to tolerate some network transit time or other I/O delays.
this.checkQuorumTimeoutMs = (int) (fetchTimeoutMs * CHECK_QUORUM_TIMEOUT_FACTOR);
this.checkQuorumTimer = time.timer(checkQuorumTimeoutMs);
this.beginQuorumEpochTimeoutMs = fetchTimeoutMs / 2;
// The timer starts expired so the first BeginQuorumEpoch is sent on the first poll as leader.
this.beginQuorumEpochTimer = time.timer(0);
}
public long timeUntilBeginQuorumEpochTimerExpires(long currentTimeMs) {
beginQuorumEpochTimer.update(currentTimeMs);
return beginQuorumEpochTimer.remainingMs();
}
public void resetBeginQuorumEpochTimer(long currentTimeMs) {
beginQuorumEpochTimer.update(currentTimeMs);
beginQuorumEpochTimer.reset(beginQuorumEpochTimeoutMs);
}
/**
@@ -224,7 +238,8 @@ public class LeaderState<T> implements EpochState {
return this.grantingVoters;
}
-public Set<Integer> nonAcknowledgingVoters() {
+// visible for testing
+Set<Integer> nonAcknowledgingVoters() {
Set<Integer> nonAcknowledging = new HashSet<>();
for (ReplicaState state : voterStates.values()) {
if (!state.hasAcknowledgedLeader)

KafkaRaftClientTest.java

@@ -546,6 +546,33 @@ public class KafkaRaftClientTest {
context.listener.currentLeaderAndEpoch());
}
@Test
public void testBeginQuorumHeartbeat() throws Exception {
int localId = 0;
int remoteId1 = 1;
int remoteId2 = 2;
Set<Integer> voters = Utils.mkSet(localId, remoteId1, remoteId2);
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
context.becomeLeader();
assertEquals(OptionalInt.of(localId), context.currentLeader());
// begin epoch requests should be sent out every beginQuorumEpochTimeoutMs
context.time.sleep(context.beginQuorumEpochTimeoutMs);
context.client.poll();
context.assertSentBeginQuorumEpochRequest(context.currentEpoch(), Utils.mkSet(remoteId1, remoteId2));
int partialDelay = context.beginQuorumEpochTimeoutMs / 2;
context.time.sleep(partialDelay);
context.client.poll();
context.assertSentBeginQuorumEpochRequest(context.currentEpoch(), Utils.mkSet());
context.time.sleep(context.beginQuorumEpochTimeoutMs - partialDelay);
context.client.poll();
context.assertSentBeginQuorumEpochRequest(context.currentEpoch(), Utils.mkSet(remoteId1, remoteId2));
}
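The schedule this test exercises, reduced to arithmetic (a standalone sketch; T stands in for beginQuorumEpochTimeoutMs, and it assumes becomeLeader() sends the initial BeginQuorumEpoch at t=0, resetting the timer):

public class HeartbeatScheduleSketch {
    public static void main(String[] args) {
        long T = 1_000;                          // stand-in for beginQuorumEpochTimeoutMs
        long lastSendMs = 0;                     // initial BeginQuorumEpoch at election time
        for (long t : new long[] {T, T + T / 2, 2 * T}) {
            boolean due = t - lastSendMs >= T;   // poll() sends only after a full period
            System.out.printf("t=%d -> %s%n", t, due ? "send to {1, 2}" : "no request");
            if (due) lastSendMs = t;
        }
        // Prints: t=1000 -> send to {1, 2}, t=1500 -> no request, t=2000 -> send to {1, 2}
    }
}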
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testLeaderShouldResignLeadershipIfNotGetFetchRequestFromMajorityVoters(boolean withKip853Rpc) throws Exception {
@@ -774,7 +801,7 @@ public class KafkaRaftClientTest {
// Send BeginQuorumEpoch to voters
context.client.poll();
-context.assertSentBeginQuorumEpochRequest(1, 1);
+context.assertSentBeginQuorumEpochRequest(1, Utils.mkSet(otherNodeId));
Records records = context.log.read(0, Isolation.UNCOMMITTED).records;
RecordBatch batch = records.batches().iterator().next();
@@ -818,7 +845,7 @@ public class KafkaRaftClientTest {
// Send BeginQuorumEpoch to voters
context.client.poll();
-context.assertSentBeginQuorumEpochRequest(1, 2);
+context.assertSentBeginQuorumEpochRequest(1, Utils.mkSet(firstNodeId, secondNodeId));
Records records = context.log.read(0, Isolation.UNCOMMITTED).records;
RecordBatch batch = records.batches().iterator().next();
@@ -2918,7 +2945,7 @@ public class KafkaRaftClientTest {
context.pollUntilRequest();
// We send BeginEpoch, but it gets lost and the destination finds the leader through the Fetch API
-context.assertSentBeginQuorumEpochRequest(epoch, 1);
+context.assertSentBeginQuorumEpochRequest(epoch, Utils.mkSet(otherNodeKey.id()));
context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, 0L, 0, 500));
@@ -3132,7 +3159,10 @@ public class KafkaRaftClientTest {
context.expectAndGrantVotes(epoch);
context.pollUntilRequest();
-RaftRequest.Outbound request = context.assertSentBeginQuorumEpochRequest(epoch, 1);
+List<RaftRequest.Outbound> requests = context.collectBeginEpochRequests(epoch);
+assertEquals(1, requests.size());
+RaftRequest.Outbound request = requests.get(0);
assertEquals(otherNodeId, request.destination().id());
BeginQuorumEpochResponseData response = new BeginQuorumEpochResponseData()
.setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());

LeaderStateTest.java

@@ -57,6 +57,7 @@ public class LeaderStateTest {
private final MockTime time = new MockTime();
private final int fetchTimeoutMs = 2000;
private final int checkQuorumTimeoutMs = (int) (fetchTimeoutMs * CHECK_QUORUM_TIMEOUT_FACTOR);
private final int beginQuorumEpochTimeoutMs = fetchTimeoutMs / 2;
private LeaderState<?> newLeaderState(
VoterSet voters,
@@ -1098,6 +1099,30 @@ public class LeaderStateTest {
);
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testBeginQuorumEpochTimer(boolean withDirectoryId) {
int follower1 = 1;
long epochStartOffset = 10L;
VoterSet voters = localWithRemoteVoterSet(IntStream.of(follower1), withDirectoryId);
LeaderState<?> state = newLeaderState(
voters,
epochStartOffset
);
assertEquals(0, state.timeUntilBeginQuorumEpochTimerExpires(time.milliseconds()));
time.sleep(5);
state.resetBeginQuorumEpochTimer(time.milliseconds());
assertEquals(beginQuorumEpochTimeoutMs, state.timeUntilBeginQuorumEpochTimerExpires(time.milliseconds()));
time.sleep(5);
assertEquals(beginQuorumEpochTimeoutMs - 5, state.timeUntilBeginQuorumEpochTimerExpires(time.milliseconds()));
time.sleep(beginQuorumEpochTimeoutMs);
assertEquals(0, state.timeUntilBeginQuorumEpochTimerExpires(time.milliseconds()));
}
private static class MockOffsetMetadata implements OffsetMetadata {
private final String value;

RaftClientTestContext.java

@@ -97,6 +97,7 @@ public final class RaftClientTestContext {
final int fetchMaxWaitMs = Builder.FETCH_MAX_WAIT_MS;
final int fetchTimeoutMs = Builder.FETCH_TIMEOUT_MS;
final int checkQuorumTimeoutMs = (int) (fetchTimeoutMs * CHECK_QUORUM_TIMEOUT_FACTOR);
final int beginQuorumEpochTimeoutMs = fetchTimeoutMs / 2;
final int retryBackoffMs = Builder.RETRY_BACKOFF_MS;
private int electionTimeoutMs;
@@ -671,10 +672,10 @@ public final class RaftClientTestContext {
channel.mockReceive(new RaftResponse.Inbound(correlationId, response, source));
}
-RaftRequest.Outbound assertSentBeginQuorumEpochRequest(int epoch, int numBeginEpochRequests) {
+void assertSentBeginQuorumEpochRequest(int epoch, Set<Integer> destinationIds) {
List<RaftRequest.Outbound> requests = collectBeginEpochRequests(epoch);
-assertEquals(numBeginEpochRequests, requests.size());
-return requests.get(0);
+assertEquals(destinationIds.size(), requests.size());
+assertEquals(destinationIds, requests.stream().map(r -> r.destination().id()).collect(Collectors.toSet()));
}
private List<RaftResponse.Outbound> drainSentResponses(
@@ -1001,10 +1002,11 @@ public final class RaftClientTestContext {
assertElectedLeader(epoch, leaderId);
}
-private List<RaftRequest.Outbound> collectBeginEpochRequests(int epoch) {
+List<RaftRequest.Outbound> collectBeginEpochRequests(int epoch) {
List<RaftRequest.Outbound> requests = new ArrayList<>();
for (RaftRequest.Outbound raftRequest : channel.drainSentRequests(Optional.of(ApiKeys.BEGIN_QUORUM_EPOCH))) {
assertInstanceOf(BeginQuorumEpochRequestData.class, raftRequest.data());
assertNotEquals(localIdOrThrow(), raftRequest.destination().id());
BeginQuorumEpochRequestData request = (BeginQuorumEpochRequestData) raftRequest.data();
BeginQuorumEpochRequestData.PartitionData partitionRequest =