diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 646671fc0cf..a0c050ce377 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -24,6 +24,7 @@ import kafka.log.remote.RemoteLogManager import kafka.log.{LogManager, UnifiedLog} import kafka.server.HostedPartition.Online import kafka.server.QuotaFactory.QuotaManagers +import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName} import kafka.server.ReplicaManager.createLogReadResult import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints} import kafka.server.metadata.ZkMetadataCache @@ -177,6 +178,39 @@ object HostedPartition { object ReplicaManager { val HighWatermarkFilename = "replication-offset-checkpoint" + private val LeaderCountMetricName = "LeaderCount" + private val PartitionCountMetricName = "PartitionCount" + private val OfflineReplicaCountMetricName = "OfflineReplicaCount" + private val UnderReplicatedPartitionsMetricName = "UnderReplicatedPartitions" + private val UnderMinIsrPartitionCountMetricName = "UnderMinIsrPartitionCount" + private val AtMinIsrPartitionCountMetricName = "AtMinIsrPartitionCount" + private val ReassigningPartitionsMetricName = "ReassigningPartitions" + private val PartitionsWithLateTransactionsCountMetricName = "PartitionsWithLateTransactionsCount" + private val ProducerIdCountMetricName = "ProducerIdCount" + private val IsrExpandsPerSecMetricName = "IsrExpandsPerSec" + private val IsrShrinksPerSecMetricName = "IsrShrinksPerSec" + private val FailedIsrUpdatesPerSecMetricName = "FailedIsrUpdatesPerSec" + + private[server] val GaugeMetricNames = Set( + LeaderCountMetricName, + PartitionCountMetricName, + OfflineReplicaCountMetricName, + UnderReplicatedPartitionsMetricName, + UnderMinIsrPartitionCountMetricName, + AtMinIsrPartitionCountMetricName, + ReassigningPartitionsMetricName, + PartitionsWithLateTransactionsCountMetricName, + ProducerIdCountMetricName + ) + + private[server] val MeterMetricNames = Set( + IsrExpandsPerSecMetricName, + IsrShrinksPerSecMetricName, + FailedIsrUpdatesPerSecMetricName + ) + + private[server] val MetricNames = GaugeMetricNames.union(MeterMetricNames) + def createLogReadResult(highWatermark: Long, leaderLogStartOffset: Long, leaderLogEndOffset: Long, @@ -282,16 +316,16 @@ class ReplicaManager(val config: KafkaConfig, // Visible for testing private[server] val replicaSelectorOpt: Option[ReplicaSelector] = createReplicaSelector() - metricsGroup.newGauge("LeaderCount", () => leaderPartitionsIterator.size) + metricsGroup.newGauge(LeaderCountMetricName, () => leaderPartitionsIterator.size) // Visible for testing - private[kafka] val partitionCount = metricsGroup.newGauge("PartitionCount", () => allPartitions.size) - metricsGroup.newGauge("OfflineReplicaCount", () => offlinePartitionCount) - metricsGroup.newGauge("UnderReplicatedPartitions", () => underReplicatedPartitionCount) - metricsGroup.newGauge("UnderMinIsrPartitionCount", () => leaderPartitionsIterator.count(_.isUnderMinIsr)) - metricsGroup.newGauge("AtMinIsrPartitionCount", () => leaderPartitionsIterator.count(_.isAtMinIsr)) - metricsGroup.newGauge("ReassigningPartitions", () => reassigningPartitionsCount) - metricsGroup.newGauge("PartitionsWithLateTransactionsCount", () => lateTransactionsCount) - metricsGroup.newGauge("ProducerIdCount", () => producerIdCount) + private[kafka] val partitionCount = metricsGroup.newGauge(PartitionCountMetricName, () => allPartitions.size) + metricsGroup.newGauge(OfflineReplicaCountMetricName, () => offlinePartitionCount) + metricsGroup.newGauge(UnderReplicatedPartitionsMetricName, () => underReplicatedPartitionCount) + metricsGroup.newGauge(UnderMinIsrPartitionCountMetricName, () => leaderPartitionsIterator.count(_.isUnderMinIsr)) + metricsGroup.newGauge(AtMinIsrPartitionCountMetricName, () => leaderPartitionsIterator.count(_.isAtMinIsr)) + metricsGroup.newGauge(ReassigningPartitionsMetricName, () => reassigningPartitionsCount) + metricsGroup.newGauge(PartitionsWithLateTransactionsCountMetricName, () => lateTransactionsCount) + metricsGroup.newGauge(ProducerIdCountMetricName, () => producerIdCount) def reassigningPartitionsCount: Int = leaderPartitionsIterator.count(_.isReassigning) @@ -302,9 +336,9 @@ class ReplicaManager(val config: KafkaConfig, def producerIdCount: Int = onlinePartitionsIterator.map(_.producerIdCount).sum - val isrExpandRate: Meter = metricsGroup.newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS) - val isrShrinkRate: Meter = metricsGroup.newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS) - val failedIsrUpdatesRate: Meter = metricsGroup.newMeter("FailedIsrUpdatesPerSec", "failedUpdates", TimeUnit.SECONDS) + val isrExpandRate: Meter = metricsGroup.newMeter(IsrExpandsPerSecMetricName, "expands", TimeUnit.SECONDS) + val isrShrinkRate: Meter = metricsGroup.newMeter(IsrShrinksPerSecMetricName, "shrinks", TimeUnit.SECONDS) + val failedIsrUpdatesRate: Meter = metricsGroup.newMeter(FailedIsrUpdatesPerSecMetricName, "failedUpdates", TimeUnit.SECONDS) def underReplicatedPartitionCount: Int = leaderPartitionsIterator.count(_.isUnderReplicated) @@ -2160,18 +2194,7 @@ class ReplicaManager(val config: KafkaConfig, } def removeMetrics(): Unit = { - metricsGroup.removeMetric("LeaderCount") - metricsGroup.removeMetric("PartitionCount") - metricsGroup.removeMetric("OfflineReplicaCount") - metricsGroup.removeMetric("UnderReplicatedPartitions") - metricsGroup.removeMetric("UnderMinIsrPartitionCount") - metricsGroup.removeMetric("AtMinIsrPartitionCount") - metricsGroup.removeMetric("ReassigningPartitions") - metricsGroup.removeMetric("PartitionsWithLateTransactionsCount") - metricsGroup.removeMetric("ProducerIdCount") - metricsGroup.removeMetric("IsrExpandsPerSec") - metricsGroup.removeMetric("IsrShrinksPerSec") - metricsGroup.removeMetric("FailedIsrUpdatesPerSec") + ReplicaManager.MetricNames.foreach(metricsGroup.removeMetric) } def beginControlledShutdown(): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index ca93339b064..767709997c8 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -372,9 +372,9 @@ class ReplicaManagerTest { // Use the second instance of metrics group that is constructed. The first instance is constructed by // ReplicaManager constructor > BrokerTopicStats > BrokerTopicMetrics. val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(1) - verify(mockMetricsGroup, times(9)).newGauge(anyString(), any()) - verify(mockMetricsGroup, times(3)).newMeter(anyString(), anyString(), any(classOf[TimeUnit])) - verify(mockMetricsGroup, times(12)).removeMetric(anyString()) + ReplicaManager.GaugeMetricNames.foreach(metricName => verify(mockMetricsGroup).newGauge(ArgumentMatchers.eq(metricName), any())) + ReplicaManager.MeterMetricNames.foreach(metricName => verify(mockMetricsGroup).newMeter(ArgumentMatchers.eq(metricName), anyString(), any(classOf[TimeUnit]))) + ReplicaManager.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_)) // assert that we have verified all invocations on verifyNoMoreInteractions(mockMetricsGroup)