KAFKA-532 Multiple controllers can co-exist during soft failures; patched by Neha Narkhede; reviewed by Jun Rao

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1411010 13f79535-47bb-0310-9956-ffa450edef68
Neha Narkhede 2012-11-18 22:48:20 +00:00
parent 79d3be9293
commit 8e7b0d4016
22 changed files with 512 additions and 204 deletions

View File

@ -21,9 +21,8 @@ package kafka.api
import java.nio._
import kafka.utils._
import kafka.api.ApiUtils._
import collection.mutable.Map
import collection.mutable.HashMap
import kafka.cluster.Broker
import kafka.controller.LeaderIsrAndControllerEpoch
object LeaderAndIsr {
@ -35,7 +34,7 @@ case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int
def this(leader: Int, isr: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, isr, LeaderAndIsr.initialZKVersion)
override def toString(): String = {
val jsonDataMap = new HashMap[String, String]
val jsonDataMap = new collection.mutable.HashMap[String, String]
jsonDataMap.put("leader", leader.toString)
jsonDataMap.put("leaderEpoch", leaderEpoch.toString)
jsonDataMap.put("ISR", isr.mkString(","))
@ -43,35 +42,42 @@ case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int
}
}
object PartitionStateInfo {
def readFrom(buffer: ByteBuffer): PartitionStateInfo = {
val controllerEpoch = buffer.getInt
val leader = buffer.getInt
val leaderGenId = buffer.getInt
val leaderEpoch = buffer.getInt
val isrString = readShortString(buffer)
val isr = isrString.split(",").map(_.toInt).toList
val zkVersion = buffer.getInt
val replicationFactor = buffer.getInt
PartitionStateInfo(LeaderAndIsr(leader, leaderGenId, isr, zkVersion), replicationFactor)
PartitionStateInfo(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, leaderEpoch, isr, zkVersion), controllerEpoch),
replicationFactor)
}
}
case class PartitionStateInfo(val leaderAndIsr: LeaderAndIsr, val replicationFactor: Int) {
case class PartitionStateInfo(val leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, val replicationFactor: Int) {
def writeTo(buffer: ByteBuffer) {
buffer.putInt(leaderAndIsr.leader)
buffer.putInt(leaderAndIsr.leaderEpoch)
writeShortString(buffer, leaderAndIsr.isr.mkString(","))
buffer.putInt(leaderAndIsr.zkVersion)
buffer.putInt(leaderIsrAndControllerEpoch.controllerEpoch)
buffer.putInt(leaderIsrAndControllerEpoch.leaderAndIsr.leader)
buffer.putInt(leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch)
writeShortString(buffer, leaderIsrAndControllerEpoch.leaderAndIsr.isr.mkString(","))
buffer.putInt(leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion)
buffer.putInt(replicationFactor)
}
def sizeInBytes(): Int = {
val size = 4 + 4 + (2 + leaderAndIsr.isr.mkString(",").length) + 4 + 4
val size =
4 /* epoch of the controller that elected the leader */ +
4 /* leader broker id */ +
4 /* leader epoch */ +
(2 + leaderIsrAndControllerEpoch.leaderAndIsr.isr.mkString(",").length) +
4 /* zk version */ +
4 /* replication factor */
size
}
}
object LeaderAndIsrRequest {
val CurrentVersion = 1.shortValue()
val DefaultClientId = ""
@ -83,8 +89,9 @@ object LeaderAndIsrRequest {
val versionId = buffer.getShort
val clientId = readShortString(buffer)
val ackTimeoutMs = buffer.getInt
val controllerEpoch = buffer.getInt
val partitionStateInfosCount = buffer.getInt
val partitionStateInfos = new HashMap[(String, Int), PartitionStateInfo]
val partitionStateInfos = new collection.mutable.HashMap[(String, Int), PartitionStateInfo]
for(i <- 0 until partitionStateInfosCount){
val topic = readShortString(buffer)
@ -99,26 +106,28 @@ object LeaderAndIsrRequest {
for (i <- 0 until leadersCount)
leaders += Broker.readFrom(buffer)
new LeaderAndIsrRequest(versionId, clientId, ackTimeoutMs, partitionStateInfos, leaders)
new LeaderAndIsrRequest(versionId, clientId, ackTimeoutMs, partitionStateInfos.toMap, leaders, controllerEpoch)
}
}
case class LeaderAndIsrRequest (versionId: Short,
clientId: String,
ackTimeoutMs: Int,
partitionStateInfos: Map[(String, Int), PartitionStateInfo],
leaders: Set[Broker])
leaders: Set[Broker],
controllerEpoch: Int)
extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {
def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker]) = {
this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout, partitionStateInfos, liveBrokers)
def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker], controllerEpoch: Int) = {
this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout,
partitionStateInfos, liveBrokers, controllerEpoch)
}
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
writeShortString(buffer, clientId)
buffer.putInt(ackTimeoutMs)
buffer.putInt(controllerEpoch)
buffer.putInt(partitionStateInfos.size)
for((key, value) <- partitionStateInfos){
writeShortString(buffer, key._1)
@ -130,12 +139,17 @@ case class LeaderAndIsrRequest (versionId: Short,
}
def sizeInBytes(): Int = {
var size = 1 + 2 + (2 + clientId.length) + 4 + 4
var size =
2 /* version id */ +
(2 + clientId.length) /* client id */ +
4 /* ack timeout */ +
4 /* controller epoch */ +
4 /* number of partitions */
for((key, value) <- partitionStateInfos)
size += (2 + key._1.length) + 4 + value.sizeInBytes
size += 4
size += (2 + key._1.length) /* topic */ + 4 /* partition */ + value.sizeInBytes /* partition state info */
size += 4 /* number of leader brokers */
for(broker <- leaders)
size += broker.sizeInBytes
size += broker.sizeInBytes /* broker info */
size
}
}
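
The header of LeaderAndIsrRequest now carries the controller epoch immediately after the ack timeout, and sizeInBytes accounts for the extra 4 bytes. The following is a minimal standalone sketch of that header layout using plain java.nio; the object name and the short-string helpers are simplified stand-ins, not the real kafka.api.ApiUtils, and the timeout/epoch values are arbitrary examples.

import java.nio.ByteBuffer

object LeaderAndIsrHeaderSketch {
  // simplified stand-in for ApiUtils.writeShortString: a 2-byte length followed by UTF-8 bytes
  def writeShortString(buffer: ByteBuffer, s: String) {
    val bytes = s.getBytes("UTF-8")
    buffer.putShort(bytes.length.toShort)
    buffer.put(bytes)
  }

  // simplified stand-in for ApiUtils.readShortString
  def readShortString(buffer: ByteBuffer): String = {
    val bytes = new Array[Byte](buffer.getShort)
    buffer.get(bytes)
    new String(bytes, "UTF-8")
  }

  def main(args: Array[String]) {
    val clientId = ""
    // header layout after this patch: version id, client id, ack timeout, controller epoch
    val buffer = ByteBuffer.allocate(2 + (2 + clientId.length) + 4 + 4)
    buffer.putShort(1.toShort)          // version id (CurrentVersion = 1)
    writeShortString(buffer, clientId)  // client id
    buffer.putInt(1000)                 // ack timeout ms (example value)
    buffer.putInt(3)                    // controller epoch, the field added by this patch (example value)
    buffer.flip()
    println((buffer.getShort, readShortString(buffer), buffer.getInt, buffer.getInt))
  }
}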

View File

@ -17,6 +17,7 @@
package kafka.api
import kafka.common.ErrorMapping
import java.nio.ByteBuffer
import kafka.api.ApiUtils._
import collection.mutable.HashMap
@ -26,6 +27,7 @@ import collection.Map
object LeaderAndIsrResponse {
def readFrom(buffer: ByteBuffer): LeaderAndIsrResponse = {
val versionId = buffer.getShort
val errorCode = buffer.getShort
val numEntries = buffer.getInt
val responseMap = new HashMap[(String, Int), Short]()
for (i<- 0 until numEntries){
@ -34,24 +36,32 @@ object LeaderAndIsrResponse {
val partitionErrorCode = buffer.getShort
responseMap.put((topic, partition), partitionErrorCode)
}
new LeaderAndIsrResponse(versionId, responseMap)
new LeaderAndIsrResponse(versionId, responseMap, errorCode)
}
}
case class LeaderAndIsrResponse(versionId: Short,
responseMap: Map[(String, Int), Short])
responseMap: Map[(String, Int), Short],
errorCode: Short = ErrorMapping.NoError)
extends RequestOrResponse {
def sizeInBytes(): Int ={
var size = 2 + 4
for ((key, value) <- responseMap){
size += 2 + key._1.length + 4 + 2
var size =
2 /* version id */ +
2 /* error code */ +
4 /* number of responses */
for ((key, value) <- responseMap) {
size +=
2 + key._1.length /* topic */ +
4 /* partition */ +
2 /* error code for this partition */
}
size
}
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
buffer.putShort(errorCode)
buffer.putInt(responseMap.size)
for ((key:(String, Int), value) <- responseMap){
writeShortString(buffer, key._1)

View File

@ -33,6 +33,7 @@ object StopReplicaRequest extends Logging {
val versionId = buffer.getShort
val clientId = readShortString(buffer)
val ackTimeoutMs = buffer.getInt
val controllerEpoch = buffer.getInt
val deletePartitions = buffer.get match {
case 1 => true
case 0 => false
@ -44,7 +45,7 @@ object StopReplicaRequest extends Logging {
(1 to topicPartitionPairCount) foreach { _ =>
topicPartitionPairSet.add(readShortString(buffer), buffer.getInt)
}
StopReplicaRequest(versionId, clientId, ackTimeoutMs, deletePartitions, topicPartitionPairSet.toSet)
StopReplicaRequest(versionId, clientId, ackTimeoutMs, deletePartitions, topicPartitionPairSet.toSet, controllerEpoch)
}
}
@ -52,18 +53,20 @@ case class StopReplicaRequest(versionId: Short,
clientId: String,
ackTimeoutMs: Int,
deletePartitions: Boolean,
partitions: Set[(String, Int)])
partitions: Set[(String, Int)],
controllerEpoch: Int)
extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) {
def this(deletePartitions: Boolean, partitions: Set[(String, Int)]) = {
def this(deletePartitions: Boolean, partitions: Set[(String, Int)], controllerEpoch: Int) = {
this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout,
deletePartitions, partitions)
deletePartitions, partitions, controllerEpoch)
}
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
writeShortString(buffer, clientId)
buffer.putInt(ackTimeoutMs)
buffer.putInt(controllerEpoch)
buffer.put(if (deletePartitions) 1.toByte else 0.toByte)
buffer.putInt(partitions.size)
for ((topic, partitionId) <- partitions){
@ -77,6 +80,7 @@ case class StopReplicaRequest(versionId: Short,
2 + /* versionId */
ApiUtils.shortStringLength(clientId) +
4 + /* ackTimeoutMs */
4 + /* controller epoch */
1 + /* deletePartitions */
4 /* partition count */
for ((topic, partitionId) <- partitions){

View File

@ -19,13 +19,15 @@ package kafka.api
import java.nio.ByteBuffer
import collection.mutable.HashMap
import collection.Map
import collection.immutable.Map
import kafka.common.ErrorMapping
import kafka.api.ApiUtils._
object StopReplicaResponse {
def readFrom(buffer: ByteBuffer): StopReplicaResponse = {
val versionId = buffer.getShort
val errorCode = buffer.getShort
val numEntries = buffer.getInt
val responseMap = new HashMap[(String, Int), Short]()
@ -35,23 +37,31 @@ object StopReplicaResponse {
val partitionErrorCode = buffer.getShort()
responseMap.put((topic, partition), partitionErrorCode)
}
new StopReplicaResponse(versionId, responseMap)
new StopReplicaResponse(versionId, responseMap.toMap, errorCode)
}
}
case class StopReplicaResponse(val versionId: Short,
val responseMap: Map[(String, Int), Short]) extends RequestOrResponse{
val responseMap: Map[(String, Int), Short],
val errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{
def sizeInBytes(): Int ={
var size = 2 + 4
for ((key, value) <- responseMap){
size += (2 + key._1.length) + 4 + 2
var size =
2 /* version id */ +
2 /* error code */ +
4 /* number of responses */
for ((key, value) <- responseMap) {
size +=
2 + key._1.length /* topic */ +
4 /* partition */ +
2 /* error code for this partition */
}
size
}
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
buffer.putShort(errorCode)
buffer.putInt(responseMap.size)
for ((key:(String, Int), value) <- responseMap){
writeShortString(buffer, key._1)

View File

@ -24,6 +24,7 @@ import kafka.server.ReplicaManager
import com.yammer.metrics.core.Gauge
import kafka.metrics.KafkaMetricsGroup
import kafka.common.ErrorMapping
import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController}
/**
@ -44,6 +45,12 @@ class Partition(val topic: String,
private val leaderIsrUpdateLock = new Object
private var zkVersion: Int = LeaderAndIsr.initialZKVersion
private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
/* Epoch of the controller that last changed the leader. This needs to be initialized correctly upon broker startup.
* One way of doing that is through the controller's start replica state change command. When a new broker starts up
* the controller sends it a start replica command containing the leader for each partition that the broker hosts.
* In addition to the leader, the controller can also send the epoch of the controller that elected the leader for
* each partition. */
private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
this.logIdent = "Partition [%s, %d] on broker %d: ".format(topic, partitionId, localBrokerId)
private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
@ -117,14 +124,18 @@ class Partition(val topic: String,
* 3. reset LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time)
* 4. set the new leader and ISR
*/
def makeLeader(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr): Boolean = {
def makeLeader(topic: String, partitionId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Boolean = {
leaderIsrUpdateLock synchronized {
val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
if (leaderEpoch >= leaderAndIsr.leaderEpoch){
info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become leader request"
.format(leaderEpoch, leaderAndIsr.leaderEpoch))
return false
}
trace("Started to become leader at the request %s".format(leaderAndIsr.toString()))
// record the epoch of the controller that made the leadership decision. This is useful while updating the isr
// to maintain the decision maker controller's epoch in the zookeeper path
controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
// stop replica fetcher thread, if any
replicaFetcherManager.removeFetcher(topic, partitionId)
@ -148,14 +159,19 @@ class Partition(val topic: String,
* 3. set the leader and set ISR to empty
* 4. start a fetcher to the new leader
*/
def makeFollower(topic: String, partitionId: Int, leaderAndIsr: LeaderAndIsr, liveBrokers: Set[Broker]): Boolean = {
def makeFollower(topic: String, partitionId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
liveBrokers: Set[Broker]): Boolean = {
leaderIsrUpdateLock synchronized {
val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
if (leaderEpoch >= leaderAndIsr.leaderEpoch){
info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become follwer request"
info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become follower request"
.format(leaderEpoch, leaderAndIsr.leaderEpoch))
return false
}
trace("Started to become follower at the request %s".format(leaderAndIsr.toString()))
// record the epoch of the controller that made the leadership decision. This is useful while updating the isr
// to maintain the decision maker controller's epoch in the zookeeper path
controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
val newLeaderBrokerId: Int = leaderAndIsr.leader
info("Starting the follower state transition to follow leader %d for topic %s partition %d"
.format(newLeaderBrokerId, topic, partitionId))
@ -290,8 +306,10 @@ class Partition(val topic: String,
private def updateIsr(newIsr: Set[Replica]) {
info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newIsr.mkString(", ")))
val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
// use the epoch of the controller that made the leadership decision, instead of the current controller epoch
val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId), newLeaderAndIsr.toString(), zkVersion)
ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId),
ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch), zkVersion)
if (updateSucceeded){
inSyncReplicas = newIsr
zkVersion = newVersion
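
The epoch checks in makeLeader and makeFollower are what keep a replica from acting on instructions from a deposed controller: a request is applied only if it carries a strictly newer leader epoch, and the epoch of the controller that made the decision is remembered so that later ISR updates written to ZooKeeper cite the decision maker rather than whatever controller happens to be current. A minimal in-memory sketch of that guard follows; the class and method names are illustrative, not the real Partition API.

class PartitionEpochGuardSketch(val localBrokerId: Int) {
  private var leaderEpoch = -1      // LeaderAndIsr.initialLeaderEpoch - 1 in the real code
  private var controllerEpoch = 0   // KafkaController.InitialControllerEpoch - 1 in the real code

  // apply a become-leader/become-follower decision only if its leader epoch is newer
  def maybeApply(requestLeaderEpoch: Int, decidingControllerEpoch: Int): Boolean = {
    if (leaderEpoch >= requestLeaderEpoch) {
      println("Current leader epoch %d >= requested %d, discarding the request"
        .format(leaderEpoch, requestLeaderEpoch))
      false
    } else {
      // record the epoch of the controller that made the leadership decision, so that
      // ISR changes written to zookeeper later carry the decision maker's epoch
      controllerEpoch = decidingControllerEpoch
      leaderEpoch = requestLeaderEpoch
      true
    }
  }

  def lastDecidingControllerEpoch: Int = controllerEpoch
}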

View File

@ -0,0 +1,23 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.common
class ControllerMovedException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
def this(message: String) = this(message, null)
def this() = this(null, null)
}

View File

@ -40,6 +40,7 @@ object ErrorMapping {
val BrokerNotAvailableCode: Short = 8
val ReplicaNotAvailableCode: Short = 9
val MessageSizeTooLargeCode: Short = 10
val StaleControllerEpochCode: Short = 11
private val exceptionToCode =
Map[Class[Throwable], Short](
@ -52,7 +53,8 @@ object ErrorMapping {
classOf[RequestTimedOutException].asInstanceOf[Class[Throwable]] -> RequestTimedOutCode,
classOf[BrokerNotAvailableException].asInstanceOf[Class[Throwable]] -> BrokerNotAvailableCode,
classOf[ReplicaNotAvailableException].asInstanceOf[Class[Throwable]] -> ReplicaNotAvailableCode,
classOf[MessageSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSizeTooLargeCode
classOf[MessageSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSizeTooLargeCode,
classOf[ControllerMovedException].asInstanceOf[Class[Throwable]] -> StaleControllerEpochCode
).withDefaultValue(UnknownCode)
/* invert the mapping */

View File

@ -159,12 +159,13 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
stopAndDeleteReplicaRequestMap.clear()
}
def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, leaderAndIsr: LeaderAndIsr, replicationFactor: Int) {
def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, replicationFactor: Int) {
brokerIds.foreach { brokerId =>
leaderAndIsrRequestMap.getOrElseUpdate(brokerId,
new mutable.HashMap[(String, Int), PartitionStateInfo])
leaderAndIsrRequestMap(brokerId).put((topic, partition),
PartitionStateInfo(leaderAndIsr, replicationFactor))
PartitionStateInfo(leaderIsrAndControllerEpoch, replicationFactor))
}
}
@ -183,13 +184,13 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
}
}
def sendRequestsToBrokers(liveBrokers: Set[Broker]) {
def sendRequestsToBrokers(controllerEpoch: Int, liveBrokers: Set[Broker]) {
leaderAndIsrRequestMap.foreach { m =>
val broker = m._1
val partitionStateInfos = m._2
val leaderIds = partitionStateInfos.map(_._2.leaderAndIsr.leader).toSet
val partitionStateInfos = m._2.toMap
val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
val leaders = liveBrokers.filter(b => leaderIds.contains(b.id))
val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders)
val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerEpoch)
debug("The leaderAndIsr request sent to broker %d is %s".format(broker, leaderAndIsrRequest))
sendRequest(broker, leaderAndIsrRequest, null)
}
@ -201,7 +202,8 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
if (replicas.size > 0) {
debug("The stop replica request (delete = %s) sent to broker %d is %s"
.format(deletePartitions, broker, replicas.mkString(",")))
sendRequest(broker, new StopReplicaRequest(deletePartitions, Set.empty[(String, Int)] ++ replicas), null)
sendRequest(broker, new StopReplicaRequest(deletePartitions,
Set.empty[(String, Int)] ++ replicas, controllerEpoch), null)
}
}
m.clear()

View File

@ -24,23 +24,27 @@ import java.util.concurrent.TimeUnit
import kafka.admin.PreferredReplicaLeaderElectionCommand
import kafka.api._
import kafka.cluster.Broker
import kafka.common.{BrokerNotAvailableException, TopicAndPartition, KafkaException}
import kafka.common._
import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
import kafka.server.{ZookeeperLeaderElector, KafkaConfig}
import kafka.utils.ZkUtils._
import kafka.utils.{Utils, ZkUtils, Logging}
import org.apache.zookeeper.Watcher.Event.KeeperState
import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
import org.I0Itec.zkclient.exception.ZkNoNodeException
import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
import scala.Some
import kafka.common.TopicAndPartition
class ControllerContext(val zkClient: ZkClient,
var controllerChannelManager: ControllerChannelManager = null,
val controllerLock: Object = new Object,
var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty,
val brokerShutdownLock: Object = new Object,
var epoch: Int = KafkaController.InitialControllerEpoch - 1,
var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1,
var allTopics: Set[String] = Set.empty,
var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty,
var allLeaders: mutable.Map[TopicAndPartition, LeaderAndIsr] = mutable.Map.empty,
var allLeaders: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty,
var partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] =
new mutable.HashMap,
var partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] =
@ -68,6 +72,8 @@ trait KafkaControllerMBean {
object KafkaController {
val MBeanName = "kafka.controller:type=KafkaController,name=ControllerOps"
val InitialControllerEpoch = 1
val InitialControllerEpochZkVersion = 1
}
class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup with KafkaControllerMBean {
@ -82,6 +88,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
private val brokerRequestBatch = new ControllerBrokerRequestBatch(sendRequest)
registerControllerChangedListener()
newGauge(
"ActiveControllerCount",
@ -90,6 +97,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
}
)
def epoch = controllerContext.epoch
/**
* JMX operation to initiate clean shutdown of a broker. On clean shutdown,
* the controller first determines the partitions that the shutting down
@ -127,8 +136,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
def replicatedPartitionsBrokerLeads() = controllerContext.controllerLock.synchronized {
trace("All leaders = " + controllerContext.allLeaders.mkString(","))
controllerContext.allLeaders.filter {
case (topicAndPartition, leader) =>
leader.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
case (topicAndPartition, leaderIsrAndControllerEpoch) =>
leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
}.map(_._1)
}
@ -139,18 +148,18 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
val (topic, partition) = topicAndPartition.asTuple
// move leadership serially to relinquish lock.
controllerContext.controllerLock synchronized {
controllerContext.allLeaders.get(topicAndPartition).foreach{ currLeaderAndIsr =>
if (currLeaderAndIsr.leader == id) {
controllerContext.allLeaders.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
controlledShutdownPartitionLeaderSelector)
val newLeaderAndIsr = controllerContext.allLeaders(topicAndPartition)
val newLeaderIsrAndControllerEpoch = controllerContext.allLeaders(topicAndPartition)
// mark replica offline only if leadership was moved successfully
if (newLeaderAndIsr.leader != currLeaderAndIsr.leader)
if (newLeaderIsrAndControllerEpoch.leaderAndIsr.leader != currLeaderIsrAndControllerEpoch.leaderAndIsr.leader)
replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topic, partition, id)), OfflineReplica)
} else
debug("Partition %s moved from leader %d to new leader %d during shutdown."
.format(topicAndPartition, id, currLeaderAndIsr.leader))
.format(topicAndPartition, id, currLeaderIsrAndControllerEpoch.leaderAndIsr.leader))
}
}
}
@ -165,18 +174,19 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
allPartitionsAndReplicationFactorOnBroker foreach {
case(topicAndPartition, replicationFactor) =>
val (topic, partition) = topicAndPartition.asTuple
if (controllerContext.allLeaders(topicAndPartition).leader != id) {
if (controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader != id) {
brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topic, partition, deletePartition = false)
removeReplicaFromIsr(topic, partition, id) match {
case Some(updatedLeaderAndIsr) =>
case Some(updatedLeaderIsrAndControllerEpoch) =>
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(
Seq(updatedLeaderAndIsr.leader), topic, partition, updatedLeaderAndIsr, replicationFactor)
Seq(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader), topic, partition,
updatedLeaderIsrAndControllerEpoch, replicationFactor)
case None =>
// ignore
}
}
}
brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.liveBrokers)
val partitionsRemaining = replicatedPartitionsBrokerLeads().toSet
debug("Remaining partitions to move on broker %d: %s".format(id, partitionsRemaining.mkString(",")))
@ -187,15 +197,21 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
/**
* This callback is invoked by the zookeeper leader elector on electing the current broker as the new controller.
* It does the following things on the become-controller state change -
* 1. Initializes the controller's context object that holds cache objects for current topics, live brokers and
* 1. Register controller epoch changed listener
* 2. Increments the controller epoch
* 3. Initializes the controller's context object that holds cache objects for current topics, live brokers and
* leaders for all existing partitions.
* 2. Starts the controller's channel manager
* 3. Starts the replica state machine
* 4. Starts the partition state machine
* 4. Starts the controller's channel manager
* 5. Starts the replica state machine
* 6. Starts the partition state machine
* If it encounters any unexpected exception/error while becoming controller, it resigns as the current controller.
* This ensures another controller election will be triggered and there will always be an actively serving controller
*/
def onControllerFailover() {
if(isRunning) {
info("Broker %d starting become controller state transition".format(config.brokerId))
// increment the controller epoch
incrementControllerEpoch(zkClient)
// before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
registerReassignedPartitionsListener()
registerPreferredReplicaElectionListener()
@ -205,7 +221,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
partitionStateMachine.startup()
replicaStateMachine.startup()
Utils.registerMBean(this, KafkaController.MBeanName)
info("Broker %d is ready to serve as the new controller".format(config.brokerId))
info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
}
else
info("Controller has been shut down, aborting startup/failover")
@ -268,7 +284,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
val deadBrokersSet = deadBrokers.toSet
// trigger OfflinePartition state for all partitions whose current leader is one amongst the dead brokers
val partitionsWithoutLeader = controllerContext.allLeaders.filter(partitionAndLeader =>
deadBrokersSet.contains(partitionAndLeader._2.leader)).keySet
deadBrokersSet.contains(partitionAndLeader._2.leaderAndIsr.leader)).keySet
partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
// trigger OnlinePartition state changes for offline or new partitions
partitionStateMachine.triggerOnlinePartitionStateChange()
@ -389,6 +405,37 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
controllerContext.controllerChannelManager.sendRequest(brokerId, request, callback)
}
def incrementControllerEpoch(zkClient: ZkClient) = {
try {
var newControllerEpoch = controllerContext.epoch + 1
val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPathIfExists(zkClient,
ZkUtils.ControllerEpochPath, newControllerEpoch.toString, controllerContext.epochZkVersion)
if(!updateSucceeded)
throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure")
else {
controllerContext.epochZkVersion = newVersion
controllerContext.epoch = newControllerEpoch
}
} catch {
case nne: ZkNoNodeException =>
// if path doesn't exist, this is the first controller whose epoch should be 1
// the following call can still fail if another controller gets elected between checking if the path exists and
// trying to create the controller epoch path
try {
zkClient.createPersistent(ZkUtils.ControllerEpochPath, KafkaController.InitialControllerEpoch.toString)
controllerContext.epoch = KafkaController.InitialControllerEpoch
controllerContext.epochZkVersion = KafkaController.InitialControllerEpochZkVersion
} catch {
case e: ZkNodeExistsException => throw new ControllerMovedException("Controller moved to another broker. " +
"Aborting controller startup procedure")
case oe => error("Error while incrementing controller epoch", oe)
}
case oe => error("Error while incrementing controller epoch", oe)
}
info("Controller %d incremented epoch to %d".format(config.brokerId, controllerContext.epoch))
}
private def registerSessionExpirationListener() = {
zkClient.subscribeStateChanges(new SessionExpirationListener())
}
@ -397,7 +444,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
controllerContext.liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
controllerContext.allTopics = ZkUtils.getAllTopics(zkClient).toSet
controllerContext.partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, controllerContext.allTopics.toSeq)
controllerContext.allLeaders = new mutable.HashMap[TopicAndPartition, LeaderAndIsr]
controllerContext.allLeaders = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
// update the leader and isr cache for all existing partitions from Zookeeper
updateLeaderAndIsrCache()
// start the channel manager
@ -429,7 +476,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
val partitionsUndergoingPreferredReplicaElection = ZkUtils.getPartitionsUndergoingPreferredReplicaElection(zkClient)
// check if they are already completed
val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter(partition =>
controllerContext.allLeaders(partition).leader == controllerContext.partitionReplicaAssignment(partition).head)
controllerContext.allLeaders(partition).leaderAndIsr.leader == controllerContext.partitionReplicaAssignment(partition).head)
controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitionsUndergoingPreferredReplicaElection
controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsThatCompletedPreferredReplicaElection
info("Partitions undergoing preferred replica election: %s".format(partitionsUndergoingPreferredReplicaElection.mkString(",")))
@ -445,13 +492,13 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
private def updateLeaderAndIsrCache() {
val leaderAndIsrInfo = ZkUtils.getPartitionLeaderAndIsrForTopics(zkClient, controllerContext.allTopics.toSeq)
for((topicPartition, leaderAndIsr) <- leaderAndIsrInfo) {
for((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo) {
// If the leader specified in the leaderAndIsr is no longer alive, there is no need to recover it
controllerContext.liveBrokerIds.contains(leaderAndIsr.leader) match {
controllerContext.liveBrokerIds.contains(leaderIsrAndControllerEpoch.leaderAndIsr.leader) match {
case true =>
controllerContext.allLeaders.put(topicPartition, leaderAndIsr)
controllerContext.allLeaders.put(topicPartition, leaderIsrAndControllerEpoch)
case false =>
debug("While refreshing controller's leader and isr cache, leader %d for ".format(leaderAndIsr.leader) +
debug("While refreshing controller's leader and isr cache, leader %d for ".format(leaderIsrAndControllerEpoch.leaderAndIsr.leader) +
"partition %s is dead, just ignore it".format(topicPartition))
}
}
@ -469,7 +516,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
private def moveReassignedPartitionLeaderIfRequired(topicAndPartition: TopicAndPartition,
reassignedPartitionContext: ReassignedPartitionsContext) {
val reassignedReplicas = reassignedPartitionContext.newReplicas
val currentLeader = controllerContext.allLeaders(topicAndPartition).leader
val currentLeader = controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader
if(!reassignedPartitionContext.newReplicas.contains(currentLeader)) {
info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
"is not in the new list of replicas %s. Re-electing leader".format(reassignedReplicas.mkString(",")))
@ -542,6 +589,10 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
zkClient.subscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, new PreferredReplicaElectionListener(this))
}
private def registerControllerChangedListener() {
zkClient.subscribeDataChanges(ZkUtils.ControllerEpochPath, new ControllerEpochListener(this))
}
def removePartitionFromReassignedPartitions(topicAndPartition: TopicAndPartition) {
// read the current list of reassigned partitions from zookeeper
val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
@ -570,7 +621,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition]) {
for(partition <- partitionsToBeRemoved) {
// check the status
val currentLeader = controllerContext.allLeaders(partition).leader
val currentLeader = controllerContext.allLeaders(partition).leaderAndIsr.leader
val preferredReplica = controllerContext.partitionReplicaAssignment(partition).head
if(currentLeader == preferredReplica) {
info("Partition %s completed preferred replica leader election. New leader is %d".format(partition, preferredReplica))
@ -598,35 +649,42 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
* @return the new leaderAndIsr (with the replica removed if it was present),
* or None if leaderAndIsr is empty.
*/
def removeReplicaFromIsr(topic: String, partition: Int, replicaId: Int): Option[LeaderAndIsr] = {
def removeReplicaFromIsr(topic: String, partition: Int, replicaId: Int): Option[LeaderIsrAndControllerEpoch] = {
val topicAndPartition = TopicAndPartition(topic, partition)
debug("Removing replica %d from ISR of %s.".format(replicaId, topicAndPartition))
var finalLeaderAndIsr: Option[LeaderAndIsr] = None
var finalLeaderIsrAndControllerEpoch: Option[LeaderIsrAndControllerEpoch] = None
var zkWriteCompleteOrUnnecessary = false
while (!zkWriteCompleteOrUnnecessary) {
// refresh leader and isr from zookeeper again
val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
zkWriteCompleteOrUnnecessary = leaderAndIsrOpt match {
case Some(leaderAndIsr) => // increment the leader epoch even if the ISR changes
val leaderIsrAndEpochOpt = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition)
zkWriteCompleteOrUnnecessary = leaderIsrAndEpochOpt match {
case Some(leaderIsrAndEpoch) => // increment the leader epoch even if the ISR changes
val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr
val controllerEpoch = leaderIsrAndEpoch.controllerEpoch
if(controllerEpoch > epoch)
throw new StateChangeFailedException("Leader and isr path written by another controller. This probably" +
"means the current controller with epoch %d went through a soft failure and another ".format(epoch) +
"controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
if (leaderAndIsr.isr.contains(replicaId)) {
val newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1,
leaderAndIsr.isr.filter(b => b != replicaId), leaderAndIsr.zkVersion + 1)
// update the new leadership decision in zookeeper or retry
val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), newLeaderAndIsr.toString(), leaderAndIsr.zkVersion)
val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(
zkClient,
ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, epoch),
leaderAndIsr.zkVersion)
newLeaderAndIsr.zkVersion = newVersion
finalLeaderAndIsr = Some(newLeaderAndIsr)
if (updateSucceeded) {
// we've successfully written to ZK, let's refresh our cache
info("New leader and ISR for partition [%s, %d] is %s".format(topic, partition, newLeaderAndIsr.toString()))
controllerContext.allLeaders.put(topicAndPartition, newLeaderAndIsr)
}
finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr, epoch))
if (updateSucceeded)
info("New leader and ISR for partition [%s, %d] is %s"
.format(topic, partition, newLeaderAndIsr.toString()))
updateSucceeded
} else {
warn("Cannot remove replica %d from ISR of %s. Leader = %d ; ISR = %s"
.format(replicaId, topicAndPartition, leaderAndIsr.leader, leaderAndIsr.isr))
finalLeaderAndIsr = Some(leaderAndIsr)
.format(replicaId, topicAndPartition, leaderAndIsr.leader, leaderAndIsr.isr))
finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(leaderAndIsr, epoch))
true
}
case None =>
@ -634,7 +692,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
true
}
}
finalLeaderAndIsr
finalLeaderIsrAndControllerEpoch
}
class SessionExpirationListener() extends IZkStateListener with Logging {
@ -859,11 +917,50 @@ class PreferredReplicaElectionListener(controller: KafkaController) extends IZkD
}
}
class ControllerEpochListener(controller: KafkaController) extends IZkDataListener with Logging {
this.logIdent = "[ControllerEpochListener on " + controller.config.brokerId + "]: "
val controllerContext = controller.controllerContext
readControllerEpochFromZookeeper()
/**
* Invoked when a controller updates the epoch value
* @throws Exception On any error.
*/
@throws(classOf[Exception])
def handleDataChange(dataPath: String, data: Object) {
debug("Controller epoch listener fired with new epoch " + data.toString)
controllerContext.controllerLock synchronized {
// read the epoch path to get the zk version
readControllerEpochFromZookeeper()
}
}
/**
* @throws Exception
* On any error.
*/
@throws(classOf[Exception])
def handleDataDeleted(dataPath: String) {
}
private def readControllerEpochFromZookeeper() {
// initialize the controller epoch and zk version by reading from zookeeper
if(ZkUtils.pathExists(controllerContext.zkClient, ZkUtils.ControllerEpochPath)) {
val epochData = ZkUtils.readData(controllerContext.zkClient, ZkUtils.ControllerEpochPath)
controllerContext.epoch = epochData._1.toInt
controllerContext.epochZkVersion = epochData._2.getVersion
info("Initialized controller epoch to %d and zk version %d".format(controllerContext.epoch, controllerContext.epochZkVersion))
}
}
}
case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty,
var isrChangeListener: ReassignedPartitionsIsrChangeListener = null)
case class PartitionAndReplica(topic: String, partition: Int, replica: Int)
case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controllerEpoch: Int)
object ControllerStat extends KafkaMetricsGroup {
val offlinePartitionRate = newMeter("OfflinePartitionsPerSec", "partitions", TimeUnit.SECONDS)
val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS)
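
incrementControllerEpoch is essentially a compare-and-swap on a versioned ZooKeeper node: read the cached (epoch, zkVersion) pair that ControllerEpochListener keeps up to date, write epoch + 1 conditioned on that version, create the path with the initial epoch if it has never existed, and treat any lost race as the controller having moved. A rough sketch of the same protocol against a toy in-memory versioned store follows; the store and exception are stand-ins, not ZkClient, ZkUtils or ControllerMovedException.

class VersionedStoreSketch {
  // toy stand-in for a ZooKeeper path: Some((data, version)) when the node exists
  private var node: Option[(String, Int)] = None

  def create(data: String): Boolean = synchronized {
    if (node.isDefined) false else { node = Some((data, 0)); true }
  }

  def read(): Option[(String, Int)] = synchronized { node }

  // returns the new version if the expected version matched, None otherwise
  def conditionalUpdate(data: String, expectedVersion: Int): Option[Int] = synchronized {
    node match {
      case Some((_, v)) if v == expectedVersion =>
        node = Some((data, v + 1))
        Some(v + 1)
      case _ => None
    }
  }
}

object IncrementControllerEpochSketch {
  class ControllerMovedSketch extends RuntimeException("Controller moved to another broker")

  // mirrors the shape of incrementControllerEpoch: CAS the epoch forward, or create the node
  // with the initial epoch (1) if it never existed; a lost race means another controller won
  def incrementEpoch(store: VersionedStoreSketch, cachedEpoch: Int, cachedZkVersion: Int): (Int, Int) =
    store.read() match {
      case None =>
        if (store.create("1")) (1, 0) else throw new ControllerMovedSketch
      case Some(_) =>
        store.conditionalUpdate((cachedEpoch + 1).toString, cachedZkVersion) match {
          case Some(newVersion) => (cachedEpoch + 1, newVersion)
          case None             => throw new ControllerMovedSketch
        }
    }
}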

View File

@ -128,7 +128,7 @@ with Logging {
val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
val preferredReplica = assignedReplicas.head
// check if preferred replica is the current leader
val currentLeader = controllerContext.allLeaders(topicAndPartition).leader
val currentLeader = controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader
if(currentLeader == preferredReplica) {
throw new StateChangeFailedException("Preferred replica %d is already the current leader for partition [%s,%d]"
.format(preferredReplica, topic, partition))

View File

@ -85,7 +85,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
if(partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, offlinePartitionSelector)
}
brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.liveBrokers)
} catch {
case e => error("Error while moving some partitions to the online state", e)
}
@ -104,8 +104,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
partitions.foreach { topicAndPartition =>
handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector)
}
brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
} catch {
brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.liveBrokers)
}catch {
case e => error("Error while moving some partitions to %s state".format(targetState), e)
}
}
@ -144,7 +144,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
case _ => // should never come here since illegal previous states are checked above
}
info("Partition [%s, %d] state changed from %s to OnlinePartition with leader %d".format(topic, partition,
partitionState(topicAndPartition), controllerContext.allLeaders(topicAndPartition).leader))
partitionState(topicAndPartition), controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader))
partitionState.put(topicAndPartition, OnlinePartition)
// post: partition has a leader
case OfflinePartition =>
@ -231,22 +231,28 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
debug("Live assigned replicas for partition %s are: [%s]".format(topicAndPartition, liveAssignedReplicas))
// make the first replica in the list of assigned replicas, the leader
val leader = liveAssignedReplicas.head
val leaderAndIsr = new LeaderAndIsr(leader, liveAssignedReplicas.toList)
val leaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader, liveAssignedReplicas.toList),
controller.epoch)
try {
ZkUtils.createPersistentPath(controllerContext.zkClient,
ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition), leaderAndIsr.toString)
ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
ZkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch))
// NOTE: the above write can fail only if the current controller lost its zk session and the new controller
// took over and initialized this partition. This can happen if the current controller went into a long
// GC pause
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic,
topicAndPartition.partition, leaderAndIsr, replicaAssignment.size)
controllerContext.allLeaders.put(topicAndPartition, leaderAndIsr)
topicAndPartition.partition, leaderIsrAndControllerEpoch, replicaAssignment.size)
controllerContext.allLeaders.put(topicAndPartition, leaderIsrAndControllerEpoch)
partitionState.put(topicAndPartition, OnlinePartition)
} catch {
case e: ZkNodeExistsException =>
// read the controller epoch
val leaderIsrAndEpoch = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic,
topicAndPartition.partition).get
ControllerStat.offlinePartitionRate.mark()
throw new StateChangeFailedException("Error while changing partition %s's state from New to Online"
.format(topicAndPartition) + " since Leader and ISR path already exists")
.format(topicAndPartition) + " since Leader and isr path already exists with value " +
"%s and controller epoch %d".format(leaderIsrAndEpoch.leaderAndIsr.toString(), leaderIsrAndEpoch.controllerEpoch))
}
}
}
@ -266,22 +272,30 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
var newLeaderAndIsr: LeaderAndIsr = null
var replicasForThisPartition: Seq[Int] = Seq.empty[Int]
while(!zookeeperPathUpdateSucceeded) {
val currentLeaderAndIsr = getLeaderAndIsrOrThrowException(topic, partition)
val currentLeaderIsrAndEpoch = getLeaderIsrAndEpochOrThrowException(topic, partition)
val currentLeaderAndIsr = currentLeaderIsrAndEpoch.leaderAndIsr
val controllerEpoch = currentLeaderIsrAndEpoch.controllerEpoch
if(controllerEpoch > controller.epoch)
throw new StateChangeFailedException("Leader and isr path written by another controller. This probably" +
"means the current controller with epoch %d went through a soft failure and another ".format(controller.epoch) +
"controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
// elect new leader or throw exception
val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topic, partition, currentLeaderAndIsr)
val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), leaderAndIsr.toString, currentLeaderAndIsr.zkVersion)
ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
ZkUtils.leaderAndIsrZkData(leaderAndIsr, controller.epoch), currentLeaderAndIsr.zkVersion)
newLeaderAndIsr = leaderAndIsr
newLeaderAndIsr.zkVersion = newVersion
zookeeperPathUpdateSucceeded = updateSucceeded
replicasForThisPartition = replicas
}
val newLeaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch)
// update the leader cache
controllerContext.allLeaders.put(TopicAndPartition(topic, partition), newLeaderAndIsr)
controllerContext.allLeaders.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch)
info("Elected leader %d for Offline partition [%s, %d]".format(newLeaderAndIsr.leader, topic, partition))
// notify all replicas of the new leader
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition, newLeaderAndIsr,
controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition)).size)
// store new leader and isr info in cache
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition,
newLeaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition)).size)
} catch {
case poe: PartitionOfflineException => throw new PartitionOfflineException("All replicas for partition [%s, %d] are dead."
.format(topic, partition) + " Marking this partition offline", poe)
@ -299,9 +313,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
zkClient.subscribeChildChanges(ZkUtils.getTopicPath(topic), new PartitionChangeListener(topic))
}
private def getLeaderAndIsrOrThrowException(topic: String, partition: Int): LeaderAndIsr = {
ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) match {
case Some(currentLeaderAndIsr) => currentLeaderAndIsr
private def getLeaderIsrAndEpochOrThrowException(topic: String, partition: Int): LeaderIsrAndControllerEpoch = {
ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) match {
case Some(currentLeaderIsrAndEpoch) => currentLeaderIsrAndEpoch
case None =>
throw new StateChangeFailedException("Leader and ISR information doesn't exist for partition " +
"[%s, %d] in %s state".format(topic, partition, partitionState(TopicAndPartition(topic, partition))))

View File

@ -83,7 +83,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
try {
brokerRequestBatch.newBatch()
replicas.foreach(r => handleStateChange(r.topic, r.partition, r.replica, targetState))
brokerRequestBatch.sendRequestsToBrokers(controllerContext.liveBrokers)
brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.liveBrokers)
}catch {
case e => error("Error while moving some replicas to %s state".format(targetState), e)
}
@ -106,14 +106,14 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
case NewReplica =>
assertValidPreviousStates(topic, partition, replicaId, List(NonExistentReplica), targetState)
// start replica as a follower to the current leader for its partition
val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
leaderAndIsrOpt match {
case Some(leaderAndIsr) =>
if(leaderAndIsr.leader == replicaId)
val leaderIsrAndControllerEpochOpt = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition)
leaderIsrAndControllerEpochOpt match {
case Some(leaderIsrAndControllerEpoch) =>
if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
throw new StateChangeFailedException("Replica %d for partition [%s, %d] cannot be moved to NewReplica"
.format(replicaId, topic, partition) + "state as it is being requested to become leader")
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
topic, partition, leaderAndIsr, replicaAssignment.size)
topic, partition, leaderIsrAndControllerEpoch, replicaAssignment.size)
case None => // new leader request will be sent to this replica when one gets elected
}
replicaState.put((topic, partition, replicaId), NewReplica)
@ -137,13 +137,14 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition))
case _ =>
// if the leader for this replica exists and is alive, send the leader and ISR
controllerContext.allLeaders.get(topicAndPartition) match {
case Some(leaderAndIsr) =>
controllerContext.liveBrokerIds.contains(leaderAndIsr.leader) match {
// check if the leader for this partition is alive or even exists
controllerContext.allLeaders.get(topicAndPartition) match {
case Some(leaderIsrAndControllerEpoch) =>
controllerContext.liveBrokerIds.contains(leaderIsrAndControllerEpoch.leaderAndIsr.leader) match {
case true => // leader is alive
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
topic, partition, leaderAndIsr, replicaAssignment.size)
topic, partition, leaderIsrAndControllerEpoch,
replicaAssignment.size)
replicaState.put((topic, partition, replicaId), OnlineReplica)
info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition))
case false => // ignore partitions whose leader is not alive
@ -155,14 +156,15 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
case OfflineReplica =>
assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica), targetState)
// As an optimization, the controller removes dead replicas from the ISR
val leaderAndIsrIsEmpty = controllerContext.allLeaders.get(topicAndPartition) match {
case Some(currLeaderAndIsr) =>
if (currLeaderAndIsr.isr.contains(replicaId))
val leaderAndIsrIsEmpty: Boolean = controllerContext.allLeaders.get(topicAndPartition) match {
case Some(currLeaderIsrAndControllerEpoch) =>
if (currLeaderIsrAndControllerEpoch.leaderAndIsr.isr.contains(replicaId))
controller.removeReplicaFromIsr(topic, partition, replicaId) match {
case Some(updatedLeaderAndIsr) =>
case Some(updatedLeaderIsrAndControllerEpoch) =>
// send the shrunk ISR state change request only to the leader
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderAndIsr.leader),
topic, partition, updatedLeaderAndIsr, replicaAssignment.size)
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader),
topic, partition, updatedLeaderIsrAndControllerEpoch,
replicaAssignment.size)
replicaState.put((topic, partition, replicaId), OfflineReplica)
info("Replica %d for partition [%s, %d] state changed to OfflineReplica".format(replicaId, topic, partition))
info("Removed offline replica %d from ISR for partition [%s, %d]".format(replicaId, topic, partition))

View File

@ -24,7 +24,6 @@ import kafka.network._
import kafka.utils.{Pool, SystemTime, Logging}
import org.apache.log4j.Logger
import scala.collection._
import mutable.HashMap
import kafka.network.RequestChannel.Response
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic._
@ -127,8 +126,8 @@ class KafkaApis(val requestChannel: RequestChannel,
requestLogger.trace("Handling leader and ISR request " + leaderAndIsrRequest)
trace("Handling leader and ISR request " + leaderAndIsrRequest)
try {
val responseMap = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest)
val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.versionId, responseMap)
val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest)
val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.versionId, response, error)
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndIsrResponse)))
} catch {
case e: KafkaStorageException =>
@ -144,13 +143,8 @@ class KafkaApis(val requestChannel: RequestChannel,
requestLogger.trace("Handling stop replica request " + stopReplicaRequest)
trace("Handling stop replica request " + stopReplicaRequest)
val responseMap = new HashMap[(String, Int), Short]
for((topic, partitionId) <- stopReplicaRequest.partitions) {
val errorCode = replicaManager.stopReplica(topic, partitionId, stopReplicaRequest.deletePartitions)
responseMap.put((topic, partitionId), errorCode)
}
val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, responseMap)
val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)
val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, response.toMap, error)
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse)))
}
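
On the broker side, becomeLeaderOrFollower and stopReplicas now compare the epoch carried by the request against the highest controller epoch the broker has seen and answer StaleControllerEpochCode rather than act on a zombie controller's instructions. A minimal sketch of that gate follows; the error-code values mirror ErrorMapping in the diff, but the object itself is illustrative.

object StaleControllerGateSketch {
  val NoError: Short = 0
  val StaleControllerEpochCode: Short = 11   // value added to ErrorMapping by this patch

  @volatile private var controllerEpoch = 0  // KafkaController.InitialControllerEpoch - 1

  // run the request body only if the sending controller's epoch is current, otherwise reject it
  def handle(requestControllerEpoch: Int)(body: => Unit): Short = {
    if (requestControllerEpoch < controllerEpoch) {
      println("Received request from an old controller epoch %d, latest known epoch is %d"
        .format(requestControllerEpoch, controllerEpoch))
      StaleControllerEpochCode
    } else {
      controllerEpoch = requestControllerEpoch
      body
      NoError
    }
  }
}

A caller would wrap the body of becomeLeaderOrFollower or stopReplicas in handle and place the returned code in the response, roughly as the diff does with the (responseMap, errorCode) pair.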

View File

@ -18,6 +18,7 @@ package kafka.server
import kafka.cluster.{Broker, Partition, Replica}
import collection._
import mutable.HashMap
import org.I0Itec.zkclient.ZkClient
import java.util.concurrent.atomic.AtomicBoolean
import kafka.utils._
@ -26,7 +27,8 @@ import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
import java.util.concurrent.TimeUnit
import kafka.common._
import kafka.api.{PartitionStateInfo, LeaderAndIsrRequest}
import kafka.api.{StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest}
import kafka.controller.KafkaController
object ReplicaManager {
@ -38,6 +40,8 @@ class ReplicaManager(val config: KafkaConfig,
val zkClient: ZkClient,
kafkaScheduler: KafkaScheduler,
val logManager: LogManager) extends Logging with KafkaMetricsGroup {
/* epoch of the controller that last changed the leader */
@volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
private val allPartitions = new Pool[(String, Int), Partition]
private var leaderPartitions = new mutable.HashSet[Partition]()
private val leaderPartitionsLock = new Object
@ -110,6 +114,23 @@ class ReplicaManager(val config: KafkaConfig,
errorCode
}
def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[(String, Int), Short], Short) = {
val responseMap = new collection.mutable.HashMap[(String, Int), Short]
if(stopReplicaRequest.controllerEpoch < controllerEpoch) {
error("Received stop replica request from an old controller epoch %d.".format(stopReplicaRequest.controllerEpoch) +
" Latest known controller epoch is %d " + controllerEpoch)
(responseMap, ErrorMapping.StaleControllerEpochCode)
} else {
controllerEpoch = stopReplicaRequest.controllerEpoch
val responseMap = new HashMap[(String, Int), Short]
for((topic, partitionId) <- stopReplicaRequest.partitions){
val errorCode = stopReplica(topic, partitionId, stopReplicaRequest.deletePartitions)
responseMap.put((topic, partitionId), errorCode)
}
(responseMap, ErrorMapping.NoError)
}
}
def getOrCreatePartition(topic: String, partitionId: Int, replicationFactor: Int): Partition = {
var partition = allPartitions.get((topic, partitionId))
if (partition == null) {
@ -158,49 +179,42 @@ class ReplicaManager(val config: KafkaConfig,
}
}
def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): collection.Map[(String, Int), Short] = {
def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): (collection.Map[(String, Int), Short], Short) = {
info("Handling leader and isr request %s".format(leaderAndISRRequest))
val responseMap = new collection.mutable.HashMap[(String, Int), Short]
if(leaderAndISRRequest.controllerEpoch < controllerEpoch) {
error("Received leader and isr request from an old controller epoch %d.".format(leaderAndISRRequest.controllerEpoch) +
" Latest known controller epoch is %d " + controllerEpoch)
(responseMap, ErrorMapping.StaleControllerEpochCode)
} else {
controllerEpoch = leaderAndISRRequest.controllerEpoch
for((topicAndPartition, partitionStateInfo) <- leaderAndISRRequest.partitionStateInfos) {
var errorCode = ErrorMapping.NoError
val topic = topicAndPartition._1
val partitionId = topicAndPartition._2
for((topicAndPartition, partitionStateInfo) <- leaderAndISRRequest.partitionStateInfos){
var errorCode = ErrorMapping.NoError
val topic = topicAndPartition._1
val partitionId = topicAndPartition._2
val requestedLeaderId = partitionStateInfo.leaderAndIsr.leader
try {
if(requestedLeaderId == config.brokerId)
makeLeader(topic, partitionId, partitionStateInfo)
else
makeFollower(topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders)
} catch {
case e =>
error("Error processing leaderAndISR request %s".format(leaderAndISRRequest), e)
errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
val requestedLeaderId = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
try {
if(requestedLeaderId == config.brokerId)
makeLeader(topic, partitionId, partitionStateInfo)
else
makeFollower(topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders)
} catch {
case e =>
error("Error processing leaderAndISR request %s".format(leaderAndISRRequest), e)
errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
}
responseMap.put(topicAndPartition, errorCode)
}
responseMap.put(topicAndPartition, errorCode)
(responseMap, ErrorMapping.NoError)
}
/**
* If the IsInit flag is on, the controller wants to treat topics that are not in the request
* as deleted.
* TODO: Handle this properly as part of KAFKA-330
*/
// if(leaderAndISRRequest.isInit == LeaderAndIsrRequest.IsInit){
// startHighWaterMarksCheckPointThread
// val partitionsToRemove = allPartitions.filter(p => !leaderAndISRRequest.partitionStateInfos.contains(p._1)).map(entry => entry._1)
// info("Init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove))
// partitionsToRemove.foreach(p => stopReplica(p._1, p._2))
// }
responseMap
}
private def makeLeader(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo) = {
val leaderAndIsr = partitionStateInfo.leaderAndIsr
val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
info("Becoming Leader for topic [%s] partition [%d]".format(topic, partitionId))
val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
if (partition.makeLeader(topic, partitionId, leaderAndIsr)) {
if (partition.makeLeader(topic, partitionId, leaderIsrAndControllerEpoch)) {
// also add this partition to the list of partitions for which the leader is the current broker
leaderPartitionsLock synchronized {
leaderPartitions += partition
@ -209,14 +223,15 @@ class ReplicaManager(val config: KafkaConfig,
info("Completed the leader state transition for topic %s partition %d".format(topic, partitionId))
}
private def makeFollower(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo, liveBrokers: Set[Broker]) {
val leaderAndIsr = partitionStateInfo.leaderAndIsr
val leaderBrokerId: Int = leaderAndIsr.leader
private def makeFollower(topic: String, partitionId: Int, partitionStateInfo: PartitionStateInfo,
liveBrokers: Set[Broker]) {
val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
val leaderBrokerId: Int = leaderIsrAndControllerEpoch.leaderAndIsr.leader
info("Starting the follower state transition to follow leader %d for topic %s partition %d"
.format(leaderBrokerId, topic, partitionId))
val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
if (partition.makeFollower(topic, partitionId, leaderAndIsr, liveBrokers)) {
if (partition.makeFollower(topic, partitionId, leaderIsrAndControllerEpoch, liveBrokers)) {
// remove this replica's partition from the ISR expiration queue
leaderPartitionsLock synchronized {
leaderPartitions -= partition
@ -233,7 +248,7 @@ class ReplicaManager(val config: KafkaConfig,
def recordFollowerPosition(topic: String, partitionId: Int, replicaId: Int, offset: Long) = {
val partitionOpt = getPartition(topic, partitionId)
if(partitionOpt.isDefined){
if(partitionOpt.isDefined) {
partitionOpt.get.updateLeaderHWAndMaybeExpandIsr(replicaId, offset)
} else {
warn("While recording the follower position, the partition [%s, %d] hasn't been created, skip updating leader HW".format(topic, partitionId))

View File

@ -44,8 +44,6 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
}
}
def amILeader : Boolean = leaderId == brokerId
def elect: Boolean = {
controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
try {
@ -56,10 +54,14 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
} catch {
case e: ZkNodeExistsException =>
// If someone else has written the path, then read the leader id recorded there
debug("Someone else was elected as leader other than " + brokerId)
val data: String = controllerContext.zkClient.readData(electionPath, true)
if (data != null) leaderId = data.toInt
case e2 => throw e2
debug("Broker %d was elected as leader instead of broker %d".format(data.toInt, brokerId))
if (data != null) {
leaderId = data.toInt
}
case e2 =>
error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
resign()
}
amILeader
}
@ -68,6 +70,12 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
leaderId = -1
}
def amILeader : Boolean = leaderId == brokerId
def resign() = {
deletePath(controllerContext.zkClient, electionPath)
}
/**
* We do not have a session expiration listener in the ZkElection, but assume that the caller using this module will
* have its own session expiration listener and handler
@ -79,6 +87,10 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
*/
@throws(classOf[Exception])
def handleDataChange(dataPath: String, data: Object) {
controllerContext.controllerLock synchronized {
leaderId = data.toString.toInt
info("New leader is %d".format(leaderId))
}
}
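As noted above, the elector deliberately carries no session expiration handling of its own; the caller is expected to register a state listener and re-run the election once a new ZooKeeper session is established, since any ephemeral election node created by the expired session is gone. A hedged sketch of such a caller-side listener, assuming the zkclient IZkStateListener interface and an existing elector instance (the class and its wiring are illustrative, not part of this patch):

import org.I0Itec.zkclient.IZkStateListener
import org.apache.zookeeper.Watcher.Event.KeeperState

class SessionExpirationListener(elector: ZookeeperLeaderElector) extends IZkStateListener {
  def handleStateChanged(state: KeeperState) {
    // no-op: ordinary connection state changes do not invalidate the ephemeral node
  }
  def handleNewSession() {
    // a new session means the previous ephemeral node is gone, so contend for leadership again
    elector.elect
  }
}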
/**

View File

@ -24,17 +24,19 @@ import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException,
import org.I0Itec.zkclient.serialize.ZkSerializer
import scala.collection._
import kafka.api.LeaderAndIsr
import mutable.HashMap
import org.apache.zookeeper.data.Stat
import java.util.concurrent.locks.{ReentrantLock, Condition}
import kafka.controller.{PartitionAndReplica, ReassignedPartitionsContext}
import kafka.admin._
import kafka.common.{TopicAndPartition, KafkaException, NoEpochForPartitionException}
import kafka.controller.{LeaderIsrAndControllerEpoch, PartitionAndReplica, ReassignedPartitionsContext}
object ZkUtils extends Logging {
val ConsumersPath = "/consumers"
val BrokerIdsPath = "/brokers/ids"
val BrokerTopicsPath = "/brokers/topics"
val ControllerPath = "/controller"
val ControllerEpochPath = "/controllerEpoch"
val ReassignPartitionsPath = "/admin/reassign_partitions"
val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
@ -74,7 +76,7 @@ object ZkUtils extends Logging {
brokerIds.map(_.toInt).map(getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
}
def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndIsr] = {
def getLeaderIsrAndEpochForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderIsrAndControllerEpoch] = {
val leaderAndIsrPath = getTopicPartitionLeaderAndIsrPath(topic, partition)
val leaderAndIsrInfo = readDataMaybeNull(zkClient, leaderAndIsrPath)
val leaderAndIsrOpt = leaderAndIsrInfo._1
@ -85,17 +87,23 @@ object ZkUtils extends Logging {
}
}
def parseLeaderAndIsr(leaderAndIsrStr: String, topic: String, partition: Int, stat: Stat): Option[LeaderAndIsr] = {
def getLeaderAndIsrForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndIsr] = {
getLeaderIsrAndEpochForPartition(zkClient, topic, partition).map(_.leaderAndIsr)
}
def parseLeaderAndIsr(leaderAndIsrStr: String, topic: String, partition: Int, stat: Stat)
: Option[LeaderIsrAndControllerEpoch] = {
Json.parseFull(leaderAndIsrStr) match {
case Some(m) =>
val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt
val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
val isrString = m.asInstanceOf[Map[String, String]].get("ISR").get
val controllerEpoch = m.asInstanceOf[Map[String, String]].get("controllerEpoch").get.toInt
val isr = Utils.parseCsvList(isrString).map(r => r.toInt)
val zkPathVersion = stat.getVersion
debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for topic %s and partition %d".format(leader, epoch,
isr.toString(), zkPathVersion, topic, partition))
Some(LeaderAndIsr(leader, epoch, isr.toList, zkPathVersion))
Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr.toList, zkPathVersion), controllerEpoch))
case None => None
}
}
@ -189,6 +197,15 @@ object ZkUtils extends Logging {
topicDirs.consumerOwnerDir + "/" + partition
}
def leaderAndIsrZkData(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = {
val jsonDataMap = new HashMap[String, String]
jsonDataMap.put("leader", leaderAndIsr.leader.toString)
jsonDataMap.put("leaderEpoch", leaderAndIsr.leaderEpoch.toString)
jsonDataMap.put("ISR", leaderAndIsr.isr.mkString(","))
jsonDataMap.put("controllerEpoch", controllerEpoch.toString)
Utils.stringMapToJson(jsonDataMap)
}
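For illustration, the znode payload produced by leaderAndIsrZkData now carries the controller epoch alongside the leader, leader epoch and ISR. A small sketch with hypothetical values; the output is shown only approximately, since the exact formatting comes from Utils.stringMapToJson:

val data = ZkUtils.leaderAndIsrZkData(LeaderAndIsr(1, 2, List(1, 2), 0), controllerEpoch = 3)
// data is roughly: { "leader":"1", "leaderEpoch":"2", "ISR":"1,2", "controllerEpoch":"3" }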
/**
* make sure a persistent path exists in ZK. Create the path if not exist.
*/
@ -313,6 +330,25 @@ object ZkUtils extends Logging {
}
}
/**
* Conditionally update the persistent path data and return (true, newVersion) if it succeeds; otherwise (e.g. when the
* current version is not the expected version) return (false, -1). If the path doesn't exist, a ZkNoNodeException is thrown
*/
def conditionalUpdatePersistentPathIfExists(client: ZkClient, path: String, data: String, expectVersion: Int): (Boolean, Int) = {
try {
val stat = client.writeData(path, data, expectVersion)
info("Conditional update of zkPath %s with value %s and expected version %d succeeded, returning the new version: %d"
.format(path, data, expectVersion, stat.getVersion))
(true, stat.getVersion)
} catch {
case nne: ZkNoNodeException => throw nne
case e: Exception =>
error("Conditional update of zkPath %s with data %s and expected version %d failed".format(path, data,
expectVersion), e)
(false, -1)
}
}
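A hedged usage sketch of the conditional update: the returned version is what a caller would feed into its next conditional write, and a (false, -1) result signals that the znode changed underneath it. The variable names below are illustrative only, and the info call assumes the usual Logging mixin:

val (updated, newZkVersion) =
  ZkUtils.conditionalUpdatePersistentPathIfExists(zkClient, path, newData, leaderAndIsr.zkVersion)
if (updated)
  leaderAndIsr.zkVersion = newZkVersion // carry the new version into the next conditional write
else
  info("Conditional write to %s lost the race; re-read the znode before retrying".format(path))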
/**
* Update the value of a persistent node with the given path and data.
* Create the parent directory if necessary. Never throw NodeExistsException.
@ -439,13 +475,13 @@ object ZkUtils extends Logging {
}
def getPartitionLeaderAndIsrForTopics(zkClient: ZkClient, topics: Seq[String]):
mutable.Map[TopicAndPartition, LeaderAndIsr] = {
val ret = new mutable.HashMap[TopicAndPartition, LeaderAndIsr]
mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
val ret = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
val partitionsForTopics = getPartitionsForTopics(zkClient, topics)
for((topic, partitions) <- partitionsForTopics) {
for(partition <- partitions) {
ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition.toInt) match {
case Some(leaderAndIsr) => ret.put(TopicAndPartition(topic, partition.toInt), leaderAndIsr)
ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition.toInt) match {
case Some(leaderIsrAndControllerEpoch) => ret.put(TopicAndPartition(topic, partition.toInt), leaderIsrAndControllerEpoch)
case None =>
}
}

View File

@ -159,7 +159,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
// create the topic
AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
// create leaders for all partitions
TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap)
TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
val actualReplicaAssignment = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient).partitionsMetadata.map(p => p.replicas)
val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList
assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
@ -189,7 +189,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3))
AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
// create leaders for all partitions
TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap)
TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
val newTopicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
newTopicMetadata.errorCode match {

View File

@ -25,6 +25,7 @@ import kafka.message.{Message, ByteBufferMessageSet}
import kafka.cluster.Broker
import collection.mutable._
import kafka.common.{TopicAndPartition, ErrorMapping}
import kafka.controller.LeaderIsrAndControllerEpoch
object SerializationTestUtils{
@ -83,11 +84,11 @@ object SerializationTestUtils{
private val topicmetaData2 = new TopicMetadata(topic2, partitionMetaDataSeq)
def createTestLeaderAndIsrRequest() : LeaderAndIsrRequest = {
val leaderAndIsr1 = new LeaderAndIsr(leader1, 1, isr1, 1)
val leaderAndIsr2 = new LeaderAndIsr(leader2, 1, isr2, 2)
val leaderAndIsr1 = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader1, 1, isr1, 1), 1)
val leaderAndIsr2 = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader2, 1, isr2, 2), 1)
val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, 3)),
((topic2, 0), PartitionStateInfo(leaderAndIsr2, 3)))
new LeaderAndIsrRequest(map, collection.immutable.Set[Broker]())
new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[Broker](), 1)
}
def createTestLeaderAndIsrResponse() : LeaderAndIsrResponse = {
@ -97,13 +98,13 @@ object SerializationTestUtils{
}
def createTestStopReplicaRequest() : StopReplicaRequest = {
new StopReplicaRequest(deletePartitions = true, partitions = collection.immutable.Set((topic1, 0), (topic2, 0)))
new StopReplicaRequest(controllerEpoch = 1, deletePartitions = true, partitions = collection.immutable.Set((topic1, 0), (topic2, 0)))
}
def createTestStopReplicaResponse() : StopReplicaResponse = {
val responseMap = Map(((topic1, 0), ErrorMapping.NoError),
((topic2, 0), ErrorMapping.NoError))
new StopReplicaResponse(1, responseMap)
new StopReplicaResponse(1, responseMap.toMap)
}
def createTestProducerRequest: ProducerRequest = {

View File

@ -69,7 +69,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
val leaderForPartitionMap = Map(
0 -> configs.head.brokerId
)
TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap)
TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap, 1)
val topicMetadata = mockLogManagerAndTestTopic(topic)
assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)

View File

@ -23,6 +23,10 @@ import kafka.admin.CreateTopicCommand
import kafka.utils.TestUtils._
import junit.framework.Assert._
import kafka.utils.{ZkUtils, Utils, TestUtils}
import kafka.controller.{LeaderIsrAndControllerEpoch, ControllerChannelManager}
import kafka.cluster.Broker
import kafka.common.ErrorMapping
import kafka.api._
class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
val brokerId1 = 0
@ -35,6 +39,8 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
var staleControllerEpochDetected = false
override def setUp() {
super.setUp()
// start both servers
@ -95,4 +101,50 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
else
assertEquals("Second epoch value should be %d".format(leaderEpoch2+1) , leaderEpoch2+1, leaderEpoch3)
}
def testLeaderElectionWithStaleControllerEpoch() {
// start 2 brokers
val topic = "new-topic"
val partitionId = 0
// create topic with 1 partition, 2 replicas, one on each broker
CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
// wait until leader is elected
val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
val leaderEpoch1 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId)
debug("leader Epoc: " + leaderEpoch1)
debug("Leader is elected to be: %s".format(leader1.getOrElse(-1)))
assertTrue("Leader should get elected", leader1.isDefined)
// NOTE: this is to avoid transient test failures
assertTrue("Leader could be broker 0 or broker 1", (leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 1))
assertEquals("First epoch value should be 0", 0, leaderEpoch1)
// start another controller
val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(2, TestUtils.choosePort()))
val brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, "localhost", s.config.port))
val controllerChannelManager = new ControllerChannelManager(brokers.toSet, controllerConfig)
controllerChannelManager.startup()
val staleControllerEpoch = 0
val leaderAndIsr = new collection.mutable.HashMap[(String, Int), LeaderIsrAndControllerEpoch]
leaderAndIsr.put((topic, partitionId),
new LeaderIsrAndControllerEpoch(new LeaderAndIsr(brokerId2, List(brokerId1, brokerId2)), 2))
val partitionStateInfo = leaderAndIsr.mapValues(l => new PartitionStateInfo(l, 1)).toMap
val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokers.toSet, staleControllerEpoch)
controllerChannelManager.sendRequest(brokerId2, leaderAndIsrRequest, staleControllerEpochCallback)
TestUtils.waitUntilTrue(() => staleControllerEpochDetected == true, 1000)
assertTrue("Stale controller epoch not detected by the broker", staleControllerEpochDetected)
controllerChannelManager.shutdown()
}
private def staleControllerEpochCallback(response: RequestOrResponse): Unit = {
val leaderAndIsrResponse = response.asInstanceOf[LeaderAndIsrResponse]
staleControllerEpochDetected = leaderAndIsrResponse.errorCode match {
case ErrorMapping.StaleControllerEpochCode => true
case _ => false
}
}
}

View File

@ -372,7 +372,9 @@ object TestUtils extends Logging {
new kafka.api.ProducerRequest(correlationId, clientId, acks.toShort, ackTimeoutMs, Map(data:_*))
}
def makeLeaderForPartition(zkClient: ZkClient, topic: String, leaderPerPartitionMap: scala.collection.immutable.Map[Int, Int]) {
def makeLeaderForPartition(zkClient: ZkClient, topic: String,
leaderPerPartitionMap: scala.collection.immutable.Map[Int, Int],
controllerEpoch: Int) {
leaderPerPartitionMap.foreach
{
leaderForPartition => {
@ -390,7 +392,7 @@ object TestUtils extends Logging {
newLeaderAndIsr.zkVersion += 1
}
ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
newLeaderAndIsr.toString)
ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch))
} catch {
case oe => error("Error while electing leader for topic %s partition %d".format(topic, partition), oe)
}

View File

@ -13,7 +13,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
*/
package kafka.zk
@ -35,7 +35,7 @@ class ZKEphemeralTest extends JUnit3Suite with ZooKeeperTestHarness {
try {
ZkUtils.createEphemeralPathExpectConflict(zkClient, "/tmp/zktest", "node created")
} catch {
case e: Exception => println("Exception in creating ephemeral node")
case e: Exception =>
}
var testData: String = null