mirror of https://github.com/apache/kafka.git
KAFKA-4042: Contain connector & task start/stop failures within the Worker
Invoke the statusListener.onFailure() callback on start failures so that the statusBackingStore is updated. This involved a fix to the putSafe() functionality, which previously prevented any update that was not preceded by a (non-safe) put() from completing — as happens here when a connector or task transitions directly to FAILED. Worker start methods can still throw if the same connector name or task ID is already registered with the worker, as this condition should not happen. Author: Shikhar Bhushan <shikhar@confluent.io> Reviewers: Jason Gustafson <jason@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io> Closes #1778 from shikhar/distherder-stayup-take4
This commit is contained in:
parent
1902394418
commit
71f7e7c3a2
|
@ -38,6 +38,7 @@ results
|
|||
tests/results
|
||||
.ducktape
|
||||
tests/.ducktape
|
||||
tests/venv
|
||||
.cache
|
||||
|
||||
docs/generated/
|
||||
|
|
|
@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.SystemTime;
|
|||
import org.apache.kafka.common.utils.Time;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.connect.runtime.Connect;
|
||||
import org.apache.kafka.connect.runtime.ConnectorFactory;
|
||||
import org.apache.kafka.connect.runtime.Worker;
|
||||
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
|
||||
import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
|
||||
|
@ -62,6 +63,7 @@ public class ConnectDistributed {
|
|||
Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
|
||||
|
||||
Time time = new SystemTime();
|
||||
ConnectorFactory connectorFactory = new ConnectorFactory();
|
||||
DistributedConfig config = new DistributedConfig(workerProps);
|
||||
|
||||
RestServer rest = new RestServer(config);
|
||||
|
@ -71,7 +73,7 @@ public class ConnectDistributed {
|
|||
KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
|
||||
offsetBackingStore.configure(config);
|
||||
|
||||
Worker worker = new Worker(workerId, time, config, offsetBackingStore);
|
||||
Worker worker = new Worker(workerId, time, connectorFactory, config, offsetBackingStore);
|
||||
|
||||
StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, worker.getInternalValueConverter());
|
||||
statusBackingStore.configure(config);
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Time;
|
|||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.connect.runtime.Connect;
|
||||
import org.apache.kafka.connect.runtime.ConnectorConfig;
|
||||
import org.apache.kafka.connect.runtime.ConnectorFactory;
|
||||
import org.apache.kafka.connect.runtime.Herder;
|
||||
import org.apache.kafka.connect.runtime.Worker;
|
||||
import org.apache.kafka.connect.runtime.rest.RestServer;
|
||||
|
@ -67,13 +68,14 @@ public class ConnectStandalone {
|
|||
Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
|
||||
|
||||
Time time = new SystemTime();
|
||||
ConnectorFactory connectorFactory = new ConnectorFactory();
|
||||
StandaloneConfig config = new StandaloneConfig(workerProps);
|
||||
|
||||
RestServer rest = new RestServer(config);
|
||||
URI advertisedUrl = rest.advertisedUrl();
|
||||
String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
|
||||
|
||||
Worker worker = new Worker(workerId, time, config, new FileOffsetBackingStore());
|
||||
Worker worker = new Worker(workerId, time, connectorFactory, config, new FileOffsetBackingStore());
|
||||
|
||||
Herder herder = new StandaloneHerder(worker);
|
||||
final Connect connect = new Connect(herder, rest);
|
||||
|
|
|
@ -361,7 +361,7 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
|
|||
if (tempConnectors.containsKey(connType)) {
|
||||
return tempConnectors.get(connType);
|
||||
} else {
|
||||
Connector connector = worker.getConnector(connType);
|
||||
Connector connector = worker.getConnectorFactory().newConnector(connType);
|
||||
tempConnectors.put(connType, connector);
|
||||
return connector;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,102 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.kafka.connect.runtime;
|
||||
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.connect.connector.Connector;
|
||||
import org.apache.kafka.connect.connector.Task;
|
||||
import org.apache.kafka.connect.errors.ConnectException;
|
||||
import org.reflections.Reflections;
|
||||
import org.reflections.util.ClasspathHelper;
|
||||
import org.reflections.util.ConfigurationBuilder;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
public class ConnectorFactory {
|
||||
|
||||
public Connector newConnector(String connectorClassOrAlias) {
|
||||
return instantiate(getConnectorClass(connectorClassOrAlias));
|
||||
}
|
||||
|
||||
public Task newTask(Class<? extends Task> taskClass) {
|
||||
return instantiate(taskClass);
|
||||
}
|
||||
|
||||
private static <T> T instantiate(Class<? extends T> cls) {
|
||||
try {
|
||||
return Utils.newInstance(cls);
|
||||
} catch (Throwable t) {
|
||||
throw new ConnectException("Instantiation error", t);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static Class<? extends Connector> getConnectorClass(String connectorClassOrAlias) {
|
||||
// Avoid the classpath scan if the full class name was provided
|
||||
try {
|
||||
Class<?> clazz = Class.forName(connectorClassOrAlias);
|
||||
if (!Connector.class.isAssignableFrom(clazz))
|
||||
throw new ConnectException("Class " + connectorClassOrAlias + " does not implement Connector");
|
||||
return (Class<? extends Connector>) clazz;
|
||||
} catch (ClassNotFoundException e) {
|
||||
// Fall through to scan for the alias
|
||||
}
|
||||
|
||||
// Iterate over our entire classpath to find all the connectors and hopefully one of them matches the alias from the connector configration
|
||||
Reflections reflections = new Reflections(new ConfigurationBuilder()
|
||||
.setUrls(ClasspathHelper.forJavaClassPath()));
|
||||
|
||||
Set<Class<? extends Connector>> connectors = reflections.getSubTypesOf(Connector.class);
|
||||
|
||||
List<Class<? extends Connector>> results = new ArrayList<>();
|
||||
|
||||
for (Class<? extends Connector> connector: connectors) {
|
||||
// Configuration included the class name but not package
|
||||
if (connector.getSimpleName().equals(connectorClassOrAlias))
|
||||
results.add(connector);
|
||||
|
||||
// Configuration included a short version of the name (i.e. FileStreamSink instead of FileStreamSinkConnector)
|
||||
if (connector.getSimpleName().equals(connectorClassOrAlias + "Connector"))
|
||||
results.add(connector);
|
||||
}
|
||||
|
||||
if (results.isEmpty())
|
||||
throw new ConnectException("Failed to find any class that implements Connector and which name matches " + connectorClassOrAlias +
|
||||
", available connectors are: " + connectorNames(connectors));
|
||||
if (results.size() > 1) {
|
||||
throw new ConnectException("More than one connector matches alias " + connectorClassOrAlias +
|
||||
". Please use full package and class name instead. Classes found: " + connectorNames(results));
|
||||
}
|
||||
|
||||
// We just validated that we have exactly one result, so this is safe
|
||||
return results.get(0);
|
||||
}
|
||||
|
||||
private static String connectorNames(Collection<Class<? extends Connector>> connectors) {
|
||||
StringBuilder names = new StringBuilder();
|
||||
for (Class<?> c : connectors)
|
||||
names.append(c.getName()).append(", ");
|
||||
return names.substring(0, names.toString().length() - 2);
|
||||
}
|
||||
|
||||
}
|
|
@ -19,7 +19,6 @@ package org.apache.kafka.connect.runtime;
|
|||
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.common.KafkaException;
|
||||
import org.apache.kafka.common.utils.Time;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.connect.connector.Connector;
|
||||
|
@ -34,15 +33,14 @@ import org.apache.kafka.connect.storage.OffsetStorageReader;
|
|||
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
|
||||
import org.apache.kafka.connect.storage.OffsetStorageWriter;
|
||||
import org.apache.kafka.connect.util.ConnectorTaskId;
|
||||
import org.reflections.Reflections;
|
||||
import org.reflections.util.ClasspathHelper;
|
||||
import org.reflections.util.ConfigurationBuilder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -65,6 +63,7 @@ public class Worker {
|
|||
private final ExecutorService executor;
|
||||
private final Time time;
|
||||
private final String workerId;
|
||||
private final ConnectorFactory connectorFactory;
|
||||
private final WorkerConfig config;
|
||||
private final Converter defaultKeyConverter;
|
||||
private final Converter defaultValueConverter;
|
||||
|
@ -77,13 +76,11 @@ public class Worker {
|
|||
private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
|
||||
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
|
||||
|
||||
public Worker(String workerId,
|
||||
Time time,
|
||||
WorkerConfig config,
|
||||
OffsetBackingStore offsetBackingStore) {
|
||||
public Worker(String workerId, Time time, ConnectorFactory connectorFactory, WorkerConfig config, OffsetBackingStore offsetBackingStore) {
|
||||
this.executor = Executors.newCachedThreadPool();
|
||||
this.workerId = workerId;
|
||||
this.time = time;
|
||||
this.connectorFactory = connectorFactory;
|
||||
this.config = config;
|
||||
this.defaultKeyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
|
||||
this.defaultKeyConverter.configure(config.originalsWithPrefix("key.converter."), true);
|
||||
|
@ -127,18 +124,15 @@ public class Worker {
|
|||
long started = time.milliseconds();
|
||||
long limit = started + config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
|
||||
|
||||
for (Map.Entry<String, WorkerConnector> entry : connectors.entrySet()) {
|
||||
WorkerConnector workerConnector = entry.getValue();
|
||||
log.warn("Shutting down connector {} uncleanly; herder should have shut down connectors before the" +
|
||||
"Worker is stopped.", entry.getKey());
|
||||
workerConnector.shutdown();
|
||||
if (!connectors.isEmpty()) {
|
||||
log.warn("Shutting down connectors {} uncleanly; herder should have shut down connectors before the Worker is stopped", connectors.keySet());
|
||||
stopConnectors();
|
||||
}
|
||||
|
||||
Collection<ConnectorTaskId> taskIds = tasks.keySet();
|
||||
log.warn("Shutting down tasks {} uncleanly; herder should have shut down "
|
||||
+ "tasks before the Worker is stopped.", taskIds);
|
||||
stopTasks(taskIds);
|
||||
awaitStopTasks(taskIds);
|
||||
if (!tasks.isEmpty()) {
|
||||
log.warn("Shutting down tasks {} uncleanly; herder should have shut down tasks before the Worker is stopped", tasks.keySet());
|
||||
stopAndAwaitTasks();
|
||||
}
|
||||
|
||||
long timeoutMs = limit - time.milliseconds();
|
||||
sourceTaskOffsetCommitter.close(timeoutMs);
|
||||
|
@ -148,34 +142,36 @@ public class Worker {
|
|||
log.info("Worker stopped");
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a new connector.
|
||||
* @param connConfig connector configuration
|
||||
* @param ctx context for the connector
|
||||
* @param statusListener listener for notifications of connector status changes
|
||||
* @param initialState the initial target state that the connector should be initialized to
|
||||
*/
|
||||
public void startConnector(ConnectorConfig connConfig,
|
||||
ConnectorContext ctx,
|
||||
ConnectorStatus.Listener statusListener,
|
||||
TargetState initialState) {
|
||||
String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
|
||||
Class<? extends Connector> connClass = getConnectorClass(connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
|
||||
|
||||
log.info("Creating connector {} of type {}", connName, connClass.getName());
|
||||
|
||||
public boolean startConnector(
|
||||
String connName,
|
||||
Map<String, String> connProps,
|
||||
ConnectorContext ctx,
|
||||
ConnectorStatus.Listener statusListener,
|
||||
TargetState initialState
|
||||
) {
|
||||
if (connectors.containsKey(connName))
|
||||
throw new ConnectException("Connector with name " + connName + " already exists");
|
||||
|
||||
final Connector connector = instantiateConnector(connClass);
|
||||
WorkerConnector workerConnector = new WorkerConnector(connName, connector, ctx, statusListener);
|
||||
|
||||
log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connClass.getName());
|
||||
workerConnector.initialize(connConfig);
|
||||
workerConnector.transitionTo(initialState);
|
||||
final WorkerConnector workerConnector;
|
||||
try {
|
||||
final ConnectorConfig connConfig = new ConnectorConfig(connProps);
|
||||
final String connClass = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
|
||||
log.info("Creating connector {} of type {}", connName, connClass);
|
||||
final Connector connector = connectorFactory.newConnector(connClass);
|
||||
workerConnector = new WorkerConnector(connName, connector, ctx, statusListener);
|
||||
log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connector.getClass());
|
||||
workerConnector.initialize(connConfig);
|
||||
workerConnector.transitionTo(initialState);
|
||||
} catch (Throwable t) {
|
||||
log.error("Failed to start connector {}", connName, t);
|
||||
statusListener.onFailure(connName, t);
|
||||
return false;
|
||||
}
|
||||
|
||||
connectors.put(connName, workerConnector);
|
||||
|
||||
log.info("Finished creating connector {}", connName);
|
||||
return true;
|
||||
}
|
||||
|
||||
/* Now that the configuration doesn't contain the actual class name, we need to be able to tell the herder whether a connector is a Sink */
|
||||
|
@ -184,73 +180,6 @@ public class Worker {
|
|||
return workerConnector.isSinkConnector();
|
||||
}
|
||||
|
||||
public Connector getConnector(String connType) {
|
||||
Class<? extends Connector> connectorClass = getConnectorClass(connType);
|
||||
return instantiateConnector(connectorClass);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private Class<? extends Connector> getConnectorClass(String connectorAlias) {
|
||||
// Avoid the classpath scan if the full class name was provided
|
||||
try {
|
||||
Class<?> clazz = Class.forName(connectorAlias);
|
||||
if (!Connector.class.isAssignableFrom(clazz))
|
||||
throw new ConnectException("Class " + connectorAlias + " does not implement Connector");
|
||||
return (Class<? extends Connector>) clazz;
|
||||
} catch (ClassNotFoundException e) {
|
||||
// Fall through to scan for the alias
|
||||
}
|
||||
|
||||
// Iterate over our entire classpath to find all the connectors and hopefully one of them matches the alias from the connector configuration
|
||||
Reflections reflections = new Reflections(new ConfigurationBuilder()
|
||||
.setUrls(ClasspathHelper.forJavaClassPath()));
|
||||
|
||||
Set<Class<? extends Connector>> connectors = reflections.getSubTypesOf(Connector.class);
|
||||
|
||||
List<Class<? extends Connector>> results = new ArrayList<>();
|
||||
|
||||
for (Class<? extends Connector> connector: connectors) {
|
||||
// Configuration included the class name but not package
|
||||
if (connector.getSimpleName().equals(connectorAlias))
|
||||
results.add(connector);
|
||||
|
||||
// Configuration included a short version of the name (i.e. FileStreamSink instead of FileStreamSinkConnector)
|
||||
if (connector.getSimpleName().equals(connectorAlias + "Connector"))
|
||||
results.add(connector);
|
||||
}
|
||||
|
||||
if (results.isEmpty())
|
||||
throw new ConnectException("Failed to find any class that implements Connector and which name matches " + connectorAlias + " available connectors are: " + connectorNames(connectors));
|
||||
if (results.size() > 1) {
|
||||
throw new ConnectException("More than one connector matches alias " + connectorAlias + ". Please use full package + class name instead. Classes found: " + connectorNames(results));
|
||||
}
|
||||
|
||||
// We just validated that we have exactly one result, so this is safe
|
||||
return results.get(0);
|
||||
}
|
||||
|
||||
private String connectorNames(Collection<Class<? extends Connector>> connectors) {
|
||||
StringBuilder names = new StringBuilder();
|
||||
for (Class<?> c : connectors)
|
||||
names.append(c.getName()).append(", ");
|
||||
|
||||
return names.substring(0, names.toString().length() - 2);
|
||||
}
|
||||
|
||||
public boolean ownsTask(ConnectorTaskId taskId) {
|
||||
return tasks.containsKey(taskId);
|
||||
}
|
||||
|
||||
private static Connector instantiateConnector(Class<? extends Connector> connClass) {
|
||||
try {
|
||||
return Utils.newInstance(connClass);
|
||||
} catch (Throwable t) {
|
||||
// Catches normal exceptions due to instantiation errors as well as any runtime errors that
|
||||
// may be caused by user code
|
||||
throw new ConnectException("Failed to create connector instance", t);
|
||||
}
|
||||
}
|
||||
|
||||
public List<Map<String, String>> connectorTaskConfigs(String connName, int maxTasks, List<String> sinkTopics) {
|
||||
log.trace("Reconfiguring connector tasks for {}", connName);
|
||||
|
||||
|
@ -271,17 +200,34 @@ public class Worker {
|
|||
return result;
|
||||
}
|
||||
|
||||
public void stopConnector(String connName) {
|
||||
public void stopConnectors() {
|
||||
stopConnectors(new HashSet<>(connectors.keySet()));
|
||||
}
|
||||
|
||||
public Collection<String> stopConnectors(Collection<String> connectors) {
|
||||
final List<String> stopped = new ArrayList<>(connectors.size());
|
||||
for (String connector: connectors) {
|
||||
if (stopConnector(connector)) {
|
||||
stopped.add(connector);
|
||||
}
|
||||
}
|
||||
return stopped;
|
||||
}
|
||||
|
||||
public boolean stopConnector(String connName) {
|
||||
log.info("Stopping connector {}", connName);
|
||||
|
||||
WorkerConnector connector = connectors.get(connName);
|
||||
if (connector == null)
|
||||
throw new ConnectException("Connector " + connName + " not found in this worker.");
|
||||
if (connector == null) {
|
||||
log.warn("Ignoring stop request for unowned connector {}", connName);
|
||||
return false;
|
||||
}
|
||||
|
||||
connector.shutdown();
|
||||
connectors.remove(connName);
|
||||
|
||||
log.info("Stopped connector {}", connName);
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -293,60 +239,55 @@ public class Worker {
|
|||
|
||||
public boolean isRunning(String connName) {
|
||||
WorkerConnector connector = connectors.get(connName);
|
||||
if (connector == null)
|
||||
throw new ConnectException("Connector " + connName + " not found in this worker.");
|
||||
return connector.isRunning();
|
||||
return connector != null && connector.isRunning();
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a new task.
|
||||
* @param id Globally unique ID for this task.
|
||||
* @param taskConfig the parsed task configuration
|
||||
* @param connConfig the parsed connector configuration
|
||||
* @param statusListener listener for notifications of task status changes
|
||||
* @param initialState the initial target state that the task should be initialized to
|
||||
*/
|
||||
public void startTask(ConnectorTaskId id,
|
||||
TaskConfig taskConfig,
|
||||
ConnectorConfig connConfig,
|
||||
TaskStatus.Listener statusListener,
|
||||
TargetState initialState) {
|
||||
public boolean startTask(
|
||||
ConnectorTaskId id,
|
||||
Map<String, String> connProps,
|
||||
Map<String, String> taskProps,
|
||||
TaskStatus.Listener statusListener,
|
||||
TargetState initialState
|
||||
) {
|
||||
log.info("Creating task {}", id);
|
||||
|
||||
if (tasks.containsKey(id)) {
|
||||
String msg = "Task already exists in this worker; the herder should not have requested "
|
||||
+ "that this : " + id;
|
||||
log.error(msg);
|
||||
throw new ConnectException(msg);
|
||||
if (tasks.containsKey(id))
|
||||
throw new ConnectException("Task already exists in this worker: " + id);
|
||||
|
||||
final WorkerTask workerTask;
|
||||
try {
|
||||
final ConnectorConfig connConfig = new ConnectorConfig(connProps);
|
||||
final TaskConfig taskConfig = new TaskConfig(taskProps);
|
||||
|
||||
final Class<? extends Task> taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
|
||||
final Task task = connectorFactory.newTask(taskClass);
|
||||
log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName());
|
||||
|
||||
Converter keyConverter = connConfig.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
|
||||
if (keyConverter != null)
|
||||
keyConverter.configure(connConfig.originalsWithPrefix("key.converter."), true);
|
||||
else
|
||||
keyConverter = defaultKeyConverter;
|
||||
Converter valueConverter = connConfig.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
|
||||
if (valueConverter != null)
|
||||
valueConverter.configure(connConfig.originalsWithPrefix("value.converter."), false);
|
||||
else
|
||||
valueConverter = defaultValueConverter;
|
||||
|
||||
workerTask = buildWorkerTask(id, task, statusListener, initialState, keyConverter, valueConverter);
|
||||
workerTask.initialize(taskConfig);
|
||||
} catch (Throwable t) {
|
||||
log.error("Failed to start task {}", id, t);
|
||||
statusListener.onFailure(id, t);
|
||||
return false;
|
||||
}
|
||||
|
||||
Class<? extends Task> taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
|
||||
final Task task = instantiateTask(taskClass);
|
||||
log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName());
|
||||
|
||||
Converter keyConverter = connConfig.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
|
||||
if (keyConverter != null)
|
||||
keyConverter.configure(connConfig.originalsWithPrefix("key.converter."), true);
|
||||
else
|
||||
keyConverter = defaultKeyConverter;
|
||||
Converter valueConverter = connConfig.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
|
||||
if (valueConverter != null)
|
||||
valueConverter.configure(connConfig.originalsWithPrefix("value.converter."), false);
|
||||
else
|
||||
valueConverter = defaultValueConverter;
|
||||
|
||||
final WorkerTask workerTask = buildWorkerTask(id, task, statusListener, initialState, keyConverter, valueConverter);
|
||||
|
||||
// Start the task before adding modifying any state, any exceptions are caught higher up the
|
||||
// call chain and there's no cleanup to do here
|
||||
workerTask.initialize(taskConfig);
|
||||
executor.submit(workerTask);
|
||||
|
||||
if (task instanceof SourceTask) {
|
||||
WorkerSourceTask workerSourceTask = (WorkerSourceTask) workerTask;
|
||||
sourceTaskOffsetCommitter.schedule(id, workerSourceTask);
|
||||
if (workerTask instanceof WorkerSourceTask) {
|
||||
sourceTaskOffsetCommitter.schedule(id, (WorkerSourceTask) workerTask);
|
||||
}
|
||||
tasks.put(id, workerTask);
|
||||
return true;
|
||||
}
|
||||
|
||||
private WorkerTask buildWorkerTask(ConnectorTaskId id,
|
||||
|
@ -373,25 +314,42 @@ public class Worker {
|
|||
}
|
||||
}
|
||||
|
||||
private static Task instantiateTask(Class<? extends Task> taskClass) {
|
||||
try {
|
||||
return Utils.newInstance(taskClass);
|
||||
} catch (KafkaException e) {
|
||||
throw new ConnectException("Task class not found", e);
|
||||
public boolean stopAndAwaitTask(ConnectorTaskId id) {
|
||||
return !stopAndAwaitTasks(Collections.singleton(id)).isEmpty();
|
||||
}
|
||||
|
||||
public void stopAndAwaitTasks() {
|
||||
stopAndAwaitTasks(new HashSet<>(tasks.keySet()));
|
||||
}
|
||||
|
||||
public Collection<ConnectorTaskId> stopAndAwaitTasks(Collection<ConnectorTaskId> ids) {
|
||||
final List<ConnectorTaskId> stoppable = new ArrayList<>(ids.size());
|
||||
for (ConnectorTaskId taskId : ids) {
|
||||
final WorkerTask task = tasks.get(taskId);
|
||||
if (task == null) {
|
||||
log.warn("Ignoring stop request for unowned task {}", taskId);
|
||||
continue;
|
||||
}
|
||||
stopTask(task);
|
||||
stoppable.add(taskId);
|
||||
}
|
||||
awaitStopTasks(stoppable);
|
||||
return stoppable;
|
||||
}
|
||||
|
||||
public void stopTasks(Collection<ConnectorTaskId> ids) {
|
||||
for (ConnectorTaskId id : ids)
|
||||
stopTask(getTask(id));
|
||||
private void stopTask(WorkerTask task) {
|
||||
log.info("Stopping task {}", task.id());
|
||||
if (task instanceof WorkerSourceTask)
|
||||
sourceTaskOffsetCommitter.remove(task.id());
|
||||
task.stop();
|
||||
}
|
||||
|
||||
public void awaitStopTasks(Collection<ConnectorTaskId> ids) {
|
||||
private void awaitStopTasks(Collection<ConnectorTaskId> ids) {
|
||||
long now = time.milliseconds();
|
||||
long deadline = now + config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
|
||||
for (ConnectorTaskId id : ids) {
|
||||
long remaining = Math.max(0, deadline - time.milliseconds());
|
||||
awaitStopTask(getTask(id), remaining);
|
||||
awaitStopTask(tasks.get(id), remaining);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -403,20 +361,6 @@ public class Worker {
|
|||
tasks.remove(task.id());
|
||||
}
|
||||
|
||||
private void stopTask(WorkerTask task) {
|
||||
log.info("Stopping task {}", task.id());
|
||||
if (task instanceof WorkerSourceTask)
|
||||
sourceTaskOffsetCommitter.remove(task.id());
|
||||
task.stop();
|
||||
}
|
||||
|
||||
public void stopAndAwaitTask(ConnectorTaskId id) {
|
||||
WorkerTask task = getTask(id);
|
||||
stopTask(task);
|
||||
awaitStopTask(task, config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG));
|
||||
log.info("Task {} completed shutdown.", task.id());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the IDs of the tasks currently running in this worker.
|
||||
*/
|
||||
|
@ -424,15 +368,6 @@ public class Worker {
|
|||
return tasks.keySet();
|
||||
}
|
||||
|
||||
private WorkerTask getTask(ConnectorTaskId id) {
|
||||
WorkerTask task = tasks.get(id);
|
||||
if (task == null) {
|
||||
log.error("Task not found: " + id);
|
||||
throw new ConnectException("Task not found: " + id);
|
||||
}
|
||||
return task;
|
||||
}
|
||||
|
||||
public Converter getInternalKeyConverter() {
|
||||
return internalKeyConverter;
|
||||
}
|
||||
|
@ -441,12 +376,12 @@ public class Worker {
|
|||
return internalValueConverter;
|
||||
}
|
||||
|
||||
public String workerId() {
|
||||
return workerId;
|
||||
public ConnectorFactory getConnectorFactory() {
|
||||
return connectorFactory;
|
||||
}
|
||||
|
||||
public boolean ownsConnector(String connName) {
|
||||
return this.connectors.containsKey(connName);
|
||||
public String workerId() {
|
||||
return workerId;
|
||||
}
|
||||
|
||||
public void setTargetState(String connName, TargetState state) {
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
|
||||
package org.apache.kafka.connect.runtime.distributed;
|
||||
|
||||
import org.apache.kafka.common.config.ConfigException;
|
||||
import org.apache.kafka.common.errors.WakeupException;
|
||||
import org.apache.kafka.common.utils.Time;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
|
@ -31,7 +30,6 @@ import org.apache.kafka.connect.runtime.HerderConnectorContext;
|
|||
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
|
||||
import org.apache.kafka.connect.runtime.SourceConnectorConfig;
|
||||
import org.apache.kafka.connect.runtime.TargetState;
|
||||
import org.apache.kafka.connect.runtime.TaskConfig;
|
||||
import org.apache.kafka.connect.runtime.Worker;
|
||||
import org.apache.kafka.connect.runtime.rest.RestServer;
|
||||
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
|
||||
|
@ -322,7 +320,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
|
|||
|
||||
// additionally, if the worker is running the connector itself, then we need to
|
||||
// request reconfiguration to ensure that config changes while paused take effect
|
||||
if (worker.ownsConnector(connector) && targetState == TargetState.STARTED)
|
||||
if (targetState == TargetState.STARTED)
|
||||
reconfigureConnectorTasksWithRetry(connector);
|
||||
}
|
||||
}
|
||||
|
@ -331,18 +329,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
|
|||
public void halt() {
|
||||
synchronized (this) {
|
||||
// Clean up any connectors and tasks that are still running.
|
||||
log.info("Stopping connectors and tasks that are still assigned to this worker.");
|
||||
for (String connName : new HashSet<>(worker.connectorNames())) {
|
||||
try {
|
||||
worker.stopConnector(connName);
|
||||
} catch (Throwable t) {
|
||||
log.error("Failed to shut down connector " + connName, t);
|
||||
}
|
||||
}
|
||||
|
||||
Set<ConnectorTaskId> tasks = new HashSet<>(worker.taskIds());
|
||||
worker.stopTasks(tasks);
|
||||
worker.awaitStopTasks(tasks);
|
||||
log.info("Stopping connectors and tasks that are still assigned to the worker");
|
||||
worker.stopConnectors();
|
||||
worker.stopAndAwaitTasks();
|
||||
|
||||
member.stop();
|
||||
|
||||
|
@ -573,11 +562,13 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
|
|||
return null;
|
||||
}
|
||||
|
||||
if (worker.ownsConnector(connName)) {
|
||||
if (assignment.connectors().contains(connName)) {
|
||||
try {
|
||||
worker.stopConnector(connName);
|
||||
startConnector(connName);
|
||||
callback.onCompletion(null, null);
|
||||
if (startConnector(connName))
|
||||
callback.onCompletion(null, null);
|
||||
else
|
||||
callback.onCompletion(new ConnectException("Failed to start connector: " + connName), null);
|
||||
} catch (Throwable t) {
|
||||
callback.onCompletion(t, null);
|
||||
}
|
||||
|
@ -609,11 +600,13 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
|
|||
return null;
|
||||
}
|
||||
|
||||
if (worker.ownsTask(id)) {
|
||||
if (assignment.tasks().contains(id)) {
|
||||
try {
|
||||
worker.stopAndAwaitTask(id);
|
||||
startTask(id);
|
||||
callback.onCompletion(null, null);
|
||||
if (startTask(id))
|
||||
callback.onCompletion(null, null);
|
||||
else
|
||||
callback.onCompletion(new ConnectException("Failed to start task: " + id), null);
|
||||
} catch (Throwable t) {
|
||||
callback.onCompletion(t, null);
|
||||
}
|
||||
|
@ -751,48 +744,41 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
|
|||
// Start assigned connectors and tasks
|
||||
log.info("Starting connectors and tasks using config offset {}", assignment.offset());
|
||||
for (String connectorName : assignment.connectors()) {
|
||||
try {
|
||||
startConnector(connectorName);
|
||||
} catch (ConfigException e) {
|
||||
log.error("Couldn't instantiate connector " + connectorName + " because it has an invalid connector " +
|
||||
"configuration. This connector will not execute until reconfigured.", e);
|
||||
}
|
||||
startConnector(connectorName);
|
||||
}
|
||||
for (ConnectorTaskId taskId : assignment.tasks()) {
|
||||
try {
|
||||
startTask(taskId);
|
||||
} catch (ConfigException e) {
|
||||
log.error("Couldn't instantiate task " + taskId + " because it has an invalid task " +
|
||||
"configuration. This task will not execute until reconfigured.", e);
|
||||
}
|
||||
startTask(taskId);
|
||||
}
|
||||
log.info("Finished starting connectors and tasks");
|
||||
}
|
||||
|
||||
private void startTask(ConnectorTaskId taskId) {
|
||||
private boolean startTask(ConnectorTaskId taskId) {
|
||||
log.info("Starting task {}", taskId);
|
||||
TargetState initialState = configState.targetState(taskId.connector());
|
||||
TaskConfig taskConfig = new TaskConfig(configState.taskConfig(taskId));
|
||||
ConnectorConfig connConfig = new ConnectorConfig(configState.connectorConfig(taskId.connector()));
|
||||
worker.startTask(taskId, taskConfig, connConfig, this, initialState);
|
||||
return worker.startTask(
|
||||
taskId,
|
||||
configState.connectorConfig(taskId.connector()),
|
||||
configState.taskConfig(taskId),
|
||||
this,
|
||||
configState.targetState(taskId.connector())
|
||||
);
|
||||
}
|
||||
|
||||
// Helper for starting a connector with the given name, which will extract & parse the config, generate connector
|
||||
// context and add to the worker. This needs to be called from within the main worker thread for this herder.
|
||||
private void startConnector(String connectorName) {
|
||||
private boolean startConnector(String connectorName) {
|
||||
log.info("Starting connector {}", connectorName);
|
||||
Map<String, String> configs = configState.connectorConfig(connectorName);
|
||||
ConnectorConfig connConfig = new ConnectorConfig(configs);
|
||||
String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
|
||||
ConnectorContext ctx = new HerderConnectorContext(DistributedHerder.this, connName);
|
||||
TargetState initialState = configState.targetState(connectorName);
|
||||
worker.startConnector(connConfig, ctx, this, initialState);
|
||||
final Map<String, String> configProps = configState.connectorConfig(connectorName);
|
||||
final ConnectorContext ctx = new HerderConnectorContext(DistributedHerder.this, connectorName);
|
||||
final TargetState initialState = configState.targetState(connectorName);
|
||||
boolean started = worker.startConnector(connectorName, configProps, ctx, this, initialState);
|
||||
|
||||
// Immediately request configuration since this could be a brand new connector. However, also only update those
|
||||
// task configs if they are actually different from the existing ones to avoid unnecessary updates when this is
|
||||
// just restoring an existing connector.
|
||||
if (initialState == TargetState.STARTED)
|
||||
reconfigureConnectorTasksWithRetry(connName);
|
||||
if (started && initialState == TargetState.STARTED)
|
||||
reconfigureConnectorTasksWithRetry(connectorName);
|
||||
|
||||
return started;
|
||||
}
|
||||
|
||||
private void reconfigureConnectorTasksWithRetry(final String connName) {
|
||||
|
@ -1053,17 +1039,13 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
|
|||
// this worker. Instead, we can let them continue to run but buffer any update requests (which should be
|
||||
// rare anyway). This would avoid a steady stream of start/stop, which probably also includes lots of
|
||||
// unnecessary repeated connections to the source/sink system.
|
||||
for (String connectorName : connectors)
|
||||
worker.stopConnector(connectorName);
|
||||
worker.stopConnectors(connectors);
|
||||
|
||||
// TODO: We need to at least commit task offsets, but if we could commit offsets & pause them instead of
|
||||
// stopping them then state could continue to be reused when the task remains on this worker. For example,
|
||||
// this would avoid having to close a connection and then reopen it when the task is assigned back to this
|
||||
// worker again.
|
||||
if (!tasks.isEmpty()) {
|
||||
worker.stopTasks(tasks); // trigger stop() for all tasks
|
||||
worker.awaitStopTasks(tasks); // await stopping tasks
|
||||
}
|
||||
worker.stopAndAwaitTasks(tasks);
|
||||
|
||||
// Ensure that all status updates have been pushed to the storage system before rebalancing.
|
||||
// Otherwise, we may inadvertently overwrite the state with a stale value after the rebalance
|
||||
|
|
|
@ -26,7 +26,6 @@ import org.apache.kafka.connect.runtime.HerderConnectorContext;
|
|||
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
|
||||
import org.apache.kafka.connect.runtime.SourceConnectorConfig;
|
||||
import org.apache.kafka.connect.runtime.TargetState;
|
||||
import org.apache.kafka.connect.runtime.TaskConfig;
|
||||
import org.apache.kafka.connect.runtime.Worker;
|
||||
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
|
||||
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
|
||||
|
@ -81,11 +80,7 @@ public class StandaloneHerder extends AbstractHerder {
|
|||
// the tasks.
|
||||
for (String connName : configState.connectors()) {
|
||||
removeConnectorTasks(connName);
|
||||
try {
|
||||
worker.stopConnector(connName);
|
||||
} catch (ConnectException e) {
|
||||
log.error("Error shutting down connector {}: ", connName, e);
|
||||
}
|
||||
worker.stopConnector(connName);
|
||||
}
|
||||
stopServices();
|
||||
log.info("Herder stopped");
|
||||
|
@ -161,7 +156,10 @@ public class StandaloneHerder extends AbstractHerder {
|
|||
created = true;
|
||||
}
|
||||
if (config != null) {
|
||||
startConnector(config);
|
||||
if (!startConnector(config)) {
|
||||
callback.onCompletion(new ConnectException("Failed to start connector: " + connName), null);
|
||||
return;
|
||||
}
|
||||
updateConnectorTasks(connName);
|
||||
}
|
||||
if (config != null)
|
||||
|
@ -209,18 +207,14 @@ public class StandaloneHerder extends AbstractHerder {
|
|||
Map<String, String> taskConfigProps = configState.taskConfig(taskId);
|
||||
if (taskConfigProps == null)
|
||||
cb.onCompletion(new NotFoundException("Task " + taskId + " not found", null), null);
|
||||
TaskConfig taskConfig = new TaskConfig(taskConfigProps);
|
||||
ConnectorConfig connConfig = new ConnectorConfig(configState.connectorConfig(taskId.connector()));
|
||||
Map<String, String> connConfigProps = configState.connectorConfig(taskId.connector());
|
||||
|
||||
TargetState targetState = configState.targetState(taskId.connector());
|
||||
try {
|
||||
worker.stopAndAwaitTask(taskId);
|
||||
worker.startTask(taskId, taskConfig, connConfig, this, targetState);
|
||||
worker.stopAndAwaitTask(taskId);
|
||||
if (worker.startTask(taskId, connConfigProps, taskConfigProps, this, targetState))
|
||||
cb.onCompletion(null, null);
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to restart task {}", taskId, e);
|
||||
cb.onCompletion(e, null);
|
||||
}
|
||||
else
|
||||
cb.onCompletion(new ConnectException("Failed to start task: " + taskId), null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -229,28 +223,18 @@ public class StandaloneHerder extends AbstractHerder {
|
|||
cb.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null);
|
||||
|
||||
Map<String, String> config = configState.connectorConfig(connName);
|
||||
try {
|
||||
worker.stopConnector(connName);
|
||||
startConnector(config);
|
||||
worker.stopConnector(connName);
|
||||
if (startConnector(config))
|
||||
cb.onCompletion(null, null);
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to restart connector {}", connName, e);
|
||||
cb.onCompletion(e, null);
|
||||
}
|
||||
else
|
||||
cb.onCompletion(new ConnectException("Failed to start connector: " + connName), null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Start a connector in the worker and record its state.
|
||||
* @param connectorProps new connector configuration
|
||||
* @return the connector name
|
||||
*/
|
||||
private String startConnector(Map<String, String> connectorProps) {
|
||||
ConnectorConfig connConfig = new ConnectorConfig(connectorProps);
|
||||
String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
|
||||
private boolean startConnector(Map<String, String> connectorProps) {
|
||||
String connName = connectorProps.get(ConnectorConfig.NAME_CONFIG);
|
||||
configBackingStore.putConnectorConfig(connName, connectorProps);
|
||||
TargetState targetState = configState.targetState(connName);
|
||||
worker.startConnector(connConfig, new HerderConnectorContext(this, connName), this, targetState);
|
||||
return connName;
|
||||
return worker.startConnector(connName, connectorProps, new HerderConnectorContext(this, connName), this, targetState);
|
||||
}
|
||||
|
||||
private List<Map<String, String>> recomputeTaskConfigs(String connName) {
|
||||
|
@ -273,27 +257,17 @@ public class StandaloneHerder extends AbstractHerder {
|
|||
|
||||
private void createConnectorTasks(String connName, TargetState initialState) {
|
||||
Map<String, String> connConfigs = configState.connectorConfig(connName);
|
||||
ConnectorConfig connConfig = new ConnectorConfig(connConfigs);
|
||||
|
||||
for (ConnectorTaskId taskId : configState.tasks(connName)) {
|
||||
Map<String, String> taskConfigMap = configState.taskConfig(taskId);
|
||||
TaskConfig taskConfig = new TaskConfig(taskConfigMap);
|
||||
try {
|
||||
worker.startTask(taskId, taskConfig, connConfig, this, initialState);
|
||||
} catch (Throwable e) {
|
||||
log.error("Failed to add task {}: ", taskId, e);
|
||||
// Swallow this so we can continue updating the rest of the tasks
|
||||
// FIXME what's the proper response? Kill all the tasks? Consider this the same as a task
|
||||
// that died after starting successfully.
|
||||
}
|
||||
worker.startTask(taskId, connConfigs, taskConfigMap, this, initialState);
|
||||
}
|
||||
}
|
||||
|
||||
private void removeConnectorTasks(String connName) {
|
||||
Collection<ConnectorTaskId> tasks = configState.tasks(connName);
|
||||
if (!tasks.isEmpty()) {
|
||||
worker.stopTasks(tasks);
|
||||
worker.awaitStopTasks(tasks);
|
||||
worker.stopAndAwaitTasks(tasks);
|
||||
configBackingStore.removeTaskConfigs(connName);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -64,11 +64,11 @@ import java.util.Set;
|
|||
* but it can avoid specific unsafe conditions. In particular, we putSafe()
|
||||
* allows writes in the following conditions:
|
||||
*
|
||||
* 3) It is (probably) safe to overwrite the state if there is no previous
|
||||
* 1) It is (probably) safe to overwrite the state if there is no previous
|
||||
* value.
|
||||
* 1) It is (probably) safe to overwrite the state if the previous value was
|
||||
* 2) It is (probably) safe to overwrite the state if the previous value was
|
||||
* set by a worker with the same workerId.
|
||||
* 2) It is (probably) safe to overwrite the previous state if the current
|
||||
* 3) It is (probably) safe to overwrite the previous state if the current
|
||||
* generation is higher than the previous .
|
||||
*
|
||||
* Basically all these conditions do is reduce the window for conflicts. They
|
||||
|
@ -201,7 +201,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
|
|||
final int sequence;
|
||||
synchronized (this) {
|
||||
this.generation = status.generation();
|
||||
if (safeWrite && !entry.canWrite(status))
|
||||
if (safeWrite && !entry.canWriteSafely(status))
|
||||
return;
|
||||
sequence = entry.increment();
|
||||
}
|
||||
|
@ -216,7 +216,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
|
|||
synchronized (KafkaStatusBackingStore.this) {
|
||||
if (entry.isDeleted()
|
||||
|| status.generation() != generation
|
||||
|| (safeWrite && !entry.canWrite(status, sequence)))
|
||||
|| (safeWrite && !entry.canWriteSafely(status, sequence)))
|
||||
return;
|
||||
}
|
||||
kafkaLog.send(key, value, this);
|
||||
|
@ -448,14 +448,14 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
|
|||
return deleted;
|
||||
}
|
||||
|
||||
public boolean canWrite(T status) {
|
||||
return value != null &&
|
||||
(value.workerId().equals(status.workerId())
|
||||
|| value.generation() <= status.generation());
|
||||
public boolean canWriteSafely(T status) {
|
||||
return value == null
|
||||
|| value.workerId().equals(status.workerId())
|
||||
|| value.generation() <= status.generation();
|
||||
}
|
||||
|
||||
public boolean canWrite(T status, int sequence) {
|
||||
return canWrite(status) && this.sequence == sequence;
|
||||
public boolean canWriteSafely(T status, int sequence) {
|
||||
return canWriteSafely(status) && this.sequence == sequence;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -56,10 +56,11 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
@RunWith(PowerMockRunner.class)
|
||||
@PrepareForTest(Worker.class)
|
||||
@PrepareForTest({Worker.class})
|
||||
@PowerMockIgnore("javax.management.*")
|
||||
public class WorkerTest extends ThreadedTest {
|
||||
|
||||
|
@ -69,6 +70,7 @@ public class WorkerTest extends ThreadedTest {
|
|||
|
||||
private WorkerConfig config;
|
||||
private Worker worker;
|
||||
private ConnectorFactory connectorFactory = PowerMock.createMock(ConnectorFactory.class);
|
||||
private OffsetBackingStore offsetBackingStore = PowerMock.createMock(OffsetBackingStore.class);
|
||||
private TaskStatus.Listener taskStatusListener = PowerMock.createStrictMock(TaskStatus.Listener.class);
|
||||
private ConnectorStatus.Listener connectorStatusListener = PowerMock.createStrictMock(ConnectorStatus.Listener.class);
|
||||
|
@ -96,8 +98,7 @@ public class WorkerTest extends ThreadedTest {
|
|||
Connector connector = PowerMock.createMock(Connector.class);
|
||||
ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
|
||||
|
||||
PowerMock.mockStaticPartial(Worker.class, "instantiateConnector");
|
||||
PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector);
|
||||
EasyMock.expect(connectorFactory.newConnector(WorkerTestConnector.class.getName())).andReturn(connector);
|
||||
EasyMock.expect(connector.version()).andReturn("1.0");
|
||||
|
||||
Map<String, String> props = new HashMap<>();
|
||||
|
@ -125,15 +126,14 @@ public class WorkerTest extends ThreadedTest {
|
|||
|
||||
PowerMock.replayAll();
|
||||
|
||||
worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
|
||||
worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
|
||||
worker.start();
|
||||
|
||||
ConnectorConfig config = new ConnectorConfig(props);
|
||||
assertEquals(Collections.emptySet(), worker.connectorNames());
|
||||
worker.startConnector(config, ctx, connectorStatusListener, TargetState.STARTED);
|
||||
worker.startConnector(CONNECTOR_ID, props, ctx, connectorStatusListener, TargetState.STARTED);
|
||||
assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
|
||||
try {
|
||||
worker.startConnector(config, ctx, connectorStatusListener, TargetState.STARTED);
|
||||
worker.startConnector(CONNECTOR_ID, props, ctx, connectorStatusListener, TargetState.STARTED);
|
||||
fail("Should have thrown exception when trying to add connector with same name.");
|
||||
} catch (ConnectException e) {
|
||||
// expected
|
||||
|
@ -146,6 +146,29 @@ public class WorkerTest extends ThreadedTest {
|
|||
PowerMock.verifyAll();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStartConnectorFailure() throws Exception {
|
||||
expectStartStorage();
|
||||
|
||||
worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
|
||||
worker.start();
|
||||
|
||||
Map<String, String> props = new HashMap<>();
|
||||
props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
|
||||
props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
|
||||
props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
|
||||
props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "java.util.HashMap"); // Bad connector class name
|
||||
|
||||
connectorStatusListener.onFailure(EasyMock.eq(CONNECTOR_ID), EasyMock.<Throwable>anyObject());
|
||||
EasyMock.expectLastCall();
|
||||
|
||||
assertFalse(worker.startConnector(CONNECTOR_ID, props, PowerMock.createMock(ConnectorContext.class), connectorStatusListener, TargetState.STARTED));
|
||||
|
||||
assertEquals(Collections.emptySet(), worker.connectorNames());
|
||||
|
||||
assertFalse(worker.stopConnector(CONNECTOR_ID));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddConnectorByAlias() throws Exception {
|
||||
expectStartStorage();
|
||||
|
@ -154,8 +177,7 @@ public class WorkerTest extends ThreadedTest {
|
|||
Connector connector = PowerMock.createMock(Connector.class);
|
||||
ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
|
||||
|
||||
PowerMock.mockStaticPartial(Worker.class, "instantiateConnector");
|
||||
PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector);
|
||||
EasyMock.expect(connectorFactory.newConnector("WorkerTestConnector")).andReturn(connector);
|
||||
EasyMock.expect(connector.version()).andReturn("1.0");
|
||||
|
||||
Map<String, String> props = new HashMap<>();
|
||||
|
@ -183,12 +205,11 @@ public class WorkerTest extends ThreadedTest {
|
|||
|
||||
PowerMock.replayAll();
|
||||
|
||||
worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
|
||||
worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
|
||||
worker.start();
|
||||
|
||||
ConnectorConfig config = new ConnectorConfig(props);
|
||||
assertEquals(Collections.emptySet(), worker.connectorNames());
|
||||
worker.startConnector(config, ctx, connectorStatusListener, TargetState.STARTED);
|
||||
worker.startConnector(CONNECTOR_ID, props, ctx, connectorStatusListener, TargetState.STARTED);
|
||||
assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
|
||||
|
||||
worker.stopConnector(CONNECTOR_ID);
|
||||
|
@ -207,8 +228,7 @@ public class WorkerTest extends ThreadedTest {
|
|||
Connector connector = PowerMock.createMock(Connector.class);
|
||||
ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
|
||||
|
||||
PowerMock.mockStaticPartial(Worker.class, "instantiateConnector");
|
||||
PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector);
|
||||
EasyMock.expect(connectorFactory.newConnector("WorkerTest")).andReturn(connector);
|
||||
EasyMock.expect(connector.version()).andReturn("1.0");
|
||||
|
||||
Map<String, String> props = new HashMap<>();
|
||||
|
@ -236,12 +256,11 @@ public class WorkerTest extends ThreadedTest {
|
|||
|
||||
PowerMock.replayAll();
|
||||
|
||||
worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
|
||||
worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
|
||||
worker.start();
|
||||
|
||||
ConnectorConfig config = new ConnectorConfig(props);
|
||||
assertEquals(Collections.emptySet(), worker.connectorNames());
|
||||
worker.startConnector(config, ctx, connectorStatusListener, TargetState.STARTED);
|
||||
worker.startConnector(CONNECTOR_ID, props, ctx, connectorStatusListener, TargetState.STARTED);
|
||||
assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
|
||||
|
||||
worker.stopConnector(CONNECTOR_ID);
|
||||
|
@ -252,14 +271,13 @@ public class WorkerTest extends ThreadedTest {
|
|||
PowerMock.verifyAll();
|
||||
}
|
||||
|
||||
|
||||
@Test(expected = ConnectException.class)
|
||||
@Test
|
||||
public void testStopInvalidConnector() {
|
||||
expectStartStorage();
|
||||
|
||||
PowerMock.replayAll();
|
||||
|
||||
worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
|
||||
worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
|
||||
worker.start();
|
||||
|
||||
worker.stopConnector(CONNECTOR_ID);
|
||||
|
@ -273,8 +291,7 @@ public class WorkerTest extends ThreadedTest {
|
|||
Connector connector = PowerMock.createMock(Connector.class);
|
||||
ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
|
||||
|
||||
PowerMock.mockStaticPartial(Worker.class, "instantiateConnector");
|
||||
PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector);
|
||||
EasyMock.expect(connectorFactory.newConnector(WorkerTestConnector.class.getName())).andReturn(connector);
|
||||
EasyMock.expect(connector.version()).andReturn("1.0");
|
||||
|
||||
Map<String, String> props = new HashMap<>();
|
||||
|
@ -308,15 +325,14 @@ public class WorkerTest extends ThreadedTest {
|
|||
|
||||
PowerMock.replayAll();
|
||||
|
||||
worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
|
||||
worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
|
||||
worker.start();
|
||||
|
||||
ConnectorConfig config = new ConnectorConfig(props);
|
||||
assertEquals(Collections.emptySet(), worker.connectorNames());
|
||||
worker.startConnector(config, ctx, connectorStatusListener, TargetState.STARTED);
|
||||
worker.startConnector(CONNECTOR_ID, props, ctx, connectorStatusListener, TargetState.STARTED);
|
||||
assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
|
||||
try {
|
||||
worker.startConnector(config, ctx, connectorStatusListener, TargetState.STARTED);
|
||||
worker.startConnector(CONNECTOR_ID, props, ctx, connectorStatusListener, TargetState.STARTED);
|
||||
fail("Should have thrown exception when trying to add connector with same name.");
|
||||
} catch (ConnectException e) {
|
||||
// expected
|
||||
|
@ -347,8 +363,7 @@ public class WorkerTest extends ThreadedTest {
|
|||
WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
|
||||
EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
|
||||
|
||||
PowerMock.mockStaticPartial(Worker.class, "instantiateTask");
|
||||
PowerMock.expectPrivate(Worker.class, "instantiateTask", new Object[]{TestSourceTask.class}).andReturn(task);
|
||||
EasyMock.expect(connectorFactory.newTask(TestSourceTask.class)).andReturn(task);
|
||||
EasyMock.expect(task.version()).andReturn("1.0");
|
||||
|
||||
PowerMock.expectNew(
|
||||
|
@ -381,10 +396,10 @@ public class WorkerTest extends ThreadedTest {
|
|||
|
||||
PowerMock.replayAll();
|
||||
|
||||
worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
|
||||
worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
|
||||
worker.start();
|
||||
assertEquals(Collections.emptySet(), worker.taskIds());
|
||||
worker.startTask(TASK_ID, new TaskConfig(origProps), anyConnectorConfig(), taskStatusListener, TargetState.STARTED);
|
||||
worker.startTask(TASK_ID, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
|
||||
assertEquals(new HashSet<>(Arrays.asList(TASK_ID)), worker.taskIds());
|
||||
worker.stopAndAwaitTask(TASK_ID);
|
||||
assertEquals(Collections.emptySet(), worker.taskIds());
|
||||
|
@ -394,16 +409,36 @@ public class WorkerTest extends ThreadedTest {
|
|||
PowerMock.verifyAll();
|
||||
}
|
||||
|
||||
@Test(expected = ConnectException.class)
|
||||
@Test
|
||||
public void testStartTaskFailure() throws Exception {
|
||||
expectStartStorage();
|
||||
|
||||
worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
|
||||
worker.start();
|
||||
|
||||
Map<String, String> origProps = new HashMap<>();
|
||||
origProps.put(TaskConfig.TASK_CLASS_CONFIG, "missing.From.This.Workers.Classpath");
|
||||
|
||||
assertFalse(worker.startTask(TASK_ID, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED));
|
||||
|
||||
taskStatusListener.onFailure(EasyMock.eq(TASK_ID), EasyMock.<Throwable>anyObject());
|
||||
EasyMock.expectLastCall();
|
||||
|
||||
assertEquals(Collections.emptySet(), worker.taskIds());
|
||||
|
||||
assertFalse(worker.stopAndAwaitTask(TASK_ID));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStopInvalidTask() {
|
||||
expectStartStorage();
|
||||
|
||||
PowerMock.replayAll();
|
||||
|
||||
worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
|
||||
worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
|
||||
worker.start();
|
||||
|
||||
worker.stopAndAwaitTask(TASK_ID);
|
||||
assertFalse(worker.stopAndAwaitTask(TASK_ID));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -415,8 +450,7 @@ public class WorkerTest extends ThreadedTest {
|
|||
WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
|
||||
EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
|
||||
|
||||
PowerMock.mockStaticPartial(Worker.class, "instantiateTask");
|
||||
PowerMock.expectPrivate(Worker.class, "instantiateTask", new Object[]{TestSourceTask.class}).andReturn(task);
|
||||
EasyMock.expect(connectorFactory.newTask(TestSourceTask.class)).andReturn(task);
|
||||
EasyMock.expect(task.version()).andReturn("1.0");
|
||||
|
||||
PowerMock.expectNew(
|
||||
|
@ -451,9 +485,9 @@ public class WorkerTest extends ThreadedTest {
|
|||
|
||||
PowerMock.replayAll();
|
||||
|
||||
worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
|
||||
worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
|
||||
worker.start();
|
||||
worker.startTask(TASK_ID, new TaskConfig(origProps), anyConnectorConfig(), taskStatusListener, TargetState.STARTED);
|
||||
worker.startTask(TASK_ID, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
|
||||
worker.stop();
|
||||
|
||||
PowerMock.verifyAll();
|
||||
|
@ -467,8 +501,7 @@ public class WorkerTest extends ThreadedTest {
|
|||
WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
|
||||
EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
|
||||
|
||||
PowerMock.mockStaticPartial(Worker.class, "instantiateTask");
|
||||
PowerMock.expectPrivate(Worker.class, "instantiateTask", new Object[]{TestSourceTask.class}).andReturn(task);
|
||||
EasyMock.expect(connectorFactory.newTask(TestSourceTask.class)).andReturn(task);
|
||||
EasyMock.expect(task.version()).andReturn("1.0");
|
||||
|
||||
Capture<TestConverter> keyConverter = EasyMock.newCapture();
|
||||
|
@ -504,7 +537,7 @@ public class WorkerTest extends ThreadedTest {
|
|||
|
||||
PowerMock.replayAll();
|
||||
|
||||
worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
|
||||
worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
|
||||
worker.start();
|
||||
assertEquals(Collections.emptySet(), worker.taskIds());
|
||||
Map<String, String> connProps = anyConnectorConfigMap();
|
||||
|
@ -512,7 +545,7 @@ public class WorkerTest extends ThreadedTest {
|
|||
connProps.put("key.converter.extra.config", "foo");
|
||||
connProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
|
||||
connProps.put("value.converter.extra.config", "bar");
|
||||
worker.startTask(TASK_ID, new TaskConfig(origProps), new ConnectorConfig(connProps), taskStatusListener, TargetState.STARTED);
|
||||
worker.startTask(TASK_ID, connProps, origProps, taskStatusListener, TargetState.STARTED);
|
||||
assertEquals(new HashSet<>(Arrays.asList(TASK_ID)), worker.taskIds());
|
||||
worker.stopAndAwaitTask(TASK_ID);
|
||||
assertEquals(Collections.emptySet(), worker.taskIds());
|
||||
|
@ -546,10 +579,6 @@ public class WorkerTest extends ThreadedTest {
|
|||
return props;
|
||||
}
|
||||
|
||||
private ConnectorConfig anyConnectorConfig() {
|
||||
return new ConnectorConfig(anyConnectorConfigMap());
|
||||
}
|
||||
|
||||
/* Name here needs to be unique as we are testing the aliasing mechanism */
|
||||
public static class WorkerTestConnector extends Connector {
|
||||
|
||||
|
|
|
@ -39,7 +39,6 @@ import org.apache.kafka.connect.storage.StatusBackingStore;
|
|||
import org.apache.kafka.connect.util.Callback;
|
||||
import org.apache.kafka.connect.util.ConnectorTaskId;
|
||||
import org.apache.kafka.connect.util.FutureCallback;
|
||||
import org.easymock.Capture;
|
||||
import org.easymock.EasyMock;
|
||||
import org.easymock.IAnswer;
|
||||
import org.junit.Before;
|
||||
|
@ -169,14 +168,15 @@ public class DistributedHerderTest {
|
|||
EasyMock.expect(member.memberId()).andStubReturn("member");
|
||||
expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
|
||||
expectPostRebalanceCatchup(SNAPSHOT);
|
||||
worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
|
||||
worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
|
||||
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
PowerMock.expectLastCall();
|
||||
PowerMock.expectLastCall().andReturn(true);
|
||||
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
|
||||
|
||||
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
|
||||
worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
PowerMock.expectLastCall();
|
||||
worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
|
||||
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
PowerMock.expectLastCall().andReturn(true);
|
||||
member.poll(EasyMock.anyInt());
|
||||
PowerMock.expectLastCall();
|
||||
|
||||
|
@ -193,13 +193,14 @@ public class DistributedHerderTest {
|
|||
EasyMock.expect(member.memberId()).andStubReturn("member");
|
||||
expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
|
||||
expectPostRebalanceCatchup(SNAPSHOT);
|
||||
worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
|
||||
worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
|
||||
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
PowerMock.expectLastCall();
|
||||
PowerMock.expectLastCall().andReturn(true);
|
||||
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
|
||||
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
|
||||
worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
PowerMock.expectLastCall();
|
||||
worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
|
||||
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
PowerMock.expectLastCall().andReturn(true);
|
||||
member.poll(EasyMock.anyInt());
|
||||
PowerMock.expectLastCall();
|
||||
|
||||
|
@ -207,9 +208,9 @@ public class DistributedHerderTest {
|
|||
1, Arrays.asList(CONN1), Arrays.<ConnectorTaskId>asList());
|
||||
|
||||
// and the new assignment started
|
||||
worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
|
||||
worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
|
||||
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
PowerMock.expectLastCall();
|
||||
PowerMock.expectLastCall().andReturn(true);
|
||||
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
|
||||
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
|
||||
member.poll(EasyMock.anyInt());
|
||||
|
@ -229,13 +230,14 @@ public class DistributedHerderTest {
|
|||
EasyMock.expect(member.memberId()).andStubReturn("member");
|
||||
expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
|
||||
expectPostRebalanceCatchup(SNAPSHOT);
|
||||
worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
|
||||
worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
|
||||
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
PowerMock.expectLastCall();
|
||||
PowerMock.expectLastCall().andReturn(true);
|
||||
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
|
||||
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
|
||||
worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
PowerMock.expectLastCall();
|
||||
worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
|
||||
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
PowerMock.expectLastCall().andReturn(true);
|
||||
member.poll(EasyMock.anyInt());
|
||||
PowerMock.expectLastCall();
|
||||
|
||||
|
@ -243,9 +245,9 @@ public class DistributedHerderTest {
|
|||
1, Arrays.asList(CONN1), Arrays.<ConnectorTaskId>asList());
|
||||
|
||||
// and the new assignment started
|
||||
worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
|
||||
worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
|
||||
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
PowerMock.expectLastCall();
|
||||
PowerMock.expectLastCall().andReturn(true);
|
||||
EasyMock.expect(worker.isRunning(CONN1)).andReturn(false);
|
||||
|
||||
// worker is not running, so we should see no call to connectorTaskConfigs()
|
||||
|
@ -263,13 +265,9 @@ public class DistributedHerderTest {
|
|||
|
||||
@Test
|
||||
public void testHaltCleansUpWorker() {
|
||||
EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONN1));
|
||||
worker.stopConnector(CONN1);
|
||||
worker.stopConnectors();
|
||||
PowerMock.expectLastCall();
|
||||
EasyMock.expect(worker.taskIds()).andReturn(Collections.singleton(TASK1));
|
||||
worker.stopTasks(Collections.singleton(TASK1));
|
||||
PowerMock.expectLastCall();
|
||||
worker.awaitStopTasks(Collections.singleton(TASK1));
|
||||
worker.stopAndAwaitTasks();
|
||||
PowerMock.expectLastCall();
|
||||
member.stop();
|
||||
PowerMock.expectLastCall();
|
||||
|
@ -342,9 +340,9 @@ public class DistributedHerderTest {
|
|||
// Start with one connector
|
||||
expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
|
||||
expectPostRebalanceCatchup(SNAPSHOT);
|
||||
worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
|
||||
worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
|
||||
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
PowerMock.expectLastCall();
|
||||
PowerMock.expectLastCall().andReturn(true);
|
||||
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
|
||||
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
|
||||
|
||||
|
@ -377,9 +375,9 @@ public class DistributedHerderTest {
|
|||
expectPostRebalanceCatchup(SNAPSHOT);
|
||||
member.poll(EasyMock.anyInt());
|
||||
PowerMock.expectLastCall();
|
||||
worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
|
||||
worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
|
||||
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
PowerMock.expectLastCall();
|
||||
PowerMock.expectLastCall().andReturn(true);
|
||||
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
|
||||
|
||||
// now handle the connector restart
|
||||
|
@ -390,13 +388,11 @@ public class DistributedHerderTest {
|
|||
member.poll(EasyMock.anyInt());
|
||||
PowerMock.expectLastCall();
|
||||
|
||||
EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(true);
|
||||
|
||||
worker.stopConnector(CONN1);
|
||||
PowerMock.expectLastCall();
|
||||
worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
|
||||
PowerMock.expectLastCall().andReturn(true);
|
||||
worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
|
||||
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
PowerMock.expectLastCall();
|
||||
PowerMock.expectLastCall().andReturn(true);
|
||||
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
|
||||
|
||||
PowerMock.replayAll();
|
||||
|
@ -461,8 +457,6 @@ public class DistributedHerderTest {
|
|||
member.poll(EasyMock.anyInt());
|
||||
PowerMock.expectLastCall();
|
||||
|
||||
EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(false);
|
||||
|
||||
PowerMock.replayAll();
|
||||
|
||||
herder.tick();
|
||||
|
@ -498,7 +492,6 @@ public class DistributedHerderTest {
|
|||
PowerMock.expectLastCall();
|
||||
|
||||
String ownerUrl = "ownerUrl";
|
||||
EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(false);
|
||||
EasyMock.expect(member.ownerUrl(CONN1)).andReturn(ownerUrl);
|
||||
|
||||
PowerMock.replayAll();
|
||||
|
@ -530,8 +523,9 @@ public class DistributedHerderTest {
|
|||
expectPostRebalanceCatchup(SNAPSHOT);
|
||||
member.poll(EasyMock.anyInt());
|
||||
PowerMock.expectLastCall();
|
||||
worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
PowerMock.expectLastCall();
|
||||
worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
|
||||
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
PowerMock.expectLastCall().andReturn(true);
|
||||
|
||||
// now handle the task restart
|
||||
member.wakeup();
|
||||
|
@ -541,12 +535,11 @@ public class DistributedHerderTest {
|
|||
member.poll(EasyMock.anyInt());
|
||||
PowerMock.expectLastCall();
|
||||
|
||||
EasyMock.expect(worker.ownsTask(TASK0)).andReturn(true);
|
||||
|
||||
worker.stopAndAwaitTask(TASK0);
|
||||
PowerMock.expectLastCall();
|
||||
worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
PowerMock.expectLastCall();
|
||||
PowerMock.expectLastCall().andReturn(true);
|
||||
worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
|
||||
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
PowerMock.expectLastCall().andReturn(true);
|
||||
|
||||
PowerMock.replayAll();
|
||||
|
||||
|
@ -602,7 +595,6 @@ public class DistributedHerderTest {
|
|||
PowerMock.expectLastCall();
|
||||
|
||||
// now handle the task restart
|
||||
EasyMock.expect(worker.ownsTask(TASK0)).andReturn(false);
|
||||
member.wakeup();
|
||||
PowerMock.expectLastCall();
|
||||
member.ensureActive();
|
||||
|
@ -638,7 +630,6 @@ public class DistributedHerderTest {
|
|||
|
||||
// now handle the task restart
|
||||
String ownerUrl = "ownerUrl";
|
||||
EasyMock.expect(worker.ownsTask(TASK0)).andReturn(false);
|
||||
EasyMock.expect(member.ownerUrl(TASK0)).andReturn(ownerUrl);
|
||||
member.wakeup();
|
||||
PowerMock.expectLastCall();
|
||||
|
@ -687,10 +678,10 @@ public class DistributedHerderTest {
|
|||
// Performs rebalance and gets new assignment
|
||||
expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
|
||||
ConnectProtocol.Assignment.NO_ERROR, 1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
|
||||
worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
|
||||
worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
|
||||
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
PowerMock.expectLastCall().andReturn(true);
|
||||
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
|
||||
PowerMock.expectLastCall();
|
||||
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
|
||||
member.poll(EasyMock.anyInt());
|
||||
PowerMock.expectLastCall();
|
||||
|
@ -715,9 +706,9 @@ public class DistributedHerderTest {
|
|||
// join
|
||||
expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
|
||||
expectPostRebalanceCatchup(SNAPSHOT);
|
||||
worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
|
||||
worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
|
||||
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
PowerMock.expectLastCall();
|
||||
PowerMock.expectLastCall().andReturn(true);
|
||||
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
|
||||
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
|
||||
member.poll(EasyMock.anyInt());
|
||||
|
@ -729,10 +720,10 @@ public class DistributedHerderTest {
|
|||
PowerMock.expectLastCall();
|
||||
EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT); // for this test, it doesn't matter if we use the same config snapshot
|
||||
worker.stopConnector(CONN1);
|
||||
PowerMock.expectLastCall();
|
||||
worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
|
||||
PowerMock.expectLastCall().andReturn(true);
|
||||
worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
|
||||
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
PowerMock.expectLastCall();
|
||||
PowerMock.expectLastCall().andReturn(true);
|
||||
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
|
||||
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
|
||||
member.poll(EasyMock.anyInt());
|
||||
|
@ -757,9 +748,9 @@ public class DistributedHerderTest {
|
|||
// join
|
||||
expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
|
||||
expectPostRebalanceCatchup(SNAPSHOT);
|
||||
worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
|
||||
worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
|
||||
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
PowerMock.expectLastCall();
|
||||
PowerMock.expectLastCall().andReturn(true);
|
||||
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
|
||||
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
|
||||
member.poll(EasyMock.anyInt());
|
||||
|
@ -773,8 +764,6 @@ public class DistributedHerderTest {
|
|||
EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT_PAUSED_CONN1);
|
||||
PowerMock.expectLastCall();
|
||||
|
||||
EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(true);
|
||||
|
||||
worker.setTargetState(CONN1, TargetState.PAUSED);
|
||||
PowerMock.expectLastCall();
|
||||
|
||||
|
@ -798,9 +787,9 @@ public class DistributedHerderTest {
|
|||
// start with the connector paused
|
||||
expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
|
||||
expectPostRebalanceCatchup(SNAPSHOT_PAUSED_CONN1);
|
||||
worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
|
||||
worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
|
||||
EasyMock.eq(herder), EasyMock.eq(TargetState.PAUSED));
|
||||
PowerMock.expectLastCall();
|
||||
PowerMock.expectLastCall().andReturn(true);
|
||||
|
||||
member.poll(EasyMock.anyInt());
|
||||
PowerMock.expectLastCall();
|
||||
|
@ -814,7 +803,6 @@ public class DistributedHerderTest {
|
|||
PowerMock.expectLastCall();
|
||||
|
||||
// we expect reconfiguration after resuming
|
||||
EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(true);
|
||||
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
|
||||
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
|
||||
|
||||
|
@ -841,8 +829,9 @@ public class DistributedHerderTest {
|
|||
// join
|
||||
expectRebalance(1, Collections.<String>emptyList(), Collections.singletonList(TASK0));
|
||||
expectPostRebalanceCatchup(SNAPSHOT);
|
||||
worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
PowerMock.expectLastCall();
|
||||
worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
|
||||
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
PowerMock.expectLastCall().andReturn(true);
|
||||
member.poll(EasyMock.anyInt());
|
||||
PowerMock.expectLastCall();
|
||||
|
||||
|
@ -877,8 +866,9 @@ public class DistributedHerderTest {
|
|||
// join
|
||||
expectRebalance(1, Collections.<String>emptyList(), Collections.singletonList(TASK0));
|
||||
expectPostRebalanceCatchup(SNAPSHOT);
|
||||
worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
PowerMock.expectLastCall();
|
||||
worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
|
||||
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
PowerMock.expectLastCall().andReturn(true);
|
||||
member.poll(EasyMock.anyInt());
|
||||
PowerMock.expectLastCall();
|
||||
|
||||
|
@ -890,8 +880,6 @@ public class DistributedHerderTest {
|
|||
EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT_PAUSED_CONN1);
|
||||
PowerMock.expectLastCall();
|
||||
|
||||
EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(false);
|
||||
|
||||
worker.setTargetState(CONN1, TargetState.PAUSED);
|
||||
PowerMock.expectLastCall();
|
||||
|
||||
|
@ -918,8 +906,9 @@ public class DistributedHerderTest {
|
|||
// join
|
||||
expectRebalance(1, Collections.<String>emptyList(), Collections.singletonList(TASK0));
|
||||
expectPostRebalanceCatchup(SNAPSHOT_PAUSED_CONN1);
|
||||
worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.PAUSED));
|
||||
PowerMock.expectLastCall();
|
||||
worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
|
||||
EasyMock.eq(herder), EasyMock.eq(TargetState.PAUSED));
|
||||
PowerMock.expectLastCall().andReturn(true);
|
||||
member.poll(EasyMock.anyInt());
|
||||
PowerMock.expectLastCall();
|
||||
|
||||
|
@ -931,11 +920,11 @@ public class DistributedHerderTest {
|
|||
EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT);
|
||||
PowerMock.expectLastCall();
|
||||
|
||||
EasyMock.expect(worker.ownsConnector(CONN1)).andReturn(false);
|
||||
|
||||
worker.setTargetState(CONN1, TargetState.STARTED);
|
||||
PowerMock.expectLastCall();
|
||||
|
||||
EasyMock.expect(worker.isRunning(CONN1)).andReturn(false);
|
||||
|
||||
member.poll(EasyMock.anyInt());
|
||||
PowerMock.expectLastCall();
|
||||
|
||||
|
@ -970,8 +959,9 @@ public class DistributedHerderTest {
|
|||
expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
|
||||
ConnectProtocol.Assignment.NO_ERROR, 1, Collections.<String>emptyList(),
|
||||
Arrays.asList(TASK0));
|
||||
worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
PowerMock.expectLastCall();
|
||||
worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
|
||||
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
PowerMock.expectLastCall().andReturn(true);
|
||||
member.poll(EasyMock.anyInt());
|
||||
PowerMock.expectLastCall();
|
||||
|
||||
|
@ -1004,12 +994,13 @@ public class DistributedHerderTest {
|
|||
expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
|
||||
expectPostRebalanceCatchup(SNAPSHOT);
|
||||
|
||||
worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
|
||||
worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
|
||||
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
PowerMock.expectLastCall();
|
||||
PowerMock.expectLastCall().andReturn(true);
|
||||
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
|
||||
worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
PowerMock.expectLastCall();
|
||||
worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
|
||||
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
PowerMock.expectLastCall().andReturn(true);
|
||||
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
|
||||
member.poll(EasyMock.anyInt());
|
||||
PowerMock.expectLastCall();
|
||||
|
@ -1070,9 +1061,9 @@ public class DistributedHerderTest {
|
|||
EasyMock.expect(member.memberId()).andStubReturn("leader");
|
||||
expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
|
||||
expectPostRebalanceCatchup(SNAPSHOT);
|
||||
worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
|
||||
worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
|
||||
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
PowerMock.expectLastCall();
|
||||
PowerMock.expectLastCall().andReturn(true);
|
||||
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
|
||||
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
|
||||
|
||||
|
@ -1098,11 +1089,10 @@ public class DistributedHerderTest {
|
|||
// connector without rebalance
|
||||
EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT_UPDATED_CONN1_CONFIG);
|
||||
worker.stopConnector(CONN1);
|
||||
PowerMock.expectLastCall();
|
||||
Capture<ConnectorConfig> capturedUpdatedConfig = EasyMock.newCapture();
|
||||
worker.startConnector(EasyMock.capture(capturedUpdatedConfig), EasyMock.<ConnectorContext>anyObject(),
|
||||
PowerMock.expectLastCall().andReturn(true);
|
||||
worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
|
||||
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
PowerMock.expectLastCall();
|
||||
PowerMock.expectLastCall().andReturn(true);
|
||||
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
|
||||
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
|
||||
|
||||
|
@ -1147,7 +1137,6 @@ public class DistributedHerderTest {
|
|||
// This requires inter-worker communication, so needs the REST API
|
||||
}
|
||||
|
||||
|
||||
private void expectRebalance(final long offset,
|
||||
final List<String> assignedConnectors,
|
||||
final List<ConnectorTaskId> assignedTasks) {
|
||||
|
@ -1175,17 +1164,13 @@ public class DistributedHerderTest {
|
|||
});
|
||||
|
||||
if (revokedConnectors != null) {
|
||||
for (String connector : revokedConnectors) {
|
||||
worker.stopConnector(connector);
|
||||
PowerMock.expectLastCall();
|
||||
}
|
||||
worker.stopConnectors(revokedConnectors);
|
||||
PowerMock.expectLastCall().andReturn(revokedConnectors);
|
||||
}
|
||||
|
||||
if (revokedTasks != null && !revokedTasks.isEmpty()) {
|
||||
worker.stopTasks(revokedTasks);
|
||||
PowerMock.expectLastCall();
|
||||
worker.awaitStopTasks(revokedTasks);
|
||||
PowerMock.expectLastCall();
|
||||
if (revokedTasks != null) {
|
||||
worker.stopAndAwaitTasks(revokedTasks);
|
||||
PowerMock.expectLastCall().andReturn(revokedTasks);
|
||||
}
|
||||
|
||||
if (revokedConnectors != null) {
|
||||
|
|
|
@ -21,6 +21,7 @@ import org.apache.kafka.connect.connector.Connector;
|
|||
import org.apache.kafka.connect.connector.ConnectorContext;
|
||||
import org.apache.kafka.connect.connector.Task;
|
||||
import org.apache.kafka.connect.errors.AlreadyExistsException;
|
||||
import org.apache.kafka.connect.errors.ConnectException;
|
||||
import org.apache.kafka.connect.errors.NotFoundException;
|
||||
import org.apache.kafka.connect.runtime.AbstractStatus;
|
||||
import org.apache.kafka.connect.runtime.ConnectorConfig;
|
||||
|
@ -166,11 +167,11 @@ public class StandaloneHerderTest {
|
|||
expectAdd(SourceSink.SOURCE);
|
||||
|
||||
worker.stopConnector(CONNECTOR_NAME);
|
||||
EasyMock.expectLastCall();
|
||||
EasyMock.expectLastCall().andReturn(true);
|
||||
|
||||
worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorConfig(SourceSink.SOURCE))),
|
||||
worker.startConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(connectorConfig(SourceSink.SOURCE)),
|
||||
EasyMock.anyObject(HerderConnectorContext.class), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
EasyMock.expectLastCall();
|
||||
EasyMock.expectLastCall().andReturn(true);
|
||||
|
||||
PowerMock.replayAll();
|
||||
|
||||
|
@ -183,43 +184,16 @@ public class StandaloneHerderTest {
|
|||
PowerMock.verifyAll();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRestartConnectorFailureOnStop() throws Exception {
|
||||
expectAdd(SourceSink.SOURCE);
|
||||
|
||||
RuntimeException e = new RuntimeException();
|
||||
worker.stopConnector(CONNECTOR_NAME);
|
||||
EasyMock.expectLastCall().andThrow(e);
|
||||
|
||||
// the connector will not be started after the failure in start
|
||||
|
||||
PowerMock.replayAll();
|
||||
|
||||
herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback);
|
||||
|
||||
FutureCallback<Void> cb = new FutureCallback<>();
|
||||
herder.restartConnector(CONNECTOR_NAME, cb);
|
||||
try {
|
||||
cb.get(1000L, TimeUnit.MILLISECONDS);
|
||||
fail();
|
||||
} catch (ExecutionException exception) {
|
||||
assertEquals(e, exception.getCause());
|
||||
}
|
||||
|
||||
PowerMock.verifyAll();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRestartConnectorFailureOnStart() throws Exception {
|
||||
expectAdd(SourceSink.SOURCE);
|
||||
|
||||
worker.stopConnector(CONNECTOR_NAME);
|
||||
EasyMock.expectLastCall();
|
||||
EasyMock.expectLastCall().andReturn(true);
|
||||
|
||||
RuntimeException e = new RuntimeException();
|
||||
worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorConfig(SourceSink.SOURCE))),
|
||||
worker.startConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(connectorConfig(SourceSink.SOURCE)),
|
||||
EasyMock.anyObject(HerderConnectorContext.class), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
EasyMock.expectLastCall().andThrow(e);
|
||||
EasyMock.expectLastCall().andReturn(false);
|
||||
|
||||
PowerMock.replayAll();
|
||||
|
||||
|
@ -231,7 +205,7 @@ public class StandaloneHerderTest {
|
|||
cb.get(1000L, TimeUnit.MILLISECONDS);
|
||||
fail();
|
||||
} catch (ExecutionException exception) {
|
||||
assertEquals(e, exception.getCause());
|
||||
assertEquals(ConnectException.class, exception.getCause().getClass());
|
||||
}
|
||||
|
||||
PowerMock.verifyAll();
|
||||
|
@ -243,12 +217,10 @@ public class StandaloneHerderTest {
|
|||
expectAdd(SourceSink.SOURCE);
|
||||
|
||||
worker.stopAndAwaitTask(taskId);
|
||||
EasyMock.expectLastCall();
|
||||
EasyMock.expectLastCall().andReturn(true);
|
||||
|
||||
ConnectorConfig connConfig = new ConnectorConfig(connectorConfig(SourceSink.SOURCE));
|
||||
TaskConfig taskConfig = new TaskConfig(taskConfig(SourceSink.SOURCE));
|
||||
worker.startTask(taskId, taskConfig, connConfig, herder, TargetState.STARTED);
|
||||
EasyMock.expectLastCall();
|
||||
worker.startTask(taskId, connectorConfig(SourceSink.SOURCE), taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED);
|
||||
EasyMock.expectLastCall().andReturn(true);
|
||||
|
||||
PowerMock.replayAll();
|
||||
|
||||
|
@ -261,45 +233,16 @@ public class StandaloneHerderTest {
|
|||
PowerMock.verifyAll();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRestartTaskFailureOnStop() throws Exception {
|
||||
ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
|
||||
expectAdd(SourceSink.SOURCE);
|
||||
|
||||
RuntimeException e = new RuntimeException();
|
||||
worker.stopAndAwaitTask(taskId);
|
||||
EasyMock.expectLastCall().andThrow(e);
|
||||
|
||||
// task will not be started after the failure in stop
|
||||
|
||||
PowerMock.replayAll();
|
||||
|
||||
herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback);
|
||||
|
||||
FutureCallback<Void> cb = new FutureCallback<>();
|
||||
herder.restartTask(taskId, cb);
|
||||
try {
|
||||
cb.get(1000L, TimeUnit.MILLISECONDS);
|
||||
fail("Expected restart callback to raise an exception");
|
||||
} catch (ExecutionException exception) {
|
||||
assertEquals(e, exception.getCause());
|
||||
}
|
||||
PowerMock.verifyAll();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRestartTaskFailureOnStart() throws Exception {
|
||||
ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
|
||||
expectAdd(SourceSink.SOURCE);
|
||||
|
||||
worker.stopAndAwaitTask(taskId);
|
||||
EasyMock.expectLastCall();
|
||||
EasyMock.expectLastCall().andReturn(true);
|
||||
|
||||
RuntimeException e = new RuntimeException();
|
||||
ConnectorConfig connConfig = new ConnectorConfig(connectorConfig(SourceSink.SOURCE));
|
||||
TaskConfig taskConfig = new TaskConfig(taskConfig(SourceSink.SOURCE));
|
||||
worker.startTask(taskId, taskConfig, connConfig, herder, TargetState.STARTED);
|
||||
EasyMock.expectLastCall().andThrow(e);
|
||||
worker.startTask(taskId, connectorConfig(SourceSink.SOURCE), taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED);
|
||||
EasyMock.expectLastCall().andReturn(false);
|
||||
|
||||
PowerMock.replayAll();
|
||||
|
||||
|
@ -311,7 +254,7 @@ public class StandaloneHerderTest {
|
|||
cb.get(1000L, TimeUnit.MILLISECONDS);
|
||||
fail("Expected restart callback to raise an exception");
|
||||
} catch (ExecutionException exception) {
|
||||
assertEquals(e, exception.getCause());
|
||||
assertEquals(ConnectException.class, exception.getCause().getClass());
|
||||
}
|
||||
|
||||
PowerMock.verifyAll();
|
||||
|
@ -409,11 +352,11 @@ public class StandaloneHerderTest {
|
|||
EasyMock.expectLastCall();
|
||||
// Update config, which requires stopping and restarting
|
||||
worker.stopConnector(CONNECTOR_NAME);
|
||||
EasyMock.expectLastCall();
|
||||
Capture<ConnectorConfig> capturedConfig = EasyMock.newCapture();
|
||||
worker.startConnector(EasyMock.capture(capturedConfig), EasyMock.<ConnectorContext>anyObject(),
|
||||
EasyMock.expectLastCall().andReturn(true);
|
||||
Capture<Map<String, String>> capturedConfig = EasyMock.newCapture();
|
||||
worker.startConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.capture(capturedConfig), EasyMock.<ConnectorContext>anyObject(),
|
||||
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
EasyMock.expectLastCall();
|
||||
EasyMock.expectLastCall().andReturn(true);
|
||||
EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true);
|
||||
// Generate same task config, which should result in no additional action to restart tasks
|
||||
EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, null))
|
||||
|
@ -432,7 +375,7 @@ public class StandaloneHerderTest {
|
|||
herder.putConnectorConfig(CONNECTOR_NAME, connConfig, false, createCallback);
|
||||
herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb);
|
||||
herder.putConnectorConfig(CONNECTOR_NAME, newConnConfig, true, putConnectorConfigCb);
|
||||
assertEquals("bar", capturedConfig.getValue().originals().get("foo"));
|
||||
assertEquals("bar", capturedConfig.getValue().get("foo"));
|
||||
herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb);
|
||||
|
||||
PowerMock.verifyAll();
|
||||
|
@ -456,26 +399,24 @@ public class StandaloneHerderTest {
|
|||
|
||||
Map<String, String> connectorProps = connectorConfig(sourceSink);
|
||||
|
||||
worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorProps)), EasyMock.anyObject(HerderConnectorContext.class),
|
||||
worker.startConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(connectorProps), EasyMock.anyObject(HerderConnectorContext.class),
|
||||
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
|
||||
EasyMock.expectLastCall();
|
||||
EasyMock.expectLastCall().andReturn(true);
|
||||
EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true);
|
||||
|
||||
ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, connectorProps, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
|
||||
createCallback.onCompletion(null, new Herder.Created<>(true, connInfo));
|
||||
EasyMock.expectLastCall();
|
||||
|
||||
// And we should instantiate the tasks. For a sink task, we should see added properties for
|
||||
// the input topic partitions
|
||||
ConnectorConfig connConfig = new ConnectorConfig(connectorConfig(sourceSink));
|
||||
// And we should instantiate the tasks. For a sink task, we should see added properties for the input topic partitions
|
||||
|
||||
Map<String, String> generatedTaskProps = taskConfig(sourceSink);
|
||||
TaskConfig taskConfig = new TaskConfig(generatedTaskProps);
|
||||
|
||||
EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, sourceSink == SourceSink.SINK ? TOPICS_LIST : null))
|
||||
.andReturn(Collections.singletonList(generatedTaskProps));
|
||||
|
||||
worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), taskConfig, connConfig, herder, TargetState.STARTED);
|
||||
EasyMock.expectLastCall();
|
||||
worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), connectorConfig(sourceSink), generatedTaskProps, herder, TargetState.STARTED);
|
||||
EasyMock.expectLastCall().andReturn(true);
|
||||
|
||||
worker.isSinkConnector(CONNECTOR_NAME);
|
||||
PowerMock.expectLastCall().andReturn(sourceSink == SourceSink.SINK);
|
||||
|
@ -483,12 +424,10 @@ public class StandaloneHerderTest {
|
|||
|
||||
private void expectStop() {
|
||||
ConnectorTaskId task = new ConnectorTaskId(CONNECTOR_NAME, 0);
|
||||
worker.stopTasks(Collections.singletonList(task));
|
||||
EasyMock.expectLastCall();
|
||||
worker.awaitStopTasks(Collections.singletonList(task));
|
||||
EasyMock.expectLastCall();
|
||||
worker.stopAndAwaitTasks(Collections.singletonList(task));
|
||||
EasyMock.expectLastCall().andReturn(Collections.singleton(task));
|
||||
worker.stopConnector(CONNECTOR_NAME);
|
||||
EasyMock.expectLastCall();
|
||||
EasyMock.expectLastCall().andReturn(true);
|
||||
}
|
||||
|
||||
private void expectDestroy() {
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.kafka.connect.runtime.TaskStatus;
|
|||
import org.apache.kafka.connect.util.ConnectorTaskId;
|
||||
import org.apache.kafka.connect.util.KafkaBasedLog;
|
||||
import org.easymock.Capture;
|
||||
import org.easymock.EasyMock;
|
||||
import org.easymock.EasyMockSupport;
|
||||
import org.easymock.IAnswer;
|
||||
import org.junit.Test;
|
||||
|
@ -185,6 +186,33 @@ public class KafkaStatusBackingStoreTest extends EasyMockSupport {
|
|||
verifyAll();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void putSafeWithNoPreviousValueIsPropagated() {
|
||||
final Converter converter = mock(Converter.class);
|
||||
final KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
|
||||
final KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
|
||||
|
||||
final byte[] value = new byte[0];
|
||||
|
||||
final Capture<Struct> statusValueStruct = newCapture();
|
||||
converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), capture(statusValueStruct));
|
||||
EasyMock.expectLastCall().andReturn(value);
|
||||
|
||||
kafkaBasedLog.send(eq("status-connector-" + CONNECTOR), eq(value), anyObject(Callback.class));
|
||||
expectLastCall();
|
||||
|
||||
replayAll();
|
||||
|
||||
final ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.FAILED, WORKER_ID, 0);
|
||||
store.putSafe(status);
|
||||
|
||||
verifyAll();
|
||||
|
||||
assertEquals(status.state().toString(), statusValueStruct.getValue().get(KafkaStatusBackingStore.STATE_KEY_NAME));
|
||||
assertEquals(status.workerId(), statusValueStruct.getValue().get(KafkaStatusBackingStore.WORKER_ID_KEY_NAME));
|
||||
assertEquals(status.generation(), statusValueStruct.getValue().get(KafkaStatusBackingStore.GENERATION_KEY_NAME));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void putSafeOverridesValueSetBySameWorker() {
|
||||
final byte[] value = new byte[0];
|
||||
|
|
|
@ -379,13 +379,12 @@ class MockSink(object):
|
|||
|
||||
class MockSource(object):
|
||||
|
||||
def __init__(self, cc, topics, mode=None, delay_sec=10, name="mock-source"):
|
||||
def __init__(self, cc, mode=None, delay_sec=10, name="mock-source"):
|
||||
self.cc = cc
|
||||
self.logger = self.cc.logger
|
||||
self.name = name
|
||||
self.mode = mode
|
||||
self.delay_sec = delay_sec
|
||||
self.topics = topics
|
||||
|
||||
def start(self):
|
||||
self.logger.info("Creating connector MockSourceConnector %s", self.name)
|
||||
|
@ -393,8 +392,6 @@ class MockSource(object):
|
|||
'name': self.name,
|
||||
'connector.class': 'org.apache.kafka.connect.tools.MockSourceConnector',
|
||||
'tasks.max': 1,
|
||||
'topics': ",".join(self.topics),
|
||||
'mock_mode': self.mode,
|
||||
'delay_ms': self.delay_sec * 1000
|
||||
})
|
||||
|
||||
|
|
|
@ -23,7 +23,7 @@ from kafkatest.services.security.security_config import SecurityConfig
|
|||
from ducktape.utils.util import wait_until
|
||||
from ducktape.mark import matrix
|
||||
import subprocess, itertools, time
|
||||
from collections import Counter
|
||||
from collections import Counter, namedtuple
|
||||
import operator
|
||||
|
||||
class ConnectDistributedTest(Test):
|
||||
|
@ -155,7 +155,43 @@ class ConnectDistributedTest(Test):
|
|||
wait_until(lambda: self.connector_is_running(self.sink), timeout_sec=10,
|
||||
err_msg="Failed to see connector transition to the RUNNING state")
|
||||
|
||||
|
||||
@matrix(delete_before_reconfig=[False, True])
|
||||
def test_bad_connector_class(self, delete_before_reconfig):
|
||||
"""
|
||||
For the same connector name, first configure it with a bad connector class name such that it fails to start, verify that it enters a FAILED state.
|
||||
Restart should also fail.
|
||||
Then try to rectify by reconfiguring it as a MockConnector and verifying it successfully transitions to RUNNING.
|
||||
"""
|
||||
self.setup_services()
|
||||
self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
|
||||
self.cc.start()
|
||||
|
||||
connector_name = 'bad-to-good-test'
|
||||
|
||||
connector = namedtuple('BadConnector', ['name', 'tasks'])(connector_name, 1)
|
||||
config = {
|
||||
'name': connector.name,
|
||||
'tasks.max': connector.tasks,
|
||||
'connector.class': 'java.util.HashMap'
|
||||
}
|
||||
self.cc.create_connector(config)
|
||||
|
||||
wait_until(lambda: self.connector_is_failed(connector), timeout_sec=10, err_msg="Failed to see connector transition to FAILED state")
|
||||
|
||||
try:
|
||||
self.cc.restart_connector(connector_name)
|
||||
except ConnectRestError:
|
||||
pass
|
||||
else:
|
||||
raise AssertionError("Expected restart of %s to fail" % connector_name)
|
||||
|
||||
if delete_before_reconfig:
|
||||
self.cc.delete_connector(connector_name)
|
||||
|
||||
config['connector.class'] = 'org.apache.kafka.connect.tools.MockSourceConnector'
|
||||
self.cc.set_connector_config(connector_name, config)
|
||||
wait_until(lambda: self.connector_is_running(connector), timeout_sec=10, err_msg="Failed to see connector transition to the RUNNING state")
|
||||
|
||||
@matrix(connector_type=["source", "sink"])
|
||||
def test_restart_failed_task(self, connector_type):
|
||||
self.setup_services()
|
||||
|
@ -166,7 +202,7 @@ class ConnectDistributedTest(Test):
|
|||
if connector_type == "sink":
|
||||
connector = MockSink(self.cc, self.topics.keys(), mode='task-failure', delay_sec=5)
|
||||
else:
|
||||
connector = MockSource(self.cc, self.topics.keys(), mode='task-failure', delay_sec=5)
|
||||
connector = MockSource(self.cc, mode='task-failure', delay_sec=5)
|
||||
|
||||
connector.start()
|
||||
|
||||
|
|
Loading…
Reference in New Issue