add jmx beans in broker to track # bytes in consumer; patched by Jun Rao; reviewed by Joel Koshy; KAFKA-336

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1351544 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jun Rao 2012-06-19 00:27:00 +00:00
parent f8ce071f3f
commit 16b600d4fd
3 changed files with 13 additions and 0 deletions

View File

@ -47,6 +47,7 @@ class ConsumerIterator[T](private val channel: BlockingQueue[FetchedDataChunk],
val topic = currentTopicInfo.topic val topic = currentTopicInfo.topic
trace("Setting %s consumed offset to %d".format(topic, consumedOffset)) trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
ConsumerTopicStat.getConsumerTopicStat(topic).recordMessagesPerTopic(1) ConsumerTopicStat.getConsumerTopicStat(topic).recordMessagesPerTopic(1)
ConsumerTopicStat.getConsumerAllTopicStat().recordMessagesPerTopic(1)
item item
} }

View File

@ -22,19 +22,29 @@ import kafka.utils.{Pool, Utils, threadsafe, Logging}
trait ConsumerTopicStatMBean { trait ConsumerTopicStatMBean {
def getMessagesPerTopic: Long def getMessagesPerTopic: Long
def getBytesPerTopic: Long
} }
@threadsafe @threadsafe
class ConsumerTopicStat extends ConsumerTopicStatMBean { class ConsumerTopicStat extends ConsumerTopicStatMBean {
private val numCumulatedMessagesPerTopic = new AtomicLong(0) private val numCumulatedMessagesPerTopic = new AtomicLong(0)
private val numCumulatedBytesPerTopic = new AtomicLong(0)
def getMessagesPerTopic: Long = numCumulatedMessagesPerTopic.get def getMessagesPerTopic: Long = numCumulatedMessagesPerTopic.get
def recordMessagesPerTopic(nMessages: Int) = numCumulatedMessagesPerTopic.getAndAdd(nMessages) def recordMessagesPerTopic(nMessages: Int) = numCumulatedMessagesPerTopic.getAndAdd(nMessages)
def getBytesPerTopic: Long = numCumulatedBytesPerTopic.get
def recordBytesPerTopic(nBytes: Long) = numCumulatedBytesPerTopic.getAndAdd(nBytes)
} }
object ConsumerTopicStat extends Logging { object ConsumerTopicStat extends Logging {
private val stats = new Pool[String, ConsumerTopicStat] private val stats = new Pool[String, ConsumerTopicStat]
private val allTopicStat = new ConsumerTopicStat
Utils.registerMBean(allTopicStat, "kafka:type=kafka.ConsumerAllTopicStat")
def getConsumerAllTopicStat(): ConsumerTopicStat = allTopicStat
def getConsumerTopicStat(topic: String): ConsumerTopicStat = { def getConsumerTopicStat(topic: String): ConsumerTopicStat = {
var stat = stats.get(topic) var stat = stats.get(topic)

View File

@ -61,6 +61,8 @@ private[consumer] class PartitionTopicInfo(val topic: String,
chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset)) chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset))
val newOffset = fetchedOffset.addAndGet(size) val newOffset = fetchedOffset.addAndGet(size)
debug("updated fetch offset of ( %s ) to %d".format(this, newOffset)) debug("updated fetch offset of ( %s ) to %d".format(this, newOffset))
ConsumerTopicStat.getConsumerTopicStat(topic).recordBytesPerTopic(size)
ConsumerTopicStat.getConsumerAllTopicStat().recordBytesPerTopic(size)
} }
size size
} }