Create topic support (revisit based on v3 design); patched by Prashanth Menon; reviewed by Jun Rao; KAFKA-329

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1351188 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Prashanth Menon 2012-06-18 01:17:44 +00:00
parent efdc57bc58
commit eff68ce1d0
10 changed files with 221 additions and 238 deletions

View File

@ -18,12 +18,12 @@
package kafka.admin
import java.util.Random
import kafka.api.{TopicMetadata, PartitionMetadata}
import kafka.cluster.Broker
import kafka.utils.{Logging, Utils, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import kafka.api.{TopicMetadata, PartitionMetadata}
import kafka.utils.{Logging, SystemTime, Utils, ZkUtils}
import kafka.cluster.Broker
import collection.mutable.HashMap
import scala.collection.mutable
object AdminUtils extends Logging {
val rand = new Random
@ -49,7 +49,7 @@ object AdminUtils extends Logging {
*/
def assignReplicasToBrokers(brokerList: Seq[String], nPartitions: Int, replicationFactor: Int,
fixedStartIndex: Int = -1) // for testing only
: Array[List[String]] = {
: Map[Int, List[String]] = {
if (nPartitions <= 0)
throw new AdministrationException("number of partitions must be larger than 0")
if (replicationFactor <= 0)
@ -57,7 +57,7 @@ object AdminUtils extends Logging {
if (replicationFactor > brokerList.size)
throw new AdministrationException("replication factor: " + replicationFactor +
" larger than available brokers: " + brokerList.size)
val ret = new Array[List[String]](nPartitions)
val ret = new mutable.HashMap[Int, List[String]]()
val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
var secondReplicaShift = -1
@ -68,47 +68,40 @@ object AdminUtils extends Logging {
var replicaList = List(brokerList(firstReplicaIndex))
for (j <- 0 until replicationFactor - 1)
replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, secondReplicaShift, j, brokerList.size))
ret(i) = replicaList.reverse
ret.put(i, replicaList.reverse)
}
ret
ret.toMap
}
def createReplicaAssignmentPathInZK(topic: String, replicaAssignmentList: Seq[List[String]], zkClient: ZkClient) {
def createTopicPartitionAssignmentPathInZK(topic: String, replicaAssignment: Map[Int, List[String]], zkClient: ZkClient) {
try {
val topicVersion = SystemTime.milliseconds
ZkUtils.createPersistentPath(zkClient, ZkUtils.BrokerTopicsPath + "/" + topic, topicVersion.toString)
for (i <- 0 until replicaAssignmentList.size) {
val zkPath = ZkUtils.getTopicPartitionReplicasPath(topic, i.toString)
ZkUtils.updatePersistentPath(zkClient, zkPath, Utils.seqToCSV(replicaAssignmentList(i)))
debug("Updated path %s with %s for replica assignment".format(zkPath, Utils.seqToCSV(replicaAssignmentList(i))))
}
}
catch {
case e: ZkNodeExistsException =>
throw new AdministrationException("topic " + topic + " already exists, with version "
+ ZkUtils.getTopicVersion (zkClient, topic))
case e2 =>
throw new AdministrationException(e2.toString)
val zkPath = ZkUtils.getTopicPath(topic)
val jsonPartitionMap = Utils.mapToJson(replicaAssignment.map(e => (e._1.toString -> e._2)))
ZkUtils.createPersistentPath(zkClient, zkPath, jsonPartitionMap)
debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
} catch {
case e: ZkNodeExistsException => throw new AdministrationException("topic %s already exists".format(topic))
case e2 => throw new AdministrationException(e2.toString)
}
}
def getTopicMetaDataFromZK(topics: Seq[String], zkClient: ZkClient): Seq[Option[TopicMetadata]] = {
val cachedBrokerInfo = new HashMap[Int, Broker]()
val metadataList = topics.map { topic =>
val cachedBrokerInfo = new mutable.HashMap[Int, Broker]()
topics.map { topic =>
if (ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) {
val partitions = ZkUtils.getSortedPartitionIdsForTopic(zkClient, topic)
val partitionMetadata = new Array[PartitionMetadata](partitions.size)
val topicPartitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic).iterator).get(topic).get
val sortedPartitions = topicPartitionAssignment.toList.sortWith( (m1,m2) => m1._1.toInt < m2._1.toInt )
for (i <-0 until partitionMetadata.size) {
val replicas = ZkUtils.readData(zkClient, ZkUtils.getTopicPartitionReplicasPath(topic, partitions(i).toString))
val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitions(i))
val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitions(i))
val partitionMetadata = sortedPartitions.map { partitionMap =>
val partition = partitionMap._1.toInt
val replicas = partitionMap._2
val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partition)
val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
partitionMetadata(i) = new PartitionMetadata(partitions(i),
leader match { case None => None case Some(l) => Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l.toInt)).head) },
getBrokerInfoFromCache(zkClient, cachedBrokerInfo, Utils.getCSVList(replicas).map(id => id.toInt)),
new PartitionMetadata(partition,
leader.map(l => getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head),
getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt)),
getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas),
None /* Return log segment metadata when getOffsetsBefore will be replaced with this API */)
}
@ -117,7 +110,6 @@ object AdminUtils extends Logging {
None
}
}
metadataList.toList
}
private def getBrokerInfoFromCache(zkClient: ZkClient,

View File

@ -18,8 +18,9 @@
package kafka.admin
import joptsimple.OptionParser
import org.I0Itec.zkclient.ZkClient
import kafka.utils.{Logging, Utils, ZKStringSerializer, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import scala.collection.mutable
object CreateTopicCommand extends Logging {
@ -71,13 +72,11 @@ object CreateTopicCommand extends Logging {
zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
createTopic(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr)
println("creation succeeded!")
}
catch {
} catch {
case e =>
println("creation failed because of " + e.getMessage)
println(Utils.stackTrace(e))
}
finally {
} finally {
if (zkClient != null)
zkClient.close()
}
@ -85,19 +84,19 @@ object CreateTopicCommand extends Logging {
def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, replicaAssignmentStr: String = "") {
val brokerList = ZkUtils.getSortedBrokerList(zkClient)
var replicaAssignment: Seq[List[String]] = null
if (replicaAssignmentStr == "")
replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, replicationFactor)
val partitionReplicaAssignment = if (replicaAssignmentStr == "")
AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, replicationFactor)
else
replicaAssignment = getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet)
debug("Replica assignment list for %s is %s".format(topic, replicaAssignment))
AdminUtils.createReplicaAssignmentPathInZK(topic, replicaAssignment, zkClient)
getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet)
debug("Replica assignment list for %s is %s".format(topic, partitionReplicaAssignment))
AdminUtils.createTopicPartitionAssignmentPathInZK(topic, partitionReplicaAssignment, zkClient)
}
def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[String]): Array[List[String]] = {
def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[String]): Map[Int, List[String]] = {
val partitionList = replicaAssignmentList.split(",")
val ret = new Array[List[String]](partitionList.size)
val ret = new mutable.HashMap[Int, List[String]]()
for (i <- 0 until partitionList.size) {
val brokerList = partitionList(i).split(":").map(s => s.trim())
if (brokerList.size <= 0)
@ -107,10 +106,10 @@ object CreateTopicCommand extends Logging {
if (!brokerList.toSet.subsetOf(availableBrokerList))
throw new AdministrationException("some specified brokers not available. specified brokers: " + brokerList.toString +
"available broker:" + availableBrokerList.toString)
ret(i) = brokerList.toList
ret.put(i, brokerList.toList)
if (ret(i).size != ret(0).size)
throw new AdministrationException("partition " + i + " has different replication factor: " + brokerList)
}
ret
ret.toMap
}
}

