KAFKA-19202: Enable KIP-1071 in streams_broker_bounce_test.py (#19584)

Enable KIP-1071 in the next system test.

Reviewers: Bill Bejeck <bill@confluent.io>
This commit is contained in:
Lucas Brutschy 2025-04-29 18:11:46 +02:00 committed by GitHub
parent 036ed569d5
commit 14ea1cf61a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 20 additions and 16 deletions

View File

@ -149,10 +149,10 @@ class StreamsBrokerBounceTest(Test):
return True return True
def setup_system(self, start_processor=True, num_threads=3): def setup_system(self, start_processor=True, num_threads=3, group_protocol='classic'):
# Setup phase # Setup phase
use_streams_groups = True if group_protocol == 'streams' else False
self.kafka = KafkaService(self.test_context, num_nodes=self.replication, zk=None, topics=self.topics) self.kafka = KafkaService(self.test_context, num_nodes=self.replication, zk=None, topics=self.topics, use_streams_groups=use_streams_groups)
self.kafka.start() self.kafka.start()
# allow some time for topics to be created # allow some time for topics to be created
@ -162,7 +162,7 @@ class StreamsBrokerBounceTest(Test):
# Start test harness # Start test harness
self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka) self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, "at_least_once", num_threads) self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, "at_least_once", group_protocol=group_protocol, num_threads = num_threads)
self.driver.start() self.driver.start()
@ -207,15 +207,16 @@ class StreamsBrokerBounceTest(Test):
broker_type=["leader"], broker_type=["leader"],
num_threads=[1, 3], num_threads=[1, 3],
sleep_time_secs=[120], sleep_time_secs=[120],
metadata_quorum=[quorum.combined_kraft]) metadata_quorum=[quorum.combined_kraft],
def test_broker_type_bounce(self, failure_mode, broker_type, sleep_time_secs, num_threads, metadata_quorum): group_protocol=["classic", "streams"])
def test_broker_type_bounce(self, failure_mode, broker_type, sleep_time_secs, num_threads, metadata_quorum, group_protocol):
""" """
Start a smoke test client, then kill one particular broker and ensure data is still received Start a smoke test client, then kill one particular broker and ensure data is still received
Record if records are delivered. Record if records are delivered.
We also add a single thread stream client to make sure we could get all partitions reassigned in We also add a single thread stream client to make sure we could get all partitions reassigned in
next generation so to verify the partition lost is correctly triggered. next generation so to verify the partition lost is correctly triggered.
""" """
self.setup_system(num_threads=num_threads) self.setup_system(num_threads=num_threads, group_protocol=group_protocol)
# Sleep to allow test to run for a bit # Sleep to allow test to run for a bit
time.sleep(sleep_time_secs) time.sleep(sleep_time_secs)
@ -230,14 +231,15 @@ class StreamsBrokerBounceTest(Test):
@matrix(failure_mode=["clean_shutdown"], @matrix(failure_mode=["clean_shutdown"],
broker_type=["controller"], broker_type=["controller"],
sleep_time_secs=[0], sleep_time_secs=[0],
metadata_quorum=[quorum.combined_kraft]) metadata_quorum=[quorum.combined_kraft],
def test_broker_type_bounce_at_start(self, failure_mode, broker_type, sleep_time_secs, metadata_quorum): group_protocol=["classic", "streams"])
def test_broker_type_bounce_at_start(self, failure_mode, broker_type, sleep_time_secs, metadata_quorum, group_protocol):
""" """
Start a smoke test client, then kill one particular broker immediately before streams stats Start a smoke test client, then kill one particular broker immediately before streams stats
Streams should throw an exception since it cannot create topics with the desired Streams should throw an exception since it cannot create topics with the desired
replication factor of 3 replication factor of 3
""" """
self.setup_system(start_processor=False) self.setup_system(start_processor=False, group_protocol=group_protocol)
# Sleep to allow test to run for a bit # Sleep to allow test to run for a bit
time.sleep(sleep_time_secs) time.sleep(sleep_time_secs)
@ -252,13 +254,14 @@ class StreamsBrokerBounceTest(Test):
@cluster(num_nodes=10) @cluster(num_nodes=10)
@matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
num_failures=[2], num_failures=[2],
metadata_quorum=[quorum.isolated_kraft]) metadata_quorum=[quorum.isolated_kraft],
def test_many_brokers_bounce(self, failure_mode, num_failures, metadata_quorum): group_protocol=["classic", "streams"])
def test_many_brokers_bounce(self, failure_mode, num_failures, metadata_quorum, group_protocol):
""" """
Start a smoke test client, then kill a few brokers and ensure data is still received Start a smoke test client, then kill a few brokers and ensure data is still received
Record if records are delivered Record if records are delivered
""" """
self.setup_system() self.setup_system(group_protocol=group_protocol)
# Sleep to allow test to run for a bit # Sleep to allow test to run for a bit
time.sleep(120) time.sleep(120)
@ -271,8 +274,9 @@ class StreamsBrokerBounceTest(Test):
@cluster(num_nodes=10) @cluster(num_nodes=10)
@matrix(failure_mode=["clean_bounce", "hard_bounce"], @matrix(failure_mode=["clean_bounce", "hard_bounce"],
num_failures=[3], num_failures=[3],
metadata_quorum=[quorum.isolated_kraft]) metadata_quorum=[quorum.isolated_kraft],
def test_all_brokers_bounce(self, failure_mode, num_failures, metadata_quorum): group_protocol=["classic", "streams"])
def test_all_brokers_bounce(self, failure_mode, num_failures, metadata_quorum, group_protocol):
""" """
Start a smoke test client, then kill a few brokers and ensure data is still received Start a smoke test client, then kill a few brokers and ensure data is still received
Record if records are delivered Record if records are delivered
@ -284,7 +288,7 @@ class StreamsBrokerBounceTest(Test):
self.topics['__consumer_offsets'] = { 'partitions': 50, 'replication-factor': self.replication, self.topics['__consumer_offsets'] = { 'partitions': 50, 'replication-factor': self.replication,
'configs': {"min.insync.replicas": 1} } 'configs': {"min.insync.replicas": 1} }
self.setup_system() self.setup_system(group_protocol=group_protocol)
# Sleep to allow test to run for a bit # Sleep to allow test to run for a bit
time.sleep(120) time.sleep(120)