diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index c1aacca4ce4..8cce81dcc8c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -199,51 +199,6 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { } } - private class CompositePollEventInvoker { - - private CompositePollEvent latest; - - private void poll(Timer timer) { - if (latest == null) { - submitEvent(ApplicationEvent.Type.POLL, timer); - } - - try { - log.debug("Attempting to retrieve result from previously submitted {} with {} remaining on timer", latest, timer.remainingMs()); - - CompositePollEvent.Result result = latest.resultOrError(); - CompositePollEvent.State state = result.state(); - - if (state == CompositePollEvent.State.COMPLETE) { - // Make sure to clear out the latest request since it's complete. - latest = null; - } else if (state == CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED) { - processBackgroundEvents(); - result.nextEventType().ifPresent(t -> submitEvent(t, timer)); - } else if (state == CompositePollEvent.State.OFFSET_COMMIT_CALLBACKS_REQUIRED) { - offsetCommitCallbackInvoker.executeCallbacks(); - result.nextEventType().ifPresent(t -> submitEvent(t, timer)); - } else if (state == CompositePollEvent.State.UNKNOWN) { - throw new KafkaException("Unexpected poll result received"); - } - } catch (Throwable t) { - // If an exception is hit, bubble it up to the user but make sure to clear out the latest request - // to signify this one is complete. - latest = null; - throw ConsumerUtils.maybeWrapAsKafkaException(t); - } - } - - private void submitEvent(ApplicationEvent.Type type, Timer timer) { - long deadlineMs = calculateDeadlineMs(timer); - latest = new CompositePollEvent(deadlineMs, time.milliseconds(), type); - applicationEventHandler.add(latest); - log.debug("Submitted new {} submitted with {} remaining on timer", latest, timer.remainingMs()); - } - } - - private final CompositePollEventInvoker pollInvoker = new CompositePollEventInvoker(); - /** * An {@link org.apache.kafka.clients.consumer.internals.events.EventProcessor} that is created and executes in the * application thread for the purpose of processing {@link BackgroundEvent background events} generated by the @@ -396,6 +351,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { private Optional clientTelemetryReporter = Optional.empty(); private final AsyncConsumerApplicationThreadRequirement asyncApplicationThreadRequirement; + private final CompositePollEventInvoker pollInvoker; private final WakeupTrigger wakeupTrigger = new WakeupTrigger(); private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker; private final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker; @@ -563,6 +519,13 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { new StreamsRebalanceListenerInvoker(logContext, s)); this.backgroundEventProcessor = new BackgroundEventProcessor(); this.backgroundEventReaper = backgroundEventReaperFactory.build(logContext); + this.pollInvoker = new CompositePollEventInvoker( + logContext, + time, + applicationEventHandler, + this::processBackgroundEvents, + offsetCommitCallbackInvoker::executeCallbacks + ); // The FetchCollector is only used on the application thread. this.fetchCollector = fetchCollectorFactory.build(logContext, @@ -637,6 +600,13 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { this.clientTelemetryReporter = Optional.empty(); this.autoCommitEnabled = autoCommitEnabled; this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors); + this.pollInvoker = new CompositePollEventInvoker( + logContext, + time, + applicationEventHandler, + this::processBackgroundEvents, + offsetCommitCallbackInvoker::executeCallbacks + ); this.backgroundEventHandler = new BackgroundEventHandler( backgroundEventQueue, time, @@ -759,6 +729,13 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { this.streamsRebalanceListenerInvoker = Optional.empty(); this.backgroundEventProcessor = new BackgroundEventProcessor(); this.backgroundEventReaper = new CompletableEventReaper(logContext); + this.pollInvoker = new CompositePollEventInvoker( + logContext, + time, + applicationEventHandler, + this::processBackgroundEvents, + offsetCommitCallbackInvoker::executeCallbacks + ); } // auxiliary interface for testing @@ -929,8 +906,8 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { // returning the records in the fetches. Thus, we trigger a possible wake-up before we poll fetches. wakeupTrigger.maybeTriggerWakeup(); - processBackgroundEvents(); - offsetCommitCallbackInvoker.executeCallbacks(); +// processBackgroundEvents(); +// offsetCommitCallbackInvoker.executeCallbacks(); pollInvoker.poll(timer); final Fetch fetch = pollForFetches(timer); if (!fetch.isEmpty()) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompositePollEventInvoker.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompositePollEventInvoker.java new file mode 100644 index 00000000000..473378a82b4 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompositePollEventInvoker.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler; +import org.apache.kafka.clients.consumer.internals.events.CompositePollEvent; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs; + +public class CompositePollEventInvoker { + + private final Logger log; + private final Time time; + private final ApplicationEventHandler applicationEventHandler; + private final Runnable backgroundEventProcessor; + private final Runnable offsetCommitProcessor; + private CompositePollEvent latest; + + public CompositePollEventInvoker(LogContext logContext, + Time time, + ApplicationEventHandler applicationEventHandler, + Runnable backgroundEventProcessor, + Runnable offsetCommitProcessor) { + this.log = logContext.logger(getClass()); + this.time = time; + this.applicationEventHandler = applicationEventHandler; + this.backgroundEventProcessor = backgroundEventProcessor; + this.offsetCommitProcessor = offsetCommitProcessor; + } + + public void poll(Timer timer) { + if (latest == null) { + log.debug("latest was null, so submitting new event..."); + submitEvent(ApplicationEvent.Type.POLL, timer); + } + + try { + log.debug("Attempting to retrieve result from previously submitted {} with {} remaining on timer", latest, timer.remainingMs()); + + CompositePollEvent.Result result = latest.resultOrError(); + CompositePollEvent.State state = result.state(); + log.debug("Retrieved result: {}, with state: {}", result, state); + + if (state == CompositePollEvent.State.COMPLETE) { + // Make sure to clear out the latest request since it's complete. + log.debug("We're supposedly complete with event {}, so clearing...", latest); + latest = null; + } else if (state == CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED) { + log.debug("About to process background events"); + backgroundEventProcessor.run(); + log.debug("Done processing background events"); + result.nextEventType().ifPresent(t -> submitEvent(t, timer)); + } else if (state == CompositePollEvent.State.OFFSET_COMMIT_CALLBACKS_REQUIRED) { + log.debug("About to process offset commits"); + offsetCommitProcessor.run(); + log.debug("Done processing offset commits"); + result.nextEventType().ifPresent(t -> submitEvent(t, timer)); + } else if (state == CompositePollEvent.State.UNKNOWN) { + throw new KafkaException("Unexpected poll result received"); + } + } catch (Throwable t) { + log.debug("Caught error, rethrowing...", t); + // If an exception is hit, bubble it up to the user but make sure to clear out the latest request + // to signify this one is complete. + latest = null; + throw ConsumerUtils.maybeWrapAsKafkaException(t); + } + } + + private void submitEvent(ApplicationEvent.Type type, Timer timer) { + long deadlineMs = calculateDeadlineMs(timer); + latest = new CompositePollEvent(deadlineMs, time.milliseconds(), type); + applicationEventHandler.add(latest); + log.debug("Submitted new {} submitted with {} remaining on timer", latest, timer.remainingMs()); + } +} \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 6ff8d98d1f2..50d0b96daea 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -147,6 +147,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -2666,8 +2667,7 @@ public class KafkaConsumerTest { consumer.assign(Set.of(tp0)); // poll once to update with the current metadata - consumer.poll(Duration.ofMillis(0)); - TestUtils.waitForCondition(() -> requestGenerated(client, ApiKeys.FIND_COORDINATOR), + waitForConsumerPoll(() -> requestGenerated(client, ApiKeys.FIND_COORDINATOR), "No metadata requests sent"); client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, metadata.fetch().nodes().get(0))); @@ -2681,9 +2681,8 @@ public class KafkaConsumerTest { } // poll once again, which should send the list-offset request consumer.seek(tp0, 50L); - consumer.poll(Duration.ofMillis(0)); // requests: list-offset, fetch - TestUtils.waitForCondition(() -> { + waitForConsumerPoll(() -> { boolean hasListOffsetRequest = requestGenerated(client, ApiKeys.LIST_OFFSETS); boolean hasFetchRequest = requestGenerated(client, ApiKeys.FETCH); return hasListOffsetRequest && hasFetchRequest; @@ -3817,6 +3816,20 @@ public void testPollIdleRatio(GroupProtocol groupProtocol) { return new MetricName(NAME, "plugins", DESCRIPTION, expectedTags); } + private void waitForConsumerPoll(Supplier testCondition, String conditionDetails) { + try { + TestUtils.waitForCondition( + () -> { + consumer.poll(Duration.ZERO); + return testCondition.get(); + }, + conditionDetails + ); + } catch (InterruptedException e) { + throw new InterruptException(e); + } + } + private static final String NAME = "name"; private static final String DESCRIPTION = "description"; private static final LinkedHashMap TAGS = new LinkedHashMap<>(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index c93fdbfd56e..fa00b9ad974 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -112,6 +112,7 @@ import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.Set; @@ -125,7 +126,9 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.regex.Pattern; import java.util.stream.Stream; @@ -509,9 +512,11 @@ public class AsyncKafkaConsumerTest { completeTopicSubscriptionChangeEventSuccessfully(); consumer.subscribe(Collections.singletonList(topicName), listener); markReconcileAndAutoCommitCompleteForPollEvent(); - markResultForCompositePollEvent(); - consumer.poll(Duration.ZERO); - assertTrue(callbackExecuted.get()); + markResultForCompositePollEvent(CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED); + waitForConsumerPoll( + callbackExecuted::get, + "Consumer.poll() did not execute callback within timeout" + ); } @Test @@ -679,8 +684,11 @@ public class AsyncKafkaConsumerTest { consumer.assign(Collections.singleton(new TopicPartition("foo", 0))); assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback)); markReconcileAndAutoCommitCompleteForPollEvent(); - markResultForCompositePollEvent(); - assertMockCommitCallbackInvoked(() -> consumer.poll(Duration.ZERO), callback); + markResultForCompositePollEvent(CompositePollEvent.State.OFFSET_COMMIT_CALLBACKS_REQUIRED); + waitForConsumerPoll( + () -> callback.invoked == 1 && callback.exception == null, + "Consumer.poll() did not execute the callback once (without error) in allottec timeout" + ); } @Test @@ -1461,7 +1469,7 @@ public class AsyncKafkaConsumerTest { int expectedRevokedCount, int expectedAssignedCount, int expectedLostCount, - Optional expectedException + Optional expectedExceptionOpt ) { consumer = newConsumer(); CounterConsumerRebalanceListener consumerRebalanceListener = new CounterConsumerRebalanceListener( @@ -1480,14 +1488,21 @@ public class AsyncKafkaConsumerTest { } markReconcileAndAutoCommitCompleteForPollEvent(); - markResultForCompositePollEvent(); + markResultForCompositePollEvent(CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED); // This will trigger the background event queue to process our background event message. // If any error is happening inside the rebalance callbacks, we expect the first exception to be thrown from poll. - if (expectedException.isPresent()) { - Exception exception = assertThrows(expectedException.get().getClass(), () -> consumer.poll(Duration.ZERO)); - assertEquals(expectedException.get().getMessage(), exception.getMessage()); - assertEquals(expectedException.get().getCause(), exception.getCause()); + if (expectedExceptionOpt.isPresent()) { + Exception expectedException = expectedExceptionOpt.get(); + + waitForConsumerPollException( + e -> + Objects.equals(e.getClass(), expectedException.getClass()) && + Objects.equals(e.getMessage(), expectedException.getMessage()) && + Objects.equals(e.getCause(), expectedException.getCause()) + , + "Consumer.poll() did not throw the expected exception " + expectedException + ); } else { when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); assertDoesNotThrow(() -> consumer.poll(Duration.ZERO)); @@ -1552,10 +1567,11 @@ public class AsyncKafkaConsumerTest { completeAssignmentChangeEventSuccessfully(); consumer.assign(singletonList(new TopicPartition("topic", 0))); markReconcileAndAutoCommitCompleteForPollEvent(); - markResultForCompositePollEvent(); - final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO)); - - assertEquals(expectedException.getMessage(), exception.getMessage()); + markResultForCompositePollEvent(CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED); + waitForConsumerPollException( + e -> e.getMessage().equals(expectedException.getMessage()), + "Consumer.poll() did not fail with expected exception " + expectedException + " within timeout" + ); } @Test @@ -1572,10 +1588,11 @@ public class AsyncKafkaConsumerTest { completeAssignmentChangeEventSuccessfully(); consumer.assign(singletonList(new TopicPartition("topic", 0))); markReconcileAndAutoCommitCompleteForPollEvent(); - markResultForCompositePollEvent(); - final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO)); - - assertEquals(expectedException1.getMessage(), exception.getMessage()); + markResultForCompositePollEvent(CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED); + waitForConsumerPollException( + e -> e.getMessage().equals(expectedException1.getMessage()), + "Consumer.poll() did not fail with expected exception " + expectedException1 + " within timeout" + ); assertTrue(backgroundEventQueue.isEmpty()); } @@ -1849,7 +1866,12 @@ public class AsyncKafkaConsumerTest { when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); markReconcileAndAutoCommitCompleteForPollEvent(); markResultForCompositePollEvent(); - consumer.poll(Duration.ZERO); + markResultForCompositePollEvent(CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED); + + waitForConsumerPoll( + () -> backgroundEventReaper.size() == 0, + "Consumer.poll() did not reap background events within timeout" + ); verify(backgroundEventReaper).reap(time.milliseconds()); } @@ -2300,4 +2322,44 @@ public class AsyncKafkaConsumerTest { doAnswer(invocation -> null) .when(applicationEventHandler).add(ArgumentMatchers.isA(CompositePollEvent.class)); } + + private void markResultForCompositePollEvent(CompositePollEvent.State state) { + doAnswer(invocation -> { + CompositePollEvent event = invocation.getArgument(0); + event.complete(state, Optional.empty()); + return null; + }).when(applicationEventHandler).add(ArgumentMatchers.isA(CompositePollEvent.class)); + } + + private void waitForConsumerPoll(Supplier testCondition, String conditionDetails) { + try { + TestUtils.waitForCondition( + () -> { + consumer.poll(Duration.ZERO); + return testCondition.get(); + }, + conditionDetails + ); + } catch (InterruptedException e) { + throw new InterruptException(e); + } + } + + private void waitForConsumerPollException(Function testCondition, String conditionDetails) { + try { + TestUtils.waitForCondition( + () -> { + try { + consumer.poll(Duration.ZERO); + return false; + } catch (KafkaException e) { + return testCondition.apply(e); + } + }, + conditionDetails + ); + } catch (InterruptedException e) { + throw new InterruptException(e); + } + } }