KAFKA-171 Do a single write for request sends.

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1205313 13f79535-47bb-0310-9956-ffa450edef68
Edward Jay Kreps 2011-11-23 07:29:37 +00:00
parent 1280b71a35
commit cb4486656e
9 changed files with 22 additions and 25 deletions

@@ -20,7 +20,7 @@ package kafka.api
import java.nio.ByteBuffer
import kafka.utils.{nonthreadsafe, Utils}
import kafka.network.{Send, Request}
-import java.nio.channels.WritableByteChannel
+import java.nio.channels.GatheringByteChannel
import kafka.common.ErrorMapping
object OffsetRequest {
@@ -85,7 +85,7 @@ private[kafka] class OffsetArraySend(offsets: Array[Long]) extends Send {
var complete: Boolean = false
-def writeTo(channel: WritableByteChannel): Int = {
+def writeTo(channel: GatheringByteChannel): Int = {
expectIncomplete()
var written = 0
if(header.hasRemaining)

@@ -17,7 +17,7 @@
package kafka.javaapi.message
-import java.nio.channels.WritableByteChannel
+import java.nio.channels._
import kafka.message.{MessageAndOffset, InvalidMessageException, Message}
/**

@@ -21,7 +21,7 @@ import scala.collection.mutable
import kafka.utils.Logging
import kafka.common.{InvalidMessageSizeException, ErrorMapping}
import java.nio.ByteBuffer
-import java.nio.channels.WritableByteChannel
+import java.nio.channels._
import kafka.utils.IteratorTemplate
/**
@@ -71,7 +71,7 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,
}
/** Write the messages in this set to the given channel */
-def writeTo(channel: WritableByteChannel, offset: Long, size: Long): Long =
+def writeTo(channel: GatheringByteChannel, offset: Long, size: Long): Long =
channel.write(buffer.duplicate)
override def iterator: Iterator[MessageAndOffset] = deepIterator
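
Aside, not part of the patch: the unchanged body above writes buffer.duplicate, so the channel advances a private cursor rather than the message set's own buffer position. A minimal Scala sketch of that ByteBuffer behaviour; the object name is illustrative.

import java.nio.ByteBuffer

object DuplicateSketch extends App {
  val original = ByteBuffer.wrap(Array[Byte](0, 0, 0, 42))
  val view = original.duplicate   // same backing bytes, independent position/limit
  view.getInt()                   // advances only the duplicate's position
  assert(original.position == 0)  // the original buffer's cursor is untouched
}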

@@ -100,7 +100,7 @@ class FileMessageSet private[kafka](private[message] val channel: FileChannel,
/**
* Write some of this set to the given channel, return the ammount written
*/
-def writeTo(destChannel: WritableByteChannel, writeOffset: Long, size: Long): Long =
+def writeTo(destChannel: GatheringByteChannel, writeOffset: Long, size: Long): Long =
channel.transferTo(offset + writeOffset, scala.math.min(size, sizeInBytes), destChannel)
/**
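
Aside, not part of the patch: narrowing destChannel to GatheringByteChannel leaves the transferTo call above valid, because GatheringByteChannel extends WritableByteChannel, the target type FileChannel.transferTo expects. A type-level Scala sketch; the copyTo helper is hypothetical, not Kafka code.

import java.nio.channels.{FileChannel, GatheringByteChannel}

object TransferSketch {
  // FileChannel.transferTo takes a WritableByteChannel target, and every
  // GatheringByteChannel is one, so the narrower parameter still compiles.
  def copyTo(src: FileChannel, dest: GatheringByteChannel, position: Long, count: Long): Long =
    src.transferTo(position, count, dest)
}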

@@ -89,7 +89,7 @@ abstract class MessageSet extends Iterable[MessageAndOffset] {
/** Write the messages in this set to the given channel starting at the given offset byte.
* Less than the complete amount may be written, but no more than maxSize can be. The number
* of bytes written is returned */
-def writeTo(channel: WritableByteChannel, offset: Long, maxSize: Long): Long
+def writeTo(channel: GatheringByteChannel, offset: Long, maxSize: Long): Long
/**
* Provides an iterator over the messages in this set

@@ -26,6 +26,10 @@ private[kafka] class BoundedByteBufferSend(val buffer: ByteBuffer) extends Send
private var sizeBuffer = ByteBuffer.allocate(4)
+// Avoid possibility of overflow for 2GB-4 byte buffer
+if(buffer.remaining > Int.MaxValue - sizeBuffer.limit)
+throw new IllegalArgumentException("Attempt to create a bounded buffer of " + buffer.remaining + " bytes, but the maximum " +
+"allowable size for a bounded buffer is " + (Int.MaxValue - sizeBuffer.limit) + ".")
sizeBuffer.putInt(buffer.limit)
sizeBuffer.rewind()
@@ -40,20 +44,13 @@ private[kafka] class BoundedByteBufferSend(val buffer: ByteBuffer) extends Send
buffer.rewind()
}
-def writeTo(channel: WritableByteChannel): Int = {
+def writeTo(channel: GatheringByteChannel): Int = {
expectIncomplete()
-var written = 0
-// try to write the size if we haven't already
-if(sizeBuffer.hasRemaining)
-written += channel.write(sizeBuffer)
-// try to write the actual buffer itself
-if(!sizeBuffer.hasRemaining && buffer.hasRemaining)
-written += channel.write(buffer)
+var written = channel.write(Array(sizeBuffer, buffer))
// if we are done, mark it off
if(!buffer.hasRemaining)
complete = true
-written
+written.asInstanceOf[Int]
}
}
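
This hunk is the core of the change: the 4-byte size prefix and the payload now leave in a single gathering write instead of two separate write calls. Below is a self-contained Scala sketch of the same pattern, not Kafka code; a temp-file FileChannel stands in for the socket, since both FileChannel and SocketChannel implement GatheringByteChannel, and the object and variable names are illustrative.

import java.io.{File, RandomAccessFile}
import java.nio.ByteBuffer

object GatheringWriteSketch extends App {
  // Build a 4-byte size prefix for the payload, as BoundedByteBufferSend does.
  val payload = ByteBuffer.wrap("hello".getBytes("UTF-8"))
  val sizeBuffer = ByteBuffer.allocate(4)
  sizeBuffer.putInt(payload.limit)
  sizeBuffer.rewind()

  // FileChannel implements GatheringByteChannel; Kafka's sends write to a
  // socket channel, which implements it as well.
  val file = File.createTempFile("gather", ".bin")
  file.deleteOnExit()
  val channel = new RandomAccessFile(file, "rw").getChannel
  try {
    // One gathering write covers both buffers; the channel drains the array in
    // order, so the size prefix always goes out before the payload.
    val written: Long = channel.write(Array(sizeBuffer, payload))
    println("wrote " + written + " bytes in a single call")
  } finally {
    channel.close()
  }
}

A gathering write on a non-blocking socket can still be partial, which is why the method above keeps checking buffer.hasRemaining before setting complete, and why writeCompletely in the Send trait loops until the transmission reports completion.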

@@ -28,7 +28,7 @@ private[kafka] class ByteBufferSend(val buffer: ByteBuffer) extends Send {
def this(size: Int) = this(ByteBuffer.allocate(size))
-def writeTo(channel: WritableByteChannel): Int = {
+def writeTo(channel: GatheringByteChannel): Int = {
expectIncomplete()
var written = 0
written += channel.write(buffer)

@@ -65,9 +65,9 @@ private[kafka] trait Receive extends Transmission {
*/
private[kafka] trait Send extends Transmission {
-def writeTo(channel: WritableByteChannel): Int
+def writeTo(channel: GatheringByteChannel): Int
-def writeCompletely(channel: WritableByteChannel): Int = {
+def writeCompletely(channel: GatheringByteChannel): Int = {
var written = 0
while(!complete) {
written = writeTo(channel)
@@ -86,7 +86,7 @@ abstract class MultiSend[S <: Send](val sends: List[S]) extends Send {
private var current = sends
var totalWritten = 0
-def writeTo(channel: WritableByteChannel): Int = {
+def writeTo(channel: GatheringByteChannel): Int = {
expectIncomplete
val written = current.head.writeTo(channel)
totalWritten += written
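
Aside, not part of the patch: SocketChannel implements GatheringByteChannel, so a socket channel already satisfies the tightened writeTo and writeCompletely signatures with no change on the calling side. A type-level Scala sketch; the helper name is hypothetical.

import java.nio.channels.{GatheringByteChannel, SocketChannel}

object ChannelTypesSketch {
  // Widening a SocketChannel to GatheringByteChannel needs no cast or wrapper;
  // the gathering write(Array[ByteBuffer]) overload comes with the interface.
  def asGathering(socket: SocketChannel): GatheringByteChannel = socket
}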

@@ -44,7 +44,7 @@ private[server] class MessageSetSend(val messages: MessageSet, val errorCode: In
def this() = this(MessageSet.Empty)
-def writeTo(channel: WritableByteChannel): Int = {
+def writeTo(channel: GatheringByteChannel): Int = {
expectIncomplete()
var written = 0
if(header.hasRemaining)