mirror of https://github.com/apache/kafka.git
KAFKA-18415: Fix for event queue metric and flaky test (#18416)
Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
parent
0377e807ff
commit
0721d21a57
|
@ -81,8 +81,10 @@ public class ApplicationEventHandler implements Closeable {
|
|||
public void add(final ApplicationEvent event) {
|
||||
Objects.requireNonNull(event, "ApplicationEvent provided to add must be non-null");
|
||||
event.setEnqueuedMs(time.milliseconds());
|
||||
// Record the updated queue size before actually adding the event to the queue
|
||||
// to avoid race conditions (the background thread is continuously removing from this queue)
|
||||
asyncConsumerMetrics.recordApplicationEventQueueSize(applicationEventQueue.size() + 1);
|
||||
applicationEventQueue.add(event);
|
||||
asyncConsumerMetrics.recordApplicationEventQueueSize(applicationEventQueue.size());
|
||||
wakeupNetworkThread();
|
||||
}
|
||||
|
||||
|
|
|
@ -53,8 +53,8 @@ public class BackgroundEventHandler {
|
|||
public void add(BackgroundEvent event) {
|
||||
Objects.requireNonNull(event, "BackgroundEvent provided to add must be non-null");
|
||||
event.setEnqueuedMs(time.milliseconds());
|
||||
asyncConsumerMetrics.recordBackgroundEventQueueSize(backgroundEventQueue.size() + 1);
|
||||
backgroundEventQueue.add(event);
|
||||
asyncConsumerMetrics.recordBackgroundEventQueueSize(backgroundEventQueue.size());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -32,8 +32,9 @@ import org.junit.jupiter.api.Test;
|
|||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
public class ApplicationEventHandlerTest {
|
||||
private final Time time = new MockTime();
|
||||
|
@ -46,7 +47,7 @@ public class ApplicationEventHandlerTest {
|
|||
@Test
|
||||
public void testRecordApplicationEventQueueSize() {
|
||||
try (Metrics metrics = new Metrics();
|
||||
AsyncConsumerMetrics asyncConsumerMetrics = new AsyncConsumerMetrics(metrics);
|
||||
AsyncConsumerMetrics asyncConsumerMetrics = spy(new AsyncConsumerMetrics(metrics));
|
||||
ApplicationEventHandler applicationEventHandler = new ApplicationEventHandler(
|
||||
new LogContext(),
|
||||
time,
|
||||
|
@ -59,15 +60,7 @@ public class ApplicationEventHandlerTest {
|
|||
)) {
|
||||
// add event
|
||||
applicationEventHandler.add(new PollEvent(time.milliseconds()));
|
||||
assertEquals(
|
||||
1,
|
||||
(double) metrics.metric(
|
||||
metrics.metricName(
|
||||
AsyncConsumerMetrics.APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME,
|
||||
ConsumerUtils.CONSUMER_METRIC_GROUP
|
||||
)
|
||||
).metricValue()
|
||||
);
|
||||
verify(asyncConsumerMetrics).recordApplicationEventQueueSize(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue