From bdd85405e3d72b5089d99026ca1e5aa5b3bc3e51 Mon Sep 17 00:00:00 2001
From: Gaurav Narula
Date: Sat, 6 Apr 2024 09:01:25 +0100
Subject: [PATCH] KAFKA-16293: Test log directory failure in Kraft (#15409)

Enables log directory failure system test for all Kraft modes in addition
to ZK mode.

Reviewers: Luke Chen, Igor Soarez, Proven Provenzano
---
 .../tests/core/log_dir_failure_test.py | 21 ++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)

diff --git a/tests/kafkatest/tests/core/log_dir_failure_test.py b/tests/kafkatest/tests/core/log_dir_failure_test.py
index 31a20fde1cc..483ed674c35 100644
--- a/tests/kafkatest/tests/core/log_dir_failure_test.py
+++ b/tests/kafkatest/tests/core/log_dir_failure_test.py
@@ -18,7 +18,7 @@ from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 from kafkatest.services.kafka import config_property
 from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
@@ -62,7 +62,7 @@ class LogDirFailureTest(ProduceConsumeValidateTest):
         self.topic1 = "test_topic_1"
         self.topic2 = "test_topic_2"
 
-        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
         self.kafka = KafkaService(test_context,
                                   num_nodes=3,
                                   zk=self.zk,
@@ -84,20 +84,23 @@ class LogDirFailureTest(ProduceConsumeValidateTest):
         self.num_consumers = 1
 
     def setUp(self):
-        self.zk.start()
+        if self.zk:
+            self.zk.start()
 
     def min_cluster_size(self):
         """Override this since we're adding services outside of the constructor"""
         return super(LogDirFailureTest, self).min_cluster_size() + self.num_producers * 2 + self.num_consumers * 2
 
-    @cluster(num_nodes=9)
-    @matrix(bounce_broker=[False, True], broker_type=["leader", "follower"], security_protocol=["PLAINTEXT"])
-    def test_replication_with_disk_failure(self, bounce_broker, security_protocol, broker_type):
+    @cluster(num_nodes=8)
+    @matrix(bounce_broker=[False, True], broker_type=["leader", "follower"], security_protocol=["PLAINTEXT"], metadata_quorum=[quorum.zk])
+    @cluster(num_nodes=10)
+    @matrix(bounce_broker=[False, True], broker_type=["leader", "follower"], security_protocol=["PLAINTEXT"], metadata_quorum=[quorum.isolated_kraft])
+    def test_replication_with_disk_failure(self, bounce_broker, security_protocol, broker_type, metadata_quorum):
         """Replication tests.
         These tests verify that replication provides simple durability guarantees by checking that data acked by
         brokers is still available for consumption in the face of various failure scenarios.
 
-        Setup: 1 zk, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2
+        Setup: 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2
                and another topic with partitions=3, replication-factor=3, and min.insync.replicas=1
         - Produce messages in the background
         - Consume messages in the background
@@ -108,6 +111,10 @@ class LogDirFailureTest(ProduceConsumeValidateTest):
 
         self.kafka.security_protocol = security_protocol
         self.kafka.interbroker_security_protocol = security_protocol
+        if self.kafka.quorum_info.using_kraft:
+            controller_quorum = self.kafka.controller_quorum
+            controller_quorum.controller_security_protocol = security_protocol
+            controller_quorum.intercontroller_security_protocol = security_protocol
         self.kafka.start()
 
         try:
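
Editor's note: the pattern this patch applies generalizes to other kafkatest system tests that should run under both ZK and KRaft quorums. The condensed sketch below is illustrative only and is not part of the patch: the class name QuorumAwareLogDirTest, the method name test_disk_failure, the minimal KafkaService arguments, and the base-class constructor call are assumptions, and the real topic configuration and failure-injection logic of log_dir_failure_test.py are elided. It shows the three pieces the patch touches: conditional ZooKeeper provisioning, a guarded setUp, and per-quorum @cluster/@matrix annotations.

# Illustrative sketch only -- mirrors the identifiers that appear in the diff
# (quorum.for_test, quorum.zk, quorum.isolated_kraft, ZookeeperService,
# KafkaService, quorum_info, controller_quorum); everything else is a placeholder.
from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest


class QuorumAwareLogDirTest(ProduceConsumeValidateTest):  # hypothetical name
    def __init__(self, test_context):
        super(QuorumAwareLogDirTest, self).__init__(test_context=test_context)
        # ZooKeeper is only provisioned when the test is parameterized with the ZK
        # quorum; under KRaft self.zk stays None and KafkaService runs controllers.
        self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
        self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk)

    def setUp(self):
        # Guard the start call: there is no ZooKeeper service to start under KRaft.
        if self.zk:
            self.zk.start()

    # One @cluster/@matrix pair per quorum mode lets each mode declare its own
    # node budget, as the patch does for ZK (8 nodes) and isolated KRaft (10 nodes).
    @cluster(num_nodes=8)
    @matrix(metadata_quorum=[quorum.zk])
    @cluster(num_nodes=10)
    @matrix(metadata_quorum=[quorum.isolated_kraft])
    def test_disk_failure(self, metadata_quorum):  # hypothetical test body
        # Under KRaft, propagate the security protocol to the controller quorum as
        # well, mirroring the hunk the patch adds at the top of
        # test_replication_with_disk_failure().
        if self.kafka.quorum_info.using_kraft:
            controller_quorum = self.kafka.controller_quorum
            controller_quorum.controller_security_protocol = "PLAINTEXT"
            controller_quorum.intercontroller_security_protocol = "PLAINTEXT"
        self.kafka.start()

The differing node budgets in the patch (8 for ZK, 10 for isolated KRaft) presumably reflect that ZK mode adds a single ZooKeeper node to the three brokers and four client nodes, while isolated KRaft mode provisions dedicated controller nodes instead.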