tony tang 2025-09-19 15:18:29 -05:00
parent 4bca99e040
commit 034c4981a9
1 changed file with 9 additions and 10 deletions


@@ -24,7 +24,7 @@ import kafka.server.KafkaRequestHandler.{threadCurrentRequest, threadRequestChannel}
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
 import com.yammer.metrics.core.Meter
-import kafka.server.KafkaRequestHandlerPool.sharedAggregateTotalThreads
+import kafka.server.KafkaRequestHandlerPool.aggregateThreads
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.utils.{Exit, KafkaThread, Time}
 import org.apache.kafka.server.common.RequestLocal
@@ -90,12 +90,12 @@ class KafkaRequestHandler(
   id: Int,
   brokerId: Int,
   val aggregateIdleMeter: Meter,
+  val perPoolIdleMeter: Meter,
   val totalHandlerThreads: AtomicInteger,
   val requestChannel: RequestChannel,
   apis: ApiRequestHandler,
   time: Time,
   nodeName: String = "broker",
-  val perPoolIdleMeter: Meter,
 ) extends Runnable with Logging {
   this.logIdent = s"[Kafka Request Handler $id on ${nodeName.capitalize} $brokerId] "
   private val shutdownComplete = new CountDownLatch(1)
@@ -117,7 +117,7 @@ class KafkaRequestHandler(
       // Per-pool idle ratio uses the pool's own thread count as denominator
       perPoolIdleMeter.mark(idleTime / totalHandlerThreads.get)
       // Aggregate idle ratio uses the total threads across all pools as denominator
-      aggregateIdleMeter.mark(idleTime / sharedAggregateTotalThreads.get)
+      aggregateIdleMeter.mark(idleTime / aggregateThreads.get)

       req match {
         case RequestChannel.ShutdownRequest =>
@@ -198,7 +198,7 @@ class KafkaRequestHandler(
 }

 object KafkaRequestHandlerPool {
-  val sharedAggregateTotalThreads = new AtomicInteger(0)
+  val aggregateThreads = new AtomicInteger(0)
 }

 class KafkaRequestHandlerPool(
@@ -214,15 +214,13 @@ class KafkaRequestHandlerPool(

   val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
   /* Per-pool idle meter (broker-only or controller-only) */
-  private val perPoolIdleMeterName = if (nodeName == "controller") "ControllerRequestHandlerAvgIdlePercent" else "BrokerRequestHandlerAvgIdlePercent"
+  private val perPoolIdleMeterName = nodeName + "RequestHandlerAvgIdlePercent"
   private val perPoolIdleMeter = metricsGroup.newMeter(perPoolIdleMeterName, "percent", TimeUnit.NANOSECONDS)
   /* Aggregate meter to track the average free capacity of the request handlers */
   private val aggregateIdleMeter = metricsGroup.newMeter(requestHandlerAvgIdleMetricName, "percent", TimeUnit.NANOSECONDS)

   this.logIdent = s"[data-plane Kafka Request Handler on ${nodeName.capitalize} $brokerId] "
   val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
-  // when using shared aggregate counter, register this pool's threads
-  sharedAggregateTotalThreads.addAndGet(numThreads)
   for (i <- 0 until numThreads) {
     createHandler(i)
   }
@@ -232,13 +230,14 @@ class KafkaRequestHandlerPool(
       id,
       brokerId,
       aggregateIdleMeter,
+      perPoolIdleMeter,
       threadPoolSize,
       requestChannel,
       apis,
       time,
       nodeName,
-      perPoolIdleMeter
     )
+    aggregateThreads.getAndIncrement()
     KafkaThread.daemon("data-plane-kafka-request-handler-" + id, runnables(id)).start()
   }

@@ -252,9 +251,9 @@ class KafkaRequestHandlerPool(
     } else if (newSize < currentSize) {
       for (i <- 1 to (currentSize - newSize)) {
         runnables.remove(currentSize - i).stop()
+        aggregateThreads.getAndDecrement()
       }
     }
-    sharedAggregateTotalThreads.addAndGet(newSize - currentSize)
     threadPoolSize.set(newSize)
   }

@@ -265,7 +264,7 @@ class KafkaRequestHandlerPool(
     for (handler <- runnables)
       handler.awaitShutdown()
     // Unregister this pool's threads from shared aggregate counter
-    sharedAggregateTotalThreads.addAndGet(-threadPoolSize.get)
+    aggregateThreads.addAndGet(-threadPoolSize.get)
     info("shut down completely")
   }
 }
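
For readers following the metric bookkeeping, the sketch below is a minimal model of what the patched pool does, not the patched file itself: Pools, Pool, poolSize, idleRatios, and resize are hypothetical stand-ins, and the two Yammer meters are reduced to returning the two ratios. It assumes only what the diff above shows: aggregateThreads is a process-wide AtomicInteger that each pool increments once per created handler, decrements once per removed handler, and bulk-unregisters on shutdown, while the per-pool ratio divides by the pool's own thread count.

import java.util.concurrent.atomic.AtomicInteger

object Pools {
  // Shared across all pools, analogous to KafkaRequestHandlerPool.aggregateThreads.
  val aggregateThreads = new AtomicInteger(0)
}

class Pool(numThreads: Int) {
  // Pool-local size, analogous to threadPoolSize / totalHandlerThreads.
  val poolSize = new AtomicInteger(numThreads)

  // Each created handler registers itself in the shared total,
  // mirroring aggregateThreads.getAndIncrement() in createHandler.
  private def createHandler(): Unit =
    Pools.aggregateThreads.getAndIncrement()

  (0 until numThreads).foreach(_ => createHandler())

  // A handler reports idle time against two denominators: its own pool's
  // thread count (per-pool meter) and the shared total (aggregate meter).
  def idleRatios(idleNanos: Long): (Long, Long) =
    (idleNanos / poolSize.get, idleNanos / Pools.aggregateThreads.get)

  // Shrinking decrements the shared counter once per removed thread
  // (the getAndDecrement() path in resizeThreadPool); growing goes back
  // through createHandler.
  def resize(newSize: Int): Unit = {
    val current = poolSize.get
    if (newSize < current)
      (0 until current - newSize).foreach(_ => Pools.aggregateThreads.getAndDecrement())
    else
      (0 until newSize - current).foreach(_ => createHandler())
    poolSize.set(newSize)
  }

  // Shutdown unregisters the whole pool in one bulk update,
  // as in aggregateThreads.addAndGet(-threadPoolSize.get).
  def shutdown(): Unit = {
    Pools.aggregateThreads.addAndGet(-poolSize.get)
    poolSize.set(0)
  }
}

Under this model, a broker pool and a controller pool sharing Pools.aggregateThreads each report a per-pool idle ratio against their own size, while the aggregate ratio divides by every handler thread in the process, which is the split the two meters in the patch are meant to capture.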