KAFKA-6688. The Trogdor coordinator should track task statuses (#4737)

Reviewers: Anna Povzner <anna@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>
This commit is contained in:
Colin Patrick McCabe 2018-04-08 01:35:33 -07:00 committed by Rajini Sivaram
parent fedac0cea7
commit 40183e3156
29 changed files with 487 additions and 197 deletions

View File

@ -22,6 +22,7 @@ import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.Scheduler;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.ThreadUtils;
import org.apache.kafka.trogdor.rest.WorkerDone;
@ -29,6 +30,7 @@ import org.apache.kafka.trogdor.rest.WorkerRunning;
import org.apache.kafka.trogdor.rest.WorkerStarting;
import org.apache.kafka.trogdor.rest.WorkerStopping;
import org.apache.kafka.trogdor.rest.WorkerState;
import org.apache.kafka.trogdor.task.AgentWorkerStatusTracker;
import org.apache.kafka.trogdor.task.TaskSpec;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.slf4j.Logger;
@ -43,7 +45,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
public final class WorkerManager {
private static final Logger log = LoggerFactory.getLogger(WorkerManager.class);
@ -190,7 +191,7 @@ public final class WorkerManager {
/**
* The worker status.
*/
private final AtomicReference<String> status = new AtomicReference<>("");
private final AgentWorkerStatusTracker status = new AgentWorkerStatusTracker();
/**
* The time when this task was started.
@ -293,6 +294,8 @@ public final class WorkerManager {
haltFuture.thenApply(new KafkaFuture.BaseFunction<String, Void>() {
@Override
public Void apply(String errorString) {
if (errorString == null)
errorString = "";
if (errorString.isEmpty()) {
log.info("{}: Worker {} is halting.", nodeName, id);
} else {
@ -306,8 +309,9 @@ public final class WorkerManager {
try {
worker.taskWorker.start(platform, worker.status, haltFuture);
} catch (Exception e) {
log.info("{}: Worker {} start() exception", nodeName, id, e);
stateChangeExecutor.submit(new HandleWorkerHalting(worker,
"worker.start() exception: " + e.getMessage(), true));
"worker.start() exception: " + Utils.stackTrace(e), true));
}
stateChangeExecutor.submit(new FinishCreatingWorker(worker));
}

View File

@ -49,7 +49,6 @@ import org.apache.kafka.trogdor.common.ThreadUtils;
import org.apache.kafka.trogdor.rest.AgentStatusResponse;
import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
import org.apache.kafka.trogdor.rest.StopWorkerRequest;
import org.apache.kafka.trogdor.rest.WorkerDone;
import org.apache.kafka.trogdor.rest.WorkerReceiving;
import org.apache.kafka.trogdor.rest.WorkerRunning;
import org.apache.kafka.trogdor.rest.WorkerStarting;
@ -192,6 +191,9 @@ public final class NodeManager {
// agents going down?
return;
}
if (log.isTraceEnabled()) {
log.trace("{}: got heartbeat status {}", node.name(), agentStatus);
}
// Identify workers which we think should be running, but which do not appear
// in the agent's response. We need to send startWorker requests for these.
for (Map.Entry<String, ManagedWorker> entry : workers.entrySet()) {
@ -203,40 +205,31 @@ public final class NodeManager {
}
}
}
// Identify tasks which are running, but which we don't know about.
// Add these to the NodeManager as tasks that should not be running.
for (Map.Entry<String, WorkerState> entry : agentStatus.workers().entrySet()) {
String id = entry.getKey();
WorkerState state = entry.getValue();
if (!workers.containsKey(id)) {
log.warn("{}: scheduling unknown worker {} for stopping.", node.name(), id);
workers.put(id, new ManagedWorker(id, state.spec(), false, state));
}
}
// Handle workers which need to be stopped. Handle workers which have newly completed.
for (Map.Entry<String, WorkerState> entry : agentStatus.workers().entrySet()) {
String id = entry.getKey();
WorkerState state = entry.getValue();
ManagedWorker worker = workers.get(id);
if (state instanceof WorkerStarting || state instanceof WorkerRunning) {
if (!worker.shouldRun) {
worker.tryStop();
}
} else if (state instanceof WorkerDone) {
if (!(worker.state instanceof WorkerDone)) {
WorkerDone workerDoneState = (WorkerDone) state;
String error = workerDoneState.error();
if (error.isEmpty()) {
log.info("{}: Worker {} finished with status '{}'",
node.name(), id, workerDoneState.status());
} else {
log.warn("{}: Worker {} finished with error '{}' and status '{}'",
node.name(), id, error, workerDoneState.status());
if (worker == null) {
// Identify tasks which are running, but which we don't know about.
// Add these to the NodeManager as tasks that should not be running.
log.warn("{}: scheduling unknown worker {} for stopping.", node.name(), id);
workers.put(id, new ManagedWorker(id, state.spec(), false, state));
} else {
// Handle workers which need to be stopped.
if (state instanceof WorkerStarting || state instanceof WorkerRunning) {
if (!worker.shouldRun) {
worker.tryStop();
}
taskManager.handleWorkerCompletion(node.name(), worker.id, error);
}
// Notify the TaskManager if the worker state has changed.
if (worker.state.equals(state)) {
log.debug("{}: worker state is still {}", node.name(), worker.state);
} else {
log.info("{}: worker state changed from {} to {}", node.name(), worker.state, state);
worker.state = state;
taskManager.updateWorkerState(node.name(), worker.id, state);
}
}
worker.state = state;
}
} catch (Throwable e) {
log.error("{}: Unhandled exception in NodeHeartbeatRunnable", node.name(), e);

View File

@ -17,10 +17,14 @@
package org.apache.kafka.trogdor.coordinator;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.Scheduler;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.ThreadUtils;
@ -31,13 +35,15 @@ import org.apache.kafka.trogdor.rest.TaskState;
import org.apache.kafka.trogdor.rest.TaskStopping;
import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TasksResponse;
import org.apache.kafka.trogdor.rest.WorkerDone;
import org.apache.kafka.trogdor.rest.WorkerReceiving;
import org.apache.kafka.trogdor.rest.WorkerState;
import org.apache.kafka.trogdor.task.TaskController;
import org.apache.kafka.trogdor.task.TaskSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
@ -172,16 +178,9 @@ public final class TaskManager {
private Future<?> startFuture = null;
/**
* The name of the worker nodes involved with this task.
* Null if the task is not running.
* The states of the workers involved with this task.
*/
private Set<String> workers = null;
/**
* The names of the worker nodes which are still running this task.
* Null if the task is not running.
*/
private Set<String> activeWorkers = null;
public Map<String, WorkerState> workerStates = new TreeMap<>();
/**
* If this is non-empty, a message describing how this task failed.
@ -241,14 +240,39 @@ public final class TaskManager {
case PENDING:
return new TaskPending(spec);
case RUNNING:
return new TaskRunning(spec, startedMs);
return new TaskRunning(spec, startedMs, getCombinedStatus(workerStates));
case STOPPING:
return new TaskStopping(spec, startedMs);
return new TaskStopping(spec, startedMs, getCombinedStatus(workerStates));
case DONE:
return new TaskDone(spec, startedMs, doneMs, error, cancelled);
return new TaskDone(spec, startedMs, doneMs, error, cancelled, getCombinedStatus(workerStates));
}
throw new RuntimeException("unreachable");
}
TreeSet<String> activeWorkers() {
TreeSet<String> workerNames = new TreeSet<>();
for (Map.Entry<String, WorkerState> entry : workerStates.entrySet()) {
if (!entry.getValue().done()) {
workerNames.add(entry.getKey());
}
}
return workerNames;
}
}
private static final JsonNode getCombinedStatus(Map<String, WorkerState> states) {
if (states.size() == 1) {
return states.values().iterator().next().status();
} else {
ObjectNode objectNode = new ObjectNode(JsonNodeFactory.instance);
for (Map.Entry<String, WorkerState> entry : states.entrySet()) {
JsonNode node = entry.getValue().status();
if (node != null) {
objectNode.set(entry.getKey(), node);
}
}
return objectNode;
}
}
/**
@ -349,10 +373,8 @@ public final class TaskManager {
log.info("Running task {} on node(s): {}", task.id, Utils.join(nodeNames, ", "));
task.state = ManagedTaskState.RUNNING;
task.startedMs = time.milliseconds();
task.workers = nodeNames;
task.activeWorkers = new HashSet<>();
for (String workerName : task.workers) {
task.activeWorkers.add(workerName);
for (String workerName : nodeNames) {
task.workerStates.put(workerName, new WorkerReceiving(task.spec));
nodeManagers.get(workerName).createWorker(task.id, task.spec);
}
return null;
@ -398,15 +420,16 @@ public final class TaskManager {
break;
case RUNNING:
task.cancelled = true;
if (task.activeWorkers.size() == 0) {
TreeSet<String> activeWorkers = task.activeWorkers();
if (activeWorkers.isEmpty()) {
log.info("Task {} is now complete with error: {}", id, task.error);
task.doneMs = time.milliseconds();
task.state = ManagedTaskState.DONE;
} else {
for (String workerName : task.activeWorkers) {
for (String workerName : activeWorkers) {
nodeManagers.get(workerName).stopWorker(id);
}
log.info("Cancelling task {} on worker(s): {}", id, Utils.join(task.activeWorkers, ", "));
log.info("Cancelling task {} on worker(s): {}", id, Utils.join(activeWorkers, ", "));
task.state = ManagedTaskState.STOPPING;
}
break;
@ -422,65 +445,79 @@ public final class TaskManager {
}
/**
* A callback NodeManager makes to indicate that a worker has completed.
* The task will transition to DONE once all workers are done.
* Update the state of a particular agent's worker.
*
* @param nodeName The node name.
* @param nodeName The node where the agent is running.
* @param id The worker name.
* @param error An empty string if there is no error, or an error string.
* @param state The worker state.
*/
public void handleWorkerCompletion(String nodeName, String id, String error) {
executor.submit(new HandleWorkerCompletion(nodeName, id, error));
public void updateWorkerState(String nodeName, String id, WorkerState state) {
executor.submit(new UpdateWorkerState(nodeName, id, state));
}
class HandleWorkerCompletion implements Callable<Void> {
class UpdateWorkerState implements Callable<Void> {
private final String nodeName;
private final String id;
private final String error;
private final WorkerState state;
HandleWorkerCompletion(String nodeName, String id, String error) {
UpdateWorkerState(String nodeName, String id, WorkerState state) {
this.nodeName = nodeName;
this.id = id;
this.error = error;
this.state = state;
}
@Override
public Void call() throws Exception {
ManagedTask task = tasks.get(id);
if (task == null) {
log.error("Can't handle completion of unknown worker {} on node {}",
log.error("Can't update worker state unknown worker {} on node {}",
id, nodeName);
return null;
}
if ((task.state == ManagedTaskState.PENDING) || (task.state == ManagedTaskState.DONE)) {
log.error("Task {} got unexpected worker completion from {} while " +
"in {} state.", id, nodeName, task.state);
return null;
}
boolean broadcastStop = false;
if (task.state == ManagedTaskState.RUNNING) {
task.state = ManagedTaskState.STOPPING;
broadcastStop = true;
}
task.maybeSetError(error);
task.activeWorkers.remove(nodeName);
if (task.activeWorkers.size() == 0) {
task.doneMs = time.milliseconds();
task.state = ManagedTaskState.DONE;
log.info("Task {} is now complete on {} with error: {}", id,
Utils.join(task.workers, ", "),
task.error.isEmpty() ? "(none)" : task.error);
} else if (broadcastStop) {
log.info("Node {} stopped. Stopping task {} on worker(s): {}",
id, Utils.join(task.activeWorkers, ", "));
for (String workerName : task.activeWorkers) {
nodeManagers.get(workerName).stopWorker(id);
}
WorkerState prevState = task.workerStates.get(nodeName);
log.debug("Task {}: Updating worker state for {} from {} to {}.",
id, nodeName, prevState, state);
task.workerStates.put(nodeName, state);
if (state.done() && (!prevState.done())) {
handleWorkerCompletion(task, nodeName, (WorkerDone) state);
}
return null;
}
}
/**
* Handle a worker being completed.
*
* @param task The task that owns the worker.
* @param nodeName The name of the node on which the worker is running.
* @param state The worker state.
*/
private void handleWorkerCompletion(ManagedTask task, String nodeName, WorkerDone state) {
if (state.error().isEmpty()) {
log.info("{}: Worker {} finished with status '{}'",
nodeName, task.id, JsonUtil.toJsonString(state.status()));
} else {
log.warn("{}: Worker {} finished with error '{}' and status '{}'",
nodeName, task.id, state.error(), JsonUtil.toJsonString(state.status()));
task.maybeSetError(state.error());
}
if (task.activeWorkers().isEmpty()) {
task.doneMs = time.milliseconds();
task.state = ManagedTaskState.DONE;
log.info("{}: Task {} is now complete on {} with error: {}",
nodeName, task.id, Utils.join(task.workerStates.keySet(), ", "),
task.error.isEmpty() ? "(none)" : task.error);
} else if ((task.state == ManagedTaskState.RUNNING) && (!task.error.isEmpty())) {
TreeSet<String> activeWorkers = task.activeWorkers();
log.info("{}: task {} stopped with error {}. Stopping worker(s): {}",
nodeName, task.id, task.error, Utils.join(activeWorkers, ", "));
task.state = ManagedTaskState.STOPPING;
for (String workerName : activeWorkers) {
nodeManagers.get(workerName).stopWorker(task.id);
}
}
}
/**
* Get information about the tasks being managed.
*/

View File

@ -17,15 +17,15 @@
package org.apache.kafka.trogdor.fault;
import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.fault.Kibosh.KiboshFaultSpec;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicReference;
public class KiboshFaultWorker implements TaskWorker {
private static final Logger log = LoggerFactory.getLogger(KiboshFaultWorker.class);
@ -35,6 +35,8 @@ public class KiboshFaultWorker implements TaskWorker {
private final String mountPath;
private WorkerStatusTracker status;
public KiboshFaultWorker(String id, KiboshFaultSpec spec, String mountPath) {
this.id = id;
this.spec = spec;
@ -42,15 +44,20 @@ public class KiboshFaultWorker implements TaskWorker {
}
@Override
public void start(Platform platform, AtomicReference<String> status,
public void start(Platform platform, WorkerStatusTracker status,
KafkaFutureImpl<String> errorFuture) throws Exception {
log.info("Activating {} {}: {}.", spec.getClass().getSimpleName(), id, spec);
this.status = status;
this.status.update(new TextNode("Adding fault " + id));
Kibosh.INSTANCE.addFault(mountPath, spec);
this.status.update(new TextNode("Added fault " + id));
}
@Override
public void stop(Platform platform) throws Exception {
log.info("Deactivating {} {}: {}.", spec.getClass().getSimpleName(), id, spec);
this.status.update(new TextNode("Removing fault " + id));
Kibosh.INSTANCE.removeFault(mountPath, spec);
this.status.update(new TextNode("Removed fault " + id));
}
}

View File

@ -17,11 +17,13 @@
package org.apache.kafka.trogdor.fault;
import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.Topology;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -29,7 +31,6 @@ import java.net.InetAddress;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicReference;
public class NetworkPartitionFaultWorker implements TaskWorker {
private static final Logger log = LoggerFactory.getLogger(NetworkPartitionFaultWorker.class);
@ -38,22 +39,29 @@ public class NetworkPartitionFaultWorker implements TaskWorker {
private final List<Set<String>> partitionSets;
private WorkerStatusTracker status;
public NetworkPartitionFaultWorker(String id, List<Set<String>> partitionSets) {
this.id = id;
this.partitionSets = partitionSets;
}
@Override
public void start(Platform platform, AtomicReference<String> status,
public void start(Platform platform, WorkerStatusTracker status,
KafkaFutureImpl<String> errorFuture) throws Exception {
log.info("Activating NetworkPartitionFault {}.", id);
this.status = status;
this.status.update(new TextNode("creating network partition " + id));
runIptablesCommands(platform, "-A");
this.status.update(new TextNode("created network partition " + id));
}
@Override
public void stop(Platform platform) throws Exception {
log.info("Deactivating NetworkPartitionFault {}.", id);
this.status.update(new TextNode("removing network partition " + id));
runIptablesCommands(platform, "-D");
this.status.update(new TextNode("removed network partition " + id));
}
private void runIptablesCommands(Platform platform, String iptablesAction) throws Exception {

View File

@ -17,16 +17,17 @@
package org.apache.kafka.trogdor.fault;
import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
public class ProcessStopFaultWorker implements TaskWorker {
private static final Logger log = LoggerFactory.getLogger(ProcessStopFaultWorker.class);
@ -35,22 +36,29 @@ public class ProcessStopFaultWorker implements TaskWorker {
private final String javaProcessName;
private WorkerStatusTracker status;
public ProcessStopFaultWorker(String id, String javaProcessName) {
this.id = id;
this.javaProcessName = javaProcessName;
}
@Override
public void start(Platform platform, AtomicReference<String> status,
public void start(Platform platform, WorkerStatusTracker status,
KafkaFutureImpl<String> errorFuture) throws Exception {
this.status = status;
log.info("Activating ProcessStopFault {}.", id);
this.status.update(new TextNode("stopping " + javaProcessName));
sendSignals(platform, "SIGSTOP");
this.status.update(new TextNode("stopped " + javaProcessName));
}
@Override
public void stop(Platform platform) throws Exception {
log.info("Deactivating ProcessStopFault {}.", id);
this.status.update(new TextNode("resuming " + javaProcessName));
sendSignals(platform, "SIGCONT");
this.status.update(new TextNode("resumed " + javaProcessName));
}
private void sendSignals(Platform platform, String signalName) throws Exception {

View File

@ -16,9 +16,9 @@
*/
package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.trogdor.task.TaskSpec;
/**
@ -50,8 +50,9 @@ public class TaskDone extends TaskState {
@JsonProperty("startedMs") long startedMs,
@JsonProperty("doneMs") long doneMs,
@JsonProperty("error") String error,
@JsonProperty("cancelled") boolean cancelled) {
super(spec);
@JsonProperty("cancelled") boolean cancelled,
@JsonProperty("status") JsonNode status) {
super(spec, status);
this.startedMs = startedMs;
this.doneMs = doneMs;
this.error = error;

View File

@ -19,6 +19,7 @@ package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.kafka.trogdor.task.TaskSpec;
/**
@ -27,6 +28,6 @@ import org.apache.kafka.trogdor.task.TaskSpec;
public class TaskPending extends TaskState {
@JsonCreator
public TaskPending(@JsonProperty("spec") TaskSpec spec) {
super(spec);
super(spec, NullNode.instance);
}
}

View File

@ -19,6 +19,7 @@ package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.trogdor.task.TaskSpec;
/**
@ -32,8 +33,9 @@ public class TaskRunning extends TaskState {
@JsonCreator
public TaskRunning(@JsonProperty("spec") TaskSpec spec,
@JsonProperty("startedMs") long startedMs) {
super(spec);
@JsonProperty("startedMs") long startedMs,
@JsonProperty("status") JsonNode status) {
super(spec, status);
this.startedMs = startedMs;
}

View File

@ -20,6 +20,8 @@ package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.kafka.trogdor.task.TaskSpec;
/**
@ -37,12 +39,20 @@ import org.apache.kafka.trogdor.task.TaskSpec;
public abstract class TaskState extends Message {
private final TaskSpec spec;
public TaskState(TaskSpec spec) {
private final JsonNode status;
public TaskState(TaskSpec spec, JsonNode status) {
this.spec = spec;
this.status = status == null ? NullNode.instance : status;
}
@JsonProperty
public TaskSpec spec() {
return spec;
}
@JsonProperty
public JsonNode status() {
return status;
}
}

View File

@ -19,6 +19,7 @@ package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.trogdor.task.TaskSpec;
/**
@ -32,8 +33,9 @@ public class TaskStopping extends TaskState {
@JsonCreator
public TaskStopping(@JsonProperty("spec") TaskSpec spec,
@JsonProperty("startedMs") long startedMs) {
super(spec);
@JsonProperty("startedMs") long startedMs,
@JsonProperty("status") JsonNode status) {
super(spec, status);
this.startedMs = startedMs;
}

View File

@ -19,6 +19,8 @@ package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.kafka.trogdor.task.TaskSpec;
/**
@ -39,7 +41,7 @@ public class WorkerDone extends WorkerState {
* The task status. The format will depend on the type of task that is
* being run.
*/
private final String status;
private final JsonNode status;
/**
* Empty if the task completed without error; the error message otherwise.
@ -50,12 +52,12 @@ public class WorkerDone extends WorkerState {
public WorkerDone(@JsonProperty("spec") TaskSpec spec,
@JsonProperty("startedMs") long startedMs,
@JsonProperty("doneMs") long doneMs,
@JsonProperty("status") String status,
@JsonProperty("status") JsonNode status,
@JsonProperty("error") String error) {
super(spec);
this.startedMs = startedMs;
this.doneMs = doneMs;
this.status = status == null ? "" : status;
this.status = status == null ? NullNode.instance : status;
this.error = error == null ? "" : error;
}
@ -72,7 +74,7 @@ public class WorkerDone extends WorkerState {
@JsonProperty
@Override
public String status() {
public JsonNode status() {
return status;
}

View File

@ -19,6 +19,8 @@ package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.kafka.trogdor.task.TaskSpec;
/**
@ -30,4 +32,9 @@ public final class WorkerReceiving extends WorkerState {
public WorkerReceiving(@JsonProperty("spec") TaskSpec spec) {
super(spec);
}
@Override
public JsonNode status() {
return new TextNode("receiving");
}
}

View File

@ -19,6 +19,8 @@ package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.kafka.trogdor.task.TaskSpec;
/**
@ -34,15 +36,15 @@ public class WorkerRunning extends WorkerState {
* The task status. The format will depend on the type of task that is
* being run.
*/
private final String status;
private final JsonNode status;
@JsonCreator
public WorkerRunning(@JsonProperty("spec") TaskSpec spec,
@JsonProperty("startedMs") long startedMs,
@JsonProperty("status") String status) {
@JsonProperty("status") JsonNode status) {
super(spec);
this.startedMs = startedMs;
this.status = status == null ? "" : status;
this.status = status == null ? NullNode.instance : status;
}
@JsonProperty
@ -53,7 +55,7 @@ public class WorkerRunning extends WorkerState {
@JsonProperty
@Override
public String status() {
public JsonNode status() {
return status;
}

View File

@ -19,6 +19,8 @@ package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.kafka.trogdor.task.TaskSpec;
/**
@ -29,4 +31,9 @@ public final class WorkerStarting extends WorkerState {
public WorkerStarting(@JsonProperty("spec") TaskSpec spec) {
super(spec);
}
@Override
public JsonNode status() {
return new TextNode("starting");
}
}

View File

@ -20,6 +20,7 @@ package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.trogdor.task.TaskSpec;
@ -60,9 +61,7 @@ public abstract class WorkerState extends Message {
throw new KafkaException("invalid state");
}
public String status() {
throw new KafkaException("invalid state");
}
public abstract JsonNode status();
public boolean running() {
return false;

View File

@ -19,6 +19,8 @@ package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.kafka.trogdor.task.TaskSpec;
/**
@ -34,15 +36,15 @@ public class WorkerStopping extends WorkerState {
* The task status. The format will depend on the type of task that is
* being run.
*/
private final String status;
private final JsonNode status;
@JsonCreator
public WorkerStopping(@JsonProperty("spec") TaskSpec spec,
@JsonProperty("startedMs") long startedMs,
@JsonProperty("status") String status) {
@JsonProperty("status") JsonNode status) {
super(spec);
this.startedMs = startedMs;
this.status = status == null ? "" : status;
this.status = status == null ? NullNode.instance : status;
}
@JsonProperty
@ -53,7 +55,7 @@ public class WorkerStopping extends WorkerState {
@JsonProperty
@Override
public String status() {
public JsonNode status() {
return status;
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.task;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NullNode;
/**
* Tracks the status of a Trogdor worker.
*/
public class AgentWorkerStatusTracker implements WorkerStatusTracker {
private JsonNode status = NullNode.instance;
@Override
public void update(JsonNode newStatus) {
JsonNode status = newStatus.deepCopy();
synchronized (this) {
this.status = status;
}
}
/**
* Retrieves the status.
*/
public synchronized JsonNode get() {
return status;
}
}

View File

@ -17,30 +17,34 @@
package org.apache.kafka.trogdor.task;
import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.trogdor.common.Platform;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicReference;
public class NoOpTaskWorker implements TaskWorker {
private static final Logger log = LoggerFactory.getLogger(NoOpTaskWorker.class);
private final String id;
private WorkerStatusTracker status;
public NoOpTaskWorker(String id) {
this.id = id;
}
@Override
public void start(Platform platform, AtomicReference<String> status,
public void start(Platform platform, WorkerStatusTracker status,
KafkaFutureImpl<String> errorFuture) throws Exception {
log.info("{}: Activating NoOpTask.", id);
this.status = status;
this.status.update(new TextNode("active"));
}
@Override
public void stop(Platform platform) throws Exception {
log.info("{}: Deactivating NoOpTask.", id);
this.status.update(new TextNode("done"));
}
}

View File

@ -20,8 +20,6 @@ package org.apache.kafka.trogdor.task;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.trogdor.common.Platform;
import java.util.concurrent.atomic.AtomicReference;
/**
* The agent-side interface for implementing tasks.
*/
@ -42,7 +40,7 @@ public interface TaskWorker {
*
*
* @param platform The platform to use.
* @param status The current status string. The TaskWorker can update
* @param status The current status. The TaskWorker can update
* this at any time to provide an updated status.
* @param haltFuture A future which the worker should complete if it halts.
* If it is completed with an empty string, that means the task
@ -53,7 +51,7 @@ public interface TaskWorker {
*
* @throws Exception If the TaskWorker failed to start. stop() will not be invoked.
*/
void start(Platform platform, AtomicReference<String> status, KafkaFutureImpl<String> haltFuture)
void start(Platform platform, WorkerStatusTracker status, KafkaFutureImpl<String> haltFuture)
throws Exception;
/**

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.task;
import com.fasterxml.jackson.databind.JsonNode;
/**
* Tracks the status of a Trogdor worker.
*/
public interface WorkerStatusTracker {
/**
* Updates the status.
*
* @param status The new status.
*/
void update(JsonNode status);
}

View File

@ -19,6 +19,7 @@ package org.apache.kafka.trogdor.workload;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
@ -34,6 +35,7 @@ import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.ThreadUtils;
import org.apache.kafka.trogdor.common.WorkerUtils;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -46,7 +48,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
public class ProduceBenchWorker implements TaskWorker {
private static final Logger log = LoggerFactory.getLogger(ProduceBenchWorker.class);
@ -61,7 +62,7 @@ public class ProduceBenchWorker implements TaskWorker {
private ScheduledExecutorService executor;
private AtomicReference<String> status;
private WorkerStatusTracker status;
private KafkaFutureImpl<String> doneFuture;
@ -81,7 +82,7 @@ public class ProduceBenchWorker implements TaskWorker {
}
@Override
public void start(Platform platform, AtomicReference<String> status,
public void start(Platform platform, WorkerStatusTracker status,
KafkaFutureImpl<String> doneFuture) throws Exception {
if (!running.compareAndSet(false, true)) {
throw new IllegalStateException("ProducerBenchWorker is already running.");
@ -112,9 +113,10 @@ public class ProduceBenchWorker implements TaskWorker {
newTopics.put(name, new NewTopic(name, spec.numPartitions(),
spec.replicationFactor()));
}
status.update(new TextNode("Creating " + spec.totalTopics() + " topic(s)"));
WorkerUtils.createTopics(log, spec.bootstrapServers(), spec.commonClientConf(),
spec.adminClientConf(), newTopics, false);
status.update(new TextNode("Created " + spec.totalTopics() + " topic(s)"));
executor.submit(new SendRecords());
} catch (Throwable e) {
WorkerUtils.abort(log, "Prepare", e, doneFuture);
@ -181,7 +183,7 @@ public class ProduceBenchWorker implements TaskWorker {
this.histogram = new Histogram(5000);
int perPeriod = WorkerUtils.perSecToPerPeriod(spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS);
this.statusUpdaterFuture = executor.scheduleWithFixedDelay(
new StatusUpdater(histogram), 1, 1, TimeUnit.MINUTES);
new StatusUpdater(histogram), 30, 30, TimeUnit.SECONDS);
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
// add common client configs to producer properties, and then user-specified producer
@ -218,10 +220,10 @@ public class ProduceBenchWorker implements TaskWorker {
WorkerUtils.abort(log, "SendRecords", e, doneFuture);
} finally {
statusUpdaterFuture.cancel(false);
new StatusUpdater(histogram).run();
StatusData statusData = new StatusUpdater(histogram).update();
long curTimeMs = Time.SYSTEM.milliseconds();
log.info("Sent {} total record(s) in {} ms. status: {}",
histogram.summarize().numSamples(), curTimeMs - startTimeMs, status.get());
histogram.summarize().numSamples(), curTimeMs - startTimeMs, statusData);
}
doneFuture.complete("");
return null;
@ -234,46 +236,54 @@ public class ProduceBenchWorker implements TaskWorker {
public class StatusUpdater implements Runnable {
private final Histogram histogram;
private final float[] percentiles;
StatusUpdater(Histogram histogram) {
this.histogram = histogram;
this.percentiles = new float[] {0.50f, 0.95f, 0.99f};
}
@Override
public void run() {
try {
Histogram.Summary summary = histogram.summarize(percentiles);
StatusData statusData = new StatusData(summary.numSamples(), summary.average(),
summary.percentiles().get(0).value(),
summary.percentiles().get(1).value(),
summary.percentiles().get(2).value());
String statusDataString = JsonUtil.toJsonString(statusData);
status.set(statusDataString);
update();
} catch (Exception e) {
WorkerUtils.abort(log, "StatusUpdater", e, doneFuture);
}
}
StatusData update() {
Histogram.Summary summary = histogram.summarize(StatusData.PERCENTILES);
StatusData statusData = new StatusData(summary.numSamples(), summary.average(),
summary.percentiles().get(0).value(),
summary.percentiles().get(1).value(),
summary.percentiles().get(2).value());
status.update(JsonUtil.JSON_SERDE.valueToTree(statusData));
return statusData;
}
}
public static class StatusData {
private final long totalSent;
private final float averageLatencyMs;
private final int p50LatencyMs;
private final int p90LatencyMs;
private final int p95LatencyMs;
private final int p99LatencyMs;
/**
* The percentiles to use when calculating the histogram data.
* These should match up with the p50LatencyMs, p95LatencyMs, etc. fields.
*/
final static float[] PERCENTILES = {0.5f, 0.95f, 0.99f};
@JsonCreator
StatusData(@JsonProperty("totalSent") long totalSent,
@JsonProperty("averageLatencyMs") float averageLatencyMs,
@JsonProperty("p50LatencyMs") int p50latencyMs,
@JsonProperty("p90LatencyMs") int p90latencyMs,
@JsonProperty("p95LatencyMs") int p95latencyMs,
@JsonProperty("p99LatencyMs") int p99latencyMs) {
this.totalSent = totalSent;
this.averageLatencyMs = averageLatencyMs;
this.p50LatencyMs = p50latencyMs;
this.p90LatencyMs = p90latencyMs;
this.p95LatencyMs = p95latencyMs;
this.p99LatencyMs = p99latencyMs;
}
@ -293,8 +303,8 @@ public class ProduceBenchWorker implements TaskWorker {
}
@JsonProperty
public int p90LatencyMs() {
return p90LatencyMs;
public int p95LatencyMs() {
return p95LatencyMs;
}
@JsonProperty

View File

@ -39,6 +39,7 @@ import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.ThreadUtils;
import org.apache.kafka.trogdor.common.WorkerUtils;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -55,7 +56,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
public class RoundTripWorker implements TaskWorker {
private static final int THROTTLE_PERIOD_MS = 100;
@ -98,7 +98,7 @@ public class RoundTripWorker implements TaskWorker {
}
@Override
public void start(Platform platform, AtomicReference<String> status,
public void start(Platform platform, WorkerStatusTracker status,
KafkaFutureImpl<String> doneFuture) throws Exception {
if (!running.compareAndSet(false, true)) {
throw new IllegalStateException("RoundTripWorker is already running.");

View File

@ -17,6 +17,7 @@
package org.apache.kafka.trogdor.agent;
import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.kafka.common.utils.MockScheduler;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Scheduler;
@ -122,7 +123,7 @@ public class AgentTest {
CreateWorkerResponse response = client.createWorker(new CreateWorkerRequest("foo", fooSpec));
assertEquals(fooSpec.toString(), response.spec().toString());
new ExpectedTasks().addTask(new ExpectedTaskBuilder("foo").
workerState(new WorkerRunning(fooSpec, 0, "")).
workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))).
build()).
waitFor(client);
@ -131,10 +132,10 @@ public class AgentTest {
client.createWorker(new CreateWorkerRequest("bar", barSpec));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
workerState(new WorkerRunning(fooSpec, 0, "")).
workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))).
build()).
addTask(new ExpectedTaskBuilder("bar").
workerState(new WorkerRunning(barSpec, 0, "")).
workerState(new WorkerRunning(barSpec, 0, new TextNode("active"))).
build()).
waitFor(client);
@ -142,13 +143,13 @@ public class AgentTest {
client.createWorker(new CreateWorkerRequest("baz", bazSpec));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
workerState(new WorkerRunning(fooSpec, 0, "")).
workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))).
build()).
addTask(new ExpectedTaskBuilder("bar").
workerState(new WorkerRunning(barSpec, 0, "")).
workerState(new WorkerRunning(barSpec, 0, new TextNode("active"))).
build()).
addTask(new ExpectedTaskBuilder("baz").
workerState(new WorkerRunning(bazSpec, 0, "")).
workerState(new WorkerRunning(bazSpec, 0, new TextNode("active"))).
build()).
waitFor(client);
@ -169,7 +170,7 @@ public class AgentTest {
client.createWorker(new CreateWorkerRequest("foo", fooSpec));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
workerState(new WorkerRunning(fooSpec, 0, "")).
workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))).
build()).
waitFor(client);
@ -179,10 +180,10 @@ public class AgentTest {
client.createWorker(new CreateWorkerRequest("bar", barSpec));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
workerState(new WorkerRunning(fooSpec, 0, "")).
workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))).
build()).
addTask(new ExpectedTaskBuilder("bar").
workerState(new WorkerRunning(barSpec, 1, "")).
workerState(new WorkerRunning(barSpec, 1, new TextNode("active"))).
build()).
waitFor(client);
@ -190,10 +191,10 @@ public class AgentTest {
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
workerState(new WorkerDone(fooSpec, 0, 2, "", "")).
workerState(new WorkerDone(fooSpec, 0, 2, new TextNode("done"), "")).
build()).
addTask(new ExpectedTaskBuilder("bar").
workerState(new WorkerRunning(barSpec, 1, "")).
workerState(new WorkerRunning(barSpec, 1, new TextNode("active"))).
build()).
waitFor(client);
@ -201,10 +202,10 @@ public class AgentTest {
client.stopWorker(new StopWorkerRequest("bar"));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
workerState(new WorkerDone(fooSpec, 0, 2, "", "")).
workerState(new WorkerDone(fooSpec, 0, 2, new TextNode("done"), "")).
build()).
addTask(new ExpectedTaskBuilder("bar").
workerState(new WorkerDone(barSpec, 1, 7, "", "")).
workerState(new WorkerDone(barSpec, 1, 7, new TextNode("done"), "")).
build()).
waitFor(client);
@ -221,34 +222,40 @@ public class AgentTest {
maxTries(10).target("localhost", agent.port()).build();
new ExpectedTasks().waitFor(client);
SampleTaskSpec fooSpec = new SampleTaskSpec(0, 900000, 1, "");
SampleTaskSpec fooSpec = new SampleTaskSpec(0, 900000,
Collections.singletonMap("node01", 1L), "");
client.createWorker(new CreateWorkerRequest("foo", fooSpec));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
workerState(new WorkerRunning(fooSpec, 0, "")).
workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))).
build()).
waitFor(client);
SampleTaskSpec barSpec = new SampleTaskSpec(0, 900000, 2, "baz");
SampleTaskSpec barSpec = new SampleTaskSpec(0, 900000,
Collections.singletonMap("node01", 2L), "baz");
client.createWorker(new CreateWorkerRequest("bar", barSpec));
time.sleep(1);
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
workerState(new WorkerDone(fooSpec, 0, 1, "", "")).
workerState(new WorkerDone(fooSpec, 0, 1,
new TextNode("halted"), "")).
build()).
addTask(new ExpectedTaskBuilder("bar").
workerState(new WorkerRunning(barSpec, 0, "")).
workerState(new WorkerRunning(barSpec, 0,
new TextNode("active"))).
build()).
waitFor(client);
time.sleep(1);
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
workerState(new WorkerDone(fooSpec, 0, 1, "", "")).
workerState(new WorkerDone(fooSpec, 0, 1,
new TextNode("halted"), "")).
build()).
addTask(new ExpectedTaskBuilder("bar").
workerState(new WorkerDone(barSpec, 0, 2, "", "baz")).
workerState(new WorkerDone(barSpec, 0, 2,
new TextNode("halted"), "baz")).
build()).
waitFor(client);
}
@ -289,7 +296,7 @@ public class AgentTest {
client.createWorker(new CreateWorkerRequest("foo", fooSpec));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
workerState(new WorkerRunning(fooSpec, 0, "")).
workerState(new WorkerRunning(fooSpec, 0, new TextNode("Added fault foo"))).
build()).
waitFor(client);
Assert.assertEquals(new KiboshControlFile(Collections.<Kibosh.KiboshFaultSpec>singletonList(
@ -299,9 +306,9 @@ public class AgentTest {
client.createWorker(new CreateWorkerRequest("bar", barSpec));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
workerState(new WorkerRunning(fooSpec, 0, "")).build()).
workerState(new WorkerRunning(fooSpec, 0, new TextNode("Added fault foo"))).build()).
addTask(new ExpectedTaskBuilder("bar").
workerState(new WorkerRunning(barSpec, 0, "")).build()).
workerState(new WorkerRunning(barSpec, 0, new TextNode("Added fault bar"))).build()).
waitFor(client);
Assert.assertEquals(new KiboshControlFile(new ArrayList<Kibosh.KiboshFaultSpec>() {{
add(new KiboshFilesUnreadableFaultSpec("/foo", 123));
@ -311,9 +318,9 @@ public class AgentTest {
client.stopWorker(new StopWorkerRequest("foo"));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
workerState(new WorkerDone(fooSpec, 0, 1, "", "")).build()).
workerState(new WorkerDone(fooSpec, 0, 1, new TextNode("Removed fault foo"), "")).build()).
addTask(new ExpectedTaskBuilder("bar").
workerState(new WorkerRunning(barSpec, 0, "")).build()).
workerState(new WorkerRunning(barSpec, 0, new TextNode("Added fault bar"))).build()).
waitFor(client);
Assert.assertEquals(new KiboshControlFile(Collections.<Kibosh.KiboshFaultSpec>singletonList(
new KiboshFilesUnreadableFaultSpec("/bar", 456))), mockKibosh.read());

View File

@ -52,7 +52,7 @@ public class JsonSerializationTest {
0, 0, null, null, null, null, null, 0, 0, "test-topic", 1, (short) 3));
verify(new RoundTripWorkloadSpec(0, 0, null, null, null, null, null, null,
0, null, null, 0));
verify(new SampleTaskSpec(0, 0, 0, null));
verify(new SampleTaskSpec(0, 0, null, null));
}
private <T> void verify(T val1) throws Exception {

View File

@ -17,6 +17,9 @@
package org.apache.kafka.trogdor.coordinator;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.kafka.common.utils.MockScheduler;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Scheduler;
@ -41,6 +44,7 @@ import org.apache.kafka.trogdor.rest.TasksResponse;
import org.apache.kafka.trogdor.rest.WorkerDone;
import org.apache.kafka.trogdor.rest.WorkerRunning;
import org.apache.kafka.trogdor.task.NoOpTaskSpec;
import org.apache.kafka.trogdor.task.SampleTaskSpec;
import org.junit.Rule;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
@ -49,6 +53,7 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import static org.junit.Assert.assertEquals;
@ -94,8 +99,8 @@ public class CoordinatorTest {
time.sleep(2);
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
taskState(new TaskRunning(fooSpec, 2)).
workerState(new WorkerRunning(fooSpec, 2, "")).
taskState(new TaskRunning(fooSpec, 2, new TextNode("active"))).
workerState(new WorkerRunning(fooSpec, 2, new TextNode("active"))).
build()).
waitFor(cluster.coordinatorClient()).
waitFor(cluster.agentClient("node02"));
@ -103,7 +108,7 @@ public class CoordinatorTest {
time.sleep(3);
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
taskState(new TaskDone(fooSpec, 2, 5, "", false)).
taskState(new TaskDone(fooSpec, 2, 5, "", false, new TextNode("done"))).
build()).
waitFor(cluster.coordinatorClient());
}
@ -131,26 +136,34 @@ public class CoordinatorTest {
NoOpTaskSpec fooSpec = new NoOpTaskSpec(5, 2);
coordinatorClient.createTask(new CreateTaskRequest("foo", fooSpec));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").taskState(new TaskPending(fooSpec)).build()).
addTask(new ExpectedTaskBuilder("foo").taskState(
new TaskPending(fooSpec)).build()).
waitFor(coordinatorClient).
waitFor(agentClient1).
waitFor(agentClient2);
time.sleep(11);
ObjectNode status1 = new ObjectNode(JsonNodeFactory.instance);
status1.set("node01", new TextNode("active"));
status1.set("node02", new TextNode("active"));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
taskState(new TaskRunning(fooSpec, 11)).
workerState(new WorkerRunning(fooSpec, 11, "")).
taskState(new TaskRunning(fooSpec, 11, status1)).
workerState(new WorkerRunning(fooSpec, 11, new TextNode("active"))).
build()).
waitFor(coordinatorClient).
waitFor(agentClient1).
waitFor(agentClient2);
time.sleep(2);
ObjectNode status2 = new ObjectNode(JsonNodeFactory.instance);
status2.set("node01", new TextNode("done"));
status2.set("node02", new TextNode("done"));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
taskState(new TaskDone(fooSpec, 11, 13, "", false)).
workerState(new WorkerDone(fooSpec, 11, 13, "", "")).
taskState(new TaskDone(fooSpec, 11, 13,
"", false, status2)).
workerState(new WorkerDone(fooSpec, 11, 13, new TextNode("done"), "")).
build()).
waitFor(coordinatorClient).
waitFor(agentClient1).
@ -186,21 +199,29 @@ public class CoordinatorTest {
waitFor(agentClient2);
time.sleep(11);
ObjectNode status1 = new ObjectNode(JsonNodeFactory.instance);
status1.set("node01", new TextNode("active"));
status1.set("node02", new TextNode("active"));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
taskState(new TaskRunning(fooSpec, 11)).
workerState(new WorkerRunning(fooSpec, 11, "")).
taskState(new TaskRunning(fooSpec, 11, status1)).
workerState(new WorkerRunning(fooSpec, 11, new TextNode("active"))).
build()).
waitFor(coordinatorClient).
waitFor(agentClient1).
waitFor(agentClient2);
ObjectNode status2 = new ObjectNode(JsonNodeFactory.instance);
status2.set("node01", new TextNode("done"));
status2.set("node02", new TextNode("done"));
time.sleep(1);
coordinatorClient.stopTask(new StopTaskRequest("foo"));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
taskState(new TaskDone(fooSpec, 11, 12, "", true)).
workerState(new WorkerDone(fooSpec, 11, 12, "", "")).
taskState(new TaskDone(fooSpec, 11, 12, "",
true, status2)).
workerState(new WorkerDone(fooSpec, 11, 12, new TextNode("done"), "")).
build()).
waitFor(coordinatorClient).
waitFor(agentClient1).
@ -375,8 +396,8 @@ public class CoordinatorTest {
time.sleep(2);
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
taskState(new TaskRunning(fooSpec, 2)).
workerState(new WorkerRunning(fooSpec, 2, "")).
taskState(new TaskRunning(fooSpec, 2, new TextNode("active"))).
workerState(new WorkerRunning(fooSpec, 2, new TextNode("active"))).
build()).
addTask(new ExpectedTaskBuilder("bar").
taskState(new TaskPending(barSpec)).
@ -394,4 +415,73 @@ public class CoordinatorTest {
new TasksRequest(null, 3, 0, 0, 0)).tasks().size());
}
}
@Test
public void testWorkersExitingAtDifferentTimes() throws Exception {
MockTime time = new MockTime(0, 0, 0);
Scheduler scheduler = new MockScheduler(time);
try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().
addCoordinator("node01").
addAgent("node02").
addAgent("node03").
scheduler(scheduler).
build()) {
CoordinatorClient coordinatorClient = cluster.coordinatorClient();
new ExpectedTasks().waitFor(coordinatorClient);
HashMap<String, Long> nodeToExitMs = new HashMap<>();
nodeToExitMs.put("node02", 10L);
nodeToExitMs.put("node03", 20L);
SampleTaskSpec fooSpec =
new SampleTaskSpec(2, 100, nodeToExitMs, "");
coordinatorClient.createTask(new CreateTaskRequest("foo", fooSpec));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
taskState(new TaskPending(fooSpec)).
build()).
waitFor(coordinatorClient);
time.sleep(2);
ObjectNode status1 = new ObjectNode(JsonNodeFactory.instance);
status1.set("node02", new TextNode("active"));
status1.set("node03", new TextNode("active"));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
taskState(new TaskRunning(fooSpec, 2, status1)).
workerState(new WorkerRunning(fooSpec, 2, new TextNode("active"))).
build()).
waitFor(coordinatorClient).
waitFor(cluster.agentClient("node02")).
waitFor(cluster.agentClient("node03"));
time.sleep(10);
ObjectNode status2 = new ObjectNode(JsonNodeFactory.instance);
status2.set("node02", new TextNode("halted"));
status2.set("node03", new TextNode("active"));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
taskState(new TaskRunning(fooSpec, 2, status2)).
workerState(new WorkerRunning(fooSpec, 2, new TextNode("active"))).
build()).
waitFor(coordinatorClient).
waitFor(cluster.agentClient("node03"));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
taskState(new TaskRunning(fooSpec, 2, status2)).
workerState(new WorkerDone(fooSpec, 2, 12, new TextNode("halted"), "")).
build()).
waitFor(cluster.agentClient("node02"));
time.sleep(10);
ObjectNode status3 = new ObjectNode(JsonNodeFactory.instance);
status3.set("node02", new TextNode("halted"));
status3.set("node03", new TextNode("halted"));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
taskState(new TaskDone(fooSpec, 2, 22, "",
false, status3)).
build()).
waitFor(coordinatorClient);
}
}
};

View File

@ -20,23 +20,28 @@ package org.apache.kafka.trogdor.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class SampleTaskSpec extends TaskSpec {
private final long exitMs;
private final Map<String, Long> nodeToExitMs;
private final String error;
@JsonCreator
public SampleTaskSpec(@JsonProperty("startMs") long startMs,
@JsonProperty("durationMs") long durationMs,
@JsonProperty("exitMs") long exitMs,
@JsonProperty("nodeToExitMs") Map<String, Long> nodeToExitMs,
@JsonProperty("error") String error) {
super(startMs, durationMs);
this.exitMs = exitMs;
this.nodeToExitMs = nodeToExitMs == null ? new HashMap<String, Long>() :
Collections.unmodifiableMap(nodeToExitMs);
this.error = error == null ? "" : error;
}
@JsonProperty
public long exitMs() {
return exitMs;
public Map<String, Long> nodeToExitMs() {
return nodeToExitMs;
}
@JsonProperty

View File

@ -17,6 +17,7 @@
package org.apache.kafka.trogdor.task;
import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.ThreadUtils;
@ -26,12 +27,12 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
public class SampleTaskWorker implements TaskWorker {
private final SampleTaskSpec spec;
private final ScheduledExecutorService executor;
private Future<Void> future;
private WorkerStatusTracker status;
SampleTaskWorker(SampleTaskSpec spec) {
this.spec = spec;
@ -41,17 +42,24 @@ public class SampleTaskWorker implements TaskWorker {
}
@Override
public synchronized void start(Platform platform, AtomicReference<String> status,
public synchronized void start(Platform platform, WorkerStatusTracker status,
final KafkaFutureImpl<String> haltFuture) throws Exception {
if (this.future != null)
return;
this.status = status;
this.status.update(new TextNode("active"));
Long exitMs = spec.nodeToExitMs().get(platform.curNode().name());
if (exitMs == null) {
exitMs = Long.MAX_VALUE;
}
this.future = platform.scheduler().schedule(executor, new Callable<Void>() {
@Override
public Void call() throws Exception {
haltFuture.complete(spec.error());
return null;
}
}, spec.exitMs());
}, exitMs);
}
@Override
@ -59,5 +67,6 @@ public class SampleTaskWorker implements TaskWorker {
this.future.cancel(false);
this.executor.shutdown();
this.executor.awaitTermination(1, TimeUnit.DAYS);
this.status.update(new TextNode("halted"));
}
};

View File

@ -40,11 +40,11 @@ public class TaskSpecTest {
} catch (InvalidTypeIdException e) {
}
String inputJson = "{\"class\":\"org.apache.kafka.trogdor.task.SampleTaskSpec\"," +
"\"startMs\":123,\"durationMs\":456,\"exitMs\":1000,\"error\":\"foo\"}";
"\"startMs\":123,\"durationMs\":456,\"nodeToExitMs\":{\"node01\":1000},\"error\":\"foo\"}";
SampleTaskSpec spec = JsonUtil.JSON_SERDE.readValue(inputJson, SampleTaskSpec.class);
assertEquals(123, spec.startMs());
assertEquals(456, spec.durationMs());
assertEquals(1000, spec.exitMs());
assertEquals(Long.valueOf(1000), spec.nodeToExitMs().get("node01"));
assertEquals("foo", spec.error());
String outputJson = JsonUtil.toJsonString(spec);
assertEquals(inputJson, outputJson);