mirror of https://github.com/apache/kafka.git
kafka-1073; CheckReassignmentStatus is broken; patched by Jun Rao; reviewed by Guozhang Wang, Swapnil Ghike and Neha Narkhede
This commit is contained in:
parent
2c6d3c7b45
commit
71ed6ca336
|
@ -1,17 +0,0 @@
|
||||||
#!/bin/bash
|
|
||||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
# contributor license agreements. See the NOTICE file distributed with
|
|
||||||
# this work for additional information regarding copyright ownership.
|
|
||||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
# (the "License"); you may not use this file except in compliance with
|
|
||||||
# the License. You may obtain a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
# See the License for the specific language governing permissions and
|
|
||||||
# limitations under the License.
|
|
||||||
|
|
||||||
$(dirname $0)/kafka-run-class.sh kafka.admin.CheckReassignmentStatus $@
|
|
|
@ -1,110 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package kafka.admin
|
|
||||||
|
|
||||||
import joptsimple.OptionParser
|
|
||||||
import org.I0Itec.zkclient.ZkClient
|
|
||||||
import kafka.utils._
|
|
||||||
import scala.collection.Map
|
|
||||||
import kafka.common.TopicAndPartition
|
|
||||||
|
|
||||||
object CheckReassignmentStatus extends Logging {
|
|
||||||
|
|
||||||
def main(args: Array[String]): Unit = {
|
|
||||||
val parser = new OptionParser
|
|
||||||
val jsonFileOpt = parser.accepts("path-to-json-file", "REQUIRED: The JSON file with the list of partitions and the " +
|
|
||||||
"new replicas they should be reassigned to")
|
|
||||||
.withRequiredArg
|
|
||||||
.describedAs("partition reassignment json file path")
|
|
||||||
.ofType(classOf[String])
|
|
||||||
val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " +
|
|
||||||
"form host:port. Multiple URLS can be given to allow fail-over.")
|
|
||||||
.withRequiredArg
|
|
||||||
.describedAs("urls")
|
|
||||||
.ofType(classOf[String])
|
|
||||||
|
|
||||||
val options = parser.parse(args : _*)
|
|
||||||
|
|
||||||
for(arg <- List(jsonFileOpt, zkConnectOpt)) {
|
|
||||||
if(!options.has(arg)) {
|
|
||||||
System.err.println("Missing required argument \"" + arg + "\"")
|
|
||||||
parser.printHelpOn(System.err)
|
|
||||||
System.exit(1)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
val jsonFile = options.valueOf(jsonFileOpt)
|
|
||||||
val zkConnect = options.valueOf(zkConnectOpt)
|
|
||||||
val jsonString = Utils.readFileAsString(jsonFile)
|
|
||||||
val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
|
|
||||||
|
|
||||||
try {
|
|
||||||
// read the json file into a string
|
|
||||||
val partitionsToBeReassigned = Json.parseFull(jsonString) match {
|
|
||||||
case Some(reassignedPartitions) =>
|
|
||||||
val partitions = reassignedPartitions.asInstanceOf[Array[Map[String, String]]]
|
|
||||||
partitions.map { m =>
|
|
||||||
val topic = m.asInstanceOf[Map[String, String]].get("topic").get
|
|
||||||
val partition = m.asInstanceOf[Map[String, String]].get("partition").get.toInt
|
|
||||||
val replicasList = m.asInstanceOf[Map[String, String]].get("replicas").get
|
|
||||||
val newReplicas = replicasList.split(",").map(_.toInt)
|
|
||||||
(TopicAndPartition(topic, partition), newReplicas.toSeq)
|
|
||||||
}.toMap
|
|
||||||
case None => Map.empty[TopicAndPartition, Seq[Int]]
|
|
||||||
}
|
|
||||||
|
|
||||||
val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkClient, partitionsToBeReassigned)
|
|
||||||
reassignedPartitionsStatus.foreach { partition =>
|
|
||||||
partition._2 match {
|
|
||||||
case ReassignmentCompleted =>
|
|
||||||
println("Partition %s reassignment completed successfully".format(partition._1))
|
|
||||||
case ReassignmentFailed =>
|
|
||||||
println("Partition %s reassignment failed".format(partition._1))
|
|
||||||
case ReassignmentInProgress =>
|
|
||||||
println("Partition %s reassignment in progress".format(partition._1))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]])
|
|
||||||
:Map[TopicAndPartition, ReassignmentStatus] = {
|
|
||||||
val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas)
|
|
||||||
// for all partitions whose replica reassignment is complete, check the status
|
|
||||||
partitionsToBeReassigned.map { topicAndPartition =>
|
|
||||||
(topicAndPartition._1, checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition._1,
|
|
||||||
topicAndPartition._2, partitionsToBeReassigned, partitionsBeingReassigned))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def checkIfPartitionReassignmentSucceeded(zkClient: ZkClient, topicAndPartition: TopicAndPartition,
|
|
||||||
reassignedReplicas: Seq[Int],
|
|
||||||
partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]],
|
|
||||||
partitionsBeingReassigned: Map[TopicAndPartition, Seq[Int]]): ReassignmentStatus = {
|
|
||||||
val newReplicas = partitionsToBeReassigned(topicAndPartition)
|
|
||||||
partitionsBeingReassigned.get(topicAndPartition) match {
|
|
||||||
case Some(partition) => ReassignmentInProgress
|
|
||||||
case None =>
|
|
||||||
// check if AR == RAR
|
|
||||||
val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition)
|
|
||||||
if(assignedReplicas == newReplicas)
|
|
||||||
ReassignmentCompleted
|
|
||||||
else
|
|
||||||
ReassignmentFailed
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -58,6 +58,12 @@ object ReassignPartitionsCommand extends Logging {
|
||||||
.describedAs("execute")
|
.describedAs("execute")
|
||||||
.ofType(classOf[String])
|
.ofType(classOf[String])
|
||||||
|
|
||||||
|
val statusCheckJsonFileOpt = parser.accepts("status-check-json-file", "REQUIRED: The JSON file with the list of partitions and the " +
|
||||||
|
"new replicas they should be reassigned to, which can be obtained from the output of a dry run.")
|
||||||
|
.withRequiredArg
|
||||||
|
.describedAs("partition reassignment json file path")
|
||||||
|
.ofType(classOf[String])
|
||||||
|
|
||||||
val options = parser.parse(args : _*)
|
val options = parser.parse(args : _*)
|
||||||
|
|
||||||
for(arg <- List(zkConnectOpt)) {
|
for(arg <- List(zkConnectOpt)) {
|
||||||
|
@ -80,7 +86,24 @@ object ReassignPartitionsCommand extends Logging {
|
||||||
|
|
||||||
var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]()
|
var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]()
|
||||||
|
|
||||||
if(options.has(topicsToMoveJsonFileOpt)) {
|
if(options.has(statusCheckJsonFileOpt)) {
|
||||||
|
val jsonFile = options.valueOf(statusCheckJsonFileOpt)
|
||||||
|
val jsonString = Utils.readFileAsString(jsonFile)
|
||||||
|
val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString)
|
||||||
|
|
||||||
|
println("Status of partition reassignment:")
|
||||||
|
val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkClient, partitionsToBeReassigned)
|
||||||
|
reassignedPartitionsStatus.foreach { partition =>
|
||||||
|
partition._2 match {
|
||||||
|
case ReassignmentCompleted =>
|
||||||
|
println("Reassignment of partition %s completed successfully".format(partition._1))
|
||||||
|
case ReassignmentFailed =>
|
||||||
|
println("Reassignment of partition %s failed".format(partition._1))
|
||||||
|
case ReassignmentInProgress =>
|
||||||
|
println("Reassignment of partition %s is still in progress".format(partition._1))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if(options.has(topicsToMoveJsonFileOpt)) {
|
||||||
val topicsToMoveJsonFile = options.valueOf(topicsToMoveJsonFileOpt)
|
val topicsToMoveJsonFile = options.valueOf(topicsToMoveJsonFileOpt)
|
||||||
val brokerList = options.valueOf(brokerListOpt)
|
val brokerList = options.valueOf(brokerListOpt)
|
||||||
val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
|
val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
|
||||||
|
@ -107,16 +130,19 @@ object ReassignPartitionsCommand extends Logging {
|
||||||
System.exit(1)
|
System.exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (options.has(executeOpt)) {
|
if (options.has(topicsToMoveJsonFileOpt) || options.has(manualAssignmentJsonFileOpt)) {
|
||||||
val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned)
|
if (options.has(executeOpt)) {
|
||||||
|
val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned)
|
||||||
|
|
||||||
if(reassignPartitionsCommand.reassignPartitions())
|
if(reassignPartitionsCommand.reassignPartitions())
|
||||||
println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned))
|
println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned))
|
||||||
else
|
else
|
||||||
println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
|
println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
|
||||||
} else {
|
} else {
|
||||||
System.out.println("This is a dry run (Use --execute to do the actual reassignment. " +
|
System.out.println("This is a dry run (Use --execute to do the actual reassignment. " +
|
||||||
"The replica assignment is \n" + partitionsToBeReassigned.toString())
|
"The following is the replica assignment. Save it for the status check option.\n" +
|
||||||
|
ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} catch {
|
} catch {
|
||||||
case e: Throwable =>
|
case e: Throwable =>
|
||||||
|
@ -127,6 +153,32 @@ object ReassignPartitionsCommand extends Logging {
|
||||||
zkClient.close()
|
zkClient.close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]])
|
||||||
|
:Map[TopicAndPartition, ReassignmentStatus] = {
|
||||||
|
val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas)
|
||||||
|
partitionsToBeReassigned.map { topicAndPartition =>
|
||||||
|
(topicAndPartition._1, checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition._1,
|
||||||
|
topicAndPartition._2, partitionsToBeReassigned, partitionsBeingReassigned))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private def checkIfPartitionReassignmentSucceeded(zkClient: ZkClient, topicAndPartition: TopicAndPartition,
|
||||||
|
reassignedReplicas: Seq[Int],
|
||||||
|
partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]],
|
||||||
|
partitionsBeingReassigned: Map[TopicAndPartition, Seq[Int]]): ReassignmentStatus = {
|
||||||
|
val newReplicas = partitionsToBeReassigned(topicAndPartition)
|
||||||
|
partitionsBeingReassigned.get(topicAndPartition) match {
|
||||||
|
case Some(partition) => ReassignmentInProgress
|
||||||
|
case None =>
|
||||||
|
// check if the current replica assignment matches the expected one after reassignment
|
||||||
|
val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition)
|
||||||
|
if(assignedReplicas == newReplicas)
|
||||||
|
ReassignmentCompleted
|
||||||
|
else
|
||||||
|
ReassignmentFailed
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.Map[TopicAndPartition, collection.Seq[Int]])
|
class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.Map[TopicAndPartition, collection.Seq[Int]])
|
||||||
|
|
Loading…
Reference in New Issue