From ff3b2d67a1049c75b300547ca6ecf074b6a3ab28 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Fri, 9 Apr 2021 18:11:34 +0200 Subject: [PATCH] KAFKA-12591; Remove deprecated `quota.producer.default` and `quota.consumer.default` configurations (#10427) `quota.producer.default` and `quota.consumer.default` were deprecated in AK 0.11.0.0. Dynamic default quotas must be used instead. This patch removes them for AK 3.0. Reviewers: Rajini Sivaram , Ismael Juma --- .../kafka/server/ClientQuotaManager.scala | 14 +------ .../main/scala/kafka/server/KafkaConfig.scala | 12 ------ .../scala/kafka/server/QuotaFactory.scala | 28 ++------------ .../kafka/api/ClientIdQuotaTest.scala | 11 ++++-- .../kafka/api/CustomQuotaCallbackTest.scala | 2 - .../kafka/api/UserClientIdQuotaTest.scala | 2 - .../integration/kafka/api/UserQuotaTest.scala | 4 +- .../DynamicBrokerReconfigurationTest.scala | 8 +++- .../kafka/server/ClientQuotaManagerTest.scala | 38 +++++++++++++------ .../unit/kafka/server/KafkaConfigTest.scala | 2 - docs/upgrade.html | 2 + 11 files changed, 49 insertions(+), 74 deletions(-) diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala index 1f5b752d614..ac67ca9360d 100644 --- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala @@ -44,21 +44,16 @@ case class ClientSensors(metricTags: Map[String, String], quotaSensor: Sensor, t /** * Configuration settings for quota management - * @param quotaDefault The default allocated to any client-id if - * dynamic defaults or user quotas are not set * @param numQuotaSamples The number of samples to retain in memory * @param quotaWindowSizeSeconds The time span of each sample * */ -case class ClientQuotaManagerConfig(quotaDefault: Long = - ClientQuotaManagerConfig.QuotaDefault, - numQuotaSamples: Int = +case class ClientQuotaManagerConfig(numQuotaSamples: Int = ClientQuotaManagerConfig.DefaultNumQuotaSamples, quotaWindowSizeSeconds: Int = ClientQuotaManagerConfig.DefaultQuotaWindowSizeSeconds) object ClientQuotaManagerConfig { - val QuotaDefault = Long.MaxValue // Always have 10 whole windows + 1 current window val DefaultNumQuotaSamples = 11 val DefaultQuotaWindowSizeSeconds = 1 @@ -195,15 +190,12 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, private val lock = new ReentrantReadWriteLock() private val sensorAccessor = new SensorAccess(lock, metrics) private val quotaCallback = clientQuotaCallback.getOrElse(new DefaultQuotaCallback) - private val staticConfigClientIdQuota = Quota.upperBound(config.quotaDefault.toDouble) private val clientQuotaType = QuotaType.toClientQuotaType(quotaType) @volatile private var quotaTypesEnabled = clientQuotaCallback match { case Some(_) => QuotaTypes.CustomQuotas - case None => - if (config.quotaDefault == Long.MaxValue) QuotaTypes.NoQuotas - else QuotaTypes.ClientIdQuotaEnabled + case None => QuotaTypes.NoQuotas } private val delayQueueSensor = metrics.sensor(quotaType.toString + "-delayQueue") @@ -620,8 +612,6 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, // /config/clients/ quota = overriddenQuotas.get(DefaultClientIdQuotaEntity) } - if (quota == null) - quota = staticConfigClientIdQuota } } if (quota == null) null else quota.bound diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index e8a3e6cf4e4..e25cce8ae1e 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -205,8 +205,6 @@ object Defaults { val FetchMaxBytes = 55 * 1024 * 1024 /** ********* Quota Configuration ***********/ - val ProducerQuotaBytesPerSecondDefault = ClientQuotaManagerConfig.QuotaDefault - val ConsumerQuotaBytesPerSecondDefault = ClientQuotaManagerConfig.QuotaDefault val NumQuotaSamples: Int = ClientQuotaManagerConfig.DefaultNumQuotaSamples val QuotaWindowSizeSeconds: Int = ClientQuotaManagerConfig.DefaultQuotaWindowSizeSeconds val NumReplicationQuotaSamples: Int = ReplicationQuotaManagerConfig.DefaultNumQuotaSamples @@ -511,8 +509,6 @@ object KafkaConfig { val FetchMaxBytes = "fetch.max.bytes" /** ********* Quota Configuration ***********/ - val ProducerQuotaBytesPerSecondDefaultProp = "quota.producer.default" - val ConsumerQuotaBytesPerSecondDefaultProp = "quota.consumer.default" val NumQuotaSamplesProp = "quota.window.num" val NumReplicationQuotaSamplesProp = "replication.quota.window.num" val NumAlterLogDirsReplicationQuotaSamplesProp = "alter.log.dirs.replication.quota.window.num" @@ -919,10 +915,6 @@ object KafkaConfig { val FetchMaxBytesDoc = "The maximum number of bytes we will return for a fetch request. Must be at least 1024." /** ********* Quota Configuration ***********/ - val ProducerQuotaBytesPerSecondDefaultDoc = "DEPRECATED: Used only when dynamic default quotas are not configured for , or in Zookeeper. " + - "Any producer distinguished by clientId will get throttled if it produces more bytes than this value per-second" - val ConsumerQuotaBytesPerSecondDefaultDoc = "DEPRECATED: Used only when dynamic default quotas are not configured for or in Zookeeper. " + - "Any consumer distinguished by clientId/consumer group will get throttled if it fetches more bytes than this value per-second" val NumQuotaSamplesDoc = "The number of samples to retain in memory for client quotas" val NumReplicationQuotaSamplesDoc = "The number of samples to retain in memory for replication quotas" val NumAlterLogDirsReplicationQuotaSamplesDoc = "The number of samples to retain in memory for alter log dirs replication quotas" @@ -1232,8 +1224,6 @@ object KafkaConfig { .define(KafkaMetricsPollingIntervalSecondsProp, INT, Defaults.KafkaMetricsPollingIntervalSeconds, atLeast(1), LOW, KafkaMetricsPollingIntervalSecondsDoc) /** ********* Quota configuration ***********/ - .define(ProducerQuotaBytesPerSecondDefaultProp, LONG, Defaults.ProducerQuotaBytesPerSecondDefault, atLeast(1), HIGH, ProducerQuotaBytesPerSecondDefaultDoc) - .define(ConsumerQuotaBytesPerSecondDefaultProp, LONG, Defaults.ConsumerQuotaBytesPerSecondDefault, atLeast(1), HIGH, ConsumerQuotaBytesPerSecondDefaultDoc) .define(NumQuotaSamplesProp, INT, Defaults.NumQuotaSamples, atLeast(1), LOW, NumQuotaSamplesDoc) .define(NumReplicationQuotaSamplesProp, INT, Defaults.NumReplicationQuotaSamples, atLeast(1), LOW, NumReplicationQuotaSamplesDoc) .define(NumAlterLogDirsReplicationQuotaSamplesProp, INT, Defaults.NumAlterLogDirsReplicationQuotaSamples, atLeast(1), LOW, NumAlterLogDirsReplicationQuotaSamplesDoc) @@ -1742,8 +1732,6 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO def passwordEncoderIterations = getInt(KafkaConfig.PasswordEncoderIterationsProp) /** ********* Quota Configuration **************/ - val producerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp) - val consumerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp) val numQuotaSamples = getInt(KafkaConfig.NumQuotaSamplesProp) val quotaWindowSizeSeconds = getInt(KafkaConfig.QuotaWindowSizeSecondsProp) val numReplicationQuotaSamples = getInt(KafkaConfig.NumReplicationQuotaSamplesProp) diff --git a/core/src/main/scala/kafka/server/QuotaFactory.scala b/core/src/main/scala/kafka/server/QuotaFactory.scala index 9d4443b904e..f3901f6b29b 100644 --- a/core/src/main/scala/kafka/server/QuotaFactory.scala +++ b/core/src/main/scala/kafka/server/QuotaFactory.scala @@ -76,9 +76,9 @@ object QuotaFactory extends Logging { val clientQuotaCallback = Option(cfg.getConfiguredInstance(KafkaConfig.ClientQuotaCallbackClassProp, classOf[ClientQuotaCallback])) QuotaManagers( - new ClientQuotaManager(clientFetchConfig(cfg), metrics, Fetch, time, threadNamePrefix, clientQuotaCallback), - new ClientQuotaManager(clientProduceConfig(cfg), metrics, Produce, time, threadNamePrefix, clientQuotaCallback), - new ClientRequestQuotaManager(clientRequestConfig(cfg), metrics, time, threadNamePrefix, clientQuotaCallback), + new ClientQuotaManager(clientConfig(cfg), metrics, Fetch, time, threadNamePrefix, clientQuotaCallback), + new ClientQuotaManager(clientConfig(cfg), metrics, Produce, time, threadNamePrefix, clientQuotaCallback), + new ClientRequestQuotaManager(clientConfig(cfg), metrics, time, threadNamePrefix, clientQuotaCallback), new ControllerMutationQuotaManager(clientControllerMutationConfig(cfg), metrics, time, threadNamePrefix, clientQuotaCallback), new ReplicationQuotaManager(replicationConfig(cfg), metrics, LeaderReplication, time), @@ -88,27 +88,7 @@ object QuotaFactory extends Logging { ) } - def clientProduceConfig(cfg: KafkaConfig): ClientQuotaManagerConfig = { - if (cfg.producerQuotaBytesPerSecondDefault != Long.MaxValue) - warn(s"${KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp} has been deprecated in 0.11.0.0 and will be removed in a future release. Use dynamic quota defaults instead.") - ClientQuotaManagerConfig( - quotaDefault = cfg.producerQuotaBytesPerSecondDefault, - numQuotaSamples = cfg.numQuotaSamples, - quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds - ) - } - - def clientFetchConfig(cfg: KafkaConfig): ClientQuotaManagerConfig = { - if (cfg.consumerQuotaBytesPerSecondDefault != Long.MaxValue) - warn(s"${KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp} has been deprecated in 0.11.0.0 and will be removed in a future release. Use dynamic quota defaults instead.") - ClientQuotaManagerConfig( - quotaDefault = cfg.consumerQuotaBytesPerSecondDefault, - numQuotaSamples = cfg.numQuotaSamples, - quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds - ) - } - - def clientRequestConfig(cfg: KafkaConfig): ClientQuotaManagerConfig = { + def clientConfig(cfg: KafkaConfig): ClientQuotaManagerConfig = { ClientQuotaManagerConfig( numQuotaSamples = cfg.numQuotaSamples, quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds diff --git a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala index 0d1ff497fdb..28b0b15ec9a 100644 --- a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala +++ b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala @@ -14,7 +14,7 @@ package kafka.api -import kafka.server.{KafkaConfig, KafkaServer} +import kafka.server.KafkaServer import org.apache.kafka.common.security.auth.KafkaPrincipal import org.junit.jupiter.api.BeforeEach @@ -25,9 +25,14 @@ class ClientIdQuotaTest extends BaseQuotaTest { @BeforeEach override def setUp(): Unit = { - this.serverConfig.setProperty(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, defaultProducerQuota.toString) - this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, defaultConsumerQuota.toString) super.setUp() + quotaTestClients.alterClientQuotas( + quotaTestClients.clientQuotaAlteration( + quotaTestClients.clientQuotaEntity(None, Some(QuotaTestClients.DefaultEntity)), + Some(defaultProducerQuota), Some(defaultConsumerQuota), Some(defaultRequestQuota) + ) + ) + quotaTestClients.waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota) } override def createQuotaTestClients(topic: String, leaderNode: KafkaServer): QuotaTestClients = { diff --git a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala index 7db050b45db..bfc967a67ae 100644 --- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala +++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala @@ -63,8 +63,6 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup { @BeforeEach override def setUp(): Unit = { startSasl(jaasSections(kafkaServerSaslMechanisms, Some("SCRAM-SHA-256"), KafkaSasl, JaasTestUtils.KafkaServerContextName)) - this.serverConfig.setProperty(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString) - this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString) this.serverConfig.setProperty(KafkaConfig.ClientQuotaCallbackClassProp, classOf[GroupedUserQuotaCallback].getName) this.serverConfig.setProperty(s"${listenerName.configPrefix}${KafkaConfig.PrincipalBuilderClassProp}", classOf[GroupedUserPrincipalBuilder].getName) diff --git a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala index 56b2c02bacc..e442ddab1d7 100644 --- a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala +++ b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala @@ -32,8 +32,6 @@ class UserClientIdQuotaTest extends BaseQuotaTest { @BeforeEach override def setUp(): Unit = { this.serverConfig.setProperty(KafkaConfig.SslClientAuthProp, "required") - this.serverConfig.setProperty(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString) - this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString) super.setUp() quotaTestClients.alterClientQuotas( quotaTestClients.clientQuotaAlteration( diff --git a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala index 4917b4e2108..7a00f4067ac 100644 --- a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala +++ b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala @@ -16,7 +16,7 @@ package kafka.api import java.io.File -import kafka.server.{KafkaConfig, KafkaServer} +import kafka.server.KafkaServer import kafka.utils.JaasTestUtils import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.junit.jupiter.api.{AfterEach, BeforeEach} @@ -33,8 +33,6 @@ class UserQuotaTest extends BaseQuotaTest with SaslSetup { @BeforeEach override def setUp(): Unit = { startSasl(jaasSections(kafkaServerSaslMechanisms, Some("GSSAPI"), KafkaSasl, JaasTestUtils.KafkaServerContextName)) - this.serverConfig.setProperty(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString) - this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString) super.setUp() quotaTestClients.alterClientQuotas( quotaTestClients.clientQuotaAlteration( diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index c37d376208e..08a55aedef1 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -26,7 +26,6 @@ import java.time.Duration import java.util import java.util.{Collections, Properties} import java.util.concurrent._ - import javax.management.ObjectName import com.yammer.metrics.core.MetricName import kafka.admin.ConfigCommand @@ -52,6 +51,7 @@ import org.apache.kafka.common.config.types.Password import org.apache.kafka.common.config.provider.FileConfigProvider import org.apache.kafka.common.errors.{AuthenticationException, InvalidRequestException} import org.apache.kafka.common.internals.Topic +import org.apache.kafka.common.metrics.Quota import org.apache.kafka.common.metrics.{KafkaMetric, MetricsReporter} import org.apache.kafka.common.network.{ListenerName, Mode} import org.apache.kafka.common.network.CertStores.{KEYSTORE_PROPS, TRUSTSTORE_PROPS} @@ -121,7 +121,6 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet props.put(KafkaConfig.SaslEnabledMechanismsProp, kafkaServerSaslMechanisms.mkString(",")) props.put(KafkaConfig.LogSegmentBytesProp, "2000") // low value to test log rolling on config update props.put(KafkaConfig.NumReplicaFetchersProp, "2") // greater than one to test reducing threads - props.put(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, "10000000") // non-default value to trigger a new metric props.put(KafkaConfig.PasswordEncoderSecretProp, "dynamic-config-secret") props.put(KafkaConfig.LogRetentionTimeMillisProp, 1680000000.toString) props.put(KafkaConfig.LogRetentionTimeHoursProp, 168.toString) @@ -870,7 +869,12 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet } assertEquals(servers.map(_.config.brokerId).toSet, TestMetricsReporter.configuredBrokers.toSet) + // non-default value to trigger a new metric val clientId = "test-client-1" + servers.foreach { server => + server.quotaManagers.produce.updateQuota(None, Some(clientId), Some(clientId), + Some(Quota.upperBound(10000000))) + } val (producerThread, consumerThread) = startProduceConsume(retries = 0, clientId) TestUtils.waitUntilTrue(() => consumerThread.received >= 5, "Messages not sent") diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala index c9991b451ed..4159a1b3c04 100644 --- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala @@ -28,7 +28,7 @@ import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { - private val config = ClientQuotaManagerConfig(quotaDefault = 500) + private val config = ClientQuotaManagerConfig() private def testQuotaParsing(config: ClientQuotaManagerConfig, client1: UserClient, client2: UserClient, randomClient: UserClient, defaultConfigClient: UserClient): Unit = { val clientQuotaManager = new ClientQuotaManager(config, metrics, Produce, time, "") @@ -38,10 +38,12 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { clientQuotaManager.updateQuota(client1.configUser, client1.configClientId, client1.sanitizedConfigClientId, Some(new Quota(2000, true))) clientQuotaManager.updateQuota(client2.configUser, client2.configClientId, client2.sanitizedConfigClientId, Some(new Quota(4000, true))) - assertEquals(config.quotaDefault.toDouble, - clientQuotaManager.quota(randomClient.user, randomClient.clientId).bound, 0.0, "Default producer quota should be " + config.quotaDefault) - assertEquals(2000, clientQuotaManager.quota(client1.user, client1.clientId).bound, 0.0, "Should return the overridden value (2000)") - assertEquals(4000, clientQuotaManager.quota(client2.user, client2.clientId).bound, 0.0, "Should return the overridden value (4000)") + assertEquals(Long.MaxValue.toDouble, clientQuotaManager.quota(randomClient.user, randomClient.clientId).bound, 0.0, + "Default producer quota should be " + Long.MaxValue.toDouble) + assertEquals(2000, clientQuotaManager.quota(client1.user, client1.clientId).bound, 0.0, + "Should return the overridden value (2000)") + assertEquals(4000, clientQuotaManager.quota(client2.user, client2.clientId).bound, 0.0, + "Should return the overridden value (4000)") // p1 should be throttled using the overridden quota var throttleTimeMs = maybeRecord(clientQuotaManager, client1.user, client1.clientId, 2500 * config.numQuotaSamples) @@ -98,7 +100,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { val client2 = UserClient("User2", "p2", Some("User2"), None) val randomClient = UserClient("RandomUser", "random-client-id", None, None) val defaultConfigClient = UserClient("", "", Some(ConfigEntityName.Default), None) - val config = ClientQuotaManagerConfig(quotaDefault = Long.MaxValue) + val config = ClientQuotaManagerConfig() testQuotaParsing(config, client1, client2, randomClient, defaultConfigClient) } @@ -112,7 +114,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { val client2 = UserClient("User2", "p2", Some("User2"), Some("p2")) val randomClient = UserClient("RandomUser", "random-client-id", None, None) val defaultConfigClient = UserClient("", "", Some(ConfigEntityName.Default), Some(ConfigEntityName.Default)) - val config = ClientQuotaManagerConfig(quotaDefault = Long.MaxValue) + val config = ClientQuotaManagerConfig() testQuotaParsing(config, client1, client2, randomClient, defaultConfigClient) } @@ -158,7 +160,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { @Test def testGetMaxValueInQuotaWindowWithNonDefaultQuotaWindow(): Unit = { val numFullQuotaWindows = 3 // 3 seconds window (vs. 10 seconds default) - val nonDefaultConfig = ClientQuotaManagerConfig(quotaDefault = Long.MaxValue, numQuotaSamples = numFullQuotaWindows + 1) + val nonDefaultConfig = ClientQuotaManagerConfig(numQuotaSamples = numFullQuotaWindows + 1) val clientQuotaManager = new ClientQuotaManager(nonDefaultConfig, metrics, Fetch, time, "") val userSession = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "userA"), InetAddress.getLocalHost) @@ -177,7 +179,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { @Test def testSetAndRemoveDefaultUserQuota(): Unit = { // quotaTypesEnabled will be QuotaTypes.NoQuotas initially - val clientQuotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(quotaDefault = Long.MaxValue), + val clientQuotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(), metrics, Produce, time, "") try { @@ -199,7 +201,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { @Test def testSetAndRemoveUserQuota(): Unit = { // quotaTypesEnabled will be QuotaTypes.NoQuotas initially - val clientQuotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(quotaDefault = Long.MaxValue), + val clientQuotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(), metrics, Produce, time, "") try { @@ -218,7 +220,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { @Test def testSetAndRemoveUserClientQuota(): Unit = { // quotaTypesEnabled will be QuotaTypes.NoQuotas initially - val clientQuotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(quotaDefault = Long.MaxValue), + val clientQuotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(), metrics, Produce, time, "") try { @@ -236,7 +238,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { @Test def testQuotaConfigPrecedence(): Unit = { - val clientQuotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(quotaDefault=Long.MaxValue), + val clientQuotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(), metrics, Produce, time, "") try { @@ -303,6 +305,9 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { val clientQuotaManager = new ClientQuotaManager(config, metrics, Produce, time, "") val queueSizeMetric = metrics.metrics().get(metrics.metricName("queue-size", "Produce", "")) try { + clientQuotaManager.updateQuota(None, Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), + Some(new Quota(500, true))) + // We have 10 second windows. Make sure that there is no quota violation // if we produce under the quota for (_ <- 0 until 10) { @@ -348,6 +353,9 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { def testExpireThrottleTimeSensor(): Unit = { val clientQuotaManager = new ClientQuotaManager(config, metrics, Produce, time, "") try { + clientQuotaManager.updateQuota(None, Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), + Some(new Quota(500, true))) + maybeRecord(clientQuotaManager, "ANONYMOUS", "client1", 100) // remove the throttle time sensor metrics.removeSensor("ProduceThrottleTime-:client1") @@ -367,6 +375,9 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { def testExpireQuotaSensors(): Unit = { val clientQuotaManager = new ClientQuotaManager(config, metrics, Produce, time, "") try { + clientQuotaManager.updateQuota(None, Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), + Some(new Quota(500, true))) + maybeRecord(clientQuotaManager, "ANONYMOUS", "client1", 100) // remove all the sensors metrics.removeSensor("ProduceThrottleTime-:client1") @@ -391,6 +402,9 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { val clientQuotaManager = new ClientQuotaManager(config, metrics, Produce, time, "") val clientId = "client@#$%" try { + clientQuotaManager.updateQuota(None, Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), + Some(new Quota(500, true))) + maybeRecord(clientQuotaManager, "ANONYMOUS", clientId, 100) // The metrics should use the raw client ID, even if the reporters internally sanitize them diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 1f803b64762..36613db0d97 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -743,8 +743,6 @@ class KafkaConfigTest { case KafkaConfig.TransactionsTopicPartitionsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2") case KafkaConfig.TransactionsTopicSegmentBytesProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2") case KafkaConfig.TransactionsTopicReplicationFactorProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2") - case KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") - case KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") case KafkaConfig.NumQuotaSamplesProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") case KafkaConfig.QuotaWindowSizeSecondsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") case KafkaConfig.DeleteTopicEnableProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean", "0") diff --git a/docs/upgrade.html b/docs/upgrade.html index a0bc5af2c1a..04505b4bed3 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -53,6 +53,8 @@
  • The MessageFormatter.init(Properties) method was removed. Please use configure(Map) instead.
  • Kafka Streams no longer has a compile time dependency on "connect:json" module (KAFKA-5146). Projects that were relying on this transitive dependency will have to explicitly declare it.
  • +
  • The deprecated quota.producer.default and quota.consumer.default configurations were removed (KAFKA-12591). + Dynamic quota defaults must be used instead.