KAFKA-774 Periodic refresh of topic metadata on the producer does not check for error code in the response; reviewed by Swapnil Ghike

This commit is contained in:
Neha Narkhede 2013-02-26 16:34:11 -08:00
parent 37ca9db782
commit 6989dac889
4 changed files with 33 additions and 37 deletions

View File

@ -87,55 +87,50 @@ object AdminUtils extends Logging {
case e2 => throw new AdministrationException(e2.toString)
}
}
def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient): TopicMetadata =
def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient): TopicMetadata =
fetchTopicMetadataFromZk(topic, zkClient, new mutable.HashMap[Int, Broker])
def fetchTopicMetadataFromZk(topics: Set[String], zkClient: ZkClient): Set[TopicMetadata] = {
val cachedBrokerInfo = new mutable.HashMap[Int, Broker]()
topics.map(topic => fetchTopicMetadataFromZk(topic, zkClient, cachedBrokerInfo))
}
private def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient, cachedBrokerInfo: mutable.HashMap[Int, Broker]): TopicMetadata = {
if(ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) {
val topicPartitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic).get
val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
val partitionMetadata = sortedPartitions.map { partitionMap =>
val partition = partitionMap._1
val replicas = partitionMap._2
val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partition)
val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
val partition = partitionMap._1
val replicas = partitionMap._2
val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partition)
val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
var leaderInfo: Option[Broker] = None
var replicaInfo: Seq[Broker] = Nil
var isrInfo: Seq[Broker] = Nil
try {
var leaderInfo: Option[Broker] = None
var replicaInfo: Seq[Broker] = Nil
var isrInfo: Seq[Broker] = Nil
try {
leaderInfo = leader match {
case Some(l) => Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head)
case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
try {
leaderInfo = leader match {
case Some(l) => Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head)
case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
}
} catch {
case e => throw new LeaderNotAvailableException("Leader not available for topic %s partition %d".format(topic, partition), e)
}
try {
replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt))
isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas)
} catch {
case e => throw new ReplicaNotAvailableException(e)
}
} catch {
case e => throw new LeaderNotAvailableException("Leader not available for topic %s partition %d".format(topic, partition))
}
try {
replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt))
isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas)
} catch {
case e => throw new ReplicaNotAvailableException(e)
}
new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
} catch {
case e: ReplicaNotAvailableException =>
case e =>
error("Error while fetching metadata for partition [%s,%d]".format(topic, partition), e)
new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo,
ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
case le: LeaderNotAvailableException =>
new PartitionMetadata(partition, None, replicaInfo, isrInfo,
ErrorMapping.codeFor(le.getClass.asInstanceOf[Class[Throwable]]))
}
}
new TopicMetadata(topic, partitionMetadata)

View File

@ -20,6 +20,7 @@ package kafka.common
/**
* Thrown when a request is made for partition, but no leader exists for that partition
*/
class LeaderNotAvailableException(message: String) extends RuntimeException(message) {
def this() = this(null)
}
class LeaderNotAvailableException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
def this(message: String) = this(message, null)
def this() = this(null, null)
}

View File

@ -80,10 +80,11 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
if(tmd.errorCode == ErrorMapping.NoError){
topicPartitionInfo.put(tmd.topic, tmd)
} else
warn("Metadata for topic [%s] is erroneous: [%s]".format(tmd.topic, tmd), ErrorMapping.exceptionFor(tmd.errorCode))
warn("Error while fetching metadata for topic [%s]: [%s]".format(tmd.topic, tmd), ErrorMapping.exceptionFor(tmd.errorCode))
tmd.partitionsMetadata.foreach(pmd =>{
if (pmd.errorCode != ErrorMapping.NoError){
debug("Metadata for topic partition [%s, %d] is errornous: [%s]".format(tmd.topic, pmd.partitionId, pmd), ErrorMapping.exceptionFor(pmd.errorCode))
warn("Error while fetching metadata for topic partition [%s,%d]: [%s]".format(tmd.topic, pmd.partitionId, pmd),
ErrorMapping.exceptionFor(pmd.errorCode))
}
})
})

View File

@ -81,7 +81,6 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
}
if(outstandingProduceRequests.size > 0) {
producerStats.failedSendRate.mark()
val correlationIdEnd = correlationId.get()
error("Failed to send requests for topics %s with correlation ids in [%d,%d]".format(outstandingProduceRequests.map(_.topic).mkString(","),
correlationIdStart, correlationIdEnd-1))