diff --git a/bin/kafka-acls.sh b/bin/kafka-acls.sh
index 8fa65542e10..ffbb1e19810 100755
--- a/bin/kafka-acls.sh
+++ b/bin/kafka-acls.sh
@@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-exec $(dirname $0)/kafka-run-class.sh kafka.admin.AclCommand "$@"
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.AclCommand "$@"
diff --git a/bin/windows/kafka-acls.bat b/bin/windows/kafka-acls.bat
index 8f0be85c045..12c4a9a69a7 100644
--- a/bin/windows/kafka-acls.bat
+++ b/bin/windows/kafka-acls.bat
@@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
-"%~dp0kafka-run-class.bat" kafka.admin.AclCommand %*
+"%~dp0kafka-run-class.bat" org.apache.kafka.tools.AclCommand %*
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 0b9e7dd717b..e3382fb3db3 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -290,6 +290,8 @@
+
+
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 2f8ff2d84c2..2d05fac1e70 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -269,11 +269,11 @@
+ files="(AclCommand|ConsoleConsumer|DefaultMessageFormatter|StreamsResetter|ProducerPerformance|Agent).java"/>
+ files="(AclCommand|DefaultMessageFormatter|ProducerPerformance|StreamsResetter|Agent|TransactionalMessageCopier|ReplicaVerificationTool|LineMessageReader).java"/>
JResource, ResourceType => JResourceType}
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.utils.{Exit, Utils, SecurityUtils => JSecurityUtils}
-import org.apache.kafka.security.authorizer.AclEntry
-import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
-
-import scala.jdk.CollectionConverters._
-import scala.collection.mutable
-import scala.io.StdIn
-
-object AclCommand extends Logging {
-
- private val ClusterResourceFilter = new ResourcePatternFilter(JResourceType.CLUSTER, JResource.CLUSTER_NAME, PatternType.LITERAL)
-
- private val Newline = scala.util.Properties.lineSeparator
-
- def main(args: Array[String]): Unit = {
-
- val opts = new AclCommandOptions(args)
-
- CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to manage acls on kafka.")
-
- opts.checkArgs()
-
- val aclCommandService = new AdminClientService(opts)
-
- try {
- if (opts.options.has(opts.addOpt))
- aclCommandService.addAcls()
- else if (opts.options.has(opts.removeOpt))
- aclCommandService.removeAcls()
- else if (opts.options.has(opts.listOpt))
- aclCommandService.listAcls()
- } catch {
- case e: Throwable =>
- println(s"Error while executing ACL command: ${e.getMessage}")
- println(Utils.stackTrace(e))
- Exit.exit(1)
- }
- }
-
- private class AdminClientService(val opts: AclCommandOptions) extends Logging {
-
- private def withAdminClient(opts: AclCommandOptions)(f: Admin => Unit): Unit = {
- val props = if (opts.options.has(opts.commandConfigOpt))
- Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
- else
- new Properties()
-
- if (opts.options.has(opts.bootstrapServerOpt)) {
- props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
- } else {
- props.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, opts.options.valueOf(opts.bootstrapControllerOpt))
- }
- val adminClient = Admin.create(props)
-
- try {
- f(adminClient)
- } finally {
- adminClient.close()
- }
- }
-
- def addAcls(): Unit = {
- val resourceToAcl = getResourceToAcls(opts)
- withAdminClient(opts) { adminClient =>
- for ((resource, acls) <- resourceToAcl) {
- println(s"Adding ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
- val aclBindings = acls.map(acl => new AclBinding(resource, acl)).asJavaCollection
- adminClient.createAcls(aclBindings).all().get()
- }
- }
- }
-
- def removeAcls(): Unit = {
- withAdminClient(opts) { adminClient =>
- val filterToAcl = getResourceFilterToAcls(opts)
-
- for ((filter, acls) <- filterToAcl) {
- if (acls.isEmpty) {
- if (confirmAction(opts, s"Are you sure you want to delete all ACLs for resource filter `$filter`? (y/n)"))
- removeAcls(adminClient, acls, filter)
- } else {
- if (confirmAction(opts, s"Are you sure you want to remove ACLs: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline from resource filter `$filter`? (y/n)"))
- removeAcls(adminClient, acls, filter)
- }
- }
- }
- }
-
- def listAcls(): Unit = {
- withAdminClient(opts) { adminClient =>
- listAcls(adminClient)
- }
- }
-
- private def listAcls(adminClient: Admin): Unit = {
- val filters = getResourceFilter(opts, dieIfNoResourceFound = false)
- val listPrincipals = getPrincipals(opts, opts.listPrincipalsOpt)
- val resourceToAcls = getAcls(adminClient, filters)
-
- if (listPrincipals.isEmpty) {
- printResourceAcls(resourceToAcls)
- } else {
- listPrincipals.foreach{principal =>
- println(s"ACLs for principal `$principal`")
- val filteredResourceToAcls = resourceToAcls.map { case (resource, acls) =>
- resource -> acls.filter(acl => principal.toString.equals(acl.principal))
- }.filter { case (_, acls) => acls.nonEmpty }
- printResourceAcls(filteredResourceToAcls)
- }
- }
- }
-
- private def printResourceAcls(resourceToAcls: Map[ResourcePattern, Set[AccessControlEntry]]): Unit = {
- for ((resource, acls) <- resourceToAcls)
- println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline")
- }
-
- private def removeAcls(adminClient: Admin, acls: Set[AccessControlEntry], filter: ResourcePatternFilter): Unit = {
- if (acls.isEmpty)
- adminClient.deleteAcls(List(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).asJava).all().get()
- else {
- val aclBindingFilters = acls.map(acl => new AclBindingFilter(filter, acl.toFilter)).toList.asJava
- adminClient.deleteAcls(aclBindingFilters).all().get()
- }
- }
-
- private def getAcls(adminClient: Admin, filters: Set[ResourcePatternFilter]): Map[ResourcePattern, Set[AccessControlEntry]] = {
- val aclBindings =
- if (filters.isEmpty) adminClient.describeAcls(AclBindingFilter.ANY).values().get().asScala.toList
- else {
- val results = for (filter <- filters) yield {
- adminClient.describeAcls(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).values().get().asScala.toList
- }
- results.reduceLeft(_ ++ _)
- }
-
- val resourceToAcls = mutable.Map[ResourcePattern, Set[AccessControlEntry]]().withDefaultValue(Set())
-
- aclBindings.foreach(aclBinding => resourceToAcls(aclBinding.pattern()) = resourceToAcls(aclBinding.pattern()) + aclBinding.entry())
- resourceToAcls.toMap
- }
- }
-
- private def getResourceToAcls(opts: AclCommandOptions): Map[ResourcePattern, Set[AccessControlEntry]] = {
- val patternType = opts.options.valueOf(opts.resourcePatternType)
- if (!patternType.isSpecific)
- CommandLineUtils.printUsageAndExit(opts.parser, s"A '--resource-pattern-type' value of '$patternType' is not valid when adding acls.")
-
- val resourceToAcl = getResourceFilterToAcls(opts).map {
- case (filter, acls) =>
- new ResourcePattern(filter.resourceType(), filter.name(), filter.patternType()) -> acls
- }
-
- if (resourceToAcl.values.exists(_.isEmpty))
- CommandLineUtils.printUsageAndExit(opts.parser, "You must specify one of: --allow-principal, --deny-principal when trying to add ACLs.")
-
- resourceToAcl
- }
-
- private def getResourceFilterToAcls(opts: AclCommandOptions): Map[ResourcePatternFilter, Set[AccessControlEntry]] = {
- var resourceToAcls = Map.empty[ResourcePatternFilter, Set[AccessControlEntry]]
-
- //if none of the --producer or --consumer options are specified , just construct ACLs from CLI options.
- if (!opts.options.has(opts.producerOpt) && !opts.options.has(opts.consumerOpt)) {
- resourceToAcls ++= getCliResourceFilterToAcls(opts)
- }
-
- //users are allowed to specify both --producer and --consumer options in a single command.
- if (opts.options.has(opts.producerOpt))
- resourceToAcls ++= getProducerResourceFilterToAcls(opts)
-
- if (opts.options.has(opts.consumerOpt))
- resourceToAcls ++= getConsumerResourceFilterToAcls(opts).map { case (k, v) => k -> (v ++ resourceToAcls.getOrElse(k, Set.empty[AccessControlEntry])) }
-
- validateOperation(opts, resourceToAcls)
-
- resourceToAcls
- }
-
- private def getProducerResourceFilterToAcls(opts: AclCommandOptions): Map[ResourcePatternFilter, Set[AccessControlEntry]] = {
- val filters = getResourceFilter(opts)
-
- val topics = filters.filter(_.resourceType == JResourceType.TOPIC)
- val transactionalIds = filters.filter(_.resourceType == JResourceType.TRANSACTIONAL_ID)
- val enableIdempotence = opts.options.has(opts.idempotentOpt)
-
- val topicAcls = getAcl(opts, Set(WRITE, DESCRIBE, CREATE))
- val transactionalIdAcls = getAcl(opts, Set(WRITE, DESCRIBE))
-
- //Write, Describe, Create permission on topics, Write, Describe on transactionalIds
- topics.map(_ -> topicAcls).toMap ++
- transactionalIds.map(_ -> transactionalIdAcls).toMap ++
- (if (enableIdempotence)
- Map(ClusterResourceFilter -> getAcl(opts, Set(IDEMPOTENT_WRITE)))
- else
- Map.empty)
- }
-
- private def getConsumerResourceFilterToAcls(opts: AclCommandOptions): Map[ResourcePatternFilter, Set[AccessControlEntry]] = {
- val filters = getResourceFilter(opts)
-
- val topics = filters.filter(_.resourceType == JResourceType.TOPIC)
- val groups = filters.filter(_.resourceType == JResourceType.GROUP)
-
- //Read, Describe on topic, Read on consumerGroup
-
- val acls = getAcl(opts, Set(READ, DESCRIBE))
-
- topics.map(_ -> acls).toMap[ResourcePatternFilter, Set[AccessControlEntry]] ++
- groups.map(_ -> getAcl(opts, Set(READ))).toMap[ResourcePatternFilter, Set[AccessControlEntry]]
- }
-
- private def getCliResourceFilterToAcls(opts: AclCommandOptions): Map[ResourcePatternFilter, Set[AccessControlEntry]] = {
- val acls = getAcl(opts)
- val filters = getResourceFilter(opts)
- filters.map(_ -> acls).toMap
- }
-
- private def getAcl(opts: AclCommandOptions, operations: Set[AclOperation]): Set[AccessControlEntry] = {
- val allowedPrincipals = getPrincipals(opts, opts.allowPrincipalsOpt)
-
- val deniedPrincipals = getPrincipals(opts, opts.denyPrincipalsOpt)
-
- val allowedHosts = getHosts(opts, opts.allowHostsOpt, opts.allowPrincipalsOpt)
-
- val deniedHosts = getHosts(opts, opts.denyHostsOpt, opts.denyPrincipalsOpt)
-
- val acls = new collection.mutable.HashSet[AccessControlEntry]
- if (allowedHosts.nonEmpty && allowedPrincipals.nonEmpty)
- acls ++= getAcls(allowedPrincipals, ALLOW, operations, allowedHosts)
-
- if (deniedHosts.nonEmpty && deniedPrincipals.nonEmpty)
- acls ++= getAcls(deniedPrincipals, DENY, operations, deniedHosts)
-
- acls.toSet
- }
-
- private def getAcl(opts: AclCommandOptions): Set[AccessControlEntry] = {
- val operations = opts.options.valuesOf(opts.operationsOpt).asScala
- .map(operation => JSecurityUtils.operation(operation.trim)).toSet
- getAcl(opts, operations)
- }
-
- def getAcls(principals: Set[KafkaPrincipal], permissionType: AclPermissionType, operations: Set[AclOperation],
- hosts: Set[String]): Set[AccessControlEntry] = {
- for {
- principal <- principals
- operation <- operations
- host <- hosts
- } yield new AccessControlEntry(principal.toString, host, operation, permissionType)
- }
-
- private def getHosts(opts: AclCommandOptions, hostOptionSpec: OptionSpec[String],
- principalOptionSpec: OptionSpec[String]): Set[String] = {
- if (opts.options.has(hostOptionSpec))
- opts.options.valuesOf(hostOptionSpec).asScala.map(_.trim).toSet
- else if (opts.options.has(principalOptionSpec))
- Set[String](AclEntry.WILDCARD_HOST)
- else
- Set.empty[String]
- }
-
- private def getPrincipals(opts: AclCommandOptions, principalOptionSpec: OptionSpec[String]): Set[KafkaPrincipal] = {
- if (opts.options.has(principalOptionSpec))
- opts.options.valuesOf(principalOptionSpec).asScala.map(s => JSecurityUtils.parseKafkaPrincipal(s.trim)).toSet
- else
- Set.empty[KafkaPrincipal]
- }
-
- private def getResourceFilter(opts: AclCommandOptions, dieIfNoResourceFound: Boolean = true): Set[ResourcePatternFilter] = {
- val patternType = opts.options.valueOf(opts.resourcePatternType)
-
- var resourceFilters = Set.empty[ResourcePatternFilter]
- if (opts.options.has(opts.topicOpt))
- opts.options.valuesOf(opts.topicOpt).forEach(topic => resourceFilters += new ResourcePatternFilter(JResourceType.TOPIC, topic.trim, patternType))
-
- if (patternType == PatternType.LITERAL && (opts.options.has(opts.clusterOpt) || opts.options.has(opts.idempotentOpt)))
- resourceFilters += ClusterResourceFilter
-
- if (opts.options.has(opts.groupOpt))
- opts.options.valuesOf(opts.groupOpt).forEach(group => resourceFilters += new ResourcePatternFilter(JResourceType.GROUP, group.trim, patternType))
-
- if (opts.options.has(opts.transactionalIdOpt))
- opts.options.valuesOf(opts.transactionalIdOpt).forEach(transactionalId =>
- resourceFilters += new ResourcePatternFilter(JResourceType.TRANSACTIONAL_ID, transactionalId, patternType))
-
- if (opts.options.has(opts.delegationTokenOpt))
- opts.options.valuesOf(opts.delegationTokenOpt).forEach(token => resourceFilters += new ResourcePatternFilter(JResourceType.DELEGATION_TOKEN, token.trim, patternType))
-
- if (opts.options.has(opts.userPrincipalOpt))
- opts.options.valuesOf(opts.userPrincipalOpt).forEach(user => resourceFilters += new ResourcePatternFilter(JResourceType.USER, user.trim, patternType))
-
- if (resourceFilters.isEmpty && dieIfNoResourceFound)
- CommandLineUtils.printUsageAndExit(opts.parser, "You must provide at least one resource: --topic or --cluster or --group or --delegation-token ")
-
- resourceFilters
- }
-
- private def confirmAction(opts: AclCommandOptions, msg: String): Boolean = {
- if (opts.options.has(opts.forceOpt))
- return true
- println(msg)
- StdIn.readLine().equalsIgnoreCase("y")
- }
-
- private def validateOperation(opts: AclCommandOptions, resourceToAcls: Map[ResourcePatternFilter, Set[AccessControlEntry]]): Unit = {
- for ((resource, acls) <- resourceToAcls) {
- val validOps = AclEntry.supportedOperations(resource.resourceType).asScala.toSet + AclOperation.ALL
- if ((acls.map(_.operation) -- validOps).nonEmpty)
- CommandLineUtils.printUsageAndExit(opts.parser, s"ResourceType ${resource.resourceType} only supports operations ${validOps.map(JSecurityUtils.operationName).mkString(", ")}")
- }
- }
-
- class AclCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {
- val CommandConfigDoc = "A property file containing configs to be passed to Admin Client."
-
- val bootstrapServerOpt: OptionSpec[String] = parser.accepts("bootstrap-server", "A list of host/port pairs to use for establishing the connection to the Kafka cluster." +
- " This list should be in the form host1:port1,host2:port2,... This config is required for acl management using admin client API.")
- .withRequiredArg
- .describedAs("server to connect to")
- .ofType(classOf[String])
-
- val bootstrapControllerOpt: OptionSpec[String] = parser.accepts("bootstrap-controller", "A list of host/port pairs to use for establishing the connection to the Kafka cluster." +
- " This list should be in the form host1:port1,host2:port2,... This config is required for acl management using admin client API.")
- .withRequiredArg
- .describedAs("controller to connect to")
- .ofType(classOf[String])
-
- val commandConfigOpt: OptionSpec[String] = parser.accepts("command-config", CommandConfigDoc)
- .withOptionalArg()
- .describedAs("command-config")
- .ofType(classOf[String])
-
- val topicOpt: OptionSpec[String] = parser.accepts("topic", "topic to which ACLs should be added or removed. " +
- "A value of '*' indicates ACL should apply to all topics.")
- .withRequiredArg
- .describedAs("topic")
- .ofType(classOf[String])
-
- val clusterOpt: OptionSpecBuilder = parser.accepts("cluster", "Add/Remove cluster ACLs.")
- val groupOpt: OptionSpec[String] = parser.accepts("group", "Consumer Group to which the ACLs should be added or removed. " +
- "A value of '*' indicates the ACLs should apply to all groups.")
- .withRequiredArg
- .describedAs("group")
- .ofType(classOf[String])
-
- val transactionalIdOpt: OptionSpec[String] = parser.accepts("transactional-id", "The transactionalId to which ACLs should " +
- "be added or removed. A value of '*' indicates the ACLs should apply to all transactionalIds.")
- .withRequiredArg
- .describedAs("transactional-id")
- .ofType(classOf[String])
-
- val idempotentOpt: OptionSpecBuilder = parser.accepts("idempotent", "Enable idempotence for the producer. This should be " +
- "used in combination with the --producer option. Note that idempotence is enabled automatically if " +
- "the producer is authorized to a particular transactional-id.")
-
- val delegationTokenOpt: OptionSpec[String] = parser.accepts("delegation-token", "Delegation token to which ACLs should be added or removed. " +
- "A value of '*' indicates ACL should apply to all tokens.")
- .withRequiredArg
- .describedAs("delegation-token")
- .ofType(classOf[String])
-
- val resourcePatternType: OptionSpec[PatternType] = parser.accepts("resource-pattern-type", "The type of the resource pattern or pattern filter. " +
- "When adding acls, this should be a specific pattern type, e.g. 'literal' or 'prefixed'. " +
- "When listing or removing acls, a specific pattern type can be used to list or remove acls from specific resource patterns, " +
- "or use the filter values of 'any' or 'match', where 'any' will match any pattern type, but will match the resource name exactly, " +
- "where as 'match' will perform pattern matching to list or remove all acls that affect the supplied resource(s). " +
- "WARNING: 'match', when used in combination with the '--remove' switch, should be used with care.")
- .withRequiredArg()
- .ofType(classOf[String])
- .withValuesConvertedBy(new PatternTypeConverter())
- .defaultsTo(PatternType.LITERAL)
-
- val addOpt: OptionSpecBuilder = parser.accepts("add", "Indicates you are trying to add ACLs.")
- val removeOpt: OptionSpecBuilder = parser.accepts("remove", "Indicates you are trying to remove ACLs.")
- val listOpt: OptionSpecBuilder = parser.accepts("list", "List ACLs for the specified resource, use --topic or --group or --cluster to specify a resource.")
-
- val operationsOpt: OptionSpec[String] = parser.accepts("operation", "Operation that is being allowed or denied. Valid operation names are: " + Newline +
- AclEntry.ACL_OPERATIONS.asScala.map("\t" + JSecurityUtils.operationName(_)).mkString(Newline) + Newline)
- .withRequiredArg
- .ofType(classOf[String])
- .defaultsTo(JSecurityUtils.operationName(AclOperation.ALL))
-
- val allowPrincipalsOpt: OptionSpec[String] = parser.accepts("allow-principal", "principal is in principalType:name format." +
- " Note that principalType must be supported by the Authorizer being used." +
- " For example, User:'*' is the wild card indicating all users.")
- .withRequiredArg
- .describedAs("allow-principal")
- .ofType(classOf[String])
-
- val denyPrincipalsOpt: OptionSpec[String] = parser.accepts("deny-principal", "principal is in principalType:name format. " +
- "By default anyone not added through --allow-principal is denied access. " +
- "You only need to use this option as negation to already allowed set. " +
- "Note that principalType must be supported by the Authorizer being used. " +
- "For example if you wanted to allow access to all users in the system but not test-user you can define an ACL that " +
- "allows access to User:'*' and specify --deny-principal=User:test@EXAMPLE.COM. " +
- "AND PLEASE REMEMBER DENY RULES TAKES PRECEDENCE OVER ALLOW RULES.")
- .withRequiredArg
- .describedAs("deny-principal")
- .ofType(classOf[String])
-
- val listPrincipalsOpt: OptionSpec[String] = parser.accepts("principal", "List ACLs for the specified principal. principal is in principalType:name format." +
- " Note that principalType must be supported by the Authorizer being used. Multiple --principal option can be passed.")
- .withOptionalArg()
- .describedAs("principal")
- .ofType(classOf[String])
-
- val allowHostsOpt: OptionSpec[String] = parser.accepts("allow-host", "Host from which principals listed in --allow-principal will have access. " +
- "If you have specified --allow-principal then the default for this option will be set to '*' which allows access from all hosts.")
- .withRequiredArg
- .describedAs("allow-host")
- .ofType(classOf[String])
-
- val denyHostsOpt: OptionSpec[String] = parser.accepts("deny-host", "Host from which principals listed in --deny-principal will be denied access. " +
- "If you have specified --deny-principal then the default for this option will be set to '*' which denies access from all hosts.")
- .withRequiredArg
- .describedAs("deny-host")
- .ofType(classOf[String])
-
- val producerOpt: OptionSpecBuilder = parser.accepts("producer", "Convenience option to add/remove ACLs for producer role. " +
- "This will generate ACLs that allows WRITE,DESCRIBE and CREATE on topic.")
-
- val consumerOpt: OptionSpecBuilder = parser.accepts("consumer", "Convenience option to add/remove ACLs for consumer role. " +
- "This will generate ACLs that allows READ,DESCRIBE on topic and READ on group.")
-
- val forceOpt: OptionSpecBuilder = parser.accepts("force", "Assume Yes to all queries and do not prompt.")
-
- val userPrincipalOpt: OptionSpec[String] = parser.accepts("user-principal", "Specifies a user principal as a resource in relation with the operation. For instance " +
- "one could grant CreateTokens or DescribeTokens permission on a given user principal.")
- .withRequiredArg()
- .describedAs("user-principal")
- .ofType(classOf[String])
-
- options = parser.parse(args: _*)
-
- def checkArgs(): Unit = {
- if (options.has(bootstrapServerOpt) && options.has(bootstrapControllerOpt))
- CommandLineUtils.printUsageAndExit(parser, "Only one of --bootstrap-server or --bootstrap-controller must be specified")
-
- if (!options.has(bootstrapServerOpt) && !options.has(bootstrapControllerOpt))
- CommandLineUtils.printUsageAndExit(parser, "One of --bootstrap-server or --bootstrap-controller must be specified")
-
- val actions = Seq(addOpt, removeOpt, listOpt).count(options.has)
- if (actions != 1)
- CommandLineUtils.printUsageAndExit(parser, "Command must include exactly one action: --list, --add, --remove. ")
-
- CommandLineUtils.checkInvalidArgs(parser, options, listOpt, producerOpt, consumerOpt, allowHostsOpt, allowPrincipalsOpt, denyHostsOpt, denyPrincipalsOpt)
-
- //when --producer or --consumer is specified , user should not specify operations as they are inferred and we also disallow --deny-principals and --deny-hosts.
- CommandLineUtils.checkInvalidArgs(parser, options, producerOpt, operationsOpt, denyPrincipalsOpt, denyHostsOpt)
- CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt, operationsOpt, denyPrincipalsOpt, denyHostsOpt)
-
- if (options.has(listPrincipalsOpt) && !options.has(listOpt))
- CommandLineUtils.printUsageAndExit(parser, "The --principal option is only available if --list is set")
-
- if (options.has(producerOpt) && !options.has(topicOpt))
- CommandLineUtils.printUsageAndExit(parser, "With --producer you must specify a --topic")
-
- if (options.has(idempotentOpt) && !options.has(producerOpt))
- CommandLineUtils.printUsageAndExit(parser, "The --idempotent option is only available if --producer is set")
-
- if (options.has(consumerOpt) && (!options.has(topicOpt) || !options.has(groupOpt) || (!options.has(producerOpt) && (options.has(clusterOpt) || options.has(transactionalIdOpt)))))
- CommandLineUtils.printUsageAndExit(parser, "With --consumer you must specify a --topic and a --group and no --cluster or --transactional-id option should be specified.")
- }
- }
-}
-
-class PatternTypeConverter extends EnumConverter[PatternType](classOf[PatternType]) {
-
- override def convert(value: String): PatternType = {
- val patternType = super.convert(value)
- if (patternType.isUnknown)
- throw new ValueConversionException("Unknown resource-pattern-type: " + value)
-
- patternType
- }
-
- override def valuePattern: String = PatternType.values
- .filter(_ != PatternType.UNKNOWN)
- .mkString("|")
-}
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 9560f060d0f..fe9336a04f4 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -58,7 +58,7 @@ import scala.jdk.CollectionConverters._
* brokers.
*
* To start brokers we need to set a cluster ACL, which happens optionally in KafkaServerTestHarness.
- * The remaining ACLs to enable access to producers and consumers are set here. To set ACLs, we use AclCommand directly.
+ * The remaining ACLs to enable access to producers and consumers are set here.
*
* Finally, we rely on SaslSetup to bootstrap and setup Kerberos. We don't use
* SaslTestHarness here directly because it extends QuorumTestHarness, and we
diff --git a/tools/src/main/java/org/apache/kafka/tools/AclCommand.java b/tools/src/main/java/org/apache/kafka/tools/AclCommand.java
new file mode 100644
index 00000000000..d54970ad042
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/AclCommand.java
@@ -0,0 +1,605 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AccessControlEntryFilter;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.Resource;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourcePatternFilter;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.security.authorizer.AclEntry;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import joptsimple.AbstractOptionSpec;
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+import joptsimple.OptionSpecBuilder;
+import joptsimple.ValueConversionException;
+import joptsimple.util.EnumConverter;
+
+import static org.apache.kafka.common.acl.AclOperation.CREATE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE;
+import static org.apache.kafka.common.acl.AclOperation.READ;
+import static org.apache.kafka.common.acl.AclOperation.WRITE;
+import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
+import static org.apache.kafka.common.acl.AclPermissionType.DENY;
+
+public class AclCommand {
+
+ private static final ResourcePatternFilter CLUSTER_RESOURCE_FILTER =
+ new ResourcePatternFilter(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL);
+ private static final String NL = System.lineSeparator();
+
+ public static void main(String[] args) {
+ AclCommandOptions opts = new AclCommandOptions(args);
+ AdminClientService aclCommandService = new AdminClientService(opts);
+ try (Admin admin = Admin.create(adminConfigs(opts))) {
+ if (opts.options.has(opts.addOpt)) {
+ aclCommandService.addAcls(admin);
+ } else if (opts.options.has(opts.removeOpt)) {
+ aclCommandService.removeAcls(admin);
+ } else if (opts.options.has(opts.listOpt)) {
+ aclCommandService.listAcls(admin);
+ }
+ } catch (Throwable e) {
+ System.out.println("Error while executing ACL command: " + e.getMessage());
+ System.out.println(Utils.stackTrace(e));
+ Exit.exit(1);
+ }
+ }
+
+ private static Properties adminConfigs(AclCommandOptions opts) throws IOException {
+ Properties props = new Properties();
+ if (opts.options.has(opts.commandConfigOpt)) {
+ props = Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt));
+ }
+ if (opts.options.has(opts.bootstrapServerOpt)) {
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt));
+ } else {
+ props.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, opts.options.valueOf(opts.bootstrapControllerOpt));
+ }
+ return props;
+ }
+
+ private static class AdminClientService {
+
+ private final AclCommandOptions opts;
+
+ AdminClientService(AclCommandOptions opts) {
+ this.opts = opts;
+ }
+
+ void addAcls(Admin admin) throws ExecutionException, InterruptedException {
+ Map> resourceToAcl = getResourceToAcls(opts);
+ for (Map.Entry> entry : resourceToAcl.entrySet()) {
+ ResourcePattern resource = entry.getKey();
+ Set acls = entry.getValue();
+ System.out.println("Adding ACLs for resource `" + resource + "`: " + NL + " " + acls.stream().map(a -> "\t" + a).collect(Collectors.joining(NL)) + NL);
+ Collection aclBindings = acls.stream().map(acl -> new AclBinding(resource, acl)).collect(Collectors.toList());
+ admin.createAcls(aclBindings).all().get();
+ }
+ }
+
+ void removeAcls(Admin admin) throws ExecutionException, InterruptedException {
+ Map> filterToAcl = getResourceFilterToAcls(opts);
+ for (Map.Entry> entry : filterToAcl.entrySet()) {
+ ResourcePatternFilter filter = entry.getKey();
+ Set acls = entry.getValue();
+ if (acls.isEmpty()) {
+ if (confirmAction(opts, "Are you sure you want to delete all ACLs for resource filter `" + filter + "`? (y/n)")) {
+ removeAcls(admin, acls, filter);
+ }
+ } else {
+ String msg = "Are you sure you want to remove ACLs: " + NL +
+ " " + acls.stream().map(a -> "\t" + a).collect(Collectors.joining(NL)) + NL +
+ " from resource filter `" + filter + "`? (y/n)";
+ if (confirmAction(opts, msg)) {
+ removeAcls(admin, acls, filter);
+ }
+ }
+ }
+ }
+
+ private void listAcls(Admin admin) throws ExecutionException, InterruptedException {
+ Set filters = getResourceFilter(opts, false);
+ Set listPrincipals = getPrincipals(opts, opts.listPrincipalsOpt);
+ Map> resourceToAcls = getAcls(admin, filters);
+
+ if (listPrincipals.isEmpty()) {
+ printResourceAcls(resourceToAcls);
+ } else {
+ listPrincipals.forEach(principal -> {
+ System.out.println("ACLs for principal `" + principal + "`");
+ Map> filteredResourceToAcls = resourceToAcls.entrySet().stream()
+ .map(entry -> {
+ ResourcePattern resource = entry.getKey();
+ Set acls = entry.getValue().stream()
+ .filter(acl -> principal.toString().equals(acl.principal()))
+ .collect(Collectors.toSet());
+ return new AbstractMap.SimpleEntry<>(resource, acls);
+ })
+ .filter(entry -> !entry.getValue().isEmpty())
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ printResourceAcls(filteredResourceToAcls);
+ });
+ }
+ }
+
+ private static void printResourceAcls(Map> resourceToAcls) {
+ resourceToAcls.forEach((resource, acls) ->
+ System.out.println("Current ACLs for resource `" + resource + "`:" + NL +
+ acls.stream().map(acl -> "\t" + acl).collect(Collectors.joining(NL)) + NL)
+ );
+ }
+
+ private static void removeAcls(Admin adminClient, Set acls, ResourcePatternFilter filter) throws ExecutionException, InterruptedException {
+ if (acls.isEmpty()) {
+ adminClient.deleteAcls(Collections.singletonList(new AclBindingFilter(filter, AccessControlEntryFilter.ANY))).all().get();
+ } else {
+ List aclBindingFilters = acls.stream().map(acl -> new AclBindingFilter(filter, acl.toFilter())).collect(Collectors.toList());
+ adminClient.deleteAcls(aclBindingFilters).all().get();
+ }
+ }
+
+ private Map> getAcls(Admin adminClient, Set filters) throws ExecutionException, InterruptedException {
+ Collection aclBindings;
+ if (filters.isEmpty()) {
+ aclBindings = adminClient.describeAcls(AclBindingFilter.ANY).values().get();
+ } else {
+ aclBindings = new ArrayList<>();
+ for (ResourcePatternFilter filter : filters) {
+ aclBindings.addAll(adminClient.describeAcls(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).values().get());
+ }
+ }
+
+ Map> resourceToAcls = new HashMap<>();
+ for (AclBinding aclBinding : aclBindings) {
+ ResourcePattern resource = aclBinding.pattern();
+ Set acls = resourceToAcls.getOrDefault(resource, new HashSet<>());
+ acls.add(aclBinding.entry());
+ resourceToAcls.put(resource, acls);
+ }
+ return resourceToAcls;
+ }
+ }
+
+ private static Map> getResourceToAcls(AclCommandOptions opts) {
+ PatternType patternType = opts.options.valueOf(opts.resourcePatternType);
+ if (!patternType.isSpecific()) {
+ CommandLineUtils.printUsageAndExit(opts.parser, "A '--resource-pattern-type' value of '" + patternType + "' is not valid when adding acls.");
+ }
+ Map> resourceToAcl = getResourceFilterToAcls(opts).entrySet().stream()
+ .collect(Collectors.toMap(entry -> new ResourcePattern(entry.getKey().resourceType(), entry.getKey().name(), entry.getKey().patternType()),
+ Map.Entry::getValue));
+
+ if (resourceToAcl.values().stream().anyMatch(Set::isEmpty)) {
+ CommandLineUtils.printUsageAndExit(opts.parser, "You must specify one of: --allow-principal, --deny-principal when trying to add ACLs.");
+ }
+ return resourceToAcl;
+ }
+
+ private static Map> getResourceFilterToAcls(AclCommandOptions opts) {
+ Map> resourceToAcls = new HashMap<>();
+ //if none of the --producer or --consumer options are specified , just construct ACLs from CLI options.
+ if (!opts.options.has(opts.producerOpt) && !opts.options.has(opts.consumerOpt)) {
+ resourceToAcls.putAll(getCliResourceFilterToAcls(opts));
+ }
+ //users are allowed to specify both --producer and --consumer options in a single command.
+ if (opts.options.has(opts.producerOpt)) {
+ resourceToAcls.putAll(getProducerResourceFilterToAcls(opts));
+ }
+ if (opts.options.has(opts.consumerOpt)) {
+ getConsumerResourceFilterToAcls(opts).forEach((k, v) -> {
+ Set existingAcls = resourceToAcls.getOrDefault(k, new HashSet<>());
+ existingAcls.addAll(v);
+ resourceToAcls.put(k, existingAcls);
+ });
+ }
+ validateOperation(opts, resourceToAcls);
+ return resourceToAcls;
+ }
+
+ private static Map> getProducerResourceFilterToAcls(AclCommandOptions opts) {
+ Set filters = getResourceFilter(opts, true);
+
+ Set topics = filters.stream().filter(f -> f.resourceType() == ResourceType.TOPIC).collect(Collectors.toSet());
+ Set transactionalIds = filters.stream().filter(f -> f.resourceType() == ResourceType.TRANSACTIONAL_ID).collect(Collectors.toSet());
+ boolean enableIdempotence = opts.options.has(opts.idempotentOpt);
+
+ Set topicAcls = getAcl(opts, new HashSet<>(Arrays.asList(WRITE, DESCRIBE, CREATE)));
+ Set transactionalIdAcls = getAcl(opts, new HashSet<>(Arrays.asList(WRITE, DESCRIBE)));
+
+ //Write, Describe, Create permission on topics, Write, Describe on transactionalIds
+ Map> result = new HashMap<>();
+ for (ResourcePatternFilter topic : topics) {
+ result.put(topic, topicAcls);
+ }
+ for (ResourcePatternFilter transactionalId : transactionalIds) {
+ result.put(transactionalId, transactionalIdAcls);
+ }
+ if (enableIdempotence) {
+ result.put(CLUSTER_RESOURCE_FILTER, getAcl(opts, Collections.singleton(IDEMPOTENT_WRITE)));
+ }
+ return result;
+ }
+
+ private static Map> getConsumerResourceFilterToAcls(AclCommandOptions opts) {
+ Set filters = getResourceFilter(opts, true);
+ Set topics = filters.stream().filter(f -> f.resourceType() == ResourceType.TOPIC).collect(Collectors.toSet());
+ Set groups = filters.stream().filter(f -> f.resourceType() == ResourceType.GROUP).collect(Collectors.toSet());
+
+ //Read, Describe on topic, Read on consumerGroup
+ Set topicAcls = getAcl(opts, new HashSet<>(Arrays.asList(READ, DESCRIBE)));
+ Set groupAcls = getAcl(opts, Collections.singleton(READ));
+
+ Map> result = new HashMap<>();
+ for (ResourcePatternFilter topic : topics) {
+ result.put(topic, topicAcls);
+ }
+ for (ResourcePatternFilter group : groups) {
+ result.put(group, groupAcls);
+ }
+ return result;
+ }
+
+ private static Map> getCliResourceFilterToAcls(AclCommandOptions opts) {
+ Set acls = getAcl(opts);
+ Set filters = getResourceFilter(opts, true);
+ return filters.stream().collect(Collectors.toMap(filter -> filter, filter -> acls));
+ }
+
+ private static Set getAcl(AclCommandOptions opts, Set operations) {
+ Set allowedPrincipals = getPrincipals(opts, opts.allowPrincipalsOpt);
+ Set deniedPrincipals = getPrincipals(opts, opts.denyPrincipalsOpt);
+ Set allowedHosts = getHosts(opts, opts.allowHostsOpt, opts.allowPrincipalsOpt);
+ Set deniedHosts = getHosts(opts, opts.denyHostsOpt, opts.denyPrincipalsOpt);
+
+ Set acls = new HashSet<>();
+ if (!allowedHosts.isEmpty() && !allowedPrincipals.isEmpty()) {
+ acls.addAll(getAcls(allowedPrincipals, ALLOW, operations, allowedHosts));
+ }
+ if (!deniedHosts.isEmpty() && !deniedPrincipals.isEmpty()) {
+ acls.addAll(getAcls(deniedPrincipals, DENY, operations, deniedHosts));
+ }
+ return acls;
+ }
+
+ private static Set getAcl(AclCommandOptions opts) {
+ Set operations = opts.options.valuesOf(opts.operationsOpt)
+ .stream().map(operation -> SecurityUtils.operation(operation.trim()))
+ .collect(Collectors.toSet());
+ return getAcl(opts, operations);
+ }
+
+ static Set getAcls(Set principals,
+ AclPermissionType permissionType,
+ Set operations,
+ Set hosts) {
+ Set acls = new HashSet<>();
+ for (KafkaPrincipal principal : principals) {
+ for (AclOperation operation : operations) {
+ for (String host : hosts) {
+ acls.add(new AccessControlEntry(principal.toString(), host, operation, permissionType));
+ }
+ }
+ }
+ return acls;
+ }
+
+ private static Set getHosts(AclCommandOptions opts, OptionSpec hostOptionSpec, OptionSpec principalOptionSpec) {
+ if (opts.options.has(hostOptionSpec)) {
+ return opts.options.valuesOf(hostOptionSpec).stream().map(String::trim).collect(Collectors.toSet());
+ } else if (opts.options.has(principalOptionSpec)) {
+ return Collections.singleton(AclEntry.WILDCARD_HOST);
+ } else {
+ return Collections.emptySet();
+ }
+ }
+
+ private static Set getPrincipals(AclCommandOptions opts, OptionSpec principalOptionSpec) {
+ if (opts.options.has(principalOptionSpec)) {
+ return opts.options.valuesOf(principalOptionSpec).stream()
+ .map(s -> SecurityUtils.parseKafkaPrincipal(s.trim()))
+ .collect(Collectors.toSet());
+ } else {
+ return Collections.emptySet();
+ }
+ }
+
+ private static Set getResourceFilter(AclCommandOptions opts, boolean dieIfNoResourceFound) {
+ PatternType patternType = opts.options.valueOf(opts.resourcePatternType);
+ Set resourceFilters = new HashSet<>();
+ if (opts.options.has(opts.topicOpt)) {
+ opts.options.valuesOf(opts.topicOpt).forEach(topic -> resourceFilters.add(new ResourcePatternFilter(ResourceType.TOPIC, topic.trim(), patternType)));
+ }
+ if (patternType == PatternType.LITERAL && (opts.options.has(opts.clusterOpt) || opts.options.has(opts.idempotentOpt))) {
+ resourceFilters.add(CLUSTER_RESOURCE_FILTER);
+ }
+ if (opts.options.has(opts.groupOpt)) {
+ opts.options.valuesOf(opts.groupOpt).forEach(group -> resourceFilters.add(new ResourcePatternFilter(ResourceType.GROUP, group.trim(), patternType)));
+ }
+ if (opts.options.has(opts.transactionalIdOpt)) {
+ opts.options.valuesOf(opts.transactionalIdOpt).forEach(transactionalId ->
+ resourceFilters.add(new ResourcePatternFilter(ResourceType.TRANSACTIONAL_ID, transactionalId, patternType)));
+ }
+ if (opts.options.has(opts.delegationTokenOpt)) {
+ opts.options.valuesOf(opts.delegationTokenOpt).forEach(token -> resourceFilters.add(new ResourcePatternFilter(ResourceType.DELEGATION_TOKEN, token.trim(), patternType)));
+ }
+ if (opts.options.has(opts.userPrincipalOpt)) {
+ opts.options.valuesOf(opts.userPrincipalOpt).forEach(user -> resourceFilters.add(new ResourcePatternFilter(ResourceType.USER, user.trim(), patternType)));
+ }
+ if (resourceFilters.isEmpty() && dieIfNoResourceFound) {
+ CommandLineUtils.printUsageAndExit(opts.parser, "You must provide at least one resource: --topic or --cluster or --group or --delegation-token ");
+ }
+ return resourceFilters;
+ }
+
+ private static boolean confirmAction(AclCommandOptions opts, String msg) {
+ if (opts.options.has(opts.forceOpt)) {
+ return true;
+ }
+ System.out.println(msg);
+ return System.console().readLine().equalsIgnoreCase("y");
+ }
+
+ private static void validateOperation(AclCommandOptions opts, Map> resourceToAcls) {
+ for (Map.Entry> entry : resourceToAcls.entrySet()) {
+ ResourcePatternFilter resource = entry.getKey();
+ Set acls = entry.getValue();
+ Collection validOps = new HashSet<>(AclEntry.supportedOperations(resource.resourceType()));
+ validOps.add(AclOperation.ALL);
+ Set unsupportedOps = new HashSet<>();
+ for (AccessControlEntry acl : acls) {
+ if (!validOps.contains(acl.operation())) {
+ unsupportedOps.add(acl.operation());
+ }
+ }
+ if (!unsupportedOps.isEmpty()) {
+ String msg = String.format("ResourceType %s only supports operations %s", resource.resourceType(), validOps);
+ CommandLineUtils.printUsageAndExit(opts.parser, msg);
+ }
+ }
+ }
+
+ public static class AclCommandOptions extends CommandDefaultOptions {
+
+ private final OptionSpec bootstrapServerOpt;
+ private final OptionSpec bootstrapControllerOpt;
+ private final OptionSpec commandConfigOpt;
+ private final OptionSpec topicOpt;
+ private final OptionSpecBuilder clusterOpt;
+ private final OptionSpec groupOpt;
+ private final OptionSpec transactionalIdOpt;
+ private final OptionSpecBuilder idempotentOpt;
+ private final OptionSpec delegationTokenOpt;
+ private final OptionSpec resourcePatternType;
+ private final OptionSpecBuilder addOpt;
+ private final OptionSpecBuilder removeOpt;
+ private final OptionSpecBuilder listOpt;
+ private final OptionSpec operationsOpt;
+ private final OptionSpec allowPrincipalsOpt;
+ private final OptionSpec denyPrincipalsOpt;
+ private final OptionSpec listPrincipalsOpt;
+ private final OptionSpec allowHostsOpt;
+ private final OptionSpec denyHostsOpt;
+ private final OptionSpecBuilder producerOpt;
+ private final OptionSpecBuilder consumerOpt;
+ private final OptionSpecBuilder forceOpt;
+ private final OptionSpec userPrincipalOpt;
+
+ @SuppressWarnings("this-escape")
+ public AclCommandOptions(String[] args) {
+ super(args);
+ bootstrapServerOpt = parser.accepts("bootstrap-server", "A list of host/port pairs to use for establishing the connection to the Kafka cluster." +
+ " This list should be in the form host1:port1,host2:port2,... This config is required for acl management using admin client API.")
+ .withRequiredArg()
+ .describedAs("server to connect to")
+ .ofType(String.class);
+ bootstrapControllerOpt = parser.accepts("bootstrap-controller", "A list of host/port pairs to use for establishing the connection to the Kafka cluster." +
+ " This list should be in the form host1:port1,host2:port2,... This config is required for acl management using admin client API.")
+ .withRequiredArg()
+ .describedAs("controller to connect to")
+ .ofType(String.class);
+ commandConfigOpt = parser.accepts("command-config", "A property file containing configs to be passed to Admin Client.")
+ .withOptionalArg()
+ .describedAs("command-config")
+ .ofType(String.class);
+ topicOpt = parser.accepts("topic", "topic to which ACLs should be added or removed. " +
+ "A value of '*' indicates ACL should apply to all topics.")
+ .withRequiredArg()
+ .describedAs("topic")
+ .ofType(String.class);
+ clusterOpt = parser.accepts("cluster", "Add/Remove cluster ACLs.");
+ groupOpt = parser.accepts("group", "Consumer Group to which the ACLs should be added or removed. " +
+ "A value of '*' indicates the ACLs should apply to all groups.")
+ .withRequiredArg()
+ .describedAs("group")
+ .ofType(String.class);
+ transactionalIdOpt = parser.accepts("transactional-id", "The transactionalId to which ACLs should " +
+ "be added or removed. A value of '*' indicates the ACLs should apply to all transactionalIds.")
+ .withRequiredArg()
+ .describedAs("transactional-id")
+ .ofType(String.class);
+ idempotentOpt = parser.accepts("idempotent", "Enable idempotence for the producer. This should be " +
+ "used in combination with the --producer option. Note that idempotence is enabled automatically if " +
+ "the producer is authorized to a particular transactional-id.");
+ delegationTokenOpt = parser.accepts("delegation-token", "Delegation token to which ACLs should be added or removed. " +
+ "A value of '*' indicates ACL should apply to all tokens.")
+ .withRequiredArg()
+ .describedAs("delegation-token")
+ .ofType(String.class);
+ resourcePatternType = parser.accepts("resource-pattern-type", "The type of the resource pattern or pattern filter. " +
+ "When adding acls, this should be a specific pattern type, e.g. 'literal' or 'prefixed'. " +
+ "When listing or removing acls, a specific pattern type can be used to list or remove acls from specific resource patterns, " +
+ "or use the filter values of 'any' or 'match', where 'any' will match any pattern type, but will match the resource name exactly, " +
+ "where as 'match' will perform pattern matching to list or remove all acls that affect the supplied resource(s). " +
+ "WARNING: 'match', when used in combination with the '--remove' switch, should be used with care.")
+ .withRequiredArg()
+ .ofType(String.class)
+ .withValuesConvertedBy(new PatternTypeConverter())
+ .defaultsTo(PatternType.LITERAL);
+ addOpt = parser.accepts("add", "Indicates you are trying to add ACLs.");
+ removeOpt = parser.accepts("remove", "Indicates you are trying to remove ACLs.");
+ listOpt = parser.accepts("list", "List ACLs for the specified resource, use --topic or --group or --cluster to specify a resource.");
+ operationsOpt = parser.accepts("operation", "Operation that is being allowed or denied. Valid operation names are: " + NL +
+ AclEntry.ACL_OPERATIONS.stream().map(o -> "\t" + SecurityUtils.operationName(o)).collect(Collectors.joining(NL)) + NL)
+ .withRequiredArg()
+ .ofType(String.class)
+ .defaultsTo(SecurityUtils.operationName(AclOperation.ALL));
+ allowPrincipalsOpt = parser.accepts("allow-principal", "principal is in principalType:name format." +
+ " Note that principalType must be supported by the Authorizer being used." +
+ " For example, User:'*' is the wild card indicating all users.")
+ .withRequiredArg()
+ .describedAs("allow-principal")
+ .ofType(String.class);
+ denyPrincipalsOpt = parser.accepts("deny-principal", "principal is in principalType:name format. " +
+ "By default anyone not added through --allow-principal is denied access. " +
+ "You only need to use this option as negation to already allowed set. " +
+ "Note that principalType must be supported by the Authorizer being used. " +
+ "For example if you wanted to allow access to all users in the system but not test-user you can define an ACL that " +
+ "allows access to User:'*' and specify --deny-principal=User:test@EXAMPLE.COM. " +
+ "AND PLEASE REMEMBER DENY RULES TAKES PRECEDENCE OVER ALLOW RULES.")
+ .withRequiredArg()
+ .describedAs("deny-principal")
+ .ofType(String.class);
+ listPrincipalsOpt = parser.accepts("principal", "List ACLs for the specified principal. principal is in principalType:name format." +
+ " Note that principalType must be supported by the Authorizer being used. Multiple --principal option can be passed.")
+ .withOptionalArg()
+ .describedAs("principal")
+ .ofType(String.class);
+ allowHostsOpt = parser.accepts("allow-host", "Host from which principals listed in --allow-principal will have access. " +
+ "If you have specified --allow-principal then the default for this option will be set to '*' which allows access from all hosts.")
+ .withRequiredArg()
+ .describedAs("allow-host")
+ .ofType(String.class);
+ denyHostsOpt = parser.accepts("deny-host", "Host from which principals listed in --deny-principal will be denied access. " +
+ "If you have specified --deny-principal then the default for this option will be set to '*' which denies access from all hosts.")
+ .withRequiredArg()
+ .describedAs("deny-host")
+ .ofType(String.class);
+ producerOpt = parser.accepts("producer", "Convenience option to add/remove ACLs for producer role. " +
+ "This will generate ACLs that allows WRITE,DESCRIBE and CREATE on topic.");
+ consumerOpt = parser.accepts("consumer", "Convenience option to add/remove ACLs for consumer role. " +
+ "This will generate ACLs that allows READ,DESCRIBE on topic and READ on group.");
+ forceOpt = parser.accepts("force", "Assume Yes to all queries and do not prompt.");
+ userPrincipalOpt = parser.accepts("user-principal", "Specifies a user principal as a resource in relation with the operation. For instance " +
+ "one could grant CreateTokens or DescribeTokens permission on a given user principal.")
+ .withRequiredArg()
+ .describedAs("user-principal")
+ .ofType(String.class);
+
+ try {
+ options = parser.parse(args);
+ } catch (OptionException e) {
+ CommandLineUtils.printUsageAndExit(parser, e.getMessage());
+ }
+ checkArgs();
+ }
+
+ void checkArgs() {
+ CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to manage acls on kafka.");
+
+ if (options.has(bootstrapServerOpt) && options.has(bootstrapControllerOpt)) {
+ CommandLineUtils.printUsageAndExit(parser, "Only one of --bootstrap-server or --bootstrap-controller must be specified");
+ }
+ if (!options.has(bootstrapServerOpt) && !options.has(bootstrapControllerOpt)) {
+ CommandLineUtils.printUsageAndExit(parser, "One of --bootstrap-server or --bootstrap-controller must be specified");
+ }
+ List> mutuallyExclusiveOptions = Arrays.asList(addOpt, removeOpt, listOpt);
+ long mutuallyExclusiveOptionsCount = mutuallyExclusiveOptions.stream()
+ .filter(abstractOptionSpec -> options.has(abstractOptionSpec))
+ .count();
+ if (mutuallyExclusiveOptionsCount != 1) {
+ CommandLineUtils.printUsageAndExit(parser, "Command must include exactly one action: --list, --add, --remove. ");
+ }
+ CommandLineUtils.checkInvalidArgs(parser, options, listOpt, producerOpt, consumerOpt, allowHostsOpt, allowPrincipalsOpt, denyHostsOpt, denyPrincipalsOpt);
+
+ //when --producer or --consumer is specified , user should not specify operations as they are inferred and we also disallow --deny-principals and --deny-hosts.
+ CommandLineUtils.checkInvalidArgs(parser, options, producerOpt, operationsOpt, denyPrincipalsOpt, denyHostsOpt);
+ CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt, operationsOpt, denyPrincipalsOpt, denyHostsOpt);
+
+ if (options.has(listPrincipalsOpt) && !options.has(listOpt)) {
+ CommandLineUtils.printUsageAndExit(parser, "The --principal option is only available if --list is set");
+ }
+ if (options.has(producerOpt) && !options.has(topicOpt)) {
+ CommandLineUtils.printUsageAndExit(parser, "With --producer you must specify a --topic");
+ }
+ if (options.has(idempotentOpt) && !options.has(producerOpt)) {
+ CommandLineUtils.printUsageAndExit(parser, "The --idempotent option is only available if --producer is set");
+ }
+ if (options.has(consumerOpt) && (!options.has(topicOpt) || !options.has(groupOpt) || (!options.has(producerOpt) && (options.has(clusterOpt) || options.has(transactionalIdOpt))))) {
+ CommandLineUtils.printUsageAndExit(parser, "With --consumer you must specify a --topic and a --group and no --cluster or --transactional-id option should be specified.");
+ }
+ }
+ }
+
+ static class PatternTypeConverter extends EnumConverter {
+
+ PatternTypeConverter() {
+ super(PatternType.class);
+ }
+
+ @Override
+ public PatternType convert(String value) {
+ PatternType patternType = super.convert(value);
+ if (patternType.isUnknown())
+ throw new ValueConversionException("Unknown resource-pattern-type: " + value);
+
+ return patternType;
+ }
+
+ @Override
+ public String valuePattern() {
+ List values = Arrays.asList(PatternType.values());
+ List filteredValues = values.stream()
+ .filter(type -> type != PatternType.UNKNOWN)
+ .collect(Collectors.toList());
+ return filteredValues.stream()
+ .map(Object::toString)
+ .collect(Collectors.joining("|"));
+ }
+ }
+}
diff --git a/core/src/test/java/kafka/admin/AclCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/AclCommandTest.java
similarity index 64%
rename from core/src/test/java/kafka/admin/AclCommandTest.java
rename to tools/src/test/java/org/apache/kafka/tools/AclCommandTest.java
index e71c348c272..12f4dab8801 100644
--- a/core/src/test/java/kafka/admin/AclCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/AclCommandTest.java
@@ -14,9 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package kafka.admin;
-
-import kafka.admin.AclCommand.AclCommandOptions;
+package org.apache.kafka.tools;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBindingFilter;
@@ -44,15 +42,9 @@ import org.apache.logging.log4j.Level;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
-import java.io.PrintStream;
-import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -63,9 +55,6 @@ import java.util.stream.Collectors;
import javax.management.InstanceAlreadyExistsException;
-import scala.Console;
-import scala.jdk.javaapi.CollectionConverters;
-
import static org.apache.kafka.common.acl.AccessControlEntryFilter.ANY;
import static org.apache.kafka.common.acl.AclOperation.ALTER;
import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS;
@@ -120,102 +109,98 @@ public class AclCommandTest {
private static final String TOPIC = "--topic";
private static final String RESOURCE_PATTERN_TYPE = "--resource-pattern-type";
private static final KafkaPrincipal PRINCIPAL = SecurityUtils.parseKafkaPrincipal("User:test2");
- private static final Set USERS = new HashSet<>(Arrays.asList(
- SecurityUtils.parseKafkaPrincipal("User:CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"),
- PRINCIPAL,
- SecurityUtils.parseKafkaPrincipal("User:CN=\\#User with special chars in CN : (\\, \\+ \" \\ \\< \\> \\; ')")
- ));
- private static final Set HOSTS = new HashSet<>(Arrays.asList("host1", "host2"));
- private static final List ALLOW_HOST_COMMAND = Arrays.asList("--allow-host", "host1", "--allow-host", "host2");
- private static final List DENY_HOST_COMMAND = Arrays.asList("--deny-host", "host1", "--deny-host", "host2");
+ private static final Set USERS = Set.of(
+ SecurityUtils.parseKafkaPrincipal("User:CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"),
+ PRINCIPAL,
+ SecurityUtils.parseKafkaPrincipal("User:CN=\\#User with special chars in CN : (\\, \\+ \" \\ \\< \\> \\; ')")
+ );
+ private static final Set HOSTS = Set.of("host1", "host2");
+ private static final List ALLOW_HOST_COMMAND = List.of("--allow-host", "host1", "--allow-host", "host2");
+ private static final List DENY_HOST_COMMAND = List.of("--deny-host", "host1", "--deny-host", "host2");
private static final ResourcePattern CLUSTER_RESOURCE = new ResourcePattern(CLUSTER, Resource.CLUSTER_NAME, LITERAL);
- private static final Set TOPIC_RESOURCES = new HashSet<>(Arrays.asList(
- new ResourcePattern(ResourceType.TOPIC, "test-1", LITERAL),
- new ResourcePattern(ResourceType.TOPIC, "test-2", LITERAL)
- ));
- private static final Set GROUP_RESOURCES = new HashSet<>(Arrays.asList(
- new ResourcePattern(ResourceType.GROUP, "testGroup-1", LITERAL),
- new ResourcePattern(ResourceType.GROUP, "testGroup-2", LITERAL)
- ));
- private static final Set TRANSACTIONAL_ID_RESOURCES = new HashSet<>(Arrays.asList(
+ private static final Set TOPIC_RESOURCES = Set.of(
+ new ResourcePattern(ResourceType.TOPIC, "test-1", LITERAL),
+ new ResourcePattern(ResourceType.TOPIC, "test-2", LITERAL)
+ );
+ private static final Set GROUP_RESOURCES = Set.of(
+ new ResourcePattern(ResourceType.GROUP, "testGroup-1", LITERAL),
+ new ResourcePattern(ResourceType.GROUP, "testGroup-2", LITERAL)
+ );
+ private static final Set TRANSACTIONAL_ID_RESOURCES = Set.of(
new ResourcePattern(TRANSACTIONAL_ID, "t0", LITERAL),
new ResourcePattern(TRANSACTIONAL_ID, "t1", LITERAL)
- ));
- private static final Set TOKEN_RESOURCES = new HashSet<>(Arrays.asList(
- new ResourcePattern(DELEGATION_TOKEN, "token1", LITERAL),
- new ResourcePattern(DELEGATION_TOKEN, "token2", LITERAL)
- ));
- private static final Set USER_RESOURCES = new HashSet<>(Arrays.asList(
- new ResourcePattern(USER, "User:test-user1", LITERAL),
- new ResourcePattern(USER, "User:test-user2", LITERAL)
- ));
+ );
+ private static final Set TOKEN_RESOURCES = Set.of(
+ new ResourcePattern(DELEGATION_TOKEN, "token1", LITERAL),
+ new ResourcePattern(DELEGATION_TOKEN, "token2", LITERAL)
+ );
+ private static final Set USER_RESOURCES = Set.of(
+ new ResourcePattern(USER, "User:test-user1", LITERAL),
+ new ResourcePattern(USER, "User:test-user2", LITERAL)
+ );
- private static final Map, List> RESOURCE_TO_COMMAND = new HashMap, List>() {{
- put(TOPIC_RESOURCES, Arrays.asList(TOPIC, "test-1", TOPIC, "test-2"));
- put(Collections.singleton(CLUSTER_RESOURCE), Collections.singletonList("--cluster"));
- put(GROUP_RESOURCES, Arrays.asList(GROUP, "testGroup-1", GROUP, "testGroup-2"));
- put(TRANSACTIONAL_ID_RESOURCES, Arrays.asList("--transactional-id", "t0", "--transactional-id", "t1"));
- put(TOKEN_RESOURCES, Arrays.asList("--delegation-token", "token1", "--delegation-token", "token2"));
- put(USER_RESOURCES, Arrays.asList("--user-principal", "User:test-user1", "--user-principal", "User:test-user2"));
- }};
+ private static final Map, List> RESOURCE_TO_COMMAND = Map.of(
+ TOPIC_RESOURCES, List.of(TOPIC, "test-1", TOPIC, "test-2"),
+ Set.of(CLUSTER_RESOURCE), List.of("--cluster"),
+ GROUP_RESOURCES, List.of(GROUP, "testGroup-1", GROUP, "testGroup-2"),
+ TRANSACTIONAL_ID_RESOURCES, List.of("--transactional-id", "t0", "--transactional-id", "t1"),
+ TOKEN_RESOURCES, List.of("--delegation-token", "token1", "--delegation-token", "token2"),
+ USER_RESOURCES, List.of("--user-principal", "User:test-user1", "--user-principal", "User:test-user2")
+ );
- private static final Map, Map.Entry, List>> RESOURCE_TO_OPERATIONS =
- new HashMap, Map.Entry, List>>() {{
- put(TOPIC_RESOURCES, new SimpleImmutableEntry<>(
- new HashSet<>(Arrays.asList(READ, WRITE, CREATE, DESCRIBE, DELETE, DESCRIBE_CONFIGS, ALTER_CONFIGS, ALTER)),
- Arrays.asList(OPERATION, "Read", OPERATION, "Write", OPERATION, "Create",
- OPERATION, "Describe", OPERATION, "Delete", OPERATION, "DescribeConfigs",
- OPERATION, "AlterConfigs", OPERATION, "Alter"))
- );
- put(Collections.singleton(CLUSTER_RESOURCE), new SimpleImmutableEntry<>(
- new HashSet<>(Arrays.asList(CREATE, CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, ALTER, DESCRIBE)),
- Arrays.asList(OPERATION, "Create", OPERATION, "ClusterAction", OPERATION, "DescribeConfigs",
- OPERATION, "AlterConfigs", OPERATION, "IdempotentWrite", OPERATION, "Alter", OPERATION, "Describe"))
- );
- put(GROUP_RESOURCES, new SimpleImmutableEntry<>(
- new HashSet<>(Arrays.asList(READ, DESCRIBE, DELETE)),
- Arrays.asList(OPERATION, "Read", OPERATION, "Describe", OPERATION, "Delete"))
- );
- put(TRANSACTIONAL_ID_RESOURCES, new SimpleImmutableEntry<>(
- new HashSet<>(Arrays.asList(DESCRIBE, WRITE)),
- Arrays.asList(OPERATION, "Describe", OPERATION, "Write"))
- );
- put(TOKEN_RESOURCES, new SimpleImmutableEntry<>(Collections.singleton(DESCRIBE), Arrays.asList(OPERATION, "Describe")));
- put(USER_RESOURCES, new SimpleImmutableEntry<>(
- new HashSet<>(Arrays.asList(CREATE_TOKENS, DESCRIBE_TOKENS)),
- Arrays.asList(OPERATION, "CreateTokens", OPERATION, "DescribeTokens"))
- );
- }};
+ private static final Map, Map.Entry, List>> RESOURCE_TO_OPERATIONS = Map.of(
+ TOPIC_RESOURCES, Map.entry(
+ Set.of(READ, WRITE, CREATE, DESCRIBE, DELETE, DESCRIBE_CONFIGS, ALTER_CONFIGS, ALTER),
+ List.of(OPERATION, "Read", OPERATION, "Write", OPERATION, "Create",
+ OPERATION, "Describe", OPERATION, "Delete", OPERATION, "DescribeConfigs",
+ OPERATION, "AlterConfigs", OPERATION, "Alter")),
+ Set.of(CLUSTER_RESOURCE), Map.entry(
+ Set.of(CREATE, CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, ALTER, DESCRIBE),
+ List.of(OPERATION, "Create", OPERATION, "ClusterAction", OPERATION, "DescribeConfigs",
+ OPERATION, "AlterConfigs", OPERATION, "IdempotentWrite", OPERATION, "Alter", OPERATION, "Describe")),
+ GROUP_RESOURCES, Map.entry(
+ Set.of(READ, DESCRIBE, DELETE),
+ List.of(OPERATION, "Read", OPERATION, "Describe", OPERATION, "Delete")),
+ TRANSACTIONAL_ID_RESOURCES, Map.entry(
+ Set.of(DESCRIBE, WRITE),
+ List.of(OPERATION, "Describe", OPERATION, "Write")),
+ TOKEN_RESOURCES, Map.entry(
+ Set.of(DESCRIBE),
+ List.of(OPERATION, "Describe")),
+ USER_RESOURCES, Map.entry(
+ Set.of(CREATE_TOKENS, DESCRIBE_TOKENS),
+ List.of(OPERATION, "CreateTokens", OPERATION, "DescribeTokens"))
+ );
- private static final Map, Set> CONSUMER_RESOURCE_TO_ACLS =
- new HashMap, Set>() {{
- put(TOPIC_RESOURCES, asJavaSet(AclCommand.getAcls(asScalaSet(USERS), ALLOW,
- asScalaSet(new HashSet<>(Arrays.asList(READ, DESCRIBE))), asScalaSet(HOSTS))));
- put(GROUP_RESOURCES, asJavaSet(AclCommand.getAcls(asScalaSet(USERS), ALLOW,
- asScalaSet(Collections.singleton(READ)), asScalaSet(HOSTS))));
- }};
+ private static final Map, Set> CONSUMER_RESOURCE_TO_ACLS = Map.of(
+ TOPIC_RESOURCES, AclCommand.getAcls(USERS, ALLOW, Set.of(READ, DESCRIBE), HOSTS),
+ GROUP_RESOURCES, AclCommand.getAcls(USERS, ALLOW, Set.of(READ), HOSTS)
+ );
- private static final Map, Map, Set>> CMD_TO_RESOURCES_TO_ACL =
- new HashMap, Map, Set>>() {{
- put(Collections.singletonList(PRODUCER), producerResourceToAcls(false));
- put(Arrays.asList(PRODUCER, IDEMPOTENT), producerResourceToAcls(true));
- put(Collections.singletonList(CONSUMER), CONSUMER_RESOURCE_TO_ACLS);
- put(Arrays.asList(PRODUCER, CONSUMER),
- CONSUMER_RESOURCE_TO_ACLS.entrySet().stream().map(entry -> {
- Set value = new HashSet<>(entry.getValue());
- value.addAll(producerResourceToAcls(false)
- .getOrDefault(entry.getKey(), Collections.emptySet()));
- return new SimpleEntry<>(entry.getKey(), value);
- }).collect(Collectors.toMap(Entry::getKey, Entry::getValue)));
- put(Arrays.asList(PRODUCER, IDEMPOTENT, CONSUMER),
- CONSUMER_RESOURCE_TO_ACLS.entrySet().stream().map(entry -> {
- Set value = new HashSet<>(entry.getValue());
- value.addAll(producerResourceToAcls(true)
- .getOrDefault(entry.getKey(), Collections.emptySet()));
- return new SimpleEntry<>(entry.getKey(), value);
- }).collect(Collectors.toMap(Entry::getKey, Entry::getValue)));
- }};
+ private static final Map, Map, Set>> CMD_TO_RESOURCES_TO_ACL = Map.of(
+ List.of(PRODUCER), producerResourceToAcls(false),
+ List.of(PRODUCER, IDEMPOTENT), producerResourceToAcls(true),
+ List.of(CONSUMER), CONSUMER_RESOURCE_TO_ACLS,
+ List.of(PRODUCER, CONSUMER),
+ CONSUMER_RESOURCE_TO_ACLS.entrySet().stream().collect(Collectors.toMap(
+ Map.Entry::getKey,
+ entry -> {
+ Set value = new HashSet<>(entry.getValue());
+ value.addAll(producerResourceToAcls(false).getOrDefault(entry.getKey(), Set.of()));
+ return value;
+ }
+ )),
+ List.of(PRODUCER, IDEMPOTENT, CONSUMER),
+ CONSUMER_RESOURCE_TO_ACLS.entrySet().stream().collect(Collectors.toMap(
+ Map.Entry::getKey,
+ entry -> {
+ Set value = new HashSet<>(entry.getValue());
+ value.addAll(producerResourceToAcls(true).getOrDefault(entry.getKey(), Set.of()));
+ return value;
+ }
+ ))
+ );
@ClusterTest
public void testAclCliWithAdminAPI(ClusterInstance cluster) throws InterruptedException {
@@ -295,7 +280,7 @@ public class AclCommandTest {
@Test
public void testUseBootstrapServerOptWithBootstrapControllerOpt() {
assertInitializeInvalidOptionsExitCodeAndMsg(
- Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, BOOTSTRAP_CONTROLLER, LOCALHOST),
+ List.of(BOOTSTRAP_SERVER, LOCALHOST, BOOTSTRAP_CONTROLLER, LOCALHOST),
"Only one of --bootstrap-server or --bootstrap-controller must be specified"
);
}
@@ -303,7 +288,7 @@ public class AclCommandTest {
@Test
public void testUseWithoutBootstrapServerOptAndBootstrapControllerOpt() {
assertInitializeInvalidOptionsExitCodeAndMsg(
- Collections.emptyList(),
+ List.of(ADD),
"One of --bootstrap-server or --bootstrap-controller must be specified"
);
}
@@ -311,14 +296,14 @@ public class AclCommandTest {
@Test
public void testExactlyOneAction() {
String errMsg = "Command must include exactly one action: --list, --add, --remove. ";
- assertInitializeInvalidOptionsExitCodeAndMsg(Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, ADD, LIST), errMsg);
- assertInitializeInvalidOptionsExitCodeAndMsg(Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, ADD, LIST, REMOVE), errMsg);
+ assertInitializeInvalidOptionsExitCodeAndMsg(List.of(BOOTSTRAP_SERVER, LOCALHOST, ADD, LIST), errMsg);
+ assertInitializeInvalidOptionsExitCodeAndMsg(List.of(BOOTSTRAP_SERVER, LOCALHOST, ADD, LIST, REMOVE), errMsg);
}
@Test
public void testUseListPrincipalsOptWithoutListOpt() {
assertInitializeInvalidOptionsExitCodeAndMsg(
- Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, ADD, "--principal", "User:CN=client"),
+ List.of(BOOTSTRAP_SERVER, LOCALHOST, ADD, "--principal", "User:CN=client"),
"The --principal option is only available if --list is set"
);
}
@@ -326,7 +311,7 @@ public class AclCommandTest {
@Test
public void testUseProducerOptWithoutTopicOpt() {
assertInitializeInvalidOptionsExitCodeAndMsg(
- Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, ADD, PRODUCER),
+ List.of(BOOTSTRAP_SERVER, LOCALHOST, ADD, PRODUCER),
"With --producer you must specify a --topic"
);
}
@@ -334,7 +319,7 @@ public class AclCommandTest {
@Test
public void testUseIdempotentOptWithoutProducerOpt() {
assertInitializeInvalidOptionsExitCodeAndMsg(
- Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, ADD, IDEMPOTENT),
+ List.of(BOOTSTRAP_SERVER, LOCALHOST, ADD, IDEMPOTENT),
"The --idempotent option is only available if --producer is set"
);
}
@@ -342,24 +327,24 @@ public class AclCommandTest {
@Test
public void testUseConsumerOptWithoutRequiredOpt() {
assertInitializeInvalidOptionsExitCodeAndMsg(
- Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, ADD, CONSUMER),
+ List.of(BOOTSTRAP_SERVER, LOCALHOST, ADD, CONSUMER),
"With --consumer you must specify a --topic and a --group and no --cluster or --transactional-id option should be specified."
);
- checkNotThrow(Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, ADD, CONSUMER, TOPIC, "test-topic", GROUP, "test-group"));
+ checkNotThrow(List.of(BOOTSTRAP_SERVER, LOCALHOST, ADD, CONSUMER, TOPIC, "test-topic", GROUP, "test-group"));
}
@Test
public void testInvalidArgs() {
assertInitializeInvalidOptionsExitCodeAndMsg(
- Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, LIST, PRODUCER),
+ List.of(BOOTSTRAP_SERVER, LOCALHOST, LIST, PRODUCER),
"Option \"[list]\" can't be used with option \"[producer]\""
);
assertInitializeInvalidOptionsExitCodeAndMsg(
- Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, ADD, PRODUCER, OPERATION),
+ List.of(BOOTSTRAP_SERVER, LOCALHOST, ADD, PRODUCER, OPERATION, "all"),
"Option \"[producer]\" can't be used with option \"[operation]\""
);
assertInitializeInvalidOptionsExitCodeAndMsg(
- Arrays.asList(BOOTSTRAP_SERVER, LOCALHOST, ADD, CONSUMER, OPERATION, TOPIC, "test-topic", GROUP, "test-group"),
+ List.of(BOOTSTRAP_SERVER, LOCALHOST, ADD, CONSUMER, OPERATION, TOPIC, "test-topic", GROUP, "test-group"),
"Option \"[consumer]\" can't be used with option \"[operation]\""
);
}
@@ -394,8 +379,7 @@ public class AclCommandTest {
}
private void testAclsOnPrefixedResources(ClusterInstance cluster, List cmdArgs) throws InterruptedException {
- List cmd = Arrays.asList("--allow-principal", PRINCIPAL.toString(), PRODUCER, TOPIC, "Test-",
- RESOURCE_PATTERN_TYPE, "Prefixed");
+ List cmd = List.of("--allow-principal", PRINCIPAL.toString(), PRODUCER, TOPIC, "Test-", RESOURCE_PATTERN_TYPE, "Prefixed");
List args = new ArrayList<>(cmdArgs);
args.addAll(cmd);
@@ -406,7 +390,7 @@ public class AclCommandTest {
AccessControlEntry describeAcl = new AccessControlEntry(PRINCIPAL.toString(), WILDCARD_HOST, DESCRIBE, ALLOW);
AccessControlEntry createAcl = new AccessControlEntry(PRINCIPAL.toString(), WILDCARD_HOST, CREATE, ALLOW);
cluster.waitAcls(new AclBindingFilter(new ResourcePattern(ResourceType.TOPIC, "Test-", PREFIXED).toFilter(), ANY),
- Arrays.asList(writeAcl, describeAcl, createAcl));
+ List.of(writeAcl, describeAcl, createAcl));
args = new ArrayList<>(cmdArgs);
args.addAll(cmd);
@@ -415,45 +399,42 @@ public class AclCommandTest {
callMain(args);
cluster.waitAcls(new AclBindingFilter(new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster", PREFIXED).toFilter(), ANY),
- Collections.emptySet());
+ Set.of());
cluster.waitAcls(new AclBindingFilter(new ResourcePattern(ResourceType.TOPIC, "Test-", PREFIXED).toFilter(), ANY),
- Collections.emptySet());
+ Set.of());
}
private static Map, Set> producerResourceToAcls(boolean enableIdempotence) {
- Map, Set> result = new HashMap<>();
- result.put(TOPIC_RESOURCES, asJavaSet(AclCommand.getAcls(asScalaSet(USERS), ALLOW, asScalaSet(
- new HashSet<>(Arrays.asList(WRITE, DESCRIBE, CREATE))), asScalaSet(HOSTS))));
- result.put(TRANSACTIONAL_ID_RESOURCES, asJavaSet(AclCommand.getAcls(asScalaSet(USERS), ALLOW, asScalaSet(
- new HashSet<>(Arrays.asList(WRITE, DESCRIBE))), asScalaSet(HOSTS))));
- result.put(Collections.singleton(CLUSTER_RESOURCE), asJavaSet(AclCommand.getAcls(asScalaSet(USERS), ALLOW,
+ return Map.of(
+ TOPIC_RESOURCES, AclCommand.getAcls(USERS, ALLOW, Set.of(WRITE, DESCRIBE, CREATE), HOSTS),
+ TRANSACTIONAL_ID_RESOURCES, AclCommand.getAcls(USERS, ALLOW, Set.of(WRITE, DESCRIBE), HOSTS),
+ Set.of(CLUSTER_RESOURCE), AclCommand.getAcls(USERS, ALLOW,
enableIdempotence
- ? asScalaSet(Collections.singleton(IDEMPOTENT_WRITE))
- : asScalaSet(Collections.emptySet()), asScalaSet(HOSTS))));
- return result;
+ ? Set.of(IDEMPOTENT_WRITE)
+ : Set.of(), HOSTS));
}
private List adminArgs(String bootstrapServer, Optional commandConfig) {
- List adminArgs = new ArrayList<>(Arrays.asList(BOOTSTRAP_SERVER, bootstrapServer));
- commandConfig.ifPresent(file -> adminArgs.addAll(Arrays.asList(COMMAND_CONFIG, file.getAbsolutePath())));
+ List adminArgs = new ArrayList<>(List.of(BOOTSTRAP_SERVER, bootstrapServer));
+ commandConfig.ifPresent(file -> adminArgs.addAll(List.of(COMMAND_CONFIG, file.getAbsolutePath())));
return adminArgs;
}
private List adminArgsWithBootstrapController(String bootstrapController, Optional commandConfig) {
- List adminArgs = new ArrayList<>(Arrays.asList(BOOTSTRAP_CONTROLLER, bootstrapController));
- commandConfig.ifPresent(file -> adminArgs.addAll(Arrays.asList(COMMAND_CONFIG, file.getAbsolutePath())));
+ List adminArgs = new ArrayList<>(List.of(BOOTSTRAP_CONTROLLER, bootstrapController));
+ commandConfig.ifPresent(file -> adminArgs.addAll(List.of(COMMAND_CONFIG, file.getAbsolutePath())));
return adminArgs;
}
private Map.Entry callMain(List args) {
- return grabConsoleOutputAndError(() -> AclCommand.main(args.toArray(new String[0])));
+ return ToolsTestUtils.grabConsoleOutputAndError(() -> AclCommand.main(args.toArray(new String[0])));
}
private void testAclCli(ClusterInstance cluster, List cmdArgs) throws InterruptedException {
for (Map.Entry, List> entry : RESOURCE_TO_COMMAND.entrySet()) {
Set resources = entry.getKey();
List resourceCmd = entry.getValue();
- Set permissionTypes = new HashSet<>(Arrays.asList(ALLOW, DENY));
+ Set permissionTypes = Set.of(ALLOW, DENY);
for (AclPermissionType permissionType : permissionTypes) {
Map.Entry, List> operationToCmd = RESOURCE_TO_OPERATIONS.get(resources);
Map.Entry, List> aclToCommand = getAclToCommand(permissionType, operationToCmd.getKey());
@@ -494,8 +475,8 @@ public class AclCommandTest {
String resourceType = resource.resourceType().toString();
List cmd = resource == CLUSTER_RESOURCE
- ? Collections.singletonList("kafka-cluster")
- : resourceCmd.stream().filter(s -> !s.startsWith("--")).collect(Collectors.toList());
+ ? List.of("kafka-cluster")
+ : resourceCmd.stream().filter(s -> !s.startsWith("--")).toList();
cmd.forEach(name -> {
String expected = String.format(
@@ -509,36 +490,37 @@ public class AclCommandTest {
private void testPatternTypes(List cmdArgs) {
Exit.setExitProcedure((status, message) -> {
- if ((int) status == 1)
+ if (status == 1)
throw new RuntimeException("Exiting command");
else
throw new AssertionError("Unexpected exit with status " + status);
});
try {
- Arrays.stream(PatternType.values()).sequential().forEach(patternType -> {
+ for (PatternType patternType : PatternType.values()) {
List addCmd = new ArrayList<>(cmdArgs);
- addCmd.addAll(Arrays.asList("--allow-principal", PRINCIPAL.toString(), PRODUCER, TOPIC, "Test",
+ addCmd.addAll(List.of("--allow-principal", PRINCIPAL.toString(), PRODUCER, TOPIC, "Test",
ADD, RESOURCE_PATTERN_TYPE, patternType.toString()));
verifyPatternType(addCmd, patternType.isSpecific());
List listCmd = new ArrayList<>(cmdArgs);
- listCmd.addAll(Arrays.asList(TOPIC, "Test", LIST, RESOURCE_PATTERN_TYPE, patternType.toString()));
+ listCmd.addAll(List.of(TOPIC, "Test", LIST, RESOURCE_PATTERN_TYPE, patternType.toString()));
verifyPatternType(listCmd, patternType != PatternType.UNKNOWN);
List removeCmd = new ArrayList<>(cmdArgs);
- removeCmd.addAll(Arrays.asList(TOPIC, "Test", "--force", REMOVE, RESOURCE_PATTERN_TYPE, patternType.toString()));
+ removeCmd.addAll(List.of(TOPIC, "Test", "--force", REMOVE, RESOURCE_PATTERN_TYPE, patternType.toString()));
verifyPatternType(removeCmd, patternType != PatternType.UNKNOWN);
- });
+ }
} finally {
Exit.resetExitProcedure();
}
}
private void verifyPatternType(List cmd, boolean isValid) {
- if (isValid)
+ if (isValid) {
callMain(cmd);
- else
+ } else {
assertThrows(RuntimeException.class, () -> callMain(cmd));
+ }
}
private void testRemove(
@@ -554,7 +536,7 @@ public class AclCommandTest {
Map.Entry out = callMain(args);
assertEquals("", out.getValue());
for (ResourcePattern resource : resources) {
- cluster.waitAcls(new AclBindingFilter(resource.toFilter(), ANY), Collections.emptySet());
+ cluster.waitAcls(new AclBindingFilter(resource.toFilter(), ANY), Set.of());
}
}
@@ -562,8 +544,8 @@ public class AclCommandTest {
AclPermissionType permissionType,
Set operations
) {
- return new SimpleImmutableEntry<>(
- asJavaSet(AclCommand.getAcls(asScalaSet(USERS), permissionType, asScalaSet(operations), asScalaSet(HOSTS))),
+ return Map.entry(
+ AclCommand.getAcls(USERS, permissionType, operations, HOSTS),
getCmd(permissionType)
);
}
@@ -575,44 +557,12 @@ public class AclCommandTest {
List fullCmd = new ArrayList<>();
for (KafkaPrincipal user : USERS) {
fullCmd.addAll(cmd);
- fullCmd.addAll(Arrays.asList(principalCmd, user.toString()));
+ fullCmd.addAll(List.of(principalCmd, user.toString()));
}
return fullCmd;
}
- /**
- * Capture both the console output and console error during the execution of the provided function.
- */
- private static Map.Entry grabConsoleOutputAndError(Runnable runnable) {
- ByteArrayOutputStream outBuf = new ByteArrayOutputStream();
- ByteArrayOutputStream errBuf = new ByteArrayOutputStream();
- PrintStream out = new PrintStream(outBuf);
- PrintStream err = new PrintStream(errBuf);
- try {
- Console.withOut(out, () -> {
- Console.withErr(err, () -> {
- runnable.run();
- return null;
- });
- return null;
- });
- } finally {
- out.flush();
- err.flush();
- }
-
- return new SimpleImmutableEntry<>(outBuf.toString(), errBuf.toString());
- }
-
- private static scala.collection.immutable.Set asScalaSet(Set javaSet) {
- return CollectionConverters.asScala(javaSet).toSet();
- }
-
- private static Set asJavaSet(scala.collection.immutable.Set scalaSet) {
- return CollectionConverters.asJava(scalaSet);
- }
-
private void assertInitializeInvalidOptionsExitCodeAndMsg(List args, String expectedMsg) {
Exit.setExitProcedure((exitCode, message) -> {
assertEquals(1, exitCode);
@@ -620,7 +570,7 @@ public class AclCommandTest {
throw new RuntimeException();
});
try {
- assertThrows(RuntimeException.class, () -> new AclCommandOptions(args.toArray(new String[0])).checkArgs());
+ assertThrows(RuntimeException.class, () -> new AclCommand.AclCommandOptions(args.toArray(new String[0])).checkArgs());
} finally {
Exit.resetExitProcedure();
}
@@ -628,15 +578,15 @@ public class AclCommandTest {
private void checkNotThrow(List args) {
AtomicReference exitStatus = new AtomicReference<>();
- org.apache.kafka.common.utils.Exit.setExitProcedure((status, __) -> {
+ Exit.setExitProcedure((status, __) -> {
exitStatus.set(status);
throw new RuntimeException();
});
try {
- assertDoesNotThrow(() -> new AclCommandOptions(args.toArray(new String[0])).checkArgs());
+ assertDoesNotThrow(() -> new AclCommand.AclCommandOptions(args.toArray(new String[0])).checkArgs());
assertNull(exitStatus.get());
} finally {
- org.apache.kafka.common.utils.Exit.resetExitProcedure();
+ Exit.resetExitProcedure();
}
}
}