KAFKA-5690; Add support to list ACLs for a given principal (KIP-357)

Author: Manikumar Reddy <manikumar.reddy@gmail.com>

Reviewers: Dong Lin <lindong28@gmail.com>

Closes #5633 from omkreddy/KAFKA-5690-LIST-PER-PRICIPAL
This commit is contained in:
Manikumar Reddy 2018-09-17 09:35:00 -07:00 committed by Dong Lin
parent 607dea234f
commit b21c66f948
2 changed files with 59 additions and 11 deletions

View File

@ -138,10 +138,22 @@ object AclCommand extends Logging {
def listAcls(): Unit = {
withAdminClient(opts) { adminClient =>
val filters = getResourceFilter(opts, dieIfNoResourceFound = false)
val listPrincipals = getPrincipals(opts, opts.listPrincipalsOpt)
val resourceToAcls = getAcls(adminClient, filters)
for ((resource, acls) <- resourceToAcls)
println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
if (listPrincipals.isEmpty) {
for ((resource, acls) <- resourceToAcls)
println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
} else {
listPrincipals.foreach(principal => {
println(s"ACLs for principal `$principal`")
val filteredResourceToAcls = resourceToAcls.mapValues(acls =>
acls.filter(acl => principal.toString.equals(acl.principal))).filter(entry => entry._2.nonEmpty)
for ((resource, acls) <- filteredResourceToAcls)
println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
})
}
}
}
@ -237,13 +249,20 @@ object AclCommand extends Logging {
def listAcls(): Unit = {
withAuthorizer() { authorizer =>
val filters = getResourceFilter(opts, dieIfNoResourceFound = false)
val listPrincipals = getPrincipals(opts, opts.listPrincipalsOpt)
val resourceToAcls: Iterable[(Resource, Set[Acl])] =
if (filters.isEmpty) authorizer.getAcls()
else filters.flatMap(filter => getAcls(authorizer, filter))
for ((resource, acls) <- resourceToAcls)
println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
if (listPrincipals.isEmpty) {
val resourceToAcls = getFilteredResourceToAcls(authorizer, filters)
for ((resource, acls) <- resourceToAcls)
println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
} else {
listPrincipals.foreach(principal => {
println(s"ACLs for principal `$principal`")
val resourceToAcls = getFilteredResourceToAcls(authorizer, filters, Some(principal))
for ((resource, acls) <- resourceToAcls)
println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
})
}
}
}
@ -256,9 +275,23 @@ object AclCommand extends Logging {
)
}
private def getAcls(authorizer: Authorizer, filter: ResourcePatternFilter): Map[Resource, Set[Acl]] =
authorizer.getAcls()
.filter { case (resource, acl) => filter.matches(resource.toPattern) }
private def getFilteredResourceToAcls(authorizer: Authorizer, filters: Set[ResourcePatternFilter],
listPrincipal: Option[KafkaPrincipal] = None): Iterable[(Resource, Set[Acl])] = {
if (filters.isEmpty)
if (listPrincipal.isEmpty)
authorizer.getAcls()
else
authorizer.getAcls(listPrincipal.get)
else filters.flatMap(filter => getAcls(authorizer, filter, listPrincipal))
}
private def getAcls(authorizer: Authorizer, filter: ResourcePatternFilter,
listPrincipal: Option[KafkaPrincipal] = None): Map[Resource, Set[Acl]] =
if (listPrincipal.isEmpty)
authorizer.getAcls().filter { case (resource, acl) => filter.matches(resource.toPattern) }
else
authorizer.getAcls(listPrincipal.get).filter { case (resource, acl) => filter.matches(resource.toPattern) }
}
private def getResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = {
@ -521,6 +554,12 @@ object AclCommand extends Logging {
.describedAs("deny-principal")
.ofType(classOf[String])
val listPrincipalsOpt = parser.accepts("principal", "List ACLs for the specified principal. principal is in principalType:name format." +
" Note that principalType must be supported by the Authorizer being used. Multiple --principal option can be passed.")
.withOptionalArg()
.describedAs("principal")
.ofType(classOf[String])
val allowHostsOpt = parser.accepts("allow-host", "Host from which principals listed in --allow-principal will have access. " +
"If you have specified --allow-principal then the default for this option will be set to * which allows access from all hosts.")
.withRequiredArg
@ -568,6 +607,9 @@ object AclCommand extends Logging {
CommandLineUtils.checkInvalidArgs(parser, options, producerOpt, Set(operationsOpt, denyPrincipalsOpt, denyHostsOpt))
CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt, Set(operationsOpt, denyPrincipalsOpt, denyHostsOpt))
if (options.has(listPrincipalsOpt) && !options.has(listOpt))
CommandLineUtils.printUsageAndDie(parser, "The --principal option is only available if --list is set")
if (options.has(producerOpt) && !options.has(topicOpt))
CommandLineUtils.printUsageAndDie(parser, "With --producer you must specify a --topic")

View File

@ -1129,6 +1129,12 @@
<td></td>
<td>Principal</td>
</tr>
<tr>
<td>--principal</td>
<td>Principal is in PrincipalType:name format that will be used along with --list option. This will list the ACLs for the specified principal. <br>You can specify multiple --principal in a single command.</td>
<td></td>
<td>Principal</td>
</tr>
<tr>
<td>--allow-host</td>
<td>IP address from which principals listed in --allow-principal will have access.</td>