mirror of https://github.com/apache/kafka.git
KAFKA-2146: Fix adding partition did not find the correct startIndex.
TopicCommand provide a tool to add partitions for existing topics. It try to find the startIndex from existing partitions. There's a minor flaw in this process, it try to use the first partition fetched from zookeeper as the start partition, and use the first replica id in this partition as the startIndex. One thing, the first partition fetched from zookeeper is not necessary to be the start partition. As partition id begin from zero, we should use partition with id zero as the start partition. The other, broker id does not necessary begin from 0, so the startIndex is not necessary to be the first replica id in the start partition. Author: chenshangan <chenshangan@meituan.com> Reviewers: Guozhang Wang Closes #329 from shangan/trunk-KAFKA-2146
This commit is contained in:
parent
f2d4ed5bc4
commit
4922a51edd
|
|
@ -113,23 +113,31 @@ object AdminUtils extends Logging {
|
|||
if (existingPartitionsReplicaList.size == 0)
|
||||
throw new AdminOperationException("The topic %s does not exist".format(topic))
|
||||
|
||||
val existingReplicaList = existingPartitionsReplicaList.head._2
|
||||
val existingReplicaListForPartitionZero = existingPartitionsReplicaList.find(p => p._1.partition == 0) match {
|
||||
case None => throw new AdminOperationException("the topic does not have partition with id 0, it should never happen")
|
||||
case Some(headPartitionReplica) => headPartitionReplica._2
|
||||
}
|
||||
val partitionsToAdd = numPartitions - existingPartitionsReplicaList.size
|
||||
if (partitionsToAdd <= 0)
|
||||
throw new AdminOperationException("The number of partitions for a topic can only be increased")
|
||||
|
||||
// create the new partition replication list
|
||||
val brokerList = zkUtils.getSortedBrokerList()
|
||||
val newPartitionReplicaList = if (replicaAssignmentStr == null || replicaAssignmentStr == "")
|
||||
AdminUtils.assignReplicasToBrokers(brokerList, partitionsToAdd, existingReplicaList.size, existingReplicaList.head, existingPartitionsReplicaList.size)
|
||||
val newPartitionReplicaList = if (replicaAssignmentStr == null || replicaAssignmentStr == "") {
|
||||
var startIndex = brokerList.indexWhere(_ >= existingReplicaListForPartitionZero.head)
|
||||
if(startIndex < 0) {
|
||||
startIndex = 0
|
||||
}
|
||||
AdminUtils.assignReplicasToBrokers(brokerList, partitionsToAdd, existingReplicaListForPartitionZero.size, startIndex, existingPartitionsReplicaList.size)
|
||||
}
|
||||
else
|
||||
getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size, checkBrokerAvailable)
|
||||
|
||||
// check if manual assignment has the right replication factor
|
||||
val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size != existingReplicaList.size))
|
||||
val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size != existingReplicaListForPartitionZero.size))
|
||||
if (unmatchedRepFactorList.size != 0)
|
||||
throw new AdminOperationException("The replication factor in manual replication assignment " +
|
||||
" is not equal to the existing replication factor for the topic " + existingReplicaList.size)
|
||||
" is not equal to the existing replication factor for the topic " + existingReplicaListForPartitionZero.size)
|
||||
|
||||
info("Add partition list for %s is %s".format(topic, newPartitionReplicaList))
|
||||
val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2)
|
||||
|
|
|
|||
Loading…
Reference in New Issue