Refactor poll event handling and metadata error propagation

Refactored AsyncKafkaConsumer and related classes to improve composite poll event handling, including explicit state management and pausing for background event processing or offset commit callbacks. Metadata errors are now optionally propagated via a dedicated error field in NetworkClientDelegate, allowing for more flexible error handling. Updated tests and logging to reflect these changes.
This commit is contained in:
Kirk True 2025-09-20 15:40:48 -07:00
parent ae0ddcc4c0
commit 6775aacc2c
12 changed files with 247 additions and 107 deletions

View File

@ -109,6 +109,7 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.event.Level;
@ -176,22 +177,26 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
private class CompositePollEventInvoker {
private final Timer timer;
private final long pollTimeMs;
private CompositePollEvent latest;
private int backoff = -1;
public CompositePollEventInvoker(Timer timer, long pollTimeMs) {
this.timer = timer;
this.pollTimeMs = pollTimeMs;
private void poll(Timer timer) {
if (latest == null) {
submitEvent(ApplicationEvent.Type.POLL, timer);
}
private void poll() {
if (latest == null || latest.isComplete()) {
long deadlineMs = calculateDeadlineMs(timer);
latest = new CompositePollEvent(deadlineMs, pollTimeMs, ApplicationEvent.Type.POLL);
applicationEventHandler.add(latest);
} else {
log.debug("Attempting to retrieve result from previously submitted {} with {} remaining on timer", latest, timer.remainingMs());
CompositePollEvent.Result result = latest.resultOrError();
CompositePollEvent.State state = result.state();
if (state == CompositePollEvent.State.COMPLETE) {
if (fetchBuffer.isEmpty())
submitEvent(ApplicationEvent.Type.POLL, timer);
} else if (state == CompositePollEvent.State.UNKNOWN) {
latest = null;
throw new KafkaException("Unexpected poll result received");
} else if (state == CompositePollEvent.State.INCOMPLETE) {
if (backoff == -1)
backoff = 1;
else
@ -199,9 +204,24 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
long sleep = Math.min(Math.min(backoff, retryBackoffMs), timer.remainingMs());
timer.sleep(sleep);
} else if (state == CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED) {
processBackgroundEvents();
result.nextEventType().ifPresent(t -> submitEvent(t, timer));
} else if (state == CompositePollEvent.State.OFFSET_COMMIT_CALLBACKS_REQUIRED) {
offsetCommitCallbackInvoker.executeCallbacks();
result.nextEventType().ifPresent(t -> submitEvent(t, timer));
}
}
/**
 * Submits a fresh {@link CompositePollEvent} of the given type to the background thread and
 * tracks it in {@code latest} so the next {@code poll()} invocation can retrieve its result.
 */
private void submitEvent(ApplicationEvent.Type type, Timer timer) {
    long deadlineMs = calculateDeadlineMs(timer);
    latest = new CompositePollEvent(deadlineMs, time.milliseconds(), type);
    applicationEventHandler.add(latest);
    // Fixed duplicated word in the log message ("Submitted new {} submitted with ...").
    log.debug("Submitted new {} with {} remaining on timer", latest, timer.remainingMs());
}
}
private final CompositePollEventInvoker pollInvoker = new CompositePollEventInvoker();
/**
* An {@link org.apache.kafka.clients.consumer.internals.events.EventProcessor} that is created and executes in the
@ -466,6 +486,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
fetchMetricsManager.throttleTimeSensor(),
clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null),
backgroundEventHandler,
false,
asyncConsumerMetrics
);
this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors);
@ -661,6 +682,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
client,
metadata,
backgroundEventHandler,
false,
asyncConsumerMetrics
);
this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors);
@ -866,8 +888,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
}
CompositePollEventInvoker pollEventInvoker = new CompositePollEventInvoker(timer, time.milliseconds());
do {
// We must not allow wake-ups between polling for fetches and returning the records.
// If the polled fetches are not empty the consumed position has already been updated in the polling
@ -876,7 +896,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
wakeupTrigger.maybeTriggerWakeup();
processBackgroundEvents();
offsetCommitCallbackInvoker.executeCallbacks();
pollEventInvoker.poll();
pollInvoker.poll(timer);
final Fetch<K, V> fetch = pollForFetches(timer);
if (!fetch.isEmpty()) {
// before returning the fetched records, we can send off the next round of fetches
@ -1143,8 +1163,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
try {
Map<String, List<PartitionInfo>> topicMetadata =
applicationEventHandler.addAndGet(topicMetadataEvent);
processBackgroundEvents();
return topicMetadata.getOrDefault(topic, Collections.emptyList());
} finally {
wakeupTrigger.clearTask();
@ -1170,9 +1188,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
final AllTopicsMetadataEvent topicMetadataEvent = new AllTopicsMetadataEvent(calculateDeadlineMs(time, timeout));
wakeupTrigger.setActiveTask(topicMetadataEvent.future());
try {
Map<String, List<PartitionInfo>> map = applicationEventHandler.addAndGet(topicMetadataEvent);
processBackgroundEvents();
return map;
return applicationEventHandler.addAndGet(topicMetadataEvent);
} finally {
wakeupTrigger.clearTask();
}
@ -1254,7 +1270,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
try {
Map<TopicPartition, OffsetAndTimestampInternal> offsets = applicationEventHandler.addAndGet(listOffsetsEvent);
processBackgroundEvents();
Map<TopicPartition, OffsetAndTimestamp> results = new HashMap<>(offsets.size());
offsets.forEach((k, v) -> results.put(k, v != null ? v.buildOffsetAndTimestamp() : null));
return results;
@ -1320,7 +1335,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
Map<TopicPartition, OffsetAndTimestampInternal> offsetAndTimestampMap;
try {
offsetAndTimestampMap = applicationEventHandler.addAndGet(listOffsetsEvent);
processBackgroundEvents();
return offsetAndTimestampMap.entrySet()
.stream()
.collect(Collectors.toMap(

View File

@ -20,6 +20,7 @@ import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
@ -34,6 +35,7 @@ import org.slf4j.Logger;
import java.io.Closeable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
@ -171,7 +173,8 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
cachedMaximumTimeToWait = maxTimeToWaitMs;
reapExpiredApplicationEvents(currentTimeMs);
}
List<CompletableEvent<?>> uncompletedEvents = applicationEventReaper.uncompletedEvents();
maybeFailOnMetadataError(uncompletedEvents); }
/**
Process the events — if any — that were produced by the application thread.
@ -356,4 +359,22 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
log.debug("Closed the consumer network thread");
}
}
/**
 * If a metadata error was recorded by the network client, propagate it to every uncompleted
 * event that requires up-to-date subscription metadata, completing their futures exceptionally.
 */
private void maybeFailOnMetadataError(List<CompletableEvent<?>> events) {
    // Collect only the events whose completion depends on subscription metadata.
    List<CompletableApplicationEvent<?>> metadataDependentEvents = new ArrayList<>();

    for (CompletableEvent<?> event : events) {
        if (event instanceof CompletableApplicationEvent) {
            CompletableApplicationEvent<?> applicationEvent = (CompletableApplicationEvent<?>) event;

            if (applicationEvent.requireSubscriptionMetadata())
                metadataDependentEvents.add(applicationEvent);
        }
    }

    if (metadataDependentEvents.isEmpty())
        return;

    // Retrieving the error also clears it, so it is reported at most once.
    networkClientDelegate.getAndClearMetadataError().ifPresent(metadataError ->
        metadataDependentEvents.forEach(event -> event.future().completeExceptionally(metadataError))
    );
}
}

View File

@ -70,6 +70,8 @@ public class NetworkClientDelegate implements AutoCloseable {
private final int requestTimeoutMs;
private final Queue<UnsentRequest> unsentRequests;
private final long retryBackoffMs;
private Optional<Exception> metadataError;
private final boolean notifyMetadataErrorsViaErrorQueue;
private final AsyncConsumerMetrics asyncConsumerMetrics;
public NetworkClientDelegate(
@ -79,6 +81,7 @@ public class NetworkClientDelegate implements AutoCloseable {
final KafkaClient client,
final Metadata metadata,
final BackgroundEventHandler backgroundEventHandler,
final boolean notifyMetadataErrorsViaErrorQueue,
final AsyncConsumerMetrics asyncConsumerMetrics) {
this.time = time;
this.client = client;
@ -88,6 +91,8 @@ public class NetworkClientDelegate implements AutoCloseable {
this.unsentRequests = new ArrayDeque<>();
this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
this.metadataError = Optional.empty();
this.notifyMetadataErrorsViaErrorQueue = notifyMetadataErrorsViaErrorQueue;
this.asyncConsumerMetrics = asyncConsumerMetrics;
}
@ -155,7 +160,11 @@ public class NetworkClientDelegate implements AutoCloseable {
try {
metadata.maybeThrowAnyException();
} catch (Exception e) {
if (notifyMetadataErrorsViaErrorQueue) {
backgroundEventHandler.add(new ErrorEvent(e));
} else {
metadataError = Optional.of(e);
}
}
}
@ -239,6 +248,12 @@ public class NetworkClientDelegate implements AutoCloseable {
);
}
/**
 * Returns the pending metadata error, if any, and clears it so subsequent calls
 * return {@link Optional#empty()} until a new error is recorded.
 */
public Optional<Exception> getAndClearMetadataError() {
    final Optional<Exception> pending = metadataError;
    metadataError = Optional.empty();
    return pending;
}
/**
 * Delegates to the underlying client to pick the node with the fewest in-flight requests.
 */
public Node leastLoadedNode() {
    final long nowMs = time.milliseconds();
    return client.leastLoadedNode(nowMs).node();
}
@ -437,6 +452,7 @@ public class NetworkClientDelegate implements AutoCloseable {
final Sensor throttleTimeSensor,
final ClientTelemetrySender clientTelemetrySender,
final BackgroundEventHandler backgroundEventHandler,
final boolean notifyMetadataErrorsViaErrorQueue,
final AsyncConsumerMetrics asyncConsumerMetrics) {
return new CachedSupplier<>() {
@Override
@ -451,7 +467,7 @@ public class NetworkClientDelegate implements AutoCloseable {
metadata,
throttleTimeSensor,
clientTelemetrySender);
return new NetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler, asyncConsumerMetrics);
return new NetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler, notifyMetadataErrorsViaErrorQueue, asyncConsumerMetrics);
}
};
}
@ -466,6 +482,7 @@ public class NetworkClientDelegate implements AutoCloseable {
final KafkaClient client,
final Metadata metadata,
final BackgroundEventHandler backgroundEventHandler,
final boolean notifyMetadataErrorsViaErrorQueue,
final AsyncConsumerMetrics asyncConsumerMetrics) {
return new CachedSupplier<>() {
@Override
@ -477,6 +494,7 @@ public class NetworkClientDelegate implements AutoCloseable {
client,
metadata,
backgroundEventHandler,
notifyMetadataErrorsViaErrorQueue,
asyncConsumerMetrics
);
}

View File

@ -27,7 +27,6 @@ import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetData;
import org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.ListOffsetResult;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.ClusterResourceListener;
import org.apache.kafka.common.IsolationLevel;
@ -55,8 +54,8 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
@ -86,7 +85,6 @@ public final class OffsetsRequestManager implements RequestManager, ClusterResou
private final Logger log;
private final OffsetFetcherUtils offsetFetcherUtils;
private final SubscriptionState subscriptionState;
private final BackgroundEventHandler backgroundEventHandler;
private final Set<ListOffsetsRequestState> requestsToRetry;
private final List<NetworkClientDelegate.UnsentRequest> requestsToSend;
@ -97,6 +95,12 @@ public final class OffsetsRequestManager implements RequestManager, ClusterResou
private final CommitRequestManager commitRequestManager;
private final long defaultApiTimeoutMs;
/**
* Exception that occurred while updating positions after the triggering event had already
* expired. It will be propagated and cleared on the next call to update fetch positions.
*/
private final AtomicReference<Throwable> cachedUpdatePositionsException = new AtomicReference<>();
/**
* This holds the last OffsetFetch request triggered to retrieve committed offsets to update
* fetch positions that hasn't completed yet. When a response is received, it's used to
@ -133,7 +137,6 @@ public final class OffsetsRequestManager implements RequestManager, ClusterResou
this.requestsToRetry = new HashSet<>();
this.requestsToSend = new ArrayList<>();
this.subscriptionState = subscriptionState;
this.backgroundEventHandler = backgroundEventHandler;
this.time = time;
this.requestTimeoutMs = requestTimeoutMs;
this.defaultApiTimeoutMs = defaultApiTimeoutMs;
@ -235,6 +238,10 @@ public final class OffsetsRequestManager implements RequestManager, ClusterResou
CompletableFuture<Boolean> result = new CompletableFuture<>();
try {
if (maybeCompleteWithPreviousException(result)) {
return result;
}
validatePositionsIfNeeded();
if (subscriptionState.hasAllFetchPositions()) {
@ -258,6 +265,15 @@ public final class OffsetsRequestManager implements RequestManager, ClusterResou
return result;
}
/**
 * Completes {@code result} exceptionally with any cached update-positions error
 * (clearing the cache in the process).
 *
 * @return true if an error was propagated, false if no error was cached
 */
private boolean maybeCompleteWithPreviousException(CompletableFuture<Boolean> result) {
    // getAndSet(null) both reads and clears the cached error atomically.
    final Throwable pendingError = cachedUpdatePositionsException.getAndSet(null);

    if (pendingError == null)
        return false;

    result.completeExceptionally(pendingError);
    return true;
}
/**
* Generate requests to fetch offsets and update positions once a response is received. This will first attempt
* to use the committed offsets if available. If no committed offsets available, it will use the partition
@ -305,10 +321,7 @@ public final class OffsetsRequestManager implements RequestManager, ClusterResou
result.whenComplete((__, error) -> {
boolean updatePositionsExpired = time.milliseconds() >= deadlineMs;
if (error != null && updatePositionsExpired) {
if (error instanceof CompletionException)
error = error.getCause();
backgroundEventHandler.add(new ErrorEvent(error));
cachedUpdatePositionsException.set(error);
}
});
}
@ -329,12 +342,8 @@ public final class OffsetsRequestManager implements RequestManager, ClusterResou
// Mark partitions that need reset, using the configured reset strategy. If no
// strategy is defined, this will raise a NoOffsetForPartitionException exception.
subscriptionState.resetInitializingPositions(initializingPartitions::contains);
} catch (Throwable t) {
if (t instanceof CompletionException)
t = t.getCause();
backgroundEventHandler.add(new ErrorEvent(t));
result.completeExceptionally(t);
} catch (Exception e) {
result.completeExceptionally(e);
return result;
}
@ -470,13 +479,9 @@ public final class OffsetsRequestManager implements RequestManager, ClusterResou
try {
partitionAutoOffsetResetStrategyMap = offsetFetcherUtils.getOffsetResetStrategyForPartitions();
} catch (Throwable t) {
if (t instanceof CompletionException)
t = t.getCause();
backgroundEventHandler.add(new ErrorEvent(t));
} catch (Exception e) {
CompletableFuture<Void> result = new CompletableFuture<>();
result.completeExceptionally(t);
result.completeExceptionally(e);
return result;
}

View File

@ -279,6 +279,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
shareFetchMetricsManager.throttleTimeSensor(),
clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null),
backgroundEventHandler,
true,
asyncConsumerMetrics
);
this.completedAcknowledgements = new LinkedList<>();
@ -387,7 +388,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
backgroundEventQueue, time, asyncConsumerMetrics);
final Supplier<NetworkClientDelegate> networkClientDelegateSupplier =
NetworkClientDelegate.supplier(time, config, logContext, client, metadata, backgroundEventHandler, asyncConsumerMetrics);
NetworkClientDelegate.supplier(time, config, logContext, client, metadata, backgroundEventHandler, true, asyncConsumerMetrics);
GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
config,

View File

@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.internals.CachedSupplier;
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal;
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
@ -66,7 +67,6 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
private final RequiresApplicationThreadExecution backgroundEventProcessingRequiredTest;
private final RequiresApplicationThreadExecution offsetCommitCallbackInvocationRequiredTest;
private final CompletableEventReaper applicationEventReaper;
private final BackgroundEventHandler backgroundEventHandler;
private int metadataVersionSnapshot;
public ApplicationEventProcessor(final LogContext logContext,
@ -81,7 +81,6 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
this.metadata = metadata;
this.subscriptions = subscriptions;
this.applicationEventReaper = applicationEventReaper;
this.backgroundEventHandler = backgroundEventHandler;
this.metadataVersionSnapshot = metadata.updateVersion();
// If there are background events to process, exit to the application thread.
@ -241,44 +240,47 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
}
private void process(final CompositePollEvent event) {
if (maybePauseCompositePoll(event))
if (maybePauseCompositePoll(event, ApplicationEvent.Type.POLL))
return;
ApplicationEvent.Type nextEventType = event.nextEventType();
if (nextEventType == ApplicationEvent.Type.POLL) {
log.debug("Processing {} logic for {}", nextEventType, event);
processPollEvent(event.pollTimeMs());
if (maybePauseCompositePoll(event))
return;
nextEventType = ApplicationEvent.Type.UPDATE_SUBSCRIPTION_METADATA;
if (maybePauseCompositePoll(event, nextEventType))
return;
}
if (nextEventType == ApplicationEvent.Type.UPDATE_SUBSCRIPTION_METADATA) {
log.debug("Processing {} logic for {}", nextEventType, event);
processUpdatePatternSubscriptionEvent();
if (maybePauseCompositePoll(event))
return;
nextEventType = ApplicationEvent.Type.CHECK_AND_UPDATE_POSITIONS;
if (maybePauseCompositePoll(event, nextEventType))
return;
}
if (nextEventType == ApplicationEvent.Type.CHECK_AND_UPDATE_POSITIONS) {
log.debug("Processing {} logic for {}", nextEventType, event);
CompletableFuture<Boolean> updatePositionsFuture = processCheckAndUpdatePositionsEvent(event.deadlineMs());
applicationEventReaper.add(new CompositePollPsuedoEvent<>(updatePositionsFuture, event.deadlineMs()));
updatePositionsFuture.whenComplete((__, updatePositionsError) -> {
if (maybeFailCompositePoll(event, updatePositionsError) || maybePauseCompositePoll(event))
if (maybeFailCompositePoll(event, updatePositionsError))
return;
log.debug("Processing {} logic for {}", ApplicationEvent.Type.POLL, event);
// If needed, create a fetch request if there's no data in the FetchBuffer.
requestManagers.fetchRequestManager.createFetchRequests().whenComplete((___, fetchError) -> {
if (maybeFailCompositePoll(event, fetchError) || maybePauseCompositePoll(event))
if (maybeFailCompositePoll(event, fetchError))
return;
event.complete();
log.trace("Completed CompositePollEvent {}", event);
event.complete(CompositePollEvent.State.COMPLETE, Optional.empty());
log.debug("Completed CompositePollEvent {}", event);
});
});
@ -286,17 +288,19 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
}
log.warn("Unknown next step for composite poll: {}", nextEventType);
event.complete();
event.complete(CompositePollEvent.State.UNKNOWN, Optional.empty());
}
private boolean maybePauseCompositePoll(CompositePollEvent event) {
private boolean maybePauseCompositePoll(CompositePollEvent event, ApplicationEvent.Type nextEventType) {
if (backgroundEventProcessingRequiredTest.requiresApplicationThread()) {
event.complete();
log.debug("Pausing event processing for {} with {} as next step", event, nextEventType);
event.complete(CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED, Optional.of(nextEventType));
return true;
}
if (offsetCommitCallbackInvocationRequiredTest.requiresApplicationThread()) {
event.complete();
log.debug("Pausing event processing for {} with {} as next step", event, nextEventType);
event.complete(CompositePollEvent.State.OFFSET_COMMIT_CALLBACKS_REQUIRED, Optional.of(nextEventType));
return true;
}
@ -308,7 +312,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
return false;
if (t instanceof org.apache.kafka.common.errors.TimeoutException || t instanceof java.util.concurrent.TimeoutException) {
log.trace("Ignoring timeout for CompositePollEvent {}: {}", event, t.getMessage());
log.debug("Ignoring timeout for CompositePollEvent {}: {}", event, t.getMessage());
return false;
}
@ -316,9 +320,9 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
t = t.getCause();
}
backgroundEventHandler.add(new ErrorEvent(t));
event.complete();
log.trace("Failing CompositePollEvent {}", event, t);
KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t);
event.completeExceptionally(e);
log.debug("Failing event processing for {}", event, e);
return true;
}

View File

@ -156,6 +156,19 @@ public class CompletableEventReaper {
return tracked.size();
}
/**
 * Returns the tracked events whose futures have not yet completed.
 *
 * <p>Implemented with a plain loop rather than the Streams API to keep overhead low on the
 * hot path of the ConsumerNetworkThread loop.
 */
public List<CompletableEvent<?>> uncompletedEvents() {
    final List<CompletableEvent<?>> pending = new ArrayList<>();

    for (final CompletableEvent<?> candidate : tracked) {
        if (!candidate.future().isDone())
            pending.add(candidate);
    }

    return pending;
}
/**
* For all the {@link CompletableEvent}s in the collection, if they're not already complete, invoke
* {@link CompletableFuture#completeExceptionally(Throwable)}.

View File

@ -16,20 +16,72 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.KafkaException;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
public class CompositePollEvent extends ApplicationEvent {
public enum State {
OFFSET_COMMIT_CALLBACKS_REQUIRED,
BACKGROUND_EVENT_PROCESSING_REQUIRED,
INCOMPLETE,
COMPLETE,
UNKNOWN
}
/**
 * Immutable pairing of a terminal {@link State} with the optional next event type the
 * application thread should submit to continue the composite poll sequence.
 */
public static class Result {

    /** Shared sentinel for an event that has not yet produced a result. */
    private static final Result INCOMPLETE = new Result(State.INCOMPLETE, Optional.empty());

    private final State state;
    private final Optional<Type> nextEventType;

    public Result(State state, Optional<Type> nextEventType) {
        this.state = state;
        this.nextEventType = nextEventType;
    }

    public State state() {
        return state;
    }

    public Optional<Type> nextEventType() {
        return nextEventType;
    }

    @Override
    public String toString() {
        return "Result{" + "state=" + state + ", nextEventType=" + nextEventType + '}';
    }

    @Override
    public boolean equals(Object o) {
        // Identity short-circuit was missing; added per the standard equals() idiom.
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Result result = (Result) o;
        return state == result.state && Objects.equals(nextEventType, result.nextEventType);
    }

    @Override
    public int hashCode() {
        return Objects.hash(state, nextEventType);
    }
}
private final long deadlineMs;
private final long pollTimeMs;
private final Type nextEventType;
private final AtomicBoolean complete = new AtomicBoolean();
private final AtomicReference<Object> resultOrError;
public CompositePollEvent(long deadlineMs, long pollTimeMs, Type nextEventType) {
super(Type.COMPOSITE_POLL);
this.deadlineMs = deadlineMs;
this.pollTimeMs = pollTimeMs;
this.nextEventType = nextEventType;
this.resultOrError = new AtomicReference<>(Result.INCOMPLETE);
}
public long deadlineMs() {
@ -44,16 +96,30 @@ public class CompositePollEvent extends ApplicationEvent {
return nextEventType;
}
public boolean isComplete() {
return complete.get();
/**
 * Returns the current {@link Result}, or rethrows the failure if the event was
 * completed exceptionally via {@link #completeExceptionally(KafkaException)}.
 */
public Result resultOrError() {
    final Object current = resultOrError.get();

    // The atomic holds either a Result or the KafkaException used to fail the event.
    if (current instanceof KafkaException)
        throw (KafkaException) current;

    return (Result) current;
}
public void complete() {
complete.set(true);
/**
 * Completes the event with the given state and optional follow-up event type.
 * Only the first completion wins; later attempts are silently ignored.
 */
public void complete(State state, Optional<Type> nextEventType) {
    Objects.requireNonNull(state);
    Objects.requireNonNull(nextEventType);
    resultOrError.compareAndSet(Result.INCOMPLETE, new Result(state, nextEventType));
}
/**
 * Fails the event with the given exception; a no-op if the event already completed.
 */
public void completeExceptionally(KafkaException e) {
    Objects.requireNonNull(e);
    resultOrError.compareAndSet(Result.INCOMPLETE, e);
}
@Override
protected String toStringBase() {
    // The diff residue contained both the old ("complete=") and new return statements;
    // only the current resultOrError-based version is kept.
    return super.toStringBase() + ", deadlineMs=" + deadlineMs + ", pollTimeMs=" + pollTimeMs + ", nextEventType=" + nextEventType + ", resultOrError=" + resultOrError;
}
}

View File

@ -1046,7 +1046,7 @@ public class KafkaConsumerTest {
}, fetchResponse(tp0, 50L, 5));
@SuppressWarnings("unchecked")
ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1));
ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1000));
assertEquals(5, records.count());
assertEquals(Set.of(tp0), records.partitions());
assertEquals(1, records.nextOffsets().size());
@ -1826,7 +1826,7 @@ public class KafkaConsumerTest {
client.prepareResponse(fetchResponse(tp0, 10L, 1));
@SuppressWarnings("unchecked")
ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1));
ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1000));
assertEquals(1, records.count());
assertEquals(11L, consumer.position(tp0));
assertEquals(1, records.nextOffsets().size());

