diff --git a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala index ee960bd35f8..e69719e7501 100644 --- a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala +++ b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala @@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.ControllerMovedException import org.apache.kafka.common.metadata._ import org.apache.kafka.common.resource.ResourcePattern import org.apache.kafka.common.security.scram.ScramCredential +import org.apache.kafka.common.utils.Sanitizer import org.apache.kafka.common.{TopicIdPartition, Uuid} import org.apache.kafka.metadata.DelegationTokenData import org.apache.kafka.metadata.PartitionRegistration @@ -226,6 +227,9 @@ class ZkMigrationClient( entityDataList: util.List[ClientQuotaRecord.EntityData], quotas: util.Map[String, lang.Double] ): Unit = { + entityDataList.forEach(entityData => { + entityData.setEntityName(Sanitizer.desanitize(entityData.entityName())) + }) val batch = new util.ArrayList[ApiMessageAndVersion]() quotas.forEach((key, value) => { batch.add(new ApiMessageAndVersion(new ClientQuotaRecord() diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala index e52bc4e567a..d442df33adf 100644 --- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala @@ -48,7 +48,7 @@ import org.apache.kafka.raft.RaftConfig import org.apache.kafka.security.PasswordEncoder import org.apache.kafka.server.ControllerRequestCompletionHandler import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, ProducerIdsBlock} -import org.apache.kafka.server.config.ConfigType +import org.apache.kafka.server.config.{ConfigEntityName, ConfigType} import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertNotNull, assertTrue, fail} import org.junit.jupiter.api.{Assumptions, Timeout} import org.junit.jupiter.api.extension.ExtendWith @@ -227,15 +227,19 @@ class ZkMigrationIntegrationTest { createTopicResult.all().get(60, TimeUnit.SECONDS) val quotas = new util.ArrayList[ClientQuotaAlteration]() - quotas.add(new ClientQuotaAlteration( - new ClientQuotaEntity(Map("user" -> "user1").asJava), - List(new ClientQuotaAlteration.Op("consumer_byte_rate", 1000.0)).asJava)) - quotas.add(new ClientQuotaAlteration( - new ClientQuotaEntity(Map("user" -> "user1", "client-id" -> "clientA").asJava), + val defaultUserEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> ConfigEntityName.DEFAULT).asJava) + quotas.add(new ClientQuotaAlteration(defaultUserEntity, List(new ClientQuotaAlteration.Op("consumer_byte_rate", 900.0)).asJava)) + val defaultClientIdEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> ConfigEntityName.DEFAULT).asJava) + quotas.add(new ClientQuotaAlteration(defaultClientIdEntity, List(new ClientQuotaAlteration.Op("consumer_byte_rate", 900.0)).asJava)) + val defaultIpEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.IP -> null.asInstanceOf[String]).asJava) + quotas.add(new ClientQuotaAlteration(defaultIpEntity, List(new ClientQuotaAlteration.Op("connection_creation_rate", 9.0)).asJava)) + val userEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "user/1@prod").asJava) + quotas.add(new ClientQuotaAlteration(userEntity, List(new ClientQuotaAlteration.Op("consumer_byte_rate", 1000.0)).asJava)) + val userClientEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "user/1@prod", ClientQuotaEntity.CLIENT_ID -> "client/1@domain").asJava) + quotas.add(new ClientQuotaAlteration(userClientEntity, List(new ClientQuotaAlteration.Op("consumer_byte_rate", 800.0), new ClientQuotaAlteration.Op("producer_byte_rate", 100.0)).asJava)) - quotas.add(new ClientQuotaAlteration( - new ClientQuotaEntity(Map("ip" -> "8.8.8.8").asJava), - List(new ClientQuotaAlteration.Op("connection_creation_rate", 10.0)).asJava)) + val ipEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.IP -> "8.8.8.8").asJava) + quotas.add(new ClientQuotaAlteration(ipEntity, List(new ClientQuotaAlteration.Op("connection_creation_rate", 10.0)).asJava)) admin.alterClientQuotas(quotas).all().get(60, TimeUnit.SECONDS) val zkClient = clusterInstance.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient @@ -271,7 +275,13 @@ class ZkMigrationIntegrationTest { assertEquals(10, image.topics().getTopic("test-topic-3").partitions().size()) val clientQuotas = image.clientQuotas().entities() - assertEquals(3, clientQuotas.size()) + assertEquals(6, clientQuotas.size()) + assertEquals(true, clientQuotas.containsKey(defaultUserEntity)) + assertEquals(true, clientQuotas.containsKey(defaultClientIdEntity)) + assertEquals(true, clientQuotas.containsKey(new ClientQuotaEntity(Map(ClientQuotaEntity.IP -> "").asJava))) // default ip + assertEquals(true, clientQuotas.containsKey(userEntity)) + assertEquals(true, clientQuotas.containsKey(userClientEntity)) + assertEquals(true, clientQuotas.containsKey(ipEntity)) } migrationState = migrationClient.releaseControllerLeadership(migrationState)