KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions to tools (#13131)

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Christo Lolov <christololov@gmail.com>, Sagar Rao <sagarmeansocean@gmail.com>
This commit is contained in:
Federico Valeri 2023-01-26 20:06:09 +01:00 committed by GitHub
parent f9e0d03274
commit 72cfc994f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
38 changed files with 664 additions and 567 deletions

View File

@ -1536,6 +1536,7 @@ project(':server-common') {
api project(':clients')
implementation libs.slf4jApi
implementation libs.metrics
implementation libs.joptSimple
testImplementation project(':clients')
testImplementation project(':clients').sourceSets.test.output

View File

@ -60,8 +60,9 @@
<subpackage name="tools">
<allow pkg="org.apache.kafka.clients.admin" />
<allow pkg="kafka.admin" />
<allow pkg="joptsimple" />
<allow pkg="org.apache.kafka.clients.consumer" />
<allow pkg="org.apache.kafka.server.util" />
<allow pkg="joptsimple" />
</subpackage>
<subpackage name="coordinator">

View File

@ -347,6 +347,7 @@
<subpackage name="server">
<allow pkg="org.apache.kafka.common" />
<allow pkg="joptsimple" />
<!-- This is required to make AlterConfigPolicyTest work. -->
<allow pkg="org.apache.kafka.server.policy" />
@ -406,6 +407,7 @@
<allow pkg="net.sourceforge.argparse4j" />
<allow pkg="org.apache.log4j" />
<allow pkg="kafka.test" />
<allow pkg="joptsimple" />
</subpackage>
<subpackage name="trogdor">

View File

@ -22,10 +22,9 @@ import java.util.Properties
import joptsimple.OptionParser
import kafka.server.{KafkaConfig, KafkaRaftServer, KafkaServer, Server}
import kafka.utils.Implicits._
import kafka.utils.{CommandLineUtils, Exit, Logging}
import kafka.utils.{Exit, Logging}
import org.apache.kafka.common.utils.{Java, LoggingSignalHandler, OperatingSystem, Time, Utils}
import scala.jdk.CollectionConverters._
import org.apache.kafka.server.util.CommandLineUtils
object Kafka extends Logging {
@ -41,12 +40,12 @@ object Kafka extends Logging {
optionParser.accepts("version", "Print version information and exit.")
if (args.isEmpty || args.contains("--help")) {
CommandLineUtils.printUsageAndDie(optionParser,
CommandLineUtils.printUsageAndExit(optionParser,
"USAGE: java [options] %s server.properties [--override property=value]*".format(this.getClass.getCanonicalName.split('$').head))
}
if (args.contains("--version")) {
CommandLineUtils.printVersionAndDie()
CommandLineUtils.printVersionAndExit()
}
val props = Utils.loadProps(args(0))
@ -55,10 +54,10 @@ object Kafka extends Logging {
val options = optionParser.parse(args.slice(1, args.length): _*)
if (options.nonOptionArguments().size() > 0) {
CommandLineUtils.printUsageAndDie(optionParser, "Found non argument parameters: " + options.nonOptionArguments().toArray.mkString(","))
CommandLineUtils.printUsageAndExit(optionParser, "Found non argument parameters: " + options.nonOptionArguments().toArray.mkString(","))
}
props ++= CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala)
props ++= CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt))
}
props
}

View File

@ -18,7 +18,6 @@
package kafka.admin
import java.util.Properties
import joptsimple._
import joptsimple.util.EnumConverter
import kafka.security.authorizer.{AclAuthorizer, AclEntry, AuthorizerUtils}
@ -33,6 +32,7 @@ import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Utils, SecurityUtils => JSecurityUtils}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.jdk.CollectionConverters._
import scala.collection.mutable
@ -51,7 +51,7 @@ object AclCommand extends Logging {
val opts = new AclCommandOptions(args)
CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to manage acls on kafka.")
CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to manage acls on kafka.")
opts.checkArgs()
@ -202,8 +202,8 @@ object AclCommand extends Logging {
val defaultProps = Map(KafkaConfig.ZkEnableSecureAclsProp -> JaasUtils.isZkSaslEnabled)
val authorizerPropertiesWithoutTls =
if (opts.options.has(opts.authorizerPropertiesOpt)) {
val authorizerProperties = opts.options.valuesOf(opts.authorizerPropertiesOpt).asScala
defaultProps ++ CommandLineUtils.parseKeyValueArgs(authorizerProperties, acceptMissingValue = false).asScala
val authorizerProperties = opts.options.valuesOf(opts.authorizerPropertiesOpt)
defaultProps ++ CommandLineUtils.parseKeyValueArgs(authorizerProperties, false).asScala
} else {
defaultProps
}
@ -324,7 +324,7 @@ object AclCommand extends Logging {
private def getResourceToAcls(opts: AclCommandOptions): Map[ResourcePattern, Set[AccessControlEntry]] = {
val patternType = opts.options.valueOf(opts.resourcePatternType)
if (!patternType.isSpecific)
CommandLineUtils.printUsageAndDie(opts.parser, s"A '--resource-pattern-type' value of '$patternType' is not valid when adding acls.")
CommandLineUtils.printUsageAndExit(opts.parser, s"A '--resource-pattern-type' value of '$patternType' is not valid when adding acls.")
val resourceToAcl = getResourceFilterToAcls(opts).map {
case (filter, acls) =>
@ -332,7 +332,7 @@ object AclCommand extends Logging {
}
if (resourceToAcl.values.exists(_.isEmpty))
CommandLineUtils.printUsageAndDie(opts.parser, "You must specify one of: --allow-principal, --deny-principal when trying to add ACLs.")
CommandLineUtils.printUsageAndExit(opts.parser, "You must specify one of: --allow-principal, --deny-principal when trying to add ACLs.")
resourceToAcl
}
@ -430,8 +430,8 @@ object AclCommand extends Logging {
} yield new AccessControlEntry(principal.toString, host, operation, permissionType)
}
private def getHosts(opts: AclCommandOptions, hostOptionSpec: ArgumentAcceptingOptionSpec[String],
principalOptionSpec: ArgumentAcceptingOptionSpec[String]): Set[String] = {
private def getHosts(opts: AclCommandOptions, hostOptionSpec: OptionSpec[String],
principalOptionSpec: OptionSpec[String]): Set[String] = {
if (opts.options.has(hostOptionSpec))
opts.options.valuesOf(hostOptionSpec).asScala.map(_.trim).toSet
else if (opts.options.has(principalOptionSpec))
@ -440,7 +440,7 @@ object AclCommand extends Logging {
Set.empty[String]
}
private def getPrincipals(opts: AclCommandOptions, principalOptionSpec: ArgumentAcceptingOptionSpec[String]): Set[KafkaPrincipal] = {
private def getPrincipals(opts: AclCommandOptions, principalOptionSpec: OptionSpec[String]): Set[KafkaPrincipal] = {
if (opts.options.has(principalOptionSpec))
opts.options.valuesOf(principalOptionSpec).asScala.map(s => JSecurityUtils.parseKafkaPrincipal(s.trim)).toSet
else
@ -471,7 +471,7 @@ object AclCommand extends Logging {
opts.options.valuesOf(opts.userPrincipalOpt).forEach(user => resourceFilters += new ResourcePatternFilter(JResourceType.USER, user.trim, patternType))
if (resourceFilters.isEmpty && dieIfNoResourceFound)
CommandLineUtils.printUsageAndDie(opts.parser, "You must provide at least one resource: --topic <topic> or --cluster or --group <group> or --delegation-token <Delegation Token ID>")
CommandLineUtils.printUsageAndExit(opts.parser, "You must provide at least one resource: --topic <topic> or --cluster or --group <group> or --delegation-token <Delegation Token ID>")
resourceFilters
}
@ -487,7 +487,7 @@ object AclCommand extends Logging {
for ((resource, acls) <- resourceToAcls) {
val validOps = AclEntry.supportedOperations(resource.resourceType) + AclOperation.ALL
if ((acls.map(_.operation) -- validOps).nonEmpty)
CommandLineUtils.printUsageAndDie(opts.parser, s"ResourceType ${resource.resourceType} only supports operations ${validOps.mkString(",")}")
CommandLineUtils.printUsageAndExit(opts.parser, s"ResourceType ${resource.resourceType} only supports operations ${validOps.mkString(",")}")
}
}
@ -634,7 +634,7 @@ object AclCommand extends Logging {
def checkArgs(): Unit = {
if (options.has(bootstrapServerOpt) && options.has(authorizerOpt))
CommandLineUtils.printUsageAndDie(parser, "Only one of --bootstrap-server or --authorizer must be specified")
CommandLineUtils.printUsageAndExit(parser, "Only one of --bootstrap-server or --authorizer must be specified")
if (!options.has(bootstrapServerOpt)) {
CommandLineUtils.checkRequiredArgs(parser, options, authorizerPropertiesOpt)
@ -642,32 +642,32 @@ object AclCommand extends Logging {
}
if (options.has(commandConfigOpt) && !options.has(bootstrapServerOpt))
CommandLineUtils.printUsageAndDie(parser, "The --command-config option can only be used with --bootstrap-server option")
CommandLineUtils.printUsageAndExit(parser, "The --command-config option can only be used with --bootstrap-server option")
if (options.has(authorizerPropertiesOpt) && options.has(bootstrapServerOpt))
CommandLineUtils.printUsageAndDie(parser, "The --authorizer-properties option can only be used with --authorizer option")
CommandLineUtils.printUsageAndExit(parser, "The --authorizer-properties option can only be used with --authorizer option")
val actions = Seq(addOpt, removeOpt, listOpt).count(options.has)
if (actions != 1)
CommandLineUtils.printUsageAndDie(parser, "Command must include exactly one action: --list, --add, --remove. ")
CommandLineUtils.printUsageAndExit(parser, "Command must include exactly one action: --list, --add, --remove. ")
CommandLineUtils.checkInvalidArgs(parser, options, listOpt, Set(producerOpt, consumerOpt, allowHostsOpt, allowPrincipalsOpt, denyHostsOpt, denyPrincipalsOpt))
CommandLineUtils.checkInvalidArgs(parser, options, listOpt, producerOpt, consumerOpt, allowHostsOpt, allowPrincipalsOpt, denyHostsOpt, denyPrincipalsOpt)
//when --producer or --consumer is specified , user should not specify operations as they are inferred and we also disallow --deny-principals and --deny-hosts.
CommandLineUtils.checkInvalidArgs(parser, options, producerOpt, Set(operationsOpt, denyPrincipalsOpt, denyHostsOpt))
CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt, Set(operationsOpt, denyPrincipalsOpt, denyHostsOpt))
CommandLineUtils.checkInvalidArgs(parser, options, producerOpt, operationsOpt, denyPrincipalsOpt, denyHostsOpt)
CommandLineUtils.checkInvalidArgs(parser, options, consumerOpt, operationsOpt, denyPrincipalsOpt, denyHostsOpt)
if (options.has(listPrincipalsOpt) && !options.has(listOpt))
CommandLineUtils.printUsageAndDie(parser, "The --principal option is only available if --list is set")
CommandLineUtils.printUsageAndExit(parser, "The --principal option is only available if --list is set")
if (options.has(producerOpt) && !options.has(topicOpt))
CommandLineUtils.printUsageAndDie(parser, "With --producer you must specify a --topic")
CommandLineUtils.printUsageAndExit(parser, "With --producer you must specify a --topic")
if (options.has(idempotentOpt) && !options.has(producerOpt))
CommandLineUtils.printUsageAndDie(parser, "The --idempotent option is only available if --producer is set")
CommandLineUtils.printUsageAndExit(parser, "The --idempotent option is only available if --producer is set")
if (options.has(consumerOpt) && (!options.has(topicOpt) || !options.has(groupOpt) || (!options.has(producerOpt) && (options.has(clusterOpt) || options.has(transactionalIdOpt)))))
CommandLineUtils.printUsageAndDie(parser, "With --consumer you must specify a --topic and a --group and no --cluster or --transactional-id option should be specified.")
CommandLineUtils.printUsageAndExit(parser, "With --consumer you must specify a --topic and a --group and no --cluster or --transactional-id option should be specified.")
}
}
}

View File

@ -22,8 +22,6 @@ import java.io.IOException
import java.util.Properties
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}
import kafka.utils.{CommandDefaultOptions, CommandLineUtils}
import kafka.utils.Implicits._
import kafka.utils.Logging
import org.apache.kafka.common.utils.Utils
@ -42,6 +40,7 @@ import org.apache.kafka.common.utils.{KafkaThread, Time}
import org.apache.kafka.common.Node
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ApiVersionsRequest, ApiVersionsResponse, MetadataRequest, MetadataResponse}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}
@ -94,7 +93,7 @@ object BrokerApiVersionsCommand {
checkArgs()
def checkArgs(): Unit = {
CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to retrieve broker version information.")
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to retrieve broker version information.")
// check required args
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
}

View File

@ -23,7 +23,7 @@ import java.util.{Collections, Properties}
import joptsimple._
import kafka.server.DynamicConfig.QuotaConfigs
import kafka.server.{ConfigEntityName, ConfigType, Defaults, DynamicBrokerConfig, DynamicConfig, KafkaConfig}
import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, Logging, PasswordEncoder}
import kafka.utils.{Exit, Logging, PasswordEncoder}
import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, Config => JConfig, ScramMechanism => PublicScramMechanism}
@ -37,6 +37,7 @@ import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter, ScramMechanism}
import org.apache.kafka.common.utils.{Sanitizer, Time, Utils}
import org.apache.kafka.server.log.internals.LogConfig
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.zookeeper.client.ZKClientConfig
import scala.annotation.nowarn
@ -84,7 +85,7 @@ object ConfigCommand extends Logging {
try {
val opts = new ConfigCommandOptions(args)
CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to manipulate and describe entity config for a topic, client, user, broker or ip")
CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to manipulate and describe entity config for a topic, client, user, broker or ip")
opts.checkArgs()
@ -863,10 +864,10 @@ object ConfigCommand extends Logging {
// should have exactly one action
val actions = Seq(alterOpt, describeOpt).count(options.has _)
if (actions != 1)
CommandLineUtils.printUsageAndDie(parser, "Command must include exactly one action: --describe, --alter")
CommandLineUtils.printUsageAndExit(parser, "Command must include exactly one action: --describe, --alter")
// check required args
CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, Set(describeOpt))
CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, Set(alterOpt, addConfig, deleteConfig))
CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, describeOpt)
CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, alterOpt, addConfig, deleteConfig)
val entityTypeVals = entityTypes
if (entityTypeVals.size != entityTypeVals.distinct.size)

View File

@ -28,18 +28,18 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.jdk.CollectionConverters._
import scala.collection.mutable.ListBuffer
import scala.collection.{Map, Seq, immutable, mutable}
import scala.util.{Failure, Success, Try}
import joptsimple.OptionSpec
import joptsimple.{OptionException, OptionSpec}
import org.apache.kafka.common.protocol.Errors
import scala.collection.immutable.TreeMap
import scala.reflect.ClassTag
import org.apache.kafka.common.ConsumerGroupState
import joptsimple.OptionException
import org.apache.kafka.common.requests.ListOffsetsResponse
object ConsumerGroupCommand extends Logging {
@ -49,17 +49,17 @@ object ConsumerGroupCommand extends Logging {
val opts = new ConsumerGroupCommandOptions(args)
try {
opts.checkArgs()
CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to list all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets.")
CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to list all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets.")
// should have exactly one action
val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt, opts.resetOffsetsOpt, opts.deleteOffsetsOpt).count(opts.options.has)
if (actions != 1)
CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete, --reset-offsets, --delete-offsets")
CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --list, --describe, --delete, --reset-offsets, --delete-offsets")
run(opts)
} catch {
case e: OptionException =>
CommandLineUtils.printUsageAndDie(opts.parser, e.getMessage)
CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage)
}
}
@ -85,7 +85,7 @@ object ConsumerGroupCommand extends Logging {
}
} catch {
case e: IllegalArgumentException =>
CommandLineUtils.printUsageAndDie(opts.parser, e.getMessage)
CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage)
case e: Throwable =>
printError(s"Executing consumer group command failed due to ${e.getMessage}", Some(e))
} finally {
@ -747,7 +747,7 @@ object ConsumerGroupCommand extends Logging {
if (opts.options.has(opts.resetFromFileOpt))
Nil
else
CommandLineUtils.printUsageAndDie(opts.parser, "One of the reset scopes should be defined: --all-topics, --topic.")
ToolsUtils.printUsageAndExit(opts.parser, "One of the reset scopes should be defined: --all-topics, --topic.")
}
}
@ -801,7 +801,7 @@ object ConsumerGroupCommand extends Logging {
partitionsToReset.map { topicPartition =>
logStartOffsets.get(topicPartition) match {
case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset))
case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting starting offset of topic partition: $topicPartition")
case _ => ToolsUtils.printUsageAndExit(opts.parser, s"Error getting starting offset of topic partition: $topicPartition")
}
}.toMap
} else if (opts.options.has(opts.resetToLatestOpt)) {
@ -809,7 +809,7 @@ object ConsumerGroupCommand extends Logging {
partitionsToReset.map { topicPartition =>
logEndOffsets.get(topicPartition) match {
case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset))
case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting ending offset of topic partition: $topicPartition")
case _ => ToolsUtils.printUsageAndExit(opts.parser, s"Error getting ending offset of topic partition: $topicPartition")
}
}.toMap
} else if (opts.options.has(opts.resetShiftByOpt)) {
@ -830,7 +830,7 @@ object ConsumerGroupCommand extends Logging {
val logTimestampOffset = logTimestampOffsets.get(topicPartition)
logTimestampOffset match {
case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset))
case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting offset by timestamp of topic partition: $topicPartition")
case _ => ToolsUtils.printUsageAndExit(opts.parser, s"Error getting offset by timestamp of topic partition: $topicPartition")
}
}.toMap
} else if (opts.options.has(opts.resetByDurationOpt)) {
@ -844,7 +844,7 @@ object ConsumerGroupCommand extends Logging {
val logTimestampOffset = logTimestampOffsets.get(topicPartition)
logTimestampOffset match {
case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset))
case _ => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting offset by timestamp of topic partition: $topicPartition")
case _ => ToolsUtils.printUsageAndExit(opts.parser, s"Error getting offset by timestamp of topic partition: $topicPartition")
}
}.toMap
} else if (resetPlanFromFile.isDefined) {
@ -875,12 +875,12 @@ object ConsumerGroupCommand extends Logging {
val preparedOffsetsForPartitionsWithoutCommittedOffset = getLogEndOffsets(groupId, partitionsToResetWithoutCommittedOffset).map {
case (topicPartition, LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset))
case (topicPartition, _) => CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting ending offset of topic partition: $topicPartition")
case (topicPartition, _) => ToolsUtils.printUsageAndExit(opts.parser, s"Error getting ending offset of topic partition: $topicPartition")
}
preparedOffsetsForPartitionsWithCommittedOffset ++ preparedOffsetsForPartitionsWithoutCommittedOffset
} else {
CommandLineUtils.printUsageAndDie(opts.parser, "Option '%s' requires one of the following scenarios: %s".format(opts.resetOffsetsOpt, opts.allResetOffsetScenarioOpts) )
ToolsUtils.printUsageAndExit(opts.parser, "Option '%s' requires one of the following scenarios: %s".format(opts.resetOffsetsOpt, opts.allResetOffsetScenarioOpts))
}
}
@ -1095,15 +1095,15 @@ object ConsumerGroupCommand extends Logging {
if (options.has(describeOpt)) {
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
CommandLineUtils.printUsageAndDie(parser,
CommandLineUtils.printUsageAndExit(parser,
s"Option $describeOpt takes one of these options: ${allGroupSelectionScopeOpts.mkString(", ")}")
val mutuallyExclusiveOpts: Set[OptionSpec[_]] = Set(membersOpt, offsetsOpt, stateOpt)
if (mutuallyExclusiveOpts.toList.map(o => if (options.has(o)) 1 else 0).sum > 1) {
CommandLineUtils.printUsageAndDie(parser,
CommandLineUtils.printUsageAndExit(parser,
s"Option $describeOpt takes at most one of these options: ${mutuallyExclusiveOpts.mkString(", ")}")
}
if (options.has(stateOpt) && options.valueOf(stateOpt) != null)
CommandLineUtils.printUsageAndDie(parser,
CommandLineUtils.printUsageAndExit(parser,
s"Option $describeOpt does not take a value for $stateOpt")
} else {
if (options.has(timeoutMsOpt))
@ -1112,22 +1112,22 @@ object ConsumerGroupCommand extends Logging {
if (options.has(deleteOpt)) {
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
CommandLineUtils.printUsageAndDie(parser,
CommandLineUtils.printUsageAndExit(parser,
s"Option $deleteOpt takes one of these options: ${allGroupSelectionScopeOpts.mkString(", ")}")
if (options.has(topicOpt))
CommandLineUtils.printUsageAndDie(parser, s"The consumer does not support topic-specific offset " +
CommandLineUtils.printUsageAndExit(parser, s"The consumer does not support topic-specific offset " +
"deletion from a consumer group.")
}
if (options.has(deleteOffsetsOpt)) {
if (!options.has(groupOpt) || !options.has(topicOpt))
CommandLineUtils.printUsageAndDie(parser,
CommandLineUtils.printUsageAndExit(parser,
s"Option $deleteOffsetsOpt takes the following options: ${allDeleteOffsetsOpts.mkString(", ")}")
}
if (options.has(resetOffsetsOpt)) {
if (options.has(dryRunOpt) && options.has(executeOpt))
CommandLineUtils.printUsageAndDie(parser, s"Option $resetOffsetsOpt only accepts one of $executeOpt and $dryRunOpt")
CommandLineUtils.printUsageAndExit(parser, s"Option $resetOffsetsOpt only accepts one of $executeOpt and $dryRunOpt")
if (!options.has(dryRunOpt) && !options.has(executeOpt)) {
Console.err.println("WARN: No action will be performed as the --execute option is missing." +
@ -1137,21 +1137,21 @@ object ConsumerGroupCommand extends Logging {
}
if (!options.has(groupOpt) && !options.has(allGroupsOpt))
CommandLineUtils.printUsageAndDie(parser,
CommandLineUtils.printUsageAndExit(parser,
s"Option $resetOffsetsOpt takes one of these options: ${allGroupSelectionScopeOpts.mkString(", ")}")
CommandLineUtils.checkInvalidArgs(parser, options, resetToOffsetOpt, allResetOffsetScenarioOpts - resetToOffsetOpt)
CommandLineUtils.checkInvalidArgs(parser, options, resetToDatetimeOpt, allResetOffsetScenarioOpts - resetToDatetimeOpt)
CommandLineUtils.checkInvalidArgs(parser, options, resetByDurationOpt, allResetOffsetScenarioOpts - resetByDurationOpt)
CommandLineUtils.checkInvalidArgs(parser, options, resetToEarliestOpt, allResetOffsetScenarioOpts - resetToEarliestOpt)
CommandLineUtils.checkInvalidArgs(parser, options, resetToLatestOpt, allResetOffsetScenarioOpts - resetToLatestOpt)
CommandLineUtils.checkInvalidArgs(parser, options, resetToCurrentOpt, allResetOffsetScenarioOpts - resetToCurrentOpt)
CommandLineUtils.checkInvalidArgs(parser, options, resetShiftByOpt, allResetOffsetScenarioOpts - resetShiftByOpt)
CommandLineUtils.checkInvalidArgs(parser, options, resetFromFileOpt, allResetOffsetScenarioOpts - resetFromFileOpt)
CommandLineUtils.checkInvalidArgs(parser, options, resetToOffsetOpt, (allResetOffsetScenarioOpts - resetToOffsetOpt).asJava)
CommandLineUtils.checkInvalidArgs(parser, options, resetToDatetimeOpt, (allResetOffsetScenarioOpts - resetToDatetimeOpt).asJava)
CommandLineUtils.checkInvalidArgs(parser, options, resetByDurationOpt, (allResetOffsetScenarioOpts - resetByDurationOpt).asJava)
CommandLineUtils.checkInvalidArgs(parser, options, resetToEarliestOpt, (allResetOffsetScenarioOpts - resetToEarliestOpt).asJava)
CommandLineUtils.checkInvalidArgs(parser, options, resetToLatestOpt, (allResetOffsetScenarioOpts - resetToLatestOpt).asJava)
CommandLineUtils.checkInvalidArgs(parser, options, resetToCurrentOpt, (allResetOffsetScenarioOpts - resetToCurrentOpt).asJava)
CommandLineUtils.checkInvalidArgs(parser, options, resetShiftByOpt, (allResetOffsetScenarioOpts - resetShiftByOpt).asJava)
CommandLineUtils.checkInvalidArgs(parser, options, resetFromFileOpt, (allResetOffsetScenarioOpts - resetFromFileOpt).asJava)
}
CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, allGroupSelectionScopeOpts - groupOpt)
CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, allConsumerGroupLevelOpts - describeOpt - deleteOpt - resetOffsetsOpt)
CommandLineUtils.checkInvalidArgs(parser, options, topicOpt, allConsumerGroupLevelOpts - deleteOpt - resetOffsetsOpt)
CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, (allGroupSelectionScopeOpts - groupOpt).asJava)
CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, (allConsumerGroupLevelOpts - describeOpt - deleteOpt - resetOffsetsOpt).asJava)
CommandLineUtils.checkInvalidArgs(parser, options, topicOpt, (allConsumerGroupLevelOpts - deleteOpt - resetOffsetsOpt).asJava )
}
}
}

View File

@ -20,17 +20,16 @@ package kafka.admin
import java.text.SimpleDateFormat
import java.util
import java.util.Base64
import joptsimple.ArgumentAcceptingOptionSpec
import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, Logging}
import kafka.utils.{Exit, Logging}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{Admin, CreateDelegationTokenOptions, DescribeDelegationTokenOptions, ExpireDelegationTokenOptions, RenewDelegationTokenOptions}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.token.delegation.DelegationToken
import org.apache.kafka.common.utils.{SecurityUtils, Utils}
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.jdk.CollectionConverters._
import scala.collection.Set
/**
* A command to manage delegation token.
@ -40,12 +39,12 @@ object DelegationTokenCommand extends Logging {
def main(args: Array[String]): Unit = {
val opts = new DelegationTokenCommandOptions(args)
CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to create, renew, expire, or describe delegation tokens.")
CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to create, renew, expire, or describe delegation tokens.")
// should have exactly one action
val actions = Seq(opts.createOpt, opts.renewOpt, opts.expiryOpt, opts.describeOpt).count(opts.options.has _)
if(actions != 1)
CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --create, --renew, --expire or --describe")
CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --create, --renew, --expire or --describe")
opts.checkArgs()
@ -207,17 +206,19 @@ object DelegationTokenCommand extends Logging {
if (options.has(createOpt))
CommandLineUtils.checkRequiredArgs(parser, options, maxLifeTimeOpt)
if (options.has(renewOpt))
if (options.has(renewOpt)) {
CommandLineUtils.checkRequiredArgs(parser, options, hmacOpt, renewTimePeriodOpt)
}
if (options.has(expiryOpt))
if (options.has(expiryOpt)) {
CommandLineUtils.checkRequiredArgs(parser, options, hmacOpt, expiryTimePeriodOpt)
}
// check invalid args
CommandLineUtils.checkInvalidArgs(parser, options, createOpt, Set(hmacOpt, renewTimePeriodOpt, expiryTimePeriodOpt))
CommandLineUtils.checkInvalidArgs(parser, options, renewOpt, Set(renewPrincipalsOpt, maxLifeTimeOpt, expiryTimePeriodOpt, ownerPrincipalsOpt))
CommandLineUtils.checkInvalidArgs(parser, options, expiryOpt, Set(renewOpt, maxLifeTimeOpt, renewTimePeriodOpt, ownerPrincipalsOpt))
CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, Set(renewTimePeriodOpt, maxLifeTimeOpt, hmacOpt, renewTimePeriodOpt, expiryTimePeriodOpt))
CommandLineUtils.checkInvalidArgs(parser, options, createOpt, hmacOpt, renewTimePeriodOpt, expiryTimePeriodOpt)
CommandLineUtils.checkInvalidArgs(parser, options, renewOpt, renewPrincipalsOpt, maxLifeTimeOpt, expiryTimePeriodOpt, ownerPrincipalsOpt)
CommandLineUtils.checkInvalidArgs(parser, options, expiryOpt, renewOpt, maxLifeTimeOpt, renewTimePeriodOpt, ownerPrincipalsOpt)
CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, renewTimePeriodOpt, maxLifeTimeOpt, hmacOpt, renewTimePeriodOpt, expiryTimePeriodOpt)
}
}
}

View File

@ -19,14 +19,14 @@ package kafka.admin
import java.io.PrintStream
import java.util.Properties
import kafka.common.AdminCommandFailedException
import kafka.utils.json.JsonValue
import kafka.utils.{CommandDefaultOptions, CommandLineUtils, CoreUtils, Json}
import kafka.utils.{CoreUtils, Json}
import org.apache.kafka.clients.admin.{Admin, RecordsToDelete}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.jdk.CollectionConverters._
import scala.collection.Seq
@ -130,7 +130,7 @@ object DeleteRecordsCommand {
options = parser.parse(args : _*)
CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to delete records of the given partitions down to the specified offset.")
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to delete records of the given partitions down to the specified offset.")
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, offsetJsonFileOpt)
}

View File

@ -20,8 +20,6 @@ import java.util.Properties
import java.util.concurrent.ExecutionException
import joptsimple.util.EnumConverter
import kafka.common.AdminCommandFailedException
import kafka.utils.CommandDefaultOptions
import kafka.utils.CommandLineUtils
import kafka.utils.CoreUtils
import kafka.utils.Implicits._
import kafka.utils.Json
@ -33,6 +31,8 @@ import org.apache.kafka.common.errors.ClusterAuthorizationException
import org.apache.kafka.common.errors.ElectionNotNeededException
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.jdk.CollectionConverters._
import scala.collection.mutable
import scala.concurrent.duration._
@ -44,7 +44,7 @@ object LeaderElectionCommand extends Logging {
def run(args: Array[String], timeout: Duration): Unit = {
val commandOptions = new LeaderElectionCommandOptions(args)
CommandLineUtils.printHelpAndExitIfNeeded(
CommandLineUtils.maybePrintHelpOrVersion(
commandOptions,
"This tool attempts to elect a new leader for a set of topic partitions. The type of elections supported are preferred replicas and unclean replicas."
)

View File

@ -16,13 +16,12 @@
*/
package kafka.admin
import java.io.PrintStream
import java.util.Properties
import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Json}
import kafka.utils.Json
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, LogDirDescription}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.jdk.CollectionConverters._
import scala.collection.Map
@ -126,7 +125,7 @@ object LogDirsCommand {
options = parser.parse(args : _*)
CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to query log directory usage on the specified brokers.")
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to query log directory usage on the specified brokers.")
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, describeOpt)
}

View File

@ -21,7 +21,7 @@ import java.util.Optional
import java.util.concurrent.ExecutionException
import kafka.common.AdminCommandFailedException
import kafka.server.DynamicConfig
import kafka.utils.{CommandDefaultOptions, CommandLineUtils, CoreUtils, Exit, Json, Logging}
import kafka.utils.{CoreUtils, Exit, Json, Logging}
import kafka.utils.Implicits._
import kafka.utils.json.JsonValue
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
@ -31,6 +31,7 @@ import org.apache.kafka.common.errors.{ReplicaNotAvailableException, UnknownTopi
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, KafkaFuture, TopicPartition, TopicPartitionReplica}
import org.apache.kafka.server.log.internals.LogConfig
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.jdk.CollectionConverters._
import scala.collection.{Map, Seq, mutable}
@ -1330,20 +1331,20 @@ object ReassignPartitionsCommand extends Logging {
def validateAndParseArgs(args: Array[String]): ReassignPartitionsCommandOptions = {
val opts = new ReassignPartitionsCommandOptions(args)
CommandLineUtils.printHelpAndExitIfNeeded(opts, helpText)
CommandLineUtils.maybePrintHelpOrVersion(opts, helpText)
// Determine which action we should perform.
val validActions = Seq(opts.generateOpt, opts.executeOpt, opts.verifyOpt,
opts.cancelOpt, opts.listOpt)
val allActions = validActions.filter(opts.options.has _)
if (allActions.size != 1) {
CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: %s".format(
CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: %s".format(
validActions.map("--" + _.options().get(0)).mkString(", ")))
}
val action = allActions.head
if (!opts.options.has(opts.bootstrapServerOpt))
CommandLineUtils.printUsageAndDie(opts.parser, "Please specify --bootstrap-server")
CommandLineUtils.printUsageAndExit(opts.parser, "Please specify --bootstrap-server")
// Make sure that we have all the required arguments for our action.
val requiredArgs = Map(
@ -1400,7 +1401,7 @@ object ReassignPartitionsCommand extends Logging {
if (!opt.equals(action) &&
!requiredArgs(action).contains(opt) &&
!permittedArgs(action).contains(opt)) {
CommandLineUtils.printUsageAndDie(opts.parser,
CommandLineUtils.printUsageAndExit(opts.parser,
"""Option "%s" can't be used with action "%s"""".format(opt, action))
}
})

View File

@ -18,7 +18,7 @@
package kafka.admin
import java.util
import java.util.{Collections, Properties}
import java.util.{Collections, Optional, Properties}
import joptsimple._
import kafka.common.AdminCommandFailedException
import kafka.utils._
@ -34,6 +34,7 @@ import org.apache.kafka.common.errors.{ClusterAuthorizationException, TopicExist
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.log.internals.LogConfig
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
@ -605,53 +606,54 @@ object TopicCommand extends Logging {
def checkArgs(): Unit = {
if (args.isEmpty)
CommandLineUtils.printUsageAndDie(parser, "Create, delete, describe, or change a topic.")
CommandLineUtils.printUsageAndExit(parser, "Create, delete, describe, or change a topic.")
CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to create, delete, describe, or change a topic.")
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to create, delete, describe, or change a topic.")
// should have exactly one action
val actions = Seq(createOpt, listOpt, alterOpt, describeOpt, deleteOpt).count(options.has)
if (actions != 1)
CommandLineUtils.printUsageAndDie(parser, "Command must include exactly one action: --list, --describe, --create, --alter or --delete")
CommandLineUtils.printUsageAndExit(parser, "Command must include exactly one action: --list, --describe, --create, --alter or --delete")
// check required args
if (!has(bootstrapServerOpt))
throw new IllegalArgumentException("--bootstrap-server must be specified")
if (has(describeOpt) && has(ifExistsOpt)) {
if (!has(topicOpt) && !has(topicIdOpt))
CommandLineUtils.printUsageAndDie(parser, "--topic or --topic-id is required to describe a topic")
CommandLineUtils.printUsageAndExit(parser, "--topic or --topic-id is required to describe a topic")
if (has(topicOpt) && has(topicIdOpt))
println("Only topic id will be used when both --topic and --topic-id are specified and topicId is not Uuid.ZERO_UUID")
}
if (!has(listOpt) && !has(describeOpt))
CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)
if (has(alterOpt)) {
CommandLineUtils.checkInvalidArgsSet(parser, options, Set(bootstrapServerOpt, configOpt), Set(alterOpt),
Some(kafkaConfigsCanAlterTopicConfigsViaBootstrapServer))
val usedOptions = immutable.Set[OptionSpec[_]](bootstrapServerOpt, configOpt)
val invalidOptions = immutable.Set[OptionSpec[_]](alterOpt)
CommandLineUtils.checkInvalidArgsSet(parser, options, usedOptions.asJava, invalidOptions.asJava, Optional.of(kafkaConfigsCanAlterTopicConfigsViaBootstrapServer))
CommandLineUtils.checkRequiredArgs(parser, options, partitionsOpt)
}
// check invalid args
CommandLineUtils.checkInvalidArgs(parser, options, configOpt, allTopicLevelOpts -- Set(alterOpt, createOpt))
CommandLineUtils.checkInvalidArgs(parser, options, deleteConfigOpt, allTopicLevelOpts -- Set(alterOpt) ++ Set(bootstrapServerOpt))
CommandLineUtils.checkInvalidArgs(parser, options, partitionsOpt, allTopicLevelOpts -- Set(alterOpt, createOpt))
CommandLineUtils.checkInvalidArgs(parser, options, replicationFactorOpt, allTopicLevelOpts -- Set(createOpt))
CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, allTopicLevelOpts -- Set(createOpt,alterOpt))
if(options.has(createOpt))
CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, Set(partitionsOpt, replicationFactorOpt))
CommandLineUtils.checkInvalidArgs(parser, options, configOpt, (allTopicLevelOpts -- Set(alterOpt, createOpt)).asJava)
CommandLineUtils.checkInvalidArgs(parser, options, deleteConfigOpt, (allTopicLevelOpts -- Set(alterOpt) ++ Set(bootstrapServerOpt)).asJava)
CommandLineUtils.checkInvalidArgs(parser, options, partitionsOpt, (allTopicLevelOpts -- Set(alterOpt, createOpt)).asJava)
CommandLineUtils.checkInvalidArgs(parser, options, replicationFactorOpt, (allTopicLevelOpts -- Set(createOpt)).asJava)
CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, (allTopicLevelOpts -- Set(createOpt,alterOpt)).asJava)
if (options.has(createOpt))
CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, partitionsOpt, replicationFactorOpt)
CommandLineUtils.checkInvalidArgs(parser, options, reportUnderReplicatedPartitionsOpt,
allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnderReplicatedPartitionsOpt + topicsWithOverridesOpt)
(allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnderReplicatedPartitionsOpt + topicsWithOverridesOpt).asJava)
CommandLineUtils.checkInvalidArgs(parser, options, reportUnderMinIsrPartitionsOpt,
allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnderMinIsrPartitionsOpt + topicsWithOverridesOpt)
(allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnderMinIsrPartitionsOpt + topicsWithOverridesOpt).asJava)
CommandLineUtils.checkInvalidArgs(parser, options, reportAtMinIsrPartitionsOpt,
allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportAtMinIsrPartitionsOpt + topicsWithOverridesOpt)
(allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportAtMinIsrPartitionsOpt + topicsWithOverridesOpt).asJava)
CommandLineUtils.checkInvalidArgs(parser, options, reportUnavailablePartitionsOpt,
allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnavailablePartitionsOpt + topicsWithOverridesOpt)
(allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts - reportUnavailablePartitionsOpt + topicsWithOverridesOpt).asJava)
CommandLineUtils.checkInvalidArgs(parser, options, topicsWithOverridesOpt,
allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts)
CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt, allTopicLevelOpts -- Set(alterOpt, deleteOpt, describeOpt))
CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, allTopicLevelOpts -- Set(createOpt))
CommandLineUtils.checkInvalidArgs(parser, options, excludeInternalTopicOpt, allTopicLevelOpts -- Set(listOpt, describeOpt))
(allTopicLevelOpts -- Set(describeOpt) ++ allReplicationReportOpts).asJava)
CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt, (allTopicLevelOpts -- Set(alterOpt, deleteOpt, describeOpt)).asJava)
CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, (allTopicLevelOpts -- Set(createOpt)).asJava)
CommandLineUtils.checkInvalidArgs(parser, options, excludeInternalTopicOpt, (allTopicLevelOpts -- Set(listOpt, describeOpt)).asJava)
}
}
}

