mirror of https://github.com/apache/kafka.git
KAFKA-3847: Use a separate producer per source task
This commit is contained in:
parent
39431f7347
commit
98ec7f69bb
|
|
@ -71,10 +71,10 @@ public class Worker {
|
||||||
private final Converter internalKeyConverter;
|
private final Converter internalKeyConverter;
|
||||||
private final Converter internalValueConverter;
|
private final Converter internalValueConverter;
|
||||||
private final OffsetBackingStore offsetBackingStore;
|
private final OffsetBackingStore offsetBackingStore;
|
||||||
|
private final Map<String, Object> producerProps;
|
||||||
|
|
||||||
private HashMap<String, WorkerConnector> connectors = new HashMap<>();
|
private HashMap<String, WorkerConnector> connectors = new HashMap<>();
|
||||||
private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
|
private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
|
||||||
private KafkaProducer<byte[], byte[]> producer;
|
|
||||||
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
|
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
|
||||||
|
|
||||||
public Worker(String workerId,
|
public Worker(String workerId,
|
||||||
|
|
@ -96,16 +96,11 @@ public class Worker {
|
||||||
|
|
||||||
this.offsetBackingStore = offsetBackingStore;
|
this.offsetBackingStore = offsetBackingStore;
|
||||||
this.offsetBackingStore.configure(config);
|
this.offsetBackingStore.configure(config);
|
||||||
}
|
|
||||||
|
|
||||||
public void start() {
|
producerProps = new HashMap<>();
|
||||||
log.info("Worker starting");
|
|
||||||
|
|
||||||
Map<String, Object> producerProps = new HashMap<>();
|
|
||||||
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
|
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
|
||||||
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
|
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
|
||||||
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
|
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
|
||||||
|
|
||||||
// These settings are designed to ensure there is no data loss. They *may* be overridden via configs passed to the
|
// These settings are designed to ensure there is no data loss. They *may* be overridden via configs passed to the
|
||||||
// worker, but this may compromise the delivery guarantees of Kafka Connect.
|
// worker, but this may compromise the delivery guarantees of Kafka Connect.
|
||||||
producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, ((Integer) Integer.MAX_VALUE).toString());
|
producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, ((Integer) Integer.MAX_VALUE).toString());
|
||||||
|
|
@ -113,10 +108,12 @@ public class Worker {
|
||||||
producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, ((Long) Long.MAX_VALUE).toString());
|
producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, ((Long) Long.MAX_VALUE).toString());
|
||||||
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
|
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
|
||||||
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
|
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
|
||||||
|
// User-specified overrides
|
||||||
producerProps.putAll(config.originalsWithPrefix("producer."));
|
producerProps.putAll(config.originalsWithPrefix("producer."));
|
||||||
|
}
|
||||||
|
|
||||||
producer = new KafkaProducer<>(producerProps);
|
public void start() {
|
||||||
|
log.info("Worker starting");
|
||||||
|
|
||||||
offsetBackingStore.start();
|
offsetBackingStore.start();
|
||||||
sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(config);
|
sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(config);
|
||||||
|
|
@ -349,6 +346,7 @@ public class Worker {
|
||||||
internalKeyConverter, internalValueConverter);
|
internalKeyConverter, internalValueConverter);
|
||||||
OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
|
OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
|
||||||
internalKeyConverter, internalValueConverter);
|
internalKeyConverter, internalValueConverter);
|
||||||
|
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
|
||||||
return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter,
|
return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter,
|
||||||
valueConverter, producer, offsetReader, offsetWriter, config, time);
|
valueConverter, producer, offsetReader, offsetWriter, config, time);
|
||||||
} else if (task instanceof SinkTask) {
|
} else if (task instanceof SinkTask) {
|
||||||
|
|
|
||||||
|
|
@ -115,7 +115,7 @@ class WorkerSourceTask extends WorkerTask {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void close() {
|
protected void close() {
|
||||||
// nothing to do
|
producer.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
|
|
@ -156,6 +156,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
|
||||||
statusListener.onShutdown(taskId);
|
statusListener.onShutdown(taskId);
|
||||||
EasyMock.expectLastCall();
|
EasyMock.expectLastCall();
|
||||||
|
|
||||||
|
producer.close();
|
||||||
|
EasyMock.expectLastCall();
|
||||||
|
|
||||||
PowerMock.replayAll();
|
PowerMock.replayAll();
|
||||||
|
|
||||||
workerTask.initialize(TASK_CONFIG);
|
workerTask.initialize(TASK_CONFIG);
|
||||||
|
|
@ -195,6 +198,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
|
||||||
statusListener.onShutdown(taskId);
|
statusListener.onShutdown(taskId);
|
||||||
EasyMock.expectLastCall();
|
EasyMock.expectLastCall();
|
||||||
|
|
||||||
|
producer.close();
|
||||||
|
EasyMock.expectLastCall();
|
||||||
|
|
||||||
PowerMock.replayAll();
|
PowerMock.replayAll();
|
||||||
|
|
||||||
workerTask.initialize(TASK_CONFIG);
|
workerTask.initialize(TASK_CONFIG);
|
||||||
|
|
@ -238,6 +244,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
|
||||||
statusListener.onShutdown(taskId);
|
statusListener.onShutdown(taskId);
|
||||||
EasyMock.expectLastCall();
|
EasyMock.expectLastCall();
|
||||||
|
|
||||||
|
producer.close();
|
||||||
|
EasyMock.expectLastCall();
|
||||||
|
|
||||||
PowerMock.replayAll();
|
PowerMock.replayAll();
|
||||||
|
|
||||||
workerTask.initialize(TASK_CONFIG);
|
workerTask.initialize(TASK_CONFIG);
|
||||||
|
|
@ -280,6 +289,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
|
||||||
EasyMock.expectLastCall();
|
EasyMock.expectLastCall();
|
||||||
expectOffsetFlush(true);
|
expectOffsetFlush(true);
|
||||||
|
|
||||||
|
producer.close();
|
||||||
|
EasyMock.expectLastCall();
|
||||||
|
|
||||||
PowerMock.replayAll();
|
PowerMock.replayAll();
|
||||||
|
|
||||||
workerTask.initialize(TASK_CONFIG);
|
workerTask.initialize(TASK_CONFIG);
|
||||||
|
|
@ -317,6 +329,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
|
||||||
statusListener.onShutdown(taskId);
|
statusListener.onShutdown(taskId);
|
||||||
EasyMock.expectLastCall();
|
EasyMock.expectLastCall();
|
||||||
|
|
||||||
|
producer.close();
|
||||||
|
EasyMock.expectLastCall();
|
||||||
|
|
||||||
PowerMock.replayAll();
|
PowerMock.replayAll();
|
||||||
|
|
||||||
|
|
@ -356,6 +370,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
|
||||||
statusListener.onShutdown(taskId);
|
statusListener.onShutdown(taskId);
|
||||||
EasyMock.expectLastCall();
|
EasyMock.expectLastCall();
|
||||||
|
|
||||||
|
producer.close();
|
||||||
|
EasyMock.expectLastCall();
|
||||||
|
|
||||||
PowerMock.replayAll();
|
PowerMock.replayAll();
|
||||||
|
|
||||||
workerTask.initialize(TASK_CONFIG);
|
workerTask.initialize(TASK_CONFIG);
|
||||||
|
|
@ -499,6 +516,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
|
||||||
statusListener.onShutdown(taskId);
|
statusListener.onShutdown(taskId);
|
||||||
EasyMock.expectLastCall();
|
EasyMock.expectLastCall();
|
||||||
|
|
||||||
|
producer.close();
|
||||||
|
EasyMock.expectLastCall();
|
||||||
|
|
||||||
PowerMock.replayAll();
|
PowerMock.replayAll();
|
||||||
|
|
||||||
workerTask.initialize(TASK_CONFIG);
|
workerTask.initialize(TASK_CONFIG);
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue