KAFKA-6514; Add API version as a tag for the RequestsPerSec metric (#4506)

Updated `RequestChannel` to include `version` as a tag for all RequestsPerSec metrics (KIP-272). Updated tests to verify that the extra tag exists.
This commit is contained in:
Allen Wang 2018-04-16 10:16:26 -07:00 committed by Jason Gustafson
parent f3ed56b21f
commit 19418fc86a
3 changed files with 21 additions and 6 deletions

View File

@ -158,7 +158,7 @@ object RequestChannel extends Logging {
val metricNames = fetchMetricNames :+ header.apiKey.name
metricNames.foreach { metricName =>
val m = metrics(metricName)
m.requestRate.mark()
m.requestRate(header.apiVersion).mark()
m.requestQueueTimeHist.update(Math.round(requestQueueTimeMs))
m.localTimeHist.update(Math.round(apiLocalTimeMs))
m.remoteTimeHist.update(Math.round(apiRemoteTimeMs))
@ -350,10 +350,11 @@ object RequestMetrics {
}
class RequestMetrics(name: String) extends KafkaMetricsGroup {
import RequestMetrics._
val tags = Map("request" -> name)
val requestRate = newMeter(RequestsPerSec, "requests", TimeUnit.SECONDS, tags)
val requestRateInternal = new mutable.HashMap[Short, Meter]
// time a request spent in a request queue
val requestQueueTimeHist = newHistogram(RequestQueueTimeMs, biased = true, tags)
// time a request takes to be processed at the local broker
@ -386,6 +387,10 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup {
private val errorMeters = mutable.Map[Errors, ErrorMeter]()
Errors.values.foreach(error => errorMeters.put(error, new ErrorMeter(name, error)))
def requestRate(version: Short): Meter = {
requestRateInternal.getOrElseUpdate(version, newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS, tags + ("version" -> version.toString)))
}
class ErrorMeter(name: String, error: Errors) {
private val tags = Map("request" -> name, "error" -> error.name)
@ -418,7 +423,7 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup {
}
def removeMetrics(): Unit = {
removeMetric(RequestsPerSec, tags)
for (version <- requestRateInternal.keySet) removeMetric(RequestsPerSec, tags + ("version" -> version.toString))
removeMetric(RequestQueueTimeMs, tags)
removeMetric(LocalTimeMs, tags)
removeMetric(RemoteTimeMs, tags)

View File

@ -688,10 +688,14 @@ class SocketServerTest extends JUnitSuite {
@Test
def testRequestMetricsAfterStop(): Unit = {
server.stopProcessingRequests()
server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate.mark()
val version = ApiKeys.PRODUCE.latestVersion
val version2 = (version - 1).toShort
for (_ <- 0 to 1) server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).mark()
server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version2).mark()
assertEquals(2, server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).count())
server.requestChannel.updateErrorMetrics(ApiKeys.PRODUCE, Map(Errors.NONE -> 1))
val nonZeroMeters = Map("kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce" -> 1,
val nonZeroMeters = Map(s"kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce,version=$version" -> 2,
s"kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce,version=$version2" -> 1,
"kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=Produce,error=NONE" -> 1)
def requestMetricMeters = YammerMetrics

View File

@ -68,6 +68,12 @@
<ul>
<li><a href="https://cwiki.apache.org/confluence/x/oYtjB">KIP-186</a> increases the default offset retention time from 1 day to 7 days. This makes it less likely to "lose" offsets in an application that commits infrequently. It also increases the active set of offsets and therefore can increase memory usage on the broker. Note that the console consumer currently enables offset commit by default and can be the source of a large number of offsets which this change will now preserve for 7 days instead of 1. You can preserve the existing behavior by setting the broker config <code>offsets.retention.minutes</code> to 1440.</li>
<li><a href="https://issues.apache.org/jira/browse/KAFKA-5674">KAFKA-5674</a> extends the lower interval of <code>max.connections.per.ip minimum</code> to zero and therefore allows IP-based filtering of inbound connections.</li>
<li><a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-272%3A+Add+API+version+tag+to+broker%27s+RequestsPerSec+metric">KIP-272</a>
added API version tag to the metric <code>kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower|...}</code>.
This metric now becomes <code>kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce|FetchConsumer|FetchFollower|...},version={0|1|2|3|...}</code>. This will impact
JMX monitoring tools that do not automatically aggregate. To get the total count for a specific request type, the tool needs to be
updated to aggregate across different versions.
</li>
<li> New Kafka Streams configuration parameter <code>upgrade.from</code> added that allows rolling bounce upgrade from older version. </li>
</ul>