diff --git a/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala index fc8f3240b71..83bc38fddcf 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala @@ -69,11 +69,13 @@ class ControllerEventManagerTest { val controllerStats = new ControllerStats val time = new MockTime() val latch = new CountDownLatch(1) + val processedEvents = new AtomicInteger() val eventProcessor = new ControllerEventProcessor { override def process(event: ControllerEvent): Unit = { latch.await() time.sleep(500) + processedEvents.incrementAndGet() } override def preempt(event: ControllerEvent): Unit = {} } @@ -89,12 +91,12 @@ class ControllerEventManagerTest { controllerEventManager.put(TopicChange) latch.countDown() + TestUtils.waitUntilTrue(() => processedEvents.get() == 2, + "Timed out waiting for processing of all events") + val queueTimeHistogram = Metrics.defaultRegistry.allMetrics.asScala.filterKeys(_.getMBeanName == metricName).values.headOption .getOrElse(fail(s"Unable to find metric $metricName")).asInstanceOf[Histogram] - TestUtils.waitUntilTrue(() => controllerEventManager.isEmpty, - "Timed out waiting for processing of all events") - assertEquals(2, queueTimeHistogram.count) assertEquals(0, queueTimeHistogram.min, 0.01) assertEquals(500, queueTimeHistogram.max, 0.01)