Minor refactoring and added documentation

Replaces nextEventType with startingEventType in CompositePollEvent and related classes for improved clarity and correctness. Adds validation for allowed starting event types, updates method names, improves logging, and enhances documentation for event processing context and state transitions.
This commit is contained in:
Kirk True 2025-09-30 10:50:07 -07:00
parent 5c99d81b18
commit 0aed4aff89
4 changed files with 112 additions and 21 deletions

View File

@ -759,7 +759,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
private void process(final CompositePollEvent event) { private void process(final CompositePollEvent event) {
CompositePollEventProcessorContext context = compositePollContext.orElseThrow(IllegalArgumentException::new); CompositePollEventProcessorContext context = compositePollContext.orElseThrow(IllegalArgumentException::new);
ApplicationEvent.Type nextEventType = event.nextEventType(); ApplicationEvent.Type nextEventType = event.startingEventType();
if (context.maybeCompleteExceptionally(event) || context.maybeCompleteWithCallbackRequired(event, nextEventType)) if (context.maybeCompleteExceptionally(event) || context.maybeCompleteWithCallbackRequired(event, nextEventType))
return; return;

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.kafka.clients.consumer.internals.events; package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerInterceptor; import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.clients.consumer.OffsetCommitCallback;
@ -24,6 +25,7 @@ import org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer;
import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaException;
import java.time.Duration; import java.time.Duration;
import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -68,7 +70,15 @@ public class CompositePollEvent extends ApplicationEvent {
public static class Result { public static class Result {
private static final Object COMPLETED = new Object(); /**
* This string value is used when the {@code Result} represents a completed event. This is used so that
* {@code null} isn't used for {@link #value}.
*/
private static final Object COMPLETED_SENTINEL = "COMPLETED";
/**
* Used as the initial state/result until the terminal state is achieved.
*/
private static final Result STARTED = new Result(State.STARTED, null); private static final Result STARTED = new Result(State.STARTED, null);
private final State state; private final State state;
private final Object value; private final Object value;
@ -83,6 +93,9 @@ public class CompositePollEvent extends ApplicationEvent {
} }
public Type asNextEventType() { public Type asNextEventType() {
if (state != State.CALLBACKS_REQUIRED)
throw new KafkaException("The usage of asNextEventType is unexpected for state: " + state);
if (!(value instanceof ApplicationEvent.Type)) if (!(value instanceof ApplicationEvent.Type))
throw new KafkaException("The result value for the poll was unexpected: " + value); throw new KafkaException("The result value for the poll was unexpected: " + value);
@ -90,6 +103,9 @@ public class CompositePollEvent extends ApplicationEvent {
} }
public KafkaException asKafkaException() { public KafkaException asKafkaException() {
if (state != State.FAILED)
throw new KafkaException("The usage of asKafkaException is unexpected for state: " + state);
if (!(value instanceof KafkaException)) if (!(value instanceof KafkaException))
throw new KafkaException("The result value for the poll was unexpected: " + value); throw new KafkaException("The result value for the poll was unexpected: " + value);
@ -114,16 +130,33 @@ public class CompositePollEvent extends ApplicationEvent {
} }
} }
private static final List<Type> ALLOWED_STARTING_EVENT_TYPES = List.of(
Type.CHECK_AND_UPDATE_POSITIONS,
Type.POLL,
Type.UPDATE_SUBSCRIPTION_METADATA
);
private final long deadlineMs; private final long deadlineMs;
private final long pollTimeMs; private final long pollTimeMs;
private final Type nextEventType; private final Type startingEventType;
private final AtomicReference<Result> result; private final AtomicReference<Result> result;
public CompositePollEvent(long deadlineMs, long pollTimeMs, Type nextEventType) { /**
* Creates a new event to signify a multi-stage processing of {@link Consumer#poll(Duration)} logic.
*
* @param deadlineMs Time, in milliseconds, at which point the event must be completed; based on the
* {@link Duration} passed to {@link Consumer#poll(Duration)}
* @param pollTimeMs Time, in milliseconds, at which point the event was created
* @param startingEventType {@link ApplicationEvent.Type} that serves as the starting point for the event processing
*/
public CompositePollEvent(long deadlineMs, long pollTimeMs, Type startingEventType) {
super(Type.COMPOSITE_POLL); super(Type.COMPOSITE_POLL);
this.deadlineMs = deadlineMs; this.deadlineMs = deadlineMs;
this.pollTimeMs = pollTimeMs; this.pollTimeMs = pollTimeMs;
this.nextEventType = nextEventType;
if (!ALLOWED_STARTING_EVENT_TYPES.contains(startingEventType))
throw new KafkaException("The starting event type " + startingEventType + " is not valid. Should be one of " + ALLOWED_STARTING_EVENT_TYPES);
this.startingEventType = startingEventType;
this.result = new AtomicReference<>(Result.STARTED); this.result = new AtomicReference<>(Result.STARTED);
} }
@ -135,8 +168,8 @@ public class CompositePollEvent extends ApplicationEvent {
return pollTimeMs; return pollTimeMs;
} }
public Type nextEventType() { public Type startingEventType() {
return nextEventType; return startingEventType;
} }
public Result result() { public Result result() {
@ -144,7 +177,7 @@ public class CompositePollEvent extends ApplicationEvent {
} }
public void completeSuccessfully() { public void completeSuccessfully() {
Result r = new Result(State.SUCCEEDED, Result.COMPLETED); Result r = new Result(State.SUCCEEDED, Result.COMPLETED_SENTINEL);
result.compareAndSet(Result.STARTED, r); result.compareAndSet(Result.STARTED, r);
} }
@ -160,6 +193,10 @@ public class CompositePollEvent extends ApplicationEvent {
@Override @Override
protected String toStringBase() { protected String toStringBase() {
return super.toStringBase() + ", deadlineMs=" + deadlineMs + ", pollTimeMs=" + pollTimeMs + ", nextEventType=" + nextEventType + ", result=" + result; return super.toStringBase() +
", deadlineMs=" + deadlineMs +
", pollTimeMs=" + pollTimeMs +
", startingEventType=" + startingEventType +
", result=" + result;
} }
} }

View File

@ -71,24 +71,34 @@ public class CompositePollEventInvoker {
); );
} }
// Result should be non-null and starts off as State.STARTED.
CompositePollEvent.Result result = inflight.result(); CompositePollEvent.Result result = inflight.result();
CompositePollEvent.State state = result.state(); CompositePollEvent.State state = result.state();
if (state == CompositePollEvent.State.SUCCEEDED) { if (state == CompositePollEvent.State.SUCCEEDED) {
// Make sure to clear out the inflight request since it's complete. // The composite event has completed all the requisite stages, though it does not imply that
// there is data in the FetchBuffer yet. Make sure to clear out the inflight request.
log.trace("Event {} completed, clearing inflight", inflight); log.trace("Event {} completed, clearing inflight", inflight);
inflight = null; inflight = null;
} else if (state == CompositePollEvent.State.FAILED) { } else if (state == CompositePollEvent.State.FAILED) {
// The composite event failed at one of the stages. Make sure to clear out the inflight request
// before the underlying error is surfaced to the user.
log.trace("Event {} failed, clearing inflight", inflight); log.trace("Event {} failed, clearing inflight", inflight);
inflight = null; inflight = null;
throw result.asKafkaException(); throw result.asKafkaException();
} else if (state == CompositePollEvent.State.CALLBACKS_REQUIRED) { } else if (state == CompositePollEvent.State.CALLBACKS_REQUIRED) {
// The background thread detected that it needed to yield to the application thread to invoke
// callbacks. Even though the inflight reference _should_ be overwritten when the next stage of
// the event is submitted, go ahead and clear out the inflight request just to be sure.
log.trace("Event {} paused for callbacks, clearing inflight", inflight); log.trace("Event {} paused for callbacks, clearing inflight", inflight);
inflight = null;
// Note: this is calling user-supplied code, so make sure to handle possible errors. // Note: this is calling user-supplied code, so make sure to handle possible errors.
applicationThreadCallbacks.run(); applicationThreadCallbacks.run();
// The application thread callbacks are complete. Resume the polling // The application thread callbacks are complete. Create another event to resume the polling at
// the next stage.
submitEvent(result.asNextEventType(), timer); submitEvent(result.asNextEventType(), timer);
} }
} catch (Throwable t) { } catch (Throwable t) {
@ -102,8 +112,11 @@ public class CompositePollEventInvoker {
private void submitEvent(ApplicationEvent.Type type, Timer timer) { private void submitEvent(ApplicationEvent.Type type, Timer timer) {
long deadlineMs = calculateDeadlineMs(timer); long deadlineMs = calculateDeadlineMs(timer);
inflight = new CompositePollEvent(deadlineMs, time.milliseconds(), type); long pollTimeMs = time.milliseconds();
inflight = new CompositePollEvent(deadlineMs, pollTimeMs, type);
applicationEventHandler.add(inflight); applicationEventHandler.add(inflight);
log.debug("Submitted new {} with {} remaining on timer", inflight, timer.remainingMs());
if (log.isTraceEnabled())
log.trace("Submitted new {} with {} remaining on timer", inflight, timer.remainingMs());
} }
} }

View File

@ -17,6 +17,8 @@
package org.apache.kafka.clients.consumer.internals.events; package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.consumer.internals.CachedSupplier; import org.apache.kafka.clients.consumer.internals.CachedSupplier;
import org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
import org.apache.kafka.clients.consumer.internals.ConsumerUtils; import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate; import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker; import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;
@ -30,6 +32,11 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionException;
import java.util.function.Supplier; import java.util.function.Supplier;
/**
* This provides the context for the {@link ApplicationEventProcessor#process(ApplicationEvent)} that invokes the
* {@link CompositePollEvent} process method. This is mostly to avoid polluting the {@link ApplicationEventProcessor}
* with instance variables and logic that's specific only to the background {@link CompositePollEvent} processing.
*/
public class CompositePollEventProcessorContext { public class CompositePollEventProcessorContext {
private final Logger log; private final Logger log;
@ -50,6 +57,10 @@ public class CompositePollEventProcessorContext {
this.applicationEventReaper = applicationEventReaper; this.applicationEventReaper = applicationEventReaper;
} }
/**
* Creates a {@link Supplier} for deferred creation during invocation by
* {@link ConsumerNetworkThread}.
*/
public static Supplier<CompositePollEventProcessorContext> supplier(LogContext logContext, public static Supplier<CompositePollEventProcessorContext> supplier(LogContext logContext,
Supplier<NetworkClientDelegate> networkClientDelegateSupplier, Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
BackgroundEventHandler backgroundEventHandler, BackgroundEventHandler backgroundEventHandler,
@ -71,11 +82,17 @@ public class CompositePollEventProcessorContext {
}; };
} }
public <T> void trackCheckAndUpdatePositionsForTimeout(CompletableFuture<T> future, long deadlineMs) { /**
CompletableEvent<T> event = new CompletableEvent<>() { * To maintain the flow from {@link ClassicKafkaConsumer}, the logic to check and update positions should be
* allowed to time out before moving on to the logic for sending fetch requests. This achieves that by reusing
* the {@link CompletableEventReaper} and allowing it to expire the {@link CompletableFuture} for the check and
* update positions stage.
*/
public void trackCheckAndUpdatePositionsForTimeout(CompletableFuture<Boolean> updatePositionsFuture, long deadlineMs) {
CompletableEvent<Boolean> event = new CompletableEvent<>() {
@Override @Override
public CompletableFuture<T> future() { public CompletableFuture<Boolean> future() {
return future; return updatePositionsFuture;
} }
@Override @Override
@ -85,18 +102,23 @@ public class CompositePollEventProcessorContext {
@Override @Override
public String toString() { public String toString() {
return getClass().getSimpleName() + "{future=" + future + ", deadlineMs=" + deadlineMs + '}'; return getClass().getSimpleName() + "{updatePositionsFuture=" + updatePositionsFuture + ", deadlineMs=" + deadlineMs + '}';
} }
}; };
applicationEventReaper.add(event); applicationEventReaper.add(event);
} }
/**
* Helper method that will check if any application thread user callbacks need to be executed. If so, the
* current event will be completed with {@link CompositePollEvent.State#CALLBACKS_REQUIRED} and this method
* will return {@code true}. Otherwise, it will return {@code false}.
*/
public boolean maybeCompleteWithCallbackRequired(CompositePollEvent event, ApplicationEvent.Type nextEventType) { public boolean maybeCompleteWithCallbackRequired(CompositePollEvent event, ApplicationEvent.Type nextEventType) {
// If there are background events to process or enqueued callbacks to invoke, exit to // If there are background events to process or enqueued callbacks to invoke, exit to
// the application thread. // the application thread.
if (backgroundEventHandler.size() > 0 || offsetCommitCallbackInvoker.size() > 0) { if (backgroundEventHandler.size() > 0 || offsetCommitCallbackInvoker.size() > 0) {
log.debug( log.trace(
"Pausing polling by completing {} with the state of {} and the next stage of {}", "Pausing polling by completing {} with the state of {} and the next stage of {}",
event, event,
CompositePollEvent.State.CALLBACKS_REQUIRED, CompositePollEvent.State.CALLBACKS_REQUIRED,
@ -109,6 +131,13 @@ public class CompositePollEventProcessorContext {
return false; return false;
} }
/**
* Helper method that checks if there's a non-null error from
* {@link NetworkClientDelegate#getAndClearMetadataError()} or if the provided exception is not a timeout-based
* exception. If there's an error to report to the user, the current event will be completed with
* {@link CompositePollEvent.State#FAILED} and this method will return {@code true}. Otherwise, it will
* return {@code false}.
*/
public boolean maybeCompleteExceptionally(CompositePollEvent event, Throwable t) { public boolean maybeCompleteExceptionally(CompositePollEvent event, Throwable t) {
if (maybeCompleteExceptionally(event)) if (maybeCompleteExceptionally(event))
return true; return true;
@ -129,6 +158,12 @@ public class CompositePollEventProcessorContext {
return true; return true;
} }
/**
* Helper method that checks if there's a non-null error from
* {@link NetworkClientDelegate#getAndClearMetadataError()}, and if so, reports it to the user by completing the
* current event with {@link CompositePollEvent.State#FAILED} and returning {@code true}. Otherwise, it will
* return {@code false}.
*/
public boolean maybeCompleteExceptionally(CompositePollEvent event) { public boolean maybeCompleteExceptionally(CompositePollEvent event) {
Optional<Exception> exception = networkClientDelegate.getAndClearMetadataError(); Optional<Exception> exception = networkClientDelegate.getAndClearMetadataError();
@ -140,14 +175,20 @@ public class CompositePollEventProcessorContext {
return false; return false;
} }
/**
* Helper method to complete the given event with {@link CompositePollEvent.State#FAILED}.
*/
public void completeExceptionally(CompositePollEvent event, Throwable error) { public void completeExceptionally(CompositePollEvent event, Throwable error) {
KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(error); KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(error);
event.completeExceptionally(e); event.completeExceptionally(e);
log.debug("Failing event processing for {}", event, e); log.trace("Failing event processing for {}", event, e);
} }
/**
* Helper method to complete the given event with {@link CompositePollEvent.State#SUCCEEDED}.
*/
public void complete(CompositePollEvent event) { public void complete(CompositePollEvent event) {
event.completeSuccessfully(); event.completeSuccessfully();
log.debug("Completed CompositePollEvent {}", event); log.trace("Completed event processing for {}", event);
} }
} }