Inject NetworkClientDelegate into ApplicationEventProcessor

Adds NetworkClientDelegate as a dependency to ApplicationEventProcessor and updates AsyncKafkaConsumer and ShareConsumerImpl to supply it. Introduces error handling in composite poll processing using metadata errors from NetworkClientDelegate. Updates related tests to mock the new dependency.
This commit is contained in:
Kirk True 2025-09-20 16:05:44 -07:00
parent 6775aacc2c
commit 2c3547e06a
4 changed files with 32 additions and 7 deletions

View File

@ -109,7 +109,6 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.event.Level;
@ -513,6 +512,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
metadata,
subscriptions,
requestManagersSupplier,
networkClientDelegateSupplier,
backgroundEventHandler,
Optional.of(offsetCommitCallbackInvoker),
applicationEventReaper
@ -710,6 +710,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
metadata,
subscriptions,
requestManagersSupplier,
networkClientDelegateSupplier,
backgroundEventHandler,
Optional.of(offsetCommitCallbackInvoker),
applicationEventReaper

View File

@ -303,6 +303,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
metadata,
subscriptions,
requestManagersSupplier,
networkClientDelegateSupplier,
backgroundEventHandler,
Optional.empty(),
applicationEventReaper
@ -413,6 +414,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
metadata,
subscriptions,
requestManagersSupplier,
networkClientDelegateSupplier,
backgroundEventHandler,
Optional.empty(),
applicationEventReaper

View File

@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal;
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
@ -64,6 +65,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
private final ConsumerMetadata metadata;
private final SubscriptionState subscriptions;
private final RequestManagers requestManagers;
private final NetworkClientDelegate networkClientDelegate;
private final RequiresApplicationThreadExecution backgroundEventProcessingRequiredTest;
private final RequiresApplicationThreadExecution offsetCommitCallbackInvocationRequiredTest;
private final CompletableEventReaper applicationEventReaper;
@ -71,6 +73,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
public ApplicationEventProcessor(final LogContext logContext,
final RequestManagers requestManagers,
final NetworkClientDelegate networkClientDelegate,
final ConsumerMetadata metadata,
final SubscriptionState subscriptions,
final BackgroundEventHandler backgroundEventHandler,
@ -78,6 +81,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
final CompletableEventReaper applicationEventReaper) {
this.log = logContext.logger(ApplicationEventProcessor.class);
this.requestManagers = requestManagers;
this.networkClientDelegate = networkClientDelegate;
this.metadata = metadata;
this.subscriptions = subscriptions;
this.applicationEventReaper = applicationEventReaper;
@ -240,7 +244,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
}
private void process(final CompositePollEvent event) {
if (maybePauseCompositePoll(event, ApplicationEvent.Type.POLL))
if (maybeFailCompositePoll(event) || maybePauseCompositePoll(event, ApplicationEvent.Type.POLL))
return;
ApplicationEvent.Type nextEventType = event.nextEventType();
@ -250,7 +254,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
processPollEvent(event.pollTimeMs());
nextEventType = ApplicationEvent.Type.UPDATE_SUBSCRIPTION_METADATA;
if (maybePauseCompositePoll(event, nextEventType))
if (maybeFailCompositePoll(event) || maybePauseCompositePoll(event, nextEventType))
return;
}
@ -259,7 +263,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
processUpdatePatternSubscriptionEvent();
nextEventType = ApplicationEvent.Type.CHECK_AND_UPDATE_POSITIONS;
if (maybePauseCompositePoll(event, nextEventType))
if (maybeFailCompositePoll(event) || maybePauseCompositePoll(event, nextEventType))
return;
}
@ -269,14 +273,14 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
applicationEventReaper.add(new CompositePollPsuedoEvent<>(updatePositionsFuture, event.deadlineMs()));
updatePositionsFuture.whenComplete((__, updatePositionsError) -> {
if (maybeFailCompositePoll(event, updatePositionsError))
if (maybeFailCompositePoll(event) || maybeFailCompositePoll(event, updatePositionsError))
return;
log.debug("Processing {} logic for {}", ApplicationEvent.Type.POLL, event);
log.debug("Processing {} logic for {}", ApplicationEvent.Type.CREATE_FETCH_REQUESTS, event);
// If needed, create a fetch request if there's no data in the FetchBuffer.
requestManagers.fetchRequestManager.createFetchRequests().whenComplete((___, fetchError) -> {
if (maybeFailCompositePoll(event, fetchError))
if (maybeFailCompositePoll(event) || maybeFailCompositePoll(event, fetchError))
return;
event.complete(CompositePollEvent.State.COMPLETE, Optional.empty());
@ -326,6 +330,19 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
return true;
}
private boolean maybeFailCompositePoll(CompositePollEvent event) {
Optional<Exception> exception = networkClientDelegate.getAndClearMetadataError();
if (exception.isPresent()) {
KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(exception.get());
event.completeExceptionally(e);
log.debug("Failing event processing for {}", event, e);
return true;
}
return false;
}
private void process(final PollEvent event) {
processPollEvent(event.pollTimeMs());
event.markReconcileAndAutoCommitComplete();
@ -820,6 +837,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
final ConsumerMetadata metadata,
final SubscriptionState subscriptions,
final Supplier<RequestManagers> requestManagersSupplier,
final Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
final BackgroundEventHandler backgroundEventHandler,
final Optional<OffsetCommitCallbackInvoker> offsetCommitCallbackInvoker,
final CompletableEventReaper applicationEventReaper) {
@ -827,10 +845,12 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
@Override
protected ApplicationEventProcessor create() {
RequestManagers requestManagers = requestManagersSupplier.get();
NetworkClientDelegate networkClientDelegate = networkClientDelegateSupplier.get();
return new ApplicationEventProcessor(
logContext,
requestManagers,
networkClientDelegate,
metadata,
subscriptions,
backgroundEventHandler,

View File

@ -114,6 +114,7 @@ public class ApplicationEventProcessorTest {
processor = new ApplicationEventProcessor(
new LogContext(),
requestManagers,
mock(NetworkClientDelegate.class),
metadata,
subscriptionState,
backgroundEventHandler,
@ -138,6 +139,7 @@ public class ApplicationEventProcessorTest {
processor = new ApplicationEventProcessor(
new LogContext(),
requestManagers,
mock(NetworkClientDelegate.class),
metadata,
subscriptionState,
backgroundEventHandler,