diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala index 894e410fc63..8bd35f2c7f0 100644 --- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala @@ -155,9 +155,12 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, case None => new DefaultQuotaCallback } private val clientQuotaType = QuotaType.toClientQuotaType(quotaType) + private val activeQuotaEntities = new ConcurrentHashMap[Int, Int]() // Key is QuotaTypes, value is the number of active quota entities of that type + + @volatile - private var quotaTypesEnabled = clientQuotaCallbackPlugin match { + var quotaTypesEnabled = clientQuotaCallbackPlugin match { case Some(_) => QuotaTypes.CustomQuotas case None => QuotaTypes.NoQuotas } @@ -194,9 +197,6 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, /** * Returns true if any quotas are enabled for this quota manager. This is used * to determine if quota related metrics should be created. - * Note: If any quotas (static defaults, dynamic defaults or quota overrides) have - * been configured for this broker at any time for this quota type, quotasEnabled will - * return true until the next broker restart, even if all quotas are subsequently deleted. */ def quotasEnabled: Boolean = quotaTypesEnabled != QuotaTypes.NoQuotas @@ -428,18 +428,16 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, try { val quotaEntity = KafkaQuotaEntity(userEntity, clientEntity) - if (userEntity.nonEmpty) { - if (quotaEntity.clientIdEntity.nonEmpty) - quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled - else - quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled - } else if (clientEntity.nonEmpty) - quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled - quota match { - case Some(newQuota) => quotaCallback.updateQuota(clientQuotaType, quotaEntity, newQuota.bound) - case None => quotaCallback.removeQuota(clientQuotaType, quotaEntity) + case Some(newQuota) => + updateQuotaTypes(quotaEntity, shouldAdd = true) + quotaCallback.updateQuota(clientQuotaType, quotaEntity, newQuota.bound) + + case None => + updateQuotaTypes(quotaEntity, shouldAdd = false) + quotaCallback.removeQuota(clientQuotaType, quotaEntity) } + val updatedEntity = if (userEntity.contains(DefaultUserEntity) || clientEntity.contains(DefaultClientIdEntity)) None // more than one entity may need updating, so `updateQuotaMetricConfigs` will go through all metrics else @@ -451,6 +449,60 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, } } + /** + * Updates `quotaTypesEnabled` by performing a bitwise OR operation to combine the enabled quota types. + * This method ensures that the `quotaTypesEnabled` field reflects the active quota types based on the + * current state of `activeQuotaEntities`. + * For example: + * - If UserQuotaEnabled = 2 and ClientIdQuotaEnabled = 1, then quotaTypesEnabled = 3 (2 | 1 = 3) + * - If UserClientIdQuotaEnabled = 4 and UserQuotaEnabled = 1, then quotaTypesEnabled = (4 | 1 = 5) + * - If UserClientIdQuotaEnabled = 4 and ClientIdQuotaEnabled = 2, then quotaTypesEnabled = 6 (4 | 2 = 6) + * - If all three are enabled (1 | 2 | 4), then quotaTypesEnabled = 7 + * + * @param quotaEntity The entity for which the quota is being updated, which can be a combination of user and client-id. + * @param shouldAdd A boolean indicating whether to add or remove the quota entity. + */ + private def updateQuotaTypes(quotaEntity: KafkaQuotaEntity, shouldAdd: Boolean): Unit = { + + if (quotaTypesEnabled == QuotaTypes.CustomQuotas) { + // If custom quotas are enabled, we do not need to update quota types + return + } + + val isActive = quotaCallback match { + case callback: DefaultQuotaCallback => callback.getActiveQuotasEntities.contains(quotaEntity) + case _ => true + } + + val activeQuotaType = quotaEntity match { + case KafkaQuotaEntity(Some(_), Some(_)) => QuotaTypes.UserClientIdQuotaEnabled + case KafkaQuotaEntity(Some(_), None) => QuotaTypes.UserQuotaEnabled + case KafkaQuotaEntity(None, Some(_)) => QuotaTypes.ClientIdQuotaEnabled + case _ => QuotaTypes.NoQuotas + } + + if (shouldAdd && !isActive) { + activeQuotaEntities.compute(activeQuotaType, (_, currentValue) => if (currentValue == 0) 1 else currentValue + 1) + quotaTypesEnabled |= activeQuotaType + } else if (!shouldAdd && isActive) { + activeQuotaEntities.compute(activeQuotaType, (_, currentValue) => if (currentValue <= 1) 0 else currentValue - 1) + if (activeQuotaEntities.get(activeQuotaType) == 0) { + quotaTypesEnabled &= ~activeQuotaType + } + } + + val quotaTypes = List( + QuotaTypes.UserClientIdQuotaEnabled -> "UserClientIdQuota", + QuotaTypes.ClientIdQuotaEnabled -> "ClientIdQuota", + QuotaTypes.UserQuotaEnabled -> "UserQuota" + ) + + val activeEntities = quotaTypes.collect { + case (k, name) if activeQuotaEntities.get(k) > 0 => name + }.mkString(", ") + info(s"Quota types enabled has been changed to $quotaTypesEnabled with active quota entities: [$activeEntities]") + } + /** * Updates metrics configs. This is invoked when quota configs are updated when partitions leaders change * and custom callbacks that implement partition-based quotas have updated quotas. @@ -647,6 +699,9 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, Map(DefaultTags.User -> userTag, DefaultTags.ClientId -> clientIdTag) } + def getActiveQuotasEntities: util.Set[ClientQuotaEntity] = { + overriddenQuotas.keySet() + } override def close(): Unit = {} } } diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala index 6c268d3c3fb..aee13e92bc0 100644 --- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala @@ -16,17 +16,21 @@ */ package kafka.server -import kafka.server.ClientQuotaManager.BaseUserEntity +import kafka.server.ClientQuotaManager.{BaseUserEntity, KafkaQuotaEntity} +import org.apache.kafka.common.Cluster import java.net.InetAddress +import org.apache.kafka.common.internals.Plugin import org.apache.kafka.common.metrics.Quota import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.server.config.ClientQuotaManagerConfig import org.apache.kafka.network.Session -import org.apache.kafka.server.quota.QuotaType +import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaType, QuotaType} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Test +import java.util.{Collections, Map, HashMap} + class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { private val config = new ClientQuotaManagerConfig() @@ -501,6 +505,140 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest { } } + @Test + def testQuotaTypesEnabledUpdatesWithDefaultCallback(): Unit = { + val clientQuotaManager = new ClientQuotaManager(config, metrics, QuotaType.CONTROLLER_MUTATION, time, "") + try { + // Initially, quotaTypesEnabled should be QuotaTypes.NoQuotas and quotasEnabled should be false + assertEquals(QuotaTypes.NoQuotas, clientQuotaManager.quotaTypesEnabled) + assertFalse(clientQuotaManager.quotasEnabled) + + // Add a client-id quota, quotaTypesEnabled should be QuotaTypes.ClientIdQuotaEnabled + clientQuotaManager.updateQuota(None, Some(ClientQuotaManager.ClientIdEntity("client1")), Some(new Quota(5, true))) + assertEquals(QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.quotaTypesEnabled) + assertTrue(clientQuotaManager.quotasEnabled) + + // Add a user quota, quotaTypesEnabled should be QuotaTypes.UserQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled + clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")), None, Some(new Quota(5, true))) + assertEquals(QuotaTypes.UserQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.quotaTypesEnabled) + assertTrue(clientQuotaManager.quotasEnabled) + + // Add a duplicate client-id quota, quotaTypesEnabled should remain unchanged + clientQuotaManager.updateQuota(None, Some(ClientQuotaManager.ClientIdEntity("client2")), Some(new Quota(5, true))) + assertEquals(QuotaTypes.UserQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.quotaTypesEnabled) + assertTrue(clientQuotaManager.quotasEnabled) + + // Add duplicate user quota, quotaTypesEnabled should remain unchanged + clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userB")), None, Some(new Quota(5, true))) + assertEquals(QuotaTypes.UserQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.quotaTypesEnabled) + assertTrue(clientQuotaManager.quotasEnabled) + + // Add a user-client-id quota, quotaTypesEnabled should be QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled + clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")), Some(ClientQuotaManager.ClientIdEntity("client1")), Some(new Quota(10, true))) + assertEquals(QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled, clientQuotaManager.quotaTypesEnabled) + assertTrue(clientQuotaManager.quotasEnabled) + + // Add Duplicate user-client-id quota, quotaTypesEnabled should remain unchanged + clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")), Some(ClientQuotaManager.ClientIdEntity("client1")), Some(new Quota(12, true))) + assertEquals(QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled, clientQuotaManager.quotaTypesEnabled) + assertTrue(clientQuotaManager.quotasEnabled) + + // Remove the first user quota, quotaTypesEnabled should remain unchanged + clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")), None, None) + assertEquals(QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled, clientQuotaManager.quotaTypesEnabled) + assertTrue(clientQuotaManager.quotasEnabled) + + // Remove the second user quota, quotaTypesEnabled should be QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled + clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userB")), None, None) + assertEquals(QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.quotaTypesEnabled) + assertTrue(clientQuotaManager.quotasEnabled) + + // Remove the first client-id quota, quotaTypesEnabled should remain unchanged + clientQuotaManager.updateQuota(None, Some(ClientQuotaManager.ClientIdEntity("client1")), None) + assertEquals(QuotaTypes.UserClientIdQuotaEnabled | QuotaTypes.ClientIdQuotaEnabled, clientQuotaManager.quotaTypesEnabled) + assertTrue(clientQuotaManager.quotasEnabled) + + // Remove the second client-id quota, quotaTypesEnabled should be QuotaTypes.UserClientIdQuotaEnabled + clientQuotaManager.updateQuota(None, Some(ClientQuotaManager.ClientIdEntity("client2")), None) + assertEquals(QuotaTypes.UserClientIdQuotaEnabled, clientQuotaManager.quotaTypesEnabled) + assertTrue(clientQuotaManager.quotasEnabled) + + // Remove the first user-client-id quota, quotaTypesEnabled should be noQuotas as both user-client-id quotas has the same user client but different quota + clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")), Some(ClientQuotaManager.ClientIdEntity("client1")), None) + assertEquals(QuotaTypes.NoQuotas, clientQuotaManager.quotaTypesEnabled) + assertFalse(clientQuotaManager.quotasEnabled) + + // Remove the second user-client-id quota, quotaTypesEnabled should be QuotaTypes.NoQuotas and quotasEnabled should be false + clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")), Some(ClientQuotaManager.ClientIdEntity("client1")), None) + assertEquals(QuotaTypes.NoQuotas, clientQuotaManager.quotaTypesEnabled) + assertFalse(clientQuotaManager.quotasEnabled) + } finally { + clientQuotaManager.shutdown() + } + } + + @Test + def testQuotaTypesEnabledUpdatesWithCustomCallback(): Unit = { + val customQuotaCallback = new ClientQuotaCallback { + val quotas = new HashMap[ClientQuotaEntity, Quota]() + override def configure(configs: Map[String, _]): Unit = {} + + override def quotaMetricTags(quotaType: ClientQuotaType, principal: KafkaPrincipal, clientId: String): Map[String, String] = Collections.emptyMap() + + override def quotaLimit(quotaType: ClientQuotaType, metricTags: Map[String, String]): java.lang.Double = 1 + override def updateClusterMetadata(cluster: Cluster): Boolean = false + + override def updateQuota(quotaType: ClientQuotaType, entity: ClientQuotaEntity, newValue: Double): Unit = { + quotas.put(entity.asInstanceOf[KafkaQuotaEntity], new Quota(newValue.toLong, true)) + } + + override def removeQuota(quotaType: ClientQuotaType, entity: ClientQuotaEntity): Unit = { + quotas.remove(entity.asInstanceOf[KafkaQuotaEntity]) + } + + override def quotaResetRequired(quotaType: ClientQuotaType): Boolean = false + + override def close(): Unit = {} + } + val clientQuotaManager = new ClientQuotaManager( + config = new ClientQuotaManagerConfig(), + metrics = metrics, + quotaType = QuotaType.CONTROLLER_MUTATION, + time = time, + threadNamePrefix = "", + clientQuotaCallbackPlugin = Some(Plugin.wrapInstance(customQuotaCallback, metrics, "")) + ) + + try { + // Initially, quotaTypesEnabled should be CustomQuotas and quotasEnabled should be true + assertEquals(QuotaTypes.CustomQuotas, clientQuotaManager.quotaTypesEnabled) + assertTrue(clientQuotaManager.quotasEnabled, "quotasEnabled should be true with custom callback") + + // Add a client-id quota, quotaTypesEnabled should remain QuotaTypes.CustomQuotas + clientQuotaManager.updateQuota(None, Some(ClientQuotaManager.ClientIdEntity("client1")), Some(new Quota(12, true))) + assertEquals(QuotaTypes.CustomQuotas, clientQuotaManager.quotaTypesEnabled) + + // Add a user quota, quotaTypesEnabled quotaTypesEnabled should remain QuotaTypes.CustomQuotas + clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")), None, Some(new Quota(12, true))) + assertEquals(QuotaTypes.CustomQuotas, clientQuotaManager.quotaTypesEnabled) + assertTrue(clientQuotaManager.quotasEnabled, "quotasEnabled should remain true") + + // Add a user-client-id quota, quotaTypesEnabled should remain QuotaTypes.CustomQuotas + clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")), Some(ClientQuotaManager.ClientIdEntity("client1")), Some(new Quota(12, true))) + assertEquals(QuotaTypes.CustomQuotas, clientQuotaManager.quotaTypesEnabled) + assertTrue(clientQuotaManager.quotasEnabled, "quotasEnabled should remain true") + + // Remove all quotas, quotaTypesEnabled should be QuotaTypes.CustomQuotas + clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")), Some(ClientQuotaManager.ClientIdEntity("client1")), None) + clientQuotaManager.updateQuota(Some(ClientQuotaManager.UserEntity("userA")), None, None) + clientQuotaManager.updateQuota(None, Some(ClientQuotaManager.ClientIdEntity("client1")), None) + assertEquals(QuotaTypes.CustomQuotas, clientQuotaManager.quotaTypesEnabled) + assertTrue(clientQuotaManager.quotasEnabled, "quotasEnabled should remain true") + } finally { + clientQuotaManager.shutdown() + } + } + private case class UserClient( user: String, clientId: String,