KAFKA-391 Refactor fetch/producer requests to use maps instead of several arrays; patched by Joel Koshy; reviewed by Jun Rao.

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1386806 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Joel Jacob Koshy 2012-09-17 20:15:59 +00:00
parent c8f7587072
commit b688c3ba04
31 changed files with 762 additions and 832 deletions

View File

@ -26,10 +26,10 @@ import java.util.Random;
import kafka.etl.KafkaETLKey; import kafka.etl.KafkaETLKey;
import kafka.etl.KafkaETLRequest; import kafka.etl.KafkaETLRequest;
import kafka.etl.Props; import kafka.etl.Props;
import kafka.javaapi.message.ByteBufferMessageSet; import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.SyncProducer; import kafka.javaapi.producer.ProducerData;
import kafka.message.Message; import kafka.message.Message;
import kafka.producer.SyncProducerConfig; import kafka.producer.ProducerConfig;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.BytesWritable;
@ -47,7 +47,7 @@ public class DataGenerator {
System.currentTimeMillis()); System.currentTimeMillis());
protected Props _props; protected Props _props;
protected SyncProducer _producer = null; protected Producer _producer = null;
protected URI _uri = null; protected URI _uri = null;
protected String _topic; protected String _topic;
protected int _count; protected int _count;
@ -70,12 +70,12 @@ public class DataGenerator {
System.out.println("server uri:" + _uri.toString()); System.out.println("server uri:" + _uri.toString());
Properties producerProps = new Properties(); Properties producerProps = new Properties();
producerProps.put("host", _uri.getHost()); producerProps.put("broker.list", String.format("%s:%d", _uri.getHost(), _uri.getPort()));
producerProps.put("port", String.valueOf(_uri.getPort()));
producerProps.put("buffer.size", String.valueOf(TCP_BUFFER_SIZE)); producerProps.put("buffer.size", String.valueOf(TCP_BUFFER_SIZE));
producerProps.put("connect.timeout.ms", String.valueOf(CONNECT_TIMEOUT)); producerProps.put("connect.timeout.ms", String.valueOf(CONNECT_TIMEOUT));
producerProps.put("reconnect.interval", String.valueOf(RECONNECT_INTERVAL)); producerProps.put("reconnect.interval", String.valueOf(RECONNECT_INTERVAL));
_producer = new SyncProducer(new SyncProducerConfig(producerProps));
_producer = new Producer(new ProducerConfig(producerProps));
} }
@ -91,7 +91,8 @@ public class DataGenerator {
} }
// send events // send events
System.out.println(" send " + list.size() + " " + _topic + " count events to " + _uri); System.out.println(" send " + list.size() + " " + _topic + " count events to " + _uri);
_producer.send(_topic, new ByteBufferMessageSet(kafka.message.NoCompressionCodec$.MODULE$, list)); ProducerData<Integer, Message> pd = new ProducerData<Integer, Message>(_topic, null, list);
_producer.send(pd);
// close the producer // close the producer
_producer.close(); _producer.close();

View File

@ -18,60 +18,13 @@
package kafka.api package kafka.api
import java.nio.ByteBuffer import java.nio.ByteBuffer
import kafka.utils.Utils import kafka.utils.{nonthreadsafe, Utils}
import scala.collection.mutable.{HashMap, Buffer, ListBuffer} import scala.collection.immutable.Map
import kafka.common.{KafkaException, FetchRequestFormatException} import kafka.common.TopicAndPartition
object OffsetDetail {
def readFrom(buffer: ByteBuffer): OffsetDetail = { case class PartitionFetchInfo(offset: Long, fetchSize: Int)
val topic = Utils.readShortString(buffer, "UTF-8")
val partitionsCount = buffer.getInt
val partitions = new Array[Int](partitionsCount)
for (i <- 0 until partitions.length)
partitions(i) = buffer.getInt
val offsetsCount = buffer.getInt
val offsets = new Array[Long](offsetsCount)
for (i <- 0 until offsets.length)
offsets(i) = buffer.getLong
val fetchesCount = buffer.getInt
val fetchSizes = new Array[Int](fetchesCount)
for (i <- 0 until fetchSizes.length)
fetchSizes(i) = buffer.getInt
new OffsetDetail(topic, partitions, offsets, fetchSizes)
}
}
case class OffsetDetail(topic: String, partitions: Seq[Int], offsets: Seq[Long], fetchSizes: Seq[Int]) {
def writeTo(buffer: ByteBuffer) {
Utils.writeShortString(buffer, topic, "UTF-8")
if(partitions.size > Int.MaxValue || offsets.size > Int.MaxValue || fetchSizes.size > Int.MaxValue)
throw new KafkaException("Number of fetches in FetchRequest exceeds " + Int.MaxValue + ".")
buffer.putInt(partitions.length)
partitions.foreach(buffer.putInt(_))
buffer.putInt(offsets.length)
offsets.foreach(buffer.putLong(_))
buffer.putInt(fetchSizes.length)
fetchSizes.foreach(buffer.putInt(_))
}
def sizeInBytes(): Int = {
2 + topic.length() + // topic string
partitions.foldLeft(4)((s, _) => s + 4) + // each request partition (int)
offsets.foldLeft(4)((s, _) => s + 8) + // each request offset (long)
fetchSizes.foldLeft(4)((s,_) => s + 4) // each request fetch size
}
}
object FetchRequest { object FetchRequest {
val CurrentVersion = 1.shortValue() val CurrentVersion = 1.shortValue()
@ -85,18 +38,23 @@ object FetchRequest {
def readFrom(buffer: ByteBuffer): FetchRequest = { def readFrom(buffer: ByteBuffer): FetchRequest = {
val versionId = buffer.getShort val versionId = buffer.getShort
val correlationId = buffer.getInt val correlationId = buffer.getInt
val clientId = Utils.readShortString(buffer, "UTF-8") val clientId = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
val replicaId = buffer.getInt val replicaId = buffer.getInt
val maxWait = buffer.getInt val maxWait = buffer.getInt
val minBytes = buffer.getInt val minBytes = buffer.getInt
val offsetsCount = buffer.getInt val topicCount = buffer.getInt
val offsetInfo = new Array[OffsetDetail](offsetsCount) val pairs = (1 to topicCount).flatMap(_ => {
for(i <- 0 until offsetInfo.length) val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
offsetInfo(i) = OffsetDetail.readFrom(buffer) val partitionCount = buffer.getInt
(1 to partitionCount).map(_ => {
new FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, offsetInfo) val partitionId = buffer.getInt
val offset = buffer.getLong
val fetchSize = buffer.getInt
(TopicAndPartition(topic, partitionId), PartitionFetchInfo(offset, fetchSize))
})
})
FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, Map(pairs:_*))
} }
} }
case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion, case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
@ -105,50 +63,63 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
replicaId: Int = FetchRequest.DefaultReplicaId, replicaId: Int = FetchRequest.DefaultReplicaId,
maxWait: Int = FetchRequest.DefaultMaxWait, maxWait: Int = FetchRequest.DefaultMaxWait,
minBytes: Int = FetchRequest.DefaultMinBytes, minBytes: Int = FetchRequest.DefaultMinBytes,
offsetInfo: Seq[OffsetDetail] ) extends RequestOrResponse(Some(RequestKeys.FetchKey)) { requestInfo: Map[TopicAndPartition, PartitionFetchInfo])
extends RequestOrResponse(Some(RequestKeys.FetchKey)) {
// ensure that a topic "X" appears in at most one OffsetDetail /**
def validate() { * Partitions the request info into a map of maps (one for each topic).
if(offsetInfo == null) */
throw new FetchRequestFormatException("FetchRequest has null offsetInfo") lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
// We don't want to get fancy with groupBy's and filter's since we just want the first occurrence
var topics = Set[String]()
val iter = offsetInfo.iterator
while(iter.hasNext) {
val offsetData = iter.next()
val topic = offsetData.topic
if(topics.contains(topic))
throw new FetchRequestFormatException("FetchRequest has multiple OffsetDetails for topic: " + topic)
else
topics += topic
}
}
def writeTo(buffer: ByteBuffer) { def writeTo(buffer: ByteBuffer) {
// validate first
validate()
buffer.putShort(versionId) buffer.putShort(versionId)
buffer.putInt(correlationId) buffer.putInt(correlationId)
Utils.writeShortString(buffer, clientId, "UTF-8") Utils.writeShortString(buffer, clientId, RequestOrResponse.DefaultCharset)
buffer.putInt(replicaId) buffer.putInt(replicaId)
buffer.putInt(maxWait) buffer.putInt(maxWait)
buffer.putInt(minBytes) buffer.putInt(minBytes)
buffer.putInt(offsetInfo.size) buffer.putInt(requestInfoGroupedByTopic.size) // topic count
for(topicDetail <- offsetInfo) { requestInfoGroupedByTopic.foreach {
topicDetail.writeTo(buffer) case (topic, partitionFetchInfos) =>
Utils.writeShortString(buffer, topic, RequestOrResponse.DefaultCharset)
buffer.putInt(partitionFetchInfos.size) // partition count
partitionFetchInfos.foreach {
case (TopicAndPartition(_, partition), PartitionFetchInfo(offset, fetchSize)) =>
buffer.putInt(partition)
buffer.putLong(offset)
buffer.putInt(fetchSize)
}
} }
} }
def sizeInBytes: Int = 2 + 4 + (2 + clientId.length()) + 4 + 4 + 4 + offsetInfo.foldLeft(4)(_ + _.sizeInBytes()) def sizeInBytes: Int = {
2 + /* versionId */
4 + /* correlationId */
Utils.shortStringLength(clientId, RequestOrResponse.DefaultCharset) +
4 + /* replicaId */
4 + /* maxWait */
4 + /* minBytes */
4 + /* topic count */
requestInfoGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => {
val (topic, partitionFetchInfos) = currTopic
foldedTopics +
Utils.shortStringLength(topic, RequestOrResponse.DefaultCharset) +
4 + /* partition count */
partitionFetchInfos.size * (
4 + /* partition id */
8 + /* offset */
4 /* fetch size */
)
})
}
def numPartitions: Int = offsetInfo.foldLeft(0)(_ + _.offsets.size) def isFromFollower = replicaId != FetchRequest.NonFollowerId
def isFromFollower(): Boolean = replicaId != FetchRequest.NonFollowerId def numPartitions = requestInfo.size
} }
@nonthreadsafe
class FetchRequestBuilder() { class FetchRequestBuilder() {
private var correlationId = FetchRequest.DefaultCorrelationId private var correlationId = FetchRequest.DefaultCorrelationId
private val versionId = FetchRequest.CurrentVersion private val versionId = FetchRequest.CurrentVersion
@ -156,13 +127,10 @@ class FetchRequestBuilder() {
private var replicaId = FetchRequest.DefaultReplicaId private var replicaId = FetchRequest.DefaultReplicaId
private var maxWait = FetchRequest.DefaultMaxWait private var maxWait = FetchRequest.DefaultMaxWait
private var minBytes = FetchRequest.DefaultMinBytes private var minBytes = FetchRequest.DefaultMinBytes
private val requestMap = new HashMap[String, Tuple3[Buffer[Int], Buffer[Long], Buffer[Int]]] private val requestMap = new collection.mutable.HashMap[TopicAndPartition, PartitionFetchInfo]
def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int) = { def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int) = {
val topicData = requestMap.getOrElseUpdate(topic, (ListBuffer[Int](), ListBuffer[Long](), ListBuffer[Int]())) requestMap.put(TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize))
topicData._1.append(partition)
topicData._2.append(offset)
topicData._3.append(fetchSize)
this this
} }
@ -191,10 +159,5 @@ class FetchRequestBuilder() {
this this
} }
def build() = { def build() = FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, requestMap.toMap)
val offsetDetails = requestMap.map{ topicData =>
new OffsetDetail(topicData._1, topicData._2._1.toArray, topicData._2._2.toArray, topicData._2._3.toArray)
}
new FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, offsetDetails.toArray[OffsetDetail])
}
} }

View File

