mirror of https://github.com/apache/kafka.git
KAFKA-19202: Enable KIP-1071 in streams_standby_replica_test.py (#19625)
New system test for KIP-1071. Standby replicas need to be enabled via `kafka-configs.sh`. Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
parent
e1da318722
commit
3f465fc1b6
|
@ -64,7 +64,6 @@ public class StreamsStandByReplicaTest {
|
|||
Exit.exit(1);
|
||||
}
|
||||
|
||||
streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-standby-tasks");
|
||||
streamsProperties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
|
||||
streamsProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
|
||||
streamsProperties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
|
|
|
@ -33,6 +33,7 @@ class BaseStreamsTest(Test):
|
|||
self.num_controllers = num_controllers
|
||||
self.num_brokers = num_brokers
|
||||
self.topics = topics
|
||||
self.use_streams_groups = True
|
||||
|
||||
self.kafka = KafkaService(
|
||||
test_context, self.num_brokers,
|
||||
|
@ -47,7 +48,8 @@ class BaseStreamsTest(Test):
|
|||
|
||||
def setUp(self):
|
||||
self.kafka.start()
|
||||
self.kafka.run_features_command("upgrade", "streams.version", 1)
|
||||
if self.use_streams_groups:
|
||||
self.kafka.run_features_command("upgrade", "streams.version", 1)
|
||||
|
||||
def get_consumer(self, client_id, topic, num_messages):
|
||||
return VerifiableConsumer(self.test_context,
|
||||
|
@ -96,6 +98,17 @@ class BaseStreamsTest(Test):
|
|||
timeout_sec=timeout_sec,
|
||||
err_msg="At %s streams did not process messages in %s seconds " % (test_state, timeout_sec))
|
||||
|
||||
def configure_standby_replicas(self, group_id, num_standby_replicas):
|
||||
force_use_zk_connection = not self.kafka.all_nodes_configs_command_uses_bootstrap_server()
|
||||
node = self.kafka.nodes[0]
|
||||
cmd = "%s --alter --add-config streams.num.standby.replicas=%d --entity-type groups --entity-name %s" % \
|
||||
(
|
||||
self.kafka.kafka_configs_cmd_with_optional_security_settings(node, force_use_zk_connection),
|
||||
num_standby_replicas,
|
||||
group_id
|
||||
)
|
||||
node.account.ssh(cmd)
|
||||
|
||||
@staticmethod
|
||||
def get_configs(group_protocol="classic", extra_configs=""):
|
||||
# Consumer max.poll.interval > min(max.block.ms, ((retries + 1) * request.timeout)
|
||||
|
|
|
@ -47,19 +47,23 @@ class StreamsStandbyTask(BaseStreamsTest):
|
|||
})
|
||||
|
||||
@cluster(num_nodes=10)
|
||||
@matrix(metadata_quorum=[quorum.isolated_kraft])
|
||||
def test_standby_tasks_rebalance(self, metadata_quorum):
|
||||
@matrix(metadata_quorum=[quorum.combined_kraft],
|
||||
group_protocol=["classic", "streams"])
|
||||
def test_standby_tasks_rebalance(self, metadata_quorum, group_protocol):
|
||||
# TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor
|
||||
configs = self.get_configs(
|
||||
group_protocol="classic",
|
||||
group_protocol=group_protocol,
|
||||
extra_configs=
|
||||
",sourceTopic=%s,sinkTopic1=%s,sinkTopic2=%s,internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor" % (
|
||||
",application.id=test_standby_tasks_rebalance,sourceTopic=%s,sinkTopic1=%s,sinkTopic2=%s,internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor" % (
|
||||
self.streams_source_topic,
|
||||
self.streams_sink_topic_1,
|
||||
self.streams_sink_topic_2
|
||||
)
|
||||
)
|
||||
|
||||
if group_protocol == "streams":
|
||||
self.configure_standby_replicas("test_standby_tasks_rebalance", 1)
|
||||
|
||||
producer = self.get_producer(self.streams_source_topic, self.num_messages, throughput=15000, repeating_keys=6)
|
||||
producer.start()
|
||||
|
||||
|
|
Loading…
Reference in New Issue