mirror of https://github.com/apache/kafka.git
Expose total metrics through MBeansp; patched by Pierre-Yves Ritschard; reviewed by Jun Rao; KAFKA-140
git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1177798 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
da35513245
commit
efc7a8a0cd
|
@ -257,6 +257,7 @@ class FileMessageSet private[kafka](private[message] val channel: FileChannel,
|
|||
trait LogFlushStatsMBean {
|
||||
def getFlushesPerSecond: Double
|
||||
def getAvgFlushMs: Double
|
||||
def getTotalFlushMs: Long
|
||||
def getMaxFlushMs: Double
|
||||
def getNumFlushes: Long
|
||||
}
|
||||
|
@ -271,6 +272,8 @@ class LogFlushStats extends LogFlushStatsMBean {
|
|||
|
||||
def getAvgFlushMs: Double = flushRequestStats.getAvgMetric
|
||||
|
||||
def getTotalFlushMs: Long = flushRequestStats.getTotalMetric
|
||||
|
||||
def getMaxFlushMs: Double = flushRequestStats.getMaxMetric
|
||||
|
||||
def getNumFlushes: Long = flushRequestStats.getNumRequests
|
||||
|
|
|
@ -33,6 +33,10 @@ trait SocketServerStatsMBean {
|
|||
def getBytesWrittenPerSecond: Double
|
||||
def getNumFetchRequests: Long
|
||||
def getNumProduceRequests: Long
|
||||
def getTotalBytesRead: Long
|
||||
def getTotalBytesWritten: Long
|
||||
def getTotalFetchRequestMs: Long
|
||||
def getTotalProduceRequestMs: Long
|
||||
}
|
||||
|
||||
@threadsafe
|
||||
|
@ -77,4 +81,12 @@ class SocketServerStats(val monitorDurationNs: Long, val time: Time) extends Soc
|
|||
def getNumFetchRequests: Long = fetchTimeStats.getNumRequests
|
||||
|
||||
def getNumProduceRequests: Long = produceTimeStats.getNumRequests
|
||||
|
||||
def getTotalBytesRead: Long = produceBytesStats.getTotalMetric
|
||||
|
||||
def getTotalBytesWritten: Long = fetchBytesStats.getTotalMetric
|
||||
|
||||
def getTotalFetchRequestMs: Long = fetchTimeStats.getTotalMetric
|
||||
|
||||
def getTotalProduceRequestMs: Long = produceTimeStats.getTotalMetric
|
||||
}
|
||||
|
|
|
@ -600,11 +600,13 @@ class SnapshotStats(private val monitorDurationNs: Long = 600L * 1000L * 1000L *
|
|||
|
||||
private val complete = new AtomicReference(new Stats())
|
||||
private val current = new AtomicReference(new Stats())
|
||||
private val total = new AtomicLong(0)
|
||||
private val numCumulatedRequests = new AtomicLong(0)
|
||||
|
||||
def recordRequestMetric(requestNs: Long) {
|
||||
val stats = current.get
|
||||
stats.add(requestNs)
|
||||
total.getAndAdd(requestNs)
|
||||
numCumulatedRequests.getAndAdd(1)
|
||||
val ageNs = time.nanoseconds - stats.start
|
||||
// if the current stats are too old it is time to swap
|
||||
|
@ -653,6 +655,8 @@ class SnapshotStats(private val monitorDurationNs: Long = 600L * 1000L * 1000L *
|
|||
}
|
||||
}
|
||||
|
||||
def getTotalMetric: Long = total.get
|
||||
|
||||
def getMaxMetric: Double = complete.get.maxRequestMetric
|
||||
|
||||
class Stats {
|
||||
|
|
Loading…
Reference in New Issue