diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index e39a7e2b4b3..a0664adff8f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -71,10 +71,10 @@ public class Worker { private final Converter internalKeyConverter; private final Converter internalValueConverter; private final OffsetBackingStore offsetBackingStore; + private final Map producerProps; private HashMap connectors = new HashMap<>(); private HashMap tasks = new HashMap<>(); - private KafkaProducer producer; private SourceTaskOffsetCommitter sourceTaskOffsetCommitter; public Worker(String workerId, @@ -96,16 +96,11 @@ public class Worker { this.offsetBackingStore = offsetBackingStore; this.offsetBackingStore.configure(config); - } - public void start() { - log.info("Worker starting"); - - Map producerProps = new HashMap<>(); + producerProps = new HashMap<>(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ",")); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - // These settings are designed to ensure there is no data loss. They *may* be overridden via configs passed to the // worker, but this may compromise the delivery guarantees of Kafka Connect. producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, ((Integer) Integer.MAX_VALUE).toString()); @@ -113,10 +108,12 @@ public class Worker { producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, ((Long) Long.MAX_VALUE).toString()); producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); - + // User-specified overrides producerProps.putAll(config.originalsWithPrefix("producer.")); + } - producer = new KafkaProducer<>(producerProps); + public void start() { + log.info("Worker starting"); offsetBackingStore.start(); sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(config); @@ -349,6 +346,7 @@ public class Worker { internalKeyConverter, internalValueConverter); OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(), internalKeyConverter, internalValueConverter); + KafkaProducer producer = new KafkaProducer<>(producerProps); return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter, producer, offsetReader, offsetWriter, config, time); } else if (task instanceof SinkTask) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index 6d91b362a36..e4ce9e07013 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -115,7 +115,7 @@ class WorkerSourceTask extends WorkerTask { } protected void close() { - // nothing to do + producer.close(); } @Override diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 9854f225aa2..094adc50057 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -156,6 +156,9 @@ public class WorkerSourceTaskTest extends ThreadedTest { statusListener.onShutdown(taskId); EasyMock.expectLastCall(); + producer.close(); + EasyMock.expectLastCall(); + PowerMock.replayAll(); workerTask.initialize(TASK_CONFIG); @@ -195,6 +198,9 @@ public class WorkerSourceTaskTest extends ThreadedTest { statusListener.onShutdown(taskId); EasyMock.expectLastCall(); + producer.close(); + EasyMock.expectLastCall(); + PowerMock.replayAll(); workerTask.initialize(TASK_CONFIG); @@ -238,6 +244,9 @@ public class WorkerSourceTaskTest extends ThreadedTest { statusListener.onShutdown(taskId); EasyMock.expectLastCall(); + producer.close(); + EasyMock.expectLastCall(); + PowerMock.replayAll(); workerTask.initialize(TASK_CONFIG); @@ -280,6 +289,9 @@ public class WorkerSourceTaskTest extends ThreadedTest { EasyMock.expectLastCall(); expectOffsetFlush(true); + producer.close(); + EasyMock.expectLastCall(); + PowerMock.replayAll(); workerTask.initialize(TASK_CONFIG); @@ -317,6 +329,8 @@ public class WorkerSourceTaskTest extends ThreadedTest { statusListener.onShutdown(taskId); EasyMock.expectLastCall(); + producer.close(); + EasyMock.expectLastCall(); PowerMock.replayAll(); @@ -356,6 +370,9 @@ public class WorkerSourceTaskTest extends ThreadedTest { statusListener.onShutdown(taskId); EasyMock.expectLastCall(); + producer.close(); + EasyMock.expectLastCall(); + PowerMock.replayAll(); workerTask.initialize(TASK_CONFIG); @@ -499,6 +516,9 @@ public class WorkerSourceTaskTest extends ThreadedTest { statusListener.onShutdown(taskId); EasyMock.expectLastCall(); + producer.close(); + EasyMock.expectLastCall(); + PowerMock.replayAll(); workerTask.initialize(TASK_CONFIG);