KAFKA-19030 Remove metricNamePrefix from RequestChannel (#19374)

As described in the JIRA ticket, `controlPlaneRequestChannelOpt` was
removed from KRaft mode, so there's no need to use the metrics prefix
anymore.

This change removes `metricNamePrefix` from RequestChannel and the
related files.

It also removes `DataPlaneAcceptor#MetricPrefix`, since
`DataPlaneAcceptor`  is the only implementation of `Acceptor`.

Since the implementation of KIP-291 is essentially removed, we can also
remove `logAndThreadNamePrefix` and `DataPlaneAcceptor#ThreadPrefix`.

Reviewers: PoAn Yang <payang@apache.org>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Parker Chang 2025-04-12 23:22:40 +08:00 committed by GitHub
parent 7863b35064
commit c8fe551139
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 22 additions and 37 deletions

View File

@ -340,7 +340,6 @@ object RequestChannel extends Logging {
} }
class RequestChannel(val queueSize: Int, class RequestChannel(val queueSize: Int,
val metricNamePrefix: String,
time: Time, time: Time,
val metrics: RequestChannelMetrics) { val metrics: RequestChannelMetrics) {
import RequestChannel._ import RequestChannel._
@ -349,13 +348,11 @@ class RequestChannel(val queueSize: Int,
private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize) private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
private val processors = new ConcurrentHashMap[Int, Processor]() private val processors = new ConcurrentHashMap[Int, Processor]()
private val requestQueueSizeMetricName = metricNamePrefix.concat(RequestQueueSizeMetric)
private val responseQueueSizeMetricName = metricNamePrefix.concat(ResponseQueueSizeMetric)
private val callbackQueue = new ArrayBlockingQueue[BaseRequest](queueSize) private val callbackQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
metricsGroup.newGauge(requestQueueSizeMetricName, () => requestQueue.size) metricsGroup.newGauge(RequestQueueSizeMetric, () => requestQueue.size)
metricsGroup.newGauge(responseQueueSizeMetricName, () => { metricsGroup.newGauge(ResponseQueueSizeMetric, () => {
processors.values.asScala.foldLeft(0) {(total, processor) => processors.values.asScala.foldLeft(0) {(total, processor) =>
total + processor.responseQueueSize total + processor.responseQueueSize
} }
@ -365,13 +362,13 @@ class RequestChannel(val queueSize: Int,
if (processors.putIfAbsent(processor.id, processor) != null) if (processors.putIfAbsent(processor.id, processor) != null)
warn(s"Unexpected processor with processorId ${processor.id}") warn(s"Unexpected processor with processorId ${processor.id}")
metricsGroup.newGauge(responseQueueSizeMetricName, () => processor.responseQueueSize, metricsGroup.newGauge(ResponseQueueSizeMetric, () => processor.responseQueueSize,
Map(ProcessorMetricTag -> processor.id.toString).asJava) Map(ProcessorMetricTag -> processor.id.toString).asJava)
} }
def removeProcessor(processorId: Int): Unit = { def removeProcessor(processorId: Int): Unit = {
processors.remove(processorId) processors.remove(processorId)
metricsGroup.removeMetric(responseQueueSizeMetricName, Map(ProcessorMetricTag -> processorId.toString).asJava) metricsGroup.removeMetric(ResponseQueueSizeMetric, Map(ProcessorMetricTag -> processorId.toString).asJava)
} }
/** Send a request to be handled, potentially blocking until there is room in the queue for the request */ /** Send a request to be handled, potentially blocking until there is room in the queue for the request */

View File

@ -97,7 +97,7 @@ class SocketServer(
private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
// data-plane // data-plane
private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, DataPlaneAcceptor]() private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, DataPlaneAcceptor]()
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics) val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, time, apiVersionManager.newRequestMetrics)
private[this] val nextProcessorId: AtomicInteger = new AtomicInteger(0) private[this] val nextProcessorId: AtomicInteger = new AtomicInteger(0)
val connectionQuotas = new ConnectionQuotas(config, time, metrics) val connectionQuotas = new ConnectionQuotas(config, time, metrics)
@ -113,7 +113,7 @@ class SocketServer(
private var stopped = false private var stopped = false
// Socket server metrics // Socket server metrics
metricsGroup.newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => SocketServer.this.synchronized { metricsGroup.newGauge(s"NetworkProcessorAvgIdlePercent", () => SocketServer.this.synchronized {
val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => a.processors) val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => a.processors)
val ioWaitRatioMetricNames = dataPlaneProcessors.map { p => val ioWaitRatioMetricNames = dataPlaneProcessors.map { p =>
metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags) metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
@ -129,7 +129,7 @@ class SocketServer(
metricsGroup.newGauge("MemoryPoolAvailable", () => memoryPool.availableMemory) metricsGroup.newGauge("MemoryPoolAvailable", () => memoryPool.availableMemory)
metricsGroup.newGauge("MemoryPoolUsed", () => memoryPool.size() - memoryPool.availableMemory) metricsGroup.newGauge("MemoryPoolUsed", () => memoryPool.size() - memoryPool.availableMemory)
metricsGroup.newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", () => SocketServer.this.synchronized { metricsGroup.newGauge(s"ExpiredConnectionsKilledCount", () => SocketServer.this.synchronized {
val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => a.processors) val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => a.processors)
val expiredConnectionsKilledCountMetricNames = dataPlaneProcessors.map { p => val expiredConnectionsKilledCountMetricNames = dataPlaneProcessors.map { p =>
metrics.metricName("expired-connections-killed-count", MetricsGroup, p.metricTags) metrics.metricName("expired-connections-killed-count", MetricsGroup, p.metricTags)
@ -370,8 +370,6 @@ object SocketServer {
} }
object DataPlaneAcceptor { object DataPlaneAcceptor {
val ThreadPrefix: String = "data-plane"
val MetricPrefix: String = ""
val ListenerReconfigurableConfigs: Set[String] = Set(SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG) val ListenerReconfigurableConfigs: Set[String] = Set(SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG)
} }
@ -402,9 +400,6 @@ class DataPlaneAcceptor(socketServer: SocketServer,
memoryPool, memoryPool,
apiVersionManager) with ListenerReconfigurable { apiVersionManager) with ListenerReconfigurable {
override def metricPrefix(): String = DataPlaneAcceptor.MetricPrefix
override def threadPrefix(): String = DataPlaneAcceptor.ThreadPrefix
/** /**
* Returns the listener name associated with this reconfigurable. Listener-specific * Returns the listener name associated with this reconfigurable. Listener-specific
* configs corresponding to this listener name are provided for reconfiguration. * configs corresponding to this listener name are provided for reconfiguration.
@ -495,9 +490,6 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
val shouldRun = new AtomicBoolean(true) val shouldRun = new AtomicBoolean(true)
def metricPrefix(): String
def threadPrefix(): String
private val sendBufferSize = config.socketSendBufferBytes private val sendBufferSize = config.socketSendBufferBytes
private val recvBufferSize = config.socketReceiveBufferBytes private val recvBufferSize = config.socketReceiveBufferBytes
private val listenBacklogSize = config.socketListenBacklogSize private val listenBacklogSize = config.socketListenBacklogSize
@ -522,7 +514,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
// Build the metric name explicitly in order to keep the existing name for compatibility // Build the metric name explicitly in order to keep the existing name for compatibility
private val backwardCompatibilityMetricGroup = new KafkaMetricsGroup("kafka.network", "Acceptor") private val backwardCompatibilityMetricGroup = new KafkaMetricsGroup("kafka.network", "Acceptor")
private val blockedPercentMeterMetricName = backwardCompatibilityMetricGroup.metricName( private val blockedPercentMeterMetricName = backwardCompatibilityMetricGroup.metricName(
s"${metricPrefix()}AcceptorBlockedPercent", "AcceptorBlockedPercent",
Map(ListenerMetricTag -> endPoint.listenerName.value).asJava) Map(ListenerMetricTag -> endPoint.listenerName.value).asJava)
private val blockedPercentMeter = metricsGroup.newMeter(blockedPercentMeterMetricName,"blocked time", TimeUnit.NANOSECONDS) private val blockedPercentMeter = metricsGroup.newMeter(blockedPercentMeterMetricName,"blocked time", TimeUnit.NANOSECONDS)
private var currentProcessorIndex = 0 private var currentProcessorIndex = 0
@ -531,7 +523,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
private[network] val startedFuture = new CompletableFuture[Void]() private[network] val startedFuture = new CompletableFuture[Void]()
val thread: KafkaThread = KafkaThread.nonDaemon( val thread: KafkaThread = KafkaThread.nonDaemon(
s"${threadPrefix()}-kafka-socket-acceptor-${endPoint.listenerName}-${endPoint.securityProtocol}-${endPoint.port}", s"data-plane-kafka-socket-acceptor-${endPoint.listenerName}-${endPoint.securityProtocol}-${endPoint.port}",
this) this)
def start(): Unit = synchronized { def start(): Unit = synchronized {
@ -769,7 +761,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
listenerName: ListenerName, listenerName: ListenerName,
securityProtocol: SecurityProtocol, securityProtocol: SecurityProtocol,
connectionDisconnectListeners: Seq[ConnectionDisconnectListener]): Processor = { connectionDisconnectListeners: Seq[ConnectionDisconnectListener]): Processor = {
val name = s"${threadPrefix()}-kafka-network-thread-$nodeId-${endPoint.listenerName}-${endPoint.securityProtocol}-$id" val name = s"data-plane-kafka-network-thread-$nodeId-${endPoint.listenerName}-${endPoint.securityProtocol}-$id"
new Processor(id, new Processor(id,
time, time,
config.socketRequestMaxBytes, config.socketRequestMaxBytes,

View File

@ -21,7 +21,7 @@ import kafka.coordinator.group.{CoordinatorLoaderImpl, CoordinatorPartitionWrite
import kafka.coordinator.transaction.TransactionCoordinator import kafka.coordinator.transaction.TransactionCoordinator
import kafka.log.LogManager import kafka.log.LogManager
import kafka.log.remote.RemoteLogManager import kafka.log.remote.RemoteLogManager
import kafka.network.{DataPlaneAcceptor, SocketServer} import kafka.network.SocketServer
import kafka.raft.KafkaRaftManager import kafka.raft.KafkaRaftManager
import kafka.server.metadata._ import kafka.server.metadata._
import kafka.server.share.{ShareCoordinatorMetadataCacheHelperImpl, SharePartitionManager} import kafka.server.share.{ShareCoordinatorMetadataCacheHelperImpl, SharePartitionManager}
@ -470,8 +470,7 @@ class BrokerServer(
dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId, dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
config.numIoThreads, s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent", config.numIoThreads, "RequestHandlerAvgIdlePercent")
DataPlaneAcceptor.ThreadPrefix)
// Start RemoteLogManager before initializing broker metadata publishers. // Start RemoteLogManager before initializing broker metadata publishers.
remoteLogManagerOpt.foreach { rlm => remoteLogManagerOpt.foreach { rlm =>

View File

@ -17,7 +17,7 @@
package kafka.server package kafka.server
import kafka.network.{DataPlaneAcceptor, SocketServer} import kafka.network.SocketServer
import kafka.raft.KafkaRaftManager import kafka.raft.KafkaRaftManager
import kafka.server.QuotaFactory.QuotaManagers import kafka.server.QuotaFactory.QuotaManagers
@ -285,8 +285,7 @@ class ControllerServer(
controllerApis, controllerApis,
time, time,
config.numIoThreads, config.numIoThreads,
s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent", "RequestHandlerAvgIdlePercent",
DataPlaneAcceptor.ThreadPrefix,
"controller") "controller")
// Set up the metadata cache publisher. // Set up the metadata cache publisher.

View File

@ -199,7 +199,6 @@ class KafkaRequestHandlerPool(
time: Time, time: Time,
numThreads: Int, numThreads: Int,
requestHandlerAvgIdleMetricName: String, requestHandlerAvgIdleMetricName: String,
logAndThreadNamePrefix : String,
nodeName: String = "broker" nodeName: String = "broker"
) extends Logging { ) extends Logging {
private val metricsGroup = new KafkaMetricsGroup(this.getClass) private val metricsGroup = new KafkaMetricsGroup(this.getClass)
@ -208,7 +207,7 @@ class KafkaRequestHandlerPool(
/* a meter to track the average free capacity of the request handlers */ /* a meter to track the average free capacity of the request handlers */
private val aggregateIdleMeter = metricsGroup.newMeter(requestHandlerAvgIdleMetricName, "percent", TimeUnit.NANOSECONDS) private val aggregateIdleMeter = metricsGroup.newMeter(requestHandlerAvgIdleMetricName, "percent", TimeUnit.NANOSECONDS)
this.logIdent = s"[$logAndThreadNamePrefix Kafka Request Handler on ${nodeName.capitalize} $brokerId] " this.logIdent = s"[data-plane Kafka Request Handler on ${nodeName.capitalize} $brokerId] "
val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads) val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
for (i <- 0 until numThreads) { for (i <- 0 until numThreads) {
createHandler(i) createHandler(i)
@ -216,7 +215,7 @@ class KafkaRequestHandlerPool(
def createHandler(id: Int): Unit = synchronized { def createHandler(id: Int): Unit = synchronized {
runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time, nodeName) runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time, nodeName)
KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start() KafkaThread.daemon("data-plane-kafka-request-handler-" + id, runnables(id)).start()
} }
def resizeThreadPool(newSize: Int): Unit = synchronized { def resizeThreadPool(newSize: Int): Unit = synchronized {

View File

@ -21,7 +21,7 @@ import java.net.InetSocketAddress
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import java.util.concurrent.{CompletableFuture, CountDownLatch, LinkedBlockingDeque, TimeUnit} import java.util.concurrent.{CompletableFuture, CountDownLatch, LinkedBlockingDeque, TimeUnit}
import joptsimple.{OptionException, OptionSpec} import joptsimple.{OptionException, OptionSpec}
import kafka.network.{DataPlaneAcceptor, SocketServer} import kafka.network.SocketServer
import kafka.raft.{DefaultExternalKRaftMetrics, KafkaRaftManager, RaftManager} import kafka.raft.{DefaultExternalKRaftMetrics, KafkaRaftManager, RaftManager}
import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, SimpleApiVersionManager} import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, SimpleApiVersionManager}
import kafka.utils.{CoreUtils, Logging} import kafka.utils.{CoreUtils, Logging}
@ -130,8 +130,7 @@ class TestRaftServer(
requestHandler, requestHandler,
time, time,
config.numIoThreads, config.numIoThreads,
s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent", "RequestHandlerAvgIdlePercent"
DataPlaneAcceptor.ThreadPrefix
) )
workloadGenerator.start() workloadGenerator.start()

View File

@ -57,7 +57,7 @@ class KafkaRequestHandlerTest {
val time = new MockTime() val time = new MockTime()
val startTime = time.nanoseconds() val startTime = time.nanoseconds()
val metrics = new RequestChannelMetrics(Collections.emptySet[ApiKeys]) val metrics = new RequestChannelMetrics(Collections.emptySet[ApiKeys])
val requestChannel = new RequestChannel(10, "", time, metrics) val requestChannel = new RequestChannel(10, time, metrics)
val apiHandler = mock(classOf[ApiRequestHandler]) val apiHandler = mock(classOf[ApiRequestHandler])
try { try {
val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time) val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time)
@ -95,7 +95,7 @@ class KafkaRequestHandlerTest {
val time = new MockTime() val time = new MockTime()
val metrics = mock(classOf[RequestChannelMetrics]) val metrics = mock(classOf[RequestChannelMetrics])
val apiHandler = mock(classOf[ApiRequestHandler]) val apiHandler = mock(classOf[ApiRequestHandler])
val requestChannel = new RequestChannel(10, "", time, metrics) val requestChannel = new RequestChannel(10, time, metrics)
val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time) val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time)
var handledCount = 0 var handledCount = 0
@ -131,7 +131,7 @@ class KafkaRequestHandlerTest {
val time = new MockTime() val time = new MockTime()
val metrics = mock(classOf[RequestChannelMetrics]) val metrics = mock(classOf[RequestChannelMetrics])
val apiHandler = mock(classOf[ApiRequestHandler]) val apiHandler = mock(classOf[ApiRequestHandler])
val requestChannel = new RequestChannel(10, "", time, metrics) val requestChannel = new RequestChannel(10, time, metrics)
val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time) val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time)
val originalRequestLocal = mock(classOf[RequestLocal]) val originalRequestLocal = mock(classOf[RequestLocal])
@ -165,7 +165,7 @@ class KafkaRequestHandlerTest {
val time = new MockTime() val time = new MockTime()
val metrics = mock(classOf[RequestChannelMetrics]) val metrics = mock(classOf[RequestChannelMetrics])
val apiHandler = mock(classOf[ApiRequestHandler]) val apiHandler = mock(classOf[ApiRequestHandler])
val requestChannel = new RequestChannel(10, "", time, metrics) val requestChannel = new RequestChannel(10, time, metrics)
val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time) val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time)
val originalRequestLocal = mock(classOf[RequestLocal]) val originalRequestLocal = mock(classOf[RequestLocal])