mirror of https://github.com/apache/kafka.git
KAFKA-5341; Add UnderMinIsrPartitionCount and per-partition UnderMinIsr metrics (KIP-164)
Author: Dong Lin <lindong28@gmail.com> Reviewers: Ewen Cheslack-Postava <ewen@confluent.io> Closes #3583 from lindong28/KAFKA-5341
This commit is contained in:
parent
4059fa5763
commit
4003c9384b
|
@ -94,6 +94,15 @@ class Partition(val topic: String,
|
|||
tags
|
||||
)
|
||||
|
||||
newGauge("UnderMinIsr",
|
||||
new Gauge[Int] {
|
||||
def value = {
|
||||
if (isUnderMinIsr) 1 else 0
|
||||
}
|
||||
},
|
||||
tags
|
||||
)
|
||||
|
||||
newGauge("ReplicasCount",
|
||||
new Gauge[Int] {
|
||||
def value = {
|
||||
|
@ -120,6 +129,15 @@ class Partition(val topic: String,
|
|||
def isUnderReplicated: Boolean =
|
||||
isLeaderReplicaLocal && inSyncReplicas.size < assignedReplicas.size
|
||||
|
||||
def isUnderMinIsr: Boolean = {
|
||||
leaderReplicaIfLocal match {
|
||||
case Some(leaderReplica) =>
|
||||
inSyncReplicas.size < leaderReplica.log.get.config.minInSyncReplicas
|
||||
case None =>
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
def getOrCreateReplica(replicaId: Int = localBrokerId, isNew: Boolean = false): Replica = {
|
||||
assignedReplicaMap.getAndMaybePut(replicaId, {
|
||||
if (isReplicaLocal(replicaId)) {
|
||||
|
@ -559,6 +577,7 @@ class Partition(val topic: String,
|
|||
*/
|
||||
def removePartitionMetrics() {
|
||||
removeMetric("UnderReplicated", tags)
|
||||
removeMetric("UnderMinIsr", tags)
|
||||
removeMetric("InSyncReplicasCount", tags)
|
||||
removeMetric("ReplicasCount", tags)
|
||||
removeMetric("LastStableOffsetLag", tags)
|
||||
|
|
|
@ -222,6 +222,14 @@ class ReplicaManager(val config: KafkaConfig,
|
|||
def value = underReplicatedPartitionCount
|
||||
}
|
||||
)
|
||||
|
||||
val underMinIsrPartitionCount = newGauge(
|
||||
"UnderMinIsrPartitionCount",
|
||||
new Gauge[Int] {
|
||||
def value = getLeaderPartitions.count(_.isUnderMinIsr)
|
||||
}
|
||||
)
|
||||
|
||||
val isrExpandRate = newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS)
|
||||
val isrShrinkRate = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS)
|
||||
val failedIsrUpdatesRate = newMeter("FailedIsrUpdatesPerSec", "failedUpdates", TimeUnit.SECONDS)
|
||||
|
@ -1266,6 +1274,7 @@ class ReplicaManager(val config: KafkaConfig,
|
|||
removeMetric("PartitionCount")
|
||||
removeMetric("OfflineReplicaCount")
|
||||
removeMetric("UnderReplicatedPartitions")
|
||||
removeMetric("UnderMinIsrPartitionCount")
|
||||
}
|
||||
|
||||
// High watermark do not need to be checkpointed only when under unit tests
|
||||
|
|
|
@ -728,6 +728,16 @@
|
|||
<td>kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions</td>
|
||||
<td>0</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td># of under minIsr partitions (|ISR| < min.insync.replicas)</td>
|
||||
<td>kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount</td>
|
||||
<td>0</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td># of offline log directories</td>
|
||||
<td>kafka.log:type=LogManager,name=OfflineLogDirectoryCount</td>
|
||||
<td>0</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>Is controller active on broker</td>
|
||||
<td>kafka.controller:type=KafkaController,name=ActiveControllerCount</td>
|
||||
|
|
Loading…
Reference in New Issue