KAFKA-14007: Close header converters during Connect task shutdown (#12309)

The HeaderConverter interface extends Closeable, but we weren't closing them anywhere before. This change causes header converters to be closed as part of task shutdown.

Reviewers: Kvicii <42023367+Kvicii@users.noreply.github.com>, Chris Egerton <fearthecellos@gmail.com>
This commit is contained in:
vamossagar12 2022-07-28 08:01:07 +05:30 committed by GitHub
parent 06f47c3b51
commit 0c5f5a7f8b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 42 additions and 2 deletions

View File

@ -20,6 +20,7 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
@ -104,6 +105,7 @@ public class StringConverter implements Converter, HeaderConverter {
@Override
public void close() {
// do nothing
Utils.closeQuietly(this.serializer, "string converter serializer");
Utils.closeQuietly(this.deserializer, "string converter deserializer");
}
}

View File

@ -26,6 +26,7 @@ import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
@ -282,7 +283,8 @@ public class JsonConverter implements Converter, HeaderConverter {
@Override
public void close() {
// do nothing
Utils.closeQuietly(this.serializer, "JSON converter serializer");
Utils.closeQuietly(this.deserializer, "JSON converter deserializer");
}
@Override

View File

@ -20,6 +20,7 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
@ -122,5 +123,7 @@ abstract class NumberConverter<T extends Number> implements Converter, HeaderCon
@Override
public void close() {
Utils.closeQuietly(this.serializer, "number converter serializer");
Utils.closeQuietly(this.deserializer, "number converter deserializer");
}
}

View File

@ -313,6 +313,7 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
Utils.closeQuietly(offsetReader, "offset reader");
Utils.closeQuietly(offsetStore::stop, "offset backing store");
Utils.closeQuietly(headerConverter, "header converter");
}
private void closeProducer(Duration duration) {

View File

@ -176,6 +176,7 @@ class WorkerSinkTask extends WorkerTask {
Utils.closeQuietly(consumer, "consumer");
Utils.closeQuietly(transformationChain, "transformation chain");
Utils.closeQuietly(retryWithToleranceOperator, "retry operator");
Utils.closeQuietly(headerConverter, "header converter");
}
@Override

View File

@ -77,6 +77,7 @@ import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
@ -244,6 +245,9 @@ public class ErrorHandlingTaskTest {
consumer.close();
EasyMock.expectLastCall();
headerConverter.close();
EasyMock.expectLastCall();
PowerMock.replayAll();
workerSinkTask.initialize(TASK_CONFIG);
@ -541,6 +545,13 @@ public class ErrorHandlingTaskTest {
offsetStore.stop();
EasyMock.expectLastCall();
try {
headerConverter.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
EasyMock.expectLastCall();
}
private void expectTopicCreation(String topic) {

View File

@ -1311,6 +1311,7 @@ public class ExactlyOnceWorkerSourceTaskTest extends ThreadedTest {
expectCall(() -> admin.close(EasyMock.anyObject(Duration.class)));
expectCall(transformationChain::close);
expectCall(offsetReader::close);
expectCall(headerConverter::close);
}
private void expectTopicCreation(String topic) {

View File

@ -348,6 +348,9 @@ public class WorkerSinkTaskTest {
transformationChain.close();
PowerMock.expectLastCall();
headerConverter.close();
PowerMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);

View File

@ -56,6 +56,7 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
@ -552,6 +553,13 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
consumer.close();
PowerMock.expectLastCall();
try {
headerConverter.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
PowerMock.expectLastCall();
}
// Note that this can only be called once per test currently

View File

@ -68,6 +68,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import org.powermock.reflect.Whitebox;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
@ -1091,6 +1092,13 @@ public class WorkerSourceTaskTest extends ThreadedTest {
offsetStore.stop();
EasyMock.expectLastCall();
try {
headerConverter.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
EasyMock.expectLastCall();
}
private void expectTopicCreation(String topic) {