From ea99a13021347b801b4c44a7fc600885e1752c7d Mon Sep 17 00:00:00 2001 From: Kirk True Date: Sat, 20 Sep 2025 16:12:38 -0700 Subject: [PATCH] Handle immediate metadata errors for CompletableEvents Added logic to check and fail CompletableEvents for metadata errors immediately upon processing, ensuring events that do not enter the awaiting state are handled correctly. Updated related tests to use consistent mocks and reduced poll durations for faster execution. --- .../consumer/internals/ConsumerNetworkThread.java | 9 +++++++-- .../apache/kafka/clients/consumer/KafkaConsumerTest.java | 4 ++-- .../consumer/internals/NetworkClientDelegateTest.java | 7 ++----- .../internals/events/ApplicationEventProcessorTest.java | 5 +++-- 4 files changed, 14 insertions(+), 11 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java index d5b2dc02b74..d2d178a88c3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java @@ -174,7 +174,8 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable { reapExpiredApplicationEvents(currentTimeMs); List> uncompletedEvents = applicationEventReaper.uncompletedEvents(); - maybeFailOnMetadataError(uncompletedEvents); } + maybeFailOnMetadataError(uncompletedEvents); + } /** * Process the events—if any—that were produced by the application thread. @@ -192,6 +193,10 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable { try { if (event instanceof CompletableEvent) { applicationEventReaper.add((CompletableEvent) event); + // Check if there are any metadata errors and fail the CompletableEvent if an error is present. + // This call is meant to handle "immediately completed events" which may not enter the awaiting state, + // so metadata errors need to be checked and handled right away. + maybeFailOnMetadataError(List.of((CompletableEvent) event)); } applicationEventProcessor.process(event); } catch (Throwable t) { @@ -374,7 +379,7 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable { if (subscriptionMetadataEvent.isEmpty()) return; networkClientDelegate.getAndClearMetadataError().ifPresent(metadataError -> - subscriptionMetadataEvent.forEach(event -> event.future().completeExceptionally(metadataError)) + subscriptionMetadataEvent.forEach(event -> event.future().completeExceptionally(metadataError)) ); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 5bdd3296619..78ff15cee5f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -1046,7 +1046,7 @@ public class KafkaConsumerTest { }, fetchResponse(tp0, 50L, 5)); @SuppressWarnings("unchecked") - ConsumerRecords records = (ConsumerRecords) consumer.poll(Duration.ofMillis(1000)); + ConsumerRecords records = (ConsumerRecords) consumer.poll(Duration.ofMillis(1)); assertEquals(5, records.count()); assertEquals(Set.of(tp0), records.partitions()); assertEquals(1, records.nextOffsets().size()); @@ -1826,7 +1826,7 @@ public class KafkaConsumerTest { client.prepareResponse(fetchResponse(tp0, 10L, 1)); @SuppressWarnings("unchecked") - ConsumerRecords records = (ConsumerRecords) consumer.poll(Duration.ofMillis(1000)); + ConsumerRecords records = (ConsumerRecords) consumer.poll(Duration.ofMillis(1)); assertEquals(1, records.count()); assertEquals(11L, consumer.position(tp0)); assertEquals(1, records.nextOffsets().size()); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java index da68a2626a7..0347423137b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java @@ -73,16 +73,13 @@ public class NetworkClientDelegateTest { private MockTime time; private MockClient client; private Metadata metadata; - private AsyncConsumerMetrics asyncConsumerMetrics; private BackgroundEventHandler backgroundEventHandler; @BeforeEach public void setup() { this.time = new MockTime(0); this.metadata = mock(Metadata.class); - this.asyncConsumerMetrics = mock(AsyncConsumerMetrics.class); - BlockingQueue backgroundEventQueue = new LinkedBlockingQueue<>(); - this.backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue, time, asyncConsumerMetrics); + this.backgroundEventHandler = mock(BackgroundEventHandler.class); this.client = new MockClient(time, Collections.singletonList(mockNode())); } @@ -286,7 +283,7 @@ public class NetworkClientDelegateTest { } public NetworkClientDelegate newNetworkClientDelegate(boolean notifyMetadataErrorsViaErrorQueue) { - return newNetworkClientDelegate(notifyMetadataErrorsViaErrorQueue, asyncConsumerMetrics); + return newNetworkClientDelegate(notifyMetadataErrorsViaErrorQueue, mock(AsyncConsumerMetrics.class)); } public NetworkClientDelegate newNetworkClientDelegate(boolean notifyMetadataErrorsViaErrorQueue, AsyncConsumerMetrics asyncConsumerMetrics) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java index ed37fecf28d..8c5623a7f26 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java @@ -95,6 +95,7 @@ public class ApplicationEventProcessorTest { private final StreamsMembershipManager streamsMembershipManager = mock(StreamsMembershipManager.class); private final BackgroundEventHandler backgroundEventHandler = mock(BackgroundEventHandler.class); private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker = mock(OffsetCommitCallbackInvoker.class); + private final NetworkClientDelegate networkClientDelegate = mock(NetworkClientDelegate.class); private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); private ApplicationEventProcessor processor; @@ -114,7 +115,7 @@ public class ApplicationEventProcessorTest { processor = new ApplicationEventProcessor( new LogContext(), requestManagers, - mock(NetworkClientDelegate.class), + networkClientDelegate, metadata, subscriptionState, backgroundEventHandler, @@ -139,7 +140,7 @@ public class ApplicationEventProcessorTest { processor = new ApplicationEventProcessor( new LogContext(), requestManagers, - mock(NetworkClientDelegate.class), + networkClientDelegate, metadata, subscriptionState, backgroundEventHandler,