mirror of https://github.com/apache/kafka.git
Refactor POLL into ASYNC_POLL and SHARE_POLL
This commit is contained in:
parent
5041a36e6a
commit
f6864a3ac4
|
@ -41,6 +41,8 @@ import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandle
|
|||
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
|
||||
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.AsyncPollEventProcessorContext;
|
||||
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
|
||||
import org.apache.kafka.clients.consumer.internals.events.CheckAndUpdatePositionsEvent;
|
||||
|
@ -49,8 +51,6 @@ import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent;
|
|||
import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
|
||||
import org.apache.kafka.clients.consumer.internals.events.CompositePollEventInvoker;
|
||||
import org.apache.kafka.clients.consumer.internals.events.CompositePollEventProcessorContext;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent;
|
||||
|
@ -326,7 +326,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
|||
// Init value is needed to avoid NPE in case of exception raised in the constructor
|
||||
private Optional<ClientTelemetryReporter> clientTelemetryReporter = Optional.empty();
|
||||
|
||||
private final CompositePollEventInvoker pollInvoker;
|
||||
private AsyncPollEvent inflightPoll;
|
||||
private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
|
||||
private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
|
||||
private final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker;
|
||||
|
@ -462,7 +462,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
|||
streamsRebalanceData
|
||||
);
|
||||
final CompletableEventReaper applicationEventReaper = new CompletableEventReaper(logContext);
|
||||
final Supplier<CompositePollEventProcessorContext> compositePollContextSupplier = CompositePollEventProcessorContext.supplier(
|
||||
final Supplier<AsyncPollEventProcessorContext> asyncPollContextSupplier = AsyncPollEventProcessorContext.supplier(
|
||||
logContext,
|
||||
networkClientDelegateSupplier,
|
||||
backgroundEventHandler,
|
||||
|
@ -474,7 +474,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
|||
metadata,
|
||||
subscriptions,
|
||||
requestManagersSupplier,
|
||||
compositePollContextSupplier
|
||||
asyncPollContextSupplier
|
||||
);
|
||||
this.applicationEventHandler = applicationEventHandlerFactory.build(
|
||||
logContext,
|
||||
|
@ -496,15 +496,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
|||
new StreamsRebalanceListenerInvoker(logContext, s));
|
||||
this.backgroundEventProcessor = new BackgroundEventProcessor();
|
||||
this.backgroundEventReaper = backgroundEventReaperFactory.build(logContext);
|
||||
this.pollInvoker = new CompositePollEventInvoker(
|
||||
logContext,
|
||||
time,
|
||||
applicationEventHandler,
|
||||
() -> {
|
||||
offsetCommitCallbackInvoker.executeCallbacks();
|
||||
processBackgroundEvents();
|
||||
}
|
||||
);
|
||||
|
||||
// The FetchCollector is only used on the application thread.
|
||||
this.fetchCollector = fetchCollectorFactory.build(logContext,
|
||||
|
@ -579,15 +570,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
|||
this.clientTelemetryReporter = Optional.empty();
|
||||
this.autoCommitEnabled = autoCommitEnabled;
|
||||
this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(interceptors);
|
||||
this.pollInvoker = new CompositePollEventInvoker(
|
||||
logContext,
|
||||
time,
|
||||
applicationEventHandler,
|
||||
() -> {
|
||||
offsetCommitCallbackInvoker.executeCallbacks();
|
||||
processBackgroundEvents();
|
||||
}
|
||||
);
|
||||
this.backgroundEventHandler = new BackgroundEventHandler(
|
||||
backgroundEventQueue,
|
||||
time,
|
||||
|
@ -682,7 +664,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
|||
Optional.empty()
|
||||
);
|
||||
final CompletableEventReaper applicationEventReaper = new CompletableEventReaper(logContext);
|
||||
final Supplier<CompositePollEventProcessorContext> compositePollContextSupplier = CompositePollEventProcessorContext.supplier(
|
||||
final Supplier<AsyncPollEventProcessorContext> asyncPollContextSupplier = AsyncPollEventProcessorContext.supplier(
|
||||
logContext,
|
||||
networkClientDelegateSupplier,
|
||||
backgroundEventHandler,
|
||||
|
@ -695,7 +677,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
|||
metadata,
|
||||
subscriptions,
|
||||
requestManagersSupplier,
|
||||
compositePollContextSupplier
|
||||
asyncPollContextSupplier
|
||||
);
|
||||
this.applicationEventHandler = new ApplicationEventHandler(logContext,
|
||||
time,
|
||||
|
@ -708,15 +690,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
|||
this.streamsRebalanceListenerInvoker = Optional.empty();
|
||||
this.backgroundEventProcessor = new BackgroundEventProcessor();
|
||||
this.backgroundEventReaper = new CompletableEventReaper(logContext);
|
||||
this.pollInvoker = new CompositePollEventInvoker(
|
||||
logContext,
|
||||
time,
|
||||
applicationEventHandler,
|
||||
() -> {
|
||||
offsetCommitCallbackInvoker.executeCallbacks();
|
||||
processBackgroundEvents();
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
// auxiliary interface for testing
|
||||
|
@ -887,7 +860,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
|||
// returning the records in the fetches. Thus, we trigger a possible wake-up before we poll fetches.
|
||||
wakeupTrigger.maybeTriggerWakeup();
|
||||
|
||||
pollInvoker.poll(timer);
|
||||
checkInflightPollResult(timer);
|
||||
final Fetch<K, V> fetch = pollForFetches(timer);
|
||||
if (!fetch.isEmpty()) {
|
||||
// before returning the fetched records, we can send off the next round of fetches
|
||||
|
@ -915,6 +888,77 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@code checkInflightPollResult()} manages the lifetime of the {@link AsyncPollEvent} processing. If it is
|
||||
* called when no event is currently processing, it will start a new event processing asynchronously. A check
|
||||
* is made during each invocation to see if the <em>inflight</em> event has reached a
|
||||
* {@link AsyncPollEvent.State terminal state}. If it has, the result will be processed accordingly.
|
||||
*/
|
||||
public void checkInflightPollResult(Timer timer) {
|
||||
if (inflightPoll == null) {
|
||||
log.trace("No existing inflight async poll event, submitting a new event");
|
||||
submitEvent(ApplicationEvent.Type.ASYNC_POLL, timer);
|
||||
}
|
||||
|
||||
try {
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace(
|
||||
"Attempting to retrieve result from previously submitted {} with {} remaining on timer",
|
||||
inflightPoll,
|
||||
timer.remainingMs()
|
||||
);
|
||||
}
|
||||
|
||||
// Result should be non-null and starts off as State.STARTED.
|
||||
AsyncPollEvent.Result result = inflightPoll.result();
|
||||
AsyncPollEvent.State state = result.state();
|
||||
|
||||
if (state == AsyncPollEvent.State.SUCCEEDED) {
|
||||
// The async poll event has completed all the requisite stages, though it does not imply that
|
||||
// there is data in the FetchBuffer yet. Make sure to clear out the inflight request.
|
||||
log.trace("Event {} completed, clearing inflight", inflightPoll);
|
||||
inflightPoll = null;
|
||||
} else if (state == AsyncPollEvent.State.FAILED) {
|
||||
// The async poll failed at one of the stages. Make sure to clear out the inflight request
|
||||
// before the underlying error is surfaced to the user.
|
||||
log.trace("Event {} failed, clearing inflight", inflightPoll);
|
||||
inflightPoll = null;
|
||||
|
||||
throw result.asKafkaException();
|
||||
} else if (state == AsyncPollEvent.State.CALLBACKS_REQUIRED) {
|
||||
// The background thread detected that it needed to yield to the application thread to invoke
|
||||
// callbacks. Even though the inflight reference _should_ be overwritten when the next stage of
|
||||
// the event is submitted, go ahead and clear out the inflight request just to be sure.
|
||||
log.trace("Event {} paused for callbacks, clearing inflight", inflightPoll);
|
||||
inflightPoll = null;
|
||||
|
||||
// Note: this is calling user-supplied code, so make sure to handle possible errors.
|
||||
offsetCommitCallbackInvoker.executeCallbacks();
|
||||
processBackgroundEvents();
|
||||
|
||||
// The application thread callbacks are complete. Create another event to resume the polling at
|
||||
// the next stage.
|
||||
submitEvent(result.asNextEventType(), timer);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
// If an exception is hit, bubble it up to the user but make sure to clear out the inflight request
|
||||
// because the error effectively renders it complete.
|
||||
log.debug("Event {} failed due to {}, clearing inflight", inflightPoll, String.valueOf(t));
|
||||
inflightPoll = null;
|
||||
throw ConsumerUtils.maybeWrapAsKafkaException(t);
|
||||
}
|
||||
}
|
||||
|
||||
private void submitEvent(ApplicationEvent.Type type, Timer timer) {
|
||||
long deadlineMs = calculateDeadlineMs(timer);
|
||||
long pollTimeMs = time.milliseconds();
|
||||
inflightPoll = new AsyncPollEvent(deadlineMs, pollTimeMs, type);
|
||||
applicationEventHandler.add(inflightPoll);
|
||||
|
||||
if (log.isTraceEnabled())
|
||||
log.trace("Submitted new {} with {} remaining on timer", inflightPoll, timer.remainingMs());
|
||||
}
|
||||
|
||||
/**
|
||||
* Commit offsets returned on the last {@link #poll(Duration) poll()} for all the subscribed list of topics and
|
||||
* partitions.
|
||||
|
|
|
@ -38,13 +38,13 @@ import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
|
|||
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
|
||||
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeAsyncEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeSyncEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.SharePollEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent;
|
||||
|
@ -583,7 +583,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
|
|||
|
||||
do {
|
||||
// Make sure the network thread can tell the application is actively polling
|
||||
applicationEventHandler.add(new PollEvent(timer.currentTimeMs()));
|
||||
applicationEventHandler.add(new SharePollEvent(timer.currentTimeMs()));
|
||||
|
||||
processBackgroundEvents();
|
||||
|
||||
|
|
|
@ -28,14 +28,14 @@ import java.util.Objects;
|
|||
public abstract class ApplicationEvent {
|
||||
|
||||
public enum Type {
|
||||
COMMIT_ASYNC, COMMIT_SYNC, COMPOSITE_POLL, POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE,
|
||||
COMMIT_ASYNC, COMMIT_SYNC, ASYNC_POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE,
|
||||
LIST_OFFSETS, CHECK_AND_UPDATE_POSITIONS, RESET_OFFSET, TOPIC_METADATA, ALL_TOPICS_METADATA,
|
||||
TOPIC_SUBSCRIPTION_CHANGE, TOPIC_PATTERN_SUBSCRIPTION_CHANGE, TOPIC_RE2J_PATTERN_SUBSCRIPTION_CHANGE,
|
||||
UPDATE_SUBSCRIPTION_METADATA, UNSUBSCRIBE,
|
||||
CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED,
|
||||
COMMIT_ON_CLOSE, CREATE_FETCH_REQUESTS, LEAVE_GROUP_ON_CLOSE, STOP_FIND_COORDINATOR_ON_CLOSE,
|
||||
PAUSE_PARTITIONS, RESUME_PARTITIONS, CURRENT_LAG,
|
||||
SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
|
||||
SHARE_POLL, SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
|
||||
SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE,
|
||||
SHARE_ACKNOWLEDGE_ON_CLOSE,
|
||||
SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK_REGISTRATION,
|
||||
|
|
|
@ -61,19 +61,19 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
|
|||
private final ConsumerMetadata metadata;
|
||||
private final SubscriptionState subscriptions;
|
||||
private final RequestManagers requestManagers;
|
||||
private final Optional<CompositePollEventProcessorContext> compositePollContext;
|
||||
private final Optional<AsyncPollEventProcessorContext> asyncPollContext;
|
||||
private int metadataVersionSnapshot;
|
||||
|
||||
public ApplicationEventProcessor(final LogContext logContext,
|
||||
final RequestManagers requestManagers,
|
||||
final ConsumerMetadata metadata,
|
||||
final SubscriptionState subscriptions,
|
||||
final Optional<CompositePollEventProcessorContext> compositePollContext) {
|
||||
final Optional<AsyncPollEventProcessorContext> asyncPollContext) {
|
||||
this.log = logContext.logger(ApplicationEventProcessor.class);
|
||||
this.requestManagers = requestManagers;
|
||||
this.metadata = metadata;
|
||||
this.subscriptions = subscriptions;
|
||||
this.compositePollContext = compositePollContext;
|
||||
this.asyncPollContext = asyncPollContext;
|
||||
this.metadataVersionSnapshot = metadata.updateVersion();
|
||||
}
|
||||
|
||||
|
@ -88,16 +88,20 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
|
|||
final RequestManagers requestManagers,
|
||||
final ConsumerMetadata metadata,
|
||||
final SubscriptionState subscriptions,
|
||||
final CompositePollEventProcessorContext compositePollContext) {
|
||||
this(logContext, requestManagers, metadata, subscriptions, Optional.of(compositePollContext));
|
||||
final AsyncPollEventProcessorContext asyncPollContext) {
|
||||
this(logContext, requestManagers, metadata, subscriptions, Optional.of(asyncPollContext));
|
||||
}
|
||||
|
||||
@SuppressWarnings({"CyclomaticComplexity", "JavaNCSSCheck"})
|
||||
@Override
|
||||
public void process(ApplicationEvent event) {
|
||||
switch (event.type()) {
|
||||
case COMPOSITE_POLL:
|
||||
process((CompositePollEvent) event);
|
||||
case ASYNC_POLL:
|
||||
process((AsyncPollEvent) event);
|
||||
return;
|
||||
|
||||
case SHARE_POLL:
|
||||
process((SharePollEvent) event);
|
||||
return;
|
||||
|
||||
case COMMIT_ASYNC:
|
||||
|
@ -108,10 +112,6 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
|
|||
process((SyncCommitEvent) event);
|
||||
return;
|
||||
|
||||
case POLL:
|
||||
process((PollEvent) event);
|
||||
return;
|
||||
|
||||
case FETCH_COMMITTED_OFFSETS:
|
||||
process((FetchCommittedOffsetsEvent) event);
|
||||
return;
|
||||
|
@ -241,38 +241,13 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
|
|||
}
|
||||
}
|
||||
|
||||
private void process(final PollEvent event) {
|
||||
processPollEvent(event.pollTimeMs());
|
||||
event.markReconcileAndAutoCommitComplete();
|
||||
}
|
||||
|
||||
private void processPollEvent(final long pollTimeMs) {
|
||||
// Trigger a reconciliation that can safely commit offsets if needed to rebalance,
|
||||
// as we're processing before any new fetching starts in the app thread
|
||||
private void process(final SharePollEvent event) {
|
||||
requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
|
||||
consumerMembershipManager.maybeReconcile(true));
|
||||
if (requestManagers.commitRequestManager.isPresent()) {
|
||||
CommitRequestManager commitRequestManager = requestManagers.commitRequestManager.get();
|
||||
commitRequestManager.updateTimerAndMaybeCommit(pollTimeMs);
|
||||
// all commit request generation points have been passed,
|
||||
// so it's safe to notify the app thread could proceed and start fetching
|
||||
requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
|
||||
hrm.membershipManager().onConsumerPoll();
|
||||
hrm.resetPollTimer(pollTimeMs);
|
||||
});
|
||||
requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm -> {
|
||||
hrm.membershipManager().onConsumerPoll();
|
||||
hrm.resetPollTimer(pollTimeMs);
|
||||
});
|
||||
} else {
|
||||
// safe to unblock - no auto-commit risk here:
|
||||
// 1. commitRequestManager is not present
|
||||
// 2. shareConsumer has no auto-commit mechanism
|
||||
requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
|
||||
hrm.membershipManager().onConsumerPoll();
|
||||
hrm.resetPollTimer(pollTimeMs);
|
||||
});
|
||||
}
|
||||
requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
|
||||
hrm.membershipManager().onConsumerPoll();
|
||||
hrm.resetPollTimer(event.pollTimeMs());
|
||||
});
|
||||
}
|
||||
|
||||
private void process(final CreateFetchRequestsEvent event) {
|
||||
|
@ -757,16 +732,35 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
|
|||
requestManagers.streamsMembershipManager.get().onAllTasksLostCallbackCompleted(event);
|
||||
}
|
||||
|
||||
private void process(final CompositePollEvent event) {
|
||||
CompositePollEventProcessorContext context = compositePollContext.orElseThrow(IllegalArgumentException::new);
|
||||
private void process(final AsyncPollEvent event) {
|
||||
AsyncPollEventProcessorContext context = asyncPollContext.orElseThrow(IllegalArgumentException::new);
|
||||
ApplicationEvent.Type nextEventType = event.startingEventType();
|
||||
|
||||
if (context.maybeCompleteExceptionally(event) || context.maybeCompleteWithCallbackRequired(event, nextEventType))
|
||||
return;
|
||||
|
||||
if (nextEventType == ApplicationEvent.Type.POLL) {
|
||||
if (nextEventType == ApplicationEvent.Type.ASYNC_POLL) {
|
||||
log.debug("Processing {} logic for {}", nextEventType, event);
|
||||
processPollEvent(event.pollTimeMs());
|
||||
|
||||
// Trigger a reconciliation that can safely commit offsets if needed to rebalance,
|
||||
// as we're processing before any new fetching starts
|
||||
requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
|
||||
consumerMembershipManager.maybeReconcile(true));
|
||||
|
||||
if (requestManagers.commitRequestManager.isPresent()) {
|
||||
CommitRequestManager commitRequestManager = requestManagers.commitRequestManager.get();
|
||||
commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs());
|
||||
|
||||
requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
|
||||
hrm.membershipManager().onConsumerPoll();
|
||||
hrm.resetPollTimer(event.pollTimeMs());
|
||||
});
|
||||
requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm -> {
|
||||
hrm.membershipManager().onConsumerPoll();
|
||||
hrm.resetPollTimer(event.pollTimeMs());
|
||||
});
|
||||
}
|
||||
|
||||
nextEventType = ApplicationEvent.Type.UPDATE_SUBSCRIPTION_METADATA;
|
||||
|
||||
if (context.maybeCompleteExceptionally(event) || context.maybeCompleteWithCallbackRequired(event, nextEventType))
|
||||
|
@ -805,7 +799,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
|
|||
return;
|
||||
}
|
||||
|
||||
context.completeExceptionally(event, new KafkaException("Unknown next step for composite poll: " + nextEventType));
|
||||
context.completeExceptionally(event, new KafkaException("Unknown next step for async poll: " + nextEventType));
|
||||
}
|
||||
|
||||
private <T> BiConsumer<? super T, ? super Throwable> complete(final CompletableFuture<T> b) {
|
||||
|
@ -847,18 +841,18 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
|
|||
final ConsumerMetadata metadata,
|
||||
final SubscriptionState subscriptions,
|
||||
final Supplier<RequestManagers> requestManagersSupplier,
|
||||
final Supplier<CompositePollEventProcessorContext> compositePollEventProcessorContextSupplier) {
|
||||
final Supplier<AsyncPollEventProcessorContext> asyncPollContextSupplier) {
|
||||
return new CachedSupplier<>() {
|
||||
@Override
|
||||
protected ApplicationEventProcessor create() {
|
||||
RequestManagers requestManagers = requestManagersSupplier.get();
|
||||
CompositePollEventProcessorContext compositePollContext = compositePollEventProcessorContextSupplier.get();
|
||||
AsyncPollEventProcessorContext asyncPollContext = asyncPollContextSupplier.get();
|
||||
return new ApplicationEventProcessor(
|
||||
logContext,
|
||||
requestManagers,
|
||||
metadata,
|
||||
subscriptions,
|
||||
compositePollContext
|
||||
asyncPollContext
|
||||
);
|
||||
}
|
||||
};
|
||||
|
|
|
@ -33,7 +33,7 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
* This class represents the non-blocking event that executes logic functionally equivalent to the following:
|
||||
*
|
||||
* <ul>
|
||||
* <li>{@link PollEvent}</li>
|
||||
* <li>{@link SharePollEvent}</li>
|
||||
* <li>{@link UpdatePatternSubscriptionEvent}</li>
|
||||
* <li>{@link CheckAndUpdatePositionsEvent}</li>
|
||||
* <li>{@link CreateFetchRequestsEvent}</li>
|
||||
|
@ -46,8 +46,8 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
*
|
||||
* <p/>
|
||||
*
|
||||
* When the {@code CompositePollEvent} is created, it exists in the {@link State#STARTED} state. The background
|
||||
* thread will execute the {@code CompositePollEvent} until it completes successfully ({@link State#SUCCEEDED}),
|
||||
* When the {@code AsyncPollEvent} is created, it exists in the {@link State#STARTED} state. The background
|
||||
* thread will execute the {@code AsyncPollEvent} until it completes successfully ({@link State#SUCCEEDED}),
|
||||
* hits an error ({@link State#FAILED}), or detects that the application thread needs to execute callbacks
|
||||
* ({@link State#CALLBACKS_REQUIRED}).
|
||||
*
|
||||
|
@ -58,7 +58,7 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
* application thread. The background thread is able to detect when it needs to complete processing so that the
|
||||
* application thread can execute the awaiting callbacks.
|
||||
*/
|
||||
public class CompositePollEvent extends ApplicationEvent {
|
||||
public class AsyncPollEvent extends ApplicationEvent {
|
||||
|
||||
public enum State {
|
||||
|
||||
|
@ -131,8 +131,8 @@ public class CompositePollEvent extends ApplicationEvent {
|
|||
}
|
||||
|
||||
private static final List<Type> ALLOWED_STARTING_EVENT_TYPES = List.of(
|
||||
Type.ASYNC_POLL,
|
||||
Type.CHECK_AND_UPDATE_POSITIONS,
|
||||
Type.POLL,
|
||||
Type.UPDATE_SUBSCRIPTION_METADATA
|
||||
);
|
||||
private final long deadlineMs;
|
||||
|
@ -140,6 +140,17 @@ public class CompositePollEvent extends ApplicationEvent {
|
|||
private final Type startingEventType;
|
||||
private final AtomicReference<Result> result;
|
||||
|
||||
/**
|
||||
* Creates a new event to signify a multi-stage processing of {@link Consumer#poll(Duration)} logic.
|
||||
*
|
||||
* @param deadlineMs Time, in milliseconds, at which point the event must be completed; based on the
|
||||
* {@link Duration} passed to {@link Consumer#poll(Duration)}
|
||||
* @param pollTimeMs Time, in milliseconds, at which point the event was created
|
||||
*/
|
||||
public AsyncPollEvent(long deadlineMs, long pollTimeMs) {
|
||||
this(deadlineMs, pollTimeMs, Type.ASYNC_POLL);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new event to signify a multi-stage processing of {@link Consumer#poll(Duration)} logic.
|
||||
*
|
||||
|
@ -148,8 +159,8 @@ public class CompositePollEvent extends ApplicationEvent {
|
|||
* @param pollTimeMs Time, in milliseconds, at which point the event was created
|
||||
* @param startingEventType {@link ApplicationEvent.Type} that serves as the starting point for the event processing
|
||||
*/
|
||||
public CompositePollEvent(long deadlineMs, long pollTimeMs, Type startingEventType) {
|
||||
super(Type.COMPOSITE_POLL);
|
||||
public AsyncPollEvent(long deadlineMs, long pollTimeMs, Type startingEventType) {
|
||||
super(Type.ASYNC_POLL);
|
||||
this.deadlineMs = deadlineMs;
|
||||
this.pollTimeMs = pollTimeMs;
|
||||
|
|
@ -35,10 +35,10 @@ import java.util.function.Supplier;
|
|||
|
||||
/**
|
||||
* This provides the context for the {@link ApplicationEventProcessor#process(ApplicationEvent)} that invokes the
|
||||
* {@link CompositePollEvent} process method. This is mostly to avoid polluting the {@link ApplicationEventProcessor}
|
||||
* with instance variables and logic that's specific only to the background {@link CompositePollEvent} processing.
|
||||
* {@link AsyncPollEvent} process method. This is mostly to avoid polluting the {@link ApplicationEventProcessor}
|
||||
* with instance variables and logic that's specific only to the background {@link AsyncPollEvent} processing.
|
||||
*/
|
||||
public class CompositePollEventProcessorContext {
|
||||
public class AsyncPollEventProcessorContext {
|
||||
|
||||
private final Logger log;
|
||||
private final NetworkClientDelegate networkClientDelegate;
|
||||
|
@ -47,12 +47,12 @@ public class CompositePollEventProcessorContext {
|
|||
private final CompletableEventReaper applicationEventReaper;
|
||||
private final FetchBuffer fetchBuffer;
|
||||
|
||||
private CompositePollEventProcessorContext(LogContext logContext,
|
||||
NetworkClientDelegate networkClientDelegate,
|
||||
BackgroundEventHandler backgroundEventHandler,
|
||||
OffsetCommitCallbackInvoker offsetCommitCallbackInvoker,
|
||||
CompletableEventReaper applicationEventReaper,
|
||||
FetchBuffer fetchBuffer) {
|
||||
private AsyncPollEventProcessorContext(LogContext logContext,
|
||||
NetworkClientDelegate networkClientDelegate,
|
||||
BackgroundEventHandler backgroundEventHandler,
|
||||
OffsetCommitCallbackInvoker offsetCommitCallbackInvoker,
|
||||
CompletableEventReaper applicationEventReaper,
|
||||
FetchBuffer fetchBuffer) {
|
||||
this.log = logContext.logger(getClass());
|
||||
this.networkClientDelegate = networkClientDelegate;
|
||||
this.backgroundEventHandler = backgroundEventHandler;
|
||||
|
@ -65,18 +65,18 @@ public class CompositePollEventProcessorContext {
|
|||
* Creates a {@link Supplier} for deferred creation during invocation by
|
||||
* {@link ConsumerNetworkThread}.
|
||||
*/
|
||||
public static Supplier<CompositePollEventProcessorContext> supplier(LogContext logContext,
|
||||
Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
|
||||
BackgroundEventHandler backgroundEventHandler,
|
||||
OffsetCommitCallbackInvoker offsetCommitCallbackInvoker,
|
||||
CompletableEventReaper applicationEventReaper,
|
||||
FetchBuffer fetchBuffer) {
|
||||
public static Supplier<AsyncPollEventProcessorContext> supplier(LogContext logContext,
|
||||
Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
|
||||
BackgroundEventHandler backgroundEventHandler,
|
||||
OffsetCommitCallbackInvoker offsetCommitCallbackInvoker,
|
||||
CompletableEventReaper applicationEventReaper,
|
||||
FetchBuffer fetchBuffer) {
|
||||
return new CachedSupplier<>() {
|
||||
@Override
|
||||
protected CompositePollEventProcessorContext create() {
|
||||
protected AsyncPollEventProcessorContext create() {
|
||||
NetworkClientDelegate networkClientDelegate = networkClientDelegateSupplier.get();
|
||||
|
||||
return new CompositePollEventProcessorContext(
|
||||
return new AsyncPollEventProcessorContext(
|
||||
logContext,
|
||||
networkClientDelegate,
|
||||
backgroundEventHandler,
|
||||
|
@ -117,20 +117,23 @@ public class CompositePollEventProcessorContext {
|
|||
|
||||
/**
|
||||
* Helper method that will check if any application thread user callbacks need to be executed. If so, the
|
||||
* current event will be completed with {@link CompositePollEvent.State#CALLBACKS_REQUIRED} and this method
|
||||
* current event will be completed with {@link AsyncPollEvent.State#CALLBACKS_REQUIRED} and this method
|
||||
* will return {@code true}. Otherwise, it will return {@code false}.
|
||||
*/
|
||||
public boolean maybeCompleteWithCallbackRequired(CompositePollEvent event, ApplicationEvent.Type nextEventType) {
|
||||
public boolean maybeCompleteWithCallbackRequired(AsyncPollEvent event, ApplicationEvent.Type nextEventType) {
|
||||
// If there are background events to process or enqueued callbacks to invoke, exit to
|
||||
// the application thread.
|
||||
if (backgroundEventHandler.size() > 0 || offsetCommitCallbackInvoker.size() > 0) {
|
||||
log.trace(
|
||||
"Pausing polling by completing {} with the state of {} and the next stage of {}",
|
||||
event,
|
||||
CompositePollEvent.State.CALLBACKS_REQUIRED,
|
||||
AsyncPollEvent.State.CALLBACKS_REQUIRED,
|
||||
nextEventType
|
||||
);
|
||||
event.completeWithCallbackRequired(nextEventType);
|
||||
|
||||
// This to ensure that the application thread doesn't needlessly wait for the full time out if it's
|
||||
// been detected that a callback is required.
|
||||
fetchBuffer.wakeup();
|
||||
return true;
|
||||
}
|
||||
|
@ -142,10 +145,10 @@ public class CompositePollEventProcessorContext {
|
|||
* Helper method that checks if there's a non-null error from
|
||||
* {@link NetworkClientDelegate#getAndClearMetadataError()} or if the provided exception is not a timeout-based
|
||||
* exception. If there's an error to report to the user, the current event will be completed with
|
||||
* {@link CompositePollEvent.State#FAILED} and this method will return {@code true}. Otherwise, it will
|
||||
* {@link AsyncPollEvent.State#FAILED} and this method will return {@code true}. Otherwise, it will
|
||||
* return {@code false}.
|
||||
*/
|
||||
public boolean maybeCompleteExceptionally(CompositePollEvent event, Throwable t) {
|
||||
public boolean maybeCompleteExceptionally(AsyncPollEvent event, Throwable t) {
|
||||
if (maybeCompleteExceptionally(event))
|
||||
return true;
|
||||
|
||||
|
@ -153,7 +156,7 @@ public class CompositePollEventProcessorContext {
|
|||
return false;
|
||||
|
||||
if (t instanceof org.apache.kafka.common.errors.TimeoutException || t instanceof java.util.concurrent.TimeoutException) {
|
||||
log.debug("Ignoring timeout for CompositePollEvent {}: {}", event, t.getMessage());
|
||||
log.trace("Ignoring timeout for {}: {}", event, t.getMessage());
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -168,10 +171,10 @@ public class CompositePollEventProcessorContext {
|
|||
/**
|
||||
* Helper method that checks if there's a non-null error from
|
||||
* {@link NetworkClientDelegate#getAndClearMetadataError()}, and if so, reports it to the user by completing the
|
||||
* current event with {@link CompositePollEvent.State#FAILED} and returning {@code true}. Otherwise, it will
|
||||
* current event with {@link AsyncPollEvent.State#FAILED} and returning {@code true}. Otherwise, it will
|
||||
* return {@code false}.
|
||||
*/
|
||||
public boolean maybeCompleteExceptionally(CompositePollEvent event) {
|
||||
public boolean maybeCompleteExceptionally(AsyncPollEvent event) {
|
||||
Optional<Exception> exception = networkClientDelegate.getAndClearMetadataError();
|
||||
|
||||
if (exception.isPresent()) {
|
||||
|
@ -183,18 +186,18 @@ public class CompositePollEventProcessorContext {
|
|||
}
|
||||
|
||||
/**
|
||||
* Helper method to complete the given event with {@link CompositePollEvent.State#FAILED}.
|
||||
* Helper method to complete the given event with {@link AsyncPollEvent.State#FAILED}.
|
||||
*/
|
||||
public void completeExceptionally(CompositePollEvent event, Throwable error) {
|
||||
public void completeExceptionally(AsyncPollEvent event, Throwable error) {
|
||||
KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(error);
|
||||
event.completeExceptionally(e);
|
||||
log.trace("Failing event processing for {}", event, e);
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to complete the given event with {@link CompositePollEvent.State#SUCCEEDED}.
|
||||
* Helper method to complete the given event with {@link AsyncPollEvent.State#SUCCEEDED}.
|
||||
*/
|
||||
public void complete(CompositePollEvent event) {
|
||||
public void complete(AsyncPollEvent event) {
|
||||
event.completeSuccessfully();
|
||||
log.trace("Completed event processing for {}", event);
|
||||
}
|
|
@ -1,122 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.clients.consumer.internals.events;
|
||||
|
||||
import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
|
||||
import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
|
||||
import org.apache.kafka.common.utils.LogContext;
|
||||
import org.apache.kafka.common.utils.Time;
|
||||
import org.apache.kafka.common.utils.Timer;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;
|
||||
|
||||
/**
|
||||
* {@code CompositePollEventInvoker} is executed on the application thread in the
|
||||
* {@link AsyncKafkaConsumer#poll(Duration)}.
|
||||
*/
|
||||
public class CompositePollEventInvoker {
|
||||
|
||||
private final Logger log;
|
||||
private final Time time;
|
||||
private final ApplicationEventHandler applicationEventHandler;
|
||||
private final Runnable applicationThreadCallbacks;
|
||||
private CompositePollEvent inflight;
|
||||
|
||||
public CompositePollEventInvoker(LogContext logContext,
|
||||
Time time,
|
||||
ApplicationEventHandler applicationEventHandler,
|
||||
Runnable applicationThreadCallbacks) {
|
||||
this.log = logContext.logger(getClass());
|
||||
this.time = time;
|
||||
this.applicationEventHandler = applicationEventHandler;
|
||||
this.applicationThreadCallbacks = applicationThreadCallbacks;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@code poll()} manages the lifetime of the {@link CompositePollEvent} processing. If it is called when
|
||||
* no event is currently processing, it will start a new event processing asynchronously. A check is made
|
||||
* during each invocation to see if the <em>inflight</em> event has reached a
|
||||
* {@link CompositePollEvent.State terminal state}. If it has, the result will be processed accordingly.
|
||||
*/
|
||||
public void poll(Timer timer) {
|
||||
if (inflight == null) {
|
||||
log.trace("No existing inflight event, submitting a new event");
|
||||
submitEvent(ApplicationEvent.Type.POLL, timer);
|
||||
}
|
||||
|
||||
try {
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace(
|
||||
"Attempting to retrieve result from previously submitted {} with {} remaining on timer",
|
||||
inflight,
|
||||
timer.remainingMs()
|
||||
);
|
||||
}
|
||||
|
||||
// Result should be non-null and starts off as State.STARTED.
|
||||
CompositePollEvent.Result result = inflight.result();
|
||||
CompositePollEvent.State state = result.state();
|
||||
|
||||
if (state == CompositePollEvent.State.SUCCEEDED) {
|
||||
// The composite event has completed all the requisite stages, though it does not imply that
|
||||
// there is data in the FetchBuffer yet. Make sure to clear out the inflight request.
|
||||
log.trace("Event {} completed, clearing inflight", inflight);
|
||||
inflight = null;
|
||||
} else if (state == CompositePollEvent.State.FAILED) {
|
||||
// The composite event failed at one of the stages. Make sure to clear out the inflight request
|
||||
// before the underlying error is surfaced to the user.
|
||||
log.trace("Event {} failed, clearing inflight", inflight);
|
||||
inflight = null;
|
||||
|
||||
throw result.asKafkaException();
|
||||
} else if (state == CompositePollEvent.State.CALLBACKS_REQUIRED) {
|
||||
// The background thread detected that it needed to yield to the application thread to invoke
|
||||
// callbacks. Even though the inflight reference _should_ be overwritten when the next stage of
|
||||
// the event is submitted, go ahead and clear out the inflight request just to be sure.
|
||||
log.trace("Event {} paused for callbacks, clearing inflight", inflight);
|
||||
inflight = null;
|
||||
|
||||
// Note: this is calling user-supplied code, so make sure to handle possible errors.
|
||||
applicationThreadCallbacks.run();
|
||||
|
||||
// The application thread callbacks are complete. Create another event to resume the polling at
|
||||
// the next stage.
|
||||
submitEvent(result.asNextEventType(), timer);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
// If an exception is hit, bubble it up to the user but make sure to clear out the inflight request
|
||||
// because the error effectively renders it complete.
|
||||
log.debug("Event {} failed due to {}, clearing inflight", inflight, String.valueOf(t));
|
||||
inflight = null;
|
||||
throw ConsumerUtils.maybeWrapAsKafkaException(t);
|
||||
}
|
||||
}
|
||||
|
||||
private void submitEvent(ApplicationEvent.Type type, Timer timer) {
|
||||
long deadlineMs = calculateDeadlineMs(timer);
|
||||
long pollTimeMs = time.milliseconds();
|
||||
inflight = new CompositePollEvent(deadlineMs, pollTimeMs, type);
|
||||
applicationEventHandler.add(inflight);
|
||||
|
||||
if (log.isTraceEnabled())
|
||||
log.trace("Submitted new {} with {} remaining on timer", inflight, timer.remainingMs());
|
||||
}
|
||||
}
|
|
@ -16,28 +16,12 @@
|
|||
*/
|
||||
package org.apache.kafka.clients.consumer.internals.events;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
public class PollEvent extends ApplicationEvent {
|
||||
public class SharePollEvent extends ApplicationEvent {
|
||||
|
||||
private final long pollTimeMs;
|
||||
|
||||
/**
|
||||
* A future that represents the completion of reconciliation and auto-commit
|
||||
* processing.
|
||||
* This future is completed when all commit request generation points have
|
||||
* been passed, including:
|
||||
* <ul>
|
||||
* <li>auto-commit on rebalance</li>
|
||||
* <li>auto-commit on the interval</li>
|
||||
* </ul>
|
||||
* Once completed, it signals that it's safe for the consumer to proceed with
|
||||
* fetching new records.
|
||||
*/
|
||||
private final CompletableFuture<Void> reconcileAndAutoCommit = new CompletableFuture<>();
|
||||
|
||||
public PollEvent(final long pollTimeMs) {
|
||||
super(Type.POLL);
|
||||
public SharePollEvent(final long pollTimeMs) {
|
||||
super(Type.SHARE_POLL);
|
||||
this.pollTimeMs = pollTimeMs;
|
||||
}
|
||||
|
||||
|
@ -45,14 +29,6 @@ public class PollEvent extends ApplicationEvent {
|
|||
return pollTimeMs;
|
||||
}
|
||||
|
||||
public CompletableFuture<Void> reconcileAndAutoCommit() {
|
||||
return reconcileAndAutoCommit;
|
||||
}
|
||||
|
||||
public void markReconcileAndAutoCommitComplete() {
|
||||
reconcileAndAutoCommit.complete(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toStringBase() {
|
||||
return super.toStringBase() + ", pollTimeMs=" + pollTimeMs;
|
|
@ -19,8 +19,8 @@ package org.apache.kafka.clients.consumer.internals;
|
|||
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
|
||||
import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
|
||||
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
|
||||
import org.apache.kafka.common.metrics.Metrics;
|
||||
import org.apache.kafka.common.utils.LogContext;
|
||||
|
@ -61,7 +61,7 @@ public class ApplicationEventHandlerTest {
|
|||
asyncConsumerMetrics
|
||||
)) {
|
||||
// add event
|
||||
applicationEventHandler.add(new PollEvent(time.milliseconds()));
|
||||
applicationEventHandler.add(new AsyncPollEvent(time.milliseconds() + 10, time.milliseconds()));
|
||||
verify(asyncConsumerMetrics).recordApplicationEventQueueSize(1);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
|
|||
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
|
||||
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.CheckAndUpdatePositionsEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.CommitEvent;
|
||||
|
@ -43,14 +44,12 @@ import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent;
|
|||
import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
|
||||
import org.apache.kafka.clients.consumer.internals.events.CompositePollEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
|
||||
import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.LeaveGroupOnCloseEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
|
||||
|
@ -427,7 +426,7 @@ public class AsyncKafkaConsumerTest {
|
|||
|
||||
consumer.wakeup();
|
||||
|
||||
markReconcileAndAutoCommitCompleteForPollEvent();
|
||||
completeAsyncPollEventSuccessfully();
|
||||
assertThrows(WakeupException.class, () -> consumer.poll(Duration.ZERO));
|
||||
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
|
||||
}
|
||||
|
@ -447,7 +446,7 @@ public class AsyncKafkaConsumerTest {
|
|||
completeAssignmentChangeEventSuccessfully();
|
||||
consumer.assign(singleton(tp));
|
||||
|
||||
markReconcileAndAutoCommitCompleteForPollEvent();
|
||||
completeAsyncPollEventSuccessfully();
|
||||
assertThrows(WakeupException.class, () -> consumer.poll(Duration.ofMinutes(1)));
|
||||
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
|
||||
}
|
||||
|
@ -471,7 +470,7 @@ public class AsyncKafkaConsumerTest {
|
|||
completeAssignmentChangeEventSuccessfully();
|
||||
consumer.assign(singleton(tp));
|
||||
|
||||
markReconcileAndAutoCommitCompleteForPollEvent();
|
||||
completeAsyncPollEventSuccessfully();
|
||||
// since wakeup() is called when the non-empty fetch is returned the wakeup should be ignored
|
||||
assertDoesNotThrow(() -> consumer.poll(Duration.ofMinutes(1)));
|
||||
// the previously ignored wake-up should not be ignored in the next call
|
||||
|
@ -508,7 +507,7 @@ public class AsyncKafkaConsumerTest {
|
|||
|
||||
completeTopicSubscriptionChangeEventSuccessfully();
|
||||
consumer.subscribe(Collections.singletonList(topicName), listener);
|
||||
markReconcileAndAutoCommitCompleteForPollEvent();
|
||||
completeAsyncPollEventSuccessfully();
|
||||
ConsumerPollTestUtils.waitForCondition(
|
||||
consumer,
|
||||
callbackExecuted::get,
|
||||
|
@ -533,7 +532,7 @@ public class AsyncKafkaConsumerTest {
|
|||
completeAssignmentChangeEventSuccessfully();
|
||||
consumer.assign(singleton(tp));
|
||||
|
||||
markReconcileAndAutoCommitCompleteForPollEvent();
|
||||
completeAsyncPollEventSuccessfully();
|
||||
consumer.poll(Duration.ZERO);
|
||||
|
||||
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
|
||||
|
@ -679,7 +678,7 @@ public class AsyncKafkaConsumerTest {
|
|||
completeAssignmentChangeEventSuccessfully();
|
||||
consumer.assign(Collections.singleton(new TopicPartition("foo", 0)));
|
||||
assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));
|
||||
markReconcileAndAutoCommitCompleteForPollEvent();
|
||||
completeAsyncPollEventSuccessfully();
|
||||
ConsumerPollTestUtils.waitForCondition(
|
||||
consumer,
|
||||
() -> callback.invoked == 1 && callback.exception == null,
|
||||
|
@ -1483,7 +1482,7 @@ public class AsyncKafkaConsumerTest {
|
|||
backgroundEventQueue.add(e);
|
||||
}
|
||||
|
||||
markReconcileAndAutoCommitCompleteForPollEvent();
|
||||
completeAsyncPollEventSuccessfully();
|
||||
|
||||
// This will trigger the background event queue to process our background event message.
|
||||
// If any error is happening inside the rebalance callbacks, we expect the first exception to be thrown from poll.
|
||||
|
@ -1559,7 +1558,7 @@ public class AsyncKafkaConsumerTest {
|
|||
backgroundEventQueue.add(errorEvent);
|
||||
completeAssignmentChangeEventSuccessfully();
|
||||
consumer.assign(singletonList(new TopicPartition("topic", 0)));
|
||||
markReconcileAndAutoCommitCompleteForPollEvent();
|
||||
completeAsyncPollEventSuccessfully();
|
||||
ConsumerPollTestUtils.waitForException(
|
||||
consumer,
|
||||
t -> t.getMessage().equals(expectedException.getMessage()),
|
||||
|
@ -1580,7 +1579,7 @@ public class AsyncKafkaConsumerTest {
|
|||
backgroundEventQueue.add(errorEvent2);
|
||||
completeAssignmentChangeEventSuccessfully();
|
||||
consumer.assign(singletonList(new TopicPartition("topic", 0)));
|
||||
markReconcileAndAutoCommitCompleteForPollEvent();
|
||||
completeAsyncPollEventSuccessfully();
|
||||
ConsumerPollTestUtils.waitForException(
|
||||
consumer,
|
||||
t -> t.getMessage().equals(expectedException1.getMessage()),
|
||||
|
@ -1665,9 +1664,9 @@ public class AsyncKafkaConsumerTest {
|
|||
|
||||
completeTopicSubscriptionChangeEventSuccessfully();
|
||||
consumer.subscribe(singletonList("topic1"));
|
||||
markReconcileAndAutoCommitCompleteForPollEvent();
|
||||
completeAsyncPollEventSuccessfully();
|
||||
consumer.poll(Duration.ofMillis(100));
|
||||
verify(applicationEventHandler, atLeastOnce()).add(any(CompositePollEvent.class));
|
||||
verify(applicationEventHandler, atLeastOnce()).add(any(AsyncPollEvent.class));
|
||||
}
|
||||
|
||||
private Properties requiredConsumerConfigAndGroupId(final String groupId) {
|
||||
|
@ -1683,7 +1682,7 @@ public class AsyncKafkaConsumerTest {
|
|||
|
||||
completeAssignmentChangeEventSuccessfully();
|
||||
consumer.assign(singleton(new TopicPartition("t1", 1)));
|
||||
markReconcileAndAutoCommitCompleteForPollEvent();
|
||||
completeAsyncPollEventSuccessfully();
|
||||
consumer.poll(Duration.ZERO);
|
||||
}
|
||||
|
||||
|
@ -1717,7 +1716,7 @@ public class AsyncKafkaConsumerTest {
|
|||
).when(fetchCollector).collectFetch(any(FetchBuffer.class));
|
||||
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
|
||||
|
||||
markReconcileAndAutoCommitCompleteForPollEvent();
|
||||
completeAsyncPollEventSuccessfully();
|
||||
// And then poll for up to 10000ms, which should return 2 records without timing out
|
||||
ConsumerRecords<?, ?> returnedRecords = consumer.poll(Duration.ofMillis(10000));
|
||||
assertEquals(2, returnedRecords.count());
|
||||
|
@ -1821,7 +1820,7 @@ public class AsyncKafkaConsumerTest {
|
|||
// interrupt the thread and call poll
|
||||
try {
|
||||
Thread.currentThread().interrupt();
|
||||
markReconcileAndAutoCommitCompleteForPollEvent();
|
||||
completeAsyncPollEventSuccessfully();
|
||||
assertThrows(InterruptException.class, () -> consumer.poll(Duration.ZERO));
|
||||
} finally {
|
||||
// clear interrupted state again since this thread may be reused by JUnit
|
||||
|
@ -1853,7 +1852,7 @@ public class AsyncKafkaConsumerTest {
|
|||
completeTopicSubscriptionChangeEventSuccessfully();
|
||||
consumer.subscribe(Collections.singletonList("topic"));
|
||||
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
|
||||
markReconcileAndAutoCommitCompleteForPollEvent();
|
||||
completeAsyncPollEventSuccessfully();
|
||||
|
||||
ConsumerPollTestUtils.waitForCondition(
|
||||
consumer,
|
||||
|
@ -1921,7 +1920,7 @@ public class AsyncKafkaConsumerTest {
|
|||
completeUnsubscribeApplicationEventSuccessfully();
|
||||
|
||||
consumer.assign(singleton(new TopicPartition("topic1", 0)));
|
||||
markReconcileAndAutoCommitCompleteForPollEvent();
|
||||
completeAsyncPollEventSuccessfully();
|
||||
consumer.poll(Duration.ZERO);
|
||||
verify(applicationEventHandler, never()).addAndGet(any(UpdatePatternSubscriptionEvent.class));
|
||||
|
||||
|
@ -2217,6 +2216,14 @@ public class AsyncKafkaConsumerTest {
|
|||
}).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(SeekUnvalidatedEvent.class));
|
||||
}
|
||||
|
||||
private void completeAsyncPollEventSuccessfully() {
|
||||
doAnswer(invocation -> {
|
||||
AsyncPollEvent event = invocation.getArgument(0);
|
||||
event.completeWithCallbackRequired(ApplicationEvent.Type.CHECK_AND_UPDATE_POSITIONS);
|
||||
return null;
|
||||
}).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncPollEvent.class));
|
||||
}
|
||||
|
||||
private void forceCommitCallbackInvocation() {
|
||||
// Invokes callback
|
||||
consumer.commitAsync();
|
||||
|
@ -2294,17 +2301,4 @@ public class AsyncKafkaConsumerTest {
|
|||
verify(mockStreamsListener).onTasksRevoked(any());
|
||||
}
|
||||
}
|
||||
|
||||
private void markReconcileAndAutoCommitCompleteForPollEvent() {
|
||||
doAnswer(invocation -> {
|
||||
PollEvent event = invocation.getArgument(0);
|
||||
event.markReconcileAndAutoCommitComplete();
|
||||
return null;
|
||||
}).when(applicationEventHandler).add(ArgumentMatchers.isA(PollEvent.class));
|
||||
doAnswer(invocation -> {
|
||||
CompositePollEvent event = invocation.getArgument(0);
|
||||
event.completeWithCallbackRequired(ApplicationEvent.Type.CHECK_AND_UPDATE_POSITIONS);
|
||||
return null;
|
||||
}).when(applicationEventHandler).add(ArgumentMatchers.isA(CompositePollEvent.class));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,8 +18,8 @@ package org.apache.kafka.clients.consumer.internals;
|
|||
|
||||
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
|
||||
import org.apache.kafka.clients.consumer.internals.events.AsyncPollEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
|
||||
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
|
||||
import org.apache.kafka.common.KafkaException;
|
||||
import org.apache.kafka.common.metrics.Metrics;
|
||||
|
@ -258,7 +258,7 @@ public class ConsumerNetworkThreadTest {
|
|||
)) {
|
||||
consumerNetworkThread.initializeResources();
|
||||
|
||||
PollEvent event = new PollEvent(0);
|
||||
AsyncPollEvent event = new AsyncPollEvent(10, 0);
|
||||
event.setEnqueuedMs(time.milliseconds());
|
||||
applicationEventQueue.add(event);
|
||||
asyncConsumerMetrics.recordApplicationEventQueueSize(1);
|
||||
|
|
|
@ -24,11 +24,11 @@ import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandle
|
|||
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.SharePollEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
|
||||
import org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent;
|
||||
|
@ -680,7 +680,7 @@ public class ShareConsumerImplTest {
|
|||
consumer.subscribe(subscriptionTopic);
|
||||
|
||||
consumer.poll(Duration.ofMillis(100));
|
||||
verify(applicationEventHandler).add(any(PollEvent.class));
|
||||
verify(applicationEventHandler).add(any(SharePollEvent.class));
|
||||
verify(applicationEventHandler).addAndGet(any(ShareSubscriptionChangeEvent.class));
|
||||
|
||||
completeShareAcknowledgeOnCloseApplicationEventSuccessfully();
|
||||
|
|
|
@ -111,7 +111,8 @@ public class ApplicationEventProcessorTest {
|
|||
new LogContext(),
|
||||
requestManagers,
|
||||
metadata,
|
||||
subscriptionState
|
||||
subscriptionState,
|
||||
Optional.of(mock(AsyncPollEventProcessorContext.class))
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -171,7 +172,7 @@ public class ApplicationEventProcessorTest {
|
|||
|
||||
private static Stream<Arguments> applicationEvents() {
|
||||
return Stream.of(
|
||||
Arguments.of(new PollEvent(100)),
|
||||
Arguments.of(new AsyncPollEvent(calculateDeadlineMs(12345, 100), 100)),
|
||||
Arguments.of(new CreateFetchRequestsEvent(calculateDeadlineMs(12345, 100))),
|
||||
Arguments.of(new CheckAndUpdatePositionsEvent(500)),
|
||||
Arguments.of(new TopicMetadataEvent("topic", Long.MAX_VALUE)),
|
||||
|
@ -265,12 +266,12 @@ public class ApplicationEventProcessorTest {
|
|||
|
||||
@Test
|
||||
public void testPollEvent() {
|
||||
PollEvent event = new PollEvent(12345);
|
||||
AsyncPollEvent event = new AsyncPollEvent(12346, 12345);
|
||||
|
||||
setupProcessor(true);
|
||||
when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
|
||||
when(offsetsRequestManager.updateFetchPositions(anyLong())).thenReturn(new CompletableFuture<>());
|
||||
processor.process(event);
|
||||
assertTrue(event.reconcileAndAutoCommit().isDone());
|
||||
verify(commitRequestManager).updateTimerAndMaybeCommit(12345);
|
||||
verify(membershipManager).onConsumerPoll();
|
||||
verify(heartbeatRequestManager).resetPollTimer(12345);
|
||||
|
|
Loading…
Reference in New Issue