diff --git a/tests/kafkatest/services/trogdor/__init__.py b/tests/kafkatest/services/trogdor/__init__.py
new file mode 100644
index 00000000000..ec2014340d7
--- /dev/null
+++ b/tests/kafkatest/services/trogdor/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/tests/kafkatest/services/trogdor/fault_spec.py b/tests/kafkatest/services/trogdor/fault_spec.py
new file mode 100644
index 00000000000..97687657a38
--- /dev/null
+++ b/tests/kafkatest/services/trogdor/fault_spec.py
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+
+
+class FaultSpec(object):
+    """
+    The base class for a fault specification.
+
+    MAX_DURATION_MS         The longest duration we should use for a fault specification.
+    """
+
+    MAX_DURATION_MS=10000000
+
+    def __init__(self, start_ms, duration_ms):
+        """
+        Create a new fault specification.
+
+        :param start_ms:        The start time in milliseconds since the epoch.
+        :param duration_ms:     The duration in milliseconds.
+        """
+        self.start_ms = start_ms
+        self.duration_ms = duration_ms
+
+    def message(self):
+        """
+        Return a message suitable for sending to the Trogdor daemon.
+        """
+        raise NotImplementedError()
+
+    def __str__(self):
+        return json.dumps(self.message())
diff --git a/tests/kafkatest/services/trogdor/network_partition_fault_spec.py b/tests/kafkatest/services/trogdor/network_partition_fault_spec.py
new file mode 100644
index 00000000000..deb5c5612e4
--- /dev/null
+++ b/tests/kafkatest/services/trogdor/network_partition_fault_spec.py
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from kafkatest.services.trogdor.fault_spec import FaultSpec + + +class NetworkPartitionFaultSpec(FaultSpec): + """ + The specification for a network partition fault. + + Network partition faults fracture the network into different partitions + that cannot communicate with each other. + """ + + def __init__(self, start_ms, duration_ms, partitions): + """ + Create a new NetworkPartitionFaultSpec. + + :param start_ms: The start time, as described in fault_spec.py + :param duration_ms: The duration in milliseconds. + :param partitions: An array of arrays describing the partitions. + The inner arrays may contain either node names, + or ClusterNode objects. + """ + super(NetworkPartitionFaultSpec, self).__init__(start_ms, duration_ms) + self.partitions = [] + for partition in partitions: + nodes = [] + for obj in partition: + if isinstance(obj, basestring): + nodes.append(obj) + else: + nodes.append(obj.name) + self.partitions.append(nodes) + + def message(self): + return { + "class": "org.apache.kafka.trogdor.fault.NetworkPartitionFaultSpec", + "startMs": self.start_ms, + "durationMs": self.duration_ms, + "partitions": self.partitions, + } diff --git a/tests/kafkatest/services/trogdor/no_op_fault_spec.py b/tests/kafkatest/services/trogdor/no_op_fault_spec.py new file mode 100644 index 00000000000..82e9713a78f --- /dev/null +++ b/tests/kafkatest/services/trogdor/no_op_fault_spec.py @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from kafkatest.services.trogdor.fault_spec import FaultSpec + + +class NoOpFaultSpec(FaultSpec): + """ + The specification for a nop-op fault. + + No-op faults are used to test the fault injector. They don't do anything, + but must be propagated to all fault injector daemons. + """ + + def __init__(self, start_ms, duration_ms): + """ + Create a new NoOpFault. + + :param start_ms: The start time, as described in fault_spec.py + :param duration_ms: The duration in milliseconds. 
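A note on the wire format: NetworkPartitionFaultSpec.message() is what ultimately gets sent to the coordinator, with ClusterNode objects flattened to their names. A minimal sketch of that serialization, assuming the kafkatest package from this patch is importable and using made-up node names:

    from kafkatest.services.trogdor.network_partition_fault_spec import NetworkPartitionFaultSpec

    # Hypothetical node names; a test would normally pass ducktape ClusterNode objects instead.
    spec = NetworkPartitionFaultSpec(0, 10000000, [["node01"], ["node02", "node03"]])
    print(spec)   # __str__ is json.dumps(spec.message()), roughly:
    # {"class": "org.apache.kafka.trogdor.fault.NetworkPartitionFaultSpec",
    #  "startMs": 0, "durationMs": 10000000,
    #  "partitions": [["node01"], ["node02", "node03"]]}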
+ """ + super(NoOpFaultSpec, self).__init__(start_ms, duration_ms) + + def message(self): + return { + "class": "org.apache.kafka.trogdor.fault.NoOpFaultSpec", + "startMs": self.start_ms, + "durationMs": self.duration_ms, + } diff --git a/tests/kafkatest/services/trogdor/templates/log4j.properties b/tests/kafkatest/services/trogdor/templates/log4j.properties new file mode 100644 index 00000000000..252668e3dab --- /dev/null +++ b/tests/kafkatest/services/trogdor/templates/log4j.properties @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +log4j.rootLogger=DEBUG, mylogger +log4j.logger.kafka=DEBUG +log4j.logger.org.apache.kafka=DEBUG +log4j.logger.org.eclipse=INFO +log4j.appender.mylogger=org.apache.log4j.FileAppender +log4j.appender.mylogger.File={{ log_path }} +log4j.appender.mylogger.layout=org.apache.log4j.PatternLayout +log4j.appender.mylogger.layout.ConversionPattern=[%d] %p %m (%c)%n diff --git a/tests/kafkatest/services/trogdor/trogdor.py b/tests/kafkatest/services/trogdor/trogdor.py new file mode 100644 index 00000000000..8b05e9963d5 --- /dev/null +++ b/tests/kafkatest/services/trogdor/trogdor.py @@ -0,0 +1,255 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import os.path +import requests +from requests.adapters import HTTPAdapter +from requests.packages.urllib3 import Retry + +from ducktape.services.service import Service +from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin + + +class TrogdorService(KafkaPathResolverMixin, Service): + """ + A ducktape service for running the trogdor fault injection daemons. + + Attributes: + PERSISTENT_ROOT The root filesystem path to store service files under. + COORDINATOR_STDOUT_STDERR The path where we store the coordinator's stdout/stderr output. + AGENT_STDOUT_STDERR The path where we store the agents's stdout/stderr output. + COORDINATOR_LOG The path where we store the coordinator's log4j output. + AGENT_LOG The path where we store the agent's log4j output. 
+ AGENT_LOG4J_PROPERTIES The path to the agent log4j.properties file for log config. + COORDINATOR_LOG4J_PROPERTIES The path to the coordinator log4j.properties file for log config. + CONFIG_PATH The path to the trogdor configuration file. + DEFAULT_AGENT_PORT The default port to use for trogdor_agent daemons. + DEFAULT_COORDINATOR_PORT The default port to use for trogdor_coordinator daemons. + REQUEST_TIMEOUT The request timeout in seconds to use for REST requests. + REQUEST_HEADERS The request headers to use when communicating with trogdor. + """ + + PERSISTENT_ROOT="/mnt/trogdor" + COORDINATOR_STDOUT_STDERR = os.path.join(PERSISTENT_ROOT, "trogdor-coordinator-stdout-stderr.log") + AGENT_STDOUT_STDERR = os.path.join(PERSISTENT_ROOT, "trogdor-agent-stdout-stderr.log") + COORDINATOR_LOG = os.path.join(PERSISTENT_ROOT, "trogdor-coordinator.log") + AGENT_LOG = os.path.join(PERSISTENT_ROOT, "trogdor-agent.log") + COORDINATOR_LOG4J_PROPERTIES = os.path.join(PERSISTENT_ROOT, "trogdor-coordinator-log4j.properties") + AGENT_LOG4J_PROPERTIES = os.path.join(PERSISTENT_ROOT, "trogdor-agent-log4j.properties") + CONFIG_PATH = os.path.join(PERSISTENT_ROOT, "trogdor.conf") + DEFAULT_AGENT_PORT=8888 + DEFAULT_COORDINATOR_PORT=8889 + REQUEST_TIMEOUT=5 + REQUEST_HEADERS = {"Content-type": "application/json"} + + logs = { + "trogdor_coordinator_stdout_stderr": { + "path": COORDINATOR_STDOUT_STDERR, + "collect_default": True}, + "trogdor_agent_stdout_stderr": { + "path": AGENT_STDOUT_STDERR, + "collect_default": True}, + "trogdor_coordinator_log": { + "path": COORDINATOR_LOG, + "collect_default": True}, + "trogdor_agent_log": { + "path": AGENT_LOG, + "collect_default": True}, + } + + def __init__(self, context, agent_nodes, agent_port=DEFAULT_AGENT_PORT, + coordinator_port=DEFAULT_COORDINATOR_PORT): + """ + Create a Trogdor service. + + :param context: The test context. + :param agent_nodes: The nodes to run the agents on. + :param agent_port: The port to use for the trogdor_agent daemons. + :param coordinator_port: The port to use for the trogdor_coordinator daemons. + """ + Service.__init__(self, context, num_nodes=1) + self.coordinator_node = self.nodes[0] + if (len(agent_nodes) == 0): + raise RuntimeError("You must supply at least one node to run the service on.") + for agent_node in agent_nodes: + self.nodes.append(agent_node) + self.agent_port = agent_port + self.coordinator_port = coordinator_port + + def free(self): + # We only want to deallocate the coordinator node, not the agent nodes. So we + # change self.nodes to include only the coordinator node, and then invoke + # the base class' free method. + if self.coordinator_node is not None: + self.nodes = [self.coordinator_node] + self.coordinator_node = None + Service.free(self) + + def _create_config_dict(self): + """ + Create a dictionary with the Trogdor configuration. + + :return: The configuration dictionary. + """ + dict_nodes = {} + for node in self.nodes: + dict_nodes[node.name] = { + "hostname": node.account.ssh_hostname, + "trogdor.agent.port": self.agent_port, + } + dict_nodes[self.coordinator_node.name]["trogdor.coordinator.port"] = self.coordinator_port + return { + "platform": "org.apache.kafka.trogdor.basic.BasicPlatform", + "nodes": dict_nodes, + } + + def start_node(self, node): + node.account.mkdirs(TrogdorService.PERSISTENT_ROOT) + + # Create the configuration file on the node. 
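To make _create_config_dict() concrete: for a coordinator plus two agent nodes it produces JSON along the lines of the sketch below, which start_node() then writes to /mnt/trogdor/trogdor.conf. The node names and hostnames here are hypothetical; real values come from the allocated ducktape nodes.

    import json

    config = {
        "platform": "org.apache.kafka.trogdor.basic.BasicPlatform",
        "nodes": {
            # The coordinator node carries both ports; agent-only nodes carry just the agent port.
            "worker1": {"hostname": "10.0.0.1", "trogdor.agent.port": 8888,
                        "trogdor.coordinator.port": 8889},
            "worker2": {"hostname": "10.0.0.2", "trogdor.agent.port": 8888},
            "worker3": {"hostname": "10.0.0.3", "trogdor.agent.port": 8888},
        },
    }
    print(json.dumps(config, indent=2))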
+ str = json.dumps(self._create_config_dict(), indent=2) + self.logger.info("Creating configuration file %s with %s" % (TrogdorService.CONFIG_PATH, str)) + node.account.create_file(TrogdorService.CONFIG_PATH, str) + + if self.is_coordinator(node): + self._start_coordinator_node(node) + else: + self._start_agent_node(node) + + def _start_coordinator_node(self, node): + node.account.create_file(TrogdorService.COORDINATOR_LOG4J_PROPERTIES, + self.render('log4j.properties', + log_path=TrogdorService.COORDINATOR_LOG)) + self._start_trogdor_daemon("coordinator", TrogdorService.COORDINATOR_STDOUT_STDERR, + TrogdorService.COORDINATOR_LOG4J_PROPERTIES, + TrogdorService.COORDINATOR_LOG, node) + self.logger.info("Started trogdor coordinator on %s." % node.name) + + def _start_agent_node(self, node): + node.account.create_file(TrogdorService.AGENT_LOG4J_PROPERTIES, + self.render('log4j.properties', + log_path=TrogdorService.AGENT_LOG)) + self._start_trogdor_daemon("agent", TrogdorService.AGENT_STDOUT_STDERR, + TrogdorService.AGENT_LOG4J_PROPERTIES, + TrogdorService.AGENT_LOG, node) + self.logger.info("Started trogdor agent on %s." % node.name) + + def _start_trogdor_daemon(self, daemon_name, stdout_stderr_capture_path, + log4j_properties_path, log_path, node): + cmd = "export KAFKA_LOG4J_OPTS='-Dlog4j.configuration=file:%s'; " % log4j_properties_path + cmd += "%s %s --%s.config %s --node-name %s 1>> %s 2>> %s &" % \ + (self.path.script("trogdor.sh", node), + daemon_name, + daemon_name, + TrogdorService.CONFIG_PATH, + node.name, + stdout_stderr_capture_path, + stdout_stderr_capture_path) + node.account.ssh(cmd) + with node.account.monitor_log(log_path) as monitor: + monitor.wait_until("Starting main service thread.", timeout_sec=30, backoff_sec=.25, + err_msg=("%s on %s didn't finish startup" % (daemon_name, node.name))) + + def wait_node(self, node, timeout_sec=None): + if self.is_coordinator(node): + return len(node.account.java_pids(self.coordinator_class_name())) == 0 + else: + return len(node.account.java_pids(self.agent_class_name())) == 0 + + def stop_node(self, node): + """Halt trogdor processes on this node.""" + if self.is_coordinator(node): + node.account.kill_java_processes(self.coordinator_class_name()) + else: + node.account.kill_java_processes(self.agent_class_name()) + + def clean_node(self, node): + """Clean up persistent state on this node - e.g. service logs, configuration files etc.""" + self.stop_node(node) + node.account.ssh("rm -rf -- %s" % TrogdorService.PERSISTENT_ROOT) + + def _coordinator_url(self, path): + return "http://%s:%d/coordinator/%s" % \ + (self.coordinator_node.account.ssh_hostname, self.coordinator_port, path) + + def request_session(self): + """ + Creates a new request session which will retry for a while. + """ + session = requests.Session() + session.mount('http://', + HTTPAdapter(max_retries=Retry(total=4, backoff_factor=0.3))) + return session + + def _coordinator_put(self, path, message): + """ + Make a PUT request to the Trogdor coordinator. + + :param path: The URL path to use. + :param message: The message object to send. + :return: The response as an object. + """ + url = self._coordinator_url(path) + self.logger.info("PUT %s %s" % (url, message)) + response = self.request_session().put(url, json=message, + timeout=TrogdorService.REQUEST_TIMEOUT, + headers=TrogdorService.REQUEST_HEADERS) + response.raise_for_status() + return response.json() + + def _coordinator_get(self, path, message): + """ + Make a GET request to the Trogdor coordinator. 
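The retry wiring in request_session() is the standard requests/urllib3 pattern; a self-contained sketch of the same idea, using a hypothetical coordinator address and the default coordinator port from this patch:

    import requests
    from requests.adapters import HTTPAdapter
    from urllib3.util.retry import Retry

    session = requests.Session()
    # Retry transient connection errors a few times with exponential backoff.
    session.mount("http://", HTTPAdapter(max_retries=Retry(total=4, backoff_factor=0.3)))

    url = "http://trogdor-coordinator.example:8889/coordinator/faults"  # hypothetical host
    response = session.get(url, json={}, timeout=5,
                           headers={"Content-type": "application/json"})
    response.raise_for_status()
    print(response.json())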
+ + :param path: The URL path to use. + :param message: The message object to send. + :return: The response as an object. + """ + url = self._coordinator_url(path) + self.logger.info("GET %s %s" % (url, message)) + response = self.request_session().get(url, json=message, + timeout=TrogdorService.REQUEST_TIMEOUT, + headers=TrogdorService.REQUEST_HEADERS) + response.raise_for_status() + return response.json() + + def create_fault(self, id, spec): + """ + Create a new fault. + + :param id: The fault id. + :param spec: The fault spec. + """ + self._coordinator_put("fault", { "id": id, "spec": spec.message()}) + + def get_faults(self): + """ + Get the faults which are on the coordinator. + + :returns: A map of fault id strings to fault data objects. + Fault data objects contain a 'spec' field with the spec + and a 'state' field with the state. + """ + return self._coordinator_get("faults", {}) + + def is_coordinator(self, node): + return node == self.coordinator_node + + def agent_class_name(self): + return "org.apache.kafka.trogdor.agent.Agent" + + def coordinator_class_name(self): + return "org.apache.kafka.trogdor.coordinator.Coordinator" diff --git a/tests/kafkatest/tests/tools/trogdor_test.py b/tests/kafkatest/tests/tools/trogdor_test.py new file mode 100644 index 00000000000..026ecafdcec --- /dev/null +++ b/tests/kafkatest/tests/tools/trogdor_test.py @@ -0,0 +1,97 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from kafkatest.services.trogdor.network_partition_fault_spec import NetworkPartitionFaultSpec + +from ducktape.cluster.cluster_spec import ClusterSpec +from ducktape.mark.resource import cluster +from ducktape.tests.test import Test +from ducktape.utils.util import wait_until +from kafkatest.services.trogdor.fault_spec import FaultSpec +from kafkatest.services.trogdor.no_op_fault_spec import NoOpFaultSpec +from kafkatest.services.trogdor.trogdor import TrogdorService +from kafkatest.utils import node_is_reachable + + +class TrogdorTest(Test): + """ + Tests the Trogdor fault injection daemon in isolation. 
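For reference, create_fault() wraps the spec in an {id, spec} envelope before PUTting it to /coordinator/fault. A sketch of that request body, assuming the kafkatest package is importable (the fault id is made up):

    import json

    from kafkatest.services.trogdor.no_op_fault_spec import NoOpFaultSpec

    spec = NoOpFaultSpec(0, 10000000)
    print(json.dumps({"id": "myfault", "spec": spec.message()}, indent=2))
    # Roughly:
    # {"id": "myfault",
    #  "spec": {"class": "org.apache.kafka.trogdor.fault.NoOpFaultSpec",
    #           "startMs": 0, "durationMs": 10000000}}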
+    """
+
+    def __init__(self, test_context):
+        super(TrogdorTest, self).__init__(test_context)
+
+    def set_up_trogdor(self, num_agent_nodes):
+        self.agent_nodes = self.test_context.cluster.alloc(ClusterSpec.simple_linux(num_agent_nodes))
+        self.trogdor = TrogdorService(context=self.test_context, agent_nodes=self.agent_nodes)
+        for agent_node in self.agent_nodes:
+            agent_node.account.logger = self.trogdor.logger
+        self.trogdor.start()
+
+    def setUp(self):
+        self.trogdor = None
+        self.agent_nodes = None
+
+    def tearDown(self):
+        if self.trogdor is not None:
+            self.trogdor.stop()
+            self.trogdor = None
+        if self.agent_nodes is not None:
+            self.test_context.cluster.free(self.agent_nodes)
+            self.agent_nodes = None
+
+    @cluster(num_nodes=4)
+    def test_trogdor_service(self):
+        """
+        Test that we can bring up Trogdor and create a no-op fault.
+        """
+        self.set_up_trogdor(3)
+        spec = NoOpFaultSpec(0, FaultSpec.MAX_DURATION_MS)
+        self.trogdor.create_fault("myfault", spec)
+        def check_for_faults():
+            faults = self.trogdor.get_faults()
+            self.logger.info("faults = %s" % faults)
+            return "myfault" in faults
+        wait_until(lambda: check_for_faults(),
+                   timeout_sec=10, backoff_sec=.2, err_msg="Failed to read back myfault.")
+
+    @cluster(num_nodes=4)
+    def test_network_partition_fault(self):
+        """
+        Test that the network partition fault results in a true network partition between nodes.
+        """
+        self.set_up_trogdor(3)
+        spec = NetworkPartitionFaultSpec(0, FaultSpec.MAX_DURATION_MS,
+                                         [[self.agent_nodes[0]], self.agent_nodes[1:]])
+        assert 2 == len(spec.partitions)
+        assert [self.agent_nodes[0].name] == spec.partitions[0]
+        assert [self.agent_nodes[1].name, self.agent_nodes[2].name] == spec.partitions[1]
+        self.trogdor.create_fault("partition0", spec)
+        def verify_nodes_partitioned():
+            if node_is_reachable(self.agent_nodes[0], self.agent_nodes[1]):
+                return False
+            if node_is_reachable(self.agent_nodes[1], self.agent_nodes[0]):
+                return False
+            if node_is_reachable(self.agent_nodes[2], self.agent_nodes[0]):
+                return False
+            return True
+        wait_until(lambda: verify_nodes_partitioned(),
+                   timeout_sec=10, backoff_sec=.2, err_msg="Failed to verify that the nodes were partitioned.")
+        if not node_is_reachable(self.agent_nodes[0], self.agent_nodes[0]):
+            raise RuntimeError("Node 0 must be reachable from itself.")
+        if not node_is_reachable(self.agent_nodes[1], self.agent_nodes[2]):
+            raise RuntimeError("Node 2 must be reachable from node 1.")
+        if not node_is_reachable(self.agent_nodes[2], self.agent_nodes[1]):
+            raise RuntimeError("Node 1 must be reachable from node 2.")
diff --git a/tests/kafkatest/utils/__init__.py b/tests/kafkatest/utils/__init__.py
index 66180162bca..8c473bfa635 100644
--- a/tests/kafkatest/utils/__init__.py
+++ b/tests/kafkatest/utils/__init__.py
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from util import kafkatest_version, is_version, is_int, is_int_with_prefix
+from util import kafkatest_version, is_version, is_int, is_int_with_prefix, node_is_reachable
diff --git a/tests/kafkatest/utils/util.py b/tests/kafkatest/utils/util.py
index d82903a3e61..dd2027331dd 100644
--- a/tests/kafkatest/utils/util.py
+++ b/tests/kafkatest/utils/util.py
@@ -103,3 +103,12 @@ def is_int_with_prefix(msg):
                 "prefix dot integer value, but one of the two parts (before or after dot) "
                 "are not integers. Message: %s" % (msg))
 
+def node_is_reachable(src_node, dst_node):
+    """
+    Returns true if a node is reachable from another node.
+ + :param src_node: The source node to check from reachability from. + :param dst_node: The destination node to check for reachability to. + :return: True only if dst is reachable from src. + """ + return 0 == src_node.account.ssh("nc -w 3 -z %s 22" % dst_node.account.hostname, allow_fail=True) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java index 0cb01ad6b39..0ddf4c1fc57 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java @@ -29,7 +29,7 @@ import org.apache.kafka.trogdor.common.Platform; import org.apache.kafka.trogdor.fault.Fault; import org.apache.kafka.trogdor.fault.FaultSet; import org.apache.kafka.trogdor.fault.FaultSpec; -import org.apache.kafka.trogdor.fault.FaultState; +import org.apache.kafka.trogdor.fault.RunningState; import org.apache.kafka.trogdor.rest.AgentFaultsResponse; import org.apache.kafka.trogdor.rest.CreateAgentFaultRequest; import org.apache.kafka.trogdor.rest.JsonRestServer; @@ -140,7 +140,8 @@ public final class Agent { Iterator running = runningFaults.iterateByEnd(); while (running.hasNext()) { Fault fault = running.next(); - long endMs = fault.spec().startMs() + fault.spec().durationMs(); + RunningState state = (RunningState) fault.state(); + long endMs = state.startedMs() + fault.spec().durationMs(); if (now < endMs) { nextWakeMs = Math.min(nextWakeMs, endMs); break; @@ -154,7 +155,7 @@ public final class Agent { for (Fault fault: toStart) { try { log.debug("Activating fault " + fault); - fault.activate(platform); + fault.activate(now, platform); started.add(fault); } catch (Throwable e) { log.error("Error activating fault " + fault.id(), e); @@ -164,7 +165,7 @@ public final class Agent { for (Fault fault: toEnd) { try { log.debug("Deactivating fault " + fault); - fault.deactivate(platform); + fault.deactivate(now, platform); } catch (Throwable e) { log.error("Error deactivating fault " + fault.id(), e); } finally { @@ -200,10 +201,35 @@ public final class Agent { } finally { log.info("AgentRunnable shutting down."); restServer.stop(); + int numDeactivated = deactivateRunningFaults(); + log.info("AgentRunnable deactivated {} fault(s).", numDeactivated); } } } + private int deactivateRunningFaults() { + long now = time.milliseconds(); + int numDeactivated = 0; + lock.lock(); + try { + for (Iterator iter = runningFaults.iterateByStart(); iter.hasNext(); ) { + Fault fault = iter.next(); + try { + numDeactivated++; + iter.remove(); + fault.deactivate(now, platform); + } catch (Exception e) { + log.error("Got exception while deactivating {}", fault, e); + } finally { + doneFaults.add(fault); + } + } + } finally { + lock.unlock(); + } + return numDeactivated; + } + /** * Create a new Agent. 
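node_is_reachable() above shells out to nc over the ducktape account abstraction; outside of ducktape the same TCP probe can be sketched with subprocess (requires nc on the host, and the address below is hypothetical):

    import subprocess

    def tcp_port_open(host, port=22, timeout_secs=3):
        """Return True if host accepts TCP connections on port, mirroring 'nc -w 3 -z host 22'."""
        return subprocess.call(["nc", "-w", str(timeout_secs), "-z", host, str(port)]) == 0

    print(tcp_port_open("10.0.0.2"))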
* @@ -257,9 +283,9 @@ public final class Agent { Map faultData = new TreeMap<>(); lock.lock(); try { - updateFaultsResponse(faultData, pendingFaults, FaultState.PENDING); - updateFaultsResponse(faultData, runningFaults, FaultState.RUNNING); - updateFaultsResponse(faultData, doneFaults, FaultState.DONE); + updateFaultsResponse(faultData, pendingFaults); + updateFaultsResponse(faultData, runningFaults); + updateFaultsResponse(faultData, doneFaults); } finally { lock.unlock(); } @@ -267,12 +293,12 @@ public final class Agent { } private void updateFaultsResponse(Map faultData, - FaultSet faultSet, FaultState state) { + FaultSet faultSet) { for (Iterator iter = faultSet.iterateByStart(); iter.hasNext(); ) { Fault fault = iter.next(); AgentFaultsResponse.FaultData data = - new AgentFaultsResponse.FaultData(fault.spec(), state); + new AgentFaultsResponse.FaultData(fault.spec(), fault.state()); faultData.put(fault.id(), data); } } @@ -326,12 +352,14 @@ public final class Agent { JsonRestServer restServer = new JsonRestServer(Node.Util.getTrogdorAgentPort(platform.curNode())); AgentRestResource resource = new AgentRestResource(); - Agent agent = new Agent(platform, Time.SYSTEM, restServer, resource); + final Agent agent = new Agent(platform, Time.SYSTEM, restServer, resource); restServer.start(resource); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { - log.error("Agent shutting down..."); + log.error("Running shutdown hook..."); + agent.beginShutdown(); + agent.waitForShutdown(); } }); agent.waitForShutdown(); diff --git a/tools/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java b/tools/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java index 9bd2cb9cf6b..85270cdcd8d 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java @@ -23,6 +23,8 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.trogdor.common.Node; import org.apache.kafka.trogdor.common.Platform; import org.apache.kafka.trogdor.common.Topology; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; @@ -30,6 +32,8 @@ import java.io.IOException; * Defines a cluster topology */ public class BasicPlatform implements Platform { + private static final Logger log = LoggerFactory.getLogger(BasicPlatform.class); + private final Node curNode; private final BasicTopology topology; private final CommandRunner commandRunner; @@ -41,7 +45,14 @@ public class BasicPlatform implements Platform { public static class ShellCommandRunner implements CommandRunner { @Override public String run(Node curNode, String[] command) throws IOException { - return Shell.execCommand(command); + try { + String result = Shell.execCommand(command); + log.info("RUN: {}. RESULT: [{}]", Utils.join(command, " "), result); + return result; + } catch (RuntimeException | IOException e) { + log.info("RUN: {}. 
ERROR: [{}]", Utils.join(command, " "), e.getMessage()); + throw e; + } } } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java index 2759a366123..8f3563bd687 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java @@ -27,10 +27,11 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.trogdor.common.Node; import org.apache.kafka.trogdor.common.Platform; +import org.apache.kafka.trogdor.fault.DoneState; import org.apache.kafka.trogdor.fault.Fault; import org.apache.kafka.trogdor.fault.FaultSet; import org.apache.kafka.trogdor.fault.FaultSpec; -import org.apache.kafka.trogdor.fault.FaultState; +import org.apache.kafka.trogdor.fault.SendingState; import org.apache.kafka.trogdor.rest.CoordinatorFaultsResponse; import org.apache.kafka.trogdor.rest.CreateCoordinatorFaultRequest; import org.apache.kafka.trogdor.rest.JsonRestServer; @@ -115,9 +116,9 @@ public final class Coordinator { private final FaultSet pendingFaults = new FaultSet(); /** - * The set of faults which have been sent to the agents. + * The set of faults which have been sent to the NodeManagers. */ - private final FaultSet doneFaults = new FaultSet(); + private final FaultSet processedFaults = new FaultSet(); class CoordinatorRunnable implements Runnable { @Override @@ -152,13 +153,13 @@ public final class Coordinator { } toStart.add(fault); iter.remove(); - doneFaults.add(fault); + processedFaults.add(fault); } } finally { lock.unlock(); } for (Fault fault: toStart) { - startFault(fault); + startFault(now, fault); } } } catch (Throwable t) { @@ -209,7 +210,7 @@ public final class Coordinator { return this.restServer.port(); } - private void startFault(Fault fault) { + private void startFault(long now, Fault fault) { Set affectedNodes = fault.targetNodes(platform.topology()); Set affectedManagers = new HashSet<>(); Set nonexistentNodes = new HashSet<>(); @@ -229,6 +230,11 @@ public final class Coordinator { } log.info("Applying fault {} on {} node(s): {}", fault.id(), nodeNames.size(), Utils.join(nodeNames, ", ")); + if (nodeNames.isEmpty()) { + fault.setState(new DoneState(now, "")); + } else { + fault.setState(new SendingState(nodeNames)); + } for (NodeManager nodeManager : affectedManagers) { nodeManager.enqueueFault(fault); } @@ -261,8 +267,8 @@ public final class Coordinator { Map faultData = new TreeMap<>(); lock.lock(); try { - getFaultsImpl(faultData, pendingFaults, FaultState.PENDING); - getFaultsImpl(faultData, doneFaults, FaultState.DONE); + getFaultsImpl(faultData, pendingFaults); + getFaultsImpl(faultData, processedFaults); } finally { lock.unlock(); } @@ -270,12 +276,12 @@ public final class Coordinator { } private void getFaultsImpl(Map faultData, - FaultSet faultSet, FaultState state) { + FaultSet faultSet) { for (Iterator iter = faultSet.iterateByStart(); iter.hasNext(); ) { Fault fault = iter.next(); CoordinatorFaultsResponse.FaultData data = - new CoordinatorFaultsResponse.FaultData(fault.spec(), state); + new CoordinatorFaultsResponse.FaultData(fault.spec(), fault.state()); faultData.put(fault.id(), data); } } @@ -330,13 +336,15 @@ public final class Coordinator { JsonRestServer restServer = new JsonRestServer( Node.Util.getTrogdorCoordinatorPort(platform.curNode())); CoordinatorRestResource resource = new CoordinatorRestResource(); - 
Coordinator coordinator = new Coordinator(platform, Time.SYSTEM, + final Coordinator coordinator = new Coordinator(platform, Time.SYSTEM, restServer, resource); restServer.start(resource); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { - log.error("Coordinator shutting down..."); + log.error("Running shutdown hook..."); + coordinator.beginShutdown(); + coordinator.waitForShutdown(); } }); coordinator.waitForShutdown(); diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java index 04d714c3a6b..ee711907beb 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java @@ -22,7 +22,9 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.trogdor.agent.AgentClient; import org.apache.kafka.trogdor.common.Node; import org.apache.kafka.trogdor.common.Platform; +import org.apache.kafka.trogdor.fault.DoneState; import org.apache.kafka.trogdor.fault.Fault; +import org.apache.kafka.trogdor.fault.SendingState; import org.apache.kafka.trogdor.rest.AgentStatusResponse; import org.apache.kafka.trogdor.rest.CreateAgentFaultRequest; import org.slf4j.Logger; @@ -187,6 +189,10 @@ class NodeManager { } finally { lock.unlock(); } + SendingState state = (SendingState) fault.state(); + if (state.completeSend(node.name())) { + fault.setState(new DoneState(now, "")); + } return true; } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/AbstractFault.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/AbstractFault.java new file mode 100644 index 00000000000..2d63b82f801 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/AbstractFault.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
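The coordinator-side bookkeeping introduced here (a fault stays in SendingState until every targeted agent has received it, then flips to DoneState) can be summarized with a small Python sketch; this mirrors the semantics of SendingState.completeSend() further down, not its exact code:

    class SendTracker(object):
        """Tracks which agent nodes still need to receive a fault."""
        def __init__(self, node_names):
            self.sent = {name: False for name in node_names}

        def complete_send(self, node_name):
            """Mark node_name as delivered; return True once no sends remain."""
            if node_name not in self.sent:
                raise RuntimeError("Node %s was not supposed to receive this fault." % node_name)
            if self.sent[node_name]:
                raise RuntimeError("Node %s already received this fault." % node_name)
            self.sent[node_name] = True
            return all(self.sent.values())

    tracker = SendTracker(["node01", "node02"])
    assert not tracker.complete_send("node01")
    assert tracker.complete_send("node02")      # last ack flips the fault to DoneState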
+ */ + +package org.apache.kafka.trogdor.fault; + +import org.apache.kafka.trogdor.common.JsonUtil; +import org.apache.kafka.trogdor.common.Platform; +import org.apache.kafka.trogdor.common.Topology; + +import java.util.Objects; +import java.util.Set; + +public abstract class AbstractFault implements Fault { + private final String id; + private final FaultSpec spec; + private FaultState state; + + public AbstractFault(String id, FaultSpec spec) { + this.id = id; + this.spec = spec; + this.state = new PendingState(); + } + + @Override + public final String id() { + return id; + } + + @Override + public final FaultSpec spec() { + return spec; + } + + @Override + public synchronized FaultState state() { + return state; + } + + @Override + public synchronized void setState(FaultState state) { + this.state = state; + } + + @Override + public final void activate(long now, Platform platform) throws Exception { + try { + handleActivation(now, platform); + setState(new RunningState(now)); + } catch (Exception e) { + setState(new DoneState(now, e.getMessage())); + throw e; + } + } + + protected abstract void handleActivation(long now, Platform platform) throws Exception; + + @Override + public final void deactivate(long now, Platform platform) throws Exception { + try { + handleDeactivation(now, platform); + setState(new DoneState(now, "")); + } catch (Exception e) { + setState(new DoneState(now, e.getMessage())); + throw e; + } + } + + protected abstract void handleDeactivation(long now, Platform platform) throws Exception; + + @Override + public abstract Set targetNodes(Topology topology); + + @Override + public final boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + return toString().equals(o.toString()); + } + + @Override + public final int hashCode() { + return Objects.hashCode(toString()); + } + + @Override + public final String toString() { + return getClass().getSimpleName() + "(id=" + id + + ", spec=" + JsonUtil.toJsonString(spec) + + ", state=" + JsonUtil.toJsonString(state()) + + ")"; + } +} diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/DoneState.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/DoneState.java new file mode 100644 index 00000000000..222caf0d0ab --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/DoneState.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.fault; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * The state a fault is in on the agent or controller when it is completed, + * either normally or with an error. 
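With fault states now serialized as objects rather than an enum, the entries returned by the Python-side get_faults() look roughly like the sketch below; the "stateName" discriminator and the per-state fields come from the FaultState subtypes in this patch, while the fault id and timestamps are hypothetical:

    import json

    fault_entry = {
        "spec": {"class": "org.apache.kafka.trogdor.fault.NoOpFaultSpec",
                 "startMs": 10, "durationMs": 2},
        "state": {"stateName": "done",   # or "pending", "sending", "running"
                  "doneMs": 12, "errorStr": ""},
    }
    print(json.dumps({"myfault": fault_entry}, indent=2))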
+ */ +public class DoneState extends FaultState { + private final long doneMs; + private final String errorStr; + + @JsonCreator + public DoneState(@JsonProperty("doneMs") long doneMs, + @JsonProperty("errorStr") String errorStr) { + this.doneMs = doneMs; + this.errorStr = errorStr; + } + + @JsonProperty + public long doneMs() { + return doneMs; + } + + @JsonProperty + public String errorStr() { + return errorStr; + } +} diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java index 9f1a19afbf6..e44d56a5893 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java @@ -34,17 +34,37 @@ public interface Fault { FaultSpec spec(); /** - * Activate the fault. + * Get the current fault state. Thread-safe. */ - void activate(Platform platform) throws Exception; + FaultState state(); /** - * Deactivate the fault. + * Set the current fault state. Thread-safe. */ - void deactivate(Platform platform) throws Exception; + void setState(FaultState state); + + /** + * Activate the fault. Will transition into RunningState or DoneState. + * + * @param now The current time in ms. + * @param platform The platform to use. + */ + void activate(long now, Platform platform) throws Exception; + + /** + * Deactivate the fault. Will transition into DoneState. + * + * @param now The current time in ms. + * @param platform The platform to use. + */ + void deactivate(long now, Platform platform) throws Exception; /** * Get the nodes which this fault is targetting. + * + * @param topology The topology to use. + * + * @return A set of target node names. */ Set targetNodes(Topology topology); } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java index bec0792a127..cba8419126a 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java @@ -17,11 +17,35 @@ package org.apache.kafka.trogdor.fault; -import com.fasterxml.jackson.annotation.JsonFormat; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.kafka.trogdor.common.JsonUtil; +import java.util.Objects; -@JsonFormat(shape = JsonFormat.Shape.STRING) -public enum FaultState { - PENDING, - RUNNING, - DONE +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, + include = JsonTypeInfo.As.PROPERTY, + property = "stateName") +@JsonSubTypes({ + @JsonSubTypes.Type(value = DoneState.class, name = "done"), + @JsonSubTypes.Type(value = PendingState.class, name = "pending"), + @JsonSubTypes.Type(value = RunningState.class, name = "running"), + @JsonSubTypes.Type(value = SendingState.class, name = "sending") + }) +public abstract class FaultState { + @Override + public final boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + return toString().equals(o.toString()); + } + + @Override + public final int hashCode() { + return Objects.hashCode(toString()); + } + + @Override + public final String toString() { + return JsonUtil.toJsonString(this); + } } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java index 7524af17f7a..cf3270a32b0 100644 --- 
a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java @@ -17,7 +17,6 @@ package org.apache.kafka.trogdor.fault; -import org.apache.kafka.trogdor.common.JsonUtil; import org.apache.kafka.trogdor.common.Node; import org.apache.kafka.trogdor.common.Platform; import org.apache.kafka.trogdor.common.Topology; @@ -28,23 +27,20 @@ import java.net.InetAddress; import java.util.ArrayList; import java.util.HashSet; import java.util.List; -import java.util.Objects; import java.util.Set; import java.util.TreeSet; -public class NetworkPartitionFault implements Fault { +public class NetworkPartitionFault extends AbstractFault { private static final Logger log = LoggerFactory.getLogger(NetworkPartitionFault.class); - private final String id; - private final NetworkPartitionFaultSpec spec; private final List> partitions; public NetworkPartitionFault(String id, FaultSpec spec) { - this.id = id; - this.spec = (NetworkPartitionFaultSpec) spec; + super(id, spec); + NetworkPartitionFaultSpec faultSpec = (NetworkPartitionFaultSpec) spec; this.partitions = new ArrayList<>(); HashSet prevNodes = new HashSet<>(); - for (List partition : this.spec.partitions()) { + for (List partition : faultSpec.partitions()) { for (String nodeName : partition) { if (prevNodes.contains(nodeName)) { throw new RuntimeException("Node " + nodeName + @@ -57,23 +53,13 @@ public class NetworkPartitionFault implements Fault { } @Override - public String id() { - return id; - } - - @Override - public FaultSpec spec() { - return spec; - } - - @Override - public void activate(Platform platform) throws Exception { + protected void handleActivation(long now, Platform platform) throws Exception { log.info("Activating NetworkPartitionFault..."); runIptablesCommands(platform, "-A"); } @Override - public void deactivate(Platform platform) throws Exception { + protected void handleDeactivation(long now, Platform platform) throws Exception { log.info("Deactivating NetworkPartitionFault..."); runIptablesCommands(platform, "-D"); } @@ -107,24 +93,4 @@ public class NetworkPartitionFault implements Fault { } return targetNodes; } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - NetworkPartitionFault that = (NetworkPartitionFault) o; - return Objects.equals(id, that.id) && - Objects.equals(spec, that.spec) && - Objects.equals(partitions, that.partitions); - } - - @Override - public int hashCode() { - return Objects.hash(id, spec, partitions); - } - - @Override - public String toString() { - return "NoOpFault(id=" + id + ", spec=" + JsonUtil.toJsonString(spec) + ")"; - } } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java index c7ac4de2e6e..70b49652105 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java @@ -17,7 +17,6 @@ package org.apache.kafka.trogdor.fault; -import org.apache.kafka.trogdor.common.JsonUtil; import org.apache.kafka.trogdor.common.Node; import org.apache.kafka.trogdor.common.Platform; import org.apache.kafka.trogdor.common.Topology; @@ -26,37 +25,22 @@ import org.slf4j.LoggerFactory; import java.util.HashSet; import java.util.Map; -import java.util.Objects; import java.util.Set; -public class NoOpFault implements Fault { +public class NoOpFault 
extends AbstractFault { private static final Logger log = LoggerFactory.getLogger(NoOpFault.class); - private final String id; - private final FaultSpec spec; - public NoOpFault(String id, FaultSpec spec) { - this.id = id; - this.spec = spec; + super(id, spec); } @Override - public String id() { - return id; - } - - @Override - public FaultSpec spec() { - return spec; - } - - @Override - public void activate(Platform platform) { + protected void handleActivation(long now, Platform platform) throws Exception { log.info("Activating NoOpFault..."); } @Override - public void deactivate(Platform platform) { + protected void handleDeactivation(long now, Platform platform) throws Exception { log.info("Deactivating NoOpFault..."); } @@ -70,23 +54,4 @@ public class NoOpFault implements Fault { } return set; } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - NoOpFault that = (NoOpFault) o; - return Objects.equals(id, that.id) && - Objects.equals(spec, that.spec); - } - - @Override - public int hashCode() { - return Objects.hash(id, spec); - } - - @Override - public String toString() { - return "NoOpFault(id=" + id + ", spec=" + JsonUtil.toJsonString(spec) + ")"; - } } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/PendingState.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/PendingState.java new file mode 100644 index 00000000000..57c8e88b574 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/PendingState.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.fault; + +import com.fasterxml.jackson.annotation.JsonCreator; + +/** + * The state a fault is in on the agent or controller when we haven't yet done + * anything with it. + */ +public class PendingState extends FaultState { + @JsonCreator + public PendingState() { + } +} diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/RunningState.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/RunningState.java new file mode 100644 index 00000000000..1b81bf53d2e --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/RunningState.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.fault; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * The state a fault is in on the agent when it is running. + */ +public class RunningState extends FaultState { + private final long startedMs; + + @JsonCreator + public RunningState(@JsonProperty("startedMs") long startedMs) { + this.startedMs = startedMs; + } + + @JsonProperty + public long startedMs() { + return startedMs; + } +} diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/SendingState.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/SendingState.java new file mode 100644 index 00000000000..edfbed23430 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/SendingState.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.fault; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.common.utils.Utils; + +import java.util.TreeMap; +import java.util.Set; + +/** + * The state a fault is in on the controller when it is scheduled to be sent to several agents. + */ +public class SendingState extends FaultState { + private final TreeMap nodes; + private int remainingNodes; + + public SendingState(@JsonProperty("nodeNames") Set nodeNames) { + this.nodes = new TreeMap<>(); + for (String nodeName : nodeNames) { + nodes.put(nodeName, false); + } + remainingNodes = nodeNames.size(); + } + + @JsonProperty + public synchronized Set nodeNames() { + return nodes.keySet(); + } + + /** + * Complete a send operation. + * + * @param nodeName The name of the node we sent to. + * @return True if there are no more send operations left. + */ + public synchronized boolean completeSend(String nodeName) { + if (!nodes.containsKey(nodeName)) { + throw new RuntimeException("Node " + nodeName + " was not to supposed to " + + "receive this fault. 
The fault was scheduled on nodes: " + + Utils.join(nodes.keySet(), ", ")); + } + if (nodes.put(nodeName, true)) { + throw new RuntimeException("Node " + nodeName + " already received this fault."); + } + remainingNodes--; + return remainingNodes == 0; + } +} diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java index 773d5197759..b2f7c91ba38 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java @@ -38,7 +38,7 @@ public class FaultDataMap { @JsonCreator public FaultData(@JsonProperty("spec") FaultSpec spec, - @JsonProperty("status") FaultState state) { + @JsonProperty("state") FaultState state) { this.spec = spec; this.state = state; } diff --git a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java index c587e44e38c..53ef8498f1d 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java @@ -24,8 +24,9 @@ import org.apache.kafka.trogdor.basic.BasicPlatform; import org.apache.kafka.trogdor.basic.BasicTopology; import org.apache.kafka.trogdor.common.ExpectedFaults; import org.apache.kafka.trogdor.common.Node; -import org.apache.kafka.trogdor.fault.FaultState; +import org.apache.kafka.trogdor.fault.DoneState; import org.apache.kafka.trogdor.fault.NoOpFaultSpec; +import org.apache.kafka.trogdor.fault.RunningState; import org.apache.kafka.trogdor.rest.AgentFaultsResponse; import org.apache.kafka.trogdor.rest.AgentStatusResponse; import org.apache.kafka.trogdor.rest.CreateAgentFaultRequest; @@ -129,36 +130,32 @@ public class AgentTest { final NoOpFaultSpec fooSpec = new NoOpFaultSpec(10, 2); client.putFault(new CreateAgentFaultRequest("foo", fooSpec)); - new ExpectedFaults().addFault("foo", FaultState.RUNNING).waitFor(client); + new ExpectedFaults().addFault("foo", new RunningState(0)).waitFor(client); + + time.sleep(3); + new ExpectedFaults().addFault("foo", new DoneState(3, "")).waitFor(client); final NoOpFaultSpec barSpec = new NoOpFaultSpec(20, 3); client.putFault(new CreateAgentFaultRequest("bar", barSpec)); - time.sleep(11); new ExpectedFaults(). - addFault("foo", FaultState.RUNNING). - addFault("bar", FaultState.RUNNING). + addFault("foo", new DoneState(3, "")). + addFault("bar", new RunningState(3)). waitFor(client); - final NoOpFaultSpec bazSpec = new NoOpFaultSpec(1, 11); + time.sleep(4); + final NoOpFaultSpec bazSpec = new NoOpFaultSpec(1, 2); client.putFault(new CreateAgentFaultRequest("baz", bazSpec)); new ExpectedFaults(). - addFault("foo", FaultState.RUNNING). - addFault("bar", FaultState.RUNNING). - addFault("baz", FaultState.RUNNING). + addFault("foo", new DoneState(3, "")). + addFault("bar", new DoneState(7, "")). + addFault("baz", new RunningState(7)). waitFor(client); - time.sleep(2); + time.sleep(3); new ExpectedFaults(). - addFault("foo", FaultState.DONE). - addFault("bar", FaultState.RUNNING). - addFault("baz", FaultState.DONE). - waitFor(client); - - time.sleep(100); - new ExpectedFaults(). - addFault("foo", FaultState.DONE). - addFault("bar", FaultState.DONE). - addFault("baz", FaultState.DONE). + addFault("foo", new DoneState(3, "")). + addFault("bar", new DoneState(7, "")). + addFault("baz", new DoneState(10, "")). 
waitFor(client); agent.beginShutdown(); diff --git a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java index d7a8fa0d111..75109d2a490 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java @@ -27,9 +27,11 @@ import org.apache.kafka.trogdor.common.CapturingCommandRunner; import org.apache.kafka.trogdor.common.ExpectedFaults; import org.apache.kafka.trogdor.common.MiniTrogdorCluster; -import org.apache.kafka.trogdor.fault.FaultState; +import org.apache.kafka.trogdor.fault.DoneState; import org.apache.kafka.trogdor.fault.NetworkPartitionFaultSpec; import org.apache.kafka.trogdor.fault.NoOpFaultSpec; +import org.apache.kafka.trogdor.fault.PendingState; +import org.apache.kafka.trogdor.fault.RunningState; import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse; import org.apache.kafka.trogdor.rest.CreateCoordinatorFaultRequest; import org.junit.Rule; @@ -69,16 +71,16 @@ public class CoordinatorTest { build()) { new ExpectedFaults().waitFor(cluster.coordinatorClient()); - NoOpFaultSpec noOpFaultSpec = new NoOpFaultSpec(10, 2); + NoOpFaultSpec noOpFaultSpec = new NoOpFaultSpec(1, 2); cluster.coordinatorClient().putFault( new CreateCoordinatorFaultRequest("fault1", noOpFaultSpec)); new ExpectedFaults(). - addFault("fault1", noOpFaultSpec, FaultState.PENDING). + addFault("fault1", noOpFaultSpec, new PendingState()). waitFor(cluster.coordinatorClient()); - time.sleep(11); + time.sleep(2); new ExpectedFaults(). - addFault("fault1", noOpFaultSpec, FaultState.DONE). + addFault("fault1", noOpFaultSpec, new DoneState(2, "")). waitFor(cluster.coordinatorClient()); } } @@ -99,7 +101,7 @@ public class CoordinatorTest { NoOpFaultSpec noOpFaultSpec = new NoOpFaultSpec(10, 2); coordinatorClient.putFault(new CreateCoordinatorFaultRequest("fault1", noOpFaultSpec)); new ExpectedFaults(). - addFault("fault1", noOpFaultSpec, FaultState.PENDING). + addFault("fault1", noOpFaultSpec, new PendingState()). waitFor(coordinatorClient); new ExpectedFaults(). waitFor(agentClient1). @@ -107,10 +109,10 @@ public class CoordinatorTest { time.sleep(10); new ExpectedFaults(). - addFault("fault1", noOpFaultSpec, FaultState.DONE). + addFault("fault1", noOpFaultSpec, new DoneState(10, "")). waitFor(coordinatorClient); new ExpectedFaults(). - addFault("fault1", noOpFaultSpec, FaultState.RUNNING). + addFault("fault1", noOpFaultSpec, new RunningState(10)). waitFor(agentClient1). waitFor(agentClient2); }
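Taken together, the intended usage from a system test looks roughly like the following sketch; the ducktape test_context and allocated agent_nodes are assumed to exist (see trogdor_test.py above for the fully worked version):

    from ducktape.utils.util import wait_until
    from kafkatest.services.trogdor.fault_spec import FaultSpec
    from kafkatest.services.trogdor.network_partition_fault_spec import NetworkPartitionFaultSpec
    from kafkatest.services.trogdor.trogdor import TrogdorService

    def inject_partition(test_context, agent_nodes):
        """Start Trogdor on agent_nodes and split the first node away from the rest."""
        trogdor = TrogdorService(context=test_context, agent_nodes=agent_nodes)
        trogdor.start()
        spec = NetworkPartitionFaultSpec(0, FaultSpec.MAX_DURATION_MS,
                                         [[agent_nodes[0]], agent_nodes[1:]])
        trogdor.create_fault("partition0", spec)
        wait_until(lambda: "partition0" in trogdor.get_faults(),
                   timeout_sec=10, backoff_sec=.2,
                   err_msg="partition0 was never registered with the coordinator")
        return trogdor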