mirror of https://github.com/apache/kafka.git
KAFKA-130 Add a "console producer" that sends messages from standard input
git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1178659 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
031a559e89
commit
59a5c3215a
|
@ -0,0 +1,4 @@
|
||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
base_dir=$(dirname $0)
|
||||||
|
$base_dir/kafka-run-class.sh kafka.producer.ConsoleProducer $@
|
|
@ -0,0 +1,151 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package kafka.producer
|
||||||
|
|
||||||
|
import scala.collection.JavaConversions._
|
||||||
|
import org.I0Itec.zkclient._
|
||||||
|
import joptsimple._
|
||||||
|
import org.apache.log4j.Logger
|
||||||
|
import java.util.Arrays.asList
|
||||||
|
import java.util.Properties
|
||||||
|
import java.util.Random
|
||||||
|
import java.io._
|
||||||
|
import kafka.message._
|
||||||
|
import kafka.utils._
|
||||||
|
import kafka.serializer._
|
||||||
|
|
||||||
|
object ConsoleProducer {
|
||||||
|
|
||||||
|
private val logger = Logger.getLogger(getClass())
|
||||||
|
|
||||||
|
def main(args: Array[String]) {
|
||||||
|
val parser = new OptionParser
|
||||||
|
val topicOpt = parser.accepts("topic", "REQUIRED: The topic id to produce messages to.")
|
||||||
|
.withRequiredArg
|
||||||
|
.describedAs("topic")
|
||||||
|
.ofType(classOf[String])
|
||||||
|
val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The zookeeper connection string for the kafka zookeeper instance in the form HOST:PORT[/CHROOT].")
|
||||||
|
.withRequiredArg
|
||||||
|
.describedAs("connection_string")
|
||||||
|
.ofType(classOf[String])
|
||||||
|
val asyncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
|
||||||
|
val compressOpt = parser.accepts("compress", "If set, messages batches are sent compressed")
|
||||||
|
val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously.")
|
||||||
|
.withRequiredArg
|
||||||
|
.describedAs("size")
|
||||||
|
.ofType(classOf[java.lang.Integer])
|
||||||
|
.defaultsTo(200)
|
||||||
|
val sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" +
|
||||||
|
" a message will queue awaiting suffient batch size. The value is given in ms.")
|
||||||
|
.withRequiredArg
|
||||||
|
.describedAs("timeout_ms")
|
||||||
|
.ofType(classOf[java.lang.Long])
|
||||||
|
.defaultsTo(1000)
|
||||||
|
val messageEncoderOpt = parser.accepts("message-encoder", "The class name of the message encoder implementation to use.")
|
||||||
|
.withRequiredArg
|
||||||
|
.describedAs("encoder_class")
|
||||||
|
.ofType(classOf[java.lang.String])
|
||||||
|
.defaultsTo(classOf[StringEncoder].getName)
|
||||||
|
val messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " +
|
||||||
|
"By default each line is read as a seperate message.")
|
||||||
|
.withRequiredArg
|
||||||
|
.describedAs("reader_class")
|
||||||
|
.ofType(classOf[java.lang.String])
|
||||||
|
.defaultsTo(classOf[LineMessageReader].getName)
|
||||||
|
val propertyOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message reader. " +
|
||||||
|
"This allows custom configuration for a user-defined message reader.")
|
||||||
|
.withRequiredArg
|
||||||
|
.describedAs("prop")
|
||||||
|
.ofType(classOf[String])
|
||||||
|
|
||||||
|
|
||||||
|
val options = parser.parse(args : _*)
|
||||||
|
for(arg <- List(topicOpt, zkConnectOpt)) {
|
||||||
|
if(!options.has(arg)) {
|
||||||
|
System.err.println("Missing required argument \"" + arg + "\"")
|
||||||
|
parser.printHelpOn(System.err)
|
||||||
|
System.exit(1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
val topic = options.valueOf(topicOpt)
|
||||||
|
val zkConnect = options.valueOf(zkConnectOpt)
|
||||||
|
val async = options.has(asyncOpt)
|
||||||
|
val compress = options.has(compressOpt)
|
||||||
|
val batchSize = options.valueOf(batchSizeOpt)
|
||||||
|
val sendTimeout = options.valueOf(sendTimeoutOpt)
|
||||||
|
val encoderClass = options.valueOf(messageEncoderOpt)
|
||||||
|
val readerClass = options.valueOf(messageReaderOpt)
|
||||||
|
val cmdLineProps = parseLineReaderArgs(options.valuesOf(propertyOpt))
|
||||||
|
|
||||||
|
val props = new Properties()
|
||||||
|
props.put("zk.connect", zkConnect)
|
||||||
|
props.put("compression.codec", DefaultCompressionCodec.codec.toString)
|
||||||
|
props.put("producer.type", if(async) "async" else "sync")
|
||||||
|
if(options.has(batchSizeOpt))
|
||||||
|
props.put("batch.size", batchSize)
|
||||||
|
props.put("queue.enqueueTimeout.ms", sendTimeout.toString)
|
||||||
|
props.put("serializer.class", encoderClass)
|
||||||
|
|
||||||
|
val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader]
|
||||||
|
reader.init(System.in, cmdLineProps)
|
||||||
|
|
||||||
|
val producer = new Producer[Any, Any](new ProducerConfig(props))
|
||||||
|
|
||||||
|
Runtime.getRuntime.addShutdownHook(new Thread() {
|
||||||
|
override def run() {
|
||||||
|
producer.close()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
var message: AnyRef = null
|
||||||
|
do {
|
||||||
|
message = reader.readMessage()
|
||||||
|
if(message != null)
|
||||||
|
producer.send(new ProducerData(topic, message))
|
||||||
|
} while(message != null)
|
||||||
|
}
|
||||||
|
|
||||||
|
def parseLineReaderArgs(args: Iterable[String]): Properties = {
|
||||||
|
val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0)
|
||||||
|
if(!splits.forall(_.length == 2)) {
|
||||||
|
System.err.println("Invalid line reader properties: " + args.mkString(" "))
|
||||||
|
System.exit(1)
|
||||||
|
}
|
||||||
|
val props = new Properties
|
||||||
|
for(a <- splits)
|
||||||
|
props.put(a(0), a(1))
|
||||||
|
props
|
||||||
|
}
|
||||||
|
|
||||||
|
trait MessageReader {
|
||||||
|
def init(inputStream: InputStream, props: Properties) {}
|
||||||
|
def readMessage(): AnyRef
|
||||||
|
def close() {}
|
||||||
|
}
|
||||||
|
|
||||||
|
class LineMessageReader extends MessageReader {
|
||||||
|
var reader: BufferedReader = null
|
||||||
|
|
||||||
|
override def init(inputStream: InputStream, props: Properties) {
|
||||||
|
reader = new BufferedReader(new InputStreamReader(inputStream))
|
||||||
|
}
|
||||||
|
|
||||||
|
override def readMessage() = reader.readLine()
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue