More clean up and refactoring

This commit is contained in:
Kirk True 2025-09-29 12:16:54 -07:00
parent 52c08455c1
commit bfcd7ec0f8
7 changed files with 27 additions and 41 deletions

View File

@ -462,7 +462,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
streamsRebalanceData
);
final CompletableEventReaper applicationEventReaper = new CompletableEventReaper(logContext);
Supplier<CompositePollEventProcessorContext> compositePollContextSupplier = CompositePollEventProcessorContext.supplier(
final Supplier<CompositePollEventProcessorContext> compositePollContextSupplier = CompositePollEventProcessorContext.supplier(
logContext,
networkClientDelegateSupplier,
backgroundEventHandler,
@ -500,8 +500,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
time,
applicationEventHandler,
() -> {
processBackgroundEvents();
offsetCommitCallbackInvoker.executeCallbacks();
processBackgroundEvents();
}
);
@ -583,8 +583,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
time,
applicationEventHandler,
() -> {
processBackgroundEvents();
offsetCommitCallbackInvoker.executeCallbacks();
processBackgroundEvents();
}
);
this.backgroundEventHandler = new BackgroundEventHandler(
@ -681,7 +681,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
Optional.empty()
);
final CompletableEventReaper applicationEventReaper = new CompletableEventReaper(logContext);
Supplier<CompositePollEventProcessorContext> compositePollContextSupplier = CompositePollEventProcessorContext.supplier(
final Supplier<CompositePollEventProcessorContext> compositePollContextSupplier = CompositePollEventProcessorContext.supplier(
logContext,
networkClientDelegateSupplier,
backgroundEventHandler,
@ -711,8 +711,8 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
time,
applicationEventHandler,
() -> {
processBackgroundEvents();
offsetCommitCallbackInvoker.executeCallbacks();
processBackgroundEvents();
}
);
}

View File

@ -51,7 +51,6 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
@ -71,7 +70,7 @@ public class NetworkClientDelegate implements AutoCloseable {
private final int requestTimeoutMs;
private final Queue<UnsentRequest> unsentRequests;
private final long retryBackoffMs;
private final AtomicReference<Exception> metadataError;
private Optional<Exception> metadataError;
private final boolean notifyMetadataErrorsViaErrorQueue;
private final AsyncConsumerMetrics asyncConsumerMetrics;
@ -92,7 +91,7 @@ public class NetworkClientDelegate implements AutoCloseable {
this.unsentRequests = new ArrayDeque<>();
this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
this.metadataError = new AtomicReference<>();
this.metadataError = Optional.empty();
this.notifyMetadataErrorsViaErrorQueue = notifyMetadataErrorsViaErrorQueue;
this.asyncConsumerMetrics = asyncConsumerMetrics;
}
@ -164,7 +163,7 @@ public class NetworkClientDelegate implements AutoCloseable {
if (notifyMetadataErrorsViaErrorQueue) {
backgroundEventHandler.add(new ErrorEvent(e));
} else {
metadataError.compareAndSet(null, e);
metadataError = Optional.of(e);
}
}
}
@ -250,8 +249,9 @@ public class NetworkClientDelegate implements AutoCloseable {
}
public Optional<Exception> getAndClearMetadataError() {
Exception exception = metadataError.getAndSet(null);
return Optional.ofNullable(exception);
Optional<Exception> metadataError = this.metadataError;
this.metadataError = Optional.empty();
return metadataError;
}
public Node leastLoadedNode() {

View File

@ -50,6 +50,12 @@ public class OffsetCommitCallbackInvoker {
}
}
/**
* Returns the current size of the queue. Used by the background thread to determine if it needs to <i>pause</i>
* itself to return to the application thread for processing.
*
* @return Current size of queue
*/
public int size() {
return callbackQueue.size();
}

View File

