KAFKA-544. Follow-up items on key-retention. Addresses misc. comments from Joel, see ticket for details.

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1413839 13f79535-47bb-0310-9956-ffa450edef68
Edward Jay Kreps 2012-11-26 20:59:21 +00:00
parent 602acaf412
commit 6e6522c7c9
6 changed files with 78 additions and 28 deletions

View File

@@ -89,7 +89,7 @@ object ConsoleConsumer extends Logging {
.withRequiredArg
.describedAs("class")
.ofType(classOf[String])
.defaultsTo(classOf[NewlineMessageFormatter].getName)
.defaultsTo(classOf[DefaultMessageFormatter].getName)
val messageFormatterArgOpt = parser.accepts("property")
.withRequiredArg
.describedAs("prop")
@@ -256,10 +256,27 @@ trait MessageFormatter {
def close() {}
}
class NewlineMessageFormatter extends MessageFormatter {
class DefaultMessageFormatter extends MessageFormatter {
var printKey = false
var keySeparator = "\t".getBytes
var lineSeparator = "\n".getBytes
override def init(props: Properties) {
if(props.containsKey("print.key"))
printKey = props.getProperty("print.key").trim.toLowerCase.equals("true")
if(props.containsKey("key.separator"))
keySeparator = props.getProperty("key.separator").getBytes
if(props.containsKey("line.separator"))
lineSeparator = props.getProperty("line.separator").getBytes
}
def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
if(printKey) {
output.write(key)
output.write(keySeparator)
}
output.write(value)
output.write('\n')
output.write(lineSeparator)
}
}
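
For context, a minimal sketch of how the new formatter might be driven outside the console consumer. The property names print.key and key.separator come from the code above; the standalone driver, the kafka.consumer import, and the sample key/value are assumptions for illustration only.

import java.util.Properties
import kafka.consumer.DefaultMessageFormatter   // package assumed from the file above

object FormatterSketch {
  def main(args: Array[String]) {
    val props = new Properties()
    props.put("print.key", "true")      // emit the key before each value
    props.put("key.separator", ":")     // override the default tab separator
    val formatter = new DefaultMessageFormatter()
    formatter.init(props)
    // writes "k1:v1" followed by the line separator to stdout
    formatter.writeTo("k1".getBytes, "v1".getBytes, System.out)
    formatter.close()
  }
}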

View File

@@ -20,7 +20,9 @@ package kafka.producer
import scala.collection.JavaConversions._
import joptsimple._
import java.util.Properties
import java.util.regex._
import java.io._
import kafka.common._
import kafka.message._
import kafka.serializer._
@@ -49,13 +51,18 @@ object ConsoleProducer {
.describedAs("timeout_ms")
.ofType(classOf[java.lang.Long])
.defaultsTo(1000)
val messageEncoderOpt = parser.accepts("message-encoder", "The class name of the message encoder implementation to use.")
val valueEncoderOpt = parser.accepts("value-serializer", "The class name of the message encoder implementation to use for serializing values.")
.withRequiredArg
.describedAs("encoder_class")
.ofType(classOf[java.lang.String])
.defaultsTo(classOf[StringEncoder].getName)
val keyEncoderOpt = parser.accepts("key-serializer", "The class name of the message encoder implementation to use for serializing keys.")
.withRequiredArg
.describedAs("encoder_class")
.ofType(classOf[java.lang.String])
.defaultsTo(classOf[StringEncoder].getName)
val messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " +
"By default each line is read as a seperate message.")
"By default each line is read as a separate message.")
.withRequiredArg
.describedAs("reader_class")
.ofType(classOf[java.lang.String])
@@ -82,9 +89,11 @@ object ConsoleProducer {
val compress = options.has(compressOpt)
val batchSize = options.valueOf(batchSizeOpt)
val sendTimeout = options.valueOf(sendTimeoutOpt)
val encoderClass = options.valueOf(messageEncoderOpt)
val keyEncoderClass = options.valueOf(keyEncoderOpt)
val valueEncoderClass = options.valueOf(valueEncoderOpt)
val readerClass = options.valueOf(messageReaderOpt)
val cmdLineProps = parseLineReaderArgs(options.valuesOf(propertyOpt))
cmdLineProps.put("topic", topic)
val props = new Properties()
props.put("broker.list", brokerList)
@@ -94,12 +103,13 @@ object ConsoleProducer {
if(options.has(batchSizeOpt))
props.put("batch.size", batchSize.toString)
props.put("queue.time", sendTimeout.toString)
props.put("serializer.class", encoderClass)
props.put("key.serializer.class", keyEncoderClass)
props.put("serializer.class", valueEncoderClass)
val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader]
val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader[AnyRef, AnyRef]]
reader.init(System.in, cmdLineProps)
val producer = new Producer[Any, Any](new ProducerConfig(props))
val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run() {
@@ -107,11 +117,11 @@ object ConsoleProducer {
}
})
var message: AnyRef = null
var message: KeyedMessage[AnyRef, AnyRef] = null
do {
message = reader.readMessage()
if(message != null)
producer.send(new KeyedMessage(topic, message))
producer.send(message)
} while(message != null)
}
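
As a companion to the option wiring above, a minimal sketch of a producer configured with separate key and value serializers. The broker address and topic are placeholders; the property names mirror those set in the code above.

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import kafka.serializer.StringEncoder

object KeyedSendSketch {
  def main(args: Array[String]) {
    val props = new Properties()
    props.put("broker.list", "localhost:9092")   // placeholder broker
    props.put("key.serializer.class", classOf[StringEncoder].getName)
    props.put("serializer.class", classOf[StringEncoder].getName)
    val producer = new Producer[String, String](new ProducerConfig(props))
    // topic, key, value -- the key now travels with the message (KAFKA-544)
    producer.send(new KeyedMessage[String, String]("test-topic", "k1", "v1"))
    producer.close()
  }
}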
@@ -127,19 +137,49 @@ object ConsoleProducer {
props
}
trait MessageReader {
trait MessageReader[K,V] {
def init(inputStream: InputStream, props: Properties) {}
def readMessage(): AnyRef
def readMessage(): KeyedMessage[K,V]
def close() {}
}
class LineMessageReader extends MessageReader {
class LineMessageReader extends MessageReader[String, String] {
var topic: String = null
var reader: BufferedReader = null
var parseKey = false
var keySeparator = "\t"
var ignoreError = false
var lineNumber = 0
override def init(inputStream: InputStream, props: Properties) {
topic = props.getProperty("topic")
if(props.containsKey("parse.key"))
parseKey = props.getProperty("parse.key").trim.toLowerCase.equals("true")
if(props.containsKey("key.seperator"))
keySeparator = props.getProperty("key.separator")
if(props.containsKey("ignore.error"))
ignoreError = props.getProperty("ignore.error").trim.toLowerCase.equals("true")
reader = new BufferedReader(new InputStreamReader(inputStream))
}
override def readMessage() = reader.readLine()
override def readMessage() = {
lineNumber += 1
val line = reader.readLine()
if(line == null) {
null // end of input
} else if(parseKey) {
line.indexOf(keySeparator) match {
case -1 =>
if(ignoreError)
new KeyedMessage(topic, line)
else
throw new KafkaException("No key found on line " + lineNumber + ": " + line)
case n =>
new KeyedMessage(topic,
line.substring(0, n),
if(n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size))
}
} else {
new KeyedMessage(topic, line)
}
}
}
}
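
A small sketch of how the reader above behaves with key parsing switched on; the in-memory input and property values are illustrative only.

import java.io.ByteArrayInputStream
import java.util.Properties
import kafka.producer.ConsoleProducer

object LineReaderSketch {
  def main(args: Array[String]) {
    val props = new Properties()
    props.put("topic", "test-topic")   // placeholder topic
    props.put("parse.key", "true")     // split each line on the default tab separator
    val reader = new ConsoleProducer.LineMessageReader()
    reader.init(new ByteArrayInputStream("k1\tv1\nk2\tv2\n".getBytes), props)
    val first = reader.readMessage()   // KeyedMessage("test-topic", "k1", "v1")
    val second = reader.readMessage()  // KeyedMessage("test-topic", "k2", "v2")
    println(first.key + " -> " + first.message)
    println(second.key + " -> " + second.message)
    reader.close()
  }
}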

View File

@@ -19,7 +19,7 @@ package kafka.producer.async
import kafka.utils.{SystemTime, Logging}
import java.util.concurrent.{TimeUnit, CountDownLatch, BlockingQueue}
import collection.mutable.ListBuffer
import collection.mutable.ArrayBuffer
import kafka.producer.KeyedMessage
import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
@@ -57,7 +57,7 @@ class ProducerSendThread[K,V](val threadName: String,
private def processEvents() {
var lastSend = SystemTime.milliseconds
var events = new ListBuffer[KeyedMessage[K,V]]
var events = new ArrayBuffer[KeyedMessage[K,V]]
var full: Boolean = false
// drain the queue until you get a shutdown command
@@ -85,7 +85,7 @@ class ProducerSendThread[K,V](val threadName: String,
// if either queue time has reached or batch size has reached, dispatch to event handler
tryToHandle(events)
lastSend = SystemTime.milliseconds
events = new ListBuffer[KeyedMessage[K,V]]
events = new ArrayBuffer[KeyedMessage[K,V]]
}
}
// send the last batch of events
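
The switch from ListBuffer to ArrayBuffer leaves the batching behavior unchanged; what follows is a simplified, standalone sketch of the accumulate-then-swap pattern used above. The drain helper, the local KeyedMessage stand-in, and the batch size are illustrative only, not the actual thread logic.

import scala.collection.mutable.ArrayBuffer

object BatchSketch {
  case class KeyedMessage[K, V](topic: String, key: K, message: V)   // stand-in for the sketch

  def drain[K, V](incoming: Iterator[KeyedMessage[K, V]], batchSize: Int)
                 (handle: Seq[KeyedMessage[K, V]] => Unit) {
    var events = new ArrayBuffer[KeyedMessage[K, V]]
    for (msg <- incoming) {
      events += msg
      if (events.size >= batchSize) {
        handle(events)                                  // dispatch a full batch
        events = new ArrayBuffer[KeyedMessage[K, V]]    // start a fresh batch
      }
    }
    if (events.nonEmpty)
      handle(events)                                    // send the last batch of events
  }

  def main(args: Array[String]) {
    val msgs = (1 to 5).iterator.map(i => KeyedMessage("t", "k" + i, "v" + i))
    drain(msgs, batchSize = 2)(batch => println("sending " + batch.size + " messages"))
  }
}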

View File

@@ -36,13 +36,6 @@ class DefaultDecoder(props: VerifiableProperties = null) extends Decoder[Array[Byte]] {
def fromBytes(bytes: Array[Byte]): Array[Byte] = bytes
}
/**
* Decode messages without any key
*/
class KeylessMessageDecoder(props: VerifiableProperties = null) extends Decoder[Message] {
def fromBytes(bytes: Array[Byte]) = new Message(bytes)
}
/**
* The string encoder translates strings into bytes. It uses UTF8 by default but takes
* an optional property serializer.encoding to control this.
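
For reference, a hypothetical decoder built on the Decoder trait shown above. The UTF-8 default mirrors the serializer.encoding behavior described in the comment, but this class is an illustration, not part of this commit.

import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties

// Hypothetical: decodes message bytes to a String, honoring serializer.encoding if present.
class Utf8StringDecoder(props: VerifiableProperties = null) extends Decoder[String] {
  val encoding =
    if (props == null) "UTF8"
    else props.getString("serializer.encoding", "UTF8")

  def fromBytes(bytes: Array[Byte]): String = new String(bytes, encoding)
}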

View File

@@ -74,7 +74,7 @@ object SimpleConsumerShell extends Logging {
.withRequiredArg
.describedAs("class")
.ofType(classOf[String])
.defaultsTo(classOf[NewlineMessageFormatter].getName)
.defaultsTo(classOf[DefaultMessageFormatter].getName)
val messageFormatterArgOpt = parser.accepts("property")
.withRequiredArg
.describedAs("prop")

View File

@@ -426,7 +426,7 @@ object Utils extends Logging {
}
/**
* This method gets comma seperated values which contains key,value pairs and returns a map of
* This method gets comma separated values which contains key,value pairs and returns a map of
* key value pairs. the format of allCSVal is key1:val1, key2:val2 ....
*/
def parseCsvMap(str: String): Map[String, String] = {
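
The body of parseCsvMap is cut off above; purely to illustrate the contract the comment describes, a hypothetical standalone sketch (not the actual Utils implementation):

// Illustrative only: "key1:val1, key2:val2" -> Map("key1" -> "val1", "key2" -> "val2")
def parseCsvMapSketch(str: String): Map[String, String] =
  if (str.trim.isEmpty) Map.empty
  else str.split(",").map { pair =>
    val Array(k, v) = pair.split(":").map(_.trim)
    k -> v
  }.toMap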