Add federation support for quorum queues

This commit is contained in:
dcorbacho 2021-02-10 15:06:19 +01:00
parent a71771b8c1
commit 699cd1ab29
6 changed files with 224 additions and 78 deletions

View File

@ -1637,6 +1637,8 @@ basic_cancel(Q, ConsumerTag, OkMsg, ActingUser, QStates) ->
notify_decorators(Q) when ?amqqueue_is_classic(Q) -> notify_decorators(Q) when ?amqqueue_is_classic(Q) ->
QPid = amqqueue:get_pid(Q), QPid = amqqueue:get_pid(Q),
delegate:invoke_no_result(QPid, {gen_server2, cast, [notify_decorators]}); delegate:invoke_no_result(QPid, {gen_server2, cast, [notify_decorators]});
notify_decorators(Q) when ?amqqueue_is_quorum(Q) ->
rabbit_quorum_queue:notify_decorators(Q);
notify_decorators(_Q) -> notify_decorators(_Q) ->
%% Not supported by any other queue type %% Not supported by any other queue type
ok. ok.

View File

@ -41,6 +41,7 @@
query_single_active_consumer/1, query_single_active_consumer/1,
query_in_memory_usage/1, query_in_memory_usage/1,
query_peek/2, query_peek/2,
query_notify_decorators_info/1,
usage/1, usage/1,
zero/1, zero/1,
@ -241,7 +242,7 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
{State1, ok, Effects} = {State1, ok, Effects} =
checkout(Meta, State0, checkout(Meta, State0,
State0#?MODULE{service_queue = ServiceQueue, State0#?MODULE{service_queue = ServiceQueue,
consumers = Cons}, []), consumers = Cons}, [], false),
Response = {send_credit_reply, messages_ready(State1)}, Response = {send_credit_reply, messages_ready(State1)},
%% by this point all checkouts for the updated credit value %% by this point all checkouts for the updated credit value
%% should be processed so we can evaluate the drain %% should be processed so we can evaluate the drain
@ -299,7 +300,8 @@ apply(#{index := Index,
Exists = maps:is_key(ConsumerId, Consumers), Exists = maps:is_key(ConsumerId, Consumers),
case messages_ready(State0) of case messages_ready(State0) of
0 -> 0 ->
update_smallest_raft_index(Index, {dequeue, empty}, State0, []); update_smallest_raft_index(Index, {dequeue, empty}, State0,
[notify_decorators_effect(State0)]);
_ when Exists -> _ when Exists ->
%% a dequeue using the same consumer_id isn't possible at this point %% a dequeue using the same consumer_id isn't possible at this point
{State0, {dequeue, empty}}; {State0, {dequeue, empty}};
@ -330,8 +332,8 @@ apply(#{index := Index,
{{dequeue, {MsgId, Msg}, Ready-1}, Effects1} {{dequeue, {MsgId, Msg}, Ready-1}, Effects1}
end, end,
NotifyEffect = notify_decorators_effect(State4),
case evaluate_limit(Index, false, State0, State4, Effects2) of case evaluate_limit(Index, false, State0, State4, [NotifyEffect | Effects2]) of
{State, true, Effects} -> {State, true, Effects} ->
update_smallest_raft_index(Index, Reply, State, Effects); update_smallest_raft_index(Index, Reply, State, Effects);
{State, false, Effects} -> {State, false, Effects} ->
@ -456,6 +458,7 @@ apply(#{system_time := Ts} = Meta, {down, Pid, noconnection},
% Monitor the node so that we can "unsuspect" these processes when the node % Monitor the node so that we can "unsuspect" these processes when the node
% comes back, then re-issue all monitors and discover the final fate of % comes back, then re-issue all monitors and discover the final fate of
% these processes % these processes
Effects = case maps:size(State#?MODULE.consumers) of Effects = case maps:size(State#?MODULE.consumers) of
0 -> 0 ->
[{aux, inactive}, {monitor, node, Node}]; [{aux, inactive}, {monitor, node, Node}];
@ -959,6 +962,21 @@ query_peek(Pos, State0) when Pos > 0 ->
query_peek(Pos-1, State) query_peek(Pos-1, State)
end. end.
query_notify_decorators_info(#?MODULE{consumers = Consumers} = State) ->
MaxActivePriority = maps:fold(fun(_, #consumer{credit = C,
status = up,
priority = P0}, MaxP) when C > 0 ->
P = -P0,
case MaxP of
empty -> P;
MaxP when MaxP > P -> MaxP;
_ -> P
end;
(_, _, MaxP) ->
MaxP
end, empty, Consumers),
IsEmpty = (messages_ready(State) == 0),
{MaxActivePriority, IsEmpty}.
-spec usage(atom()) -> float(). -spec usage(atom()) -> float().
usage(Name) when is_atom(Name) -> usage(Name) when is_atom(Name) ->
@ -1062,11 +1080,13 @@ cancel_consumer0(Meta, ConsumerId,
#{ConsumerId := Consumer} -> #{ConsumerId := Consumer} ->
{S, Effects2} = maybe_return_all(Meta, ConsumerId, Consumer, {S, Effects2} = maybe_return_all(Meta, ConsumerId, Consumer,
S0, Effects0, Reason), S0, Effects0, Reason),
%% The effects are emitted before the consumer is actually removed %% The effects are emitted before the consumer is actually removed
%% if the consumer has unacked messages. This is a bit weird but %% if the consumer has unacked messages. This is a bit weird but
%% in line with what classic queues do (from an external point of %% in line with what classic queues do (from an external point of
%% view) %% view)
Effects = cancel_consumer_effects(ConsumerId, S, Effects2), Effects = cancel_consumer_effects(ConsumerId, S, Effects2),
case maps:size(S#?MODULE.consumers) of case maps:size(S#?MODULE.consumers) of
0 -> 0 ->
{S, [{aux, inactive} | Effects]}; {S, [{aux, inactive} | Effects]};
@ -1129,7 +1149,7 @@ apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of
{ok, State1, Effects1} -> {ok, State1, Effects1} ->
State2 = append_to_master_index(RaftIdx, State1), State2 = append_to_master_index(RaftIdx, State1),
{State, ok, Effects} = checkout(Meta, State0, State2, Effects1), {State, ok, Effects} = checkout(Meta, State0, State2, Effects1, false),
{maybe_store_dehydrated_state(RaftIdx, State), ok, Effects}; {maybe_store_dehydrated_state(RaftIdx, State), ok, Effects};
{duplicate, State, Effects} -> {duplicate, State, Effects} ->
{State, ok, Effects} {State, ok, Effects}
@ -1287,7 +1307,7 @@ return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned,
_ -> _ ->
State1 State1
end, end,
{State, ok, Effects} = checkout(Meta, State0, State2, Effects1), {State, ok, Effects} = checkout(Meta, State0, State2, Effects1, false),
update_smallest_raft_index(IncomingRaftIdx, State, Effects). update_smallest_raft_index(IncomingRaftIdx, State, Effects).
% used to processes messages that are finished % used to processes messages that are finished
@ -1331,7 +1351,7 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId,
Discarded = maps:with(MsgIds, Checked0), Discarded = maps:with(MsgIds, Checked0),
{State2, Effects1} = complete(Meta, ConsumerId, Discarded, Con0, {State2, Effects1} = complete(Meta, ConsumerId, Discarded, Con0,
Effects0, State0), Effects0, State0),
{State, ok, Effects} = checkout(Meta, State0, State2, Effects1), {State, ok, Effects} = checkout(Meta, State0, State2, Effects1, false),
update_smallest_raft_index(IncomingRaftIdx, State, Effects). update_smallest_raft_index(IncomingRaftIdx, State, Effects).
dead_letter_effects(_Reason, _Discarded, dead_letter_effects(_Reason, _Discarded,
@ -1363,9 +1383,10 @@ dead_letter_effects(Reason, Discarded,
end} | Effects]. end} | Effects].
cancel_consumer_effects(ConsumerId, cancel_consumer_effects(ConsumerId,
#?MODULE{cfg = #cfg{resource = QName}}, Effects) -> #?MODULE{cfg = #cfg{resource = QName}} = State, Effects) ->
[{mod_call, rabbit_quorum_queue, [{mod_call, rabbit_quorum_queue,
cancel_consumer_handler, [QName, ConsumerId]} | Effects]. cancel_consumer_handler, [QName, ConsumerId]},
notify_decorators_effect(State) | Effects].
update_smallest_raft_index(Idx, State, Effects) -> update_smallest_raft_index(Idx, State, Effects) ->
update_smallest_raft_index(Idx, ok, State, Effects). update_smallest_raft_index(Idx, ok, State, Effects).
@ -1500,14 +1521,30 @@ return_all(Meta, #?MODULE{consumers = Cons} = State0, Effects0, ConsumerId,
end, {State, Effects0}, Checked). end, {State, Effects0}, Checked).
%% checkout new messages to consumers %% checkout new messages to consumers
checkout(#{index := Index} = Meta, OldState, State0, Effects0) -> checkout(Meta, OldState, State, Effects) ->
checkout(Meta, OldState, State, Effects, true).
checkout(#{index := Index} = Meta, #?MODULE{cfg = #cfg{resource = QName}} = OldState, State0,
Effects0, HandleConsumerChanges) ->
{State1, _Result, Effects1} = checkout0(Meta, checkout_one(Meta, State0), {State1, _Result, Effects1} = checkout0(Meta, checkout_one(Meta, State0),
Effects0, {#{}, #{}}), Effects0, {#{}, #{}}),
case evaluate_limit(Index, false, OldState, State1, Effects1) of case evaluate_limit(Index, false, OldState, State1, Effects1) of
{State, true, Effects} -> {State, true, Effects} ->
update_smallest_raft_index(Index, State, Effects); case have_active_consumers_changed(State, HandleConsumerChanges) of
{true, {MaxActivePriority, IsEmpty}} ->
NotifyEffect = notify_decorators_effect(QName, MaxActivePriority, IsEmpty),
update_smallest_raft_index(Index, State, [NotifyEffect | Effects]);
false ->
update_smallest_raft_index(Index, State, Effects)
end;
{State, false, Effects} -> {State, false, Effects} ->
{State, ok, Effects} case have_active_consumers_changed(State, HandleConsumerChanges) of
{true, {MaxActivePriority, IsEmpty}} ->
NotifyEffect = notify_decorators_effect(QName, MaxActivePriority, IsEmpty),
{State, ok, [NotifyEffect | Effects]};
false ->
{State, ok, Effects}
end
end. end.
checkout0(Meta, {success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State}, checkout0(Meta, {success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State},
@ -2126,3 +2163,16 @@ get_priority_from_args(#{args := Args}) ->
end; end;
get_priority_from_args(_) -> get_priority_from_args(_) ->
0. 0.
have_active_consumers_changed(_, false) ->
false;
have_active_consumers_changed(State, _) ->
{true, query_notify_decorators_info(State)}.
notify_decorators_effect(#?MODULE{cfg = #cfg{resource = QName}} = State) ->
{MaxActivePriority, IsEmpty} = query_notify_decorators_info(State),
notify_decorators_effect(QName, MaxActivePriority, IsEmpty).
notify_decorators_effect(QName, MaxActivePriority, IsEmpty) ->
{mod_call, rabbit_quorum_queue, spawn_notify_decorators,
[QName, consumer_state_changed, [MaxActivePriority, IsEmpty]]}.

View File

@ -48,6 +48,9 @@
repair_amqqueue_nodes/2 repair_amqqueue_nodes/2
]). ]).
-export([reclaim_memory/2]). -export([reclaim_memory/2]).
-export([notify_decorators/1,
notify_decorators/3,
spawn_notify_decorators/3]).
-export([is_enabled/0, -export([is_enabled/0,
declare/2]). declare/2]).
@ -172,6 +175,7 @@ start_cluster(Q) ->
ra_machine_config(NewQ)), ra_machine_config(NewQ)),
%% force a policy change to ensure the latest config is %% force a policy change to ensure the latest config is
%% updated even when running the machine version from 0 %% updated even when running the machine version from 0
notify_decorators(QName, startup),
rabbit_event:notify(queue_created, rabbit_event:notify(queue_created,
[{name, QName}, [{name, QName},
{durable, Durable}, {durable, Durable},
@ -369,6 +373,11 @@ spawn_deleter(QName) ->
delete(Q, false, false, <<"expired">>) delete(Q, false, false, <<"expired">>)
end). end).
spawn_notify_decorators(QName, Fun, Args) ->
spawn(fun () ->
notify_decorators(QName, Fun, Args)
end).
handle_tick(QName, handle_tick(QName,
{Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack},
Nodes) -> Nodes) ->
@ -568,6 +577,7 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
after Timeout -> after Timeout ->
ok = force_delete_queue(Servers) ok = force_delete_queue(Servers)
end, end,
notify_decorators(QName, shutdown),
ok = delete_queue_data(QName, ActingUser), ok = delete_queue_data(QName, ActingUser),
rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName], rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
?RPC_TIMEOUT), ?RPC_TIMEOUT),
@ -589,6 +599,7 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
" Attempting force delete.", " Attempting force delete.",
[rabbit_misc:rs(QName), Errs]), [rabbit_misc:rs(QName), Errs]),
ok = force_delete_queue(Servers), ok = force_delete_queue(Servers),
notify_decorators(QName, shutdown),
delete_queue_data(QName, ActingUser), delete_queue_data(QName, ActingUser),
{ok, ReadyMsgs} {ok, ReadyMsgs}
end end
@ -1525,3 +1536,25 @@ parse_credit_args(Default, Args) ->
undefined -> undefined ->
{simple_prefetch, Default, false} {simple_prefetch, Default, false}
end. end.
-spec notify_decorators(amqqueue:amqqueue()) -> 'ok'.
notify_decorators(Q) when ?is_amqqueue(Q) ->
QName = amqqueue:get_name(Q),
QPid = amqqueue:get_pid(Q),
case ra:local_query(QPid, fun rabbit_fifo:query_notify_decorators_info/1) of
{ok, {_, {MaxActivePriority, IsEmpty}}, _} ->
notify_decorators(QName, consumer_state_changed, [MaxActivePriority, IsEmpty]);
_ -> ok
end.
notify_decorators(QName, Event) -> notify_decorators(QName, Event, []).
notify_decorators(QName, F, A) ->
%% Look up again in case policy and hence decorators have changed
case rabbit_amqqueue:lookup(QName) of
{ok, Q} ->
Ds = amqqueue:get_decorators(Q),
[ok = apply(M, F, [Q|A]) || M <- rabbit_queue_decorator:select(Ds)];
{error, not_found} ->
ok
end.

View File

@ -177,7 +177,8 @@ enq_enq_deq_test(_) ->
{State2, _} = enq(2, 2, second, State1), {State2, _} = enq(2, 2, second, State1),
% get returns a reply value % get returns a reply value
NumReady = 1, NumReady = 1,
{_State3, {dequeue, {0, {_, first}}, NumReady}, [{monitor, _, _}]} = {_State3, {dequeue, {0, {_, first}}, NumReady},
[{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _}, {monitor, _, _}]} =
apply(meta(3), rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}), apply(meta(3), rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}),
State2), State2),
ok. ok.
@ -187,7 +188,8 @@ enq_enq_deq_deq_settle_test(_) ->
{State1, _} = enq(1, 1, first, test_init(test)), {State1, _} = enq(1, 1, first, test_init(test)),
{State2, _} = enq(2, 2, second, State1), {State2, _} = enq(2, 2, second, State1),
% get returns a reply value % get returns a reply value
{State3, {dequeue, {0, {_, first}}, 1}, [{monitor, _, _}]} = {State3, {dequeue, {0, {_, first}}, 1},
[{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _}, {monitor, _, _}]} =
apply(meta(3), rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}), apply(meta(3), rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}),
State2), State2),
{_State4, {dequeue, empty}} = {_State4, {dequeue, empty}} =
@ -235,7 +237,8 @@ release_cursor_test(_) ->
checkout_enq_settle_test(_) -> checkout_enq_settle_test(_) ->
Cid = {?FUNCTION_NAME, self()}, Cid = {?FUNCTION_NAME, self()},
{State1, [{monitor, _, _} | _]} = check(Cid, 1, test_init(test)), {State1, [{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _},
{monitor, _, _} | _]} = check(Cid, 1, test_init(test)),
{State2, Effects0} = enq(2, 1, first, State1), {State2, Effects0} = enq(2, 1, first, State1),
?ASSERT_EFF({send_msg, _, ?ASSERT_EFF({send_msg, _,
{delivery, ?FUNCTION_NAME, {delivery, ?FUNCTION_NAME,
@ -250,7 +253,8 @@ checkout_enq_settle_test(_) ->
out_of_order_enqueue_test(_) -> out_of_order_enqueue_test(_) ->
Cid = {?FUNCTION_NAME, self()}, Cid = {?FUNCTION_NAME, self()},
{State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)), {State1, [{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _},
{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)),
{State2, Effects2} = enq(2, 1, first, State1), {State2, Effects2} = enq(2, 1, first, State1),
?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2), ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2),
% assert monitor was set up % assert monitor was set up
@ -280,7 +284,8 @@ out_of_order_first_enqueue_test(_) ->
duplicate_enqueue_test(_) -> duplicate_enqueue_test(_) ->
Cid = {<<"duplicate_enqueue_test">>, self()}, Cid = {<<"duplicate_enqueue_test">>, self()},
{State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)), {State1, [{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _},
{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)),
{State2, Effects2} = enq(2, 1, first, State1), {State2, Effects2} = enq(2, 1, first, State1),
?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2), ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2),
{_State3, Effects3} = enq(3, 1, first, State2), {_State3, Effects3} = enq(3, 1, first, State2),
@ -331,7 +336,8 @@ return_non_existent_test(_) ->
return_checked_out_test(_) -> return_checked_out_test(_) ->
Cid = {<<"cid">>, self()}, Cid = {<<"cid">>, self()},
{State0, [_, _]} = enq(1, 1, first, test_init(test)), {State0, [_, _]} = enq(1, 1, first, test_init(test)),
{State1, [_Monitor, {State1, [{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _},
_Monitor,
{send_msg, _, {delivery, _, [{MsgId, _}]}, _}, {send_msg, _, {delivery, _, [{MsgId, _}]}, _},
{aux, active} | _ ]} = check_auto(Cid, 2, State0), {aux, active} | _ ]} = check_auto(Cid, 2, State0),
% returning immediately checks out the same message again % returning immediately checks out the same message again
@ -348,7 +354,8 @@ return_checked_out_limit_test(_) ->
release_cursor_interval => 0, release_cursor_interval => 0,
delivery_limit => 1}), delivery_limit => 1}),
{State0, [_, _]} = enq(1, 1, first, Init), {State0, [_, _]} = enq(1, 1, first, Init),
{State1, [_Monitor, {State1, [{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _},
_Monitor,
{send_msg, _, {delivery, _, [{MsgId, _}]}, _}, {send_msg, _, {delivery, _, [{MsgId, _}]}, _},
{aux, active} | _ ]} = check_auto(Cid, 2, State0), {aux, active} | _ ]} = check_auto(Cid, 2, State0),
% returning immediately checks out the same message again % returning immediately checks out the same message again
@ -366,7 +373,8 @@ return_auto_checked_out_test(_) ->
{State0, [_]} = enq(2, 2, second, State00), {State0, [_]} = enq(2, 2, second, State00),
% it first active then inactive as the consumer took on but cannot take % it first active then inactive as the consumer took on but cannot take
% any more % any more
{State1, [_Monitor, {State1, [{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _},
_Monitor,
{send_msg, _, {delivery, _, [{MsgId, _}]}, _}, {send_msg, _, {delivery, _, [{MsgId, _}]}, _},
{aux, active}, {aux, active},
{aux, inactive} {aux, inactive}
@ -401,7 +409,7 @@ cancelled_checkout_out_test(_) ->
down_with_noproc_consumer_returns_unsettled_test(_) -> down_with_noproc_consumer_returns_unsettled_test(_) ->
Cid = {<<"down_consumer_returns_unsettled_test">>, self()}, Cid = {<<"down_consumer_returns_unsettled_test">>, self()},
{State0, [_, _]} = enq(1, 1, second, test_init(test)), {State0, [_, _]} = enq(1, 1, second, test_init(test)),
{State1, [{monitor, process, Pid} | _]} = check(Cid, 2, State0), {State1, [_, {monitor, process, Pid} | _]} = check(Cid, 2, State0),
{State2, _, _} = apply(meta(3), {down, Pid, noproc}, State1), {State2, _, _} = apply(meta(3), {down, Pid, noproc}, State1),
{_State, Effects} = check(Cid, 4, State2), {_State, Effects} = check(Cid, 4, State2),
?ASSERT_EFF({monitor, process, _}, Effects), ?ASSERT_EFF({monitor, process, _}, Effects),
@ -600,7 +608,8 @@ purge_test(_) ->
{State2, {purge, 1}, _} = apply(meta(2), rabbit_fifo:make_purge(), State1), {State2, {purge, 1}, _} = apply(meta(2), rabbit_fifo:make_purge(), State1),
{State3, _} = enq(3, 2, second, State2), {State3, _} = enq(3, 2, second, State2),
% get returns a reply value % get returns a reply value
{_State4, {dequeue, {0, {_, second}}, _}, [{monitor, _, _}]} = {_State4, {dequeue, {0, {_, second}}, _},
[{mod_call, rabbit_quorum_queue, spawn_notify_decorators, _}, {monitor, _, _}]} =
apply(meta(4), rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}), State3), apply(meta(4), rabbit_fifo:make_checkout(Cid, {dequeue, unsettled}, #{}), State3),
ok. ok.
@ -1137,12 +1146,12 @@ active_flag_updated_when_consumer_suspected_unsuspected_test(_) ->
{State2, _, Effects2} = apply(#{index => 3, {State2, _, Effects2} = apply(#{index => 3,
system_time => 1500}, {down, Pid1, noconnection}, State1), system_time => 1500}, {down, Pid1, noconnection}, State1),
% 1 effect to update the metrics of each consumer (they belong to the same node), 1 more effect to monitor the node % 1 effect to update the metrics of each consumer (they belong to the same node), 1 more effect to monitor the node, 1 more decorators effect
?assertEqual(4 + 1, length(Effects2)), ?assertEqual(4 + 1 + 1, length(Effects2)),
{_, _, Effects3} = apply(#{index => 4}, {nodeup, node(self())}, State2), {_, _, Effects3} = apply(#{index => 4}, {nodeup, node(self())}, State2),
% for each consumer: 1 effect to update the metrics, 1 effect to monitor the consumer PID % for each consumer: 1 effect to update the metrics, 1 effect to monitor the consumer PID, 1 more decorators effect
?assertEqual(4 + 4, length(Effects3)). ?assertEqual(4 + 4 + 1, length(Effects3)).
active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_consumer_is_on_test(_) -> active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_consumer_is_on_test(_) ->
State0 = init(#{name => ?FUNCTION_NAME, State0 = init(#{name => ?FUNCTION_NAME,
@ -1171,11 +1180,11 @@ active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_co
{State2, _, Effects2} = apply(meta(2), {down, Pid1, noconnection}, State1), {State2, _, Effects2} = apply(meta(2), {down, Pid1, noconnection}, State1),
% one monitor and one consumer status update (deactivated) % one monitor and one consumer status update (deactivated)
?assertEqual(3, length(Effects2)), ?assertEqual(4, length(Effects2)),
{_, _, Effects3} = apply(meta(3), {nodeup, node(self())}, State2), {_, _, Effects3} = apply(meta(3), {nodeup, node(self())}, State2),
% for each consumer: 1 effect to monitor the consumer PID % for each consumer: 1 effect to monitor the consumer PID
?assertEqual(5, length(Effects3)). ?assertEqual(6, length(Effects3)).
single_active_cancelled_with_unacked_test(_) -> single_active_cancelled_with_unacked_test(_) ->
State0 = init(#{name => ?FUNCTION_NAME, State0 = init(#{name => ?FUNCTION_NAME,

View File

@ -15,7 +15,7 @@
-import(rabbit_federation_test_util, -import(rabbit_federation_test_util,
[wait_for_federation/2, expect/3, expect/4, [wait_for_federation/2, expect/3, expect/4,
set_upstream/4, set_upstream/5, clear_upstream/3, set_policy/5, clear_policy/3, set_upstream/4, set_upstream/5, clear_upstream/3, set_policy/5, clear_policy/3,
set_policy_pattern/5, set_policy_upstream/5, q/1, with_ch/3, set_policy_pattern/5, set_policy_upstream/5, q/2, with_ch/3,
declare_queue/2, delete_queue/2, declare_queue/2, delete_queue/2,
federation_links_in_vhost/3]). federation_links_in_vhost/3]).
@ -24,30 +24,37 @@
all() -> all() ->
[ [
{group, without_disambiguate}, {group, classic_queue},
{group, with_disambiguate} {group, quorum_queue}
]. ].
groups() -> groups() ->
[ ClusterSize1 = [simple,
{without_disambiguate, [], [ multiple_upstreams,
{cluster_size_1, [], [ multiple_upstreams_pattern,
simple, multiple_downstreams,
multiple_upstreams, bidirectional,
multiple_upstreams_pattern, dynamic_reconfiguration,
multiple_downstreams, federate_unfederate,
bidirectional, dynamic_plugin_stop_start
dynamic_reconfiguration, ],
federate_unfederate, ClusterSize2 = [restart_upstream],
dynamic_plugin_stop_start [{classic_queue, [], [
]} {without_disambiguate, [], [
]}, {cluster_size_1, [], ClusterSize1}
{with_disambiguate, [], [ ]},
{cluster_size_2, [], [ {with_disambiguate, [], [
restart_upstream {cluster_size_2, [], ClusterSize2}
]} ]}
]} ]},
]. {quorum_queue, [], [
{without_disambiguate, [], [
{cluster_size_1, [], ClusterSize1}
]},
{with_disambiguate, [], [
{cluster_size_2, [], ClusterSize2}
]}
]}].
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% Testsuite setup/teardown. %% Testsuite setup/teardown.
@ -60,6 +67,16 @@ init_per_suite(Config) ->
end_per_suite(Config) -> end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config). rabbit_ct_helpers:run_teardown_steps(Config).
init_per_group(classic_queue, Config) ->
rabbit_ct_helpers:set_config(
Config,
[{queue_type, classic},
{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}]);
init_per_group(quorum_queue, Config) ->
rabbit_ct_helpers:set_config(
Config,
[{queue_type, quorum},
{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}]);
init_per_group(without_disambiguate, Config) -> init_per_group(without_disambiguate, Config) ->
rabbit_ct_helpers:set_config(Config, rabbit_ct_helpers:set_config(Config,
{disambiguate_step, []}); {disambiguate_step, []});
@ -88,15 +105,30 @@ init_per_group1(Group, Config) ->
{rmq_nodename_suffix, Suffix}, {rmq_nodename_suffix, Suffix},
{rmq_nodes_clustered, false} {rmq_nodes_clustered, false}
]), ]),
rabbit_ct_helpers:run_steps(Config1, Config2 = rabbit_ct_helpers:run_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++ rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps() ++ rabbit_ct_client_helpers:setup_steps() ++
SetupFederation ++ Disambiguate). SetupFederation ++ Disambiguate),
case ?config(queue_type, Config2) of
quorum ->
case rabbit_ct_broker_helpers:enable_feature_flag(Config2, quorum_queue) of
ok ->
Config2;
Skip ->
Skip
end;
_ ->
Config2
end.
end_per_group(without_disambiguate, Config) -> end_per_group(without_disambiguate, Config) ->
Config; Config;
end_per_group(with_disambiguate, Config) -> end_per_group(with_disambiguate, Config) ->
Config; Config;
end_per_group(classic_queue, Config) ->
Config;
end_per_group(quorum_queue, Config) ->
Config;
end_per_group(_, Config) -> end_per_group(_, Config) ->
rabbit_ct_helpers:run_steps(Config, rabbit_ct_helpers:run_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++ rabbit_ct_client_helpers:teardown_steps() ++
@ -113,19 +145,21 @@ end_per_testcase(Testcase, Config) ->
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
simple(Config) -> simple(Config) ->
Args = ?config(queue_args, Config),
with_ch(Config, with_ch(Config,
fun (Ch) -> fun (Ch) ->
expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>) expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>)
end, upstream_downstream()). end, upstream_downstream(Args)).
multiple_upstreams(Config) -> multiple_upstreams(Config) ->
Args = ?config(queue_args, Config),
with_ch(Config, with_ch(Config,
fun (Ch) -> fun (Ch) ->
expect_federation(Ch, <<"upstream">>, <<"fed12.downstream">>), expect_federation(Ch, <<"upstream">>, <<"fed12.downstream">>),
expect_federation(Ch, <<"upstream2">>, <<"fed12.downstream">>) expect_federation(Ch, <<"upstream2">>, <<"fed12.downstream">>)
end, [q(<<"upstream">>), end, [q(<<"upstream">>, Args),
q(<<"upstream2">>), q(<<"upstream2">>, Args),
q(<<"fed12.downstream">>)]). q(<<"fed12.downstream">>, Args)]).
multiple_upstreams_pattern(Config) -> multiple_upstreams_pattern(Config) ->
set_upstream(Config, 0, <<"local453x">>, set_upstream(Config, 0, <<"local453x">>,
@ -145,43 +179,51 @@ multiple_upstreams_pattern(Config) ->
set_policy_pattern(Config, 0, <<"pattern">>, <<"^pattern\.">>, <<"local\\d+x">>), set_policy_pattern(Config, 0, <<"pattern">>, <<"^pattern\.">>, <<"local\\d+x">>),
Args = ?config(queue_args, Config),
with_ch(Config, with_ch(Config,
fun (Ch) -> fun (Ch) ->
expect_federation(Ch, <<"upstream">>, <<"pattern.downstream">>, ?EXPECT_FEDERATION_TIMEOUT), expect_federation(Ch, <<"upstream">>, <<"pattern.downstream">>, ?EXPECT_FEDERATION_TIMEOUT),
expect_federation(Ch, <<"upstream2">>, <<"pattern.downstream">>, ?EXPECT_FEDERATION_TIMEOUT) expect_federation(Ch, <<"upstream2">>, <<"pattern.downstream">>, ?EXPECT_FEDERATION_TIMEOUT)
end, [q(<<"upstream">>), end, [q(<<"upstream">>, Args),
q(<<"upstream2">>), q(<<"upstream2">>, Args),
q(<<"pattern.downstream">>)]), q(<<"pattern.downstream">>, Args)]),
clear_upstream(Config, 0, <<"local453x">>), clear_upstream(Config, 0, <<"local453x">>),
clear_upstream(Config, 0, <<"local3214x">>), clear_upstream(Config, 0, <<"local3214x">>),
clear_policy(Config, 0, <<"pattern">>). clear_policy(Config, 0, <<"pattern">>).
multiple_downstreams(Config) -> multiple_downstreams(Config) ->
Args = ?config(queue_args, Config),
with_ch(Config, with_ch(Config,
fun (Ch) -> fun (Ch) ->
timer:sleep(?INITIAL_WAIT), timer:sleep(?INITIAL_WAIT),
expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>, ?EXPECT_FEDERATION_TIMEOUT), expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>, ?EXPECT_FEDERATION_TIMEOUT),
expect_federation(Ch, <<"upstream">>, <<"fed.downstream2">>, ?EXPECT_FEDERATION_TIMEOUT) expect_federation(Ch, <<"upstream">>, <<"fed.downstream2">>, ?EXPECT_FEDERATION_TIMEOUT)
end, upstream_downstream() ++ [q(<<"fed.downstream2">>)]). end, upstream_downstream(Args) ++ [q(<<"fed.downstream2">>, Args)]).
bidirectional(Config) -> bidirectional(Config) ->
Args = ?config(queue_args, Config),
with_ch(Config, with_ch(Config,
fun (Ch) -> fun (Ch) ->
timer:sleep(?INITIAL_WAIT), timer:sleep(?INITIAL_WAIT),
publish_expect(Ch, <<>>, <<"one">>, <<"one">>, <<"first one">>, ?EXPECT_FEDERATION_TIMEOUT), publish_expect(Ch, <<>>, <<"one">>, <<"one">>, <<"first one">>, ?EXPECT_FEDERATION_TIMEOUT),
publish_expect(Ch, <<>>, <<"two">>, <<"two">>, <<"first two">>, ?EXPECT_FEDERATION_TIMEOUT), publish_expect(Ch, <<>>, <<"two">>, <<"two">>, <<"first two">>, ?EXPECT_FEDERATION_TIMEOUT),
Seq = lists:seq(1, 100), Seq = lists:seq(1, 50),
[publish(Ch, <<>>, <<"one">>, <<"bulk">>) || _ <- Seq], [publish(Ch, <<>>, <<"one">>, <<"bulk">>) || _ <- Seq],
[publish(Ch, <<>>, <<"two">>, <<"bulk">>) || _ <- Seq], [publish(Ch, <<>>, <<"two">>, <<"bulk">>) || _ <- Seq],
expect(Ch, <<"one">>, repeat(150, <<"bulk">>)), expect(Ch, <<"one">>, repeat(100, <<"bulk">>)),
expect(Ch, <<"two">>, repeat(50, <<"bulk">>)), expect_empty(Ch, <<"one">>),
expect_empty(Ch, <<"two">>),
[publish(Ch, <<>>, <<"one">>, <<"bulk">>) || _ <- Seq],
[publish(Ch, <<>>, <<"two">>, <<"bulk">>) || _ <- Seq],
expect(Ch, <<"two">>, repeat(100, <<"bulk">>)),
expect_empty(Ch, <<"one">>), expect_empty(Ch, <<"one">>),
expect_empty(Ch, <<"two">>) expect_empty(Ch, <<"two">>)
end, [q(<<"one">>), end, [q(<<"one">>, Args),
q(<<"two">>)]). q(<<"two">>, Args)]).
dynamic_reconfiguration(Config) -> dynamic_reconfiguration(Config) ->
Args = ?config(queue_args, Config),
with_ch(Config, with_ch(Config,
fun (Ch) -> fun (Ch) ->
timer:sleep(?INITIAL_WAIT), timer:sleep(?INITIAL_WAIT),
@ -199,9 +241,10 @@ dynamic_reconfiguration(Config) ->
set_upstream(Config, 0, <<"localhost">>, URI), set_upstream(Config, 0, <<"localhost">>, URI),
set_upstream(Config, 0, <<"localhost">>, URI), set_upstream(Config, 0, <<"localhost">>, URI),
expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>) expect_federation(Ch, <<"upstream">>, <<"fed.downstream">>)
end, upstream_downstream()). end, upstream_downstream(Args)).
federate_unfederate(Config) -> federate_unfederate(Config) ->
Args = ?config(queue_args, Config),
with_ch(Config, with_ch(Config,
fun (Ch) -> fun (Ch) ->
timer:sleep(?INITIAL_WAIT), timer:sleep(?INITIAL_WAIT),
@ -217,10 +260,11 @@ federate_unfederate(Config) ->
rabbit_ct_broker_helpers:set_policy(Config, 0, rabbit_ct_broker_helpers:set_policy(Config, 0,
<<"fed">>, <<"^fed\.">>, <<"all">>, [ <<"fed">>, <<"^fed\.">>, <<"all">>, [
{<<"federation-upstream-set">>, <<"upstream">>}]) {<<"federation-upstream-set">>, <<"upstream">>}])
end, upstream_downstream() ++ [q(<<"fed.downstream2">>)]). end, upstream_downstream(Args) ++ [q(<<"fed.downstream2">>, Args)]).
dynamic_plugin_stop_start(Config) -> dynamic_plugin_stop_start(Config) ->
DownQ2 = <<"fed.downstream2">>, DownQ2 = <<"fed.downstream2">>,
Args = ?config(queue_args, Config),
with_ch(Config, with_ch(Config,
fun (Ch) -> fun (Ch) ->
timer:sleep(?INITIAL_WAIT), timer:sleep(?INITIAL_WAIT),
@ -235,8 +279,8 @@ dynamic_plugin_stop_start(Config) ->
expect_no_federation(Ch, UpQ, DownQ1), expect_no_federation(Ch, UpQ, DownQ1),
expect_no_federation(Ch, UpQ, DownQ2), expect_no_federation(Ch, UpQ, DownQ2),
declare_queue(Ch, q(DownQ1)), declare_queue(Ch, q(DownQ1, Args)),
declare_queue(Ch, q(DownQ2)), declare_queue(Ch, q(DownQ2, Args)),
ok = rabbit_ct_broker_helpers:enable_plugin(Config, 0, "rabbitmq_federation"), ok = rabbit_ct_broker_helpers:enable_plugin(Config, 0, "rabbitmq_federation"),
%% Declare a queue then re-enable the plugin, the links appear %% Declare a queue then re-enable the plugin, the links appear
@ -255,7 +299,7 @@ dynamic_plugin_stop_start(Config) ->
length(L) =:= 2 length(L) =:= 2
end), end),
expect_federation(Ch, UpQ, DownQ1, 120000) expect_federation(Ch, UpQ, DownQ1, 120000)
end, upstream_downstream() ++ [q(DownQ2)]). end, upstream_downstream(Args) ++ [q(DownQ2, Args)]).
restart_upstream(Config) -> restart_upstream(Config) ->
[Rabbit, Hare] = rabbit_ct_broker_helpers:get_node_configs(Config, [Rabbit, Hare] = rabbit_ct_broker_helpers:get_node_configs(Config,
@ -266,8 +310,9 @@ restart_upstream(Config) ->
Downstream = rabbit_ct_client_helpers:open_channel(Config, Rabbit), Downstream = rabbit_ct_client_helpers:open_channel(Config, Rabbit),
Upstream = rabbit_ct_client_helpers:open_channel(Config, Hare), Upstream = rabbit_ct_client_helpers:open_channel(Config, Hare),
declare_queue(Upstream, q(<<"test">>)), Args = ?config(queue_args, Config),
declare_queue(Downstream, q(<<"test">>)), declare_queue(Upstream, q(<<"test">>, Args)),
declare_queue(Downstream, q(<<"test">>, Args)),
Seq = lists:seq(1, 100), Seq = lists:seq(1, 100),
[publish(Upstream, <<>>, <<"test">>, <<"bulk">>) || _ <- Seq], [publish(Upstream, <<>>, <<"test">>, <<"bulk">>) || _ <- Seq],
expect(Upstream, <<"test">>, repeat(25, <<"bulk">>)), expect(Upstream, <<"test">>, repeat(25, <<"bulk">>)),
@ -325,4 +370,7 @@ expect_no_federation(Ch, UpstreamQ, DownstreamQ) ->
expect(Ch, UpstreamQ, [<<"HELLO">>]). expect(Ch, UpstreamQ, [<<"HELLO">>]).
upstream_downstream() -> upstream_downstream() ->
[q(<<"upstream">>), q(<<"fed.downstream">>)]. upstream_downstream([]).
upstream_downstream(Args) ->
[q(<<"upstream">>, Args), q(<<"fed.downstream">>, Args)].

View File

@ -176,10 +176,10 @@ expect([], _Timeout) ->
ok; ok;
expect(Payloads, Timeout) -> expect(Payloads, Timeout) ->
receive receive
{#'basic.deliver'{}, #amqp_msg{payload = Payload}} -> {#'basic.deliver'{delivery_tag = DTag}, #amqp_msg{payload = Payload}} ->
case lists:member(Payload, Payloads) of case lists:member(Payload, Payloads) of
true -> true ->
ct:pal("Consumed a message: ~p", [Payload]), ct:pal("Consumed a message: ~p ~p left: ~p", [Payload, DTag, length(Payloads) - 1]),
expect(Payloads -- [Payload], Timeout); expect(Payloads -- [Payload], Timeout);
false -> ?assert(false, rabbit_misc:format("received an unexpected payload ~p", [Payload])) false -> ?assert(false, rabbit_misc:format("received an unexpected payload ~p", [Payload]))
end end
@ -350,5 +350,9 @@ delete_queue(Ch, Q) ->
amqp_channel:call(Ch, #'queue.delete'{queue = Q}). amqp_channel:call(Ch, #'queue.delete'{queue = Q}).
q(Name) -> q(Name) ->
q(Name, []).
q(Name, Args) ->
#'queue.declare'{queue = Name, #'queue.declare'{queue = Name,
durable = true}. durable = true,
arguments = Args}.