mirror of https://github.com/apache/kafka.git
fix
This commit is contained in:
parent
034c4981a9
commit
53098dbff9
|
@ -50,7 +50,7 @@ class KafkaRequestHandlerTest {
|
||||||
val topic2 = "topic2"
|
val topic2 = "topic2"
|
||||||
val brokerTopicMetrics: BrokerTopicMetrics = brokerTopicStats.topicStats(topic)
|
val brokerTopicMetrics: BrokerTopicMetrics = brokerTopicStats.topicStats(topic)
|
||||||
val allTopicMetrics: BrokerTopicMetrics = brokerTopicStats.allTopicsStats
|
val allTopicMetrics: BrokerTopicMetrics = brokerTopicStats.allTopicsStats
|
||||||
KafkaRequestHandlerPool.sharedAggregateTotalThreads.set(1)
|
KafkaRequestHandlerPool.aggregateThreads.set(1)
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testCallbackTiming(): Unit = {
|
def testCallbackTiming(): Unit = {
|
||||||
|
@ -60,7 +60,7 @@ class KafkaRequestHandlerTest {
|
||||||
val requestChannel = new RequestChannel(10, time, metrics)
|
val requestChannel = new RequestChannel(10, time, metrics)
|
||||||
val apiHandler = mock(classOf[ApiRequestHandler])
|
val apiHandler = mock(classOf[ApiRequestHandler])
|
||||||
try {
|
try {
|
||||||
val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time, "broker", mock(classOf[Meter]))
|
val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time, "broker")
|
||||||
|
|
||||||
val request = makeRequest(time, metrics)
|
val request = makeRequest(time, metrics)
|
||||||
requestChannel.sendRequest(request)
|
requestChannel.sendRequest(request)
|
||||||
|
@ -96,7 +96,7 @@ class KafkaRequestHandlerTest {
|
||||||
val metrics = mock(classOf[RequestChannelMetrics])
|
val metrics = mock(classOf[RequestChannelMetrics])
|
||||||
val apiHandler = mock(classOf[ApiRequestHandler])
|
val apiHandler = mock(classOf[ApiRequestHandler])
|
||||||
val requestChannel = new RequestChannel(10, time, metrics)
|
val requestChannel = new RequestChannel(10, time, metrics)
|
||||||
val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time, "broker", mock(classOf[Meter]))
|
val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time, "broker")
|
||||||
|
|
||||||
var handledCount = 0
|
var handledCount = 0
|
||||||
var tryCompleteActionCount = 0
|
var tryCompleteActionCount = 0
|
||||||
|
@ -132,7 +132,7 @@ class KafkaRequestHandlerTest {
|
||||||
val metrics = mock(classOf[RequestChannelMetrics])
|
val metrics = mock(classOf[RequestChannelMetrics])
|
||||||
val apiHandler = mock(classOf[ApiRequestHandler])
|
val apiHandler = mock(classOf[ApiRequestHandler])
|
||||||
val requestChannel = new RequestChannel(10, time, metrics)
|
val requestChannel = new RequestChannel(10, time, metrics)
|
||||||
val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time, "broker", mock(classOf[Meter]))
|
val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time, "broker")
|
||||||
|
|
||||||
val originalRequestLocal = mock(classOf[RequestLocal])
|
val originalRequestLocal = mock(classOf[RequestLocal])
|
||||||
|
|
||||||
|
@ -166,7 +166,7 @@ class KafkaRequestHandlerTest {
|
||||||
val metrics = mock(classOf[RequestChannelMetrics])
|
val metrics = mock(classOf[RequestChannelMetrics])
|
||||||
val apiHandler = mock(classOf[ApiRequestHandler])
|
val apiHandler = mock(classOf[ApiRequestHandler])
|
||||||
val requestChannel = new RequestChannel(10, time, metrics)
|
val requestChannel = new RequestChannel(10, time, metrics)
|
||||||
val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time, "broker", mock(classOf[Meter]))
|
val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time, "broker")
|
||||||
|
|
||||||
val originalRequestLocal = mock(classOf[RequestLocal])
|
val originalRequestLocal = mock(classOf[RequestLocal])
|
||||||
when(originalRequestLocal.bufferSupplier).thenReturn(BufferSupplier.create())
|
when(originalRequestLocal.bufferSupplier).thenReturn(BufferSupplier.create())
|
||||||
|
@ -710,7 +710,7 @@ class KafkaRequestHandlerTest {
|
||||||
val apiHandler = mock(classOf[ApiRequestHandler])
|
val apiHandler = mock(classOf[ApiRequestHandler])
|
||||||
|
|
||||||
// Reset global shared counter for test
|
// Reset global shared counter for test
|
||||||
KafkaRequestHandlerPool.sharedAggregateTotalThreads.set(0)
|
KafkaRequestHandlerPool.aggregateThreads.set(0)
|
||||||
|
|
||||||
// Create broker pool with 4 threads
|
// Create broker pool with 4 threads
|
||||||
val brokerPool = new KafkaRequestHandlerPool(
|
val brokerPool = new KafkaRequestHandlerPool(
|
||||||
|
@ -724,7 +724,7 @@ class KafkaRequestHandlerTest {
|
||||||
)
|
)
|
||||||
|
|
||||||
// Verify global counter is updated
|
// Verify global counter is updated
|
||||||
assertEquals(4, KafkaRequestHandlerPool.sharedAggregateTotalThreads.get, "global counter should be 4 after broker pool")
|
assertEquals(4, KafkaRequestHandlerPool.aggregateThreads.get, "global counter should be 4 after broker pool")
|
||||||
|
|
||||||
// Create controller pool with 4 threads
|
// Create controller pool with 4 threads
|
||||||
val controllerPool = new KafkaRequestHandlerPool(
|
val controllerPool = new KafkaRequestHandlerPool(
|
||||||
|
@ -738,7 +738,7 @@ class KafkaRequestHandlerTest {
|
||||||
)
|
)
|
||||||
|
|
||||||
// Verify global counter is updated to sum of both pools
|
// Verify global counter is updated to sum of both pools
|
||||||
assertEquals(8, KafkaRequestHandlerPool.sharedAggregateTotalThreads.get, "global counter should be 8 after both pools")
|
assertEquals(8, KafkaRequestHandlerPool.aggregateThreads.get, "global counter should be 8 after both pools")
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Get the aggregate meter from broker pool using reflection
|
// Get the aggregate meter from broker pool using reflection
|
||||||
|
@ -764,7 +764,7 @@ class KafkaRequestHandlerTest {
|
||||||
metricsController.close()
|
metricsController.close()
|
||||||
|
|
||||||
// Verify global counter is reset after shutdown
|
// Verify global counter is reset after shutdown
|
||||||
assertEquals(0, KafkaRequestHandlerPool.sharedAggregateTotalThreads.get, "global counter should be 0 after shutdown")
|
assertEquals(0, KafkaRequestHandlerPool.aggregateThreads.get, "global counter should be 0 after shutdown")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue