MINOR: Make LeaderAndIsr immutable case class

Also include a few code readability improvements.

Author: jozi-k <jozef.koval@protonmail.ch>

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2731 from jozi-k/immutable_LeaderAndIsr
This commit is contained in:
jozi-k 2017-04-12 00:57:08 +01:00 committed by Ismael Juma
parent 256f8d5662
commit 1d25369d22
7 changed files with 152 additions and 139 deletions

View File

@ -25,12 +25,25 @@ import scala.collection.Set
object LeaderAndIsr { object LeaderAndIsr {
val initialLeaderEpoch: Int = 0 val initialLeaderEpoch: Int = 0
val initialZKVersion: Int = 0 val initialZKVersion: Int = 0
val NoLeader = -1 val NoLeader: Int = -1
val LeaderDuringDelete = -2 val LeaderDuringDelete: Int = -2
def apply(leader: Int, isr: List[Int]): LeaderAndIsr = LeaderAndIsr(leader, initialLeaderEpoch, isr, initialZKVersion)
def duringDelete(isr: List[Int]): LeaderAndIsr = LeaderAndIsr(LeaderDuringDelete, isr)
} }
case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int], var zkVersion: Int) { case class LeaderAndIsr(leader: Int,
def this(leader: Int, isr: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, isr, LeaderAndIsr.initialZKVersion) leaderEpoch: Int,
isr: List[Int],
zkVersion: Int) {
def withZkVersion(zkVersion: Int) = copy(zkVersion = zkVersion)
def newLeader(leader: Int) = newLeaderAndIsr(leader, isr)
def newLeaderAndIsr(leader: Int, isr: List[Int]) = LeaderAndIsr(leader, leaderEpoch + 1, isr, zkVersion + 1)
def newEpochAndZkVersion = newLeaderAndIsr(leader, isr)
override def toString: String = { override def toString: String = {
Json.encode(Map("leader" -> leader, "leader_epoch" -> leaderEpoch, "isr" -> isr)) Json.encode(Map("leader" -> leader, "leader_epoch" -> leaderEpoch, "isr" -> isr))
@ -39,12 +52,10 @@ case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int
case class PartitionStateInfo(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, allReplicas: Set[Int]) { case class PartitionStateInfo(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, allReplicas: Set[Int]) {
def replicationFactor = allReplicas.size
override def toString: String = { override def toString: String = {
val partitionStateInfo = new StringBuilder val partitionStateInfo = new StringBuilder
partitionStateInfo.append("(LeaderAndIsrInfo:" + leaderIsrAndControllerEpoch.toString) partitionStateInfo.append("(LeaderAndIsrInfo:" + leaderIsrAndControllerEpoch.toString)
partitionStateInfo.append(",ReplicationFactor:" + replicationFactor + ")") partitionStateInfo.append(",ReplicationFactor:" + allReplicas.size + ")")
partitionStateInfo.append(",AllReplicas:" + allReplicas.mkString(",") + ")") partitionStateInfo.append(",AllReplicas:" + allReplicas.mkString(",") + ")")
partitionStateInfo.toString() partitionStateInfo.toString()
} }

View File

@ -311,20 +311,24 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
/** Send UpdateMetadataRequest to the given brokers for the given partitions and partitions that are being deleted */ /** Send UpdateMetadataRequest to the given brokers for the given partitions and partitions that are being deleted */
def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int], def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int],
partitions: collection.Set[TopicAndPartition] = Set.empty[TopicAndPartition], partitions: collection.Set[TopicAndPartition] = Set.empty[TopicAndPartition]) {
callback: AbstractResponse => Unit = null) {
def updateMetadataRequestPartitionInfo(partition: TopicAndPartition, beingDeleted: Boolean) { def updateMetadataRequestPartitionInfo(partition: TopicAndPartition, beingDeleted: Boolean) {
val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition) val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition)
leaderIsrAndControllerEpochOpt match { leaderIsrAndControllerEpochOpt match {
case Some(leaderIsrAndControllerEpoch) => case Some(l @ LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)) =>
val replicas = controllerContext.partitionReplicaAssignment(partition).toSet val replicas = controllerContext.partitionReplicaAssignment(partition).toSet
val partitionStateInfo = if (beingDeleted) {
val leaderAndIsr = new LeaderAndIsr(LeaderAndIsr.LeaderDuringDelete, leaderIsrAndControllerEpoch.leaderAndIsr.isr) val leaderIsrAndControllerEpoch = if (beingDeleted) {
PartitionStateInfo(LeaderIsrAndControllerEpoch(leaderAndIsr, leaderIsrAndControllerEpoch.controllerEpoch), replicas) val leaderDuringDelete = LeaderAndIsr.duringDelete(leaderAndIsr.isr)
LeaderIsrAndControllerEpoch(leaderDuringDelete, controllerEpoch)
} else { } else {
PartitionStateInfo(leaderIsrAndControllerEpoch, replicas) l
} }
val partitionStateInfo = PartitionStateInfo(leaderIsrAndControllerEpoch, replicas)
updateMetadataRequestPartitionInfoMap.put(new TopicPartition(partition.topic, partition.partition), partitionStateInfo) updateMetadataRequestPartitionInfoMap.put(new TopicPartition(partition.topic, partition.partition), partitionStateInfo)
case None => case None =>
info("Leader not yet assigned for partition %s. Skip sending UpdateMetadataRequest.".format(partition)) info("Leader not yet assigned for partition %s. Skip sending UpdateMetadataRequest.".format(partition))
} }

View File

@ -1081,21 +1081,21 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
newIsr = leaderAndIsr.isr newIsr = leaderAndIsr.isr
} }
val newLeaderAndIsr = new LeaderAndIsr(newLeader, leaderAndIsr.leaderEpoch + 1, val newLeaderAndIsr = leaderAndIsr.newLeaderAndIsr(newLeader, newIsr)
newIsr, leaderAndIsr.zkVersion + 1)
// update the new leadership decision in zookeeper or retry // update the new leadership decision in zookeeper or retry
val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partition, val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partition,
newLeaderAndIsr, epoch, leaderAndIsr.zkVersion) newLeaderAndIsr, epoch, leaderAndIsr.zkVersion)
newLeaderAndIsr.zkVersion = newVersion val leaderWithNewVersion = newLeaderAndIsr.withZkVersion(newVersion)
finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr, epoch)) finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(leaderWithNewVersion, epoch))
controllerContext.partitionLeadershipInfo.put(topicAndPartition, finalLeaderIsrAndControllerEpoch.get) controllerContext.partitionLeadershipInfo.put(topicAndPartition, finalLeaderIsrAndControllerEpoch.get)
if (updateSucceeded) if (updateSucceeded) {
info("New leader and ISR for partition %s is %s".format(topicAndPartition, newLeaderAndIsr.toString())) info(s"New leader and ISR for partition $topicAndPartition is $leaderWithNewVersion")
}
updateSucceeded updateSucceeded
} else { } else {
warn("Cannot remove replica %d from ISR of partition %s since it is not in the ISR. Leader = %d ; ISR = %s" warn(s"Cannot remove replica $replicaId from ISR of partition $topicAndPartition since it is not in the ISR." +
.format(replicaId, topicAndPartition, leaderAndIsr.leader, leaderAndIsr.isr)) s" Leader = ${leaderAndIsr.leader} ; ISR = ${leaderAndIsr.isr}")
finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(leaderAndIsr, epoch)) finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(leaderAndIsr, epoch))
controllerContext.partitionLeadershipInfo.put(topicAndPartition, finalLeaderIsrAndControllerEpoch.get) controllerContext.partitionLeadershipInfo.put(topicAndPartition, finalLeaderIsrAndControllerEpoch.get)
true true
@ -1133,20 +1133,20 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
"controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch)) "controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
// increment the leader epoch even if there are no leader or isr changes to allow the leader to cache the expanded // increment the leader epoch even if there are no leader or isr changes to allow the leader to cache the expanded
// assigned replica list // assigned replica list
val newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1, val newLeaderAndIsr = leaderAndIsr.newEpochAndZkVersion
leaderAndIsr.isr, leaderAndIsr.zkVersion + 1)
// update the new leadership decision in zookeeper or retry // update the new leadership decision in zookeeper or retry
val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic,
partition, newLeaderAndIsr, epoch, leaderAndIsr.zkVersion) partition, newLeaderAndIsr, epoch, leaderAndIsr.zkVersion)
newLeaderAndIsr.zkVersion = newVersion val leaderWithNewVersion = newLeaderAndIsr.withZkVersion(newVersion)
finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr, epoch)) finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(leaderWithNewVersion, epoch))
if (updateSucceeded) if (updateSucceeded) {
info("Updated leader epoch for partition %s to %d".format(topicAndPartition, newLeaderAndIsr.leaderEpoch)) info(s"Updated leader epoch for partition $topicAndPartition to ${leaderWithNewVersion.leaderEpoch}")
}
updateSucceeded updateSucceeded
case None => case None =>
throw new IllegalStateException(("Cannot update leader epoch for partition %s as leaderAndIsr path is empty. " + throw new IllegalStateException(s"Cannot update leader epoch for partition $topicAndPartition as " +
"This could mean we somehow tried to reassign a partition that doesn't exist").format(topicAndPartition)) "leaderAndIsr path is empty. This could mean we somehow tried to reassign a partition that doesn't exist")
true true
} }
} }

