KAFKA-5776; Add the Trogdor fault injection daemon

Author: Colin P. McCabe <cmccabe@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Rajini Sivaram <rajinisivaram@googlemail.com>

Closes #3699 from cmccabe/trogdor-review
This commit is contained in:
Colin P. Mccabe 2017-08-25 12:29:40 -07:00 committed by Rajini Sivaram
parent 607c3c21f6
commit 0772fde562
47 changed files with 4648 additions and 7 deletions

50
bin/trogdor.sh Executable file
View File

@ -0,0 +1,50 @@
#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Print usage information for this wrapper script.
usage() {
    cat <<EOF
The Trogdor fault injector.
Usage:
$0 [action] [options]
Actions:
agent: Run the trogdor agent.
coordinator: Run the trogdor coordinator.
client: Run the client which communicates with the trogdor coordinator.
agent-client: Run the client which communicates with the trogdor agent.
help: This help message.
EOF
}

# An action argument is mandatory; with none, print help and exit successfully.
if [[ $# -lt 1 ]]; then
    usage
    exit 0
fi
action="${1}"
shift
CLASS=""
# Quote the case subject so an (unexpected) glob/whitespace value cannot
# be word-split or expanded.
case "${action}" in
    agent) CLASS="org.apache.kafka.trogdor.agent.Agent";;
    coordinator) CLASS="org.apache.kafka.trogdor.coordinator.Coordinator";;
    client) CLASS="org.apache.kafka.trogdor.coordinator.CoordinatorClient";;
    agent-client) CLASS="org.apache.kafka.trogdor.agent.AgentClient";;
    help) usage; exit 0;;
    *) echo "Unknown action '${action}'. Type '$0 help' for help."; exit 1;;
esac

# The Trogdor classes live in the test jars, so ask kafka-run-class.sh to
# include them on the classpath.
export INCLUDE_TEST_JARS=1
# Quote the dirname expansion so the script also works when installed under a
# path containing spaces (the original unquoted form would word-split).
exec "$(dirname "$0")/kafka-run-class.sh" "${CLASS}" "$@"

View File

@ -837,8 +837,15 @@ project(':tools') {
compile libs.jacksonDatabind
compile libs.slf4jlog4j
compile libs.jacksonJaxrsJsonProvider
compile libs.jerseyContainerServlet
compile libs.jettyServer
compile libs.jettyServlet
compile libs.jettyServlets
testCompile project(':clients')
testCompile libs.junit
testCompile project(':clients').sourceSets.test.output
}
javadoc {

View File

@ -177,6 +177,18 @@
<allow pkg="org.apache.log4j" />
</subpackage>
<subpackage name="trogdor">
<allow pkg="com.fasterxml.jackson" />
<allow pkg="javax.servlet" />
<allow pkg="javax.ws.rs" />
<allow pkg="net.sourceforge.argparse4j" />
<allow pkg="org.apache.kafka.test"/>
<allow pkg="org.apache.kafka.trogdor" />
<allow pkg="org.apache.log4j" />
<allow pkg="org.eclipse.jetty" />
<allow pkg="org.glassfish.jersey" />
</subpackage>
<subpackage name="streams">
<allow pkg="org.apache.kafka.common"/>
<allow pkg="org.apache.kafka.test"/>

View File

@ -56,7 +56,7 @@
files="AbstractRequest.java"/>
<suppress checks="NPathComplexity"
files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|SslTransportLayer|Sender|Serdes).java"/>
files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|SslTransportLayer|Sender|Serdes|Agent).java"/>
<!-- clients tests -->
<suppress checks="ClassDataAbstractionCoupling"
@ -186,14 +186,14 @@
files="VerifiableConsumer.java"/>
<suppress checks="CyclomaticComplexity"
files="StreamsResetter.java"/>
<suppress checks="CyclomaticComplexity"
files="ProducerPerformance.java"/>
files="(StreamsResetter|ProducerPerformance|Agent).java"/>
<suppress checks="NPathComplexity"
files="StreamsResetter.java"/>
<suppress checks="NPathComplexity"
files="ProducerPerformance.java"/>
files="(ProducerPerformance|StreamsResetter|Agent).java"/>
<suppress checks="ImportControl"
files="SignalLogger.java"/>
<suppress checks="IllegalImport"
files="SignalLogger.java"/>
<!-- Log4J-Appender -->
<suppress checks="CyclomaticComplexity"

View File

@ -31,6 +31,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
@ -306,6 +308,42 @@ public class Utils {
return Utils.newInstance(Class.forName(klass, true, Utils.getContextOrKafkaClassLoader()).asSubclass(base));
}
/**
 * Construct a new object using a class name and constructor parameters.
 *
 * @param className The full name of the class to construct.
 * @param params A sequence of (type, object) pairs, for example:
 *               String.class, "hello", Integer.class, 123.
 * @param <T> The type of object to construct.
 * @return The new object.
 * @throws ClassNotFoundException If there was a problem constructing the object.
 * @throws IllegalArgumentException If params does not contain an even number
 *                                  of elements.
 */
@SuppressWarnings("unchecked")
public static <T> T newParameterizedInstance(String className, Object... params)
    throws ClassNotFoundException {
    // Reject a trailing, unpaired element instead of silently dropping it.
    if (params.length % 2 != 0) {
        throw new IllegalArgumentException("Expected an even number of " +
            "elements in params: a sequence of (type, object) pairs.");
    }
    Class<?>[] argTypes = new Class<?>[params.length / 2];
    Object[] args = new Object[params.length / 2];
    try {
        // Use a wildcard-typed Class rather than a raw type, and make the one
        // unavoidable unchecked cast (Constructor<?> -> Constructor<T>) explicit.
        Class<?> c = Class.forName(className, true, Utils.getContextOrKafkaClassLoader());
        for (int i = 0; i < params.length / 2; i++) {
            argTypes[i] = (Class<?>) params[2 * i];
            args[i] = params[(2 * i) + 1];
        }
        Constructor<T> constructor = (Constructor<T>) c.getConstructor(argTypes);
        return constructor.newInstance(args);
    } catch (NoSuchMethodException e) {
        throw new ClassNotFoundException(String.format("Failed to find " +
            "constructor with %s for %s", Utils.join(argTypes, ", "), className), e);
    } catch (InstantiationException e) {
        throw new ClassNotFoundException(String.format("Failed to instantiate " +
            "%s", className), e);
    } catch (IllegalAccessException e) {
        throw new ClassNotFoundException(String.format("Unable to access " +
            "constructor of %s", className), e);
    } catch (InvocationTargetException e) {
        throw new ClassNotFoundException(String.format("Unable to invoke " +
            "constructor of %s", className), e);
    }
}
/**
* Generates 32 bit murmur2 hash from byte array
* @param data byte array to hash

View File

@ -0,0 +1,339 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.agent;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.fault.Fault;
import org.apache.kafka.trogdor.fault.FaultSet;
import org.apache.kafka.trogdor.fault.FaultSpec;
import org.apache.kafka.trogdor.fault.FaultState;
import org.apache.kafka.trogdor.rest.AgentFaultsResponse;
import org.apache.kafka.trogdor.rest.CreateAgentFaultRequest;
import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import static net.sourceforge.argparse4j.impl.Arguments.store;
/**
 * The Trogdor agent.
 *
 * The agent process implements faults directly.  Faults arrive over REST into
 * a pending set, are activated by a background service thread, and are moved
 * to a done set once deactivated (or if activation fails).
 */
public final class Agent {
private static final Logger log = LoggerFactory.getLogger(Agent.class);
/**
 * The clock to use for this agent.
 */
private final Time time;
/**
 * The time at which this server was started.
 */
private final long startTimeMs;
/**
 * The platform.
 */
private final Platform platform;
/**
 * The lock protecting shutdown and the three fault sets
 * (pendingFaults, runningFaults, doneFaults).
 */
private final ReentrantLock lock = new ReentrantLock();
/**
 * The condition variable which the agent thread waits on.  Signalled when a
 * new fault is queued or when shutdown begins.
 */
private final Condition cond = lock.newCondition();
/**
 * The agent runnable.
 */
private final AgentRunnable runnable;
/**
 * The REST server.
 */
private final JsonRestServer restServer;
/**
 * The agent thread.
 */
private final KafkaThread thread;
/**
 * The set of pending faults.
 */
private final FaultSet pendingFaults = new FaultSet();
/**
 * The set of faults which are running.
 */
private final FaultSet runningFaults = new FaultSet();
/**
 * The set of faults which are done.
 */
private final FaultSet doneFaults = new FaultSet();
/**
 * True if the server is shutting down.
 */
private boolean shutdown = false;
/**
 * The background loop which activates and deactivates faults, and tears down
 * the REST server when it exits.
 */
class AgentRunnable implements Runnable {
@Override
public void run() {
log.info("Starting main service thread.");
try {
while (true) {
// Faults to activate on this pass, and those successfully activated.
List<Fault> toStart = new ArrayList<>();
List<Fault> started = new ArrayList<>();
// Faults to deactivate on this pass, and those that have finished.
List<Fault> toEnd = new ArrayList<>();
List<Fault> ended = new ArrayList<>();
long now = time.milliseconds();
// Sleep at most one hour when nothing needs attention sooner.
long nextWakeMs = now + (60L * 60L * 1000L);
lock.lock();
try {
// Drain the pending set.  NOTE(review): every pending fault is
// activated immediately, regardless of its spec's startMs --
// confirm this is intended.
Iterator<Fault> pending = pendingFaults.iterateByStart();
while (pending.hasNext()) {
Fault fault = pending.next();
toStart.add(fault);
long endMs = fault.spec().startMs() + fault.spec().durationMs();
nextWakeMs = Math.min(nextWakeMs, endMs);
pending.remove();
}
// Collect running faults whose duration has elapsed.  The iterator
// is ordered by end time, so stop at the first unexpired fault and
// schedule the next wakeup for its end.
Iterator<Fault> running = runningFaults.iterateByEnd();
while (running.hasNext()) {
Fault fault = running.next();
long endMs = fault.spec().startMs() + fault.spec().durationMs();
if (now < endMs) {
nextWakeMs = Math.min(nextWakeMs, endMs);
break;
}
toEnd.add(fault);
running.remove();
}
} finally {
lock.unlock();
}
// Activate and deactivate faults WITHOUT holding the lock, so that
// REST handlers are not blocked behind potentially slow fault code.
for (Fault fault: toStart) {
try {
log.debug("Activating fault " + fault);
fault.activate(platform);
started.add(fault);
} catch (Throwable e) {
// A fault that failed to activate goes straight to the done set.
log.error("Error activating fault " + fault.id(), e);
ended.add(fault);
}
}
for (Fault fault: toEnd) {
try {
log.debug("Deactivating fault " + fault);
fault.deactivate(platform);
} catch (Throwable e) {
log.error("Error deactivating fault " + fault.id(), e);
} finally {
// Deactivated faults are considered done even if deactivation threw.
ended.add(fault);
}
}
// Re-acquire the lock to record the results and wait for more work.
lock.lock();
try {
for (Fault fault : started) {
runningFaults.add(fault);
}
for (Fault fault : ended) {
doneFaults.add(fault);
}
if (shutdown) {
return;
}
if (nextWakeMs > now) {
log.trace("Sleeping for {} ms", nextWakeMs - now);
// await() returns true when signalled before the timeout expires.
if (cond.await(nextWakeMs - now, TimeUnit.MILLISECONDS)) {
log.trace("AgentRunnable woke up early");
}
}
// Re-check the flag: a signal during await() may mean shutdown.
if (shutdown) {
return;
}
} finally {
lock.unlock();
}
}
} catch (Throwable t) {
log.error("Unhandled exception in AgentRunnable", t);
} finally {
log.info("AgentRunnable shutting down.");
// Stop the REST server whenever the service thread exits, including on
// an unhandled exception.
restServer.stop();
}
}
}
/**
 * Create a new Agent.  The agent service thread is started by the
 * constructor, and the REST resource is pointed at this agent.
 *
 * @param platform The platform object to use.
 * @param time The timekeeper to use for this Agent.
 * @param restServer The REST server to use.
 * @param resource The AgentRestResource to use.
 */
public Agent(Platform platform, Time time, JsonRestServer restServer,
AgentRestResource resource) {
this.platform = platform;
this.time = time;
this.restServer = restServer;
this.startTimeMs = time.milliseconds();
this.runnable = new AgentRunnable();
this.thread = new KafkaThread("TrogdorAgentThread", runnable, false);
this.thread.start();
resource.setAgent(this);
}
/**
 * Get the port the REST server is listening on.
 */
public int port() {
return this.restServer.port();
}
/**
 * Begin shutting down the agent.  Non-blocking; a no-op if shutdown has
 * already begun.
 */
public void beginShutdown() {
lock.lock();
try {
if (shutdown)
return;
this.shutdown = true;
// Wake the service thread so it notices the shutdown flag.
cond.signalAll();
} finally {
lock.unlock();
}
}
/**
 * Block until the agent service thread has exited.
 */
public void waitForShutdown() {
try {
this.thread.join();
} catch (InterruptedException e) {
log.error("Interrupted while waiting for thread shutdown", e);
Thread.currentThread().interrupt();
}
}
/**
 * Get the time at which this agent was started, in milliseconds.
 */
public long startTimeMs() {
return startTimeMs;
}
/**
 * Get a consistent snapshot of all known faults, keyed by fault id, tagged
 * with their current state (PENDING, RUNNING, or DONE).
 */
public AgentFaultsResponse faults() {
Map<String, AgentFaultsResponse.FaultData> faultData = new TreeMap<>();
lock.lock();
try {
updateFaultsResponse(faultData, pendingFaults, FaultState.PENDING);
updateFaultsResponse(faultData, runningFaults, FaultState.RUNNING);
updateFaultsResponse(faultData, doneFaults, FaultState.DONE);
} finally {
lock.unlock();
}
return new AgentFaultsResponse(faultData);
}
// Copy one fault set into the response map, tagging each entry with the
// given state.  Must be called with the lock held.
private void updateFaultsResponse(Map<String, AgentFaultsResponse.FaultData> faultData,
FaultSet faultSet, FaultState state) {
for (Iterator<Fault> iter = faultSet.iterateByStart();
iter.hasNext(); ) {
Fault fault = iter.next();
AgentFaultsResponse.FaultData data =
new AgentFaultsResponse.FaultData(fault.spec(), state);
faultData.put(fault.id(), data);
}
}
/**
 * Queue a new fault and wake the service thread to process it.
 *
 * @throws ClassNotFoundException If there was a problem constructing the fault.
 */
public void createFault(CreateAgentFaultRequest request) throws ClassNotFoundException {
lock.lock();
try {
Fault fault = FaultSpec.Util.createFault(request.id(), request.spec());
pendingFaults.add(fault);
cond.signalAll();
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws Exception {
ArgumentParser parser = ArgumentParsers
.newArgumentParser("trogdor-agent")
.defaultHelp(true)
.description("The Trogdor fault injection agent");
parser.addArgument("--agent.config")
.action(store())
.required(true)
.type(String.class)
.dest("config")
.metavar("CONFIG")
.help("The configuration file to use.");
parser.addArgument("--node-name")
.action(store())
.required(true)
.type(String.class)
.dest("node_name")
.metavar("NODE_NAME")
.help("The name of this node.");
Namespace res = null;
try {
res = parser.parseArgs(args);
} catch (ArgumentParserException e) {
// With no arguments at all, show the help text instead of an error.
if (args.length == 0) {
parser.printHelp();
Exit.exit(0);
} else {
parser.handleError(e);
Exit.exit(1);
}
}
String configPath = res.getString("config");
String nodeName = res.getString("node_name");
Platform platform = Platform.Config.parse(nodeName, configPath);
JsonRestServer restServer =
new JsonRestServer(Node.Util.getTrogdorAgentPort(platform.curNode()));
AgentRestResource resource = new AgentRestResource();
Agent agent = new Agent(platform, Time.SYSTEM, restServer, resource);
restServer.start(resource);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// NOTE(review): this hook only logs -- at ERROR level for a normal
// shutdown -- and does not call beginShutdown(); confirm both are
// intentional.
log.error("Agent shutting down...");
}
});
agent.waitForShutdown();
}
};

