KAFKA-19605; Fix the busy loop occurring in kraft client observers (#20354)

The broker observer should not read the update voter set timer value
when polling to determine its backoff, since brokers cannot auto-join
the KRaft voter set. Likewise, if auto-join or kraft.version=1 is not
supported, controller observers should not read this timer when polling.

The updateVoterSetPeriodMs timer should not be considered when
calculating the backoff returned by polling, since this timer does not
represent the same thing as the fetchTimeMs timer.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, José Armando García
 Sancio <jsancio@apache.org>, Alyssa Huang <ahuang@confluent.io>,
 Kuan-Po Tseng <brandboat@gmail.com>
This commit is contained in:
Kevin Wu 2025-08-15 09:43:46 -05:00 committed by GitHub
parent 55260e9835
commit 833e25f015
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 4 additions and 17 deletions

View File

@ -159,11 +159,6 @@ public class FollowerState implements EpochState {
return updateVoterSetPeriodTimer.isExpired();
}
public long remainingUpdateVoterSetPeriodMs(long currentTimeMs) {
updateVoterSetPeriodTimer.update(currentTimeMs);
return updateVoterSetPeriodTimer.remainingMs();
}
public void resetUpdateVoterSetPeriod(long currentTimeMs) {
updateVoterSetPeriodTimer.update(currentTimeMs);
updateVoterSetPeriodTimer.reset(updateVoterPeriodMs());

View File

@ -3315,10 +3315,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
return Math.min(
backoffMs,
Math.min(
state.remainingFetchTimeMs(currentTimeMs),
state.remainingUpdateVoterSetPeriodMs(currentTimeMs)
)
state.remainingFetchTimeMs(currentTimeMs)
);
}
@ -3333,11 +3330,10 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
private long pollFollowerAsObserver(FollowerState state, long currentTimeMs) {
GracefulShutdown shutdown = this.shutdown.get();
final long backoffMs;
if (shutdown != null) {
// If we are an observer, then we can shutdown immediately. We want to
// skip potentially sending any add or remove voter RPCs.
backoffMs = 0;
return 0;
} else if (shouldSendAddOrRemoveVoterRequest(state, currentTimeMs)) {
final var localReplicaKey = quorum.localReplicaKeyOrThrow();
final var voters = partitionState.lastVoterSet();
@ -3356,17 +3352,13 @@ public final class KafkaRaftClient<T> implements RaftClient<T> {
} else {
sendResult = maybeSendAddVoterRequest(state, currentTimeMs);
}
backoffMs = sendResult.timeToWaitMs();
if (sendResult.requestSent()) {
state.resetUpdateVoterSetPeriod(currentTimeMs);
}
return sendResult.timeToWaitMs();
} else {
backoffMs = maybeSendFetchToBestNode(state, currentTimeMs);
return maybeSendFetchToBestNode(state, currentTimeMs);
}
return Math.min(
backoffMs,
state.remainingUpdateVoterSetPeriodMs(currentTimeMs)
);
}
private long maybeSendFetchToBestNode(FollowerState state, long currentTimeMs) {