mirror of https://github.com/apache/kafka.git
Improve tests
This commit is contained in:
parent
cbe5ada027
commit
bf1900e997
|
@ -40,7 +40,7 @@ import org.apache.kafka.common.resource.ResourcePattern
|
||||||
import org.apache.kafka.common.resource.ResourceType.TOPIC
|
import org.apache.kafka.common.resource.ResourceType.TOPIC
|
||||||
import org.apache.kafka.common.security.auth.KafkaPrincipal
|
import org.apache.kafka.common.security.auth.KafkaPrincipal
|
||||||
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
|
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils
|
||||||
import org.apache.kafka.common.utils.SecurityUtils
|
import org.apache.kafka.common.utils.{Sanitizer, SecurityUtils}
|
||||||
import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
|
import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
|
||||||
import org.apache.kafka.metadata.authorizer.StandardAcl
|
import org.apache.kafka.metadata.authorizer.StandardAcl
|
||||||
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
|
import org.apache.kafka.metadata.migration.ZkMigrationLeadershipState
|
||||||
|
@ -881,11 +881,14 @@ class ZkMigrationIntegrationTest {
|
||||||
def alterClientQuotas(admin: Admin): AlterClientQuotasResult = {
|
def alterClientQuotas(admin: Admin): AlterClientQuotasResult = {
|
||||||
val quotas = new util.ArrayList[ClientQuotaAlteration]()
|
val quotas = new util.ArrayList[ClientQuotaAlteration]()
|
||||||
quotas.add(new ClientQuotaAlteration(
|
quotas.add(new ClientQuotaAlteration(
|
||||||
new ClientQuotaEntity(Map("user" -> "user1").asJava),
|
new ClientQuotaEntity(Map("user" -> "user@1").asJava),
|
||||||
List(new ClientQuotaAlteration.Op("consumer_byte_rate", 1000.0)).asJava))
|
List(new ClientQuotaAlteration.Op("consumer_byte_rate", 1000.0)).asJava))
|
||||||
quotas.add(new ClientQuotaAlteration(
|
quotas.add(new ClientQuotaAlteration(
|
||||||
new ClientQuotaEntity(Map("user" -> "user1", "client-id" -> "clientA").asJava),
|
new ClientQuotaEntity(Map("user" -> "user@1", "client-id" -> "clientA").asJava),
|
||||||
List(new ClientQuotaAlteration.Op("consumer_byte_rate", 800.0), new ClientQuotaAlteration.Op("producer_byte_rate", 100.0)).asJava))
|
List(new ClientQuotaAlteration.Op("consumer_byte_rate", 800.0), new ClientQuotaAlteration.Op("producer_byte_rate", 100.0)).asJava))
|
||||||
|
quotas.add(new ClientQuotaAlteration(
|
||||||
|
new ClientQuotaEntity(Collections.singletonMap("user", null)),
|
||||||
|
List(new ClientQuotaAlteration.Op("consumer_byte_rate", 900.0), new ClientQuotaAlteration.Op("producer_byte_rate", 100.0)).asJava))
|
||||||
quotas.add(new ClientQuotaAlteration(
|
quotas.add(new ClientQuotaAlteration(
|
||||||
new ClientQuotaEntity(Map("ip" -> "8.8.8.8").asJava),
|
new ClientQuotaEntity(Map("ip" -> "8.8.8.8").asJava),
|
||||||
List(new ClientQuotaAlteration.Op("connection_creation_rate", 10.0)).asJava))
|
List(new ClientQuotaAlteration.Op("connection_creation_rate", 10.0)).asJava))
|
||||||
|
@ -903,7 +906,7 @@ class ZkMigrationIntegrationTest {
|
||||||
val alterations = new util.ArrayList[UserScramCredentialAlteration]()
|
val alterations = new util.ArrayList[UserScramCredentialAlteration]()
|
||||||
alterations.add(new UserScramCredentialUpsertion("user1",
|
alterations.add(new UserScramCredentialUpsertion("user1",
|
||||||
new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 8191), "password1"))
|
new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 8191), "password1"))
|
||||||
alterations.add(new UserScramCredentialUpsertion("user2",
|
alterations.add(new UserScramCredentialUpsertion("user@2",
|
||||||
new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 8192), "password2"))
|
new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 8192), "password2"))
|
||||||
admin.alterUserScramCredentials(alterations)
|
admin.alterUserScramCredentials(alterations)
|
||||||
}
|
}
|
||||||
|
@ -918,20 +921,21 @@ class ZkMigrationIntegrationTest {
|
||||||
|
|
||||||
def verifyClientQuotas(zkClient: KafkaZkClient): Unit = {
|
def verifyClientQuotas(zkClient: KafkaZkClient): Unit = {
|
||||||
TestUtils.retry(10000) {
|
TestUtils.retry(10000) {
|
||||||
assertEquals("1000", zkClient.getEntityConfigs(ConfigType.USER, "user1").getProperty("consumer_byte_rate"))
|
assertEquals("1000", zkClient.getEntityConfigs(ConfigType.USER, Sanitizer.sanitize("user@1")).getProperty("consumer_byte_rate"))
|
||||||
assertEquals("800", zkClient.getEntityConfigs("users/user1/clients", "clientA").getProperty("consumer_byte_rate"))
|
assertEquals("900", zkClient.getEntityConfigs(ConfigType.USER, "<default>").getProperty("consumer_byte_rate"))
|
||||||
assertEquals("100", zkClient.getEntityConfigs("users/user1/clients", "clientA").getProperty("producer_byte_rate"))
|
assertEquals("800", zkClient.getEntityConfigs("users/" + Sanitizer.sanitize("user@1") + "/clients", "clientA").getProperty("consumer_byte_rate"))
|
||||||
|
assertEquals("100", zkClient.getEntityConfigs("users/" + Sanitizer.sanitize("user@1") + "/clients", "clientA").getProperty("producer_byte_rate"))
|
||||||
assertEquals("10", zkClient.getEntityConfigs(ConfigType.IP, "8.8.8.8").getProperty("connection_creation_rate"))
|
assertEquals("10", zkClient.getEntityConfigs(ConfigType.IP, "8.8.8.8").getProperty("connection_creation_rate"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def verifyUserScramCredentials(zkClient: KafkaZkClient): Unit = {
|
def verifyUserScramCredentials(zkClient: KafkaZkClient): Unit = {
|
||||||
TestUtils.retry(10000) {
|
TestUtils.retry(10000) {
|
||||||
val propertyValue1 = zkClient.getEntityConfigs(ConfigType.USER, "user1").getProperty("SCRAM-SHA-256")
|
val propertyValue1 = zkClient.getEntityConfigs(ConfigType.USER, Sanitizer.sanitize("user1")).getProperty("SCRAM-SHA-256")
|
||||||
val scramCredentials1 = ScramCredentialUtils.credentialFromString(propertyValue1)
|
val scramCredentials1 = ScramCredentialUtils.credentialFromString(propertyValue1)
|
||||||
assertEquals(8191, scramCredentials1.iterations)
|
assertEquals(8191, scramCredentials1.iterations)
|
||||||
|
|
||||||
val propertyValue2 = zkClient.getEntityConfigs(ConfigType.USER, "user2").getProperty("SCRAM-SHA-256")
|
val propertyValue2 = zkClient.getEntityConfigs(ConfigType.USER, Sanitizer.sanitize("user@2")).getProperty("SCRAM-SHA-256")
|
||||||
assertNotNull(propertyValue2)
|
assertNotNull(propertyValue2)
|
||||||
val scramCredentials2 = ScramCredentialUtils.credentialFromString(propertyValue2)
|
val scramCredentials2 = ScramCredentialUtils.credentialFromString(propertyValue2)
|
||||||
assertEquals(8192, scramCredentials2.iterations)
|
assertEquals(8192, scramCredentials2.iterations)
|
||||||
|
|
Loading…
Reference in New Issue