KAFKA-3694; Ensure broker Zk deregistration prior to restart in ReplicationTest

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Geoff Anderson <geoff@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #1365 from hachikuji/KAFKA-3694
Jason Gustafson 2016-05-11 23:48:46 +01:00 committed by Ismael Juma
parent b28bc57a1f
commit f892f0ca6d
4 changed files with 21 additions and 9 deletions


@@ -66,7 +66,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT,
                  client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI,
                  authorizer_class_name=None, topics=None, version=TRUNK, quota_config=None, jmx_object_names=None,
-                 jmx_attributes=[], zk_connect_timeout=5000):
+                 jmx_attributes=[], zk_connect_timeout=5000, zk_session_timeout=6000):
         """
         :type context
         :type zk: ZookeeperService
@@ -99,6 +99,11 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         # for this constructor.
         self.zk_connect_timeout = zk_connect_timeout
 
+        # Also allow the session timeout to be provided explicitly,
+        # primarily so that test cases can depend on it when waiting
+        # e.g. brokers to deregister after a hard kill.
+        self.zk_session_timeout = zk_session_timeout
+
         self.port_mappings = {
             'PLAINTEXT': Port('PLAINTEXT', 9092, False),
             'SSL': Port('SSL', 9093, False),
@@ -513,6 +518,15 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         self.logger.info("Controller's ID: %d" % (controller_idx))
         return self.get_node(controller_idx)
 
+    def is_registered(self, node):
+        """
+        Check whether a broker is registered in Zookeeper
+        """
+        self.logger.debug("Querying zookeeper to see if broker %s is registered", node)
+        broker_info = self.zk.query("/brokers/ids/%s" % self.idx(node))
+        self.logger.debug("Broker info: %s", broker_info)
+        return broker_info is not None
+
     def get_offset_shell(self, topic, partitions, max_wait_ms, offsets, time):
         node = self.nodes[0]

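For context, a minimal sketch of how a test might use the two additions above. The fixtures (test_context, zk) and the import paths are assumptions for illustration, not part of this change:

    from ducktape.utils.util import wait_until
    from kafkatest.services.kafka import KafkaService

    # Hypothetical sketch: construct the service with an explicit ZK session
    # timeout, then use is_registered() as a wait_until condition once the
    # brokers are up.
    kafka = KafkaService(test_context, num_nodes=3, zk=zk,
                         zk_connect_timeout=5000, zk_session_timeout=6000)
    kafka.start()

    node = kafka.nodes[0]
    wait_until(lambda: kafka.is_registered(node), timeout_sec=30,
               err_msg="Broker %s did not register with ZooKeeper" % str(node.account))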

@@ -72,6 +72,7 @@ zookeeper.set.acl={{zk_set_acl}}
 {% endif %}
 
 zookeeper.connection.timeout.ms={{ zk_connect_timeout }}
+zookeeper.session.timeout.ms={{ zk_session_timeout }}
 
 {% if replica_lag is defined %}
 replica.lag.time.max.ms={{replica_lag}}

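The properties file is a Jinja2 template, so the new line picks up whatever session timeout the service was constructed with. A small illustrative sketch of the substitution (using jinja2 directly here only for demonstration; kafkatest renders its templates through ducktape's template machinery):

    from jinja2 import Template

    # Illustrative only: render the two timeout properties with the
    # constructor defaults (zk_connect_timeout=5000, zk_session_timeout=6000).
    snippet = Template(
        "zookeeper.connection.timeout.ms={{ zk_connect_timeout }}\n"
        "zookeeper.session.timeout.ms={{ zk_session_timeout }}")
    print(snippet.render(zk_connect_timeout=5000, zk_session_timeout=6000))
    # zookeeper.connection.timeout.ms=5000
    # zookeeper.session.timeout.ms=6000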

@@ -65,15 +65,12 @@ def hard_bounce(test, broker_type):
         test.kafka.signal_node(prev_broker_node, sig=signal.SIGKILL)
 
         # Since this is a hard kill, we need to make sure the process is down and that
-        # zookeeper and the broker cluster have registered the loss of the leader/controller.
-        # Waiting for a new leader for the topic-partition/controller to be elected is a reasonable heuristic for this.
-        def role_reassigned():
-            current_elected_broker = broker_node(test, broker_type)
-            return current_elected_broker is not None and current_elected_broker != prev_broker_node
-        wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0, timeout_sec=5)
-        wait_until(role_reassigned, timeout_sec=10, backoff_sec=.5)
+        # zookeeper has registered the loss by expiring the broker's session timeout.
+        wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0 and not test.kafka.is_registered(prev_broker_node),
+                   timeout_sec=test.kafka.zk_session_timeout + 5,
+                   err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(prev_broker_node.account))
 
         test.kafka.start_node(prev_broker_node)
 
 failures = {

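The design choice here is to stop inferring broker loss from leader/controller re-election and instead check the broker's ephemeral registration directly, bounding the wait by the configured ZooKeeper session timeout. A condensed, hypothetical helper capturing the same pattern (with the millisecond session timeout converted to seconds for the bound):

    import signal

    from ducktape.utils.util import wait_until

    def hard_bounce_node(kafka, node):
        # Hypothetical helper, not part of this change: SIGKILL a broker, wait
        # until both its process and its ZooKeeper registration are gone (the
        # ephemeral znode only disappears once the session expires), then
        # restart it.
        kafka.signal_node(node, sig=signal.SIGKILL)
        wait_until(lambda: len(kafka.pids(node)) == 0 and not kafka.is_registered(node),
                   timeout_sec=kafka.zk_session_timeout / 1000.0 + 5,
                   err_msg="Hard-killed broker %s did not deregister in time" % str(node.account))
        kafka.start_node(node)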

@@ -35,7 +35,7 @@ class ProduceConsumeValidateTest(Test):
     def start_producer_and_consumer(self):
         # Start background producer and consumer
         self.producer.start()
-        wait_until(lambda: self.producer.num_acked > 5, timeout_sec=10,
+        wait_until(lambda: self.producer.num_acked > 5, timeout_sec=20,
                    err_msg="Producer failed to start in a reasonable amount of time.")
         self.consumer.start()
         wait_until(lambda: len(self.consumer.messages_consumed[1]) > 0, timeout_sec=60,