Fix sink task offset commit concurrency issue by moving it to the worker thread and waking up the consumer to ensure it exits promptly.

This commit is contained in:
Ewen Cheslack-Postava 2015-07-30 11:38:47 -07:00
parent 0aefe21a51
commit 25b5739ff0
3 changed files with 16 additions and 19 deletions

View File

@ -71,13 +71,12 @@ public class WorkerSinkTask implements WorkerTask {
@Override
public void stop() throws CopycatException {
// Offset commit is handled upon exit in work thread
task.stop();
commitOffsets(time.milliseconds(), true, -1, false);
if (workThread != null) {
workThread.startGracefulShutdown();
}
// Closing the consumer has to wait until we're sure the work thread has exited so it won't
// call poll() anymore.
consumer.wakeup();
}
@Override
@ -107,10 +106,14 @@ public class WorkerSinkTask implements WorkerTask {
/** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */
public void poll(long timeoutMs) {
log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
ConsumerRecords<Object, Object> msgs = consumer.poll(timeoutMs);
log.trace("{} polling returned {} messages", id, msgs.count());
deliverMessages(msgs);
try {
log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
ConsumerRecords<Object, Object> msgs = consumer.poll(timeoutMs);
log.trace("{} polling returned {} messages", id, msgs.count());
deliverMessages(msgs);
} catch (ConsumerWakeupException we) {
log.trace("{} consumer woken up", id);
}
}
/**

View File

@ -54,6 +54,8 @@ class WorkerSinkTaskThread extends ShutdownableThread {
while (getRunning()) {
iteration();
}
// Make sure any uncommitted data has committed
task.commitOffsets(task.getTime().milliseconds(), true, -1, false);
}
public void iteration() {

View File

@ -290,18 +290,10 @@ public class WorkerSinkTaskTest extends ThreadedTest {
sinkTask.stop();
PowerMock.expectLastCall();
// Triggers final offset commit
EasyMock.expect(consumer.subscriptions()).andReturn(Collections.singleton(TOPIC_PARTITION));
EasyMock.expect(consumer.position(TOPIC_PARTITION)).andAnswer(new IAnswer<Long>() {
@Override
public Long answer() throws Throwable {
return FIRST_OFFSET + recordsReturned - 1;
}
});
final Capture<ConsumerCommitCallback> capturedCallback = EasyMock.newCapture();
consumer.commit(EasyMock.eq(Collections.singletonMap(TOPIC_PARTITION, finalOffset)),
EasyMock.eq(CommitType.SYNC),
EasyMock.capture(capturedCallback));
// No offset commit since it happens in the mocked worker thread, but the main thread does need to wake up the
// consumer so it exits quickly
consumer.wakeup();
PowerMock.expectLastCall();
consumer.close();
PowerMock.expectLastCall();