From 81598844bde86cc64c72260349abee4c12e79caf Mon Sep 17 00:00:00 2001 From: Kirk True Date: Sat, 20 Sep 2025 16:44:49 -0700 Subject: [PATCH] Refactor application thread requirement handling Introduces AsyncConsumerApplicationThreadRequirement to encapsulate logic for determining when to pause event processing for application thread execution. Updates ApplicationEventProcessor and related classes to use a unified CompositePollApplicationThreadRequirement interface, simplifying constructor signatures and improving code clarity. --- .../internals/AsyncKafkaConsumer.java | 45 +++++++++++++++++-- .../consumer/internals/ShareConsumerImpl.java | 6 +-- .../events/ApplicationEventProcessor.java | 45 +++++++------------ .../events/ApplicationEventProcessorTest.java | 13 ++---- 4 files changed, 63 insertions(+), 46 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 91a328b1837..124500432be 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -174,6 +174,31 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { private static final long NO_CURRENT_THREAD = -1L; + private static class AsyncConsumerApplicationThreadRequirement implements ApplicationEventProcessor.CompositePollApplicationThreadRequirement { + + private final BackgroundEventHandler backgroundEventHandler; + private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker; + + public AsyncConsumerApplicationThreadRequirement(BackgroundEventHandler backgroundEventHandler, + OffsetCommitCallbackInvoker offsetCommitCallbackInvoker) { + this.backgroundEventHandler = backgroundEventHandler; + this.offsetCommitCallbackInvoker = offsetCommitCallbackInvoker; + } + + @Override + public Optional requirement() { + // If there are background events to process, exit to the application thread. + if (backgroundEventHandler.size() > 0) + return Optional.of(CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED); + + // If there are enqueued callbacks to invoke, exit to the application thread. + if (offsetCommitCallbackInvoker.size() > 0) + return Optional.of(CompositePollEvent.State.OFFSET_COMMIT_CALLBACKS_REQUIRED); + + return Optional.empty(); + } + } + private class CompositePollEventInvoker { private CompositePollEvent latest; @@ -373,6 +398,8 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { // Init value is needed to avoid NPE in case of exception raised in the constructor private Optional clientTelemetryReporter = Optional.empty(); + private final AsyncConsumerApplicationThreadRequirement asyncApplicationThreadRequirement; + private final WakeupTrigger wakeupTrigger = new WakeupTrigger(); private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker; private final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker; @@ -508,13 +535,16 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { streamsRebalanceData ); final CompletableEventReaper applicationEventReaper = new CompletableEventReaper(logContext); + this.asyncApplicationThreadRequirement = new AsyncConsumerApplicationThreadRequirement( + backgroundEventHandler, + offsetCommitCallbackInvoker + ); final Supplier applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext, metadata, subscriptions, requestManagersSupplier, networkClientDelegateSupplier, - backgroundEventHandler, - Optional.of(offsetCommitCallbackInvoker), + asyncApplicationThreadRequirement, applicationEventReaper ); this.applicationEventHandler = applicationEventHandlerFactory.build( @@ -616,6 +646,10 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { time, asyncConsumerMetrics ); + this.asyncApplicationThreadRequirement = new AsyncConsumerApplicationThreadRequirement( + backgroundEventHandler, + offsetCommitCallbackInvoker + ); } AsyncKafkaConsumer(LogContext logContext, @@ -705,14 +739,17 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { Optional.empty() ); final CompletableEventReaper applicationEventReaper = new CompletableEventReaper(logContext); + this.asyncApplicationThreadRequirement = new AsyncConsumerApplicationThreadRequirement( + backgroundEventHandler, + offsetCommitCallbackInvoker + ); Supplier applicationEventProcessorSupplier = ApplicationEventProcessor.supplier( logContext, metadata, subscriptions, requestManagersSupplier, networkClientDelegateSupplier, - backgroundEventHandler, - Optional.of(offsetCommitCallbackInvoker), + asyncApplicationThreadRequirement, applicationEventReaper ); this.applicationEventHandler = new ApplicationEventHandler(logContext, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java index 08767c397e4..5b37407a178 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java @@ -304,8 +304,7 @@ public class ShareConsumerImpl implements ShareConsumerDelegate { subscriptions, requestManagersSupplier, networkClientDelegateSupplier, - backgroundEventHandler, - Optional.empty(), + Optional::empty, applicationEventReaper ); @@ -415,8 +414,7 @@ public class ShareConsumerImpl implements ShareConsumerDelegate { subscriptions, requestManagersSupplier, networkClientDelegateSupplier, - backgroundEventHandler, - Optional.empty(), + Optional::empty, applicationEventReaper ); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index dffc81feacd..20af1f70542 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -25,7 +25,6 @@ import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread; import org.apache.kafka.clients.consumer.internals.ConsumerUtils; import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate; import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal; -import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker; import org.apache.kafka.clients.consumer.internals.RequestManagers; import org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager; import org.apache.kafka.clients.consumer.internals.SubscriptionState; @@ -66,32 +65,25 @@ public class ApplicationEventProcessor implements EventProcessor offsetCommitCallbackInvoker, + final NetworkClientDelegate networkClientDelegate, + final CompositePollApplicationThreadRequirement compositePollApplicationThreadRequirement, final CompletableEventReaper applicationEventReaper) { this.log = logContext.logger(ApplicationEventProcessor.class); this.requestManagers = requestManagers; - this.networkClientDelegate = networkClientDelegate; this.metadata = metadata; this.subscriptions = subscriptions; + this.networkClientDelegate = networkClientDelegate; + this.compositePollApplicationThreadRequirement = compositePollApplicationThreadRequirement; this.applicationEventReaper = applicationEventReaper; this.metadataVersionSnapshot = metadata.updateVersion(); - - // If there are background events to process, exit to the application thread. - this.backgroundEventProcessingRequiredTest = () -> backgroundEventHandler.size() > 0; - - // If there are enqueued callbacks to invoke, exit to the application thread. - this.offsetCommitCallbackInvocationRequiredTest = () -> offsetCommitCallbackInvoker.isPresent() && offsetCommitCallbackInvoker.get().size() > 0; } @SuppressWarnings({"CyclomaticComplexity", "JavaNCSSCheck"}) @@ -296,15 +288,12 @@ public class ApplicationEventProcessor implements EventProcessor stateOpt = compositePollApplicationThreadRequirement.requirement(); - if (offsetCommitCallbackInvocationRequiredTest.requiresApplicationThread()) { - log.debug("Pausing event processing for {} with {} as next step", event, nextEventType); - event.complete(CompositePollEvent.State.OFFSET_COMMIT_CALLBACKS_REQUIRED, Optional.of(nextEventType)); + if (stateOpt.isPresent()) { + CompositePollEvent.State state = stateOpt.get(); + log.debug("Pausing event processing for {} with {} as next step", state, nextEventType); + event.complete(state, Optional.of(nextEventType)); return true; } @@ -838,8 +827,7 @@ public class ApplicationEventProcessor implements EventProcessor requestManagersSupplier, final Supplier networkClientDelegateSupplier, - final BackgroundEventHandler backgroundEventHandler, - final Optional offsetCommitCallbackInvoker, + final CompositePollApplicationThreadRequirement applicationThreadRequirement, final CompletableEventReaper applicationEventReaper) { return new CachedSupplier<>() { @Override @@ -850,11 +838,10 @@ public class ApplicationEventProcessor implements EventProcessorinterrupted so that processing * can return to the application thread. */ - private interface RequiresApplicationThreadExecution { + public interface CompositePollApplicationThreadRequirement { - boolean requiresApplicationThread(); + Optional requirement(); } private static class CompositePollPsuedoEvent implements CompletableEvent { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java index 8c5623a7f26..dcf7ea03bc9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java @@ -29,7 +29,6 @@ import org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager; import org.apache.kafka.clients.consumer.internals.FetchRequestManager; import org.apache.kafka.clients.consumer.internals.MockRebalanceListener; import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate; -import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker; import org.apache.kafka.clients.consumer.internals.OffsetsRequestManager; import org.apache.kafka.clients.consumer.internals.RequestManagers; import org.apache.kafka.clients.consumer.internals.StreamsGroupHeartbeatRequestManager; @@ -93,8 +92,6 @@ public class ApplicationEventProcessorTest { private final ConsumerMetadata metadata = mock(ConsumerMetadata.class); private final StreamsGroupHeartbeatRequestManager streamsGroupHeartbeatRequestManager = mock(StreamsGroupHeartbeatRequestManager.class); private final StreamsMembershipManager streamsMembershipManager = mock(StreamsMembershipManager.class); - private final BackgroundEventHandler backgroundEventHandler = mock(BackgroundEventHandler.class); - private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker = mock(OffsetCommitCallbackInvoker.class); private final NetworkClientDelegate networkClientDelegate = mock(NetworkClientDelegate.class); private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); private ApplicationEventProcessor processor; @@ -115,11 +112,10 @@ public class ApplicationEventProcessorTest { processor = new ApplicationEventProcessor( new LogContext(), requestManagers, - networkClientDelegate, metadata, subscriptionState, - backgroundEventHandler, - Optional.of(offsetCommitCallbackInvoker), + networkClientDelegate, + Optional::empty, applicationEventReaper ); } @@ -140,11 +136,10 @@ public class ApplicationEventProcessorTest { processor = new ApplicationEventProcessor( new LogContext(), requestManagers, - networkClientDelegate, metadata, subscriptionState, - backgroundEventHandler, - Optional.of(offsetCommitCallbackInvoker), + networkClientDelegate, + Optional::empty, applicationEventReaper ); }