From f6864a3ac46c23a530b6a32d05d548c403cd96ea Mon Sep 17 00:00:00 2001 From: Kirk True Date: Thu, 2 Oct 2025 14:20:15 -0700 Subject: [PATCH] Refactor POLL into ASYNC_POLL and SHARE_POLL --- .../internals/AsyncKafkaConsumer.java | 114 +++++++++++----- .../consumer/internals/ShareConsumerImpl.java | 4 +- .../internals/events/ApplicationEvent.java | 4 +- .../events/ApplicationEventProcessor.java | 92 ++++++------- ...sitePollEvent.java => AsyncPollEvent.java} | 25 +++- ...va => AsyncPollEventProcessorContext.java} | 61 ++++----- .../events/CompositePollEventInvoker.java | 122 ------------------ .../{PollEvent.java => SharePollEvent.java} | 30 +---- .../ApplicationEventHandlerTest.java | 4 +- .../internals/AsyncKafkaConsumerTest.java | 56 ++++---- .../internals/ConsumerNetworkThreadTest.java | 4 +- .../internals/ShareConsumerImplTest.java | 4 +- .../events/ApplicationEventProcessorTest.java | 9 +- 13 files changed, 215 insertions(+), 314 deletions(-) rename clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/{CompositePollEvent.java => AsyncPollEvent.java} (88%) rename clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/{CompositePollEventProcessorContext.java => AsyncPollEventProcessorContext.java} (74%) delete mode 100644 clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompositePollEventInvoker.java rename clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/{PollEvent.java => SharePollEvent.java} (54%) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 9a411826da3..e3f9eabbe98 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -41,6 +41,8 @@ import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandle import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent; import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent; +import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent; +import org.apache.kafka.clients.consumer.internals.events.AsyncPollEventProcessorContext; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; import org.apache.kafka.clients.consumer.internals.events.CheckAndUpdatePositionsEvent; @@ -49,8 +51,6 @@ import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; -import org.apache.kafka.clients.consumer.internals.events.CompositePollEventInvoker; -import org.apache.kafka.clients.consumer.internals.events.CompositePollEventProcessorContext; import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent; import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent; import org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent; @@ -326,7 +326,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { // Init value is needed to avoid NPE in case of exception raised in the constructor private Optional clientTelemetryReporter = Optional.empty(); - private final CompositePollEventInvoker pollInvoker; + private AsyncPollEvent inflightPoll; private final WakeupTrigger wakeupTrigger = new WakeupTrigger(); private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker; private final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker; @@ -462,7 +462,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { streamsRebalanceData ); final CompletableEventReaper applicationEventReaper = new CompletableEventReaper(logContext); - final Supplier compositePollContextSupplier = CompositePollEventProcessorContext.supplier( + final Supplier asyncPollContextSupplier = AsyncPollEventProcessorContext.supplier( logContext, networkClientDelegateSupplier, backgroundEventHandler, @@ -474,7 +474,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { metadata, subscriptions, requestManagersSupplier, - compositePollContextSupplier + asyncPollContextSupplier ); this.applicationEventHandler = applicationEventHandlerFactory.build( logContext, @@ -496,15 +496,6 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { new StreamsRebalanceListenerInvoker(logContext, s)); this.backgroundEventProcessor = new BackgroundEventProcessor(); this.backgroundEventReaper = backgroundEventReaperFactory.build(logContext); - this.pollInvoker = new CompositePollEventInvoker( - logContext, - time, - applicationEventHandler, - () -> { - offsetCommitCallbackInvoker.executeCallbacks(); - processBackgroundEvents(); - } - ); // The FetchCollector is only used on the application thread. this.fetchCollector = fetchCollectorFactory.build(logContext, @@ -579,15 +570,6 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { this.clientTelemetryReporter = Optional.empty(); this.autoCommitEnabled = autoCommitEnabled; this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors); - this.pollInvoker = new CompositePollEventInvoker( - logContext, - time, - applicationEventHandler, - () -> { - offsetCommitCallbackInvoker.executeCallbacks(); - processBackgroundEvents(); - } - ); this.backgroundEventHandler = new BackgroundEventHandler( backgroundEventQueue, time, @@ -682,7 +664,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { Optional.empty() ); final CompletableEventReaper applicationEventReaper = new CompletableEventReaper(logContext); - final Supplier compositePollContextSupplier = CompositePollEventProcessorContext.supplier( + final Supplier asyncPollContextSupplier = AsyncPollEventProcessorContext.supplier( logContext, networkClientDelegateSupplier, backgroundEventHandler, @@ -695,7 +677,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { metadata, subscriptions, requestManagersSupplier, - compositePollContextSupplier + asyncPollContextSupplier ); this.applicationEventHandler = new ApplicationEventHandler(logContext, time, @@ -708,15 +690,6 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { this.streamsRebalanceListenerInvoker = Optional.empty(); this.backgroundEventProcessor = new BackgroundEventProcessor(); this.backgroundEventReaper = new CompletableEventReaper(logContext); - this.pollInvoker = new CompositePollEventInvoker( - logContext, - time, - applicationEventHandler, - () -> { - offsetCommitCallbackInvoker.executeCallbacks(); - processBackgroundEvents(); - } - ); } // auxiliary interface for testing @@ -887,7 +860,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { // returning the records in the fetches. Thus, we trigger a possible wake-up before we poll fetches. wakeupTrigger.maybeTriggerWakeup(); - pollInvoker.poll(timer); + checkInflightPollResult(timer); final Fetch fetch = pollForFetches(timer); if (!fetch.isEmpty()) { // before returning the fetched records, we can send off the next round of fetches @@ -915,6 +888,77 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { } } + /** + * {@code checkInflightPollResult()} manages the lifetime of the {@link AsyncPollEvent} processing. If it is + * called when no event is currently processing, it will start a new event processing asynchronously. A check + * is made during each invocation to see if the inflight event has reached a + * {@link AsyncPollEvent.State terminal state}. If it has, the result will be processed accordingly. + */ + public void checkInflightPollResult(Timer timer) { + if (inflightPoll == null) { + log.trace("No existing inflight async poll event, submitting a new event"); + submitEvent(ApplicationEvent.Type.ASYNC_POLL, timer); + } + + try { + if (log.isTraceEnabled()) { + log.trace( + "Attempting to retrieve result from previously submitted {} with {} remaining on timer", + inflightPoll, + timer.remainingMs() + ); + } + + // Result should be non-null and starts off as State.STARTED. + AsyncPollEvent.Result result = inflightPoll.result(); + AsyncPollEvent.State state = result.state(); + + if (state == AsyncPollEvent.State.SUCCEEDED) { + // The async poll event has completed all the requisite stages, though it does not imply that + // there is data in the FetchBuffer yet. Make sure to clear out the inflight request. + log.trace("Event {} completed, clearing inflight", inflightPoll); + inflightPoll = null; + } else if (state == AsyncPollEvent.State.FAILED) { + // The async poll failed at one of the stages. Make sure to clear out the inflight request + // before the underlying error is surfaced to the user. + log.trace("Event {} failed, clearing inflight", inflightPoll); + inflightPoll = null; + + throw result.asKafkaException(); + } else if (state == AsyncPollEvent.State.CALLBACKS_REQUIRED) { + // The background thread detected that it needed to yield to the application thread to invoke + // callbacks. Even though the inflight reference _should_ be overwritten when the next stage of + // the event is submitted, go ahead and clear out the inflight request just to be sure. + log.trace("Event {} paused for callbacks, clearing inflight", inflightPoll); + inflightPoll = null; + + // Note: this is calling user-supplied code, so make sure to handle possible errors. + offsetCommitCallbackInvoker.executeCallbacks(); + processBackgroundEvents(); + + // The application thread callbacks are complete. Create another event to resume the polling at + // the next stage. + submitEvent(result.asNextEventType(), timer); + } + } catch (Throwable t) { + // If an exception is hit, bubble it up to the user but make sure to clear out the inflight request + // because the error effectively renders it complete. + log.debug("Event {} failed due to {}, clearing inflight", inflightPoll, String.valueOf(t)); + inflightPoll = null; + throw ConsumerUtils.maybeWrapAsKafkaException(t); + } + } + + private void submitEvent(ApplicationEvent.Type type, Timer timer) { + long deadlineMs = calculateDeadlineMs(timer); + long pollTimeMs = time.milliseconds(); + inflightPoll = new AsyncPollEvent(deadlineMs, pollTimeMs, type); + applicationEventHandler.add(inflightPoll); + + if (log.isTraceEnabled()) + log.trace("Submitted new {} with {} remaining on timer", inflightPoll, timer.remainingMs()); + } + /** * Commit offsets returned on the last {@link #poll(Duration) poll()} for all the subscribed list of topics and * partitions. diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java index ecdc9b8a048..6b578f59c02 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java @@ -38,13 +38,13 @@ import org.apache.kafka.clients.consumer.internals.events.CompletableEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.clients.consumer.internals.events.EventProcessor; -import org.apache.kafka.clients.consumer.internals.events.PollEvent; import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeAsyncEvent; import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent; import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeSyncEvent; import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent; import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent; import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent; +import org.apache.kafka.clients.consumer.internals.events.SharePollEvent; import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent; import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent; import org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent; @@ -583,7 +583,7 @@ public class ShareConsumerImpl implements ShareConsumerDelegate { do { // Make sure the network thread can tell the application is actively polling - applicationEventHandler.add(new PollEvent(timer.currentTimeMs())); + applicationEventHandler.add(new SharePollEvent(timer.currentTimeMs())); processBackgroundEvents(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java index 20e6828777d..79ca558123a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java @@ -28,14 +28,14 @@ import java.util.Objects; public abstract class ApplicationEvent { public enum Type { - COMMIT_ASYNC, COMMIT_SYNC, COMPOSITE_POLL, POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE, + COMMIT_ASYNC, COMMIT_SYNC, ASYNC_POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE, LIST_OFFSETS, CHECK_AND_UPDATE_POSITIONS, RESET_OFFSET, TOPIC_METADATA, ALL_TOPICS_METADATA, TOPIC_SUBSCRIPTION_CHANGE, TOPIC_PATTERN_SUBSCRIPTION_CHANGE, TOPIC_RE2J_PATTERN_SUBSCRIPTION_CHANGE, UPDATE_SUBSCRIPTION_METADATA, UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED, COMMIT_ON_CLOSE, CREATE_FETCH_REQUESTS, LEAVE_GROUP_ON_CLOSE, STOP_FIND_COORDINATOR_ON_CLOSE, PAUSE_PARTITIONS, RESUME_PARTITIONS, CURRENT_LAG, - SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC, + SHARE_POLL, SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC, SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE, SHARE_ACKNOWLEDGE_ON_CLOSE, SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK_REGISTRATION, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index 7c78f10bbdb..c241cd5eb8b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -61,19 +61,19 @@ public class ApplicationEventProcessor implements EventProcessor compositePollContext; + private final Optional asyncPollContext; private int metadataVersionSnapshot; public ApplicationEventProcessor(final LogContext logContext, final RequestManagers requestManagers, final ConsumerMetadata metadata, final SubscriptionState subscriptions, - final Optional compositePollContext) { + final Optional asyncPollContext) { this.log = logContext.logger(ApplicationEventProcessor.class); this.requestManagers = requestManagers; this.metadata = metadata; this.subscriptions = subscriptions; - this.compositePollContext = compositePollContext; + this.asyncPollContext = asyncPollContext; this.metadataVersionSnapshot = metadata.updateVersion(); } @@ -88,16 +88,20 @@ public class ApplicationEventProcessor implements EventProcessor consumerMembershipManager.maybeReconcile(true)); - if (requestManagers.commitRequestManager.isPresent()) { - CommitRequestManager commitRequestManager = requestManagers.commitRequestManager.get(); - commitRequestManager.updateTimerAndMaybeCommit(pollTimeMs); - // all commit request generation points have been passed, - // so it's safe to notify the app thread could proceed and start fetching - requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> { - hrm.membershipManager().onConsumerPoll(); - hrm.resetPollTimer(pollTimeMs); - }); - requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm -> { - hrm.membershipManager().onConsumerPoll(); - hrm.resetPollTimer(pollTimeMs); - }); - } else { - // safe to unblock - no auto-commit risk here: - // 1. commitRequestManager is not present - // 2. shareConsumer has no auto-commit mechanism - requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> { - hrm.membershipManager().onConsumerPoll(); - hrm.resetPollTimer(pollTimeMs); - }); - } + requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> { + hrm.membershipManager().onConsumerPoll(); + hrm.resetPollTimer(event.pollTimeMs()); + }); } private void process(final CreateFetchRequestsEvent event) { @@ -757,16 +732,35 @@ public class ApplicationEventProcessor implements EventProcessor + consumerMembershipManager.maybeReconcile(true)); + + if (requestManagers.commitRequestManager.isPresent()) { + CommitRequestManager commitRequestManager = requestManagers.commitRequestManager.get(); + commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs()); + + requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> { + hrm.membershipManager().onConsumerPoll(); + hrm.resetPollTimer(event.pollTimeMs()); + }); + requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm -> { + hrm.membershipManager().onConsumerPoll(); + hrm.resetPollTimer(event.pollTimeMs()); + }); + } + nextEventType = ApplicationEvent.Type.UPDATE_SUBSCRIPTION_METADATA; if (context.maybeCompleteExceptionally(event) || context.maybeCompleteWithCallbackRequired(event, nextEventType)) @@ -805,7 +799,7 @@ public class ApplicationEventProcessor implements EventProcessor BiConsumer complete(final CompletableFuture b) { @@ -847,18 +841,18 @@ public class ApplicationEventProcessor implements EventProcessor requestManagersSupplier, - final Supplier compositePollEventProcessorContextSupplier) { + final Supplier asyncPollContextSupplier) { return new CachedSupplier<>() { @Override protected ApplicationEventProcessor create() { RequestManagers requestManagers = requestManagersSupplier.get(); - CompositePollEventProcessorContext compositePollContext = compositePollEventProcessorContextSupplier.get(); + AsyncPollEventProcessorContext asyncPollContext = asyncPollContextSupplier.get(); return new ApplicationEventProcessor( logContext, requestManagers, metadata, subscriptions, - compositePollContext + asyncPollContext ); } }; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompositePollEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEvent.java similarity index 88% rename from clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompositePollEvent.java rename to clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEvent.java index f5aa9602253..e412016ae88 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompositePollEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEvent.java @@ -33,7 +33,7 @@ import java.util.concurrent.atomic.AtomicReference; * This class represents the non-blocking event that executes logic functionally equivalent to the following: * *
    - *
  • {@link PollEvent}
  • + *
  • {@link SharePollEvent}
  • *
  • {@link UpdatePatternSubscriptionEvent}
  • *
  • {@link CheckAndUpdatePositionsEvent}
  • *
  • {@link CreateFetchRequestsEvent}
  • @@ -46,8 +46,8 @@ import java.util.concurrent.atomic.AtomicReference; * *

    * - * When the {@code CompositePollEvent} is created, it exists in the {@link State#STARTED} state. The background - * thread will execute the {@code CompositePollEvent} until it completes successfully ({@link State#SUCCEEDED}), + * When the {@code AsyncPollEvent} is created, it exists in the {@link State#STARTED} state. The background + * thread will execute the {@code AsyncPollEvent} until it completes successfully ({@link State#SUCCEEDED}), * hits an error ({@link State#FAILED}), or detects that the application thread needs to execute callbacks * ({@link State#CALLBACKS_REQUIRED}). * @@ -58,7 +58,7 @@ import java.util.concurrent.atomic.AtomicReference; * application thread. The background thread is able to detect when it needs to complete processing so that the * application thread can execute the awaiting callbacks. */ -public class CompositePollEvent extends ApplicationEvent { +public class AsyncPollEvent extends ApplicationEvent { public enum State { @@ -131,8 +131,8 @@ public class CompositePollEvent extends ApplicationEvent { } private static final List ALLOWED_STARTING_EVENT_TYPES = List.of( + Type.ASYNC_POLL, Type.CHECK_AND_UPDATE_POSITIONS, - Type.POLL, Type.UPDATE_SUBSCRIPTION_METADATA ); private final long deadlineMs; @@ -140,6 +140,17 @@ public class CompositePollEvent extends ApplicationEvent { private final Type startingEventType; private final AtomicReference result; + /** + * Creates a new event to signify a multi-stage processing of {@link Consumer#poll(Duration)} logic. + * + * @param deadlineMs Time, in milliseconds, at which point the event must be completed; based on the + * {@link Duration} passed to {@link Consumer#poll(Duration)} + * @param pollTimeMs Time, in milliseconds, at which point the event was created + */ + public AsyncPollEvent(long deadlineMs, long pollTimeMs) { + this(deadlineMs, pollTimeMs, Type.ASYNC_POLL); + } + /** * Creates a new event to signify a multi-stage processing of {@link Consumer#poll(Duration)} logic. * @@ -148,8 +159,8 @@ public class CompositePollEvent extends ApplicationEvent { * @param pollTimeMs Time, in milliseconds, at which point the event was created * @param startingEventType {@link ApplicationEvent.Type} that serves as the starting point for the event processing */ - public CompositePollEvent(long deadlineMs, long pollTimeMs, Type startingEventType) { - super(Type.COMPOSITE_POLL); + public AsyncPollEvent(long deadlineMs, long pollTimeMs, Type startingEventType) { + super(Type.ASYNC_POLL); this.deadlineMs = deadlineMs; this.pollTimeMs = pollTimeMs; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompositePollEventProcessorContext.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEventProcessorContext.java similarity index 74% rename from clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompositePollEventProcessorContext.java rename to clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEventProcessorContext.java index 8996f189cd2..e08bbd90645 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompositePollEventProcessorContext.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEventProcessorContext.java @@ -35,10 +35,10 @@ import java.util.function.Supplier; /** * This provides the context for the {@link ApplicationEventProcessor#process(ApplicationEvent)} that invokes the - * {@link CompositePollEvent} process method. This is mostly to avoid polluting the {@link ApplicationEventProcessor} - * with instance variables and logic that's specific only to the background {@link CompositePollEvent} processing. + * {@link AsyncPollEvent} process method. This is mostly to avoid polluting the {@link ApplicationEventProcessor} + * with instance variables and logic that's specific only to the background {@link AsyncPollEvent} processing. */ -public class CompositePollEventProcessorContext { +public class AsyncPollEventProcessorContext { private final Logger log; private final NetworkClientDelegate networkClientDelegate; @@ -47,12 +47,12 @@ public class CompositePollEventProcessorContext { private final CompletableEventReaper applicationEventReaper; private final FetchBuffer fetchBuffer; - private CompositePollEventProcessorContext(LogContext logContext, - NetworkClientDelegate networkClientDelegate, - BackgroundEventHandler backgroundEventHandler, - OffsetCommitCallbackInvoker offsetCommitCallbackInvoker, - CompletableEventReaper applicationEventReaper, - FetchBuffer fetchBuffer) { + private AsyncPollEventProcessorContext(LogContext logContext, + NetworkClientDelegate networkClientDelegate, + BackgroundEventHandler backgroundEventHandler, + OffsetCommitCallbackInvoker offsetCommitCallbackInvoker, + CompletableEventReaper applicationEventReaper, + FetchBuffer fetchBuffer) { this.log = logContext.logger(getClass()); this.networkClientDelegate = networkClientDelegate; this.backgroundEventHandler = backgroundEventHandler; @@ -65,18 +65,18 @@ public class CompositePollEventProcessorContext { * Creates a {@link Supplier} for deferred creation during invocation by * {@link ConsumerNetworkThread}. */ - public static Supplier supplier(LogContext logContext, - Supplier networkClientDelegateSupplier, - BackgroundEventHandler backgroundEventHandler, - OffsetCommitCallbackInvoker offsetCommitCallbackInvoker, - CompletableEventReaper applicationEventReaper, - FetchBuffer fetchBuffer) { + public static Supplier supplier(LogContext logContext, + Supplier networkClientDelegateSupplier, + BackgroundEventHandler backgroundEventHandler, + OffsetCommitCallbackInvoker offsetCommitCallbackInvoker, + CompletableEventReaper applicationEventReaper, + FetchBuffer fetchBuffer) { return new CachedSupplier<>() { @Override - protected CompositePollEventProcessorContext create() { + protected AsyncPollEventProcessorContext create() { NetworkClientDelegate networkClientDelegate = networkClientDelegateSupplier.get(); - return new CompositePollEventProcessorContext( + return new AsyncPollEventProcessorContext( logContext, networkClientDelegate, backgroundEventHandler, @@ -117,20 +117,23 @@ public class CompositePollEventProcessorContext { /** * Helper method that will check if any application thread user callbacks need to be executed. If so, the - * current event will be completed with {@link CompositePollEvent.State#CALLBACKS_REQUIRED} and this method + * current event will be completed with {@link AsyncPollEvent.State#CALLBACKS_REQUIRED} and this method * will return {@code true}. Otherwise, it will return {@code false}. */ - public boolean maybeCompleteWithCallbackRequired(CompositePollEvent event, ApplicationEvent.Type nextEventType) { + public boolean maybeCompleteWithCallbackRequired(AsyncPollEvent event, ApplicationEvent.Type nextEventType) { // If there are background events to process or enqueued callbacks to invoke, exit to // the application thread. if (backgroundEventHandler.size() > 0 || offsetCommitCallbackInvoker.size() > 0) { log.trace( "Pausing polling by completing {} with the state of {} and the next stage of {}", event, - CompositePollEvent.State.CALLBACKS_REQUIRED, + AsyncPollEvent.State.CALLBACKS_REQUIRED, nextEventType ); event.completeWithCallbackRequired(nextEventType); + + // This to ensure that the application thread doesn't needlessly wait for the full time out if it's + // been detected that a callback is required. fetchBuffer.wakeup(); return true; } @@ -142,10 +145,10 @@ public class CompositePollEventProcessorContext { * Helper method that checks if there's a non-null error from * {@link NetworkClientDelegate#getAndClearMetadataError()} or if the provided exception is not a timeout-based * exception. If there's an error to report to the user, the current event will be completed with - * {@link CompositePollEvent.State#FAILED} and this method will return {@code true}. Otherwise, it will + * {@link AsyncPollEvent.State#FAILED} and this method will return {@code true}. Otherwise, it will * return {@code false}. */ - public boolean maybeCompleteExceptionally(CompositePollEvent event, Throwable t) { + public boolean maybeCompleteExceptionally(AsyncPollEvent event, Throwable t) { if (maybeCompleteExceptionally(event)) return true; @@ -153,7 +156,7 @@ public class CompositePollEventProcessorContext { return false; if (t instanceof org.apache.kafka.common.errors.TimeoutException || t instanceof java.util.concurrent.TimeoutException) { - log.debug("Ignoring timeout for CompositePollEvent {}: {}", event, t.getMessage()); + log.trace("Ignoring timeout for {}: {}", event, t.getMessage()); return false; } @@ -168,10 +171,10 @@ public class CompositePollEventProcessorContext { /** * Helper method that checks if there's a non-null error from * {@link NetworkClientDelegate#getAndClearMetadataError()}, and if so, reports it to the user by completing the - * current event with {@link CompositePollEvent.State#FAILED} and returning {@code true}. Otherwise, it will + * current event with {@link AsyncPollEvent.State#FAILED} and returning {@code true}. Otherwise, it will * return {@code false}. */ - public boolean maybeCompleteExceptionally(CompositePollEvent event) { + public boolean maybeCompleteExceptionally(AsyncPollEvent event) { Optional exception = networkClientDelegate.getAndClearMetadataError(); if (exception.isPresent()) { @@ -183,18 +186,18 @@ public class CompositePollEventProcessorContext { } /** - * Helper method to complete the given event with {@link CompositePollEvent.State#FAILED}. + * Helper method to complete the given event with {@link AsyncPollEvent.State#FAILED}. */ - public void completeExceptionally(CompositePollEvent event, Throwable error) { + public void completeExceptionally(AsyncPollEvent event, Throwable error) { KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(error); event.completeExceptionally(e); log.trace("Failing event processing for {}", event, e); } /** - * Helper method to complete the given event with {@link CompositePollEvent.State#SUCCEEDED}. + * Helper method to complete the given event with {@link AsyncPollEvent.State#SUCCEEDED}. */ - public void complete(CompositePollEvent event) { + public void complete(AsyncPollEvent event) { event.completeSuccessfully(); log.trace("Completed event processing for {}", event); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompositePollEventInvoker.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompositePollEventInvoker.java deleted file mode 100644 index 0feb1c0d664..00000000000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompositePollEventInvoker.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.clients.consumer.internals.events; - -import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer; -import org.apache.kafka.clients.consumer.internals.ConsumerUtils; -import org.apache.kafka.common.utils.LogContext; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.Timer; - -import org.slf4j.Logger; - -import java.time.Duration; - -import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs; - -/** - * {@code CompositePollEventInvoker} is executed on the application thread in the - * {@link AsyncKafkaConsumer#poll(Duration)}. - */ -public class CompositePollEventInvoker { - - private final Logger log; - private final Time time; - private final ApplicationEventHandler applicationEventHandler; - private final Runnable applicationThreadCallbacks; - private CompositePollEvent inflight; - - public CompositePollEventInvoker(LogContext logContext, - Time time, - ApplicationEventHandler applicationEventHandler, - Runnable applicationThreadCallbacks) { - this.log = logContext.logger(getClass()); - this.time = time; - this.applicationEventHandler = applicationEventHandler; - this.applicationThreadCallbacks = applicationThreadCallbacks; - } - - /** - * {@code poll()} manages the lifetime of the {@link CompositePollEvent} processing. If it is called when - * no event is currently processing, it will start a new event processing asynchronously. A check is made - * during each invocation to see if the inflight event has reached a - * {@link CompositePollEvent.State terminal state}. If it has, the result will be processed accordingly. - */ - public void poll(Timer timer) { - if (inflight == null) { - log.trace("No existing inflight event, submitting a new event"); - submitEvent(ApplicationEvent.Type.POLL, timer); - } - - try { - if (log.isTraceEnabled()) { - log.trace( - "Attempting to retrieve result from previously submitted {} with {} remaining on timer", - inflight, - timer.remainingMs() - ); - } - - // Result should be non-null and starts off as State.STARTED. - CompositePollEvent.Result result = inflight.result(); - CompositePollEvent.State state = result.state(); - - if (state == CompositePollEvent.State.SUCCEEDED) { - // The composite event has completed all the requisite stages, though it does not imply that - // there is data in the FetchBuffer yet. Make sure to clear out the inflight request. - log.trace("Event {} completed, clearing inflight", inflight); - inflight = null; - } else if (state == CompositePollEvent.State.FAILED) { - // The composite event failed at one of the stages. Make sure to clear out the inflight request - // before the underlying error is surfaced to the user. - log.trace("Event {} failed, clearing inflight", inflight); - inflight = null; - - throw result.asKafkaException(); - } else if (state == CompositePollEvent.State.CALLBACKS_REQUIRED) { - // The background thread detected that it needed to yield to the application thread to invoke - // callbacks. Even though the inflight reference _should_ be overwritten when the next stage of - // the event is submitted, go ahead and clear out the inflight request just to be sure. - log.trace("Event {} paused for callbacks, clearing inflight", inflight); - inflight = null; - - // Note: this is calling user-supplied code, so make sure to handle possible errors. - applicationThreadCallbacks.run(); - - // The application thread callbacks are complete. Create another event to resume the polling at - // the next stage. - submitEvent(result.asNextEventType(), timer); - } - } catch (Throwable t) { - // If an exception is hit, bubble it up to the user but make sure to clear out the inflight request - // because the error effectively renders it complete. - log.debug("Event {} failed due to {}, clearing inflight", inflight, String.valueOf(t)); - inflight = null; - throw ConsumerUtils.maybeWrapAsKafkaException(t); - } - } - - private void submitEvent(ApplicationEvent.Type type, Timer timer) { - long deadlineMs = calculateDeadlineMs(timer); - long pollTimeMs = time.milliseconds(); - inflight = new CompositePollEvent(deadlineMs, pollTimeMs, type); - applicationEventHandler.add(inflight); - - if (log.isTraceEnabled()) - log.trace("Submitted new {} with {} remaining on timer", inflight, timer.remainingMs()); - } -} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SharePollEvent.java similarity index 54% rename from clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollEvent.java rename to clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SharePollEvent.java index 37df5d9ddc2..2db7b18173c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/PollEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SharePollEvent.java @@ -16,28 +16,12 @@ */ package org.apache.kafka.clients.consumer.internals.events; -import java.util.concurrent.CompletableFuture; - -public class PollEvent extends ApplicationEvent { +public class SharePollEvent extends ApplicationEvent { private final long pollTimeMs; - /** - * A future that represents the completion of reconciliation and auto-commit - * processing. - * This future is completed when all commit request generation points have - * been passed, including: - *

      - *
    • auto-commit on rebalance
    • - *
    • auto-commit on the interval
    • - *
    - * Once completed, it signals that it's safe for the consumer to proceed with - * fetching new records. - */ - private final CompletableFuture reconcileAndAutoCommit = new CompletableFuture<>(); - - public PollEvent(final long pollTimeMs) { - super(Type.POLL); + public SharePollEvent(final long pollTimeMs) { + super(Type.SHARE_POLL); this.pollTimeMs = pollTimeMs; } @@ -45,14 +29,6 @@ public class PollEvent extends ApplicationEvent { return pollTimeMs; } - public CompletableFuture reconcileAndAutoCommit() { - return reconcileAndAutoCommit; - } - - public void markReconcileAndAutoCommitComplete() { - reconcileAndAutoCommit.complete(null); - } - @Override public String toStringBase() { return super.toStringBase() + ", pollTimeMs=" + pollTimeMs; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java index 402697227ee..891e15846f3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java @@ -19,8 +19,8 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; +import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; -import org.apache.kafka.clients.consumer.internals.events.PollEvent; import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.LogContext; @@ -61,7 +61,7 @@ public class ApplicationEventHandlerTest { asyncConsumerMetrics )) { // add event - applicationEventHandler.add(new PollEvent(time.milliseconds())); + applicationEventHandler.add(new AsyncPollEvent(time.milliseconds() + 10, time.milliseconds())); verify(asyncConsumerMetrics).recordApplicationEventQueueSize(1); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 07a4e0869a6..47036f5d6eb 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -36,6 +36,7 @@ import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler; import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent; import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent; +import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.CheckAndUpdatePositionsEvent; import org.apache.kafka.clients.consumer.internals.events.CommitEvent; @@ -43,14 +44,12 @@ import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; -import org.apache.kafka.clients.consumer.internals.events.CompositePollEvent; import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent; import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.clients.consumer.internals.events.EventProcessor; import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent; import org.apache.kafka.clients.consumer.internals.events.LeaveGroupOnCloseEvent; import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent; -import org.apache.kafka.clients.consumer.internals.events.PollEvent; import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent; import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent; import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent; @@ -427,7 +426,7 @@ public class AsyncKafkaConsumerTest { consumer.wakeup(); - markReconcileAndAutoCommitCompleteForPollEvent(); + completeAsyncPollEventSuccessfully(); assertThrows(WakeupException.class, () -> consumer.poll(Duration.ZERO)); assertDoesNotThrow(() -> consumer.poll(Duration.ZERO)); } @@ -447,7 +446,7 @@ public class AsyncKafkaConsumerTest { completeAssignmentChangeEventSuccessfully(); consumer.assign(singleton(tp)); - markReconcileAndAutoCommitCompleteForPollEvent(); + completeAsyncPollEventSuccessfully(); assertThrows(WakeupException.class, () -> consumer.poll(Duration.ofMinutes(1))); assertDoesNotThrow(() -> consumer.poll(Duration.ZERO)); } @@ -471,7 +470,7 @@ public class AsyncKafkaConsumerTest { completeAssignmentChangeEventSuccessfully(); consumer.assign(singleton(tp)); - markReconcileAndAutoCommitCompleteForPollEvent(); + completeAsyncPollEventSuccessfully(); // since wakeup() is called when the non-empty fetch is returned the wakeup should be ignored assertDoesNotThrow(() -> consumer.poll(Duration.ofMinutes(1))); // the previously ignored wake-up should not be ignored in the next call @@ -508,7 +507,7 @@ public class AsyncKafkaConsumerTest { completeTopicSubscriptionChangeEventSuccessfully(); consumer.subscribe(Collections.singletonList(topicName), listener); - markReconcileAndAutoCommitCompleteForPollEvent(); + completeAsyncPollEventSuccessfully(); ConsumerPollTestUtils.waitForCondition( consumer, callbackExecuted::get, @@ -533,7 +532,7 @@ public class AsyncKafkaConsumerTest { completeAssignmentChangeEventSuccessfully(); consumer.assign(singleton(tp)); - markReconcileAndAutoCommitCompleteForPollEvent(); + completeAsyncPollEventSuccessfully(); consumer.poll(Duration.ZERO); assertDoesNotThrow(() -> consumer.poll(Duration.ZERO)); @@ -679,7 +678,7 @@ public class AsyncKafkaConsumerTest { completeAssignmentChangeEventSuccessfully(); consumer.assign(Collections.singleton(new TopicPartition("foo", 0))); assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback)); - markReconcileAndAutoCommitCompleteForPollEvent(); + completeAsyncPollEventSuccessfully(); ConsumerPollTestUtils.waitForCondition( consumer, () -> callback.invoked == 1 && callback.exception == null, @@ -1483,7 +1482,7 @@ public class AsyncKafkaConsumerTest { backgroundEventQueue.add(e); } - markReconcileAndAutoCommitCompleteForPollEvent(); + completeAsyncPollEventSuccessfully(); // This will trigger the background event queue to process our background event message. // If any error is happening inside the rebalance callbacks, we expect the first exception to be thrown from poll. @@ -1559,7 +1558,7 @@ public class AsyncKafkaConsumerTest { backgroundEventQueue.add(errorEvent); completeAssignmentChangeEventSuccessfully(); consumer.assign(singletonList(new TopicPartition("topic", 0))); - markReconcileAndAutoCommitCompleteForPollEvent(); + completeAsyncPollEventSuccessfully(); ConsumerPollTestUtils.waitForException( consumer, t -> t.getMessage().equals(expectedException.getMessage()), @@ -1580,7 +1579,7 @@ public class AsyncKafkaConsumerTest { backgroundEventQueue.add(errorEvent2); completeAssignmentChangeEventSuccessfully(); consumer.assign(singletonList(new TopicPartition("topic", 0))); - markReconcileAndAutoCommitCompleteForPollEvent(); + completeAsyncPollEventSuccessfully(); ConsumerPollTestUtils.waitForException( consumer, t -> t.getMessage().equals(expectedException1.getMessage()), @@ -1665,9 +1664,9 @@ public class AsyncKafkaConsumerTest { completeTopicSubscriptionChangeEventSuccessfully(); consumer.subscribe(singletonList("topic1")); - markReconcileAndAutoCommitCompleteForPollEvent(); + completeAsyncPollEventSuccessfully(); consumer.poll(Duration.ofMillis(100)); - verify(applicationEventHandler, atLeastOnce()).add(any(CompositePollEvent.class)); + verify(applicationEventHandler, atLeastOnce()).add(any(AsyncPollEvent.class)); } private Properties requiredConsumerConfigAndGroupId(final String groupId) { @@ -1683,7 +1682,7 @@ public class AsyncKafkaConsumerTest { completeAssignmentChangeEventSuccessfully(); consumer.assign(singleton(new TopicPartition("t1", 1))); - markReconcileAndAutoCommitCompleteForPollEvent(); + completeAsyncPollEventSuccessfully(); consumer.poll(Duration.ZERO); } @@ -1717,7 +1716,7 @@ public class AsyncKafkaConsumerTest { ).when(fetchCollector).collectFetch(any(FetchBuffer.class)); when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); - markReconcileAndAutoCommitCompleteForPollEvent(); + completeAsyncPollEventSuccessfully(); // And then poll for up to 10000ms, which should return 2 records without timing out ConsumerRecords returnedRecords = consumer.poll(Duration.ofMillis(10000)); assertEquals(2, returnedRecords.count()); @@ -1821,7 +1820,7 @@ public class AsyncKafkaConsumerTest { // interrupt the thread and call poll try { Thread.currentThread().interrupt(); - markReconcileAndAutoCommitCompleteForPollEvent(); + completeAsyncPollEventSuccessfully(); assertThrows(InterruptException.class, () -> consumer.poll(Duration.ZERO)); } finally { // clear interrupted state again since this thread may be reused by JUnit @@ -1853,7 +1852,7 @@ public class AsyncKafkaConsumerTest { completeTopicSubscriptionChangeEventSuccessfully(); consumer.subscribe(Collections.singletonList("topic")); when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); - markReconcileAndAutoCommitCompleteForPollEvent(); + completeAsyncPollEventSuccessfully(); ConsumerPollTestUtils.waitForCondition( consumer, @@ -1921,7 +1920,7 @@ public class AsyncKafkaConsumerTest { completeUnsubscribeApplicationEventSuccessfully(); consumer.assign(singleton(new TopicPartition("topic1", 0))); - markReconcileAndAutoCommitCompleteForPollEvent(); + completeAsyncPollEventSuccessfully(); consumer.poll(Duration.ZERO); verify(applicationEventHandler, never()).addAndGet(any(UpdatePatternSubscriptionEvent.class)); @@ -2217,6 +2216,14 @@ public class AsyncKafkaConsumerTest { }).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(SeekUnvalidatedEvent.class)); } + private void completeAsyncPollEventSuccessfully() { + doAnswer(invocation -> { + AsyncPollEvent event = invocation.getArgument(0); + event.completeWithCallbackRequired(ApplicationEvent.Type.CHECK_AND_UPDATE_POSITIONS); + return null; + }).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncPollEvent.class)); + } + private void forceCommitCallbackInvocation() { // Invokes callback consumer.commitAsync(); @@ -2294,17 +2301,4 @@ public class AsyncKafkaConsumerTest { verify(mockStreamsListener).onTasksRevoked(any()); } } - - private void markReconcileAndAutoCommitCompleteForPollEvent() { - doAnswer(invocation -> { - PollEvent event = invocation.getArgument(0); - event.markReconcileAndAutoCommitComplete(); - return null; - }).when(applicationEventHandler).add(ArgumentMatchers.isA(PollEvent.class)); - doAnswer(invocation -> { - CompositePollEvent event = invocation.getArgument(0); - event.completeWithCallbackRequired(ApplicationEvent.Type.CHECK_AND_UPDATE_POSITIONS); - return null; - }).when(applicationEventHandler).add(ArgumentMatchers.isA(CompositePollEvent.class)); - } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java index 35ccb17dfab..88004ebbcd7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java @@ -18,8 +18,8 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; +import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; -import org.apache.kafka.clients.consumer.internals.events.PollEvent; import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.metrics.Metrics; @@ -258,7 +258,7 @@ public class ConsumerNetworkThreadTest { )) { consumerNetworkThread.initializeResources(); - PollEvent event = new PollEvent(0); + AsyncPollEvent event = new AsyncPollEvent(10, 0); event.setEnqueuedMs(time.milliseconds()); applicationEventQueue.add(event); asyncConsumerMetrics.recordApplicationEventQueueSize(1); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java index 5dddd0772df..b3833098d66 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java @@ -24,11 +24,11 @@ import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandle import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; -import org.apache.kafka.clients.consumer.internals.events.PollEvent; import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent; import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent; import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent; import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent; +import org.apache.kafka.clients.consumer.internals.events.SharePollEvent; import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent; import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent; import org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent; @@ -680,7 +680,7 @@ public class ShareConsumerImplTest { consumer.subscribe(subscriptionTopic); consumer.poll(Duration.ofMillis(100)); - verify(applicationEventHandler).add(any(PollEvent.class)); + verify(applicationEventHandler).add(any(SharePollEvent.class)); verify(applicationEventHandler).addAndGet(any(ShareSubscriptionChangeEvent.class)); completeShareAcknowledgeOnCloseApplicationEventSuccessfully(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java index dde3f567132..654c4772d7c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java @@ -111,7 +111,8 @@ public class ApplicationEventProcessorTest { new LogContext(), requestManagers, metadata, - subscriptionState + subscriptionState, + Optional.of(mock(AsyncPollEventProcessorContext.class)) ); } @@ -171,7 +172,7 @@ public class ApplicationEventProcessorTest { private static Stream applicationEvents() { return Stream.of( - Arguments.of(new PollEvent(100)), + Arguments.of(new AsyncPollEvent(calculateDeadlineMs(12345, 100), 100)), Arguments.of(new CreateFetchRequestsEvent(calculateDeadlineMs(12345, 100))), Arguments.of(new CheckAndUpdatePositionsEvent(500)), Arguments.of(new TopicMetadataEvent("topic", Long.MAX_VALUE)), @@ -265,12 +266,12 @@ public class ApplicationEventProcessorTest { @Test public void testPollEvent() { - PollEvent event = new PollEvent(12345); + AsyncPollEvent event = new AsyncPollEvent(12346, 12345); setupProcessor(true); when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager); + when(offsetsRequestManager.updateFetchPositions(anyLong())).thenReturn(new CompletableFuture<>()); processor.process(event); - assertTrue(event.reconcileAndAutoCommit().isDone()); verify(commitRequestManager).updateTimerAndMaybeCommit(12345); verify(membershipManager).onConsumerPoll(); verify(heartbeatRequestManager).resetPollTimer(12345);