Replace awaitNonEmptyRecords with waitForRecords in PlaintextConsumerTest

This commit is contained in:
Kirk True 2025-10-03 19:44:58 -07:00
parent a8ccdb6f48
commit e1cf7b7056
1 changed files with 8 additions and 27 deletions

View File

@ -66,7 +66,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.BROKER_COUNT;
@ -810,7 +809,7 @@ public class PlaintextConsumerTest {
// Create a consumer and consumer some messages.
var listener = new TestConsumerReassignmentListener();
consumer.subscribe(List.of(TOPIC, topic2), listener);
var records = awaitNonEmptyRecords(consumer, TP);
var records = ConsumerPollTestUtils.waitForRecords(consumer);
assertEquals(1, listener.callsToAssigned, "should be assigned once");
// Verify the metric exist.
@ -877,7 +876,7 @@ public class PlaintextConsumerTest {
// Create a consumer and consumer some messages.
var listener = new TestConsumerReassignmentListener();
consumer.subscribe(List.of(TOPIC, topic2), listener);
var records = awaitNonEmptyRecords(consumer, TP);
var records = ConsumerPollTestUtils.waitForRecords(consumer);
assertEquals(1, listener.callsToAssigned, "should be assigned once");
// Verify the metric exist.
@ -944,7 +943,7 @@ public class PlaintextConsumerTest {
sendRecords(producer, tp2, numMessages, System.currentTimeMillis());
consumer.assign(List.of(TP));
var records = awaitNonEmptyRecords(consumer, TP);
var records = ConsumerPollTestUtils.waitForRecords(consumer);
// Verify the metric exist.
Map<String, String> tags = Map.of(
@ -958,7 +957,7 @@ public class PlaintextConsumerTest {
assertEquals((double) records.count(), fetchLead.metricValue(), "The lead should be " + records.count());
consumer.assign(List.of(tp2));
awaitNonEmptyRecords(consumer, tp2);
ConsumerPollTestUtils.waitForRecords(consumer);
assertNull(consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags)));
}
}
@ -999,7 +998,7 @@ public class PlaintextConsumerTest {
sendRecords(producer, tp2, numMessages, System.currentTimeMillis());
consumer.assign(List.of(TP));
var records = awaitNonEmptyRecords(consumer, TP);
var records = ConsumerPollTestUtils.waitForRecords(consumer);
// Verify the metric exist.
Map<String, String> tags = Map.of(
@ -1014,7 +1013,7 @@ public class PlaintextConsumerTest {
var expectedLag = numMessages - records.count();
assertEquals(expectedLag, (double) fetchLag.metricValue(), EPSILON, "The lag should be " + expectedLag);
consumer.assign(List.of(tp2));
awaitNonEmptyRecords(consumer, tp2);
ConsumerPollTestUtils.waitForRecords(consumer);
assertNull(consumer.metrics().get(new MetricName(TP + ".records-lag", "consumer-fetch-manager-metrics", "", tags)));
assertNull(consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags)));
}
@ -1058,7 +1057,7 @@ public class PlaintextConsumerTest {
sendRecords(producer, tp2, numMessages, System.currentTimeMillis());
consumer.assign(List.of(TP));
awaitNonEmptyRecords(consumer, TP);
ConsumerPollTestUtils.waitForRecords(consumer);
// Verify the metric exist.
Map<String, String> tags = Map.of(
@ -1655,7 +1654,7 @@ public class PlaintextConsumerTest {
consumer.subscribe(List.of(testTopic));
// This is here to allow the consumer time to settle the group membership/assignment.
awaitNonEmptyRecords(consumer, new TopicPartition(testTopic, 0));
ConsumerPollTestUtils.waitForRecords(consumer);
// Keep track of the last time the poll is invoked to ensure the deltas between invocations don't
// exceed the delay threshold defined above.
@ -1675,24 +1674,6 @@ public class PlaintextConsumerTest {
}
}
private ConsumerRecords<byte[], byte[]> awaitNonEmptyRecords(
Consumer<byte[], byte[]> consumer,
TopicPartition tp
) throws Exception {
AtomicReference<ConsumerRecords<byte[], byte[]>> result = new AtomicReference<>();
TestUtils.waitForCondition(() -> {
var polledRecords = consumer.poll(Duration.ofSeconds(10));
boolean hasRecords = !polledRecords.isEmpty();
if (hasRecords) {
result.set(polledRecords);
}
return hasRecords;
}, "Timed out waiting for non-empty records from topic " + tp.topic() + " partition " + tp.partition());
return result.get();
}
public static class SerializerImpl implements Serializer<byte[]> {
private final ByteArraySerializer serializer = new ByteArraySerializer();