diff --git a/core/src/main/scala/kafka/message/FileMessageSet.scala b/core/src/main/scala/kafka/message/FileMessageSet.scala index 69b1dfa8b8a..2025734582b 100644 --- a/core/src/main/scala/kafka/message/FileMessageSet.scala +++ b/core/src/main/scala/kafka/message/FileMessageSet.scala @@ -257,6 +257,7 @@ class FileMessageSet private[kafka](private[message] val channel: FileChannel, trait LogFlushStatsMBean { def getFlushesPerSecond: Double def getAvgFlushMs: Double + def getTotalFlushMs: Long def getMaxFlushMs: Double def getNumFlushes: Long } @@ -271,6 +272,8 @@ class LogFlushStats extends LogFlushStatsMBean { def getAvgFlushMs: Double = flushRequestStats.getAvgMetric + def getTotalFlushMs: Long = flushRequestStats.getTotalMetric + def getMaxFlushMs: Double = flushRequestStats.getMaxMetric def getNumFlushes: Long = flushRequestStats.getNumRequests diff --git a/core/src/main/scala/kafka/network/SocketServerStats.scala b/core/src/main/scala/kafka/network/SocketServerStats.scala index caa440d4515..0d6778e9cf0 100644 --- a/core/src/main/scala/kafka/network/SocketServerStats.scala +++ b/core/src/main/scala/kafka/network/SocketServerStats.scala @@ -33,6 +33,10 @@ trait SocketServerStatsMBean { def getBytesWrittenPerSecond: Double def getNumFetchRequests: Long def getNumProduceRequests: Long + def getTotalBytesRead: Long + def getTotalBytesWritten: Long + def getTotalFetchRequestMs: Long + def getTotalProduceRequestMs: Long } @threadsafe @@ -77,4 +81,12 @@ class SocketServerStats(val monitorDurationNs: Long, val time: Time) extends Soc def getNumFetchRequests: Long = fetchTimeStats.getNumRequests def getNumProduceRequests: Long = produceTimeStats.getNumRequests + + def getTotalBytesRead: Long = produceBytesStats.getTotalMetric + + def getTotalBytesWritten: Long = fetchBytesStats.getTotalMetric + + def getTotalFetchRequestMs: Long = fetchTimeStats.getTotalMetric + + def getTotalProduceRequestMs: Long = produceTimeStats.getTotalMetric } diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index 0baf52449bb..3f8dce4487e 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -600,11 +600,13 @@ class SnapshotStats(private val monitorDurationNs: Long = 600L * 1000L * 1000L * private val complete = new AtomicReference(new Stats()) private val current = new AtomicReference(new Stats()) + private val total = new AtomicLong(0) private val numCumulatedRequests = new AtomicLong(0) def recordRequestMetric(requestNs: Long) { val stats = current.get stats.add(requestNs) + total.getAndAdd(requestNs) numCumulatedRequests.getAndAdd(1) val ageNs = time.nanoseconds - stats.start // if the current stats are too old it is time to swap @@ -653,6 +655,8 @@ class SnapshotStats(private val monitorDurationNs: Long = 600L * 1000L * 1000L * } } + def getTotalMetric: Long = total.get + def getMaxMetric: Double = complete.get.maxRequestMetric class Stats {