mirror of https://github.com/apache/kafka.git
kafka-882; Enhance 0.7 ProducerPerformance to send sequential MessageID as in 0.8; patched by John Fung; reviewed by Jun Rao
This commit is contained in:
parent
68c8434f61
commit
a984f2fe5e
|
@ -97,6 +97,13 @@ object ProducerPerformance extends Logging {
|
||||||
.describedAs("compression codec ")
|
.describedAs("compression codec ")
|
||||||
.ofType(classOf[java.lang.Integer])
|
.ofType(classOf[java.lang.Integer])
|
||||||
.defaultsTo(0)
|
.defaultsTo(0)
|
||||||
|
val initialMessageIdOpt = parser.accepts("initial-message-id", "The is used for generating test data, If set, messages will be tagged with an " +
|
||||||
|
"ID and sent by producer starting from this ID sequentially. Message content will be String type and " +
|
||||||
|
"in the form of 'Message:000...1:xxx...'")
|
||||||
|
.withRequiredArg()
|
||||||
|
.describedAs("initial message id")
|
||||||
|
.ofType(classOf[java.lang.Integer])
|
||||||
|
.defaultsTo(-1)
|
||||||
|
|
||||||
val options = parser.parse(args : _*)
|
val options = parser.parse(args : _*)
|
||||||
for(arg <- List(topicOpt, brokerInfoOpt, numMessagesOpt)) {
|
for(arg <- List(topicOpt, brokerInfoOpt, numMessagesOpt)) {
|
||||||
|
@ -119,6 +126,7 @@ object ProducerPerformance extends Logging {
|
||||||
var batchSize = options.valueOf(batchSizeOpt).intValue
|
var batchSize = options.valueOf(batchSizeOpt).intValue
|
||||||
val numThreads = options.valueOf(numThreadsOpt).intValue
|
val numThreads = options.valueOf(numThreadsOpt).intValue
|
||||||
val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue)
|
val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue)
|
||||||
|
var initialMessageId = options.valueOf(initialMessageIdOpt).intValue()
|
||||||
}
|
}
|
||||||
|
|
||||||
private def getStringOfLength(len: Int) : String = {
|
private def getStringOfLength(len: Int) : String = {
|
||||||
|
@ -157,6 +165,35 @@ object ProducerPerformance extends Logging {
|
||||||
}
|
}
|
||||||
val producerConfig = new ProducerConfig(props)
|
val producerConfig = new ProducerConfig(props)
|
||||||
val producer = new Producer[Message, Message](producerConfig)
|
val producer = new Producer[Message, Message](producerConfig)
|
||||||
|
val seqIdNumDigit = 10 // no. of digits for max int value
|
||||||
|
// generate the sequential message ID
|
||||||
|
private val SEP = ":" // message field separator
|
||||||
|
private val messageIdLabel = "MessageID"
|
||||||
|
private val threadIdLabel = "ThreadID"
|
||||||
|
private val topicLabel = "Topic"
|
||||||
|
private var leftPaddedSeqId : String = ""
|
||||||
|
|
||||||
|
private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Array[Byte] = {
|
||||||
|
// Each thread gets a unique range of sequential no. for its ids.
|
||||||
|
// Eg. 1000 msg in 10 threads => 100 msg per thread
|
||||||
|
// thread 0 IDs : 0 ~ 99
|
||||||
|
// thread 1 IDs : 100 ~ 199
|
||||||
|
// thread 2 IDs : 200 ~ 299
|
||||||
|
// . . .
|
||||||
|
leftPaddedSeqId = String.format("%0"+seqIdNumDigit+"d", long2Long(msgId))
|
||||||
|
|
||||||
|
val msgHeader = topicLabel + SEP +
|
||||||
|
topic + SEP +
|
||||||
|
threadIdLabel + SEP +
|
||||||
|
threadId + SEP +
|
||||||
|
messageIdLabel + SEP +
|
||||||
|
leftPaddedSeqId + SEP
|
||||||
|
|
||||||
|
// pad the rest of the message with 'x'
|
||||||
|
val seqMsgString = String.format("%1$-"+msgSize+"s", msgHeader).replace(' ', 'x')
|
||||||
|
debug(seqMsgString)
|
||||||
|
return seqMsgString.getBytes()
|
||||||
|
}
|
||||||
|
|
||||||
override def run {
|
override def run {
|
||||||
var bytesSent = 0L
|
var bytesSent = 0L
|
||||||
|
@ -195,10 +232,17 @@ object ProducerPerformance extends Logging {
|
||||||
nSends += config.batchSize
|
nSends += config.batchSize
|
||||||
}else {
|
}else {
|
||||||
if(!config.isFixSize) {
|
if(!config.isFixSize) {
|
||||||
strLength = rand.nextInt(config.messageSize)
|
strLength = rand.nextInt(config.messageSize) + 1
|
||||||
val messageBytes = getByteArrayOfLength(strLength)
|
var message : Message = null
|
||||||
rand.nextBytes(messageBytes)
|
if (config.initialMessageId > -1) {
|
||||||
val message = new Message(messageBytes)
|
val seqId = config.initialMessageId + (messagesPerThread * threadId) + j
|
||||||
|
message = new Message(generateMessageWithSeqId(config.topic, seqId, strLength))
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
val messageBytes = getByteArrayOfLength(strLength)
|
||||||
|
rand.nextBytes(messageBytes)
|
||||||
|
message = new Message(messageBytes)
|
||||||
|
}
|
||||||
producer.send(new ProducerData[Message,Message](config.topic, message))
|
producer.send(new ProducerData[Message,Message](config.topic, message))
|
||||||
debug(config.topic + "-checksum:" + message.checksum)
|
debug(config.topic + "-checksum:" + message.checksum)
|
||||||
bytesSent += message.payloadSize
|
bytesSent += message.payloadSize
|
||||||
|
|
Loading…
Reference in New Issue