KAFKA-9988: Suppress uncaught exceptions in log messages during Connect task shutdown (#10503)

Uncaught exceptions logged during task stop were misleading because the task is already on its way of being shutdown.

The suppression of exception causes a change in behavior as the caller method now calls `statusListener.onShutdown` instead of `statusListener.onFailure` which is the right behavior. A new test was added to test the right behavior for uncaught exception during shutdown and existing test was modified to test uncaught exception during normal execution.

Reviewers: Chris Egerton <chrise@confluent.io>, Konstantine Karantasis <k.karantasis@gmail.com>
This commit is contained in:
kpatelatwork 2021-04-12 10:06:18 -05:00 committed by GitHub
parent 0b0eaefc60
commit 10308b7a52
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 101 additions and 2 deletions

View File

@ -185,8 +185,14 @@ abstract class WorkerTask implements Runnable {
execute();
} catch (Throwable t) {
log.error("{} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted", this, t);
throw t;
if (cancelled) {
log.warn("{} After being scheduled for shutdown, the orphan task threw an uncaught exception. A newer instance of this task might be already running", this, t);
} else if (stopping) {
log.warn("{} After being scheduled for shutdown, task threw an uncaught exception.", this, t);
} else {
log.error("{} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted", this, t);
throw t;
}
} finally {
doClose();
}

View File

@ -391,8 +391,101 @@ public class WorkerSourceTaskTest extends ThreadedTest {
workerTask.initialize(TASK_CONFIG);
Future<?> taskFuture = executor.submit(workerTask);
assertTrue(awaitLatch(pollLatch));
//Failure in poll should trigger automatic stop of the worker
assertTrue(workerTask.awaitStop(1000));
taskFuture.get();
assertPollMetrics(0);
PowerMock.verifyAll();
}
@Test
public void testFailureInPollAfterCancel() throws Exception {
createWorkerTask();
sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
EasyMock.expectLastCall();
sourceTask.start(TASK_PROPS);
EasyMock.expectLastCall();
statusListener.onStartup(taskId);
EasyMock.expectLastCall();
final CountDownLatch pollLatch = new CountDownLatch(1);
final CountDownLatch workerCancelLatch = new CountDownLatch(1);
final RuntimeException exception = new RuntimeException();
EasyMock.expect(sourceTask.poll()).andAnswer(() -> {
pollLatch.countDown();
assertTrue(awaitLatch(workerCancelLatch));
throw exception;
});
offsetReader.close();
PowerMock.expectLastCall();
producer.close(Duration.ZERO);
PowerMock.expectLastCall();
sourceTask.stop();
EasyMock.expectLastCall();
expectOffsetFlush(true);
expectClose();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
Future<?> taskFuture = executor.submit(workerTask);
assertTrue(awaitLatch(pollLatch));
workerTask.cancel();
workerCancelLatch.countDown();
assertTrue(workerTask.awaitStop(1000));
taskFuture.get();
assertPollMetrics(0);
PowerMock.verifyAll();
}
@Test
public void testFailureInPollAfterStop() throws Exception {
createWorkerTask();
sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
EasyMock.expectLastCall();
sourceTask.start(TASK_PROPS);
EasyMock.expectLastCall();
statusListener.onStartup(taskId);
EasyMock.expectLastCall();
final CountDownLatch pollLatch = new CountDownLatch(1);
final CountDownLatch workerStopLatch = new CountDownLatch(1);
final RuntimeException exception = new RuntimeException();
EasyMock.expect(sourceTask.poll()).andAnswer(() -> {
pollLatch.countDown();
assertTrue(awaitLatch(workerStopLatch));
throw exception;
});
statusListener.onShutdown(taskId);
EasyMock.expectLastCall();
sourceTask.stop();
EasyMock.expectLastCall();
expectOffsetFlush(true);
expectClose();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
Future<?> taskFuture = executor.submit(workerTask);
assertTrue(awaitLatch(pollLatch));
workerTask.stop();
workerStopLatch.countDown();
assertTrue(workerTask.awaitStop(1000));
taskFuture.get();