View File

@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.agent;
import com.fasterxml.jackson.core.type.TypeReference;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.rest.AgentFaultsResponse;
import org.apache.kafka.trogdor.rest.AgentStatusResponse;
import org.apache.kafka.trogdor.rest.CreateAgentFaultRequest;
import org.apache.kafka.trogdor.rest.Empty;
import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse;
import static net.sourceforge.argparse4j.impl.Arguments.store;
import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
/**
 * A client for the Trogdor agent's REST API, usable both programmatically and
 * from the command line via {@link #main(String[])}.
 */
public class AgentClient {
    /**
     * The URL target: a colon-separated host and port pair.
     */
    private final String target;

    public AgentClient(String host, int port) {
        this(String.format("%s:%d", host, port));
    }

    public AgentClient(String target) {
        this.target = target;
    }

    public String target() {
        return target;
    }

    /**
     * Build the full URL for an agent endpoint suffix such as "/agent/status".
     */
    private String url(String suffix) {
        return String.format("http://%s%s", target, suffix);
    }

    /**
     * Fetch the agent's status.
     */
    public AgentStatusResponse getStatus() throws Exception {
        HttpResponse<AgentStatusResponse> resp =
            JsonRestServer.<AgentStatusResponse>httpRequest(url("/agent/status"), "GET",
                null, new TypeReference<AgentStatusResponse>() { });
        return resp.body();
    }

    /**
     * Fetch all faults known to the agent.
     */
    public AgentFaultsResponse getFaults() throws Exception {
        HttpResponse<AgentFaultsResponse> resp =
            JsonRestServer.<AgentFaultsResponse>httpRequest(url("/agent/faults"), "GET",
                null, new TypeReference<AgentFaultsResponse>() { });
        return resp.body();
    }

    /**
     * Create a new fault on the agent.
     */
    public void putFault(CreateAgentFaultRequest request) throws Exception {
        // The /agent/fault endpoint returns Empty on success (see
        // AgentRestResource#putAgentFault), so deserialize the response as
        // Empty rather than AgentFaultsResponse, matching invokeShutdown().
        HttpResponse<Empty> resp =
            JsonRestServer.<Empty>httpRequest(url("/agent/fault"), "PUT",
                request, new TypeReference<Empty>() { });
        // Calling body() surfaces any error returned by the server.
        resp.body();
    }

    /**
     * Ask the agent process to begin shutting down.
     */
    public void invokeShutdown() throws Exception {
        HttpResponse<Empty> resp =
            JsonRestServer.<Empty>httpRequest(url("/agent/shutdown"), "PUT",
                null, new TypeReference<Empty>() { });
        resp.body();
    }

    public static void main(String[] args) throws Exception {
        ArgumentParser parser = ArgumentParsers
            .newArgumentParser("trogdor-agent-client")
            .defaultHelp(true)
            .description("The Trogdor fault injection agent client.");
        parser.addArgument("target")
            .action(store())
            .required(true)
            .type(String.class)
            .dest("target")
            .metavar("TARGET")
            .help("A colon-separated host and port pair. For example, example.com:8888");
        // Exactly one action may be chosen per invocation.
        MutuallyExclusiveGroup actions = parser.addMutuallyExclusiveGroup();
        actions.addArgument("--status")
            .action(storeTrue())
            .type(Boolean.class)
            .dest("status")
            .help("Get agent status.");
        actions.addArgument("--get-faults")
            .action(storeTrue())
            .type(Boolean.class)
            .dest("get_faults")
            .help("Get agent faults.");
        actions.addArgument("--create-fault")
            .action(store())
            .type(String.class)
            .dest("create_fault")
            .metavar("FAULT_JSON")
            .help("Create a new fault.");
        actions.addArgument("--shutdown")
            .action(storeTrue())
            .type(Boolean.class)
            .dest("shutdown")
            .help("Trigger agent shutdown");
        Namespace res = null;
        try {
            res = parser.parseArgs(args);
        } catch (ArgumentParserException e) {
            // With no arguments at all, show the help text instead of an error.
            if (args.length == 0) {
                parser.printHelp();
                Exit.exit(0);
            } else {
                parser.handleError(e);
                Exit.exit(1);
            }
        }
        String target = res.getString("target");
        AgentClient client = new AgentClient(target);
        if (res.getBoolean("status")) {
            System.out.println("Got agent status: " +
                JsonUtil.toPrettyJsonString(client.getStatus()));
        } else if (res.getBoolean("get_faults")) {
            System.out.println("Got agent faults: " +
                JsonUtil.toPrettyJsonString(client.getFaults()));
        } else if (res.getString("create_fault") != null) {
            client.putFault(JsonUtil.JSON_SERDE.readValue(res.getString("create_fault"),
                CreateAgentFaultRequest.class));
            System.out.println("Created fault.");
        } else if (res.getBoolean("shutdown")) {
            client.invokeShutdown();
            System.out.println("Sent shutdown request.");
        } else {
            System.out.println("You must choose an action. Type --help for help.");
            Exit.exit(1);
        }
    }
}

View File

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.agent;
import org.apache.kafka.trogdor.rest.AgentFaultsResponse;
import org.apache.kafka.trogdor.rest.AgentStatusResponse;
import org.apache.kafka.trogdor.rest.CreateAgentFaultRequest;
import org.apache.kafka.trogdor.rest.Empty;
import javax.servlet.ServletContext;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import java.util.concurrent.atomic.AtomicReference;
/**
 * The JAX-RS resource exposing the Trogdor agent's REST API under /agent:
 * status, fault listing, fault creation, and shutdown.
 */
@Path("/agent")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class AgentRestResource {
    // The agent is injected after construction via setAgent(); endpoints fail
    // with a RuntimeException until it has been set.
    private final AtomicReference<Agent> agent = new AtomicReference<>(null);

    @javax.ws.rs.core.Context
    private ServletContext context;

    /**
     * Associate this resource with an Agent instance.
     */
    public void setAgent(Agent myAgent) {
        agent.set(myAgent);
    }

    /**
     * GET /agent/status: report the agent's start time.
     */
    @GET
    @Path("/status")
    public AgentStatusResponse getStatus() throws Throwable {
        return new AgentStatusResponse(agent().startTimeMs());
    }

    /**
     * GET /agent/faults: report all faults known to the agent.
     */
    @GET
    @Path("/faults")
    public AgentFaultsResponse getAgentFaults() throws Throwable {
        return agent().faults();
    }

    /**
     * PUT /agent/fault: queue a new fault on the agent.
     */
    @PUT
    @Path("/fault")
    public Empty putAgentFault(CreateAgentFaultRequest request) throws Throwable {
        agent().createFault(request);
        return Empty.INSTANCE;
    }

    /**
     * PUT /agent/shutdown: begin shutting the agent down.
     */
    @PUT
    @Path("/shutdown")
    public Empty shutdown() throws Throwable {
        agent().beginShutdown();
        return Empty.INSTANCE;
    }

    // Fetch the agent, failing loudly if setAgent() has not been called yet.
    private Agent agent() {
        final Agent currentAgent = agent.get();
        if (currentAgent == null) {
            throw new RuntimeException("AgentRestResource has not been initialized yet.");
        }
        return currentAgent;
    }
}

View File

@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.basic;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.trogdor.common.Node;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
 * A basic implementation of a Trogdor cluster node: a name plus a hostname,
 * a string-to-string configuration map, and a set of tags.
 */
public class BasicNode implements Node {
    private final String name;
    private final String hostname;
    private final Map<String, String> config;
    private final Set<String> tags;

    public BasicNode(String name, String hostname, Map<String, String> config,
                     Set<String> tags) {
        this.name = name;
        this.hostname = hostname;
        this.config = config;
        this.tags = tags;
    }

    /**
     * Build a node from a JSON object.  The "hostname" field sets the hostname
     * (default "localhost"); the "tags" field must be an array of strings; any
     * other field becomes a configuration entry.
     */
    public BasicNode(String name, JsonNode root) {
        this.name = name;
        String parsedHostname = "localhost";
        Set<String> parsedTags = Collections.emptySet();
        Map<String, String> parsedConfig = new HashMap<>();
        Iterator<Map.Entry<String, JsonNode>> fields = root.fields();
        while (fields.hasNext()) {
            Map.Entry<String, JsonNode> field = fields.next();
            String key = field.getKey();
            JsonNode value = field.getValue();
            if (key.equals("hostname")) {
                parsedHostname = value.asText();
            } else if (key.equals("tags")) {
                if (!value.isArray()) {
                    throw new RuntimeException("Expected the 'tags' field to be an " +
                        "array of strings.");
                }
                parsedTags = new HashSet<>();
                Iterator<JsonNode> tagIter = value.elements();
                while (tagIter.hasNext()) {
                    parsedTags.add(tagIter.next().asText());
                }
            } else {
                parsedConfig.put(key, value.asText());
            }
        }
        this.hostname = parsedHostname;
        this.tags = parsedTags;
        this.config = parsedConfig;
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public String hostname() {
        return hostname;
    }

    @Override
    public String getConfig(String key) {
        return config.get(key);
    }

    @Override
    public Set<String> tags() {
        return tags;
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, hostname, config, tags);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        BasicNode other = (BasicNode) o;
        return Objects.equals(name, other.name)
            && Objects.equals(hostname, other.hostname)
            && Objects.equals(config, other.config)
            && Objects.equals(tags, other.tags);
    }
}

View File

@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.basic;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.common.utils.Shell;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.Topology;
import java.io.IOException;
/**
 * A basic Platform implementation which looks up the current node in a
 * BasicTopology and executes commands through a pluggable CommandRunner.
 * (The previous javadoc, "Defines a cluster topology", described Topology
 * rather than this class.)
 */
public class BasicPlatform implements Platform {
    private final Node curNode;
    private final BasicTopology topology;
    private final CommandRunner commandRunner;

    /**
     * Runs a command on behalf of the platform.
     */
    public interface CommandRunner {
        String run(Node curNode, String[] command) throws IOException;
    }

    /**
     * A CommandRunner which executes commands in a local shell.
     */
    public static class ShellCommandRunner implements CommandRunner {
        @Override
        public String run(Node curNode, String[] command) throws IOException {
            return Shell.execCommand(command);
        }
    }

    public BasicPlatform(String curNodeName, BasicTopology topology,
                         CommandRunner commandRunner) {
        this.curNode = findCurNode(curNodeName, topology);
        this.topology = topology;
        this.commandRunner = commandRunner;
    }

    /**
     * Create a platform from a JSON configuration whose root object contains
     * a "nodes" field.  Commands are executed with a ShellCommandRunner.
     */
    public BasicPlatform(String curNodeName, JsonNode configRoot) {
        JsonNode nodes = configRoot.get("nodes");
        if (nodes == null) {
            throw new RuntimeException("Expected to find a 'nodes' field " +
                "in the root JSON configuration object");
        }
        this.topology = new BasicTopology(nodes);
        this.curNode = findCurNode(curNodeName, this.topology);
        this.commandRunner = new ShellCommandRunner();
    }

    /**
     * Look up the current node in the topology, failing with a descriptive
     * error if it is absent.  Shared by both constructors to avoid duplicating
     * the lookup-and-check logic.
     */
    private static Node findCurNode(String curNodeName, BasicTopology topology) {
        Node node = topology.node(curNodeName);
        if (node == null) {
            throw new RuntimeException(String.format("No node named %s found " +
                "in the cluster! Cluster nodes are: %s", curNodeName,
                Utils.join(topology.nodes().keySet(), ",")));
        }
        return node;
    }

    @Override
    public String name() {
        return "BasicPlatform";
    }

    @Override
    public Node curNode() {
        return curNode;
    }

    @Override
    public Topology topology() {
        return topology;
    }

    @Override
    public String runCommand(String[] command) throws IOException {
        return commandRunner.run(curNode, command);
    }
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.basic;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Topology;
import java.util.Iterator;
import java.util.NavigableMap;
import java.util.TreeMap;
public class BasicTopology implements Topology {
private final NavigableMap<String, Node> nodes;
public BasicTopology(NavigableMap<String, Node> nodes) {
this.nodes = nodes;
}
public BasicTopology(JsonNode configRoot) {
if (!configRoot.isObject()) {
throw new RuntimeException("Expected the 'nodes' element to be " +
"a JSON object.");
}
nodes = new TreeMap<>();
for (Iterator<String> iter = configRoot.fieldNames(); iter.hasNext(); ) {
String nodeName = iter.next();
JsonNode nodeConfig = configRoot.get(nodeName);
BasicNode node = new BasicNode(nodeName, nodeConfig);
nodes.put(nodeName, node);
}
}
@Override
public Node node(String id) {
return nodes.get(id);
}
@Override
public NavigableMap<String, Node> nodes() {
return nodes;
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.common;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
/**
 * Utilities for working with JSON.
 */
public class JsonUtil {
    /**
     * The shared ObjectMapper instance used for all Trogdor JSON handling.
     * Configured to tolerate beans with no serializable properties, and to
     * accept a single value where an array is expected.
     */
    public static final ObjectMapper JSON_SERDE;

    static {
        JSON_SERDE = new ObjectMapper();
        JSON_SERDE.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
        JSON_SERDE.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true);
    }

    // Static utility class: prevent instantiation.
    private JsonUtil() {
    }

    /**
     * Serialize an object to a compact JSON string.
     *
     * @throws RuntimeException wrapping any JsonProcessingException.
     */
    public static String toJsonString(Object object) {
        try {
            return JSON_SERDE.writeValueAsString(object);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Serialize an object to a pretty-printed JSON string.
     *
     * @throws RuntimeException wrapping any JsonProcessingException.
     */
    public static String toPrettyJsonString(Object object) {
        try {
            return JSON_SERDE.writerWithDefaultPrettyPrinter().writeValueAsString(object);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.common;
import java.util.Set;
/**
 * Defines a node in a cluster topology.
 */
public interface Node {
    /**
     * Static helpers for reading well-known configuration keys off a node.
     */
    class Util {
        /**
         * Read an integer configuration value from a node, falling back to
         * a default when the key is absent.
         *
         * @param node          The node to read from.
         * @param key           The configuration key.
         * @param defaultVal    The value to use when the key is not set.
         * @return              The parsed value, or the default.
         */
        public static int getIntConfig(Node node, String key, int defaultVal) {
            String value = node.getConfig(key);
            return (value == null) ? defaultVal : Integer.parseInt(value);
        }

        /**
         * The port the Trogdor agent listens on for this node, or 0 if
         * none is configured.
         */
        public static int getTrogdorAgentPort(Node node) {
            return getIntConfig(node, Platform.Config.TROGDOR_AGENT_PORT, 0);
        }

        /**
         * The port the Trogdor coordinator listens on for this node, or 0
         * if none is configured.
         */
        public static int getTrogdorCoordinatorPort(Node node) {
            return getIntConfig(node, Platform.Config.TROGDOR_COORDINATOR_PORT, 0);
        }
    }

    /**
     * Get the name for this node.
     */
    String name();

    /**
     * Get the hostname for this node.
     */
    String hostname();

    /**
     * Get the configuration value associated with the key, or null if there
     * is none.
     */
    String getConfig(String key);

    /**
     * Get the tags for this node.
     */
    Set<String> tags();
}

View File

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.common;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
import java.io.IOException;
import org.apache.kafka.common.utils.Utils;
/**
 * Defines the platform the Trogdor processes run on: the current node,
 * the cluster topology, and a way to run local commands.
 */
public interface Platform {
    class Config {
        public static final String TROGDOR_AGENT_PORT = "trogdor.agent.port";
        public static final String TROGDOR_COORDINATOR_PORT = "trogdor.coordinator.port";
        public static final String TROGDOR_COORDINATOR_HEARTBEAT_MS =
            "trogdor.coordinator.heartbeat.ms";
        public static final int TROGDOR_COORDINATOR_HEARTBEAT_MS_DEFAULT = 60000;

        /**
         * Instantiate a Platform from a JSON configuration file.  The file's
         * root object must contain a 'platform' field naming the Platform
         * implementation class, which is constructed reflectively with the
         * current node name and the parsed configuration tree.
         *
         * @param curNodeName   The name of the node we are running on.
         * @param path          The path to the JSON configuration file.
         * @return              The new Platform instance.
         */
        public static Platform parse(String curNodeName, String path) throws Exception {
            JsonNode root = JsonUtil.JSON_SERDE.readTree(new File(path));
            JsonNode platformNode = root.get("platform");
            if (platformNode == null) {
                throw new RuntimeException("Expected to find a 'platform' field " +
                    "in the root JSON configuration object");
            }
            return Utils.newParameterizedInstance(platformNode.textValue(),
                String.class, curNodeName,
                JsonNode.class, root);
        }
    }

    /**
     * Get the name for this platform.
     */
    String name();

    /**
     * Get the current node.
     */
    Node curNode();

    /**
     * Get the cluster topology.
     */
    Topology topology();

    /**
     * Run a command on this local node.
     *
     * Throws an exception if the command could not be run, or if the
     * command returned a non-zero error status.
     *
     * @param command The command
     *
     * @return The command output.
     */
    String runCommand(String[] command) throws IOException;
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.common;
import java.util.NavigableMap;
/**
 * Defines a cluster topology: the set of nodes making up the cluster,
 * indexed by node name.
 */
public interface Topology {
    /**
     * Get the node with the given name.  (The basic implementation returns
     * null when no node has that name.)
     */
    Node node(String id);

    /**
     * Get a sorted map of node names to nodes.
     */
    NavigableMap<String, Node> nodes();
}

View File

@ -0,0 +1,341 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.coordinator;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.fault.Fault;
import org.apache.kafka.trogdor.fault.FaultSet;
import org.apache.kafka.trogdor.fault.FaultSpec;
import org.apache.kafka.trogdor.fault.FaultState;
import org.apache.kafka.trogdor.rest.CoordinatorFaultsResponse;
import org.apache.kafka.trogdor.rest.CreateCoordinatorFaultRequest;
import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import static net.sourceforge.argparse4j.impl.Arguments.store;
/**
 * The Trogdor coordinator.
 *
 * The coordinator manages the agent processes in the cluster.
 */
public final class Coordinator {
    private static final Logger log = LoggerFactory.getLogger(Coordinator.class);

    /**
     * The clock to use for this coordinator.
     */
    private final Time time;

    /**
     * The start time in milliseconds.
     */
    private final long startTimeMs;

    /**
     * The platform.
     */
    private final Platform platform;

    /**
     * NodeManager objects for each agent node in the cluster.
     */
    private final Map<String, NodeManager> nodeManagers;

    /**
     * The lock protecting shutdown, pendingFaults, and doneFaults.
     */
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * The condition variable which the coordinator thread waits on.
     */
    private final Condition cond = lock.newCondition();

    /**
     * The coordinator runnable.
     */
    private final CoordinatorRunnable runnable;

    /**
     * The REST server.
     */
    private final JsonRestServer restServer;

    /**
     * The coordinator thread.
     */
    private final KafkaThread thread;

    /**
     * True if the server is shutting down.  Protected by the lock.
     */
    private boolean shutdown = false;

    /**
     * The set of faults which have been scheduled but not yet started.
     * Protected by the lock.
     */
    private final FaultSet pendingFaults = new FaultSet();

    /**
     * The set of faults which have been sent to the agents.
     * Protected by the lock.
     */
    private final FaultSet doneFaults = new FaultSet();

    class CoordinatorRunnable implements Runnable {
        @Override
        public void run() {
            log.info("Starting main service thread.");
            try {
                long nextWakeMs = 0;
                while (true) {
                    long now = time.milliseconds();
                    List<Fault> toStart = new ArrayList<>();
                    lock.lock();
                    try {
                        if (nextWakeMs > now) {
                            // await returns true when we were signalled
                            // before the timeout elapsed.
                            if (cond.await(nextWakeMs - now, TimeUnit.MILLISECONDS)) {
                                log.trace("CoordinatorRunnable woke up early.");
                            }
                        }
                        // By default, sleep for up to an hour before checking again.
                        nextWakeMs = now + (60L * 60L * 1000L);
                        if (shutdown) {
                            log.info("CoordinatorRunnable shutting down.");
                            return;
                        }
                        // Move every fault whose start time has arrived from the
                        // pending set to the done set, collecting it in toStart.
                        // Faults iterate in start-time order, so we can stop at
                        // the first one that is not yet due and wake up then.
                        Iterator<Fault> iter = pendingFaults.iterateByStart();
                        while (iter.hasNext()) {
                            Fault fault = iter.next();
                            if (now < fault.spec().startMs()) {
                                nextWakeMs = Math.min(nextWakeMs, fault.spec().startMs());
                                break;
                            }
                            toStart.add(fault);
                            iter.remove();
                            doneFaults.add(fault);
                        }
                    } finally {
                        lock.unlock();
                    }
                    // Start the faults outside the lock, since startFault
                    // talks to the node managers.
                    for (Fault fault: toStart) {
                        startFault(fault);
                    }
                }
            } catch (Throwable t) {
                log.error("CoordinatorRunnable shutting down with exception", t);
            } finally {
                log.info("CoordinatorRunnable shutting down.");
                restServer.stop();
                for (NodeManager nodeManager : nodeManagers.values()) {
                    nodeManager.beginShutdown();
                }
                for (NodeManager nodeManager : nodeManagers.values()) {
                    nodeManager.waitForShutdown();
                }
            }
        }
    }

    /**
     * Create a new Coordinator.
     *
     * @param platform      The platform object to use.
     * @param time          The timekeeper to use for this Coordinator.
     * @param restServer    The REST server to use.
     * @param resource      The CoordinatorRestResource to use.
     */
    public Coordinator(Platform platform, Time time, JsonRestServer restServer,
                       CoordinatorRestResource resource) {
        this.platform = platform;
        this.time = time;
        this.startTimeMs = time.milliseconds();
        this.runnable = new CoordinatorRunnable();
        this.restServer = restServer;
        this.nodeManagers = new HashMap<>();
        // Create a NodeManager for every node that has an agent port configured.
        for (Node node : platform.topology().nodes().values()) {
            if (Node.Util.getTrogdorAgentPort(node) > 0) {
                this.nodeManagers.put(node.name(), new NodeManager(time, node));
            }
        }
        if (this.nodeManagers.isEmpty()) {
            log.warn("No agent nodes configured.");
        }
        this.thread = new KafkaThread("TrogdorCoordinatorThread", runnable, false);
        this.thread.start();
        resource.setCoordinator(this);
    }

    public int port() {
        return this.restServer.port();
    }

    /**
     * Send a fault to the NodeManagers of all the nodes it targets.
     * Target nodes with no NodeManager are logged and skipped.
     */
    private void startFault(Fault fault) {
        Set<String> affectedNodes = fault.targetNodes(platform.topology());
        Set<NodeManager> affectedManagers = new HashSet<>();
        Set<String> nonexistentNodes = new HashSet<>();
        Set<String> nodeNames = new HashSet<>();
        for (String affectedNode : affectedNodes) {
            NodeManager nodeManager = nodeManagers.get(affectedNode);
            if (nodeManager == null) {
                nonexistentNodes.add(affectedNode);
            } else {
                affectedManagers.add(nodeManager);
                nodeNames.add(affectedNode);
            }
        }
        if (!nonexistentNodes.isEmpty()) {
            log.warn("Fault {} refers to {} non-existent node(s): {}", fault.id(),
                nonexistentNodes.size(), Utils.join(nonexistentNodes, ", "));
        }
        log.info("Applying fault {} on {} node(s): {}", fault.id(),
            nodeNames.size(), Utils.join(nodeNames, ", "));
        for (NodeManager nodeManager : affectedManagers) {
            nodeManager.enqueueFault(fault);
        }
    }

    /**
     * Mark the coordinator as shutting down and wake the service thread,
     * which stops the REST server and the node managers on its way out.
     */
    public void beginShutdown() {
        lock.lock();
        try {
            this.shutdown = true;
            cond.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Block until the coordinator thread has exited.
     */
    public void waitForShutdown() {
        try {
            this.thread.join();
        } catch (InterruptedException e) {
            log.error("Interrupted while waiting for thread shutdown", e);
            Thread.currentThread().interrupt();
        }
    }

    public long startTimeMs() {
        return startTimeMs;
    }

    /**
     * Get a snapshot of all pending and completed faults.
     */
    public CoordinatorFaultsResponse getFaults() {
        Map<String, CoordinatorFaultsResponse.FaultData> faultData = new TreeMap<>();
        lock.lock();
        try {
            getFaultsImpl(faultData, pendingFaults, FaultState.PENDING);
            getFaultsImpl(faultData, doneFaults, FaultState.DONE);
        } finally {
            lock.unlock();
        }
        return new CoordinatorFaultsResponse(faultData);
    }

    // Must be called with the lock held.
    private void getFaultsImpl(Map<String, CoordinatorFaultsResponse.FaultData> faultData,
                               FaultSet faultSet, FaultState state) {
        for (Iterator<Fault> iter = faultSet.iterateByStart();
                iter.hasNext(); ) {
            Fault fault = iter.next();
            CoordinatorFaultsResponse.FaultData data =
                new CoordinatorFaultsResponse.FaultData(fault.spec(), state);
            faultData.put(fault.id(), data);
        }
    }

    /**
     * Schedule a new fault and wake the service thread so it can
     * recalculate its next wakeup time.
     */
    public void createFault(CreateCoordinatorFaultRequest request) throws ClassNotFoundException {
        lock.lock();
        try {
            Fault fault = FaultSpec.Util.createFault(request.id(), request.spec());
            pendingFaults.add(fault);
            cond.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws Exception {
        ArgumentParser parser = ArgumentParsers
            .newArgumentParser("trogdor-coordinator")
            .defaultHelp(true)
            .description("The Trogdor fault injection coordinator");
        parser.addArgument("--coordinator.config")
            .action(store())
            .required(true)
            .type(String.class)
            .dest("config")
            .metavar("CONFIG")
            .help("The configuration file to use.");
        parser.addArgument("--node-name")
            .action(store())
            .required(true)
            .type(String.class)
            .dest("node_name")
            .metavar("NODE_NAME")
            .help("The name of this node.");
        Namespace res = null;
        try {
            res = parser.parseArgs(args);
        } catch (ArgumentParserException e) {
            if (args.length == 0) {
                parser.printHelp();
                Exit.exit(0);
            } else {
                parser.handleError(e);
                Exit.exit(1);
            }
        }
        String configPath = res.getString("config");
        String nodeName = res.getString("node_name");
        Platform platform = Platform.Config.parse(nodeName, configPath);
        JsonRestServer restServer = new JsonRestServer(
            Node.Util.getTrogdorCoordinatorPort(platform.curNode()));
        CoordinatorRestResource resource = new CoordinatorRestResource();
        final Coordinator coordinator = new Coordinator(platform, Time.SYSTEM,
            restServer, resource);
        restServer.start(resource);
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                // Trigger a clean shutdown when the JVM terminates, so the
                // REST server and node managers get stopped.  (Previously the
                // hook only logged a message, at error level, and nothing was
                // actually shut down.)
                log.info("Coordinator shutting down...");
                coordinator.beginShutdown();
                coordinator.waitForShutdown();
            }
        });
        coordinator.waitForShutdown();
    }
}

View File

@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.coordinator;
import com.fasterxml.jackson.core.type.TypeReference;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.rest.CoordinatorFaultsResponse;
import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
import org.apache.kafka.trogdor.rest.CreateCoordinatorFaultRequest;
import org.apache.kafka.trogdor.rest.Empty;
import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse;
import static net.sourceforge.argparse4j.impl.Arguments.store;
import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
/**
 * A client for the Trogdor coordinator.
 */
public class CoordinatorClient {
    /**
     * The colon-separated host:port pair the coordinator listens on.
     */
    private final String target;

    public CoordinatorClient(String host, int port) {
        this(String.format("%s:%d", host, port));
    }

    public CoordinatorClient(String target) {
        this.target = target;
    }

    /**
     * Build the full URL for a coordinator REST endpoint.
     */
    private String url(String suffix) {
        return String.format("http://%s%s", target, suffix);
    }

    /**
     * Fetch the coordinator's status.
     */
    public CoordinatorStatusResponse getStatus() throws Exception {
        HttpResponse<CoordinatorStatusResponse> response = JsonRestServer.httpRequest(
            url("/coordinator/status"), "GET",
            null, new TypeReference<CoordinatorStatusResponse>() { });
        return response.body();
    }

    /**
     * Fetch the faults known to the coordinator.
     */
    public CoordinatorFaultsResponse getFaults() throws Exception {
        HttpResponse<CoordinatorFaultsResponse> response = JsonRestServer.httpRequest(
            url("/coordinator/faults"), "GET",
            null, new TypeReference<CoordinatorFaultsResponse>() { });
        return response.body();
    }

    /**
     * Ask the coordinator to schedule a new fault.  body() is invoked so
     * that an error response is surfaced as an exception.
     */
    public void putFault(CreateCoordinatorFaultRequest request) throws Exception {
        HttpResponse<CreateCoordinatorFaultRequest> response = JsonRestServer.httpRequest(
            url("/coordinator/fault"), "PUT",
            request, new TypeReference<CreateCoordinatorFaultRequest>() { });
        response.body();
    }

    /**
     * Ask the coordinator to shut down.  body() is invoked so that an error
     * response is surfaced as an exception.
     */
    public void shutdown() throws Exception {
        HttpResponse<Empty> response = JsonRestServer.httpRequest(
            url("/coordinator/shutdown"), "PUT",
            null, new TypeReference<Empty>() { });
        response.body();
    }

    public static void main(String[] args) throws Exception {
        ArgumentParser parser = ArgumentParsers
            .newArgumentParser("trogdor-coordinator-client")
            .defaultHelp(true)
            .description("The Trogdor fault injection coordinator client.");
        parser.addArgument("target")
            .action(store())
            .required(true)
            .type(String.class)
            .dest("target")
            .metavar("TARGET")
            .help("A colon-separated host and port pair. For example, example.com:8889");
        MutuallyExclusiveGroup actions = parser.addMutuallyExclusiveGroup();
        actions.addArgument("--status")
            .action(storeTrue())
            .type(Boolean.class)
            .dest("status")
            .help("Get coordinator status.");
        actions.addArgument("--get-faults")
            .action(storeTrue())
            .type(Boolean.class)
            .dest("get_faults")
            .help("Get coordinator faults.");
        actions.addArgument("--create-fault")
            .action(store())
            .type(String.class)
            .dest("create_fault")
            .metavar("FAULT_JSON")
            .help("Create a new fault.");
        actions.addArgument("--shutdown")
            .action(storeTrue())
            .type(Boolean.class)
            .dest("shutdown")
            .help("Trigger coordinator shutdown");
        Namespace parsed = null;
        try {
            parsed = parser.parseArgs(args);
        } catch (ArgumentParserException e) {
            if (args.length == 0) {
                parser.printHelp();
                Exit.exit(0);
            } else {
                parser.handleError(e);
                Exit.exit(1);
            }
        }
        CoordinatorClient client = new CoordinatorClient(parsed.getString("target"));
        String createFaultJson = parsed.getString("create_fault");
        if (parsed.getBoolean("status")) {
            System.out.println("Got coordinator status: " +
                JsonUtil.toPrettyJsonString(client.getStatus()));
        } else if (parsed.getBoolean("get_faults")) {
            System.out.println("Got coordinator faults: " +
                JsonUtil.toPrettyJsonString(client.getFaults()));
        } else if (createFaultJson != null) {
            client.putFault(JsonUtil.JSON_SERDE.readValue(createFaultJson,
                CreateCoordinatorFaultRequest.class));
            System.out.println("Created fault.");
        } else if (parsed.getBoolean("shutdown")) {
            client.shutdown();
            System.out.println("Sent shutdown request.");
        } else {
            System.out.println("You must choose an action. Type --help for help.");
            Exit.exit(1);
        }
    }
}

View File

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.coordinator;
import org.apache.kafka.trogdor.rest.CoordinatorFaultsResponse;
import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
import org.apache.kafka.trogdor.rest.CreateCoordinatorFaultRequest;
import org.apache.kafka.trogdor.rest.Empty;
import javax.servlet.ServletContext;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import java.util.concurrent.atomic.AtomicReference;
@Path("/coordinator")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class CoordinatorRestResource {
    // The Coordinator instance, set once the Coordinator finishes
    // constructing itself.  REST calls arriving before then fail.
    private final AtomicReference<Coordinator> coordinator = new AtomicReference<>();

    @javax.ws.rs.core.Context
    private ServletContext context;

    /**
     * Attach the Coordinator that will serve REST requests.
     */
    public void setCoordinator(Coordinator myCoordinator) {
        coordinator.set(myCoordinator);
    }

    @GET
    @Path("/status")
    public CoordinatorStatusResponse getStatus() throws Throwable {
        return new CoordinatorStatusResponse(coordinator().startTimeMs());
    }

    @GET
    @Path("/faults")
    public CoordinatorFaultsResponse getCoordinatorFaults() throws Throwable {
        return coordinator().getFaults();
    }

    @PUT
    @Path("/fault")
    public Empty putCoordinatorFault(CreateCoordinatorFaultRequest request) throws Throwable {
        coordinator().createFault(request);
        return Empty.INSTANCE;
    }

    @PUT
    @Path("/shutdown")
    public Empty shutdown() throws Throwable {
        coordinator().beginShutdown();
        return Empty.INSTANCE;
    }

    /**
     * Fetch the Coordinator, failing if setCoordinator has not run yet.
     */
    private Coordinator coordinator() {
        Coordinator current = coordinator.get();
        if (current != null) {
            return current;
        }
        throw new RuntimeException("CoordinatorRestResource has not been initialized yet.");
    }
}

View File

@ -0,0 +1,262 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.coordinator;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.trogdor.agent.AgentClient;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.fault.Fault;
import org.apache.kafka.trogdor.rest.AgentStatusResponse;
import org.apache.kafka.trogdor.rest.CreateAgentFaultRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class NodeManager {
    private static final Logger log = LoggerFactory.getLogger(NodeManager.class);

    /**
     * The Time object used to fetch the current time.
     */
    private final Time time;

    /**
     * The node which is being managed.
     */
    private final Node node;

    /**
     * The client for the node being managed.
     */
    private final AgentClient client;

    /**
     * The maximum amount of time to go without contacting the node.
     */
    private final long heartbeatMs;

    /**
     * True if the NodeManager is shutting down.  Protected by the lock.
     */
    private boolean shutdown = false;

    /**
     * The Node Manager runnable.
     */
    private final NodeManagerRunnable runnable;

    /**
     * The Node Manager thread.
     */
    private final KafkaThread thread;

    /**
     * The lock protecting the NodeManager fields.
     */
    private final Lock lock = new ReentrantLock();

    /**
     * The condition variable used to wake the thread when it is waiting for a
     * queue or shutdown change.
     */
    private final Condition cond = lock.newCondition();

    /**
     * A queue of faults which should be sent to this node. Protected by the lock.
     */
    private final List<Fault> faultQueue = new ArrayList<>();

    /**
     * The last time we successfully contacted the node. Protected by the lock.
     */
    private long lastContactMs = 0;

    /**
     * The current status of this node.
     */
    public static class NodeStatus {
        private final String nodeName;
        private final long lastContactMs;

        NodeStatus(String nodeName, long lastContactMs) {
            this.nodeName = nodeName;
            this.lastContactMs = lastContactMs;
        }

        public String nodeName() {
            return nodeName;
        }

        public long lastContactMs() {
            return lastContactMs;
        }
    }

    class NodeManagerRunnable implements Runnable {
        @Override
        public void run() {
            try {
                Fault fault = null;
                long lastCommAttemptMs = 0;
                while (true) {
                    long now = time.milliseconds();
                    if (fault != null) {
                        lastCommAttemptMs = now;
                        if (sendFault(now, fault)) {
                            fault = null;
                        }
                    }
                    long nextCommAttemptMs = lastCommAttemptMs + heartbeatMs;
                    if (now >= nextCommAttemptMs) {
                        // We have gone heartbeatMs without contacting the
                        // node, so send a heartbeat now.  (The original
                        // condition was inverted -- 'now < nextCommAttemptMs'
                        // -- which meant no heartbeat was ever sent with a
                        // real clock, and waitMs was clamped to 0 so the
                        // thread busy-spun.)
                        lastCommAttemptMs = now;
                        sendHeartbeat(now);
                        nextCommAttemptMs = now + heartbeatMs;
                    }
                    long waitMs = Math.max(0L, nextCommAttemptMs - now);
                    lock.lock();
                    try {
                        if (shutdown) {
                            return;
                        }
                        try {
                            cond.await(waitMs, TimeUnit.MILLISECONDS);
                        } catch (InterruptedException e) {
                            log.info("{}: NodeManagerRunnable got InterruptedException", node.name());
                            Thread.currentThread().interrupt();
                        }
                        // Pick up the next queued fault, if we are not still
                        // trying to deliver one.
                        if (fault == null) {
                            if (!faultQueue.isEmpty()) {
                                fault = faultQueue.remove(0);
                            }
                        }
                    } finally {
                        lock.unlock();
                    }
                }
            } catch (Throwable e) {
                log.warn("{}: exiting NodeManagerRunnable with exception", node.name(), e);
            }
        }
    }

    NodeManager(Time time, Node node) {
        this.time = time;
        this.node = node;
        this.client = new AgentClient(node.hostname(), Node.Util.getTrogdorAgentPort(node));
        this.heartbeatMs = Node.Util.getIntConfig(node,
            Platform.Config.TROGDOR_COORDINATOR_HEARTBEAT_MS,
            Platform.Config.TROGDOR_COORDINATOR_HEARTBEAT_MS_DEFAULT);
        this.runnable = new NodeManagerRunnable();
        this.thread = new KafkaThread("NodeManagerThread(" + node.name() + ")", runnable, false);
        this.thread.start();
    }

    /**
     * Send a fault to the agent on this node.
     *
     * @return true if the fault was delivered; false if delivery failed and
     *         should be retried.
     */
    private boolean sendFault(long now, Fault fault) {
        try {
            client.putFault(new CreateAgentFaultRequest(fault.id(), fault.spec()));
        } catch (Exception e) {
            log.warn("{}: error sending fault to {}.", node.name(), client.target(), e);
            return false;
        }
        lock.lock();
        try {
            lastContactMs = now;
        } finally {
            lock.unlock();
        }
        return true;
    }

    /**
     * Send a heartbeat (a status request) to the agent on this node,
     * updating lastContactMs on success.
     */
    private void sendHeartbeat(long now) {
        AgentStatusResponse status = null;
        try {
            status = client.getStatus();
        } catch (Exception e) {
            log.warn("{}: error sending heartbeat to {}.", node.name(), client.target(), e);
            return;
        }
        lock.lock();
        try {
            lastContactMs = now;
        } finally {
            lock.unlock();
        }
        log.debug("{}: got heartbeat status {}.", node.name(), status);
    }

    /**
     * Mark this NodeManager as shutting down and wake its thread.
     * Idempotent.
     */
    public void beginShutdown() {
        lock.lock();
        try {
            if (shutdown)
                return;
            log.trace("{}: beginning shutdown.", node.name());
            shutdown = true;
            cond.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Block until the NodeManager thread has exited.
     */
    public void waitForShutdown() {
        log.trace("waiting for NodeManager({}) shutdown.", node.name());
        try {
            thread.join();
        } catch (InterruptedException e) {
            log.error("{}: Interrupted while waiting for thread shutdown", node.name(), e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Get the current status of this node.
     *
     * @return The node status.
     */
    public NodeStatus status() {
        lock.lock();
        try {
            return new NodeStatus(node.name(), lastContactMs);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Enqueue a new fault and wake the manager thread so it gets sent.
     *
     * @param fault The fault to enqueue.
     */
    public void enqueueFault(Fault fault) {
        lock.lock();
        try {
            log.trace("{}: added {} to fault queue.", node.name(), fault);
            faultQueue.add(fault);
            cond.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.fault;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.kafka.trogdor.common.JsonUtil;
/**
 * A base class that can be used for FaultSpecs.  Carries the two fields
 * common to every fault: when it starts and how long it lasts.
 */
public abstract class AbstractFaultSpec implements FaultSpec {
    // When the fault should begin, in milliseconds.
    private final long startMs;

    // How long the fault should last, in milliseconds.
    private final long durationMs;

    protected AbstractFaultSpec(@JsonProperty("startMs") long startMs,
                                @JsonProperty("durationMs") long durationMs) {
        this.startMs = startMs;
        this.durationMs = durationMs;
    }

    @Override
    public String toString() {
        // Render the spec as JSON; the @JsonProperty getters define the fields.
        return JsonUtil.toJsonString(this);
    }

    @JsonProperty
    @Override
    public long startMs() {
        return startMs;
    }

    @JsonProperty
    @Override
    public long durationMs() {
        return durationMs;
    }
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.fault;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.Topology;
import java.util.Set;
public interface Fault {
    /**
     * Get the ID of this fault.
     */
    String id();

    /**
     * Get the specification for this Fault.
     */
    FaultSpec spec();

    /**
     * Activate the fault.
     *
     * @param platform  The platform to activate the fault on.
     */
    void activate(Platform platform) throws Exception;

    /**
     * Deactivate the fault.
     *
     * @param platform  The platform to deactivate the fault on.
     */
    void deactivate(Platform platform) throws Exception;

    /**
     * Get the names of the nodes which this fault is targeting.
     *
     * @param topology  The cluster topology to resolve targets against.
     */
    Set<String> targetNodes(Topology topology);
}

View File

@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.fault;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.TreeMap;
/**
 * A set of faults, indexed both by scheduled start time and by scheduled end time.
 *
 * NOTE(review): no synchronization is visible here -- presumably callers
 * serialize access; confirm before using from multiple threads.
 */
public class FaultSet {
    /** Nanoseconds per millisecond, used to convert FaultSpec times into map keys. */
    private final static long NS_PER_MS = 1000000L;

    /**
     * Maps fault start times in nanoseconds to faults.
     *
     * When two faults would collide on the same key, the later insertion is
     * stored under the next free key (see insertUnique), so keys are unique
     * but may be slightly later than the true start time.
     */
    private final TreeMap<Long, Fault> byStart = new TreeMap<Long, Fault>();

    /**
     * Maps fault end times in nanoseconds to faults.
     */
    private final TreeMap<Long, Fault> byEnd = new TreeMap<Long, Fault>();

    /**
     * Return an iterator that iterates over the fault set in start time order.
     */
    public FaultSetIterator iterateByStart() {
        return new FaultSetIterator(byStart);
    }

    /**
     * Return an iterator that iterates over the fault set in end time order.
     */
    public FaultSetIterator iterateByEnd() {
        return new FaultSetIterator(byEnd);
    }

    /**
     * Add a new fault to the FaultSet.
     *
     * The fault is indexed under its start time and under its end time
     * (startMs + durationMs), both converted to nanoseconds.  Adding the same
     * fault object twice is a no-op (see insertUnique).
     */
    public void add(Fault fault) {
        insertUnique(byStart, fault.spec().startMs() * NS_PER_MS, fault);
        // NOTE(review): startMs + durationMs and the * NS_PER_MS conversion can
        // overflow long for very large spec values -- confirm inputs are bounded.
        long endMs = fault.spec().startMs() + fault.spec().durationMs();
        insertUnique(byEnd, endMs * NS_PER_MS, fault);
    }

    /**
     * Insert a new fault to a TreeMap.
     *
     * If there is already a fault with the given key, the fault will be stored
     * with the next available key.
     */
    private void insertUnique(TreeMap<Long, Fault> map, long key, Fault fault) {
        while (true) {
            Fault existing = map.get(key);
            if (existing == null) {
                map.put(key, fault);
                return;
            } else if (existing == fault) {
                // This exact fault object is already present; nothing to do.
                return;
            } else {
                // Key collision with a different fault: bump upward and retry.
                key++;
            }
        }
    }

    /**
     * Remove a fault from the TreeMap. The fault is removed by object equality.
     *
     * Throws NoSuchElementException if the fault is not present in the set.
     */
    public void remove(Fault fault) {
        removeUnique(byStart, fault.spec().startMs() * NS_PER_MS, fault);
        long endMs = fault.spec().startMs() + fault.spec().durationMs();
        removeUnique(byEnd, endMs * NS_PER_MS, fault);
    }

    /**
     * Helper function to remove a fault from a map. We will search every
     * element of the map equal to or higher than the given key.
     *
     * Searching upward from the nominal key mirrors the upward key-bumping
     * that insertUnique performs on collisions.
     */
    private void removeUnique(TreeMap<Long, Fault> map, long key, Fault fault) {
        while (true) {
            Map.Entry<Long, Fault> existing = map.ceilingEntry(key);
            if (existing == null) {
                throw new NoSuchElementException("No such element as " + fault);
            } else if (existing.getValue() == fault) {
                map.remove(existing.getKey());
                return;
            } else {
                key = existing.getKey() + 1;
            }
        }
    }

    /**
     * An iterator over the FaultSet.
     *
     * Rather than holding a live TreeMap iterator, each step looks up the next
     * entry with a key strictly greater than the previous one, so the iterator
     * tolerates modifications to the map between calls (at O(log n) per step).
     */
    class FaultSetIterator implements Iterator<Fault> {
        private final TreeMap<Long, Fault> map;

        // The most recently returned fault, or null before next() / after remove().
        private Fault cur = null;

        // The key of the most recently returned entry; -1 before iteration
        // starts (safe because all keys are non-negative nanosecond times).
        private long prevKey = -1;

        FaultSetIterator(TreeMap<Long, Fault> map) {
            this.map = map;
        }

        @Override
        public boolean hasNext() {
            Map.Entry<Long, Fault> entry = map.higherEntry(prevKey);
            return entry != null;
        }

        @Override
        public Fault next() {
            Map.Entry<Long, Fault> entry = map.higherEntry(prevKey);
            if (entry == null) {
                throw new NoSuchElementException();
            }
            prevKey = entry.getKey();
            cur = entry.getValue();
            return cur;
        }

        @Override
        public void remove() {
            if (cur == null) {
                throw new IllegalStateException();
            }
            // Remove from both indexes, not just the map being iterated.
            FaultSet.this.remove(cur);
            cur = null;
        }
    }
};

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.fault;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.kafka.common.utils.Utils;
/**
 * The specification for a fault.
 *
 * FaultSpec objects are serialized as JSON.  The "class" property carries the
 * concrete spec class name, which allows the right subclass to be instantiated
 * on deserialization, and which Util.createFault uses to locate the matching
 * Fault implementation.
 */
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS,
        include = JsonTypeInfo.As.PROPERTY,
        property = "class")
public interface FaultSpec {
    class Util {
        // Suffix which every FaultSpec class name is expected to end with.
        private static final String SPEC_STRING = "Spec";

        /**
         * Instantiate the Fault corresponding to the given spec.
         *
         * The fault class name is derived by stripping the "Spec" suffix from
         * the spec's class name (e.g. NoOpFaultSpec maps to NoOpFault).  The
         * fault class is constructed via Utils.newParameterizedInstance with a
         * (String, FaultSpec) parameter list.
         *
         * @param faultId                   The ID to give the new fault.
         * @param faultSpec                 The specification for the fault.
         * @return                          The new Fault object.
         * @throws ClassNotFoundException   If the derived fault class does not exist.
         */
        public static Fault createFault(String faultId, FaultSpec faultSpec) throws ClassNotFoundException {
            String faultSpecClassName = faultSpec.getClass().getName();
            if (!faultSpecClassName.endsWith(SPEC_STRING)) {
                throw new RuntimeException("FaultSpec class name must end with " + SPEC_STRING);
            }
            String faultClassName = faultSpecClassName.substring(0,
                faultSpecClassName.length() - SPEC_STRING.length());
            return Utils.newParameterizedInstance(faultClassName,
                String.class, faultId,
                FaultSpec.class, faultSpec);
        }
    }

    /**
     * Get the start time of this fault in ms.
     */
    @JsonProperty
    long startMs();

    /**
     * Get the duration of this fault in ms.
     */
    @JsonProperty
    long durationMs();
}

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.fault;
import com.fasterxml.jackson.annotation.JsonFormat;
/**
 * The state of a fault.  Serialized as a JSON string (per @JsonFormat).
 */
@JsonFormat(shape = JsonFormat.Shape.STRING)
public enum FaultState {
    // Not yet active (inferred from the name -- confirm against scheduler usage).
    PENDING,

    // Currently active.
    RUNNING,

    // Finished.
    DONE
}

View File

@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.fault;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.Topology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
/**
 * A fault that partitions the network using iptables rules.
 */
public class NetworkPartitionFault implements Fault {
    private static final Logger log = LoggerFactory.getLogger(NetworkPartitionFault.class);

    /** The ID of this fault. */
    private final String id;

    /** The specification describing which nodes to partition, and when. */
    private final NetworkPartitionFaultSpec spec;

    /** Each entry is one side of the partition, as a set of node names. */
    private final List<Set<String>> partitions;

    /**
     * Create a NetworkPartitionFault.
     *
     * @param id    The ID for this fault.
     * @param spec  The specification; must be a NetworkPartitionFaultSpec.
     * @throws RuntimeException if any node appears in more than one partition.
     */
    public NetworkPartitionFault(String id, FaultSpec spec) {
        this.id = id;
        this.spec = (NetworkPartitionFaultSpec) spec;
        this.partitions = new ArrayList<>();
        HashSet<String> prevNodes = new HashSet<>();
        for (List<String> partition : this.spec.partitions()) {
            for (String nodeName : partition) {
                if (prevNodes.contains(nodeName)) {
                    throw new RuntimeException("Node " + nodeName +
                        " appears in more than one partition.");
                }
                prevNodes.add(nodeName);
            }
            // Bug fix: add each partition exactly once.  This line previously
            // sat inside the inner loop, so a partition containing N nodes was
            // added N times to this.partitions.
            this.partitions.add(new HashSet<String>(partition));
        }
    }

    @Override
    public String id() {
        return id;
    }

    @Override
    public FaultSpec spec() {
        return spec;
    }

    @Override
    public void activate(Platform platform) throws Exception {
        log.info("Activating NetworkPartitionFault...");
        runIptablesCommands(platform, "-A");
    }

    @Override
    public void deactivate(Platform platform) throws Exception {
        log.info("Deactivating NetworkPartitionFault...");
        runIptablesCommands(platform, "-D");
    }

    /**
     * Add or remove the iptables rules implementing this partition on the
     * current node.
     *
     * For every partition that does not contain the current node, incoming TCP
     * traffic from each of that partition's nodes is dropped via an iptables
     * INPUT rule.  Nodes that appear in no partition are never blocked.
     *
     * @param platform          The platform to run commands on.
     * @param iptablesAction    "-A" to append the rules, "-D" to delete them.
     */
    private void runIptablesCommands(Platform platform, String iptablesAction) throws Exception {
        Node curNode = platform.curNode();
        Topology topology = platform.topology();
        TreeSet<String> toBlock = new TreeSet<>();
        for (Set<String> partition : partitions) {
            if (!partition.contains(curNode.name())) {
                for (String nodeName : partition) {
                    toBlock.add(nodeName);
                }
            }
        }
        for (String nodeName : toBlock) {
            Node node = topology.node(nodeName);
            InetAddress addr = InetAddress.getByName(node.hostname());
            platform.runCommand(new String[] {
                "sudo", "iptables", iptablesAction, "INPUT", "-p", "tcp", "-s",
                addr.getHostAddress(), "-j", "DROP", "-m", "comment", "--comment", nodeName
            });
        }
    }

    @Override
    public Set<String> targetNodes(Topology topology) {
        Set<String> targetNodes = new HashSet<>();
        for (Set<String> partition : partitions) {
            targetNodes.addAll(partition);
        }
        return targetNodes;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        NetworkPartitionFault that = (NetworkPartitionFault) o;
        return Objects.equals(id, that.id) &&
            Objects.equals(spec, that.spec) &&
            Objects.equals(partitions, that.partitions);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, spec, partitions);
    }

    @Override
    public String toString() {
        // Bug fix: this was previously labeled "NoOpFault" (copy/paste error).
        return "NetworkPartitionFault(id=" + id + ", spec=" + JsonUtil.toJsonString(spec) + ")";
    }
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.fault;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.kafka.trogdor.common.JsonUtil;
import java.util.List;
import java.util.Objects;
/**
 * The specification for a fault that creates a network partition.
 */
public class NetworkPartitionFaultSpec extends AbstractFaultSpec {
    /** The partitions, each given as a list of node names. */
    private final List<List<String>> partitions;

    @JsonCreator
    public NetworkPartitionFaultSpec(@JsonProperty("startMs") long startMs,
                             @JsonProperty("durationMs") long durationMs,
                             @JsonProperty("partitions") List<List<String>> partitions) {
        super(startMs, durationMs);
        this.partitions = partitions;
    }

    @JsonProperty
    public List<List<String>> partitions() {
        return partitions;
    }

    @Override
    public boolean equals(Object o) {
        if (o == this)
            return true;
        if ((o == null) || (o.getClass() != getClass()))
            return false;
        NetworkPartitionFaultSpec other = (NetworkPartitionFaultSpec) o;
        return (startMs() == other.startMs())
            && (durationMs() == other.durationMs())
            && Objects.equals(partitions, other.partitions);
    }

    @Override
    public int hashCode() {
        return Objects.hash(startMs(), durationMs(), partitions);
    }

    @Override
    public String toString() {
        return JsonUtil.toJsonString(this);
    }
}

View File

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.fault;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.Topology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
 * A fault that does nothing when activated or deactivated.
 */
public class NoOpFault implements Fault {
    private static final Logger log = LoggerFactory.getLogger(NoOpFault.class);

    /** The ID of this fault. */
    private final String id;

    /** The specification this fault was created from. */
    private final FaultSpec spec;

    public NoOpFault(String id, FaultSpec spec) {
        this.id = id;
        this.spec = spec;
    }

    @Override
    public String id() {
        return id;
    }

    @Override
    public FaultSpec spec() {
        return spec;
    }

    @Override
    public void activate(Platform platform) {
        log.info("Activating NoOpFault...");
    }

    @Override
    public void deactivate(Platform platform) {
        log.info("Deactivating NoOpFault...");
    }

    @Override
    public Set<String> targetNodes(Topology topology) {
        // Target every node in the topology that exposes a Trogdor agent port.
        Set<String> agentNodes = new HashSet<>();
        for (Map.Entry<String, Node> e : topology.nodes().entrySet()) {
            if (Node.Util.getTrogdorAgentPort(e.getValue()) > 0) {
                agentNodes.add(e.getKey());
            }
        }
        return agentNodes;
    }

    @Override
    public boolean equals(Object o) {
        if (o == this)
            return true;
        if ((o == null) || (o.getClass() != getClass()))
            return false;
        NoOpFault other = (NoOpFault) o;
        return Objects.equals(id, other.id)
            && Objects.equals(spec, other.spec);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, spec);
    }

    @Override
    public String toString() {
        return "NoOpFault(id=" + id + ", spec=" + JsonUtil.toJsonString(spec) + ")";
    }
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.fault;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Objects;
/**
 * The specification for a fault that does nothing.
 *
 * This fault type exists mainly to test the fault injection system.
 */
public class NoOpFaultSpec extends AbstractFaultSpec {
    @JsonCreator
    public NoOpFaultSpec(@JsonProperty("startMs") long startMs,
                         @JsonProperty("durationMs") long durationMs) {
        super(startMs, durationMs);
    }

    @Override
    public boolean equals(Object o) {
        if (o == this)
            return true;
        if ((o == null) || (o.getClass() != getClass()))
            return false;
        NoOpFaultSpec other = (NoOpFaultSpec) o;
        return (startMs() == other.startMs())
            && (durationMs() == other.durationMs());
    }

    @Override
    public int hashCode() {
        return Objects.hash(startMs(), durationMs());
    }
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.kafka.trogdor.common.JsonUtil;
import java.util.Map;
/**
 * Response to GET /faults
 */
public class AgentFaultsResponse extends FaultDataMap {
    @JsonCreator
    public AgentFaultsResponse(@JsonProperty("faults") Map<String, FaultData> faults) {
        super(faults);
    }

    @Override
    public boolean equals(Object o) {
        if (o == this)
            return true;
        if ((o == null) || (o.getClass() != getClass()))
            return false;
        // Delegate to FaultDataMap, which compares the fault maps.
        return super.equals(o);
    }

    @Override
    public int hashCode() {
        return super.hashCode();
    }

    @Override
    public String toString() {
        return JsonUtil.toJsonString(this);
    }
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.kafka.trogdor.common.JsonUtil;
import java.util.Objects;
/**
 * The status of the Trogdor agent.
 */
public class AgentStatusResponse {
    /** The time in ms at which the agent started, per the constructor argument. */
    private final long startTimeMs;

    @JsonCreator
    public AgentStatusResponse(@JsonProperty("startTimeMs") long startTimeMs) {
        this.startTimeMs = startTimeMs;
    }

    @JsonProperty
    public long startTimeMs() {
        return startTimeMs;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        AgentStatusResponse that = (AgentStatusResponse) o;
        // Compare the primitive directly; Objects.equals would autobox both longs.
        return startTimeMs == that.startTimeMs;
    }

    @Override
    public int hashCode() {
        // Long.hashCode avoids the varargs array allocation of Objects.hash.
        return Long.hashCode(startTimeMs);
    }

    @Override
    public String toString() {
        return JsonUtil.toJsonString(this);
    }
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.kafka.trogdor.common.JsonUtil;
import java.util.Map;
/**
 * Response to GET /faults
 */
public class CoordinatorFaultsResponse extends FaultDataMap {
    @JsonCreator
    public CoordinatorFaultsResponse(@JsonProperty("faults") Map<String, FaultData> faults) {
        super(faults);
    }

    @Override
    public boolean equals(Object o) {
        if (o == this)
            return true;
        if ((o == null) || (o.getClass() != getClass()))
            return false;
        // Delegate to FaultDataMap, which compares the fault maps.
        return super.equals(o);
    }

    @Override
    public int hashCode() {
        return super.hashCode();
    }

    @Override
    public String toString() {
        return JsonUtil.toJsonString(this);
    }
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.kafka.trogdor.common.JsonUtil;
import java.util.Objects;
/**
 * The status of the Trogdor coordinator.
 */
public class CoordinatorStatusResponse {
    /** The time in ms at which the coordinator started, per the constructor argument. */
    private final long startTimeMs;

    @JsonCreator
    public CoordinatorStatusResponse(@JsonProperty("startTimeMs") long startTimeMs) {
        this.startTimeMs = startTimeMs;
    }

    @JsonProperty
    public long startTimeMs() {
        return startTimeMs;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        CoordinatorStatusResponse that = (CoordinatorStatusResponse) o;
        // Compare the primitive directly; Objects.equals would autobox both longs.
        return startTimeMs == that.startTimeMs;
    }

    @Override
    public int hashCode() {
        // Long.hashCode avoids the varargs array allocation of Objects.hash.
        return Long.hashCode(startTimeMs);
    }

    @Override
    public String toString() {
        return JsonUtil.toJsonString(this);
    }
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.fault.FaultSpec;
import java.util.Objects;
/**
 * A request to the Trogdor agent to create a fault.
 */
public class CreateAgentFaultRequest {
    /** The ID to assign to the new fault. */
    private final String id;

    /** The specification of the fault to create. */
    private final FaultSpec spec;

    @JsonCreator
    public CreateAgentFaultRequest(@JsonProperty("id") String id,
                                   @JsonProperty("spec") FaultSpec spec) {
        this.id = id;
        this.spec = spec;
    }

    @JsonProperty
    public String id() {
        return id;
    }

    @JsonProperty
    public FaultSpec spec() {
        return spec;
    }

    @Override
    public boolean equals(Object o) {
        if (o == this)
            return true;
        if ((o == null) || (o.getClass() != getClass()))
            return false;
        CreateAgentFaultRequest other = (CreateAgentFaultRequest) o;
        return Objects.equals(id, other.id)
            && Objects.equals(spec, other.spec);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, spec);
    }

    @Override
    public String toString() {
        return JsonUtil.toJsonString(this);
    }
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.fault.FaultSpec;
import java.util.Objects;
/**
 * A request to the Trogdor coordinator to create a fault.
 */
public class CreateCoordinatorFaultRequest {
    /** The ID to assign to the new fault. */
    private final String id;

    /** The specification of the fault to create. */
    private final FaultSpec spec;

    @JsonCreator
    public CreateCoordinatorFaultRequest(@JsonProperty("id") String id,
                                         @JsonProperty("spec") FaultSpec spec) {
        this.id = id;
        this.spec = spec;
    }

    @JsonProperty
    public String id() {
        return id;
    }

    @JsonProperty
    public FaultSpec spec() {
        return spec;
    }

    @Override
    public boolean equals(Object o) {
        if (o == this)
            return true;
        if ((o == null) || (o.getClass() != getClass()))
            return false;
        CreateCoordinatorFaultRequest other = (CreateCoordinatorFaultRequest) o;
        return Objects.equals(id, other.id)
            && Objects.equals(spec, other.spec);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, spec);
    }

    @Override
    public String toString() {
        return JsonUtil.toJsonString(this);
    }
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.kafka.trogdor.common.JsonUtil;
/**
 * An empty request or response.
 */
public class Empty {
    /** A shared instance; all Empty objects are interchangeable. */
    public static final Empty INSTANCE = new Empty();

    @JsonCreator
    public Empty() {
    }

    @Override
    public boolean equals(Object o) {
        // All non-null instances of exactly this class are equal.
        return (o != null) && getClass().equals(o.getClass());
    }

    @Override
    public int hashCode() {
        return 1;
    }

    @Override
    public String toString() {
        return JsonUtil.toJsonString(this);
    }
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.kafka.trogdor.common.JsonUtil;
import java.util.Objects;
/**
 * An error response.
 */
public class ErrorResponse {
    /** The error code. */
    private final int code;

    /** The error message. */
    private final String message;

    @JsonCreator
    public ErrorResponse(@JsonProperty("code") int code,
                         @JsonProperty("message") String message) {
        this.code = code;
        this.message = message;
    }

    @JsonProperty
    public int code() {
        return code;
    }

    @JsonProperty
    public String message() {
        return message;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        ErrorResponse that = (ErrorResponse) o;
        // Compare the primitive directly; Objects.equals would autobox both ints.
        return code == that.code &&
            Objects.equals(message, that.message);
    }

    @Override
    public int hashCode() {
        return Objects.hash(code, message);
    }

    @Override
    public String toString() {
        return JsonUtil.toJsonString(this);
    }
}

View File

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.fault.FaultSpec;
import org.apache.kafka.trogdor.fault.FaultState;
import java.util.Map;
import java.util.Objects;
/**
 * Response to GET /faults
 */
public class FaultDataMap {
    /** Maps fault IDs to the data for each fault. */
    private final Map<String, FaultData> faults;

    /**
     * The data for a single fault: its specification and its current state.
     */
    public static class FaultData {
        private final FaultSpec spec;
        private final FaultState state;

        @JsonCreator
        public FaultData(@JsonProperty("spec") FaultSpec spec,
                         @JsonProperty("state") FaultState state) {
            // Bug fix: the creator parameter was previously bound to "status",
            // while the state() accessor below serializes the field as "state",
            // so serialized FaultData objects could not be deserialized again.
            this.spec = spec;
            this.state = state;
        }

        @JsonProperty
        public FaultSpec spec() {
            return spec;
        }

        @JsonProperty
        public FaultState state() {
            return state;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            FaultData that = (FaultData) o;
            return Objects.equals(spec, that.spec) &&
                Objects.equals(state, that.state);
        }

        @Override
        public int hashCode() {
            return Objects.hash(spec, state);
        }
    }

    @JsonCreator
    public FaultDataMap(@JsonProperty("faults") Map<String, FaultData> faults) {
        this.faults = faults;
    }

    @JsonProperty
    public Map<String, FaultData> faults() {
        return faults;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        FaultDataMap that = (FaultDataMap) o;
        return Objects.equals(faults, that.faults);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(faults);
    }

    @Override
    public String toString() {
        return JsonUtil.toJsonString(this);
    }
}

View File

@ -0,0 +1,220 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.Slf4jRequestLog;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.server.handler.RequestLogHandler;
import org.eclipse.jetty.server.handler.StatisticsHandler;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.servlet.ServletContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
/**
 * Embedded server for the REST API that provides the control plane for Trogdor.
 */
public class JsonRestServer {
    private static final Logger log = LoggerFactory.getLogger(JsonRestServer.class);

    // How long to wait for in-flight requests to complete on shutdown.
    private static final long GRACEFUL_SHUTDOWN_TIMEOUT_MS = 2 * 1000;

    private final Server jettyServer;
    private final ServerConnector connector;

    /**
     * Create a REST server for this herder using the specified configs.
     *
     * @param port      The port number to use for the REST server, or
     *                  0 to use a random port.
     */
    public JsonRestServer(int port) {
        this.jettyServer = new Server();
        this.connector = new ServerConnector(jettyServer);
        if (port > 0) {
            connector.setPort(port);
        }
        jettyServer.setConnectors(new Connector[]{connector});
    }

    /**
     * Start the JsonRestServer.
     *
     * @param resources  The path handling resources to register.
     */
    public void start(Object... resources) {
        log.info("Starting REST server");
        ResourceConfig resourceConfig = new ResourceConfig();
        // Serialize and deserialize request/response bodies with our shared mapper.
        resourceConfig.register(new JacksonJsonProvider(JsonUtil.JSON_SERDE));
        for (Object resource : resources) {
            resourceConfig.register(resource);
            log.info("Registered resource {}", resource);
        }
        // Map uncaught exceptions to JSON error responses.
        resourceConfig.register(RestExceptionMapper.class);
        ServletContainer servletContainer = new ServletContainer(resourceConfig);
        ServletHolder servletHolder = new ServletHolder(servletContainer);
        ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
        context.setContextPath("/");
        context.addServlet(servletHolder, "/*");
        RequestLogHandler requestLogHandler = new RequestLogHandler();
        Slf4jRequestLog requestLog = new Slf4jRequestLog();
        requestLog.setLoggerName(JsonRestServer.class.getCanonicalName());
        requestLog.setLogLatency(true);
        requestLogHandler.setRequestLog(requestLog);
        HandlerCollection handlers = new HandlerCollection();
        handlers.setHandlers(new Handler[]{context, new DefaultHandler(), requestLogHandler});
        StatisticsHandler statsHandler = new StatisticsHandler();
        statsHandler.setHandler(handlers);
        jettyServer.setHandler(statsHandler);
        /* Needed for graceful shutdown as per `setStopTimeout` documentation */
        jettyServer.setStopTimeout(GRACEFUL_SHUTDOWN_TIMEOUT_MS);
        jettyServer.setStopAtShutdown(true);
        try {
            jettyServer.start();
        } catch (Exception e) {
            throw new RuntimeException("Unable to start REST server", e);
        }
        log.info("REST server listening at {}", jettyServer.getURI());
    }

    /**
     * @return the port the server is actually listening on (useful when the
     *         server was constructed with port 0).
     */
    public int port() {
        return connector.getLocalPort();
    }

    /**
     * Stop the server, wait for it to terminate, and release its resources.
     * Failures to stop are logged rather than propagated.
     */
    public void stop() {
        log.info("Stopping REST server");
        try {
            jettyServer.stop();
            jettyServer.join();
            log.info("REST server stopped");
        } catch (Exception e) {
            log.error("Unable to stop REST server", e);
        } finally {
            jettyServer.destroy();
        }
    }

    /**
     * Make an HTTP request and deserialize the JSON response.
     *
     * @param url               HTTP connection will be established with this url.
     * @param method            HTTP method ("GET", "POST", "PUT", etc.)
     * @param requestBodyData   Object to serialize as JSON and send in the request body.
     * @param responseFormat    Expected format of the response to the HTTP request.
     * @param <T>               The type of the deserialized response to the HTTP request.
     * @return                  The deserialized response to the HTTP request, or null if no data is expected.
     */
    public static <T> HttpResponse<T> httpRequest(String url, String method, Object requestBodyData,
            TypeReference<T> responseFormat) throws IOException {
        HttpURLConnection connection = null;
        try {
            String serializedBody = requestBodyData == null ? null :
                JsonUtil.JSON_SERDE.writeValueAsString(requestBodyData);
            log.debug("Sending {} with input {} to {}", method, serializedBody, url);
            connection = (HttpURLConnection) new URL(url).openConnection();
            connection.setRequestMethod(method);
            connection.setRequestProperty("User-Agent", "kafka");
            connection.setRequestProperty("Accept", "application/json");
            // connection.getResponseCode() implicitly calls getInputStream, so always set
            // this to true.
            connection.setDoInput(true);
            connection.setUseCaches(false);
            if (requestBodyData != null) {
                connection.setRequestProperty("Content-Type", "application/json");
                connection.setDoOutput(true);
                // try-with-resources: close the stream even if the write fails.
                try (OutputStream os = connection.getOutputStream()) {
                    os.write(serializedBody.getBytes(StandardCharsets.UTF_8));
                    os.flush();
                }
            }
            int responseCode = connection.getResponseCode();
            if (responseCode == HttpURLConnection.HTTP_NO_CONTENT) {
                return new HttpResponse<>(null, new ErrorResponse(responseCode, connection.getResponseMessage()));
            } else if ((responseCode >= 200) && (responseCode < 300)) {
                // try-with-resources: close the stream even if deserialization fails.
                try (InputStream is = connection.getInputStream()) {
                    T result = JsonUtil.JSON_SERDE.readValue(is, responseFormat);
                    return new HttpResponse<>(result, null);
                }
            } else {
                // If the response code was not in the 200s, we assume that this is an error
                // response.
                try (InputStream es = connection.getErrorStream()) {
                    if (es == null) {
                        // Handle the case where HttpURLConnection#getErrorStream returns null.
                        return new HttpResponse<>(null, new ErrorResponse(responseCode, ""));
                    }
                    // Try to read the error response JSON.
                    ErrorResponse error = JsonUtil.JSON_SERDE.readValue(es, ErrorResponse.class);
                    return new HttpResponse<>(null, error);
                }
            }
        } finally {
            if (connection != null) {
                connection.disconnect();
            }
        }
    }

    /**
     * The result of an HTTP request: either a deserialized body, or an error.
     */
    public static class HttpResponse<T> {
        private final T body;
        private final ErrorResponse error;

        HttpResponse(T body, ErrorResponse error) {
            this.body = body;
            this.error = error;
        }

        /**
         * @return the deserialized response body.
         * @throws Exception the exception corresponding to the error response,
         *                   if the request failed.
         */
        public T body() throws Exception {
            if (error != null) {
                throw RestExceptionMapper.toException(error.code(), error.message());
            }
            return body;
        }

        /** @return the error response, or null if the request succeeded. */
        public ErrorResponse error() {
            return error;
        }
    }
}

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.exc.InvalidTypeIdException;
import org.apache.kafka.common.errors.SerializationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ExceptionMapper;
/**
 * Maps uncaught exceptions from REST handlers to JSON error responses, and
 * maps error responses received by clients back to exceptions.
 */
public class RestExceptionMapper implements ExceptionMapper<Throwable> {
    private static final Logger log = LoggerFactory.getLogger(RestExceptionMapper.class);

    @Override
    public Response toResponse(Throwable e) {
        if (log.isDebugEnabled()) {
            log.debug("Uncaught exception in REST call: ", e);
        } else if (log.isInfoEnabled()) {
            log.info("Uncaught exception in REST call: {}", e.getMessage());
        }
        if (e instanceof NotFoundException) {
            return buildResponse(Response.Status.NOT_FOUND, e);
        } else if (e instanceof InvalidTypeIdException) {
            // InvalidTypeIdException is a subclass of JsonMappingException, so it
            // must be checked first; otherwise the JsonMappingException branch
            // would shadow it and this mapping would be unreachable.
            return buildResponse(Response.Status.NOT_IMPLEMENTED, e);
        } else if (e instanceof JsonMappingException) {
            return buildResponse(Response.Status.BAD_REQUEST, e);
        } else if (e instanceof ClassNotFoundException) {
            return buildResponse(Response.Status.NOT_IMPLEMENTED, e);
        } else if (e instanceof SerializationException) {
            return buildResponse(Response.Status.BAD_REQUEST, e);
        } else {
            return buildResponse(Response.Status.INTERNAL_SERVER_ERROR, e);
        }
    }

    /**
     * Convert an HTTP error code received by a client back into an exception.
     * This is the (lossy) inverse of toResponse: distinct exception types that
     * map to the same status code come back as a single representative type.
     *
     * Note: this method always throws; its declared return type exists so
     * callers can write {@code throw toException(...)}.
     *
     * @throws Exception always.
     */
    public static Exception toException(int code, String msg) throws Exception {
        if (code == Response.Status.NOT_FOUND.getStatusCode()) {
            throw new NotFoundException(msg);
        } else if (code == Response.Status.NOT_IMPLEMENTED.getStatusCode()) {
            throw new ClassNotFoundException(msg);
        } else if (code == Response.Status.BAD_REQUEST.getStatusCode()) {
            throw new SerializationException(msg);
        } else {
            throw new RuntimeException(msg);
        }
    }

    /** Build a JSON error response carrying the status code and exception message. */
    private Response buildResponse(Response.Status code, Throwable e) {
        return Response.status(code).
            entity(new ErrorResponse(code.getStatusCode(), e.getMessage())).
            build();
    }
}

View File

@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.agent;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.trogdor.basic.BasicNode;
import org.apache.kafka.trogdor.basic.BasicPlatform;
import org.apache.kafka.trogdor.basic.BasicTopology;
import org.apache.kafka.trogdor.common.ExpectedFaults;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.fault.FaultState;
import org.apache.kafka.trogdor.fault.NoOpFaultSpec;
import org.apache.kafka.trogdor.rest.AgentFaultsResponse;
import org.apache.kafka.trogdor.rest.AgentStatusResponse;
import org.apache.kafka.trogdor.rest.CreateAgentFaultRequest;
import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.junit.Rule;
import org.junit.rules.Timeout;
import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.TreeMap;
import static org.junit.Assert.assertEquals;
public class AgentTest {
    // Fail any individual test that runs for longer than two minutes.
    @Rule
    final public Timeout globalTimeout = Timeout.millis(120000);

    /**
     * Create a single-node BasicPlatform ("node01" on localhost) for the
     * agent under test to run on.
     */
    private static BasicPlatform createBasicPlatform() {
        TreeMap<String, Node> nodes = new TreeMap<>();
        HashMap<String, String> config = new HashMap<>();
        nodes.put("node01", new BasicNode("node01", "localhost",
            config, Collections.<String>emptySet()));
        BasicTopology topology = new BasicTopology(nodes);
        return new BasicPlatform("node01", topology, new BasicPlatform.ShellCommandRunner());
    }

    /**
     * Create an Agent backed by a JSON REST server on an ephemeral port
     * (port 0 = pick any free port).
     *
     * @param time  The clock the agent uses; pass a MockTime to control
     *              fault activation deterministically.
     */
    private Agent createAgent(Time time) {
        JsonRestServer restServer = new JsonRestServer(0);
        AgentRestResource resource = new AgentRestResource();
        restServer.start(resource);
        return new Agent(createBasicPlatform(), time, restServer, resource);
    }

    // The agent should start and shut down cleanly with no client interaction.
    @Test
    public void testAgentStartShutdown() throws Exception {
        Agent agent = createAgent(Time.SYSTEM);
        agent.beginShutdown();
        agent.waitForShutdown();
    }

    // The agent should shut down when asked to via its REST client.
    @Test
    public void testAgentProgrammaticShutdown() throws Exception {
        Agent agent = createAgent(Time.SYSTEM);
        AgentClient client = new AgentClient("localhost", agent.port());
        client.invokeShutdown();
        agent.waitForShutdown();
    }

    // The status endpoint should report the agent's start time.
    @Test
    public void testAgentGetStatus() throws Exception {
        Agent agent = createAgent(Time.SYSTEM);
        AgentClient client = new AgentClient("localhost", agent.port());
        AgentStatusResponse status = client.getStatus();
        assertEquals(agent.startTimeMs(), status.startTimeMs());
        agent.beginShutdown();
        agent.waitForShutdown();
    }

    // Faults created via the REST API should accumulate in the agent's
    // fault map and be visible through getFaults().
    @Test
    public void testAgentCreateFaults() throws Exception {
        Time time = new MockTime(0, 0, 0);
        Agent agent = createAgent(time);
        AgentClient client = new AgentClient("localhost", agent.port());
        AgentFaultsResponse faults = client.getFaults();
        assertEquals(Collections.emptyMap(), faults.faults());
        new ExpectedFaults().waitFor(client);
        // NoOpFaultSpec(delayMs, durationMs) — presumably activation delay and
        // duration relative to MockTime's 0 origin; verify against the spec class.
        final NoOpFaultSpec fooSpec = new NoOpFaultSpec(1000, 600000);
        client.putFault(new CreateAgentFaultRequest("foo", fooSpec));
        new ExpectedFaults().addFault("foo", fooSpec).waitFor(client);
        final NoOpFaultSpec barSpec = new NoOpFaultSpec(2000, 900000);
        client.putFault(new CreateAgentFaultRequest("bar", barSpec));
        new ExpectedFaults().
            addFault("foo", fooSpec).
            addFault("bar", barSpec).
            waitFor(client);
        final NoOpFaultSpec bazSpec = new NoOpFaultSpec(1, 450000);
        client.putFault(new CreateAgentFaultRequest("baz", bazSpec));
        new ExpectedFaults().
            addFault("foo", fooSpec).
            addFault("bar", barSpec).
            addFault("baz", bazSpec).
            waitFor(client);
        agent.beginShutdown();
        agent.waitForShutdown();
    }

    // Advancing MockTime should move faults through their lifecycle: each
    // fault is observed RUNNING after creation and DONE once enough mock
    // time has elapsed for its spec.
    @Test
    public void testAgentActivatesFaults() throws Exception {
        Time time = new MockTime(0, 0, 0);
        Agent agent = createAgent(time);
        AgentClient client = new AgentClient("localhost", agent.port());
        AgentFaultsResponse faults = client.getFaults();
        assertEquals(Collections.emptyMap(), faults.faults());
        new ExpectedFaults().waitFor(client);
        final NoOpFaultSpec fooSpec = new NoOpFaultSpec(10, 2);
        client.putFault(new CreateAgentFaultRequest("foo", fooSpec));
        new ExpectedFaults().addFault("foo", FaultState.RUNNING).waitFor(client);
        final NoOpFaultSpec barSpec = new NoOpFaultSpec(20, 3);
        client.putFault(new CreateAgentFaultRequest("bar", barSpec));
        time.sleep(11);
        new ExpectedFaults().
            addFault("foo", FaultState.RUNNING).
            addFault("bar", FaultState.RUNNING).
            waitFor(client);
        final NoOpFaultSpec bazSpec = new NoOpFaultSpec(1, 11);
        client.putFault(new CreateAgentFaultRequest("baz", bazSpec));
        new ExpectedFaults().
            addFault("foo", FaultState.RUNNING).
            addFault("bar", FaultState.RUNNING).
            addFault("baz", FaultState.RUNNING).
            waitFor(client);
        // At mock time 13, foo (ends at 12) and baz (ends at 13) should be
        // finished, while bar (ends at 23) is still running.
        time.sleep(2);
        new ExpectedFaults().
            addFault("foo", FaultState.DONE).
            addFault("bar", FaultState.RUNNING).
            addFault("baz", FaultState.DONE).
            waitFor(client);
        // Well past every fault's end time: all should be DONE.
        time.sleep(100);
        new ExpectedFaults().
            addFault("foo", FaultState.DONE).
            addFault("bar", FaultState.DONE).
            addFault("baz", FaultState.DONE).
            waitFor(client);
        agent.beginShutdown();
        agent.waitForShutdown();
    }
};

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.basic;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.trogdor.common.Platform;
import org.junit.Rule;
import org.junit.rules.Timeout;
import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import static org.junit.Assert.assertEquals;
/**
 * Tests that a BasicPlatform can be parsed from a JSON configuration file.
 */
public class BasicPlatformTest {
    // Fail any individual test that runs for longer than two minutes.
    @Rule
    final public Timeout globalTimeout = Timeout.millis(120000);

    @Test
    public void testCreateBasicPlatform() throws Exception {
        File tempConfigFile = TestUtils.tempFile();
        try {
            writeTwoNodeConfig(tempConfigFile);
            // Parse the config as node "bob01" and verify the resulting topology.
            Platform platform = Platform.Config.parse("bob01", tempConfigFile.getPath());
            assertEquals("BasicPlatform", platform.name());
            assertEquals(2, platform.topology().nodes().size());
            assertEquals("bob01, bob02", Utils.join(platform.topology().nodes().keySet(), ", "));
        } finally {
            Files.delete(tempConfigFile.toPath());
        }
    }

    /**
     * Write a BasicPlatform JSON configuration describing two localhost
     * nodes, bob01 and bob02, to the given file as UTF-8.
     */
    private static void writeTwoNodeConfig(File file) throws Exception {
        try (OutputStreamWriter out = new OutputStreamWriter(new FileOutputStream(file),
                StandardCharsets.UTF_8)) {
            out.write("{\n");
            out.write(" \"platform\": \"org.apache.kafka.trogdor.basic.BasicPlatform\",\n");
            out.write(" \"nodes\": {\n");
            out.write(" \"bob01\": {\n");
            out.write(" \"hostname\": \"localhost\",\n");
            out.write(" \"trogdor.agent.port\": 8888\n");
            out.write(" },\n");
            out.write(" \"bob02\": {\n");
            out.write(" \"hostname\": \"localhost\",\n");
            out.write(" \"trogdor.agent.port\": 8889\n");
            out.write(" }\n");
            out.write(" }\n");
            out.write("}\n");
        }
    }
};

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.common;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.basic.BasicPlatform;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
 * A CommandRunner for tests that records every command it is asked to run
 * instead of executing anything, and always reports empty output.
 */
public class CapturingCommandRunner implements BasicPlatform.CommandRunner {
    private static final Logger log = LoggerFactory.getLogger(CapturingCommandRunner.class);

    // Per-node history of command lines, in the order they were "run".
    // All access is guarded by this object's monitor.
    private final Map<String, List<String>> commands = new HashMap<>();

    /** Return the command list for the node, creating an empty one if needed. */
    private synchronized List<String> getOrCreate(String nodeName) {
        List<String> existing = commands.get(nodeName);
        if (existing == null) {
            existing = new LinkedList<>();
            commands.put(nodeName, existing);
        }
        return existing;
    }

    @Override
    public String run(Node curNode, String[] command) throws IOException {
        String commandLine = Utils.join(command, " ");
        synchronized (this) {
            getOrCreate(curNode.name()).add(commandLine);
        }
        log.debug("RAN {}: {}", curNode, commandLine);
        return "";
    }

    /** @return a snapshot of the command lines recorded for the given node. */
    public synchronized List<String> lines(String nodeName) {
        return new ArrayList<String>(getOrCreate(nodeName));
    }
};

View File

@ -0,0 +1,193 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.common;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.trogdor.agent.AgentClient;
import org.apache.kafka.trogdor.coordinator.CoordinatorClient;
import org.apache.kafka.trogdor.fault.FaultSpec;
import org.apache.kafka.trogdor.fault.FaultState;
import org.apache.kafka.trogdor.rest.AgentFaultsResponse;
import org.apache.kafka.trogdor.rest.CoordinatorFaultsResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.TreeMap;
/**
 * Test helper which polls a Trogdor agent or coordinator until the set of
 * faults it reports exactly matches an expected set.
 */
public class ExpectedFaults {
    private static final Logger log = LoggerFactory.getLogger(ExpectedFaults.class);

    /**
     * The expected spec and/or state of one fault.  A null field means
     * "don't care": only non-null fields are compared against what the
     * server reports.
     */
    private static class FaultData {
        final FaultSpec spec;
        final FaultState state;

        FaultData(FaultSpec spec, FaultState state) {
            this.spec = spec;
            this.state = state;
        }
    }

    /** Abstracts over fetching the fault map from an agent or a coordinator. */
    private interface FaultFetcher {
        TreeMap<String, FaultData> fetch() throws Exception;
    }

    /** Fetches faults from a Trogdor agent via its REST client. */
    private static class AgentFaultFetcher implements FaultFetcher {
        private final AgentClient client;

        AgentFaultFetcher(AgentClient client) {
            this.client = client;
        }

        @Override
        public TreeMap<String, FaultData> fetch() throws Exception {
            TreeMap<String, FaultData> results = new TreeMap<>();
            AgentFaultsResponse response = client.getFaults();
            // Convert the response's FaultData type into our local FaultData.
            for (Map.Entry<String, AgentFaultsResponse.FaultData> entry :
                    response.faults().entrySet()) {
                results.put(entry.getKey(),
                    new FaultData(entry.getValue().spec(), entry.getValue().state()));
            }
            return results;
        }
    }

    /** Fetches faults from the Trogdor coordinator via its REST client. */
    private static class CoordinatorFaultFetcher implements FaultFetcher {
        private final CoordinatorClient client;

        CoordinatorFaultFetcher(CoordinatorClient client) {
            this.client = client;
        }

        @Override
        public TreeMap<String, FaultData> fetch() throws Exception {
            TreeMap<String, FaultData> results = new TreeMap<>();
            CoordinatorFaultsResponse response = client.getFaults();
            // Convert the response's FaultData type into our local FaultData.
            for (Map.Entry<String, CoordinatorFaultsResponse.FaultData> entry :
                    response.faults().entrySet()) {
                results.put(entry.getKey(),
                    new FaultData(entry.getValue().spec(), entry.getValue().state()));
            }
            return results;
        }
    }

    // Fault ID -> expected data.  Sorted so log output is deterministic.
    private final TreeMap<String, FaultData> expected = new TreeMap<String, FaultData>();

    /** Expect a fault with the given ID and spec (state is not checked). */
    public ExpectedFaults addFault(String id, FaultSpec spec) {
        expected.put(id, new FaultData(spec, null));
        return this;
    }

    /** Expect a fault with the given ID and state (spec is not checked). */
    public ExpectedFaults addFault(String id, FaultState state) {
        expected.put(id, new FaultData(null, state));
        return this;
    }

    /** Expect a fault with the given ID, spec, and state. */
    public ExpectedFaults addFault(String id, FaultSpec spec, FaultState state) {
        expected.put(id, new FaultData(spec, state));
        return this;
    }

    /** Block until the agent reports exactly the expected faults. */
    public ExpectedFaults waitFor(AgentClient agentClient) throws InterruptedException {
        waitFor(new AgentFaultFetcher(agentClient));
        return this;
    }

    /** Block until the coordinator reports exactly the expected faults. */
    public ExpectedFaults waitFor(CoordinatorClient client) throws InterruptedException {
        waitFor(new CoordinatorFaultFetcher(client));
        return this;
    }

    /**
     * Poll the fetcher until the reported faults match the expectations:
     * every expected fault must be present (with matching non-null fields),
     * and no unexpected fault IDs may be present.  A fetch failure aborts
     * the wait immediately by rethrowing as a RuntimeException.
     */
    private void waitFor(final FaultFetcher faultFetcher) throws InterruptedException {
        TestUtils.waitForCondition(new TestCondition() {
            @Override
            public boolean conditionMet() {
                TreeMap<String, FaultData> curData = null;
                try {
                    curData = faultFetcher.fetch();
                } catch (Exception e) {
                    log.info("Got error fetching faults", e);
                    throw new RuntimeException(e);
                }
                // Accumulate all mismatches so they can be logged together.
                StringBuilder errors = new StringBuilder();
                for (Map.Entry<String, FaultData> entry : expected.entrySet()) {
                    String id = entry.getKey();
                    FaultData expectedFaultData = entry.getValue();
                    FaultData curFaultData = curData.get(id);
                    if (curFaultData == null) {
                        errors.append("Did not find fault id " + id + "\n");
                    } else {
                        // Only compare fields the expectation actually set.
                        if (expectedFaultData.spec != null) {
                            if (!expectedFaultData.spec.equals(curFaultData.spec)) {
                                errors.append("For fault id " + id + ", expected fault " +
                                    "spec " + expectedFaultData.spec + ", but got " +
                                    curFaultData.spec + "\n");
                            }
                        }
                        if (expectedFaultData.state != null) {
                            if (!expectedFaultData.state.equals(curFaultData.state)) {
                                errors.append("For fault id " + id + ", expected fault " +
                                    "state " + expectedFaultData.state + ", but got " +
                                    curFaultData.state + "\n");
                            }
                        }
                    }
                }
                // Faults the server reports but we did not expect are also errors.
                for (String id : curData.keySet()) {
                    if (expected.get(id) == null) {
                        errors.append("Got unexpected fault id " + id + "\n");
                    }
                }
                String errorString = errors.toString();
                if (!errorString.isEmpty()) {
                    log.info("EXPECTED FAULTS: {}", faultsToString(expected));
                    log.info("ACTUAL FAULTS  : {}", faultsToString(curData));
                    log.info(errorString);
                    return false;
                }
                return true;
            }
        }, "Timed out waiting for expected fault specs " + faultsToString(expected));
    }

    /**
     * Render a fault map as a compact human-readable string for log messages,
     * e.g. {foo: {spec: ..., state: ...}, bar: {...}}.  Null fields are omitted.
     */
    private static String faultsToString(TreeMap<String, FaultData> faults) {
        StringBuilder bld = new StringBuilder();
        bld.append("{");
        String faultsPrefix = "";
        for (Map.Entry<String, FaultData> entry : faults.entrySet()) {
            String id = entry.getKey();
            bld.append(faultsPrefix).append(id).append(": {");
            faultsPrefix = ", ";
            String faultValuesPrefix = "";
            FaultData faultData = entry.getValue();
            if (faultData.spec != null) {
                bld.append(faultValuesPrefix).append("spec: ").append(faultData.spec);
                faultValuesPrefix = ", ";
            }
            if (faultData.state != null) {
                bld.append(faultValuesPrefix).append("state: ").append(faultData.state);
                faultValuesPrefix = ", ";
            }
            bld.append("}");
        }
        bld.append("}");
        return bld.toString();
    }
};

View File

@ -0,0 +1,238 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.common;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.agent.Agent;
import org.apache.kafka.trogdor.agent.AgentClient;
import org.apache.kafka.trogdor.agent.AgentRestResource;
import org.apache.kafka.trogdor.basic.BasicNode;
import org.apache.kafka.trogdor.basic.BasicPlatform;
import org.apache.kafka.trogdor.basic.BasicTopology;
import org.apache.kafka.trogdor.coordinator.Coordinator;
import org.apache.kafka.trogdor.coordinator.CoordinatorClient;
import org.apache.kafka.trogdor.coordinator.CoordinatorRestResource;
import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
/**
* MiniTrogdorCluster sets up a local cluster of Trogdor Agents and Coordinators.
*/
public class MiniTrogdorCluster implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(MiniTrogdorCluster.class);
/**
* The MiniTrogdorCluster#Builder is used to set up a new MiniTrogdorCluster.
*/
public static class Builder {
private final TreeSet<String> agentNames = new TreeSet<>();
private String coordinatorName = null;
private Time time = Time.SYSTEM;
private BasicPlatform.CommandRunner commandRunner =
new BasicPlatform.ShellCommandRunner();
private static class NodeData {
String hostname;
AgentRestResource agentRestResource = null;
JsonRestServer agentRestServer = null;
int agentPort = 0;
JsonRestServer coordinatorRestServer = null;
int coordinatorPort = 0;
CoordinatorRestResource coordinatorRestResource = null;
Platform platform = null;
Agent agent = null;
Coordinator coordinator = null;
BasicNode node = null;
}
public Builder() {
}
/**
* Set the timekeeper used by this MiniTrogdorCluster.
*/
public Builder time(Time time) {
this.time = time;
return this;
}
public Builder commandRunner(BasicPlatform.CommandRunner commandRunner) {
this.commandRunner = commandRunner;
return this;
}
/**
* Add a new trogdor coordinator node to the cluster.
*/
public Builder addCoordinator(String nodeName) {
if (coordinatorName != null) {
throw new RuntimeException("At most one coordinator is allowed.");
}
coordinatorName = nodeName;
return this;
}
/**
* Add a new trogdor agent node to the cluster.
*/
public Builder addAgent(String nodeName) {
if (agentNames.contains(nodeName)) {
throw new RuntimeException("There is already an agent on node " + nodeName);
}
agentNames.add(nodeName);
return this;
}
private NodeData getOrCreate(String nodeName, TreeMap<String, NodeData> nodes) {
NodeData data = nodes.get(nodeName);
if (data != null)
return data;
data = new NodeData();
data.hostname = "127.0.0.1";
nodes.put(nodeName, data);
return data;
}
/**
* Create the MiniTrogdorCluster.
*/
public MiniTrogdorCluster build() {
log.info("Creating MiniTrogdorCluster with agents: {} and coordinator: {}",
Utils.join(agentNames, ", "), coordinatorName);
TreeMap<String, NodeData> nodes = new TreeMap<>();
for (String agentName : agentNames) {
NodeData node = getOrCreate(agentName, nodes);
node.agentRestResource = new AgentRestResource();
node.agentRestServer = new JsonRestServer(0);
node.agentRestServer.start(node.agentRestResource);
node.agentPort = node.agentRestServer.port();
}
if (coordinatorName != null) {
NodeData node = getOrCreate(coordinatorName, nodes);
node.coordinatorRestResource = new CoordinatorRestResource();
node.coordinatorRestServer = new JsonRestServer(0);
node.coordinatorRestServer.start(node.coordinatorRestResource);
node.coordinatorPort = node.coordinatorRestServer.port();
}
for (Map.Entry<String, NodeData> entry : nodes.entrySet()) {
NodeData node = entry.getValue();
HashMap<String, String> config = new HashMap<>();
config.put(Platform.Config.TROGDOR_AGENT_PORT,
Integer.toString(node.agentPort));
config.put(Platform.Config.TROGDOR_COORDINATOR_PORT,
Integer.toString(node.coordinatorPort));
node.node = new BasicNode(entry.getKey(), node.hostname, config,
Collections.<String>emptySet());
}
TreeMap<String, Node> topologyNodes = new TreeMap<>();
for (Map.Entry<String, NodeData> entry : nodes.entrySet()) {
topologyNodes.put(entry.getKey(), entry.getValue().node);
}
BasicTopology topology = new BasicTopology(topologyNodes);
for (Map.Entry<String, NodeData> entry : nodes.entrySet()) {
String nodeName = entry.getKey();
NodeData node = entry.getValue();
node.platform = new BasicPlatform(nodeName, topology, commandRunner);
if (node.agentRestResource != null) {
node.agent = new Agent(node.platform, time, node.agentRestServer,
node.agentRestResource);
}
if (node.coordinatorRestResource != null) {
node.coordinator = new Coordinator(node.platform, time,
node.coordinatorRestServer, node.coordinatorRestResource);
}
}
TreeMap<String, Agent> agents = new TreeMap<>();
Coordinator coordinator = null;
for (Map.Entry<String, NodeData> entry : nodes.entrySet()) {
NodeData node = entry.getValue();
if (node.agent != null) {
agents.put(entry.getKey(), node.agent);
}
if (node.coordinator != null) {
coordinator = node.coordinator;
}
}
return new MiniTrogdorCluster(agents, coordinator);
}
}
    // Agents in this cluster, keyed (and sorted) by node name.
    private final TreeMap<String, Agent> agents;

    // The single Coordinator for this cluster, or null if none was configured.
    private final Coordinator coordinator;

    /**
     * Private constructor: instances are created via {@code Builder#build()}.
     *
     * @param agents       the agents in the cluster, keyed by node name.
     * @param coordinator  the coordinator, or null if there is none.
     */
    private MiniTrogdorCluster(TreeMap<String, Agent> agents,
                               Coordinator coordinator) {
        this.agents = agents;
        this.coordinator = coordinator;
    }
public TreeMap<String, Agent> agents() {
return agents;
}
public Coordinator coordinator() {
return coordinator;
}
public CoordinatorClient coordinatorClient() {
if (coordinator == null) {
throw new RuntimeException("No coordinator configured.");
}
return new CoordinatorClient("localhost", coordinator.port());
}
public AgentClient agentClient(String nodeName) {
Agent agent = agents.get(nodeName);
if (agent == null) {
throw new RuntimeException("No agent configured on node " + nodeName);
}
return new AgentClient("localhost", agent.port());
}
@Override
public void close() throws Exception {
for (Agent agent : agents.values()) {
agent.beginShutdown();
}
if (coordinator != null) {
coordinator.beginShutdown();
}
for (Agent agent : agents.values()) {
agent.waitForShutdown();
}
if (coordinator != null) {
coordinator.waitForShutdown();
}
}
};

View File

@@ -0,0 +1,212 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.coordinator;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.trogdor.agent.AgentClient;
import org.apache.kafka.trogdor.common.CapturingCommandRunner;
import org.apache.kafka.trogdor.common.ExpectedFaults;
import org.apache.kafka.trogdor.common.MiniTrogdorCluster;
import org.apache.kafka.trogdor.fault.FaultState;
import org.apache.kafka.trogdor.fault.NetworkPartitionFaultSpec;
import org.apache.kafka.trogdor.fault.NoOpFaultSpec;
import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
import org.apache.kafka.trogdor.rest.CreateCoordinatorFaultRequest;
import org.junit.Rule;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class CoordinatorTest {
    private static final Logger log = LoggerFactory.getLogger(CoordinatorTest.class);

    // Fail any individual test case that runs longer than two minutes.
    @Rule
    public final Timeout globalTimeout = Timeout.millis(120000);

    /**
     * Verify that the coordinator reports the start time it was created with.
     */
    @Test
    public void testCoordinatorStatus() throws Exception {
        try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().
                addCoordinator("node01").
                build()) {
            CoordinatorStatusResponse status = cluster.coordinatorClient().getStatus();
            assertEquals(cluster.coordinator().startTimeMs(), status.startTimeMs());
        }
    }

    /**
     * Verify that a fault can be created, and that it transitions from PENDING
     * to DONE as mock time advances.
     */
    @Test
    public void testCreateFault() throws Exception {
        Time time = new MockTime(0, 0, 0);
        try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().
                addCoordinator("node01").
                time(time).
                build()) {
            new ExpectedFaults().waitFor(cluster.coordinatorClient());
            NoOpFaultSpec noOpFaultSpec = new NoOpFaultSpec(10, 2);
            cluster.coordinatorClient().putFault(
                new CreateCoordinatorFaultRequest("fault1", noOpFaultSpec));
            // Mock time is still 0, so the fault has not started yet.
            new ExpectedFaults().
                addFault("fault1", noOpFaultSpec, FaultState.PENDING).
                waitFor(cluster.coordinatorClient());
            // After advancing mock time, the coordinator reports the fault as DONE.
            time.sleep(11);
            new ExpectedFaults().
                addFault("fault1", noOpFaultSpec, FaultState.DONE).
                waitFor(cluster.coordinatorClient());
        }
    }

    /**
     * Verify that a fault created on the coordinator is distributed to the
     * agents once its start time arrives.
     */
    @Test
    public void testFaultDistribution() throws Exception {
        Time time = new MockTime(0, 0, 0);
        try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().
                addCoordinator("node01").
                addAgent("node01").
                addAgent("node02").
                time(time).
                build()) {
            CoordinatorClient coordinatorClient = cluster.coordinatorClient();
            AgentClient agentClient1 = cluster.agentClient("node01");
            AgentClient agentClient2 = cluster.agentClient("node02");
            NoOpFaultSpec noOpFaultSpec = new NoOpFaultSpec(10, 2);
            coordinatorClient.putFault(new CreateCoordinatorFaultRequest("fault1", noOpFaultSpec));
            // Before the start time, the fault is pending on the coordinator
            // and has not been sent to any agent.
            new ExpectedFaults().
                addFault("fault1", noOpFaultSpec, FaultState.PENDING).
                waitFor(coordinatorClient);
            new ExpectedFaults().
                waitFor(agentClient1).
                waitFor(agentClient2);
            time.sleep(10);
            // Once the start time arrives, both agents receive the fault.
            new ExpectedFaults().
                addFault("fault1", noOpFaultSpec, FaultState.DONE).
                waitFor(coordinatorClient);
            new ExpectedFaults().
                addFault("fault1", noOpFaultSpec, FaultState.RUNNING).
                waitFor(agentClient1).
                waitFor(agentClient2);
        }
    }

    /**
     * A set of output lines which we expect a node to produce, matched against
     * the output captured by a CapturingCommandRunner.  The expected lines must
     * appear as a consecutive run somewhere within the node's actual output.
     */
    public static class ExpectedLines {
        List<String> expectedLines = new ArrayList<>();

        public ExpectedLines addLine(String line) {
            expectedLines.add(line);
            return this;
        }

        /**
         * Block until the given node's captured output contains the expected
         * lines as a consecutive run.
         */
        public ExpectedLines waitFor(final String nodeName,
                final CapturingCommandRunner runner) throws InterruptedException {
            TestUtils.waitForCondition(new TestCondition() {
                @Override
                public boolean conditionMet() {
                    return linesMatch(nodeName, runner.lines(nodeName));
                }
            }, "failed to find the expected lines " + this.toString());
            return this;
        }

        /**
         * Check whether expectedLines occurs as a consecutive sub-list of
         * actualLines.  Every possible starting offset is tried, so that a
         * partial match followed by a mismatch cannot cause a later, complete
         * match to be skipped.  (The previous implementation reset the match
         * index without rewinding the scan position, so it missed matches such
         * as expected [a, b] inside actual [a, a, b].)
         */
        private boolean linesMatch(final String nodeName, List<String> actualLines) {
            // Longest prefix of expectedLines matched anywhere, for logging.
            int longestPrefix = 0;
            for (int start = 0; start + expectedLines.size() <= actualLines.size(); start++) {
                int matchIdx = 0;
                while (matchIdx < expectedLines.size() &&
                        expectedLines.get(matchIdx).equals(actualLines.get(start + matchIdx))) {
                    matchIdx++;
                }
                if (matchIdx == expectedLines.size()) {
                    log.debug("Got expected lines for {}", nodeName);
                    return true;
                }
                longestPrefix = Math.max(longestPrefix, matchIdx);
            }
            log.info("Failed to find the expected lines for {}. First " +
                "missing line on index {}: {}",
                nodeName, longestPrefix, expectedLines.get(longestPrefix));
            return false;
        }

        @Override
        public String toString() {
            return Utils.join(expectedLines, ", ");
        }
    }

    /**
     * Convert a two-dimensional array of node names into the list-of-lists
     * format which NetworkPartitionFaultSpec takes.
     */
    private static List<List<String>> createPartitionLists(String[][] array) {
        List<List<String>> list = new ArrayList<>();
        for (String[] a : array) {
            list.add(Arrays.asList(a));
        }
        return list;
    }

    /**
     * Verify that a network partition fault results in the expected iptables
     * commands being run on each node.
     */
    @Test
    public void testNetworkPartitionFault() throws Exception {
        CapturingCommandRunner runner = new CapturingCommandRunner();
        try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().
                addCoordinator("node01").
                addAgent("node01").
                addAgent("node02").
                addAgent("node03").
                commandRunner(runner).
                build()) {
            CoordinatorClient coordinatorClient = cluster.coordinatorClient();
            // Partition {node01, node02} away from {node03}.
            NetworkPartitionFaultSpec spec = new NetworkPartitionFaultSpec(0, Long.MAX_VALUE,
                createPartitionLists(new String[][] {
                    new String[] {"node01", "node02"},
                    new String[] {"node03"},
                }));
            coordinatorClient.putFault(new CreateCoordinatorFaultRequest("netpart", spec));
            new ExpectedFaults().
                addFault("netpart", spec).
                waitFor(coordinatorClient);
            // Each node should drop traffic from every node on the other side
            // of the partition.
            new ExpectedLines().
                addLine("sudo iptables -A INPUT -p tcp -s 127.0.0.1 -j DROP " +
                    "-m comment --comment node03").
                waitFor("node01", runner);
            new ExpectedLines().
                addLine("sudo iptables -A INPUT -p tcp -s 127.0.0.1 -j DROP " +
                    "-m comment --comment node03").
                waitFor("node02", runner);
            new ExpectedLines().
                addLine("sudo iptables -A INPUT -p tcp -s 127.0.0.1 -j DROP " +
                    "-m comment --comment node01").
                addLine("sudo iptables -A INPUT -p tcp -s 127.0.0.1 -j DROP " +
                    "-m comment --comment node02").
                waitFor("node03", runner);
        }
    }
}

View File

@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.trogdor.fault;
import org.junit.Test;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
public class FaultSetTest {
    private static final NoOpFault FAULT_A =
        new NoOpFault("faultA", new NoOpFaultSpec(0, 100));
    private static final NoOpFault FAULT_B =
        new NoOpFault("faultB", new NoOpFaultSpec(20, 60));
    private static final NoOpFault FAULT_C =
        new NoOpFault("faultC", new NoOpFaultSpec(40, 50));
    private static final NoOpFault FAULT_D =
        new NoOpFault("faultD", new NoOpFaultSpec(50, 10));

    // The faults above sorted by start order, and by end order.
    private static final List<Fault> FAULTS_IN_START_ORDER =
        Arrays.<Fault>asList(FAULT_A, FAULT_B, FAULT_C, FAULT_D);
    private static final List<Fault> FAULTS_IN_END_ORDER =
        Arrays.<Fault>asList(FAULT_D, FAULT_B, FAULT_C, FAULT_A);

    /**
     * Add faults in end order and verify that iterateByStart yields them in
     * start order.
     */
    @Test
    public void testIterateByStart() throws Exception {
        FaultSet set = new FaultSet();
        for (Fault f : FAULTS_IN_END_ORDER) {
            set.add(f);
        }
        int idx = 0;
        for (Iterator<Fault> it = set.iterateByStart(); it.hasNext(); ) {
            assertEquals(FAULTS_IN_START_ORDER.get(idx++), it.next());
        }
    }

    /**
     * Add faults in start order and verify that iterateByEnd yields them in
     * end order.
     */
    @Test
    public void testIterateByEnd() throws Exception {
        FaultSet set = new FaultSet();
        for (Fault f : FAULTS_IN_START_ORDER) {
            set.add(f);
        }
        int idx = 0;
        for (Iterator<Fault> it = set.iterateByEnd(); it.hasNext(); ) {
            assertEquals(FAULTS_IN_END_ORDER.get(idx++), it.next());
        }
    }

    /**
     * Verify removal through the iterator, iterator exhaustion behavior, and
     * removal directly through the set.
     */
    @Test
    public void testDeletes() throws Exception {
        FaultSet set = new FaultSet();
        for (Fault f : FAULTS_IN_START_ORDER) {
            set.add(f);
        }
        // End order is D, B, C, A: remove the 2nd and 4th elements (B and A).
        Iterator<Fault> it = set.iterateByEnd();
        it.next();
        it.next();
        it.remove();
        it.next();
        it.next();
        it.remove();
        assertFalse(it.hasNext());
        try {
            it.next();
            fail("expected NoSuchElementException");
        } catch (NoSuchElementException e) {
            // expected: the iterator is exhausted.
        }
        // Only D and C should remain.
        it = set.iterateByEnd();
        assertEquals(FAULT_D, it.next());
        assertEquals(FAULT_C, it.next());
        assertFalse(it.hasNext());
        // A removal through the set is visible to a live iterator.
        it = set.iterateByStart();
        set.remove(FAULT_C);
        assertEquals(FAULT_D, it.next());
        assertFalse(it.hasNext());
    }

    /**
     * Verify that faults with identical time ranges are all kept, and that a
     * single one can be removed via its iterator.
     */
    @Test
    public void testEqualRanges() throws Exception {
        FaultSet set = new FaultSet();
        set.add(new NoOpFault("fault1", new NoOpFaultSpec(10, 20)));
        set.add(new NoOpFault("fault2", new NoOpFaultSpec(10, 20)));
        set.add(new NoOpFault("fault3", new NoOpFaultSpec(10, 20)));
        set.add(new NoOpFault("fault4", new NoOpFaultSpec(10, 20)));
        // Delete only fault3, using iterator removal.
        for (Iterator<Fault> it = set.iterateByStart(); it.hasNext(); ) {
            if (it.next().id().equals("fault3")) {
                it.remove();
            }
        }
        Iterator<Fault> it = set.iterateByStart();
        assertEquals("fault1", it.next().id());
        assertEquals("fault2", it.next().id());
        assertEquals("fault4", it.next().id());
        assertFalse(it.hasNext());
    }
}

View File

@@ -0,0 +1,22 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Root logger: send everything at TRACE and above to the console appender.
log4j.rootLogger=TRACE, stdout

# Console appender with a pattern layout: [date] level message (logger:line).
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n

# Keep Kafka logging at TRACE, but quiet Jetty's internals down to INFO.
log4j.logger.org.apache.kafka=TRACE
log4j.logger.org.eclipse.jetty=INFO