Bug in serialize and collate logic in the DefaultEventHandler KAFKA-107; patched by Neha; reviewed by Jun

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1159452 13f79535-47bb-0310-9956-ffa450edef68
Neha Narkhede 2011-08-19 00:29:09 +00:00
parent 82b42163a5
commit 8a07bdb71e
2 changed files with 76 additions and 31 deletions

DefaultEventHandler.scala

@@ -23,8 +23,9 @@ import org.apache.log4j.Logger
 import kafka.api.ProducerRequest
 import kafka.serializer.Encoder
 import java.util.Properties
-import kafka.producer.{ProducerConfig, SyncProducer}
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet}
+import kafka.producer.{ProducerConfig, SyncProducerConfigShared, SyncProducerConfig, SyncProducer}
 
 private[kafka] class DefaultEventHandler[T](val config: ProducerConfig,
                                             val cbkHandler: CallbackHandler[T]) extends EventHandler[T] {
@@ -37,6 +38,7 @@ private[kafka] class DefaultEventHandler[T](val config: ProducerConfig,
     var processedEvents = events
     if(cbkHandler != null)
       processedEvents = cbkHandler.beforeSendingData(events)
+
     send(serialize(collate(processedEvents), serializer), syncProducer)
   }
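
For orientation, `handle` collates the queued events by (topic, partition), serializes each group into a message set, and hands the result to the sync producer. Below is a minimal, self-contained sketch of that data flow; `QueueItem` here is a simplified stand-in for Kafka's queue item type, and `Seq[Array[Byte]]` stands in for `ByteBufferMessageSet`:

```scala
// Simplified stand-in for Kafka's queue item type; illustrative only.
case class QueueItem[T](data: T, topic: String, partition: Int)

object HandleSketch {
  // collate: group queued payloads by (topic, partition)
  def collate[T](events: Seq[QueueItem[T]]): Map[(String, Int), Seq[T]] =
    events.groupBy(e => (e.topic, e.partition)).map { case (tp, es) => (tp, es.map(_.data)) }

  // serialize: encode each group's payloads
  // (Seq[Array[Byte]] stands in for a ByteBufferMessageSet)
  def serialize[T](groups: Map[(String, Int), Seq[T]],
                   encode: T => Array[Byte]): Map[(String, Int), Seq[Array[Byte]]] =
    groups.map { case (tp, es) => (tp, es.map(encode)) }

  // send: one producer request per (topic, partition)
  def send(sets: Map[(String, Int), Seq[Array[Byte]]]): Unit =
    sets.foreach { case ((topic, p), set) => println(s"request: $topic/$p, ${set.size} messages") }

  def main(args: Array[String]): Unit = {
    val events = Seq(QueueItem("a", "t1", 0), QueueItem("b", "t1", 0), QueueItem("c", "t2", 1))
    send(serialize(collate(events), (s: String) => s.getBytes("UTF-8")))
  }
}
```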
@@ -51,7 +53,6 @@ private[kafka] class DefaultEventHandler[T](val config: ProducerConfig,
   private def serialize(eventsPerTopic: Map[(String,Int), Seq[T]],
                         serializer: Encoder[T]): Map[(String, Int), ByteBufferMessageSet] = {
-    import scala.collection.JavaConversions._
     val eventsPerTopicMap = eventsPerTopic.map(e => ((e._1._1, e._1._2) , e._2.map(l => serializer.toMessage(l))))
     val topicsAndPartitions = eventsPerTopic.map(e => e._1)
     /** enforce the compressed.topics config here.
@@ -60,34 +61,36 @@
      * If the list of compressed topics is empty, then enable the specified compression codec for all topics
      * If the compression codec is NoCompressionCodec, compression is disabled for all topics
      */
-    val messages = eventsPerTopicMap.map(e => {
-      config.compressionCodec match {
-        case NoCompressionCodec =>
-          if(logger.isDebugEnabled)
-            logger.debug("Sending %d messages with no compression".format(e._2.size))
-          new ByteBufferMessageSet(NoCompressionCodec, e._2: _*)
-        case _ =>
-          config.compressedTopics.size match {
-            case 0 =>
-              if(logger.isDebugEnabled)
-                logger.debug("Sending %d messages with compression %d".format(e._2.size, config.compressionCodec.codec))
-              new ByteBufferMessageSet(config.compressionCodec, e._2: _*)
-            case _ =>
-              if(config.compressedTopics.contains(e._1._1)) {
-                if(logger.isDebugEnabled)
-                  logger.debug("Sending %d messages with compression %d".format(e._2.size, config.compressionCodec.codec))
-                new ByteBufferMessageSet(config.compressionCodec, e._2: _*)
-              }
-              else {
-                if(logger.isDebugEnabled)
-                  logger.debug("Sending %d messages with no compression as %s is not in compressed.topics - %s"
-                    .format(e._2.size, e._1._1, config.compressedTopics.toString))
-                new ByteBufferMessageSet(NoCompressionCodec, e._2: _*)
-              }
-          }
-      }
-    })
-    topicsAndPartitions.zip(messages)
+    val messagesPerTopicPartition = eventsPerTopicMap.map { topicAndEvents =>
+      ((topicAndEvents._1._1, topicAndEvents._1._2),
+        config.compressionCodec match {
+          case NoCompressionCodec =>
+            if(logger.isDebugEnabled)
+              logger.debug("Sending %d messages with no compression".format(topicAndEvents._2.size))
+            new ByteBufferMessageSet(NoCompressionCodec, topicAndEvents._2: _*)
+          case _ =>
+            config.compressedTopics.size match {
+              case 0 =>
+                if(logger.isDebugEnabled)
+                  logger.debug("Sending %d messages with compression %d".format(topicAndEvents._2.size, config.compressionCodec.codec))
+                new ByteBufferMessageSet(config.compressionCodec, topicAndEvents._2: _*)
+              case _ =>
+                if(config.compressedTopics.contains(topicAndEvents._1._1)) {
+                  if(logger.isDebugEnabled)
+                    logger.debug("Sending %d messages with compression %d".format(topicAndEvents._2.size, config.compressionCodec.codec))
+                  new ByteBufferMessageSet(config.compressionCodec, topicAndEvents._2: _*)
+                }
+                else {
+                  if(logger.isDebugEnabled)
+                    logger.debug("Sending %d messages with no compression as %s is not in compressed.topics - %s"
+                      .format(topicAndEvents._2.size, topicAndEvents._1._1, config.compressedTopics.toString))
+                  new ByteBufferMessageSet(NoCompressionCodec, topicAndEvents._2: _*)
+                }
+            }
+        })
+    }
+    messagesPerTopicPartition
   }
 
   private def collate(events: Seq[QueueItem[T]]): Map[(String,Int), Seq[T]] = {
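
The serialize change above is the heart of KAFKA-107: the old code derived the key collection and the message-set collection in two separate traversals and then `zip`ped them back together, silently assuming both iterate in the same order; the patch instead builds each `((topic, partition), messageSet)` pair in a single traversal, so the association holds by construction. A small sketch of the two shapes, using simplified value types rather than the real `ByteBufferMessageSet`:

```scala
object SerializeSketch {
  def main(args: Array[String]): Unit = {
    val input: Map[(String, Int), Seq[String]] =
      Map(("t1", 0) -> Seq("a", "b"), ("t2", 1) -> Seq("c"))

    // Fragile shape (the old code): derive keys and values in two separate
    // traversals, then zip them together. This is only correct if both
    // derived collections happen to iterate in exactly the same order.
    val zipped: Map[(String, Int), Int] =
      input.map(_._1).zip(input.map(_._2.size)).toMap

    // Robust shape (the patch): build each key/value pair in a single
    // traversal, so key and value can never drift apart.
    val direct: Map[(String, Int), Int] =
      input.map { case (k, v) => (k, v.size) }

    println(zipped == direct)
  }
}
```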
@@ -100,8 +103,8 @@ private[kafka] class DefaultEventHandler[T](val config: ProducerConfig,
       val topicEvents = remainingEvents partition (e => e.getTopic.equals(topic))
       remainingEvents = topicEvents._2
       distinctPartitions.foreach { p =>
-        val topicPartitionEvents = topicEvents._1 partition (e => (e.getPartition == p))
-        collatedEvents += ( (topic, p) -> topicPartitionEvents._1.map(q => q.getData).toSeq)
+        val topicPartitionEvents = (topicEvents._1 partition (e => (e.getPartition == p)))._1
+        collatedEvents += ( (topic, p) -> topicPartitionEvents.map(q => q.getData))
       }
     }
     collatedEvents
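
For reference, here is a standalone mirror of the patched `collate` loop, with a simplified event type in place of Kafka's `QueueItem`. Note that because it iterates the cross product of distinct topics and distinct partitions, it can emit empty groups for combinations that received no events:

```scala
// Simplified stand-in for Kafka's QueueItem; illustrative only.
case class Item[T](data: T, topic: String, partition: Int)

object CollateSketch {
  def collate[T](events: Seq[Item[T]]): Map[(String, Int), Seq[T]] = {
    var collated = Map.empty[(String, Int), Seq[T]]
    var remaining = events
    val topics = events.map(_.topic).distinct
    val partitions = events.map(_.partition).distinct
    for (topic <- topics) {
      // Split off this topic's events; keep the rest for later iterations.
      val (forTopic, rest) = remaining.partition(_.topic == topic)
      remaining = rest
      for (p <- partitions) {
        // As in the patch: keep only the matching half of the partition.
        val forPartition = forTopic.partition(_.partition == p)._1
        collated += ((topic, p) -> forPartition.map(_.data))
      }
    }
    collated
  }

  def main(args: Array[String]): Unit = {
    val evs = Seq(Item("m1", "t1", 0), Item("m2", "t1", 1), Item("m3", "t2", 0))
    collate(evs).foreach(println)
    // Includes (t2, 1) -> Seq() because partition 1 exists somewhere in the
    // batch even though t2 received no events for it.
  }
}
```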

AsyncProducerTest.scala

@@ -243,6 +243,48 @@ class AsyncProducerTest extends JUnitSuite {
   }
+  @Test
+  def testCollateAndSerializeEvents() {
+    val basicProducer = EasyMock.createMock(classOf[SyncProducer])
+    basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic2, 1,
+                                                   getMessageSetOfSize(List(message2), 5)),
+                                                 new ProducerRequest(topic1, 0,
+                                                   getMessageSetOfSize(List(message1), 5)),
+                                                 new ProducerRequest(topic1, 1,
+                                                   getMessageSetOfSize(List(message1), 5)),
+                                                 new ProducerRequest(topic2, 0,
+                                                   getMessageSetOfSize(List(message2), 5)))))
+    EasyMock.expectLastCall
+    basicProducer.close
+    EasyMock.expectLastCall
+    EasyMock.replay(basicProducer)
+
+    val props = new Properties()
+    props.put("host", "localhost")
+    props.put("port", "9092")
+    props.put("queue.size", "50")
+    props.put("serializer.class", "kafka.producer.StringSerializer")
+    props.put("batch.size", "20")
+
+    val config = new AsyncProducerConfig(props)
+    val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
+
+    producer.start
+    val serializer = new StringSerializer
+    for(i <- 0 until 5) {
+      producer.send(topic2, messageContent2, 0)
+      producer.send(topic2, messageContent2, 1)
+      producer.send(topic1, messageContent1, 0)
+      producer.send(topic1, messageContent1, 1)
+    }
+    producer.close
+    EasyMock.verify(basicProducer)
+  }
+
   private def getMessageSetOfSize(messages: List[Message], counts: Int): ByteBufferMessageSet = {
     var messageList = new Array[Message](counts)
     for(message <- messages) {