mirror of https://github.com/apache/kafka.git
KAFKA-8747; Add atomic counter to fix flaky testEventQueueTime test (#7320)
This patch adds an atomic counter in the test to ensure we have processed all the events before we assert the metrics. There was a race condition with the previous assertion, which asserted that the event queue is empty before checking the metrics. Reviewers: Jason Gustafson <jason@confluent.io>
This commit is contained in:
parent
18246e509e
commit
7012fa3262
|
|
@ -69,11 +69,13 @@ class ControllerEventManagerTest {
|
|||
val controllerStats = new ControllerStats
|
||||
val time = new MockTime()
|
||||
val latch = new CountDownLatch(1)
|
||||
val processedEvents = new AtomicInteger()
|
||||
|
||||
val eventProcessor = new ControllerEventProcessor {
|
||||
override def process(event: ControllerEvent): Unit = {
|
||||
latch.await()
|
||||
time.sleep(500)
|
||||
processedEvents.incrementAndGet()
|
||||
}
|
||||
override def preempt(event: ControllerEvent): Unit = {}
|
||||
}
|
||||
|
|
@ -89,12 +91,12 @@ class ControllerEventManagerTest {
|
|||
controllerEventManager.put(TopicChange)
|
||||
latch.countDown()
|
||||
|
||||
TestUtils.waitUntilTrue(() => processedEvents.get() == 2,
|
||||
"Timed out waiting for processing of all events")
|
||||
|
||||
val queueTimeHistogram = Metrics.defaultRegistry.allMetrics.asScala.filterKeys(_.getMBeanName == metricName).values.headOption
|
||||
.getOrElse(fail(s"Unable to find metric $metricName")).asInstanceOf[Histogram]
|
||||
|
||||
TestUtils.waitUntilTrue(() => controllerEventManager.isEmpty,
|
||||
"Timed out waiting for processing of all events")
|
||||
|
||||
assertEquals(2, queueTimeHistogram.count)
|
||||
assertEquals(0, queueTimeHistogram.min, 0.01)
|
||||
assertEquals(500, queueTimeHistogram.max, 0.01)
|
||||
|
|
|
|||
Loading…
Reference in New Issue