Notify FetchBuffer from background thread if pausing due to application thread callbacks

FetchBuffer is now provided to CompositePollEventProcessorContext and its supplier, allowing the context to call fetchBuffer.wakeup() after completing an event. The wakeup method in FetchBuffer is made public to support this usage.
This commit is contained in:
Kirk True 2025-10-01 13:08:19 -07:00
parent bc660d6462
commit e9dbc61bff
3 changed files with 15 additions and 6 deletions

View File

@ -467,7 +467,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
networkClientDelegateSupplier, networkClientDelegateSupplier,
backgroundEventHandler, backgroundEventHandler,
offsetCommitCallbackInvoker, offsetCommitCallbackInvoker,
applicationEventReaper applicationEventReaper,
fetchBuffer
); );
final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext, final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext,
metadata, metadata,
@ -686,7 +687,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
networkClientDelegateSupplier, networkClientDelegateSupplier,
backgroundEventHandler, backgroundEventHandler,
offsetCommitCallbackInvoker, offsetCommitCallbackInvoker,
applicationEventReaper applicationEventReaper,
fetchBuffer
); );
Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier( Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(
logContext, logContext,

View File

@ -193,7 +193,7 @@ public class FetchBuffer implements AutoCloseable {
} }
} }
void wakeup() { public void wakeup() {
try { try {
lock.lock(); lock.lock();
wokenup.set(true); wokenup.set(true);

View File

@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.internals.CachedSupplier;
import org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer; import org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread; import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
import org.apache.kafka.clients.consumer.internals.ConsumerUtils; import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
import org.apache.kafka.clients.consumer.internals.FetchBuffer;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate; import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker; import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;
import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaException;
@ -44,17 +45,20 @@ public class CompositePollEventProcessorContext {
private final BackgroundEventHandler backgroundEventHandler; private final BackgroundEventHandler backgroundEventHandler;
private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker; private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
private final CompletableEventReaper applicationEventReaper; private final CompletableEventReaper applicationEventReaper;
private final FetchBuffer fetchBuffer;
private CompositePollEventProcessorContext(LogContext logContext, private CompositePollEventProcessorContext(LogContext logContext,
NetworkClientDelegate networkClientDelegate, NetworkClientDelegate networkClientDelegate,
BackgroundEventHandler backgroundEventHandler, BackgroundEventHandler backgroundEventHandler,
OffsetCommitCallbackInvoker offsetCommitCallbackInvoker, OffsetCommitCallbackInvoker offsetCommitCallbackInvoker,
CompletableEventReaper applicationEventReaper) { CompletableEventReaper applicationEventReaper,
FetchBuffer fetchBuffer) {
this.log = logContext.logger(getClass()); this.log = logContext.logger(getClass());
this.networkClientDelegate = networkClientDelegate; this.networkClientDelegate = networkClientDelegate;
this.backgroundEventHandler = backgroundEventHandler; this.backgroundEventHandler = backgroundEventHandler;
this.offsetCommitCallbackInvoker = offsetCommitCallbackInvoker; this.offsetCommitCallbackInvoker = offsetCommitCallbackInvoker;
this.applicationEventReaper = applicationEventReaper; this.applicationEventReaper = applicationEventReaper;
this.fetchBuffer = fetchBuffer;
} }
/** /**
@ -65,7 +69,8 @@ public class CompositePollEventProcessorContext {
Supplier<NetworkClientDelegate> networkClientDelegateSupplier, Supplier<NetworkClientDelegate> networkClientDelegateSupplier,
BackgroundEventHandler backgroundEventHandler, BackgroundEventHandler backgroundEventHandler,
OffsetCommitCallbackInvoker offsetCommitCallbackInvoker, OffsetCommitCallbackInvoker offsetCommitCallbackInvoker,
CompletableEventReaper applicationEventReaper) { CompletableEventReaper applicationEventReaper,
FetchBuffer fetchBuffer) {
return new CachedSupplier<>() { return new CachedSupplier<>() {
@Override @Override
protected CompositePollEventProcessorContext create() { protected CompositePollEventProcessorContext create() {
@ -76,7 +81,8 @@ public class CompositePollEventProcessorContext {
networkClientDelegate, networkClientDelegate,
backgroundEventHandler, backgroundEventHandler,
offsetCommitCallbackInvoker, offsetCommitCallbackInvoker,
applicationEventReaper applicationEventReaper,
fetchBuffer
); );
} }
}; };
@ -125,6 +131,7 @@ public class CompositePollEventProcessorContext {
nextEventType nextEventType
); );
event.completeWithCallbackRequired(nextEventType); event.completeWithCallbackRequired(nextEventType);
fetchBuffer.wakeup();
return true; return true;
} }