Fix sink task offset commit concurrency issue by moving it to the worker thread and waking up the consumer to ensure it exits promptly.

This commit is contained in:
Ewen Cheslack-Postava 2015-07-30 11:38:47 -07:00
parent 0aefe21a51
commit 25b5739ff0
3 changed files with 16 additions and 19 deletions

View File

@ -71,13 +71,12 @@ public class WorkerSinkTask implements WorkerTask {
@Override @Override
public void stop() throws CopycatException { public void stop() throws CopycatException {
// Offset commit is handled upon exit in work thread
task.stop(); task.stop();
commitOffsets(time.milliseconds(), true, -1, false);
if (workThread != null) { if (workThread != null) {
workThread.startGracefulShutdown(); workThread.startGracefulShutdown();
} }
// Closing the consumer has to wait until we're sure the work thread has exited so it won't consumer.wakeup();
// call poll() anymore.
} }
@Override @Override
@ -107,10 +106,14 @@ public class WorkerSinkTask implements WorkerTask {
/** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */ /** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */
public void poll(long timeoutMs) { public void poll(long timeoutMs) {
try {
log.trace("{} polling consumer with timeout {} ms", id, timeoutMs); log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
ConsumerRecords<Object, Object> msgs = consumer.poll(timeoutMs); ConsumerRecords<Object, Object> msgs = consumer.poll(timeoutMs);
log.trace("{} polling returned {} messages", id, msgs.count()); log.trace("{} polling returned {} messages", id, msgs.count());
deliverMessages(msgs); deliverMessages(msgs);
} catch (ConsumerWakeupException we) {
log.trace("{} consumer woken up", id);
}
} }
/** /**

View File

@ -54,6 +54,8 @@ class WorkerSinkTaskThread extends ShutdownableThread {
while (getRunning()) { while (getRunning()) {
iteration(); iteration();
} }
// Make sure any uncommitted data has committed
task.commitOffsets(task.getTime().milliseconds(), true, -1, false);
} }
public void iteration() { public void iteration() {

View File

@ -290,18 +290,10 @@ public class WorkerSinkTaskTest extends ThreadedTest {
sinkTask.stop(); sinkTask.stop();
PowerMock.expectLastCall(); PowerMock.expectLastCall();
// Triggers final offset commit // No offset commit since it happens in the mocked worker thread, but the main thread does need to wake up the
EasyMock.expect(consumer.subscriptions()).andReturn(Collections.singleton(TOPIC_PARTITION)); // consumer so it exits quickly
EasyMock.expect(consumer.position(TOPIC_PARTITION)).andAnswer(new IAnswer<Long>() { consumer.wakeup();
@Override PowerMock.expectLastCall();
public Long answer() throws Throwable {
return FIRST_OFFSET + recordsReturned - 1;
}
});
final Capture<ConsumerCommitCallback> capturedCallback = EasyMock.newCapture();
consumer.commit(EasyMock.eq(Collections.singletonMap(TOPIC_PARTITION, finalOffset)),
EasyMock.eq(CommitType.SYNC),
EasyMock.capture(capturedCallback));
consumer.close(); consumer.close();
PowerMock.expectLastCall(); PowerMock.expectLastCall();