MINOR: Improve AuthorizerIntegrationTest (#7926)

This patch improves the authorizer integration tests in the following ways:

1. We use a separate principal for inter-broker communications. This ensures that ACLs set in the test cases do not interfere with inter-broker communication. We had two test cases (`testCreateTopicAuthorizationWithClusterCreate` and `testAuthorizationWithTopicExisting`) which depend on topic creation and were timing out because of inter-broker metadata propagation failures. The timeouts were treated as successfully satisfying the expectation of authorization. So the tests passed, but not because of the intended reason.
2. Previously `GroupAuthorizerIntegrationTest` was inheriting _all_ of the tests from `AuthorizerIntegrationTest`. This seemed like overkill since the ACL evaluation logic is essentially the same. 

In total, this should take about 5-10 minutes off the total build time and make the authorizer integration tests a little more resilient to problems with inter-broker communication.

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Ismael Juma <ismael@juma.me.uk>
This commit is contained in:
Jason Gustafson 2020-02-24 12:12:33 -08:00 committed by GitHub
parent 384eb16805
commit 5359b2e3bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 609 additions and 433 deletions

View File

@ -23,7 +23,7 @@ import kafka.utils.{JaasTestUtils, TestUtils}
import kafka.zk.ConfigEntityChangeNotificationZNode
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.scram.ScramCredential
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.DelegationToken
@ -39,10 +39,10 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest
override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
override val clientPrincipal = JaasTestUtils.KafkaScramUser
override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaScramUser)
private val clientPassword = JaasTestUtils.KafkaScramPassword
override val kafkaPrincipal = JaasTestUtils.KafkaScramAdmin
override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaScramAdmin)
private val kafkaPassword = JaasTestUtils.KafkaScramAdminPassword
this.serverConfig.setProperty(KafkaConfig.DelegationTokenMasterKeyProp, "testKey")
@ -51,15 +51,15 @@ class DelegationTokenEndToEndAuthorizationTest extends EndToEndAuthorizationTest
super.configureSecurityBeforeServersStart()
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
// Create broker admin credentials before starting brokers
createScramCredentials(zkConnect, kafkaPrincipal, kafkaPassword)
createScramCredentials(zkConnect, kafkaPrincipal.getName, kafkaPassword)
}
override def configureSecurityAfterServersStart(): Unit = {
super.configureSecurityAfterServersStart()
// create scram credential for user "scram-user"
createScramCredentials(zkConnect, clientPrincipal, clientPassword)
waitForScramCredentials(clientPrincipal)
createScramCredentials(zkConnect, clientPrincipal.getName, clientPassword)
waitForScramCredentials(clientPrincipal.getName)
//create a token with "scram-user" credentials
val token = createDelegationToken()

View File

@ -38,6 +38,7 @@ import org.apache.kafka.common.errors.{GroupAuthorizationException, TopicAuthori
import org.apache.kafka.common.resource._
import org.apache.kafka.common.resource.ResourceType._
import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.junit.Assert._
import org.junit.{After, Before, Test}
import org.scalatest.Assertions.{assertThrows, fail, intercept}
@ -78,8 +79,6 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
val wildcard = "*"
val part = 0
val tp = new TopicPartition(topic, part)
val clientPrincipal: String
val kafkaPrincipal: String
override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
protected def authorizerClass: Class[_] = classOf[AclAuthorizer]
@ -91,8 +90,9 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
val prefixedGroupResource = new ResourcePattern(GROUP, groupPrefix, PREFIXED)
val wildcardTopicResource = new ResourcePattern(TOPIC, wildcard, LITERAL)
val wildcardGroupResource = new ResourcePattern(GROUP, wildcard, LITERAL)
def kafkaPrincipalStr = s"$kafkaPrincipalType:$kafkaPrincipal"
def clientPrincipalStr = s"$kafkaPrincipalType:$clientPrincipal"
def clientPrincipal: KafkaPrincipal
def kafkaPrincipal: KafkaPrincipal
// Arguments to AclCommand to set ACLs.
def clusterActionArgs: Array[String] = Array("--authorizer-properties",
@ -100,52 +100,52 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
s"--add",
s"--cluster",
s"--operation=ClusterAction",
s"--allow-principal=$kafkaPrincipalStr")
s"--allow-principal=$kafkaPrincipal")
def topicBrokerReadAclArgs: Array[String] = Array("--authorizer-properties",
s"zookeeper.connect=$zkConnect",
s"--add",
s"--topic=$wildcard",
s"--operation=Read",
s"--allow-principal=$kafkaPrincipalStr")
s"--allow-principal=$kafkaPrincipal")
def produceAclArgs(topic: String): Array[String] = Array("--authorizer-properties",
s"zookeeper.connect=$zkConnect",
s"--add",
s"--topic=$topic",
s"--producer",
s"--allow-principal=$clientPrincipalStr")
s"--allow-principal=$clientPrincipal")
def describeAclArgs: Array[String] = Array("--authorizer-properties",
s"zookeeper.connect=$zkConnect",
s"--add",
s"--topic=$topic",
s"--operation=Describe",
s"--allow-principal=$clientPrincipalStr")
s"--allow-principal=$clientPrincipal")
def deleteDescribeAclArgs: Array[String] = Array("--authorizer-properties",
s"zookeeper.connect=$zkConnect",
s"--remove",
s"--force",
s"--topic=$topic",
s"--operation=Describe",
s"--allow-principal=$clientPrincipalStr")
s"--allow-principal=$clientPrincipal")
def deleteWriteAclArgs: Array[String] = Array("--authorizer-properties",
s"zookeeper.connect=$zkConnect",
s"--remove",
s"--force",
s"--topic=$topic",
s"--operation=Write",
s"--allow-principal=$clientPrincipalStr")
s"--allow-principal=$clientPrincipal")
def consumeAclArgs(topic: String): Array[String] = Array("--authorizer-properties",
s"zookeeper.connect=$zkConnect",
s"--add",
s"--topic=$topic",
s"--group=$group",
s"--consumer",
s"--allow-principal=$clientPrincipalStr")
s"--allow-principal=$clientPrincipal")
def groupAclArgs: Array[String] = Array("--authorizer-properties",
s"zookeeper.connect=$zkConnect",
s"--add",
s"--group=$group",
s"--operation=Read",
s"--allow-principal=$clientPrincipalStr")
s"--allow-principal=$clientPrincipal")
def produceConsumeWildcardAclArgs: Array[String] = Array("--authorizer-properties",
s"zookeeper.connect=$zkConnect",
s"--add",
@ -153,7 +153,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
s"--group=$wildcard",
s"--consumer",
s"--producer",
s"--allow-principal=$clientPrincipalStr")
s"--allow-principal=$clientPrincipal")
def produceConsumePrefixedAclsArgs: Array[String] = Array("--authorizer-properties",
s"zookeeper.connect=$zkConnect",
s"--add",
@ -162,15 +162,15 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
s"--resource-pattern-type=prefixed",
s"--consumer",
s"--producer",
s"--allow-principal=$clientPrincipalStr")
s"--allow-principal=$clientPrincipal")
def ClusterActionAcl = Set(new AccessControlEntry(kafkaPrincipalStr, WildcardHost, CLUSTER_ACTION, ALLOW))
def TopicBrokerReadAcl = Set(new AccessControlEntry(kafkaPrincipalStr, WildcardHost, READ, ALLOW))
def GroupReadAcl = Set(new AccessControlEntry(clientPrincipalStr, WildcardHost, READ, ALLOW))
def TopicReadAcl = Set(new AccessControlEntry(clientPrincipalStr, WildcardHost, READ, ALLOW))
def TopicWriteAcl = Set(new AccessControlEntry(clientPrincipalStr, WildcardHost, WRITE, ALLOW))
def TopicDescribeAcl = Set(new AccessControlEntry(clientPrincipalStr, WildcardHost, DESCRIBE, ALLOW))
def TopicCreateAcl = Set(new AccessControlEntry(clientPrincipalStr, WildcardHost, CREATE, ALLOW))
def ClusterActionAcl = Set(new AccessControlEntry(kafkaPrincipal.toString, WildcardHost, CLUSTER_ACTION, ALLOW))
def TopicBrokerReadAcl = Set(new AccessControlEntry(kafkaPrincipal.toString, WildcardHost, READ, ALLOW))
def GroupReadAcl = Set(new AccessControlEntry(clientPrincipal.toString, WildcardHost, READ, ALLOW))
def TopicReadAcl = Set(new AccessControlEntry(clientPrincipal.toString, WildcardHost, READ, ALLOW))
def TopicWriteAcl = Set(new AccessControlEntry(clientPrincipal.toString, WildcardHost, WRITE, ALLOW))
def TopicDescribeAcl = Set(new AccessControlEntry(clientPrincipal.toString, WildcardHost, DESCRIBE, ALLOW))
def TopicCreateAcl = Set(new AccessControlEntry(clientPrincipal.toString, WildcardHost, CREATE, ALLOW))
// The next two configuration parameters enable ZooKeeper secure ACLs
// and sets the Kafka authorizer, both necessary to enable security.
this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")

View File

@ -13,29 +13,127 @@
package kafka.api
import java.util.Properties
import java.util.concurrent.ExecutionException
import kafka.api.GroupAuthorizerIntegrationTest._
import kafka.security.auth.SimpleAclAuthorizer
import kafka.security.authorizer.AclEntry.WildcardHost
import kafka.server.{BaseRequestTest, KafkaConfig}
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.acl.{AccessControlEntry, AclOperation, AclPermissionType}
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.errors.TopicAuthorizationException
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder}
import org.junit.Assert._
import org.junit.{Before, Test}
import org.scalatest.Assertions.intercept
import scala.collection.JavaConverters._
object GroupAuthorizerIntegrationTest {
val GroupPrincipalType = "Group"
val TestGroupPrincipal = new KafkaPrincipal(GroupPrincipalType, "testGroup")
val BrokerPrincipal = new KafkaPrincipal("Group", "broker")
val ClientPrincipal = new KafkaPrincipal("Group", "client")
val BrokerListenerName = "BROKER"
val ClientListenerName = "CLIENT"
class GroupPrincipalBuilder extends KafkaPrincipalBuilder {
override def build(context: AuthenticationContext): KafkaPrincipal = {
TestGroupPrincipal
context.listenerName match {
case BrokerListenerName => BrokerPrincipal
case ClientListenerName => ClientPrincipal
case listenerName => throw new IllegalArgumentException(s"No principal mapped to listener $listenerName")
}
}
}
}
class GroupAuthorizerIntegrationTest extends AuthorizerIntegrationTest {
override val kafkaPrincipalType = GroupPrincipalType
override def userPrincipal = TestGroupPrincipal
// Integration test for ACL enforcement using a custom (non-"User") principal type.
// Inter-broker and client connections arrive on separate listeners so the principal
// builder configured below maps them to distinct principals, ensuring the ACLs set up
// by individual test cases cannot interfere with inter-broker communication.
class GroupAuthorizerIntegrationTest extends BaseRequestTest {
// Fixed broker id for the single-broker cluster used by these tests.
val brokerId: Integer = 0
// One broker keeps the cluster small; replication factors below are sized to match.
override def brokerCount: Int = 1
// Distinct listener names let the principal builder distinguish broker vs. client connections.
override def interBrokerListenerName: ListenerName = new ListenerName(BrokerListenerName)
override def listenerName: ListenerName = new ListenerName(ClientListenerName)
// Principal used for inter-broker requests (granted ClusterAction in setUp).
def brokerPrincipal: KafkaPrincipal = BrokerPrincipal
// Principal used for client requests (granted per-test ACLs only).
def clientPrincipal: KafkaPrincipal = ClientPrincipal
// Enables the ACL authorizer and shrinks internal topics to a single partition/replica
// so the one-broker cluster can host them.
override def brokerPropertyOverrides(properties: Properties): Unit = {
properties.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
classOf[GroupPrincipalBuilder].getName)
super.brokerPropertyOverrides(properties)
properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName)
properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1")
properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1")
properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1")
// NOTE(review): the principal builder is also set via setProperty above — presumably
// redundant after the super call; confirm whether both are required.
properties.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[GroupPrincipalBuilder].getName)
}
@Before
override def setUp(): Unit = {
// Defer offsets-topic creation: the broker principal must first be granted
// ClusterAction or inter-broker replication of the topic would be denied.
doSetup(createOffsetsTopic = false)
// Allow inter-broker communication
TestUtils.addAndVerifyAcls(servers.head,
Set(createAcl(AclOperation.CLUSTER_ACTION, AclPermissionType.ALLOW, principal = BrokerPrincipal)),
new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL))
TestUtils.createOffsetsTopic(zkClient, servers)
}
// Builds an allow/deny entry for the given operation; defaults to the client principal
// so test bodies only name the principal when targeting the broker.
private def createAcl(aclOperation: AclOperation,
aclPermissionType: AclPermissionType,
principal: KafkaPrincipal = ClientPrincipal): AccessControlEntry = {
new AccessControlEntry(principal.toString, WildcardHost, aclOperation, aclPermissionType)
}
// With no ACLs granted to the client principal, both produce and consume must fail
// with TopicAuthorizationException naming the topic.
@Test
def testUnauthorizedProduceAndConsume(): Unit = {
val topic = "topic"
val topicPartition = new TopicPartition("topic", 0)
createTopic(topic)
val producer = createProducer()
// The authorization error surfaces as the cause of the send future's failure.
val produceException = intercept[ExecutionException] {
producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get()
}.getCause
assertTrue(produceException.isInstanceOf[TopicAuthorizationException])
assertEquals(Set(topic), produceException.asInstanceOf[TopicAuthorizationException].unauthorizedTopics.asScala)
// No group id: manual assignment avoids needing Group ACLs, isolating the topic check.
val consumer = createConsumer(configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
consumer.assign(List(topicPartition).asJava)
val consumeException = intercept[TopicAuthorizationException] {
TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)
}
assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala)
}
// After granting WRITE then READ on the topic to the client principal, a produce
// followed by a consume of the same record must succeed.
@Test
def testAuthorizedProduceAndConsume(): Unit = {
val topic = "topic"
val topicPartition = new TopicPartition("topic", 0)
createTopic(topic)
TestUtils.addAndVerifyAcls(servers.head,
Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW)),
new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL))
val producer = createProducer()
producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get()
TestUtils.addAndVerifyAcls(servers.head,
Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL))
// Group-less consumer with manual assignment, as above.
val consumer = createConsumer(configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
consumer.assign(List(topicPartition).asJava)
TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)
}
}

View File

@ -40,7 +40,7 @@ object GroupEndToEndAuthorizationTest {
}
class GroupEndToEndAuthorizationTest extends SaslScramSslEndToEndAuthorizationTest {
override val kafkaPrincipalType = GroupPrincipalType
override val clientPrincipal = ClientGroup
override val clientPrincipal = new KafkaPrincipal(GroupPrincipalType, ClientGroup)
override val kafkaPrincipal = new KafkaPrincipal(GroupPrincipalType, JaasTestUtils.KafkaScramAdmin)
this.serverConfig.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[GroupPrincipalBuilder].getName)
}

View File

@ -67,8 +67,8 @@ class PlaintextEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
classOf[TestClientPrincipalBuilder].getName)
this.serverConfig.setProperty("listener.name.server." + BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
classOf[TestServerPrincipalBuilder].getName)
override val clientPrincipal = "client"
override val kafkaPrincipal = "server"
override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client")
override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "server")
@Before
override def setUp(): Unit = {

View File

@ -21,13 +21,12 @@ import java.util.Properties
import java.util.concurrent.{ExecutionException, Future, TimeUnit}
import kafka.log.LogConfig
import kafka.server.{Defaults, KafkaConfig}
import kafka.server.Defaults
import kafka.utils.TestUtils
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.errors.{InvalidTimestampException, RecordTooLargeException, SerializationException, TimeoutException}
import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch, Records, TimestampType}
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.utils.ByteUtils
import org.junit.Assert._
import org.junit.Test
import org.scalatest.Assertions.intercept

View File

@ -19,14 +19,17 @@ package kafka.api
import kafka.security.auth.SimpleAclAuthorizer
import kafka.server.KafkaConfig
import kafka.utils.JaasTestUtils
import org.apache.kafka.common.security.auth.KafkaPrincipal
import scala.collection.immutable.List
// Note: this test currently uses the deprecated SimpleAclAuthorizer to ensure we have test coverage
// It must be replaced with the new AclAuthorizer when SimpleAclAuthorizer is removed
class SaslGssapiSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTest {
override val clientPrincipal = JaasTestUtils.KafkaClientPrincipalUnqualifiedName
override val kafkaPrincipal = JaasTestUtils.KafkaServerPrincipalUnqualifiedName
override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
JaasTestUtils.KafkaClientPrincipalUnqualifiedName)
override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
JaasTestUtils.KafkaServerPrincipalUnqualifiedName)
override protected def kafkaClientSaslMechanism = "GSSAPI"
override protected def kafkaServerSaslMechanisms = List("GSSAPI")

View File

@ -17,10 +17,11 @@
package kafka.api
import kafka.utils.JaasTestUtils
import org.apache.kafka.common.security.auth.KafkaPrincipal
class SaslOAuthBearerSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTest {
override protected def kafkaClientSaslMechanism = "OAUTHBEARER"
override protected def kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
override val clientPrincipal = JaasTestUtils.KafkaOAuthBearerUser
override val kafkaPrincipal = JaasTestUtils.KafkaOAuthBearerAdmin
override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaOAuthBearerUser)
override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaOAuthBearerAdmin)
}

View File

@ -118,15 +118,15 @@ class SaslPlainSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes
override protected def kafkaClientSaslMechanism = "PLAIN"
override protected def kafkaServerSaslMechanisms = List("PLAIN")
override val clientPrincipal = "user"
override val kafkaPrincipal = "admin"
override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user")
override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "admin")
override def jaasSections(kafkaServerSaslMechanisms: Seq[String],
kafkaClientSaslMechanism: Option[String],
mode: SaslSetupMode,
kafkaServerEntryName: String): Seq[JaasSection] = {
val brokerLogin = new PlainLoginModule(KafkaPlainAdmin, "") // Password provided by callback handler
val clientLogin = new PlainLoginModule(KafkaPlainUser2, KafkaPlainPassword2)
val brokerLogin = PlainLoginModule(KafkaPlainAdmin, "") // Password provided by callback handler
val clientLogin = PlainLoginModule(KafkaPlainUser2, KafkaPlainPassword2)
Seq(JaasSection(kafkaServerEntryName, Seq(brokerLogin)),
JaasSection(KafkaClientContextName, Seq(clientLogin))) ++ zkSections
}

View File

@ -18,6 +18,7 @@ package kafka.api
import kafka.utils.JaasTestUtils
import kafka.zk.ConfigEntityChangeNotificationZNode
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import scala.collection.JavaConverters._
@ -26,15 +27,15 @@ import org.junit.Before
class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTest {
override protected def kafkaClientSaslMechanism = "SCRAM-SHA-256"
override protected def kafkaServerSaslMechanisms = ScramMechanism.mechanismNames.asScala.toList
override val clientPrincipal = JaasTestUtils.KafkaScramUser
override val kafkaPrincipal = JaasTestUtils.KafkaScramAdmin
override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaScramUser)
override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, JaasTestUtils.KafkaScramAdmin)
private val kafkaPassword = JaasTestUtils.KafkaScramAdminPassword
override def configureSecurityBeforeServersStart(): Unit = {
super.configureSecurityBeforeServersStart()
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
// Create broker credentials before starting brokers
createScramCredentials(zkConnect, kafkaPrincipal, kafkaPassword)
createScramCredentials(zkConnect, kafkaPrincipal.getName, kafkaPassword)
}
@Before

View File