View File

@ -19,11 +19,12 @@ package kafka.admin
import joptsimple.{ArgumentAcceptingOptionSpec, OptionSet}
import kafka.server.KafkaConfig
import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, Logging}
import kafka.utils.{Exit, Logging, ToolsUtils}
import kafka.utils.Implicits._
import kafka.zk.{ControllerZNode, KafkaZkClient, ZkData, ZkSecurityMigratorUtils}
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.zookeeper.AsyncCallback.{ChildrenCallback, StatCallback}
import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.Code
@ -70,7 +71,7 @@ object ZkSecurityMigrator extends Logging {
val jaasFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
val opts = new ZkSecurityMigratorOptions(args)
CommandLineUtils.printHelpAndExitIfNeeded(opts, usageMessage)
CommandLineUtils.maybePrintHelpOrVersion(opts, usageMessage)
// Must have either SASL or TLS mutual authentication enabled to use this tool.
// Instantiate the client config we will use so that we take into account config provided via the CLI option
@ -99,7 +100,7 @@ object ZkSecurityMigrator extends Logging {
info("zookeeper.acl option is unsecure")
false
case _ =>
CommandLineUtils.printUsageAndDie(opts.parser, usageMessage)
ToolsUtils.printUsageAndExit(opts.parser, usageMessage)
}
val zkUrl = opts.options.valueOf(opts.zkUrlOpt)
val zkSessionTimeout = opts.options.valueOf(opts.zkSessionTimeoutOpt).intValue

View File

@ -35,6 +35,7 @@ import org.apache.kafka.common.requests.ListOffsetsRequest
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, Deserializer}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.jdk.CollectionConverters._
@ -293,7 +294,7 @@ object ConsoleConsumer extends Logging {
options = tryParse(parser, args)
CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to read data from Kafka topics and outputs it to standard output.")
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to read data from Kafka topics and outputs it to standard output.")
var groupIdPassed = true
val enableSystestEventsLogging = options.has(enableSystestEventsLoggingOpt)
@ -302,7 +303,7 @@ object ConsoleConsumer extends Logging {
var topicArg: String = _
var includedTopicsArg: String = _
var filterSpec: TopicFilter = _
val extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt).asScala)
val extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt))
val consumerProps = if (options.has(consumerConfigOpt))
Utils.loadProps(options.valueOf(consumerConfigOpt))
else
@ -315,7 +316,7 @@ object ConsoleConsumer extends Logging {
Utils.loadProps(options.valueOf(messageFormatterConfigOpt))
else
new Properties()
formatterArgs ++= CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt).asScala)
formatterArgs ++= CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt))
val maxMessages = if (options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
val timeoutMs = if (options.has(timeoutMsOpt)) options.valueOf(timeoutMsOpt).intValue else -1
val bootstrapServer = options.valueOf(bootstrapServerOpt)
@ -341,19 +342,19 @@ object ConsoleConsumer extends Logging {
val topicOrFilterArgs = List(topicArg, includedTopicsArg).filterNot(_ == null)
// user need to specify value for either --topic or one of the include filters options (--include or --whitelist)
if (topicOrFilterArgs.size != 1)
CommandLineUtils.printUsageAndDie(parser, s"Exactly one of --include/--topic is required. " +
CommandLineUtils.printUsageAndExit(parser, s"Exactly one of --include/--topic is required. " +
s"${if (options.has(whitelistOpt)) "--whitelist is DEPRECATED use --include instead; ignored if --include specified."}")
if (partitionArg.isDefined) {
if (!options.has(topicOpt))
CommandLineUtils.printUsageAndDie(parser, "The topic is required when partition is specified.")
CommandLineUtils.printUsageAndExit(parser, "The topic is required when partition is specified.")
if (fromBeginning && options.has(offsetOpt))
CommandLineUtils.printUsageAndDie(parser, "Options from-beginning and offset cannot be specified together.")
CommandLineUtils.printUsageAndExit(parser, "Options from-beginning and offset cannot be specified together.")
} else if (options.has(offsetOpt))
CommandLineUtils.printUsageAndDie(parser, "The partition is required when offset is specified.")
CommandLineUtils.printUsageAndExit(parser, "The partition is required when offset is specified.")
def invalidOffset(offset: String): Nothing =
CommandLineUtils.printUsageAndDie(parser, s"The provided offset value '$offset' is incorrect. Valid values are " +
ToolsUtils.printUsageAndExit(parser, s"The provided offset value '$offset' is incorrect. Valid values are " +
"'earliest', 'latest', or a non-negative long.")
val offsetArg =
@ -385,7 +386,7 @@ object ConsoleConsumer extends Logging {
).flatten
if (groupIdsProvided.size > 1) {
CommandLineUtils.printUsageAndDie(parser, "The group ids provided in different places (directly using '--group', "
CommandLineUtils.printUsageAndExit(parser, "The group ids provided in different places (directly using '--group', "
+ "via '--consumer-property', or via '--consumer.config') do not match. "
+ s"Detected group ids: ${groupIdsProvided.mkString("'", "', '", "'")}")
}
@ -403,14 +404,14 @@ object ConsoleConsumer extends Logging {
}
if (groupIdPassed && partitionArg.isDefined)
CommandLineUtils.printUsageAndDie(parser, "Options group and partition cannot be specified together.")
CommandLineUtils.printUsageAndExit(parser, "Options group and partition cannot be specified together.")
def tryParse(parser: OptionParser, args: Array[String]): OptionSet = {
try
parser.parse(args: _*)
catch {
case e: OptionException =>
CommandLineUtils.printUsageAndDie(parser, e.getMessage)
ToolsUtils.printUsageAndExit(parser, e.getMessage)
}
}
}

View File

@ -24,14 +24,13 @@ import java.util.regex.Pattern
import joptsimple.{OptionException, OptionParser, OptionSet}
import kafka.common.MessageReader
import kafka.utils.Implicits._
import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, ToolsUtils}
import kafka.utils.{Exit, ToolsUtils}
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.record.CompressionType
import org.apache.kafka.common.utils.Utils
import scala.jdk.CollectionConverters._
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
object ConsoleProducer {
@ -260,7 +259,7 @@ object ConsoleProducer {
options = tryParse(parser, args)
CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps to read data from standard input and publish it to Kafka.")
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to read data from standard input and publish it to Kafka.")
CommandLineUtils.checkRequiredArgs(parser, options, topicOpt)
@ -280,15 +279,15 @@ object ConsoleProducer {
else compressionCodecOptionValue
else CompressionType.NONE.name
val readerClass = options.valueOf(messageReaderOpt)
val cmdLineProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt).asScala)
val extraProducerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(producerPropertyOpt).asScala)
val cmdLineProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt))
val extraProducerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(producerPropertyOpt))
def tryParse(parser: OptionParser, args: Array[String]): OptionSet = {
try
parser.parse(args: _*)
catch {
case e: OptionException =>
CommandLineUtils.printUsageAndDie(parser, e.getMessage)
ToolsUtils.printUsageAndExit(parser, e.getMessage)
}
}
}