View File

@ -4127,7 +4127,7 @@ public class FetchRequestManagerTest {
properties.setProperty(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, String.valueOf(retryBackoffMs));
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
ConsumerConfig config = new ConsumerConfig(properties);
networkClientDelegate = spy(new TestableNetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler));
networkClientDelegate = spy(new TestableNetworkClientDelegate(time, config, logContext, client, metadata, backgroundEventHandler, true));
}
private <T> List<Long> collectRecordOffsets(List<ConsumerRecord<T, T>> records) {
@ -4212,8 +4212,9 @@ public class FetchRequestManagerTest {
LogContext logContext,
KafkaClient client,
Metadata metadata,
BackgroundEventHandler backgroundEventHandler) {
super(time, config, logContext, client, metadata, backgroundEventHandler, mock(AsyncConsumerMetrics.class));
BackgroundEventHandler backgroundEventHandler,
boolean notifyMetadataErrorsViaErrorQueue) {
super(time, config, logContext, client, metadata, backgroundEventHandler, notifyMetadataErrorsViaErrorQueue, mock(AsyncConsumerMetrics.class));
}
@Override

View File

@ -45,7 +45,6 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
@ -89,7 +88,7 @@ public class NetworkClientDelegateTest {
@Test
void testPollResultTimer() throws Exception {
try (NetworkClientDelegate ncd = newNetworkClientDelegate()) {
try (NetworkClientDelegate ncd = newNetworkClientDelegate(false)) {
NetworkClientDelegate.UnsentRequest req = new NetworkClientDelegate.UnsentRequest(
new FindCoordinatorRequest.Builder(
new FindCoordinatorRequestData()
@ -113,7 +112,7 @@ public class NetworkClientDelegateTest {
@Test
public void testSuccessfulResponse() throws Exception {
try (NetworkClientDelegate ncd = newNetworkClientDelegate()) {
try (NetworkClientDelegate ncd = newNetworkClientDelegate(false)) {
NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest();
prepareFindCoordinatorResponse(Errors.NONE);
@ -127,7 +126,7 @@ public class NetworkClientDelegateTest {
@Test
public void testTimeoutBeforeSend() throws Exception {
try (NetworkClientDelegate ncd = newNetworkClientDelegate()) {
try (NetworkClientDelegate ncd = newNetworkClientDelegate(false)) {
client.setUnreachable(mockNode(), REQUEST_TIMEOUT_MS);
NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest();
ncd.add(unsentRequest);
@ -141,7 +140,7 @@ public class NetworkClientDelegateTest {
@Test
public void testTimeoutAfterSend() throws Exception {
try (NetworkClientDelegate ncd = newNetworkClientDelegate()) {
try (NetworkClientDelegate ncd = newNetworkClientDelegate(false)) {
NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest();
ncd.add(unsentRequest);
ncd.poll(0, time.milliseconds());
@ -175,7 +174,7 @@ public class NetworkClientDelegateTest {
@Test
public void testEnsureTimerSetOnAdd() {
NetworkClientDelegate ncd = newNetworkClientDelegate();
NetworkClientDelegate ncd = newNetworkClientDelegate(false);
NetworkClientDelegate.UnsentRequest findCoordRequest = newUnsentFindCoordinatorRequest();
assertNull(findCoordRequest.timer());
@ -192,7 +191,7 @@ public class NetworkClientDelegateTest {
@Test
public void testHasAnyPendingRequests() throws Exception {
try (NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate()) {
try (NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate(false)) {
NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest();
networkClientDelegate.add(unsentRequest);
@ -223,18 +222,14 @@ public class NetworkClientDelegateTest {
AuthenticationException authException = new AuthenticationException("Test Auth Exception");
doThrow(authException).when(metadata).maybeThrowAnyException();
NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate();
List<BackgroundEvent> backgroundEvents = backgroundEventHandler.drainEvents();
assertTrue(backgroundEvents.isEmpty());
NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate(false);
assertTrue(networkClientDelegate.getAndClearMetadataError().isEmpty());
networkClientDelegate.poll(0, time.milliseconds());
backgroundEvents = backgroundEventHandler.drainEvents();
assertEquals(1, backgroundEvents.size());
BackgroundEvent event = backgroundEvents.get(0);
assertInstanceOf(ErrorEvent.class, event);
ErrorEvent errorEvent = (ErrorEvent) event;
assertInstanceOf(AuthenticationException.class, errorEvent.error());
assertEquals(authException.getMessage(), errorEvent.error().getMessage());
Optional<Exception> metadataError = networkClientDelegate.getAndClearMetadataError();
assertTrue(metadataError.isPresent());
assertInstanceOf(AuthenticationException.class, metadataError.get());
assertEquals(authException.getMessage(), metadataError.get().getMessage());
}
@Test
@ -244,7 +239,7 @@ public class NetworkClientDelegateTest {
BlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>();
this.backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue, time, mock(AsyncConsumerMetrics.class));
NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate();
NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate(true);
assertEquals(0, backgroundEventQueue.size());
networkClientDelegate.poll(0, time.milliseconds());
@ -261,7 +256,7 @@ public class NetworkClientDelegateTest {
public void testRecordUnsentRequestsQueueTime(String groupName) throws Exception {
try (Metrics metrics = new Metrics();
AsyncConsumerMetrics asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, groupName);
NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate(asyncConsumerMetrics)) {
NetworkClientDelegate networkClientDelegate = newNetworkClientDelegate(false, asyncConsumerMetrics)) {
NetworkClientDelegate.UnsentRequest unsentRequest = newUnsentFindCoordinatorRequest();
networkClientDelegate.add(unsentRequest);
asyncConsumerMetrics.recordUnsentRequestsQueueSize(1, time.milliseconds());
@ -290,11 +285,11 @@ public class NetworkClientDelegateTest {
}
}
public NetworkClientDelegate newNetworkClientDelegate() {
return newNetworkClientDelegate(asyncConsumerMetrics);
public NetworkClientDelegate newNetworkClientDelegate(boolean notifyMetadataErrorsViaErrorQueue) {
return newNetworkClientDelegate(notifyMetadataErrorsViaErrorQueue, asyncConsumerMetrics);
}
public NetworkClientDelegate newNetworkClientDelegate(AsyncConsumerMetrics asyncConsumerMetrics) {
public NetworkClientDelegate newNetworkClientDelegate(boolean notifyMetadataErrorsViaErrorQueue, AsyncConsumerMetrics asyncConsumerMetrics) {
LogContext logContext = new LogContext();
Properties properties = new Properties();
properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
@ -308,6 +303,7 @@ public class NetworkClientDelegateTest {
this.client,
this.metadata,
this.backgroundEventHandler,
notifyMetadataErrorsViaErrorQueue,
asyncConsumerMetrics
);
}

View File

@ -2687,7 +2687,7 @@ public class ShareConsumeRequestManagerTest {
ConsumerConfig config = new ConsumerConfig(properties);
networkClientDelegate = spy(new TestableNetworkClientDelegate(
time, config, logContext, client, metadata,
new BackgroundEventHandler(new LinkedBlockingQueue<>(), time, mock(AsyncConsumerMetrics.class))));
new BackgroundEventHandler(new LinkedBlockingQueue<>(), time, mock(AsyncConsumerMetrics.class)), false));
}
private class TestableShareConsumeRequestManager<K, V> extends ShareConsumeRequestManager {
@ -2751,8 +2751,9 @@ public class ShareConsumeRequestManagerTest {
LogContext logContext,
KafkaClient client,
Metadata metadata,
BackgroundEventHandler backgroundEventHandler) {
super(time, config, logContext, client, metadata, backgroundEventHandler, mock(AsyncConsumerMetrics.class));
BackgroundEventHandler backgroundEventHandler,
boolean notifyMetadataErrorsViaErrorQueue) {
super(time, config, logContext, client, metadata, backgroundEventHandler, notifyMetadataErrorsViaErrorQueue, mock(AsyncConsumerMetrics.class));
}
@Override