diff --git a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java b/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java index 90503e66b1a..a3dafed5e4c 100644 --- a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java +++ b/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java @@ -70,6 +70,7 @@ import scala.collection.JavaConverters; import scala.collection.Seq; import scala.collection.mutable.HashMap; +import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -202,7 +203,7 @@ public class EligibleLeaderReplicasIntegrationTest extends KafkaServerTestHarnes } void waitUntilOneMessageIsConsumed(Consumer consumer) { - kafka.utils.TestUtils.waitUntilTrue( + TestUtils.waitUntilTrue( () -> { try { ConsumerRecords record = consumer.poll(Duration.ofMillis(100L)); @@ -212,7 +213,7 @@ public class EligibleLeaderReplicasIntegrationTest extends KafkaServerTestHarnes } }, () -> "fail to consume messages", - org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L + DEFAULT_MAX_WAIT_MS, 100L ); } @@ -417,30 +418,39 @@ public class EligibleLeaderReplicasIntegrationTest extends KafkaServerTestHarnes waitForIsrAndElr((isrSize, elrSize) -> { return isrSize > 0 && elrSize == 0; }); - topicPartitionInfo = adminClient.describeTopics(Collections.singletonList(testTopicName)) - .allTopicNames().get().get(testTopicName).partitions().get(0); - assertEquals(0, topicPartitionInfo.lastKnownElr().size()); - assertEquals(0, topicPartitionInfo.elr().size()); - assertEquals(lastKnownLeader, topicPartitionInfo.leader().id()); + + TestUtils.waitUntilTrue( + () -> { + try { + TopicPartitionInfo partition = adminClient.describeTopics(Collections.singletonList(testTopicName)) + .allTopicNames().get().get(testTopicName).partitions().get(0); + if (partition.leader() == null) return false; + return partition.lastKnownElr().isEmpty() && partition.elr().isEmpty() && partition.leader().id() == lastKnownLeader; + } catch (Exception e) { + return false; + } + }, + () -> String.format("Partition metadata for %s is not correct", testTopicName), + DEFAULT_MAX_WAIT_MS, 100L + ); } finally { restartDeadBrokers(false); } } void waitForIsrAndElr(BiFunction isIsrAndElrSizeSatisfied) { - kafka.utils.TestUtils.waitUntilTrue( + TestUtils.waitUntilTrue( () -> { try { TopicDescription topicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName)) .allTopicNames().get().get(testTopicName); TopicPartitionInfo partition = topicDescription.partitions().get(0); - if (!isIsrAndElrSizeSatisfied.apply(partition.isr().size(), partition.elr().size())) return false; + return isIsrAndElrSizeSatisfied.apply(partition.isr().size(), partition.elr().size()); } catch (Exception e) { return false; } - return true; }, () -> String.format("Partition metadata for %s is not propagated", testTopicName), - org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS, 100L); + DEFAULT_MAX_WAIT_MS, 100L); } }