KAFKA-12590: Remove deprecated kafka.security.auth.Authorizer, SimpleAclAuthorizer and related classes in 3.0 (#10450)

These were deprecated in Apache Kafka 2.4 (released in December 2019) to be replaced
by `org.apache.kafka.server.authorizer.Authorizer` and `AclAuthorizer`.

As part of KIP-500, we will implement a new `Authorizer` implementation that relies
on a topic (potentially a KRaft topic) instead of `ZooKeeper`, so we should take the chance
to remove related tech debt in 3.0.

Details on the issues affecting the old Authorizer interface can be found in the KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-504+-+Add+new+Java+Authorizer+Interface

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Ron Dagostino <rdagostino@confluent.io>
This commit is contained in:
Ismael Juma 2021-04-03 08:23:26 -07:00 committed by GitHub
parent 66b0c5c64f
commit 976e78e405
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 68 additions and 2173 deletions

View File

@ -1,86 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.security.auth
import kafka.security.authorizer.AclEntry
import org.apache.kafka.common.resource.ResourcePattern
import org.apache.kafka.common.security.auth.KafkaPrincipal
@deprecated("Use org.apache.kafka.common.acl.AclBinding", "Since 2.5")
object Acl {
val WildCardPrincipal: KafkaPrincipal = AclEntry.WildcardPrincipal
val WildCardHost: String = AclEntry.WildcardHost
val WildCardResource: String = ResourcePattern.WILDCARD_RESOURCE
val AllowAllAcl = new Acl(WildCardPrincipal, Allow, WildCardHost, All)
val PrincipalKey = AclEntry.PrincipalKey
val PermissionTypeKey = AclEntry.PermissionTypeKey
val OperationKey = AclEntry.OperationKey
val HostsKey = AclEntry.HostsKey
val VersionKey = AclEntry.VersionKey
val CurrentVersion = AclEntry.CurrentVersion
val AclsKey = AclEntry.AclsKey
/**
*
* @see AclEntry
*/
def fromBytes(bytes: Array[Byte]): Set[Acl] = {
AclEntry.fromBytes(bytes)
.map(ace => Acl(ace.kafkaPrincipal,
PermissionType.fromJava(ace.permissionType()),
ace.host(),
Operation.fromJava(ace.operation())))
}
def toJsonCompatibleMap(acls: Set[Acl]): Map[String, Any] = {
AclEntry.toJsonCompatibleMap(acls.map(acl =>
AclEntry(acl.principal, acl.permissionType.toJava, acl.host, acl.operation.toJava)
))
}
}
/**
* An instance of this class will represent an acl that can express following statement.
* <pre>
* Principal P has permissionType PT on Operation O1 from hosts H1.
* </pre>
* @param principal A value of *:* indicates all users.
* @param permissionType
* @param host A value of * indicates all hosts.
* @param operation A value of ALL indicates all operations.
*/
@deprecated("Use org.apache.kafka.common.acl.AclBinding", "Since 2.5")
case class Acl(principal: KafkaPrincipal, permissionType: PermissionType, host: String, operation: Operation) {
/**
* TODO: Ideally we would have a symmetric toJson method but our current json library can not jsonify/dejsonify complex objects.
* @return Map representation of the Acl.
*/
def toMap(): Map[String, Any] = {
Map(Acl.PrincipalKey -> principal.toString,
Acl.PermissionTypeKey -> permissionType.name,
Acl.OperationKey -> operation.name,
Acl.HostsKey -> host)
}
override def toString: String = {
"%s has %s permission for operations: %s from hosts: %s".format(principal, permissionType.name, operation, host)
}
}

View File

@ -1,149 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.security.auth
import kafka.network.RequestChannel.Session
import org.apache.kafka.common.Configurable
import org.apache.kafka.common.security.auth.KafkaPrincipal
/**
* Top level interface that all pluggable authorizers must implement. Kafka will read the `authorizer.class.name` config
* value at startup time, create an instance of the specified class using the default constructor, and call its
* `configure` method.
*
* From that point onwards, every client request will first be routed to the `authorize` method and the request will only
* be authorized if the method returns true.
*
* If `authorizer.class.name` has no value specified, then no authorization will be performed, and all operations are
* permitted.
*/
@deprecated("Use org.apache.kafka.server.authorizer.Authorizer", "Since 2.4")
trait Authorizer extends Configurable {
/**
* @param session The session being authenticated.
* @param operation Type of operation client is trying to perform on resource.
* @param resource Resource the client is trying to access. Resource pattern type is always literal in input resource.
* @return true if the operation should be permitted, false otherwise
*/
def authorize(session: Session, operation: Operation, resource: Resource): Boolean
/**
* add the acls to resource, this is an additive operation so existing acls will not be overwritten, instead these new
* acls will be added to existing acls.
*
* {code}
* // The following will add ACLs to the literal resource path 'foo', which will only affect the topic named 'foo':
* authorizer.addAcls(Set(acl1, acl2), Resource(Topic, "foo", LITERAL))
*
* // The following will add ACLs to the special literal topic resource path '*', which affects all topics:
* authorizer.addAcls(Set(acl1, acl2), Resource(Topic, "*", LITERAL))
*
* // The following will add ACLs to the prefixed resource path 'foo', which affects all topics whose name begins with 'foo':
* authorizer.addAcls(Set(acl1, acl2), Resource(Topic, "foo", PREFIXED))
* {code}
*
* @param acls set of acls to add to existing acls
* @param resource the resource path to which these acls should be attached.
* the supplied resource will have a specific resource pattern type,
* i.e. the resource pattern type will not be ``PatternType.ANY`` or ``PatternType.UNKNOWN``.
*/
def addAcls(acls: Set[Acl], resource: Resource): Unit
/**
* remove these acls from the resource.
*
* {code}
* // The following will remove ACLs from the literal resource path 'foo', which will only affect the topic named 'foo':
* authorizer.removeAcls(Set(acl1, acl2), Resource(Topic, "foo", LITERAL))
*
* // The following will remove ACLs from the special literal topic resource path '*', which affects all topics:
* authorizer.removeAcls(Set(acl1, acl2), Resource(Topic, "*", LITERAL))
*
* // The following will remove ACLs from the prefixed resource path 'foo', which affects all topics whose name begins with 'foo':
* authorizer.removeAcls(Set(acl1, acl2), Resource(Topic, "foo", PREFIXED))
* {code}
*
* @param acls set of acls to be removed.
* @param resource resource path from which the acls should be removed.
* the supplied resource will have a specific resource pattern type,
* i.e. the resource pattern type will not be ``PatternType.ANY`` or ``PatternType.UNKNOWN``.
* @return true if some acl got removed, false if no acl was removed.
*/
def removeAcls(acls: Set[Acl], resource: Resource): Boolean
/**
* remove a resource along with all of its acls from acl store.
*
* {code}
* // The following will remove all ACLs from the literal resource path 'foo', which will only affect the topic named 'foo':
* authorizer.removeAcls(Resource(Topic, "foo", LITERAL))
*
* // The following will remove all ACLs from the special literal topic resource path '*', which affects all topics:
* authorizer.removeAcls(Resource(Topic, "*", LITERAL))
*
* // The following will remove all ACLs from the prefixed resource path 'foo', which affects all topics whose name begins with 'foo':
* authorizer.removeAcls(Resource(Topic, "foo", PREFIXED))
* {code}
*
* @param resource the resource path from which these acls should be removed.
* the supplied resource will have a specific resource pattern type,
* i.e. the resource pattern type will not be ``PatternType.ANY`` or ``PatternType.UNKNOWN``.
* @return
*/
def removeAcls(resource: Resource): Boolean
/**
* get set of acls for the supplied resource
*
* {code}
* // The following will get all ACLs from the literal resource path 'foo', which will only affect the topic named 'foo':
* authorizer.removeAcls(Resource(Topic, "foo", LITERAL))
*
* // The following will get all ACLs from the special literal topic resource path '*', which affects all topics:
* authorizer.removeAcls(Resource(Topic, "*", LITERAL))
*
* // The following will get all ACLs from the prefixed resource path 'foo', which affects all topics whose name begins with 'foo':
* authorizer.removeAcls(Resource(Topic, "foo", PREFIXED))
* {code}
*
* @param resource the resource path to which the acls belong.
* the supplied resource will have a specific resource pattern type,
* i.e. the resource pattern type will not be ``PatternType.ANY`` or ``PatternType.UNKNOWN``.
* @return empty set if no acls are found, otherwise the acls for the resource.
*/
def getAcls(resource: Resource): Set[Acl]
/**
* get the acls for this principal.
* @param principal principal name.
* @return empty Map if no acls exist for this principal, otherwise a map of resource -> acls for the principal.
*/
def getAcls(principal: KafkaPrincipal): Map[Resource, Set[Acl]]
/**
* gets the map of resource paths to acls for all resources.
*/
def getAcls(): Map[Resource, Set[Acl]]
/**
* Closes this instance.
*/
def close(): Unit
}

View File

@ -1,113 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.security.auth
import kafka.common.{BaseEnum, KafkaException}
import org.apache.kafka.common.acl.AclOperation
/**
* Different operations a client may perform on kafka resources.
*/
@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
sealed trait Operation extends BaseEnum {
def toJava : AclOperation
}
@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
case object Read extends Operation {
val name = "Read"
val toJava = AclOperation.READ
}
@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
case object Write extends Operation {
val name = "Write"
val toJava = AclOperation.WRITE
}
@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
case object Create extends Operation {
val name = "Create"
val toJava = AclOperation.CREATE
}
@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
case object Delete extends Operation {
val name = "Delete"
val toJava = AclOperation.DELETE
}
@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
case object Alter extends Operation {
val name = "Alter"
val toJava = AclOperation.ALTER
}
@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
case object Describe extends Operation {
val name = "Describe"
val toJava = AclOperation.DESCRIBE
}
@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
case object ClusterAction extends Operation {
val name = "ClusterAction"
val toJava = AclOperation.CLUSTER_ACTION
}
@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
case object DescribeConfigs extends Operation {
val name = "DescribeConfigs"
val toJava = AclOperation.DESCRIBE_CONFIGS
}
@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
case object AlterConfigs extends Operation {
val name = "AlterConfigs"
val toJava = AclOperation.ALTER_CONFIGS
}
@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
case object IdempotentWrite extends Operation {
val name = "IdempotentWrite"
val toJava = AclOperation.IDEMPOTENT_WRITE
}
@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
case object All extends Operation {
val name = "All"
val toJava = AclOperation.ALL
}
@deprecated("Use org.apache.kafka.common.acl.AclOperation", "Since 2.5")
object Operation {
def fromString(operation: String): Operation = {
val op = values.find(op => op.name.equalsIgnoreCase(operation))
op.getOrElse(throw new KafkaException(operation + " not a valid operation name. The valid names are " + values.mkString(",")))
}
def fromJava(operation: AclOperation): Operation = {
operation match {
case AclOperation.READ => Read
case AclOperation.WRITE => Write
case AclOperation.CREATE => Create
case AclOperation.DELETE => Delete
case AclOperation.ALTER => Alter
case AclOperation.DESCRIBE => Describe
case AclOperation.CLUSTER_ACTION => ClusterAction
case AclOperation.ALTER_CONFIGS => AlterConfigs
case AclOperation.DESCRIBE_CONFIGS => DescribeConfigs
case AclOperation.IDEMPOTENT_WRITE => IdempotentWrite
case AclOperation.ALL => All
case _ => throw new KafkaException(operation + " is not a convertible operation name. The valid names are " + values.mkString(","))
}
}
def values: Seq[Operation] = List(Read, Write, Create, Delete, Alter, Describe, ClusterAction, AlterConfigs,
DescribeConfigs, IdempotentWrite, All)
}

View File

@ -1,50 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.security.auth
import kafka.common.{BaseEnum, KafkaException}
import org.apache.kafka.common.acl.AclPermissionType
@deprecated("Use org.apache.kafka.common.acl.AclPermissionType", "Since 2.5")
sealed trait PermissionType extends BaseEnum {
val toJava: AclPermissionType
}
@deprecated("Use org.apache.kafka.common.acl.AclPermissionType", "Since 2.5")
case object Allow extends PermissionType {
val name = "Allow"
val toJava = AclPermissionType.ALLOW
}
@deprecated("Use org.apache.kafka.common.acl.AclPermissionType", "Since 2.5")
case object Deny extends PermissionType {
val name = "Deny"
val toJava = AclPermissionType.DENY
}
@deprecated("Use org.apache.kafka.common.acl.AclPermissionType", "Since 2.5")
object PermissionType {
def fromString(permissionType: String): PermissionType = {
val pType = values.find(pType => pType.name.equalsIgnoreCase(permissionType))
pType.getOrElse(throw new KafkaException(permissionType + " not a valid permissionType name. The valid names are " + values.mkString(",")))
}
def fromJava(permissionType: AclPermissionType): PermissionType = fromString(permissionType.toString)
def values: Seq[PermissionType] = List(Allow, Deny)
}

View File

@ -1,85 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.security.auth
import kafka.common.KafkaException
import kafka.security.authorizer.AclEntry
import org.apache.kafka.common.resource.{PatternType, ResourcePattern}
@deprecated("Use org.apache.kafka.common.resource.ResourcePattern", "Since 2.5")
object Resource {
val Separator = AclEntry.ResourceSeparator
val ClusterResourceName = "kafka-cluster"
val ClusterResource = Resource(Cluster, Resource.ClusterResourceName, PatternType.LITERAL)
val WildCardResource = AclEntry.WildcardResource
@deprecated("This resource name is not used by Kafka and will be removed in a future release", since = "2.1")
val ProducerIdResourceName = "producer-id" // This is not used since we don't have a producer id resource
def fromString(str: String): Resource = {
ResourceType.values.find(resourceType => str.startsWith(resourceType.name + Separator)) match {
case None => throw new KafkaException("Invalid resource string: '" + str + "'")
case Some(resourceType) =>
val remaining = str.substring(resourceType.name.length + 1)
PatternType.values.find(patternType => remaining.startsWith(patternType.name + Separator)) match {
case Some(patternType) =>
val name = remaining.substring(patternType.name.length + 1)
Resource(resourceType, name, patternType)
case None =>
Resource(resourceType, remaining, PatternType.LITERAL)
}
}
}
}
/**
*
* @param resourceType non-null type of resource.
* @param name non-null name of the resource, for topic this will be topic name , for group it will be group name. For cluster type
* it will be a constant string kafka-cluster.
* @param patternType non-null resource pattern type: literal, prefixed, etc.
*/
@deprecated("Use org.apache.kafka.common.resource.ResourcePattern", "Since 2.5")
case class Resource(resourceType: ResourceType, name: String, patternType: PatternType) {
if (!patternType.isSpecific)
throw new IllegalArgumentException(s"patternType must not be $patternType")
/**
* Create an instance of this class with the provided parameters.
* Resource pattern type would default to PatternType.LITERAL.
*
* @param resourceType non-null resource type
* @param name non-null resource name
* @deprecated Since 2.0, use [[kafka.security.auth.Resource(ResourceType, String, PatternType)]]
*/
@deprecated("Use Resource(ResourceType, String, PatternType", "Since 2.0")
def this(resourceType: ResourceType, name: String) = {
this(resourceType, name, PatternType.LITERAL)
}
def toPattern: ResourcePattern = {
new ResourcePattern(resourceType.toJava, name, patternType)
}
override def toString: String = {
resourceType.name + Resource.Separator + patternType + Resource.Separator + name
}
}

View File

@ -1,93 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.security.auth
import kafka.common.{BaseEnum, KafkaException}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.resource.{ResourceType => JResourceType}
@deprecated("Use org.apache.kafka.common.resource.ResourceType", "Since 2.5")
sealed trait ResourceType extends BaseEnum with Ordered[ ResourceType ] {
def error: Errors
def toJava: JResourceType
// this method output will not include "All" Operation type
def supportedOperations: Set[Operation]
override def compare(that: ResourceType): Int = this.name compare that.name
}
@deprecated("Use org.apache.kafka.common.resource.ResourceType", "Since 2.5")
case object Topic extends ResourceType {
val name = "Topic"
val error = Errors.TOPIC_AUTHORIZATION_FAILED
val toJava = JResourceType.TOPIC
val supportedOperations = Set(Read, Write, Create, Describe, Delete, Alter, DescribeConfigs, AlterConfigs)
}
@deprecated("Use org.apache.kafka.common.resource.ResourceType", "Since 2.5")
case object Group extends ResourceType {
val name = "Group"
val error = Errors.GROUP_AUTHORIZATION_FAILED
val toJava = JResourceType.GROUP
val supportedOperations = Set(Read, Describe, Delete)
}
@deprecated("Use org.apache.kafka.common.resource.ResourceType", "Since 2.5")
case object Cluster extends ResourceType {
val name = "Cluster"
val error = Errors.CLUSTER_AUTHORIZATION_FAILED
val toJava = JResourceType.CLUSTER
val supportedOperations = Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, Alter, Describe)
}
@deprecated("Use org.apache.kafka.common.resource.ResourceType", "Since 2.5")
case object TransactionalId extends ResourceType {
val name = "TransactionalId"
val error = Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED
val toJava = JResourceType.TRANSACTIONAL_ID
val supportedOperations = Set(Describe, Write)
}
@deprecated("Use org.apache.kafka.common.resource.ResourceType", "Since 2.5")
case object DelegationToken extends ResourceType {
val name = "DelegationToken"
val error = Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED
val toJava = JResourceType.DELEGATION_TOKEN
val supportedOperations : Set[Operation] = Set(Describe)
}
@deprecated("Use org.apache.kafka.common.resource.ResourceType", "Since 2.5")
object ResourceType {
def fromString(resourceType: String): ResourceType = {
val rType = values.find(rType => rType.name.equalsIgnoreCase(resourceType))
rType.getOrElse(throw new KafkaException(resourceType + " not a valid resourceType name. The valid names are " + values.mkString(",")))
}
def fromJava(resourceType: JResourceType): ResourceType = {
resourceType match {
case JResourceType.TOPIC => Topic
case JResourceType.GROUP => Group
case JResourceType.CLUSTER => Cluster
case JResourceType.TRANSACTIONAL_ID => TransactionalId
case JResourceType.DELEGATION_TOKEN => DelegationToken
case _ => throw new KafkaException(resourceType + " is not a convertible resource type. The valid types are " + values.mkString(","))
}
}
def values: Seq[ResourceType] = List(Topic, Group, Cluster, TransactionalId, DelegationToken)
}

