mirror of https://github.com/apache/kafka.git
KAFKA-10748: Add IP connection rate throttling metric (KIP-612) (#9685)
This PR adds the IP throttling metric as described in KIP-612. Reviewers: Anna Povzner <anna@confluent.io>, David Jacot <djacot@confluent.io>
This commit is contained in:
parent
404062d2b6
commit
b44d32dffe
|
|
@ -1326,6 +1326,8 @@ object ConnectionQuotas {
|
|||
private val ConnectionRateSensorName = "Connection-Accept-Rate"
|
||||
private val ConnectionRateMetricName = "connection-accept-rate"
|
||||
private val IpMetricTag = "ip"
|
||||
private val ListenerThrottlePrefix = ""
|
||||
private val IpThrottlePrefix = "ip-"
|
||||
|
||||
private case class ListenerQuotaEntity(listenerName: String) extends ConnectionQuotaEntity {
|
||||
override def sensorName: String = s"$ConnectionRateSensorName-$listenerName"
|
||||
|
|
@ -1573,7 +1575,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
|
|||
val throttleTimeMs = math.max(minThrottleTimeMs, listenerThrottleTimeMs)
|
||||
// record throttle time due to hitting connection rate quota
|
||||
if (throttleTimeMs > 0) {
|
||||
listenerQuota.connectionRateThrottleSensor.record(throttleTimeMs.toDouble, timeMs)
|
||||
listenerQuota.listenerConnectionRateThrottleSensor.record(throttleTimeMs.toDouble, timeMs)
|
||||
}
|
||||
throttleTimeMs
|
||||
}
|
||||
|
|
@ -1589,19 +1591,23 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
|
|||
}
|
||||
|
||||
/**
|
||||
* To avoid over-recording listener/broker connection rate, we un-record a listener and broker connection
|
||||
* if the IP gets throttled.
|
||||
* Record IP throttle time on the corresponding listener. To avoid over-recording listener/broker connection rate, we
|
||||
* also un-record the listener and broker connection if the IP gets throttled.
|
||||
*
|
||||
* @param listenerName listener to un-record connection
|
||||
* @param throttleMs IP throttle time to record for listener
|
||||
* @param timeMs current time in milliseconds
|
||||
*/
|
||||
private def unrecordListenerConnection(listenerName: ListenerName, timeMs: Long): Unit = {
|
||||
private def updateListenerMetrics(listenerName: ListenerName, throttleMs: Long, timeMs: Long): Unit = {
|
||||
if (!protectedListener(listenerName)) {
|
||||
brokerConnectionRateSensor.record(-1.0, timeMs, false)
|
||||
}
|
||||
maxConnectionsPerListener
|
||||
.get(listenerName)
|
||||
.foreach(_.connectionRateSensor.record(-1.0, timeMs, false))
|
||||
.foreach { listenerQuota =>
|
||||
listenerQuota.ipConnectionRateThrottleSensor.record(throttleMs.toDouble, timeMs)
|
||||
listenerQuota.connectionRateSensor.record(-1.0, timeMs, false)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -1625,7 +1631,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
|
|||
trace(s"Throttling $address for $throttleMs ms")
|
||||
// unrecord the connection since we won't accept the connection
|
||||
sensor.record(-1.0, timeMs, false)
|
||||
unrecordListenerConnection(listenerName, timeMs)
|
||||
updateListenerMetrics(listenerName, throttleMs, timeMs)
|
||||
throw new ConnectionThrottledException(address, timeMs, throttleMs)
|
||||
}
|
||||
}
|
||||
|
|
@ -1701,7 +1707,8 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
|
|||
class ListenerConnectionQuota(lock: Object, listener: ListenerName) extends ListenerReconfigurable with AutoCloseable {
|
||||
@volatile private var _maxConnections = Int.MaxValue
|
||||
private[network] val connectionRateSensor = getOrCreateConnectionRateQuotaSensor(Int.MaxValue, ListenerQuotaEntity(listener.value))
|
||||
private[network] val connectionRateThrottleSensor = createConnectionRateThrottleSensor()
|
||||
private[network] val listenerConnectionRateThrottleSensor = createConnectionRateThrottleSensor(ListenerThrottlePrefix)
|
||||
private[network] val ipConnectionRateThrottleSensor = createConnectionRateThrottleSensor(IpThrottlePrefix)
|
||||
|
||||
def maxConnections: Int = _maxConnections
|
||||
|
||||
|
|
@ -1736,7 +1743,8 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
|
|||
|
||||
def close(): Unit = {
|
||||
metrics.removeSensor(connectionRateSensor.name)
|
||||
metrics.removeSensor(connectionRateThrottleSensor.name)
|
||||
metrics.removeSensor(listenerConnectionRateThrottleSensor.name)
|
||||
metrics.removeSensor(ipConnectionRateThrottleSensor.name)
|
||||
}
|
||||
|
||||
private def maxConnections(configs: util.Map[String, _]): Int = {
|
||||
|
|
@ -1748,13 +1756,13 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
|
|||
}
|
||||
|
||||
/**
|
||||
* Creates sensor for tracking the average throttle time on this listener due to hitting connection rate quota.
|
||||
* The average is out of all throttle times > 0, which is consistent with the bandwidth and request quota throttle
|
||||
* time metrics.
|
||||
* Creates sensor for tracking the average throttle time on this listener due to hitting broker/listener connection
|
||||
* rate or IP connection rate quota. The average is out of all throttle times > 0, which is consistent with the
|
||||
* bandwidth and request quota throttle time metrics.
|
||||
*/
|
||||
private def createConnectionRateThrottleSensor(): Sensor = {
|
||||
val sensor = metrics.sensor(s"ConnectionRateThrottleTime-${listener.value}")
|
||||
val metricName = metrics.metricName("connection-accept-throttle-time",
|
||||
private def createConnectionRateThrottleSensor(throttlePrefix: String): Sensor = {
|
||||
val sensor = metrics.sensor(s"${throttlePrefix}ConnectionRateThrottleTime-${listener.value}")
|
||||
val metricName = metrics.metricName(s"${throttlePrefix}connection-accept-throttle-time",
|
||||
MetricsGroup,
|
||||
"Tracking average throttle-time, out of non-zero throttle times, per listener",
|
||||
Map(ListenerMetricTag -> listener.value).asJava)
|
||||
|
|
|
|||
|
|
@ -451,6 +451,8 @@ class ConnectionQuotasTest {
|
|||
|
||||
// acceptor shouldn't block for IP rate throttling
|
||||
verifyNoBlockedPercentRecordedOnAllListeners()
|
||||
// no IP throttle time should be recorded on any listeners
|
||||
listeners.values.map(_.listenerName).foreach(verifyIpThrottleTimeOnListener(_, expectThrottle = false))
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -469,15 +471,21 @@ class ConnectionQuotasTest {
|
|||
val numConnections = 80
|
||||
acceptConnectionsAndVerifyRate(connectionQuotas, externalListener, numConnections, connCreateIntervalMs, ipConnectionRateLimit,
|
||||
1, expectIpThrottle = true)
|
||||
verifyIpThrottleTimeOnListener(externalListener.listenerName, expectThrottle = true)
|
||||
|
||||
// verify that default quota applies to IPs without a quota override
|
||||
connectionQuotas.updateIpConnectionRateQuota(None, Some(ipConnectionRateLimit))
|
||||
val adminListener = listeners("ADMIN")
|
||||
// listener shouldn't have any IP throttle time recorded
|
||||
verifyIpThrottleTimeOnListener(adminListener.listenerName, expectThrottle = false)
|
||||
acceptConnectionsAndVerifyRate(connectionQuotas, adminListener, numConnections, connCreateIntervalMs, ipConnectionRateLimit,
|
||||
1, expectIpThrottle = true)
|
||||
verifyIpThrottleTimeOnListener(adminListener.listenerName, expectThrottle = true)
|
||||
|
||||
// acceptor shouldn't block for IP rate throttling
|
||||
verifyNoBlockedPercentRecordedOnAllListeners()
|
||||
// replication listener shouldn't have any IP throttling recorded
|
||||
verifyIpThrottleTimeOnListener(listeners("REPLICATION").listenerName, expectThrottle = false)
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -757,8 +765,10 @@ class ConnectionQuotasTest {
|
|||
0, metricValue(listenerConnRateMetric(listenerName.value)), eps)
|
||||
assertNotNull(s"Expected connection-accept-throttle-time metric to exist for listener ${listenerName.value}",
|
||||
listenerConnThrottleMetric(listenerName.value))
|
||||
assertEquals(s"Connection throttle metric for listener ${listenerName.value}",
|
||||
assertEquals(s"Listener connection throttle metric for listener ${listenerName.value}",
|
||||
0, metricValue(listenerConnThrottleMetric(listenerName.value)).toLong)
|
||||
assertEquals(s"Ip connection throttle metric for listener ${listenerName.value}",
|
||||
0, metricValue(ipConnThrottleMetric(listenerName.value)).toLong)
|
||||
}
|
||||
verifyNoBlockedPercentRecordedOnAllListeners()
|
||||
assertEquals("Broker-wide connection acceptance rate metric", 0, metricValue(brokerConnRateMetric()), eps)
|
||||
|
|
@ -780,6 +790,11 @@ class ConnectionQuotasTest {
|
|||
}
|
||||
}
|
||||
|
||||
private def verifyIpThrottleTimeOnListener(listener: ListenerName, expectThrottle: Boolean): Unit = {
|
||||
assertEquals(s"IP connection throttle recorded for listener ${listener.value}", expectThrottle,
|
||||
metricValue(ipConnThrottleMetric(listener.value)).toLong > 0)
|
||||
}
|
||||
|
||||
private def verifyOnlyNonInterBrokerListenersBlockedPercentRecorded(): Unit = {
|
||||
blockedPercentMeters.foreach { case (name, meter) =>
|
||||
name match {
|
||||
|
|
@ -806,6 +821,14 @@ class ConnectionQuotasTest {
|
|||
metrics.metric(metricName)
|
||||
}
|
||||
|
||||
private def ipConnThrottleMetric(listener: String): KafkaMetric = {
|
||||
val metricName = metrics.metricName(
|
||||
"ip-connection-accept-throttle-time",
|
||||
SocketServer.MetricsGroup,
|
||||
Collections.singletonMap(Processor.ListenerMetricTag, listener))
|
||||
metrics.metric(metricName)
|
||||
}
|
||||
|
||||
private def listenerConnRateMetric(listener: String) : KafkaMetric = {
|
||||
val metricName = metrics.metricName(
|
||||
"connection-accept-rate",
|
||||
|
|
|
|||
Loading…
Reference in New Issue