MINOR: Extract SockerServer inner classes to server module (#16632)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Mickael Maison 2024-07-23 14:19:41 +02:00 committed by GitHub
parent a012af5fb4
commit c71eb60a3b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 178 additions and 63 deletions

View File

@ -26,7 +26,6 @@ import java.util.Optional
import java.util.concurrent._
import java.util.concurrent.atomic._
import kafka.cluster.{BrokerEndPoint, EndPoint}
import kafka.network.ConnectionQuotas._
import kafka.network.Processor._
import kafka.network.RequestChannel.{CloseConnectionResponse, EndThrottlingResponse, NoOpResponse, SendResponse, StartThrottlingResponse}
import kafka.network.SocketServer._
@ -45,7 +44,7 @@ import org.apache.kafka.common.requests.{ApiVersionsRequest, RequestContext, Req
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time, Utils}
import org.apache.kafka.common.{Endpoint, KafkaException, MetricName, Reconfigurable}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.network.{ConnectionQuotaEntity, ConnectionThrottledException, SocketServerConfigs, TooManyConnectionsException}
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.config.QuotaConfigs
import org.apache.kafka.server.metrics.KafkaMetricsGroup
@ -1387,47 +1386,6 @@ private[kafka] class Processor(
}
}
/**
* Interface for connection quota configuration. Connection quotas can be configured at the
* broker, listener or IP level.
*/
sealed trait ConnectionQuotaEntity {
def sensorName: String
def metricName: String
def sensorExpiration: Long
def metricTags: Map[String, String]
}
object ConnectionQuotas {
private val InactiveSensorExpirationTimeSeconds = TimeUnit.HOURS.toSeconds(1)
private val ConnectionRateSensorName = "Connection-Accept-Rate"
private val ConnectionRateMetricName = "connection-accept-rate"
private val IpMetricTag = "ip"
private val ListenerThrottlePrefix = ""
private val IpThrottlePrefix = "ip-"
private case class ListenerQuotaEntity(listenerName: String) extends ConnectionQuotaEntity {
override def sensorName: String = s"$ConnectionRateSensorName-$listenerName"
override def sensorExpiration: Long = Long.MaxValue
override def metricName: String = ConnectionRateMetricName
override def metricTags: Map[String, String] = Map(ListenerMetricTag -> listenerName)
}
private case object BrokerQuotaEntity extends ConnectionQuotaEntity {
override def sensorName: String = ConnectionRateSensorName
override def sensorExpiration: Long = Long.MaxValue
override def metricName: String = s"broker-$ConnectionRateMetricName"
override def metricTags: Map[String, String] = Map.empty
}
private case class IpQuotaEntity(ip: InetAddress) extends ConnectionQuotaEntity {
override def sensorName: String = s"$ConnectionRateSensorName-${ip.getHostAddress}"
override def sensorExpiration: Long = InactiveSensorExpirationTimeSeconds
override def metricName: String = ConnectionRateMetricName
override def metricTags: Map[String, String] = Map(IpMetricTag -> ip.getHostAddress)
}
}
class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extends Logging with AutoCloseable {
@volatile private var defaultMaxConnectionsPerIp: Int = config.maxConnectionsPerIp
@ -1444,7 +1402,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
@volatile private var defaultConnectionRatePerIp = QuotaConfigs.IP_CONNECTION_RATE_DEFAULT.intValue()
private val connectionRatePerIp = new ConcurrentHashMap[InetAddress, Int]()
// sensor that tracks broker-wide connection creation rate and limit (quota)
private val brokerConnectionRateSensor = getOrCreateConnectionRateQuotaSensor(config.maxConnectionCreationRate, BrokerQuotaEntity)
private val brokerConnectionRateSensor = getOrCreateConnectionRateQuotaSensor(config.maxConnectionCreationRate, ConnectionQuotaEntity.brokerQuotaEntity())
private val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds.toLong)
def inc(listenerName: ListenerName, address: InetAddress, acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = {
@ -1482,7 +1440,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = {
// if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if
// the rate limit increases, because it is just one connection per listener and the code is simpler that way
updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity)
updateConnectionRateQuota(maxConnectionRate, ConnectionQuotaEntity.brokerQuotaEntity())
}
/**
@ -1495,9 +1453,9 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
*/
def updateIpConnectionRateQuota(ip: Option[InetAddress], maxConnectionRate: Option[Int]): Unit = synchronized {
def isIpConnectionRateMetric(metricName: MetricName) = {
metricName.name == ConnectionRateMetricName &&
metricName.name == ConnectionQuotaEntity.CONNECTION_RATE_METRIC_NAME &&
metricName.group == MetricsGroup &&
metricName.tags.containsKey(IpMetricTag)
metricName.tags.containsKey(ConnectionQuotaEntity.IP_METRIC_TAG)
}
def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = {
@ -1517,7 +1475,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
connectionRatePerIp.remove(address)
}
}
updateConnectionRateQuota(connectionRateForIp(address), IpQuotaEntity(address))
updateConnectionRateQuota(connectionRateForIp(address), ConnectionQuotaEntity.ipQuotaEntity(address))
case None =>
// synchronize on counts to ensure reading an IP connection rate quota and creating a quota config is atomic
counts.synchronized {
@ -1526,7 +1484,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
info(s"Updated default max IP connection rate to $defaultConnectionRatePerIp")
metrics.metrics.forEach { (metricName, metric) =>
if (isIpConnectionRateMetric(metricName)) {
val quota = connectionRateForIp(InetAddress.getByName(metricName.tags.get(IpMetricTag)))
val quota = connectionRateForIp(InetAddress.getByName(metricName.tags.get(ConnectionQuotaEntity.IP_METRIC_TAG)))
if (shouldUpdateQuota(metric, quota)) {
debug(s"Updating existing connection rate quota config for ${metricName.tags} to $quota")
metric.config(rateQuotaMetricConfig(quota))
@ -1701,7 +1659,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
val connectionRateQuota = connectionRateForIp(address)
val quotaEnabled = connectionRateQuota != QuotaConfigs.IP_CONNECTION_RATE_DEFAULT
if (quotaEnabled) {
val sensor = getOrCreateConnectionRateQuotaSensor(connectionRateQuota, IpQuotaEntity(address))
val sensor = getOrCreateConnectionRateQuotaSensor(connectionRateQuota, ConnectionQuotaEntity.ipQuotaEntity(address))
val timeMs = time.milliseconds
val throttleMs = recordAndGetThrottleTimeMs(sensor, timeMs)
if (throttleMs > 0) {
@ -1766,7 +1724,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
connectionQuotaEntity.metricName,
MetricsGroup,
s"Tracking rate of accepting new connections (per second)",
connectionQuotaEntity.metricTags.asJava)
connectionQuotaEntity.metricTags)
}
private def rateQuotaMetricConfig(quotaLimit: Int): MetricConfig = {
@ -1783,9 +1741,9 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
class ListenerConnectionQuota(lock: Object, listener: ListenerName) extends ListenerReconfigurable with AutoCloseable {
@volatile private var _maxConnections = Int.MaxValue
private[network] val connectionRateSensor = getOrCreateConnectionRateQuotaSensor(Int.MaxValue, ListenerQuotaEntity(listener.value))
private[network] val listenerConnectionRateThrottleSensor = createConnectionRateThrottleSensor(ListenerThrottlePrefix)
private[network] val ipConnectionRateThrottleSensor = createConnectionRateThrottleSensor(IpThrottlePrefix)
private[network] val connectionRateSensor = getOrCreateConnectionRateQuotaSensor(Int.MaxValue, ConnectionQuotaEntity.listenerQuotaEntity(listener.value))
private[network] val listenerConnectionRateThrottleSensor = createConnectionRateThrottleSensor(ConnectionQuotaEntity.LISTENER_THROTTLE_PREFIX)
private[network] val ipConnectionRateThrottleSensor = createConnectionRateThrottleSensor(ConnectionQuotaEntity.IP_THROTTLE_PREFIX)
def maxConnections: Int = _maxConnections
@ -1793,7 +1751,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
override def configure(configs: util.Map[String, _]): Unit = {
_maxConnections = maxConnections(configs)
updateConnectionRateQuota(maxConnectionCreationRate(configs), ListenerQuotaEntity(listener.value))
updateConnectionRateQuota(maxConnectionCreationRate(configs), ConnectionQuotaEntity.listenerQuotaEntity(listener.value))
}
override def reconfigurableConfigs(): util.Set[String] = {
@ -1813,7 +1771,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
override def reconfigure(configs: util.Map[String, _]): Unit = {
lock.synchronized {
_maxConnections = maxConnections(configs)
updateConnectionRateQuota(maxConnectionCreationRate(configs), ListenerQuotaEntity(listener.value))
updateConnectionRateQuota(maxConnectionCreationRate(configs), ConnectionQuotaEntity.listenerQuotaEntity(listener.value))
lock.notifyAll()
}
}
@ -1860,8 +1818,3 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
}
}
class TooManyConnectionsException(val ip: InetAddress, val count: Int) extends KafkaException(s"Too many connections from $ip (maximum = $count)")
class ConnectionThrottledException(val ip: InetAddress, val startThrottleTimeMs: Long, val throttleTimeMs: Long)
extends KafkaException(s"$ip throttled for $throttleTimeMs")

View File

@ -31,8 +31,8 @@ import org.apache.kafka.common.metrics.internals.MetricsUtils
import org.apache.kafka.common.metrics.{KafkaMetric, MetricConfig, Metrics}
import org.apache.kafka.common.network._
import org.apache.kafka.common.utils.Time
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.server.config.{ReplicationConfigs, QuotaConfigs}
import org.apache.kafka.network.{ConnectionThrottledException, SocketServerConfigs, TooManyConnectionsException}
import org.apache.kafka.server.config.{QuotaConfigs, ReplicationConfigs}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.MockTime
import org.junit.jupiter.api.Assertions._

View File

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.network;
import java.net.InetAddress;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Class for connection quota configuration. Connection quotas can be configured at the
* broker, listener or IP level.
*/
public class ConnectionQuotaEntity {
public static final String CONNECTION_RATE_SENSOR_NAME = "Connection-Accept-Rate";
public static final String CONNECTION_RATE_METRIC_NAME = "connection-accept-rate";
public static final String LISTENER_THROTTLE_PREFIX = "";
public static final String IP_METRIC_TAG = "ip";
public static final String IP_THROTTLE_PREFIX = "ip-";
public static ConnectionQuotaEntity listenerQuotaEntity(String listenerName) {
return new ConnectionQuotaEntity(CONNECTION_RATE_SENSOR_NAME + "-" + listenerName,
CONNECTION_RATE_METRIC_NAME,
Long.MAX_VALUE,
Collections.singletonMap("listener", listenerName));
}
public static ConnectionQuotaEntity brokerQuotaEntity() {
return new ConnectionQuotaEntity(CONNECTION_RATE_SENSOR_NAME,
"broker-" + ConnectionQuotaEntity.CONNECTION_RATE_METRIC_NAME,
Long.MAX_VALUE,
Collections.emptyMap());
}
public static ConnectionQuotaEntity ipQuotaEntity(InetAddress ip) {
return new ConnectionQuotaEntity(CONNECTION_RATE_SENSOR_NAME + "-" + ip.getHostAddress(),
CONNECTION_RATE_METRIC_NAME,
TimeUnit.HOURS.toSeconds(1),
Collections.singletonMap(IP_METRIC_TAG, ip.getHostAddress()));
}
private final String sensorName;
private final String metricName;
private final long sensorExpiration;
private final Map<String, String> metricTags;
private ConnectionQuotaEntity(String sensorName, String metricName, long sensorExpiration, Map<String, String> metricTags) {
this.sensorName = sensorName;
this.metricName = metricName;
this.sensorExpiration = sensorExpiration;
this.metricTags = metricTags;
}
/**
* The name of the sensor for this quota entity
*/
public String sensorName() {
return sensorName;
}
/**
* The name of the metric for this quota entity
*/
public String metricName() {
return metricName;
}
/**
* The duration in second to keep the sensor even if no new values are recorded
*/
public long sensorExpiration() {
return sensorExpiration;
}
/**
* Tags associated with this quota entity
*/
public Map<String, String> metricTags() {
return metricTags;
}
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.network;
import org.apache.kafka.common.KafkaException;
import java.net.InetAddress;
public class ConnectionThrottledException extends KafkaException {
public final long startThrottleTimeMs;
public final long throttleTimeMs;
public ConnectionThrottledException(InetAddress ip, long startThrottleTimeMs, long throttleTimeMs) {
super(ip + " throttled for " + throttleTimeMs);
this.startThrottleTimeMs = startThrottleTimeMs;
this.throttleTimeMs = throttleTimeMs;
}
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.network;
import org.apache.kafka.common.KafkaException;
import java.net.InetAddress;
public class TooManyConnectionsException extends KafkaException {
public final InetAddress ip;
public final int count;
public TooManyConnectionsException(InetAddress ip, int count) {
super("Too many connections from " + ip + " (maximum = " + count + ")");
this.ip = ip;
this.count = count;
}
}