diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
index 5888f1e30df..a8bd47f1c93 100644
--- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
@@ -97,6 +97,14 @@ object ProducerPerformance extends Logging {
       .describedAs("compression codec ")
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(0)
+    // Optional starting ID for sequentially tagged test messages; the default of -1 leaves tagging off.
+    val initialMessageIdOpt = parser.accepts("initial-message-id", "This is used for generating test data. If set, messages will be tagged with an " +
+        "ID and sent by producer starting from this ID sequentially. Message content will be String type and " +
+        "in the form of 'Topic:[topic]:ThreadID:[thread id]:MessageID:000...1:xxx...'")
+      .withRequiredArg()
+      .describedAs("initial message id")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(-1)
 
     val options = parser.parse(args : _*)
     for(arg <- List(topicOpt, brokerInfoOpt, numMessagesOpt)) {
@@ -119,6 +127,7 @@ object ProducerPerformance extends Logging {
     var batchSize = options.valueOf(batchSizeOpt).intValue
     val numThreads = options.valueOf(numThreadsOpt).intValue
     val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue)
+    val initialMessageId = options.valueOf(initialMessageIdOpt).intValue   // values > -1 enable sequentially tagged messages
   }
 
   private def getStringOfLength(len: Int) : String = {
@@ -157,6 +166,47 @@ object ProducerPerformance extends Logging {
     }
     val producerConfig = new ProducerConfig(props)
     val producer = new Producer[Message, Message](producerConfig)
+    val seqIdNumDigit = 10   // no. of digits for max int value
+
+    // labels and separator used to build sequentially tagged messages
+    private val SEP            = ":"              // message field separator
+    private val messageIdLabel = "MessageID"
+    private val threadIdLabel  = "ThreadID"
+    private val topicLabel     = "Topic"
+
+    /**
+     * Builds the payload of a sequentially tagged test message, in the form
+     * "Topic:[topic]:ThreadID:[threadId]:MessageID:[zero-padded msgId]:" followed by 'x' padding.
+     *
+     * Each thread gets a unique range of sequential no. for its ids.
+     * Eg. 1000 msg in 10 threads => 100 msg per thread
+     *   thread 0 IDs :   0 ~  99
+     *   thread 1 IDs : 100 ~ 199
+     *   thread 2 IDs : 200 ~ 299
+     *   . . .
+     *
+     * @param topic   topic name embedded in the header
+     * @param msgId   sequential ID, left-padded with zeros to seqIdNumDigit digits
+     * @param msgSize target payload length in characters; a header longer than this is not truncated
+     * @return the message text encoded as UTF-8 bytes
+     */
+    private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Array[Byte] = {
+      val leftPaddedSeqId = String.format("%0" + seqIdNumDigit + "d", long2Long(msgId))
+
+      val msgHeader = topicLabel      + SEP +
+                      topic           + SEP +
+                      threadIdLabel   + SEP +
+                      threadId        + SEP +
+                      messageIdLabel  + SEP +
+                      leftPaddedSeqId + SEP
+
+      // pad the rest of the message with 'x'; appending explicitly (rather than formatting with
+      // spaces and replacing them) leaves any space inside the topic name untouched
+      val seqMsgString = msgHeader + ("x" * (msgSize - msgHeader.length))
+      debug(seqMsgString)
+      // explicit charset so the bytes do not depend on the platform default encoding
+      seqMsgString.getBytes("UTF-8")
+    }
 
     override def run {
       var bytesSent = 0L
@@ -195,10 +245,18 @@ object ProducerPerformance extends Logging {
           nSends += config.batchSize
         }else {
           if(!config.isFixSize) {
-            strLength = rand.nextInt(config.messageSize)
-            val messageBytes = getByteArrayOfLength(strLength)
-            rand.nextBytes(messageBytes)
-            val message = new Message(messageBytes)
+            // + 1 so that a message is never empty
+            strLength = rand.nextInt(config.messageSize) + 1
+            val message =
+              if (config.initialMessageId > -1) {
+                // sequentially tagged message: each thread owns its own contiguous ID range
+                val seqId = config.initialMessageId + (messagesPerThread * threadId) + j
+                new Message(generateMessageWithSeqId(config.topic, seqId, strLength))
+              } else {
+                val messageBytes = getByteArrayOfLength(strLength)
+                rand.nextBytes(messageBytes)
+                new Message(messageBytes)
+              }
             producer.send(new ProducerData[Message,Message](config.topic, message))
             debug(config.topic + "-checksum:" + message.checksum)
             bytesSent += message.payloadSize