enable shallow iterator in ByteBufferMessageSet to allow mirroring data without decompression; patched by Jun Rao; reviewed by Joel Koshy; KAFKA-315

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1310595 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jun Rao 2012-04-06 21:18:56 +00:00
parent f8346239d4
commit d9441f0066
6 changed files with 40 additions and 10 deletions

View File

@ -105,5 +105,12 @@ class ConsumerConfig(props: Properties) extends ZKConfig(props) {
val mirrorConsumerNumThreads = Utils.getInt(
props, MirrorConsumerNumThreadsProp, MirrorConsumerNumThreads)
/** Use shallow iterator over compressed messages directly. This feature should be used very carefully.
 * Typically, it's only used for mirroring raw messages from one kafka cluster to another to save the
 * overhead of decompression. Controlled by the "shallowiterator.enable" property; defaults to false.
 */
val enableShallowIterator = Utils.getBoolean(props, "shallowiterator.enable", false)
}

View File

@ -31,7 +31,8 @@ import java.util.concurrent.atomic.AtomicReference
class ConsumerIterator[T](private val topic: String,
private val channel: BlockingQueue[FetchedDataChunk],
consumerTimeoutMs: Int,
private val decoder: Decoder[T])
private val decoder: Decoder[T],
val enableShallowIterator: Boolean)
extends IteratorTemplate[T] with Logging {
private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null)
@ -74,7 +75,8 @@ class ConsumerIterator[T](private val topic: String,
.format(currentTopicInfo.getConsumeOffset, currentDataChunk.fetchOffset, currentTopicInfo))
currentTopicInfo.resetConsumeOffset(currentDataChunk.fetchOffset)
}
localCurrent = currentDataChunk.messages.iterator
localCurrent = if (enableShallowIterator) currentDataChunk.messages.shallowIterator
else currentDataChunk.messages.iterator
current.set(localCurrent)
}
}

View File

@ -27,11 +27,12 @@ import kafka.serializer.Decoder
class KafkaMessageStream[T](val topic: String,
private val queue: BlockingQueue[FetchedDataChunk],
consumerTimeoutMs: Int,
private val decoder: Decoder[T])
private val decoder: Decoder[T],
val enableShallowIterator: Boolean)
extends Iterable[T] with java.lang.Iterable[T]{
private val iter: ConsumerIterator[T] =
new ConsumerIterator[T](topic, queue, consumerTimeoutMs, decoder)
new ConsumerIterator[T](topic, queue, consumerTimeoutMs, decoder, enableShallowIterator)
/**
* Create an iterator over messages in the stream.

View File

@ -178,7 +178,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
for (threadId <- threadIdSet) {
val stream = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
queues.put((topic, threadId), stream)
streamList ::= new KafkaMessageStream[T](topic, stream, config.consumerTimeoutMs, decoder)
streamList ::= new KafkaMessageStream[T](topic, stream, config.consumerTimeoutMs, decoder, config.enableShallowIterator)
}
ret += (topic -> streamList)
debug("adding topic " + topic + " and stream to map..")

View File

@ -78,9 +78,12 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,
buffer.reset()
written
}
/** Default (deep) iterator: iterates over individual messages, decompressing
 * compressed message sets so each inner message is returned on its own. */
override def iterator: Iterator[MessageAndOffset] = internalIterator()
/** Shallow iterator: returns compressed wrapper messages as-is, without decompressing.
 * The offsets it yields are a subset of those yielded by the deep iterator. */
def shallowIterator: Iterator[MessageAndOffset] = internalIterator(true)
def verifyMessageSize(maxMessageSize: Int){
var shallowIter = internalIterator(true)
@ -124,6 +127,9 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,
message.limit(size)
topIter.position(topIter.position + size)
val newMessage = new Message(message)
if(!newMessage.isValid)
throw new InvalidMessageException("message is invalid, compression codec: " + newMessage.compressionCodec
+ " size: " + size + " curr offset: " + currValidBytes + " init offset: " + initialOffset)
if(isShallow){
currValidBytes += 4 + size
@ -133,16 +139,12 @@ class ByteBufferMessageSet(private val buffer: ByteBuffer,
else{
newMessage.compressionCodec match {
case NoCompressionCodec =>
if(!newMessage.isValid)
throw new InvalidMessageException("Uncompressed essage is invalid")
debug("Message is uncompressed. Valid byte count = %d".format(currValidBytes))
innerIter = null
currValidBytes += 4 + size
trace("currValidBytes = " + currValidBytes)
new MessageAndOffset(newMessage, currValidBytes)
case _ =>
if(!newMessage.isValid)
throw new InvalidMessageException("Compressed message is invalid")
debug("Message is compressed. Valid byte count = %d".format(currValidBytes))
innerIter = CompressionUtils.decompress(newMessage).internalIterator()
if (!innerIter.hasNext) {

View File

@ -94,6 +94,10 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
//make sure the last offset after iteration is correct
assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.serialized.limit)
//make sure shallow iterator is the same as deep iterator
TestUtils.checkEquals[Message](TestUtils.getMessageIterator(messageSet.shallowIterator),
TestUtils.getMessageIterator(messageSet.iterator))
}
// test for compressed regular messages
@ -104,6 +108,8 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
//make sure the last offset after iteration is correct
assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.serialized.limit)
verifyShallowIterator(messageSet)
}
// test for mixed empty and non-empty messagesets uncompressed
@ -121,6 +127,10 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
//make sure the last offset after iteration is correct
assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.serialized.limit)
//make sure shallow iterator is the same as deep iterator
TestUtils.checkEquals[Message](TestUtils.getMessageIterator(mixedMessageSet.shallowIterator),
TestUtils.getMessageIterator(mixedMessageSet.iterator))
}
// test for mixed empty and non-empty messagesets compressed
@ -138,7 +148,15 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
//make sure the last offset after iteration is correct
assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.serialized.limit)
verifyShallowIterator(mixedMessageSet)
}
}
/** Asserts that every offset produced by the shallow iterator also appears
 * among the offsets produced by the deep iterator. */
def verifyShallowIterator(messageSet: ByteBufferMessageSet) {
  // collect all deep (fully decompressed) offsets first
  val deepOffsets = messageSet.iterator.map(_.offset).toSet
  // each wrapper-level offset from the shallow pass must be one of them
  assertTrue(messageSet.shallowIterator.forall(m => deepOffsets.contains(m.offset)))
}
}