KAFKA-19354: KRaft observer should send fetch to best node (#19854)

Observers may get stuck fetching from bootstrap servers even on
discovery of a leader from a fetch response.

Observers currently fetch from bootstrap servers once their fetch
timeout expires, and even on rediscovery of the leader (e.g. observer
receives fetch response indicating location of leader), observers will
not  attempt to resume fetching from the leader.

This change simplifies observer polling logic to just rely on
maybeSendFetchToBestNode which takes care of the logic of selecting
either the leader or bootstrap servers  as the destination based on
timeouts and backoffs.

Reviewers: José Armando García Sancio <jsancio@apache.org>
This commit is contained in:
Alyssa Huang 2025-07-28 10:03:14 -07:00 committed by GitHub
parent dfced692d2
commit 7339d65b27
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 87 additions and 13 deletions

View File

@ -131,12 +131,18 @@ public class FollowerState implements EpochState {
return fetchTimer.isExpired(); return fetchTimer.isExpired();
} }
/**
* Reset the fetch timeout after successful fetch from leader.
*/
public void resetFetchTimeoutForSuccessfulFetch(long currentTimeMs) { public void resetFetchTimeoutForSuccessfulFetch(long currentTimeMs) {
fetchTimer.update(currentTimeMs); overrideFetchTimeout(currentTimeMs, fetchTimeoutMs);
fetchTimer.reset(fetchTimeoutMs);
hasFetchedFromLeader = true; hasFetchedFromLeader = true;
} }
/**
* Override the fetch timeout to a specific value. This is useful for short-circuiting followers' timeouts after
* they receive end quorum requests
*/
public void overrideFetchTimeout(long currentTimeMs, long timeoutMs) { public void overrideFetchTimeout(long currentTimeMs, long timeoutMs) {
fetchTimer.update(currentTimeMs); fetchTimer.update(currentTimeMs);
fetchTimer.reset(timeoutMs); fetchTimer.reset(timeoutMs);

View File

@ -2570,7 +2570,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
transitionToUnattached(epoch, OptionalInt.empty()); transitionToUnattached(epoch, OptionalInt.empty());
} }
} else if ( } else if (
leaderId.isPresent() && leaderId.isPresent() &&
(!quorum.hasLeader() || leaderEndpoints.size() > quorum.leaderEndpoints().size()) (!quorum.hasLeader() || leaderEndpoints.size() > quorum.leaderEndpoints().size())
) { ) {
// The request or response indicates the leader of the current epoch // The request or response indicates the leader of the current epoch
@ -3277,11 +3277,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
} }
private long pollFollowerAsObserver(FollowerState state, long currentTimeMs) { private long pollFollowerAsObserver(FollowerState state, long currentTimeMs) {
if (state.hasFetchTimeoutExpired(currentTimeMs)) { return maybeSendFetchToBestNode(state, currentTimeMs);
return maybeSendFetchToAnyBootstrap(currentTimeMs);
} else {
return maybeSendFetchToBestNode(state, currentTimeMs);
}
} }
private long maybeSendFetchToBestNode(FollowerState state, long currentTimeMs) { private long maybeSendFetchToBestNode(FollowerState state, long currentTimeMs) {

View File

@ -2109,11 +2109,11 @@ class KafkaRaftClientTest {
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = { true, false }) @ValueSource(booleans = { true, false })
public void testObserverSendDiscoveryFetchAfterFetchTimeout(boolean withKip853Rpc) throws Exception { public void testObserverUnattachedSendFetchToBootstrapServers(boolean withKip853Rpc) throws Exception {
int localId = randomReplicaId(); int localId = randomReplicaId();
int leaderId = localId + 1; int leaderId = localId + 1;
int otherNodeId = localId + 2; int otherNodeId = localId + 2;
int epoch = 5; int epoch = 0;
Set<Integer> voters = Set.of(leaderId, otherNodeId); Set<Integer> voters = Set.of(leaderId, otherNodeId);
List<InetSocketAddress> bootstrapServers = voters List<InetSocketAddress> bootstrapServers = voters
.stream() .stream()
@ -2125,10 +2125,12 @@ class KafkaRaftClientTest {
.withKip853Rpc(withKip853Rpc) .withKip853Rpc(withKip853Rpc)
.build(); .build();
// unattached observer will send fetches to bootstrap servers to discover leader
context.pollUntilRequest(); context.pollUntilRequest();
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest(); RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
assertTrue(context.bootstrapIds.contains(fetchRequest.destination().id())); assertTrue(context.bootstrapIds.contains(fetchRequest.destination().id()));
context.assertFetchRequestData(fetchRequest, 0, 0L, 0, context.client.highWatermark()); context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
context.assertUnknownLeaderAndNoVotedCandidate(epoch);
context.deliverResponse( context.deliverResponse(
fetchRequest.correlationId(), fetchRequest.correlationId(),
@ -2136,16 +2138,51 @@ class KafkaRaftClientTest {
context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH) context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L, Errors.FENCED_LEADER_EPOCH)
); );
// unattached observer becomes a follower after discovering leader
context.client.poll(); context.client.poll();
context.assertElectedLeader(epoch, leaderId); context.assertElectedLeader(epoch, leaderId);
}
context.time.sleep(context.fetchTimeoutMs); @ParameterizedTest
@ValueSource(booleans = { true, false })
public void testObserverFollowerSendFetchToBestNode(boolean withKip853Rpc) throws Exception {
int localId = randomReplicaId();
int leaderId = localId + 1;
int otherNodeId = localId + 2;
int epoch = 0;
Set<Integer> voters = Set.of(leaderId, otherNodeId);
List<InetSocketAddress> bootstrapServers = voters
.stream()
.map(RaftClientTestContext::mockAddress)
.toList();
context.pollUntilRequest(); RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
.withBootstrapServers(Optional.of(bootstrapServers))
.withElectedLeader(epoch, leaderId)
.withKip853Rpc(withKip853Rpc)
.build();
context.client.poll();
// observer will send fetch to the leader
RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
assertEquals(leaderId, fetchRequest.destination().id());
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
// if request times out, observer will retry to a bootstrap server
context.time.sleep(context.requestTimeoutMs());
context.client.poll();
fetchRequest = context.assertSentFetchRequest(); fetchRequest = context.assertSentFetchRequest();
assertNotEquals(leaderId, fetchRequest.destination().id()); assertNotEquals(leaderId, fetchRequest.destination().id());
assertTrue(context.bootstrapIds.contains(fetchRequest.destination().id())); assertTrue(context.bootstrapIds.contains(fetchRequest.destination().id()));
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark()); context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
// observer will retry to the leader
context.time.sleep(context.requestTimeoutMs());
context.client.poll();
fetchRequest = context.assertSentFetchRequest();
assertEquals(leaderId, fetchRequest.destination().id());
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, context.client.highWatermark());
} }
@ParameterizedTest @ParameterizedTest