View File

@ -1,175 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.security.auth
import java.util
import kafka.network.RequestChannel.Session
import kafka.security.auth.SimpleAclAuthorizer.BaseAuthorizer
import kafka.security.authorizer.{AclAuthorizer, AuthorizerUtils, AuthorizerWrapper}
import kafka.utils._
import kafka.zk.ZkVersion
import org.apache.kafka.common.acl.{AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
import org.apache.kafka.common.errors.ApiException
import org.apache.kafka.common.resource.{PatternType, ResourcePatternFilter}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult}
import scala.collection.mutable
import scala.jdk.CollectionConverters._
@deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.4")
object SimpleAclAuthorizer {
//optional override zookeeper cluster configuration where acls will be stored, if not specified acls will be stored in
//same zookeeper where all other kafka broker info is stored.
val ZkUrlProp = AclAuthorizer.ZkUrlProp
val ZkConnectionTimeOutProp = AclAuthorizer.ZkConnectionTimeOutProp
val ZkSessionTimeOutProp = AclAuthorizer.ZkSessionTimeOutProp
val ZkMaxInFlightRequests = AclAuthorizer.ZkMaxInFlightRequests
//List of users that will be treated as super users and will have access to all the resources for all actions from all hosts, defaults to no super users.
val SuperUsersProp = AclAuthorizer.SuperUsersProp
//If set to true when no acls are found for a resource , authorizer allows access to everyone. Defaults to false.
val AllowEveryoneIfNoAclIsFoundProp = AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp
case class VersionedAcls(acls: Set[Acl], zkVersion: Int) {
def exists: Boolean = zkVersion != ZkVersion.UnknownVersion
}
val NoAcls = VersionedAcls(Set.empty, ZkVersion.UnknownVersion)
private[auth] class BaseAuthorizer extends AclAuthorizer {
override def logAuditMessage(requestContext: AuthorizableRequestContext, action: Action, authorized: Boolean): Unit = {
val principal = requestContext.principal
val host = requestContext.clientAddress.getHostAddress
val operation = Operation.fromJava(action.operation)
val resource = AuthorizerWrapper.convertToResource(action.resourcePattern)
def logMessage: String = {
val authResult = if (authorized) "Allowed" else "Denied"
s"Principal = $principal is $authResult Operation = $operation from host = $host on resource = $resource"
}
if (authorized) authorizerLogger.debug(logMessage)
else authorizerLogger.info(logMessage)
}
}
}
@deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.4")
class SimpleAclAuthorizer extends Authorizer with Logging {
private val aclAuthorizer = new BaseAuthorizer
// The maximum number of times we should try to update the resource acls in zookeeper before failing;
// This should never occur, but is a safeguard just in case.
protected[auth] var maxUpdateRetries = 10
/**
* Guaranteed to be called before any authorize call is made.
*/
override def configure(javaConfigs: util.Map[String, _]): Unit = {
aclAuthorizer.configure(javaConfigs)
}
override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = {
val requestContext = AuthorizerUtils.sessionToRequestContext(session)
val action = new Action(operation.toJava, resource.toPattern, 1, true, true)
aclAuthorizer.authorize(requestContext, List(action).asJava).asScala.head == AuthorizationResult.ALLOWED
}
def isSuperUser(operation: Operation, resource: Resource, principal: KafkaPrincipal, host: String): Boolean = {
aclAuthorizer.isSuperUser(principal)
}
override def addAcls(acls: Set[Acl], resource: Resource): Unit = {
aclAuthorizer.maxUpdateRetries = maxUpdateRetries
if (acls != null && acls.nonEmpty) {
val bindings = acls.map { acl => AuthorizerWrapper.convertToAclBinding(resource, acl) }
createAcls(bindings)
}
}
override def removeAcls(aclsTobeRemoved: Set[Acl], resource: Resource): Boolean = {
val filters = aclsTobeRemoved.map { acl =>
new AclBindingFilter(resource.toPattern.toFilter, AuthorizerWrapper.convertToAccessControlEntry(acl).toFilter)
}
deleteAcls(filters)
}
override def removeAcls(resource: Resource): Boolean = {
val filter = new AclBindingFilter(resource.toPattern.toFilter, AccessControlEntryFilter.ANY)
deleteAcls(Set(filter))
}
override def getAcls(resource: Resource): Set[Acl] = {
val filter = new AclBindingFilter(resource.toPattern.toFilter, AccessControlEntryFilter.ANY)
acls(filter).getOrElse(resource, Set.empty)
}
override def getAcls(principal: KafkaPrincipal): Map[Resource, Set[Acl]] = {
val filter = new AclBindingFilter(ResourcePatternFilter.ANY,
new AccessControlEntryFilter(principal.toString, null, AclOperation.ANY, AclPermissionType.ANY))
acls(filter)
}
def getMatchingAcls(resourceType: ResourceType, resourceName: String): Set[Acl] = {
val filter = new AclBindingFilter(new ResourcePatternFilter(resourceType.toJava, resourceName, PatternType.MATCH),
AccessControlEntryFilter.ANY)
acls(filter).flatMap(_._2).toSet
}
override def getAcls(): Map[Resource, Set[Acl]] = {
acls(AclBindingFilter.ANY)
}
def close(): Unit = {
aclAuthorizer.close()
}
private def createAcls(bindings: Set[AclBinding]): Unit = {
aclAuthorizer.maxUpdateRetries = maxUpdateRetries
val results = aclAuthorizer.createAcls(null, bindings.toList.asJava).asScala.map(_.toCompletableFuture.get)
results.foreach { result => result.exception.ifPresent(throwException) }
}
private def deleteAcls(filters: Set[AclBindingFilter]): Boolean = {
aclAuthorizer.maxUpdateRetries = maxUpdateRetries
val results = aclAuthorizer.deleteAcls(null, filters.toList.asJava).asScala.map(_.toCompletableFuture.get)
results.foreach { result => result.exception.ifPresent(throwException) }
results.flatMap(_.aclBindingDeleteResults.asScala).foreach { result => result.exception.ifPresent(e => throw e) }
results.exists(r => r.aclBindingDeleteResults.asScala.exists(d => !d.exception.isPresent))
}
private def acls(filter: AclBindingFilter): Map[Resource, Set[Acl]] = {
val result = mutable.Map[Resource, mutable.Set[Acl]]()
aclAuthorizer.acls(filter).forEach { binding =>
val resource = AuthorizerWrapper.convertToResource(binding.pattern)
val acl = AuthorizerWrapper.convertToAcl(binding.entry)
result.getOrElseUpdate(resource, mutable.Set()).add(acl)
}
result.mapValues(_.toSet).toMap
}
// To retain the same exceptions as in previous versions, throw the underlying exception when the exception
// was wrapped by AclAuthorizer in an ApiException
private def throwException(e: ApiException): Unit = {
if (e.getCause != null)
throw e.getCause
else
throw e
}
}

View File

