mirror of https://github.com/apache/kafka.git
KAFKA-16222: desanitize entity name when migrate client quotas (#15481)
The entity name is sanitized when it's in Zk mode. We didn't desanitize it when we migrate client quotas. Add Sanitizer.desanitize to fix it. Reviewers: Luke Chen <showuon@gmail.com>
This commit is contained in:
parent
8c0fafba58
commit
34d365fd8a
|
@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.ControllerMovedException
|
|||
import org.apache.kafka.common.metadata._
|
||||
import org.apache.kafka.common.resource.ResourcePattern
|
||||
import org.apache.kafka.common.security.scram.ScramCredential
|
||||
import org.apache.kafka.common.utils.Sanitizer
|
||||
import org.apache.kafka.common.{TopicIdPartition, Uuid}
|
||||
import org.apache.kafka.metadata.DelegationTokenData
|
||||
import org.apache.kafka.metadata.PartitionRegistration
|
||||
|
@ -226,6 +227,9 @@ class ZkMigrationClient(
|
|||
entityDataList: util.List[ClientQuotaRecord.EntityData],
|
||||
quotas: util.Map[String, lang.Double]
|
||||
): Unit = {
|
||||
entityDataList.forEach(entityData => {
|
||||
entityData.setEntityName(Sanitizer.desanitize(entityData.entityName()))
|
||||
})
|
||||
val batch = new util.ArrayList[ApiMessageAndVersion]()
|
||||
quotas.forEach((key, value) => {
|
||||
batch.add(new ApiMessageAndVersion(new ClientQuotaRecord()
|
||||
|
|
|
@ -48,7 +48,7 @@ import org.apache.kafka.raft.RaftConfig
|
|||
import org.apache.kafka.security.PasswordEncoder
|
||||
import org.apache.kafka.server.ControllerRequestCompletionHandler
|
||||
import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion, ProducerIdsBlock}
|
||||
import org.apache.kafka.server.config.ConfigType
|
||||
import org.apache.kafka.server.config.{ConfigEntityName, ConfigType}
|
||||
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertNotNull, assertTrue, fail}
|
||||
import org.junit.jupiter.api.{Assumptions, Timeout}
|
||||
import org.junit.jupiter.api.extension.ExtendWith
|
||||
|
@ -227,15 +227,19 @@ class ZkMigrationIntegrationTest {
|
|||
createTopicResult.all().get(60, TimeUnit.SECONDS)
|
||||
|
||||
val quotas = new util.ArrayList[ClientQuotaAlteration]()
|
||||
quotas.add(new ClientQuotaAlteration(
|
||||
new ClientQuotaEntity(Map("user" -> "user1").asJava),
|
||||
List(new ClientQuotaAlteration.Op("consumer_byte_rate", 1000.0)).asJava))
|
||||
quotas.add(new ClientQuotaAlteration(
|
||||
new ClientQuotaEntity(Map("user" -> "user1", "client-id" -> "clientA").asJava),
|
||||
val defaultUserEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> ConfigEntityName.DEFAULT).asJava)
|
||||
quotas.add(new ClientQuotaAlteration(defaultUserEntity, List(new ClientQuotaAlteration.Op("consumer_byte_rate", 900.0)).asJava))
|
||||
val defaultClientIdEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> ConfigEntityName.DEFAULT).asJava)
|
||||
quotas.add(new ClientQuotaAlteration(defaultClientIdEntity, List(new ClientQuotaAlteration.Op("consumer_byte_rate", 900.0)).asJava))
|
||||
val defaultIpEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.IP -> null.asInstanceOf[String]).asJava)
|
||||
quotas.add(new ClientQuotaAlteration(defaultIpEntity, List(new ClientQuotaAlteration.Op("connection_creation_rate", 9.0)).asJava))
|
||||
val userEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "user/1@prod").asJava)
|
||||
quotas.add(new ClientQuotaAlteration(userEntity, List(new ClientQuotaAlteration.Op("consumer_byte_rate", 1000.0)).asJava))
|
||||
val userClientEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "user/1@prod", ClientQuotaEntity.CLIENT_ID -> "client/1@domain").asJava)
|
||||
quotas.add(new ClientQuotaAlteration(userClientEntity,
|
||||
List(new ClientQuotaAlteration.Op("consumer_byte_rate", 800.0), new ClientQuotaAlteration.Op("producer_byte_rate", 100.0)).asJava))
|
||||
quotas.add(new ClientQuotaAlteration(
|
||||
new ClientQuotaEntity(Map("ip" -> "8.8.8.8").asJava),
|
||||
List(new ClientQuotaAlteration.Op("connection_creation_rate", 10.0)).asJava))
|
||||
val ipEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.IP -> "8.8.8.8").asJava)
|
||||
quotas.add(new ClientQuotaAlteration(ipEntity, List(new ClientQuotaAlteration.Op("connection_creation_rate", 10.0)).asJava))
|
||||
admin.alterClientQuotas(quotas).all().get(60, TimeUnit.SECONDS)
|
||||
|
||||
val zkClient = clusterInstance.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient
|
||||
|
@ -271,7 +275,13 @@ class ZkMigrationIntegrationTest {
|
|||
assertEquals(10, image.topics().getTopic("test-topic-3").partitions().size())
|
||||
|
||||
val clientQuotas = image.clientQuotas().entities()
|
||||
assertEquals(3, clientQuotas.size())
|
||||
assertEquals(6, clientQuotas.size())
|
||||
assertEquals(true, clientQuotas.containsKey(defaultUserEntity))
|
||||
assertEquals(true, clientQuotas.containsKey(defaultClientIdEntity))
|
||||
assertEquals(true, clientQuotas.containsKey(new ClientQuotaEntity(Map(ClientQuotaEntity.IP -> "").asJava))) // default ip
|
||||
assertEquals(true, clientQuotas.containsKey(userEntity))
|
||||
assertEquals(true, clientQuotas.containsKey(userClientEntity))
|
||||
assertEquals(true, clientQuotas.containsKey(ipEntity))
|
||||
}
|
||||
|
||||
migrationState = migrationClient.releaseControllerLeadership(migrationState)
|
||||
|
|
Loading…
Reference in New Issue