KAFKA-16411: Correctly migrate default client quota entities (#15584)

KAFKA-16222 fixed a bug whereby we didn't undo the name sanitization used on client quota entity names
stored in ZooKeeper. However, it incorrectly claimed to fix the handling of default client quota
entities. It also failed to correctly re-sanitize when syncronizing the data back to ZooKeeper.

This PR fixes ZkConfigMigrationClient to do the sanitization correctly on both the read and write
paths. We do de-sanitization before invoking the visitors, since after all it does not make sense to
do the same de-sanitization step in each and every visitor.

Additionally, this PR fixes a bug causing default entities to be converted incorrectly. For example,
ClientQuotaEntity(user -> null) is stored under the /config/users/<default> znode in ZooKeeper. In
KRaft it appears as a ClientQuotaRecord with EntityData(entityType=users, entityName=null).
Prior to this PR, this was being converted to a ClientQuotaRecord with EntityData(entityType=users,
entityName=""). That represents a quota on the user whose name is the empty string (yes, we allow
users to name themselves with the empty string, sadly.)

The confusion appears to have arisen because for TOPIC and BROKER configurations, the default
ConfigResource is indeed the one named with the empty (not null) string. For example, the default
topic configuration resource is ConfigResource(name="", type=TOPIC).  However, things are different
for client quotas. Default client quota entities in KRaft (and also in AdminClient) are represented
by maps with null values. For example, the default User entity is represented by Map("user" ->
null).  In retrospect, using a map with null values was a poor choice; a Map<String,
Optional<String>> would have made more sense. However, this is the way the API currently is and we
have to convert correctly.

There was an additional level of confusion present in KAFKA-16222 where someone thought that using
the ZooKeeper placeholder string "<default>" in the AdminClient API would yield a default client
quota entity. Thise seems to have been suggested by the ConfigEntityName class that was created
recently. In fact, <default> is not part of any public API in Kafka. Accordingly, this PR also
renames ConfigEntityName.DEFAULT to ZooKeeperInternals.DEFAULT_STRING, to make it clear that the
string <default> is just a detail of the ZooKeeper implementation.  It is not used in the Kafka API
to indicate defaults. Hopefully this will avoid confusion in the future.

Finally, the PR also creates KRaftClusterTest.testDefaultClientQuotas to get extra test coverage of
setting default client quotas.

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Igor Soarez <soarez@apple.com>
This commit is contained in:
Colin Patrick McCabe 2024-03-26 16:49:38 -07:00 committed by GitHub
parent 4099774da9
commit 8d914b543d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 251 additions and 141 deletions

View File

@ -35,7 +35,7 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity,
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter, ScramMechanism}
import org.apache.kafka.common.utils.{Sanitizer, Time, Utils}
import org.apache.kafka.server.config.{ConfigEntityName, ConfigType}
import org.apache.kafka.server.config.{ConfigType, ZooKeeperInternals}
import org.apache.kafka.security.{PasswordEncoder, PasswordEncoderConfigs}
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.kafka.storage.internals.log.LogConfig
@ -153,7 +153,7 @@ object ConfigCommand extends Logging {
if (!configsToBeAdded.isEmpty || configsToBeDeleted.nonEmpty) {
validateBrokersNotRunning(entityName, adminZkClient, zkClient, errorMessage)
val perBrokerConfig = entityName != ConfigEntityName.DEFAULT
val perBrokerConfig = entityName != ZooKeeperInternals.DEFAULT_STRING
preProcessBrokerConfigs(configsToBeAdded, perBrokerConfig)
}
}
@ -178,7 +178,7 @@ object ConfigCommand extends Logging {
adminZkClient: AdminZkClient,
zkClient: KafkaZkClient,
errorMessage: String): Unit = {
val perBrokerConfig = entityName != ConfigEntityName.DEFAULT
val perBrokerConfig = entityName != ZooKeeperInternals.DEFAULT_STRING
val info = "Broker configuration operations using ZooKeeper are only supported if the affected broker(s) are not running."
if (perBrokerConfig) {
adminZkClient.parseBroker(entityName).foreach { brokerId =>
@ -697,7 +697,7 @@ object ConfigCommand extends Logging {
case t => t
}
sanitizedName match {
case Some(ConfigEntityName.DEFAULT) => "default " + typeName
case Some(ZooKeeperInternals.DEFAULT_STRING) => "default " + typeName
case Some(n) =>
val desanitized = if (entityType == ConfigType.USER || entityType == ConfigType.CLIENT) Sanitizer.desanitize(n) else n
s"$typeName '$desanitized'"
@ -758,7 +758,7 @@ object ConfigCommand extends Logging {
else {
// Exactly one entity type and at-most one entity name expected for other entities
val name = entityNames.headOption match {
case Some("") => Some(ConfigEntityName.DEFAULT)
case Some("") => Some(ZooKeeperInternals.DEFAULT_STRING)
case v => v
}
ConfigEntity(Entity(entityTypes.head, name), None)
@ -775,7 +775,7 @@ object ConfigCommand extends Logging {
def sanitizeName(entityType: String, name: String) = {
if (name.isEmpty)
ConfigEntityName.DEFAULT
ZooKeeperInternals.DEFAULT_STRING
else {
entityType match {
case ConfigType.USER | ConfigType.CLIENT => Sanitizer.sanitize(name)

View File

@ -19,7 +19,6 @@ package kafka.server
import java.{lang, util}
import java.util.concurrent.{ConcurrentHashMap, DelayQueue, TimeUnit}
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.network.RequestChannel
import kafka.server.ClientQuotaManager._
import kafka.utils.{Logging, QuotaUtils}
@ -29,7 +28,7 @@ import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Rate}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Sanitizer, Time}
import org.apache.kafka.server.config.{ConfigEntityName, ClientQuotaManagerConfig}
import org.apache.kafka.server.config.{ClientQuotaManagerConfig, ZooKeeperInternals}
import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaType}
import org.apache.kafka.server.util.ShutdownableThread
import org.apache.kafka.network.Session
@ -76,13 +75,13 @@ object ClientQuotaManager {
case object DefaultUserEntity extends BaseUserEntity {
override def entityType: ClientQuotaEntity.ConfigEntityType = ClientQuotaEntity.ConfigEntityType.DEFAULT_USER
override def name: String = ConfigEntityName.DEFAULT
override def name: String = ZooKeeperInternals.DEFAULT_STRING
override def toString: String = "default user"
}
case object DefaultClientIdEntity extends ClientQuotaEntity.ConfigEntity {
override def entityType: ClientQuotaEntity.ConfigEntityType = ClientQuotaEntity.ConfigEntityType.DEFAULT_CLIENT_ID
override def name: String = ConfigEntityName.DEFAULT
override def name: String = ZooKeeperInternals.DEFAULT_STRING
override def toString: String = "default client-id"
}
@ -93,7 +92,7 @@ object ClientQuotaManager {
def sanitizedUser: String = userEntity.map {
case entity: UserEntity => entity.sanitizedUser
case DefaultUserEntity => ConfigEntityName.DEFAULT
case DefaultUserEntity => ZooKeeperInternals.DEFAULT_STRING
}.getOrElse("")
def clientId: String = clientIdEntity.map(_.name).getOrElse("")
@ -419,11 +418,11 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
lock.writeLock().lock()
try {
val userEntity = sanitizedUser.map {
case ConfigEntityName.DEFAULT => DefaultUserEntity
case ZooKeeperInternals.DEFAULT_STRING => DefaultUserEntity
case user => UserEntity(user)
}
val clientIdEntity = sanitizedClientId.map {
case ConfigEntityName.DEFAULT => DefaultClientIdEntity
case ZooKeeperInternals.DEFAULT_STRING => DefaultClientIdEntity
case _ => ClientIdEntity(clientId.getOrElse(throw new IllegalStateException("Client-id not provided")))
}
val quotaEntity = KafkaQuotaEntity(userEntity, clientIdEntity)

View File

@ -34,7 +34,7 @@ import org.apache.kafka.common.metrics.Quota
import org.apache.kafka.common.metrics.Quota._
import org.apache.kafka.common.utils.Sanitizer
import org.apache.kafka.server.ClientMetricsManager
import org.apache.kafka.server.config.ConfigEntityName
import org.apache.kafka.server.config.ZooKeeperInternals
import org.apache.kafka.storage.internals.log.{LogConfig, ThrottledReplicaListValidator}
import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion
@ -208,7 +208,7 @@ class UserConfigHandler(private val quotaManagers: QuotaManagers, val credential
val sanitizedUser = entities(0)
val sanitizedClientId = if (entities.length == 3) Some(entities(2)) else None
updateQuotaConfig(Some(sanitizedUser), sanitizedClientId, config)
if (sanitizedClientId.isEmpty && sanitizedUser != ConfigEntityName.DEFAULT)
if (sanitizedClientId.isEmpty && sanitizedUser != ZooKeeperInternals.DEFAULT_STRING)
credentialProvider.updateCredentials(Sanitizer.desanitize(sanitizedUser), config)
}
}
@ -218,7 +218,7 @@ class IpConfigHandler(private val connectionQuotas: ConnectionQuotas) extends Co
def processConfigChanges(ip: String, config: Properties): Unit = {
val ipConnectionRateQuota = Option(config.getProperty(QuotaConfigs.IP_CONNECTION_RATE_OVERRIDE_CONFIG)).map(_.toInt)
val updatedIp = {
if (ip != ConfigEntityName.DEFAULT) {
if (ip != ZooKeeperInternals.DEFAULT_STRING) {
try {
Some(InetAddress.getByName(ip))
} catch {
@ -246,7 +246,7 @@ class BrokerConfigHandler(private val brokerConfig: KafkaConfig,
else
DefaultReplicationThrottledRate
}
if (brokerId == ConfigEntityName.DEFAULT)
if (brokerId == ZooKeeperInternals.DEFAULT_STRING)
brokerConfig.dynamicConfig.updateDefaultConfig(properties)
else if (brokerConfig.brokerId == brokerId.trim.toInt) {
brokerConfig.dynamicConfig.updateBrokerConfig(brokerConfig.brokerId, properties)

View File

@ -37,7 +37,7 @@ import org.apache.kafka.common.security.authenticator.LoginManager
import org.apache.kafka.common.utils.{ConfigUtils, Utils}
import org.apache.kafka.security.PasswordEncoder
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.config.{ConfigEntityName, ConfigType, ServerTopicConfigSynonyms}
import org.apache.kafka.server.config.{ConfigType, ServerTopicConfigSynonyms, ZooKeeperInternals}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.ClientMetricsReceiverPlugin
import org.apache.kafka.server.telemetry.ClientTelemetry
@ -233,7 +233,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
zkClientOpt.foreach { zkClient =>
val adminZkClient = new AdminZkClient(zkClient)
updateDefaultConfig(adminZkClient.fetchEntityConfig(ConfigType.BROKER, ConfigEntityName.DEFAULT), false)
updateDefaultConfig(adminZkClient.fetchEntityConfig(ConfigType.BROKER, ZooKeeperInternals.DEFAULT_STRING), false)
val props = adminZkClient.fetchEntityConfig(ConfigType.BROKER, kafkaConfig.brokerId.toString)
val brokerConfig = maybeReEncodePasswords(props, adminZkClient)
updateBrokerConfig(kafkaConfig.brokerId, brokerConfig)

View File

@ -23,7 +23,7 @@ import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.common.config.ConfigDef.Importance._
import org.apache.kafka.common.config.ConfigDef.Range._
import org.apache.kafka.common.config.ConfigDef.Type._
import org.apache.kafka.server.config.{ConfigEntityName, ReplicationQuotaManagerConfig}
import org.apache.kafka.server.config.{ReplicationQuotaManagerConfig, ZooKeeperInternals}
import org.apache.kafka.storage.internals.log.LogConfig
import java.util
@ -102,7 +102,7 @@ object DynamicConfig {
def validate(props: Properties) = DynamicConfig.validate(ipConfigs, props, customPropsAllowed = false)
def isValidIpEntity(ip: String): Boolean = {
if (ip != ConfigEntityName.DEFAULT) {
if (ip != ZooKeeperInternals.DEFAULT_STRING) {
try {
InetAddress.getByName(ip)
} catch {

View File

@ -48,7 +48,7 @@ import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError}
import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter}
import org.apache.kafka.common.utils.Sanitizer
import org.apache.kafka.server.common.AdminOperationException
import org.apache.kafka.server.config.{ConfigEntityName, ConfigType}
import org.apache.kafka.server.config.{ConfigType, ZooKeeperInternals}
import org.apache.kafka.storage.internals.log.LogConfig
import scala.collection.{Map, mutable, _}
@ -518,7 +518,7 @@ class ZkAdminManager(val config: KafkaConfig,
val perBrokerConfig = brokerId.nonEmpty
val persistentProps = if (perBrokerConfig) adminZkClient.fetchEntityConfig(ConfigType.BROKER, brokerId.get.toString)
else adminZkClient.fetchEntityConfig(ConfigType.BROKER, ConfigEntityName.DEFAULT)
else adminZkClient.fetchEntityConfig(ConfigType.BROKER, ZooKeeperInternals.DEFAULT_STRING)
val configProps = this.config.dynamicConfig.fromPersistentProps(persistentProps, perBrokerConfig)
prepareIncrementalConfigs(alterConfigOps, configProps, KafkaConfig.configKeys)
@ -559,13 +559,13 @@ class ZkAdminManager(val config: KafkaConfig,
private def sanitizeEntityName(entityName: String): String =
Option(entityName) match {
case None => ConfigEntityName.DEFAULT
case None => ZooKeeperInternals.DEFAULT_STRING
case Some(name) => Sanitizer.sanitize(name)
}
private def desanitizeEntityName(sanitizedEntityName: String): String =
sanitizedEntityName match {
case ConfigEntityName.DEFAULT => null
case ZooKeeperInternals.DEFAULT_STRING => null
case name => Sanitizer.desanitize(name)
}

View File

@ -24,10 +24,10 @@ import org.apache.kafka.common.config.internals.QuotaConfigs
import org.apache.kafka.common.metrics.Quota
import org.apache.kafka.common.quota.ClientQuotaEntity
import org.apache.kafka.common.utils.Sanitizer
import java.net.{InetAddress, UnknownHostException}
import java.net.{InetAddress, UnknownHostException}
import org.apache.kafka.image.{ClientQuotaDelta, ClientQuotasDelta}
import org.apache.kafka.server.config.ConfigEntityName
import org.apache.kafka.server.config.ZooKeeperInternals
import scala.compat.java8.OptionConverters._
@ -147,13 +147,13 @@ class ClientQuotaMetadataManager(private[metadata] val quotaManagers: QuotaManag
// Convert entity into Options with sanitized values for QuotaManagers
val (sanitizedUser, sanitizedClientId) = quotaEntity match {
case UserEntity(user) => (Some(Sanitizer.sanitize(user)), None)
case DefaultUserEntity => (Some(ConfigEntityName.DEFAULT), None)
case DefaultUserEntity => (Some(ZooKeeperInternals.DEFAULT_STRING), None)
case ClientIdEntity(clientId) => (None, Some(Sanitizer.sanitize(clientId)))
case DefaultClientIdEntity => (None, Some(ConfigEntityName.DEFAULT))
case DefaultClientIdEntity => (None, Some(ZooKeeperInternals.DEFAULT_STRING))
case ExplicitUserExplicitClientIdEntity(user, clientId) => (Some(Sanitizer.sanitize(user)), Some(Sanitizer.sanitize(clientId)))
case ExplicitUserDefaultClientIdEntity(user) => (Some(Sanitizer.sanitize(user)), Some(ConfigEntityName.DEFAULT))
case DefaultUserExplicitClientIdEntity(clientId) => (Some(ConfigEntityName.DEFAULT), Some(Sanitizer.sanitize(clientId)))
case DefaultUserDefaultClientIdEntity => (Some(ConfigEntityName.DEFAULT), Some(ConfigEntityName.DEFAULT))
case ExplicitUserDefaultClientIdEntity(user) => (Some(Sanitizer.sanitize(user)), Some(ZooKeeperInternals.DEFAULT_STRING))
case DefaultUserExplicitClientIdEntity(clientId) => (Some(ZooKeeperInternals.DEFAULT_STRING), Some(Sanitizer.sanitize(clientId)))
case DefaultUserDefaultClientIdEntity => (Some(ZooKeeperInternals.DEFAULT_STRING), Some(ZooKeeperInternals.DEFAULT_STRING))
case IpEntity(_) | DefaultIpEntity => throw new IllegalStateException("Should not see IP quota entities here")
}

View File

@ -24,7 +24,7 @@ import kafka.utils.Logging
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, CLIENT_METRICS, TOPIC}
import org.apache.kafka.image.loader.LoaderManifest
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.server.config.{ConfigEntityName, ConfigType}
import org.apache.kafka.server.config.{ConfigType, ZooKeeperInternals}
import org.apache.kafka.server.fault.FaultHandler
@ -78,7 +78,7 @@ class DynamicConfigPublisher(
// These are stored in KRaft with an empty name field.
info("Updating cluster configuration : " +
toLoggableProps(resource, props).mkString(","))
nodeConfigHandler.processConfigChanges(ConfigEntityName.DEFAULT, props)
nodeConfigHandler.processConfigChanges(ZooKeeperInternals.DEFAULT_STRING, props)
} catch {
case t: Throwable => faultHandler.handleFault("Error updating " +
s"cluster with new configuration: ${toLoggableProps(resource, props).mkString(",")} " +

View File

@ -22,7 +22,7 @@ import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type
import org.apache.kafka.common.errors.InvalidRequestException
import org.apache.kafka.server.config.{ConfigEntityName, ConfigType}
import org.apache.kafka.server.config.{ConfigType, ZooKeeperInternals}
object ZkConfigRepository {
@ -41,7 +41,7 @@ class ZkConfigRepository(adminZkClient: AdminZkClient) extends ConfigRepository
// ZK stores cluster configs under "<default>".
val effectiveName = if (configResource.`type`.equals(Type.BROKER) &&
configResource.name.isEmpty) {
ConfigEntityName.DEFAULT
ZooKeeperInternals.DEFAULT_STRING
} else {
configResource.name
}

View File

@ -28,7 +28,7 @@ import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.server.common.AdminOperationException
import org.apache.kafka.server.config.{ConfigEntityName, ConfigType}
import org.apache.kafka.server.config.{ConfigType, ZooKeeperInternals}
import org.apache.kafka.storage.internals.log.LogConfig
import org.apache.zookeeper.KeeperException.NodeExistsException
@ -345,7 +345,7 @@ class AdminZkClient(zkClient: KafkaZkClient,
*/
def parseBroker(broker: String): Option[Int] = {
broker match {
case ConfigEntityName.DEFAULT => None
case ZooKeeperInternals.DEFAULT_STRING => None
case _ =>
try Some(broker.toInt)
catch {
@ -440,7 +440,7 @@ class AdminZkClient(zkClient: KafkaZkClient,
*
*/
def changeUserOrUserClientIdConfig(sanitizedEntityName: String, configs: Properties, isUserClientId: Boolean = false): Unit = {
if (sanitizedEntityName == ConfigEntityName.DEFAULT || sanitizedEntityName.contains("/clients"))
if (sanitizedEntityName == ZooKeeperInternals.DEFAULT_STRING || sanitizedEntityName.contains("/clients"))
DynamicConfig.Client.validate(configs)
else
DynamicConfig.User.validate(configs)
@ -520,7 +520,7 @@ class AdminZkClient(zkClient: KafkaZkClient,
*/
def changeBrokerConfig(broker: Option[Int], configs: Properties): Unit = {
validateBrokerConfig(configs)
changeEntityConfig(ConfigType.BROKER, broker.map(_.toString).getOrElse(ConfigEntityName.DEFAULT), configs)
changeEntityConfig(ConfigType.BROKER, broker.map(_.toString).getOrElse(ZooKeeperInternals.DEFAULT_STRING), configs)
}
/**

View File

@ -27,7 +27,6 @@ import org.apache.kafka.common.errors.ControllerMovedException
import org.apache.kafka.common.metadata._
import org.apache.kafka.common.resource.ResourcePattern
import org.apache.kafka.common.security.scram.ScramCredential
import org.apache.kafka.common.utils.Sanitizer
import org.apache.kafka.common.{TopicIdPartition, Uuid}
import org.apache.kafka.metadata.DelegationTokenData
import org.apache.kafka.metadata.PartitionRegistration
@ -227,9 +226,6 @@ class ZkMigrationClient(
entityDataList: util.List[ClientQuotaRecord.EntityData],
quotas: util.Map[String, lang.Double]
): Unit = {
entityDataList.forEach(entityData => {
entityData.setEntityName(Sanitizer.desanitize(entityData.entityName()))
})
val batch = new util.ArrayList[ApiMessageAndVersion]()
quotas.forEach((key, value) => {
batch.add(new ApiMessageAndVersion(new ClientQuotaRecord()

View File

@ -21,6 +21,7 @@ import kafka.server.{DynamicBrokerConfig, DynamicConfig, ZkAdminManager}
import kafka.utils.Logging
import kafka.zk.ZkMigrationClient.{logAndRethrow, wrapZkException}
import kafka.zk._
import kafka.zk.migration.ZkConfigMigrationClient.getSanitizedClientQuotaZNodeName
import kafka.zookeeper.{CreateRequest, DeleteRequest, SetDataRequest}
import org.apache.kafka.clients.admin.ScramMechanism
import org.apache.kafka.common.config.types.Password
@ -29,14 +30,14 @@ import org.apache.kafka.common.errors.InvalidRequestException
import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData
import org.apache.kafka.common.quota.ClientQuotaEntity
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
import org.apache.kafka.common.utils.Sanitizer
import org.apache.kafka.metadata.migration.ConfigMigrationClient.ClientQuotaVisitor
import org.apache.kafka.metadata.migration.{ConfigMigrationClient, MigrationClientException, ZkMigrationLeadershipState}
import org.apache.kafka.security.PasswordEncoder
import org.apache.kafka.server.config.{ConfigEntityName, ConfigType}
import org.apache.kafka.server.config.{ConfigType, ZooKeeperInternals}
import org.apache.zookeeper.KeeperException.Code
import org.apache.zookeeper.{CreateMode, KeeperException}
import java.{lang, util}
import java.util.Properties
import java.util.function.{BiConsumer, Consumer}
@ -50,44 +51,54 @@ class ZkConfigMigrationClient(
val adminZkClient = new AdminZkClient(zkClient)
/**
* In ZK, we use the special string "&lt;default&gt;" to represent the default entity.
* In KRaft, we use an empty string. This method builds an EntityData that converts the special ZK string
* to the special KRaft string.
* In ZK, we use the special string "&lt;default&gt;" to represent the default config entity.
* In KRaft, we use an empty string. This method converts the between the two conventions.
*/
private def fromZkEntityName(entityName: String): String = {
if (entityName.equals(ConfigEntityName.DEFAULT)) {
private def fromZkConfigfEntityName(entityName: String): String = {
if (entityName.equals(ZooKeeperInternals.DEFAULT_STRING)) {
""
} else {
entityName
}
}
private def toZkEntityName(entityName: String): String = {
private def toZkConfigEntityName(entityName: String): String = {
if (entityName.isEmpty) {
ConfigEntityName.DEFAULT
ZooKeeperInternals.DEFAULT_STRING
} else {
entityName
}
}
private def buildEntityData(entityType: String, entityName: String): EntityData = {
new EntityData().setEntityType(entityType).setEntityName(fromZkEntityName(entityName))
private def buildClientQuotaEntityData(
entityType: String,
znodeName: String
): EntityData = {
val result = new EntityData().setEntityType(entityType)
if (znodeName.equals(ZooKeeperInternals.DEFAULT_STRING)) {
// Default __client quota__ entity names are null. This is different than default __configs__,
// which have their names set to the empty string instead.
result.setEntityName(null)
} else {
// ZNode names are sanitized before being stored in ZooKeeper.
// For example, @ is turned into %40. Undo the sanitization here.
result.setEntityName(Sanitizer.desanitize(znodeName))
}
result
}
override def iterateClientQuotas(visitor: ClientQuotaVisitor): Unit = {
def migrateEntityType(zkEntityType: String, entityType: String): Unit = {
adminZkClient.fetchAllEntityConfigs(zkEntityType).foreach { case (name, props) =>
val entity = List(buildEntityData(entityType, name)).asJava
adminZkClient.fetchAllEntityConfigs(zkEntityType).foreach { case (znodeName, props) =>
val entity = List(buildClientQuotaEntityData(entityType, znodeName)).asJava
ScramMechanism.values().filter(_ != ScramMechanism.UNKNOWN).foreach { mechanism =>
val propertyValue = props.getProperty(mechanism.mechanismName)
if (propertyValue != null) {
val scramCredentials = ScramCredentialUtils.credentialFromString(propertyValue)
logAndRethrow(this, s"Error in client quota visitor for SCRAM credential. User was $entity.") {
visitor.visitScramCredential(name, mechanism, scramCredentials)
visitor.visitScramCredential(Sanitizer.desanitize(znodeName), mechanism, scramCredentials)
}
props.remove(mechanism.mechanismName)
}
@ -108,14 +119,14 @@ class ZkConfigMigrationClient(
migrateEntityType(ConfigType.USER, ClientQuotaEntity.USER)
migrateEntityType(ConfigType.CLIENT, ClientQuotaEntity.CLIENT_ID)
adminZkClient.fetchAllChildEntityConfigs(ConfigType.USER, ConfigType.CLIENT).foreach { case (name, props) =>
adminZkClient.fetchAllChildEntityConfigs(ConfigType.USER, ConfigType.CLIENT).foreach { case (znodePath, props) =>
// Taken from ZkAdminManager
val components = name.split("/")
val components = znodePath.split("/")
if (components.size != 3 || components(1) != "clients")
throw new IllegalArgumentException(s"Unexpected config path: $name")
throw new IllegalArgumentException(s"Unexpected config path: $znodePath")
val entity = List(
buildEntityData(ClientQuotaEntity.USER, components(0)),
buildEntityData(ClientQuotaEntity.CLIENT_ID, components(2))
buildClientQuotaEntityData(ClientQuotaEntity.USER, components(0)),
buildClientQuotaEntityData(ClientQuotaEntity.CLIENT_ID, components(2))
)
val quotaMap = props.asScala.map { case (key, value) =>
val doubleValue = try lang.Double.valueOf(value) catch {
@ -135,7 +146,7 @@ class ZkConfigMigrationClient(
override def iterateBrokerConfigs(configConsumer: BiConsumer[String, util.Map[String, String]]): Unit = {
val brokerEntities = zkClient.getAllEntitiesWithConfig(ConfigType.BROKER)
zkClient.getEntitiesConfigs(ConfigType.BROKER, brokerEntities.toSet).foreach { case (broker, props) =>
val brokerResource = fromZkEntityName(broker)
val brokerResource = fromZkConfigfEntityName(broker)
val decodedProps = props.asScala.map { case (key, value) =>
if (DynamicBrokerConfig.isPasswordConfig(key))
key -> passwordEncoder.decode(value).value
@ -157,7 +168,7 @@ class ZkConfigMigrationClient(
}
override def readTopicConfigs(topicName: String, configConsumer: Consumer[util.Map[String, String]]): Unit = {
val topicResource = fromZkEntityName(topicName)
val topicResource = fromZkConfigfEntityName(topicName)
val props = zkClient.getEntityConfigs(ConfigType.TOPIC, topicResource)
val decodedProps = props.asScala.map { case (key, value) =>
if (DynamicBrokerConfig.isPasswordConfig(key))
@ -182,7 +193,7 @@ class ZkConfigMigrationClient(
case _ => None
}
val configName = toZkEntityName(configResource.name())
val configName = toZkConfigEntityName(configResource.name())
if (configType.isDefined) {
val props = new Properties()
configMap.forEach { case (key, value) =>
@ -221,7 +232,7 @@ class ZkConfigMigrationClient(
case _ => None
}
val configName = toZkEntityName(configResource.name())
val configName = toZkConfigEntityName(configResource.name())
if (configType.isDefined) {
val path = ConfigEntityZNode.path(configType.get, configName)
val requests = Seq(DeleteRequest(path, ZkVersion.MatchAnyVersion))
@ -250,10 +261,9 @@ class ZkConfigMigrationClient(
scram: util.Map[String, String],
state: ZkMigrationLeadershipState
): ZkMigrationLeadershipState = wrapZkException {
val entityMap = entity.asScala
val user = entityMap.get(ClientQuotaEntity.USER).map(toZkEntityName)
val client = entityMap.get(ClientQuotaEntity.CLIENT_ID).map(toZkEntityName)
val ip = entityMap.get(ClientQuotaEntity.IP).map(toZkEntityName)
val user: Option[String] = getSanitizedClientQuotaZNodeName(entity, ClientQuotaEntity.USER)
val client: Option[String] = getSanitizedClientQuotaZNodeName(entity, ClientQuotaEntity.CLIENT_ID)
val ip: Option[String] = getSanitizedClientQuotaZNodeName(entity, ClientQuotaEntity.IP)
val props = new Properties()
val (configType, path, configKeys) = if (user.isDefined && client.isEmpty) {
@ -351,3 +361,34 @@ class ZkConfigMigrationClient(
}
}
object ZkConfigMigrationClient {
/**
* Find the znode name to use for a ClientQuotaEntity.
*
* @param entity The client quota entity map. See org.apache.kafka.common.ClientQuotaEntity.
* @param component The component that we want a znode name for.
* @return Some(znodeName) if there is a znode path; None otherwise.
*/
def getSanitizedClientQuotaZNodeName(
entity: util.Map[String, String],
component: String
): Option[String] = {
if (!entity.containsKey(component)) {
// There is no znode path, because the component wasn't found. For example, if the
// entity was (user -> "bob") and our component was "ip", we would return None here.
None
} else {
val rawValue = entity.get(component)
if (rawValue == null) {
// A raw value of null means this is a default entity. For example, (user -> null) means
// the default user. Yes, this means we stored a null value in the map and it did not mean
// "not present." This is an unfortunate API that should be revisited at some point.
Some(ZooKeeperInternals.DEFAULT_STRING)
} else {
// We found a non-null value, and now we need to sanitize it. For example, "c@@ldude" will
// turn into c%40%40ldude, so that we can use it as a znode name in ZooKeeper.
Some(Sanitizer.sanitize(rawValue))
}
}
}
}

View File

@ -25,7 +25,7 @@ import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.config.ConfigEntityName
import org.apache.kafka.server.config.ZooKeeperInternals
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@ -93,7 +93,7 @@ class ConfigCommandIntegrationTest extends QuorumTestHarness with Logging {
}
def verifyConfig(configs: Map[String, String], brokerId: Option[String]): Unit = {
val entityConfigs = zkClient.getEntityConfigs("brokers", brokerId.getOrElse(ConfigEntityName.DEFAULT))
val entityConfigs = zkClient.getEntityConfigs("brokers", brokerId.getOrElse(ZooKeeperInternals.DEFAULT_STRING))
assertEquals(configs, entityConfigs.asScala)
}

View File

@ -26,12 +26,13 @@ import org.apache.kafka.clients.admin._
import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter}
import org.apache.kafka.common.config.{ConfigException, ConfigResource}
import org.apache.kafka.common.config.ConfigResource.Type
import org.apache.kafka.common.errors.{InvalidPartitionsException,PolicyViolationException, UnsupportedVersionException}
import org.apache.kafka.common.errors.{InvalidPartitionsException, PolicyViolationException, UnsupportedVersionException}
import org.apache.kafka.common.message.DescribeClusterRequestData
import org.apache.kafka.common.metadata.{ConfigRecord, FeatureLevelRecord}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors._
import org.apache.kafka.common.quota.ClientQuotaAlteration.Op
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
import org.apache.kafka.common.requests.{ApiError, DescribeClusterRequest, DescribeClusterResponse}
import org.apache.kafka.common.security.auth.KafkaPrincipal
@ -324,6 +325,68 @@ class KRaftClusterTest {
}
}
def setConsumerByteRate(
admin: Admin,
entity: ClientQuotaEntity,
value: Long
): Unit = {
admin.alterClientQuotas(Collections.singletonList(
new ClientQuotaAlteration(entity, Collections.singletonList(
new Op("consumer_byte_rate", value.doubleValue()))))).
all().get()
}
def getConsumerByteRates(admin: Admin): Map[ClientQuotaEntity, Long] = {
val allFilter = ClientQuotaFilter.contains(Collections.emptyList())
val results = new java.util.HashMap[ClientQuotaEntity, Long]
admin.describeClientQuotas(allFilter).entities().get().forEach {
case (entity, entityMap) =>
Option(entityMap.get("consumer_byte_rate")).foreach {
case value => results.put(entity, value.longValue())
}
}
results.asScala.toMap
}
@Test
def testDefaultClientQuotas(): Unit = {
val cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setNumBrokerNodes(1).
setNumControllerNodes(1).build()).build()
try {
cluster.format()
cluster.startup()
TestUtils.waitUntilTrue(() => cluster.brokers().get(0).brokerState == BrokerState.RUNNING,
"Broker never made it to RUNNING state.")
val admin = Admin.create(cluster.clientProperties())
try {
val defaultUser = new ClientQuotaEntity(Collections.singletonMap[String, String]("user", null))
val bobUser = new ClientQuotaEntity(Collections.singletonMap[String, String]("user", "bob"))
TestUtils.retry(30000) {
assertEquals(Map(), getConsumerByteRates(admin))
}
setConsumerByteRate(admin, defaultUser, 100L)
TestUtils.retry(30000) {
assertEquals(Map(
defaultUser -> 100L
), getConsumerByteRates(admin))
}
setConsumerByteRate(admin, bobUser, 1000L)
TestUtils.retry(30000) {
assertEquals(Map(
defaultUser -> 100L,
bobUser -> 1000L
), getConsumerByteRates(admin))
}
} finally {
admin.close()
}
} finally {
cluster.close()
}
}
@Test
def testCreateClusterWithAdvertisedPortZero(): Unit = {
val brokerPropertyOverrides: (TestKitNodes, BrokerNode) => Map[String, String] = (nodes, _) => Map(

View File

@ -40,7 +40,7 @@ import org.apache.kafka.common.resource.ResourcePattern
import org.apache.kafka.common.resource.ResourceType.TOPIC
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
import org.apache.kafka.common.utils.SecurityUtils
import org.apache.kafka.common.utils.{Sanitizer, SecurityUtils}
import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
import org.apache.kafka.metadata.authorizer.StandardAcl
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
@ -48,7 +48,7 @@ import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.security.PasswordEncoder
import org.apache.kafka.server.ControllerRequestCompletionHandler
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, ProducerIdsBlock}
import org.apache.kafka.server.config.{ConfigEntityName, ConfigType}
import org.apache.kafka.server.config.ConfigType
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertNotNull, assertTrue, fail}
import org.junit.jupiter.api.{Assumptions, Timeout}
import org.junit.jupiter.api.extension.ExtendWith
@ -227,11 +227,11 @@ class ZkMigrationIntegrationTest {
createTopicResult.all().get(60, TimeUnit.SECONDS)
val quotas = new util.ArrayList[ClientQuotaAlteration]()
val defaultUserEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> ConfigEntityName.DEFAULT).asJava)
val defaultUserEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, null))
quotas.add(new ClientQuotaAlteration(defaultUserEntity, List(new ClientQuotaAlteration.Op("consumer_byte_rate", 900.0)).asJava))
val defaultClientIdEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> ConfigEntityName.DEFAULT).asJava)
val defaultClientIdEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.CLIENT_ID, null))
quotas.add(new ClientQuotaAlteration(defaultClientIdEntity, List(new ClientQuotaAlteration.Op("consumer_byte_rate", 900.0)).asJava))
val defaultIpEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.IP -> null.asInstanceOf[String]).asJava)
val defaultIpEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.IP, null))
quotas.add(new ClientQuotaAlteration(defaultIpEntity, List(new ClientQuotaAlteration.Op("connection_creation_rate", 9.0)).asJava))
val userEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "user/1@prod").asJava)
quotas.add(new ClientQuotaAlteration(userEntity, List(new ClientQuotaAlteration.Op("consumer_byte_rate", 1000.0)).asJava))
@ -275,15 +275,15 @@ class ZkMigrationIntegrationTest {
assertEquals(10, image.topics().getTopic("test-topic-3").partitions().size())
val clientQuotas = image.clientQuotas().entities()
assertEquals(6, clientQuotas.size())
assertEquals(true, clientQuotas.containsKey(defaultUserEntity))
assertEquals(true, clientQuotas.containsKey(defaultClientIdEntity))
assertEquals(true, clientQuotas.containsKey(new ClientQuotaEntity(Map(ClientQuotaEntity.IP -> "").asJava))) // default ip
assertEquals(true, clientQuotas.containsKey(userEntity))
assertEquals(true, clientQuotas.containsKey(userClientEntity))
assertEquals(true, clientQuotas.containsKey(ipEntity))
assertEquals(new java.util.HashSet[ClientQuotaEntity](java.util.Arrays.asList(
defaultUserEntity,
defaultClientIdEntity,
defaultIpEntity,
userEntity,
userClientEntity,
ipEntity
)), clientQuotas.keySet())
}
migrationState = migrationClient.releaseControllerLeadership(migrationState)
}
@ -881,11 +881,14 @@ class ZkMigrationIntegrationTest {
def alterClientQuotas(admin: Admin): AlterClientQuotasResult = {
val quotas = new util.ArrayList[ClientQuotaAlteration]()
quotas.add(new ClientQuotaAlteration(
new ClientQuotaEntity(Map("user" -> "user1").asJava),
new ClientQuotaEntity(Map("user" -> "user@1").asJava),
List(new ClientQuotaAlteration.Op("consumer_byte_rate", 1000.0)).asJava))
quotas.add(new ClientQuotaAlteration(
new ClientQuotaEntity(Map("user" -> "user1", "client-id" -> "clientA").asJava),
new ClientQuotaEntity(Map("user" -> "user@1", "client-id" -> "clientA").asJava),
List(new ClientQuotaAlteration.Op("consumer_byte_rate", 800.0), new ClientQuotaAlteration.Op("producer_byte_rate", 100.0)).asJava))
quotas.add(new ClientQuotaAlteration(
new ClientQuotaEntity(Collections.singletonMap("user", null)),
List(new ClientQuotaAlteration.Op("consumer_byte_rate", 900.0), new ClientQuotaAlteration.Op("producer_byte_rate", 100.0)).asJava))
quotas.add(new ClientQuotaAlteration(
new ClientQuotaEntity(Map("ip" -> "8.8.8.8").asJava),
List(new ClientQuotaAlteration.Op("connection_creation_rate", 10.0)).asJava))
@ -903,7 +906,7 @@ class ZkMigrationIntegrationTest {
val alterations = new util.ArrayList[UserScramCredentialAlteration]()
alterations.add(new UserScramCredentialUpsertion("user1",
new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 8191), "password1"))
alterations.add(new UserScramCredentialUpsertion("user2",
alterations.add(new UserScramCredentialUpsertion("user@2",
new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 8192), "password2"))
admin.alterUserScramCredentials(alterations)
}
@ -918,20 +921,21 @@ class ZkMigrationIntegrationTest {
def verifyClientQuotas(zkClient: KafkaZkClient): Unit = {
TestUtils.retry(10000) {
assertEquals("1000", zkClient.getEntityConfigs(ConfigType.USER, "user1").getProperty("consumer_byte_rate"))
assertEquals("800", zkClient.getEntityConfigs("users/user1/clients", "clientA").getProperty("consumer_byte_rate"))
assertEquals("100", zkClient.getEntityConfigs("users/user1/clients", "clientA").getProperty("producer_byte_rate"))
assertEquals("1000", zkClient.getEntityConfigs(ConfigType.USER, Sanitizer.sanitize("user@1")).getProperty("consumer_byte_rate"))
assertEquals("900", zkClient.getEntityConfigs(ConfigType.USER, "<default>").getProperty("consumer_byte_rate"))
assertEquals("800", zkClient.getEntityConfigs("users/" + Sanitizer.sanitize("user@1") + "/clients", "clientA").getProperty("consumer_byte_rate"))
assertEquals("100", zkClient.getEntityConfigs("users/" + Sanitizer.sanitize("user@1") + "/clients", "clientA").getProperty("producer_byte_rate"))
assertEquals("10", zkClient.getEntityConfigs(ConfigType.IP, "8.8.8.8").getProperty("connection_creation_rate"))
}
}
def verifyUserScramCredentials(zkClient: KafkaZkClient): Unit = {
TestUtils.retry(10000) {
val propertyValue1 = zkClient.getEntityConfigs(ConfigType.USER, "user1").getProperty("SCRAM-SHA-256")
val propertyValue1 = zkClient.getEntityConfigs(ConfigType.USER, Sanitizer.sanitize("user1")).getProperty("SCRAM-SHA-256")
val scramCredentials1 = ScramCredentialUtils.credentialFromString(propertyValue1)
assertEquals(8191, scramCredentials1.iterations)
val propertyValue2 = zkClient.getEntityConfigs(ConfigType.USER, "user2").getProperty("SCRAM-SHA-256")
val propertyValue2 = zkClient.getEntityConfigs(ConfigType.USER, Sanitizer.sanitize("user@2")).getProperty("SCRAM-SHA-256")
assertNotNull(propertyValue2)
val scramCredentials2 = ScramCredentialUtils.credentialFromString(propertyValue2)
assertEquals(8192, scramCredentials2.iterations)

View File

@ -30,7 +30,7 @@ import org.apache.kafka.common.internals.KafkaFutureImpl
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
import org.apache.kafka.common.utils.Sanitizer
import org.apache.kafka.server.config.{ConfigEntityName, ConfigType}
import org.apache.kafka.server.config.{ConfigType, ZooKeeperInternals}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.mockito.ArgumentMatchers.anyString
@ -1475,7 +1475,7 @@ class ConfigCommandTest extends Logging {
val clientId = "client-1"
for (opts <- Seq(describeOpts, alterOpts)) {
checkEntity("clients", Some(clientId), clientId, opts)
checkEntity("clients", Some(""), ConfigEntityName.DEFAULT, opts)
checkEntity("clients", Some(""), ZooKeeperInternals.DEFAULT_STRING, opts)
}
checkEntity("clients", None, "", describeOpts)
checkInvalidArgs("clients", None, alterOpts)
@ -1487,7 +1487,7 @@ class ConfigCommandTest extends Logging {
assertEquals(principal, Sanitizer.desanitize(sanitizedPrincipal))
for (opts <- Seq(describeOpts, alterOpts)) {
checkEntity("users", Some(principal), sanitizedPrincipal, opts)
checkEntity("users", Some(""), ConfigEntityName.DEFAULT, opts)
checkEntity("users", Some(""), ZooKeeperInternals.DEFAULT_STRING, opts)
}
checkEntity("users", None, "", describeOpts)
checkInvalidArgs("users", None, alterOpts)
@ -1497,9 +1497,9 @@ class ConfigCommandTest extends Logging {
def clientIdOpts(name: String) = Array("--entity-type", "clients", "--entity-name", name)
for (opts <- Seq(describeOpts, alterOpts)) {
checkEntity("users", Some(principal), userClient, opts ++ clientIdOpts(clientId))
checkEntity("users", Some(principal), sanitizedPrincipal + "/clients/" + ConfigEntityName.DEFAULT, opts ++ clientIdOpts(""))
checkEntity("users", Some(""), ConfigEntityName.DEFAULT + "/clients/" + clientId, describeOpts ++ clientIdOpts(clientId))
checkEntity("users", Some(""), ConfigEntityName.DEFAULT + "/clients/" + ConfigEntityName.DEFAULT, opts ++ clientIdOpts(""))
checkEntity("users", Some(principal), sanitizedPrincipal + "/clients/" + ZooKeeperInternals.DEFAULT_STRING, opts ++ clientIdOpts(""))
checkEntity("users", Some(""), ZooKeeperInternals.DEFAULT_STRING + "/clients/" + clientId, describeOpts ++ clientIdOpts(clientId))
checkEntity("users", Some(""), ZooKeeperInternals.DEFAULT_STRING + "/clients/" + ZooKeeperInternals.DEFAULT_STRING, opts ++ clientIdOpts(""))
}
checkEntity("users", Some(principal), sanitizedPrincipal + "/clients", describeOpts ++ Array("--entity-type", "clients"))
// Both user and client-id must be provided for alter

View File

@ -21,7 +21,7 @@ import kafka.server.QuotaType._
import org.apache.kafka.common.metrics.Quota
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.Sanitizer
import org.apache.kafka.server.config.{ClientQuotaManagerConfig, ConfigEntityName}
import org.apache.kafka.server.config.{ClientQuotaManagerConfig, ZooKeeperInternals}
import org.apache.kafka.network.Session
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@ -85,7 +85,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
val client1 = UserClient("ANONYMOUS", "p1", None, Some("p1"))
val client2 = UserClient("ANONYMOUS", "p2", None, Some("p2"))
val randomClient = UserClient("ANONYMOUS", "random-client-id", None, None)
val defaultConfigClient = UserClient("", "", None, Some(ConfigEntityName.DEFAULT))
val defaultConfigClient = UserClient("", "", None, Some(ZooKeeperInternals.DEFAULT_STRING))
testQuotaParsing(config, client1, client2, randomClient, defaultConfigClient)
}
@ -98,7 +98,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
val client1 = UserClient("User1", "p1", Some("User1"), None)
val client2 = UserClient("User2", "p2", Some("User2"), None)
val randomClient = UserClient("RandomUser", "random-client-id", None, None)
val defaultConfigClient = UserClient("", "", Some(ConfigEntityName.DEFAULT), None)
val defaultConfigClient = UserClient("", "", Some(ZooKeeperInternals.DEFAULT_STRING), None)
val config = new ClientQuotaManagerConfig()
testQuotaParsing(config, client1, client2, randomClient, defaultConfigClient)
}
@ -112,7 +112,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
val client1 = UserClient("User1", "p1", Some("User1"), Some("p1"))
val client2 = UserClient("User2", "p2", Some("User2"), Some("p2"))
val randomClient = UserClient("RandomUser", "random-client-id", None, None)
val defaultConfigClient = UserClient("", "", Some(ConfigEntityName.DEFAULT), Some(ConfigEntityName.DEFAULT))
val defaultConfigClient = UserClient("", "", Some(ZooKeeperInternals.DEFAULT_STRING), Some(ZooKeeperInternals.DEFAULT_STRING))
val config = new ClientQuotaManagerConfig()
testQuotaParsing(config, client1, client2, randomClient, defaultConfigClient)
}
@ -125,7 +125,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
val client1 = UserClient("User1", "p1", Some("User1"), None)
val client2 = UserClient("User2", "p2", Some("User2"), None)
val randomClient = UserClient("RandomUser", "random-client-id", None, None)
val defaultConfigClient = UserClient("", "", Some(ConfigEntityName.DEFAULT), None)
val defaultConfigClient = UserClient("", "", Some(ZooKeeperInternals.DEFAULT_STRING), None)
testQuotaParsing(config, client1, client2, randomClient, defaultConfigClient)
}
@ -137,7 +137,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
val client1 = UserClient("User1", "p1", Some("User1"), Some("p1"))
val client2 = UserClient("User2", "p2", Some("User2"), Some("p2"))
val randomClient = UserClient("RandomUser", "random-client-id", None, None)
val defaultConfigClient = UserClient("", "", Some(ConfigEntityName.DEFAULT), Some(ConfigEntityName.DEFAULT))
val defaultConfigClient = UserClient("", "", Some(ZooKeeperInternals.DEFAULT_STRING), Some(ZooKeeperInternals.DEFAULT_STRING))
testQuotaParsing(config, client1, client2, randomClient, defaultConfigClient)
}
@ -168,7 +168,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
assertEquals(Double.MaxValue, clientQuotaManager.getMaxValueInQuotaWindow(userSession, "client1"), 0.01)
// Set default <user> quota config
clientQuotaManager.updateQuota(Some(ConfigEntityName.DEFAULT), None, None, Some(new Quota(10, true)))
clientQuotaManager.updateQuota(Some(ZooKeeperInternals.DEFAULT_STRING), None, None, Some(new Quota(10, true)))
assertEquals(10 * numFullQuotaWindows, clientQuotaManager.getMaxValueInQuotaWindow(userSession, "client1"), 0.01)
} finally {
clientQuotaManager.shutdown()
@ -186,11 +186,11 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
checkQuota(clientQuotaManager, "userA", "client1", Long.MaxValue, 1000, false)
// Set default <user> quota config
clientQuotaManager.updateQuota(Some(ConfigEntityName.DEFAULT), None, None, Some(new Quota(10, true)))
clientQuotaManager.updateQuota(Some(ZooKeeperInternals.DEFAULT_STRING), None, None, Some(new Quota(10, true)))
checkQuota(clientQuotaManager, "userA", "client1", 10, 1000, true)
// Remove default <user> quota config, back to no quotas
clientQuotaManager.updateQuota(Some(ConfigEntityName.DEFAULT), None, None, None)
clientQuotaManager.updateQuota(Some(ZooKeeperInternals.DEFAULT_STRING), None, None, None)
checkQuota(clientQuotaManager, "userA", "client1", Long.MaxValue, 1000, false)
} finally {
clientQuotaManager.shutdown()
@ -241,14 +241,14 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
metrics, Produce, time, "")
try {
clientQuotaManager.updateQuota(Some(ConfigEntityName.DEFAULT), None, None, Some(new Quota(1000, true)))
clientQuotaManager.updateQuota(None, Some(ConfigEntityName.DEFAULT), Some(ConfigEntityName.DEFAULT), Some(new Quota(2000, true)))
clientQuotaManager.updateQuota(Some(ConfigEntityName.DEFAULT), Some(ConfigEntityName.DEFAULT), Some(ConfigEntityName.DEFAULT), Some(new Quota(3000, true)))
clientQuotaManager.updateQuota(Some(ZooKeeperInternals.DEFAULT_STRING), None, None, Some(new Quota(1000, true)))
clientQuotaManager.updateQuota(None, Some(ZooKeeperInternals.DEFAULT_STRING), Some(ZooKeeperInternals.DEFAULT_STRING), Some(new Quota(2000, true)))
clientQuotaManager.updateQuota(Some(ZooKeeperInternals.DEFAULT_STRING), Some(ZooKeeperInternals.DEFAULT_STRING), Some(ZooKeeperInternals.DEFAULT_STRING), Some(new Quota(3000, true)))
clientQuotaManager.updateQuota(Some("userA"), None, None, Some(new Quota(4000, true)))
clientQuotaManager.updateQuota(Some("userA"), Some("client1"), Some("client1"), Some(new Quota(5000, true)))
clientQuotaManager.updateQuota(Some("userB"), None, None, Some(new Quota(6000, true)))
clientQuotaManager.updateQuota(Some("userB"), Some("client1"), Some("client1"), Some(new Quota(7000, true)))
clientQuotaManager.updateQuota(Some("userB"), Some(ConfigEntityName.DEFAULT), Some(ConfigEntityName.DEFAULT), Some(new Quota(8000, true)))
clientQuotaManager.updateQuota(Some("userB"), Some(ZooKeeperInternals.DEFAULT_STRING), Some(ZooKeeperInternals.DEFAULT_STRING), Some(new Quota(8000, true)))
clientQuotaManager.updateQuota(Some("userC"), None, None, Some(new Quota(10000, true)))
clientQuotaManager.updateQuota(None, Some("client1"), Some("client1"), Some(new Quota(9000, true)))
@ -266,14 +266,14 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
checkQuota(clientQuotaManager, "userE", "client1", 3000, 2500, false)
// Remove default <user, client> quota config, revert to <user> default
clientQuotaManager.updateQuota(Some(ConfigEntityName.DEFAULT), Some(ConfigEntityName.DEFAULT), Some(ConfigEntityName.DEFAULT), None)
clientQuotaManager.updateQuota(Some(ZooKeeperInternals.DEFAULT_STRING), Some(ZooKeeperInternals.DEFAULT_STRING), Some(ZooKeeperInternals.DEFAULT_STRING), None)
checkQuota(clientQuotaManager, "userD", "client1", 1000, 0, false) // Metrics tags changed, restart counter
checkQuota(clientQuotaManager, "userE", "client4", 1000, 1500, true)
checkQuota(clientQuotaManager, "userF", "client4", 1000, 800, false) // Default <user> quota shared across clients of user
checkQuota(clientQuotaManager, "userF", "client5", 1000, 800, true)
// Remove default <user> quota config, revert to <client-id> default
clientQuotaManager.updateQuota(Some(ConfigEntityName.DEFAULT), None, None, None)
clientQuotaManager.updateQuota(Some(ZooKeeperInternals.DEFAULT_STRING), None, None, None)
checkQuota(clientQuotaManager, "userF", "client4", 2000, 0, false) // Default <client-id> quota shared across client-id of all users
checkQuota(clientQuotaManager, "userF", "client5", 2000, 0, false)
checkQuota(clientQuotaManager, "userF", "client5", 2000, 2500, true)
@ -290,7 +290,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
checkQuota(clientQuotaManager, "userA", "client6", 8000, 0, true) // Throttled due to shared user quota
clientQuotaManager.updateQuota(Some("userA"), Some("client6"), Some("client6"), Some(new Quota(11000, true)))
checkQuota(clientQuotaManager, "userA", "client6", 11000, 8500, false)
clientQuotaManager.updateQuota(Some("userA"), Some(ConfigEntityName.DEFAULT), Some(ConfigEntityName.DEFAULT), Some(new Quota(12000, true)))
clientQuotaManager.updateQuota(Some("userA"), Some(ZooKeeperInternals.DEFAULT_STRING), Some(ZooKeeperInternals.DEFAULT_STRING), Some(new Quota(12000, true)))
clientQuotaManager.updateQuota(Some("userA"), Some("client6"), Some("client6"), None)
checkQuota(clientQuotaManager, "userA", "client6", 12000, 4000, true) // Throttled due to sum of new and earlier values
@ -304,7 +304,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
val clientQuotaManager = new ClientQuotaManager(config, metrics, Produce, time, "")
val queueSizeMetric = metrics.metrics().get(metrics.metricName("queue-size", "Produce", ""))
try {
clientQuotaManager.updateQuota(None, Some(ConfigEntityName.DEFAULT), Some(ConfigEntityName.DEFAULT),
clientQuotaManager.updateQuota(None, Some(ZooKeeperInternals.DEFAULT_STRING), Some(ZooKeeperInternals.DEFAULT_STRING),
Some(new Quota(500, true)))
// We have 10 second windows. Make sure that there is no quota violation
@ -352,7 +352,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
def testExpireThrottleTimeSensor(): Unit = {
val clientQuotaManager = new ClientQuotaManager(config, metrics, Produce, time, "")
try {
clientQuotaManager.updateQuota(None, Some(ConfigEntityName.DEFAULT), Some(ConfigEntityName.DEFAULT),
clientQuotaManager.updateQuota(None, Some(ZooKeeperInternals.DEFAULT_STRING), Some(ZooKeeperInternals.DEFAULT_STRING),
Some(new Quota(500, true)))
maybeRecord(clientQuotaManager, "ANONYMOUS", "client1", 100)
@ -374,7 +374,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
def testExpireQuotaSensors(): Unit = {
val clientQuotaManager = new ClientQuotaManager(config, metrics, Produce, time, "")
try {
clientQuotaManager.updateQuota(None, Some(ConfigEntityName.DEFAULT), Some(ConfigEntityName.DEFAULT),
clientQuotaManager.updateQuota(None, Some(ZooKeeperInternals.DEFAULT_STRING), Some(ZooKeeperInternals.DEFAULT_STRING),
Some(new Quota(500, true)))
maybeRecord(clientQuotaManager, "ANONYMOUS", "client1", 100)
@ -401,7 +401,7 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
val clientQuotaManager = new ClientQuotaManager(config, metrics, Produce, time, "")
val clientId = "client@#$%"
try {
clientQuotaManager.updateQuota(None, Some(ConfigEntityName.DEFAULT), Some(ConfigEntityName.DEFAULT),
clientQuotaManager.updateQuota(None, Some(ZooKeeperInternals.DEFAULT_STRING), Some(ZooKeeperInternals.DEFAULT_STRING),
Some(new Quota(500, true)))
maybeRecord(clientQuotaManager, "ANONYMOUS", clientId, 100)
@ -421,6 +421,6 @@ class ClientQuotaManagerTest extends BaseClientQuotaManagerTest {
// The class under test expects only sanitized client configs. We pass both the default value (which should not be
// sanitized to ensure it remains unique) and non-default values, so we need to take care in generating the sanitized
// client ID
def sanitizedConfigClientId = configClientId.map(x => if (x == ConfigEntityName.DEFAULT) ConfigEntityName.DEFAULT else Sanitizer.sanitize(x))
def sanitizedConfigClientId = configClientId.map(x => if (x == ZooKeeperInternals.DEFAULT_STRING) ZooKeeperInternals.DEFAULT_STRING else Sanitizer.sanitize(x))
}
}

View File

@ -20,7 +20,6 @@ package kafka.server
import java.net.InetAddress
import java.util
import java.util.concurrent.{ExecutionException, TimeUnit}
import kafka.test.ClusterInstance
import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
import kafka.test.junit.ClusterTestExtensions
@ -31,7 +30,7 @@ import org.apache.kafka.common.errors.{InvalidRequestException, UnsupportedVersi
import org.apache.kafka.common.internals.KafkaFutureImpl
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
import org.apache.kafka.common.requests.{AlterClientQuotasRequest, AlterClientQuotasResponse, DescribeClientQuotasRequest, DescribeClientQuotasResponse}
import org.apache.kafka.server.config.ConfigEntityName
import org.apache.kafka.server.config.ZooKeeperInternals
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Tag
import org.junit.jupiter.api.extension.ExtendWith
@ -527,7 +526,7 @@ class ClientQuotasRequestTest(cluster: ClusterInstance) {
def testClientQuotasWithDefaultName(): Unit = {
// An entity using the name associated with the default entity name. The entity's name should be sanitized so
// that it does not conflict with the default entity name.
val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.CLIENT_ID -> ConfigEntityName.DEFAULT)).asJava)
val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.CLIENT_ID -> ZooKeeperInternals.DEFAULT_STRING)).asJava)
alterEntityQuotas(entity, Map((ProducerByteRateProp -> Some(20000.0))), validateOnly = false)
verifyDescribeEntityQuotas(entity, Map((ProducerByteRateProp -> 20000.0)))

View File

@ -44,7 +44,7 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
import org.apache.kafka.common.record.{CompressionType, RecordVersion}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1
import org.apache.kafka.server.config.{ConfigEntityName, ConfigType}
import org.apache.kafka.server.config.{ConfigType, ZooKeeperInternals}
import org.apache.kafka.storage.internals.log.LogConfig
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{Test, Timeout}
@ -321,7 +321,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
val ipDefaultProps = new Properties()
ipDefaultProps.put(QuotaConfigs.IP_CONNECTION_RATE_OVERRIDE_CONFIG, "20")
adminZkClient.changeIpConfig(ConfigEntityName.DEFAULT, ipDefaultProps)
adminZkClient.changeIpConfig(ZooKeeperInternals.DEFAULT_STRING, ipDefaultProps)
val ipOverrideProps = new Properties()
ipOverrideProps.put(QuotaConfigs.IP_CONNECTION_RATE_OVERRIDE_CONFIG, "10")

View File

@ -142,15 +142,17 @@ class ZkConfigMigrationClientTest extends ZkMigrationTestHarness {
RecordTestUtils.replayAllBatches(delta, batches)
val image = delta.apply()
assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("user" -> "").asJava)))
assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("user" -> "user1").asJava)))
assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("user" -> "user1", "client-id" -> "clientA").asJava)))
assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("user" -> "", "client-id" -> "").asJava)))
assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("user" -> "", "client-id" -> "clientA").asJava)))
assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("client-id" -> "").asJava)))
assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("client-id" -> "clientB").asJava)))
assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("ip" -> "1.1.1.1").asJava)))
assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("ip" -> "").asJava)))
assertEquals(new util.HashSet[ClientQuotaEntity](java.util.Arrays.asList(
new ClientQuotaEntity(Map("user" -> null.asInstanceOf[String]).asJava),
new ClientQuotaEntity(Map("user" -> "user1").asJava),
new ClientQuotaEntity(Map("user" -> "user1", "client-id" -> "clientA").asJava),
new ClientQuotaEntity(Map("user" -> null.asInstanceOf[String], "client-id" -> null.asInstanceOf[String]).asJava),
new ClientQuotaEntity(Map("user" -> null.asInstanceOf[String], "client-id" -> "clientA").asJava),
new ClientQuotaEntity(Map("client-id" -> null.asInstanceOf[String]).asJava),
new ClientQuotaEntity(Map("client-id" -> "clientB").asJava),
new ClientQuotaEntity(Map("ip" -> "1.1.1.1").asJava),
new ClientQuotaEntity(Map("ip" -> null.asInstanceOf[String]).asJava))),
image.entities().keySet())
}
@Test
@ -186,7 +188,7 @@ class ZkConfigMigrationClientTest extends ZkMigrationTestHarness {
assertEquals(4, migrationState.migrationZkVersion())
migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState,
Map(ClientQuotaEntity.USER -> ""),
Map(ClientQuotaEntity.USER -> null.asInstanceOf[String]),
Map(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 200.0),
ConfigType.USER, "<default>")
assertEquals(5, migrationState.migrationZkVersion())

View File

@ -16,6 +16,12 @@
*/
package org.apache.kafka.server.config;
public class ConfigEntityName {
public static final String DEFAULT = "<default>";
public class ZooKeeperInternals {
/**
* This string is used in ZooKeeper in several places to indicate a default entity type.
* For example, default user quotas are stored under /config/users/&ltdefault&gt
* Note that AdminClient does <b>not</b> use this to indicate a default, nor do records in KRaft mode.
* This constant will go away in Apache Kafka 4.0 with the end of ZK mode.
*/
public static final String DEFAULT_STRING = "<default>";
}