KAFKA-19452: Fix flaky test LogRecoveryTest.testHWCheckpointWithFailuresMultipleLogSegments (#20121)
CI / build (push) Waiting to run Details

The `testHWCheckpointWithFailuresMultipleLogSegments` test in
`LogRecoveryTest` was failing intermittently due to a race condition
during its failure simulation.

In successful runs, the follower broker would restart and rejoin the
In-Sync Replica (ISR) set before the old leader's failure was fully
processed. This allowed for a clean and timely leader election to the
now in-sync follower.

However, in the failing runs, the follower did not rejoin the ISR before
the leader election was triggered. With no replicas in the ISR and
unclean leader election disabled by default for the test, the controller
correctly refused to elect a new leader, causing the test to time out.

This commit fixes the flakiness by overriding the controller
configuration for this test to explicitly enable unclean leader
election. This allows the out-of-sync replica to be promoted to leader,
making the test deterministic and stable.

Reviewers: Jun Rao <junrao@gmail.com>
This commit is contained in:
Rajani K 2025-07-14 22:12:00 +05:30 committed by GitHub
parent 873379873e
commit a61a37f7dd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 10 additions and 0 deletions

View File

@ -64,6 +64,16 @@ class LogRecoveryTest extends QuorumTestHarness {
def hwFile2 = new OffsetCheckpointFile(new File(configProps2.logDirs.get(0), ReplicaManager.HighWatermarkFilename), null)
var servers = Seq.empty[KafkaBroker]
// testHWCheckpointWithFailuresMultipleLogSegments simulates broker failures that can leave the only available replica out of the
// ISR. By enabling unclean leader election, we ensure that the test can proceed and elect
// the out-of-sync replica as the new leader, which is necessary to validate the log
// recovery and high-watermark checkpointing logic under these specific failure conditions.
override def kraftControllerConfigs(testInfo: TestInfo): Seq[Properties] = {
val properties = new Properties()
properties.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true")
Seq(properties)
}
// Some tests restart the brokers then produce more data. But since test brokers use random ports, we need
// to use a new producer that knows the new ports
def updateProducer(): Unit = {