diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 6e0b693cc1b..a6d3e2cbc38 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -68,6 +68,8 @@ public final class Utils { // IPv6 is supported with [ip] pattern private static final Pattern HOST_PORT_PATTERN = Pattern.compile(".*?\\[?([0-9a-zA-Z\\-%._:]*)\\]?:([0-9]+)"); + private static final Pattern VALID_HOST_CHARACTERS = Pattern.compile("([0-9a-zA-Z\\-%._:]*)"); + // Prints up to 2 decimal digits. Used for human readable printing private static final DecimalFormat TWO_DIGIT_FORMAT = new DecimalFormat("0.##"); @@ -435,6 +437,15 @@ public final class Utils { return matcher.matches() ? Integer.parseInt(matcher.group(2)) : null; } + /** + * Basic validation of the supplied address. checks for valid characters + * @param address hostname string to validate + * @return true if address contains valid characters + */ + public static boolean validHostPattern(String address) { + return VALID_HOST_CHARACTERS.matcher(address).matches(); + } + /** * Formats hostname and port number as a "host:port" address string, * surrounding IPv6 addresses with braces '[', ']' diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java index 3feeff25e79..4d1d830ed7d 100755 --- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java @@ -39,6 +39,7 @@ import static org.apache.kafka.common.utils.Utils.formatAddress; import static org.apache.kafka.common.utils.Utils.formatBytes; import static org.apache.kafka.common.utils.Utils.getHost; import static org.apache.kafka.common.utils.Utils.getPort; +import static org.apache.kafka.common.utils.Utils.validHostPattern; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -59,6 +60,16 @@ public class UtilsTest { assertEquals("fe80::b1da:69ca:57f7:63d8%3", getHost("PLAINTEXT://[fe80::b1da:69ca:57f7:63d8%3]:5678")); } + @Test + public void testHostPattern() { + assertTrue(validHostPattern("127.0.0.1")); + assertTrue(validHostPattern("mydomain.com")); + assertTrue(validHostPattern("MyDomain.com")); + assertTrue(validHostPattern("My_Domain.com")); + assertTrue(validHostPattern("::1")); + assertTrue(validHostPattern("2001:db8:85a3:8d3:1319:8a2e:370")); + } + @Test public void testGetPort() { assertEquals(8000, getPort("127.0.0.1:8000").intValue()); diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 62fc7a554c6..749c921ee02 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -57,9 +57,6 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time private val maxQueuedRequests = config.queuedMaxRequests - private val maxConnectionsPerIp = config.maxConnectionsPerIp - private val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides - private val logContext = new LogContext(s"[SocketServer brokerId=${config.brokerId}] ") this.logIdent = logContext.logPrefix @@ -90,7 +87,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time */ def startup(startupProcessors: Boolean = true) { this.synchronized { - connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides) + connectionQuotas = new ConnectionQuotas(config.maxConnectionsPerIp, config.maxConnectionsPerIpOverrides) createAcceptorAndProcessors(config.numNetworkThreads, config.listeners) if (startupProcessors) { startProcessors() @@ -229,6 +226,16 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time } } + def updateMaxConnectionsPerIp(maxConnectionsPerIp: Int): Unit = { + info(s"Updating maxConnectionsPerIp: $maxConnectionsPerIp") + connectionQuotas.updateMaxConnectionsPerIp(maxConnectionsPerIp) + } + + def updateMaxConnectionsPerIpOverride(maxConnectionsPerIpOverrides: Map[String, Int]): Unit = { + info(s"Updating maxConnectionsPerIpOverrides: ${maxConnectionsPerIpOverrides.map { case (k, v) => s"$k=$v" }.mkString(",")}") + connectionQuotas.updateMaxConnectionsPerIpOverride(maxConnectionsPerIpOverrides) + } + /* `protected` for test usage */ protected[network] def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName, securityProtocol: SecurityProtocol, memoryPool: MemoryPool): Processor = { @@ -878,19 +885,28 @@ private[kafka] class Processor(val id: Int, class ConnectionQuotas(val defaultMax: Int, overrideQuotas: Map[String, Int]) { - private val overrides = overrideQuotas.map { case (host, count) => (InetAddress.getByName(host), count) } + @volatile private var defaultMaxConnectionsPerIp = defaultMax + @volatile private var maxConnectionsPerIpOverrides = overrideQuotas.map { case (host, count) => (InetAddress.getByName(host), count) } private val counts = mutable.Map[InetAddress, Int]() def inc(address: InetAddress) { counts.synchronized { val count = counts.getOrElseUpdate(address, 0) counts.put(address, count + 1) - val max = overrides.getOrElse(address, defaultMax) + val max = maxConnectionsPerIpOverrides.getOrElse(address, defaultMaxConnectionsPerIp) if (count >= max) throw new TooManyConnectionsException(address, max) } } + def updateMaxConnectionsPerIp(maxConnectionsPerIp: Int): Unit = { + defaultMaxConnectionsPerIp = maxConnectionsPerIp + } + + def updateMaxConnectionsPerIpOverride(overrideQuotas: Map[String, Int]): Unit = { + maxConnectionsPerIpOverrides = overrideQuotas.map { case (host, count) => (InetAddress.getByName(host), count) } + } + def dec(address: InetAddress) { counts.synchronized { val count = counts.getOrElse(address, diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 72772fa6fcb..19743e58257 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -80,7 +80,8 @@ object DynamicBrokerConfig { DynamicLogConfig.ReconfigurableConfigs ++ DynamicThreadPool.ReconfigurableConfigs ++ Set(KafkaConfig.MetricReporterClassesProp) ++ - DynamicListenerConfig.ReconfigurableConfigs + DynamicListenerConfig.ReconfigurableConfigs ++ + DynamicConnectionQuota.ReconfigurableConfigs private val PerBrokerConfigs = DynamicSecurityConfigs ++ DynamicListenerConfig.ReconfigurableConfigs @@ -197,6 +198,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer)) addReconfigurable(new DynamicClientQuotaCallback(kafkaConfig.brokerId, kafkaServer)) addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer)) + addBrokerReconfigurable(new DynamicConnectionQuota(kafkaServer)) } def addReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) { @@ -815,3 +817,24 @@ class DynamicListenerConfig(server: KafkaServer) extends BrokerReconfigurable wi } +object DynamicConnectionQuota { + val ReconfigurableConfigs = Set(KafkaConfig.MaxConnectionsPerIpProp, KafkaConfig.MaxConnectionsPerIpOverridesProp) +} + +class DynamicConnectionQuota(server: KafkaServer) extends BrokerReconfigurable { + + override def reconfigurableConfigs: Set[String] = { + DynamicConnectionQuota.ReconfigurableConfigs + } + + override def validateReconfiguration(newConfig: KafkaConfig): Unit = { + } + + override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = { + server.socketServer.updateMaxConnectionsPerIpOverride(newConfig.maxConnectionsPerIpOverrides) + + if (newConfig.maxConnectionsPerIp != oldConfig.maxConnectionsPerIp) + server.socketServer.updateMaxConnectionsPerIp(newConfig.maxConnectionsPerIp) + } +} + diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 3d7367e3602..b651549f1e8 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -36,6 +36,7 @@ import org.apache.kafka.common.metrics.Sensor import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.record.{LegacyRecord, Records, TimestampType} import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.utils.Utils import scala.collection.JavaConverters._ import scala.collection.Map @@ -1392,5 +1393,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO if (maxConnectionsPerIp == 0) require(!maxConnectionsPerIpOverrides.isEmpty, s"${KafkaConfig.MaxConnectionsPerIpProp} can be set to zero only if" + s" ${KafkaConfig.MaxConnectionsPerIpOverridesProp} property is set.") + + val invalidAddresses = maxConnectionsPerIpOverrides.keys.filterNot(address => Utils.validHostPattern(address)) + if (!invalidAddresses.isEmpty) + throw new IllegalArgumentException(s"${KafkaConfig.MaxConnectionsPerIpOverridesProp} contains invalid addresses : ${invalidAddresses.mkString(",")}") } } diff --git a/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala new file mode 100644 index 00000000000..374556bda22 --- /dev/null +++ b/core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.network + +import java.io.IOException +import java.net.{InetAddress, Socket} +import java.util.Properties + +import kafka.server.{BaseRequestTest, KafkaConfig} +import kafka.utils.TestUtils +import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord} +import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse} +import org.apache.kafka.common.security.auth.SecurityProtocol +import org.junit.Assert.assertEquals +import org.junit.{Before, Test} + +import scala.collection.JavaConverters._ + +class DynamicConnectionQuotaTest extends BaseRequestTest { + + override def numBrokers = 1 + + val topic = "test" + + @Before + override def setUp(): Unit = { + super.setUp() + TestUtils.createTopic(zkClient, topic, numBrokers, numBrokers, servers) + } + + @Test + def testDynamicConnectionQuota(): Unit = { + def connect(socketServer: SocketServer, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT, localAddr: InetAddress = null) = { + new Socket("localhost", socketServer.boundPort(ListenerName.forSecurityProtocol(protocol)), localAddr, 0) + } + + val socketServer = servers.head.socketServer + val localAddress = InetAddress.getByName("127.0.0.1") + def connectionCount = socketServer.connectionCount(localAddress) + val initialConnectionCount = connectionCount + val maxConnectionsPerIP = 5 + + val props = new Properties + props.put(KafkaConfig.MaxConnectionsPerIpProp, maxConnectionsPerIP.toString) + reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.MaxConnectionsPerIpProp, maxConnectionsPerIP.toString)) + + //wait for adminClient connections to close + TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connection count mismatch") + + //create connections up to maxConnectionsPerIP - 1, leave space for one connection + var conns = (connectionCount until (maxConnectionsPerIP - 1)).map(_ => connect(socketServer)) + + // produce should succeed + var produceResponse = sendProduceRequest() + assertEquals(1, produceResponse.responses.size) + val (tp, partitionResponse) = produceResponse.responses.asScala.head + assertEquals(Errors.NONE, partitionResponse.error) + + conns = conns :+ connect(socketServer) + // now try one more (should fail) + intercept[IOException](sendProduceRequest()) + + conns.foreach(conn => conn.close()) + TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connection count mismatch") + + // Increase MaxConnectionsPerIpOverrides for localhost to 7 + val maxConnectionsPerIPOverride = 7 + props.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, s"localhost:$maxConnectionsPerIPOverride") + reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.MaxConnectionsPerIpOverridesProp, s"localhost:$maxConnectionsPerIPOverride")) + + //wait for adminClient connections to close + TestUtils.waitUntilTrue(() => initialConnectionCount == connectionCount, "Connection count mismatch") + + //create connections up to maxConnectionsPerIPOverride - 1, leave space for one connection + conns = (connectionCount until maxConnectionsPerIPOverride - 1).map(_ => connect(socketServer)) + + // send should succeed + produceResponse = sendProduceRequest() + assertEquals(1, produceResponse.responses.size) + val (tp1, partitionResponse1) = produceResponse.responses.asScala.head + assertEquals(Errors.NONE, partitionResponse1.error) + + conns = conns :+ connect(socketServer) + // now try one more (should fail) + intercept[IOException](sendProduceRequest()) + + //close one connection + conns.head.close() + // send should succeed + sendProduceRequest() + } + + private def reconfigureServers(newProps: Properties, perBrokerConfig: Boolean, aPropToVerify: (String, String)): Unit = { + val adminClient = createAdminClient() + TestUtils.alterConfigs(servers, adminClient, newProps, perBrokerConfig).all.get() + waitForConfigOnServer(aPropToVerify._1, aPropToVerify._2) + adminClient.close() + } + + private def createAdminClient(): AdminClient = { + val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(securityProtocol.name)) + val config = new Properties() + config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers) + config.put(AdminClientConfig.METADATA_MAX_AGE_CONFIG, "10") + val adminClient = AdminClient.create(config) + adminClient + } + + private def waitForConfigOnServer(propName: String, propValue: String, maxWaitMs: Long = 10000): Unit = { + TestUtils.retry(maxWaitMs) { + assertEquals(propValue, servers.head.config.originals.get(propName)) + } + } + + private def sendProduceRequest(): ProduceResponse = { + val topicPartition = new TopicPartition(topic, 0) + val memoryRecords = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "value".getBytes)) + val partitionRecords = Map(topicPartition -> memoryRecords) + val request = ProduceRequest.Builder.forCurrentMagic(-1, 3000, partitionRecords.asJava).build() + val response = connectAndSend(request, ApiKeys.PRODUCE, servers.head.socketServer) + ProduceResponse.parse(response, request.version) + } +} diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index 9c8acb48024..41b90557dba 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -171,6 +171,22 @@ class DynamicBrokerConfigTest extends JUnitSuite { verifyUpdate(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "password") } + @Test + def testConnectionQuota(): Unit = { + verifyConfigUpdate(KafkaConfig.MaxConnectionsPerIpProp, "100", perBrokerConfig = true, expectFailure = false) + verifyConfigUpdate(KafkaConfig.MaxConnectionsPerIpProp, "100", perBrokerConfig = false, expectFailure = false) + //MaxConnectionsPerIpProp can be set to zero only if MaxConnectionsPerIpOverridesProp property is set + verifyConfigUpdate(KafkaConfig.MaxConnectionsPerIpProp, "0", perBrokerConfig = false, expectFailure = true) + + verifyConfigUpdate(KafkaConfig.MaxConnectionsPerIpOverridesProp, "hostName1:100,hostName2:0", perBrokerConfig = true, + expectFailure = false) + verifyConfigUpdate(KafkaConfig.MaxConnectionsPerIpOverridesProp, "hostName1:100,hostName2:0", perBrokerConfig = false, + expectFailure = false) + //test invalid address + verifyConfigUpdate(KafkaConfig.MaxConnectionsPerIpOverridesProp, "hostName#:100", perBrokerConfig = true, + expectFailure = true) + } + private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: Boolean, expectFailure: Boolean) { val configProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) configProps.put(KafkaConfig.PasswordEncoderSecretProp, "broker.secret") diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 0ee8d8152da..927dd1c203b 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -792,6 +792,8 @@ class KafkaConfigTest { assertFalse(isValidKafkaConfig(props)) props.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.1:100") assertTrue(isValidKafkaConfig(props)) + props.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.0#:100") + assertFalse(isValidKafkaConfig(props)) } private def assertPropertyInvalid(validRequiredProps: => Properties, name: String, values: Any*) { diff --git a/docs/configuration.html b/docs/configuration.html index cc3b42cd0ea..e5576f9263b 100644 --- a/docs/configuration.html +++ b/docs/configuration.html @@ -203,6 +203,14 @@
  • background.threads
  • +
    Updating ConnectionQuota Configs
    + The maximum number of connections allowed for a given IP/host by the broker may be updated dynamically at cluster-default level used by all brokers. + The changes will apply for new connection creations and the existing connections count will be taken into account by the new limits. + +
    Adding and Removing Listeners

    Listeners may be added or removed dynamically. When a new listener is added, security configs of the listener must be provided as listener configs with the listener prefix listener.name.{listenerName}.. If the new listener uses SASL,