KAFKA-19282: Update quotaTypesEnabled on quota removal in ClientQuotaManager (#19742)
CI / build (push) Waiting to run Details

In `kafka.server.ClientQuotaManager` class, `quotaTypesEnabled` is not updated when a quota is removed via `removeQuota` method in `DefaultQuotaCallback` class. This field is set when quotas are added in `updateQuota` but it's never changed or cleared. So in case all the quotas have been removed dynamically, the system may incorrectly assume the quotas are active, which leads to unnecessary metric creation or updates until the broker is restarted.

Reviewers: Jonah Hooper <jhooper@confluent.io>, Hailey Ni <hni@confluent.io>, Alyssa Huang <ahuang@confluent.io>, David Jacot <djacot@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>
This commit is contained in:
Mahsa Seifikar 2025-06-25 16:29:46 -04:00 committed by GitHub
parent 23ddb1d8ac
commit 7aaba96cc1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 209 additions and 16 deletions

View File

@ -155,9 +155,12 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
case None => new DefaultQuotaCallback case None => new DefaultQuotaCallback
} }
private val clientQuotaType = QuotaType.toClientQuotaType(quotaType) private val clientQuotaType = QuotaType.toClientQuotaType(quotaType)
private val activeQuotaEntities = new ConcurrentHashMap[Int, Int]() // Key is QuotaTypes, value is the number of active quota entities of that type
@volatile @volatile
private var quotaTypesEnabled = clientQuotaCallbackPlugin match { var quotaTypesEnabled = clientQuotaCallbackPlugin match {
case Some(_) => QuotaTypes.CustomQuotas case Some(_) => QuotaTypes.CustomQuotas
case None => QuotaTypes.NoQuotas case None => QuotaTypes.NoQuotas
} }
@ -194,9 +197,6 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
/** /**
* Returns true if any quotas are enabled for this quota manager. This is used * Returns true if any quotas are enabled for this quota manager. This is used
* to determine if quota related metrics should be created. * to determine if quota related metrics should be created.
* Note: If any quotas (static defaults, dynamic defaults or quota overrides) have
* been configured for this broker at any time for this quota type, quotasEnabled will
* return true until the next broker restart, even if all quotas are subsequently deleted.
*/ */
def quotasEnabled: Boolean = quotaTypesEnabled != QuotaTypes.NoQuotas def quotasEnabled: Boolean = quotaTypesEnabled != QuotaTypes.NoQuotas
@ -428,18 +428,16 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
try { try {
val quotaEntity = KafkaQuotaEntity(userEntity, clientEntity) val quotaEntity = KafkaQuotaEntity(userEntity, clientEntity)
if (userEntity.nonEmpty) {
if (quotaEntity.clientIdEntity.nonEmpty)
quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
else
quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
} else if (clientEntity.nonEmpty)
quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
quota match { quota match {
case Some(newQuota) => quotaCallback.updateQuota(clientQuotaType, quotaEntity, newQuota.bound) case Some(newQuota) =>
case None => quotaCallback.removeQuota(clientQuotaType, quotaEntity) updateQuotaTypes(quotaEntity, shouldAdd = true)
quotaCallback.updateQuota(clientQuotaType, quotaEntity, newQuota.bound)
case None =>
updateQuotaTypes(quotaEntity, shouldAdd = false)
quotaCallback.removeQuota(clientQuotaType, quotaEntity)
} }
val updatedEntity = if (userEntity.contains(DefaultUserEntity) || clientEntity.contains(DefaultClientIdEntity)) val updatedEntity = if (userEntity.contains(DefaultUserEntity) || clientEntity.contains(DefaultClientIdEntity))
None // more than one entity may need updating, so `updateQuotaMetricConfigs` will go through all metrics None // more than one entity may need updating, so `updateQuotaMetricConfigs` will go through all metrics
else else
@ -451,6 +449,60 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
} }
} }
/**
* Updates `quotaTypesEnabled` by performing a bitwise OR operation to combine the enabled quota types.
* This method ensures that the `quotaTypesEnabled` field reflects the active quota types based on the
* current state of `activeQuotaEntities`.
* For example:
* - If UserQuotaEnabled = 2 and ClientIdQuotaEnabled = 1, then quotaTypesEnabled = 3 (2 | 1 = 3)
* - If UserClientIdQuotaEnabled = 4 and UserQuotaEnabled = 1, then quotaTypesEnabled = (4 | 1 = 5)
* - If UserClientIdQuotaEnabled = 4 and ClientIdQuotaEnabled = 2, then quotaTypesEnabled = 6 (4 | 2 = 6)
* - If all three are enabled (1 | 2 | 4), then quotaTypesEnabled = 7
*
* @param quotaEntity The entity for which the quota is being updated, which can be a combination of user and client-id.
* @param shouldAdd A boolean indicating whether to add or remove the quota entity.
*/
private def updateQuotaTypes(quotaEntity: KafkaQuotaEntity, shouldAdd: Boolean): Unit = {
if (quotaTypesEnabled == QuotaTypes.CustomQuotas) {
// If custom quotas are enabled, we do not need to update quota types
return
}
val isActive = quotaCallback match {
case callback: DefaultQuotaCallback => callback.getActiveQuotasEntities.contains(quotaEntity)
case _ => true
}
val activeQuotaType = quotaEntity match {
case KafkaQuotaEntity(Some(_), Some(_)) => QuotaTypes.UserClientIdQuotaEnabled
case KafkaQuotaEntity(Some(_), None) => QuotaTypes.UserQuotaEnabled
case KafkaQuotaEntity(None, Some(_)) => QuotaTypes.ClientIdQuotaEnabled
case _ => QuotaTypes.NoQuotas
}
if (shouldAdd && !isActive) {
activeQuotaEntities.compute(activeQuotaType, (_, currentValue) => if (currentValue == 0) 1 else currentValue + 1)
quotaTypesEnabled |= activeQuotaType
} else if (!shouldAdd && isActive) {
activeQuotaEntities.compute(activeQuotaType, (_, currentValue) => if (currentValue <= 1) 0 else currentValue - 1)
if (activeQuotaEntities.get(activeQuotaType) == 0) {
quotaTypesEnabled &= ~activeQuotaType
}
}
val quotaTypes = List(
QuotaTypes.UserClientIdQuotaEnabled -> "UserClientIdQuota",
QuotaTypes.ClientIdQuotaEnabled -> "ClientIdQuota",
QuotaTypes.UserQuotaEnabled -> "UserQuota"
)
val activeEntities = quotaTypes.collect {
case (k, name) if activeQuotaEntities.get(k) > 0 => name
}.mkString(", ")
info(s"Quota types enabled has been changed to $quotaTypesEnabled with active quota entities: [$activeEntities]")
}
/** /**
* Updates metrics configs. This is invoked when quota configs are updated when partitions leaders change * Updates metrics configs. This is invoked when quota configs are updated when partitions leaders change
* and custom callbacks that implement partition-based quotas have updated quotas. * and custom callbacks that implement partition-based quotas have updated quotas.
@ -647,6 +699,9 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
Map(DefaultTags.User -> userTag, DefaultTags.ClientId -> clientIdTag) Map(DefaultTags.User -> userTag, DefaultTags.ClientId -> clientIdTag)
} }
def getActiveQuotasEntities: util.Set[ClientQuotaEntity] = {
overriddenQuotas.keySet()
}
override def close(): Unit = {} override def close(): Unit = {}
} }
} }

View File

@ -16,17 +16,21 @@
*/ */
package kafka.server package kafka.server
import kafka.server.ClientQuotaManager.BaseUserEntity import kafka.server.ClientQuotaManager.{BaseUserEntity, KafkaQuotaEntity}
import org.apache.kafka.common.Cluster
import java.net.InetAddress import java.net.InetAddress
import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.metrics.Quota import org.apache.kafka.common.metrics.Quota
import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.server.config.ClientQuotaManagerConfig import org.apache.kafka.server.config.ClientQuotaManagerConfig
import org.apache.kafka.network.Session import org.apache.kafka.network.Session
import org.apache.kafka.server.quota.QuotaType import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaType, QuotaType}
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import java.util.{Collections, Map, HashMap}
class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
private val config = new ClientQuotaManagerConfig() private val config = new ClientQuotaManagerConfig()
@ -501,6 +505,140 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
} }
} }
@Test
def testQuotaTypesEnabledUpdatesWithDefaultCallback(): Unit = {
val clientQuotaManager = new ClientQuotaManager(config, metrics, QuotaType.CONTROLLER_MUTATION, time, "")
try {
// Initially, quotaTypesEnabled should be QuotaTypes.NoQuotas and quotasEnabled should be false
assertEquals(QuotaTypes.NoQuotas, clientQuotaManager.quotaTypesEnabled)
assertFalse(clientQuotaManager.quotasEnabled)
// Add a client-id quota, quotaTypesEnabled should be QuotaTypes.ClientIdQuotaEnabled
clientQuotaManager.updateQuota(None, Some(ClientQuotaManager.ClientIdEntity("client1")), Some(new Quota(5, true)))
assertEquals(QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.quotaTypesEnabled)
assertTrue(clientQuotaManager.quotasEnabled)
// Add a user quota, quotaTypesEnabled should be QuotaTypes.UserQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled
clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")), None, Some(new Quota(5, true)))
assertEquals(QuotaTypes.UserQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.quotaTypesEnabled)
assertTrue(clientQuotaManager.quotasEnabled)
// Add a duplicate client-id quota, quotaTypesEnabled should remain unchanged
clientQuotaManager.updateQuota(None, Some(ClientQuotaManager.ClientIdEntity("client2")), Some(new Quota(5, true)))
assertEquals(QuotaTypes.UserQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.quotaTypesEnabled)
assertTrue(clientQuotaManager.quotasEnabled)
// Add duplicate user quota, quotaTypesEnabled should remain unchanged
clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userB")), None, Some(new Quota(5, true)))
assertEquals(QuotaTypes.UserQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.quotaTypesEnabled)
assertTrue(clientQuotaManager.quotasEnabled)
// Add a user-client-id quota, quotaTypesEnabled should be QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled
clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")), Some(ClientQuotaManager.ClientIdEntity("client1")), Some(new Quota(10, true)))
assertEquals(QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled, clientQuotaManager.quotaTypesEnabled)
assertTrue(clientQuotaManager.quotasEnabled)
// Add Duplicate user-client-id quota, quotaTypesEnabled should remain unchanged
clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")), Some(ClientQuotaManager.ClientIdEntity("client1")), Some(new Quota(12, true)))
assertEquals(QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled, clientQuotaManager.quotaTypesEnabled)
assertTrue(clientQuotaManager.quotasEnabled)
// Remove the first user quota, quotaTypesEnabled should remain unchanged
clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")), None, None)
assertEquals(QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled, clientQuotaManager.quotaTypesEnabled)
assertTrue(clientQuotaManager.quotasEnabled)
// Remove the second user quota, quotaTypesEnabled should be QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled
clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userB")), None, None)
assertEquals(QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.quotaTypesEnabled)
assertTrue(clientQuotaManager.quotasEnabled)
// Remove the first client-id quota, quotaTypesEnabled should remain unchanged
clientQuotaManager.updateQuota(None, Some(ClientQuotaManager.ClientIdEntity("client1")), None)
assertEquals(QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.quotaTypesEnabled)
assertTrue(clientQuotaManager.quotasEnabled)
// Remove the second client-id quota, quotaTypesEnabled should be QuotaTypes.UserClientIdQuotaEnabled
clientQuotaManager.updateQuota(None, Some(ClientQuotaManager.ClientIdEntity("client2")), None)
assertEquals(QuotaTypes.UserClientIdQuotaEnabled, clientQuotaManager.quotaTypesEnabled)
assertTrue(clientQuotaManager.quotasEnabled)
// Remove the first user-client-id quota, quotaTypesEnabled should be noQuotas as both user-client-id quotas has the same user client but different quota
clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")), Some(ClientQuotaManager.ClientIdEntity("client1")), None)
assertEquals(QuotaTypes.NoQuotas, clientQuotaManager.quotaTypesEnabled)
assertFalse(clientQuotaManager.quotasEnabled)
// Remove the second user-client-id quota, quotaTypesEnabled should be QuotaTypes.NoQuotas and quotasEnabled should be false
clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")), Some(ClientQuotaManager.ClientIdEntity("client1")), None)
assertEquals(QuotaTypes.NoQuotas, clientQuotaManager.quotaTypesEnabled)
assertFalse(clientQuotaManager.quotasEnabled)
} finally {
clientQuotaManager.shutdown()
}
}
@Test
def testQuotaTypesEnabledUpdatesWithCustomCallback(): Unit = {
val customQuotaCallback = new ClientQuotaCallback {
val quotas = new HashMap[ClientQuotaEntity, Quota]()
override def configure(configs: Map[String, _]): Unit = {}
override def quotaMetricTags(quotaType: ClientQuotaType, principal: KafkaPrincipal, clientId: String): Map[String, String] = Collections.emptyMap()
override def quotaLimit(quotaType: ClientQuotaType, metricTags: Map[String, String]): java.lang.Double = 1
override def updateClusterMetadata(cluster: Cluster): Boolean = false
override def updateQuota(quotaType: ClientQuotaType, entity: ClientQuotaEntity, newValue: Double): Unit = {
quotas.put(entity.asInstanceOf[KafkaQuotaEntity], new Quota(newValue.toLong, true))
}
override def removeQuota(quotaType: ClientQuotaType, entity: ClientQuotaEntity): Unit = {
quotas.remove(entity.asInstanceOf[KafkaQuotaEntity])
}
override def quotaResetRequired(quotaType: ClientQuotaType): Boolean = false
override def close(): Unit = {}
}
val clientQuotaManager = new ClientQuotaManager(
config = new ClientQuotaManagerConfig(),
metrics = metrics,
quotaType = QuotaType.CONTROLLER_MUTATION,
time = time,
threadNamePrefix = "",
clientQuotaCallbackPlugin = Some(Plugin.wrapInstance(customQuotaCallback, metrics, ""))
)
try {
// Initially, quotaTypesEnabled should be CustomQuotas and quotasEnabled should be true
assertEquals(QuotaTypes.CustomQuotas, clientQuotaManager.quotaTypesEnabled)
assertTrue(clientQuotaManager.quotasEnabled, "quotasEnabled should be true with custom callback")
// Add a client-id quota, quotaTypesEnabled should remain QuotaTypes.CustomQuotas
clientQuotaManager.updateQuota(None, Some(ClientQuotaManager.ClientIdEntity("client1")), Some(new Quota(12, true)))
assertEquals(QuotaTypes.CustomQuotas, clientQuotaManager.quotaTypesEnabled)
// Add a user quota, quotaTypesEnabled quotaTypesEnabled should remain QuotaTypes.CustomQuotas
clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")), None, Some(new Quota(12, true)))
assertEquals(QuotaTypes.CustomQuotas, clientQuotaManager.quotaTypesEnabled)
assertTrue(clientQuotaManager.quotasEnabled, "quotasEnabled should remain true")
// Add a user-client-id quota, quotaTypesEnabled should remain QuotaTypes.CustomQuotas
clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")), Some(ClientQuotaManager.ClientIdEntity("client1")), Some(new Quota(12, true)))
assertEquals(QuotaTypes.CustomQuotas, clientQuotaManager.quotaTypesEnabled)
assertTrue(clientQuotaManager.quotasEnabled, "quotasEnabled should remain true")
// Remove all quotas, quotaTypesEnabled should be QuotaTypes.CustomQuotas
clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")), Some(ClientQuotaManager.ClientIdEntity("client1")), None)
clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")), None, None)
clientQuotaManager.updateQuota(None, Some(ClientQuotaManager.ClientIdEntity("client1")), None)
assertEquals(QuotaTypes.CustomQuotas, clientQuotaManager.quotaTypesEnabled)
assertTrue(clientQuotaManager.quotasEnabled, "quotasEnabled should remain true")
} finally {
clientQuotaManager.shutdown()
}
}
private case class UserClient( private case class UserClient(
user: String, user: String,
clientId: String, clientId: String,