Hadoop Consumer goes into an infinite loop; patched by Sam William; reviewed by Richard Park; KAFKA-131

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1166424 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jun Rao 2011-09-07 21:47:26 +00:00
parent c8e507aba4
commit d42450715c
1 changed files with 9 additions and 7 deletions

View File

@ -49,7 +49,7 @@ public class KafkaETLContext {
final static int DEFAULT_TIMEOUT = 60000; // one minute
final static KafkaETLKey DUMMY_KEY = new KafkaETLKey();
protected int _index; /*index of context*/
protected String _input = null; /*input string*/
protected KafkaETLRequest _request = null;
@ -61,7 +61,7 @@ public class KafkaETLContext {
protected MultiFetchResponse _response = null; /*fetch response*/
protected Iterator<MessageAndOffset> _messageIt = null; /*message iterator*/
protected Iterator<ByteBufferMessageSet> _respIterator = null;
protected int _retry = 0;
protected long _requestTime = 0; /*accumulative request time*/
protected long _startTime = -1;
@ -125,7 +125,7 @@ public class KafkaETLContext {
public boolean hasMore () {
return _messageIt != null && _messageIt.hasNext()
|| _response != null && _response.iterator().hasNext()
|| _response != null && _respIterator.hasNext()
|| _offset < _offsetRange[1];
}
@ -135,9 +135,9 @@ public class KafkaETLContext {
boolean gotNext = get(key, value);
if(_response != null) {
Iterator<ByteBufferMessageSet> iter = _response.iterator();
while ( !gotNext && iter.hasNext()) {
ByteBufferMessageSet msgSet = iter.next();
while ( !gotNext && _respIterator.hasNext()) {
ByteBufferMessageSet msgSet = _respIterator.next();
if ( hasError(msgSet)) return false;
_messageIt = (Iterator<MessageAndOffset>) msgSet.iterator();
gotNext = get(key, value);
@ -156,6 +156,8 @@ public class KafkaETLContext {
long tempTime = System.currentTimeMillis();
_response = _consumer.multifetch(array);
if(_response != null)
_respIterator = _response.iterator();
_requestTime += (System.currentTimeMillis() - tempTime);
return true;
@ -198,7 +200,7 @@ public class KafkaETLContext {
key.set(_index, _offset, msgAndOffset.message().checksum());
_offset += msgAndOffset.offset(); //increase offset
_offset = msgAndOffset.offset(); //increase offset
_count ++; //increase count
return true;