KAFKA-14587: Move AclCommand to tools (#17880)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Mickael Maison 2024-12-14 20:05:46 +01:00 committed by GitHub
parent e41373cef6
commit 57eb5fd7dc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 746 additions and 701 deletions

View File

@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
exec $(dirname $0)/kafka-run-class.sh kafka.admin.AclCommand "$@"
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.AclCommand "$@"

View File

@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
"%~dp0kafka-run-class.bat" kafka.admin.AclCommand %*
"%~dp0kafka-run-class.bat" org.apache.kafka.tools.AclCommand %*

View File

@ -290,6 +290,8 @@
<allow pkg="org.apache.kafka.server.util" />
<allow pkg="kafka.admin" />
<allow pkg="kafka.server" />
<allow pkg="org.apache.kafka.metadata.authorizer" />
<allow pkg="org.apache.kafka.security.authorizer" />
<allow pkg="org.apache.kafka.storage.internals" />
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="org.apache.kafka.server.common" />

View File

@ -269,11 +269,11 @@
<suppress checks="ClassDataAbstractionCoupling"
files="VerifiableConsumer.java"/>
<suppress checks="CyclomaticComplexity"
files="(ConsoleConsumer|DefaultMessageFormatter|StreamsResetter|ProducerPerformance|Agent).java"/>
files="(AclCommand|ConsoleConsumer|DefaultMessageFormatter|StreamsResetter|ProducerPerformance|Agent).java"/>
<suppress checks="BooleanExpressionComplexity"
files="(StreamsResetter|DefaultMessageFormatter).java"/>
<suppress checks="NPathComplexity"
files="(DefaultMessageFormatter|ProducerPerformance|StreamsResetter|Agent|TransactionalMessageCopier|ReplicaVerificationTool|LineMessageReader).java"/>
files="(AclCommand|DefaultMessageFormatter|ProducerPerformance|StreamsResetter|Agent|TransactionalMessageCopier|ReplicaVerificationTool|LineMessageReader).java"/>
<suppress checks="ImportControl"
files="SignalLogger.java"/>
<suppress checks="IllegalImport"

View File

@ -112,8 +112,7 @@ public interface Authorizer extends Configurable, Closeable {
* to process the update synchronously on the request thread.
*
* @param requestContext Request context if the ACL is being created by a broker to handle
* a client request to create ACLs. This may be null if ACLs are created directly in ZooKeeper
* using AclCommand.
* a client request to create ACLs.
* @param aclBindings ACL bindings to create
*
* @return Create result for each ACL binding in the same order as in the input list. Each result
@ -131,8 +130,7 @@ public interface Authorizer extends Configurable, Closeable {
* Refer to the authorizer implementation docs for details on concurrent update guarantees.
*
* @param requestContext Request context if the ACL is being deleted by a broker to handle
* a client request to delete ACLs. This may be null if ACLs are deleted directly in ZooKeeper
* using AclCommand.
* a client request to delete ACLs.
* @param aclBindingFilters Filters to match ACL bindings that are to be deleted
*
* @return Delete result for each filter in the same order as in the input list.

View File

@ -1,510 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import java.util.Properties
import joptsimple._
import joptsimple.util.EnumConverter
import kafka.utils._
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.common.acl._
import org.apache.kafka.common.acl.AclOperation._
import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, Resource => JResource, ResourceType => JResourceType}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Exit, Utils, SecurityUtils => JSecurityUtils}
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.jdk.CollectionConverters._
import scala.collection.mutable
import scala.io.StdIn
object AclCommand extends Logging {
private val ClusterResourceFilter = new ResourcePatternFilter(JResourceType.CLUSTER, JResource.CLUSTER_NAME, PatternType.LITERAL)
private val Newline = scala.util.Properties.lineSeparator
def main(args: Array[String]): Unit = {
val opts = new AclCommandOptions(args)
CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to manage acls on kafka.")
opts.checkArgs()
val aclCommandService = new AdminClientService(opts)
try {
if (opts.options.has(opts.addOpt))
aclCommandService.addAcls()
else if (opts.options.has(opts.removeOpt))
aclCommandService.removeAcls()
else if (opts.options.has(opts.listOpt))
aclCommandService.listAcls()
} catch {
case e: Throwable =>
println(s"Error while executing ACL command: ${e.getMessage}")
println(Utils.stackTrace(e))
Exit.exit(1)
}
}
private class AdminClientService(val opts: AclCommandOptions) extends Logging {
private def withAdminClient(opts: AclCommandOptions)(f: Admin => Unit): Unit = {
val props = if (opts.options.has(opts.commandConfigOpt))
Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
else
new Properties()
if (opts.options.has(opts.bootstrapServerOpt)) {
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
} else {
props.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, opts.options.valueOf(opts.bootstrapControllerOpt))
}
val adminClient = Admin.create(props)
try {
f(adminClient)
} finally {
adminClient.close()
}
}
def addAcls(): Unit = {
val resourceToAcl = getResourceToAcls(opts)
withAdminClient(opts) { adminClient =>
for ((resource, acls) <- resourceToAcl) {
println(s"Adding ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
val aclBindings = acls.map(acl => new AclBinding(resource, acl)).asJavaCollection
adminClient.createAcls(aclBindings).all().get()
}
}
}
def removeAcls(): Unit = {
withAdminClient(opts) { adminClient =>
val filterToAcl = getResourceFilterToAcls(opts)
for ((filter, acls) <- filterToAcl) {
if (acls.isEmpty) {
if (confirmAction(opts, s"Are you sure you want to delete all ACLs for resource filter `$filter`? (y/n)"))
removeAcls(adminClient, acls, filter)
} else {
if (confirmAction(opts, s"Are you sure you want to remove ACLs: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource filter `$filter`? (y/n)"))
removeAcls(adminClient, acls, filter)
}
}
}
}
def listAcls(): Unit = {
withAdminClient(opts) { adminClient =>
listAcls(adminClient)
}
}
private def listAcls(adminClient: Admin): Unit = {
val filters = getResourceFilter(opts, dieIfNoResourceFound = false)
val listPrincipals = getPrincipals(opts, opts.listPrincipalsOpt)
val resourceToAcls = getAcls(adminClient, filters)
if (listPrincipals.isEmpty) {
printResourceAcls(resourceToAcls)
} else {
listPrincipals.foreach{principal =>
println(s"ACLs for principal `$principal`")
val filteredResourceToAcls = resourceToAcls.map { case (resource, acls) =>
resource -> acls.filter(acl => principal.toString.equals(acl.principal))
}.filter { case (_, acls) => acls.nonEmpty }
printResourceAcls(filteredResourceToAcls)
}
}
}
private def printResourceAcls(resourceToAcls: Map[ResourcePattern, Set[AccessControlEntry]]): Unit = {
for ((resource, acls) <- resourceToAcls)
println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
}
private def removeAcls(adminClient: Admin, acls: Set[AccessControlEntry], filter: ResourcePatternFilter): Unit = {
if (acls.isEmpty)
adminClient.deleteAcls(List(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).asJava).all().get()
else {
val aclBindingFilters = acls.map(acl => new AclBindingFilter(filter, acl.toFilter)).toList.asJava
adminClient.deleteAcls(aclBindingFilters).all().get()
}
}
private def getAcls(adminClient: Admin, filters: Set[ResourcePatternFilter]): Map[ResourcePattern, Set[AccessControlEntry]] = {
val aclBindings =
if (filters.isEmpty) adminClient.describeAcls(AclBindingFilter.ANY).values().get().asScala.toList
else {
val results = for (filter <- filters) yield {
adminClient.describeAcls(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).values().get().asScala.toList
}
results.reduceLeft(_ ++ _)
}
val resourceToAcls = mutable.Map[ResourcePattern, Set[AccessControlEntry]]().withDefaultValue(Set())
aclBindings.foreach(aclBinding => resourceToAcls(aclBinding.pattern()) = resourceToAcls(aclBinding.pattern()) + aclBinding.entry())
resourceToAcls.toMap
}
}
private def getResourceToAcls(opts: AclCommandOptions): Map[ResourcePattern, Set[AccessControlEntry]] = {
val patternType = opts.options.valueOf(opts.resourcePatternType)
if (!patternType.isSpecific)
CommandLineUtils.printUsageAndExit(opts.parser, s"A '--resource-pattern-type' value of '$patternType' is not valid when adding acls.")
val resourceToAcl = getResourceFilterToAcls(opts).map {
case (filter, acls) =>
new ResourcePattern(filter.resourceType(), filter.name(), filter.patternType()) -> acls
}
if (resourceToAcl.values.exists(_.isEmpty))
CommandLineUtils.printUsageAndExit(opts.parser, "You must specify one of: --allow-principal, --deny-principal when trying to add ACLs.")
resourceToAcl
}
private def getResourceFilterToAcls(opts: AclCommandOptions): Map[ResourcePatternFilter, Set[AccessControlEntry]] = {
var resourceToAcls = Map.empty[ResourcePatternFilter, Set[AccessControlEntry]]
//if none of the --producer or --consumer options are specified , just construct ACLs from CLI options.
if (!opts.options.has(opts.producerOpt) && !opts.options.has(opts.consumerOpt)) {
resourceToAcls ++= getCliResourceFilterToAcls(opts)
}
//users are allowed to specify both --producer and --consumer options in a single command.
if (opts.options.has(opts.producerOpt))
resourceToAcls ++= getProducerResourceFilterToAcls(opts)
if (opts.options.has(opts.consumerOpt))
resourceToAcls ++= getConsumerResourceFilterToAcls(opts).map { case (k, v) => k -> (v ++ resourceToAcls.getOrElse(k, Set.empty[AccessControlEntry])) }
validateOperation(opts, resourceToAcls)
resourceToAcls
}
private def getProducerResourceFilterToAcls(opts: AclCommandOptions): Map[ResourcePatternFilter, Set[AccessControlEntry]] = {
val filters = getResourceFilter(opts)
val topics = filters.filter(_.resourceType == JResourceType.TOPIC)
val transactionalIds = filters.filter(_.resourceType == JResourceType.TRANSACTIONAL_ID)
val enableIdempotence = opts.options.has(opts.idempotentOpt)
val topicAcls = getAcl(opts, Set(WRITE, DESCRIBE, CREATE))
val transactionalIdAcls = getAcl(opts, Set(WRITE, DESCRIBE))
//Write, Describe, Create permission on topics, Write, Describe on transactionalIds
topics.map(_ -> topicAcls).toMap ++
transactionalIds.map(_ -> transactionalIdAcls).toMap ++
(if (enableIdempotence)
Map(ClusterResourceFilter -> getAcl(opts, Set(IDEMPOTENT_WRITE)))
else
Map.empty)
}
private def getConsumerResourceFilterToAcls(opts: AclCommandOptions): Map[ResourcePatternFilter, Set[AccessControlEntry]] = {
val filters = getResourceFilter(opts)
val topics = filters.filter(_.resourceType == JResourceType.TOPIC)
val groups = filters.filter(_.resourceType == JResourceType.GROUP)
//Read, Describe on topic, Read on consumerGroup
val acls = getAcl(opts, Set(READ, DESCRIBE))
topics.map(_ -> acls).toMap[ResourcePatternFilter, Set[AccessControlEntry]] ++
groups.map(_ -> getAcl(opts, Set(READ))).toMap[ResourcePatternFilter, Set[AccessControlEntry]]
}
private def getCliResourceFilterToAcls(opts: AclCommandOptions): Map[ResourcePatternFilter, Set[AccessControlEntry]] = {
val acls = getAcl(opts)
val filters = getResourceFilter(opts)
filters.map(_ -> acls).toMap
}
private def getAcl(opts: AclCommandOptions, operations: Set[AclOperation]): Set[AccessControlEntry] = {
val allowedPrincipals = getPrincipals(opts, opts.allowPrincipalsOpt)
val deniedPrincipals = getPrincipals(opts, opts.denyPrincipalsOpt)
val allowedHosts = getHosts(opts, opts.allowHostsOpt, opts.allowPrincipalsOpt)
val deniedHosts = getHosts(opts, opts.denyHostsOpt, opts.denyPrincipalsOpt)
val acls = new collection.mutable.HashSet[AccessControlEntry]
if (allowedHosts.nonEmpty && allowedPrincipals.nonEmpty)
acls ++= getAcls(allowedPrincipals, ALLOW, operations, allowedHosts)
if (deniedHosts.nonEmpty && deniedPrincipals.nonEmpty)
acls ++= getAcls(deniedPrincipals, DENY, operations, deniedHosts)
acls.toSet
}
private def getAcl(opts: AclCommandOptions): Set[AccessControlEntry] = {
val operations = opts.options.valuesOf(opts.operationsOpt).asScala
.map(operation => JSecurityUtils.operation(operation.trim)).toSet
getAcl(opts, operations)
}
def getAcls(principals: Set[KafkaPrincipal], permissionType: AclPermissionType, operations: Set[AclOperation],
hosts: Set[String]): Set[AccessControlEntry] = {
for {
principal <- principals
operation <- operations
host <- hosts
} yield new AccessControlEntry(principal.toString, host, operation, permissionType)
}
private def getHosts(opts: AclCommandOptions, hostOptionSpec: OptionSpec[String],
principalOptionSpec: OptionSpec[String]): Set[String] = {
if (opts.options.has(hostOptionSpec))
opts.options.valuesOf(hostOptionSpec).asScala.map(_.trim).toSet
else if (opts.options.has(principalOptionSpec))
Set[String](AclEntry.WILDCARD_HOST)
else
Set.empty[String]
}
private def getPrincipals(opts: AclCommandOptions, principalOptionSpec: OptionSpec[String]): Set[KafkaPrincipal] = {
if (opts.options.has(principalOptionSpec))
opts.options.valuesOf(principalOptionSpec).asScala.map(s => JSecurityUtils.parseKafkaPrincipal(s.trim)).toSet
else
Set.empty[KafkaPrincipal]
}
private def getResourceFilter(opts: AclCommandOptions, dieIfNoResourceFound: Boolean = true): Set[ResourcePatternFilter] = {
val patternType = opts.options.valueOf(opts.resourcePatternType)
var resourceFilters = Set.empty[ResourcePatternFilter]
if (opts.options.has(opts.topicOpt))
opts.options.valuesOf(opts.topicOpt).forEach(topic => resourceFilters += new ResourcePatternFilter(JResourceType.TOPIC, topic.trim, patternType))
if (patternType == PatternType.LITERAL && (opts.options.has(opts.clusterOpt) || opts.options.has(opts.idempotentOpt)))
resourceFilters += ClusterResourceFilter
if (opts.options.has(opts.groupOpt))
opts.options.valuesOf(opts.groupOpt).forEach(group => resourceFilters += new ResourcePatternFilter(JResourceType.GROUP, group.trim, patternType))
if (opts.options.has(opts.transactionalIdOpt))
opts.options.valuesOf(opts.transactionalIdOpt).forEach(transactionalId =>
resourceFilters += new ResourcePatternFilter(JResourceType.TRANSACTIONAL_ID, transactionalId, patternType))
if (opts.options.has(opts.delegationTokenOpt))
opts.options.valuesOf(opts.delegationTokenOpt).forEach(token => resourceFilters += new ResourcePatternFilter(JResourceType.DELEGATION_TOKEN, token.trim, patternType))
if (opts.options.has(opts.userPrincipalOpt))
opts.options.valuesOf(opts.userPrincipalOpt).forEach(user => resourceFilters += new ResourcePatternFilter(JResourceType.USER, user.trim, patternType))
if (resourceFilters.isEmpty && dieIfNoResourceFound)
CommandLineUtils.printUsageAndExit(opts.parser, "You must provide at least one resource: --topic <topic> or --cluster or --group <group> or --delegation-token <Delegation Token ID>")
resourceFilters
}
private def confirmAction(opts: AclCommandOptions, msg: String): Boolean = {
if (opts.options.has(opts.forceOpt))
return true
println(msg)
StdIn.readLine().equalsIgnoreCase("y")
}
private def validateOperation(opts: AclCommandOptions, resourceToAcls: Map[ResourcePatternFilter, Set[AccessControlEntry]]): Unit = {
for ((resource, acls) <- resourceToAcls) {
val validOps = AclEntry.supportedOperations(resource.resourceType).asScala.toSet + AclOperation.ALL
if ((acls.map(_.operation) -- validOps).nonEmpty)
CommandLineUtils.printUsageAndExit(opts.parser, s"ResourceType ${resource.resourceType} only supports operations ${validOps.map(JSecurityUtils.operationName).mkString(", ")}")
}
}
class AclCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {
val CommandConfigDoc = "A property file containing configs to be passed to Admin Client."
val bootstrapServerOpt: OptionSpec[String] = parser.accepts("bootstrap-server", "A list of host/port pairs to use for establishing the connection to the Kafka cluster." +
" This list should be in the form host1:port1,host2:port2,... This config is required for acl management using admin client API.")
.withRequiredArg
.describedAs("server to connect to")
.ofType(classOf[String])
val bootstrapControllerOpt: OptionSpec[String] = parser.accepts("bootstrap-controller", "A list of host/port pairs to use for establishing the connection to the Kafka cluster." +
" This list should be in the form host1:port1,host2:port2,... This config is required for acl management using admin client API.")
.withRequiredArg
.describedAs("controller to connect to")
.ofType(classOf[String])
val commandConfigOpt: OptionSpec[String] = parser.accepts("command-config", CommandConfigDoc)
.withOptionalArg()
.describedAs("command-config")
.ofType(classOf[String])
val topicOpt: OptionSpec[String] = parser.accepts("topic", "topic to which ACLs should be added or removed. " +
"A value of '*' indicates ACL should apply to all topics.")
.withRequiredArg
.describedAs("topic")
.ofType(classOf[String])
val clusterOpt: OptionSpecBuilder = parser.accepts("cluster", "Add/Remove cluster ACLs.")
val groupOpt: OptionSpec[String] = parser.accepts("group", "Consumer Group to which the ACLs should be added or removed. " +
"A value of '*' indicates the ACLs should apply to all groups.")
.withRequiredArg
.describedAs("group")
.ofType(classOf[String])
val transactionalIdOpt: OptionSpec[String] = parser.accepts("transactional-id", "The transactionalId to which ACLs should " +
"be added or removed. A value of '*' indicates the ACLs should apply to all transactionalIds.")
.withRequiredArg
.describedAs("transactional-id")
.ofType(classOf[String])
val idempotentOpt: OptionSpecBuilder = parser.accepts("idempotent", "Enable idempotence for the producer. This should be " +
"used in combination with the --producer option. Note that idempotence is enabled automatically if " +
"the producer is authorized to a particular transactional-id.")
val delegationTokenOpt: OptionSpec[String] = parser.accepts("delegation-token", "Delegation token to which ACLs should be added or removed. " +
"A value of '*' indicates ACL should apply to all tokens.")
.withRequiredArg
.describedAs("delegation-token")
.ofType(classOf[String])
val resourcePatternType: OptionSpec[PatternType] = parser.accepts("resource-pattern-type", "The type of the resource pattern or pattern filter. " +
"When adding acls, this should be a specific pattern type, e.g. 'literal' or 'prefixed'. " +
"When listing or removing acls, a specific pattern type can be used to list or remove acls from specific resource patterns, " +
"or use the filter values of 'any' or 'match', where 'any' will match any pattern type, but will match the resource name exactly, " +
"where as 'match' will perform pattern matching to list or remove all acls that affect the supplied resource(s). " +
"WARNING: 'match', when used in combination with the '--remove' switch, should be used with care.")
.withRequiredArg()
.ofType(classOf[String])
.withValuesConvertedBy(new PatternTypeConverter())
.defaultsTo(PatternType.LITERAL)
val addOpt: OptionSpecBuilder = parser.accepts("add", "Indicates you are trying to add ACLs.")
val removeOpt: OptionSpecBuilder = parser.accepts("remove", "Indicates you are trying to remove ACLs.")
val listOpt: OptionSpecBuilder = parser.accepts("list", "List ACLs for the specified resource, use --topic <topic> or --group <group> or --cluster to specify a resource.")
val operationsOpt: OptionSpec[String] = parser.accepts("operation", "Operation that is being allowed or denied. Valid operation names are: " + Newline +
AclEntry.ACL_OPERATIONS.asScala.map("\t" + JSecurityUtils.operationName(_)).mkString(Newline) + Newline)
.withRequiredArg
.ofType(classOf[String])
.defaultsTo(JSecurityUtils.operationName(AclOperation.ALL))
val allowPrincipalsOpt: OptionSpec[String] = parser.accepts("allow-principal", "principal is in principalType:name format." +
" Note that principalType must be supported by the Authorizer being used." +
" For example, User:'*' is the wild card indicating all users.")
.withRequiredArg
.describedAs("allow-principal")
.ofType(classOf[String])
val denyPrincipalsOpt: OptionSpec[String] = parser.accepts("deny-principal", "principal is in principalType:name format. " +
"By default anyone not added through --allow-principal is denied access. " +
"You only need to use this option as negation to already allowed set. " +
"Note that principalType must be supported by the Authorizer being used. " +
"For example if you wanted to allow access to all users in the system but not test-user you can define an ACL that " +
"allows access to User:'*' and specify --deny-principal=User:test@EXAMPLE.COM. " +
"AND PLEASE REMEMBER DENY RULES TAKES PRECEDENCE OVER ALLOW RULES.")
.withRequiredArg
.describedAs("deny-principal")
.ofType(classOf[String])
val listPrincipalsOpt: OptionSpec[String] = parser.accepts("principal", "List ACLs for the specified principal. principal is in principalType:name format." +
" Note that principalType must be supported by the Authorizer being used. Multiple --principal option can be passed.")
.withOptionalArg()
.describedAs("principal")
.ofType(classOf[String])
val allowHostsOpt: OptionSpec[String] = parser.accepts("allow-host", "Host from which principals listed in --allow-principal will have access. " +
"If you have specified --allow-principal then the default for this option will be set to '*' which allows access from all hosts.")
.withRequiredArg
.describedAs("allow-host")
.ofType(classOf[String])
val denyHostsOpt: OptionSpec[String] = parser.accepts("deny-host", "Host from which principals listed in --deny-principal will be denied access. " +
"If you have specified --deny-principal then the default for this option will be set to '*' which denies access from all hosts.")
.withRequiredArg
.describedAs("deny-host")
.ofType(classOf[String])
val producerOpt: OptionSpecBuilder = parser.accepts("producer", "Convenience option to add/remove ACLs for producer role. " +
"This will generate ACLs that allows WRITE,DESCRIBE and CREATE on topic.")
val consumerOpt: OptionSpecBuilder = parser.accepts("consumer", "Convenience option to add/remove ACLs for consumer role. " +
"This will generate ACLs that allows READ,DESCRIBE on topic and READ on group.")
val forceOpt: OptionSpecBuilder = parser.accepts("force", "Assume Yes to all queries and do not prompt.")
val userPrincipalOpt: OptionSpec[String] = parser.accepts("user-principal", "Specifies a user principal as a resource in relation with the operation. For instance " +
"one could grant CreateTokens or DescribeTokens permission on a given user principal.")
.withRequiredArg()
.describedAs("user-principal")
.ofType(classOf[String])
options = parser.parse(args: _*)
def checkArgs(): Unit = {
if (options.has(bootstrapServerOpt) && options.has(bootstrapControllerOpt))
CommandLineUtils.printUsageAndExit(parser, "Only one of --bootstrap-server or --bootstrap-controller must be specified")
if (!options.has(bootstrapServerOpt) && !options.has(bootstrapControllerOpt))
CommandLineUtils.printUsageAndExit(parser, "One of --bootstrap-server or --bootstrap-controller must be specified")
val actions = Seq(addOpt, removeOpt, listOpt).count(options.has)
if (actions != 1)
CommandLineUtils.printUsageAndExit(parser, "Command must include exactly one action: --list, --add, --remove. ")
CommandLineUtils.checkInvalidArgs(parser, options, listOpt, producerOpt, consumerOpt, allowHostsOpt, allowPrincipalsOpt, denyHostsOpt, denyPrincipalsOpt)
//when --producer or --consumer is specified , user should not specify operations as they are inferred and we also disallow --deny-principals and --deny-hosts.
CommandLineUtils.checkInvalidArgs(parser, options, producerOpt, operationsOpt, denyPrincipalsOpt, denyHostsOpt)
CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt, operationsOpt, denyPrincipalsOpt, denyHostsOpt)
if (options.has(listPrincipalsOpt) && !options.has(listOpt))
CommandLineUtils.printUsageAndExit(parser, "The --principal option is only available if --list is set")
if (options.has(producerOpt) && !options.has(topicOpt))
CommandLineUtils.printUsageAndExit(parser, "With --producer you must specify a --topic")
if (options.has(idempotentOpt) && !options.has(producerOpt))
CommandLineUtils.printUsageAndExit(parser, "The --idempotent option is only available if --producer is set")
if (options.has(consumerOpt) && (!options.has(topicOpt) || !options.has(groupOpt) || (!options.has(producerOpt) && (options.has(clusterOpt) || options.has(transactionalIdOpt)))))
CommandLineUtils.printUsageAndExit(parser, "With --consumer you must specify a --topic and a --group and no --cluster or --transactional-id option should be specified.")
}
}
}
class PatternTypeConverter extends EnumConverter[PatternType](classOf[PatternType]) {
override def convert(value: String): PatternType = {
val patternType = super.convert(value)
if (patternType.isUnknown)
throw new ValueConversionException("Unknown resource-pattern-type: " + value)
patternType
}
override def valuePattern: String = PatternType.values
.filter(_ != PatternType.UNKNOWN)
.mkString("|")
}

