KAFKA-17609:[2/4]Convert system tests to kraft part 2 (#17321)

* Part 2 of 4 converting system tests to use KRaft

Reviewers: Matthias Sax <mjsax@apache.org>
This commit is contained in:
Bill Bejeck 2024-10-30 12:31:47 -04:00 committed by GitHub
parent 358d8775fb
commit 58dd76817e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 12 additions and 25 deletions

View File

@ -17,9 +17,8 @@ import time
from ducktape.mark import matrix from ducktape.mark import matrix
from ducktape.mark.resource import cluster from ducktape.mark.resource import cluster
from ducktape.tests.test import Test from ducktape.tests.test import Test
from kafkatest.services.kafka import KafkaService from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \ from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \
LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3 LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3
from kafkatest.services.streams import CooperativeRebalanceUpgradeService from kafkatest.services.streams import CooperativeRebalanceUpgradeService
@ -45,7 +44,7 @@ class StreamsCooperativeRebalanceUpgradeTest(Test):
second_bounce_phase = "second_bounce_phase-" second_bounce_phase = "second_bounce_phase-"
# !!CAUTION!!: THIS LIST OF VERSIONS IS FIXED, NO VERSIONS MUST BE ADDED # !!CAUTION!!: THIS LIST OF VERSIONS IS FIXED, NO VERSIONS MUST BE ADDED
streams_eager_rebalance_upgrade_versions = [str(LATEST_0_10_0), str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), streams_eager_rebalance_upgrade_versions = [str(LATEST_0_10_2), str(LATEST_0_11_0),
str(LATEST_1_0), str(LATEST_1_1), str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_1_0), str(LATEST_1_1), str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2),
str(LATEST_2_3)] str(LATEST_2_3)]
@ -56,9 +55,9 @@ class StreamsCooperativeRebalanceUpgradeTest(Test):
self.sink_topic: {'partitions': 9} self.sink_topic: {'partitions': 9}
} }
self.zookeeper = ZookeeperService(self.test_context, num_nodes=1)
self.kafka = KafkaService(self.test_context, num_nodes=3, self.kafka = KafkaService(self.test_context, num_nodes=3,
zk=self.zookeeper, topics=self.topics) zk=None, topics=self.topics,
controller_num_nodes_override=1)
self.producer = VerifiableProducer(self.test_context, self.producer = VerifiableProducer(self.test_context,
1, 1,
@ -68,9 +67,8 @@ class StreamsCooperativeRebalanceUpgradeTest(Test):
acks=1) acks=1)
@cluster(num_nodes=8) @cluster(num_nodes=8)
@matrix(upgrade_from_version=streams_eager_rebalance_upgrade_versions) @matrix(upgrade_from_version=streams_eager_rebalance_upgrade_versions, metadata_quorum=[quorum.combined_kraft])
def test_upgrade_to_cooperative_rebalance(self, upgrade_from_version): def test_upgrade_to_cooperative_rebalance(self, upgrade_from_version, metadata_quorum):
self.zookeeper.start()
self.kafka.start() self.kafka.start()
processor1 = CooperativeRebalanceUpgradeService(self.test_context, self.kafka) processor1 = CooperativeRebalanceUpgradeService(self.test_context, self.kafka)
@ -138,7 +136,6 @@ class StreamsCooperativeRebalanceUpgradeTest(Test):
self.producer.stop() self.producer.stop()
self.kafka.stop() self.kafka.stop()
self.zookeeper.stop()
def maybe_upgrade_rolling_bounce_and_verify(self, def maybe_upgrade_rolling_bounce_and_verify(self,
processors, processors,

View File

@ -39,7 +39,7 @@ class StreamsEosTest(KafkaTest):
self.test_context = test_context self.test_context = test_context
@cluster(num_nodes=9) @cluster(num_nodes=9)
@matrix(metadata_quorum=[quorum.isolated_kraft]) @matrix(metadata_quorum=[quorum.combined_kraft])
def test_rebalance_simple(self, metadata_quorum): def test_rebalance_simple(self, metadata_quorum):
self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka), self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestJobRunnerService(self.test_context, self.kafka), StreamsEosTestJobRunnerService(self.test_context, self.kafka),
@ -47,7 +47,7 @@ class StreamsEosTest(KafkaTest):
StreamsEosTestVerifyRunnerService(self.test_context, self.kafka)) StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
@cluster(num_nodes=9) @cluster(num_nodes=9)
@matrix(metadata_quorum=[quorum.isolated_kraft]) @matrix(metadata_quorum=[quorum.combined_kraft])
def test_rebalance_complex(self, metadata_quorum): def test_rebalance_complex(self, metadata_quorum):
self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka), self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka), StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
@ -81,7 +81,7 @@ class StreamsEosTest(KafkaTest):
verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False) verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)
@cluster(num_nodes=9) @cluster(num_nodes=9)
@matrix(metadata_quorum=[quorum.isolated_kraft]) @matrix(metadata_quorum=[quorum.combined_kraft])
def test_failure_and_recovery(self, metadata_quorum): def test_failure_and_recovery(self, metadata_quorum):
self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka), self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
StreamsEosTestJobRunnerService(self.test_context, self.kafka), StreamsEosTestJobRunnerService(self.test_context, self.kafka),
@ -89,7 +89,7 @@ class StreamsEosTest(KafkaTest):
StreamsEosTestVerifyRunnerService(self.test_context, self.kafka)) StreamsEosTestVerifyRunnerService(self.test_context, self.kafka))
@cluster(num_nodes=9) @cluster(num_nodes=9)
@matrix(metadata_quorum=[quorum.isolated_kraft]) @matrix(metadata_quorum=[quorum.combined_kraft])
def test_failure_and_recovery_complex(self, metadata_quorum): def test_failure_and_recovery_complex(self, metadata_quorum):
self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka), self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka), StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka),

