diff --git a/build.gradle b/build.gradle index 11eb11355ef..030af63553a 100644 --- a/build.gradle +++ b/build.gradle @@ -263,6 +263,12 @@ project(':core') { dependsOn 'copyDependantLibs' } + jar.manifest { + attributes( + 'Version': "${version}" + ) + } + task testJar(type: Jar) { classifier = 'test' from sourceSets.test.output diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 1be57008e98..b9fde2aacbb 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -21,7 +21,7 @@ import kafka.admin.AdminUtils import kafka.utils._ import kafka.api.{PartitionStateInfo, LeaderAndIsr} import kafka.log.LogConfig -import kafka.server.{TopicPartitionRequestKey, LogOffsetMetadata, OffsetManager, ReplicaManager} +import kafka.server.{LogOffsetMetadata, OffsetManager, ReplicaManager} import kafka.metrics.KafkaMetricsGroup import kafka.controller.KafkaController import kafka.message.ByteBufferMessageSet @@ -29,7 +29,6 @@ import kafka.message.ByteBufferMessageSet import java.io.IOException import java.util.concurrent.locks.ReentrantReadWriteLock import kafka.utils.Utils.{inReadLock,inWriteLock} -import scala.Some import scala.collection.immutable.Set import com.yammer.metrics.core.Gauge @@ -62,13 +61,13 @@ class Partition(val topic: String, private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId) - newGauge( - topic + "-" + partitionId + "-UnderReplicated", + newGauge("UnderReplicated", new Gauge[Int] { def value = { if (isUnderReplicated) 1 else 0 } - } + }, + Map("topic" -> topic, "partition" -> partitionId.toString) ) def isUnderReplicated(): Boolean = { @@ -345,7 +344,7 @@ class Partition(val topic: String, leaderReplica.highWatermark = newHighWatermark debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark)) // some delayed requests may be unblocked after HW changed - val requestKey = new TopicPartitionRequestKey(this.topic, this.partitionId) + val requestKey = new TopicAndPartition(this.topic, this.partitionId) replicaManager.tryCompleteDelayedFetch(requestKey) replicaManager.tryCompleteDelayedProduce(requestKey) } else { @@ -415,7 +414,7 @@ class Partition(val topic: String, val info = log.append(messages, assignOffsets = true) // probably unblock some follower fetch requests since log end offset has been updated - replicaManager.tryCompleteDelayedFetch(new TopicPartitionRequestKey(this.topic, this.partitionId)) + replicaManager.tryCompleteDelayedFetch(new TopicAndPartition(this.topic, this.partitionId)) // we may need to increment high watermark since ISR could be down to 1 maybeIncrementLeaderHW(leaderReplica) info diff --git a/core/src/main/scala/kafka/common/AppInfo.scala b/core/src/main/scala/kafka/common/AppInfo.scala new file mode 100644 index 00000000000..d642ca555f8 --- /dev/null +++ b/core/src/main/scala/kafka/common/AppInfo.scala @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.common + +import java.net.URL +import java.util.jar.{Attributes, Manifest} + +import com.yammer.metrics.core.Gauge +import kafka.metrics.KafkaMetricsGroup + +object AppInfo extends KafkaMetricsGroup { + private var isRegistered = false + private val lock = new Object() + + def registerInfo(): Unit = { + lock.synchronized { + if (isRegistered) { + return + } + } + + try { + val clazz = AppInfo.getClass + val className = clazz.getSimpleName + ".class" + val classPath = clazz.getResource(className).toString + if (!classPath.startsWith("jar")) { + // Class not from JAR + return + } + val manifestPath = classPath.substring(0, classPath.lastIndexOf("!") + 1) + "/META-INF/MANIFEST.MF" + + val mf = new Manifest + mf.read(new URL(manifestPath).openStream()) + val version = mf.getMainAttributes.get(new Attributes.Name("Version")).toString + + newGauge("Version", + new Gauge[String] { + def value = { + version + } + }) + + lock.synchronized { + isRegistered = true + } + } catch { + case e: Exception => + warn("Can't read Kafka version from MANIFEST.MF. Possible cause: %s".format(e)) + } + } +} diff --git a/core/src/main/scala/kafka/common/ClientIdAndBroker.scala b/core/src/main/scala/kafka/common/ClientIdAndBroker.scala index 93223a9c93b..3b09041d33a 100644 --- a/core/src/main/scala/kafka/common/ClientIdAndBroker.scala +++ b/core/src/main/scala/kafka/common/ClientIdAndBroker.scala @@ -8,7 +8,7 @@ package kafka.common * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -21,6 +21,14 @@ package kafka.common * Convenience case class since (clientId, brokerInfo) pairs are used to create * SyncProducer Request Stats and SimpleConsumer Request and Response Stats. */ -case class ClientIdAndBroker(clientId: String, brokerInfo: String) { - override def toString = "%s-%s".format(clientId, brokerInfo) + +trait ClientIdBroker { +} + +case class ClientIdAndBroker(clientId: String, brokerHost: String, brokerPort: Int) extends ClientIdBroker { + override def toString = "%s-%s-%d".format(clientId, brokerHost, brokerPort) +} + +case class ClientIdAllBrokers(clientId: String) extends ClientIdBroker { + override def toString = "%s-%s".format(clientId, "AllBrokers") } diff --git a/core/src/main/scala/kafka/common/ClientIdAndTopic.scala b/core/src/main/scala/kafka/common/ClientIdAndTopic.scala index 7acf9e76bdd..5825aad2c8d 100644 --- a/core/src/main/scala/kafka/common/ClientIdAndTopic.scala +++ b/core/src/main/scala/kafka/common/ClientIdAndTopic.scala @@ -1,5 +1,3 @@ -package kafka.common - /** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -17,11 +15,21 @@ package kafka.common * limitations under the License. */ +package kafka.common + /** * Convenience case class since (clientId, topic) pairs are used in the creation * of many Stats objects. */ -case class ClientIdAndTopic(clientId: String, topic: String) { +trait ClientIdTopic { +} + +case class ClientIdAndTopic(clientId: String, topic: String) extends ClientIdTopic { override def toString = "%s-%s".format(clientId, topic) } +case class ClientIdAllTopics(clientId: String) extends ClientIdTopic { + override def toString = "%s-%s".format(clientId, "AllTopics") +} + + diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala index f8c1b4e674f..ee6139c9010 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala @@ -30,7 +30,7 @@ class ConsumerFetcherThread(name: String, partitionMap: Map[TopicAndPartition, PartitionTopicInfo], val consumerFetcherManager: ConsumerFetcherManager) extends AbstractFetcherThread(name = name, - clientId = config.clientId + "-" + name, + clientId = config.clientId, sourceBroker = sourceBroker, socketTimeout = config.socketTimeoutMs, socketBufferSize = config.socketReceiveBufferBytes, diff --git a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala index f63e6c59bb1..01797ff766a 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerTopicStats.scala @@ -20,12 +20,17 @@ package kafka.consumer import kafka.utils.{Pool, threadsafe, Logging} import java.util.concurrent.TimeUnit import kafka.metrics.KafkaMetricsGroup -import kafka.common.ClientIdAndTopic +import kafka.common.{ClientIdTopic, ClientIdAllTopics, ClientIdAndTopic} @threadsafe -class ConsumerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup { - val messageRate = newMeter(metricId + "MessagesPerSec", "messages", TimeUnit.SECONDS) - val byteRate = newMeter(metricId + "BytesPerSec", "bytes", TimeUnit.SECONDS) +class ConsumerTopicMetrics(metricId: ClientIdTopic) extends KafkaMetricsGroup { + val tags = metricId match { + case ClientIdAndTopic(clientId, topic) => Map("clientId" -> clientId, "topic" -> topic) + case ClientIdAllTopics(clientId) => Map("clientId" -> clientId) + } + + val messageRate = newMeter("MessagesPerSec", "messages", TimeUnit.SECONDS, tags) + val byteRate = newMeter("BytesPerSec", "bytes", TimeUnit.SECONDS, tags) } /** @@ -35,12 +40,12 @@ class ConsumerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup class ConsumerTopicStats(clientId: String) extends Logging { private val valueFactory = (k: ClientIdAndTopic) => new ConsumerTopicMetrics(k) private val stats = new Pool[ClientIdAndTopic, ConsumerTopicMetrics](Some(valueFactory)) - private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics")) // to differentiate from a topic named AllTopics + private val allTopicStats = new ConsumerTopicMetrics(new ClientIdAllTopics(clientId)) // to differentiate from a topic named AllTopics def getConsumerAllTopicStats(): ConsumerTopicMetrics = allTopicStats def getConsumerTopicStats(topic: String): ConsumerTopicMetrics = { - stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic + "-")) + stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic)) } } diff --git a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala index 5243f415288..3df55e13001 100644 --- a/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala +++ b/core/src/main/scala/kafka/consumer/FetchRequestAndResponseStats.scala @@ -19,13 +19,21 @@ package kafka.consumer import java.util.concurrent.TimeUnit -import kafka.common.ClientIdAndBroker +import kafka.common.{ClientIdAllBrokers, ClientIdBroker, ClientIdAndBroker} import kafka.metrics.{KafkaMetricsGroup, KafkaTimer} import kafka.utils.Pool -class FetchRequestAndResponseMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGroup { - val requestTimer = new KafkaTimer(newTimer(metricId + "FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) - val requestSizeHist = newHistogram(metricId + "FetchResponseSize") +class FetchRequestAndResponseMetrics(metricId: ClientIdBroker) extends KafkaMetricsGroup { + val tags = metricId match { + case ClientIdAndBroker(clientId, brokerHost, brokerPort) => + Map("clientId" -> clientId, "brokerHost" -> brokerHost, + "brokerPort" -> brokerPort.toString) + case ClientIdAllBrokers(clientId) => + Map("clientId" -> clientId) + } + + val requestTimer = new KafkaTimer(newTimer("FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, tags)) + val requestSizeHist = newHistogram("FetchResponseSize", biased = true, tags) } /** @@ -33,14 +41,14 @@ class FetchRequestAndResponseMetrics(metricId: ClientIdAndBroker) extends KafkaM * @param clientId ClientId of the given consumer */ class FetchRequestAndResponseStats(clientId: String) { - private val valueFactory = (k: ClientIdAndBroker) => new FetchRequestAndResponseMetrics(k) - private val stats = new Pool[ClientIdAndBroker, FetchRequestAndResponseMetrics](Some(valueFactory)) - private val allBrokersStats = new FetchRequestAndResponseMetrics(new ClientIdAndBroker(clientId, "AllBrokers")) + private val valueFactory = (k: ClientIdBroker) => new FetchRequestAndResponseMetrics(k) + private val stats = new Pool[ClientIdBroker, FetchRequestAndResponseMetrics](Some(valueFactory)) + private val allBrokersStats = new FetchRequestAndResponseMetrics(new ClientIdAllBrokers(clientId)) def getFetchRequestAndResponseAllBrokersStats(): FetchRequestAndResponseMetrics = allBrokersStats - def getFetchRequestAndResponseStats(brokerInfo: String): FetchRequestAndResponseMetrics = { - stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo + "-")) + def getFetchRequestAndResponseStats(brokerHost: String, brokerPort: Int): FetchRequestAndResponseMetrics = { + stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerHost, brokerPort)) } } @@ -56,7 +64,7 @@ object FetchRequestAndResponseStatsRegistry { } def removeConsumerFetchRequestAndResponseStats(clientId: String) { - val pattern = (clientId + "-ConsumerFetcherThread.*").r + val pattern = (".*" + clientId + ".*").r val keys = globalStats.keys for (key <- keys) { pattern.findFirstIn(key) match { diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index d349a3000fe..e53ee51638b 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -36,7 +36,6 @@ class SimpleConsumer(val host: String, ConsumerConfig.validateClientId(clientId) private val lock = new Object() private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout) - val brokerInfo = "host_%s-port_%s".format(host, port) private val fetchRequestAndResponseStats = FetchRequestAndResponseStatsRegistry.getFetchRequestAndResponseStats(clientId) private var isClosed = false @@ -106,7 +105,7 @@ class SimpleConsumer(val host: String, */ def fetch(request: FetchRequest): FetchResponse = { var response: Receive = null - val specificTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseStats(brokerInfo).requestTimer + val specificTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).requestTimer val aggregateTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestTimer aggregateTimer.time { specificTimer.time { @@ -115,7 +114,7 @@ class SimpleConsumer(val host: String, } val fetchResponse = FetchResponse.readFrom(response.buffer) val fetchedSize = fetchResponse.sizeInBytes - fetchRequestAndResponseStats.getFetchRequestAndResponseStats(brokerInfo).requestSizeHist.update(fetchedSize) + fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).requestSizeHist.update(fetchedSize) fetchRequestAndResponseStats.getFetchRequestAndResponseAllBrokersStats.requestSizeHist.update(fetchedSize) fetchResponse } diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index 4b6dc072b2d..3e1718bc7ca 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -104,9 +104,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null // useful for tracking migration of consumers to store offsets in kafka - private val kafkaCommitMeter = newMeter(config.clientId + "-KafkaCommitsPerSec", "commits", TimeUnit.SECONDS) - private val zkCommitMeter = newMeter(config.clientId + "-ZooKeeperCommitsPerSec", "commits", TimeUnit.SECONDS) - private val rebalanceTimer = new KafkaTimer(newTimer(config.clientId + "-RebalanceRateAndTime", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) + private val kafkaCommitMeter = newMeter("KafkaCommitsPerSec", "commits", TimeUnit.SECONDS, Map("clientId" -> config.clientId)) + private val zkCommitMeter = newMeter("ZooKeeperCommitsPerSec", "commits", TimeUnit.SECONDS, Map("clientId" -> config.clientId)) + private val rebalanceTimer = new KafkaTimer(newTimer("RebalanceRateAndTime", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, Map("clientId" -> config.clientId))) val consumerIdString = { var consumerUuid : String = null @@ -138,6 +138,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, } KafkaMetricsReporter.startReporters(config.props) + AppInfo.registerInfo() def this(config: ConsumerConfig) = this(config, true) @@ -521,14 +522,15 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, private var isWatcherTriggered = false private val lock = new ReentrantLock private val cond = lock.newCondition() - - @volatile private var allTopicsOwnedPartitionsCount = 0 - newGauge(config.clientId + "-" + config.groupId + "-AllTopicsOwnedPartitionsCount", new Gauge[Int] { - def value() = allTopicsOwnedPartitionsCount - }) - private def ownedPartitionsCountMetricName(topic: String) = - "%s-%s-%s-OwnedPartitionsCount".format(config.clientId, config.groupId, topic) + @volatile private var allTopicsOwnedPartitionsCount = 0 + newGauge("OwnedPartitionsCount", + new Gauge[Int] { + def value() = allTopicsOwnedPartitionsCount + }, + Map("clientId" -> config.clientId, "groupId" -> config.groupId)) + + private def ownedPartitionsCountMetricTags(topic: String) = Map("clientId" -> config.clientId, "groupId" -> config.groupId, "topic" -> topic) private val watcherExecutorThread = new Thread(consumerIdString + "_watcher_executor") { override def run() { @@ -581,7 +583,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, for(partition <- infos.keys) { deletePartitionOwnershipFromZK(topic, partition) } - removeMetric(ownedPartitionsCountMetricName(topic)) + removeMetric("OwnedPartitionsCount", ownedPartitionsCountMetricTags(topic)) localTopicRegistry.remove(topic) } allTopicsOwnedPartitionsCount = 0 @@ -684,9 +686,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, partitionOwnershipDecision.view.groupBy { case(topicPartition, consumerThreadId) => topicPartition.topic } .foreach { case (topic, partitionThreadPairs) => - newGauge(ownedPartitionsCountMetricName(topic), new Gauge[Int] { - def value() = partitionThreadPairs.size - }) + newGauge("OwnedPartitionsCount", + new Gauge[Int] { + def value() = partitionThreadPairs.size + }, + ownedPartitionsCountMetricTags(topic)) } topicRegistry = currentTopicRegistry @@ -868,10 +872,13 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, topicThreadIdAndQueues.put(topicThreadId, q) debug("Adding topicThreadId %s and queue %s to topicThreadIdAndQueues data structure".format(topicThreadId, q.toString)) newGauge( - config.clientId + "-" + config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize", + "FetchQueueSize", new Gauge[Int] { def value = q.size - } + }, + Map("clientId" -> config.clientId, + "topic" -> topicThreadId._1, + "threadId" -> topicThreadId._2.threadId.toString) ) }) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 37b4a85cca0..4fae2f0d339 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -88,17 +88,31 @@ class Log(val dir: File, info("Completed load of log %s with log end offset %d".format(name, logEndOffset)) - newGauge(name + "-" + "NumLogSegments", - new Gauge[Int] { def value = numberOfSegments }) + val tags = Map("topic" -> topicAndPartition.topic, "partition" -> topicAndPartition.partition.toString) - newGauge(name + "-" + "LogStartOffset", - new Gauge[Long] { def value = logStartOffset }) + newGauge("NumLogSegments", + new Gauge[Int] { + def value = numberOfSegments + }, + tags) - newGauge(name + "-" + "LogEndOffset", - new Gauge[Long] { def value = logEndOffset }) - - newGauge(name + "-" + "Size", - new Gauge[Long] {def value = size}) + newGauge("LogStartOffset", + new Gauge[Long] { + def value = logStartOffset + }, + tags) + + newGauge("LogEndOffset", + new Gauge[Long] { + def value = logEndOffset + }, + tags) + + newGauge("Size", + new Gauge[Long] { + def value = size + }, + tags) /** The name of this log */ def name = dir.getName() @@ -168,7 +182,7 @@ class Log(val dir: File, if(logSegments.size == 0) { // no existing segments, create a new mutable segment beginning at offset 0 - segments.put(0, new LogSegment(dir = dir, + segments.put(0L, new LogSegment(dir = dir, startOffset = 0, indexIntervalBytes = config.indexInterval, maxIndexSize = config.maxIndexSize, diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala index 2313a57d02c..e9e49180f6d 100644 --- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala +++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -35,29 +35,52 @@ trait KafkaMetricsGroup extends Logging { * Creates a new MetricName object for gauges, meters, etc. created for this * metrics group. * @param name Descriptive name of the metric. + * @param tags Additional attributes which mBean will have. * @return Sanitized metric name object. */ - private def metricName(name: String) = { + private def metricName(name: String, tags: scala.collection.Map[String, String] = Map.empty) = { val klass = this.getClass val pkg = if (klass.getPackage == null) "" else klass.getPackage.getName val simpleName = klass.getSimpleName.replaceAll("\\$$", "") - new MetricName(pkg, simpleName, name) + + explicitMetricName(pkg, simpleName, name, tags) } - def newGauge[T](name: String, metric: Gauge[T]) = - Metrics.defaultRegistry().newGauge(metricName(name), metric) - def newMeter(name: String, eventType: String, timeUnit: TimeUnit) = - Metrics.defaultRegistry().newMeter(metricName(name), eventType, timeUnit) + private def explicitMetricName(group: String, typeName: String, name: String, tags: scala.collection.Map[String, String] = Map.empty) = { + val nameBuilder: StringBuilder = new StringBuilder - def newHistogram(name: String, biased: Boolean = true) = - Metrics.defaultRegistry().newHistogram(metricName(name), biased) + nameBuilder.append(group) - def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit) = - Metrics.defaultRegistry().newTimer(metricName(name), durationUnit, rateUnit) + nameBuilder.append(":type=") + + nameBuilder.append(typeName) + + if (name.length > 0) { + nameBuilder.append(",name=") + nameBuilder.append(name) + } + + KafkaMetricsGroup.toMBeanName(tags).map(mbeanName => nameBuilder.append(",").append(mbeanName)) + + new MetricName(group, typeName, name, null, nameBuilder.toString()) + } + + def newGauge[T](name: String, metric: Gauge[T], tags: scala.collection.Map[String, String] = Map.empty) = + Metrics.defaultRegistry().newGauge(metricName(name, tags), metric) + + def newMeter(name: String, eventType: String, timeUnit: TimeUnit, tags: scala.collection.Map[String, String] = Map.empty) = + Metrics.defaultRegistry().newMeter(metricName(name, tags), eventType, timeUnit) + + def newHistogram(name: String, biased: Boolean = true, tags: scala.collection.Map[String, String] = Map.empty) = + Metrics.defaultRegistry().newHistogram(metricName(name, tags), biased) + + def newTimer(name: String, durationUnit: TimeUnit, rateUnit: TimeUnit, tags: scala.collection.Map[String, String] = Map.empty) = + Metrics.defaultRegistry().newTimer(metricName(name, tags), durationUnit, rateUnit) + + def removeMetric(name: String, tags: scala.collection.Map[String, String] = Map.empty) = + Metrics.defaultRegistry().removeMetric(metricName(name, tags)) - def removeMetric(name: String) = - Metrics.defaultRegistry().removeMetric(metricName(name)) } @@ -68,72 +91,75 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging { */ private val consumerMetricNameList: immutable.List[MetricName] = immutable.List[MetricName]( // kafka.consumer.ZookeeperConsumerConnector - new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-FetchQueueSize"), - new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-KafkaCommitsPerSec"), - new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-ZooKeeperCommitsPerSec"), - new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-RebalanceRateAndTime"), - new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "-OwnedPartitionsCount"), - new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "AllTopicsOwnedPartitionsCount"), + new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "FetchQueueSize"), + new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "KafkaCommitsPerSec"), + new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "ZooKeeperCommitsPerSec"), + new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "RebalanceRateAndTime"), + new MetricName("kafka.consumer", "ZookeeperConsumerConnector", "OwnedPartitionsCount"), // kafka.consumer.ConsumerFetcherManager - new MetricName("kafka.consumer", "ConsumerFetcherManager", "-MaxLag"), - new MetricName("kafka.consumer", "ConsumerFetcherManager", "-MinFetchRate"), + new MetricName("kafka.consumer", "ConsumerFetcherManager", "MaxLag"), + new MetricName("kafka.consumer", "ConsumerFetcherManager", "MinFetchRate"), // kafka.server.AbstractFetcherThread <-- kafka.consumer.ConsumerFetcherThread - new MetricName("kafka.server", "FetcherLagMetrics", "-ConsumerLag"), + new MetricName("kafka.server", "FetcherLagMetrics", "ConsumerLag"), // kafka.consumer.ConsumerTopicStats <-- kafka.consumer.{ConsumerIterator, PartitionTopicInfo} - new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-MessagesPerSec"), - new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-AllTopicsMessagesPerSec"), + new MetricName("kafka.consumer", "ConsumerTopicMetrics", "MessagesPerSec"), // kafka.consumer.ConsumerTopicStats - new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-BytesPerSec"), - new MetricName("kafka.consumer", "ConsumerTopicMetrics", "-AllTopicsBytesPerSec"), + new MetricName("kafka.consumer", "ConsumerTopicMetrics", "BytesPerSec"), // kafka.server.AbstractFetcherThread <-- kafka.consumer.ConsumerFetcherThread - new MetricName("kafka.server", "FetcherStats", "-BytesPerSec"), - new MetricName("kafka.server", "FetcherStats", "-RequestsPerSec"), + new MetricName("kafka.server", "FetcherStats", "BytesPerSec"), + new MetricName("kafka.server", "FetcherStats", "RequestsPerSec"), // kafka.consumer.FetchRequestAndResponseStats <-- kafka.consumer.SimpleConsumer - new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-FetchResponseSize"), - new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-FetchRequestRateAndTimeMs"), - new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-AllBrokersFetchResponseSize"), - new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "-AllBrokersFetchRequestRateAndTimeMs"), + new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "FetchResponseSize"), + new MetricName("kafka.consumer", "FetchRequestAndResponseMetrics", "FetchRequestRateAndTimeMs"), /** * ProducerRequestStats <-- SyncProducer * metric for SyncProducer in fetchTopicMetaData() needs to be removed when consumer is closed. */ - new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestRateAndTimeMs"), - new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestSize"), - new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestRateAndTimeMs"), - new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestSize") + new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestRateAndTimeMs"), + new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestSize") ) - private val producerMetricNameList: immutable.List[MetricName] = immutable.List[MetricName] ( + private val producerMetricNameList: immutable.List[MetricName] = immutable.List[MetricName]( // kafka.producer.ProducerStats <-- DefaultEventHandler <-- Producer - new MetricName("kafka.producer", "ProducerStats", "-SerializationErrorsPerSec"), - new MetricName("kafka.producer", "ProducerStats", "-ResendsPerSec"), - new MetricName("kafka.producer", "ProducerStats", "-FailedSendsPerSec"), + new MetricName("kafka.producer", "ProducerStats", "SerializationErrorsPerSec"), + new MetricName("kafka.producer", "ProducerStats", "ResendsPerSec"), + new MetricName("kafka.producer", "ProducerStats", "FailedSendsPerSec"), // kafka.producer.ProducerSendThread - new MetricName("kafka.producer.async", "ProducerSendThread", "-ProducerQueueSize"), + new MetricName("kafka.producer.async", "ProducerSendThread", "ProducerQueueSize"), // kafka.producer.ProducerTopicStats <-- kafka.producer.{Producer, async.DefaultEventHandler} - new MetricName("kafka.producer", "ProducerTopicMetrics", "-MessagesPerSec"), - new MetricName("kafka.producer", "ProducerTopicMetrics", "-DroppedMessagesPerSec"), - new MetricName("kafka.producer", "ProducerTopicMetrics", "-BytesPerSec"), - new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsMessagesPerSec"), - new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsDroppedMessagesPerSec"), - new MetricName("kafka.producer", "ProducerTopicMetrics", "-AllTopicsBytesPerSec"), + new MetricName("kafka.producer", "ProducerTopicMetrics", "MessagesPerSec"), + new MetricName("kafka.producer", "ProducerTopicMetrics", "DroppedMessagesPerSec"), + new MetricName("kafka.producer", "ProducerTopicMetrics", "BytesPerSec"), // kafka.producer.ProducerRequestStats <-- SyncProducer - new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestRateAndTimeMs"), - new MetricName("kafka.producer", "ProducerRequestMetrics", "-ProducerRequestSize"), - new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestRateAndTimeMs"), - new MetricName("kafka.producer", "ProducerRequestMetrics", "-AllBrokersProducerRequestSize") + new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestRateAndTimeMs"), + new MetricName("kafka.producer", "ProducerRequestMetrics", "ProducerRequestSize") ) + private def toMBeanName(tags: collection.Map[String, String]): Option[String] = { + val filteredTags = tags + .filter { case (tagKey, tagValue) => tagValue != ""} + if (filteredTags.nonEmpty) { + val tagsString = filteredTags + .map { case (key, value) => "%s=%s".format(key, value)} + .mkString(",") + + Some(tagsString) + } + else { + None + } + } + def removeAllConsumerMetrics(clientId: String) { FetchRequestAndResponseStatsRegistry.removeConsumerFetchRequestAndResponseStats(clientId) ConsumerTopicStatsRegistry.removeConsumerTopicStat(clientId) @@ -150,18 +176,19 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging { private def removeAllMetricsInList(metricNameList: immutable.List[MetricName], clientId: String) { metricNameList.foreach(metric => { - val pattern = (clientId + ".*" + metric.getName +".*").r + val pattern = (".*clientId=" + clientId + ".*").r val registeredMetrics = scala.collection.JavaConversions.asScalaSet(Metrics.defaultRegistry().allMetrics().keySet()) for (registeredMetric <- registeredMetrics) { if (registeredMetric.getGroup == metric.getGroup && + registeredMetric.getName == metric.getName && registeredMetric.getType == metric.getType) { - pattern.findFirstIn(registeredMetric.getName) match { + pattern.findFirstIn(registeredMetric.getMBeanName) match { case Some(_) => { val beforeRemovalSize = Metrics.defaultRegistry().allMetrics().keySet().size Metrics.defaultRegistry().removeMetric(registeredMetric) val afterRemovalSize = Metrics.defaultRegistry().allMetrics().keySet().size trace("Removing metric %s. Metrics registry size reduced from %d to %d".format( - registeredMetric, beforeRemovalSize, afterRemovalSize)) + registeredMetric, beforeRemovalSize, afterRemovalSize)) } case _ => } diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 4560d8fb7db..7b1db3dbbb2 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -125,12 +125,12 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe def value = responseQueues.foldLeft(0) {(total, q) => total + q.size()} }) - for(i <- 0 until numProcessors) { - newGauge( - "Processor-" + i + "-ResponseQueueSize", + for (i <- 0 until numProcessors) { + newGauge("ResponseQueueSize", new Gauge[Int] { def value = responseQueues(i).size() - } + }, + Map("processor" -> i.toString) ) } @@ -187,24 +187,25 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe object RequestMetrics { val metricsMap = new scala.collection.mutable.HashMap[String, RequestMetrics] - val consumerFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "-Consumer" - val followFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "-Follower" + val consumerFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "Consumer" + val followFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "Follower" (RequestKeys.keyToNameAndDeserializerMap.values.map(e => e._1) ++ List(consumerFetchMetricName, followFetchMetricName)).foreach(name => metricsMap.put(name, new RequestMetrics(name))) } class RequestMetrics(name: String) extends KafkaMetricsGroup { - val requestRate = newMeter(name + "-RequestsPerSec", "requests", TimeUnit.SECONDS) + val tags = Map("request" -> name) + val requestRate = newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS, tags) // time a request spent in a request queue - val requestQueueTimeHist = newHistogram(name + "-RequestQueueTimeMs") + val requestQueueTimeHist = newHistogram("RequestQueueTimeMs", biased = true, tags) // time a request takes to be processed at the local broker - val localTimeHist = newHistogram(name + "-LocalTimeMs") + val localTimeHist = newHistogram("LocalTimeMs", biased = true, tags) // time a request takes to wait on remote brokers (only relevant to fetch and produce requests) - val remoteTimeHist = newHistogram(name + "-RemoteTimeMs") + val remoteTimeHist = newHistogram("RemoteTimeMs", biased = true, tags) // time a response spent in a response queue - val responseQueueTimeHist = newHistogram(name + "-ResponseQueueTimeMs") + val responseQueueTimeHist = newHistogram("ResponseQueueTimeMs", biased = true, tags) // time to send the response to the requester - val responseSendTimeHist = newHistogram(name + "-ResponseSendTimeMs") - val totalTimeHist = newHistogram(name + "-TotalTimeMs") + val responseSendTimeHist = newHistogram("ResponseSendTimeMs", biased = true, tags) + val totalTimeHist = newHistogram("TotalTimeMs", biased = true, tags) } diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index cee76b323e5..e451592fe35 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -67,7 +67,7 @@ class SocketServer(val brokerId: Int, time, maxRequestSize, aggregateIdleMeter, - newMeter("NetworkProcessor-" + i + "-IdlePercent", "percent", TimeUnit.NANOSECONDS), + newMeter("IdlePercent", "percent", TimeUnit.NANOSECONDS, Map("networkProcessor" -> i.toString)), numProcessorThreads, requestChannel, quotas, diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala index cd634f653ca..e38d2fa7ec8 100644 --- a/core/src/main/scala/kafka/producer/Producer.scala +++ b/core/src/main/scala/kafka/producer/Producer.scala @@ -19,7 +19,7 @@ package kafka.producer import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.{LinkedBlockingQueue, TimeUnit} -import kafka.common.QueueFullException +import kafka.common.{AppInfo, QueueFullException} import kafka.metrics._ import kafka.producer.async.{DefaultEventHandler, EventHandler, ProducerSendThread} import kafka.serializer.Encoder @@ -53,6 +53,7 @@ class Producer[K,V](val config: ProducerConfig, private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId) KafkaMetricsReporter.startReporters(config.props) + AppInfo.registerInfo() def this(config: ProducerConfig) = this(config, diff --git a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala index 1c46d729d82..026e93a2f1d 100644 --- a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala +++ b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala @@ -19,11 +19,16 @@ package kafka.producer import kafka.metrics.{KafkaTimer, KafkaMetricsGroup} import java.util.concurrent.TimeUnit import kafka.utils.Pool -import kafka.common.ClientIdAndBroker +import kafka.common.{ClientIdAllBrokers, ClientIdBroker, ClientIdAndBroker} -class ProducerRequestMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGroup { - val requestTimer = new KafkaTimer(newTimer(metricId + "ProducerRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) - val requestSizeHist = newHistogram(metricId + "ProducerRequestSize") +class ProducerRequestMetrics(metricId: ClientIdBroker) extends KafkaMetricsGroup { + val tags = metricId match { + case ClientIdAndBroker(clientId, brokerHost, brokerPort) => Map("clientId" -> clientId, "brokerHost" -> brokerHost, "brokerPort" -> brokerPort.toString) + case ClientIdAllBrokers(clientId) => Map("clientId" -> clientId) + } + + val requestTimer = new KafkaTimer(newTimer("ProducerRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS, tags)) + val requestSizeHist = newHistogram("ProducerRequestSize", biased = true, tags) } /** @@ -31,14 +36,14 @@ class ProducerRequestMetrics(metricId: ClientIdAndBroker) extends KafkaMetricsGr * @param clientId ClientId of the given producer */ class ProducerRequestStats(clientId: String) { - private val valueFactory = (k: ClientIdAndBroker) => new ProducerRequestMetrics(k) - private val stats = new Pool[ClientIdAndBroker, ProducerRequestMetrics](Some(valueFactory)) - private val allBrokersStats = new ProducerRequestMetrics(new ClientIdAndBroker(clientId, "AllBrokers")) + private val valueFactory = (k: ClientIdBroker) => new ProducerRequestMetrics(k) + private val stats = new Pool[ClientIdBroker, ProducerRequestMetrics](Some(valueFactory)) + private val allBrokersStats = new ProducerRequestMetrics(new ClientIdAllBrokers(clientId)) def getProducerRequestAllBrokersStats(): ProducerRequestMetrics = allBrokersStats - def getProducerRequestStats(brokerInfo: String): ProducerRequestMetrics = { - stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerInfo + "-")) + def getProducerRequestStats(brokerHost: String, brokerPort: Int): ProducerRequestMetrics = { + stats.getAndMaybePut(new ClientIdAndBroker(clientId, brokerHost, brokerPort)) } } diff --git a/core/src/main/scala/kafka/producer/ProducerStats.scala b/core/src/main/scala/kafka/producer/ProducerStats.scala index 35e3aae2f81..1d0fa888c99 100644 --- a/core/src/main/scala/kafka/producer/ProducerStats.scala +++ b/core/src/main/scala/kafka/producer/ProducerStats.scala @@ -21,9 +21,10 @@ import java.util.concurrent.TimeUnit import kafka.utils.Pool class ProducerStats(clientId: String) extends KafkaMetricsGroup { - val serializationErrorRate = newMeter(clientId + "-SerializationErrorsPerSec", "errors", TimeUnit.SECONDS) - val resendRate = newMeter(clientId + "-ResendsPerSec", "resends", TimeUnit.SECONDS) - val failedSendRate = newMeter(clientId + "-FailedSendsPerSec", "failed sends", TimeUnit.SECONDS) + val tags: Map[String, String] = Map("clientId" -> clientId) + val serializationErrorRate = newMeter("SerializationErrorsPerSec", "errors", TimeUnit.SECONDS, tags) + val resendRate = newMeter("ResendsPerSec", "resends", TimeUnit.SECONDS, tags) + val failedSendRate = newMeter("FailedSendsPerSec", "failed sends", TimeUnit.SECONDS, tags) } /** diff --git a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala index 9bb1419dcc4..97594c83136 100644 --- a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala +++ b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala @@ -17,16 +17,21 @@ package kafka.producer import kafka.metrics.KafkaMetricsGroup -import kafka.common.ClientIdAndTopic +import kafka.common.{ClientIdTopic, ClientIdAllTopics, ClientIdAndTopic} import kafka.utils.{Pool, threadsafe} import java.util.concurrent.TimeUnit @threadsafe -class ProducerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup { - val messageRate = newMeter(metricId + "MessagesPerSec", "messages", TimeUnit.SECONDS) - val byteRate = newMeter(metricId + "BytesPerSec", "bytes", TimeUnit.SECONDS) - val droppedMessageRate = newMeter(metricId + "DroppedMessagesPerSec", "drops", TimeUnit.SECONDS) +class ProducerTopicMetrics(metricId: ClientIdTopic) extends KafkaMetricsGroup { + val tags = metricId match { + case ClientIdAndTopic(clientId, topic) => Map("clientId" -> clientId, "topic" -> topic) + case ClientIdAllTopics(clientId) => Map("clientId" -> clientId) + } + + val messageRate = newMeter("MessagesPerSec", "messages", TimeUnit.SECONDS, tags) + val byteRate = newMeter("BytesPerSec", "bytes", TimeUnit.SECONDS, tags) + val droppedMessageRate = newMeter("DroppedMessagesPerSec", "drops", TimeUnit.SECONDS, tags) } /** @@ -34,14 +39,14 @@ class ProducerTopicMetrics(metricId: ClientIdAndTopic) extends KafkaMetricsGroup * @param clientId The clientId of the given producer client. */ class ProducerTopicStats(clientId: String) { - private val valueFactory = (k: ClientIdAndTopic) => new ProducerTopicMetrics(k) - private val stats = new Pool[ClientIdAndTopic, ProducerTopicMetrics](Some(valueFactory)) - private val allTopicsStats = new ProducerTopicMetrics(new ClientIdAndTopic(clientId, "AllTopics")) // to differentiate from a topic named AllTopics + private val valueFactory = (k: ClientIdTopic) => new ProducerTopicMetrics(k) + private val stats = new Pool[ClientIdTopic, ProducerTopicMetrics](Some(valueFactory)) + private val allTopicsStats = new ProducerTopicMetrics(new ClientIdAllTopics(clientId)) // to differentiate from a topic named AllTopics def getProducerAllTopicsStats(): ProducerTopicMetrics = allTopicsStats def getProducerTopicStats(topic: String): ProducerTopicMetrics = { - stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic + "-")) + stats.getAndMaybePut(new ClientIdAndTopic(clientId, topic)) } } diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala index 35e9e8cdb38..0f09951329a 100644 --- a/core/src/main/scala/kafka/producer/SyncProducer.scala +++ b/core/src/main/scala/kafka/producer/SyncProducer.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -39,7 +39,6 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { @volatile private var shutdown: Boolean = false private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize, config.sendBufferBytes, config.requestTimeoutMs) - val brokerInfo = "host_%s-port_%s".format(config.host, config.port) val producerRequestStats = ProducerRequestStatsRegistry.getProducerRequestStats(config.clientId) trace("Instantiating Scala Sync Producer with properties: %s".format(config.props)) @@ -93,11 +92,11 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { */ def send(producerRequest: ProducerRequest): ProducerResponse = { val requestSize = producerRequest.sizeInBytes - producerRequestStats.getProducerRequestStats(brokerInfo).requestSizeHist.update(requestSize) + producerRequestStats.getProducerRequestStats(config.host, config.port).requestSizeHist.update(requestSize) producerRequestStats.getProducerRequestAllBrokersStats.requestSizeHist.update(requestSize) var response: Receive = null - val specificTimer = producerRequestStats.getProducerRequestStats(brokerInfo).requestTimer + val specificTimer = producerRequestStats.getProducerRequestStats(config.host, config.port).requestTimer val aggregateTimer = producerRequestStats.getProducerRequestAllBrokersStats.requestTimer aggregateTimer.time { specificTimer.time { @@ -134,7 +133,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { case e: Exception => error("Error on disconnect: ", e) } } - + private def connect(): BlockingChannel = { if (!blockingChannel.isConnected && !shutdown) { try { @@ -156,5 +155,4 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { connect() } } -} - +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala index 42e9c741c2d..2ccf82a6f1e 100644 --- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala +++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala @@ -34,10 +34,11 @@ class ProducerSendThread[K,V](val threadName: String, private val shutdownLatch = new CountDownLatch(1) private val shutdownCommand = new KeyedMessage[K,V]("shutdown", null.asInstanceOf[K], null.asInstanceOf[V]) - newGauge(clientId + "-ProducerQueueSize", + newGauge("ProducerQueueSize", new Gauge[Int] { def value = queue.size - }) + }, + Map("clientId" -> clientId)) override def run { try { diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala index 9390edf37da..20c00cb8cc2 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala @@ -26,7 +26,7 @@ import kafka.metrics.KafkaMetricsGroup import kafka.common.TopicAndPartition import com.yammer.metrics.core.Gauge -abstract class AbstractFetcherManager(protected val name: String, metricPrefix: String, numFetchers: Int = 1) +abstract class AbstractFetcherManager(protected val name: String, clientId: String, numFetchers: Int = 1) extends Logging with KafkaMetricsGroup { // map of (source broker_id, fetcher_id per source broker) => fetcher private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread] @@ -34,7 +34,7 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix: this.logIdent = "[" + name + "] " newGauge( - metricPrefix + "-MaxLag", + "MaxLag", new Gauge[Long] { // current max lag across all fetchers/topics/partitions def value = fetcherThreadMap.foldLeft(0L)((curMaxAll, fetcherThreadMapEntry) => { @@ -42,24 +42,25 @@ abstract class AbstractFetcherManager(protected val name: String, metricPrefix: curMaxThread.max(fetcherLagStatsEntry._2.lag) }).max(curMaxAll) }) - } + }, + Map("clientId" -> clientId) ) newGauge( - metricPrefix + "-MinFetchRate", - { - new Gauge[Double] { - // current min fetch rate across all fetchers/topics/partitions - def value = { - val headRate: Double = - fetcherThreadMap.headOption.map(_._2.fetcherStats.requestRate.oneMinuteRate).getOrElse(0) + "MinFetchRate", { + new Gauge[Double] { + // current min fetch rate across all fetchers/topics/partitions + def value = { + val headRate: Double = + fetcherThreadMap.headOption.map(_._2.fetcherStats.requestRate.oneMinuteRate).getOrElse(0) - fetcherThreadMap.foldLeft(headRate)((curMinAll, fetcherThreadMapEntry) => { - fetcherThreadMapEntry._2.fetcherStats.requestRate.oneMinuteRate.min(curMinAll) - }) - } + fetcherThreadMap.foldLeft(headRate)((curMinAll, fetcherThreadMapEntry) => { + fetcherThreadMapEntry._2.fetcherStats.requestRate.oneMinuteRate.min(curMinAll) + }) } } + }, + Map("clientId" -> clientId) ) private def getFetcherId(topic: String, partitionId: Int) : Int = { diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 2e9532e820b..8c281d4668f 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -26,9 +26,7 @@ import kafka.utils.Utils.inLock import kafka.message.{InvalidMessageException, ByteBufferMessageSet, MessageAndOffset} import kafka.metrics.KafkaMetricsGroup -import scala.collection.mutable -import scala.collection.Set -import scala.collection.Map +import scala.collection.{mutable, Set, Map} import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.atomic.AtomicLong @@ -46,8 +44,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke private val partitionMapLock = new ReentrantLock private val partitionMapCond = partitionMapLock.newCondition() val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId) - private val brokerInfo = "host_%s-port_%s".format(sourceBroker.host, sourceBroker.port) - private val metricId = new ClientIdAndBroker(clientId, brokerInfo) + private val metricId = new ClientIdAndBroker(clientId, sourceBroker.host, sourceBroker.port) val fetcherStats = new FetcherStats(metricId) val fetcherLagStats = new FetcherLagStats(metricId) val fetchRequestBuilder = new FetchRequestBuilder(). @@ -204,13 +201,15 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke } } -class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMetricsGroup { +class FetcherLagMetrics(metricId: ClientIdTopicPartition) extends KafkaMetricsGroup { private[this] val lagVal = new AtomicLong(-1L) - newGauge( - metricId + "-ConsumerLag", + newGauge("ConsumerLag", new Gauge[Long] { def value = lagVal.get - } + }, + Map("clientId" -> metricId.clientId, + "topic" -> metricId.topic, + "partition" -> metricId.partitionId.toString) ) def lag_=(newLag: Long) { @@ -221,20 +220,25 @@ class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMet } class FetcherLagStats(metricId: ClientIdAndBroker) { - private val valueFactory = (k: ClientIdBrokerTopicPartition) => new FetcherLagMetrics(k) - val stats = new Pool[ClientIdBrokerTopicPartition, FetcherLagMetrics](Some(valueFactory)) + private val valueFactory = (k: ClientIdTopicPartition) => new FetcherLagMetrics(k) + val stats = new Pool[ClientIdTopicPartition, FetcherLagMetrics](Some(valueFactory)) def getFetcherLagStats(topic: String, partitionId: Int): FetcherLagMetrics = { - stats.getAndMaybePut(new ClientIdBrokerTopicPartition(metricId.clientId, metricId.brokerInfo, topic, partitionId)) + stats.getAndMaybePut(new ClientIdTopicPartition(metricId.clientId, topic, partitionId)) } } class FetcherStats(metricId: ClientIdAndBroker) extends KafkaMetricsGroup { - val requestRate = newMeter(metricId + "-RequestsPerSec", "requests", TimeUnit.SECONDS) - val byteRate = newMeter(metricId + "-BytesPerSec", "bytes", TimeUnit.SECONDS) + val tags = Map("clientId" -> metricId.clientId, + "brokerHost" -> metricId.brokerHost, + "brokerPort" -> metricId.brokerPort.toString) + + val requestRate = newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS, tags) + + val byteRate = newMeter("BytesPerSec", "bytes", TimeUnit.SECONDS, tags) } -case class ClientIdBrokerTopicPartition(clientId: String, brokerInfo: String, topic: String, partitionId: Int) { - override def toString = "%s-%s-%s-%d".format(clientId, brokerInfo, topic, partitionId) +case class ClientIdTopicPartition(clientId: String, topic: String, partitionId: Int) { + override def toString = "%s-%s-%d".format(clientId, topic, partitionId) } diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index 1ccbb4b6fdb..1e2e56f87a3 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -44,7 +44,6 @@ case class FetchMetadata(fetchMinBytes: Int, "onlyCommitted: " + fetchOnlyCommitted + ", " "partitionStatus: " + fetchPartitionStatus + "]" } - /** * A delayed fetch request that can be created by the replica manager and watched * in the fetch request purgatory diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index 8049e07e5d6..1603066d33f 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -22,7 +22,6 @@ import kafka.api.ProducerResponseStatus import kafka.common.ErrorMapping import kafka.common.TopicAndPartition -import scala.Some import scala.collection._ case class ProducePartitionStatus(requiredOffset: Long, responseStatus: ProducerResponseStatus) { @@ -118,5 +117,4 @@ class DelayedProduce(delayMs: Long, val responseStatus = produceMetadata.produceStatus.mapValues(status => status.responseStatus) responseCallback(responseStatus) } -} - +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/DelayedRequestKey.scala b/core/src/main/scala/kafka/server/DelayedRequestKey.scala deleted file mode 100644 index 628ef59564b..00000000000 --- a/core/src/main/scala/kafka/server/DelayedRequestKey.scala +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.server - -import kafka.common.TopicAndPartition - -/** - * Keys used for delayed request metrics recording - */ -trait DelayedRequestKey { - def keyLabel: String -} - -object DelayedRequestKey { - val globalLabel = "All" -} - -case class TopicPartitionRequestKey(topic: String, partition: Int) extends DelayedRequestKey { - - def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition) - - override def keyLabel = "%s-%d".format(topic, partition) -} diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index 00bcc06716f..e4053fbe8ef 100644 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -93,23 +93,28 @@ class KafkaRequestHandlerPool(val brokerId: Int, } } -class BrokerTopicMetrics(name: String) extends KafkaMetricsGroup { - val messagesInRate = newMeter(name + "MessagesInPerSec", "messages", TimeUnit.SECONDS) - val bytesInRate = newMeter(name + "BytesInPerSec", "bytes", TimeUnit.SECONDS) - val bytesOutRate = newMeter(name + "BytesOutPerSec", "bytes", TimeUnit.SECONDS) - val bytesRejectedRate = newMeter(name + "BytesRejectedPerSec", "bytes", TimeUnit.SECONDS) - val failedProduceRequestRate = newMeter(name + "FailedProduceRequestsPerSec", "requests", TimeUnit.SECONDS) - val failedFetchRequestRate = newMeter(name + "FailedFetchRequestsPerSec", "requests", TimeUnit.SECONDS) +class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup { + val tags: scala.collection.Map[String, String] = name match { + case None => scala.collection.Map.empty + case Some(topic) => Map("topic" -> topic) + } + + val messagesInRate = newMeter("MessagesInPerSec", "messages", TimeUnit.SECONDS, tags) + val bytesInRate = newMeter("BytesInPerSec", "bytes", TimeUnit.SECONDS, tags) + val bytesOutRate = newMeter("BytesOutPerSec", "bytes", TimeUnit.SECONDS, tags) + val bytesRejectedRate = newMeter("BytesRejectedPerSec", "bytes", TimeUnit.SECONDS, tags) + val failedProduceRequestRate = newMeter("FailedProduceRequestsPerSec", "requests", TimeUnit.SECONDS, tags) + val failedFetchRequestRate = newMeter("FailedFetchRequestsPerSec", "requests", TimeUnit.SECONDS, tags) } object BrokerTopicStats extends Logging { - private val valueFactory = (k: String) => new BrokerTopicMetrics(k) + private val valueFactory = (k: String) => new BrokerTopicMetrics(Some(k)) private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory)) - private val allTopicsStats = new BrokerTopicMetrics("AllTopics") + private val allTopicsStats = new BrokerTopicMetrics(None) def getBrokerAllTopicsStats(): BrokerTopicMetrics = allTopicsStats def getBrokerTopicStats(topic: String): BrokerTopicMetrics = { - stats.getAndMaybePut(topic + "-") + stats.getAndMaybePut(topic) } } diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 4de812374e8..1bf7d10cef2 100644 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -25,7 +25,6 @@ import kafka.utils._ import java.util.concurrent._ import atomic.{AtomicInteger, AtomicBoolean} import java.io.File -import java.net.BindException import org.I0Itec.zkclient.ZkClient import kafka.controller.{ControllerStats, KafkaController} import kafka.cluster.Broker diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala b/core/src/main/scala/kafka/server/KafkaServerStartable.scala index cd64bbe56c9..1c1b75b4137 100644 --- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala +++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala @@ -17,6 +17,7 @@ package kafka.server +import kafka.common.AppInfo import kafka.utils.Logging @@ -26,6 +27,7 @@ class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging { def startup() { try { server.startup() + AppInfo.registerInfo() } catch { case e: Throwable => diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 3007a6d89b6..f043f042e4c 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -34,7 +34,6 @@ import scala.collection._ import scala.collection.mutable.HashMap import scala.collection.Map import scala.collection.Set -import scala.Some import org.I0Itec.zkclient.ZkClient import com.yammer.metrics.core.Gauge @@ -124,9 +123,9 @@ class ReplicaManager(val config: KafkaConfig, * 1. The partition HW has changed (for acks = -1) * 2. A follower replica's fetch operation is received (for acks > 1) */ - def tryCompleteDelayedProduce(key: DelayedRequestKey) { + def tryCompleteDelayedProduce(key: TopicAndPartition) { val completed = producerRequestPurgatory.checkAndComplete(key) - debug("Request key %s unblocked %d producer requests.".format(key.keyLabel, completed)) + debug("Request key %s unblocked %d producer requests.".format(key, completed)) } /** @@ -136,9 +135,9 @@ class ReplicaManager(val config: KafkaConfig, * 1. The partition HW has changed (for regular fetch) * 2. A new message set is appended to the local log (for follower fetch) */ - def tryCompleteDelayedFetch(key: DelayedRequestKey) { + def tryCompleteDelayedFetch(key: TopicAndPartition) { val completed = fetchRequestPurgatory.checkAndComplete(key) - debug("Request key %s unblocked %d fetch requests.".format(key.keyLabel, completed)) + debug("Request key %s unblocked %d fetch requests.".format(key, completed)) } def startup() { @@ -281,7 +280,7 @@ class ReplicaManager(val config: KafkaConfig, val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback) // create a list of (topic, partition) pairs to use as keys for this delayed request - val producerRequestKeys = messagesPerPartition.keys.map(new TopicPartitionRequestKey(_)).toSeq + val producerRequestKeys = messagesPerPartition.keys.toSeq // try to complete the request immediately, otherwise put it into the purgatory // this is because while the delayed request is being created, new requests may @@ -384,7 +383,7 @@ class ReplicaManager(val config: KafkaConfig, val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, responseCallback) // create a list of (topic, partition) pairs to use as keys for this delayed request - val delayedFetchKeys = fetchPartitionStatus.keys.map(new TopicPartitionRequestKey(_)).toSeq + val delayedFetchKeys = fetchPartitionStatus.keys.toSeq // try to complete the request immediately, otherwise put it into the purgatory; // this is because while the delayed request is being created, new requests may @@ -709,7 +708,7 @@ class ReplicaManager(val config: KafkaConfig, // for producer requests with ack > 1, we need to check // if they can be unblocked after some follower's log end offsets have moved - tryCompleteDelayedProduce(new TopicPartitionRequestKey(topicAndPartition)) + tryCompleteDelayedProduce(topicAndPartition) case None => warn("While recording the replica LEO, the partition %s hasn't been created.".format(topicAndPartition)) } diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala index f61c7c701fd..f2dc4ed2f04 100644 --- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala @@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicLong import java.util._ import java.text.SimpleDateFormat import java.math.BigInteger -import scala.collection.immutable.List import org.apache.log4j.Logger diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index 9b0521c85fa..8c4687b2c96 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -26,8 +26,7 @@ import kafka.message._ import kafka.serializer._ import org.I0Itec.zkclient.ZkClient import kafka.utils._ -import kafka.producer.{ProducerConfig, KeyedMessage, Producer} -import java.util.{Collections, Properties} +import java.util.Collections import org.apache.log4j.{Logger, Level} import kafka.utils.TestUtils._ import kafka.common.MessageStreamsExistException @@ -89,8 +88,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar zkConsumerConnector0.shutdown // send some messages to each broker - val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) ++ - sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages) + val sentMessages1 = sendMessagesToPartition(configs, topic, 0, nMessages) ++ + sendMessagesToPartition(configs, topic, 1, nMessages) // wait to make sure the topic and partition have a leader for the successful case waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) @@ -123,8 +122,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true) val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) // send some messages to each broker - val sentMessages2 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) ++ - sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages) + val sentMessages2 = sendMessagesToPartition(configs, topic, 0, nMessages) ++ + sendMessagesToPartition(configs, topic, 1, nMessages) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) @@ -144,8 +143,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true) val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]()) // send some messages to each broker - val sentMessages3 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages) ++ - sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages) + val sentMessages3 = sendMessagesToPartition(configs, topic, 0, nMessages) ++ + sendMessagesToPartition(configs, topic, 1, nMessages) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) @@ -178,8 +177,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar requestHandlerLogger.setLevel(Level.FATAL) // send some messages to each broker - val sentMessages1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec) ++ - sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec) + val sentMessages1 = sendMessagesToPartition(configs, topic, 0, nMessages, GZIPCompressionCodec) ++ + sendMessagesToPartition(configs, topic, 1, nMessages, GZIPCompressionCodec) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) @@ -211,8 +210,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true) val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) // send some messages to each broker - val sentMessages2 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec) ++ - sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec) + val sentMessages2 = sendMessagesToPartition(configs, topic, 0, nMessages, GZIPCompressionCodec) ++ + sendMessagesToPartition(configs, topic, 1, nMessages, GZIPCompressionCodec) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) @@ -232,8 +231,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true) val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int](), new StringDecoder(), new StringDecoder()) // send some messages to each broker - val sentMessages3 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, GZIPCompressionCodec) ++ - sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec) + val sentMessages3 = sendMessagesToPartition(configs, topic, 0, nMessages, GZIPCompressionCodec) ++ + sendMessagesToPartition(configs, topic, 1, nMessages, GZIPCompressionCodec) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1) @@ -254,8 +253,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar def testCompressionSetConsumption() { // send some messages to each broker - val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, 200, DefaultCompressionCodec) ++ - sendMessagesToBrokerPartition(configs.last, topic, 1, 200, DefaultCompressionCodec) + val sentMessages = sendMessagesToPartition(configs, topic, 0, 200, DefaultCompressionCodec) ++ + sendMessagesToPartition(configs, topic, 1, 200, DefaultCompressionCodec) TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0) TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1) @@ -280,8 +279,8 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar requestHandlerLogger.setLevel(Level.FATAL) // send some messages to each broker - val sentMessages = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages, NoCompressionCodec) ++ - sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, NoCompressionCodec) + val sentMessages = sendMessagesToPartition(configs, topic, 0, nMessages, NoCompressionCodec) ++ + sendMessagesToPartition(configs, topic, 1, nMessages, NoCompressionCodec) TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0) TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1) @@ -321,7 +320,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers) // send some messages to each broker - val sentMessages1 = sendMessages(configs.head, nMessages, "batch1", NoCompressionCodec, 1) + val sentMessages1 = sendMessages(configs, topic, "producer1", nMessages, "batch1", NoCompressionCodec, 1) // create a consumer val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1)) @@ -345,70 +344,6 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar zkClient.close() } - def sendMessagesToBrokerPartition(config: KafkaConfig, - topic: String, - partition: Int, - numMessages: Int, - compression: CompressionCodec = NoCompressionCodec): List[String] = { - val header = "test-%d-%d".format(config.brokerId, partition) - val props = new Properties() - props.put("compression.codec", compression.codec.toString) - val producer: Producer[Int, String] = - createProducer(TestUtils.getBrokerListStrFromConfigs(configs), - encoder = classOf[StringEncoder].getName, - keyEncoder = classOf[IntEncoder].getName, - partitioner = classOf[FixedValuePartitioner].getName, - producerProps = props) - - val ms = 0.until(numMessages).map(x => header + config.brokerId + "-" + partition + "-" + x) - producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*) - debug("Sent %d messages to broker %d for partition [%s,%d]".format(ms.size, config.brokerId, topic, partition)) - producer.close() - ms.toList - } - - def sendMessages(config: KafkaConfig, - messagesPerNode: Int, - header: String, - compression: CompressionCodec, - numParts: Int): List[String]= { - var messages: List[String] = Nil - val props = new Properties() - props.put("compression.codec", compression.codec.toString) - val producer: Producer[Int, String] = - createProducer(brokerList = TestUtils.getBrokerListStrFromConfigs(configs), - encoder = classOf[StringEncoder].getName, - keyEncoder = classOf[IntEncoder].getName, - partitioner = classOf[FixedValuePartitioner].getName, - producerProps = props) - - for (partition <- 0 until numParts) { - val ms = 0.until(messagesPerNode).map(x => header + config.brokerId + "-" + partition + "-" + x) - producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*) - messages ++= ms - debug("Sent %d messages to broker %d for partition [%s,%d]".format(ms.size, config.brokerId, topic, partition)) - } - producer.close() - messages - } - - def getMessages(nMessagesPerThread: Int, - topicMessageStreams: Map[String,List[KafkaStream[String, String]]]): List[String]= { - var messages: List[String] = Nil - for((topic, messageStreams) <- topicMessageStreams) { - for (messageStream <- messageStreams) { - val iterator = messageStream.iterator - for(i <- 0 until nMessagesPerThread) { - assertTrue(iterator.hasNext) - val message = iterator.next.message - messages ::= message - debug("received message: " + message) - } - } - } - messages.reverse - } - def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = { val children = zkClient.getChildren(path) Collections.sort(children) diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala new file mode 100644 index 00000000000..3cf23b3d6d4 --- /dev/null +++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.consumer + +import com.yammer.metrics.Metrics +import junit.framework.Assert._ +import kafka.integration.KafkaServerTestHarness +import kafka.server._ +import scala.collection._ +import org.scalatest.junit.JUnit3Suite +import kafka.message._ +import kafka.serializer._ +import kafka.utils._ +import kafka.utils.TestUtils._ + +class MetricsTest extends JUnit3Suite with KafkaServerTestHarness with Logging { + val zookeeperConnect = TestZKUtils.zookeeperConnect + val numNodes = 2 + val numParts = 2 + val topic = "topic1" + val configs = + for (props <- TestUtils.createBrokerConfigs(numNodes)) + yield new KafkaConfig(props) { + override val zkConnect = zookeeperConnect + override val numPartitions = numParts + } + val nMessages = 2 + + override def tearDown() { + super.tearDown() + } + + def testMetricsLeak() { + // create topic topic1 with 1 partition on broker 0 + createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = servers) + // force creation not client's specific metrics. + createAndShutdownStep("group0", "consumer0", "producer0") + + val countOfStaticMetrics = Metrics.defaultRegistry().allMetrics().keySet().size + + for (i <- 0 to 5) { + createAndShutdownStep("group" + i % 3, "consumer" + i % 2, "producer" + i % 2) + assertEquals(countOfStaticMetrics, Metrics.defaultRegistry().allMetrics().keySet().size) + } + } + + def createAndShutdownStep(group: String, consumerId: String, producerId: String): Unit = { + val sentMessages1 = sendMessages(configs, topic, producerId, nMessages, "batch1", NoCompressionCodec, 1) + // create a consumer + val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumerId)) + val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) + val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) + val receivedMessages1 = getMessages(nMessages, topicMessageStreams1) + + zkConsumerConnector1.shutdown() + } +} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index dd3640f47b2..0da774d0ed0 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -26,7 +26,6 @@ import java.util.Properties import org.apache.kafka.common.utils.Utils._ -import collection.mutable.Map import collection.mutable.ListBuffer import org.I0Itec.zkclient.ZkClient @@ -36,7 +35,7 @@ import kafka.producer._ import kafka.message._ import kafka.api._ import kafka.cluster.Broker -import kafka.consumer.ConsumerConfig +import kafka.consumer.{KafkaStream, ConsumerConfig} import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder} import kafka.common.TopicAndPartition import kafka.admin.AdminUtils @@ -47,6 +46,8 @@ import junit.framework.AssertionFailedError import junit.framework.Assert._ import org.apache.kafka.clients.producer.KafkaProducer +import scala.collection.Map + /** * Utility functions to help with testing */ @@ -483,7 +484,7 @@ object TestUtils extends Logging { val data = topics.flatMap(topic => partitions.map(partition => (TopicAndPartition(topic, partition), message)) ) - new ProducerRequest(correlationId, clientId, acks.toShort, timeout, Map(data:_*)) + new ProducerRequest(correlationId, clientId, acks.toShort, timeout, collection.mutable.Map(data:_*)) } def makeLeaderForPartition(zkClient: ZkClient, topic: String, @@ -720,6 +721,73 @@ object TestUtils extends Logging { time = time, brokerState = new BrokerState()) } + + def sendMessagesToPartition(configs: Seq[KafkaConfig], + topic: String, + partition: Int, + numMessages: Int, + compression: CompressionCodec = NoCompressionCodec): List[String] = { + val header = "test-%d".format(partition) + val props = new Properties() + props.put("compression.codec", compression.codec.toString) + val producer: Producer[Int, String] = + createProducer(TestUtils.getBrokerListStrFromConfigs(configs), + encoder = classOf[StringEncoder].getName, + keyEncoder = classOf[IntEncoder].getName, + partitioner = classOf[FixedValuePartitioner].getName, + producerProps = props) + + val ms = 0.until(numMessages).map(x => header + "-" + x) + producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*) + debug("Sent %d messages for partition [%s,%d]".format(ms.size, topic, partition)) + producer.close() + ms.toList + } + + def sendMessages(configs: Seq[KafkaConfig], + topic: String, + producerId: String, + messagesPerNode: Int, + header: String, + compression: CompressionCodec, + numParts: Int): List[String]= { + var messages: List[String] = Nil + val props = new Properties() + props.put("compression.codec", compression.codec.toString) + props.put("client.id", producerId) + val producer: Producer[Int, String] = + createProducer(brokerList = TestUtils.getBrokerListStrFromConfigs(configs), + encoder = classOf[StringEncoder].getName, + keyEncoder = classOf[IntEncoder].getName, + partitioner = classOf[FixedValuePartitioner].getName, + producerProps = props) + + for (partition <- 0 until numParts) { + val ms = 0.until(messagesPerNode).map(x => header + "-" + partition + "-" + x) + producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*) + messages ++= ms + debug("Sent %d messages for partition [%s,%d]".format(ms.size, topic, partition)) + } + producer.close() + messages + } + + def getMessages(nMessagesPerThread: Int, + topicMessageStreams: Map[String, List[KafkaStream[String, String]]]): List[String] = { + var messages: List[String] = Nil + for ((topic, messageStreams) <- topicMessageStreams) { + for (messageStream <- messageStreams) { + val iterator = messageStream.iterator + for (i <- 0 until nMessagesPerThread) { + assertTrue(iterator.hasNext) + val message = iterator.next.message + messages ::= message + debug("received message: " + message) + } + } + } + messages.reverse + } } object TestZKUtils {