KAFKA-5461; Add metrics to track global topic count and global partition count in a cluster

Author: Abhishek Mendhekar <amendhekar@linkedin.com>

Reviewers: Joel Koshy <jjkoshy.w@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #3549 from abhishekmendhekar/KAFKA-5461
Abhishek Mendhekar authored on 2017-08-04 11:23:43 +01:00, committed by Ismael Juma
commit e7f7d40939
parent 5a516fb28e
3 changed files with 41 additions and 6 deletions

.gitignore

@@ -23,8 +23,8 @@ TAGS
 *.iml
 .project
 .settings
-kafka.ipr
-kafka.iws
+*.ipr
+*.iws
 .vagrant
 Vagrantfile.local
 /logs

core/src/main/scala/kafka/controller/KafkaController.scala

@@ -188,6 +188,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
   @volatile private var activeControllerId = -1
   @volatile private var offlinePartitionCount = 0
   @volatile private var preferredReplicaImbalanceCount = 0
+  @volatile private var globalTopicCount = 0
+  @volatile private var globalPartitionCount = 0
 
   newGauge(
     "ActiveControllerCount",
@@ -217,6 +219,20 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
     }
   )
 
+  newGauge(
+    "GlobalTopicCount",
+    new Gauge[Int] {
+      def value: Int = globalTopicCount
+    }
+  )
+
+  newGauge(
+    "GlobalPartitionCount",
+    new Gauge[Int] {
+      def value: Int = globalPartitionCount
+    }
+  )
+
   def epoch: Int = controllerContext.epoch
 
   def state: ControllerState = eventManager.state
@@ -319,6 +335,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
     kafkaScheduler.shutdown()
     offlinePartitionCount = 0
     preferredReplicaImbalanceCount = 0
+    globalTopicCount = 0
+    globalPartitionCount = 0
 
     // de-register partition ISR listener for on-going partition reassignment task
     deregisterPartitionReassignmentIsrChangeListeners()
@@ -1593,8 +1611,9 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
 
   private def updateMetrics(): Unit = {
     offlinePartitionCount =
-      if (!isActive) 0
-      else {
+      if (!isActive) {
+        0
+      } else {
         controllerContext.partitionLeadershipInfo.count { case (tp, leadershipInfo) =>
           !controllerContext.liveOrShuttingDownBrokerIds.contains(leadershipInfo.leaderAndIsr.leader) &&
             !topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)
@@ -1602,8 +1621,9 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
       }
 
     preferredReplicaImbalanceCount =
-      if (!isActive) 0
-      else {
+      if (!isActive) {
+        0
+      } else {
         controllerContext.partitionReplicaAssignment.count { case (topicPartition, replicas) =>
           val preferredReplica = replicas.head
           val leadershipInfo = controllerContext.partitionLeadershipInfo.get(topicPartition)
@@ -1611,6 +1631,10 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
             !topicDeletionManager.isTopicQueuedUpForDeletion(topicPartition.topic)
         }
       }
+
+    globalTopicCount = if (!isActive) 0 else controllerContext.allTopics.size
+
+    globalPartitionCount = if (!isActive) 0 else controllerContext.partitionLeadershipInfo.size
   }
 
   // visible for testing

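The gauges registered above live in the default Yammer metrics registry under the kafka.controller:type=KafkaController MBean group. Below is a minimal sketch of reading their current values back in-process, assuming broker-side code with the Yammer metrics classes on the classpath; the controllerGaugeValue helper is purely illustrative and not part of this change.

import com.yammer.metrics.Metrics
import com.yammer.metrics.core.Gauge

import scala.collection.JavaConverters._

// Illustrative helper: find a KafkaController gauge by name in the default
// Yammer registry and read its current value.
def controllerGaugeValue(name: String): Option[Int] = {
  val mbean = s"kafka.controller:type=KafkaController,name=$name"
  Metrics.defaultRegistry.allMetrics.asScala.collectFirst {
    case (metricName, gauge: Gauge[_]) if metricName.getMBeanName == mbean =>
      gauge.value.asInstanceOf[Int]
  }
}

// Non-zero only on the active controller; updateMetrics() resets both gauges
// to 0 when the broker is not (or is no longer) the controller.
val globalTopics: Option[Int] = controllerGaugeValue("GlobalTopicCount")
val globalPartitions: Option[Int] = controllerGaugeValue("GlobalPartitionCount")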
core/src/test/scala/unit/kafka/metrics/MetricsTest.scala

@@ -156,6 +156,17 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
     assertTrue(meterCount(bytesOut) > initialBytesOut)
   }
 
+  @Test
+  def testControllerMetrics(): Unit = {
+    val metrics = Metrics.defaultRegistry.allMetrics
+
+    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=ActiveControllerCount"), 1)
+    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=OfflinePartitionsCount"), 1)
+    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount"), 1)
+    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=GlobalTopicCount"), 1)
+    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=GlobalPartitionCount"), 1)
+  }
+
   private def meterCount(metricName: String): Long = {
     Metrics.defaultRegistry.allMetrics.asScala
       .filterKeys(_.getMBeanName.endsWith(metricName))
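Operationally, the same values are visible over JMX once a broker is running. A rough sketch, assuming access to the broker's platform MBean server (remote JMX or the same JVM) and that the Yammer JMX reporter exposes each gauge's reading through a Value attribute; both are assumptions for illustration, not something this patch guarantees.

import java.lang.management.ManagementFactory
import javax.management.ObjectName

// Sketch: read the controller gauges from the platform MBean server. The
// ObjectNames match the MBean names asserted in testControllerMetrics above;
// the "Value" attribute is assumed to be how the JMX reporter exposes gauges.
val server = ManagementFactory.getPlatformMBeanServer

def readControllerGauge(name: String): Any =
  server.getAttribute(new ObjectName(s"kafka.controller:type=KafkaController,name=$name"), "Value")

println(s"GlobalTopicCount=${readControllerGauge("GlobalTopicCount")} " +
  s"GlobalPartitionCount=${readControllerGauge("GlobalPartitionCount")}")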