View File

@ -245,6 +245,41 @@ public class RequestManagerTest {
assertFalse(cache.isResponseExpected(otherNode, 1)); assertFalse(cache.isResponseExpected(otherNode, 1));
} }
@Test
public void testHasRequestTimedOut() {
List<Node> bootstrapList = makeBootstrapList(2);
RequestManager cache = new RequestManager(
bootstrapList,
retryBackoffMs,
requestTimeoutMs,
random
);
// Find a ready node with the starting state
Node bootstrapNode1 = cache.findReadyBootstrapServer(time.milliseconds()).get();
assertTrue(
bootstrapList.contains(bootstrapNode1),
String.format("%s is not in %s", bootstrapNode1, bootstrapList)
);
// Before sending a request, no request should have timed out
assertFalse(cache.hasRequestTimedOut(bootstrapNode1, time.milliseconds()));
// Send a request
cache.onRequestSent(bootstrapNode1, 1, time.milliseconds());
assertEquals(
Optional.empty(),
cache.findReadyBootstrapServer(time.milliseconds())
);
assertFalse(cache.hasRequestTimedOut(bootstrapNode1, time.milliseconds()));
time.sleep(requestTimeoutMs - 1);
assertFalse(cache.hasRequestTimedOut(bootstrapNode1, time.milliseconds()));
// Timeout the request
time.sleep(1);
assertTrue(cache.hasRequestTimedOut(bootstrapNode1, time.milliseconds()));
}
@Test @Test
public void testAnyInflightRequestWithAnyRequest() { public void testAnyInflightRequestWithAnyRequest() {
Node otherNode = new Node(1, "other-node", 1234); Node otherNode = new Node(1, "other-node", 1234);