View File

@ -48,49 +48,49 @@ trait PartitionLeaderSelector {
*/ */
class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, config: KafkaConfig) class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, config: KafkaConfig)
extends PartitionLeaderSelector with Logging { extends PartitionLeaderSelector with Logging {
this.logIdent = "[OfflinePartitionLeaderSelector]: "
logIdent = "[OfflinePartitionLeaderSelector]: "
def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = { def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
controllerContext.partitionReplicaAssignment.get(topicAndPartition) match { controllerContext.partitionReplicaAssignment.get(topicAndPartition) match {
case Some(assignedReplicas) => case Some(assignedReplicas) =>
val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r)) val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r)) val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
val newLeaderAndIsr = val newLeaderAndIsr =
if (liveBrokersInIsr.isEmpty) { if (liveBrokersInIsr.isEmpty) {
// Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration // Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration
// for unclean leader election. // for unclean leader election.
if (!LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(controllerContext.zkUtils, if (!LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(controllerContext.zkUtils,
ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) { ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
throw new NoReplicaOnlineException(("No broker in ISR for partition " + throw new NoReplicaOnlineException(
"%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) + s"No broker in ISR for partition $topicAndPartition is alive. Live brokers are: [${controllerContext.liveBrokerIds}], " +
" ISR brokers are: [%s]".format(currentLeaderAndIsr.isr.mkString(","))) s"ISR brokers are: [${currentLeaderAndIsr.isr.mkString(",")}]"
)
} }
debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s" debug(s"No broker in ISR is alive for $topicAndPartition. Pick the leader from the alive assigned " +
.format(topicAndPartition, liveAssignedReplicas.mkString(","))) s"replicas: ${liveAssignedReplicas.mkString(",")}")
if (liveAssignedReplicas.isEmpty) { if (liveAssignedReplicas.isEmpty) {
throw new NoReplicaOnlineException(("No replica for partition " + throw new NoReplicaOnlineException(s"No replica for partition $topicAndPartition is alive. Live " +
"%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) + s"brokers are: [${controllerContext.liveBrokerIds}]. Assigned replicas are: [$assignedReplicas].")
" Assigned replicas are: [%s]".format(assignedReplicas))
} else { } else {
ControllerStats.uncleanLeaderElectionRate.mark() ControllerStats.uncleanLeaderElectionRate.mark()
val newLeader = liveAssignedReplicas.head val newLeader = liveAssignedReplicas.head
warn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s. There's potential data loss." warn(s"No broker in ISR is alive for $topicAndPartition. Elect leader $newLeader from live " +
.format(topicAndPartition, newLeader, liveAssignedReplicas.mkString(","))) s"brokers ${liveAssignedReplicas.mkString(",")}. There's potential data loss.")
new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1) currentLeaderAndIsr.newLeaderAndIsr(newLeader, List(newLeader))
} }
} else { } else {
val liveReplicasInIsr = liveAssignedReplicas.filter(r => liveBrokersInIsr.contains(r)) val liveReplicasInIsr = liveAssignedReplicas.filter(r => liveBrokersInIsr.contains(r))
val newLeader = liveReplicasInIsr.head val newLeader = liveReplicasInIsr.head
debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader." debug(s"Some broker in ISR is alive for $topicAndPartition. Select $newLeader from ISR " +
.format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(","))) s"${liveBrokersInIsr.mkString(",")} to be the leader.")
new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1) currentLeaderAndIsr.newLeaderAndIsr(newLeader, liveBrokersInIsr)
} }
info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition)) info(s"Selected new leader and ISR $newLeaderAndIsr for offline partition $topicAndPartition")
(newLeaderAndIsr, liveAssignedReplicas) (newLeaderAndIsr, liveAssignedReplicas)
case None => case None =>
throw new NoReplicaOnlineException("Partition %s doesn't have replicas assigned to it".format(topicAndPartition)) throw new NoReplicaOnlineException(s"Partition $topicAndPartition doesn't have replicas assigned to it")
} }
} }
} }
@ -101,30 +101,29 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi
* Replicas to receive LeaderAndIsr request = reassigned replicas * Replicas to receive LeaderAndIsr request = reassigned replicas
*/ */
class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging { class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
this.logIdent = "[ReassignedPartitionLeaderSelector]: "
logIdent = "[ReassignedPartitionLeaderSelector]: "
/** /**
* The reassigned replicas are already in the ISR when selectLeader is called. * The reassigned replicas are already in the ISR when selectLeader is called.
*/ */
def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = { def selectLeader(topicAndPartition: TopicAndPartition,
currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
val reassignedInSyncReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas val reassignedInSyncReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas
val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch val newLeaderOpt = reassignedInSyncReplicas.find { r =>
val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion controllerContext.liveBrokerIds.contains(r) && currentLeaderAndIsr.isr.contains(r)
val aliveReassignedInSyncReplicas = reassignedInSyncReplicas.filter(r => controllerContext.liveBrokerIds.contains(r) &&
currentLeaderAndIsr.isr.contains(r))
val newLeaderOpt = aliveReassignedInSyncReplicas.headOption
newLeaderOpt match {
case Some(newLeader) => (new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr,
currentLeaderIsrZkPathVersion + 1), reassignedInSyncReplicas)
case None =>
reassignedInSyncReplicas.size match {
case 0 =>
throw new NoReplicaOnlineException("List of reassigned replicas for partition " +
" %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
case _ =>
throw new NoReplicaOnlineException("None of the reassigned replicas for partition " +
"%s are in-sync with the leader. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
} }
newLeaderOpt match {
case Some(newLeader) => (currentLeaderAndIsr.newLeader(newLeader), reassignedInSyncReplicas)
case None =>
val errorMessage = if (reassignedInSyncReplicas.isEmpty) {
s"List of reassigned replicas for partition $topicAndPartition is empty. Current leader and ISR: " +
s"[$currentLeaderAndIsr]"
} else {
s"None of the reassigned replicas for partition $topicAndPartition are in-sync with the leader. " +
s"Current leader and ISR: [$currentLeaderAndIsr]"
}
throw new NoReplicaOnlineException(errorMessage)
} }
} }
} }
@ -134,11 +133,12 @@ class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) ex
* New isr = current isr; * New isr = current isr;
* Replicas to receive LeaderAndIsr request = assigned replicas * Replicas to receive LeaderAndIsr request = assigned replicas
*/ */
class PreferredReplicaPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector class PreferredReplicaPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
with Logging {
this.logIdent = "[PreferredReplicaPartitionLeaderSelector]: "
def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = { logIdent = "[PreferredReplicaPartitionLeaderSelector]: "
def selectLeader(topicAndPartition: TopicAndPartition,
currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition) val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
val preferredReplica = assignedReplicas.head val preferredReplica = assignedReplicas.head
// check if preferred replica is the current leader // check if preferred replica is the current leader
@ -151,11 +151,11 @@ with Logging {
" Triggering preferred replica leader election") " Triggering preferred replica leader election")
// check if preferred replica is not the current leader and is alive and in the isr // check if preferred replica is not the current leader and is alive and in the isr
if (controllerContext.liveBrokerIds.contains(preferredReplica) && currentLeaderAndIsr.isr.contains(preferredReplica)) { if (controllerContext.liveBrokerIds.contains(preferredReplica) && currentLeaderAndIsr.isr.contains(preferredReplica)) {
(new LeaderAndIsr(preferredReplica, currentLeaderAndIsr.leaderEpoch + 1, currentLeaderAndIsr.isr, val newLeaderAndIsr = currentLeaderAndIsr.newLeader(preferredReplica)
currentLeaderAndIsr.zkVersion + 1), assignedReplicas) (newLeaderAndIsr, assignedReplicas)
} else { } else {
throw new StateChangeFailedException("Preferred replica %d for partition ".format(preferredReplica) + throw new StateChangeFailedException(s"Preferred replica $preferredReplica for partition $topicAndPartition " +
"%s is either not alive or not in the isr. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr)) s"is either not alive or not in the isr. Current leader and ISR: [$currentLeaderAndIsr]")
} }
} }
} }
@ -166,30 +166,26 @@ with Logging {
* New isr = current isr - shutdown replica; * New isr = current isr - shutdown replica;
* Replicas to receive LeaderAndIsr request = live assigned replicas * Replicas to receive LeaderAndIsr request = live assigned replicas
*/ */
class ControlledShutdownLeaderSelector(controllerContext: ControllerContext) class ControlledShutdownLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
extends PartitionLeaderSelector
with Logging {
this.logIdent = "[ControlledShutdownLeaderSelector]: " logIdent = "[ControlledShutdownLeaderSelector]: "
def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
val currentLeader = currentLeaderAndIsr.leader
def selectLeader(topicAndPartition: TopicAndPartition,
currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
val currentIsr = currentLeaderAndIsr.isr
val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition) val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r)) val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))
val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId)) val newIsr = currentIsr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId))
liveAssignedReplicas.find(newIsr.contains) match { liveAssignedReplicas.find(newIsr.contains) match {
case Some(newLeader) => case Some(newLeader) =>
debug("Partition %s : current leader = %d, new leader = %d".format(topicAndPartition, currentLeader, newLeader)) debug(s"Partition $topicAndPartition : current leader = ${currentLeaderAndIsr.leader}, new leader = $newLeader")
(LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1), liveAssignedReplicas) val newLeaderAndIsr = currentLeaderAndIsr.newLeaderAndIsr(newLeader, newIsr)
(newLeaderAndIsr, liveAssignedReplicas)
case None => case None =>
throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides" + throw new StateChangeFailedException(s"No other replicas in ISR ${currentIsr.mkString(",")} for $topicAndPartition " +
" shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, controllerContext.shuttingDownBrokerIds.mkString(","))) s"besides shutting down brokers ${controllerContext.shuttingDownBrokerIds.mkString(",")}")
} }
} }
} }
@ -200,9 +196,10 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
*/ */
class NoOpLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging { class NoOpLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
this.logIdent = "[NoOpLeaderSelector]: " logIdent = "[NoOpLeaderSelector]: "
def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = { def selectLeader(topicAndPartition: TopicAndPartition,
currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
warn("I should never have been asked to perform leader election, returning the current LeaderAndIsr and replica assignment.") warn("I should never have been asked to perform leader election, returning the current LeaderAndIsr and replica assignment.")
(currentLeaderAndIsr, controllerContext.partitionReplicaAssignment(topicAndPartition)) (currentLeaderAndIsr, controllerContext.partitionReplicaAssignment(topicAndPartition))
} }

View File

@ -266,42 +266,52 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
* OfflinePartition state. * OfflinePartition state.
* @param topicAndPartition The topic/partition whose leader and isr path is to be initialized * @param topicAndPartition The topic/partition whose leader and isr path is to be initialized
*/ */
private def initializeLeaderAndIsrForPartition(topicAndPartition: TopicAndPartition) { private def initializeLeaderAndIsrForPartition(topicAndPartition: TopicAndPartition) = {
val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition) val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition).toList
val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r)) val liveAssignedReplicas = replicaAssignment.filter(controllerContext.liveBrokerIds.contains)
liveAssignedReplicas.size match { liveAssignedReplicas.headOption match {
case 0 => case None =>
val failMsg = ("encountered error during state change of partition %s from New to Online, assigned replicas are [%s], " + val failMsg = s"Controller $controllerId epoch ${controller.epoch} encountered error during state change of " +
"live brokers are [%s]. No assigned replica is alive.") s"partition $topicAndPartition from New to Online, assigned replicas are " +
.format(topicAndPartition, replicaAssignment.mkString(","), controllerContext.liveBrokerIds) s"[${replicaAssignment.mkString(",")}], live brokers are [${controllerContext.liveBrokerIds}]. No assigned " +
stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg) "replica is alive."
stateChangeLogger.error(failMsg)
throw new StateChangeFailedException(failMsg) throw new StateChangeFailedException(failMsg)
case _ =>
debug("Live assigned replicas for partition %s are: [%s]".format(topicAndPartition, liveAssignedReplicas)) // leader is the first replica in the list of assigned replicas
// make the first replica in the list of assigned replicas, the leader case Some(leader) =>
val leader = liveAssignedReplicas.head debug(s"Live assigned replicas for partition $topicAndPartition are: [$liveAssignedReplicas]")
val leaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader, liveAssignedReplicas.toList), val leaderAndIsr = LeaderAndIsr(leader, liveAssignedReplicas)
controller.epoch) val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controller.epoch)
debug("Initializing leader and isr for partition %s to %s".format(topicAndPartition, leaderIsrAndControllerEpoch)) debug(s"Initializing leader and isr for partition $topicAndPartition to $leaderIsrAndControllerEpoch")
try { try {
zkUtils.createPersistentPath( zkUtils.createPersistentPath(
getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition), getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
zkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch)) zkUtils.leaderAndIsrZkData(leaderAndIsr, controller.epoch)
)
// NOTE: the above write can fail only if the current controller lost its zk session and the new controller // NOTE: the above write can fail only if the current controller lost its zk session and the new controller
// took over and initialized this partition. This can happen if the current controller went into a long // took over and initialized this partition. This can happen if the current controller went into a long
// GC pause // GC pause
controllerContext.partitionLeadershipInfo.put(topicAndPartition, leaderIsrAndControllerEpoch) controllerContext.partitionLeadershipInfo.put(topicAndPartition, leaderIsrAndControllerEpoch)
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic, brokerRequestBatch.addLeaderAndIsrRequestForBrokers(
topicAndPartition.partition, leaderIsrAndControllerEpoch, replicaAssignment) liveAssignedReplicas,
topicAndPartition.topic,
topicAndPartition.partition,
leaderIsrAndControllerEpoch,
replicaAssignment
)
} catch { } catch {
case _: ZkNodeExistsException => case _: ZkNodeExistsException =>
// read the controller epoch // read the controller epoch
val leaderIsrAndEpoch = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topicAndPartition.topic, val leaderIsrAndEpoch = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topicAndPartition.topic,
topicAndPartition.partition).get topicAndPartition.partition).get
val failMsg = ("encountered error while changing partition %s's state from New to Online since LeaderAndIsr path already " +
"exists with value %s and controller epoch %d") val failMsg = s"Controller $controllerId epoch ${controller.epoch} encountered error while changing " +
.format(topicAndPartition, leaderIsrAndEpoch.leaderAndIsr.toString(), leaderIsrAndEpoch.controllerEpoch) s"partition $topicAndPartition's state from New to Online since LeaderAndIsr path already exists with " +
stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg) s"value ${leaderIsrAndEpoch.leaderAndIsr} and controller epoch ${leaderIsrAndEpoch.controllerEpoch}"
stateChangeLogger.error(failMsg)
throw new StateChangeFailedException(failMsg) throw new StateChangeFailedException(failMsg)
} }
} }
@ -339,12 +349,11 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr) val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr)
val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partition, val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partition,
leaderAndIsr, controller.epoch, currentLeaderAndIsr.zkVersion) leaderAndIsr, controller.epoch, currentLeaderAndIsr.zkVersion)
newLeaderAndIsr = leaderAndIsr newLeaderAndIsr = leaderAndIsr.withZkVersion(newVersion)
newLeaderAndIsr.zkVersion = newVersion
zookeeperPathUpdateSucceeded = updateSucceeded zookeeperPathUpdateSucceeded = updateSucceeded
replicasForThisPartition = replicas replicasForThisPartition = replicas
} }
val newLeaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch) val newLeaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch)
// update the leader cache // update the leader cache
controllerContext.partitionLeadershipInfo.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch) controllerContext.partitionLeadershipInfo.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch)
stateChangeLogger.trace("Controller %d epoch %d elected leader %d for Offline partition %s" stateChangeLogger.trace("Controller %d epoch %d elected leader %d for Offline partition %s"

View File

@ -45,7 +45,7 @@ class ControlledShutdownLeaderSelectorTest {
controllerContext.partitionReplicaAssignment = mutable.Map(topicPartition -> assignment) controllerContext.partitionReplicaAssignment = mutable.Map(topicPartition -> assignment)
val leaderSelector = new ControlledShutdownLeaderSelector(controllerContext) val leaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
val firstLeaderAndIsr = new LeaderAndIsr(firstLeader, firstIsr) val firstLeaderAndIsr = LeaderAndIsr(firstLeader, firstIsr)
val (secondLeaderAndIsr, secondReplicas) = leaderSelector.selectLeader(topicPartition, firstLeaderAndIsr) val (secondLeaderAndIsr, secondReplicas) = leaderSelector.selectLeader(topicPartition, firstLeaderAndIsr)
assertEquals(preferredReplicaId, secondLeaderAndIsr.leader) assertEquals(preferredReplicaId, secondLeaderAndIsr.leader)

View File

@ -700,30 +700,22 @@ object TestUtils extends Logging {
new ProducerRequest(correlationId, clientId, acks.toShort, timeout, collection.mutable.Map(data:_*)) new ProducerRequest(correlationId, clientId, acks.toShort, timeout, collection.mutable.Map(data:_*))
} }
def makeLeaderForPartition(zkUtils: ZkUtils, topic: String, def makeLeaderForPartition(zkUtils: ZkUtils,
topic: String,
leaderPerPartitionMap: scala.collection.immutable.Map[Int, Int], leaderPerPartitionMap: scala.collection.immutable.Map[Int, Int],
controllerEpoch: Int) { controllerEpoch: Int) {
leaderPerPartitionMap.foreach leaderPerPartitionMap.foreach { case (partition, leader) =>
{ try {
leaderForPartition => { val newLeaderAndIsr = zkUtils.getLeaderAndIsrForPartition(topic, partition)
val partition = leaderForPartition._1 .map(_.newLeader(leader))
val leader = leaderForPartition._2 .getOrElse(LeaderAndIsr(leader, List(leader)))
try{
val currentLeaderAndIsrOpt = zkUtils.getLeaderAndIsrForPartition(topic, partition) zkUtils.updatePersistentPath(
var newLeaderAndIsr: LeaderAndIsr = null getTopicPartitionLeaderAndIsrPath(topic, partition),
if(currentLeaderAndIsrOpt.isEmpty) zkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch)
newLeaderAndIsr = new LeaderAndIsr(leader, List(leader)) )
else{
newLeaderAndIsr = currentLeaderAndIsrOpt.get
newLeaderAndIsr.leader = leader
newLeaderAndIsr.leaderEpoch += 1
newLeaderAndIsr.zkVersion += 1
}
zkUtils.updatePersistentPath(getTopicPartitionLeaderAndIsrPath(topic, partition),
zkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch))
} catch { } catch {
case oe: Throwable => error("Error while electing leader for partition [%s,%d]".format(topic, partition), oe) case oe: Throwable => error(s"Error while electing leader for partition [$topic,$partition]", oe)
}
} }
} }
} }