@ -21,7 +21,6 @@ import java.util.concurrent.{CompletableFuture, CompletionStage}
import com.typesafe.scalalogging.Logger import com.typesafe.scalalogging.Logger
import kafka.api.KAFKA_2_0_IV1 import kafka.api.KAFKA_2_0_IV1
import kafka.security.authorizer.AclAuthorizer.{AclSeqs, ResourceOrdering, VersionedAcls}
import kafka.security.authorizer.AclEntry.ResourceSeparator import kafka.security.authorizer.AclEntry.ResourceSeparator
import kafka.server.{KafkaConfig, KafkaServer} import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils._ import kafka.utils._
@ -118,9 +117,16 @@ object AclAuthorizer {
zkClientConfig zkClientConfig
} }
} }
private def validateAclBinding(aclBinding: AclBinding): Unit = {
if (aclBinding.isUnknown)
throw new IllegalArgumentException("ACL binding contains unknown elements")
}
} }
class AclAuthorizer extends Authorizer with Logging { class AclAuthorizer extends Authorizer with Logging {
import kafka.security.authorizer.AclAuthorizer._
private[security] val authorizerLogger = Logger("kafka.authorizer.logger") private[security] val authorizerLogger = Logger("kafka.authorizer.logger")
private var superUsers = Set.empty[KafkaPrincipal] private var superUsers = Set.empty[KafkaPrincipal]
private var shouldAllowEveryoneIfNoAclIsFound = false private var shouldAllowEveryoneIfNoAclIsFound = false
@ -200,7 +206,7 @@ class AclAuthorizer extends Authorizer with Logging {
throw new UnsupportedVersionException(s"Adding ACLs on prefixed resource patterns requires " + throw new UnsupportedVersionException(s"Adding ACLs on prefixed resource patterns requires " +
s"${KafkaConfig.InterBrokerProtocolVersionProp} of $KAFKA_2_0_IV1 or greater") s"${KafkaConfig.InterBrokerProtocolVersionProp} of $KAFKA_2_0_IV1 or greater")
} }
AuthorizerUtils.validateAclBinding(aclBinding) validateAclBinding(aclBinding)
true true
} catch { } catch {
case e: Throwable => case e: Throwable =>
@ -225,7 +231,7 @@ class AclAuthorizer extends Authorizer with Logging {
} }
} }
} }
results.toList.map(CompletableFuture.completedFuture[AclCreateResult]).asJava results.toBuffer.map(CompletableFuture.completedFuture[AclCreateResult]).asJava
} }
/** /**

View File

@ -20,32 +20,15 @@ package kafka.security.authorizer
import java.net.InetAddress import java.net.InetAddress
import kafka.network.RequestChannel.Session import kafka.network.RequestChannel.Session
import kafka.security.auth.{Authorizer => LegacyAuthorizer}
import org.apache.kafka.common.acl._
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.resource.Resource import org.apache.kafka.common.resource.Resource
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.Utils import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer} import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, Authorizer}
import scala.annotation.nowarn
object AuthorizerUtils { object AuthorizerUtils {
@nowarn("cat=deprecation") def createAuthorizer(className: String): Authorizer = Utils.newInstance(className, classOf[Authorizer])
def createAuthorizer(className: String): Authorizer = {
Utils.newInstance(className, classOf[Object]) match {
case auth: Authorizer => auth
case auth: kafka.security.auth.Authorizer => new AuthorizerWrapper(auth)
case _ => throw new ConfigException(s"Authorizer does not implement ${classOf[Authorizer].getName} or ${classOf[LegacyAuthorizer].getName}.")
}
}
def validateAclBinding(aclBinding: AclBinding): Unit = {
if (aclBinding.isUnknown)
throw new IllegalArgumentException("ACL binding contains unknown elements")
}
def isClusterResource(name: String): Boolean = name.equals(Resource.CLUSTER_NAME) def isClusterResource(name: String): Boolean = name.equals(Resource.CLUSTER_NAME)

View File

@ -1,223 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.security.authorizer
import java.util.concurrent.{CompletableFuture, CompletionStage}
import java.{lang, util}
import kafka.network.RequestChannel.Session
import kafka.security.auth.{Acl, Operation, PermissionType, Resource, SimpleAclAuthorizer, ResourceType => ResourceTypeLegacy}
import kafka.security.authorizer.AuthorizerWrapper._
import org.apache.kafka.common.Endpoint
import org.apache.kafka.common.acl._
import org.apache.kafka.common.errors.{ApiException, InvalidRequestException}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.ApiError
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.SecurityUtils
import org.apache.kafka.common.utils.SecurityUtils.parseKafkaPrincipal
import org.apache.kafka.server.authorizer.AclDeleteResult.AclBindingDeleteResult
import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, AuthorizerServerInfo, _}
import scala.collection.mutable.ArrayBuffer
import scala.collection.{Seq, immutable, mutable}
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}
@deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5")
object AuthorizerWrapper {
def convertToResourceAndAcl(filter: AclBindingFilter): Either[ApiError, (Resource, Acl)] = {
(for {
resourceType <- Try(ResourceTypeLegacy.fromJava(filter.patternFilter.resourceType))
principal <- Try(parseKafkaPrincipal(filter.entryFilter.principal))
operation <- Try(Operation.fromJava(filter.entryFilter.operation))
permissionType <- Try(PermissionType.fromJava(filter.entryFilter.permissionType))
resource = Resource(resourceType, filter.patternFilter.name, filter.patternFilter.patternType)
acl = Acl(principal, permissionType, filter.entryFilter.host, operation)
} yield (resource, acl)) match {
case Failure(throwable) => Left(new ApiError(Errors.INVALID_REQUEST, throwable.getMessage))
case Success(s) => Right(s)
}
}
def convertToAclBinding(resource: Resource, acl: Acl): AclBinding = {
val resourcePattern = new ResourcePattern(resource.resourceType.toJava, resource.name, resource.patternType)
new AclBinding(resourcePattern, convertToAccessControlEntry(acl))
}
def convertToAccessControlEntry(acl: Acl): AccessControlEntry = {
new AccessControlEntry(acl.principal.toString, acl.host.toString,
acl.operation.toJava, acl.permissionType.toJava)
}
def convertToAcl(ace: AccessControlEntry): Acl = {
new Acl(parseKafkaPrincipal(ace.principal), PermissionType.fromJava(ace.permissionType), ace.host,
Operation.fromJava(ace.operation))
}
def convertToResource(resourcePattern: ResourcePattern): Resource = {
Resource(ResourceTypeLegacy.fromJava(resourcePattern.resourceType), resourcePattern.name, resourcePattern.patternType)
}
}
@deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5")
class AuthorizerWrapper(private[kafka] val baseAuthorizer: kafka.security.auth.Authorizer) extends Authorizer {
var shouldAllowEveryoneIfNoAclIsFound = false
override def configure(configs: util.Map[String, _]): Unit = {
baseAuthorizer.configure(configs)
shouldAllowEveryoneIfNoAclIsFound = (configs.asScala.get(
AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean)
&& baseAuthorizer.isInstanceOf[SimpleAclAuthorizer])
}
override def start(serverInfo: AuthorizerServerInfo): util.Map[Endpoint, _ <: CompletionStage[Void]] = {
serverInfo.endpoints.asScala.map { endpoint =>
endpoint -> CompletableFuture.completedFuture[Void](null) }.toMap.asJava
}
override def authorize(requestContext: AuthorizableRequestContext, actions: util.List[Action]): util.List[AuthorizationResult] = {
val session = Session(requestContext.principal, requestContext.clientAddress)
actions.asScala.map { action =>
val operation = Operation.fromJava(action.operation)
if (baseAuthorizer.authorize(session, operation, convertToResource(action.resourcePattern)))
AuthorizationResult.ALLOWED
else
AuthorizationResult.DENIED
}.asJava
}
override def createAcls(requestContext: AuthorizableRequestContext,
aclBindings: util.List[AclBinding]): util.List[_ <: CompletionStage[AclCreateResult]] = {
aclBindings.asScala
.map { aclBinding =>
convertToResourceAndAcl(aclBinding.toFilter) match {
case Left(apiError) => new AclCreateResult(apiError.exception)
case Right((resource, acl)) =>
try {
baseAuthorizer.addAcls(Set(acl), resource)
AclCreateResult.SUCCESS
} catch {
case e: ApiException => new AclCreateResult(e)
case e: Throwable => new AclCreateResult(new InvalidRequestException("Failed to create ACL", e))
}
}
}.toList.map(CompletableFuture.completedFuture[AclCreateResult]).asJava
}
override def deleteAcls(requestContext: AuthorizableRequestContext,
aclBindingFilters: util.List[AclBindingFilter]): util.List[_ <: CompletionStage[AclDeleteResult]] = {
val filters = aclBindingFilters.asScala
val results = mutable.Map[Int, AclDeleteResult]()
val toDelete = mutable.Map[Int, ArrayBuffer[(Resource, Acl)]]()
if (filters.forall(_.matchesAtMostOne)) {
// Delete based on a list of ACL fixtures.
for ((filter, i) <- filters.zipWithIndex) {
convertToResourceAndAcl(filter) match {
case Left(apiError) => results.put(i, new AclDeleteResult(apiError.exception))
case Right(binding) => toDelete.put(i, ArrayBuffer(binding))
}
}
} else {
// Delete based on filters that may match more than one ACL.
val aclMap = baseAuthorizer.getAcls()
val filtersWithIndex = filters.zipWithIndex
for ((resource, acls) <- aclMap; acl <- acls) {
val binding = new AclBinding(
new ResourcePattern(resource.resourceType.toJava, resource.name, resource.patternType),
new AccessControlEntry(acl.principal.toString, acl.host.toString, acl.operation.toJava,
acl.permissionType.toJava))
for ((filter, i) <- filtersWithIndex if filter.matches(binding))
toDelete.getOrElseUpdate(i, ArrayBuffer.empty) += ((resource, acl))
}
}
for ((i, acls) <- toDelete) {
val deletionResults = acls.flatMap { case (resource, acl) =>
val aclBinding = convertToAclBinding(resource, acl)
try {
if (baseAuthorizer.removeAcls(immutable.Set(acl), resource))
Some(new AclBindingDeleteResult(aclBinding))
else None
} catch {
case throwable: Throwable =>
Some(new AclBindingDeleteResult(aclBinding, ApiError.fromThrowable(throwable).exception))
}
}.asJava
results.put(i, new AclDeleteResult(deletionResults))
}
filters.indices.map { i =>
results.getOrElse(i, new AclDeleteResult(Seq.empty[AclBindingDeleteResult].asJava))
}.map(CompletableFuture.completedFuture[AclDeleteResult]).asJava
}
override def acls(filter: AclBindingFilter): lang.Iterable[AclBinding] = {
baseAuthorizer.getAcls().flatMap { case (resource, acls) =>
acls.map(acl => convertToAclBinding(resource, acl)).filter(filter.matches)
}.asJava
}
override def close(): Unit = {
baseAuthorizer.close()
}
override def authorizeByResourceType(requestContext: AuthorizableRequestContext,
op: AclOperation,
resourceType: ResourceType): AuthorizationResult = {
SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType)
if (super.authorizeByResourceType(requestContext, op, resourceType) == AuthorizationResult.ALLOWED)
AuthorizationResult.ALLOWED
else if (denyAllResource(requestContext, op, resourceType) || !shouldAllowEveryoneIfNoAclIsFound)
AuthorizationResult.DENIED
else
AuthorizationResult.ALLOWED
}
private def denyAllResource(requestContext: AuthorizableRequestContext,
op: AclOperation,
resourceType: ResourceType): Boolean = {
val resourceTypeFilter = new ResourcePatternFilter(
resourceType, Resource.WildCardResource, PatternType.LITERAL)
val principal = new KafkaPrincipal(
requestContext.principal.getPrincipalType, requestContext.principal.getName).toString
val host = requestContext.clientAddress().getHostAddress
val entryFilter = new AccessControlEntryFilter(null, null, op, AclPermissionType.DENY)
val entryFilterAllOp = new AccessControlEntryFilter(null, null, AclOperation.ALL, AclPermissionType.DENY)
val aclFilter = new AclBindingFilter(resourceTypeFilter, entryFilter)
val aclFilterAllOp = new AclBindingFilter(resourceTypeFilter, entryFilterAllOp)
(acls(aclFilter).asScala.exists(b => principalHostMatch(b.entry(), principal, host))
|| acls(aclFilterAllOp).asScala.exists(b => principalHostMatch(b.entry(), principal, host)))
}
private def principalHostMatch(ace: AccessControlEntry,
principal: String,
host: String): Boolean = {
((ace.host() == AclEntry.WildcardHost || ace.host() == host)
&& (ace.principal() == AclEntry.WildcardPrincipalString || ace.principal() == principal))
}
}

View File

@ -679,8 +679,7 @@ object KafkaConfig {
/************* Authorizer Configuration ***********/ /************* Authorizer Configuration ***********/
val AuthorizerClassNameDoc = s"The fully qualified name of a class that implements s${classOf[Authorizer].getName}" + val AuthorizerClassNameDoc = s"The fully qualified name of a class that implements s${classOf[Authorizer].getName}" +
" interface, which is used by the broker for authorization. This config also supports authorizers that implement the deprecated" + " interface, which is used by the broker for authorization."
" kafka.security.auth.Authorizer trait which was previously used for authorization."
/** ********* Socket Server Configuration ***********/ /** ********* Socket Server Configuration ***********/
val PortDoc = "DEPRECATED: only used when <code>listeners</code> is not set. " + val PortDoc = "DEPRECATED: only used when <code>listeners</code> is not set. " +
"Use <code>listeners</code> instead. \n" + "Use <code>listeners</code> instead. \n" +

View File

@ -18,10 +18,9 @@ import java.util
import java.util.concurrent.ExecutionException import java.util.concurrent.ExecutionException
import java.util.regex.Pattern import java.util.regex.Pattern
import java.util.{Collections, Optional, Properties} import java.util.{Collections, Optional, Properties}
import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, ConsumerGroupService} import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, ConsumerGroupService}
import kafka.log.LogConfig import kafka.log.LogConfig
import kafka.security.authorizer.AclEntry import kafka.security.authorizer.{AclAuthorizer, AclEntry}
import kafka.security.authorizer.AclEntry.WildcardHost import kafka.security.authorizer.AclEntry.WildcardHost
import kafka.server.{BaseRequestTest, KafkaConfig} import kafka.server.{BaseRequestTest, KafkaConfig}
import kafka.utils.TestUtils import kafka.utils.TestUtils
@ -144,7 +143,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group) consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group)
override def brokerPropertyOverrides(properties: Properties): Unit = { override def brokerPropertyOverrides(properties: Properties): Unit = {
properties.put(KafkaConfig.AuthorizerClassNameProp, "kafka.security.auth.SimpleAclAuthorizer") properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AclAuthorizer].getName)
properties.put(KafkaConfig.BrokerIdProp, brokerId.toString) properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1") properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1") properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")

View File

@ -15,7 +15,7 @@ package kafka.api
import java.util.Properties import java.util.Properties
import java.util.concurrent.ExecutionException import java.util.concurrent.ExecutionException
import kafka.api.GroupAuthorizerIntegrationTest._ import kafka.api.GroupAuthorizerIntegrationTest._
import kafka.security.auth.SimpleAclAuthorizer import kafka.security.authorizer.AclAuthorizer
import kafka.security.authorizer.AclEntry.WildcardHost import kafka.security.authorizer.AclEntry.WildcardHost
import kafka.server.{BaseRequestTest, KafkaConfig} import kafka.server.{BaseRequestTest, KafkaConfig}
import kafka.utils.TestUtils import kafka.utils.TestUtils
@ -63,7 +63,7 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
def clientPrincipal: KafkaPrincipal = ClientPrincipal def clientPrincipal: KafkaPrincipal = ClientPrincipal
override def brokerPropertyOverrides(properties: Properties): Unit = { override def brokerPropertyOverrides(properties: Properties): Unit = {
properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName) properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AclAuthorizer].getName)
properties.put(KafkaConfig.BrokerIdProp, brokerId.toString) properties.put(KafkaConfig.BrokerIdProp, brokerId.toString)
properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1") properties.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1") properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")

View File

@ -16,7 +16,7 @@
*/ */
package kafka.api package kafka.api
import kafka.security.auth.SimpleAclAuthorizer import kafka.security.authorizer.AclAuthorizer
import kafka.server.KafkaConfig import kafka.server.KafkaConfig
import kafka.utils.JaasTestUtils import kafka.utils.JaasTestUtils
import org.apache.kafka.common.config.SslConfigs import org.apache.kafka.common.config.SslConfigs
@ -25,8 +25,6 @@ import org.junit.jupiter.api.Assertions.assertNull
import scala.collection.immutable.List import scala.collection.immutable.List
// Note: this test currently uses the deprecated SimpleAclAuthorizer to ensure we have test coverage
// It must be replaced with the new AclAuthorizer when SimpleAclAuthorizer is removed
class SaslGssapiSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTest { class SaslGssapiSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTest {
override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
JaasTestUtils.KafkaClientPrincipalUnqualifiedName) JaasTestUtils.KafkaClientPrincipalUnqualifiedName)
@ -35,7 +33,7 @@ class SaslGssapiSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTe
override protected def kafkaClientSaslMechanism = "GSSAPI" override protected def kafkaClientSaslMechanism = "GSSAPI"
override protected def kafkaServerSaslMechanisms = List("GSSAPI") override protected def kafkaServerSaslMechanisms = List("GSSAPI")
override protected def authorizerClass = classOf[SimpleAclAuthorizer] override protected def authorizerClass = classOf[AclAuthorizer]
// Configure brokers to require SSL client authentication in order to verify that SASL_SSL works correctly even if the // Configure brokers to require SSL client authentication in order to verify that SASL_SSL works correctly even if the
// client doesn't have a keystore. We want to cover the scenario where a broker requires either SSL client // client doesn't have a keystore. We want to cover the scenario where a broker requires either SSL client

View File

@ -15,40 +15,38 @@ package kafka.api
import java.io.File import java.io.File
import java.util import java.util
import kafka.log.LogConfig import kafka.log.LogConfig
import kafka.security.authorizer.AclAuthorizer
import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString}
import kafka.server.{Defaults, KafkaConfig} import kafka.server.{Defaults, KafkaConfig}
import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils} import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils}
import kafka.utils.TestUtils._ import kafka.utils.TestUtils._
import org.apache.kafka.clients.admin._ import org.apache.kafka.clients.admin._
import org.apache.kafka.common.Uuid import org.apache.kafka.common.Uuid
import org.apache.kafka.common.acl._ import org.apache.kafka.common.acl._
import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DELETE, DESCRIBE} import org.apache.kafka.common.acl.AclOperation.{ALL, ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DELETE, DESCRIBE}
import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY} import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException, TopicAuthorizationException, UnknownTopicOrPartitionException} import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException, TopicAuthorizationException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType} import org.apache.kafka.common.resource.PatternType.LITERAL
import org.apache.kafka.common.resource.ResourceType.{GROUP, TOPIC}
import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourcePatternFilter, ResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.server.authorizer.Authorizer
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import scala.annotation.nowarn import java.util.Collections
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
import scala.collection.Seq import scala.collection.Seq
import scala.compat.java8.OptionConverters._ import scala.compat.java8.OptionConverters._
import scala.concurrent.ExecutionException import scala.concurrent.ExecutionException
import scala.util.{Failure, Success, Try} import scala.util.{Failure, Success, Try}
abstract class AuthorizationAdmin {
def authorizerClassName: String
def initializeAcls(): Unit
def addClusterAcl(permissionType: AclPermissionType, operation: AclOperation): Unit
def removeClusterAcl(permissionType: AclPermissionType, operation: AclOperation): Unit
}
// Note: this test currently uses the deprecated SimpleAclAuthorizer to ensure we have test coverage
// It must be replaced with the new AclAuthorizer when SimpleAclAuthorizer is removed
class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetup { class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetup {
@nowarn("cat=deprecation") val clusterResourcePattern = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL)
val authorizationAdmin: AuthorizationAdmin = new LegacyAuthorizationAdmin
val authorizationAdmin = new AclAuthorizationAdmin(classOf[AclAuthorizer], classOf[AclAuthorizer])
this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
override protected def securityProtocol = SecurityProtocol.SASL_SSL override protected def securityProtocol = SecurityProtocol.SASL_SSL
@ -477,58 +475,53 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu
client.describeAcls(allTopicAcls).values.get().asScala.toSet client.describeAcls(allTopicAcls).values.get().asScala.toSet
} }
@deprecated("Use kafka.security.authorizer.AclAuthorizer", "Since 2.5") class AclAuthorizationAdmin(authorizerClass: Class[_ <: AclAuthorizer], authorizerForInitClass: Class[_ <: AclAuthorizer]) {
class LegacyAuthorizationAdmin extends AuthorizationAdmin {
import kafka.security.auth._
import kafka.security.authorizer.AuthorizerWrapper
override def authorizerClassName: String = classOf[SimpleAclAuthorizer].getName def authorizerClassName: String = authorizerClass.getName
override def initializeAcls(): Unit = { def initializeAcls(): Unit = {
val authorizer = CoreUtils.createObject[Authorizer](classOf[SimpleAclAuthorizer].getName) val authorizer = CoreUtils.createObject[Authorizer](authorizerForInitClass.getName)
try { try {
authorizer.configure(configs.head.originals()) authorizer.configure(configs.head.originals())
authorizer.addAcls(Set(new Acl(Acl.WildCardPrincipal, Allow, val ace = new AccessControlEntry(WildcardPrincipalString, WildcardHost, ALL, ALLOW)
Acl.WildCardHost, All)), new Resource(Topic, "*", PatternType.LITERAL)) authorizer.createAcls(null, List(new AclBinding(new ResourcePattern(TOPIC, "*", LITERAL), ace)).asJava)
authorizer.addAcls(Set(new Acl(Acl.WildCardPrincipal, Allow, authorizer.createAcls(null, List(new AclBinding(new ResourcePattern(GROUP, "*", LITERAL), ace)).asJava)
Acl.WildCardHost, All)), new Resource(Group, "*", PatternType.LITERAL))
authorizer.addAcls(Set(clusterAcl(ALLOW, CREATE), authorizer.createAcls(null, List(clusterAcl(ALLOW, CREATE),
clusterAcl(ALLOW, DELETE), clusterAcl(ALLOW, DELETE),
clusterAcl(ALLOW, CLUSTER_ACTION), clusterAcl(ALLOW, CLUSTER_ACTION),
clusterAcl(ALLOW, ALTER_CONFIGS), clusterAcl(ALLOW, ALTER_CONFIGS),
clusterAcl(ALLOW, ALTER)), clusterAcl(ALLOW, ALTER))
Resource.ClusterResource) .map(ace => new AclBinding(clusterResourcePattern, ace)).asJava)
} finally { } finally {
authorizer.close() authorizer.close()
} }
} }
override def addClusterAcl(permissionType: AclPermissionType, operation: AclOperation): Unit = { def addClusterAcl(permissionType: AclPermissionType, operation: AclOperation): Unit = {
val acls = Set(clusterAcl(permissionType, operation)) val ace = clusterAcl(permissionType, operation)
val authorizer = simpleAclAuthorizer val aclBinding = new AclBinding(clusterResourcePattern, ace)
val prevAcls = authorizer.getAcls(Resource.ClusterResource) val authorizer = servers.head.dataPlaneRequestProcessor.authorizer.get
authorizer.addAcls(acls, Resource.ClusterResource) val prevAcls = authorizer.acls(new AclBindingFilter(clusterResourcePattern.toFilter, AccessControlEntryFilter.ANY))
TestUtils.waitAndVerifyAcls(prevAcls ++ acls, authorizer, Resource.ClusterResource) .asScala.map(_.entry).toSet
authorizer.createAcls(null, Collections.singletonList(aclBinding))
TestUtils.waitAndVerifyAcls(prevAcls ++ Set(ace), authorizer, clusterResourcePattern)
} }
override def removeClusterAcl(permissionType: AclPermissionType, operation: AclOperation): Unit = { def removeClusterAcl(permissionType: AclPermissionType, operation: AclOperation): Unit = {
val acls = Set(clusterAcl(permissionType, operation)) val ace = clusterAcl(permissionType, operation)
val authorizer = simpleAclAuthorizer val authorizer = servers.head.dataPlaneRequestProcessor.authorizer.get
val prevAcls = authorizer.getAcls(Resource.ClusterResource) val clusterFilter = new AclBindingFilter(clusterResourcePattern.toFilter, AccessControlEntryFilter.ANY)
assertTrue(authorizer.removeAcls(acls, Resource.ClusterResource)) val prevAcls = authorizer.acls(clusterFilter).asScala.map(_.entry).toSet
TestUtils.waitAndVerifyAcls(prevAcls -- acls, authorizer, Resource.ClusterResource) val deleteFilter = new AclBindingFilter(clusterResourcePattern.toFilter, ace.toFilter)
assertFalse(authorizer.deleteAcls(null, Collections.singletonList(deleteFilter))
.get(0).toCompletableFuture.get.aclBindingDeleteResults().asScala.head.exception.isPresent)
TestUtils.waitAndVerifyAcls(prevAcls -- Set(ace), authorizer, clusterResourcePattern)
} }
private def clusterAcl(permissionType: AclPermissionType, operation: AclOperation): AccessControlEntry = {
private def clusterAcl(permissionType: AclPermissionType, operation: AclOperation): Acl = { new AccessControlEntry(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*").toString,
new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*"), PermissionType.fromJava(permissionType), WildcardHost, operation, permissionType)
Acl.WildCardHost, Operation.fromJava(operation))
}
private def simpleAclAuthorizer: Authorizer = {
val authorizerWrapper = servers.head.dataPlaneRequestProcessor.authorizer.get.asInstanceOf[AuthorizerWrapper]
authorizerWrapper.baseAuthorizer
} }
} }
} }

