mirror of https://github.com/apache/kafka.git
MINOR: Cleaning ignored streams tests (#20585)
- Test has been disabled for years + it tests ZK setup, but with KRaft there is no more a "controller", so cleaning it up. - Test has also been disabled for years + we did not get any incident about KS breaking during broker upgrade/downgrade
This commit is contained in:
parent
e37f31e8b2
commit
c29c130fd9
|
@ -17,7 +17,6 @@ from ducktape.utils.util import wait_until
|
||||||
from ducktape.tests.test import Test
|
from ducktape.tests.test import Test
|
||||||
from ducktape.mark.resource import cluster
|
from ducktape.mark.resource import cluster
|
||||||
from ducktape.mark import matrix
|
from ducktape.mark import matrix
|
||||||
from ducktape.mark import ignore
|
|
||||||
from kafkatest.services.kafka import KafkaService, quorum
|
from kafkatest.services.kafka import KafkaService, quorum
|
||||||
from kafkatest.services.zookeeper import ZookeeperService
|
from kafkatest.services.zookeeper import ZookeeperService
|
||||||
from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
|
from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
|
||||||
|
@ -239,29 +238,6 @@ class StreamsBrokerBounceTest(Test):
|
||||||
|
|
||||||
return self.collect_results(sleep_time_secs)
|
return self.collect_results(sleep_time_secs)
|
||||||
|
|
||||||
@ignore
|
|
||||||
@cluster(num_nodes=7)
|
|
||||||
@matrix(failure_mode=["clean_shutdown"],
|
|
||||||
broker_type=["controller"],
|
|
||||||
sleep_time_secs=[0])
|
|
||||||
def test_broker_type_bounce_at_start(self, failure_mode, broker_type, sleep_time_secs):
|
|
||||||
"""
|
|
||||||
Start a smoke test client, then kill one particular broker immediately before streams stats
|
|
||||||
Streams should throw an exception since it cannot create topics with the desired
|
|
||||||
replication factor of 3
|
|
||||||
"""
|
|
||||||
self.setup_system(start_processor=False)
|
|
||||||
|
|
||||||
# Sleep to allow test to run for a bit
|
|
||||||
time.sleep(sleep_time_secs)
|
|
||||||
|
|
||||||
# Fail brokers
|
|
||||||
self.fail_broker_type(failure_mode, broker_type)
|
|
||||||
|
|
||||||
self.processor1.start()
|
|
||||||
|
|
||||||
return self.collect_results(sleep_time_secs)
|
|
||||||
|
|
||||||
@cluster(num_nodes=7)
|
@cluster(num_nodes=7)
|
||||||
@matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
|
@matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
|
||||||
num_failures=[2],
|
num_failures=[2],
|
||||||
|
|
|
@ -15,7 +15,7 @@
|
||||||
|
|
||||||
import random
|
import random
|
||||||
import time
|
import time
|
||||||
from ducktape.mark import matrix, ignore
|
from ducktape.mark import matrix
|
||||||
from ducktape.mark.resource import cluster
|
from ducktape.mark.resource import cluster
|
||||||
from ducktape.tests.test import Test
|
from ducktape.tests.test import Test
|
||||||
from ducktape.utils.util import wait_until
|
from ducktape.utils.util import wait_until
|
||||||
|
@ -111,97 +111,6 @@ class StreamsUpgradeTest(Test):
|
||||||
node.version = KafkaVersion(to_version)
|
node.version = KafkaVersion(to_version)
|
||||||
self.kafka.start_node(node)
|
self.kafka.start_node(node)
|
||||||
|
|
||||||
@ignore
|
|
||||||
@cluster(num_nodes=6)
|
|
||||||
@matrix(from_version=broker_upgrade_versions, to_version=broker_upgrade_versions)
|
|
||||||
def test_upgrade_downgrade_brokers(self, from_version, to_version):
|
|
||||||
"""
|
|
||||||
Start a smoke test client then perform rolling upgrades on the broker.
|
|
||||||
"""
|
|
||||||
|
|
||||||
if from_version == to_version:
|
|
||||||
return
|
|
||||||
|
|
||||||
self.replication = 3
|
|
||||||
self.num_kafka_nodes = 3
|
|
||||||
self.partitions = 1
|
|
||||||
self.isr = 2
|
|
||||||
self.topics = {
|
|
||||||
'echo' : { 'partitions': self.partitions, 'replication-factor': self.replication,
|
|
||||||
'configs': {"min.insync.replicas": self.isr}},
|
|
||||||
'data' : { 'partitions': self.partitions, 'replication-factor': self.replication,
|
|
||||||
'configs': {"min.insync.replicas": self.isr} },
|
|
||||||
'min' : { 'partitions': self.partitions, 'replication-factor': self.replication,
|
|
||||||
'configs': {"min.insync.replicas": self.isr} },
|
|
||||||
'max' : { 'partitions': self.partitions, 'replication-factor': self.replication,
|
|
||||||
'configs': {"min.insync.replicas": self.isr} },
|
|
||||||
'sum' : { 'partitions': self.partitions, 'replication-factor': self.replication,
|
|
||||||
'configs': {"min.insync.replicas": self.isr} },
|
|
||||||
'dif' : { 'partitions': self.partitions, 'replication-factor': self.replication,
|
|
||||||
'configs': {"min.insync.replicas": self.isr} },
|
|
||||||
'cnt' : { 'partitions': self.partitions, 'replication-factor': self.replication,
|
|
||||||
'configs': {"min.insync.replicas": self.isr} },
|
|
||||||
'avg' : { 'partitions': self.partitions, 'replication-factor': self.replication,
|
|
||||||
'configs': {"min.insync.replicas": self.isr} },
|
|
||||||
'wcnt' : { 'partitions': self.partitions, 'replication-factor': self.replication,
|
|
||||||
'configs': {"min.insync.replicas": self.isr} },
|
|
||||||
'tagg' : { 'partitions': self.partitions, 'replication-factor': self.replication,
|
|
||||||
'configs': {"min.insync.replicas": self.isr} }
|
|
||||||
}
|
|
||||||
|
|
||||||
# Setup phase
|
|
||||||
self.zk = ZookeeperService(self.test_context, num_nodes=1)
|
|
||||||
self.zk.start()
|
|
||||||
|
|
||||||
# number of nodes needs to be >= 3 for the smoke test
|
|
||||||
self.kafka = KafkaService(self.test_context, num_nodes=self.num_kafka_nodes,
|
|
||||||
zk=self.zk, version=KafkaVersion(from_version), topics=self.topics)
|
|
||||||
self.kafka.start()
|
|
||||||
|
|
||||||
# allow some time for topics to be created
|
|
||||||
wait_until(lambda: self.confirm_topics_on_all_brokers(set(self.topics.keys())),
|
|
||||||
timeout_sec=60,
|
|
||||||
err_msg="Broker did not create all topics in 60 seconds ")
|
|
||||||
|
|
||||||
self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
|
|
||||||
|
|
||||||
processor = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, "at_least_once")
|
|
||||||
|
|
||||||
with self.driver.node.account.monitor_log(self.driver.STDOUT_FILE) as driver_monitor:
|
|
||||||
self.driver.start()
|
|
||||||
|
|
||||||
with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
|
|
||||||
processor.start()
|
|
||||||
monitor.wait_until(self.processed_data_msg,
|
|
||||||
timeout_sec=60,
|
|
||||||
err_msg="Never saw output '%s' on " % self.processed_data_msg + str(processor.node))
|
|
||||||
|
|
||||||
connected_message = "Discovered group coordinator"
|
|
||||||
with processor.node.account.monitor_log(processor.LOG_FILE) as log_monitor:
|
|
||||||
with processor.node.account.monitor_log(processor.STDOUT_FILE) as stdout_monitor:
|
|
||||||
self.perform_broker_upgrade(to_version)
|
|
||||||
|
|
||||||
log_monitor.wait_until(connected_message,
|
|
||||||
timeout_sec=120,
|
|
||||||
err_msg=("Never saw output '%s' on " % connected_message) + str(processor.node.account))
|
|
||||||
|
|
||||||
stdout_monitor.wait_until(self.processed_data_msg,
|
|
||||||
timeout_sec=60,
|
|
||||||
err_msg="Never saw output '%s' on" % self.processed_data_msg + str(processor.node.account))
|
|
||||||
|
|
||||||
# SmokeTestDriver allows up to 6 minutes to consume all
|
|
||||||
# records for the verification step so this timeout is set to
|
|
||||||
# 6 minutes (360 seconds) for consuming of verification records
|
|
||||||
# and a very conservative additional 2 minutes (120 seconds) to process
|
|
||||||
# the records in the verification step
|
|
||||||
driver_monitor.wait_until('ALL-RECORDS-DELIVERED\|PROCESSED-MORE-THAN-GENERATED',
|
|
||||||
timeout_sec=480,
|
|
||||||
err_msg="Never saw output '%s' on" % 'ALL-RECORDS-DELIVERED|PROCESSED-MORE-THAN-GENERATED' + str(self.driver.node.account))
|
|
||||||
|
|
||||||
self.driver.stop()
|
|
||||||
processor.stop()
|
|
||||||
processor.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)
|
|
||||||
|
|
||||||
@cluster(num_nodes=6)
|
@cluster(num_nodes=6)
|
||||||
@matrix(from_version=metadata_1_versions)
|
@matrix(from_version=metadata_1_versions)
|
||||||
@matrix(from_version=metadata_2_versions)
|
@matrix(from_version=metadata_2_versions)
|
||||||
|
|
Loading…
Reference in New Issue