Support dlx policy changes in quorum queues

[#161247406]
This commit is contained in:
Diana Corbacho 2018-11-07 19:14:50 +00:00
parent 84fb288e45
commit b56db0e487
5 changed files with 72 additions and 25 deletions

View File

@ -894,8 +894,11 @@ list_local(VHostPath) ->
[ Q || #amqqueue{state = State, pid = QPid} = Q <- list(VHostPath),
State =/= crashed, is_local_to_node(QPid, node()) ].
notify_policy_changed(#amqqueue{pid = QPid}) ->
gen_server2:cast(QPid, policy_changed).
notify_policy_changed(#amqqueue{pid = QPid}) when ?IS_CLASSIC(QPid) ->
gen_server2:cast(QPid, policy_changed);
notify_policy_changed(#amqqueue{pid = QPid,
name = QName}) when ?IS_QUORUM(QPid) ->
rabbit_quorum_queue:policy_changed(QName, QPid).
consumers(#amqqueue{ pid = QPid }) ->
delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]}).

View File

@ -207,19 +207,19 @@
-spec init(config()) -> {state(), ra_machine:effects()}.
init(#{name := Name} = Conf) ->
update_state(Conf, #state{name = Name}).
update_state(Conf, State) ->
DLH = maps:get(dead_letter_handler, Conf, undefined),
CCH = maps:get(cancel_consumer_handler, Conf, undefined),
BLH = maps:get(become_leader_handler, Conf, undefined),
MH = maps:get(metrics_handler, Conf, undefined),
SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL),
#state{name = Name,
dead_letter_handler = DLH,
cancel_consumer_handler = CCH,
become_leader_handler = BLH,
metrics_handler = MH,
shadow_copy_interval = SHI}.
State#state{dead_letter_handler = DLH,
cancel_consumer_handler = CCH,
become_leader_handler = BLH,
metrics_handler = MH,
shadow_copy_interval = SHI}.
% msg_ids are scoped per consumer
% ra_indexes holds all raft indexes for enqueues currently on queue
@ -431,7 +431,9 @@ apply(_, {nodeup, Node}, Effects0,
% TODO: avoid list concat
{State0, Monitors ++ Effects0, ok};
apply(_, {nodedown, _Node}, Effects, State) ->
{State, Effects, ok}.
{State, Effects, ok};
apply(_, {update_state, Conf}, Effects, State) ->
{update_state(Conf, State), Effects, ok}.
-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
state_enter(leader, #state{consumers = Custs,

View File

@ -21,7 +21,8 @@
handle_ra_event/3,
untracked_enqueue/2,
purge/1,
cluster_name/1
cluster_name/1,
update_machine_state/2
]).
-include_lib("ra/include/ra.hrl").
@ -375,6 +376,14 @@ purge(Node) ->
cluster_name(#state{cluster_name = ClusterName}) ->
ClusterName.
update_machine_state(Node, Conf) ->
case ra:process_command(Node, {update_state, Conf}) of
{ok, ok, _} ->
ok;
Err ->
Err
end.
%% @doc Handles incoming `ra_events'. Events carry both internal "bookeeping"
%% events emitted by the `ra' leader as well as `rabbit_fifo' emitted events such
%% as message deliveries. All ra events need to be handled by {@module}

View File

@ -34,6 +34,7 @@
-export([add_member/3]).
-export([delete_member/3]).
-export([requeue/3]).
-export([policy_changed/2]).
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
@ -147,12 +148,14 @@ declare(#amqqueue{name = QName,
ra_machine(Q = #amqqueue{name = QName}) ->
{module, rabbit_fifo,
#{dead_letter_handler => dlx_mfa(Q),
cancel_consumer_handler => {?MODULE, cancel_consumer, [QName]},
become_leader_handler => {?MODULE, become_leader, [QName]},
metrics_handler => {?MODULE, update_metrics, [QName]}}}.
ra_machine(Q) ->
{module, rabbit_fifo, ra_machine_state(Q)}.
ra_machine_state(Q = #amqqueue{name = QName}) ->
#{dead_letter_handler => dlx_mfa(Q),
cancel_consumer_handler => {?MODULE, cancel_consumer, [QName]},
become_leader_handler => {?MODULE, become_leader, [QName]},
metrics_handler => {?MODULE, update_metrics, [QName]}}.
cancel_consumer_handler(QName, {ConsumerTag, ChPid}, _Name) ->
Node = node(ChPid),
@ -386,6 +389,10 @@ purge(Node) ->
requeue(ConsumerTag, MsgIds, FState) ->
rabbit_fifo_client:return(quorum_ctag(ConsumerTag), MsgIds, FState).
policy_changed(QName, Node) ->
{ok, Q} = rabbit_amqqueue:lookup(QName),
rabbit_fifo_client:update_machine_state(Node, ra_machine_state(Q)).
cluster_state(Name) ->
case whereis(Name) of
undefined -> down;

View File

@ -99,6 +99,7 @@ all_tests() ->
dead_letter_to_classic_queue,
dead_letter_to_quorum_queue,
dead_letter_from_classic_to_quorum_queue,
dead_letter_policy,
cleanup_queue_state_on_channel_after_publish,
cleanup_queue_state_on_channel_after_subscribe,
basic_cancel,
@ -1063,22 +1064,47 @@ dead_letter_to_classic_queue(Config) ->
{<<"x-dead-letter-routing-key">>, longstr, CQ}
])),
?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])),
RaName = ra_name(QQ),
publish(Ch, QQ),
test_dead_lettering(true, Config, Ch, Servers, ra_name(QQ), QQ, CQ).
test_dead_lettering(PolicySet, Config, Ch, Servers, RaName, Source, Destination) ->
publish(Ch, Source),
wait_for_messages_ready(Servers, RaName, 1),
wait_for_messages_pending_ack(Servers, RaName, 0),
wait_for_messages(Config, [[CQ, <<"0">>, <<"0">>, <<"0">>]]),
DeliveryTag = consume(Ch, QQ, false),
wait_for_messages(Config, [[Destination, <<"0">>, <<"0">>, <<"0">>]]),
DeliveryTag = consume(Ch, Source, false),
wait_for_messages_ready(Servers, RaName, 0),
wait_for_messages_pending_ack(Servers, RaName, 1),
wait_for_messages(Config, [[CQ, <<"0">>, <<"0">>, <<"0">>]]),
wait_for_messages(Config, [[Destination, <<"0">>, <<"0">>, <<"0">>]]),
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
multiple = false,
requeue = false}),
wait_for_messages_ready(Servers, RaName, 0),
wait_for_messages_pending_ack(Servers, RaName, 0),
wait_for_messages(Config, [[CQ, <<"1">>, <<"1">>, <<"0">>]]),
_ = consume(Ch, CQ, false).
case PolicySet of
true ->
wait_for_messages(Config, [[Destination, <<"1">>, <<"1">>, <<"0">>]]),
_ = consume(Ch, Destination, true);
false ->
wait_for_messages(Config, [[Destination, <<"0">>, <<"0">>, <<"0">>]])
end.
dead_letter_policy(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
CQ = <<"classic-dead_letter_policy">>,
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])),
ok = rabbit_ct_broker_helpers:set_policy(
Config, 0, <<"dlx">>, <<"dead_letter.*">>, <<"queues">>,
[{<<"dead-letter-exchange">>, <<"">>},
{<<"dead-letter-routing-key">>, CQ}]),
RaName = ra_name(QQ),
test_dead_lettering(true, Config, Ch, Servers, RaName, QQ, CQ),
ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"dlx">>),
test_dead_lettering(false, Config, Ch, Servers, RaName, QQ, CQ).
dead_letter_to_quorum_queue(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),