View File

@ -58,7 +58,7 @@ import scala.jdk.CollectionConverters._
* brokers.
*
* To start brokers we need to set a cluster ACL, which happens optionally in KafkaServerTestHarness.
* The remaining ACLs to enable access to producers and consumers are set here. To set ACLs, we use AclCommand directly.
* The remaining ACLs to enable access to producers and consumers are set here.
*
* Finally, we rely on SaslSetup to bootstrap and setup Kerberos. We don't use
* SaslTestHarness here directly because it extends QuorumTestHarness, and we

View File

@ -0,0 +1,605 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.Resource;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.SecurityUtils;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.security.authorizer.AclEntry;
import org.apache.kafka.server.util.CommandDefaultOptions;
import org.apache.kafka.server.util.CommandLineUtils;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import joptsimple.AbstractOptionSpec;
import joptsimple.OptionException;
import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
import joptsimple.ValueConversionException;
import joptsimple.util.EnumConverter;
import static org.apache.kafka.common.acl.AclOperation.CREATE;
import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE;
import static org.apache.kafka.common.acl.AclOperation.READ;
import static org.apache.kafka.common.acl.AclOperation.WRITE;
import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
import static org.apache.kafka.common.acl.AclPermissionType.DENY;
public class AclCommand {
private static final ResourcePatternFilter CLUSTER_RESOURCE_FILTER =
new ResourcePatternFilter(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL);
private static final String NL = System.lineSeparator();
public static void main(String[] args) {
AclCommandOptions opts = new AclCommandOptions(args);
AdminClientService aclCommandService = new AdminClientService(opts);
try (Admin admin = Admin.create(adminConfigs(opts))) {
if (opts.options.has(opts.addOpt)) {
aclCommandService.addAcls(admin);
} else if (opts.options.has(opts.removeOpt)) {
aclCommandService.removeAcls(admin);
} else if (opts.options.has(opts.listOpt)) {
aclCommandService.listAcls(admin);
}
} catch (Throwable e) {
System.out.println("Error while executing ACL command: " + e.getMessage());
System.out.println(Utils.stackTrace(e));
Exit.exit(1);
}
}
private static Properties adminConfigs(AclCommandOptions opts) throws IOException {
Properties props = new Properties();
if (opts.options.has(opts.commandConfigOpt)) {
props = Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt));
}
if (opts.options.has(opts.bootstrapServerOpt)) {
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt));
} else {
props.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, opts.options.valueOf(opts.bootstrapControllerOpt));
}
return props;
}
private static class AdminClientService {
private final AclCommandOptions opts;
AdminClientService(AclCommandOptions opts) {
this.opts = opts;
}
void addAcls(Admin admin) throws ExecutionException, InterruptedException {
Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcl = getResourceToAcls(opts);
for (Map.Entry<ResourcePattern, Set<AccessControlEntry>> entry : resourceToAcl.entrySet()) {
ResourcePattern resource = entry.getKey();
Set<AccessControlEntry> acls = entry.getValue();
System.out.println("Adding ACLs for resource `" + resource + "`: " + NL + " " + acls.stream().map(a -> "\t" + a).collect(Collectors.joining(NL)) + NL);
Collection<AclBinding> aclBindings = acls.stream().map(acl -> new AclBinding(resource, acl)).collect(Collectors.toList());
admin.createAcls(aclBindings).all().get();
}
}
void removeAcls(Admin admin) throws ExecutionException, InterruptedException {
Map<ResourcePatternFilter, Set<AccessControlEntry>> filterToAcl = getResourceFilterToAcls(opts);
for (Map.Entry<ResourcePatternFilter, Set<AccessControlEntry>> entry : filterToAcl.entrySet()) {
ResourcePatternFilter filter = entry.getKey();
Set<AccessControlEntry> acls = entry.getValue();
if (acls.isEmpty()) {
if (confirmAction(opts, "Are you sure you want to delete all ACLs for resource filter `" + filter + "`? (y/n)")) {
removeAcls(admin, acls, filter);
}
} else {
String msg = "Are you sure you want to remove ACLs: " + NL +
" " + acls.stream().map(a -> "\t" + a).collect(Collectors.joining(NL)) + NL +
" from resource filter `" + filter + "`? (y/n)";
if (confirmAction(opts, msg)) {
removeAcls(admin, acls, filter);
}
}
}
}
private void listAcls(Admin admin) throws ExecutionException, InterruptedException {
Set<ResourcePatternFilter> filters = getResourceFilter(opts, false);
Set<KafkaPrincipal> listPrincipals = getPrincipals(opts, opts.listPrincipalsOpt);
Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcls = getAcls(admin, filters);
if (listPrincipals.isEmpty()) {
printResourceAcls(resourceToAcls);
} else {
listPrincipals.forEach(principal -> {
System.out.println("ACLs for principal `" + principal + "`");
Map<ResourcePattern, Set<AccessControlEntry>> filteredResourceToAcls = resourceToAcls.entrySet().stream()
.map(entry -> {
ResourcePattern resource = entry.getKey();
Set<AccessControlEntry> acls = entry.getValue().stream()
.filter(acl -> principal.toString().equals(acl.principal()))
.collect(Collectors.toSet());
return new AbstractMap.SimpleEntry<>(resource, acls);
})
.filter(entry -> !entry.getValue().isEmpty())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
printResourceAcls(filteredResourceToAcls);
});
}
}
private static void printResourceAcls(Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcls) {
resourceToAcls.forEach((resource, acls) ->
System.out.println("Current ACLs for resource `" + resource + "`:" + NL +
acls.stream().map(acl -> "\t" + acl).collect(Collectors.joining(NL)) + NL)
);
}
private static void removeAcls(Admin adminClient, Set<AccessControlEntry> acls, ResourcePatternFilter filter) throws ExecutionException, InterruptedException {
if (acls.isEmpty()) {
adminClient.deleteAcls(Collections.singletonList(new AclBindingFilter(filter, AccessControlEntryFilter.ANY))).all().get();
} else {
List<AclBindingFilter> aclBindingFilters = acls.stream().map(acl -> new AclBindingFilter(filter, acl.toFilter())).collect(Collectors.toList());
adminClient.deleteAcls(aclBindingFilters).all().get();
}
}
private Map<ResourcePattern, Set<AccessControlEntry>> getAcls(Admin adminClient, Set<ResourcePatternFilter> filters) throws ExecutionException, InterruptedException {
Collection<AclBinding> aclBindings;
if (filters.isEmpty()) {
aclBindings = adminClient.describeAcls(AclBindingFilter.ANY).values().get();
} else {
aclBindings = new ArrayList<>();
for (ResourcePatternFilter filter : filters) {
aclBindings.addAll(adminClient.describeAcls(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).values().get());
}
}
Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcls = new HashMap<>();
for (AclBinding aclBinding : aclBindings) {
ResourcePattern resource = aclBinding.pattern();
Set<AccessControlEntry> acls = resourceToAcls.getOrDefault(resource, new HashSet<>());
acls.add(aclBinding.entry());
resourceToAcls.put(resource, acls);
}
return resourceToAcls;
}
}
private static Map<ResourcePattern, Set<AccessControlEntry>> getResourceToAcls(AclCommandOptions opts) {
PatternType patternType = opts.options.valueOf(opts.resourcePatternType);
if (!patternType.isSpecific()) {
CommandLineUtils.printUsageAndExit(opts.parser, "A '--resource-pattern-type' value of '" + patternType + "' is not valid when adding acls.");
}
Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcl = getResourceFilterToAcls(opts).entrySet().stream()
.collect(Collectors.toMap(entry -> new ResourcePattern(entry.getKey().resourceType(), entry.getKey().name(), entry.getKey().patternType()),
Map.Entry::getValue));
if (resourceToAcl.values().stream().anyMatch(Set::isEmpty)) {
CommandLineUtils.printUsageAndExit(opts.parser, "You must specify one of: --allow-principal, --deny-principal when trying to add ACLs.");
}
return resourceToAcl;
}
private static Map<ResourcePatternFilter, Set<AccessControlEntry>> getResourceFilterToAcls(AclCommandOptions opts) {
Map<ResourcePatternFilter, Set<AccessControlEntry>> resourceToAcls = new HashMap<>();
//if none of the --producer or --consumer options are specified , just construct ACLs from CLI options.
if (!opts.options.has(opts.producerOpt) && !opts.options.has(opts.consumerOpt)) {
resourceToAcls.putAll(getCliResourceFilterToAcls(opts));
}
//users are allowed to specify both --producer and --consumer options in a single command.
if (opts.options.has(opts.producerOpt)) {
resourceToAcls.putAll(getProducerResourceFilterToAcls(opts));
}
if (opts.options.has(opts.consumerOpt)) {
getConsumerResourceFilterToAcls(opts).forEach((k, v) -> {
Set<AccessControlEntry> existingAcls = resourceToAcls.getOrDefault(k, new HashSet<>());
existingAcls.addAll(v);
resourceToAcls.put(k, existingAcls);
});
}
validateOperation(opts, resourceToAcls);
return resourceToAcls;
}
private static Map<ResourcePatternFilter, Set<AccessControlEntry>> getProducerResourceFilterToAcls(AclCommandOptions opts) {
Set<ResourcePatternFilter> filters = getResourceFilter(opts, true);
Set<ResourcePatternFilter> topics = filters.stream().filter(f -> f.resourceType() == ResourceType.TOPIC).collect(Collectors.toSet());
Set<ResourcePatternFilter> transactionalIds = filters.stream().filter(f -> f.resourceType() == ResourceType.TRANSACTIONAL_ID).collect(Collectors.toSet());
boolean enableIdempotence = opts.options.has(opts.idempotentOpt);
Set<AccessControlEntry> topicAcls = getAcl(opts, new HashSet<>(Arrays.asList(WRITE, DESCRIBE, CREATE)));
Set<AccessControlEntry> transactionalIdAcls = getAcl(opts, new HashSet<>(Arrays.asList(WRITE, DESCRIBE)));
//Write, Describe, Create permission on topics, Write, Describe on transactionalIds
Map<ResourcePatternFilter, Set<AccessControlEntry>> result = new HashMap<>();
for (ResourcePatternFilter topic : topics) {
result.put(topic, topicAcls);
}
for (ResourcePatternFilter transactionalId : transactionalIds) {
result.put(transactionalId, transactionalIdAcls);
}
if (enableIdempotence) {
result.put(CLUSTER_RESOURCE_FILTER, getAcl(opts, Collections.singleton(IDEMPOTENT_WRITE)));
}
return result;
}
private static Map<ResourcePatternFilter, Set<AccessControlEntry>> getConsumerResourceFilterToAcls(AclCommandOptions opts) {
Set<ResourcePatternFilter> filters = getResourceFilter(opts, true);
Set<ResourcePatternFilter> topics = filters.stream().filter(f -> f.resourceType() == ResourceType.TOPIC).collect(Collectors.toSet());
Set<ResourcePatternFilter> groups = filters.stream().filter(f -> f.resourceType() == ResourceType.GROUP).collect(Collectors.toSet());
//Read, Describe on topic, Read on consumerGroup
Set<AccessControlEntry> topicAcls = getAcl(opts, new HashSet<>(Arrays.asList(READ, DESCRIBE)));
Set<AccessControlEntry> groupAcls = getAcl(opts, Collections.singleton(READ));
Map<ResourcePatternFilter, Set<AccessControlEntry>> result = new HashMap<>();
for (ResourcePatternFilter topic : topics) {
result.put(topic, topicAcls);
}
for (ResourcePatternFilter group : groups) {
result.put(group, groupAcls);
}
return result;
}
private static Map<ResourcePatternFilter, Set<AccessControlEntry>> getCliResourceFilterToAcls(AclCommandOptions opts) {
Set<AccessControlEntry> acls = getAcl(opts);
Set<ResourcePatternFilter> filters = getResourceFilter(opts, true);
return filters.stream().collect(Collectors.toMap(filter -> filter, filter -> acls));
}
private static Set<AccessControlEntry> getAcl(AclCommandOptions opts, Set<AclOperation> operations) {
Set<KafkaPrincipal> allowedPrincipals = getPrincipals(opts, opts.allowPrincipalsOpt);
Set<KafkaPrincipal> deniedPrincipals = getPrincipals(opts, opts.denyPrincipalsOpt);
Set<String> allowedHosts = getHosts(opts, opts.allowHostsOpt, opts.allowPrincipalsOpt);
Set<String> deniedHosts = getHosts(opts, opts.denyHostsOpt, opts.denyPrincipalsOpt);
Set<AccessControlEntry> acls = new HashSet<>();
if (!allowedHosts.isEmpty() && !allowedPrincipals.isEmpty()) {
acls.addAll(getAcls(allowedPrincipals, ALLOW, operations, allowedHosts));
}
if (!deniedHosts.isEmpty() && !deniedPrincipals.isEmpty()) {
acls.addAll(getAcls(deniedPrincipals, DENY, operations, deniedHosts));
}
return acls;
}
private static Set<AccessControlEntry> getAcl(AclCommandOptions opts) {
Set<AclOperation> operations = opts.options.valuesOf(opts.operationsOpt)
.stream().map(operation -> SecurityUtils.operation(operation.trim()))
.collect(Collectors.toSet());
return getAcl(opts, operations);
}
static Set<AccessControlEntry> getAcls(Set<KafkaPrincipal> principals,
AclPermissionType permissionType,
Set<AclOperation> operations,
Set<String> hosts) {
Set<AccessControlEntry> acls = new HashSet<>();
for (KafkaPrincipal principal : principals) {
for (AclOperation operation : operations) {
for (String host : hosts) {
acls.add(new AccessControlEntry(principal.toString(), host, operation, permissionType));
}
}
}
return acls;
}
private static Set<String> getHosts(AclCommandOptions opts, OptionSpec<String> hostOptionSpec, OptionSpec<String> principalOptionSpec) {
if (opts.options.has(hostOptionSpec)) {
return opts.options.valuesOf(hostOptionSpec).stream().map(String::trim).collect(Collectors.toSet());
} else if (opts.options.has(principalOptionSpec)) {
return Collections.singleton(AclEntry.WILDCARD_HOST);
} else {
return Collections.emptySet();
}
}
private static Set<KafkaPrincipal> getPrincipals(AclCommandOptions opts, OptionSpec<String> principalOptionSpec) {
if (opts.options.has(principalOptionSpec)) {
return opts.options.valuesOf(principalOptionSpec).stream()
.map(s -> SecurityUtils.parseKafkaPrincipal(s.trim()))
.collect(Collectors.toSet());
} else {
return Collections.emptySet();
}
}
private static Set<ResourcePatternFilter> getResourceFilter(AclCommandOptions opts, boolean dieIfNoResourceFound) {
PatternType patternType = opts.options.valueOf(opts.resourcePatternType);
Set<ResourcePatternFilter> resourceFilters = new HashSet<>();
if (opts.options.has(opts.topicOpt)) {
opts.options.valuesOf(opts.topicOpt).forEach(topic -> resourceFilters.add(new ResourcePatternFilter(ResourceType.TOPIC, topic.trim(), patternType)));
}
if (patternType == PatternType.LITERAL && (opts.options.has(opts.clusterOpt) || opts.options.has(opts.idempotentOpt))) {
resourceFilters.add(CLUSTER_RESOURCE_FILTER);
}
if (opts.options.has(opts.groupOpt)) {
opts.options.valuesOf(opts.groupOpt).forEach(group -> resourceFilters.add(new ResourcePatternFilter(ResourceType.GROUP, group.trim(), patternType)));
}
if (opts.options.has(opts.transactionalIdOpt)) {
opts.options.valuesOf(opts.transactionalIdOpt).forEach(transactionalId ->
resourceFilters.add(new ResourcePatternFilter(ResourceType.TRANSACTIONAL_ID, transactionalId, patternType)));
}
if (opts.options.has(opts.delegationTokenOpt)) {
opts.options.valuesOf(opts.delegationTokenOpt).forEach(token -> resourceFilters.add(new ResourcePatternFilter(ResourceType.DELEGATION_TOKEN, token.trim(), patternType)));
}
if (opts.options.has(opts.userPrincipalOpt)) {
opts.options.valuesOf(opts.userPrincipalOpt).forEach(user -> resourceFilters.add(new ResourcePatternFilter(ResourceType.USER, user.trim(), patternType)));
}
if (resourceFilters.isEmpty() && dieIfNoResourceFound) {
CommandLineUtils.printUsageAndExit(opts.parser, "You must provide at least one resource: --topic <topic> or --cluster or --group <group> or --delegation-token <Delegation Token ID>");
}
return resourceFilters;
}
private static boolean confirmAction(AclCommandOptions opts, String msg) {
if (opts.options.has(opts.forceOpt)) {
return true;
}
System.out.println(msg);
return System.console().readLine().equalsIgnoreCase("y");
}
private static void validateOperation(AclCommandOptions opts, Map<ResourcePatternFilter, Set<AccessControlEntry>> resourceToAcls) {
for (Map.Entry<ResourcePatternFilter, Set<AccessControlEntry>> entry : resourceToAcls.entrySet()) {
ResourcePatternFilter resource = entry.getKey();
Set<AccessControlEntry> acls = entry.getValue();
Collection<AclOperation> validOps = new HashSet<>(AclEntry.supportedOperations(resource.resourceType()));
validOps.add(AclOperation.ALL);
Set<AclOperation> unsupportedOps = new HashSet<>();
for (AccessControlEntry acl : acls) {
if (!validOps.contains(acl.operation())) {
unsupportedOps.add(acl.operation());
}
}
if (!unsupportedOps.isEmpty()) {
String msg = String.format("ResourceType %s only supports operations %s", resource.resourceType(), validOps);
CommandLineUtils.printUsageAndExit(opts.parser, msg);
}
}
}
public static class AclCommandOptions extends CommandDefaultOptions {
private final OptionSpec<String> bootstrapServerOpt;
private final OptionSpec<String> bootstrapControllerOpt;
private final OptionSpec<String> commandConfigOpt;
private final OptionSpec<String> topicOpt;
private final OptionSpecBuilder clusterOpt;
private final OptionSpec<String> groupOpt;
private final OptionSpec<String> transactionalIdOpt;
private final OptionSpecBuilder idempotentOpt;
private final OptionSpec<String> delegationTokenOpt;
private final OptionSpec<PatternType> resourcePatternType;
private final OptionSpecBuilder addOpt;
private final OptionSpecBuilder removeOpt;
private final OptionSpecBuilder listOpt;
private final OptionSpec<String> operationsOpt;
private final OptionSpec<String> allowPrincipalsOpt;
private final OptionSpec<String> denyPrincipalsOpt;
private final OptionSpec<String> listPrincipalsOpt;
private final OptionSpec<String> allowHostsOpt;
private final OptionSpec<String> denyHostsOpt;
private final OptionSpecBuilder producerOpt;
private final OptionSpecBuilder consumerOpt;
private final OptionSpecBuilder forceOpt;
private final OptionSpec<String> userPrincipalOpt;
@SuppressWarnings("this-escape")
public AclCommandOptions(String[] args) {
super(args);
bootstrapServerOpt = parser.accepts("bootstrap-server", "A list of host/port pairs to use for establishing the connection to the Kafka cluster." +
" This list should be in the form host1:port1,host2:port2,... This config is required for acl management using admin client API.")
.withRequiredArg()
.describedAs("server to connect to")
.ofType(String.class);
bootstrapControllerOpt = parser.accepts("bootstrap-controller", "A list of host/port pairs to use for establishing the connection to the Kafka cluster." +
" This list should be in the form host1:port1,host2:port2,... This config is required for acl management using admin client API.")
.withRequiredArg()
.describedAs("controller to connect to")
.ofType(String.class);
commandConfigOpt = parser.accepts("command-config", "A property file containing configs to be passed to Admin Client.")
.withOptionalArg()
.describedAs("command-config")
.ofType(String.class);
topicOpt = parser.accepts("topic", "topic to which ACLs should be added or removed. " +
"A value of '*' indicates ACL should apply to all topics.")
.withRequiredArg()
.describedAs("topic")
.ofType(String.class);
clusterOpt = parser.accepts("cluster", "Add/Remove cluster ACLs.");
groupOpt = parser.accepts("group", "Consumer Group to which the ACLs should be added or removed. " +
"A value of '*' indicates the ACLs should apply to all groups.")
.withRequiredArg()
.describedAs("group")
.ofType(String.class);
transactionalIdOpt = parser.accepts("transactional-id", "The transactionalId to which ACLs should " +
"be added or removed. A value of '*' indicates the ACLs should apply to all transactionalIds.")
.withRequiredArg()
.describedAs("transactional-id")
.ofType(String.class);
idempotentOpt = parser.accepts("idempotent", "Enable idempotence for the producer. This should be " +
"used in combination with the --producer option. Note that idempotence is enabled automatically if " +
"the producer is authorized to a particular transactional-id.");
delegationTokenOpt = parser.accepts("delegation-token", "Delegation token to which ACLs should be added or removed. " +
"A value of '*' indicates ACL should apply to all tokens.")
.withRequiredArg()
.describedAs("delegation-token")
.ofType(String.class);
resourcePatternType = parser.accepts("resource-pattern-type", "The type of the resource pattern or pattern filter. " +
"When adding acls, this should be a specific pattern type, e.g. 'literal' or 'prefixed'. " +
"When listing or removing acls, a specific pattern type can be used to list or remove acls from specific resource patterns, " +
"or use the filter values of 'any' or 'match', where 'any' will match any pattern type, but will match the resource name exactly, " +
"where as 'match' will perform pattern matching to list or remove all acls that affect the supplied resource(s). " +
"WARNING: 'match', when used in combination with the '--remove' switch, should be used with care.")
.withRequiredArg()
.ofType(String.class)
.withValuesConvertedBy(new PatternTypeConverter())
.defaultsTo(PatternType.LITERAL);
addOpt = parser.accepts("add", "Indicates you are trying to add ACLs.");
removeOpt = parser.accepts("remove", "Indicates you are trying to remove ACLs.");
listOpt = parser.accepts("list", "List ACLs for the specified resource, use --topic <topic> or --group <group> or --cluster to specify a resource.");
operationsOpt = parser.accepts("operation", "Operation that is being allowed or denied. Valid operation names are: " + NL +
AclEntry.ACL_OPERATIONS.stream().map(o -> "\t" + SecurityUtils.operationName(o)).collect(Collectors.joining(NL)) + NL)
.withRequiredArg()
.ofType(String.class)
.defaultsTo(SecurityUtils.operationName(AclOperation.ALL));
allowPrincipalsOpt = parser.accepts("allow-principal", "principal is in principalType:name format." +
" Note that principalType must be supported by the Authorizer being used." +
" For example, User:'*' is the wild card indicating all users.")
.withRequiredArg()
.describedAs("allow-principal")
.ofType(String.class);
denyPrincipalsOpt = parser.accepts("deny-principal", "principal is in principalType:name format. " +
"By default anyone not added through --allow-principal is denied access. " +
"You only need to use this option as negation to already allowed set. " +
"Note that principalType must be supported by the Authorizer being used. " +
"For example if you wanted to allow access to all users in the system but not test-user you can define an ACL that " +
"allows access to User:'*' and specify --deny-principal=User:test@EXAMPLE.COM. " +
"AND PLEASE REMEMBER DENY RULES TAKES PRECEDENCE OVER ALLOW RULES.")
.withRequiredArg()
.describedAs("deny-principal")
.ofType(String.class);
listPrincipalsOpt = parser.accepts("principal", "List ACLs for the specified principal. principal is in principalType:name format." +
" Note that principalType must be supported by the Authorizer being used. Multiple --principal option can be passed.")
.withOptionalArg()
.describedAs("principal")
.ofType(String.class);
allowHostsOpt = parser.accepts("allow-host", "Host from which principals listed in --allow-principal will have access. " +
"If you have specified --allow-principal then the default for this option will be set to '*' which allows access from all hosts.")
.withRequiredArg()
.describedAs("allow-host")
.ofType(String.class);
denyHostsOpt = parser.accepts("deny-host", "Host from which principals listed in --deny-principal will be denied access. " +
"If you have specified --deny-principal then the default for this option will be set to '*' which denies access from all hosts.")
.withRequiredArg()
.describedAs("deny-host")
.ofType(String.class);
producerOpt = parser.accepts("producer", "Convenience option to add/remove ACLs for producer role. " +
"This will generate ACLs that allows WRITE,DESCRIBE and CREATE on topic.");
consumerOpt = parser.accepts("consumer", "Convenience option to add/remove ACLs for consumer role. " +
"This will generate ACLs that allows READ,DESCRIBE on topic and READ on group.");
forceOpt = parser.accepts("force", "Assume Yes to all queries and do not prompt.");
userPrincipalOpt = parser.accepts("user-principal", "Specifies a user principal as a resource in relation with the operation. For instance " +
"one could grant CreateTokens or DescribeTokens permission on a given user principal.")
.withRequiredArg()
.describedAs("user-principal")
.ofType(String.class);
try {
options = parser.parse(args);
} catch (OptionException e) {
CommandLineUtils.printUsageAndExit(parser, e.getMessage());
}
checkArgs();
}
void checkArgs() {
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to manage acls on kafka.");
if (options.has(bootstrapServerOpt) && options.has(bootstrapControllerOpt)) {
CommandLineUtils.printUsageAndExit(parser, "Only one of --bootstrap-server or --bootstrap-controller must be specified");
}
if (!options.has(bootstrapServerOpt) && !options.has(bootstrapControllerOpt)) {
CommandLineUtils.printUsageAndExit(parser, "One of --bootstrap-server or --bootstrap-controller must be specified");
}
List<AbstractOptionSpec<?>> mutuallyExclusiveOptions = Arrays.asList(addOpt, removeOpt, listOpt);
long mutuallyExclusiveOptionsCount = mutuallyExclusiveOptions.stream()
.filter(abstractOptionSpec -> options.has(abstractOptionSpec))
.count();
if (mutuallyExclusiveOptionsCount != 1) {
CommandLineUtils.printUsageAndExit(parser, "Command must include exactly one action: --list, --add, --remove. ");
}
CommandLineUtils.checkInvalidArgs(parser, options, listOpt, producerOpt, consumerOpt, allowHostsOpt, allowPrincipalsOpt, denyHostsOpt, denyPrincipalsOpt);
//when --producer or --consumer is specified , user should not specify operations as they are inferred and we also disallow --deny-principals and --deny-hosts.
CommandLineUtils.checkInvalidArgs(parser, options, producerOpt, operationsOpt, denyPrincipalsOpt, denyHostsOpt);
CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt, operationsOpt, denyPrincipalsOpt, denyHostsOpt);
if (options.has(listPrincipalsOpt) && !options.has(listOpt)) {
CommandLineUtils.printUsageAndExit(parser, "The --principal option is only available if --list is set");
}
if (options.has(producerOpt) && !options.has(topicOpt)) {
CommandLineUtils.printUsageAndExit(parser, "With --producer you must specify a --topic");
}
if (options.has(idempotentOpt) && !options.has(producerOpt)) {
CommandLineUtils.printUsageAndExit(parser, "The --idempotent option is only available if --producer is set");
}
if (options.has(consumerOpt) && (!options.has(topicOpt) || !options.has(groupOpt) || (!options.has(producerOpt) && (options.has(clusterOpt) || options.has(transactionalIdOpt))))) {
CommandLineUtils.printUsageAndExit(parser, "With --consumer you must specify a --topic and a --group and no --cluster or --transactional-id option should be specified.");
}
}
}
static class PatternTypeConverter extends EnumConverter<PatternType> {
PatternTypeConverter() {
super(PatternType.class);
}
@Override
public PatternType convert(String value) {
PatternType patternType = super.convert(value);
if (patternType.isUnknown())
throw new ValueConversionException("Unknown resource-pattern-type: " + value);
return patternType;
}
@Override
public String valuePattern() {
List<PatternType> values = Arrays.asList(PatternType.values());
List<PatternType> filteredValues = values.stream()
.filter(type -> type != PatternType.UNKNOWN)
.collect(Collectors.toList());
return filteredValues.stream()
.map(Object::toString)
.collect(Collectors.joining("|"));
}
}
}

