mirror of https://github.com/apache/kafka.git
KAFKA-2536: topics tool should allow users to alter topic configuration
This is a minimal revert of some backward incompatible changes made in KAFKA-2205, with the addition of the deprecation logging message. Author: Grant Henke <granthenke@gmail.com> Reviewers: Gwen Shapira Closes #305 from granthenke/topic-configs
This commit is contained in:
parent
5013a41a51
commit
3626133473
|
@ -36,15 +36,15 @@ import kafka.coordinator.ConsumerCoordinator
|
|||
object TopicCommand extends Logging {
|
||||
|
||||
def main(args: Array[String]): Unit = {
|
||||
|
||||
|
||||
val opts = new TopicCommandOptions(args)
|
||||
|
||||
|
||||
if(args.length == 0)
|
||||
CommandLineUtils.printUsageAndDie(opts.parser, "Create, delete, describe, or change a topic.")
|
||||
|
||||
|
||||
// should have exactly one action
|
||||
val actions = Seq(opts.createOpt, opts.listOpt, opts.alterOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _)
|
||||
if(actions != 1)
|
||||
if(actions != 1)
|
||||
CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --create, --alter or --delete")
|
||||
|
||||
opts.checkArgs()
|
||||
|
@ -108,6 +108,20 @@ object TopicCommand extends Logging {
|
|||
opts.options.valueOf(opts.zkConnectOpt)))
|
||||
}
|
||||
topics.foreach { topic =>
|
||||
val configs = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic)
|
||||
if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) {
|
||||
println("WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.")
|
||||
println(" Going forward, please use kafka-configs.sh for this functionality")
|
||||
|
||||
val configsToBeAdded = parseTopicConfigsToBeAdded(opts)
|
||||
val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts)
|
||||
// compile the final set of configs
|
||||
configs.putAll(configsToBeAdded)
|
||||
configsToBeDeleted.foreach(config => configs.remove(config))
|
||||
AdminUtils.changeTopicConfig(zkClient, topic, configs)
|
||||
println("Updated config for topic \"%s\".".format(topic))
|
||||
}
|
||||
|
||||
if(opts.options.has(opts.partitionsOpt)) {
|
||||
if (topic == ConsumerCoordinator.OffsetsTopicName) {
|
||||
throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.")
|
||||
|
@ -121,7 +135,7 @@ object TopicCommand extends Logging {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def listTopics(zkClient: ZkClient, opts: TopicCommandOptions) {
|
||||
val topics = getTopics(zkClient, opts)
|
||||
for(topic <- topics) {
|
||||
|
@ -211,6 +225,18 @@ object TopicCommand extends Logging {
|
|||
props
|
||||
}
|
||||
|
||||
def parseTopicConfigsToBeDeleted(opts: TopicCommandOptions): Seq[String] = {
|
||||
if (opts.options.has(opts.deleteConfigOpt)) {
|
||||
val configsToBeDeleted = opts.options.valuesOf(opts.deleteConfigOpt).map(_.trim())
|
||||
val propsToBeDeleted = new Properties
|
||||
configsToBeDeleted.foreach(propsToBeDeleted.setProperty(_, ""))
|
||||
LogConfig.validateNames(propsToBeDeleted)
|
||||
configsToBeDeleted
|
||||
}
|
||||
else
|
||||
Seq.empty
|
||||
}
|
||||
|
||||
def parseReplicaAssignment(replicaAssignmentList: String): Map[Int, List[Int]] = {
|
||||
val partitionList = replicaAssignmentList.split(",")
|
||||
val ret = new mutable.HashMap[Int, List[Int]]()
|
||||
|
@ -225,7 +251,7 @@ object TopicCommand extends Logging {
|
|||
}
|
||||
ret.toMap
|
||||
}
|
||||
|
||||
|
||||
class TopicCommandOptions(args: Array[String]) {
|
||||
val parser = new OptionParser
|
||||
val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
|
||||
|
@ -236,7 +262,7 @@ object TopicCommand extends Logging {
|
|||
val listOpt = parser.accepts("list", "List all available topics.")
|
||||
val createOpt = parser.accepts("create", "Create a new topic.")
|
||||
val deleteOpt = parser.accepts("delete", "Delete a topic")
|
||||
val alterOpt = parser.accepts("alter", "Alter the number of partitions and/or replica assignment for a topic")
|
||||
val alterOpt = parser.accepts("alter", "Alter the number of partitions, replica assignment, and/or configuration for the topic.")
|
||||
val describeOpt = parser.accepts("describe", "List details for the given topics.")
|
||||
val helpOpt = parser.accepts("help", "Print usage information.")
|
||||
val topicOpt = parser.accepts("topic", "The topic to be create, alter or describe. Can also accept a regular " +
|
||||
|
@ -245,12 +271,16 @@ object TopicCommand extends Logging {
|
|||
.describedAs("topic")
|
||||
.ofType(classOf[String])
|
||||
val nl = System.getProperty("line.separator")
|
||||
val configOpt = parser.accepts("config", "A configuration override for the topic being created." +
|
||||
"The following is a list of valid configurations: " + nl + LogConfig.configNames.map("\t" + _).mkString(nl) + nl +
|
||||
"See the Kafka documentation for full details on the topic configs.")
|
||||
.withRequiredArg
|
||||
.describedAs("name=value")
|
||||
.ofType(classOf[String])
|
||||
val configOpt = parser.accepts("config", "A topic configuration override for the topic being created or altered." +
|
||||
"The following is a list of valid configurations: " + nl + LogConfig.configNames.map("\t" + _).mkString(nl) + nl +
|
||||
"See the Kafka documentation for full details on the topic configs.")
|
||||
.withRequiredArg
|
||||
.describedAs("name=value")
|
||||
.ofType(classOf[String])
|
||||
val deleteConfigOpt = parser.accepts("delete-config", "A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option).")
|
||||
.withRequiredArg
|
||||
.describedAs("name")
|
||||
.ofType(classOf[String])
|
||||
val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic being created or " +
|
||||
"altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected")
|
||||
.withRequiredArg
|
||||
|
@ -284,11 +314,10 @@ object TopicCommand extends Logging {
|
|||
|
||||
// check invalid args
|
||||
CommandLineUtils.checkInvalidArgs(parser, options, configOpt, allTopicLevelOpts -- Set(alterOpt, createOpt))
|
||||
CommandLineUtils.checkInvalidArgs(parser, options, deleteConfigOpt, allTopicLevelOpts -- Set(alterOpt))
|
||||
CommandLineUtils.checkInvalidArgs(parser, options, partitionsOpt, allTopicLevelOpts -- Set(alterOpt, createOpt))
|
||||
CommandLineUtils.checkInvalidArgs(parser, options, replicationFactorOpt, allTopicLevelOpts -- Set(createOpt))
|
||||
CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, allTopicLevelOpts -- Set(createOpt,alterOpt))
|
||||
// Topic configs cannot be changed with alterTopic
|
||||
CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, Set(configOpt))
|
||||
if(options.has(createOpt))
|
||||
CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, Set(partitionsOpt, replicationFactorOpt))
|
||||
CommandLineUtils.checkInvalidArgs(parser, options, reportUnderReplicatedPartitionsOpt,
|
||||
|
@ -299,5 +328,4 @@ object TopicCommand extends Logging {
|
|||
allTopicLevelOpts -- Set(describeOpt) + reportUnderReplicatedPartitionsOpt + reportUnavailablePartitionsOpt)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue