mirror of https://github.com/apache/kafka.git
KAFKA-7117: Support AdminClient API in AclCommand (KIP-332) (#5463)
Reviewers: Colin Patrick McCabe <colin@cmccabe.xyz>, Jun Rao <junrao@gmail.com>
This commit is contained in:
parent
297fb396a0
commit
f348f10ef8
|
@ -17,17 +17,22 @@
|
||||||
|
|
||||||
package kafka.admin
|
package kafka.admin
|
||||||
|
|
||||||
|
import java.util.Properties
|
||||||
|
|
||||||
import joptsimple._
|
import joptsimple._
|
||||||
import joptsimple.util.EnumConverter
|
import joptsimple.util.EnumConverter
|
||||||
import kafka.security.auth._
|
import kafka.security.auth._
|
||||||
import kafka.server.KafkaConfig
|
import kafka.server.KafkaConfig
|
||||||
import kafka.utils._
|
import kafka.utils._
|
||||||
|
import org.apache.kafka.clients.admin.{AdminClientConfig, AdminClient => JAdminClient}
|
||||||
|
import org.apache.kafka.common.acl._
|
||||||
import org.apache.kafka.common.security.JaasUtils
|
import org.apache.kafka.common.security.JaasUtils
|
||||||
import org.apache.kafka.common.security.auth.KafkaPrincipal
|
import org.apache.kafka.common.security.auth.KafkaPrincipal
|
||||||
import org.apache.kafka.common.utils.Utils
|
import org.apache.kafka.common.utils.{SecurityUtils, Utils}
|
||||||
import org.apache.kafka.common.resource.{PatternType, ResourcePatternFilter, Resource => JResource, ResourceType => JResourceType}
|
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, Resource => JResource, ResourceType => JResourceType}
|
||||||
|
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
|
import scala.collection.mutable
|
||||||
|
|
||||||
object AclCommand extends Logging {
|
object AclCommand extends Logging {
|
||||||
|
|
||||||
|
@ -52,13 +57,21 @@ object AclCommand extends Logging {
|
||||||
|
|
||||||
opts.checkArgs()
|
opts.checkArgs()
|
||||||
|
|
||||||
|
val aclCommandService = {
|
||||||
|
if (opts.options.has(opts.bootstrapServerOpt)) {
|
||||||
|
new AdminClientService(opts)
|
||||||
|
} else {
|
||||||
|
new AuthorizerService(opts)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (opts.options.has(opts.addOpt))
|
if (opts.options.has(opts.addOpt))
|
||||||
addAcl(opts)
|
aclCommandService.addAcls()
|
||||||
else if (opts.options.has(opts.removeOpt))
|
else if (opts.options.has(opts.removeOpt))
|
||||||
removeAcl(opts)
|
aclCommandService.removeAcls()
|
||||||
else if (opts.options.has(opts.listOpt))
|
else if (opts.options.has(opts.listOpt))
|
||||||
listAcl(opts)
|
aclCommandService.listAcls()
|
||||||
} catch {
|
} catch {
|
||||||
case e: Throwable =>
|
case e: Throwable =>
|
||||||
println(s"Error while executing ACL command: ${e.getMessage}")
|
println(s"Error while executing ACL command: ${e.getMessage}")
|
||||||
|
@ -67,92 +80,203 @@ object AclCommand extends Logging {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def withAuthorizer(opts: AclCommandOptions)(f: Authorizer => Unit) {
|
sealed trait AclCommandService {
|
||||||
val defaultProps = Map(KafkaConfig.ZkEnableSecureAclsProp -> JaasUtils.isZkSecurityEnabled)
|
def addAcls(): Unit
|
||||||
val authorizerProperties =
|
def removeAcls(): Unit
|
||||||
if (opts.options.has(opts.authorizerPropertiesOpt)) {
|
def listAcls(): Unit
|
||||||
val authorizerProperties = opts.options.valuesOf(opts.authorizerPropertiesOpt).asScala
|
|
||||||
defaultProps ++ CommandLineUtils.parseKeyValueArgs(authorizerProperties, acceptMissingValue = false).asScala
|
|
||||||
} else {
|
|
||||||
defaultProps
|
|
||||||
}
|
|
||||||
|
|
||||||
val authorizerClass = opts.options.valueOf(opts.authorizerOpt)
|
|
||||||
val authZ = CoreUtils.createObject[Authorizer](authorizerClass)
|
|
||||||
try {
|
|
||||||
authZ.configure(authorizerProperties.asJava)
|
|
||||||
f(authZ)
|
|
||||||
}
|
|
||||||
finally CoreUtils.swallow(authZ.close(), this)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private def addAcl(opts: AclCommandOptions) {
|
class AdminClientService(val opts: AclCommandOptions) extends AclCommandService with Logging {
|
||||||
|
|
||||||
|
private def withAdminClient(opts: AclCommandOptions)(f: JAdminClient => Unit) {
|
||||||
|
val props = if (opts.options.has(opts.commandConfigOpt))
|
||||||
|
Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
|
||||||
|
else
|
||||||
|
new Properties()
|
||||||
|
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
|
||||||
|
val adminClient = JAdminClient.create(props)
|
||||||
|
|
||||||
|
try {
|
||||||
|
f(adminClient)
|
||||||
|
} finally {
|
||||||
|
adminClient.close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def addAcls(): Unit = {
|
||||||
|
val resourceToAcl = getResourceToAcls(opts)
|
||||||
|
withAdminClient(opts) { adminClient =>
|
||||||
|
for ((resource, acls) <- resourceToAcl) {
|
||||||
|
val resourcePattern = resource.toPattern
|
||||||
|
println(s"Adding ACLs for resource `$resourcePattern`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
|
||||||
|
val aclBindings = acls.map(acl => new AclBinding(resourcePattern, getAccessControlEntry(acl))).asJavaCollection
|
||||||
|
adminClient.createAcls(aclBindings).all().get()
|
||||||
|
}
|
||||||
|
|
||||||
|
listAcls()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def removeAcls(): Unit = {
|
||||||
|
withAdminClient(opts) { adminClient =>
|
||||||
|
val filterToAcl = getResourceFilterToAcls(opts)
|
||||||
|
|
||||||
|
for ((filter, acls) <- filterToAcl) {
|
||||||
|
if (acls.isEmpty) {
|
||||||
|
if (confirmAction(opts, s"Are you sure you want to delete all ACLs for resource filter `$filter`? (y/n)"))
|
||||||
|
removeAcls(adminClient, acls, filter)
|
||||||
|
} else {
|
||||||
|
if (confirmAction(opts, s"Are you sure you want to remove ACLs: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource filter `$filter`? (y/n)"))
|
||||||
|
removeAcls(adminClient, acls, filter)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
listAcls()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def listAcls(): Unit = {
|
||||||
|
withAdminClient(opts) { adminClient =>
|
||||||
|
val filters = getResourceFilter(opts, dieIfNoResourceFound = false)
|
||||||
|
val resourceToAcls = getAcls(adminClient, filters)
|
||||||
|
|
||||||
|
for ((resource, acls) <- resourceToAcls)
|
||||||
|
println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private def getAccessControlEntry(acl: Acl): AccessControlEntry = {
|
||||||
|
new AccessControlEntry(acl.principal.toString, acl.host, acl.operation.toJava, acl.permissionType.toJava)
|
||||||
|
}
|
||||||
|
|
||||||
|
private def removeAcls(adminClient: JAdminClient, acls: Set[Acl], filter: ResourcePatternFilter): Unit = {
|
||||||
|
if (acls.isEmpty)
|
||||||
|
adminClient.deleteAcls(List(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).asJava).all().get()
|
||||||
|
else {
|
||||||
|
val aclBindingFilters = acls.map(acl => new AclBindingFilter(filter, getAccessControlEntryFilter(acl))).toList.asJava
|
||||||
|
adminClient.deleteAcls(aclBindingFilters).all().get()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private def getAccessControlEntryFilter(acl: Acl): AccessControlEntryFilter = {
|
||||||
|
new AccessControlEntryFilter(acl.principal.toString, acl.host, acl.operation.toJava, acl.permissionType.toJava)
|
||||||
|
}
|
||||||
|
|
||||||
|
private def getAcls(adminClient: JAdminClient, filters: Set[ResourcePatternFilter]): Map[ResourcePattern, Set[AccessControlEntry]] = {
|
||||||
|
val aclBindings =
|
||||||
|
if (filters.isEmpty) adminClient.describeAcls(AclBindingFilter.ANY).values().get().asScala.toList
|
||||||
|
else {
|
||||||
|
val results = for (filter <- filters) yield {
|
||||||
|
adminClient.describeAcls(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).values().get().asScala.toList
|
||||||
|
}
|
||||||
|
results.reduceLeft(_ ++ _)
|
||||||
|
}
|
||||||
|
|
||||||
|
val resourceToAcls = mutable.Map[ResourcePattern, Set[AccessControlEntry]]().withDefaultValue(Set())
|
||||||
|
|
||||||
|
aclBindings.foreach(aclBinding => resourceToAcls(aclBinding.pattern()) = resourceToAcls(aclBinding.pattern()) + aclBinding.entry())
|
||||||
|
resourceToAcls.toMap
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class AuthorizerService(val opts: AclCommandOptions) extends AclCommandService with Logging {
|
||||||
|
|
||||||
|
private def withAuthorizer()(f: Authorizer => Unit) {
|
||||||
|
val defaultProps = Map(KafkaConfig.ZkEnableSecureAclsProp -> JaasUtils.isZkSecurityEnabled)
|
||||||
|
val authorizerProperties =
|
||||||
|
if (opts.options.has(opts.authorizerPropertiesOpt)) {
|
||||||
|
val authorizerProperties = opts.options.valuesOf(opts.authorizerPropertiesOpt).asScala
|
||||||
|
defaultProps ++ CommandLineUtils.parseKeyValueArgs(authorizerProperties, acceptMissingValue = false).asScala
|
||||||
|
} else {
|
||||||
|
defaultProps
|
||||||
|
}
|
||||||
|
|
||||||
|
val authorizerClass = if (opts.options.has(opts.authorizerOpt))
|
||||||
|
opts.options.valueOf(opts.authorizerOpt)
|
||||||
|
else
|
||||||
|
classOf[SimpleAclAuthorizer].getName
|
||||||
|
|
||||||
|
val authZ = CoreUtils.createObject[Authorizer](authorizerClass)
|
||||||
|
try {
|
||||||
|
authZ.configure(authorizerProperties.asJava)
|
||||||
|
f(authZ)
|
||||||
|
}
|
||||||
|
finally CoreUtils.swallow(authZ.close(), this)
|
||||||
|
}
|
||||||
|
|
||||||
|
def addAcls(): Unit = {
|
||||||
|
val resourceToAcl = getResourceToAcls(opts)
|
||||||
|
withAuthorizer() { authorizer =>
|
||||||
|
for ((resource, acls) <- resourceToAcl) {
|
||||||
|
println(s"Adding ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
|
||||||
|
authorizer.addAcls(acls, resource)
|
||||||
|
}
|
||||||
|
|
||||||
|
listAcls()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def removeAcls(): Unit = {
|
||||||
|
withAuthorizer() { authorizer =>
|
||||||
|
val filterToAcl = getResourceFilterToAcls(opts)
|
||||||
|
|
||||||
|
for ((filter, acls) <- filterToAcl) {
|
||||||
|
if (acls.isEmpty) {
|
||||||
|
if (confirmAction(opts, s"Are you sure you want to delete all ACLs for resource filter `$filter`? (y/n)"))
|
||||||
|
removeAcls(authorizer, acls, filter)
|
||||||
|
} else {
|
||||||
|
if (confirmAction(opts, s"Are you sure you want to remove ACLs: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource filter `$filter`? (y/n)"))
|
||||||
|
removeAcls(authorizer, acls, filter)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
listAcls()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def listAcls(): Unit = {
|
||||||
|
withAuthorizer() { authorizer =>
|
||||||
|
val filters = getResourceFilter(opts, dieIfNoResourceFound = false)
|
||||||
|
|
||||||
|
val resourceToAcls: Iterable[(Resource, Set[Acl])] =
|
||||||
|
if (filters.isEmpty) authorizer.getAcls()
|
||||||
|
else filters.flatMap(filter => getAcls(authorizer, filter))
|
||||||
|
|
||||||
|
for ((resource, acls) <- resourceToAcls)
|
||||||
|
println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private def removeAcls(authorizer: Authorizer, acls: Set[Acl], filter: ResourcePatternFilter) {
|
||||||
|
getAcls(authorizer, filter)
|
||||||
|
.keys
|
||||||
|
.foreach(resource =>
|
||||||
|
if (acls.isEmpty) authorizer.removeAcls(resource)
|
||||||
|
else authorizer.removeAcls(acls, resource)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
private def getAcls(authorizer: Authorizer, filter: ResourcePatternFilter): Map[Resource, Set[Acl]] =
|
||||||
|
authorizer.getAcls()
|
||||||
|
.filter { case (resource, acl) => filter.matches(resource.toPattern) }
|
||||||
|
}
|
||||||
|
|
||||||
|
private def getResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = {
|
||||||
val patternType: PatternType = opts.options.valueOf(opts.resourcePatternType)
|
val patternType: PatternType = opts.options.valueOf(opts.resourcePatternType)
|
||||||
if (!patternType.isSpecific)
|
if (!patternType.isSpecific)
|
||||||
CommandLineUtils.printUsageAndDie(opts.parser, s"A '--resource-pattern-type' value of '$patternType' is not valid when adding acls.")
|
CommandLineUtils.printUsageAndDie(opts.parser, s"A '--resource-pattern-type' value of '$patternType' is not valid when adding acls.")
|
||||||
|
|
||||||
withAuthorizer(opts) { authorizer =>
|
val resourceToAcl = getResourceFilterToAcls(opts).map {
|
||||||
val resourceToAcl = getResourceFilterToAcls(opts).map {
|
case (filter, acls) =>
|
||||||
case (filter, acls) =>
|
Resource(ResourceType.fromJava(filter.resourceType()), filter.name(), filter.patternType()) -> acls
|
||||||
Resource(ResourceType.fromJava(filter.resourceType()), filter.name(), filter.patternType()) -> acls
|
|
||||||
}
|
|
||||||
|
|
||||||
if (resourceToAcl.values.exists(_.isEmpty))
|
|
||||||
CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one of: --allow-principal, --deny-principal when trying to add ACLs.")
|
|
||||||
|
|
||||||
for ((resource, acls) <- resourceToAcl) {
|
|
||||||
println(s"Adding ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
|
|
||||||
authorizer.addAcls(acls, resource)
|
|
||||||
}
|
|
||||||
|
|
||||||
listAcl(opts)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (resourceToAcl.values.exists(_.isEmpty))
|
||||||
|
CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one of: --allow-principal, --deny-principal when trying to add ACLs.")
|
||||||
|
|
||||||
|
resourceToAcl
|
||||||
}
|
}
|
||||||
|
|
||||||
private def removeAcl(opts: AclCommandOptions) {
|
|
||||||
withAuthorizer(opts) { authorizer =>
|
|
||||||
val filterToAcl = getResourceFilterToAcls(opts)
|
|
||||||
|
|
||||||
for ((filter, acls) <- filterToAcl) {
|
|
||||||
if (acls.isEmpty) {
|
|
||||||
if (confirmAction(opts, s"Are you sure you want to delete all ACLs for resource filter `$filter`? (y/n)"))
|
|
||||||
removeAcls(authorizer, acls, filter)
|
|
||||||
} else {
|
|
||||||
if (confirmAction(opts, s"Are you sure you want to remove ACLs: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource filter `$filter`? (y/n)"))
|
|
||||||
removeAcls(authorizer, acls, filter)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
listAcl(opts)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private def removeAcls(authorizer: Authorizer, acls: Set[Acl], filter: ResourcePatternFilter) {
|
|
||||||
getAcls(authorizer, filter)
|
|
||||||
.keys
|
|
||||||
.foreach(resource =>
|
|
||||||
if (acls.isEmpty) authorizer.removeAcls(resource)
|
|
||||||
else authorizer.removeAcls(acls, resource)
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
private def listAcl(opts: AclCommandOptions) {
|
|
||||||
withAuthorizer(opts) { authorizer =>
|
|
||||||
val filters = getResourceFilter(opts, dieIfNoResourceFound = false)
|
|
||||||
|
|
||||||
val resourceToAcls: Iterable[(Resource, Set[Acl])] =
|
|
||||||
if (filters.isEmpty) authorizer.getAcls()
|
|
||||||
else filters.flatMap(filter => getAcls(authorizer, filter))
|
|
||||||
|
|
||||||
for ((resource, acls) <- resourceToAcls)
|
|
||||||
println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private def getAcls(authorizer: Authorizer, filter: ResourcePatternFilter): Map[Resource, Set[Acl]] =
|
|
||||||
authorizer.getAcls()
|
|
||||||
.filter { case (resource, acl) => filter.matches(resource.toPattern) }
|
|
||||||
|
|
||||||
private def getResourceFilterToAcls(opts: AclCommandOptions): Map[ResourcePatternFilter, Set[Acl]] = {
|
private def getResourceFilterToAcls(opts: AclCommandOptions): Map[ResourcePatternFilter, Set[Acl]] = {
|
||||||
var resourceToAcls = Map.empty[ResourcePatternFilter, Set[Acl]]
|
var resourceToAcls = Map.empty[ResourcePatternFilter, Set[Acl]]
|
||||||
|
|
||||||
|
@ -257,7 +381,7 @@ object AclCommand extends Logging {
|
||||||
|
|
||||||
private def getPrincipals(opts: AclCommandOptions, principalOptionSpec: ArgumentAcceptingOptionSpec[String]): Set[KafkaPrincipal] = {
|
private def getPrincipals(opts: AclCommandOptions, principalOptionSpec: ArgumentAcceptingOptionSpec[String]): Set[KafkaPrincipal] = {
|
||||||
if (opts.options.has(principalOptionSpec))
|
if (opts.options.has(principalOptionSpec))
|
||||||
opts.options.valuesOf(principalOptionSpec).asScala.map(s => KafkaPrincipal.fromString(s.trim)).toSet
|
opts.options.valuesOf(principalOptionSpec).asScala.map(s => SecurityUtils.parseKafkaPrincipal(s.trim)).toSet
|
||||||
else
|
else
|
||||||
Set.empty[KafkaPrincipal]
|
Set.empty[KafkaPrincipal]
|
||||||
}
|
}
|
||||||
|
@ -305,11 +429,23 @@ object AclCommand extends Logging {
|
||||||
|
|
||||||
class AclCommandOptions(args: Array[String]) {
|
class AclCommandOptions(args: Array[String]) {
|
||||||
val parser = new OptionParser(false)
|
val parser = new OptionParser(false)
|
||||||
|
val CommandConfigDoc = "A property file containing configs to be passed to Admin Client."
|
||||||
|
|
||||||
|
val bootstrapServerOpt = parser.accepts("bootstrap-server", "A list of host/port pairs to use for establishing the connection to the Kafka cluster." +
|
||||||
|
" This list should be in the form host1:port1,host2:port2,... This config is required for acl management using admin client API.")
|
||||||
|
.withRequiredArg
|
||||||
|
.describedAs("server to connect to")
|
||||||
|
.ofType(classOf[String])
|
||||||
|
|
||||||
|
val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc)
|
||||||
|
.withOptionalArg()
|
||||||
|
.describedAs("command-config")
|
||||||
|
.ofType(classOf[String])
|
||||||
|
|
||||||
val authorizerOpt = parser.accepts("authorizer", "Fully qualified class name of the authorizer, defaults to kafka.security.auth.SimpleAclAuthorizer.")
|
val authorizerOpt = parser.accepts("authorizer", "Fully qualified class name of the authorizer, defaults to kafka.security.auth.SimpleAclAuthorizer.")
|
||||||
.withRequiredArg
|
.withRequiredArg
|
||||||
.describedAs("authorizer")
|
.describedAs("authorizer")
|
||||||
.ofType(classOf[String])
|
.ofType(classOf[String])
|
||||||
.defaultsTo(classOf[SimpleAclAuthorizer].getName)
|
|
||||||
|
|
||||||
val authorizerPropertiesOpt = parser.accepts("authorizer-properties", "REQUIRED: properties required to configure an instance of Authorizer. " +
|
val authorizerPropertiesOpt = parser.accepts("authorizer-properties", "REQUIRED: properties required to configure an instance of Authorizer. " +
|
||||||
"These are key=val pairs. For the default authorizer the example values are: zookeeper.connect=localhost:2181")
|
"These are key=val pairs. For the default authorizer the example values are: zookeeper.connect=localhost:2181")
|
||||||
|
@ -410,7 +546,17 @@ object AclCommand extends Logging {
|
||||||
val options = parser.parse(args: _*)
|
val options = parser.parse(args: _*)
|
||||||
|
|
||||||
def checkArgs() {
|
def checkArgs() {
|
||||||
CommandLineUtils.checkRequiredArgs(parser, options, authorizerPropertiesOpt)
|
if (options.has(bootstrapServerOpt) && options.has(authorizerOpt))
|
||||||
|
CommandLineUtils.printUsageAndDie(parser, "Only one of --bootstrap-server or --authorizer must be specified")
|
||||||
|
|
||||||
|
if (!options.has(bootstrapServerOpt))
|
||||||
|
CommandLineUtils.checkRequiredArgs(parser, options, authorizerPropertiesOpt)
|
||||||
|
|
||||||
|
if (options.has(commandConfigOpt) && !options.has(bootstrapServerOpt))
|
||||||
|
CommandLineUtils.printUsageAndDie(parser, "The --command-config option can only be used with --bootstrap-server option")
|
||||||
|
|
||||||
|
if (options.has(authorizerPropertiesOpt) && options.has(bootstrapServerOpt))
|
||||||
|
CommandLineUtils.printUsageAndDie(parser, "The --authorizer-properties option can only be used with --authorizer option")
|
||||||
|
|
||||||
val actions = Seq(addOpt, removeOpt, listOpt).count(options.has)
|
val actions = Seq(addOpt, removeOpt, listOpt).count(options.has)
|
||||||
if (actions != 1)
|
if (actions != 1)
|
||||||
|
|
|
@ -22,8 +22,7 @@ import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFi
|
||||||
import org.apache.kafka.common.protocol.Errors
|
import org.apache.kafka.common.protocol.Errors
|
||||||
import org.apache.kafka.common.requests.ApiError
|
import org.apache.kafka.common.requests.ApiError
|
||||||
import org.apache.kafka.common.resource.ResourcePattern
|
import org.apache.kafka.common.resource.ResourcePattern
|
||||||
import org.apache.kafka.common.security.auth.KafkaPrincipal
|
import org.apache.kafka.common.utils.SecurityUtils._
|
||||||
|
|
||||||
import scala.util.{Failure, Success, Try}
|
import scala.util.{Failure, Success, Try}
|
||||||
|
|
||||||
|
|
||||||
|
@ -32,7 +31,7 @@ object SecurityUtils {
|
||||||
def convertToResourceAndAcl(filter: AclBindingFilter): Either[ApiError, (Resource, Acl)] = {
|
def convertToResourceAndAcl(filter: AclBindingFilter): Either[ApiError, (Resource, Acl)] = {
|
||||||
(for {
|
(for {
|
||||||
resourceType <- Try(ResourceType.fromJava(filter.patternFilter.resourceType))
|
resourceType <- Try(ResourceType.fromJava(filter.patternFilter.resourceType))
|
||||||
principal <- Try(KafkaPrincipal.fromString(filter.entryFilter.principal))
|
principal <- Try(parseKafkaPrincipal(filter.entryFilter.principal))
|
||||||
operation <- Try(Operation.fromJava(filter.entryFilter.operation))
|
operation <- Try(Operation.fromJava(filter.entryFilter.operation))
|
||||||
permissionType <- Try(PermissionType.fromJava(filter.entryFilter.permissionType))
|
permissionType <- Try(PermissionType.fromJava(filter.entryFilter.permissionType))
|
||||||
resource = Resource(resourceType, filter.patternFilter.name, filter.patternFilter.patternType)
|
resource = Resource(resourceType, filter.patternFilter.name, filter.patternFilter.patternType)
|
||||||
|
|
|
@ -20,20 +20,24 @@ import java.util.Properties
|
||||||
|
|
||||||
import kafka.admin.AclCommand.AclCommandOptions
|
import kafka.admin.AclCommand.AclCommandOptions
|
||||||
import kafka.security.auth._
|
import kafka.security.auth._
|
||||||
import kafka.server.KafkaConfig
|
import kafka.server.{KafkaConfig, KafkaServer}
|
||||||
import kafka.utils.{Exit, Logging, TestUtils}
|
import kafka.utils.{Exit, Logging, TestUtils}
|
||||||
import kafka.zk.ZooKeeperTestHarness
|
import kafka.zk.ZooKeeperTestHarness
|
||||||
import org.apache.kafka.common.resource.PatternType
|
import org.apache.kafka.common.resource.PatternType
|
||||||
|
import org.apache.kafka.common.network.ListenerName
|
||||||
|
|
||||||
import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
|
import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
|
||||||
import org.apache.kafka.common.security.auth.KafkaPrincipal
|
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
|
||||||
import org.junit.{Before, Test}
|
import org.apache.kafka.common.utils.SecurityUtils
|
||||||
|
import org.junit.{After, Before, Test}
|
||||||
|
|
||||||
class AclCommandTest extends ZooKeeperTestHarness with Logging {
|
class AclCommandTest extends ZooKeeperTestHarness with Logging {
|
||||||
|
|
||||||
private val principal: KafkaPrincipal = KafkaPrincipal.fromString("User:test2")
|
var servers: Seq[KafkaServer] = Seq()
|
||||||
private val Users = Set(KafkaPrincipal.fromString("User:CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"),
|
|
||||||
principal,
|
private val principal: KafkaPrincipal = SecurityUtils.parseKafkaPrincipal("User:test2")
|
||||||
KafkaPrincipal.fromString("""User:CN=\#User with special chars in CN : (\, \+ \" \\ \< \> \; ')"""))
|
private val Users = Set(SecurityUtils.parseKafkaPrincipal("User:CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"),
|
||||||
|
principal, SecurityUtils.parseKafkaPrincipal("""User:CN=\#User with special chars in CN : (\, \+ \" \\ \< \> \; ')"""))
|
||||||
private val Hosts = Set("host1", "host2")
|
private val Hosts = Set("host1", "host2")
|
||||||
private val AllowHostCommand = Array("--allow-host", "host1", "--allow-host", "host2")
|
private val AllowHostCommand = Array("--allow-host", "host1", "--allow-host", "host2")
|
||||||
private val DenyHostCommand = Array("--deny-host", "host1", "--deny-host", "host2")
|
private val DenyHostCommand = Array("--deny-host", "host1", "--deny-host", "host2")
|
||||||
|
@ -87,6 +91,7 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
|
||||||
|
|
||||||
private var brokerProps: Properties = _
|
private var brokerProps: Properties = _
|
||||||
private var zkArgs: Array[String] = _
|
private var zkArgs: Array[String] = _
|
||||||
|
private var adminArgs: Array[String] = _
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
override def setUp(): Unit = {
|
override def setUp(): Unit = {
|
||||||
|
@ -94,33 +99,66 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
|
||||||
|
|
||||||
brokerProps = TestUtils.createBrokerConfig(0, zkConnect)
|
brokerProps = TestUtils.createBrokerConfig(0, zkConnect)
|
||||||
brokerProps.put(KafkaConfig.AuthorizerClassNameProp, "kafka.security.auth.SimpleAclAuthorizer")
|
brokerProps.put(KafkaConfig.AuthorizerClassNameProp, "kafka.security.auth.SimpleAclAuthorizer")
|
||||||
|
brokerProps.put(SimpleAclAuthorizer.SuperUsersProp, "User:ANONYMOUS")
|
||||||
|
|
||||||
zkArgs = Array("--authorizer-properties", "zookeeper.connect=" + zkConnect)
|
zkArgs = Array("--authorizer-properties", "zookeeper.connect=" + zkConnect)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
override def tearDown() {
|
||||||
|
TestUtils.shutdownServers(servers)
|
||||||
|
super.tearDown()
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testAclCli() {
|
def testAclCliWithAuthorizer(): Unit = {
|
||||||
|
testAclCli(zkArgs)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def testAclCliWithAdminAPI(): Unit = {
|
||||||
|
createServer()
|
||||||
|
testAclCli(adminArgs)
|
||||||
|
}
|
||||||
|
|
||||||
|
private def createServer(): Unit = {
|
||||||
|
servers = Seq(TestUtils.createServer(KafkaConfig.fromProps(brokerProps)))
|
||||||
|
val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
|
||||||
|
adminArgs = Array("--bootstrap-server", TestUtils.bootstrapServers(servers, listenerName))
|
||||||
|
}
|
||||||
|
|
||||||
|
private def testAclCli(cmdArgs: Array[String]) {
|
||||||
for ((resources, resourceCmd) <- ResourceToCommand) {
|
for ((resources, resourceCmd) <- ResourceToCommand) {
|
||||||
for (permissionType <- PermissionType.values) {
|
for (permissionType <- PermissionType.values) {
|
||||||
val operationToCmd = ResourceToOperations(resources)
|
val operationToCmd = ResourceToOperations(resources)
|
||||||
val (acls, cmd) = getAclToCommand(permissionType, operationToCmd._1)
|
val (acls, cmd) = getAclToCommand(permissionType, operationToCmd._1)
|
||||||
AclCommand.main(zkArgs ++ cmd ++ resourceCmd ++ operationToCmd._2 :+ "--add")
|
AclCommand.main(cmdArgs ++ cmd ++ resourceCmd ++ operationToCmd._2 :+ "--add")
|
||||||
for (resource <- resources) {
|
for (resource <- resources) {
|
||||||
withAuthorizer() { authorizer =>
|
withAuthorizer() { authorizer =>
|
||||||
TestUtils.waitAndVerifyAcls(acls, authorizer, resource)
|
TestUtils.waitAndVerifyAcls(acls, authorizer, resource)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
testRemove(resources, resourceCmd, brokerProps)
|
testRemove(cmdArgs, resources, resourceCmd)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testProducerConsumerCli() {
|
def testProducerConsumerCliWithAuthorizer(): Unit = {
|
||||||
|
testProducerConsumerCli(zkArgs)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def testProducerConsumerCliWithAdminAPI(): Unit = {
|
||||||
|
createServer()
|
||||||
|
testProducerConsumerCli(adminArgs)
|
||||||
|
}
|
||||||
|
|
||||||
|
private def testProducerConsumerCli(cmdArgs: Array[String]) {
|
||||||
for ((cmd, resourcesToAcls) <- CmdToResourcesToAcl) {
|
for ((cmd, resourcesToAcls) <- CmdToResourcesToAcl) {
|
||||||
val resourceCommand: Array[String] = resourcesToAcls.keys.map(ResourceToCommand).foldLeft(Array[String]())(_ ++ _)
|
val resourceCommand: Array[String] = resourcesToAcls.keys.map(ResourceToCommand).foldLeft(Array[String]())(_ ++ _)
|
||||||
AclCommand.main(zkArgs ++ getCmd(Allow) ++ resourceCommand ++ cmd :+ "--add")
|
AclCommand.main(cmdArgs ++ getCmd(Allow) ++ resourceCommand ++ cmd :+ "--add")
|
||||||
for ((resources, acls) <- resourcesToAcls) {
|
for ((resources, acls) <- resourcesToAcls) {
|
||||||
for (resource <- resources) {
|
for (resource <- resources) {
|
||||||
withAuthorizer() { authorizer =>
|
withAuthorizer() { authorizer =>
|
||||||
|
@ -128,15 +166,25 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
testRemove(resourcesToAcls.keys.flatten.toSet, resourceCommand ++ cmd, brokerProps)
|
testRemove(cmdArgs, resourcesToAcls.keys.flatten.toSet, resourceCommand ++ cmd)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testAclsOnPrefixedResources(): Unit = {
|
def testAclsOnPrefixedResourcesWithAuthorizer(): Unit = {
|
||||||
|
testAclsOnPrefixedResources(zkArgs)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def testAclsOnPrefixedResourcesWithAdminAPI(): Unit = {
|
||||||
|
createServer()
|
||||||
|
testAclsOnPrefixedResources(adminArgs)
|
||||||
|
}
|
||||||
|
|
||||||
|
private def testAclsOnPrefixedResources(cmdArgs: Array[String]): Unit = {
|
||||||
val cmd = Array("--allow-principal", principal.toString, "--producer", "--topic", "Test-", "--resource-pattern-type", "Prefixed")
|
val cmd = Array("--allow-principal", principal.toString, "--producer", "--topic", "Test-", "--resource-pattern-type", "Prefixed")
|
||||||
|
|
||||||
AclCommand.main(zkArgs ++ cmd :+ "--add")
|
AclCommand.main(cmdArgs ++ cmd :+ "--add")
|
||||||
|
|
||||||
withAuthorizer() { authorizer =>
|
withAuthorizer() { authorizer =>
|
||||||
val writeAcl = Acl(principal, Allow, Acl.WildCardHost, Write)
|
val writeAcl = Acl(principal, Allow, Acl.WildCardHost, Write)
|
||||||
|
@ -145,7 +193,7 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
|
||||||
TestUtils.waitAndVerifyAcls(Set(writeAcl, describeAcl, createAcl), authorizer, Resource(Topic, "Test-", PREFIXED))
|
TestUtils.waitAndVerifyAcls(Set(writeAcl, describeAcl, createAcl), authorizer, Resource(Topic, "Test-", PREFIXED))
|
||||||
}
|
}
|
||||||
|
|
||||||
AclCommand.main(zkArgs ++ cmd :+ "--remove" :+ "--force")
|
AclCommand.main(cmdArgs ++ cmd :+ "--remove" :+ "--force")
|
||||||
|
|
||||||
withAuthorizer() { authorizer =>
|
withAuthorizer() { authorizer =>
|
||||||
TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, Resource(Cluster, "kafka-cluster", LITERAL))
|
TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, Resource(Cluster, "kafka-cluster", LITERAL))
|
||||||
|
@ -156,7 +204,8 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
|
||||||
@Test(expected = classOf[IllegalArgumentException])
|
@Test(expected = classOf[IllegalArgumentException])
|
||||||
def testInvalidAuthorizerProperty() {
|
def testInvalidAuthorizerProperty() {
|
||||||
val args = Array("--authorizer-properties", "zookeeper.connect " + zkConnect)
|
val args = Array("--authorizer-properties", "zookeeper.connect " + zkConnect)
|
||||||
AclCommand.withAuthorizer(new AclCommandOptions(args))(null)
|
val aclCommandService = new AclCommand.AuthorizerService(new AclCommandOptions(args))
|
||||||
|
aclCommandService.listAcls()
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -188,9 +237,9 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private def testRemove(resources: Set[Resource], resourceCmd: Array[String], brokerProps: Properties) {
|
private def testRemove(cmdArgs: Array[String], resources: Set[Resource], resourceCmd: Array[String]) {
|
||||||
for (resource <- resources) {
|
for (resource <- resources) {
|
||||||
AclCommand.main(zkArgs ++ resourceCmd :+ "--remove" :+ "--force")
|
AclCommand.main(cmdArgs ++ resourceCmd :+ "--remove" :+ "--force")
|
||||||
withAuthorizer() { authorizer =>
|
withAuthorizer() { authorizer =>
|
||||||
TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, resource)
|
TestUtils.waitAndVerifyAcls(Set.empty[Acl], authorizer, resource)
|
||||||
}
|
}
|
||||||
|
@ -208,7 +257,7 @@ class AclCommandTest extends ZooKeeperTestHarness with Logging {
|
||||||
Users.foldLeft(cmd) ((cmd, user) => cmd ++ Array(principalCmd, user.toString))
|
Users.foldLeft(cmd) ((cmd, user) => cmd ++ Array(principalCmd, user.toString))
|
||||||
}
|
}
|
||||||
|
|
||||||
def withAuthorizer()(f: Authorizer => Unit) {
|
private def withAuthorizer()(f: Authorizer => Unit) {
|
||||||
val kafkaConfig = KafkaConfig.fromProps(brokerProps, doLog = false)
|
val kafkaConfig = KafkaConfig.fromProps(brokerProps, doLog = false)
|
||||||
val authZ = new SimpleAclAuthorizer
|
val authZ = new SimpleAclAuthorizer
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -1075,6 +1075,18 @@
|
||||||
<td></td>
|
<td></td>
|
||||||
<td>Configuration</td>
|
<td>Configuration</td>
|
||||||
</tr>
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<td>--bootstrap-server</td>
|
||||||
|
<td>A list of host/port pairs to use for establishing the connection to the Kafka cluster. Only one of --bootstrap-server or --authorizer option must be specified.</td>
|
||||||
|
<td></td>
|
||||||
|
<td>Configuration</td>
|
||||||
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<td>--command-config</td>
|
||||||
|
<td>A property file containing configs to be passed to Admin Client. This option can only be used with --bootstrap-server option.</td>
|
||||||
|
<td></td>
|
||||||
|
<td>Configuration</td>
|
||||||
|
</tr>
|
||||||
<tr>
|
<tr>
|
||||||
<td>--cluster</td>
|
<td>--cluster</td>
|
||||||
<td>Indicates to the script that the user is trying to interact with acls on the singular cluster resource.</td>
|
<td>Indicates to the script that the user is trying to interact with acls on the singular cluster resource.</td>
|
||||||
|
@ -1199,7 +1211,17 @@
|
||||||
<pre class="brush: bash;"> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --consumer --topic Test-topic --group Group-1 </pre>
|
<pre class="brush: bash;"> bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --consumer --topic Test-topic --group Group-1 </pre>
|
||||||
Note that for consumer option we must also specify the consumer group.
|
Note that for consumer option we must also specify the consumer group.
|
||||||
In order to remove a principal from producer or consumer role we just need to pass --remove option. </li>
|
In order to remove a principal from producer or consumer role we just need to pass --remove option. </li>
|
||||||
</ul>
|
|
||||||
|
<li><b>AdminClient API based acl management</b><br>
|
||||||
|
Users having Alter permission on ClusterResource can use AdminClient API for ACL management. kafka-acls.sh script supports AdminClient API to manage ACLs without interacting with zookeeper/authorizer directly.
|
||||||
|
All the above examples can be executed by using <b>--bootstrap-server</b> option. For example:
|
||||||
|
|
||||||
|
<pre class="brush: bash;">
|
||||||
|
bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --add --allow-principal User:Bob --producer --topic Test-topic
|
||||||
|
bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --add --allow-principal User:Bob --consumer --topic Test-topic --group Group-1
|
||||||
|
bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminclient-configs.conf --list --topic Test-topic</pre></li>
|
||||||
|
|
||||||
|
</ul>
|
||||||
|
|
||||||
<h3><a id="security_rolling_upgrade" href="#security_rolling_upgrade">7.5 Incorporating Security Features in a Running Cluster</a></h3>
|
<h3><a id="security_rolling_upgrade" href="#security_rolling_upgrade">7.5 Incorporating Security Features in a Running Cluster</a></h3>
|
||||||
You can secure a running cluster via one or more of the supported protocols discussed previously. This is done in phases:
|
You can secure a running cluster via one or more of the supported protocols discussed previously. This is done in phases:
|
||||||
|
|
Loading…
Reference in New Issue