diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 80a0619d067..4ac326f8bb5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -174,6 +174,35 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { private static final long NO_CURRENT_THREAD = -1L; + private class CompositePollEventInvoker { + + private final Timer timer; + private final long pollTimeMs; + private CompositePollEvent latest; + private int backoff = -1; + + public CompositePollEventInvoker(Timer timer, long pollTimeMs) { + this.timer = timer; + this.pollTimeMs = pollTimeMs; + } + + private void poll() { + if (latest == null || latest.isComplete()) { + long deadlineMs = calculateDeadlineMs(timer); + latest = new CompositePollEvent(deadlineMs, pollTimeMs, ApplicationEvent.Type.POLL); + applicationEventHandler.add(latest); + } else { + if (backoff == -1) + backoff = 1; + else + backoff *= 2; + + long sleep = Math.min(Math.min(backoff, retryBackoffMs), timer.remainingMs()); + timer.sleep(sleep); + } + } + } + /** * An {@link org.apache.kafka.clients.consumer.internals.events.EventProcessor} that is created and executes in the * application thread for the purpose of processing {@link BackgroundEvent background events} generated by the @@ -837,13 +866,17 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions"); } + CompositePollEventInvoker pollEventInvoker = new CompositePollEventInvoker(timer, time.milliseconds()); + do { // We must not allow wake-ups between polling for fetches and returning the records. // If the polled fetches are not empty the consumed position has already been updated in the polling // of the fetches. A wakeup between returned fetches and returning records would lead to never // returning the records in the fetches. Thus, we trigger a possible wake-up before we poll fetches. wakeupTrigger.maybeTriggerWakeup(); - prepareFetch(timer); + processBackgroundEvents(); + offsetCommitCallbackInvoker.executeCallbacks(); + pollEventInvoker.poll(); final Fetch fetch = pollForFetches(timer); if (!fetch.isEmpty()) { // before returning the fetched records, we can send off the next round of fetches @@ -871,20 +904,6 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { } } - private void prepareFetch(Timer timer) { - processBackgroundEvents(); - offsetCommitCallbackInvoker.executeCallbacks(); - - long pollTimeMs = timer.currentTimeMs(); - - CompositePollEvent event = new CompositePollEvent( - calculateDeadlineMs(timer), - pollTimeMs, - ApplicationEvent.Type.POLL - ); - applicationEventHandler.add(event); - } - /** * Commit offsets returned on the last {@link #poll(Duration) poll()} for all the subscribed list of topics and * partitions. diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index e498fc3797a..e510b21f802 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -241,7 +241,7 @@ public class ApplicationEventProcessor implements EventProcessor(updatePositionsFuture, event.deadlineMs())); updatePositionsFuture.whenComplete((__, updatePositionsError) -> { - if (maybeFailCompositePoll(event, updatePositionsError) || maybePauseCompositePoll()) + if (maybeFailCompositePoll(event, updatePositionsError) || maybePauseCompositePoll(event)) return; // If needed, create a fetch request if there's no data in the FetchBuffer. requestManagers.fetchRequestManager.createFetchRequests().whenComplete((___, fetchError) -> { - if (maybeFailCompositePoll(event, fetchError) || maybePauseCompositePoll()) + if (maybeFailCompositePoll(event, fetchError) || maybePauseCompositePoll(event)) return; + event.complete(); log.trace("Completed CompositePollEvent {}", event); }); }); @@ -285,14 +286,19 @@ public class ApplicationEventProcessor implements EventProcessor