KAFKA-6694: The Trogdor Coordinator should support filtering task responses (#4741)

This commit is contained in:
Colin Patrick McCabe 2018-04-05 05:35:20 -07:00 committed by Rajini Sivaram
parent 9f8c3167eb
commit 63642d6051
7 changed files with 255 additions and 11 deletions

View File

@ -31,6 +31,7 @@ import org.apache.kafka.trogdor.rest.CreateTaskResponse;
import org.apache.kafka.trogdor.rest.JsonRestServer; import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.apache.kafka.trogdor.rest.StopTaskRequest; import org.apache.kafka.trogdor.rest.StopTaskRequest;
import org.apache.kafka.trogdor.rest.StopTaskResponse; import org.apache.kafka.trogdor.rest.StopTaskResponse;
import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TasksResponse; import org.apache.kafka.trogdor.rest.TasksResponse;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -94,8 +95,8 @@ public final class Coordinator {
return new StopTaskResponse(taskManager.stopTask(request.id())); return new StopTaskResponse(taskManager.stopTask(request.id()));
} }
public TasksResponse tasks() throws Exception { public TasksResponse tasks(TasksRequest request) throws Exception {
return taskManager.tasks(); return taskManager.tasks(request);
} }
public void beginShutdown(boolean stopAgents) throws Exception { public void beginShutdown(boolean stopAgents) throws Exception {

View File

@ -33,10 +33,13 @@ import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse; import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse;
import org.apache.kafka.trogdor.rest.StopTaskRequest; import org.apache.kafka.trogdor.rest.StopTaskRequest;
import org.apache.kafka.trogdor.rest.StopTaskResponse; import org.apache.kafka.trogdor.rest.StopTaskResponse;
import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TasksResponse; import org.apache.kafka.trogdor.rest.TasksResponse;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.ws.rs.core.UriBuilder;
import static net.sourceforge.argparse4j.impl.Arguments.store; import static net.sourceforge.argparse4j.impl.Arguments.store;
import static net.sourceforge.argparse4j.impl.Arguments.storeTrue; import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
@ -127,9 +130,15 @@ public class CoordinatorClient {
return resp.body(); return resp.body();
} }
public TasksResponse tasks() throws Exception { public TasksResponse tasks(TasksRequest request) throws Exception {
UriBuilder uriBuilder = UriBuilder.fromPath(url("/coordinator/tasks"));
uriBuilder.queryParam("taskId", request.taskIds().toArray(new String[0]));
uriBuilder.queryParam("firstStartMs", request.firstStartMs());
uriBuilder.queryParam("lastStartMs", request.lastStartMs());
uriBuilder.queryParam("firstEndMs", request.firstEndMs());
uriBuilder.queryParam("lastEndMs", request.lastEndMs());
HttpResponse<TasksResponse> resp = HttpResponse<TasksResponse> resp =
JsonRestServer.<TasksResponse>httpRequest(log, url("/coordinator/tasks"), "GET", JsonRestServer.<TasksResponse>httpRequest(log, uriBuilder.build().toString(), "GET",
null, new TypeReference<TasksResponse>() { }, maxTries); null, new TypeReference<TasksResponse>() { }, maxTries);
return resp.body(); return resp.body();
} }
@ -204,7 +213,8 @@ public class CoordinatorClient {
JsonUtil.toPrettyJsonString(client.status())); JsonUtil.toPrettyJsonString(client.status()));
} else if (res.getBoolean("show_tasks")) { } else if (res.getBoolean("show_tasks")) {
System.out.println("Got coordinator tasks: " + System.out.println("Got coordinator tasks: " +
JsonUtil.toPrettyJsonString(client.tasks())); JsonUtil.toPrettyJsonString(client.tasks(
new TasksRequest(null, 0, 0, 0, 0))));
} else if (res.getString("create_task") != null) { } else if (res.getString("create_task") != null) {
client.createTask(JsonUtil.JSON_SERDE.readValue(res.getString("create_task"), client.createTask(JsonUtil.JSON_SERDE.readValue(res.getString("create_task"),
CreateTaskRequest.class)); CreateTaskRequest.class));

View File

@ -23,16 +23,20 @@ import org.apache.kafka.trogdor.rest.CreateTaskResponse;
import org.apache.kafka.trogdor.rest.Empty; import org.apache.kafka.trogdor.rest.Empty;
import org.apache.kafka.trogdor.rest.StopTaskRequest; import org.apache.kafka.trogdor.rest.StopTaskRequest;
import org.apache.kafka.trogdor.rest.StopTaskResponse; import org.apache.kafka.trogdor.rest.StopTaskResponse;
import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TasksResponse; import org.apache.kafka.trogdor.rest.TasksResponse;
import javax.servlet.ServletContext; import javax.servlet.ServletContext;
import javax.ws.rs.Consumes; import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET; import javax.ws.rs.GET;
import javax.ws.rs.POST; import javax.ws.rs.POST;
import javax.ws.rs.PUT; import javax.ws.rs.PUT;
import javax.ws.rs.Path; import javax.ws.rs.Path;
import javax.ws.rs.Produces; import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -69,8 +73,12 @@ public class CoordinatorRestResource {
@GET @GET
@Path("/tasks") @Path("/tasks")
public TasksResponse tasks() throws Throwable { public TasksResponse tasks(@QueryParam("taskId") List<String> taskId,
return coordinator().tasks(); @DefaultValue("0") @QueryParam("firstStartMs") int firstStartMs,
@DefaultValue("0") @QueryParam("lastStartMs") int lastStartMs,
@DefaultValue("0") @QueryParam("firstEndMs") int firstEndMs,
@DefaultValue("0") @QueryParam("lastEndMs") int lastEndMs) throws Throwable {
return coordinator().tasks(new TasksRequest(taskId, firstStartMs, lastStartMs, firstEndMs, lastEndMs));
} }
@PUT @PUT

View File

@ -29,6 +29,7 @@ import org.apache.kafka.trogdor.rest.TaskPending;
import org.apache.kafka.trogdor.rest.TaskRunning; import org.apache.kafka.trogdor.rest.TaskRunning;
import org.apache.kafka.trogdor.rest.TaskState; import org.apache.kafka.trogdor.rest.TaskState;
import org.apache.kafka.trogdor.rest.TaskStopping; import org.apache.kafka.trogdor.rest.TaskStopping;
import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TasksResponse; import org.apache.kafka.trogdor.rest.TasksResponse;
import org.apache.kafka.trogdor.task.TaskController; import org.apache.kafka.trogdor.task.TaskController;
import org.apache.kafka.trogdor.task.TaskSpec; import org.apache.kafka.trogdor.task.TaskSpec;
@ -483,16 +484,24 @@ public final class TaskManager {
/** /**
* Get information about the tasks being managed. * Get information about the tasks being managed.
*/ */
public TasksResponse tasks() throws ExecutionException, InterruptedException { public TasksResponse tasks(TasksRequest request) throws ExecutionException, InterruptedException {
return executor.submit(new GetTasksResponse()).get(); return executor.submit(new GetTasksResponse(request)).get();
} }
class GetTasksResponse implements Callable<TasksResponse> { class GetTasksResponse implements Callable<TasksResponse> {
private final TasksRequest request;
GetTasksResponse(TasksRequest request) {
this.request = request;
}
@Override @Override
public TasksResponse call() throws Exception { public TasksResponse call() throws Exception {
TreeMap<String, TaskState> states = new TreeMap<>(); TreeMap<String, TaskState> states = new TreeMap<>();
for (ManagedTask task : tasks.values()) { for (ManagedTask task : tasks.values()) {
states.put(task.id, task.taskState()); if (request.matches(task.id, task.startedMs, task.doneMs)) {
states.put(task.id, task.taskState());
}
} }
return new TasksResponse(states); return new TasksResponse(states);
} }

View File

@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
/**
 * The request to /coordinator/tasks.
 *
 * Carries an optional set of task IDs plus four optional time bounds.  A task is
 * listed only when it passes every filter that is enabled.
 */
public class TasksRequest extends Message {
    /**
     * The task IDs to list.  When this set is empty, tasks are not filtered by ID.
     */
    private final Set<String> taskIds;

    /**
     * Lower bound on the task start time.  Zero disables this filter.
     */
    private final long firstStartMs;

    /**
     * Upper bound on the task start time.  Zero disables this filter.
     */
    private final long lastStartMs;

    /**
     * Lower bound on the task end time.  Zero disables this filter.
     */
    private final long firstEndMs;

    /**
     * Upper bound on the task end time.  Zero disables this filter.
     */
    private final long lastEndMs;

    /**
     * Create a new request.
     *
     * @param taskIds       The task IDs to list.  Null is treated as an empty collection.
     *                      The contents are copied into an unmodifiable set.
     * @param firstStartMs  Lower bound on the start time.  Negative values become zero.
     * @param lastStartMs   Upper bound on the start time.  Negative values become zero.
     * @param firstEndMs    Lower bound on the end time.  Negative values become zero.
     * @param lastEndMs     Upper bound on the end time.  Negative values become zero.
     */
    @JsonCreator
    public TasksRequest(@JsonProperty("taskIds") Collection<String> taskIds,
            @JsonProperty("firstStartMs") long firstStartMs,
            @JsonProperty("lastStartMs") long lastStartMs,
            @JsonProperty("firstEndMs") long firstEndMs,
            @JsonProperty("lastEndMs") long lastEndMs) {
        Set<String> idsCopy = new HashSet<>();
        if (taskIds != null) {
            idsCopy.addAll(taskIds);
        }
        this.taskIds = Collections.unmodifiableSet(idsCopy);
        // Negative bounds are clamped to zero, which means "filter disabled".
        this.firstStartMs = (firstStartMs < 0) ? 0 : firstStartMs;
        this.lastStartMs = (lastStartMs < 0) ? 0 : lastStartMs;
        this.firstEndMs = (firstEndMs < 0) ? 0 : firstEndMs;
        this.lastEndMs = (lastEndMs < 0) ? 0 : lastEndMs;
    }

    @JsonProperty
    public Collection<String> taskIds() {
        return taskIds;
    }

    @JsonProperty
    public long firstStartMs() {
        return firstStartMs;
    }

    @JsonProperty
    public long lastStartMs() {
        return lastStartMs;
    }

    @JsonProperty
    public long firstEndMs() {
        return firstEndMs;
    }

    @JsonProperty
    public long lastEndMs() {
        return lastEndMs;
    }

    /**
     * Determine if this TasksRequest should return a particular task.
     *
     * @param taskId        The task ID.
     * @param startMs       The task start time, or -1 if the task hasn't started.
     * @param endMs         The task end time, or -1 if the task hasn't ended.
     * @return              True if information about the task should be returned.
     */
    public boolean matches(String taskId, long startMs, long endMs) {
        boolean idAccepted = taskIds.isEmpty() || taskIds.contains(taskId);
        return idAccepted
            && atOrAfter(firstStartMs, startMs)
            && atOrBefore(lastStartMs, startMs)
            && atOrAfter(firstEndMs, endMs)
            && atOrBefore(lastEndMs, endMs);
    }

    /**
     * Check a lower bound.  A bound of zero or less accepts everything; otherwise
     * the time must be at or after the bound, so a time of -1 is rejected.
     */
    private static boolean atOrAfter(long lowerBoundMs, long timeMs) {
        return (lowerBoundMs <= 0) || (timeMs >= lowerBoundMs);
    }

    /**
     * Check an upper bound.  A bound of zero or less accepts everything; otherwise
     * the time must be non-negative and at or before the bound.
     */
    private static boolean atOrBefore(long upperBoundMs, long timeMs) {
        return (upperBoundMs <= 0) || ((timeMs >= 0) && (timeMs <= upperBoundMs));
    }
}

View File

@ -25,6 +25,7 @@ import org.apache.kafka.trogdor.agent.AgentClient;
import org.apache.kafka.trogdor.coordinator.CoordinatorClient; import org.apache.kafka.trogdor.coordinator.CoordinatorClient;
import org.apache.kafka.trogdor.rest.AgentStatusResponse; import org.apache.kafka.trogdor.rest.AgentStatusResponse;
import org.apache.kafka.trogdor.rest.TaskState; import org.apache.kafka.trogdor.rest.TaskState;
import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TasksResponse; import org.apache.kafka.trogdor.rest.TasksResponse;
import org.apache.kafka.trogdor.rest.WorkerState; import org.apache.kafka.trogdor.rest.WorkerState;
import org.apache.kafka.trogdor.task.TaskSpec; import org.apache.kafka.trogdor.task.TaskSpec;
@ -144,7 +145,7 @@ public class ExpectedTasks {
public boolean conditionMet() { public boolean conditionMet() {
TasksResponse tasks = null; TasksResponse tasks = null;
try { try {
tasks = client.tasks(); tasks = client.tasks(new TasksRequest(null, 0, 0, 0, 0));
} catch (Exception e) { } catch (Exception e) {
log.info("Unable to get coordinator tasks", e); log.info("Unable to get coordinator tasks", e);
throw new RuntimeException(e); throw new RuntimeException(e);

View File

@ -36,6 +36,8 @@ import org.apache.kafka.trogdor.rest.StopTaskRequest;
import org.apache.kafka.trogdor.rest.TaskDone; import org.apache.kafka.trogdor.rest.TaskDone;
import org.apache.kafka.trogdor.rest.TaskPending; import org.apache.kafka.trogdor.rest.TaskPending;
import org.apache.kafka.trogdor.rest.TaskRunning; import org.apache.kafka.trogdor.rest.TaskRunning;
import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TasksResponse;
import org.apache.kafka.trogdor.rest.WorkerDone; import org.apache.kafka.trogdor.rest.WorkerDone;
import org.apache.kafka.trogdor.rest.WorkerRunning; import org.apache.kafka.trogdor.rest.WorkerRunning;
import org.apache.kafka.trogdor.task.NoOpTaskSpec; import org.apache.kafka.trogdor.task.NoOpTaskSpec;
@ -50,6 +52,8 @@ import java.util.Arrays;
import java.util.List; import java.util.List;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class CoordinatorTest { public class CoordinatorTest {
private static final Logger log = LoggerFactory.getLogger(CoordinatorTest.class); private static final Logger log = LoggerFactory.getLogger(CoordinatorTest.class);
@ -302,4 +306,92 @@ public class CoordinatorTest {
"-m comment --comment node02"). "-m comment --comment node02").
waitFor("node03", runner); waitFor("node03", runner);
} }
/**
 * Unit test for TasksRequest#matches, covering the ID filter and the four
 * time-bound filters without starting a cluster.
 */
@Test
public void testTasksRequestMatches() throws Exception {
    // No filters at all: everything matches, started or not.
    TasksRequest unfiltered = new TasksRequest(null, 0, 0, 0, 0);
    assertTrue(unfiltered.matches("foo1", -1, -1));
    assertTrue(unfiltered.matches("bar1", 100, 200));
    assertTrue(unfiltered.matches("baz1", 100, -1));

    // Only a lower bound on the start time.
    TasksRequest startedAtOrAfter100 = new TasksRequest(null, 100, 0, 0, 0);
    assertFalse(startedAtOrAfter100.matches("foo1", -1, -1));
    assertTrue(startedAtOrAfter100.matches("bar1", 100, 200));
    assertFalse(startedAtOrAfter100.matches("bar1", 99, 200));
    assertFalse(startedAtOrAfter100.matches("baz1", 99, -1));

    // Both start and end must fall within [200, 900].
    TasksRequest within200To900 = new TasksRequest(null, 200, 900, 200, 900);
    assertFalse(within200To900.matches("foo1", -1, -1));
    assertFalse(within200To900.matches("bar1", 100, 200));
    assertFalse(within200To900.matches("bar1", 200, 1000));
    assertTrue(within200To900.matches("bar1", 200, 700));
    assertFalse(within200To900.matches("baz1", 101, -1));

    // ID filter combined with a start bound; the negative bounds are clamped to zero.
    List<String> wantedIds = Arrays.asList("foo1", "bar1", "baz1");
    TasksRequest byIdAndStart = new TasksRequest(wantedIds, 1000, -1, -1, -1);
    assertFalse(byIdAndStart.matches("foo1", -1, -1));
    assertTrue(byIdAndStart.matches("foo1", 1000, -1));
    assertFalse(byIdAndStart.matches("foo1", 900, -1));
    assertFalse(byIdAndStart.matches("baz2", 2000, -1));
    assertFalse(byIdAndStart.matches("baz2", -1, -1));
}
/**
 * End-to-end test of task filtering: creates two tasks on a mini cluster driven by
 * a mock clock, then issues filtered TasksRequests through the CoordinatorClient
 * and checks which tasks come back.
 */
@Test
public void testTasksRequest() throws Exception {
// Mock time and scheduler, so the test controls when time advances.
MockTime time = new MockTime(0, 0, 0);
Scheduler scheduler = new MockScheduler(time);
try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().
addCoordinator("node01").
addAgent("node02").
scheduler(scheduler).
build()) {
CoordinatorClient coordinatorClient = cluster.coordinatorClient();
// No tasks are expected yet.
new ExpectedTasks().waitFor(coordinatorClient);
// NOTE(review): the NoOpTaskSpec arguments are presumably (startMs, durationMs),
// which is consistent with foo being expected to run after the clock advances below -- confirm.
NoOpTaskSpec fooSpec = new NoOpTaskSpec(1, 10);
NoOpTaskSpec barSpec = new NoOpTaskSpec(3, 1);
coordinatorClient.createTask(new CreateTaskRequest("foo", fooSpec));
coordinatorClient.createTask(new CreateTaskRequest("bar", barSpec));
// Both tasks should be reported as pending before any filtered query is made.
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
taskState(new TaskPending(fooSpec)).
build()).
addTask(new ExpectedTaskBuilder("bar").
taskState(new TaskPending(barSpec)).
build()).
waitFor(coordinatorClient);
// Lower bounds on both start and end time: nothing is expected to match
// while both tasks are still pending.
assertEquals(0, coordinatorClient.tasks(
new TasksRequest(null, 10, 0, 10, 0)).tasks().size());
// ID filter only: "foo" exists, "baz" was never created, and "bar" is not requested.
TasksResponse resp1 = coordinatorClient.tasks(
new TasksRequest(Arrays.asList(new String[] {"foo", "baz" }), 0, 0, 0, 0));
assertTrue(resp1.tasks().containsKey("foo"));
assertFalse(resp1.tasks().containsKey("bar"));
assertEquals(1, resp1.tasks().size());
// Advance the mock clock; afterwards foo is expected to be running and bar still pending.
time.sleep(2);
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
taskState(new TaskRunning(fooSpec, 2)).
workerState(new WorkerRunning(fooSpec, 2, "")).
build()).
addTask(new ExpectedTaskBuilder("bar").
taskState(new TaskPending(barSpec)).
build()).
waitFor(coordinatorClient).
waitFor(cluster.agentClient("node02"));
// firstStartMs = 1: only the running task foo is expected to match.
TasksResponse resp2 = coordinatorClient.tasks(
new TasksRequest(null, 1, 0, 0, 0));
assertTrue(resp2.tasks().containsKey("foo"));
assertFalse(resp2.tasks().containsKey("bar"));
assertEquals(1, resp2.tasks().size());
// firstStartMs = 3: no task is expected to match.
assertEquals(0, coordinatorClient.tasks(
new TasksRequest(null, 3, 0, 0, 0)).tasks().size());
}
}
}; };