mirror of https://github.com/apache/kafka.git
Update AsyncKafkaConsumer.java
This commit is contained in:
parent
81598844bd
commit
71120224f4
|
@ -399,7 +399,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
||||||
private Optional<ClientTelemetryReporter> clientTelemetryReporter = Optional.empty();
|
private Optional<ClientTelemetryReporter> clientTelemetryReporter = Optional.empty();
|
||||||
|
|
||||||
private final AsyncConsumerApplicationThreadRequirement asyncApplicationThreadRequirement;
|
private final AsyncConsumerApplicationThreadRequirement asyncApplicationThreadRequirement;
|
||||||
|
|
||||||
private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
|
private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
|
||||||
private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
|
private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
|
||||||
private final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker;
|
private final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker;
|
||||||
|
@ -932,6 +931,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
||||||
// of the fetches. A wakeup between returned fetches and returning records would lead to never
|
// of the fetches. A wakeup between returned fetches and returning records would lead to never
|
||||||
// returning the records in the fetches. Thus, we trigger a possible wake-up before we poll fetches.
|
// returning the records in the fetches. Thus, we trigger a possible wake-up before we poll fetches.
|
||||||
wakeupTrigger.maybeTriggerWakeup();
|
wakeupTrigger.maybeTriggerWakeup();
|
||||||
|
|
||||||
processBackgroundEvents();
|
processBackgroundEvents();
|
||||||
offsetCommitCallbackInvoker.executeCallbacks();
|
offsetCommitCallbackInvoker.executeCallbacks();
|
||||||
pollInvoker.poll(timer);
|
pollInvoker.poll(timer);
|
||||||
|
@ -1201,6 +1201,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
||||||
try {
|
try {
|
||||||
Map<String, List<PartitionInfo>> topicMetadata =
|
Map<String, List<PartitionInfo>> topicMetadata =
|
||||||
applicationEventHandler.addAndGet(topicMetadataEvent);
|
applicationEventHandler.addAndGet(topicMetadataEvent);
|
||||||
|
|
||||||
return topicMetadata.getOrDefault(topic, Collections.emptyList());
|
return topicMetadata.getOrDefault(topic, Collections.emptyList());
|
||||||
} finally {
|
} finally {
|
||||||
wakeupTrigger.clearTask();
|
wakeupTrigger.clearTask();
|
||||||
|
@ -1889,7 +1890,12 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
||||||
* done as an optimization so that the <em>next round of data can be pre-fetched</em>.
|
* done as an optimization so that the <em>next round of data can be pre-fetched</em>.
|
||||||
*/
|
*/
|
||||||
private Fetch<K, V> collectFetch() {
|
private Fetch<K, V> collectFetch() {
|
||||||
return fetchCollector.collectFetch(fetchBuffer);
|
final Fetch<K, V> fetch = fetchCollector.collectFetch(fetchBuffer);
|
||||||
|
|
||||||
|
// Notify the network thread to wake up and start the next round of fetching.
|
||||||
|
applicationEventHandler.wakeupNetworkThread();
|
||||||
|
|
||||||
|
return fetch;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue