diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java index dd6a1666c7b..6ab827b617c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java @@ -81,8 +81,10 @@ public class ApplicationEventHandler implements Closeable { public void add(final ApplicationEvent event) { Objects.requireNonNull(event, "ApplicationEvent provided to add must be non-null"); event.setEnqueuedMs(time.milliseconds()); + // Record the updated queue size before actually adding the event to the queue + // to avoid race conditions (the background thread is continuously removing from this queue) + asyncConsumerMetrics.recordApplicationEventQueueSize(applicationEventQueue.size() + 1); applicationEventQueue.add(event); - asyncConsumerMetrics.recordApplicationEventQueueSize(applicationEventQueue.size()); wakeupNetworkThread(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java index adc621d5f2e..3e83908f3df 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java @@ -53,8 +53,8 @@ public class BackgroundEventHandler { public void add(BackgroundEvent event) { Objects.requireNonNull(event, "BackgroundEvent provided to add must be non-null"); event.setEnqueuedMs(time.milliseconds()); + asyncConsumerMetrics.recordBackgroundEventQueueSize(backgroundEventQueue.size() + 1); backgroundEventQueue.add(event); - asyncConsumerMetrics.recordBackgroundEventQueueSize(backgroundEventQueue.size()); } /** diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java index a8ce990a23d..3430719b16e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java @@ -32,8 +32,9 @@ import org.junit.jupiter.api.Test; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; public class ApplicationEventHandlerTest { private final Time time = new MockTime(); @@ -46,7 +47,7 @@ public class ApplicationEventHandlerTest { @Test public void testRecordApplicationEventQueueSize() { try (Metrics metrics = new Metrics(); - AsyncConsumerMetrics asyncConsumerMetrics = new AsyncConsumerMetrics(metrics); + AsyncConsumerMetrics asyncConsumerMetrics = spy(new AsyncConsumerMetrics(metrics)); ApplicationEventHandler applicationEventHandler = new ApplicationEventHandler( new LogContext(), time, @@ -59,15 +60,7 @@ public class ApplicationEventHandlerTest { )) { // add event applicationEventHandler.add(new PollEvent(time.milliseconds())); - assertEquals( - 1, - (double) metrics.metric( - metrics.metricName( - AsyncConsumerMetrics.APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME, - ConsumerUtils.CONSUMER_METRIC_GROUP - ) - ).metricValue() - ); + verify(asyncConsumerMetrics).recordApplicationEventQueueSize(1); } } }