mirror of https://github.com/apache/kafka.git
KAFKA-2684; Add force option to topic / config command so they can be called programatically
Tiny change to add a force option to the topic and config commands so they can be called programatically without requiring user input. Author: Ben Stopford <benstopford@gmail.com> Reviewers: Ismael Juma <ismael@juma.me.uk> Closes #351 from benstopford/CPKAFKA-61B
This commit is contained in:
parent
03a1f7d39c
commit
b410ea37b0
|
@ -69,7 +69,7 @@ object ConfigCommand {
|
|||
val configsToBeDeleted = parseConfigsToBeDeleted(opts)
|
||||
val entityType = opts.options.valueOf(opts.entityType)
|
||||
val entityName = opts.options.valueOf(opts.entityName)
|
||||
warnOnMaxMessagesChange(configsToBeAdded)
|
||||
warnOnMaxMessagesChange(configsToBeAdded, opts.options.has(opts.forceOpt))
|
||||
|
||||
// compile the final set of configs
|
||||
val configs = AdminUtils.fetchEntityConfig(zkUtils, entityType, entityName)
|
||||
|
@ -85,14 +85,15 @@ object ConfigCommand {
|
|||
}
|
||||
}
|
||||
|
||||
def warnOnMaxMessagesChange(configs: Properties): Unit = {
|
||||
def warnOnMaxMessagesChange(configs: Properties, force: Boolean): Unit = {
|
||||
val maxMessageBytes = configs.get(LogConfig.MaxMessageBytesProp) match {
|
||||
case n: String => n.toInt
|
||||
case _ => -1
|
||||
}
|
||||
if (maxMessageBytes > Defaults.MaxMessageSize){
|
||||
error(TopicCommand.longMessageSizeWarning(maxMessageBytes))
|
||||
TopicCommand.askToProceed
|
||||
if (!force)
|
||||
TopicCommand.askToProceed
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -107,14 +108,14 @@ object ConfigCommand {
|
|||
for (entityName <- entityNames) {
|
||||
val configs = AdminUtils.fetchEntityConfig(zkUtils, entityType, entityName)
|
||||
println("Configs for %s:%s are %s"
|
||||
.format(entityType, entityName, configs.map(kv => kv._1 + "=" + kv._2).mkString(",")))
|
||||
.format(entityType, entityName, configs.map(kv => kv._1 + "=" + kv._2).mkString(",")))
|
||||
}
|
||||
}
|
||||
|
||||
private[admin] def parseConfigsToBeAdded(opts: ConfigCommandOptions): Properties = {
|
||||
val configsToBeAdded = opts.options.valuesOf(opts.addConfig).map(_.split("""\s*=\s*"""))
|
||||
require(configsToBeAdded.forall(config => config.length == 2),
|
||||
"Invalid entity config: all configs to be added must be in the format \"key=val\".")
|
||||
"Invalid entity config: all configs to be added must be in the format \"key=val\".")
|
||||
val props = new Properties
|
||||
configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim))
|
||||
if (props.containsKey(LogConfig.MessageFormatVersionProp)) {
|
||||
|
@ -164,6 +165,7 @@ object ConfigCommand {
|
|||
.ofType(classOf[String])
|
||||
.withValuesSeparatedBy(',')
|
||||
val helpOpt = parser.accepts("help", "Print usage information.")
|
||||
val forceOpt = parser.accepts("force", "Suppress console prompts")
|
||||
val options = parser.parse(args : _*)
|
||||
|
||||
val allOpts: Set[OptionSpec[_]] = Set(alterOpt, describeOpt, entityType, entityName, addConfig, deleteConfig, helpOpt)
|
||||
|
|
|
@ -97,13 +97,13 @@ object TopicCommand extends Logging {
|
|||
try {
|
||||
if (opts.options.has(opts.replicaAssignmentOpt)) {
|
||||
val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
|
||||
warnOnMaxMessagesChange(configs, assignment.valuesIterator.next().length)
|
||||
warnOnMaxMessagesChange(configs, assignment.valuesIterator.next().length, opts.options.has(opts.forceOpt))
|
||||
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false)
|
||||
} else {
|
||||
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
|
||||
val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
|
||||
val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
|
||||
warnOnMaxMessagesChange(configs, replicas)
|
||||
warnOnMaxMessagesChange(configs, replicas, opts.options.has(opts.forceOpt))
|
||||
val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled
|
||||
else RackAwareMode.Enforced
|
||||
AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode)
|
||||
|
@ -326,6 +326,9 @@ object TopicCommand extends Logging {
|
|||
"if set when creating topics, the action will only execute if the topic does not already exist")
|
||||
|
||||
val disableRackAware = parser.accepts("disable-rack-aware", "Disable rack aware replica assignment")
|
||||
|
||||
val forceOpt = parser.accepts("force", "Suppress console prompts")
|
||||
|
||||
val options = parser.parse(args : _*)
|
||||
|
||||
val allTopicLevelOpts: Set[OptionSpec[_]] = Set(alterOpt, createOpt, describeOpt, listOpt, deleteOpt)
|
||||
|
@ -354,7 +357,7 @@ object TopicCommand extends Logging {
|
|||
CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, allTopicLevelOpts -- Set(createOpt))
|
||||
}
|
||||
}
|
||||
def warnOnMaxMessagesChange(configs: Properties, replicas: Integer): Unit = {
|
||||
def warnOnMaxMessagesChange(configs: Properties, replicas: Integer, force: Boolean): Unit = {
|
||||
val maxMessageBytes = configs.get(LogConfig.MaxMessageBytesProp) match {
|
||||
case n: String => n.toInt
|
||||
case _ => -1
|
||||
|
@ -362,7 +365,8 @@ object TopicCommand extends Logging {
|
|||
if (maxMessageBytes > Defaults.MaxMessageSize)
|
||||
if (replicas > 1) {
|
||||
error(longMessageSizeWarning(maxMessageBytes))
|
||||
askToProceed
|
||||
if (!force)
|
||||
askToProceed
|
||||
}
|
||||
else
|
||||
warn(shortMessageSizeWarning(maxMessageBytes))
|
||||
|
@ -405,3 +409,4 @@ object TopicCommand extends Logging {
|
|||
s"- Default Consumer fetch.message.max.bytes: ${ConsumerConfig.FetchSize}\n\n"
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue