Refactor application thread requirement handling

Introduces AsyncConsumerApplicationThreadRequirement to encapsulate logic for determining when to pause event processing for application thread execution. Updates ApplicationEventProcessor and related classes to use a unified CompositePollApplicationThreadRequirement interface, simplifying constructor signatures and improving code clarity.
This commit is contained in:
Kirk True 2025-09-20 16:44:49 -07:00
parent 81b707e745
commit 81598844bd
4 changed files with 63 additions and 46 deletions

View File

@ -174,6 +174,31 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
private static final long NO_CURRENT_THREAD = -1L;
private static class AsyncConsumerApplicationThreadRequirement implements ApplicationEventProcessor.CompositePollApplicationThreadRequirement {
private final BackgroundEventHandler backgroundEventHandler;
private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
public AsyncConsumerApplicationThreadRequirement(BackgroundEventHandler backgroundEventHandler,
OffsetCommitCallbackInvoker offsetCommitCallbackInvoker) {
this.backgroundEventHandler = backgroundEventHandler;
this.offsetCommitCallbackInvoker = offsetCommitCallbackInvoker;
}
@Override
public Optional<CompositePollEvent.State> requirement() {
// If there are background events to process, exit to the application thread.
if (backgroundEventHandler.size() > 0)
return Optional.of(CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED);
// If there are enqueued callbacks to invoke, exit to the application thread.
if (offsetCommitCallbackInvoker.size() > 0)
return Optional.of(CompositePollEvent.State.OFFSET_COMMIT_CALLBACKS_REQUIRED);
return Optional.empty();
}
}
private class CompositePollEventInvoker {
private CompositePollEvent latest;
@ -373,6 +398,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
// Init value is needed to avoid NPE in case of exception raised in the constructor
private Optional<ClientTelemetryReporter> clientTelemetryReporter = Optional.empty();
private final AsyncConsumerApplicationThreadRequirement asyncApplicationThreadRequirement;
private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
private final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker;
@ -508,13 +535,16 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
streamsRebalanceData
);
final CompletableEventReaper applicationEventReaper = new CompletableEventReaper(logContext);
this.asyncApplicationThreadRequirement = new AsyncConsumerApplicationThreadRequirement(
backgroundEventHandler,
offsetCommitCallbackInvoker
);
final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext,
metadata,
subscriptions,
requestManagersSupplier,
networkClientDelegateSupplier,
backgroundEventHandler,
Optional.of(offsetCommitCallbackInvoker),
asyncApplicationThreadRequirement,
applicationEventReaper
);
this.applicationEventHandler = applicationEventHandlerFactory.build(
@ -616,6 +646,10 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
time,
asyncConsumerMetrics
);
this.asyncApplicationThreadRequirement = new AsyncConsumerApplicationThreadRequirement(
backgroundEventHandler,
offsetCommitCallbackInvoker
);
}
AsyncKafkaConsumer(LogContext logContext,
@ -705,14 +739,17 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
Optional.empty()
);
final CompletableEventReaper applicationEventReaper = new CompletableEventReaper(logContext);
this.asyncApplicationThreadRequirement = new AsyncConsumerApplicationThreadRequirement(
backgroundEventHandler,
offsetCommitCallbackInvoker
);
Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(
logContext,
metadata,
subscriptions,
requestManagersSupplier,
networkClientDelegateSupplier,
backgroundEventHandler,
Optional.of(offsetCommitCallbackInvoker),
asyncApplicationThreadRequirement,
applicationEventReaper
);
this.applicationEventHandler = new ApplicationEventHandler(logContext,

View File

@ -304,8 +304,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
subscriptions,
requestManagersSupplier,
networkClientDelegateSupplier,
backgroundEventHandler,
Optional.empty(),
Optional::empty,
applicationEventReaper
);
@ -415,8 +414,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
subscriptions,
requestManagersSupplier,
networkClientDelegateSupplier,
backgroundEventHandler,
Optional.empty(),
Optional::empty,
applicationEventReaper
);

View File