View File

@ -22,14 +22,14 @@ import java.time.Duration
import java.util
import java.util.concurrent.atomic.AtomicLong
import java.util.{Properties, Random}
import com.typesafe.scalalogging.LazyLogging
import joptsimple.OptionException
import kafka.utils.{CommandLineUtils, ToolsUtils}
import kafka.utils.ToolsUtils
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{Metric, MetricName, TopicPartition}
import org.apache.kafka.server.util.CommandLineUtils
import scala.jdk.CollectionConverters._
import scala.collection.mutable
@ -260,13 +260,13 @@ object ConsumerPerformance extends LazyLogging {
options = parser.parse(args: _*)
catch {
case e: OptionException =>
CommandLineUtils.printUsageAndDie(parser, e.getMessage)
CommandLineUtils.printUsageAndExit(parser, e.getMessage)
}
if(options.has(numThreadsOpt) || options.has(numFetchersOpt))
println("WARNING: option [threads] and [num-fetch-threads] have been deprecated and will be ignored by the test")
CommandLineUtils.printHelpAndExitIfNeeded(this, "This tool helps in performance test for the full zookeeper consumer")
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps in performance test for the full zookeeper consumer")
CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt)

View File

@ -34,6 +34,7 @@ import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.metadata.bootstrap.BootstrapDirectory
import org.apache.kafka.server.log.internals.{CorruptSnapshotException, OffsetIndex, TimeIndex, TransactionIndex}
import org.apache.kafka.snapshot.Snapshots
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.jdk.CollectionConverters._
import scala.collection.mutable
@ -46,7 +47,7 @@ object DumpLogSegments {
def main(args: Array[String]): Unit = {
val opts = new DumpLogSegmentsOptions(args)
CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to parse a log file and dump its contents to the console, useful for debugging a seemingly corrupt log segment.")
CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to parse a log file and dump its contents to the console, useful for debugging a seemingly corrupt log segment.")
opts.checkArgs()
val misMatchesForIndexFilesMap = mutable.Map[String, List[(Long, Long)]]()

View File

@ -19,11 +19,12 @@
package kafka.tools
import joptsimple._
import kafka.utils.{CommandLineUtils, Exit, IncludeList, ToolsUtils}
import kafka.utils.{Exit, IncludeList, ToolsUtils}
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, ListTopicsOptions, OffsetSpec}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.util.CommandLineUtils
import java.util.Properties
import java.util.concurrent.ExecutionException
@ -82,7 +83,7 @@ object GetOffsetShell {
val excludeInternalTopicsOpt = parser.accepts("exclude-internal-topics", s"By default, internal topics are included. If specified, internal topics are excluded.")
if (args.isEmpty)
CommandLineUtils.printUsageAndDie(parser, "An interactive shell for getting topic-partition offsets.")
CommandLineUtils.printUsageAndExit(parser, "An interactive shell for getting topic-partition offsets.")
val options = parser.parse(args : _*)

View File

@ -23,13 +23,13 @@ import java.text.SimpleDateFormat
import javax.management._
import javax.management.remote._
import javax.rmi.ssl.SslRMIClientSocketFactory
import joptsimple.OptionParser
import scala.jdk.CollectionConverters._
import scala.collection.mutable
import scala.math._
import kafka.utils.{CommandLineUtils, Exit, Logging}
import kafka.utils.{Exit, Logging}
import org.apache.kafka.server.util.CommandLineUtils
/**
@ -99,7 +99,7 @@ object JmxTool extends Logging {
if(args.isEmpty)
CommandLineUtils.printUsageAndDie(parser, "Dump JMX values to standard output.")
CommandLineUtils.printUsageAndExit(parser, "Dump JMX values to standard output.")
val options = parser.parse(args : _*)
@ -207,7 +207,7 @@ object JmxTool extends Logging {
}
if(numExpectedAttributes.isEmpty) {
CommandLineUtils.printUsageAndDie(parser, s"No matched attributes for the queried objects $queries.")
CommandLineUtils.printUsageAndExit(parser, s"No matched attributes for the queried objects $queries.")
}
// print csv header

View File

@ -23,7 +23,6 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.concurrent.CountDownLatch
import java.util.regex.Pattern
import java.util.{Collections, Properties}
import kafka.consumer.BaseConsumerRecord
import kafka.metrics.KafkaMetricsGroup
import kafka.utils._
@ -35,6 +34,7 @@ import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.jdk.CollectionConverters._
import scala.collection.mutable.HashMap
@ -86,7 +86,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
info("Starting mirror maker")
try {
val opts = new MirrorMakerOptions(args)
CommandLineUtils.printHelpAndExitIfNeeded(opts, "This tool helps to continuously copy data between two Kafka clusters.")
CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to continuously copy data between two Kafka clusters.")
opts.checkArgs()
} catch {
case ct: ControlThrowable => throw ct

View File

@ -17,8 +17,7 @@
package kafka.tools
import kafka.utils.CommandDefaultOptions
import org.apache.kafka.server.util.CommandDefaultOptions
class PerfConfig(args: Array[String]) extends CommandDefaultOptions(args) {
val numMessagesOpt = parser.accepts("messages", "REQUIRED: The number of messages to send or consume")

View File

@ -29,6 +29,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.AbstractRequest.Builder
import org.apache.kafka.common.requests.{AbstractRequest, FetchRequest, FetchResponse, ListOffsetsRequest}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.server.util.CommandLineUtils
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{Node, TopicPartition, Uuid}
@ -114,11 +115,11 @@ object ReplicaVerificationTool extends Logging {
val options = parser.parse(args: _*)
if (args.isEmpty || options.has(helpOpt)) {
CommandLineUtils.printUsageAndDie(parser, "Validate that all replicas for a set of topics have the same data.")
CommandLineUtils.printUsageAndExit(parser, "Validate that all replicas for a set of topics have the same data.")
}
if (options.has(versionOpt)) {
CommandLineUtils.printVersionAndDie()
CommandLineUtils.printVersionAndExit()
}
CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt)

View File

@ -23,12 +23,12 @@ import scala.util.matching.Regex
import collection.mutable
import java.util.Date
import java.text.SimpleDateFormat
import kafka.utils.{CoreUtils, Exit, Logging}
import kafka.utils.{CommandLineUtils, CoreUtils, Exit, Logging}
import java.io.{BufferedOutputStream, OutputStream}
import java.nio.charset.StandardCharsets
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.server.util.CommandLineUtils
/**
* A utility that merges the state change logs (possibly obtained from different brokers and over multiple days).
@ -89,7 +89,7 @@ object StateChangeLogMerger extends Logging {
.defaultsTo("9999-12-31 23:59:59,999")
if(args.isEmpty)
CommandLineUtils.printUsageAndDie(parser, "A tool for merging the log files from several brokers to reconnstruct a unified history of what happened.")
CommandLineUtils.printUsageAndExit(parser, "A tool for merging the log files from several brokers to reconnstruct a unified history of what happened.")
val options = parser.parse(args : _*)

View File

@ -21,7 +21,6 @@ import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
import kafka.utils.CommandLineUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions;
@ -39,7 +38,7 @@ import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import scala.collection.JavaConverters;
import org.apache.kafka.server.util.CommandLineUtils;
import java.io.IOException;
import java.text.ParseException;
@ -282,13 +281,13 @@ public class StreamsResetter {
try {
options = optionParser.parse(args);
if (args.length == 0 || options.has(helpOption)) {
CommandLineUtils.printUsageAndDie(optionParser, USAGE);
CommandLineUtils.printUsageAndExit(optionParser, USAGE);
}
if (options.has(versionOption)) {
CommandLineUtils.printVersionAndDie();
CommandLineUtils.printVersionAndExit();
}
} catch (final OptionException e) {
CommandLineUtils.printUsageAndDie(optionParser, e.getMessage());
CommandLineUtils.printUsageAndExit(optionParser, e.getMessage());
}
final Set<OptionSpec<?>> allScenarioOptions = new HashSet<>();
@ -319,7 +318,7 @@ public class StreamsResetter {
optionParser,
options,
option,
JavaConverters.asScalaSetConverter(invalidOptions).asScala());
invalidOptions);
}
private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map<Object, Object> consumerConfig,

View File

@ -24,7 +24,7 @@ import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.{KafkaRaftManager, RaftManager}
import kafka.security.CredentialProvider
import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, MetaProperties, SimpleApiVersionManager}
import kafka.utils.{CommandDefaultOptions, CommandLineUtils, CoreUtils, Exit, Logging, ShutdownableThread}
import kafka.utils.{CoreUtils, Exit, Logging, ShutdownableThread}
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.metrics.Metrics
@ -39,6 +39,7 @@ import org.apache.kafka.raft.errors.NotLeaderException
import org.apache.kafka.raft.{Batch, BatchReader, LeaderAndEpoch, RaftClient, RaftConfig}
import org.apache.kafka.server.common.serialization.RecordSerde
import org.apache.kafka.server.fault.ProcessExitingFaultHandler
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.kafka.snapshot.SnapshotReader
import scala.jdk.CollectionConverters._
@ -435,7 +436,7 @@ object TestRaftServer extends Logging {
def main(args: Array[String]): Unit = {
val opts = new TestRaftServerOptions(args)
try {
CommandLineUtils.printHelpAndExitIfNeeded(opts,
CommandLineUtils.maybePrintHelpOrVersion(opts,
"Standalone raft server for performance testing")
val configFile = opts.options.valueOf(opts.configOpt)
@ -460,7 +461,7 @@ object TestRaftServer extends Logging {
Exit.exit(0)
} catch {
case e: OptionException =>
CommandLineUtils.printUsageAndDie(opts.parser, e.getMessage)
CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage)
case e: Throwable =>
fatal("Exiting raft server due to fatal exception", e)
Exit.exit(1)

View File

@ -1,27 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils
import joptsimple.{OptionParser, OptionSet}
abstract class CommandDefaultOptions(val args: Array[String], allowCommandOptionAbbreviation: Boolean = false) {
val parser = new OptionParser(allowCommandOptionAbbreviation)
val helpOpt = parser.accepts("help", "Print usage information.").forHelp()
val versionOpt = parser.accepts("version", "Display Kafka version.").forHelp()
var options: OptionSet = _
}

View File

@ -1,145 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils
import java.util.Properties
import joptsimple.{OptionParser, OptionSet, OptionSpec}
import scala.collection.Set
/**
* Helper functions for dealing with command line utilities
*/
object CommandLineUtils extends Logging {
/**
* Check if there are no options or `--help` option from command line
*
* @param commandOpts Acceptable options for a command
* @return true on matching the help check condition
*/
def isPrintHelpNeeded(commandOpts: CommandDefaultOptions): Boolean = {
commandOpts.args.isEmpty || commandOpts.options.has(commandOpts.helpOpt)
}
def isPrintVersionNeeded(commandOpts: CommandDefaultOptions): Boolean = {
commandOpts.options.has(commandOpts.versionOpt)
}
/**
* Check and print help message if there is no options or `--help` option
* from command line, if `--version` is specified on the command line
* print version information and exit.
* NOTE: The function name is not strictly speaking correct anymore
* as it also checks whether the version needs to be printed, but
* refactoring this would have meant changing all command line tools
* and unnecessarily increased the blast radius of this change.
*
* @param commandOpts Acceptable options for a command
* @param message Message to display on successful check
*/
def printHelpAndExitIfNeeded(commandOpts: CommandDefaultOptions, message: String): Unit = {
if (isPrintHelpNeeded(commandOpts))
printUsageAndDie(commandOpts.parser, message)
if (isPrintVersionNeeded(commandOpts))
printVersionAndDie()
}
/**
* Check that all the listed options are present
*/
def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*): Unit = {
for (arg <- required) {
if (!options.has(arg))
printUsageAndDie(parser, "Missing required argument \"" + arg + "\"")
}
}
/**
* Check that none of the listed options are present
*/
def checkInvalidArgs(parser: OptionParser, options: OptionSet, usedOption: OptionSpec[_], invalidOptions: Set[OptionSpec[_]]): Unit = {
if (options.has(usedOption)) {
for (arg <- invalidOptions) {
if (options.has(arg))
printUsageAndDie(parser, "Option \"" + usedOption + "\" can't be used with option \"" + arg + "\"")
}
}
}
/**
* Check that none of the listed options are present with the combination of used options
*/
def checkInvalidArgsSet(parser: OptionParser, options: OptionSet, usedOptions: Set[OptionSpec[_]], invalidOptions: Set[OptionSpec[_]],
trailingAdditionalMessage: Option[String] = None): Unit = {
if (usedOptions.count(options.has) == usedOptions.size) {
for (arg <- invalidOptions) {
if (options.has(arg))
printUsageAndDie(parser, "Option combination \"" + usedOptions.mkString(",") + "\" can't be used with option \"" + arg + "\"" + trailingAdditionalMessage.getOrElse(""))
}
}
}
/**
* Print usage and exit
*/
def printUsageAndDie(parser: OptionParser, message: String): Nothing = {
System.err.println(message)
parser.printHelpOn(System.err)
Exit.exit(1, Some(message))
}
def printVersionAndDie(): Nothing = {
System.out.println(VersionInfo.getVersionString)
Exit.exit(0)
}
/**
* Parse key-value pairs in the form key=value
* value may contain equals sign
*/
def parseKeyValueArgs(args: Iterable[String], acceptMissingValue: Boolean = true): Properties = {
val splits = args.map(_.split("=", 2)).filterNot(_.isEmpty)
val props = new Properties
for (a <- splits) {
if (a.length == 1 || (a.length == 2 && a(1).isEmpty)) {
if (acceptMissingValue) props.put(a(0), "")
else throw new IllegalArgumentException(s"Missing value for key ${a(0)}")
}
else props.put(a(0), a(1))
}
props
}
/**
* Merge the options into {@code props} for key {@code key}, with the following precedence, from high to low:
* 1) if {@code spec} is specified on {@code options} explicitly, use the value;
* 2) if {@code props} already has {@code key} set, keep it;
* 3) otherwise, use the default value of {@code spec}.
* A {@code null} value means to remove {@code key} from the {@code props}.
*/
def maybeMergeOptions[V](props: Properties, key: String, options: OptionSet, spec: OptionSpec[V]): Unit = {
if (options.has(spec) || !props.containsKey(key)) {
val value = options.valueOf(spec)
if (value == null)
props.remove(key)
else
props.put(key, value.toString)
}
}
}

View File

@ -18,6 +18,7 @@ package kafka.utils
import joptsimple.OptionParser
import org.apache.kafka.common.{Metric, MetricName}
import org.apache.kafka.server.util.CommandLineUtils
import scala.collection.mutable
@ -32,8 +33,8 @@ object ToolsUtils {
org.apache.kafka.common.utils.Utils.getPort(hostPortData) != null
}
val isValid = !validHostPort.isEmpty && validHostPort.size == hostPorts.length
if(!isValid)
CommandLineUtils.printUsageAndDie(parser, "Please provide valid host:port like host1:9091,host2:9092\n ")
if (!isValid)
CommandLineUtils.printUsageAndExit(parser, "Please provide valid host:port like host1:9091,host2:9092\n ")
}
/**
@ -64,4 +65,18 @@ object ToolsUtils {
println(s"%-${maxLengthOfDisplayName}s : $specifier".format(metricName, value))
}
}
/**
* This is a simple wrapper around `CommandLineUtils.printUsageAndExit`.
* It is needed for tools migration (KAFKA-14525), as there is no Java equivalent for return type `Nothing`.
* Can be removed once [[kafka.admin.ConsumerGroupCommand]], [[kafka.tools.ConsoleConsumer]]
* and [[kafka.tools.ConsoleProducer]] are migrated.
*
* @param parser Command line options parser.
* @param message Error message.
*/
def printUsageAndExit(parser: OptionParser, message: String): Nothing = {
CommandLineUtils.printUsageAndExit(parser, message)
throw new AssertionError("printUsageAndExit should not return, but it did.")
}
}

