KAFKA-7904; Add AtMinIsr partition metric and TopicCommand option (KIP-427)

- Add `AtMinIsrPartitionCount` metric to `ReplicaManager`
- Add `AtMinIsr` metric to `Partition`
- Add `--at-min-isr-partitions` describe `TopicCommand` option

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103089398

Author: Kevin Lu <lu.kevin@berkeley.edu>
Author: lu.kevin@berkeley.edu <kelu@paypal.com>

Reviewers: Gwen Shapira

Closes #6421 from KevinLiLu/KAFKA-7904
This commit is contained in:
Kevin Lu 2019-04-05 09:14:41 -07:00 committed by Gwen Shapira
parent a674ded0b3
commit 31d191fc85
6 changed files with 95 additions and 7 deletions

View File

@ -104,7 +104,7 @@ object TopicCommand extends Logging {
describeConfigs: Boolean)
class DescribeOptions(opts: TopicCommandOptions, liveBrokers: Set[Int]) {
val describeConfigs: Boolean = !opts.reportUnavailablePartitions && !opts.reportUnderReplicatedPartitions && !opts.reportUnderMinIsrPartitions
val describeConfigs: Boolean = !opts.reportUnavailablePartitions && !opts.reportUnderReplicatedPartitions && !opts.reportUnderMinIsrPartitions && !opts.reportAtMinIsrPartitions
val describePartitions: Boolean = !opts.reportOverriddenConfigs
private def hasUnderReplicatedPartitions(partitionDescription: PartitionDescription) = {
partitionDescription.isr.size < partitionDescription.assignedReplicas.size
@ -121,15 +121,22 @@ object TopicCommand extends Logging {
private def hasUnderMinIsrPartitions(partitionDescription: PartitionDescription) = {
partitionDescription.isr.size < partitionDescription.minIsrCount
}
private def hasAtMinIsrPartitions(partitionDescription: PartitionDescription) = {
partitionDescription.isr.size == partitionDescription.minIsrCount
}
private def shouldPrintUnderMinIsrPartitions(partitionDescription: PartitionDescription) = {
opts.reportUnderMinIsrPartitions && hasUnderMinIsrPartitions(partitionDescription)
}
private def shouldPrintAtMinIsrPartitions(partitionDescription: PartitionDescription) = {
opts.reportAtMinIsrPartitions && hasAtMinIsrPartitions(partitionDescription)
}
def shouldPrintTopicPartition(partitionDesc: PartitionDescription): Boolean = {
describeConfigs ||
shouldPrintUnderReplicatedPartitions(partitionDesc) ||
shouldPrintUnavailablePartitions(partitionDesc) ||
shouldPrintUnderMinIsrPartitions(partitionDesc)
shouldPrintUnderMinIsrPartitions(partitionDesc) ||
shouldPrintAtMinIsrPartitions(partitionDesc)
}
}
@ -228,7 +235,7 @@ object TopicCommand extends Logging {
}
}
if (describeOptions.describePartitions) {
val computedMinIsrCount = if (opts.reportUnderMinIsrPartitions)
val computedMinIsrCount = if (opts.reportUnderMinIsrPartitions || opts.reportAtMinIsrPartitions)
allConfigs.get(new ConfigResource(ConfigResource.Type.TOPIC, td.name())).get().get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value().toInt else 0
for (partition <- sortedPartitions) {
val partitionDesc = PartitionDescription(
@ -550,6 +557,8 @@ object TopicCommand extends Logging {
"if set when describing topics, only show partitions whose leader is not available")
private val reportUnderMinIsrPartitionsOpt = parser.accepts("under-min-isr-partitions",
"if set when describing topics, only show partitions whose isr count is less than the configured minimum. Not supported with the --zookeeper option.")
private val reportAtMinIsrPartitionsOpt = parser.accepts("at-min-isr-partitions",
"if set when describing topics, only show partitions whose isr count is equal to the configured minimum. Not supported with the --zookeeper option.")
private val topicsWithOverridesOpt = parser.accepts("topics-with-overrides",
"if set when describing topics, only show topics that have overridden configs")
private val ifExistsOpt = parser.accepts("if-exists",
@ -568,6 +577,8 @@ object TopicCommand extends Logging {
private val allTopicLevelOpts: Set[OptionSpec[_]] = Set(alterOpt, createOpt, describeOpt, listOpt, deleteOpt)
private val allReplicationReportOpts: Set[OptionSpec[_]] = Set(reportUnderReplicatedPartitionsOpt, reportUnderMinIsrPartitionsOpt, reportAtMinIsrPartitionsOpt, reportUnavailablePartitionsOpt)
def has(builder: OptionSpec[_]): Boolean = options.has(builder)
def valueAsOption[A](option: OptionSpec[A], defaultValue: Option[A] = None): Option[A] = if (has(option)) Some(options.valueOf(option)) else defaultValue
def valuesAsOption[A](option: OptionSpec[A], defaultValue: Option[util.List[A]] = None): Option[util.List[A]] = if (has(option)) Some(options.valuesOf(option)) else defaultValue
@ -593,6 +604,7 @@ object TopicCommand extends Logging {
def reportUnderReplicatedPartitions: Boolean = has(reportUnderReplicatedPartitionsOpt)
def reportUnavailablePartitions: Boolean = has(reportUnavailablePartitionsOpt)
def reportUnderMinIsrPartitions: Boolean = has(reportUnderMinIsrPartitionsOpt)
def reportAtMinIsrPartitions: Boolean = has(reportAtMinIsrPartitionsOpt)
def reportOverriddenConfigs: Boolean = has(topicsWithOverridesOpt)
def ifExists: Boolean = has(ifExistsOpt)
def ifNotExists: Boolean = has(ifNotExistsOpt)
@ -636,13 +648,15 @@ object TopicCommand extends Logging {
if(options.has(createOpt))
CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, Set(partitionsOpt, replicationFactorOpt))
CommandLineUtils.checkInvalidArgs(parser, options, reportUnderReplicatedPartitionsOpt,
allTopicLevelOpts -- Set(describeOpt) + reportUnderMinIsrPartitionsOpt + reportUnavailablePartitionsOpt + topicsWithOverridesOpt)
allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnderReplicatedPartitionsOpt + topicsWithOverridesOpt)
CommandLineUtils.checkInvalidArgs(parser, options, reportUnderMinIsrPartitionsOpt,
allTopicLevelOpts -- Set(describeOpt) + reportUnderReplicatedPartitionsOpt + reportUnavailablePartitionsOpt + topicsWithOverridesOpt + zkConnectOpt)
allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnderMinIsrPartitionsOpt + topicsWithOverridesOpt + zkConnectOpt)
CommandLineUtils.checkInvalidArgs(parser, options, reportAtMinIsrPartitionsOpt,
allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportAtMinIsrPartitionsOpt + topicsWithOverridesOpt + zkConnectOpt)
CommandLineUtils.checkInvalidArgs(parser, options, reportUnavailablePartitionsOpt,
allTopicLevelOpts -- Set(describeOpt) + reportUnderReplicatedPartitionsOpt + reportUnderReplicatedPartitionsOpt + topicsWithOverridesOpt)
allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnavailablePartitionsOpt + topicsWithOverridesOpt)
CommandLineUtils.checkInvalidArgs(parser, options, topicsWithOverridesOpt,
allTopicLevelOpts -- Set(describeOpt) + reportUnderReplicatedPartitionsOpt + reportUnderMinIsrPartitionsOpt + reportUnavailablePartitionsOpt)
allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts)
CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt, allTopicLevelOpts -- Set(alterOpt, deleteOpt, describeOpt) ++ Set(bootstrapServerOpt))
CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, allTopicLevelOpts -- Set(createOpt) ++ Set(bootstrapServerOpt))
CommandLineUtils.checkInvalidArgs(parser, options, excludeInternalTopicOpt, allTopicLevelOpts -- Set(listOpt, describeOpt))

View File

@ -127,6 +127,15 @@ class Partition(val topicPartition: TopicPartition,
tags
)
newGauge("AtMinIsr",
new Gauge[Int] {
def value = {
if (isAtMinIsr) 1 else 0
}
},
tags
)
newGauge("ReplicasCount",
new Gauge[Int] {
def value = {
@ -162,6 +171,15 @@ class Partition(val topicPartition: TopicPartition,
}
}
def isAtMinIsr: Boolean = {
leaderReplicaIfLocal match {
case Some(leaderReplica) =>
inSyncReplicas.size == leaderReplica.log.get.config.minInSyncReplicas
case None =>
false
}
}
/**
* Create the future replica if 1) the current replica is not in the given log directory and 2) the future replica
* does not exist. This method assumes that the current replica has already been created.
@ -1022,6 +1040,7 @@ class Partition(val topicPartition: TopicPartition,
removeMetric("InSyncReplicasCount", tags)
removeMetric("ReplicasCount", tags)
removeMetric("LastStableOffsetLag", tags)
removeMetric("AtMinIsr", tags)
}
override def equals(that: Any): Boolean = that match {

View File

@ -241,6 +241,12 @@ class ReplicaManager(val config: KafkaConfig,
def value = leaderPartitionsIterator.count(_.isUnderMinIsr)
}
)
val atMinIsrPartitionCount = newGauge(
"AtMinIsrPartitionCount",
new Gauge[Int] {
def value = leaderPartitionsIterator.count(_.isAtMinIsr)
}
)
val isrExpandRate = newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS)
val isrShrinkRate = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS)
@ -1482,6 +1488,7 @@ class ReplicaManager(val config: KafkaConfig,
removeMetric("OfflineReplicaCount")
removeMetric("UnderReplicatedPartitions")
removeMetric("UnderMinIsrPartitionCount")
removeMetric("AtMinIsrPartitionCount")
}
// High watermark do not need to be checkpointed only when under unit tests

View File

@ -596,6 +596,28 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
}
}
@Test
def testDescribeAtMinIsrPartitions(): Unit = {
val configMap = new java.util.HashMap[String, String]()
configMap.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "4")
adminClient.createTopics(
Collections.singletonList(new NewTopic(testTopicName, 1, 6).configs(configMap))).all().get()
waitForTopicCreated(testTopicName)
try {
killBroker(0)
killBroker(1)
val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new TopicCommandOptions(Array("--at-min-isr-partitions"))))
val rows = output.split("\n")
assertTrue(rows(0).startsWith(s"\tTopic: $testTopicName"))
assertEquals(1, rows.length);
} finally {
restartDeadBrokers()
}
}
/**
* Test describe --under-min-isr-partitions option with four topics:
* (1) topic with partition under the configured min ISR count

View File

@ -1035,4 +1035,25 @@ class PartitionTest {
builder.build()
}
/**
* Test for AtMinIsr partition state. We set the partition replica set size as 3, but only set one replica as an ISR.
* As the default minIsr configuration is 1, then the partition should be at min ISR (isAtMinIsr = true).
*/
@Test
def testAtMinIsr(): Unit = {
val controllerEpoch = 3
val leader = brokerId
val follower1 = brokerId + 1
val follower2 = brokerId + 2
val controllerId = brokerId + 3
val replicas = List[Integer](leader, follower1, follower2).asJava
val isr = List[Integer](leader).asJava
val leaderEpoch = 8
val partition = Partition(topicPartition, time, replicaManager)
assertFalse(partition.isAtMinIsr)
// Make isr set to only have leader to trigger AtMinIsr (default min isr config is 1)
partition.makeLeader(controllerId, new LeaderAndIsrRequest.PartitionState(controllerEpoch, leader, leaderEpoch, isr, 1, replicas, true), 0)
assertTrue(partition.isAtMinIsr)
}
}

View File

@ -865,6 +865,11 @@
<td>kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount</td>
<td>0</td>
</tr>
<tr>
<td># of at minIsr partitions (|ISR| = min.insync.replicas)</td>
<td>kafka.server:type=ReplicaManager,name=AtMinIsrPartitionCount</td>
<td>0</td>
</tr>
<tr>
<td># of offline log directories</td>
<td>kafka.log:type=LogManager,name=OfflineLogDirectoryCount</td>