mirror of https://github.com/apache/kafka.git
Fix sink task offset commit concurrency issue by moving it to the worker thread and waking up the consumer to ensure it exits promptly.
This commit is contained in:
parent
0aefe21a51
commit
25b5739ff0
|
@ -71,13 +71,12 @@ public class WorkerSinkTask implements WorkerTask {
|
|||
|
||||
@Override
|
||||
public void stop() throws CopycatException {
|
||||
// Offset commit is handled upon exit in work thread
|
||||
task.stop();
|
||||
commitOffsets(time.milliseconds(), true, -1, false);
|
||||
if (workThread != null) {
|
||||
workThread.startGracefulShutdown();
|
||||
}
|
||||
// Closing the consumer has to wait until we're sure the work thread has exited so it won't
|
||||
// call poll() anymore.
|
||||
consumer.wakeup();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -107,10 +106,14 @@ public class WorkerSinkTask implements WorkerTask {
|
|||
|
||||
/** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */
|
||||
public void poll(long timeoutMs) {
|
||||
log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
|
||||
ConsumerRecords<Object, Object> msgs = consumer.poll(timeoutMs);
|
||||
log.trace("{} polling returned {} messages", id, msgs.count());
|
||||
deliverMessages(msgs);
|
||||
try {
|
||||
log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
|
||||
ConsumerRecords<Object, Object> msgs = consumer.poll(timeoutMs);
|
||||
log.trace("{} polling returned {} messages", id, msgs.count());
|
||||
deliverMessages(msgs);
|
||||
} catch (ConsumerWakeupException we) {
|
||||
log.trace("{} consumer woken up", id);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -54,6 +54,8 @@ class WorkerSinkTaskThread extends ShutdownableThread {
|
|||
while (getRunning()) {
|
||||
iteration();
|
||||
}
|
||||
// Make sure any uncommitted data has committed
|
||||
task.commitOffsets(task.getTime().milliseconds(), true, -1, false);
|
||||
}
|
||||
|
||||
public void iteration() {
|
||||
|
|
|
@ -290,18 +290,10 @@ public class WorkerSinkTaskTest extends ThreadedTest {
|
|||
sinkTask.stop();
|
||||
PowerMock.expectLastCall();
|
||||
|
||||
// Triggers final offset commit
|
||||
EasyMock.expect(consumer.subscriptions()).andReturn(Collections.singleton(TOPIC_PARTITION));
|
||||
EasyMock.expect(consumer.position(TOPIC_PARTITION)).andAnswer(new IAnswer<Long>() {
|
||||
@Override
|
||||
public Long answer() throws Throwable {
|
||||
return FIRST_OFFSET + recordsReturned - 1;
|
||||
}
|
||||
});
|
||||
final Capture<ConsumerCommitCallback> capturedCallback = EasyMock.newCapture();
|
||||
consumer.commit(EasyMock.eq(Collections.singletonMap(TOPIC_PARTITION, finalOffset)),
|
||||
EasyMock.eq(CommitType.SYNC),
|
||||
EasyMock.capture(capturedCallback));
|
||||
// No offset commit since it happens in the mocked worker thread, but the main thread does need to wake up the
|
||||
// consumer so it exits quickly
|
||||
consumer.wakeup();
|
||||
PowerMock.expectLastCall();
|
||||
|
||||
consumer.close();
|
||||
PowerMock.expectLastCall();
|
||||
|
|
Loading…
Reference in New Issue