mirror of https://github.com/apache/kafka.git
KAFKA-2212: Authorizer CLI implementation.
Author: Parth Brahmbhatt <brahmbhatt.parth@gmail.com> Author: Ismael Juma <ismael@juma.me.uk> Reviewers: Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com> Closes #230 from Parth-Brahmbhatt/KAFKA-2212
This commit is contained in:
parent
d03b871dd0
commit
5764e54de1
|
|
@ -0,0 +1,17 @@
|
|||
#!/bin/bash
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
exec $(dirname $0)/kafka-run-class.sh kafka.admin.AclCommand $@
|
||||
|
|
@ -0,0 +1,17 @@
|
|||
@echo off
|
||||
rem Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
rem contributor license agreements. See the NOTICE file distributed with
|
||||
rem this work for additional information regarding copyright ownership.
|
||||
rem The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
rem (the "License"); you may not use this file except in compliance with
|
||||
rem the License. You may obtain a copy of the License at
|
||||
rem
|
||||
rem http://www.apache.org/licenses/LICENSE-2.0
|
||||
rem
|
||||
rem Unless required by applicable law or agreed to in writing, software
|
||||
rem distributed under the License is distributed on an "AS IS" BASIS,
|
||||
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
rem See the License for the specific language governing permissions and
|
||||
rem limitations under the License.
|
||||
|
||||
%~dp0kafka-run-class.bat kafka.admin.AclCommand %*
|
||||
|
|
@ -0,0 +1,350 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package kafka.admin
|
||||
|
||||
import joptsimple._
|
||||
import kafka.security.auth._
|
||||
import kafka.utils._
|
||||
import org.apache.kafka.common.security.auth.KafkaPrincipal
|
||||
import org.apache.kafka.common.utils.Utils
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
object AclCommand {
|
||||
|
||||
val Delimiter = ','
|
||||
val Newline = scala.util.Properties.lineSeparator
|
||||
val ResourceTypeToValidOperations = Map[ResourceType, Set[Operation]] (
|
||||
Topic -> Set(Read, Write, Describe),
|
||||
ConsumerGroup -> Set(Read),
|
||||
Cluster -> Set(Create, ClusterAction)
|
||||
)
|
||||
|
||||
def main(args: Array[String]) {
|
||||
|
||||
val opts = new AclCommandOptions(args)
|
||||
|
||||
if (opts.options.has(opts.helpOpt))
|
||||
CommandLineUtils.printUsageAndDie(opts.parser, "Usage:")
|
||||
|
||||
opts.checkArgs()
|
||||
|
||||
var authorizerProperties = Map.empty[String, Any]
|
||||
if (opts.options.has(opts.authorizerPropertiesOpt)) {
|
||||
val props = opts.options.valuesOf(opts.authorizerPropertiesOpt).asScala.map(_.split("="))
|
||||
props.foreach(pair => authorizerProperties += (pair(0).trim -> pair(1).trim))
|
||||
}
|
||||
|
||||
val authorizerClass = opts.options.valueOf(opts.authorizerOpt)
|
||||
val authZ: Authorizer = CoreUtils.createObject(authorizerClass)
|
||||
authZ.configure(authorizerProperties.asJava)
|
||||
|
||||
try {
|
||||
if (opts.options.has(opts.addOpt))
|
||||
addAcl(authZ, opts)
|
||||
else if (opts.options.has(opts.removeOpt))
|
||||
removeAcl(authZ, opts)
|
||||
else if (opts.options.has(opts.listOpt))
|
||||
listAcl(authZ, opts)
|
||||
} catch {
|
||||
case e: Throwable =>
|
||||
println(s"Error while executing topic Acl command ${e.getMessage}")
|
||||
println(Utils.stackTrace(e))
|
||||
System.exit(-1)
|
||||
}
|
||||
}
|
||||
|
||||
private def addAcl(authZ: Authorizer, opts: AclCommandOptions) {
|
||||
val resourceToAcl = getResourceToAcls(opts)
|
||||
|
||||
if (resourceToAcl.values.exists(_.isEmpty))
|
||||
CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one of: --allow-principals, --deny-principals when trying to add acls.")
|
||||
|
||||
for ((resource, acls) <- resourceToAcl) {
|
||||
val acls = resourceToAcl(resource)
|
||||
println(s"Adding following acls for resource: $resource $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
|
||||
authZ.addAcls(acls, resource)
|
||||
}
|
||||
|
||||
listAcl(authZ, opts)
|
||||
}
|
||||
|
||||
private def removeAcl(authZ: Authorizer, opts: AclCommandOptions) {
|
||||
val resourceToAcl = getResourceToAcls(opts)
|
||||
|
||||
for ((resource, acls) <- resourceToAcl) {
|
||||
if (acls.isEmpty) {
|
||||
if (confirmAction(s"Are you sure you want to delete all acls for resource: $resource y/n?"))
|
||||
authZ.removeAcls(resource)
|
||||
} else {
|
||||
if (confirmAction(s"Are you sure you want to remove acls: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource $resource y/n?"))
|
||||
authZ.removeAcls(acls, resource)
|
||||
}
|
||||
}
|
||||
|
||||
listAcl(authZ, opts)
|
||||
}
|
||||
|
||||
private def listAcl(authZ: Authorizer, opts: AclCommandOptions) {
|
||||
val resources = getResource(opts, dieIfNoResourceFound = false)
|
||||
|
||||
val resourceToAcls = if(resources.isEmpty)
|
||||
authZ.getAcls()
|
||||
else
|
||||
resources.map(resource => (resource -> authZ.getAcls(resource)))
|
||||
|
||||
for ((resource, acls) <- resourceToAcls)
|
||||
println(s"Following is list of acls for resource: $resource $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
|
||||
}
|
||||
|
||||
private def getResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = {
|
||||
var resourceToAcls = Map.empty[Resource, Set[Acl]]
|
||||
|
||||
//if none of the --producer or --consumer options are specified , just construct acls from CLI options.
|
||||
if (!opts.options.has(opts.producerOpt) && !opts.options.has(opts.consumerOpt)) {
|
||||
resourceToAcls ++= getCliResourceToAcls(opts)
|
||||
}
|
||||
|
||||
//users are allowed to specify both --producer and --consumer options in a single command.
|
||||
if (opts.options.has(opts.producerOpt))
|
||||
resourceToAcls ++= getProducerResourceToAcls(opts)
|
||||
|
||||
if (opts.options.has(opts.consumerOpt))
|
||||
resourceToAcls ++= getConsumerResourceToAcls(opts).map { case (k, v) => k -> (v ++ resourceToAcls.getOrElse(k, Set.empty[Acl])) }
|
||||
|
||||
validateOperation(opts, resourceToAcls)
|
||||
|
||||
resourceToAcls
|
||||
}
|
||||
|
||||
private def getProducerResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = {
|
||||
val topics: Set[Resource] = getResource(opts).filter(_.resourceType == Topic)
|
||||
|
||||
val acls = getAcl(opts, Set(Write, Describe))
|
||||
|
||||
//Write, Describe permission on topics, Create permission on cluster
|
||||
topics.map(_ -> acls).toMap[Resource, Set[Acl]] +
|
||||
(Resource.ClusterResource -> getAcl(opts, Set(Create)))
|
||||
}
|
||||
|
||||
private def getConsumerResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = {
|
||||
val resources = getResource(opts)
|
||||
|
||||
val topics: Set[Resource] = getResource(opts).filter(_.resourceType == Topic)
|
||||
val consumerGroups: Set[Resource] = resources.filter(_.resourceType == ConsumerGroup)
|
||||
|
||||
//Read,Describe on topic, Read on consumerGroup + Create on cluster
|
||||
|
||||
val acls = getAcl(opts, Set(Read, Describe))
|
||||
|
||||
topics.map(_ -> acls).toMap[Resource, Set[Acl]] ++
|
||||
consumerGroups.map(_ -> getAcl(opts, Set(Read))).toMap[Resource, Set[Acl]]
|
||||
}
|
||||
|
||||
private def getCliResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = {
|
||||
val acls = getAcl(opts)
|
||||
val resources = getResource(opts)
|
||||
resources.map(_ -> acls).toMap
|
||||
}
|
||||
|
||||
private def getAcl(opts: AclCommandOptions, operations: Set[Operation]): Set[Acl] = {
|
||||
val allowedPrincipals = getPrincipals(opts, opts.allowPrincipalsOpt)
|
||||
|
||||
val deniedPrincipals = getPrincipals(opts, opts.denyPrincipalsOpt)
|
||||
|
||||
val allowedHosts = getHosts(opts, opts.allowHostsOpt, opts.allowPrincipalsOpt)
|
||||
|
||||
val deniedHosts = getHosts(opts, opts.denyHostssOpt, opts.denyPrincipalsOpt)
|
||||
|
||||
val acls = new collection.mutable.HashSet[Acl]
|
||||
if (allowedHosts.nonEmpty && allowedPrincipals.nonEmpty)
|
||||
acls ++= getAcls(allowedPrincipals, Allow, operations, allowedHosts)
|
||||
|
||||
if (deniedHosts.nonEmpty && deniedPrincipals.nonEmpty)
|
||||
acls ++= getAcls(deniedPrincipals, Deny, operations, deniedHosts)
|
||||
|
||||
acls.toSet
|
||||
}
|
||||
|
||||
private def getAcl(opts: AclCommandOptions): Set[Acl] = {
|
||||
val operations = opts.options.valuesOf(opts.operationsOpt).asScala.map(operation => Operation.fromString(operation.trim)).toSet
|
||||
getAcl(opts, operations)
|
||||
}
|
||||
|
||||
def getAcls(principals: Set[KafkaPrincipal], permissionType: PermissionType, operations: Set[Operation],
|
||||
hosts: Set[String]): Set[Acl] = {
|
||||
for {
|
||||
principal <- principals
|
||||
operation <- operations
|
||||
host <- hosts
|
||||
} yield new Acl(principal, permissionType, host, operation)
|
||||
}
|
||||
|
||||
private def getHosts(opts: AclCommandOptions, hostOptionSpec: ArgumentAcceptingOptionSpec[String],
|
||||
principalOptionSpec: ArgumentAcceptingOptionSpec[String]): Set[String] = {
|
||||
if (opts.options.has(hostOptionSpec))
|
||||
opts.options.valuesOf(hostOptionSpec).asScala.map(_.trim).toSet
|
||||
else if (opts.options.has(principalOptionSpec))
|
||||
Set[String](Acl.WildCardHost)
|
||||
else
|
||||
Set.empty[String]
|
||||
}
|
||||
|
||||
private def getPrincipals(opts: AclCommandOptions, principalOptionSpec: ArgumentAcceptingOptionSpec[String]): Set[KafkaPrincipal] = {
|
||||
if (opts.options.has(principalOptionSpec))
|
||||
opts.options.valuesOf(principalOptionSpec).asScala.map(s => KafkaPrincipal.fromString(s.trim)).toSet
|
||||
else
|
||||
Set.empty[KafkaPrincipal]
|
||||
}
|
||||
|
||||
private def getResource(opts: AclCommandOptions, dieIfNoResourceFound: Boolean = true): Set[Resource] = {
|
||||
var resources = Set.empty[Resource]
|
||||
if (opts.options.has(opts.topicOpt))
|
||||
opts.options.valuesOf(opts.topicOpt).asScala.foreach(topic => resources += new Resource(Topic, topic.trim))
|
||||
|
||||
if (opts.options.has(opts.clusterOpt))
|
||||
resources += Resource.ClusterResource
|
||||
|
||||
if (opts.options.has(opts.groupOpt))
|
||||
opts.options.valuesOf(opts.groupOpt).asScala.foreach(consumerGroup => resources += new Resource(ConsumerGroup, consumerGroup.trim))
|
||||
|
||||
if (resources.isEmpty && dieIfNoResourceFound)
|
||||
CommandLineUtils.printUsageAndDie(opts.parser, "You must provide at least one resource: --topic <topic> or --cluster or --consumer-group <group>")
|
||||
|
||||
resources
|
||||
}
|
||||
|
||||
private def confirmAction(msg: String): Boolean = {
|
||||
println(msg)
|
||||
Console.readLine().equalsIgnoreCase("y")
|
||||
}
|
||||
|
||||
private def validateOperation(opts: AclCommandOptions, resourceToAcls: Map[Resource, Set[Acl]]) = {
|
||||
for ((resource, acls) <- resourceToAcls) {
|
||||
val validOps = ResourceTypeToValidOperations(resource.resourceType)
|
||||
if ((acls.map(_.operation) -- validOps).nonEmpty)
|
||||
CommandLineUtils.printUsageAndDie(opts.parser, s"ResourceType ${resource.resourceType} only supports operations ${validOps.mkString(Delimiter.toString)}")
|
||||
}
|
||||
}
|
||||
|
||||
class AclCommandOptions(args: Array[String]) {
|
||||
val parser = new OptionParser
|
||||
val authorizerOpt = parser.accepts("authorizer", "Fully qualified class name of the authorizer, defaults to kafka.security.auth.SimpleAclAuthorizer.")
|
||||
.withRequiredArg
|
||||
.describedAs("authorizer")
|
||||
.ofType(classOf[String])
|
||||
.defaultsTo(classOf[SimpleAclAuthorizer].getName)
|
||||
|
||||
val authorizerPropertiesOpt = parser.accepts("authorizer-properties", "REQUIRED: properties required to configure an instance of Authorizer. " +
|
||||
"These are comma separated key=val pairs. For the default authorizer the example values are: " +
|
||||
"zookeeper.connect=localhost:2181")
|
||||
.withRequiredArg
|
||||
.describedAs("authorizer-properties")
|
||||
.ofType(classOf[String])
|
||||
.withValuesSeparatedBy(Delimiter)
|
||||
|
||||
val topicOpt = parser.accepts("topic", "Comma separated list of topic to which acls should be added or removed. " +
|
||||
"A value of * indicates acl should apply to all topics.")
|
||||
.withRequiredArg
|
||||
.describedAs("topic")
|
||||
.ofType(classOf[String])
|
||||
.withValuesSeparatedBy(Delimiter)
|
||||
|
||||
val clusterOpt = parser.accepts("cluster", "Add/Remove cluster acls.")
|
||||
val groupOpt = parser.accepts("consumer-group", "Comma separated list of consumer groups to which the acls should be added or removed. " +
|
||||
"A value of * indicates the acls should apply to all consumer-groups.")
|
||||
.withRequiredArg
|
||||
.describedAs("consumer-group")
|
||||
.ofType(classOf[String])
|
||||
.withValuesSeparatedBy(Delimiter)
|
||||
|
||||
val addOpt = parser.accepts("add", "Indicates you are trying to add acls.")
|
||||
val removeOpt = parser.accepts("remove", "Indicates you are trying to remove acls.")
|
||||
val listOpt = parser.accepts("list", "List acls for the specified resource, use --topic <topic> or --consumer-group <group> or --cluster to specify a resource.")
|
||||
|
||||
val operationsOpt = parser.accepts("operations", "Comma separated list of operations, default is All. Valid operation names are: " + Newline +
|
||||
Operation.values.map("\t" + _).mkString(Newline) + Newline)
|
||||
.withRequiredArg
|
||||
.ofType(classOf[String])
|
||||
.defaultsTo(All.name)
|
||||
.withValuesSeparatedBy(Delimiter)
|
||||
|
||||
val allowPrincipalsOpt = parser.accepts("allow-principals", "Comma separated list of principals where principal is in principalType:name format." +
|
||||
" User:* is the wild card indicating all users.")
|
||||
.withRequiredArg
|
||||
.describedAs("allow-principals")
|
||||
.ofType(classOf[String])
|
||||
.withValuesSeparatedBy(Delimiter)
|
||||
|
||||
val denyPrincipalsOpt = parser.accepts("deny-principals", "Comma separated list of principals where principal is in " +
|
||||
"principalType: name format. By default anyone not in --allow-principals list is denied access. " +
|
||||
"You only need to use this option as negation to already allowed set. " +
|
||||
"For example if you wanted to allow access to all users in the system but not test-user you can define an acl that " +
|
||||
"allows access to User:* and specify --deny-principals=User:test@EXAMPLE.COM. " +
|
||||
"AND PLEASE REMEMBER DENY RULES TAKES PRECEDENCE OVER ALLOW RULES.")
|
||||
.withRequiredArg
|
||||
.describedAs("deny-principals")
|
||||
.ofType(classOf[String])
|
||||
.withValuesSeparatedBy(Delimiter)
|
||||
|
||||
val allowHostsOpt = parser.accepts("allow-hosts", "Comma separated list of hosts from which principals listed in --allow-principals will have access. " +
|
||||
"If you have specified --allow-principals then the default for this option will be set to * which allows access from all hosts.")
|
||||
.withRequiredArg
|
||||
.describedAs("allow-hosts")
|
||||
.ofType(classOf[String])
|
||||
.withValuesSeparatedBy(Delimiter)
|
||||
|
||||
val denyHostssOpt = parser.accepts("deny-hosts", "Comma separated list of hosts from which principals listed in --deny-principals will be denied access. " +
|
||||
"If you have specified --deny-principals then the default for this option will be set to * which denies access from all hosts.")
|
||||
.withRequiredArg
|
||||
.describedAs("deny-hosts")
|
||||
.ofType(classOf[String])
|
||||
.withValuesSeparatedBy(Delimiter)
|
||||
|
||||
val producerOpt = parser.accepts("producer", "Convenience option to add/remove acls for producer role. " +
|
||||
"This will generate acls that allows WRITE,DESCRIBE on topic and CREATE on cluster. ")
|
||||
|
||||
val consumerOpt = parser.accepts("consumer", "Convenience option to add/remove acls for consumer role. " +
|
||||
"This will generate acls that allows READ,DESCRIBE on topic and READ on consumer-group.")
|
||||
|
||||
val helpOpt = parser.accepts("help", "Print usage information.")
|
||||
|
||||
val options = parser.parse(args: _*)
|
||||
|
||||
def checkArgs() {
|
||||
CommandLineUtils.checkRequiredArgs(parser, options, authorizerPropertiesOpt)
|
||||
|
||||
val actions = Seq(addOpt, removeOpt, listOpt).count(options.has)
|
||||
if (actions != 1)
|
||||
CommandLineUtils.printUsageAndDie(parser, "Command must include exactly one action: --list, --add, --remove. ")
|
||||
|
||||
CommandLineUtils.checkInvalidArgs(parser, options, listOpt, Set(producerOpt, consumerOpt, allowHostsOpt, allowPrincipalsOpt, denyHostssOpt, denyPrincipalsOpt))
|
||||
|
||||
//when --producer or --consumer is specified , user should not specify operations as they are inferred and we also disallow --deny-principals and --deny-hosts.
|
||||
CommandLineUtils.checkInvalidArgs(parser, options, producerOpt, Set(operationsOpt, denyPrincipalsOpt, denyHostssOpt))
|
||||
CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt, Set(operationsOpt, denyPrincipalsOpt, denyHostssOpt))
|
||||
|
||||
if (options.has(producerOpt) && !options.has(topicOpt))
|
||||
CommandLineUtils.printUsageAndDie(parser, "With --producer you must specify a --topic")
|
||||
|
||||
if (options.has(consumerOpt) && (!options.has(topicOpt) || !options.has(groupOpt) || (!options.has(producerOpt) && options.has(clusterOpt))))
|
||||
CommandLineUtils.printUsageAndDie(parser, "With --consumer you must specify a --topic and a --consumer-group and no --cluster option should be specified.")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,26 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package kafka.common
|
||||
|
||||
/*
|
||||
* We inherit from `Product` and `Serializable` because `case` objects and classes inherit from them and if we don't
|
||||
* do it here, the compiler will infer types that unexpectedly include `Product` and `Serializable`, see
|
||||
* http://underscore.io/blog/posts/2015/06/04/more-on-sealed.html for more information.
|
||||
*/
|
||||
trait BaseEnum extends Product with Serializable {
|
||||
def name: String
|
||||
}
|
||||
|
|
@ -77,5 +77,10 @@ trait Authorizer extends Configurable {
|
|||
* @return empty Map if no acls exist for this principal, otherwise a map of resource -> acls for the principal.
|
||||
*/
|
||||
def getAcls(principal: KafkaPrincipal): Map[Resource, Set[Acl]]
|
||||
|
||||
/**
|
||||
* gets the map of resource to acls for all resources.
|
||||
*/
|
||||
def getAcls(): Map[Resource, Set[Acl]]
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -16,13 +16,13 @@
|
|||
*/
|
||||
package kafka.security.auth
|
||||
|
||||
import kafka.common.KafkaException
|
||||
import kafka.common.{BaseEnum, KafkaException}
|
||||
|
||||
/**
|
||||
* Different operations a client may perform on kafka resources.
|
||||
*/
|
||||
|
||||
sealed trait Operation { def name: String}
|
||||
sealed trait Operation extends BaseEnum
|
||||
case object Read extends Operation { val name = "Read" }
|
||||
case object Write extends Operation { val name = "Write" }
|
||||
case object Create extends Operation { val name = "Create" }
|
||||
|
|
|
|||
|
|
@ -16,16 +16,14 @@
|
|||
*/
|
||||
package kafka.security.auth
|
||||
|
||||
import kafka.common.KafkaException
|
||||
import kafka.common.{BaseEnum, KafkaException}
|
||||
|
||||
/**
|
||||
* PermissionType.
|
||||
*/
|
||||
|
||||
|
||||
sealed trait PermissionType {
|
||||
def name: String
|
||||
}
|
||||
sealed trait PermissionType extends BaseEnum
|
||||
|
||||
case object Allow extends PermissionType {
|
||||
val name = "Allow"
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ object Resource {
|
|||
val Separator = ":"
|
||||
val ClusterResourceName = "kafka-cluster"
|
||||
val ClusterResource = new Resource(Cluster, Resource.ClusterResourceName)
|
||||
val WildCardResource = "*"
|
||||
|
||||
def fromString(str: String): Resource = {
|
||||
str.split(Separator, 2) match {
|
||||
|
|
|
|||
|
|
@ -16,16 +16,14 @@
|
|||
*/
|
||||
package kafka.security.auth
|
||||
|
||||
import kafka.common.KafkaException
|
||||
import kafka.common.{BaseEnum, KafkaException}
|
||||
|
||||
/**
|
||||
* ResourceTypes.
|
||||
*/
|
||||
|
||||
|
||||
sealed trait ResourceType {
|
||||
def name: String
|
||||
}
|
||||
sealed trait ResourceType extends BaseEnum
|
||||
|
||||
case object Cluster extends ResourceType {
|
||||
val name = "Cluster"
|
||||
|
|
@ -48,4 +46,4 @@ object ResourceType {
|
|||
}
|
||||
|
||||
def values: Seq[ResourceType] = List(Cluster, Topic, ConsumerGroup)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -107,7 +107,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
|
|||
override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = {
|
||||
val principal: KafkaPrincipal = session.principal
|
||||
val host = session.host
|
||||
val acls = getAcls(resource)
|
||||
val acls = getAcls(resource) ++ getAcls(new Resource(resource.resourceType, Resource.WildCardResource))
|
||||
|
||||
//check if there is any Deny acl match that would disallow this operation.
|
||||
val denyMatch = aclMatch(session, operation, resource, principal, host, Deny, acls)
|
||||
|
|
@ -221,6 +221,10 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
|
|||
}.toMap
|
||||
}
|
||||
|
||||
override def getAcls(): Map[Resource, Set[Acl]] = {
|
||||
aclCache.toMap
|
||||
}
|
||||
|
||||
private def loadCache() {
|
||||
var acls = Set.empty[Acl]
|
||||
val resourceTypes = ZkUtils.getChildren(zkClient, SimpleAclAuthorizer.AclZkPath)
|
||||
|
|
|
|||
|
|
@ -50,10 +50,10 @@ object CommandLineUtils extends Logging {
|
|||
/**
|
||||
* Print usage and exit
|
||||
*/
|
||||
def printUsageAndDie(parser: OptionParser, message: String) {
|
||||
def printUsageAndDie(parser: OptionParser, message: String): Nothing = {
|
||||
System.err.println(message)
|
||||
parser.printHelpOn(System.err)
|
||||
System.exit(1)
|
||||
sys.exit(1)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -73,4 +73,4 @@ object CommandLineUtils extends Logging {
|
|||
}
|
||||
props
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,134 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package unit.kafka.admin
|
||||
|
||||
import java.io.StringReader
|
||||
import java.util.Properties
|
||||
|
||||
import kafka.admin.AclCommand
|
||||
import kafka.security.auth._
|
||||
import kafka.server.KafkaConfig
|
||||
import kafka.utils.{Logging, TestUtils}
|
||||
import kafka.zk.ZooKeeperTestHarness
|
||||
import org.apache.kafka.common.security.auth.KafkaPrincipal
|
||||
import org.junit.{Assert, Test}
|
||||
|
||||
class AclCommandTest extends ZooKeeperTestHarness with Logging {
|
||||
|
||||
private val Users = Set(KafkaPrincipal.fromString("User:test1"), KafkaPrincipal.fromString("User:test2"))
|
||||
private val UsersString = Users.mkString(AclCommand.Delimiter.toString)
|
||||
private val Hosts = Set("host1", "host2")
|
||||
private val HostsString = Hosts.mkString(AclCommand.Delimiter.toString)
|
||||
|
||||
private val TopicResources = Set(new Resource(Topic, "test-1"), new Resource(Topic, "test-2"))
|
||||
private val ConsumerGroupResources = Set(new Resource(ConsumerGroup, "testGroup-1"), new Resource(ConsumerGroup, "testGroup-2"))
|
||||
|
||||
private val ResourceToCommand = Map[Set[Resource], Array[String]](
|
||||
TopicResources -> Array("--topic", "test-1,test-2"),
|
||||
Set(Resource.ClusterResource) -> Array("--cluster"),
|
||||
ConsumerGroupResources -> Array("--consumer-group", "testGroup-1,testGroup-2")
|
||||
)
|
||||
|
||||
private val ResourceToOperations = Map[Set[Resource], (Set[Operation], Array[String])](
|
||||
TopicResources -> (Set(Read, Write, Describe), Array("--operations", "Read,Write,Describe")),
|
||||
Set(Resource.ClusterResource) -> (Set(Create, ClusterAction), Array("--operations", "Create,ClusterAction")),
|
||||
ConsumerGroupResources -> (Set(Read).toSet[Operation], Array("--operations", "Read"))
|
||||
)
|
||||
|
||||
private val ProducerResourceToAcls = Map[Set[Resource], Set[Acl]](
|
||||
TopicResources -> AclCommand.getAcls(Users, Allow, Set(Write, Describe), Hosts),
|
||||
Set(Resource.ClusterResource) -> AclCommand.getAcls(Users, Allow, Set(Create), Hosts)
|
||||
)
|
||||
|
||||
private val ConsumerResourceToAcls = Map[Set[Resource], Set[Acl]](
|
||||
TopicResources -> AclCommand.getAcls(Users, Allow, Set(Read, Describe), Hosts),
|
||||
ConsumerGroupResources -> AclCommand.getAcls(Users, Allow, Set(Read), Hosts)
|
||||
)
|
||||
|
||||
private val CmdToResourcesToAcl = Map[Array[String], Map[Set[Resource], Set[Acl]]](
|
||||
Array[String]("--producer") -> ProducerResourceToAcls,
|
||||
Array[String]("--consumer") -> ConsumerResourceToAcls,
|
||||
Array[String]("--producer", "--consumer") -> ConsumerResourceToAcls.map { case (k, v) => k -> (v ++
|
||||
ProducerResourceToAcls.getOrElse(k, Set.empty[Acl])) }
|
||||
)
|
||||
|
||||
@Test
|
||||
def testAclCli() {
|
||||
val brokerProps = TestUtils.createBrokerConfig(0, zkConnect)
|
||||
brokerProps.put(KafkaConfig.AuthorizerClassNameProp, "kafka.security.auth.SimpleAclAuthorizer")
|
||||
val args = Array("--authorizer-properties", "zookeeper.connect=" + zkConnect)
|
||||
|
||||
for ((resources, resourceCmd) <- ResourceToCommand) {
|
||||
for (permissionType <- PermissionType.values) {
|
||||
val operationToCmd = ResourceToOperations(resources)
|
||||
val (acls, cmd) = getAclToCommand(permissionType, operationToCmd._1)
|
||||
AclCommand.main(args ++ cmd ++ resourceCmd ++ operationToCmd._2 :+ "--add")
|
||||
for (resource <- resources) {
|
||||
Assert.assertEquals(acls, getAuthorizer(brokerProps).getAcls(resource))
|
||||
}
|
||||
|
||||
testRemove(resources, resourceCmd, args, brokerProps)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
def testProducerConsumerCli() {
|
||||
val brokerProps = TestUtils.createBrokerConfig(0, zkConnect)
|
||||
brokerProps.put(KafkaConfig.AuthorizerClassNameProp, "kafka.security.auth.SimpleAclAuthorizer")
|
||||
val args = Array("--authorizer-properties", "zookeeper.connect=" + zkConnect)
|
||||
|
||||
for ((cmd, resourcesToAcls) <- CmdToResourcesToAcl) {
|
||||
val resourceCommand: Array[String] = resourcesToAcls.keys.map(ResourceToCommand).foldLeft(Array[String]())(_ ++ _)
|
||||
AclCommand.main(args ++ getCmd(Allow) ++ resourceCommand ++ cmd :+ "--add")
|
||||
for ((resources, acls) <- resourcesToAcls) {
|
||||
for (resource <- resources) {
|
||||
Assert.assertEquals(acls, getAuthorizer(brokerProps).getAcls(resource))
|
||||
}
|
||||
}
|
||||
testRemove(resourcesToAcls.keys.flatten.toSet, resourceCommand, args, brokerProps)
|
||||
}
|
||||
}
|
||||
|
||||
private def testRemove(resources: Set[Resource], resourceCmd: Array[String], args: Array[String], brokerProps: Properties) {
|
||||
for (resource <- resources) {
|
||||
Console.withIn(new StringReader(s"y${AclCommand.Newline}" * resources.size)) {
|
||||
AclCommand.main(args ++ resourceCmd :+ "--remove")
|
||||
Assert.assertEquals(Set.empty[Acl], getAuthorizer(brokerProps).getAcls(resource))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private def getAclToCommand(permissionType: PermissionType, operations: Set[Operation]): (Set[Acl], Array[String]) = {
|
||||
(AclCommand.getAcls(Users, permissionType, operations, Hosts), getCmd(permissionType))
|
||||
}
|
||||
|
||||
private def getCmd(permissionType: PermissionType): Array[String] = {
|
||||
if (permissionType == Allow)
|
||||
Array("--allow-principals", UsersString, "--allow-hosts", HostsString)
|
||||
else
|
||||
Array("--deny-principals", UsersString, "--deny-hosts", HostsString)
|
||||
}
|
||||
|
||||
def getAuthorizer(props: Properties): Authorizer = {
|
||||
val kafkaConfig = KafkaConfig.fromProps(props)
|
||||
val authZ = new SimpleAclAuthorizer
|
||||
authZ.configure(kafkaConfig.originals)
|
||||
|
||||
authZ
|
||||
}
|
||||
}
|
||||
|
|
@ -140,6 +140,31 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
|
|||
assertTrue("superuser always has access, no matter what acls.", simpleAclAuthorizer.authorize(session2, Read, resource))
|
||||
}
|
||||
|
||||
@Test
|
||||
def testWildCardAcls(): Unit = {
|
||||
assertFalse("when acls = [], authorizer should fail close.", simpleAclAuthorizer.authorize(session, Read, resource))
|
||||
|
||||
val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
|
||||
val host1 = "host1"
|
||||
val readAcl = new Acl(user1, Allow, host1, Read)
|
||||
val wildCardResource = new Resource(resource.resourceType, Resource.WildCardResource)
|
||||
|
||||
val acls = changeAclAndVerify(Set.empty[Acl], Set[Acl](readAcl), Set.empty[Acl], wildCardResource)
|
||||
|
||||
val host1Session = new Session(user1, host1)
|
||||
assertTrue("User1 should have Read access from host1", simpleAclAuthorizer.authorize(host1Session, Read, resource))
|
||||
|
||||
//allow Write to specific topic.
|
||||
val writeAcl = new Acl(user1, Allow, host1, Write)
|
||||
changeAclAndVerify(Set.empty[Acl], Set[Acl](writeAcl), Set.empty[Acl])
|
||||
|
||||
//deny Write to wild card topic.
|
||||
val denyWriteOnWildCardResourceAcl = new Acl(user1, Deny, host1, Write)
|
||||
changeAclAndVerify(acls, Set[Acl](denyWriteOnWildCardResourceAcl), Set.empty[Acl], wildCardResource)
|
||||
|
||||
assertFalse("User1 should not have Write access from host1", simpleAclAuthorizer.authorize(host1Session, Write, resource))
|
||||
}
|
||||
|
||||
@Test
|
||||
def testNoAclFound() {
|
||||
assertFalse("when acls = [], authorizer should fail close.", simpleAclAuthorizer.authorize(session, Read, resource))
|
||||
|
|
@ -175,11 +200,21 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
|
|||
acls = changeAclAndVerify(acls, Set[Acl](acl5), Set.empty[Acl])
|
||||
|
||||
//test get by principal name.
|
||||
TestUtils.waitUntilTrue(() => Map(resource -> Set(acl1, acl2)) == simpleAclAuthorizer.getAcls(user1), "changes not propogated in timeout period")
|
||||
TestUtils.waitUntilTrue(() => Map(resource -> Set(acl3, acl4, acl5)) == simpleAclAuthorizer.getAcls(user2), "changes not propogated in timeout period")
|
||||
TestUtils.waitUntilTrue(() => Map(resource -> Set(acl1, acl2)) == simpleAclAuthorizer.getAcls(user1), "changes not propagated in timeout period")
|
||||
TestUtils.waitUntilTrue(() => Map(resource -> Set(acl3, acl4, acl5)) == simpleAclAuthorizer.getAcls(user2), "changes not propagated in timeout period")
|
||||
|
||||
val resourceToAcls = Map[Resource, Set[Acl]](
|
||||
new Resource(Topic, Resource.WildCardResource) -> Set[Acl](new Acl(user2, Allow, WildCardHost, Read)),
|
||||
new Resource(Cluster, Resource.WildCardResource) -> Set[Acl](new Acl(user2, Allow, host1, Read)),
|
||||
new Resource(ConsumerGroup, Resource.WildCardResource) -> acls,
|
||||
new Resource(ConsumerGroup, "test-ConsumerGroup") -> acls
|
||||
)
|
||||
|
||||
resourceToAcls foreach { case (key, value) => changeAclAndVerify(Set.empty[Acl], value, Set.empty[Acl], key) }
|
||||
assertEquals(resourceToAcls + (resource -> acls), simpleAclAuthorizer.getAcls())
|
||||
|
||||
//test remove acl from existing acls.
|
||||
changeAclAndVerify(acls, Set.empty[Acl], Set(acl1, acl5))
|
||||
acls = changeAclAndVerify(acls, Set.empty[Acl], Set(acl1, acl5))
|
||||
|
||||
//test remove all acls for resource
|
||||
simpleAclAuthorizer.removeAcls(resource)
|
||||
|
|
@ -213,7 +248,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
|
|||
assertEquals(acls1, authorizer.getAcls(resource1))
|
||||
}
|
||||
|
||||
private def changeAclAndVerify(originalAcls: Set[Acl], addedAcls: Set[Acl], removedAcls: Set[Acl]): Set[Acl] = {
|
||||
private def changeAclAndVerify(originalAcls: Set[Acl], addedAcls: Set[Acl], removedAcls: Set[Acl], resource: Resource = resource): Set[Acl] = {
|
||||
var acls = originalAcls
|
||||
|
||||
if(addedAcls.nonEmpty) {
|
||||
|
|
|
|||
Loading…
Reference in New Issue