From b0054f3a2f28ebd19045a4b8a8d356987275965b Mon Sep 17 00:00:00 2001 From: Alyssa Huang Date: Fri, 28 Jun 2024 07:27:30 -0700 Subject: [PATCH] KAFKA-16536; Use BeginQuorumEpoch as leader heartbeat (#16399) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit With KIP-853, the leader's endpoint is sent to the other voters using the BeginQuorumEpoch RPC. The remote replicas never store the leader's endpoint. That means that leaders need to resend the leader's endpoint if a voter restarts. This change accomplishes this by sending the BeginQuorumEpoch as a heartbeat. The period is set to half the fetch timeout to prevent voters from transitioning to the candidate state when restarting. Reviewers: José Armando García Sancio --- .../apache/kafka/raft/KafkaRaftClient.java | 33 ++++++++++++---- .../org/apache/kafka/raft/LeaderState.java | 17 ++++++++- .../org/apache/kafka/raft/RequestManager.java | 2 +- .../kafka/raft/KafkaRaftClientTest.java | 38 +++++++++++++++++-- .../apache/kafka/raft/LeaderStateTest.java | 25 ++++++++++++ .../kafka/raft/RaftClientTestContext.java | 10 +++-- 6 files changed, 108 insertions(+), 17 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 23e0d96a52b..da44be4dc65 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -2319,6 +2319,28 @@ public final class KafkaRaftClient implements RaftClient { return timeUntilDrain; } + private long maybeSendBeginQuorumEpochRequests( + LeaderState state, + long currentTimeMs + ) { + long timeUntilNextBeginQuorumSend = state.timeUntilBeginQuorumEpochTimerExpires(currentTimeMs); + if (timeUntilNextBeginQuorumSend == 0) { + VoterSet lastVoterSet = partitionState.lastVoterSet(); + timeUntilNextBeginQuorumSend = maybeSendRequests( + currentTimeMs, + 
lastVoterSet.voterNodes(lastVoterSet.voterIds().stream().filter(id -> id != quorum.localIdOrThrow()), channel.listenerName()), + this::buildBeginQuorumEpochRequest + ); + state.resetBeginQuorumEpochTimer(currentTimeMs); + logger.trace( + "Attempted to send BeginQuorumEpochRequest as heartbeat to all voters. " + + "Request can be retried in {} ms", + timeUntilNextBeginQuorumSend + ); + } + return timeUntilNextBeginQuorumSend; + } + private long pollResigned(long currentTimeMs) { ResignedState state = quorum.resignedStateOrThrow(); long endQuorumBackoffMs = maybeSendRequests( @@ -2360,15 +2382,12 @@ public final class KafkaRaftClient implements RaftClient { currentTimeMs ); - long timeUntilSend = maybeSendRequests( - currentTimeMs, - partitionState - .lastVoterSet() - .voterNodes(state.nonAcknowledgingVoters().stream(), channel.listenerName()), - this::buildBeginQuorumEpochRequest + long timeUntilNextBeginQuorumSend = maybeSendBeginQuorumEpochRequests( + state, + currentTimeMs ); - return Math.min(timeUntilFlush, Math.min(timeUntilSend, timeUntilCheckQuorumExpires)); + return Math.min(timeUntilFlush, Math.min(timeUntilNextBeginQuorumSend, timeUntilCheckQuorumExpires)); } private long maybeSendVoteRequests( diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 031e3c8e14e..4719a65c31c 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -68,6 +68,8 @@ public class LeaderState implements EpochState { private final Set fetchedVoters = new HashSet<>(); private final Timer checkQuorumTimer; private final int checkQuorumTimeoutMs; + private final Timer beginQuorumEpochTimer; + private final int beginQuorumEpochTimeoutMs; // This is volatile because resignation can be requested from an external thread. 
private volatile boolean resignRequested = false; @@ -102,6 +104,18 @@ public class LeaderState implements EpochState { // use the 1.5x of fetch timeout to tolerate some network transition time or other IO time. this.checkQuorumTimeoutMs = (int) (fetchTimeoutMs * CHECK_QUORUM_TIMEOUT_FACTOR); this.checkQuorumTimer = time.timer(checkQuorumTimeoutMs); + this.beginQuorumEpochTimeoutMs = fetchTimeoutMs / 2; + this.beginQuorumEpochTimer = time.timer(0); + } + + public long timeUntilBeginQuorumEpochTimerExpires(long currentTimeMs) { + beginQuorumEpochTimer.update(currentTimeMs); + return beginQuorumEpochTimer.remainingMs(); + } + + public void resetBeginQuorumEpochTimer(long currentTimeMs) { + beginQuorumEpochTimer.update(currentTimeMs); + beginQuorumEpochTimer.reset(beginQuorumEpochTimeoutMs); } /** @@ -224,7 +238,8 @@ public class LeaderState implements EpochState { return this.grantingVoters; } - public Set nonAcknowledgingVoters() { + // visible for testing + Set nonAcknowledgingVoters() { Set nonAcknowledging = new HashSet<>(); for (ReplicaState state : voterStates.values()) { if (!state.hasAcknowledgedLeader) diff --git a/raft/src/main/java/org/apache/kafka/raft/RequestManager.java b/raft/src/main/java/org/apache/kafka/raft/RequestManager.java index 42f19eb40bc..72eab680777 100644 --- a/raft/src/main/java/org/apache/kafka/raft/RequestManager.java +++ b/raft/src/main/java/org/apache/kafka/raft/RequestManager.java @@ -220,7 +220,7 @@ public class RequestManager { return 0; } - return state.remainingBackoffMs(timeMs); + return state.remainingBackoffMs(timeMs); } public boolean isResponseExpected(Node node, long correlationId) { diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index c1a49a2b788..ef4e85ffcc2 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -546,6 +546,33 
@@ public class KafkaRaftClientTest { context.listener.currentLeaderAndEpoch()); } + @Test + public void testBeginQuorumHeartbeat() throws Exception { + int localId = 0; + int remoteId1 = 1; + int remoteId2 = 2; + Set voters = Utils.mkSet(localId, remoteId1, remoteId2); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build(); + + context.becomeLeader(); + assertEquals(OptionalInt.of(localId), context.currentLeader()); + + // begin epoch requests should be sent out every beginQuorumEpochTimeoutMs + context.time.sleep(context.beginQuorumEpochTimeoutMs); + context.client.poll(); + context.assertSentBeginQuorumEpochRequest(context.currentEpoch(), Utils.mkSet(remoteId1, remoteId2)); + + int partialDelay = context.beginQuorumEpochTimeoutMs / 2; + context.time.sleep(partialDelay); + context.client.poll(); + context.assertSentBeginQuorumEpochRequest(context.currentEpoch(), Utils.mkSet()); + + context.time.sleep(context.beginQuorumEpochTimeoutMs - partialDelay); + context.client.poll(); + context.assertSentBeginQuorumEpochRequest(context.currentEpoch(), Utils.mkSet(remoteId1, remoteId2)); + } + @ParameterizedTest @ValueSource(booleans = { true, false }) public void testLeaderShouldResignLeadershipIfNotGetFetchRequestFromMajorityVoters(boolean withKip853Rpc) throws Exception { @@ -774,7 +801,7 @@ public class KafkaRaftClientTest { // Send BeginQuorumEpoch to voters context.client.poll(); - context.assertSentBeginQuorumEpochRequest(1, 1); + context.assertSentBeginQuorumEpochRequest(1, Utils.mkSet(otherNodeId)); Records records = context.log.read(0, Isolation.UNCOMMITTED).records; RecordBatch batch = records.batches().iterator().next(); @@ -818,7 +845,7 @@ public class KafkaRaftClientTest { // Send BeginQuorumEpoch to voters context.client.poll(); - context.assertSentBeginQuorumEpochRequest(1, 2); + context.assertSentBeginQuorumEpochRequest(1, Utils.mkSet(firstNodeId, secondNodeId)); Records records = context.log.read(0, 
Isolation.UNCOMMITTED).records; RecordBatch batch = records.batches().iterator().next(); @@ -2918,7 +2945,7 @@ public class KafkaRaftClientTest { context.pollUntilRequest(); // We send BeginEpoch, but it gets lost and the destination finds the leader through the Fetch API - context.assertSentBeginQuorumEpochRequest(epoch, 1); + context.assertSentBeginQuorumEpochRequest(epoch, Utils.mkSet(otherNodeKey.id())); context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, 0L, 0, 500)); @@ -3132,7 +3159,10 @@ public class KafkaRaftClientTest { context.expectAndGrantVotes(epoch); context.pollUntilRequest(); - RaftRequest.Outbound request = context.assertSentBeginQuorumEpochRequest(epoch, 1); + List requests = context.collectBeginEpochRequests(epoch); + assertEquals(1, requests.size()); + RaftRequest.Outbound request = requests.get(0); + assertEquals(otherNodeId, request.destination().id()); BeginQuorumEpochResponseData response = new BeginQuorumEpochResponseData() .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code()); diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java index 866d6573177..133f1356326 100644 --- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java @@ -57,6 +57,7 @@ public class LeaderStateTest { private final MockTime time = new MockTime(); private final int fetchTimeoutMs = 2000; private final int checkQuorumTimeoutMs = (int) (fetchTimeoutMs * CHECK_QUORUM_TIMEOUT_FACTOR); + private final int beginQuorumEpochTimeoutMs = fetchTimeoutMs / 2; private LeaderState newLeaderState( VoterSet voters, @@ -1098,6 +1099,30 @@ public class LeaderStateTest { ); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testBeginQuorumEpochTimer(boolean withDirectoryId) { + int follower1 = 1; + long epochStartOffset = 10L; + + VoterSet voters = 
localWithRemoteVoterSet(IntStream.of(follower1), withDirectoryId); + LeaderState state = newLeaderState( + voters, + epochStartOffset + ); + assertEquals(0, state.timeUntilBeginQuorumEpochTimerExpires(time.milliseconds())); + + time.sleep(5); + state.resetBeginQuorumEpochTimer(time.milliseconds()); + assertEquals(beginQuorumEpochTimeoutMs, state.timeUntilBeginQuorumEpochTimerExpires(time.milliseconds())); + + time.sleep(5); + assertEquals(beginQuorumEpochTimeoutMs - 5, state.timeUntilBeginQuorumEpochTimerExpires(time.milliseconds())); + + time.sleep(beginQuorumEpochTimeoutMs); + assertEquals(0, state.timeUntilBeginQuorumEpochTimerExpires(time.milliseconds())); + } + private static class MockOffsetMetadata implements OffsetMetadata { private final String value; diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index 50d8afebb58..2f0d35ab5f2 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -97,6 +97,7 @@ public final class RaftClientTestContext { final int fetchMaxWaitMs = Builder.FETCH_MAX_WAIT_MS; final int fetchTimeoutMs = Builder.FETCH_TIMEOUT_MS; final int checkQuorumTimeoutMs = (int) (fetchTimeoutMs * CHECK_QUORUM_TIMEOUT_FACTOR); + final int beginQuorumEpochTimeoutMs = fetchTimeoutMs / 2; final int retryBackoffMs = Builder.RETRY_BACKOFF_MS; private int electionTimeoutMs; @@ -671,10 +672,10 @@ public final class RaftClientTestContext { channel.mockReceive(new RaftResponse.Inbound(correlationId, response, source)); } - RaftRequest.Outbound assertSentBeginQuorumEpochRequest(int epoch, int numBeginEpochRequests) { + void assertSentBeginQuorumEpochRequest(int epoch, Set destinationIds) { List requests = collectBeginEpochRequests(epoch); - assertEquals(numBeginEpochRequests, requests.size()); - return requests.get(0); + assertEquals(destinationIds.size(), 
requests.size()); + assertEquals(destinationIds, requests.stream().map(r -> r.destination().id()).collect(Collectors.toSet())); } private List drainSentResponses( @@ -1001,10 +1002,11 @@ public final class RaftClientTestContext { assertElectedLeader(epoch, leaderId); } - private List collectBeginEpochRequests(int epoch) { + List collectBeginEpochRequests(int epoch) { List requests = new ArrayList<>(); for (RaftRequest.Outbound raftRequest : channel.drainSentRequests(Optional.of(ApiKeys.BEGIN_QUORUM_EPOCH))) { assertInstanceOf(BeginQuorumEpochRequestData.class, raftRequest.data()); + assertNotEquals(localIdOrThrow(), raftRequest.destination().id()); BeginQuorumEpochRequestData request = (BeginQuorumEpochRequestData) raftRequest.data(); BeginQuorumEpochRequestData.PartitionData partitionRequest =