MINOR: Clean up of SourceTaskOffsetCommiter

Author: Liquan Pei <liquanpei@gmail.com>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1170 from Ishiihara/minor-cleanup
This commit is contained in:
Liquan Pei 2016-04-03 19:04:48 -07:00 committed by Ewen Cheslack-Postava
parent 3a20ba3055
commit 83cf38545b
2 changed files with 4 additions and 7 deletions

View File

@ -17,7 +17,6 @@
package org.apache.kafka.connect.runtime; package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -45,13 +44,11 @@ import java.util.concurrent.TimeUnit;
class SourceTaskOffsetCommitter { class SourceTaskOffsetCommitter {
private static final Logger log = LoggerFactory.getLogger(SourceTaskOffsetCommitter.class); private static final Logger log = LoggerFactory.getLogger(SourceTaskOffsetCommitter.class);
private Time time;
private WorkerConfig config; private WorkerConfig config;
private ScheduledExecutorService commitExecutorService = null; private ScheduledExecutorService commitExecutorService = null;
private HashMap<ConnectorTaskId, ScheduledCommitTask> committers = new HashMap<>(); private final HashMap<ConnectorTaskId, ScheduledCommitTask> committers = new HashMap<>();
SourceTaskOffsetCommitter(Time time, WorkerConfig config) { SourceTaskOffsetCommitter(WorkerConfig config) {
this.time = time;
this.config = config; this.config = config;
commitExecutorService = Executors.newSingleThreadScheduledExecutor(); commitExecutorService = Executors.newSingleThreadScheduledExecutor();
} }
@ -96,7 +93,7 @@ class SourceTaskOffsetCommitter {
} }
} }
public void commit(ConnectorTaskId id, WorkerSourceTask workerTask) { private void commit(ConnectorTaskId id, WorkerSourceTask workerTask) {
final ScheduledCommitTask task; final ScheduledCommitTask task;
synchronized (committers) { synchronized (committers) {
task = committers.get(id); task = committers.get(id);

View File

@ -120,7 +120,7 @@ public class Worker {
producer = new KafkaProducer<>(producerProps); producer = new KafkaProducer<>(producerProps);
offsetBackingStore.start(); offsetBackingStore.start();
sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(time, config); sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(config);
log.info("Worker started"); log.info("Worker started");
} }