KAFKA-10188: Prevent SinkTask::preCommit from being called after SinkTask::stop (#8910)

This commit is contained in:
Chris Egerton 2020-10-06 14:18:54 -04:00 committed by Randall Hauch
parent b926ccbbd1
commit 47c517931c
2 changed files with 58 additions and 7 deletions

View File

@ -93,6 +93,7 @@ class WorkerSinkTask extends WorkerTask {
private int commitFailures;
private boolean pausedForRedelivery;
private boolean committing;
private boolean taskStopped;
public WorkerSinkTask(ConnectorTaskId id,
SinkTask task,
@ -135,6 +136,7 @@ class WorkerSinkTask extends WorkerTask {
this.sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno);
this.consumer = consumer;
this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
this.taskStopped = false;
}
@Override
@ -164,13 +166,8 @@ class WorkerSinkTask extends WorkerTask {
} catch (Throwable t) {
log.warn("Could not stop task", t);
}
if (consumer != null) {
try {
consumer.close();
} catch (Throwable t) {
log.warn("Could not close consumer", t);
}
}
taskStopped = true;
Utils.closeQuietly(consumer, "consumer");
try {
transformationChain.close();
} catch (Throwable t) {
@ -672,6 +669,10 @@ class WorkerSinkTask extends WorkerTask {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
if (taskStopped) {
log.trace("Skipping partition revocation callback as task has already been stopped");
return;
}
log.debug("{} Partitions revoked", WorkerSinkTask.this);
try {
closePartitions();

View File

@ -315,6 +315,56 @@ public class WorkerSinkTaskTest {
PowerMock.verifyAll();
}
@Test
public void testShutdown() throws Exception {
createTask(initialState);
expectInitializeTask();
expectTaskGetTopic(true);
// first iteration
expectPollInitialAssignment();
// second iteration
EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject())).andReturn(Collections.emptyMap());
expectConsumerPoll(1);
expectConversionAndTransformation(1);
sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
EasyMock.expectLastCall();
// WorkerSinkTask::stop
consumer.wakeup();
PowerMock.expectLastCall();
sinkTask.stop();
PowerMock.expectLastCall();
// WorkerSinkTask::close
consumer.close();
PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
@Override
public Object answer() throws Throwable {
rebalanceListener.getValue().onPartitionsRevoked(
asList(TOPIC_PARTITION, TOPIC_PARTITION2)
);
return null;
}
});
transformationChain.close();
PowerMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration();
sinkTaskContext.getValue().requestCommit(); // Force an offset commit
workerTask.iteration();
workerTask.stop();
workerTask.close();
PowerMock.verifyAll();
}
@Test
public void testPollRedelivery() throws Exception {
createTask(initialState);