KAFKA-5725; More failure testing

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Damian Guy <damian.guy@gmail.com>

Closes #3656 from enothereska/minor-add-more-tests
This commit is contained in:
Eno Thereska 2017-08-18 18:13:02 +01:00 committed by Damian Guy
parent 75c78e9692
commit 3e22c1c04a
1 changed files with 48 additions and 4 deletions

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.utils.util import wait_until
from ducktape.tests.test import Test
from ducktape.mark.resource import cluster
from ducktape.mark import matrix
@ -51,10 +52,35 @@ def hard_shutdown(test, topic, broker_type):
node = broker_node(test, topic, broker_type)
signal_node(test, node, signal.SIGKILL)
def clean_bounce(test, topic, broker_type):
"""Chase the leader of one partition and restart it cleanly a few times (5 times)."""
for i in range(5):
prev_broker_node = broker_node(test, topic, broker_type)
test.kafka.restart_node(prev_broker_node, clean_shutdown=True)
def hard_bounce(test, topic, broker_type):
"""Chase the leader and restart it with a hard kill. Do this a few times (5)."""
for i in range(5):
prev_broker_node = broker_node(test, topic, broker_type)
test.kafka.signal_node(prev_broker_node, sig=signal.SIGKILL)
# Since this is a hard kill, we need to make sure the process is down and that
# zookeeper has registered the loss by expiring the broker's session timeout.
wait_until(lambda: len(test.kafka.pids(prev_broker_node)) == 0 and not test.kafka.is_registered(prev_broker_node),
timeout_sec=test.kafka.zk_session_timeout + 5,
err_msg="Failed to see timely deregistration of hard-killed broker %s" % str(prev_broker_node.account))
test.kafka.start_node(prev_broker_node)
failures = {
"clean_shutdown": clean_shutdown,
"hard_shutdown": hard_shutdown
"hard_shutdown": hard_shutdown,
"clean_bounce": clean_bounce,
"hard_bounce": hard_bounce
}
class StreamsBrokerBounceTest(Test):
@ -156,7 +182,7 @@ class StreamsBrokerBounceTest(Test):
return data
@cluster(num_nodes=7)
@matrix(failure_mode=["clean_shutdown", "hard_shutdown"],
@matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
broker_type=["leader", "controller"],
sleep_time_secs=[120])
def test_broker_type_bounce(self, failure_mode, broker_type, sleep_time_secs):
@ -193,9 +219,9 @@ class StreamsBrokerBounceTest(Test):
self.fail_broker_type(failure_mode, broker_type);
return self.collect_results(sleep_time_secs)
@cluster(num_nodes=7)
@matrix(failure_mode=["clean_shutdown", "hard_shutdown"],
@matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
num_failures=[2])
def test_many_brokers_bounce(self, failure_mode, num_failures):
"""
@ -211,3 +237,21 @@ class StreamsBrokerBounceTest(Test):
self.fail_many_brokers(failure_mode, num_failures);
return self.collect_results(120)
@cluster(num_nodes=7)
@matrix(failure_mode=["clean_bounce", "hard_bounce"],
num_failures=[3])
def test_all_brokers_bounce(self, failure_mode, num_failures):
"""
Start a smoke test client, then kill a few brokers and ensure data is still received
Record if records are delivered
"""
self.setup_system()
# Sleep to allow test to run for a bit
time.sleep(120)
# Fail brokers
self.fail_many_brokers(failure_mode, num_failures);
return self.collect_results(120)