KAFKA-12591; Remove deprecated `quota.producer.default` and `quota.consumer.default` configurations (#10427)

`quota.producer.default` and `quota.consumer.default` were deprecated in AK 0.11.0.0. Dynamic default quotas must be used instead. This patch removes them for AK 3.0. 

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Ismael Juma <ismael@juma.me.uk>
This commit is contained in:
David Jacot 2021-04-09 18:11:34 +02:00 committed by GitHub
parent 8d356b034b
commit ff3b2d67a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 49 additions and 74 deletions

View File

@ -44,21 +44,16 @@ case class ClientSensors(metricTags: Map[String, String], quotaSensor: Sensor, t
/** /**
* Configuration settings for quota management * Configuration settings for quota management
* @param quotaDefault The default allocated to any client-id if
* dynamic defaults or user quotas are not set
* @param numQuotaSamples The number of samples to retain in memory * @param numQuotaSamples The number of samples to retain in memory
* @param quotaWindowSizeSeconds The time span of each sample * @param quotaWindowSizeSeconds The time span of each sample
* *
*/ */
case class ClientQuotaManagerConfig(quotaDefault: Long = case class ClientQuotaManagerConfig(numQuotaSamples: Int =
ClientQuotaManagerConfig.QuotaDefault,
numQuotaSamples: Int =
ClientQuotaManagerConfig.DefaultNumQuotaSamples, ClientQuotaManagerConfig.DefaultNumQuotaSamples,
quotaWindowSizeSeconds: Int = quotaWindowSizeSeconds: Int =
ClientQuotaManagerConfig.DefaultQuotaWindowSizeSeconds) ClientQuotaManagerConfig.DefaultQuotaWindowSizeSeconds)
object ClientQuotaManagerConfig { object ClientQuotaManagerConfig {
val QuotaDefault = Long.MaxValue
// Always have 10 whole windows + 1 current window // Always have 10 whole windows + 1 current window
val DefaultNumQuotaSamples = 11 val DefaultNumQuotaSamples = 11
val DefaultQuotaWindowSizeSeconds = 1 val DefaultQuotaWindowSizeSeconds = 1
@ -195,15 +190,12 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
private val lock = new ReentrantReadWriteLock() private val lock = new ReentrantReadWriteLock()
private val sensorAccessor = new SensorAccess(lock, metrics) private val sensorAccessor = new SensorAccess(lock, metrics)
private val quotaCallback = clientQuotaCallback.getOrElse(new DefaultQuotaCallback) private val quotaCallback = clientQuotaCallback.getOrElse(new DefaultQuotaCallback)
private val staticConfigClientIdQuota = Quota.upperBound(config.quotaDefault.toDouble)
private val clientQuotaType = QuotaType.toClientQuotaType(quotaType) private val clientQuotaType = QuotaType.toClientQuotaType(quotaType)
@volatile @volatile
private var quotaTypesEnabled = clientQuotaCallback match { private var quotaTypesEnabled = clientQuotaCallback match {
case Some(_) => QuotaTypes.CustomQuotas case Some(_) => QuotaTypes.CustomQuotas
case None => case None => QuotaTypes.NoQuotas
if (config.quotaDefault == Long.MaxValue) QuotaTypes.NoQuotas
else QuotaTypes.ClientIdQuotaEnabled
} }
private val delayQueueSensor = metrics.sensor(quotaType.toString + "-delayQueue") private val delayQueueSensor = metrics.sensor(quotaType.toString + "-delayQueue")
@ -620,8 +612,6 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
// /config/clients/<default> // /config/clients/<default>
quota = overriddenQuotas.get(DefaultClientIdQuotaEntity) quota = overriddenQuotas.get(DefaultClientIdQuotaEntity)
} }
if (quota == null)
quota = staticConfigClientIdQuota
} }
} }
if (quota == null) null else quota.bound if (quota == null) null else quota.bound

View File

