Add marker rabbit_queue_type behaviour
And use the implementing module as the value of the amqqueue record `type` field. This will allow for easy dispatch to the queue type implementation. Make amqqueue compatible with the classic queue tag
This commit is contained in:
parent
e59dcbe3f4
commit
344492576f
|
|
@ -51,16 +51,16 @@
|
|||
(?is_amqqueue_v1(Q) andalso
|
||||
?amqqueue_v1_field_state(Q) =:= State))).
|
||||
|
||||
-define(amqqueue_v1_type, classic).
|
||||
-define(amqqueue_v1_type, rabbit_classic_queue).
|
||||
|
||||
-define(amqqueue_is_classic(Q),
|
||||
((?is_amqqueue_v2(Q) andalso
|
||||
?amqqueue_v2_field_type(Q) =:= classic) orelse
|
||||
?amqqueue_v2_field_type(Q) =:= rabbit_classic_queue) orelse
|
||||
?is_amqqueue_v1(Q))).
|
||||
|
||||
-define(amqqueue_is_quorum(Q),
|
||||
(?is_amqqueue_v2(Q) andalso
|
||||
?amqqueue_v2_field_type(Q) =:= quorum) orelse
|
||||
?amqqueue_v2_field_type(Q) =:= rabbit_quorum_queue) orelse
|
||||
false).
|
||||
|
||||
-define(amqqueue_has_valid_pid(Q),
|
||||
|
|
|
|||
|
|
@ -19,4 +19,4 @@
|
|||
-define(amqqueue_v2_field_vhost(Q), element(18, Q)).
|
||||
-define(amqqueue_v2_field_options(Q), element(19, Q)).
|
||||
-define(amqqueue_v2_field_type(Q), element(20, Q)).
|
||||
-define(amqqueue_v2_field_quorum_nodes(Q), element(21, Q)).
|
||||
-define(amqqueue_v2_field_type_state(Q), element(21, Q)).
|
||||
|
|
|
|||
|
|
@ -57,9 +57,9 @@
|
|||
% policy_version
|
||||
get_policy_version/1,
|
||||
set_policy_version/2,
|
||||
% quorum_nodes
|
||||
get_quorum_nodes/1,
|
||||
set_quorum_nodes/2,
|
||||
% type_state
|
||||
get_type_state/1,
|
||||
set_type_state/2,
|
||||
% recoverable_slaves
|
||||
get_recoverable_slaves/1,
|
||||
set_recoverable_slaves/2,
|
||||
|
|
@ -91,6 +91,8 @@
|
|||
macros/0]).
|
||||
|
||||
-define(record_version, amqqueue_v2).
|
||||
-define(is_backwards_compat_classic(T),
|
||||
(T =:= classic orelse T =:= ?amqqueue_v1_type)).
|
||||
|
||||
-record(amqqueue, {
|
||||
name :: rabbit_amqqueue:name() | '_', %% immutable
|
||||
|
|
@ -118,8 +120,8 @@
|
|||
slave_pids_pending_shutdown = [] :: [pid()] | '_',
|
||||
vhost :: rabbit_types:vhost() | undefined | '_', %% secondary index
|
||||
options = #{} :: map() | '_',
|
||||
type = ?amqqueue_v1_type :: atom() | '_',
|
||||
quorum_nodes = [] :: [node()] | '_'
|
||||
type = ?amqqueue_v1_type :: module() | '_',
|
||||
type_state = #{} :: map() | '_'
|
||||
}).
|
||||
|
||||
-type amqqueue() :: amqqueue_v1:amqqueue_v1() | amqqueue_v2().
|
||||
|
|
@ -143,7 +145,7 @@
|
|||
vhost :: rabbit_types:vhost() | undefined,
|
||||
options :: map(),
|
||||
type :: atom(),
|
||||
quorum_nodes :: [node()]
|
||||
type_state :: #{}
|
||||
}.
|
||||
|
||||
-type ra_server_id() :: {Name :: atom(), Node :: node()}.
|
||||
|
|
@ -170,7 +172,7 @@
|
|||
vhost :: '_',
|
||||
options :: '_',
|
||||
type :: atom() | '_',
|
||||
quorum_nodes :: '_'
|
||||
type_state :: '_'
|
||||
}.
|
||||
|
||||
-export_type([amqqueue/0,
|
||||
|
|
@ -341,7 +343,7 @@ new_with_version(?record_version,
|
|||
pid = Pid,
|
||||
vhost = VHost,
|
||||
options = Options,
|
||||
type = Type};
|
||||
type = ensure_type_compat(Type)};
|
||||
new_with_version(Version,
|
||||
Name,
|
||||
Pid,
|
||||
|
|
@ -351,7 +353,8 @@ new_with_version(Version,
|
|||
Args,
|
||||
VHost,
|
||||
Options,
|
||||
?amqqueue_v1_type) ->
|
||||
Type)
|
||||
when ?is_backwards_compat_classic(Type) ->
|
||||
amqqueue_v1:new_with_version(
|
||||
Version,
|
||||
Name,
|
||||
|
|
@ -451,7 +454,7 @@ set_gm_pids(Queue, GMPids) ->
|
|||
|
||||
-spec get_leader(amqqueue_v2()) -> node().
|
||||
|
||||
get_leader(#amqqueue{type = quorum, pid = {_, Leader}}) -> Leader.
|
||||
get_leader(#amqqueue{type = rabbit_quorum_queue, pid = {_, Leader}}) -> Leader.
|
||||
|
||||
% operator_policy
|
||||
|
||||
|
|
@ -551,18 +554,16 @@ set_recoverable_slaves(#amqqueue{} = Queue, Slaves) ->
|
|||
set_recoverable_slaves(Queue, Slaves) ->
|
||||
amqqueue_v1:set_recoverable_slaves(Queue, Slaves).
|
||||
|
||||
% quorum_nodes (new in v2)
|
||||
% type_state (new in v2)
|
||||
|
||||
-spec get_quorum_nodes(amqqueue()) -> [node()].
|
||||
-spec get_type_state(amqqueue()) -> map().
|
||||
get_type_state(#amqqueue{type_state = TState}) -> TState;
|
||||
get_type_state(_) -> [].
|
||||
|
||||
get_quorum_nodes(#amqqueue{quorum_nodes = Nodes}) -> Nodes;
|
||||
get_quorum_nodes(_) -> [].
|
||||
|
||||
-spec set_quorum_nodes(amqqueue(), [node()]) -> amqqueue().
|
||||
|
||||
set_quorum_nodes(#amqqueue{} = Queue, Nodes) ->
|
||||
Queue#amqqueue{quorum_nodes = Nodes};
|
||||
set_quorum_nodes(Queue, _Nodes) ->
|
||||
-spec set_type_state(amqqueue(), map()) -> amqqueue().
|
||||
set_type_state(#amqqueue{} = Queue, TState) ->
|
||||
Queue#amqqueue{type_state = TState};
|
||||
set_type_state(Queue, _TState) ->
|
||||
Queue.
|
||||
|
||||
% slave_pids
|
||||
|
|
@ -660,7 +661,7 @@ is_classic(Queue) ->
|
|||
-spec is_quorum(amqqueue()) -> boolean().
|
||||
|
||||
is_quorum(Queue) ->
|
||||
get_type(Queue) =:= quorum.
|
||||
get_type(Queue) =:= rabbit_quorum_queue.
|
||||
|
||||
fields() ->
|
||||
case record_version_to_use() of
|
||||
|
|
@ -697,13 +698,16 @@ pattern_match_on_name(Name) ->
|
|||
|
||||
pattern_match_on_type(Type) ->
|
||||
case record_version_to_use() of
|
||||
?record_version -> #amqqueue{type = Type, _ = '_'};
|
||||
_ when Type =:= classic -> amqqueue_v1:pattern_match_all();
|
||||
?record_version ->
|
||||
#amqqueue{type = Type, _ = '_'};
|
||||
_ when ?is_backwards_compat_classic(Type) ->
|
||||
amqqueue_v1:pattern_match_all();
|
||||
%% FIXME: We try a pattern which should never match when the
|
||||
%% `quorum_queue` feature flag is not enabled yet. Is there
|
||||
%% a better solution?
|
||||
_ -> amqqueue_v1:pattern_match_on_name(
|
||||
rabbit_misc:r(<<0>>, queue, <<0>>))
|
||||
_ ->
|
||||
amqqueue_v1:pattern_match_on_name(
|
||||
rabbit_misc:r(<<0>>, queue, <<0>>))
|
||||
end.
|
||||
|
||||
-spec reset_mirroring_and_decorators(amqqueue()) -> amqqueue().
|
||||
|
|
@ -757,3 +761,8 @@ macros([Field | Rest], I) ->
|
|||
macros(Rest, I + 1);
|
||||
macros([], _) ->
|
||||
ok.
|
||||
|
||||
ensure_type_compat(classic) ->
|
||||
?amqqueue_v1_type;
|
||||
ensure_type_compat(Type) ->
|
||||
Type.
|
||||
|
|
|
|||
|
|
@ -57,9 +57,9 @@
|
|||
% policy_version
|
||||
get_policy_version/1,
|
||||
set_policy_version/2,
|
||||
% quorum_nodes
|
||||
get_quorum_nodes/1,
|
||||
set_quorum_nodes/2,
|
||||
% type_state
|
||||
get_type_state/1,
|
||||
set_type_state/2,
|
||||
% recoverable_slaves
|
||||
get_recoverable_slaves/1,
|
||||
set_recoverable_slaves/2,
|
||||
|
|
@ -93,6 +93,8 @@
|
|||
-dialyzer({nowarn_function, is_quorum/1}).
|
||||
|
||||
-define(record_version, ?MODULE).
|
||||
-define(is_backwards_compat_classic(T),
|
||||
(T =:= classic orelse T =:= ?amqqueue_v1_type)).
|
||||
|
||||
-record(amqqueue, {
|
||||
name :: rabbit_amqqueue:name() | '_', %% immutable
|
||||
|
|
@ -214,7 +216,7 @@ new(#resource{kind = queue} = Name,
|
|||
rabbit_framing:amqp_table(),
|
||||
rabbit_types:vhost() | undefined,
|
||||
map(),
|
||||
?amqqueue_v1_type) -> amqqueue().
|
||||
?amqqueue_v1_type | classic) -> amqqueue().
|
||||
|
||||
new(#resource{kind = queue} = Name,
|
||||
Pid,
|
||||
|
|
@ -224,14 +226,15 @@ new(#resource{kind = queue} = Name,
|
|||
Args,
|
||||
VHost,
|
||||
Options,
|
||||
?amqqueue_v1_type)
|
||||
Type)
|
||||
when (is_pid(Pid) orelse Pid =:= none) andalso
|
||||
is_boolean(Durable) andalso
|
||||
is_boolean(AutoDelete) andalso
|
||||
(is_pid(Owner) orelse Owner =:= none) andalso
|
||||
is_list(Args) andalso
|
||||
(is_binary(VHost) orelse VHost =:= undefined) andalso
|
||||
is_map(Options) ->
|
||||
is_map(Options) andalso
|
||||
?is_backwards_compat_classic(Type) ->
|
||||
new(
|
||||
Name,
|
||||
Pid,
|
||||
|
|
@ -297,14 +300,15 @@ new_with_version(?record_version,
|
|||
Args,
|
||||
VHost,
|
||||
Options,
|
||||
?amqqueue_v1_type)
|
||||
Type)
|
||||
when (is_pid(Pid) orelse Pid =:= none) andalso
|
||||
is_boolean(Durable) andalso
|
||||
is_boolean(AutoDelete) andalso
|
||||
(is_pid(Owner) orelse Owner =:= none) andalso
|
||||
is_list(Args) andalso
|
||||
(is_binary(VHost) orelse VHost =:= undefined) andalso
|
||||
is_map(Options) ->
|
||||
is_map(Options) andalso
|
||||
?is_backwards_compat_classic(Type) ->
|
||||
new_with_version(
|
||||
?record_version,
|
||||
Name,
|
||||
|
|
@ -451,16 +455,16 @@ get_recoverable_slaves(#amqqueue{recoverable_slaves = Slaves}) ->
|
|||
set_recoverable_slaves(#amqqueue{} = Queue, Slaves) ->
|
||||
Queue#amqqueue{recoverable_slaves = Slaves}.
|
||||
|
||||
% quorum_nodes (new in v2)
|
||||
% type_state (new in v2)
|
||||
|
||||
-spec get_quorum_nodes(amqqueue()) -> no_return().
|
||||
-spec get_type_state(amqqueue()) -> no_return().
|
||||
|
||||
get_quorum_nodes(_) -> throw({unsupported, ?record_version, get_quorum_nodes}).
|
||||
get_type_state(_) -> throw({unsupported, ?record_version, get_type_state}).
|
||||
|
||||
-spec set_quorum_nodes(amqqueue(), [node()]) -> no_return().
|
||||
-spec set_type_state(amqqueue(), [node()]) -> no_return().
|
||||
|
||||
set_quorum_nodes(_, _) ->
|
||||
throw({unsupported, ?record_version, set_quorum_nodes}).
|
||||
set_type_state(_, _) ->
|
||||
throw({unsupported, ?record_version, set_type_state}).
|
||||
|
||||
% slave_pids
|
||||
|
||||
|
|
@ -527,8 +531,8 @@ is_classic(Queue) ->
|
|||
|
||||
-spec is_quorum(amqqueue()) -> boolean().
|
||||
|
||||
is_quorum(Queue) ->
|
||||
get_type(Queue) =:= quorum.
|
||||
is_quorum(Queue) when ?is_amqqueue(Queue) ->
|
||||
false.
|
||||
|
||||
fields() -> fields(?record_version).
|
||||
|
||||
|
|
|
|||
|
|
@ -44,7 +44,8 @@
|
|||
-export([update_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1]).
|
||||
-export([emit_unresponsive/6, emit_unresponsive_local/5, is_unresponsive/2]).
|
||||
-export([is_replicated/1, is_dead_exclusive/1]). % Note: exported due to use in qlc expression.
|
||||
-export([list_local_followers/0]).
|
||||
-export([list_local_followers/0,
|
||||
get_quorum_nodes/1]).
|
||||
-export([ensure_rabbit_queue_record_is_initialized/1]).
|
||||
-export([format/1]).
|
||||
-export([delete_immediately_by_resource/1]).
|
||||
|
|
@ -192,7 +193,7 @@ find_local_quorum_queues(VHost) ->
|
|||
qlc:e(qlc:q([Q || Q <- mnesia:table(rabbit_durable_queue),
|
||||
amqqueue:get_vhost(Q) =:= VHost,
|
||||
amqqueue:is_quorum(Q) andalso
|
||||
(lists:member(Node, amqqueue:get_quorum_nodes(Q)))]))
|
||||
(lists:member(Node, get_quorum_nodes(Q)))]))
|
||||
end).
|
||||
|
||||
find_local_durable_classic_queues(VHost) ->
|
||||
|
|
@ -225,7 +226,7 @@ find_recoverable_queues() ->
|
|||
%% - if the record is present - in order to restart.
|
||||
(mnesia:read(rabbit_queue, amqqueue:get_name(Q), read) =:= []
|
||||
orelse not rabbit_mnesia:is_process_alive(amqqueue:get_pid(Q)))))
|
||||
orelse (amqqueue:is_quorum(Q) andalso lists:member(Node, amqqueue:get_quorum_nodes(Q)))
|
||||
orelse (amqqueue:is_quorum(Q) andalso lists:member(Node, get_quorum_nodes(Q)))
|
||||
]))
|
||||
end).
|
||||
|
||||
|
|
@ -274,7 +275,7 @@ declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args,
|
|||
ok = check_declare_arguments(QueueName, Args),
|
||||
Type = get_queue_type(Args),
|
||||
TypeIsAllowed =
|
||||
Type =:= classic orelse
|
||||
Type =:= rabbit_classic_queue orelse
|
||||
rabbit_feature_flags:is_enabled(quorum_queue),
|
||||
case TypeIsAllowed of
|
||||
true ->
|
||||
|
|
@ -325,9 +326,16 @@ declare_classic_queue(Q, Node) ->
|
|||
get_queue_type(Args) ->
|
||||
case rabbit_misc:table_lookup(Args, <<"x-queue-type">>) of
|
||||
undefined ->
|
||||
classic;
|
||||
rabbit_classic_queue;
|
||||
{_, V} ->
|
||||
erlang:binary_to_existing_atom(V, utf8)
|
||||
%% TODO: this mapping of "friendly" queue type name to the
|
||||
%% implementing module should be part of some kind of registry
|
||||
case V of
|
||||
<<"quorum">> ->
|
||||
rabbit_quorum_queue;
|
||||
<<"classic">> ->
|
||||
rabbit_classic_queue
|
||||
end
|
||||
end.
|
||||
|
||||
-spec internal_declare(amqqueue:amqqueue(), boolean()) ->
|
||||
|
|
@ -824,7 +832,7 @@ list_local_followers() ->
|
|||
[ amqqueue:get_name(Q)
|
||||
|| Q <- list(),
|
||||
amqqueue:is_quorum(Q),
|
||||
amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =/= node(), lists:member(node(), amqqueue:get_quorum_nodes(Q))].
|
||||
amqqueue:get_state(Q) =/= crashed, amqqueue:get_leader(Q) =/= node(), lists:member(node(), get_quorum_nodes(Q))].
|
||||
|
||||
is_local_to_node(QPid, Node) when ?IS_CLASSIC(QPid) ->
|
||||
Node =:= node(QPid);
|
||||
|
|
@ -1534,7 +1542,7 @@ forget_all_durable(Node) ->
|
|||
%% recovery.
|
||||
forget_node_for_queue(DeadNode, Q)
|
||||
when ?amqqueue_is_quorum(Q) ->
|
||||
QN = amqqueue:get_quorum_nodes(Q),
|
||||
QN = get_quorum_nodes(Q),
|
||||
forget_node_for_queue(DeadNode, QN, Q);
|
||||
forget_node_for_queue(DeadNode, Q) ->
|
||||
RS = amqqueue:get_recoverable_slaves(Q),
|
||||
|
|
@ -1555,9 +1563,11 @@ forget_node_for_queue(DeadNode, [H|T], Q) when ?is_amqqueue(Q) ->
|
|||
Type = amqqueue:get_type(Q),
|
||||
case {node_permits_offline_promotion(H), Type} of
|
||||
{false, _} -> forget_node_for_queue(DeadNode, T, Q);
|
||||
{true, classic} -> Q1 = amqqueue:set_pid(Q, rabbit_misc:node_to_fake_pid(H)),
|
||||
{true, rabbit_classic_queue} ->
|
||||
Q1 = amqqueue:set_pid(Q, rabbit_misc:node_to_fake_pid(H)),
|
||||
ok = mnesia:write(rabbit_durable_queue, Q1, write);
|
||||
{true, quorum} -> ok
|
||||
{true, rabbit_quorum_queue} ->
|
||||
ok
|
||||
end.
|
||||
|
||||
node_permits_offline_promotion(Node) ->
|
||||
|
|
@ -1755,7 +1765,7 @@ pseudo_queue(#resource{kind = queue} = QueueName, Pid, Durable)
|
|||
[],
|
||||
undefined, % VHost,
|
||||
#{user => undefined}, % ActingUser
|
||||
classic % Type
|
||||
rabbit_classic_queue % Type
|
||||
).
|
||||
|
||||
-spec immutable(amqqueue:amqqueue()) -> amqqueue:amqqueue().
|
||||
|
|
@ -1864,3 +1874,11 @@ get_quorum_state({Name, _} = Id, QName, Map) ->
|
|||
|
||||
get_quorum_state({Name, _}, Map) ->
|
||||
maps:get(Name, Map).
|
||||
|
||||
get_quorum_nodes(Q) when ?is_amqqueue(Q) ->
|
||||
case amqqueue:get_type_state(Q) of
|
||||
#{nodes := Nodes} ->
|
||||
Nodes;
|
||||
_ ->
|
||||
[]
|
||||
end.
|
||||
|
|
|
|||
|
|
@ -81,7 +81,7 @@ init_state({Name, _}, QName = #resource{}) ->
|
|||
%% know what to do if the queue has `disappeared`. Let it crash.
|
||||
{ok, Q} = rabbit_amqqueue:lookup(QName),
|
||||
Leader = amqqueue:get_pid(Q),
|
||||
Nodes = amqqueue:get_quorum_nodes(Q),
|
||||
Nodes = rabbit_amqqueue:get_quorum_nodes(Q),
|
||||
%% Ensure the leader is listed first
|
||||
Servers0 = [{Name, N} || N <- Nodes],
|
||||
Servers = [Leader | lists:delete(Leader, Servers0)],
|
||||
|
|
@ -117,7 +117,7 @@ declare(Q) when ?amqqueue_is_quorum(Q) ->
|
|||
Id = {RaName, node()},
|
||||
Nodes = select_quorum_nodes(QuorumSize, rabbit_mnesia:cluster_nodes(all)),
|
||||
NewQ0 = amqqueue:set_pid(Q, Id),
|
||||
NewQ1 = amqqueue:set_quorum_nodes(NewQ0, Nodes),
|
||||
NewQ1 = amqqueue:set_type_state(NewQ0, #{nodes => Nodes}),
|
||||
case rabbit_amqqueue:internal_declare(NewQ1, false) of
|
||||
{created, NewQ} ->
|
||||
TickTimeout = application:get_env(rabbit, quorum_tick_interval, ?TICK_TIMEOUT),
|
||||
|
|
@ -222,7 +222,7 @@ become_leader(QName, Name) ->
|
|||
end),
|
||||
case rabbit_amqqueue:lookup(QName) of
|
||||
{ok, Q0} when ?is_amqqueue(Q0) ->
|
||||
Nodes = amqqueue:get_quorum_nodes(Q0),
|
||||
Nodes = get_nodes(Q0),
|
||||
[rpc:call(Node, ?MODULE, rpc_delete_metrics,
|
||||
[QName], ?RPC_TIMEOUT)
|
||||
|| Node <- Nodes, Node =/= node()];
|
||||
|
|
@ -369,7 +369,7 @@ delete(Q,
|
|||
_IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
|
||||
{Name, _} = amqqueue:get_pid(Q),
|
||||
QName = amqqueue:get_name(Q),
|
||||
QNodes = amqqueue:get_quorum_nodes(Q),
|
||||
QNodes = get_nodes(Q),
|
||||
%% TODO Quorum queue needs to support consumer tracking for IfUnused
|
||||
Timeout = ?DELETE_TIMEOUT,
|
||||
{ok, ReadyMsgs, _} = stat(Q),
|
||||
|
|
@ -600,8 +600,8 @@ cleanup_data_dir() ->
|
|||
{Name, _} = amqqueue:get_pid(Q),
|
||||
Name
|
||||
end
|
||||
|| Q <- rabbit_amqqueue:list_by_type(quorum),
|
||||
lists:member(node(), amqqueue:get_quorum_nodes(Q))],
|
||||
|| Q <- rabbit_amqqueue:list_by_type(?MODULE),
|
||||
lists:member(node(), get_nodes(Q))],
|
||||
Registered = ra_directory:list_registered(),
|
||||
_ = [maybe_delete_data_dir(UId) || {Name, UId} <- Registered,
|
||||
not lists:member(Name, Names)],
|
||||
|
|
@ -644,7 +644,7 @@ status(Vhost, QueueName) ->
|
|||
{ok, Q} when ?amqqueue_is_classic(Q) ->
|
||||
{error, classic_queue_not_supported};
|
||||
{ok, Q} when ?amqqueue_is_quorum(Q) ->
|
||||
Nodes = amqqueue:get_quorum_nodes(Q),
|
||||
Nodes = get_nodes(Q),
|
||||
[begin
|
||||
case get_sys_status({RName, N}) of
|
||||
{ok, Sys} ->
|
||||
|
|
@ -696,7 +696,7 @@ add_member(VHost, Name, Node, Timeout) ->
|
|||
{ok, Q} when ?amqqueue_is_classic(Q) ->
|
||||
{error, classic_queue_not_supported};
|
||||
{ok, Q} when ?amqqueue_is_quorum(Q) ->
|
||||
QNodes = amqqueue:get_quorum_nodes(Q),
|
||||
QNodes = get_nodes(Q),
|
||||
case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) of
|
||||
false ->
|
||||
{error, node_not_running};
|
||||
|
|
@ -727,9 +727,10 @@ add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) ->
|
|||
case ra:add_member(Members, ServerId, Timeout) of
|
||||
{ok, _, Leader} ->
|
||||
Fun = fun(Q1) ->
|
||||
Q2 = amqqueue:set_quorum_nodes(
|
||||
Q1,
|
||||
[Node | amqqueue:get_quorum_nodes(Q1)]),
|
||||
Q2 = update_type_state(
|
||||
Q1, fun(#{nodes := Nodes} = Ts) ->
|
||||
Ts#{nodes => [Node | Nodes]}
|
||||
end),
|
||||
amqqueue:set_pid(Q2, Leader)
|
||||
end,
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
|
|
@ -753,7 +754,7 @@ delete_member(VHost, Name, Node) ->
|
|||
{ok, Q} when ?amqqueue_is_classic(Q) ->
|
||||
{error, classic_queue_not_supported};
|
||||
{ok, Q} when ?amqqueue_is_quorum(Q) ->
|
||||
QNodes = amqqueue:get_quorum_nodes(Q),
|
||||
QNodes = get_nodes(Q),
|
||||
case lists:member(Node, QNodes) of
|
||||
false ->
|
||||
%% idempotent by design
|
||||
|
|
@ -779,10 +780,11 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
|
|||
case ra:leave_and_delete_server(Members, ServerId) of
|
||||
ok ->
|
||||
Fun = fun(Q1) ->
|
||||
amqqueue:set_quorum_nodes(
|
||||
update_type_state(
|
||||
Q1,
|
||||
lists:delete(Node,
|
||||
amqqueue:get_quorum_nodes(Q1)))
|
||||
fun(#{nodes := Nodes} = Ts) ->
|
||||
Ts#{nodes => lists:delete(Node, Nodes)}
|
||||
end)
|
||||
end,
|
||||
rabbit_misc:execute_mnesia_transaction(
|
||||
fun() -> rabbit_amqqueue:update(QName, Fun) end),
|
||||
|
|
@ -802,7 +804,7 @@ shrink_all(Node) ->
|
|||
QName = amqqueue:get_name(Q),
|
||||
rabbit_log:info("~s: removing member (replica) on node ~w",
|
||||
[rabbit_misc:rs(QName), Node]),
|
||||
Size = length(amqqueue:get_quorum_nodes(Q)),
|
||||
Size = length(get_nodes(Q)),
|
||||
case delete_member(Q, Node) of
|
||||
ok ->
|
||||
{QName, {ok, Size-1}};
|
||||
|
|
@ -812,8 +814,8 @@ shrink_all(Node) ->
|
|||
{QName, {error, Size, Err}}
|
||||
end
|
||||
end || Q <- rabbit_amqqueue:list(),
|
||||
amqqueue:get_type(Q) == quorum,
|
||||
lists:member(Node, amqqueue:get_quorum_nodes(Q))].
|
||||
amqqueue:get_type(Q) == ?MODULE,
|
||||
lists:member(Node, get_nodes(Q))].
|
||||
|
||||
-spec grow(node(), binary(), binary(), all | even) ->
|
||||
[{rabbit_amqqueue:name(),
|
||||
|
|
@ -821,7 +823,7 @@ shrink_all(Node) ->
|
|||
grow(Node, VhostSpec, QueueSpec, Strategy) ->
|
||||
Running = rabbit_mnesia:cluster_nodes(running),
|
||||
[begin
|
||||
Size = length(amqqueue:get_quorum_nodes(Q)),
|
||||
Size = length(get_nodes(Q)),
|
||||
QName = amqqueue:get_name(Q),
|
||||
rabbit_log:info("~s: adding a new member (replica) on node ~w",
|
||||
[rabbit_misc:rs(QName), Node]),
|
||||
|
|
@ -836,12 +838,12 @@ grow(Node, VhostSpec, QueueSpec, Strategy) ->
|
|||
end
|
||||
end
|
||||
|| Q <- rabbit_amqqueue:list(),
|
||||
amqqueue:get_type(Q) == quorum,
|
||||
amqqueue:get_type(Q) == ?MODULE,
|
||||
%% don't add a member if there is already one on the node
|
||||
not lists:member(Node, amqqueue:get_quorum_nodes(Q)),
|
||||
not lists:member(Node, get_nodes(Q)),
|
||||
%% node needs to be running
|
||||
lists:member(Node, Running),
|
||||
matches_strategy(Strategy, amqqueue:get_quorum_nodes(Q)),
|
||||
matches_strategy(Strategy, get_nodes(Q)),
|
||||
is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
|
||||
is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ].
|
||||
|
||||
|
|
@ -989,13 +991,13 @@ i(garbage_collection, Q) when ?is_amqqueue(Q) ->
|
|||
[]
|
||||
end;
|
||||
i(members, Q) when ?is_amqqueue(Q) ->
|
||||
amqqueue:get_quorum_nodes(Q);
|
||||
get_nodes(Q);
|
||||
i(online, Q) -> online(Q);
|
||||
i(leader, Q) -> leader(Q);
|
||||
i(open_files, Q) when ?is_amqqueue(Q) ->
|
||||
{Name, _} = amqqueue:get_pid(Q),
|
||||
Nodes = amqqueue:get_quorum_nodes(Q),
|
||||
{Data, _} = rpc:multicall(Nodes, rabbit_quorum_queue, open_files, [Name]),
|
||||
Nodes = get_nodes(Q),
|
||||
{Data, _} = rpc:multicall(Nodes, ?MODULE, open_files, [Name]),
|
||||
lists:flatten(Data);
|
||||
i(single_active_consumer_pid, Q) when ?is_amqqueue(Q) ->
|
||||
QPid = amqqueue:get_pid(Q),
|
||||
|
|
@ -1047,12 +1049,12 @@ leader(Q) when ?is_amqqueue(Q) ->
|
|||
end.
|
||||
|
||||
online(Q) when ?is_amqqueue(Q) ->
|
||||
Nodes = amqqueue:get_quorum_nodes(Q),
|
||||
Nodes = get_nodes(Q),
|
||||
{Name, _} = amqqueue:get_pid(Q),
|
||||
[Node || Node <- Nodes, is_process_alive(Name, Node)].
|
||||
|
||||
format(Q) when ?is_amqqueue(Q) ->
|
||||
Nodes = amqqueue:get_quorum_nodes(Q),
|
||||
Nodes = get_nodes(Q),
|
||||
[{members, Nodes}, {online, online(Q)}, {leader, leader(Q)}].
|
||||
|
||||
is_process_alive(Name, Node) ->
|
||||
|
|
@ -1144,7 +1146,7 @@ select_quorum_nodes(Size, Rest, Selected) ->
|
|||
%% member with the current leader first
|
||||
members(Q) when ?amqqueue_is_quorum(Q) ->
|
||||
{RaName, LeaderNode} = amqqueue:get_pid(Q),
|
||||
Nodes = lists:delete(LeaderNode, amqqueue:get_quorum_nodes(Q)),
|
||||
Nodes = lists:delete(LeaderNode, get_nodes(Q)),
|
||||
[{RaName, N} || N <- [LeaderNode | Nodes]].
|
||||
|
||||
make_ra_conf(Q, ServerId, TickTimeout) ->
|
||||
|
|
@ -1163,3 +1165,10 @@ make_ra_conf(Q, ServerId, TickTimeout) ->
|
|||
tick_timeout => TickTimeout,
|
||||
machine => RaMachine}.
|
||||
|
||||
get_nodes(Q) when ?is_amqqueue(Q) ->
|
||||
#{nodes := Nodes} = amqqueue:get_type_state(Q),
|
||||
Nodes.
|
||||
|
||||
update_type_state(Q, Fun) when ?is_amqqueue(Q) ->
|
||||
Ts = amqqueue:get_type_state(Q),
|
||||
amqqueue:set_type_state(Q, Fun(Ts)).
|
||||
|
|
|
|||
|
|
@ -104,7 +104,7 @@ new_amqqueue_v2_is_amqqueue(_) ->
|
|||
[],
|
||||
VHost,
|
||||
#{},
|
||||
classic),
|
||||
rabbit_classic_queue),
|
||||
?assert(?is_amqqueue(Queue)),
|
||||
?assert(?is_amqqueue_v2(Queue)),
|
||||
?assert(not ?is_amqqueue_v1(Queue)),
|
||||
|
|
@ -253,7 +253,7 @@ amqqueue_v2_type_matching(_) ->
|
|||
[],
|
||||
VHost,
|
||||
#{},
|
||||
classic),
|
||||
rabbit_classic_queue),
|
||||
?assert(?amqqueue_is_classic(ClassicQueue)),
|
||||
?assert(amqqueue:is_classic(ClassicQueue)),
|
||||
?assert(not ?amqqueue_is_quorum(ClassicQueue)),
|
||||
|
|
@ -267,7 +267,7 @@ amqqueue_v2_type_matching(_) ->
|
|||
[],
|
||||
VHost,
|
||||
#{},
|
||||
quorum),
|
||||
rabbit_quorum_queue),
|
||||
?assert(not ?amqqueue_is_classic(QuorumQueue)),
|
||||
?assert(not amqqueue:is_classic(QuorumQueue)),
|
||||
?assert(?amqqueue_is_quorum(QuorumQueue)),
|
||||
|
|
|
|||
|
|
@ -264,15 +264,15 @@ declare_args(Config) ->
|
|||
declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
|
||||
{<<"x-max-length">>, long, 2000},
|
||||
{<<"x-max-length-bytes">>, long, 2000}]),
|
||||
assert_queue_type(Server, LQ, quorum),
|
||||
assert_queue_type(Server, LQ, rabbit_quorum_queue),
|
||||
|
||||
DQ = <<"classic-declare-args-q">>,
|
||||
declare(Ch, DQ, [{<<"x-queue-type">>, longstr, <<"classic">>}]),
|
||||
assert_queue_type(Server, DQ, classic),
|
||||
assert_queue_type(Server, DQ, rabbit_classic_queue),
|
||||
|
||||
DQ2 = <<"classic-q2">>,
|
||||
declare(Ch, DQ2),
|
||||
assert_queue_type(Server, DQ2, classic).
|
||||
assert_queue_type(Server, DQ2, rabbit_classic_queue).
|
||||
|
||||
declare_invalid_properties(Config) ->
|
||||
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
|
||||
|
|
|
|||
Loading…
Reference in New Issue