MINOR: small code optimizations in streams

guozhangwang

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #1176 from ymatsuda/optimize
This commit is contained in:
Yasuhiro Matsuda 2016-04-01 17:14:29 -07:00 committed by Guozhang Wang
parent 75ec67eda8
commit bd5325dd8b
3 changed files with 24 additions and 29 deletions

View File

@ -49,6 +49,10 @@ public class PartitionGroup {
public TopicPartition partition() {
return queue.partition();
}
public RecordQueue queue() {
return queue;
}
}
// since task is thread-safe, we do not need to synchronize on local variables
@ -88,7 +92,7 @@ public class PartitionGroup {
// get the first record from this queue.
record = queue.poll();
if (queue.size() > 0) {
if (!queue.isEmpty()) {
queuesByTime.offer(queue);
}
}

View File

@ -179,7 +179,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
// after processing this record, if its partition queue's buffered size has been
// decreased to the threshold, we can then resume the consumption on this partition
if (partitionGroup.numBuffered(partition) == this.maxBufferedSize) {
if (recordInfo.queue().size() == this.maxBufferedSize) {
consumer.resume(singleton(partition));
requiresPoll = true;
}
@ -320,13 +320,13 @@ public class StreamTask extends AbstractTask implements Punctuator {
@SuppressWarnings("unchecked")
public <K, V> void forward(K key, V value) {
ProcessorNode thisNode = currNode;
for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
currNode = childNode;
try {
try {
for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
currNode = childNode;
childNode.process(key, value);
} finally {
currNode = thisNode;
}
} finally {
currNode = thisNode;
}
}

View File

@ -350,9 +350,12 @@ public class StreamThread extends Thread {
requiresPoll = requiresPoll || task.requiresPoll();
sensors.processTimeSensor.record(time.milliseconds() - startProcess);
}
maybePunctuate();
maybePunctuate(task);
if (task.commitNeeded())
commitOne(task, time.milliseconds());
}
// if pollTimeMs has passed since the last poll, we poll to respond to a possible rebalance
// even when we paused all partitions.
@ -424,18 +427,16 @@ public class StreamThread extends Thread {
return true;
}
private void maybePunctuate() {
for (StreamTask task : activeTasks.values()) {
try {
long now = time.milliseconds();
private void maybePunctuate(StreamTask task) {
try {
long now = time.milliseconds();
if (task.maybePunctuate(now))
sensors.punctuateTimeSensor.record(time.milliseconds() - now);
if (task.maybePunctuate(now))
sensors.punctuateTimeSensor.record(time.milliseconds() - now);
} catch (KafkaException e) {
log.error("Failed to punctuate active task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
throw e;
}
} catch (KafkaException e) {
log.error("Failed to punctuate active task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
throw e;
}
}
@ -449,16 +450,6 @@ public class StreamThread extends Thread {
lastCommit = now;
processStandbyRecords = true;
} else {
for (StreamTask task : activeTasks.values()) {
try {
if (task.commitNeeded())
commitOne(task, time.milliseconds());
} catch (KafkaException e) {
log.error("Failed to commit active task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
throw e;
}
}
}
}