KAFKA-129 ZK-based producer can throw an unexpected exception when sending a message; patched by nehanarkhede; reviewed by junrao

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1178997 13f79535-47bb-0310-9956-ffa450edef68
Neha Narkhede 2011-10-04 22:43:00 +00:00
parent 513c4a59d8
commit dd62e77f90
7 changed files with 198 additions and 96 deletions

BrokerPartitionInfo.scala

@@ -43,6 +43,15 @@ trait BrokerPartitionInfo {
    */
   def getAllBrokerInfo: Map[Int, Broker]
 
+  /**
+   * This is relevant to the ZKBrokerPartitionInfo. It updates the ZK cache
+   * by reading from zookeeper and recreating the data structures. This API
+   * is invoked by the producer when it detects that the ZK cache of
+   * ZKBrokerPartitionInfo is stale.
+   */
+  def updateInfo
+
   /**
    * Cleanup
    */
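
The new updateInfo hook gives every BrokerPartitionInfo implementation a way to rebuild its cached view of the cluster on demand. A minimal sketch of the contract, with simplified stand-ins for kafka.cluster.Partition and kafka.cluster.Broker (the stand-in types and exact signatures here are illustrative, not the committed code):

// Simplified stand-ins for kafka.cluster.Partition and kafka.cluster.Broker.
case class Partition(brokerId: Int, partId: Int)
case class Broker(id: Int, host: String, port: Int)

trait BrokerPartitionInfo {
  def getBrokerPartitionInfo(topic: String): scala.collection.immutable.SortedSet[Partition]
  def getBrokerInfo(brokerId: Int): Option[Broker]
  def getAllBrokerInfo: Map[Int, Broker]
  def updateInfo(): Unit  // new: rebuild the cached view from the source of truth
  def close(): Unit
}

A static, config-driven implementation has nothing to refresh, which is why ConfigBrokerPartitionInfo below implements it as a no-op.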

ConfigBrokerPartitionInfo.scala

@@ -53,13 +53,15 @@ private[producer] class ConfigBrokerPartitionInfo(config: ProducerConfig) extend
   def close {}
 
+  def updateInfo = {}
+
   /**
    * Generate a sequence of (brokerId, numPartitions) for all brokers
    * specified in the producer configuration
    * @return sequence of (brokerId, numPartitions)
    */
   private def getConfigTopicPartitionInfo(): SortedSet[Partition] = {
-    val brokerInfoList = config.brokerPartitionInfo.split(",")
+    val brokerInfoList = config.brokerList.split(",")
     if(brokerInfoList.size == 0) throw new InvalidConfigException("broker.list is empty")
     // check if each individual broker info is valid => (brokerId: brokerHost: brokerPort)
     brokerInfoList.foreach { bInfo =>
@@ -84,7 +86,7 @@ private[producer] class ConfigBrokerPartitionInfo(config: ProducerConfig) extend
    */
   private def getConfigBrokerInfo(): Map[Int, Broker] = {
     val brokerInfo = new HashMap[Int, Broker]()
-    val brokerInfoList = config.brokerPartitionInfo.split(",")
+    val brokerInfoList = config.brokerList.split(",")
     brokerInfoList.foreach{ bInfo =>
       val brokerIdHostPort = bInfo.split(":")
       brokerInfo += (brokerIdHostPort(0).toInt -> new Broker(brokerIdHostPort(0).toInt, brokerIdHostPort(1),
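
For reference, the static path expects each broker.list entry in the brokerId:host:port form described in the config comment. A self-contained sketch of that parsing (the parseBrokerList name is hypothetical, not part of the patch):

// Illustrative only: parse "0:localhost:9092,1:localhost:9093"
// into brokerId -> (host, port). Assumes well-formed entries.
def parseBrokerList(brokerList: String): Map[Int, (String, Int)] =
  brokerList.split(",").map { entry =>
    val Array(id, host, port) = entry.trim.split(":")
    id.toInt -> (host, port.toInt)
  }.toMap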

Producer.scala

@@ -23,7 +23,6 @@ import kafka.utils._
 import java.util.Properties
 import kafka.cluster.{Partition, Broker}
 import java.util.concurrent.atomic.AtomicBoolean
-import kafka.api.ProducerRequest
 import kafka.common.{NoBrokersForPartitionException, InvalidConfigException, InvalidPartitionException}
 
 class Producer[K,V](config: ProducerConfig,
@@ -35,9 +34,9 @@ class Producer[K,V](config: ProducerConfig,
 {
   private val logger = Logger.getLogger(classOf[Producer[K, V]])
   private val hasShutdown = new AtomicBoolean(false)
-  if(!Utils.propertyExists(config.zkConnect) && !Utils.propertyExists(config.brokerPartitionInfo))
+  if(!Utils.propertyExists(config.zkConnect) && !Utils.propertyExists(config.brokerList))
     throw new InvalidConfigException("At least one of zk.connect or broker.list must be specified")
-  if (Utils.propertyExists(config.zkConnect) && Utils.propertyExists(config.brokerPartitionInfo))
+  if (Utils.propertyExists(config.zkConnect) && Utils.propertyExists(config.brokerList))
     logger.warn("Both zk.connect and broker.list provided (zk.connect takes precedence).")
   private val random = new java.util.Random
   // check if zookeeper based auto partition discovery is enabled
@@ -94,47 +93,86 @@ class Producer[K,V](config: ProducerConfig,
            partitioner: Partitioner[K]) =
     this(config, if(partitioner == null) new DefaultPartitioner[K] else partitioner,
          new ProducerPool[V](config, encoder, eventHandler, cbkHandler), true, null)
 
   /**
    * Sends the data, partitioned by key to the topic using either the
    * synchronous or the asynchronous producer
    * @param producerData the producer data object that encapsulates the topic, key and message data
    */
   def send(producerData: ProducerData[K,V]*) {
+    zkEnabled match {
+      case true => zkSend(producerData: _*)
+      case false => configSend(producerData: _*)
+    }
+  }
+
+  private def zkSend(producerData: ProducerData[K,V]*) {
     val producerPoolRequests = producerData.map { pd =>
-      // find the number of broker partitions registered for this topic
-      logger.debug("Getting the number of broker partitions registered for topic: " + pd.getTopic)
-      val numBrokerPartitions = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic).toSeq
-      logger.debug("Broker partitions registered for topic: " + pd.getTopic + " = " + numBrokerPartitions)
-      val totalNumPartitions = numBrokerPartitions.length
-      if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition = " + pd.getKey)
+      var brokerIdPartition: Option[Partition] = None
+      var brokerInfoOpt: Option[Broker] = None
 
-      var brokerIdPartition: Partition = null
-      var partition: Int = 0
-      if(zkEnabled) {
-        // get the partition id
-        val partitionId = getPartition(pd.getKey, totalNumPartitions)
-        brokerIdPartition = numBrokerPartitions(partitionId)
-        val brokerInfo = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.brokerId).get
-        logger.debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port +
-          " on partition " + brokerIdPartition.partId)
-        partition = brokerIdPartition.partId
-      }else {
-        // randomly select a broker
-        val randomBrokerId = random.nextInt(totalNumPartitions)
-        brokerIdPartition = numBrokerPartitions(randomBrokerId)
-        val brokerInfo = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.brokerId).get
-        logger.debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port +
-          " on a randomly chosen partition")
-        partition = ProducerRequest.RandomPartition
+      var numRetries: Int = 0
+      while(numRetries <= config.zkReadRetries && brokerInfoOpt.isEmpty) {
+        if(numRetries > 0) {
+          logger.info("Try #" + numRetries + " ZK producer cache is stale. Refreshing it by reading from ZK again")
+          brokerPartitionInfo.updateInfo
+        }
+
+        val numBrokerPartitions = getNumPartitionsForTopic(pd)
+        val totalNumPartitions = numBrokerPartitions.length
+        val partitionId = getPartition(pd.getKey, totalNumPartitions)
+        brokerIdPartition = Some(numBrokerPartitions(partitionId))
+        brokerInfoOpt = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.get.brokerId)
+        numRetries += 1
+      }
+
+      brokerInfoOpt match {
+        case Some(brokerInfo) =>
+          if(logger.isDebugEnabled) logger.debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port +
+            " on partition " + brokerIdPartition.get.partId)
+        case None =>
+          throw new NoBrokersForPartitionException("Invalid Zookeeper state. Failed to get partition for topic: " +
+            pd.getTopic + " and key: " + pd.getKey)
       }
       producerPool.getProducerPoolData(pd.getTopic,
-        new Partition(brokerIdPartition.brokerId, partition),
+        new Partition(brokerIdPartition.get.brokerId, brokerIdPartition.get.partId),
         pd.getData)
     }
     producerPool.send(producerPoolRequests: _*)
   }
+
+  private def configSend(producerData: ProducerData[K,V]*) {
+    val producerPoolRequests = producerData.map { pd =>
+      // find the broker partitions registered for this topic
+      val numBrokerPartitions = getNumPartitionsForTopic(pd)
+      val totalNumPartitions = numBrokerPartitions.length
+      val partitionId = getPartition(pd.getKey, totalNumPartitions)
+      val brokerIdPartition = numBrokerPartitions(partitionId)
+      val brokerInfo = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.brokerId).get
+      if(logger.isDebugEnabled)
+        logger.debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port + " on a partition " +
+          brokerIdPartition.partId)
+      producerPool.getProducerPoolData(pd.getTopic,
+        new Partition(brokerIdPartition.brokerId, brokerIdPartition.partId),
+        pd.getData)
+    }
+    producerPool.send(producerPoolRequests: _*)
+  }
+
+  private def getNumPartitionsForTopic(pd: ProducerData[K,V]): Seq[Partition] = {
+    if(logger.isDebugEnabled)
+      logger.debug("Getting the number of broker partitions registered for topic: " + pd.getTopic)
+    val numBrokerPartitions = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic).toSeq
+    if(logger.isDebugEnabled)
+      logger.debug("Broker partitions registered for topic: " + pd.getTopic + " = " + numBrokerPartitions)
+    val totalNumPartitions = numBrokerPartitions.length
+    if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition = " + pd.getKey)
+    numBrokerPartitions
+  }
 
   /**
    * Retrieves the partition id and throws an InvalidPartitionException if
    * the value of partition is not between 0 and numPartitions-1
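
The core of the fix is the bounded retry-with-refresh loop in zkSend: a failed broker lookup no longer surfaces an unexpected exception; the producer refreshes the ZK cache via updateInfo and retries up to config.zkReadRetries times before failing with NoBrokersForPartitionException. The same pattern in isolation (a sketch; retryOnStaleCache, lookup and refresh are illustrative names, not the committed code):

// Sketch of zkSend's bounded retry-with-refresh pattern. `lookup` stands in
// for reading the producer's ZK cache, `refresh` for brokerPartitionInfo.updateInfo.
def retryOnStaleCache[T](maxRetries: Int)(lookup: => Option[T])(refresh: => Unit): T = {
  var result: Option[T] = None
  var attempts = 0
  while(attempts <= maxRetries && result.isEmpty) {
    if(attempts > 0) refresh   // the cache looked stale: rebuild it before retrying
    result = lookup
    attempts += 1
  }
  // still empty after maxRetries refreshes: the state is genuinely bad, so fail loudly
  result.getOrElse(throw new IllegalStateException("cache still stale after " + maxRetries + " retries"))
}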

ProducerConfig.scala

@@ -28,8 +28,8 @@ class ProducerConfig(val props: Properties) extends ZKConfig(props)
   /** For bypassing zookeeper based auto partition discovery, use this config *
    * to pass in static broker and per-broker partition information. Format-   *
    * brokerid1:host1:port1, brokerid2:host2:port2*/
-  val brokerPartitionInfo = Utils.getString(props, "broker.list", null)
-  if(brokerPartitionInfo != null && Utils.getString(props, "partitioner.class", null) != null)
+  val brokerList = Utils.getString(props, "broker.list", null)
+  if(brokerList != null && Utils.getString(props, "partitioner.class", null) != null)
     throw new InvalidConfigException("partitioner.class cannot be used when broker.list is set")
 
   /** the partitioner class for partitioning events amongst sub-topics */
@@ -58,4 +58,14 @@ class ProducerConfig(val props: Properties) extends ZKConfig(props)
    * If the compression codec is NoCompressionCodec, compression is disabled for all topics
    */
   val compressedTopics = Utils.getCSVList(Utils.getString(props, "compressed.topics", null))
+
+  /**
+   * The producer using the zookeeper software load balancer maintains a ZK cache that gets
+   * updated by the zookeeper watcher listeners. During some events, like a broker bounce, the
+   * producer ZK cache can get into an inconsistent state for a small time period, during which
+   * it could end up picking a broker partition that is unavailable. When this happens, the
+   * ZK cache needs to be updated.
+   * This parameter specifies the number of times the producer attempts to refresh this ZK cache.
+   */
+  val zkReadRetries = Utils.getInt(props, "zk.read.num.retries", 3)
 }
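
Since the new setting is read with Utils.getInt(props, "zk.read.num.retries", 3), it defaults to 3 and is supplied like any other producer property. A hypothetical configuration (the zk.connect and serializer values are placeholders):

import java.util.Properties

val props = new Properties()
props.put("zk.connect", "localhost:2181")           // placeholder ZK address
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("zk.read.num.retries", "5")               // override the default of 3

val config = new ProducerConfig(props)
// config.zkReadRetries is now 5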

ZKBrokerPartitionInfo.scala

@@ -13,7 +13,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package kafka.producer
 
 import kafka.utils.{ZKStringSerializer, ZkUtils, ZKConfig}
@@ -27,6 +27,7 @@ import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
 import collection.SortedSet
 
 private[producer] object ZKBrokerPartitionInfo {
   /**
    * Generate a mapping from broker id to (brokerId, numPartitions) for the list of brokers
    * specified
@@ -49,7 +50,7 @@ private[producer] object ZKBrokerPartitionInfo {
         }
       }
       brokerParts
     }
   }
 
   /**
@@ -89,22 +90,24 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In
    * @return a sequence of (brokerId, numPartitions). Returns a zero-length
    * sequence if no brokers are available.
    */
-  def getBrokerPartitionInfo(topic: String): scala.collection.immutable.SortedSet[Partition] = {
-    val brokerPartitions = topicBrokerPartitions.get(topic)
-    var numBrokerPartitions = SortedSet.empty[Partition]
-    brokerPartitions match {
-      case Some(bp) =>
-        bp.size match {
-          case 0 => // no brokers currently registered for this topic. Find the list of all brokers in the cluster.
-            numBrokerPartitions = bootstrapWithExistingBrokers(topic)
-            topicBrokerPartitions += (topic -> numBrokerPartitions)
-          case _ => numBrokerPartitions = TreeSet[Partition]() ++ bp
-        }
-      case None => // no brokers currently registered for this topic. Find the list of all brokers in the cluster.
-        numBrokerPartitions = bootstrapWithExistingBrokers(topic)
-        topicBrokerPartitions += (topic -> numBrokerPartitions)
+  def getBrokerPartitionInfo(topic: String): SortedSet[Partition] = {
+    zkWatcherLock synchronized {
+      val brokerPartitions = topicBrokerPartitions.get(topic)
+      var numBrokerPartitions = SortedSet.empty[Partition]
+      brokerPartitions match {
+        case Some(bp) =>
+          bp.size match {
+            case 0 => // no brokers currently registered for this topic. Find the list of all brokers in the cluster.
+              numBrokerPartitions = bootstrapWithExistingBrokers(topic)
+              topicBrokerPartitions += (topic -> numBrokerPartitions)
+            case _ => numBrokerPartitions = TreeSet[Partition]() ++ bp
+          }
+        case None => // no brokers currently registered for this topic. Find the list of all brokers in the cluster.
+          numBrokerPartitions = bootstrapWithExistingBrokers(topic)
+          topicBrokerPartitions += (topic -> numBrokerPartitions)
+      }
+      numBrokerPartitions
     }
-    numBrokerPartitions
   }
 
   /**
@@ -113,7 +116,11 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In
    * @param brokerId the broker for which the info is to be returned
    * @return host and port of brokerId
    */
-  def getBrokerInfo(brokerId: Int): Option[Broker] = allBrokers.get(brokerId)
+  def getBrokerInfo(brokerId: Int): Option[Broker] = {
+    zkWatcherLock synchronized {
+      allBrokers.get(brokerId)
+    }
+  }
 
   /**
    * Generate a mapping from broker id to the host and port for all brokers
@@ -123,18 +130,28 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In
   def close = zkClient.close
 
+  def updateInfo = {
+    zkWatcherLock synchronized {
+      topicBrokerPartitions = getZKTopicPartitionInfo
+      allBrokers = getZKBrokerInfo
+    }
+  }
+
   private def bootstrapWithExistingBrokers(topic: String): scala.collection.immutable.SortedSet[Partition] = {
-    logger.debug("Currently, no brokers are registered under topic: " + topic)
-    logger.debug("Bootstrapping topic: " + topic + " with available brokers in the cluster with default " +
-      "number of partitions = 1")
+    if(logger.isDebugEnabled) logger.debug("Currently, no brokers are registered under topic: " + topic)
+    if(logger.isDebugEnabled)
+      logger.debug("Bootstrapping topic: " + topic + " with available brokers in the cluster with default " +
+        "number of partitions = 1")
     val allBrokersIds = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerIdsPath)
-    logger.trace("List of all brokers currently registered in zookeeper = " + allBrokersIds.toString)
+    if(logger.isTraceEnabled)
+      logger.trace("List of all brokers currently registered in zookeeper = " + allBrokersIds.toString)
     // since we do not have the information about number of partitions on these brokers, just assume single partition
     // i.e. pick partition 0 from each broker as a candidate
     val numBrokerPartitions = TreeSet[Partition]() ++ allBrokersIds.map(b => new Partition(b.toInt, 0))
     // add the rest of the available brokers with default 1 partition for this topic, so all of the brokers
     // participate in hosting this topic.
-    logger.debug("Adding following broker id, partition id for NEW topic: " + topic + "=" + numBrokerPartitions.toString)
+    if(logger.isDebugEnabled)
+      logger.debug("Adding following broker id, partition id for NEW topic: " + topic + "=" + numBrokerPartitions.toString)
     numBrokerPartitions
   }
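
Note that updateInfo swaps both cached maps under zkWatcherLock, the same lock the watcher callbacks and the read paths (getBrokerPartitionInfo, getBrokerInfo) now take, so a producer-triggered refresh cannot interleave with a watcher-driven update. The locking discipline reduced to its essentials (a sketch; CachedClusterView and its members are illustrative, not the committed code):

// Every reader and writer of the two cached maps goes through one lock.
class CachedClusterView(loadTopics: () => Map[String, Set[Int]],
                        loadBrokers: () => Map[Int, String]) {
  private val lock = new Object
  private var topicPartitions = loadTopics()
  private var brokers = loadBrokers()

  def partitionsFor(topic: String): Set[Int] = lock synchronized {
    topicPartitions.getOrElse(topic, Set.empty)
  }

  def brokerFor(id: Int): Option[String] = lock synchronized {
    brokers.get(id)
  }

  // Mirrors updateInfo: rebuild both maps atomically with respect to readers.
  def refresh(): Unit = lock synchronized {
    topicPartitions = loadTopics()
    brokers = loadBrokers()
  }
}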
@@ -154,7 +171,8 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In
       val numPartitions = brokerList.map(bid => ZkUtils.readData(zkClient, brokerTopicPath + "/" + bid).toInt)
       val brokerPartitions = brokerList.map(bid => bid.toInt).zip(numPartitions)
       val sortedBrokerPartitions = brokerPartitions.sortWith((id1, id2) => id1._1 < id2._1)
-      logger.debug("Broker ids and # of partitions on each for topic: " + topic + " = " + sortedBrokerPartitions.toString)
+      if(logger.isDebugEnabled)
+        logger.debug("Broker ids and # of partitions on each for topic: " + topic + " = " + sortedBrokerPartitions.toString)
 
       var brokerParts = SortedSet.empty[Partition]
       sortedBrokerPartitions.foreach { bp =>
@@ -164,7 +182,8 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In
         }
       }
       brokerPartitionsPerTopic += (topic -> brokerParts)
-      logger.debug("Sorted list of broker ids and partition ids on each for topic: " + topic + " = " + brokerParts.toString)
+      if(logger.isDebugEnabled)
+        logger.debug("Sorted list of broker ids and partition ids on each for topic: " + topic + " = " + brokerParts.toString)
     }
     brokerPartitionsPerTopic
   }
@@ -195,26 +214,35 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In
     private var oldBrokerIdMap = collection.mutable.Map.empty[Int, Broker] ++ originalBrokerIdMap
     private val logger = Logger.getLogger(classOf[BrokerTopicsListener])
 
-    logger.debug("[BrokerTopicsListener] Creating broker topics listener to watch the following paths - \n" +
-      "/broker/topics, /broker/topics/topic, /broker/ids")
-    logger.debug("[BrokerTopicsListener] Initialized this broker topics listener with initial mapping of broker id to " +
-      "partition id per topic with " + oldBrokerTopicPartitionsMap.toString)
+    if(logger.isDebugEnabled)
+      logger.debug("[BrokerTopicsListener] Creating broker topics listener to watch the following paths - \n" +
+        "/broker/topics, /broker/topics/topic, /broker/ids")
+    if(logger.isDebugEnabled)
+      logger.debug("[BrokerTopicsListener] Initialized this broker topics listener with initial mapping of broker id to " +
+        "partition id per topic with " + oldBrokerTopicPartitionsMap.toString)
 
     @throws(classOf[Exception])
-    def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
+    def handleChildChange(parentPath : String, currentChildren : java.util.List[String]) {
+      val curChilds: java.util.List[String] = if(currentChildren != null) currentChildren
+                                              else new java.util.ArrayList[String]()
       zkWatcherLock synchronized {
-        logger.trace("Watcher fired for path: " + parentPath)
+        if(logger.isTraceEnabled)
+          logger.trace("Watcher fired for path: " + parentPath + " with change " + curChilds.toString)
         import scala.collection.JavaConversions._
 
         parentPath match {
           case "/brokers/topics" => // this is a watcher for /broker/topics path
             val updatedTopics = asBuffer(curChilds)
-            logger.debug("[BrokerTopicsListener] List of topics changed at " + parentPath + " Updated topics -> " +
-              curChilds.toString)
-            logger.debug("[BrokerTopicsListener] Old list of topics: " + oldBrokerTopicPartitionsMap.keySet.toString)
-            logger.debug("[BrokerTopicsListener] Updated list of topics: " + updatedTopics.toSet.toString)
+            if(logger.isDebugEnabled) {
+              logger.debug("[BrokerTopicsListener] List of topics changed at " + parentPath + " Updated topics -> " +
+                curChilds.toString)
+              logger.debug("[BrokerTopicsListener] Old list of topics: " + oldBrokerTopicPartitionsMap.keySet.toString)
+              logger.debug("[BrokerTopicsListener] Updated list of topics: " + updatedTopics.toSet.toString)
+            }
             val newTopics = updatedTopics.toSet &~ oldBrokerTopicPartitionsMap.keySet
-            logger.debug("[BrokerTopicsListener] List of newly registered topics: " + newTopics.toString)
+            if(logger.isDebugEnabled)
+              logger.debug("[BrokerTopicsListener] List of newly registered topics: " + newTopics.toString)
             newTopics.foreach { topic =>
               val brokerTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic
               val brokerList = ZkUtils.getChildrenParentMayNotExist(zkClient, brokerTopicPath)
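
The null guard at the top of handleChildChange addresses the unexpected exception directly: zkclient may invoke a child-change listener with a null child list (for example, when the watched node is deleted), and the old code fed that straight into asBuffer and toString. Normalizing to an empty list keeps the rest of the handler safe. The same idea in isolation (a sketch, not the committed code):

import java.util.{ArrayList, List => JList}

// zkclient may hand the listener a null child list; normalize it first.
def handleChildren(parentPath: String, currentChildren: JList[String]) {
  val curChilds: JList[String] =
    if(currentChildren != null) currentChildren else new ArrayList[String]()
  // curChilds is never null from here on, so logging and iteration are safe
  println("children of " + parentPath + " = " + curChilds)
}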
@@ -223,15 +251,17 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In
                 brokerTopicsListener)
             }
           case "/brokers/ids" => // this is a watcher for /broker/ids path
-            logger.debug("[BrokerTopicsListener] List of brokers changed in the Kafka cluster " + parentPath +
-              "\t Currently registered list of brokers -> " + curChilds.toString)
+            if(logger.isDebugEnabled)
+              logger.debug("[BrokerTopicsListener] List of brokers changed in the Kafka cluster " + parentPath +
+                "\t Currently registered list of brokers -> " + curChilds.toString)
             processBrokerChange(parentPath, curChilds)
           case _ =>
             val pathSplits = parentPath.split("/")
             val topic = pathSplits.last
             if(pathSplits.length == 4 && pathSplits(2).equals("topics")) {
-              logger.debug("[BrokerTopicsListener] List of brokers changed at " + parentPath + "\t Currently registered " +
-                " list of brokers -> " + curChilds.toString + " for topic -> " + topic)
+              if(logger.isDebugEnabled)
+                logger.debug("[BrokerTopicsListener] List of brokers changed at " + parentPath + "\t Currently registered " +
+                  " list of brokers -> " + curChilds.toString + " for topic -> " + topic)
               processNewBrokerInExistingTopic(topic, asBuffer(curChilds))
             }
         }
@@ -247,17 +277,18 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In
       import scala.collection.JavaConversions._
       val updatedBrokerList = asBuffer(curChilds).map(bid => bid.toInt)
       val newBrokers = updatedBrokerList.toSet &~ oldBrokerIdMap.keySet
-      logger.debug("[BrokerTopicsListener] List of newly registered brokers: " + newBrokers.toString)
+      if(logger.isDebugEnabled) logger.debug("[BrokerTopicsListener] List of newly registered brokers: " + newBrokers.toString)
       newBrokers.foreach { bid =>
         val brokerInfo = ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)
         val brokerHostPort = brokerInfo.split(":")
         allBrokers += (bid -> new Broker(bid, brokerHostPort(1), brokerHostPort(1), brokerHostPort(2).toInt))
-        logger.debug("[BrokerTopicsListener] Invoking the callback for broker: " + bid)
+        if(logger.isDebugEnabled) logger.debug("[BrokerTopicsListener] Invoking the callback for broker: " + bid)
         producerCbk(bid, brokerHostPort(1), brokerHostPort(2).toInt)
       }
       // remove dead brokers from the in memory list of live brokers
       val deadBrokers = oldBrokerIdMap.keySet &~ updatedBrokerList.toSet
-      logger.debug("[BrokerTopicsListener] Deleting broker ids for dead brokers: " + deadBrokers.toString)
+      if(logger.isDebugEnabled)
+        logger.debug("[BrokerTopicsListener] Deleting broker ids for dead brokers: " + deadBrokers.toString)
       deadBrokers.foreach {bid =>
         allBrokers = allBrokers - bid
         // also remove this dead broker from particular topics
@@ -266,7 +297,8 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In
           case Some(oldBrokerPartitionList) =>
             val aliveBrokerPartitionList = oldBrokerPartitionList.filter(bp => bp.brokerId != bid)
             topicBrokerPartitions += (topic -> aliveBrokerPartitionList)
-            logger.debug("[BrokerTopicsListener] Removing dead broker ids for topic: " + topic + "\t " +
-              "Updated list of broker id, partition id = " + aliveBrokerPartitionList.toString)
+            if(logger.isDebugEnabled)
+              logger.debug("[BrokerTopicsListener] Removing dead broker ids for topic: " + topic + "\t " +
+                "Updated list of broker id, partition id = " + aliveBrokerPartitionList.toString)
           case None =>
         }
@@ -285,19 +317,23 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In
       // find the old list of brokers for this topic
       oldBrokerTopicPartitionsMap.get(topic) match {
         case Some(brokersParts) =>
-          logger.debug("[BrokerTopicsListener] Old list of brokers: " + brokersParts.map(bp => bp.brokerId).toString)
+          if(logger.isDebugEnabled)
+            logger.debug("[BrokerTopicsListener] Old list of brokers: " + brokersParts.map(bp => bp.brokerId).toString)
         case None =>
       }
 
       val updatedBrokerList = curChilds.map(b => b.toInt)
       import ZKBrokerPartitionInfo._
       val updatedBrokerParts:SortedSet[Partition] = getBrokerPartitions(zkClient, topic, updatedBrokerList.toList)
-      logger.debug("[BrokerTopicsListener] Currently registered list of brokers for topic: " + topic + " are " +
-        curChilds.toString)
+      if(logger.isDebugEnabled)
+        logger.debug("[BrokerTopicsListener] Currently registered list of brokers for topic: " + topic + " are " +
+          curChilds.toString)
 
       // update the number of partitions on existing brokers
       var mergedBrokerParts: SortedSet[Partition] = TreeSet[Partition]() ++ updatedBrokerParts
       topicBrokerPartitions.get(topic) match {
         case Some(oldBrokerParts) =>
-          logger.debug("[BrokerTopicsListener] Unregistered list of brokers for topic: " + topic + " are " +
-            oldBrokerParts.toString)
+          if(logger.isDebugEnabled)
+            logger.debug("[BrokerTopicsListener] Unregistered list of brokers for topic: " + topic + " are " +
+              oldBrokerParts.toString)
           mergedBrokerParts = oldBrokerParts ++ updatedBrokerParts
         case None =>
@@ -305,16 +341,24 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In
       // keep only brokers that are alive
       mergedBrokerParts = mergedBrokerParts.filter(bp => allBrokers.contains(bp.brokerId))
       topicBrokerPartitions += (topic -> mergedBrokerParts)
-      logger.debug("[BrokerTopicsListener] List of broker partitions for topic: " + topic + " are " + mergedBrokerParts.toString)
+      if(logger.isDebugEnabled)
+        logger.debug("[BrokerTopicsListener] List of broker partitions for topic: " + topic + " are " +
+          mergedBrokerParts.toString)
     }
 
     def resetState = {
-      logger.debug("[BrokerTopicsListener] Before reseting broker topic partitions state " + oldBrokerTopicPartitionsMap.toString)
+      if(logger.isTraceEnabled)
+        logger.trace("[BrokerTopicsListener] Before reseting broker topic partitions state " +
+          oldBrokerTopicPartitionsMap.toString)
       oldBrokerTopicPartitionsMap = collection.mutable.Map.empty[String, SortedSet[Partition]] ++ topicBrokerPartitions
-      logger.debug("[BrokerTopicsListener] After reseting broker topic partitions state " + oldBrokerTopicPartitionsMap.toString)
-      logger.debug("[BrokerTopicsListener] Before reseting broker id map state " + oldBrokerIdMap.toString)
+      if(logger.isDebugEnabled)
+        logger.debug("[BrokerTopicsListener] After reseting broker topic partitions state " +
+          oldBrokerTopicPartitionsMap.toString)
+      if(logger.isTraceEnabled)
+        logger.trace("[BrokerTopicsListener] Before reseting broker id map state " + oldBrokerIdMap.toString)
       oldBrokerIdMap = collection.mutable.Map.empty[Int, Broker] ++ allBrokers
-      logger.debug("[BrokerTopicsListener] After reseting broker id map state " + oldBrokerIdMap.toString)
+      if(logger.isDebugEnabled)
+        logger.debug("[BrokerTopicsListener] After reseting broker id map state " + oldBrokerIdMap.toString)
     }
   }
@@ -322,7 +366,8 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In
    * Handles the session expiration event in zookeeper
    */
+  class ZKSessionExpirationListener(val brokerTopicsListener: BrokerTopicsListener)
     extends IZkStateListener {
     @throws(classOf[Exception])
     def handleStateChanged(state: KeeperState) {
       // do nothing, since zkclient will do reconnect for us.
@@ -350,7 +395,7 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In
       // NOTE: this is probably not required here. Since when we read from getZKTopicPartitionInfo() above,
       // it automatically recreates the watchers there itself
       topicBrokerPartitions.keySet.foreach(topic => zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath + "/" + topic,
         brokerTopicsListener))
       // there is no need to re-register other listeners as they are listening on the child changes of
       // permanent nodes
     }

ProducerTest.scala

@@ -169,10 +169,10 @@ class ProducerTest extends JUnitSuite {
     // 2 sync producers
     val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]()
     val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
-    // it should send to random partition on broker 1
+    // it should send to partition 0 due to the StaticPartitioner
     val messageList = new java.util.ArrayList[Message]
     messageList.add(new Message("t".getBytes()))
-    syncProducer1.send(topic, -1, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messageList))
+    syncProducer1.send(topic, 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messageList))
     EasyMock.expectLastCall
     syncProducer1.close
     EasyMock.expectLastCall
@@ -368,7 +368,7 @@ class ProducerTest extends JUnitSuite {
     val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
     val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
     // it should send to partition 0 (first partition) on second broker i.e broker2
-    asyncProducer1.send(topic, "test1", -1)
+    asyncProducer1.send(topic, "test1", 0)
     EasyMock.expectLastCall
     asyncProducer1.close
     EasyMock.expectLastCall
@@ -554,7 +554,6 @@ class ProducerTest extends JUnitSuite {
     val messageList = new java.util.ArrayList[Message]
     messageList.add(new Message("test".getBytes()))
-    tempProducer.send("test-topic", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messageList))
     Thread.sleep(500)
     val messagesContent = new java.util.ArrayList[String]
@@ -585,7 +584,7 @@ class ProducerTest extends JUnitSuite {
     val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
     val asyncProducer2 = EasyMock.createMock(classOf[AsyncProducer[String]])
     // it should send to partition 0 (first partition) on second broker i.e broker2
-    asyncProducer1.send(topic, "test1", -1)
+    asyncProducer1.send(topic, "test1", 0)
     EasyMock.expectLastCall
     asyncProducer1.close
     EasyMock.expectLastCall

ProducerTest.scala

@@ -155,8 +155,8 @@ class ProducerTest extends JUnitSuite {
     // 2 sync producers
     val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]()
     val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
-    // it should send to random partition on broker 1
-    syncProducer1.send(topic, -1, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("t".getBytes())))
+    // it should send to partition 0 due to the StaticPartitioner
+    syncProducer1.send(topic, 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("t".getBytes())))
     EasyMock.expectLastCall
     syncProducer1.close
     EasyMock.expectLastCall
@@ -374,7 +374,7 @@ class ProducerTest extends JUnitSuite {
     val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
     val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
     // it should send to partition 0 (first partition) on second broker i.e broker2
-    asyncProducer1.send(topic, "test1", -1)
+    asyncProducer1.send(topic, "test1", 0)
     EasyMock.expectLastCall
     asyncProducer1.close
     EasyMock.expectLastCall
@@ -610,9 +610,9 @@ class ProducerTest extends JUnitSuite {
     val serverConfig = new KafkaConfig(serverProps) {
       override val numPartitions = 4
     }
     val server3 = TestUtils.createServer(serverConfig)
     Thread.sleep(500)
     // send a message to the new broker to register it under topic "test-topic"
     val tempProps = new Properties()
     tempProps.put("host", "localhost")
@@ -622,7 +622,6 @@ class ProducerTest extends JUnitSuite {
       messages = new Message("test".getBytes())))
     Thread.sleep(500)
     producer.send(new ProducerData[String, String]("test-topic", "test-topic", Array("test1")))
     producer.close
@@ -648,7 +647,7 @@ class ProducerTest extends JUnitSuite {
     val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
     val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
     // it should send to partition 0 (first partition) on second broker i.e broker2
-    asyncProducer1.send(topic, "test1", -1)
+    asyncProducer1.send(topic, "test1", 0)
     EasyMock.expectLastCall
     asyncProducer1.close
     EasyMock.expectLastCall