diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java index 220866c240f..f8621bc9133 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java @@ -122,7 +122,7 @@ public class ConsumerIntegrationTest { } }); - TestUtils.waitForCondition(() -> consumer.poll(Duration.ofSeconds(1)).count() == 1, + TestUtils.waitForCondition(() -> consumer.poll(Duration.ofMillis(100)).count() == 1, 5000, "failed to poll data"); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index e3f9eabbe98..2ab5f24d178 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -42,7 +42,6 @@ import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProces import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent; import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent; import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent; -import org.apache.kafka.clients.consumer.internals.events.AsyncPollEventProcessorContext; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; import org.apache.kafka.clients.consumer.internals.events.CheckAndUpdatePositionsEvent; @@ -462,19 +461,11 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { streamsRebalanceData ); final CompletableEventReaper applicationEventReaper = new CompletableEventReaper(logContext); - final Supplier asyncPollContextSupplier = AsyncPollEventProcessorContext.supplier( - logContext, - networkClientDelegateSupplier, - backgroundEventHandler, - offsetCommitCallbackInvoker, - applicationEventReaper, - fetchBuffer - ); final Supplier applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext, metadata, subscriptions, requestManagersSupplier, - asyncPollContextSupplier + applicationEventReaper ); this.applicationEventHandler = applicationEventHandlerFactory.build( logContext, @@ -664,20 +655,12 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { Optional.empty() ); final CompletableEventReaper applicationEventReaper = new CompletableEventReaper(logContext); - final Supplier asyncPollContextSupplier = AsyncPollEventProcessorContext.supplier( - logContext, - networkClientDelegateSupplier, - backgroundEventHandler, - offsetCommitCallbackInvoker, - applicationEventReaper, - fetchBuffer - ); Supplier applicationEventProcessorSupplier = ApplicationEventProcessor.supplier( logContext, metadata, subscriptions, requestManagersSupplier, - asyncPollContextSupplier + applicationEventReaper ); this.applicationEventHandler = new ApplicationEventHandler(logContext, time, @@ -897,10 +880,14 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { public void checkInflightPollResult(Timer timer) { if (inflightPoll == null) { log.trace("No existing inflight async poll event, submitting a new event"); - submitEvent(ApplicationEvent.Type.ASYNC_POLL, timer); + submitEvent(timer); } try { + // Note: this is calling user-supplied code, so make sure to handle possible errors. + offsetCommitCallbackInvoker.executeCallbacks(); + processBackgroundEvents(); + if (log.isTraceEnabled()) { log.trace( "Attempting to retrieve result from previously submitted {} with {} remaining on timer", @@ -924,21 +911,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { log.trace("Event {} failed, clearing inflight", inflightPoll); inflightPoll = null; - throw result.asKafkaException(); - } else if (state == AsyncPollEvent.State.CALLBACKS_REQUIRED) { - // The background thread detected that it needed to yield to the application thread to invoke - // callbacks. Even though the inflight reference _should_ be overwritten when the next stage of - // the event is submitted, go ahead and clear out the inflight request just to be sure. - log.trace("Event {} paused for callbacks, clearing inflight", inflightPoll); - inflightPoll = null; - - // Note: this is calling user-supplied code, so make sure to handle possible errors. - offsetCommitCallbackInvoker.executeCallbacks(); - processBackgroundEvents(); - - // The application thread callbacks are complete. Create another event to resume the polling at - // the next stage. - submitEvent(result.asNextEventType(), timer); + throw result.error(); } } catch (Throwable t) { // If an exception is hit, bubble it up to the user but make sure to clear out the inflight request @@ -949,10 +922,10 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { } } - private void submitEvent(ApplicationEvent.Type type, Timer timer) { + private void submitEvent(Timer timer) { long deadlineMs = calculateDeadlineMs(timer); long pollTimeMs = time.milliseconds(); - inflightPoll = new AsyncPollEvent(deadlineMs, pollTimeMs, type); + inflightPoll = new AsyncPollEvent(deadlineMs, pollTimeMs); applicationEventHandler.add(inflightPoll); if (log.isTraceEnabled()) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index abd05597e64..04cd7ec6abd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -19,9 +19,11 @@ package org.apache.kafka.clients.consumer.internals.events; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.internals.Acknowledgements; import org.apache.kafka.clients.consumer.internals.CachedSupplier; +import org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer; import org.apache.kafka.clients.consumer.internals.CommitRequestManager; import org.apache.kafka.clients.consumer.internals.ConsumerMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread; +import org.apache.kafka.clients.consumer.internals.ConsumerUtils; import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal; import org.apache.kafka.clients.consumer.internals.RequestManagers; import org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager; @@ -46,6 +48,7 @@ import java.util.Optional; import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.function.BiConsumer; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -61,19 +64,19 @@ public class ApplicationEventProcessor implements EventProcessor asyncPollContext; + private final Optional applicationEventReaper; private int metadataVersionSnapshot; public ApplicationEventProcessor(final LogContext logContext, final RequestManagers requestManagers, final ConsumerMetadata metadata, final SubscriptionState subscriptions, - final Optional asyncPollContext) { + final Optional applicationEventReaper) { this.log = logContext.logger(ApplicationEventProcessor.class); this.requestManagers = requestManagers; this.metadata = metadata; this.subscriptions = subscriptions; - this.asyncPollContext = asyncPollContext; + this.applicationEventReaper = applicationEventReaper; this.metadataVersionSnapshot = metadata.updateVersion(); } @@ -88,8 +91,8 @@ public class ApplicationEventProcessor implements EventProcessor + consumerMembershipManager.maybeReconcile(true)); - if (nextEventType == ApplicationEvent.Type.ASYNC_POLL) { - log.trace("Processing {} logic for {}", nextEventType, event); + if (requestManagers.commitRequestManager.isPresent()) { + CommitRequestManager commitRequestManager = requestManagers.commitRequestManager.get(); + commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs()); - // Trigger a reconciliation that can safely commit offsets if needed to rebalance, - // as we're processing before any new fetching starts - requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager -> - consumerMembershipManager.maybeReconcile(true)); - - if (requestManagers.commitRequestManager.isPresent()) { - CommitRequestManager commitRequestManager = requestManagers.commitRequestManager.get(); - commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs()); - - requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> { - hrm.membershipManager().onConsumerPoll(); - hrm.resetPollTimer(event.pollTimeMs()); - }); - requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm -> { - hrm.membershipManager().onConsumerPoll(); - hrm.resetPollTimer(event.pollTimeMs()); - }); - } - - nextEventType = ApplicationEvent.Type.CHECK_AND_UPDATE_POSITIONS; - - if (context.maybeCompleteWithCallbackRequired(event, nextEventType)) - return; + requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> { + hrm.membershipManager().onConsumerPoll(); + hrm.resetPollTimer(event.pollTimeMs()); + }); + requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm -> { + hrm.membershipManager().onConsumerPoll(); + hrm.resetPollTimer(event.pollTimeMs()); + }); } - if (nextEventType == ApplicationEvent.Type.CHECK_AND_UPDATE_POSITIONS) { - log.trace("Processing {} logic for {}", nextEventType, event); - CompletableFuture updatePositionsFuture = requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs()); - context.trackCheckAndUpdatePositionsForTimeout(updatePositionsFuture, event.deadlineMs()); + log.trace("Processing check and update positions logic for {}", event); + CompletableFuture updatePositionsFuture = requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs()); + trackCheckAndUpdatePositionsForTimeout(updatePositionsFuture, event.deadlineMs()); - updatePositionsFuture.whenComplete((__, updatePositionsError) -> { - if (context.maybeCompleteExceptionally(event, updatePositionsError)) + updatePositionsFuture.whenComplete((__, updatePositionsError) -> { + if (maybeCompleteAsyncPollEventExceptionally(event, updatePositionsError)) + return; + + log.trace("Processing create fetch requests logic for {}", event); + + // Create a fetch request if there's no data in the FetchBuffer. + requestManagers.fetchRequestManager.createFetchRequests().whenComplete((___, fetchError) -> { + if (maybeCompleteAsyncPollEventExceptionally(event, fetchError)) return; - log.trace("Processing {} logic for {}", ApplicationEvent.Type.CREATE_FETCH_REQUESTS, event); - - // Create a fetch request if there's no data in the FetchBuffer. - requestManagers.fetchRequestManager.createFetchRequests().whenComplete((___, fetchError) -> { - if (context.maybeCompleteExceptionally(event, fetchError)) - return; - - context.complete(event); - }); + event.completeSuccessfully(); + log.trace("Completed event processing for {}", event); }); + }); + } - return; + /** + * To maintain the flow from {@link ClassicKafkaConsumer}, the logic to check and update positions should be + * allowed to time out before moving on to the logic for sending fetch requests. This achieves that by reusing + * the {@link CompletableEventReaper} and allowing it to expire the {@link CompletableFuture} for the check and + * update positions stage. + */ + public void trackCheckAndUpdatePositionsForTimeout(CompletableFuture updatePositionsFuture, long deadlineMs) { + applicationEventReaper.ifPresent(reaper -> { + CompletableEvent event = new CompletableEvent<>() { + @Override + public CompletableFuture future() { + return updatePositionsFuture; + } + + @Override + public long deadlineMs() { + return deadlineMs; + } + + @Override + public String toString() { + return getClass().getSimpleName() + "{updatePositionsFuture=" + updatePositionsFuture + ", deadlineMs=" + deadlineMs + '}'; + } + }; + + reaper.add(event); + }); + } + + /** + * If there's an error to report to the user, the current event will be completed with + * {@link AsyncPollEvent.State#FAILED} and this method will return {@code true}. Otherwise, it will + * return {@code false}. + */ + private boolean maybeCompleteAsyncPollEventExceptionally(AsyncPollEvent event, Throwable t) { + if (t == null) + return false; + + if (t instanceof org.apache.kafka.common.errors.TimeoutException || t instanceof java.util.concurrent.TimeoutException) { + log.trace("Ignoring timeout for {}: {}", event, t.getMessage()); + return false; } - context.completeExceptionally(event, new KafkaException("Unknown next step for async poll: " + nextEventType)); + if (t instanceof CompletionException) { + t = t.getCause(); + } + + KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t); + event.completeExceptionally(e); + log.trace("Failing event processing for {}", event, e); + return true; } private BiConsumer complete(final CompletableFuture b) { @@ -828,18 +866,17 @@ public class ApplicationEventProcessor implements EventProcessor requestManagersSupplier, - final Supplier asyncPollContextSupplier) { + final CompletableEventReaper applicationEventReaper) { return new CachedSupplier<>() { @Override protected ApplicationEventProcessor create() { RequestManagers requestManagers = requestManagersSupplier.get(); - AsyncPollEventProcessorContext asyncPollContext = asyncPollContextSupplier.get(); return new ApplicationEventProcessor( logContext, requestManagers, metadata, subscriptions, - asyncPollContext + applicationEventReaper ); } }; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEvent.java index 2a5b4e7cf71..145612d6fa0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEvent.java @@ -17,16 +17,12 @@ package org.apache.kafka.clients.consumer.internals.events; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerInterceptor; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer; import org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer; import org.apache.kafka.clients.consumer.internals.ConsumerUtils; import org.apache.kafka.common.KafkaException; import java.time.Duration; -import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; @@ -34,8 +30,7 @@ import java.util.concurrent.atomic.AtomicReference; * This class represents the non-blocking event that executes logic functionally equivalent to the following: * *
    - *
  • {@link SharePollEvent}
  • - *
  • {@link UpdatePatternSubscriptionEvent}
  • + *
  • Polling
  • *
  • {@link CheckAndUpdatePositionsEvent}
  • *
  • {@link CreateFetchRequestsEvent}
  • *
@@ -48,16 +43,8 @@ import java.util.concurrent.atomic.AtomicReference; *

* * When the {@code AsyncPollEvent} is created, it exists in the {@link State#STARTED} state. The background - * thread will execute the {@code AsyncPollEvent} until it completes successfully ({@link State#SUCCEEDED}), - * hits an error ({@link State#FAILED}), or detects that the application thread needs to execute callbacks - * ({@link State#CALLBACKS_REQUIRED}). - * - *

- * - * It's possible that the background processing of the polling will need to be "paused" in order to execute a - * {@link ConsumerInterceptor}, {@link ConsumerRebalanceListener}, and/or {@link OffsetCommitCallback} in the - * application thread. The background thread is able to detect when it needs to complete processing so that the - * application thread can execute the awaiting callbacks. + * thread will execute the {@code AsyncPollEvent} until it completes successfully ({@link State#SUCCEEDED}) + * or hits an error ({@link State#FAILED}). */ public class AsyncPollEvent extends ApplicationEvent implements MetadataErrorNotifiable { @@ -65,79 +52,51 @@ public class AsyncPollEvent extends ApplicationEvent implements MetadataErrorNot STARTED, SUCCEEDED, - FAILED, - CALLBACKS_REQUIRED + FAILED } public static class Result { - /** - * This string value is used when the {@code Result} represents a completed event. This is used so that - * {@code null} isn't used for {@link #value}. - */ - private static final Object COMPLETED_SENTINEL = "COMPLETED"; - /** * Used as the initial state/result until the terminal state is achieved. */ private static final Result STARTED = new Result(State.STARTED, null); private final State state; - private final Object value; + private final KafkaException error; - public Result(State state, Object value) { + public Result(State state, KafkaException error) { this.state = state; - this.value = value; + this.error = error; } public State state() { return state; } - public Type asNextEventType() { - if (state != State.CALLBACKS_REQUIRED) - throw new KafkaException("The usage of asNextEventType is unexpected for state: " + state); - - if (!(value instanceof ApplicationEvent.Type)) - throw new KafkaException("The result value for the poll was unexpected: " + value); - - return (ApplicationEvent.Type) value; - } - - public KafkaException asKafkaException() { - if (state != State.FAILED) - throw new KafkaException("The usage of asKafkaException is unexpected for state: " + state); - - if (!(value instanceof KafkaException)) - throw new KafkaException("The result value for the poll was unexpected: " + value); - - return (KafkaException) value; + public KafkaException error() { + return error; } @Override public String toString() { - return "Result{" + "state=" + state + ", value=" + value + '}'; + return "Result{" + "state=" + state + ", error=" + error + '}'; } @Override public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; Result result = (Result) o; - return state == result.state && Objects.equals(value, result.value); + return state == result.state && Objects.equals(error, result.error); } @Override public int hashCode() { - return Objects.hash(state, value); + return Objects.hash(state, error); } } - private static final List ALLOWED_STARTING_EVENT_TYPES = List.of( - Type.ASYNC_POLL, - Type.CHECK_AND_UPDATE_POSITIONS - ); private final long deadlineMs; private final long pollTimeMs; - private final Type startingEventType; private final AtomicReference result; /** @@ -148,26 +107,9 @@ public class AsyncPollEvent extends ApplicationEvent implements MetadataErrorNot * @param pollTimeMs Time, in milliseconds, at which point the event was created */ public AsyncPollEvent(long deadlineMs, long pollTimeMs) { - this(deadlineMs, pollTimeMs, Type.ASYNC_POLL); - } - - /** - * Creates a new event to signify a multi-stage processing of {@link Consumer#poll(Duration)} logic. - * - * @param deadlineMs Time, in milliseconds, at which point the event must be completed; based on the - * {@link Duration} passed to {@link Consumer#poll(Duration)} - * @param pollTimeMs Time, in milliseconds, at which point the event was created - * @param startingEventType {@link ApplicationEvent.Type} that serves as the starting point for the event processing - */ - public AsyncPollEvent(long deadlineMs, long pollTimeMs, Type startingEventType) { super(Type.ASYNC_POLL); this.deadlineMs = deadlineMs; this.pollTimeMs = pollTimeMs; - - if (!ALLOWED_STARTING_EVENT_TYPES.contains(startingEventType)) - throw new KafkaException("The starting event type " + startingEventType + " is not valid. Should be one of " + ALLOWED_STARTING_EVENT_TYPES); - - this.startingEventType = startingEventType; this.result = new AtomicReference<>(Result.STARTED); } @@ -179,16 +121,12 @@ public class AsyncPollEvent extends ApplicationEvent implements MetadataErrorNot return pollTimeMs; } - public Type startingEventType() { - return startingEventType; - } - public Result result() { return result.get(); } public void completeSuccessfully() { - Result r = new Result(State.SUCCEEDED, Result.COMPLETED_SENTINEL); + Result r = new Result(State.SUCCEEDED, null); result.compareAndSet(Result.STARTED, r); } @@ -197,11 +135,6 @@ public class AsyncPollEvent extends ApplicationEvent implements MetadataErrorNot result.compareAndSet(Result.STARTED, r); } - public void completeWithCallbackRequired(Type nextEventType) { - Result r = new Result(State.CALLBACKS_REQUIRED, Objects.requireNonNull(nextEventType)); - result.compareAndSet(Result.STARTED, r); - } - @Override public void metadataError(Exception metadataException) { completeExceptionally(ConsumerUtils.maybeWrapAsKafkaException(metadataException)); @@ -212,7 +145,6 @@ public class AsyncPollEvent extends ApplicationEvent implements MetadataErrorNot return super.toStringBase() + ", deadlineMs=" + deadlineMs + ", pollTimeMs=" + pollTimeMs + - ", startingEventType=" + startingEventType + ", result=" + result.get(); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEventProcessorContext.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEventProcessorContext.java deleted file mode 100644 index e08bbd90645..00000000000 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEventProcessorContext.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.clients.consumer.internals.events; - -import org.apache.kafka.clients.consumer.internals.CachedSupplier; -import org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer; -import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread; -import org.apache.kafka.clients.consumer.internals.ConsumerUtils; -import org.apache.kafka.clients.consumer.internals.FetchBuffer; -import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate; -import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.utils.LogContext; - -import org.slf4j.Logger; - -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.function.Supplier; - -/** - * This provides the context for the {@link ApplicationEventProcessor#process(ApplicationEvent)} that invokes the - * {@link AsyncPollEvent} process method. This is mostly to avoid polluting the {@link ApplicationEventProcessor} - * with instance variables and logic that's specific only to the background {@link AsyncPollEvent} processing. - */ -public class AsyncPollEventProcessorContext { - - private final Logger log; - private final NetworkClientDelegate networkClientDelegate; - private final BackgroundEventHandler backgroundEventHandler; - private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker; - private final CompletableEventReaper applicationEventReaper; - private final FetchBuffer fetchBuffer; - - private AsyncPollEventProcessorContext(LogContext logContext, - NetworkClientDelegate networkClientDelegate, - BackgroundEventHandler backgroundEventHandler, - OffsetCommitCallbackInvoker offsetCommitCallbackInvoker, - CompletableEventReaper applicationEventReaper, - FetchBuffer fetchBuffer) { - this.log = logContext.logger(getClass()); - this.networkClientDelegate = networkClientDelegate; - this.backgroundEventHandler = backgroundEventHandler; - this.offsetCommitCallbackInvoker = offsetCommitCallbackInvoker; - this.applicationEventReaper = applicationEventReaper; - this.fetchBuffer = fetchBuffer; - } - - /** - * Creates a {@link Supplier} for deferred creation during invocation by - * {@link ConsumerNetworkThread}. - */ - public static Supplier supplier(LogContext logContext, - Supplier networkClientDelegateSupplier, - BackgroundEventHandler backgroundEventHandler, - OffsetCommitCallbackInvoker offsetCommitCallbackInvoker, - CompletableEventReaper applicationEventReaper, - FetchBuffer fetchBuffer) { - return new CachedSupplier<>() { - @Override - protected AsyncPollEventProcessorContext create() { - NetworkClientDelegate networkClientDelegate = networkClientDelegateSupplier.get(); - - return new AsyncPollEventProcessorContext( - logContext, - networkClientDelegate, - backgroundEventHandler, - offsetCommitCallbackInvoker, - applicationEventReaper, - fetchBuffer - ); - } - }; - } - - /** - * To maintain the flow from {@link ClassicKafkaConsumer}, the logic to check and update positions should be - * allowed to time out before moving on to the logic for sending fetch requests. This achieves that by reusing - * the {@link CompletableEventReaper} and allowing it to expire the {@link CompletableFuture} for the check and - * update positions stage. - */ - public void trackCheckAndUpdatePositionsForTimeout(CompletableFuture updatePositionsFuture, long deadlineMs) { - CompletableEvent event = new CompletableEvent<>() { - @Override - public CompletableFuture future() { - return updatePositionsFuture; - } - - @Override - public long deadlineMs() { - return deadlineMs; - } - - @Override - public String toString() { - return getClass().getSimpleName() + "{updatePositionsFuture=" + updatePositionsFuture + ", deadlineMs=" + deadlineMs + '}'; - } - }; - - applicationEventReaper.add(event); - } - - /** - * Helper method that will check if any application thread user callbacks need to be executed. If so, the - * current event will be completed with {@link AsyncPollEvent.State#CALLBACKS_REQUIRED} and this method - * will return {@code true}. Otherwise, it will return {@code false}. - */ - public boolean maybeCompleteWithCallbackRequired(AsyncPollEvent event, ApplicationEvent.Type nextEventType) { - // If there are background events to process or enqueued callbacks to invoke, exit to - // the application thread. - if (backgroundEventHandler.size() > 0 || offsetCommitCallbackInvoker.size() > 0) { - log.trace( - "Pausing polling by completing {} with the state of {} and the next stage of {}", - event, - AsyncPollEvent.State.CALLBACKS_REQUIRED, - nextEventType - ); - event.completeWithCallbackRequired(nextEventType); - - // This to ensure that the application thread doesn't needlessly wait for the full time out if it's - // been detected that a callback is required. - fetchBuffer.wakeup(); - return true; - } - - return false; - } - - /** - * Helper method that checks if there's a non-null error from - * {@link NetworkClientDelegate#getAndClearMetadataError()} or if the provided exception is not a timeout-based - * exception. If there's an error to report to the user, the current event will be completed with - * {@link AsyncPollEvent.State#FAILED} and this method will return {@code true}. Otherwise, it will - * return {@code false}. - */ - public boolean maybeCompleteExceptionally(AsyncPollEvent event, Throwable t) { - if (maybeCompleteExceptionally(event)) - return true; - - if (t == null) - return false; - - if (t instanceof org.apache.kafka.common.errors.TimeoutException || t instanceof java.util.concurrent.TimeoutException) { - log.trace("Ignoring timeout for {}: {}", event, t.getMessage()); - return false; - } - - if (t instanceof CompletionException) { - t = t.getCause(); - } - - completeExceptionally(event, t); - return true; - } - - /** - * Helper method that checks if there's a non-null error from - * {@link NetworkClientDelegate#getAndClearMetadataError()}, and if so, reports it to the user by completing the - * current event with {@link AsyncPollEvent.State#FAILED} and returning {@code true}. Otherwise, it will - * return {@code false}. - */ - public boolean maybeCompleteExceptionally(AsyncPollEvent event) { - Optional exception = networkClientDelegate.getAndClearMetadataError(); - - if (exception.isPresent()) { - completeExceptionally(event, exception.get()); - return true; - } - - return false; - } - - /** - * Helper method to complete the given event with {@link AsyncPollEvent.State#FAILED}. - */ - public void completeExceptionally(AsyncPollEvent event, Throwable error) { - KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(error); - event.completeExceptionally(e); - log.trace("Failing event processing for {}", event, e); - } - - /** - * Helper method to complete the given event with {@link AsyncPollEvent.State#SUCCEEDED}. - */ - public void complete(AsyncPollEvent event) { - event.completeSuccessfully(); - log.trace("Completed event processing for {}", event); - } -} \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 47036f5d6eb..de086fef71b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -2219,7 +2219,7 @@ public class AsyncKafkaConsumerTest { private void completeAsyncPollEventSuccessfully() { doAnswer(invocation -> { AsyncPollEvent event = invocation.getArgument(0); - event.completeWithCallbackRequired(ApplicationEvent.Type.CHECK_AND_UPDATE_POSITIONS); + event.completeSuccessfully(); return null; }).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncPollEvent.class)); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java index 654c4772d7c..bcd42abe2ae 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java @@ -111,8 +111,7 @@ public class ApplicationEventProcessorTest { new LogContext(), requestManagers, metadata, - subscriptionState, - Optional.of(mock(AsyncPollEventProcessorContext.class)) + subscriptionState ); } diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index bfcc0bb0d4f..7e17b153746 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -1358,7 +1358,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { val consumer = createConsumer() consumer.assign(java.util.List.of(tp)) - assertThrows(classOf[TopicAuthorizationException], () => consumeRecords(consumer)) + assertThrows(classOf[AuthorizationException], () => consumeRecords(consumer)) } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)