mirror of https://github.com/apache/kafka.git
KAFKA-16293: Test log directory failure in Kraft (#15409)
Enables log directory failure system test for all Kraft modes in addition to ZK mode. Reviewers: Luke Chen <showuon@gmail.com>, Igor Soarez <soarez@apple.com>, Proven Provenzano <pprovenzano@confluent.io>
This commit is contained in:
parent
e798bed198
commit
bdd85405e3
|
@ -18,7 +18,7 @@ from ducktape.mark import matrix
|
|||
from ducktape.mark.resource import cluster
|
||||
from kafkatest.services.kafka import config_property
|
||||
from kafkatest.services.zookeeper import ZookeeperService
|
||||
from kafkatest.services.kafka import KafkaService
|
||||
from kafkatest.services.kafka import KafkaService, quorum
|
||||
from kafkatest.services.verifiable_producer import VerifiableProducer
|
||||
from kafkatest.services.console_consumer import ConsoleConsumer
|
||||
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
|
||||
|
@ -62,7 +62,7 @@ class LogDirFailureTest(ProduceConsumeValidateTest):
|
|||
|
||||
self.topic1 = "test_topic_1"
|
||||
self.topic2 = "test_topic_2"
|
||||
self.zk = ZookeeperService(test_context, num_nodes=1)
|
||||
self.zk = ZookeeperService(test_context, num_nodes=1) if quorum.for_test(test_context) == quorum.zk else None
|
||||
self.kafka = KafkaService(test_context,
|
||||
num_nodes=3,
|
||||
zk=self.zk,
|
||||
|
@ -84,20 +84,23 @@ class LogDirFailureTest(ProduceConsumeValidateTest):
|
|||
self.num_consumers = 1
|
||||
|
||||
def setUp(self):
|
||||
self.zk.start()
|
||||
if self.zk:
|
||||
self.zk.start()
|
||||
|
||||
def min_cluster_size(self):
|
||||
"""Override this since we're adding services outside of the constructor"""
|
||||
return super(LogDirFailureTest, self).min_cluster_size() + self.num_producers * 2 + self.num_consumers * 2
|
||||
|
||||
@cluster(num_nodes=9)
|
||||
@matrix(bounce_broker=[False, True], broker_type=["leader", "follower"], security_protocol=["PLAINTEXT"])
|
||||
def test_replication_with_disk_failure(self, bounce_broker, security_protocol, broker_type):
|
||||
@cluster(num_nodes=8)
|
||||
@matrix(bounce_broker=[False, True], broker_type=["leader", "follower"], security_protocol=["PLAINTEXT"], metadata_quorum=[quorum.zk])
|
||||
@cluster(num_nodes=10)
|
||||
@matrix(bounce_broker=[False, True], broker_type=["leader", "follower"], security_protocol=["PLAINTEXT"], metadata_quorum=[quorum.isolated_kraft])
|
||||
def test_replication_with_disk_failure(self, bounce_broker, security_protocol, broker_type, metadata_quorum):
|
||||
"""Replication tests.
|
||||
These tests verify that replication provides simple durability guarantees by checking that data acked by
|
||||
brokers is still available for consumption in the face of various failure scenarios.
|
||||
|
||||
Setup: 1 zk, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2
|
||||
Setup: 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2
|
||||
and another topic with partitions=3, replication-factor=3, and min.insync.replicas=1
|
||||
- Produce messages in the background
|
||||
- Consume messages in the background
|
||||
|
@ -108,6 +111,10 @@ class LogDirFailureTest(ProduceConsumeValidateTest):
|
|||
|
||||
self.kafka.security_protocol = security_protocol
|
||||
self.kafka.interbroker_security_protocol = security_protocol
|
||||
if self.kafka.quorum_info.using_kraft:
|
||||
controller_quorum = self.kafka.controller_quorum
|
||||
controller_quorum.controller_security_protocol = security_protocol
|
||||
controller_quorum.intercontroller_security_protocol = security_protocol
|
||||
self.kafka.start()
|
||||
|
||||
try:
|
||||
|
|
Loading…
Reference in New Issue