KAFKA-7792: Add simple /agent/uptime and /coordinator/uptime health check endpoints (#6130)

Reviewed-by: Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
Stanislav Kozlovski 2019-01-15 21:52:48 +02:00 committed by Colin Patrick McCabe
parent 68a6a7ae29
commit 2e53fa08af
9 changed files with 156 additions and 9 deletions

View File

@ -23,6 +23,7 @@ import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Scheduler;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.rest.AgentStatusResponse;
@ -30,6 +31,7 @@ import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
import org.apache.kafka.trogdor.rest.DestroyWorkerRequest;
import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.apache.kafka.trogdor.rest.StopWorkerRequest;
import org.apache.kafka.trogdor.rest.UptimeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -60,6 +62,8 @@ public final class Agent {
*/
private final JsonRestServer restServer;
private final Time time;
/**
* Create a new Agent.
*
@ -70,7 +74,8 @@ public final class Agent {
*/
public Agent(Platform platform, Scheduler scheduler,
JsonRestServer restServer, AgentRestResource resource) {
this.serverStartMs = scheduler.time().milliseconds();
this.time = scheduler.time();
this.serverStartMs = time.milliseconds();
this.workerManager = new WorkerManager(platform, scheduler);
this.restServer = restServer;
resource.setAgent(this);
@ -94,6 +99,10 @@ public final class Agent {
return new AgentStatusResponse(serverStartMs, workerManager.workerStates());
}
public UptimeResponse uptime() {
return new UptimeResponse(serverStartMs, time.milliseconds());
}
public void createWorker(CreateWorkerRequest req) throws Throwable {
workerManager.createWorker(req.workerId(), req.taskId(), req.spec());
}

View File

@ -32,6 +32,7 @@ import org.apache.kafka.trogdor.rest.Empty;
import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse;
import org.apache.kafka.trogdor.rest.StopWorkerRequest;
import org.apache.kafka.trogdor.rest.UptimeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -117,6 +118,13 @@ public class AgentClient {
return resp.body();
}
public UptimeResponse uptime() throws Exception {
HttpResponse<UptimeResponse> resp =
JsonRestServer.httpRequest(url("/agent/uptime"), "GET",
null, new TypeReference<UptimeResponse>() { }, maxTries);
return resp.body();
}
public void createWorker(CreateWorkerRequest request) throws Exception {
HttpResponse<Empty> resp =
JsonRestServer.<Empty>httpRequest(
@ -168,6 +176,11 @@ public class AgentClient {
.type(Boolean.class)
.dest("status")
.help("Get agent status.");
actions.addArgument("--uptime")
.action(storeTrue())
.type(Boolean.class)
.dest("uptime")
.help("Get agent uptime.");
actions.addArgument("--create-worker")
.action(store())
.type(String.class)
@ -212,6 +225,9 @@ public class AgentClient {
if (res.getBoolean("status")) {
System.out.println("Got agent status: " +
JsonUtil.toPrettyJsonString(client.status()));
} else if (res.getBoolean("uptime")) {
System.out.println("Got agent uptime: " +
JsonUtil.toPrettyJsonString(client.uptime()));
} else if (res.getString("create_worker") != null) {
CreateWorkerRequest req = JsonUtil.JSON_SERDE.
readValue(res.getString("create_worker"), CreateWorkerRequest.class);

View File

@ -21,6 +21,7 @@ import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
import org.apache.kafka.trogdor.rest.DestroyWorkerRequest;
import org.apache.kafka.trogdor.rest.Empty;
import org.apache.kafka.trogdor.rest.StopWorkerRequest;
import org.apache.kafka.trogdor.rest.UptimeResponse;
import javax.servlet.ServletContext;
import javax.ws.rs.Consumes;
@ -65,6 +66,12 @@ public class AgentRestResource {
return agent().status();
}
@GET
@Path("/uptime")
public UptimeResponse uptime() {
return agent().uptime();
}
@POST
@Path("/worker/create")
public Empty createWorker(CreateWorkerRequest req) throws Throwable {

View File

@ -23,6 +23,7 @@ import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Scheduler;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
@ -31,9 +32,10 @@ import org.apache.kafka.trogdor.rest.DestroyTaskRequest;
import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.apache.kafka.trogdor.rest.StopTaskRequest;
import org.apache.kafka.trogdor.rest.TaskRequest;
import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TaskState;
import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TasksResponse;
import org.apache.kafka.trogdor.rest.UptimeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -66,6 +68,8 @@ public final class Coordinator {
*/
private final JsonRestServer restServer;
private final Time time;
/**
* Create a new Coordinator.
*
@ -76,7 +80,8 @@ public final class Coordinator {
*/
public Coordinator(Platform platform, Scheduler scheduler, JsonRestServer restServer,
CoordinatorRestResource resource, long firstWorkerId) {
this.startTimeMs = scheduler.time().milliseconds();
this.time = scheduler.time();
this.startTimeMs = time.milliseconds();
this.taskManager = new TaskManager(platform, scheduler, firstWorkerId);
this.restServer = restServer;
resource.setCoordinator(this);
@ -90,6 +95,10 @@ public final class Coordinator {
return new CoordinatorStatusResponse(startTimeMs);
}
public UptimeResponse uptime() {
return new UptimeResponse(startTimeMs, time.milliseconds());
}
public void createTask(CreateTaskRequest request) throws Throwable {
taskManager.createTask(request.id(), request.spec());
}

View File

@ -36,6 +36,7 @@ import org.apache.kafka.trogdor.rest.TaskRequest;
import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TaskState;
import org.apache.kafka.trogdor.rest.TasksResponse;
import org.apache.kafka.trogdor.rest.UptimeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -120,6 +121,13 @@ public class CoordinatorClient {
return resp.body();
}
public UptimeResponse uptime() throws Exception {
HttpResponse<UptimeResponse> resp =
JsonRestServer.httpRequest(url("/coordinator/uptime"), "GET",
null, new TypeReference<UptimeResponse>() { }, maxTries);
return resp.body();
}
public void createTask(CreateTaskRequest request) throws Exception {
HttpResponse<Empty> resp =
JsonRestServer.httpRequest(log, url("/coordinator/task/create"), "POST",
@ -188,6 +196,11 @@ public class CoordinatorClient {
.type(Boolean.class)
.dest("status")
.help("Get coordinator status.");
actions.addArgument("--uptime")
.action(storeTrue())
.type(Boolean.class)
.dest("uptime")
.help("Get coordinator uptime.");
actions.addArgument("--show-tasks")
.action(storeTrue())
.type(Boolean.class)
@ -243,6 +256,9 @@ public class CoordinatorClient {
if (res.getBoolean("status")) {
System.out.println("Got coordinator status: " +
JsonUtil.toPrettyJsonString(client.status()));
} else if (res.getBoolean("uptime")) {
System.out.println("Got coordinator uptime: " +
JsonUtil.toPrettyJsonString(client.uptime()));
} else if (res.getBoolean("show_tasks")) {
System.out.println("Got coordinator tasks: " +
JsonUtil.toPrettyJsonString(client.tasks(

View File

@ -27,6 +27,7 @@ import org.apache.kafka.trogdor.rest.TaskState;
import org.apache.kafka.trogdor.rest.TaskStateType;
import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TasksResponse;
import org.apache.kafka.trogdor.rest.UptimeResponse;
import javax.servlet.ServletContext;
import javax.ws.rs.Consumes;
@ -77,6 +78,12 @@ public class CoordinatorRestResource {
return coordinator().status();
}
@GET
@Path("/uptime")
public UptimeResponse uptime() {
return coordinator().uptime();
}
@POST
@Path("/task/create")
public Empty createTask(CreateTaskRequest request) throws Throwable {

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
* A response from the Trogdor Agent/Coordinator about how long it has been running
*/
public class UptimeResponse extends Message {
private long serverStartMs;
private long nowMs;
@JsonCreator
public UptimeResponse(@JsonProperty("serverStartMs") long serverStartMs,
@JsonProperty("nowMs") long nowMs) {
this.serverStartMs = serverStartMs;
this.nowMs = nowMs;
}
@JsonProperty
public long serverStartMs() {
return serverStartMs;
}
@JsonProperty
public long nowMs() {
return nowMs;
}
}

View File

@ -34,21 +34,21 @@ import org.apache.kafka.trogdor.fault.Kibosh;
import org.apache.kafka.trogdor.fault.Kibosh.KiboshControlFile;
import org.apache.kafka.trogdor.fault.Kibosh.KiboshFilesUnreadableFaultSpec;
import org.apache.kafka.trogdor.rest.AgentStatusResponse;
import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
import org.apache.kafka.trogdor.rest.DestroyWorkerRequest;
import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.apache.kafka.trogdor.rest.RequestConflictException;
import org.apache.kafka.trogdor.rest.StopWorkerRequest;
import org.apache.kafka.trogdor.rest.TaskDone;
import org.apache.kafka.trogdor.rest.UptimeResponse;
import org.apache.kafka.trogdor.rest.WorkerDone;
import org.apache.kafka.trogdor.rest.WorkerRunning;
import org.apache.kafka.trogdor.task.NoOpTaskSpec;
import org.apache.kafka.trogdor.task.SampleTaskSpec;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.rules.Timeout;
import org.junit.Test;
import org.junit.rules.Timeout;
import java.io.File;
import java.io.IOException;
@ -60,6 +60,7 @@ import java.util.HashMap;
import java.util.TreeMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
public class AgentTest {
@Rule
@ -132,6 +133,7 @@ public class AgentTest {
AgentClient client = new AgentClient.Builder().
maxTries(10).target("localhost", agent.port()).build();
AgentStatusResponse status = client.status();
assertEquals(Collections.emptyMap(), status.workers());
new ExpectedTasks().waitFor(client);
@ -147,6 +149,23 @@ public class AgentTest {
waitFor(client);
}
@Test
public void testAgentGetUptime() throws Exception {
MockTime time = new MockTime(0, 111, 0);
MockScheduler scheduler = new MockScheduler(time);
Agent agent = createAgent(scheduler);
AgentClient client = new AgentClient.Builder().
maxTries(10).target("localhost", agent.port()).build();
UptimeResponse uptime = client.uptime();
assertEquals(agent.uptime(), uptime);
time.setCurrentTimeMs(150);
assertNotEquals(agent.uptime(), uptime);
agent.beginShutdown();
agent.waitForShutdown();
}
@Test
public void testAgentCreateWorkers() throws Exception {
MockTime time = new MockTime(0, 0, 0);

View File

@ -30,7 +30,6 @@ import org.apache.kafka.trogdor.common.CapturingCommandRunner;
import org.apache.kafka.trogdor.common.ExpectedTasks;
import org.apache.kafka.trogdor.common.ExpectedTasks.ExpectedTaskBuilder;
import org.apache.kafka.trogdor.common.MiniTrogdorCluster;
import org.apache.kafka.trogdor.fault.NetworkPartitionFaultSpec;
import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
import org.apache.kafka.trogdor.rest.CreateTaskRequest;
@ -39,21 +38,22 @@ import org.apache.kafka.trogdor.rest.RequestConflictException;
import org.apache.kafka.trogdor.rest.StopTaskRequest;
import org.apache.kafka.trogdor.rest.TaskDone;
import org.apache.kafka.trogdor.rest.TaskPending;
import org.apache.kafka.trogdor.rest.TaskRunning;
import org.apache.kafka.trogdor.rest.TaskRequest;
import org.apache.kafka.trogdor.rest.TaskRunning;
import org.apache.kafka.trogdor.rest.TaskState;
import org.apache.kafka.trogdor.rest.TaskStateType;
import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TaskState;
import org.apache.kafka.trogdor.rest.TasksResponse;
import org.apache.kafka.trogdor.rest.UptimeResponse;
import org.apache.kafka.trogdor.rest.WorkerDone;
import org.apache.kafka.trogdor.rest.WorkerRunning;
import org.apache.kafka.trogdor.task.NoOpTaskSpec;
import org.apache.kafka.trogdor.task.SampleTaskSpec;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.Test;
import javax.ws.rs.NotFoundException;
import java.util.ArrayList;
@ -63,6 +63,7 @@ import java.util.List;
import java.util.Optional;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
@ -83,6 +84,22 @@ public class CoordinatorTest {
}
}
@Test
public void testCoordinatorUptime() throws Exception {
MockTime time = new MockTime(0, 200, 0);
Scheduler scheduler = new MockScheduler(time);
try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().
addCoordinator("node01").
scheduler(scheduler).
build()) {
UptimeResponse uptime = cluster.coordinatorClient().uptime();
assertEquals(cluster.coordinator().uptime(), uptime);
time.setCurrentTimeMs(250);
assertNotEquals(cluster.coordinator().uptime(), uptime);
}
}
@Test
public void testCreateTask() throws Exception {
MockTime time = new MockTime(0, 0, 0);