@ -25,7 +25,6 @@ import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal;
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
import org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
@ -66,32 +65,25 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
private final SubscriptionState subscriptions;
private final RequestManagers requestManagers;
private final NetworkClientDelegate networkClientDelegate;
private final RequiresApplicationThreadExecution backgroundEventProcessingRequiredTest;
private final RequiresApplicationThreadExecution offsetCommitCallbackInvocationRequiredTest;
private final CompositePollApplicationThreadRequirement compositePollApplicationThreadRequirement;
private final CompletableEventReaper applicationEventReaper;
private int metadataVersionSnapshot;
public ApplicationEventProcessor(final LogContext logContext,
final RequestManagers requestManagers,
final NetworkClientDelegate networkClientDelegate,
final ConsumerMetadata metadata,
final SubscriptionState subscriptions,
final BackgroundEventHandler backgroundEventHandler,
final Optional<OffsetCommitCallbackInvoker> offsetCommitCallbackInvoker,
final NetworkClientDelegate networkClientDelegate,
final CompositePollApplicationThreadRequirement compositePollApplicationThreadRequirement,
final CompletableEventReaper applicationEventReaper) {
this.log = logContext.logger(ApplicationEventProcessor.class);
this.requestManagers = requestManagers;
this.networkClientDelegate = networkClientDelegate;
this.metadata = metadata;
this.subscriptions = subscriptions;
this.networkClientDelegate = networkClientDelegate;
this.compositePollApplicationThreadRequirement = compositePollApplicationThreadRequirement;
this.applicationEventReaper = applicationEventReaper;
this.metadataVersionSnapshot = metadata.updateVersion();
// If there are background events to process, exit to the application thread.
this.backgroundEventProcessingRequiredTest = () -> backgroundEventHandler.size() > 0;
// If there are enqueued callbacks to invoke, exit to the application thread.
this.offsetCommitCallbackInvocationRequiredTest = () -> offsetCommitCallbackInvoker.isPresent() && offsetCommitCallbackInvoker.get().size() > 0;
}
@SuppressWarnings({"CyclomaticComplexity", "JavaNCSSCheck"})
@ -296,15 +288,12 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
}
private boolean maybePauseCompositePoll(CompositePollEvent event, ApplicationEvent.Type nextEventType) {
if (backgroundEventProcessingRequiredTest.requiresApplicationThread()) {
log.debug("Pausing event processing for {} with {} as next step", event, nextEventType);
event.complete(CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED, Optional.of(nextEventType));
return true;
}
Optional<CompositePollEvent.State> stateOpt = compositePollApplicationThreadRequirement.requirement();
if (offsetCommitCallbackInvocationRequiredTest.requiresApplicationThread()) {
log.debug("Pausing event processing for {} with {} as next step", event, nextEventType);
event.complete(CompositePollEvent.State.OFFSET_COMMIT_CALLBACKS_REQUIRED, Optional.of(nextEventType));
if (stateOpt.isPresent()) {
CompositePollEvent.State state = stateOpt.get();
log.debug("Pausing event processing for {} with {} as next step", state, nextEventType);
event.complete(state, Optional.of(nextEventType));
return true;
}
@ -838,8 +827,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
final SubscriptionState subscriptions,
final Supplier<RequestManagers> requestManagersSupplier,
final Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
final BackgroundEventHandler backgroundEventHandler,
final Optional<OffsetCommitCallbackInvoker> offsetCommitCallbackInvoker,
final CompositePollApplicationThreadRequirement applicationThreadRequirement,
final CompletableEventReaper applicationEventReaper) {
return new CachedSupplier<>() {
@Override
@ -850,11 +838,10 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
return new ApplicationEventProcessor(
logContext,
requestManagers,
networkClientDelegate,
metadata,
subscriptions,
backgroundEventHandler,
offsetCommitCallbackInvoker,
networkClientDelegate,
applicationThreadRequirement,
applicationEventReaper
);
}
@ -934,13 +921,13 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
}
/**
* This interface exists mostly to make the code more intuitive. When {@link #requiresApplicationThread()}
* This interface exists mostly to make the code more intuitive. When {@link #requirement()}
* returns true, the {@link CompositePollEvent} processing needs to be <em>interrupted</em> so that processing
* can return to the application thread.
*/
private interface RequiresApplicationThreadExecution {
public interface CompositePollApplicationThreadRequirement {
boolean requiresApplicationThread();
Optional<CompositePollEvent.State> requirement();
}
private static class CompositePollPsuedoEvent<T> implements CompletableEvent<T> {

View File

@ -29,7 +29,6 @@ import org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager;
import org.apache.kafka.clients.consumer.internals.FetchRequestManager;
import org.apache.kafka.clients.consumer.internals.MockRebalanceListener;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;
import org.apache.kafka.clients.consumer.internals.OffsetsRequestManager;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
import org.apache.kafka.clients.consumer.internals.StreamsGroupHeartbeatRequestManager;
@ -93,8 +92,6 @@ public class ApplicationEventProcessorTest {
private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
private final StreamsGroupHeartbeatRequestManager streamsGroupHeartbeatRequestManager = mock(StreamsGroupHeartbeatRequestManager.class);
private final StreamsMembershipManager streamsMembershipManager = mock(StreamsMembershipManager.class);
private final BackgroundEventHandler backgroundEventHandler = mock(BackgroundEventHandler.class);
private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker = mock(OffsetCommitCallbackInvoker.class);
private final NetworkClientDelegate networkClientDelegate = mock(NetworkClientDelegate.class);
private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class);
private ApplicationEventProcessor processor;
@ -115,11 +112,10 @@ public class ApplicationEventProcessorTest {
processor = new ApplicationEventProcessor(
new LogContext(),
requestManagers,
networkClientDelegate,
metadata,
subscriptionState,
backgroundEventHandler,
Optional.of(offsetCommitCallbackInvoker),
networkClientDelegate,
Optional::empty,
applicationEventReaper
);
}
@ -140,11 +136,10 @@ public class ApplicationEventProcessorTest {
processor = new ApplicationEventProcessor(
new LogContext(),
requestManagers,
networkClientDelegate,
metadata,
subscriptionState,
backgroundEventHandler,
Optional.of(offsetCommitCallbackInvoker),
networkClientDelegate,
Optional::empty,
applicationEventReaper
);
}