diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 157302b4e3e..93d9b38a760 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -1326,6 +1326,8 @@ object ConnectionQuotas { private val ConnectionRateSensorName = "Connection-Accept-Rate" private val ConnectionRateMetricName = "connection-accept-rate" private val IpMetricTag = "ip" + private val ListenerThrottlePrefix = "" + private val IpThrottlePrefix = "ip-" private case class ListenerQuotaEntity(listenerName: String) extends ConnectionQuotaEntity { override def sensorName: String = s"$ConnectionRateSensorName-$listenerName" @@ -1573,7 +1575,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend val throttleTimeMs = math.max(minThrottleTimeMs, listenerThrottleTimeMs) // record throttle time due to hitting connection rate quota if (throttleTimeMs > 0) { - listenerQuota.connectionRateThrottleSensor.record(throttleTimeMs.toDouble, timeMs) + listenerQuota.listenerConnectionRateThrottleSensor.record(throttleTimeMs.toDouble, timeMs) } throttleTimeMs } @@ -1589,19 +1591,23 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend } /** - * To avoid over-recording listener/broker connection rate, we un-record a listener and broker connection - * if the IP gets throttled. + * Record IP throttle time on the corresponding listener. To avoid over-recording listener/broker connection rate, we + * also un-record the listener and broker connection if the IP gets throttled. * * @param listenerName listener to un-record connection + * @param throttleMs IP throttle time to record for listener * @param timeMs current time in milliseconds */ - private def unrecordListenerConnection(listenerName: ListenerName, timeMs: Long): Unit = { + private def updateListenerMetrics(listenerName: ListenerName, throttleMs: Long, timeMs: Long): Unit = { if (!protectedListener(listenerName)) { brokerConnectionRateSensor.record(-1.0, timeMs, false) } maxConnectionsPerListener .get(listenerName) - .foreach(_.connectionRateSensor.record(-1.0, timeMs, false)) + .foreach { listenerQuota => + listenerQuota.ipConnectionRateThrottleSensor.record(throttleMs.toDouble, timeMs) + listenerQuota.connectionRateSensor.record(-1.0, timeMs, false) + } } /** @@ -1625,7 +1631,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend trace(s"Throttling $address for $throttleMs ms") // unrecord the connection since we won't accept the connection sensor.record(-1.0, timeMs, false) - unrecordListenerConnection(listenerName, timeMs) + updateListenerMetrics(listenerName, throttleMs, timeMs) throw new ConnectionThrottledException(address, timeMs, throttleMs) } } @@ -1701,7 +1707,8 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend class ListenerConnectionQuota(lock: Object, listener: ListenerName) extends ListenerReconfigurable with AutoCloseable { @volatile private var _maxConnections = Int.MaxValue private[network] val connectionRateSensor = getOrCreateConnectionRateQuotaSensor(Int.MaxValue, ListenerQuotaEntity(listener.value)) - private[network] val connectionRateThrottleSensor = createConnectionRateThrottleSensor() + private[network] val listenerConnectionRateThrottleSensor = createConnectionRateThrottleSensor(ListenerThrottlePrefix) + private[network] val ipConnectionRateThrottleSensor = createConnectionRateThrottleSensor(IpThrottlePrefix) def maxConnections: Int = _maxConnections @@ -1736,7 +1743,8 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend def close(): Unit = { metrics.removeSensor(connectionRateSensor.name) - metrics.removeSensor(connectionRateThrottleSensor.name) + metrics.removeSensor(listenerConnectionRateThrottleSensor.name) + metrics.removeSensor(ipConnectionRateThrottleSensor.name) } private def maxConnections(configs: util.Map[String, _]): Int = { @@ -1748,13 +1756,13 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend } /** - * Creates sensor for tracking the average throttle time on this listener due to hitting connection rate quota. - * The average is out of all throttle times > 0, which is consistent with the bandwidth and request quota throttle - * time metrics. + * Creates sensor for tracking the average throttle time on this listener due to hitting broker/listener connection + * rate or IP connection rate quota. The average is out of all throttle times > 0, which is consistent with the + * bandwidth and request quota throttle time metrics. */ - private def createConnectionRateThrottleSensor(): Sensor = { - val sensor = metrics.sensor(s"ConnectionRateThrottleTime-${listener.value}") - val metricName = metrics.metricName("connection-accept-throttle-time", + private def createConnectionRateThrottleSensor(throttlePrefix: String): Sensor = { + val sensor = metrics.sensor(s"${throttlePrefix}ConnectionRateThrottleTime-${listener.value}") + val metricName = metrics.metricName(s"${throttlePrefix}connection-accept-throttle-time", MetricsGroup, "Tracking average throttle-time, out of non-zero throttle times, per listener", Map(ListenerMetricTag -> listener.value).asJava) diff --git a/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala b/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala index 257c8752c11..07283db0761 100644 --- a/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala @@ -451,6 +451,8 @@ class ConnectionQuotasTest { // acceptor shouldn't block for IP rate throttling verifyNoBlockedPercentRecordedOnAllListeners() + // no IP throttle time should be recorded on any listeners + listeners.values.map(_.listenerName).foreach(verifyIpThrottleTimeOnListener(_, expectThrottle = false)) } @Test @@ -469,15 +471,21 @@ class ConnectionQuotasTest { val numConnections = 80 acceptConnectionsAndVerifyRate(connectionQuotas, externalListener, numConnections, connCreateIntervalMs, ipConnectionRateLimit, 1, expectIpThrottle = true) + verifyIpThrottleTimeOnListener(externalListener.listenerName, expectThrottle = true) // verify that default quota applies to IPs without a quota override connectionQuotas.updateIpConnectionRateQuota(None, Some(ipConnectionRateLimit)) val adminListener = listeners("ADMIN") + // listener shouldn't have any IP throttle time recorded + verifyIpThrottleTimeOnListener(adminListener.listenerName, expectThrottle = false) acceptConnectionsAndVerifyRate(connectionQuotas, adminListener, numConnections, connCreateIntervalMs, ipConnectionRateLimit, 1, expectIpThrottle = true) + verifyIpThrottleTimeOnListener(adminListener.listenerName, expectThrottle = true) // acceptor shouldn't block for IP rate throttling verifyNoBlockedPercentRecordedOnAllListeners() + // replication listener shouldn't have any IP throttling recorded + verifyIpThrottleTimeOnListener(listeners("REPLICATION").listenerName, expectThrottle = false) } @Test @@ -757,8 +765,10 @@ class ConnectionQuotasTest { 0, metricValue(listenerConnRateMetric(listenerName.value)), eps) assertNotNull(s"Expected connection-accept-throttle-time metric to exist for listener ${listenerName.value}", listenerConnThrottleMetric(listenerName.value)) - assertEquals(s"Connection throttle metric for listener ${listenerName.value}", + assertEquals(s"Listener connection throttle metric for listener ${listenerName.value}", 0, metricValue(listenerConnThrottleMetric(listenerName.value)).toLong) + assertEquals(s"Ip connection throttle metric for listener ${listenerName.value}", + 0, metricValue(ipConnThrottleMetric(listenerName.value)).toLong) } verifyNoBlockedPercentRecordedOnAllListeners() assertEquals("Broker-wide connection acceptance rate metric", 0, metricValue(brokerConnRateMetric()), eps) @@ -780,6 +790,11 @@ class ConnectionQuotasTest { } } + private def verifyIpThrottleTimeOnListener(listener: ListenerName, expectThrottle: Boolean): Unit = { + assertEquals(s"IP connection throttle recorded for listener ${listener.value}", expectThrottle, + metricValue(ipConnThrottleMetric(listener.value)).toLong > 0) + } + private def verifyOnlyNonInterBrokerListenersBlockedPercentRecorded(): Unit = { blockedPercentMeters.foreach { case (name, meter) => name match { @@ -806,6 +821,14 @@ class ConnectionQuotasTest { metrics.metric(metricName) } + private def ipConnThrottleMetric(listener: String): KafkaMetric = { + val metricName = metrics.metricName( + "ip-connection-accept-throttle-time", + SocketServer.MetricsGroup, + Collections.singletonMap(Processor.ListenerMetricTag, listener)) + metrics.metric(metricName) + } + private def listenerConnRateMetric(listener: String) : KafkaMetric = { val metricName = metrics.metricName( "connection-accept-rate",