mirror of https://github.com/apache/kafka.git
ZK consumer gets into infinite loop if a message is larger than fetch size; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-160
git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1186570 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a0b2aa8847
commit
81f8d0dd83
|
@ -76,7 +76,8 @@ private[consumer] class PartitionTopicInfo(val topic: String,
|
|||
* add an empty message with the exception to the queue so that client can see the error
|
||||
*/
|
||||
def enqueueError(e: Throwable, fetchOffset: Long) = {
|
||||
val messages = new ByteBufferMessageSet(ErrorMapping.EmptyByteBuffer, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
|
||||
val messages = new ByteBufferMessageSet(buffer = ErrorMapping.EmptyByteBuffer, initialOffset = 0,
|
||||
errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
|
||||
chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset))
|
||||
}
|
||||
|
||||
|
|
|
@ -99,7 +99,7 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,
|
|||
logger.trace("size of data = " + size)
|
||||
}
|
||||
if(size < 0 || topIter.remaining < size) {
|
||||
if (currValidBytes == 0 || size < 0)
|
||||
if (currValidBytes == initialOffset || size < 0)
|
||||
throw new InvalidMessageSizeException("invalid message size: " + size + " only received bytes: " +
|
||||
topIter.remaining + " at " + currValidBytes + "( possible causes (1) a single message larger than " +
|
||||
"the fetch size; (2) log corruption )")
|
||||
|
|
|
@ -98,7 +98,7 @@ class ZKConsumerThread(stream: KafkaMessageStream[String]) extends Thread {
|
|||
}
|
||||
}catch {
|
||||
case e:ConsumerTimeoutException => // this is ok
|
||||
case oe: Exception => logger.error(oe)
|
||||
case oe: Exception => logger.error("error in ZKConsumerThread", oe)
|
||||
}
|
||||
shutdownLatch.countDown
|
||||
println("Received " + count + " messages")
|
||||
|
|
|
@ -21,12 +21,31 @@ import java.nio._
|
|||
import junit.framework.Assert._
|
||||
import org.junit.Test
|
||||
import kafka.utils.TestUtils
|
||||
import kafka.common.InvalidMessageSizeException
|
||||
|
||||
class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
|
||||
|
||||
override def createMessageSet(messages: Seq[Message]): ByteBufferMessageSet =
|
||||
new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
|
||||
|
||||
@Test
|
||||
def testSmallFetchSize() {
|
||||
// create a ByteBufferMessageSet that doesn't contain a full message
|
||||
// iterating it should get an InvalidMessageSizeException
|
||||
val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("01234567890123456789".getBytes()))
|
||||
val buffer = messages.serialized.slice
|
||||
buffer.limit(10)
|
||||
val messageSetWithNoFullMessage = new ByteBufferMessageSet(buffer = buffer, initialOffset = 1000)
|
||||
try {
|
||||
for (message <- messageSetWithNoFullMessage)
|
||||
fail("shouldn't see any message")
|
||||
}
|
||||
catch {
|
||||
case e: InvalidMessageSizeException => //this is expected
|
||||
case e2 => fail("shouldn't see any other exceptions")
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
def testValidBytes() {
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue