mirror of https://github.com/apache/kafka.git
KAFKA-18277 Convert network_degrade_test to KRaft mode (#18247)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
cf7029c026
commit
5acbd42dd7
|
@ -22,7 +22,7 @@ from ducktape.utils.util import wait_until
|
|||
|
||||
from kafkatest.services.trogdor.degraded_network_fault_spec import DegradedNetworkFaultSpec
|
||||
from kafkatest.services.trogdor.trogdor import TrogdorService
|
||||
from kafkatest.services.zookeeper import ZookeeperService
|
||||
from kafkatest.services.kafka import KafkaService, quorum
|
||||
|
||||
|
||||
class NetworkDegradeTest(Test):
|
||||
|
@ -34,36 +34,37 @@ class NetworkDegradeTest(Test):
|
|||
|
||||
def __init__(self, test_context):
|
||||
super(NetworkDegradeTest, self).__init__(test_context)
|
||||
self.zk = ZookeeperService(test_context, num_nodes=3)
|
||||
self.trogdor = TrogdorService(context=self.test_context, client_services=[self.zk])
|
||||
self.kafka = KafkaService(test_context, num_nodes=2, zk=None, controller_num_nodes_override=2)
|
||||
self.trogdor = TrogdorService(context=self.test_context, client_services=[self.kafka.controller_quorum])
|
||||
|
||||
def setUp(self):
|
||||
self.zk.start()
|
||||
self.kafka.start()
|
||||
self.trogdor.start()
|
||||
|
||||
def teardown(self):
|
||||
self.trogdor.stop()
|
||||
self.zk.stop()
|
||||
self.kafka.stop()
|
||||
|
||||
@cluster(num_nodes=5)
|
||||
@parametrize(task_name="latency-100", device_name="eth0", latency_ms=50, rate_limit_kbit=0)
|
||||
@parametrize(task_name="latency-100-rate-1000", device_name="eth0", latency_ms=50, rate_limit_kbit=1000)
|
||||
def test_latency(self, task_name, device_name, latency_ms, rate_limit_kbit):
|
||||
@cluster(num_nodes=3)
|
||||
@parametrize(task_name="latency-100", device_name="eth0", latency_ms=50, rate_limit_kbit=0, metadata_quorum=quorum.combined_kraft)
|
||||
@parametrize(task_name="latency-100-rate-1000", device_name="eth0", latency_ms=50, rate_limit_kbit=1000, metadata_quorum=quorum.combined_kraft)
|
||||
def test_latency(self, task_name, device_name, latency_ms, rate_limit_kbit, metadata_quorum=quorum.combined_kraft):
|
||||
spec = DegradedNetworkFaultSpec(0, 10000)
|
||||
for node in self.zk.nodes:
|
||||
for node in self.kafka.controller_quorum.nodes:
|
||||
spec.add_node_spec(node.name, device_name, latency_ms, rate_limit_kbit)
|
||||
|
||||
latency = self.trogdor.create_task(task_name, spec)
|
||||
|
||||
zk0 = self.zk.nodes[0]
|
||||
zk1 = self.zk.nodes[1]
|
||||
quorum0 = self.kafka.controller_quorum.nodes[0]
|
||||
quorum1 = self.kafka.controller_quorum.nodes[1]
|
||||
|
||||
|
||||
# Capture the ping times from the ping stdout
|
||||
# 64 bytes from ducker01 (172.24.0.2): icmp_seq=1 ttl=64 time=0.325 ms
|
||||
r = re.compile(r".*time=(?P<time>[\d.]+)\sms.*")
|
||||
|
||||
times = []
|
||||
for line in zk0.account.ssh_capture("ping -i 1 -c 20 %s" % zk1.account.hostname):
|
||||
for line in quorum0.account.ssh_capture("ping -i 1 -c 20 %s" % quorum1.account.hostname):
|
||||
self.logger.debug("Ping output: %s" % line)
|
||||
m = r.match(line)
|
||||
if m is not None and m.group("time"):
|
||||
|
@ -86,15 +87,15 @@ class NetworkDegradeTest(Test):
|
|||
assert len(slow_times) > 5, "Expected to see more slow ping times (lower than %d)" % low_time_ms
|
||||
assert len(fast_times) > 5, "Expected to see more fast ping times (higher than %d)" % high_time_ms
|
||||
|
||||
@cluster(num_nodes=5)
|
||||
@parametrize(task_name="rate-1000", device_name="eth0", latency_ms=0, rate_limit_kbit=1000000)
|
||||
@parametrize(task_name="rate-1000-latency-50", device_name="eth0", latency_ms=50, rate_limit_kbit=1000000)
|
||||
def test_rate(self, task_name, device_name, latency_ms, rate_limit_kbit):
|
||||
zk0 = self.zk.nodes[0]
|
||||
zk1 = self.zk.nodes[1]
|
||||
@cluster(num_nodes=3)
|
||||
@parametrize(task_name="rate-1000", device_name="eth0", latency_ms=0, rate_limit_kbit=1000000, metadata_quorum=quorum.combined_kraft)
|
||||
@parametrize(task_name="rate-1000-latency-50", device_name="eth0", latency_ms=50, rate_limit_kbit=1000000, metadata_quorum=quorum.combined_kraft)
|
||||
def test_rate(self, task_name, device_name, latency_ms, rate_limit_kbit, metadata_quorum=quorum.combined_kraft):
|
||||
quorum0 = self.kafka.controller_quorum.nodes[0]
|
||||
quorum1 = self.kafka.controller_quorum.nodes[1]
|
||||
|
||||
spec = DegradedNetworkFaultSpec(0, 60000)
|
||||
spec.add_node_spec(zk0.name, device_name, latency_ms, rate_limit_kbit)
|
||||
spec.add_node_spec(quorum0.name, device_name, latency_ms, rate_limit_kbit)
|
||||
|
||||
# start the task and wait
|
||||
rate_limit = self.trogdor.create_task(task_name, spec)
|
||||
|
@ -102,8 +103,8 @@ class NetworkDegradeTest(Test):
|
|||
timeout_sec=10,
|
||||
err_msg="%s failed to start within 10 seconds." % rate_limit)
|
||||
|
||||
# Run iperf server on zk1, iperf client on zk0
|
||||
iperf_server = zk1.account.ssh_capture("iperf -s")
|
||||
# Run iperf server on quorum1, iperf client on quorum0
|
||||
iperf_server = quorum1.account.ssh_capture("iperf -s")
|
||||
|
||||
# Wait until iperf server is listening before starting the client
|
||||
for line in iperf_server:
|
||||
|
@ -117,7 +118,7 @@ class NetworkDegradeTest(Test):
|
|||
r = re.compile(r"^.*\s(?P<rate>[\d.]+)\sKbits/sec$")
|
||||
|
||||
measured_rates = []
|
||||
for line in zk0.account.ssh_capture("iperf -i 1 -t 20 -f k -c %s" % zk1.account.hostname):
|
||||
for line in quorum0.account.ssh_capture("iperf -i 1 -t 20 -f k -c %s" % quorum1.account.hostname):
|
||||
self.logger.info("iperf output %s" % line)
|
||||
m = r.match(line)
|
||||
if m is not None:
|
||||
|
@ -126,7 +127,7 @@ class NetworkDegradeTest(Test):
|
|||
self.logger.info("Parsed rate of %d kbit/s from iperf" % measured_rate)
|
||||
|
||||
# kill iperf server and consume the stdout to ensure clean exit
|
||||
zk1.account.kill_process("iperf")
|
||||
quorum1.account.kill_process("iperf")
|
||||
for _ in iperf_server:
|
||||
continue
|
||||
|
||||
|
|
Loading…
Reference in New Issue