From e7f7d4093968d8de494e371a6d3c85e555332cbb Mon Sep 17 00:00:00 2001 From: Abhishek Mendhekar Date: Fri, 4 Aug 2017 11:23:43 +0100 Subject: [PATCH] KAFKA-5461; Add metric to track global topic count and global parition count in a cluster Author: Abhishek Mendhekar Reviewers: Joel Koshy , Ismael Juma Closes #3549 from abhishekmendhekar/KAFKA-5461 --- .gitignore | 4 +-- .../kafka/controller/KafkaController.scala | 32 ++++++++++++++++--- .../unit/kafka/metrics/MetricsTest.scala | 11 +++++++ 3 files changed, 41 insertions(+), 6 deletions(-) diff --git a/.gitignore b/.gitignore index e12082e82b8..60883492f31 100644 --- a/.gitignore +++ b/.gitignore @@ -23,8 +23,8 @@ TAGS *.iml .project .settings -kafka.ipr -kafka.iws +*.ipr +*.iws .vagrant Vagrantfile.local /logs diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 0a6aac03fe6..1de04d8bd24 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -188,6 +188,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met @volatile private var activeControllerId = -1 @volatile private var offlinePartitionCount = 0 @volatile private var preferredReplicaImbalanceCount = 0 + @volatile private var globalTopicCount = 0 + @volatile private var globalPartitionCount = 0 newGauge( "ActiveControllerCount", @@ -217,6 +219,20 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met } ) + newGauge( + "GlobalTopicCount", + new Gauge[Int] { + def value: Int = globalTopicCount + } + ) + + newGauge( + "GlobalPartitionCount", + new Gauge[Int] { + def value: Int = globalPartitionCount + } + ) + def epoch: Int = controllerContext.epoch def state: ControllerState = eventManager.state @@ -319,6 +335,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met kafkaScheduler.shutdown() offlinePartitionCount = 0 preferredReplicaImbalanceCount = 0 + globalTopicCount = 0 + globalPartitionCount = 0 // de-register partition ISR listener for on-going partition reassignment task deregisterPartitionReassignmentIsrChangeListeners() @@ -1593,8 +1611,9 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met private def updateMetrics(): Unit = { offlinePartitionCount = - if (!isActive) 0 - else { + if (!isActive) { + 0 + } else { controllerContext.partitionLeadershipInfo.count { case (tp, leadershipInfo) => !controllerContext.liveOrShuttingDownBrokerIds.contains(leadershipInfo.leaderAndIsr.leader) && !topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) @@ -1602,8 +1621,9 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met } preferredReplicaImbalanceCount = - if (!isActive) 0 - else { + if (!isActive) { + 0 + } else { controllerContext.partitionReplicaAssignment.count { case (topicPartition, replicas) => val preferredReplica = replicas.head val leadershipInfo = controllerContext.partitionLeadershipInfo.get(topicPartition) @@ -1611,6 +1631,10 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met !topicDeletionManager.isTopicQueuedUpForDeletion(topicPartition.topic) } } + + globalTopicCount = if (!isActive) 0 else controllerContext.allTopics.size + + globalPartitionCount = if (!isActive) 0 else controllerContext.partitionLeadershipInfo.size } // visible for testing diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala index e32f4294f11..bff21360c8c 100644 --- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala +++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala @@ -156,6 +156,17 @@ class MetricsTest extends KafkaServerTestHarness with Logging { assertTrue(meterCount(bytesOut) > initialBytesOut) } + @Test + def testControllerMetrics(): Unit = { + val metrics = Metrics.defaultRegistry.allMetrics + + assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=ActiveControllerCount"), 1) + assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=OfflinePartitionsCount"), 1) + assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount"), 1) + assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=GlobalTopicCount"), 1) + assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=GlobalPartitionCount"), 1) + } + private def meterCount(metricName: String): Long = { Metrics.defaultRegistry.allMetrics.asScala .filterKeys(_.getMBeanName.endsWith(metricName))