View File

@ -33,6 +33,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produce
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringDeserializer}
import org.apache.kafka.common.utils.{AbstractIterator, Utils}
import org.apache.kafka.server.util.CommandLineUtils
import scala.jdk.CollectionConverters._
@ -98,7 +99,7 @@ object LogCompactionTester {
val options = parser.parse(args: _*)
if (args.isEmpty)
CommandLineUtils.printUsageAndDie(parser, "A tool to test log compaction. Valid options are: ")
CommandLineUtils.printUsageAndExit(parser, "A tool to test log compaction. Valid options are: ")
CommandLineUtils.checkRequiredArgs(parser, options, brokerOpt, numMessagesOpt)

View File

@ -31,6 +31,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.log.internals.{LogConfig, LogDirFailureChannel}
import org.apache.kafka.server.util.{KafkaScheduler, Scheduler}
import org.apache.kafka.server.util.CommandLineUtils
import scala.math._

View File

@ -26,6 +26,7 @@ import joptsimple._
import kafka.server.{DelayedOperation, DelayedOperationPurgatory}
import kafka.utils._
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.util.CommandLineUtils
import scala.math._
import scala.jdk.CollectionConverters._

View File

@ -216,7 +216,7 @@ class ReassignPartitionsCommandArgsTest {
@Test
def shouldPrintHelpTextIfHelpArg(): Unit = {
val args: Array[String]= Array("--help")
// note, this is not actually a failed case, it's just we share the same `printUsageAndDie` method when wrong arg received
// note, this is not actually a failed case, it's just we share the same `printUsageAndExit` method when wrong arg received
shouldFailWith(ReassignPartitionsCommand.helpText, args)
}

View File

@ -1,223 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils
import java.util.Properties
import joptsimple.{OptionParser, OptionSpec}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
class CommandLineUtilsTest {
@Test
def testParseEmptyArg(): Unit = {
val argArray = Array("my.empty.property=")
assertThrows(classOf[java.lang.IllegalArgumentException], () => CommandLineUtils.parseKeyValueArgs(argArray, acceptMissingValue = false))
}
@Test
def testParseEmptyArgWithNoDelimiter(): Unit = {
val argArray = Array("my.empty.property")
assertThrows(classOf[java.lang.IllegalArgumentException], () => CommandLineUtils.parseKeyValueArgs(argArray, acceptMissingValue = false))
}
@Test
def testParseEmptyArgAsValid(): Unit = {
val argArray = Array("my.empty.property=", "my.empty.property1")
val props = CommandLineUtils.parseKeyValueArgs(argArray)
assertEquals(props.getProperty("my.empty.property"), "", "Value of a key with missing value should be an empty string")
assertEquals(props.getProperty("my.empty.property1"), "", "Value of a key with missing value with no delimiter should be an empty string")
}
@Test
def testParseSingleArg(): Unit = {
val argArray = Array("my.property=value")
val props = CommandLineUtils.parseKeyValueArgs(argArray)
assertEquals(props.getProperty("my.property"), "value", "Value of a single property should be 'value' ")
}
@Test
def testParseArgs(): Unit = {
val argArray = Array("first.property=first","second.property=second")
val props = CommandLineUtils.parseKeyValueArgs(argArray)
assertEquals(props.getProperty("first.property"), "first", "Value of first property should be 'first'")
assertEquals(props.getProperty("second.property"), "second", "Value of second property should be 'second'")
}
@Test
def testParseArgsWithMultipleDelimiters(): Unit = {
val argArray = Array("first.property==first", "second.property=second=", "third.property=thi=rd")
val props = CommandLineUtils.parseKeyValueArgs(argArray)
assertEquals(props.getProperty("first.property"), "=first", "Value of first property should be '=first'")
assertEquals(props.getProperty("second.property"), "second=", "Value of second property should be 'second='")
assertEquals(props.getProperty("third.property"), "thi=rd", "Value of second property should be 'thi=rd'")
}
val props = new Properties()
val parser = new OptionParser(false)
var stringOpt : OptionSpec[String] = _
var intOpt : OptionSpec[java.lang.Integer] = _
var stringOptOptionalArg : OptionSpec[String] = _
var intOptOptionalArg : OptionSpec[java.lang.Integer] = _
var stringOptOptionalArgNoDefault : OptionSpec[String] = _
var intOptOptionalArgNoDefault : OptionSpec[java.lang.Integer] = _
def setUpOptions(): Unit = {
stringOpt = parser.accepts("str")
.withRequiredArg
.ofType(classOf[String])
.defaultsTo("default-string")
intOpt = parser.accepts("int")
.withRequiredArg()
.ofType(classOf[java.lang.Integer])
.defaultsTo(100)
stringOptOptionalArg = parser.accepts("str-opt")
.withOptionalArg
.ofType(classOf[String])
.defaultsTo("default-string-2")
intOptOptionalArg = parser.accepts("int-opt")
.withOptionalArg
.ofType(classOf[java.lang.Integer])
.defaultsTo(200)
stringOptOptionalArgNoDefault = parser.accepts("str-opt-nodef")
.withOptionalArg
.ofType(classOf[String])
intOptOptionalArgNoDefault = parser.accepts("int-opt-nodef")
.withOptionalArg
.ofType(classOf[java.lang.Integer])
}
@Test
def testMaybeMergeOptionsOverwriteExisting(): Unit = {
setUpOptions()
props.put("skey", "existing-string")
props.put("ikey", "300")
props.put("sokey", "existing-string-2")
props.put("iokey", "400")
props.put("sondkey", "existing-string-3")
props.put("iondkey", "500")
val options = parser.parse(
"--str", "some-string",
"--int", "600",
"--str-opt", "some-string-2",
"--int-opt", "700",
"--str-opt-nodef", "some-string-3",
"--int-opt-nodef", "800"
)
CommandLineUtils.maybeMergeOptions(props, "skey", options, stringOpt)
CommandLineUtils.maybeMergeOptions(props, "ikey", options, intOpt)
CommandLineUtils.maybeMergeOptions(props, "sokey", options, stringOptOptionalArg)
CommandLineUtils.maybeMergeOptions(props, "iokey", options, intOptOptionalArg)
CommandLineUtils.maybeMergeOptions(props, "sondkey", options, stringOptOptionalArgNoDefault)
CommandLineUtils.maybeMergeOptions(props, "iondkey", options, intOptOptionalArgNoDefault)
assertEquals("some-string", props.get("skey"))
assertEquals("600", props.get("ikey"))
assertEquals("some-string-2", props.get("sokey"))
assertEquals("700", props.get("iokey"))
assertEquals("some-string-3", props.get("sondkey"))
assertEquals("800", props.get("iondkey"))
}
@Test
def testMaybeMergeOptionsDefaultOverwriteExisting(): Unit = {
setUpOptions()
props.put("sokey", "existing-string")
props.put("iokey", "300")
props.put("sondkey", "existing-string-2")
props.put("iondkey", "400")
val options = parser.parse(
"--str-opt",
"--int-opt",
"--str-opt-nodef",
"--int-opt-nodef"
)
CommandLineUtils.maybeMergeOptions(props, "sokey", options, stringOptOptionalArg)
CommandLineUtils.maybeMergeOptions(props, "iokey", options, intOptOptionalArg)
CommandLineUtils.maybeMergeOptions(props, "sondkey", options, stringOptOptionalArgNoDefault)
CommandLineUtils.maybeMergeOptions(props, "iondkey", options, intOptOptionalArgNoDefault)
assertEquals("default-string-2", props.get("sokey"))
assertEquals("200", props.get("iokey"))
assertNull(props.get("sondkey"))
assertNull(props.get("iondkey"))
}
@Test
def testMaybeMergeOptionsDefaultValueIfNotExist(): Unit = {
setUpOptions()
val options = parser.parse()
CommandLineUtils.maybeMergeOptions(props, "skey", options, stringOpt)
CommandLineUtils.maybeMergeOptions(props, "ikey", options, intOpt)
CommandLineUtils.maybeMergeOptions(props, "sokey", options, stringOptOptionalArg)
CommandLineUtils.maybeMergeOptions(props, "iokey", options, intOptOptionalArg)
CommandLineUtils.maybeMergeOptions(props, "sondkey", options, stringOptOptionalArgNoDefault)
CommandLineUtils.maybeMergeOptions(props, "iondkey", options, intOptOptionalArgNoDefault)
assertEquals("default-string", props.get("skey"))
assertEquals("100", props.get("ikey"))
assertEquals("default-string-2", props.get("sokey"))
assertEquals("200", props.get("iokey"))
assertNull(props.get("sondkey"))
assertNull(props.get("iondkey"))
}
@Test
def testMaybeMergeOptionsNotOverwriteExisting(): Unit = {
setUpOptions()
props.put("skey", "existing-string")
props.put("ikey", "300")
props.put("sokey", "existing-string-2")
props.put("iokey", "400")
props.put("sondkey", "existing-string-3")
props.put("iondkey", "500")
val options = parser.parse()
CommandLineUtils.maybeMergeOptions(props, "skey", options, stringOpt)
CommandLineUtils.maybeMergeOptions(props, "ikey", options, intOpt)
CommandLineUtils.maybeMergeOptions(props, "sokey", options, stringOptOptionalArg)
CommandLineUtils.maybeMergeOptions(props, "iokey", options, intOptOptionalArg)
CommandLineUtils.maybeMergeOptions(props, "sondkey", options, stringOptOptionalArgNoDefault)
CommandLineUtils.maybeMergeOptions(props, "iondkey", options, intOptOptionalArgNoDefault)
assertEquals("existing-string", props.get("skey"))
assertEquals("300", props.get("ikey"))
assertEquals("existing-string-2", props.get("sokey"))
assertEquals("400", props.get("iokey"))
assertEquals("existing-string-3", props.get("sondkey"))
assertEquals("500", props.get("iondkey"))
}
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.util;
import joptsimple.AbstractOptionSpec;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
public abstract class CommandDefaultOptions {
public final String[] args;
public final OptionParser parser;
public final AbstractOptionSpec<Void> helpOpt;
public final AbstractOptionSpec<Void> versionOpt;
public OptionSet options;
public CommandDefaultOptions(String[] args) {
this(args, false);
}
public CommandDefaultOptions(String[] args, boolean allowCommandOptionAbbreviation) {
this.args = args;
this.parser = new OptionParser(allowCommandOptionAbbreviation);
this.helpOpt = parser.accepts("help", "Print usage information.").forHelp();
this.versionOpt = parser.accepts("version", "Display Kafka version.").forHelp();
this.options = null;
}
}

View File

@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.util;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Exit;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
/**
* Helper functions for dealing with command line utilities.
*/
public class CommandLineUtils {
/**
* Check if there are no options or `--help` option from command line.
*
* @param commandOpts Acceptable options for a command
* @return true on matching the help check condition
*/
public static boolean isPrintHelpNeeded(CommandDefaultOptions commandOpts) {
return commandOpts.args.length == 0 || commandOpts.options.has(commandOpts.helpOpt);
}
/**
* Check if there is `--version` option from command line.
*
* @param commandOpts Acceptable options for a command
* @return true on matching the help check condition
*/
public static boolean isPrintVersionNeeded(CommandDefaultOptions commandOpts) {
return commandOpts.options.has(commandOpts.versionOpt);
}
/**
* Check and print help message if there is no options or `--help` option
* from command line, if `--version` is specified on the command line
* print version information and exit.
*
* @param commandOpts Acceptable options for a command
* @param message Message to display on successful check
*/
public static void maybePrintHelpOrVersion(CommandDefaultOptions commandOpts, String message) {
if (isPrintHelpNeeded(commandOpts)) {
printUsageAndExit(commandOpts.parser, message);
}
if (isPrintVersionNeeded(commandOpts)) {
printVersionAndExit();
}
}
/**
* Check that all the listed options are present.
*/
public static void checkRequiredArgs(OptionParser parser, OptionSet options, OptionSpec<?>... requiredList) {
for (OptionSpec<?> arg : requiredList) {
if (!options.has(arg)) {
printUsageAndExit(parser, String.format("Missing required argument \"%s\"", arg));
}
}
}
/**
* Check that none of the listed options are present.
*/
public static void checkInvalidArgs(OptionParser parser,
OptionSet options,
OptionSpec<?> usedOption,
OptionSpec<?>... invalidOptions) {
if (options.has(usedOption)) {
for (OptionSpec<?> arg : invalidOptions) {
if (options.has(arg)) {
printUsageAndExit(parser, String.format("Option \"%s\" can't be used with option \"%s\"", usedOption, arg));
}
}
}
}
/**
* Check that none of the listed options are present.
*/
public static void checkInvalidArgs(OptionParser parser,
OptionSet options,
OptionSpec<?> usedOption,
Set<OptionSpec<?>> invalidOptions) {
OptionSpec<?>[] array = new OptionSpec<?>[invalidOptions.size()];
invalidOptions.toArray(array);
checkInvalidArgs(parser, options, usedOption, array);
}
/**
* Check that none of the listed options are present with the combination of used options.
*/
public static void checkInvalidArgsSet(OptionParser parser,
OptionSet options,
Set<OptionSpec<?>> usedOptions,
Set<OptionSpec<?>> invalidOptions,
Optional<String> trailingAdditionalMessage) {
if (usedOptions.stream().filter(options::has).count() == usedOptions.size()) {
for (OptionSpec<?> arg : invalidOptions) {
if (options.has(arg)) {
printUsageAndExit(parser, String.format("Option combination \"%s\" can't be used with option \"%s\"%s",
usedOptions, arg, trailingAdditionalMessage.orElse("")));
}
}
}
}
public static void printUsageAndExit(OptionParser parser, String message) {
System.err.println(message);
try {
parser.printHelpOn(System.err);
} catch (IOException e) {
throw new RuntimeException(e);
}
Exit.exit(1, message);
}
public static void printVersionAndExit() {
System.out.println(AppInfoParser.getVersion());
Exit.exit(0);
}
/**
* Parse key-value pairs in the form key=value.
* Value may contain equals sign.
*/
public static Properties parseKeyValueArgs(List<String> args) {
return parseKeyValueArgs(args, true);
}
/**
* Parse key-value pairs in the form key=value.
* Value may contain equals sign.
*/
public static Properties parseKeyValueArgs(List<String> args, boolean acceptMissingValue) {
Properties props = new Properties();
List<String[]> splits = new ArrayList<>();
args.forEach(arg -> {
String[] split = arg.split("=", 2);
if (split.length > 0) {
splits.add(split);
}
});
splits.forEach(split -> {
if (split.length == 1 || (split.length == 2 && (split[1] == null || split[1].isEmpty()))) {
if (acceptMissingValue) {
props.put(split[0], "");
} else {
throw new IllegalArgumentException(String.format("Missing value for key %s}", split[0]));
}
} else {
props.put(split[0], split[1]);
}
});
return props;
}
/**
* Merge the options into {@code props} for key {@code key}, with the following precedence, from high to low:
* 1) if {@code spec} is specified on {@code options} explicitly, use the value;
* 2) if {@code props} already has {@code key} set, keep it;
* 3) otherwise, use the default value of {@code spec}.
* A {@code null} value means to remove {@code key} from the {@code props}.
*/
public static <T> void maybeMergeOptions(Properties props, String key, OptionSet options, OptionSpec<T> spec) {
if (options.has(spec) || !props.containsKey(key)) {
T value = options.valueOf(spec);
if (value == null) {
props.remove(key);
} else {
props.put(key, value.toString());
}
}
}
}

View File

@ -0,0 +1,227 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.util;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class CommandLineUtilsTest {
@Test
public void testParseEmptyArg() {
List<String> argArray = Arrays.asList("my.empty.property=");
assertThrows(IllegalArgumentException.class, () -> CommandLineUtils.parseKeyValueArgs(argArray, false));
}
@Test
public void testParseEmptyArgWithNoDelimiter() {
List<String> argArray = Arrays.asList("my.empty.property");
assertThrows(IllegalArgumentException.class, () -> CommandLineUtils.parseKeyValueArgs(argArray, false));
}
@Test
public void testParseEmptyArgAsValid() {
List<String> argArray = Arrays.asList("my.empty.property=", "my.empty.property1");
Properties props = CommandLineUtils.parseKeyValueArgs(argArray);
assertEquals(props.getProperty("my.empty.property"), "", "Value of a key with missing value should be an empty string");
assertEquals(props.getProperty("my.empty.property1"), "", "Value of a key with missing value with no delimiter should be an empty string");
}
@Test
public void testParseSingleArg() {
List<String> argArray = Arrays.asList("my.property=value");
Properties props = CommandLineUtils.parseKeyValueArgs(argArray);
assertEquals(props.getProperty("my.property"), "value", "Value of a single property should be 'value'");
}
@Test
public void testParseArgs() {
List<String> argArray = Arrays.asList("first.property=first", "second.property=second");
Properties props = CommandLineUtils.parseKeyValueArgs(argArray);
assertEquals(props.getProperty("first.property"), "first", "Value of first property should be 'first'");
assertEquals(props.getProperty("second.property"), "second", "Value of second property should be 'second'");
}
@Test
public void testParseArgsWithMultipleDelimiters() {
List<String> argArray = Arrays.asList("first.property==first", "second.property=second=", "third.property=thi=rd");
Properties props = CommandLineUtils.parseKeyValueArgs(argArray);
assertEquals(props.getProperty("first.property"), "=first", "Value of first property should be '=first'");
assertEquals(props.getProperty("second.property"), "second=", "Value of second property should be 'second='");
assertEquals(props.getProperty("third.property"), "thi=rd", "Value of second property should be 'thi=rd'");
}
Properties props = new Properties();
OptionParser parser = new OptionParser(false);
OptionSpec<String> stringOpt;
OptionSpec<Integer> intOpt;
OptionSpec<String> stringOptOptionalArg;
OptionSpec<Integer> intOptOptionalArg;
OptionSpec<String> stringOptOptionalArgNoDefault;
OptionSpec<Integer> intOptOptionalArgNoDefault;
private void setUpOptions() {
stringOpt = parser.accepts("str")
.withRequiredArg()
.ofType(String.class)
.defaultsTo("default-string");
intOpt = parser.accepts("int")
.withRequiredArg()
.ofType(Integer.class)
.defaultsTo(100);
stringOptOptionalArg = parser.accepts("str-opt")
.withOptionalArg()
.ofType(String.class)
.defaultsTo("default-string-2");
intOptOptionalArg = parser.accepts("int-opt")
.withOptionalArg()
.ofType(Integer.class)
.defaultsTo(200);
stringOptOptionalArgNoDefault = parser.accepts("str-opt-nodef")
.withOptionalArg()
.ofType(String.class);
intOptOptionalArgNoDefault = parser.accepts("int-opt-nodef")
.withOptionalArg()
.ofType(Integer.class);
}
@Test
public void testMaybeMergeOptionsOverwriteExisting() {
setUpOptions();
props.put("skey", "existing-string");
props.put("ikey", "300");
props.put("sokey", "existing-string-2");
props.put("iokey", "400");
props.put("sondkey", "existing-string-3");
props.put("iondkey", "500");
OptionSet options = parser.parse(
"--str", "some-string",
"--int", "600",
"--str-opt", "some-string-2",
"--int-opt", "700",
"--str-opt-nodef", "some-string-3",
"--int-opt-nodef", "800"
);
CommandLineUtils.maybeMergeOptions(props, "skey", options, stringOpt);
CommandLineUtils.maybeMergeOptions(props, "ikey", options, intOpt);
CommandLineUtils.maybeMergeOptions(props, "sokey", options, stringOptOptionalArg);
CommandLineUtils.maybeMergeOptions(props, "iokey", options, intOptOptionalArg);
CommandLineUtils.maybeMergeOptions(props, "sondkey", options, stringOptOptionalArgNoDefault);
CommandLineUtils.maybeMergeOptions(props, "iondkey", options, intOptOptionalArgNoDefault);
assertEquals("some-string", props.get("skey"));
assertEquals("600", props.get("ikey"));
assertEquals("some-string-2", props.get("sokey"));
assertEquals("700", props.get("iokey"));
assertEquals("some-string-3", props.get("sondkey"));
assertEquals("800", props.get("iondkey"));
}
@Test
public void testMaybeMergeOptionsDefaultOverwriteExisting() {
setUpOptions();
props.put("sokey", "existing-string");
props.put("iokey", "300");
props.put("sondkey", "existing-string-2");
props.put("iondkey", "400");
OptionSet options = parser.parse(
"--str-opt",
"--int-opt",
"--str-opt-nodef",
"--int-opt-nodef"
);
CommandLineUtils.maybeMergeOptions(props, "sokey", options, stringOptOptionalArg);
CommandLineUtils.maybeMergeOptions(props, "iokey", options, intOptOptionalArg);
CommandLineUtils.maybeMergeOptions(props, "sondkey", options, stringOptOptionalArgNoDefault);
CommandLineUtils.maybeMergeOptions(props, "iondkey", options, intOptOptionalArgNoDefault);
assertEquals("default-string-2", props.get("sokey"));
assertEquals("200", props.get("iokey"));
assertNull(props.get("sondkey"));
assertNull(props.get("iondkey"));
}
@Test
public void testMaybeMergeOptionsDefaultValueIfNotExist() {
setUpOptions();
OptionSet options = parser.parse();
CommandLineUtils.maybeMergeOptions(props, "skey", options, stringOpt);
CommandLineUtils.maybeMergeOptions(props, "ikey", options, intOpt);
CommandLineUtils.maybeMergeOptions(props, "sokey", options, stringOptOptionalArg);
CommandLineUtils.maybeMergeOptions(props, "iokey", options, intOptOptionalArg);
CommandLineUtils.maybeMergeOptions(props, "sondkey", options, stringOptOptionalArgNoDefault);
CommandLineUtils.maybeMergeOptions(props, "iondkey", options, intOptOptionalArgNoDefault);
assertEquals("default-string", props.get("skey"));
assertEquals("100", props.get("ikey"));
assertEquals("default-string-2", props.get("sokey"));
assertEquals("200", props.get("iokey"));
assertNull(props.get("sondkey"));
assertNull(props.get("iondkey"));
}
@Test
public void testMaybeMergeOptionsNotOverwriteExisting() {
setUpOptions();
props.put("skey", "existing-string");
props.put("ikey", "300");
props.put("sokey", "existing-string-2");
props.put("iokey", "400");
props.put("sondkey", "existing-string-3");
props.put("iondkey", "500");
OptionSet options = parser.parse();
CommandLineUtils.maybeMergeOptions(props, "skey", options, stringOpt);
CommandLineUtils.maybeMergeOptions(props, "ikey", options, intOpt);
CommandLineUtils.maybeMergeOptions(props, "sokey", options, stringOptOptionalArg);
CommandLineUtils.maybeMergeOptions(props, "iokey", options, intOptOptionalArg);
CommandLineUtils.maybeMergeOptions(props, "sondkey", options, stringOptOptionalArgNoDefault);
CommandLineUtils.maybeMergeOptions(props, "iondkey", options, intOptOptionalArgNoDefault);
assertEquals("existing-string", props.get("skey"));
assertEquals("300", props.get("ikey"));
assertEquals("existing-string-2", props.get("sokey"));
assertEquals("400", props.get("iokey"));
assertEquals("existing-string-3", props.get("sondkey"));
assertEquals("500", props.get("iondkey"));
}
}