KAFKA-17030; Unattached voters will fetch from bootstrap servers (#17352)

Because the set of voters are dynamic (KIP-953), it is possible for a replica to believe they are a voter while the current leader doesn't have that replica in the voter set. In this replicated state, the leader will not sent BeginQuorumEpoch requests to such a replica. This means that such replicas will not be able to discover the leader.

This change will help Unattached rediscover the leader by sending Fetch requests to the the bootstrap servers.
Followers have a similar issue - if they are unable to communicate with the leader they should try contacting the bootstrap servers.

Reviewers: José Armando García Sancio <jsancio@apache.org>
This commit is contained in:
Alyssa Huang 2024-12-11 08:38:14 -08:00 committed by GitHub
parent d09e222846
commit e979fce94e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 157 additions and 31 deletions

View File

@ -2755,7 +2755,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
.setReplicaState(new FetchRequestData.ReplicaState().setReplicaId(quorum.localIdOrSentinel())); .setReplicaState(new FetchRequestData.ReplicaState().setReplicaId(quorum.localIdOrSentinel()));
} }
private long maybeSendAnyVoterFetch(long currentTimeMs) { private long maybeSendFetchToAnyBootstrap(long currentTimeMs) {
Optional<Node> readyNode = requestManager.findReadyBootstrapServer(currentTimeMs); Optional<Node> readyNode = requestManager.findReadyBootstrapServer(currentTimeMs);
if (readyNode.isPresent()) { if (readyNode.isPresent()) {
return maybeSendRequest( return maybeSendRequest(
@ -3045,7 +3045,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
} }
state.resetUpdateVoterPeriod(currentTimeMs); state.resetUpdateVoterPeriod(currentTimeMs);
} else { } else {
backoffMs = maybeSendFetchOrFetchSnapshot(state, currentTimeMs); backoffMs = maybeSendFetchToBestNode(state, currentTimeMs);
} }
return Math.min( return Math.min(
@ -3059,30 +3059,32 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
private long pollFollowerAsObserver(FollowerState state, long currentTimeMs) { private long pollFollowerAsObserver(FollowerState state, long currentTimeMs) {
if (state.hasFetchTimeoutExpired(currentTimeMs)) { if (state.hasFetchTimeoutExpired(currentTimeMs)) {
return maybeSendAnyVoterFetch(currentTimeMs); return maybeSendFetchToAnyBootstrap(currentTimeMs);
} else { } else {
final long backoffMs; return maybeSendFetchToBestNode(state, currentTimeMs);
// If the current leader is backing off due to some failure or if the
// request has timed out, then we attempt to send the Fetch to another
// voter in order to discover if there has been a leader change.
Node leaderNode = state.leaderNode(channel.listenerName());
if (requestManager.hasRequestTimedOut(leaderNode, currentTimeMs)) {
// Once the request has timed out backoff the connection
requestManager.reset(leaderNode);
backoffMs = maybeSendAnyVoterFetch(currentTimeMs);
} else if (requestManager.isBackingOff(leaderNode, currentTimeMs)) {
backoffMs = maybeSendAnyVoterFetch(currentTimeMs);
} else if (!requestManager.hasAnyInflightRequest(currentTimeMs)) {
backoffMs = maybeSendFetchOrFetchSnapshot(state, currentTimeMs);
} else {
backoffMs = requestManager.backoffBeforeAvailableBootstrapServer(currentTimeMs);
}
return Math.min(backoffMs, state.remainingFetchTimeMs(currentTimeMs));
} }
} }
private long maybeSendFetchToBestNode(FollowerState state, long currentTimeMs) {
// If the current leader is backing off due to some failure or if the
// request has timed out, then we attempt to send the Fetch to another
// voter in order to discover if there has been a leader change.
final long backoffMs;
Node leaderNode = state.leaderNode(channel.listenerName());
if (requestManager.hasRequestTimedOut(leaderNode, currentTimeMs)) {
// Once the request has timed out backoff the connection
requestManager.reset(leaderNode);
backoffMs = maybeSendFetchToAnyBootstrap(currentTimeMs);
} else if (requestManager.isBackingOff(leaderNode, currentTimeMs)) {
backoffMs = maybeSendFetchToAnyBootstrap(currentTimeMs);
} else if (!requestManager.hasAnyInflightRequest(currentTimeMs)) {
backoffMs = maybeSendFetchOrFetchSnapshot(state, currentTimeMs);
} else {
backoffMs = requestManager.backoffBeforeAvailableBootstrapServer(currentTimeMs);
}
return Math.min(backoffMs, state.remainingFetchTimeMs(currentTimeMs));
}
private long maybeSendFetchOrFetchSnapshot(FollowerState state, long currentTimeMs) { private long maybeSendFetchOrFetchSnapshot(FollowerState state, long currentTimeMs) {
final Supplier<ApiMessage> requestSupplier; final Supplier<ApiMessage> requestSupplier;
@ -3125,7 +3127,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
if (quorum.isVoter()) { if (quorum.isVoter()) {
return pollUnattachedAsVoter(state, currentTimeMs); return pollUnattachedAsVoter(state, currentTimeMs);
} else { } else {
return pollUnattachedAsObserver(state, currentTimeMs); return pollUnattachedCommon(state, currentTimeMs);
} }
} }
@ -3139,12 +3141,12 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
transitionToCandidate(currentTimeMs); transitionToCandidate(currentTimeMs);
return 0L; return 0L;
} else { } else {
return state.remainingElectionTimeMs(currentTimeMs); return pollUnattachedCommon(state, currentTimeMs);
} }
} }
private long pollUnattachedAsObserver(UnattachedState state, long currentTimeMs) { private long pollUnattachedCommon(UnattachedState state, long currentTimeMs) {
long fetchBackoffMs = maybeSendAnyVoterFetch(currentTimeMs); long fetchBackoffMs = maybeSendFetchToAnyBootstrap(currentTimeMs);
return Math.min(fetchBackoffMs, state.remainingElectionTimeMs(currentTimeMs)); return Math.min(fetchBackoffMs, state.remainingElectionTimeMs(currentTimeMs));
} }

