KAFKA-3847: Use a separate producer per source task

This commit is contained in:
Ewen Cheslack-Postava 2016-08-12 11:22:04 -07:00
parent 39431f7347
commit 98ec7f69bb
3 changed files with 28 additions and 10 deletions

View File

@ -71,10 +71,10 @@ public class Worker {
private final Converter internalKeyConverter;
private final Converter internalValueConverter;
private final OffsetBackingStore offsetBackingStore;
private final Map<String, Object> producerProps;
private HashMap<String, WorkerConnector> connectors = new HashMap<>();
private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
private KafkaProducer<byte[], byte[]> producer;
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
public Worker(String workerId,
@ -96,16 +96,11 @@ public class Worker {
this.offsetBackingStore = offsetBackingStore;
this.offsetBackingStore.configure(config);
}
public void start() {
log.info("Worker starting");
Map<String, Object> producerProps = new HashMap<>();
producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
// These settings are designed to ensure there is no data loss. They *may* be overridden via configs passed to the
// worker, but this may compromise the delivery guarantees of Kafka Connect.
producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, ((Integer) Integer.MAX_VALUE).toString());
@ -113,10 +108,12 @@ public class Worker {
producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, ((Long) Long.MAX_VALUE).toString());
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
// User-specified overrides
producerProps.putAll(config.originalsWithPrefix("producer."));
}
producer = new KafkaProducer<>(producerProps);
public void start() {
log.info("Worker starting");
offsetBackingStore.start();
sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(config);
@ -349,6 +346,7 @@ public class Worker {
internalKeyConverter, internalValueConverter);
OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
internalKeyConverter, internalValueConverter);
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter,
valueConverter, producer, offsetReader, offsetWriter, config, time);
} else if (task instanceof SinkTask) {

View File

@ -115,7 +115,7 @@ class WorkerSourceTask extends WorkerTask {
}
protected void close() {
// nothing to do
producer.close();
}
@Override

View File

@ -156,6 +156,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
statusListener.onShutdown(taskId);
EasyMock.expectLastCall();
producer.close();
EasyMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
@ -195,6 +198,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
statusListener.onShutdown(taskId);
EasyMock.expectLastCall();
producer.close();
EasyMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
@ -238,6 +244,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
statusListener.onShutdown(taskId);
EasyMock.expectLastCall();
producer.close();
EasyMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
@ -280,6 +289,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
EasyMock.expectLastCall();
expectOffsetFlush(true);
producer.close();
EasyMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
@ -317,6 +329,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
statusListener.onShutdown(taskId);
EasyMock.expectLastCall();
producer.close();
EasyMock.expectLastCall();
PowerMock.replayAll();
@ -356,6 +370,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
statusListener.onShutdown(taskId);
EasyMock.expectLastCall();
producer.close();
EasyMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
@ -499,6 +516,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
statusListener.onShutdown(taskId);
EasyMock.expectLastCall();
producer.close();
EasyMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);