@ -19,27 +19,35 @@ package kafka.api
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.nio.channels.GatheringByteChannel import java.nio.channels.GatheringByteChannel
import kafka.common.ErrorMapping import kafka.common.{TopicAndPartition, ErrorMapping}
import kafka.message.{MessageSet, ByteBufferMessageSet} import kafka.message.{MessageSet, ByteBufferMessageSet}
import kafka.network.{MultiSend, Send} import kafka.network.{MultiSend, Send}
import kafka.utils.Utils import kafka.utils.Utils
object PartitionData { object PartitionData {
def readFrom(buffer: ByteBuffer): PartitionData = { def readFrom(buffer: ByteBuffer): PartitionData = {
val error = buffer.getShort
val partition = buffer.getInt val partition = buffer.getInt
val error = buffer.getShort
val initialOffset = buffer.getLong val initialOffset = buffer.getLong
val hw = buffer.getLong() val hw = buffer.getLong
val messageSetSize = buffer.getInt val messageSetSize = buffer.getInt
val messageSetBuffer = buffer.slice() val messageSetBuffer = buffer.slice()
messageSetBuffer.limit(messageSetSize) messageSetBuffer.limit(messageSetSize)
buffer.position(buffer.position + messageSetSize) buffer.position(buffer.position + messageSetSize)
new PartitionData(partition, error, initialOffset, hw, new ByteBufferMessageSet(messageSetBuffer, initialOffset)) new PartitionData(partition, error, initialOffset, hw, new ByteBufferMessageSet(messageSetBuffer, initialOffset))
} }
val headerSize =
4 + /* partition */
2 + /* error code */
8 + /* initialOffset */
8 + /* high watermark */
4 /* messageSetSize */
} }
case class PartitionData(partition: Int, error: Short = ErrorMapping.NoError, initialOffset:Long = 0L, hw: Long = -1L, messages: MessageSet) { case class PartitionData(partition: Int, error: Short = ErrorMapping.NoError, initialOffset:Long = 0L, hw: Long = -1L, messages: MessageSet) {
val sizeInBytes = 4 + 2 + 8 + 4 + messages.sizeInBytes.intValue() + 8
val sizeInBytes = PartitionData.headerSize + messages.sizeInBytes.intValue()
def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, -1L, messages) def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, -1L, messages)
} }
@ -50,17 +58,17 @@ class PartitionDataSend(val partitionData: PartitionData) extends Send {
private val messageSize = partitionData.messages.sizeInBytes private val messageSize = partitionData.messages.sizeInBytes
private var messagesSentSize = 0L private var messagesSentSize = 0L
private val buffer = ByteBuffer.allocate(26) private val buffer = ByteBuffer.allocate(PartitionData.headerSize)
buffer.putShort(partitionData.error)
buffer.putInt(partitionData.partition) buffer.putInt(partitionData.partition)
buffer.putShort(partitionData.error)
buffer.putLong(partitionData.initialOffset) buffer.putLong(partitionData.initialOffset)
buffer.putLong(partitionData.hw) buffer.putLong(partitionData.hw)
buffer.putInt(partitionData.messages.sizeInBytes.intValue()) buffer.putInt(partitionData.messages.sizeInBytes.intValue())
buffer.rewind() buffer.rewind()
def complete = !buffer.hasRemaining && messagesSentSize >= messageSize override def complete = !buffer.hasRemaining && messagesSentSize >= messageSize
def writeTo(channel: GatheringByteChannel): Int = { override def writeTo(channel: GatheringByteChannel): Int = {
var written = 0 var written = 0
if(buffer.hasRemaining) if(buffer.hasRemaining)
written += channel.write(buffer) written += channel.write(buffer)
@ -75,63 +83,43 @@ class PartitionDataSend(val partitionData: PartitionData) extends Send {
object TopicData { object TopicData {
def readFrom(buffer: ByteBuffer): TopicData = { def readFrom(buffer: ByteBuffer): TopicData = {
val topic = Utils.readShortString(buffer, "UTF-8") val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
val partitionCount = buffer.getInt val partitionCount = buffer.getInt
val partitions = new Array[PartitionData](partitionCount) val topicPartitionDataPairs = (1 to partitionCount).map(_ => {
for(i <- 0 until partitionCount) val partitionData = PartitionData.readFrom(buffer)
partitions(i) = PartitionData.readFrom(buffer) (TopicAndPartition(topic, partitionData.partition), partitionData)
new TopicData(topic, partitions.sortBy(_.partition)) })
TopicData(topic, Map(topicPartitionDataPairs:_*))
} }
def findPartition(data: Array[PartitionData], partition: Int): Option[PartitionData] = { def headerSize(topic: String) =
if(data == null || data.size == 0) Utils.shortStringLength(topic, RequestOrResponse.DefaultCharset) +
return None 4 /* partition count */
var (low, high) = (0, data.size-1)
while(low <= high) {
val mid = (low + high) / 2
val found = data(mid)
if(found.partition == partition)
return Some(found)
else if(partition < found.partition)
high = mid - 1
else
low = mid + 1
}
None
}
} }
case class TopicData(topic: String, partitionDataArray: Array[PartitionData]) { case class TopicData(topic: String, partitionData: Map[TopicAndPartition, PartitionData]) {
val sizeInBytes = 2 + topic.length + partitionDataArray.foldLeft(4)(_ + _.sizeInBytes) val sizeInBytes =
TopicData.headerSize(topic) + partitionData.values.foldLeft(0)(_ + _.sizeInBytes)
// need to override equals due to brokern java-arrays equals functionality val headerSize = TopicData.headerSize(topic)
override def equals(other: Any): Boolean = {
other match {
case that: TopicData =>
( topic == that.topic &&
partitionDataArray.toSeq == that.partitionDataArray.toSeq )
case _ => false
}
}
} }
class TopicDataSend(val topicData: TopicData) extends Send { class TopicDataSend(val topicData: TopicData) extends Send {
val size = topicData.sizeInBytes private val size = topicData.sizeInBytes
var sent = 0 private var sent = 0
private val buffer = ByteBuffer.allocate(2 + topicData.topic.length() + 4) override def complete = sent >= size
Utils.writeShortString(buffer, topicData.topic, "UTF-8")
buffer.putInt(topicData.partitionDataArray.length) private val buffer = ByteBuffer.allocate(topicData.headerSize)
Utils.writeShortString(buffer, topicData.topic, RequestOrResponse.DefaultCharset)
buffer.putInt(topicData.partitionData.size)
buffer.rewind() buffer.rewind()
val sends = new MultiSend(topicData.partitionDataArray.map(new PartitionDataSend(_)).toList) { val sends = new MultiSend(topicData.partitionData.toList.map(d => new PartitionDataSend(d._2))) {
val expectedBytesToWrite = topicData.partitionDataArray.foldLeft(0)(_ + _.sizeInBytes) val expectedBytesToWrite = topicData.sizeInBytes - topicData.headerSize
} }
def complete = sent >= size
def writeTo(channel: GatheringByteChannel): Int = { def writeTo(channel: GatheringByteChannel): Int = {
expectIncomplete() expectIncomplete()
var written = 0 var written = 0
@ -146,69 +134,88 @@ class TopicDataSend(val topicData: TopicData) extends Send {
} }
object FetchResponse { object FetchResponse {
val headerSize =
2 + /* versionId */
4 + /* correlationId */
4 /* topic count */
def readFrom(buffer: ByteBuffer): FetchResponse = { def readFrom(buffer: ByteBuffer): FetchResponse = {
val versionId = buffer.getShort val versionId = buffer.getShort
val errorCode = buffer.getShort
val correlationId = buffer.getInt val correlationId = buffer.getInt
val dataCount = buffer.getInt val topicCount = buffer.getInt
val data = new Array[TopicData](dataCount) val pairs = (1 to topicCount).flatMap(_ => {
for(i <- 0 until data.length) val topicData = TopicData.readFrom(buffer)
data(i) = TopicData.readFrom(buffer) topicData.partitionData.values.map(
new FetchResponse(versionId, correlationId, data, errorCode) partitionData => (TopicAndPartition(topicData.topic, partitionData.partition), partitionData)
)
})
FetchResponse(versionId, correlationId, Map(pairs:_*))
} }
} }
case class FetchResponse(versionId: Short, case class FetchResponse(versionId: Short,
correlationId: Int, correlationId: Int,
data: Array[TopicData], data: Map[TopicAndPartition, PartitionData]) {
errorCode: Short = ErrorMapping.NoError) {
val sizeInBytes = 2 + 4 + 2 + data.foldLeft(4)(_ + _.sizeInBytes) /**
* Partitions the data into a map of maps (one for each topic).
*/
lazy val dataGroupedByTopic = data.groupBy(_._1.topic)
lazy val topicMap = data.groupBy(_.topic).mapValues(_.head) val sizeInBytes =
FetchResponse.headerSize +
dataGroupedByTopic.foldLeft(0) ((folded, curr) => {
val topicData = TopicData(curr._1, curr._2)
folded +
topicData.sizeInBytes
})
def messageSet(topic: String, partition: Int): ByteBufferMessageSet = { private def partitionDataFor(topic: String, partition: Int): PartitionData = {
val messageSet = topicMap.get(topic) match { val topicAndPartition = TopicAndPartition(topic, partition)
case Some(topicData) => data.get(topicAndPartition) match {
TopicData.findPartition(topicData.partitionDataArray, partition).map(_.messages).getOrElse(MessageSet.Empty) case Some(partitionData) => partitionData
case None => case _ =>
MessageSet.Empty throw new IllegalArgumentException(
} "No partition %s in fetch response %s".format(topicAndPartition, this.toString))
messageSet.asInstanceOf[ByteBufferMessageSet]
}
def highWatermark(topic: String, partition: Int): Long = {
topicMap.get(topic) match {
case Some(topicData) =>
TopicData.findPartition(topicData.partitionDataArray, partition).map(_.hw).getOrElse(-1L)
case None => -1L
} }
} }
def messageSet(topic: String, partition: Int): ByteBufferMessageSet =
partitionDataFor(topic, partition).messages.asInstanceOf[ByteBufferMessageSet]
def highWatermark(topic: String, partition: Int) = partitionDataFor(topic, partition).hw
def hasError = data.values.exists(_.error != ErrorMapping.NoError)
def errorCode(topic: String, partition: Int) = partitionDataFor(topic, partition).error
} }
class FetchResponseSend(val fetchResponse: FetchResponse) extends Send { class FetchResponseSend(val fetchResponse: FetchResponse) extends Send {
private val size = fetchResponse.sizeInBytes private val size = fetchResponse.sizeInBytes
private var sent = 0 private var sent = 0
private val buffer = ByteBuffer.allocate(16) private val sendSize = 4 /* for size */ + size
override def complete = sent >= sendSize
private val buffer = ByteBuffer.allocate(4 /* for size */ + FetchResponse.headerSize)
buffer.putInt(size) buffer.putInt(size)
buffer.putShort(fetchResponse.versionId) buffer.putShort(fetchResponse.versionId)
buffer.putShort(fetchResponse.errorCode)
buffer.putInt(fetchResponse.correlationId) buffer.putInt(fetchResponse.correlationId)
buffer.putInt(fetchResponse.data.length) buffer.putInt(fetchResponse.dataGroupedByTopic.size) // topic count
buffer.rewind() buffer.rewind()
val sends = new MultiSend(fetchResponse.data.map(new TopicDataSend(_)).toList) { val sends = new MultiSend(fetchResponse.dataGroupedByTopic.toList.map {
val expectedBytesToWrite = fetchResponse.data.foldLeft(0)(_ + _.sizeInBytes) case(topic, data) => new TopicDataSend(TopicData(topic, data))
}) {
val expectedBytesToWrite = fetchResponse.sizeInBytes - FetchResponse.headerSize
} }
def complete = sent >= sendSize
def writeTo(channel: GatheringByteChannel):Int = { def writeTo(channel: GatheringByteChannel):Int = {
expectIncomplete() expectIncomplete()
var written = 0 var written = 0
@ -220,6 +227,5 @@ class FetchResponseSend(val fetchResponse: FetchResponse) extends Send {
sent += written sent += written
written written
} }
def sendSize = 4 + fetchResponse.sizeInBytes
} }

View File

@ -20,6 +20,8 @@ package kafka.api
import java.nio._ import java.nio._
import kafka.message._ import kafka.message._
import kafka.utils._ import kafka.utils._
import scala.collection.Map
import kafka.common.TopicAndPartition
object ProducerRequest { object ProducerRequest {
@ -28,88 +30,95 @@ object ProducerRequest {
def readFrom(buffer: ByteBuffer): ProducerRequest = { def readFrom(buffer: ByteBuffer): ProducerRequest = {
val versionId: Short = buffer.getShort val versionId: Short = buffer.getShort
val correlationId: Int = buffer.getInt val correlationId: Int = buffer.getInt
val clientId: String = Utils.readShortString(buffer, "UTF-8") val clientId: String = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
val requiredAcks: Short = buffer.getShort val requiredAcks: Short = buffer.getShort
val ackTimeoutMs: Int = buffer.getInt val ackTimeoutMs: Int = buffer.getInt
//build the topic structure //build the topic structure
val topicCount = buffer.getInt val topicCount = buffer.getInt
val data = new Array[TopicData](topicCount) val partitionDataPairs = (1 to topicCount).flatMap(_ => {
for(i <- 0 until topicCount) { // process topic
val topic = Utils.readShortString(buffer, "UTF-8") val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
val partitionCount = buffer.getInt val partitionCount = buffer.getInt
//build the partition structure within this topic (1 to partitionCount).map(_ => {
val partitionData = new Array[PartitionData](partitionCount)
for (j <- 0 until partitionCount) {
val partition = buffer.getInt val partition = buffer.getInt
val messageSetSize = buffer.getInt val messageSetSize = buffer.getInt
val messageSetBuffer = new Array[Byte](messageSetSize) val messageSetBuffer = new Array[Byte](messageSetSize)
buffer.get(messageSetBuffer,0,messageSetSize) buffer.get(messageSetBuffer,0,messageSetSize)
partitionData(j) = new PartitionData(partition,new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer))) (TopicAndPartition(topic, partition),
} new PartitionData(partition,new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer))))
data(i) = new TopicData(topic,partitionData) })
} })
new ProducerRequest(versionId, correlationId, clientId, requiredAcks, ackTimeoutMs, data)
ProducerRequest(versionId, correlationId, clientId, requiredAcks, ackTimeoutMs, Map(partitionDataPairs:_*))
} }
} }
case class ProducerRequest( versionId: Short, case class ProducerRequest( versionId: Short = ProducerRequest.CurrentVersion,
correlationId: Int, correlationId: Int,
clientId: String, clientId: String,
requiredAcks: Short, requiredAcks: Short,
ackTimeoutMs: Int, ackTimeoutMs: Int,
data: Array[TopicData] ) extends RequestOrResponse(Some(RequestKeys.ProduceKey)) { data: Map[TopicAndPartition, PartitionData])
extends RequestOrResponse(Some(RequestKeys.ProduceKey)) {
def this(correlationId: Int, clientId: String, requiredAcks: Short, ackTimeoutMs: Int, data: Array[TopicData]) = /**
* Partitions the data into a map of maps (one for each topic).
*/
private lazy val dataGroupedByTopic = data.groupBy(_._1.topic)
def this(correlationId: Int,
clientId: String,
requiredAcks: Short,
ackTimeoutMs: Int,
data: Map[TopicAndPartition, PartitionData]) =
this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeoutMs, data) this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeoutMs, data)
def writeTo(buffer: ByteBuffer) { def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId) buffer.putShort(versionId)
buffer.putInt(correlationId) buffer.putInt(correlationId)
Utils.writeShortString(buffer, clientId, "UTF-8") Utils.writeShortString(buffer, clientId, RequestOrResponse.DefaultCharset)
buffer.putShort(requiredAcks) buffer.putShort(requiredAcks)
buffer.putInt(ackTimeoutMs) buffer.putInt(ackTimeoutMs)
//save the topic structure //save the topic structure
buffer.putInt(data.size) //the number of topics buffer.putInt(dataGroupedByTopic.size) //the number of topics
for(topicData <- data) { dataGroupedByTopic.foreach {
Utils.writeShortString(buffer, topicData.topic, "UTF-8") //write the topic case (topic, topicAndPartitionData) =>
buffer.putInt(topicData.partitionDataArray.size) //the number of partitions Utils.writeShortString(buffer, topic, RequestOrResponse.DefaultCharset) //write the topic
for(partitionData <- topicData.partitionDataArray) { buffer.putInt(topicAndPartitionData.size) //the number of partitions
topicAndPartitionData.foreach(partitionAndData => {
val partitionData = partitionAndData._2
buffer.putInt(partitionData.partition) buffer.putInt(partitionData.partition)
buffer.putInt(partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer.limit) buffer.putInt(partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer.limit)
buffer.put(partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer) buffer.put(partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer)
partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer.rewind partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer.rewind
} })
}
}
def sizeInBytes(): Int = {
var size = 0
//size, request_type_id, version_id, correlation_id, client_id, required_acks, ack_timeout, data.size
size = 2 + 4 + 2 + clientId.length + 2 + 4 + 4
for(topicData <- data) {
size += 2 + topicData.topic.length + 4
for(partitionData <- topicData.partitionDataArray) {
size += 4 + 4 + partitionData.messages.sizeInBytes.asInstanceOf[Int]
}
}
size
}
// need to override case-class equals due to broken java-array equals()
override def equals(other: Any): Boolean = {
other match {
case that: ProducerRequest =>
( correlationId == that.correlationId &&
clientId == that.clientId &&
requiredAcks == that.requiredAcks &&
ackTimeoutMs == that.ackTimeoutMs &&
data.toSeq == that.data.toSeq )
case _ => false
} }
} }
def topicPartitionCount = data.foldLeft(0)(_ + _.partitionDataArray.length) def sizeInBytes: Int = {
2 + /* versionId */
4 + /* correlationId */
Utils.shortStringLength(clientId, RequestOrResponse.DefaultCharset) + /* client id */
2 + /* requiredAcks */
4 + /* ackTimeoutMs */
4 + /* number of topics */
dataGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => {
foldedTopics +
Utils.shortStringLength(currTopic._1, RequestOrResponse.DefaultCharset) +
4 + /* the number of partitions */
{
currTopic._2.foldLeft(0)((foldedPartitions, currPartition) => {
foldedPartitions +
4 + /* partition id */
4 + /* byte-length of serialized messages */
currPartition._2.messages.sizeInBytes.toInt
})
}
})
}
def numPartitions = data.size
} }

