From 6d8a3ff7b0412713c0211a3150672d5790a80b5b Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 12 May 2021 01:56:12 +0300 Subject: [PATCH] More rabbit_amqqueue:list_* functions --- deps/rabbit/src/rabbit_amqqueue.erl | 45 ++++++++++++++++++++++++----- 1 file changed, 37 insertions(+), 8 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 196ce325b0..153c93cc0d 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -35,13 +35,15 @@ -export([update/2, store_queue/1, update_decorators/1, policy_changed/2]). -export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1]). -export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]). --export([has_synchronised_mirrors_online/1]). +-export([has_synchronised_mirrors_online/1, is_match/2, is_in_virtual_host/2]). -export([is_replicated/1, is_exclusive/1, is_not_exclusive/1, is_dead_exclusive/1]). --export([list_local_quorum_queues/0, list_local_quorum_queue_names/0, +-export([list_local_quorum_queues/0, list_local_quorum_queue_names/0, list_local_stream_queues/0, list_local_mirrored_classic_queues/0, list_local_mirrored_classic_names/0, list_local_leaders/0, list_local_followers/0, get_quorum_nodes/1, list_local_mirrored_classic_without_synchronised_mirrors/0, - list_local_mirrored_classic_without_synchronised_mirrors_for_cli/0]). + list_local_mirrored_classic_without_synchronised_mirrors_for_cli/0, + list_local_quorum_queues_with_name_matching/1, + list_local_quorum_queues_with_name_matching/2]). -export([ensure_rabbit_queue_record_is_initialized/1]). -export([format/1]). -export([delete_immediately_by_resource/1]). @@ -59,7 +61,7 @@ -export([is_server_named_allowed/1]). -export([check_max_age/1]). --export([get_queue_type/1]). +-export([get_queue_type/1, get_resource_vhost_name/1, get_resource_name/1]). %% internal -export([internal_declare/2, internal_delete/2, run_backing_queue/3, @@ -471,11 +473,14 @@ rebalance_module(Q) when ?amqqueue_is_quorum(Q) -> rebalance_module(Q) when ?amqqueue_is_classic(Q) -> rabbit_mirror_queue_misc. -get_resource_name(#resource{name = Name}) -> +get_resource_name(#resource{name = Name}) -> Name. -is_match(Subj, E) -> - nomatch /= re:run(Subj, E). +get_resource_vhost_name(#resource{virtual_host = VHostName}) -> + VHostName. + +is_match(Subj, RegEx) -> + nomatch /= re:run(Subj, RegEx). iterative_rebalance(ByNode, MaxQueuesDesired) -> case maybe_migrate(ByNode, MaxQueuesDesired) of @@ -1072,6 +1077,7 @@ sample_n(Queues, N) when is_list(Queues) andalso is_integer(N) andalso N > 0 -> list_by_type(classic) -> list_by_type(rabbit_classic_queue); list_by_type(quorum) -> list_by_type(rabbit_quorum_queue); +list_by_type(stream) -> list_by_type(rabbit_stream_queue); list_by_type(Type) -> {atomic, Qs} = mnesia:sync_transaction( @@ -1095,6 +1101,12 @@ list_local_quorum_queues() -> amqqueue:get_state(Q) =/= crashed, lists:member(node(), get_quorum_nodes(Q))]. +-spec list_local_stream_queues() -> [amqqueue:amqqueue()]. +list_local_stream_queues() -> + [ Q || Q <- list_by_type(stream), + amqqueue:get_state(Q) =/= crashed, + lists:member(node(), get_quorum_nodes(Q))]. + -spec list_local_leaders() -> [amqqueue:amqqueue()]. list_local_leaders() -> [ Q || Q <- list(), @@ -1153,6 +1165,21 @@ list_local_mirrored_classic_without_synchronised_mirrors_for_cli() -> } end || Q <- ClassicQs]. +-spec list_local_quorum_queues_with_name_matching(binary()) -> [amqqueue:amqqueue()]. +list_local_quorum_queues_with_name_matching(Pattern) -> + [ Q || Q <- list_by_type(quorum), + amqqueue:get_state(Q) =/= crashed, + lists:member(node(), get_quorum_nodes(Q)), + is_match(get_resource_name(amqqueue:get_name(Q)), Pattern)]. + +-spec list_local_quorum_queues_with_name_matching(vhost:name(), binary()) -> [amqqueue:amqqueue()]. +list_local_quorum_queues_with_name_matching(VHostName, Pattern) -> + [ Q || Q <- list_by_type(quorum), + amqqueue:get_state(Q) =/= crashed, + lists:member(node(), get_quorum_nodes(Q)), + is_in_virtual_host(Q, VHostName), + is_match(get_resource_name(amqqueue:get_name(Q)), Pattern)]. + is_local_to_node(QPid, Node) when ?IS_CLASSIC(QPid) -> Node =:= node(QPid); is_local_to_node({_, Leader} = QPid, Node) when ?IS_QUORUM(QPid) -> @@ -1160,8 +1187,10 @@ is_local_to_node({_, Leader} = QPid, Node) when ?IS_QUORUM(QPid) -> is_local_to_node(_QPid, _Node) -> false. --spec list(rabbit_types:vhost()) -> [amqqueue:amqqueue()]. +is_in_virtual_host(Q, VHostName) -> + VHostName =:= get_resource_vhost_name(amqqueue:get_name(Q)). +-spec list(vhost:name()) -> [amqqueue:amqqueue()]. list(VHostPath) -> list(VHostPath, rabbit_queue).