mirror of https://github.com/apache/kafka.git
Tweaked behavior of stop_node, clean_node to generally fail fast
This commit is contained in:
parent
7f7c3e0e68
commit
42dcdb1d66
|
@ -138,8 +138,8 @@ class ConsoleConsumer(BackgroundThreadService):
|
|||
super(ConsoleConsumer, self).start_node(node)
|
||||
|
||||
def stop_node(self, node):
|
||||
node.account.kill_process("java", allow_fail=True)
|
||||
node.account.kill_process("java", allow_fail=False)
|
||||
|
||||
def clean_node(self, node):
|
||||
node.account.ssh("rm -rf /mnt/console_consumer.properties /mnt/consumer.log", allow_fail=True)
|
||||
node.account.ssh("rm -rf /mnt/console_consumer.properties /mnt/consumer.log", allow_fail=False)
|
||||
|
||||
|
|
|
@ -82,17 +82,17 @@ class KafkaService(Service):
|
|||
leader = self.leader(topic, partition)
|
||||
self.signal_node(leader, sig)
|
||||
|
||||
def stop_node(self, node, clean_shutdown=True, allow_fail=True):
|
||||
def stop_node(self, node, clean_shutdown=True):
|
||||
pids = self.pids(node)
|
||||
sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
|
||||
|
||||
for pid in pids:
|
||||
node.account.signal(pid, sig, allow_fail=allow_fail)
|
||||
node.account.signal(pid, sig, allow_fail=False)
|
||||
|
||||
node.account.ssh("rm -f /mnt/kafka.pid", allow_fail=True)
|
||||
node.account.ssh("rm -f /mnt/kafka.pid", allow_fail=False)
|
||||
|
||||
def clean_node(self, node):
|
||||
node.account.ssh("rm -rf /mnt/kafka-logs /mnt/kafka.properties /mnt/kafka.log")
|
||||
node.account.ssh("rm -rf /mnt/kafka-logs /mnt/kafka.properties /mnt/kafka.log", allow_fail=False)
|
||||
|
||||
def create_topic(self, topic_cfg):
|
||||
node = self.nodes[0] # any node is fine here
|
||||
|
|
|
@ -96,7 +96,7 @@ class VerifiableProducer(BackgroundThreadService):
|
|||
self.worker_threads[self.idx(node) - 1].join()
|
||||
|
||||
def clean_node(self, node):
|
||||
node.account.ssh("rm -rf /mnt/producer.log")
|
||||
node.account.ssh("rm -rf /mnt/producer.log", allow_fail=False)
|
||||
|
||||
def try_parse_json(self, string):
|
||||
"""Try to parse a string as json. Return None if not parseable."""
|
||||
|
|
|
@ -49,18 +49,14 @@ class ZookeeperService(Service):
|
|||
|
||||
time.sleep(5) # give it some time to start
|
||||
|
||||
def stop_node(self, node, allow_fail=True):
|
||||
# This uses Kafka-REST's stop service script because it's better behaved
|
||||
# (knows how to wait) and sends SIGTERM instead of
|
||||
# zookeeper-stop-server.sh's SIGINT. We don't actually care about clean
|
||||
# shutdown here, so it's ok to use the bigger hammer
|
||||
def stop_node(self, node):
|
||||
idx = self.idx(node)
|
||||
self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname))
|
||||
node.account.ssh("/opt/kafka-rest/bin/kafka-rest-stop-service zookeeper", allow_fail=allow_fail)
|
||||
node.account.kill_process("zookeeper", allow_fail=False)
|
||||
|
||||
def clean_node(self, node, allow_fail=True):
|
||||
def clean_node(self, node):
|
||||
self.logger.info("Cleaning ZK node %d on %s", self.idx(node), node.account.hostname)
|
||||
node.account.ssh("rm -rf /mnt/zookeeper /mnt/zookeeper.properties /mnt/zk.log", allow_fail=allow_fail)
|
||||
node.account.ssh("rm -rf /mnt/zookeeper /mnt/zookeeper.properties /mnt/zk.log", allow_fail=False)
|
||||
|
||||
def connect_setting(self):
|
||||
return ','.join([node.account.hostname + ':2181' for node in self.nodes])
|
||||
|
|
Loading…
Reference in New Issue