Add admin RPC requests; clean up Response objects; patched by Yang Ye; reviewed by Jun Rao; KAFKA-349; KAFKA-336

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1346697 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jun Rao 2012-06-06 01:28:46 +00:00
parent c4a913e3c4
commit f99177766a
30 changed files with 829 additions and 317 deletions

View File

@ -255,7 +255,7 @@ public class KafkaETLContext {
*/
protected boolean hasError(ByteBufferMessageSet messages)
throws IOException {
int errorCode = messages.getErrorCode();
short errorCode = messages.getErrorCode();
if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) {
/* offset cannot cross the maximum offset (guaranteed by Kafka protocol).
Kafka server may delete old files from time to time */

View File

@ -18,7 +18,6 @@
package kafka.api
import java.nio.ByteBuffer
import kafka.network.Request
import kafka.utils.Utils
import scala.collection.mutable.{HashMap, Buffer, ListBuffer}
import kafka.common.FetchRequestFormatException
@ -105,7 +104,7 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
replicaId: Int = FetchRequest.DefaultReplicaId,
maxWait: Int = FetchRequest.DefaultMaxWait,
minBytes: Int = FetchRequest.DefaultMinBytes,
offsetInfo: Seq[OffsetDetail] ) extends Request(RequestKeys.Fetch) {
offsetInfo: Seq[OffsetDetail] ) extends RequestOrResponse(Some(RequestKeys.Fetch)) {
// ensure that a topic "X" appears in at most one OffsetDetail
def validate() {

View File

@ -26,8 +26,8 @@ import kafka.utils.Utils
object PartitionData {
def readFrom(buffer: ByteBuffer): PartitionData = {
val error = buffer.getShort
val partition = buffer.getInt
val error = buffer.getInt
val initialOffset = buffer.getLong
val hw = buffer.getLong()
val messageSetSize = buffer.getInt
@ -38,21 +38,48 @@ object PartitionData {
}
}
case class PartitionData(partition: Int, error: Int = ErrorMapping.NoError, initialOffset:Long = 0L, hw: Long = -1L,
messages: MessageSet) {
val sizeInBytes = 4 + 4 + 8 + 4 + messages.sizeInBytes.intValue() + 8
case class PartitionData(partition: Int, error: Short = ErrorMapping.NoError, initialOffset:Long = 0L, hw: Long = -1L, messages: MessageSet) {
val sizeInBytes = 4 + 2 + 8 + 4 + messages.sizeInBytes.intValue() + 8
def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, -1L, messages)
}
object TopicData {
// SENDS
class PartitionDataSend(val partitionData: PartitionData) extends Send {
private val messageSize = partitionData.messages.sizeInBytes
private var messagesSentSize = 0L
private val buffer = ByteBuffer.allocate(26)
buffer.putShort(partitionData.error)
buffer.putInt(partitionData.partition)
buffer.putLong(partitionData.initialOffset)
buffer.putLong(partitionData.hw)
buffer.putInt(partitionData.messages.sizeInBytes.intValue())
buffer.rewind()
def complete = !buffer.hasRemaining && messagesSentSize >= messageSize
def writeTo(channel: GatheringByteChannel): Int = {
var written = 0
if(buffer.hasRemaining)
written += channel.write(buffer)
if(!buffer.hasRemaining && messagesSentSize < messageSize) {
val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize - messagesSentSize).toInt
messagesSentSize += bytesSent
written += bytesSent
}
written
}
}
object TopicData {
def readFrom(buffer: ByteBuffer): TopicData = {
val topic = Utils.readShortString(buffer, "UTF-8")
val partitionCount = buffer.getInt
val partitions = new Array[PartitionData](partitionCount)
for(i <- 0 until partitions.length)
for(i <- 0 until partitionCount)
partitions(i) = PartitionData.readFrom(buffer)
new TopicData(topic, partitions.sortBy(_.partition))
}
@ -90,74 +117,6 @@ case class TopicData(topic: String, partitionData: Array[PartitionData]) {
}
}
object FetchResponse {
val CurrentVersion = 1.shortValue()
def readFrom(buffer: ByteBuffer): FetchResponse = {
val versionId = buffer.getShort
val correlationId = buffer.getInt
val dataCount = buffer.getInt
val data = new Array[TopicData](dataCount)
for(i <- 0 until data.length)
data(i) = TopicData.readFrom(buffer)
new FetchResponse(versionId, correlationId, data)
}
}
case class FetchResponse(versionId: Short, correlationId: Int, data: Array[TopicData]) {
val sizeInBytes = 2 + 4 + data.foldLeft(4)(_ + _.sizeInBytes)
lazy val topicMap = data.groupBy(_.topic).mapValues(_.head)
def messageSet(topic: String, partition: Int): ByteBufferMessageSet = {
val messageSet = topicMap.get(topic) match {
case Some(topicData) =>
TopicData.findPartition(topicData.partitionData, partition).map(_.messages).getOrElse(MessageSet.Empty)
case None =>
MessageSet.Empty
}
messageSet.asInstanceOf[ByteBufferMessageSet]
}
def highWatermark(topic: String, partition: Int): Long = {
topicMap.get(topic) match {
case Some(topicData) =>
TopicData.findPartition(topicData.partitionData, partition).map(_.hw).getOrElse(-1L)
case None => -1L
}
}
}
// SENDS
class PartitionDataSend(val partitionData: PartitionData) extends Send {
private val messageSize = partitionData.messages.sizeInBytes
private var messagesSentSize = 0L
private val buffer = ByteBuffer.allocate(28)
buffer.putInt(partitionData.partition)
buffer.putInt(partitionData.error)
buffer.putLong(partitionData.initialOffset)
buffer.putLong(partitionData.hw)
buffer.putInt(partitionData.messages.sizeInBytes.intValue())
buffer.rewind()
def complete = !buffer.hasRemaining && messagesSentSize >= messageSize
def writeTo(channel: GatheringByteChannel): Int = {
var written = 0
if(buffer.hasRemaining)
written += channel.write(buffer)
if(!buffer.hasRemaining && messagesSentSize < messageSize) {
val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize - messagesSentSize).toInt
messagesSentSize += bytesSent
written += bytesSent
}
written
}
}
class TopicDataSend(val topicData: TopicData) extends Send {
val size = topicData.sizeInBytes
@ -187,17 +146,60 @@ class TopicDataSend(val topicData: TopicData) extends Send {
}
}
class FetchResponseSend(val fetchResponse: FetchResponse,
val errorCode: Int = ErrorMapping.NoError) extends Send {
object FetchResponse {
def readFrom(buffer: ByteBuffer): FetchResponse = {
val versionId = buffer.getShort
val errorCode = buffer.getShort
val correlationId = buffer.getInt
val dataCount = buffer.getInt
val data = new Array[TopicData](dataCount)
for(i <- 0 until data.length)
data(i) = TopicData.readFrom(buffer)
new FetchResponse(versionId, correlationId, data, errorCode)
}
}
case class FetchResponse(versionId: Short,
correlationId: Int,
data: Array[TopicData],
errorCode: Short = ErrorMapping.NoError) {
val sizeInBytes = 2 + 4 + 2 + data.foldLeft(4)(_ + _.sizeInBytes)
lazy val topicMap = data.groupBy(_.topic).mapValues(_.head)
def messageSet(topic: String, partition: Int): ByteBufferMessageSet = {
val messageSet = topicMap.get(topic) match {
case Some(topicData) =>
TopicData.findPartition(topicData.partitionData, partition).map(_.messages).getOrElse(MessageSet.Empty)
case None =>
MessageSet.Empty
}
messageSet.asInstanceOf[ByteBufferMessageSet]
}
def highWatermark(topic: String, partition: Int): Long = {
topicMap.get(topic) match {
case Some(topicData) =>
TopicData.findPartition(topicData.partitionData, partition).map(_.hw).getOrElse(-1L)
case None => -1L
}
}
}
class FetchResponseSend(val fetchResponse: FetchResponse) extends Send {
private val size = fetchResponse.sizeInBytes
private var sent = 0
private val buffer = ByteBuffer.allocate(16)
buffer.putInt(size + 2)
buffer.putShort(errorCode.shortValue())
buffer.putInt(size)
buffer.putShort(fetchResponse.versionId)
buffer.putShort(fetchResponse.errorCode)
buffer.putInt(fetchResponse.correlationId)
buffer.putInt(fetchResponse.data.length)
buffer.rewind()
@ -220,6 +222,5 @@ class FetchResponseSend(val fetchResponse: FetchResponse,
written
}
def sendSize = 4 + 2 + fetchResponse.sizeInBytes
def sendSize = 4 + fetchResponse.sizeInBytes
}

View File

@ -0,0 +1,107 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api
import java.nio._
import kafka.utils._
import collection.mutable.Map
import collection.mutable.HashMap
object LeaderAndISR {
def readFrom(buffer: ByteBuffer): LeaderAndISR = {
val leader = buffer.getInt
val leaderGenId = buffer.getInt
val ISRString = Utils.readShortString(buffer, "UTF-8")
val ISR = ISRString.split(",").map(_.toInt).toList
val zkVersion = buffer.getLong
new LeaderAndISR(leader, leaderGenId, ISR, zkVersion)
}
}
case class LeaderAndISR(leader: Int, leaderEpoc: Int, ISR: List[Int], zkVersion: Long){
def writeTo(buffer: ByteBuffer) {
buffer.putInt(leader)
buffer.putInt(leaderEpoc)
Utils.writeShortString(buffer, ISR.mkString(","), "UTF-8")
buffer.putLong(zkVersion)
}
def sizeInBytes(): Int = {
val size = 4 + 4 + (2 + ISR.mkString(",").length) + 8
size
}
}
object LeaderAndISRRequest {
val CurrentVersion = 1.shortValue()
val DefaultClientId = ""
def readFrom(buffer: ByteBuffer): LeaderAndISRRequest = {
val versionId = buffer.getShort
val clientId = Utils.readShortString(buffer)
val isInit = buffer.get()
val ackTimeout = buffer.getInt
val leaderAndISRRequestCount = buffer.getInt
val leaderAndISRInfos = new HashMap[(String, Int), LeaderAndISR]
for(i <- 0 until leaderAndISRRequestCount){
val topic = Utils.readShortString(buffer, "UTF-8")
val partition = buffer.getInt
val leaderAndISRRequest = LeaderAndISR.readFrom(buffer)
leaderAndISRInfos.put((topic, partition), leaderAndISRRequest)
}
new LeaderAndISRRequest(versionId, clientId, isInit, ackTimeout, leaderAndISRInfos)
}
}
case class LeaderAndISRRequest (versionId: Short,
clientId: String,
isInit: Byte,
ackTimeout: Int,
leaderAndISRInfos:
Map[(String, Int), LeaderAndISR])
extends RequestOrResponse(Some(RequestKeys.LeaderAndISRRequest)) {
def this(isInit: Byte, ackTimeout: Int, leaderAndISRInfos: Map[(String, Int), LeaderAndISR]) = {
this(LeaderAndISRRequest.CurrentVersion, LeaderAndISRRequest.DefaultClientId, isInit, ackTimeout, leaderAndISRInfos)
}
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
Utils.writeShortString(buffer, clientId)
buffer.put(isInit)
buffer.putInt(ackTimeout)
buffer.putInt(leaderAndISRInfos.size)
for((key, value) <- leaderAndISRInfos){
Utils.writeShortString(buffer, key._1, "UTF-8")
buffer.putInt(key._2)
value.writeTo(buffer)
}
}
def sizeInBytes(): Int = {
var size = 1 + 2 + (2 + clientId.length) + 4 + 4
for((key, value) <- leaderAndISRInfos)
size += (2 + key._1.length) + 4 + value.sizeInBytes
size
}
}

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api
import kafka.common.ErrorMapping
import java.nio.ByteBuffer
import kafka.utils.Utils
import collection.mutable.HashMap
import collection.mutable.Map
object LeaderAndISRResponse {
def readFrom(buffer: ByteBuffer): LeaderAndISRResponse = {
val versionId = buffer.getShort
val errorCode = buffer.getShort
val numEntries = buffer.getInt
val responseMap = new HashMap[(String, Int), Short]()
for (i<- 0 until numEntries){
val topic = Utils.readShortString(buffer, "UTF-8")
val partition = buffer.getInt
val partitionErrorCode = buffer.getShort
responseMap.put((topic, partition), partitionErrorCode)
}
new LeaderAndISRResponse(versionId, responseMap, errorCode)
}
}
case class LeaderAndISRResponse(versionId: Short,
responseMap: Map[(String, Int), Short],
errorCode: Short = ErrorMapping.NoError)
extends RequestOrResponse{
def sizeInBytes(): Int ={
var size = 2 + 2 + 4
for ((key, value) <- responseMap){
size += 2 + key._1.length + 4 + 2
}
size
}
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
buffer.putShort(errorCode)
buffer.putInt(responseMap.size)
for ((key:(String, Int), value) <- responseMap){
Utils.writeShortString(buffer, key._1, "UTF-8")
buffer.putInt(key._2)
buffer.putShort(value)
}
}
}

View File

@ -18,83 +18,50 @@
package kafka.api
import java.nio.ByteBuffer
import kafka.utils.{nonthreadsafe, Utils}
import kafka.network.{Send, Request}
import java.nio.channels.GatheringByteChannel
import kafka.common.ErrorMapping
import kafka.utils.Utils
object OffsetRequest {
val CurrentVersion = 1.shortValue()
val DefaultClientId = ""
val SmallestTimeString = "smallest"
val LargestTimeString = "largest"
val LatestTime = -1L
val EarliestTime = -2L
def readFrom(buffer: ByteBuffer): OffsetRequest = {
val versionId = buffer.getShort
val clientId = Utils.readShortString(buffer)
val topic = Utils.readShortString(buffer, "UTF-8")
val partition = buffer.getInt()
val offset = buffer.getLong
val maxNumOffsets = buffer.getInt
new OffsetRequest(topic, partition, offset, maxNumOffsets)
}
def serializeOffsetArray(offsets: Array[Long]): ByteBuffer = {
val size = 4 + 8 * offsets.length
val buffer = ByteBuffer.allocate(size)
buffer.putInt(offsets.length)
for (i <- 0 until offsets.length)
buffer.putLong(offsets(i))
buffer.rewind
buffer
}
def deserializeOffsetArray(buffer: ByteBuffer): Array[Long] = {
val size = buffer.getInt
val offsets = new Array[Long](size)
for (i <- 0 until offsets.length)
offsets(i) = buffer.getLong
offsets
new OffsetRequest(versionId, clientId, topic, partition, offset, maxNumOffsets)
}
}
class OffsetRequest(val topic: String,
val partition: Int,
val time: Long,
val maxNumOffsets: Int) extends Request(RequestKeys.Offsets) {
case class OffsetRequest(versionId: Short = OffsetRequest.CurrentVersion,
clientId: String = OffsetRequest.DefaultClientId,
topic: String,
partition: Int,
time: Long,
maxNumOffsets: Int) extends RequestOrResponse(Some(RequestKeys.Offsets)) {
def this(topic: String, partition: Int, time: Long, maxNumOffsets: Int) =
this(OffsetRequest.CurrentVersion, OffsetRequest.DefaultClientId, topic, partition, time, maxNumOffsets)
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
Utils.writeShortString(buffer, clientId)
Utils.writeShortString(buffer, topic)
buffer.putInt(partition)
buffer.putLong(time)
buffer.putInt(maxNumOffsets)
}
def sizeInBytes(): Int = 2 + topic.length + 4 + 8 + 4
def sizeInBytes(): Int = 2 + (2 + clientId.length()) + (2 + topic.length) + 4 + 8 + 4
override def toString(): String= "OffsetRequest(topic:" + topic + ", part:" + partition + ", time:" + time +
", maxNumOffsets:" + maxNumOffsets + ")"
}
@nonthreadsafe
private[kafka] class OffsetArraySend(offsets: Array[Long]) extends Send {
private var size: Long = offsets.foldLeft(4)((sum, _) => sum + 8)
private val header = ByteBuffer.allocate(6)
header.putInt(size.asInstanceOf[Int] + 2)
header.putShort(ErrorMapping.NoError.asInstanceOf[Short])
header.rewind()
private val contentBuffer = OffsetRequest.serializeOffsetArray(offsets)
var complete: Boolean = false
def writeTo(channel: GatheringByteChannel): Int = {
expectIncomplete()
var written = 0
if(header.hasRemaining)
written += channel.write(header)
if(!header.hasRemaining && contentBuffer.hasRemaining)
written += channel.write(contentBuffer)
if(!contentBuffer.hasRemaining)
complete = true
written
}
override def toString(): String= "OffsetRequest(version:" + versionId + ", client id:" + clientId +
", topic:" + topic + ", part:" + partition + ", time:" + time + ", maxNumOffsets:" + maxNumOffsets + ")"
}

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api
import java.nio.ByteBuffer
import kafka.common.ErrorMapping
object OffsetResponse {
def readFrom(buffer: ByteBuffer): OffsetResponse = {
val versionId = buffer.getShort
val errorCode = buffer.getShort
val offsetsSize = buffer.getInt
val offsets = new Array[Long](offsetsSize)
for( i <- 0 until offsetsSize) {
offsets(i) = buffer.getLong
}
new OffsetResponse(versionId, offsets, errorCode)
}
}
case class OffsetResponse(versionId: Short,
offsets: Array[Long],
errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{
val sizeInBytes = 2 + 2 + offsets.foldLeft(4)((sum, _) => sum + 8)
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
/* error code */
buffer.putShort(errorCode)
buffer.putInt(offsets.length)
offsets.foreach(buffer.putLong(_))
}
}

View File

@ -19,7 +19,6 @@ package kafka.api
import java.nio._
import kafka.message._
import kafka.network._
import kafka.utils._
object ProducerRequest {
@ -58,7 +57,7 @@ case class ProducerRequest( versionId: Short,
clientId: String,
requiredAcks: Short,
ackTimeout: Int,
data: Array[TopicData] ) extends Request(RequestKeys.Produce) {
data: Array[TopicData] ) extends RequestOrResponse(Some(RequestKeys.Produce)) {
def this(correlationId: Int, clientId: String, requiredAcks: Short, ackTimeout: Int, data: Array[TopicData]) =
this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeout, data)

View File

@ -18,16 +18,14 @@
package kafka.api
import java.nio.ByteBuffer
import java.nio.channels.GatheringByteChannel
import kafka.common.ErrorMapping
import kafka.network.Send
object ProducerResponse {
val CurrentVersion = 1.shortValue()
def readFrom(buffer: ByteBuffer): ProducerResponse = {
val versionId = buffer.getShort
val correlationId = buffer.getInt
val errorCode = buffer.getShort
val errorsSize = buffer.getInt
val errors = new Array[Short](errorsSize)
for( i <- 0 until errorsSize) {
@ -38,28 +36,21 @@ object ProducerResponse {
for( i <- 0 until offsetsSize) {
offsets(i) = buffer.getLong
}
new ProducerResponse(versionId, correlationId, errors, offsets)
new ProducerResponse(versionId, correlationId, errors, offsets, errorCode)
}
def serializeResponse(producerResponse: ProducerResponse): ByteBuffer = {
val buffer = ByteBuffer.allocate(producerResponse.sizeInBytes)
producerResponse.writeTo(buffer)
buffer.rewind()
buffer
}
def deserializeResponse(buffer: ByteBuffer): ProducerResponse = readFrom(buffer)
}
case class ProducerResponse(versionId: Short, correlationId: Int, errors: Array[Short], offsets: Array[Long]) {
val sizeInBytes = 2 + 4 + (4 + 2 * errors.length) + (4 + 8 * offsets.length)
case class ProducerResponse(versionId: Short, correlationId: Int, errors: Array[Short],
offsets: Array[Long], errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{
val sizeInBytes = 2 + 2 + 4 + (4 + 2 * errors.length) + (4 + 8 * offsets.length)
def writeTo(buffer: ByteBuffer) {
/* version */
/* version id */
buffer.putShort(versionId)
/* correlation id */
buffer.putInt(correlationId)
/* error code */
buffer.putShort(errorCode)
/* errors */
buffer.putInt(errors.length)
errors.foreach(buffer.putShort(_))
@ -68,34 +59,3 @@ case class ProducerResponse(versionId: Short, correlationId: Int, errors: Array[
offsets.foreach(buffer.putLong(_))
}
}
class ProducerResponseSend(val producerResponse: ProducerResponse,
val error: Int = ErrorMapping.NoError) extends Send {
private val header = ByteBuffer.allocate(6)
header.putInt(producerResponse.sizeInBytes + 2)
header.putShort(error.toShort)
header.rewind()
val responseContent = ProducerResponse.serializeResponse(producerResponse)
var complete = false
def writeTo(channel: GatheringByteChannel):Int = {
expectIncomplete()
var written = 0
if(header.hasRemaining)
written += channel.write(header)
trace("Wrote %d bytes for header".format(written))
if(!header.hasRemaining && responseContent.hasRemaining)
written += channel.write(responseContent)
trace("Wrote %d bytes for header, errors and offsets".format(written))
if(!header.hasRemaining && !responseContent.hasRemaining)
complete = true
written
}
}

View File

@ -22,4 +22,6 @@ object RequestKeys {
val Fetch: Short = 1
val Offsets: Short = 2
val TopicMetadata: Short = 3
val LeaderAndISRRequest: Short = 4
val StopReplicaRequest: Short = 5
}

View File

@ -1,3 +1,5 @@
package kafka.api
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@ -15,11 +17,9 @@
* limitations under the License.
*/
package kafka.network
import java.nio._
private[kafka] abstract class Request(val id: Short) {
private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None) {
def sizeInBytes: Int

View File

@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api
import java.nio._
import kafka.utils._
import collection.mutable.HashSet
import collection.mutable.Set
object StopReplicaRequest {
val CurrentVersion = 1.shortValue()
val DefaultClientId = ""
def readFrom(buffer: ByteBuffer): StopReplicaRequest = {
val versionId = buffer.getShort
val clientId = Utils.readShortString(buffer)
val ackTimeout = buffer.getInt
val topicPartitionPairCount = buffer.getInt
val topicPartitionPairSet = new HashSet[(String, Int)]()
for (i <- 0 until topicPartitionPairCount){
topicPartitionPairSet.add((Utils.readShortString(buffer, "UTF-8"), buffer.getInt))
}
new StopReplicaRequest(versionId, clientId, ackTimeout, topicPartitionPairSet)
}
}
case class StopReplicaRequest(versionId: Short,
clientId: String,
ackTimeout: Int,
stopReplicaSet: Set[(String, Int)]
) extends RequestOrResponse(Some(RequestKeys.StopReplicaRequest)) {
def this(ackTimeout: Int, stopReplicaSet: Set[(String, Int)]) = {
this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, ackTimeout, stopReplicaSet)
}
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
Utils.writeShortString(buffer, clientId)
buffer.putInt(ackTimeout)
buffer.putInt(stopReplicaSet.size)
for ((topic, partitionId) <- stopReplicaSet){
Utils.writeShortString(buffer, topic, "UTF-8")
buffer.putInt(partitionId)
}
}
def sizeInBytes(): Int = {
var size = 2 + (2 + clientId.length()) + 4 + 4
for ((topic, partitionId) <- stopReplicaSet){
size += (2 + topic.length()) + 4
}
size
}
}

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api
import java.nio.ByteBuffer
import kafka.utils.Utils
import collection.mutable.HashMap
import collection.mutable.Map
import kafka.common.ErrorMapping
object StopReplicaResponse {
def readFrom(buffer: ByteBuffer): StopReplicaResponse = {
val versionId = buffer.getShort
val errorCode = buffer.getShort
val numEntries = buffer.getInt
val responseMap = new HashMap[(String, Int), Short]()
for (i<- 0 until numEntries){
val topic = Utils.readShortString(buffer, "UTF-8")
val partition = buffer.getInt
val partitionErrorCode = buffer.getShort()
responseMap.put((topic, partition), partitionErrorCode)
}
new StopReplicaResponse(versionId, responseMap, errorCode)
}
}
case class StopReplicaResponse(val versionId: Short,
val responseMap: Map[(String, Int), Short],
val errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{
def sizeInBytes: Int ={
var size = 2 + 2 + 4
for ((key, value) <- responseMap){
size += (2 + key._1.length) + 4 + 2
}
size
}
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
buffer.putShort(errorCode)
buffer.putInt(responseMap.size)
for ((key:(String, Int), value) <- responseMap){
Utils.writeShortString(buffer, key._1, "UTF-8")
buffer.putInt(key._2)
buffer.putShort(value)
}
}
}

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api
import java.nio.ByteBuffer
import kafka.common.ErrorMapping
object TopicMetaDataResponse {
def readFrom(buffer: ByteBuffer): TopicMetaDataResponse = {
val errorCode = buffer.getShort
val versionId = buffer.getShort
val topicCount = buffer.getInt
val topicsMetadata = new Array[TopicMetadata](topicCount)
for( i <- 0 until topicCount) {
topicsMetadata(i) = TopicMetadata.readFrom(buffer)
}
new TopicMetaDataResponse(versionId, topicsMetadata.toSeq, errorCode)
}
}
case class TopicMetaDataResponse(versionId: Short,
topicsMetadata: Seq[TopicMetadata],
errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse
{
val sizeInBytes = 2 + topicsMetadata.foldLeft(4)(_ + _.sizeInBytes) + 2
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
/* error code */
buffer.putShort(errorCode)
/* topic metadata */
buffer.putInt(topicsMetadata.length)
topicsMetadata.foreach(_.writeTo(buffer))
}
}

View File

@ -19,16 +19,16 @@ package kafka.api
import java.nio.ByteBuffer
import kafka.utils.Utils._
import kafka.network.{Send, Request}
import java.nio.channels.GatheringByteChannel
import kafka.common.ErrorMapping
import collection.mutable.ListBuffer
import kafka.utils._
sealed trait DetailedMetadataRequest { def requestId: Short }
case object SegmentMetadata extends DetailedMetadataRequest { val requestId = 1.asInstanceOf[Short] }
case object NoSegmentMetadata extends DetailedMetadataRequest { val requestId = 0.asInstanceOf[Short] }
object TopicMetadataRequest {
val CurrentVersion = 1.shortValue()
val DefaultClientId = ""
/**
* TopicMetadataRequest has the following format -
@ -48,6 +48,8 @@ object TopicMetadataRequest {
}
def readFrom(buffer: ByteBuffer): TopicMetadataRequest = {
val versionId = buffer.getShort
val clientId = Utils.readShortString(buffer)
val numTopics = getIntInRange(buffer, "number of topics", (0, Int.MaxValue))
val topics = new ListBuffer[String]()
for(i <- 0 until numTopics)
@ -66,37 +68,26 @@ object TopicMetadataRequest {
}
debug("topic = %s, detailed metadata request = %d"
.format(topicsList.head, returnDetailedMetadata.requestId))
new TopicMetadataRequest(topics.toList, returnDetailedMetadata, timestamp, count)
}
def serializeTopicMetadata(topicMetadata: Seq[TopicMetadata]): ByteBuffer = {
val size = topicMetadata.foldLeft(4 /* num topics */)(_ + _.sizeInBytes)
val buffer = ByteBuffer.allocate(size)
debug("Allocating buffer of size %d for topic metadata response".format(size))
/* number of topics */
buffer.putInt(topicMetadata.size)
/* topic partition_metadata */
topicMetadata.foreach(m => m.writeTo(buffer))
buffer.rewind()
buffer
}
def deserializeTopicsMetadataResponse(buffer: ByteBuffer): Seq[TopicMetadata] = {
/* number of topics */
val numTopics = getIntInRange(buffer, "number of topics", (0, Int.MaxValue))
val topicMetadata = new Array[TopicMetadata](numTopics)
for(i <- 0 until numTopics)
topicMetadata(i) = TopicMetadata.readFrom(buffer)
topicMetadata
new TopicMetadataRequest(versionId, clientId, topics.toList, returnDetailedMetadata, timestamp, count)
}
}
case class TopicMetadataRequest(val topics: Seq[String],
case class TopicMetadataRequest(val versionId: Short,
val clientId: String,
val topics: Seq[String],
val detailedMetadata: DetailedMetadataRequest = NoSegmentMetadata,
val timestamp: Option[Long] = None, val count: Option[Int] = None)
extends Request(RequestKeys.TopicMetadata){
extends RequestOrResponse(Some(RequestKeys.TopicMetadata)){
def this(topics: Seq[String]) =
this(TopicMetadataRequest.CurrentVersion, TopicMetadataRequest.DefaultClientId, topics, NoSegmentMetadata, None, None)
def writeTo(buffer: ByteBuffer) {
buffer.putShort(versionId)
Utils.writeShortString(buffer, clientId)
buffer.putInt(topics.size)
topics.foreach(topic => writeShortString(buffer, topic))
buffer.putShort(detailedMetadata.requestId)
@ -110,7 +101,7 @@ case class TopicMetadataRequest(val topics: Seq[String],
}
def sizeInBytes(): Int = {
var size: Int = 4 /* number of topics */ + topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */ +
var size: Int = 2 + (2 + clientId.length) + 4 /* number of topics */ + topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */ +
2 /* detailed metadata */
detailedMetadata match {
case SegmentMetadata =>
@ -121,34 +112,3 @@ case class TopicMetadataRequest(val topics: Seq[String],
size
}
}
class TopicMetadataSend(topicsMetadata: Seq[TopicMetadata]) extends Send {
private var size: Int = topicsMetadata.foldLeft(4)(_ + _.sizeInBytes)
private val header = ByteBuffer.allocate(6)
header.putInt(size + 2)
header.putShort(ErrorMapping.NoError.asInstanceOf[Short])
header.rewind()
val metadata = TopicMetadataRequest.serializeTopicMetadata(topicsMetadata)
metadata.rewind()
trace("Wrote size %d in header".format(size + 2))
var complete: Boolean = false
def writeTo(channel: GatheringByteChannel): Int = {
expectIncomplete()
var written = 0
if(header.hasRemaining)
written += channel.write(header)
trace("Wrote %d bytes for header".format(written))
if(!header.hasRemaining && metadata.hasRemaining)
written += channel.write(metadata)
trace("Wrote %d bytes for header and metadata".format(written))
if(!metadata.hasRemaining)
complete = true
written
}
}

View File

@ -28,36 +28,36 @@ import scala.Predef._
object ErrorMapping {
val EmptyByteBuffer = ByteBuffer.allocate(0)
val UnknownCode = -1
val NoError = 0
val OffsetOutOfRangeCode = 1
val InvalidMessageCode = 2
val InvalidPartitionCode = 3
val InvalidFetchSizeCode = 4
val InvalidFetchRequestFormatCode = 5
val NotLeaderForPartitionCode = 6
val NoLeaderForPartitionCode = 7
val UnknownTopicCode = 8
val UnknownCode : Short = -1
val NoError : Short = 0
val OffsetOutOfRangeCode : Short = 1
val InvalidMessageCode : Short = 2
val InvalidPartitionCode : Short = 3
val InvalidFetchSizeCode : Short = 4
val InvalidFetchRequestFormatCode : Short = 5
val NoLeaderForPartitionCode : Short = 6
val NotLeaderForPartitionCode : Short = 7
val UnknownTopicCode : Short = 8
private val exceptionToCode =
Map[Class[Throwable], Int](
Map[Class[Throwable], Short](
classOf[OffsetOutOfRangeException].asInstanceOf[Class[Throwable]] -> OffsetOutOfRangeCode,
classOf[InvalidMessageException].asInstanceOf[Class[Throwable]] -> InvalidMessageCode,
classOf[InvalidPartitionException].asInstanceOf[Class[Throwable]] -> InvalidPartitionCode,
classOf[InvalidMessageSizeException].asInstanceOf[Class[Throwable]] -> InvalidFetchSizeCode,
classOf[FetchRequestFormatException].asInstanceOf[Class[Throwable]] -> InvalidFetchRequestFormatCode,
classOf[NotLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NotLeaderForPartitionCode,
classOf[NoLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NoLeaderForPartitionCode
// classOf[UnknownTopicException].asInstanceOf[Class[Throwable]] -> UnknownTopicCode
classOf[NoLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NoLeaderForPartitionCode,
classOf[UnknownTopicException].asInstanceOf[Class[Throwable]] -> UnknownTopicCode
).withDefaultValue(UnknownCode)
/* invert the mapping */
private val codeToException =
(Map[Int, Class[Throwable]]() ++ exceptionToCode.iterator.map(p => (p._2, p._1))).withDefaultValue(classOf[UnknownException])
(Map[Short, Class[Throwable]]() ++ exceptionToCode.iterator.map(p => (p._2, p._1))).withDefaultValue(classOf[UnknownException])
def codeFor(exception: Class[Throwable]): Int = exceptionToCode(exception)
def codeFor(exception: Class[Throwable]): Short = exceptionToCode(exception)
def maybeThrowException(code: Int) =
def maybeThrowException(code: Short) =
if(code != 0)
throw codeToException(code).newInstance()
}

View File

@ -57,10 +57,10 @@ class SimpleConsumer( val host: String,
}
}
private def sendRequest(request: Request): Tuple2[Receive, Int] = {
private def sendRequest(request: RequestOrResponse): Receive = {
lock synchronized {
getOrMakeConnection()
var response: Tuple2[Receive,Int] = null
var response: Receive = null
try {
blockingChannel.send(request)
response = blockingChannel.receive()
@ -92,7 +92,7 @@ class SimpleConsumer( val host: String,
def fetch(request: FetchRequest): FetchResponse = {
val startTime = SystemTime.nanoseconds
val response = sendRequest(request)
val fetchResponse = FetchResponse.readFrom(response._1.buffer)
val fetchResponse = FetchResponse.readFrom(response.buffer)
val fetchedSize = fetchResponse.sizeInBytes
val endTime = SystemTime.nanoseconds
@ -112,7 +112,7 @@ class SimpleConsumer( val host: String,
def getOffsetsBefore(topic: String, partition: Int, time: Long, maxNumOffsets: Int): Array[Long] = {
val request = new OffsetRequest(topic, partition, time, maxNumOffsets)
val response = sendRequest(request)
OffsetRequest.deserializeOffsetArray(response._1.buffer)
OffsetResponse.readFrom(response.buffer).offsets
}
private def getOrMakeConnection() {

View File

@ -16,7 +16,7 @@
*/
package kafka.javaapi
import kafka.network.Request
import kafka.api.RequestOrResponse
import kafka.api.{RequestKeys, TopicData}
import java.nio.ByteBuffer
@ -24,7 +24,7 @@ class ProducerRequest(val correlationId: Int,
val clientId: String,
val requiredAcks: Short,
val ackTimeout: Int,
val data: Array[TopicData]) extends Request(RequestKeys.Produce) {
val data: Array[TopicData]) extends RequestOrResponse(Some(RequestKeys.Produce)) {
val underlying = new kafka.api.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeout, data)

View File

@ -22,7 +22,7 @@ import kafka.message._
class ByteBufferMessageSet(private val buffer: ByteBuffer,
private val initialOffset: Long = 0L,
private val errorCode: Int = ErrorMapping.NoError) extends MessageSet {
private val errorCode: Short = ErrorMapping.NoError) extends MessageSet {
val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer,
initialOffset,
errorCode)

View File

@ -35,7 +35,7 @@ import kafka.common.{MessageSizeTooLargeException, InvalidMessageSizeException,
*/
class ByteBufferMessageSet(private val buffer: ByteBuffer,
private val initialOffset: Long = 0L,
private val errorCode: Int = ErrorMapping.NoError) extends MessageSet with Logging {
private val errorCode: Short = ErrorMapping.NoError) extends MessageSet with Logging {
private var shallowValidByteCount = -1L
if(sizeInBytes > Int.MaxValue)
throw new InvalidMessageSizeException("Message set cannot be larger than " + Int.MaxValue)

View File

@ -20,6 +20,7 @@ package kafka.network
import java.net.InetSocketAddress
import java.nio.channels._
import kafka.utils.{nonthreadsafe, Logging}
import kafka.api.RequestOrResponse
/**
* A simple blocking channel with timeouts correctly enabled.
@ -70,7 +71,7 @@ class BlockingChannel( val host: String,
def isConnected = connected
def send(request: Request):Int = {
def send(request: RequestOrResponse):Int = {
if(!connected)
throw new ClosedChannelException()
@ -78,16 +79,14 @@ class BlockingChannel( val host: String,
send.writeCompletely(writeChannel)
}
def receive(): Tuple2[Receive, Int] = {
def receive(): Receive = {
if(!connected)
throw new ClosedChannelException()
val response = new BoundedByteBufferReceive()
response.readCompletely(readChannel)
// this has the side effect of setting the initial position of buffer correctly
val errorCode: Int = response.buffer.getShort
(response, errorCode)
response
}
}

View File

@ -20,6 +20,7 @@ package kafka.network
import java.nio._
import java.nio.channels._
import kafka.utils._
import kafka.api.RequestOrResponse
@nonthreadsafe
private[kafka] class BoundedByteBufferSend(val buffer: ByteBuffer) extends Send {
@ -37,13 +38,19 @@ private[kafka] class BoundedByteBufferSend(val buffer: ByteBuffer) extends Send
def this(size: Int) = this(ByteBuffer.allocate(size))
def this(request: Request) = {
this(request.sizeInBytes + 2)
buffer.putShort(request.id)
def this(request: RequestOrResponse) = {
this(request.sizeInBytes + (if(request.requestId != None) 2 else 0))
request.requestId match {
case Some(requestId) =>
buffer.putShort(requestId)
case None =>
}
request.writeTo(buffer)
buffer.rewind()
}
def writeTo(channel: GatheringByteChannel): Int = {
expectIncomplete()
var written = channel.write(Array(sizeBuffer, buffer))

View File

@ -19,7 +19,7 @@ package kafka.producer
import kafka.api._
import kafka.message.MessageSet
import kafka.network.{BlockingChannel, BoundedByteBufferSend, Request, Receive}
import kafka.network.{BlockingChannel, BoundedByteBufferSend, Receive}
import kafka.utils._
import java.util.Random
import kafka.common.MessageSizeTooLargeException
@ -46,7 +46,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
trace("Instantiating Scala Sync Producer")
private def verifyRequest(request: Request) = {
private def verifyRequest(request: RequestOrResponse) = {
/**
* This seems a little convoluted, but the idea is to turn on verification simply changing log4j settings
* Also, when verification is turned on, care should be taken to see that the logs don't fill up with unnecessary
@ -66,13 +66,13 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
/**
* Common functionality for the public send methods
*/
private def doSend(request: Request): Tuple2[Receive, Int] = {
private def doSend(request: RequestOrResponse): Receive = {
lock synchronized {
verifyRequest(request)
val startTime = SystemTime.nanoseconds
getOrMakeConnection()
var response: Tuple2[Receive, Int] = null
var response: Receive = null
try {
blockingChannel.send(request)
response = blockingChannel.receive()
@ -108,12 +108,13 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
}
}
val response = doSend(producerRequest)
ProducerResponse.deserializeResponse(response._1.buffer)
ProducerResponse.readFrom(response.buffer)
}
def send(request: TopicMetadataRequest): Seq[TopicMetadata] = {
val response = doSend(request)
TopicMetadataRequest.deserializeTopicsMetadataResponse(response._1.buffer)
val topicMetaDataResponse = TopicMetaDataResponse.readFrom(response.buffer)
topicMetaDataResponse.topicsMetadata
}
def close() = {

View File

@ -28,8 +28,11 @@ import kafka.network._
import kafka.utils.{SystemTime, Logging}
import org.apache.log4j.Logger
import scala.collection._
import mutable.HashMap
import scala.math._
import java.lang.IllegalStateException
import kafka.network.RequestChannel.Response
/**
* Logic to handle the various Kafka requests
@ -50,10 +53,40 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
case RequestKeys.Fetch => handleFetchRequest(request)
case RequestKeys.Offsets => handleOffsetRequest(request)
case RequestKeys.TopicMetadata => handleTopicMetadataRequest(request)
case RequestKeys.LeaderAndISRRequest => handleLeaderAndISRRequest(request)
case RequestKeys.StopReplicaRequest => handleStopReplicaRequest(request)
case _ => throw new IllegalStateException("No mapping found for handler id " + apiId)
}
}
def handleLeaderAndISRRequest(request: RequestChannel.Request){
val leaderAndISRRequest = LeaderAndISRRequest.readFrom(request.request.buffer)
val responseMap = new HashMap[(String, Int), Short]
// TODO: put in actual logic later
for((key, value) <- leaderAndISRRequest.leaderAndISRInfos){
responseMap.put(key, ErrorMapping.NoError)
}
val leaderAndISRResponse = new LeaderAndISRResponse(leaderAndISRRequest.versionId, responseMap)
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndISRResponse), -1))
}
def handleStopReplicaRequest(request: RequestChannel.Request){
val stopReplicaRequest = StopReplicaRequest.readFrom(request.request.buffer)
val responseMap = new HashMap[(String, Int), Short]
// TODO: put in actual logic later
for((topic, partition) <- stopReplicaRequest.stopReplicaSet){
responseMap.put((topic, partition), ErrorMapping.NoError)
}
val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, responseMap)
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse), -1))
}
/**
* Handle a produce request
*/
@ -65,7 +98,7 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
val response = produce(produceRequest)
debug("kafka produce time " + (SystemTime.milliseconds - sTime) + " ms")
requestChannel.sendResponse(new RequestChannel.Response(request, new ProducerResponseSend(response), -1))
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response), -1))
// Now check any outstanding fetches this produce just unblocked
var satisfied = new mutable.ArrayBuffer[DelayedFetch]
@ -77,7 +110,7 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
for(fetchReq <- satisfied) {
val topicData = readMessageSets(fetchReq.fetch)
val response = new FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData)
requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response, ErrorMapping.NoError), -1))
requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response), -1))
}
}
@ -115,7 +148,7 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
}
}
}
new ProducerResponse(ProducerResponse.CurrentVersion, request.correlationId, errors, offsets)
new ProducerResponse(request.versionId, request.correlationId, errors, offsets)
}
/**
@ -131,9 +164,8 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
fetchRequest.validate()
} catch {
case e:FetchRequestFormatException =>
val response = new FetchResponse(FetchResponse.CurrentVersion, fetchRequest.correlationId, Array.empty)
val channelResponse = new RequestChannel.Response(request, new FetchResponseSend(response,
ErrorMapping.InvalidFetchRequestFormatCode), -1)
val response = new FetchResponse(fetchRequest.versionId, fetchRequest.correlationId, Array.empty)
val channelResponse = new RequestChannel.Response(request, new FetchResponseSend(response), -1)
requestChannel.sendResponse(channelResponse)
}
@ -147,7 +179,7 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
debug("Returning fetch response %s for fetch request with correlation id %d"
.format(topicData.map(_.partitionData.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId))
val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData)
requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response, ErrorMapping.NoError), -1))
requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response), -1))
} else {
// create a list of (topic, partition) pairs to use as keys for this delayed request
val keys: Seq[Any] = fetchRequest.offsetInfo.flatMap(o => o.partitions.map((o.topic, _)))
@ -240,8 +272,8 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
/**
* Read from a single topic/partition at the given offset
*/
private def readMessageSet(topic: String, partition: Int, offset: Long, maxSize: Int): Either[Int, MessageSet] = {
var response: Either[Int, MessageSet] = null
private def readMessageSet(topic: String, partition: Int, offset: Long, maxSize: Int): Either[Short, MessageSet] = {
var response: Either[Short, MessageSet] = null
try {
// check if the current broker is the leader for the partitions
kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topic, partition)
@ -264,8 +296,8 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
if(requestLogger.isTraceEnabled)
requestLogger.trace("Offset request " + offsetRequest.toString)
val offsets = logManager.getOffsets(offsetRequest)
val response = new OffsetArraySend(offsets)
requestChannel.sendResponse(new RequestChannel.Response(request, response, -1))
val response = new OffsetResponse(offsetRequest.versionId, offsets)
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response), -1))
}
/**
@ -303,7 +335,8 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
}
}
info("Sending response for topic metadata request")
requestChannel.sendResponse(new RequestChannel.Response(request, new TopicMetadataSend(topicsMetadata), -1))
val response = new TopicMetaDataResponse(metadataRequest.versionId, topicsMetadata.toSeq)
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response), -1))
}
def close() {
@ -337,7 +370,7 @@ class KafkaApis(val requestChannel: RequestChannel, val logManager: LogManager,
def expire(delayed: DelayedFetch) {
val topicData = readMessageSets(delayed.fetch)
val response = new FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData)
requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response, ErrorMapping.NoError), -1))
requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response), -1))
}
}
}

View File

@ -29,13 +29,13 @@ import kafka.common.ErrorMapping
* wholly in kernel space
*/
@nonthreadsafe
private[server] class MessageSetSend(val messages: MessageSet, val errorCode: Int) extends Send {
private[server] class MessageSetSend(val messages: MessageSet, val errorCode: Short) extends Send {
private var sent: Long = 0
private var size: Long = messages.sizeInBytes
private val header = ByteBuffer.allocate(6)
header.putInt(size.asInstanceOf[Int] + 2)
header.putShort(errorCode.asInstanceOf[Short])
header.putShort(errorCode)
header.rewind()
var complete: Boolean = false

View File

@ -0,0 +1,173 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.controller
import org.scalatest.junit.JUnit3Suite
import junit.framework.Assert._
import java.nio.ByteBuffer
import kafka.common.ErrorMapping
import kafka.api._
import collection.mutable.Map
import collection.mutable.Set
import kafka.integration.KafkaServerTestHarness
import kafka.utils.TestUtils
import kafka.server.KafkaConfig
import kafka.network.{Receive, BlockingChannel}
class ControllerToBrokerRequestTest extends JUnit3Suite with KafkaServerTestHarness {
val kafkaProps = TestUtils.createBrokerConfigs(1)
val configs = List(new KafkaConfig(kafkaProps.head))
var blockingChannel: BlockingChannel = null
override def setUp() {
super.setUp()
blockingChannel = new BlockingChannel("localhost", configs.head.port, 1000000, 0, 64*1024)
blockingChannel.connect
}
override def tearDown() {
super.tearDown()
blockingChannel.disconnect()
}
def createSampleLeaderAndISRRequest() : LeaderAndISRRequest = {
val topic1 = "test1"
val topic2 = "test2"
val leader1 = 1;
val ISR1 = List(1, 2, 3)
val leader2 = 2;
val ISR2 = List(2, 3, 4)
val leaderAndISR1 = new LeaderAndISR(leader1, 1, ISR1, 1)
val leaderAndISR2 = new LeaderAndISR(leader2, 1, ISR2, 2)
val map = Map(((topic1, 1), leaderAndISR1), ((topic1, 2), leaderAndISR1),
((topic2, 1), leaderAndISR2), ((topic2, 2), leaderAndISR2))
new LeaderAndISRRequest(1, "client 1", 1, 4, map)
}
def createSampleLeaderAndISRResponse() : LeaderAndISRResponse = {
val topic1 = "test1"
val topic2 = "test2"
val responseMap = Map(((topic1, 1), ErrorMapping.NoError), ((topic1, 2), ErrorMapping.NoError),
((topic2, 1), ErrorMapping.NoError), ((topic2, 2), ErrorMapping.NoError))
new LeaderAndISRResponse(1, responseMap)
}
def createSampleStopReplicaRequest() : StopReplicaRequest = {
val topic1 = "test1"
val topic2 = "test2"
new StopReplicaRequest(1, "client 1", 1000, Set((topic1, 1), (topic1, 2),
(topic2, 1), (topic2, 2)))
}
def createSampleStopReplicaResponse() : StopReplicaResponse = {
val topic1 = "test1"
val topic2 = "test2"
val responseMap = Map(((topic1, 1), ErrorMapping.NoError), ((topic1, 2), ErrorMapping.NoError),
((topic2, 1), ErrorMapping.NoError), ((topic2, 2), ErrorMapping.NoError))
new StopReplicaResponse(1, responseMap)
}
def testLeaderAndISRRequest {
val leaderAndISRRequest = createSampleLeaderAndISRRequest()
val serializedLeaderAndISRRequest = ByteBuffer.allocate(leaderAndISRRequest.sizeInBytes)
leaderAndISRRequest.writeTo(serializedLeaderAndISRRequest)
serializedLeaderAndISRRequest.rewind()
val deserializedLeaderAndISRRequest = LeaderAndISRRequest.readFrom(serializedLeaderAndISRRequest)
assertEquals(leaderAndISRRequest, deserializedLeaderAndISRRequest)
}
def testLeaderAndISRResponse {
val leaderAndISRResponse = createSampleLeaderAndISRResponse()
val serializedLeaderAndISRResponse = ByteBuffer.allocate(leaderAndISRResponse.sizeInBytes)
leaderAndISRResponse.writeTo(serializedLeaderAndISRResponse)
serializedLeaderAndISRResponse.rewind()
val deserializedLeaderAndISRResponse = LeaderAndISRResponse.readFrom(serializedLeaderAndISRResponse)
assertEquals(leaderAndISRResponse, deserializedLeaderAndISRResponse)
}
def testStopReplicaRequest {
val stopReplicaRequest = createSampleStopReplicaRequest()
val serializedStopReplicaRequest = ByteBuffer.allocate(stopReplicaRequest.sizeInBytes)
stopReplicaRequest.writeTo(serializedStopReplicaRequest)
serializedStopReplicaRequest.rewind()
val deserializedStopReplicaRequest = StopReplicaRequest.readFrom(serializedStopReplicaRequest)
assertEquals(stopReplicaRequest, deserializedStopReplicaRequest)
}
def testStopReplicaResponse {
val stopReplicaResponse = createSampleStopReplicaResponse()
val serializedStopReplicaResponse = ByteBuffer.allocate(stopReplicaResponse.sizeInBytes)
stopReplicaResponse.writeTo(serializedStopReplicaResponse)
serializedStopReplicaResponse.rewind()
val deserializedStopReplicaResponse = StopReplicaResponse.readFrom(serializedStopReplicaResponse)
assertEquals(stopReplicaResponse, deserializedStopReplicaResponse)
}
def testEndToEndLeaderAndISRRequest {
val leaderAndISRRequest = createSampleLeaderAndISRRequest()
var response: Receive = null
blockingChannel.send(leaderAndISRRequest)
response = blockingChannel.receive()
val leaderAndISRResponse = LeaderAndISRResponse.readFrom(response.buffer)
val expectedLeaderAndISRResponse = createSampleLeaderAndISRResponse()
assertEquals(leaderAndISRResponse, expectedLeaderAndISRResponse)
}
def testEndToEndStopReplicaRequest {
val stopReplicaRequest = createSampleStopReplicaRequest()
var response: Receive = null
blockingChannel.send(stopReplicaRequest)
response = blockingChannel.receive()
val stopReplicaResponse = StopReplicaResponse.readFrom(response.buffer)
val expectedStopReplicaResponse = createSampleStopReplicaResponse()
assertEquals(stopReplicaResponse, expectedStopReplicaResponse)
}
}

View File

@ -25,12 +25,13 @@ import kafka.log.LogManager
import junit.framework.Assert._
import org.easymock.EasyMock
import kafka.network._
import kafka.api.{TopicMetadataSend, TopicMetadataRequest}
import kafka.api.{TopicMetaDataResponse, TopicMetadataRequest}
import kafka.cluster.Broker
import kafka.utils.TestUtils
import kafka.utils.TestUtils._
import kafka.server.{ReplicaManager, KafkaZooKeeper, KafkaApis, KafkaConfig}
class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
val props = createBrokerConfigs(1)
val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
@ -104,10 +105,10 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
// call the API (to be tested) to get metadata
apis.handleTopicMetadataRequest(new RequestChannel.Request(processor=0, requestKey=5, request=receivedRequest, start=1))
val metadataResponse = requestChannel.receiveResponse(0).response.asInstanceOf[TopicMetadataSend].metadata
val metadataResponse = requestChannel.receiveResponse(0).response.asInstanceOf[BoundedByteBufferSend].buffer
// check assertions
val topicMetadata = TopicMetadataRequest.deserializeTopicsMetadataResponse(metadataResponse)
val topicMetadata = TopicMetaDataResponse.readFrom(metadataResponse).topicsMetadata
assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
val partitionMetadata = topicMetadata.head.partitionsMetadata

View File

@ -121,7 +121,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
buffer.put(emptyMessageSet.getSerialized())
buffer.put(regularMessgeSet.getSerialized())
buffer.rewind
val mixedMessageSet = new ByteBufferMessageSet(buffer, 0, 0)
val mixedMessageSet = new ByteBufferMessageSet(buffer)
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
//make sure ByteBufferMessageSet is re-iterable.
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
@ -142,7 +142,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
buffer.put(emptyMessageSet.getSerialized())
buffer.put(regularMessgeSet.getSerialized())
buffer.rewind
val mixedMessageSet = new ByteBufferMessageSet(buffer, 0, 0)
val mixedMessageSet = new ByteBufferMessageSet(buffer)
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
//make sure ByteBufferMessageSet is re-iterable.
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))

View File

@ -382,9 +382,9 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
mockSyncProducer.send(new TopicMetadataRequest(List(topic)))
EasyMock.expectLastCall().andReturn(List(topic1Metadata))
mockSyncProducer.send(TestUtils.produceRequest(topic, 0, messagesToSet(msgs.take(5))))
EasyMock.expectLastCall().andReturn(new ProducerResponse(ProducerResponse.CurrentVersion, 0, Array(0.toShort), Array(0L)))
EasyMock.expectLastCall().andReturn(new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(0.toShort), Array(0L)))
mockSyncProducer.send(TestUtils.produceRequest(topic, 0, messagesToSet(msgs.takeRight(5))))
EasyMock.expectLastCall().andReturn(new ProducerResponse(ProducerResponse.CurrentVersion, 0, Array(0.toShort), Array(0L)))
EasyMock.expectLastCall().andReturn(new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(0.toShort), Array(0L)))
EasyMock.replay(mockSyncProducer)
val producerPool = EasyMock.createMock(classOf[ProducerPool])
@ -442,9 +442,9 @@ class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
// On the third try for partition 0, let it succeed.
val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), 0)
val response1 =
new ProducerResponse(ProducerResponse.CurrentVersion, 0, Array(ErrorMapping.NotLeaderForPartitionCode.toShort, 0.toShort), Array(0L, 0L))
new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(ErrorMapping.NotLeaderForPartitionCode.toShort, 0.toShort), Array(0L, 0L))
val request2 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs))
val response2 = new ProducerResponse(ProducerResponse.CurrentVersion, 0, Array(0.toShort), Array(0L))
val response2 = new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(0.toShort), Array(0L))
val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate SocketTimeoutException
EasyMock.expect(mockSyncProducer.send(request1)).andReturn(response1)

View File

@ -31,7 +31,6 @@ import org.I0Itec.zkclient.ZkClient
import kafka.cluster.Broker
import collection.mutable.ListBuffer
import kafka.consumer.ConsumerConfig
import scala.collection.Map
import kafka.api.{TopicData, PartitionData}
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.TimeUnit