From 0153d4bb597283f48ae07af7cda3c633a71e436b Mon Sep 17 00:00:00 2001 From: Jun Rao Date: Mon, 7 Nov 2011 18:11:10 +0000 Subject: [PATCH] change ProducerShell to use high level producer; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-195 git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1198849 13f79535-47bb-0310-9956-ffa450edef68 --- config/producer.properties | 80 +++++++++++++++++++ .../scala/kafka/tools/ProducerShell.scala | 25 +++--- 2 files changed, 90 insertions(+), 15 deletions(-) create mode 100644 config/producer.properties diff --git a/config/producer.properties b/config/producer.properties new file mode 100644 index 00000000000..e94d78b161c --- /dev/null +++ b/config/producer.properties @@ -0,0 +1,80 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.producer.ProducerConfig for more details + +############################# Producer Basics ############################# + +# need to set either broker.list or zk.connect + +# configure brokers statically +# format: brokerid1:host1:port1,brokerid2:host2:port2 ... 
+broker.list=0:localhost:9092 + +# discover brokers from ZK +#zk.connect= + +# zookeeper session timeout; default is 6000 +#zk.sessiontimeout.ms= + +# the max time that the client waits to establish a connection to zookeeper; default is 6000 +#zk.connectiontimeout.ms= + +# name of the partitioner class for partitioning events; default partition spreads data randomly +#partitioner.class= + +# specifies whether the messages are sent asynchronously (async) or synchronously (sync) +producer.type=sync + +# specify the compression codec for all data generated: 0: no compression, 1: gzip +compression.codec=0 + +# message encoder +serializer.class=kafka.serializer.StringEncoder + +# allow topic level compression +#compressed.topics= + +# max message size; messages larger than that size are discarded; default is 1000000 +#max.message.size= + + +############################# Async Producer ############################# +# maximum time, in milliseconds, for buffering data on the producer queue +#queue.time= + +# the maximum size of the blocking queue for buffering on the producer +#queue.size= + +# Timeout for event enqueue: +# 0: events will be enqueued immediately or dropped if the queue is full +# -ve: enqueue will block indefinitely if the queue is full +# +ve: enqueue will block up to this many milliseconds if the queue is full +#queue.enqueueTimeout.ms= + +# the number of messages batched at the producer +#batch.size= + +# the callback handler for one or multiple events +#callback.handler= + +# properties required to initialize the callback handler +#callback.handler.props= + +# the handler for events +#event.handler= + +# properties required to initialize the event handler +#event.handler.props= + diff --git a/core/src/main/scala/kafka/tools/ProducerShell.scala b/core/src/main/scala/kafka/tools/ProducerShell.scala index f6b70573d6f..eda58d9be24 100644 --- a/core/src/main/scala/kafka/tools/ProducerShell.scala +++ b/core/src/main/scala/kafka/tools/ProducerShell.scala @@ 
-23,6 +23,7 @@ import joptsimple._ import kafka.message._ import kafka.producer._ import java.util.Properties +import kafka.utils.Utils /** * Interactive shell for producing messages from the command line @@ -32,9 +33,9 @@ object ProducerShell { def main(args: Array[String]) { val parser = new OptionParser - val urlOpt = parser.accepts("server", "REQUIRED: The hostname of the server to connect to.") + val producerPropsOpt = parser.accepts("props", "REQUIRED: Properties file with the producer properties.") .withRequiredArg - .describedAs("kafka://hostname:port") + .describedAs("properties") .ofType(classOf[String]) val topicOpt = parser.accepts("topic", "REQUIRED: The topic to produce to.") .withRequiredArg @@ -43,7 +44,7 @@ object ProducerShell { val options = parser.parse(args : _*) - for(arg <- List(urlOpt, topicOpt)) { + for(arg <- List(producerPropsOpt, topicOpt)) { if(!options.has(arg)) { System.err.println("Missing required argument \"" + arg + "\"") parser.printHelpOn(System.err) @@ -51,15 +52,10 @@ object ProducerShell { } } - val url = new URI(options.valueOf(urlOpt)) + val propsFile = options.valueOf(producerPropsOpt) + val producerConfig = new ProducerConfig(Utils.loadProps(propsFile)) val topic = options.valueOf(topicOpt) - val props = new Properties() - props.put("host", url.getHost) - props.put("port", url.getPort.toString) - props.put("buffer.size", "65536") - props.put("connect.timeout.ms", "10000") - props.put("reconnect.interval", "100") - val producer = new SyncProducer(new SyncProducerConfig(props)) + val producer = new Producer[String, String](producerConfig) val input = new BufferedReader(new InputStreamReader(System.in)) var done = false @@ -68,10 +64,9 @@ object ProducerShell { if(line == null) { done = true } else { - val lineBytes = line.trim.getBytes() - val messageList = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(lineBytes)) - producer.send(topic, messageList) - println("Sent: %s (%d 
bytes)".format(line, messageList.sizeInBytes)) + val message = line.trim + producer.send(new ProducerData[String, String](topic, message)) + println("Sent: %s (%d bytes)".format(line, message.getBytes.length)) } } producer.close()