KAFKA-16293: Test log directory failure in Kraft (#15409)

Enables the log directory failure system test for all KRaft modes in addition to ZK mode.

Reviewers: Luke Chen <showuon@gmail.com>, Igor Soarez <soarez@apple.com>, Proven Provenzano <pprovenzano@confluent.io>
Gaurav Narula 2024-04-06 09:01:25 +01:00 committed by Luke Chen
parent 2a4ee61003
commit 7bf785a646
1 changed file with 14 additions and 7 deletions
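
The diff below follows the pattern kafkatest uses to run one system test under both ZooKeeper and KRaft: the matrix decorator gains a metadata_quorum parameter, ZooKeeper-only setup is guarded by quorum.for_test(test_context), and KRaft-only wiring is guarded by quorum_info.using_kraft. A condensed, illustrative sketch of that pattern follows; the class, method, and topic names are examples rather than code from the commit.

from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest

class ExampleQuorumAwareTest(ProduceConsumeValidateTest):
    def __init__(self, test_context):
        super(ExampleQuorumAwareTest, self).__init__(test_context=test_context)
        # Create a ZooKeeper service only when this parametrization runs in ZK mode.
        self.zk = ZookeeperService(test_context, num_nodes=1) \
            if quorum.for_test(test_context) == quorum.zk else None
        self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk,
                                  topics={"example_topic": {"partitions": 3, "replication-factor": 3}})

    def setUp(self):
        # Nothing to start separately for KRaft; the controllers come up with KafkaService.
        if self.zk:
            self.zk.start()

    # Stacked @cluster/@matrix pairs let each quorum mode reserve its own node count.
    @cluster(num_nodes=8)
    @matrix(metadata_quorum=[quorum.zk])
    @cluster(num_nodes=10)
    @matrix(metadata_quorum=[quorum.isolated_kraft])
    def test_example(self, metadata_quorum):
        self.kafka.start()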


@@ -18,7 +18,7 @@ from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 from kafkatest.services.kafka import config_property
 from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
@@ -62,7 +62,7 @@ class LogDirFailureTest(ProduceConsumeValidateTest):
         self.topic1 = "test_topic_1"
         self.topic2 = "test_topic_2"
-        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
         self.kafka = KafkaService(test_context,
                                   num_nodes=3,
                                   zk=self.zk,
@@ -84,20 +84,23 @@ class LogDirFailureTest(ProduceConsumeValidateTest):
         self.num_consumers = 1
 
     def setUp(self):
-        self.zk.start()
+        if self.zk:
+            self.zk.start()
 
     def min_cluster_size(self):
         """Override this since we're adding services outside of the constructor"""
         return super(LogDirFailureTest, self).min_cluster_size() + self.num_producers * 2 + self.num_consumers * 2
 
-    @cluster(num_nodes=9)
-    @matrix(bounce_broker=[False, True], broker_type=["leader", "follower"], security_protocol=["PLAINTEXT"])
-    def test_replication_with_disk_failure(self, bounce_broker, security_protocol, broker_type):
+    @cluster(num_nodes=8)
+    @matrix(bounce_broker=[False, True], broker_type=["leader", "follower"], security_protocol=["PLAINTEXT"], metadata_quorum=[quorum.zk])
+    @cluster(num_nodes=10)
+    @matrix(bounce_broker=[False, True], broker_type=["leader", "follower"], security_protocol=["PLAINTEXT"], metadata_quorum=[quorum.isolated_kraft])
+    def test_replication_with_disk_failure(self, bounce_broker, security_protocol, broker_type, metadata_quorum):
         """Replication tests.
         These tests verify that replication provides simple durability guarantees by checking that data acked by
         brokers is still available for consumption in the face of various failure scenarios.
 
-        Setup: 1 zk, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2
+        Setup: 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2
         and another topic with partitions=3, replication-factor=3, and min.insync.replicas=1
             - Produce messages in the background
            - Consume messages in the background
@@ -108,6 +111,10 @@ class LogDirFailureTest(ProduceConsumeValidateTest):
         self.kafka.security_protocol = security_protocol
         self.kafka.interbroker_security_protocol = security_protocol
+        if self.kafka.quorum_info.using_kraft:
+            controller_quorum = self.kafka.controller_quorum
+            controller_quorum.controller_security_protocol = security_protocol
+            controller_quorum.intercontroller_security_protocol = security_protocol
         self.kafka.start()
         try:
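
A note on the new @cluster sizes, which the commit message does not spell out: the min_cluster_size() override adds two nodes per producer and two per consumer on top of whatever the base class requires, so with one of each the client side takes four nodes. Assuming the default three-node controller quorum for isolated KRaft (an assumption, not stated in the diff), the arithmetic lines up with the annotated values:

# Rough accounting for the @cluster annotations; the controller count is an assumption.
producers, consumers = 1, 1                   # num_producers / num_consumers in the test
client_nodes = producers * 2 + consumers * 2  # mirrors the min_cluster_size() override
zk_nodes = 3 + 1 + client_nodes               # 3 brokers + 1 ZooKeeper node        -> 8
kraft_nodes = 3 + 3 + client_nodes            # 3 brokers + 3 isolated controllers  -> 10
print(zk_nodes, kraft_nodes)                  # 8 10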