View File

@ -14,22 +14,16 @@ package kafka.api
import java.io.File import java.io.File
import java.util import java.util
import java.util.Collections
import java.util.concurrent._ import java.util.concurrent._
import com.yammer.metrics.core.Gauge import com.yammer.metrics.core.Gauge
import kafka.metrics.KafkaYammerMetrics import kafka.metrics.KafkaYammerMetrics
import kafka.security.authorizer.AclAuthorizer import kafka.security.authorizer.AclAuthorizer
import kafka.security.authorizer.AclEntry.{WildcardHost, WildcardPrincipalString}
import kafka.server.KafkaConfig import kafka.server.KafkaConfig
import kafka.utils.{CoreUtils, TestUtils} import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, CreateAclsResult} import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, CreateAclsResult}
import org.apache.kafka.common.acl._ import org.apache.kafka.common.acl._
import org.apache.kafka.common.acl.AclOperation._
import org.apache.kafka.common.acl.AclPermissionType._
import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType} import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
import org.apache.kafka.common.resource.PatternType._
import org.apache.kafka.common.resource.ResourceType._
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.server.authorizer._ import org.apache.kafka.server.authorizer._
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertTrue} import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertTrue}
@ -84,8 +78,7 @@ object SslAdminIntegrationTest {
} }
class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest { class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
override val authorizationAdmin = new AclAuthorizationAdmin override val authorizationAdmin = new AclAuthorizationAdmin(classOf[SslAdminIntegrationTest.TestableAclAuthorizer], classOf[AclAuthorizer])
val clusterResourcePattern = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL)
this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
@ -266,54 +259,4 @@ class SslAdminIntegrationTest extends SaslSslAdminIntegrationTest {
assertTrue(metrics.nonEmpty, s"Unable to find metric $name: allMetrics: ${allMetrics.keySet.map(_.getMBeanName)}") assertTrue(metrics.nonEmpty, s"Unable to find metric $name: allMetrics: ${allMetrics.keySet.map(_.getMBeanName)}")
metrics.map(_.asInstanceOf[Gauge[Int]].value).sum metrics.map(_.asInstanceOf[Gauge[Int]].value).sum
} }
class AclAuthorizationAdmin extends AuthorizationAdmin {
override def authorizerClassName: String = classOf[SslAdminIntegrationTest.TestableAclAuthorizer].getName
override def initializeAcls(): Unit = {
val authorizer = CoreUtils.createObject[Authorizer](classOf[AclAuthorizer].getName)
try {
authorizer.configure(configs.head.originals())
val ace = new AccessControlEntry(WildcardPrincipalString, WildcardHost, ALL, ALLOW)
authorizer.createAcls(null, List(new AclBinding(new ResourcePattern(TOPIC, "*", LITERAL), ace)).asJava)
authorizer.createAcls(null, List(new AclBinding(new ResourcePattern(GROUP, "*", LITERAL), ace)).asJava)
authorizer.createAcls(null, List(clusterAcl(ALLOW, CREATE),
clusterAcl(ALLOW, DELETE),
clusterAcl(ALLOW, CLUSTER_ACTION),
clusterAcl(ALLOW, ALTER_CONFIGS),
clusterAcl(ALLOW, ALTER))
.map(ace => new AclBinding(clusterResourcePattern, ace)).asJava)
} finally {
authorizer.close()
}
}
override def addClusterAcl(permissionType: AclPermissionType, operation: AclOperation): Unit = {
val ace = clusterAcl(permissionType, operation)
val aclBinding = new AclBinding(clusterResourcePattern, ace)
val authorizer = servers.head.dataPlaneRequestProcessor.authorizer.get
val prevAcls = authorizer.acls(new AclBindingFilter(clusterResourcePattern.toFilter, AccessControlEntryFilter.ANY))
.asScala.map(_.entry).toSet
authorizer.createAcls(null, Collections.singletonList(aclBinding))
TestUtils.waitAndVerifyAcls(prevAcls ++ Set(ace), authorizer, clusterResourcePattern)
}
override def removeClusterAcl(permissionType: AclPermissionType, operation: AclOperation): Unit = {
val ace = clusterAcl(permissionType, operation)
val authorizer = servers.head.dataPlaneRequestProcessor.authorizer.get
val clusterFilter = new AclBindingFilter(clusterResourcePattern.toFilter, AccessControlEntryFilter.ANY)
val prevAcls = authorizer.acls(clusterFilter).asScala.map(_.entry).toSet
val deleteFilter = new AclBindingFilter(clusterResourcePattern.toFilter, ace.toFilter)
assertFalse(authorizer.deleteAcls(null, Collections.singletonList(deleteFilter))
.get(0).toCompletableFuture.get.aclBindingDeleteResults().asScala.head.exception.isPresent)
TestUtils.waitAndVerifyAcls(prevAcls -- Set(ace), authorizer, clusterResourcePattern)
}
private def clusterAcl(permissionType: AclPermissionType, operation: AclOperation): AccessControlEntry = {
new AccessControlEntry(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*").toString,
WildcardHost, operation, permissionType)
}
}
} }

View File

@ -1,68 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.security.auth
import kafka.common.KafkaException
import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Assertions._
@deprecated("Use org.apache.kafka.common.resource.ResourcePattern", "Since 2.5")
class ResourceTest {
@Test
def shouldThrowOnTwoPartStringWithUnknownResourceType(): Unit = {
assertThrows(classOf[KafkaException], () => Resource.fromString("Unknown:fred"))
}
@Test
def shouldThrowOnBadResourceTypeSeparator(): Unit = {
assertThrows(classOf[KafkaException], () => Resource.fromString("Topic-fred"))
}
@Test
def shouldParseOldTwoPartString(): Unit = {
assertEquals(Resource(Group, "fred", LITERAL), Resource.fromString("Group:fred"))
assertEquals(Resource(Topic, "t", LITERAL), Resource.fromString("Topic:t"))
}
@Test
def shouldParseOldTwoPartWithEmbeddedSeparators(): Unit = {
assertEquals(Resource(Group, ":This:is:a:weird:group:name:", LITERAL), Resource.fromString("Group::This:is:a:weird:group:name:"))
}
@Test
def shouldParseThreePartString(): Unit = {
assertEquals(Resource(Group, "fred", PREFIXED), Resource.fromString("Group:PREFIXED:fred"))
assertEquals(Resource(Topic, "t", LITERAL), Resource.fromString("Topic:LITERAL:t"))
}
@Test
def shouldParseThreePartWithEmbeddedSeparators(): Unit = {
assertEquals(Resource(Group, ":This:is:a:weird:group:name:", PREFIXED), Resource.fromString("Group:PREFIXED::This:is:a:weird:group:name:"))
assertEquals(Resource(Group, ":This:is:a:weird:group:name:", LITERAL), Resource.fromString("Group:LITERAL::This:is:a:weird:group:name:"))
}
@Test
def shouldRoundTripViaString(): Unit = {
val expected = Resource(Group, "fred", PREFIXED)
val actual = Resource.fromString(expected.toString)
assertEquals(expected, actual)
}
}

View File

@ -1,40 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.security.auth
import org.apache.kafka.common.acl.AclOperation
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
@deprecated("Scala Authorizer API classes gave been deprecated", "Since 2.5")
class OperationTest {
/**
* Test round trip conversions between org.apache.kafka.common.acl.AclOperation and
* kafka.security.auth.Operation.
*/
@Test
def testJavaConversions(): Unit = {
AclOperation.values.foreach {
case AclOperation.UNKNOWN | AclOperation.ANY =>
case aclOp =>
val op = Operation.fromJava(aclOp)
val aclOp2 = op.toJava
assertEquals(aclOp, aclOp2)
}
}
}

View File

@ -1,49 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.security.auth
import kafka.common.KafkaException
import org.apache.kafka.common.acl.AclPermissionType
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@deprecated("Scala Authorizer API classes gave been deprecated", "Since 2.5")
class PermissionTypeTest {
@Test
def testFromString(): Unit = {
val permissionType = PermissionType.fromString("Allow")
assertEquals(Allow, permissionType)
assertThrows(classOf[KafkaException], () => PermissionType.fromString("badName"))
}
/**
* Test round trip conversions between org.apache.kafka.common.acl.AclPermissionType and
* kafka.security.auth.PermissionType.
*/
@Test
def testJavaConversions(): Unit = {
AclPermissionType.values().foreach {
case AclPermissionType.UNKNOWN | AclPermissionType.ANY =>
case aclPerm =>
val perm = PermissionType.fromJava(aclPerm)
val aclPerm2 = perm.toJava
assertEquals(aclPerm, aclPerm2)
}
}
}

View File

@ -1,48 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.security.auth
import kafka.common.KafkaException
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.apache.kafka.common.resource.{ResourceType => JResourceType}
@deprecated("Scala Authorizer API classes gave been deprecated", "Since 2.5")
class ResourceTypeTest {
@Test
def testFromString(): Unit = {
val resourceType = ResourceType.fromString("Topic")
assertEquals(Topic, resourceType)
assertThrows(classOf[KafkaException], () => ResourceType.fromString("badName"))
}
/**
* Test round trip conversions between org.apache.kafka.common.acl.ResourceType and
* kafka.security.auth.ResourceType.
*/
@Test
def testJavaConversions(): Unit = {
JResourceType.values.foreach {
case JResourceType.UNKNOWN | JResourceType.ANY =>
case jResourceType =>
val resourceType = ResourceType.fromJava(jResourceType)
val jResourceType2 = resourceType.toJava
assertEquals(jResourceType, jResourceType2)
}
}
}

View File

@ -1,731 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.security.auth
import java.net.InetAddress
import java.nio.charset.StandardCharsets.UTF_8
import java.util.UUID
import kafka.api.{ApiVersion, KAFKA_2_0_IV0, KAFKA_2_0_IV1}
import kafka.network.RequestChannel.Session
import kafka.security.auth.Acl.{WildCardHost, WildCardResource}
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import kafka.zk.{ZkAclStore, ZooKeeperTestHarness}
import kafka.zookeeper.{GetChildrenRequest, GetDataRequest, ZooKeeperClient}
import org.apache.kafka.common.errors.UnsupportedVersionException
import org.apache.kafka.common.resource.PatternType
import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.Time
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
@deprecated("Use AclAuthorizer", "Since 2.4")
class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
private val allowReadAcl = Acl(Acl.WildCardPrincipal, Allow, WildCardHost, Read)
private val allowWriteAcl = Acl(Acl.WildCardPrincipal, Allow, WildCardHost, Write)
private val denyReadAcl = Acl(Acl.WildCardPrincipal, Deny, WildCardHost, Read)
private val wildCardResource = Resource(Topic, WildCardResource, LITERAL)
private val prefixedResource = Resource(Topic, "foo", PREFIXED)
private val simpleAclAuthorizer = new SimpleAclAuthorizer
private val simpleAclAuthorizer2 = new SimpleAclAuthorizer
private var resource: Resource = _
private val superUsers = "User:superuser1; User:superuser2"
private val username = "alice"
private val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
private val session = Session(principal, InetAddress.getByName("192.168.0.1"))
private var config: KafkaConfig = _
private var zooKeeperClient: ZooKeeperClient = _
class CustomPrincipal(principalType: String, name: String) extends KafkaPrincipal(principalType, name) {
override def equals(o: scala.Any): Boolean = false
}
@BeforeEach
override def setUp(): Unit = {
super.setUp()
// Increase maxUpdateRetries to avoid transient failures
simpleAclAuthorizer.maxUpdateRetries = Int.MaxValue
simpleAclAuthorizer2.maxUpdateRetries = Int.MaxValue
val props = TestUtils.createBrokerConfig(0, zkConnect)
props.put(SimpleAclAuthorizer.SuperUsersProp, superUsers)
config = KafkaConfig.fromProps(props)
simpleAclAuthorizer.configure(config.originals)
simpleAclAuthorizer2.configure(config.originals)
resource = Resource(Topic, "foo-" + UUID.randomUUID(), LITERAL)
zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests,
Time.SYSTEM, "kafka.test", "SimpleAclAuthorizerTest")
}
@AfterEach
override def tearDown(): Unit = {
simpleAclAuthorizer.close()
simpleAclAuthorizer2.close()
zooKeeperClient.close()
super.tearDown()
}
@Test
def testAuthorizeThrowsOnNonLiteralResource(): Unit = {
assertThrows(classOf[IllegalArgumentException], () => simpleAclAuthorizer.authorize(session, Read, Resource(Topic, "something", PREFIXED)))
}
@Test
def testAuthorizeWithEmptyResourceName(): Unit = {
assertFalse(simpleAclAuthorizer.authorize(session, Read, Resource(Group, "", LITERAL)))
simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), Resource(Group, WildCardResource, LITERAL))
assertTrue(simpleAclAuthorizer.authorize(session, Read, Resource(Group, "", LITERAL)))
}
// Authorizing the empty resource is not supported because we create a znode with the resource name.
@Test
def testEmptyAclThrowsException(): Unit = {
assertThrows(classOf[IllegalArgumentException], () => simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), Resource(Group, "", LITERAL)))
}
@Test
def testTopicAcl(): Unit = {
val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "rob")
val user3 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "batman")
val host1 = InetAddress.getByName("192.168.1.1")
val host2 = InetAddress.getByName("192.168.1.2")
//user1 has READ access from host1 and host2.
val acl1 = new Acl(user1, Allow, host1.getHostAddress, Read)
val acl2 = new Acl(user1, Allow, host2.getHostAddress, Read)
//user1 does not have READ access from host1.
val acl3 = new Acl(user1, Deny, host1.getHostAddress, Read)
//user1 has Write access from host1 only.
val acl4 = new Acl(user1, Allow, host1.getHostAddress, Write)
//user1 has DESCRIBE access from all hosts.
val acl5 = new Acl(user1, Allow, WildCardHost, Describe)
//user2 has READ access from all hosts.
val acl6 = new Acl(user2, Allow, WildCardHost, Read)
//user3 has WRITE access from all hosts.
val acl7 = new Acl(user3, Allow, WildCardHost, Write)
val acls = Set[Acl](acl1, acl2, acl3, acl4, acl5, acl6, acl7)
changeAclAndVerify(Set.empty[Acl], acls, Set.empty[Acl])
val host1Session = Session(user1, host1)
val host2Session = Session(user1, host2)
assertTrue(simpleAclAuthorizer.authorize(host2Session, Read, resource), "User1 should have READ access from host2")
assertFalse(simpleAclAuthorizer.authorize(host1Session, Read, resource), "User1 should not have READ access from host1 due to denyAcl")
assertTrue(simpleAclAuthorizer.authorize(host1Session, Write, resource), "User1 should have WRITE access from host1")
assertFalse(simpleAclAuthorizer.authorize(host2Session, Write, resource), "User1 should not have WRITE access from host2 as no allow acl is defined")
assertTrue(simpleAclAuthorizer.authorize(host1Session, Describe, resource), "User1 should not have DESCRIBE access from host1")
assertTrue(simpleAclAuthorizer.authorize(host2Session, Describe, resource), "User1 should have DESCRIBE access from host2")
assertFalse(simpleAclAuthorizer.authorize(host1Session, Alter, resource), "User1 should not have edit access from host1")
assertFalse(simpleAclAuthorizer.authorize(host2Session, Alter, resource), "User1 should not have edit access from host2")
//test if user has READ and write access they also get describe access
val user2Session = Session(user2, host1)
val user3Session = Session(user3, host1)
assertTrue(simpleAclAuthorizer.authorize(user2Session, Describe, resource), "User2 should have DESCRIBE access from host1")
assertTrue(simpleAclAuthorizer.authorize(user3Session, Describe, resource), "User3 should have DESCRIBE access from host2")
assertTrue(simpleAclAuthorizer.authorize(user2Session, Read, resource), "User2 should have READ access from host1")
assertTrue(simpleAclAuthorizer.authorize(user3Session, Write, resource), "User3 should have WRITE access from host2")
}
/**
CustomPrincipals should be compared with their principal type and name
*/
@Test
def testAllowAccessWithCustomPrincipal(): Unit = {
val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
val customUserPrincipal = new CustomPrincipal(KafkaPrincipal.USER_TYPE, username)
val host1 = InetAddress.getByName("192.168.1.1")
val host2 = InetAddress.getByName("192.168.1.2")
// user has READ access from host2 but not from host1
val acl1 = new Acl(user, Deny, host1.getHostAddress, Read)
val acl2 = new Acl(user, Allow, host2.getHostAddress, Read)
val acls = Set[Acl](acl1, acl2)
changeAclAndVerify(Set.empty[Acl], acls, Set.empty[Acl])
val host1Session = Session(customUserPrincipal, host1)
val host2Session = Session(customUserPrincipal, host2)
assertTrue(simpleAclAuthorizer.authorize(host2Session, Read, resource), "User1 should have READ access from host2")
assertFalse(simpleAclAuthorizer.authorize(host1Session, Read, resource), "User1 should not have READ access from host1 due to denyAcl")
}
@Test
def testDenyTakesPrecedence(): Unit = {
val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
val host = InetAddress.getByName("192.168.2.1")
val session = Session(user, host)
val allowAll = Acl.AllowAllAcl
val denyAcl = new Acl(user, Deny, host.getHostAddress, All)
val acls = Set[Acl](allowAll, denyAcl)
changeAclAndVerify(Set.empty[Acl], acls, Set.empty[Acl])
assertFalse(simpleAclAuthorizer.authorize(session, Read, resource), "deny should take precedence over allow.")
}
@Test
def testAllowAllAccess(): Unit = {
val allowAllAcl = Acl.AllowAllAcl
changeAclAndVerify(Set.empty[Acl], Set[Acl](allowAllAcl), Set.empty[Acl])
val session = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "random"), InetAddress.getByName("192.0.4.4"))
assertTrue(simpleAclAuthorizer.authorize(session, Read, resource), "allow all acl should allow access to all.")
}
@Test
def testSuperUserHasAccess(): Unit = {
val denyAllAcl = new Acl(Acl.WildCardPrincipal, Deny, WildCardHost, All)
changeAclAndVerify(Set.empty[Acl], Set[Acl](denyAllAcl), Set.empty[Acl])
val session1 = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"), InetAddress.getByName("192.0.4.4"))
val session2 = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser2"), InetAddress.getByName("192.0.4.4"))
assertTrue(simpleAclAuthorizer.authorize(session1, Read, resource), "superuser always has access, no matter what acls.")
assertTrue(simpleAclAuthorizer.authorize(session2, Read, resource), "superuser always has access, no matter what acls.")
}
/**
CustomPrincipals should be compared with their principal type and name
*/
@Test
def testSuperUserWithCustomPrincipalHasAccess(): Unit = {
val denyAllAcl = new Acl(Acl.WildCardPrincipal, Deny, WildCardHost, All)
changeAclAndVerify(Set.empty[Acl], Set[Acl](denyAllAcl), Set.empty[Acl])
val session = Session(new CustomPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"), InetAddress.getByName("192.0.4.4"))
assertTrue(simpleAclAuthorizer.authorize(session, Read, resource), "superuser with custom principal always has access, no matter what acls.")
}
@Test
def testWildCardAcls(): Unit = {
assertFalse(simpleAclAuthorizer.authorize(session, Read, resource), "when acls = [], authorizer should fail close.")
val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
val host1 = InetAddress.getByName("192.168.3.1")
val readAcl = new Acl(user1, Allow, host1.getHostAddress, Read)
val acls = changeAclAndVerify(Set.empty[Acl], Set[Acl](readAcl), Set.empty[Acl], wildCardResource)
val host1Session = Session(user1, host1)
assertTrue(simpleAclAuthorizer.authorize(host1Session, Read, resource), "User1 should have Read access from host1")
//allow Write to specific topic.
val writeAcl = new Acl(user1, Allow, host1.getHostAddress, Write)
changeAclAndVerify(Set.empty[Acl], Set[Acl](writeAcl), Set.empty[Acl])
//deny Write to wild card topic.
val denyWriteOnWildCardResourceAcl = new Acl(user1, Deny, host1.getHostAddress, Write)
changeAclAndVerify(acls, Set[Acl](denyWriteOnWildCardResourceAcl), Set.empty[Acl], wildCardResource)
assertFalse(simpleAclAuthorizer.authorize(host1Session, Write, resource), "User1 should not have Write access from host1")
}
@Test
def testNoAclFound(): Unit = {
assertFalse(simpleAclAuthorizer.authorize(session, Read, resource), "when acls = [], authorizer should fail close.")
}
@Test
def testNoAclFoundOverride(): Unit = {
val props = TestUtils.createBrokerConfig(1, zkConnect)
props.put(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true")
val cfg = KafkaConfig.fromProps(props)
val testAuthorizer = new SimpleAclAuthorizer
try {
testAuthorizer.configure(cfg.originals)
assertTrue(testAuthorizer.authorize(session, Read, resource), "when acls = null or [], authorizer should fail open with allow.everyone = true.")
} finally {
testAuthorizer.close()
}
}
@Test
def testAclManagementAPIs(): Unit = {
val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
val host1 = "host1"
val host2 = "host2"
val acl1 = new Acl(user1, Allow, host1, Read)
val acl2 = new Acl(user1, Allow, host1, Write)
val acl3 = new Acl(user2, Allow, host2, Read)
val acl4 = new Acl(user2, Allow, host2, Write)
var acls = changeAclAndVerify(Set.empty[Acl], Set[Acl](acl1, acl2, acl3, acl4), Set.empty[Acl])
//test addAcl is additive
val acl5 = new Acl(user2, Allow, WildCardHost, Read)
acls = changeAclAndVerify(acls, Set[Acl](acl5), Set.empty[Acl])
//test get by principal name.
TestUtils.waitUntilTrue(() => Map(resource -> Set(acl1, acl2)) == simpleAclAuthorizer.getAcls(user1), "changes not propagated in timeout period")
TestUtils.waitUntilTrue(() => Map(resource -> Set(acl3, acl4, acl5)) == simpleAclAuthorizer.getAcls(user2), "changes not propagated in timeout period")
val resourceToAcls = Map[Resource, Set[Acl]](
new Resource(Topic, Resource.WildCardResource, LITERAL) -> Set[Acl](new Acl(user2, Allow, WildCardHost, Read)),
new Resource(Cluster, Resource.WildCardResource, LITERAL) -> Set[Acl](new Acl(user2, Allow, host1, Read)),
new Resource(Group, Resource.WildCardResource, LITERAL) -> acls,
new Resource(Group, "test-ConsumerGroup", LITERAL) -> acls
)
resourceToAcls foreach { case (key, value) => changeAclAndVerify(Set.empty[Acl], value, Set.empty[Acl], key) }
TestUtils.waitUntilTrue(() => resourceToAcls + (resource -> acls) == simpleAclAuthorizer.getAcls(), "changes not propagated in timeout period.")
//test remove acl from existing acls.
acls = changeAclAndVerify(acls, Set.empty[Acl], Set(acl1, acl5))
//test remove all acls for resource
simpleAclAuthorizer.removeAcls(resource)
TestUtils.waitAndVerifyAcls(Set.empty[Acl], simpleAclAuthorizer, resource)
assertFalse(zkClient.resourceExists(resource.toPattern))
//test removing last acl also deletes ZooKeeper path
acls = changeAclAndVerify(Set.empty[Acl], Set(acl1), Set.empty[Acl])
changeAclAndVerify(acls, Set.empty[Acl], acls)
assertFalse(zkClient.resourceExists(resource.toPattern))
}
@Test
def testLoadCache(): Unit = {
val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
val acl1 = new Acl(user1, Allow, "host-1", Read)
val acls = Set[Acl](acl1)
simpleAclAuthorizer.addAcls(acls, resource)
val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
val resource1 = Resource(Topic, "test-2", LITERAL)
val acl2 = new Acl(user2, Deny, "host3", Read)
val acls1 = Set[Acl](acl2)
simpleAclAuthorizer.addAcls(acls1, resource1)
zkClient.deleteAclChangeNotifications()
val authorizer = new SimpleAclAuthorizer
try {
authorizer.configure(config.originals)
assertEquals(acls, authorizer.getAcls(resource))
assertEquals(acls1, authorizer.getAcls(resource1))
} finally {
authorizer.close()
}
}
@Test
def testLocalConcurrentModificationOfResourceAcls(): Unit = {
val commonResource = Resource(Topic, "test", LITERAL)
val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
val acl1 = new Acl(user1, Allow, WildCardHost, Read)
val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
val acl2 = new Acl(user2, Deny, WildCardHost, Read)
simpleAclAuthorizer.addAcls(Set(acl1), commonResource)
simpleAclAuthorizer.addAcls(Set(acl2), commonResource)
TestUtils.waitAndVerifyAcls(Set(acl1, acl2), simpleAclAuthorizer, commonResource)
}
@Test
def testDistributedConcurrentModificationOfResourceAcls(): Unit = {
val commonResource = Resource(Topic, "test", LITERAL)
val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
val acl1 = new Acl(user1, Allow, WildCardHost, Read)
val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
val acl2 = new Acl(user2, Deny, WildCardHost, Read)
// Add on each instance
simpleAclAuthorizer.addAcls(Set(acl1), commonResource)
simpleAclAuthorizer2.addAcls(Set(acl2), commonResource)
TestUtils.waitAndVerifyAcls(Set(acl1, acl2), simpleAclAuthorizer, commonResource)
TestUtils.waitAndVerifyAcls(Set(acl1, acl2), simpleAclAuthorizer2, commonResource)
val user3 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "joe")
val acl3 = new Acl(user3, Deny, WildCardHost, Read)
// Add on one instance and delete on another
simpleAclAuthorizer.addAcls(Set(acl3), commonResource)
val deleted = simpleAclAuthorizer2.removeAcls(Set(acl3), commonResource)
assertTrue(deleted, "The authorizer should see a value that needs to be deleted")
TestUtils.waitAndVerifyAcls(Set(acl1, acl2), simpleAclAuthorizer, commonResource)
TestUtils.waitAndVerifyAcls(Set(acl1, acl2), simpleAclAuthorizer2, commonResource)
}
@Test
def testHighConcurrencyModificationOfResourceAcls(): Unit = {
val commonResource = Resource(Topic, "test", LITERAL)
val acls = (0 to 50).map { i =>
val useri = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, i.toString)
new Acl(useri, Allow, WildCardHost, Read)
}
// Alternate authorizer, Remove all acls that end in 0
val concurrentFuctions = acls.map { acl =>
() => {
val aclId = acl.principal.getName.toInt
if (aclId % 2 == 0) {
simpleAclAuthorizer.addAcls(Set(acl), commonResource)
} else {
simpleAclAuthorizer2.addAcls(Set(acl), commonResource)
}
if (aclId % 10 == 0) {
simpleAclAuthorizer2.removeAcls(Set(acl), commonResource)
}
}
}
val expectedAcls = acls.filter { acl =>
val aclId = acl.principal.getName.toInt
aclId % 10 != 0
}.toSet
TestUtils.assertConcurrent("Should support many concurrent calls", concurrentFuctions, 30 * 1000)
TestUtils.waitAndVerifyAcls(expectedAcls, simpleAclAuthorizer, commonResource)
TestUtils.waitAndVerifyAcls(expectedAcls, simpleAclAuthorizer2, commonResource)
}
/**
* Test ACL inheritance, as described in #{org.apache.kafka.common.acl.AclOperation}
*/
@Test
def testAclInheritance(): Unit = {
testImplicationsOfAllow(All, Set(Read, Write, Create, Delete, Alter, Describe,
ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite))
testImplicationsOfDeny(All, Set(Read, Write, Create, Delete, Alter, Describe,
ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite))
testImplicationsOfAllow(Read, Set(Describe))
testImplicationsOfAllow(Write, Set(Describe))
testImplicationsOfAllow(Delete, Set(Describe))
testImplicationsOfAllow(Alter, Set(Describe))
testImplicationsOfDeny(Describe, Set())
testImplicationsOfAllow(AlterConfigs, Set(DescribeConfigs))
testImplicationsOfDeny(DescribeConfigs, Set())
}
private def testImplicationsOfAllow(parentOp: Operation, allowedOps: Set[Operation]): Unit = {
val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
val host = InetAddress.getByName("192.168.3.1")
val hostSession = Session(user, host)
val acl = Acl(user, Allow, WildCardHost, parentOp)
simpleAclAuthorizer.addAcls(Set(acl), Resource.ClusterResource)
Operation.values.foreach { op =>
val authorized = simpleAclAuthorizer.authorize(hostSession, op, Resource.ClusterResource)
if (allowedOps.contains(op) || op == parentOp)
assertTrue(authorized, s"ALLOW $parentOp should imply ALLOW $op")
else
assertFalse(authorized, s"ALLOW $parentOp should not imply ALLOW $op")
}
simpleAclAuthorizer.removeAcls(Set(acl), Resource.ClusterResource)
}
private def testImplicationsOfDeny(parentOp: Operation, deniedOps: Set[Operation]): Unit = {
val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
val host1 = InetAddress.getByName("192.168.3.1")
val host1Session = Session(user1, host1)
val acls = Set(Acl(user1, Deny, WildCardHost, parentOp), Acl(user1, Allow, WildCardHost, All))
simpleAclAuthorizer.addAcls(acls, Resource.ClusterResource)
Operation.values.foreach { op =>
val authorized = simpleAclAuthorizer.authorize(host1Session, op, Resource.ClusterResource)
if (deniedOps.contains(op) || op == parentOp)
assertFalse(authorized, s"DENY $parentOp should imply DENY $op")
else
assertTrue(authorized, s"DENY $parentOp should not imply DENY $op")
}
simpleAclAuthorizer.removeAcls(acls, Resource.ClusterResource)
}
@Test
def testHighConcurrencyDeletionOfResourceAcls(): Unit = {
val acl = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username), Allow, WildCardHost, All)
// Alternate authorizer to keep adding and removing ZooKeeper path
val concurrentFuctions = (0 to 50).map { _ =>
() => {
simpleAclAuthorizer.addAcls(Set(acl), resource)
simpleAclAuthorizer2.removeAcls(Set(acl), resource)
}
}
TestUtils.assertConcurrent("Should support many concurrent calls", concurrentFuctions, 30 * 1000)
TestUtils.waitAndVerifyAcls(Set.empty[Acl], simpleAclAuthorizer, resource)
TestUtils.waitAndVerifyAcls(Set.empty[Acl], simpleAclAuthorizer2, resource)
}
@Test
def testAccessAllowedIfAllowAclExistsOnWildcardResource(): Unit = {
simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), wildCardResource)
assertTrue(simpleAclAuthorizer.authorize(session, Read, resource))
}
@Test
def testDeleteAclOnWildcardResource(): Unit = {
simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), wildCardResource)
simpleAclAuthorizer.removeAcls(Set[Acl](allowReadAcl), wildCardResource)
assertEquals(Set(allowWriteAcl), simpleAclAuthorizer.getAcls(wildCardResource))
}
@Test
def testDeleteAllAclOnWildcardResource(): Unit = {
simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), wildCardResource)
simpleAclAuthorizer.removeAcls(wildCardResource)
assertEquals(Map(), simpleAclAuthorizer.getAcls())
}
@Test
def testAccessAllowedIfAllowAclExistsOnPrefixedResource(): Unit = {
simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), prefixedResource)
assertTrue(simpleAclAuthorizer.authorize(session, Read, resource))
}
@Test
def testDeleteAclOnPrefixedResource(): Unit = {
simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), prefixedResource)
simpleAclAuthorizer.removeAcls(Set[Acl](allowReadAcl), prefixedResource)
assertEquals(Set(allowWriteAcl), simpleAclAuthorizer.getAcls(prefixedResource))
}
@Test
def testDeleteAllAclOnPrefixedResource(): Unit = {
simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), prefixedResource)
simpleAclAuthorizer.removeAcls(prefixedResource)
assertEquals(Map(), simpleAclAuthorizer.getAcls())
}
@Test
def testAddAclsOnLiteralResource(): Unit = {
simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), resource)
simpleAclAuthorizer.addAcls(Set[Acl](allowWriteAcl, denyReadAcl), resource)
assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), simpleAclAuthorizer.getAcls(resource))
assertEquals(Set(), simpleAclAuthorizer.getAcls(wildCardResource))
assertEquals(Set(), simpleAclAuthorizer.getAcls(prefixedResource))
}
@Test
def testAddAclsOnWildcardResource(): Unit = {
simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), wildCardResource)
simpleAclAuthorizer.addAcls(Set[Acl](allowWriteAcl, denyReadAcl), wildCardResource)
assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), simpleAclAuthorizer.getAcls(wildCardResource))
assertEquals(Set(), simpleAclAuthorizer.getAcls(resource))
assertEquals(Set(), simpleAclAuthorizer.getAcls(prefixedResource))
}
@Test
def testAddAclsOnPrefiexedResource(): Unit = {
simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl, allowWriteAcl), prefixedResource)
simpleAclAuthorizer.addAcls(Set[Acl](allowWriteAcl, denyReadAcl), prefixedResource)
assertEquals(Set(allowReadAcl, allowWriteAcl, denyReadAcl), simpleAclAuthorizer.getAcls(prefixedResource))
assertEquals(Set(), simpleAclAuthorizer.getAcls(wildCardResource))
assertEquals(Set(), simpleAclAuthorizer.getAcls(resource))
}
@Test
def testAuthorizeWithPrefixedResource(): Unit = {
simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "a_other", LITERAL))
simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "a_other", PREFIXED))
simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "foo-" + UUID.randomUUID(), PREFIXED))
simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "foo-" + UUID.randomUUID(), PREFIXED))
simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "foo-" + UUID.randomUUID() + "-zzz", PREFIXED))
simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fooo-" + UUID.randomUUID(), PREFIXED))
simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fo-" + UUID.randomUUID(), PREFIXED))
simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fop-" + UUID.randomUUID(), PREFIXED))
simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fon-" + UUID.randomUUID(), PREFIXED))
simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "fon-", PREFIXED))
simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "z_other", PREFIXED))
simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "z_other", LITERAL))
simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), prefixedResource)
assertTrue(simpleAclAuthorizer.authorize(session, Read, resource))
}
@Test
def testSingleCharacterResourceAcls(): Unit = {
simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), Resource(Topic, "f", LITERAL))
assertTrue(simpleAclAuthorizer.authorize(session, Read, Resource(Topic, "f", LITERAL)))
assertFalse(simpleAclAuthorizer.authorize(session, Read, Resource(Topic, "foo", LITERAL)))
simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), Resource(Topic, "_", PREFIXED))
assertTrue(simpleAclAuthorizer.authorize(session, Read, Resource(Topic, "_foo", LITERAL)))
assertTrue(simpleAclAuthorizer.authorize(session, Read, Resource(Topic, "_", LITERAL)))
assertFalse(simpleAclAuthorizer.authorize(session, Read, Resource(Topic, "foo_", LITERAL)))
}
@Test
def testGetAclsPrincipal(): Unit = {
val aclOnSpecificPrincipal = new Acl(principal, Allow, WildCardHost, Write)
simpleAclAuthorizer.addAcls(Set[Acl](aclOnSpecificPrincipal), resource)
assertEquals(0, simpleAclAuthorizer.getAcls(Acl.WildCardPrincipal).size,
"acl on specific should not be returned for wildcard request")
assertEquals(1, simpleAclAuthorizer.getAcls(principal).size,
"acl on specific should be returned for specific request")
assertEquals(1, simpleAclAuthorizer.getAcls(new KafkaPrincipal(principal.getPrincipalType, principal.getName)).size,
"acl on specific should be returned for different principal instance")
simpleAclAuthorizer.removeAcls(resource)
val aclOnWildcardPrincipal = new Acl(Acl.WildCardPrincipal, Allow, WildCardHost, Write)
simpleAclAuthorizer.addAcls(Set[Acl](aclOnWildcardPrincipal), resource)
assertEquals(1, simpleAclAuthorizer.getAcls(Acl.WildCardPrincipal).size,
"acl on wildcard should be returned for wildcard request")
assertEquals(0, simpleAclAuthorizer.getAcls(principal).size,
"acl on wildcard should not be returned for specific request")
}
@Test
def testThrowsOnAddPrefixedAclIfInterBrokerProtocolVersionTooLow(): Unit = {
givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV0))
assertThrows(classOf[UnsupportedVersionException], () => simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), Resource(Topic, "z_other", PREFIXED)))
}
@Test
def testWritesExtendedAclChangeEventIfInterBrokerProtocolNotSet(): Unit = {
givenAuthorizerWithProtocolVersion(Option.empty)
val resource = Resource(Topic, "z_other", PREFIXED)
val expected = new String(ZkAclStore(PREFIXED).changeStore.createChangeNode(resource.toPattern).bytes, UTF_8)
simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource)
val actual = getAclChangeEventAsString(PREFIXED)
assertEquals(expected, actual)
}
@Test
def testWritesExtendedAclChangeEventWhenInterBrokerProtocolAtLeastKafkaV2(): Unit = {
givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV1))
val resource = Resource(Topic, "z_other", PREFIXED)
val expected = new String(ZkAclStore(PREFIXED).changeStore.createChangeNode(resource.toPattern).bytes, UTF_8)
simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource)
val actual = getAclChangeEventAsString(PREFIXED)
assertEquals(expected, actual)
}
@Test
def testWritesLiteralWritesLiteralAclChangeEventWhenInterBrokerProtocolLessThanKafkaV2eralAclChangesForOlderProtocolVersions(): Unit = {
givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV0))
val resource = Resource(Topic, "z_other", LITERAL)
val expected = new String(ZkAclStore(LITERAL).changeStore.createChangeNode(resource.toPattern).bytes, UTF_8)
simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource)
val actual = getAclChangeEventAsString(LITERAL)
assertEquals(expected, actual)
}
@Test
def testWritesLiteralAclChangeEventWhenInterBrokerProtocolIsKafkaV2(): Unit = {
givenAuthorizerWithProtocolVersion(Option(KAFKA_2_0_IV1))
val resource = Resource(Topic, "z_other", LITERAL)
val expected = new String(ZkAclStore(LITERAL).changeStore.createChangeNode(resource.toPattern).bytes, UTF_8)
simpleAclAuthorizer.addAcls(Set[Acl](denyReadAcl), resource)
val actual = getAclChangeEventAsString(LITERAL)
assertEquals(expected, actual)
}
private def givenAuthorizerWithProtocolVersion(protocolVersion: Option[ApiVersion]): Unit = {
simpleAclAuthorizer.close()
val props = TestUtils.createBrokerConfig(0, zkConnect)
props.put(SimpleAclAuthorizer.SuperUsersProp, superUsers)
protocolVersion.foreach(version => props.put(KafkaConfig.InterBrokerProtocolVersionProp, version.toString))
config = KafkaConfig.fromProps(props)
simpleAclAuthorizer.configure(config.originals)
}
private def getAclChangeEventAsString(patternType: PatternType) = {
val store = ZkAclStore(patternType)
val children = zooKeeperClient.handleRequest(GetChildrenRequest(store.changeStore.aclChangePath, registerWatch = true))
children.maybeThrow()
assertEquals(1, children.children.size, "Expecting 1 change event")
val data = zooKeeperClient.handleRequest(GetDataRequest(s"${store.changeStore.aclChangePath}/${children.children.head}"))
data.maybeThrow()
new String(data.data, UTF_8)
}
private def changeAclAndVerify(originalAcls: Set[Acl], addedAcls: Set[Acl], removedAcls: Set[Acl], resource: Resource = resource): Set[Acl] = {
var acls = originalAcls
if(addedAcls.nonEmpty) {
simpleAclAuthorizer.addAcls(addedAcls, resource)
acls ++= addedAcls
}
if(removedAcls.nonEmpty) {
simpleAclAuthorizer.removeAcls(removedAcls, resource)
acls --=removedAcls
}
TestUtils.waitAndVerifyAcls(acls, simpleAclAuthorizer, resource)
acls
}
}

