Add completion tracking to CompositePollEvent

Introduces an AtomicBoolean to track completion state in CompositePollEvent and updates ApplicationEventProcessor to mark events as complete when appropriate. Refactors AsyncKafkaConsumer to use a new CompositePollEventInvoker for polling, replacing prepareFetch, and implements exponential backoff for incomplete events.
This commit is contained in:
Kirk True 2025-09-20 13:41:07 -07:00
parent 0ac19f96b9
commit ae0ddcc4c0
3 changed files with 61 additions and 24 deletions

View File

@ -174,6 +174,35 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
private static final long NO_CURRENT_THREAD = -1L;
private class CompositePollEventInvoker {
private final Timer timer;
private final long pollTimeMs;
private CompositePollEvent latest;
private int backoff = -1;
public CompositePollEventInvoker(Timer timer, long pollTimeMs) {
this.timer = timer;
this.pollTimeMs = pollTimeMs;
}
private void poll() {
if (latest == null || latest.isComplete()) {
long deadlineMs = calculateDeadlineMs(timer);
latest = new CompositePollEvent(deadlineMs, pollTimeMs, ApplicationEvent.Type.POLL);
applicationEventHandler.add(latest);
} else {
if (backoff == -1)
backoff = 1;
else
backoff *= 2;
long sleep = Math.min(Math.min(backoff, retryBackoffMs), timer.remainingMs());
timer.sleep(sleep);
}
}
}
/**
* An {@link org.apache.kafka.clients.consumer.internals.events.EventProcessor} that is created and executes in the
* application thread for the purpose of processing {@link BackgroundEvent background events} generated by the
@ -837,13 +866,17 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
}
CompositePollEventInvoker pollEventInvoker = new CompositePollEventInvoker(timer, time.milliseconds());
do {
// We must not allow wake-ups between polling for fetches and returning the records.
// If the polled fetches are not empty the consumed position has already been updated in the polling
// of the fetches. A wakeup between returned fetches and returning records would lead to never
// returning the records in the fetches. Thus, we trigger a possible wake-up before we poll fetches.
wakeupTrigger.maybeTriggerWakeup();
prepareFetch(timer);
processBackgroundEvents();
offsetCommitCallbackInvoker.executeCallbacks();
pollEventInvoker.poll();
final Fetch<K, V> fetch = pollForFetches(timer);
if (!fetch.isEmpty()) {
// before returning the fetched records, we can send off the next round of fetches
@ -871,20 +904,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
}
}
private void prepareFetch(Timer timer) {
processBackgroundEvents();
offsetCommitCallbackInvoker.executeCallbacks();
long pollTimeMs = timer.currentTimeMs();
CompositePollEvent event = new CompositePollEvent(
calculateDeadlineMs(timer),
pollTimeMs,
ApplicationEvent.Type.POLL
);
applicationEventHandler.add(event);
}
/**
* Commit offsets returned on the last {@link #poll(Duration) poll()} for all the subscribed list of topics and
* partitions.

View File

@ -241,7 +241,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
}
private void process(final CompositePollEvent event) {
if (maybePauseCompositePoll())
if (maybePauseCompositePoll(event))
return;
ApplicationEvent.Type nextEventType = event.nextEventType();
@ -249,7 +249,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
if (nextEventType == ApplicationEvent.Type.POLL) {
processPollEvent(event.pollTimeMs());
if (maybePauseCompositePoll())
if (maybePauseCompositePoll(event))
return;
nextEventType = ApplicationEvent.Type.UPDATE_SUBSCRIPTION_METADATA;
@ -258,7 +258,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
if (nextEventType == ApplicationEvent.Type.UPDATE_SUBSCRIPTION_METADATA) {
processUpdatePatternSubscriptionEvent();
if (maybePauseCompositePoll())
if (maybePauseCompositePoll(event))
return;
nextEventType = ApplicationEvent.Type.CHECK_AND_UPDATE_POSITIONS;
@ -269,14 +269,15 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
applicationEventReaper.add(new CompositePollPsuedoEvent<>(updatePositionsFuture, event.deadlineMs()));
updatePositionsFuture.whenComplete((__, updatePositionsError) -> {
if (maybeFailCompositePoll(event, updatePositionsError) || maybePauseCompositePoll())
if (maybeFailCompositePoll(event, updatePositionsError) || maybePauseCompositePoll(event))
return;
// If needed, create a fetch request if there's no data in the FetchBuffer.
requestManagers.fetchRequestManager.createFetchRequests().whenComplete((___, fetchError) -> {
if (maybeFailCompositePoll(event, fetchError) || maybePauseCompositePoll())
if (maybeFailCompositePoll(event, fetchError) || maybePauseCompositePoll(event))
return;
event.complete();
log.trace("Completed CompositePollEvent {}", event);
});
});
@ -285,14 +286,19 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
}
log.warn("Unknown next step for composite poll: {}", nextEventType);
event.complete();
}
private boolean maybePauseCompositePoll() {
if (backgroundEventProcessingRequiredTest.requiresApplicationThread())
private boolean maybePauseCompositePoll(CompositePollEvent event) {
if (backgroundEventProcessingRequiredTest.requiresApplicationThread()) {
event.complete();
return true;
}
if (offsetCommitCallbackInvocationRequiredTest.requiresApplicationThread())
if (offsetCommitCallbackInvocationRequiredTest.requiresApplicationThread()) {
event.complete();
return true;
}
return false;
}
@ -311,6 +317,7 @@ public class ApplicationEventProcessor implements EventProcessor<ApplicationEven
}
backgroundEventHandler.add(new ErrorEvent(t));
event.complete();
log.trace("Failing CompositePollEvent {}", event, t);
return true;
}

View File

@ -16,11 +16,14 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
import java.util.concurrent.atomic.AtomicBoolean;
public class CompositePollEvent extends ApplicationEvent {
private final long deadlineMs;
private final long pollTimeMs;
private final Type nextEventType;
private final AtomicBoolean complete = new AtomicBoolean();
public CompositePollEvent(long deadlineMs, long pollTimeMs, Type nextEventType) {
super(Type.COMPOSITE_POLL);
@ -41,8 +44,16 @@ public class CompositePollEvent extends ApplicationEvent {
return nextEventType;
}
public boolean isComplete() {
return complete.get();
}
public void complete() {
complete.set(true);
}
@Override
protected String toStringBase() {
return super.toStringBase() + ", deadlineMs=" + deadlineMs + ", pollTimeMs=" + pollTimeMs + ", nextEventType=" + nextEventType;
return super.toStringBase() + ", deadlineMs=" + deadlineMs + ", pollTimeMs=" + pollTimeMs + ", nextEventType=" + nextEventType + ", complete=" + complete;
}
}