KAFKA-6751; Support dynamic configuration of max.connections.per.ip/max.connections.per.ip.overrides configs (KIP-308) (#5334)

KIP-308 implementation. See https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=85474993.

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Jason Gustafson <jason@confluent.io>
Author: Manikumar Reddy O (2018-08-10 03:10:24 +05:30), committed by Jason Gustafson
parent b1539ff62d
commit 92004fa21a
9 changed files with 242 additions and 7 deletions

Utils.java (org.apache.kafka.common.utils)

@@ -68,6 +68,8 @@ public final class Utils {
    // IPv6 is supported with [ip] pattern
    private static final Pattern HOST_PORT_PATTERN = Pattern.compile(".*?\\[?([0-9a-zA-Z\\-%._:]*)\\]?:([0-9]+)");
+   private static final Pattern VALID_HOST_CHARACTERS = Pattern.compile("([0-9a-zA-Z\\-%._:]*)");
+
    // Prints up to 2 decimal digits. Used for human readable printing
    private static final DecimalFormat TWO_DIGIT_FORMAT = new DecimalFormat("0.##");
@@ -435,6 +437,15 @@ public final class Utils {
        return matcher.matches() ? Integer.parseInt(matcher.group(2)) : null;
    }

+   /**
+    * Basic validation of the supplied address. Checks for valid characters.
+    * @param address hostname string to validate
+    * @return true if address contains valid characters
+    */
+   public static boolean validHostPattern(String address) {
+       return VALID_HOST_CHARACTERS.matcher(address).matches();
+   }
+
    /**
     * Formats hostname and port number as a "host:port" address string,
     * surrounding IPv6 addresses with braces '[', ']'
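
A quick, hedged spot-check of what the new validator accepts; the inputs below are arbitrary examples, not part of the commit:

    import org.apache.kafka.common.utils.Utils

    Utils.validHostPattern("broker-1.example.com")  // true: letters, digits, '-' and '.' are in the character class
    Utils.validHostPattern("fe80::1%eth0")          // true: ':' and '%' admit IPv6 literals with zone IDs
    Utils.validHostPattern("bad#host")              // false: '#' is outside [0-9a-zA-Z\-%._:]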

UtilsTest.java (org.apache.kafka.common.utils)

@@ -39,6 +39,7 @@ import static org.apache.kafka.common.utils.Utils.formatAddress;
import static org.apache.kafka.common.utils.Utils.formatBytes;
import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;
+import static org.apache.kafka.common.utils.Utils.validHostPattern;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -59,6 +60,16 @@ public class UtilsTest {
        assertEquals("fe80::b1da:69ca:57f7:63d8%3", getHost("PLAINTEXT://[fe80::b1da:69ca:57f7:63d8%3]:5678"));
    }

+   @Test
+   public void testHostPattern() {
+       assertTrue(validHostPattern("127.0.0.1"));
+       assertTrue(validHostPattern("mydomain.com"));
+       assertTrue(validHostPattern("MyDomain.com"));
+       assertTrue(validHostPattern("My_Domain.com"));
+       assertTrue(validHostPattern("::1"));
+       assertTrue(validHostPattern("2001:db8:85a3:8d3:1319:8a2e:370"));
+   }
+
    @Test
    public void testGetPort() {
        assertEquals(8000, getPort("127.0.0.1:8000").intValue());

SocketServer.scala (kafka.network)

@@ -57,9 +57,6 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
  private val maxQueuedRequests = config.queuedMaxRequests

- private val maxConnectionsPerIp = config.maxConnectionsPerIp
- private val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides
-
  private val logContext = new LogContext(s"[SocketServer brokerId=${config.brokerId}] ")
  this.logIdent = logContext.logPrefix
@@ -90,7 +87,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   */
  def startup(startupProcessors: Boolean = true) {
    this.synchronized {
-     connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
+     connectionQuotas = new ConnectionQuotas(config.maxConnectionsPerIp, config.maxConnectionsPerIpOverrides)
      createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)
      if (startupProcessors) {
        startProcessors()
@@ -229,6 +226,16 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
    }
  }

+ def updateMaxConnectionsPerIp(maxConnectionsPerIp: Int): Unit = {
+   info(s"Updating maxConnectionsPerIp: $maxConnectionsPerIp")
+   connectionQuotas.updateMaxConnectionsPerIp(maxConnectionsPerIp)
+ }
+
+ def updateMaxConnectionsPerIpOverride(maxConnectionsPerIpOverrides: Map[String, Int]): Unit = {
+   info(s"Updating maxConnectionsPerIpOverrides: ${maxConnectionsPerIpOverrides.map { case (k, v) => s"$k=$v" }.mkString(",")}")
+   connectionQuotas.updateMaxConnectionsPerIpOverride(maxConnectionsPerIpOverrides)
+ }
+
  /* `protected` for test usage */
  protected[network] def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
                                      securityProtocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
@@ -878,19 +885,28 @@ private[kafka] class Processor(val id: Int,

class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) {

- private val overrides = overrideQuotas.map { case (host, count) => (InetAddress.getByName(host), count) }
+ @volatile private var defaultMaxConnectionsPerIp = defaultMax
+ @volatile private var maxConnectionsPerIpOverrides = overrideQuotas.map { case (host, count) => (InetAddress.getByName(host), count) }
  private val counts = mutable.Map[InetAddress, Int]()

  def inc(address: InetAddress) {
    counts.synchronized {
      val count = counts.getOrElseUpdate(address, 0)
      counts.put(address, count + 1)
-     val max = overrides.getOrElse(address, defaultMax)
+     val max = maxConnectionsPerIpOverrides.getOrElse(address, defaultMaxConnectionsPerIp)
      if (count >= max)
        throw new TooManyConnectionsException(address, max)
    }
  }

+ def updateMaxConnectionsPerIp(maxConnectionsPerIp: Int): Unit = {
+   defaultMaxConnectionsPerIp = maxConnectionsPerIp
+ }
+
+ def updateMaxConnectionsPerIpOverride(overrideQuotas: Map[String, Int]): Unit = {
+   maxConnectionsPerIpOverrides = overrideQuotas.map { case (host, count) => (InetAddress.getByName(host), count) }
+ }
+
  def dec(address: InetAddress) {
    counts.synchronized {
      val count = counts.getOrElse(address,
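
The concurrency split above deserves a sentence: the limits become @volatile vars so the reconfiguration path can publish new values without taking a lock, while the per-IP counts map keeps its own monitor because increment-and-check must be atomic. A minimal standalone sketch of the same pattern (names and the exception type are ours, not Kafka's):

    import java.net.InetAddress
    import scala.collection.mutable

    class SimpleQuotas(initialDefault: Int, initialOverrides: Map[InetAddress, Int]) {
      // Volatile: read on every accept, written only by the reconfiguration path.
      @volatile private var defaultMax = initialDefault
      @volatile private var overrides = initialOverrides
      // Guarded by its own monitor; a volatile map reference alone would not make +1 atomic.
      private val counts = mutable.Map[InetAddress, Int]()

      def inc(address: InetAddress): Unit = counts.synchronized {
        val count = counts.getOrElseUpdate(address, 0)
        counts.put(address, count + 1)
        val max = overrides.getOrElse(address, defaultMax)
        if (count >= max)
          throw new IllegalStateException(s"Too many connections from $address (max $max)")
      }

      def updateDefaultMax(newMax: Int): Unit = defaultMax = newMax
    }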

DynamicBrokerConfig.scala (kafka.server)

@@ -80,7 +80,8 @@ object DynamicBrokerConfig {
    DynamicLogConfig.ReconfigurableConfigs ++
    DynamicThreadPool.ReconfigurableConfigs ++
    Set(KafkaConfig.MetricReporterClassesProp) ++
-   DynamicListenerConfig.ReconfigurableConfigs
+   DynamicListenerConfig.ReconfigurableConfigs ++
+   DynamicConnectionQuota.ReconfigurableConfigs

  private val PerBrokerConfigs = DynamicSecurityConfigs ++
    DynamicListenerConfig.ReconfigurableConfigs
@@ -197,6 +198,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
    addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer))
    addReconfigurable(new DynamicClientQuotaCallback(kafkaConfig.brokerId, kafkaServer))
    addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
+   addBrokerReconfigurable(new DynamicConnectionQuota(kafkaServer))
  }

  def addReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) {
@@ -815,3 +817,24 @@ class DynamicListenerConfig(server: KafkaServer) extends BrokerReconfigurable wi
}

+object DynamicConnectionQuota {
+  val ReconfigurableConfigs = Set(KafkaConfig.MaxConnectionsPerIpProp, KafkaConfig.MaxConnectionsPerIpOverridesProp)
+}
+
+class DynamicConnectionQuota(server: KafkaServer) extends BrokerReconfigurable {
+
+  override def reconfigurableConfigs: Set[String] = {
+    DynamicConnectionQuota.ReconfigurableConfigs
+  }
+
+  override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+  }
+
+  override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
+    server.socketServer.updateMaxConnectionsPerIpOverride(newConfig.maxConnectionsPerIpOverrides)
+    if (newConfig.maxConnectionsPerIp != oldConfig.maxConnectionsPerIp)
+      server.socketServer.updateMaxConnectionsPerIp(newConfig.maxConnectionsPerIp)
+  }
+}
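
Two notes on the plumbing. First, validateReconfiguration can stay empty because the per-address check added to KafkaConfig.validateValues (next file) already runs when the candidate config is constructed, so a bad override is rejected before reconfigure is reached. Second, the contract being implemented can be read off this hunk; a sketch of the BrokerReconfigurable trait as this patch exercises it (the actual definition lives elsewhere in DynamicBrokerConfig.scala):

    trait BrokerReconfigurable {
      // The dynamic config keys this component wants to observe.
      def reconfigurableConfigs: Set[String]
      // Reject an update before it is applied anywhere.
      def validateReconfiguration(newConfig: KafkaConfig): Unit
      // Apply the delta between the old and the new broker config.
      def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit
    }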

KafkaConfig.scala (kafka.server)

@@ -36,6 +36,7 @@ import org.apache.kafka.common.metrics.Sensor
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.record.{LegacyRecord, Records, TimestampType}
import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.utils.Utils

import scala.collection.JavaConverters._
import scala.collection.Map
@@ -1392,5 +1393,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
    if (maxConnectionsPerIp == 0)
      require(!maxConnectionsPerIpOverrides.isEmpty, s"${KafkaConfig.MaxConnectionsPerIpProp} can be set to zero only if" +
        s" ${KafkaConfig.MaxConnectionsPerIpOverridesProp} property is set.")
+
+   val invalidAddresses = maxConnectionsPerIpOverrides.keys.filterNot(address => Utils.validHostPattern(address))
+   if (!invalidAddresses.isEmpty)
+     throw new IllegalArgumentException(s"${KafkaConfig.MaxConnectionsPerIpOverridesProp} contains invalid addresses : ${invalidAddresses.mkString(",")}")
  }
}
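
The effect of the new guard, as a hedged mini-repro (the zookeeper.connect value is a placeholder and the minimal property set is assumed):

    import java.util.Properties
    import kafka.server.KafkaConfig

    val props = new Properties()
    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")  // assumed minimal required config
    props.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, "bad#host:100")
    // validateValues runs at construction time, so this throws
    // IllegalArgumentException: max.connections.per.ip.overrides contains invalid addresses : bad#host
    KafkaConfig.fromProps(props)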

DynamicConnectionQuotaTest.scala (kafka.network, new file)

@@ -0,0 +1,143 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka.network

import java.io.IOException
import java.net.{InetAddress, Socket}
import java.util.Properties

import kafka.server.{BaseRequestTest, KafkaConfig}
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.Assert.assertEquals
import org.junit.{Before, Test}

import scala.collection.JavaConverters._

class DynamicConnectionQuotaTest extends BaseRequestTest {

  override def numBrokers = 1

  val topic = "test"

  @Before
  override def setUp(): Unit = {
    super.setUp()
    TestUtils.createTopic(zkClient, topic, numBrokers, numBrokers, servers)
  }

  @Test
  def testDynamicConnectionQuota(): Unit = {
    def connect(socketServer: SocketServer, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT, localAddr: InetAddress = null) = {
      new Socket("localhost", socketServer.boundPort(ListenerName.forSecurityProtocol(protocol)), localAddr, 0)
    }

    val socketServer = servers.head.socketServer
    val localAddress = InetAddress.getByName("127.0.0.1")
    def connectionCount = socketServer.connectionCount(localAddress)
    val initialConnectionCount = connectionCount

    val maxConnectionsPerIP = 5
    val props = new Properties
    props.put(KafkaConfig.MaxConnectionsPerIpProp, maxConnectionsPerIP.toString)
    reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.MaxConnectionsPerIpProp, maxConnectionsPerIP.toString))

    // wait for adminClient connections to close
    TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connection count mismatch")

    // create connections up to maxConnectionsPerIP - 1, leave space for one connection
    var conns = (connectionCount until (maxConnectionsPerIP - 1)).map(_ => connect(socketServer))

    // produce should succeed
    var produceResponse = sendProduceRequest()
    assertEquals(1, produceResponse.responses.size)
    val (tp, partitionResponse) = produceResponse.responses.asScala.head
    assertEquals(Errors.NONE, partitionResponse.error)

    conns = conns :+ connect(socketServer)
    // now try one more (should fail)
    intercept[IOException](sendProduceRequest())
    conns.foreach(conn => conn.close())
    TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connection count mismatch")

    // increase MaxConnectionsPerIpOverrides for localhost to 7
    val maxConnectionsPerIPOverride = 7
    props.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, s"localhost:$maxConnectionsPerIPOverride")
    reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.MaxConnectionsPerIpOverridesProp, s"localhost:$maxConnectionsPerIPOverride"))

    // wait for adminClient connections to close
    TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connection count mismatch")

    // create connections up to maxConnectionsPerIPOverride - 1, leave space for one connection
    conns = (connectionCount until maxConnectionsPerIPOverride - 1).map(_ => connect(socketServer))

    // send should succeed
    produceResponse = sendProduceRequest()
    assertEquals(1, produceResponse.responses.size)
    val (tp1, partitionResponse1) = produceResponse.responses.asScala.head
    assertEquals(Errors.NONE, partitionResponse1.error)

    conns = conns :+ connect(socketServer)
    // now try one more (should fail)
    intercept[IOException](sendProduceRequest())

    // close one connection
    conns.head.close()

    // send should succeed
    sendProduceRequest()
  }

  private def reconfigureServers(newProps: Properties, perBrokerConfig: Boolean, aPropToVerify: (String, String)): Unit = {
    val adminClient = createAdminClient()
    TestUtils.alterConfigs(servers, adminClient, newProps, perBrokerConfig).all.get()
    waitForConfigOnServer(aPropToVerify._1, aPropToVerify._2)
    adminClient.close()
  }

  private def createAdminClient(): AdminClient = {
    val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(securityProtocol.name))
    val config = new Properties()
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    config.put(AdminClientConfig.METADATA_MAX_AGE_CONFIG, "10")
    val adminClient = AdminClient.create(config)
    adminClient
  }

  private def waitForConfigOnServer(propName: String, propValue: String, maxWaitMs: Long = 10000): Unit = {
    TestUtils.retry(maxWaitMs) {
      assertEquals(propValue, servers.head.config.originals.get(propName))
    }
  }

  private def sendProduceRequest(): ProduceResponse = {
    val topicPartition = new TopicPartition(topic, 0)
    val memoryRecords = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "value".getBytes))
    val partitionRecords = Map(topicPartition -> memoryRecords)
    val request = ProduceRequest.Builder.forCurrentMagic(-1, 3000, partitionRecords.asJava).build()
    val response = connectAndSend(request, ApiKeys.PRODUCE, servers.head.socketServer)
    ProduceResponse.parse(response, request.version)
  }
}

DynamicBrokerConfigTest.scala (kafka.server)

@@ -171,6 +171,22 @@ class DynamicBrokerConfigTest extends JUnitSuite {
    verifyUpdate(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "password")
  }

+  @Test
+  def testConnectionQuota(): Unit = {
+    verifyConfigUpdate(KafkaConfig.MaxConnectionsPerIpProp, "100", perBrokerConfig = true, expectFailure = false)
+    verifyConfigUpdate(KafkaConfig.MaxConnectionsPerIpProp, "100", perBrokerConfig = false, expectFailure = false)
+    // MaxConnectionsPerIpProp can be set to zero only if MaxConnectionsPerIpOverridesProp is set
+    verifyConfigUpdate(KafkaConfig.MaxConnectionsPerIpProp, "0", perBrokerConfig = false, expectFailure = true)
+
+    verifyConfigUpdate(KafkaConfig.MaxConnectionsPerIpOverridesProp, "hostName1:100,hostName2:0", perBrokerConfig = true,
+      expectFailure = false)
+    verifyConfigUpdate(KafkaConfig.MaxConnectionsPerIpOverridesProp, "hostName1:100,hostName2:0", perBrokerConfig = false,
+      expectFailure = false)
+    // test invalid address
+    verifyConfigUpdate(KafkaConfig.MaxConnectionsPerIpOverridesProp, "hostName#:100", perBrokerConfig = true,
+      expectFailure = true)
+  }
+
  private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: Boolean, expectFailure: Boolean) {
    val configProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
    configProps.put(KafkaConfig.PasswordEncoderSecretProp, "broker.secret")

KafkaConfigTest.scala (kafka.server)

@@ -792,6 +792,8 @@ class KafkaConfigTest {
    assertFalse(isValidKafkaConfig(props))
    props.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.1:100")
    assertTrue(isValidKafkaConfig(props))
+   props.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.0#:100")
+   assertFalse(isValidKafkaConfig(props))
  }

  private def assertPropertyInvalid(validRequiredProps: => Properties, name: String, values: Any*) {

configuration.html (docs)

@@ -203,6 +203,14 @@
        <li><code>background.threads</code></li>
      </ul>

+     <h5>Updating ConnectionQuota Configs</h5>
+     The maximum number of connections that the broker allows from a given IP/host may be updated dynamically as a cluster-wide default used by all brokers.
+     The new limits apply only to newly created connections, but counts of existing connections are taken into account when enforcing them.
+     <ul>
+       <li><code>max.connections.per.ip</code></li>
+       <li><code>max.connections.per.ip.overrides</code></li>
+     </ul>
+
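
For readers wiring this up, a hedged sketch of pushing the new cluster-wide default through the AdminClient (the bootstrap address is a placeholder; alterConfigs is the alteration API available in this Kafka generation):

    import java.util.{Arrays, Collections}
    import org.apache.kafka.clients.admin.{AdminClient, Config, ConfigEntry}
    import org.apache.kafka.common.config.ConfigResource

    val admin = AdminClient.create(
      Collections.singletonMap[String, AnyRef]("bootstrap.servers", "localhost:9092"))
    // An empty resource name addresses the cluster-wide default for all brokers.
    val brokerDefaults = new ConfigResource(ConfigResource.Type.BROKER, "")
    val update = new Config(Arrays.asList(
      new ConfigEntry("max.connections.per.ip", "512"),
      new ConfigEntry("max.connections.per.ip.overrides", "10.1.2.3:1024")))
    admin.alterConfigs(Collections.singletonMap(brokerDefaults, update)).all.get()
    admin.close()

The same update can be made with the stock CLI: bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config max.connections.per.ip=512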
      <h5>Adding and Removing Listeners</h5>

      <p>Listeners may be added or removed dynamically. When a new listener is added, security configs of the listener must be provided
      as listener configs with the listener prefix <code>listener.name.{listenerName}.</code>. If the new listener uses SASL,