diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 4ac326f8bb5..5c43d26b28a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -109,6 +109,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.event.Level; @@ -176,22 +177,26 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { private class CompositePollEventInvoker { - private final Timer timer; - private final long pollTimeMs; private CompositePollEvent latest; private int backoff = -1; - public CompositePollEventInvoker(Timer timer, long pollTimeMs) { - this.timer = timer; - this.pollTimeMs = pollTimeMs; - } + private void poll(Timer timer) { + if (latest == null) { + submitEvent(ApplicationEvent.Type.POLL, timer); + } - private void poll() { - if (latest == null || latest.isComplete()) { - long deadlineMs = calculateDeadlineMs(timer); - latest = new CompositePollEvent(deadlineMs, pollTimeMs, ApplicationEvent.Type.POLL); - applicationEventHandler.add(latest); - } else { + log.debug("Attempting to retrieve result from previously submitted {} with {} remaining on timer", latest, timer.remainingMs()); + + CompositePollEvent.Result result = latest.resultOrError(); + CompositePollEvent.State state = result.state(); + + if (state == CompositePollEvent.State.COMPLETE) { + if (fetchBuffer.isEmpty()) + submitEvent(ApplicationEvent.Type.POLL, timer); + } else if (state == CompositePollEvent.State.UNKNOWN) { + latest = null; + throw new KafkaException("Unexpected poll result received"); + } else if (state == CompositePollEvent.State.INCOMPLETE) { if (backoff == -1) backoff = 1; else @@ -199,10 +204,25 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { long sleep = Math.min(Math.min(backoff, retryBackoffMs), timer.remainingMs()); timer.sleep(sleep); + } else if (state == CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED) { + processBackgroundEvents(); + result.nextEventType().ifPresent(t -> submitEvent(t, timer)); + } else if (state == CompositePollEvent.State.OFFSET_COMMIT_CALLBACKS_REQUIRED) { + offsetCommitCallbackInvoker.executeCallbacks(); + result.nextEventType().ifPresent(t -> submitEvent(t, timer)); } } + + private void submitEvent(ApplicationEvent.Type type, Timer timer) { + long deadlineMs = calculateDeadlineMs(timer); + latest = new CompositePollEvent(deadlineMs, time.milliseconds(), type); + applicationEventHandler.add(latest); + log.debug("Submitted new {} submitted with {} remaining on timer", latest, timer.remainingMs()); + } } + private final CompositePollEventInvoker pollInvoker = new CompositePollEventInvoker(); + /** * An {@link org.apache.kafka.clients.consumer.internals.events.EventProcessor} that is created and executes in the * application thread for the purpose of processing {@link BackgroundEvent background events} generated by the @@ -466,6 +486,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { fetchMetricsManager.throttleTimeSensor(), clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null), backgroundEventHandler, + false, asyncConsumerMetrics ); this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors); @@ -661,6 +682,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { client, metadata, backgroundEventHandler, + false, asyncConsumerMetrics ); this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors); @@ -866,8 +888,6 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions"); } - CompositePollEventInvoker pollEventInvoker = new CompositePollEventInvoker(timer, time.milliseconds()); - do { // We must not allow wake-ups between polling for fetches and returning the records. // If the polled fetches are not empty the consumed position has already been updated in the polling @@ -876,7 +896,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { wakeupTrigger.maybeTriggerWakeup(); processBackgroundEvents(); offsetCommitCallbackInvoker.executeCallbacks(); - pollEventInvoker.poll(); + pollInvoker.poll(timer); final Fetch fetch = pollForFetches(timer); if (!fetch.isEmpty()) { // before returning the fetched records, we can send off the next round of fetches @@ -1143,8 +1163,6 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { try { Map> topicMetadata = applicationEventHandler.addAndGet(topicMetadataEvent); - - processBackgroundEvents(); return topicMetadata.getOrDefault(topic, Collections.emptyList()); } finally { wakeupTrigger.clearTask(); @@ -1170,9 +1188,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { final AllTopicsMetadataEvent topicMetadataEvent = new AllTopicsMetadataEvent(calculateDeadlineMs(time, timeout)); wakeupTrigger.setActiveTask(topicMetadataEvent.future()); try { - Map> map = applicationEventHandler.addAndGet(topicMetadataEvent); - processBackgroundEvents(); - return map; + return applicationEventHandler.addAndGet(topicMetadataEvent); } finally { wakeupTrigger.clearTask(); } @@ -1254,7 +1270,6 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { try { Map offsets = applicationEventHandler.addAndGet(listOffsetsEvent); - processBackgroundEvents(); Map results = new HashMap<>(offsets.size()); offsets.forEach((k, v) -> results.put(k, v != null ? v.buildOffsetAndTimestamp() : null)); return results; @@ -1320,7 +1335,6 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { Map offsetAndTimestampMap; try { offsetAndTimestampMap = applicationEventHandler.addAndGet(listOffsetsEvent); - processBackgroundEvents(); return offsetAndTimestampMap.entrySet() .stream() .collect(Collectors.toMap( diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java index d3ac47903b2..d5b2dc02b74 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; +import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableEvent; import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper; import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics; @@ -34,6 +35,7 @@ import org.slf4j.Logger; import java.io.Closeable; import java.time.Duration; +import java.util.ArrayList; import java.util.Collection; import java.util.LinkedList; import java.util.List; @@ -171,7 +173,8 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable { cachedMaximumTimeToWait = maxTimeToWaitMs; reapExpiredApplicationEvents(currentTimeMs); - } + List> uncompletedEvents = applicationEventReaper.uncompletedEvents(); + maybeFailOnMetadataError(uncompletedEvents); } /** * Process the events—if any—that were produced by the application thread. @@ -356,4 +359,22 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable { log.debug("Closed the consumer network thread"); } } + + /** + * If there is a metadata error, complete all uncompleted events that require subscription metadata. + */ + private void maybeFailOnMetadataError(List> events) { + List> subscriptionMetadataEvent = new ArrayList<>(); + + for (CompletableEvent ce : events) { + if (ce instanceof CompletableApplicationEvent && ((CompletableApplicationEvent) ce).requireSubscriptionMetadata()) + subscriptionMetadataEvent.add((CompletableApplicationEvent) ce); + } + + if (subscriptionMetadataEvent.isEmpty()) + return; + networkClientDelegate.getAndClearMetadataError().ifPresent(metadataError -> + subscriptionMetadataEvent.forEach(event -> event.future().completeExceptionally(metadataError)) + ); + } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java index 1cb25bb46e0..0b827f9e1c0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java @@ -70,6 +70,8 @@ public class NetworkClientDelegate implements AutoCloseable { private final int requestTimeoutMs; private final Queue unsentRequests; private final long retryBackoffMs; + private Optional metadataError; + private final boolean notifyMetadataErrorsViaErrorQueue; private final AsyncConsumerMetrics asyncConsumerMetrics; public NetworkClientDelegate( @@ -79,6 +81,7 @@ public class NetworkClientDelegate implements AutoCloseable { final KafkaClient client, final Metadata metadata, final BackgroundEventHandler backgroundEventHandler, + final boolean notifyMetadataErrorsViaErrorQueue, final AsyncConsumerMetrics asyncConsumerMetrics) { this.time = time; this.client = client; @@ -88,6 +91,8 @@ public class NetworkClientDelegate implements AutoCloseable { this.unsentRequests = new ArrayDeque<>(); this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); + this.metadataError = Optional.empty(); + this.notifyMetadataErrorsViaErrorQueue = notifyMetadataErrorsViaErrorQueue; this.asyncConsumerMetrics = asyncConsumerMetrics; } @@ -155,7 +160,11 @@ public class NetworkClientDelegate implements AutoCloseable { try { metadata.maybeThrowAnyException(); } catch (Exception e) { - backgroundEventHandler.add(new ErrorEvent(e)); + if (notifyMetadataErrorsViaErrorQueue) { + backgroundEventHandler.add(new ErrorEvent(e)); + } else { + metadataError = Optional.of(e); + } } } @@ -238,7 +247,13 @@ public class NetworkClientDelegate implements AutoCloseable { unsent.handler ); } - + + public Optional getAndClearMetadataError() { + Optional metadataError = this.metadataError; + this.metadataError = Optional.empty(); + return metadataError; + } + public Node leastLoadedNode() { return this.client.leastLoadedNode(time.milliseconds()).node(); } @@ -437,6 +452,7 @@ public class NetworkClientDelegate implements AutoCloseable { final Sensor throttleTimeSensor, final ClientTelemetrySender clientTelemetrySender, final BackgroundEventHandler backgroundEventHandler, + final boolean notifyMetadataErrorsViaErrorQueue, final AsyncConsumerMetrics asyncConsumerMetrics) { return new CachedSupplier<>() { @Override @@ -451,7 +467,7 @@ public class NetworkClientDelegate implements AutoCloseable { metadata, throttleTimeSensor, clientTelemetrySender); - return new NetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler, asyncConsumerMetrics); + return new NetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler, notifyMetadataErrorsViaErrorQueue, asyncConsumerMetrics); } }; } @@ -466,6 +482,7 @@ public class NetworkClientDelegate implements AutoCloseable { final KafkaClient client, final Metadata metadata, final BackgroundEventHandler backgroundEventHandler, + final boolean notifyMetadataErrorsViaErrorQueue, final AsyncConsumerMetrics asyncConsumerMetrics) { return new CachedSupplier<>() { @Override @@ -477,6 +494,7 @@ public class NetworkClientDelegate implements AutoCloseable { client, metadata, backgroundEventHandler, + notifyMetadataErrorsViaErrorQueue, asyncConsumerMetrics ); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java index 0e411776374..112fa3c37dd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java @@ -27,7 +27,6 @@ import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetData; import org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetResult; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; -import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.common.ClusterResource; import org.apache.kafka.common.ClusterResourceListener; import org.apache.kafka.common.IsolationLevel; @@ -55,8 +54,8 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.stream.Collectors; @@ -86,7 +85,6 @@ public final class OffsetsRequestManager implements RequestManager, ClusterResou private final Logger log; private final OffsetFetcherUtils offsetFetcherUtils; private final SubscriptionState subscriptionState; - private final BackgroundEventHandler backgroundEventHandler; private final Set requestsToRetry; private final List requestsToSend; @@ -97,6 +95,12 @@ public final class OffsetsRequestManager implements RequestManager, ClusterResou private final CommitRequestManager commitRequestManager; private final long defaultApiTimeoutMs; + /** + * Exception that occurred while updating positions after the triggering event had already + * expired. It will be propagated and cleared on the next call to update fetch positions. + */ + private final AtomicReference cachedUpdatePositionsException = new AtomicReference<>(); + /** * This holds the last OffsetFetch request triggered to retrieve committed offsets to update * fetch positions that hasn't completed yet. When a response is received, it's used to @@ -133,7 +137,6 @@ public final class OffsetsRequestManager implements RequestManager, ClusterResou this.requestsToRetry = new HashSet<>(); this.requestsToSend = new ArrayList<>(); this.subscriptionState = subscriptionState; - this.backgroundEventHandler = backgroundEventHandler; this.time = time; this.requestTimeoutMs = requestTimeoutMs; this.defaultApiTimeoutMs = defaultApiTimeoutMs; @@ -235,6 +238,10 @@ public final class OffsetsRequestManager implements RequestManager, ClusterResou CompletableFuture result = new CompletableFuture<>(); try { + if (maybeCompleteWithPreviousException(result)) { + return result; + } + validatePositionsIfNeeded(); if (subscriptionState.hasAllFetchPositions()) { @@ -258,6 +265,15 @@ public final class OffsetsRequestManager implements RequestManager, ClusterResou return result; } + private boolean maybeCompleteWithPreviousException(CompletableFuture result) { + Throwable cachedException = cachedUpdatePositionsException.getAndSet(null); + if (cachedException != null) { + result.completeExceptionally(cachedException); + return true; + } + return false; + } + /** * Generate requests to fetch offsets and update positions once a response is received. This will first attempt * to use the committed offsets if available. If no committed offsets available, it will use the partition @@ -305,10 +321,7 @@ public final class OffsetsRequestManager implements RequestManager, ClusterResou result.whenComplete((__, error) -> { boolean updatePositionsExpired = time.milliseconds() >= deadlineMs; if (error != null && updatePositionsExpired) { - if (error instanceof CompletionException) - error = error.getCause(); - - backgroundEventHandler.add(new ErrorEvent(error)); + cachedUpdatePositionsException.set(error); } }); } @@ -329,12 +342,8 @@ public final class OffsetsRequestManager implements RequestManager, ClusterResou // Mark partitions that need reset, using the configured reset strategy. If no // strategy is defined, this will raise a NoOffsetForPartitionException exception. subscriptionState.resetInitializingPositions(initializingPartitions::contains); - } catch (Throwable t) { - if (t instanceof CompletionException) - t = t.getCause(); - - backgroundEventHandler.add(new ErrorEvent(t)); - result.completeExceptionally(t); + } catch (Exception e) { + result.completeExceptionally(e); return result; } @@ -470,13 +479,9 @@ public final class OffsetsRequestManager implements RequestManager, ClusterResou try { partitionAutoOffsetResetStrategyMap = offsetFetcherUtils.getOffsetResetStrategyForPartitions(); - } catch (Throwable t) { - if (t instanceof CompletionException) - t = t.getCause(); - - backgroundEventHandler.add(new ErrorEvent(t)); + } catch (Exception e) { CompletableFuture result = new CompletableFuture<>(); - result.completeExceptionally(t); + result.completeExceptionally(e); return result; } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java index dd6d35427af..c9db04ceb9e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java @@ -279,6 +279,7 @@ public class ShareConsumerImpl implements ShareConsumerDelegate { shareFetchMetricsManager.throttleTimeSensor(), clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null), backgroundEventHandler, + true, asyncConsumerMetrics ); this.completedAcknowledgements = new LinkedList<>(); @@ -387,7 +388,7 @@ public class ShareConsumerImpl implements ShareConsumerDelegate { backgroundEventQueue, time, asyncConsumerMetrics); final Supplier networkClientDelegateSupplier = - NetworkClientDelegate.supplier(time, config, logContext, client, metadata, backgroundEventHandler, asyncConsumerMetrics); + NetworkClientDelegate.supplier(time, config, logContext, client, metadata, backgroundEventHandler, true, asyncConsumerMetrics); GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig( config, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index e510b21f802..e7593eee9e3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.internals.CachedSupplier; import org.apache.kafka.clients.consumer.internals.CommitRequestManager; import org.apache.kafka.clients.consumer.internals.ConsumerMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread; +import org.apache.kafka.clients.consumer.internals.ConsumerUtils; import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal; import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker; import org.apache.kafka.clients.consumer.internals.RequestManagers; @@ -66,7 +67,6 @@ public class ApplicationEventProcessor implements EventProcessor updatePositionsFuture = processCheckAndUpdatePositionsEvent(event.deadlineMs()); applicationEventReaper.add(new CompositePollPsuedoEvent<>(updatePositionsFuture, event.deadlineMs())); updatePositionsFuture.whenComplete((__, updatePositionsError) -> { - if (maybeFailCompositePoll(event, updatePositionsError) || maybePauseCompositePoll(event)) + if (maybeFailCompositePoll(event, updatePositionsError)) return; + log.debug("Processing {} logic for {}", ApplicationEvent.Type.POLL, event); + // If needed, create a fetch request if there's no data in the FetchBuffer. requestManagers.fetchRequestManager.createFetchRequests().whenComplete((___, fetchError) -> { - if (maybeFailCompositePoll(event, fetchError) || maybePauseCompositePoll(event)) + if (maybeFailCompositePoll(event, fetchError)) return; - event.complete(); - log.trace("Completed CompositePollEvent {}", event); + event.complete(CompositePollEvent.State.COMPLETE, Optional.empty()); + log.debug("Completed CompositePollEvent {}", event); }); }); @@ -286,17 +288,19 @@ public class ApplicationEventProcessor implements EventProcessor> uncompletedEvents() { + // The following code does not use the Java Collections Streams API to reduce overhead in the critical + // path of the ConsumerNetworkThread loop. + List> events = new ArrayList<>(); + + for (CompletableEvent event : tracked) { + if (!event.future().isDone()) + events.add(event); + } + + return events; + } + /** * For all the {@link CompletableEvent}s in the collection, if they're not already complete, invoke * {@link CompletableFuture#completeExceptionally(Throwable)}. diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompositePollEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompositePollEvent.java index 7106f03f04c..4796a78bb79 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompositePollEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompositePollEvent.java @@ -16,20 +16,72 @@ */ package org.apache.kafka.clients.consumer.internals.events; -import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.kafka.common.KafkaException; + +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; public class CompositePollEvent extends ApplicationEvent { + public enum State { + + OFFSET_COMMIT_CALLBACKS_REQUIRED, + BACKGROUND_EVENT_PROCESSING_REQUIRED, + INCOMPLETE, + COMPLETE, + UNKNOWN + } + + public static class Result { + + private static final Result INCOMPLETE = new Result(State.INCOMPLETE, Optional.empty()); + private final State state; + + private final Optional nextEventType; + + public Result(State state, Optional nextEventType) { + this.state = state; + this.nextEventType = nextEventType; + } + + public State state() { + return state; + } + + public Optional nextEventType() { + return nextEventType; + } + + @Override + public String toString() { + return "Result{" + "state=" + state + ", nextEventType=" + nextEventType + '}'; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + Result result = (Result) o; + return state == result.state && Objects.equals(nextEventType, result.nextEventType); + } + + @Override + public int hashCode() { + return Objects.hash(state, nextEventType); + } + } + private final long deadlineMs; private final long pollTimeMs; private final Type nextEventType; - private final AtomicBoolean complete = new AtomicBoolean(); + private final AtomicReference resultOrError; public CompositePollEvent(long deadlineMs, long pollTimeMs, Type nextEventType) { super(Type.COMPOSITE_POLL); this.deadlineMs = deadlineMs; this.pollTimeMs = pollTimeMs; this.nextEventType = nextEventType; + this.resultOrError = new AtomicReference<>(Result.INCOMPLETE); } public long deadlineMs() { @@ -44,16 +96,30 @@ public class CompositePollEvent extends ApplicationEvent { return nextEventType; } - public boolean isComplete() { - return complete.get(); + public Result resultOrError() { + Object o = resultOrError.get(); + + if (o instanceof KafkaException) + throw (KafkaException) o; + else + return (Result) o; } - public void complete() { - complete.set(true); + public void complete(State state, Optional nextEventType) { + Result result = new Result( + Objects.requireNonNull(state), + Objects.requireNonNull(nextEventType) + ); + + resultOrError.compareAndSet(Result.INCOMPLETE, result); + } + + public void completeExceptionally(KafkaException e) { + resultOrError.compareAndSet(Result.INCOMPLETE, Objects.requireNonNull(e)); } @Override protected String toStringBase() { - return super.toStringBase() + ", deadlineMs=" + deadlineMs + ", pollTimeMs=" + pollTimeMs + ", nextEventType=" + nextEventType + ", complete=" + complete; + return super.toStringBase() + ", deadlineMs=" + deadlineMs + ", pollTimeMs=" + pollTimeMs + ", nextEventType=" + nextEventType + ", resultOrError=" + resultOrError; } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 78ff15cee5f..5bdd3296619 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -1046,7 +1046,7 @@ public class KafkaConsumerTest { }, fetchResponse(tp0, 50L, 5)); @SuppressWarnings("unchecked") - ConsumerRecords records = (ConsumerRecords) consumer.poll(Duration.ofMillis(1)); + ConsumerRecords records = (ConsumerRecords) consumer.poll(Duration.ofMillis(1000)); assertEquals(5, records.count()); assertEquals(Set.of(tp0), records.partitions()); assertEquals(1, records.nextOffsets().size()); @@ -1826,7 +1826,7 @@ public class KafkaConsumerTest { client.prepareResponse(fetchResponse(tp0, 10L, 1)); @SuppressWarnings("unchecked") - ConsumerRecords records = (ConsumerRecords) consumer.poll(Duration.ofMillis(1)); + ConsumerRecords records = (ConsumerRecords) consumer.poll(Duration.ofMillis(1000)); assertEquals(1, records.count()); assertEquals(11L, consumer.position(tp0)); assertEquals(1, records.nextOffsets().size()); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java index 6c05bb3c12f..f806ab65b6b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java @@ -4127,7 +4127,7 @@ public class FetchRequestManagerTest { properties.setProperty(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, String.valueOf(retryBackoffMs)); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); ConsumerConfig config = new ConsumerConfig(properties); - networkClientDelegate = spy(new TestableNetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler)); + networkClientDelegate = spy(new TestableNetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler, true)); } private List collectRecordOffsets(List> records) { @@ -4212,8 +4212,9 @@ public class FetchRequestManagerTest { LogContext logContext, KafkaClient client, Metadata metadata, - BackgroundEventHandler backgroundEventHandler) { - super(time, config, logContext, client, metadata, backgroundEventHandler, mock(AsyncConsumerMetrics.class)); + BackgroundEventHandler backgroundEventHandler, + boolean notifyMetadataErrorsViaErrorQueue) { + super(time, config, logContext, client, metadata, backgroundEventHandler, notifyMetadataErrorsViaErrorQueue, mock(AsyncConsumerMetrics.class)); } @Override diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java index 2abe2584e22..da68a2626a7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java @@ -45,7 +45,6 @@ import org.junit.jupiter.params.provider.MethodSource; import java.util.ArrayList; import java.util.Collections; -import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.Properties; @@ -89,7 +88,7 @@ public class NetworkClientDelegateTest { @Test void testPollResultTimer() throws Exception { - try (NetworkClientDelegate ncd = newNetworkClientDelegate()) { + try (NetworkClientDelegate ncd = newNetworkClientDelegate(false)) { NetworkClientDelegate.UnsentRequest req = new NetworkClientDelegate.UnsentRequest( new FindCoordinatorRequest.Builder( new FindCoordinatorRequestData() @@ -113,7 +112,7 @@ public class NetworkClientDelegateTest { @Test public void testSuccessfulResponse() throws Exception { - try (NetworkClientDelegate ncd = newNetworkClientDelegate()) { + try (NetworkClientDelegate ncd = newNetworkClientDelegate(false)) { NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest(); prepareFindCoordinatorResponse(Errors.NONE); @@ -127,7 +126,7 @@ public class NetworkClientDelegateTest { @Test public void testTimeoutBeforeSend() throws Exception { - try (NetworkClientDelegate ncd = newNetworkClientDelegate()) { + try (NetworkClientDelegate ncd = newNetworkClientDelegate(false)) { client.setUnreachable(mockNode(), REQUEST_TIMEOUT_MS); NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest(); ncd.add(unsentRequest); @@ -141,7 +140,7 @@ public class NetworkClientDelegateTest { @Test public void testTimeoutAfterSend() throws Exception { - try (NetworkClientDelegate ncd = newNetworkClientDelegate()) { + try (NetworkClientDelegate ncd = newNetworkClientDelegate(false)) { NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest(); ncd.add(unsentRequest); ncd.poll(0, time.milliseconds()); @@ -175,7 +174,7 @@ public class NetworkClientDelegateTest { @Test public void testEnsureTimerSetOnAdd() { - NetworkClientDelegate ncd = newNetworkClientDelegate(); + NetworkClientDelegate ncd = newNetworkClientDelegate(false); NetworkClientDelegate.UnsentRequest findCoordRequest = newUnsentFindCoordinatorRequest(); assertNull(findCoordRequest.timer()); @@ -192,7 +191,7 @@ public class NetworkClientDelegateTest { @Test public void testHasAnyPendingRequests() throws Exception { - try (NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate()) { + try (NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate(false)) { NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest(); networkClientDelegate.add(unsentRequest); @@ -223,18 +222,14 @@ public class NetworkClientDelegateTest { AuthenticationException authException = new AuthenticationException("Test Auth Exception"); doThrow(authException).when(metadata).maybeThrowAnyException(); - NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate(); - List backgroundEvents = backgroundEventHandler.drainEvents(); - assertTrue(backgroundEvents.isEmpty()); + NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate(false); + assertTrue(networkClientDelegate.getAndClearMetadataError().isEmpty()); networkClientDelegate.poll(0, time.milliseconds()); - backgroundEvents = backgroundEventHandler.drainEvents(); - assertEquals(1, backgroundEvents.size()); - BackgroundEvent event = backgroundEvents.get(0); - assertInstanceOf(ErrorEvent.class, event); - ErrorEvent errorEvent = (ErrorEvent) event; - assertInstanceOf(AuthenticationException.class, errorEvent.error()); - assertEquals(authException.getMessage(), errorEvent.error().getMessage()); + Optional metadataError = networkClientDelegate.getAndClearMetadataError(); + assertTrue(metadataError.isPresent()); + assertInstanceOf(AuthenticationException.class, metadataError.get()); + assertEquals(authException.getMessage(), metadataError.get().getMessage()); } @Test @@ -244,7 +239,7 @@ public class NetworkClientDelegateTest { BlockingQueue backgroundEventQueue = new LinkedBlockingQueue<>(); this.backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue, time, mock(AsyncConsumerMetrics.class)); - NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate(); + NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate(true); assertEquals(0, backgroundEventQueue.size()); networkClientDelegate.poll(0, time.milliseconds()); @@ -261,7 +256,7 @@ public class NetworkClientDelegateTest { public void testRecordUnsentRequestsQueueTime(String groupName) throws Exception { try (Metrics metrics = new Metrics(); AsyncConsumerMetrics asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, groupName); - NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate(asyncConsumerMetrics)) { + NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate(false, asyncConsumerMetrics)) { NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest(); networkClientDelegate.add(unsentRequest); asyncConsumerMetrics.recordUnsentRequestsQueueSize(1, time.milliseconds()); @@ -290,11 +285,11 @@ public class NetworkClientDelegateTest { } } - public NetworkClientDelegate newNetworkClientDelegate() { - return newNetworkClientDelegate(asyncConsumerMetrics); + public NetworkClientDelegate newNetworkClientDelegate(boolean notifyMetadataErrorsViaErrorQueue) { + return newNetworkClientDelegate(notifyMetadataErrorsViaErrorQueue, asyncConsumerMetrics); } - public NetworkClientDelegate newNetworkClientDelegate(AsyncConsumerMetrics asyncConsumerMetrics) { + public NetworkClientDelegate newNetworkClientDelegate(boolean notifyMetadataErrorsViaErrorQueue, AsyncConsumerMetrics asyncConsumerMetrics) { LogContext logContext = new LogContext(); Properties properties = new Properties(); properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); @@ -308,6 +303,7 @@ public class NetworkClientDelegateTest { this.client, this.metadata, this.backgroundEventHandler, + notifyMetadataErrorsViaErrorQueue, asyncConsumerMetrics ); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java index f2b3d7210ec..a4268b7eca0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java @@ -2687,7 +2687,7 @@ public class ShareConsumeRequestManagerTest { ConsumerConfig config = new ConsumerConfig(properties); networkClientDelegate = spy(new TestableNetworkClientDelegate( time, config, logContext, client, metadata, - new BackgroundEventHandler(new LinkedBlockingQueue<>(), time, mock(AsyncConsumerMetrics.class)))); + new BackgroundEventHandler(new LinkedBlockingQueue<>(), time, mock(AsyncConsumerMetrics.class)), false)); } private class TestableShareConsumeRequestManager extends ShareConsumeRequestManager { @@ -2751,8 +2751,9 @@ public class ShareConsumeRequestManagerTest { LogContext logContext, KafkaClient client, Metadata metadata, - BackgroundEventHandler backgroundEventHandler) { - super(time, config, logContext, client, metadata, backgroundEventHandler, mock(AsyncConsumerMetrics.class)); + BackgroundEventHandler backgroundEventHandler, + boolean notifyMetadataErrorsViaErrorQueue) { + super(time, config, logContext, client, metadata, backgroundEventHandler, notifyMetadataErrorsViaErrorQueue, mock(AsyncConsumerMetrics.class)); } @Override