KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions (#8618)

* If two exceptions are thrown the `closePartitions` exception is suppressed
* Add unit tests that throw exceptions in put and close to verify that
  the exceptions are propagated and suppressed appropriately out of WorkerSinkTask::execute

Reviewers: Chris Egerton <chrise@confluent.io>, Nigel Liang <nigel@nigelliang.com>, Konstantine Karantasis <konstantine@confluent.io>
This commit is contained in:
Greg Harris 2020-05-15 17:53:32 -07:00 committed by GitHub
parent 8a0fcd1695
commit 62fa8fc9a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 102 additions and 5 deletions

View File

@ -885,6 +885,18 @@ public final class Utils {
throw exception;
}
/**
* An {@link AutoCloseable} interface without a throws clause in the signature
*
* This is used with lambda expressions in try-with-resources clauses
* to avoid casting un-checked exceptions to checked exceptions unnecessarily.
*/
@FunctionalInterface
public interface UncheckedCloseable extends AutoCloseable {
@Override
void close();
}
/**
* Closes {@code closeable} and if an exception is thrown, it is logged at the WARN level.
*/

View File

@ -33,6 +33,7 @@ import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.utils.Utils.UncheckedCloseable;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
@ -193,13 +194,11 @@ class WorkerSinkTask extends WorkerTask {
@Override
public void execute() {
initializeAndStart();
try {
while (!isStopping())
iteration();
} finally {
// Make sure any uncommitted data has been committed and the task has
// a chance to clean up its state
closePartitions();
try (UncheckedCloseable suppressible = this::closePartitions) {
while (!isStopping())
iteration();
}
}

View File

@ -36,6 +36,7 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
@ -86,6 +87,7 @@ import java.util.regex.Pattern;
import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
@ -856,6 +858,90 @@ public class WorkerSinkTaskTest {
PowerMock.verifyAll();
}
@Test
public void testSinkTasksHandleCloseErrors() throws Exception {
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
// Put one message through the task to get some offsets to commit
expectConsumerPoll(1);
expectConversionAndTransformation(1);
sinkTask.put(EasyMock.anyObject());
PowerMock.expectLastCall().andVoid();
// Stop the task during the next put
expectConsumerPoll(1);
expectConversionAndTransformation(1);
sinkTask.put(EasyMock.anyObject());
PowerMock.expectLastCall().andAnswer(() -> {
workerTask.stop();
return null;
});
consumer.wakeup();
PowerMock.expectLastCall();
// Throw another exception while closing the task's assignment
EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject()))
.andStubReturn(Collections.emptyMap());
Throwable closeException = new RuntimeException();
sinkTask.close(EasyMock.anyObject());
PowerMock.expectLastCall().andThrow(closeException);
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
try {
workerTask.execute();
fail("workerTask.execute should have thrown an exception");
} catch (RuntimeException e) {
PowerMock.verifyAll();
assertSame("Exception from close should propagate as-is", closeException, e);
}
}
@Test
public void testSuppressCloseErrors() throws Exception {
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
// Put one message through the task to get some offsets to commit
expectConsumerPoll(1);
expectConversionAndTransformation(1);
sinkTask.put(EasyMock.anyObject());
PowerMock.expectLastCall().andVoid();
// Throw an exception on the next put to trigger shutdown behavior
// This exception is the true "cause" of the failure
expectConsumerPoll(1);
expectConversionAndTransformation(1);
Throwable putException = new RuntimeException();
sinkTask.put(EasyMock.anyObject());
PowerMock.expectLastCall().andThrow(putException);
// Throw another exception while closing the task's assignment
EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject()))
.andStubReturn(Collections.emptyMap());
Throwable closeException = new RuntimeException();
sinkTask.close(EasyMock.anyObject());
PowerMock.expectLastCall().andThrow(closeException);
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
try {
workerTask.execute();
fail("workerTask.execute should have thrown an exception");
} catch (ConnectException e) {
PowerMock.verifyAll();
assertSame("Exception from put should be the cause", putException, e.getCause());
assertTrue("Exception from close should be suppressed", e.getSuppressed().length > 0);
assertSame(closeException, e.getSuppressed()[0]);
}
}
// Verify that when commitAsync is called but the supplied callback is not called by the consumer before a
// rebalance occurs, the async callback does not reset the last committed offset from the rebalance.
// See KAFKA-5731 for more information.