From dd62e77f90f5f6fb9050627a9cfa40b35dae67b1 Mon Sep 17 00:00:00 2001 From: Neha Narkhede Date: Tue, 4 Oct 2011 22:43:00 +0000 Subject: [PATCH] KAFKA-129 ZK-based producer can throw an unexpceted exception when sending a message; patched by nehanarkhede; reviewed by junrao git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1178997 13f79535-47bb-0310-9956-ffa450edef68 --- .../kafka/producer/BrokerPartitionInfo.scala | 9 ++ .../producer/ConfigBrokerPartitionInfo.scala | 6 +- .../main/scala/kafka/producer/Producer.scala | 94 +++++++---- .../scala/kafka/producer/ProducerConfig.scala | 14 +- .../producer/ZKBrokerPartitionInfo.scala | 151 ++++++++++++------ .../kafka/javaapi/producer/ProducerTest.scala | 9 +- .../unit/kafka/producer/ProducerTest.scala | 11 +- 7 files changed, 198 insertions(+), 96 deletions(-) diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala index a28e2dcbb2a..e04440a6ccc 100644 --- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala +++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala @@ -43,6 +43,15 @@ trait BrokerPartitionInfo { */ def getAllBrokerInfo: Map[Int, Broker] + /** + * This is relevant to the ZKBrokerPartitionInfo. It updates the ZK cache + * by reading from zookeeper and recreating the data structures. This API + * is invoked by the producer, when it detects that the ZK cache of + * ZKBrokerPartitionInfo is stale. + * + */ + def updateInfo + /** * Cleanup */ diff --git a/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala index 3ec780c8502..3180de3cde3 100644 --- a/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala +++ b/core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala @@ -53,13 +53,15 @@ private[producer] class ConfigBrokerPartitionInfo(config: ProducerConfig) extend def close {} + def updateInfo = {} + /** * Generate a sequence of (brokerId, numPartitions) for all brokers * specified in the producer configuration * @return sequence of (brokerId, numPartitions) */ private def getConfigTopicPartitionInfo(): SortedSet[Partition] = { - val brokerInfoList = config.brokerPartitionInfo.split(",") + val brokerInfoList = config.brokerList.split(",") if(brokerInfoList.size == 0) throw new InvalidConfigException("broker.list is empty") // check if each individual broker info is valid => (brokerId: brokerHost: brokerPort) brokerInfoList.foreach { bInfo => @@ -84,7 +86,7 @@ private[producer] class ConfigBrokerPartitionInfo(config: ProducerConfig) extend */ private def getConfigBrokerInfo(): Map[Int, Broker] = { val brokerInfo = new HashMap[Int, Broker]() - val brokerInfoList = config.brokerPartitionInfo.split(",") + val brokerInfoList = config.brokerList.split(",") brokerInfoList.foreach{ bInfo => val brokerIdHostPort = bInfo.split(":") brokerInfo += (brokerIdHostPort(0).toInt -> new Broker(brokerIdHostPort(0).toInt, brokerIdHostPort(1), diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala index 277496174ea..0269d719133 100644 --- a/core/src/main/scala/kafka/producer/Producer.scala +++ b/core/src/main/scala/kafka/producer/Producer.scala @@ -23,7 +23,6 @@ import kafka.utils._ import java.util.Properties import kafka.cluster.{Partition, Broker} import java.util.concurrent.atomic.AtomicBoolean -import kafka.api.ProducerRequest import kafka.common.{NoBrokersForPartitionException, InvalidConfigException, InvalidPartitionException} class Producer[K,V](config: ProducerConfig, @@ -35,9 +34,9 @@ class Producer[K,V](config: ProducerConfig, { private val logger = Logger.getLogger(classOf[Producer[K, V]]) private val hasShutdown = new AtomicBoolean(false) - if(!Utils.propertyExists(config.zkConnect) && !Utils.propertyExists(config.brokerPartitionInfo)) + if(!Utils.propertyExists(config.zkConnect) && !Utils.propertyExists(config.brokerList)) throw new InvalidConfigException("At least one of zk.connect or broker.list must be specified") - if (Utils.propertyExists(config.zkConnect) && Utils.propertyExists(config.brokerPartitionInfo)) + if (Utils.propertyExists(config.zkConnect) && Utils.propertyExists(config.brokerList)) logger.warn("Both zk.connect and broker.list provided (zk.connect takes precedence).") private val random = new java.util.Random // check if zookeeper based auto partition discovery is enabled @@ -94,47 +93,86 @@ class Producer[K,V](config: ProducerConfig, partitioner: Partitioner[K]) = this(config, if(partitioner == null) new DefaultPartitioner[K] else partitioner, new ProducerPool[V](config, encoder, eventHandler, cbkHandler), true, null) + /** * Sends the data, partitioned by key to the topic using either the * synchronous or the asynchronous producer * @param producerData the producer data object that encapsulates the topic, key and message data */ def send(producerData: ProducerData[K,V]*) { + zkEnabled match { + case true => zkSend(producerData: _*) + case false => configSend(producerData: _*) + } + } + + private def zkSend(producerData: ProducerData[K,V]*) { val producerPoolRequests = producerData.map { pd => - // find the number of broker partitions registered for this topic - logger.debug("Getting the number of broker partitions registered for topic: " + pd.getTopic) - val numBrokerPartitions = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic).toSeq - logger.debug("Broker partitions registered for topic: " + pd.getTopic + " = " + numBrokerPartitions) - val totalNumPartitions = numBrokerPartitions.length - if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition = " + pd.getKey) + var brokerIdPartition: Option[Partition] = None + var brokerInfoOpt: Option[Broker] = None + + var numRetries: Int = 0 + while(numRetries <= config.zkReadRetries && brokerInfoOpt.isEmpty) { + if(numRetries > 0) { + logger.info("Try #" + numRetries + " ZK producer cache is stale. Refreshing it by reading from ZK again") + brokerPartitionInfo.updateInfo + } + + val numBrokerPartitions = getNumPartitionsForTopic(pd) + val totalNumPartitions = numBrokerPartitions.length - var brokerIdPartition: Partition = null - var partition: Int = 0 - if(zkEnabled) { - // get the partition id val partitionId = getPartition(pd.getKey, totalNumPartitions) - brokerIdPartition = numBrokerPartitions(partitionId) - val brokerInfo = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.brokerId).get - logger.debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port + - " on partition " + brokerIdPartition.partId) - partition = brokerIdPartition.partId - }else { - // randomly select a broker - val randomBrokerId = random.nextInt(totalNumPartitions) - brokerIdPartition = numBrokerPartitions(randomBrokerId) - val brokerInfo = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.brokerId).get + brokerIdPartition = Some(numBrokerPartitions(partitionId)) + brokerInfoOpt = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.get.brokerId) + numRetries += 1 + } - logger.debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port + - " on a randomly chosen partition") - partition = ProducerRequest.RandomPartition + brokerInfoOpt match { + case Some(brokerInfo) => + if(logger.isDebugEnabled) logger.debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port + + " on partition " + brokerIdPartition.get.partId) + case None => + throw new NoBrokersForPartitionException("Invalid Zookeeper state. Failed to get partition for topic: " + + pd.getTopic + " and key: " + pd.getKey) } producerPool.getProducerPoolData(pd.getTopic, - new Partition(brokerIdPartition.brokerId, partition), - pd.getData) + new Partition(brokerIdPartition.get.brokerId, brokerIdPartition.get.partId), + pd.getData) } producerPool.send(producerPoolRequests: _*) } + private def configSend(producerData: ProducerData[K,V]*) { + val producerPoolRequests = producerData.map { pd => + // find the broker partitions registered for this topic + val numBrokerPartitions = getNumPartitionsForTopic(pd) + val totalNumPartitions = numBrokerPartitions.length + + val partitionId = getPartition(pd.getKey, totalNumPartitions) + val brokerIdPartition = numBrokerPartitions(partitionId) + val brokerInfo = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.brokerId).get + + if(logger.isDebugEnabled) + logger.debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port + " on a partition " + + brokerIdPartition.partId) + producerPool.getProducerPoolData(pd.getTopic, + new Partition(brokerIdPartition.brokerId, brokerIdPartition.partId), + pd.getData) + } + producerPool.send(producerPoolRequests: _*) + } + + private def getNumPartitionsForTopic(pd: ProducerData[K,V]): Seq[Partition] = { + if(logger.isDebugEnabled) + logger.debug("Getting the number of broker partitions registered for topic: " + pd.getTopic) + val numBrokerPartitions = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic).toSeq + if(logger.isDebugEnabled) + logger.debug("Broker partitions registered for topic: " + pd.getTopic + " = " + numBrokerPartitions) + val totalNumPartitions = numBrokerPartitions.length + if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition = " + pd.getKey) + numBrokerPartitions + } + /** * Retrieves the partition id and throws an InvalidPartitionException if * the value of partition is not between 0 and numPartitions-1 diff --git a/core/src/main/scala/kafka/producer/ProducerConfig.scala b/core/src/main/scala/kafka/producer/ProducerConfig.scala index 4548f33bcec..b62143940e0 100644 --- a/core/src/main/scala/kafka/producer/ProducerConfig.scala +++ b/core/src/main/scala/kafka/producer/ProducerConfig.scala @@ -28,8 +28,8 @@ class ProducerConfig(val props: Properties) extends ZKConfig(props) /** For bypassing zookeeper based auto partition discovery, use this config * * to pass in static broker and per-broker partition information. Format- * * brokerid1:host1:port1, brokerid2:host2:port2*/ - val brokerPartitionInfo = Utils.getString(props, "broker.list", null) - if(brokerPartitionInfo != null && Utils.getString(props, "partitioner.class", null) != null) + val brokerList = Utils.getString(props, "broker.list", null) + if(brokerList != null && Utils.getString(props, "partitioner.class", null) != null) throw new InvalidConfigException("partitioner.class cannot be used when broker.list is set") /** the partitioner class for partitioning events amongst sub-topics */ @@ -58,4 +58,14 @@ class ProducerConfig(val props: Properties) extends ZKConfig(props) * If the compression codec is NoCompressionCodec, compression is disabled for all topics */ val compressedTopics = Utils.getCSVList(Utils.getString(props, "compressed.topics", null)) + + /** + * The producer using the zookeeper software load balancer maintains a ZK cache that gets + * updated by the zookeeper watcher listeners. During some events like a broker bounce, the + * producer ZK cache can get into an inconsistent state, for a small time period. In this time + * period, it could end up picking a broker partition that is unavailable. When this happens, the + * ZK cache needs to be updated. + * This parameter specifies the number of times the producer attempts to refresh this ZK cache. + */ + val zkReadRetries = Utils.getInt(props, "zk.read.num.retries", 3) } diff --git a/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala index 1610a4c2e15..84d7d8766e0 100644 --- a/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala +++ b/core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala @@ -13,7 +13,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ package kafka.producer import kafka.utils.{ZKStringSerializer, ZkUtils, ZKConfig} @@ -27,6 +27,7 @@ import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient} import collection.SortedSet private[producer] object ZKBrokerPartitionInfo { + /** * Generate a mapping from broker id to (brokerId, numPartitions) for the list of brokers * specified @@ -49,7 +50,7 @@ private[producer] object ZKBrokerPartitionInfo { } } brokerParts - } + } } /** @@ -89,22 +90,24 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In * @return a sequence of (brokerId, numPartitions). Returns a zero-length * sequence if no brokers are available. */ - def getBrokerPartitionInfo(topic: String): scala.collection.immutable.SortedSet[Partition] = { - val brokerPartitions = topicBrokerPartitions.get(topic) - var numBrokerPartitions = SortedSet.empty[Partition] - brokerPartitions match { - case Some(bp) => - bp.size match { - case 0 => // no brokers currently registered for this topic. Find the list of all brokers in the cluster. - numBrokerPartitions = bootstrapWithExistingBrokers(topic) - topicBrokerPartitions += (topic -> numBrokerPartitions) - case _ => numBrokerPartitions = TreeSet[Partition]() ++ bp - } - case None => // no brokers currently registered for this topic. Find the list of all brokers in the cluster. - numBrokerPartitions = bootstrapWithExistingBrokers(topic) - topicBrokerPartitions += (topic -> numBrokerPartitions) + def getBrokerPartitionInfo(topic: String): SortedSet[Partition] = { + zkWatcherLock synchronized { + val brokerPartitions = topicBrokerPartitions.get(topic) + var numBrokerPartitions = SortedSet.empty[Partition] + brokerPartitions match { + case Some(bp) => + bp.size match { + case 0 => // no brokers currently registered for this topic. Find the list of all brokers in the cluster. + numBrokerPartitions = bootstrapWithExistingBrokers(topic) + topicBrokerPartitions += (topic -> numBrokerPartitions) + case _ => numBrokerPartitions = TreeSet[Partition]() ++ bp + } + case None => // no brokers currently registered for this topic. Find the list of all brokers in the cluster. + numBrokerPartitions = bootstrapWithExistingBrokers(topic) + topicBrokerPartitions += (topic -> numBrokerPartitions) + } + numBrokerPartitions } - numBrokerPartitions } /** @@ -113,7 +116,11 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In * @param brokerId the broker for which the info is to be returned * @return host and port of brokerId */ - def getBrokerInfo(brokerId: Int): Option[Broker] = allBrokers.get(brokerId) + def getBrokerInfo(brokerId: Int): Option[Broker] = { + zkWatcherLock synchronized { + allBrokers.get(brokerId) + } + } /** * Generate a mapping from broker id to the host and port for all brokers @@ -123,18 +130,28 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In def close = zkClient.close + def updateInfo = { + zkWatcherLock synchronized { + topicBrokerPartitions = getZKTopicPartitionInfo + allBrokers = getZKBrokerInfo + } + } + private def bootstrapWithExistingBrokers(topic: String): scala.collection.immutable.SortedSet[Partition] = { - logger.debug("Currently, no brokers are registered under topic: " + topic) - logger.debug("Bootstrapping topic: " + topic + " with available brokers in the cluster with default " + + if(logger.isDebugEnabled) logger.debug("Currently, no brokers are registered under topic: " + topic) + if(logger.isDebugEnabled) + logger.debug("Bootstrapping topic: " + topic + " with available brokers in the cluster with default " + "number of partitions = 1") val allBrokersIds = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerIdsPath) - logger.trace("List of all brokers currently registered in zookeeper = " + allBrokersIds.toString) + if(logger.isTraceEnabled) + logger.trace("List of all brokers currently registered in zookeeper = " + allBrokersIds.toString) // since we do not have the in formation about number of partitions on these brokers, just assume single partition // i.e. pick partition 0 from each broker as a candidate val numBrokerPartitions = TreeSet[Partition]() ++ allBrokersIds.map(b => new Partition(b.toInt, 0)) // add the rest of the available brokers with default 1 partition for this topic, so all of the brokers // participate in hosting this topic. - logger.debug("Adding following broker id, partition id for NEW topic: " + topic + "=" + numBrokerPartitions.toString) + if(logger.isDebugEnabled) + logger.debug("Adding following broker id, partition id for NEW topic: " + topic + "=" + numBrokerPartitions.toString) numBrokerPartitions } @@ -154,7 +171,8 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In val numPartitions = brokerList.map(bid => ZkUtils.readData(zkClient, brokerTopicPath + "/" + bid).toInt) val brokerPartitions = brokerList.map(bid => bid.toInt).zip(numPartitions) val sortedBrokerPartitions = brokerPartitions.sortWith((id1, id2) => id1._1 < id2._1) - logger.debug("Broker ids and # of partitions on each for topic: " + topic + " = " + sortedBrokerPartitions.toString) + if(logger.isDebugEnabled) + logger.debug("Broker ids and # of partitions on each for topic: " + topic + " = " + sortedBrokerPartitions.toString) var brokerParts = SortedSet.empty[Partition] sortedBrokerPartitions.foreach { bp => @@ -164,7 +182,8 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In } } brokerPartitionsPerTopic += (topic -> brokerParts) - logger.debug("Sorted list of broker ids and partition ids on each for topic: " + topic + " = " + brokerParts.toString) + if(logger.isDebugEnabled) + logger.debug("Sorted list of broker ids and partition ids on each for topic: " + topic + " = " + brokerParts.toString) } brokerPartitionsPerTopic } @@ -195,26 +214,35 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In private var oldBrokerIdMap = collection.mutable.Map.empty[Int, Broker] ++ originalBrokerIdMap private val logger = Logger.getLogger(classOf[BrokerTopicsListener]) - logger.debug("[BrokerTopicsListener] Creating broker topics listener to watch the following paths - \n" + - "/broker/topics, /broker/topics/topic, /broker/ids") - logger.debug("[BrokerTopicsListener] Initialized this broker topics listener with initial mapping of broker id to " + + if(logger.isDebugEnabled) + logger.debug("[BrokerTopicsListener] Creating broker topics listener to watch the following paths - \n" + + "/broker/topics, /broker/topics/topic, /broker/ids") + if(logger.isDebugEnabled) + logger.debug("[BrokerTopicsListener] Initialized this broker topics listener with initial mapping of broker id to " + "partition id per topic with " + oldBrokerTopicPartitionsMap.toString) @throws(classOf[Exception]) - def handleChildChange(parentPath : String, curChilds : java.util.List[String]) { + def handleChildChange(parentPath : String, currentChildren : java.util.List[String]) { + val curChilds: java.util.List[String] = if(currentChildren != null) currentChildren + else new java.util.ArrayList[String]() + zkWatcherLock synchronized { - logger.trace("Watcher fired for path: " + parentPath) + if(logger.isTraceEnabled) + logger.trace("Watcher fired for path: " + parentPath + " with change " + curChilds.toString) import scala.collection.JavaConversions._ parentPath match { case "/brokers/topics" => // this is a watcher for /broker/topics path val updatedTopics = asBuffer(curChilds) - logger.debug("[BrokerTopicsListener] List of topics changed at " + parentPath + " Updated topics -> " + - curChilds.toString) - logger.debug("[BrokerTopicsListener] Old list of topics: " + oldBrokerTopicPartitionsMap.keySet.toString) - logger.debug("[BrokerTopicsListener] Updated list of topics: " + updatedTopics.toSet.toString) + if(logger.isDebugEnabled) { + logger.debug("[BrokerTopicsListener] List of topics changed at " + parentPath + " Updated topics -> " + + curChilds.toString) + logger.debug("[BrokerTopicsListener] Old list of topics: " + oldBrokerTopicPartitionsMap.keySet.toString) + logger.debug("[BrokerTopicsListener] Updated list of topics: " + updatedTopics.toSet.toString) + } val newTopics = updatedTopics.toSet &~ oldBrokerTopicPartitionsMap.keySet - logger.debug("[BrokerTopicsListener] List of newly registered topics: " + newTopics.toString) + if(logger.isDebugEnabled) + logger.debug("[BrokerTopicsListener] List of newly registered topics: " + newTopics.toString) newTopics.foreach { topic => val brokerTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic val brokerList = ZkUtils.getChildrenParentMayNotExist(zkClient, brokerTopicPath) @@ -223,15 +251,17 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In brokerTopicsListener) } case "/brokers/ids" => // this is a watcher for /broker/ids path - logger.debug("[BrokerTopicsListener] List of brokers changed in the Kafka cluster " + parentPath + - "\t Currently registered list of brokers -> " + curChilds.toString) + if(logger.isDebugEnabled) + logger.debug("[BrokerTopicsListener] List of brokers changed in the Kafka cluster " + parentPath + + "\t Currently registered list of brokers -> " + curChilds.toString) processBrokerChange(parentPath, curChilds) case _ => val pathSplits = parentPath.split("/") val topic = pathSplits.last if(pathSplits.length == 4 && pathSplits(2).equals("topics")) { - logger.debug("[BrokerTopicsListener] List of brokers changed at " + parentPath + "\t Currently registered " + - " list of brokers -> " + curChilds.toString + " for topic -> " + topic) + if(logger.isDebugEnabled) + logger.debug("[BrokerTopicsListener] List of brokers changed at " + parentPath + "\t Currently registered " + + " list of brokers -> " + curChilds.toString + " for topic -> " + topic) processNewBrokerInExistingTopic(topic, asBuffer(curChilds)) } } @@ -247,17 +277,18 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In import scala.collection.JavaConversions._ val updatedBrokerList = asBuffer(curChilds).map(bid => bid.toInt) val newBrokers = updatedBrokerList.toSet &~ oldBrokerIdMap.keySet - logger.debug("[BrokerTopicsListener] List of newly registered brokers: " + newBrokers.toString) + if(logger.isDebugEnabled) logger.debug("[BrokerTopicsListener] List of newly registered brokers: " + newBrokers.toString) newBrokers.foreach { bid => val brokerInfo = ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + bid) val brokerHostPort = brokerInfo.split(":") allBrokers += (bid -> new Broker(bid, brokerHostPort(1), brokerHostPort(1), brokerHostPort(2).toInt)) - logger.debug("[BrokerTopicsListener] Invoking the callback for broker: " + bid) + if(logger.isDebugEnabled) logger.debug("[BrokerTopicsListener] Invoking the callback for broker: " + bid) producerCbk(bid, brokerHostPort(1), brokerHostPort(2).toInt) } // remove dead brokers from the in memory list of live brokers val deadBrokers = oldBrokerIdMap.keySet &~ updatedBrokerList.toSet - logger.debug("[BrokerTopicsListener] Deleting broker ids for dead brokers: " + deadBrokers.toString) + if(logger.isDebugEnabled) + logger.debug("[BrokerTopicsListener] Deleting broker ids for dead brokers: " + deadBrokers.toString) deadBrokers.foreach {bid => allBrokers = allBrokers - bid // also remove this dead broker from particular topics @@ -266,7 +297,8 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In case Some(oldBrokerPartitionList) => val aliveBrokerPartitionList = oldBrokerPartitionList.filter(bp => bp.brokerId != bid) topicBrokerPartitions += (topic -> aliveBrokerPartitionList) - logger.debug("[BrokerTopicsListener] Removing dead broker ids for topic: " + topic + "\t " + + if(logger.isDebugEnabled) + logger.debug("[BrokerTopicsListener] Removing dead broker ids for topic: " + topic + "\t " + "Updated list of broker id, partition id = " + aliveBrokerPartitionList.toString) case None => } @@ -285,19 +317,23 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In // find the old list of brokers for this topic oldBrokerTopicPartitionsMap.get(topic) match { case Some(brokersParts) => - logger.debug("[BrokerTopicsListener] Old list of brokers: " + brokersParts.map(bp => bp.brokerId).toString) + if(logger.isDebugEnabled) + logger.debug("[BrokerTopicsListener] Old list of brokers: " + brokersParts.map(bp => bp.brokerId).toString) case None => } + val updatedBrokerList = curChilds.map(b => b.toInt) import ZKBrokerPartitionInfo._ val updatedBrokerParts:SortedSet[Partition] = getBrokerPartitions(zkClient, topic, updatedBrokerList.toList) - logger.debug("[BrokerTopicsListener] Currently registered list of brokers for topic: " + topic + " are " + - curChilds.toString) + if(logger.isDebugEnabled) + logger.debug("[BrokerTopicsListener] Currently registered list of brokers for topic: " + topic + " are " + + curChilds.toString) // update the number of partitions on existing brokers var mergedBrokerParts: SortedSet[Partition] = TreeSet[Partition]() ++ updatedBrokerParts topicBrokerPartitions.get(topic) match { case Some(oldBrokerParts) => - logger.debug("[BrokerTopicsListener] Unregistered list of brokers for topic: " + topic + " are " + + if(logger.isDebugEnabled) + logger.debug("[BrokerTopicsListener] Unregistered list of brokers for topic: " + topic + " are " + oldBrokerParts.toString) mergedBrokerParts = oldBrokerParts ++ updatedBrokerParts case None => @@ -305,16 +341,24 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In // keep only brokers that are alive mergedBrokerParts = mergedBrokerParts.filter(bp => allBrokers.contains(bp.brokerId)) topicBrokerPartitions += (topic -> mergedBrokerParts) - logger.debug("[BrokerTopicsListener] List of broker partitions for topic: " + topic + " are " + mergedBrokerParts.toString) + if(logger.isDebugEnabled) + logger.debug("[BrokerTopicsListener] List of broker partitions for topic: " + topic + " are " + + mergedBrokerParts.toString) } def resetState = { - logger.debug("[BrokerTopicsListener] Before reseting broker topic partitions state " + oldBrokerTopicPartitionsMap.toString) + if(logger.isTraceEnabled) + logger.trace("[BrokerTopicsListener] Before reseting broker topic partitions state " + + oldBrokerTopicPartitionsMap.toString) oldBrokerTopicPartitionsMap = collection.mutable.Map.empty[String, SortedSet[Partition]] ++ topicBrokerPartitions - logger.debug("[BrokerTopicsListener] After reseting broker topic partitions state " + oldBrokerTopicPartitionsMap.toString) - logger.debug("[BrokerTopicsListener] Before reseting broker id map state " + oldBrokerIdMap.toString) + if(logger.isDebugEnabled) + logger.debug("[BrokerTopicsListener] After reseting broker topic partitions state " + + oldBrokerTopicPartitionsMap.toString) + if(logger.isTraceEnabled) + logger.trace("[BrokerTopicsListener] Before reseting broker id map state " + oldBrokerIdMap.toString) oldBrokerIdMap = collection.mutable.Map.empty[Int, Broker] ++ allBrokers - logger.debug("[BrokerTopicsListener] After reseting broker id map state " + oldBrokerIdMap.toString) + if(logger.isDebugEnabled) + logger.debug("[BrokerTopicsListener] After reseting broker id map state " + oldBrokerIdMap.toString) } } @@ -322,7 +366,8 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In * Handles the session expiration event in zookeeper */ class ZKSessionExpirationListener(val brokerTopicsListener: BrokerTopicsListener) - extends IZkStateListener { + extends IZkStateListener { + @throws(classOf[Exception]) def handleStateChanged(state: KeeperState) { // do nothing, since zkclient will do reconnect for us. @@ -350,7 +395,7 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In // NOTE: this is probably not required here. Since when we read from getZKTopicPartitionInfo() above, // it automatically recreates the watchers there itself topicBrokerPartitions.keySet.foreach(topic => zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath + "/" + topic, - brokerTopicsListener)) + brokerTopicsListener)) // there is no need to re-register other listeners as they are listening on the child changes of // permanent nodes } diff --git a/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala index a0ab883dcbe..35029e56f44 100644 --- a/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala @@ -169,10 +169,10 @@ class ProducerTest extends JUnitSuite { // 2 sync producers val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]() val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer]) - // it should send to random partition on broker 1 + // it should send to partition 0 due to the StaticPartitioner val messageList = new java.util.ArrayList[Message] messageList.add(new Message("t".getBytes())) - syncProducer1.send(topic, -1, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messageList)) + syncProducer1.send(topic, 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messageList)) EasyMock.expectLastCall syncProducer1.close EasyMock.expectLastCall @@ -368,7 +368,7 @@ class ProducerTest extends JUnitSuite { val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]() val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]]) // it should send to partition 0 (first partition) on second broker i.e broker2 - asyncProducer1.send(topic, "test1", -1) + asyncProducer1.send(topic, "test1", 0) EasyMock.expectLastCall asyncProducer1.close EasyMock.expectLastCall @@ -554,7 +554,6 @@ class ProducerTest extends JUnitSuite { val messageList = new java.util.ArrayList[Message] messageList.add(new Message("test".getBytes())) tempProducer.send("test-topic", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messageList)) - Thread.sleep(500) val messagesContent = new java.util.ArrayList[String] @@ -585,7 +584,7 @@ class ProducerTest extends JUnitSuite { val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]]) val asyncProducer2 = EasyMock.createMock(classOf[AsyncProducer[String]]) // it should send to partition 0 (first partition) on second broker i.e broker2 - asyncProducer1.send(topic, "test1", -1) + asyncProducer1.send(topic, "test1", 0) EasyMock.expectLastCall asyncProducer1.close EasyMock.expectLastCall diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index 2342a80ca85..db0acdd8ed8 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -155,8 +155,8 @@ class ProducerTest extends JUnitSuite { // 2 sync producers val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]() val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer]) - // it should send to random partition on broker 1 - syncProducer1.send(topic, -1, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("t".getBytes()))) + // it should send to partition 0 due to the StaticPartitioner + syncProducer1.send(topic, 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("t".getBytes()))) EasyMock.expectLastCall syncProducer1.close EasyMock.expectLastCall @@ -374,7 +374,7 @@ class ProducerTest extends JUnitSuite { val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]() val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]]) // it should send to partition 0 (first partition) on second broker i.e broker2 - asyncProducer1.send(topic, "test1", -1) + asyncProducer1.send(topic, "test1", 0) EasyMock.expectLastCall asyncProducer1.close EasyMock.expectLastCall @@ -610,9 +610,9 @@ class ProducerTest extends JUnitSuite { val serverConfig = new KafkaConfig(serverProps) { override val numPartitions = 4 } + val server3 = TestUtils.createServer(serverConfig) Thread.sleep(500) - // send a message to the new broker to register it under topic "test-topic" val tempProps = new Properties() tempProps.put("host", "localhost") @@ -622,7 +622,6 @@ class ProducerTest extends JUnitSuite { messages = new Message("test".getBytes()))) Thread.sleep(500) - producer.send(new ProducerData[String, String]("test-topic", "test-topic", Array("test1"))) producer.close @@ -648,7 +647,7 @@ class ProducerTest extends JUnitSuite { val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]() val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]]) // it should send to partition 0 (first partition) on second broker i.e broker2 - asyncProducer1.send(topic, "test1", -1) + asyncProducer1.send(topic, "test1", 0) EasyMock.expectLastCall asyncProducer1.close EasyMock.expectLastCall