diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala index fe5151ad650..747d205690c 100644 --- a/core/src/main/scala/kafka/api/OffsetRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetRequest.scala @@ -20,7 +20,7 @@ package kafka.api import java.nio.ByteBuffer import kafka.utils.{nonthreadsafe, Utils} import kafka.network.{Send, Request} -import java.nio.channels.WritableByteChannel +import java.nio.channels.GatheringByteChannel import kafka.common.ErrorMapping object OffsetRequest { @@ -85,7 +85,7 @@ private[kafka] class OffsetArraySend(offsets: Array[Long]) extends Send { var complete: Boolean = false - def writeTo(channel: WritableByteChannel): Int = { + def writeTo(channel: GatheringByteChannel): Int = { expectIncomplete() var written = 0 if(header.hasRemaining) diff --git a/core/src/main/scala/kafka/javaapi/message/MessageSet.scala b/core/src/main/scala/kafka/javaapi/message/MessageSet.scala index be8a1b43cbf..4a3f19357ae 100644 --- a/core/src/main/scala/kafka/javaapi/message/MessageSet.scala +++ b/core/src/main/scala/kafka/javaapi/message/MessageSet.scala @@ -17,7 +17,7 @@ package kafka.javaapi.message -import java.nio.channels.WritableByteChannel +import java.nio.channels._ import kafka.message.{MessageAndOffset, InvalidMessageException, Message} /** diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala index a1525318a91..75da3027015 100644 --- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala +++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala @@ -21,7 +21,7 @@ import scala.collection.mutable import kafka.utils.Logging import kafka.common.{InvalidMessageSizeException, ErrorMapping} import java.nio.ByteBuffer -import java.nio.channels.WritableByteChannel +import java.nio.channels._ import kafka.utils.IteratorTemplate /** @@ -71,7 +71,7 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer, } /** Write the messages in this set to the given channel */ - def writeTo(channel: WritableByteChannel, offset: Long, size: Long): Long = + def writeTo(channel: GatheringByteChannel, offset: Long, size: Long): Long = channel.write(buffer.duplicate) override def iterator: Iterator[MessageAndOffset] = deepIterator diff --git a/core/src/main/scala/kafka/message/FileMessageSet.scala b/core/src/main/scala/kafka/message/FileMessageSet.scala index c2cd8ebe821..edfe8efa21f 100644 --- a/core/src/main/scala/kafka/message/FileMessageSet.scala +++ b/core/src/main/scala/kafka/message/FileMessageSet.scala @@ -100,7 +100,7 @@ class FileMessageSet private[kafka](private[message] val channel: FileChannel, /** * Write some of this set to the given channel, return the ammount written */ - def writeTo(destChannel: WritableByteChannel, writeOffset: Long, size: Long): Long = + def writeTo(destChannel: GatheringByteChannel, writeOffset: Long, size: Long): Long = channel.transferTo(offset + writeOffset, scala.math.min(size, sizeInBytes), destChannel) /** diff --git a/core/src/main/scala/kafka/message/MessageSet.scala b/core/src/main/scala/kafka/message/MessageSet.scala index 807de4af158..bf45d91fda8 100644 --- a/core/src/main/scala/kafka/message/MessageSet.scala +++ b/core/src/main/scala/kafka/message/MessageSet.scala @@ -89,7 +89,7 @@ abstract class MessageSet extends Iterable[MessageAndOffset] { /** Write the messages in this set to the given channel starting at the given offset byte. * Less than the complete amount may be written, but no more than maxSize can be. The number * of bytes written is returned */ - def writeTo(channel: WritableByteChannel, offset: Long, maxSize: Long): Long + def writeTo(channel: GatheringByteChannel, offset: Long, maxSize: Long): Long /** * Provides an iterator over the messages in this set diff --git a/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala b/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala index da3080951d6..5e1eb5f05ae 100644 --- a/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala +++ b/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala @@ -25,10 +25,14 @@ import kafka.utils._ private[kafka] class BoundedByteBufferSend(val buffer: ByteBuffer) extends Send { private var sizeBuffer = ByteBuffer.allocate(4) - + + // Avoid possibility of overflow for 2GB-4 byte buffer + if(buffer.remaining > Int.MaxValue - sizeBuffer.limit) + throw new IllegalArgumentException("Attempt to create a bounded buffer of " + buffer.remaining + " bytes, but the maximum " + + "allowable size for a bounded buffer is " + (Int.MaxValue - sizeBuffer.limit) + ".") sizeBuffer.putInt(buffer.limit) sizeBuffer.rewind() - + var complete: Boolean = false def this(size: Int) = this(ByteBuffer.allocate(size)) @@ -40,20 +44,13 @@ private[kafka] class BoundedByteBufferSend(val buffer: ByteBuffer) extends Send buffer.rewind() } - def writeTo(channel: WritableByteChannel): Int = { + def writeTo(channel: GatheringByteChannel): Int = { expectIncomplete() - var written = 0 - // try to write the size if we haven't already - if(sizeBuffer.hasRemaining) - written += channel.write(sizeBuffer) - // try to write the actual buffer itself - if(!sizeBuffer.hasRemaining && buffer.hasRemaining) - written += channel.write(buffer) + var written = channel.write(Array(sizeBuffer, buffer)) // if we are done, mark it off if(!buffer.hasRemaining) - complete = true - - written + complete = true + written.asInstanceOf[Int] } } diff --git a/core/src/main/scala/kafka/network/ByteBufferSend.scala b/core/src/main/scala/kafka/network/ByteBufferSend.scala index d088442550f..af30042a4c7 100644 --- a/core/src/main/scala/kafka/network/ByteBufferSend.scala +++ b/core/src/main/scala/kafka/network/ByteBufferSend.scala @@ -28,7 +28,7 @@ private[kafka] class ByteBufferSend(val buffer: ByteBuffer) extends Send { def this(size: Int) = this(ByteBuffer.allocate(size)) - def writeTo(channel: WritableByteChannel): Int = { + def writeTo(channel: GatheringByteChannel): Int = { expectIncomplete() var written = 0 written += channel.write(buffer) diff --git a/core/src/main/scala/kafka/network/Transmission.scala b/core/src/main/scala/kafka/network/Transmission.scala index 16e3702f3e3..13f1b198790 100644 --- a/core/src/main/scala/kafka/network/Transmission.scala +++ b/core/src/main/scala/kafka/network/Transmission.scala @@ -65,9 +65,9 @@ private[kafka] trait Receive extends Transmission { */ private[kafka] trait Send extends Transmission { - def writeTo(channel: WritableByteChannel): Int + def writeTo(channel: GatheringByteChannel): Int - def writeCompletely(channel: WritableByteChannel): Int = { + def writeCompletely(channel: GatheringByteChannel): Int = { var written = 0 while(!complete) { written = writeTo(channel) @@ -86,7 +86,7 @@ abstract class MultiSend[S <: Send](val sends: List[S]) extends Send { private var current = sends var totalWritten = 0 - def writeTo(channel: WritableByteChannel): Int = { + def writeTo(channel: GatheringByteChannel): Int = { expectIncomplete val written = current.head.writeTo(channel) totalWritten += written diff --git a/core/src/main/scala/kafka/server/MessageSetSend.scala b/core/src/main/scala/kafka/server/MessageSetSend.scala index 76cd941ea16..e300ad038e6 100644 --- a/core/src/main/scala/kafka/server/MessageSetSend.scala +++ b/core/src/main/scala/kafka/server/MessageSetSend.scala @@ -44,7 +44,7 @@ private[server] class MessageSetSend(val messages: MessageSet, val errorCode: In def this() = this(MessageSet.Empty) - def writeTo(channel: WritableByteChannel): Int = { + def writeTo(channel: GatheringByteChannel): Int = { expectIncomplete() var written = 0 if(header.hasRemaining)