KAFKA-2746; Add support for using ConsumerGroupCommand on secure install

Author: Ashish Singh <asingh@cloudera.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>, Jun Rao <junrao@gmail.com>

Closes #534 from SinghAsDev/KAFKA-2746
This commit is contained in:
Ashish Singh 2015-11-17 12:00:16 -08:00 committed by Jun Rao
parent 52d5e88393
commit ffc0965d38
2 changed files with 19 additions and 20 deletions

View File

@ -13,6 +13,7 @@
package kafka.admin
import java.nio.ByteBuffer
import java.util.Properties
import java.util.concurrent.atomic.AtomicInteger
import kafka.common.KafkaException
@ -209,6 +210,8 @@ object AdminClient {
create(new AdminConfig(config))
}
def create(props: Properties): AdminClient = create(props.asScala.toMap)
def create(props: Map[String, _]): AdminClient = create(new AdminConfig(props))
def create(config: AdminConfig): AdminClient = {

View File

@ -26,6 +26,7 @@ import kafka.common.{TopicAndPartition, _}
import kafka.consumer.SimpleConsumer
import kafka.utils._
import org.I0Itec.zkclient.exception.ZkNoNodeException
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.security.JaasUtils
@ -75,15 +76,6 @@ object ConsumerGroupCommand {
}
}
private def parseConfigs(opts: ConsumerGroupCommandOptions): Properties = {
val configsToBeAdded = opts.options.valuesOf(opts.configOpt).asScala.map(_.split("""\s*=\s*"""))
require(configsToBeAdded.forall(config => config.length == 2),
"Invalid config: all configs to be added must be in the format \"key=val\".")
val props = new Properties
configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim))
props
}
sealed trait ConsumerGroupService {
def list(): Unit
@ -160,9 +152,9 @@ object ConsumerGroupCommand {
}
protected def describeGroup(group: String) {
val configs = parseConfigs(opts)
val channelSocketTimeoutMs = configs.getProperty("channelSocketTimeoutMs", "600").toInt
val channelRetryBackoffMs = configs.getProperty("channelRetryBackoffMsOpt", "300").toInt
val props = if (opts.options.has(opts.commandConfigOpt)) Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) else new Properties()
val channelSocketTimeoutMs = props.getProperty("channelSocketTimeoutMs", "600").toInt
val channelRetryBackoffMs = props.getProperty("channelRetryBackoffMsOpt", "300").toInt
val topics = zkUtils.getTopicsByConsumerGroup(group)
if (topics.isEmpty)
println("No topic available for consumer group provided")
@ -352,8 +344,11 @@ object ConsumerGroupCommand {
if (consumer != null) consumer.close()
}
private def createAdminClient(): AdminClient =
AdminClient.createSimplePlaintext(opts.options.valueOf(opts.bootstrapServerOpt))
private def createAdminClient(): AdminClient = {
val props = if (opts.options.has(opts.commandConfigOpt)) Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) else new Properties()
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
AdminClient.create(props)
}
private def getConsumer() = {
if (consumer == null)
@ -371,6 +366,8 @@ object ConsumerGroupCommand {
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer)
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
if (opts.options.has(opts.commandConfigOpt)) properties.putAll(Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)))
new KafkaConsumer(properties)
}
@ -390,7 +387,6 @@ object ConsumerGroupCommand {
val BootstrapServerDoc = "REQUIRED (only when using new-consumer): The server to connect to."
val GroupDoc = "The consumer group we wish to act on."
val TopicDoc = "The topic whose consumer group information should be deleted."
val ConfigDoc = "Configuration for timeouts. For instance --config channelSocketTimeoutMs=600"
val ListDoc = "List all consumer groups."
val DescribeDoc = "Describe consumer group and list offset lag related to given group."
val nl = System.getProperty("line.separator")
@ -402,6 +398,7 @@ object ConsumerGroupCommand {
"for every consumer group. For instance --topic t1" + nl +
"WARNING: Group deletion only works for old ZK-based consumer groups, and one has to use it carefully to only delete groups that are not active."
val NewConsumerDoc = "Use new consumer."
val CommandConfigDoc = "Property file containing configs to be passed to Admin Client and Consumer."
val parser = new OptionParser
val zkConnectOpt = parser.accepts("zookeeper", ZkConnectDoc)
.withRequiredArg
@ -419,14 +416,14 @@ object ConsumerGroupCommand {
.withRequiredArg
.describedAs("topic")
.ofType(classOf[String])
val configOpt = parser.accepts("config", ConfigDoc)
.withRequiredArg
.describedAs("name=value")
.ofType(classOf[String])
val listOpt = parser.accepts("list", ListDoc)
val describeOpt = parser.accepts("describe", DescribeDoc)
val deleteOpt = parser.accepts("delete", DeleteDoc)
val newConsumerOpt = parser.accepts("new-consumer", NewConsumerDoc)
val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc)
.withRequiredArg
.describedAs("command config property file")
.ofType(classOf[String])
val options = parser.parse(args : _*)
val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt)
@ -460,7 +457,6 @@ object ConsumerGroupCommand {
// check invalid args
CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, allConsumerGroupLevelOpts - describeOpt - deleteOpt)
CommandLineUtils.checkInvalidArgs(parser, options, topicOpt, allConsumerGroupLevelOpts - deleteOpt)
CommandLineUtils.checkInvalidArgs(parser, options, configOpt, allConsumerGroupLevelOpts - describeOpt)
}
}
}