mirror of https://github.com/apache/kafka.git
fix
This commit is contained in:
parent
5db6b53401
commit
90655da996
|
@ -739,19 +739,17 @@ class KafkaRequestHandlerTest {
|
||||||
assertEquals(8, KafkaRequestHandlerPool.aggregateThreads.get, "global counter should be 8 after both pools")
|
assertEquals(8, KafkaRequestHandlerPool.aggregateThreads.get, "global counter should be 8 after both pools")
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Get the aggregate meter from broker pool using reflection
|
|
||||||
val aggregateMeterField = classOf[KafkaRequestHandlerPool].getDeclaredField("aggregateIdleMeter")
|
val aggregateMeterField = classOf[KafkaRequestHandlerPool].getDeclaredField("aggregateIdleMeter")
|
||||||
aggregateMeterField.setAccessible(true)
|
aggregateMeterField.setAccessible(true)
|
||||||
val aggregateMeter = aggregateMeterField.get(brokerPool).asInstanceOf[Meter]
|
val aggregateMeter = aggregateMeterField.get(brokerPool).asInstanceOf[Meter]
|
||||||
|
|
||||||
// Get the per-pool idle meters from both pools using reflection
|
|
||||||
val perPoolIdleMeterField = classOf[KafkaRequestHandlerPool].getDeclaredField("perPoolIdleMeter")
|
val perPoolIdleMeterField = classOf[KafkaRequestHandlerPool].getDeclaredField("perPoolIdleMeter")
|
||||||
perPoolIdleMeterField.setAccessible(true)
|
perPoolIdleMeterField.setAccessible(true)
|
||||||
val brokerPerPoolIdleMeter = perPoolIdleMeterField.get(brokerPool).asInstanceOf[Meter]
|
val brokerPerPoolIdleMeter = perPoolIdleMeterField.get(brokerPool).asInstanceOf[Meter]
|
||||||
val controllerPerPoolIdleMeter = perPoolIdleMeterField.get(controllerPool).asInstanceOf[Meter]
|
val controllerPerPoolIdleMeter = perPoolIdleMeterField.get(controllerPool).asInstanceOf[Meter]
|
||||||
|
|
||||||
// Wait for idle measurements to accumulate
|
// Wait for idle measurements to accumulate
|
||||||
val deadline = System.currentTimeMillis() + 10000
|
val deadline = System.currentTimeMillis() + 8000
|
||||||
var aggregateValue = 0.0
|
var aggregateValue = 0.0
|
||||||
var brokerPerPoolValue = 0.0
|
var brokerPerPoolValue = 0.0
|
||||||
var controllerPerPoolValue = 0.0
|
var controllerPerPoolValue = 0.0
|
||||||
|
@ -763,15 +761,24 @@ class KafkaRequestHandlerTest {
|
||||||
}
|
}
|
||||||
print(s"Aggregate: $aggregateValue, Broker PerPool: $brokerPerPoolValue, Controller PerPool: $controllerPerPoolValue")
|
print(s"Aggregate: $aggregateValue, Broker PerPool: $brokerPerPoolValue, Controller PerPool: $controllerPerPoolValue")
|
||||||
|
|
||||||
// Verify that the aggregate meter shows reasonable idle percentage
|
// Verify that the meter shows reasonable idle percentage
|
||||||
// Since both pools are hitting the same global counter (8 threads), the rate should be normalized
|
|
||||||
assertTrue(aggregateValue >= 0.0 && aggregateValue <= 1.00, s"aggregate idle percent should be within [0,1], got $aggregateValue")
|
assertTrue(aggregateValue >= 0.0 && aggregateValue <= 1.00, s"aggregate idle percent should be within [0,1], got $aggregateValue")
|
||||||
|
|
||||||
// Verify that per-pool idle meters show reasonable values
|
|
||||||
// Each pool has 4 threads, so per-pool idle should be normalized by the pool size (4)
|
|
||||||
assertTrue(brokerPerPoolValue >= 0.0 && brokerPerPoolValue <= 1.00, s"broker per-pool idle percent should be within [0,1], got $brokerPerPoolValue")
|
assertTrue(brokerPerPoolValue >= 0.0 && brokerPerPoolValue <= 1.00, s"broker per-pool idle percent should be within [0,1], got $brokerPerPoolValue")
|
||||||
assertTrue(controllerPerPoolValue >= 0.0 && controllerPerPoolValue <= 1.00, s"controller per-pool idle percent should be within [0,1], got $controllerPerPoolValue")
|
assertTrue(controllerPerPoolValue >= 0.0 && controllerPerPoolValue <= 1.00, s"controller per-pool idle percent should be within [0,1], got $controllerPerPoolValue")
|
||||||
|
|
||||||
|
// Test pool resizing
|
||||||
|
// Shrink broker pool from 4 to 2 threads
|
||||||
|
brokerPool.resizeThreadPool(2)
|
||||||
|
assertEquals(2, brokerPool.threadPoolSize.get)
|
||||||
|
assertEquals(4, controllerPool.threadPoolSize.get)
|
||||||
|
assertEquals(6, KafkaRequestHandlerPool.aggregateThreads.get)
|
||||||
|
|
||||||
|
// Expand controller pool from 4 to 6 threads
|
||||||
|
controllerPool.resizeThreadPool(6)
|
||||||
|
assertEquals(2, brokerPool.threadPoolSize.get)
|
||||||
|
assertEquals(6, controllerPool.threadPoolSize.get)
|
||||||
|
assertEquals(8, KafkaRequestHandlerPool.aggregateThreads.get)
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
controllerPool.shutdown()
|
controllerPool.shutdown()
|
||||||
brokerPool.shutdown()
|
brokerPool.shutdown()
|
||||||
|
|
Loading…
Reference in New Issue