View File

@ -329,8 +329,7 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
case None =>
/* check if auto creation of topics is turned on */
if(config.autoCreateTopics) {
CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions,
config.defaultReplicationFactor)
CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
.format(topic, config.numPartitions, config.defaultReplicationFactor))
val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head

View File

@ -133,9 +133,8 @@ class KafkaZooKeeper(config: KafkaConfig,
def handleNewTopics(topics: Seq[String]) {
// get relevant partitions to this broker
val topicsAndPartitionsOnThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topics, config.brokerId)
topicsAndPartitionsOnThisBroker.foreach { tp =>
val topic = tp._1
val partitionsAssignedToThisBroker = tp._2
debug("Partitions assigned to broker %d are %s".format(config.brokerId, topicsAndPartitionsOnThisBroker.mkString(",")))
for( (topic, partitionsAssignedToThisBroker) <- topicsAndPartitionsOnThisBroker ) {
// subscribe to leader changes for these partitions
subscribeToLeaderForPartitions(topic, partitionsAssignedToThisBroker)
// start replicas for these partitions
@ -143,37 +142,19 @@ class KafkaZooKeeper(config: KafkaConfig,
}
}
def handleNewPartitions(topic: String, partitions: Seq[Int]) {
info("Handling topic %s partitions %s".format(topic, partitions.mkString(",")))
// find the partitions relevant to this broker
val partitionsAssignedToThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topic, partitions, config.brokerId)
info("Partitions assigned to broker %d for topic %s are %s"
.format(config.brokerId, topic, partitionsAssignedToThisBroker.mkString(",")))
// subscribe to leader changes for these partitions
subscribeToLeaderForPartitions(topic, partitionsAssignedToThisBroker)
// start replicas for these partitions
startReplicasForPartitions(topic, partitionsAssignedToThisBroker)
}
def subscribeToTopicAndPartitionsChanges(startReplicas: Boolean) {
info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicPartitionsChangeListener)
val topics = ZkUtils.getAllTopics(zkClient)
debug("Existing topics are %s".format(topics.mkString(",")))
topics.foreach(topic => zkClient.subscribeChildChanges(ZkUtils.getTopicPartitionsPath(topic), topicPartitionsChangeListener))
val topicsAndPartitionsOnThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topics, config.brokerId)
debug("Partitions assigned to broker %d are %s".format(config.brokerId, topicsAndPartitionsOnThisBroker.mkString(",")))
for( (topic, partitionsAssignedToThisBroker) <- topicsAndPartitionsOnThisBroker ) {
// subscribe to leader changes for these partitions
subscribeToLeaderForPartitions(topic, partitionsAssignedToThisBroker)
val partitionsAssignedToThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topics, config.brokerId)
debug("Partitions assigned to broker %d are %s".format(config.brokerId, partitionsAssignedToThisBroker.mkString(",")))
partitionsAssignedToThisBroker.foreach { tp =>
val topic = tp._1
val partitions = tp._2.map(p => p.toInt)
partitions.foreach { partition =>
// register leader change listener
zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderPath(topic, partition.toString), leaderChangeListener)
}
// start replicas for these partitions
if(startReplicas)
startReplicasForPartitions(topic, partitions)
startReplicasForPartitions(topic, partitionsAssignedToThisBroker)
}
}
@ -199,11 +180,11 @@ class KafkaZooKeeper(config: KafkaConfig,
}
private def startReplica(replica: Replica) {
info("Starting replica for topic %s partition %d on broker %d".format(replica.topic, replica.partition.partitionId,
replica.brokerId))
info("Starting replica for topic %s partition %d on broker %d"
.format(replica.topic, replica.partition.partitionId, replica.brokerId))
ZkUtils.getLeaderForPartition(zkClient, replica.topic, replica.partition.partitionId) match {
case Some(leader) => info("Topic %s partition %d has leader %d".format(replica.topic, replica.partition.partitionId,
leader))
case Some(leader) =>
info("Topic %s partition %d has leader %d".format(replica.topic, replica.partition.partitionId,leader))
// check if this broker is the leader, if not, then become follower
if(leader != config.brokerId)
becomeFollower(replica, leader, zkClient)
@ -218,10 +199,9 @@ class KafkaZooKeeper(config: KafkaConfig,
val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, replica.topic, replica.partition.partitionId).map(_.toInt)
val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, replica.topic, replica.partition.partitionId)
val liveBrokers = ZkUtils.getSortedBrokerList(zkClient).map(_.toInt)
if(canBecomeLeader(config.brokerId, replica.topic, replica.partition.partitionId,
assignedReplicas, inSyncReplicas, liveBrokers)) {
info("Broker %d will participate in leader election for topic %s partition %d".format(config.brokerId, replica.topic,
replica.partition.partitionId))
if(canBecomeLeader(config.brokerId, replica.topic, replica.partition.partitionId, assignedReplicas, inSyncReplicas, liveBrokers)) {
info("Broker %d will participate in leader election for topic %s partition %d"
.format(config.brokerId, replica.topic, replica.partition.partitionId))
// wait for some time if it is not the preferred replica
try {
if(replica.brokerId != assignedReplicas.head) {
@ -233,7 +213,7 @@ class KafkaZooKeeper(config: KafkaConfig,
Thread.sleep(config.preferredReplicaWaitTime)
}
}
}catch {
} catch {
case e => // ignoring
}
val newLeaderEpochAndISR = ZkUtils.tryToBecomeLeaderForPartition(zkClient, replica.topic,
@ -279,7 +259,7 @@ class KafkaZooKeeper(config: KafkaConfig,
" partition %d is alive. Broker %d can become leader since it is in the assigned replicas %s"
.format(partition, brokerId, assignedReplicas.mkString(",")))
true
}else {
} else {
info("No broker in the ISR %s for topic %s".format(inSyncReplicas.mkString(","), topic) +
" partition %d is alive. Broker %d can become leader since it is in the assigned replicas %s"
.format(partition, brokerId, assignedReplicas.mkString(",")))
@ -297,7 +277,7 @@ class KafkaZooKeeper(config: KafkaConfig,
info("ISR for topic %s partition %d is empty. Broker %d can become leader since it "
.format(topic, partition, brokerId) + "is part of the assigned replicas list")
true
}else {
} else {
info("ISR for topic %s partition %d is empty. Broker %d cannot become leader since it "
.format(topic, partition, brokerId) + "is not part of the assigned replicas list")
false
@ -310,27 +290,19 @@ class KafkaZooKeeper(config: KafkaConfig,
@throws(classOf[Exception])
def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
import collection.JavaConversions
topicListenerLock.synchronized {
debug("Topic/partition change listener fired for path " + parentPath)
import scala.collection.JavaConversions._
val currentChildren = asBuffer(curChilds)
val currentChildren = JavaConversions.asBuffer(curChilds).toSet
val newTopics = currentChildren -- allTopics
val deletedTopics = allTopics -- currentChildren
allTopics.clear()
// check if topic has changed or a partition for an existing topic has changed
if(parentPath == ZkUtils.BrokerTopicsPath) {
val currentTopics = currentChildren
debug("New topics " + currentTopics.mkString(","))
// for each new topic [topic], watch the path /brokers/topics/[topic]/partitions
currentTopics.foreach { topic =>
zkClient.subscribeChildChanges(ZkUtils.getTopicPartitionsPath(topic), this)
allTopics += topic
}
handleNewTopics(currentTopics)
}else {
val topic = parentPath.split("/").takeRight(2).head
debug("Partitions changed for topic %s on broker %d with new value %s"
.format(topic, config.brokerId, currentChildren.mkString(",")))
handleNewPartitions(topic, currentChildren.map(p => p.toInt).toSeq)
}
allTopics ++ currentChildren
debug("New topics: [%s]. Deleted topics: [%s]".format(newTopics.mkString(","), deletedTopics.mkString(",")))
handleNewTopics(newTopics.toSeq)
// TODO: Handle topic deletions
//handleDeletedTopics(deletedTopics.toSeq)
}
}

View File

@ -749,6 +749,21 @@ object Utils extends Logging {
builder.toString
}
def mapToJson[T <: Any](map: Map[String, List[String]]): String = {
val builder = new StringBuilder
builder.append("{ ")
var numElements = 0
for ( (key, value) <- map ) {
if (numElements > 0)
builder.append(",")
builder.append("\"" + key + "\": ")
builder.append("[%s]".format(value.map("\""+_+"\"").mkString(",")))
numElements += 1
}
builder.append(" }")
builder.toString
}
def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
for(arg <- required) {
if(!options.has(arg)) {

View File

@ -17,15 +17,16 @@
package kafka.utils
import org.I0Itec.zkclient.serialize.ZkSerializer
import kafka.cluster.{Broker, Cluster}
import scala.collection._
import java.util.Properties
import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError}
import java.util.concurrent.locks.Condition
import kafka.cluster.{Broker, Cluster}
import kafka.common.NoEpochForPartitionException
import kafka.consumer.TopicCount
import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
import java.util.concurrent.locks.Condition
import kafka.common.NoEpochForPartitionException
import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError}
import org.I0Itec.zkclient.serialize.ZkSerializer
import scala.collection._
import util.parsing.json.JSON
object ZkUtils extends Logging {
val ConsumersPath = "/consumers"
@ -110,7 +111,7 @@ object ZkUtils extends Logging {
}else {
throw new NoEpochForPartitionException("ISR path for topic %s partition %d is empty".format(topic, partition))
}
}catch {
} catch {
case e: ZkNoNodeException =>
throw new NoEpochForPartitionException("No epoch since leader never existed for topic %s partition %d".format(topic, partition))
case e1 => throw e1
@ -118,15 +119,23 @@ object ZkUtils extends Logging {
lastKnownEpoch
}
/**
* Gets the assigned replicas (AR) for a specific topic and partition
*/
def getReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[String] = {
val replicaListString = readDataMaybeNull(zkClient, getTopicPartitionReplicasPath(topic, partition.toString))
if(replicaListString == null)
Seq.empty[String]
else {
Utils.getCSVList(replicaListString)
val topicAndPartitionAssignment = getPartitionAssignmentForTopics(zkClient, List(topic).iterator)
topicAndPartitionAssignment.get(topic) match {
case Some(partitionAssignment) => partitionAssignment.get(partition.toString) match {
case Some(replicaList) => replicaList
case None => Seq.empty[String]
}
case None => Seq.empty[String]
}
}
/**
* Gets the in-sync replicas (ISR) for a specific topic and partition
*/
def getInSyncReplicasForPartition(client: ZkClient, topic: String, partition: Int): Seq[Int] = {
val replicaListAndEpochString = readDataMaybeNull(client, getTopicPartitionInSyncPath(topic, partition.toString))
if(replicaListAndEpochString == null)
@ -225,8 +234,7 @@ object ZkUtils extends Logging {
private def createEphemeralPath(client: ZkClient, path: String, data: String): Unit = {
try {
client.createEphemeral(path, data)
}
catch {
} catch {
case e: ZkNoNodeException => {
createParentPath(client, path)
client.createEphemeral(path, data)
@ -241,23 +249,20 @@ object ZkUtils extends Logging {
def createEphemeralPathExpectConflict(client: ZkClient, path: String, data: String): Unit = {
try {
createEphemeralPath(client, path, data)
}
catch {
} catch {
case e: ZkNodeExistsException => {
// this can happen when there is connection loss; make sure the data is what we intend to write
var storedData: String = null
try {
storedData = readData(client, path)
}
catch {
} catch {
case e1: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this
case e2 => throw e2
}
if (storedData == null || storedData != data) {
info("conflict in " + path + " data: " + data + " stored data: " + storedData)
throw e
}
else {
} else {
// otherwise, the creation succeeded, return normally
info(path + " exists with value " + data + " during connection loss; this is ok")
}
@ -272,8 +277,7 @@ object ZkUtils extends Logging {
def createPersistentPath(client: ZkClient, path: String, data: String = ""): Unit = {
try {
client.createPersistent(path, data)
}
catch {
} catch {
case e: ZkNoNodeException => {
createParentPath(client, path)
client.createPersistent(path, data)
@ -292,14 +296,12 @@ object ZkUtils extends Logging {
def updatePersistentPath(client: ZkClient, path: String, data: String): Unit = {
try {
client.writeData(path, data)
}
catch {
} catch {
case e: ZkNoNodeException => {
createParentPath(client, path)
try {
client.createPersistent(path, data)
}
catch {
} catch {
case e: ZkNodeExistsException => client.writeData(path, data)
case e2 => throw e2
}
@ -315,8 +317,7 @@ object ZkUtils extends Logging {
def updateEphemeralPath(client: ZkClient, path: String, data: String): Unit = {
try {
client.writeData(path, data)
}
catch {
} catch {
case e: ZkNoNodeException => {
createParentPath(client, path)
client.createEphemeral(path, data)
@ -328,8 +329,7 @@ object ZkUtils extends Logging {
def deletePath(client: ZkClient, path: String): Boolean = {
try {
client.delete(path)
}
catch {
} catch {
case e: ZkNoNodeException =>
// this can happen during a connection loss event, return normally
info(path + " deleted during connection loss; this is ok")
@ -341,8 +341,7 @@ object ZkUtils extends Logging {
def deletePathRecursive(client: ZkClient, path: String) {
try {
client.deleteRecursive(path)
}
catch {
} catch {
case e: ZkNoNodeException =>
// this can happen during a connection loss event, return normally
info(path + " deleted during connection loss; this is ok")
@ -368,16 +367,12 @@ object ZkUtils extends Logging {
import scala.collection.JavaConversions._
// triggers implicit conversion from java list to scala Seq
var ret: java.util.List[String] = null
try {
ret = client.getChildren(path)
}
catch {
case e: ZkNoNodeException =>
return Nil
client.getChildren(path)
} catch {
case e: ZkNoNodeException => return Nil
case e2 => throw e2
}
return ret
}
/**
@ -399,35 +394,40 @@ object ZkUtils extends Logging {
cluster
}
def getPartitionsForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, Seq[String]] = {
val ret = new mutable.HashMap[String, Seq[String]]()
topics.foreach { topic =>
// get the partitions that exist for topic
val partitions = getChildrenParentMayNotExist(zkClient, getTopicPartitionsPath(topic))
debug("children of /brokers/topics/%s are %s".format(topic, partitions))
ret += (topic -> partitions.sortWith((s,t) => s < t))
def getPartitionAssignmentForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, Map[String, List[String]]] = {
val ret = new mutable.HashMap[String, Map[String, List[String]]]()
topics.foreach{ topic =>
val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic))
val partitionMap = if (jsonPartitionMap == null) {
Map[String, List[String]]()
} else {
JSON.parseFull(jsonPartitionMap) match {
case Some(m) => m.asInstanceOf[Map[String, List[String]]]
case None => Map[String, List[String]]()
}
}
debug("partition map for /brokers/topics/%s is %s".format(topic, partitionMap))
ret += (topic -> partitionMap)
}
ret
}
def getPartitionsAssignedToBroker(zkClient: ZkClient, topics: Seq[String], brokerId: Int): Map[String, Seq[Int]] = {
val topicsAndPartitions = getPartitionsForTopics(zkClient, topics.iterator)
topicsAndPartitions.map { tp =>
val topic = tp._1
val partitions = tp._2.map(p => p.toInt)
val relevantPartitions = partitions.filter { partition =>
val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partition).map(r => r.toInt)
assignedReplicas.contains(brokerId)
}
(topic -> relevantPartitions)
def getPartitionsForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, Seq[String]] = {
getPartitionAssignmentForTopics(zkClient, topics).map{ topicAndPartitionMap =>
val topic = topicAndPartitionMap._1
val partitionMap = topicAndPartitionMap._2
debug("partition assignment of /brokers/topics/%s is %s".format(topic, partitionMap))
(topic -> partitionMap.keys.toSeq.sortWith((s,t) => s < t))
}
}
def getPartitionsAssignedToBroker(zkClient: ZkClient, topic: String, partitions: Seq[Int], broker: Int): Seq[Int] = {
partitions.filter { p =>
val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, p).map(r => r.toInt)
assignedReplicas.contains(broker)
def getPartitionsAssignedToBroker(zkClient: ZkClient, topics: Seq[String], brokerId: Int): Map[String, Seq[Int]] = {
val topicsAndPartitions = getPartitionAssignmentForTopics(zkClient, topics.iterator)
topicsAndPartitions.map{ topicAndPartitionMap =>
val topic = topicAndPartitionMap._1
val partitionMap = topicAndPartitionMap._2
val relevantPartitions = partitionMap.filter( m => m._2.contains(brokerId.toString) )
(topic -> relevantPartitions.keySet.map(_.toInt).toSeq)
}
}
@ -470,14 +470,6 @@ object ZkUtils extends Logging {
consumersPerTopicMap
}
/**
* For a given topic, this returns the sorted list of partition ids registered for this topic
*/
def getSortedPartitionIdsForTopic(zkClient: ZkClient, topic: String): Seq[Int] = {
val topicPartitionsPath = ZkUtils.getTopicPartitionsPath(topic)
ZkUtils.getChildrenParentMayNotExist(zkClient, topicPartitionsPath).map(pid => pid.toInt).sortWith((s,t) => s < t)
}
def getBrokerInfoFromIds(zkClient: ZkClient, brokerIds: Seq[Int]): Seq[Broker] =
brokerIds.map( bid => Broker.createBroker(bid, ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)) )

View File

@ -50,18 +50,18 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
// correct assignment
{
val expectedAssignment = Array(
List("0", "1", "2"),
List("1", "2", "3"),
List("2", "3", "4"),
List("3", "4", "0"),
List("4", "0", "1"),
List("0", "2", "3"),
List("1", "3", "4"),
List("2", "4", "0"),
List("3", "0", "1"),
List("4", "1", "2")
)
val expectedAssignment = Map(
0 -> List("0", "1", "2"),
1 -> List("1", "2", "3"),
2 -> List("2", "3", "4"),
3 -> List("3", "4", "0"),
4 -> List("4", "0", "1"),
5 -> List("0", "2", "3"),
6 -> List("1", "3", "4"),
7 -> List("2", "4", "0"),
8 -> List("3", "0", "1"),
9 -> List("4", "1", "2")
)
val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerList, 10, 3, 0)
val e = (expectedAssignment.toList == actualAssignment.toList)
@ -109,46 +109,51 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
// good assignment
{
val replicationAssignmentStr = "0:1:2,1:2:3"
val expectedReplicationAssignment = Array(
List("0", "1", "2"),
List("1", "2", "3")
val expectedReplicationAssignment = Map(
0 -> List("0", "1", "2"),
1 -> List("1", "2", "3")
)
val actualReplicationAssignment = CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList)
assertTrue(expectedReplicationAssignment.toList == actualReplicationAssignment.toList)
assertEquals(expectedReplicationAssignment.size, actualReplicationAssignment.size)
for( (part, replicas) <- expectedReplicationAssignment ) {
assertEquals(replicas, actualReplicationAssignment(part))
}
}
}
@Test
def testTopicCreationInZK() {
val expectedReplicaAssignment = Array(
List("0", "1", "2"),
List("1", "2", "3"),
List("2", "3", "4"),
List("3", "4", "0"),
List("4", "0", "1"),
List("0", "2", "3"),
List("1", "3", "4"),
List("2", "4", "0"),
List("3", "0", "1"),
List("4", "1", "2"),
List("1", "2", "3"),
List("1", "3", "4")
)
val expectedReplicaAssignment = Map(
0 -> List("0", "1", "2"),
1 -> List("1", "2", "3"),
2 -> List("2", "3", "4"),
3 -> List("3", "4", "0"),
4 -> List("4", "0", "1"),
5 -> List("0", "2", "3"),
6 -> List("1", "3", "4"),
7 -> List("2", "4", "0"),
8 -> List("3", "0", "1"),
9 -> List("4", "1", "2"),
10 -> List("1", "2", "3"),
11 -> List("1", "3", "4")
)
TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
val topic = "test"
// create the topic
AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
val actualReplicaAssignment = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
.get.partitionsMetadata.map(p => p.replicas)
val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList
expectedReplicaAssignment.toList.zip(actualReplicaList).foreach(l => assertEquals(l._1, l._2))
assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
for( i <- 0 until actualReplicaList.size ) {
assertEquals(expectedReplicaAssignment.get(i).get, actualReplicaList(i))
}
try {
AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
fail("shouldn't be able to create a topic already exists")
}
catch {
} catch {
case e: AdministrationException => // this is good
case e2 => throw e2
}
@ -156,22 +161,26 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
@Test
def testGetTopicMetadata() {
val expectedReplicaAssignment = Array(
List("0", "1", "2"),
List("1", "2", "3")
val expectedReplicaAssignment = Map(
0 -> List("0", "1", "2"),
1 -> List("1", "2", "3")
)
val topic = "auto-topic"
TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3))
AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
newTopicMetadata match {
case Some(metadata) => assertEquals(topic, metadata.topic)
case Some(metadata) =>
assertEquals(topic, metadata.topic)
assertNotNull("partition metadata list cannot be null", metadata.partitionsMetadata)
assertEquals("partition metadata list length should be 2", 2, metadata.partitionsMetadata.size)
val actualReplicaAssignment = metadata.partitionsMetadata.map(p => p.replicas)
val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList
assertEquals(expectedReplicaAssignment.toList, actualReplicaList)
assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
for(i <- 0 until actualReplicaList.size) {
assertEquals(expectedReplicaAssignment(i), actualReplicaList(i))
}
case None => fail("Topic " + topic + " should've been automatically created")
}
}

View File

@ -81,8 +81,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
try {
getMessages(nMessages*2, topicMessageStreams0)
fail("should get an exception")
}
catch {
} catch {
case e: ConsumerTimeoutException => // this is ok
case e => throw e
}
@ -90,15 +89,15 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
zkConsumerConnector0.shutdown
// wait to make sure the topic and partition have a leader for the successful case
waitUntilLeaderIsElected(zkClient, topic, 0, 500)
waitUntilLeaderIsElected(zkClient, topic, 1, 500)
// send some messages to each broker
val sentMessages1_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages)
val sentMessages1_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
val sentMessages1 = (sentMessages1_1 ++ sentMessages1_2).sortWith((s,t) => s.checksum < t.checksum)
// wait to make sure the topic and partition have a leader for the successful case
waitUntilLeaderIsElected(zkClient, topic, 0, 500)
waitUntilLeaderIsElected(zkClient, topic, 1, 500)
// create a consumer
val consumerConfig1 = new ConsumerConfig(
TestUtils.createConsumerProperties(zkConnect, group, consumer1))
@ -141,7 +140,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
val expected_2 = List( ("0", "group1_consumer1-0"),
("1", "group1_consumer2-0"))
assertEquals(expected_2, actual_2)
assertEquals(expected_2, actual_2)
// create a consumer with empty map
val consumerConfig3 = new ConsumerConfig(
@ -165,7 +164,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
// also check partition ownership
val actual_3 = getZKChildrenValues(dirs.consumerOwnerDir)
assertEquals(expected_2, actual_3)
assertEquals(expected_2, actual_3)
zkConsumerConnector1.shutdown
zkConsumerConnector2.shutdown
@ -199,7 +198,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
val expected_1 = List( ("0", "group1_consumer1-0"),
("1", "group1_consumer1-0"))
assertEquals(expected_1, actual_1)
assertEquals(expected_1, actual_1)
// commit consumed offsets
zkConsumerConnector1.commitOffsets
@ -227,7 +226,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
val expected_2 = List( ("0", "group1_consumer1-0"),
("1", "group1_consumer2-0"))
assertEquals(expected_2, actual_2)
assertEquals(expected_2, actual_2)
// create a consumer with empty map
val consumerConfig3 = new ConsumerConfig(
@ -251,7 +250,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
// also check partition ownership
val actual_3 = getZKChildrenValues(dirs.consumerOwnerDir)
assertEquals(expected_2, actual_3)
assertEquals(expected_2, actual_3)
zkConsumerConnector1.shutdown
zkConsumerConnector2.shutdown
@ -264,8 +263,6 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
requestHandlerLogger.setLevel(Level.FATAL)
var actualMessages: List[Message] = Nil
// shutdown one server
servers.last.shutdown
Thread.sleep(500)
@ -288,7 +285,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
val expected_1 = List( ("0", "group1_consumer0-0"),
("1", "group1_consumer0-0"))
assertEquals(expected_1, actual_1)
assertEquals(expected_1, actual_1)
zkConsumerConnector0.shutdown
// at this point, only some part of the message set was consumed. So consumed offset should still be 0
@ -361,8 +358,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val sentMessages1 = sendMessages(configs.head, nMessages, "batch1", NoCompressionCodec, 1)
// create a consumer
val consumerConfig1 = new ConsumerConfig(
TestUtils.createConsumerProperties(zkConnect, group, consumer1))
val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1))
val topicRegistry = zkConsumerConnector1.getTopicRegistry

View File

@ -91,8 +91,11 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
props.put("zk.connect", TestZKUtils.zookeeperConnect)
val config = new ProducerConfig(props)
// create topic with 1 partition
// create topic with 1 partition and await leadership
CreateTopicCommand.createTopic(zkClient, "new-topic", 1)
TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 0, 500)
val producer = new Producer[String, String](config)
try {
// Available partition ids should be 0.
@ -132,6 +135,10 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
// create topic
CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0,0,0,0")
TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 0, 500)
TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 1, 500)
TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 2, 500)
TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 3, 500)
val config = new ProducerConfig(props)
val producer = new Producer[String, String](config)
@ -189,6 +196,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
// create topics in ZK
CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 0, 500)
// do a simple test to make sure plumbing is okay
try {

View File

@ -52,8 +52,9 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
val followerBrokerId = configs.last.brokerId
val leaderBroker = new Broker(leaderBrokerId, "localhost", "localhost", configs.head.port)
// create a topic and partition
// create a topic and partition and await leadership
CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(c => c.brokerId).mkString(":"))
TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 1000)
// send test messages to leader
val producer = TestUtils.createProducer[String, String](zkConnect, new StringEncoder)