diff --git a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java index f0ec56863eb..c8e9c4b56c2 100644 --- a/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java +++ b/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java @@ -255,7 +255,7 @@ public class KafkaETLContext { */ protected boolean hasError(ByteBufferMessageSet messages) throws IOException { - int errorCode = messages.getErrorCode(); + short errorCode = messages.getErrorCode(); if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) { /* offset cannot cross the maximum offset (guaranteed by Kafka protocol). Kafka server may delete old files from time to time */ diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala index 2a21a53ab75..468c7d08faa 100644 --- a/core/src/main/scala/kafka/api/FetchRequest.scala +++ b/core/src/main/scala/kafka/api/FetchRequest.scala @@ -18,7 +18,6 @@ package kafka.api import java.nio.ByteBuffer -import kafka.network.Request import kafka.utils.Utils import scala.collection.mutable.{HashMap, Buffer, ListBuffer} import kafka.common.FetchRequestFormatException @@ -105,7 +104,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion, replicaId: Int = FetchRequest.DefaultReplicaId, maxWait: Int = FetchRequest.DefaultMaxWait, minBytes: Int = FetchRequest.DefaultMinBytes, - offsetInfo: Seq[OffsetDetail] ) extends Request(RequestKeys.Fetch) { + offsetInfo: Seq[OffsetDetail] ) extends RequestOrResponse(Some(RequestKeys.Fetch)) { // ensure that a topic "X" appears in at most one OffsetDetail def validate() { diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala index 3a4d9b83225..9ec5c331838 100644 --- a/core/src/main/scala/kafka/api/FetchResponse.scala +++ b/core/src/main/scala/kafka/api/FetchResponse.scala @@ -26,8 +26,8 @@ import kafka.utils.Utils object PartitionData { def readFrom(buffer: ByteBuffer): PartitionData = { + val error = buffer.getShort val partition = buffer.getInt - val error = buffer.getInt val initialOffset = buffer.getLong val hw = buffer.getLong() val messageSetSize = buffer.getInt @@ -38,21 +38,48 @@ object PartitionData { } } -case class PartitionData(partition: Int, error: Int = ErrorMapping.NoError, initialOffset:Long = 0L, hw: Long = -1L, - messages: MessageSet) { - val sizeInBytes = 4 + 4 + 8 + 4 + messages.sizeInBytes.intValue() + 8 +case class PartitionData(partition: Int, error: Short = ErrorMapping.NoError, initialOffset:Long = 0L, hw: Long = -1L, messages: MessageSet) { + val sizeInBytes = 4 + 2 + 8 + 4 + messages.sizeInBytes.intValue() + 8 def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, -1L, messages) - } -object TopicData { +// SENDS +class PartitionDataSend(val partitionData: PartitionData) extends Send { + private val messageSize = partitionData.messages.sizeInBytes + private var messagesSentSize = 0L + + private val buffer = ByteBuffer.allocate(26) + buffer.putShort(partitionData.error) + buffer.putInt(partitionData.partition) + buffer.putLong(partitionData.initialOffset) + buffer.putLong(partitionData.hw) + buffer.putInt(partitionData.messages.sizeInBytes.intValue()) + buffer.rewind() + + def complete = !buffer.hasRemaining && messagesSentSize >= messageSize + + def writeTo(channel: GatheringByteChannel): Int = { + var written = 0 + if(buffer.hasRemaining) + written += channel.write(buffer) + if(!buffer.hasRemaining && messagesSentSize < messageSize) { + val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize - messagesSentSize).toInt + messagesSentSize += bytesSent + written += bytesSent + } + written + } +} + + +object TopicData { def readFrom(buffer: ByteBuffer): TopicData = { val topic = Utils.readShortString(buffer, "UTF-8") val partitionCount = buffer.getInt val partitions = new Array[PartitionData](partitionCount) - for(i <- 0 until partitions.length) + for(i <- 0 until partitionCount) partitions(i) = PartitionData.readFrom(buffer) new TopicData(topic, partitions.sortBy(_.partition)) } @@ -90,74 +117,6 @@ case class TopicData(topic: String, partitionData: Array[PartitionData]) { } } -object FetchResponse { - val CurrentVersion = 1.shortValue() - - def readFrom(buffer: ByteBuffer): FetchResponse = { - val versionId = buffer.getShort - val correlationId = buffer.getInt - val dataCount = buffer.getInt - val data = new Array[TopicData](dataCount) - for(i <- 0 until data.length) - data(i) = TopicData.readFrom(buffer) - new FetchResponse(versionId, correlationId, data) - } -} - -case class FetchResponse(versionId: Short, correlationId: Int, data: Array[TopicData]) { - - val sizeInBytes = 2 + 4 + data.foldLeft(4)(_ + _.sizeInBytes) - - lazy val topicMap = data.groupBy(_.topic).mapValues(_.head) - - def messageSet(topic: String, partition: Int): ByteBufferMessageSet = { - val messageSet = topicMap.get(topic) match { - case Some(topicData) => - TopicData.findPartition(topicData.partitionData, partition).map(_.messages).getOrElse(MessageSet.Empty) - case None => - MessageSet.Empty - } - messageSet.asInstanceOf[ByteBufferMessageSet] - } - - def highWatermark(topic: String, partition: Int): Long = { - topicMap.get(topic) match { - case Some(topicData) => - TopicData.findPartition(topicData.partitionData, partition).map(_.hw).getOrElse(-1L) - case None => -1L - } - } -} - -// SENDS - -class PartitionDataSend(val partitionData: PartitionData) extends Send { - private val messageSize = partitionData.messages.sizeInBytes - private var messagesSentSize = 0L - - private val buffer = ByteBuffer.allocate(28) - buffer.putInt(partitionData.partition) - buffer.putInt(partitionData.error) - buffer.putLong(partitionData.initialOffset) - buffer.putLong(partitionData.hw) - buffer.putInt(partitionData.messages.sizeInBytes.intValue()) - buffer.rewind() - - def complete = !buffer.hasRemaining && messagesSentSize >= messageSize - - def writeTo(channel: GatheringByteChannel): Int = { - var written = 0 - if(buffer.hasRemaining) - written += channel.write(buffer) - if(!buffer.hasRemaining && messagesSentSize < messageSize) { - val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize - messagesSentSize).toInt - messagesSentSize += bytesSent - written += bytesSent - } - written - } -} - class TopicDataSend(val topicData: TopicData) extends Send { val size = topicData.sizeInBytes @@ -187,17 +146,60 @@ class TopicDataSend(val topicData: TopicData) extends Send { } } -class FetchResponseSend(val fetchResponse: FetchResponse, - val errorCode: Int = ErrorMapping.NoError) extends Send { + + +object FetchResponse { + def readFrom(buffer: ByteBuffer): FetchResponse = { + val versionId = buffer.getShort + val errorCode = buffer.getShort + val correlationId = buffer.getInt + val dataCount = buffer.getInt + val data = new Array[TopicData](dataCount) + for(i <- 0 until data.length) + data(i) = TopicData.readFrom(buffer) + new FetchResponse(versionId, correlationId, data, errorCode) + } +} + + +case class FetchResponse(versionId: Short, + correlationId: Int, + data: Array[TopicData], + errorCode: Short = ErrorMapping.NoError) { + + val sizeInBytes = 2 + 4 + 2 + data.foldLeft(4)(_ + _.sizeInBytes) + + lazy val topicMap = data.groupBy(_.topic).mapValues(_.head) + + def messageSet(topic: String, partition: Int): ByteBufferMessageSet = { + val messageSet = topicMap.get(topic) match { + case Some(topicData) => + TopicData.findPartition(topicData.partitionData, partition).map(_.messages).getOrElse(MessageSet.Empty) + case None => + MessageSet.Empty + } + messageSet.asInstanceOf[ByteBufferMessageSet] + } + + def highWatermark(topic: String, partition: Int): Long = { + topicMap.get(topic) match { + case Some(topicData) => + TopicData.findPartition(topicData.partitionData, partition).map(_.hw).getOrElse(-1L) + case None => -1L + } + } +} + + +class FetchResponseSend(val fetchResponse: FetchResponse) extends Send { private val size = fetchResponse.sizeInBytes - private var sent = 0 private val buffer = ByteBuffer.allocate(16) - buffer.putInt(size + 2) - buffer.putShort(errorCode.shortValue()) + buffer.putInt(size) buffer.putShort(fetchResponse.versionId) + buffer.putShort(fetchResponse.errorCode) buffer.putInt(fetchResponse.correlationId) buffer.putInt(fetchResponse.data.length) buffer.rewind() @@ -220,6 +222,5 @@ class FetchResponseSend(val fetchResponse: FetchResponse, written } - def sendSize = 4 + 2 + fetchResponse.sizeInBytes - + def sendSize = 4 + fetchResponse.sizeInBytes } diff --git a/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala b/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala new file mode 100644 index 00000000000..bf70b86eef8 --- /dev/null +++ b/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package kafka.api + +import java.nio._ +import kafka.utils._ +import collection.mutable.Map +import collection.mutable.HashMap + + +object LeaderAndISR { + def readFrom(buffer: ByteBuffer): LeaderAndISR = { + val leader = buffer.getInt + val leaderGenId = buffer.getInt + val ISRString = Utils.readShortString(buffer, "UTF-8") + val ISR = ISRString.split(",").map(_.toInt).toList + val zkVersion = buffer.getLong + new LeaderAndISR(leader, leaderGenId, ISR, zkVersion) + } +} + +case class LeaderAndISR(leader: Int, leaderEpoc: Int, ISR: List[Int], zkVersion: Long){ + def writeTo(buffer: ByteBuffer) { + buffer.putInt(leader) + buffer.putInt(leaderEpoc) + Utils.writeShortString(buffer, ISR.mkString(","), "UTF-8") + buffer.putLong(zkVersion) + } + + def sizeInBytes(): Int = { + val size = 4 + 4 + (2 + ISR.mkString(",").length) + 8 + size + } +} + + +object LeaderAndISRRequest { + val CurrentVersion = 1.shortValue() + val DefaultClientId = "" + + def readFrom(buffer: ByteBuffer): LeaderAndISRRequest = { + val versionId = buffer.getShort + val clientId = Utils.readShortString(buffer) + val isInit = buffer.get() + val ackTimeout = buffer.getInt + val leaderAndISRRequestCount = buffer.getInt + val leaderAndISRInfos = new HashMap[(String, Int), LeaderAndISR] + + for(i <- 0 until leaderAndISRRequestCount){ + val topic = Utils.readShortString(buffer, "UTF-8") + val partition = buffer.getInt + val leaderAndISRRequest = LeaderAndISR.readFrom(buffer) + + leaderAndISRInfos.put((topic, partition), leaderAndISRRequest) + } + new LeaderAndISRRequest(versionId, clientId, isInit, ackTimeout, leaderAndISRInfos) + } +} + + +case class LeaderAndISRRequest (versionId: Short, + clientId: String, + isInit: Byte, + ackTimeout: Int, + leaderAndISRInfos: + Map[(String, Int), LeaderAndISR]) + extends RequestOrResponse(Some(RequestKeys.LeaderAndISRRequest)) { + def this(isInit: Byte, ackTimeout: Int, leaderAndISRInfos: Map[(String, Int), LeaderAndISR]) = { + this(LeaderAndISRRequest.CurrentVersion, LeaderAndISRRequest.DefaultClientId, isInit, ackTimeout, leaderAndISRInfos) + } + + def writeTo(buffer: ByteBuffer) { + buffer.putShort(versionId) + Utils.writeShortString(buffer, clientId) + buffer.put(isInit) + buffer.putInt(ackTimeout) + buffer.putInt(leaderAndISRInfos.size) + for((key, value) <- leaderAndISRInfos){ + Utils.writeShortString(buffer, key._1, "UTF-8") + buffer.putInt(key._2) + value.writeTo(buffer) + } + } + + def sizeInBytes(): Int = { + var size = 1 + 2 + (2 + clientId.length) + 4 + 4 + for((key, value) <- leaderAndISRInfos) + size += (2 + key._1.length) + 4 + value.sizeInBytes + size + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/api/LeaderAndISRResponse.scala b/core/src/main/scala/kafka/api/LeaderAndISRResponse.scala new file mode 100644 index 00000000000..0a9e5399cf8 --- /dev/null +++ b/core/src/main/scala/kafka/api/LeaderAndISRResponse.scala @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import kafka.common.ErrorMapping +import java.nio.ByteBuffer +import kafka.utils.Utils +import collection.mutable.HashMap +import collection.mutable.Map + + +object LeaderAndISRResponse { + def readFrom(buffer: ByteBuffer): LeaderAndISRResponse = { + val versionId = buffer.getShort + val errorCode = buffer.getShort + val numEntries = buffer.getInt + val responseMap = new HashMap[(String, Int), Short]() + for (i<- 0 until numEntries){ + val topic = Utils.readShortString(buffer, "UTF-8") + val partition = buffer.getInt + val partitionErrorCode = buffer.getShort + responseMap.put((topic, partition), partitionErrorCode) + } + new LeaderAndISRResponse(versionId, responseMap, errorCode) + } +} + + +case class LeaderAndISRResponse(versionId: Short, + responseMap: Map[(String, Int), Short], + errorCode: Short = ErrorMapping.NoError) + extends RequestOrResponse{ + def sizeInBytes(): Int ={ + var size = 2 + 2 + 4 + for ((key, value) <- responseMap){ + size += 2 + key._1.length + 4 + 2 + } + size + } + + def writeTo(buffer: ByteBuffer) { + buffer.putShort(versionId) + buffer.putShort(errorCode) + buffer.putInt(responseMap.size) + for ((key:(String, Int), value) <- responseMap){ + Utils.writeShortString(buffer, key._1, "UTF-8") + buffer.putInt(key._2) + buffer.putShort(value) + } + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala index b8b98c7309c..33e737b9adf 100644 --- a/core/src/main/scala/kafka/api/OffsetRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetRequest.scala @@ -18,83 +18,50 @@ package kafka.api import java.nio.ByteBuffer -import kafka.utils.{nonthreadsafe, Utils} -import kafka.network.{Send, Request} -import java.nio.channels.GatheringByteChannel -import kafka.common.ErrorMapping +import kafka.utils.Utils object OffsetRequest { + val CurrentVersion = 1.shortValue() + val DefaultClientId = "" + val SmallestTimeString = "smallest" val LargestTimeString = "largest" val LatestTime = -1L val EarliestTime = -2L def readFrom(buffer: ByteBuffer): OffsetRequest = { + val versionId = buffer.getShort + val clientId = Utils.readShortString(buffer) val topic = Utils.readShortString(buffer, "UTF-8") val partition = buffer.getInt() val offset = buffer.getLong val maxNumOffsets = buffer.getInt - new OffsetRequest(topic, partition, offset, maxNumOffsets) - } - - def serializeOffsetArray(offsets: Array[Long]): ByteBuffer = { - val size = 4 + 8 * offsets.length - val buffer = ByteBuffer.allocate(size) - buffer.putInt(offsets.length) - for (i <- 0 until offsets.length) - buffer.putLong(offsets(i)) - buffer.rewind - buffer - } - - def deserializeOffsetArray(buffer: ByteBuffer): Array[Long] = { - val size = buffer.getInt - val offsets = new Array[Long](size) - for (i <- 0 until offsets.length) - offsets(i) = buffer.getLong - offsets + new OffsetRequest(versionId, clientId, topic, partition, offset, maxNumOffsets) } } -class OffsetRequest(val topic: String, - val partition: Int, - val time: Long, - val maxNumOffsets: Int) extends Request(RequestKeys.Offsets) { +case class OffsetRequest(versionId: Short = OffsetRequest.CurrentVersion, + clientId: String = OffsetRequest.DefaultClientId, + topic: String, + partition: Int, + time: Long, + maxNumOffsets: Int) extends RequestOrResponse(Some(RequestKeys.Offsets)) { + def this(topic: String, partition: Int, time: Long, maxNumOffsets: Int) = + this(OffsetRequest.CurrentVersion, OffsetRequest.DefaultClientId, topic, partition, time, maxNumOffsets) + + def writeTo(buffer: ByteBuffer) { + buffer.putShort(versionId) + Utils.writeShortString(buffer, clientId) Utils.writeShortString(buffer, topic) buffer.putInt(partition) buffer.putLong(time) buffer.putInt(maxNumOffsets) } - def sizeInBytes(): Int = 2 + topic.length + 4 + 8 + 4 + def sizeInBytes(): Int = 2 + (2 + clientId.length()) + (2 + topic.length) + 4 + 8 + 4 - override def toString(): String= "OffsetRequest(topic:" + topic + ", part:" + partition + ", time:" + time + - ", maxNumOffsets:" + maxNumOffsets + ")" -} - -@nonthreadsafe -private[kafka] class OffsetArraySend(offsets: Array[Long]) extends Send { - private var size: Long = offsets.foldLeft(4)((sum, _) => sum + 8) - private val header = ByteBuffer.allocate(6) - header.putInt(size.asInstanceOf[Int] + 2) - header.putShort(ErrorMapping.NoError.asInstanceOf[Short]) - header.rewind() - private val contentBuffer = OffsetRequest.serializeOffsetArray(offsets) - - var complete: Boolean = false - - def writeTo(channel: GatheringByteChannel): Int = { - expectIncomplete() - var written = 0 - if(header.hasRemaining) - written += channel.write(header) - if(!header.hasRemaining && contentBuffer.hasRemaining) - written += channel.write(contentBuffer) - - if(!contentBuffer.hasRemaining) - complete = true - written - } + override def toString(): String= "OffsetRequest(version:" + versionId + ", client id:" + clientId + + ", topic:" + topic + ", part:" + partition + ", time:" + time + ", maxNumOffsets:" + maxNumOffsets + ")" } diff --git a/core/src/main/scala/kafka/api/OffsetResponse.scala b/core/src/main/scala/kafka/api/OffsetResponse.scala new file mode 100644 index 00000000000..56d169ce8e0 --- /dev/null +++ b/core/src/main/scala/kafka/api/OffsetResponse.scala @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import java.nio.ByteBuffer +import kafka.common.ErrorMapping + + +object OffsetResponse { + def readFrom(buffer: ByteBuffer): OffsetResponse = { + val versionId = buffer.getShort + val errorCode = buffer.getShort + val offsetsSize = buffer.getInt + val offsets = new Array[Long](offsetsSize) + for( i <- 0 until offsetsSize) { + offsets(i) = buffer.getLong + } + new OffsetResponse(versionId, offsets, errorCode) + } +} + +case class OffsetResponse(versionId: Short, + offsets: Array[Long], + errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{ + val sizeInBytes = 2 + 2 + offsets.foldLeft(4)((sum, _) => sum + 8) + + def writeTo(buffer: ByteBuffer) { + buffer.putShort(versionId) + /* error code */ + buffer.putShort(errorCode) + buffer.putInt(offsets.length) + offsets.foreach(buffer.putLong(_)) + } +} diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala index 47c0e556647..a4a7184d089 100644 --- a/core/src/main/scala/kafka/api/ProducerRequest.scala +++ b/core/src/main/scala/kafka/api/ProducerRequest.scala @@ -19,7 +19,6 @@ package kafka.api import java.nio._ import kafka.message._ -import kafka.network._ import kafka.utils._ object ProducerRequest { @@ -58,7 +57,7 @@ case class ProducerRequest( versionId: Short, clientId: String, requiredAcks: Short, ackTimeout: Int, - data: Array[TopicData] ) extends Request(RequestKeys.Produce) { + data: Array[TopicData] ) extends RequestOrResponse(Some(RequestKeys.Produce)) { def this(correlationId: Int, clientId: String, requiredAcks: Short, ackTimeout: Int, data: Array[TopicData]) = this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeout, data) diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala index 08f7f1d2a83..06e8625e689 100644 --- a/core/src/main/scala/kafka/api/ProducerResponse.scala +++ b/core/src/main/scala/kafka/api/ProducerResponse.scala @@ -18,16 +18,14 @@ package kafka.api import java.nio.ByteBuffer -import java.nio.channels.GatheringByteChannel import kafka.common.ErrorMapping -import kafka.network.Send + object ProducerResponse { - val CurrentVersion = 1.shortValue() - def readFrom(buffer: ByteBuffer): ProducerResponse = { val versionId = buffer.getShort val correlationId = buffer.getInt + val errorCode = buffer.getShort val errorsSize = buffer.getInt val errors = new Array[Short](errorsSize) for( i <- 0 until errorsSize) { @@ -38,28 +36,21 @@ object ProducerResponse { for( i <- 0 until offsetsSize) { offsets(i) = buffer.getLong } - new ProducerResponse(versionId, correlationId, errors, offsets) + new ProducerResponse(versionId, correlationId, errors, offsets, errorCode) } - - def serializeResponse(producerResponse: ProducerResponse): ByteBuffer = { - val buffer = ByteBuffer.allocate(producerResponse.sizeInBytes) - producerResponse.writeTo(buffer) - buffer.rewind() - buffer - } - - def deserializeResponse(buffer: ByteBuffer): ProducerResponse = readFrom(buffer) - } -case class ProducerResponse(versionId: Short, correlationId: Int, errors: Array[Short], offsets: Array[Long]) { - val sizeInBytes = 2 + 4 + (4 + 2 * errors.length) + (4 + 8 * offsets.length) +case class ProducerResponse(versionId: Short, correlationId: Int, errors: Array[Short], + offsets: Array[Long], errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{ + val sizeInBytes = 2 + 2 + 4 + (4 + 2 * errors.length) + (4 + 8 * offsets.length) def writeTo(buffer: ByteBuffer) { - /* version */ + /* version id */ buffer.putShort(versionId) /* correlation id */ buffer.putInt(correlationId) + /* error code */ + buffer.putShort(errorCode) /* errors */ buffer.putInt(errors.length) errors.foreach(buffer.putShort(_)) @@ -67,35 +58,4 @@ case class ProducerResponse(versionId: Short, correlationId: Int, errors: Array[ buffer.putInt(offsets.length) offsets.foreach(buffer.putLong(_)) } -} - -class ProducerResponseSend(val producerResponse: ProducerResponse, - val error: Int = ErrorMapping.NoError) extends Send { - private val header = ByteBuffer.allocate(6) - header.putInt(producerResponse.sizeInBytes + 2) - header.putShort(error.toShort) - header.rewind() - - val responseContent = ProducerResponse.serializeResponse(producerResponse) - - var complete = false - - def writeTo(channel: GatheringByteChannel):Int = { - expectIncomplete() - var written = 0 - if(header.hasRemaining) - written += channel.write(header) - - trace("Wrote %d bytes for header".format(written)) - - if(!header.hasRemaining && responseContent.hasRemaining) - written += channel.write(responseContent) - - trace("Wrote %d bytes for header, errors and offsets".format(written)) - - if(!header.hasRemaining && !responseContent.hasRemaining) - complete = true - - written - } -} +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala index 445a1cef79c..fabc0b80a03 100644 --- a/core/src/main/scala/kafka/api/RequestKeys.scala +++ b/core/src/main/scala/kafka/api/RequestKeys.scala @@ -22,4 +22,6 @@ object RequestKeys { val Fetch: Short = 1 val Offsets: Short = 2 val TopicMetadata: Short = 3 + val LeaderAndISRRequest: Short = 4 + val StopReplicaRequest: Short = 5 } diff --git a/core/src/main/scala/kafka/network/Request.scala b/core/src/main/scala/kafka/api/RequestOrResponse.scala similarity index 89% rename from core/src/main/scala/kafka/network/Request.scala rename to core/src/main/scala/kafka/api/RequestOrResponse.scala index d403d359b2d..ac5b64e24e5 100644 --- a/core/src/main/scala/kafka/network/Request.scala +++ b/core/src/main/scala/kafka/api/RequestOrResponse.scala @@ -1,3 +1,5 @@ +package kafka.api + /** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -15,11 +17,9 @@ * limitations under the License. */ -package kafka.network - import java.nio._ -private[kafka] abstract class Request(val id: Short) { +private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None) { def sizeInBytes: Int diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala new file mode 100644 index 00000000000..328448f2df1 --- /dev/null +++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + + +import java.nio._ +import kafka.utils._ +import collection.mutable.HashSet +import collection.mutable.Set + +object StopReplicaRequest { + val CurrentVersion = 1.shortValue() + val DefaultClientId = "" + + def readFrom(buffer: ByteBuffer): StopReplicaRequest = { + val versionId = buffer.getShort + val clientId = Utils.readShortString(buffer) + val ackTimeout = buffer.getInt + val topicPartitionPairCount = buffer.getInt + val topicPartitionPairSet = new HashSet[(String, Int)]() + for (i <- 0 until topicPartitionPairCount){ + topicPartitionPairSet.add((Utils.readShortString(buffer, "UTF-8"), buffer.getInt)) + } + new StopReplicaRequest(versionId, clientId, ackTimeout, topicPartitionPairSet) + } +} + +case class StopReplicaRequest(versionId: Short, + clientId: String, + ackTimeout: Int, + stopReplicaSet: Set[(String, Int)] + ) extends RequestOrResponse(Some(RequestKeys.StopReplicaRequest)) { + def this(ackTimeout: Int, stopReplicaSet: Set[(String, Int)]) = { + this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, ackTimeout, stopReplicaSet) + } + + def writeTo(buffer: ByteBuffer) { + buffer.putShort(versionId) + Utils.writeShortString(buffer, clientId) + buffer.putInt(ackTimeout) + buffer.putInt(stopReplicaSet.size) + for ((topic, partitionId) <- stopReplicaSet){ + Utils.writeShortString(buffer, topic, "UTF-8") + buffer.putInt(partitionId) + } + } + + def sizeInBytes(): Int = { + var size = 2 + (2 + clientId.length()) + 4 + 4 + for ((topic, partitionId) <- stopReplicaSet){ + size += (2 + topic.length()) + 4 + } + size + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/api/StopReplicaResponse.scala b/core/src/main/scala/kafka/api/StopReplicaResponse.scala new file mode 100644 index 00000000000..80bed172431 --- /dev/null +++ b/core/src/main/scala/kafka/api/StopReplicaResponse.scala @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import java.nio.ByteBuffer +import kafka.utils.Utils +import collection.mutable.HashMap +import collection.mutable.Map +import kafka.common.ErrorMapping + + +object StopReplicaResponse { + def readFrom(buffer: ByteBuffer): StopReplicaResponse = { + val versionId = buffer.getShort + val errorCode = buffer.getShort + val numEntries = buffer.getInt + + val responseMap = new HashMap[(String, Int), Short]() + for (i<- 0 until numEntries){ + val topic = Utils.readShortString(buffer, "UTF-8") + val partition = buffer.getInt + val partitionErrorCode = buffer.getShort() + responseMap.put((topic, partition), partitionErrorCode) + } + new StopReplicaResponse(versionId, responseMap, errorCode) + } +} + + +case class StopReplicaResponse(val versionId: Short, + val responseMap: Map[(String, Int), Short], + val errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{ + def sizeInBytes: Int ={ + var size = 2 + 2 + 4 + for ((key, value) <- responseMap){ + size += (2 + key._1.length) + 4 + 2 + } + size + } + + def writeTo(buffer: ByteBuffer) { + buffer.putShort(versionId) + buffer.putShort(errorCode) + buffer.putInt(responseMap.size) + for ((key:(String, Int), value) <- responseMap){ + Utils.writeShortString(buffer, key._1, "UTF-8") + buffer.putInt(key._2) + buffer.putShort(value) + } + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/api/TopicMetaDataResponse.scala b/core/src/main/scala/kafka/api/TopicMetaDataResponse.scala new file mode 100644 index 00000000000..99b4a18bb09 --- /dev/null +++ b/core/src/main/scala/kafka/api/TopicMetaDataResponse.scala @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import java.nio.ByteBuffer +import kafka.common.ErrorMapping + + +object TopicMetaDataResponse { + + def readFrom(buffer: ByteBuffer): TopicMetaDataResponse = { + val errorCode = buffer.getShort + val versionId = buffer.getShort + + val topicCount = buffer.getInt + val topicsMetadata = new Array[TopicMetadata](topicCount) + for( i <- 0 until topicCount) { + topicsMetadata(i) = TopicMetadata.readFrom(buffer) + } + new TopicMetaDataResponse(versionId, topicsMetadata.toSeq, errorCode) + } +} + +case class TopicMetaDataResponse(versionId: Short, + topicsMetadata: Seq[TopicMetadata], + errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse +{ + val sizeInBytes = 2 + topicsMetadata.foldLeft(4)(_ + _.sizeInBytes) + 2 + + def writeTo(buffer: ByteBuffer) { + buffer.putShort(versionId) + /* error code */ + buffer.putShort(errorCode) + /* topic metadata */ + buffer.putInt(topicsMetadata.length) + topicsMetadata.foreach(_.writeTo(buffer)) + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala index b4b7a1218a3..7d3b8d8bd15 100644 --- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala +++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala @@ -19,16 +19,16 @@ package kafka.api import java.nio.ByteBuffer import kafka.utils.Utils._ -import kafka.network.{Send, Request} -import java.nio.channels.GatheringByteChannel -import kafka.common.ErrorMapping import collection.mutable.ListBuffer +import kafka.utils._ sealed trait DetailedMetadataRequest { def requestId: Short } case object SegmentMetadata extends DetailedMetadataRequest { val requestId = 1.asInstanceOf[Short] } case object NoSegmentMetadata extends DetailedMetadataRequest { val requestId = 0.asInstanceOf[Short] } object TopicMetadataRequest { + val CurrentVersion = 1.shortValue() + val DefaultClientId = "" /** * TopicMetadataRequest has the following format - @@ -48,6 +48,8 @@ object TopicMetadataRequest { } def readFrom(buffer: ByteBuffer): TopicMetadataRequest = { + val versionId = buffer.getShort + val clientId = Utils.readShortString(buffer) val numTopics = getIntInRange(buffer, "number of topics", (0, Int.MaxValue)) val topics = new ListBuffer[String]() for(i <- 0 until numTopics) @@ -66,37 +68,26 @@ object TopicMetadataRequest { } debug("topic = %s, detailed metadata request = %d" .format(topicsList.head, returnDetailedMetadata.requestId)) - new TopicMetadataRequest(topics.toList, returnDetailedMetadata, timestamp, count) - } - - def serializeTopicMetadata(topicMetadata: Seq[TopicMetadata]): ByteBuffer = { - val size = topicMetadata.foldLeft(4 /* num topics */)(_ + _.sizeInBytes) - val buffer = ByteBuffer.allocate(size) - debug("Allocating buffer of size %d for topic metadata response".format(size)) - /* number of topics */ - buffer.putInt(topicMetadata.size) - /* topic partition_metadata */ - topicMetadata.foreach(m => m.writeTo(buffer)) - buffer.rewind() - buffer - } - - def deserializeTopicsMetadataResponse(buffer: ByteBuffer): Seq[TopicMetadata] = { - /* number of topics */ - val numTopics = getIntInRange(buffer, "number of topics", (0, Int.MaxValue)) - val topicMetadata = new Array[TopicMetadata](numTopics) - for(i <- 0 until numTopics) - topicMetadata(i) = TopicMetadata.readFrom(buffer) - topicMetadata + new TopicMetadataRequest(versionId, clientId, topics.toList, returnDetailedMetadata, timestamp, count) } } -case class TopicMetadataRequest(val topics: Seq[String], +case class TopicMetadataRequest(val versionId: Short, + val clientId: String, + val topics: Seq[String], val detailedMetadata: DetailedMetadataRequest = NoSegmentMetadata, val timestamp: Option[Long] = None, val count: Option[Int] = None) - extends Request(RequestKeys.TopicMetadata){ + extends RequestOrResponse(Some(RequestKeys.TopicMetadata)){ + +def this(topics: Seq[String]) = + this(TopicMetadataRequest.CurrentVersion, TopicMetadataRequest.DefaultClientId, topics, NoSegmentMetadata, None, None) + + + def writeTo(buffer: ByteBuffer) { + buffer.putShort(versionId) + Utils.writeShortString(buffer, clientId) buffer.putInt(topics.size) topics.foreach(topic => writeShortString(buffer, topic)) buffer.putShort(detailedMetadata.requestId) @@ -110,7 +101,7 @@ case class TopicMetadataRequest(val topics: Seq[String], } def sizeInBytes(): Int = { - var size: Int = 4 /* number of topics */ + topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */ + + var size: Int = 2 + (2 + clientId.length) + 4 /* number of topics */ + topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */ + 2 /* detailed metadata */ detailedMetadata match { case SegmentMetadata => @@ -121,34 +112,3 @@ case class TopicMetadataRequest(val topics: Seq[String], size } } - -class TopicMetadataSend(topicsMetadata: Seq[TopicMetadata]) extends Send { - private var size: Int = topicsMetadata.foldLeft(4)(_ + _.sizeInBytes) - private val header = ByteBuffer.allocate(6) - header.putInt(size + 2) - header.putShort(ErrorMapping.NoError.asInstanceOf[Short]) - header.rewind() - - val metadata = TopicMetadataRequest.serializeTopicMetadata(topicsMetadata) - metadata.rewind() - - trace("Wrote size %d in header".format(size + 2)) - var complete: Boolean = false - - def writeTo(channel: GatheringByteChannel): Int = { - expectIncomplete() - var written = 0 - if(header.hasRemaining) - written += channel.write(header) - trace("Wrote %d bytes for header".format(written)) - - if(!header.hasRemaining && metadata.hasRemaining) - written += channel.write(metadata) - - trace("Wrote %d bytes for header and metadata".format(written)) - - if(!metadata.hasRemaining) - complete = true - written - } -} \ No newline at end of file diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala index d308b741b57..958e16566e2 100644 --- a/core/src/main/scala/kafka/common/ErrorMapping.scala +++ b/core/src/main/scala/kafka/common/ErrorMapping.scala @@ -28,36 +28,36 @@ import scala.Predef._ object ErrorMapping { val EmptyByteBuffer = ByteBuffer.allocate(0) - val UnknownCode = -1 - val NoError = 0 - val OffsetOutOfRangeCode = 1 - val InvalidMessageCode = 2 - val InvalidPartitionCode = 3 - val InvalidFetchSizeCode = 4 - val InvalidFetchRequestFormatCode = 5 - val NotLeaderForPartitionCode = 6 - val NoLeaderForPartitionCode = 7 - val UnknownTopicCode = 8 + val UnknownCode : Short = -1 + val NoError : Short = 0 + val OffsetOutOfRangeCode : Short = 1 + val InvalidMessageCode : Short = 2 + val InvalidPartitionCode : Short = 3 + val InvalidFetchSizeCode : Short = 4 + val InvalidFetchRequestFormatCode : Short = 5 + val NoLeaderForPartitionCode : Short = 6 + val NotLeaderForPartitionCode : Short = 7 + val UnknownTopicCode : Short = 8 private val exceptionToCode = - Map[Class[Throwable], Int]( + Map[Class[Throwable], Short]( classOf[OffsetOutOfRangeException].asInstanceOf[Class[Throwable]] -> OffsetOutOfRangeCode, classOf[InvalidMessageException].asInstanceOf[Class[Throwable]] -> InvalidMessageCode, classOf[InvalidPartitionException].asInstanceOf[Class[Throwable]] -> InvalidPartitionCode, classOf[InvalidMessageSizeException].asInstanceOf[Class[Throwable]] -> InvalidFetchSizeCode, classOf[FetchRequestFormatException].asInstanceOf[Class[Throwable]] -> InvalidFetchRequestFormatCode, classOf[NotLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NotLeaderForPartitionCode, - classOf[NoLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NoLeaderForPartitionCode -// classOf[UnknownTopicException].asInstanceOf[Class[Throwable]] -> UnknownTopicCode + classOf[NoLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NoLeaderForPartitionCode, + classOf[UnknownTopicException].asInstanceOf[Class[Throwable]] -> UnknownTopicCode ).withDefaultValue(UnknownCode) /* invert the mapping */ private val codeToException = - (Map[Int, Class[Throwable]]() ++ exceptionToCode.iterator.map(p => (p._2, p._1))).withDefaultValue(classOf[UnknownException]) + (Map[Short, Class[Throwable]]() ++ exceptionToCode.iterator.map(p => (p._2, p._1))).withDefaultValue(classOf[UnknownException]) - def codeFor(exception: Class[Throwable]): Int = exceptionToCode(exception) + def codeFor(exception: Class[Throwable]): Short = exceptionToCode(exception) - def maybeThrowException(code: Int) = + def maybeThrowException(code: Short) = if(code != 0) throw codeToException(code).newInstance() } diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala index cfefeb98194..ebb08c02546 100644 --- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala @@ -57,10 +57,10 @@ class SimpleConsumer( val host: String, } } - private def sendRequest(request: Request): Tuple2[Receive, Int] = { + private def sendRequest(request: RequestOrResponse): Receive = { lock synchronized { getOrMakeConnection() - var response: Tuple2[Receive,Int] = null + var response: Receive = null try { blockingChannel.send(request) response = blockingChannel.receive() @@ -92,7 +92,7 @@ class SimpleConsumer( val host: String, def fetch(request: FetchRequest): FetchResponse = { val startTime = SystemTime.nanoseconds val response = sendRequest(request) - val fetchResponse = FetchResponse.readFrom(response._1.buffer) + val fetchResponse = FetchResponse.readFrom(response.buffer) val fetchedSize = fetchResponse.sizeInBytes val endTime = SystemTime.nanoseconds @@ -112,7 +112,7 @@ class SimpleConsumer( val host: String, def getOffsetsBefore(topic: String, partition: Int, time: Long, maxNumOffsets: Int): Array[Long] = { val request = new OffsetRequest(topic, partition, time, maxNumOffsets) val response = sendRequest(request) - OffsetRequest.deserializeOffsetArray(response._1.buffer) + OffsetResponse.readFrom(response.buffer).offsets } private def getOrMakeConnection() { diff --git a/core/src/main/scala/kafka/javaapi/ProducerRequest.scala b/core/src/main/scala/kafka/javaapi/ProducerRequest.scala index 1c15dc58683..b9ef1c72d79 100644 --- a/core/src/main/scala/kafka/javaapi/ProducerRequest.scala +++ b/core/src/main/scala/kafka/javaapi/ProducerRequest.scala @@ -16,7 +16,7 @@ */ package kafka.javaapi -import kafka.network.Request +import kafka.api.RequestOrResponse import kafka.api.{RequestKeys, TopicData} import java.nio.ByteBuffer @@ -24,7 +24,7 @@ class ProducerRequest(val correlationId: Int, val clientId: String, val requiredAcks: Short, val ackTimeout: Int, - val data: Array[TopicData]) extends Request(RequestKeys.Produce) { + val data: Array[TopicData]) extends RequestOrResponse(Some(RequestKeys.Produce)) { val underlying = new kafka.api.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeout, data) diff --git a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala index fa912cc13e9..57ad7613272 100644 --- a/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala +++ b/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala @@ -22,7 +22,7 @@ import kafka.message._ class ByteBufferMessageSet(private val buffer: ByteBuffer, private val initialOffset: Long = 0L, - private val errorCode: Int = ErrorMapping.NoError) extends MessageSet { + private val errorCode: Short = ErrorMapping.NoError) extends MessageSet { val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer, initialOffset, errorCode) diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala index 3babc3507b1..47e9e867f88 100644 --- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala +++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala @@ -35,7 +35,7 @@ import kafka.common.{MessageSizeTooLargeException, InvalidMessageSizeException, */ class ByteBufferMessageSet(private val buffer: ByteBuffer, private val initialOffset: Long = 0L, - private val errorCode: Int = ErrorMapping.NoError) extends MessageSet with Logging { + private val errorCode: Short = ErrorMapping.NoError) extends MessageSet with Logging { private var shallowValidByteCount = -1L if(sizeInBytes > Int.MaxValue) throw new InvalidMessageSizeException("Message set cannot be larger than " + Int.MaxValue) diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala index addb6253335..262861f236e 100644 --- a/core/src/main/scala/kafka/network/BlockingChannel.scala +++ b/core/src/main/scala/kafka/network/BlockingChannel.scala @@ -20,6 +20,7 @@ package kafka.network import java.net.InetSocketAddress import java.nio.channels._ import kafka.utils.{nonthreadsafe, Logging} +import kafka.api.RequestOrResponse /** * A simple blocking channel with timeouts correctly enabled. @@ -70,7 +71,7 @@ class BlockingChannel( val host: String, def isConnected = connected - def send(request: Request):Int = { + def send(request: RequestOrResponse):Int = { if(!connected) throw new ClosedChannelException() @@ -78,16 +79,14 @@ class BlockingChannel( val host: String, send.writeCompletely(writeChannel) } - def receive(): Tuple2[Receive, Int] = { + def receive(): Receive = { if(!connected) throw new ClosedChannelException() val response = new BoundedByteBufferReceive() response.readCompletely(readChannel) - // this has the side effect of setting the initial position of buffer correctly - val errorCode: Int = response.buffer.getShort - (response, errorCode) + response } } \ No newline at end of file diff --git a/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala b/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala index 5e1eb5f05ae..7aaf7751aae 100644 --- a/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala +++ b/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala @@ -20,6 +20,7 @@ package kafka.network import java.nio._ import java.nio.channels._ import kafka.utils._ +import kafka.api.RequestOrResponse @nonthreadsafe private[kafka] class BoundedByteBufferSend(val buffer: ByteBuffer) extends Send { @@ -37,12 +38,18 @@ private[kafka] class BoundedByteBufferSend(val buffer: ByteBuffer) extends Send def this(size: Int) = this(ByteBuffer.allocate(size)) - def this(request: Request) = { - this(request.sizeInBytes + 2) - buffer.putShort(request.id) + def this(request: RequestOrResponse) = { + this(request.sizeInBytes + (if(request.requestId != None) 2 else 0)) + request.requestId match { + case Some(requestId) => + buffer.putShort(requestId) + case None => + } + request.writeTo(buffer) buffer.rewind() } + def writeTo(channel: GatheringByteChannel): Int = { expectIncomplete() diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala index fc7cc09c6d0..89db7233e70 100644 --- a/core/src/main/scala/kafka/producer/SyncProducer.scala +++ b/core/src/main/scala/kafka/producer/SyncProducer.scala @@ -19,7 +19,7 @@ package kafka.producer import kafka.api._ import kafka.message.MessageSet -import kafka.network.{BlockingChannel, BoundedByteBufferSend, Request, Receive} +import kafka.network.{BlockingChannel, BoundedByteBufferSend, Receive} import kafka.utils._ import java.util.Random import kafka.common.MessageSizeTooLargeException @@ -46,7 +46,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { trace("Instantiating Scala Sync Producer") - private def verifyRequest(request: Request) = { + private def verifyRequest(request: RequestOrResponse) = { /** * This seems a little convoluted, but the idea is to turn on verification simply changing log4j settings * Also, when verification is turned on, care should be taken to see that the logs don't fill up with unnecessary @@ -66,13 +66,13 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { /** * Common functionality for the public send methods */ - private def doSend(request: Request): Tuple2[Receive, Int] = { + private def doSend(request: RequestOrResponse): Receive = { lock synchronized { verifyRequest(request) val startTime = SystemTime.nanoseconds getOrMakeConnection() - var response: Tuple2[Receive, Int] = null + var response: Receive = null try { blockingChannel.send(request) response = blockingChannel.receive() @@ -108,12 +108,13 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { } } val response = doSend(producerRequest) - ProducerResponse.deserializeResponse(response._1.buffer) + ProducerResponse.readFrom(response.buffer) } def send(request: TopicMetadataRequest): Seq[TopicMetadata] = { val response = doSend(request) - TopicMetadataRequest.deserializeTopicsMetadataResponse(response._1.buffer) + val topicMetaDataResponse = TopicMetaDataResponse.readFrom(response.buffer) + topicMetaDataResponse.topicsMetadata } def close() = { diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 5f9cbd6d1a1..187106b7916 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -28,8 +28,11 @@ import kafka.network._ import kafka.utils.{SystemTime, Logging} import org.apache.log4j.Logger import scala.collection._ +import mutable.HashMap import scala.math._ import java.lang.IllegalStateException +import kafka.network.RequestChannel.Response + /** * Logic to handle the various Kafka requests @@ -50,10 +53,40 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, case RequestKeys.Fetch => handleFetchRequest(request) case RequestKeys.Offsets => handleOffsetRequest(request) case RequestKeys.TopicMetadata => handleTopicMetadataRequest(request) + case RequestKeys.LeaderAndISRRequest => handleLeaderAndISRRequest(request) + case RequestKeys.StopReplicaRequest => handleStopReplicaRequest(request) case _ => throw new IllegalStateException("No mapping found for handler id " + apiId) } } + + def handleLeaderAndISRRequest(request: RequestChannel.Request){ + val leaderAndISRRequest = LeaderAndISRRequest.readFrom(request.request.buffer) + val responseMap = new HashMap[(String, Int), Short] + + // TODO: put in actual logic later + for((key, value) <- leaderAndISRRequest.leaderAndISRInfos){ + responseMap.put(key, ErrorMapping.NoError) + } + + val leaderAndISRResponse = new LeaderAndISRResponse(leaderAndISRRequest.versionId, responseMap) + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndISRResponse), -1)) + } + + + def handleStopReplicaRequest(request: RequestChannel.Request){ + val stopReplicaRequest = StopReplicaRequest.readFrom(request.request.buffer) + val responseMap = new HashMap[(String, Int), Short] + + // TODO: put in actual logic later + for((topic, partition) <- stopReplicaRequest.stopReplicaSet){ + responseMap.put((topic, partition), ErrorMapping.NoError) + } + val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, responseMap) + requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse), -1)) + } + + /** * Handle a produce request */ @@ -65,7 +98,7 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, val response = produce(produceRequest) debug("kafka produce time " + (SystemTime.milliseconds - sTime) + " ms") - requestChannel.sendResponse(new RequestChannel.Response(request, new ProducerResponseSend(response), -1)) + requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response), -1)) // Now check any outstanding fetches this produce just unblocked var satisfied = new mutable.ArrayBuffer[DelayedFetch] @@ -77,7 +110,7 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, for(fetchReq <- satisfied) { val topicData = readMessageSets(fetchReq.fetch) val response = new FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData) - requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response, ErrorMapping.NoError), -1)) + requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response), -1)) } } @@ -115,7 +148,7 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, } } } - new ProducerResponse(ProducerResponse.CurrentVersion, request.correlationId, errors, offsets) + new ProducerResponse(request.versionId, request.correlationId, errors, offsets) } /** @@ -131,9 +164,8 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, fetchRequest.validate() } catch { case e:FetchRequestFormatException => - val response = new FetchResponse(FetchResponse.CurrentVersion, fetchRequest.correlationId, Array.empty) - val channelResponse = new RequestChannel.Response(request, new FetchResponseSend(response, - ErrorMapping.InvalidFetchRequestFormatCode), -1) + val response = new FetchResponse(fetchRequest.versionId, fetchRequest.correlationId, Array.empty) + val channelResponse = new RequestChannel.Response(request, new FetchResponseSend(response), -1) requestChannel.sendResponse(channelResponse) } @@ -147,7 +179,7 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, debug("Returning fetch response %s for fetch request with correlation id %d" .format(topicData.map(_.partitionData.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId)) val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData) - requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response, ErrorMapping.NoError), -1)) + requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response), -1)) } else { // create a list of (topic, partition) pairs to use as keys for this delayed request val keys: Seq[Any] = fetchRequest.offsetInfo.flatMap(o => o.partitions.map((o.topic, _))) @@ -240,8 +272,8 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, /** * Read from a single topic/partition at the given offset */ - private def readMessageSet(topic: String, partition: Int, offset: Long, maxSize: Int): Either[Int, MessageSet] = { - var response: Either[Int, MessageSet] = null + private def readMessageSet(topic: String, partition: Int, offset: Long, maxSize: Int): Either[Short, MessageSet] = { + var response: Either[Short, MessageSet] = null try { // check if the current broker is the leader for the partitions kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topic, partition) @@ -264,8 +296,8 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, if(requestLogger.isTraceEnabled) requestLogger.trace("Offset request " + offsetRequest.toString) val offsets = logManager.getOffsets(offsetRequest) - val response = new OffsetArraySend(offsets) - requestChannel.sendResponse(new RequestChannel.Response(request, response, -1)) + val response = new OffsetResponse(offsetRequest.versionId, offsets) + requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response), -1)) } /** @@ -303,7 +335,8 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, } } info("Sending response for topic metadata request") - requestChannel.sendResponse(new RequestChannel.Response(request, new TopicMetadataSend(topicsMetadata), -1)) + val response = new TopicMetaDataResponse(metadataRequest.versionId, topicsMetadata.toSeq) + requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response), -1)) } def close() { @@ -337,7 +370,7 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager, def expire(delayed: DelayedFetch) { val topicData = readMessageSets(delayed.fetch) val response = new FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData) - requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response, ErrorMapping.NoError), -1)) + requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response), -1)) } } } diff --git a/core/src/main/scala/kafka/server/MessageSetSend.scala b/core/src/main/scala/kafka/server/MessageSetSend.scala index e300ad038e6..079460d3e27 100644 --- a/core/src/main/scala/kafka/server/MessageSetSend.scala +++ b/core/src/main/scala/kafka/server/MessageSetSend.scala @@ -29,13 +29,13 @@ import kafka.common.ErrorMapping * wholly in kernel space */ @nonthreadsafe -private[server] class MessageSetSend(val messages: MessageSet, val errorCode: Int) extends Send { +private[server] class MessageSetSend(val messages: MessageSet, val errorCode: Short) extends Send { private var sent: Long = 0 private var size: Long = messages.sizeInBytes private val header = ByteBuffer.allocate(6) header.putInt(size.asInstanceOf[Int] + 2) - header.putShort(errorCode.asInstanceOf[Short]) + header.putShort(errorCode) header.rewind() var complete: Boolean = false diff --git a/core/src/test/scala/unit/kafka/controller/ControllerToBrokerRequestTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerToBrokerRequestTest.scala new file mode 100644 index 00000000000..673a924d804 --- /dev/null +++ b/core/src/test/scala/unit/kafka/controller/ControllerToBrokerRequestTest.scala @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package kafka.controller + + +import org.scalatest.junit.JUnit3Suite + +import junit.framework.Assert._ +import java.nio.ByteBuffer +import kafka.common.ErrorMapping +import kafka.api._ +import collection.mutable.Map +import collection.mutable.Set +import kafka.integration.KafkaServerTestHarness +import kafka.utils.TestUtils +import kafka.server.KafkaConfig +import kafka.network.{Receive, BlockingChannel} + + +class ControllerToBrokerRequestTest extends JUnit3Suite with KafkaServerTestHarness { + + val kafkaProps = TestUtils.createBrokerConfigs(1) + val configs = List(new KafkaConfig(kafkaProps.head)) + var blockingChannel: BlockingChannel = null + + override def setUp() { + super.setUp() + blockingChannel = new BlockingChannel("localhost", configs.head.port, 1000000, 0, 64*1024) + blockingChannel.connect + } + + override def tearDown() { + super.tearDown() + blockingChannel.disconnect() + } + + + def createSampleLeaderAndISRRequest() : LeaderAndISRRequest = { + val topic1 = "test1" + val topic2 = "test2" + + val leader1 = 1; + val ISR1 = List(1, 2, 3) + + val leader2 = 2; + val ISR2 = List(2, 3, 4) + + val leaderAndISR1 = new LeaderAndISR(leader1, 1, ISR1, 1) + val leaderAndISR2 = new LeaderAndISR(leader2, 1, ISR2, 2) + val map = Map(((topic1, 1), leaderAndISR1), ((topic1, 2), leaderAndISR1), + ((topic2, 1), leaderAndISR2), ((topic2, 2), leaderAndISR2)) + + new LeaderAndISRRequest(1, "client 1", 1, 4, map) + } + + def createSampleLeaderAndISRResponse() : LeaderAndISRResponse = { + val topic1 = "test1" + val topic2 = "test2" + val responseMap = Map(((topic1, 1), ErrorMapping.NoError), ((topic1, 2), ErrorMapping.NoError), + ((topic2, 1), ErrorMapping.NoError), ((topic2, 2), ErrorMapping.NoError)) + + new LeaderAndISRResponse(1, responseMap) + } + + + def createSampleStopReplicaRequest() : StopReplicaRequest = { + val topic1 = "test1" + val topic2 = "test2" + new StopReplicaRequest(1, "client 1", 1000, Set((topic1, 1), (topic1, 2), + (topic2, 1), (topic2, 2))) + } + + def createSampleStopReplicaResponse() : StopReplicaResponse = { + val topic1 = "test1" + val topic2 = "test2" + val responseMap = Map(((topic1, 1), ErrorMapping.NoError), ((topic1, 2), ErrorMapping.NoError), + ((topic2, 1), ErrorMapping.NoError), ((topic2, 2), ErrorMapping.NoError)) + + new StopReplicaResponse(1, responseMap) + } + + + def testLeaderAndISRRequest { + val leaderAndISRRequest = createSampleLeaderAndISRRequest() + + val serializedLeaderAndISRRequest = ByteBuffer.allocate(leaderAndISRRequest.sizeInBytes) + leaderAndISRRequest.writeTo(serializedLeaderAndISRRequest) + serializedLeaderAndISRRequest.rewind() + val deserializedLeaderAndISRRequest = LeaderAndISRRequest.readFrom(serializedLeaderAndISRRequest) + + assertEquals(leaderAndISRRequest, deserializedLeaderAndISRRequest) + } + + def testLeaderAndISRResponse { + val leaderAndISRResponse = createSampleLeaderAndISRResponse() + + val serializedLeaderAndISRResponse = ByteBuffer.allocate(leaderAndISRResponse.sizeInBytes) + leaderAndISRResponse.writeTo(serializedLeaderAndISRResponse) + serializedLeaderAndISRResponse.rewind() + val deserializedLeaderAndISRResponse = LeaderAndISRResponse.readFrom(serializedLeaderAndISRResponse) + assertEquals(leaderAndISRResponse, deserializedLeaderAndISRResponse) + } + + + def testStopReplicaRequest { + val stopReplicaRequest = createSampleStopReplicaRequest() + + val serializedStopReplicaRequest = ByteBuffer.allocate(stopReplicaRequest.sizeInBytes) + stopReplicaRequest.writeTo(serializedStopReplicaRequest) + serializedStopReplicaRequest.rewind() + val deserializedStopReplicaRequest = StopReplicaRequest.readFrom(serializedStopReplicaRequest) + assertEquals(stopReplicaRequest, deserializedStopReplicaRequest) + } + + + def testStopReplicaResponse { + val stopReplicaResponse = createSampleStopReplicaResponse() + + val serializedStopReplicaResponse = ByteBuffer.allocate(stopReplicaResponse.sizeInBytes) + stopReplicaResponse.writeTo(serializedStopReplicaResponse) + serializedStopReplicaResponse.rewind() + val deserializedStopReplicaResponse = StopReplicaResponse.readFrom(serializedStopReplicaResponse) + assertEquals(stopReplicaResponse, deserializedStopReplicaResponse) + } + + + + def testEndToEndLeaderAndISRRequest { + + val leaderAndISRRequest = createSampleLeaderAndISRRequest() + + var response: Receive = null + blockingChannel.send(leaderAndISRRequest) + response = blockingChannel.receive() + + val leaderAndISRResponse = LeaderAndISRResponse.readFrom(response.buffer) + val expectedLeaderAndISRResponse = createSampleLeaderAndISRResponse() + + assertEquals(leaderAndISRResponse, expectedLeaderAndISRResponse) + + } + + + + def testEndToEndStopReplicaRequest { + val stopReplicaRequest = createSampleStopReplicaRequest() + + var response: Receive = null + blockingChannel.send(stopReplicaRequest) + response = blockingChannel.receive() + + val stopReplicaResponse = StopReplicaResponse.readFrom(response.buffer) + val expectedStopReplicaResponse = createSampleStopReplicaResponse() + assertEquals(stopReplicaResponse, expectedStopReplicaResponse) + + } + +} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala index 893637a50a3..b26cf25c5f0 100644 --- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala @@ -25,12 +25,13 @@ import kafka.log.LogManager import junit.framework.Assert._ import org.easymock.EasyMock import kafka.network._ -import kafka.api.{TopicMetadataSend, TopicMetadataRequest} +import kafka.api.{TopicMetaDataResponse, TopicMetadataRequest} import kafka.cluster.Broker import kafka.utils.TestUtils import kafka.utils.TestUtils._ import kafka.server.{ReplicaManager, KafkaZooKeeper, KafkaApis, KafkaConfig} + class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { val props = createBrokerConfigs(1) val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1}) @@ -104,10 +105,10 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness { // call the API (to be tested) to get metadata apis.handleTopicMetadataRequest(new RequestChannel.Request(processor=0, requestKey=5, request=receivedRequest, start=1)) - val metadataResponse = requestChannel.receiveResponse(0).response.asInstanceOf[TopicMetadataSend].metadata + val metadataResponse = requestChannel.receiveResponse(0).response.asInstanceOf[BoundedByteBufferSend].buffer // check assertions - val topicMetadata = TopicMetadataRequest.deserializeTopicsMetadataResponse(metadataResponse) + val topicMetadata = TopicMetaDataResponse.readFrom(metadataResponse).topicsMetadata assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size) assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic) val partitionMetadata = topicMetadata.head.partitionsMetadata diff --git a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala index 684029f23a2..26601ebdd47 100644 --- a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala +++ b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala @@ -121,7 +121,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases { buffer.put(emptyMessageSet.getSerialized()) buffer.put(regularMessgeSet.getSerialized()) buffer.rewind - val mixedMessageSet = new ByteBufferMessageSet(buffer, 0, 0) + val mixedMessageSet = new ByteBufferMessageSet(buffer) TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator)) //make sure ByteBufferMessageSet is re-iterable. TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator)) @@ -142,7 +142,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases { buffer.put(emptyMessageSet.getSerialized()) buffer.put(regularMessgeSet.getSerialized()) buffer.rewind - val mixedMessageSet = new ByteBufferMessageSet(buffer, 0, 0) + val mixedMessageSet = new ByteBufferMessageSet(buffer) TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator)) //make sure ByteBufferMessageSet is re-iterable. TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator)) diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index cfc9384b2ec..13a949a4cb3 100644 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -382,9 +382,9 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness { mockSyncProducer.send(new TopicMetadataRequest(List(topic))) EasyMock.expectLastCall().andReturn(List(topic1Metadata)) mockSyncProducer.send(TestUtils.produceRequest(topic, 0, messagesToSet(msgs.take(5)))) - EasyMock.expectLastCall().andReturn(new ProducerResponse(ProducerResponse.CurrentVersion, 0, Array(0.toShort), Array(0L))) + EasyMock.expectLastCall().andReturn(new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(0.toShort), Array(0L))) mockSyncProducer.send(TestUtils.produceRequest(topic, 0, messagesToSet(msgs.takeRight(5)))) - EasyMock.expectLastCall().andReturn(new ProducerResponse(ProducerResponse.CurrentVersion, 0, Array(0.toShort), Array(0L))) + EasyMock.expectLastCall().andReturn(new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(0.toShort), Array(0L))) EasyMock.replay(mockSyncProducer) val producerPool = EasyMock.createMock(classOf[ProducerPool]) @@ -442,9 +442,9 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness { // On the third try for partition 0, let it succeed. val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), 0) val response1 = - new ProducerResponse(ProducerResponse.CurrentVersion, 0, Array(ErrorMapping.NotLeaderForPartitionCode.toShort, 0.toShort), Array(0L, 0L)) + new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(ErrorMapping.NotLeaderForPartitionCode.toShort, 0.toShort), Array(0L, 0L)) val request2 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs)) - val response2 = new ProducerResponse(ProducerResponse.CurrentVersion, 0, Array(0.toShort), Array(0L)) + val response2 = new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(0.toShort), Array(0L)) val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer]) EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate SocketTimeoutException EasyMock.expect(mockSyncProducer.send(request1)).andReturn(response1) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 306fa075ead..95f2648fe6d 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -31,7 +31,6 @@ import org.I0Itec.zkclient.ZkClient import kafka.cluster.Broker import collection.mutable.ListBuffer import kafka.consumer.ConsumerConfig -import scala.collection.Map import kafka.api.{TopicData, PartitionData} import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.TimeUnit