@ -69,8 +69,8 @@ class SslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
// Leading and trailing spaces in Kafka principal dont work with ACLs, but we can workaround by using
// a PrincipalBuilder that removes/replaces them.
private val clientCn = """\#A client with special chars in CN : (\, \+ \" \\ \< \> \; ')"""
override val clientPrincipal = s"O=A client,CN=$clientCn"
override val kafkaPrincipal = "server"
override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, s"O=A client,CN=$clientCn")
override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "server")
@Before
override def setUp(): Unit = {
startSasl(jaasSections(List.empty, None, ZkSasl))

View File

@ -23,7 +23,7 @@ import java.util.Arrays
import kafka.server._
import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.{After, Before}
import scala.collection.Seq
@ -42,7 +42,6 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness {
var servers: Buffer[KafkaServer] = new ArrayBuffer
var brokerList: String = null
var alive: Array[Boolean] = null
val kafkaPrincipalType = KafkaPrincipal.USER_TYPE
/**
* Implementations must override this method to return a set of KafkaConfigs. This method will be invoked for every

View File

@ -24,14 +24,15 @@ import java.nio.charset.{Charset, StandardCharsets}
import java.nio.file.{Files, StandardOpenOption}
import java.security.cert.X509Certificate
import java.time.Duration
import java.util.Arrays
import java.util.Collections
import java.util.Properties
import java.util.{Arrays, Collections, Properties}
import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
import com.yammer.metrics.Metrics
import com.yammer.metrics.core.Meter
import javax.net.ssl.X509TrustManager
import kafka.api._
import kafka.cluster.{Broker, EndPoint}
import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.log._
import kafka.security.auth.{Acl, Authorizer => LegacyAuthorizer, Resource}
import kafka.server._
@ -40,14 +41,14 @@ import Implicits._
import com.yammer.metrics.core.Meter
import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.metrics.KafkaYammerMetrics
import kafka.utils.Implicits._
import kafka.zk._
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBindingFilter}
import org.apache.kafka.common.{KafkaFuture, TopicPartition}
import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
import org.apache.kafka.common.header.Header
@ -59,7 +60,8 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, IntegerSerializer, Serializer}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.utils.Utils._
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.common.{KafkaFuture, TopicPartition}
import org.apache.kafka.server.authorizer.{Authorizer => JAuthorizer}
import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
import org.apache.zookeeper.KeeperException.SessionExpiredException
import org.apache.zookeeper.ZooDefs._
@ -68,8 +70,9 @@ import org.junit.Assert._
import org.scalatest.Assertions.fail
import scala.collection.JavaConverters._
import scala.collection.{Map, Seq, mutable}
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.collection.{Map, Seq, mutable}
import scala.compat.java8.OptionConverters._
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{Await, ExecutionContext, Future}
@ -1194,22 +1197,25 @@ object TestUtils extends Logging {
trustManager
}
def waitAndVerifyAcls(expected: Set[AccessControlEntry], authorizer: Authorizer, resource: ResourcePattern) = {
def waitAndVerifyAcls(expected: Set[AccessControlEntry],
authorizer: JAuthorizer,
resource: ResourcePattern,
accessControlEntryFilter: AccessControlEntryFilter = AccessControlEntryFilter.ANY): Unit = {
val newLine = scala.util.Properties.lineSeparator
val filter = new AclBindingFilter(resource.toFilter, AccessControlEntryFilter.ANY)
val filter = new AclBindingFilter(resource.toFilter, accessControlEntryFilter)
waitUntilTrue(() => authorizer.acls(filter).asScala.map(_.entry).toSet == expected,
s"expected acls:${expected.mkString(newLine + "\t", newLine + "\t", newLine)}" +
s"but got:${authorizer.acls(filter).asScala.map(_.entry).mkString(newLine + "\t", newLine + "\t", newLine)}", waitTimeMs = JTestUtils.DEFAULT_MAX_WAIT_MS)
s"but got:${authorizer.acls(filter).asScala.map(_.entry).mkString(newLine + "\t", newLine + "\t", newLine)}")
}
@deprecated("Use org.apache.kafka.server.authorizer.Authorizer", "Since 2.5")
def waitAndVerifyAcls(expected: Set[Acl], authorizer: LegacyAuthorizer, resource: Resource) = {
def waitAndVerifyAcls(expected: Set[Acl], authorizer: LegacyAuthorizer, resource: Resource): Unit = {
val newLine = scala.util.Properties.lineSeparator
waitUntilTrue(() => authorizer.getAcls(resource) == expected,
s"expected acls:${expected.mkString(newLine + "\t", newLine + "\t", newLine)}" +
s"but got:${authorizer.getAcls(resource).mkString(newLine + "\t", newLine + "\t", newLine)}", waitTimeMs = JTestUtils.DEFAULT_MAX_WAIT_MS)
s"but got:${authorizer.getAcls(resource).mkString(newLine + "\t", newLine + "\t", newLine)}")
}
/**
@ -1735,4 +1741,19 @@ object TestUtils extends Logging {
waitUntilTrue(() => adminClient.listPartitionReassignments().reassignments().get().isEmpty,
s"There still are ongoing reassignments", pause = pause)
}
// Adds the given ACL entries for `resource` to the broker's configured authorizer and
// blocks until they are visible through the authorizer's acls() API. Any failure
// reported for an individual binding is rethrown immediately.
def addAndVerifyAcls(server: KafkaServer, acls: Set[AccessControlEntry], resource: ResourcePattern): Unit = {
val authorizer = server.dataPlaneRequestProcessor.authorizer.get
val aclBindings = acls.map { acl => new AclBinding(resource, acl) }
// NOTE(review): passes null as the AuthorizableRequestContext — presumably unused by
// the authorizer's createAcls path in tests; confirm before reusing outside tests.
authorizer.createAcls(null, aclBindings.toList.asJava).asScala
.map(_.toCompletableFuture.get)
.foreach { result =>
result.exception.asScala.foreach { e => throw e }
}
val aclFilter = new AclBindingFilter(resource.toFilter, AccessControlEntryFilter.ANY)
// Expected state is the pre-existing entries for this resource plus the ones just added.
waitAndVerifyAcls(
authorizer.acls(aclFilter).asScala.map(_.entry).toSet ++ acls,
authorizer, resource)
}
}