View File

@ -18,57 +18,81 @@
package kafka.api package kafka.api
import java.nio.ByteBuffer import java.nio.ByteBuffer
import kafka.common.ErrorMapping import kafka.utils.Utils
import scala.collection.Map
import kafka.common.{TopicAndPartition, ErrorMapping}
object ProducerResponse { object ProducerResponse {
def readFrom(buffer: ByteBuffer): ProducerResponse = { def readFrom(buffer: ByteBuffer): ProducerResponse = {
val versionId = buffer.getShort val versionId = buffer.getShort
val correlationId = buffer.getInt val correlationId = buffer.getInt
val errorCode = buffer.getShort val topicCount = buffer.getInt
val errorsSize = buffer.getInt val statusPairs = (1 to topicCount).flatMap(_ => {
val errors = new Array[Short](errorsSize) val topic = Utils.readShortString(buffer, RequestOrResponse.DefaultCharset)
for( i <- 0 until errorsSize) { val partitionCount = buffer.getInt
errors(i) = buffer.getShort (1 to partitionCount).map(_ => {
} val partition = buffer.getInt
val offsetsSize = buffer.getInt val error = buffer.getShort
val offsets = new Array[Long](offsetsSize) val offset = buffer.getLong
for( i <- 0 until offsetsSize) { (TopicAndPartition(topic, partition), ProducerResponseStatus(error, offset))
offsets(i) = buffer.getLong })
} })
new ProducerResponse(versionId, correlationId, errors, offsets, errorCode)
ProducerResponse(versionId, correlationId, Map(statusPairs:_*))
} }
} }
case class ProducerResponse(versionId: Short, correlationId: Int, errors: Array[Short], case class ProducerResponseStatus(error: Short, nextOffset: Long)
offsets: Array[Long], errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{
val sizeInBytes = 2 + 2 + 4 + (4 + 2 * errors.length) + (4 + 8 * offsets.length)
case class ProducerResponse(versionId: Short,
correlationId: Int,
status: Map[TopicAndPartition, ProducerResponseStatus]) extends RequestOrResponse {
/**
* Partitions the status map into a map of maps (one for each topic).
*/
private lazy val statusGroupedByTopic = status.groupBy(_._1.topic)
def hasError = status.values.exists(_.error != ErrorMapping.NoError)
val sizeInBytes = {
val groupedStatus = statusGroupedByTopic
2 + /* version id */
4 + /* correlation id */
4 + /* topic count */
groupedStatus.foldLeft (0) ((foldedTopics, currTopic) => {
foldedTopics +
Utils.shortStringLength(currTopic._1, RequestOrResponse.DefaultCharset) +
4 + /* partition count for this topic */
currTopic._2.foldLeft (0) ((foldedPartitions, currPartition) => {
foldedPartitions +
4 + /* partition id */
2 + /* error code */
8 /* offset */
})
})
}
def writeTo(buffer: ByteBuffer) { def writeTo(buffer: ByteBuffer) {
/* version id */ val groupedStatus = statusGroupedByTopic
buffer.putShort(versionId) buffer.putShort(versionId)
/* correlation id */
buffer.putInt(correlationId) buffer.putInt(correlationId)
/* error code */ buffer.putInt(groupedStatus.size) // topic count
buffer.putShort(errorCode)
/* errors */ groupedStatus.foreach(topicStatus => {
buffer.putInt(errors.length) val (topic, errorsAndOffsets) = topicStatus
errors.foreach(buffer.putShort(_)) Utils.writeShortString(buffer, topic, RequestOrResponse.DefaultCharset)
/* offsets */ buffer.putInt(errorsAndOffsets.size) // partition count
buffer.putInt(offsets.length) errorsAndOffsets.foreach {
offsets.foreach(buffer.putLong(_)) case((TopicAndPartition(_, partition), ProducerResponseStatus(error, nextOffset))) =>
buffer.putInt(partition)
buffer.putShort(error)
buffer.putLong(nextOffset)
}
})
}
} }
// need to override case-class equals due to broken java-array equals()
override def equals(other: Any): Boolean = {
other match {
case that: ProducerResponse =>
( correlationId == that.correlationId &&
versionId == that.versionId &&
errorCode == that.errorCode &&
errors.toSeq == that.errors.toSeq &&
offsets.toSeq == that.offsets.toSeq)
case _ => false
}
}
}

View File

@ -19,6 +19,12 @@ package kafka.api
import java.nio._ import java.nio._
object RequestOrResponse {
val DefaultCharset = "UTF-8"
}
private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None) { private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None) {
def sizeInBytes: Int def sizeInBytes: Int

View File

@ -34,13 +34,12 @@ object ErrorMapping {
val InvalidMessageCode : Short = 2 val InvalidMessageCode : Short = 2
val UnknownTopicOrPartitionCode : Short = 3 val UnknownTopicOrPartitionCode : Short = 3
val InvalidFetchSizeCode : Short = 4 val InvalidFetchSizeCode : Short = 4
val InvalidFetchRequestFormatCode : Short = 5 val LeaderNotAvailableCode : Short = 5
val LeaderNotAvailableCode : Short = 6 val NotLeaderForPartitionCode : Short = 6
val NotLeaderForPartitionCode : Short = 7 val RequestTimedOutCode: Short = 7
val RequestTimedOutCode: Short = 8 val BrokerNotAvailableCode: Short = 8
val BrokerNotAvailableCode: Short = 9 val ReplicaNotAvailableCode: Short = 9
val ReplicaNotAvailableCode: Short = 10 val MessageSizeTooLargeCode: Short = 10
val MessageSizeTooLargeCode: Short = 11
private val exceptionToCode = private val exceptionToCode =
Map[Class[Throwable], Short]( Map[Class[Throwable], Short](
@ -48,7 +47,6 @@ object ErrorMapping {
classOf[InvalidMessageException].asInstanceOf[Class[Throwable]] -> InvalidMessageCode, classOf[InvalidMessageException].asInstanceOf[Class[Throwable]] -> InvalidMessageCode,
classOf[UnknownTopicOrPartitionException].asInstanceOf[Class[Throwable]] -> UnknownTopicOrPartitionCode, classOf[UnknownTopicOrPartitionException].asInstanceOf[Class[Throwable]] -> UnknownTopicOrPartitionCode,
classOf[InvalidMessageSizeException].asInstanceOf[Class[Throwable]] -> InvalidFetchSizeCode, classOf[InvalidMessageSizeException].asInstanceOf[Class[Throwable]] -> InvalidFetchSizeCode,
classOf[FetchRequestFormatException].asInstanceOf[Class[Throwable]] -> InvalidFetchRequestFormatCode,
classOf[NotLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NotLeaderForPartitionCode, classOf[NotLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NotLeaderForPartitionCode,
classOf[LeaderNotAvailableException].asInstanceOf[Class[Throwable]] -> LeaderNotAvailableCode, classOf[LeaderNotAvailableException].asInstanceOf[Class[Throwable]] -> LeaderNotAvailableCode,
classOf[RequestTimedOutException].asInstanceOf[Class[Throwable]] -> RequestTimedOutCode, classOf[RequestTimedOutException].asInstanceOf[Class[Throwable]] -> RequestTimedOutCode,

View File

@ -1,3 +1,5 @@
package kafka.common
/** /**
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with * contributor license agreements. See the NOTICE file distributed with
@ -14,8 +16,14 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package kafka.common
class FetchRequestFormatException(val message: String) extends RuntimeException(message) { /**
def this() = this(null) * Convenience case class since (topic, partition) pairs are ubiquitous.
*/
case class TopicAndPartition(topic: String, partition: Int) {
def this(tuple: (String, Int)) = this(tuple._1, tuple._2)
def asTuple = (topic, partition)
} }

View File

@ -25,6 +25,7 @@ import scala.collection.mutable
import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.locks.ReentrantLock
import kafka.utils.ZkUtils._ import kafka.utils.ZkUtils._
import kafka.utils.{ShutdownableThread, SystemTime} import kafka.utils.{ShutdownableThread, SystemTime}
import kafka.common.TopicAndPartition
/** /**
@ -38,7 +39,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds), 1) { extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds), 1) {
private var partitionMap: immutable.Map[(String, Int), PartitionTopicInfo] = null private var partitionMap: immutable.Map[(String, Int), PartitionTopicInfo] = null
private var cluster: Cluster = null private var cluster: Cluster = null
private val noLeaderPartitionSet = new mutable.HashSet[(String, Int)] private val noLeaderPartitionSet = new mutable.HashSet[TopicAndPartition]
private val lock = new ReentrantLock private val lock = new ReentrantLock
private val cond = lock.newCondition() private val cond = lock.newCondition()
private val leaderFinderThread = new ShutdownableThread(consumerIdString + "-leader-finder-thread"){ private val leaderFinderThread = new ShutdownableThread(consumerIdString + "-leader-finder-thread"){
@ -48,7 +49,8 @@ class ConsumerFetcherManager(private val consumerIdString: String,
try { try {
if (noLeaderPartitionSet.isEmpty) if (noLeaderPartitionSet.isEmpty)
cond.await() cond.await()
for ((topic, partitionId) <- noLeaderPartitionSet) { noLeaderPartitionSet.foreach {
case(TopicAndPartition(topic, partitionId)) =>
// find the leader for this partition // find the leader for this partition
getLeaderForPartition(zkClient, topic, partitionId) match { getLeaderForPartition(zkClient, topic, partitionId) match {
case Some(leaderId) => case Some(leaderId) =>
@ -56,7 +58,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
case Some(broker) => case Some(broker) =>
val pti = partitionMap((topic, partitionId)) val pti = partitionMap((topic, partitionId))
addFetcher(topic, partitionId, pti.getFetchOffset(), broker) addFetcher(topic, partitionId, pti.getFetchOffset(), broker)
noLeaderPartitionSet.remove((topic, partitionId)) noLeaderPartitionSet.remove(TopicAndPartition(topic, partitionId))
case None => case None =>
error("Broker %d is unavailable, fetcher for topic %s partition %d could not be started" error("Broker %d is unavailable, fetcher for topic %s partition %d could not be started"
.format(leaderId, topic, partitionId)) .format(leaderId, topic, partitionId))
@ -84,7 +86,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
try { try {
partitionMap = topicInfos.map(tpi => ((tpi.topic, tpi.partitionId), tpi)).toMap partitionMap = topicInfos.map(tpi => ((tpi.topic, tpi.partitionId), tpi)).toMap
this.cluster = cluster this.cluster = cluster
noLeaderPartitionSet ++= topicInfos.map(tpi => (tpi.topic, tpi.partitionId)) noLeaderPartitionSet ++= topicInfos.map(tpi => TopicAndPartition(tpi.topic, tpi.partitionId))
cond.signalAll() cond.signalAll()
} finally { } finally {
lock.unlock() lock.unlock()
@ -117,7 +119,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
pti pti
} }
def addPartitionsWithError(partitionList: Iterable[(String, Int)]) { def addPartitionsWithError(partitionList: Iterable[TopicAndPartition]) {
debug("adding partitions with error %s".format(partitionList)) debug("adding partitions with error %s".format(partitionList))
lock.lock() lock.lock()
try { try {

View File

@ -21,6 +21,8 @@ import kafka.cluster.Broker
import kafka.server.AbstractFetcherThread import kafka.server.AbstractFetcherThread
import kafka.message.ByteBufferMessageSet import kafka.message.ByteBufferMessageSet
import kafka.api.{FetchRequest, OffsetRequest, PartitionData} import kafka.api.{FetchRequest, OffsetRequest, PartitionData}
import kafka.common.TopicAndPartition
class ConsumerFetcherThread(name: String, class ConsumerFetcherThread(name: String,
val config: ConsumerConfig, val config: ConsumerConfig,
@ -57,7 +59,7 @@ class ConsumerFetcherThread(name: String,
} }
// any logic for partitions whose leader has changed // any logic for partitions whose leader has changed
def handlePartitionsWithErrors(partitions: Iterable[(String, Int)]) { def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) {
consumerFetcherManager.addPartitionsWithError(partitions) consumerFetcherManager.addPartitionsWithError(partitions)
} }
} }

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.javaapi
import scala.collection.JavaConversions
import kafka.api.PartitionFetchInfo
import java.nio.ByteBuffer
import kafka.common.TopicAndPartition
class FetchRequest(correlationId: Int,
clientId: String,
replicaId: Int,
maxWait: Int,
minBytes: Int,
requestInfo: java.util.Map[TopicAndPartition, PartitionFetchInfo]) {
val underlying = {
val scalaMap = JavaConversions.asMap(requestInfo).toMap
kafka.api.FetchRequest(
correlationId = correlationId,
clientId = clientId,
replicaId = replicaId,
maxWait = maxWait,
minBytes = minBytes,
requestInfo = scalaMap
)
}
def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) }
def sizeInBytes = underlying.sizeInBytes
override def toString = underlying.toString
override def equals(other: Any) = canEqual(other) && {
val otherFetchRequest = other.asInstanceOf[kafka.javaapi.FetchRequest]
this.underlying.equals(otherFetchRequest.underlying)
}
def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.FetchRequest]
override def hashCode = underlying.hashCode
}

View File

@ -17,17 +17,33 @@
package kafka.javaapi package kafka.javaapi
import kafka.api.TopicData import kafka.api.PartitionData
import kafka.common.TopicAndPartition
class FetchResponse( val versionId: Short, class FetchResponse( val versionId: Short,
val correlationId: Int, val correlationId: Int,
private val data: Array[TopicData] ) { private val data: Map[TopicAndPartition, PartitionData] ) {
private val underlying = new kafka.api.FetchResponse(versionId, correlationId, data) private val underlying = kafka.api.FetchResponse(versionId, correlationId, data)
def messageSet(topic: String, partition: Int): kafka.javaapi.message.ByteBufferMessageSet = { def messageSet(topic: String, partition: Int): kafka.javaapi.message.ByteBufferMessageSet = {
import Implicits._ import Implicits._
underlying.messageSet(topic, partition) underlying.messageSet(topic, partition)
} }
def highWatermark(topic: String, partition: Int) = underlying.highWatermark(topic, partition)
def hasError = underlying.hasError
def errorCode(topic: String, partition: Int) = underlying.errorCode(topic, partition)
override def equals(other: Any) = canEqual(other) && {
val otherFetchResponse = other.asInstanceOf[kafka.javaapi.FetchResponse]
this.underlying.equals(otherFetchResponse.underlying)
}
def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.FetchResponse]
override def hashCode = underlying.hashCode
} }

View File

@ -1,44 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.javaapi
import kafka.api.RequestOrResponse
import kafka.api.{RequestKeys, TopicData}
import java.nio.ByteBuffer
class ProducerRequest(val correlationId: Int,
val clientId: String,
val requiredAcks: Short,
val ackTimeoutMs: Int,
val data: Array[TopicData]) extends RequestOrResponse(Some(RequestKeys.ProduceKey)) {
val underlying = new kafka.api.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeoutMs, data)
def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) }
def sizeInBytes(): Int = underlying.sizeInBytes
override def toString: String =
underlying.toString
override def equals(other: Any): Boolean = underlying.equals(other)
def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest]
override def hashCode: Int = underlying.hashCode
}

View File

@ -17,9 +17,8 @@
package kafka.javaapi.consumer package kafka.javaapi.consumer
import kafka.api.FetchRequest
import kafka.javaapi.FetchResponse
import kafka.utils.threadsafe import kafka.utils.threadsafe
import kafka.javaapi.FetchResponse
/** /**
* A consumer of kafka messages * A consumer of kafka messages
@ -31,15 +30,28 @@ class SimpleConsumer(val host: String,
val bufferSize: Int) { val bufferSize: Int) {
val underlying = new kafka.consumer.SimpleConsumer(host, port, soTimeout, bufferSize) val underlying = new kafka.consumer.SimpleConsumer(host, port, soTimeout, bufferSize)
/**
* Fetch a set of messages from a topic. This version of the fetch method
* takes the Scala version of a fetch request (i.e.,
* [[kafka.api.FetchRequest]] and is intended for use with the
* [[kafka.api.FetchRequestBuilder]].
*
* @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
* @return a set of fetched messages
*/
def fetch(request: kafka.api.FetchRequest): FetchResponse = {
import kafka.javaapi.Implicits._
underlying.fetch(request)
}
/** /**
* Fetch a set of messages from a topic. * Fetch a set of messages from a topic.
* *
* @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched. * @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
* @return a set of fetched messages * @return a set of fetched messages
*/ */
def fetch(request: FetchRequest): FetchResponse = { def fetch(request: kafka.javaapi.FetchRequest): FetchResponse = {
import kafka.javaapi.Implicits._ fetch(request.underlying)
underlying.fetch(request)
} }
/** /**

View File

@ -1,43 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.javaapi.producer
import kafka.producer.SyncProducerConfig
import kafka.javaapi.message.ByteBufferMessageSet
import kafka.api.{ProducerResponse, PartitionData, TopicData}
class SyncProducer(syncProducer: kafka.producer.SyncProducer) {
def this(config: SyncProducerConfig) = this(new kafka.producer.SyncProducer(config))
val underlying = syncProducer
def send(producerRequest: kafka.javaapi.ProducerRequest): ProducerResponse = {
underlying.send(producerRequest.underlying)
}
def send(topic: String, messages: ByteBufferMessageSet): ProducerResponse = {
val partitionData = Array[PartitionData]( new PartitionData(-1, messages.underlying) )
val data = Array[TopicData]( new TopicData(topic, partitionData) )
val producerRequest = new kafka.api.ProducerRequest(-1, "", 0, 0, data)
underlying.send(producerRequest)
}
def close() {
underlying.close
}
}

View File

@ -23,12 +23,14 @@ import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge import com.yammer.metrics.core.Gauge
import java.nio.ByteBuffer import java.nio.ByteBuffer
import kafka.api._ import kafka.api._
import kafka.common.TopicAndPartition
object RequestChannel { object RequestChannel {
val AllDone = new Request(1, 2, getShutdownReceive(), 0) val AllDone = new Request(1, 2, getShutdownReceive(), 0)
def getShutdownReceive() = { def getShutdownReceive() = {
val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, Array[TopicData]()) val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, Map[TopicAndPartition, PartitionData]())
val byteBuffer = ByteBuffer.allocate(emptyProducerRequest.sizeInBytes + 2) val byteBuffer = ByteBuffer.allocate(emptyProducerRequest.sizeInBytes + 2)
byteBuffer.putShort(RequestKeys.ProduceKey) byteBuffer.putShort(RequestKeys.ProduceKey)
emptyProducerRequest.writeTo(byteBuffer) emptyProducerRequest.writeTo(byteBuffer)

View File

@ -18,13 +18,13 @@
package kafka.producer.async package kafka.producer.async
import kafka.common._ import kafka.common._
import kafka.message.{Message, NoCompressionCodec, ByteBufferMessageSet} import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
import kafka.producer._ import kafka.producer._
import kafka.serializer.Encoder import kafka.serializer.Encoder
import kafka.utils.{Utils, Logging} import kafka.utils.{Utils, Logging}
import scala.collection.Map import scala.collection.{Seq, Map}
import scala.collection.mutable.{ListBuffer, HashMap} import scala.collection.mutable.{ListBuffer, HashMap}
import kafka.api._ import kafka.api.{TopicMetadata, ProducerRequest, PartitionData}
class DefaultEventHandler[K,V](config: ProducerConfig, class DefaultEventHandler[K,V](config: ProducerConfig,
@ -81,12 +81,13 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
val messageSetPerBroker = groupMessagesToSet(eventsPerBrokerMap) val messageSetPerBroker = groupMessagesToSet(eventsPerBrokerMap)
val failedTopicPartitions = send(brokerid, messageSetPerBroker) val failedTopicPartitions = send(brokerid, messageSetPerBroker)
for( (topic, partition) <- failedTopicPartitions ) { failedTopicPartitions.foreach(topicPartition => {
eventsPerBrokerMap.get((topic, partition)) match { eventsPerBrokerMap.get(topicPartition) match {
case Some(data) => failedProduceRequests.appendAll(data) case Some(data) => failedProduceRequests.appendAll(data)
case None => // nothing case None => // nothing
} }
} })
} }
} catch { } catch {
case t: Throwable => error("Failed to send messages", t) case t: Throwable => error("Failed to send messages", t)
@ -112,7 +113,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
else { else {
// currently, if in async mode, we just log the serialization error. We need to revisit // currently, if in async mode, we just log the serialization error. We need to revisit
// this when doing kafka-496 // this when doing kafka-496
error("Error serializing message " + t) error("Error serializing message ", t)
} }
} }
} }
@ -122,8 +123,8 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
serializedProducerData serializedProducerData
} }
def partitionAndCollate(events: Seq[ProducerData[K,Message]]): Option[Map[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]]] = { def partitionAndCollate(events: Seq[ProducerData[K,Message]]): Option[Map[Int, Map[TopicAndPartition, Seq[ProducerData[K,Message]]]]] = {
val ret = new HashMap[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]] val ret = new HashMap[Int, Map[TopicAndPartition, Seq[ProducerData[K,Message]]]]
try { try {
for (event <- events) { for (event <- events) {
val topicPartitionsList = getPartitionListForTopic(event) val topicPartitionsList = getPartitionListForTopic(event)
@ -135,16 +136,16 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
// postpone the failure until the send operation, so that requests for other brokers are handled correctly // postpone the failure until the send operation, so that requests for other brokers are handled correctly
val leaderBrokerId = brokerPartition.leaderBrokerIdOpt.getOrElse(-1) val leaderBrokerId = brokerPartition.leaderBrokerIdOpt.getOrElse(-1)
var dataPerBroker: HashMap[(String, Int), Seq[ProducerData[K,Message]]] = null var dataPerBroker: HashMap[TopicAndPartition, Seq[ProducerData[K,Message]]] = null
ret.get(leaderBrokerId) match { ret.get(leaderBrokerId) match {
case Some(element) => case Some(element) =>
dataPerBroker = element.asInstanceOf[HashMap[(String, Int), Seq[ProducerData[K,Message]]]] dataPerBroker = element.asInstanceOf[HashMap[TopicAndPartition, Seq[ProducerData[K,Message]]]]
case None => case None =>
dataPerBroker = new HashMap[(String, Int), Seq[ProducerData[K,Message]]] dataPerBroker = new HashMap[TopicAndPartition, Seq[ProducerData[K,Message]]]
ret.put(leaderBrokerId, dataPerBroker) ret.put(leaderBrokerId, dataPerBroker)
} }
val topicAndPartition = (event.getTopic, brokerPartition.partitionId) val topicAndPartition = TopicAndPartition(event.getTopic, brokerPartition.partitionId)
var dataPerTopicPartition: ListBuffer[ProducerData[K,Message]] = null var dataPerTopicPartition: ListBuffer[ProducerData[K,Message]] = null
dataPerBroker.get(topicAndPartition) match { dataPerBroker.get(topicAndPartition) match {
case Some(element) => case Some(element) =>
@ -199,39 +200,30 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
* @param messagesPerTopic the messages as a map from (topic, partition) -> messages * @param messagesPerTopic the messages as a map from (topic, partition) -> messages
* @return the set (topic, partitions) messages which incurred an error sending or processing * @return the set (topic, partitions) messages which incurred an error sending or processing
*/ */
private def send(brokerId: Int, messagesPerTopic: Map[(String, Int), ByteBufferMessageSet]): Seq[(String, Int)] = { private def send(brokerId: Int, messagesPerTopic: Map[TopicAndPartition, ByteBufferMessageSet]) = {
if(brokerId < 0) { if(brokerId < 0) {
warn("failed to send to broker %d with data %s".format(brokerId, messagesPerTopic)) warn("Failed to send to broker %d with data %s".format(brokerId, messagesPerTopic))
messagesPerTopic.keys.toSeq messagesPerTopic.keys.toSeq
} else if(messagesPerTopic.size > 0) { } else if(messagesPerTopic.size > 0) {
val topics = new HashMap[String, ListBuffer[PartitionData]]() val topicPartitionDataPairs = messagesPerTopic.toSeq.map {
for( ((topicName, partitionId), messagesSet) <- messagesPerTopic ) { case (topicAndPartition, messages) =>
val partitionData = topics.getOrElseUpdate(topicName, new ListBuffer[PartitionData]()) (topicAndPartition, new PartitionData(topicAndPartition.partition, messages))
partitionData.append(new PartitionData(partitionId, messagesSet))
} }
val topicData = topics.map(kv => new TopicData(kv._1, kv._2.toArray)).toArray
val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks, val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks,
config.requestTimeoutMs, topicData) config.requestTimeoutMs, Map(topicPartitionDataPairs:_*))
try { try {
val syncProducer = producerPool.getProducer(brokerId) val syncProducer = producerPool.getProducer(brokerId)
val response = syncProducer.send(producerRequest) val response = syncProducer.send(producerRequest)
trace("producer sent messages for topics %s to broker %d on %s:%d" trace("Producer sent messages for topics %s to broker %d on %s:%d"
.format(messagesPerTopic, brokerId, syncProducer.config.host, syncProducer.config.port)) .format(messagesPerTopic, brokerId, syncProducer.config.host, syncProducer.config.port))
var msgIdx = -1 if (response.status.size != producerRequest.data.size)
val errors = new ListBuffer[(String, Int)] throw new KafkaException("Incomplete response (%s) for producer request (%s)"
for( topic <- topicData; partition <- topic.partitionDataArray ) { .format(response, producerRequest))
msgIdx += 1 response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
if (msgIdx >= response.errors.size || response.errors(msgIdx) != ErrorMapping.NoError) { .map(partitionStatus => partitionStatus._1)
errors.append((topic.topic, partition.partition))
if (msgIdx < response.errors.size)
warn("Received error " + ErrorMapping.exceptionFor(response.errors(msgIdx)) +
"from broker %d on %s:%d".format(brokerId, topic.topic, partition.partition))
}
}
errors
} catch { } catch {
case e => case t: Throwable =>
warn("failed to send to broker %d with data %s".format(brokerId, messagesPerTopic), e) warn("failed to send to broker %d with data %s".format(brokerId, messagesPerTopic), t)
messagesPerTopic.keys.toSeq messagesPerTopic.keys.toSeq
} }
} else { } else {
@ -239,7 +231,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
} }
} }
private def groupMessagesToSet(eventsPerTopicAndPartition: Map[(String,Int), Seq[ProducerData[K,Message]]]): Map[(String, Int), ByteBufferMessageSet] = { private def groupMessagesToSet(eventsPerTopicAndPartition: Map[TopicAndPartition, Seq[ProducerData[K,Message]]]) = {
/** enforce the compressed.topics config here. /** enforce the compressed.topics config here.
* If the compression codec is anything other than NoCompressionCodec, * If the compression codec is anything other than NoCompressionCodec,
* Enable compression only for specified topics if any * Enable compression only for specified topics if any
@ -255,25 +247,23 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
( topicAndPartition, ( topicAndPartition,
config.compressionCodec match { config.compressionCodec match {
case NoCompressionCodec => case NoCompressionCodec =>
trace("Sending %d messages with no compression to topic %s on partition %d" trace("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition))
.format(messages.size, topicAndPartition._1, topicAndPartition._2))
new ByteBufferMessageSet(NoCompressionCodec, messages: _*) new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
case _ => case _ =>
config.compressedTopics.size match { config.compressedTopics.size match {
case 0 => case 0 =>
trace("Sending %d messages with compression codec %d to topic %s on partition %d" trace("Sending %d messages with compression codec %d to %s"
.format(messages.size, config.compressionCodec.codec, topicAndPartition._1, topicAndPartition._2)) .format(messages.size, config.compressionCodec.codec, topicAndPartition))
new ByteBufferMessageSet(config.compressionCodec, messages: _*) new ByteBufferMessageSet(config.compressionCodec, messages: _*)
case _ => case _ =>
if(config.compressedTopics.contains(topicAndPartition._1)) { if(config.compressedTopics.contains(topicAndPartition.topic)) {
trace("Sending %d messages with compression codec %d to topic %s on partition %d" trace("Sending %d messages with compression codec %d to %s"
.format(messages.size, config.compressionCodec.codec, topicAndPartition._1, topicAndPartition._2)) .format(messages.size, config.compressionCodec.codec, topicAndPartition))
new ByteBufferMessageSet(config.compressionCodec, messages: _*) new ByteBufferMessageSet(config.compressionCodec, messages: _*)
} }
else { else {
trace("Sending %d messages to topic %s and partition %d with no compression as %s is not in compressed.topics - %s" trace("Sending %d messages to %s with no compression as it is not in compressed.topics - %s"
.format(messages.size, topicAndPartition._1, topicAndPartition._2, topicAndPartition._1, .format(messages.size, topicAndPartition, config.compressedTopics.toString))
config.compressedTopics.toString))
new ByteBufferMessageSet(NoCompressionCodec, messages: _*) new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
} }
} }

View File

@ -19,7 +19,7 @@ package kafka.server
import kafka.cluster.Broker import kafka.cluster.Broker
import kafka.consumer.SimpleConsumer import kafka.consumer.SimpleConsumer
import kafka.common.ErrorMapping import kafka.common.{TopicAndPartition, ErrorMapping}
import collection.mutable import collection.mutable
import kafka.message.ByteBufferMessageSet import kafka.message.ByteBufferMessageSet
import kafka.api.{FetchResponse, PartitionData, FetchRequestBuilder} import kafka.api.{FetchResponse, PartitionData, FetchRequestBuilder}
@ -36,7 +36,7 @@ import java.util.concurrent.TimeUnit
abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int, abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1) fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1)
extends ShutdownableThread(name) { extends ShutdownableThread(name) {
private val fetchMap = new mutable.HashMap[Tuple2[String,Int], Long] // a (topic, partitionId) -> offset map private val fetchMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset map
private val fetchMapLock = new Object private val fetchMapLock = new Object
val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize) val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize)
val fetcherMetrics = FetcherStat.getFetcherStat(name + "-" + sourceBroker.id) val fetcherMetrics = FetcherStat.getFetcherStat(name + "-" + sourceBroker.id)
@ -50,7 +50,7 @@ abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socket
def handleOffsetOutOfRange(topic: String, partitionId: Int): Long def handleOffsetOutOfRange(topic: String, partitionId: Int): Long
// deal with partitions with errors, potentially due to leadership changes // deal with partitions with errors, potentially due to leadership changes
def handlePartitionsWithErrors(partitions: Iterable[(String, Int)]) def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition])
override def shutdown(){ override def shutdown(){
super.shutdown() super.shutdown()
@ -65,12 +65,15 @@ abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socket
minBytes(minBytes) minBytes(minBytes)
fetchMapLock synchronized { fetchMapLock synchronized {
for ( ((topic, partitionId), offset) <- fetchMap ) fetchMap.foreach {
builder.addFetch(topic, partitionId, offset.longValue, fetchSize) case((topicAndPartition, offset)) =>
builder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
offset, fetchSize)
}
} }
val fetchRequest = builder.build() val fetchRequest = builder.build()
val partitionsWithError = new mutable.HashSet[(String, Int)] val partitionsWithError = new mutable.HashSet[TopicAndPartition]
var response: FetchResponse = null var response: FetchResponse = null
try { try {
trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest)) trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
@ -90,37 +93,35 @@ abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socket
if (response != null) { if (response != null) {
// process fetched data // process fetched data
fetchMapLock synchronized { fetchMapLock synchronized {
for ( topicData <- response.data ) { response.data.foreach {
for ( partitionData <- topicData.partitionDataArray) { case(topicAndPartition, partitionData) =>
val topic = topicData.topic val (topic, partitionId) = topicAndPartition.asTuple
val partitionId = partitionData.partition val currentOffset = fetchMap.get(topicAndPartition)
val key = (topic, partitionId)
val currentOffset = fetchMap.get(key)
if (currentOffset.isDefined) { if (currentOffset.isDefined) {
partitionData.error match { partitionData.error match {
case ErrorMapping.NoError => case ErrorMapping.NoError =>
processPartitionData(topic, currentOffset.get, partitionData) processPartitionData(topic, currentOffset.get, partitionData)
val validBytes = partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes val validBytes = partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes
val newOffset = currentOffset.get + validBytes val newOffset = currentOffset.get + partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes
fetchMap.put(key, newOffset) fetchMap.put(topicAndPartition, newOffset)
FetcherLagMetrics.getFetcherLagMetrics(topic, partitionId).lag = partitionData.hw - newOffset FetcherLagMetrics.getFetcherLagMetrics(topic, partitionId).lag = partitionData.hw - newOffset
fetcherMetrics.byteRate.mark(validBytes) fetcherMetrics.byteRate.mark(validBytes)
case ErrorMapping.OffsetOutOfRangeCode => case ErrorMapping.OffsetOutOfRangeCode =>
val newOffset = handleOffsetOutOfRange(topic, partitionId) val newOffset = handleOffsetOutOfRange(topic, partitionId)
fetchMap.put(key, newOffset) fetchMap.put(topicAndPartition, newOffset)
warn("current offset %d for topic %s partition %d out of range; reset offset to %d" warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
.format(currentOffset.get, topic, partitionId, newOffset)) .format(currentOffset.get, topic, partitionId, newOffset))
case _ => case _ =>
error("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.host), error("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.host),
ErrorMapping.exceptionFor(partitionData.error)) ErrorMapping.exceptionFor(partitionData.error))
partitionsWithError += key partitionsWithError += topicAndPartition
fetchMap.remove(key) fetchMap.remove(topicAndPartition)
}
} }
} }
} }
} }
} }
if (partitionsWithError.size > 0) { if (partitionsWithError.size > 0) {
debug("handling partitions with error for %s".format(partitionsWithError)) debug("handling partitions with error for %s".format(partitionsWithError))
handlePartitionsWithErrors(partitionsWithError) handlePartitionsWithErrors(partitionsWithError)
@ -129,19 +130,19 @@ abstract class AbstractFetcherThread(name: String, sourceBroker: Broker, socket
def addPartition(topic: String, partitionId: Int, initialOffset: Long) { def addPartition(topic: String, partitionId: Int, initialOffset: Long) {
fetchMapLock synchronized { fetchMapLock synchronized {
fetchMap.put((topic, partitionId), initialOffset) fetchMap.put(TopicAndPartition(topic, partitionId), initialOffset)
} }
} }
def removePartition(topic: String, partitionId: Int) { def removePartition(topic: String, partitionId: Int) {
fetchMapLock synchronized { fetchMapLock synchronized {
fetchMap.remove((topic, partitionId)) fetchMap.remove(TopicAndPartition(topic, partitionId))
} }
} }
def hasPartition(topic: String, partitionId: Int): Boolean = { def hasPartition(topic: String, partitionId: Int): Boolean = {
fetchMapLock synchronized { fetchMapLock synchronized {
fetchMap.get((topic, partitionId)).isDefined fetchMap.get(TopicAndPartition(topic, partitionId)).isDefined
} }
} }

View File

@ -20,7 +20,6 @@ package kafka.server
import java.io.IOException import java.io.IOException
import kafka.admin.{CreateTopicCommand, AdminUtils} import kafka.admin.{CreateTopicCommand, AdminUtils}
import kafka.api._ import kafka.api._
import kafka.common._
import kafka.message._ import kafka.message._
import kafka.network._ import kafka.network._
import kafka.utils.{TopicNameValidator, Pool, SystemTime, Logging} import kafka.utils.{TopicNameValidator, Pool, SystemTime, Logging}
@ -32,6 +31,7 @@ import kafka.network.RequestChannel.Response
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import kafka.metrics.KafkaMetricsGroup import kafka.metrics.KafkaMetricsGroup
import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.ZkClient
import kafka.common._
/** /**
* Logic to handle the various Kafka requests * Logic to handle the various Kafka requests
@ -40,13 +40,14 @@ class KafkaApis(val requestChannel: RequestChannel,
val replicaManager: ReplicaManager, val replicaManager: ReplicaManager,
val zkClient: ZkClient, val zkClient: ZkClient,
brokerId: Int) extends Logging { brokerId: Int) extends Logging {
private val producerRequestPurgatory = new ProducerRequestPurgatory(brokerId)
private val fetchRequestPurgatory = new FetchRequestPurgatory(brokerId, requestChannel) private val producerRequestPurgatory = new ProducerRequestPurgatory
private val fetchRequestPurgatory = new FetchRequestPurgatory(requestChannel)
private val delayedRequestMetrics = new DelayedRequestMetrics private val delayedRequestMetrics = new DelayedRequestMetrics
private val topicNameValidator = new TopicNameValidator(replicaManager.config.maxTopicNameLength) private val topicNameValidator = new TopicNameValidator(replicaManager.config.maxTopicNameLength)
private val requestLogger = Logger.getLogger("kafka.request.logger") private val requestLogger = Logger.getLogger("kafka.request.logger")
this.logIdent = "[KafkaApi on Broker " + brokerId + "], " this.logIdent = "[KafkaApi-%d] ".format(brokerId)
/** /**
* Top-level method that handles all requests and multiplexes to the right api * Top-level method that handles all requests and multiplexes to the right api
@ -93,18 +94,18 @@ class KafkaApis(val requestChannel: RequestChannel,
} }
/** /**
* Check if the partitionDataArray from a produce request can unblock any * Check if a partitionData from a produce request can unblock any
* DelayedFetch requests. * DelayedFetch requests.
*/ */
def maybeUnblockDelayedFetchRequests(topic: String, partitionDatas: Array[PartitionData]) { def maybeUnblockDelayedFetchRequests(topic: String, partitionData: PartitionData) {
var satisfied = new mutable.ArrayBuffer[DelayedFetch] val partition = partitionData.partition
for(partitionData <- partitionDatas) val satisfied = fetchRequestPurgatory.update(RequestKey(topic, partition), null)
satisfied ++= fetchRequestPurgatory.update(RequestKey(topic, partitionData.partition), null) trace("Producer request to (%s-%d) unblocked %d fetch requests.".format(topic, partition, satisfied.size))
trace("Producer request to %s unblocked %d fetch requests.".format(topic, satisfied.size))
// send any newly unblocked responses // send any newly unblocked responses
for(fetchReq <- satisfied) { for(fetchReq <- satisfied) {
val topicData = readMessageSets(fetchReq.fetch) val topicData = readMessageSets(fetchReq.fetch)
val response = new FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData) val response = FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData)
requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response))) requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response)))
} }
} }
@ -119,28 +120,25 @@ class KafkaApis(val requestChannel: RequestChannel,
requestLogger.trace("Handling producer request " + request.toString) requestLogger.trace("Handling producer request " + request.toString)
trace("Handling producer request " + request.toString) trace("Handling producer request " + request.toString)
val response = produceToLocalLog(produceRequest) val localProduceResponse = produceToLocalLog(produceRequest)
debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime)) debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
val partitionsInError = response.errors.count(_ != ErrorMapping.NoError)
for (topicData <- produceRequest.data) val numPartitionsInError = localProduceResponse.status.count(_._2.error != ErrorMapping.NoError)
maybeUnblockDelayedFetchRequests(topicData.topic, topicData.partitionDataArray) produceRequest.data.foreach(partitionAndData =>
maybeUnblockDelayedFetchRequests(partitionAndData._1.topic, partitionAndData._2))
if (produceRequest.requiredAcks == 0 || produceRequest.requiredAcks == 1 || if (produceRequest.requiredAcks == 0 ||
produceRequest.data.size <= 0 || partitionsInError == response.errors.size) produceRequest.requiredAcks == 1 ||
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) produceRequest.numPartitions <= 0 ||
numPartitionsInError == produceRequest.numPartitions)
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(localProduceResponse)))
else { else {
// create a list of (topic, partition) pairs to use as keys for this delayed request // create a list of (topic, partition) pairs to use as keys for this delayed request
val producerRequestKeys = produceRequest.data.flatMap(topicData => { val producerRequestKeys = produceRequest.data.keys.map(
val topic = topicData.topic topicAndPartition => new RequestKey(topicAndPartition)).toSeq
topicData.partitionDataArray.map(partitionData => {
RequestKey(topic, partitionData.partition)
})
})
val delayedProduce = new DelayedProduce( val delayedProduce = new DelayedProduce(
producerRequestKeys, request, producerRequestKeys, request, localProduceResponse,
response.errors, response.offsets,
produceRequest, produceRequest.ackTimeoutMs.toLong) produceRequest, produceRequest.ackTimeoutMs.toLong)
producerRequestPurgatory.watch(delayedProduce) producerRequestPurgatory.watch(delayedProduce)
@ -164,43 +162,41 @@ class KafkaApis(val requestChannel: RequestChannel,
*/ */
private def produceToLocalLog(request: ProducerRequest): ProducerResponse = { private def produceToLocalLog(request: ProducerRequest): ProducerResponse = {
trace("Produce [%s] to local log ".format(request.toString)) trace("Produce [%s] to local log ".format(request.toString))
val requestSize = request.topicPartitionCount
val errors = new Array[Short](requestSize)
val offsets = new Array[Long](requestSize)
var msgIndex = -1 val localErrorsAndOffsets = request.data.map (topicAndPartitionData => {
for(topicData <- request.data) { val (topic, partitionData) = (topicAndPartitionData._1.topic, topicAndPartitionData._2)
for(partitionData <- topicData.partitionDataArray) { BrokerTopicStat.getBrokerTopicStat(topic).bytesInRate.mark(partitionData.messages.sizeInBytes)
msgIndex += 1
BrokerTopicStat.getBrokerTopicStat(topicData.topic).bytesInRate.mark(partitionData.messages.sizeInBytes)
BrokerTopicStat.getBrokerAllTopicStat.bytesInRate.mark(partitionData.messages.sizeInBytes) BrokerTopicStat.getBrokerAllTopicStat.bytesInRate.mark(partitionData.messages.sizeInBytes)
try { try {
val localReplica = replicaManager.getLeaderReplicaIfLocal(topicData.topic, partitionData.partition) val localReplica = replicaManager.getLeaderReplicaIfLocal(topic, partitionData.partition)
val log = localReplica.log.get val log = localReplica.log.get
log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet]) log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
// we may need to increment high watermark since ISR could be down to 1 // we may need to increment high watermark since ISR could be down to 1
localReplica.partition.maybeIncrementLeaderHW(localReplica) localReplica.partition.maybeIncrementLeaderHW(localReplica)
offsets(msgIndex) = log.logEndOffset val responseStatus = ProducerResponseStatus(ErrorMapping.NoError, log.logEndOffset)
errors(msgIndex) = ErrorMapping.NoError.toShort
trace("%d bytes written to logs, nextAppendOffset = %d" trace("%d bytes written to logs, nextAppendOffset = %d"
.format(partitionData.messages.sizeInBytes, offsets(msgIndex))) .format(partitionData.messages.sizeInBytes, responseStatus.nextOffset))
(TopicAndPartition(topic, partitionData.partition), responseStatus)
} catch { } catch {
case e => case e: Throwable =>
BrokerTopicStat.getBrokerTopicStat(topicData.topic).failedProduceRequestRate.mark() BrokerTopicStat.getBrokerTopicStat(topic).failedProduceRequestRate.mark()
BrokerTopicStat.getBrokerAllTopicStat.failedProduceRequestRate.mark() BrokerTopicStat.getBrokerAllTopicStat.failedProduceRequestRate.mark()
error("Error processing ProducerRequest on %s:%d".format(topicData.topic, partitionData.partition), e) error("Error processing ProducerRequest on %s:%d".format(topic, partitionData.partition), e)
e match { e match {
case _: IOException => case _: IOException =>
fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e) fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
System.exit(1) // compiler requires scala.sys.exit (not System.exit).
exit(1)
case _ => case _ =>
errors(msgIndex) = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]).toShort val (error, offset) = (ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1L)
offsets(msgIndex) = -1 (TopicAndPartition(topic, partitionData.partition), ProducerResponseStatus(error, offset))
} }
} }
} }
} )
new ProducerResponse(request.versionId, request.correlationId, errors, offsets)
ProducerResponse(request.versionId, request.correlationId, localErrorsAndOffsets)
} }
/** /**
@ -212,27 +208,17 @@ class KafkaApis(val requestChannel: RequestChannel,
requestLogger.trace("Handling fetch request " + fetchRequest.toString) requestLogger.trace("Handling fetch request " + fetchRequest.toString)
trace("Handling fetch request " + fetchRequest.toString) trace("Handling fetch request " + fetchRequest.toString)
// validate the request
try {
fetchRequest.validate()
} catch {
case e:FetchRequestFormatException =>
val response = new FetchResponse(fetchRequest.versionId, fetchRequest.correlationId, Array.empty)
val channelResponse = new RequestChannel.Response(request, new FetchResponseSend(response))
requestChannel.sendResponse(channelResponse)
}
if(fetchRequest.isFromFollower) { if(fetchRequest.isFromFollower) {
maybeUpdatePartitionHW(fetchRequest) maybeUpdatePartitionHW(fetchRequest)
// after updating HW, some delayed produce requests may be unblocked // after updating HW, some delayed produce requests may be unblocked
var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce] var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
fetchRequest.offsetInfo.foreach(topicOffsetInfo => { fetchRequest.requestInfo.foreach {
topicOffsetInfo.partitions.foreach(partition => { case (topicAndPartition, _) =>
val key = RequestKey(topicOffsetInfo.topic, partition) val key = new RequestKey(topicAndPartition)
satisfiedProduceRequests ++= producerRequestPurgatory.update(key, key) satisfiedProduceRequests ++= producerRequestPurgatory.update(key, key)
}) }
}) debug("Replica %d fetch unblocked %d producer requests."
debug("Replica %d fetch unblocked %d producer requests.".format(fetchRequest.replicaId, satisfiedProduceRequests.size)) .format(fetchRequest.replicaId, satisfiedProduceRequests.size))
satisfiedProduceRequests.foreach(_.respond()) satisfiedProduceRequests.foreach(_.respond())
} }
@ -243,13 +229,13 @@ class KafkaApis(val requestChannel: RequestChannel,
fetchRequest.numPartitions <= 0) { fetchRequest.numPartitions <= 0) {
val topicData = readMessageSets(fetchRequest) val topicData = readMessageSets(fetchRequest)
debug("Returning fetch response %s for fetch request with correlation id %d".format( debug("Returning fetch response %s for fetch request with correlation id %d".format(
topicData.map(_.partitionDataArray.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId)) topicData.values.map(_.error).mkString(","), fetchRequest.correlationId))
val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData) val response = FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData)
requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response))) requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
} else { } else {
debug("Putting fetch request into purgatory") debug("Putting fetch request into purgatory")
// create a list of (topic, partition) pairs to use as keys for this delayed request // create a list of (topic, partition) pairs to use as keys for this delayed request
val delayedFetchKeys = fetchRequest.offsetInfo.flatMap(o => o.partitions.map(RequestKey(o.topic, _))) val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new RequestKey(_))
val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest, fetchRequest.maxWait) val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest, fetchRequest.maxWait)
fetchRequestPurgatory.watch(delayedFetch) fetchRequestPurgatory.watch(delayedFetch)
} }
@ -259,60 +245,54 @@ class KafkaApis(val requestChannel: RequestChannel,
* Calculate the number of available bytes for the given fetch request * Calculate the number of available bytes for the given fetch request
*/ */
private def availableFetchBytes(fetchRequest: FetchRequest): Long = { private def availableFetchBytes(fetchRequest: FetchRequest): Long = {
var totalBytes = 0L val totalBytes = fetchRequest.requestInfo.foldLeft(0L)((folded, curr) => {
for(offsetDetail <- fetchRequest.offsetInfo) { folded +
for(i <- 0 until offsetDetail.partitions.size) { {
debug("Fetching log for topic %s partition %d".format(offsetDetail.topic, offsetDetail.partitions(i))) val (topic, partition) = (curr._1.topic, curr._1.partition)
val (offset, fetchSize) = (curr._2.offset, curr._2.fetchSize)
debug("Fetching log for topic %s partition %d".format(topic, partition))
try { try {
val leader = replicaManager.getLeaderReplicaIfLocal(offsetDetail.topic, offsetDetail.partitions(i)) val leader = replicaManager.getLeaderReplicaIfLocal(topic, partition)
val end = if (!fetchRequest.isFromFollower) { val end = if (!fetchRequest.isFromFollower) {
leader.highWatermark leader.highWatermark
} else { } else {
leader.logEndOffset leader.logEndOffset
} }
val available = max(0, end - offsetDetail.offsets(i)) val available = max(0, end - offset)
totalBytes += math.min(offsetDetail.fetchSizes(i), available) math.min(fetchSize, available)
} catch { } catch {
case e: UnknownTopicOrPartitionException => case e: UnknownTopicOrPartitionException =>
info("Invalid partition %d in fetch request from client %d." info("Invalid partition %d in fetch request from client %s."
.format(offsetDetail.partitions(i), fetchRequest.clientId)) .format(partition, fetchRequest.clientId))
0
case e => case e =>
error("Error determining available fetch bytes for topic %s partition %s on broker %s for client %s" error("Error determining available fetch bytes for topic %s partition %s on broker %s for client %s"
.format(offsetDetail.topic, offsetDetail.partitions(i), brokerId, fetchRequest.clientId), e) .format(topic, partition, brokerId, fetchRequest.clientId), e)
} 0
} }
} }
})
trace(totalBytes + " available bytes for fetch request.") trace(totalBytes + " available bytes for fetch request.")
totalBytes totalBytes
} }
private def maybeUpdatePartitionHW(fetchRequest: FetchRequest) { private def maybeUpdatePartitionHW(fetchRequest: FetchRequest) {
val offsets = fetchRequest.offsetInfo debug("Maybe update partition HW due to fetch request: %s ".format(fetchRequest))
debug("Act on update partition HW, check offset detail: %s ".format(offsets)) fetchRequest.requestInfo.foreach(info => {
for(offsetDetail <- offsets) { val (topic, partition, offset) = (info._1.topic, info._1.partition, info._2.offset)
val topic = offsetDetail.topic
val (partitions, offsets) = (offsetDetail.partitions, offsetDetail.offsets)
for( (partition, offset) <- (partitions, offsets).zipped.map((_,_))) {
replicaManager.recordFollowerPosition(topic, partition, fetchRequest.replicaId, offset) replicaManager.recordFollowerPosition(topic, partition, fetchRequest.replicaId, offset)
} })
}
} }
/** /**
* Read from all the offset details given and produce an array of topic datas * Read from all the offset details given and return a map of
* (topic, partition) -> PartitionData
*/ */
private def readMessageSets(fetchRequest: FetchRequest): Array[TopicData] = { private def readMessageSets(fetchRequest: FetchRequest) = {
val offsets = fetchRequest.offsetInfo val isFetchFromFollower = fetchRequest.isFromFollower
val fetchedData = new mutable.ArrayBuffer[TopicData]() fetchRequest.requestInfo.map {
case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
for(offsetDetail <- offsets) { val partitionData = try {
val info = new mutable.ArrayBuffer[PartitionData]()
val topic = offsetDetail.topic
val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes)
for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) {
val isFetchFromFollower = fetchRequest.isFromFollower()
val partitionInfo =
try {
val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, isFetchFromFollower) val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, isFetchFromFollower)
BrokerTopicStat.getBrokerTopicStat(topic).bytesOutRate.mark(messages.sizeInBytes) BrokerTopicStat.getBrokerTopicStat(topic).bytesOutRate.mark(messages.sizeInBytes)
BrokerTopicStat.getBrokerAllTopicStat.bytesOutRate.mark(messages.sizeInBytes) BrokerTopicStat.getBrokerAllTopicStat.bytesOutRate.mark(messages.sizeInBytes)
@ -327,18 +307,15 @@ class KafkaApis(val requestChannel: RequestChannel,
} }
} }
catch { catch {
case e => case t: Throwable =>
BrokerTopicStat.getBrokerTopicStat(topic).failedFetchRequestRate.mark() BrokerTopicStat.getBrokerTopicStat(topic).failedFetchRequestRate.mark()
BrokerTopicStat.getBrokerAllTopicStat.failedFetchRequestRate.mark() BrokerTopicStat.getBrokerAllTopicStat.failedFetchRequestRate.mark()
error("error when processing request " + (topic, partition, offset, fetchSize), e) error("error when processing request " + (topic, partition, offset, fetchSize), t)
new PartitionData(partition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), new PartitionData(partition, ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]),
offset, -1L, MessageSet.Empty) offset, -1L, MessageSet.Empty)
} }
info.append(partitionInfo) (TopicAndPartition(topic, partition), partitionData)
} }
fetchedData.append(new TopicData(topic, info.toArray))
}
fetchedData.toArray
} }
/** /**
@ -454,8 +431,14 @@ class KafkaApis(val requestChannel: RequestChannel,
private [kafka] case class RequestKey(topic: String, partition: Int) private [kafka] case class RequestKey(topic: String, partition: Int)
extends MetricKey { extends MetricKey {
def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition)
def topicAndPartition = TopicAndPartition(topic, partition)
override def keyLabel = "%s-%d".format(topic, partition) override def keyLabel = "%s-%d".format(topic, partition)
} }
/** /**
* A delayed fetch request * A delayed fetch request
*/ */
@ -465,9 +448,9 @@ class KafkaApis(val requestChannel: RequestChannel,
/** /**
* A holding pen for fetch requests waiting to be satisfied * A holding pen for fetch requests waiting to be satisfied
*/ */
class FetchRequestPurgatory(brokerId: Int, requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, Null](brokerId) { class FetchRequestPurgatory(requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, Null](brokerId) {
this.logIdent = "[FetchRequestPurgatory-%d], ".format(brokerId) this.logIdent = "[FetchRequestPurgatory-%d] ".format(brokerId)
/** /**
* A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
@ -480,7 +463,7 @@ class KafkaApis(val requestChannel: RequestChannel,
*/ */
def expire(delayed: DelayedFetch) { def expire(delayed: DelayedFetch) {
val topicData = readMessageSets(delayed.fetch) val topicData = readMessageSets(delayed.fetch)
val response = new FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData) val response = FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData)
val fromFollower = delayed.fetch.isFromFollower val fromFollower = delayed.fetch.isFromFollower
delayedRequestMetrics.recordDelayedFetchExpired(fromFollower) delayedRequestMetrics.recordDelayedFetchExpired(fromFollower)
requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response))) requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response)))
@ -489,48 +472,43 @@ class KafkaApis(val requestChannel: RequestChannel,
class DelayedProduce(keys: Seq[RequestKey], class DelayedProduce(keys: Seq[RequestKey],
request: RequestChannel.Request, request: RequestChannel.Request,
localErrors: Array[Short], localProduceResponse: ProducerResponse,
requiredOffsets: Array[Long],
val produce: ProducerRequest, val produce: ProducerRequest,
delayMs: Long) delayMs: Long)
extends DelayedRequest(keys, request, delayMs) with Logging { extends DelayedRequest(keys, request, delayMs) with Logging {
private val initialErrorsAndOffsets = localProduceResponse.status
/** /**
* Map of (topic, partition) -> partition status * Map of (topic, partition) -> partition status
* The values in this map don't need to be synchronized since updates to the * The values in this map don't need to be synchronized since updates to the
* values are effectively synchronized by the ProducerRequestPurgatory's * values are effectively synchronized by the ProducerRequestPurgatory's
* update method * update method
*/ */
private [kafka] val partitionStatus = keys.map(key => { private [kafka] val partitionStatus = keys.map(requestKey => {
val keyIndex = keys.indexOf(key) val producerResponseStatus = initialErrorsAndOffsets(TopicAndPartition(requestKey.topic, requestKey.partition))
// if there was an error in writing to the local replica's log, then don't // if there was an error in writing to the local replica's log, then don't
// wait for acks on this partition // wait for acks on this partition
val acksPending = val (acksPending, error, nextOffset) =
if (localErrors(keyIndex) == ErrorMapping.NoError) { if (producerResponseStatus.error == ErrorMapping.NoError) {
// Timeout error state will be cleared when requiredAcks are received // Timeout error state will be cleared when requiredAcks are received
localErrors(keyIndex) = ErrorMapping.RequestTimedOutCode (true, ErrorMapping.RequestTimedOutCode, producerResponseStatus.nextOffset)
true
} }
else else (false, producerResponseStatus.error, producerResponseStatus.nextOffset)
false
val initialStatus = new PartitionStatus(acksPending, localErrors(keyIndex), requiredOffsets(keyIndex)) val initialStatus = PartitionStatus(acksPending, error, nextOffset)
trace("Initial partition status for %s = %s".format(key, initialStatus)) trace("Initial partition status for %s = %s".format(requestKey.keyLabel, initialStatus))
(key, initialStatus) (requestKey, initialStatus)
}).toMap }).toMap
def respond() { def respond() {
val errorsAndOffsets: (List[Short], List[Long]) = (
keys.foldRight val finalErrorsAndOffsets = initialErrorsAndOffsets.map(
((List[Short](), List[Long]())) status => {
((key: RequestKey, result: (List[Short], List[Long])) => { val pstat = partitionStatus(new RequestKey(status._1))
val status = partitionStatus(key) (status._1, ProducerResponseStatus(pstat.error, pstat.requiredOffset))
(status.error :: result._1, status.requiredOffset :: result._2)
}) })
)
val response = new ProducerResponse(produce.versionId, produce.correlationId, val response = ProducerResponse(produce.versionId, produce.correlationId, finalErrorsAndOffsets)
errorsAndOffsets._1.toArray, errorsAndOffsets._2.toArray)
requestChannel.sendResponse(new RequestChannel.Response( requestChannel.sendResponse(new RequestChannel.Response(
request, new BoundedByteBufferSend(response))) request, new BoundedByteBufferSend(response)))
@ -565,9 +543,8 @@ class KafkaApis(val requestChannel: RequestChannel,
fetchPartitionStatus.error = ErrorMapping.NoError fetchPartitionStatus.error = ErrorMapping.NoError
} }
if (!fetchPartitionStatus.acksPending) { if (!fetchPartitionStatus.acksPending) {
val topicData = produce.data.find(_.topic == topic).get val partitionData = produce.data(followerFetchRequestKey.topicAndPartition)
val partitionData = topicData.partitionDataArray.find(_.partition == partitionId).get maybeUnblockDelayedFetchRequests(topic, partitionData)
maybeUnblockDelayedFetchRequests(topic, Array(partitionData))
} }
} }
@ -576,9 +553,9 @@ class KafkaApis(val requestChannel: RequestChannel,
satisfied satisfied
} }
class PartitionStatus(var acksPending: Boolean, case class PartitionStatus(var acksPending: Boolean,
var error: Short, var error: Short,
val requiredOffset: Long) { requiredOffset: Long) {
def setThisBrokerNotLeader() { def setThisBrokerNotLeader() {
error = ErrorMapping.NotLeaderForPartitionCode error = ErrorMapping.NotLeaderForPartitionCode
acksPending = false acksPending = false
@ -594,9 +571,9 @@ class KafkaApis(val requestChannel: RequestChannel,
/** /**
* A holding pen for produce requests waiting to be satisfied. * A holding pen for produce requests waiting to be satisfied.
*/ */
private [kafka] class ProducerRequestPurgatory(brokerId: Int) extends RequestPurgatory[DelayedProduce, RequestKey](brokerId) { private [kafka] class ProducerRequestPurgatory extends RequestPurgatory[DelayedProduce, RequestKey](brokerId) {
this.logIdent = "[ProducerRequestPurgatory-%d], ".format(brokerId) this.logIdent = "[ProducerRequestPurgatory-%d] ".format(brokerId)
protected def checkSatisfied(followerFetchRequestKey: RequestKey, protected def checkSatisfied(followerFetchRequestKey: RequestKey,
delayedProduce: DelayedProduce) = delayedProduce: DelayedProduce) =

View File

@ -23,7 +23,7 @@ class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val r
extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId + ", ", brokerConfig.numReplicaFetchers) { extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId + ", ", brokerConfig.numReplicaFetchers) {
override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = { override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d on broker %d, ".format(sourceBroker.id, fetcherId, brokerConfig.brokerId), sourceBroker, brokerConfig, replicaMgr) new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d-on-broker-%d-".format(sourceBroker.id, fetcherId, brokerConfig.brokerId), sourceBroker, brokerConfig, replicaMgr)
} }
def shutdown() { def shutdown() {

View File

@ -20,6 +20,8 @@ package kafka.server
import kafka.api.{OffsetRequest, PartitionData} import kafka.api.{OffsetRequest, PartitionData}
import kafka.cluster.Broker import kafka.cluster.Broker
import kafka.message.ByteBufferMessageSet import kafka.message.ByteBufferMessageSet
import kafka.common.TopicAndPartition
class ReplicaFetcherThread(name:String, sourceBroker: Broker, brokerConfig: KafkaConfig, replicaMgr: ReplicaManager) class ReplicaFetcherThread(name:String, sourceBroker: Broker, brokerConfig: KafkaConfig, replicaMgr: ReplicaManager)
extends AbstractFetcherThread(name = name, sourceBroker = sourceBroker, socketTimeout = brokerConfig.replicaSocketTimeoutMs, extends AbstractFetcherThread(name = name, sourceBroker = sourceBroker, socketTimeout = brokerConfig.replicaSocketTimeoutMs,
@ -56,7 +58,7 @@ class ReplicaFetcherThread(name:String, sourceBroker: Broker, brokerConfig: Kafk
} }
// any logic for partitions whose leader has changed // any logic for partitions whose leader has changed
def handlePartitionsWithErrors(partitions: Iterable[(String, Int)]) { def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) {
// no handler needed since the controller will make the changes accordingly // no handler needed since the controller will make the changes accordingly
} }
} }

View File

@ -367,7 +367,7 @@ object Utils extends Logging {
/** /**
* Read an unsigned integer from the given position without modifying the buffers * Read an unsigned integer from the given position without modifying the buffers
* position * position
* @param The buffer to read from * @param buffer the buffer to read from
* @param index the index from which to read the integer * @param index the index from which to read the integer
* @return The integer read, as a long to avoid signedness * @return The integer read, as a long to avoid signedness
*/ */

View File

@ -72,7 +72,7 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness
// send an invalid offset // send an invalid offset
try { try {
val fetchedWithError = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, -1, 10000).build()) val fetchedWithError = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, -1, 10000).build())
fetchedWithError.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))) fetchedWithError.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
fail("Expected an OffsetOutOfRangeException exception to be thrown") fail("Expected an OffsetOutOfRangeException exception to be thrown")
} catch { } catch {
case e: OffsetOutOfRangeException => case e: OffsetOutOfRangeException =>
@ -101,7 +101,6 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness
} }
} }
{
// send some invalid offsets // send some invalid offsets
val builder = new FetchRequestBuilder() val builder = new FetchRequestBuilder()
for( (topic, offset) <- topicOffsets ) for( (topic, offset) <- topicOffsets )
@ -109,15 +108,15 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness
val request = builder.build() val request = builder.build()
val responses = consumer.fetch(request) val responses = consumer.fetch(request)
for(topicData <- responses.data) { responses.data.values.foreach(pd => {
try { try {
topicData.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)) ErrorMapping.maybeThrowException(pd.error)
fail("Expected an OffsetOutOfRangeException exception to be thrown") fail("Expected an OffsetOutOfRangeException exception to be thrown")
} catch { } catch {
case e: OffsetOutOfRangeException => case e: OffsetOutOfRangeException =>
} }
} })
}
} }
def testMultiProduce() { def testMultiProduce() {

View File

@ -19,7 +19,7 @@ package kafka.integration
import java.nio.ByteBuffer import java.nio.ByteBuffer
import junit.framework.Assert._ import junit.framework.Assert._
import kafka.api.{OffsetDetail, FetchRequest, FetchRequestBuilder} import kafka.api.{PartitionFetchInfo, FetchRequest, FetchRequestBuilder}
import kafka.server.{KafkaRequestHandler, KafkaConfig} import kafka.server.{KafkaRequestHandler, KafkaConfig}
import java.util.Properties import java.util.Properties
import kafka.producer.{ProducerData, Producer, ProducerConfig} import kafka.producer.{ProducerData, Producer, ProducerConfig}
@ -31,8 +31,8 @@ import org.I0Itec.zkclient.ZkClient
import kafka.zk.ZooKeeperTestHarness import kafka.zk.ZooKeeperTestHarness
import org.scalatest.junit.JUnit3Suite import org.scalatest.junit.JUnit3Suite
import scala.collection._ import scala.collection._
import kafka.common.{ErrorMapping, UnknownTopicOrPartitionException, FetchRequestFormatException, OffsetOutOfRangeException}
import kafka.admin.{AdminUtils, CreateTopicCommand} import kafka.admin.{AdminUtils, CreateTopicCommand}
import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException}
/** /**
* End to end tests of the primitive apis against a local server * End to end tests of the primitive apis against a local server
@ -77,27 +77,11 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
assertEquals(request, deserializedRequest) assertEquals(request, deserializedRequest)
} }
def testFetchRequestEnforcesUniqueTopicsForOffsetDetails() {
val offsets = Array(
new OffsetDetail("topic1", Array(0, 1, 2), Array(0L, 0L, 0L), Array(1000, 1000, 1000)),
new OffsetDetail("topic2", Array(0, 1, 2), Array(0L, 0L, 0L), Array(1000, 1000, 1000)),
new OffsetDetail("topic1", Array(3, 4, 5), Array(0L, 0L, 0L), Array(1000, 1000, 1000)),
new OffsetDetail("topic2", Array(3, 4, 5), Array(0L, 0L, 0L), Array(1000, 1000, 1000))
)
val request = new FetchRequest(offsetInfo = offsets)
try {
consumer.fetch(request)
fail("FetchRequest should throw FetchRequestFormatException due to duplicate topics")
} catch {
case e: FetchRequestFormatException => "success"
}
}
def testEmptyFetchRequest() { def testEmptyFetchRequest() {
val offsets = Array[OffsetDetail]() val partitionRequests = immutable.Map[TopicAndPartition, PartitionFetchInfo]()
val request = new FetchRequest(offsetInfo = offsets) val request = new FetchRequest(requestInfo = partitionRequests)
val fetched = consumer.fetch(request) val fetched = consumer.fetch(request)
assertTrue(fetched.errorCode == ErrorMapping.NoError && fetched.data.size == 0) assertTrue(!fetched.hasError && fetched.data.size == 0)
} }
def testDefaultEncoderProducerAndFetch() { def testDefaultEncoderProducerAndFetch() {
@ -189,7 +173,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
try { try {
val request = builder.build() val request = builder.build()
val response = consumer.fetch(request) val response = consumer.fetch(request)
response.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))) response.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
fail("Expected exception when fetching message with invalid offset") fail("Expected exception when fetching message with invalid offset")
} catch { } catch {
case e: OffsetOutOfRangeException => "this is good" case e: OffsetOutOfRangeException => "this is good"
@ -205,7 +189,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
try { try {
val request = builder.build() val request = builder.build()
val response = consumer.fetch(request) val response = consumer.fetch(request)
response.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))) response.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
fail("Expected exception when fetching message with invalid partition") fail("Expected exception when fetching message with invalid partition")
} catch { } catch {
case e: UnknownTopicOrPartitionException => "this is good" case e: UnknownTopicOrPartitionException => "this is good"
@ -253,7 +237,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
try { try {
val request = builder.build() val request = builder.build()
val response = consumer.fetch(request) val response = consumer.fetch(request)
response.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))) response.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
fail("Expected exception when fetching message with invalid offset") fail("Expected exception when fetching message with invalid offset")
} catch { } catch {
case e: OffsetOutOfRangeException => "this is good" case e: OffsetOutOfRangeException => "this is good"
@ -269,7 +253,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
try { try {
val request = builder.build() val request = builder.build()
val response = consumer.fetch(request) val response = consumer.fetch(request)
response.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))) response.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
fail("Expected exception when fetching message with invalid partition") fail("Expected exception when fetching message with invalid partition")
} catch { } catch {
case e: UnknownTopicOrPartitionException => "this is good" case e: UnknownTopicOrPartitionException => "this is good"

View File

@ -1,53 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.javaapi.integration
import org.scalatest.junit.JUnit3Suite
import java.util.Properties
import kafka.producer.SyncProducerConfig
import kafka.javaapi.producer.SyncProducer
import kafka.javaapi.consumer.SimpleConsumer
trait ProducerConsumerTestHarness extends JUnit3Suite {
val port: Int
val host = "localhost"
var producer: SyncProducer = null
var consumer: SimpleConsumer = null
override def setUp() {
val props = new Properties()
props.put("host", host)
props.put("port", port.toString)
props.put("buffer.size", "65536")
props.put("connect.timeout.ms", "100000")
props.put("reconnect.interval", "10000")
producer = new SyncProducer(new SyncProducerConfig(props))
consumer = new SimpleConsumer(host,
port,
1000000,
64*1024)
super.setUp
}
override def tearDown() {
super.tearDown
producer.close()
consumer.close()
}
}

View File

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package kafka.network; package kafka.network
import org.junit._ import org.junit._
import org.scalatest.junit.JUnitSuite import org.scalatest.junit.JUnitSuite
@ -24,31 +24,45 @@ import java.nio.ByteBuffer
import kafka.api._ import kafka.api._
import kafka.message.{Message, ByteBufferMessageSet} import kafka.message.{Message, ByteBufferMessageSet}
import kafka.cluster.Broker import kafka.cluster.Broker
import kafka.common.ErrorMapping
import collection.mutable._ import collection.mutable._
import kafka.common.{TopicAndPartition, ErrorMapping}
object RpcDataSerializationTestUtils{ object RpcDataSerializationTestUtils{
private val topic1 = "test1" private val topic1 = "test1"
private val topic2 = "test2" private val topic2 = "test2"
private val leader1 = 0; private val leader1 = 0
private val isr1 = List(0, 1, 2) private val isr1 = List(0, 1, 2)
private val leader2 = 0; private val leader2 = 0
private val isr2 = List(0, 2, 3) private val isr2 = List(0, 2, 3)
private val partitionData0 = new PartitionData(0, new ByteBufferMessageSet(new Message("first message".getBytes))) private val partitionData0 = new PartitionData(0, new ByteBufferMessageSet(new Message("first message".getBytes)))
private val partitionData1 = new PartitionData(1, new ByteBufferMessageSet(new Message("second message".getBytes))) private val partitionData1 = new PartitionData(1, new ByteBufferMessageSet(new Message("second message".getBytes)))
private val partitionData2 = new PartitionData(2, new ByteBufferMessageSet(new Message("third message".getBytes))) private val partitionData2 = new PartitionData(2, new ByteBufferMessageSet(new Message("third message".getBytes)))
private val partitionData3 = new PartitionData(3, new ByteBufferMessageSet(new Message("fourth message".getBytes))) private val partitionData3 = new PartitionData(3, new ByteBufferMessageSet(new Message("fourth message".getBytes)))
private val partitionDataArray = Array(partitionData0, partitionData1, partitionData2, partitionData3) private val partitionDataArray = Array(partitionData0, partitionData1, partitionData2, partitionData3)
private val topicData1 = new TopicData(topic1, partitionDataArray)
private val topicData2 = new TopicData(topic2, partitionDataArray) private val topicData = {
private val topicDataArray = Array(topicData1, topicData2) val groupedData = Array(topic1, topic2).flatMap(topic =>
private val offsetDetail1 = new OffsetDetail(topic1, Seq(0, 1, 2, 3), Seq(1000, 2000, 3000, 4000), Seq(100, 100, 100, 100)) partitionDataArray.map(partitionData =>
private val offsetDetail2 = new OffsetDetail(topic2, Seq(0, 1, 2, 3), Seq(1000, 2000, 3000, 4000), Seq(100, 100, 100, 100)) (TopicAndPartition(topic, partitionData.partition), partitionData)))
private val offsetDetailSeq = Seq(offsetDetail1, offsetDetail2) collection.immutable.Map(groupedData:_*)
private val partitionMetaData0 = new PartitionMetadata(0, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty) }
private val partitionMetaData1 = new PartitionMetadata(1, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty)
private val partitionMetaData2 = new PartitionMetadata(2, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty) private val requestInfos = collection.immutable.Map(
private val partitionMetaData3 = new PartitionMetadata(3, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty) TopicAndPartition(topic1, 0) -> PartitionFetchInfo(1000, 100),
TopicAndPartition(topic1, 1) -> PartitionFetchInfo(2000, 100),
TopicAndPartition(topic1, 2) -> PartitionFetchInfo(3000, 100),
TopicAndPartition(topic1, 3) -> PartitionFetchInfo(4000, 100),
TopicAndPartition(topic2, 0) -> PartitionFetchInfo(1000, 100),
TopicAndPartition(topic2, 1) -> PartitionFetchInfo(2000, 100),
TopicAndPartition(topic2, 2) -> PartitionFetchInfo(3000, 100),
TopicAndPartition(topic2, 3) -> PartitionFetchInfo(4000, 100)
)
private val partitionMetaData0 = new PartitionMetadata(0, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
private val partitionMetaData1 = new PartitionMetadata(1, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
private val partitionMetaData2 = new PartitionMetadata(2, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
private val partitionMetaData3 = new PartitionMetadata(3, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
private val partitionMetaDataSeq = Seq(partitionMetaData0, partitionMetaData1, partitionMetaData2, partitionMetaData3) private val partitionMetaDataSeq = Seq(partitionMetaData0, partitionMetaData1, partitionMetaData2, partitionMetaData3)
private val topicmetaData1 = new TopicMetadata(topic1, partitionMetaDataSeq) private val topicmetaData1 = new TopicMetadata(topic1, partitionMetaDataSeq)
private val topicmetaData2 = new TopicMetadata(topic2, partitionMetaDataSeq) private val topicmetaData2 = new TopicMetadata(topic2, partitionMetaDataSeq)
@ -78,19 +92,21 @@ object RpcDataSerializationTestUtils{
} }
def createTestProducerRequest: ProducerRequest = { def createTestProducerRequest: ProducerRequest = {
new ProducerRequest(1, "client 1", 0, 1000, topicDataArray) new ProducerRequest(1, "client 1", 0, 1000, topicData)
} }
def createTestProducerResponse: ProducerResponse = { def createTestProducerResponse: ProducerResponse =
new ProducerResponse(1, 1, Array(0.toShort, 0.toShort), Array(1000l, 2000l), 0) ProducerResponse(1, 1, Map(
} TopicAndPartition(topic1, 0) -> ProducerResponseStatus(0.toShort, 10001),
TopicAndPartition(topic2, 0) -> ProducerResponseStatus(0.toShort, 20001)
))
def createTestFetchRequest: FetchRequest = { def createTestFetchRequest: FetchRequest = {
new FetchRequest(offsetInfo = offsetDetailSeq) new FetchRequest(requestInfo = requestInfos)
} }
def createTestFetchResponse: FetchResponse = { def createTestFetchResponse: FetchResponse = {
new FetchResponse(1, 1, topicDataArray) FetchResponse(1, 1, topicData)
} }
def createTestOffsetRequest: OffsetRequest = { def createTestOffsetRequest: OffsetRequest = {
@ -154,7 +170,7 @@ class RpcDataSerializationTest extends JUnitSuite {
assertEquals("The original and deserialzed stopReplicaResponse should be the same", stopReplicaResponse, assertEquals("The original and deserialzed stopReplicaResponse should be the same", stopReplicaResponse,
deserializedStopReplicaResponse) deserializedStopReplicaResponse)
buffer = ByteBuffer.allocate(producerRequest.sizeInBytes()) buffer = ByteBuffer.allocate(producerRequest.sizeInBytes)
producerRequest.writeTo(buffer) producerRequest.writeTo(buffer)
buffer.rewind() buffer.rewind()
val deserializedProducerRequest = ProducerRequest.readFrom(buffer) val deserializedProducerRequest = ProducerRequest.readFrom(buffer)

View File

@ -25,8 +25,10 @@ import kafka.utils.TestUtils
import java.util.Random import java.util.Random
import junit.framework.Assert._ import junit.framework.Assert._
import kafka.producer.SyncProducerConfig import kafka.producer.SyncProducerConfig
import kafka.api.{TopicData, ProducerRequest} import kafka.api.{PartitionData, ProducerRequest}
import java.nio.ByteBuffer import java.nio.ByteBuffer
import kafka.common.TopicAndPartition
class SocketServerTest extends JUnitSuite { class SocketServerTest extends JUnitSuite {
@ -75,9 +77,10 @@ class SocketServerTest extends JUnitSuite {
val clientId = SyncProducerConfig.DefaultClientId val clientId = SyncProducerConfig.DefaultClientId
val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
val ack = SyncProducerConfig.DefaultRequiredAcks val ack = SyncProducerConfig.DefaultRequiredAcks
val emptyRequest = new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Array[TopicData]()) val emptyRequest =
new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, PartitionData]())
val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes()) val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes)
emptyRequest.writeTo(byteBuffer) emptyRequest.writeTo(byteBuffer)
byteBuffer.rewind() byteBuffer.rewind()
val serializedBytes = new Array[Byte](byteBuffer.remaining) val serializedBytes = new Array[Byte](byteBuffer.remaining)

View File

@ -201,11 +201,11 @@ class AsyncProducerTest extends JUnit3Suite {
topic2Broker2Data.appendAll(List(new ProducerData[Int,Message]("topic2", 1, new Message("msg2".getBytes)))) topic2Broker2Data.appendAll(List(new ProducerData[Int,Message]("topic2", 1, new Message("msg2".getBytes))))
val expectedResult = Some(Map( val expectedResult = Some(Map(
0 -> Map( 0 -> Map(
("topic1", 0) -> topic1Broker1Data, TopicAndPartition("topic1", 0) -> topic1Broker1Data,
("topic2", 0) -> topic2Broker1Data), TopicAndPartition("topic2", 0) -> topic2Broker1Data),
1 -> Map( 1 -> Map(
("topic1", 1) -> topic1Broker2Data, TopicAndPartition("topic1", 1) -> topic1Broker2Data,
("topic2", 1) -> topic2Broker2Data) TopicAndPartition("topic2", 1) -> topic2Broker2Data)
)) ))
val actualResult = handler.partitionAndCollate(producerDataList) val actualResult = handler.partitionAndCollate(producerDataList)
@ -344,7 +344,7 @@ class AsyncProducerTest extends JUnit3Suite {
partitionedDataOpt match { partitionedDataOpt match {
case Some(partitionedData) => case Some(partitionedData) =>
for ((brokerId, dataPerBroker) <- partitionedData) { for ((brokerId, dataPerBroker) <- partitionedData) {
for ( ((topic, partitionId), dataPerTopic) <- dataPerBroker) for ( (TopicAndPartition(topic, partitionId), dataPerTopic) <- dataPerBroker)
assertTrue(partitionId == 0) assertTrue(partitionId == 0)
} }
case None => case None =>
@ -408,10 +408,12 @@ class AsyncProducerTest extends JUnit3Suite {
// entirely. The second request will succeed for partition 1 but fail for partition 0. // entirely. The second request will succeed for partition 1 but fail for partition 0.
// On the third try for partition 0, let it succeed. // On the third try for partition 0, let it succeed.
val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), TestUtils.messagesToSet(msgs), 0) val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), TestUtils.messagesToSet(msgs), 0)
val response1 = val response1 = ProducerResponse(ProducerRequest.CurrentVersion, 0,
new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(ErrorMapping.NotLeaderForPartitionCode.toShort, 0.toShort), Array(0L, 0L)) Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NotLeaderForPartitionCode.toShort, 0L)),
(TopicAndPartition("topic1", 1), ProducerResponseStatus(ErrorMapping.NoError, 0L))))
val request2 = TestUtils.produceRequest(topic1, 0, TestUtils.messagesToSet(msgs)) val request2 = TestUtils.produceRequest(topic1, 0, TestUtils.messagesToSet(msgs))
val response2 = new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(0.toShort), Array(0L)) val response2 = ProducerResponse(ProducerRequest.CurrentVersion, 0,
Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NoError, 0L))))
val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer]) val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate SocketTimeoutException EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate SocketTimeoutException
EasyMock.expect(mockSyncProducer.send(request1)).andReturn(response1) EasyMock.expect(mockSyncProducer.send(request1)).andReturn(response1)

View File

@ -21,14 +21,14 @@ import java.net.SocketTimeoutException
import java.util.Properties import java.util.Properties
import junit.framework.Assert import junit.framework.Assert
import kafka.admin.CreateTopicCommand import kafka.admin.CreateTopicCommand
import kafka.common.{ErrorMapping, MessageSizeTooLargeException}
import kafka.integration.KafkaServerTestHarness import kafka.integration.KafkaServerTestHarness
import kafka.message.{NoCompressionCodec, DefaultCompressionCodec, Message, ByteBufferMessageSet} import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
import kafka.server.KafkaConfig import kafka.server.KafkaConfig
import kafka.utils.{TestZKUtils, SystemTime, TestUtils} import kafka.utils.{TestZKUtils, SystemTime, TestUtils}
import org.junit.Test import org.junit.Test
import org.scalatest.junit.JUnit3Suite import org.scalatest.junit.JUnit3Suite
import kafka.api.TopicData import kafka.api.{ProducerResponseStatus, PartitionData}
import kafka.common.{TopicAndPartition, ErrorMapping}
class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
private var messageBytes = new Array[Byte](2); private var messageBytes = new Array[Byte](2);
@ -85,11 +85,11 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
val clientId = SyncProducerConfig.DefaultClientId val clientId = SyncProducerConfig.DefaultClientId
val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
val ack = SyncProducerConfig.DefaultRequiredAcks val ack = SyncProducerConfig.DefaultRequiredAcks
val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Array[TopicData]()) val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, PartitionData]())
val producer = new SyncProducer(new SyncProducerConfig(props)) val producer = new SyncProducer(new SyncProducerConfig(props))
val response = producer.send(emptyRequest) val response = producer.send(emptyRequest)
Assert.assertTrue(response.errorCode == ErrorMapping.NoError && response.errors.size == 0 && response.offsets.size == 0) Assert.assertTrue(!response.hasError && response.status.size == 0)
} }
@Test @Test
@ -109,17 +109,17 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
val messageSet1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message1) val messageSet1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message1)
val response1 = producer.send(TestUtils.produceRequest("test", 0, messageSet1)) val response1 = producer.send(TestUtils.produceRequest("test", 0, messageSet1))
Assert.assertEquals(1, response1.errors.length) Assert.assertEquals(1, response1.status.count(_._2.error != ErrorMapping.NoError))
Assert.assertEquals(ErrorMapping.MessageSizeTooLargeCode, response1.errors(0)) Assert.assertEquals(ErrorMapping.MessageSizeTooLargeCode, response1.status(TopicAndPartition("test", 0)).error)
Assert.assertEquals(-1L, response1.offsets(0)) Assert.assertEquals(-1L, response1.status(TopicAndPartition("test", 0)).nextOffset)
val message2 = new Message(new Array[Byte](1000000)) val message2 = new Message(new Array[Byte](1000000))
val messageSet2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message2) val messageSet2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message2)
val response2 = producer.send(TestUtils.produceRequest("test", 0, messageSet2)) val response2 = producer.send(TestUtils.produceRequest("test", 0, messageSet2))
Assert.assertEquals(1, response2.errors.length) Assert.assertEquals(1, response1.status.count(_._2.error != ErrorMapping.NoError))
Assert.assertEquals(ErrorMapping.NoError, response2.errors(0)) Assert.assertEquals(ErrorMapping.NoError, response2.status(TopicAndPartition("test", 0)).error)
Assert.assertEquals(messageSet2.sizeInBytes, response2.offsets(0)) Assert.assertEquals(messageSet2.sizeInBytes, response2.status(TopicAndPartition("test", 0)).nextOffset)
} }
@Test @Test
@ -142,10 +142,12 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
Assert.assertNotNull(response) Assert.assertNotNull(response)
Assert.assertEquals(request.correlationId, response.correlationId) Assert.assertEquals(request.correlationId, response.correlationId)
Assert.assertEquals(response.errors.length, response.offsets.length) Assert.assertEquals(3, response.status.size)
Assert.assertEquals(3, response.errors.length) response.status.values.foreach {
response.errors.foreach(Assert.assertEquals(ErrorMapping.UnknownTopicOrPartitionCode.toShort, _)) case ProducerResponseStatus(error, nextOffset) =>
response.offsets.foreach(Assert.assertEquals(-1L, _)) Assert.assertEquals(ErrorMapping.UnknownTopicOrPartitionCode.toShort, error)
Assert.assertEquals(-1L, nextOffset)
}
// #2 - test that we get correct offsets when partition is owned by broker // #2 - test that we get correct offsets when partition is owned by broker
CreateTopicCommand.createTopic(zkClient, "topic1", 1, 1) CreateTopicCommand.createTopic(zkClient, "topic1", 1, 1)
@ -156,18 +158,18 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
val response2 = producer.send(request) val response2 = producer.send(request)
Assert.assertNotNull(response2) Assert.assertNotNull(response2)
Assert.assertEquals(request.correlationId, response2.correlationId) Assert.assertEquals(request.correlationId, response2.correlationId)
Assert.assertEquals(response2.errors.length, response2.offsets.length) Assert.assertEquals(3, response2.status.size)
Assert.assertEquals(3, response2.errors.length)
// the first and last message should have been accepted by broker // the first and last message should have been accepted by broker
Assert.assertEquals(0, response2.errors(0)) Assert.assertEquals(ErrorMapping.NoError, response2.status(TopicAndPartition("topic1", 0)).error)
Assert.assertEquals(0, response2.errors(2)) Assert.assertEquals(ErrorMapping.NoError, response2.status(TopicAndPartition("topic3", 0)).error)
Assert.assertEquals(messages.sizeInBytes, response2.offsets(0)) Assert.assertEquals(messages.sizeInBytes, response2.status(TopicAndPartition("topic1", 0)).nextOffset)
Assert.assertEquals(messages.sizeInBytes, response2.offsets(2)) Assert.assertEquals(messages.sizeInBytes, response2.status(TopicAndPartition("topic3", 0)).nextOffset)
// the middle message should have been rejected because broker doesn't lead partition // the middle message should have been rejected because broker doesn't lead partition
Assert.assertEquals(ErrorMapping.UnknownTopicOrPartitionCode.toShort, response2.errors(1)) Assert.assertEquals(ErrorMapping.UnknownTopicOrPartitionCode.toShort,
Assert.assertEquals(-1, response2.offsets(1)) response2.status(TopicAndPartition("topic2", 0)).error)
Assert.assertEquals(-1, response2.status(TopicAndPartition("topic2", 0)).nextOffset)
} }
@Test @Test

View File

@ -34,7 +34,9 @@ import kafka.consumer.ConsumerConfig
import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import kafka.api._ import kafka.api._
import collection.mutable.Map
import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder} import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder}
import kafka.common.TopicAndPartition
/** /**
@ -364,28 +366,10 @@ object TestUtils extends Logging {
val correlationId = SyncProducerConfig.DefaultCorrelationId val correlationId = SyncProducerConfig.DefaultCorrelationId
val clientId = SyncProducerConfig.DefaultClientId val clientId = SyncProducerConfig.DefaultClientId
val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
val data = topics.map(new TopicData(_, partitions.map(new PartitionData(_, message)).toArray)) val data = topics.flatMap(topic =>
new kafka.api.ProducerRequest(correlationId, clientId, acks.toShort, ackTimeoutMs, data.toArray) partitions.map(partition => (TopicAndPartition(topic, partition), new PartitionData(partition, message)))
} )
new kafka.api.ProducerRequest(correlationId, clientId, acks.toShort, ackTimeoutMs, Map(data:_*))
def produceJavaRequest(topic: String, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = {
produceJavaRequest(-1,topic,-1,message)
}
def produceJavaRequest(topic: String, partition: Int, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = {
produceJavaRequest(-1,topic,partition,message)
}
def produceJavaRequest(correlationId: Int, topic: String, partition: Int, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = {
val clientId = "test"
val requiredAcks: Short = 0
val ackTimeoutMs = 0
var data = new Array[TopicData](1)
var partitionData = new Array[PartitionData](1)
partitionData(0) = new PartitionData(partition,message.underlying)
data(0) = new TopicData(topic,partitionData)
val pr = new kafka.javaapi.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeoutMs, data)
pr
} }
def makeLeaderForPartition(zkClient: ZkClient, topic: String, leaderPerPartitionMap: scala.collection.immutable.Map[Int, Int]) { def makeLeaderForPartition(zkClient: ZkClient, topic: String, leaderPerPartitionMap: scala.collection.immutable.Map[Int, Int]) {