kafka-899; LeaderNotAvailableException the first time a new message for a partition is processed; patched by Jun Rao; reviewed by Neha Narkhede

This commit is contained in:
Jun Rao 2013-05-30 22:16:47 -07:00
parent d93cbc610a
commit 1caae2c2a1
3 changed files with 22 additions and 41 deletions

View File

@ -1,25 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.common
/**
* Thrown when a request is made for a topic, that hasn't been created in a Kafka cluster
*/
class UnknownTopicException(message: String) extends RuntimeException(message) {
def this() = this(null)
}

View File

@ -149,7 +149,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
try {
for (message <- messages) {
val topicPartitionsList = getPartitionListForTopic(message)
val partitionIndex = getPartition(message.key, topicPartitionsList)
val partitionIndex = getPartition(message.topic, message.key, topicPartitionsList)
val brokerPartition = topicPartitionsList(partitionIndex)
// postpone the failure until the send operation, so that requests for other brokers are handled correctly
@ -177,9 +177,9 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
}
Some(ret)
}catch { // Swallow recoverable exceptions and return None so that they can be retried.
case ute: UnknownTopicException => warn("Failed to collate messages by topic,partition due to", ute); None
case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to", lnae); None
case oe => error("Failed to collate messages by topic, partition due to", oe); None
case ute: UnknownTopicOrPartitionException => warn("Failed to collate messages by topic,partition due to: " + ute.getMessage); None
case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to: " + lnae.getMessage); None
case oe => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None
}
}
@ -200,25 +200,24 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
* @param topicPartitionList the list of available partitions
* @return the partition id
*/
private def getPartition(key: K, topicPartitionList: Seq[PartitionAndLeader]): Int = {
private def getPartition(topic: String, key: K, topicPartitionList: Seq[PartitionAndLeader]): Int = {
val numPartitions = topicPartitionList.size
if(numPartitions <= 0)
throw new UnknownTopicOrPartitionException("Invalid number of partitions: " + numPartitions +
"\n Valid values are > 0")
throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
val partition =
if(key == null) {
// If the key is null, we don't really need a partitioner so we just send to the next
// available partition
val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
if (availablePartitions.isEmpty)
throw new LeaderNotAvailableException("No leader for any partition")
throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
val index = Utils.abs(partitionCounter.getAndIncrement()) % availablePartitions.size
availablePartitions(index).partitionId
} else
partitioner.partition(key, numPartitions)
if(partition < 0 || partition >= numPartitions)
throw new UnknownTopicOrPartitionException("Invalid partition id : " + partition +
"\n Valid values are in the range inclusive [0, " + (numPartitions-1) + "]")
throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic +
"; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]")
partition
}
@ -253,11 +252,18 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message =>
trace("Successfully sent message: %s".format(Utils.readString(message.message.payload)))))
}
failedTopicPartitions = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
.map(partitionStatus => partitionStatus._1)
if(failedTopicPartitions.size > 0)
error("Produce request with correlation id %d failed due to response %s. List of failed topic partitions is %s"
.format(currentCorrelationId, response.toString, failedTopicPartitions.mkString(",")))
val failedPartitionsAndStatus = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
failedTopicPartitions = failedPartitionsAndStatus.map(partitionStatus => partitionStatus._1)
if(failedTopicPartitions.size > 0) {
val errorString = failedPartitionsAndStatus
.sortWith((p1, p2) => p1._1.topic.compareTo(p2._1.topic) < 0 ||
(p1._1.topic.compareTo(p2._1.topic) == 0 && p1._1.partition < p2._1.partition))
.map{
case(topicAndPartition, status) =>
topicAndPartition.toString + ": " + ErrorMapping.exceptionFor(status.error).getClass.getName
}.mkString(",")
warn("Produce request with correlation id %d failed due to %s".format(currentCorrelationId, errorString))
}
failedTopicPartitions
} else
Seq.empty[TopicAndPartition]

View File

@ -50,7 +50,7 @@ class ProducerSendThread[K,V](val threadName: String,
}
def shutdown = {
info("Beging shutting down ProducerSendThread")
info("Begin shutting down ProducerSendThread")
queue.put(shutdownCommand)
shutdownLatch.await
info("Shutdown ProducerSendThread complete")