MINOR: Use drainEvents() in ShareConsumerImpl::processBackgroundEvents (#20474)

*What*

- Currently in `ShareConsumerImpl`, we were not resetting
`background-event-queue-size` metric to 0 after draining the events from
the queue.
- This PR fixes it by using `BackgroundEventHandler::drainEvents`
similar to `AsyncKafkaConsumer`.
- Added a unit test to verify the metric is reset to 0 after draining
the events.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai
<chia7712@gmail.com>
This commit is contained in:
Shivsundar R 2025-09-04 09:39:50 -04:00 committed by GitHub
parent a81f08d368
commit 29b940bef4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 33 additions and 7 deletions

View File

@ -170,6 +170,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
private final String clientId;
private final String groupId;
private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
private final BackgroundEventHandler backgroundEventHandler;
private final BackgroundEventProcessor backgroundEventProcessor;
private final CompletableEventReaper backgroundEventReaper;
private final Deserializers<K, V> deserializers;
@ -263,7 +264,7 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
ShareFetchMetricsManager shareFetchMetricsManager = createShareFetchMetricsManager(metrics);
ApiVersions apiVersions = new ApiVersions();
final BlockingQueue<ApplicationEvent> applicationEventQueue = new LinkedBlockingQueue<>();
final BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(
this.backgroundEventHandler = new BackgroundEventHandler(
backgroundEventQueue, time, asyncConsumerMetrics);
// This FetchBuffer is shared between the application and network threads.
@ -378,8 +379,8 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, CONSUMER_SHARE_METRIC_GROUP);
final BlockingQueue<ApplicationEvent> applicationEventQueue = new LinkedBlockingQueue<>();
final BlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>();
final BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(
this.backgroundEventQueue = new LinkedBlockingQueue<>();
this.backgroundEventHandler = new BackgroundEventHandler(
backgroundEventQueue, time, asyncConsumerMetrics);
final Supplier<NetworkClientDelegate> networkClientDelegateSupplier =
@ -419,7 +420,6 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
requestManagersSupplier,
asyncConsumerMetrics);
this.backgroundEventQueue = new LinkedBlockingQueue<>();
this.backgroundEventProcessor = new BackgroundEventProcessor();
this.backgroundEventReaper = new CompletableEventReaper(logContext);
@ -468,6 +468,8 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
this.clientTelemetryReporter = Optional.empty();
this.completedAcknowledgements = Collections.emptyList();
this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, CONSUMER_SHARE_METRIC_GROUP);
this.backgroundEventHandler = new BackgroundEventHandler(
backgroundEventQueue, time, asyncConsumerMetrics);
}
// auxiliary interface for testing
@ -1110,12 +1112,13 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
* It is possible that {@link ErrorEvent an error}
* could occur when processing the events. In such cases, the processor will take a reference to the first
* error, continue to process the remaining events, and then throw the first error that occurred.
*
* Visible for testing.
*/
private boolean processBackgroundEvents() {
boolean processBackgroundEvents() {
AtomicReference<KafkaException> firstError = new AtomicReference<>();
LinkedList<BackgroundEvent> events = new LinkedList<>();
backgroundEventQueue.drainTo(events);
List<BackgroundEvent> events = backgroundEventHandler.drainEvents();
if (!events.isEmpty()) {
long startMs = time.milliseconds();
for (BackgroundEvent event : events) {
@ -1234,6 +1237,10 @@ public class ShareConsumerImpl<K, V> implements ShareConsumerDelegate<K, V> {
return metrics;
}
AsyncConsumerMetrics asyncConsumerMetrics() {
return asyncConsumerMetrics;
}
@Override
public KafkaShareConsumerMetrics kafkaShareConsumerMetrics() {
return kafkaShareConsumerMetrics;

View File

@ -26,11 +26,13 @@ import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
import org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
@ -76,6 +78,7 @@ import java.util.function.Predicate;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -779,6 +782,22 @@ public class ShareConsumerImplTest {
assertEquals(1000, timer.remainingMs());
}
@Test
public void testRecordBackgroundEventQueueSize() {
consumer = newConsumer();
Metrics metrics = consumer.metricsRegistry();
AsyncConsumerMetrics asyncConsumerMetrics = consumer.asyncConsumerMetrics();
ShareAcknowledgementCommitCallbackEvent event = new ShareAcknowledgementCommitCallbackEvent(Map.of());
backgroundEventQueue.add(event);
asyncConsumerMetrics.recordBackgroundEventQueueSize(1);
assertEquals(1, (double) metrics.metric(metrics.metricName("background-event-queue-size", CONSUMER_SHARE_METRIC_GROUP)).metricValue());
consumer.processBackgroundEvents();
assertEquals(0, (double) metrics.metric(metrics.metricName("background-event-queue-size", CONSUMER_SHARE_METRIC_GROUP)).metricValue());
}
/**
* Tests {@link ShareConsumerImpl#processBackgroundEvents(Future, Timer, Predicate) processBackgroundEvents}
* handles the case where the {@link Future} does not complete within the timeout.