KAFKA-18105 Fix flaky PlaintextAdminIntegrationTest#testElectPreferredLeaders (#20068)

## Changes
This PR improves the stability of the
PlaintextAdminIntegrationTest.testElectPreferredLeaders test by
introducing short Thread.sleep( ) delays before invoking:

- changePreferredLeader( )

- waitForBrokersOutOfIsr( )

## Reasons

- Metadata propagation for partition2 :
Kafka requires time to propagate the updated leader metadata across all
brokers. Without waiting,  metadataCache may return outdated leader
information for partition2.

- Eviction of broker1 from the ISR :
To simulate a scenario where broker1 is no longer eligible as leader,
the test relies on broker1 being removed from the ISR (e.g., due to
intentional shutdown). This eviction is not instantaneous and requires a
brief delay before Kafka reflects the change.

Reviewers: PoAn Yang <payang@apache.org>, TengYao Chi
 <kitingiao@gmail.com>, TaiJuWu <tjwu1217@gmail.com>, Ken Huang
 <s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Chang-Chi Hsu 2025-09-12 23:44:17 +02:00 committed by GitHub
parent 54b88f6721
commit af2a8db3c6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 26 additions and 4 deletions

View File

@ -3061,6 +3061,20 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
}
}
/**
* Waits until the metadata for the given partition has fully propagated and become consistent across all brokers.
*
* @param partition The partition whose leader metadata should be verified across all brokers.
*/
def waitForBrokerMetadataPropagation(partition: TopicPartition): Unit = {
while (brokers.exists(_.metadataCache.getPartitionLeaderEndpoint(partition.topic, partition.partition(), listenerName).isEmpty) ||
brokers.map(_.metadataCache.getPartitionLeaderEndpoint(partition.topic, partition.partition(), listenerName))
.filter(_.isPresent)
.map(_.get())
.toSet.size != 1)
TimeUnit.MILLISECONDS.sleep(300)
}
@Test
def testElectPreferredLeaders(): Unit = {
client = createAdminClient
@ -3087,12 +3101,12 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
val prior1 = brokers.head.metadataCache.getPartitionLeaderEndpoint(partition1.topic, partition1.partition(), listenerName).get.id()
val prior2 = brokers.head.metadataCache.getPartitionLeaderEndpoint(partition2.topic, partition2.partition(), listenerName).get.id()
var m = Map.empty[TopicPartition, Optional[NewPartitionReassignment]]
var reassignmentMap = Map.empty[TopicPartition, Optional[NewPartitionReassignment]]
if (prior1 != preferred)
m += partition1 -> Optional.of(new NewPartitionReassignment(newAssignment.map(Int.box).asJava))
reassignmentMap += partition1 -> Optional.of(new NewPartitionReassignment(newAssignment.map(Int.box).asJava))
if (prior2 != preferred)
m += partition2 -> Optional.of(new NewPartitionReassignment(newAssignment.map(Int.box).asJava))
client.alterPartitionReassignments(m.asJava).all().get()
reassignmentMap += partition2 -> Optional.of(new NewPartitionReassignment(newAssignment.map(Int.box).asJava))
client.alterPartitionReassignments(reassignmentMap.asJava).all().get()
TestUtils.waitUntilTrue(
() => preferredLeader(partition1) == preferred && preferredLeader(partition2) == preferred,
@ -3120,6 +3134,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
TestUtils.assertLeader(client, partition2, 0)
// Now change the preferred leader to 1
waitForBrokerMetadataPropagation(partition1)
waitForBrokerMetadataPropagation(partition2)
changePreferredLeader(prefer1)
// meaningful election
@ -3158,6 +3174,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
TestUtils.assertLeader(client, partition2, 1)
// Now change the preferred leader to 2
waitForBrokerMetadataPropagation(partition1)
waitForBrokerMetadataPropagation(partition2)
changePreferredLeader(prefer2)
// mixed results
@ -3174,9 +3192,13 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
TestUtils.assertLeader(client, partition2, 2)
// Now change the preferred leader to 1
waitForBrokerMetadataPropagation(partition1)
waitForBrokerMetadataPropagation(partition2)
changePreferredLeader(prefer1)
// but shut it down...
killBroker(1)
waitForBrokerMetadataPropagation(partition1)
waitForBrokerMetadataPropagation(partition2)
TestUtils.waitForBrokersOutOfIsr(client, Set(partition1, partition2), Set(1))
def assertPreferredLeaderNotAvailable(