From 3f465fc1b6250ea337237487eca90ac23b3485da Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Wed, 7 May 2025 09:43:11 +0200 Subject: [PATCH] KAFKA-19202: Enable KIP-1071 in streams_standby_replica_test.py (#19625) New system test for KIP-1071. Standby replicas need to be enabled via `kafka-configs.sh`. Reviewers: Bill Bejeck , Matthias J. Sax --- .../streams/tests/StreamsStandByReplicaTest.java | 1 - .../kafkatest/tests/streams/base_streams_test.py | 15 ++++++++++++++- .../tests/streams/streams_standby_replica_test.py | 12 ++++++++---- 3 files changed, 22 insertions(+), 6 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java index d23915790b2..27771b5be16 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java @@ -64,7 +64,6 @@ public class StreamsStandByReplicaTest { Exit.exit(1); } - streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-standby-tasks"); streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); streamsProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); streamsProperties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); diff --git a/tests/kafkatest/tests/streams/base_streams_test.py b/tests/kafkatest/tests/streams/base_streams_test.py index 96ac192e606..81cad7a4d1b 100644 --- a/tests/kafkatest/tests/streams/base_streams_test.py +++ b/tests/kafkatest/tests/streams/base_streams_test.py @@ -33,6 +33,7 @@ class BaseStreamsTest(Test): self.num_controllers = num_controllers self.num_brokers = num_brokers self.topics = topics + self.use_streams_groups = True self.kafka = KafkaService( test_context, self.num_brokers, @@ -47,7 +48,8 @@ class BaseStreamsTest(Test): def setUp(self): self.kafka.start() - self.kafka.run_features_command("upgrade", "streams.version", 1) + if self.use_streams_groups: + self.kafka.run_features_command("upgrade", "streams.version", 1) def get_consumer(self, client_id, topic, num_messages): return VerifiableConsumer(self.test_context, @@ -96,6 +98,17 @@ class BaseStreamsTest(Test): timeout_sec=timeout_sec, err_msg="At %s streams did not process messages in %s seconds " % (test_state, timeout_sec)) + def configure_standby_replicas(self, group_id, num_standby_replicas): + force_use_zk_connection = not self.kafka.all_nodes_configs_command_uses_bootstrap_server() + node = self.kafka.nodes[0] + cmd = "%s --alter --add-config streams.num.standby.replicas=%d --entity-type groups --entity-name %s" % \ + ( + self.kafka.kafka_configs_cmd_with_optional_security_settings(node, force_use_zk_connection), + num_standby_replicas, + group_id + ) + node.account.ssh(cmd) + @staticmethod def get_configs(group_protocol="classic", extra_configs=""): # Consumer max.poll.interval > min(max.block.ms, ((retries + 1) * request.timeout) diff --git a/tests/kafkatest/tests/streams/streams_standby_replica_test.py b/tests/kafkatest/tests/streams/streams_standby_replica_test.py index 8270652645a..77e3c65aab9 100644 --- a/tests/kafkatest/tests/streams/streams_standby_replica_test.py +++ b/tests/kafkatest/tests/streams/streams_standby_replica_test.py @@ -47,19 +47,23 @@ class StreamsStandbyTask(BaseStreamsTest): }) @cluster(num_nodes=10) - @matrix(metadata_quorum=[quorum.isolated_kraft]) - def test_standby_tasks_rebalance(self, metadata_quorum): + @matrix(metadata_quorum=[quorum.combined_kraft], + group_protocol=["classic", "streams"]) + def test_standby_tasks_rebalance(self, metadata_quorum, group_protocol): # TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor configs = self.get_configs( - group_protocol="classic", + group_protocol=group_protocol, extra_configs= - ",sourceTopic=%s,sinkTopic1=%s,sinkTopic2=%s,internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor" % ( + ",application.id=test_standby_tasks_rebalance,sourceTopic=%s,sinkTopic1=%s,sinkTopic2=%s,internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor" % ( self.streams_source_topic, self.streams_sink_topic_1, self.streams_sink_topic_2 ) ) + if group_protocol == "streams": + self.configure_standby_replicas("test_standby_tasks_rebalance", 1) + producer = self.get_producer(self.streams_source_topic, self.num_messages, throughput=15000, repeating_keys=6) producer.start()