View File

@ -1,106 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.security.authorizer
import java.util.UUID
import kafka.security.auth.SimpleAclAuthorizer
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
import kafka.zookeeper.ZooKeeperClient
import org.apache.kafka.common.acl.AclOperation._
import org.apache.kafka.common.acl._
import org.apache.kafka.common.resource.PatternType.LITERAL
import org.apache.kafka.common.resource.ResourcePattern
import org.apache.kafka.common.resource.ResourceType._
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.authorizer._
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import scala.annotation.nowarn
class AuthorizerWrapperTest extends ZooKeeperTestHarness with BaseAuthorizerTest {
@nowarn("cat=deprecation")
private val wrappedSimpleAuthorizer = new AuthorizerWrapper(new SimpleAclAuthorizer)
@nowarn("cat=deprecation")
private val wrappedSimpleAuthorizerAllowEveryone = new AuthorizerWrapper(new SimpleAclAuthorizer)
override def authorizer: Authorizer = wrappedSimpleAuthorizer
@BeforeEach
@nowarn("cat=deprecation")
override def setUp(): Unit = {
super.setUp()
val props = TestUtils.createBrokerConfig(0, zkConnect)
props.put(AclAuthorizer.SuperUsersProp, superUsers)
config = KafkaConfig.fromProps(props)
wrappedSimpleAuthorizer.configure(config.originals)
props.put(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, "true")
config = KafkaConfig.fromProps(props)
wrappedSimpleAuthorizerAllowEveryone.configure(config.originals)
resource = new ResourcePattern(TOPIC, "foo-" + UUID.randomUUID(), LITERAL)
zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests,
Time.SYSTEM, "kafka.test", "AuthorizerWrapperTest")
}
@AfterEach
override def tearDown(): Unit = {
val authorizers = Seq(wrappedSimpleAuthorizer, wrappedSimpleAuthorizerAllowEveryone)
authorizers.foreach(a => {
a.close()
})
zooKeeperClient.close()
super.tearDown()
}
@Test
def testAuthorizeByResourceTypeEnableAllowEveryOne(): Unit = {
testAuthorizeByResourceTypeEnableAllowEveryOne(wrappedSimpleAuthorizer)
}
private def testAuthorizeByResourceTypeEnableAllowEveryOne(authorizer: Authorizer): Unit = {
assertTrue(authorizeByResourceType(wrappedSimpleAuthorizerAllowEveryone, requestContext, READ, resource.resourceType()),
"If allow.everyone.if.no.acl.found = true, " +
"caller should have read access to at least one topic")
val allUser = AclEntry.WildcardPrincipalString
val allHost = AclEntry.WildcardHost
val denyAll = new AccessControlEntry(allUser, allHost, ALL, AclPermissionType.DENY)
val wildcardResource = new ResourcePattern(resource.resourceType(), AclEntry.WildcardResource, LITERAL)
addAcls(wrappedSimpleAuthorizerAllowEveryone, Set(denyAll), resource)
assertTrue(authorizeByResourceType(wrappedSimpleAuthorizerAllowEveryone, requestContext, READ, resource.resourceType()),
"Should still allow since the deny only apply on the specific resource")
addAcls(wrappedSimpleAuthorizerAllowEveryone, Set(denyAll), wildcardResource)
assertFalse(authorizeByResourceType(wrappedSimpleAuthorizerAllowEveryone, requestContext, READ, resource.resourceType()),
"When an ACL binding which can deny all users and hosts exists, " +
"even if allow.everyone.if.no.acl.found = true, caller shouldn't have read access any topic")
}
@Test
def testAuthorizeByResourceTypeDisableAllowEveryoneOverride(): Unit = {
assertFalse (authorizeByResourceType(wrappedSimpleAuthorizer, requestContext, READ, resource.resourceType()),
"If allow.everyone.if.no.acl.found = false, " +
"caller shouldn't have read access to any topic")
}
}