View File

@ -848,12 +848,33 @@ public class KafkaRaftClientTest {
.build(); .build();
context.assertUnknownLeader(0); context.assertUnknownLeader(0);
context.time.sleep(2L * context.electionTimeoutMs()); context.pollUntilRequest();
RaftRequest.Outbound request = context.assertSentFetchRequest(0, 0L, 0);
assertTrue(context.client.quorum().isUnattached());
assertTrue(context.client.quorum().isVoter());
// receives a fetch response which does not specify who the leader is
context.time.sleep(context.electionTimeoutMs() / 2);
context.deliverResponse(
request.correlationId(),
request.destination(),
context.fetchResponse(0, -1, MemoryRecords.EMPTY, -1, Errors.NOT_LEADER_OR_FOLLOWER)
);
// should remain unattached voter
context.client.poll();
assertTrue(context.client.quorum().isUnattached());
assertTrue(context.client.quorum().isVoter());
// after election timeout should become candidate
context.time.sleep(context.electionTimeoutMs() * 2L);
context.pollUntilRequest();
assertTrue(context.client.quorum().isCandidate());
context.pollUntilRequest(); context.pollUntilRequest();
context.assertVotedCandidate(1, localId); context.assertVotedCandidate(1, localId);
RaftRequest.Outbound request = context.assertSentVoteRequest(1, 0, 0L, 1); request = context.assertSentVoteRequest(1, 0, 0L, 1);
context.deliverResponse( context.deliverResponse(
request.correlationId(), request.correlationId(),
request.destination(), request.destination(),
@ -1818,6 +1839,37 @@ public class KafkaRaftClientTest {
assertEquals(0, context.channel.drainSendQueue().size()); assertEquals(0, context.channel.drainSendQueue().size());
} }
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testUnattachedAsVoterCanBecomeFollowerAfterFindingLeader(boolean withKip853Rpc) throws Exception {
int localId = randomReplicaId();
int otherNodeId = localId + 1;
int leaderNodeId = localId + 2;
int epoch = 5;
Set<Integer> voters = Set.of(localId, otherNodeId, leaderNodeId);
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
.withUnknownLeader(epoch)
.withKip853Rpc(withKip853Rpc)
.build();
context.pollUntilRequest();
RaftRequest.Outbound request = context.assertSentFetchRequest(epoch, 0L, 0);
assertTrue(context.client.quorum().isUnattached());
assertTrue(context.client.quorum().isVoter());
// receives a fetch response specifying who the leader is
Errors responseError = (request.destination().id() == otherNodeId) ? Errors.NOT_LEADER_OR_FOLLOWER : Errors.NONE;
context.deliverResponse(
request.correlationId(),
request.destination(),
context.fetchResponse(epoch, leaderNodeId, MemoryRecords.EMPTY, 0L, responseError)
);
context.client.poll();
assertTrue(context.client.quorum().isFollower());
}
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = { true, false }) @ValueSource(booleans = { true, false })
public void testInitializeObserverNoPreviousState(boolean withKip853Rpc) throws Exception { public void testInitializeObserverNoPreviousState(boolean withKip853Rpc) throws Exception {
@ -2666,6 +2718,80 @@ public class KafkaRaftClientTest {
context.assertElectedLeader(epoch, voter3); context.assertElectedLeader(epoch, voter3);
} }
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testFollowerLeaderRediscoveryAfterBrokerNotAvailableError(boolean withKip853Rpc) throws Exception {
int localId = randomReplicaId();
int leaderId = localId + 1;
int otherNodeId = localId + 2;
int epoch = 5;
Set<Integer> voters = Set.of(leaderId, localId, otherNodeId);
List<InetSocketAddress> bootstrapServers = voters
.stream()
.map(RaftClientTestContext::mockAddress)
.collect(Collectors.toList());
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
.withBootstrapServers(Optional.of(bootstrapServers))
.withKip853Rpc(withKip853Rpc)
.withElectedLeader(epoch, leaderId)
.build();
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
assertEquals(leaderId, fetchRequest1.destination().id());
context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
context.deliverResponse(
fetchRequest1.correlationId(),
fetchRequest1.destination(),
context.fetchResponse(epoch, -1, MemoryRecords.EMPTY, -1, Errors.BROKER_NOT_AVAILABLE)
);
context.pollUntilRequest();
// We should retry the Fetch against the other voter since the original
// voter connection will be backing off.
RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
assertNotEquals(leaderId, fetchRequest2.destination().id());
assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id()));
context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
}
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testFollowerLeaderRediscoveryAfterRequestTimeout(boolean withKip853Rpc) throws Exception {
int localId = randomReplicaId();
int leaderId = localId + 1;
int otherNodeId = localId + 2;
int epoch = 5;
Set<Integer> voters = Set.of(leaderId, localId, otherNodeId);
List<InetSocketAddress> bootstrapServers = voters
.stream()
.map(RaftClientTestContext::mockAddress)
.collect(Collectors.toList());
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
.withBootstrapServers(Optional.of(bootstrapServers))
.withKip853Rpc(withKip853Rpc)
.withElectedLeader(epoch, leaderId)
.build();
context.pollUntilRequest();
RaftRequest.Outbound fetchRequest1 = context.assertSentFetchRequest();
assertEquals(leaderId, fetchRequest1.destination().id());
context.assertFetchRequestData(fetchRequest1, epoch, 0L, 0);
context.time.sleep(context.requestTimeoutMs());
context.pollUntilRequest();
// We should retry the Fetch against the other voter since the original
// voter connection will be backing off.
RaftRequest.Outbound fetchRequest2 = context.assertSentFetchRequest();
assertNotEquals(leaderId, fetchRequest2.destination().id());
assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id()));
context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
}
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = { true, false }) @ValueSource(booleans = { true, false })
public void testObserverLeaderRediscoveryAfterBrokerNotAvailableError(boolean withKip853Rpc) throws Exception { public void testObserverLeaderRediscoveryAfterBrokerNotAvailableError(boolean withKip853Rpc) throws Exception {
@ -2705,12 +2831,10 @@ public class KafkaRaftClientTest {
assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id())); assertTrue(context.bootstrapIds.contains(fetchRequest2.destination().id()));
context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0); context.assertFetchRequestData(fetchRequest2, epoch, 0L, 0);
Errors error = fetchRequest2.destination().id() == leaderId ?
Errors.NONE : Errors.NOT_LEADER_OR_FOLLOWER;
context.deliverResponse( context.deliverResponse(
fetchRequest2.correlationId(), fetchRequest2.correlationId(),
fetchRequest2.destination(), fetchRequest2.destination(),
context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, error) context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.NOT_LEADER_OR_FOLLOWER)
); );
context.client.poll(); context.client.poll();