diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 2767132886d..64258bf7b07 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -45,7 +45,7 @@
+ files="(Errors|SaslAuthenticatorTest|AgentTest|CoordinatorTest).java"/>
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
index 3b5b21e68d8..0324d2d2dba 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
@@ -27,10 +27,9 @@ import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.rest.AgentStatusResponse;
import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
-import org.apache.kafka.trogdor.rest.CreateWorkerResponse;
+import org.apache.kafka.trogdor.rest.DestroyWorkerRequest;
import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.apache.kafka.trogdor.rest.StopWorkerRequest;
-import org.apache.kafka.trogdor.rest.StopWorkerResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -95,13 +94,16 @@ public final class Agent {
return new AgentStatusResponse(serverStartMs, workerManager.workerStates());
}
- public CreateWorkerResponse createWorker(CreateWorkerRequest req) throws Exception {
- workerManager.createWorker(req.id(), req.spec());
- return new CreateWorkerResponse(req.spec());
+ public void createWorker(CreateWorkerRequest req) throws Throwable {
+ workerManager.createWorker(req.workerId(), req.taskId(), req.spec());
}
- public StopWorkerResponse stopWorker(StopWorkerRequest req) throws Exception {
- return new StopWorkerResponse(workerManager.stopWorker(req.id()));
+ public void stopWorker(StopWorkerRequest req) throws Throwable {
+ workerManager.stopWorker(req.workerId(), false);
+ }
+
+ public void destroyWorker(DestroyWorkerRequest req) throws Throwable {
+ workerManager.stopWorker(req.workerId(), true);
}
public static void main(String[] args) throws Exception {
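Taken together, the Agent changes above replace the response-returning create/stop calls with void methods keyed by a numeric worker ID, and add a destroy path that also discards the worker's record. A minimal sketch of driving the reworked API, assuming Trogdor's NoOpTaskSpec(startMs, durationMs) as the task spec (the sketch class itself is hypothetical):

import org.apache.kafka.trogdor.agent.Agent;
import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
import org.apache.kafka.trogdor.rest.DestroyWorkerRequest;
import org.apache.kafka.trogdor.rest.StopWorkerRequest;
import org.apache.kafka.trogdor.task.NoOpTaskSpec;

public class AgentApiSketch {
    // Walks a worker through its lifecycle: create by (workerId, taskId, spec),
    // stop by workerId (state is kept), destroy by workerId (state is removed).
    static void exercise(Agent agent) throws Throwable {
        NoOpTaskSpec spec = new NoOpTaskSpec(0, 10000);  // assumed (startMs, durationMs) constructor
        agent.createWorker(new CreateWorkerRequest(123L, "task-A", spec));
        agent.stopWorker(new StopWorkerRequest(123L));
        agent.destroyWorker(new DestroyWorkerRequest(123L));
    }
}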
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java
index 08769a0971d..c89011b8650 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java
@@ -27,15 +27,16 @@ import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.rest.AgentStatusResponse;
import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
-import org.apache.kafka.trogdor.rest.CreateWorkerResponse;
+import org.apache.kafka.trogdor.rest.DestroyWorkerRequest;
import org.apache.kafka.trogdor.rest.Empty;
import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse;
import org.apache.kafka.trogdor.rest.StopWorkerRequest;
-import org.apache.kafka.trogdor.rest.StopWorkerResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.ws.rs.core.UriBuilder;
+
import static net.sourceforge.argparse4j.impl.Arguments.store;
import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
@@ -116,20 +117,29 @@ public class AgentClient {
return resp.body();
}
- public CreateWorkerResponse createWorker(CreateWorkerRequest request) throws Exception {
- HttpResponse<CreateWorkerResponse> resp =
- JsonRestServer.httpRequest(
+ public void createWorker(CreateWorkerRequest request) throws Exception {
+ HttpResponse<Empty> resp =
+ JsonRestServer.httpRequest(
url("/agent/worker/create"), "POST",
- request, new TypeReference<CreateWorkerResponse>() { }, maxTries);
- return resp.body();
+ request, new TypeReference<Empty>() { }, maxTries);
+ resp.body();
}
- public StopWorkerResponse stopWorker(StopWorkerRequest request) throws Exception {
- HttpResponse<StopWorkerResponse> resp =
- JsonRestServer.httpRequest(url(
+ public void stopWorker(StopWorkerRequest request) throws Exception {
+ HttpResponse<Empty> resp =
+ JsonRestServer.httpRequest(url(
"/agent/worker/stop"), "PUT",
- request, new TypeReference<StopWorkerResponse>() { }, maxTries);
- return resp.body();
+ request, new TypeReference<Empty>() { }, maxTries);
+ resp.body();
+ }
+
+ public void destroyWorker(DestroyWorkerRequest request) throws Exception {
+ UriBuilder uriBuilder = UriBuilder.fromPath(url("/agent/worker"));
+ uriBuilder.queryParam("workerId", request.workerId());
+ HttpResponse<Empty> resp =
+ JsonRestServer.httpRequest(uriBuilder.build().toString(), "DELETE",
+ null, new TypeReference<Empty>() { }, maxTries);
+ resp.body();
}
public void invokeShutdown() throws Exception {
@@ -166,10 +176,16 @@ public class AgentClient {
.help("Create a new fault.");
actions.addArgument("--stop-worker")
.action(store())
- .type(String.class)
+ .type(Long.class)
.dest("stop_worker")
- .metavar("SPEC_JSON")
- .help("Create a new fault.");
+ .metavar("WORKER_ID")
+ .help("Stop a worker ID.");
+ actions.addArgument("--destroy-worker")
+ .action(store())
+ .type(Long.class)
+ .dest("destroy_worker")
+ .metavar("WORKER_ID")
+ .help("Destroy a worker ID.");
actions.addArgument("--shutdown")
.action(storeTrue())
.type(Boolean.class)
@@ -197,13 +213,21 @@ public class AgentClient {
System.out.println("Got agent status: " +
JsonUtil.toPrettyJsonString(client.status()));
} else if (res.getString("create_worker") != null) {
- client.createWorker(JsonUtil.JSON_SERDE.
- readValue(res.getString("create_worker"),
- CreateWorkerRequest.class));
- System.out.println("Created fault.");
+ CreateWorkerRequest req = JsonUtil.JSON_SERDE.
+ readValue(res.getString("create_worker"), CreateWorkerRequest.class);
+ client.createWorker(req);
+ System.out.printf("Sent CreateWorkerRequest for worker %d%n.", req.workerId());
+ } else if (res.getString("stop_worker") != null) {
+ long workerId = res.getLong("stop_worker");
+ client.stopWorker(new StopWorkerRequest(workerId));
+ System.out.printf("Sent StopWorkerRequest for worker %d%n.", workerId);
+ } else if (res.getString("destroy_worker") != null) {
+ long workerId = res.getLong("stop_worker");
+ client.destroyWorker(new DestroyWorkerRequest(workerId));
+ System.out.printf("Sent DestroyWorkerRequest for worker %d%n.", workerId);
} else if (res.getBoolean("shutdown")) {
client.invokeShutdown();
- System.out.println("Sent shutdown request.");
+ System.out.println("Sent ShutdownRequest.");
} else {
System.out.println("You must choose an action. Type --help for help.");
Exit.exit(1);
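The destroyWorker client call above passes the worker ID as a query parameter on a DELETE instead of a JSON body. A small sketch of how that URI is composed with the same javax.ws.rs UriBuilder pattern the patch uses (host and port are placeholders):

import javax.ws.rs.core.UriBuilder;

public class DestroyWorkerUriSketch {
    public static void main(String[] args) {
        String target = UriBuilder.fromPath("http://localhost:8888/agent/worker")
            .queryParam("workerId", 123L)
            .build()
            .toString();
        // Expected to print something like: http://localhost:8888/agent/worker?workerId=123
        System.out.println(target);
    }
}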
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentRestResource.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentRestResource.java
index 773c580fa15..1f2ad49d2fe 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentRestResource.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentRestResource.java
@@ -18,22 +18,34 @@ package org.apache.kafka.trogdor.agent;
import org.apache.kafka.trogdor.rest.AgentStatusResponse;
import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
-import org.apache.kafka.trogdor.rest.CreateWorkerResponse;
+import org.apache.kafka.trogdor.rest.DestroyWorkerRequest;
import org.apache.kafka.trogdor.rest.Empty;
import org.apache.kafka.trogdor.rest.StopWorkerRequest;
-import org.apache.kafka.trogdor.rest.StopWorkerResponse;
import javax.servlet.ServletContext;
import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import java.util.concurrent.atomic.AtomicReference;
-
+/**
+ * The REST resource for the Agent. This describes the RPCs which the agent can accept.
+ *
+ * RPCs should be idempotent. This is important because if the server's response is
+ * lost, the client will simply retransmit the same request. The server's response must
+ * be the same the second time around.
+ *
+ * We return the empty JSON object {} rather than void for RPCs that have no results.
+ * This ensures that if we want to add more return results later, we can do so in a
+ * compatible way.
+ */
@Path("/agent")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@@ -55,14 +67,23 @@ public class AgentRestResource {
@POST
@Path("/worker/create")
- public CreateWorkerResponse createWorker(CreateWorkerRequest req) throws Throwable {
- return agent().createWorker(req);
+ public Empty createWorker(CreateWorkerRequest req) throws Throwable {
+ agent().createWorker(req);
+ return Empty.INSTANCE;
}
@PUT
@Path("/worker/stop")
- public StopWorkerResponse stopWorker(StopWorkerRequest req) throws Throwable {
- return agent().stopWorker(req);
+ public Empty stopWorker(StopWorkerRequest req) throws Throwable {
+ agent().stopWorker(req);
+ return Empty.INSTANCE;
+ }
+
+ @DELETE
+ @Path("/worker")
+ public Empty destroyWorker(@DefaultValue("0") @QueryParam("workerId") long workerId) throws Throwable {
+ agent().destroyWorker(new DestroyWorkerRequest(workerId));
+ return Empty.INSTANCE;
}
@PUT
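The class comment added above relies on two conventions: handlers are idempotent, and result-less RPCs return the empty JSON object so fields can be added compatibly later. A hypothetical JAX-RS resource (not from the patch) illustrating that contract for a DELETE keyed by a query parameter:

import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Path("/sketch")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class IdempotentDeleteSketch {
    private static final Map<Long, String> RECORDS = new ConcurrentHashMap<>();

    @DELETE
    @Path("/record")
    public Map<String, String> destroyRecord(@QueryParam("id") long id) {
        // Removing an absent key is a no-op, so a retransmitted DELETE after a
        // lost response produces the same result as the first attempt.
        RECORDS.remove(id);
        // An empty map serializes to {}, leaving room for future result fields.
        return Collections.emptyMap();
    }
}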
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
index 7c8de6d3f22..59d34c90ab6 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.ThreadUtils;
+import org.apache.kafka.trogdor.rest.RequestConflictException;
import org.apache.kafka.trogdor.rest.WorkerDone;
import org.apache.kafka.trogdor.rest.WorkerRunning;
import org.apache.kafka.trogdor.rest.WorkerStarting;
@@ -36,10 +37,12 @@ import org.apache.kafka.trogdor.task.TaskWorker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
@@ -72,7 +75,7 @@ public final class WorkerManager {
/**
* A map of task IDs to Work objects.
*/
- private final Map<String, Worker> workers;
+ private final Map<Long, Worker> workers;
/**
* An ExecutorService used to schedule events in the future.
@@ -137,12 +140,15 @@ public final class WorkerManager {
return false;
}
shutdown = true;
+ if (refCount == 0) {
+ this.notifyAll();
+ }
return true;
}
synchronized void waitForQuiescence() throws InterruptedException {
while ((!shutdown) || (refCount > 0)) {
- wait();
+ this.wait();
}
}
}
@@ -173,10 +179,15 @@ public final class WorkerManager {
* A worker which is being tracked.
*/
class Worker {
+ /**
+ * The worker ID.
+ */
+ private final long workerId;
+
/**
* The task ID.
*/
- private final String id;
+ private final String taskId;
/**
* The task specification.
@@ -217,7 +228,7 @@ public final class WorkerManager {
* If there is a task timeout scheduled, this is a future which can
* be used to cancel it.
*/
- private Future<TaskSpec> timeoutFuture = null;
+ private Future<Void> timeoutFuture = null;
/**
* A shutdown manager reference which will keep the WorkerManager
@@ -225,16 +236,26 @@ public final class WorkerManager {
*/
private ShutdownManager.Reference reference;
- Worker(String id, TaskSpec spec, long now) {
- this.id = id;
+ /**
+ * Whether we should destroy the records of this worker once it stops.
+ */
+ private boolean mustDestroy = false;
+
+ Worker(long workerId, String taskId, TaskSpec spec, long now) {
+ this.workerId = workerId;
+ this.taskId = taskId;
this.spec = spec;
- this.taskWorker = spec.newTaskWorker(id);
+ this.taskWorker = spec.newTaskWorker(taskId);
this.startedMs = now;
this.reference = shutdownManager.takeReference();
}
- String id() {
- return id;
+ long workerId() {
+ return workerId;
+ }
+
+ String taskId() {
+ return taskId;
}
TaskSpec spec() {
@@ -244,14 +265,14 @@ public final class WorkerManager {
WorkerState state() {
switch (state) {
case STARTING:
- return new WorkerStarting(spec);
+ return new WorkerStarting(taskId, spec);
case RUNNING:
- return new WorkerRunning(spec, startedMs, status.get());
+ return new WorkerRunning(taskId, spec, startedMs, status.get());
case CANCELLING:
case STOPPING:
- return new WorkerStopping(spec, startedMs, status.get());
+ return new WorkerStopping(taskId, spec, startedMs, status.get());
case DONE:
- return new WorkerDone(spec, startedMs, doneMs, status.get(), error);
+ return new WorkerDone(taskId, spec, startedMs, doneMs, status.get(), error);
}
throw new RuntimeException("unreachable");
}
@@ -259,7 +280,7 @@ public final class WorkerManager {
void transitionToRunning() {
state = State.RUNNING;
timeoutFuture = scheduler.schedule(stateChangeExecutor,
- new StopWorker(id), spec.durationMs());
+ new StopWorker(workerId, false), spec.durationMs());
}
void transitionToStopping() {
@@ -268,7 +289,7 @@ public final class WorkerManager {
timeoutFuture.cancel(false);
timeoutFuture = null;
}
- workerCleanupExecutor.submit(new CleanupWorker(this));
+ workerCleanupExecutor.submit(new HaltWorker(this));
}
void transitionToDone() {
@@ -279,15 +300,20 @@ public final class WorkerManager {
reference = null;
}
}
+
+ @Override
+ public String toString() {
+ return String.format("%s_%d", taskId, workerId);
+ }
}
- public void createWorker(final String id, TaskSpec spec) throws Exception {
+ public void createWorker(long workerId, String taskId, TaskSpec spec) throws Throwable {
try (ShutdownManager.Reference ref = shutdownManager.takeReference()) {
final Worker worker = stateChangeExecutor.
- submit(new CreateWorker(id, spec, time.milliseconds())).get();
+ submit(new CreateWorker(workerId, taskId, spec, time.milliseconds())).get();
if (worker == null) {
log.info("{}: Ignoring request to create worker {}, because there is already " +
- "a worker with that id.", nodeName, id);
+ "a worker with that id.", nodeName, workerId);
return;
}
KafkaFutureImpl<String> haltFuture = new KafkaFutureImpl<>();
@@ -297,9 +323,10 @@ public final class WorkerManager {
if (errorString == null)
errorString = "";
if (errorString.isEmpty()) {
- log.info("{}: Worker {} is halting.", nodeName, id);
+ log.info("{}: Worker {} is halting.", nodeName, worker);
} else {
- log.info("{}: Worker {} is halting with error {}", nodeName, id, errorString);
+ log.info("{}: Worker {} is halting with error {}",
+ nodeName, worker, errorString);
}
stateChangeExecutor.submit(
new HandleWorkerHalting(worker, errorString, false));
@@ -309,11 +336,20 @@ public final class WorkerManager {
try {
worker.taskWorker.start(platform, worker.status, haltFuture);
} catch (Exception e) {
- log.info("{}: Worker {} start() exception", nodeName, id, e);
+ log.info("{}: Worker {} start() exception", nodeName, worker, e);
stateChangeExecutor.submit(new HandleWorkerHalting(worker,
"worker.start() exception: " + Utils.stackTrace(e), true));
}
stateChangeExecutor.submit(new FinishCreatingWorker(worker));
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof RequestConflictException) {
+ log.info("{}: request conflict while creating worker {} for task {} with spec {}.",
+ nodeName, workerId, taskId, spec);
+ } else {
+ log.info("{}: Error creating worker {} for task {} with spec {}",
+ nodeName, workerId, taskId, spec, e);
+ }
+ throw e.getCause();
}
}
@@ -321,27 +357,42 @@ public final class WorkerManager {
* Handles a request to create a new worker. Processed by the state change thread.
*/
class CreateWorker implements Callable<Worker> {
- private final String id;
+ private final long workerId;
+ private final String taskId;
private final TaskSpec spec;
private final long now;
- CreateWorker(String id, TaskSpec spec, long now) {
- this.id = id;
+ CreateWorker(long workerId, String taskId, TaskSpec spec, long now) {
+ this.workerId = workerId;
+ this.taskId = taskId;
this.spec = spec;
this.now = now;
}
@Override
public Worker call() throws Exception {
- Worker worker = workers.get(id);
- if (worker != null) {
- log.info("{}: Task ID {} is already in use.", nodeName, id);
- return null;
+ try {
+ Worker worker = workers.get(workerId);
+ if (worker != null) {
+ if (!worker.taskId().equals(taskId)) {
+ throw new RequestConflictException("There is already a worker ID " + workerId +
+ " with a different task ID.");
+ } else if (!worker.spec().equals(spec)) {
+ throw new RequestConflictException("There is already a worker ID " + workerId +
+ " with a different task spec.");
+ } else {
+ return null;
+ }
+ }
+ worker = new Worker(workerId, taskId, spec, now);
+ workers.put(workerId, worker);
+ log.info("{}: Created worker {} with spec {}", nodeName, worker, spec);
+ return worker;
+ } catch (Exception e) {
+ log.info("{}: unable to create worker {} for task {}, with spec {}",
+ nodeName, workerId, taskId, spec, e);
+ throw e;
}
- worker = new Worker(id, spec, now);
- workers.put(id, worker);
- log.info("{}: Created a new worker for task {} with spec {}", nodeName, id, spec);
- return worker;
}
}
@@ -360,12 +411,12 @@ public final class WorkerManager {
switch (worker.state) {
case CANCELLING:
log.info("{}: Worker {} was cancelled while it was starting up. " +
- "Transitioning to STOPPING.", nodeName, worker.id);
+ "Transitioning to STOPPING.", nodeName, worker);
worker.transitionToStopping();
break;
case STARTING:
log.info("{}: Worker {} is now RUNNING. Scheduled to stop in {} ms.",
- nodeName, worker.id, worker.spec.durationMs());
+ nodeName, worker, worker.spec.durationMs());
worker.transitionToRunning();
break;
default:
@@ -400,29 +451,29 @@ public final class WorkerManager {
case STARTING:
if (startupHalt) {
log.info("{}: Worker {} {} during startup. Transitioning to DONE.",
- nodeName, worker.id, verb);
+ nodeName, worker, verb);
worker.transitionToDone();
} else {
log.info("{}: Worker {} {} during startup. Transitioning to CANCELLING.",
- nodeName, worker.id, verb);
+ nodeName, worker, verb);
worker.state = State.CANCELLING;
}
break;
case CANCELLING:
log.info("{}: Cancelling worker {} {}. ",
- nodeName, worker.id, verb);
+ nodeName, worker, verb);
break;
case RUNNING:
log.info("{}: Running worker {} {}. Transitioning to STOPPING.",
- nodeName, worker.id, verb);
+ nodeName, worker, verb);
worker.transitionToStopping();
break;
case STOPPING:
- log.info("{}: Stopping worker {} {}.", nodeName, worker.id, verb);
+ log.info("{}: Stopping worker {} {}.", nodeName, worker, verb);
break;
case DONE:
log.info("{}: Can't halt worker {} because it is already DONE.",
- nodeName, worker.id);
+ nodeName, worker);
break;
}
return null;
@@ -432,7 +483,7 @@ public final class WorkerManager {
/**
* Transitions a worker to WorkerDone. Processed by the state change thread.
*/
- static class CompleteWorker implements Callable<Void> {
+ class CompleteWorker implements Callable<Void> {
private final Worker worker;
private final String failure;
@@ -448,60 +499,79 @@ public final class WorkerManager {
worker.error = failure;
}
worker.transitionToDone();
+ if (worker.mustDestroy) {
+ log.info("{}: destroying worker {} with error {}",
+ nodeName, worker, worker.error);
+ workers.remove(worker.workerId);
+ } else {
+ log.info("{}: completed worker {} with error {}",
+ nodeName, worker, worker.error);
+ }
return null;
}
}
- public TaskSpec stopWorker(String id) throws Exception {
+ public void stopWorker(long workerId, boolean mustDestroy) throws Throwable {
try (ShutdownManager.Reference ref = shutdownManager.takeReference()) {
- TaskSpec taskSpec = stateChangeExecutor.submit(new StopWorker(id)).get();
- if (taskSpec == null) {
- throw new KafkaException("No task found with id " + id);
- }
- return taskSpec;
+ stateChangeExecutor.submit(new StopWorker(workerId, mustDestroy)).get();
+ } catch (ExecutionException e) {
+ throw e.getCause();
}
}
/**
* Stops a worker. Processed by the state change thread.
*/
- class StopWorker implements Callable<TaskSpec> {
- private final String id;
+ class StopWorker implements Callable<Void> {
+ private final long workerId;
+ private final boolean mustDestroy;
- StopWorker(String id) {
- this.id = id;
+ StopWorker(long workerId, boolean mustDestroy) {
+ this.workerId = workerId;
+ this.mustDestroy = mustDestroy;
}
@Override
- public TaskSpec call() throws Exception {
- Worker worker = workers.get(id);
+ public Void call() throws Exception {
+ Worker worker = workers.get(workerId);
if (worker == null) {
+ log.info("{}: Can't stop worker {} because there is no worker with that ID.",
+ nodeName, workerId);
return null;
}
+ if (mustDestroy) {
+ worker.mustDestroy = true;
+ }
switch (worker.state) {
case STARTING:
log.info("{}: Cancelling worker {} during its startup process.",
- nodeName, id);
+ nodeName, worker);
worker.state = State.CANCELLING;
break;
case CANCELLING:
log.info("{}: Can't stop worker {}, because it is already being " +
- "cancelled.", nodeName, id);
+ "cancelled.", nodeName, worker);
break;
case RUNNING:
- log.info("{}: Stopping running worker {}.", nodeName, id);
+ log.info("{}: Stopping running worker {}.", nodeName, worker);
worker.transitionToStopping();
break;
case STOPPING:
log.info("{}: Can't stop worker {}, because it is already " +
- "stopping.", nodeName, id);
+ "stopping.", nodeName, worker);
break;
case DONE:
- log.debug("{}: Can't stop worker {}, because it is already done.",
- nodeName, id);
+ if (worker.mustDestroy) {
+ log.info("{}: destroying worker {} with error {}",
+ nodeName, worker, worker.error);
+ workers.remove(worker.workerId);
+ } else {
+ log.debug("{}: Can't stop worker {}, because it is already done.",
+ nodeName, worker);
+ }
break;
}
- return worker.spec();
+ return null;
}
}
@@ -509,10 +579,10 @@ public final class WorkerManager {
* Cleans up the resources associated with a worker. Processed by the worker
* cleanup thread pool.
*/
- class CleanupWorker implements Callable<Void> {
+ class HaltWorker implements Callable<Void> {
private final Worker worker;
- CleanupWorker(Worker worker) {
+ HaltWorker(Worker worker) {
this.worker = worker;
}
@@ -530,18 +600,18 @@ public final class WorkerManager {
}
}
- public TreeMap<String, WorkerState> workerStates() throws Exception {
+ public TreeMap<Long, WorkerState> workerStates() throws Exception {
try (ShutdownManager.Reference ref = shutdownManager.takeReference()) {
return stateChangeExecutor.submit(new GetWorkerStates()).get();
}
}
- class GetWorkerStates implements Callable<TreeMap<String, WorkerState>> {
+ class GetWorkerStates implements Callable<TreeMap<Long, WorkerState>> {
@Override
- public TreeMap<String, WorkerState> call() throws Exception {
- TreeMap<String, WorkerState> workerMap = new TreeMap<>();
+ public TreeMap<Long, WorkerState> call() throws Exception {
+ TreeMap<Long, WorkerState> workerMap = new TreeMap<>();
for (Worker worker : workers.values()) {
- workerMap.put(worker.id(), worker.state());
+ workerMap.put(worker.workerId(), worker.state());
}
return workerMap;
}
@@ -562,17 +632,53 @@ public final class WorkerManager {
class Shutdown implements Callable<Void> {
@Override
public Void call() throws Exception {
- log.info("{}: Shutting down WorkerManager.", platform.curNode().name());
- for (Worker worker : workers.values()) {
- stateChangeExecutor.submit(new StopWorker(worker.id));
+ log.info("{}: Shutting down WorkerManager.", nodeName);
+ try {
+ stateChangeExecutor.submit(new DestroyAllWorkers()).get();
+ log.info("{}: Waiting for shutdownManager quiescence...", nodeName);
+ shutdownManager.waitForQuiescence();
+ workerCleanupExecutor.shutdownNow();
+ stateChangeExecutor.shutdownNow();
+ log.info("{}: Waiting for workerCleanupExecutor to terminate...", nodeName);
+ workerCleanupExecutor.awaitTermination(1, TimeUnit.DAYS);
+ log.info("{}: Waiting for stateChangeExecutor to terminate...", nodeName);
+ stateChangeExecutor.awaitTermination(1, TimeUnit.DAYS);
+ log.info("{}: Shutting down shutdownExecutor.", nodeName);
+ shutdownExecutor.shutdown();
+ } catch (Exception e) {
+ log.info("{}: Caught exception while shutting down WorkerManager", nodeName, e);
+ throw e;
}
- shutdownManager.waitForQuiescence();
- workerCleanupExecutor.shutdownNow();
- stateChangeExecutor.shutdownNow();
- workerCleanupExecutor.awaitTermination(1, TimeUnit.DAYS);
- stateChangeExecutor.awaitTermination(1, TimeUnit.DAYS);
- shutdownExecutor.shutdown();
return null;
}
}
+
+ /**
+ * Begins the process of destroying all workers. Processed by the state change thread.
+ */
+ class DestroyAllWorkers implements Callable<Void> {
+ @Override
+ public Void call() throws Exception {
+ log.info("{}: Destroying all workers.", nodeName);
+
+ // StopWorker may remove elements from the set of worker IDs. That might generate
+ // a ConcurrentModificationException if we were iterating over the worker ID
+ // set directly. Therefore, we make a copy of the worker IDs here and iterate
+ // over that instead.
+ //
+ // Note that there is no possible way that more worker IDs can be added while this
+ // callable is running, because the state change executor is single-threaded.
+ ArrayList<Long> workerIds = new ArrayList<>(workers.keySet());
+
+ for (long workerId : workerIds) {
+ try {
+ new StopWorker(workerId, true).call();
+ } catch (Exception e) {
+ log.error("Failed to stop worker {}", workerId, e);
+ }
+ }
+ return null;
+ }
+ }
+
}
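One subtle fix above is in the shutdown manager: beginShutdown() now calls notifyAll() when the reference count is already zero, so a thread blocked in waitForQuiescence() is not missed. A compact, self-contained sketch of that reference-count quiescence pattern (names are illustrative, not the patch's exact code):

public class QuiescenceSketch {
    private boolean shutdown = false;
    private int refCount = 0;

    synchronized boolean takeReference() {
        if (shutdown) {
            return false;
        }
        refCount++;
        return true;
    }

    synchronized void releaseReference() {
        refCount--;
        if (shutdown && refCount == 0) {
            this.notifyAll();   // last reference released after shutdown began
        }
    }

    synchronized boolean beginShutdown() {
        if (shutdown) {
            return false;
        }
        shutdown = true;
        if (refCount == 0) {
            this.notifyAll();   // wake waiters even if no references are outstanding
        }
        return true;
    }

    synchronized void waitForQuiescence() throws InterruptedException {
        while (!shutdown || refCount > 0) {
            this.wait();
        }
    }
}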
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
index 717d7c7047a..23f3ceb91b0 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
@@ -27,15 +27,16 @@ import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
import org.apache.kafka.trogdor.rest.CreateTaskRequest;
-import org.apache.kafka.trogdor.rest.CreateTaskResponse;
+import org.apache.kafka.trogdor.rest.DestroyTaskRequest;
import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.apache.kafka.trogdor.rest.StopTaskRequest;
-import org.apache.kafka.trogdor.rest.StopTaskResponse;
import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TasksResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.ThreadLocalRandom;
+
import static net.sourceforge.argparse4j.impl.Arguments.store;
/**
@@ -72,9 +73,9 @@ public final class Coordinator {
* @param resource The AgentRestResoure to use.
*/
public Coordinator(Platform platform, Scheduler scheduler, JsonRestServer restServer,
- CoordinatorRestResource resource) {
+ CoordinatorRestResource resource, long firstWorkerId) {
this.startTimeMs = scheduler.time().milliseconds();
- this.taskManager = new TaskManager(platform, scheduler);
+ this.taskManager = new TaskManager(platform, scheduler, firstWorkerId);
this.restServer = restServer;
resource.setCoordinator(this);
}
@@ -87,12 +88,16 @@ public final class Coordinator {
return new CoordinatorStatusResponse(startTimeMs);
}
- public CreateTaskResponse createTask(CreateTaskRequest request) throws Exception {
- return new CreateTaskResponse(taskManager.createTask(request.id(), request.spec()));
+ public void createTask(CreateTaskRequest request) throws Throwable {
+ taskManager.createTask(request.id(), request.spec());
}
- public StopTaskResponse stopTask(StopTaskRequest request) throws Exception {
- return new StopTaskResponse(taskManager.stopTask(request.id()));
+ public void stopTask(StopTaskRequest request) throws Throwable {
+ taskManager.stopTask(request.id());
+ }
+
+ public void destroyTask(DestroyTaskRequest request) throws Throwable {
+ taskManager.destroyTask(request.id());
}
public TasksResponse tasks(TasksRequest request) throws Exception {
@@ -149,7 +154,7 @@ public final class Coordinator {
CoordinatorRestResource resource = new CoordinatorRestResource();
log.info("Starting coordinator process.");
final Coordinator coordinator = new Coordinator(platform, Scheduler.SYSTEM,
- restServer, resource);
+ restServer, resource, ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2));
restServer.start(resource);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
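The coordinator above seeds the TaskManager's worker ID counter with a random value from the lower half of the long range, and TaskManager hands out sequential IDs from there (nextWorkerId++ later in this patch). A small illustrative sketch of that allocation scheme, with hypothetical names; presumably the random starting point keeps IDs minted by different coordinator incarnations from colliding while leaving ample headroom before overflow:

import java.util.concurrent.ThreadLocalRandom;

public class WorkerIdAllocatorSketch {
    private long nextWorkerId = ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2);

    synchronized long allocate() {
        return nextWorkerId++;   // sequential IDs from a per-process random start
    }

    public static void main(String[] args) {
        WorkerIdAllocatorSketch allocator = new WorkerIdAllocatorSketch();
        System.out.println(allocator.allocate());
        System.out.println(allocator.allocate());   // previous value + 1
    }
}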
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
index 0677296ee3c..780ae737e0e 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
@@ -27,12 +27,11 @@ import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
import org.apache.kafka.trogdor.rest.CreateTaskRequest;
-import org.apache.kafka.trogdor.rest.CreateTaskResponse;
+import org.apache.kafka.trogdor.rest.DestroyTaskRequest;
import org.apache.kafka.trogdor.rest.Empty;
import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse;
import org.apache.kafka.trogdor.rest.StopTaskRequest;
-import org.apache.kafka.trogdor.rest.StopTaskResponse;
import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TasksResponse;
import org.slf4j.Logger;
@@ -116,36 +115,45 @@ public class CoordinatorClient {
return resp.body();
}
- public CreateTaskResponse createTask(CreateTaskRequest request) throws Exception {
- HttpResponse<CreateTaskResponse> resp =
- JsonRestServer.httpRequest(log, url("/coordinator/task/create"), "POST",
- request, new TypeReference<CreateTaskResponse>() { }, maxTries);
- return resp.body();
+ public void createTask(CreateTaskRequest request) throws Exception {
+ HttpResponse<Empty> resp =
+ JsonRestServer.httpRequest(log, url("/coordinator/task/create"), "POST",
+ request, new TypeReference<Empty>() { }, maxTries);
+ resp.body();
}
- public StopTaskResponse stopTask(StopTaskRequest request) throws Exception {
- HttpResponse<StopTaskResponse> resp =
- JsonRestServer.httpRequest(log, url("/coordinator/task/stop"), "PUT",
- request, new TypeReference<StopTaskResponse>() { }, maxTries);
- return resp.body();
+ public void stopTask(StopTaskRequest request) throws Exception {
+ HttpResponse<Empty> resp =
+ JsonRestServer.httpRequest(log, url("/coordinator/task/stop"), "PUT",
+ request, new TypeReference<Empty>() { }, maxTries);
+ resp.body();
+ }
+
+ public void destroyTask(DestroyTaskRequest request) throws Exception {
+ UriBuilder uriBuilder = UriBuilder.fromPath(url("/coordinator/tasks"));
+ uriBuilder.queryParam("taskId", request.id());
+ HttpResponse<Empty> resp =
+ JsonRestServer.httpRequest(log, uriBuilder.build().toString(), "DELETE",
+ null, new TypeReference<Empty>() { }, maxTries);
+ resp.body();
}
public TasksResponse tasks(TasksRequest request) throws Exception {
UriBuilder uriBuilder = UriBuilder.fromPath(url("/coordinator/tasks"));
- uriBuilder.queryParam("taskId", request.taskIds().toArray(new String[0]));
+ uriBuilder.queryParam("taskId", (Object[]) request.taskIds().toArray(new String[0]));
uriBuilder.queryParam("firstStartMs", request.firstStartMs());
uriBuilder.queryParam("lastStartMs", request.lastStartMs());
uriBuilder.queryParam("firstEndMs", request.firstEndMs());
uriBuilder.queryParam("lastEndMs", request.lastEndMs());
HttpResponse<TasksResponse> resp =
- JsonRestServer.httpRequest(log, uriBuilder.build().toString(), "GET",
+ JsonRestServer.httpRequest(log, uriBuilder.build().toString(), "GET",
null, new TypeReference<TasksResponse>() { }, maxTries);
return resp.body();
}
public void shutdown() throws Exception {
HttpResponse<Empty> resp =
- JsonRestServer.httpRequest(log, url("/coordinator/shutdown"), "PUT",
+ JsonRestServer.httpRequest(log, url("/coordinator/shutdown"), "PUT",
null, new TypeReference<Empty>() { }, maxTries);
resp.body();
}
@@ -185,6 +193,12 @@ public class CoordinatorClient {
.dest("stop_task")
.metavar("TASK_ID")
.help("Stop a task.");
+ actions.addArgument("--destroy-task")
+ .action(store())
+ .type(String.class)
+ .dest("destroy_task")
+ .metavar("TASK_ID")
+ .help("Destroy a task.");
actions.addArgument("--shutdown")
.action(storeTrue())
.type(Boolean.class)
@@ -216,15 +230,21 @@ public class CoordinatorClient {
JsonUtil.toPrettyJsonString(client.tasks(
new TasksRequest(null, 0, 0, 0, 0))));
} else if (res.getString("create_task") != null) {
- client.createTask(JsonUtil.JSON_SERDE.readValue(res.getString("create_task"),
- CreateTaskRequest.class));
- System.out.println("Created task.");
+ CreateTaskRequest req = JsonUtil.JSON_SERDE.
+ readValue(res.getString("create_task"), CreateTaskRequest.class);
+ client.createTask(req);
+ System.out.printf("Sent CreateTaskRequest for task %s.", req.id());
} else if (res.getString("stop_task") != null) {
- client.stopTask(new StopTaskRequest(res.getString("stop_task")));
- System.out.println("Created task.");
+ String taskId = res.getString("stop_task");
+ client.stopTask(new StopTaskRequest(taskId));
+ System.out.printf("Sent StopTaskRequest for task %s.%n", taskId);
+ } else if (res.getString("destroy_task") != null) {
+ String taskId = res.getString("destroy_task");
+ client.destroyTask(new DestroyTaskRequest(taskId));
+ System.out.printf("Sent DestroyTaskRequest for task %s.%n", taskId);
} else if (res.getBoolean("shutdown")) {
client.shutdown();
- System.out.println("Sent shutdown request.");
+ System.out.println("Sent ShutdownRequest.");
} else {
System.out.println("You must choose an action. Type --help for help.");
Exit.exit(1);
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
index b8663ec4cc3..cbfbddd7eda 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
@@ -19,15 +19,15 @@ package org.apache.kafka.trogdor.coordinator;
import org.apache.kafka.trogdor.rest.CoordinatorShutdownRequest;
import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
import org.apache.kafka.trogdor.rest.CreateTaskRequest;
-import org.apache.kafka.trogdor.rest.CreateTaskResponse;
+import org.apache.kafka.trogdor.rest.DestroyTaskRequest;
import org.apache.kafka.trogdor.rest.Empty;
import org.apache.kafka.trogdor.rest.StopTaskRequest;
-import org.apache.kafka.trogdor.rest.StopTaskResponse;
import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TasksResponse;
import javax.servlet.ServletContext;
import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
@@ -39,7 +39,18 @@ import javax.ws.rs.core.MediaType;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
-
+/**
+ * The REST resource for the Coordinator. This describes the RPCs which the coordinator
+ * can accept.
+ *
+ * RPCs should be idempotent. This is important because if the server's response is
+ * lost, the client will simply retransmit the same request. The server's response must
+ * be the same the second time around.
+ *
+ * We return the empty JSON object {} rather than void for RPCs that have no results.
+ * This ensures that if we want to add more return results later, we can do so in a
+ * compatible way.
+ */
@Path("/coordinator")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@@ -61,14 +72,23 @@ public class CoordinatorRestResource {
@POST
@Path("/task/create")
- public CreateTaskResponse createTask(CreateTaskRequest request) throws Throwable {
- return coordinator().createTask(request);
+ public Empty createTask(CreateTaskRequest request) throws Throwable {
+ coordinator().createTask(request);
+ return Empty.INSTANCE;
}
@PUT
@Path("/task/stop")
- public StopTaskResponse stopTask(StopTaskRequest request) throws Throwable {
- return coordinator().stopTask(request);
+ public Empty stopTask(StopTaskRequest request) throws Throwable {
+ coordinator().stopTask(request);
+ return Empty.INSTANCE;
+ }
+
+ @DELETE
+ @Path("/tasks")
+ public Empty destroyTask(@DefaultValue("") @QueryParam("taskId") String taskId) throws Throwable {
+ coordinator().destroyTask(new DestroyTaskRequest(taskId));
+ return Empty.INSTANCE;
}
@GET
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
index 91ef9c2928a..3f0075e598a 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
@@ -79,13 +79,16 @@ public final class NodeManager {
private static final long HEARTBEAT_DELAY_MS = 1000L;
class ManagedWorker {
- private final String id;
+ private final long workerId;
+ private final String taskId;
private final TaskSpec spec;
private boolean shouldRun;
private WorkerState state;
- ManagedWorker(String id, TaskSpec spec, boolean shouldRun, WorkerState state) {
- this.id = id;
+ ManagedWorker(long workerId, String taskId, TaskSpec spec,
+ boolean shouldRun, WorkerState state) {
+ this.workerId = workerId;
+ this.taskId = taskId;
this.spec = spec;
this.shouldRun = shouldRun;
this.state = state;
@@ -93,19 +96,24 @@ public final class NodeManager {
void tryCreate() {
try {
- client.createWorker(new CreateWorkerRequest(id, spec));
+ client.createWorker(new CreateWorkerRequest(workerId, taskId, spec));
} catch (Throwable e) {
- log.error("{}: error creating worker {}.", node.name(), id, e);
+ log.error("{}: error creating worker {}.", node.name(), this, e);
}
}
void tryStop() {
try {
- client.stopWorker(new StopWorkerRequest(id));
+ client.stopWorker(new StopWorkerRequest(workerId));
} catch (Throwable e) {
- log.error("{}: error stopping worker {}.", node.name(), id, e);
+ log.error("{}: error stopping worker {}.", node.name(), this, e);
}
}
+
+ @Override
+ public String toString() {
+ return String.format("%s_%d", taskId, workerId);
+ }
}
/**
@@ -126,7 +134,7 @@ public final class NodeManager {
/**
* Maps task IDs to worker structures.
*/
- private final Map<String, ManagedWorker> workers;
+ private final Map<Long, ManagedWorker> workers;
/**
* An executor service which manages the thread dedicated to this node.
@@ -196,24 +204,25 @@ public final class NodeManager {
}
// Identify workers which we think should be running, but which do not appear
// in the agent's response. We need to send startWorker requests for these.
- for (Map.Entry<String, ManagedWorker> entry : workers.entrySet()) {
- String id = entry.getKey();
- if (!agentStatus.workers().containsKey(id)) {
+ for (Map.Entry<Long, ManagedWorker> entry : workers.entrySet()) {
+ Long workerId = entry.getKey();
+ if (!agentStatus.workers().containsKey(workerId)) {
ManagedWorker worker = entry.getValue();
if (worker.shouldRun) {
worker.tryCreate();
}
}
}
- for (Map.Entry<String, WorkerState> entry : agentStatus.workers().entrySet()) {
- String id = entry.getKey();
+ for (Map.Entry<Long, WorkerState> entry : agentStatus.workers().entrySet()) {
+ long workerId = entry.getKey();
WorkerState state = entry.getValue();
- ManagedWorker worker = workers.get(id);
+ ManagedWorker worker = workers.get(workerId);
if (worker == null) {
// Identify tasks which are running, but which we don't know about.
// Add these to the NodeManager as tasks that should not be running.
- log.warn("{}: scheduling unknown worker {} for stopping.", node.name(), id);
- workers.put(id, new ManagedWorker(id, state.spec(), false, state));
+ log.warn("{}: scheduling unknown worker with ID {} for stopping.", node.name(), workerId);
+ workers.put(workerId, new ManagedWorker(workerId, state.taskId(),
+ state.spec(), false, state));
} else {
// Handle workers which need to be stopped.
if (state instanceof WorkerStarting || state instanceof WorkerRunning) {
@@ -227,7 +236,7 @@ public final class NodeManager {
} else {
log.info("{}: worker state changed from {} to {}", node.name(), worker.state, state);
worker.state = state;
- taskManager.updateWorkerState(node.name(), worker.id, state);
+ taskManager.updateWorkerState(node.name(), worker.workerId, state);
}
}
}
@@ -240,34 +249,39 @@ public final class NodeManager {
/**
* Create a new worker.
*
- * @param id The new worker id.
+ * @param workerId The new worker id.
+ * @param taskId The new task id.
* @param spec The task specification to use with the new worker.
*/
- public void createWorker(String id, TaskSpec spec) {
- executor.submit(new CreateWorker(id, spec));
+ public void createWorker(long workerId, String taskId, TaskSpec spec) {
+ executor.submit(new CreateWorker(workerId, taskId, spec));
}
/**
* Starts a worker.
*/
class CreateWorker implements Callable<Void> {
- private final String id;
+ private final long workerId;
+ private final String taskId;
private final TaskSpec spec;
- CreateWorker(String id, TaskSpec spec) {
- this.id = id;
+ CreateWorker(long workerId, String taskId, TaskSpec spec) {
+ this.workerId = workerId;
+ this.taskId = taskId;
this.spec = spec;
}
@Override
public Void call() throws Exception {
- ManagedWorker worker = workers.get(id);
+ ManagedWorker worker = workers.get(workerId);
if (worker != null) {
- log.error("{}: there is already a worker for task {}.", node.name(), id);
+ log.error("{}: there is already a worker {} with ID {}.",
+ node.name(), worker, workerId);
return null;
}
- log.info("{}: scheduling worker {} to start.", node.name(), id);
- workers.put(id, new ManagedWorker(id, spec, true, new WorkerReceiving(spec)));
+ worker = new ManagedWorker(workerId, taskId, spec, true, new WorkerReceiving(taskId, spec));
+ log.info("{}: scheduling worker {} to start.", node.name(), worker);
+ workers.put(workerId, worker);
rescheduleNextHeartbeat(0);
return null;
}
@@ -276,41 +290,72 @@ public final class NodeManager {
/**
* Stop a worker.
*
- * @param id The id of the worker to stop.
+ * @param workerId The id of the worker to stop.
*/
- public void stopWorker(String id) {
- executor.submit(new StopWorker(id));
+ public void stopWorker(long workerId) {
+ executor.submit(new StopWorker(workerId));
}
/**
* Stops a worker.
*/
class StopWorker implements Callable<Void> {
- private final String id;
+ private final long workerId;
- StopWorker(String id) {
- this.id = id;
+ StopWorker(long workerId) {
+ this.workerId = workerId;
}
@Override
public Void call() throws Exception {
- ManagedWorker worker = workers.get(id);
+ ManagedWorker worker = workers.get(workerId);
if (worker == null) {
- log.error("{}: can't stop non-existent worker {}.", node.name(), id);
+ log.error("{}: unable to locate worker to stop with ID {}.", node.name(), workerId);
return null;
}
if (!worker.shouldRun) {
- log.error("{}: The worker for task {} is already scheduled to stop.",
- node.name(), id);
+ log.error("{}: Worker {} is already scheduled to stop.",
+ node.name(), worker);
return null;
}
- log.info("{}: scheduling worker {} on {} to stop.", node.name(), id);
+ log.info("{}: scheduling worker {} to stop.", node.name(), worker);
worker.shouldRun = false;
rescheduleNextHeartbeat(0);
return null;
}
}
+ /**
+ * Destroy a worker.
+ *
+ * @param workerId The id of the worker to destroy.
+ */
+ public void destroyWorker(long workerId) {
+ executor.submit(new DestroyWorker(workerId));
+ }
+
+ /**
+ * Destroys a worker.
+ */
+ class DestroyWorker implements Callable<Void> {
+ private final long workerId;
+
+ DestroyWorker(long workerId) {
+ this.workerId = workerId;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ ManagedWorker worker = workers.remove(workerId);
+ if (worker == null) {
+ log.error("{}: unable to locate worker to destroy with ID {}.", node.name(), workerId);
+ return null;
+ }
+ rescheduleNextHeartbeat(0);
+ return null;
+ }
+ }
+
public void beginShutdown(boolean stopNode) {
executor.shutdownNow();
if (stopNode) {
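The heartbeat handling earlier in this file reconciles the coordinator's view with the agent's status: workers that should be running but are missing from the agent's response are re-created, and workers the agent reports but the coordinator does not know about are scheduled to stop. A minimal standalone sketch of the first of those rules, with plain maps standing in for ManagedWorker and WorkerState (all names here are illustrative):

import java.util.HashMap;
import java.util.Map;

public class ReconcileSketch {
    // Returns the workers (workerId -> taskId) that must be re-sent to the agent
    // because they should be running but do not appear in the agent's status.
    static Map<Long, String> missingOnAgent(Map<Long, String> shouldRun,
                                            Map<Long, String> agentReports) {
        Map<Long, String> toCreate = new HashMap<>();
        for (Map.Entry<Long, String> entry : shouldRun.entrySet()) {
            if (!agentReports.containsKey(entry.getKey())) {
                toCreate.put(entry.getKey(), entry.getValue());
            }
        }
        return toCreate;
    }

    public static void main(String[] args) {
        Map<Long, String> shouldRun = new HashMap<>();
        shouldRun.put(1L, "task-A");
        shouldRun.put(2L, "task-B");
        Map<Long, String> agentReports = new HashMap<>();
        agentReports.put(1L, "task-A");
        System.out.println(missingOnAgent(shouldRun, agentReports));   // {2=task-B}
    }
}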
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
index 7e19c8b34ae..74082bdb60a 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.utils.Scheduler;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -28,6 +29,7 @@ import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.ThreadUtils;
+import org.apache.kafka.trogdor.rest.RequestConflictException;
import org.apache.kafka.trogdor.rest.TaskDone;
import org.apache.kafka.trogdor.rest.TaskPending;
import org.apache.kafka.trogdor.rest.TaskRunning;
@@ -106,12 +108,22 @@ public final class TaskManager {
*/
private final Map<String, NodeManager> nodeManagers;
+ /**
+ * The states of all workers.
+ */
+ private final Map<Long, WorkerState> workerStates = new HashMap<>();
+
/**
* True if the TaskManager is shut down.
*/
private AtomicBoolean shutdown = new AtomicBoolean(false);
- TaskManager(Platform platform, Scheduler scheduler) {
+ /**
+ * The ID to use for the next worker. Only accessed by the state change thread.
+ */
+ private long nextWorkerId;
+
+ TaskManager(Platform platform, Scheduler scheduler, long firstWorkerId) {
this.platform = platform;
this.scheduler = scheduler;
this.time = scheduler.time();
@@ -119,6 +131,7 @@ public final class TaskManager {
this.executor = Executors.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("TaskManagerStateThread", false));
this.nodeManagers = new HashMap<>();
+ this.nextWorkerId = firstWorkerId;
for (Node node : platform.topology().nodes().values()) {
if (Node.Util.getTrogdorAgentPort(node) > 0) {
this.nodeManagers.put(node.name(), new NodeManager(node, this));
@@ -178,9 +191,9 @@ public final class TaskManager {
private Future> startFuture = null;
/**
- * The states of the workers involved with this task.
+ * Maps node names to worker IDs.
*/
- public Map<String, WorkerState> workerStates = new TreeMap<>();
+ public TreeMap<String, Long> workerIds = new TreeMap<>();
/**
* If this is non-empty, a message describing how this task failed.
@@ -240,38 +253,42 @@ public final class TaskManager {
case PENDING:
return new TaskPending(spec);
case RUNNING:
- return new TaskRunning(spec, startedMs, getCombinedStatus(workerStates));
+ return new TaskRunning(spec, startedMs, getCombinedStatus());
case STOPPING:
- return new TaskStopping(spec, startedMs, getCombinedStatus(workerStates));
+ return new TaskStopping(spec, startedMs, getCombinedStatus());
case DONE:
- return new TaskDone(spec, startedMs, doneMs, error, cancelled, getCombinedStatus(workerStates));
+ return new TaskDone(spec, startedMs, doneMs, error, cancelled, getCombinedStatus());
}
throw new RuntimeException("unreachable");
}
- TreeSet<String> activeWorkers() {
- TreeSet workerNames = new TreeSet<>();
- for (Map.Entry<String, WorkerState> entry : workerStates.entrySet()) {
- if (!entry.getValue().done()) {
- workerNames.add(entry.getKey());
+ private JsonNode getCombinedStatus() {
+ if (workerIds.size() == 1) {
+ return workerStates.get(workerIds.values().iterator().next()).status();
+ } else {
+ ObjectNode objectNode = new ObjectNode(JsonNodeFactory.instance);
+ for (Map.Entry<String, Long> entry : workerIds.entrySet()) {
+ String nodeName = entry.getKey();
+ Long workerId = entry.getValue();
+ WorkerState state = workerStates.get(workerId);
+ JsonNode node = state.status();
+ if (node != null) {
+ objectNode.set(nodeName, node);
+ }
}
+ return objectNode;
}
- return workerNames;
}
- }
- private static final JsonNode getCombinedStatus(Map<String, WorkerState> states) {
- if (states.size() == 1) {
- return states.values().iterator().next().status();
- } else {
- ObjectNode objectNode = new ObjectNode(JsonNodeFactory.instance);
- for (Map.Entry<String, WorkerState> entry : states.entrySet()) {
- JsonNode node = entry.getValue().status();
- if (node != null) {
- objectNode.set(entry.getKey(), node);
+ TreeMap<String, Long> activeWorkerIds() {
+ TreeMap<String, Long> activeWorkerIds = new TreeMap<>();
+ for (Map.Entry<String, Long> entry : workerIds.entrySet()) {
+ WorkerState workerState = workerStates.get(entry.getValue());
+ if (!workerState.done()) {
+ activeWorkerIds.put(entry.getKey(), entry.getValue());
}
}
- return objectNode;
+ return activeWorkerIds;
}
}
@@ -280,27 +297,21 @@ public final class TaskManager {
*
* @param id The ID of the task to create.
* @param spec The specification of the task to create.
- *
- * @return The specification of the task with the given ID.
- * Note that if there was already a task with the given ID,
- * this may be different from the specification that was
- * requested.
*/
- public TaskSpec createTask(final String id, TaskSpec spec)
- throws ExecutionException, InterruptedException {
- final TaskSpec existingSpec = executor.submit(new CreateTask(id, spec)).get();
- if (existingSpec != null) {
- log.info("Ignoring request to create task {}, because there is already " +
- "a task with that id.", id);
- return existingSpec;
+ public void createTask(final String id, TaskSpec spec)
+ throws Throwable {
+ try {
+ executor.submit(new CreateTask(id, spec)).get();
+ } catch (ExecutionException e) {
+ log.info("createTask(id={}, spec={}) error", id, spec, e);
+ throw e.getCause();
}
- return spec;
}
/**
* Handles a request to create a new task. Processed by the state change thread.
*/
- class CreateTask implements Callable<TaskSpec> {
+ class CreateTask implements Callable<Void> {
private final String id;
private final TaskSpec spec;
@@ -310,11 +321,18 @@ public final class TaskManager {
}
@Override
- public TaskSpec call() throws Exception {
+ public Void call() throws Exception {
+ if (id.isEmpty()) {
+ throw new InvalidRequestException("Invalid empty ID in createTask request.");
+ }
ManagedTask task = tasks.get(id);
if (task != null) {
- log.info("Task ID {} is already in use.", id);
- return task.spec;
+ if (!task.spec.equals(spec)) {
+ throw new RequestConflictException("Task ID " + id + " already " +
+ "exists, and has a different spec " + task.spec);
+ }
+ log.info("Task {} already exists with spec {}", id, spec);
+ return null;
}
TaskController controller = null;
String failure = null;
@@ -374,8 +392,10 @@ public final class TaskManager {
task.state = ManagedTaskState.RUNNING;
task.startedMs = time.milliseconds();
for (String workerName : nodeNames) {
- task.workerStates.put(workerName, new WorkerReceiving(task.spec));
- nodeManagers.get(workerName).createWorker(task.id, task.spec);
+ long workerId = nextWorkerId++;
+ task.workerIds.put(workerName, workerId);
+ workerStates.put(workerId, new WorkerReceiving(task.id, task.spec));
+ nodeManagers.get(workerName).createWorker(workerId, task.id, task.spec);
}
return null;
}
@@ -385,18 +405,20 @@ public final class TaskManager {
* Stop a task.
*
* @param id The ID of the task to stop.
- * @return The specification of the task which was stopped, or null if there
- * was no task found with the given ID.
*/
- public TaskSpec stopTask(final String id) throws ExecutionException, InterruptedException {
- final TaskSpec spec = executor.submit(new CancelTask(id)).get();
- return spec;
+ public void stopTask(final String id) throws Throwable {
+ try {
+ executor.submit(new CancelTask(id)).get();
+ } catch (ExecutionException e) {
+ log.info("stopTask(id={}) error", id, e);
+ throw e.getCause();
+ }
}
/**
* Handles cancelling a task. Processed by the state change thread.
*/
- class CancelTask implements Callable<TaskSpec> {
+ class CancelTask implements Callable<Void> {
private final String id;
CancelTask(String id) {
@@ -404,7 +426,10 @@ public final class TaskManager {
}
@Override
- public TaskSpec call() throws Exception {
+ public Void call() throws Exception {
+ if (id.isEmpty()) {
+ throw new InvalidRequestException("Invalid empty ID in stopTask request.");
+ }
ManagedTask task = tasks.get(id);
if (task == null) {
log.info("Can't cancel non-existent task {}.", id);
@@ -420,16 +445,21 @@ public final class TaskManager {
break;
case RUNNING:
task.cancelled = true;
- TreeSet<String> activeWorkers = task.activeWorkers();
- if (activeWorkers.isEmpty()) {
- log.info("Task {} is now complete with error: {}", id, task.error);
+ TreeMap<String, Long> activeWorkerIds = task.activeWorkerIds();
+ if (activeWorkerIds.isEmpty()) {
+ if (task.error.isEmpty()) {
+ log.info("Task {} is now complete with no errors.", id);
+ } else {
+ log.info("Task {} is now complete with error: {}", id, task.error);
+ }
task.doneMs = time.milliseconds();
task.state = ManagedTaskState.DONE;
} else {
- for (String workerName : activeWorkers) {
- nodeManagers.get(workerName).stopWorker(id);
+ for (Map.Entry<String, Long> entry : activeWorkerIds.entrySet()) {
+ nodeManagers.get(entry.getKey()).stopWorker(entry.getValue());
}
- log.info("Cancelling task {} on worker(s): {}", id, Utils.join(activeWorkers, ", "));
+ log.info("Cancelling task {} with worker(s) {}",
+ id, Utils.mkString(activeWorkerIds, "", "", " = ", ", "));
task.state = ManagedTaskState.STOPPING;
}
break;
@@ -440,7 +470,48 @@ public final class TaskManager {
log.info("Can't cancel task {} because it is already done.", id);
break;
}
- return task.spec;
+ return null;
+ }
+ }
+
+ public void destroyTask(String id) throws Throwable {
+ try {
+ executor.submit(new DestroyTask(id)).get();
+ } catch (ExecutionException e) {
+ log.info("destroyTask(id={}) error", id, e);
+ throw e.getCause();
+ }
+ }
+
+ /**
+ * Handles destroying a task. Processed by the state change thread.
+ */
+ class DestroyTask implements Callable<Void> {
+ private final String id;
+
+ DestroyTask(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ if (id.isEmpty()) {
+ throw new InvalidRequestException("Invalid empty ID in destroyTask request.");
+ }
+ ManagedTask task = tasks.remove(id);
+ if (task == null) {
+ log.info("Can't destroy task {}: no such task found.", id);
+ return null;
+ }
+ log.info("Destroying task {}.", id);
+ task.clearStartFuture();
+ for (Map.Entry<String, Long> entry : task.workerIds.entrySet()) {
+ long workerId = entry.getValue();
+ workerStates.remove(workerId);
+ String nodeName = entry.getKey();
+ nodeManagers.get(nodeName).destroyWorker(workerId);
+ }
+ return null;
}
}
@@ -448,38 +519,48 @@ public final class TaskManager {
* Update the state of a particular agent's worker.
*
* @param nodeName The node where the agent is running.
- * @param id The worker name.
+ * @param workerId The worker ID.
* @param state The worker state.
*/
- public void updateWorkerState(String nodeName, String id, WorkerState state) {
- executor.submit(new UpdateWorkerState(nodeName, id, state));
+ public void updateWorkerState(String nodeName, long workerId, WorkerState state) {
+ executor.submit(new UpdateWorkerState(nodeName, workerId, state));
}
+ /**
+ * Updates the state of a worker. Processed by the state change thread.
+ */
class UpdateWorkerState implements Callable<Void> {
private final String nodeName;
- private final String id;
- private final WorkerState state;
+ private final long workerId;
+ private final WorkerState nextState;
- UpdateWorkerState(String nodeName, String id, WorkerState state) {
+ UpdateWorkerState(String nodeName, long workerId, WorkerState nextState) {
this.nodeName = nodeName;
- this.id = id;
- this.state = state;
+ this.workerId = workerId;
+ this.nextState = nextState;
}
@Override
public Void call() throws Exception {
- ManagedTask task = tasks.get(id);
- if (task == null) {
- log.error("Can't update worker state unknown worker {} on node {}",
- id, nodeName);
- return null;
- }
- WorkerState prevState = task.workerStates.get(nodeName);
- log.debug("Task {}: Updating worker state for {} from {} to {}.",
- id, nodeName, prevState, state);
- task.workerStates.put(nodeName, state);
- if (state.done() && (!prevState.done())) {
- handleWorkerCompletion(task, nodeName, (WorkerDone) state);
+ try {
+ WorkerState prevState = workerStates.get(workerId);
+ if (prevState == null) {
+ throw new RuntimeException("Unable to find workerId " + workerId);
+ }
+ ManagedTask task = tasks.get(prevState.taskId());
+ if (task == null) {
+ throw new RuntimeException("Unable to find taskId " + prevState.taskId());
+ }
+ log.debug("Task {}: Updating worker state for {} on {} from {} to {}.",
+ task.id, workerId, nodeName, prevState, nextState);
+ workerStates.put(workerId, nextState);
+ if (nextState.done() && (!prevState.done())) {
+ handleWorkerCompletion(task, nodeName, (WorkerDone) nextState);
+ }
+ } catch (Exception e) {
+ log.error("Error updating worker state for {} on {}. Stopping worker.",
+ workerId, nodeName, e);
+ nodeManagers.get(nodeName).stopWorker(workerId);
}
return null;
}
@@ -501,19 +582,19 @@ public final class TaskManager {
nodeName, task.id, state.error(), JsonUtil.toJsonString(state.status()));
task.maybeSetError(state.error());
}
- if (task.activeWorkers().isEmpty()) {
+ TreeMap<String, Long> activeWorkerIds = task.activeWorkerIds();
+ if (activeWorkerIds.isEmpty()) {
task.doneMs = time.milliseconds();
task.state = ManagedTaskState.DONE;
log.info("{}: Task {} is now complete on {} with error: {}",
- nodeName, task.id, Utils.join(task.workerStates.keySet(), ", "),
+ nodeName, task.id, Utils.join(task.workerIds.keySet(), ", "),
task.error.isEmpty() ? "(none)" : task.error);
} else if ((task.state == ManagedTaskState.RUNNING) && (!task.error.isEmpty())) {
- TreeSet<String> activeWorkers = task.activeWorkers();
log.info("{}: task {} stopped with error {}. Stopping worker(s): {}",
- nodeName, task.id, task.error, Utils.join(activeWorkers, ", "));
+ nodeName, task.id, task.error, Utils.mkString(activeWorkerIds, "{", "}", ": ", ", "));
task.state = ManagedTaskState.STOPPING;
- for (String workerName : activeWorkers) {
- nodeManagers.get(workerName).stopWorker(task.id);
+ for (Map.Entry<String, Long> entry : activeWorkerIds.entrySet()) {
+ nodeManagers.get(entry.getKey()).stopWorker(entry.getValue());
}
}
}
@@ -525,6 +606,9 @@ public final class TaskManager {
return executor.submit(new GetTasksResponse(request)).get();
}
+ /**
+ * Gets information about the tasks being managed. Processed by the state change thread.
+ */
class GetTasksResponse implements Callable<TasksResponse> {
private final TasksRequest request;
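
The TaskManager handlers above (destroyTask, updateWorkerState, and the task query) all follow the same concurrency pattern: each mutation is wrapped in a Callable, run on a single state-change thread, and the blocking caller unwraps the ExecutionException so REST handlers see the original cause. A minimal, self-contained sketch of that pattern; the class and method names here are illustrative and not part of the patch:

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    final class StateChangeThreadSketch {
        private final ExecutorService executor = Executors.newSingleThreadExecutor();

        void destroy(final String id) throws Throwable {
            try {
                // All mutations are serialized through the single executor thread,
                // so the shared task/worker maps need no additional locking.
                executor.submit(new Callable<Void>() {
                    @Override
                    public Void call() {
                        // ... remove the task keyed by id and tear down its workers here ...
                        return null;
                    }
                }).get();
            } catch (ExecutionException e) {
                // Rethrow the underlying cause so callers see the real error.
                throw e.getCause();
            }
        }
    }
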
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java
index c505e75e3ff..d41a54b2881 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java
@@ -27,13 +27,13 @@ import java.util.TreeMap;
*/
public class AgentStatusResponse extends Message {
private final long serverStartMs;
- private final TreeMap<String, WorkerState> workers;
+ private final TreeMap<Long, WorkerState> workers;
@JsonCreator
public AgentStatusResponse(@JsonProperty("serverStartMs") long serverStartMs,
- @JsonProperty("workers") TreeMap workers) {
+ @JsonProperty("workers") TreeMap workers) {
this.serverStartMs = serverStartMs;
- this.workers = workers == null ? new TreeMap<String, WorkerState>() : workers;
+ this.workers = workers == null ? new TreeMap<Long, WorkerState>() : workers;
}
@JsonProperty
@@ -42,7 +42,7 @@ public class AgentStatusResponse extends Message {
}
@JsonProperty
- public TreeMap<String, WorkerState> workers() {
+ public TreeMap<Long, WorkerState> workers() {
return workers;
}
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerRequest.java
index 9f6e8dcf0d2..4acc943251e 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerRequest.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerRequest.java
@@ -25,19 +25,27 @@ import org.apache.kafka.trogdor.task.TaskSpec;
* A request to the Trogdor agent to create a worker.
*/
public class CreateWorkerRequest extends Message {
- private final String id;
+ private final long workerId;
+ private final String taskId;
private final TaskSpec spec;
@JsonCreator
- public CreateWorkerRequest(@JsonProperty("id") String id,
+ public CreateWorkerRequest(@JsonProperty("workerId") long workerId,
+ @JsonProperty("taskId") String taskId,
@JsonProperty("spec") TaskSpec spec) {
- this.id = id;
+ this.workerId = workerId;
+ this.taskId = taskId;
this.spec = spec;
}
@JsonProperty
- public String id() {
- return id;
+ public long workerId() {
+ return workerId;
+ }
+
+ @JsonProperty
+ public String taskId() {
+ return taskId;
}
@JsonProperty
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerResponse.java
deleted file mode 100644
index 9e068eccd7a..00000000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerResponse.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.rest;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.task.TaskSpec;
-
-/**
- * A response from the Trogdor agent about creating a worker.
- */
-public class CreateWorkerResponse extends Message {
- private final TaskSpec spec;
-
- @JsonCreator
- public CreateWorkerResponse(@JsonProperty("spec") TaskSpec spec) {
- this.spec = spec;
- }
-
- @JsonProperty
- public TaskSpec spec() {
- return spec;
- }
-}
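
For context, a rough sketch of how a client would now submit the three-field CreateWorkerRequest. The AgentClient.Builder and NoOpTaskSpec usage mirror the AgentTest changes later in this patch; the host and port are placeholders:

    import org.apache.kafka.trogdor.agent.AgentClient;
    import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
    import org.apache.kafka.trogdor.task.NoOpTaskSpec;

    public class CreateWorkerSketch {
        public static void main(String[] args) throws Exception {
            AgentClient client = new AgentClient.Builder().
                maxTries(10).target("localhost", 8888).build();
            // The worker is identified by a numeric workerId plus the owning taskId.
            // Since CreateWorkerResponse was removed (above), a successful call
            // simply returns with no response body.
            client.createWorker(new CreateWorkerRequest(0, "foo",
                new NoOpTaskSpec(1000, 600000)));
        }
    }
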
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/DestroyTaskRequest.java
similarity index 74%
rename from tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerResponse.java
rename to tools/src/main/java/org/apache/kafka/trogdor/rest/DestroyTaskRequest.java
index 7d5b4687db3..d782d5d1cfb 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerResponse.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/DestroyTaskRequest.java
@@ -19,21 +19,20 @@ package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.task.TaskSpec;
/**
- * A response from the Trogdor agent about stopping a worker.
+ * A request to the Trogdor coordinator to delete all memory of a task.
*/
-public class StopWorkerResponse extends Message {
- private final TaskSpec spec;
+public class DestroyTaskRequest extends Message {
+ private final String id;
@JsonCreator
- public StopWorkerResponse(@JsonProperty("spec") TaskSpec spec) {
- this.spec = spec;
+ public DestroyTaskRequest(@JsonProperty("id") String id) {
+ this.id = id;
}
@JsonProperty
- public TaskSpec spec() {
- return spec;
+ public String id() {
+ return id;
}
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/DestroyWorkerRequest.java
similarity index 75%
rename from tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskResponse.java
rename to tools/src/main/java/org/apache/kafka/trogdor/rest/DestroyWorkerRequest.java
index f344dc9666a..e5a8969d4cd 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskResponse.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/DestroyWorkerRequest.java
@@ -19,21 +19,20 @@ package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.task.TaskSpec;
/**
- * A response from the Trogdor coordinator about stopping a task.
+ * A request to the Trogdor agent to delete all memory of a worker.
*/
-public class StopTaskResponse extends Message {
- private final TaskSpec spec;
+public class DestroyWorkerRequest extends Message {
+ private final long workerId;
@JsonCreator
- public StopTaskResponse(@JsonProperty("spec") TaskSpec spec) {
- this.spec = spec;
+ public DestroyWorkerRequest(@JsonProperty("workerId") long workerId) {
+ this.workerId = workerId;
}
@JsonProperty
- public TaskSpec spec() {
- return spec;
+ public long workerId() {
+ return workerId;
}
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateTaskResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/RequestConflictException.java
similarity index 64%
rename from tools/src/main/java/org/apache/kafka/trogdor/rest/CreateTaskResponse.java
rename to tools/src/main/java/org/apache/kafka/trogdor/rest/RequestConflictException.java
index 54ea0f23c97..2701f6af8f9 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateTaskResponse.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/RequestConflictException.java
@@ -17,23 +17,17 @@
package org.apache.kafka.trogdor.rest;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.task.TaskSpec;
-
/**
- * A response from the Trogdor coordinator about creating a task.
+ * Indicates that a given request got an HTTP error 409: CONFLICT.
*/
-public class CreateTaskResponse extends Message {
- private final TaskSpec spec;
+public class RequestConflictException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
- @JsonCreator
- public CreateTaskResponse(@JsonProperty("spec") TaskSpec spec) {
- this.spec = spec;
+ public RequestConflictException(String message) {
+ super(message);
}
- @JsonProperty
- public TaskSpec spec() {
- return spec;
+ public RequestConflictException() {
+ super();
}
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/RestExceptionMapper.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/RestExceptionMapper.java
index f62a775fad9..57c54ec04d8 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/RestExceptionMapper.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/RestExceptionMapper.java
@@ -18,6 +18,7 @@ package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.exc.InvalidTypeIdException;
+import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.SerializationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,6 +39,8 @@ public class RestExceptionMapper implements ExceptionMapper<Throwable> {
}
if (e instanceof NotFoundException) {
return buildResponse(Response.Status.NOT_FOUND, e);
+ } else if (e instanceof InvalidRequestException) {
+ return buildResponse(Response.Status.BAD_REQUEST, e);
} else if (e instanceof InvalidTypeIdException) {
return buildResponse(Response.Status.NOT_IMPLEMENTED, e);
} else if (e instanceof JsonMappingException) {
@@ -46,6 +49,8 @@ public class RestExceptionMapper implements ExceptionMapper<Throwable> {
return buildResponse(Response.Status.NOT_IMPLEMENTED, e);
} else if (e instanceof SerializationException) {
return buildResponse(Response.Status.BAD_REQUEST, e);
+ } else if (e instanceof RequestConflictException) {
+ return buildResponse(Response.Status.CONFLICT, e);
} else {
return buildResponse(Response.Status.INTERNAL_SERVER_ERROR, e);
}
@@ -57,7 +62,9 @@ public class RestExceptionMapper implements ExceptionMapper {
} else if (code == Response.Status.NOT_IMPLEMENTED.getStatusCode()) {
throw new ClassNotFoundException(msg);
} else if (code == Response.Status.BAD_REQUEST.getStatusCode()) {
- throw new SerializationException(msg);
+ throw new InvalidRequestException(msg);
+ } else if (code == Response.Status.CONFLICT.getStatusCode()) {
+ throw new RequestConflictException(msg);
} else {
throw new RuntimeException(msg);
}
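
A brief sketch of what the two new mappings above mean for callers: a 409 CONFLICT is turned back into RequestConflictException on the client side, and a 400 BAD_REQUEST into InvalidRequestException (replacing the old SerializationException). The status code is written out literally here for clarity; the message is a made-up example:

    import org.apache.kafka.trogdor.rest.RequestConflictException;
    import org.apache.kafka.trogdor.rest.RestExceptionMapper;

    public class ExceptionRoundTripSketch {
        public static void main(String[] args) {
            try {
                // 409 is what the server now returns for a conflicting re-create.
                RestExceptionMapper.toException(409, "worker already exists with a different spec");
            } catch (RequestConflictException e) {
                System.out.println("Got RequestConflictException: " + e.getMessage());
            } catch (Exception e) {
                // toException also maps 404, 400, 501, etc.; not exercised here.
            }
        }
    }
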
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskRequest.java
index 3287801d303..704a961f99e 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskRequest.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskRequest.java
@@ -28,7 +28,7 @@ public class StopTaskRequest extends Message {
@JsonCreator
public StopTaskRequest(@JsonProperty("id") String id) {
- this.id = id;
+ this.id = (id == null) ? "" : id;
}
@JsonProperty
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerRequest.java
index 54c689adfcd..c1dcff363c8 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerRequest.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerRequest.java
@@ -24,15 +24,15 @@ import com.fasterxml.jackson.annotation.JsonProperty;
* A request to the Trogdor agent to stop a worker.
*/
public class StopWorkerRequest extends Message {
- private final String id;
+ private final long workerId;
@JsonCreator
- public StopWorkerRequest(@JsonProperty("id") String id) {
- this.id = id;
+ public StopWorkerRequest(@JsonProperty("workerId") long workerId) {
+ this.workerId = workerId;
}
@JsonProperty
- public String id() {
- return id;
+ public long workerId() {
+ return workerId;
}
}
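
As a usage note (illustrative, not part of the patch): both stop and destroy are now addressed by the numeric worker ID rather than a task-name string. Judging from the AgentTest changes below, stopping a worker leaves its final WorkerDone state visible in the agent status, while destroying it removes the worker from the status entirely:

    import org.apache.kafka.trogdor.agent.AgentClient;
    import org.apache.kafka.trogdor.rest.DestroyWorkerRequest;
    import org.apache.kafka.trogdor.rest.StopWorkerRequest;

    public class StopVersusDestroySketch {
        static void stopThenForget(AgentClient client, long workerId) throws Exception {
            client.stopWorker(new StopWorkerRequest(workerId));        // graceful stop; final state remains
            client.destroyWorker(new DestroyWorkerRequest(workerId));  // delete all memory of the worker
        }
    }
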
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java
index 500d3c6a0c2..5f773bba5c4 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java
@@ -49,12 +49,13 @@ public class WorkerDone extends WorkerState {
private final String error;
@JsonCreator
- public WorkerDone(@JsonProperty("spec") TaskSpec spec,
+ public WorkerDone(@JsonProperty("taskId") String taskId,
+ @JsonProperty("spec") TaskSpec spec,
@JsonProperty("startedMs") long startedMs,
@JsonProperty("doneMs") long doneMs,
@JsonProperty("status") JsonNode status,
@JsonProperty("error") String error) {
- super(spec);
+ super(taskId, spec);
this.startedMs = startedMs;
this.doneMs = doneMs;
this.status = status == null ? NullNode.instance : status;
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java
index 70687743f74..1babcce2a57 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java
@@ -29,8 +29,9 @@ import org.apache.kafka.trogdor.task.TaskSpec;
*/
public final class WorkerReceiving extends WorkerState {
@JsonCreator
- public WorkerReceiving(@JsonProperty("spec") TaskSpec spec) {
- super(spec);
+ public WorkerReceiving(@JsonProperty("taskId") String taskId,
+ @JsonProperty("spec") TaskSpec spec) {
+ super(taskId, spec);
}
@Override
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java
index af8ee88a1ab..15e77528d62 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java
@@ -39,10 +39,11 @@ public class WorkerRunning extends WorkerState {
private final JsonNode status;
@JsonCreator
- public WorkerRunning(@JsonProperty("spec") TaskSpec spec,
+ public WorkerRunning(@JsonProperty("taskId") String taskId,
+ @JsonProperty("spec") TaskSpec spec,
@JsonProperty("startedMs") long startedMs,
@JsonProperty("status") JsonNode status) {
- super(spec);
+ super(taskId, spec);
this.startedMs = startedMs;
this.status = status == null ? NullNode.instance : status;
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java
index b568ec1f887..7a06eac5b7d 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java
@@ -28,8 +28,9 @@ import org.apache.kafka.trogdor.task.TaskSpec;
*/
public final class WorkerStarting extends WorkerState {
@JsonCreator
- public WorkerStarting(@JsonProperty("spec") TaskSpec spec) {
- super(spec);
+ public WorkerStarting(@JsonProperty("taskId") String taskId,
+ @JsonProperty("spec") TaskSpec spec) {
+ super(taskId, spec);
}
@Override
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerState.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerState.java
index 044d719f894..6480a2410dc 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerState.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerState.java
@@ -38,12 +38,19 @@ import org.apache.kafka.trogdor.task.TaskSpec;
@JsonSubTypes.Type(value = WorkerDone.class, name = "DONE")
})
public abstract class WorkerState extends Message {
+ private final String taskId;
private final TaskSpec spec;
- public WorkerState(TaskSpec spec) {
+ public WorkerState(String taskId, TaskSpec spec) {
+ this.taskId = taskId;
this.spec = spec;
}
+ @JsonProperty
+ public String taskId() {
+ return taskId;
+ }
+
@JsonProperty
public TaskSpec spec() {
return spec;
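
To illustrate the new field: every WorkerState subclass now takes the owning taskId as its first constructor argument and exposes it through taskId(), which is what lets status maps keyed by workerId be matched back to tasks (see ExpectedTasks below). A small sketch, assuming the WorkerRunning and NoOpTaskSpec constructors used elsewhere in this patch:

    import com.fasterxml.jackson.databind.node.TextNode;
    import org.apache.kafka.trogdor.common.JsonUtil;
    import org.apache.kafka.trogdor.rest.WorkerRunning;
    import org.apache.kafka.trogdor.task.NoOpTaskSpec;

    public class WorkerStateSketch {
        public static void main(String[] args) throws Exception {
            WorkerRunning running = new WorkerRunning("foo",
                new NoOpTaskSpec(1000, 600000), 0, new TextNode("active"));
            System.out.println(running.taskId());               // prints "foo"
            // The @JsonProperty on taskId() means the serialized form should now
            // carry the task id alongside the spec, startedMs, and status.
            System.out.println(JsonUtil.toJsonString(running));
        }
    }
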
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java
index 9fbb3ff7306..2942e118ac6 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java
@@ -39,10 +39,11 @@ public class WorkerStopping extends WorkerState {
private final JsonNode status;
@JsonCreator
- public WorkerStopping(@JsonProperty("spec") TaskSpec spec,
+ public WorkerStopping(@JsonProperty("taskId") String taskId,
+ @JsonProperty("spec") TaskSpec spec,
@JsonProperty("startedMs") long startedMs,
@JsonProperty("status") JsonNode status) {
- super(spec);
+ super(taskId, spec);
this.startedMs = startedMs;
this.status = status == null ? NullNode.instance : status;
}
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
index 61de5c98797..158e690da4b 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
@@ -36,8 +36,9 @@ import org.apache.kafka.trogdor.fault.Kibosh.KiboshFilesUnreadableFaultSpec;
import org.apache.kafka.trogdor.rest.AgentStatusResponse;
import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
-import org.apache.kafka.trogdor.rest.CreateWorkerResponse;
+import org.apache.kafka.trogdor.rest.DestroyWorkerRequest;
import org.apache.kafka.trogdor.rest.JsonRestServer;
+import org.apache.kafka.trogdor.rest.RequestConflictException;
import org.apache.kafka.trogdor.rest.StopWorkerRequest;
import org.apache.kafka.trogdor.rest.WorkerDone;
import org.apache.kafka.trogdor.rest.WorkerRunning;
@@ -120,36 +121,47 @@ public class AgentTest {
new ExpectedTasks().waitFor(client);
final NoOpTaskSpec fooSpec = new NoOpTaskSpec(1000, 600000);
- CreateWorkerResponse response = client.createWorker(new CreateWorkerRequest("foo", fooSpec));
- assertEquals(fooSpec.toString(), response.spec().toString());
+ client.createWorker(new CreateWorkerRequest(0, "foo", fooSpec));
new ExpectedTasks().addTask(new ExpectedTaskBuilder("foo").
- workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))).
+ workerState(new WorkerRunning("foo", fooSpec, 0, new TextNode("active"))).
build()).
waitFor(client);
final NoOpTaskSpec barSpec = new NoOpTaskSpec(2000, 900000);
- client.createWorker(new CreateWorkerRequest("bar", barSpec));
- client.createWorker(new CreateWorkerRequest("bar", barSpec));
+ client.createWorker(new CreateWorkerRequest(1, "bar", barSpec));
+ client.createWorker(new CreateWorkerRequest(1, "bar", barSpec));
+
+ try {
+ client.createWorker(new CreateWorkerRequest(1, "foo", barSpec));
+ Assert.fail("Expected RequestConflictException when re-creating a request with a different taskId.");
+ } catch (RequestConflictException exception) {
+ }
+ try {
+ client.createWorker(new CreateWorkerRequest(1, "bar", fooSpec));
+ Assert.fail("Expected RequestConflictException when re-creating a request with a different spec.");
+ } catch (RequestConflictException exception) {
+ }
+
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
- workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))).
+ workerState(new WorkerRunning("foo", fooSpec, 0, new TextNode("active"))).
build()).
addTask(new ExpectedTaskBuilder("bar").
- workerState(new WorkerRunning(barSpec, 0, new TextNode("active"))).
+ workerState(new WorkerRunning("bar", barSpec, 0, new TextNode("active"))).
build()).
waitFor(client);
final NoOpTaskSpec bazSpec = new NoOpTaskSpec(1, 450000);
- client.createWorker(new CreateWorkerRequest("baz", bazSpec));
+ client.createWorker(new CreateWorkerRequest(2, "baz", bazSpec));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
- workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))).
+ workerState(new WorkerRunning("foo", fooSpec, 0, new TextNode("active"))).
build()).
addTask(new ExpectedTaskBuilder("bar").
- workerState(new WorkerRunning(barSpec, 0, new TextNode("active"))).
+ workerState(new WorkerRunning("bar", barSpec, 0, new TextNode("active"))).
build()).
addTask(new ExpectedTaskBuilder("baz").
- workerState(new WorkerRunning(bazSpec, 0, new TextNode("active"))).
+ workerState(new WorkerRunning("baz", bazSpec, 0, new TextNode("active"))).
build()).
waitFor(client);
@@ -167,23 +179,23 @@ public class AgentTest {
new ExpectedTasks().waitFor(client);
final NoOpTaskSpec fooSpec = new NoOpTaskSpec(10, 2);
- client.createWorker(new CreateWorkerRequest("foo", fooSpec));
+ client.createWorker(new CreateWorkerRequest(0, "foo", fooSpec));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
- workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))).
+ workerState(new WorkerRunning("foo", fooSpec, 0, new TextNode("active"))).
build()).
waitFor(client);
time.sleep(1);
final NoOpTaskSpec barSpec = new NoOpTaskSpec(2000, 900000);
- client.createWorker(new CreateWorkerRequest("bar", barSpec));
+ client.createWorker(new CreateWorkerRequest(1, "bar", barSpec));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
- workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))).
+ workerState(new WorkerRunning("foo", fooSpec, 0, new TextNode("active"))).
build()).
addTask(new ExpectedTaskBuilder("bar").
- workerState(new WorkerRunning(barSpec, 1, new TextNode("active"))).
+ workerState(new WorkerRunning("bar", barSpec, 1, new TextNode("active"))).
build()).
waitFor(client);
@@ -191,21 +203,21 @@ public class AgentTest {
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
- workerState(new WorkerDone(fooSpec, 0, 2, new TextNode("done"), "")).
+ workerState(new WorkerDone("foo", fooSpec, 0, 2, new TextNode("done"), "")).
build()).
addTask(new ExpectedTaskBuilder("bar").
- workerState(new WorkerRunning(barSpec, 1, new TextNode("active"))).
+ workerState(new WorkerRunning("bar", barSpec, 1, new TextNode("active"))).
build()).
waitFor(client);
time.sleep(5);
- client.stopWorker(new StopWorkerRequest("bar"));
+ client.stopWorker(new StopWorkerRequest(1));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
- workerState(new WorkerDone(fooSpec, 0, 2, new TextNode("done"), "")).
+ workerState(new WorkerDone("foo", fooSpec, 0, 2, new TextNode("done"), "")).
build()).
addTask(new ExpectedTaskBuilder("bar").
- workerState(new WorkerDone(barSpec, 1, 7, new TextNode("done"), "")).
+ workerState(new WorkerDone("bar", barSpec, 1, 7, new TextNode("done"), "")).
build()).
waitFor(client);
@@ -224,25 +236,25 @@ public class AgentTest {
SampleTaskSpec fooSpec = new SampleTaskSpec(0, 900000,
Collections.singletonMap("node01", 1L), "");
- client.createWorker(new CreateWorkerRequest("foo", fooSpec));
+ client.createWorker(new CreateWorkerRequest(0, "foo", fooSpec));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
- workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))).
+ workerState(new WorkerRunning("foo", fooSpec, 0, new TextNode("active"))).
build()).
waitFor(client);
SampleTaskSpec barSpec = new SampleTaskSpec(0, 900000,
Collections.singletonMap("node01", 2L), "baz");
- client.createWorker(new CreateWorkerRequest("bar", barSpec));
+ client.createWorker(new CreateWorkerRequest(1, "bar", barSpec));
time.sleep(1);
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
- workerState(new WorkerDone(fooSpec, 0, 1,
+ workerState(new WorkerDone("foo", fooSpec, 0, 1,
new TextNode("halted"), "")).
build()).
addTask(new ExpectedTaskBuilder("bar").
- workerState(new WorkerRunning(barSpec, 0,
+ workerState(new WorkerRunning("bar", barSpec, 0,
new TextNode("active"))).
build()).
waitFor(client);
@@ -250,11 +262,11 @@ public class AgentTest {
time.sleep(1);
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
- workerState(new WorkerDone(fooSpec, 0, 1,
+ workerState(new WorkerDone("foo", fooSpec, 0, 1,
new TextNode("halted"), "")).
build()).
addTask(new ExpectedTaskBuilder("bar").
- workerState(new WorkerDone(barSpec, 0, 2,
+ workerState(new WorkerDone("bar", barSpec, 0, 2,
new TextNode("halted"), "baz")).
build()).
waitFor(client);
@@ -293,37 +305,84 @@ public class AgentTest {
Assert.assertEquals(KiboshControlFile.EMPTY, mockKibosh.read());
FilesUnreadableFaultSpec fooSpec = new FilesUnreadableFaultSpec(0, 900000,
Collections.singleton("myAgent"), mockKibosh.tempDir.getPath().toString(), "/foo", 123);
- client.createWorker(new CreateWorkerRequest("foo", fooSpec));
+ client.createWorker(new CreateWorkerRequest(0, "foo", fooSpec));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
- workerState(new WorkerRunning(fooSpec, 0, new TextNode("Added fault foo"))).
+ workerState(new WorkerRunning("foo", fooSpec, 0, new TextNode("Added fault foo"))).
build()).
waitFor(client);
Assert.assertEquals(new KiboshControlFile(Collections.singletonList(
new KiboshFilesUnreadableFaultSpec("/foo", 123))), mockKibosh.read());
FilesUnreadableFaultSpec barSpec = new FilesUnreadableFaultSpec(0, 900000,
Collections.singleton("myAgent"), mockKibosh.tempDir.getPath().toString(), "/bar", 456);
- client.createWorker(new CreateWorkerRequest("bar", barSpec));
+ client.createWorker(new CreateWorkerRequest(1, "bar", barSpec));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
- workerState(new WorkerRunning(fooSpec, 0, new TextNode("Added fault foo"))).build()).
+ workerState(new WorkerRunning("foo", fooSpec, 0, new TextNode("Added fault foo"))).build()).
addTask(new ExpectedTaskBuilder("bar").
- workerState(new WorkerRunning(barSpec, 0, new TextNode("Added fault bar"))).build()).
+ workerState(new WorkerRunning("bar", barSpec, 0, new TextNode("Added fault bar"))).build()).
waitFor(client);
Assert.assertEquals(new KiboshControlFile(new ArrayList<Kibosh.KiboshFaultSpec>() {{
add(new KiboshFilesUnreadableFaultSpec("/foo", 123));
add(new KiboshFilesUnreadableFaultSpec("/bar", 456));
}}), mockKibosh.read());
time.sleep(1);
- client.stopWorker(new StopWorkerRequest("foo"));
+ client.stopWorker(new StopWorkerRequest(0));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
- workerState(new WorkerDone(fooSpec, 0, 1, new TextNode("Removed fault foo"), "")).build()).
+ workerState(new WorkerDone("foo", fooSpec, 0, 1, new TextNode("Removed fault foo"), "")).build()).
addTask(new ExpectedTaskBuilder("bar").
- workerState(new WorkerRunning(barSpec, 0, new TextNode("Added fault bar"))).build()).
+ workerState(new WorkerRunning("bar", barSpec, 0, new TextNode("Added fault bar"))).build()).
waitFor(client);
Assert.assertEquals(new KiboshControlFile(Collections.singletonList(
new KiboshFilesUnreadableFaultSpec("/bar", 456))), mockKibosh.read());
}
}
+
+ @Test
+ public void testDestroyWorkers() throws Exception {
+ MockTime time = new MockTime(0, 0, 0);
+ MockScheduler scheduler = new MockScheduler(time);
+ Agent agent = createAgent(scheduler);
+ AgentClient client = new AgentClient.Builder().
+ maxTries(10).target("localhost", agent.port()).build();
+ new ExpectedTasks().waitFor(client);
+
+ final NoOpTaskSpec fooSpec = new NoOpTaskSpec(10, 5);
+ client.createWorker(new CreateWorkerRequest(0, "foo", fooSpec));
+ new ExpectedTasks().
+ addTask(new ExpectedTaskBuilder("foo").
+ workerState(new WorkerRunning("foo", fooSpec, 0, new TextNode("active"))).
+ build()).
+ waitFor(client);
+ time.sleep(1);
+
+ client.destroyWorker(new DestroyWorkerRequest(0));
+ client.destroyWorker(new DestroyWorkerRequest(0));
+ client.destroyWorker(new DestroyWorkerRequest(1));
+ new ExpectedTasks().waitFor(client);
+ time.sleep(1);
+
+ final NoOpTaskSpec fooSpec2 = new NoOpTaskSpec(100, 1);
+ client.createWorker(new CreateWorkerRequest(1, "foo", fooSpec2));
+ new ExpectedTasks().
+ addTask(new ExpectedTaskBuilder("foo").
+ workerState(new WorkerRunning("foo", fooSpec2, 2, new TextNode("active"))).
+ build()).
+ waitFor(client);
+
+ time.sleep(2);
+ new ExpectedTasks().
+ addTask(new ExpectedTaskBuilder("foo").
+ workerState(new WorkerDone("foo", fooSpec2, 2, 4, new TextNode("done"), "")).
+ build()).
+ waitFor(client);
+
+ time.sleep(1);
+ client.destroyWorker(new DestroyWorkerRequest(1));
+ new ExpectedTasks().waitFor(client);
+
+ agent.beginShutdown();
+ agent.waitForShutdown();
+ }
};
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
index 617bf34bcd9..121281f5910 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
@@ -32,6 +32,7 @@ import org.apache.kafka.trogdor.task.TaskSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
@@ -184,10 +185,14 @@ public class ExpectedTasks {
throw new RuntimeException(e);
}
StringBuilder errors = new StringBuilder();
+ HashMap<String, WorkerState> taskIdToWorkerState = new HashMap<>();
+ for (WorkerState state : status.workers().values()) {
+ taskIdToWorkerState.put(state.taskId(), state);
+ }
for (Map.Entry<String, ExpectedTask> entry : expected.entrySet()) {
String id = entry.getKey();
ExpectedTask worker = entry.getValue();
- String differences = worker.compare(status.workers().get(id));
+ String differences = worker.compare(taskIdToWorkerState.get(id));
if (differences != null) {
errors.append(differences);
}
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
index 8101d9c6e4e..c1f7490cc82 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
@@ -45,9 +45,9 @@ public class JsonSerializationTest {
verify(new ProcessStopFaultSpec(0, 0, null, null));
verify(new AgentStatusResponse(0, null));
verify(new TasksResponse(null));
- verify(new WorkerDone(null, 0, 0, null, null));
- verify(new WorkerRunning(null, 0, null));
- verify(new WorkerStopping(null, 0, null));
+ verify(new WorkerDone(null, null, 0, 0, null, null));
+ verify(new WorkerRunning(null, null, 0, null));
+ verify(new WorkerStopping(null, null, 0, null));
verify(new ProduceBenchSpec(0, 0, null, null,
0, 0, null, null, null, null, null, 0, 0, "test-topic", 1, (short) 3));
verify(new RoundTripWorkloadSpec(0, 0, null, null, null, null, null, null,
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java b/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
index 07f02c5830b..46315c27d15 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
@@ -185,7 +185,7 @@ public class MiniTrogdorCluster implements AutoCloseable {
}
if (node.coordinatorRestResource != null) {
node.coordinator = new Coordinator(node.platform, scheduler,
- node.coordinatorRestServer, node.coordinatorRestResource);
+ node.coordinatorRestServer, node.coordinatorRestResource, 0);
}
} catch (Exception e) {
log.error("Unable to initialize {}", nodeName, e);
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
index 34d7ffe6106..e9434844405 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
@@ -35,6 +35,8 @@ import org.apache.kafka.trogdor.common.MiniTrogdorCluster;
import org.apache.kafka.trogdor.fault.NetworkPartitionFaultSpec;
import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
import org.apache.kafka.trogdor.rest.CreateTaskRequest;
+import org.apache.kafka.trogdor.rest.DestroyTaskRequest;
+import org.apache.kafka.trogdor.rest.RequestConflictException;
import org.apache.kafka.trogdor.rest.StopTaskRequest;
import org.apache.kafka.trogdor.rest.TaskDone;
import org.apache.kafka.trogdor.rest.TaskPending;
@@ -57,8 +59,9 @@ import java.util.HashMap;
import java.util.List;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
public class CoordinatorTest {
private static final Logger log = LoggerFactory.getLogger(CoordinatorTest.class);
@@ -96,11 +99,25 @@ public class CoordinatorTest {
build()).
waitFor(cluster.coordinatorClient());
+ // Re-creating a task with the same arguments is not an error.
+ cluster.coordinatorClient().createTask(
+ new CreateTaskRequest("foo", fooSpec));
+
+ // Re-creating a task with different arguments gives a RequestConflictException.
+ try {
+ NoOpTaskSpec barSpec = new NoOpTaskSpec(1000, 2000);
+ cluster.coordinatorClient().createTask(
+ new CreateTaskRequest("foo", barSpec));
+ fail("Expected to get an exception when re-creating a task with a " +
+ "different task spec.");
+ } catch (RequestConflictException exception) {
+ }
+
time.sleep(2);
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
taskState(new TaskRunning(fooSpec, 2, new TextNode("active"))).
- workerState(new WorkerRunning(fooSpec, 2, new TextNode("active"))).
+ workerState(new WorkerRunning("foo", fooSpec, 2, new TextNode("active"))).
build()).
waitFor(cluster.coordinatorClient()).
waitFor(cluster.agentClient("node02"));
@@ -149,7 +166,7 @@ public class CoordinatorTest {
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
taskState(new TaskRunning(fooSpec, 11, status1)).
- workerState(new WorkerRunning(fooSpec, 11, new TextNode("active"))).
+ workerState(new WorkerRunning("foo", fooSpec, 11, new TextNode("active"))).
build()).
waitFor(coordinatorClient).
waitFor(agentClient1).
@@ -163,7 +180,7 @@ public class CoordinatorTest {
addTask(new ExpectedTaskBuilder("foo").
taskState(new TaskDone(fooSpec, 11, 13,
"", false, status2)).
- workerState(new WorkerDone(fooSpec, 11, 13, new TextNode("done"), "")).
+ workerState(new WorkerDone("foo", fooSpec, 11, 13, new TextNode("done"), "")).
build()).
waitFor(coordinatorClient).
waitFor(agentClient1).
@@ -206,7 +223,7 @@ public class CoordinatorTest {
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
taskState(new TaskRunning(fooSpec, 11, status1)).
- workerState(new WorkerRunning(fooSpec, 11, new TextNode("active"))).
+ workerState(new WorkerRunning("foo", fooSpec, 11, new TextNode("active"))).
build()).
waitFor(coordinatorClient).
waitFor(agentClient1).
@@ -221,11 +238,68 @@ public class CoordinatorTest {
addTask(new ExpectedTaskBuilder("foo").
taskState(new TaskDone(fooSpec, 11, 12, "",
true, status2)).
- workerState(new WorkerDone(fooSpec, 11, 12, new TextNode("done"), "")).
+ workerState(new WorkerDone("foo", fooSpec, 11, 12, new TextNode("done"), "")).
build()).
waitFor(coordinatorClient).
waitFor(agentClient1).
waitFor(agentClient2);
+
+ coordinatorClient.destroyTask(new DestroyTaskRequest("foo"));
+ new ExpectedTasks().
+ waitFor(coordinatorClient).
+ waitFor(agentClient1).
+ waitFor(agentClient2);
+ }
+ }
+
+ @Test
+ public void testTaskDestruction() throws Exception {
+ MockTime time = new MockTime(0, 0, 0);
+ Scheduler scheduler = new MockScheduler(time);
+ try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().
+ addCoordinator("node01").
+ addAgent("node01").
+ addAgent("node02").
+ scheduler(scheduler).
+ build()) {
+ CoordinatorClient coordinatorClient = cluster.coordinatorClient();
+ AgentClient agentClient1 = cluster.agentClient("node01");
+ AgentClient agentClient2 = cluster.agentClient("node02");
+
+ new ExpectedTasks().
+ waitFor(coordinatorClient).
+ waitFor(agentClient1).
+ waitFor(agentClient2);
+
+ NoOpTaskSpec fooSpec = new NoOpTaskSpec(2, 2);
+ coordinatorClient.destroyTask(new DestroyTaskRequest("foo"));
+ coordinatorClient.createTask(new CreateTaskRequest("foo", fooSpec));
+ NoOpTaskSpec barSpec = new NoOpTaskSpec(20, 20);
+ coordinatorClient.createTask(new CreateTaskRequest("bar", barSpec));
+ coordinatorClient.destroyTask(new DestroyTaskRequest("bar"));
+ new ExpectedTasks().
+ addTask(new ExpectedTaskBuilder("foo").taskState(new TaskPending(fooSpec)).build()).
+ waitFor(coordinatorClient).
+ waitFor(agentClient1).
+ waitFor(agentClient2);
+ time.sleep(10);
+
+ ObjectNode status1 = new ObjectNode(JsonNodeFactory.instance);
+ status1.set("node01", new TextNode("active"));
+ status1.set("node02", new TextNode("active"));
+ new ExpectedTasks().
+ addTask(new ExpectedTaskBuilder("foo").
+ taskState(new TaskRunning(fooSpec, 10, status1)).
+ build()).
+ waitFor(coordinatorClient).
+ waitFor(agentClient1).
+ waitFor(agentClient2);
+
+ coordinatorClient.destroyTask(new DestroyTaskRequest("foo"));
+ new ExpectedTasks().
+ waitFor(coordinatorClient).
+ waitFor(agentClient1).
+ waitFor(agentClient2);
}
}
@@ -397,7 +471,7 @@ public class CoordinatorTest {
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
taskState(new TaskRunning(fooSpec, 2, new TextNode("active"))).
- workerState(new WorkerRunning(fooSpec, 2, new TextNode("active"))).
+ workerState(new WorkerRunning("foo", fooSpec, 2, new TextNode("active"))).
build()).
addTask(new ExpectedTaskBuilder("bar").
taskState(new TaskPending(barSpec)).
@@ -448,7 +522,7 @@ public class CoordinatorTest {
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
taskState(new TaskRunning(fooSpec, 2, status1)).
- workerState(new WorkerRunning(fooSpec, 2, new TextNode("active"))).
+ workerState(new WorkerRunning("foo", fooSpec, 2, new TextNode("active"))).
build()).
waitFor(coordinatorClient).
waitFor(cluster.agentClient("node02")).
@@ -461,14 +535,14 @@ public class CoordinatorTest {
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
taskState(new TaskRunning(fooSpec, 2, status2)).
- workerState(new WorkerRunning(fooSpec, 2, new TextNode("active"))).
+ workerState(new WorkerRunning("foo", fooSpec, 2, new TextNode("active"))).
build()).
waitFor(coordinatorClient).
waitFor(cluster.agentClient("node03"));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
taskState(new TaskRunning(fooSpec, 2, status2)).
- workerState(new WorkerDone(fooSpec, 2, 12, new TextNode("halted"), "")).
+ workerState(new WorkerDone("foo", fooSpec, 2, 12, new TextNode("halted"), "")).
build()).
waitFor(cluster.agentClient("node02"));
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/rest/RestExceptionMapperTest.java b/tools/src/test/java/org/apache/kafka/trogdor/rest/RestExceptionMapperTest.java
index c40f958eb8d..9c7f7522853 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/rest/RestExceptionMapperTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/rest/RestExceptionMapperTest.java
@@ -24,6 +24,8 @@ import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.exc.InvalidTypeIdException;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.core.Response;
+
+import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.SerializationException;
import org.junit.Test;
@@ -67,6 +69,13 @@ public class RestExceptionMapperTest {
assertEquals(resp.getStatus(), Response.Status.BAD_REQUEST.getStatusCode());
}
+ @Test
+ public void testToResponseInvalidRequestException() {
+ RestExceptionMapper mapper = new RestExceptionMapper();
+ Response resp = mapper.toResponse(new InvalidRequestException("invalid request"));
+ assertEquals(resp.getStatus(), Response.Status.BAD_REQUEST.getStatusCode());
+ }
+
@Test
public void testToResponseUnknownException() {
RestExceptionMapper mapper = new RestExceptionMapper();
@@ -84,7 +93,7 @@ public class RestExceptionMapperTest {
RestExceptionMapper.toException(Response.Status.NOT_IMPLEMENTED.getStatusCode(), "Not Implemented");
}
- @Test(expected = SerializationException.class)
+ @Test(expected = InvalidRequestException.class)
public void testToExceptionSerializationException() throws Exception {
RestExceptionMapper.toException(Response.Status.BAD_REQUEST.getStatusCode(), "Bad Request");
}