diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java index a717af23432..228c248f4e2 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java @@ -185,8 +185,14 @@ abstract class WorkerTask implements Runnable { execute(); } catch (Throwable t) { - log.error("{} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted", this, t); - throw t; + if (cancelled) { + log.warn("{} After being scheduled for shutdown, the orphan task threw an uncaught exception. A newer instance of this task might be already running", this, t); + } else if (stopping) { + log.warn("{} After being scheduled for shutdown, task threw an uncaught exception.", this, t); + } else { + log.error("{} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted", this, t); + throw t; + } } finally { doClose(); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 47098dcb0de..aefc3a94e87 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -391,8 +391,101 @@ public class WorkerSourceTaskTest extends ThreadedTest { workerTask.initialize(TASK_CONFIG); Future taskFuture = executor.submit(workerTask); + assertTrue(awaitLatch(pollLatch)); + //Failure in poll should trigger automatic stop of the worker + assertTrue(workerTask.awaitStop(1000)); + + taskFuture.get(); + assertPollMetrics(0); + + PowerMock.verifyAll(); + } + + @Test + public void testFailureInPollAfterCancel() throws Exception { + createWorkerTask(); + + sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class)); + EasyMock.expectLastCall(); + sourceTask.start(TASK_PROPS); + EasyMock.expectLastCall(); + statusListener.onStartup(taskId); + EasyMock.expectLastCall(); + + final CountDownLatch pollLatch = new CountDownLatch(1); + final CountDownLatch workerCancelLatch = new CountDownLatch(1); + final RuntimeException exception = new RuntimeException(); + EasyMock.expect(sourceTask.poll()).andAnswer(() -> { + pollLatch.countDown(); + assertTrue(awaitLatch(workerCancelLatch)); + throw exception; + }); + + offsetReader.close(); + PowerMock.expectLastCall(); + + producer.close(Duration.ZERO); + PowerMock.expectLastCall(); + + sourceTask.stop(); + EasyMock.expectLastCall(); + expectOffsetFlush(true); + + expectClose(); + + PowerMock.replayAll(); + + workerTask.initialize(TASK_CONFIG); + Future taskFuture = executor.submit(workerTask); + + assertTrue(awaitLatch(pollLatch)); + workerTask.cancel(); + workerCancelLatch.countDown(); + assertTrue(workerTask.awaitStop(1000)); + + taskFuture.get(); + assertPollMetrics(0); + + PowerMock.verifyAll(); + } + + @Test + public void testFailureInPollAfterStop() throws Exception { + createWorkerTask(); + + sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class)); + EasyMock.expectLastCall(); + sourceTask.start(TASK_PROPS); + EasyMock.expectLastCall(); + statusListener.onStartup(taskId); + EasyMock.expectLastCall(); + + final CountDownLatch pollLatch = new CountDownLatch(1); + final CountDownLatch workerStopLatch = new CountDownLatch(1); + final RuntimeException exception = new RuntimeException(); + EasyMock.expect(sourceTask.poll()).andAnswer(() -> { + pollLatch.countDown(); + assertTrue(awaitLatch(workerStopLatch)); + throw exception; + }); + + statusListener.onShutdown(taskId); + EasyMock.expectLastCall(); + + sourceTask.stop(); + EasyMock.expectLastCall(); + expectOffsetFlush(true); + + expectClose(); + + PowerMock.replayAll(); + + workerTask.initialize(TASK_CONFIG); + Future taskFuture = executor.submit(workerTask); + assertTrue(awaitLatch(pollLatch)); workerTask.stop(); + workerStopLatch.countDown(); assertTrue(workerTask.awaitStop(1000)); taskFuture.get();