mirror of https://github.com/apache/kafka.git
KAFKA-17609:[4/4]Convert system tests to kraft part 4 (#17328)
Part 4 of 4 converting streams system tests to KRaft Reviewers: Matthias Sax <mjsax@apache.org>
This commit is contained in:
parent
f29ca9ba9a
commit
3d2edf8de0
|
@ -49,8 +49,8 @@ class StreamsSmokeTest(KafkaTest):
|
|||
@cluster(num_nodes=8)
|
||||
@matrix(processing_guarantee=['exactly_once_v2', 'at_least_once'],
|
||||
crash=[True, False],
|
||||
metadata_quorum=quorum.all_non_upgrade)
|
||||
def test_streams(self, processing_guarantee, crash, metadata_quorum=quorum.zk):
|
||||
metadata_quorum=[quorum.combined_kraft])
|
||||
def test_streams(self, processing_guarantee, crash, metadata_quorum):
|
||||
processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee)
|
||||
processor2 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee)
|
||||
processor3 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, processing_guarantee)
|
||||
|
|
|
@ -19,7 +19,6 @@ from ducktape.tests.test import Test
|
|||
from kafkatest.services.kafka import KafkaService, quorum
|
||||
from kafkatest.services.streams import StaticMemberTestService
|
||||
from kafkatest.services.verifiable_producer import VerifiableProducer
|
||||
from kafkatest.services.zookeeper import ZookeeperService
|
||||
from kafkatest.tests.streams.utils import verify_stopped, stop_processors, verify_running, extract_generation_from_logs, extract_generation_id
|
||||
|
||||
class StreamsStaticMembershipTest(Test):
|
||||
|
@ -39,13 +38,8 @@ class StreamsStaticMembershipTest(Test):
|
|||
self.input_topic: {'partitions': 18},
|
||||
}
|
||||
|
||||
self.zookeeper = (
|
||||
ZookeeperService(self.test_context, 1)
|
||||
if quorum.for_test(self.test_context) == quorum.zk
|
||||
else None
|
||||
)
|
||||
self.kafka = KafkaService(self.test_context, num_nodes=3,
|
||||
zk=self.zookeeper, topics=self.topics, controller_num_nodes_override=1)
|
||||
zk=None, topics=self.topics, controller_num_nodes_override=1)
|
||||
|
||||
self.producer = VerifiableProducer(self.test_context,
|
||||
1,
|
||||
|
@ -57,8 +51,6 @@ class StreamsStaticMembershipTest(Test):
|
|||
@cluster(num_nodes=8)
|
||||
@matrix(metadata_quorum=[quorum.isolated_kraft], use_new_coordinator=[True, False])
|
||||
def test_rolling_bounces_will_not_trigger_rebalance_under_static_membership(self, metadata_quorum, use_new_coordinator=False):
|
||||
if self.zookeeper:
|
||||
self.zookeeper.start()
|
||||
self.kafka.start()
|
||||
|
||||
numThreads = 3
|
||||
|
@ -104,8 +96,6 @@ class StreamsStaticMembershipTest(Test):
|
|||
|
||||
self.producer.stop()
|
||||
self.kafka.stop(timeout_sec=120)
|
||||
if self.zookeeper:
|
||||
self.zookeeper.stop()
|
||||
|
||||
def verify_processing(self, processors):
|
||||
for processor in processors:
|
||||
|
|
|
@ -18,28 +18,22 @@ import time
|
|||
from ducktape.mark import matrix, ignore
|
||||
from ducktape.mark.resource import cluster
|
||||
from ducktape.tests.test import Test
|
||||
from ducktape.utils.util import wait_until
|
||||
from kafkatest.services.kafka import KafkaService
|
||||
from kafkatest.services.kafka import KafkaService, quorum
|
||||
from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService, \
|
||||
StreamsUpgradeTestJobRunnerService
|
||||
from kafkatest.services.zookeeper import ZookeeperService
|
||||
from kafkatest.tests.streams.utils import extract_generation_from_logs, extract_generation_id
|
||||
from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \
|
||||
from kafkatest.version import LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \
|
||||
LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \
|
||||
LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, DEV_BRANCH, DEV_VERSION, \
|
||||
KafkaVersion
|
||||
|
||||
# broker 0.10.0 is not compatible with newer Kafka Streams versions
|
||||
# broker 0.10.1 and 0.10.2 do not support headers, as required by suppress() (since v2.2.1)
|
||||
broker_upgrade_versions = [str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1),
|
||||
str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3),
|
||||
str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7),
|
||||
str(LATEST_2_8), str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2),
|
||||
broker_upgrade_versions = [str(LATEST_2_8), str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2),
|
||||
str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(LATEST_3_6),
|
||||
str(LATEST_3_7), str(LATEST_3_8), str(DEV_BRANCH)]
|
||||
|
||||
metadata_1_versions = [str(LATEST_0_10_0)]
|
||||
metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1),
|
||||
metadata_2_versions = [str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1),
|
||||
str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8),
|
||||
str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3)]
|
||||
# upgrading from version (2.4...3.3) is broken and only fixed later in 3.3.3 (unreleased) and 3.4.0
|
||||
|
@ -111,102 +105,10 @@ class StreamsUpgradeTest(Test):
|
|||
node.version = KafkaVersion(to_version)
|
||||
self.kafka.start_node(node)
|
||||
|
||||
@ignore
|
||||
@cluster(num_nodes=6)
|
||||
@matrix(from_version=broker_upgrade_versions, to_version=broker_upgrade_versions)
|
||||
def test_upgrade_downgrade_brokers(self, from_version, to_version):
|
||||
"""
|
||||
Start a smoke test client then perform rolling upgrades on the broker.
|
||||
"""
|
||||
|
||||
if from_version == to_version:
|
||||
return
|
||||
|
||||
self.replication = 3
|
||||
self.num_kafka_nodes = 3
|
||||
self.partitions = 1
|
||||
self.isr = 2
|
||||
self.topics = {
|
||||
'echo' : { 'partitions': self.partitions, 'replication-factor': self.replication,
|
||||
'configs': {"min.insync.replicas": self.isr}},
|
||||
'data' : { 'partitions': self.partitions, 'replication-factor': self.replication,
|
||||
'configs': {"min.insync.replicas": self.isr} },
|
||||
'min' : { 'partitions': self.partitions, 'replication-factor': self.replication,
|
||||
'configs': {"min.insync.replicas": self.isr} },
|
||||
'max' : { 'partitions': self.partitions, 'replication-factor': self.replication,
|
||||
'configs': {"min.insync.replicas": self.isr} },
|
||||
'sum' : { 'partitions': self.partitions, 'replication-factor': self.replication,
|
||||
'configs': {"min.insync.replicas": self.isr} },
|
||||
'dif' : { 'partitions': self.partitions, 'replication-factor': self.replication,
|
||||
'configs': {"min.insync.replicas": self.isr} },
|
||||
'cnt' : { 'partitions': self.partitions, 'replication-factor': self.replication,
|
||||
'configs': {"min.insync.replicas": self.isr} },
|
||||
'avg' : { 'partitions': self.partitions, 'replication-factor': self.replication,
|
||||
'configs': {"min.insync.replicas": self.isr} },
|
||||
'wcnt' : { 'partitions': self.partitions, 'replication-factor': self.replication,
|
||||
'configs': {"min.insync.replicas": self.isr} },
|
||||
'tagg' : { 'partitions': self.partitions, 'replication-factor': self.replication,
|
||||
'configs': {"min.insync.replicas": self.isr} }
|
||||
}
|
||||
|
||||
# Setup phase
|
||||
self.zk = ZookeeperService(self.test_context, num_nodes=1)
|
||||
self.zk.start()
|
||||
|
||||
# number of nodes needs to be >= 3 for the smoke test
|
||||
self.kafka = KafkaService(self.test_context, num_nodes=self.num_kafka_nodes,
|
||||
zk=self.zk, version=KafkaVersion(from_version), topics=self.topics)
|
||||
self.kafka.start()
|
||||
|
||||
# allow some time for topics to be created
|
||||
wait_until(lambda: self.confirm_topics_on_all_brokers(set(self.topics.keys())),
|
||||
timeout_sec=60,
|
||||
err_msg="Broker did not create all topics in 60 seconds ")
|
||||
|
||||
self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
|
||||
|
||||
processor = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, "at_least_once")
|
||||
|
||||
with self.driver.node.account.monitor_log(self.driver.STDOUT_FILE) as driver_monitor:
|
||||
self.driver.start()
|
||||
|
||||
with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
|
||||
processor.start()
|
||||
monitor.wait_until(self.processed_data_msg,
|
||||
timeout_sec=60,
|
||||
err_msg="Never saw output '%s' on " % self.processed_data_msg + str(processor.node))
|
||||
|
||||
connected_message = "Discovered group coordinator"
|
||||
with processor.node.account.monitor_log(processor.LOG_FILE) as log_monitor:
|
||||
with processor.node.account.monitor_log(processor.STDOUT_FILE) as stdout_monitor:
|
||||
self.perform_broker_upgrade(to_version)
|
||||
|
||||
log_monitor.wait_until(connected_message,
|
||||
timeout_sec=120,
|
||||
err_msg=("Never saw output '%s' on " % connected_message) + str(processor.node.account))
|
||||
|
||||
stdout_monitor.wait_until(self.processed_data_msg,
|
||||
timeout_sec=60,
|
||||
err_msg="Never saw output '%s' on" % self.processed_data_msg + str(processor.node.account))
|
||||
|
||||
# SmokeTestDriver allows up to 6 minutes to consume all
|
||||
# records for the verification step so this timeout is set to
|
||||
# 6 minutes (360 seconds) for consuming of verification records
|
||||
# and a very conservative additional 2 minutes (120 seconds) to process
|
||||
# the records in the verification step
|
||||
driver_monitor.wait_until('ALL-RECORDS-DELIVERED\|PROCESSED-MORE-THAN-GENERATED',
|
||||
timeout_sec=480,
|
||||
err_msg="Never saw output '%s' on" % 'ALL-RECORDS-DELIVERED|PROCESSED-MORE-THAN-GENERATED' + str(self.driver.node.account))
|
||||
|
||||
self.driver.stop()
|
||||
processor.stop()
|
||||
processor.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)
|
||||
|
||||
@cluster(num_nodes=6)
|
||||
@matrix(from_version=metadata_1_versions)
|
||||
@matrix(from_version=metadata_2_versions)
|
||||
@matrix(from_version=fk_join_versions)
|
||||
def test_rolling_upgrade_with_2_bounces(self, from_version):
|
||||
@matrix(from_version=metadata_2_versions, metadata_quorum=[quorum.combined_kraft])
|
||||
@matrix(from_version=fk_join_versions, metadata_quorum=[quorum.combined_kraft])
|
||||
def test_rolling_upgrade_with_2_bounces(self, from_version, metadata_quorum):
|
||||
"""
|
||||
This test verifies that the cluster successfully upgrades despite changes in the metadata and FK
|
||||
join protocols.
|
||||
|
@ -245,7 +147,8 @@ class StreamsUpgradeTest(Test):
|
|||
self.stop_and_await()
|
||||
|
||||
@cluster(num_nodes=6)
|
||||
def test_version_probing_upgrade(self):
|
||||
@matrix(metadata_quorum=[quorum.combined_kraft])
|
||||
def test_version_probing_upgrade(self, metadata_quorum):
|
||||
"""
|
||||
Starts 3 KafkaStreams instances, and upgrades one-by-one to "future version"
|
||||
"""
|
||||
|
@ -272,8 +175,8 @@ class StreamsUpgradeTest(Test):
|
|||
self.stop_and_await()
|
||||
|
||||
@cluster(num_nodes=6)
|
||||
@matrix(from_version=[str(LATEST_3_2), str(DEV_VERSION)], upgrade=[True, False])
|
||||
def test_upgrade_downgrade_state_updater(self, from_version, upgrade):
|
||||
@matrix(from_version=[str(LATEST_3_2), str(DEV_VERSION)], upgrade=[True, False], metadata_quorum=[quorum.combined_kraft])
|
||||
def test_upgrade_downgrade_state_updater(self, from_version, upgrade, metadata_quorum):
|
||||
"""
|
||||
Starts 3 KafkaStreams instances, and enables / disables state restoration
|
||||
for the instances in a rolling bounce.
|
||||
|
@ -312,10 +215,7 @@ class StreamsUpgradeTest(Test):
|
|||
self.stop_and_await()
|
||||
|
||||
def set_up_services(self):
|
||||
self.zk = ZookeeperService(self.test_context, num_nodes=1)
|
||||
self.zk.start()
|
||||
|
||||
self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, topics=self.topics)
|
||||
self.kafka = KafkaService(self.test_context, num_nodes=1, zk=None, topics=self.topics)
|
||||
self.kafka.start()
|
||||
|
||||
self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
|
||||
|
|
Loading…
Reference in New Issue