From 16b600d4fd972be697af95398f22f23aa8ad2608 Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Tue, 19 Jun 2012 00:27:00 +0000 Subject: [PATCH] add jmx beans in broker to track # bytes in consumer; patched by Jun Rao; reviewed by Joel Koshy; KAFKA-336 git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1351544 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/scala/kafka/consumer/ConsumerIterator.scala | 1 + .../main/scala/kafka/consumer/ConsumerTopicStat.scala | 10 ++++++++++ .../main/scala/kafka/consumer/PartitionTopicInfo.scala | 2 ++ 3 files changed, 13 insertions(+) diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala index 9c6828c25d7..73e27943ca9 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala @@ -47,6 +47,7 @@ class ConsumerIterator[T](private val channel: BlockingQueue[FetchedDataChunk], val topic = currentTopicInfo.topic trace("Setting %s consumed offset to %d".format(topic, consumedOffset)) ConsumerTopicStat.getConsumerTopicStat(topic).recordMessagesPerTopic(1) + ConsumerTopicStat.getConsumerAllTopicStat().recordMessagesPerTopic(1) item } diff --git a/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala b/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala index a001a5e83ad..3a9de2a9915 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala @@ -22,19 +22,29 @@ import kafka.utils.{Pool, Utils, threadsafe, Logging} trait ConsumerTopicStatMBean { def getMessagesPerTopic: Long + def getBytesPerTopic: Long } @threadsafe class ConsumerTopicStat extends ConsumerTopicStatMBean { private val numCumulatedMessagesPerTopic = new AtomicLong(0) + private val numCumulatedBytesPerTopic = new AtomicLong(0) def getMessagesPerTopic: Long = numCumulatedMessagesPerTopic.get def recordMessagesPerTopic(nMessages: Int) = numCumulatedMessagesPerTopic.getAndAdd(nMessages) + + def getBytesPerTopic: Long = numCumulatedBytesPerTopic.get + + def recordBytesPerTopic(nBytes: Long) = numCumulatedBytesPerTopic.getAndAdd(nBytes) } object ConsumerTopicStat extends Logging { private val stats = new Pool[String, ConsumerTopicStat] + private val allTopicStat = new ConsumerTopicStat + Utils.registerMBean(allTopicStat, "kafka:type=kafka.ConsumerAllTopicStat") + + def getConsumerAllTopicStat(): ConsumerTopicStat = allTopicStat def getConsumerTopicStat(topic: String): ConsumerTopicStat = { var stat = stats.get(topic) diff --git a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala index f64f2b0ea72..2a4caa72793 100644 --- a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala +++ b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala @@ -61,6 +61,8 @@ private[consumer] class PartitionTopicInfo(val topic: String, chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset)) val newOffset = fetchedOffset.addAndGet(size) debug("updated fetch offset of ( %s ) to %d".format(this, newOffset)) + ConsumerTopicStat.getConsumerTopicStat(topic).recordBytesPerTopic(size) + ConsumerTopicStat.getConsumerAllTopicStat().recordBytesPerTopic(size) } size }