KAFKA-7564: Expose single task details in Trogdor (#5852)

This commit adds a new "/coordinator/tasks/{taskId}" endpoint which fetches details for a single task.
This commit is contained in:
Stanislav Kozlovski 2018-11-09 18:31:04 +00:00 committed by Colin Patrick McCabe
parent 895c83f88d
commit ecb71cf471
7 changed files with 152 additions and 1 deletions

View File

@ -30,7 +30,9 @@ import org.apache.kafka.trogdor.rest.CreateTaskRequest;
import org.apache.kafka.trogdor.rest.DestroyTaskRequest; import org.apache.kafka.trogdor.rest.DestroyTaskRequest;
import org.apache.kafka.trogdor.rest.JsonRestServer; import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.apache.kafka.trogdor.rest.StopTaskRequest; import org.apache.kafka.trogdor.rest.StopTaskRequest;
import org.apache.kafka.trogdor.rest.TaskRequest;
import org.apache.kafka.trogdor.rest.TasksRequest; import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TaskState;
import org.apache.kafka.trogdor.rest.TasksResponse; import org.apache.kafka.trogdor.rest.TasksResponse;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -104,6 +106,10 @@ public final class Coordinator {
return taskManager.tasks(request); return taskManager.tasks(request);
} }
public TaskState task(TaskRequest request) throws Exception {
return taskManager.task(request);
}
public void beginShutdown(boolean stopAgents) throws Exception { public void beginShutdown(boolean stopAgents) throws Exception {
restServer.beginShutdown(); restServer.beginShutdown();
taskManager.beginShutdown(stopAgents); taskManager.beginShutdown(stopAgents);

View File

@ -32,11 +32,14 @@ import org.apache.kafka.trogdor.rest.Empty;
import org.apache.kafka.trogdor.rest.JsonRestServer; import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse; import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse;
import org.apache.kafka.trogdor.rest.StopTaskRequest; import org.apache.kafka.trogdor.rest.StopTaskRequest;
import org.apache.kafka.trogdor.rest.TaskRequest;
import org.apache.kafka.trogdor.rest.TasksRequest; import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TaskState;
import org.apache.kafka.trogdor.rest.TasksResponse; import org.apache.kafka.trogdor.rest.TasksResponse;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.core.UriBuilder; import javax.ws.rs.core.UriBuilder;
import static net.sourceforge.argparse4j.impl.Arguments.store; import static net.sourceforge.argparse4j.impl.Arguments.store;
@ -151,6 +154,13 @@ public class CoordinatorClient {
return resp.body(); return resp.body();
} }
public TaskState task(TaskRequest request) throws Exception {
String uri = UriBuilder.fromPath(url("/coordinator/tasks/{taskId}")).build(request.taskId()).toString();
HttpResponse<TaskState> resp = JsonRestServer.httpRequest(log, uri, "GET",
null, new TypeReference<TaskState>() { }, maxTries);
return resp.body();
}
public void shutdown() throws Exception { public void shutdown() throws Exception {
HttpResponse<Empty> resp = HttpResponse<Empty> resp =
JsonRestServer.httpRequest(log, url("/coordinator/shutdown"), "PUT", JsonRestServer.httpRequest(log, url("/coordinator/shutdown"), "PUT",
@ -181,6 +191,12 @@ public class CoordinatorClient {
.type(Boolean.class) .type(Boolean.class)
.dest("show_tasks") .dest("show_tasks")
.help("Show coordinator tasks."); .help("Show coordinator tasks.");
actions.addArgument("--show-task")
.action(store())
.type(String.class)
.dest("show_task")
.metavar("TASK_ID")
.help("Show a specific coordinator task.");
actions.addArgument("--create-task") actions.addArgument("--create-task")
.action(store()) .action(store())
.type(String.class) .type(String.class)
@ -229,6 +245,15 @@ public class CoordinatorClient {
System.out.println("Got coordinator tasks: " + System.out.println("Got coordinator tasks: " +
JsonUtil.toPrettyJsonString(client.tasks( JsonUtil.toPrettyJsonString(client.tasks(
new TasksRequest(null, 0, 0, 0, 0)))); new TasksRequest(null, 0, 0, 0, 0))));
} else if (res.getString("show_task") != null) {
String taskId = res.getString("show_task");
TaskRequest req = new TaskRequest(res.getString("show_task"));
try {
String taskOutput = String.format("Got coordinator task \"%s\": %s", taskId, JsonUtil.toPrettyJsonString(client.task(req)));
System.out.println(taskOutput);
} catch (NotFoundException e) {
System.out.println(e.getMessage());
}
} else if (res.getString("create_task") != null) { } else if (res.getString("create_task") != null) {
CreateTaskRequest req = JsonUtil.JSON_SERDE. CreateTaskRequest req = JsonUtil.JSON_SERDE.
readValue(res.getString("create_task"), CreateTaskRequest.class); readValue(res.getString("create_task"), CreateTaskRequest.class);

View File

@ -22,7 +22,9 @@ import org.apache.kafka.trogdor.rest.CreateTaskRequest;
import org.apache.kafka.trogdor.rest.DestroyTaskRequest; import org.apache.kafka.trogdor.rest.DestroyTaskRequest;
import org.apache.kafka.trogdor.rest.Empty; import org.apache.kafka.trogdor.rest.Empty;
import org.apache.kafka.trogdor.rest.StopTaskRequest; import org.apache.kafka.trogdor.rest.StopTaskRequest;
import org.apache.kafka.trogdor.rest.TaskRequest;
import org.apache.kafka.trogdor.rest.TasksRequest; import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TaskState;
import org.apache.kafka.trogdor.rest.TasksResponse; import org.apache.kafka.trogdor.rest.TasksResponse;
import javax.servlet.ServletContext; import javax.servlet.ServletContext;
@ -35,6 +37,8 @@ import javax.ws.rs.PUT;
import javax.ws.rs.Path; import javax.ws.rs.Path;
import javax.ws.rs.Produces; import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam; import javax.ws.rs.QueryParam;
import javax.ws.rs.PathParam;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -101,6 +105,16 @@ public class CoordinatorRestResource {
return coordinator().tasks(new TasksRequest(taskId, firstStartMs, lastStartMs, firstEndMs, lastEndMs)); return coordinator().tasks(new TasksRequest(taskId, firstStartMs, lastStartMs, firstEndMs, lastEndMs));
} }
@GET
@Path("/tasks/{taskId}")
public TaskState tasks(@PathParam("taskId") String taskId) throws Throwable {
TaskState response = coordinator().task(new TaskRequest(taskId));
if (response == null)
throw new NotFoundException(String.format("No task with ID \"%s\" exists.", taskId));
return response;
}
@PUT @PUT
@Path("/shutdown") @Path("/shutdown")
public Empty beginShutdown(CoordinatorShutdownRequest request) throws Throwable { public Empty beginShutdown(CoordinatorShutdownRequest request) throws Throwable {

View File

@ -36,6 +36,7 @@ import org.apache.kafka.trogdor.rest.TaskRunning;
import org.apache.kafka.trogdor.rest.TaskState; import org.apache.kafka.trogdor.rest.TaskState;
import org.apache.kafka.trogdor.rest.TaskStopping; import org.apache.kafka.trogdor.rest.TaskStopping;
import org.apache.kafka.trogdor.rest.TasksRequest; import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TaskRequest;
import org.apache.kafka.trogdor.rest.TasksResponse; import org.apache.kafka.trogdor.rest.TasksResponse;
import org.apache.kafka.trogdor.rest.WorkerDone; import org.apache.kafka.trogdor.rest.WorkerDone;
import org.apache.kafka.trogdor.rest.WorkerReceiving; import org.apache.kafka.trogdor.rest.WorkerReceiving;
@ -628,6 +629,36 @@ public final class TaskManager {
} }
} }
/**
* Get information about a single task being managed.
*
* Returns #{@code null} if the task does not exist
*/
public TaskState task(TaskRequest request) throws ExecutionException, InterruptedException {
return executor.submit(new GetTaskState(request)).get();
}
/**
* Gets information about the tasks being managed. Processed by the state change thread.
*/
class GetTaskState implements Callable<TaskState> {
private final TaskRequest request;
GetTaskState(TaskRequest request) {
this.request = request;
}
@Override
public TaskState call() throws Exception {
ManagedTask task = tasks.get(request.taskId());
if (task == null) {
return null;
}
return task.taskState();
}
}
/** /**
* Initiate shutdown, but do not wait for it to complete. * Initiate shutdown, but do not wait for it to complete.
*/ */

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
* The request to /coordinator/tasks/{taskId}
*/
public class TaskRequest {
private final String taskId;
@JsonCreator
public TaskRequest(@JsonProperty("taskId") String taskId) {
this.taskId = taskId == null ? "" : taskId;
}
@JsonProperty
public String taskId() {
return taskId;
}
}

View File

@ -71,7 +71,7 @@ public class ExpectedTasks {
} }
} }
static class ExpectedTask { public static class ExpectedTask {
private final String id; private final String id;
private final TaskSpec taskSpec; private final TaskSpec taskSpec;
private final TaskState taskState; private final TaskState taskState;

View File

@ -41,7 +41,9 @@ import org.apache.kafka.trogdor.rest.StopTaskRequest;
import org.apache.kafka.trogdor.rest.TaskDone; import org.apache.kafka.trogdor.rest.TaskDone;
import org.apache.kafka.trogdor.rest.TaskPending; import org.apache.kafka.trogdor.rest.TaskPending;
import org.apache.kafka.trogdor.rest.TaskRunning; import org.apache.kafka.trogdor.rest.TaskRunning;
import org.apache.kafka.trogdor.rest.TaskRequest;
import org.apache.kafka.trogdor.rest.TasksRequest; import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TaskState;
import org.apache.kafka.trogdor.rest.TasksResponse; import org.apache.kafka.trogdor.rest.TasksResponse;
import org.apache.kafka.trogdor.rest.WorkerDone; import org.apache.kafka.trogdor.rest.WorkerDone;
import org.apache.kafka.trogdor.rest.WorkerRunning; import org.apache.kafka.trogdor.rest.WorkerRunning;
@ -53,6 +55,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.junit.Test; import org.junit.Test;
import javax.ws.rs.NotFoundException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
@ -490,6 +493,40 @@ public class CoordinatorTest {
} }
} }
@Test
public void testTaskRequest() throws Exception {
MockTime time = new MockTime(0, 0, 0);
Scheduler scheduler = new MockScheduler(time);
try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().
addCoordinator("node01").
addAgent("node02").
scheduler(scheduler).
build()) {
CoordinatorClient coordinatorClient = cluster.coordinatorClient();
NoOpTaskSpec fooSpec = new NoOpTaskSpec(1, 10);
coordinatorClient.createTask(new CreateTaskRequest("foo", fooSpec));
TaskState expectedState = new ExpectedTaskBuilder("foo").taskState(new TaskPending(fooSpec)).build().taskState();
TaskState resp = coordinatorClient.task(new TaskRequest("foo"));
assertEquals(expectedState, resp);
time.sleep(2);
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
taskState(new TaskRunning(fooSpec, 2, new TextNode("active"))).
workerState(new WorkerRunning("foo", fooSpec, 2, new TextNode("active"))).
build()).
waitFor(coordinatorClient).
waitFor(cluster.agentClient("node02"));
try {
coordinatorClient.task(new TaskRequest("non-existent-foo"));
fail("Non existent task request should have raised a NotFoundException");
} catch (NotFoundException ignored) { }
}
}
@Test @Test
public void testWorkersExitingAtDifferentTimes() throws Exception { public void testWorkersExitingAtDifferentTimes() throws Exception {
MockTime time = new MockTime(0, 0, 0); MockTime time = new MockTime(0, 0, 0);