View File

@ -19,7 +19,6 @@ from ducktape.tests.test import Test
from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.streams import StreamsNamedRepartitionTopicService from kafkatest.services.streams import StreamsNamedRepartitionTopicService
from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.streams.utils import verify_stopped, stop_processors, verify_running from kafkatest.tests.streams.utils import verify_stopped, stop_processors, verify_running
class StreamsNamedRepartitionTopicTest(Test): class StreamsNamedRepartitionTopicTest(Test):
@ -41,13 +40,8 @@ class StreamsNamedRepartitionTopicTest(Test):
self.aggregation_topic: {'partitions': 6} self.aggregation_topic: {'partitions': 6}
} }
self.zookeeper = (
ZookeeperService(self.test_context, 1)
if quorum.for_test(self.test_context) == quorum.zk
else None
)
self.kafka = KafkaService(self.test_context, num_nodes=3, self.kafka = KafkaService(self.test_context, num_nodes=3,
zk=self.zookeeper, topics=self.topics, controller_num_nodes_override=1) zk=None, topics=self.topics, controller_num_nodes_override=1)
self.producer = VerifiableProducer(self.test_context, self.producer = VerifiableProducer(self.test_context,
1, 1,
@ -57,10 +51,8 @@ class StreamsNamedRepartitionTopicTest(Test):
acks=1) acks=1)
@cluster(num_nodes=8) @cluster(num_nodes=8)
@matrix(metadata_quorum=[quorum.isolated_kraft]) @matrix(metadata_quorum=[quorum.combined_kraft])
def test_upgrade_topology_with_named_repartition_topic(self, metadata_quorum): def test_upgrade_topology_with_named_repartition_topic(self, metadata_quorum):
if self.zookeeper:
self.zookeeper.start()
self.kafka.start() self.kafka.start()
processor1 = StreamsNamedRepartitionTopicService(self.test_context, self.kafka) processor1 = StreamsNamedRepartitionTopicService(self.test_context, self.kafka)
@ -91,8 +83,6 @@ class StreamsNamedRepartitionTopicTest(Test):
self.producer.stop() self.producer.stop()
self.kafka.stop() self.kafka.stop()
if self.zookeeper:
self.zookeeper.stop()
def verify_processing(self, processors): def verify_processing(self, processors):
for processor in processors: for processor in processors: