diff --git a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py index 86c19f9017c..67d587792d2 100644 --- a/tests/kafkatest/tests/streams/streams_broker_bounce_test.py +++ b/tests/kafkatest/tests/streams/streams_broker_bounce_test.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from ducktape.utils.util import wait_until from ducktape.tests.test import Test from ducktape.mark.resource import cluster from ducktape.mark import matrix @@ -51,10 +52,35 @@ def hard_shutdown(test, topic, broker_type): node = broker_node(test, topic, broker_type) signal_node(test, node, signal.SIGKILL) +def clean_bounce(test, topic, broker_type): + """Chase the leader of one partition and restart it cleanly a few times (5 times).""" + for i in range(5): + prev_broker_node = broker_node(test, topic, broker_type) + test.kafka.restart_node(prev_broker_node, clean_shutdown=True) + + +def hard_bounce(test, topic, broker_type): + """Chase the leader and restart it with a hard kill. Do this a few times (5).""" + for i in range(5): + prev_broker_node = broker_node(test, topic, broker_type) + test.kafka.signal_node(prev_broker_node, sig=signal.SIGKILL) + + # Since this is a hard kill, we need to make sure the process is down and that + # zookeeper has registered the loss by expiring the broker's session timeout. + + wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0 and not test.kafka.is_registered(prev_broker_node), + timeout_sec=test.kafka.zk_session_timeout + 5, + err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(prev_broker_node.account)) + + test.kafka.start_node(prev_broker_node) + + failures = { "clean_shutdown": clean_shutdown, - "hard_shutdown": hard_shutdown + "hard_shutdown": hard_shutdown, + "clean_bounce": clean_bounce, + "hard_bounce": hard_bounce } class StreamsBrokerBounceTest(Test): @@ -156,7 +182,7 @@ class StreamsBrokerBounceTest(Test): return data @cluster(num_nodes=7) - @matrix(failure_mode=["clean_shutdown", "hard_shutdown"], + @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], broker_type=["leader", "controller"], sleep_time_secs=[120]) def test_broker_type_bounce(self, failure_mode, broker_type, sleep_time_secs): @@ -193,9 +219,9 @@ class StreamsBrokerBounceTest(Test): self.fail_broker_type(failure_mode, broker_type); return self.collect_results(sleep_time_secs) - + @cluster(num_nodes=7) - @matrix(failure_mode=["clean_shutdown", "hard_shutdown"], + @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], num_failures=[2]) def test_many_brokers_bounce(self, failure_mode, num_failures): """ @@ -211,3 +237,21 @@ class StreamsBrokerBounceTest(Test): self.fail_many_brokers(failure_mode, num_failures); return self.collect_results(120) + + @cluster(num_nodes=7) + @matrix(failure_mode=["clean_bounce", "hard_bounce"], + num_failures=[3]) + def test_all_brokers_bounce(self, failure_mode, num_failures): + """ + Start a smoke test client, then kill a few brokers and ensure data is still received + Record if records are delivered + """ + self.setup_system() + + # Sleep to allow test to run for a bit + time.sleep(120) + + # Fail brokers + self.fail_many_brokers(failure_mode, num_failures); + + return self.collect_results(120)