KAFKA-2825: Add controller failover to existing replication tests

Author: Anna Povzner <anna@confluent.io>

Reviewers: Geoff Anderson

Closes #618 from apovzner/kafka_2825_01
Committed by Guozhang Wang on 2015-12-03 10:43:44 -08:00
commit 5b5f6bbe68
parent b09663eeec
3 changed files with 81 additions and 39 deletions


@@ -321,21 +321,9 @@ class KafkaService(JmxMixin, Service):
     def leader(self, topic, partition=0):
         """ Get the leader replica for the given topic and partition.
         """
-        kafka_dir = KAFKA_TRUNK
-        cmd = "/opt/%s/bin/kafka-run-class.sh kafka.tools.ZooKeeperMainWrapper -server %s " %\
-            (kafka_dir, self.zk.connect_setting())
-        cmd += "get /brokers/topics/%s/partitions/%d/state" % (topic, partition)
-        self.logger.debug(cmd)
-
-        node = self.zk.nodes[0]
-        self.logger.debug("Querying zookeeper to find leader replica for topic %s: \n%s" % (cmd, topic))
-        partition_state = None
-        for line in node.account.ssh_capture(cmd):
-            # loop through all lines in the output, but only hold on to the first match
-            if partition_state is None:
-                match = re.match("^({.+})$", line)
-                if match is not None:
-                    partition_state = match.groups()[0]
+        self.logger.debug("Querying zookeeper to find leader replica for topic: \n%s" % (topic))
+        zk_path = "/brokers/topics/%s/partitions/%d/state" % (topic, partition)
+        partition_state = self.zk.query(zk_path)

         if partition_state is None:
             raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition))
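For orientation, and not something the patch itself spells out: the partition-state znode read through zk.query() holds a one-line JSON object, typically along the lines of {"controller_epoch":1,"leader":1,"version":1,"leader_epoch":0,"isr":[1,2]}; the remainder of leader(), outside this hunk, extracts the "leader" broker id from that blob.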
@@ -358,4 +346,20 @@ class KafkaService(JmxMixin, Service):
         if not port_mapping.open:
             raise ValueError("We are retrieving bootstrap servers for the port: %s which is not currently open. - " % str(port_mapping))

-        return ','.join([node.account.hostname + ":" + str(port_mapping.number) for node in self.nodes])
+        return ','.join([node.account.hostname + ":" + str(port_mapping.number) for node in self.nodes])
+
+    def controller(self):
+        """ Get the controller node
+        """
+        self.logger.debug("Querying zookeeper to find controller broker")
+        controller_info = self.zk.query("/controller")
+        if controller_info is None:
+            raise Exception("Error finding controller info")
+
+        controller_info = json.loads(controller_info)
+        self.logger.debug(controller_info)
+
+        controller_idx = int(controller_info["brokerid"])
+        self.logger.info("Controller's ID: %d" % (controller_idx))
+        return self.get_node(controller_idx)
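A gloss on the data being parsed here, again not part of the diff: the /controller znode is written by whichever broker wins the controller election and typically looks like {"version":1,"brokerid":1,"timestamp":"1449179491111"}, so reading "brokerid" and handing it to get_node() relies on broker ids lining up with the service's node indices, which is how this KafkaService assigns them.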


@@ -16,10 +16,11 @@

 from ducktape.services.service import Service
-from kafkatest.services.kafka.directory import kafka_dir
+from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK

 import subprocess
 import time
+import re


 class ZookeeperService(Service):
@@ -83,3 +84,22 @@ class ZookeeperService(Service):

     def connect_setting(self):
         return ','.join([node.account.hostname + ':2181' for node in self.nodes])
+
+    def query(self, path):
+        """
+        Queries zookeeper for data associated with 'path' and returns all fields in the schema
+        """
+        kafka_dir = KAFKA_TRUNK
+        cmd = "/opt/%s/bin/kafka-run-class.sh kafka.tools.ZooKeeperMainWrapper -server %s get %s" % \
+            (kafka_dir, self.connect_setting(), path)
+        self.logger.debug(cmd)
+
+        node = self.nodes[0]
+        result = None
+        for line in node.account.ssh_capture(cmd):
+            # loop through all lines in the output, but only hold on to the first match
+            if result is None:
+                match = re.match("^({.+})$", line)
+                if match is not None:
+                    result = match.groups()[0]
+        return result
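A minimal usage sketch of the new helper, assuming a started ZookeeperService instance named zk; the ZooKeeperMainWrapper get command typically prints znode stat fields alongside the data, which is why the ^({.+})$ regex keeps only the first line that is a bare JSON object:

    # illustrative only, not part of the patch
    state = zk.query("/brokers/topics/test_topic/partitions/0/state")
    # 'state' is either None (nothing in the output matched) or a JSON string such as
    # '{"controller_epoch":1,"leader":1,"version":1,"leader_epoch":0,"isr":[1]}'
    if state is not None:
        leader_id = int(json.loads(state)["leader"])  # assumes 'import json' at the call site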


@@ -25,41 +25,55 @@ from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest

 import signal

+def broker_node(test, broker_type):
+    """ Discover node of requested type. For leader type, discovers leader for our topic and partition 0
+    """
+    if broker_type == "leader":
+        node = test.kafka.leader(test.topic, partition=0)
+    elif broker_type == "controller":
+        node = test.kafka.controller()
+    else:
+        raise Exception("Unexpected broker type %s." % (broker_type))
+
+    return node
+
-def clean_shutdown(test):
-    """Discover leader node for our topic and shut it down cleanly."""
-    test.kafka.signal_leader(test.topic, partition=0, sig=signal.SIGTERM)
+def clean_shutdown(test, broker_type):
+    """Discover broker node of requested type and shut it down cleanly.
+    """
+    node = broker_node(test, broker_type)
+    test.kafka.signal_node(node, sig=signal.SIGTERM)

-def hard_shutdown(test):
-    """Discover leader node for our topic and shut it down with a hard kill."""
-    test.kafka.signal_leader(test.topic, partition=0, sig=signal.SIGKILL)
+def hard_shutdown(test, broker_type):
+    """Discover broker node of requested type and shut it down with a hard kill."""
+    node = broker_node(test, broker_type)
+    test.kafka.signal_node(node, sig=signal.SIGKILL)

-def clean_bounce(test):
+def clean_bounce(test, broker_type):
     """Chase the leader of one partition and restart it cleanly."""
     for i in range(5):
-        prev_leader_node = test.kafka.leader(topic=test.topic, partition=0)
-        test.kafka.restart_node(prev_leader_node, clean_shutdown=True)
+        prev_broker_node = broker_node(test, broker_type)
+        test.kafka.restart_node(prev_broker_node, clean_shutdown=True)

-def hard_bounce(test):
+def hard_bounce(test, broker_type):
     """Chase the leader and restart it with a hard kill."""
     for i in range(5):
-        prev_leader_node = test.kafka.leader(topic=test.topic, partition=0)
-        test.kafka.signal_node(prev_leader_node, sig=signal.SIGKILL)
+        prev_broker_node = broker_node(test, broker_type)
+        test.kafka.signal_node(prev_broker_node, sig=signal.SIGKILL)

         # Since this is a hard kill, we need to make sure the process is down and that
-        # zookeeper and the broker cluster have registered the loss of the leader.
-        # Waiting for a new leader to be elected on the topic-partition is a reasonable heuristic for this.
-        def leader_changed():
-            current_leader = test.kafka.leader(topic=test.topic, partition=0)
-            return current_leader is not None and current_leader != prev_leader_node
+        # zookeeper and the broker cluster have registered the loss of the leader/controller.
+        # Waiting for a new leader for the topic-partition/controller to be elected is a reasonable heuristic for this.
+        def role_reassigned():
+            current_elected_broker = broker_node(test, broker_type)
+            return current_elected_broker is not None and current_elected_broker != prev_broker_node

-        wait_until(lambda: len(test.kafka.pids(prev_leader_node)) == 0, timeout_sec=5)
-        wait_until(leader_changed, timeout_sec=10, backoff_sec=.5)
-        test.kafka.start_node(prev_leader_node)
+        wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0, timeout_sec=5)
+        wait_until(role_reassigned, timeout_sec=10, backoff_sec=.5)
+        test.kafka.start_node(prev_broker_node)

 failures = {
     "clean_shutdown": clean_shutdown,
@@ -108,8 +122,12 @@ class ReplicationTest(ProduceConsumeValidateTest):

     @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
+            broker_type=["leader"],
             security_protocol=["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"])
-    def test_replication_with_broker_failure(self, failure_mode, security_protocol):
+    @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
+            broker_type=["controller"],
+            security_protocol=["PLAINTEXT", "SASL_SSL"])
+    def test_replication_with_broker_failure(self, failure_mode, security_protocol, broker_type):
         """Replication tests.
         These tests verify that replication provides simple durability guarantees by checking that data acked by
         brokers is still available for consumption in the face of various failure scenarios.
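To make the resulting matrix concrete (my reading of ducktape's @matrix, not something the patch states): each decorator parametrizes the test over the cartesian product of its keyword lists, so the leader decorator yields 4 failure modes x 4 security protocols = 16 cases and the controller decorator adds 4 x 2 = 8 more, all feeding the same test body. Two of the generated parametrizations, for illustration:

    # test_replication_with_broker_failure(failure_mode="hard_bounce", broker_type="leader", security_protocol="SASL_SSL")
    # test_replication_with_broker_failure(failure_mode="clean_bounce", broker_type="controller", security_protocol="PLAINTEXT")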
@@ -130,4 +148,4 @@ class ReplicationTest(ProduceConsumeValidateTest):
         self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int)

         self.kafka.start()
-        self.run_produce_consume_validate(core_test_action=lambda: failures[failure_mode](self))
+        self.run_produce_consume_validate(core_test_action=lambda: failures[failure_mode](self, broker_type))