From ea30ec4b5628fed7bdea228f6bf149a23a415816 Mon Sep 17 00:00:00 2001 From: Gantigmaa Selenge <39860586+tinaselenge@users.noreply.github.com> Date: Thu, 2 Mar 2023 06:30:07 +0000 Subject: [PATCH] KAFKA-14590: Move DelegationTokenCommand to tools (#13172) KAFKA-14590: Move DelegationTokenCommand to tools Reviewers: Luke Chen , Christo Lolov , Federico Valeri --- .../kafka/clients/admin/MockAdminClient.java | 91 +++++- .../kafka/admin/DelegationTokenCommand.scala | 224 ------------- .../admin/DelegationTokenCommandTest.scala | 146 --------- .../kafka/tools/DelegationTokenCommand.java | 308 ++++++++++++++++++ .../tools/DelegationTokenCommandTest.java | 110 +++++++ 5 files changed, 501 insertions(+), 378 deletions(-) delete mode 100644 core/src/main/scala/kafka/admin/DelegationTokenCommand.scala delete mode 100644 core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala create mode 100644 tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java create mode 100644 tools/src/test/java/org/apache/kafka/tools/DelegationTokenCommandTest.java diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index 32b659fe4ae..7a32bcf5a16 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -50,8 +50,11 @@ import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.quota.ClientQuotaAlteration; import org.apache.kafka.common.quota.ClientQuotaFilter; import org.apache.kafka.common.requests.DescribeLogDirsResponse; +import org.apache.kafka.common.errors.DelegationTokenNotFoundException; +import org.apache.kafka.common.errors.InvalidPrincipalTypeException; import java.time.Duration; +import java.util.Arrays; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -61,6 +64,9 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.token.delegation.DelegationToken; +import org.apache.kafka.common.security.token.delegation.TokenInformation; public class MockAdminClient extends AdminClient { public static final String DEFAULT_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA"; @@ -94,6 +100,8 @@ public class MockAdminClient extends AdminClient { private Map mockMetrics = new HashMap<>(); + private final List allTokens = new ArrayList<>(); + public static Builder create() { return new Builder(); } @@ -182,7 +190,7 @@ public class MockAdminClient extends AdminClient { return new MockAdminClient(brokers, controller == null ? brokers.get(0) : controller, clusterId, - defaultPartitions != null ? defaultPartitions.shortValue() : 1, + defaultPartitions != null ? defaultPartitions : 1, defaultReplicationFactor != null ? defaultReplicationFactor.shortValue() : Math.min(brokers.size(), 3), brokerLogDirs, usingRaftController, @@ -596,22 +604,89 @@ public class MockAdminClient extends AdminClient { @Override synchronized public CreateDelegationTokenResult createDelegationToken(CreateDelegationTokenOptions options) { - throw new UnsupportedOperationException("Not implemented yet"); + KafkaFutureImpl future = new KafkaFutureImpl<>(); + + for (KafkaPrincipal renewer : options.renewers()) { + if (!renewer.getPrincipalType().equals(KafkaPrincipal.USER_TYPE)) { + future.completeExceptionally(new InvalidPrincipalTypeException("")); + return new CreateDelegationTokenResult(future); + } + } + + String tokenId = Uuid.randomUuid().toString(); + TokenInformation tokenInfo = new TokenInformation(tokenId, options.renewers().get(0), options.renewers(), System.currentTimeMillis(), options.maxlifeTimeMs(), -1); + DelegationToken token = new DelegationToken(tokenInfo, tokenId.getBytes()); + allTokens.add(token); + future.complete(token); + + return new CreateDelegationTokenResult(future); } @Override synchronized public RenewDelegationTokenResult renewDelegationToken(byte[] hmac, RenewDelegationTokenOptions options) { - throw new UnsupportedOperationException("Not implemented yet"); + KafkaFutureImpl future = new KafkaFutureImpl<>(); + + Boolean tokenFound = false; + Long expiryTimestamp = options.renewTimePeriodMs(); + for (DelegationToken token : allTokens) { + if (Arrays.equals(token.hmac(), hmac)) { + token.tokenInfo().setExpiryTimestamp(expiryTimestamp); + tokenFound = true; + } + } + + if (tokenFound) { + future.complete(expiryTimestamp); + } else { + future.completeExceptionally(new DelegationTokenNotFoundException("")); + } + + return new RenewDelegationTokenResult(future); } @Override synchronized public ExpireDelegationTokenResult expireDelegationToken(byte[] hmac, ExpireDelegationTokenOptions options) { - throw new UnsupportedOperationException("Not implemented yet"); + KafkaFutureImpl future = new KafkaFutureImpl<>(); + + Long expiryTimestamp = options.expiryTimePeriodMs(); + List tokensToRemove = new ArrayList<>(); + Boolean tokenFound = false; + for (DelegationToken token : allTokens) { + if (Arrays.equals(token.hmac(), hmac)) { + if (expiryTimestamp == -1 || expiryTimestamp < System.currentTimeMillis()) { + tokensToRemove.add(token); + } + tokenFound = true; + } + } + + if (tokenFound) { + allTokens.removeAll(tokensToRemove); + future.complete(expiryTimestamp); + } else { + future.completeExceptionally(new DelegationTokenNotFoundException("")); + } + + return new ExpireDelegationTokenResult(future); } @Override synchronized public DescribeDelegationTokenResult describeDelegationToken(DescribeDelegationTokenOptions options) { - throw new UnsupportedOperationException("Not implemented yet"); + KafkaFutureImpl> future = new KafkaFutureImpl<>(); + + if (options.owners().isEmpty()) { + future.complete(allTokens); + } else { + List tokensResult = new ArrayList<>(); + for (DelegationToken token : allTokens) { + if (options.owners().contains(token.tokenInfo().owner())) { + tokensResult.add(token); + } + } + future.complete(tokensResult); + } + + return new DescribeDelegationTokenResult(future); } @Override @@ -782,7 +857,7 @@ public class MockAdminClient extends AdminClient { case BROKER: { int brokerId; try { - brokerId = Integer.valueOf(resource.name()); + brokerId = Integer.parseInt(resource.name()); } catch (NumberFormatException e) { return e; } @@ -921,7 +996,7 @@ public class MockAdminClient extends AdminClient { newReassignments.entrySet()) { TopicPartition partition = entry.getKey(); Optional newReassignment = entry.getValue(); - KafkaFutureImpl future = new KafkaFutureImpl(); + KafkaFutureImpl future = new KafkaFutureImpl<>(); futures.put(partition, future); TopicMetadata topicMetadata = allTopics.get(partition.topic()); if (partition.partition() < 0 || @@ -1058,7 +1133,7 @@ public class MockAdminClient extends AdminClient { ) { Map> results = new HashMap<>(); for (Map.Entry entry : featureUpdates.entrySet()) { - KafkaFutureImpl future = new KafkaFutureImpl(); + KafkaFutureImpl future = new KafkaFutureImpl<>(); String feature = entry.getKey(); try { short cur = featureLevels.getOrDefault(feature, (short) 0); diff --git a/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala b/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala deleted file mode 100644 index 7dce0d58515..00000000000 --- a/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala +++ /dev/null @@ -1,224 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.admin - -import java.text.SimpleDateFormat -import java.util -import java.util.Base64 -import joptsimple.ArgumentAcceptingOptionSpec -import kafka.utils.{Exit, Logging} -import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.clients.admin.{Admin, CreateDelegationTokenOptions, DescribeDelegationTokenOptions, ExpireDelegationTokenOptions, RenewDelegationTokenOptions} -import org.apache.kafka.common.security.auth.KafkaPrincipal -import org.apache.kafka.common.security.token.delegation.DelegationToken -import org.apache.kafka.common.utils.{SecurityUtils, Utils} -import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils} - -import scala.jdk.CollectionConverters._ - -/** - * A command to manage delegation token. - */ -object DelegationTokenCommand extends Logging { - - def main(args: Array[String]): Unit = { - val opts = new DelegationTokenCommandOptions(args) - - CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to create, renew, expire, or describe delegation tokens.") - - // should have exactly one action - val actions = Seq(opts.createOpt, opts.renewOpt, opts.expiryOpt, opts.describeOpt).count(opts.options.has _) - if(actions != 1) - CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --create, --renew, --expire or --describe") - - opts.checkArgs() - - val adminClient = createAdminClient(opts) - - var exitCode = 0 - try { - if(opts.options.has(opts.createOpt)) - createToken(adminClient, opts) - else if(opts.options.has(opts.renewOpt)) - renewToken(adminClient, opts) - else if(opts.options.has(opts.expiryOpt)) - expireToken(adminClient, opts) - else if(opts.options.has(opts.describeOpt)) - describeToken(adminClient, opts) - } catch { - case e: Throwable => - println("Error while executing delegation token command : " + e.getMessage) - error(Utils.stackTrace(e)) - exitCode = 1 - } finally { - adminClient.close() - Exit.exit(exitCode) - } - } - - def createToken(adminClient: Admin, opts: DelegationTokenCommandOptions): DelegationToken = { - val renewerPrincipals = getPrincipals(opts, opts.renewPrincipalsOpt).getOrElse(new util.LinkedList[KafkaPrincipal]()) - val maxLifeTimeMs = opts.options.valueOf(opts.maxLifeTimeOpt).longValue - - println("Calling create token operation with renewers :" + renewerPrincipals +" , max-life-time-period :"+ maxLifeTimeMs) - val createDelegationTokenOptions = new CreateDelegationTokenOptions().maxlifeTimeMs(maxLifeTimeMs).renewers(renewerPrincipals) - val ownerPrincipal = getPrincipals(opts, opts.ownerPrincipalsOpt) - if (ownerPrincipal.isDefined) - createDelegationTokenOptions.owner(ownerPrincipal.get.asScala.head) - val createResult = adminClient.createDelegationToken(createDelegationTokenOptions) - val token = createResult.delegationToken().get() - println("Created delegation token with tokenId : %s".format(token.tokenInfo.tokenId)); printToken(List(token)) - token - } - - def printToken(tokens: List[DelegationToken]): Unit = { - val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm") - print("\n%-15s %-30s %-15s %-15s %-25s %-15s %-15s %-15s".format("TOKENID", "HMAC", "OWNER", "REQUESTER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE")) - for (token <- tokens) { - val tokenInfo = token.tokenInfo - print("\n%-15s %-30s %-15s %-15s %-25s %-15s %-15s %-15s".format( - tokenInfo.tokenId, - token.hmacAsBase64String, - tokenInfo.owner, - tokenInfo.tokenRequester(), - tokenInfo.renewersAsString, - dateFormat.format(tokenInfo.issueTimestamp), - dateFormat.format(tokenInfo.expiryTimestamp), - dateFormat.format(tokenInfo.maxTimestamp))) - println() - } - } - - private def getPrincipals(opts: DelegationTokenCommandOptions, principalOptionSpec: ArgumentAcceptingOptionSpec[String]): Option[util.List[KafkaPrincipal]] = { - if (opts.options.has(principalOptionSpec)) - Some(opts.options.valuesOf(principalOptionSpec).asScala.map(s => SecurityUtils.parseKafkaPrincipal(s.trim)).toList.asJava) - else - None - } - - def renewToken(adminClient: Admin, opts: DelegationTokenCommandOptions): Long = { - val hmac = opts.options.valueOf(opts.hmacOpt) - val renewTimePeriodMs = opts.options.valueOf(opts.renewTimePeriodOpt).longValue() - println("Calling renew token operation with hmac :" + hmac +" , renew-time-period :"+ renewTimePeriodMs) - val renewResult = adminClient.renewDelegationToken(Base64.getDecoder.decode(hmac), new RenewDelegationTokenOptions().renewTimePeriodMs(renewTimePeriodMs)) - val expiryTimeStamp = renewResult.expiryTimestamp().get() - val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm") - println("Completed renew operation. New expiry date : %s".format(dateFormat.format(expiryTimeStamp))) - expiryTimeStamp - } - - def expireToken(adminClient: Admin, opts: DelegationTokenCommandOptions): Long = { - val hmac = opts.options.valueOf(opts.hmacOpt) - val expiryTimePeriodMs = opts.options.valueOf(opts.expiryTimePeriodOpt).longValue() - println("Calling expire token operation with hmac :" + hmac +" , expire-time-period : "+ expiryTimePeriodMs) - val expireResult = adminClient.expireDelegationToken(Base64.getDecoder.decode(hmac), new ExpireDelegationTokenOptions().expiryTimePeriodMs(expiryTimePeriodMs)) - val expiryTimeStamp = expireResult.expiryTimestamp().get() - val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm") - println("Completed expire operation. New expiry date : %s".format(dateFormat.format(expiryTimeStamp))) - expiryTimeStamp - } - - def describeToken(adminClient: Admin, opts: DelegationTokenCommandOptions): List[DelegationToken] = { - val ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt) - if (ownerPrincipals.isEmpty) - println("Calling describe token operation for current user.") - else - println("Calling describe token operation for owners :" + ownerPrincipals.get) - - val describeResult = adminClient.describeDelegationToken(new DescribeDelegationTokenOptions().owners(ownerPrincipals.orNull)) - val tokens = describeResult.delegationTokens().get().asScala.toList - println("Total number of tokens : %s".format(tokens.size)); printToken(tokens) - tokens - } - - private def createAdminClient(opts: DelegationTokenCommandOptions): Admin = { - val props = Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) - props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt)) - Admin.create(props) - } - - class DelegationTokenCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) { - val BootstrapServerDoc = "REQUIRED: server(s) to use for bootstrapping." - val CommandConfigDoc = "REQUIRED: A property file containing configs to be passed to Admin Client. Token management" + - " operations are allowed in secure mode only. This config file is used to pass security related configs." - - val bootstrapServerOpt = parser.accepts("bootstrap-server", BootstrapServerDoc) - .withRequiredArg - .ofType(classOf[String]) - val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc) - .withRequiredArg - .ofType(classOf[String]) - - val createOpt = parser.accepts("create", "Create a new delegation token. Use --renewer-principal option to pass renewers principals.") - val renewOpt = parser.accepts("renew", "Renew delegation token. Use --renew-time-period option to set renew time period.") - val expiryOpt = parser.accepts("expire", "Expire delegation token. Use --expiry-time-period option to expire the token.") - val describeOpt = parser.accepts("describe", "Describe delegation tokens for the given principals. Use --owner-principal to pass owner/renewer principals." + - " If --owner-principal option is not supplied, all the user owned tokens and tokens where user have Describe permission will be returned.") - - val ownerPrincipalsOpt = parser.accepts("owner-principal", "owner is a kafka principal. It is should be in principalType:name format.") - .withOptionalArg() - .ofType(classOf[String]) - - val renewPrincipalsOpt = parser.accepts("renewer-principal", "renewer is a kafka principal. It is should be in principalType:name format.") - .withOptionalArg() - .ofType(classOf[String]) - - val maxLifeTimeOpt = parser.accepts("max-life-time-period", "Max life period for the token in milliseconds. If the value is -1," + - " then token max life time will default to a server side config value (delegation.token.max.lifetime.ms).") - .withOptionalArg() - .ofType(classOf[Long]) - - val renewTimePeriodOpt = parser.accepts("renew-time-period", "Renew time period in milliseconds. If the value is -1, then the" + - " renew time period will default to a server side config value (delegation.token.expiry.time.ms).") - .withOptionalArg() - .ofType(classOf[Long]) - - val expiryTimePeriodOpt = parser.accepts("expiry-time-period", "Expiry time period in milliseconds. If the value is -1, then the" + - " token will get invalidated immediately." ) - .withOptionalArg() - .ofType(classOf[Long]) - - val hmacOpt = parser.accepts("hmac", "HMAC of the delegation token") - .withOptionalArg - .ofType(classOf[String]) - - options = parser.parse(args : _*) - - def checkArgs(): Unit = { - // check required args - CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, commandConfigOpt) - - if (options.has(createOpt)) - CommandLineUtils.checkRequiredArgs(parser, options, maxLifeTimeOpt) - - if (options.has(renewOpt)) { - CommandLineUtils.checkRequiredArgs(parser, options, hmacOpt, renewTimePeriodOpt) - } - - if (options.has(expiryOpt)) { - CommandLineUtils.checkRequiredArgs(parser, options, hmacOpt, expiryTimePeriodOpt) - } - - // check invalid args - CommandLineUtils.checkInvalidArgs(parser, options, createOpt, hmacOpt, renewTimePeriodOpt, expiryTimePeriodOpt) - CommandLineUtils.checkInvalidArgs(parser, options, renewOpt, renewPrincipalsOpt, maxLifeTimeOpt, expiryTimePeriodOpt, ownerPrincipalsOpt) - CommandLineUtils.checkInvalidArgs(parser, options, expiryOpt, renewOpt, maxLifeTimeOpt, renewTimePeriodOpt, ownerPrincipalsOpt) - CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, renewTimePeriodOpt, maxLifeTimeOpt, hmacOpt, renewTimePeriodOpt, expiryTimePeriodOpt) - } - } -} diff --git a/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala b/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala deleted file mode 100644 index 932a4d27b84..00000000000 --- a/core/src/test/scala/unit/kafka/admin/DelegationTokenCommandTest.scala +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.admin - -import java.util - -import kafka.admin.DelegationTokenCommand.DelegationTokenCommandOptions -import kafka.api.{KafkaSasl, SaslSetup} -import kafka.server.{BaseRequestTest, KafkaConfig} -import kafka.utils.{JaasTestUtils, TestUtils} -import org.apache.kafka.clients.admin.{Admin, AdminClientConfig} -import org.apache.kafka.common.security.auth.SecurityProtocol -import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo} - -import scala.collection.mutable.ListBuffer -import scala.concurrent.ExecutionException - -class DelegationTokenCommandTest extends BaseRequestTest with SaslSetup { - override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT - private val kafkaClientSaslMechanism = "PLAIN" - private val kafkaServerSaslMechanisms = List("PLAIN") - protected override val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism)) - protected override val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism)) - var adminClient: Admin = _ - - override def brokerCount = 1 - - @BeforeEach - override def setUp(testInfo: TestInfo): Unit = { - startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, JaasTestUtils.KafkaServerContextName)) - super.setUp(testInfo) - } - - override def generateConfigs = { - val props = TestUtils.createBrokerConfigs(brokerCount, zkConnect, - enableControlledShutdown = false, - interBrokerSecurityProtocol = Some(securityProtocol), - trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, enableToken = true) - props.foreach(brokerPropertyOverrides) - props.map(KafkaConfig.fromProps) - } - - private def createAdminConfig: util.Map[String, Object] = { - val config = new util.HashMap[String, Object] - config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) - val securityProps: util.Map[Object, Object] = - TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties) - securityProps.forEach { (key, value) => config.put(key.asInstanceOf[String], value) } - config - } - - @Test - def testDelegationTokenRequests(): Unit = { - adminClient = Admin.create(createAdminConfig) - val renewer1 = "User:renewer1" - val renewer2 = "User:renewer2" - - // create token1 with renewer1 - val tokenCreated = DelegationTokenCommand.createToken(adminClient, getCreateOpts(List(renewer1))) - - var tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List())) - assertTrue(tokens.size == 1) - val token1 = tokens.head - assertEquals(token1, tokenCreated) - - // create token2 with renewer2 - val token2 = DelegationTokenCommand.createToken(adminClient, getCreateOpts(List(renewer2))) - - tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List())) - assertTrue(tokens.size == 2) - assertEquals(Set(token1, token2), tokens.toSet) - - //get tokens for renewer2 - tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List(renewer2))) - assertTrue(tokens.size == 1) - assertEquals(Set(token2), tokens.toSet) - - //test renewing tokens - val expiryTimestamp = DelegationTokenCommand.renewToken(adminClient, getRenewOpts(token1.hmacAsBase64String())) - val renewedToken = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List(renewer1))).head - assertEquals(expiryTimestamp, renewedToken.tokenInfo().expiryTimestamp()) - - //test expire tokens - DelegationTokenCommand.expireToken(adminClient, getExpireOpts(token1.hmacAsBase64String())) - DelegationTokenCommand.expireToken(adminClient, getExpireOpts(token2.hmacAsBase64String())) - - tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List())) - assertTrue(tokens.size == 0) - - //create token with invalid renewer principal type - assertThrows(classOf[ExecutionException], () => DelegationTokenCommand.createToken(adminClient, getCreateOpts(List("Group:Renewer3")))) - - // try describing tokens for unknown owner - assertTrue(DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(List("User:Unknown"))).isEmpty) - } - - private def getCreateOpts(renewers: List[String]): DelegationTokenCommandOptions = { - val opts = ListBuffer("--bootstrap-server", bootstrapServers(), "--max-life-time-period", "-1", - "--command-config", "testfile", "--create") - renewers.foreach(renewer => opts ++= ListBuffer("--renewer-principal", renewer)) - new DelegationTokenCommandOptions(opts.toArray) - } - - private def getDescribeOpts(owners: List[String]): DelegationTokenCommandOptions = { - val opts = ListBuffer("--bootstrap-server", bootstrapServers(), "--command-config", "testfile", "--describe") - owners.foreach(owner => opts ++= ListBuffer("--owner-principal", owner)) - new DelegationTokenCommandOptions(opts.toArray) - } - - private def getRenewOpts(hmac: String): DelegationTokenCommandOptions = { - val opts = Array("--bootstrap-server", bootstrapServers(), "--command-config", "testfile", "--renew", - "--renew-time-period", "-1", - "--hmac", hmac) - new DelegationTokenCommandOptions(opts) - } - - private def getExpireOpts(hmac: String): DelegationTokenCommandOptions = { - val opts = Array("--bootstrap-server", bootstrapServers(), "--command-config", "testfile", "--expire", - "--expiry-time-period", "-1", - "--hmac", hmac) - new DelegationTokenCommandOptions(opts) - } - - @AfterEach - override def tearDown(): Unit = { - if (adminClient != null) - adminClient.close() - super.tearDown() - closeSasl() - } -} diff --git a/tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java b/tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java new file mode 100644 index 00000000000..aca7a2515ab --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Base64; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.stream.Stream; +import joptsimple.OptionSpec; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.CreateDelegationTokenOptions; +import org.apache.kafka.clients.admin.CreateDelegationTokenResult; +import org.apache.kafka.clients.admin.DescribeDelegationTokenOptions; +import org.apache.kafka.clients.admin.DescribeDelegationTokenResult; +import org.apache.kafka.clients.admin.ExpireDelegationTokenOptions; +import org.apache.kafka.clients.admin.ExpireDelegationTokenResult; +import org.apache.kafka.clients.admin.RenewDelegationTokenOptions; +import org.apache.kafka.clients.admin.RenewDelegationTokenResult; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.token.delegation.DelegationToken; +import org.apache.kafka.common.security.token.delegation.TokenInformation; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.SecurityUtils; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; + +public class DelegationTokenCommand { + public static void main(String... args) { + Exit.exit(mainNoExit(args)); + } + + static int mainNoExit(String... args) { + try { + execute(args); + return 0; + } catch (TerseException e) { + System.err.println(e.getMessage()); + return 1; + } catch (Throwable e) { + System.err.println(e.getMessage()); + System.err.println(Utils.stackTrace(e)); + return 1; + } + } + + static void execute(String... args) throws Exception { + DelegationTokenCommandOptions opts = new DelegationTokenCommandOptions(args); + CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to create, renew, expire, or describe delegation tokens."); + + // should have exactly one action + long numberOfActions = Stream.of(opts.hasCreateOpt(), opts.hasRenewOpt(), opts.hasExpireOpt(), opts.hasDescribeOpt()).filter(b -> b).count(); + if (numberOfActions != 1) { + CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --create, --renew, --expire or --describe"); + } + + opts.checkArgs(); + + try (Admin adminClient = createAdminClient(opts)) { + if (opts.hasCreateOpt()) { + createToken(adminClient, opts); + } else if (opts.hasRenewOpt()) { + renewToken(adminClient, opts); + } else if (opts.hasExpireOpt()) { + expireToken(adminClient, opts); + } else if (opts.hasDescribeOpt()) { + describeToken(adminClient, opts); + } + } + } + + public static DelegationToken createToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException { + List renewerPrincipals = getPrincipals(opts, opts.renewPrincipalsOpt); + Long maxLifeTimeMs = opts.maxLifeTime(); + + System.out.println("Calling create token operation with renewers :" + renewerPrincipals + " , max-life-time-period :" + maxLifeTimeMs); + CreateDelegationTokenOptions createDelegationTokenOptions = new CreateDelegationTokenOptions().maxlifeTimeMs(maxLifeTimeMs).renewers(renewerPrincipals); + + List ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt); + if (!ownerPrincipals.isEmpty()) { + createDelegationTokenOptions.owner(ownerPrincipals.get(0)); + } + + CreateDelegationTokenResult createResult = adminClient.createDelegationToken(createDelegationTokenOptions); + DelegationToken token = createResult.delegationToken().get(); + System.out.println("Created delegation token with tokenId : " + token.tokenInfo().tokenId()); + printToken(Collections.singletonList(token)); + + return token; + } + + private static void printToken(List tokens) { + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm"); + System.out.printf("%n%-15s %-30s %-15s %-15s %-25s %-15s %-15s %-15s%n", "TOKENID", "HMAC", "OWNER", "REQUESTER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE"); + + for (DelegationToken token : tokens) { + TokenInformation tokenInfo = token.tokenInfo(); + System.out.printf("%n%-15s %-30s %-15s %-15s %-25s %-15s %-15s %-15s%n", + tokenInfo.tokenId(), + token.hmacAsBase64String(), + tokenInfo.owner(), + tokenInfo.tokenRequester(), + tokenInfo.renewersAsString(), + dateFormat.format(tokenInfo.issueTimestamp()), + dateFormat.format(tokenInfo.expiryTimestamp()), + dateFormat.format(tokenInfo.maxTimestamp())); + } + } + + private static List getPrincipals(DelegationTokenCommandOptions opts, OptionSpec principalOptionSpec) { + List principals = new ArrayList<>(); + + if (opts.options.has(principalOptionSpec)) { + for (String e : opts.options.valuesOf(principalOptionSpec)) + principals.add(SecurityUtils.parseKafkaPrincipal(e.trim())); + } + return principals; + } + + public static Long renewToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException { + String hmac = opts.hmac(); + Long renewTimePeriodMs = opts.renewTimePeriod(); + + System.out.println("Calling renew token operation with hmac :" + hmac + " , renew-time-period :" + renewTimePeriodMs); + RenewDelegationTokenResult renewResult = adminClient.renewDelegationToken(Base64.getDecoder().decode(hmac), new RenewDelegationTokenOptions().renewTimePeriodMs(renewTimePeriodMs)); + Long expiryTimeStamp = renewResult.expiryTimestamp().get(); + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm"); + System.out.printf("Completed renew operation. New expiry date : %s", dateFormat.format(expiryTimeStamp)); + return expiryTimeStamp; + } + + public static void expireToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException { + String hmac = opts.hmac(); + Long expiryTimePeriodMs = opts.expiryTimePeriod(); + + System.out.println("Calling expire token operation with hmac :" + hmac + " , expire-time-period :" + expiryTimePeriodMs); + ExpireDelegationTokenResult renewResult = adminClient.expireDelegationToken(Base64.getDecoder().decode(hmac), new ExpireDelegationTokenOptions().expiryTimePeriodMs(expiryTimePeriodMs)); + Long expiryTimeStamp = renewResult.expiryTimestamp().get(); + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm"); + System.out.printf("Completed expire operation. New expiry date : %s", dateFormat.format(expiryTimeStamp)); + } + + public static List describeToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException { + List ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt); + + if (ownerPrincipals.isEmpty()) { + System.out.println("Calling describe token operation for current user."); + } else { + System.out.printf("Calling describe token operation for owners: %s%n", ownerPrincipals); + } + + DescribeDelegationTokenResult describeResult = adminClient.describeDelegationToken(new DescribeDelegationTokenOptions().owners(ownerPrincipals)); + List tokens = describeResult.delegationTokens().get(); + System.out.printf("Total number of tokens : %d", tokens.size()); + printToken(tokens); + return tokens; + } + + private static Admin createAdminClient(DelegationTokenCommandOptions opts) throws IOException { + Properties props = Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)); + props.put("bootstrap.servers", opts.options.valueOf(opts.bootstrapServerOpt)); + return Admin.create(props); + } + + static class DelegationTokenCommandOptions extends CommandDefaultOptions { + public final OptionSpec bootstrapServerOpt; + public final OptionSpec commandConfigOpt; + public final OptionSpec createOpt; + public final OptionSpec renewOpt; + public final OptionSpec expiryOpt; + public final OptionSpec describeOpt; + public final OptionSpec ownerPrincipalsOpt; + public final OptionSpec renewPrincipalsOpt; + public final OptionSpec maxLifeTimeOpt; + public final OptionSpec renewTimePeriodOpt; + public final OptionSpec expiryTimePeriodOpt; + public final OptionSpec hmacOpt; + + public DelegationTokenCommandOptions(String[] args) { + super(args); + + String bootstrapServerDoc = "REQUIRED: server(s) to use for bootstrapping."; + String commandConfigDoc = "REQUIRED: A property file containing configs to be passed to Admin Client. Token management" + + " operations are allowed in secure mode only. This config file is used to pass security related configs."; + + this.bootstrapServerOpt = parser.accepts("bootstrap-server", bootstrapServerDoc) + .withRequiredArg() + .ofType(String.class); + + this.commandConfigOpt = parser.accepts("command-config", commandConfigDoc) + .withRequiredArg() + .ofType(String.class); + + this.createOpt = parser.accepts("create", "Create a new delegation token. Use --renewer-principal option to pass renewer principals."); + this.renewOpt = parser.accepts("renew", "Renew delegation token. Use --renew-time-period option to set renew time period."); + this.expiryOpt = parser.accepts("expire", "Expire delegation token. Use --expiry-time-period option to expire the token."); + this.describeOpt = parser.accepts("describe", "Describe delegation tokens for the given principals. Use --owner-principal to pass owner/renewer principals." + + " If --owner-principal option is not supplied, all the user-owned tokens and tokens where the user has Describe permissions will be returned."); + + this.ownerPrincipalsOpt = parser.accepts("owner-principal", "owner is a Kafka principal. They should be in principalType:name format.") + .withOptionalArg() + .ofType(String.class); + + this.renewPrincipalsOpt = parser.accepts("renewer-principal", "renewer is a Kafka principal. They should be in principalType:name format.") + .withOptionalArg() + .ofType(String.class); + + this.maxLifeTimeOpt = parser.accepts("max-life-time-period", "Max life period for the token in milliseconds. If the value is -1," + + " then token max life time will default to the server side config value of (delegation.token.max.lifetime.ms).") + .withOptionalArg() + .ofType(Long.class); + + this.renewTimePeriodOpt = parser.accepts("renew-time-period", "Renew time period in milliseconds. If the value is -1, then the" + + " renew time period will default to the server side config value of (delegation.token.expiry.time.ms).") + .withOptionalArg() + .ofType(Long.class); + + this.expiryTimePeriodOpt = parser.accepts("expiry-time-period", "Expiry time period in milliseconds. If the value is -1, then the" + + " token will get invalidated immediately.") + .withOptionalArg() + .ofType(Long.class); + + this.hmacOpt = parser.accepts("hmac", "HMAC of the delegation token") + .withOptionalArg() + .ofType(String.class); + + options = parser.parse(args); + } + + public boolean hasCreateOpt() { + return options.has(createOpt); + } + + public boolean hasRenewOpt() { + return options.has(renewOpt); + } + + public boolean hasExpireOpt() { + return options.has(expiryOpt); + } + + public boolean hasDescribeOpt() { + return options.has(describeOpt); + } + + public long maxLifeTime() { + return options.valueOf(maxLifeTimeOpt); + } + + public long renewTimePeriod() { + return options.valueOf(renewTimePeriodOpt); + } + + public long expiryTimePeriod() { + return options.valueOf(expiryTimePeriodOpt); + } + + public String hmac() { + return options.valueOf(hmacOpt); + } + + public void checkArgs() { + // check required args + CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, commandConfigOpt); + + if (options.has(createOpt)) { + CommandLineUtils.checkRequiredArgs(parser, options, maxLifeTimeOpt); + } + + if (options.has(renewOpt)) { + CommandLineUtils.checkRequiredArgs(parser, options, hmacOpt, renewTimePeriodOpt); + } + + if (options.has(expiryOpt)) { + CommandLineUtils.checkRequiredArgs(parser, options, hmacOpt, expiryTimePeriodOpt); + } + + // check invalid args + CommandLineUtils.checkInvalidArgs(parser, options, createOpt, new HashSet<>(Arrays.asList(hmacOpt, renewTimePeriodOpt, expiryTimePeriodOpt))); + CommandLineUtils.checkInvalidArgs(parser, options, renewOpt, new HashSet<>(Arrays.asList(renewPrincipalsOpt, maxLifeTimeOpt, expiryTimePeriodOpt, ownerPrincipalsOpt))); + CommandLineUtils.checkInvalidArgs(parser, options, expiryOpt, new HashSet<>(Arrays.asList(renewOpt, maxLifeTimeOpt, renewTimePeriodOpt, ownerPrincipalsOpt))); + CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, new HashSet<>(Arrays.asList(renewTimePeriodOpt, maxLifeTimeOpt, hmacOpt, renewTimePeriodOpt, expiryTimePeriodOpt))); + } + } +} + + diff --git a/tools/src/test/java/org/apache/kafka/tools/DelegationTokenCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/DelegationTokenCommandTest.java new file mode 100644 index 00000000000..fb923629dca --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/tools/DelegationTokenCommandTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.MockAdminClient; +import org.apache.kafka.common.security.token.delegation.DelegationToken; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class DelegationTokenCommandTest { + + @Test + public void testDelegationTokenRequests() throws ExecutionException, InterruptedException { + Admin adminClient = new MockAdminClient.Builder().build(); + + String renewer1 = "User:renewer1"; + String renewer2 = "User:renewer2"; + + // create token1 with renewer1 + DelegationToken tokenCreated = DelegationTokenCommand.createToken(adminClient, getCreateOpts(renewer1)); + + List tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts("")); + assertEquals(1, tokens.size()); + DelegationToken token1 = tokens.get(0); + assertEquals(token1, tokenCreated); + + // create token2 with renewer2 + DelegationToken token2 = DelegationTokenCommand.createToken(adminClient, getCreateOpts(renewer2)); + + tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts("")); + assertEquals(2, tokens.size()); + assertEquals(Arrays.asList(token1, token2), tokens); + + //get tokens for renewer2 + tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(renewer2)); + assertEquals(1, tokens.size()); + assertEquals(Collections.singletonList(token2), tokens); + + //test renewing tokens + Long expiryTimestamp = DelegationTokenCommand.renewToken(adminClient, getRenewOpts(token1.hmacAsBase64String())); + DelegationToken renewedToken = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(renewer1)).get(0); + assertEquals(expiryTimestamp, renewedToken.tokenInfo().expiryTimestamp()); + + //test expire tokens + DelegationTokenCommand.expireToken(adminClient, getExpireOpts(token1.hmacAsBase64String())); + DelegationTokenCommand.expireToken(adminClient, getExpireOpts(token2.hmacAsBase64String())); + + tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts("")); + assertEquals(0, tokens.size()); + + //create token with invalid renewer principal type + assertThrows(ExecutionException.class, () -> DelegationTokenCommand.createToken(adminClient, getCreateOpts("Group:Renewer3"))); + + // try describing tokens for unknown owner + assertTrue(DelegationTokenCommand.describeToken(adminClient, getDescribeOpts("User:Unknown")).isEmpty()); + + } + + private DelegationTokenCommand.DelegationTokenCommandOptions getCreateOpts(String renewer) { + String[] args = {"--bootstrap-server", "localhost:9092", "--max-life-time-period", "-1", "--command-config", "testfile", "--create", "--renewer-principal", renewer}; + return new DelegationTokenCommand.DelegationTokenCommandOptions(args); + } + + private DelegationTokenCommand.DelegationTokenCommandOptions getDescribeOpts(String owner) { + List args = new ArrayList<>(); + args.add("--bootstrap-server"); + args.add("localhost:9092"); + args.add("--command-config"); + args.add("testfile"); + args.add("--describe"); + if (!owner.equals("")) { + args.add("--owner-principal"); + args.add(owner); + } + return new DelegationTokenCommand.DelegationTokenCommandOptions(args.toArray(new String[0])); + } + + private DelegationTokenCommand.DelegationTokenCommandOptions getRenewOpts(String hmac) { + String[] args = {"--bootstrap-server", "localhost:9092", "--command-config", "testfile", "--renew", "--renew-time-period", "604800000", "--hmac", hmac}; + return new DelegationTokenCommand.DelegationTokenCommandOptions(args); + } + + private DelegationTokenCommand.DelegationTokenCommandOptions getExpireOpts(String hmac) { + String[] args = {"--bootstrap-server", "localhost:9092", "--command-config", "testfile", "--expire", "--expiry-time-period", "-1", "--hmac", hmac}; + return new DelegationTokenCommand.DelegationTokenCommandOptions(args); + } +}