diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 14e327208f7..cfa50d402a2 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -101,6 +101,10 @@ public class LeaderState implements EpochState { * @return the remainingMs before the checkQuorumTimer expired */ public long timeUntilCheckQuorumExpires(long currentTimeMs) { + // if there's only 1 voter, it should never get expired. + if (voterStates.size() == 1) { + return Long.MAX_VALUE; + } checkQuorumTimer.update(currentTimeMs); long remainingMs = checkQuorumTimer.remainingMs(); if (remainingMs == 0) { diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index 6d714daa11b..51a5938220f 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -539,6 +539,22 @@ public class KafkaRaftClientTest { context.assertResignedLeader(epoch, localId); } + @Test + public void testLeaderShouldNotResignLeadershipIfOnlyOneVoters() throws Exception { + int localId = 0; + Set voters = Utils.mkSet(localId); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters).build(); + assertEquals(OptionalInt.of(localId), context.currentLeader()); + + // checkQuorum timeout is expired without receiving fetch request from other voters, but since there is only 1 voter, + // the leader should not get resigned + context.time.sleep(context.checkQuorumTimeoutMs); + context.client.poll(); + + assertFalse(context.client.quorum().isResigned()); + } + @Test public void testElectionTimeoutAfterUserInitiatedResign() throws Exception { int localId = 0; diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java index 97d290fa347..12cb2ffef20 100644 --- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java @@ -489,6 +489,22 @@ public class LeaderStateTest { assertEquals(0, state.timeUntilCheckQuorumExpires(time.milliseconds())); } + @Test + public void testCheckQuorumWithOneVoter() { + int observer = 1; + // Only 1 voter quorum + LeaderState state = newLeaderState(mkSet(localId), 0L); + assertEquals(Long.MAX_VALUE, state.timeUntilCheckQuorumExpires(time.milliseconds())); + + // When checkQuorum timeout not exceeded and got no fetch request from voter, it should not expire the timer + time.sleep(checkQuorumTimeoutMs); + assertEquals(Long.MAX_VALUE, state.timeUntilCheckQuorumExpires(time.milliseconds())); + + // received fetch requests from 1 observer node, the timer still return Long.MAX_VALUE. + state.updateCheckQuorumForFollowingVoter(observer, time.milliseconds()); + assertEquals(Long.MAX_VALUE, state.timeUntilCheckQuorumExpires(time.milliseconds())); + } + @Test public void testNoOpForNegativeRemoteNodeId() { MockTime time = new MockTime();