From 8a07bdb71e74377d7b8ae3c53fe1e59f46bbe2a8 Mon Sep 17 00:00:00 2001
From: Neha Narkhede
Date: Fri, 19 Aug 2011 00:29:09 +0000
Subject: [PATCH] Bug in serialize and collate logic in the DefaultEventHandler
 KAFKA-107; patched by Neha; reviewed by Jun

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1159452 13f79535-47bb-0310-9956-ffa450edef68
---
 .../producer/async/DefaultEventHandler.scala  | 65 ++++++++++---------
 .../kafka/producer/AsyncProducerTest.scala    | 42 ++++++++++++
 2 files changed, 76 insertions(+), 31 deletions(-)

diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 7b2ef645b3f..ae167dff67b 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -23,8 +23,9 @@ import org.apache.log4j.Logger
 import kafka.api.ProducerRequest
 import kafka.serializer.Encoder
 import java.util.Properties
+import kafka.producer.{ProducerConfig, SyncProducer}
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet}
-import kafka.producer.{ProducerConfig, SyncProducerConfigShared, SyncProducerConfig, SyncProducer}
+

 private[kafka] class DefaultEventHandler[T](val config: ProducerConfig,
                                             val cbkHandler: CallbackHandler[T]) extends EventHandler[T] {
@@ -37,6 +38,7 @@ private[kafka] class DefaultEventHandler[T](val config: ProducerConfig,
     var processedEvents = events
     if(cbkHandler != null)
       processedEvents = cbkHandler.beforeSendingData(events)
+
     send(serialize(collate(processedEvents), serializer), syncProducer)
   }

@@ -51,7 +53,6 @@ private[kafka] class DefaultEventHandler[T](val config: ProducerConfig,

   private def serialize(eventsPerTopic: Map[(String,Int), Seq[T]],
                         serializer: Encoder[T]): Map[(String, Int), ByteBufferMessageSet] = {
-    import scala.collection.JavaConversions._
     val eventsPerTopicMap = eventsPerTopic.map(e => ((e._1._1, e._1._2) , e._2.map(l => serializer.toMessage(l))))
     val topicsAndPartitions = eventsPerTopic.map(e => e._1)
     /** enforce the compressed.topics config here.
@@ -60,34 +61,36 @@ private[kafka] class DefaultEventHandler[T](val config: ProducerConfig,
      * If the list of compressed topics is empty, then enable the specified compression codec for all topics
      * If the compression codec is NoCompressionCodec, compression is disabled for all topics
      */
-    val messages = eventsPerTopicMap.map(e => {
-      config.compressionCodec match {
-        case NoCompressionCodec =>
-          if(logger.isDebugEnabled)
-            logger.debug("Sending %d messages with no compression".format(e._2.size))
-          new ByteBufferMessageSet(NoCompressionCodec, e._2: _*)
-        case _ =>
-          config.compressedTopics.size match {
-            case 0 =>
-              if(logger.isDebugEnabled)
-                logger.debug("Sending %d messages with compression %d".format(e._2.size, config.compressionCodec.codec))
-              new ByteBufferMessageSet(config.compressionCodec, e._2: _*)
-            case _ =>
-              if(config.compressedTopics.contains(e._1._1)) {
+
+    val messagesPerTopicPartition = eventsPerTopicMap.map { topicAndEvents =>
+      ((topicAndEvents._1._1, topicAndEvents._1._2),
+        config.compressionCodec match {
+          case NoCompressionCodec =>
+            if(logger.isDebugEnabled)
+              logger.debug("Sending %d messages with no compression".format(topicAndEvents._2.size))
+            new ByteBufferMessageSet(NoCompressionCodec, topicAndEvents._2: _*)
+          case _ =>
+            config.compressedTopics.size match {
+              case 0 =>
                 if(logger.isDebugEnabled)
-                  logger.debug("Sending %d messages with compression %d".format(e._2.size, config.compressionCodec.codec))
-                new ByteBufferMessageSet(config.compressionCodec, e._2: _*)
-              }
-              else {
-                if(logger.isDebugEnabled)
-                  logger.debug("Sending %d messages with no compression as %s is not in compressed.topics - %s"
-                    .format(e._2.size, e._1._1, config.compressedTopics.toString))
-                new ByteBufferMessageSet(NoCompressionCodec, e._2: _*)
-              }
-          }
-      }
-    })
-    topicsAndPartitions.zip(messages)
+                  logger.debug("Sending %d messages with compression %d".format(topicAndEvents._2.size, config.compressionCodec.codec))
+                new ByteBufferMessageSet(config.compressionCodec, topicAndEvents._2: _*)
+              case _ =>
+                if(config.compressedTopics.contains(topicAndEvents._1._1)) {
+                  if(logger.isDebugEnabled)
+                    logger.debug("Sending %d messages with compression %d".format(topicAndEvents._2.size, config.compressionCodec.codec))
+                  new ByteBufferMessageSet(config.compressionCodec, topicAndEvents._2: _*)
+                }
+                else {
+                  if(logger.isDebugEnabled)
+                    logger.debug("Sending %d messages with no compression as %s is not in compressed.topics - %s"
+                      .format(topicAndEvents._2.size, topicAndEvents._1._1, config.compressedTopics.toString))
+                  new ByteBufferMessageSet(NoCompressionCodec, topicAndEvents._2: _*)
+                }
+            }
+        })
+    }
+    messagesPerTopicPartition
   }

   private def collate(events: Seq[QueueItem[T]]): Map[(String,Int), Seq[T]] = {
@@ -100,8 +103,8 @@ private[kafka] class DefaultEventHandler[T](val config: ProducerConfig,
       val topicEvents = remainingEvents partition (e => e.getTopic.equals(topic))
       remainingEvents = topicEvents._2
       distinctPartitions.foreach { p =>
-        val topicPartitionEvents = topicEvents._1 partition (e => (e.getPartition == p))
-        collatedEvents += ( (topic, p) -> topicPartitionEvents._1.map(q => q.getData).toSeq)
+        val topicPartitionEvents = (topicEvents._1 partition (e => (e.getPartition == p)))._1
+        collatedEvents += ( (topic, p) -> topicPartitionEvents.map(q => q.getData))
       }
     }
     collatedEvents
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index d9441d820e2..bff30f59bac 100644
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -243,6 +243,48 @@ class AsyncProducerTest extends JUnitSuite {

   }

+  @Test
+  def testCollateAndSerializeEvents() {
+    val basicProducer = EasyMock.createMock(classOf[SyncProducer])
+    basicProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic2, 1,
+                                                   getMessageSetOfSize(List(message2), 5)),
+                                                 new ProducerRequest(topic1, 0,
+                                                   getMessageSetOfSize(List(message1), 5)),
+                                                 new ProducerRequest(topic1, 1,
+                                                   getMessageSetOfSize(List(message1), 5)),
+                                                 new ProducerRequest(topic2, 0,
+                                                   getMessageSetOfSize(List(message2), 5)))))
+
+    EasyMock.expectLastCall
+    basicProducer.close
+    EasyMock.expectLastCall
+    EasyMock.replay(basicProducer)
+
+    val props = new Properties()
+    props.put("host", "localhost")
+    props.put("port", "9092")
+    props.put("queue.size", "50")
+    props.put("serializer.class", "kafka.producer.StringSerializer")
+    props.put("batch.size", "20")
+
+    val config = new AsyncProducerConfig(props)
+
+    val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
+
+    producer.start
+    val serializer = new StringSerializer
+    for(i <- 0 until 5) {
+      producer.send(topic2, messageContent2, 0)
+      producer.send(topic2, messageContent2, 1)
+      producer.send(topic1, messageContent1, 0)
+      producer.send(topic1, messageContent1, 1)
+    }
+
+    producer.close
+    EasyMock.verify(basicProducer)
+
+  }
+
   private def getMessageSetOfSize(messages: List[Message], counts: Int): ByteBufferMessageSet = {
     var messageList = new Array[Message](counts)
     for(message <- messages) {
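
Side note on the serialize() change above: as I read the patch, the old code built the (topic, partition) keys and the serialized message sets in two separate traversals of the map and then zipped them back together, so correctness hinged on both traversals iterating in the same order. The rewrite keeps each key and its message set together in a single traversal, so a key can never be paired with another entry's messages. Below is a minimal, self-contained Scala sketch of that hazard; it is not part of the patch, and all names in it are illustrative.

object ZipVersusDirect {
  def main(args: Array[String]): Unit = {
    val eventsPerTopic = Map(("topic1", 0) -> Seq("m1"), ("topic2", 1) -> Seq("m2"))

    // Fragile: keys and transformed values come from two independent
    // traversals; zip then pairs them purely by position, so correctness
    // depends on an iteration-order assumption the Map API does not promise.
    val keys   = eventsPerTopic.map(_._1)
    val values = eventsPerTopic.map(_._2.map(_.reverse))
    val zipped = keys.zip(values).toMap

    // Robust: a single traversal keeps each key attached to its own value,
    // mirroring the structure of the patched serialize().
    val direct = eventsPerTopic.map { case (tp, msgs) => (tp, msgs.map(_.reverse)) }

    println(zipped == direct)  // true here, but only by luck of iteration order
  }
}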