@ -297,7 +297,6 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
clientTelemetryReporter,
metrics
);
final CompletableEventReaper applicationEventReaper = new CompletableEventReaper(logContext);
final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(
logContext,
metadata,
@ -309,7 +308,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
logContext,
time,
applicationEventQueue,
applicationEventReaper,
new CompletableEventReaper(logContext),
applicationEventProcessorSupplier,
networkClientDelegateSupplier,
requestManagersSupplier,
@ -404,7 +403,6 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
metrics
);
final CompletableEventReaper applicationEventReaper = new CompletableEventReaper(logContext);
final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(
logContext,
metadata,
@ -416,7 +414,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
logContext,
time,
applicationEventQueue,
applicationEventReaper,
new CompletableEventReaper(logContext),
applicationEventProcessorSupplier,
networkClientDelegateSupplier,
requestManagersSupplier,

View File

@ -45,6 +45,12 @@ public class BackgroundEventHandler {
this.asyncConsumerMetrics = asyncConsumerMetrics;
}
/**
* Returns the current size of the queue. Used by the background thread to determine if it needs to <i>pause</i>
* itself to return to the application thread for processing.
*
* @return Current size of queue
*/
public int size() {
return backgroundEventQueue.size();
}

View File

@ -428,7 +428,6 @@ public class AsyncKafkaConsumerTest {
consumer.wakeup();
markReconcileAndAutoCommitCompleteForPollEvent();
markResultForCompositePollEvent();
assertThrows(WakeupException.class, () -> consumer.poll(Duration.ZERO));
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
}
@ -449,7 +448,6 @@ public class AsyncKafkaConsumerTest {
consumer.assign(singleton(tp));
markReconcileAndAutoCommitCompleteForPollEvent();
markResultForCompositePollEvent();
assertThrows(WakeupException.class, () -> consumer.poll(Duration.ofMinutes(1)));
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
}
@ -474,7 +472,6 @@ public class AsyncKafkaConsumerTest {
consumer.assign(singleton(tp));
markReconcileAndAutoCommitCompleteForPollEvent();
markResultForCompositePollEvent();
// since wakeup() is called when the non-empty fetch is returned the wakeup should be ignored
assertDoesNotThrow(() -> consumer.poll(Duration.ofMinutes(1)));
// the previously ignored wake-up should not be ignored in the next call
@ -512,7 +509,6 @@ public class AsyncKafkaConsumerTest {
completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(Collections.singletonList(topicName), listener);
markReconcileAndAutoCommitCompleteForPollEvent();
markResultForCompositePollEvent(CompositePollEvent.State.CALLBACKS_REQUIRED);
waitForConsumerPoll(
callbackExecuted::get,
"Consumer.poll() did not execute callback within timeout"
@ -537,7 +533,6 @@ public class AsyncKafkaConsumerTest {
consumer.assign(singleton(tp));
markReconcileAndAutoCommitCompleteForPollEvent();
markResultForCompositePollEvent();
consumer.poll(Duration.ZERO);
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
@ -684,7 +679,6 @@ public class AsyncKafkaConsumerTest {
consumer.assign(Collections.singleton(new TopicPartition("foo", 0)));
assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));
markReconcileAndAutoCommitCompleteForPollEvent();
markResultForCompositePollEvent(CompositePollEvent.State.CALLBACKS_REQUIRED);
waitForConsumerPoll(
() -> callback.invoked == 1 && callback.exception == null,
"Consumer.poll() did not execute the callback once (without error) in allottec timeout"
@ -1488,7 +1482,6 @@ public class AsyncKafkaConsumerTest {
}
markReconcileAndAutoCommitCompleteForPollEvent();
markResultForCompositePollEvent(CompositePollEvent.State.CALLBACKS_REQUIRED);
// This will trigger the background event queue to process our background event message.
// If any error is happening inside the rebalance callbacks, we expect the first exception to be thrown from poll.
@ -1567,7 +1560,6 @@ public class AsyncKafkaConsumerTest {
completeAssignmentChangeEventSuccessfully();
consumer.assign(singletonList(new TopicPartition("topic", 0)));
markReconcileAndAutoCommitCompleteForPollEvent();
markResultForCompositePollEvent(CompositePollEvent.State.CALLBACKS_REQUIRED);
waitForConsumerPollException(
e -> e.getMessage().equals(expectedException.getMessage()),
"Consumer.poll() did not fail with expected exception " + expectedException + " within timeout"
@ -1588,7 +1580,6 @@ public class AsyncKafkaConsumerTest {
completeAssignmentChangeEventSuccessfully();
consumer.assign(singletonList(new TopicPartition("topic", 0)));
markReconcileAndAutoCommitCompleteForPollEvent();
markResultForCompositePollEvent(CompositePollEvent.State.CALLBACKS_REQUIRED);
waitForConsumerPollException(
e -> e.getMessage().equals(expectedException1.getMessage()),
"Consumer.poll() did not fail with expected exception " + expectedException1 + " within timeout"
@ -1673,7 +1664,6 @@ public class AsyncKafkaConsumerTest {
completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(singletonList("topic1"));
markReconcileAndAutoCommitCompleteForPollEvent();
markResultForCompositePollEvent();
consumer.poll(Duration.ofMillis(100));
verify(applicationEventHandler).add(any(CompositePollEvent.class));
}
@ -1692,7 +1682,6 @@ public class AsyncKafkaConsumerTest {
completeAssignmentChangeEventSuccessfully();
consumer.assign(singleton(new TopicPartition("t1", 1)));
markReconcileAndAutoCommitCompleteForPollEvent();
markResultForCompositePollEvent();
consumer.poll(Duration.ZERO);
}
@ -1727,7 +1716,6 @@ public class AsyncKafkaConsumerTest {
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
markReconcileAndAutoCommitCompleteForPollEvent();
markResultForCompositePollEvent();
// And then poll for up to 10000ms, which should return 2 records without timing out
ConsumerRecords<?, ?> returnedRecords = consumer.poll(Duration.ofMillis(10000));
assertEquals(2, returnedRecords.count());
@ -1832,7 +1820,6 @@ public class AsyncKafkaConsumerTest {
try {
Thread.currentThread().interrupt();
markReconcileAndAutoCommitCompleteForPollEvent();
markResultForCompositePollEvent();
assertThrows(InterruptException.class, () -> consumer.poll(Duration.ZERO));
} finally {
// clear interrupted state again since this thread may be reused by JUnit
@ -1865,8 +1852,6 @@ public class AsyncKafkaConsumerTest {
consumer.subscribe(Collections.singletonList("topic"));
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
markReconcileAndAutoCommitCompleteForPollEvent();
markResultForCompositePollEvent();
markResultForCompositePollEvent(CompositePollEvent.State.CALLBACKS_REQUIRED);
waitForConsumerPoll(
() -> backgroundEventReaper.size() == 0,
@ -1934,7 +1919,6 @@ public class AsyncKafkaConsumerTest {
consumer.assign(singleton(new TopicPartition("topic1", 0)));
markReconcileAndAutoCommitCompleteForPollEvent();
markResultForCompositePollEvent();
consumer.poll(Duration.ZERO);
verify(applicationEventHandler, never()).addAndGet(any(UpdatePatternSubscriptionEvent.class));
@ -2316,17 +2300,9 @@ public class AsyncKafkaConsumerTest {
event.markReconcileAndAutoCommitComplete();
return null;
}).when(applicationEventHandler).add(ArgumentMatchers.isA(PollEvent.class));
}
private void markResultForCompositePollEvent() {
doAnswer(invocation -> null)
.when(applicationEventHandler).add(ArgumentMatchers.isA(CompositePollEvent.class));
}
private void markResultForCompositePollEvent(CompositePollEvent.State state) {
doAnswer(invocation -> {
CompositePollEvent event = invocation.getArgument(0);
event.complete(state, Optional.empty());
event.complete(CompositePollEvent.State.CALLBACKS_REQUIRED, Optional.empty());
return null;
}).when(applicationEventHandler).add(ArgumentMatchers.isA(CompositePollEvent.class));
}

View File

@ -81,7 +81,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@SuppressWarnings({"ClassDataAbstractionCoupling", "ClassFanOutComplexity"})
@SuppressWarnings("ClassDataAbstractionCoupling")
public class ApplicationEventProcessorTest {
private final Time time = new MockTime();
private final CommitRequestManager commitRequestManager = mock(CommitRequestManager.class);