MINOR: Wakeups propagated from commitOffsets in WorkerSinkTask should be caught

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1907 from hachikuji/catch-wakeup-worker-sink-task

(cherry picked from commit b75245cfbb)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>
This commit is contained in:
Jason Gustafson 2016-09-26 14:54:01 -07:00 committed by Ewen Cheslack-Postava
parent 4104f014e9
commit d6b3ff142a
2 changed files with 62 additions and 62 deletions

View File

@ -149,26 +149,39 @@ class WorkerSinkTask extends WorkerTask {
}
protected void iteration() {
long now = time.milliseconds();
try {
long now = time.milliseconds();
// Maybe commit
if (!committing && now >= nextCommit) {
commitOffsets(now, false);
nextCommit += workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
// Maybe commit
if (!committing && now >= nextCommit) {
commitOffsets(now, false);
nextCommit += workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
}
// Check for timed out commits
long commitTimeout = commitStarted + workerConfig.getLong(
WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
if (committing && now >= commitTimeout) {
log.warn("Commit of {} offsets timed out", this);
commitFailures++;
committing = false;
}
// And process messages
long timeoutMs = Math.max(nextCommit - now, 0);
poll(timeoutMs);
} catch (WakeupException we) {
log.trace("{} consumer woken up", id);
if (isStopping())
return;
if (shouldPause()) {
pauseAll();
} else if (!pausedForRedelivery) {
resumeAll();
}
}
// Check for timed out commits
long commitTimeout = commitStarted + workerConfig.getLong(
WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
if (committing && now >= commitTimeout) {
log.warn("Commit of {} offsets timed out", this);
commitFailures++;
committing = false;
}
// And process messages
long timeoutMs = Math.max(nextCommit - now, 0);
poll(timeoutMs);
}
private void onCommitCompleted(Throwable error, long seqno) {
@ -211,33 +224,20 @@ class WorkerSinkTask extends WorkerTask {
/** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */
protected void poll(long timeoutMs) {
try {
rewind();
long retryTimeout = context.timeout();
if (retryTimeout > 0) {
timeoutMs = Math.min(timeoutMs, retryTimeout);
context.timeout(-1L);
}
log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
ConsumerRecords<byte[], byte[]> msgs = pollConsumer(timeoutMs);
assert messageBatch.isEmpty() || msgs.isEmpty();
log.trace("{} polling returned {} messages", id, msgs.count());
convertMessages(msgs);
deliverMessages();
} catch (WakeupException we) {
log.trace("{} consumer woken up", id);
if (isStopping())
return;
if (shouldPause()) {
pauseAll();
} else if (!pausedForRedelivery) {
resumeAll();
}
rewind();
long retryTimeout = context.timeout();
if (retryTimeout > 0) {
timeoutMs = Math.min(timeoutMs, retryTimeout);
context.timeout(-1L);
}
log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
ConsumerRecords<byte[], byte[]> msgs = pollConsumer(timeoutMs);
assert messageBatch.isEmpty() || msgs.isEmpty();
log.trace("{} polling returned {} messages", id, msgs.count());
convertMessages(msgs);
deliverMessages();
}
private void doCommitSync(Map<TopicPartition, OffsetAndMetadata> offsets, int seqno) {

View File

@ -148,7 +148,7 @@ public class WorkerSinkTaskTest {
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.poll(Long.MAX_VALUE);
workerTask.iteration();
PowerMock.verifyAll();
}
@ -197,14 +197,14 @@ public class WorkerSinkTaskTest {
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.poll(Long.MAX_VALUE); // initial assignment
workerTask.poll(Long.MAX_VALUE); // fetch some data
workerTask.iteration(); // initial assignment
workerTask.iteration(); // fetch some data
workerTask.transitionTo(TargetState.PAUSED);
workerTask.poll(Long.MAX_VALUE); // wakeup
workerTask.poll(Long.MAX_VALUE); // now paused
workerTask.iteration(); // wakeup
workerTask.iteration(); // now paused
workerTask.transitionTo(TargetState.STARTED);
workerTask.poll(Long.MAX_VALUE); // wakeup
workerTask.poll(Long.MAX_VALUE); // now unpaused
workerTask.iteration(); // wakeup
workerTask.iteration(); // now unpaused
PowerMock.verifyAll();
}
@ -241,9 +241,9 @@ public class WorkerSinkTaskTest {
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.poll(Long.MAX_VALUE);
workerTask.poll(Long.MAX_VALUE);
workerTask.poll(Long.MAX_VALUE);
workerTask.iteration();
workerTask.iteration();
workerTask.iteration();
PowerMock.verifyAll();
}
@ -260,9 +260,9 @@ public class WorkerSinkTaskTest {
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.poll(Long.MAX_VALUE);
workerTask.iteration();
try {
workerTask.poll(Long.MAX_VALUE);
workerTask.iteration();
fail("Poll should have raised the rebalance exception");
} catch (RuntimeException e) {
assertEquals(exception, e);
@ -283,9 +283,9 @@ public class WorkerSinkTaskTest {
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.poll(Long.MAX_VALUE);
workerTask.iteration();
try {
workerTask.poll(Long.MAX_VALUE);
workerTask.iteration();
fail("Poll should have raised the rebalance exception");
} catch (RuntimeException e) {
assertEquals(exception, e);
@ -343,8 +343,8 @@ public class WorkerSinkTaskTest {
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.poll(Long.MAX_VALUE); // poll for initial assignment
workerTask.poll(Long.MAX_VALUE); // now rebalance with the wakeup triggered
workerTask.iteration(); // poll for initial assignment
workerTask.iteration(); // now rebalance with the wakeup triggered
PowerMock.verifyAll();
}
@ -363,7 +363,7 @@ public class WorkerSinkTaskTest {
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.poll(Long.MAX_VALUE);
workerTask.iteration();
SinkRecord record = records.getValue().iterator().next();
@ -391,7 +391,7 @@ public class WorkerSinkTaskTest {
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.poll(Long.MAX_VALUE);
workerTask.iteration();
SinkRecord record = records.getValue().iterator().next();