From 29b940bef439a1819d9de254eb68f9f7e6684bc3 Mon Sep 17 00:00:00 2001 From: Shivsundar R Date: Thu, 4 Sep 2025 09:39:50 -0400 Subject: [PATCH] MINOR: Use drainEvents() in ShareConsumerImpl::processBackgroundEvents (#20474) *What* - Currently in `ShareConsumerImpl`, we were not resetting `background-event-queue-size` metric to 0 after draining the events from the queue. - This PR fixes it by using `BackgroundEventHandler::drainEvents` similar to `AsyncKafkaConsumer`. - Added a unit test to verify the metric is reset to 0 after draining the events. Reviewers: Andrew Schofield , Chia-Ping Tsai --- .../consumer/internals/ShareConsumerImpl.java | 21 ++++++++++++------- .../internals/ShareConsumerImplTest.java | 19 +++++++++++++++++ 2 files changed, 33 insertions(+), 7 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java index a3f0d8ee808..12b01b5482e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java @@ -170,6 +170,7 @@ public class ShareConsumerImpl implements ShareConsumerDelegate { private final String clientId; private final String groupId; private final BlockingQueue backgroundEventQueue; + private final BackgroundEventHandler backgroundEventHandler; private final BackgroundEventProcessor backgroundEventProcessor; private final CompletableEventReaper backgroundEventReaper; private final Deserializers deserializers; @@ -263,7 +264,7 @@ public class ShareConsumerImpl implements ShareConsumerDelegate { ShareFetchMetricsManager shareFetchMetricsManager = createShareFetchMetricsManager(metrics); ApiVersions apiVersions = new ApiVersions(); final BlockingQueue applicationEventQueue = new LinkedBlockingQueue<>(); - final BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler( + this.backgroundEventHandler = new BackgroundEventHandler( backgroundEventQueue, time, asyncConsumerMetrics); // This FetchBuffer is shared between the application and network threads. @@ -378,8 +379,8 @@ public class ShareConsumerImpl implements ShareConsumerDelegate { this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, CONSUMER_SHARE_METRIC_GROUP); final BlockingQueue applicationEventQueue = new LinkedBlockingQueue<>(); - final BlockingQueue backgroundEventQueue = new LinkedBlockingQueue<>(); - final BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler( + this.backgroundEventQueue = new LinkedBlockingQueue<>(); + this.backgroundEventHandler = new BackgroundEventHandler( backgroundEventQueue, time, asyncConsumerMetrics); final Supplier networkClientDelegateSupplier = @@ -419,7 +420,6 @@ public class ShareConsumerImpl implements ShareConsumerDelegate { requestManagersSupplier, asyncConsumerMetrics); - this.backgroundEventQueue = new LinkedBlockingQueue<>(); this.backgroundEventProcessor = new BackgroundEventProcessor(); this.backgroundEventReaper = new CompletableEventReaper(logContext); @@ -468,6 +468,8 @@ public class ShareConsumerImpl implements ShareConsumerDelegate { this.clientTelemetryReporter = Optional.empty(); this.completedAcknowledgements = Collections.emptyList(); this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, CONSUMER_SHARE_METRIC_GROUP); + this.backgroundEventHandler = new BackgroundEventHandler( + backgroundEventQueue, time, asyncConsumerMetrics); } // auxiliary interface for testing @@ -1110,12 +1112,13 @@ public class ShareConsumerImpl implements ShareConsumerDelegate { * It is possible that {@link ErrorEvent an error} * could occur when processing the events. In such cases, the processor will take a reference to the first * error, continue to process the remaining events, and then throw the first error that occurred. + * + * Visible for testing. */ - private boolean processBackgroundEvents() { + boolean processBackgroundEvents() { AtomicReference firstError = new AtomicReference<>(); - LinkedList events = new LinkedList<>(); - backgroundEventQueue.drainTo(events); + List events = backgroundEventHandler.drainEvents(); if (!events.isEmpty()) { long startMs = time.milliseconds(); for (BackgroundEvent event : events) { @@ -1234,6 +1237,10 @@ public class ShareConsumerImpl implements ShareConsumerDelegate { return metrics; } + AsyncConsumerMetrics asyncConsumerMetrics() { + return asyncConsumerMetrics; + } + @Override public KafkaShareConsumerMetrics kafkaShareConsumerMetrics() { return kafkaShareConsumerMetrics; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java index 09fc99d8e24..0fa3def7c15 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java @@ -26,11 +26,13 @@ import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.clients.consumer.internals.events.PollEvent; import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent; +import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent; import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent; import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent; import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent; import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent; import org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent; +import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; @@ -76,6 +78,7 @@ import java.util.function.Predicate; import static java.util.Collections.singleton; import static java.util.Collections.singletonList; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -779,6 +782,22 @@ public class ShareConsumerImplTest { assertEquals(1000, timer.remainingMs()); } + @Test + public void testRecordBackgroundEventQueueSize() { + consumer = newConsumer(); + Metrics metrics = consumer.metricsRegistry(); + AsyncConsumerMetrics asyncConsumerMetrics = consumer.asyncConsumerMetrics(); + + ShareAcknowledgementCommitCallbackEvent event = new ShareAcknowledgementCommitCallbackEvent(Map.of()); + backgroundEventQueue.add(event); + asyncConsumerMetrics.recordBackgroundEventQueueSize(1); + + assertEquals(1, (double) metrics.metric(metrics.metricName("background-event-queue-size", CONSUMER_SHARE_METRIC_GROUP)).metricValue()); + + consumer.processBackgroundEvents(); + assertEquals(0, (double) metrics.metric(metrics.metricName("background-event-queue-size", CONSUMER_SHARE_METRIC_GROUP)).metricValue()); + } + /** * Tests {@link ShareConsumerImpl#processBackgroundEvents(Future, Timer, Predicate) processBackgroundEvents} * handles the case where the {@link Future} does not complete within the timeout.