mirror of https://github.com/apache/kafka.git
Refactor CompositePollEventInvoker to standalone class
Moved CompositePollEventInvoker from AsyncKafkaConsumer to its own file for better separation of concerns and testability. Updated AsyncKafkaConsumer to use the new class and refactored constructors accordingly. Enhanced related tests to use new helper methods for polling and exception handling, improving test clarity and reliability.
parent 2d21fa0fdf
commit b1937702d2
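At a glance, the wiring introduced by the diff below (condensed from the hunks that follow; not a complete listing): each AsyncKafkaConsumer constructor now builds the invoker with its collaborators injected, and poll(Duration) delegates to it.

    // Condensed from the hunks below; surrounding code elided.
    this.pollInvoker = new CompositePollEventInvoker(
            logContext,
            time,
            applicationEventHandler,
            this::processBackgroundEvents,                  // backgroundEventProcessor callback
            offsetCommitCallbackInvoker::executeCallbacks   // offsetCommitProcessor callback
    );

    // ...later, inside poll(Duration):
    pollInvoker.poll(timer);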
AsyncKafkaConsumer.java
@@ -199,51 +199,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         }
     }
 
-    private class CompositePollEventInvoker {
-
-        private CompositePollEvent latest;
-
-        private void poll(Timer timer) {
-            if (latest == null) {
-                submitEvent(ApplicationEvent.Type.POLL, timer);
-            }
-
-            try {
-                log.debug("Attempting to retrieve result from previously submitted {} with {} remaining on timer", latest, timer.remainingMs());
-
-                CompositePollEvent.Result result = latest.resultOrError();
-                CompositePollEvent.State state = result.state();
-
-                if (state == CompositePollEvent.State.COMPLETE) {
-                    // Make sure to clear out the latest request since it's complete.
-                    latest = null;
-                } else if (state == CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED) {
-                    processBackgroundEvents();
-                    result.nextEventType().ifPresent(t -> submitEvent(t, timer));
-                } else if (state == CompositePollEvent.State.OFFSET_COMMIT_CALLBACKS_REQUIRED) {
-                    offsetCommitCallbackInvoker.executeCallbacks();
-                    result.nextEventType().ifPresent(t -> submitEvent(t, timer));
-                } else if (state == CompositePollEvent.State.UNKNOWN) {
-                    throw new KafkaException("Unexpected poll result received");
-                }
-            } catch (Throwable t) {
-                // If an exception is hit, bubble it up to the user but make sure to clear out the latest request
-                // to signify this one is complete.
-                latest = null;
-                throw ConsumerUtils.maybeWrapAsKafkaException(t);
-            }
-        }
-
-        private void submitEvent(ApplicationEvent.Type type, Timer timer) {
-            long deadlineMs = calculateDeadlineMs(timer);
-            latest = new CompositePollEvent(deadlineMs, time.milliseconds(), type);
-            applicationEventHandler.add(latest);
-            log.debug("Submitted new {} submitted with {} remaining on timer", latest, timer.remainingMs());
-        }
-    }
-
-    private final CompositePollEventInvoker pollInvoker = new CompositePollEventInvoker();
-
     /**
      * An {@link org.apache.kafka.clients.consumer.internals.events.EventProcessor} that is created and executes in the
      * application thread for the purpose of processing {@link BackgroundEvent background events} generated by the
@@ -396,6 +351,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
     private Optional<ClientTelemetryReporter> clientTelemetryReporter = Optional.empty();
 
     private final AsyncConsumerApplicationThreadRequirement asyncApplicationThreadRequirement;
+    private final CompositePollEventInvoker pollInvoker;
     private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
     private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
     private final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker;
@@ -563,6 +519,13 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
                 new StreamsRebalanceListenerInvoker(logContext, s));
         this.backgroundEventProcessor = new BackgroundEventProcessor();
         this.backgroundEventReaper = backgroundEventReaperFactory.build(logContext);
+        this.pollInvoker = new CompositePollEventInvoker(
+            logContext,
+            time,
+            applicationEventHandler,
+            this::processBackgroundEvents,
+            offsetCommitCallbackInvoker::executeCallbacks
+        );
 
         // The FetchCollector is only used on the application thread.
         this.fetchCollector = fetchCollectorFactory.build(logContext,
@@ -637,6 +600,13 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         this.clientTelemetryReporter = Optional.empty();
         this.autoCommitEnabled = autoCommitEnabled;
         this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors);
+        this.pollInvoker = new CompositePollEventInvoker(
+            logContext,
+            time,
+            applicationEventHandler,
+            this::processBackgroundEvents,
+            offsetCommitCallbackInvoker::executeCallbacks
+        );
         this.backgroundEventHandler = new BackgroundEventHandler(
             backgroundEventQueue,
             time,
@@ -759,6 +729,13 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         this.streamsRebalanceListenerInvoker = Optional.empty();
         this.backgroundEventProcessor = new BackgroundEventProcessor();
         this.backgroundEventReaper = new CompletableEventReaper(logContext);
+        this.pollInvoker = new CompositePollEventInvoker(
+            logContext,
+            time,
+            applicationEventHandler,
+            this::processBackgroundEvents,
+            offsetCommitCallbackInvoker::executeCallbacks
+        );
     }
 
     // auxiliary interface for testing
@@ -929,8 +906,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
                 // returning the records in the fetches. Thus, we trigger a possible wake-up before we poll fetches.
                 wakeupTrigger.maybeTriggerWakeup();
 
-                processBackgroundEvents();
-                offsetCommitCallbackInvoker.executeCallbacks();
+                // processBackgroundEvents();
+                // offsetCommitCallbackInvoker.executeCallbacks();
+                pollInvoker.poll(timer);
                 final Fetch<K, V> fetch = pollForFetches(timer);
                 if (!fetch.isEmpty()) {
CompositePollEventInvoker.java (new file)
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.CompositePollEvent;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+
+import org.slf4j.Logger;
+
+import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;
+
+public class CompositePollEventInvoker {
+
+    private final Logger log;
+    private final Time time;
+    private final ApplicationEventHandler applicationEventHandler;
+    private final Runnable backgroundEventProcessor;
+    private final Runnable offsetCommitProcessor;
+    private CompositePollEvent latest;
+
+    public CompositePollEventInvoker(LogContext logContext,
+                                     Time time,
+                                     ApplicationEventHandler applicationEventHandler,
+                                     Runnable backgroundEventProcessor,
+                                     Runnable offsetCommitProcessor) {
+        this.log = logContext.logger(getClass());
+        this.time = time;
+        this.applicationEventHandler = applicationEventHandler;
+        this.backgroundEventProcessor = backgroundEventProcessor;
+        this.offsetCommitProcessor = offsetCommitProcessor;
+    }
+
+    public void poll(Timer timer) {
+        if (latest == null) {
+            log.debug("latest was null, so submitting new event...");
+            submitEvent(ApplicationEvent.Type.POLL, timer);
+        }
+
+        try {
+            log.debug("Attempting to retrieve result from previously submitted {} with {} remaining on timer", latest, timer.remainingMs());
+
+            CompositePollEvent.Result result = latest.resultOrError();
+            CompositePollEvent.State state = result.state();
+            log.debug("Retrieved result: {}, with state: {}", result, state);
+
+            if (state == CompositePollEvent.State.COMPLETE) {
+                // Make sure to clear out the latest request since it's complete.
+                log.debug("We're supposedly complete with event {}, so clearing...", latest);
+                latest = null;
+            } else if (state == CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED) {
+                log.debug("About to process background events");
+                backgroundEventProcessor.run();
+                log.debug("Done processing background events");
+                result.nextEventType().ifPresent(t -> submitEvent(t, timer));
+            } else if (state == CompositePollEvent.State.OFFSET_COMMIT_CALLBACKS_REQUIRED) {
+                log.debug("About to process offset commits");
+                offsetCommitProcessor.run();
+                log.debug("Done processing offset commits");
+                result.nextEventType().ifPresent(t -> submitEvent(t, timer));
+            } else if (state == CompositePollEvent.State.UNKNOWN) {
+                throw new KafkaException("Unexpected poll result received");
+            }
+        } catch (Throwable t) {
+            log.debug("Caught error, rethrowing...", t);
+            // If an exception is hit, bubble it up to the user but make sure to clear out the latest request
+            // to signify this one is complete.
+            latest = null;
+            throw ConsumerUtils.maybeWrapAsKafkaException(t);
+        }
+    }
+
+    private void submitEvent(ApplicationEvent.Type type, Timer timer) {
+        long deadlineMs = calculateDeadlineMs(timer);
+        latest = new CompositePollEvent(deadlineMs, time.milliseconds(), type);
+        applicationEventHandler.add(latest);
+        log.debug("Submitted new {} with {} remaining on timer", latest, timer.remainingMs());
+    }
+}
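The commit message cites testability as one motivation. As an illustration only (not part of this commit), here is a sketch of the kind of isolated unit test the extraction enables, assuming Mockito plus Kafka's MockTime and LogContext test utilities. The invoker and event APIs are taken from the file above; the test class, test name, and assertion are hypothetical.

    package org.apache.kafka.clients.consumer.internals;

    import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
    import org.apache.kafka.clients.consumer.internals.events.CompositePollEvent;
    import org.apache.kafka.common.utils.LogContext;
    import org.apache.kafka.common.utils.MockTime;
    import org.apache.kafka.common.utils.Time;

    import org.junit.jupiter.api.Test;

    import java.util.Optional;
    import java.util.concurrent.atomic.AtomicBoolean;

    import static org.junit.jupiter.api.Assertions.assertTrue;
    import static org.mockito.ArgumentMatchers.isA;
    import static org.mockito.Mockito.doAnswer;
    import static org.mockito.Mockito.mock;

    public class CompositePollEventInvokerSketchTest {

        @Test
        public void runsBackgroundProcessorWhenEventRequiresIt() {
            ApplicationEventHandler handler = mock(ApplicationEventHandler.class);
            AtomicBoolean backgroundRan = new AtomicBoolean(false);

            // Complete each submitted event with a state that demands background
            // processing, mirroring markResultForCompositePollEvent(...) in the
            // AsyncKafkaConsumerTest diff below.
            doAnswer(invocation -> {
                CompositePollEvent event = invocation.getArgument(0);
                event.complete(CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED, Optional.empty());
                return null;
            }).when(handler).add(isA(CompositePollEvent.class));

            Time time = new MockTime();
            CompositePollEventInvoker invoker = new CompositePollEventInvoker(
                    new LogContext(),
                    time,
                    handler,
                    () -> backgroundRan.set(true),  // backgroundEventProcessor callback
                    () -> { }                       // offsetCommitProcessor callback (unused here)
            );

            // No AsyncKafkaConsumer needed: the invoker is exercised through its own API.
            invoker.poll(time.timer(100));
            assertTrue(backgroundRan.get());
        }
    }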
KafkaConsumerTest.java
@@ -147,6 +147,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -2666,8 +2667,7 @@ public class KafkaConsumerTest {
         consumer.assign(Set.of(tp0));
 
         // poll once to update with the current metadata
-        consumer.poll(Duration.ofMillis(0));
-        TestUtils.waitForCondition(() -> requestGenerated(client, ApiKeys.FIND_COORDINATOR),
+        waitForConsumerPoll(() -> requestGenerated(client, ApiKeys.FIND_COORDINATOR),
             "No metadata requests sent");
         client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, metadata.fetch().nodes().get(0)));
@@ -2681,9 +2681,8 @@ public class KafkaConsumerTest {
         }
         // poll once again, which should send the list-offset request
         consumer.seek(tp0, 50L);
-        consumer.poll(Duration.ofMillis(0));
         // requests: list-offset, fetch
-        TestUtils.waitForCondition(() -> {
+        waitForConsumerPoll(() -> {
             boolean hasListOffsetRequest = requestGenerated(client, ApiKeys.LIST_OFFSETS);
             boolean hasFetchRequest = requestGenerated(client, ApiKeys.FETCH);
             return hasListOffsetRequest && hasFetchRequest;
@@ -3817,6 +3816,20 @@ public void testPollIdleRatio(GroupProtocol groupProtocol) {
         return new MetricName(NAME, "plugins", DESCRIPTION, expectedTags);
     }
 
+    private void waitForConsumerPoll(Supplier<Boolean> testCondition, String conditionDetails) {
+        try {
+            TestUtils.waitForCondition(
+                () -> {
+                    consumer.poll(Duration.ZERO);
+                    return testCondition.get();
+                },
+                conditionDetails
+            );
+        } catch (InterruptedException e) {
+            throw new InterruptException(e);
+        }
+    }
+
     private static final String NAME = "name";
     private static final String DESCRIPTION = "description";
     private static final LinkedHashMap<String, String> TAGS = new LinkedHashMap<>();
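A plausible reading of why the tests switch from a single poll-then-assert to these poll-in-a-loop helpers (the commit message does not spell it out): the composite poll event completes asynchronously on the background thread, so a single consumer.poll(Duration.ZERO) pass may return before the expected request is generated or the callback runs. Re-invoking poll() inside TestUtils.waitForCondition() keeps driving the invoker's state machine until the condition holds.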
AsyncKafkaConsumerTest.java
@@ -112,6 +112,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;

@@ -125,7 +126,9 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import java.util.function.Predicate;
+import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.stream.Stream;
@@ -509,9 +512,11 @@ public class AsyncKafkaConsumerTest {
         completeTopicSubscriptionChangeEventSuccessfully();
         consumer.subscribe(Collections.singletonList(topicName), listener);
         markReconcileAndAutoCommitCompleteForPollEvent();
-        markResultForCompositePollEvent();
-        consumer.poll(Duration.ZERO);
-        assertTrue(callbackExecuted.get());
+        markResultForCompositePollEvent(CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED);
+        waitForConsumerPoll(
+            callbackExecuted::get,
+            "Consumer.poll() did not execute callback within timeout"
+        );
     }
 
     @Test
@@ -679,8 +684,11 @@ public class AsyncKafkaConsumerTest {
         consumer.assign(Collections.singleton(new TopicPartition("foo", 0)));
         assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));
         markReconcileAndAutoCommitCompleteForPollEvent();
-        markResultForCompositePollEvent();
-        assertMockCommitCallbackInvoked(() -> consumer.poll(Duration.ZERO), callback);
+        markResultForCompositePollEvent(CompositePollEvent.State.OFFSET_COMMIT_CALLBACKS_REQUIRED);
+        waitForConsumerPoll(
+            () -> callback.invoked == 1 && callback.exception == null,
+            "Consumer.poll() did not execute the callback once (without error) in the allotted timeout"
+        );
     }
 
     @Test
@@ -1461,7 +1469,7 @@ public class AsyncKafkaConsumerTest {
         int expectedRevokedCount,
         int expectedAssignedCount,
         int expectedLostCount,
-        Optional<RuntimeException> expectedException
+        Optional<RuntimeException> expectedExceptionOpt
     ) {
         consumer = newConsumer();
         CounterConsumerRebalanceListener consumerRebalanceListener = new CounterConsumerRebalanceListener(

@@ -1480,14 +1488,21 @@ public class AsyncKafkaConsumerTest {
         }
 
         markReconcileAndAutoCommitCompleteForPollEvent();
-        markResultForCompositePollEvent();
+        markResultForCompositePollEvent(CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED);
 
         // This will trigger the background event queue to process our background event message.
         // If any error is happening inside the rebalance callbacks, we expect the first exception to be thrown from poll.
-        if (expectedException.isPresent()) {
-            Exception exception = assertThrows(expectedException.get().getClass(), () -> consumer.poll(Duration.ZERO));
-            assertEquals(expectedException.get().getMessage(), exception.getMessage());
-            assertEquals(expectedException.get().getCause(), exception.getCause());
+        if (expectedExceptionOpt.isPresent()) {
+            Exception expectedException = expectedExceptionOpt.get();
+
+            waitForConsumerPollException(
+                e ->
+                    Objects.equals(e.getClass(), expectedException.getClass()) &&
+                        Objects.equals(e.getMessage(), expectedException.getMessage()) &&
+                        Objects.equals(e.getCause(), expectedException.getCause()),
+                "Consumer.poll() did not throw the expected exception " + expectedException
+            );
         } else {
             when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
             assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
@@ -1552,10 +1567,11 @@ public class AsyncKafkaConsumerTest {
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(singletonList(new TopicPartition("topic", 0)));
         markReconcileAndAutoCommitCompleteForPollEvent();
-        markResultForCompositePollEvent();
-        final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO));
-
-        assertEquals(expectedException.getMessage(), exception.getMessage());
+        markResultForCompositePollEvent(CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED);
+        waitForConsumerPollException(
+            e -> e.getMessage().equals(expectedException.getMessage()),
+            "Consumer.poll() did not fail with expected exception " + expectedException + " within timeout"
+        );
     }
 
     @Test
@@ -1572,10 +1588,11 @@ public class AsyncKafkaConsumerTest {
         completeAssignmentChangeEventSuccessfully();
         consumer.assign(singletonList(new TopicPartition("topic", 0)));
         markReconcileAndAutoCommitCompleteForPollEvent();
-        markResultForCompositePollEvent();
-        final KafkaException exception = assertThrows(KafkaException.class, () -> consumer.poll(Duration.ZERO));
-
-        assertEquals(expectedException1.getMessage(), exception.getMessage());
+        markResultForCompositePollEvent(CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED);
+        waitForConsumerPollException(
+            e -> e.getMessage().equals(expectedException1.getMessage()),
+            "Consumer.poll() did not fail with expected exception " + expectedException1 + " within timeout"
+        );
         assertTrue(backgroundEventQueue.isEmpty());
     }
@@ -1849,7 +1866,12 @@ public class AsyncKafkaConsumerTest {
         when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
         markReconcileAndAutoCommitCompleteForPollEvent();
-        markResultForCompositePollEvent();
-        consumer.poll(Duration.ZERO);
+        markResultForCompositePollEvent(CompositePollEvent.State.BACKGROUND_EVENT_PROCESSING_REQUIRED);
+
+        waitForConsumerPoll(
+            () -> backgroundEventReaper.size() == 0,
+            "Consumer.poll() did not reap background events within timeout"
+        );
         verify(backgroundEventReaper).reap(time.milliseconds());
     }
@@ -2300,4 +2322,44 @@ public class AsyncKafkaConsumerTest {
         doAnswer(invocation -> null)
             .when(applicationEventHandler).add(ArgumentMatchers.isA(CompositePollEvent.class));
     }
 
+    private void markResultForCompositePollEvent(CompositePollEvent.State state) {
+        doAnswer(invocation -> {
+            CompositePollEvent event = invocation.getArgument(0);
+            event.complete(state, Optional.empty());
+            return null;
+        }).when(applicationEventHandler).add(ArgumentMatchers.isA(CompositePollEvent.class));
+    }
+
+    private void waitForConsumerPoll(Supplier<Boolean> testCondition, String conditionDetails) {
+        try {
+            TestUtils.waitForCondition(
+                () -> {
+                    consumer.poll(Duration.ZERO);
+                    return testCondition.get();
+                },
+                conditionDetails
+            );
+        } catch (InterruptedException e) {
+            throw new InterruptException(e);
+        }
+    }
+
+    private void waitForConsumerPollException(Function<KafkaException, Boolean> testCondition, String conditionDetails) {
+        try {
+            TestUtils.waitForCondition(
+                () -> {
+                    try {
+                        consumer.poll(Duration.ZERO);
+                        return false;
+                    } catch (KafkaException e) {
+                        return testCondition.apply(e);
+                    }
+                },
+                conditionDetails
+            );
+        } catch (InterruptedException e) {
+            throw new InterruptException(e);
+        }
+    }
 }