KAFKA-301 Implement broker startup procedure; patched by Neha Narkhede; reviewed by Jun Rao and Jay Kreps

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1329509 13f79535-47bb-0310-9956-ffa450edef68
Neha Narkhede 2012-04-23 23:32:59 +00:00
parent 79a3b31f26
commit d73355017a
15 changed files with 655 additions and 46 deletions

bin/run-rat.sh Normal file → Executable file

@@ -27,6 +27,7 @@ import collection.mutable.HashMap

object AdminUtils extends Logging {
  val rand = new Random
+  val AdminEpoch = -1

  /**
   * There are 2 goals of replica assignment:

@@ -69,7 +70,6 @@ object AdminUtils extends Logging {
        replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, secondReplicaShift, j, brokerList.size))
      ret(i) = replicaList.reverse
    }
    ret
  }

@@ -102,14 +102,14 @@ object AdminUtils extends Logging {
    for (i <-0 until partitionMetadata.size) {
      val replicas = ZkUtils.readData(zkClient, ZkUtils.getTopicPartitionReplicasPath(topic, partitions(i).toString))
-      val inSyncReplicas = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getTopicPartitionInSyncPath(topic, partitions(i).toString))
+      val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitions(i))
      val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitions(i))
      debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
      partitionMetadata(i) = new PartitionMetadata(partitions(i),
        leader match { case None => None case Some(l) => Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l.toInt)).head) },
        getBrokerInfoFromCache(zkClient, cachedBrokerInfo, Utils.getCSVList(replicas).map(id => id.toInt)),
-        getBrokerInfoFromCache(zkClient, cachedBrokerInfo, Utils.getCSVList(inSyncReplicas).map(id => id.toInt)),
+        getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas),
        None /* Return log segment metadata when getOffsetsBefore will be replaced with this API */)
    }
    Some(new TopicMetadata(topic, partitionMetadata))

@@ -117,7 +117,6 @@ object AdminUtils extends Logging {
        None
      }
    }
    metadataList.toList
  }


@@ -0,0 +1,25 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.common
/**
* Thrown when a get-epoch request is made for a partition, but no epoch exists for that partition
*/
class NoEpochForPartitionException(message: String) extends RuntimeException(message) {
def this() = this(null)
}


@@ -1,3 +1,5 @@
+package kafka.common
+
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with

@@ -15,8 +17,6 @@
 * limitations under the License.
 */
-package kafka.producer.async
-
/* Indicates the queue for sending messages is full of unsent messages */
class QueueFullException(message: String) extends RuntimeException(message) {
  def this() = this(null)


@@ -18,11 +18,11 @@ package kafka.producer

import async._
import kafka.utils._
-import kafka.common.InvalidConfigException
import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
import kafka.serializer.Encoder
import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean}
import org.I0Itec.zkclient.ZkClient
+import kafka.common.{QueueFullException, InvalidConfigException}

class Producer[K,V](config: ProducerConfig,
                    private val eventHandler: EventHandler[K,V]) // for testing only

@@ -120,6 +120,7 @@ extends Logging {
  def close() = {
    val canShutdown = hasShutdown.compareAndSet(false, true)
    if(canShutdown) {
+      info("Shutting down producer")
      if (producerSendThread != null)
        producerSendThread.shutdown
      eventHandler.close


@@ -105,4 +105,7 @@ class KafkaConfig(props: Properties) extends ZKConfig(props) {
   * leader election on all replicas minus the preferred replica */
  val preferredReplicaWaitTime = Utils.getLong(props, "preferred.replica.wait.time", 300)
+
+  /* size of the state change request queue in Zookeeper */
+  val stateChangeQSize = Utils.getInt(props, "state.change.queue.size", 1000)
}

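The new stateChangeQSize setting bounds the per-broker state change queue in ZooKeeper and defaults to 1000. A rough sketch of overriding it (not part of the patch; it reuses TestUtils.createBrokerConfig to supply the other required broker properties, and 500 is an arbitrary value):

import kafka.server.KafkaConfig
import kafka.utils.TestUtils

// Sketch only: createBrokerConfig fills in the remaining required broker properties.
val props = TestUtils.createBrokerConfig(0, TestUtils.choosePort())
props.put("state.change.queue.size", "500")   // default is 1000 when unset

val config = new KafkaConfig(props)
assert(config.stateChangeQSize == 500)
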

@@ -23,6 +23,8 @@ import java.net.InetAddress
import kafka.common.{InvalidPartitionException, KafkaZookeeperClient}
import kafka.cluster.Replica
import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener, IZkStateListener, ZkClient}
+import kafka.admin.AdminUtils
+import java.lang.{Thread, IllegalStateException}

/**
 * Handles the server's interaction with zookeeper. The server needs to register the following paths:

@@ -36,11 +38,10 @@ class KafkaZooKeeper(config: KafkaConfig,
  val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + config.brokerId
  private var zkClient: ZkClient = null
-  var topics: List[String] = Nil
-  val lock = new Object()
-  var existingTopics: Set[String] = Set.empty[String]
-  val leaderChangeListener = new LeaderChangeListener
-  val topicPartitionsChangeListener = new TopicChangeListener
+  private val leaderChangeListener = new LeaderChangeListener
+  private val topicPartitionsChangeListener = new TopicChangeListener
+  private var stateChangeHandler: StateChangeCommandHandler = null

  private val topicListenerLock = new Object
  private val leaderChangeLock = new Object

@@ -48,6 +49,7 @@ class KafkaZooKeeper(config: KafkaConfig,
    /* start client */
    info("connecting to ZK: " + config.zkConnect)
    zkClient = KafkaZookeeperClient.getZookeeperClient(config)
+    startStateChangeCommandHandler()
    zkClient.subscribeStateChanges(new SessionExpireListener)
    registerBrokerInZk()
    subscribeToTopicAndPartitionsChanges(true)

@@ -60,6 +62,13 @@ class KafkaZooKeeper(config: KafkaConfig,
    ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, creatorId, config.port)
  }

+  private def startStateChangeCommandHandler() {
+    val stateChangeQ = new ZkQueue(zkClient, ZkUtils.getBrokerStateChangePath(config.brokerId), config.stateChangeQSize)
+    stateChangeHandler = new StateChangeCommandHandler("StateChangeCommandHandler", config, stateChangeQ,
+      ensureStateChangeCommandValidityOnThisBroker, ensureEpochValidity)
+    stateChangeHandler.start()
+  }
+
  /**
   * When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a
   * connection for us. We need to re-register this broker in the broker registry.

@@ -93,6 +102,7 @@ class KafkaZooKeeper(config: KafkaConfig,
  def close() {
    if (zkClient != null) {
+      stateChangeHandler.shutdown()
      info("Closing zookeeper client...")
      zkClient.close()
    }

@@ -184,7 +194,6 @@ class KafkaZooKeeper(config: KafkaConfig,
      case Some(leader) => info("Topic %s partition %d has leader %d".format(replica.topic, replica.partition.partId, leader))
      case None => // leader election
        leaderElection(replica)
    }
  }

@@ -201,9 +210,12 @@ class KafkaZooKeeper(config: KafkaConfig,
    }catch {
      case e => // ignoring
    }
-    if(ZkUtils.tryToBecomeLeaderForPartition(zkClient, replica.topic, replica.partition.partId, replica.brokerId)) {
-      info("Broker %d is leader for topic %s partition %d".format(replica.brokerId, replica.topic, replica.partition.partId))
-      // TODO: Become leader as part of KAFKA-302
+    val newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, replica.topic, replica.partition.partId, replica.brokerId)
+    newLeaderEpoch match {
+      case Some(epoch) =>
+        info("Broker %d is leader for topic %s partition %d".format(replica.brokerId, replica.topic, replica.partition.partId))
+        // TODO: Become leader as part of KAFKA-302
+      case None =>
    }
  }
}

@@ -233,6 +245,26 @@ class KafkaZooKeeper(config: KafkaConfig,
    }
  }

+  private def ensureStateChangeCommandValidityOnThisBroker(stateChangeCommand: StateChangeCommand): Boolean = {
+    // check if this broker hosts a replica for this topic and partition
+    ZkUtils.isPartitionOnBroker(zkClient, stateChangeCommand.topic, stateChangeCommand.partition, config.brokerId)
+  }
+
+  private def ensureEpochValidity(stateChangeCommand: StateChangeCommand): Boolean = {
+    // get the topic and partition that this request is meant for
+    val topic = stateChangeCommand.topic
+    val partition = stateChangeCommand.partition
+    val epoch = stateChangeCommand.epoch
+    val currentLeaderEpoch = ZkUtils.getEpochForPartition(zkClient, topic, partition)
+    // check if the request's epoch matches the current leader's epoch OR the admin command's epoch
+    val validEpoch = (currentLeaderEpoch == epoch) || (epoch == AdminUtils.AdminEpoch)
+    if(epoch > currentLeaderEpoch)
+      throw new IllegalStateException(("Illegal epoch state. Request's epoch %d larger than registered epoch %d for " +
+        "topic %s partition %d").format(epoch, currentLeaderEpoch, topic, partition))
+    validEpoch
+  }
+
  class LeaderChangeListener extends IZkDataListener with Logging {

    @throws(classOf[Exception])

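Taken together, the two hooks above gate every queued state change: ensureStateChangeCommandValidityOnThisBroker checks that this broker actually hosts a replica of the partition, and ensureEpochValidity accepts a command only when its epoch equals the current leader epoch recorded in ZooKeeper, or equals AdminUtils.AdminEpoch (-1) for admin-issued commands. A standalone sketch of that epoch rule with hypothetical values (not part of the patch):

// Mirrors the ensureEpochValidity logic above; AdminEpoch mirrors AdminUtils.AdminEpoch.
val AdminEpoch = -1

def isEpochValid(requestEpoch: Int, currentLeaderEpoch: Int): Boolean = {
  if (requestEpoch > currentLeaderEpoch)
    throw new IllegalStateException("Request epoch %d is ahead of registered epoch %d".format(requestEpoch, currentLeaderEpoch))
  requestEpoch == currentLeaderEpoch || requestEpoch == AdminEpoch
}

// With a current leader epoch of 3:
//   isEpochValid(3, 3)   -> true   (command from the current leader)
//   isEpochValid(-1, 3)  -> true   (admin command, epoch check bypassed)
//   isEpochValid(2, 3)   -> false  (stale command from an older leader is ignored)
//   isEpochValid(4, 3)   -> IllegalStateException (should be impossible)
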

@@ -0,0 +1,92 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import util.parsing.json.JSON
import java.lang.IllegalStateException
import kafka.utils.{Utils, Logging}
import collection.mutable.HashMap
object StateChangeCommand extends Logging {
val State = "state"
val Topic = "topic"
val Partition = "partition"
val Epoch = "epoch"
val StartReplica = "start-replica"
val CloseReplica = "close-replica"
def getStateChangeRequest(requestJson: String): StateChangeCommand = {
var topMap : Map[String, String] = null
try {
JSON.parseFull(requestJson) match {
case Some(m) =>
topMap = m.asInstanceOf[Map[String, String]]
val topic = topMap.get(StateChangeCommand.Topic).getOrElse(null)
val partition = topMap.get(StateChangeCommand.Partition).getOrElse("-1").toInt
val epoch = topMap.get(StateChangeCommand.Epoch).getOrElse("-1").toInt
val requestOpt = topMap.get(StateChangeCommand.State)
requestOpt match {
case Some(request) =>
request match {
case StartReplica => new StartReplica(topic, partition, epoch)
case CloseReplica => new CloseReplica(topic, partition, epoch)
case _ => throw new IllegalStateException("Unknown state change request " + request)
}
case None =>
throw new IllegalStateException("Illegal state change request JSON " + requestJson)
}
case None => throw new RuntimeException("Error parsing state change request : " + requestJson)
}
} catch {
case e =>
error("Error parsing state change request JSON " + requestJson, e)
throw e
}
}
}
sealed trait StateChangeCommand extends Logging {
def state: String
def topic: String
def partition: Int
def epoch: Int
def toJson(): String = {
val jsonMap = new HashMap[String, String]
jsonMap.put(StateChangeCommand.State, state)
jsonMap.put(StateChangeCommand.Topic, topic)
jsonMap.put(StateChangeCommand.Partition, partition.toString)
jsonMap.put(StateChangeCommand.Epoch, epoch.toString)
Utils.stringMapToJsonString(jsonMap)
}
}
/* The elected leader sends the start replica state change request to all the new replicas that have been assigned
* a partition. Note that the followers must act on this request only if the request epoch == latest partition epoch or -1 */
case class StartReplica(val topic: String, partition: Int, epoch: Int) extends StateChangeCommand {
val state: String = StateChangeCommand.StartReplica
}
/* The elected leader sends the close replica state change request to all the replicas that have been un-assigned a partition
* OR if a topic has been deleted. Note that the followers must act on this request even if the epoch has changed */
case class CloseReplica(topic: String, partition: Int, epoch: Int) extends StateChangeCommand {
val state: String = StateChangeCommand.CloseReplica
}

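The commands above serialize to a flat JSON object keyed by state, topic, partition and epoch and parse back into the same case class. A rough round-trip sketch (illustrative values; key order and spacing come from Utils.stringMapToJsonString and may differ):

import kafka.server.{StateChangeCommand, StartReplica}

val cmd = new StartReplica("test-topic", 0, 2)
val json = cmd.toJson()
// e.g. { "state": "start-replica","topic": "test-topic","partition": "0","epoch": "2" }

val parsed = StateChangeCommand.getStateChangeRequest(json)
assert(parsed == cmd)   // case class equality, as exercised by StateChangeTest
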

@@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import kafka.utils.{ZkQueue, Logging}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.CountDownLatch
class StateChangeCommandHandler(name: String, config: KafkaConfig, stateChangeQ: ZkQueue,
ensureStateChangeCommandValidityOnThisBroker: (StateChangeCommand) => Boolean,
ensureEpochValidity: (StateChangeCommand) => Boolean) extends Thread(name) with Logging {
val isRunning: AtomicBoolean = new AtomicBoolean(true)
private val shutdownLatch = new CountDownLatch(1)
override def run() {
try {
while(isRunning.get()) {
// get outstanding state change requests for this broker
val command = stateChangeQ.take()
val stateChangeCommand = StateChangeCommand.getStateChangeRequest(command._2)
ensureStateChangeCommandValidityOnThisBroker(stateChangeCommand)
stateChangeCommand match {
case StartReplica(topic, partition, epoch) =>
if(ensureEpochValidity(stateChangeCommand))
handleStartReplica(topic, partition)
case CloseReplica(topic, partition, epoch) =>
/**
* close replica requests are sent as part of delete topic or partition reassignment process
* To ensure that a topic will be deleted even if the broker is offline, this state change should not
* be protected with the epoch validity check
*/
handleCloseReplica(topic, partition)
}
stateChangeQ.remove(command)
}
}catch {
case e: InterruptedException => info("State change command handler interrupted. Shutting down")
case e1 => error("Error in state change command handler. Shutting down due to ", e1)
}
shutdownComplete()
}
private def shutdownComplete() = shutdownLatch.countDown
def shutdown() {
isRunning.set(false)
interrupt()
shutdownLatch.await()
info("State change command handler shutdown completed")
}
def handleStartReplica(topic: String, partition: Int) {
info("Received start replica state change command for topic %s partition %d on broker %d"
.format(topic, partition, config.brokerId))
// TODO: implement this as part of create topic support or partition reassignment support. Until then, it is unused
}
def handleCloseReplica(topic: String, partition: Int) {
info("Received close replica state change command for topic %s partition %d on broker %d"
.format(topic, partition, config.brokerId))
// TODO: implement this as part of delete topic support. Until then, it is unused
}
}


@@ -700,6 +700,21 @@ object Utils extends Logging {
      case _ => // swallow
    }
  }

+  def stringMapToJsonString(jsonDataMap: Map[String, String]): String = {
+    val builder = new StringBuilder
+    builder.append("{ ")
+    var numElements = 0
+    for ( (key, value) <- jsonDataMap) {
+      if (numElements > 0)
+        builder.append(",")
+      builder.append("\"" + key + "\": ")
+      builder.append("\"" + value + "\"")
+      numElements += 1
+    }
+    builder.append(" }")
+    builder.toString
+  }
}

class SnapshotStats(private val monitorDurationNs: Long = 600L * 1000L * 1000L * 1000L) {

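For reference, the helper above only handles flat string-to-string maps and does no escaping; a quick sketch of its output (map iteration order is not guaranteed, so keys may appear in any order):

import kafka.utils.Utils
import scala.collection.mutable.HashMap

val m = new HashMap[String, String]
m.put("state", "start-replica")
m.put("epoch", "1")
Utils.stringMapToJsonString(m)   // e.g. { "state": "start-replica","epoch": "1" }
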

@@ -0,0 +1,127 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils
import kafka.utils.ZkUtils._
import kafka.common.QueueFullException
import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
import java.util.concurrent.PriorityBlockingQueue
import java.util.Comparator
class ZkQueue(zkClient: ZkClient, path: String, size: Int) {
// create the queue in ZK, if one does not exist
makeSurePersistentPathExists(zkClient, path)
val queueItems = new PriorityBlockingQueue[String](size, new ZkQueueComparator)
var latestQueueItemPriority: Int = -1
zkClient.subscribeChildChanges(path, new ZkQueueListener)
// TODO: This API will be used by the leader to enqueue state change requests to the followers
/**
* Inserts the specified element into this priority queue. This method will never block. If the queue is full,
* it will throw QueueFullException
* @param item Item to add to the zookeeper queue
* @returns The zookeeper location of item in the queue
*/
def put(item: String): String = {
// if queue is full, throw QueueFullException
if(isFull)
throw new QueueFullException("Queue is full. Item %s will be rejected".format(item))
val queueLocation = createSequentialPersistentPath(zkClient, path + "/", item)
debug("Added item %s to queue at location %s".format(item, queueLocation))
queueLocation
}
/**
* Reads all the items and their queue locations in this queue
* @returns A list of (queue_location, item) pairs
*/
def readAll(): Seq[(String, String)] = {
val allItems = getChildren(zkClient, path).sorted
allItems.size match {
case 0 => Seq.empty[(String, String)]
case _ => allItems.map { item =>
// read the data and delete the node
val queueLocation = path + "/" + item
val data = ZkUtils.readData(zkClient, queueLocation)
(item, data)
}
}
}
/**
* Returns true if this zookeeper queue contains no elements.
*/
def isEmpty: Boolean = (readAll().size == 0)
// TODO: Implement the queue shrink operation if the queue is full, as part of create/delete topic
/**
* Returns true if this zookeeper queue contains number of items equal to the size of the queue
*/
def isFull: Boolean = (readAll().size == size)
/**
* Retrieves but does not remove the head of this queue, waiting if necessary until an element becomes available.
* @returns The location of the head and the head element in the zookeeper queue
*/
def take(): (String, String) = {
// take the element key
val item = queueItems.take()
val queueLocation = path + "/" + item
val data = ZkUtils.readData(zkClient, queueLocation)
(item, data)
}
/**
* Removes a single instance of the specified element from this queue, if it is present. More formally, removes an
* element e such that o.equals(e), if this queue contains one or more such elements. Returns true if this queue
* contained the specified element (or equivalently, if this queue changed as a result of the call).
* @param queueItem A tuple where the first element is the location of the item as returned by the take() API and the
* second element is the queue item to be removed
*/
def remove(queueItem: (String, String)): Boolean = {
val queueLocation = path + "/" + queueItem._1
// we do not want to remove items from the queue if they were not read
assert(!queueItems.contains(queueItem._1), "Attempt to remove unconsumed item %s from the queue".format(queueItem))
ZkUtils.deletePath(zkClient, queueLocation)
}
class ZkQueueListener extends IZkChildListener with Logging {
@throws(classOf[Exception])
def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
debug("ZkQueue listener fired for queue %s with children %s and latest queue item priority %d"
.format(path, curChilds.toString, latestQueueItemPriority))
import scala.collection.JavaConversions._
val outstandingRequests = asBuffer(curChilds).sortWith((req1, req2) => req1.toInt < req2.toInt)
outstandingRequests.foreach { req =>
val queueItemPriority = req.toInt
if(queueItemPriority > latestQueueItemPriority) {
latestQueueItemPriority = queueItemPriority
queueItems.add(req)
debug("Added item %s to queue %s".format(req, path))
}
}
}
}
class ZkQueueComparator extends Comparator[String] {
def compare(element1: String, element2: String): Int = {
element1.toInt - element2.toInt
}
}
}

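ZkQueue is a small queue of persistent sequential znodes: put appends a child under the queue path (or throws QueueFullException once size items are outstanding), the child listener feeds new sequence numbers into an in-memory priority queue, take blocks for the next unconsumed item and returns its (node name, data) pair, and remove deletes the consumed znode. A rough usage sketch, assuming `config` is a KafkaConfig and ZooKeeper is reachable; this mirrors how KafkaZooKeeper wires it up but is not itself part of the patch:

import kafka.common.KafkaZookeeperClient
import kafka.server.{StateChangeCommand, StartReplica}
import kafka.utils.{ZkQueue, ZkUtils}

// zkClient obtained the way the broker does it, so znode data is stored as plain strings
val zkClient = KafkaZookeeperClient.getZookeeperClient(config)
val queue = new ZkQueue(zkClient, ZkUtils.getBrokerStateChangePath(config.brokerId), config.stateChangeQSize)

// Producer side: enqueue a state change command as JSON
val location = queue.put(new StartReplica("foo", 0, 1).toJson())

// Consumer side: block for the next item, act on it, then delete its znode
val item = queue.take()                                          // (sequence node name, JSON payload)
val command = StateChangeCommand.getStateChangeRequest(item._2)  // back to a StartReplica/CloseReplica
queue.remove(item)
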

@@ -25,11 +25,13 @@ import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException,
import kafka.consumer.TopicCount
import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
import java.util.concurrent.locks.Condition
+import kafka.common.NoEpochForPartitionException

object ZkUtils extends Logging {
  val ConsumersPath = "/consumers"
  val BrokerIdsPath = "/brokers/ids"
  val BrokerTopicsPath = "/brokers/topics"
+  val BrokerStatePath = "/brokers/state"

  def getTopicPath(topic: String): String ={
    BrokerTopicsPath + "/" + topic

@@ -59,6 +61,10 @@ object ZkUtils extends Logging {
    getTopicPartitionPath(topic, partitionId) + "/" + "leader"
  }

+  def getBrokerStateChangePath(brokerId: Int): String = {
+    BrokerStatePath + "/" + brokerId
+  }
+
  def getSortedBrokerList(zkClient: ZkClient): Seq[String] ={
    ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
  }

@@ -69,9 +75,37 @@ object ZkUtils extends Logging {
  }

  def getLeaderForPartition(zkClient: ZkClient, topic: String, partition: Int): Option[Int] = {
-    val leader = readDataMaybeNull(zkClient, getTopicPartitionLeaderPath(topic, partition.toString))
-    if(leader == null) None
-    else Some(leader.toInt)
+    val leaderAndEpoch = readDataMaybeNull(zkClient, getTopicPartitionLeaderPath(topic, partition.toString))
+    if(leaderAndEpoch == null) None
+    else {
+      val leaderAndEpochInfo = leaderAndEpoch.split(";")
+      Some(leaderAndEpochInfo.head.toInt)
+    }
+  }
+
+  /**
+   * This API should read the epoch in the ISR path. It is sufficient to read the epoch in the ISR path, since if the
+   * leader fails after updating epoch in the leader path and before updating epoch in the ISR path, effectively some
+   * other broker will retry becoming leader with the same new epoch value.
+   */
+  def getEpochForPartition(client: ZkClient, topic: String, partition: Int): Int = {
+    val lastKnownEpoch = try {
+      val isrAndEpoch = readData(client, getTopicPartitionInSyncPath(topic, partition.toString))
+      if(isrAndEpoch != null) {
+        val isrAndEpochInfo = isrAndEpoch.split(";")
+        if(isrAndEpochInfo.last.isEmpty)
+          throw new NoEpochForPartitionException("No epoch in ISR path for topic %s partition %d is empty".format(topic, partition))
+        else
+          isrAndEpochInfo.last.toInt
+      }else {
+        throw new NoEpochForPartitionException("ISR path for topic %s partition %d is empty".format(topic, partition))
+      }
+    }catch {
+      case e: ZkNoNodeException =>
+        throw new NoEpochForPartitionException("No epoch since leader never existed for topic %s partition %d".format(topic, partition))
+      case e1 => throw e1
+    }
+    lastKnownEpoch
  }

  def getReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[String] = {

@@ -83,22 +117,60 @@ object ZkUtils extends Logging {
    }
  }

+  def getInSyncReplicasForPartition(client: ZkClient, topic: String, partition: Int): Seq[Int] = {
+    val replicaListAndEpochString = readDataMaybeNull(client, getTopicPartitionInSyncPath(topic, partition.toString))
+    if(replicaListAndEpochString == null)
+      Seq.empty[Int]
+    else {
+      val replicasAndEpochInfo = replicaListAndEpochString.split(";")
+      Utils.getCSVList(replicasAndEpochInfo.head).map(r => r.toInt)
+    }
+  }
+
  def isPartitionOnBroker(zkClient: ZkClient, topic: String, partition: Int, brokerId: Int): Boolean = {
    val replicas = getReplicasForPartition(zkClient, topic, partition)
    debug("The list of replicas for topic %s, partition %d is %s".format(topic, partition, replicas))
    replicas.contains(brokerId.toString)
  }

-  def tryToBecomeLeaderForPartition(zkClient: ZkClient, topic: String, partition: Int, brokerId: Int): Boolean = {
+  def tryToBecomeLeaderForPartition(client: ZkClient, topic: String, partition: Int, brokerId: Int): Option[Int] = {
    try {
-      createEphemeralPathExpectConflict(zkClient, getTopicPartitionLeaderPath(topic, partition.toString), brokerId.toString)
-      true
+      // NOTE: first increment epoch, then become leader
+      val newEpoch = incrementEpochForPartition(client, topic, partition, brokerId)
+      createEphemeralPathExpectConflict(client, getTopicPartitionLeaderPath(topic, partition.toString),
+        "%d;%d".format(brokerId, newEpoch))
+      val currentISR = getInSyncReplicasForPartition(client, topic, partition)
+      updatePersistentPath(client, getTopicPartitionInSyncPath(topic, partition.toString),
+        "%s;%d".format(currentISR.mkString(","), newEpoch))
+      info("Elected broker %d with epoch %d to be leader for topic %s partition %d".format(brokerId, newEpoch, topic, partition))
+      Some(newEpoch)
    } catch {
-      case e: ZkNodeExistsException => error("Leader exists for topic %s partition %d".format(topic, partition)); false
-      case oe => false
+      case e: ZkNodeExistsException => error("Leader exists for topic %s partition %d".format(topic, partition)); None
+      case oe => None
    }
  }
+
+  def incrementEpochForPartition(client: ZkClient, topic: String, partition: Int, leader: Int) = {
+    // read previous epoch, increment it and write it to the leader path and the ISR path.
+    val epoch = try {
+      Some(getEpochForPartition(client, topic, partition))
+    }catch {
+      case e: NoEpochForPartitionException => None
+      case e1 => throw e1
+    }
+    val newEpoch = epoch match {
+      case Some(partitionEpoch) =>
+        debug("Existing epoch for topic %s partition %d is %d".format(topic, partition, epoch))
+        partitionEpoch + 1
+      case None =>
+        // this is the first time leader is elected for this partition. So set epoch to 1
+        debug("First epoch is 1 for topic %s partition %d".format(topic, partition))
+        1
+    }
+    newEpoch
+  }
+
  def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, creator: String, port: Int) {
    val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
    val broker = new Broker(id, creator, host, port)

@@ -186,7 +258,7 @@ object ZkUtils extends Logging {
  /**
   * Create an persistent node with the given path and data. Create parents if necessary.
   */
-  def createPersistentPath(client: ZkClient, path: String, data: String): Unit = {
+  def createPersistentPath(client: ZkClient, path: String, data: String = ""): Unit = {
    try {
      client.createPersistent(path, data)
    }

@@ -198,6 +270,10 @@ object ZkUtils extends Logging {
    }
  }

+  def createSequentialPersistentPath(client: ZkClient, path: String, data: String = ""): String = {
+    client.createPersistentSequential(path, data)
+  }
+
  /**
   * Update the value of a persistent node with the given path and data.
   * create parrent directory if necessary. Never throw NodeExistException.

@@ -238,7 +314,7 @@ object ZkUtils extends Logging {
    }
  }

-  def deletePath(client: ZkClient, path: String) {
+  def deletePath(client: ZkClient, path: String): Boolean = {
    try {
      client.delete(path)
    }

@@ -246,6 +322,7 @@ object ZkUtils extends Logging {
      case e: ZkNoNodeException =>
        // this can happen during a connection loss event, return normally
        info(path + " deleted during connection loss; this is ok")
+        false
      case e2 => throw e2
    }
  }

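With these changes the leader znode and the in-sync-replica znode each carry the epoch after the payload, separated by a semicolon: the leader path holds "brokerId;epoch" and the ISR path holds "replicaId,replicaId,...;epoch", which is what getLeaderForPartition, getInSyncReplicasForPartition and getEpochForPartition split apart (the epoch is always read back from the ISR path). A small parsing sketch with made-up znode contents:

// Hypothetical znode contents as written by tryToBecomeLeaderForPartition (broker 0, epoch 3, ISR = {0, 1})
val leaderData = "0;3"     // data at getTopicPartitionLeaderPath(topic, partition)
val isrData    = "0,1;3"   // data at getTopicPartitionInSyncPath(topic, partition)

val leader = leaderData.split(";").head.toInt                        // 0
val isr    = isrData.split(";").head.split(",").map(_.toInt).toSeq   // Seq(0, 1)
val epoch  = isrData.split(";").last.toInt                           // 3
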

@@ -24,7 +24,6 @@ import org.easymock.EasyMock
import org.junit.Test
import kafka.api._
import kafka.cluster.Broker
-import kafka.common.{InvalidConfigException, NoBrokersForPartitionException, InvalidPartitionException}
import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
import kafka.producer.async._
import kafka.serializer.{StringEncoder, StringDecoder, Encoder}

@@ -35,6 +34,7 @@ import collection.Map
import collection.mutable.ListBuffer
import org.scalatest.junit.JUnit3Suite
import kafka.utils.{NegativePartitioner, TestZKUtils, TestUtils}
+import kafka.common.{NoBrokersForPartitionException, InvalidPartitionException, InvalidConfigException, QueueFullException}

class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
  val props = createBrokerConfigs(1)

@@ -56,7 +56,7 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
    val mockEventHandler = new EventHandler[String,String] {
      def handle(events: Seq[ProducerData[String,String]]) {
-        Thread.sleep(1000000)
+        Thread.sleep(500)
      }

      def close {}

@@ -79,6 +79,8 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
    }
    catch {
      case e: QueueFullException => //expected
+    }finally {
+      producer.close()
    }
  }

@@ -319,6 +321,8 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
      fail("Should fail with ClassCastException due to incompatible Encoder")
    } catch {
      case e: ClassCastException =>
+    }finally {
+      producer.close()
    }
  }


@@ -22,7 +22,7 @@ import kafka.zk.ZooKeeperTestHarness
import kafka.admin.CreateTopicCommand
import kafka.utils.TestUtils._
import junit.framework.Assert._
-import kafka.utils.{Utils, TestUtils}
+import kafka.utils.{ZkUtils, Utils, TestUtils}

class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {

@@ -35,27 +35,22 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
  val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1)
  val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
+  var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]

  override def setUp() {
    super.setUp()
-  }
-
-  override def tearDown() {
-    super.tearDown()
-  }
-
-  def testLeaderElectionWithCreateTopic {
-    var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
    // start both servers
    val server1 = TestUtils.createServer(new KafkaConfig(configProps1))
    val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
    servers ++= List(server1, server2)
+  }
+
+  override def tearDown() {
+    // shutdown the servers and delete data hosted on them
+    servers.map(server => server.shutdown())
+    servers.map(server => Utils.rm(server.config.logDir))
+    super.tearDown()
+  }
+
+  def testLeaderElectionWithCreateTopic {
    // start 2 brokers
    val topic = "new-topic"
    val partitionId = 0

@@ -64,15 +59,16 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")

    // wait until leader is elected
-    var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
-    assertEquals("Leader must be preferred replica on broker 0", 0, leader.getOrElse(-1))
+    var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 200)
+    assertTrue("Leader should get elected", leader.isDefined)
+    // NOTE: this is to avoid transient test failures
+    assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))

    // kill the server hosting the preferred replica
    servers.head.shutdown()

    // check if leader moves to the other server
-    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 5000)
+    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
    assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))

    Thread.sleep(zookeeper.tickTime)

@@ -81,7 +77,6 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
    servers.head.startup()
    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
-    // TODO: Once the optimization for preferred replica re-election is in, this check should change to broker 0
    assertEquals("Leader must remain on broker 1", 1, leader.getOrElse(-1))

    // shutdown current leader (broker 1)

@@ -90,5 +85,41 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
    // test if the leader is the preferred replica
    assertEquals("Leader must be preferred replica on broker 0", 0, leader.getOrElse(-1))
-
-    // shutdown the servers and delete data hosted on them
-    servers.map(server => server.shutdown())
-    servers.map(server => Utils.rm(server.config.logDir))
+  }
+
+  // Assuming leader election happens correctly, test if epoch changes as expected
+  def testEpoch() {
+    // keep switching leaders to see if epoch changes correctly
+    val topic = "new-topic"
+    val partitionId = 0
+
+    // setup 2 brokers in ZK
+    val brokers = TestUtils.createBrokersInZk(zkClient, List(brokerId1, brokerId2))
+    try {
+      // create topic with 1 partition, 2 replicas, one on each broker
+      CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
+
+      var newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
+      assertTrue("Broker 0 should become leader", newLeaderEpoch.isDefined)
+      assertEquals("First epoch value should be 1", 1, newLeaderEpoch.get)
+      ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
+
+      newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 1)
+      assertTrue("Broker 1 should become leader", newLeaderEpoch.isDefined)
+      assertEquals("Second epoch value should be 2", 2, newLeaderEpoch.get)
+      ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
+
+      newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
+      assertTrue("Broker 0 should become leader again", newLeaderEpoch.isDefined)
+      assertEquals("Third epoch value should be 3", 3, newLeaderEpoch.get)
+    }finally {
+      TestUtils.deleteBrokersInZk(zkClient, List(brokerId1, brokerId2))
+    }
  }
}


@@ -0,0 +1,124 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import org.scalatest.junit.JUnit3Suite
import kafka.zk.ZooKeeperTestHarness
import kafka.common.QueueFullException
import junit.framework.Assert._
import kafka.utils.{ZkQueue, TestUtils}
class StateChangeTest extends JUnit3Suite with ZooKeeperTestHarness {
val brokerId1 = 0
val port1 = TestUtils.choosePort()
var stateChangeQ: ZkQueue = null
val config = new KafkaConfig(TestUtils.createBrokerConfig(brokerId1, port1))
override def setUp() {
super.setUp()
// create a queue
val queuePath = "/brokers/state/" + config.brokerId
stateChangeQ = new ZkQueue(zkClient, queuePath, 10)
}
override def tearDown() {
super.tearDown()
}
def testZkQueueDrainAll() {
for(i <- 0 until 5) {
val itemPath = stateChangeQ.put("test:0:follower")
val item = itemPath.split("/").last.split("-").last.toInt
assertEquals(i, item)
}
var numItems: Int = 0
for(i <- 0 until 5) {
val item = stateChangeQ.take()
assertEquals("test:0:follower", item._2)
assertTrue(stateChangeQ.remove(item))
numItems += 1
}
assertEquals(5, numItems)
for(i <- 5 until 10) {
val itemPath = stateChangeQ.put("test:1:follower")
val item = itemPath.split("/").last.split("-").last.toInt
assertEquals(i+5, item)
}
numItems = 0
for(i <- 0 until 5) {
val item = stateChangeQ.take()
assertTrue(stateChangeQ.remove(item))
assertEquals("test:1:follower", item._2)
numItems += 1
}
assertEquals(5, numItems)
}
def testZkQueueFull() {
for(i <- 0 until 10) {
val itemPath = stateChangeQ.put("test:0:follower")
val item = itemPath.split("/").last.split("-").last.toInt
assertEquals(i, item)
}
try {
stateChangeQ.put("test:0:follower")
fail("Queue should be full")
}catch {
case e:QueueFullException => // expected
}
}
def testStateChangeCommandJson() {
// test start replica
val topic = "foo"
val partition = 0
val epoch = 1
val startReplica = new StartReplica(topic, partition, epoch)
val startReplicaJson = startReplica.toJson()
val startReplicaFromJson = StateChangeCommand.getStateChangeRequest(startReplicaJson)
assertEquals(startReplica, startReplicaFromJson)
// test close replica
val closeReplica = new CloseReplica(topic, partition, epoch)
val closeReplicaJson = closeReplica.toJson()
val closeReplicaFromJson = StateChangeCommand.getStateChangeRequest(closeReplicaJson)
assertEquals(closeReplica, closeReplicaFromJson)
}
// TODO: Do this after patch for delete topic/delete partition is in
def testStateChangeRequestValidity() {
// mock out the StateChangeRequestHandler
// setup 3 replicas for one topic partition
// shutdown follower 1
// restart leader to trigger epoch change
// start follower 1
// test follower 1 acted only on one become follower request
}
}