Handle immediate metadata errors for CompletableEvents

Added logic to check and fail CompletableEvents for metadata errors immediately upon processing, ensuring events that do not enter the awaiting state are handled correctly. Updated related tests to use consistent mocks and reduced poll durations for faster execution.
This commit is contained in:
Kirk True 2025-09-20 16:12:38 -07:00
parent 18f4fa11f3
commit ea99a13021
4 changed files with 14 additions and 11 deletions

View File

@ -174,7 +174,8 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
reapExpiredApplicationEvents(currentTimeMs); reapExpiredApplicationEvents(currentTimeMs);
List<CompletableEvent<?>> uncompletedEvents = applicationEventReaper.uncompletedEvents(); List<CompletableEvent<?>> uncompletedEvents = applicationEventReaper.uncompletedEvents();
maybeFailOnMetadataError(uncompletedEvents); } maybeFailOnMetadataError(uncompletedEvents);
}
/** /**
* Process the eventsif anythat were produced by the application thread. * Process the eventsif anythat were produced by the application thread.
@ -192,6 +193,10 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
try { try {
if (event instanceof CompletableEvent) { if (event instanceof CompletableEvent) {
applicationEventReaper.add((CompletableEvent<?>) event); applicationEventReaper.add((CompletableEvent<?>) event);
// Check if there are any metadata errors and fail the CompletableEvent if an error is present.
// This call is meant to handle "immediately completed events" which may not enter the awaiting state,
// so metadata errors need to be checked and handled right away.
maybeFailOnMetadataError(List.of((CompletableEvent<?>) event));
} }
applicationEventProcessor.process(event); applicationEventProcessor.process(event);
} catch (Throwable t) { } catch (Throwable t) {
@ -374,7 +379,7 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
if (subscriptionMetadataEvent.isEmpty()) if (subscriptionMetadataEvent.isEmpty())
return; return;
networkClientDelegate.getAndClearMetadataError().ifPresent(metadataError -> networkClientDelegate.getAndClearMetadataError().ifPresent(metadataError ->
subscriptionMetadataEvent.forEach(event -> event.future().completeExceptionally(metadataError)) subscriptionMetadataEvent.forEach(event -> event.future().completeExceptionally(metadataError))
); );
} }
} }

View File

@ -1046,7 +1046,7 @@ public class KafkaConsumerTest {
}, fetchResponse(tp0, 50L, 5)); }, fetchResponse(tp0, 50L, 5));
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1000)); ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1));
assertEquals(5, records.count()); assertEquals(5, records.count());
assertEquals(Set.of(tp0), records.partitions()); assertEquals(Set.of(tp0), records.partitions());
assertEquals(1, records.nextOffsets().size()); assertEquals(1, records.nextOffsets().size());
@ -1826,7 +1826,7 @@ public class KafkaConsumerTest {
client.prepareResponse(fetchResponse(tp0, 10L, 1)); client.prepareResponse(fetchResponse(tp0, 10L, 1));
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1000)); ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1));
assertEquals(1, records.count()); assertEquals(1, records.count());
assertEquals(11L, consumer.position(tp0)); assertEquals(11L, consumer.position(tp0));
assertEquals(1, records.nextOffsets().size()); assertEquals(1, records.nextOffsets().size());

View File

@ -73,16 +73,13 @@ public class NetworkClientDelegateTest {
private MockTime time; private MockTime time;
private MockClient client; private MockClient client;
private Metadata metadata; private Metadata metadata;
private AsyncConsumerMetrics asyncConsumerMetrics;
private BackgroundEventHandler backgroundEventHandler; private BackgroundEventHandler backgroundEventHandler;
@BeforeEach @BeforeEach
public void setup() { public void setup() {
this.time = new MockTime(0); this.time = new MockTime(0);
this.metadata = mock(Metadata.class); this.metadata = mock(Metadata.class);
this.asyncConsumerMetrics = mock(AsyncConsumerMetrics.class); this.backgroundEventHandler = mock(BackgroundEventHandler.class);
BlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>();
this.backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue, time, asyncConsumerMetrics);
this.client = new MockClient(time, Collections.singletonList(mockNode())); this.client = new MockClient(time, Collections.singletonList(mockNode()));
} }
@ -286,7 +283,7 @@ public class NetworkClientDelegateTest {
} }
public NetworkClientDelegate newNetworkClientDelegate(boolean notifyMetadataErrorsViaErrorQueue) { public NetworkClientDelegate newNetworkClientDelegate(boolean notifyMetadataErrorsViaErrorQueue) {
return newNetworkClientDelegate(notifyMetadataErrorsViaErrorQueue, asyncConsumerMetrics); return newNetworkClientDelegate(notifyMetadataErrorsViaErrorQueue, mock(AsyncConsumerMetrics.class));
} }
public NetworkClientDelegate newNetworkClientDelegate(boolean notifyMetadataErrorsViaErrorQueue, AsyncConsumerMetrics asyncConsumerMetrics) { public NetworkClientDelegate newNetworkClientDelegate(boolean notifyMetadataErrorsViaErrorQueue, AsyncConsumerMetrics asyncConsumerMetrics) {

View File

@ -95,6 +95,7 @@ public class ApplicationEventProcessorTest {
private final StreamsMembershipManager streamsMembershipManager = mock(StreamsMembershipManager.class); private final StreamsMembershipManager streamsMembershipManager = mock(StreamsMembershipManager.class);
private final BackgroundEventHandler backgroundEventHandler = mock(BackgroundEventHandler.class); private final BackgroundEventHandler backgroundEventHandler = mock(BackgroundEventHandler.class);
private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker = mock(OffsetCommitCallbackInvoker.class); private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker = mock(OffsetCommitCallbackInvoker.class);
private final NetworkClientDelegate networkClientDelegate = mock(NetworkClientDelegate.class);
private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class); private final CompletableEventReaper applicationEventReaper = mock(CompletableEventReaper.class);
private ApplicationEventProcessor processor; private ApplicationEventProcessor processor;
@ -114,7 +115,7 @@ public class ApplicationEventProcessorTest {
processor = new ApplicationEventProcessor( processor = new ApplicationEventProcessor(
new LogContext(), new LogContext(),
requestManagers, requestManagers,
mock(NetworkClientDelegate.class), networkClientDelegate,
metadata, metadata,
subscriptionState, subscriptionState,
backgroundEventHandler, backgroundEventHandler,
@ -139,7 +140,7 @@ public class ApplicationEventProcessorTest {
processor = new ApplicationEventProcessor( processor = new ApplicationEventProcessor(
new LogContext(), new LogContext(),
requestManagers, requestManagers,
mock(NetworkClientDelegate.class), networkClientDelegate,
metadata, metadata,
subscriptionState, subscriptionState,
backgroundEventHandler, backgroundEventHandler,