MINOR: Use `pollUntilTrue` instead of `waitForCondition` (#19911)
CI / build (push) Waiting to run Details

We can use `pollUntilTrue` instead of `waitForCondition`, thus do a
little refactor to reduce the duplicate code

Reviewers: TengYao Chi <frankvicky@apache.org>, Lan Ding
 <isDing_L@163.com>, TaiJuWu <tjwu1217@gmail.com>
This commit is contained in:
Ken Huang 2025-06-09 15:33:00 +08:00 committed by GitHub
parent e2500186cb
commit d6861f3f15
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 46 additions and 29 deletions

View File

@ -109,12 +109,31 @@ public class ClientsTestUtils {
} }
public static void pollUntilTrue( public static void pollUntilTrue(
Consumer<byte[], byte[]> consumer, Consumer<byte[], byte[]> consumer,
Supplier<Boolean> testCondition, Supplier<Boolean> testCondition,
long waitTimeMs, String msg String msg
) throws InterruptedException {
pollUntilTrue(consumer, Duration.ofMillis(100), testCondition, 15_000L, msg);
}
public static void pollUntilTrue(
Consumer<byte[], byte[]> consumer,
Supplier<Boolean> testCondition,
long waitTimeMs,
String msg
) throws InterruptedException {
pollUntilTrue(consumer, Duration.ofMillis(100), testCondition, waitTimeMs, msg);
}
public static void pollUntilTrue(
Consumer<byte[], byte[]> consumer,
Duration timeout,
Supplier<Boolean> testCondition,
long waitTimeMs,
String msg
) throws InterruptedException { ) throws InterruptedException {
TestUtils.waitForCondition(() -> { TestUtils.waitForCondition(() -> {
consumer.poll(Duration.ofMillis(100)); consumer.poll(timeout);
return testCondition.get(); return testCondition.get();
}, waitTimeMs, msg); }, waitTimeMs, msg);
} }

View File

@ -16,14 +16,13 @@
*/ */
package org.apache.kafka.clients.consumer; package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.ClientsTestUtils;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.test.ClusterInstance; import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterTest; import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults; import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.Type; import org.apache.kafka.common.test.api.Type;
import java.time.Duration;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
@ -237,11 +236,9 @@ public class PlaintextConsumerCallbackTest {
// noop // noop
} }
}); });
TestUtils.waitForCondition( ClientsTestUtils.pollUntilTrue(
() -> { consumer,
consumer.poll(Duration.ofMillis(100)); partitionsAssigned::get,
return partitionsAssigned.get();
},
"Timed out before expected rebalance completed" "Timed out before expected rebalance completed"
); );
} }
@ -272,11 +269,9 @@ public class PlaintextConsumerCallbackTest {
} }
} }
}); });
TestUtils.waitForCondition( ClientsTestUtils.pollUntilTrue(
() -> { consumer,
consumer.poll(Duration.ofMillis(100)); partitionsAssigned::get,
return partitionsAssigned.get();
},
"Timed out before expected rebalance completed" "Timed out before expected rebalance completed"
); );
} }

View File

@ -16,13 +16,13 @@
*/ */
package org.apache.kafka.clients.consumer; package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.ClientsTestUtils;
import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.test.ClusterInstance; import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty; import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest; import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults; import org.apache.kafka.common.test.api.ClusterTestDefaults;
@ -203,10 +203,11 @@ public class PlaintextConsumerCommitTest {
for (var i = 1; i <= count; i++) for (var i = 1; i <= count; i++)
consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(i)), callback); consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(i)), callback);
TestUtils.waitForCondition(() -> { ClientsTestUtils.pollUntilTrue(
consumer.poll(Duration.ofMillis(100)); consumer,
return callback.successCount >= count || callback.lastError.isPresent(); () -> callback.successCount >= count || callback.lastError.isPresent(),
}, "Failed to observe commit callback before timeout"); "Failed to observe commit callback before timeout"
);
assertEquals(Optional.empty(), callback.lastError); assertEquals(Optional.empty(), callback.lastError);
assertEquals(count, callback.successCount); assertEquals(count, callback.successCount);
@ -533,10 +534,10 @@ public class PlaintextConsumerCommitTest {
var commitCallback = new RetryCommitCallback(consumer, offsetsOpt); var commitCallback = new RetryCommitCallback(consumer, offsetsOpt);
commitCallback.sendAsyncCommit(); commitCallback.sendAsyncCommit();
TestUtils.waitForCondition(() -> { ClientsTestUtils.pollUntilTrue(
consumer.poll(Duration.ofMillis(100)); consumer,
return commitCallback.isComplete; () -> commitCallback.isComplete,
}, "Failed to observe commit callback before timeout" "Failed to observe commit callback before timeout"
); );
assertEquals(Optional.empty(), commitCallback.error); assertEquals(Optional.empty(), commitCallback.error);

View File

@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer; package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.ClientsTestUtils;
import org.apache.kafka.clients.ClientsTestUtils.TestConsumerReassignmentListener; import org.apache.kafka.clients.ClientsTestUtils.TestConsumerReassignmentListener;
import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.MetricName; import org.apache.kafka.common.MetricName;
@ -559,10 +560,11 @@ public class PlaintextConsumerPollTest {
// Subscribe to different topic. This will trigger the delayed revocation exceeding rebalance timeout and get fenced // Subscribe to different topic. This will trigger the delayed revocation exceeding rebalance timeout and get fenced
consumer.subscribe(List.of(otherTopic), listener); consumer.subscribe(List.of(otherTopic), listener);
TestUtils.waitForCondition(() -> { ClientsTestUtils.pollUntilTrue(
consumer.poll(Duration.ofMillis(100)); consumer,
return rebalanceTimeoutExceeded.get(); rebalanceTimeoutExceeded::get,
}, "Timeout waiting for delayed callback to complete"); "Timeout waiting for delayed callback to complete"
);
// Verify consumer recovers after being fenced, being able to continue consuming. // Verify consumer recovers after being fenced, being able to continue consuming.
// (The member should automatically rejoin on the next poll, with the new topic as subscription) // (The member should automatically rejoin on the next poll, with the new topic as subscription)