From c8fe551139ca106165c0c0b42f6c7b1db03f490d Mon Sep 17 00:00:00 2001 From: Parker Chang Date: Sat, 12 Apr 2025 23:22:40 +0800 Subject: [PATCH] KAFKA-19030 Remove metricNamePrefix from RequestChannel (#19374) As described in the JIRA ticket, `controlPlaneRequestChannelOpt` was removed from KRaft mode, so there's no need to use the metrics prefix anymore. This change removes `metricNamePrefix` from RequestChannel and the related files. It also removes `DataPlaneAcceptor#MetricPrefix`, since `DataPlaneAcceptor` is the only implementation of `Acceptor`. Since the implementation of KIP-291 is essentially removed, we can also remove `logAndThreadNamePrefix` and `DataPlaneAcceptor#ThreadPrefix`. Reviewers: PoAn Yang , Ken Huang , Chia-Ping Tsai --- .../scala/kafka/network/RequestChannel.scala | 11 ++++------ .../scala/kafka/network/SocketServer.scala | 20 ++++++------------- .../scala/kafka/server/BrokerServer.scala | 5 ++--- .../scala/kafka/server/ControllerServer.scala | 5 ++--- .../kafka/server/KafkaRequestHandler.scala | 5 ++--- .../scala/kafka/tools/TestRaftServer.scala | 5 ++--- .../server/KafkaRequestHandlerTest.scala | 8 ++++---- 7 files changed, 22 insertions(+), 37 deletions(-) diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index a16e03a1916..dfb96ef8e11 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -340,7 +340,6 @@ object RequestChannel extends Logging { } class RequestChannel(val queueSize: Int, - val metricNamePrefix: String, time: Time, val metrics: RequestChannelMetrics) { import RequestChannel._ @@ -349,13 +348,11 @@ class RequestChannel(val queueSize: Int, private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize) private val processors = new ConcurrentHashMap[Int, Processor]() - private val requestQueueSizeMetricName = metricNamePrefix.concat(RequestQueueSizeMetric) - private val responseQueueSizeMetricName = metricNamePrefix.concat(ResponseQueueSizeMetric) private val callbackQueue = new ArrayBlockingQueue[BaseRequest](queueSize) - metricsGroup.newGauge(requestQueueSizeMetricName, () => requestQueue.size) + metricsGroup.newGauge(RequestQueueSizeMetric, () => requestQueue.size) - metricsGroup.newGauge(responseQueueSizeMetricName, () => { + metricsGroup.newGauge(ResponseQueueSizeMetric, () => { processors.values.asScala.foldLeft(0) {(total, processor) => total + processor.responseQueueSize } @@ -365,13 +362,13 @@ class RequestChannel(val queueSize: Int, if (processors.putIfAbsent(processor.id, processor) != null) warn(s"Unexpected processor with processorId ${processor.id}") - metricsGroup.newGauge(responseQueueSizeMetricName, () => processor.responseQueueSize, + metricsGroup.newGauge(ResponseQueueSizeMetric, () => processor.responseQueueSize, Map(ProcessorMetricTag -> processor.id.toString).asJava) } def removeProcessor(processorId: Int): Unit = { processors.remove(processorId) - metricsGroup.removeMetric(responseQueueSizeMetricName, Map(ProcessorMetricTag -> processorId.toString).asJava) + metricsGroup.removeMetric(ResponseQueueSizeMetric, Map(ProcessorMetricTag -> processorId.toString).asJava) } /** Send a request to be handled, potentially blocking until there is room in the queue for the request */ diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index c520b17fa06..79cd0bc8ce2 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -97,7 +97,7 @@ class SocketServer( private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE // data-plane private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, DataPlaneAcceptor]() - val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics) + val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, time, apiVersionManager.newRequestMetrics) private[this] val nextProcessorId: AtomicInteger = new AtomicInteger(0) val connectionQuotas = new ConnectionQuotas(config, time, metrics) @@ -113,7 +113,7 @@ class SocketServer( private var stopped = false // Socket server metrics - metricsGroup.newGauge(s"${DataPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => SocketServer.this.synchronized { + metricsGroup.newGauge(s"NetworkProcessorAvgIdlePercent", () => SocketServer.this.synchronized { val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => a.processors) val ioWaitRatioMetricNames = dataPlaneProcessors.map { p => metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags) @@ -129,7 +129,7 @@ class SocketServer( metricsGroup.newGauge("MemoryPoolAvailable", () => memoryPool.availableMemory) metricsGroup.newGauge("MemoryPoolUsed", () => memoryPool.size() - memoryPool.availableMemory) - metricsGroup.newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", () => SocketServer.this.synchronized { + metricsGroup.newGauge(s"ExpiredConnectionsKilledCount", () => SocketServer.this.synchronized { val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => a.processors) val expiredConnectionsKilledCountMetricNames = dataPlaneProcessors.map { p => metrics.metricName("expired-connections-killed-count", MetricsGroup, p.metricTags) @@ -370,8 +370,6 @@ object SocketServer { } object DataPlaneAcceptor { - val ThreadPrefix: String = "data-plane" - val MetricPrefix: String = "" val ListenerReconfigurableConfigs: Set[String] = Set(SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG) } @@ -402,9 +400,6 @@ class DataPlaneAcceptor(socketServer: SocketServer, memoryPool, apiVersionManager) with ListenerReconfigurable { - override def metricPrefix(): String = DataPlaneAcceptor.MetricPrefix - override def threadPrefix(): String = DataPlaneAcceptor.ThreadPrefix - /** * Returns the listener name associated with this reconfigurable. Listener-specific * configs corresponding to this listener name are provided for reconfiguration. @@ -495,9 +490,6 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer, val shouldRun = new AtomicBoolean(true) - def metricPrefix(): String - def threadPrefix(): String - private val sendBufferSize = config.socketSendBufferBytes private val recvBufferSize = config.socketReceiveBufferBytes private val listenBacklogSize = config.socketListenBacklogSize @@ -522,7 +514,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer, // Build the metric name explicitly in order to keep the existing name for compatibility private val backwardCompatibilityMetricGroup = new KafkaMetricsGroup("kafka.network", "Acceptor") private val blockedPercentMeterMetricName = backwardCompatibilityMetricGroup.metricName( - s"${metricPrefix()}AcceptorBlockedPercent", + "AcceptorBlockedPercent", Map(ListenerMetricTag -> endPoint.listenerName.value).asJava) private val blockedPercentMeter = metricsGroup.newMeter(blockedPercentMeterMetricName,"blocked time", TimeUnit.NANOSECONDS) private var currentProcessorIndex = 0 @@ -531,7 +523,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer, private[network] val startedFuture = new CompletableFuture[Void]() val thread: KafkaThread = KafkaThread.nonDaemon( - s"${threadPrefix()}-kafka-socket-acceptor-${endPoint.listenerName}-${endPoint.securityProtocol}-${endPoint.port}", + s"data-plane-kafka-socket-acceptor-${endPoint.listenerName}-${endPoint.securityProtocol}-${endPoint.port}", this) def start(): Unit = synchronized { @@ -769,7 +761,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer, listenerName: ListenerName, securityProtocol: SecurityProtocol, connectionDisconnectListeners: Seq[ConnectionDisconnectListener]): Processor = { - val name = s"${threadPrefix()}-kafka-network-thread-$nodeId-${endPoint.listenerName}-${endPoint.securityProtocol}-$id" + val name = s"data-plane-kafka-network-thread-$nodeId-${endPoint.listenerName}-${endPoint.securityProtocol}-$id" new Processor(id, time, config.socketRequestMaxBytes, diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 31ebc14c960..5f52e6abdcb 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -21,7 +21,7 @@ import kafka.coordinator.group.{CoordinatorLoaderImpl, CoordinatorPartitionWrite import kafka.coordinator.transaction.TransactionCoordinator import kafka.log.LogManager import kafka.log.remote.RemoteLogManager -import kafka.network.{DataPlaneAcceptor, SocketServer} +import kafka.network.SocketServer import kafka.raft.KafkaRaftManager import kafka.server.metadata._ import kafka.server.share.{ShareCoordinatorMetadataCacheHelperImpl, SharePartitionManager} @@ -470,8 +470,7 @@ class BrokerServer( dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time, - config.numIoThreads, s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent", - DataPlaneAcceptor.ThreadPrefix) + config.numIoThreads, "RequestHandlerAvgIdlePercent") // Start RemoteLogManager before initializing broker metadata publishers. remoteLogManagerOpt.foreach { rlm => diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index e7537878ca6..3ddf97d2705 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -17,7 +17,7 @@ package kafka.server -import kafka.network.{DataPlaneAcceptor, SocketServer} +import kafka.network.SocketServer import kafka.raft.KafkaRaftManager import kafka.server.QuotaFactory.QuotaManagers @@ -285,8 +285,7 @@ class ControllerServer( controllerApis, time, config.numIoThreads, - s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent", - DataPlaneAcceptor.ThreadPrefix, + "RequestHandlerAvgIdlePercent", "controller") // Set up the metadata cache publisher. diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index fa251e27529..815fe4966eb 100755 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -199,7 +199,6 @@ class KafkaRequestHandlerPool( time: Time, numThreads: Int, requestHandlerAvgIdleMetricName: String, - logAndThreadNamePrefix : String, nodeName: String = "broker" ) extends Logging { private val metricsGroup = new KafkaMetricsGroup(this.getClass) @@ -208,7 +207,7 @@ class KafkaRequestHandlerPool( /* a meter to track the average free capacity of the request handlers */ private val aggregateIdleMeter = metricsGroup.newMeter(requestHandlerAvgIdleMetricName, "percent", TimeUnit.NANOSECONDS) - this.logIdent = s"[$logAndThreadNamePrefix Kafka Request Handler on ${nodeName.capitalize} $brokerId] " + this.logIdent = s"[data-plane Kafka Request Handler on ${nodeName.capitalize} $brokerId] " val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads) for (i <- 0 until numThreads) { createHandler(i) @@ -216,7 +215,7 @@ class KafkaRequestHandlerPool( def createHandler(id: Int): Unit = synchronized { runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time, nodeName) - KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start() + KafkaThread.daemon("data-plane-kafka-request-handler-" + id, runnables(id)).start() } def resizeThreadPool(newSize: Int): Unit = synchronized { diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala index 418a276bbd1..69d296fe467 100644 --- a/core/src/main/scala/kafka/tools/TestRaftServer.scala +++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala @@ -21,7 +21,7 @@ import java.net.InetSocketAddress import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} import java.util.concurrent.{CompletableFuture, CountDownLatch, LinkedBlockingDeque, TimeUnit} import joptsimple.{OptionException, OptionSpec} -import kafka.network.{DataPlaneAcceptor, SocketServer} +import kafka.network.SocketServer import kafka.raft.{DefaultExternalKRaftMetrics, KafkaRaftManager, RaftManager} import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, SimpleApiVersionManager} import kafka.utils.{CoreUtils, Logging} @@ -130,8 +130,7 @@ class TestRaftServer( requestHandler, time, config.numIoThreads, - s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent", - DataPlaneAcceptor.ThreadPrefix + "RequestHandlerAvgIdlePercent" ) workloadGenerator.start() diff --git a/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala b/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala index 495ad0b1c00..6e8efa28b86 100644 --- a/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala +++ b/core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala @@ -57,7 +57,7 @@ class KafkaRequestHandlerTest { val time = new MockTime() val startTime = time.nanoseconds() val metrics = new RequestChannelMetrics(Collections.emptySet[ApiKeys]) - val requestChannel = new RequestChannel(10, "", time, metrics) + val requestChannel = new RequestChannel(10, time, metrics) val apiHandler = mock(classOf[ApiRequestHandler]) try { val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time) @@ -95,7 +95,7 @@ class KafkaRequestHandlerTest { val time = new MockTime() val metrics = mock(classOf[RequestChannelMetrics]) val apiHandler = mock(classOf[ApiRequestHandler]) - val requestChannel = new RequestChannel(10, "", time, metrics) + val requestChannel = new RequestChannel(10, time, metrics) val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time) var handledCount = 0 @@ -131,7 +131,7 @@ class KafkaRequestHandlerTest { val time = new MockTime() val metrics = mock(classOf[RequestChannelMetrics]) val apiHandler = mock(classOf[ApiRequestHandler]) - val requestChannel = new RequestChannel(10, "", time, metrics) + val requestChannel = new RequestChannel(10, time, metrics) val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time) val originalRequestLocal = mock(classOf[RequestLocal]) @@ -165,7 +165,7 @@ class KafkaRequestHandlerTest { val time = new MockTime() val metrics = mock(classOf[RequestChannelMetrics]) val apiHandler = mock(classOf[ApiRequestHandler]) - val requestChannel = new RequestChannel(10, "", time, metrics) + val requestChannel = new RequestChannel(10, time, metrics) val handler = new KafkaRequestHandler(0, 0, mock(classOf[Meter]), new AtomicInteger(1), requestChannel, apiHandler, time) val originalRequestLocal = mock(classOf[RequestLocal])