KAFKA-5777; Add ducktape integration for Trogdor

Author: Colin P. McCabe <cmccabe@confluent.io>

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>

Closes #3726 from cmccabe/KAFKA-5777
Colin P. McCabe 2017-09-07 13:23:03 +01:00, committed by Rajini Sivaram
parent 329d5fa64a
commit 4065ffb3e1
25 changed files with 993 additions and 143 deletions

tests/kafkatest/services/trogdor/__init__.py
@@ -0,0 +1,14 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

tests/kafkatest/services/trogdor/fault_spec.py
@@ -0,0 +1,45 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json


class FaultSpec(object):
    """
    The base class for a fault specification.

    MAX_DURATION_MS         The longest duration we should use for a fault specification.
    """
    MAX_DURATION_MS = 10000000

    def __init__(self, start_ms, duration_ms):
        """
        Create a new fault specification.

        :param start_ms:        The start time in milliseconds since the epoch.
        :param duration_ms:     The duration in milliseconds.
        """
        self.start_ms = start_ms
        self.duration_ms = duration_ms

    def message(self):
        """
        Return a message suitable for sending to the Trogdor daemon.
        """
        raise NotImplementedError

    def __str__(self):
        return json.dumps(self.message())

tests/kafkatest/services/trogdor/network_partition_fault_spec.py
@@ -0,0 +1,54 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kafkatest.services.trogdor.fault_spec import FaultSpec


class NetworkPartitionFaultSpec(FaultSpec):
    """
    The specification for a network partition fault.

    Network partition faults fracture the network into different partitions
    that cannot communicate with each other.
    """

    def __init__(self, start_ms, duration_ms, partitions):
        """
        Create a new NetworkPartitionFaultSpec.

        :param start_ms:        The start time, as described in fault_spec.py
        :param duration_ms:     The duration in milliseconds.
        :param partitions:      An array of arrays describing the partitions.
                                The inner arrays may contain either node names,
                                or ClusterNode objects.
        """
        super(NetworkPartitionFaultSpec, self).__init__(start_ms, duration_ms)
        self.partitions = []
        for partition in partitions:
            nodes = []
            for obj in partition:
                if isinstance(obj, basestring):
                    nodes.append(obj)
                else:
                    nodes.append(obj.name)
            self.partitions.append(nodes)

    def message(self):
        return {
            "class": "org.apache.kafka.trogdor.fault.NetworkPartitionFaultSpec",
            "startMs": self.start_ms,
            "durationMs": self.duration_ms,
            "partitions": self.partitions,
        }
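
As an aside, a minimal sketch (not part of the commit; the node names are hypothetical) of how a spec object becomes the JSON message the coordinator receives:

    from kafkatest.services.trogdor.fault_spec import FaultSpec
    from kafkatest.services.trogdor.network_partition_fault_spec import NetworkPartitionFaultSpec

    # Split "node01" away from "node02"/"node03" for the maximum duration.
    spec = NetworkPartitionFaultSpec(0, FaultSpec.MAX_DURATION_MS,
                                     [["node01"], ["node02", "node03"]])

    # str(spec) serializes message(); up to key order this prints:
    # {"class": "org.apache.kafka.trogdor.fault.NetworkPartitionFaultSpec",
    #  "startMs": 0, "durationMs": 10000000,
    #  "partitions": [["node01"], ["node02", "node03"]]}
    print(spec)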

tests/kafkatest/services/trogdor/no_op_fault_spec.py
@@ -0,0 +1,41 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kafkatest.services.trogdor.fault_spec import FaultSpec


class NoOpFaultSpec(FaultSpec):
    """
    The specification for a no-op fault.

    No-op faults are used to test the fault injector. They don't do anything,
    but must be propagated to all fault injector daemons.
    """

    def __init__(self, start_ms, duration_ms):
        """
        Create a new NoOpFault.

        :param start_ms:        The start time, as described in fault_spec.py
        :param duration_ms:     The duration in milliseconds.
        """
        super(NoOpFaultSpec, self).__init__(start_ms, duration_ms)

    def message(self):
        return {
            "class": "org.apache.kafka.trogdor.fault.NoOpFaultSpec",
            "startMs": self.start_ms,
            "durationMs": self.duration_ms,
        }

tests/kafkatest/services/trogdor/templates/log4j.properties
@@ -0,0 +1,23 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
log4j.rootLogger=DEBUG, mylogger
log4j.logger.kafka=DEBUG
log4j.logger.org.apache.kafka=DEBUG
log4j.logger.org.eclipse=INFO
log4j.appender.mylogger=org.apache.log4j.FileAppender
log4j.appender.mylogger.File={{ log_path }}
log4j.appender.mylogger.layout=org.apache.log4j.PatternLayout
log4j.appender.mylogger.layout.ConversionPattern=[%d] %p %m (%c)%n

tests/kafkatest/services/trogdor/trogdor.py
@@ -0,0 +1,255 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import os.path

import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3 import Retry

from ducktape.services.service import Service
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin


class TrogdorService(KafkaPathResolverMixin, Service):
    """
    A ducktape service for running the trogdor fault injection daemons.

    Attributes:
        PERSISTENT_ROOT                 The root filesystem path to store service files under.
        COORDINATOR_STDOUT_STDERR       The path where we store the coordinator's stdout/stderr output.
        AGENT_STDOUT_STDERR             The path where we store the agent's stdout/stderr output.
        COORDINATOR_LOG                 The path where we store the coordinator's log4j output.
        AGENT_LOG                       The path where we store the agent's log4j output.
        AGENT_LOG4J_PROPERTIES          The path to the agent log4j.properties file for log config.
        COORDINATOR_LOG4J_PROPERTIES    The path to the coordinator log4j.properties file for log config.
        CONFIG_PATH                     The path to the trogdor configuration file.
        DEFAULT_AGENT_PORT              The default port to use for trogdor_agent daemons.
        DEFAULT_COORDINATOR_PORT        The default port to use for trogdor_coordinator daemons.
        REQUEST_TIMEOUT                 The request timeout in seconds to use for REST requests.
        REQUEST_HEADERS                 The request headers to use when communicating with trogdor.
    """
    PERSISTENT_ROOT = "/mnt/trogdor"
    COORDINATOR_STDOUT_STDERR = os.path.join(PERSISTENT_ROOT, "trogdor-coordinator-stdout-stderr.log")
    AGENT_STDOUT_STDERR = os.path.join(PERSISTENT_ROOT, "trogdor-agent-stdout-stderr.log")
    COORDINATOR_LOG = os.path.join(PERSISTENT_ROOT, "trogdor-coordinator.log")
    AGENT_LOG = os.path.join(PERSISTENT_ROOT, "trogdor-agent.log")
    COORDINATOR_LOG4J_PROPERTIES = os.path.join(PERSISTENT_ROOT, "trogdor-coordinator-log4j.properties")
    AGENT_LOG4J_PROPERTIES = os.path.join(PERSISTENT_ROOT, "trogdor-agent-log4j.properties")
    CONFIG_PATH = os.path.join(PERSISTENT_ROOT, "trogdor.conf")
    DEFAULT_AGENT_PORT = 8888
    DEFAULT_COORDINATOR_PORT = 8889
    REQUEST_TIMEOUT = 5
    REQUEST_HEADERS = {"Content-type": "application/json"}

    logs = {
        "trogdor_coordinator_stdout_stderr": {
            "path": COORDINATOR_STDOUT_STDERR,
            "collect_default": True},
        "trogdor_agent_stdout_stderr": {
            "path": AGENT_STDOUT_STDERR,
            "collect_default": True},
        "trogdor_coordinator_log": {
            "path": COORDINATOR_LOG,
            "collect_default": True},
        "trogdor_agent_log": {
            "path": AGENT_LOG,
            "collect_default": True},
    }

    def __init__(self, context, agent_nodes, agent_port=DEFAULT_AGENT_PORT,
                 coordinator_port=DEFAULT_COORDINATOR_PORT):
        """
        Create a Trogdor service.

        :param context:             The test context.
        :param agent_nodes:         The nodes to run the agents on.
        :param agent_port:          The port to use for the trogdor_agent daemons.
        :param coordinator_port:    The port to use for the trogdor_coordinator daemons.
        """
        Service.__init__(self, context, num_nodes=1)
        self.coordinator_node = self.nodes[0]
        if len(agent_nodes) == 0:
            raise RuntimeError("You must supply at least one agent node to run the service on.")
        for agent_node in agent_nodes:
            self.nodes.append(agent_node)
        self.agent_port = agent_port
        self.coordinator_port = coordinator_port

    def free(self):
        # We only want to deallocate the coordinator node, not the agent nodes. So we
        # change self.nodes to include only the coordinator node, and then invoke
        # the base class' free method.
        if self.coordinator_node is not None:
            self.nodes = [self.coordinator_node]
            self.coordinator_node = None
            Service.free(self)

    def _create_config_dict(self):
        """
        Create a dictionary with the Trogdor configuration.

        :return:                The configuration dictionary.
        """
        dict_nodes = {}
        for node in self.nodes:
            dict_nodes[node.name] = {
                "hostname": node.account.ssh_hostname,
                "trogdor.agent.port": self.agent_port,
            }
        dict_nodes[self.coordinator_node.name]["trogdor.coordinator.port"] = self.coordinator_port

        return {
            "platform": "org.apache.kafka.trogdor.basic.BasicPlatform",
            "nodes": dict_nodes,
        }

    def start_node(self, node):
        node.account.mkdirs(TrogdorService.PERSISTENT_ROOT)

        # Create the configuration file on the node.
        config_str = json.dumps(self._create_config_dict(), indent=2)
        self.logger.info("Creating configuration file %s with %s" % (TrogdorService.CONFIG_PATH, config_str))
        node.account.create_file(TrogdorService.CONFIG_PATH, config_str)

        if self.is_coordinator(node):
            self._start_coordinator_node(node)
        else:
            self._start_agent_node(node)

    def _start_coordinator_node(self, node):
        node.account.create_file(TrogdorService.COORDINATOR_LOG4J_PROPERTIES,
                                 self.render('log4j.properties',
                                             log_path=TrogdorService.COORDINATOR_LOG))
        self._start_trogdor_daemon("coordinator", TrogdorService.COORDINATOR_STDOUT_STDERR,
                                   TrogdorService.COORDINATOR_LOG4J_PROPERTIES,
                                   TrogdorService.COORDINATOR_LOG, node)
        self.logger.info("Started trogdor coordinator on %s." % node.name)

    def _start_agent_node(self, node):
        node.account.create_file(TrogdorService.AGENT_LOG4J_PROPERTIES,
                                 self.render('log4j.properties',
                                             log_path=TrogdorService.AGENT_LOG))
        self._start_trogdor_daemon("agent", TrogdorService.AGENT_STDOUT_STDERR,
                                   TrogdorService.AGENT_LOG4J_PROPERTIES,
                                   TrogdorService.AGENT_LOG, node)
        self.logger.info("Started trogdor agent on %s." % node.name)

    def _start_trogdor_daemon(self, daemon_name, stdout_stderr_capture_path,
                              log4j_properties_path, log_path, node):
        cmd = "export KAFKA_LOG4J_OPTS='-Dlog4j.configuration=file:%s'; " % log4j_properties_path
        cmd += "%s %s --%s.config %s --node-name %s 1>> %s 2>> %s &" % \
               (self.path.script("trogdor.sh", node),
                daemon_name,
                daemon_name,
                TrogdorService.CONFIG_PATH,
                node.name,
                stdout_stderr_capture_path,
                stdout_stderr_capture_path)
        node.account.ssh(cmd)
        with node.account.monitor_log(log_path) as monitor:
            monitor.wait_until("Starting main service thread.", timeout_sec=30, backoff_sec=.25,
                               err_msg=("%s on %s didn't finish startup" % (daemon_name, node.name)))

    def wait_node(self, node, timeout_sec=None):
        if self.is_coordinator(node):
            return len(node.account.java_pids(self.coordinator_class_name())) == 0
        else:
            return len(node.account.java_pids(self.agent_class_name())) == 0

    def stop_node(self, node):
        """Halt trogdor processes on this node."""
        if self.is_coordinator(node):
            node.account.kill_java_processes(self.coordinator_class_name())
        else:
            node.account.kill_java_processes(self.agent_class_name())

    def clean_node(self, node):
        """Clean up persistent state on this node - e.g. service logs, configuration files etc."""
        self.stop_node(node)
        node.account.ssh("rm -rf -- %s" % TrogdorService.PERSISTENT_ROOT)

    def _coordinator_url(self, path):
        return "http://%s:%d/coordinator/%s" % \
               (self.coordinator_node.account.ssh_hostname, self.coordinator_port, path)

    def request_session(self):
        """
        Creates a new request session which will retry for a while.
        """
        session = requests.Session()
        session.mount('http://',
                      HTTPAdapter(max_retries=Retry(total=4, backoff_factor=0.3)))
        return session

    def _coordinator_put(self, path, message):
        """
        Make a PUT request to the Trogdor coordinator.

        :param path:            The URL path to use.
        :param message:         The message object to send.
        :return:                The response as an object.
        """
        url = self._coordinator_url(path)
        self.logger.info("PUT %s %s" % (url, message))
        response = self.request_session().put(url, json=message,
                                              timeout=TrogdorService.REQUEST_TIMEOUT,
                                              headers=TrogdorService.REQUEST_HEADERS)
        response.raise_for_status()
        return response.json()

    def _coordinator_get(self, path, message):
        """
        Make a GET request to the Trogdor coordinator.

        :param path:            The URL path to use.
        :param message:         The message object to send.
        :return:                The response as an object.
        """
        url = self._coordinator_url(path)
        self.logger.info("GET %s %s" % (url, message))
        response = self.request_session().get(url, json=message,
                                              timeout=TrogdorService.REQUEST_TIMEOUT,
                                              headers=TrogdorService.REQUEST_HEADERS)
        response.raise_for_status()
        return response.json()

    def create_fault(self, id, spec):
        """
        Create a new fault.

        :param id:              The fault id.
        :param spec:            The fault spec.
        """
        self._coordinator_put("fault", {"id": id, "spec": spec.message()})

    def get_faults(self):
        """
        Get the faults which are on the coordinator.

        :returns:               A map of fault id strings to fault data objects.
                                Fault data objects contain a 'spec' field with the spec
                                and a 'state' field with the state.
        """
        return self._coordinator_get("faults", {})

    def is_coordinator(self, node):
        return node == self.coordinator_node

    def agent_class_name(self):
        return "org.apache.kafka.trogdor.agent.Agent"

    def coordinator_class_name(self):
        return "org.apache.kafka.trogdor.coordinator.Coordinator"

tests/kafkatest/tests/tools/trogdor_test.py
@@ -0,0 +1,97 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ducktape.cluster.cluster_spec import ClusterSpec
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until

from kafkatest.services.trogdor.fault_spec import FaultSpec
from kafkatest.services.trogdor.network_partition_fault_spec import NetworkPartitionFaultSpec
from kafkatest.services.trogdor.no_op_fault_spec import NoOpFaultSpec
from kafkatest.services.trogdor.trogdor import TrogdorService
from kafkatest.utils import node_is_reachable


class TrogdorTest(Test):
    """
    Tests the Trogdor fault injection daemon in isolation.
    """

    def __init__(self, test_context):
        super(TrogdorTest, self).__init__(test_context)

    def set_up_trogdor(self, num_agent_nodes):
        self.agent_nodes = self.test_context.cluster.alloc(ClusterSpec.simple_linux(num_agent_nodes))
        self.trogdor = TrogdorService(context=self.test_context, agent_nodes=self.agent_nodes)
        for agent_node in self.agent_nodes:
            agent_node.account.logger = self.trogdor.logger
        self.trogdor.start()

    def setUp(self):
        self.trogdor = None
        self.agent_nodes = None

    def tearDown(self):
        if self.trogdor is not None:
            self.trogdor.stop()
            self.trogdor = None
        if self.agent_nodes is not None:
            self.test_context.cluster.free(self.agent_nodes)
            self.agent_nodes = None

    @cluster(num_nodes=4)
    def test_trogdor_service(self):
        """
        Test that we can bring up Trogdor and create a no-op fault.
        """
        self.set_up_trogdor(3)
        spec = NoOpFaultSpec(0, FaultSpec.MAX_DURATION_MS)
        self.trogdor.create_fault("myfault", spec)

        def check_for_faults():
            faults = self.trogdor.get_faults()
            self.logger.info("faults = %s" % faults)
            return "myfault" in faults

        wait_until(check_for_faults,
                   timeout_sec=10, backoff_sec=.2, err_msg="Failed to read back myfault.")

    @cluster(num_nodes=4)
    def test_network_partition_fault(self):
        """
        Test that the network partition fault results in a true network partition between nodes.
        """
        self.set_up_trogdor(3)
        spec = NetworkPartitionFaultSpec(0, FaultSpec.MAX_DURATION_MS,
                                         [[self.agent_nodes[0]], self.agent_nodes[1:]])
        assert 2 == len(spec.partitions)
        assert [self.agent_nodes[0].name] == spec.partitions[0]
        assert [self.agent_nodes[1].name, self.agent_nodes[2].name] == spec.partitions[1]
        self.trogdor.create_fault("partition0", spec)

        def verify_nodes_partitioned():
            if node_is_reachable(self.agent_nodes[0], self.agent_nodes[1]):
                return False
            if node_is_reachable(self.agent_nodes[1], self.agent_nodes[0]):
                return False
            if node_is_reachable(self.agent_nodes[2], self.agent_nodes[0]):
                return False
            return True

        wait_until(verify_nodes_partitioned,
                   timeout_sec=10, backoff_sec=.2, err_msg="Failed to verify that the nodes were partitioned.")
        if not node_is_reachable(self.agent_nodes[0], self.agent_nodes[0]):
            raise RuntimeError("Node 0 must be reachable from itself.")
        if not node_is_reachable(self.agent_nodes[1], self.agent_nodes[2]):
            raise RuntimeError("Node 2 must be reachable from node 1.")
        if not node_is_reachable(self.agent_nodes[2], self.agent_nodes[1]):
            raise RuntimeError("Node 1 must be reachable from node 2.")

tests/kafkatest/utils/__init__.py
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from util import kafkatest_version, is_version, is_int, is_int_with_prefix
+from util import kafkatest_version, is_version, is_int, is_int_with_prefix, node_is_reachable

tests/kafkatest/utils/util.py
@@ -103,3 +103,12 @@ def is_int_with_prefix(msg):
"prefix dot integer value, but one of the two parts (before or after dot) " "prefix dot integer value, but one of the two parts (before or after dot) "
"are not integers. Message: %s" % (msg)) "are not integers. Message: %s" % (msg))
def node_is_reachable(src_node, dst_node):
"""
Returns true if a node is unreachable from another node.
:param src_node: The source node to check from reachability from.
:param dst_node: The destination node to check for reachability to.
:return: True only if dst is reachable from src.
"""
return 0 == src_node.account.ssh("nc -w 3 -z %s 22" % dst_node.account.hostname, allow_fail=True)
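
The check leans on nc's exit status: -z probes the port without sending data and exits 0 on a successful TCP connect to port 22. A self-contained sketch of the same probe outside ducktape (the function name is ours, not the commit's):

    import subprocess

    def host_is_reachable(hostname, port=22, timeout_secs=3):
        # Exit status 0 from `nc -z` means the TCP connect succeeded.
        rc = subprocess.call(["nc", "-w", str(timeout_secs), "-z", hostname, str(port)])
        return rc == 0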

tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
@@ -29,7 +29,7 @@ import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.fault.Fault;
 import org.apache.kafka.trogdor.fault.FaultSet;
 import org.apache.kafka.trogdor.fault.FaultSpec;
-import org.apache.kafka.trogdor.fault.FaultState;
+import org.apache.kafka.trogdor.fault.RunningState;
 import org.apache.kafka.trogdor.rest.AgentFaultsResponse;
 import org.apache.kafka.trogdor.rest.CreateAgentFaultRequest;
 import org.apache.kafka.trogdor.rest.JsonRestServer;
@@ -140,7 +140,8 @@ public final class Agent {
                 Iterator<Fault> running = runningFaults.iterateByEnd();
                 while (running.hasNext()) {
                     Fault fault = running.next();
-                    long endMs = fault.spec().startMs() + fault.spec().durationMs();
+                    RunningState state = (RunningState) fault.state();
+                    long endMs = state.startedMs() + fault.spec().durationMs();
                     if (now < endMs) {
                         nextWakeMs = Math.min(nextWakeMs, endMs);
                         break;
@@ -154,7 +155,7 @@ public final class Agent {
             for (Fault fault: toStart) {
                 try {
                     log.debug("Activating fault " + fault);
-                    fault.activate(platform);
+                    fault.activate(now, platform);
                     started.add(fault);
                 } catch (Throwable e) {
                     log.error("Error activating fault " + fault.id(), e);
@@ -164,7 +165,7 @@ public final class Agent {
             for (Fault fault: toEnd) {
                 try {
                     log.debug("Deactivating fault " + fault);
-                    fault.deactivate(platform);
+                    fault.deactivate(now, platform);
                 } catch (Throwable e) {
                     log.error("Error deactivating fault " + fault.id(), e);
                 } finally {
@@ -200,10 +201,35 @@ public final class Agent {
             } finally {
                 log.info("AgentRunnable shutting down.");
                 restServer.stop();
+                int numDeactivated = deactivateRunningFaults();
+                log.info("AgentRunnable deactivated {} fault(s).", numDeactivated);
             }
         }
     }
 
+    private int deactivateRunningFaults() {
+        long now = time.milliseconds();
+        int numDeactivated = 0;
+        lock.lock();
+        try {
+            for (Iterator<Fault> iter = runningFaults.iterateByStart(); iter.hasNext(); ) {
+                Fault fault = iter.next();
+                try {
+                    numDeactivated++;
+                    iter.remove();
+                    fault.deactivate(now, platform);
+                } catch (Exception e) {
+                    log.error("Got exception while deactivating {}", fault, e);
+                } finally {
+                    doneFaults.add(fault);
+                }
+            }
+        } finally {
+            lock.unlock();
+        }
+        return numDeactivated;
+    }
+
     /**
      * Create a new Agent.
      *
@@ -257,9 +283,9 @@ public final class Agent {
         Map<String, AgentFaultsResponse.FaultData> faultData = new TreeMap<>();
         lock.lock();
         try {
-            updateFaultsResponse(faultData, pendingFaults, FaultState.PENDING);
-            updateFaultsResponse(faultData, runningFaults, FaultState.RUNNING);
-            updateFaultsResponse(faultData, doneFaults, FaultState.DONE);
+            updateFaultsResponse(faultData, pendingFaults);
+            updateFaultsResponse(faultData, runningFaults);
+            updateFaultsResponse(faultData, doneFaults);
         } finally {
             lock.unlock();
         }
@@ -267,12 +293,12 @@ public final class Agent {
     }
 
     private void updateFaultsResponse(Map<String, AgentFaultsResponse.FaultData> faultData,
-                                      FaultSet faultSet, FaultState state) {
+                                      FaultSet faultSet) {
         for (Iterator<Fault> iter = faultSet.iterateByStart();
                 iter.hasNext(); ) {
             Fault fault = iter.next();
             AgentFaultsResponse.FaultData data =
-                new AgentFaultsResponse.FaultData(fault.spec(), state);
+                new AgentFaultsResponse.FaultData(fault.spec(), fault.state());
             faultData.put(fault.id(), data);
         }
     }
@@ -326,12 +352,14 @@ public final class Agent {
         JsonRestServer restServer =
             new JsonRestServer(Node.Util.getTrogdorAgentPort(platform.curNode()));
         AgentRestResource resource = new AgentRestResource();
-        Agent agent = new Agent(platform, Time.SYSTEM, restServer, resource);
+        final Agent agent = new Agent(platform, Time.SYSTEM, restServer, resource);
         restServer.start(resource);
         Runtime.getRuntime().addShutdownHook(new Thread() {
             @Override
             public void run() {
-                log.error("Agent shutting down...");
+                log.error("Running shutdown hook...");
+                agent.beginShutdown();
+                agent.waitForShutdown();
             }
         });
         agent.waitForShutdown();

tools/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java
@@ -23,6 +23,8 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.trogdor.common.Node;
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.common.Topology;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
@@ -30,6 +32,8 @@ import java.io.IOException;
  * Defines a cluster topology
  */
 public class BasicPlatform implements Platform {
+    private static final Logger log = LoggerFactory.getLogger(BasicPlatform.class);
+
     private final Node curNode;
     private final BasicTopology topology;
     private final CommandRunner commandRunner;
@@ -41,7 +45,14 @@ public class BasicPlatform implements Platform {
     public static class ShellCommandRunner implements CommandRunner {
         @Override
         public String run(Node curNode, String[] command) throws IOException {
-            return Shell.execCommand(command);
+            try {
+                String result = Shell.execCommand(command);
+                log.info("RUN: {}. RESULT: [{}]", Utils.join(command, " "), result);
+                return result;
+            } catch (RuntimeException | IOException e) {
+                log.info("RUN: {}. ERROR: [{}]", Utils.join(command, " "), e.getMessage());
+                throw e;
+            }
         }
     }

tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
@@ -27,10 +27,11 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.trogdor.common.Node;
 import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.fault.DoneState;
 import org.apache.kafka.trogdor.fault.Fault;
 import org.apache.kafka.trogdor.fault.FaultSet;
 import org.apache.kafka.trogdor.fault.FaultSpec;
-import org.apache.kafka.trogdor.fault.FaultState;
+import org.apache.kafka.trogdor.fault.SendingState;
 import org.apache.kafka.trogdor.rest.CoordinatorFaultsResponse;
 import org.apache.kafka.trogdor.rest.CreateCoordinatorFaultRequest;
 import org.apache.kafka.trogdor.rest.JsonRestServer;
@@ -115,9 +116,9 @@ public final class Coordinator {
     private final FaultSet pendingFaults = new FaultSet();
 
     /**
-     * The set of faults which have been sent to the agents.
+     * The set of faults which have been sent to the NodeManagers.
      */
-    private final FaultSet doneFaults = new FaultSet();
+    private final FaultSet processedFaults = new FaultSet();
 
     class CoordinatorRunnable implements Runnable {
         @Override
@@ -152,13 +153,13 @@ public final class Coordinator {
                         }
                         toStart.add(fault);
                         iter.remove();
-                        doneFaults.add(fault);
+                        processedFaults.add(fault);
                     }
                 } finally {
                     lock.unlock();
                 }
                 for (Fault fault: toStart) {
-                    startFault(fault);
+                    startFault(now, fault);
                 }
             }
         } catch (Throwable t) {
@@ -209,7 +210,7 @@ public final class Coordinator {
         return this.restServer.port();
     }
 
-    private void startFault(Fault fault) {
+    private void startFault(long now, Fault fault) {
         Set<String> affectedNodes = fault.targetNodes(platform.topology());
         Set<NodeManager> affectedManagers = new HashSet<>();
         Set<String> nonexistentNodes = new HashSet<>();
@@ -229,6 +230,11 @@ public final class Coordinator {
         }
         log.info("Applying fault {} on {} node(s): {}", fault.id(),
             nodeNames.size(), Utils.join(nodeNames, ", "));
+        if (nodeNames.isEmpty()) {
+            fault.setState(new DoneState(now, ""));
+        } else {
+            fault.setState(new SendingState(nodeNames));
+        }
         for (NodeManager nodeManager : affectedManagers) {
             nodeManager.enqueueFault(fault);
         }
@@ -261,8 +267,8 @@ public final class Coordinator {
         Map<String, CoordinatorFaultsResponse.FaultData> faultData = new TreeMap<>();
         lock.lock();
         try {
-            getFaultsImpl(faultData, pendingFaults, FaultState.PENDING);
-            getFaultsImpl(faultData, doneFaults, FaultState.DONE);
+            getFaultsImpl(faultData, pendingFaults);
+            getFaultsImpl(faultData, processedFaults);
         } finally {
             lock.unlock();
         }
@@ -270,12 +276,12 @@ public final class Coordinator {
     }
 
     private void getFaultsImpl(Map<String, CoordinatorFaultsResponse.FaultData> faultData,
-                               FaultSet faultSet, FaultState state) {
+                               FaultSet faultSet) {
         for (Iterator<Fault> iter = faultSet.iterateByStart();
                 iter.hasNext(); ) {
             Fault fault = iter.next();
             CoordinatorFaultsResponse.FaultData data =
-                new CoordinatorFaultsResponse.FaultData(fault.spec(), state);
+                new CoordinatorFaultsResponse.FaultData(fault.spec(), fault.state());
             faultData.put(fault.id(), data);
         }
     }
@@ -330,13 +336,15 @@ public final class Coordinator {
         JsonRestServer restServer = new JsonRestServer(
             Node.Util.getTrogdorCoordinatorPort(platform.curNode()));
         CoordinatorRestResource resource = new CoordinatorRestResource();
-        Coordinator coordinator = new Coordinator(platform, Time.SYSTEM,
+        final Coordinator coordinator = new Coordinator(platform, Time.SYSTEM,
             restServer, resource);
         restServer.start(resource);
         Runtime.getRuntime().addShutdownHook(new Thread() {
             @Override
             public void run() {
-                log.error("Coordinator shutting down...");
+                log.error("Running shutdown hook...");
+                coordinator.beginShutdown();
+                coordinator.waitForShutdown();
             }
         });
         coordinator.waitForShutdown();

tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
@@ -22,7 +22,9 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.trogdor.agent.AgentClient;
 import org.apache.kafka.trogdor.common.Node;
 import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.fault.DoneState;
 import org.apache.kafka.trogdor.fault.Fault;
+import org.apache.kafka.trogdor.fault.SendingState;
 import org.apache.kafka.trogdor.rest.AgentStatusResponse;
 import org.apache.kafka.trogdor.rest.CreateAgentFaultRequest;
 import org.slf4j.Logger;
@@ -187,6 +189,10 @@ class NodeManager {
             } finally {
                 lock.unlock();
             }
+            SendingState state = (SendingState) fault.state();
+            if (state.completeSend(node.name())) {
+                fault.setState(new DoneState(now, ""));
+            }
             return true;
         }

tools/src/main/java/org/apache/kafka/trogdor/fault/AbstractFault.java
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.trogdor.fault;

import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.Topology;

import java.util.Objects;
import java.util.Set;

public abstract class AbstractFault implements Fault {
    private final String id;
    private final FaultSpec spec;
    private FaultState state;

    public AbstractFault(String id, FaultSpec spec) {
        this.id = id;
        this.spec = spec;
        this.state = new PendingState();
    }

    @Override
    public final String id() {
        return id;
    }

    @Override
    public final FaultSpec spec() {
        return spec;
    }

    @Override
    public synchronized FaultState state() {
        return state;
    }

    @Override
    public synchronized void setState(FaultState state) {
        this.state = state;
    }

    @Override
    public final void activate(long now, Platform platform) throws Exception {
        try {
            handleActivation(now, platform);
            setState(new RunningState(now));
        } catch (Exception e) {
            setState(new DoneState(now, e.getMessage()));
            throw e;
        }
    }

    protected abstract void handleActivation(long now, Platform platform) throws Exception;

    @Override
    public final void deactivate(long now, Platform platform) throws Exception {
        try {
            handleDeactivation(now, platform);
            setState(new DoneState(now, ""));
        } catch (Exception e) {
            setState(new DoneState(now, e.getMessage()));
            throw e;
        }
    }

    protected abstract void handleDeactivation(long now, Platform platform) throws Exception;

    @Override
    public abstract Set<String> targetNodes(Topology topology);

    @Override
    public final boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        return toString().equals(o.toString());
    }

    @Override
    public final int hashCode() {
        return Objects.hashCode(toString());
    }

    @Override
    public final String toString() {
        return getClass().getSimpleName() + "(id=" + id +
            ", spec=" + JsonUtil.toJsonString(spec) +
            ", state=" + JsonUtil.toJsonString(state()) +
            ")";
    }
}

tools/src/main/java/org/apache/kafka/trogdor/fault/DoneState.java
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.trogdor.fault;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

/**
 * The state a fault is in on the agent or controller when it is completed,
 * either normally or with an error.
 */
public class DoneState extends FaultState {
    private final long doneMs;
    private final String errorStr;

    @JsonCreator
    public DoneState(@JsonProperty("doneMs") long doneMs,
                     @JsonProperty("errorStr") String errorStr) {
        this.doneMs = doneMs;
        this.errorStr = errorStr;
    }

    @JsonProperty
    public long doneMs() {
        return doneMs;
    }

    @JsonProperty
    public String errorStr() {
        return errorStr;
    }
}

tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java
@@ -34,17 +34,37 @@ public interface Fault {
     FaultSpec spec();
 
     /**
-     * Activate the fault.
+     * Get the current fault state. Thread-safe.
      */
-    void activate(Platform platform) throws Exception;
+    FaultState state();
 
     /**
-     * Deactivate the fault.
+     * Set the current fault state. Thread-safe.
      */
-    void deactivate(Platform platform) throws Exception;
+    void setState(FaultState state);
+
+    /**
+     * Activate the fault. Will transition into RunningState or DoneState.
+     *
+     * @param now           The current time in ms.
+     * @param platform      The platform to use.
+     */
+    void activate(long now, Platform platform) throws Exception;
+
+    /**
+     * Deactivate the fault. Will transition into DoneState.
+     *
+     * @param now           The current time in ms.
+     * @param platform      The platform to use.
+     */
+    void deactivate(long now, Platform platform) throws Exception;
 
     /**
      * Get the nodes which this fault is targetting.
+     *
+     * @param topology      The topology to use.
+     *
+     * @return              A set of target node names.
      */
     Set<String> targetNodes(Topology topology);
 }

tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java
@@ -17,11 +17,35 @@
 package org.apache.kafka.trogdor.fault;
 
-import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.kafka.trogdor.common.JsonUtil;
+
+import java.util.Objects;
 
-@JsonFormat(shape = JsonFormat.Shape.STRING)
-public enum FaultState {
-    PENDING,
-    RUNNING,
-    DONE
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
    include = JsonTypeInfo.As.PROPERTY,
    property = "stateName")
+@JsonSubTypes({
+    @JsonSubTypes.Type(value = DoneState.class, name = "done"),
+    @JsonSubTypes.Type(value = PendingState.class, name = "pending"),
+    @JsonSubTypes.Type(value = RunningState.class, name = "running"),
+    @JsonSubTypes.Type(value = SendingState.class, name = "sending")
+})
+public abstract class FaultState {
+    @Override
+    public final boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        return toString().equals(o.toString());
+    }
+
+    @Override
+    public final int hashCode() {
+        return Objects.hashCode(toString());
+    }
+
+    @Override
+    public final String toString() {
+        return JsonUtil.toJsonString(this);
+    }
 }
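
With this Jackson configuration, each serialized state carries a "stateName" discriminator naming its concrete subclass, so the fault data that the ducktape service's get_faults() reads back should look roughly like the following sketch (illustrative, not captured output):

    example_fault_data = {
        "myfault": {
            "spec": {"class": "org.apache.kafka.trogdor.fault.NoOpFaultSpec",
                     "startMs": 0, "durationMs": 10000000},
            # One of: {"stateName": "pending"},
            #         {"stateName": "running", "startedMs": ...},
            #         {"stateName": "sending", "nodeNames": [...]},
            #         {"stateName": "done", "doneMs": ..., "errorStr": ""}
            "state": {"stateName": "running", "startedMs": 1234},
        }
    }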

tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.trogdor.fault;
 
-import org.apache.kafka.trogdor.common.JsonUtil;
 import org.apache.kafka.trogdor.common.Node;
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.common.Topology;
@@ -28,23 +27,20 @@ import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Objects;
 import java.util.Set;
 import java.util.TreeSet;
 
-public class NetworkPartitionFault implements Fault {
+public class NetworkPartitionFault extends AbstractFault {
     private static final Logger log = LoggerFactory.getLogger(NetworkPartitionFault.class);
 
-    private final String id;
-    private final NetworkPartitionFaultSpec spec;
     private final List<Set<String>> partitions;
 
     public NetworkPartitionFault(String id, FaultSpec spec) {
-        this.id = id;
-        this.spec = (NetworkPartitionFaultSpec) spec;
+        super(id, spec);
+        NetworkPartitionFaultSpec faultSpec = (NetworkPartitionFaultSpec) spec;
        this.partitions = new ArrayList<>();
        HashSet<String> prevNodes = new HashSet<>();
-        for (List<String> partition : this.spec.partitions()) {
+        for (List<String> partition : faultSpec.partitions()) {
            for (String nodeName : partition) {
                if (prevNodes.contains(nodeName)) {
                    throw new RuntimeException("Node " + nodeName +
@@ -57,23 +53,13 @@ public class NetworkPartitionFault implements Fault {
     }
 
     @Override
-    public String id() {
-        return id;
-    }
-
-    @Override
-    public FaultSpec spec() {
-        return spec;
-    }
-
-    @Override
-    public void activate(Platform platform) throws Exception {
+    protected void handleActivation(long now, Platform platform) throws Exception {
         log.info("Activating NetworkPartitionFault...");
         runIptablesCommands(platform, "-A");
     }
 
     @Override
-    public void deactivate(Platform platform) throws Exception {
+    protected void handleDeactivation(long now, Platform platform) throws Exception {
         log.info("Deactivating NetworkPartitionFault...");
         runIptablesCommands(platform, "-D");
     }
@@ -107,24 +93,4 @@ public class NetworkPartitionFault implements Fault {
         }
         return targetNodes;
     }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        NetworkPartitionFault that = (NetworkPartitionFault) o;
-        return Objects.equals(id, that.id) &&
-            Objects.equals(spec, that.spec) &&
-            Objects.equals(partitions, that.partitions);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(id, spec, partitions);
-    }
-
-    @Override
-    public String toString() {
-        return "NoOpFault(id=" + id + ", spec=" + JsonUtil.toJsonString(spec) + ")";
-    }
 }

tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.trogdor.fault;
 
-import org.apache.kafka.trogdor.common.JsonUtil;
 import org.apache.kafka.trogdor.common.Node;
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.common.Topology;
@@ -26,37 +25,22 @@ import org.slf4j.LoggerFactory;
 
 import java.util.HashSet;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 
-public class NoOpFault implements Fault {
+public class NoOpFault extends AbstractFault {
     private static final Logger log = LoggerFactory.getLogger(NoOpFault.class);
 
-    private final String id;
-    private final FaultSpec spec;
-
     public NoOpFault(String id, FaultSpec spec) {
-        this.id = id;
-        this.spec = spec;
+        super(id, spec);
     }
 
     @Override
-    public String id() {
-        return id;
-    }
-
-    @Override
-    public FaultSpec spec() {
-        return spec;
-    }
-
-    @Override
-    public void activate(Platform platform) {
+    protected void handleActivation(long now, Platform platform) throws Exception {
         log.info("Activating NoOpFault...");
     }
 
     @Override
-    public void deactivate(Platform platform) {
+    protected void handleDeactivation(long now, Platform platform) throws Exception {
         log.info("Deactivating NoOpFault...");
     }
@@ -70,23 +54,4 @@ public class NoOpFault implements Fault {
         }
         return set;
     }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        NoOpFault that = (NoOpFault) o;
-        return Objects.equals(id, that.id) &&
-            Objects.equals(spec, that.spec);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(id, spec);
-    }
-
-    @Override
-    public String toString() {
-        return "NoOpFault(id=" + id + ", spec=" + JsonUtil.toJsonString(spec) + ")";
-    }
 }

tools/src/main/java/org/apache/kafka/trogdor/fault/PendingState.java
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.trogdor.fault;

import com.fasterxml.jackson.annotation.JsonCreator;

/**
 * The state a fault is in on the agent or controller when we haven't yet done
 * anything with it.
 */
public class PendingState extends FaultState {
    @JsonCreator
    public PendingState() {
    }
}

tools/src/main/java/org/apache/kafka/trogdor/fault/RunningState.java
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.trogdor.fault;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

/**
 * The state a fault is in on the agent when it is running.
 */
public class RunningState extends FaultState {
    private final long startedMs;

    @JsonCreator
    public RunningState(@JsonProperty("startedMs") long startedMs) {
        this.startedMs = startedMs;
    }

    @JsonProperty
    public long startedMs() {
        return startedMs;
    }
}

tools/src/main/java/org/apache/kafka/trogdor/fault/SendingState.java
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.trogdor.fault;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.kafka.common.utils.Utils;

import java.util.Set;
import java.util.TreeMap;

/**
 * The state a fault is in on the controller when it is scheduled to be sent to several agents.
 */
public class SendingState extends FaultState {
    private final TreeMap<String, Boolean> nodes;
    private int remainingNodes;

    public SendingState(@JsonProperty("nodeNames") Set<String> nodeNames) {
        this.nodes = new TreeMap<>();
        for (String nodeName : nodeNames) {
            nodes.put(nodeName, false);
        }
        remainingNodes = nodeNames.size();
    }

    @JsonProperty
    public synchronized Set<String> nodeNames() {
        return nodes.keySet();
    }

    /**
     * Complete a send operation.
     *
     * @param nodeName      The name of the node we sent to.
     * @return              True if there are no more send operations left.
     */
    public synchronized boolean completeSend(String nodeName) {
        if (!nodes.containsKey(nodeName)) {
            throw new RuntimeException("Node " + nodeName + " was not supposed to " +
                "receive this fault. The fault was scheduled on nodes: " +
                Utils.join(nodes.keySet(), ", "));
        }
        if (nodes.put(nodeName, true)) {
            throw new RuntimeException("Node " + nodeName + " already received this fault.");
        }
        remainingNodes--;
        return remainingNodes == 0;
    }
}

tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java
@@ -38,7 +38,7 @@ public class FaultDataMap {
         @JsonCreator
         public FaultData(@JsonProperty("spec") FaultSpec spec,
-                         @JsonProperty("status") FaultState state) {
+                         @JsonProperty("state") FaultState state) {
             this.spec = spec;
             this.state = state;
         }

tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
@@ -24,8 +24,9 @@ import org.apache.kafka.trogdor.basic.BasicPlatform;
 import org.apache.kafka.trogdor.basic.BasicTopology;
 import org.apache.kafka.trogdor.common.ExpectedFaults;
 import org.apache.kafka.trogdor.common.Node;
-import org.apache.kafka.trogdor.fault.FaultState;
+import org.apache.kafka.trogdor.fault.DoneState;
 import org.apache.kafka.trogdor.fault.NoOpFaultSpec;
+import org.apache.kafka.trogdor.fault.RunningState;
 import org.apache.kafka.trogdor.rest.AgentFaultsResponse;
 import org.apache.kafka.trogdor.rest.AgentStatusResponse;
 import org.apache.kafka.trogdor.rest.CreateAgentFaultRequest;
@@ -129,36 +130,32 @@ public class AgentTest {
         final NoOpFaultSpec fooSpec = new NoOpFaultSpec(10, 2);
         client.putFault(new CreateAgentFaultRequest("foo", fooSpec));
-        new ExpectedFaults().addFault("foo", FaultState.RUNNING).waitFor(client);
+        new ExpectedFaults().addFault("foo", new RunningState(0)).waitFor(client);
+        time.sleep(3);
+        new ExpectedFaults().addFault("foo", new DoneState(3, "")).waitFor(client);
 
         final NoOpFaultSpec barSpec = new NoOpFaultSpec(20, 3);
         client.putFault(new CreateAgentFaultRequest("bar", barSpec));
-        time.sleep(11);
         new ExpectedFaults().
-            addFault("foo", FaultState.RUNNING).
-            addFault("bar", FaultState.RUNNING).
+            addFault("foo", new DoneState(3, "")).
+            addFault("bar", new RunningState(3)).
             waitFor(client);
 
-        final NoOpFaultSpec bazSpec = new NoOpFaultSpec(1, 11);
+        time.sleep(4);
+        final NoOpFaultSpec bazSpec = new NoOpFaultSpec(1, 2);
         client.putFault(new CreateAgentFaultRequest("baz", bazSpec));
         new ExpectedFaults().
-            addFault("foo", FaultState.RUNNING).
-            addFault("bar", FaultState.RUNNING).
-            addFault("baz", FaultState.RUNNING).
+            addFault("foo", new DoneState(3, "")).
+            addFault("bar", new DoneState(7, "")).
+            addFault("baz", new RunningState(7)).
            waitFor(client);
 
-        time.sleep(2);
+        time.sleep(3);
         new ExpectedFaults().
-            addFault("foo", FaultState.DONE).
-            addFault("bar", FaultState.RUNNING).
-            addFault("baz", FaultState.DONE).
-            waitFor(client);
-
-        time.sleep(100);
-        new ExpectedFaults().
-            addFault("foo", FaultState.DONE).
-            addFault("bar", FaultState.DONE).
-            addFault("baz", FaultState.DONE).
+            addFault("foo", new DoneState(3, "")).
+            addFault("bar", new DoneState(7, "")).
+            addFault("baz", new DoneState(10, "")).
             waitFor(client);
 
         agent.beginShutdown();

tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
@@ -27,9 +27,11 @@ import org.apache.kafka.trogdor.common.CapturingCommandRunner;
 import org.apache.kafka.trogdor.common.ExpectedFaults;
 import org.apache.kafka.trogdor.common.MiniTrogdorCluster;
-import org.apache.kafka.trogdor.fault.FaultState;
+import org.apache.kafka.trogdor.fault.DoneState;
 import org.apache.kafka.trogdor.fault.NetworkPartitionFaultSpec;
 import org.apache.kafka.trogdor.fault.NoOpFaultSpec;
+import org.apache.kafka.trogdor.fault.PendingState;
+import org.apache.kafka.trogdor.fault.RunningState;
 import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
 import org.apache.kafka.trogdor.rest.CreateCoordinatorFaultRequest;
 import org.junit.Rule;
@@ -69,16 +71,16 @@ public class CoordinatorTest {
                 build()) {
             new ExpectedFaults().waitFor(cluster.coordinatorClient());
 
-            NoOpFaultSpec noOpFaultSpec = new NoOpFaultSpec(10, 2);
+            NoOpFaultSpec noOpFaultSpec = new NoOpFaultSpec(1, 2);
             cluster.coordinatorClient().putFault(
                 new CreateCoordinatorFaultRequest("fault1", noOpFaultSpec));
             new ExpectedFaults().
-                addFault("fault1", noOpFaultSpec, FaultState.PENDING).
+                addFault("fault1", noOpFaultSpec, new PendingState()).
                 waitFor(cluster.coordinatorClient());
-            time.sleep(11);
+            time.sleep(2);
             new ExpectedFaults().
-                addFault("fault1", noOpFaultSpec, FaultState.DONE).
+                addFault("fault1", noOpFaultSpec, new DoneState(2, "")).
                 waitFor(cluster.coordinatorClient());
         }
     }
@@ -99,7 +101,7 @@ public class CoordinatorTest {
         NoOpFaultSpec noOpFaultSpec = new NoOpFaultSpec(10, 2);
         coordinatorClient.putFault(new CreateCoordinatorFaultRequest("fault1", noOpFaultSpec));
         new ExpectedFaults().
-            addFault("fault1", noOpFaultSpec, FaultState.PENDING).
+            addFault("fault1", noOpFaultSpec, new PendingState()).
             waitFor(coordinatorClient);
         new ExpectedFaults().
             waitFor(agentClient1).
@@ -107,10 +109,10 @@ public class CoordinatorTest {
         time.sleep(10);
         new ExpectedFaults().
-            addFault("fault1", noOpFaultSpec, FaultState.DONE).
+            addFault("fault1", noOpFaultSpec, new DoneState(10, "")).
             waitFor(coordinatorClient);
         new ExpectedFaults().
-            addFault("fault1", noOpFaultSpec, FaultState.RUNNING).
+            addFault("fault1", noOpFaultSpec, new RunningState(10)).
             waitFor(agentClient1).
             waitFor(agentClient2);
     }