mirror of https://github.com/apache/kafka.git
KAFKA-3847: Use a separate producer per source task
This commit is contained in:
parent
39431f7347
commit
98ec7f69bb
|
|
@ -71,10 +71,10 @@ public class Worker {
|
|||
private final Converter internalKeyConverter;
|
||||
private final Converter internalValueConverter;
|
||||
private final OffsetBackingStore offsetBackingStore;
|
||||
private final Map<String, Object> producerProps;
|
||||
|
||||
private HashMap<String, WorkerConnector> connectors = new HashMap<>();
|
||||
private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
|
||||
private KafkaProducer<byte[], byte[]> producer;
|
||||
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
|
||||
|
||||
public Worker(String workerId,
|
||||
|
|
@ -96,16 +96,11 @@ public class Worker {
|
|||
|
||||
this.offsetBackingStore = offsetBackingStore;
|
||||
this.offsetBackingStore.configure(config);
|
||||
}
|
||||
|
||||
public void start() {
|
||||
log.info("Worker starting");
|
||||
|
||||
Map<String, Object> producerProps = new HashMap<>();
|
||||
producerProps = new HashMap<>();
|
||||
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
|
||||
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
|
||||
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
|
||||
|
||||
// These settings are designed to ensure there is no data loss. They *may* be overridden via configs passed to the
|
||||
// worker, but this may compromise the delivery guarantees of Kafka Connect.
|
||||
producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, ((Integer) Integer.MAX_VALUE).toString());
|
||||
|
|
@ -113,10 +108,12 @@ public class Worker {
|
|||
producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, ((Long) Long.MAX_VALUE).toString());
|
||||
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
|
||||
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
|
||||
|
||||
// User-specified overrides
|
||||
producerProps.putAll(config.originalsWithPrefix("producer."));
|
||||
}
|
||||
|
||||
producer = new KafkaProducer<>(producerProps);
|
||||
public void start() {
|
||||
log.info("Worker starting");
|
||||
|
||||
offsetBackingStore.start();
|
||||
sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(config);
|
||||
|
|
@ -349,6 +346,7 @@ public class Worker {
|
|||
internalKeyConverter, internalValueConverter);
|
||||
OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
|
||||
internalKeyConverter, internalValueConverter);
|
||||
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
|
||||
return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter,
|
||||
valueConverter, producer, offsetReader, offsetWriter, config, time);
|
||||
} else if (task instanceof SinkTask) {
|
||||
|
|
|
|||
|
|
@ -115,7 +115,7 @@ class WorkerSourceTask extends WorkerTask {
|
|||
}
|
||||
|
||||
protected void close() {
|
||||
// nothing to do
|
||||
producer.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -156,6 +156,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
|
|||
statusListener.onShutdown(taskId);
|
||||
EasyMock.expectLastCall();
|
||||
|
||||
producer.close();
|
||||
EasyMock.expectLastCall();
|
||||
|
||||
PowerMock.replayAll();
|
||||
|
||||
workerTask.initialize(TASK_CONFIG);
|
||||
|
|
@ -195,6 +198,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
|
|||
statusListener.onShutdown(taskId);
|
||||
EasyMock.expectLastCall();
|
||||
|
||||
producer.close();
|
||||
EasyMock.expectLastCall();
|
||||
|
||||
PowerMock.replayAll();
|
||||
|
||||
workerTask.initialize(TASK_CONFIG);
|
||||
|
|
@ -238,6 +244,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
|
|||
statusListener.onShutdown(taskId);
|
||||
EasyMock.expectLastCall();
|
||||
|
||||
producer.close();
|
||||
EasyMock.expectLastCall();
|
||||
|
||||
PowerMock.replayAll();
|
||||
|
||||
workerTask.initialize(TASK_CONFIG);
|
||||
|
|
@ -280,6 +289,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
|
|||
EasyMock.expectLastCall();
|
||||
expectOffsetFlush(true);
|
||||
|
||||
producer.close();
|
||||
EasyMock.expectLastCall();
|
||||
|
||||
PowerMock.replayAll();
|
||||
|
||||
workerTask.initialize(TASK_CONFIG);
|
||||
|
|
@ -317,6 +329,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
|
|||
statusListener.onShutdown(taskId);
|
||||
EasyMock.expectLastCall();
|
||||
|
||||
producer.close();
|
||||
EasyMock.expectLastCall();
|
||||
|
||||
PowerMock.replayAll();
|
||||
|
||||
|
|
@ -356,6 +370,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
|
|||
statusListener.onShutdown(taskId);
|
||||
EasyMock.expectLastCall();
|
||||
|
||||
producer.close();
|
||||
EasyMock.expectLastCall();
|
||||
|
||||
PowerMock.replayAll();
|
||||
|
||||
workerTask.initialize(TASK_CONFIG);
|
||||
|
|
@ -499,6 +516,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
|
|||
statusListener.onShutdown(taskId);
|
||||
EasyMock.expectLastCall();
|
||||
|
||||
producer.close();
|
||||
EasyMock.expectLastCall();
|
||||
|
||||
PowerMock.replayAll();
|
||||
|
||||
workerTask.initialize(TASK_CONFIG);
|
||||
|
|
|
|||
Loading…
Reference in New Issue