diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java index d5b2dc02b74..d2d178a88c3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java @@ -174,7 +174,8 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable { reapExpiredApplicationEvents(currentTimeMs); List> uncompletedEvents = applicationEventReaper.uncompletedEvents(); - maybeFailOnMetadataError(uncompletedEvents); } + maybeFailOnMetadataError(uncompletedEvents); + } /** * Process the events—if any—that were produced by the application thread. @@ -192,6 +193,10 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable { try { if (event instanceof CompletableEvent) { applicationEventReaper.add((CompletableEvent) event); + // Check if there are any metadata errors and fail the CompletableEvent if an error is present. + // This call is meant to handle "immediately completed events" which may not enter the awaiting state, + // so metadata errors need to be checked and handled right away. + maybeFailOnMetadataError(List.of((CompletableEvent) event)); } applicationEventProcessor.process(event); } catch (Throwable t) { @@ -374,7 +379,7 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable { if (subscriptionMetadataEvent.isEmpty()) return; networkClientDelegate.getAndClearMetadataError().ifPresent(metadataError -> - subscriptionMetadataEvent.forEach(event -> event.future().completeExceptionally(metadataError)) + subscriptionMetadataEvent.forEach(event -> event.future().completeExceptionally(metadataError)) ); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 5bdd3296619..78ff15cee5f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -1046,7 +1046,7 @@ public class KafkaConsumerTest { }, fetchResponse(tp0, 50L, 5)); @SuppressWarnings("unchecked") - ConsumerRecords records = (ConsumerRecords) consumer.poll(Duration.ofMillis(1000)); + ConsumerRecords records = (ConsumerRecords) consumer.poll(Duration.ofMillis(1)); assertEquals(5, records.count()); assertEquals(Set.of(tp0), records.partitions()); assertEquals(1, records.nextOffsets().size()); @@ -1826,7 +1826,7 @@ public class KafkaConsumerTest { client.prepareResponse(fetchResponse(tp0, 10L, 1)); @SuppressWarnings("unchecked") - ConsumerRecords records = (ConsumerRecords) consumer.poll(Duration.ofMillis(1000)); + ConsumerRecords records = (ConsumerRecords) consumer.poll(Duration.ofMillis(1)); assertEquals(1, records.count()); assertEquals(11L, consumer.position(tp0)); assertEquals(1, records.nextOffsets().size()); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java index da68a2626a7..0347423137b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java @@ -73,16 +73,13 @@ public class NetworkClientDelegateTest { private MockTime time; private MockClient client; private Metadata metadata; - private AsyncConsumerMetrics asyncConsumerMetrics; private BackgroundEventHandler backgroundEventHandler; @BeforeEach public void setup() { this.time = new MockTime(0); this.metadata = mock(Metadata.class); - this.asyncConsumerMetrics = mock(AsyncConsumerMetrics.class); - BlockingQueue backgroundEventQueue = new LinkedBlockingQueue<>(); - this.backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue, time, asyncConsumerMetrics); + this.backgroundEventHandler = mock(BackgroundEventHandler.class); this.client = new MockClient(time, Collections.singletonList(mockNode())); } @@ -286,7 +283,7 @@ public class NetworkClientDelegateTest { } public NetworkClientDelegate newNetworkClientDelegate(boolean notifyMetadataErrorsViaErrorQueue) { - return newNetworkClientDelegate(notifyMetadataErrorsViaErrorQueue, asyncConsumerMetrics); + return newNetworkClientDelegate(notifyMetadataErrorsViaErrorQueue, mock(AsyncConsumerMetrics.class)); } public NetworkClientDelegate newNetworkClientDelegate(boolean notifyMetadataErrorsViaErrorQueue, AsyncConsumerMetrics asyncConsumerMetrics) { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java index ed37fecf28d..8c5623a7f26 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java @@ -95,6 +95,7 @@ public class ApplicationEventProcessorTest { private final StreamsMembershipManager streamsMembershipManager = mock(StreamsMembershipManager.class); private final BackgroundEventHandler backgroundEventHandler = mock(BackgroundEventHandler.class); private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker = mock(OffsetCommitCallbackInvoker.class); + private final NetworkClientDelegate networkClientDelegate = mock(NetworkClientDelegate.class); private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); private ApplicationEventProcessor processor; @@ -114,7 +115,7 @@ public class ApplicationEventProcessorTest { processor = new ApplicationEventProcessor( new LogContext(), requestManagers, - mock(NetworkClientDelegate.class), + networkClientDelegate, metadata, subscriptionState, backgroundEventHandler, @@ -139,7 +140,7 @@ public class ApplicationEventProcessorTest { processor = new ApplicationEventProcessor( new LogContext(), requestManagers, - mock(NetworkClientDelegate.class), + networkClientDelegate, metadata, subscriptionState, backgroundEventHandler,