mirror of https://github.com/apache/kafka.git
				
				
				
			KAFKA-3847: Use a separate producer per source task
This commit is contained in:
		
							parent
							
								
									39431f7347
								
							
						
					
					
						commit
						98ec7f69bb
					
				|  | @ -71,10 +71,10 @@ public class Worker { | |||
|     private final Converter internalKeyConverter; | ||||
|     private final Converter internalValueConverter; | ||||
|     private final OffsetBackingStore offsetBackingStore; | ||||
|     private final Map<String, Object> producerProps; | ||||
| 
 | ||||
|     private HashMap<String, WorkerConnector> connectors = new HashMap<>(); | ||||
|     private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>(); | ||||
|     private KafkaProducer<byte[], byte[]> producer; | ||||
|     private SourceTaskOffsetCommitter sourceTaskOffsetCommitter; | ||||
| 
 | ||||
|     public Worker(String workerId, | ||||
|  | @ -96,16 +96,11 @@ public class Worker { | |||
| 
 | ||||
|         this.offsetBackingStore = offsetBackingStore; | ||||
|         this.offsetBackingStore.configure(config); | ||||
|     } | ||||
| 
 | ||||
|     public void start() { | ||||
|         log.info("Worker starting"); | ||||
| 
 | ||||
|         Map<String, Object> producerProps = new HashMap<>(); | ||||
|         producerProps = new HashMap<>(); | ||||
|         producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ",")); | ||||
|         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); | ||||
|         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); | ||||
| 
 | ||||
|         // These settings are designed to ensure there is no data loss. They *may* be overridden via configs passed to the | ||||
|         // worker, but this may compromise the delivery guarantees of Kafka Connect. | ||||
|         producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, ((Integer) Integer.MAX_VALUE).toString()); | ||||
|  | @ -113,10 +108,12 @@ public class Worker { | |||
|         producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, ((Long) Long.MAX_VALUE).toString()); | ||||
|         producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); | ||||
|         producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); | ||||
| 
 | ||||
|         // User-specified overrides | ||||
|         producerProps.putAll(config.originalsWithPrefix("producer.")); | ||||
|     } | ||||
| 
 | ||||
|         producer = new KafkaProducer<>(producerProps); | ||||
|     public void start() { | ||||
|         log.info("Worker starting"); | ||||
| 
 | ||||
|         offsetBackingStore.start(); | ||||
|         sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(config); | ||||
|  | @ -349,6 +346,7 @@ public class Worker { | |||
|                     internalKeyConverter, internalValueConverter); | ||||
|             OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(), | ||||
|                     internalKeyConverter, internalValueConverter); | ||||
|             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps); | ||||
|             return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, | ||||
|                      valueConverter, producer, offsetReader, offsetWriter, config, time); | ||||
|         } else if (task instanceof SinkTask) { | ||||
|  |  | |||
|  | @ -115,7 +115,7 @@ class WorkerSourceTask extends WorkerTask { | |||
|     } | ||||
| 
 | ||||
|     protected void close() { | ||||
|         // nothing to do | ||||
|         producer.close(); | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|  |  | |||
|  | @ -156,6 +156,9 @@ public class WorkerSourceTaskTest extends ThreadedTest { | |||
|         statusListener.onShutdown(taskId); | ||||
|         EasyMock.expectLastCall(); | ||||
| 
 | ||||
|         producer.close(); | ||||
|         EasyMock.expectLastCall(); | ||||
| 
 | ||||
|         PowerMock.replayAll(); | ||||
| 
 | ||||
|         workerTask.initialize(TASK_CONFIG); | ||||
|  | @ -195,6 +198,9 @@ public class WorkerSourceTaskTest extends ThreadedTest { | |||
|         statusListener.onShutdown(taskId); | ||||
|         EasyMock.expectLastCall(); | ||||
| 
 | ||||
|         producer.close(); | ||||
|         EasyMock.expectLastCall(); | ||||
| 
 | ||||
|         PowerMock.replayAll(); | ||||
| 
 | ||||
|         workerTask.initialize(TASK_CONFIG); | ||||
|  | @ -238,6 +244,9 @@ public class WorkerSourceTaskTest extends ThreadedTest { | |||
|         statusListener.onShutdown(taskId); | ||||
|         EasyMock.expectLastCall(); | ||||
| 
 | ||||
|         producer.close(); | ||||
|         EasyMock.expectLastCall(); | ||||
| 
 | ||||
|         PowerMock.replayAll(); | ||||
| 
 | ||||
|         workerTask.initialize(TASK_CONFIG); | ||||
|  | @ -280,6 +289,9 @@ public class WorkerSourceTaskTest extends ThreadedTest { | |||
|         EasyMock.expectLastCall(); | ||||
|         expectOffsetFlush(true); | ||||
| 
 | ||||
|         producer.close(); | ||||
|         EasyMock.expectLastCall(); | ||||
| 
 | ||||
|         PowerMock.replayAll(); | ||||
| 
 | ||||
|         workerTask.initialize(TASK_CONFIG); | ||||
|  | @ -317,6 +329,8 @@ public class WorkerSourceTaskTest extends ThreadedTest { | |||
|         statusListener.onShutdown(taskId); | ||||
|         EasyMock.expectLastCall(); | ||||
| 
 | ||||
|         producer.close(); | ||||
|         EasyMock.expectLastCall(); | ||||
| 
 | ||||
|         PowerMock.replayAll(); | ||||
| 
 | ||||
|  | @ -356,6 +370,9 @@ public class WorkerSourceTaskTest extends ThreadedTest { | |||
|         statusListener.onShutdown(taskId); | ||||
|         EasyMock.expectLastCall(); | ||||
| 
 | ||||
|         producer.close(); | ||||
|         EasyMock.expectLastCall(); | ||||
| 
 | ||||
|         PowerMock.replayAll(); | ||||
| 
 | ||||
|         workerTask.initialize(TASK_CONFIG); | ||||
|  | @ -499,6 +516,9 @@ public class WorkerSourceTaskTest extends ThreadedTest { | |||
|         statusListener.onShutdown(taskId); | ||||
|         EasyMock.expectLastCall(); | ||||
| 
 | ||||
|         producer.close(); | ||||
|         EasyMock.expectLastCall(); | ||||
| 
 | ||||
|         PowerMock.replayAll(); | ||||
| 
 | ||||
|         workerTask.initialize(TASK_CONFIG); | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue