From 71120224f4d5d2528f02d5c58fe183c0bb9a4848 Mon Sep 17 00:00:00 2001 From: Kirk True Date: Sat, 20 Sep 2025 16:51:30 -0700 Subject: [PATCH] Update AsyncKafkaConsumer.java --- .../clients/consumer/internals/AsyncKafkaConsumer.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 124500432be..7402fbf3050 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -399,7 +399,6 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { private Optional clientTelemetryReporter = Optional.empty(); private final AsyncConsumerApplicationThreadRequirement asyncApplicationThreadRequirement; - private final WakeupTrigger wakeupTrigger = new WakeupTrigger(); private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker; private final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker; @@ -932,6 +931,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { // of the fetches. A wakeup between returned fetches and returning records would lead to never // returning the records in the fetches. Thus, we trigger a possible wake-up before we poll fetches. wakeupTrigger.maybeTriggerWakeup(); + processBackgroundEvents(); offsetCommitCallbackInvoker.executeCallbacks(); pollInvoker.poll(timer); @@ -1201,6 +1201,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { try { Map> topicMetadata = applicationEventHandler.addAndGet(topicMetadataEvent); + return topicMetadata.getOrDefault(topic, Collections.emptyList()); } finally { wakeupTrigger.clearTask(); @@ -1889,7 +1890,12 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { * done as an optimization so that the next round of data can be pre-fetched. */ private Fetch collectFetch() { - return fetchCollector.collectFetch(fetchBuffer); + final Fetch fetch = fetchCollector.collectFetch(fetchBuffer); + + // Notify the network thread to wake up and start the next round of fetching. + applicationEventHandler.wakeupNetworkThread(); + + return fetch; } /**