rabbitmq-server/deps/rabbit/test/unit_quorum_queue_SUITE.erl

143 lines
5.7 KiB
Erlang

-module(unit_quorum_queue_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
all() ->
[
all_replica_states_includes_alive_nonvoters,
filter_nonvoters,
filter_quorum_critical_accounts_nonvoters,
ra_machine_conf_delivery_limit
].
ra_machine_conf_delivery_limit(_Config) ->
Q0 = amqqueue:new(rabbit_misc:r(<<"/">>, queue, <<"q2">>),
{q2, test@leader},
false, false, none, [], undefined, #{}),
%% ensure default is set
?assertMatch(#{delivery_limit := 20},
rabbit_quorum_queue:ra_machine_config(Q0)),
Q = amqqueue:set_policy(Q0, [{name, <<"p1">>},
{definition, [{<<"delivery-limit">>,-1}]}]),
%% a policy of -1
?assertMatch(#{delivery_limit := -1},
rabbit_quorum_queue:ra_machine_config(Q)),
%% if therre is a queue arg with a non neg value this takes precedence
Q1 = amqqueue:set_arguments(Q, [{<<"x-delivery-limit">>, long, 5}]),
?assertMatch(#{delivery_limit := 5},
rabbit_quorum_queue:ra_machine_config(Q1)),
Q2 = amqqueue:set_policy(Q1, [{name, <<"o1">>},
{definition, [{<<"delivery-limit">>, 5}]}]),
Q3 = amqqueue:set_arguments(Q2, [{<<"x-delivery-limit">>, long, -1}]),
?assertMatch(#{delivery_limit := 5},
rabbit_quorum_queue:ra_machine_config(Q3)),
%% non neg takes precedence
?assertMatch(#{delivery_limit := 5},
make_ra_machine_conf(Q0, -1, -1, 5)),
?assertMatch(#{delivery_limit := 5},
make_ra_machine_conf(Q0, -1, 5, -1)),
?assertMatch(#{delivery_limit := 5},
make_ra_machine_conf(Q0, 5, -1, -1)),
?assertMatch(#{delivery_limit := 5},
make_ra_machine_conf(Q0, -1, 10, 5)),
?assertMatch(#{delivery_limit := 5},
make_ra_machine_conf(Q0, -1, 5, 10)),
?assertMatch(#{delivery_limit := 5},
make_ra_machine_conf(Q0, 5, 15, 10)),
?assertMatch(#{delivery_limit := 5},
make_ra_machine_conf(Q0, 15, 5, 10)),
?assertMatch(#{delivery_limit := 5},
make_ra_machine_conf(Q0, 15, 10, 5)),
ok.
filter_quorum_critical_accounts_nonvoters(_Config) ->
Nodes = [test@leader, test@follower1, test@follower2],
Qs0 = [amqqueue:new(rabbit_misc:r(<<"/">>, queue, <<"q1">>),
{q1, test@leader},
false, false, none, [], undefined, #{}),
amqqueue:new(rabbit_misc:r(<<"/">>, queue, <<"q2">>),
{q2, test@leader},
false, false, none, [], undefined, #{})],
Qs = [Q1, Q2] = lists:map(fun (Q) ->
amqqueue:set_type_state(Q, #{nodes => Nodes})
end, Qs0),
Ss = #{test@leader => #{q1 => leader, q2 => leader},
test@follower1 => #{q1 => promotable, q2 => follower},
test@follower2 => #{q1 => follower, q2 => promotable}},
Qs = rabbit_quorum_queue:filter_quorum_critical(Qs, Ss, test@leader),
[Q2] = rabbit_quorum_queue:filter_quorum_critical(Qs, Ss, test@follower1),
[Q1] = rabbit_quorum_queue:filter_quorum_critical(Qs, Ss, test@follower2),
ok.
filter_nonvoters(_Config) ->
Qs = [_, _, _, Q4] =
[amqqueue:new(rabbit_misc:r(<<"/">>, queue, <<"q1">>),
{q1, test@leader},
false, false, none, [], undefined, #{}),
amqqueue:new(rabbit_misc:r(<<"/">>, queue, <<"q2">>),
{q2, test@leader},
false, false, none, [], undefined, #{}),
amqqueue:new(rabbit_misc:r(<<"/">>, queue, <<"q3">>),
{q3, test@leader},
false, false, none, [], undefined, #{}),
amqqueue:new(rabbit_misc:r(<<"/">>, queue, <<"q4">>),
{q4, test@leader},
false, false, none, [], undefined, #{})],
Ss = #{q1 => leader, q2 => follower, q3 => non_voter, q4 => promotable},
[Q4] = rabbit_quorum_queue:filter_promotable(Qs, Ss),
ok.
all_replica_states_includes_alive_nonvoters(_Config) ->
ets:new(ra_state, [named_table, public, {write_concurrency, true}]),
QPids = start_qprocs(_AliveQs = [q1, q2, q3, q4]),
ets:insert(ra_state, [
{q1, leader, voter},
{q2, follower, voter},
{q3, follower, promotable},
{q4, init, unknown},
%% queues in ra_state but not alive
{q5, leader, voter},
{q6, follower, noproc}
]),
{_, #{
q1 := leader,
q2 := follower,
q3 := promotable,
q4 := init
} = ReplicaStates} = rabbit_quorum_queue:all_replica_states(),
?assertNot(maps:is_key(q5, ReplicaStates)),
?assertNot(maps:is_key(q6, ReplicaStates)),
true = ets:delete(ra_state),
_ = stop_qprocs(QPids),
ok.
make_ra_machine_conf(Q0, Arg, Pol, OpPol) ->
Q1 = amqqueue:set_arguments(Q0, [{<<"x-delivery-limit">>, long, Arg}]),
Q2 = amqqueue:set_policy(Q1, [{name, <<"p1">>},
{definition, [{<<"delivery-limit">>,Pol}]}]),
Q = amqqueue:set_operator_policy(Q2, [{name, <<"p1">>},
{definition, [{<<"delivery-limit">>,OpPol}]}]),
rabbit_quorum_queue:ra_machine_config(Q).
start_qprocs(Qs) ->
[begin
QPid = spawn(fun() -> receive done -> ok end end),
erlang:register(Q, QPid),
QPid
end || Q <- Qs].
stop_qprocs(Pids) ->
[erlang:send(P, done)|| P <- Pids].