Improve Kafka internal metrics; patched by Jun Rao; reviewed by Joel Koshy and Neha Narkhede; KAFKA-203

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1384202 13f79535-47bb-0310-9956-ffa450edef68
Author: Jun Rao
Date:   2012-09-13 04:27:13 +00:00
parent de927e2948
commit 2bc65dab67
43 changed files with 638 additions and 1004 deletions
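
Editorial note (not part of the commit): the diffs below consistently replace the hand-rolled `*StatsMBean` / `Utils.registerMBean` instrumentation with Yammer Metrics primitives exposed through the `KafkaMetricsGroup` trait (`newGauge`, `newMeter`, `newHistogram`, `newTimer`, plus the `KafkaTimer` wrapper). A minimal sketch of that pattern, using hypothetical names (`ExampleStats`, `ExampleComponent`) that do not appear in this patch:

    import java.util.concurrent.TimeUnit
    import com.yammer.metrics.core.Gauge
    import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}

    // Hypothetical illustration of the pattern used throughout this patch:
    // mix in KafkaMetricsGroup and declare metrics as vals.
    object ExampleStats extends KafkaMetricsGroup {
      // meter: rate of events per second
      val messageRate = newMeter("ExampleMessagesPerSec", "messages", TimeUnit.SECONDS)
      // histogram: distribution of sizes
      val sizeHist = newHistogram("ExampleSize")
      // timer wrapped in KafkaTimer so a code block can be timed directly
      val requestTimer = new KafkaTimer(newTimer("ExampleRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
    }

    class ExampleComponent(queue: java.util.Queue[String]) extends KafkaMetricsGroup {
      // gauge: sampled on read, e.g. a queue depth
      newGauge("ExampleQueueSize", new Gauge[Int] { def value() = queue.size })

      def handle(msg: String) {
        ExampleStats.requestTimer.time {   // records both rate and latency of the block
          ExampleStats.messageRate.mark()
          ExampleStats.sizeHist.update(msg.length)
        }
      }
    }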


@@ -105,7 +105,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
                         replicaId: Int = FetchRequest.DefaultReplicaId,
                         maxWait: Int = FetchRequest.DefaultMaxWait,
                         minBytes: Int = FetchRequest.DefaultMinBytes,
-                        offsetInfo: Seq[OffsetDetail] ) extends RequestOrResponse(Some(RequestKeys.Fetch)) {
+                        offsetInfo: Seq[OffsetDetail] ) extends RequestOrResponse(Some(RequestKeys.FetchKey)) {
   // ensure that a topic "X" appears in at most one OffsetDetail
   def validate() {
@@ -144,6 +144,8 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
   def sizeInBytes: Int = 2 + 4 + (2 + clientId.length()) + 4 + 4 + 4 + offsetInfo.foldLeft(4)(_ + _.sizeInBytes())
   def numPartitions: Int = offsetInfo.foldLeft(0)(_ + _.offsets.size)
+
+  def isFromFollower(): Boolean = replicaId != FetchRequest.NonFollowerId
 }


@@ -94,7 +94,7 @@ case class LeaderAndIsrRequest (versionId: Short,
                                 isInit: Boolean,
                                 ackTimeoutMs: Int,
                                 leaderAndISRInfos: Map[(String, Int), LeaderAndIsr])
-        extends RequestOrResponse(Some(RequestKeys.LeaderAndISRRequest)) {
+        extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {
   def this(isInit: Boolean, leaderAndISRInfos: Map[(String, Int), LeaderAndIsr]) = {
     this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, isInit, LeaderAndIsrRequest.DefaultAckTimeout, leaderAndISRInfos)
   }


@@ -45,7 +45,7 @@ case class OffsetRequest(versionId: Short = OffsetRequest.CurrentVersion,
                          topic: String,
                          partition: Int,
                          time: Long,
-                         maxNumOffsets: Int) extends RequestOrResponse(Some(RequestKeys.Offsets)) {
+                         maxNumOffsets: Int) extends RequestOrResponse(Some(RequestKeys.OffsetsKey)) {
   def this(topic: String, partition: Int, time: Long, maxNumOffsets: Int) =
     this(OffsetRequest.CurrentVersion, OffsetRequest.DefaultClientId, topic, partition, time, maxNumOffsets)


@@ -24,7 +24,7 @@ import kafka.utils._
 object ProducerRequest {
   val CurrentVersion: Short = 0
   def readFrom(buffer: ByteBuffer): ProducerRequest = {
     val versionId: Short = buffer.getShort
     val correlationId: Int = buffer.getInt
@@ -58,7 +58,7 @@ case class ProducerRequest( versionId: Short,
                             clientId: String,
                             requiredAcks: Short,
                             ackTimeoutMs: Int,
-                            data: Array[TopicData] ) extends RequestOrResponse(Some(RequestKeys.Produce)) {
+                            data: Array[TopicData] ) extends RequestOrResponse(Some(RequestKeys.ProduceKey)) {
   def this(correlationId: Int, clientId: String, requiredAcks: Short, ackTimeoutMs: Int, data: Array[TopicData]) =
     this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeoutMs, data)


@@ -17,11 +17,36 @@
 package kafka.api
+import kafka.common.KafkaException
+import java.nio.ByteBuffer
 object RequestKeys {
-  val Produce: Short = 0
-  val Fetch: Short = 1
-  val Offsets: Short = 2
-  val TopicMetadata: Short = 3
-  val LeaderAndISRRequest: Short = 4
-  val StopReplicaRequest: Short = 5
+  val ProduceKey: Short = 0
+  val FetchKey: Short = 1
+  val OffsetsKey: Short = 2
+  val MetadataKey: Short = 3
+  val LeaderAndIsrKey: Short = 4
+  val StopReplicaKey: Short = 5
+
+  val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
+    Map( ProduceKey -> ("Produce", ProducerRequest.readFrom),
+         FetchKey -> ("Fetch", FetchRequest.readFrom),
+         OffsetsKey -> ("Offsets", OffsetRequest.readFrom),
+         MetadataKey -> ("Metadata", TopicMetadataRequest.readFrom),
+         LeaderAndIsrKey -> ("LeaderAndIsr", LeaderAndIsrRequest.readFrom),
+         StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom) )
+
+  def nameForKey(key: Short): String = {
+    keyToNameAndDeserializerMap.get(key) match {
+      case Some(nameAndSerializer) => nameAndSerializer._1
+      case None => throw new KafkaException("Wrong request type %d".format(key))
+    }
+  }
+
+  def deserializerForKey(key: Short): (ByteBuffer) => RequestOrResponse = {
+    keyToNameAndDeserializerMap.get(key) match {
+      case Some(nameAndSerializer) => nameAndSerializer._2
+      case None => throw new KafkaException("Wrong request type %d".format(key))
+    }
+  }
 }
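
Editorial note (not part of the commit): with the new map in place, a raw request buffer can be turned back into a typed request by reading the key and dispatching through `deserializerForKey`, which is what the reworked `RequestChannel.Request` further down does. A hedged sketch, assuming a hypothetical `buffer` whose first two bytes are the request key as written by the sender:

    import java.nio.ByteBuffer
    import kafka.api.{RequestKeys, RequestOrResponse}

    // Sketch only; `RequestDecodeExample` and `decode` are illustrative names.
    object RequestDecodeExample {
      def decode(buffer: ByteBuffer): (String, RequestOrResponse) = {
        val key = buffer.getShort()                      // e.g. RequestKeys.ProduceKey
        val name = RequestKeys.nameForKey(key)           // "Produce", "Fetch", ...
        val request = RequestKeys.deserializerForKey(key)(buffer)
        (name, request)
      }
    }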


@@ -45,7 +45,7 @@ case class StopReplicaRequest(versionId: Short,
                               clientId: String,
                               ackTimeoutMs: Int,
                               stopReplicaSet: Set[(String, Int)])
-        extends RequestOrResponse(Some(RequestKeys.StopReplicaRequest)) {
+        extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) {
   def this(stopReplicaSet: Set[(String, Int)]) = {
     this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout, stopReplicaSet)
   }


@@ -78,7 +78,7 @@ case class TopicMetadataRequest(val versionId: Short,
                                 val topics: Seq[String],
                                 val detailedMetadata: DetailedMetadataRequest = NoSegmentMetadata,
                                 val timestamp: Option[Long] = None, val count: Option[Int] = None)
-        extends RequestOrResponse(Some(RequestKeys.TopicMetadata)){
+        extends RequestOrResponse(Some(RequestKeys.MetadataKey)){
   def this(topics: Seq[String]) =
     this(TopicMetadataRequest.CurrentVersion, TopicMetadataRequest.DefaultClientId, topics, NoSegmentMetadata, None, None)


@@ -22,6 +22,8 @@ import java.lang.Object
 import kafka.api.LeaderAndIsr
 import kafka.server.ReplicaManager
 import kafka.common.ErrorMapping
+import com.yammer.metrics.core.Gauge
+import kafka.metrics.KafkaMetricsGroup
 /**
  * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR
@@ -29,7 +31,7 @@ import kafka.common.ErrorMapping
 class Partition(val topic: String,
                 val partitionId: Int,
                 time: Time,
-                val replicaManager: ReplicaManager) extends Logging {
+                val replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup {
   private val localBrokerId = replicaManager.config.brokerId
   private val logManager = replicaManager.logManager
   private val replicaFetcherManager = replicaManager.replicaFetcherManager
@@ -45,6 +47,20 @@ class Partition(val topic: String,
   private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
+
+  newGauge(
+    topic + "-" + partitionId + "UnderReplicated",
+    new Gauge[Int] {
+      def value() = {
+        if (isUnderReplicated) 1 else 0
+      }
+    }
+  )
+
+  def isUnderReplicated(): Boolean = {
+    // TODO: need to pass in replication factor from controller
+    inSyncReplicas.size < replicaManager.config.defaultReplicationFactor
+  }
   def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = {
     val replicaOpt = getReplica(replicaId)
     replicaOpt match {
@@ -182,6 +198,7 @@ class Partition(val topic: String,
           info("Expanding ISR for topic %s partition %d to %s".format(topic, partitionId, newInSyncReplicas.map(_.brokerId).mkString(",")))
           // update ISR in ZK and cache
           updateISR(newInSyncReplicas)
+          replicaManager.isrExpandRate.mark()
         }
         maybeIncrementLeaderHW(leaderReplica)
       case None => // nothing to do if no longer leader
@@ -240,6 +257,7 @@ class Partition(val topic: String,
           updateISR(newInSyncReplicas)
           // we may need to increment high watermark since ISR could be down to 1
           maybeIncrementLeaderHW(leaderReplica)
+          replicaManager.isrShrinkRate.mark()
         }
       case None => // do nothing if no longer leader
     }


@@ -36,7 +36,7 @@ class Replica(val brokerId: Int,
   val topic = partition.topic
   val partitionId = partition.partitionId
-  def logEndOffset_=(newLogEndOffset: Long) = {
+  def logEndOffset_=(newLogEndOffset: Long) {
     if (!isLocal) {
       logEndOffsetValue.set(newLogEndOffset)
       logEndOffsetUpdateTimeMsValue.set(time.milliseconds)


@@ -18,7 +18,7 @@
 package kafka.consumer
 import scala.collection._
-import kafka.utils.{Utils, Logging}
+import kafka.utils.Logging
 import kafka.serializer.{DefaultDecoder, Decoder}
 /**
@@ -64,8 +64,6 @@ trait ConsumerConnector {
 }
 object Consumer extends Logging {
-  private val consumerStatsMBeanName = "kafka:type=kafka.ConsumerStats"
   /**
    * Create a ConsumerConnector
    *
@@ -74,7 +72,6 @@ object Consumer extends Logging {
   */
  def create(config: ConsumerConfig): ConsumerConnector = {
    val consumerConnect = new ZookeeperConsumerConnector(config)
-   Utils.registerMBean(consumerConnect, consumerStatsMBeanName)
    consumerConnect
  }
@@ -86,7 +83,6 @@ object Consumer extends Logging {
   */
  def createJavaConsumerConnector(config: ConsumerConfig): kafka.javaapi.consumer.ConsumerConnector = {
    val consumerConnect = new kafka.javaapi.consumer.ZookeeperConsumerConnector(config)
-   Utils.registerMBean(consumerConnect.underlying, consumerStatsMBeanName)
    consumerConnect
  }
 }


@@ -47,8 +47,8 @@ class ConsumerIterator[T](private val channel: BlockingQueue[FetchedDataChunk],
     currentTopicInfo.resetConsumeOffset(consumedOffset)
     val topic = currentTopicInfo.topic
     trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
-    ConsumerTopicStat.getConsumerTopicStat(topic).recordMessagesPerTopic(1)
-    ConsumerTopicStat.getConsumerAllTopicStat().recordMessagesPerTopic(1)
+    ConsumerTopicStat.getConsumerTopicStat(topic).messageRate.mark()
+    ConsumerTopicStat.getConsumerAllTopicStat().messageRate.mark()
     item
   }


@@ -17,44 +17,24 @@
 package kafka.consumer
-import java.util.concurrent.atomic.AtomicLong
-import kafka.utils.{Pool, Utils, threadsafe, Logging}
-
-trait ConsumerTopicStatMBean {
-  def getMessagesPerTopic: Long
-  def getBytesPerTopic: Long
-}
+import kafka.utils.{Pool, threadsafe, Logging}
+import java.util.concurrent.TimeUnit
+import kafka.metrics.KafkaMetricsGroup
 @threadsafe
-class ConsumerTopicStat extends ConsumerTopicStatMBean {
-  private val numCumulatedMessagesPerTopic = new AtomicLong(0)
-  private val numCumulatedBytesPerTopic = new AtomicLong(0)
-
-  def getMessagesPerTopic: Long = numCumulatedMessagesPerTopic.get
-  def recordMessagesPerTopic(nMessages: Int) = numCumulatedMessagesPerTopic.getAndAdd(nMessages)
-
-  def getBytesPerTopic: Long = numCumulatedBytesPerTopic.get
-  def recordBytesPerTopic(nBytes: Long) = numCumulatedBytesPerTopic.getAndAdd(nBytes)
+class ConsumerTopicStat(name: String) extends KafkaMetricsGroup {
+  val messageRate = newMeter(name + "MessagesPerSec", "messages", TimeUnit.SECONDS)
+  val byteRate = newMeter(name + "BytesPerSec", "bytes", TimeUnit.SECONDS)
 }
 object ConsumerTopicStat extends Logging {
-  private val stats = new Pool[String, ConsumerTopicStat]
-  private val allTopicStat = new ConsumerTopicStat
-  Utils.registerMBean(allTopicStat, "kafka:type=kafka.ConsumerAllTopicStat")
+  private val valueFactory = (k: String) => new ConsumerTopicStat(k)
+  private val stats = new Pool[String, ConsumerTopicStat](Some(valueFactory))
+  private val allTopicStat = new ConsumerTopicStat("AllTopics")
   def getConsumerAllTopicStat(): ConsumerTopicStat = allTopicStat
   def getConsumerTopicStat(topic: String): ConsumerTopicStat = {
-    var stat = stats.get(topic)
-    if (stat == null) {
-      stat = new ConsumerTopicStat
-      if (stats.putIfNotExists(topic, stat) == null)
-        Utils.registerMBean(stat, "kafka:type=kafka.ConsumerTopicStat." + topic)
-      else
-        stat = stats.get(topic)
-    }
-    return stat
+    stats.getAndMaybePut(topic + "-")
   }
 }
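
Editorial note (not part of the commit): the per-topic stat objects are now created lazily through the `Pool` value factory, so the first lookup for a topic builds a `ConsumerTopicStat` whose metric names are prefixed with that topic. A hedged sketch of how calling code (as in `ConsumerIterator` and `PartitionTopicInfo` below) uses it, with a made-up topic name:

    import kafka.consumer.ConsumerTopicStat

    object ConsumerStatExample {
      def main(args: Array[String]) {
        // "orders" is a hypothetical topic; the stat is created on first access via the Pool valueFactory.
        val stat = ConsumerTopicStat.getConsumerTopicStat("orders")
        stat.messageRate.mark()      // one message consumed from the topic
        stat.byteRate.mark(1024)     // 1024 bytes consumed from the topic
        ConsumerTopicStat.getConsumerAllTopicStat().messageRate.mark()
      }
    }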


@@ -21,7 +21,6 @@ import java.util.concurrent._
 import java.util.concurrent.atomic._
 import kafka.message._
 import kafka.utils.Logging
-import kafka.common.ErrorMapping
 private[consumer] class PartitionTopicInfo(val topic: String,
                                            val brokerId: Int,
@@ -59,8 +58,8 @@ private[consumer] class PartitionTopicInfo(val topic: String,
       chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
       val newOffset = fetchedOffset.addAndGet(size)
       debug("updated fetch offset of ( %s ) to %d".format(this, newOffset))
-      ConsumerTopicStat.getConsumerTopicStat(topic).recordBytesPerTopic(size)
-      ConsumerTopicStat.getConsumerAllTopicStat().recordBytesPerTopic(size)
+      ConsumerTopicStat.getConsumerTopicStat(topic).byteRate.mark(size)
+      ConsumerTopicStat.getConsumerAllTopicStat().byteRate.mark(size)
     }
   }


@@ -21,6 +21,8 @@ import kafka.api._
 import kafka.network._
 import kafka.utils._
 import kafka.common.ErrorMapping
+import java.util.concurrent.TimeUnit
+import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 /**
  * A consumer of kafka messages
@@ -91,15 +93,13 @@ class SimpleConsumer( val host: String,
    *  @return a set of fetched messages
    */
   def fetch(request: FetchRequest): FetchResponse = {
-    val startTime = SystemTime.nanoseconds
-    val response = sendRequest(request)
+    var response: Receive = null
+    FetchRequestAndResponseStat.requestTimer.time {
+      response = sendRequest(request)
+    }
     val fetchResponse = FetchResponse.readFrom(response.buffer)
     val fetchedSize = fetchResponse.sizeInBytes
-    val endTime = SystemTime.nanoseconds
-    SimpleConsumerStats.recordFetchRequest(endTime - startTime)
-    SimpleConsumerStats.recordConsumptionThroughput(fetchedSize)
+    FetchRequestAndResponseStat.respondSizeHist.update(fetchedSize)
     fetchResponse
   }
@@ -125,39 +125,7 @@ class SimpleConsumer( val host: String,
   }
 }
-trait SimpleConsumerStatsMBean {
-  def getFetchRequestsPerSecond: Double
-  def getAvgFetchRequestMs: Double
-  def getMaxFetchRequestMs: Double
-  def getNumFetchRequests: Long
-  def getConsumerThroughput: Double
-}
-
-@threadsafe
-class SimpleConsumerStats(monitoringDurationNs: Long) extends SimpleConsumerStatsMBean {
-  private val fetchRequestStats = new SnapshotStats(monitoringDurationNs)
-  def recordFetchRequest(requestNs: Long) = fetchRequestStats.recordRequestMetric(requestNs)
-  def recordConsumptionThroughput(data: Long) = fetchRequestStats.recordThroughputMetric(data)
-  def getFetchRequestsPerSecond: Double = fetchRequestStats.getRequestsPerSecond
-  def getAvgFetchRequestMs: Double = fetchRequestStats.getAvgMetric / (1000.0 * 1000.0)
-  def getMaxFetchRequestMs: Double = fetchRequestStats.getMaxMetric / (1000.0 * 1000.0)
-  def getNumFetchRequests: Long = fetchRequestStats.getNumRequests
-  def getConsumerThroughput: Double = fetchRequestStats.getThroughput
-}
-
-object SimpleConsumerStats extends Logging {
-  private val simpleConsumerstatsMBeanName = "kafka:type=kafka.SimpleConsumerStats"
-  private val stats = new SimpleConsumerStats(1 * 1000L * 1000L * 1000L)
-  Utils.registerMBean(stats, simpleConsumerstatsMBeanName)
-  def recordFetchRequest(requestMs: Long) = stats.recordFetchRequest(requestMs)
-  def recordConsumptionThroughput(data: Long) = stats.recordConsumptionThroughput(data)
-}
+object FetchRequestAndResponseStat extends KafkaMetricsGroup {
+  val requestTimer = new KafkaTimer(newTimer("FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  val respondSizeHist = newHistogram("FetchResponseSize")
+}


@@ -32,6 +32,8 @@ import java.util.UUID
 import kafka.serializer.Decoder
 import kafka.utils.ZkUtils._
 import kafka.common.{KafkaException, NoBrokersForPartitionException, ConsumerRebalanceFailedException, InvalidConfigException}
+import com.yammer.metrics.core.Gauge
+import kafka.metrics.KafkaMetricsGroup
 /**
@@ -73,21 +75,9 @@ private[kafka] object ZookeeperConsumerConnector {
   val shutdownCommand: FetchedDataChunk = new FetchedDataChunk(null, null, -1L)
 }
-/**
- * JMX interface for monitoring consumer
- */
-trait ZookeeperConsumerConnectorMBean {
-  def getPartOwnerStats: String
-  def getConsumerGroup: String
-  def getOffsetLag(topic: String, brokerId: Int, partitionId: Int): Long
-  def getConsumedOffset(topic: String, brokerId: Int, partitionId: Int): Long
-  def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long
-}
 private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                                 val enableFetcher: Boolean) // for testing only
-    extends ConsumerConnector with ZookeeperConsumerConnectorMBean
-    with Logging {
+    extends ConsumerConnector with Logging with KafkaMetricsGroup {
   private val isShuttingDown = new AtomicBoolean(false)
   private val rebalanceLock = new Object
   private var fetcher: Option[ConsumerFetcherManager] = None
@@ -260,58 +250,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     }
   }
-  // for JMX
-  def getPartOwnerStats(): String = {
-    val builder = new StringBuilder
-    for ((topic, infos) <- topicRegistry) {
-      builder.append("\n" + topic + ": [")
-      val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
-      for(partition <- infos.values) {
-        builder.append("\n {")
-        builder.append{partition}
-        builder.append(",fetch offset:" + partition.getFetchOffset)
-        builder.append(",consumer offset:" + partition.getConsumeOffset)
-        builder.append("}")
-      }
-      builder.append("\n ]")
-    }
-    builder.toString
-  }
-
-  // for JMX
-  def getConsumerGroup(): String = config.groupId
-
-  def getOffsetLag(topic: String, brokerId: Int, partitionId: Int): Long =
-    getLatestOffset(topic, brokerId, partitionId) - getConsumedOffset(topic, brokerId, partitionId)
-
-  def getConsumedOffset(topic: String, brokerId: Int, partitionId: Int): Long = {
-    val partitionInfos = topicRegistry.get(topic)
-    if (partitionInfos != null) {
-      val partitionInfo = partitionInfos.get(partitionId)
-      if (partitionInfo != null)
-        return partitionInfo.getConsumeOffset
-    }
-    // otherwise, try to get it from zookeeper
-    try {
-      val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
-      val znode = topicDirs.consumerOffsetDir + "/" + partitionId
-      val offsetString = readDataMaybeNull(zkClient, znode)._1
-      offsetString match {
-        case Some(offset) => offset.toLong
-        case None => -1L
-      }
-    }
-    catch {
-      case e =>
-        error("error in getConsumedOffset JMX ", e)
-        -2L
-    }
-  }
-
-  def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long =
-    earliestOrLatestOffset(topic, brokerId, partitionId, OffsetRequest.LatestTime)
   private def earliestOrLatestOffset(topic: String, brokerId: Int, partitionId: Int, earliestOrLatest: Long): Long = {
     var simpleConsumer: SimpleConsumer = null
     var producedOffset: Long = -1L
@@ -728,6 +666,12 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       val topicThreadId = e._1
       val q = e._2._1
       topicThreadIdAndQueues.put(topicThreadId, q)
+      newGauge(
+        config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
+        new Gauge[Int] {
+          def value() = q.size
+        }
+      )
     })
     val groupedByTopic = threadQueueStreamPairs.groupBy(_._1._1)


@@ -24,7 +24,7 @@ class ProducerRequest(val correlationId: Int,
                       val clientId: String,
                       val requiredAcks: Short,
                       val ackTimeoutMs: Int,
-                      val data: Array[TopicData]) extends RequestOrResponse(Some(RequestKeys.Produce)) {
+                      val data: Array[TopicData]) extends RequestOrResponse(Some(RequestKeys.ProduceKey)) {
   val underlying = new kafka.api.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeoutMs, data)


@@ -26,6 +26,8 @@ import java.text.NumberFormat
 import kafka.server.BrokerTopicStat
 import kafka.message.{ByteBufferMessageSet, MessageSet, InvalidMessageException, FileMessageSet}
 import kafka.common.{KafkaException, InvalidMessageSizeException, OffsetOutOfRangeException}
+import kafka.metrics.KafkaMetricsGroup
+import com.yammer.metrics.core.Gauge
 object Log {
   val FileSuffix = ".kafka"
@@ -130,7 +132,7 @@ class LogSegment(val file: File, val messageSet: FileMessageSet, val start: Long
 @threadsafe
 private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessageSize: Int, val flushInterval: Int,
                           val rollIntervalMs: Long, val needRecovery: Boolean, time: Time,
-                          brokerId: Int = 0) extends Logging {
+                          brokerId: Int = 0) extends Logging with KafkaMetricsGroup {
   this.logIdent = "[Kafka Log on Broker " + brokerId + "], "
   import kafka.log.Log._
@@ -147,9 +149,19 @@ private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessag
   /* The actual segments of the log */
   private[log] val segments: SegmentList[LogSegment] = loadSegments()
-  private val logStats = new LogStats(this)
-  Utils.registerMBean(logStats, "kafka:type=kafka.logs." + dir.getName)
+  newGauge(
+    name + "-" + "NumLogSegments",
+    new Gauge[Int] {
+      def value() = numberOfSegments
+    }
+  )
+
+  newGauge(
+    name + "-" + "LogEndOffset",
+    new Gauge[Long] {
+      def value() = logEndOffset
+    }
+  )
   /* The name of this log */
   def name = dir.getName()
@@ -243,9 +255,8 @@ private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessag
       numberOfMessages += 1;
     }
-    BrokerTopicStat.getBrokerTopicStat(topicName).recordMessagesIn(numberOfMessages)
-    BrokerTopicStat.getBrokerAllTopicStat.recordMessagesIn(numberOfMessages)
-    logStats.recordAppendedMessages(numberOfMessages)
+    BrokerTopicStat.getBrokerTopicStat(topicName).messagesInRate.mark(numberOfMessages)
+    BrokerTopicStat.getBrokerAllTopicStat.messagesInRate.mark(numberOfMessages)
     // truncate the message set's buffer upto validbytes, before appending it to the on-disk log
     val validByteBuffer = messages.buffer.duplicate()


@@ -1,44 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log
import java.util.concurrent.atomic.AtomicLong
trait LogStatsMBean {
def getName(): String
def getSize(): Long
def getNumberOfSegments: Int
def getCurrentOffset: Long
def getNumAppendedMessages: Long
}
class LogStats(val log: Log) extends LogStatsMBean {
private val numCumulatedMessages = new AtomicLong(0)
def getName(): String = log.name
def getSize(): Long = log.size
def getNumberOfSegments: Int = log.numberOfSegments
def getCurrentOffset: Long = log.logEndOffset
def getNumAppendedMessages: Long = numCumulatedMessages.get
def recordAppendedMessages(nMessages: Int) = numCumulatedMessages.getAndAdd(nMessages)
}


@@ -24,6 +24,8 @@ import java.util.concurrent.atomic._
 import kafka.utils._
 import kafka.common.KafkaException
+import java.util.concurrent.TimeUnit
+import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 /**
  * An on-disk message set. The set can be opened either mutably or immutably. Mutation attempts
@@ -157,10 +159,9 @@ class FileMessageSet private[kafka](private[message] val channel: FileChannel,
    */
   def flush() = {
     checkMutable()
-    val startTime = SystemTime.milliseconds
+    LogFlushStats.logFlushTimer.time {
     channel.force(true)
-    val elapsedTime = SystemTime.milliseconds - startTime
-    LogFlushStats.recordFlushRequest(elapsedTime)
+    }
   }
 /**
@@ -238,38 +239,8 @@ class FileMessageSet private[kafka](private[message] val channel: FileChannel,
       else
         next
     }
 }
-trait LogFlushStatsMBean {
-  def getFlushesPerSecond: Double
-  def getAvgFlushMs: Double
-  def getTotalFlushMs: Long
-  def getMaxFlushMs: Double
-  def getNumFlushes: Long
-}
-
-@threadsafe
-class LogFlushStats(monitorDurationNs: Long) extends LogFlushStatsMBean {
-  private val flushRequestStats = new SnapshotStats(monitorDurationNs)
-  def recordFlushRequest(requestMs: Long) = flushRequestStats.recordRequestMetric(requestMs)
-  def getFlushesPerSecond: Double = flushRequestStats.getRequestsPerSecond
-  def getAvgFlushMs: Double = flushRequestStats.getAvgMetric
-  def getTotalFlushMs: Long = flushRequestStats.getTotalMetric
-  def getMaxFlushMs: Double = flushRequestStats.getMaxMetric
-  def getNumFlushes: Long = flushRequestStats.getNumRequests
-}
-
-object LogFlushStats extends Logging {
-  private val LogFlushStatsMBeanName = "kafka:type=kafka.LogFlushStats"
-  private val stats = new LogFlushStats(1L * 1000 * 1000 * 1000)
-  Utils.registerMBean(stats, LogFlushStatsMBeanName)
-  def recordFlushRequest(requestMs: Long) = stats.recordFlushRequest(requestMs)
+object LogFlushStats extends KafkaMetricsGroup {
+  val logFlushTimer = new KafkaTimer(newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
 }


@@ -26,39 +26,15 @@ import com.yammer.metrics.Metrics
 trait KafkaMetricsGroup extends Logging {
-  /**
-   * This method enables the user to form logical sub-groups of this
-   * KafkaMetricsGroup by inserting a sub-group identifier in the package
-   * string.
-   *
-   * @return The sub-group identifier.
-   */
-  def metricsGroupIdent: String = ""
   /**
    * Creates a new MetricName object for gauges, meters, etc. created for this
-   * metrics group. It uses the metricsGroupIdent to create logical sub-groups.
-   * This is currently specifically of use to classes under kafka, with
-   * broker-id being the most common metrics grouping strategy.
-   *
+   * metrics group.
    * @param name Descriptive name of the metric.
    * @return Sanitized metric name object.
    */
   private def metricName(name: String) = {
-    val ident = metricsGroupIdent
     val klass = this.getClass
-    val pkg = {
-      val actualPkg = if (klass.getPackage == null) "" else klass.getPackage.getName
-      if (ident.nonEmpty) {
-        // insert the sub-group identifier after the top-level package
-        if (actualPkg.contains("."))
-          actualPkg.replaceFirst("""\.""", ".%s.".format(ident))
-        else
-          actualPkg + "." + ident
-      }
-      else
-        actualPkg
-    }
+    val pkg = if (klass.getPackage == null) "" else klass.getPackage.getName
     val simpleName = klass.getSimpleName.replaceAll("\\$$", "")
     new MetricName(pkg, simpleName, name)
   }
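
Editorial note (not part of the commit): with `metricsGroupIdent` gone, the metric name appears to be derived purely from the declaring class's package and simple name. Taking the `LogFlushStats` object declared in `kafka.message` (see the FileMessageSet diff above) as a worked example:

    // Illustration only: what metricName("LogFlushRateAndTimeMs") evaluates to for
    // `object LogFlushStats extends KafkaMetricsGroup` in package kafka.message:
    //   pkg        = "kafka.message"          // klass.getPackage.getName
    //   simpleName = "LogFlushStats"          // "LogFlushStats$" with the trailing '$' stripped
    //   result     = new MetricName("kafka.message", "LogFlushStats", "LogFlushRateAndTimeMs")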


@@ -19,23 +19,78 @@ package kafka.network
 import java.util.concurrent._
 import kafka.utils.SystemTime
+import kafka.metrics.KafkaMetricsGroup
+import com.yammer.metrics.core.Gauge
+import java.nio.ByteBuffer
+import kafka.api._
 object RequestChannel {
-  val AllDone = new Request(1, 2, null, 0)
+  val AllDone = new Request(1, 2, getShutdownReceive(), 0)
+
+  def getShutdownReceive() = {
+    val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, Array[TopicData]())
+    val byteBuffer = ByteBuffer.allocate(emptyProducerRequest.sizeInBytes + 2)
+    byteBuffer.putShort(RequestKeys.ProduceKey)
+    emptyProducerRequest.writeTo(byteBuffer)
+    byteBuffer.rewind()
+    byteBuffer
+  }
-  case class Request(processor: Int, requestKey: Any, request: Receive, start: Long)
-  case class Response(processor: Int, requestKey: Any, response: Send, start: Long, elapsedNs: Long) {
+  case class Request(processor: Int, requestKey: Any, buffer: ByteBuffer, startTimeNs: Long) {
+    var dequeueTimeNs = -1L
+    var apiLocalCompleteTimeNs = -1L
+    var responseCompleteTimeNs = -1L
+    val requestId = buffer.getShort()
+    val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer)
+    buffer.rewind()
+
+    def updateRequestMetrics() {
+      val endTimeNs = SystemTime.nanoseconds
+      val queueTime = (dequeueTimeNs - startTimeNs).max(0L)
+      val apiLocalTime = (apiLocalCompleteTimeNs - dequeueTimeNs).max(0L)
+      val apiRemoteTime = (responseCompleteTimeNs - apiLocalCompleteTimeNs).max(0L)
+      val responseSendTime = (endTimeNs - responseCompleteTimeNs).max(0L)
+      val totalTime = endTimeNs - startTimeNs
+      var metricsList = List(RequestMetrics.metricsMap(RequestKeys.nameForKey(requestId)))
+      if (requestId == RequestKeys.FetchKey) {
+        val isFromFollower = requestObj.asInstanceOf[FetchRequest].isFromFollower
+        metricsList ::= ( if (isFromFollower)
+                            RequestMetrics.metricsMap(RequestMetrics.followFetchMetricName)
+                          else
+                            RequestMetrics.metricsMap(RequestMetrics.consumerFetchMetricName) )
+      }
+      metricsList.foreach{
+        m => m.requestRate.mark()
+             m.queueTimeHist.update(queueTime)
+             m.localTimeHist.update(apiLocalTime)
+             m.remoteTimeHist.update(apiRemoteTime)
+             m.responseSendTimeHist.update(responseSendTime)
+             m.totalTimeHist.update(totalTime)
+      }
+    }
+  }
+
+  case class Response(processor: Int, request: Request, responseSend: Send) {
+    request.responseCompleteTimeNs = SystemTime.nanoseconds
+
     def this(request: Request, send: Send) =
-      this(request.processor, request.requestKey, send, request.start, SystemTime.nanoseconds - request.start)
+      this(request.processor, request, send)
   }
 }
-class RequestChannel(val numProcessors: Int, val queueSize: Int) {
+class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
   private var responseListeners: List[(Int) => Unit] = Nil
   private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
   private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
   for(i <- 0 until numProcessors)
     responseQueues(i) = new ArrayBlockingQueue[RequestChannel.Response](queueSize)
+
+  newGauge(
+    "RequestQueueSize",
+    new Gauge[Int] {
+      def value() = requestQueue.size
+    }
+  )
   /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
   def sendRequest(request: RequestChannel.Request) {
@@ -60,5 +115,26 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) {
   def addResponseListener(onResponse: Int => Unit) {
     responseListeners ::= onResponse
   }
 }
+
+object RequestMetrics {
+  val metricsMap = new scala.collection.mutable.HashMap[String, RequestMetrics]
+  val consumerFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "-Consumer"
+  val followFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "-Follower"
+  (RequestKeys.keyToNameAndDeserializerMap.values.map(e => e._1)
+    ++ List(consumerFetchMetricName, followFetchMetricName)).foreach(name => metricsMap.put(name, new RequestMetrics(name)))
+}
+
+class RequestMetrics(name: String) extends KafkaMetricsGroup {
+  val requestRate = newMeter(name + "-RequestsPerSec", "requests", TimeUnit.SECONDS)
+  // time a request spent in a request queue
+  val queueTimeHist = newHistogram(name + "-QueueTimeNs")
+  // time a request takes to be processed at the local broker
+  val localTimeHist = newHistogram(name + "-LocalTimeNs")
+  // time a request takes to wait on remote brokers (only relevant to fetch and produce requests)
+  val remoteTimeHist = newHistogram(name + "-RemoteTimeNs")
+  // time to send the response to the requester
+  val responseSendTimeHist = newHistogram(name + "-ResponseSendTimeNs")
+  val totalTimeHist = newHistogram(name + "-TotalTimeNs")
+}


@@ -41,7 +41,6 @@ class SocketServer(val brokerId: Int,
   private val time = SystemTime
   private val processors = new Array[Processor](numProcessorThreads)
   private var acceptor: Acceptor = new Acceptor(port, processors)
-  val stats: SocketServerStats = new SocketServerStats(1000L * 1000L * 1000L * monitoringPeriodSecs)
   val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests)
 /**
@@ -49,7 +48,7 @@ class SocketServer(val brokerId: Int,
    */
   def startup() {
     for(i <- 0 until numProcessorThreads) {
-      processors(i) = new Processor(i, time, maxRequestSize, requestChannel, stats)
+      processors(i) = new Processor(i, time, maxRequestSize, requestChannel)
       Utils.newThread("kafka-processor-%d-%d".format(port, i), processors(i), false).start()
     }
     // register the processor threads for notification of responses
@@ -187,8 +186,7 @@ private[kafka] class Acceptor(val port: Int, private val processors: Array[Proce
 private[kafka] class Processor(val id: Int,
                                val time: Time,
                                val maxRequestSize: Int,
-                               val requestChannel: RequestChannel,
-                               val stats: SocketServerStats) extends AbstractServerThread {
+                               val requestChannel: RequestChannel) extends AbstractServerThread {
   private val newConnections = new ConcurrentLinkedQueue[SocketChannel]();
@@ -240,10 +238,10 @@ private[kafka] class Processor(val id: Int,
     var curr = requestChannel.receiveResponse(id)
     while(curr != null) {
       trace("Socket server received response to send, registering for write: " + curr)
-      val key = curr.requestKey.asInstanceOf[SelectionKey]
+      val key = curr.request.requestKey.asInstanceOf[SelectionKey]
       try {
         key.interestOps(SelectionKey.OP_WRITE)
-        key.attach(curr.response)
+        key.attach(curr)
       } catch {
         case e: CancelledKeyException => {
           debug("Ignoring response for closed socket.")
@@ -288,18 +286,17 @@ private[kafka] class Processor(val id: Int,
    */
   def read(key: SelectionKey) {
     val socketChannel = channelFor(key)
-    var request = key.attachment.asInstanceOf[Receive]
+    var receive = key.attachment.asInstanceOf[Receive]
     if(key.attachment == null) {
-      request = new BoundedByteBufferReceive(maxRequestSize)
-      key.attach(request)
+      receive = new BoundedByteBufferReceive(maxRequestSize)
+      key.attach(receive)
     }
-    val read = request.readFrom(socketChannel)
-    stats.recordBytesRead(read)
+    val read = receive.readFrom(socketChannel)
     trace(read + " bytes read from " + socketChannel.socket.getRemoteSocketAddress())
     if(read < 0) {
       close(key)
-    } else if(request.complete) {
-      val req = RequestChannel.Request(processor = id, requestKey = key, request = request, start = time.nanoseconds)
+    } else if(receive.complete) {
+      val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeNs = time.nanoseconds)
       requestChannel.sendRequest(req)
       trace("Recieved request, sending for processing by handler: " + req)
       key.attach(null)
@@ -315,13 +312,14 @@ private[kafka] class Processor(val id: Int,
    */
   def write(key: SelectionKey) {
     val socketChannel = channelFor(key)
-    var response = key.attachment().asInstanceOf[Send]
-    if(response == null)
+    val response = key.attachment().asInstanceOf[RequestChannel.Response]
+    val responseSend = response.responseSend
+    if(responseSend == null)
       throw new IllegalStateException("Registered for write interest but no response attached to key.")
-    val written = response.writeTo(socketChannel)
-    stats.recordBytesWritten(written)
+    val written = responseSend.writeTo(socketChannel)
     trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress())
-    if(response.complete) {
+    if(responseSend.complete) {
+      response.request.updateRequestMetrics()
       key.attach(null)
       key.interestOps(SelectionKey.OP_READ)
     } else {


@@ -1,90 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.network
import kafka.utils._
import kafka.api.RequestKeys
trait SocketServerStatsMBean {
def getProduceRequestsPerSecond: Double
def getFetchRequestsPerSecond: Double
def getAvgProduceRequestMs: Double
def getMaxProduceRequestMs: Double
def getAvgFetchRequestMs: Double
def getMaxFetchRequestMs: Double
def getBytesReadPerSecond: Double
def getBytesWrittenPerSecond: Double
def getNumFetchRequests: Long
def getNumProduceRequests: Long
def getTotalBytesRead: Long
def getTotalBytesWritten: Long
def getTotalFetchRequestMs: Long
def getTotalProduceRequestMs: Long
}
@threadsafe
class SocketServerStats(val monitorDurationNs: Long, val time: Time) extends SocketServerStatsMBean {
def this(monitorDurationNs: Long) = this(monitorDurationNs, SystemTime)
val produceTimeStats = new SnapshotStats(monitorDurationNs)
val fetchTimeStats = new SnapshotStats(monitorDurationNs)
val produceBytesStats = new SnapshotStats(monitorDurationNs)
val fetchBytesStats = new SnapshotStats(monitorDurationNs)
def recordRequest(requestTypeId: Short, durationNs: Long) {
requestTypeId match {
case r if r == RequestKeys.Produce =>
produceTimeStats.recordRequestMetric(durationNs)
case r if r == RequestKeys.Fetch =>
fetchTimeStats.recordRequestMetric(durationNs)
case _ => /* not collecting; let go */
}
}
def recordBytesWritten(bytes: Int): Unit = fetchBytesStats.recordRequestMetric(bytes)
def recordBytesRead(bytes: Int): Unit = produceBytesStats.recordRequestMetric(bytes)
def getProduceRequestsPerSecond: Double = produceTimeStats.getRequestsPerSecond
def getFetchRequestsPerSecond: Double = fetchTimeStats.getRequestsPerSecond
def getAvgProduceRequestMs: Double = produceTimeStats.getAvgMetric / (1000.0 * 1000.0)
def getMaxProduceRequestMs: Double = produceTimeStats.getMaxMetric / (1000.0 * 1000.0)
def getAvgFetchRequestMs: Double = fetchTimeStats.getAvgMetric / (1000.0 * 1000.0)
def getMaxFetchRequestMs: Double = fetchTimeStats.getMaxMetric / (1000.0 * 1000.0)
def getBytesReadPerSecond: Double = produceBytesStats.getAvgMetric
def getBytesWrittenPerSecond: Double = fetchBytesStats.getAvgMetric
def getNumFetchRequests: Long = fetchTimeStats.getNumRequests
def getNumProduceRequests: Long = produceTimeStats.getNumRequests
def getTotalBytesRead: Long = produceBytesStats.getTotalMetric
def getTotalBytesWritten: Long = fetchBytesStats.getTotalMetric
def getTotalFetchRequestMs: Long = fetchTimeStats.getTotalMetric
def getTotalProduceRequestMs: Long = produceTimeStats.getTotalMetric
}


@@ -20,8 +20,9 @@ import async.{AsyncProducerStats, DefaultEventHandler, ProducerSendThread, Event
 import kafka.utils._
 import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
 import kafka.serializer.Encoder
-import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean}
+import java.util.concurrent.atomic.AtomicBoolean
 import kafka.common.{QueueFullException, InvalidConfigException}
+import kafka.metrics.KafkaMetricsGroup
 class Producer[K,V](config: ProducerConfig,
                     private val eventHandler: EventHandler[K,V]) // for testing only
@@ -68,8 +69,10 @@ extends Logging {
   }
   private def recordStats(producerData: ProducerData[K,V]*) {
-    for (data <- producerData)
-      ProducerTopicStat.getProducerTopicStat(data.getTopic).recordMessagesPerTopic(data.getData.size)
+    for (data <- producerData) {
+      ProducerTopicStat.getProducerTopicStat(data.getTopic).messageRate.mark(data.getData.size)
+      ProducerTopicStat.getProducerAllTopicStat.messageRate.mark(data.getData.size)
+    }
   }
   private def asyncSend(producerData: ProducerData[K,V]*) {
@@ -93,7 +96,7 @@ extends Logging {
       }
     }
     if(!added) {
-      AsyncProducerStats.recordDroppedEvents
+      AsyncProducerStats.droppedMessageRate.mark()
       error("Event queue is full of unsent messages, could not send event: " + data.toString)
       throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + data.toString)
     }else {
@@ -118,31 +121,27 @@ extends Logging {
   }
 }
-trait ProducerTopicStatMBean {
-  def getMessagesPerTopic: Long
-}
 @threadsafe
-class ProducerTopicStat extends ProducerTopicStatMBean {
-  private val numCumulatedMessagesPerTopic = new AtomicLong(0)
-  def getMessagesPerTopic: Long = numCumulatedMessagesPerTopic.get
-  def recordMessagesPerTopic(nMessages: Int) = numCumulatedMessagesPerTopic.getAndAdd(nMessages)
+class ProducerTopicStat(name: String) extends KafkaMetricsGroup {
+  val messageRate = newMeter(name + "MessagesPerSec", "messages", TimeUnit.SECONDS)
+  val byteRate = newMeter(name + "BytesPerSec", "bytes", TimeUnit.SECONDS)
 }
-object ProducerTopicStat extends Logging {
-  private val stats = new Pool[String, ProducerTopicStat]
+object ProducerTopicStat {
+  private val valueFactory = (k: String) => new ProducerTopicStat(k)
+  private val stats = new Pool[String, ProducerTopicStat](Some(valueFactory))
+  private val allTopicStat = new ProducerTopicStat("AllTopics")
+
+  def getProducerAllTopicStat(): ProducerTopicStat = allTopicStat
   def getProducerTopicStat(topic: String): ProducerTopicStat = {
-    var stat = stats.get(topic)
-    if (stat == null) {
-      stat = new ProducerTopicStat
-      if (stats.putIfNotExists(topic, stat) == null)
-        Utils.registerMBean(stat, "kafka.producer.Producer:type=kafka.ProducerTopicStat." + topic)
-      else
-        stat = stats.get(topic)
-    }
-    return stat
+    stats.getAndMaybePut(topic + "-")
   }
 }
+
+object ProducerStats extends KafkaMetricsGroup {
+  val serializationErrorRate = newMeter("SerializationErrorsPerSec", "errors", TimeUnit.SECONDS)
+  val resendRate = newMeter( "ResendsPerSec", "resends", TimeUnit.SECONDS)
+  val failedSendRate = newMeter("FailedSendsPerSec", "failed sends", TimeUnit.SECONDS)
+}


@@ -18,11 +18,12 @@
 package kafka.producer
 import kafka.api._
-import kafka.message.MessageSet
 import kafka.network.{BlockingChannel, BoundedByteBufferSend, Receive}
 import kafka.utils._
 import java.util.Random
-import kafka.common.{ErrorMapping, MessageSizeTooLargeException}
+import kafka.common.ErrorMapping
+import java.util.concurrent.TimeUnit
+import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 object SyncProducer {
   val RequestKey: Short = 0
@@ -57,7 +58,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
     val buffer = new BoundedByteBufferSend(request).buffer
     trace("verifying sendbuffer of size " + buffer.limit)
     val requestTypeId = buffer.getShort()
-    if(requestTypeId == RequestKeys.Produce) {
+    if(requestTypeId == RequestKeys.ProduceKey) {
       val request = ProducerRequest.readFrom(buffer)
       trace(request.toString)
     }
@@ -92,7 +93,6 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
         sentOnConnection = 0
         lastConnectionTime = System.currentTimeMillis
       }
-      SyncProducerStats.recordProduceRequest(SystemTime.nanoseconds - startTime)
      response
    }
  }
@@ -101,7 +101,11 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
    * Send a message
    */
   def send(producerRequest: ProducerRequest): ProducerResponse = {
-    val response = doSend(producerRequest)
+    ProducerRequestStat.requestSizeHist.update(producerRequest.sizeInBytes)
+    var response: Receive = null
+    ProducerRequestStat.requestTimer.time {
+      response = doSend(producerRequest)
+    }
     ProducerResponse.readFrom(response.buffer)
   }
@@ -171,34 +175,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
   }
 }
-trait SyncProducerStatsMBean {
-  def getProduceRequestsPerSecond: Double
-  def getAvgProduceRequestMs: Double
-  def getMaxProduceRequestMs: Double
-  def getNumProduceRequests: Long
-}
-
-@threadsafe
-class SyncProducerStats(monitoringDurationNs: Long) extends SyncProducerStatsMBean {
-  private val produceRequestStats = new SnapshotStats(monitoringDurationNs)
-  def recordProduceRequest(requestNs: Long) = produceRequestStats.recordRequestMetric(requestNs)
-  def getProduceRequestsPerSecond: Double = produceRequestStats.getRequestsPerSecond
-  def getAvgProduceRequestMs: Double = produceRequestStats.getAvgMetric / (1000.0 * 1000.0)
-  def getMaxProduceRequestMs: Double = produceRequestStats.getMaxMetric / (1000.0 * 1000.0)
-  def getNumProduceRequests: Long = produceRequestStats.getNumRequests
-}
-
-object SyncProducerStats extends Logging {
-  private val kafkaProducerstatsMBeanName = "kafka:type=kafka.KafkaProducerStats"
-  private val stats = new SyncProducerStats(1L * 1000 * 1000 * 1000)
-  swallow(Utils.registerMBean(stats, kafkaProducerstatsMBeanName))
-  def recordProduceRequest(requestMs: Long) = {
-    stats.recordProduceRequest(requestMs)
-  }
-}
+object ProducerRequestStat extends KafkaMetricsGroup {
+  val requestTimer = new KafkaTimer(newTimer("ProduceRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  val requestSizeHist = newHistogram("ProducerRequestSize")
+}


@@ -17,22 +17,9 @@
 package kafka.producer.async
-import java.util.concurrent.atomic.AtomicInteger
-import kafka.utils.Utils
+import kafka.metrics.KafkaMetricsGroup
+import java.util.concurrent.TimeUnit
-class AsyncProducerStats extends AsyncProducerStatsMBean {
-  val droppedEvents = new AtomicInteger(0)
-  def getAsyncProducerDroppedEvents: Int = droppedEvents.get
-  def recordDroppedEvents = droppedEvents.getAndAdd(1)
-}
-
-object AsyncProducerStats {
-  private val stats = new AsyncProducerStats
-  val ProducerMBeanName = "kafka.producer.Producer:type=AsyncProducerStats"
-  Utils.registerMBean(stats, ProducerMBeanName)
-  def recordDroppedEvents = stats.recordDroppedEvents
+object AsyncProducerStats extends KafkaMetricsGroup {
+  val droppedMessageRate = newMeter("DroppedMessagesPerSec", "drops", TimeUnit.SECONDS)
 }


@@ -1,26 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.producer.async
trait AsyncProducerStatsMBean {
def getAsyncProducerDroppedEvents: Int
}
trait AsyncProducerQueueSizeStatsMBean {
def getAsyncProducerQueueSize: Int
}


@ -24,7 +24,7 @@ import kafka.serializer.Encoder
import kafka.utils.{Utils, Logging} import kafka.utils.{Utils, Logging}
import scala.collection.Map import scala.collection.Map
import scala.collection.mutable.{ListBuffer, HashMap} import scala.collection.mutable.{ListBuffer, HashMap}
import kafka.api.{TopicMetadata, ProducerRequest, TopicData, PartitionData} import kafka.api._
class DefaultEventHandler[K,V](config: ProducerConfig, class DefaultEventHandler[K,V](config: ProducerConfig,
@ -33,6 +33,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
private val producerPool: ProducerPool, private val producerPool: ProducerPool,
private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata]) private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata])
extends EventHandler[K,V] with Logging { extends EventHandler[K,V] with Logging {
val isSync = ("sync" == config.producerType)
val brokerPartitionInfo = new BrokerPartitionInfo(config, producerPool, topicPartitionInfos) val brokerPartitionInfo = new BrokerPartitionInfo(config, producerPool, topicPartitionInfos)
@ -41,6 +42,11 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
def handle(events: Seq[ProducerData[K,V]]) { def handle(events: Seq[ProducerData[K,V]]) {
lock synchronized { lock synchronized {
val serializedData = serialize(events) val serializedData = serialize(events)
serializedData.foreach{
pd => val dataSize = pd.data.foldLeft(0)(_ + _.payloadSize)
ProducerTopicStat.getProducerTopicStat(pd.topic).byteRate.mark(dataSize)
ProducerTopicStat.getProducerAllTopicStat.byteRate.mark(dataSize)
}
var outstandingProduceRequests = serializedData var outstandingProduceRequests = serializedData
var remainingRetries = config.producerRetries + 1 var remainingRetries = config.producerRetries + 1
while (remainingRetries > 0 && outstandingProduceRequests.size > 0) { while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
@ -51,9 +57,11 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
// get topics of the outstanding produce requests and refresh metadata for those // get topics of the outstanding produce requests and refresh metadata for those
Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.getTopic))) Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.getTopic)))
remainingRetries -= 1 remainingRetries -= 1
ProducerStats.resendRate.mark()
} }
} }
if(outstandingProduceRequests.size > 0) { if(outstandingProduceRequests.size > 0) {
ProducerStats.failedSendRate.mark()
error("Failed to send the following requests: " + outstandingProduceRequests) error("Failed to send the following requests: " + outstandingProduceRequests)
throw new FailedToSendMessageException("Failed to send messages after " + config.producerRetries + " tries.", null) throw new FailedToSendMessageException("Failed to send messages after " + config.producerRetries + " tries.", null)
} }
@ -90,7 +98,28 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
} }
def serialize(events: Seq[ProducerData[K,V]]): Seq[ProducerData[K,Message]] = { def serialize(events: Seq[ProducerData[K,V]]): Seq[ProducerData[K,Message]] = {
events.map(e => new ProducerData[K,Message](e.getTopic, e.getKey, e.getData.map(m => encoder.toMessage(m)))) val serializedProducerData = new ListBuffer[ProducerData[K,Message]]
events.foreach {e =>
val serializedMessages = new ListBuffer[Message]
for (d <- e.getData) {
try {
serializedMessages += encoder.toMessage(d)
} catch {
case t =>
ProducerStats.serializationErrorRate.mark()
if (isSync)
throw t
else {
// currently, if in async mode, we just log the serialization error. We need to revisit
// this when doing kafka-496
error("Error serializing message " + t)
}
}
}
if (serializedMessages.size > 0)
serializedProducerData += new ProducerData[K,Message](e.getTopic, e.getKey, serializedMessages)
}
serializedProducerData
} }
def partitionAndCollate(events: Seq[ProducerData[K,Message]]): Option[Map[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]]] = { def partitionAndCollate(events: Seq[ProducerData[K,Message]]): Option[Map[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]]] = {
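The handle() and serialize() changes above mark meters on ProducerTopicStat and ProducerStats, neither of which is defined in this hunk. A plausible shape for them, inferred only from the call sites above (metric names and units are assumptions), would be:

    import java.util.concurrent.TimeUnit
    import kafka.metrics.KafkaMetricsGroup
    import kafka.utils.Pool

    object ProducerStats extends KafkaMetricsGroup {
      val serializationErrorRate = newMeter("SerializationErrorsPerSec", "errors", TimeUnit.SECONDS)
      val resendRate = newMeter("ResendsPerSec", "resends", TimeUnit.SECONDS)
      val failedSendRate = newMeter("FailedSendsPerSec", "failed sends", TimeUnit.SECONDS)
    }

    class ProducerTopicMetrics(name: String) extends KafkaMetricsGroup {
      val byteRate = newMeter(name + "BytesPerSec", "bytes", TimeUnit.SECONDS)   // marked per send in handle()
    }

    object ProducerTopicStat {
      private val stats = new Pool[String, ProducerTopicMetrics](Some((k: String) => new ProducerTopicMetrics(k)))
      def getProducerTopicStat(topic: String): ProducerTopicMetrics = stats.getAndMaybePut(topic + "-")
      def getProducerAllTopicStat: ProducerTopicMetrics = stats.getAndMaybePut("AllTopics")
    }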

View File

@ -21,16 +21,25 @@ import kafka.utils.{SystemTime, Logging}
import java.util.concurrent.{TimeUnit, CountDownLatch, BlockingQueue} import java.util.concurrent.{TimeUnit, CountDownLatch, BlockingQueue}
import collection.mutable.ListBuffer import collection.mutable.ListBuffer
import kafka.producer.ProducerData import kafka.producer.ProducerData
import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
class ProducerSendThread[K,V](val threadName: String, class ProducerSendThread[K,V](val threadName: String,
val queue: BlockingQueue[ProducerData[K,V]], val queue: BlockingQueue[ProducerData[K,V]],
val handler: EventHandler[K,V], val handler: EventHandler[K,V],
val queueTime: Long, val queueTime: Long,
val batchSize: Int) extends Thread(threadName) with Logging { val batchSize: Int) extends Thread(threadName) with Logging with KafkaMetricsGroup {
private val shutdownLatch = new CountDownLatch(1) private val shutdownLatch = new CountDownLatch(1)
private val shutdownCommand = new ProducerData[K,V](null, null.asInstanceOf[K], null.asInstanceOf[Seq[V]]) private val shutdownCommand = new ProducerData[K,V](null, null.asInstanceOf[K], null.asInstanceOf[Seq[V]])
newGauge(
"ProducerQueueSize-" + getId,
new Gauge[Int] {
def value() = queue.size
}
)
override def run { override def run {
try { try {
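Each send thread now publishes its queue depth as a gauge named "ProducerQueueSize-<threadId>", replacing the deleted AsyncProducerQueueSizeStatsMBean. A hedged sketch (not part of the patch, and assuming KafkaMetricsGroup registers into Yammer's default registry) of how a test could read those gauges back:

    import com.yammer.metrics.Metrics
    import com.yammer.metrics.core.Gauge
    import scala.collection.JavaConversions._

    // Collect the current depth of every producer send queue registered so far.
    val queueDepths = Metrics.defaultRegistry().allMetrics().collect {
      case (name, gauge: Gauge[_]) if name.getName.startsWith("ProducerQueueSize-") => gauge.value()
    }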

View File

@ -23,7 +23,11 @@ import kafka.common.ErrorMapping
import collection.mutable import collection.mutable
import kafka.message.ByteBufferMessageSet import kafka.message.ByteBufferMessageSet
import kafka.api.{FetchResponse, PartitionData, FetchRequestBuilder} import kafka.api.{FetchResponse, PartitionData, FetchRequestBuilder}
import kafka.utils.ShutdownableThread import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
import java.util.concurrent.atomic.AtomicLong
import kafka.utils.{Pool, ShutdownableThread}
import java.util.concurrent.TimeUnit
/** /**
@ -35,6 +39,8 @@ abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socket
private val fetchMap = new mutable.HashMap[Tuple2[String,Int], Long] // a (topic, partitionId) -> offset map private val fetchMap = new mutable.HashMap[Tuple2[String,Int], Long] // a (topic, partitionId) -> offset map
private val fetchMapLock = new Object private val fetchMapLock = new Object
val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize) val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize)
val fetcherMetrics = FetcherStat.getFetcherStat(name + "-" + sourceBroker.id)
// callbacks to be defined in subclass // callbacks to be defined in subclass
// process fetched data // process fetched data
@ -79,6 +85,7 @@ abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socket
} }
} }
} }
fetcherMetrics.requestRate.mark()
if (response != null) { if (response != null) {
// process fetched data // process fetched data
@ -93,8 +100,11 @@ abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socket
partitionData.error match { partitionData.error match {
case ErrorMapping.NoError => case ErrorMapping.NoError =>
processPartitionData(topic, currentOffset.get, partitionData) processPartitionData(topic, currentOffset.get, partitionData)
val newOffset = currentOffset.get + partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes val validBytes = partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes
val newOffset = currentOffset.get + validBytes
fetchMap.put(key, newOffset) fetchMap.put(key, newOffset)
FetcherLagMetrics.getFetcherLagMetrics(topic, partitionId).lag = partitionData.hw - newOffset
fetcherMetrics.byteRate.mark(validBytes)
case ErrorMapping.OffsetOutOfRangeCode => case ErrorMapping.OffsetOutOfRangeCode =>
val newOffset = handleOffsetOutOfRange(topic, partitionId) val newOffset = handleOffsetOutOfRange(topic, partitionId)
fetchMap.put(key, newOffset) fetchMap.put(key, newOffset)
@ -140,4 +150,43 @@ abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socket
fetchMap.size fetchMap.size
} }
} }
} }
class FetcherLagMetrics(name: (String, Int)) extends KafkaMetricsGroup {
private[this] var lagVal = new AtomicLong(-1L)
newGauge(
name._1 + "-" + name._2 + "-ConsumerLag",
new Gauge[Long] {
def value() = lagVal.get
}
)
def lag_=(newLag: Long) {
lagVal.set(newLag)
}
def lag = lagVal.get
}
object FetcherLagMetrics {
private val valueFactory = (k: (String, Int)) => new FetcherLagMetrics(k)
private val stats = new Pool[(String, Int), FetcherLagMetrics](Some(valueFactory))
def getFetcherLagMetrics(topic: String, partitionId: Int): FetcherLagMetrics = {
stats.getAndMaybePut( (topic, partitionId) )
}
}
class FetcherStat(name: String) extends KafkaMetricsGroup {
val requestRate = newMeter(name + "RequestsPerSec", "requests", TimeUnit.SECONDS)
val byteRate = newMeter(name + "BytesPerSec", "bytes", TimeUnit.SECONDS)
}
object FetcherStat {
private val valueFactory = (k: String) => new FetcherStat(k)
private val stats = new Pool[String, FetcherStat](Some(valueFactory))
def getFetcherStat(name: String): FetcherStat = {
stats.getAndMaybePut(name)
}
}
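FetcherStat and FetcherLagMetrics both use the lazily-populated Pool-with-valueFactory pattern that recurs throughout this patch: the first lookup for a key creates and caches its metrics. A short usage sketch (topic, partition and thread name are illustrative):

    // Per-fetcher request/byte meters, created on first access.
    val fetcherStat = FetcherStat.getFetcherStat("ReplicaFetcherThread-0-1")
    fetcherStat.requestRate.mark()
    fetcherStat.byteRate.mark(4096)

    // Per-(topic, partition) consumer lag, exposed through a "<topic>-<partition>-ConsumerLag" gauge.
    val lagMetrics = FetcherLagMetrics.getFetcherLagMetrics("my-topic", 0)
    lagMetrics.lag = 42L   // the gauge now reports 42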

View File

@ -40,8 +40,6 @@ class KafkaApis(val requestChannel: RequestChannel,
val replicaManager: ReplicaManager, val replicaManager: ReplicaManager,
val zkClient: ZkClient, val zkClient: ZkClient,
brokerId: Int) extends Logging { brokerId: Int) extends Logging {
private val metricsGroup = brokerId.toString
private val producerRequestPurgatory = new ProducerRequestPurgatory(brokerId) private val producerRequestPurgatory = new ProducerRequestPurgatory(brokerId)
private val fetchRequestPurgatory = new FetchRequestPurgatory(brokerId, requestChannel) private val fetchRequestPurgatory = new FetchRequestPurgatory(brokerId, requestChannel)
private val delayedRequestMetrics = new DelayedRequestMetrics private val delayedRequestMetrics = new DelayedRequestMetrics
@ -54,20 +52,20 @@ class KafkaApis(val requestChannel: RequestChannel,
* Top-level method that handles all requests and multiplexes to the right api * Top-level method that handles all requests and multiplexes to the right api
*/ */
def handle(request: RequestChannel.Request) { def handle(request: RequestChannel.Request) {
val apiId = request.request.buffer.getShort() request.requestId match {
apiId match { case RequestKeys.ProduceKey => handleProducerRequest(request)
case RequestKeys.Produce => handleProducerRequest(request) case RequestKeys.FetchKey => handleFetchRequest(request)
case RequestKeys.Fetch => handleFetchRequest(request) case RequestKeys.OffsetsKey => handleOffsetRequest(request)
case RequestKeys.Offsets => handleOffsetRequest(request) case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
case RequestKeys.TopicMetadata => handleTopicMetadataRequest(request) case RequestKeys.LeaderAndIsrKey => handleLeaderAndISRRequest(request)
case RequestKeys.LeaderAndISRRequest => handleLeaderAndISRRequest(request) case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
case RequestKeys.StopReplicaRequest => handleStopReplicaRequest(request) case requestId => throw new KafkaException("No mapping found for handler id " + requestId)
case _ => throw new KafkaException("No mapping found for handler id " + apiId)
} }
request.apiLocalCompleteTimeNs = SystemTime.nanoseconds
} }
def handleLeaderAndISRRequest(request: RequestChannel.Request){ def handleLeaderAndISRRequest(request: RequestChannel.Request){
val leaderAndISRRequest = LeaderAndIsrRequest.readFrom(request.request.buffer) val leaderAndISRRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]
if(requestLogger.isTraceEnabled) if(requestLogger.isTraceEnabled)
requestLogger.trace("Handling leader and isr request " + leaderAndISRRequest) requestLogger.trace("Handling leader and isr request " + leaderAndISRRequest)
trace("Handling leader and isr request " + leaderAndISRRequest) trace("Handling leader and isr request " + leaderAndISRRequest)
@ -79,7 +77,7 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleStopReplicaRequest(request: RequestChannel.Request){ def handleStopReplicaRequest(request: RequestChannel.Request){
val stopReplicaRequest = StopReplicaRequest.readFrom(request.request.buffer) val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest]
if(requestLogger.isTraceEnabled) if(requestLogger.isTraceEnabled)
requestLogger.trace("Handling stop replica request " + stopReplicaRequest) requestLogger.trace("Handling stop replica request " + stopReplicaRequest)
trace("Handling stop replica request " + stopReplicaRequest) trace("Handling stop replica request " + stopReplicaRequest)
@ -107,11 +105,6 @@ class KafkaApis(val requestChannel: RequestChannel,
for(fetchReq <- satisfied) { for(fetchReq <- satisfied) {
val topicData = readMessageSets(fetchReq.fetch) val topicData = readMessageSets(fetchReq.fetch)
val response = new FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData) val response = new FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData)
val fromFollower = fetchReq.fetch.replicaId != FetchRequest.NonFollowerId
delayedRequestMetrics.recordDelayedFetchSatisfied(
fromFollower, SystemTime.nanoseconds - fetchReq.creationTimeNs, response)
requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response))) requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response)))
} }
} }
@ -120,7 +113,7 @@ class KafkaApis(val requestChannel: RequestChannel,
* Handle a produce request * Handle a produce request
*/ */
def handleProducerRequest(request: RequestChannel.Request) { def handleProducerRequest(request: RequestChannel.Request) {
val produceRequest = ProducerRequest.readFrom(request.request.buffer) val produceRequest = request.requestObj.asInstanceOf[ProducerRequest]
val sTime = SystemTime.milliseconds val sTime = SystemTime.milliseconds
if(requestLogger.isTraceEnabled) if(requestLogger.isTraceEnabled)
requestLogger.trace("Handling producer request " + request.toString) requestLogger.trace("Handling producer request " + request.toString)
@ -179,8 +172,8 @@ class KafkaApis(val requestChannel: RequestChannel,
for(topicData <- request.data) { for(topicData <- request.data) {
for(partitionData <- topicData.partitionDataArray) { for(partitionData <- topicData.partitionDataArray) {
msgIndex += 1 msgIndex += 1
BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordBytesIn(partitionData.messages.sizeInBytes) BrokerTopicStat.getBrokerTopicStat(topicData.topic).bytesInRate.mark(partitionData.messages.sizeInBytes)
BrokerTopicStat.getBrokerAllTopicStat.recordBytesIn(partitionData.messages.sizeInBytes) BrokerTopicStat.getBrokerAllTopicStat.bytesInRate.mark(partitionData.messages.sizeInBytes)
try { try {
val localReplica = replicaManager.getLeaderReplicaIfLocal(topicData.topic, partitionData.partition) val localReplica = replicaManager.getLeaderReplicaIfLocal(topicData.topic, partitionData.partition)
val log = localReplica.log.get val log = localReplica.log.get
@ -193,8 +186,8 @@ class KafkaApis(val requestChannel: RequestChannel,
.format(partitionData.messages.sizeInBytes, offsets(msgIndex))) .format(partitionData.messages.sizeInBytes, offsets(msgIndex)))
} catch { } catch {
case e => case e =>
BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordFailedProduceRequest BrokerTopicStat.getBrokerTopicStat(topicData.topic).failedProduceRequestRate.mark()
BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest BrokerTopicStat.getBrokerAllTopicStat.failedProduceRequestRate.mark()
error("Error processing ProducerRequest on %s:%d".format(topicData.topic, partitionData.partition), e) error("Error processing ProducerRequest on %s:%d".format(topicData.topic, partitionData.partition), e)
e match { e match {
case _: IOException => case _: IOException =>
@ -214,7 +207,7 @@ class KafkaApis(val requestChannel: RequestChannel,
* Handle a fetch request * Handle a fetch request
*/ */
def handleFetchRequest(request: RequestChannel.Request) { def handleFetchRequest(request: RequestChannel.Request) {
val fetchRequest = FetchRequest.readFrom(request.request.buffer) val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
if(requestLogger.isTraceEnabled) if(requestLogger.isTraceEnabled)
requestLogger.trace("Handling fetch request " + fetchRequest.toString) requestLogger.trace("Handling fetch request " + fetchRequest.toString)
trace("Handling fetch request " + fetchRequest.toString) trace("Handling fetch request " + fetchRequest.toString)
@ -229,7 +222,7 @@ class KafkaApis(val requestChannel: RequestChannel,
requestChannel.sendResponse(channelResponse) requestChannel.sendResponse(channelResponse)
} }
if(fetchRequest.replicaId != FetchRequest.NonFollowerId) { if(fetchRequest.isFromFollower) {
maybeUpdatePartitionHW(fetchRequest) maybeUpdatePartitionHW(fetchRequest)
// after updating HW, some delayed produce requests may be unblocked // after updating HW, some delayed produce requests may be unblocked
var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce] var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
@ -272,7 +265,7 @@ class KafkaApis(val requestChannel: RequestChannel,
debug("Fetching log for topic %s partition %d".format(offsetDetail.topic, offsetDetail.partitions(i))) debug("Fetching log for topic %s partition %d".format(offsetDetail.topic, offsetDetail.partitions(i)))
try { try {
val leader = replicaManager.getLeaderReplicaIfLocal(offsetDetail.topic, offsetDetail.partitions(i)) val leader = replicaManager.getLeaderReplicaIfLocal(offsetDetail.topic, offsetDetail.partitions(i))
val end = if(fetchRequest.replicaId == FetchRequest.NonFollowerId) { val end = if (!fetchRequest.isFromFollower) {
leader.highWatermark leader.highWatermark
} else { } else {
leader.logEndOffset leader.logEndOffset
@ -317,12 +310,12 @@ class KafkaApis(val requestChannel: RequestChannel,
val topic = offsetDetail.topic val topic = offsetDetail.topic
val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes) val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes)
for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) { for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) {
val isFetchFromFollower = fetchRequest.replicaId != FetchRequest.NonFollowerId val isFetchFromFollower = fetchRequest.isFromFollower()
val partitionInfo = val partitionInfo =
try { try {
val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, isFetchFromFollower) val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, isFetchFromFollower)
BrokerTopicStat.getBrokerTopicStat(topic).recordBytesOut(messages.sizeInBytes) BrokerTopicStat.getBrokerTopicStat(topic).bytesOutRate.mark(messages.sizeInBytes)
BrokerTopicStat.getBrokerAllTopicStat.recordBytesOut(messages.sizeInBytes) BrokerTopicStat.getBrokerAllTopicStat.bytesOutRate.mark(messages.sizeInBytes)
if (!isFetchFromFollower) { if (!isFetchFromFollower) {
new PartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages) new PartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages)
} else { } else {
@ -335,8 +328,8 @@ class KafkaApis(val requestChannel: RequestChannel,
} }
catch { catch {
case e => case e =>
BrokerTopicStat.getBrokerTopicStat(topic).recordFailedFetchRequest BrokerTopicStat.getBrokerTopicStat(topic).failedFetchRequestRate.mark()
BrokerTopicStat.getBrokerAllTopicStat.recordFailedFetchRequest BrokerTopicStat.getBrokerAllTopicStat.failedFetchRequestRate.mark()
error("error when processing request " + (topic, partition, offset, fetchSize), e) error("error when processing request " + (topic, partition, offset, fetchSize), e)
new PartitionData(partition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), new PartitionData(partition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
offset, -1L, MessageSet.Empty) offset, -1L, MessageSet.Empty)
@ -375,7 +368,7 @@ class KafkaApis(val requestChannel: RequestChannel,
* Service the offset request API * Service the offset request API
*/ */
def handleOffsetRequest(request: RequestChannel.Request) { def handleOffsetRequest(request: RequestChannel.Request) {
val offsetRequest = OffsetRequest.readFrom(request.request.buffer) val offsetRequest = request.requestObj.asInstanceOf[OffsetRequest]
if(requestLogger.isTraceEnabled) if(requestLogger.isTraceEnabled)
requestLogger.trace("Handling offset request " + offsetRequest.toString) requestLogger.trace("Handling offset request " + offsetRequest.toString)
trace("Handling offset request " + offsetRequest.toString) trace("Handling offset request " + offsetRequest.toString)
@ -402,7 +395,7 @@ class KafkaApis(val requestChannel: RequestChannel,
* Service the topic metadata request API * Service the topic metadata request API
*/ */
def handleTopicMetadataRequest(request: RequestChannel.Request) { def handleTopicMetadataRequest(request: RequestChannel.Request) {
val metadataRequest = TopicMetadataRequest.readFrom(request.request.buffer) val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
if(requestLogger.isTraceEnabled) if(requestLogger.isTraceEnabled)
requestLogger.trace("Handling topic metadata request " + metadataRequest.toString()) requestLogger.trace("Handling topic metadata request " + metadataRequest.toString())
trace("Handling topic metadata request " + metadataRequest.toString()) trace("Handling topic metadata request " + metadataRequest.toString())
@ -456,7 +449,7 @@ class KafkaApis(val requestChannel: RequestChannel,
def keyLabel: String def keyLabel: String
} }
private [kafka] object MetricKey { private [kafka] object MetricKey {
val globalLabel = "all" val globalLabel = "All"
} }
private [kafka] case class RequestKey(topic: String, partition: Int) private [kafka] case class RequestKey(topic: String, partition: Int)
@ -476,7 +469,6 @@ class KafkaApis(val requestChannel: RequestChannel,
this.logIdent = "[FetchRequestPurgatory-%d], ".format(brokerId) this.logIdent = "[FetchRequestPurgatory-%d], ".format(brokerId)
/** /**
* A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
*/ */
@ -489,8 +481,8 @@ class KafkaApis(val requestChannel: RequestChannel,
def expire(delayed: DelayedFetch) { def expire(delayed: DelayedFetch) {
val topicData = readMessageSets(delayed.fetch) val topicData = readMessageSets(delayed.fetch)
val response = new FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData) val response = new FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData)
val fromFollower = delayed.fetch.replicaId != FetchRequest.NonFollowerId val fromFollower = delayed.fetch.isFromFollower
delayedRequestMetrics.recordDelayedFetchExpired(fromFollower, response) delayedRequestMetrics.recordDelayedFetchExpired(fromFollower)
requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response))) requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response)))
} }
} }
@ -560,7 +552,6 @@ class KafkaApis(val requestChannel: RequestChannel,
val partitionId = followerFetchRequestKey.partition val partitionId = followerFetchRequestKey.partition
val key = RequestKey(topic, partitionId) val key = RequestKey(topic, partitionId)
val fetchPartitionStatus = partitionStatus(key) val fetchPartitionStatus = partitionStatus(key)
val durationNs = SystemTime.nanoseconds - creationTimeNs
trace("Checking producer request satisfaction for %s-%d, acksPending = %b" trace("Checking producer request satisfaction for %s-%d, acksPending = %b"
.format(topic, partitionId, fetchPartitionStatus.acksPending)) .format(topic, partitionId, fetchPartitionStatus.acksPending))
if (fetchPartitionStatus.acksPending) { if (fetchPartitionStatus.acksPending) {
@ -576,17 +567,12 @@ class KafkaApis(val requestChannel: RequestChannel,
if (!fetchPartitionStatus.acksPending) { if (!fetchPartitionStatus.acksPending) {
val topicData = produce.data.find(_.topic == topic).get val topicData = produce.data.find(_.topic == topic).get
val partitionData = topicData.partitionDataArray.find(_.partition == partitionId).get val partitionData = topicData.partitionDataArray.find(_.partition == partitionId).get
delayedRequestMetrics.recordDelayedProducerKeyCaughtUp(key,
durationNs,
partitionData.sizeInBytes)
maybeUnblockDelayedFetchRequests(topic, Array(partitionData)) maybeUnblockDelayedFetchRequests(topic, Array(partitionData))
} }
} }
// unblocked if there are no partitions with pending acks // unblocked if there are no partitions with pending acks
val satisfied = ! partitionStatus.exists(p => p._2.acksPending) val satisfied = ! partitionStatus.exists(p => p._2.acksPending)
if (satisfied)
delayedRequestMetrics.recordDelayedProduceSatisfied(durationNs)
satisfied satisfied
} }
@ -629,53 +615,18 @@ class KafkaApis(val requestChannel: RequestChannel,
private class DelayedRequestMetrics { private class DelayedRequestMetrics {
private class DelayedProducerRequestMetrics(keyLabel: String = MetricKey.globalLabel) extends KafkaMetricsGroup { private class DelayedProducerRequestMetrics(keyLabel: String = MetricKey.globalLabel) extends KafkaMetricsGroup {
val caughtUpFollowerFetchRequestMeter = val expiredRequestMeter = newMeter(keyLabel + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
newMeter("CaughtUpFollowerFetchRequestsPerSecond-" + keyLabel, "requests", TimeUnit.SECONDS)
val followerCatchUpTimeHistogram = if (keyLabel == MetricKey.globalLabel)
Some(newHistogram("FollowerCatchUpTimeInNs", biased = true))
else None
/*
* Note that throughput is updated on individual key satisfaction.
* Therefore, it is an upper bound on throughput since the
* DelayedProducerRequest may get expired.
*/
val throughputMeter = newMeter("Throughput-" + keyLabel, "bytes", TimeUnit.SECONDS)
val expiredRequestMeter = newMeter("ExpiredRequestsPerSecond-" + keyLabel, "requests", TimeUnit.SECONDS)
val satisfiedRequestMeter = if (keyLabel == MetricKey.globalLabel)
Some(newMeter("SatisfiedRequestsPerSecond", "requests", TimeUnit.SECONDS))
else None
val satisfactionTimeHistogram = if (keyLabel == MetricKey.globalLabel)
Some(newHistogram("SatisfactionTimeInNs", biased = true))
else None
} }
private class DelayedFetchRequestMetrics(forFollower: Boolean, private class DelayedFetchRequestMetrics(forFollower: Boolean) extends KafkaMetricsGroup {
keyLabel: String = MetricKey.globalLabel) extends KafkaMetricsGroup { private val metricPrefix = if (forFollower) "Follower" else "Consumer"
private val metricPrefix = if (forFollower) "Follower" else "NonFollower"
val satisfiedRequestMeter = if (keyLabel == MetricKey.globalLabel) val expiredRequestMeter = newMeter(metricPrefix + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
Some(newMeter(metricPrefix + "-SatisfiedRequestsPerSecond",
"requests", TimeUnit.SECONDS))
else None
val satisfactionTimeHistogram = if (keyLabel == MetricKey.globalLabel)
Some(newHistogram(metricPrefix + "-SatisfactionTimeInNs", biased = true))
else None
val expiredRequestMeter = if (keyLabel == MetricKey.globalLabel)
Some(newMeter(metricPrefix + "-ExpiredRequestsPerSecond",
"requests", TimeUnit.SECONDS))
else None
val throughputMeter = newMeter("%s-Throughput-%s".format(metricPrefix, keyLabel),
"bytes", TimeUnit.SECONDS)
} }
private val producerRequestMetricsForKey = { private val producerRequestMetricsForKey = {
val valueFactory = (k: MetricKey) => new DelayedProducerRequestMetrics(k.keyLabel) val valueFactory = (k: MetricKey) => new DelayedProducerRequestMetrics(k.keyLabel + "-")
new Pool[MetricKey, DelayedProducerRequestMetrics](Some(valueFactory)) new Pool[MetricKey, DelayedProducerRequestMetrics](Some(valueFactory))
} }
@ -684,74 +635,16 @@ class KafkaApis(val requestChannel: RequestChannel,
private val aggregateFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = true) private val aggregateFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = true)
private val aggregateNonFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = false) private val aggregateNonFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = false)
private val followerFetchRequestMetricsForKey = {
val valueFactory = (k: MetricKey) => new DelayedFetchRequestMetrics(forFollower = true, k.keyLabel)
new Pool[MetricKey, DelayedFetchRequestMetrics](Some(valueFactory))
}
private val nonFollowerFetchRequestMetricsForKey = {
val valueFactory = (k: MetricKey) => new DelayedFetchRequestMetrics(forFollower = false, k.keyLabel)
new Pool[MetricKey, DelayedFetchRequestMetrics](Some(valueFactory))
}
def recordDelayedProducerKeyExpired(key: MetricKey) { def recordDelayedProducerKeyExpired(key: MetricKey) {
val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key) val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key)
List(keyMetrics, aggregateProduceRequestMetrics).foreach(_.expiredRequestMeter.mark()) List(keyMetrics, aggregateProduceRequestMetrics).foreach(_.expiredRequestMeter.mark())
} }
def recordDelayedFetchExpired(forFollower: Boolean) {
def recordDelayedProducerKeyCaughtUp(key: MetricKey, timeToCatchUpNs: Long, bytes: Int) {
val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key)
List(keyMetrics, aggregateProduceRequestMetrics).foreach(m => {
m.caughtUpFollowerFetchRequestMeter.mark()
m.followerCatchUpTimeHistogram.foreach(_.update(timeToCatchUpNs))
m.throughputMeter.mark(bytes)
})
}
def recordDelayedProduceSatisfied(timeToSatisfyNs: Long) {
aggregateProduceRequestMetrics.satisfiedRequestMeter.foreach(_.mark())
aggregateProduceRequestMetrics.satisfactionTimeHistogram.foreach(_.update(timeToSatisfyNs))
}
private def recordDelayedFetchThroughput(forFollower: Boolean, response: FetchResponse) {
val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics
else aggregateNonFollowerFetchRequestMetrics
metrics.throughputMeter.mark(response.sizeInBytes)
response.topicMap.foreach(topicAndData => {
val topic = topicAndData._1
topicAndData._2.partitionDataArray.foreach(partitionData => {
val key = RequestKey(topic, partitionData.partition)
val keyMetrics = if (forFollower)
followerFetchRequestMetricsForKey.getAndMaybePut(key)
else
nonFollowerFetchRequestMetricsForKey.getAndMaybePut(key)
keyMetrics.throughputMeter.mark(partitionData.sizeInBytes)
})
})
}
def recordDelayedFetchExpired(forFollower: Boolean, response: FetchResponse) {
val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics
else aggregateNonFollowerFetchRequestMetrics else aggregateNonFollowerFetchRequestMetrics
metrics.expiredRequestMeter.foreach(_.mark()) metrics.expiredRequestMeter.mark()
recordDelayedFetchThroughput(forFollower, response)
}
def recordDelayedFetchSatisfied(forFollower: Boolean, timeToSatisfyNs: Long, response: FetchResponse) {
val aggregateMetrics = if (forFollower) aggregateFollowerFetchRequestMetrics
else aggregateNonFollowerFetchRequestMetrics
aggregateMetrics.satisfactionTimeHistogram.foreach(_.update(timeToSatisfyNs))
aggregateMetrics.satisfiedRequestMeter.foreach(_.mark())
recordDelayedFetchThroughput(forFollower, response)
} }
} }
} }
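handle() now dispatches on request.requestId against symbolic constants instead of re-reading the api id from the buffer. RequestKeys itself is defined outside this hunk; a plausible sketch, inferred from the names used above (the numeric values are illustrative, not taken from the patch):

    object RequestKeys {
      val ProduceKey: Short = 0
      val FetchKey: Short = 1
      val OffsetsKey: Short = 2
      val MetadataKey: Short = 3
      val LeaderAndIsrKey: Short = 4
      val StopReplicaKey: Short = 5
    }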

View File

@ -28,8 +28,10 @@ import org.apache.zookeeper.Watcher.Event.KeeperState
import collection.JavaConversions._ import collection.JavaConversions._
import kafka.utils.{ShutdownableThread, ZkUtils, Logging} import kafka.utils.{ShutdownableThread, ZkUtils, Logging}
import java.lang.Object import java.lang.Object
import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue} import com.yammer.metrics.core.Gauge
import kafka.common.{KafkaException, PartitionOfflineException} import java.util.concurrent.{TimeUnit, LinkedBlockingQueue, BlockingQueue}
import kafka.common.{PartitionOfflineException, KafkaException}
import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
class RequestSendThread(val controllerId: Int, class RequestSendThread(val controllerId: Int,
@ -52,9 +54,9 @@ class RequestSendThread(val controllerId: Int,
receive = channel.receive() receive = channel.receive()
var response: RequestOrResponse = null var response: RequestOrResponse = null
request.requestId.get match { request.requestId.get match {
case RequestKeys.LeaderAndISRRequest => case RequestKeys.LeaderAndIsrKey =>
response = LeaderAndISRResponse.readFrom(receive.buffer) response = LeaderAndISRResponse.readFrom(receive.buffer)
case RequestKeys.StopReplicaRequest => case RequestKeys.StopReplicaKey =>
response = StopReplicaResponse.readFrom(receive.buffer) response = StopReplicaResponse.readFrom(receive.buffer)
} }
trace("got a response %s".format(controllerId, response, toBrokerId)) trace("got a response %s".format(controllerId, response, toBrokerId))
@ -144,7 +146,7 @@ case class ControllerBrokerStateInfo(channel: BlockingChannel,
messageQueue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)], messageQueue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
requestSendThread: RequestSendThread) requestSendThread: RequestSendThread)
class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging { class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup{
this.logIdent = "[Controller " + config.brokerId + "], " this.logIdent = "[Controller " + config.brokerId + "], "
private var isRunning = true private var isRunning = true
private val controllerLock = new Object private val controllerLock = new Object
@ -155,6 +157,13 @@ class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging
private var allPartitionReplicaAssignment: mutable.Map[(String, Int), Seq[Int]] = null private var allPartitionReplicaAssignment: mutable.Map[(String, Int), Seq[Int]] = null
private var allLeaders: mutable.Map[(String, Int), Int] = null private var allLeaders: mutable.Map[(String, Int), Int] = null
newGauge(
"ActiveControllerCount",
new Gauge[Int] {
def value() = if (isActive) 1 else 0
}
)
// Return true if this controller succeeds in the controller leader election // Return true if this controller succeeds in the controller leader election
private def tryToBecomeController(): Boolean = { private def tryToBecomeController(): Boolean = {
val controllerStatus = val controllerStatus =
@ -369,6 +378,7 @@ class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging
} }
else{ else{
warn("during initializing leader of parition (%s, %d), assigned replicas are [%s], live brokers are [%s], no assigned replica is alive".format(topicPartition._1, topicPartition._2, replicaAssignment.mkString(","), liveBrokerIds)) warn("during initializing leader of parition (%s, %d), assigned replicas are [%s], live brokers are [%s], no assigned replica is alive".format(topicPartition._1, topicPartition._2, replicaAssignment.mkString(","), liveBrokerIds))
ControllerStat.offlinePartitionRate.mark()
} }
} }
@ -479,10 +489,13 @@ class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging
debug("No broker is ISR is alive, picking the leader from the alive assigned replicas: %s" debug("No broker is ISR is alive, picking the leader from the alive assigned replicas: %s"
.format(liveAssignedReplicasToThisPartition.mkString(","))) .format(liveAssignedReplicasToThisPartition.mkString(",")))
liveAssignedReplicasToThisPartition.isEmpty match { liveAssignedReplicasToThisPartition.isEmpty match {
case true => throw new PartitionOfflineException(("No replica for partition " + case true =>
"([%s, %d]) is alive. Live brokers are: [%s],".format(topic, partition, liveBrokerIds)) + ControllerStat.offlinePartitionRate.mark()
" Assigned replicas are: [%s]".format(assignedReplicas)) throw new PartitionOfflineException(("No replica for partition " +
"([%s, %d]) is alive. Live brokers are: [%s],".format(topic, partition, liveBrokerIds)) +
" Assigned replicas are: [%s]".format(assignedReplicas))
case false => case false =>
ControllerStat.uncleanLeaderElectionRate.mark()
val newLeader = liveAssignedReplicasToThisPartition.head val newLeader = liveAssignedReplicasToThisPartition.head
warn("No broker in ISR is alive, elected leader from the alive replicas is [%s], ".format(newLeader) + warn("No broker in ISR is alive, elected leader from the alive replicas is [%s], ".format(newLeader) +
"There's potential data loss") "There's potential data loss")
@ -509,18 +522,20 @@ class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging
class BrokerChangeListener() extends IZkChildListener with Logging { class BrokerChangeListener() extends IZkChildListener with Logging {
this.logIdent = "[Controller " + config.brokerId + "], " this.logIdent = "[Controller " + config.brokerId + "], "
def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) { def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
controllerLock synchronized { ControllerStat.leaderElectionTimer.time {
val curBrokerIds = currentBrokerList.map(_.toInt).toSet controllerLock synchronized {
val newBrokerIds = curBrokerIds -- liveBrokerIds val curBrokerIds = currentBrokerList.map(_.toInt).toSet
val newBrokers = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get) val newBrokerIds = curBrokerIds -- liveBrokerIds
val deletedBrokerIds = liveBrokerIds -- curBrokerIds val newBrokers = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
liveBrokers = curBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get) val deletedBrokerIds = liveBrokerIds -- curBrokerIds
liveBrokerIds = liveBrokers.map(_.id) liveBrokers = curBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
info("Newly added brokers: %s, deleted brokers: %s, all brokers: %s" liveBrokerIds = liveBrokers.map(_.id)
.format(newBrokerIds.mkString(","), deletedBrokerIds.mkString(","), liveBrokerIds.mkString(","))) info("Newly added brokers: %s, deleted brokers: %s, all brokers: %s"
newBrokers.foreach(controllerChannelManager.addBroker(_)) .format(newBrokerIds.mkString(","), deletedBrokerIds.mkString(","), liveBrokerIds.mkString(",")))
deletedBrokerIds.foreach(controllerChannelManager.removeBroker(_)) newBrokers.foreach(controllerChannelManager.addBroker(_))
onBrokerChange(newBrokerIds) deletedBrokerIds.foreach(controllerChannelManager.removeBroker(_))
onBrokerChange(newBrokerIds)
}
} }
} }
} }
@ -591,4 +606,10 @@ class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging
} }
} }
} }
} }
object ControllerStat extends KafkaMetricsGroup {
val offlinePartitionRate = newMeter("OfflinePartitionsPerSec", "partitions", TimeUnit.SECONDS)
val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS)
val leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
}
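ControllerStat introduces a KafkaTimer wrapping a Yammer timer; the BrokerChangeListener above times its whole broker-change handling with it. A hedged sketch of the pattern (the timed work is a hypothetical stand-in):

    val newLeaders = ControllerStat.leaderElectionTimer.time {
      // the wall-clock time of this block is recorded into LeaderElectionRateAndTimeMs
      electLeadersForOfflinePartitions()   // hypothetical helper standing in for the real election work
    }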

View File

@ -19,7 +19,8 @@ package kafka.server
import kafka.network._ import kafka.network._
import kafka.utils._ import kafka.utils._
import java.util.concurrent.atomic.AtomicLong import kafka.metrics.KafkaMetricsGroup
import java.util.concurrent.TimeUnit
/** /**
* A thread that answers kafka requests. * A thread that answers kafka requests.
@ -30,10 +31,11 @@ class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestCha
def run() { def run() {
while(true) { while(true) {
val req = requestChannel.receiveRequest() val req = requestChannel.receiveRequest()
if(req == RequestChannel.AllDone){ if(req eq RequestChannel.AllDone){
trace("receives shut down command, shut down".format(brokerId, id)) trace("receives shut down command, shut down".format(brokerId, id))
return return
} }
req.dequeueTimeNs = SystemTime.nanoseconds
debug("handles request " + req) debug("handles request " + req)
apis.handle(req) apis.handle(req)
} }
@ -63,62 +65,24 @@ class KafkaRequestHandlerPool(val brokerId: Int,
thread.join thread.join
info("shutted down completely") info("shutted down completely")
} }
} }
trait BrokerTopicStatMBean { class BrokerTopicMetrics(name: String) extends KafkaMetricsGroup {
def getMessagesIn: Long val messagesInRate = newMeter(name + "MessagesInPerSec", "messages", TimeUnit.SECONDS)
def getBytesIn: Long val bytesInRate = newMeter(name + "BytesInPerSec", "bytes", TimeUnit.SECONDS)
def getBytesOut: Long val bytesOutRate = newMeter(name + "BytesOutPerSec", "bytes", TimeUnit.SECONDS)
def getFailedProduceRequest: Long val failedProduceRequestRate = newMeter(name + "FailedProduceRequestsPerSec", "requests", TimeUnit.SECONDS)
def getFailedFetchRequest: Long val failedFetchRequestRate = newMeter(name + "FailedFetchRequestsPerSec", "requests", TimeUnit.SECONDS)
}
@threadsafe
class BrokerTopicStat extends BrokerTopicStatMBean {
private val numCumulatedMessagesIn = new AtomicLong(0)
private val numCumulatedBytesIn = new AtomicLong(0)
private val numCumulatedBytesOut = new AtomicLong(0)
private val numCumulatedFailedProduceRequests = new AtomicLong(0)
private val numCumulatedFailedFetchRequests = new AtomicLong(0)
def getMessagesIn: Long = numCumulatedMessagesIn.get
def recordMessagesIn(nMessages: Int) = numCumulatedMessagesIn.getAndAdd(nMessages)
def getBytesIn: Long = numCumulatedBytesIn.get
def recordBytesIn(nBytes: Long) = numCumulatedBytesIn.getAndAdd(nBytes)
def getBytesOut: Long = numCumulatedBytesOut.get
def recordBytesOut(nBytes: Long) = numCumulatedBytesOut.getAndAdd(nBytes)
def recordFailedProduceRequest = numCumulatedFailedProduceRequests.getAndIncrement
def getFailedProduceRequest = numCumulatedFailedProduceRequests.get()
def recordFailedFetchRequest = numCumulatedFailedFetchRequests.getAndIncrement
def getFailedFetchRequest = numCumulatedFailedFetchRequests.get()
} }
object BrokerTopicStat extends Logging { object BrokerTopicStat extends Logging {
private val stats = new Pool[String, BrokerTopicStat] private val valueFactory = (k: String) => new BrokerTopicMetrics(k)
private val allTopicStat = new BrokerTopicStat private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory))
Utils.registerMBean(allTopicStat, "kafka:type=kafka.BrokerAllTopicStat") private val allTopicStat = new BrokerTopicMetrics("AllTopics")
def getBrokerAllTopicStat(): BrokerTopicStat = allTopicStat def getBrokerAllTopicStat(): BrokerTopicMetrics = allTopicStat
def getBrokerTopicStat(topic: String): BrokerTopicStat = { def getBrokerTopicStat(topic: String): BrokerTopicMetrics = {
var stat = stats.get(topic) stats.getAndMaybePut(topic + "-")
if (stat == null) {
stat = new BrokerTopicStat
if (stats.putIfNotExists(topic, stat) == null)
Utils.registerMBean(stat, "kafka:type=kafka.BrokerTopicStat." + topic)
else
stat = stats.get(topic)
}
return stat
} }
} }
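BrokerTopicStat now hands out per-topic BrokerTopicMetrics from a Pool (keyed by topic + "-") plus one shared "AllTopics" instance, so every call site marks both, as in KafkaApis above. A short usage sketch (topic and size are illustrative):

    val size = 1024
    BrokerTopicStat.getBrokerTopicStat("my-topic").bytesInRate.mark(size)   // per-topic meter, created lazily
    BrokerTopicStat.getBrokerAllTopicStat.bytesInRate.mark(size)            // broker-wide aggregate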

View File

@ -18,7 +18,7 @@
package kafka.server package kafka.server
import java.io.File import java.io.File
import kafka.network.{SocketServerStats, SocketServer} import kafka.network.SocketServer
import kafka.log.LogManager import kafka.log.LogManager
import kafka.utils._ import kafka.utils._
import java.util.concurrent._ import java.util.concurrent._
@ -34,7 +34,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
val CleanShutdownFile = ".kafka_cleanshutdown" val CleanShutdownFile = ".kafka_cleanshutdown"
private var isShuttingDown = new AtomicBoolean(false) private var isShuttingDown = new AtomicBoolean(false)
private var shutdownLatch = new CountDownLatch(1) private var shutdownLatch = new CountDownLatch(1)
private val statsMBeanName = "kafka:type=kafka.SocketServerStats"
var socketServer: SocketServer = null var socketServer: SocketServer = null
var requestHandlerPool: KafkaRequestHandlerPool = null var requestHandlerPool: KafkaRequestHandlerPool = null
var logManager: LogManager = null var logManager: LogManager = null
@ -82,8 +81,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
socketServer.startup socketServer.startup
Utils.registerMBean(socketServer.stats, statsMBeanName)
/* start client */ /* start client */
kafkaZookeeper = new KafkaZooKeeper(config) kafkaZookeeper = new KafkaZooKeeper(config)
// starting relevant replicas and leader election for partitions assigned to this broker // starting relevant replicas and leader election for partitions assigned to this broker
@ -123,7 +120,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
replicaManager.shutdown() replicaManager.shutdown()
if (socketServer != null) if (socketServer != null)
socketServer.shutdown() socketServer.shutdown()
Utils.unregisterMBean(statsMBeanName)
if(logManager != null) if(logManager != null)
logManager.shutdown() logManager.shutdown()
@ -144,8 +140,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
def awaitShutdown(): Unit = shutdownLatch.await() def awaitShutdown(): Unit = shutdownLatch.await()
def getLogManager(): LogManager = logManager def getLogManager(): LogManager = logManager
def getStats(): SocketServerStats = socketServer.stats
} }

View File

@ -24,13 +24,16 @@ import kafka.utils._
import kafka.log.LogManager import kafka.log.LogManager
import kafka.api.{LeaderAndIsrRequest, LeaderAndIsr} import kafka.api.{LeaderAndIsrRequest, LeaderAndIsr}
import kafka.common.{UnknownTopicOrPartitionException, LeaderNotAvailableException, ErrorMapping} import kafka.common.{UnknownTopicOrPartitionException, LeaderNotAvailableException, ErrorMapping}
import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
import java.util.concurrent.TimeUnit
object ReplicaManager { object ReplicaManager {
val UnknownLogEndOffset = -1L val UnknownLogEndOffset = -1L
} }
class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient, kafkaScheduler: KafkaScheduler, class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient, kafkaScheduler: KafkaScheduler,
val logManager: LogManager) extends Logging { val logManager: LogManager) extends Logging with KafkaMetricsGroup {
private val allPartitions = new Pool[(String, Int), Partition] private val allPartitions = new Pool[(String, Int), Partition]
private var leaderPartitions = new mutable.HashSet[Partition]() private var leaderPartitions = new mutable.HashSet[Partition]()
private val leaderPartitionsLock = new Object private val leaderPartitionsLock = new Object
@ -41,6 +44,26 @@ class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient
val highWatermarkCheckpoint = new HighwaterMarkCheckpoint(config.logDir) val highWatermarkCheckpoint = new HighwaterMarkCheckpoint(config.logDir)
info("Created highwatermark file %s".format(highWatermarkCheckpoint.name)) info("Created highwatermark file %s".format(highWatermarkCheckpoint.name))
newGauge(
"LeaderCount",
new Gauge[Int] {
def value() = leaderPartitions.size
}
)
newGauge(
"UnderReplicatedPartitions",
new Gauge[Int] {
def value() = {
leaderPartitionsLock synchronized {
leaderPartitions.count(_.isUnderReplicated)
}
}
}
)
val isrExpandRate = newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS)
val isrShrinkRate = newMeter("ISRShrinksPerSec", "shrinks", TimeUnit.SECONDS)
def startHighWaterMarksCheckPointThread() = { def startHighWaterMarksCheckPointThread() = {
if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true)) if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.defaultFlushIntervalMs) kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.defaultFlushIntervalMs)
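The ISR expand/shrink meters declared above are marked wherever ISR membership actually changes, which happens outside this hunk. A hedged sketch of the intended call sites (the surrounding partition logic and the replicaManager reference are illustrative):

    // when a lagging replica catches up and rejoins the ISR
    replicaManager.isrExpandRate.mark()
    // when a slow or failed replica is removed from the ISR
    replicaManager.isrShrinkRate.mark()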

View File

@ -33,7 +33,6 @@ import kafka.metrics.KafkaMetricsGroup
* for example a key could be a (topic, partition) pair. * for example a key could be a (topic, partition) pair.
*/ */
class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, delayMs: Long) extends DelayedItem[RequestChannel.Request](request, delayMs) { class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, delayMs: Long) extends DelayedItem[RequestChannel.Request](request, delayMs) {
val creationTimeNs = SystemTime.nanoseconds
val satisfied = new AtomicBoolean(false) val satisfied = new AtomicBoolean(false)
} }
@ -67,32 +66,13 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
/* a list of requests watching each key */ /* a list of requests watching each key */
private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers)) private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers))
private val numDelayedRequestsBeanName = "NumDelayedRequests"
private val timeToSatisfyHistogramBeanName = "TimeToSatisfyInNs"
private val satisfactionRateBeanName = "SatisfactionRate"
private val expirationRateBeanName = "ExpirationRate"
val satisfactionRateMeter = newMeter(
satisfactionRateBeanName,
"requests",
TimeUnit.SECONDS
)
val timeToSatisfyHistogram = newHistogram(timeToSatisfyHistogramBeanName, biased = true)
newGauge( newGauge(
numDelayedRequestsBeanName, "NumDelayedRequests",
new Gauge[Int] { new Gauge[Int] {
def value() = expiredRequestReaper.unsatisfied.get() def value() = expiredRequestReaper.unsatisfied.get()
} }
) )
val expirationRateMeter = newMeter(
expirationRateBeanName,
"requests",
TimeUnit.SECONDS
)
/* background thread expiring requests that have been waiting too long */ /* background thread expiring requests that have been waiting too long */
private val expiredRequestReaper = new ExpiredRequestReaper private val expiredRequestReaper = new ExpiredRequestReaper
private val expirationThread = Utils.daemonThread("request-expiration-task", expiredRequestReaper) private val expirationThread = Utils.daemonThread("request-expiration-task", expiredRequestReaper)
@ -196,10 +176,6 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
iter.remove() iter.remove()
val updated = curr.satisfied.compareAndSet(false, true) val updated = curr.satisfied.compareAndSet(false, true)
if(updated == true) { if(updated == true) {
val requestNs = SystemTime.nanoseconds - curr.creationTimeNs
satisfactionRateMeter.mark()
timeToSatisfyHistogram.update(requestNs)
response += curr response += curr
liveCount -= 1 liveCount -= 1
expiredRequestReaper.satisfyRequest() expiredRequestReaper.satisfyRequest()
@ -282,7 +258,6 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0) exten
val curr = delayed.take() val curr = delayed.take()
val updated = curr.satisfied.compareAndSet(false, true) val updated = curr.satisfied.compareAndSet(false, true)
if(updated) { if(updated) {
expirationRateMeter.mark()
unsatisfied.getAndDecrement() unsatisfied.getAndDecrement()
for(key <- curr.keys) for(key <- curr.keys)
watchersFor(key).decLiveCount() watchersFor(key).decLiveCount()

View File

@ -21,12 +21,14 @@ import java.util.ArrayList
import java.util.concurrent._ import java.util.concurrent._
import collection.JavaConversions import collection.JavaConversions
import kafka.common.KafkaException import kafka.common.KafkaException
import java.lang.Object
class Pool[K,V](valueFactory: Option[(K) => V] = None) extends Iterable[(K, V)] { class Pool[K,V](valueFactory: Option[(K) => V] = None) extends Iterable[(K, V)] {
private val pool = new ConcurrentHashMap[K, V] private val pool = new ConcurrentHashMap[K, V]
private val createLock = new Object
def this(m: collection.Map[K, V]) { def this(m: collection.Map[K, V]) {
this() this()
m.foreach(kv => pool.put(kv._1, kv._2)) m.foreach(kv => pool.put(kv._1, kv._2))
@ -52,8 +54,12 @@ class Pool[K,V](valueFactory: Option[(K) => V] = None) extends Iterable[(K, V)]
throw new KafkaException("Empty value factory in pool.") throw new KafkaException("Empty value factory in pool.")
val curr = pool.get(key) val curr = pool.get(key)
if (curr == null) { if (curr == null) {
pool.putIfAbsent(key, valueFactory.get(key)) createLock synchronized {
pool.get(key) val curr = pool.get(key)
if (curr == null)
pool.put(key, valueFactory.get(key))
pool.get(key)
}
} }
else else
curr curr
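getAndMaybePut now serializes creation on createLock, so the valueFactory runs at most once per key; the old putIfAbsent path could build a value, lose the race, and leave an orphaned (already registered) metrics object behind. A minimal sketch of the guarantee:

    import java.util.concurrent.atomic.AtomicLong
    import kafka.utils.Pool

    val counters = new Pool[String, AtomicLong](Some((k: String) => new AtomicLong(0)))
    val a = counters.getAndMaybePut("my-topic")
    val b = counters.getAndMaybePut("my-topic")
    assert(a eq b)   // same instance: the factory ran exactly once for this key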

View File

@ -20,7 +20,6 @@ package kafka.utils
import java.io._ import java.io._
import java.nio._ import java.nio._
import java.nio.channels._ import java.nio.channels._
import java.util.concurrent.atomic._
import java.lang.management._ import java.lang.management._
import java.util.zip.CRC32 import java.util.zip.CRC32
import javax.management._ import javax.management._
@ -685,100 +684,6 @@ object Utils extends Logging {
for (forever <- Stream.continually(1); t <- coll) yield t for (forever <- Stream.continually(1); t <- coll) yield t
stream.iterator stream.iterator
} }
}
class SnapshotStats(private val monitorDurationNs: Long = 600L * 1000L * 1000L * 1000L) {
private val time: Time = SystemTime
private val complete = new AtomicReference(new Stats())
private val current = new AtomicReference(new Stats())
private val total = new AtomicLong(0)
private val numCumulatedRequests = new AtomicLong(0)
def recordRequestMetric(requestNs: Long) {
val stats = current.get
stats.add(requestNs)
total.getAndAdd(requestNs)
numCumulatedRequests.getAndAdd(1)
val ageNs = time.nanoseconds - stats.start
// if the current stats are too old it is time to swap
if(ageNs >= monitorDurationNs) {
val swapped = current.compareAndSet(stats, new Stats())
if(swapped) {
complete.set(stats)
stats.end.set(time.nanoseconds)
}
}
}
def recordThroughputMetric(data: Long) {
val stats = current.get
stats.addData(data)
val ageNs = time.nanoseconds - stats.start
// if the current stats are too old it is time to swap
if(ageNs >= monitorDurationNs) {
val swapped = current.compareAndSet(stats, new Stats())
if(swapped) {
complete.set(stats)
stats.end.set(time.nanoseconds)
}
}
}
def getNumRequests(): Long = numCumulatedRequests.get
def getRequestsPerSecond: Double = {
val stats = complete.get
stats.numRequests / stats.durationSeconds
}
def getThroughput: Double = {
val stats = complete.get
stats.totalData / stats.durationSeconds
}
def getAvgMetric: Double = {
val stats = complete.get
if (stats.numRequests == 0) {
0
}
else {
stats.totalRequestMetric / stats.numRequests
}
}
def getTotalMetric: Long = total.get
def getMaxMetric: Double = complete.get.maxRequestMetric
class Stats {
val start = time.nanoseconds
var end = new AtomicLong(-1)
var numRequests = 0
var totalRequestMetric: Long = 0L
var maxRequestMetric: Long = 0L
var totalData: Long = 0L
private val lock = new Object()
def addData(data: Long) {
lock synchronized {
totalData += data
}
}
def add(requestNs: Long) {
lock synchronized {
numRequests +=1
totalRequestMetric += requestNs
maxRequestMetric = scala.math.max(maxRequestMetric, requestNs)
}
}
def durationSeconds: Double = (end.get - start) / (1000.0 * 1000.0 * 1000.0)
def durationMs: Double = (end.get - start) / (1000.0 * 1000.0)
}
} }
/** /**
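SnapshotStats, the hand-rolled windowed rate/average/max tracker deleted above, has no direct replacement in this file; its roles map onto Yammer meters and histograms registered through KafkaMetricsGroup. A hedged equivalence sketch (names, units and the newHistogram helper are assumptions):

    import java.util.concurrent.TimeUnit
    import kafka.metrics.KafkaMetricsGroup

    class RequestMetrics extends KafkaMetricsGroup {
      val requestRate   = newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS)   // ~ getRequestsPerSecond
      val requestTimeNs = newHistogram("RequestTimeNs", biased = true)               // ~ getAvgMetric / getMaxMetric

      def record(requestNs: Long) {
        requestRate.mark()
        requestTimeNs.update(requestNs)
      }
    }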

View File

@ -29,8 +29,7 @@ import kafka.utils.TestUtils
import kafka.utils.TestUtils._ import kafka.utils.TestUtils._
import kafka.server.{ReplicaManager, KafkaApis, KafkaConfig} import kafka.server.{ReplicaManager, KafkaApis, KafkaConfig}
import kafka.common.ErrorMapping import kafka.common.ErrorMapping
import kafka.api.{TopicMetadata, TopicMetaDataResponse, TopicMetadataRequest} import kafka.api.{RequestKeys, TopicMetadata, TopicMetaDataResponse, TopicMetadataRequest}
class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
val props = createBrokerConfigs(1) val props = createBrokerConfigs(1)
@ -106,29 +105,20 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
// create a topic metadata request // create a topic metadata request
val topicMetadataRequest = new TopicMetadataRequest(List(topic)) val topicMetadataRequest = new TopicMetadataRequest(List(topic))
val serializedMetadataRequest = ByteBuffer.allocate(topicMetadataRequest.sizeInBytes + 2) val serializedMetadataRequest = TestUtils.createRequestByteBuffer(topicMetadataRequest)
topicMetadataRequest.writeTo(serializedMetadataRequest)
serializedMetadataRequest.rewind()
// create the kafka request handler // create the kafka request handler
val requestChannel = new RequestChannel(2, 5) val requestChannel = new RequestChannel(2, 5)
val apis = new KafkaApis(requestChannel, replicaManager, zkClient, 1) val apis = new KafkaApis(requestChannel, replicaManager, zkClient, 1)
// mock the receive API to return the request buffer as created above
val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])
EasyMock.expect(receivedRequest.buffer).andReturn(serializedMetadataRequest)
EasyMock.replay(receivedRequest)
// call the API (to be tested) to get metadata // call the API (to be tested) to get metadata
apis.handleTopicMetadataRequest(new RequestChannel.Request(processor=0, requestKey=5, request=receivedRequest, start=1)) apis.handleTopicMetadataRequest(new RequestChannel.Request
val metadataResponse = requestChannel.receiveResponse(0).response.asInstanceOf[BoundedByteBufferSend].buffer (processor=0, requestKey=RequestKeys.MetadataKey, buffer=serializedMetadataRequest, startTimeNs=1))
val metadataResponse = requestChannel.receiveResponse(0).responseSend.asInstanceOf[BoundedByteBufferSend].buffer
// check assertions // check assertions
val topicMetadata = TopicMetaDataResponse.readFrom(metadataResponse).topicsMetadata val topicMetadata = TopicMetaDataResponse.readFrom(metadataResponse).topicsMetadata
// verify the expected calls to log manager occurred in the right order
EasyMock.verify(receivedRequest)
topicMetadata topicMetadata
} }
} }

View File

@ -24,6 +24,9 @@ import org.scalatest.junit.JUnitSuite
import kafka.utils.TestUtils import kafka.utils.TestUtils
import java.util.Random import java.util.Random
import junit.framework.Assert._ import junit.framework.Assert._
import kafka.producer.SyncProducerConfig
import kafka.api.{TopicData, ProducerRequest}
import java.nio.ByteBuffer
class SocketServerTest extends JUnitSuite { class SocketServerTest extends JUnitSuite {
@ -54,9 +57,9 @@ class SocketServerTest extends JUnitSuite {
/* A simple request handler that just echoes back the response */ /* A simple request handler that just echoes back the response */
def processRequest(channel: RequestChannel) { def processRequest(channel: RequestChannel) {
val request = channel.receiveRequest val request = channel.receiveRequest
val id = request.request.buffer.getShort val id = request.buffer.getShort
val send = new BoundedByteBufferSend(request.request.buffer.slice) val send = new BoundedByteBufferSend(request.buffer.slice)
channel.sendResponse(new RequestChannel.Response(request.processor, request.requestKey, send, request.start, 15)) channel.sendResponse(new RequestChannel.Response(request.processor, request, send))
} }
def connect() = new Socket("localhost", server.port) def connect() = new Socket("localhost", server.port)
@ -69,10 +72,21 @@ class SocketServerTest extends JUnitSuite {
@Test @Test
def simpleRequest() { def simpleRequest() {
val socket = connect() val socket = connect()
sendRequest(socket, 0, "hello".getBytes) val correlationId = SyncProducerConfig.DefaultCorrelationId
val clientId = SyncProducerConfig.DefaultClientId
val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
val ack = SyncProducerConfig.DefaultRequiredAcks
val emptyRequest = new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Array[TopicData]())
val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes())
emptyRequest.writeTo(byteBuffer)
byteBuffer.rewind()
val serializedBytes = new Array[Byte](byteBuffer.remaining)
byteBuffer.get(serializedBytes)
sendRequest(socket, 0, serializedBytes)
processRequest(server.requestChannel) processRequest(server.requestChannel)
val response = new String(receiveResponse(socket)) assertEquals(serializedBytes.toSeq, receiveResponse(socket).toSeq)
assertEquals("hello", response)
} }
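The rewritten test serializes an empty ProducerRequest and asserts that the socket server echoes exactly those bytes back. Its sendRequest and receiveResponse helpers are not part of this diff; the sketch below shows the kind of size-delimited framing they imply (a 4-byte length, a 2-byte request id, then the payload), using hypothetical helper names and no Kafka classes, so the real helpers may differ in detail.

import java.io.{DataInputStream, DataOutputStream}
import java.net.Socket

// Hypothetical framing helpers, for illustration only.
object FramingSketch {
  def writeFramed(socket: Socket, id: Short, payload: Array[Byte]) {
    val out = new DataOutputStream(socket.getOutputStream)
    out.writeInt(2 + payload.length)  // size of everything that follows: id + payload
    out.writeShort(id)
    out.write(payload)
    out.flush()
  }

  def readFramed(socket: Socket): Array[Byte] = {
    val in = new DataInputStream(socket.getInputStream)
    val size = in.readInt()
    val bytes = new Array[Byte](size)
    in.readFully(bytes)
    bytes
  }
}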
@Test(expected=classOf[IOException]) @Test(expected=classOf[IOException])

View File

@ -16,16 +16,15 @@
*/ */
package kafka.server package kafka.server
import java.nio.ByteBuffer
import kafka.api.{FetchRequest, FetchRequestBuilder}
import kafka.cluster.{Partition, Replica} import kafka.cluster.{Partition, Replica}
import kafka.log.Log import kafka.log.Log
import kafka.message.{ByteBufferMessageSet, Message} import kafka.message.{ByteBufferMessageSet, Message}
import kafka.network.{BoundedByteBufferReceive, RequestChannel} import kafka.network.RequestChannel
import kafka.utils.{Time, TestUtils, MockTime} import kafka.utils.{Time, TestUtils, MockTime}
import org.easymock.EasyMock import org.easymock.EasyMock
import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.ZkClient
import org.scalatest.junit.JUnit3Suite import org.scalatest.junit.JUnit3Suite
import kafka.api.{FetchRequest, FetchRequestBuilder}
class SimpleFetchTest extends JUnit3Suite { class SimpleFetchTest extends JUnit3Suite {
@ -92,16 +91,10 @@ class SimpleFetchTest extends JUnit3Suite {
.replicaId(FetchRequest.NonFollowerId) .replicaId(FetchRequest.NonFollowerId)
.addFetch(topic, partitionId, 0, hw*2) .addFetch(topic, partitionId, 0, hw*2)
.build() .build()
val goodFetchBB = ByteBuffer.allocate(goodFetch.sizeInBytes) val goodFetchBB = TestUtils.createRequestByteBuffer(goodFetch)
goodFetch.writeTo(goodFetchBB)
goodFetchBB.rewind()
val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])
EasyMock.expect(receivedRequest.buffer).andReturn(goodFetchBB)
EasyMock.replay(receivedRequest)
// send the request // send the request
apis.handleFetchRequest(new RequestChannel.Request(processor=1, requestKey=5, request=receivedRequest, start=1)) apis.handleFetchRequest(new RequestChannel.Request(processor=1, requestKey=5, buffer=goodFetchBB, startTimeNs=1))
// make sure the log only reads bytes between 0->HW (5) // make sure the log only reads bytes between 0->HW (5)
EasyMock.verify(log) EasyMock.verify(log)
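The mocked BoundedByteBufferReceive is gone, but the test still verifies its expectations on the mocked log via EasyMock. For readers unfamiliar with that flow, here is a self-contained expect/replay/verify sketch against a hypothetical SimpleLog trait (not Kafka's Log class):

import org.easymock.EasyMock

// Hypothetical collaborator; Kafka's real kafka.log.Log has a different API.
trait SimpleLog {
  def read(offset: Long, maxBytes: Int): Array[Byte]
}

object EasyMockSketch extends App {
  val log = EasyMock.createMock(classOf[SimpleLog])
  EasyMock.expect(log.read(0L, 5)).andReturn(Array[Byte](1, 2, 3))
  EasyMock.replay(log)

  log.read(0L, 5)        // the code under test would make this call
  EasyMock.verify(log)   // fails if the expected read(0, 5) never happened
}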
@ -170,16 +163,10 @@ class SimpleFetchTest extends JUnit3Suite {
.addFetch(topic, partitionId, followerLEO, Integer.MAX_VALUE) .addFetch(topic, partitionId, followerLEO, Integer.MAX_VALUE)
.build() .build()
val fetchRequest = ByteBuffer.allocate(bigFetch.sizeInBytes) val fetchRequestBB = TestUtils.createRequestByteBuffer(bigFetch)
bigFetch.writeTo(fetchRequest)
fetchRequest.rewind()
val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])
EasyMock.expect(receivedRequest.buffer).andReturn(fetchRequest)
EasyMock.replay(receivedRequest)
// send the request // send the request
apis.handleFetchRequest(new RequestChannel.Request(processor=0, requestKey=5, request=receivedRequest, start=1)) apis.handleFetchRequest(new RequestChannel.Request(processor=0, requestKey=5, buffer=fetchRequestBB, startTimeNs=1))
/** /**
* Make sure the log satisfies the fetch from a follower by reading data beyond the HW, mainly all bytes after * Make sure the log satisfies the fetch from a follower by reading data beyond the HW, mainly all bytes after

View File

@ -468,6 +468,14 @@ object TestUtils extends Logging {
} }
} }
} }
def createRequestByteBuffer(request: RequestOrResponse): ByteBuffer = {
val byteBuffer = ByteBuffer.allocate(request.sizeInBytes + 2)
byteBuffer.putShort(request.requestId.get)
request.writeTo(byteBuffer)
byteBuffer.rewind()
byteBuffer
}
} }
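The new TestUtils.createRequestByteBuffer helper centralizes the framing the updated tests rely on: a 2-byte request id followed by the serialized request, rewound so KafkaApis can read the key first and hand the rest of the buffer to the matching readFrom. The same layout demonstrated with plain java.nio and a hypothetical payload:

import java.nio.ByteBuffer

object RequestBufferSketch extends App {
  // Hypothetical stand-ins: a request key and an already-serialized body.
  val requestId: Short = 3
  val body = "serialized-request-bytes".getBytes("UTF-8")

  // Same layout as createRequestByteBuffer: 2-byte id, then the payload, then rewind.
  val buffer = ByteBuffer.allocate(2 + body.length)
  buffer.putShort(requestId)
  buffer.put(body)
  buffer.rewind()

  // A dispatcher first pulls the id to pick a handler, then parses the remainder.
  val id = buffer.getShort
  val rest = new Array[Byte](buffer.remaining)
  buffer.get(rest)
  println("id=" + id + ", body bytes=" + rest.length)
}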
object TestZKUtils { object TestZKUtils {

View File

@ -4,136 +4,118 @@
"role": "broker", "role": "broker",
"graphs": [ "graphs": [
{ {
"graph_name": "SocketServerThroughput", "graph_name": "Produce-Request-Rate",
"y_label": "bytes-read-per-second,bytes-written-per-second", "y_label": "requests-per-sec",
"bean_name": "kafka:type=kafka.SocketServerStats", "bean_name": "kafka.network:type=RequestMetrics,name=Produce-RequestsPerSec",
"attributes": "BytesReadPerSecond,BytesWrittenPerSecond" "attributes": "OneMinuteRate"
}, },
{ {
"graph_name": "FetchRequestPurgatoryNumDelayedRequests", "graph_name": "Produce-Request-Time",
"y_label": "num-delayed-requests", "y_label": "ns,ns",
"bean_name": "kafka.server:type=FetchRequestPurgatory,name=NumDelayedRequests", "bean_name": "kafka.network:type=RequestMetrics,name=Produce-TotalTimeNs",
"attributes": "Value" "attributes": "Mean,99thPercentile"
}, },
{ {
"graph_name": "MeanFetchRequestPurgatorySatisfactionRate", "graph_name": "Produce-Request-Remote-Time",
"y_label": "mean-request-satisfaction-rate", "y_label": "ns,ns",
"bean_name": "kafka.server:type=FetchRequestPurgatory,name=SatisfactionRate", "bean_name": "kafka.network:type=RequestMetrics,name=Produce-RemoteTimeNs",
"attributes": "MeanRate" "attributes": "Mean,99thPercentile"
}, },
{ {
"graph_name": "FetchRequestPurgatoryTimeToSatisfy", "graph_name": "Fetch-Consumer-Request-Rate",
"y_label": "mean-time-to-satisfy-ns,95th-percentile-time-to-satisfy-ns,99th-percentile-time-to-satisfy-ns,999th-percentile-time-to-satisfy-ns", "y_label": "requests-per-sec",
"bean_name": "kafka.server:type=FetchRequestPurgatory,name=TimeToSatisfyInNs", "bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Consumer-RequestsPerSec",
"attributes": "Mean,95thPercentile,99thPercentile,999thPercentile" "attributes": "OneMinuteRate"
}, },
{ {
"graph_name": "FetchRequestPurgatoryExpirationRate", "graph_name": "Fetch-Consumer-Request-Time",
"y_label": "expiration-rate", "y_label": "ns,ns",
"bean_name": "kafka.server:type=FetchRequestPurgatory,name=ExpirationRate", "bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Consumer-TotalTimeNs",
"attributes": "MeanRate" "attributes": "Mean,99thPercentile"
},
{
"graph_name": "Fetch-Consumer-Request-Remote-Time",
"y_label": "ns,ns",
"bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Consumer-RemoteTimeNs",
"attributes": "Mean,99thPercentile"
},
{
"graph_name": "Fetch-Follower-Request-Rate",
"y_label": "requests-per-sec",
"bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Follower-RequestsPerSec",
"attributes": "OneMinuteRate"
},
{
"graph_name": "Fetch-Follower-Request-Time",
"y_label": "ns,ns",
"bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Follower-TotalTimeNs",
"attributes": "Mean,99thPercentile"
},
{
"graph_name": "Fetch-Follower-Request-Remote-Time",
"y_label": "ns,ns",
"bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Follower-RemoteTimeNs",
"attributes": "Mean,99thPercentile"
},
{
"graph_name": "ProducePurgatoryExpirationRate",
"y_label": "expirations-per-sec",
"bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=AllExpiresPerSecond",
"attributes": "OneMinuteRate"
},
{
"graph_name": "FetchConsumerPurgatoryExpirationRate",
"y_label": "expirations-per-sec",
"bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=ConsumerExpiresPerSecond",
"attributes": "OneMinuteRate"
}, },
{ {
"graph_name": "ProducerRequestPurgatoryNumDelayedRequests", "graph_name": "FetchFollowerPurgatoryExpirationRate",
"y_label": "num-delayed-requests", "y_label": "expirations-per-sec",
"bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=FollowerExpiresPerSecond",
"attributes": "OneMinuteRate"
},
{
"graph_name": "ProducePurgatoryQueueSize",
"y_label": "size",
"bean_name": "kafka.server:type=ProducerRequestPurgatory,name=NumDelayedRequests", "bean_name": "kafka.server:type=ProducerRequestPurgatory,name=NumDelayedRequests",
"attributes": "Value" "attributes": "Value"
}, },
{ {
"graph_name": "MeanProducerRequestPurgatorySatisfactionRate", "graph_name": "FetchPurgatoryQueueSize",
"y_label": "mean-request-satisfaction-rate", "y_label": "size",
"bean_name": "kafka.server:type=ProducerRequestPurgatory,name=SatisfactionRate", "bean_name": "kafka.server:type=FetchRequestPurgatory,name=NumDelayedRequests",
"attributes": "MeanRate" "attributes": "Value"
}, },
{ {
"graph_name": "ProducerRequestPurgatoryExpirationRate", "graph_name": "ControllerLeaderElectionRateAndTime",
"y_label": "expiration-rate", "y_label": "elections-per-sec,ms,ms",
"bean_name": "kafka.server:type=ProducerRequestPurgatory,name=ExpirationRate", "bean_name": "kafka.server:type=ControllerStat,name=LeaderElectionRateAndTimeMs",
"attributes": "MeanRate" "attributes": "OneMinuteRate,Mean,99thPercentile"
}, },
{ {
"graph_name": "DelayedProducerRequests-CaughtUpFollowerFetchRequestsPerSecond", "graph_name": "LogFlushRateAndTime",
"y_label": "mean-caught-up-follower-fetch-requests-per-second", "y_label": "flushes-per-sec,ms,ms",
"bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=CaughtUpFollowerRequestsPerSecond-all", "bean_name": "kafka.message:type=LogFlushStats,name=LogFlushRateAndTimeMs",
"attributes": "MeanRate" "attributes": "OneMinuteRate,Mean,99thPercentile"
}, },
{ {
"graph_name": "DelayedProducerRequests-ExpiredRequestRate", "graph_name": "AllBytesOutRate",
"y_label": "mean-expired-request-rate", "y_label": "bytes-per-sec",
"bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=ExpiredRequestsPerSecond-all", "bean_name": "kafka.server:type=BrokerTopicMetrics,name=AllTopicsBytesOutPerSec",
"attributes": "MeanRate" "attributes": "OneMinuteRate"
}, },
{ {
"graph_name": "DelayedProducerRequests-FollowerCatchUpLatency", "graph_name": "AllBytesInRate",
"y_label": "mean-follower-catchup-time-ns,95th-percentile-follower-catchup-time-ns,99th-percentile-follower-catchup-time-ns,999th-percentile-follower-catchup-time-ns", "y_label": "bytes-per-sec",
"bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=FollowerCatchUpTimeInNs", "bean_name": "kafka.server:type=BrokerTopicMetrics,name=AllTopicsBytesInPerSec",
"attributes": "Mean,95thPercentile,99thPercentile,999thPercentile" "attributes": "OneMinuteRate"
}, },
{ {
"graph_name": "DelayedProducerRequests-SatisfactionTimeInNs", "graph_name": "AllMessagesInRate",
"y_label": "mean-time-to-satisfy-ns,95th-percentile-time-to-satisfy-ns,99th-percentile-time-to-satisfy-ns,999th-percentile-time-to-satisfy-ns", "y_label": "messages-per-sec",
"bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=SatisfactionTimeInNs", "bean_name": "kafka.server:type=BrokerTopicMetrics,name=AllTopicsMessagesInPerSec",
"attributes": "Mean,95thPercentile,99thPercentile,999thPercentile" "attributes": "OneMinuteRate"
},
{
"graph_name": "DelayedProducerRequests-SatisfiedRequestsPerSecond",
"y_label": "mean-satisfaction-requests-per-second",
"bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=SatisfiedRequestsPerSecond",
"attributes": "MeanRate"
},
{
"graph_name": "DelayedProducerRequests-Throughput-all",
"y_label": "mean-purgatory-throughput-all",
"bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=Throughput-all",
"attributes": "MeanRate"
},
{
"graph_name": "DelayedFetchRequests-Follower-ExpiredRequestRate",
"y_label": "mean-expired-request-rate",
"bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=Follower-ExpiredRequestsPerSecond",
"attributes": "MeanRate"
},
{
"graph_name": "DelayedFetchRequests-Follower-SatisfactionTimeInNs",
"y_label": "mean-time-to-satisfy-ns,95th-percentile-time-to-satisfy-ns,99th-percentile-time-to-satisfy-ns,999th-percentile-time-to-satisfy-ns",
"bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=Follower-SatisfactionTimeInNs",
"attributes": "Mean,95thPercentile,99thPercentile,999thPercentile"
},
{
"graph_name": "DelayedProducerRequests-Follower-SatisfiedRequestsPerSecond",
"y_label": "mean-satisfaction-requests-per-second",
"bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=Follower-SatisfiedRequestsPerSecond",
"attributes": "MeanRate"
},
{
"graph_name": "DelayedFetchRequests-Follower-Throughput-all",
"y_label": "mean-purgatory-throughput-all",
"bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=Follower-Throughput-all",
"attributes": "MeanRate"
},
{
"graph_name": "DelayedFetchRequests-NonFollower-ExpiredRequestRate",
"y_label": "mean-expired-request-rate",
"bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=NonFollower-ExpiredRequestsPerSecond",
"attributes": "MeanRate"
},
{
"graph_name": "DelayedFetchRequests-NonFollower-SatisfactionTimeInNs",
"y_label": "mean-time-to-satisfy-ns,95th-percentile-time-to-satisfy-ns,99th-percentile-time-to-satisfy-ns,999th-percentile-time-to-satisfy-ns",
"bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=NonFollower-SatisfactionTimeInNs",
"attributes": "Mean,95thPercentile,99thPercentile,999thPercentile"
},
{
"graph_name": "DelayedFetchRequests-NonFollower-SatisfiedRequestsPerSecond",
"y_label": "mean-satisfaction-requests-per-second",
"bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=NonFollower-SatisfiedRequestsPerSecond",
"attributes": "MeanRate"
},
{
"graph_name": "DelayedFetchRequests-NonFollower-Throughput-all",
"y_label": "mean-purgatory-throughput-all",
"bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=NonFollower-Throughput-all",
"attributes": "MeanRate"
} }
] ]
}, },
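Each graph entry pairs an MBean name with the attributes to plot, so the dashboard itself only needs a JMX read per attribute. Below is a sketch of such a read with the standard javax.management API; it assumes it runs inside (or connects to) a broker JVM where the bean named here is actually registered, otherwise the lookup reports the bean as missing.

import java.lang.management.ManagementFactory
import javax.management.ObjectName

object JmxReadSketch extends App {
  // Reads the one-minute produce-request rate from the local MBean server.
  // For a remote broker you would obtain an MBeanServerConnection via
  // javax.management.remote.JMXConnectorFactory instead.
  val server = ManagementFactory.getPlatformMBeanServer
  val bean = new ObjectName("kafka.network:type=RequestMetrics,name=Produce-RequestsPerSec")
  if (server.isRegistered(bean))
    println("produce requests/sec (1-min rate): " + server.getAttribute(bean, "OneMinuteRate"))
  else
    println("bean not registered in this JVM")
}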
@ -141,10 +123,16 @@
"role": "producer_performance", "role": "producer_performance",
"graphs": [ "graphs": [
{ {
"graph_name": "ProducerStats", "graph_name": "ProduceRequestRateAndTime",
"y_label": "avg-producer-latency-ms,max-producer-latency-ms,produce-request-throughput", "y_label": "requests-per-sec,ms,ms",
"bean_name": "kafka:type=kafka.KafkaProducerStats", "bean_name": "kafka.producer:type=ProducerRequestStat,name=ProduceRequestRateAndTimeMs",
"attributes": "AvgProduceRequestMs,MaxProduceRequestMs,ProduceRequestsPerSecond" "attributes": "OneMinuteRate,Mean,99thPercentile"
},
{
"graph_name": "ProduceRequestSize",
"y_label": "bytes,bytes",
"bean_name": "kafka.producer:type=ProducerRequestStat,name=ProducerRequestSize",
"attributes": "Mean,99thPercentile"
} }
] ]
}, },
@ -152,10 +140,22 @@
"role": "console_consumer", "role": "console_consumer",
"graphs": [ "graphs": [
{ {
"graph_name": "SimpleConsumerRequestStats", "graph_name": "FetchRequestRateAndTime",
"y_label": "simple-consumer-throughput,simple-consumer-throughput-bytes,simple-consumer-latency-ms", "y_label": "requests-per-sec,ms,ms",
"bean_name": "kafka:type=kafka.SimpleConsumerStats", "bean_name": "kafka.consumer:type=FetchRequestAndResponseStat,name=FetchRequestRateAndTimeMs",
"attributes": "FetchRequestsPerSecond,ConsumerThroughput,AvgFetchRequestMs" "attributes": "OneMinuteRate,Mean,99thPercentile"
},
{
"graph_name": "FetchResponseSize",
"y_label": "bytes,bytes",
"bean_name": "kafka.consumer:type=FetchRequestAndResponseStat,name=FetchResponseSize",
"attributes": "Mean,99thPercentile"
},
{
"graph_name": "ConsumedMessageRate",
"y_label": "messages-per-sec",
"bean_name": "kafka.consumer:type=ConsumerTopicStat,name=AllTopicsMessagesPerSec",
"attributes": "OneMinuteRate"
} }
] ]
}, },
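The OneMinuteRate, Mean, and 99thPercentile attributes in these entries are characteristic of meter and timer metrics from a Yammer/Codahale-style metrics library exported over JMX. As a rough sketch of where such attributes come from, assuming metrics-core 2.x on the classpath and using hypothetical metric names (Kafka's own registration helpers and names may differ):

import java.util.concurrent.TimeUnit
import com.yammer.metrics.Metrics
import com.yammer.metrics.core.MetricName

object ConsumerMetricsSketch extends App {
  // A meter exposes rate attributes such as OneMinuteRate; a timer adds
  // latency attributes such as Mean and 99thPercentile.
  val fetchRate = Metrics.newMeter(
    new MetricName("kafka.consumer", "FetchRequestAndResponseStat", "FetchRequestsPerSec"),
    "requests", TimeUnit.SECONDS)
  val fetchTimer = Metrics.newTimer(
    new MetricName("kafka.consumer", "FetchRequestAndResponseStat", "FetchRequestTimeMs"),
    TimeUnit.MILLISECONDS, TimeUnit.SECONDS)

  val ctx = fetchTimer.time()
  Thread.sleep(5)          // stands in for issuing a fetch request
  ctx.stop()
  fetchRate.mark()

  println("1-min rate: " + fetchRate.oneMinuteRate + ", mean ms: " + fetchTimer.mean)
}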