From f5b8891b0c2be05880233f76922d2e431adf7268 Mon Sep 17 00:00:00 2001
From: Lucas Brutschy
Date: Wed, 30 Apr 2025 09:32:34 +0200
Subject: [PATCH] KAFKA-19202: Enable KIP-1071 in
 streams_broker_down_resilience_test (#19594)

Enable KIP-1071 in the next system test.

Reviewers: Matthias J. Sax
---
 .../tests/streams/base_streams_test.py        |  6 ++-
 .../streams_broker_down_resilience_test.py    | 41 ++++++++++++-------
 .../streams/streams_standby_replica_test.py   |  2 +
 3 files changed, 33 insertions(+), 16 deletions(-)

diff --git a/tests/kafkatest/tests/streams/base_streams_test.py b/tests/kafkatest/tests/streams/base_streams_test.py
index 11294604ef6..00b4c378897 100644
--- a/tests/kafkatest/tests/streams/base_streams_test.py
+++ b/tests/kafkatest/tests/streams/base_streams_test.py
@@ -29,6 +29,7 @@ class BaseStreamsTest(Test):
     see tests/kafkatest/tests/kafka_test.py for more info
     """
     def __init__(self, test_context, topics, num_controllers=1, num_brokers=3):
+        super(BaseStreamsTest, self).__init__(test_context=test_context)
         self.num_controllers = num_controllers
         self.num_brokers = num_brokers
         self.topics = topics
@@ -95,15 +96,16 @@ class BaseStreamsTest(Test):
                               err_msg="At %s streams did not process messages in %s seconds " % (test_state, timeout_sec))
 
     @staticmethod
-    def get_configs(extra_configs=""):
+    def get_configs(group_protocol="classic", extra_configs=""):
         # Consumer max.poll.interval > min(max.block.ms, ((retries + 1) * request.timeout)
         consumer_poll_ms = "consumer.max.poll.interval.ms=50000"
         retries_config = "producer.retries=2"
         request_timeout = "producer.request.timeout.ms=15000"
         max_block_ms = "producer.max.block.ms=30000"
+        group_protocol = "group.protocol=" + group_protocol
 
         # java code expects configs in key=value,key=value format
-        updated_configs = consumer_poll_ms + "," + retries_config + "," + request_timeout + "," + max_block_ms + extra_configs
+        updated_configs = consumer_poll_ms + "," + retries_config + "," + request_timeout + "," + max_block_ms + "," + group_protocol + extra_configs
 
         return updated_configs
 
diff --git a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
index 8eb66195236..3b9d0b43bf7 100644
--- a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
@@ -45,15 +45,17 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
         pass
 
     @cluster(num_nodes=7)
-    @matrix(metadata_quorum=[quorum.combined_kraft])
-    def test_streams_resilient_to_broker_down(self, metadata_quorum):
+    @matrix(metadata_quorum=[quorum.combined_kraft],
+            group_protocol=["classic", "streams"])
+    def test_streams_resilient_to_broker_down(self, metadata_quorum, group_protocol):
         self.kafka.start()
 
         # Broker should be down over 2x of retries * timeout ms
         # So with (2 * 15000) = 30 seconds, we'll set downtime to 70 seconds
         broker_down_time_in_seconds = 70
 
-        processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, self.get_configs())
+        processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, self.get_configs(
+            group_protocol=group_protocol))
         processor.start()
 
         self.assert_produce_consume(self.inputTopic,
@@ -82,13 +84,14 @@
         self.kafka.stop()
 
     @cluster(num_nodes=7)
-    @matrix(metadata_quorum=[quorum.combined_kraft])
-    def test_streams_runs_with_broker_down_initially(self, metadata_quorum):
+    @matrix(metadata_quorum=[quorum.combined_kraft],
group_protocol=["classic", "streams"]) + def test_streams_runs_with_broker_down_initially(self, metadata_quorum, group_protocol): self.kafka.start() node = self.kafka.leader(self.inputTopic) self.kafka.stop_node(node) - configs = self.get_configs(extra_configs=",application.id=starting_wo_broker_id") + configs = self.get_configs(group_protocol=group_protocol, extra_configs=",application.id=starting_wo_broker_id") # start streams with broker down initially processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs) @@ -150,14 +153,19 @@ class StreamsBrokerDownResilience(BaseStreamsTest): self.kafka.stop() @cluster(num_nodes=9) - @matrix(metadata_quorum=[quorum.combined_kraft]) - def test_streams_should_scale_in_while_brokers_down(self, metadata_quorum): + @matrix(metadata_quorum=[quorum.combined_kraft], + group_protocol=["classic", "streams"]) + def test_streams_should_scale_in_while_brokers_down(self, metadata_quorum, group_protocol): self.kafka.start() + extra_configs = ",application.id=shutdown_with_broker_down" # TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor + if group_protocol == "classic": + extra_configs += ",internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor" + configs = self.get_configs( - extra_configs=",application.id=shutdown_with_broker_down" + - ",internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor" + group_protocol=group_protocol, + extra_configs=extra_configs ) processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs) @@ -229,14 +237,19 @@ class StreamsBrokerDownResilience(BaseStreamsTest): self.kafka.stop() @cluster(num_nodes=9) - @matrix(metadata_quorum=[quorum.combined_kraft]) - def test_streams_should_failover_while_brokers_down(self, metadata_quorum): + @matrix(metadata_quorum=[quorum.combined_kraft], + group_protocol=["classic", "streams"]) + def test_streams_should_failover_while_brokers_down(self, metadata_quorum, group_protocol): self.kafka.start() + extra_configs = ",application.id=shutdown_with_broker_down" # TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor + if group_protocol == "classic": + extra_configs += ",internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor" + configs = self.get_configs( - extra_configs=",application.id=failover_with_broker_down" + - ",internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor" + group_protocol=group_protocol, + extra_configs=extra_configs ) processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs) diff --git a/tests/kafkatest/tests/streams/streams_standby_replica_test.py b/tests/kafkatest/tests/streams/streams_standby_replica_test.py index 44cb12e5cd8..8270652645a 100644 --- a/tests/kafkatest/tests/streams/streams_standby_replica_test.py +++ b/tests/kafkatest/tests/streams/streams_standby_replica_test.py @@ -51,6 +51,8 @@ class StreamsStandbyTask(BaseStreamsTest): def test_standby_tasks_rebalance(self, metadata_quorum): # TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor configs = self.get_configs( + group_protocol="classic", + extra_configs= ",sourceTopic=%s,sinkTopic1=%s,sinkTopic2=%s,internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor" % ( self.streams_source_topic, 
                self.streams_sink_topic_1,
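
For reference, the configuration strings that the new matrix produces can be previewed outside ducktape. The sketch below is not part of the commit; it mirrors the updated get_configs() helper and the classic-only assignor gating from the scale-in and failover tests (the LEGACY_ASSIGNOR constant is introduced here only for readability). Since KIP-1071 moves task assignment to the broker, the client-side internal.task.assignor.class override is only meaningful under the classic protocol, which is what the gating encodes.

# Standalone Python sketch mirroring the patch; runnable without ducktape.
LEGACY_ASSIGNOR = ("org.apache.kafka.streams.processor.internals."
                   "assignment.LegacyStickyTaskAssignor")

def get_configs(group_protocol="classic", extra_configs=""):
    # The Java test services parse this as comma-separated key=value pairs;
    # callers supply extra_configs with its own leading comma.
    consumer_poll_ms = "consumer.max.poll.interval.ms=50000"
    retries_config = "producer.retries=2"
    request_timeout = "producer.request.timeout.ms=15000"
    max_block_ms = "producer.max.block.ms=30000"
    group_protocol = "group.protocol=" + group_protocol
    return (consumer_poll_ms + "," + retries_config + "," + request_timeout
            + "," + max_block_ms + "," + group_protocol + extra_configs)

for protocol in ["classic", "streams"]:
    # Mirrors the gating added to the resilience tests: the legacy sticky
    # task assignor only applies to the classic protocol.
    extra = ",application.id=shutdown_with_broker_down"
    if protocol == "classic":
        extra += ",internal.task.assignor.class=" + LEGACY_ASSIGNOR
    print(get_configs(group_protocol=protocol, extra_configs=extra))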