kafka-1481; Stop using dashes AND underscores as separators in MBean names; patched by Vladimir Tretyakov; reviewed by Joel Koshy and Jun Rao

This commit is contained in:
Vladimir Tretyakov 2014-11-19 17:57:41 -08:00 committed by Jun Rao
parent 1c7d783dd6
commit 457744a820
33 changed files with 538 additions and 336 deletions

View File

@ -263,6 +263,12 @@ project(':core') {
dependsOn 'copyDependantLibs' dependsOn 'copyDependantLibs'
} }
jar.manifest {
attributes(
'Version': "${version}"
)
}
task testJar(type: Jar) { task testJar(type: Jar) {
classifier = 'test' classifier = 'test'
from sourceSets.test.output from sourceSets.test.output

View File

@ -21,7 +21,7 @@ import kafka.admin.AdminUtils
import kafka.utils._ import kafka.utils._
import kafka.api.{PartitionStateInfo, LeaderAndIsr} import kafka.api.{PartitionStateInfo, LeaderAndIsr}
import kafka.log.LogConfig import kafka.log.LogConfig
import kafka.server.{TopicPartitionRequestKey, LogOffsetMetadata, OffsetManager, ReplicaManager} import kafka.server.{LogOffsetMetadata, OffsetManager, ReplicaManager}
import kafka.metrics.KafkaMetricsGroup import kafka.metrics.KafkaMetricsGroup
import kafka.controller.KafkaController import kafka.controller.KafkaController
import kafka.message.ByteBufferMessageSet import kafka.message.ByteBufferMessageSet
@ -29,7 +29,6 @@ import kafka.message.ByteBufferMessageSet
import java.io.IOException import java.io.IOException
import java.util.concurrent.locks.ReentrantReadWriteLock import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.utils.Utils.{inReadLock,inWriteLock} import kafka.utils.Utils.{inReadLock,inWriteLock}
import scala.Some
import scala.collection.immutable.Set import scala.collection.immutable.Set
import com.yammer.metrics.core.Gauge import com.yammer.metrics.core.Gauge
@ -62,13 +61,13 @@ class Partition(val topic: String,
private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId) private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
newGauge( newGauge("UnderReplicated",
topic + "-" + partitionId + "-UnderReplicated",
new Gauge[Int] { new Gauge[Int] {
def value = { def value = {
if (isUnderReplicated) 1 else 0 if (isUnderReplicated) 1 else 0
} }
} },
Map("topic" -> topic, "partition" -> partitionId.toString)
) )
def isUnderReplicated(): Boolean = { def isUnderReplicated(): Boolean = {
@ -345,7 +344,7 @@ class Partition(val topic: String,
leaderReplica.highWatermark = newHighWatermark leaderReplica.highWatermark = newHighWatermark
debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark)) debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark))
// some delayed requests may be unblocked after HW changed // some delayed requests may be unblocked after HW changed
val requestKey = new TopicPartitionRequestKey(this.topic, this.partitionId) val requestKey = new TopicAndPartition(this.topic, this.partitionId)
replicaManager.tryCompleteDelayedFetch(requestKey) replicaManager.tryCompleteDelayedFetch(requestKey)
replicaManager.tryCompleteDelayedProduce(requestKey) replicaManager.tryCompleteDelayedProduce(requestKey)
} else { } else {
@ -415,7 +414,7 @@ class Partition(val topic: String,
val info = log.append(messages, assignOffsets = true) val info = log.append(messages, assignOffsets = true)
// probably unblock some follower fetch requests since log end offset has been updated // probably unblock some follower fetch requests since log end offset has been updated
replicaManager.tryCompleteDelayedFetch(new TopicPartitionRequestKey(this.topic, this.partitionId)) replicaManager.tryCompleteDelayedFetch(new TopicAndPartition(this.topic, this.partitionId))
// we may need to increment high watermark since ISR could be down to 1 // we may need to increment high watermark since ISR could be down to 1
maybeIncrementLeaderHW(leaderReplica) maybeIncrementLeaderHW(leaderReplica)
info info

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.common
import java.net.URL
import java.util.jar.{Attributes, Manifest}
import com.yammer.metrics.core.Gauge
import kafka.metrics.KafkaMetricsGroup
object AppInfo extends KafkaMetricsGroup {
private var isRegistered = false
private val lock = new Object()
def registerInfo(): Unit = {
lock.synchronized {
if (isRegistered) {
return
}
}
try {
val clazz = AppInfo.getClass
val className = clazz.getSimpleName + ".class"
val classPath = clazz.getResource(className).toString
if (!classPath.startsWith("jar")) {
// Class not from JAR
return
}
val manifestPath = classPath.substring(0, classPath.lastIndexOf("!") + 1) + "/META-INF/MANIFEST.MF"
val mf = new Manifest
mf.read(new URL(manifestPath).openStream())
val version = mf.getMainAttributes.get(new Attributes.Name("Version")).toString
newGauge("Version",
new Gauge[String] {
def value = {
version
}
})
lock.synchronized {
isRegistered = true
}
} catch {
case e: Exception =>
warn("Can't read Kafka version from MANIFEST.MF. Possible cause: %s".format(e))
}
}
}

View File

@ -8,7 +8,7 @@ package kafka.common
* (the "License"); you may not use this file except in compliance with * (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at * the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
@ -21,6 +21,14 @@ package kafka.common
* Convenience case class since (clientId, brokerInfo) pairs are used to create * Convenience case class since (clientId, brokerInfo) pairs are used to create
* SyncProducer Request Stats and SimpleConsumer Request and Response Stats. * SyncProducer Request Stats and SimpleConsumer Request and Response Stats.
*/ */
case class ClientIdAndBroker(clientId: String, brokerInfo: String) {
override def toString = "%s-%s".format(clientId, brokerInfo) trait ClientIdBroker {
}
case class ClientIdAndBroker(clientId: String, brokerHost: String, brokerPort: Int) extends ClientIdBroker {
override def toString = "%s-%s-%d".format(clientId, brokerHost, brokerPort)
}
case class ClientIdAllBrokers(clientId: String) extends ClientIdBroker {
override def toString = "%s-%s".format(clientId, "AllBrokers")
} }

View File

@ -1,5 +1,3 @@
package kafka.common
/** /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with * contributor license agreements. See the NOTICE file distributed with
@ -17,11 +15,21 @@ package kafka.common
* limitations under the License. * limitations under the License.
*/ */
package kafka.common
/** /**
* Convenience case class since (clientId, topic) pairs are used in the creation * Convenience case class since (clientId, topic) pairs are used in the creation
* of many Stats objects. * of many Stats objects.
*/ */
case class ClientIdAndTopic(clientId: String, topic: String) { trait ClientIdTopic {
}
case class ClientIdAndTopic(clientId: String, topic: String) extends ClientIdTopic {
override def toString = "%s-%s".format(clientId, topic) override def toString = "%s-%s".format(clientId, topic)
} }
case class ClientIdAllTopics(clientId: String) extends ClientIdTopic {
override def toString = "%s-%s".format(clientId, "AllTopics")
}

View File

@ -30,7 +30,7 @@ class ConsumerFetcherThread(name: String,
partitionMap: Map[TopicAndPartition, PartitionTopicInfo], partitionMap: Map[TopicAndPartition, PartitionTopicInfo],
val consumerFetcherManager: ConsumerFetcherManager) val consumerFetcherManager: ConsumerFetcherManager)
extends AbstractFetcherThread(name = name, extends AbstractFetcherThread(name = name,
clientId = config.clientId + "-" + name, clientId = config.clientId,
sourceBroker = sourceBroker, sourceBroker = sourceBroker,
socketTimeout = config.socketTimeoutMs, socketTimeout = config.socketTimeoutMs,
socketBufferSize = config.socketReceiveBufferBytes, socketBufferSize = config.socketReceiveBufferBytes,

View File

@ -20,12 +20,17 @@ package kafka.consumer
import kafka.utils.{Pool, threadsafe, Logging} import kafka.utils.{Pool, threadsafe, Logging}
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import kafka.metrics.KafkaMetricsGroup import kafka.metrics.KafkaMetricsGroup
import kafka.common.ClientIdAndTopic import kafka.common.{ClientIdTopic, ClientIdAllTopics, ClientIdAndTopic}
@threadsafe @threadsafe
class ConsumerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup { class ConsumerTopicMetrics(metricId: ClientIdTopic) extends KafkaMetricsGroup {
val messageRate = newMeter(metricId + "MessagesPerSec", "messages", TimeUnit.SECONDS) val tags = metricId match {
val byteRate = newMeter(metricId + "BytesPerSec", "bytes", TimeUnit.SECONDS) case ClientIdAndTopic(clientId, topic) => Map("clientId" -> clientId, "topic" -> topic)
case ClientIdAllTopics(clientId) => Map("clientId" -> clientId)
}
val messageRate = newMeter("MessagesPerSec", "messages", TimeUnit.SECONDS, tags)
val byteRate = newMeter("BytesPerSec", "bytes", TimeUnit.SECONDS, tags)
} }
/** /**
@ -35,12 +40,12 @@ class ConsumerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup
class ConsumerTopicStats(clientId: String) extends Logging { class ConsumerTopicStats(clientId: String) extends Logging {
private val valueFactory = (k: ClientIdAndTopic) => new ConsumerTopicMetrics(k) private val valueFactory = (k: ClientIdAndTopic) => new ConsumerTopicMetrics(k)
private val stats = new Pool[ClientIdAndTopic, ConsumerTopicMetrics](Some(valueFactory)) private val stats = new Pool[ClientIdAndTopic, ConsumerTopicMetrics](Some(valueFactory))
private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics")) // to differentiate from a topic named AllTopics private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAllTopics(clientId)) // to differentiate from a topic named AllTopics
def getConsumerAllTopicStats(): ConsumerTopicMetrics = allTopicStats def getConsumerAllTopicStats(): ConsumerTopicMetrics = allTopicStats
def getConsumerTopicStats(topic: String): ConsumerTopicMetrics = { def getConsumerTopicStats(topic: String): ConsumerTopicMetrics = {
stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic + "-")) stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic))
} }
} }

View File

@ -19,13 +19,21 @@ package kafka.consumer
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import kafka.common.ClientIdAndBroker import kafka.common.{ClientIdAllBrokers, ClientIdBroker, ClientIdAndBroker}
import kafka.metrics.{KafkaMetricsGroup, KafkaTimer} import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
import kafka.utils.Pool import kafka.utils.Pool
class FetchRequestAndResponseMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGroup { class FetchRequestAndResponseMetrics(metricId: ClientIdBroker) extends KafkaMetricsGroup {
val requestTimer = new KafkaTimer(newTimer(metricId + "FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) val tags = metricId match {
val requestSizeHist = newHistogram(metricId + "FetchResponseSize") case ClientIdAndBroker(clientId, brokerHost, brokerPort) =>
Map("clientId" -> clientId, "brokerHost" -> brokerHost,
"brokerPort" -> brokerPort.toString)
case ClientIdAllBrokers(clientId) =>
Map("clientId" -> clientId)
}
val requestTimer = new KafkaTimer(newTimer("FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, tags))
val requestSizeHist = newHistogram("FetchResponseSize", biased = true, tags)
} }
/** /**
@ -33,14 +41,14 @@ class FetchRequestAndResponseMetrics(metricId: ClientIdAndBroker) extends KafkaM
* @param clientId ClientId of the given consumer * @param clientId ClientId of the given consumer
*/ */
class FetchRequestAndResponseStats(clientId: String) { class FetchRequestAndResponseStats(clientId: String) {
private val valueFactory = (k: ClientIdAndBroker) => new FetchRequestAndResponseMetrics(k) private val valueFactory = (k: ClientIdBroker) => new FetchRequestAndResponseMetrics(k)
private val stats = new Pool[ClientIdAndBroker, FetchRequestAndResponseMetrics](Some(valueFactory)) private val stats = new Pool[ClientIdBroker, FetchRequestAndResponseMetrics](Some(valueFactory))
private val allBrokersStats = new FetchRequestAndResponseMetrics(new ClientIdAndBroker(clientId, "AllBrokers")) private val allBrokersStats = new FetchRequestAndResponseMetrics(new ClientIdAllBrokers(clientId))
def getFetchRequestAndResponseAllBrokersStats(): FetchRequestAndResponseMetrics = allBrokersStats def getFetchRequestAndResponseAllBrokersStats(): FetchRequestAndResponseMetrics = allBrokersStats
def getFetchRequestAndResponseStats(brokerInfo: String): FetchRequestAndResponseMetrics = { def getFetchRequestAndResponseStats(brokerHost: String, brokerPort: Int): FetchRequestAndResponseMetrics = {
stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo + "-")) stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerHost, brokerPort))
} }
} }
@ -56,7 +64,7 @@ object FetchRequestAndResponseStatsRegistry {
} }
def removeConsumerFetchRequestAndResponseStats(clientId: String) { def removeConsumerFetchRequestAndResponseStats(clientId: String) {
val pattern = (clientId + "-ConsumerFetcherThread.*").r val pattern = (".*" + clientId + ".*").r
val keys = globalStats.keys val keys = globalStats.keys
for (key <- keys) { for (key <- keys) {
pattern.findFirstIn(key) match { pattern.findFirstIn(key) match {

View File

@ -36,7 +36,6 @@ class SimpleConsumer(val host: String,
ConsumerConfig.validateClientId(clientId) ConsumerConfig.validateClientId(clientId)
private val lock = new Object() private val lock = new Object()
private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout) private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
val brokerInfo = "host_%s-port_%s".format(host, port)
private val fetchRequestAndResponseStats = FetchRequestAndResponseStatsRegistry.getFetchRequestAndResponseStats(clientId) private val fetchRequestAndResponseStats = FetchRequestAndResponseStatsRegistry.getFetchRequestAndResponseStats(clientId)
private var isClosed = false private var isClosed = false
@ -106,7 +105,7 @@ class SimpleConsumer(val host: String,
*/ */
def fetch(request: FetchRequest): FetchResponse = { def fetch(request: FetchRequest): FetchResponse = {
var response: Receive = null var response: Receive = null
val specificTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseStats(brokerInfo).requestTimer val specificTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).requestTimer
val aggregateTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestTimer val aggregateTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestTimer
aggregateTimer.time { aggregateTimer.time {
specificTimer.time { specificTimer.time {
@ -115,7 +114,7 @@ class SimpleConsumer(val host: String,
} }
val fetchResponse = FetchResponse.readFrom(response.buffer) val fetchResponse = FetchResponse.readFrom(response.buffer)
val fetchedSize = fetchResponse.sizeInBytes val fetchedSize = fetchResponse.sizeInBytes
fetchRequestAndResponseStats.getFetchRequestAndResponseStats(brokerInfo).requestSizeHist.update(fetchedSize) fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).requestSizeHist.update(fetchedSize)
fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestSizeHist.update(fetchedSize) fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestSizeHist.update(fetchedSize)
fetchResponse fetchResponse
} }

View File

@ -104,9 +104,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
// useful for tracking migration of consumers to store offsets in kafka // useful for tracking migration of consumers to store offsets in kafka
private val kafkaCommitMeter = newMeter(config.clientId + "-KafkaCommitsPerSec", "commits", TimeUnit.SECONDS) private val kafkaCommitMeter = newMeter("KafkaCommitsPerSec", "commits", TimeUnit.SECONDS, Map("clientId" -> config.clientId))
private val zkCommitMeter = newMeter(config.clientId + "-ZooKeeperCommitsPerSec", "commits", TimeUnit.SECONDS) private val zkCommitMeter = newMeter("ZooKeeperCommitsPerSec", "commits", TimeUnit.SECONDS, Map("clientId" -> config.clientId))
private val rebalanceTimer = new KafkaTimer(newTimer(config.clientId + "-RebalanceRateAndTime", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) private val rebalanceTimer = new KafkaTimer(newTimer("RebalanceRateAndTime", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, Map("clientId" -> config.clientId)))
val consumerIdString = { val consumerIdString = {
var consumerUuid : String = null var consumerUuid : String = null
@ -138,6 +138,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
} }
KafkaMetricsReporter.startReporters(config.props) KafkaMetricsReporter.startReporters(config.props)
AppInfo.registerInfo()
def this(config: ConsumerConfig) = this(config, true) def this(config: ConsumerConfig) = this(config, true)
@ -521,14 +522,15 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private var isWatcherTriggered = false private var isWatcherTriggered = false
private val lock = new ReentrantLock private val lock = new ReentrantLock
private val cond = lock.newCondition() private val cond = lock.newCondition()
@volatile private var allTopicsOwnedPartitionsCount = 0
newGauge(config.clientId + "-" + config.groupId + "-AllTopicsOwnedPartitionsCount", new Gauge[Int] {
def value() = allTopicsOwnedPartitionsCount
})
private def ownedPartitionsCountMetricName(topic: String) = @volatile private var allTopicsOwnedPartitionsCount = 0
"%s-%s-%s-OwnedPartitionsCount".format(config.clientId, config.groupId, topic) newGauge("OwnedPartitionsCount",
new Gauge[Int] {
def value() = allTopicsOwnedPartitionsCount
},
Map("clientId" -> config.clientId, "groupId" -> config.groupId))
private def ownedPartitionsCountMetricTags(topic: String) = Map("clientId" -> config.clientId, "groupId" -> config.groupId, "topic" -> topic)
private val watcherExecutorThread = new Thread(consumerIdString + "_watcher_executor") { private val watcherExecutorThread = new Thread(consumerIdString + "_watcher_executor") {
override def run() { override def run() {
@ -581,7 +583,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
for(partition <- infos.keys) { for(partition <- infos.keys) {
deletePartitionOwnershipFromZK(topic, partition) deletePartitionOwnershipFromZK(topic, partition)
} }
removeMetric(ownedPartitionsCountMetricName(topic)) removeMetric("OwnedPartitionsCount", ownedPartitionsCountMetricTags(topic))
localTopicRegistry.remove(topic) localTopicRegistry.remove(topic)
} }
allTopicsOwnedPartitionsCount = 0 allTopicsOwnedPartitionsCount = 0
@ -684,9 +686,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
partitionOwnershipDecision.view.groupBy { case(topicPartition, consumerThreadId) => topicPartition.topic } partitionOwnershipDecision.view.groupBy { case(topicPartition, consumerThreadId) => topicPartition.topic }
.foreach { case (topic, partitionThreadPairs) => .foreach { case (topic, partitionThreadPairs) =>
newGauge(ownedPartitionsCountMetricName(topic), new Gauge[Int] { newGauge("OwnedPartitionsCount",
def value() = partitionThreadPairs.size new Gauge[Int] {
}) def value() = partitionThreadPairs.size
},
ownedPartitionsCountMetricTags(topic))
} }
topicRegistry = currentTopicRegistry topicRegistry = currentTopicRegistry
@ -868,10 +872,13 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
topicThreadIdAndQueues.put(topicThreadId, q) topicThreadIdAndQueues.put(topicThreadId, q)
debug("Adding topicThreadId %s and queue %s to topicThreadIdAndQueues data structure".format(topicThreadId, q.toString)) debug("Adding topicThreadId %s and queue %s to topicThreadIdAndQueues data structure".format(topicThreadId, q.toString))
newGauge( newGauge(
config.clientId + "-" + config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize", "FetchQueueSize",
new Gauge[Int] { new Gauge[Int] {
def value = q.size def value = q.size
} },
Map("clientId" -> config.clientId,
"topic" -> topicThreadId._1,
"threadId" -> topicThreadId._2.threadId.toString)
) )
}) })

View File

@ -88,17 +88,31 @@ class Log(val dir: File,
info("Completed load of log %s with log end offset %d".format(name, logEndOffset)) info("Completed load of log %s with log end offset %d".format(name, logEndOffset))
newGauge(name + "-" + "NumLogSegments", val tags = Map("topic" -> topicAndPartition.topic, "partition" -> topicAndPartition.partition.toString)
new Gauge[Int] { def value = numberOfSegments })
newGauge(name + "-" + "LogStartOffset", newGauge("NumLogSegments",
new Gauge[Long] { def value = logStartOffset }) new Gauge[Int] {
def value = numberOfSegments
},
tags)
newGauge(name + "-" + "LogEndOffset", newGauge("LogStartOffset",
new Gauge[Long] { def value = logEndOffset }) new Gauge[Long] {
def value = logStartOffset
newGauge(name + "-" + "Size", },
new Gauge[Long] {def value = size}) tags)
newGauge("LogEndOffset",
new Gauge[Long] {
def value = logEndOffset
},
tags)
newGauge("Size",
new Gauge[Long] {
def value = size
},
tags)
/** The name of this log */ /** The name of this log */
def name = dir.getName() def name = dir.getName()
@ -168,7 +182,7 @@ class Log(val dir: File,
if(logSegments.size == 0) { if(logSegments.size == 0) {
// no existing segments, create a new mutable segment beginning at offset 0 // no existing segments, create a new mutable segment beginning at offset 0
segments.put(0, new LogSegment(dir = dir, segments.put(0L, new LogSegment(dir = dir,
startOffset = 0, startOffset = 0,
indexIntervalBytes = config.indexInterval, indexIntervalBytes = config.indexInterval,
maxIndexSize = config.maxIndexSize, maxIndexSize = config.maxIndexSize,

View File

@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with * (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at * the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
@ -35,29 +35,52 @@ trait KafkaMetricsGroup extends Logging {
* Creates a new MetricName object for gauges, meters, etc. created for this * Creates a new MetricName object for gauges, meters, etc. created for this
* metrics group. * metrics group.
* @param name Descriptive name of the metric. * @param name Descriptive name of the metric.
* @param tags Additional attributes which mBean will have.
* @return Sanitized metric name object. * @return Sanitized metric name object.
*/ */
private def metricName(name: String) = { private def metricName(name: String, tags: scala.collection.Map[String, String] = Map.empty) = {
val klass = this.getClass val klass = this.getClass
val pkg = if (klass.getPackage == null) "" else klass.getPackage.getName val pkg = if (klass.getPackage == null) "" else klass.getPackage.getName
val simpleName = klass.getSimpleName.replaceAll("\\$$", "") val simpleName = klass.getSimpleName.replaceAll("\\$$", "")
new MetricName(pkg, simpleName, name)
explicitMetricName(pkg, simpleName, name, tags)
} }
def newGauge[T](name: String, metric: Gauge[T]) =
Metrics.defaultRegistry().newGauge(metricName(name), metric)
def newMeter(name: String, eventType: String, timeUnit: TimeUnit) = private def explicitMetricName(group: String, typeName: String, name: String, tags: scala.collection.Map[String, String] = Map.empty) = {
Metrics.defaultRegistry().newMeter(metricName(name), eventType, timeUnit) val nameBuilder: StringBuilder = new StringBuilder
def newHistogram(name: String, biased: Boolean = true) = nameBuilder.append(group)
Metrics.defaultRegistry().newHistogram(metricName(name), biased)
def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit) = nameBuilder.append(":type=")
Metrics.defaultRegistry().newTimer(metricName(name), durationUnit, rateUnit)
nameBuilder.append(typeName)
if (name.length > 0) {
nameBuilder.append(",name=")
nameBuilder.append(name)
}
KafkaMetricsGroup.toMBeanName(tags).map(mbeanName => nameBuilder.append(",").append(mbeanName))
new MetricName(group, typeName, name, null, nameBuilder.toString())
}
def newGauge[T](name: String, metric: Gauge[T], tags: scala.collection.Map[String, String] = Map.empty) =
Metrics.defaultRegistry().newGauge(metricName(name, tags), metric)
def newMeter(name: String, eventType: String, timeUnit: TimeUnit, tags: scala.collection.Map[String, String] = Map.empty) =
Metrics.defaultRegistry().newMeter(metricName(name, tags), eventType, timeUnit)
def newHistogram(name: String, biased: Boolean = true, tags: scala.collection.Map[String, String] = Map.empty) =
Metrics.defaultRegistry().newHistogram(metricName(name, tags), biased)
def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit, tags: scala.collection.Map[String, String] = Map.empty) =
Metrics.defaultRegistry().newTimer(metricName(name, tags), durationUnit, rateUnit)
def removeMetric(name: String, tags: scala.collection.Map[String, String] = Map.empty) =
Metrics.defaultRegistry().removeMetric(metricName(name, tags))
def removeMetric(name: String) =
Metrics.defaultRegistry().removeMetric(metricName(name))
} }
@ -68,72 +91,75 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
*/ */
private val consumerMetricNameList: immutable.List[MetricName] = immutable.List[MetricName]( private val consumerMetricNameList: immutable.List[MetricName] = immutable.List[MetricName](
// kafka.consumer.ZookeeperConsumerConnector // kafka.consumer.ZookeeperConsumerConnector
new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-FetchQueueSize"), new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "FetchQueueSize"),
new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-KafkaCommitsPerSec"), new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "KafkaCommitsPerSec"),
new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-ZooKeeperCommitsPerSec"), new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "ZooKeeperCommitsPerSec"),
new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-RebalanceRateAndTime"), new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "RebalanceRateAndTime"),
new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-OwnedPartitionsCount"), new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "OwnedPartitionsCount"),
new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "AllTopicsOwnedPartitionsCount"),
// kafka.consumer.ConsumerFetcherManager // kafka.consumer.ConsumerFetcherManager
new MetricName("kafka.consumer", "ConsumerFetcherManager", "-MaxLag"), new MetricName("kafka.consumer", "ConsumerFetcherManager", "MaxLag"),
new MetricName("kafka.consumer", "ConsumerFetcherManager", "-MinFetchRate"), new MetricName("kafka.consumer", "ConsumerFetcherManager", "MinFetchRate"),
// kafka.server.AbstractFetcherThread <-- kafka.consumer.ConsumerFetcherThread // kafka.server.AbstractFetcherThread <-- kafka.consumer.ConsumerFetcherThread
new MetricName("kafka.server", "FetcherLagMetrics", "-ConsumerLag"), new MetricName("kafka.server", "FetcherLagMetrics", "ConsumerLag"),
// kafka.consumer.ConsumerTopicStats <-- kafka.consumer.{ConsumerIterator, PartitionTopicInfo} // kafka.consumer.ConsumerTopicStats <-- kafka.consumer.{ConsumerIterator, PartitionTopicInfo}
new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-MessagesPerSec"), new MetricName("kafka.consumer", "ConsumerTopicMetrics", "MessagesPerSec"),
new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-AllTopicsMessagesPerSec"),
// kafka.consumer.ConsumerTopicStats // kafka.consumer.ConsumerTopicStats
new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-BytesPerSec"), new MetricName("kafka.consumer", "ConsumerTopicMetrics", "BytesPerSec"),
new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-AllTopicsBytesPerSec"),
// kafka.server.AbstractFetcherThread <-- kafka.consumer.ConsumerFetcherThread // kafka.server.AbstractFetcherThread <-- kafka.consumer.ConsumerFetcherThread
new MetricName("kafka.server", "FetcherStats", "-BytesPerSec"), new MetricName("kafka.server", "FetcherStats", "BytesPerSec"),
new MetricName("kafka.server", "FetcherStats", "-RequestsPerSec"), new MetricName("kafka.server", "FetcherStats", "RequestsPerSec"),
// kafka.consumer.FetchRequestAndResponseStats <-- kafka.consumer.SimpleConsumer // kafka.consumer.FetchRequestAndResponseStats <-- kafka.consumer.SimpleConsumer
new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-FetchResponseSize"), new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "FetchResponseSize"),
new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-FetchRequestRateAndTimeMs"), new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "FetchRequestRateAndTimeMs"),
new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-AllBrokersFetchResponseSize"),
new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-AllBrokersFetchRequestRateAndTimeMs"),
/** /**
* ProducerRequestStats <-- SyncProducer * ProducerRequestStats <-- SyncProducer
* metric for SyncProducer in fetchTopicMetaData() needs to be removed when consumer is closed. * metric for SyncProducer in fetchTopicMetaData() needs to be removed when consumer is closed.
*/ */
new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestRateAndTimeMs"), new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestRateAndTimeMs"),
new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestSize"), new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestSize")
new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestRateAndTimeMs"),
new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestSize")
) )
private val producerMetricNameList: immutable.List[MetricName] = immutable.List[MetricName] ( private val producerMetricNameList: immutable.List[MetricName] = immutable.List[MetricName](
// kafka.producer.ProducerStats <-- DefaultEventHandler <-- Producer // kafka.producer.ProducerStats <-- DefaultEventHandler <-- Producer
new MetricName("kafka.producer", "ProducerStats", "-SerializationErrorsPerSec"), new MetricName("kafka.producer", "ProducerStats", "SerializationErrorsPerSec"),
new MetricName("kafka.producer", "ProducerStats", "-ResendsPerSec"), new MetricName("kafka.producer", "ProducerStats", "ResendsPerSec"),
new MetricName("kafka.producer", "ProducerStats", "-FailedSendsPerSec"), new MetricName("kafka.producer", "ProducerStats", "FailedSendsPerSec"),
// kafka.producer.ProducerSendThread // kafka.producer.ProducerSendThread
new MetricName("kafka.producer.async", "ProducerSendThread", "-ProducerQueueSize"), new MetricName("kafka.producer.async", "ProducerSendThread", "ProducerQueueSize"),
// kafka.producer.ProducerTopicStats <-- kafka.producer.{Producer, async.DefaultEventHandler} // kafka.producer.ProducerTopicStats <-- kafka.producer.{Producer, async.DefaultEventHandler}
new MetricName("kafka.producer", "ProducerTopicMetrics", "-MessagesPerSec"), new MetricName("kafka.producer", "ProducerTopicMetrics", "MessagesPerSec"),
new MetricName("kafka.producer", "ProducerTopicMetrics", "-DroppedMessagesPerSec"), new MetricName("kafka.producer", "ProducerTopicMetrics", "DroppedMessagesPerSec"),
new MetricName("kafka.producer", "ProducerTopicMetrics", "-BytesPerSec"), new MetricName("kafka.producer", "ProducerTopicMetrics", "BytesPerSec"),
new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsMessagesPerSec"),
new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsDroppedMessagesPerSec"),
new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsBytesPerSec"),
// kafka.producer.ProducerRequestStats <-- SyncProducer // kafka.producer.ProducerRequestStats <-- SyncProducer
new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestRateAndTimeMs"), new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestRateAndTimeMs"),
new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestSize"), new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestSize")
new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestRateAndTimeMs"),
new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestSize")
) )
private def toMBeanName(tags: collection.Map[String, String]): Option[String] = {
val filteredTags = tags
.filter { case (tagKey, tagValue) => tagValue != ""}
if (filteredTags.nonEmpty) {
val tagsString = filteredTags
.map { case (key, value) => "%s=%s".format(key, value)}
.mkString(",")
Some(tagsString)
}
else {
None
}
}
def removeAllConsumerMetrics(clientId: String) { def removeAllConsumerMetrics(clientId: String) {
FetchRequestAndResponseStatsRegistry.removeConsumerFetchRequestAndResponseStats(clientId) FetchRequestAndResponseStatsRegistry.removeConsumerFetchRequestAndResponseStats(clientId)
ConsumerTopicStatsRegistry.removeConsumerTopicStat(clientId) ConsumerTopicStatsRegistry.removeConsumerTopicStat(clientId)
@ -150,18 +176,19 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging {
private def removeAllMetricsInList(metricNameList: immutable.List[MetricName], clientId: String) { private def removeAllMetricsInList(metricNameList: immutable.List[MetricName], clientId: String) {
metricNameList.foreach(metric => { metricNameList.foreach(metric => {
val pattern = (clientId + ".*" + metric.getName +".*").r val pattern = (".*clientId=" + clientId + ".*").r
val registeredMetrics = scala.collection.JavaConversions.asScalaSet(Metrics.defaultRegistry().allMetrics().keySet()) val registeredMetrics = scala.collection.JavaConversions.asScalaSet(Metrics.defaultRegistry().allMetrics().keySet())
for (registeredMetric <- registeredMetrics) { for (registeredMetric <- registeredMetrics) {
if (registeredMetric.getGroup == metric.getGroup && if (registeredMetric.getGroup == metric.getGroup &&
registeredMetric.getName == metric.getName &&
registeredMetric.getType == metric.getType) { registeredMetric.getType == metric.getType) {
pattern.findFirstIn(registeredMetric.getName) match { pattern.findFirstIn(registeredMetric.getMBeanName) match {
case Some(_) => { case Some(_) => {
val beforeRemovalSize = Metrics.defaultRegistry().allMetrics().keySet().size val beforeRemovalSize = Metrics.defaultRegistry().allMetrics().keySet().size
Metrics.defaultRegistry().removeMetric(registeredMetric) Metrics.defaultRegistry().removeMetric(registeredMetric)
val afterRemovalSize = Metrics.defaultRegistry().allMetrics().keySet().size val afterRemovalSize = Metrics.defaultRegistry().allMetrics().keySet().size
trace("Removing metric %s. Metrics registry size reduced from %d to %d".format( trace("Removing metric %s. Metrics registry size reduced from %d to %d".format(
registeredMetric, beforeRemovalSize, afterRemovalSize)) registeredMetric, beforeRemovalSize, afterRemovalSize))
} }
case _ => case _ =>
} }

View File

@ -125,12 +125,12 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
def value = responseQueues.foldLeft(0) {(total, q) => total + q.size()} def value = responseQueues.foldLeft(0) {(total, q) => total + q.size()}
}) })
for(i <- 0 until numProcessors) { for (i <- 0 until numProcessors) {
newGauge( newGauge("ResponseQueueSize",
"Processor-" + i + "-ResponseQueueSize",
new Gauge[Int] { new Gauge[Int] {
def value = responseQueues(i).size() def value = responseQueues(i).size()
} },
Map("processor" -> i.toString)
) )
} }
@ -187,24 +187,25 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
object RequestMetrics { object RequestMetrics {
val metricsMap = new scala.collection.mutable.HashMap[String, RequestMetrics] val metricsMap = new scala.collection.mutable.HashMap[String, RequestMetrics]
val consumerFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "-Consumer" val consumerFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "Consumer"
val followFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "-Follower" val followFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "Follower"
(RequestKeys.keyToNameAndDeserializerMap.values.map(e => e._1) (RequestKeys.keyToNameAndDeserializerMap.values.map(e => e._1)
++ List(consumerFetchMetricName, followFetchMetricName)).foreach(name => metricsMap.put(name, new RequestMetrics(name))) ++ List(consumerFetchMetricName, followFetchMetricName)).foreach(name => metricsMap.put(name, new RequestMetrics(name)))
} }
class RequestMetrics(name: String) extends KafkaMetricsGroup { class RequestMetrics(name: String) extends KafkaMetricsGroup {
val requestRate = newMeter(name + "-RequestsPerSec", "requests", TimeUnit.SECONDS) val tags = Map("request" -> name)
val requestRate = newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS, tags)
// time a request spent in a request queue // time a request spent in a request queue
val requestQueueTimeHist = newHistogram(name + "-RequestQueueTimeMs") val requestQueueTimeHist = newHistogram("RequestQueueTimeMs", biased = true, tags)
// time a request takes to be processed at the local broker // time a request takes to be processed at the local broker
val localTimeHist = newHistogram(name + "-LocalTimeMs") val localTimeHist = newHistogram("LocalTimeMs", biased = true, tags)
// time a request takes to wait on remote brokers (only relevant to fetch and produce requests) // time a request takes to wait on remote brokers (only relevant to fetch and produce requests)
val remoteTimeHist = newHistogram(name + "-RemoteTimeMs") val remoteTimeHist = newHistogram("RemoteTimeMs", biased = true, tags)
// time a response spent in a response queue // time a response spent in a response queue
val responseQueueTimeHist = newHistogram(name + "-ResponseQueueTimeMs") val responseQueueTimeHist = newHistogram("ResponseQueueTimeMs", biased = true, tags)
// time to send the response to the requester // time to send the response to the requester
val responseSendTimeHist = newHistogram(name + "-ResponseSendTimeMs") val responseSendTimeHist = newHistogram("ResponseSendTimeMs", biased = true, tags)
val totalTimeHist = newHistogram(name + "-TotalTimeMs") val totalTimeHist = newHistogram("TotalTimeMs", biased = true, tags)
} }

View File

@ -67,7 +67,7 @@ class SocketServer(val brokerId: Int,
time, time,
maxRequestSize, maxRequestSize,
aggregateIdleMeter, aggregateIdleMeter,
newMeter("NetworkProcessor-" + i + "-IdlePercent", "percent", TimeUnit.NANOSECONDS), newMeter("IdlePercent", "percent", TimeUnit.NANOSECONDS, Map("networkProcessor" -> i.toString)),
numProcessorThreads, numProcessorThreads,
requestChannel, requestChannel,
quotas, quotas,

View File

@ -19,7 +19,7 @@ package kafka.producer
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit} import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import kafka.common.QueueFullException import kafka.common.{AppInfo, QueueFullException}
import kafka.metrics._ import kafka.metrics._
import kafka.producer.async.{DefaultEventHandler, EventHandler, ProducerSendThread} import kafka.producer.async.{DefaultEventHandler, EventHandler, ProducerSendThread}
import kafka.serializer.Encoder import kafka.serializer.Encoder
@ -53,6 +53,7 @@ class Producer[K,V](val config: ProducerConfig,
private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId) private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)
KafkaMetricsReporter.startReporters(config.props) KafkaMetricsReporter.startReporters(config.props)
AppInfo.registerInfo()
def this(config: ProducerConfig) = def this(config: ProducerConfig) =
this(config, this(config,

View File

@ -19,11 +19,16 @@ package kafka.producer
import kafka.metrics.{KafkaTimer, KafkaMetricsGroup} import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import kafka.utils.Pool import kafka.utils.Pool
import kafka.common.ClientIdAndBroker import kafka.common.{ClientIdAllBrokers, ClientIdBroker, ClientIdAndBroker}
class ProducerRequestMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGroup { class ProducerRequestMetrics(metricId: ClientIdBroker) extends KafkaMetricsGroup {
val requestTimer = new KafkaTimer(newTimer(metricId + "ProducerRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) val tags = metricId match {
val requestSizeHist = newHistogram(metricId + "ProducerRequestSize") case ClientIdAndBroker(clientId, brokerHost, brokerPort) => Map("clientId" -> clientId, "brokerHost" -> brokerHost, "brokerPort" -> brokerPort.toString)
case ClientIdAllBrokers(clientId) => Map("clientId" -> clientId)
}
val requestTimer = new KafkaTimer(newTimer("ProducerRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, tags))
val requestSizeHist = newHistogram("ProducerRequestSize", biased = true, tags)
} }
/** /**
@ -31,14 +36,14 @@ class ProducerRequestMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGr
* @param clientId ClientId of the given producer * @param clientId ClientId of the given producer
*/ */
class ProducerRequestStats(clientId: String) { class ProducerRequestStats(clientId: String) {
private val valueFactory = (k: ClientIdAndBroker) => new ProducerRequestMetrics(k) private val valueFactory = (k: ClientIdBroker) => new ProducerRequestMetrics(k)
private val stats = new Pool[ClientIdAndBroker, ProducerRequestMetrics](Some(valueFactory)) private val stats = new Pool[ClientIdBroker, ProducerRequestMetrics](Some(valueFactory))
private val allBrokersStats = new ProducerRequestMetrics(new ClientIdAndBroker(clientId, "AllBrokers")) private val allBrokersStats = new ProducerRequestMetrics(new ClientIdAllBrokers(clientId))
def getProducerRequestAllBrokersStats(): ProducerRequestMetrics = allBrokersStats def getProducerRequestAllBrokersStats(): ProducerRequestMetrics = allBrokersStats
def getProducerRequestStats(brokerInfo: String): ProducerRequestMetrics = { def getProducerRequestStats(brokerHost: String, brokerPort: Int): ProducerRequestMetrics = {
stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo + "-")) stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerHost, brokerPort))
} }
} }

View File

@ -21,9 +21,10 @@ import java.util.concurrent.TimeUnit
import kafka.utils.Pool import kafka.utils.Pool
class ProducerStats(clientId: String) extends KafkaMetricsGroup { class ProducerStats(clientId: String) extends KafkaMetricsGroup {
val serializationErrorRate = newMeter(clientId + "-SerializationErrorsPerSec", "errors", TimeUnit.SECONDS) val tags: Map[String, String] = Map("clientId" -> clientId)
val resendRate = newMeter(clientId + "-ResendsPerSec", "resends", TimeUnit.SECONDS) val serializationErrorRate = newMeter("SerializationErrorsPerSec", "errors", TimeUnit.SECONDS, tags)
val failedSendRate = newMeter(clientId + "-FailedSendsPerSec", "failed sends", TimeUnit.SECONDS) val resendRate = newMeter("ResendsPerSec", "resends", TimeUnit.SECONDS, tags)
val failedSendRate = newMeter("FailedSendsPerSec", "failed sends", TimeUnit.SECONDS, tags)
} }
/** /**

View File

@ -17,16 +17,21 @@
package kafka.producer package kafka.producer
import kafka.metrics.KafkaMetricsGroup import kafka.metrics.KafkaMetricsGroup
import kafka.common.ClientIdAndTopic import kafka.common.{ClientIdTopic, ClientIdAllTopics, ClientIdAndTopic}
import kafka.utils.{Pool, threadsafe} import kafka.utils.{Pool, threadsafe}
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
@threadsafe @threadsafe
class ProducerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup { class ProducerTopicMetrics(metricId: ClientIdTopic) extends KafkaMetricsGroup {
val messageRate = newMeter(metricId + "MessagesPerSec", "messages", TimeUnit.SECONDS) val tags = metricId match {
val byteRate = newMeter(metricId + "BytesPerSec", "bytes", TimeUnit.SECONDS) case ClientIdAndTopic(clientId, topic) => Map("clientId" -> clientId, "topic" -> topic)
val droppedMessageRate = newMeter(metricId + "DroppedMessagesPerSec", "drops", TimeUnit.SECONDS) case ClientIdAllTopics(clientId) => Map("clientId" -> clientId)
}
val messageRate = newMeter("MessagesPerSec", "messages", TimeUnit.SECONDS, tags)
val byteRate = newMeter("BytesPerSec", "bytes", TimeUnit.SECONDS, tags)
val droppedMessageRate = newMeter("DroppedMessagesPerSec", "drops", TimeUnit.SECONDS, tags)
} }
/** /**
@ -34,14 +39,14 @@ class ProducerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup
* @param clientId The clientId of the given producer client. * @param clientId The clientId of the given producer client.
*/ */
class ProducerTopicStats(clientId: String) { class ProducerTopicStats(clientId: String) {
private val valueFactory = (k: ClientIdAndTopic) => new ProducerTopicMetrics(k) private val valueFactory = (k: ClientIdTopic) => new ProducerTopicMetrics(k)
private val stats = new Pool[ClientIdAndTopic, ProducerTopicMetrics](Some(valueFactory)) private val stats = new Pool[ClientIdTopic, ProducerTopicMetrics](Some(valueFactory))
private val allTopicsStats = new ProducerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics")) // to differentiate from a topic named AllTopics private val allTopicsStats = new ProducerTopicMetrics(new ClientIdAllTopics(clientId)) // to differentiate from a topic named AllTopics
def getProducerAllTopicsStats(): ProducerTopicMetrics = allTopicsStats def getProducerAllTopicsStats(): ProducerTopicMetrics = allTopicsStats
def getProducerTopicStats(topic: String): ProducerTopicMetrics = { def getProducerTopicStats(topic: String): ProducerTopicMetrics = {
stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic + "-")) stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic))
} }
} }

View File

@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0 * The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with * (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at * the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
@ -39,7 +39,6 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
@volatile private var shutdown: Boolean = false @volatile private var shutdown: Boolean = false
private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize, private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize,
config.sendBufferBytes, config.requestTimeoutMs) config.sendBufferBytes, config.requestTimeoutMs)
val brokerInfo = "host_%s-port_%s".format(config.host, config.port)
val producerRequestStats = ProducerRequestStatsRegistry.getProducerRequestStats(config.clientId) val producerRequestStats = ProducerRequestStatsRegistry.getProducerRequestStats(config.clientId)
trace("Instantiating Scala Sync Producer with properties: %s".format(config.props)) trace("Instantiating Scala Sync Producer with properties: %s".format(config.props))
@ -93,11 +92,11 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
*/ */
def send(producerRequest: ProducerRequest): ProducerResponse = { def send(producerRequest: ProducerRequest): ProducerResponse = {
val requestSize = producerRequest.sizeInBytes val requestSize = producerRequest.sizeInBytes
producerRequestStats.getProducerRequestStats(brokerInfo).requestSizeHist.update(requestSize) producerRequestStats.getProducerRequestStats(config.host, config.port).requestSizeHist.update(requestSize)
producerRequestStats.getProducerRequestAllBrokersStats.requestSizeHist.update(requestSize) producerRequestStats.getProducerRequestAllBrokersStats.requestSizeHist.update(requestSize)
var response: Receive = null var response: Receive = null
val specificTimer = producerRequestStats.getProducerRequestStats(brokerInfo).requestTimer val specificTimer = producerRequestStats.getProducerRequestStats(config.host, config.port).requestTimer
val aggregateTimer = producerRequestStats.getProducerRequestAllBrokersStats.requestTimer val aggregateTimer = producerRequestStats.getProducerRequestAllBrokersStats.requestTimer
aggregateTimer.time { aggregateTimer.time {
specificTimer.time { specificTimer.time {
@ -134,7 +133,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
case e: Exception => error("Error on disconnect: ", e) case e: Exception => error("Error on disconnect: ", e)
} }
} }
private def connect(): BlockingChannel = { private def connect(): BlockingChannel = {
if (!blockingChannel.isConnected && !shutdown) { if (!blockingChannel.isConnected && !shutdown) {
try { try {
@ -156,5 +155,4 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
connect() connect()
} }
} }
} }

View File

@ -34,10 +34,11 @@ class ProducerSendThread[K,V](val threadName: String,
private val shutdownLatch = new CountDownLatch(1) private val shutdownLatch = new CountDownLatch(1)
private val shutdownCommand = new KeyedMessage[K,V]("shutdown", null.asInstanceOf[K], null.asInstanceOf[V]) private val shutdownCommand = new KeyedMessage[K,V]("shutdown", null.asInstanceOf[K], null.asInstanceOf[V])
newGauge(clientId + "-ProducerQueueSize", newGauge("ProducerQueueSize",
new Gauge[Int] { new Gauge[Int] {
def value = queue.size def value = queue.size
}) },
Map("clientId" -> clientId))
override def run { override def run {
try { try {

View File

@ -26,7 +26,7 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.common.TopicAndPartition import kafka.common.TopicAndPartition
import com.yammer.metrics.core.Gauge import com.yammer.metrics.core.Gauge
abstract class AbstractFetcherManager(protected val name: String, metricPrefix: String, numFetchers: Int = 1) abstract class AbstractFetcherManager(protected val name: String, clientId: String, numFetchers: Int = 1)
extends Logging with KafkaMetricsGroup { extends Logging with KafkaMetricsGroup {
// map of (source broker_id, fetcher_id per source broker) => fetcher // map of (source broker_id, fetcher_id per source broker) => fetcher
private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread] private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread]
@ -34,7 +34,7 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix:
this.logIdent = "[" + name + "] " this.logIdent = "[" + name + "] "
newGauge( newGauge(
metricPrefix + "-MaxLag", "MaxLag",
new Gauge[Long] { new Gauge[Long] {
// current max lag across all fetchers/topics/partitions // current max lag across all fetchers/topics/partitions
def value = fetcherThreadMap.foldLeft(0L)((curMaxAll, fetcherThreadMapEntry) => { def value = fetcherThreadMap.foldLeft(0L)((curMaxAll, fetcherThreadMapEntry) => {
@ -42,24 +42,25 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix:
curMaxThread.max(fetcherLagStatsEntry._2.lag) curMaxThread.max(fetcherLagStatsEntry._2.lag)
}).max(curMaxAll) }).max(curMaxAll)
}) })
} },
Map("clientId" -> clientId)
) )
newGauge( newGauge(
metricPrefix + "-MinFetchRate", "MinFetchRate", {
{ new Gauge[Double] {
new Gauge[Double] { // current min fetch rate across all fetchers/topics/partitions
// current min fetch rate across all fetchers/topics/partitions def value = {
def value = { val headRate: Double =
val headRate: Double = fetcherThreadMap.headOption.map(_._2.fetcherStats.requestRate.oneMinuteRate).getOrElse(0)
fetcherThreadMap.headOption.map(_._2.fetcherStats.requestRate.oneMinuteRate).getOrElse(0)
fetcherThreadMap.foldLeft(headRate)((curMinAll, fetcherThreadMapEntry) => { fetcherThreadMap.foldLeft(headRate)((curMinAll, fetcherThreadMapEntry) => {
fetcherThreadMapEntry._2.fetcherStats.requestRate.oneMinuteRate.min(curMinAll) fetcherThreadMapEntry._2.fetcherStats.requestRate.oneMinuteRate.min(curMinAll)
}) })
}
} }
} }
},
Map("clientId" -> clientId)
) )
private def getFetcherId(topic: String, partitionId: Int) : Int = { private def getFetcherId(topic: String, partitionId: Int) : Int = {

View File

@ -26,9 +26,7 @@ import kafka.utils.Utils.inLock
import kafka.message.{InvalidMessageException, ByteBufferMessageSet, MessageAndOffset} import kafka.message.{InvalidMessageException, ByteBufferMessageSet, MessageAndOffset}
import kafka.metrics.KafkaMetricsGroup import kafka.metrics.KafkaMetricsGroup
import scala.collection.mutable import scala.collection.{mutable, Set, Map}
import scala.collection.Set
import scala.collection.Map
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
@ -46,8 +44,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
private val partitionMapLock = new ReentrantLock private val partitionMapLock = new ReentrantLock
private val partitionMapCond = partitionMapLock.newCondition() private val partitionMapCond = partitionMapLock.newCondition()
val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId) val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId)
private val brokerInfo = "host_%s-port_%s".format(sourceBroker.host, sourceBroker.port) private val metricId = new ClientIdAndBroker(clientId, sourceBroker.host, sourceBroker.port)
private val metricId = new ClientIdAndBroker(clientId, brokerInfo)
val fetcherStats = new FetcherStats(metricId) val fetcherStats = new FetcherStats(metricId)
val fetcherLagStats = new FetcherLagStats(metricId) val fetcherLagStats = new FetcherLagStats(metricId)
val fetchRequestBuilder = new FetchRequestBuilder(). val fetchRequestBuilder = new FetchRequestBuilder().
@ -204,13 +201,15 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
} }
} }
class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMetricsGroup { class FetcherLagMetrics(metricId: ClientIdTopicPartition) extends KafkaMetricsGroup {
private[this] val lagVal = new AtomicLong(-1L) private[this] val lagVal = new AtomicLong(-1L)
newGauge( newGauge("ConsumerLag",
metricId + "-ConsumerLag",
new Gauge[Long] { new Gauge[Long] {
def value = lagVal.get def value = lagVal.get
} },
Map("clientId" -> metricId.clientId,
"topic" -> metricId.topic,
"partition" -> metricId.partitionId.toString)
) )
def lag_=(newLag: Long) { def lag_=(newLag: Long) {
@ -221,20 +220,25 @@ class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMet
} }
class FetcherLagStats(metricId: ClientIdAndBroker) { class FetcherLagStats(metricId: ClientIdAndBroker) {
private val valueFactory = (k: ClientIdBrokerTopicPartition) => new FetcherLagMetrics(k) private val valueFactory = (k: ClientIdTopicPartition) => new FetcherLagMetrics(k)
val stats = new Pool[ClientIdBrokerTopicPartition, FetcherLagMetrics](Some(valueFactory)) val stats = new Pool[ClientIdTopicPartition, FetcherLagMetrics](Some(valueFactory))
def getFetcherLagStats(topic: String, partitionId: Int): FetcherLagMetrics = { def getFetcherLagStats(topic: String, partitionId: Int): FetcherLagMetrics = {
stats.getAndMaybePut(new ClientIdBrokerTopicPartition(metricId.clientId, metricId.brokerInfo, topic, partitionId)) stats.getAndMaybePut(new ClientIdTopicPartition(metricId.clientId, topic, partitionId))
} }
} }
class FetcherStats(metricId: ClientIdAndBroker) extends KafkaMetricsGroup { class FetcherStats(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
val requestRate = newMeter(metricId + "-RequestsPerSec", "requests", TimeUnit.SECONDS) val tags = Map("clientId" -> metricId.clientId,
val byteRate = newMeter(metricId + "-BytesPerSec", "bytes", TimeUnit.SECONDS) "brokerHost" -> metricId.brokerHost,
"brokerPort" -> metricId.brokerPort.toString)
val requestRate = newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS, tags)
val byteRate = newMeter("BytesPerSec", "bytes", TimeUnit.SECONDS, tags)
} }
case class ClientIdBrokerTopicPartition(clientId: String, brokerInfo: String, topic: String, partitionId: Int) { case class ClientIdTopicPartition(clientId: String, topic: String, partitionId: Int) {
override def toString = "%s-%s-%s-%d".format(clientId, brokerInfo, topic, partitionId) override def toString = "%s-%s-%d".format(clientId, topic, partitionId)
} }

View File

@ -44,7 +44,6 @@ case class FetchMetadata(fetchMinBytes: Int,
"onlyCommitted: " + fetchOnlyCommitted + ", " "onlyCommitted: " + fetchOnlyCommitted + ", "
"partitionStatus: " + fetchPartitionStatus + "]" "partitionStatus: " + fetchPartitionStatus + "]"
} }
/** /**
* A delayed fetch request that can be created by the replica manager and watched * A delayed fetch request that can be created by the replica manager and watched
* in the fetch request purgatory * in the fetch request purgatory

View File

@ -22,7 +22,6 @@ import kafka.api.ProducerResponseStatus
import kafka.common.ErrorMapping import kafka.common.ErrorMapping
import kafka.common.TopicAndPartition import kafka.common.TopicAndPartition
import scala.Some
import scala.collection._ import scala.collection._
case class ProducePartitionStatus(requiredOffset: Long, responseStatus: ProducerResponseStatus) { case class ProducePartitionStatus(requiredOffset: Long, responseStatus: ProducerResponseStatus) {
@ -118,5 +117,4 @@ class DelayedProduce(delayMs: Long,
val responseStatus = produceMetadata.produceStatus.mapValues(status => status.responseStatus) val responseStatus = produceMetadata.produceStatus.mapValues(status => status.responseStatus)
responseCallback(responseStatus) responseCallback(responseStatus)
} }
} }

View File

@ -1,38 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import kafka.common.TopicAndPartition
/**
* Keys used for delayed request metrics recording
*/
trait DelayedRequestKey {
def keyLabel: String
}
object DelayedRequestKey {
val globalLabel = "All"
}
case class TopicPartitionRequestKey(topic: String, partition: Int) extends DelayedRequestKey {
def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition)
override def keyLabel = "%s-%d".format(topic, partition)
}

View File

@ -93,23 +93,28 @@ class KafkaRequestHandlerPool(val brokerId: Int,
} }
} }
class BrokerTopicMetrics(name: String) extends KafkaMetricsGroup { class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup {
val messagesInRate = newMeter(name + "MessagesInPerSec", "messages", TimeUnit.SECONDS) val tags: scala.collection.Map[String, String] = name match {
val bytesInRate = newMeter(name + "BytesInPerSec", "bytes", TimeUnit.SECONDS) case None => scala.collection.Map.empty
val bytesOutRate = newMeter(name + "BytesOutPerSec", "bytes", TimeUnit.SECONDS) case Some(topic) => Map("topic" -> topic)
val bytesRejectedRate = newMeter(name + "BytesRejectedPerSec", "bytes", TimeUnit.SECONDS) }
val failedProduceRequestRate = newMeter(name + "FailedProduceRequestsPerSec", "requests", TimeUnit.SECONDS)
val failedFetchRequestRate = newMeter(name + "FailedFetchRequestsPerSec", "requests", TimeUnit.SECONDS) val messagesInRate = newMeter("MessagesInPerSec", "messages", TimeUnit.SECONDS, tags)
val bytesInRate = newMeter("BytesInPerSec", "bytes", TimeUnit.SECONDS, tags)
val bytesOutRate = newMeter("BytesOutPerSec", "bytes", TimeUnit.SECONDS, tags)
val bytesRejectedRate = newMeter("BytesRejectedPerSec", "bytes", TimeUnit.SECONDS, tags)
val failedProduceRequestRate = newMeter("FailedProduceRequestsPerSec", "requests", TimeUnit.SECONDS, tags)
val failedFetchRequestRate = newMeter("FailedFetchRequestsPerSec", "requests", TimeUnit.SECONDS, tags)
} }
object BrokerTopicStats extends Logging { object BrokerTopicStats extends Logging {
private val valueFactory = (k: String) => new BrokerTopicMetrics(k) private val valueFactory = (k: String) => new BrokerTopicMetrics(Some(k))
private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory)) private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory))
private val allTopicsStats = new BrokerTopicMetrics("AllTopics") private val allTopicsStats = new BrokerTopicMetrics(None)
def getBrokerAllTopicsStats(): BrokerTopicMetrics = allTopicsStats def getBrokerAllTopicsStats(): BrokerTopicMetrics = allTopicsStats
def getBrokerTopicStats(topic: String): BrokerTopicMetrics = { def getBrokerTopicStats(topic: String): BrokerTopicMetrics = {
stats.getAndMaybePut(topic + "-") stats.getAndMaybePut(topic)
} }
} }

View File

@ -25,7 +25,6 @@ import kafka.utils._
import java.util.concurrent._ import java.util.concurrent._
import atomic.{AtomicInteger, AtomicBoolean} import atomic.{AtomicInteger, AtomicBoolean}
import java.io.File import java.io.File
import java.net.BindException
import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.ZkClient
import kafka.controller.{ControllerStats, KafkaController} import kafka.controller.{ControllerStats, KafkaController}
import kafka.cluster.Broker import kafka.cluster.Broker

View File

@ -17,6 +17,7 @@
package kafka.server package kafka.server
import kafka.common.AppInfo
import kafka.utils.Logging import kafka.utils.Logging
@ -26,6 +27,7 @@ class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
def startup() { def startup() {
try { try {
server.startup() server.startup()
AppInfo.registerInfo()
} }
catch { catch {
case e: Throwable => case e: Throwable =>

View File

@ -34,7 +34,6 @@ import scala.collection._
import scala.collection.mutable.HashMap import scala.collection.mutable.HashMap
import scala.collection.Map import scala.collection.Map
import scala.collection.Set import scala.collection.Set
import scala.Some
import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.ZkClient
import com.yammer.metrics.core.Gauge import com.yammer.metrics.core.Gauge
@ -124,9 +123,9 @@ class ReplicaManager(val config: KafkaConfig,
* 1. The partition HW has changed (for acks = -1) * 1. The partition HW has changed (for acks = -1)
* 2. A follower replica's fetch operation is received (for acks > 1) * 2. A follower replica's fetch operation is received (for acks > 1)
*/ */
def tryCompleteDelayedProduce(key: DelayedRequestKey) { def tryCompleteDelayedProduce(key: TopicAndPartition) {
val completed = producerRequestPurgatory.checkAndComplete(key) val completed = producerRequestPurgatory.checkAndComplete(key)
debug("Request key %s unblocked %d producer requests.".format(key.keyLabel, completed)) debug("Request key %s unblocked %d producer requests.".format(key, completed))
} }
/** /**
@ -136,9 +135,9 @@ class ReplicaManager(val config: KafkaConfig,
* 1. The partition HW has changed (for regular fetch) * 1. The partition HW has changed (for regular fetch)
* 2. A new message set is appended to the local log (for follower fetch) * 2. A new message set is appended to the local log (for follower fetch)
*/ */
def tryCompleteDelayedFetch(key: DelayedRequestKey) { def tryCompleteDelayedFetch(key: TopicAndPartition) {
val completed = fetchRequestPurgatory.checkAndComplete(key) val completed = fetchRequestPurgatory.checkAndComplete(key)
debug("Request key %s unblocked %d fetch requests.".format(key.keyLabel, completed)) debug("Request key %s unblocked %d fetch requests.".format(key, completed))
} }
def startup() { def startup() {
@ -281,7 +280,7 @@ class ReplicaManager(val config: KafkaConfig,
val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback) val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback)
// create a list of (topic, partition) pairs to use as keys for this delayed request // create a list of (topic, partition) pairs to use as keys for this delayed request
val producerRequestKeys = messagesPerPartition.keys.map(new TopicPartitionRequestKey(_)).toSeq val producerRequestKeys = messagesPerPartition.keys.toSeq
// try to complete the request immediately, otherwise put it into the purgatory // try to complete the request immediately, otherwise put it into the purgatory
// this is because while the delayed request is being created, new requests may // this is because while the delayed request is being created, new requests may
@ -384,7 +383,7 @@ class ReplicaManager(val config: KafkaConfig,
val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, responseCallback) val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, responseCallback)
// create a list of (topic, partition) pairs to use as keys for this delayed request // create a list of (topic, partition) pairs to use as keys for this delayed request
val delayedFetchKeys = fetchPartitionStatus.keys.map(new TopicPartitionRequestKey(_)).toSeq val delayedFetchKeys = fetchPartitionStatus.keys.toSeq
// try to complete the request immediately, otherwise put it into the purgatory; // try to complete the request immediately, otherwise put it into the purgatory;
// this is because while the delayed request is being created, new requests may // this is because while the delayed request is being created, new requests may
@ -709,7 +708,7 @@ class ReplicaManager(val config: KafkaConfig,
// for producer requests with ack > 1, we need to check // for producer requests with ack > 1, we need to check
// if they can be unblocked after some follower's log end offsets have moved // if they can be unblocked after some follower's log end offsets have moved
tryCompleteDelayedProduce(new TopicPartitionRequestKey(topicAndPartition)) tryCompleteDelayedProduce(topicAndPartition)
case None => case None =>
warn("While recording the replica LEO, the partition %s hasn't been created.".format(topicAndPartition)) warn("While recording the replica LEO, the partition %s hasn't been created.".format(topicAndPartition))
} }

View File

@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicLong
import java.util._ import java.util._
import java.text.SimpleDateFormat import java.text.SimpleDateFormat
import java.math.BigInteger import java.math.BigInteger
import scala.collection.immutable.List
import org.apache.log4j.Logger import org.apache.log4j.Logger

View File

@ -26,8 +26,7 @@ import kafka.message._
import kafka.serializer._ import kafka.serializer._
import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.ZkClient
import kafka.utils._ import kafka.utils._
import kafka.producer.{ProducerConfig, KeyedMessage, Producer} import java.util.Collections
import java.util.{Collections, Properties}
import org.apache.log4j.{Logger, Level} import org.apache.log4j.{Logger, Level}
import kafka.utils.TestUtils._ import kafka.utils.TestUtils._
import kafka.common.MessageStreamsExistException import kafka.common.MessageStreamsExistException
@ -89,8 +88,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
zkConsumerConnector0.shutdown zkConsumerConnector0.shutdown
// send some messages to each broker // send some messages to each broker
val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) ++ val sentMessages1 = sendMessagesToPartition(configs, topic, 0, nMessages) ++
sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages) sendMessagesToPartition(configs, topic, 1, nMessages)
// wait to make sure the topic and partition have a leader for the successful case // wait to make sure the topic and partition have a leader for the successful case
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
@ -123,8 +122,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true) val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
// send some messages to each broker // send some messages to each broker
val sentMessages2 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) ++ val sentMessages2 = sendMessagesToPartition(configs, topic, 0, nMessages) ++
sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages) sendMessagesToPartition(configs, topic, 1, nMessages)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@ -144,8 +143,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true) val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]()) val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
// send some messages to each broker // send some messages to each broker
val sentMessages3 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) ++ val sentMessages3 = sendMessagesToPartition(configs, topic, 0, nMessages) ++
sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages) sendMessagesToPartition(configs, topic, 1, nMessages)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@ -178,8 +177,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
requestHandlerLogger.setLevel(Level.FATAL) requestHandlerLogger.setLevel(Level.FATAL)
// send some messages to each broker // send some messages to each broker
val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec) ++ val sentMessages1 = sendMessagesToPartition(configs, topic, 0, nMessages, GZIPCompressionCodec) ++
sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec) sendMessagesToPartition(configs, topic, 1, nMessages, GZIPCompressionCodec)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@ -211,8 +210,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true) val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
// send some messages to each broker // send some messages to each broker
val sentMessages2 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec) ++ val sentMessages2 = sendMessagesToPartition(configs, topic, 0, nMessages, GZIPCompressionCodec) ++
sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec) sendMessagesToPartition(configs, topic, 1, nMessages, GZIPCompressionCodec)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@ -232,8 +231,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true) val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int](), new StringDecoder(), new StringDecoder()) val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int](), new StringDecoder(), new StringDecoder())
// send some messages to each broker // send some messages to each broker
val sentMessages3 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec) ++ val sentMessages3 = sendMessagesToPartition(configs, topic, 0, nMessages, GZIPCompressionCodec) ++
sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec) sendMessagesToPartition(configs, topic, 1, nMessages, GZIPCompressionCodec)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1)
@ -254,8 +253,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
def testCompressionSetConsumption() { def testCompressionSetConsumption() {
// send some messages to each broker // send some messages to each broker
val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, 200, DefaultCompressionCodec) ++ val sentMessages = sendMessagesToPartition(configs, topic, 0, 200, DefaultCompressionCodec) ++
sendMessagesToBrokerPartition(configs.last, topic, 1, 200, DefaultCompressionCodec) sendMessagesToPartition(configs, topic, 1, 200, DefaultCompressionCodec)
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0) TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1) TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)
@ -280,8 +279,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
requestHandlerLogger.setLevel(Level.FATAL) requestHandlerLogger.setLevel(Level.FATAL)
// send some messages to each broker // send some messages to each broker
val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, NoCompressionCodec) ++ val sentMessages = sendMessagesToPartition(configs, topic, 0, nMessages, NoCompressionCodec) ++
sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, NoCompressionCodec) sendMessagesToPartition(configs, topic, 1, nMessages, NoCompressionCodec)
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0) TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1) TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)
@ -321,7 +320,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers) createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
// send some messages to each broker // send some messages to each broker
val sentMessages1 = sendMessages(configs.head, nMessages, "batch1", NoCompressionCodec, 1) val sentMessages1 = sendMessages(configs, topic, "producer1", nMessages, "batch1", NoCompressionCodec, 1)
// create a consumer // create a consumer
val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
@ -345,70 +344,6 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
zkClient.close() zkClient.close()
} }
def sendMessagesToBrokerPartition(config: KafkaConfig,
topic: String,
partition: Int,
numMessages: Int,
compression: CompressionCodec = NoCompressionCodec): List[String] = {
val header = "test-%d-%d".format(config.brokerId, partition)
val props = new Properties()
props.put("compression.codec", compression.codec.toString)
val producer: Producer[Int, String] =
createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
encoder = classOf[StringEncoder].getName,
keyEncoder = classOf[IntEncoder].getName,
partitioner = classOf[FixedValuePartitioner].getName,
producerProps = props)
val ms = 0.until(numMessages).map(x => header + config.brokerId + "-" + partition + "-" + x)
producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*)
debug("Sent %d messages to broker %d for partition [%s,%d]".format(ms.size, config.brokerId, topic, partition))
producer.close()
ms.toList
}
def sendMessages(config: KafkaConfig,
messagesPerNode: Int,
header: String,
compression: CompressionCodec,
numParts: Int): List[String]= {
var messages: List[String] = Nil
val props = new Properties()
props.put("compression.codec", compression.codec.toString)
val producer: Producer[Int, String] =
createProducer(brokerList = TestUtils.getBrokerListStrFromConfigs(configs),
encoder = classOf[StringEncoder].getName,
keyEncoder = classOf[IntEncoder].getName,
partitioner = classOf[FixedValuePartitioner].getName,
producerProps = props)
for (partition <- 0 until numParts) {
val ms = 0.until(messagesPerNode).map(x => header + config.brokerId + "-" + partition + "-" + x)
producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*)
messages ++= ms
debug("Sent %d messages to broker %d for partition [%s,%d]".format(ms.size, config.brokerId, topic, partition))
}
producer.close()
messages
}
def getMessages(nMessagesPerThread: Int,
topicMessageStreams: Map[String,List[KafkaStream[String, String]]]): List[String]= {
var messages: List[String] = Nil
for((topic, messageStreams) <- topicMessageStreams) {
for (messageStream <- messageStreams) {
val iterator = messageStream.iterator
for(i <- 0 until nMessagesPerThread) {
assertTrue(iterator.hasNext)
val message = iterator.next.message
messages ::= message
debug("received message: " + message)
}
}
}
messages.reverse
}
def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = { def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = {
val children = zkClient.getChildren(path) val children = zkClient.getChildren(path)
Collections.sort(children) Collections.sort(children)

View File

@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.consumer
import com.yammer.metrics.Metrics
import junit.framework.Assert._
import kafka.integration.KafkaServerTestHarness
import kafka.server._
import scala.collection._
import org.scalatest.junit.JUnit3Suite
import kafka.message._
import kafka.serializer._
import kafka.utils._
import kafka.utils.TestUtils._
class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging {
val zookeeperConnect = TestZKUtils.zookeeperConnect
val numNodes = 2
val numParts = 2
val topic = "topic1"
val configs =
for (props <- TestUtils.createBrokerConfigs(numNodes))
yield new KafkaConfig(props) {
override val zkConnect = zookeeperConnect
override val numPartitions = numParts
}
val nMessages = 2
override def tearDown() {
super.tearDown()
}
def testMetricsLeak() {
// create topic topic1 with 1 partition on broker 0
createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
// force creation not client's specific metrics.
createAndShutdownStep("group0", "consumer0", "producer0")
val countOfStaticMetrics = Metrics.defaultRegistry().allMetrics().keySet().size
for (i <- 0 to 5) {
createAndShutdownStep("group" + i % 3, "consumer" + i % 2, "producer" + i % 2)
assertEquals(countOfStaticMetrics, Metrics.defaultRegistry().allMetrics().keySet().size)
}
}
def createAndShutdownStep(group: String, consumerId: String, producerId: String): Unit = {
val sentMessages1 = sendMessages(configs, topic, producerId, nMessages, "batch1", NoCompressionCodec, 1)
// create a consumer
val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumerId))
val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder())
val receivedMessages1 = getMessages(nMessages, topicMessageStreams1)
zkConsumerConnector1.shutdown()
}
}

