From 53098dbff9770bbdf9e48b7a85d206df186d57b6 Mon Sep 17 00:00:00 2001 From: tony tang Date: Thu, 2 Oct 2025 12:12:56 -0500 Subject: [PATCH] fix --- .../kafka/server/KafkaRequestHandlerTest.scala | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala b/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala index 8522492425f..4b4994585e4 100644 --- a/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala +++ b/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala @@ -50,7 +50,7 @@ class KafkaRequestHandlerTest { val topic2 = "topic2" val brokerTopicMetrics: BrokerTopicMetrics = brokerTopicStats.topicStats(topic) val allTopicMetrics: BrokerTopicMetrics = brokerTopicStats.allTopicsStats - KafkaRequestHandlerPool.sharedAggregateTotalThreads.set(1) + KafkaRequestHandlerPool.aggregateThreads.set(1) @Test def testCallbackTiming(): Unit = { @@ -60,7 +60,7 @@ class KafkaRequestHandlerTest { val requestChannel = new RequestChannel(10, time, metrics) val apiHandler = mock(classOf[ApiRequestHandler]) try { - val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time, "broker", mock(classOf[Meter])) + val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time, "broker") val request = makeRequest(time, metrics) requestChannel.sendRequest(request) @@ -96,7 +96,7 @@ class KafkaRequestHandlerTest { val metrics = mock(classOf[RequestChannelMetrics]) val apiHandler = mock(classOf[ApiRequestHandler]) val requestChannel = new RequestChannel(10, time, metrics) - val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time, "broker", mock(classOf[Meter])) + val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time, "broker") var handledCount = 0 var tryCompleteActionCount = 0 @@ -132,7 +132,7 @@ class KafkaRequestHandlerTest { val metrics = mock(classOf[RequestChannelMetrics]) val apiHandler = mock(classOf[ApiRequestHandler]) val requestChannel = new RequestChannel(10, time, metrics) - val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time, "broker", mock(classOf[Meter])) + val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time, "broker") val originalRequestLocal = mock(classOf[RequestLocal]) @@ -166,7 +166,7 @@ class KafkaRequestHandlerTest { val metrics = mock(classOf[RequestChannelMetrics]) val apiHandler = mock(classOf[ApiRequestHandler]) val requestChannel = new RequestChannel(10, time, metrics) - val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time, "broker", mock(classOf[Meter])) + val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time, "broker") val originalRequestLocal = mock(classOf[RequestLocal]) when(originalRequestLocal.bufferSupplier).thenReturn(BufferSupplier.create()) @@ -710,7 +710,7 @@ class KafkaRequestHandlerTest { val apiHandler = mock(classOf[ApiRequestHandler]) // Reset global shared counter for test - KafkaRequestHandlerPool.sharedAggregateTotalThreads.set(0) + KafkaRequestHandlerPool.aggregateThreads.set(0) // Create broker pool with 4 threads val brokerPool = new KafkaRequestHandlerPool( @@ -724,7 +724,7 @@ class KafkaRequestHandlerTest { ) // Verify global counter is updated - assertEquals(4, KafkaRequestHandlerPool.sharedAggregateTotalThreads.get, "global counter should be 4 after broker pool") + assertEquals(4, KafkaRequestHandlerPool.aggregateThreads.get, "global counter should be 4 after broker pool") // Create controller pool with 4 threads val controllerPool = new KafkaRequestHandlerPool( @@ -738,7 +738,7 @@ class KafkaRequestHandlerTest { ) // Verify global counter is updated to sum of both pools - assertEquals(8, KafkaRequestHandlerPool.sharedAggregateTotalThreads.get, "global counter should be 8 after both pools") + assertEquals(8, KafkaRequestHandlerPool.aggregateThreads.get, "global counter should be 8 after both pools") try { // Get the aggregate meter from broker pool using reflection @@ -764,7 +764,7 @@ class KafkaRequestHandlerTest { metricsController.close() // Verify global counter is reset after shutdown - assertEquals(0, KafkaRequestHandlerPool.sharedAggregateTotalThreads.get, "global counter should be 0 after shutdown") + assertEquals(0, KafkaRequestHandlerPool.aggregateThreads.get, "global counter should be 0 after shutdown") } } }