From bfcd7ec0f80e9c6b51ea3b45f202f7f42e83e602 Mon Sep 17 00:00:00 2001 From: Kirk True Date: Mon, 29 Sep 2025 12:16:54 -0700 Subject: [PATCH] More clean up and refactoring --- .../internals/AsyncKafkaConsumer.java | 10 +++---- .../internals/NetworkClientDelegate.java | 12 ++++----- .../OffsetCommitCallbackInvoker.java | 6 +++++ .../consumer/internals/ShareConsumerImpl.java | 6 ++--- .../events/BackgroundEventHandler.java | 6 +++++ .../internals/AsyncKafkaConsumerTest.java | 26 +------------------ .../events/ApplicationEventProcessorTest.java | 2 +- 7 files changed, 27 insertions(+), 41 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index e98e631b2d3..c552cc3bbd7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -462,7 +462,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { streamsRebalanceData ); final CompletableEventReaper applicationEventReaper = new CompletableEventReaper(logContext); - Supplier compositePollContextSupplier = CompositePollEventProcessorContext.supplier( + final Supplier compositePollContextSupplier = CompositePollEventProcessorContext.supplier( logContext, networkClientDelegateSupplier, backgroundEventHandler, @@ -500,8 +500,8 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { time, applicationEventHandler, () -> { - processBackgroundEvents(); offsetCommitCallbackInvoker.executeCallbacks(); + processBackgroundEvents(); } ); @@ -583,8 +583,8 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { time, applicationEventHandler, () -> { - processBackgroundEvents(); offsetCommitCallbackInvoker.executeCallbacks(); + processBackgroundEvents(); } ); this.backgroundEventHandler = new BackgroundEventHandler( @@ -681,7 +681,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { Optional.empty() ); final CompletableEventReaper applicationEventReaper = new CompletableEventReaper(logContext); - Supplier compositePollContextSupplier = CompositePollEventProcessorContext.supplier( + final Supplier compositePollContextSupplier = CompositePollEventProcessorContext.supplier( logContext, networkClientDelegateSupplier, backgroundEventHandler, @@ -711,8 +711,8 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { time, applicationEventHandler, () -> { - processBackgroundEvents(); offsetCommitCallbackInvoker.executeCallbacks(); + processBackgroundEvents(); } ); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java index 5f71cd3fbc7..31c402df2a6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java @@ -51,7 +51,6 @@ import java.util.Objects; import java.util.Optional; import java.util.Queue; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Supplier; @@ -71,7 +70,7 @@ public class NetworkClientDelegate implements AutoCloseable { private final int requestTimeoutMs; private final Queue unsentRequests; private final long retryBackoffMs; - private final AtomicReference metadataError; + private Optional metadataError; private final boolean notifyMetadataErrorsViaErrorQueue; private final AsyncConsumerMetrics asyncConsumerMetrics; @@ -92,7 +91,7 @@ public class NetworkClientDelegate implements AutoCloseable { this.unsentRequests = new ArrayDeque<>(); this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); - this.metadataError = new AtomicReference<>(); + this.metadataError = Optional.empty(); this.notifyMetadataErrorsViaErrorQueue = notifyMetadataErrorsViaErrorQueue; this.asyncConsumerMetrics = asyncConsumerMetrics; } @@ -164,7 +163,7 @@ public class NetworkClientDelegate implements AutoCloseable { if (notifyMetadataErrorsViaErrorQueue) { backgroundEventHandler.add(new ErrorEvent(e)); } else { - metadataError.compareAndSet(null, e); + metadataError = Optional.of(e); } } } @@ -250,8 +249,9 @@ public class NetworkClientDelegate implements AutoCloseable { } public Optional getAndClearMetadataError() { - Exception exception = metadataError.getAndSet(null); - return Optional.ofNullable(exception); + Optional metadataError = this.metadataError; + this.metadataError = Optional.empty(); + return metadataError; } public Node leastLoadedNode() { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvoker.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvoker.java index acc0f277d0d..77b8f6f81fe 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvoker.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvoker.java @@ -50,6 +50,12 @@ public class OffsetCommitCallbackInvoker { } } + /** + * Returns the current size of the queue. Used by the background thread to determine if it needs to pause + * itself to return to the application thread for processing. + * + * @return Current size of queue + */ public int size() { return callbackQueue.size(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java index cc6efc67e98..ecdc9b8a048 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java @@ -297,7 +297,6 @@ public class ShareConsumerImpl implements ShareConsumerDelegate { clientTelemetryReporter, metrics ); - final CompletableEventReaper applicationEventReaper = new CompletableEventReaper(logContext); final Supplier applicationEventProcessorSupplier = ApplicationEventProcessor.supplier( logContext, metadata, @@ -309,7 +308,7 @@ public class ShareConsumerImpl implements ShareConsumerDelegate { logContext, time, applicationEventQueue, - applicationEventReaper, + new CompletableEventReaper(logContext), applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier, @@ -404,7 +403,6 @@ public class ShareConsumerImpl implements ShareConsumerDelegate { metrics ); - final CompletableEventReaper applicationEventReaper = new CompletableEventReaper(logContext); final Supplier applicationEventProcessorSupplier = ApplicationEventProcessor.supplier( logContext, metadata, @@ -416,7 +414,7 @@ public class ShareConsumerImpl implements ShareConsumerDelegate { logContext, time, applicationEventQueue, - applicationEventReaper, + new CompletableEventReaper(logContext), applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java index ab790ba2055..935220bf70d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java @@ -45,6 +45,12 @@ public class BackgroundEventHandler { this.asyncConsumerMetrics = asyncConsumerMetrics; } + /** + * Returns the current size of the queue. Used by the background thread to determine if it needs to pause + * itself to return to the application thread for processing. + * + * @return Current size of queue + */ public int size() { return backgroundEventQueue.size(); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 0e7df790223..d8ae8183f86 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -428,7 +428,6 @@ public class AsyncKafkaConsumerTest { consumer.wakeup(); markReconcileAndAutoCommitCompleteForPollEvent(); - markResultForCompositePollEvent(); assertThrows(WakeupException.class, () -> consumer.poll(Duration.ZERO)); assertDoesNotThrow(() -> consumer.poll(Duration.ZERO)); } @@ -449,7 +448,6 @@ public class AsyncKafkaConsumerTest { consumer.assign(singleton(tp)); markReconcileAndAutoCommitCompleteForPollEvent(); - markResultForCompositePollEvent(); assertThrows(WakeupException.class, () -> consumer.poll(Duration.ofMinutes(1))); assertDoesNotThrow(() -> consumer.poll(Duration.ZERO)); } @@ -474,7 +472,6 @@ public class AsyncKafkaConsumerTest { consumer.assign(singleton(tp)); markReconcileAndAutoCommitCompleteForPollEvent(); - markResultForCompositePollEvent(); // since wakeup() is called when the non-empty fetch is returned the wakeup should be ignored assertDoesNotThrow(() -> consumer.poll(Duration.ofMinutes(1))); // the previously ignored wake-up should not be ignored in the next call @@ -512,7 +509,6 @@ public class AsyncKafkaConsumerTest { completeTopicSubscriptionChangeEventSuccessfully(); consumer.subscribe(Collections.singletonList(topicName), listener); markReconcileAndAutoCommitCompleteForPollEvent(); - markResultForCompositePollEvent(CompositePollEvent.State.CALLBACKS_REQUIRED); waitForConsumerPoll( callbackExecuted::get, "Consumer.poll() did not execute callback within timeout" @@ -537,7 +533,6 @@ public class AsyncKafkaConsumerTest { consumer.assign(singleton(tp)); markReconcileAndAutoCommitCompleteForPollEvent(); - markResultForCompositePollEvent(); consumer.poll(Duration.ZERO); assertDoesNotThrow(() -> consumer.poll(Duration.ZERO)); @@ -684,7 +679,6 @@ public class AsyncKafkaConsumerTest { consumer.assign(Collections.singleton(new TopicPartition("foo", 0))); assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback)); markReconcileAndAutoCommitCompleteForPollEvent(); - markResultForCompositePollEvent(CompositePollEvent.State.CALLBACKS_REQUIRED); waitForConsumerPoll( () -> callback.invoked == 1 && callback.exception == null, "Consumer.poll() did not execute the callback once (without error) in allottec timeout" @@ -1488,7 +1482,6 @@ public class AsyncKafkaConsumerTest { } markReconcileAndAutoCommitCompleteForPollEvent(); - markResultForCompositePollEvent(CompositePollEvent.State.CALLBACKS_REQUIRED); // This will trigger the background event queue to process our background event message. // If any error is happening inside the rebalance callbacks, we expect the first exception to be thrown from poll. @@ -1567,7 +1560,6 @@ public class AsyncKafkaConsumerTest { completeAssignmentChangeEventSuccessfully(); consumer.assign(singletonList(new TopicPartition("topic", 0))); markReconcileAndAutoCommitCompleteForPollEvent(); - markResultForCompositePollEvent(CompositePollEvent.State.CALLBACKS_REQUIRED); waitForConsumerPollException( e -> e.getMessage().equals(expectedException.getMessage()), "Consumer.poll() did not fail with expected exception " + expectedException + " within timeout" @@ -1588,7 +1580,6 @@ public class AsyncKafkaConsumerTest { completeAssignmentChangeEventSuccessfully(); consumer.assign(singletonList(new TopicPartition("topic", 0))); markReconcileAndAutoCommitCompleteForPollEvent(); - markResultForCompositePollEvent(CompositePollEvent.State.CALLBACKS_REQUIRED); waitForConsumerPollException( e -> e.getMessage().equals(expectedException1.getMessage()), "Consumer.poll() did not fail with expected exception " + expectedException1 + " within timeout" @@ -1673,7 +1664,6 @@ public class AsyncKafkaConsumerTest { completeTopicSubscriptionChangeEventSuccessfully(); consumer.subscribe(singletonList("topic1")); markReconcileAndAutoCommitCompleteForPollEvent(); - markResultForCompositePollEvent(); consumer.poll(Duration.ofMillis(100)); verify(applicationEventHandler).add(any(CompositePollEvent.class)); } @@ -1692,7 +1682,6 @@ public class AsyncKafkaConsumerTest { completeAssignmentChangeEventSuccessfully(); consumer.assign(singleton(new TopicPartition("t1", 1))); markReconcileAndAutoCommitCompleteForPollEvent(); - markResultForCompositePollEvent(); consumer.poll(Duration.ZERO); } @@ -1727,7 +1716,6 @@ public class AsyncKafkaConsumerTest { when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); markReconcileAndAutoCommitCompleteForPollEvent(); - markResultForCompositePollEvent(); // And then poll for up to 10000ms, which should return 2 records without timing out ConsumerRecords returnedRecords = consumer.poll(Duration.ofMillis(10000)); assertEquals(2, returnedRecords.count()); @@ -1832,7 +1820,6 @@ public class AsyncKafkaConsumerTest { try { Thread.currentThread().interrupt(); markReconcileAndAutoCommitCompleteForPollEvent(); - markResultForCompositePollEvent(); assertThrows(InterruptException.class, () -> consumer.poll(Duration.ZERO)); } finally { // clear interrupted state again since this thread may be reused by JUnit @@ -1865,8 +1852,6 @@ public class AsyncKafkaConsumerTest { consumer.subscribe(Collections.singletonList("topic")); when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); markReconcileAndAutoCommitCompleteForPollEvent(); - markResultForCompositePollEvent(); - markResultForCompositePollEvent(CompositePollEvent.State.CALLBACKS_REQUIRED); waitForConsumerPoll( () -> backgroundEventReaper.size() == 0, @@ -1934,7 +1919,6 @@ public class AsyncKafkaConsumerTest { consumer.assign(singleton(new TopicPartition("topic1", 0))); markReconcileAndAutoCommitCompleteForPollEvent(); - markResultForCompositePollEvent(); consumer.poll(Duration.ZERO); verify(applicationEventHandler, never()).addAndGet(any(UpdatePatternSubscriptionEvent.class)); @@ -2316,17 +2300,9 @@ public class AsyncKafkaConsumerTest { event.markReconcileAndAutoCommitComplete(); return null; }).when(applicationEventHandler).add(ArgumentMatchers.isA(PollEvent.class)); - } - - private void markResultForCompositePollEvent() { - doAnswer(invocation -> null) - .when(applicationEventHandler).add(ArgumentMatchers.isA(CompositePollEvent.class)); - } - - private void markResultForCompositePollEvent(CompositePollEvent.State state) { doAnswer(invocation -> { CompositePollEvent event = invocation.getArgument(0); - event.complete(state, Optional.empty()); + event.complete(CompositePollEvent.State.CALLBACKS_REQUIRED, Optional.empty()); return null; }).when(applicationEventHandler).add(ArgumentMatchers.isA(CompositePollEvent.class)); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java index 9a9bfe36e59..d7feb04930a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java @@ -81,7 +81,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -@SuppressWarnings({"ClassDataAbstractionCoupling", "ClassFanOutComplexity"}) +@SuppressWarnings("ClassDataAbstractionCoupling") public class ApplicationEventProcessorTest { private final Time time = new MockTime(); private final CommitRequestManager commitRequestManager = mock(CommitRequestManager.class);