View File

@ -26,7 +26,6 @@ import java.util.Properties
import org.apache.kafka.common.utils.Utils._ import org.apache.kafka.common.utils.Utils._
import collection.mutable.Map
import collection.mutable.ListBuffer import collection.mutable.ListBuffer
import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.ZkClient
@ -36,7 +35,7 @@ import kafka.producer._
import kafka.message._ import kafka.message._
import kafka.api._ import kafka.api._
import kafka.cluster.Broker import kafka.cluster.Broker
import kafka.consumer.ConsumerConfig import kafka.consumer.{KafkaStream, ConsumerConfig}
import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder} import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder}
import kafka.common.TopicAndPartition import kafka.common.TopicAndPartition
import kafka.admin.AdminUtils import kafka.admin.AdminUtils
@ -47,6 +46,8 @@ import junit.framework.AssertionFailedError
import junit.framework.Assert._ import junit.framework.Assert._
import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.KafkaProducer
import scala.collection.Map
/** /**
* Utility functions to help with testing * Utility functions to help with testing
*/ */
@ -483,7 +484,7 @@ object TestUtils extends Logging {
val data = topics.flatMap(topic => val data = topics.flatMap(topic =>
partitions.map(partition => (TopicAndPartition(topic, partition), message)) partitions.map(partition => (TopicAndPartition(topic, partition), message))
) )
new ProducerRequest(correlationId, clientId, acks.toShort, timeout, Map(data:_*)) new ProducerRequest(correlationId, clientId, acks.toShort, timeout, collection.mutable.Map(data:_*))
} }
def makeLeaderForPartition(zkClient: ZkClient, topic: String, def makeLeaderForPartition(zkClient: ZkClient, topic: String,
@ -720,6 +721,73 @@ object TestUtils extends Logging {
time = time, time = time,
brokerState = new BrokerState()) brokerState = new BrokerState())
} }
def sendMessagesToPartition(configs: Seq[KafkaConfig],
topic: String,
partition: Int,
numMessages: Int,
compression: CompressionCodec = NoCompressionCodec): List[String] = {
val header = "test-%d".format(partition)
val props = new Properties()
props.put("compression.codec", compression.codec.toString)
val producer: Producer[Int, String] =
createProducer(TestUtils.getBrokerListStrFromConfigs(configs),
encoder = classOf[StringEncoder].getName,
keyEncoder = classOf[IntEncoder].getName,
partitioner = classOf[FixedValuePartitioner].getName,
producerProps = props)
val ms = 0.until(numMessages).map(x => header + "-" + x)
producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*)
debug("Sent %d messages for partition [%s,%d]".format(ms.size, topic, partition))
producer.close()
ms.toList
}
def sendMessages(configs: Seq[KafkaConfig],
topic: String,
producerId: String,
messagesPerNode: Int,
header: String,
compression: CompressionCodec,
numParts: Int): List[String]= {
var messages: List[String] = Nil
val props = new Properties()
props.put("compression.codec", compression.codec.toString)
props.put("client.id", producerId)
val producer: Producer[Int, String] =
createProducer(brokerList = TestUtils.getBrokerListStrFromConfigs(configs),
encoder = classOf[StringEncoder].getName,
keyEncoder = classOf[IntEncoder].getName,
partitioner = classOf[FixedValuePartitioner].getName,
producerProps = props)
for (partition <- 0 until numParts) {
val ms = 0.until(messagesPerNode).map(x => header + "-" + partition + "-" + x)
producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*)
messages ++= ms
debug("Sent %d messages for partition [%s,%d]".format(ms.size, topic, partition))
}
producer.close()
messages
}
def getMessages(nMessagesPerThread: Int,
topicMessageStreams: Map[String, List[KafkaStream[String, String]]]): List[String] = {
var messages: List[String] = Nil
for ((topic, messageStreams) <- topicMessageStreams) {
for (messageStream <- messageStreams) {
val iterator = messageStream.iterator
for (i <- 0 until nMessagesPerThread) {
assertTrue(iterator.hasNext)
val message = iterator.next.message
messages ::= message
debug("received message: " + message)
}
}
}
messages.reverse
}
} }
object TestZKUtils { object TestZKUtils {