KAFKA-19202: Enable KIP-1071 in streams_broker_down_resilience_test (#19594)

Enable KIP-1071 in the next system test.

Reviewers: Matthias J. Sax <matthias@confluent.io>
Lucas Brutschy 2025-04-30 09:32:34 +02:00 committed by GitHub
parent b0a26bc2f4
commit f5b8891b0c
GPG Key ID: B5690EEEBB952194
3 changed files with 33 additions and 16 deletions


@@ -29,6 +29,7 @@ class BaseStreamsTest(Test):
     see tests/kafkatest/tests/kafka_test.py for more info
     """
     def __init__(self, test_context, topics, num_controllers=1, num_brokers=3):
+        super(BaseStreamsTest, self).__init__(test_context = test_context)
         self.num_controllers = num_controllers
         self.num_brokers = num_brokers
         self.topics = topics
@@ -95,15 +96,16 @@ class BaseStreamsTest(Test):
                                err_msg="At %s streams did not process messages in %s seconds " % (test_state, timeout_sec))

     @staticmethod
-    def get_configs(extra_configs=""):
+    def get_configs(group_protocol="classic", extra_configs=""):
         # Consumer max.poll.interval > min(max.block.ms, ((retries + 1) * request.timeout)
         consumer_poll_ms = "consumer.max.poll.interval.ms=50000"
         retries_config = "producer.retries=2"
         request_timeout = "producer.request.timeout.ms=15000"
         max_block_ms = "producer.max.block.ms=30000"
+        group_protocol = "group.protocol=" + group_protocol

         # java code expects configs in key=value,key=value format
-        updated_configs = consumer_poll_ms + "," + retries_config + "," + request_timeout + "," + max_block_ms + extra_configs
+        updated_configs = consumer_poll_ms + "," + retries_config + "," + request_timeout + "," + max_block_ms + "," + group_protocol + extra_configs

         return updated_configs
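For orientation, a rough usage sketch of the extended helper (not part of the patch; the application.id value below is just an illustrative placeholder):

    # Sketch: calling the updated static helper with the new group_protocol argument.
    configs = BaseStreamsTest.get_configs(group_protocol="streams",
                                          extra_configs=",application.id=example_app")
    # configs is the comma-separated string:
    # "consumer.max.poll.interval.ms=50000,producer.retries=2,producer.request.timeout.ms=15000,"
    # "producer.max.block.ms=30000,group.protocol=streams,application.id=example_app"

Existing call sites that pass no group_protocol keep the default "classic", so the only behavioural change for them is that group.protocol=classic is now set explicitly.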


@@ -45,15 +45,17 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
         pass

     @cluster(num_nodes=7)
-    @matrix(metadata_quorum=[quorum.combined_kraft])
-    def test_streams_resilient_to_broker_down(self, metadata_quorum):
+    @matrix(metadata_quorum=[quorum.combined_kraft],
+            group_protocol=["classic", "streams"])
+    def test_streams_resilient_to_broker_down(self, metadata_quorum, group_protocol):
         self.kafka.start()

         # Broker should be down over 2x of retries * timeout ms
         # So with (2 * 15000) = 30 seconds, we'll set downtime to 70 seconds
         broker_down_time_in_seconds = 70

-        processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, self.get_configs())
+        processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, self.get_configs(
+            group_protocol=group_protocol))
         processor.start()

         self.assert_produce_consume(self.inputTopic,
@@ -82,13 +84,14 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
         self.kafka.stop()

     @cluster(num_nodes=7)
-    @matrix(metadata_quorum=[quorum.combined_kraft])
-    def test_streams_runs_with_broker_down_initially(self, metadata_quorum):
+    @matrix(metadata_quorum=[quorum.combined_kraft],
+            group_protocol=["classic", "streams"])
+    def test_streams_runs_with_broker_down_initially(self, metadata_quorum, group_protocol):
         self.kafka.start()
         node = self.kafka.leader(self.inputTopic)
         self.kafka.stop_node(node)

-        configs = self.get_configs(extra_configs=",application.id=starting_wo_broker_id")
+        configs = self.get_configs(group_protocol=group_protocol, extra_configs=",application.id=starting_wo_broker_id")

         # start streams with broker down initially
         processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
@@ -150,14 +153,19 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
         self.kafka.stop()

     @cluster(num_nodes=9)
-    @matrix(metadata_quorum=[quorum.combined_kraft])
-    def test_streams_should_scale_in_while_brokers_down(self, metadata_quorum):
+    @matrix(metadata_quorum=[quorum.combined_kraft],
+            group_protocol=["classic", "streams"])
+    def test_streams_should_scale_in_while_brokers_down(self, metadata_quorum, group_protocol):
         self.kafka.start()

+        extra_configs = ",application.id=shutdown_with_broker_down"
         # TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor
+        if group_protocol == "classic":
+            extra_configs += ",internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor"
         configs = self.get_configs(
-            extra_configs=",application.id=shutdown_with_broker_down" +
-                          ",internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor"
+            group_protocol=group_protocol,
+            extra_configs=extra_configs
         )

         processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
@@ -229,14 +237,19 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
         self.kafka.stop()

     @cluster(num_nodes=9)
-    @matrix(metadata_quorum=[quorum.combined_kraft])
-    def test_streams_should_failover_while_brokers_down(self, metadata_quorum):
+    @matrix(metadata_quorum=[quorum.combined_kraft],
+            group_protocol=["classic", "streams"])
+    def test_streams_should_failover_while_brokers_down(self, metadata_quorum, group_protocol):
         self.kafka.start()

+        extra_configs = ",application.id=shutdown_with_broker_down"
         # TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor
+        if group_protocol == "classic":
+            extra_configs += ",internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor"
         configs = self.get_configs(
-            extra_configs=",application.id=failover_with_broker_down" +
-                          ",internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor"
+            group_protocol=group_protocol,
+            extra_configs=extra_configs
         )

         processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
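Taken together, the matrix expansion and the conditional assignor override mean each of these resilience tests now runs twice, once per protocol. A sketch (reconstructed from the hunks above, not part of the patch) of the two config strings the scale-in test ends up passing to the service:

    # classic case: pins the legacy sticky assignor explicitly
    classic_configs = BaseStreamsTest.get_configs(
        group_protocol="classic",
        extra_configs=",application.id=shutdown_with_broker_down"
                      ",internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor")

    # streams case: no client-side assignor override, matching the new
    # streams group protocol introduced by KIP-1071
    streams_configs = BaseStreamsTest.get_configs(
        group_protocol="streams",
        extra_configs=",application.id=shutdown_with_broker_down")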


@@ -51,6 +51,8 @@ class StreamsStandbyTask(BaseStreamsTest):
     def test_standby_tasks_rebalance(self, metadata_quorum):
         # TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor
         configs = self.get_configs(
+            group_protocol="classic",
+            extra_configs=
             ",sourceTopic=%s,sinkTopic1=%s,sinkTopic2=%s,internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor" % (
                 self.streams_source_topic,
                 self.streams_sink_topic_1,