From fbff947d25956100b1189fa4ee4ca64ea775df11 Mon Sep 17 00:00:00 2001 From: "Colin P. McCabe" Date: Wed, 27 Mar 2024 09:38:11 -0700 Subject: [PATCH] KAFKA-16411: Correctly migrate default client quota entities (#15584) KAFKA-16222 fixed a bug whereby we didn't undo the name sanitization used on client quota entity names stored in ZooKeeper. However, it incorrectly claimed to fix the handling of default client quota entities. It also failed to correctly re-sanitize when syncronizing the data back to ZooKeeper. This PR fixes ZkConfigMigrationClient to do the sanitization correctly on both the read and write paths. We do de-sanitization before invoking the visitors, since after all it does not make sense to do the same de-sanitization step in each and every visitor. Additionally, this PR fixes a bug causing default entities to be converted incorrectly. For example, ClientQuotaEntity(user -> null) is stored under the /config/users/ znode in ZooKeeper. In KRaft it appears as a ClientQuotaRecord with EntityData(entityType=users, entityName=null). Prior to this PR, this was being converted to a ClientQuotaRecord with EntityData(entityType=users, entityName=""). That represents a quota on the user whose name is the empty string (yes, we allow users to name themselves with the empty string, sadly.) The confusion appears to have arisen because for TOPIC and BROKER configurations, the default ConfigResource is indeed the one named with the empty (not null) string. For example, the default topic configuration resource is ConfigResource(name="", type=TOPIC). However, things are different for client quotas. Default client quota entities in KRaft (and also in AdminClient) are represented by maps with null values. For example, the default User entity is represented by Map("user" -> null). In retrospect, using a map with null values was a poor choice; a Map> would have made more sense. However, this is the way the API currently is and we have to convert correctly. There was an additional level of confusion present in KAFKA-16222 where someone thought that using the ZooKeeper placeholder string "" in the AdminClient API would yield a default client quota entity. Thise seems to have been suggested by the ConfigEntityName class that was created recently. In fact, is not part of any public API in Kafka. Accordingly, this PR also renames ConfigEntityName.DEFAULT to ZooKeeperInternals.DEFAULT_STRING, to make it clear that the string is just a detail of the ZooKeeper implementation. It is not used in the Kafka API to indicate defaults. Hopefully this will avoid confusion in the future. Finally, the PR also creates KRaftClusterTest.testDefaultClientQuotas to get extra test coverage of setting default client quotas. Reviewers: Manikumar Reddy , Igor Soarez Conflicts: Do not backport the changes to create ZooKeeperInternals.DEFAULT_STRING to this branch, to make the cherry-pick smaller. --- .../scala/kafka/zk/ZkMigrationClient.scala | 4 - .../migration/ZkConfigMigrationClient.scala | 89 ++++++++++++++----- .../kafka/server/KRaftClusterTest.scala | 63 +++++++++++++ .../kafka/zk/ZkMigrationIntegrationTest.scala | 45 +++++----- .../ZkConfigMigrationClientTest.scala | 22 ++--- 5 files changed, 167 insertions(+), 56 deletions(-) diff --git a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala index 76e0b47aee8..a11a84c017b 100644 --- a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala +++ b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala @@ -27,7 +27,6 @@ import org.apache.kafka.common.errors.ControllerMovedException import org.apache.kafka.common.metadata._ import org.apache.kafka.common.resource.ResourcePattern import org.apache.kafka.common.security.scram.ScramCredential -import org.apache.kafka.common.utils.Sanitizer import org.apache.kafka.common.{TopicIdPartition, Uuid} import org.apache.kafka.metadata.DelegationTokenData import org.apache.kafka.metadata.PartitionRegistration @@ -226,9 +225,6 @@ class ZkMigrationClient( entityDataList: util.List[ClientQuotaRecord.EntityData], quotas: util.Map[String, lang.Double] ): Unit = { - entityDataList.forEach(entityData => { - entityData.setEntityName(Sanitizer.desanitize(entityData.entityName())) - }) val batch = new util.ArrayList[ApiMessageAndVersion]() quotas.forEach((key, value) => { batch.add(new ApiMessageAndVersion(new ClientQuotaRecord() diff --git a/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala b/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala index 55fb048e686..844c1aabc4c 100644 --- a/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala +++ b/core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala @@ -21,6 +21,7 @@ import kafka.server.{ConfigEntityName, ConfigType, DynamicBrokerConfig, DynamicC import kafka.utils.{Logging, PasswordEncoder} import kafka.zk.ZkMigrationClient.{logAndRethrow, wrapZkException} import kafka.zk._ +import kafka.zk.migration.ZkConfigMigrationClient.getSanitizedClientQuotaZNodeName import kafka.zookeeper.{CreateRequest, DeleteRequest, SetDataRequest} import org.apache.kafka.clients.admin.ScramMechanism import org.apache.kafka.common.config.types.Password @@ -29,6 +30,7 @@ import org.apache.kafka.common.errors.InvalidRequestException import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData import org.apache.kafka.common.quota.ClientQuotaEntity import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils +import org.apache.kafka.common.utils.Sanitizer import org.apache.kafka.metadata.migration.ConfigMigrationClient.ClientQuotaVisitor import org.apache.kafka.metadata.migration.{ConfigMigrationClient, MigrationClientException, ZkMigrationLeadershipState} import org.apache.zookeeper.KeeperException.Code @@ -49,11 +51,10 @@ class ZkConfigMigrationClient( /** - * In ZK, we use the special string "<default>" to represent the default entity. - * In KRaft, we use an empty string. This method builds an EntityData that converts the special ZK string - * to the special KRaft string. + * In ZK, we use the special string "<default>" to represent the default config entity. + * In KRaft, we use an empty string. This method converts the between the two conventions. */ - private def fromZkEntityName(entityName: String): String = { + private def fromZkConfigEntityName(entityName: String): String = { if (entityName.equals(ConfigEntityName.Default)) { "" } else { @@ -61,7 +62,7 @@ class ZkConfigMigrationClient( } } - private def toZkEntityName(entityName: String): String = { + private def toZkConfigEntityName(entityName: String): String = { if (entityName.isEmpty) { ConfigEntityName.Default } else { @@ -69,22 +70,35 @@ class ZkConfigMigrationClient( } } - private def buildEntityData(entityType: String, entityName: String): EntityData = { - new EntityData().setEntityType(entityType).setEntityName(fromZkEntityName(entityName)) + private def buildClientQuotaEntityData( + entityType: String, + znodeName: String + ): EntityData = { + val result = new EntityData().setEntityType(entityType) + if (znodeName.equals(ConfigEntityName.Default)) { + // Default __client quota__ entity names are null. This is different than default __configs__, + // which have their names set to the empty string instead. + result.setEntityName(null) + } else { + // ZNode names are sanitized before being stored in ZooKeeper. + // For example, @ is turned into %40. Undo the sanitization here. + result.setEntityName(Sanitizer.desanitize(znodeName)) + } + result } override def iterateClientQuotas(visitor: ClientQuotaVisitor): Unit = { def migrateEntityType(zkEntityType: String, entityType: String): Unit = { adminZkClient.fetchAllEntityConfigs(zkEntityType).foreach { case (name, props) => - val entity = List(buildEntityData(entityType, name)).asJava + val entity = List(buildClientQuotaEntityData(entityType, name)).asJava ScramMechanism.values().filter(_ != ScramMechanism.UNKNOWN).foreach { mechanism => val propertyValue = props.getProperty(mechanism.mechanismName) if (propertyValue != null) { val scramCredentials = ScramCredentialUtils.credentialFromString(propertyValue) logAndRethrow(this, s"Error in client quota visitor for SCRAM credential. User was $entity.") { - visitor.visitScramCredential(name, mechanism, scramCredentials) + visitor.visitScramCredential(Sanitizer.desanitize(name), mechanism, scramCredentials) } props.remove(mechanism.mechanismName) } @@ -105,14 +119,14 @@ class ZkConfigMigrationClient( migrateEntityType(ConfigType.User, ClientQuotaEntity.USER) migrateEntityType(ConfigType.Client, ClientQuotaEntity.CLIENT_ID) - adminZkClient.fetchAllChildEntityConfigs(ConfigType.User, ConfigType.Client).foreach { case (name, props) => + adminZkClient.fetchAllChildEntityConfigs(ConfigType.User, ConfigType.Client).foreach { case (znodePath, props) => // Taken from ZkAdminManager - val components = name.split("/") + val components = znodePath.split("/") if (components.size != 3 || components(1) != "clients") - throw new IllegalArgumentException(s"Unexpected config path: ${name}") + throw new IllegalArgumentException(s"Unexpected config path: ${znodePath}") val entity = List( - buildEntityData(ClientQuotaEntity.USER, components(0)), - buildEntityData(ClientQuotaEntity.CLIENT_ID, components(2)) + buildClientQuotaEntityData(ClientQuotaEntity.USER, components(0)), + buildClientQuotaEntityData(ClientQuotaEntity.CLIENT_ID, components(2)) ) val quotaMap = props.asScala.map { case (key, value) => val doubleValue = try lang.Double.valueOf(value) catch { @@ -132,7 +146,7 @@ class ZkConfigMigrationClient( override def iterateBrokerConfigs(configConsumer: BiConsumer[String, util.Map[String, String]]): Unit = { val brokerEntities = zkClient.getAllEntitiesWithConfig(ConfigType.Broker) zkClient.getEntitiesConfigs(ConfigType.Broker, brokerEntities.toSet).foreach { case (broker, props) => - val brokerResource = fromZkEntityName(broker) + val brokerResource = fromZkConfigEntityName(broker) val decodedProps = props.asScala.map { case (key, value) => if (DynamicBrokerConfig.isPasswordConfig(key)) key -> passwordEncoder.decode(value).value @@ -154,7 +168,7 @@ class ZkConfigMigrationClient( } override def readTopicConfigs(topicName: String, configConsumer: Consumer[util.Map[String, String]]): Unit = { - val topicResource = fromZkEntityName(topicName) + val topicResource = fromZkConfigEntityName(topicName) val props = zkClient.getEntityConfigs(ConfigType.Topic, topicResource) val decodedProps = props.asScala.map { case (key, value) => if (DynamicBrokerConfig.isPasswordConfig(key)) @@ -179,7 +193,7 @@ class ZkConfigMigrationClient( case _ => None } - val configName = toZkEntityName(configResource.name()) + val configName = toZkConfigEntityName(configResource.name()) if (configType.isDefined) { val props = new Properties() configMap.forEach { case (key, value) => @@ -218,7 +232,7 @@ class ZkConfigMigrationClient( case _ => None } - val configName = toZkEntityName(configResource.name()) + val configName = toZkConfigEntityName(configResource.name()) if (configType.isDefined) { val path = ConfigEntityZNode.path(configType.get, configName) val requests = Seq(DeleteRequest(path, ZkVersion.MatchAnyVersion)) @@ -247,10 +261,9 @@ class ZkConfigMigrationClient( scram: util.Map[String, String], state: ZkMigrationLeadershipState ): ZkMigrationLeadershipState = wrapZkException { - val entityMap = entity.asScala - val user = entityMap.get(ClientQuotaEntity.USER).map(toZkEntityName) - val client = entityMap.get(ClientQuotaEntity.CLIENT_ID).map(toZkEntityName) - val ip = entityMap.get(ClientQuotaEntity.IP).map(toZkEntityName) + val user: Option[String] = getSanitizedClientQuotaZNodeName(entity, ClientQuotaEntity.USER) + val client: Option[String] = getSanitizedClientQuotaZNodeName(entity, ClientQuotaEntity.CLIENT_ID) + val ip: Option[String] = getSanitizedClientQuotaZNodeName(entity, ClientQuotaEntity.IP) val props = new Properties() val (configType, path, configKeys) = if (user.isDefined && client.isEmpty) { @@ -348,3 +361,35 @@ class ZkConfigMigrationClient( } } +object ZkConfigMigrationClient { + /** + * Find the znode name to use for a ClientQuotaEntity. + * + * @param entity The client quota entity map. See org.apache.kafka.common.ClientQuotaEntity. + * @param component The component that we want a znode name for. + * @return Some(znodeName) if there is a znode path; None otherwise. + */ + def getSanitizedClientQuotaZNodeName( + entity: util.Map[String, String], + component: String + ): Option[String] = { + if (!entity.containsKey(component)) { + // There is no znode path, because the component wasn't found. For example, if the + // entity was (user -> "bob") and our component was "ip", we would return None here. + None + } else { + val rawValue = entity.get(component) + if (rawValue == null) { + // A raw value of null means this is a default entity. For example, (user -> null) means + // the default user. Yes, this means we stored a null value in the map and it did not mean + // "not present." This is an unfortunate API that should be revisited at some point. + Some(ConfigEntityName.Default) + } else { + // We found a non-null value, and now we need to sanitize it. For example, "c@@ldude" will + // turn into c%40%40ldude, so that we can use it as a znode name in ZooKeeper. + Some(Sanitizer.sanitize(rawValue)) + } + } + } +} + diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala index dbea33f9058..48b985e1732 100644 --- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala +++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala @@ -32,6 +32,7 @@ import org.apache.kafka.common.metadata.{ConfigRecord, FeatureLevelRecord} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors._ +import org.apache.kafka.common.quota.ClientQuotaAlteration.Op import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent} import org.apache.kafka.common.requests.{ApiError, DescribeClusterRequest, DescribeClusterResponse} import org.apache.kafka.common.security.auth.KafkaPrincipal @@ -324,6 +325,68 @@ class KRaftClusterTest { } } + def setConsumerByteRate( + admin: Admin, + entity: ClientQuotaEntity, + value: Long + ): Unit = { + admin.alterClientQuotas(Collections.singletonList( + new ClientQuotaAlteration(entity, Collections.singletonList( + new Op("consumer_byte_rate", value.doubleValue()))))). + all().get() + } + + def getConsumerByteRates(admin: Admin): Map[ClientQuotaEntity, Long] = { + val allFilter = ClientQuotaFilter.contains(Collections.emptyList()) + val results = new java.util.HashMap[ClientQuotaEntity, Long] + admin.describeClientQuotas(allFilter).entities().get().forEach { + case (entity, entityMap) => + Option(entityMap.get("consumer_byte_rate")).foreach { + case value => results.put(entity, value.longValue()) + } + } + results.asScala.toMap + } + + @Test + def testDefaultClientQuotas(): Unit = { + val cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder(). + setNumBrokerNodes(1). + setNumControllerNodes(1).build()).build() + try { + cluster.format() + cluster.startup() + TestUtils.waitUntilTrue(() => cluster.brokers().get(0).brokerState == BrokerState.RUNNING, + "Broker never made it to RUNNING state.") + val admin = Admin.create(cluster.clientProperties()) + try { + val defaultUser = new ClientQuotaEntity(Collections.singletonMap[String, String]("user", null)) + val bobUser = new ClientQuotaEntity(Collections.singletonMap[String, String]("user", "bob")) + TestUtils.retry(30000) { + assertEquals(Map(), getConsumerByteRates(admin)) + } + setConsumerByteRate(admin, defaultUser, 100L) + TestUtils.retry(30000) { + assertEquals(Map( + defaultUser -> 100L + ), getConsumerByteRates(admin)) + } + setConsumerByteRate(admin, bobUser, 1000L) + TestUtils.retry(30000) { + assertEquals(Map( + defaultUser -> 100L, + bobUser -> 1000L + ), getConsumerByteRates(admin)) + } + } finally { + admin.close() + } + } finally { + cluster.close() + } + } + @Test def testCreateClusterWithAdvertisedPortZero(): Unit = { val brokerPropertyOverrides: (TestKitNodes, BrokerNode) => Map[String, String] = (nodes, _) => Map( diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala index f8fe82ed9d0..187e89e770f 100644 --- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala @@ -17,7 +17,7 @@ package kafka.zk import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString} -import kafka.server.{ConfigEntityName, ConfigType, ControllerRequestCompletionHandler, KafkaConfig} +import kafka.server.{ConfigType, ControllerRequestCompletionHandler, KafkaConfig} import kafka.test.{ClusterConfig, ClusterGenerator, ClusterInstance} import kafka.test.annotation.{AutoStart, ClusterConfigProperty, ClusterTemplate, ClusterTest, Type} import kafka.test.junit.ClusterTestExtensions @@ -40,7 +40,7 @@ import org.apache.kafka.common.resource.ResourcePattern import org.apache.kafka.common.resource.ResourceType.TOPIC import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils -import org.apache.kafka.common.utils.SecurityUtils +import org.apache.kafka.common.utils.{Sanitizer, SecurityUtils} import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance} import org.apache.kafka.metadata.authorizer.StandardAcl import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState @@ -216,11 +216,11 @@ class ZkMigrationIntegrationTest { createTopicResult.all().get(60, TimeUnit.SECONDS) val quotas = new util.ArrayList[ClientQuotaAlteration]() - val defaultUserEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> ConfigEntityName.Default).asJava) + val defaultUserEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, null)) quotas.add(new ClientQuotaAlteration(defaultUserEntity, List(new ClientQuotaAlteration.Op("consumer_byte_rate", 900.0)).asJava)) - val defaultClientIdEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> ConfigEntityName.Default).asJava) + val defaultClientIdEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.CLIENT_ID, null)) quotas.add(new ClientQuotaAlteration(defaultClientIdEntity, List(new ClientQuotaAlteration.Op("consumer_byte_rate", 900.0)).asJava)) - val defaultIpEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.IP -> null.asInstanceOf[String]).asJava) + val defaultIpEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.IP, null)) quotas.add(new ClientQuotaAlteration(defaultIpEntity, List(new ClientQuotaAlteration.Op("connection_creation_rate", 9.0)).asJava)) val userEntity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> "user/1@prod").asJava) quotas.add(new ClientQuotaAlteration(userEntity, List(new ClientQuotaAlteration.Op("consumer_byte_rate", 1000.0)).asJava)) @@ -264,13 +264,14 @@ class ZkMigrationIntegrationTest { assertEquals(10, image.topics().getTopic("test-topic-3").partitions().size()) val clientQuotas = image.clientQuotas().entities() - assertEquals(6, clientQuotas.size()) - assertEquals(true, clientQuotas.containsKey(defaultUserEntity)) - assertEquals(true, clientQuotas.containsKey(defaultClientIdEntity)) - assertEquals(true, clientQuotas.containsKey(new ClientQuotaEntity(Map(ClientQuotaEntity.IP -> "").asJava))) // default ip - assertEquals(true, clientQuotas.containsKey(userEntity)) - assertEquals(true, clientQuotas.containsKey(userClientEntity)) - assertEquals(true, clientQuotas.containsKey(ipEntity)) + assertEquals(new java.util.HashSet[ClientQuotaEntity](java.util.Arrays.asList( + defaultUserEntity, + defaultClientIdEntity, + defaultIpEntity, + userEntity, + userClientEntity, + ipEntity + )), clientQuotas.keySet()) } migrationState = migrationClient.releaseControllerLeadership(migrationState) @@ -832,11 +833,14 @@ class ZkMigrationIntegrationTest { def alterClientQuotas(admin: Admin): AlterClientQuotasResult = { val quotas = new util.ArrayList[ClientQuotaAlteration]() quotas.add(new ClientQuotaAlteration( - new ClientQuotaEntity(Map("user" -> "user1").asJava), + new ClientQuotaEntity(Map("user" -> "user@1").asJava), List(new ClientQuotaAlteration.Op("consumer_byte_rate", 1000.0)).asJava)) quotas.add(new ClientQuotaAlteration( - new ClientQuotaEntity(Map("user" -> "user1", "client-id" -> "clientA").asJava), + new ClientQuotaEntity(Map("user" -> "user@1", "client-id" -> "clientA").asJava), List(new ClientQuotaAlteration.Op("consumer_byte_rate", 800.0), new ClientQuotaAlteration.Op("producer_byte_rate", 100.0)).asJava)) + quotas.add(new ClientQuotaAlteration( + new ClientQuotaEntity(Collections.singletonMap("user", null)), + List(new ClientQuotaAlteration.Op("consumer_byte_rate", 900.0), new ClientQuotaAlteration.Op("producer_byte_rate", 100.0)).asJava)) quotas.add(new ClientQuotaAlteration( new ClientQuotaEntity(Map("ip" -> "8.8.8.8").asJava), List(new ClientQuotaAlteration.Op("connection_creation_rate", 10.0)).asJava)) @@ -854,7 +858,7 @@ class ZkMigrationIntegrationTest { val alterations = new util.ArrayList[UserScramCredentialAlteration]() alterations.add(new UserScramCredentialUpsertion("user1", new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 8191), "password1")) - alterations.add(new UserScramCredentialUpsertion("user2", + alterations.add(new UserScramCredentialUpsertion("user@2", new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 8192), "password2")) admin.alterUserScramCredentials(alterations) } @@ -869,20 +873,21 @@ class ZkMigrationIntegrationTest { def verifyClientQuotas(zkClient: KafkaZkClient): Unit = { TestUtils.retry(10000) { - assertEquals("1000", zkClient.getEntityConfigs(ConfigType.User, "user1").getProperty("consumer_byte_rate")) - assertEquals("800", zkClient.getEntityConfigs("users/user1/clients", "clientA").getProperty("consumer_byte_rate")) - assertEquals("100", zkClient.getEntityConfigs("users/user1/clients", "clientA").getProperty("producer_byte_rate")) + assertEquals("1000", zkClient.getEntityConfigs(ConfigType.User, Sanitizer.sanitize("user@1")).getProperty("consumer_byte_rate")) + assertEquals("900", zkClient.getEntityConfigs(ConfigType.User, "").getProperty("consumer_byte_rate")) + assertEquals("800", zkClient.getEntityConfigs("users/" + Sanitizer.sanitize("user@1") + "/clients", "clientA").getProperty("consumer_byte_rate")) + assertEquals("100", zkClient.getEntityConfigs("users/" + Sanitizer.sanitize("user@1") + "/clients", "clientA").getProperty("producer_byte_rate")) assertEquals("10", zkClient.getEntityConfigs(ConfigType.Ip, "8.8.8.8").getProperty("connection_creation_rate")) } } def verifyUserScramCredentials(zkClient: KafkaZkClient): Unit = { TestUtils.retry(10000) { - val propertyValue1 = zkClient.getEntityConfigs(ConfigType.User, "user1").getProperty("SCRAM-SHA-256") + val propertyValue1 = zkClient.getEntityConfigs(ConfigType.User, Sanitizer.sanitize("user1")).getProperty("SCRAM-SHA-256") val scramCredentials1 = ScramCredentialUtils.credentialFromString(propertyValue1) assertEquals(8191, scramCredentials1.iterations) - val propertyValue2 = zkClient.getEntityConfigs(ConfigType.User, "user2").getProperty("SCRAM-SHA-256") + val propertyValue2 = zkClient.getEntityConfigs(ConfigType.User, Sanitizer.sanitize("user@2")).getProperty("SCRAM-SHA-256") assertNotNull(propertyValue2) val scramCredentials2 = ScramCredentialUtils.credentialFromString(propertyValue2) assertEquals(8192, scramCredentials2.iterations) diff --git a/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala index 243209d23bf..f04720399a8 100644 --- a/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/migration/ZkConfigMigrationClientTest.scala @@ -141,15 +141,17 @@ class ZkConfigMigrationClientTest extends ZkMigrationTestHarness { RecordTestUtils.replayAllBatches(delta, batches) val image = delta.apply() - assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("user" -> "").asJava))) - assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("user" -> "user1").asJava))) - assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("user" -> "user1", "client-id" -> "clientA").asJava))) - assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("user" -> "", "client-id" -> "").asJava))) - assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("user" -> "", "client-id" -> "clientA").asJava))) - assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("client-id" -> "").asJava))) - assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("client-id" -> "clientB").asJava))) - assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("ip" -> "1.1.1.1").asJava))) - assertTrue(image.entities().containsKey(new ClientQuotaEntity(Map("ip" -> "").asJava))) + assertEquals(new util.HashSet[ClientQuotaEntity](java.util.Arrays.asList( + new ClientQuotaEntity(Map("user" -> null.asInstanceOf[String]).asJava), + new ClientQuotaEntity(Map("user" -> "user1").asJava), + new ClientQuotaEntity(Map("user" -> "user1", "client-id" -> "clientA").asJava), + new ClientQuotaEntity(Map("user" -> null.asInstanceOf[String], "client-id" -> null.asInstanceOf[String]).asJava), + new ClientQuotaEntity(Map("user" -> null.asInstanceOf[String], "client-id" -> "clientA").asJava), + new ClientQuotaEntity(Map("client-id" -> null.asInstanceOf[String]).asJava), + new ClientQuotaEntity(Map("client-id" -> "clientB").asJava), + new ClientQuotaEntity(Map("ip" -> "1.1.1.1").asJava), + new ClientQuotaEntity(Map("ip" -> null.asInstanceOf[String]).asJava))), + image.entities().keySet()) } @Test @@ -185,7 +187,7 @@ class ZkConfigMigrationClientTest extends ZkMigrationTestHarness { assertEquals(4, migrationState.migrationZkVersion()) migrationState = writeClientQuotaAndVerify(migrationClient, adminZkClient, migrationState, - Map(ClientQuotaEntity.USER -> ""), + Map(ClientQuotaEntity.USER -> null.asInstanceOf[String]), Map(QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 200.0), ConfigType.User, "") assertEquals(5, migrationState.migrationZkVersion())