diff --git a/TROGDOR.md b/TROGDOR.md
index 168acfb78c1..b551773ab98 100644
--- a/TROGDOR.md
+++ b/TROGDOR.md
@@ -35,61 +35,26 @@ Let's confirm that all of the daemons are running:
115420 Kafka
115694 Agent
-Now, we can submit a test job to Trogdor. Here's an example of a short bash script which makes it easier.
+Now, we can submit a test job to Trogdor.
- > ./tests/bin/trogdor-run-produce-bench.sh
- Sent CreateTaskRequest for task produce_bench_21634.
- $TASK_ID = produce_bench_21634
+ > ./bin/trogdor.sh client createTask -t localhost:8889 -i produce0 --spec ./tests/spec/simple_produce_bench.json
+ Sent CreateTaskRequest for task produce0.
-To get the test results, we run --show-tasks:
+We can run showTask to see the task's status:
- ./bin/trogdor.sh client --show-tasks localhost:8889
- Got coordinator tasks: {
- "tasks" : {
- "produce_bench_21634" : {
- "state" : "DONE",
- "spec" : {
- "class" : "org.apache.kafka.trogdor.workload.ProduceBenchSpec",
- "startMs" : 0,
- "durationMs" : 10000000,
- "producerNode" : "node0",
- "bootstrapServers" : "localhost:9092",
- "targetMessagesPerSec" : 10000,
- "maxMessages" : 50000,
- "keyGenerator" : {
- "type" : "sequential",
- "size" : 4,
- "startOffset" : 0
- },
- "valueGenerator" : {
- "type" : "constant",
- "size" : 512,
- "value" : "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA="
- },
- "activeTopics" : {
- "foo[1-3]" : {
- "numPartitions" : 10,
- "replicationFactor" : 1
- }
- },
- "inactiveTopics" : {
- "foo[4-5]" : {
- "numPartitions" : 10,
- "replicationFactor" : 1
- }
- }
- },
- "startedMs" : 1541435949784,
- "doneMs" : 1541435955803,
- "cancelled" : false,
- "status" : {
- "totalSent" : 50000,
- "averageLatencyMs" : 11.0293,
- "p50LatencyMs" : 9,
- "p95LatencyMs" : 27,
- "p99LatencyMs" : 39
- }
- }
- }
+ > ./bin/trogdor.sh client showTask -t localhost:8889 -i produce0
+ Task produce0 of type org.apache.kafka.trogdor.workload.ProduceBenchSpec is DONE. FINISHED at 2019-01-09T20:38:22.039-08:00 after 6s
+
+To see the results, we use showTask with --show-status:
+ > ./bin/trogdor.sh client showTask -t localhost:8889 -i produce0 --show-status
+ Task produce0 of type org.apache.kafka.trogdor.workload.ProduceBenchSpec is DONE. FINISHED at 2019-01-09T20:38:22.039-08:00 after 6s
+ Status: {
+ "totalSent" : 50000,
+ "averageLatencyMs" : 17.83388,
+ "p50LatencyMs" : 12,
+ "p95LatencyMs" : 75,
+ "p99LatencyMs" : 96,
+ "transactionsCommitted" : 0
}
Trogdor Architecture
@@ -157,3 +122,15 @@ ProcessStopFault stops a process by sending it a SIGSTOP signal. When the fault
### NetworkPartitionFault
NetworkPartitionFault sets up an artificial network partition between one or more sets of nodes. Currently, this is implemented using iptables. The iptables rules are set up on the outbound traffic from the affected nodes. Therefore, the affected nodes should still be reachable from outside the cluster.
+
+Exec Mode
+========================================
+Sometimes, you just want to run a test quickly on a single node. In this case, you can use "exec mode." This mode allows you to run a single Trogdor Agent without a Coordinator.
+
+When using exec mode, you must pass in a Task specification. The Agent will try to start this task.
+
+For example:
+
+ > ./bin/trogdor.sh agent -n node0 -c ./config/trogdor.conf --exec ./tests/spec/simple_produce_bench.json
+
+When using exec mode, the Agent will exit once the task is complete.
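+
+The --exec argument can be either a path to a task spec file or the raw JSON spec itself; an argument that starts with an open brace is treated as an inline spec. As a rough illustration (this example uses the NoOpTaskSpec workload, which performs no real work, purely to show the spec format), something like the following should also work:
+
+ > ./bin/trogdor.sh agent -n node0 -c ./config/trogdor.conf --exec '{"class": "org.apache.kafka.trogdor.task.NoOpTaskSpec", "startMs": 0, "durationMs": 10000}'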
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 1dcb133ba3f..dd78ba44fe1 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -59,6 +59,9 @@
+
+
diff --git a/tests/.gitignore b/tests/.gitignore
index 402057a5d6e..5eb616422fa 100644
--- a/tests/.gitignore
+++ b/tests/.gitignore
@@ -1,12 +1,7 @@
Vagrantfile.local
-
.idea/
-
*.pyc
*.ipynb
-
.DS_Store
-
.ducktape
results/
-*.json
diff --git a/tests/bin/trogdor-run-consume-bench.sh b/tests/bin/trogdor-run-consume-bench.sh
deleted file mode 100755
index 0df396989c2..00000000000
--- a/tests/bin/trogdor-run-consume-bench.sh
+++ /dev/null
@@ -1,39 +0,0 @@
-#!/usr/bin/env bash
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-COORDINATOR_ENDPOINT="localhost:8889"
-TASK_ID="consume_bench_$RANDOM"
-TASK_SPEC=$(
-cat <
+ Set<String> nodes = controller.targetNodes(platform.topology());
+ if (!nodes.contains(platform.curNode().name())) {
+ out.println("This task is not configured to run on this node. It runs on node(s): " +
+ Utils.join(nodes, ", ") + ", whereas this node is " +
+ platform.curNode().name());
+ return false;
+ }
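+ // createWorker returns a future which is completed with an error string (possibly empty) once the worker reaches the DONE state.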
+ KafkaFuture<String> future = null;
+ try {
+ future = workerManager.createWorker(EXEC_WORKER_ID, EXEC_TASK_ID, spec);
+ } catch (Throwable e) {
+ out.println("createWorker failed");
+ e.printStackTrace(out);
+ return false;
+ }
+ out.println("Waiting for completion of task:" + JsonUtil.toPrettyJsonString(spec));
+ String error = future.get();
+ if (error == null || error.isEmpty()) {
+ out.println("Task succeeded with status " +
+ JsonUtil.toPrettyJsonString(workerManager.workerStates().get(EXEC_WORKER_ID).status()));
+ return true;
+ } else {
+ out.println("Task failed with status " +
+ JsonUtil.toPrettyJsonString(workerManager.workerStates().get(EXEC_WORKER_ID).status()) +
+ " and error " + error);
+ return false;
+ }
+ }
+
public static void main(String[] args) throws Exception {
ArgumentParser parser = ArgumentParsers
.newArgumentParser("trogdor-agent")
@@ -134,6 +220,12 @@ public final class Agent {
.dest("node_name")
.metavar("NODE_NAME")
.help("The name of this node.");
+ parser.addArgument("--exec", "-e")
+ .action(store())
+ .type(String.class)
+ .dest("task_spec")
+ .metavar("TASK_SPEC")
+ .help("Execute a single task spec and then exit. The argument is the task spec to load when starting up, or a path to it.");
Namespace res = null;
try {
res = parser.parseArgs(args);
@@ -148,6 +240,7 @@ public final class Agent {
}
String configPath = res.getString("config");
String nodeName = res.getString("node_name");
+ String taskSpec = res.getString("task_spec");
Platform platform = Platform.Config.parse(nodeName, configPath);
JsonRestServer restServer =
@@ -165,6 +258,18 @@ public final class Agent {
log.error("Got exception while running agent shutdown hook.", e);
}
}));
+ if (taskSpec != null) {
+ TaskSpec spec = null;
+ try {
+ spec = JsonUtil.objectFromCommandLineArgument(taskSpec, TaskSpec.class);
+ } catch (Exception e) {
+ System.out.println("Unable to parse the supplied task spec.");
+ e.printStackTrace();
+ Exit.exit(1);
+ }
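+ // Rebase the spec's start time onto the current time, so that a spec written with a startMs in the past is not treated as already expired.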
+ TaskSpec effectiveSpec = agent.rebaseTaskSpecTime(spec);
+ Exit.exit(agent.exec(effectiveSpec, System.out) ? 0 : 1);
+ }
agent.waitForShutdown();
}
};
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java
index 55c3e7b1896..56f53f3ba19 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java
@@ -20,11 +20,12 @@ package org.apache.kafka.trogdor.agent;
import com.fasterxml.jackson.core.type.TypeReference;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
-import net.sourceforge.argparse4j.inf.ArgumentParserException;
-import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
import net.sourceforge.argparse4j.inf.Namespace;
+import net.sourceforge.argparse4j.inf.Subparser;
+import net.sourceforge.argparse4j.inf.Subparsers;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.trogdor.common.JsonUtil;
+import org.apache.kafka.trogdor.common.StringFormatter;
import org.apache.kafka.trogdor.rest.AgentStatusResponse;
import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
import org.apache.kafka.trogdor.rest.DestroyWorkerRequest;
@@ -32,14 +33,25 @@ import org.apache.kafka.trogdor.rest.Empty;
import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse;
import org.apache.kafka.trogdor.rest.StopWorkerRequest;
+import org.apache.kafka.trogdor.rest.WorkerState;
+import org.apache.kafka.trogdor.task.TaskSpec;
import org.apache.kafka.trogdor.rest.UptimeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.core.UriBuilder;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
import static net.sourceforge.argparse4j.impl.Arguments.store;
import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
+import static org.apache.kafka.trogdor.common.StringFormatter.dateString;
+import static org.apache.kafka.trogdor.common.StringFormatter.durationString;
/**
* A client for the Trogdor agent.
@@ -158,95 +170,157 @@ public class AgentClient {
resp.body();
}
- public static void main(String[] args) throws Exception {
- ArgumentParser parser = ArgumentParsers
- .newArgumentParser("trogdor-agent-client")
- .defaultHelp(true)
- .description("The Trogdor fault injection agent client.");
- parser.addArgument("target")
+ private static void addTargetArgument(ArgumentParser parser) {
+ parser.addArgument("--target", "-t")
.action(store())
.required(true)
.type(String.class)
.dest("target")
.metavar("TARGET")
.help("A colon-separated host and port pair. For example, example.com:8888");
- MutuallyExclusiveGroup actions = parser.addMutuallyExclusiveGroup();
- actions.addArgument("--status")
- .action(storeTrue())
- .type(Boolean.class)
- .dest("status")
- .help("Get agent status.");
- actions.addArgument("--uptime")
- .action(storeTrue())
- .type(Boolean.class)
- .dest("uptime")
- .help("Get agent uptime.");
- actions.addArgument("--create-worker")
- .action(store())
- .type(String.class)
- .dest("create_worker")
- .metavar("SPEC_JSON")
- .help("Create a new fault.");
- actions.addArgument("--stop-worker")
- .action(store())
- .type(Long.class)
- .dest("stop_worker")
- .metavar("WORKER_ID")
- .help("Stop a worker ID.");
- actions.addArgument("--destroy-worker")
- .action(store())
- .type(Long.class)
- .dest("destroy_worker")
- .metavar("WORKER_ID")
- .help("Destroy a worker ID.");
- actions.addArgument("--shutdown")
- .action(storeTrue())
- .type(Boolean.class)
- .dest("shutdown")
- .help("Trigger agent shutdown");
+ }
- Namespace res = null;
- try {
- res = parser.parseArgs(args);
- } catch (ArgumentParserException e) {
- if (args.length == 0) {
- parser.printHelp();
- Exit.exit(0);
- } else {
- parser.handleError(e);
- Exit.exit(1);
- }
- }
+ private static void addJsonArgument(ArgumentParser parser) {
+ parser.addArgument("--json")
+ .action(storeTrue())
+ .dest("json")
+ .metavar("JSON")
+ .help("Show the full response as JSON.");
+ }
+
+ private static void addWorkerIdArgument(ArgumentParser parser, String help) {
+ parser.addArgument("--workerId")
+ .action(store())
+ .type(Long.class)
+ .dest("workerId")
+ .metavar("WORKER_ID")
+ .help(help);
+ }
+
+ public static void main(String[] args) throws Exception {
+ ArgumentParser rootParser = ArgumentParsers
+ .newArgumentParser("trogdor-agent-client")
+ .defaultHelp(true)
+ .description("The Trogdor agent client.");
+ Subparsers subParsers = rootParser.addSubparsers().
+ dest("command");
+ Subparser uptimeParser = subParsers.addParser("uptime")
+ .help("Get the agent uptime.");
+ addTargetArgument(uptimeParser);
+ addJsonArgument(uptimeParser);
+ Subparser statusParser = subParsers.addParser("status")
+ .help("Get the agent status.");
+ addTargetArgument(statusParser);
+ addJsonArgument(statusParser);
+ Subparser createWorkerParser = subParsers.addParser("createWorker")
+ .help("Create a new worker.");
+ addTargetArgument(createWorkerParser);
+ addWorkerIdArgument(createWorkerParser, "The worker ID to create.");
+ createWorkerParser.addArgument("--taskId")
+ .action(store())
+ .required(true)
+ .type(String.class)
+ .dest("taskId")
+ .metavar("TASK_ID")
+ .help("The task ID to create.");
+ createWorkerParser.addArgument("--spec", "-s")
+ .action(store())
+ .required(true)
+ .type(String.class)
+ .dest("taskSpec")
+ .metavar("TASK_SPEC")
+ .help("The task spec to create, or a path to a file containing the task spec.");
+ Subparser stopWorkerParser = subParsers.addParser("stopWorker")
+ .help("Stop a worker.");
+ addTargetArgument(stopWorkerParser);
+ addWorkerIdArgument(stopWorkerParser, "The worker ID to stop.");
+ Subparser destroyWorkerParser = subParsers.addParser("destroyWorker")
+ .help("Destroy a worker.");
+ addTargetArgument(destroyWorkerParser);
+ addWorkerIdArgument(destroyWorkerParser, "The worker ID to destroy.");
+ Subparser shutdownParser = subParsers.addParser("shutdown")
+ .help("Shut down the agent.");
+ addTargetArgument(shutdownParser);
+
+ Namespace res = rootParser.parseArgsOrFail(args);
String target = res.getString("target");
AgentClient client = new Builder().
maxTries(3).
target(target).
build();
- if (res.getBoolean("status")) {
- System.out.println("Got agent status: " +
- JsonUtil.toPrettyJsonString(client.status()));
- } else if (res.getBoolean("uptime")) {
- System.out.println("Got agent uptime: " +
- JsonUtil.toPrettyJsonString(client.uptime()));
- } else if (res.getString("create_worker") != null) {
- CreateWorkerRequest req = JsonUtil.JSON_SERDE.
- readValue(res.getString("create_worker"), CreateWorkerRequest.class);
- client.createWorker(req);
- System.out.printf("Sent CreateWorkerRequest for worker %d%n.", req.workerId());
- } else if (res.getString("stop_worker") != null) {
- long workerId = res.getLong("stop_worker");
- client.stopWorker(new StopWorkerRequest(workerId));
- System.out.printf("Sent StopWorkerRequest for worker %d%n.", workerId);
- } else if (res.getString("destroy_worker") != null) {
- long workerId = res.getLong("stop_worker");
- client.destroyWorker(new DestroyWorkerRequest(workerId));
- System.out.printf("Sent DestroyWorkerRequest for worker %d%n.", workerId);
- } else if (res.getBoolean("shutdown")) {
- client.invokeShutdown();
- System.out.println("Sent ShutdownRequest.");
- } else {
- System.out.println("You must choose an action. Type --help for help.");
- Exit.exit(1);
+ ZoneOffset localOffset = OffsetDateTime.now().getOffset();
+ switch (res.getString("command")) {
+ case "uptime": {
+ UptimeResponse uptime = client.uptime();
+ if (res.getBoolean("json")) {
+ System.out.println(JsonUtil.toJsonString(uptime));
+ } else {
+ System.out.printf("Agent is running at %s.%n", target);
+ System.out.printf("\tStart time: %s%n",
+ dateString(uptime.serverStartMs(), localOffset));
+ System.out.printf("\tCurrent server time: %s%n",
+ dateString(uptime.nowMs(), localOffset));
+ System.out.printf("\tUptime: %s%n",
+ durationString(uptime.nowMs() - uptime.serverStartMs()));
+ }
+ break;
+ }
+ case "status": {
+ AgentStatusResponse status = client.status();
+ if (res.getBoolean("json")) {
+ System.out.println(JsonUtil.toJsonString(status));
+ } else {
+ System.out.printf("Agent is running at %s.%n", target);
+ System.out.printf("\tStart time: %s%n",
+ dateString(status.serverStartMs(), localOffset));
+ List<List<String>> lines = new ArrayList<>();
+ List<String> header = new ArrayList<>(
+ Arrays.asList("WORKER_ID", "TASK_ID", "STATE", "TASK_TYPE"));
+ lines.add(header);
+ for (Map.Entry<Long, WorkerState> entry : status.workers().entrySet()) {
+ List<String> cols = new ArrayList<>();
+ cols.add(Long.toString(entry.getKey()));
+ cols.add(entry.getValue().taskId());
+ cols.add(entry.getValue().getClass().getSimpleName());
+ cols.add(entry.getValue().spec().getClass().getCanonicalName());
+ lines.add(cols);
+ }
+ System.out.print(StringFormatter.prettyPrintGrid(lines));
+ }
+ break;
+ }
+ case "createWorker": {
+ long workerId = res.getLong("workerId");
+ String taskId = res.getString("taskId");
+ TaskSpec taskSpec = JsonUtil.
+ objectFromCommandLineArgument(res.getString("taskSpec"), TaskSpec.class);
+ CreateWorkerRequest req =
+ new CreateWorkerRequest(workerId, taskId, taskSpec);
+ client.createWorker(req);
+ System.out.printf("Sent CreateWorkerRequest for worker %d%n.", req.workerId());
+ break;
+ }
+ case "stopWorker": {
+ long workerId = res.getLong("workerId");
+ client.stopWorker(new StopWorkerRequest(workerId));
+ System.out.printf("Sent StopWorkerRequest for worker %d%n.", workerId);
+ break;
+ }
+ case "destroyWorker": {
+ long workerId = res.getLong("workerId");
+ client.destroyWorker(new DestroyWorkerRequest(workerId));
+ System.out.printf("Sent DestroyWorkerRequest for worker %d%n.", workerId);
+ break;
+ }
+ case "shutdown": {
+ client.invokeShutdown();
+ System.out.println("Sent ShutdownRequest.");
+ break;
+ }
+ default: {
+ System.out.println("You must choose an action. Type --help for help.");
+ Exit.exit(1);
+ }
}
}
};
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
index bf3d293e417..ba0c3b5c1b4 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
@@ -230,6 +230,11 @@ public final class WorkerManager {
*/
private Future<Void> timeoutFuture = null;
+ /**
+ * A future which is completed when the task transitions to DONE state.
+ */
+ private KafkaFutureImpl<String> doneFuture = null;
+
/**
* A shutdown manager reference which will keep the WorkerManager
* alive for as long as this worker is alive.
@@ -300,6 +305,7 @@ public final class WorkerManager {
reference.close();
reference = null;
}
+ doneFuture.complete(error);
}
@Override
@@ -308,20 +314,21 @@ public final class WorkerManager {
}
}
- public void createWorker(long workerId, String taskId, TaskSpec spec) throws Throwable {
+ public KafkaFuture<String> createWorker(long workerId, String taskId, TaskSpec spec) throws Throwable {
try (ShutdownManager.Reference ref = shutdownManager.takeReference()) {
final Worker worker = stateChangeExecutor.
submit(new CreateWorker(workerId, taskId, spec, time.milliseconds())).get();
- if (worker == null) {
+ if (worker.doneFuture != null) {
log.info("{}: Ignoring request to create worker {}, because there is already " +
"a worker with that id.", nodeName, workerId);
- return;
+ return worker.doneFuture;
}
+ worker.doneFuture = new KafkaFutureImpl<>();
if (worker.spec.endMs() <= time.milliseconds()) {
log.info("{}: Will not run worker {} as it has expired.", nodeName, worker);
stateChangeExecutor.submit(new HandleWorkerHalting(worker,
"worker expired", true));
- return;
+ return worker.doneFuture;
}
KafkaFutureImpl<String> haltFuture = new KafkaFutureImpl<>();
haltFuture.thenApply((KafkaFuture.BaseFunction<String, Void>) errorString -> {
@@ -345,6 +352,7 @@ public final class WorkerManager {
"worker.start() exception: " + Utils.stackTrace(e), true));
}
stateChangeExecutor.submit(new FinishCreatingWorker(worker));
+ return worker.doneFuture;
} catch (ExecutionException e) {
if (e.getCause() instanceof RequestConflictException) {
log.info("{}: request conflict while creating worker {} for task {} with spec {}.",
@@ -385,7 +393,7 @@ public final class WorkerManager {
throw new RequestConflictException("There is already a worker ID " + workerId +
" with a different task spec.");
} else {
- return null;
+ return worker;
}
}
worker = new Worker(workerId, taskId, spec, now);
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/JsonUtil.java b/tools/src/main/java/org/apache/kafka/trogdor/common/JsonUtil.java
index ad90ffc6e84..b3915e466ca 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/common/JsonUtil.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/common/JsonUtil.java
@@ -18,12 +18,15 @@
package org.apache.kafka.trogdor.common;
import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
+import java.io.File;
+
/**
* Utilities for working with JSON.
*/
@@ -34,6 +37,7 @@ public class JsonUtil {
JSON_SERDE = new ObjectMapper();
JSON_SERDE.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
JSON_SERDE.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true);
+ JSON_SERDE.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
JSON_SERDE.registerModule(new Jdk8Module());
JSON_SERDE.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);
}
@@ -53,4 +57,41 @@ public class JsonUtil {
throw new RuntimeException(e);
}
}
+
+ /**
+ * Determine if a string is a JSON object literal.
+ * Object literals must begin with an open brace.
+ *
+ * @param input The input string.
+ * @return True if the string is a JSON literal.
+ */
+ static boolean openBraceComesFirst(String input) {
+ for (int i = 0; i < input.length(); i++) {
+ char c = input.charAt(i);
+ if (!Character.isWhitespace(c)) {
+ return c == '{';
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Read a JSON object from a command-line argument. This can take the form of a path to
+ * a file containing the JSON object, or simply the raw JSON object itself. We will assume
+ * that if the string is a valid JSON object, the latter is true. If you want to specify a
+ * file name containing an open brace, you can force it to be interpreted as a file name by
+ * prefixing a ./ or full path.
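+ * For example, both an inline object such as {"startMs": 0, "durationMs": 10000} and a path
+ * such as ./tests/spec/simple_produce_bench.json are accepted.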
+ *
+ * @param argument The command-line argument.
+ * @param clazz The class of the object to be read.
+ * @param <T> The object type.
+ * @return The object which we read.
+ */
+ public static <T> T objectFromCommandLineArgument(String argument, Class<T> clazz) throws Exception {
+ if (openBraceComesFirst(argument)) {
+ return JSON_SERDE.readValue(argument, clazz);
+ } else {
+ return JSON_SERDE.readValue(new File(argument), clazz);
+ }
+ }
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/StringFormatter.java b/tools/src/main/java/org/apache/kafka/trogdor/common/StringFormatter.java
new file mode 100644
index 00000000000..2e4a91c9a2d
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/common/StringFormatter.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.common;
+
+import java.time.Duration;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * Utilities for formatting strings.
+ */
+public class StringFormatter {
+ /**
+ * Pretty-print a date string.
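+ * For example, 1546981169850 ms at a UTC offset is formatted as "2019-01-08T20:59:29.85Z".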
+ *
+ * @param timeMs The time since the epoch in milliseconds.
+ * @param zoneOffset The time zone offset.
+ * @return The date string in ISO format.
+ */
+ public static String dateString(long timeMs, ZoneOffset zoneOffset) {
+ return new Date(timeMs).toInstant().
+ atOffset(zoneOffset).
+ format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
+ }
+
+ /**
+ * Pretty-print a duration.
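+ * For example, 7503000 ms is formatted as "2h5m3s".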
+ *
+ * @param periodMs The duration in milliseconds.
+ * @return A human-readable duration string.
+ */
+ public static String durationString(long periodMs) {
+ StringBuilder bld = new StringBuilder();
+ Duration duration = Duration.ofMillis(periodMs);
+ long hours = duration.toHours();
+ if (hours > 0) {
+ bld.append(hours).append("h");
+ duration = duration.minusHours(hours);
+ }
+ long minutes = duration.toMinutes();
+ if (minutes > 0) {
+ bld.append(minutes).append("m");
+ duration = duration.minusMinutes(minutes);
+ }
+ long seconds = duration.getSeconds();
+ if ((seconds != 0) || bld.toString().isEmpty()) {
+ bld.append(seconds).append("s");
+ }
+ return bld.toString();
+ }
+
+ /**
+ * Formats strings in a grid pattern.
+ *
+ * All entries in the same column will have the same width.
+ *
+ * @param lines A list of lines. Each line contains a list of columns.
+ * Each line must contain the same number of columns.
+ * @return The string.
+ */
+ public static String prettyPrintGrid(List<List<String>> lines) {
+ int numColumns = -1;
+ int rowIndex = 0;
+ for (List<String> col : lines) {
+ if (numColumns == -1) {
+ numColumns = col.size();
+ } else if (numColumns != col.size()) {
+ throw new RuntimeException("Expected " + numColumns + " columns in row " +
+ rowIndex + ", but got " + col.size());
+ }
+ rowIndex++;
+ }
+ List<Integer> widths = new ArrayList<>(numColumns);
+ for (int x = 0; x < numColumns; x++) {
+ int w = 0;
+ for (List<String> cols : lines) {
+ w = Math.max(w, cols.get(x).length() + 1);
+ }
+ widths.add(w);
+ }
+ StringBuilder bld = new StringBuilder();
+ for (int y = 0; y < lines.size(); y++) {
+ List<String> cols = lines.get(y);
+ for (int x = 0; x < cols.size(); x++) {
+ String val = cols.get(x);
+ int minWidth = widths.get(x);
+ bld.append(val);
+ for (int i = 0; i < minWidth - val.length(); i++) {
+ bld.append(" ");
+ }
+ }
+ bld.append(String.format("%n"));
+ }
+ return bld.toString();
+ }
+}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
index 3765b3b8882..476c32d8b84 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
@@ -20,11 +20,13 @@ package org.apache.kafka.trogdor.coordinator;
import com.fasterxml.jackson.core.type.TypeReference;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
-import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
import net.sourceforge.argparse4j.inf.Namespace;
+import net.sourceforge.argparse4j.inf.Subparser;
+import net.sourceforge.argparse4j.inf.Subparsers;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.trogdor.common.JsonUtil;
+import org.apache.kafka.trogdor.common.StringFormatter;
import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
import org.apache.kafka.trogdor.rest.CreateTaskRequest;
import org.apache.kafka.trogdor.rest.DestroyTaskRequest;
@@ -32,10 +34,16 @@ import org.apache.kafka.trogdor.rest.Empty;
import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse;
import org.apache.kafka.trogdor.rest.StopTaskRequest;
+import org.apache.kafka.trogdor.rest.TaskDone;
+import org.apache.kafka.trogdor.rest.TaskPending;
import org.apache.kafka.trogdor.rest.TaskRequest;
+import org.apache.kafka.trogdor.rest.TaskRunning;
+import org.apache.kafka.trogdor.rest.TaskStateType;
+import org.apache.kafka.trogdor.rest.TaskStopping;
import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TaskState;
import org.apache.kafka.trogdor.rest.TasksResponse;
+import org.apache.kafka.trogdor.task.TaskSpec;
import org.apache.kafka.trogdor.rest.UptimeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,10 +51,22 @@ import org.slf4j.LoggerFactory;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.core.UriBuilder;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import java.util.TreeMap;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+import static net.sourceforge.argparse4j.impl.Arguments.append;
import static net.sourceforge.argparse4j.impl.Arguments.store;
import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
+import static org.apache.kafka.trogdor.common.StringFormatter.dateString;
+import static org.apache.kafka.trogdor.common.StringFormatter.durationString;
/**
* A client for the Trogdor coordinator.
@@ -158,6 +178,9 @@ public class CoordinatorClient {
uriBuilder.queryParam("lastStartMs", request.lastStartMs());
uriBuilder.queryParam("firstEndMs", request.firstEndMs());
uriBuilder.queryParam("lastEndMs", request.lastEndMs());
+ if (request.state().isPresent()) {
+ uriBuilder.queryParam("state", request.state().get().toString());
+ }
HttpResponse<TasksResponse> resp =
JsonRestServer.<TasksResponse>httpRequest(log, uriBuilder.build().toString(), "GET",
null, new TypeReference<TasksResponse>() { }, maxTries);
@@ -178,119 +201,309 @@ public class CoordinatorClient {
resp.body();
}
- public static void main(String[] args) throws Exception {
- ArgumentParser parser = ArgumentParsers
- .newArgumentParser("trogdor-coordinator-client")
- .defaultHelp(true)
- .description("The Trogdor fault injection coordinator client.");
- parser.addArgument("target")
+ private static void addTargetArgument(ArgumentParser parser) {
+ parser.addArgument("--target", "-t")
.action(store())
.required(true)
.type(String.class)
.dest("target")
.metavar("TARGET")
.help("A colon-separated host and port pair. For example, example.com:8889");
- MutuallyExclusiveGroup actions = parser.addMutuallyExclusiveGroup();
- actions.addArgument("--status")
- .action(storeTrue())
- .type(Boolean.class)
- .dest("status")
- .help("Get coordinator status.");
- actions.addArgument("--uptime")
- .action(storeTrue())
- .type(Boolean.class)
- .dest("uptime")
- .help("Get coordinator uptime.");
- actions.addArgument("--show-tasks")
- .action(storeTrue())
- .type(Boolean.class)
- .dest("show_tasks")
- .help("Show coordinator tasks.");
- actions.addArgument("--show-task")
- .action(store())
- .type(String.class)
- .dest("show_task")
- .metavar("TASK_ID")
- .help("Show a specific coordinator task.");
- actions.addArgument("--create-task")
- .action(store())
- .type(String.class)
- .dest("create_task")
- .metavar("TASK_SPEC_JSON")
- .help("Create a new task from a task spec.");
- actions.addArgument("--stop-task")
- .action(store())
- .type(String.class)
- .dest("stop_task")
- .metavar("TASK_ID")
- .help("Stop a task.");
- actions.addArgument("--destroy-task")
- .action(store())
- .type(String.class)
- .dest("destroy_task")
- .metavar("TASK_ID")
- .help("Destroy a task.");
- actions.addArgument("--shutdown")
- .action(storeTrue())
- .type(Boolean.class)
- .dest("shutdown")
- .help("Trigger coordinator shutdown");
+ }
- Namespace res = null;
- try {
- res = parser.parseArgs(args);
- } catch (ArgumentParserException e) {
- if (args.length == 0) {
- parser.printHelp();
- Exit.exit(0);
- } else {
- parser.handleError(e);
- Exit.exit(1);
- }
- }
+ private static void addJsonArgument(ArgumentParser parser) {
+ parser.addArgument("--json")
+ .action(storeTrue())
+ .dest("json")
+ .metavar("JSON")
+ .help("Show the full response as JSON.");
+ }
+
+ public static void main(String[] args) throws Exception {
+ ArgumentParser rootParser = ArgumentParsers
+ .newArgumentParser("trogdor-coordinator-client")
+ .description("The Trogdor coordinator client.");
+ Subparsers subParsers = rootParser.addSubparsers().
+ dest("command");
+ Subparser uptimeParser = subParsers.addParser("uptime")
+ .help("Get the coordinator uptime.");
+ addTargetArgument(uptimeParser);
+ addJsonArgument(uptimeParser);
+ Subparser statusParser = subParsers.addParser("status")
+ .help("Get the coordinator status.");
+ addTargetArgument(statusParser);
+ addJsonArgument(statusParser);
+ Subparser showTaskParser = subParsers.addParser("showTask")
+ .help("Show a coordinator task.");
+ addTargetArgument(showTaskParser);
+ addJsonArgument(showTaskParser);
+ showTaskParser.addArgument("--id", "-i")
+ .action(store())
+ .required(true)
+ .type(String.class)
+ .dest("taskId")
+ .metavar("TASK_ID")
+ .help("The task ID to show.");
+ showTaskParser.addArgument("--verbose", "-v")
+ .action(storeTrue())
+ .dest("verbose")
+ .metavar("VERBOSE")
+ .help("Print out everything.");
+ showTaskParser.addArgument("--show-status", "-S")
+ .action(storeTrue())
+ .dest("showStatus")
+ .metavar("SHOW_STATUS")
+ .help("Show the task status.");
+ Subparser showTasksParser = subParsers.addParser("showTasks")
+ .help("Show many coordinator tasks. By default, all tasks are shown, but " +
+ "command-line options can be specified as filters.");
+ addTargetArgument(showTasksParser);
+ addJsonArgument(showTasksParser);
+ MutuallyExclusiveGroup idGroup = showTasksParser.addMutuallyExclusiveGroup();
+ idGroup.addArgument("--id", "-i")
+ .action(append())
+ .type(String.class)
+ .dest("taskIds")
+ .metavar("TASK_IDS")
+ .help("Show only this task ID. This option may be specified multiple times.");
+ idGroup.addArgument("--id-pattern")
+ .action(store())
+ .type(String.class)
+ .dest("taskIdPattern")
+ .metavar("TASK_ID_PATTERN")
+ .help("Only display tasks which match the given ID pattern.");
+ showTasksParser.addArgument("--state", "-s")
+ .type(TaskStateType.class)
+ .dest("taskStateType")
+ .metavar("TASK_STATE_TYPE")
+ .help("Show only tasks in this state.");
+ Subparser createTaskParser = subParsers.addParser("createTask")
+ .help("Create a new task.");
+ addTargetArgument(createTaskParser);
+ createTaskParser.addArgument("--id", "-i")
+ .action(store())
+ .required(true)
+ .type(String.class)
+ .dest("taskId")
+ .metavar("TASK_ID")
+ .help("The task ID to create.");
+ createTaskParser.addArgument("--spec", "-s")
+ .action(store())
+ .required(true)
+ .type(String.class)
+ .dest("taskSpec")
+ .metavar("TASK_SPEC")
+ .help("The task spec to create, or a path to a file containing the task spec.");
+ Subparser stopTaskParser = subParsers.addParser("stopTask")
+ .help("Stop a task.");
+ addTargetArgument(stopTaskParser);
+ stopTaskParser.addArgument("--id", "-i")
+ .action(store())
+ .required(true)
+ .type(String.class)
+ .dest("taskId")
+ .metavar("TASK_ID")
+ .help("The task ID to create.");
+ Subparser destroyTaskParser = subParsers.addParser("destroyTask")
+ .help("Destroy a task.");
+ addTargetArgument(destroyTaskParser);
+ destroyTaskParser.addArgument("--id", "-i")
+ .action(store())
+ .required(true)
+ .type(String.class)
+ .dest("taskId")
+ .metavar("TASK_ID")
+ .help("The task ID to destroy.");
+ Subparser shutdownParser = subParsers.addParser("shutdown")
+ .help("Shut down the coordinator.");
+ addTargetArgument(shutdownParser);
+
+ Namespace res = rootParser.parseArgsOrFail(args);
String target = res.getString("target");
CoordinatorClient client = new Builder().
maxTries(3).
target(target).
build();
- if (res.getBoolean("status")) {
- System.out.println("Got coordinator status: " +
- JsonUtil.toPrettyJsonString(client.status()));
- } else if (res.getBoolean("uptime")) {
- System.out.println("Got coordinator uptime: " +
- JsonUtil.toPrettyJsonString(client.uptime()));
- } else if (res.getBoolean("show_tasks")) {
- System.out.println("Got coordinator tasks: " +
- JsonUtil.toPrettyJsonString(client.tasks(
- new TasksRequest(null, 0, 0, 0, 0, Optional.empty()))));
- } else if (res.getString("show_task") != null) {
- String taskId = res.getString("show_task");
- TaskRequest req = new TaskRequest(res.getString("show_task"));
- try {
- String taskOutput = String.format("Got coordinator task \"%s\": %s", taskId, JsonUtil.toPrettyJsonString(client.task(req)));
- System.out.println(taskOutput);
- } catch (NotFoundException e) {
- System.out.println(e.getMessage());
+ ZoneOffset localOffset = OffsetDateTime.now().getOffset();
+ switch (res.getString("command")) {
+ case "uptime": {
+ UptimeResponse uptime = client.uptime();
+ if (res.getBoolean("json")) {
+ System.out.println(JsonUtil.toJsonString(uptime));
+ } else {
+ System.out.printf("Coordinator is running at %s.%n", target);
+ System.out.printf("\tStart time: %s%n",
+ dateString(uptime.serverStartMs(), localOffset));
+ System.out.printf("\tCurrent server time: %s%n",
+ dateString(uptime.nowMs(), localOffset));
+ System.out.printf("\tUptime: %s%n",
+ durationString(uptime.nowMs() - uptime.serverStartMs()));
+ }
+ break;
+ }
+ case "status": {
+ CoordinatorStatusResponse response = client.status();
+ if (res.getBoolean("json")) {
+ System.out.println(JsonUtil.toJsonString(response));
+ } else {
+ System.out.printf("Coordinator is running at %s.%n", target);
+ System.out.printf("\tStart time: %s%n", dateString(response.serverStartMs(), localOffset));
+ }
+ break;
+ }
+ case "showTask": {
+ String taskId = res.getString("taskId");
+ TaskRequest req = new TaskRequest(taskId);
+ TaskState taskState = null;
+ try {
+ taskState = client.task(req);
+ } catch (NotFoundException e) {
+ System.out.printf("Task %s was not found.%n", taskId);
+ Exit.exit(1);
+ }
+ if (res.getBoolean("json")) {
+ System.out.println(JsonUtil.toJsonString(taskState));
+ } else {
+ System.out.printf("Task %s of type %s is %s. %s%n", taskId,
+ taskState.spec().getClass().getCanonicalName(),
+ taskState.stateType(), prettyPrintTaskInfo(taskState, localOffset));
+ if (taskState instanceof TaskDone) {
+ TaskDone taskDone = (TaskDone) taskState;
+ if ((taskDone.error() != null) && (!taskDone.error().isEmpty())) {
+ System.out.printf("Error: %s%n", taskDone.error());
+ }
+ }
+ if (res.getBoolean("verbose")) {
+ System.out.printf("Spec: %s%n%n", JsonUtil.toPrettyJsonString(taskState.spec()));
+ }
+ if (res.getBoolean("verbose") || res.getBoolean("showStatus")) {
+ System.out.printf("Status: %s%n%n", JsonUtil.toPrettyJsonString(taskState.status()));
+ }
+ }
+ break;
+ }
+ case "showTasks": {
+ TaskStateType taskStateType = res.get("taskStateType");
+ List<String> taskIds = new ArrayList<>();
+ Pattern taskIdPattern = null;
+ if (res.getList("taskIds") != null) {
+ for (Object taskId : res.getList("taskIds")) {
+ taskIds.add((String) taskId);
+ }
+ } else if (res.getString("taskIdPattern") != null) {
+ try {
+ taskIdPattern = Pattern.compile(res.getString("taskIdPattern"));
+ } catch (PatternSyntaxException e) {
+ System.out.println("Invalid task ID regular expression " + res.getString("taskIdPattern"));
+ e.printStackTrace();
+ Exit.exit(1);
+ }
+ }
+ TasksRequest req = new TasksRequest(taskIds, 0, 0, 0, 0,
+ Optional.ofNullable(taskStateType));
+ TasksResponse response = client.tasks(req);
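+ // TasksRequest does not support filtering by ID pattern, so apply the regular expression filter on the client side.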
+ if (taskIdPattern != null) {
+ TreeMap<String, TaskState> filteredTasks = new TreeMap<>();
+ for (Map.Entry<String, TaskState> entry : response.tasks().entrySet()) {
+ if (taskIdPattern.matcher(entry.getKey()).matches()) {
+ filteredTasks.put(entry.getKey(), entry.getValue());
+ }
+ }
+ response = new TasksResponse(filteredTasks);
+ }
+ if (res.getBoolean("json")) {
+ System.out.println(JsonUtil.toJsonString(response));
+ } else {
+ System.out.println(prettyPrintTasksResponse(response, localOffset));
+ }
+ if (response.tasks().isEmpty()) {
+ Exit.exit(1);
+ }
+ break;
+ }
+ case "createTask": {
+ String taskId = res.getString("taskId");
+ TaskSpec taskSpec = JsonUtil.
+ objectFromCommandLineArgument(res.getString("taskSpec"), TaskSpec.class);
+ CreateTaskRequest req = new CreateTaskRequest(taskId, taskSpec);
+ client.createTask(req);
+ System.out.printf("Sent CreateTaskRequest for task %s.%n", req.id());
+ break;
+ }
+ case "stopTask": {
+ String taskId = res.getString("taskId");
+ StopTaskRequest req = new StopTaskRequest(taskId);
+ client.stopTask(req);
+ System.out.printf("Sent StopTaskRequest for task %s.%n", taskId);
+ break;
+ }
+ case "destroyTask": {
+ String taskId = res.getString("taskId");
+ DestroyTaskRequest req = new DestroyTaskRequest(taskId);
+ client.destroyTask(req);
+ System.out.printf("Sent DestroyTaskRequest for task %s.%n", taskId);
+ break;
+ }
+ case "shutdown": {
+ client.shutdown();
+ System.out.println("Sent ShutdownRequest.");
+ break;
+ }
+ default: {
+ System.out.println("You must choose an action. Type --help for help.");
+ Exit.exit(1);
}
- } else if (res.getString("create_task") != null) {
- CreateTaskRequest req = JsonUtil.JSON_SERDE.
- readValue(res.getString("create_task"), CreateTaskRequest.class);
- client.createTask(req);
- System.out.printf("Sent CreateTaskRequest for task %s.", req.id());
- } else if (res.getString("stop_task") != null) {
- String taskId = res.getString("stop_task");
- client.stopTask(new StopTaskRequest(taskId));
- System.out.printf("Sent StopTaskRequest for task %s.%n", taskId);
- } else if (res.getString("destroy_task") != null) {
- String taskId = res.getString("destroy_task");
- client.destroyTask(new DestroyTaskRequest(taskId));
- System.out.printf("Sent DestroyTaskRequest for task %s.%n", taskId);
- } else if (res.getBoolean("shutdown")) {
- client.shutdown();
- System.out.println("Sent ShutdownRequest.");
- } else {
- System.out.println("You must choose an action. Type --help for help.");
- Exit.exit(1);
}
}
-};
+
+ static String prettyPrintTasksResponse(TasksResponse response, ZoneOffset zoneOffset) {
+ if (response.tasks().isEmpty()) {
+ return "No matching tasks found.";
+ }
+ List<List<String>> lines = new ArrayList<>();
+ List<String> header = new ArrayList<>(
+ Arrays.asList("ID", "TYPE", "STATE", "INFO"));
+ lines.add(header);
+ for (Map.Entry<String, TaskState> entry : response.tasks().entrySet()) {
+ String taskId = entry.getKey();
+ TaskState taskState = entry.getValue();
+ List<String> cols = new ArrayList<>();
+ cols.add(taskId);
+ cols.add(taskState.spec().getClass().getCanonicalName());
+ cols.add(taskState.stateType().toString());
+ cols.add(prettyPrintTaskInfo(taskState, zoneOffset));
+ lines.add(cols);
+ }
+ return StringFormatter.prettyPrintGrid(lines);
+ }
+
+ static String prettyPrintTaskInfo(TaskState taskState, ZoneOffset zoneOffset) {
+ if (taskState instanceof TaskPending) {
+ return "Will start at " + dateString(taskState.spec().startMs(), zoneOffset);
+ } else if (taskState instanceof TaskRunning) {
+ TaskRunning runState = (TaskRunning) taskState;
+ return "Started " + dateString(runState.startedMs(), zoneOffset) +
+ "; will stop after " + durationString(taskState.spec().durationMs());
+ } else if (taskState instanceof TaskStopping) {
+ TaskStopping stoppingState = (TaskStopping) taskState;
+ return "Started " + dateString(stoppingState.startedMs(), zoneOffset);
+ } else if (taskState instanceof TaskDone) {
+ TaskDone doneState = (TaskDone) taskState;
+ String status = null;
+ if (doneState.error() == null || doneState.error().isEmpty()) {
+ if (doneState.cancelled()) {
+ status = "CANCELLED";
+ } else {
+ status = "FINISHED";
+ }
+ } else {
+ status = "FAILED";
+ }
+ return String.format("%s at %s after %s", status,
+ dateString(doneState.doneMs(), zoneOffset),
+ durationString(doneState.doneMs() - doneState.startedMs()));
+ } else {
+ throw new RuntimeException("Unknown task state type " + taskState.stateType());
+ }
+ }
+}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java
index e8d6003bede..6e9761bf5e5 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java
@@ -78,4 +78,9 @@ public class TaskDone extends TaskState {
public boolean cancelled() {
return cancelled;
}
+
+ @Override
+ public TaskStateType stateType() {
+ return TaskStateType.DONE;
+ }
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java
index 7831301425c..ca1d314a53a 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java
@@ -30,4 +30,9 @@ public class TaskPending extends TaskState {
public TaskPending(@JsonProperty("spec") TaskSpec spec) {
super(spec, NullNode.instance);
}
+
+ @Override
+ public TaskStateType stateType() {
+ return TaskStateType.PENDING;
+ }
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java
index 7a81bdf7867..8487bc39d16 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java
@@ -43,4 +43,9 @@ public class TaskRunning extends TaskState {
public long startedMs() {
return startedMs;
}
+
+ @Override
+ public TaskStateType stateType() {
+ return TaskStateType.RUNNING;
+ }
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java
index 24288934b8c..b47836ed23e 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java
@@ -55,4 +55,6 @@ public abstract class TaskState extends Message {
public JsonNode status() {
return status;
}
+
+ public abstract TaskStateType stateType();
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java
index d40b43c485c..2b2c4c4ec0b 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java
@@ -43,4 +43,9 @@ public class TaskStopping extends TaskState {
public long startedMs() {
return startedMs;
}
+
+ @Override
+ public TaskStateType stateType() {
+ return TaskStateType.STOPPING;
+ }
}
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
index 425fe65ab20..34126685d12 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
@@ -28,7 +28,9 @@ import org.apache.kafka.trogdor.basic.BasicPlatform;
import org.apache.kafka.trogdor.basic.BasicTopology;
import org.apache.kafka.trogdor.common.ExpectedTasks;
import org.apache.kafka.trogdor.common.ExpectedTasks.ExpectedTaskBuilder;
+import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Node;
+import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.fault.FilesUnreadableFaultSpec;
import org.apache.kafka.trogdor.fault.Kibosh;
import org.apache.kafka.trogdor.fault.Kibosh.KiboshControlFile;
@@ -45,13 +47,17 @@ import org.apache.kafka.trogdor.rest.WorkerDone;
import org.apache.kafka.trogdor.rest.WorkerRunning;
import org.apache.kafka.trogdor.task.NoOpTaskSpec;
import org.apache.kafka.trogdor.task.SampleTaskSpec;
+import org.apache.kafka.trogdor.task.TaskSpec;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
@@ -69,6 +75,7 @@ public class AgentTest {
private static BasicPlatform createBasicPlatform(Scheduler scheduler) {
TreeMap<String, Node> nodes = new TreeMap<>();
HashMap<String, String> config = new HashMap<>();
+ config.put(Platform.Config.TROGDOR_AGENT_PORT, Integer.toString(Agent.DEFAULT_PORT));
nodes.put("node01", new BasicNode("node01", "localhost",
config, Collections.emptySet()));
BasicTopology topology = new BasicTopology(nodes);
@@ -447,4 +454,42 @@ public class AgentTest {
agent.beginShutdown();
agent.waitForShutdown();
}
+
+ static void testExec(Agent agent, String expected, boolean expectedReturn, TaskSpec spec) throws Exception {
+ ByteArrayOutputStream b = new ByteArrayOutputStream();
+ PrintStream p = new PrintStream(b, true, StandardCharsets.UTF_8.toString());
+ boolean actualReturn = agent.exec(spec, p);
+ assertEquals(expected, b.toString());
+ assertEquals(expectedReturn, actualReturn);
+ }
+
+ @Test
+ public void testAgentExecWithTimeout() throws Exception {
+ Agent agent = createAgent(Scheduler.SYSTEM);
+ NoOpTaskSpec spec = new NoOpTaskSpec(0, 1);
+ TaskSpec rebasedSpec = agent.rebaseTaskSpecTime(spec);
+ testExec(agent,
+ String.format("Waiting for completion of task:%s%n",
+ JsonUtil.toPrettyJsonString(rebasedSpec)) +
+ String.format("Task failed with status null and error worker expired%n"),
+ false, rebasedSpec);
+ agent.beginShutdown();
+ agent.waitForShutdown();
+ }
+
+ @Test
+ public void testAgentExecWithNormalExit() throws Exception {
+ Agent agent = createAgent(Scheduler.SYSTEM);
+ SampleTaskSpec spec = new SampleTaskSpec(0, 120000,
+ Collections.singletonMap("node01", 1L), "");
+ TaskSpec rebasedSpec = agent.rebaseTaskSpecTime(spec);
+ testExec(agent,
+ String.format("Waiting for completion of task:%s%n",
+ JsonUtil.toPrettyJsonString(rebasedSpec)) +
+ String.format("Task succeeded with status \"halted\"%n"),
+ true, rebasedSpec);
+ agent.beginShutdown();
+ agent.waitForShutdown();
+ }
+
};
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonUtilTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonUtilTest.java
new file mode 100644
index 00000000000..5acf88eab20
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonUtilTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.common;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class JsonUtilTest {
+ private static final Logger log = LoggerFactory.getLogger(JsonUtilTest.class);
+
+ @Rule
+ final public Timeout globalTimeout = Timeout.millis(120000);
+
+ @Test
+ public void testOpenBraceComesFirst() {
+ assertTrue(JsonUtil.openBraceComesFirst("{}"));
+ assertTrue(JsonUtil.openBraceComesFirst(" \t{\"foo\":\"bar\"}"));
+ assertTrue(JsonUtil.openBraceComesFirst(" { \"foo\": \"bar\" }"));
+ assertFalse(JsonUtil.openBraceComesFirst("/my/file/path"));
+ assertFalse(JsonUtil.openBraceComesFirst("mypath"));
+ assertFalse(JsonUtil.openBraceComesFirst(" blah{}"));
+ }
+
+ static final class Foo {
+ @JsonProperty
+ final int bar;
+
+ @JsonCreator
+ Foo(@JsonProperty("bar") int bar) {
+ this.bar = bar;
+ }
+ }
+
+ @Test
+ public void testObjectFromCommandLineArgument() throws Exception {
+ assertEquals(123, JsonUtil.
+ objectFromCommandLineArgument("{\"bar\":123}", Foo.class).bar);
+ assertEquals(1, JsonUtil.
+ objectFromCommandLineArgument(" {\"bar\": 1} ", Foo.class).bar);
+ File tempFile = TestUtils.tempFile();
+ try {
+ Files.write(tempFile.toPath(),
+ "{\"bar\": 456}".getBytes(StandardCharsets.UTF_8));
+ assertEquals(456, JsonUtil.
+ objectFromCommandLineArgument(tempFile.getAbsolutePath(), Foo.class).bar);
+ } finally {
+ Files.delete(tempFile.toPath());
+ }
+ }
+};
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/StringFormatterTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/StringFormatterTest.java
new file mode 100644
index 00000000000..a259e8479fd
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/StringFormatterTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.common;
+
+import org.junit.Rule;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.junit.Test;
+
+import java.time.ZoneOffset;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+
+import static org.apache.kafka.trogdor.common.StringFormatter.durationString;
+import static org.apache.kafka.trogdor.common.StringFormatter.dateString;
+
+public class StringFormatterTest {
+ private static final Logger log = LoggerFactory.getLogger(StringFormatterTest.class);
+
+ @Rule
+ final public Timeout globalTimeout = Timeout.millis(120000);
+
+ @Test
+ public void testDateString() {
+ assertEquals("2019-01-08T20:59:29.85Z",
+ dateString(1546981169850L, ZoneOffset.UTC));
+ }
+
+ @Test
+ public void testDurationString() {
+ assertEquals("1m", durationString(60000));
+ assertEquals("1m1s", durationString(61000));
+ assertEquals("1m1s", durationString(61200));
+ assertEquals("5s", durationString(5000));
+ assertEquals("2h", durationString(7200000));
+ assertEquals("2h1s", durationString(7201000));
+ assertEquals("2h5m3s", durationString(7503000));
+ }
+
+ @Test
+ public void testPrettyPrintGrid() {
+ assertEquals(String.format(
+ "ANIMAL NUMBER INDEX %n" +
+ "lion 1 12345 %n" +
+ "manatee 50 1 %n"),
+ StringFormatter.prettyPrintGrid(
+ Arrays.asList(Arrays.asList("ANIMAL", "NUMBER", "INDEX"),
+ Arrays.asList("lion", "1", "12345"),
+ Arrays.asList("manatee", "50", "1"))));
+ }
+};
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorClientTest.java b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorClientTest.java
new file mode 100644
index 00000000000..f5c0c6574e6
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorClientTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.coordinator;
+
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import org.apache.kafka.trogdor.rest.TaskDone;
+import org.apache.kafka.trogdor.rest.TaskPending;
+import org.apache.kafka.trogdor.rest.TaskRunning;
+import org.apache.kafka.trogdor.rest.TaskStopping;
+import org.apache.kafka.trogdor.task.NoOpTaskSpec;
+import org.junit.Rule;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.junit.Test;
+
+import java.time.ZoneOffset;
+
+import static org.junit.Assert.assertEquals;
+
+public class CoordinatorClientTest {
+ private static final Logger log = LoggerFactory.getLogger(CoordinatorClientTest.class);
+
+ @Rule
+ final public Timeout globalTimeout = Timeout.millis(120000);
+
+ @Test
+ public void testPrettyPrintTaskInfo() {
+ assertEquals("Will start at 2019-01-08T07:05:59.85Z",
+ CoordinatorClient.prettyPrintTaskInfo(
+ new TaskPending(new NoOpTaskSpec(1546931159850L, 9000)),
+ ZoneOffset.UTC));
+ assertEquals("Started 2009-07-07T01:45:59.85Z; will stop after 9s",
+ CoordinatorClient.prettyPrintTaskInfo(
+ new TaskRunning(new NoOpTaskSpec(1146931159850L, 9000),
+ 1246931159850L,
+ JsonNodeFactory.instance.objectNode()), ZoneOffset.UTC));
+ assertEquals("Started 2009-07-07T01:45:59.85Z",
+ CoordinatorClient.prettyPrintTaskInfo(
+ new TaskStopping(new NoOpTaskSpec(1146931159850L, 9000),
+ 1246931159850L,
+ JsonNodeFactory.instance.objectNode()), ZoneOffset.UTC));
+ assertEquals("FINISHED at 2019-01-08T20:59:29.85Z after 10s",
+ CoordinatorClient.prettyPrintTaskInfo(
+ new TaskDone(new NoOpTaskSpec(0, 1000),
+ 1546981159850L,
+ 1546981169850L,
+ "",
+ false,
+ JsonNodeFactory.instance.objectNode()), ZoneOffset.UTC));
+ assertEquals("CANCELLED at 2019-01-08T20:59:29.85Z after 10s",
+ CoordinatorClient.prettyPrintTaskInfo(
+ new TaskDone(new NoOpTaskSpec(0, 1000),
+ 1546981159850L,
+ 1546981169850L,
+ "",
+ true,
+ JsonNodeFactory.instance.objectNode()), ZoneOffset.UTC));
+ assertEquals("FAILED at 2019-01-08T20:59:29.85Z after 10s",
+ CoordinatorClient.prettyPrintTaskInfo(
+ new TaskDone(new NoOpTaskSpec(0, 1000),
+ 1546981159850L,
+ 1546981169850L,
+ "foobar",
+ true,
+ JsonNodeFactory.instance.objectNode()), ZoneOffset.UTC));
+ }
+};