From 929a90da50559f311af87c79eacf4bd02a31f624 Mon Sep 17 00:00:00 2001 From: kevin-wu24 <66326898+kevin-wu24@users.noreply.github.com> Date: Tue, 18 Feb 2025 11:58:58 -0600 Subject: [PATCH] KAFKA-18667 Add replication system test case for combined broker + controller failure (#18757) This patch adds a test case to replication_test.py test_replication_with_broker_failure which validates the scenario when we have failures of a combined mode broker/controller. Reviewers: David Arthur --- tests/kafkatest/tests/core/replication_test.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/tests/kafkatest/tests/core/replication_test.py b/tests/kafkatest/tests/core/replication_test.py index c1e2e6df3ee..c2407a3866f 100644 --- a/tests/kafkatest/tests/core/replication_test.py +++ b/tests/kafkatest/tests/core/replication_test.py @@ -128,6 +128,11 @@ class ReplicationTest(EndToEndTest): broker_type=["leader"], security_protocol=["PLAINTEXT", "SASL_SSL"], metadata_quorum=quorum.all_non_upgrade) + @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], + broker_type=["leader"], + security_protocol=["PLAINTEXT", "SASL_SSL"], + metadata_quorum=[quorum.combined_kraft], + num_controllers=[3]) @matrix(failure_mode=["hard_bounce"], broker_type=["leader"], security_protocol=["SASL_SSL"], client_sasl_mechanism=["PLAIN"], interbroker_sasl_mechanism=["PLAIN", "GSSAPI"], @@ -138,7 +143,7 @@ class ReplicationTest(EndToEndTest): def test_replication_with_broker_failure(self, failure_mode, security_protocol, broker_type, client_sasl_mechanism="GSSAPI", interbroker_sasl_mechanism="GSSAPI", compression_type=None, enable_idempotence=False, tls_version=None, - metadata_quorum=quorum.zk): + metadata_quorum=quorum.zk, num_controllers=1): """Replication tests. These tests verify that replication provides simple durability guarantees by checking that data acked by brokers is still available for consumption in the face of various failure scenarios. @@ -161,7 +166,7 @@ class ReplicationTest(EndToEndTest): client_sasl_mechanism=client_sasl_mechanism, interbroker_sasl_mechanism=interbroker_sasl_mechanism, tls_version=tls_version, - controller_num_nodes_override = 1) + controller_num_nodes_override = num_controllers) self.kafka.start() compression_types = None if not compression_type else [compression_type]