KAFKA-16144: skip checkQuorum for only 1 voter case (#15235)

When there's only 1 voter, there will be no fetch request from other voters. In this case, we should still not expire the checkQuorum timer because there's just 1 voter.

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Federico Valeri <fedevaleri@gmail.com>, José Armando García Sancio <jsancio@apache.org>
This commit is contained in:
Luke Chen 2024-01-23 11:17:53 +09:00
parent 6535fe7f04
commit aaf3a2f72f
3 changed files with 36 additions and 0 deletions

View File

@ -101,6 +101,10 @@ public class LeaderState<T> implements EpochState {
* @return the remainingMs before the checkQuorumTimer expired
*/
public long timeUntilCheckQuorumExpires(long currentTimeMs) {
// if there's only 1 voter, it should never get expired.
if (voterStates.size() == 1) {
return Long.MAX_VALUE;
}
checkQuorumTimer.update(currentTimeMs);
long remainingMs = checkQuorumTimer.remainingMs();
if (remainingMs == 0) {

View File

@ -539,6 +539,22 @@ public class KafkaRaftClientTest {
context.assertResignedLeader(epoch, localId);
}
@Test
public void testLeaderShouldNotResignLeadershipIfOnlyOneVoters() throws Exception {
int localId = 0;
Set<Integer> voters = Utils.mkSet(localId);
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build();
assertEquals(OptionalInt.of(localId), context.currentLeader());
// checkQuorum timeout is expired without receiving fetch request from other voters, but since there is only 1 voter,
// the leader should not get resigned
context.time.sleep(context.checkQuorumTimeoutMs);
context.client.poll();
assertFalse(context.client.quorum().isResigned());
}
@Test
public void testElectionTimeoutAfterUserInitiatedResign() throws Exception {
int localId = 0;

View File

@ -489,6 +489,22 @@ public class LeaderStateTest {
assertEquals(0, state.timeUntilCheckQuorumExpires(time.milliseconds()));
}
@Test
public void testCheckQuorumWithOneVoter() {
int observer = 1;
// Only 1 voter quorum
LeaderState<?> state = newLeaderState(mkSet(localId), 0L);
assertEquals(Long.MAX_VALUE, state.timeUntilCheckQuorumExpires(time.milliseconds()));
// When checkQuorum timeout not exceeded and got no fetch request from voter, it should not expire the timer
time.sleep(checkQuorumTimeoutMs);
assertEquals(Long.MAX_VALUE, state.timeUntilCheckQuorumExpires(time.milliseconds()));
// received fetch requests from 1 observer node, the timer still return Long.MAX_VALUE.
state.updateCheckQuorumForFollowingVoter(observer, time.milliseconds());
assertEquals(Long.MAX_VALUE, state.timeUntilCheckQuorumExpires(time.milliseconds()));
}
@Test
public void testNoOpForNegativeRemoteNodeId() {
MockTime time = new MockTime();