From d6861f3f1535cb29f90ad5671d48e6569b70249d Mon Sep 17 00:00:00 2001 From: Ken Huang Date: Mon, 9 Jun 2025 15:33:00 +0800 Subject: [PATCH] MINOR: Use `pollUntilTrue` instead of `waitForCondition` (#19911) We can use `pollUntilTrue` instead of `waitForCondition`, thus do a little refactor to reduce the duplicate code Reviewers: TengYao Chi , Lan Ding , TaiJuWu --- .../kafka/clients/ClientsTestUtils.java | 27 ++++++++++++++++--- .../PlaintextConsumerCallbackTest.java | 19 +++++-------- .../consumer/PlaintextConsumerCommitTest.java | 19 ++++++------- .../consumer/PlaintextConsumerPollTest.java | 10 ++++--- 4 files changed, 46 insertions(+), 29 deletions(-) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java index 0cf81f9538a..efbfc884e72 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java @@ -109,12 +109,31 @@ public class ClientsTestUtils { } public static void pollUntilTrue( - Consumer consumer, - Supplier testCondition, - long waitTimeMs, String msg + Consumer consumer, + Supplier testCondition, + String msg + ) throws InterruptedException { + pollUntilTrue(consumer, Duration.ofMillis(100), testCondition, 15_000L, msg); + } + + public static void pollUntilTrue( + Consumer consumer, + Supplier testCondition, + long waitTimeMs, + String msg + ) throws InterruptedException { + pollUntilTrue(consumer, Duration.ofMillis(100), testCondition, waitTimeMs, msg); + } + + public static void pollUntilTrue( + Consumer consumer, + Duration timeout, + Supplier testCondition, + long waitTimeMs, + String msg ) throws InterruptedException { TestUtils.waitForCondition(() -> { - consumer.poll(Duration.ofMillis(100)); + consumer.poll(timeout); return testCondition.get(); }, waitTimeMs, msg); } diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCallbackTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCallbackTest.java index 05882efdde9..c81a3cd1667 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCallbackTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCallbackTest.java @@ -16,14 +16,13 @@ */ package org.apache.kafka.clients.consumer; +import org.apache.kafka.clients.ClientsTestUtils; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.test.ClusterInstance; -import org.apache.kafka.common.test.TestUtils; import org.apache.kafka.common.test.api.ClusterTest; import org.apache.kafka.common.test.api.ClusterTestDefaults; import org.apache.kafka.common.test.api.Type; -import java.time.Duration; import java.util.Collection; import java.util.List; import java.util.Locale; @@ -237,11 +236,9 @@ public class PlaintextConsumerCallbackTest { // noop } }); - TestUtils.waitForCondition( - () -> { - consumer.poll(Duration.ofMillis(100)); - return partitionsAssigned.get(); - }, + ClientsTestUtils.pollUntilTrue( + consumer, + partitionsAssigned::get, "Timed out before expected rebalance completed" ); } @@ -272,11 +269,9 @@ public class PlaintextConsumerCallbackTest { } } }); - TestUtils.waitForCondition( - () -> { - consumer.poll(Duration.ofMillis(100)); - return partitionsAssigned.get(); - }, + ClientsTestUtils.pollUntilTrue( + consumer, + partitionsAssigned::get, "Timed out before expected rebalance completed" ); } diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java index 162b7baa371..b5bd27cf41b 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java @@ -16,13 +16,13 @@ */ package org.apache.kafka.clients.consumer; +import org.apache.kafka.clients.ClientsTestUtils; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.test.ClusterInstance; -import org.apache.kafka.common.test.TestUtils; import org.apache.kafka.common.test.api.ClusterConfigProperty; import org.apache.kafka.common.test.api.ClusterTest; import org.apache.kafka.common.test.api.ClusterTestDefaults; @@ -203,10 +203,11 @@ public class PlaintextConsumerCommitTest { for (var i = 1; i <= count; i++) consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(i)), callback); - TestUtils.waitForCondition(() -> { - consumer.poll(Duration.ofMillis(100)); - return callback.successCount >= count || callback.lastError.isPresent(); - }, "Failed to observe commit callback before timeout"); + ClientsTestUtils.pollUntilTrue( + consumer, + () -> callback.successCount >= count || callback.lastError.isPresent(), + "Failed to observe commit callback before timeout" + ); assertEquals(Optional.empty(), callback.lastError); assertEquals(count, callback.successCount); @@ -533,10 +534,10 @@ public class PlaintextConsumerCommitTest { var commitCallback = new RetryCommitCallback(consumer, offsetsOpt); commitCallback.sendAsyncCommit(); - TestUtils.waitForCondition(() -> { - consumer.poll(Duration.ofMillis(100)); - return commitCallback.isComplete; - }, "Failed to observe commit callback before timeout" + ClientsTestUtils.pollUntilTrue( + consumer, + () -> commitCallback.isComplete, + "Failed to observe commit callback before timeout" ); assertEquals(Optional.empty(), commitCallback.error); diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerPollTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerPollTest.java index bd90e54db45..20994134c5b 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerPollTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerPollTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients.consumer; +import org.apache.kafka.clients.ClientsTestUtils; import org.apache.kafka.clients.ClientsTestUtils.TestConsumerReassignmentListener; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.MetricName; @@ -559,10 +560,11 @@ public class PlaintextConsumerPollTest { // Subscribe to different topic. This will trigger the delayed revocation exceeding rebalance timeout and get fenced consumer.subscribe(List.of(otherTopic), listener); - TestUtils.waitForCondition(() -> { - consumer.poll(Duration.ofMillis(100)); - return rebalanceTimeoutExceeded.get(); - }, "Timeout waiting for delayed callback to complete"); + ClientsTestUtils.pollUntilTrue( + consumer, + rebalanceTimeoutExceeded::get, + "Timeout waiting for delayed callback to complete" + ); // Verify consumer recovers after being fenced, being able to continue consuming. // (The member should automatically rejoin on the next poll, with the new topic as subscription)