From 62fa8fc9a95d738780d1f73d2d758d7329828feb Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Fri, 15 May 2020 17:53:32 -0700 Subject: [PATCH] KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions (#8618) * If two exceptions are thrown the `closePartitions` exception is suppressed * Add unit tests that throw exceptions in put and close to verify that the exceptions are propagated and suppressed appropriately out of WorkerSinkTask::execute Reviewers: Chris Egerton , Nigel Liang , Konstantine Karantasis --- .../org/apache/kafka/common/utils/Utils.java | 12 +++ .../kafka/connect/runtime/WorkerSinkTask.java | 9 +- .../connect/runtime/WorkerSinkTaskTest.java | 86 +++++++++++++++++++ 3 files changed, 102 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 9da8e52f5f1..7b47a1a3553 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -885,6 +885,18 @@ public final class Utils { throw exception; } + /** + * An {@link AutoCloseable} interface without a throws clause in the signature + * + * This is used with lambda expressions in try-with-resources clauses + * to avoid casting un-checked exceptions to checked exceptions unnecessarily. + */ + @FunctionalInterface + public interface UncheckedCloseable extends AutoCloseable { + @Override + void close(); + } + /** * Closes {@code closeable} and if an exception is thrown, it is logged at the WARN level. */ diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index 659dadfd873..3a8c8d48402 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.metrics.stats.Value; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.common.utils.Utils.UncheckedCloseable; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.RetriableException; @@ -193,13 +194,11 @@ class WorkerSinkTask extends WorkerTask { @Override public void execute() { initializeAndStart(); - try { + // Make sure any uncommitted data has been committed and the task has + // a chance to clean up its state + try (UncheckedCloseable suppressible = this::closePartitions) { while (!isStopping()) iteration(); - } finally { - // Make sure any uncommitted data has been committed and the task has - // a chance to clean up its state - closePartitions(); } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index 285cbbe50b5..5dc2f44fecb 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -36,6 +36,7 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; @@ -86,6 +87,7 @@ import java.util.regex.Pattern; import static java.util.Arrays.asList; import static java.util.Collections.singleton; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -856,6 +858,90 @@ public class WorkerSinkTaskTest { PowerMock.verifyAll(); } + @Test + public void testSinkTasksHandleCloseErrors() throws Exception { + createTask(initialState); + expectInitializeTask(); + expectTaskGetTopic(true); + + // Put one message through the task to get some offsets to commit + expectConsumerPoll(1); + expectConversionAndTransformation(1); + sinkTask.put(EasyMock.anyObject()); + PowerMock.expectLastCall().andVoid(); + + // Stop the task during the next put + expectConsumerPoll(1); + expectConversionAndTransformation(1); + sinkTask.put(EasyMock.anyObject()); + PowerMock.expectLastCall().andAnswer(() -> { + workerTask.stop(); + return null; + }); + + consumer.wakeup(); + PowerMock.expectLastCall(); + + // Throw another exception while closing the task's assignment + EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject())) + .andStubReturn(Collections.emptyMap()); + Throwable closeException = new RuntimeException(); + sinkTask.close(EasyMock.anyObject()); + PowerMock.expectLastCall().andThrow(closeException); + + PowerMock.replayAll(); + + workerTask.initialize(TASK_CONFIG); + try { + workerTask.execute(); + fail("workerTask.execute should have thrown an exception"); + } catch (RuntimeException e) { + PowerMock.verifyAll(); + assertSame("Exception from close should propagate as-is", closeException, e); + } + } + + @Test + public void testSuppressCloseErrors() throws Exception { + createTask(initialState); + expectInitializeTask(); + expectTaskGetTopic(true); + + // Put one message through the task to get some offsets to commit + expectConsumerPoll(1); + expectConversionAndTransformation(1); + sinkTask.put(EasyMock.anyObject()); + PowerMock.expectLastCall().andVoid(); + + // Throw an exception on the next put to trigger shutdown behavior + // This exception is the true "cause" of the failure + expectConsumerPoll(1); + expectConversionAndTransformation(1); + Throwable putException = new RuntimeException(); + sinkTask.put(EasyMock.anyObject()); + PowerMock.expectLastCall().andThrow(putException); + + // Throw another exception while closing the task's assignment + EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject())) + .andStubReturn(Collections.emptyMap()); + Throwable closeException = new RuntimeException(); + sinkTask.close(EasyMock.anyObject()); + PowerMock.expectLastCall().andThrow(closeException); + + PowerMock.replayAll(); + + workerTask.initialize(TASK_CONFIG); + try { + workerTask.execute(); + fail("workerTask.execute should have thrown an exception"); + } catch (ConnectException e) { + PowerMock.verifyAll(); + assertSame("Exception from put should be the cause", putException, e.getCause()); + assertTrue("Exception from close should be suppressed", e.getSuppressed().length > 0); + assertSame(closeException, e.getSuppressed()[0]); + } + } + // Verify that when commitAsync is called but the supplied callback is not called by the consumer before a // rebalance occurs, the async callback does not reset the last committed offset from the rebalance. // See KAFKA-5731 for more information.