Refactoring to remove interim callback step

This commit is contained in:
Kirk True 2025-10-05 21:06:08 -07:00
parent eace3ee1d6
commit 767316ba60
8 changed files with 118 additions and 381 deletions

View File

@ -122,7 +122,7 @@ public class ConsumerIntegrationTest {
}
});
TestUtils.waitForCondition(() -> consumer.poll(Duration.ofSeconds(1)).count() == 1,
TestUtils.waitForCondition(() -> consumer.poll(Duration.ofMillis(100)).count() == 1,
5000,
"failed to poll data");
}

View File

@ -42,7 +42,6 @@ import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProces
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
import org.apache.kafka.clients.consumer.internals.events.AsyncPollEventProcessorContext;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.CheckAndUpdatePositionsEvent;
@ -462,19 +461,11 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
streamsRebalanceData
);
final CompletableEventReaper applicationEventReaper = new CompletableEventReaper(logContext);
final Supplier<AsyncPollEventProcessorContext> asyncPollContextSupplier = AsyncPollEventProcessorContext.supplier(
logContext,
networkClientDelegateSupplier,
backgroundEventHandler,
offsetCommitCallbackInvoker,
applicationEventReaper,
fetchBuffer
);
final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext,
metadata,
subscriptions,
requestManagersSupplier,
asyncPollContextSupplier
applicationEventReaper
);
this.applicationEventHandler = applicationEventHandlerFactory.build(
logContext,
@ -664,20 +655,12 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
Optional.empty()
);
final CompletableEventReaper applicationEventReaper = new CompletableEventReaper(logContext);
final Supplier<AsyncPollEventProcessorContext> asyncPollContextSupplier = AsyncPollEventProcessorContext.supplier(
logContext,
networkClientDelegateSupplier,
backgroundEventHandler,
offsetCommitCallbackInvoker,
applicationEventReaper,
fetchBuffer
);
Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(
logContext,
metadata,
subscriptions,
requestManagersSupplier,
asyncPollContextSupplier
applicationEventReaper
);
this.applicationEventHandler = new ApplicationEventHandler(logContext,
time,
@ -897,10 +880,14 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
public void checkInflightPollResult(Timer timer) {
if (inflightPoll == null) {
log.trace("No existing inflight async poll event, submitting a new event");
submitEvent(ApplicationEvent.Type.ASYNC_POLL, timer);
submitEvent(timer);
}
try {
// Note: this is calling user-supplied code, so make sure to handle possible errors.
offsetCommitCallbackInvoker.executeCallbacks();
processBackgroundEvents();
if (log.isTraceEnabled()) {
log.trace(
"Attempting to retrieve result from previously submitted {} with {} remaining on timer",
@ -924,21 +911,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
log.trace("Event {} failed, clearing inflight", inflightPoll);
inflightPoll = null;
throw result.asKafkaException();
} else if (state == AsyncPollEvent.State.CALLBACKS_REQUIRED) {
// The background thread detected that it needed to yield to the application thread to invoke
// callbacks. Even though the inflight reference _should_ be overwritten when the next stage of
// the event is submitted, go ahead and clear out the inflight request just to be sure.
log.trace("Event {} paused for callbacks, clearing inflight", inflightPoll);
inflightPoll = null;
// Note: this is calling user-supplied code, so make sure to handle possible errors.
offsetCommitCallbackInvoker.executeCallbacks();
processBackgroundEvents();
// The application thread callbacks are complete. Create another event to resume the polling at
// the next stage.
submitEvent(result.asNextEventType(), timer);
throw result.error();
}
} catch (Throwable t) {
// If an exception is hit, bubble it up to the user but make sure to clear out the inflight request
@ -949,10 +922,10 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
}
}
private void submitEvent(ApplicationEvent.Type type, Timer timer) {
private void submitEvent(Timer timer) {
long deadlineMs = calculateDeadlineMs(timer);
long pollTimeMs = time.milliseconds();
inflightPoll = new AsyncPollEvent(deadlineMs, pollTimeMs, type);
inflightPoll = new AsyncPollEvent(deadlineMs, pollTimeMs);
applicationEventHandler.add(inflightPoll);
if (log.isTraceEnabled())

View File

@ -19,9 +19,11 @@ package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.Acknowledgements;
import org.apache.kafka.clients.consumer.internals.CachedSupplier;
import org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer;
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
import org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager;
@ -46,6 +48,7 @@ import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -61,19 +64,19 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
private final ConsumerMetadata metadata;
private final SubscriptionState subscriptions;
private final RequestManagers requestManagers;
private final Optional<AsyncPollEventProcessorContext> asyncPollContext;
private final Optional<CompletableEventReaper> applicationEventReaper;
private int metadataVersionSnapshot;
public ApplicationEventProcessor(final LogContext logContext,
final RequestManagers requestManagers,
final ConsumerMetadata metadata,
final SubscriptionState subscriptions,
final Optional<AsyncPollEventProcessorContext> asyncPollContext) {
final Optional<CompletableEventReaper> applicationEventReaper) {
this.log = logContext.logger(ApplicationEventProcessor.class);
this.requestManagers = requestManagers;
this.metadata = metadata;
this.subscriptions = subscriptions;
this.asyncPollContext = asyncPollContext;
this.applicationEventReaper = applicationEventReaper;
this.metadataVersionSnapshot = metadata.updateVersion();
}
@ -88,8 +91,8 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
final RequestManagers requestManagers,
final ConsumerMetadata metadata,
final SubscriptionState subscriptions,
final AsyncPollEventProcessorContext asyncPollContext) {
this(logContext, requestManagers, metadata, subscriptions, Optional.of(asyncPollContext));
final CompletableEventReaper applicationEventReaper) {
this(logContext, requestManagers, metadata, subscriptions, Optional.of(applicationEventReaper));
}
@SuppressWarnings({"CyclomaticComplexity", "JavaNCSSCheck"})
@ -729,64 +732,99 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
}
private void process(final AsyncPollEvent event) {
AsyncPollEventProcessorContext context = asyncPollContext.orElseThrow(IllegalArgumentException::new);
ApplicationEvent.Type nextEventType = event.startingEventType();
log.trace("Processing poll logic for {}", event);
if (context.maybeCompleteWithCallbackRequired(event, nextEventType))
return;
// Trigger a reconciliation that can safely commit offsets if needed to rebalance,
// as we're processing before any new fetching starts
requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
consumerMembershipManager.maybeReconcile(true));
if (nextEventType == ApplicationEvent.Type.ASYNC_POLL) {
log.trace("Processing {} logic for {}", nextEventType, event);
if (requestManagers.commitRequestManager.isPresent()) {
CommitRequestManager commitRequestManager = requestManagers.commitRequestManager.get();
commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs());
// Trigger a reconciliation that can safely commit offsets if needed to rebalance,
// as we're processing before any new fetching starts
requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
consumerMembershipManager.maybeReconcile(true));
if (requestManagers.commitRequestManager.isPresent()) {
CommitRequestManager commitRequestManager = requestManagers.commitRequestManager.get();
commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs());
requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
hrm.membershipManager().onConsumerPoll();
hrm.resetPollTimer(event.pollTimeMs());
});
requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm -> {
hrm.membershipManager().onConsumerPoll();
hrm.resetPollTimer(event.pollTimeMs());
});
}
nextEventType = ApplicationEvent.Type.CHECK_AND_UPDATE_POSITIONS;
if (context.maybeCompleteWithCallbackRequired(event, nextEventType))
return;
requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
hrm.membershipManager().onConsumerPoll();
hrm.resetPollTimer(event.pollTimeMs());
});
requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm -> {
hrm.membershipManager().onConsumerPoll();
hrm.resetPollTimer(event.pollTimeMs());
});
}
if (nextEventType == ApplicationEvent.Type.CHECK_AND_UPDATE_POSITIONS) {
log.trace("Processing {} logic for {}", nextEventType, event);
CompletableFuture<Boolean> updatePositionsFuture = requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
context.trackCheckAndUpdatePositionsForTimeout(updatePositionsFuture, event.deadlineMs());
log.trace("Processing check and update positions logic for {}", event);
CompletableFuture<Boolean> updatePositionsFuture = requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
trackCheckAndUpdatePositionsForTimeout(updatePositionsFuture, event.deadlineMs());
updatePositionsFuture.whenComplete((__, updatePositionsError) -> {
if (context.maybeCompleteExceptionally(event, updatePositionsError))
updatePositionsFuture.whenComplete((__, updatePositionsError) -> {
if (maybeCompleteAsyncPollEventExceptionally(event, updatePositionsError))
return;
log.trace("Processing create fetch requests logic for {}", event);
// Create a fetch request if there's no data in the FetchBuffer.
requestManagers.fetchRequestManager.createFetchRequests().whenComplete((___, fetchError) -> {
if (maybeCompleteAsyncPollEventExceptionally(event, fetchError))
return;
log.trace("Processing {} logic for {}", ApplicationEvent.Type.CREATE_FETCH_REQUESTS, event);
// Create a fetch request if there's no data in the FetchBuffer.
requestManagers.fetchRequestManager.createFetchRequests().whenComplete((___, fetchError) -> {
if (context.maybeCompleteExceptionally(event, fetchError))
return;
context.complete(event);
});
event.completeSuccessfully();
log.trace("Completed event processing for {}", event);
});
});
}
return;
/**
* To maintain the flow from {@link ClassicKafkaConsumer}, the logic to check and update positions should be
* allowed to time out before moving on to the logic for sending fetch requests. This achieves that by reusing
* the {@link CompletableEventReaper} and allowing it to expire the {@link CompletableFuture} for the check and
* update positions stage.
*/
public void trackCheckAndUpdatePositionsForTimeout(CompletableFuture<Boolean> updatePositionsFuture, long deadlineMs) {
applicationEventReaper.ifPresent(reaper -> {
CompletableEvent<Boolean> event = new CompletableEvent<>() {
@Override
public CompletableFuture<Boolean> future() {
return updatePositionsFuture;
}
@Override
public long deadlineMs() {
return deadlineMs;
}
@Override
public String toString() {
return getClass().getSimpleName() + "{updatePositionsFuture=" + updatePositionsFuture + ", deadlineMs=" + deadlineMs + '}';
}
};
reaper.add(event);
});
}
/**
* If there's an error to report to the user, the current event will be completed with
* {@link AsyncPollEvent.State#FAILED} and this method will return {@code true}. Otherwise, it will
* return {@code false}.
*/
private boolean maybeCompleteAsyncPollEventExceptionally(AsyncPollEvent event, Throwable t) {
if (t == null)
return false;
if (t instanceof org.apache.kafka.common.errors.TimeoutException || t instanceof java.util.concurrent.TimeoutException) {
log.trace("Ignoring timeout for {}: {}", event, t.getMessage());
return false;
}
context.completeExceptionally(event, new KafkaException("Unknown next step for async poll: " + nextEventType));
if (t instanceof CompletionException) {
t = t.getCause();
}
KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t);
event.completeExceptionally(e);
log.trace("Failing event processing for {}", event, e);
return true;
}
private <T> BiConsumer<? super T, ? super Throwable> complete(final CompletableFuture<T> b) {
@ -828,18 +866,17 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
final ConsumerMetadata metadata,
final SubscriptionState subscriptions,
final Supplier<RequestManagers> requestManagersSupplier,
final Supplier<AsyncPollEventProcessorContext> asyncPollContextSupplier) {
final CompletableEventReaper applicationEventReaper) {
return new CachedSupplier<>() {
@Override
protected ApplicationEventProcessor create() {
RequestManagers requestManagers = requestManagersSupplier.get();
AsyncPollEventProcessorContext asyncPollContext = asyncPollContextSupplier.get();
return new ApplicationEventProcessor(
logContext,
requestManagers,
metadata,
subscriptions,
asyncPollContext
applicationEventReaper
);
}
};

View File

@ -17,16 +17,12 @@
package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
import org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer;
import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
import org.apache.kafka.common.KafkaException;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
@ -34,8 +30,7 @@ import java.util.concurrent.atomic.AtomicReference;
* This class represents the non-blocking event that executes logic functionally equivalent to the following:
*
* <ul>
* <li>{@link SharePollEvent}</li>
* <li>{@link UpdatePatternSubscriptionEvent}</li>
* <li>Polling</li>
* <li>{@link CheckAndUpdatePositionsEvent}</li>
* <li>{@link CreateFetchRequestsEvent}</li>
* </ul>
@ -48,16 +43,8 @@ import java.util.concurrent.atomic.AtomicReference;
* <p/>
*
* When the {@code AsyncPollEvent} is created, it exists in the {@link State#STARTED} state. The background
* thread will execute the {@code AsyncPollEvent} until it completes successfully ({@link State#SUCCEEDED}),
* hits an error ({@link State#FAILED}), or detects that the application thread needs to execute callbacks
* ({@link State#CALLBACKS_REQUIRED}).
*
* <p/>
*
* It's possible that the background processing of the polling will need to be "paused" in order to execute a
* {@link ConsumerInterceptor}, {@link ConsumerRebalanceListener}, and/or {@link OffsetCommitCallback} in the
* application thread. The background thread is able to detect when it needs to complete processing so that the
* application thread can execute the awaiting callbacks.
* thread will execute the {@code AsyncPollEvent} until it completes successfully ({@link State#SUCCEEDED})
* or hits an error ({@link State#FAILED}).
*/
public class AsyncPollEvent extends ApplicationEvent implements MetadataErrorNotifiable {
@ -65,79 +52,51 @@ public class AsyncPollEvent extends ApplicationEvent implements MetadataErrorNot
STARTED,
SUCCEEDED,
FAILED,
CALLBACKS_REQUIRED
FAILED
}
public static class Result {
/**
* This string value is used when the {@code Result} represents a completed event. This is used so that
* {@code null} isn't used for {@link #value}.
*/
private static final Object COMPLETED_SENTINEL = "COMPLETED";
/**
* Used as the initial state/result until the terminal state is achieved.
*/
private static final Result STARTED = new Result(State.STARTED, null);
private final State state;
private final Object value;
private final KafkaException error;
public Result(State state, Object value) {
public Result(State state, KafkaException error) {
this.state = state;
this.value = value;
this.error = error;
}
public State state() {
return state;
}
public Type asNextEventType() {
if (state != State.CALLBACKS_REQUIRED)
throw new KafkaException("The usage of asNextEventType is unexpected for state: " + state);
if (!(value instanceof ApplicationEvent.Type))
throw new KafkaException("The result value for the poll was unexpected: " + value);
return (ApplicationEvent.Type) value;
}
public KafkaException asKafkaException() {
if (state != State.FAILED)
throw new KafkaException("The usage of asKafkaException is unexpected for state: " + state);
if (!(value instanceof KafkaException))
throw new KafkaException("The result value for the poll was unexpected: " + value);
return (KafkaException) value;
public KafkaException error() {
return error;
}
@Override
public String toString() {
return "Result{" + "state=" + state + ", value=" + value + '}';
return "Result{" + "state=" + state + ", error=" + error + '}';
}
@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
Result result = (Result) o;
return state == result.state && Objects.equals(value, result.value);
return state == result.state && Objects.equals(error, result.error);
}
@Override
public int hashCode() {
return Objects.hash(state, value);
return Objects.hash(state, error);
}
}
private static final List<Type> ALLOWED_STARTING_EVENT_TYPES = List.of(
Type.ASYNC_POLL,
Type.CHECK_AND_UPDATE_POSITIONS
);
private final long deadlineMs;
private final long pollTimeMs;
private final Type startingEventType;
private final AtomicReference<Result> result;
/**
@ -148,26 +107,9 @@ public class AsyncPollEvent extends ApplicationEvent implements MetadataErrorNot
* @param pollTimeMs Time, in milliseconds, at which point the event was created
*/
public AsyncPollEvent(long deadlineMs, long pollTimeMs) {
this(deadlineMs, pollTimeMs, Type.ASYNC_POLL);
}
/**
* Creates a new event to signify a multi-stage processing of {@link Consumer#poll(Duration)} logic.
*
* @param deadlineMs Time, in milliseconds, at which point the event must be completed; based on the
* {@link Duration} passed to {@link Consumer#poll(Duration)}
* @param pollTimeMs Time, in milliseconds, at which point the event was created
* @param startingEventType {@link ApplicationEvent.Type} that serves as the starting point for the event processing
*/
public AsyncPollEvent(long deadlineMs, long pollTimeMs, Type startingEventType) {
super(Type.ASYNC_POLL);
this.deadlineMs = deadlineMs;
this.pollTimeMs = pollTimeMs;
if (!ALLOWED_STARTING_EVENT_TYPES.contains(startingEventType))
throw new KafkaException("The starting event type " + startingEventType + " is not valid. Should be one of " + ALLOWED_STARTING_EVENT_TYPES);
this.startingEventType = startingEventType;
this.result = new AtomicReference<>(Result.STARTED);
}
@ -179,16 +121,12 @@ public class AsyncPollEvent extends ApplicationEvent implements MetadataErrorNot
return pollTimeMs;
}
public Type startingEventType() {
return startingEventType;
}
public Result result() {
return result.get();
}
public void completeSuccessfully() {
Result r = new Result(State.SUCCEEDED, Result.COMPLETED_SENTINEL);
Result r = new Result(State.SUCCEEDED, null);
result.compareAndSet(Result.STARTED, r);
}
@ -197,11 +135,6 @@ public class AsyncPollEvent extends ApplicationEvent implements MetadataErrorNot
result.compareAndSet(Result.STARTED, r);
}
public void completeWithCallbackRequired(Type nextEventType) {
Result r = new Result(State.CALLBACKS_REQUIRED, Objects.requireNonNull(nextEventType));
result.compareAndSet(Result.STARTED, r);
}
@Override
public void metadataError(Exception metadataException) {
completeExceptionally(ConsumerUtils.maybeWrapAsKafkaException(metadataException));
@ -212,7 +145,6 @@ public class AsyncPollEvent extends ApplicationEvent implements MetadataErrorNot
return super.toStringBase() +
", deadlineMs=" + deadlineMs +
", pollTimeMs=" + pollTimeMs +
", startingEventType=" + startingEventType +
", result=" + result.get();
}
}

View File

@ -1,204 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.consumer.internals.CachedSupplier;
import org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
import org.apache.kafka.clients.consumer.internals.FetchBuffer;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;
/**
* This provides the context for the {@link ApplicationEventProcessor#process(ApplicationEvent)} that invokes the
* {@link AsyncPollEvent} process method. This is mostly to avoid polluting the {@link ApplicationEventProcessor}
* with instance variables and logic that's specific only to the background {@link AsyncPollEvent} processing.
*/
public class AsyncPollEventProcessorContext {
private final Logger log;
private final NetworkClientDelegate networkClientDelegate;
private final BackgroundEventHandler backgroundEventHandler;
private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
private final CompletableEventReaper applicationEventReaper;
private final FetchBuffer fetchBuffer;
private AsyncPollEventProcessorContext(LogContext logContext,
NetworkClientDelegate networkClientDelegate,
BackgroundEventHandler backgroundEventHandler,
OffsetCommitCallbackInvoker offsetCommitCallbackInvoker,
CompletableEventReaper applicationEventReaper,
FetchBuffer fetchBuffer) {
this.log = logContext.logger(getClass());
this.networkClientDelegate = networkClientDelegate;
this.backgroundEventHandler = backgroundEventHandler;
this.offsetCommitCallbackInvoker = offsetCommitCallbackInvoker;
this.applicationEventReaper = applicationEventReaper;
this.fetchBuffer = fetchBuffer;
}
/**
* Creates a {@link Supplier} for deferred creation during invocation by
* {@link ConsumerNetworkThread}.
*/
public static Supplier<AsyncPollEventProcessorContext> supplier(LogContext logContext,
Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
BackgroundEventHandler backgroundEventHandler,
OffsetCommitCallbackInvoker offsetCommitCallbackInvoker,
CompletableEventReaper applicationEventReaper,
FetchBuffer fetchBuffer) {
return new CachedSupplier<>() {
@Override
protected AsyncPollEventProcessorContext create() {
NetworkClientDelegate networkClientDelegate = networkClientDelegateSupplier.get();
return new AsyncPollEventProcessorContext(
logContext,
networkClientDelegate,
backgroundEventHandler,
offsetCommitCallbackInvoker,
applicationEventReaper,
fetchBuffer
);
}
};
}
/**
* To maintain the flow from {@link ClassicKafkaConsumer}, the logic to check and update positions should be
* allowed to time out before moving on to the logic for sending fetch requests. This achieves that by reusing
* the {@link CompletableEventReaper} and allowing it to expire the {@link CompletableFuture} for the check and
* update positions stage.
*/
public void trackCheckAndUpdatePositionsForTimeout(CompletableFuture<Boolean> updatePositionsFuture, long deadlineMs) {
CompletableEvent<Boolean> event = new CompletableEvent<>() {
@Override
public CompletableFuture<Boolean> future() {
return updatePositionsFuture;
}
@Override
public long deadlineMs() {
return deadlineMs;
}
@Override
public String toString() {
return getClass().getSimpleName() + "{updatePositionsFuture=" + updatePositionsFuture + ", deadlineMs=" + deadlineMs + '}';
}
};
applicationEventReaper.add(event);
}
/**
* Helper method that will check if any application thread user callbacks need to be executed. If so, the
* current event will be completed with {@link AsyncPollEvent.State#CALLBACKS_REQUIRED} and this method
* will return {@code true}. Otherwise, it will return {@code false}.
*/
public boolean maybeCompleteWithCallbackRequired(AsyncPollEvent event, ApplicationEvent.Type nextEventType) {
// If there are background events to process or enqueued callbacks to invoke, exit to
// the application thread.
if (backgroundEventHandler.size() > 0 || offsetCommitCallbackInvoker.size() > 0) {
log.trace(
"Pausing polling by completing {} with the state of {} and the next stage of {}",
event,
AsyncPollEvent.State.CALLBACKS_REQUIRED,
nextEventType
);
event.completeWithCallbackRequired(nextEventType);
// This to ensure that the application thread doesn't needlessly wait for the full time out if it's
// been detected that a callback is required.
fetchBuffer.wakeup();
return true;
}
return false;
}
/**
* Helper method that checks if there's a non-null error from
* {@link NetworkClientDelegate#getAndClearMetadataError()} or if the provided exception is not a timeout-based
* exception. If there's an error to report to the user, the current event will be completed with
* {@link AsyncPollEvent.State#FAILED} and this method will return {@code true}. Otherwise, it will
* return {@code false}.
*/
public boolean maybeCompleteExceptionally(AsyncPollEvent event, Throwable t) {
if (maybeCompleteExceptionally(event))
return true;
if (t == null)
return false;
if (t instanceof org.apache.kafka.common.errors.TimeoutException || t instanceof java.util.concurrent.TimeoutException) {
log.trace("Ignoring timeout for {}: {}", event, t.getMessage());
return false;
}
if (t instanceof CompletionException) {
t = t.getCause();
}
completeExceptionally(event, t);
return true;
}
/**
* Helper method that checks if there's a non-null error from
* {@link NetworkClientDelegate#getAndClearMetadataError()}, and if so, reports it to the user by completing the
* current event with {@link AsyncPollEvent.State#FAILED} and returning {@code true}. Otherwise, it will
* return {@code false}.
*/
public boolean maybeCompleteExceptionally(AsyncPollEvent event) {
Optional<Exception> exception = networkClientDelegate.getAndClearMetadataError();
if (exception.isPresent()) {
completeExceptionally(event, exception.get());
return true;
}
return false;
}
/**
* Helper method to complete the given event with {@link AsyncPollEvent.State#FAILED}.
*/
public void completeExceptionally(AsyncPollEvent event, Throwable error) {
KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(error);
event.completeExceptionally(e);
log.trace("Failing event processing for {}", event, e);
}
/**
* Helper method to complete the given event with {@link AsyncPollEvent.State#SUCCEEDED}.
*/
public void complete(AsyncPollEvent event) {
event.completeSuccessfully();
log.trace("Completed event processing for {}", event);
}
}

View File

@ -2219,7 +2219,7 @@ public class AsyncKafkaConsumerTest {
private void completeAsyncPollEventSuccessfully() {
doAnswer(invocation -> {
AsyncPollEvent event = invocation.getArgument(0);
event.completeWithCallbackRequired(ApplicationEvent.Type.CHECK_AND_UPDATE_POSITIONS);
event.completeSuccessfully();
return null;
}).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncPollEvent.class));
}

View File

@ -111,8 +111,7 @@ public class ApplicationEventProcessorTest {
new LogContext(),
requestManagers,
metadata,
subscriptionState,
Optional.of(mock(AsyncPollEventProcessorContext.class))
subscriptionState
);
}

View File

@ -1358,7 +1358,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
val consumer = createConsumer()
consumer.assign(java.util.List.of(tp))
assertThrows(classOf[TopicAuthorizationException], () => consumeRecords(consumer))
assertThrows(classOf[AuthorizationException], () => consumeRecords(consumer))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)