View File

@ -14,9 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin;
import kafka.admin.AclCommand.AclCommandOptions;
package org.apache.kafka.tools;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBindingFilter;
@ -44,15 +42,9 @@ import org.apache.logging.log4j.Level;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -63,9 +55,6 @@ import java.util.stream.Collectors;
import javax.management.InstanceAlreadyExistsException;
import scala.Console;
import scala.jdk.javaapi.CollectionConverters;
import static org.apache.kafka.common.acl.AccessControlEntryFilter.ANY;
import static org.apache.kafka.common.acl.AclOperation.ALTER;
import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS;
@ -120,102 +109,98 @@ public class AclCommandTest {
private static final String TOPIC = "--topic";
private static final String RESOURCE_PATTERN_TYPE = "--resource-pattern-type";
private static final KafkaPrincipal PRINCIPAL = SecurityUtils.parseKafkaPrincipal("User:test2");
private static final Set<KafkaPrincipal> USERS = new HashSet<>(Arrays.asList(
SecurityUtils.parseKafkaPrincipal("User:CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"),
PRINCIPAL,
SecurityUtils.parseKafkaPrincipal("User:CN=\\#User with special chars in CN : (\\, \\+ \" \\ \\< \\> \\; ')")
));
private static final Set<String> HOSTS = new HashSet<>(Arrays.asList("host1", "host2"));
private static final List<String> ALLOW_HOST_COMMAND = Arrays.asList("--allow-host", "host1", "--allow-host", "host2");
private static final List<String> DENY_HOST_COMMAND = Arrays.asList("--deny-host", "host1", "--deny-host", "host2");
private static final Set<KafkaPrincipal> USERS = Set.of(
SecurityUtils.parseKafkaPrincipal("User:CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"),
PRINCIPAL,
SecurityUtils.parseKafkaPrincipal("User:CN=\\#User with special chars in CN : (\\, \\+ \" \\ \\< \\> \\; ')")
);
private static final Set<String> HOSTS = Set.of("host1", "host2");
private static final List<String> ALLOW_HOST_COMMAND = List.of("--allow-host", "host1", "--allow-host", "host2");
private static final List<String> DENY_HOST_COMMAND = List.of("--deny-host", "host1", "--deny-host", "host2");
private static final ResourcePattern CLUSTER_RESOURCE = new ResourcePattern(CLUSTER, Resource.CLUSTER_NAME, LITERAL);
private static final Set<ResourcePattern> TOPIC_RESOURCES = new HashSet<>(Arrays.asList(
new ResourcePattern(ResourceType.TOPIC, "test-1", LITERAL),
new ResourcePattern(ResourceType.TOPIC, "test-2", LITERAL)
));
private static final Set<ResourcePattern> GROUP_RESOURCES = new HashSet<>(Arrays.asList(
new ResourcePattern(ResourceType.GROUP, "testGroup-1", LITERAL),
new ResourcePattern(ResourceType.GROUP, "testGroup-2", LITERAL)
));
private static final Set<ResourcePattern> TRANSACTIONAL_ID_RESOURCES = new HashSet<>(Arrays.asList(
private static final Set<ResourcePattern> TOPIC_RESOURCES = Set.of(
new ResourcePattern(ResourceType.TOPIC, "test-1", LITERAL),
new ResourcePattern(ResourceType.TOPIC, "test-2", LITERAL)
);
private static final Set<ResourcePattern> GROUP_RESOURCES = Set.of(
new ResourcePattern(ResourceType.GROUP, "testGroup-1", LITERAL),
new ResourcePattern(ResourceType.GROUP, "testGroup-2", LITERAL)
);
private static final Set<ResourcePattern> TRANSACTIONAL_ID_RESOURCES = Set.of(
new ResourcePattern(TRANSACTIONAL_ID, "t0", LITERAL),
new ResourcePattern(TRANSACTIONAL_ID, "t1", LITERAL)
));
private static final Set<ResourcePattern> TOKEN_RESOURCES = new HashSet<>(Arrays.asList(
new ResourcePattern(DELEGATION_TOKEN, "token1", LITERAL),
new ResourcePattern(DELEGATION_TOKEN, "token2", LITERAL)
));
private static final Set<ResourcePattern> USER_RESOURCES = new HashSet<>(Arrays.asList(
new ResourcePattern(USER, "User:test-user1", LITERAL),
new ResourcePattern(USER, "User:test-user2", LITERAL)
));
);
private static final Set<ResourcePattern> TOKEN_RESOURCES = Set.of(
new ResourcePattern(DELEGATION_TOKEN, "token1", LITERAL),
new ResourcePattern(DELEGATION_TOKEN, "token2", LITERAL)
);
private static final Set<ResourcePattern> USER_RESOURCES = Set.of(
new ResourcePattern(USER, "User:test-user1", LITERAL),
new ResourcePattern(USER, "User:test-user2", LITERAL)
);
private static final Map<Set<ResourcePattern>, List<String>> RESOURCE_TO_COMMAND = new HashMap<Set<ResourcePattern>, List<String>>() {{
put(TOPIC_RESOURCES, Arrays.asList(TOPIC, "test-1", TOPIC, "test-2"));
put(Collections.singleton(CLUSTER_RESOURCE), Collections.singletonList("--cluster"));
put(GROUP_RESOURCES, Arrays.asList(GROUP, "testGroup-1", GROUP, "testGroup-2"));
put(TRANSACTIONAL_ID_RESOURCES, Arrays.asList("--transactional-id", "t0", "--transactional-id", "t1"));
put(TOKEN_RESOURCES, Arrays.asList("--delegation-token", "token1", "--delegation-token", "token2"));
put(USER_RESOURCES, Arrays.asList("--user-principal", "User:test-user1", "--user-principal", "User:test-user2"));
}};
private static final Map<Set<ResourcePattern>, List<String>> RESOURCE_TO_COMMAND = Map.of(
TOPIC_RESOURCES, List.of(TOPIC, "test-1", TOPIC, "test-2"),
Set.of(CLUSTER_RESOURCE), List.of("--cluster"),
GROUP_RESOURCES, List.of(GROUP, "testGroup-1", GROUP, "testGroup-2"),
TRANSACTIONAL_ID_RESOURCES, List.of("--transactional-id", "t0", "--transactional-id", "t1"),
TOKEN_RESOURCES, List.of("--delegation-token", "token1", "--delegation-token", "token2"),
USER_RESOURCES, List.of("--user-principal", "User:test-user1", "--user-principal", "User:test-user2")
);
private static final Map<Set<ResourcePattern>, Map.Entry<Set<AclOperation>, List<String>>> RESOURCE_TO_OPERATIONS =
new HashMap<Set<ResourcePattern>, Map.Entry<Set<AclOperation>, List<String>>>() {{
put(TOPIC_RESOURCES, new SimpleImmutableEntry<>(
new HashSet<>(Arrays.asList(READ, WRITE, CREATE, DESCRIBE, DELETE, DESCRIBE_CONFIGS, ALTER_CONFIGS, ALTER)),
Arrays.asList(OPERATION, "Read", OPERATION, "Write", OPERATION, "Create",
OPERATION, "Describe", OPERATION, "Delete", OPERATION, "DescribeConfigs",
OPERATION, "AlterConfigs", OPERATION, "Alter"))
);
put(Collections.singleton(CLUSTER_RESOURCE), new SimpleImmutableEntry<>(
new HashSet<>(Arrays.asList(CREATE, CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, ALTER, DESCRIBE)),
Arrays.asList(OPERATION, "Create", OPERATION, "ClusterAction", OPERATION, "DescribeConfigs",
OPERATION, "AlterConfigs", OPERATION, "IdempotentWrite", OPERATION, "Alter", OPERATION, "Describe"))
);
put(GROUP_RESOURCES, new SimpleImmutableEntry<>(
new HashSet<>(Arrays.asList(READ, DESCRIBE, DELETE)),
Arrays.asList(OPERATION, "Read", OPERATION, "Describe", OPERATION, "Delete"))
);
put(TRANSACTIONAL_ID_RESOURCES, new SimpleImmutableEntry<>(
new HashSet<>(Arrays.asList(DESCRIBE, WRITE)),
Arrays.asList(OPERATION, "Describe", OPERATION, "Write"))
);
put(TOKEN_RESOURCES, new SimpleImmutableEntry<>(Collections.singleton(DESCRIBE), Arrays.asList(OPERATION, "Describe")));
put(USER_RESOURCES, new SimpleImmutableEntry<>(
new HashSet<>(Arrays.asList(CREATE_TOKENS, DESCRIBE_TOKENS)),
Arrays.asList(OPERATION, "CreateTokens", OPERATION, "DescribeTokens"))
);
}};
private static final Map<Set<ResourcePattern>, Map.Entry<Set<AclOperation>, List<String>>> RESOURCE_TO_OPERATIONS = Map.of(
TOPIC_RESOURCES, Map.entry(
Set.of(READ, WRITE, CREATE, DESCRIBE, DELETE, DESCRIBE_CONFIGS, ALTER_CONFIGS, ALTER),
List.of(OPERATION, "Read", OPERATION, "Write", OPERATION, "Create",
OPERATION, "Describe", OPERATION, "Delete", OPERATION, "DescribeConfigs",
OPERATION, "AlterConfigs", OPERATION, "Alter")),
Set.of(CLUSTER_RESOURCE), Map.entry(
Set.of(CREATE, CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, ALTER, DESCRIBE),
List.of(OPERATION, "Create", OPERATION, "ClusterAction", OPERATION, "DescribeConfigs",
OPERATION, "AlterConfigs", OPERATION, "IdempotentWrite", OPERATION, "Alter", OPERATION, "Describe")),
GROUP_RESOURCES, Map.entry(
Set.of(READ, DESCRIBE, DELETE),
List.of(OPERATION, "Read", OPERATION, "Describe", OPERATION, "Delete")),
TRANSACTIONAL_ID_RESOURCES, Map.entry(
Set.of(DESCRIBE, WRITE),
List.of(OPERATION, "Describe", OPERATION, "Write")),
TOKEN_RESOURCES, Map.entry(
Set.of(DESCRIBE),
List.of(OPERATION, "Describe")),
USER_RESOURCES, Map.entry(
Set.of(CREATE_TOKENS, DESCRIBE_TOKENS),
List.of(OPERATION, "CreateTokens", OPERATION, "DescribeTokens"))
);
private static final Map<Set<ResourcePattern>, Set<AccessControlEntry>> CONSUMER_RESOURCE_TO_ACLS =
new HashMap<Set<ResourcePattern>, Set<AccessControlEntry>>() {{
put(TOPIC_RESOURCES, asJavaSet(AclCommand.getAcls(asScalaSet(USERS), ALLOW,
asScalaSet(new HashSet<>(Arrays.asList(READ, DESCRIBE))), asScalaSet(HOSTS))));
put(GROUP_RESOURCES, asJavaSet(AclCommand.getAcls(asScalaSet(USERS), ALLOW,
asScalaSet(Collections.singleton(READ)), asScalaSet(HOSTS))));
}};
private static final Map<Set<ResourcePattern>, Set<AccessControlEntry>> CONSUMER_RESOURCE_TO_ACLS = Map.of(
TOPIC_RESOURCES, AclCommand.getAcls(USERS, ALLOW, Set.of(READ, DESCRIBE), HOSTS),
GROUP_RESOURCES, AclCommand.getAcls(USERS, ALLOW, Set.of(READ), HOSTS)
);
private static final Map<List<String>, Map<Set<ResourcePattern>, Set<AccessControlEntry>>> CMD_TO_RESOURCES_TO_ACL =
new HashMap<List<String>, Map<Set<ResourcePattern>, Set<AccessControlEntry>>>() {{
put(Collections.singletonList(PRODUCER), producerResourceToAcls(false));
put(Arrays.asList(PRODUCER, IDEMPOTENT), producerResourceToAcls(true));
put(Collections.singletonList(CONSUMER), CONSUMER_RESOURCE_TO_ACLS);
put(Arrays.asList(PRODUCER, CONSUMER),
CONSUMER_RESOURCE_TO_ACLS.entrySet().stream().map(entry -> {
Set<AccessControlEntry> value = new HashSet<>(entry.getValue());
value.addAll(producerResourceToAcls(false)
.getOrDefault(entry.getKey(), Collections.emptySet()));
return new SimpleEntry<>(entry.getKey(), value);
}).collect(Collectors.toMap(Entry::getKey, Entry::getValue)));
put(Arrays.asList(PRODUCER, IDEMPOTENT, CONSUMER),
CONSUMER_RESOURCE_TO_ACLS.entrySet().stream().map(entry -> {
Set<AccessControlEntry> value = new HashSet<>(entry.getValue());
value.addAll(producerResourceToAcls(true)
.getOrDefault(entry.getKey(), Collections.emptySet()));
return new SimpleEntry<>(entry.getKey(), value);
}).collect(Collectors.toMap(Entry::getKey, Entry::getValue)));
}};
private static final Map<List<String>, Map<Set<ResourcePattern>, Set<AccessControlEntry>>> CMD_TO_RESOURCES_TO_ACL = Map.of(
List.of(PRODUCER), producerResourceToAcls(false),
List.of(PRODUCER, IDEMPOTENT), producerResourceToAcls(true),
List.of(CONSUMER), CONSUMER_RESOURCE_TO_ACLS,
List.of(PRODUCER, CONSUMER),
CONSUMER_RESOURCE_TO_ACLS.entrySet().stream().collect(Collectors.toMap(
Map.Entry::getKey,
entry -> {
Set<AccessControlEntry> value = new HashSet<>(entry.getValue());
value.addAll(producerResourceToAcls(false).getOrDefault(entry.getKey(), Set.of()));
return value;
}
)),
List.of(PRODUCER, IDEMPOTENT, CONSUMER),
CONSUMER_RESOURCE_TO_ACLS.entrySet().stream().collect(Collectors.toMap(
Map.Entry::getKey,
entry -> {
Set<AccessControlEntry> value = new HashSet<>(entry.getValue());
value.addAll(producerResourceToAcls(true).getOrDefault(entry.getKey(), Set.of()));
return value;
}
))
);
@ClusterTest
public void testAclCliWithAdminAPI(ClusterInstance cluster) throws InterruptedException {
@ -295,7 +280,7 @@ public class AclCommandTest {
@Test
public void testUseBootstrapServerOptWithBootstrapControllerOpt() {
assertInitializeInvalidOptionsExitCodeAndMsg(
Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, BOOTSTRAP_CONTROLLER, LOCALHOST),
List.of(BOOTSTRAP_SERVER, LOCALHOST, BOOTSTRAP_CONTROLLER, LOCALHOST),
"Only one of --bootstrap-server or --bootstrap-controller must be specified"
);
}
@ -303,7 +288,7 @@ public class AclCommandTest {
@Test
public void testUseWithoutBootstrapServerOptAndBootstrapControllerOpt() {
assertInitializeInvalidOptionsExitCodeAndMsg(
Collections.emptyList(),
List.of(ADD),
"One of --bootstrap-server or --bootstrap-controller must be specified"
);
}
@ -311,14 +296,14 @@ public class AclCommandTest {
@Test
public void testExactlyOneAction() {
String errMsg = "Command must include exactly one action: --list, --add, --remove. ";
assertInitializeInvalidOptionsExitCodeAndMsg(Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, ADD, LIST), errMsg);
assertInitializeInvalidOptionsExitCodeAndMsg(Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, ADD, LIST, REMOVE), errMsg);
assertInitializeInvalidOptionsExitCodeAndMsg(List.of(BOOTSTRAP_SERVER, LOCALHOST, ADD, LIST), errMsg);
assertInitializeInvalidOptionsExitCodeAndMsg(List.of(BOOTSTRAP_SERVER, LOCALHOST, ADD, LIST, REMOVE), errMsg);
}
@Test
public void testUseListPrincipalsOptWithoutListOpt() {
assertInitializeInvalidOptionsExitCodeAndMsg(
Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, ADD, "--principal", "User:CN=client"),
List.of(BOOTSTRAP_SERVER, LOCALHOST, ADD, "--principal", "User:CN=client"),
"The --principal option is only available if --list is set"
);
}
@ -326,7 +311,7 @@ public class AclCommandTest {
@Test
public void testUseProducerOptWithoutTopicOpt() {
assertInitializeInvalidOptionsExitCodeAndMsg(
Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, ADD, PRODUCER),
List.of(BOOTSTRAP_SERVER, LOCALHOST, ADD, PRODUCER),
"With --producer you must specify a --topic"
);
}
@ -334,7 +319,7 @@ public class AclCommandTest {
@Test
public void testUseIdempotentOptWithoutProducerOpt() {
assertInitializeInvalidOptionsExitCodeAndMsg(
Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, ADD, IDEMPOTENT),
List.of(BOOTSTRAP_SERVER, LOCALHOST, ADD, IDEMPOTENT),
"The --idempotent option is only available if --producer is set"
);
}
@ -342,24 +327,24 @@ public class AclCommandTest {
@Test
public void testUseConsumerOptWithoutRequiredOpt() {
assertInitializeInvalidOptionsExitCodeAndMsg(
Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, ADD, CONSUMER),
List.of(BOOTSTRAP_SERVER, LOCALHOST, ADD, CONSUMER),
"With --consumer you must specify a --topic and a --group and no --cluster or --transactional-id option should be specified."
);
checkNotThrow(Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, ADD, CONSUMER, TOPIC, "test-topic", GROUP, "test-group"));
checkNotThrow(List.of(BOOTSTRAP_SERVER, LOCALHOST, ADD, CONSUMER, TOPIC, "test-topic", GROUP, "test-group"));
}
@Test
public void testInvalidArgs() {
assertInitializeInvalidOptionsExitCodeAndMsg(
Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, LIST, PRODUCER),
List.of(BOOTSTRAP_SERVER, LOCALHOST, LIST, PRODUCER),
"Option \"[list]\" can't be used with option \"[producer]\""
);
assertInitializeInvalidOptionsExitCodeAndMsg(
Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, ADD, PRODUCER, OPERATION),
List.of(BOOTSTRAP_SERVER, LOCALHOST, ADD, PRODUCER, OPERATION, "all"),
"Option \"[producer]\" can't be used with option \"[operation]\""
);
assertInitializeInvalidOptionsExitCodeAndMsg(
Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, ADD, CONSUMER, OPERATION, TOPIC, "test-topic", GROUP, "test-group"),
List.of(BOOTSTRAP_SERVER, LOCALHOST, ADD, CONSUMER, OPERATION, TOPIC, "test-topic", GROUP, "test-group"),
"Option \"[consumer]\" can't be used with option \"[operation]\""
);
}
@ -394,8 +379,7 @@ public class AclCommandTest {
}
private void testAclsOnPrefixedResources(ClusterInstance cluster, List<String> cmdArgs) throws InterruptedException {
List<String> cmd = Arrays.asList("--allow-principal", PRINCIPAL.toString(), PRODUCER, TOPIC, "Test-",
RESOURCE_PATTERN_TYPE, "Prefixed");
List<String> cmd = List.of("--allow-principal", PRINCIPAL.toString(), PRODUCER, TOPIC, "Test-", RESOURCE_PATTERN_TYPE, "Prefixed");
List<String> args = new ArrayList<>(cmdArgs);
args.addAll(cmd);
@ -406,7 +390,7 @@ public class AclCommandTest {
AccessControlEntry describeAcl = new AccessControlEntry(PRINCIPAL.toString(), WILDCARD_HOST, DESCRIBE, ALLOW);
AccessControlEntry createAcl = new AccessControlEntry(PRINCIPAL.toString(), WILDCARD_HOST, CREATE, ALLOW);
cluster.waitAcls(new AclBindingFilter(new ResourcePattern(ResourceType.TOPIC, "Test-", PREFIXED).toFilter(), ANY),
Arrays.asList(writeAcl, describeAcl, createAcl));
List.of(writeAcl, describeAcl, createAcl));
args = new ArrayList<>(cmdArgs);
args.addAll(cmd);
@ -415,45 +399,42 @@ public class AclCommandTest {
callMain(args);
cluster.waitAcls(new AclBindingFilter(new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PREFIXED).toFilter(), ANY),
Collections.emptySet());
Set.of());
cluster.waitAcls(new AclBindingFilter(new ResourcePattern(ResourceType.TOPIC, "Test-", PREFIXED).toFilter(), ANY),
Collections.emptySet());
Set.of());
}
private static Map<Set<ResourcePattern>, Set<AccessControlEntry>> producerResourceToAcls(boolean enableIdempotence) {
Map<Set<ResourcePattern>, Set<AccessControlEntry>> result = new HashMap<>();
result.put(TOPIC_RESOURCES, asJavaSet(AclCommand.getAcls(asScalaSet(USERS), ALLOW, asScalaSet(
new HashSet<>(Arrays.asList(WRITE, DESCRIBE, CREATE))), asScalaSet(HOSTS))));
result.put(TRANSACTIONAL_ID_RESOURCES, asJavaSet(AclCommand.getAcls(asScalaSet(USERS), ALLOW, asScalaSet(
new HashSet<>(Arrays.asList(WRITE, DESCRIBE))), asScalaSet(HOSTS))));
result.put(Collections.singleton(CLUSTER_RESOURCE), asJavaSet(AclCommand.getAcls(asScalaSet(USERS), ALLOW,
return Map.of(
TOPIC_RESOURCES, AclCommand.getAcls(USERS, ALLOW, Set.of(WRITE, DESCRIBE, CREATE), HOSTS),
TRANSACTIONAL_ID_RESOURCES, AclCommand.getAcls(USERS, ALLOW, Set.of(WRITE, DESCRIBE), HOSTS),
Set.of(CLUSTER_RESOURCE), AclCommand.getAcls(USERS, ALLOW,
enableIdempotence
? asScalaSet(Collections.singleton(IDEMPOTENT_WRITE))
: asScalaSet(Collections.emptySet()), asScalaSet(HOSTS))));
return result;
? Set.of(IDEMPOTENT_WRITE)
: Set.of(), HOSTS));
}
private List<String> adminArgs(String bootstrapServer, Optional<File> commandConfig) {
List<String> adminArgs = new ArrayList<>(Arrays.asList(BOOTSTRAP_SERVER, bootstrapServer));
commandConfig.ifPresent(file -> adminArgs.addAll(Arrays.asList(COMMAND_CONFIG, file.getAbsolutePath())));
List<String> adminArgs = new ArrayList<>(List.of(BOOTSTRAP_SERVER, bootstrapServer));
commandConfig.ifPresent(file -> adminArgs.addAll(List.of(COMMAND_CONFIG, file.getAbsolutePath())));
return adminArgs;
}
private List<String> adminArgsWithBootstrapController(String bootstrapController, Optional<File> commandConfig) {
List<String> adminArgs = new ArrayList<>(Arrays.asList(BOOTSTRAP_CONTROLLER, bootstrapController));
commandConfig.ifPresent(file -> adminArgs.addAll(Arrays.asList(COMMAND_CONFIG, file.getAbsolutePath())));
List<String> adminArgs = new ArrayList<>(List.of(BOOTSTRAP_CONTROLLER, bootstrapController));
commandConfig.ifPresent(file -> adminArgs.addAll(List.of(COMMAND_CONFIG, file.getAbsolutePath())));
return adminArgs;
}
private Map.Entry<String, String> callMain(List<String> args) {
return grabConsoleOutputAndError(() -> AclCommand.main(args.toArray(new String[0])));
return ToolsTestUtils.grabConsoleOutputAndError(() -> AclCommand.main(args.toArray(new String[0])));
}
private void testAclCli(ClusterInstance cluster, List<String> cmdArgs) throws InterruptedException {
for (Map.Entry<Set<ResourcePattern>, List<String>> entry : RESOURCE_TO_COMMAND.entrySet()) {
Set<ResourcePattern> resources = entry.getKey();
List<String> resourceCmd = entry.getValue();
Set<AclPermissionType> permissionTypes = new HashSet<>(Arrays.asList(ALLOW, DENY));
Set<AclPermissionType> permissionTypes = Set.of(ALLOW, DENY);
for (AclPermissionType permissionType : permissionTypes) {
Map.Entry<Set<AclOperation>, List<String>> operationToCmd = RESOURCE_TO_OPERATIONS.get(resources);
Map.Entry<Set<AccessControlEntry>, List<String>> aclToCommand = getAclToCommand(permissionType, operationToCmd.getKey());
@ -494,8 +475,8 @@ public class AclCommandTest {
String resourceType = resource.resourceType().toString();
List<String> cmd = resource == CLUSTER_RESOURCE
? Collections.singletonList("kafka-cluster")
: resourceCmd.stream().filter(s -> !s.startsWith("--")).collect(Collectors.toList());
? List.of("kafka-cluster")
: resourceCmd.stream().filter(s -> !s.startsWith("--")).toList();
cmd.forEach(name -> {
String expected = String.format(
@ -509,36 +490,37 @@ public class AclCommandTest {
private void testPatternTypes(List<String> cmdArgs) {
Exit.setExitProcedure((status, message) -> {
if ((int) status == 1)
if (status == 1)
throw new RuntimeException("Exiting command");
else
throw new AssertionError("Unexpected exit with status " + status);
});
try {
Arrays.stream(PatternType.values()).sequential().forEach(patternType -> {
for (PatternType patternType : PatternType.values()) {
List<String> addCmd = new ArrayList<>(cmdArgs);
addCmd.addAll(Arrays.asList("--allow-principal", PRINCIPAL.toString(), PRODUCER, TOPIC, "Test",
addCmd.addAll(List.of("--allow-principal", PRINCIPAL.toString(), PRODUCER, TOPIC, "Test",
ADD, RESOURCE_PATTERN_TYPE, patternType.toString()));
verifyPatternType(addCmd, patternType.isSpecific());
List<String> listCmd = new ArrayList<>(cmdArgs);
listCmd.addAll(Arrays.asList(TOPIC, "Test", LIST, RESOURCE_PATTERN_TYPE, patternType.toString()));
listCmd.addAll(List.of(TOPIC, "Test", LIST, RESOURCE_PATTERN_TYPE, patternType.toString()));
verifyPatternType(listCmd, patternType != PatternType.UNKNOWN);
List<String> removeCmd = new ArrayList<>(cmdArgs);
removeCmd.addAll(Arrays.asList(TOPIC, "Test", "--force", REMOVE, RESOURCE_PATTERN_TYPE, patternType.toString()));
removeCmd.addAll(List.of(TOPIC, "Test", "--force", REMOVE, RESOURCE_PATTERN_TYPE, patternType.toString()));
verifyPatternType(removeCmd, patternType != PatternType.UNKNOWN);
});
}
} finally {
Exit.resetExitProcedure();
}
}
private void verifyPatternType(List<String> cmd, boolean isValid) {
if (isValid)
if (isValid) {
callMain(cmd);
else
} else {
assertThrows(RuntimeException.class, () -> callMain(cmd));
}
}
private void testRemove(
@ -554,7 +536,7 @@ public class AclCommandTest {
Map.Entry<String, String> out = callMain(args);
assertEquals("", out.getValue());
for (ResourcePattern resource : resources) {
cluster.waitAcls(new AclBindingFilter(resource.toFilter(), ANY), Collections.emptySet());
cluster.waitAcls(new AclBindingFilter(resource.toFilter(), ANY), Set.of());
}
}
@ -562,8 +544,8 @@ public class AclCommandTest {
AclPermissionType permissionType,
Set<AclOperation> operations
) {
return new SimpleImmutableEntry<>(
asJavaSet(AclCommand.getAcls(asScalaSet(USERS), permissionType, asScalaSet(operations), asScalaSet(HOSTS))),
return Map.entry(
AclCommand.getAcls(USERS, permissionType, operations, HOSTS),
getCmd(permissionType)
);
}
@ -575,44 +557,12 @@ public class AclCommandTest {
List<String> fullCmd = new ArrayList<>();
for (KafkaPrincipal user : USERS) {
fullCmd.addAll(cmd);
fullCmd.addAll(Arrays.asList(principalCmd, user.toString()));
fullCmd.addAll(List.of(principalCmd, user.toString()));
}
return fullCmd;
}
/**
* Capture both the console output and console error during the execution of the provided function.
*/
private static Map.Entry<String, String> grabConsoleOutputAndError(Runnable runnable) {
ByteArrayOutputStream outBuf = new ByteArrayOutputStream();
ByteArrayOutputStream errBuf = new ByteArrayOutputStream();
PrintStream out = new PrintStream(outBuf);
PrintStream err = new PrintStream(errBuf);
try {
Console.withOut(out, () -> {
Console.withErr(err, () -> {
runnable.run();
return null;
});
return null;
});
} finally {
out.flush();
err.flush();
}
return new SimpleImmutableEntry<>(outBuf.toString(), errBuf.toString());
}
private static <T> scala.collection.immutable.Set<T> asScalaSet(Set<T> javaSet) {
return CollectionConverters.asScala(javaSet).toSet();
}
private static <T> Set<T> asJavaSet(scala.collection.immutable.Set<T> scalaSet) {
return CollectionConverters.asJava(scalaSet);
}
private void assertInitializeInvalidOptionsExitCodeAndMsg(List<String> args, String expectedMsg) {
Exit.setExitProcedure((exitCode, message) -> {
assertEquals(1, exitCode);
@ -620,7 +570,7 @@ public class AclCommandTest {
throw new RuntimeException();
});
try {
assertThrows(RuntimeException.class, () -> new AclCommandOptions(args.toArray(new String[0])).checkArgs());
assertThrows(RuntimeException.class, () -> new AclCommand.AclCommandOptions(args.toArray(new String[0])).checkArgs());
} finally {
Exit.resetExitProcedure();
}
@ -628,15 +578,15 @@ public class AclCommandTest {
private void checkNotThrow(List<String> args) {
AtomicReference<Integer> exitStatus = new AtomicReference<>();
org.apache.kafka.common.utils.Exit.setExitProcedure((status, __) -> {
Exit.setExitProcedure((status, __) -> {
exitStatus.set(status);
throw new RuntimeException();
});
try {
assertDoesNotThrow(() -> new AclCommandOptions(args.toArray(new String[0])).checkArgs());
assertDoesNotThrow(() -> new AclCommand.AclCommandOptions(args.toArray(new String[0])).checkArgs());
assertNull(exitStatus.get());
} finally {
org.apache.kafka.common.utils.Exit.resetExitProcedure();
Exit.resetExitProcedure();
}
}
}