From 6775aacc2c7f299ce85e5a1b3db4164a1e2b133a Mon Sep 17 00:00:00 2001 From: Kirk True Date: Sat, 20 Sep 2025 15:40:48 -0700 Subject: [PATCH] Refactor poll event handling and metadata error propagation Refactored AsyncKafkaConsumer and related classes to improve composite poll event handling, including explicit state management and pausing for background event processing or offset commit callbacks. Metadata errors are now optionally propagated via a dedicated error field in NetworkClientDelegate, allowing for more flexible error handling. Updated tests and logging to reflect these changes. --- .../internals/AsyncKafkaConsumer.java | 58 +++++++++----- .../internals/ConsumerNetworkThread.java | 23 +++++- .../internals/NetworkClientDelegate.java | 24 +++++- .../internals/OffsetsRequestManager.java | 45 ++++++----- .../consumer/internals/ShareConsumerImpl.java | 3 +- .../events/ApplicationEventProcessor.java | 50 ++++++------ .../events/CompletableEventReaper.java | 13 +++ .../internals/events/CompositePollEvent.java | 80 +++++++++++++++++-- .../clients/consumer/KafkaConsumerTest.java | 4 +- .../internals/FetchRequestManagerTest.java | 7 +- .../internals/NetworkClientDelegateTest.java | 40 +++++----- .../ShareConsumeRequestManagerTest.java | 7 +- 12 files changed, 247 insertions(+), 107 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 4ac326f8bb5..5c43d26b28a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -109,6 +109,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.event.Level; @@ -176,22 +177,26 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { private class CompositePollEventInvoker { - private final Timer timer; - private final long pollTimeMs; private CompositePollEvent latest; private int backoff = -1; - public CompositePollEventInvoker(Timer timer, long pollTimeMs) { - this.timer = timer; - this.pollTimeMs = pollTimeMs; - } + private void poll(Timer timer) { + if (latest == null) { + submitEvent(ApplicationEvent.Type.POLL, timer); + } - private void poll() { - if (latest == null || latest.isComplete()) { - long deadlineMs = calculateDeadlineMs(timer); - latest = new CompositePollEvent(deadlineMs, pollTimeMs, ApplicationEvent.Type.POLL); - applicationEventHandler.add(latest); - } else { + log.debug("Attempting to retrieve result from previously submitted {} with {} remaining on timer", latest, timer.remainingMs()); + + CompositePollEvent.Result result = latest.resultOrError(); + CompositePollEvent.State state = result.state(); + + if (state == CompositePollEvent.State.COMPLETE) { + if (fetchBuffer.isEmpty()) + submitEvent(ApplicationEvent.Type.POLL, timer); + } else if (state == CompositePollEvent.State.UNKNOWN) { + latest = null; + throw new KafkaException("Unexpected poll result received"); + } else if (state == CompositePollEvent.State.INCOMPLETE) { if (backoff == -1) backoff = 1; else @@ -199,10 +204,25 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { long sleep = Math.min(Math.min(backoff, retryBackoffMs), timer.remainingMs()); timer.sleep(sleep); + } else if (state == CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED) { + processBackgroundEvents(); + result.nextEventType().ifPresent(t -> submitEvent(t, timer)); + } else if (state == CompositePollEvent.State.OFFSET_COMMIT_CALLBACKS_REQUIRED) { + offsetCommitCallbackInvoker.executeCallbacks(); + result.nextEventType().ifPresent(t -> submitEvent(t, timer)); } } + + private void submitEvent(ApplicationEvent.Type type, Timer timer) { + long deadlineMs = calculateDeadlineMs(timer); + latest = new CompositePollEvent(deadlineMs, time.milliseconds(), type); + applicationEventHandler.add(latest); + log.debug("Submitted new {} submitted with {} remaining on timer", latest, timer.remainingMs()); + } } + private final CompositePollEventInvoker pollInvoker = new CompositePollEventInvoker(); + /** * An {@link org.apache.kafka.clients.consumer.internals.events.EventProcessor} that is created and executes in the * application thread for the purpose of processing {@link BackgroundEvent background events} generated by the @@ -466,6 +486,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { fetchMetricsManager.throttleTimeSensor(), clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null), backgroundEventHandler, + false, asyncConsumerMetrics ); this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors); @@ -661,6 +682,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { client, metadata, backgroundEventHandler, + false, asyncConsumerMetrics ); this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors); @@ -866,8 +888,6 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions"); } - CompositePollEventInvoker pollEventInvoker = new CompositePollEventInvoker(timer, time.milliseconds()); - do { // We must not allow wake-ups between polling for fetches and returning the records. // If the polled fetches are not empty the consumed position has already been updated in the polling @@ -876,7 +896,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { wakeupTrigger.maybeTriggerWakeup(); processBackgroundEvents(); offsetCommitCallbackInvoker.executeCallbacks(); - pollEventInvoker.poll(); + pollInvoker.poll(timer); final Fetch fetch = pollForFetches(timer); if (!fetch.isEmpty()) { // before returning the fetched records, we can send off the next round of fetches @@ -1143,8 +1163,6 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { try { Map> topicMetadata = applicationEventHandler.addAndGet(topicMetadataEvent); - - processBackgroundEvents(); return topicMetadata.getOrDefault(topic, Collections.emptyList()); } finally { wakeupTrigger.clearTask(); @@ -1170,9 +1188,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { final AllTopicsMetadataEvent topicMetadataEvent = new AllTopicsMetadataEvent(calculateDeadlineMs(time, timeout)); wakeupTrigger.setActiveTask(topicMetadataEvent.future()); try { - Map> map = applicationEventHandler.addAndGet(topicMetadataEvent); - processBackgroundEvents(); - return map; + return applicationEventHandler.addAndGet(topicMetadataEvent); } finally { wakeupTrigger.clearTask(); } @@ -1254,7 +1270,6 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { try { Map offsets = applicationEventHandler.addAndGet(listOffsetsEvent); - processBackgroundEvents(); Map results = new HashMap<>(offsets.size()); offsets.forEach((k, v) -> results.put(k, v != null ? v.buildOffsetAndTimestamp() : null)); return results; @@ -1320,7 +1335,6 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { Map offsetAndTimestampMap; try { offsetAndTimestampMap = applicationEventHandler.addAndGet(listOffsetsEvent); - processBackgroundEvents(); return offsetAndTimestampMap.entrySet() .stream() .collect(Collectors.toMap( diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java index d3ac47903b2..d5b2dc02b74 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; +import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics; @@ -34,6 +35,7 @@ import org.slf4j.Logger; import java.io.Closeable; import java.time.Duration; +import java.util.ArrayList; import java.util.Collection; import java.util.LinkedList; import java.util.List; @@ -171,7 +173,8 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable { cachedMaximumTimeToWait = maxTimeToWaitMs; reapExpiredApplicationEvents(currentTimeMs); - } + List> uncompletedEvents = applicationEventReaper.uncompletedEvents(); + maybeFailOnMetadataError(uncompletedEvents); } /** * Process the events—if any—that were produced by the application thread. @@ -356,4 +359,22 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable { log.debug("Closed the consumer network thread"); } } + + /** + * If there is a metadata error, complete all uncompleted events that require subscription metadata. + */ + private void maybeFailOnMetadataError(List> events) { + List> subscriptionMetadataEvent = new ArrayList<>(); + + for (CompletableEvent ce : events) { + if (ce instanceof CompletableApplicationEvent && ((CompletableApplicationEvent) ce).requireSubscriptionMetadata()) + subscriptionMetadataEvent.add((CompletableApplicationEvent) ce); + } + + if (subscriptionMetadataEvent.isEmpty()) + return; + networkClientDelegate.getAndClearMetadataError().ifPresent(metadataError -> + subscriptionMetadataEvent.forEach(event -> event.future().completeExceptionally(metadataError)) + ); + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java index 1cb25bb46e0..0b827f9e1c0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java @@ -70,6 +70,8 @@ public class NetworkClientDelegate implements AutoCloseable { private final int requestTimeoutMs; private final Queue unsentRequests; private final long retryBackoffMs; + private Optional metadataError; + private final boolean notifyMetadataErrorsViaErrorQueue; private final AsyncConsumerMetrics asyncConsumerMetrics; public NetworkClientDelegate( @@ -79,6 +81,7 @@ public class NetworkClientDelegate implements AutoCloseable { final KafkaClient client, final Metadata metadata, final BackgroundEventHandler backgroundEventHandler, + final boolean notifyMetadataErrorsViaErrorQueue, final AsyncConsumerMetrics asyncConsumerMetrics) { this.time = time; this.client = client; @@ -88,6 +91,8 @@ public class NetworkClientDelegate implements AutoCloseable { this.unsentRequests = new ArrayDeque<>(); this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); + this.metadataError = Optional.empty(); + this.notifyMetadataErrorsViaErrorQueue = notifyMetadataErrorsViaErrorQueue; this.asyncConsumerMetrics = asyncConsumerMetrics; } @@ -155,7 +160,11 @@ public class NetworkClientDelegate implements AutoCloseable { try { metadata.maybeThrowAnyException(); } catch (Exception e) { - backgroundEventHandler.add(new ErrorEvent(e)); + if (notifyMetadataErrorsViaErrorQueue) { + backgroundEventHandler.add(new ErrorEvent(e)); + } else { + metadataError = Optional.of(e); + } } } @@ -238,7 +247,13 @@ public class NetworkClientDelegate implements AutoCloseable { unsent.handler ); } - + + public Optional getAndClearMetadataError() { + Optional metadataError = this.metadataError; + this.metadataError = Optional.empty(); + return metadataError; + } + public Node leastLoadedNode() { return this.client.leastLoadedNode(time.milliseconds()).node(); } @@ -437,6 +452,7 @@ public class NetworkClientDelegate implements AutoCloseable { final Sensor throttleTimeSensor, final ClientTelemetrySender clientTelemetrySender, final BackgroundEventHandler backgroundEventHandler, + final boolean notifyMetadataErrorsViaErrorQueue, final AsyncConsumerMetrics asyncConsumerMetrics) { return new CachedSupplier<>() { @Override @@ -451,7 +467,7 @@ public class NetworkClientDelegate implements AutoCloseable { metadata, throttleTimeSensor, clientTelemetrySender); - return new NetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler, asyncConsumerMetrics); + return new NetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler, notifyMetadataErrorsViaErrorQueue, asyncConsumerMetrics); } }; } @@ -466,6 +482,7 @@ public class NetworkClientDelegate implements AutoCloseable { final KafkaClient client, final Metadata metadata, final BackgroundEventHandler backgroundEventHandler, + final boolean notifyMetadataErrorsViaErrorQueue, final AsyncConsumerMetrics asyncConsumerMetrics) { return new CachedSupplier<>() { @Override @@ -477,6 +494,7 @@ public class NetworkClientDelegate implements AutoCloseable { client, metadata, backgroundEventHandler, + notifyMetadataErrorsViaErrorQueue, asyncConsumerMetrics ); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java index 0e411776374..112fa3c37dd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java @@ -27,7 +27,6 @@ import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetData; import org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetResult; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; -import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.common.ClusterResource; import org.apache.kafka.common.ClusterResourceListener; import org.apache.kafka.common.IsolationLevel; @@ -55,8 +54,8 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.stream.Collectors; @@ -86,7 +85,6 @@ public final class OffsetsRequestManager implements RequestManager, ClusterResou private final Logger log; private final OffsetFetcherUtils offsetFetcherUtils; private final SubscriptionState subscriptionState; - private final BackgroundEventHandler backgroundEventHandler; private final Set requestsToRetry; private final List requestsToSend; @@ -97,6 +95,12 @@ public final class OffsetsRequestManager implements RequestManager, ClusterResou private final CommitRequestManager commitRequestManager; private final long defaultApiTimeoutMs; + /** + * Exception that occurred while updating positions after the triggering event had already + * expired. It will be propagated and cleared on the next call to update fetch positions. + */ + private final AtomicReference cachedUpdatePositionsException = new AtomicReference<>(); + /** * This holds the last OffsetFetch request triggered to retrieve committed offsets to update * fetch positions that hasn't completed yet. When a response is received, it's used to @@ -133,7 +137,6 @@ public final class OffsetsRequestManager implements RequestManager, ClusterResou this.requestsToRetry = new HashSet<>(); this.requestsToSend = new ArrayList<>(); this.subscriptionState = subscriptionState; - this.backgroundEventHandler = backgroundEventHandler; this.time = time; this.requestTimeoutMs = requestTimeoutMs; this.defaultApiTimeoutMs = defaultApiTimeoutMs; @@ -235,6 +238,10 @@ public final class OffsetsRequestManager implements RequestManager, ClusterResou CompletableFuture result = new CompletableFuture<>(); try { + if (maybeCompleteWithPreviousException(result)) { + return result; + } + validatePositionsIfNeeded(); if (subscriptionState.hasAllFetchPositions()) { @@ -258,6 +265,15 @@ public final class OffsetsRequestManager implements RequestManager, ClusterResou return result; } + private boolean maybeCompleteWithPreviousException(CompletableFuture result) { + Throwable cachedException = cachedUpdatePositionsException.getAndSet(null); + if (cachedException != null) { + result.completeExceptionally(cachedException); + return true; + } + return false; + } + /** * Generate requests to fetch offsets and update positions once a response is received. This will first attempt * to use the committed offsets if available. If no committed offsets available, it will use the partition @@ -305,10 +321,7 @@ public final class OffsetsRequestManager implements RequestManager, ClusterResou result.whenComplete((__, error) -> { boolean updatePositionsExpired = time.milliseconds() >= deadlineMs; if (error != null && updatePositionsExpired) { - if (error instanceof CompletionException) - error = error.getCause(); - - backgroundEventHandler.add(new ErrorEvent(error)); + cachedUpdatePositionsException.set(error); } }); } @@ -329,12 +342,8 @@ public final class OffsetsRequestManager implements RequestManager, ClusterResou // Mark partitions that need reset, using the configured reset strategy. If no // strategy is defined, this will raise a NoOffsetForPartitionException exception. subscriptionState.resetInitializingPositions(initializingPartitions::contains); - } catch (Throwable t) { - if (t instanceof CompletionException) - t = t.getCause(); - - backgroundEventHandler.add(new ErrorEvent(t)); - result.completeExceptionally(t); + } catch (Exception e) { + result.completeExceptionally(e); return result; } @@ -470,13 +479,9 @@ public final class OffsetsRequestManager implements RequestManager, ClusterResou try { partitionAutoOffsetResetStrategyMap = offsetFetcherUtils.getOffsetResetStrategyForPartitions(); - } catch (Throwable t) { - if (t instanceof CompletionException) - t = t.getCause(); - - backgroundEventHandler.add(new ErrorEvent(t)); + } catch (Exception e) { CompletableFuture result = new CompletableFuture<>(); - result.completeExceptionally(t); + result.completeExceptionally(e); return result; } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java index dd6d35427af..c9db04ceb9e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java @@ -279,6 +279,7 @@ public class ShareConsumerImpl implements ShareConsumerDelegate { shareFetchMetricsManager.throttleTimeSensor(), clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null), backgroundEventHandler, + true, asyncConsumerMetrics ); this.completedAcknowledgements = new LinkedList<>(); @@ -387,7 +388,7 @@ public class ShareConsumerImpl implements ShareConsumerDelegate { backgroundEventQueue, time, asyncConsumerMetrics); final Supplier networkClientDelegateSupplier = - NetworkClientDelegate.supplier(time, config, logContext, client, metadata, backgroundEventHandler, asyncConsumerMetrics); + NetworkClientDelegate.supplier(time, config, logContext, client, metadata, backgroundEventHandler, true, asyncConsumerMetrics); GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig( config, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index e510b21f802..e7593eee9e3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.internals.CachedSupplier; import org.apache.kafka.clients.consumer.internals.CommitRequestManager; import org.apache.kafka.clients.consumer.internals.ConsumerMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread; +import org.apache.kafka.clients.consumer.internals.ConsumerUtils; import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal; import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker; import org.apache.kafka.clients.consumer.internals.RequestManagers; @@ -66,7 +67,6 @@ public class ApplicationEventProcessor implements EventProcessor updatePositionsFuture = processCheckAndUpdatePositionsEvent(event.deadlineMs()); applicationEventReaper.add(new CompositePollPsuedoEvent<>(updatePositionsFuture, event.deadlineMs())); updatePositionsFuture.whenComplete((__, updatePositionsError) -> { - if (maybeFailCompositePoll(event, updatePositionsError) || maybePauseCompositePoll(event)) + if (maybeFailCompositePoll(event, updatePositionsError)) return; + log.debug("Processing {} logic for {}", ApplicationEvent.Type.POLL, event); + // If needed, create a fetch request if there's no data in the FetchBuffer. requestManagers.fetchRequestManager.createFetchRequests().whenComplete((___, fetchError) -> { - if (maybeFailCompositePoll(event, fetchError) || maybePauseCompositePoll(event)) + if (maybeFailCompositePoll(event, fetchError)) return; - event.complete(); - log.trace("Completed CompositePollEvent {}", event); + event.complete(CompositePollEvent.State.COMPLETE, Optional.empty()); + log.debug("Completed CompositePollEvent {}", event); }); }); @@ -286,17 +288,19 @@ public class ApplicationEventProcessor implements EventProcessor> uncompletedEvents() { + // The following code does not use the Java Collections Streams API to reduce overhead in the critical + // path of the ConsumerNetworkThread loop. + List> events = new ArrayList<>(); + + for (CompletableEvent event : tracked) { + if (!event.future().isDone()) + events.add(event); + } + + return events; + } + /** * For all the {@link CompletableEvent}s in the collection, if they're not already complete, invoke * {@link CompletableFuture#completeExceptionally(Throwable)}. diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompositePollEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompositePollEvent.java index 7106f03f04c..4796a78bb79 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompositePollEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompositePollEvent.java @@ -16,20 +16,72 @@ */ package org.apache.kafka.clients.consumer.internals.events; -import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.kafka.common.KafkaException; + +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; public class CompositePollEvent extends ApplicationEvent { + public enum State { + + OFFSET_COMMIT_CALLBACKS_REQUIRED, + BACKGROUND_EVENT_PROCESSING_REQUIRED, + INCOMPLETE, + COMPLETE, + UNKNOWN + } + + public static class Result { + + private static final Result INCOMPLETE = new Result(State.INCOMPLETE, Optional.empty()); + private final State state; + + private final Optional nextEventType; + + public Result(State state, Optional nextEventType) { + this.state = state; + this.nextEventType = nextEventType; + } + + public State state() { + return state; + } + + public Optional nextEventType() { + return nextEventType; + } + + @Override + public String toString() { + return "Result{" + "state=" + state + ", nextEventType=" + nextEventType + '}'; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + Result result = (Result) o; + return state == result.state && Objects.equals(nextEventType, result.nextEventType); + } + + @Override + public int hashCode() { + return Objects.hash(state, nextEventType); + } + } + private final long deadlineMs; private final long pollTimeMs; private final Type nextEventType; - private final AtomicBoolean complete = new AtomicBoolean(); + private final AtomicReference resultOrError; public CompositePollEvent(long deadlineMs, long pollTimeMs, Type nextEventType) { super(Type.COMPOSITE_POLL); this.deadlineMs = deadlineMs; this.pollTimeMs = pollTimeMs; this.nextEventType = nextEventType; + this.resultOrError = new AtomicReference<>(Result.INCOMPLETE); } public long deadlineMs() { @@ -44,16 +96,30 @@ public class CompositePollEvent extends ApplicationEvent { return nextEventType; } - public boolean isComplete() { - return complete.get(); + public Result resultOrError() { + Object o = resultOrError.get(); + + if (o instanceof KafkaException) + throw (KafkaException) o; + else + return (Result) o; } - public void complete() { - complete.set(true); + public void complete(State state, Optional nextEventType) { + Result result = new Result( + Objects.requireNonNull(state), + Objects.requireNonNull(nextEventType) + ); + + resultOrError.compareAndSet(Result.INCOMPLETE, result); + } + + public void completeExceptionally(KafkaException e) { + resultOrError.compareAndSet(Result.INCOMPLETE, Objects.requireNonNull(e)); } @Override protected String toStringBase() { - return super.toStringBase() + ", deadlineMs=" + deadlineMs + ", pollTimeMs=" + pollTimeMs + ", nextEventType=" + nextEventType + ", complete=" + complete; + return super.toStringBase() + ", deadlineMs=" + deadlineMs + ", pollTimeMs=" + pollTimeMs + ", nextEventType=" + nextEventType + ", resultOrError=" + resultOrError; } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 78ff15cee5f..5bdd3296619 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -1046,7 +1046,7 @@ public class KafkaConsumerTest { }, fetchResponse(tp0, 50L, 5)); @SuppressWarnings("unchecked") - ConsumerRecords records = (ConsumerRecords) consumer.poll(Duration.ofMillis(1)); + ConsumerRecords records = (ConsumerRecords) consumer.poll(Duration.ofMillis(1000)); assertEquals(5, records.count()); assertEquals(Set.of(tp0), records.partitions()); assertEquals(1, records.nextOffsets().size()); @@ -1826,7 +1826,7 @@ public class KafkaConsumerTest { client.prepareResponse(fetchResponse(tp0, 10L, 1)); @SuppressWarnings("unchecked") - ConsumerRecords records = (ConsumerRecords) consumer.poll(Duration.ofMillis(1)); + ConsumerRecords records = (ConsumerRecords) consumer.poll(Duration.ofMillis(1000)); assertEquals(1, records.count()); assertEquals(11L, consumer.position(tp0)); assertEquals(1, records.nextOffsets().size()); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java index 6c05bb3c12f..f806ab65b6b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java @@ -4127,7 +4127,7 @@ public class FetchRequestManagerTest { properties.setProperty(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, String.valueOf(retryBackoffMs)); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); ConsumerConfig config = new ConsumerConfig(properties); - networkClientDelegate = spy(new TestableNetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler)); + networkClientDelegate = spy(new TestableNetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler, true)); } private List collectRecordOffsets(List> records) { @@ -4212,8 +4212,9 @@ public class FetchRequestManagerTest { LogContext logContext, KafkaClient client, Metadata metadata, - BackgroundEventHandler backgroundEventHandler) { - super(time, config, logContext, client, metadata, backgroundEventHandler, mock(AsyncConsumerMetrics.class)); + BackgroundEventHandler backgroundEventHandler, + boolean notifyMetadataErrorsViaErrorQueue) { + super(time, config, logContext, client, metadata, backgroundEventHandler, notifyMetadataErrorsViaErrorQueue, mock(AsyncConsumerMetrics.class)); } @Override diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java index 2abe2584e22..da68a2626a7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java @@ -45,7 +45,6 @@ import org.junit.jupiter.params.provider.MethodSource; import java.util.ArrayList; import java.util.Collections; -import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.Properties; @@ -89,7 +88,7 @@ public class NetworkClientDelegateTest { @Test void testPollResultTimer() throws Exception { - try (NetworkClientDelegate ncd = newNetworkClientDelegate()) { + try (NetworkClientDelegate ncd = newNetworkClientDelegate(false)) { NetworkClientDelegate.UnsentRequest req = new NetworkClientDelegate.UnsentRequest( new FindCoordinatorRequest.Builder( new FindCoordinatorRequestData() @@ -113,7 +112,7 @@ public class NetworkClientDelegateTest { @Test public void testSuccessfulResponse() throws Exception { - try (NetworkClientDelegate ncd = newNetworkClientDelegate()) { + try (NetworkClientDelegate ncd = newNetworkClientDelegate(false)) { NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest(); prepareFindCoordinatorResponse(Errors.NONE); @@ -127,7 +126,7 @@ public class NetworkClientDelegateTest { @Test public void testTimeoutBeforeSend() throws Exception { - try (NetworkClientDelegate ncd = newNetworkClientDelegate()) { + try (NetworkClientDelegate ncd = newNetworkClientDelegate(false)) { client.setUnreachable(mockNode(), REQUEST_TIMEOUT_MS); NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest(); ncd.add(unsentRequest); @@ -141,7 +140,7 @@ public class NetworkClientDelegateTest { @Test public void testTimeoutAfterSend() throws Exception { - try (NetworkClientDelegate ncd = newNetworkClientDelegate()) { + try (NetworkClientDelegate ncd = newNetworkClientDelegate(false)) { NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest(); ncd.add(unsentRequest); ncd.poll(0, time.milliseconds()); @@ -175,7 +174,7 @@ public class NetworkClientDelegateTest { @Test public void testEnsureTimerSetOnAdd() { - NetworkClientDelegate ncd = newNetworkClientDelegate(); + NetworkClientDelegate ncd = newNetworkClientDelegate(false); NetworkClientDelegate.UnsentRequest findCoordRequest = newUnsentFindCoordinatorRequest(); assertNull(findCoordRequest.timer()); @@ -192,7 +191,7 @@ public class NetworkClientDelegateTest { @Test public void testHasAnyPendingRequests() throws Exception { - try (NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate()) { + try (NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate(false)) { NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest(); networkClientDelegate.add(unsentRequest); @@ -223,18 +222,14 @@ public class NetworkClientDelegateTest { AuthenticationException authException = new AuthenticationException("Test Auth Exception"); doThrow(authException).when(metadata).maybeThrowAnyException(); - NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate(); - List backgroundEvents = backgroundEventHandler.drainEvents(); - assertTrue(backgroundEvents.isEmpty()); + NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate(false); + assertTrue(networkClientDelegate.getAndClearMetadataError().isEmpty()); networkClientDelegate.poll(0, time.milliseconds()); - backgroundEvents = backgroundEventHandler.drainEvents(); - assertEquals(1, backgroundEvents.size()); - BackgroundEvent event = backgroundEvents.get(0); - assertInstanceOf(ErrorEvent.class, event); - ErrorEvent errorEvent = (ErrorEvent) event; - assertInstanceOf(AuthenticationException.class, errorEvent.error()); - assertEquals(authException.getMessage(), errorEvent.error().getMessage()); + Optional metadataError = networkClientDelegate.getAndClearMetadataError(); + assertTrue(metadataError.isPresent()); + assertInstanceOf(AuthenticationException.class, metadataError.get()); + assertEquals(authException.getMessage(), metadataError.get().getMessage()); } @Test @@ -244,7 +239,7 @@ public class NetworkClientDelegateTest { BlockingQueue backgroundEventQueue = new LinkedBlockingQueue<>(); this.backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue, time, mock(AsyncConsumerMetrics.class)); - NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate(); + NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate(true); assertEquals(0, backgroundEventQueue.size()); networkClientDelegate.poll(0, time.milliseconds()); @@ -261,7 +256,7 @@ public class NetworkClientDelegateTest { public void testRecordUnsentRequestsQueueTime(String groupName) throws Exception { try (Metrics metrics = new Metrics(); AsyncConsumerMetrics asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, groupName); - NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate(asyncConsumerMetrics)) { + NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate(false, asyncConsumerMetrics)) { NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest(); networkClientDelegate.add(unsentRequest); asyncConsumerMetrics.recordUnsentRequestsQueueSize(1, time.milliseconds()); @@ -290,11 +285,11 @@ public class NetworkClientDelegateTest { } } - public NetworkClientDelegate newNetworkClientDelegate() { - return newNetworkClientDelegate(asyncConsumerMetrics); + public NetworkClientDelegate newNetworkClientDelegate(boolean notifyMetadataErrorsViaErrorQueue) { + return newNetworkClientDelegate(notifyMetadataErrorsViaErrorQueue, asyncConsumerMetrics); } - public NetworkClientDelegate newNetworkClientDelegate(AsyncConsumerMetrics asyncConsumerMetrics) { + public NetworkClientDelegate newNetworkClientDelegate(boolean notifyMetadataErrorsViaErrorQueue, AsyncConsumerMetrics asyncConsumerMetrics) { LogContext logContext = new LogContext(); Properties properties = new Properties(); properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); @@ -308,6 +303,7 @@ public class NetworkClientDelegateTest { this.client, this.metadata, this.backgroundEventHandler, + notifyMetadataErrorsViaErrorQueue, asyncConsumerMetrics ); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java index f2b3d7210ec..a4268b7eca0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java @@ -2687,7 +2687,7 @@ public class ShareConsumeRequestManagerTest { ConsumerConfig config = new ConsumerConfig(properties); networkClientDelegate = spy(new TestableNetworkClientDelegate( time, config, logContext, client, metadata, - new BackgroundEventHandler(new LinkedBlockingQueue<>(), time, mock(AsyncConsumerMetrics.class)))); + new BackgroundEventHandler(new LinkedBlockingQueue<>(), time, mock(AsyncConsumerMetrics.class)), false)); } private class TestableShareConsumeRequestManager extends ShareConsumeRequestManager { @@ -2751,8 +2751,9 @@ public class ShareConsumeRequestManagerTest { LogContext logContext, KafkaClient client, Metadata metadata, - BackgroundEventHandler backgroundEventHandler) { - super(time, config, logContext, client, metadata, backgroundEventHandler, mock(AsyncConsumerMetrics.class)); + BackgroundEventHandler backgroundEventHandler, + boolean notifyMetadataErrorsViaErrorQueue) { + super(time, config, logContext, client, metadata, backgroundEventHandler, notifyMetadataErrorsViaErrorQueue, mock(AsyncConsumerMetrics.class)); } @Override