mirror of https://github.com/apache/kafka.git
Fix sink task offset commit concurrency issue by moving it to the worker thread and waking up the consumer to ensure it exits promptly.
This commit is contained in:
parent
0aefe21a51
commit
25b5739ff0
|
@ -71,13 +71,12 @@ public class WorkerSinkTask implements WorkerTask {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void stop() throws CopycatException {
|
public void stop() throws CopycatException {
|
||||||
|
// Offset commit is handled upon exit in work thread
|
||||||
task.stop();
|
task.stop();
|
||||||
commitOffsets(time.milliseconds(), true, -1, false);
|
|
||||||
if (workThread != null) {
|
if (workThread != null) {
|
||||||
workThread.startGracefulShutdown();
|
workThread.startGracefulShutdown();
|
||||||
}
|
}
|
||||||
// Closing the consumer has to wait until we're sure the work thread has exited so it won't
|
consumer.wakeup();
|
||||||
// call poll() anymore.
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -107,10 +106,14 @@ public class WorkerSinkTask implements WorkerTask {
|
||||||
|
|
||||||
/** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */
|
/** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */
|
||||||
public void poll(long timeoutMs) {
|
public void poll(long timeoutMs) {
|
||||||
|
try {
|
||||||
log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
|
log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
|
||||||
ConsumerRecords<Object, Object> msgs = consumer.poll(timeoutMs);
|
ConsumerRecords<Object, Object> msgs = consumer.poll(timeoutMs);
|
||||||
log.trace("{} polling returned {} messages", id, msgs.count());
|
log.trace("{} polling returned {} messages", id, msgs.count());
|
||||||
deliverMessages(msgs);
|
deliverMessages(msgs);
|
||||||
|
} catch (ConsumerWakeupException we) {
|
||||||
|
log.trace("{} consumer woken up", id);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -54,6 +54,8 @@ class WorkerSinkTaskThread extends ShutdownableThread {
|
||||||
while (getRunning()) {
|
while (getRunning()) {
|
||||||
iteration();
|
iteration();
|
||||||
}
|
}
|
||||||
|
// Make sure any uncommitted data has committed
|
||||||
|
task.commitOffsets(task.getTime().milliseconds(), true, -1, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void iteration() {
|
public void iteration() {
|
||||||
|
|
|
@ -290,18 +290,10 @@ public class WorkerSinkTaskTest extends ThreadedTest {
|
||||||
sinkTask.stop();
|
sinkTask.stop();
|
||||||
PowerMock.expectLastCall();
|
PowerMock.expectLastCall();
|
||||||
|
|
||||||
// Triggers final offset commit
|
// No offset commit since it happens in the mocked worker thread, but the main thread does need to wake up the
|
||||||
EasyMock.expect(consumer.subscriptions()).andReturn(Collections.singleton(TOPIC_PARTITION));
|
// consumer so it exits quickly
|
||||||
EasyMock.expect(consumer.position(TOPIC_PARTITION)).andAnswer(new IAnswer<Long>() {
|
consumer.wakeup();
|
||||||
@Override
|
PowerMock.expectLastCall();
|
||||||
public Long answer() throws Throwable {
|
|
||||||
return FIRST_OFFSET + recordsReturned - 1;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
final Capture<ConsumerCommitCallback> capturedCallback = EasyMock.newCapture();
|
|
||||||
consumer.commit(EasyMock.eq(Collections.singletonMap(TOPIC_PARTITION, finalOffset)),
|
|
||||||
EasyMock.eq(CommitType.SYNC),
|
|
||||||
EasyMock.capture(capturedCallback));
|
|
||||||
|
|
||||||
consumer.close();
|
consumer.close();
|
||||||
PowerMock.expectLastCall();
|
PowerMock.expectLastCall();
|
||||||
|
|
Loading…
Reference in New Issue