More rabbit_amqqueue:list_* functions

This commit is contained in:
Michael Klishin 2021-05-12 01:56:12 +03:00
parent 188216aad3
commit 6d8a3ff7b0
No known key found for this signature in database
GPG Key ID: E80EDCFA0CDB21EE
1 changed files with 37 additions and 8 deletions

View File

@ -35,13 +35,15 @@
-export([update/2, store_queue/1, update_decorators/1, policy_changed/2]).
-export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1]).
-export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]).
-export([has_synchronised_mirrors_online/1]).
-export([has_synchronised_mirrors_online/1, is_match/2, is_in_virtual_host/2]).
-export([is_replicated/1, is_exclusive/1, is_not_exclusive/1, is_dead_exclusive/1]).
-export([list_local_quorum_queues/0, list_local_quorum_queue_names/0,
-export([list_local_quorum_queues/0, list_local_quorum_queue_names/0, list_local_stream_queues/0,
list_local_mirrored_classic_queues/0, list_local_mirrored_classic_names/0,
list_local_leaders/0, list_local_followers/0, get_quorum_nodes/1,
list_local_mirrored_classic_without_synchronised_mirrors/0,
list_local_mirrored_classic_without_synchronised_mirrors_for_cli/0]).
list_local_mirrored_classic_without_synchronised_mirrors_for_cli/0,
list_local_quorum_queues_with_name_matching/1,
list_local_quorum_queues_with_name_matching/2]).
-export([ensure_rabbit_queue_record_is_initialized/1]).
-export([format/1]).
-export([delete_immediately_by_resource/1]).
@ -59,7 +61,7 @@
-export([is_server_named_allowed/1]).
-export([check_max_age/1]).
-export([get_queue_type/1]).
-export([get_queue_type/1, get_resource_vhost_name/1, get_resource_name/1]).
%% internal
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
@ -471,11 +473,14 @@ rebalance_module(Q) when ?amqqueue_is_quorum(Q) ->
rebalance_module(Q) when ?amqqueue_is_classic(Q) ->
rabbit_mirror_queue_misc.
get_resource_name(#resource{name = Name}) ->
get_resource_name(#resource{name = Name}) ->
Name.
is_match(Subj, E) ->
nomatch /= re:run(Subj, E).
get_resource_vhost_name(#resource{virtual_host = VHostName}) ->
VHostName.
is_match(Subj, RegEx) ->
nomatch /= re:run(Subj, RegEx).
iterative_rebalance(ByNode, MaxQueuesDesired) ->
case maybe_migrate(ByNode, MaxQueuesDesired) of
@ -1072,6 +1077,7 @@ sample_n(Queues, N) when is_list(Queues) andalso is_integer(N) andalso N > 0 ->
list_by_type(classic) -> list_by_type(rabbit_classic_queue);
list_by_type(quorum) -> list_by_type(rabbit_quorum_queue);
list_by_type(stream) -> list_by_type(rabbit_stream_queue);
list_by_type(Type) ->
{atomic, Qs} =
mnesia:sync_transaction(
@ -1095,6 +1101,12 @@ list_local_quorum_queues() ->
amqqueue:get_state(Q) =/= crashed,
lists:member(node(), get_quorum_nodes(Q))].
-spec list_local_stream_queues() -> [amqqueue:amqqueue()].
list_local_stream_queues() ->
[ Q || Q <- list_by_type(stream),
amqqueue:get_state(Q) =/= crashed,
lists:member(node(), get_quorum_nodes(Q))].
-spec list_local_leaders() -> [amqqueue:amqqueue()].
list_local_leaders() ->
[ Q || Q <- list(),
@ -1153,6 +1165,21 @@ list_local_mirrored_classic_without_synchronised_mirrors_for_cli() ->
}
end || Q <- ClassicQs].
-spec list_local_quorum_queues_with_name_matching(binary()) -> [amqqueue:amqqueue()].
list_local_quorum_queues_with_name_matching(Pattern) ->
[ Q || Q <- list_by_type(quorum),
amqqueue:get_state(Q) =/= crashed,
lists:member(node(), get_quorum_nodes(Q)),
is_match(get_resource_name(amqqueue:get_name(Q)), Pattern)].
-spec list_local_quorum_queues_with_name_matching(vhost:name(), binary()) -> [amqqueue:amqqueue()].
list_local_quorum_queues_with_name_matching(VHostName, Pattern) ->
[ Q || Q <- list_by_type(quorum),
amqqueue:get_state(Q) =/= crashed,
lists:member(node(), get_quorum_nodes(Q)),
is_in_virtual_host(Q, VHostName),
is_match(get_resource_name(amqqueue:get_name(Q)), Pattern)].
is_local_to_node(QPid, Node) when ?IS_CLASSIC(QPid) ->
Node =:= node(QPid);
is_local_to_node({_, Leader} = QPid, Node) when ?IS_QUORUM(QPid) ->
@ -1160,8 +1187,10 @@ is_local_to_node({_, Leader} = QPid, Node) when ?IS_QUORUM(QPid) ->
is_local_to_node(_QPid, _Node) ->
false.
-spec list(rabbit_types:vhost()) -> [amqqueue:amqqueue()].
is_in_virtual_host(Q, VHostName) ->
VHostName =:= get_resource_vhost_name(amqqueue:get_name(Q)).
-spec list(vhost:name()) -> [amqqueue:amqqueue()].
list(VHostPath) ->
list(VHostPath, rabbit_queue).