@ -205,8 +205,6 @@ object Defaults {
val FetchMaxBytes = 55 * 1024 * 1024 val FetchMaxBytes = 55 * 1024 * 1024
/** ********* Quota Configuration ***********/ /** ********* Quota Configuration ***********/
val ProducerQuotaBytesPerSecondDefault = ClientQuotaManagerConfig.QuotaDefault
val ConsumerQuotaBytesPerSecondDefault = ClientQuotaManagerConfig.QuotaDefault
val NumQuotaSamples: Int = ClientQuotaManagerConfig.DefaultNumQuotaSamples val NumQuotaSamples: Int = ClientQuotaManagerConfig.DefaultNumQuotaSamples
val QuotaWindowSizeSeconds: Int = ClientQuotaManagerConfig.DefaultQuotaWindowSizeSeconds val QuotaWindowSizeSeconds: Int = ClientQuotaManagerConfig.DefaultQuotaWindowSizeSeconds
val NumReplicationQuotaSamples: Int = ReplicationQuotaManagerConfig.DefaultNumQuotaSamples val NumReplicationQuotaSamples: Int = ReplicationQuotaManagerConfig.DefaultNumQuotaSamples
@ -511,8 +509,6 @@ object KafkaConfig {
val FetchMaxBytes = "fetch.max.bytes" val FetchMaxBytes = "fetch.max.bytes"
/** ********* Quota Configuration ***********/ /** ********* Quota Configuration ***********/
val ProducerQuotaBytesPerSecondDefaultProp = "quota.producer.default"
val ConsumerQuotaBytesPerSecondDefaultProp = "quota.consumer.default"
val NumQuotaSamplesProp = "quota.window.num" val NumQuotaSamplesProp = "quota.window.num"
val NumReplicationQuotaSamplesProp = "replication.quota.window.num" val NumReplicationQuotaSamplesProp = "replication.quota.window.num"
val NumAlterLogDirsReplicationQuotaSamplesProp = "alter.log.dirs.replication.quota.window.num" val NumAlterLogDirsReplicationQuotaSamplesProp = "alter.log.dirs.replication.quota.window.num"
@ -919,10 +915,6 @@ object KafkaConfig {
val FetchMaxBytesDoc = "The maximum number of bytes we will return for a fetch request. Must be at least 1024." val FetchMaxBytesDoc = "The maximum number of bytes we will return for a fetch request. Must be at least 1024."
/** ********* Quota Configuration ***********/ /** ********* Quota Configuration ***********/
val ProducerQuotaBytesPerSecondDefaultDoc = "DEPRECATED: Used only when dynamic default quotas are not configured for <user>, <client-id> or <user, client-id> in Zookeeper. " +
"Any producer distinguished by clientId will get throttled if it produces more bytes than this value per-second"
val ConsumerQuotaBytesPerSecondDefaultDoc = "DEPRECATED: Used only when dynamic default quotas are not configured for <user, <client-id> or <user, client-id> in Zookeeper. " +
"Any consumer distinguished by clientId/consumer group will get throttled if it fetches more bytes than this value per-second"
val NumQuotaSamplesDoc = "The number of samples to retain in memory for client quotas" val NumQuotaSamplesDoc = "The number of samples to retain in memory for client quotas"
val NumReplicationQuotaSamplesDoc = "The number of samples to retain in memory for replication quotas" val NumReplicationQuotaSamplesDoc = "The number of samples to retain in memory for replication quotas"
val NumAlterLogDirsReplicationQuotaSamplesDoc = "The number of samples to retain in memory for alter log dirs replication quotas" val NumAlterLogDirsReplicationQuotaSamplesDoc = "The number of samples to retain in memory for alter log dirs replication quotas"
@ -1232,8 +1224,6 @@ object KafkaConfig {
.define(KafkaMetricsPollingIntervalSecondsProp, INT, Defaults.KafkaMetricsPollingIntervalSeconds, atLeast(1), LOW, KafkaMetricsPollingIntervalSecondsDoc) .define(KafkaMetricsPollingIntervalSecondsProp, INT, Defaults.KafkaMetricsPollingIntervalSeconds, atLeast(1), LOW, KafkaMetricsPollingIntervalSecondsDoc)
/** ********* Quota configuration ***********/ /** ********* Quota configuration ***********/
.define(ProducerQuotaBytesPerSecondDefaultProp, LONG, Defaults.ProducerQuotaBytesPerSecondDefault, atLeast(1), HIGH, ProducerQuotaBytesPerSecondDefaultDoc)
.define(ConsumerQuotaBytesPerSecondDefaultProp, LONG, Defaults.ConsumerQuotaBytesPerSecondDefault, atLeast(1), HIGH, ConsumerQuotaBytesPerSecondDefaultDoc)
.define(NumQuotaSamplesProp, INT, Defaults.NumQuotaSamples, atLeast(1), LOW, NumQuotaSamplesDoc) .define(NumQuotaSamplesProp, INT, Defaults.NumQuotaSamples, atLeast(1), LOW, NumQuotaSamplesDoc)
.define(NumReplicationQuotaSamplesProp, INT, Defaults.NumReplicationQuotaSamples, atLeast(1), LOW, NumReplicationQuotaSamplesDoc) .define(NumReplicationQuotaSamplesProp, INT, Defaults.NumReplicationQuotaSamples, atLeast(1), LOW, NumReplicationQuotaSamplesDoc)
.define(NumAlterLogDirsReplicationQuotaSamplesProp, INT, Defaults.NumAlterLogDirsReplicationQuotaSamples, atLeast(1), LOW, NumAlterLogDirsReplicationQuotaSamplesDoc) .define(NumAlterLogDirsReplicationQuotaSamplesProp, INT, Defaults.NumAlterLogDirsReplicationQuotaSamples, atLeast(1), LOW, NumAlterLogDirsReplicationQuotaSamplesDoc)
@ -1742,8 +1732,6 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
def passwordEncoderIterations = getInt(KafkaConfig.PasswordEncoderIterationsProp) def passwordEncoderIterations = getInt(KafkaConfig.PasswordEncoderIterationsProp)
/** ********* Quota Configuration **************/ /** ********* Quota Configuration **************/
val producerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp)
val consumerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp)
val numQuotaSamples = getInt(KafkaConfig.NumQuotaSamplesProp) val numQuotaSamples = getInt(KafkaConfig.NumQuotaSamplesProp)
val quotaWindowSizeSeconds = getInt(KafkaConfig.QuotaWindowSizeSecondsProp) val quotaWindowSizeSeconds = getInt(KafkaConfig.QuotaWindowSizeSecondsProp)
val numReplicationQuotaSamples = getInt(KafkaConfig.NumReplicationQuotaSamplesProp) val numReplicationQuotaSamples = getInt(KafkaConfig.NumReplicationQuotaSamplesProp)

View File

@ -76,9 +76,9 @@ object QuotaFactory extends Logging {
val clientQuotaCallback = Option(cfg.getConfiguredInstance(KafkaConfig.ClientQuotaCallbackClassProp, val clientQuotaCallback = Option(cfg.getConfiguredInstance(KafkaConfig.ClientQuotaCallbackClassProp,
classOf[ClientQuotaCallback])) classOf[ClientQuotaCallback]))
QuotaManagers( QuotaManagers(
new ClientQuotaManager(clientFetchConfig(cfg), metrics, Fetch, time, threadNamePrefix, clientQuotaCallback), new ClientQuotaManager(clientConfig(cfg), metrics, Fetch, time, threadNamePrefix, clientQuotaCallback),
new ClientQuotaManager(clientProduceConfig(cfg), metrics, Produce, time, threadNamePrefix, clientQuotaCallback), new ClientQuotaManager(clientConfig(cfg), metrics, Produce, time, threadNamePrefix, clientQuotaCallback),
new ClientRequestQuotaManager(clientRequestConfig(cfg), metrics, time, threadNamePrefix, clientQuotaCallback), new ClientRequestQuotaManager(clientConfig(cfg), metrics, time, threadNamePrefix, clientQuotaCallback),
new ControllerMutationQuotaManager(clientControllerMutationConfig(cfg), metrics, time, new ControllerMutationQuotaManager(clientControllerMutationConfig(cfg), metrics, time,
threadNamePrefix, clientQuotaCallback), threadNamePrefix, clientQuotaCallback),
new ReplicationQuotaManager(replicationConfig(cfg), metrics, LeaderReplication, time), new ReplicationQuotaManager(replicationConfig(cfg), metrics, LeaderReplication, time),
@ -88,27 +88,7 @@ object QuotaFactory extends Logging {
) )
} }
def clientProduceConfig(cfg: KafkaConfig): ClientQuotaManagerConfig = { def clientConfig(cfg: KafkaConfig): ClientQuotaManagerConfig = {
if (cfg.producerQuotaBytesPerSecondDefault != Long.MaxValue)
warn(s"${KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp} has been deprecated in 0.11.0.0 and will be removed in a future release. Use dynamic quota defaults instead.")
ClientQuotaManagerConfig(
quotaDefault = cfg.producerQuotaBytesPerSecondDefault,
numQuotaSamples = cfg.numQuotaSamples,
quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds
)
}
def clientFetchConfig(cfg: KafkaConfig): ClientQuotaManagerConfig = {
if (cfg.consumerQuotaBytesPerSecondDefault != Long.MaxValue)
warn(s"${KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp} has been deprecated in 0.11.0.0 and will be removed in a future release. Use dynamic quota defaults instead.")
ClientQuotaManagerConfig(
quotaDefault = cfg.consumerQuotaBytesPerSecondDefault,
numQuotaSamples = cfg.numQuotaSamples,
quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds
)
}
def clientRequestConfig(cfg: KafkaConfig): ClientQuotaManagerConfig = {
ClientQuotaManagerConfig( ClientQuotaManagerConfig(
numQuotaSamples = cfg.numQuotaSamples, numQuotaSamples = cfg.numQuotaSamples,
quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds

View File

@ -14,7 +14,7 @@
package kafka.api package kafka.api
import kafka.server.{KafkaConfig, KafkaServer} import kafka.server.KafkaServer
import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.BeforeEach
@ -25,9 +25,14 @@ class ClientIdQuotaTest extends BaseQuotaTest {
@BeforeEach @BeforeEach
override def setUp(): Unit = { override def setUp(): Unit = {
this.serverConfig.setProperty(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, defaultProducerQuota.toString)
this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, defaultConsumerQuota.toString)
super.setUp() super.setUp()
quotaTestClients.alterClientQuotas(
quotaTestClients.clientQuotaAlteration(
quotaTestClients.clientQuotaEntity(None, Some(QuotaTestClients.DefaultEntity)),
Some(defaultProducerQuota), Some(defaultConsumerQuota), Some(defaultRequestQuota)
)
)
quotaTestClients.waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
} }
override def createQuotaTestClients(topic: String, leaderNode: KafkaServer): QuotaTestClients = { override def createQuotaTestClients(topic: String, leaderNode: KafkaServer): QuotaTestClients = {

View File

@ -63,8 +63,6 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
@BeforeEach @BeforeEach
override def setUp(): Unit = { override def setUp(): Unit = {
startSasl(jaasSections(kafkaServerSaslMechanisms, Some("SCRAM-SHA-256"), KafkaSasl, JaasTestUtils.KafkaServerContextName)) startSasl(jaasSections(kafkaServerSaslMechanisms, Some("SCRAM-SHA-256"), KafkaSasl, JaasTestUtils.KafkaServerContextName))
this.serverConfig.setProperty(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
this.serverConfig.setProperty(KafkaConfig.ClientQuotaCallbackClassProp, classOf[GroupedUserQuotaCallback].getName) this.serverConfig.setProperty(KafkaConfig.ClientQuotaCallbackClassProp, classOf[GroupedUserQuotaCallback].getName)
this.serverConfig.setProperty(s"${listenerName.configPrefix}${KafkaConfig.PrincipalBuilderClassProp}", this.serverConfig.setProperty(s"${listenerName.configPrefix}${KafkaConfig.PrincipalBuilderClassProp}",
classOf[GroupedUserPrincipalBuilder].getName) classOf[GroupedUserPrincipalBuilder].getName)

View File

@ -32,8 +32,6 @@ class UserClientIdQuotaTest extends BaseQuotaTest {
@BeforeEach @BeforeEach
override def setUp(): Unit = { override def setUp(): Unit = {
this.serverConfig.setProperty(KafkaConfig.SslClientAuthProp, "required") this.serverConfig.setProperty(KafkaConfig.SslClientAuthProp, "required")
this.serverConfig.setProperty(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
super.setUp() super.setUp()
quotaTestClients.alterClientQuotas( quotaTestClients.alterClientQuotas(
quotaTestClients.clientQuotaAlteration( quotaTestClients.clientQuotaAlteration(

View File

@ -16,7 +16,7 @@ package kafka.api
import java.io.File import java.io.File
import kafka.server.{KafkaConfig, KafkaServer} import kafka.server.KafkaServer
import kafka.utils.JaasTestUtils import kafka.utils.JaasTestUtils
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.junit.jupiter.api.{AfterEach, BeforeEach} import org.junit.jupiter.api.{AfterEach, BeforeEach}
@ -33,8 +33,6 @@ class UserQuotaTest extends BaseQuotaTest with SaslSetup {
@BeforeEach @BeforeEach
override def setUp(): Unit = { override def setUp(): Unit = {
startSasl(jaasSections(kafkaServerSaslMechanisms, Some("GSSAPI"), KafkaSasl, JaasTestUtils.KafkaServerContextName)) startSasl(jaasSections(kafkaServerSaslMechanisms, Some("GSSAPI"), KafkaSasl, JaasTestUtils.KafkaServerContextName))
this.serverConfig.setProperty(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
super.setUp() super.setUp()
quotaTestClients.alterClientQuotas( quotaTestClients.alterClientQuotas(
quotaTestClients.clientQuotaAlteration( quotaTestClients.clientQuotaAlteration(

View File

@ -26,7 +26,6 @@ import java.time.Duration
import java.util import java.util
import java.util.{Collections, Properties} import java.util.{Collections, Properties}
import java.util.concurrent._ import java.util.concurrent._
import javax.management.ObjectName import javax.management.ObjectName
import com.yammer.metrics.core.MetricName import com.yammer.metrics.core.MetricName
import kafka.admin.ConfigCommand import kafka.admin.ConfigCommand
@ -52,6 +51,7 @@ import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.config.provider.FileConfigProvider import org.apache.kafka.common.config.provider.FileConfigProvider
import org.apache.kafka.common.errors.{AuthenticationException, InvalidRequestException} import org.apache.kafka.common.errors.{AuthenticationException, InvalidRequestException}
import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.Quota
import org.apache.kafka.common.metrics.{KafkaMetric, MetricsReporter} import org.apache.kafka.common.metrics.{KafkaMetric, MetricsReporter}
import org.apache.kafka.common.network.{ListenerName, Mode} import org.apache.kafka.common.network.{ListenerName, Mode}
import org.apache.kafka.common.network.CertStores.{KEYSTORE_PROPS, TRUSTSTORE_PROPS} import org.apache.kafka.common.network.CertStores.{KEYSTORE_PROPS, TRUSTSTORE_PROPS}
@ -121,7 +121,6 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
props.put(KafkaConfig.SaslEnabledMechanismsProp, kafkaServerSaslMechanisms.mkString(",")) props.put(KafkaConfig.SaslEnabledMechanismsProp, kafkaServerSaslMechanisms.mkString(","))
props.put(KafkaConfig.LogSegmentBytesProp, "2000") // low value to test log rolling on config update props.put(KafkaConfig.LogSegmentBytesProp, "2000") // low value to test log rolling on config update
props.put(KafkaConfig.NumReplicaFetchersProp, "2") // greater than one to test reducing threads props.put(KafkaConfig.NumReplicaFetchersProp, "2") // greater than one to test reducing threads
props.put(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, "10000000") // non-default value to trigger a new metric
props.put(KafkaConfig.PasswordEncoderSecretProp, "dynamic-config-secret") props.put(KafkaConfig.PasswordEncoderSecretProp, "dynamic-config-secret")
props.put(KafkaConfig.LogRetentionTimeMillisProp, 1680000000.toString) props.put(KafkaConfig.LogRetentionTimeMillisProp, 1680000000.toString)
props.put(KafkaConfig.LogRetentionTimeHoursProp, 168.toString) props.put(KafkaConfig.LogRetentionTimeHoursProp, 168.toString)
@ -870,7 +869,12 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
} }
assertEquals(servers.map(_.config.brokerId).toSet, TestMetricsReporter.configuredBrokers.toSet) assertEquals(servers.map(_.config.brokerId).toSet, TestMetricsReporter.configuredBrokers.toSet)
// non-default value to trigger a new metric
val clientId = "test-client-1" val clientId = "test-client-1"
servers.foreach { server =>
server.quotaManagers.produce.updateQuota(None, Some(clientId), Some(clientId),
Some(Quota.upperBound(10000000)))
}
val (producerThread, consumerThread) = startProduceConsume(retries = 0, clientId) val (producerThread, consumerThread) = startProduceConsume(retries = 0, clientId)
TestUtils.waitUntilTrue(() => consumerThread.received >= 5, "Messages not sent") TestUtils.waitUntilTrue(() => consumerThread.received >= 5, "Messages not sent")

View File

@ -28,7 +28,7 @@ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
private val config = ClientQuotaManagerConfig(quotaDefault = 500) private val config = ClientQuotaManagerConfig()
private def testQuotaParsing(config: ClientQuotaManagerConfig, client1: UserClient, client2: UserClient, randomClient: UserClient, defaultConfigClient: UserClient): Unit = { private def testQuotaParsing(config: ClientQuotaManagerConfig, client1: UserClient, client2: UserClient, randomClient: UserClient, defaultConfigClient: UserClient): Unit = {
val clientQuotaManager = new ClientQuotaManager(config, metrics, Produce, time, "") val clientQuotaManager = new ClientQuotaManager(config, metrics, Produce, time, "")
@ -38,10 +38,12 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
clientQuotaManager.updateQuota(client1.configUser, client1.configClientId, client1.sanitizedConfigClientId, Some(new Quota(2000, true))) clientQuotaManager.updateQuota(client1.configUser, client1.configClientId, client1.sanitizedConfigClientId, Some(new Quota(2000, true)))
clientQuotaManager.updateQuota(client2.configUser, client2.configClientId, client2.sanitizedConfigClientId, Some(new Quota(4000, true))) clientQuotaManager.updateQuota(client2.configUser, client2.configClientId, client2.sanitizedConfigClientId, Some(new Quota(4000, true)))
assertEquals(config.quotaDefault.toDouble, assertEquals(Long.MaxValue.toDouble, clientQuotaManager.quota(randomClient.user, randomClient.clientId).bound, 0.0,
clientQuotaManager.quota(randomClient.user, randomClient.clientId).bound, 0.0, "Default producer quota should be " + config.quotaDefault) "Default producer quota should be " + Long.MaxValue.toDouble)
assertEquals(2000, clientQuotaManager.quota(client1.user, client1.clientId).bound, 0.0, "Should return the overridden value (2000)") assertEquals(2000, clientQuotaManager.quota(client1.user, client1.clientId).bound, 0.0,
assertEquals(4000, clientQuotaManager.quota(client2.user, client2.clientId).bound, 0.0, "Should return the overridden value (4000)") "Should return the overridden value (2000)")
assertEquals(4000, clientQuotaManager.quota(client2.user, client2.clientId).bound, 0.0,
"Should return the overridden value (4000)")
// p1 should be throttled using the overridden quota // p1 should be throttled using the overridden quota
var throttleTimeMs = maybeRecord(clientQuotaManager, client1.user, client1.clientId, 2500 * config.numQuotaSamples) var throttleTimeMs = maybeRecord(clientQuotaManager, client1.user, client1.clientId, 2500 * config.numQuotaSamples)
@ -98,7 +100,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
val client2 = UserClient("User2", "p2", Some("User2"), None) val client2 = UserClient("User2", "p2", Some("User2"), None)
val randomClient = UserClient("RandomUser", "random-client-id", None, None) val randomClient = UserClient("RandomUser", "random-client-id", None, None)
val defaultConfigClient = UserClient("", "", Some(ConfigEntityName.Default), None) val defaultConfigClient = UserClient("", "", Some(ConfigEntityName.Default), None)
val config = ClientQuotaManagerConfig(quotaDefault = Long.MaxValue) val config = ClientQuotaManagerConfig()
testQuotaParsing(config, client1, client2, randomClient, defaultConfigClient) testQuotaParsing(config, client1, client2, randomClient, defaultConfigClient)
} }
@ -112,7 +114,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
val client2 = UserClient("User2", "p2", Some("User2"), Some("p2")) val client2 = UserClient("User2", "p2", Some("User2"), Some("p2"))
val randomClient = UserClient("RandomUser", "random-client-id", None, None) val randomClient = UserClient("RandomUser", "random-client-id", None, None)
val defaultConfigClient = UserClient("", "", Some(ConfigEntityName.Default), Some(ConfigEntityName.Default)) val defaultConfigClient = UserClient("", "", Some(ConfigEntityName.Default), Some(ConfigEntityName.Default))
val config = ClientQuotaManagerConfig(quotaDefault = Long.MaxValue) val config = ClientQuotaManagerConfig()
testQuotaParsing(config, client1, client2, randomClient, defaultConfigClient) testQuotaParsing(config, client1, client2, randomClient, defaultConfigClient)
} }
@ -158,7 +160,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
@Test @Test
def testGetMaxValueInQuotaWindowWithNonDefaultQuotaWindow(): Unit = { def testGetMaxValueInQuotaWindowWithNonDefaultQuotaWindow(): Unit = {
val numFullQuotaWindows = 3 // 3 seconds window (vs. 10 seconds default) val numFullQuotaWindows = 3 // 3 seconds window (vs. 10 seconds default)
val nonDefaultConfig = ClientQuotaManagerConfig(quotaDefault = Long.MaxValue, numQuotaSamples = numFullQuotaWindows + 1) val nonDefaultConfig = ClientQuotaManagerConfig(numQuotaSamples = numFullQuotaWindows + 1)
val clientQuotaManager = new ClientQuotaManager(nonDefaultConfig, metrics, Fetch, time, "") val clientQuotaManager = new ClientQuotaManager(nonDefaultConfig, metrics, Fetch, time, "")
val userSession = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "userA"), InetAddress.getLocalHost) val userSession = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "userA"), InetAddress.getLocalHost)
@ -177,7 +179,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
@Test @Test
def testSetAndRemoveDefaultUserQuota(): Unit = { def testSetAndRemoveDefaultUserQuota(): Unit = {
// quotaTypesEnabled will be QuotaTypes.NoQuotas initially // quotaTypesEnabled will be QuotaTypes.NoQuotas initially
val clientQuotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(quotaDefault = Long.MaxValue), val clientQuotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(),
metrics, Produce, time, "") metrics, Produce, time, "")
try { try {
@ -199,7 +201,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
@Test @Test
def testSetAndRemoveUserQuota(): Unit = { def testSetAndRemoveUserQuota(): Unit = {
// quotaTypesEnabled will be QuotaTypes.NoQuotas initially // quotaTypesEnabled will be QuotaTypes.NoQuotas initially
val clientQuotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(quotaDefault = Long.MaxValue), val clientQuotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(),
metrics, Produce, time, "") metrics, Produce, time, "")
try { try {
@ -218,7 +220,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
@Test @Test
def testSetAndRemoveUserClientQuota(): Unit = { def testSetAndRemoveUserClientQuota(): Unit = {
// quotaTypesEnabled will be QuotaTypes.NoQuotas initially // quotaTypesEnabled will be QuotaTypes.NoQuotas initially
val clientQuotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(quotaDefault = Long.MaxValue), val clientQuotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(),
metrics, Produce, time, "") metrics, Produce, time, "")
try { try {
@ -236,7 +238,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
@Test @Test
def testQuotaConfigPrecedence(): Unit = { def testQuotaConfigPrecedence(): Unit = {
val clientQuotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(quotaDefault=Long.MaxValue), val clientQuotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(),
metrics, Produce, time, "") metrics, Produce, time, "")
try { try {
@ -303,6 +305,9 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
val clientQuotaManager = new ClientQuotaManager(config, metrics, Produce, time, "") val clientQuotaManager = new ClientQuotaManager(config, metrics, Produce, time, "")
val queueSizeMetric = metrics.metrics().get(metrics.metricName("queue-size", "Produce", "")) val queueSizeMetric = metrics.metrics().get(metrics.metricName("queue-size", "Produce", ""))
try { try {
clientQuotaManager.updateQuota(None, Some(ConfigEntityName.Default), Some(ConfigEntityName.Default),
Some(new Quota(500, true)))
// We have 10 second windows. Make sure that there is no quota violation // We have 10 second windows. Make sure that there is no quota violation
// if we produce under the quota // if we produce under the quota
for (_ <- 0 until 10) { for (_ <- 0 until 10) {
@ -348,6 +353,9 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
def testExpireThrottleTimeSensor(): Unit = { def testExpireThrottleTimeSensor(): Unit = {
val clientQuotaManager = new ClientQuotaManager(config, metrics, Produce, time, "") val clientQuotaManager = new ClientQuotaManager(config, metrics, Produce, time, "")
try { try {
clientQuotaManager.updateQuota(None, Some(ConfigEntityName.Default), Some(ConfigEntityName.Default),
Some(new Quota(500, true)))
maybeRecord(clientQuotaManager, "ANONYMOUS", "client1", 100) maybeRecord(clientQuotaManager, "ANONYMOUS", "client1", 100)
// remove the throttle time sensor // remove the throttle time sensor
metrics.removeSensor("ProduceThrottleTime-:client1") metrics.removeSensor("ProduceThrottleTime-:client1")
@ -367,6 +375,9 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
def testExpireQuotaSensors(): Unit = { def testExpireQuotaSensors(): Unit = {
val clientQuotaManager = new ClientQuotaManager(config, metrics, Produce, time, "") val clientQuotaManager = new ClientQuotaManager(config, metrics, Produce, time, "")
try { try {
clientQuotaManager.updateQuota(None, Some(ConfigEntityName.Default), Some(ConfigEntityName.Default),
Some(new Quota(500, true)))
maybeRecord(clientQuotaManager, "ANONYMOUS", "client1", 100) maybeRecord(clientQuotaManager, "ANONYMOUS", "client1", 100)
// remove all the sensors // remove all the sensors
metrics.removeSensor("ProduceThrottleTime-:client1") metrics.removeSensor("ProduceThrottleTime-:client1")
@ -391,6 +402,9 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
val clientQuotaManager = new ClientQuotaManager(config, metrics, Produce, time, "") val clientQuotaManager = new ClientQuotaManager(config, metrics, Produce, time, "")
val clientId = "client@#$%" val clientId = "client@#$%"
try { try {
clientQuotaManager.updateQuota(None, Some(ConfigEntityName.Default), Some(ConfigEntityName.Default),
Some(new Quota(500, true)))
maybeRecord(clientQuotaManager, "ANONYMOUS", clientId, 100) maybeRecord(clientQuotaManager, "ANONYMOUS", clientId, 100)
// The metrics should use the raw client ID, even if the reporters internally sanitize them // The metrics should use the raw client ID, even if the reporters internally sanitize them

View File

@ -743,8 +743,6 @@ class KafkaConfigTest {
case KafkaConfig.TransactionsTopicPartitionsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2") case KafkaConfig.TransactionsTopicPartitionsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
case KafkaConfig.TransactionsTopicSegmentBytesProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2") case KafkaConfig.TransactionsTopicSegmentBytesProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
case KafkaConfig.TransactionsTopicReplicationFactorProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2") case KafkaConfig.TransactionsTopicReplicationFactorProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0", "-2")
case KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.NumQuotaSamplesProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") case KafkaConfig.NumQuotaSamplesProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.QuotaWindowSizeSecondsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") case KafkaConfig.QuotaWindowSizeSecondsProp => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case KafkaConfig.DeleteTopicEnableProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean", "0") case KafkaConfig.DeleteTopicEnableProp => assertPropertyInvalid(baseProperties, name, "not_a_boolean", "0")

View File

@ -53,6 +53,8 @@
<li>The <code>MessageFormatter.init(Properties)</code> method was removed. Please use <code>configure(Map)</code> instead.</li> <li>The <code>MessageFormatter.init(Properties)</code> method was removed. Please use <code>configure(Map)</code> instead.</li>
<li>Kafka Streams no longer has a compile time dependency on "connect:json" module (<a href="https://issues.apache.org/jira/browse/KAFKA-5146">KAFKA-5146</a>). <li>Kafka Streams no longer has a compile time dependency on "connect:json" module (<a href="https://issues.apache.org/jira/browse/KAFKA-5146">KAFKA-5146</a>).
Projects that were relying on this transitive dependency will have to explicitly declare it.</li> Projects that were relying on this transitive dependency will have to explicitly declare it.</li>
<li>The deprecated <code>quota.producer.default</code> and <code>quota.consumer.default</code> configurations were removed (<a href="https://issues.apache.org/jira/browse/KAFKA-12591">KAFKA-12591</a>).
Dynamic quota defaults must be used instead.</li>
</ul> </ul>
</ul> </ul>