From 4985c7de178b70c4df709ff2a6d9a2a61fb271b1 Mon Sep 17 00:00:00 2001 From: Kirk True Date: Tue, 7 Oct 2025 08:52:21 -0700 Subject: [PATCH] Updates for MetadataErrorNotifiableEvent --- .../internals/ConsumerNetworkThread.java | 45 +++++++------------ .../events/AbstractTopicMetadataEvent.java | 4 +- .../internals/events/AsyncPollEvent.java | 4 +- .../events/CheckAndUpdatePositionsEvent.java | 4 +- .../internals/events/ListOffsetsEvent.java | 4 +- .../events/MetadataErrorNotifiableEvent.java | 4 +- 6 files changed, 27 insertions(+), 38 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java index 0f1c24829d9..9666041fc64 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java @@ -199,7 +199,7 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable { // This call is meant to handle "immediately completed events" which may not enter the awaiting state, // so metadata errors need to be checked and handled right away. if (event instanceof MetadataErrorNotifiableEvent) { - if (maybeFailOnMetadataError((MetadataErrorNotifiableEvent) event)) + if (maybeFailOnMetadataError(List.of(event))) continue; } applicationEventProcessor.process(event); @@ -372,38 +372,27 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable { /** * If there is a metadata error, complete all uncompleted events that require subscription metadata. */ - private void maybeFailOnMetadataError(List> events) { - List notifiables = new ArrayList<>(); + private boolean maybeFailOnMetadataError(List events) { + List filteredEvents = new ArrayList<>(); - for (CompletableEvent ce : events) { - if (ce instanceof MetadataErrorNotifiableEvent) { - notifiables.add((MetadataErrorNotifiableEvent) ce); + for (Object obj : events) { + if (obj instanceof MetadataErrorNotifiableEvent) { + filteredEvents.add((MetadataErrorNotifiableEvent) obj); } } - if (notifiables.isEmpty()) - return; - - Optional metadataErrorOpt = networkClientDelegate.getAndClearMetadataError(); - - if (metadataErrorOpt.isEmpty()) - return; - - Exception metadataError = metadataErrorOpt.get(); - notifiables.forEach(n -> n.completeExceptionallyWithMetadataError(metadataError)); - } - - /** - * If there is a metadata error, complete this event exceptionally. - */ - private boolean maybeFailOnMetadataError(MetadataErrorNotifiableEvent notifiable) { - Optional metadataErrorOpt = networkClientDelegate.getAndClearMetadataError(); - - if (metadataErrorOpt.isEmpty()) + // Don't get-and-clear the metadata error if there are no events that will be notified. + if (filteredEvents.isEmpty()) return false; - Exception metadataError = metadataErrorOpt.get(); - notifiable.completeExceptionallyWithMetadataError(metadataError); - return true; + Optional andClearMetadataError = networkClientDelegate.getAndClearMetadataError(); + + if (andClearMetadataError.isPresent()) { + Exception metadataError = andClearMetadataError.get(); + filteredEvents.forEach(e -> e.onMetadataError(metadataError)); + return true; + } else { + return false; + } } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java index a23c5828c19..2db2b16b1d2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AbstractTopicMetadataEvent.java @@ -28,7 +28,7 @@ public abstract class AbstractTopicMetadataEvent extends CompletableApplicationE } @Override - public void completeExceptionallyWithMetadataError(Exception metadataException) { - future().completeExceptionally(metadataException); + public void onMetadataError(Exception metadataError) { + future().completeExceptionally(metadataError); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEvent.java index 07b8fffcd13..e63f85ecc33 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEvent.java @@ -85,8 +85,8 @@ public class AsyncPollEvent extends ApplicationEvent implements MetadataErrorNot } @Override - public void completeExceptionallyWithMetadataError(Exception metadataException) { - completeExceptionally(ConsumerUtils.maybeWrapAsKafkaException(metadataException)); + public void onMetadataError(Exception metadataError) { + completeExceptionally(ConsumerUtils.maybeWrapAsKafkaException(metadataError)); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java index c9cfa0219c0..4fd834eaf09 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CheckAndUpdatePositionsEvent.java @@ -44,7 +44,7 @@ public class CheckAndUpdatePositionsEvent extends CompletableApplicationEvent * * - * @param metadataException Error that originally came from {@link Metadata#maybeThrowAnyException()} + * @param metadataError Error that originally came from {@link Metadata#maybeThrowAnyException()} */ - void completeExceptionallyWithMetadataError(Exception metadataException); + void onMetadataError(Exception metadataError); }