View File

@ -34,7 +34,6 @@ import kafka.cluster.{Broker, EndPoint, IsrChangeListener}
import kafka.controller.LeaderIsrAndControllerEpoch import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.log._ import kafka.log._
import kafka.metrics.KafkaYammerMetrics import kafka.metrics.KafkaYammerMetrics
import kafka.security.auth.{Acl, Resource, Authorizer => LegacyAuthorizer}
import kafka.server._ import kafka.server._
import kafka.server.checkpoints.OffsetCheckpointFile import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.server.metadata.{CachedConfigRepository, ConfigRepository, MetadataBroker} import kafka.server.metadata.{CachedConfigRepository, ConfigRepository, MetadataBroker}
@ -1325,15 +1324,6 @@ object TestUtils extends Logging {
s"but got:${authorizer.acls(filter).asScala.map(_.entry).mkString(newLine + "\t", newLine + "\t", newLine)}") s"but got:${authorizer.acls(filter).asScala.map(_.entry).mkString(newLine + "\t", newLine + "\t", newLine)}")
} }
@deprecated("Use org.apache.kafka.server.authorizer.Authorizer", "Since 2.5")
def waitAndVerifyAcls(expected: Set[Acl], authorizer: LegacyAuthorizer, resource: Resource): Unit = {
val newLine = scala.util.Properties.lineSeparator
waitUntilTrue(() => authorizer.getAcls(resource) == expected,
s"expected acls:${expected.mkString(newLine + "\t", newLine + "\t", newLine)}" +
s"but got:${authorizer.getAcls(resource).mkString(newLine + "\t", newLine + "\t", newLine)}")
}
/** /**
* Verifies that this ACL is the secure one. * Verifies that this ACL is the secure one.
*/ */

View File

@ -27,6 +27,8 @@
or updating the application not to use internal classes.</li> or updating the application not to use internal classes.</li>
<li>The Streams API removed all deprecated APIs that were deprecated in version 2.5.0 or earlier. <li>The Streams API removed all deprecated APIs that were deprecated in version 2.5.0 or earlier.
For a complete list of removed APIs compare the detailed Kafka Streams upgrade notes.</li> For a complete list of removed APIs compare the detailed Kafka Streams upgrade notes.</li>
<li>The deprecated Scala <code>Authorizer</code>, <code>SimpleAclAuthorizer</code> and related classes have been removed. Please use the Java <code>Authorizer</code>
and <code>AclAuthorizer</code> instead.</li>
<li>The deprecated <code>Metric#value()</code> method was removed (<a href="https://issues.apache.org/jira/browse/KAFKA-12573">KAFKA-12573</a>).</li> <li>The deprecated <code>Metric#value()</code> method was removed (<a href="https://issues.apache.org/jira/browse/KAFKA-12573">KAFKA-12573</a>).</li>
<li>Deprecated security classes were removed: <code>PrincipalBuilder</code>, <code>DefaultPrincipalBuilder</code> and <code>ResourceFilter</code>. <li>Deprecated security classes were removed: <code>PrincipalBuilder</code>, <code>DefaultPrincipalBuilder</code> and <code>ResourceFilter</code>.
Furthermore, deprecated constants and constructors were removed from <code>SslConfigs</code>, <code>SaslConfigs</code>, Furthermore, deprecated constants and constructors were removed from <code>SslConfigs</code>, <code>SaslConfigs</code>,

View File

@ -155,8 +155,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "kafka.properties") CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "kafka.properties")
# Kafka Authorizer # Kafka Authorizer
ACL_AUTHORIZER = "kafka.security.authorizer.AclAuthorizer" ACL_AUTHORIZER = "kafka.security.authorizer.AclAuthorizer"
# Old Kafka Authorizer. This is deprecated but still supported.
SIMPLE_AUTHORIZER = "kafka.security.auth.SimpleAclAuthorizer"
HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "kafka_heap_dump.bin") HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "kafka_heap_dump.bin")
INTERBROKER_LISTENER_NAME = 'INTERNAL' INTERBROKER_LISTENER_NAME = 'INTERNAL'
JAAS_CONF_PROPERTY = "java.security.auth.login.config=/mnt/security/jaas.conf" JAAS_CONF_PROPERTY = "java.security.auth.login.config=/mnt/security/jaas.conf"

View File

@ -70,8 +70,8 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
self.kafka.close_port(SecurityConfig.PLAINTEXT) self.kafka.close_port(SecurityConfig.PLAINTEXT)
self.set_authorizer_and_bounce(client_protocol, broker_protocol) self.set_authorizer_and_bounce(client_protocol, broker_protocol)
def set_authorizer_and_bounce(self, client_protocol, broker_protocol, authorizer_class_name = KafkaService.ACL_AUTHORIZER): def set_authorizer_and_bounce(self, client_protocol, broker_protocol):
self.kafka.authorizer_class_name = authorizer_class_name self.kafka.authorizer_class_name = KafkaService.ACL_AUTHORIZER
# Force use of direct ZooKeeper access due to SecurityDisabledException: No Authorizer is configured on the broker. # Force use of direct ZooKeeper access due to SecurityDisabledException: No Authorizer is configured on the broker.
self.acls.set_acls(client_protocol, self.kafka, self.topic, self.group, force_use_zk_connection=True) self.acls.set_acls(client_protocol, self.kafka, self.topic, self.group, force_use_zk_connection=True)
self.acls.set_acls(broker_protocol, self.kafka, self.topic, self.group, force_use_zk_connection=True) self.acls.set_acls(broker_protocol, self.kafka, self.topic, self.group, force_use_zk_connection=True)
@ -93,8 +93,8 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
self.kafka.interbroker_sasl_mechanism = new_sasl_mechanism self.kafka.interbroker_sasl_mechanism = new_sasl_mechanism
self.bounce() self.bounce()
# Bounce again with ACLs for new mechanism. Use old SimpleAclAuthorizer here to ensure that is also tested. # Bounce again with ACLs for new mechanism.
self.set_authorizer_and_bounce(security_protocol, security_protocol, KafkaService.SIMPLE_AUTHORIZER) self.set_authorizer_and_bounce(security_protocol, security_protocol)
def add_separate_broker_listener(self, broker_security_protocol, broker_sasl_mechanism): def add_separate_broker_listener(self, broker_security_protocol, broker_sasl_mechanism):
# Enable the new internal listener on all brokers first # Enable the new internal listener on all brokers first