CallbackHandler.afterDequeuingExistingData is not called during event queue timeout; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-326

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1310482 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jun Rao 2012-04-06 16:59:44 +00:00
parent ba8fd9f1ee
commit f8346239d4
1 changed files with 11 additions and 8 deletions

View File

@ -73,18 +73,21 @@ private[async] class ProducerSendThread[T](val threadName: String,
// check if the queue time is reached. This happens when the poll method above returns after a timeout and
// returns a null object
val expired = currentQueueItem == null
if(currentQueueItem != null) {
if(currentQueueItem != null)
trace("Dequeued item for topic %s and partition %d"
.format(currentQueueItem.getTopic, currentQueueItem.getPartition))
// handle the dequeued current item
if(cbkHandler != null)
events = events ++ cbkHandler.afterDequeuingExistingData(currentQueueItem)
else
events += currentQueueItem
// check if the batch size is reached
full = events.size >= batchSize
// handle the dequeued current item
if(cbkHandler != null)
events = events ++ cbkHandler.afterDequeuingExistingData(currentQueueItem)
else {
if (currentQueueItem != null)
events += currentQueueItem
}
// check if the batch size is reached
full = events.size >= batchSize
if(full || expired) {
if(expired) debug(elapsed + " ms elapsed. Queue time reached. Sending..")
if(full) debug("Batch full. Sending..")