KAFKA-1281 add the new producer to existing tools; reviewed by Jun Rao and Guozhang Wang

Neha Narkhede 2014-03-06 18:08:55 -08:00
parent c3520fe7e0
commit 74c54c7eeb
8 changed files with 258 additions and 228 deletions

View File

@ -89,12 +89,6 @@ public class ProducerConfig extends AbstractConfig {
*/
public static final String LINGER_MS_CONFIG = "linger.ms";
/**
* Force a refresh of the cluster metadata after this period of time. This ensures that changes to the number of
* partitions or other settings will be taken up by producers without restart.
*/
public static final String METADATA_REFRESH_MS_CONFIG = "topic.metadata.refresh.interval.ms";
/**
* The id string to pass to the server when making requests. The purpose of this is to be able to track the source
* of requests beyond just ip/port by allowing a logical application name to be included.
@ -158,7 +152,6 @@ public class ProducerConfig extends AbstractConfig {
.define(REQUIRED_ACKS_CONFIG, Type.INT, 1, between(-1, Short.MAX_VALUE), "blah blah")
.define(REQUEST_TIMEOUT_CONFIG, Type.INT, 30 * 1000, atLeast(0), "blah blah")
.define(LINGER_MS_CONFIG, Type.LONG, 0, atLeast(0L), "blah blah")
.define(METADATA_REFRESH_MS_CONFIG, Type.LONG, 10 * 60 * 1000, atLeast(-1L), "blah blah")
.define(CLIENT_ID_CONFIG, Type.STRING, "", "blah blah")
.define(SEND_BUFFER_CONFIG, Type.INT, 128 * 1024, atLeast(0), "blah blah")
.define(RECEIVE_BUFFER_CONFIG, Type.INT, 32 * 1024, atLeast(0), "blah blah")
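
Not part of this commit, but for orientation: with topic.metadata.refresh.interval.ms removed, a minimal new-producer setup only needs the keys that remain above. The sketch below uses the string constants kept in ProducerConfig; the broker address, topic and client id are invented, and "metadata.broker.list" is the broker-list key used throughout this patch.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object ProducerConfigSketch {
  def main(args: Array[String]) {
    val props = new Properties()
    props.put("metadata.broker.list", "localhost:9092")   // invented broker address
    props.put(ProducerConfig.LINGER_MS_CONFIG, "5")       // "linger.ms": batch for up to 5 ms before sending
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "config-sketch")
    val producer = new KafkaProducer(props)
    producer.send(new ProducerRecord("test", "hello".getBytes))  // fire-and-forget send
    producer.close()
  }
}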

View File

@ -18,5 +18,3 @@ log4j.rootLogger=WARN, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

View File

@ -17,17 +17,49 @@
package kafka.producer
import scala.collection.JavaConversions._
import joptsimple._
import java.util.Properties
import java.io._
import kafka.common._
import kafka.message._
import kafka.serializer._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import kafka.utils.{CommandLineUtils, Utils}
object ConsoleProducer {
def main(args: Array[String]) {
val config = new ProducerConfig(args)
val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader]
reader.init(System.in, config.cmdLineProps)
try {
val producer =
if(config.useNewProducer) new NewShinyProducer(config)
else new OldProducer(config)
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run() {
producer.close()
}
})
var message: KeyedMessage[Array[Byte], Array[Byte]] = null
do {
message = reader.readMessage()
if(message != null)
producer.send(message.topic, message.key, message.message)
} while(message != null)
} catch {
case e: Exception =>
e.printStackTrace
System.exit(1)
}
System.exit(0)
}
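
An illustrative invocation, not in this patch: bin/kafka-console-producer.sh forwards its arguments to the main method above, and omitting the new flag keeps the old producer path. The broker address and topic are invented, and "broker-list" is assumed to be the option string behind brokerListOpt, whose definition is outside this hunk.

ConsoleProducer.main(Array(
  "--broker-list", "localhost:9092",  // assumed option name; not shown in this excerpt
  "--topic", "test",
  "--new-producer"))                  // drop this flag to use the old scala producer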
class ProducerConfig(args: Array[String]) {
val parser = new OptionParser
val topicOpt = parser.accepts("topic", "REQUIRED: The topic id to produce messages to.")
.withRequiredArg
@ -79,16 +111,41 @@ object ConsoleProducer {
.describedAs("request timeout ms")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1500)
val metadataExpiryMsOpt = parser.accepts("metadata-expiry-ms",
"The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any leadership changes.")
.withRequiredArg
.describedAs("metadata expiration interval")
.ofType(classOf[java.lang.Long])
.defaultsTo(5*60*1000L)
val metadataFetchTimeoutMsOpt = parser.accepts("metadata-fetch-timeout-ms",
"The amount of time to block waiting to fetch metadata about a topic the first time a record is sent to that topic.")
.withRequiredArg
.describedAs("metadata fetch timeout")
.ofType(classOf[java.lang.Long])
.defaultsTo(60*1000L)
val maxMemoryBytesOpt = parser.accepts("max-memory-bytes",
"The total memory used by the producer to buffer records waiting to be sent to the server.")
.withRequiredArg
.describedAs("total memory in bytes")
.ofType(classOf[java.lang.Long])
.defaultsTo(32 * 1024 * 1024L)
val maxPartitionMemoryBytesOpt = parser.accepts("max-partition-memory-bytes",
"The buffer size allocated for a partition. When records are received which are smaller than this size the producer " +
"will attempt to optimistically group them together until this size is reached.")
.withRequiredArg
.describedAs("memory in bytes per partition")
.ofType(classOf[java.lang.Long])
.defaultsTo(16 * 1024L)
val valueEncoderOpt = parser.accepts("value-serializer", "The class name of the message encoder implementation to use for serializing values.")
.withRequiredArg
.describedAs("encoder_class")
.ofType(classOf[java.lang.String])
.defaultsTo(classOf[StringEncoder].getName)
.defaultsTo(classOf[DefaultEncoder].getName)
val keyEncoderOpt = parser.accepts("key-serializer", "The class name of the message encoder implementation to use for serializing keys.")
.withRequiredArg
.describedAs("encoder_class")
.ofType(classOf[java.lang.String])
.defaultsTo(classOf[StringEncoder].getName)
.defaultsTo(classOf[DefaultEncoder].getName)
val messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " +
"By default each line is read as a separate message.")
.withRequiredArg
@ -105,7 +162,7 @@ object ConsoleProducer {
.withRequiredArg
.describedAs("prop")
.ofType(classOf[String])
val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.")
val options = parser.parse(args : _*)
for(arg <- List(topicOpt, brokerListOpt)) {
@ -116,6 +173,8 @@ object ConsoleProducer {
}
}
import scala.collection.JavaConversions._
val useNewProducer = options.has(useNewProducerOpt)
val topic = options.valueOf(topicOpt)
val brokerList = options.valueOf(brokerListOpt)
val sync = options.has(syncOpt)
@ -126,76 +185,28 @@ object ConsoleProducer {
val queueEnqueueTimeoutMs = options.valueOf(queueEnqueueTimeoutMsOpt)
val requestRequiredAcks = options.valueOf(requestRequiredAcksOpt)
val requestTimeoutMs = options.valueOf(requestTimeoutMsOpt)
val messageSendMaxRetries = options.valueOf(messageSendMaxRetriesOpt)
val retryBackoffMs = options.valueOf(retryBackoffMsOpt)
val keyEncoderClass = options.valueOf(keyEncoderOpt)
val valueEncoderClass = options.valueOf(valueEncoderOpt)
val readerClass = options.valueOf(messageReaderOpt)
val socketBuffer = options.valueOf(socketBufferSizeOpt)
val cmdLineProps = parseLineReaderArgs(options.valuesOf(propertyOpt))
val cmdLineProps = CommandLineUtils.parseCommandLineArgs(options.valuesOf(propertyOpt))
cmdLineProps.put("topic", topic)
val props = new Properties()
props.put("metadata.broker.list", brokerList)
val codec = if(compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec
props.put("compression.codec", codec.toString)
props.put("producer.type", if(sync) "sync" else "async")
if(options.has(batchSizeOpt))
props.put("batch.num.messages", batchSize.toString)
props.put("message.send.max.retries", options.valueOf(messageSendMaxRetriesOpt).toString)
props.put("retry.backoff.ms", options.valueOf(retryBackoffMsOpt).toString)
props.put("queue.buffering.max.ms", sendTimeout.toString)
props.put("queue.buffering.max.messages", queueSize.toString)
props.put("queue.enqueue.timeout.ms", queueEnqueueTimeoutMs.toString)
props.put("request.required.acks", requestRequiredAcks.toString)
props.put("request.timeout.ms", requestTimeoutMs.toString)
props.put("key.serializer.class", keyEncoderClass)
props.put("serializer.class", valueEncoderClass)
props.put("send.buffer.bytes", socketBuffer.toString)
val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader[AnyRef, AnyRef]]
reader.init(System.in, cmdLineProps)
try {
val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run() {
producer.close()
}
})
var message: KeyedMessage[AnyRef, AnyRef] = null
do {
message = reader.readMessage()
if(message != null)
producer.send(message)
} while(message != null)
} catch {
case e: Exception =>
e.printStackTrace
System.exit(1)
}
System.exit(0)
/* new producer related configs */
val maxMemoryBytes = options.valueOf(maxMemoryBytesOpt)
val maxPartitionMemoryBytes = options.valueOf(maxPartitionMemoryBytesOpt)
val metadataExpiryMs = options.valueOf(metadataExpiryMsOpt)
val metadataFetchTimeoutMs = options.valueOf(metadataFetchTimeoutMsOpt)
}
def parseLineReaderArgs(args: Iterable[String]): Properties = {
val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0)
if(!splits.forall(_.length == 2)) {
System.err.println("Invalid line reader properties: " + args.mkString(" "))
System.exit(1)
}
val props = new Properties
for(a <- splits)
props.put(a(0), a(1))
props
}
trait MessageReader[K,V] {
trait MessageReader {
def init(inputStream: InputStream, props: Properties) {}
def readMessage(): KeyedMessage[K,V]
def readMessage(): KeyedMessage[Array[Byte], Array[Byte]]
def close() {}
}
class LineMessageReader extends MessageReader[String, String] {
class LineMessageReader extends MessageReader {
var topic: String = null
var reader: BufferedReader = null
var parseKey = false
@ -222,17 +233,84 @@ object ConsoleProducer {
line.indexOf(keySeparator) match {
case -1 =>
if(ignoreError)
new KeyedMessage(topic, line)
new KeyedMessage[Array[Byte], Array[Byte]](topic, line.getBytes())
else
throw new KafkaException("No key found on line " + lineNumber + ": " + line)
case n =>
new KeyedMessage(topic,
line.substring(0, n),
if(n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size))
new KeyedMessage[Array[Byte], Array[Byte]](topic,
line.substring(0, n).getBytes,
(if(n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes())
}
case (line, false) =>
new KeyedMessage(topic, line)
new KeyedMessage[Array[Byte], Array[Byte]](topic, line.getBytes())
}
}
}
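
For illustration only, not part of this patch: --line-reader accepts any MessageReader implementation, so a custom reader could sit next to LineMessageReader inside ConsoleProducer and reuse the file's imports. The fixed key below is invented.

class FixedKeyMessageReader extends MessageReader {
  var reader: BufferedReader = null
  var topic: String = null
  val key: Array[Byte] = "fixed-key".getBytes   // every message gets this invented key

  override def init(inputStream: InputStream, props: Properties) {
    topic = props.getProperty("topic")
    reader = new BufferedReader(new InputStreamReader(inputStream))
  }

  def readMessage(): KeyedMessage[Array[Byte], Array[Byte]] = {
    val line = reader.readLine()
    if (line == null) null
    else new KeyedMessage[Array[Byte], Array[Byte]](topic, key, line.getBytes)
  }
}

Such a reader would be selected with --line-reader kafka.producer.ConsoleProducer$FixedKeyMessageReader, the same Class.forName path used for the default LineMessageReader.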
trait Producer {
def send(topic: String, key: Array[Byte], bytes: Array[Byte])
def close()
}
class NewShinyProducer(producerConfig: ProducerConfig) extends Producer {
val props = new Properties()
props.put("metadata.broker.list", producerConfig.brokerList)
val codec = if(producerConfig.compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec
props.put("compression.codec", codec.toString)
props.put("send.buffer.bytes", producerConfig.socketBuffer.toString)
props.put("metadata.fetch.backoff.ms", producerConfig.retryBackoffMs.toString)
props.put("metadata.expiry.ms", producerConfig.metadataExpiryMs.toString)
props.put("metadata.fetch.timeout.ms", producerConfig.metadataFetchTimeoutMs.toString)
props.put("request.required.acks", producerConfig.requestRequiredAcks.toString)
props.put("request.timeout.ms", producerConfig.requestTimeoutMs.toString)
props.put("request.retries", producerConfig.messageSendMaxRetries.toString)
props.put("linger.ms", producerConfig.sendTimeout.toString)
if(producerConfig.queueEnqueueTimeoutMs != -1)
props.put("block.on.buffer.full", "false")
props.put("total.memory.bytes", producerConfig.maxMemoryBytes.toString)
props.put("max.partition.bytes", producerConfig.maxPartitionMemoryBytes.toString)
props.put("client.id", "console-producer")
val producer = new KafkaProducer(props)
def send(topic: String, key: Array[Byte], bytes: Array[Byte]) {
val response = this.producer.send(new ProducerRecord(topic, key, bytes))
if(producerConfig.sync) {
response.get()
}
}
def close() {
this.producer.close()
}
}
class OldProducer(producerConfig: ConsoleProducer.ProducerConfig) extends Producer {
val props = new Properties()
props.put("metadata.broker.list", producerConfig.brokerList)
val codec = if(producerConfig.compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec
props.put("compression.codec", codec.toString)
props.put("producer.type", if(producerConfig.sync) "sync" else "async")
props.put("batch.num.messages", producerConfig.batchSize.toString)
props.put("message.send.max.retries", producerConfig.messageSendMaxRetries.toString)
props.put("retry.backoff.ms", producerConfig.retryBackoffMs.toString)
props.put("queue.buffering.max.ms", producerConfig.sendTimeout.toString)
props.put("queue.buffering.max.messages", producerConfig.queueSize.toString)
props.put("queue.enqueue.timeout.ms", producerConfig.queueEnqueueTimeoutMs.toString)
props.put("request.required.acks", producerConfig.requestRequiredAcks.toString)
props.put("request.timeout.ms", producerConfig.requestTimeoutMs.toString)
props.put("key.serializer.class", producerConfig.keyEncoderClass)
props.put("serializer.class", producerConfig.valueEncoderClass)
props.put("send.buffer.bytes", producerConfig.socketBuffer.toString)
props.put("topic.metadata.refresh.interval.ms", producerConfig.metadataExpiryMs.toString)
props.put("client.id", "console-producer")
val producer = new kafka.producer.Producer[Array[Byte], Array[Byte]](new kafka.producer.ProducerConfig(props))
def send(topic: String, key: Array[Byte], bytes: Array[Byte]) {
this.producer.send(new KeyedMessage[Array[Byte], Array[Byte]](topic, key, bytes))
}
def close() {
this.producer.close()
}
}
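
The Producer trait above is the seam that lets main switch between OldProducer and NewShinyProducer at runtime. As a hypothetical illustration of the same seam, not part of the commit, a dry-run implementation that only prints what it would send could be slotted in as a third option:

class StdOutProducer extends Producer {
  def send(topic: String, key: Array[Byte], bytes: Array[Byte]) {
    val keyStr = if (key == null) "null" else new String(key)
    val valueStr = if (bytes == null) "null" else new String(bytes)
    println("topic=%s key=%s value=%s".format(topic, keyStr, valueStr))
  }
  def close() {}
}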
}

View File

@ -23,18 +23,15 @@ import org.apache.log4j.AppenderSkeleton
import org.apache.log4j.helpers.LogLog
import kafka.utils.Logging
import java.util.{Properties, Date}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
class KafkaLog4jAppender extends AppenderSkeleton with Logging {
var topic:String = null
var serializerClass:String = null
var brokerList:String = null
var producerType:String = null
var compressionCodec:String = null
var enqueueTimeout:String = null
var queueSize:String = null
var requiredNumAcks: Int = Int.MaxValue
private var producer: Producer[String, String] = null
private var producer: KafkaProducer = null
def getTopic:String = topic
def setTopic(topic: String) { this.topic = topic }
@ -42,21 +39,9 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
def getBrokerList:String = brokerList
def setBrokerList(brokerList: String) { this.brokerList = brokerList }
def getSerializerClass:String = serializerClass
def setSerializerClass(serializerClass:String) { this.serializerClass = serializerClass }
def getProducerType:String = producerType
def setProducerType(producerType:String) { this.producerType = producerType }
def getCompressionCodec:String = compressionCodec
def setCompressionCodec(compressionCodec:String) { this.compressionCodec = compressionCodec }
def getEnqueueTimeout:String = enqueueTimeout
def setEnqueueTimeout(enqueueTimeout:String) { this.enqueueTimeout = enqueueTimeout }
def getQueueSize:String = queueSize
def setQueueSize(queueSize:String) { this.queueSize = queueSize }
def getRequiredNumAcks:Int = requiredNumAcks
def setRequiredNumAcks(requiredNumAcks:Int) { this.requiredNumAcks = requiredNumAcks }
@ -69,28 +54,17 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging {
throw new MissingConfigException("The metadata.broker.list property should be specified")
if(topic == null)
throw new MissingConfigException("topic must be specified by the Kafka log4j appender")
if(serializerClass == null) {
serializerClass = "kafka.serializer.StringEncoder"
LogLog.debug("Using default encoder - kafka.serializer.StringEncoder")
}
props.put("serializer.class", serializerClass)
//These have default values in ProducerConfig and AsyncProducerConfig. We don't care if they're not specified
if(producerType != null) props.put("producer.type", producerType)
if(compressionCodec != null) props.put("compression.codec", compressionCodec)
if(enqueueTimeout != null) props.put("queue.enqueue.timeout.ms", enqueueTimeout)
if(queueSize != null) props.put("queue.buffering.max.messages", queueSize)
if(requiredNumAcks != Int.MaxValue) props.put("request.required.acks", requiredNumAcks.toString)
val config : ProducerConfig = new ProducerConfig(props)
producer = new Producer[String, String](config)
LogLog.debug("Kafka producer connected to " + config.brokerList)
producer = new KafkaProducer(props)
LogLog.debug("Kafka producer connected to " + brokerList)
LogLog.debug("Logging for topic: " + topic)
}
override def append(event: LoggingEvent) {
val message = subAppend(event)
LogLog.debug("[" + new Date(event.getTimeStamp).toString + "]" + message)
val messageData = new KeyedMessage[String, String](topic, message)
producer.send(messageData);
producer.send(new ProducerRecord(topic, message.getBytes()));
}
def subAppend(event: LoggingEvent): String = {
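
Not part of this commit, but a hedged sketch of wiring the slimmed-down appender programmatically, assuming it keeps the kafka.producer package and that the configuration checks above run in log4j's standard activateOptions() callback; log4j would normally drive the same setters (BrokerList, Topic, RequiredNumAcks) from a properties file. The topic name and broker address are invented.

import org.apache.log4j.{Logger, PatternLayout}
import kafka.producer.KafkaLog4jAppender

object Log4jAppenderSketch {
  def main(args: Array[String]) {
    val appender = new KafkaLog4jAppender
    appender.setBrokerList("localhost:9092")
    appender.setTopic("app-logs")                          // invented topic name
    appender.setRequiredNumAcks(1)
    appender.setLayout(new PatternLayout("[%d] %p %m (%c)%n"))
    appender.activateOptions()                             // runs the config checks and creates the producer
    Logger.getRootLogger.addAppender(appender)
    Logger.getRootLogger.info("hello from the Kafka log4j appender")
  }
}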

View File

@ -20,11 +20,10 @@ package kafka.tools
import joptsimple.OptionParser
import java.util.concurrent.{Executors, CountDownLatch}
import java.util.Properties
import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
import kafka.consumer._
import kafka.utils.{Logging, ZkUtils}
import kafka.utils.{CommandLineUtils, Logging, ZkUtils}
import kafka.api.OffsetRequest
import kafka.message.CompressionCodec
import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
object ReplayLogProducer extends Logging {
@ -88,17 +87,6 @@ object ReplayLogProducer extends Logging {
.describedAs("count")
.ofType(classOf[java.lang.Integer])
.defaultsTo(-1)
val asyncOpt = parser.accepts("async", "If set, messages are sent asynchronously.")
val delayMSBtwBatchOpt = parser.accepts("delay-btw-batch-ms", "Delay in ms between 2 batch sends.")
.withRequiredArg
.describedAs("ms")
.ofType(classOf[java.lang.Long])
.defaultsTo(0)
val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch.")
.withRequiredArg
.describedAs("batch size")
.ofType(classOf[java.lang.Integer])
.defaultsTo(200)
val numThreadsOpt = parser.accepts("threads", "Number of sending threads.")
.withRequiredArg
.describedAs("threads")
@ -109,11 +97,12 @@ object ReplayLogProducer extends Logging {
.describedAs("size")
.ofType(classOf[java.lang.Integer])
.defaultsTo(5000)
val compressionCodecOption = parser.accepts("compression-codec", "If set, messages are sent compressed")
val propertyOpt = parser.accepts("property", "A mechanism to pass properties in the form key=value to the producer. " +
"This allows the user to override producer properties that are not exposed by the existing command line arguments")
.withRequiredArg
.describedAs("compression codec ")
.ofType(classOf[java.lang.Integer])
.defaultsTo(0)
.describedAs("producer properties")
.ofType(classOf[String])
val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
val options = parser.parse(args : _*)
for(arg <- List(brokerListOpt, inputTopicOpt)) {
@ -126,31 +115,19 @@ object ReplayLogProducer extends Logging {
val zkConnect = options.valueOf(zkConnectOpt)
val brokerList = options.valueOf(brokerListOpt)
val numMessages = options.valueOf(numMessagesOpt).intValue
val isAsync = options.has(asyncOpt)
val delayedMSBtwSend = options.valueOf(delayMSBtwBatchOpt).longValue
var batchSize = options.valueOf(batchSizeOpt).intValue
val numThreads = options.valueOf(numThreadsOpt).intValue
val inputTopic = options.valueOf(inputTopicOpt)
val outputTopic = options.valueOf(outputTopicOpt)
val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue)
val isSync = options.has(syncOpt)
import scala.collection.JavaConversions._
val producerProps = CommandLineUtils.parseCommandLineArgs(options.valuesOf(propertyOpt))
producerProps.put("metadata.broker.list", brokerList)
}
class ZKConsumerThread(config: Config, stream: KafkaStream[Array[Byte], Array[Byte]]) extends Thread with Logging {
val shutdownLatch = new CountDownLatch(1)
val props = new Properties()
props.put("metadata.broker.list", config.brokerList)
props.put("reconnect.interval", Integer.MAX_VALUE.toString)
props.put("send.buffer.bytes", (64*1024).toString)
props.put("compression.codec", config.compressionCodec.codec.toString)
props.put("batch.num.messages", config.batchSize.toString)
props.put("queue.enqueue.timeout.ms", "-1")
if(config.isAsync)
props.put("producer.type", "async")
val producerConfig = new ProducerConfig(props)
val producer = new Producer[Array[Byte], Array[Byte]](producerConfig)
val producer = new KafkaProducer(config.producerProps)
override def run() {
info("Starting consumer thread..")
@ -163,9 +140,11 @@ object ReplayLogProducer extends Logging {
stream
for (messageAndMetadata <- iter) {
try {
producer.send(new KeyedMessage[Array[Byte], Array[Byte]](config.outputTopic, messageAndMetadata.message))
if (config.delayedMSBtwSend > 0 && (messageCount + 1) % config.batchSize == 0)
Thread.sleep(config.delayedMSBtwSend)
val response = producer.send(new ProducerRecord(config.outputTopic,
messageAndMetadata.key(), messageAndMetadata.message()))
if(config.isSync) {
response.get()
}
messageCount += 1
}catch {
case ie: Exception => error("Skipping this message", ie)

View File

@ -15,12 +15,11 @@
* limitations under the License.
*/
package kafka
package kafka.tools
import java.util.Properties
import kafka.consumer._
import kafka.producer._
import kafka.message._
import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
object TestEndToEndLatency {
def main(args: Array[String]) {
@ -36,7 +35,7 @@ object TestEndToEndLatency {
val consumerProps = new Properties()
consumerProps.put("group.id", topic)
consumerProps.put("auto.commit", "true")
consumerProps.put("auto.commit.enable", "true")
consumerProps.put("auto.offset.reset", "largest")
consumerProps.put("zookeeper.connect", zkConnect)
consumerProps.put("socket.timeout.ms", 1201000.toString)
@ -48,14 +47,16 @@ object TestEndToEndLatency {
val producerProps = new Properties()
producerProps.put("metadata.broker.list", brokerList)
producerProps.put("producer.type", "sync")
val producer = new Producer[Any, Any](new ProducerConfig(producerProps))
producerProps.put("linger.ms", "0")
producerProps.put("block.on.buffer.full", "true")
val producer = new KafkaProducer(producerProps)
val message = "hello there beautiful".getBytes
var totalTime = 0.0
for (i <- 0 until numMessages) {
var begin = System.nanoTime
producer.send(new KeyedMessage(topic, message))
val response = producer.send(new ProducerRecord(topic, message))
response.get()
val received = iter.next
val elapsed = System.nanoTime - begin
// poor man's progress bar
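
A condensed restatement of the latency loop above, not taken from this hunk, with the averaging step the excerpt cuts off filled in as an assumption; blocking on the Future returned by send makes each iteration time a full producer-to-broker-to-consumer round trip.

var totalTime = 0.0
for (i <- 0 until numMessages) {
  val begin = System.nanoTime
  producer.send(new ProducerRecord(topic, message)).get()  // wait for the broker to acknowledge
  iter.next                                                // wait for the consumer to see the message
  totalTime += System.nanoTime - begin
}
println("Avg latency: %.4f ms".format(totalTime / numMessages / 1000000.0))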

View File

@ -15,20 +15,18 @@
* limitations under the License.
*/
package kafka
package kafka.tools
import joptsimple.OptionParser
import java.util.Properties
import java.util.Random
import java.io._
import scala.io.Source
import scala.io.BufferedSource
import kafka.producer._
import kafka.consumer._
import kafka.serializer._
import kafka.utils._
import kafka.log.FileMessageSet
import kafka.log.Log
import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
/**
* This is a torture test that runs against an existing broker. Here is how it works:
@ -123,14 +121,14 @@ object TestLogCleaning {
val reduction = 1.0 - consumedLines.toDouble/producedLines.toDouble
println("%d rows of data produced, %d rows of data consumed (%.1f%% reduction).".format(producedLines, consumedLines, 100 * reduction))
println("Deduplicating and validating output files...")
println("De-duplicating and validating output files...")
validateOutput(producedDataFile, consumedDataFile)
producedDataFile.delete()
consumedDataFile.delete()
}
def dumpLog(dir: File) {
require(dir.exists, "Non-existant directory: " + dir.getAbsolutePath)
require(dir.exists, "Non-existent directory: " + dir.getAbsolutePath)
for(file <- dir.list.sorted; if file.endsWith(Log.LogFileSuffix)) {
val ms = new FileMessageSet(new File(dir, file))
for(entry <- ms) {
@ -242,13 +240,9 @@ object TestLogCleaning {
dups: Int,
percentDeletes: Int): File = {
val producerProps = new Properties
producerProps.setProperty("producer.type", "async")
producerProps.setProperty("block.on.buffer.full", "true")
producerProps.setProperty("metadata.broker.list", brokerUrl)
producerProps.setProperty("serializer.class", classOf[StringEncoder].getName)
producerProps.setProperty("key.serializer.class", classOf[StringEncoder].getName)
producerProps.setProperty("queue.enqueue.timeout.ms", "-1")
producerProps.setProperty("batch.num.messages", 1000.toString)
val producer = new Producer[String, String](new ProducerConfig(producerProps))
val producer = new KafkaProducer(producerProps)
val rand = new Random(1)
val keyCount = (messages / dups).toInt
val producedFile = File.createTempFile("kafka-log-cleaner-produced-", ".txt")
@ -260,9 +254,9 @@ object TestLogCleaning {
val delete = i % 100 < percentDeletes
val msg =
if(delete)
new KeyedMessage[String, String](topic = topic, key = key.toString, message = null)
new ProducerRecord(topic, key.toString.getBytes(), null)
else
new KeyedMessage[String, String](topic = topic, key = key.toString, message = i.toString)
new ProducerRecord(topic, key.toString.getBytes(), i.toString.getBytes())
producer.send(msg)
producedWriter.write(TestRecord(topic, key, i, delete).toString)
producedWriter.newLine()
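
The delete branch above relies on the log-compaction convention that a record with a null value acts as a tombstone for its key. A one-off sketch with an invented topic and key:

// write a value for a key, then tombstone it so compaction can eventually drop both
producer.send(new ProducerRecord("compacted-topic", "user-42".getBytes, "some value".getBytes))
producer.send(new ProducerRecord("compacted-topic", "user-42".getBytes, null))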

View File

@ -18,6 +18,7 @@
import joptsimple.{OptionSpec, OptionSet, OptionParser}
import scala.collection.Set
import java.util.Properties
/**
* Helper functions for dealing with command line utilities
@ -45,4 +46,16 @@ object CommandLineUtils extends Logging {
}
}
}
}
def parseCommandLineArgs(args: Iterable[String]): Properties = {
val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0)
if(!splits.forall(_.length == 2)) {
System.err.println("Invalid command line properties: " + args.mkString(" "))
System.exit(1)
}
val props = new Properties
for(a <- splits)
props.put(a(0), a(1))
props
}
}
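
A small usage sketch for the helper added above, not part of the commit; the key=value strings are the kind of values collected from repeated --property options in the tools that now call it.

import kafka.utils.CommandLineUtils

object ParseArgsSketch {
  def main(args: Array[String]) {
    // each element corresponds to one "--property key=value" occurrence on the command line
    val props = CommandLineUtils.parseCommandLineArgs(
      Seq("request.required.acks=1", "block.on.buffer.full=true"))
    println(props.getProperty("request.required.acks"))   // prints 1
  }
}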