Better handle changing quorum queue leaders

All maps that track queues inside the channel should use the queue name
atom rather than the server id as they key so that leader changes don't
impact this tracking.
This commit is contained in:
kjnilsson 2018-11-30 14:07:22 +00:00
parent a4c00475eb
commit ea0e192dc1
5 changed files with 139 additions and 64 deletions

View File

@ -1094,7 +1094,8 @@ notify_down_all(QPids, ChPid, Timeout) ->
Error -> {error, Error}
end.
activate_limit_all(QPids, ChPid) ->
activate_limit_all(QRefs, ChPid) ->
QPids = [P || P <- QRefs, ?IS_CLASSIC(P)],
delegate:invoke_no_result(QPids, {gen_server2, cast,
[{activate_limit, ChPid}]}).

View File

@ -110,7 +110,7 @@
%% when queue.bind's queue field is empty,
%% this name will be used instead
most_recently_declared_queue,
%% a map of queue pid to queue name
%% a map of queue ref to queue name
queue_names,
%% queue processes are monitored to update
%% queue names
@ -210,7 +210,7 @@
end).
-define(IS_CLASSIC(QPid), is_pid(QPid)).
-define(IS_QUORUM(QPid), is_tuple(QPid)).
-define(IS_QUORUM(QPid), is_tuple(QPid) orelse is_atom(QPid)).
%%----------------------------------------------------------------------------
@ -672,18 +672,18 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
end, Actions),
noreply_coalesce(confirm(MsgSeqNos, Name, State));
eol ->
State1 = handle_consuming_queue_down_or_eol(From, State0),
State2 = handle_delivering_queue_down(From, State1),
State1 = handle_consuming_queue_down_or_eol(Name, State0),
State2 = handle_delivering_queue_down(Name, State1),
%% TODO: this should use dtree:take/3
{MXs, UC1} = dtree:take(Name, State2#ch.unconfirmed),
State3 = record_confirms(MXs, State1#ch{unconfirmed = UC1}),
case maps:find(From, QNames) of
case maps:find(Name, QNames) of
{ok, QName} -> erase_queue_stats(QName);
error -> ok
end,
noreply_coalesce(
State3#ch{queue_states = maps:remove(Name, QueueStates),
queue_names = maps:remove(From, QNames)})
queue_names = maps:remove(Name, QNames)})
end;
_ ->
%% the assumption here is that the queue state has been cleaned up and
@ -1336,13 +1336,14 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait},
return_ok(State, NoWait, OkMsg);
{ok, {Q = #amqqueue{pid = QPid}, _CParams}} ->
ConsumerMapping1 = maps:remove(ConsumerTag, ConsumerMapping),
QRef = qpid_to_ref(QPid),
QCons1 =
case maps:find(QPid, QCons) of
case maps:find(QRef, QCons) of
error -> QCons;
{ok, CTags} -> CTags1 = gb_sets:delete(ConsumerTag, CTags),
case gb_sets:is_empty(CTags1) of
true -> maps:remove(QPid, QCons);
false -> maps:put(QPid, CTags1, QCons)
true -> maps:remove(QRef, QCons);
false -> maps:put(QRef, CTags1, QCons)
end
end,
NewState = State#ch{consumer_mapping = ConsumerMapping1,
@ -1395,7 +1396,7 @@ handle_method(#'basic.qos'{global = true,
case ((not rabbit_limiter:is_active(Limiter)) andalso
rabbit_limiter:is_active(Limiter1)) of
true -> rabbit_amqqueue:activate_limit_all(
consumer_queues(State#ch.consumer_mapping), self());
consumer_queue_refs(State#ch.consumer_mapping), self());
false -> ok
end,
{reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}};
@ -1641,25 +1642,26 @@ consumer_monitor(ConsumerTag,
State = #ch{consumer_mapping = ConsumerMapping,
queue_monitors = QMons,
queue_consumers = QCons}) ->
{#amqqueue{pid = QPid}, _} =
maps:get(ConsumerTag, ConsumerMapping),
CTags1 = case maps:find(QPid, QCons) of
{#amqqueue{pid = QPid}, _} = maps:get(ConsumerTag, ConsumerMapping),
QRef = qpid_to_ref(QPid),
CTags1 = case maps:find(QRef, QCons) of
{ok, CTags} -> gb_sets:insert(ConsumerTag, CTags);
error -> gb_sets:singleton(ConsumerTag)
end,
QCons1 = maps:put(QPid, CTags1, QCons),
State#ch{queue_monitors = maybe_monitor(QPid, QMons),
QCons1 = maps:put(QRef, CTags1, QCons),
State#ch{queue_monitors = maybe_monitor(QRef, QMons),
queue_consumers = QCons1}.
track_delivering_queue(NoAck, QPid, QName,
State = #ch{queue_names = QNames,
queue_monitors = QMons,
delivering_queues = DQ}) ->
State#ch{queue_names = maps:put(QPid, QName, QNames),
queue_monitors = maybe_monitor(QPid, QMons),
QRef = qpid_to_ref(QPid),
State#ch{queue_names = maps:put(QRef, QName, QNames),
queue_monitors = maybe_monitor(QRef, QMons),
delivering_queues = case NoAck of
true -> DQ;
false -> sets:add_element(QPid, DQ)
false -> sets:add_element(QRef, DQ)
end}.
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC,
@ -1678,16 +1680,16 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC,
handle_publishing_queue_down(QPid, _Reason, _State) when ?IS_QUORUM(QPid) ->
error(quorum_queues_should_never_be_monitored).
handle_consuming_queue_down_or_eol(QPid,
State = #ch{queue_consumers = QCons,
queue_names = QNames}) ->
ConsumerTags = case maps:find(QPid, QCons) of
handle_consuming_queue_down_or_eol(QRef,
State = #ch{queue_consumers = QCons,
queue_names = QNames}) ->
ConsumerTags = case maps:find(QRef, QCons) of
error -> gb_sets:new();
{ok, CTags} -> CTags
end,
gb_sets:fold(
fun (CTag, StateN = #ch{consumer_mapping = CMap}) ->
QName = maps:get(QPid, QNames),
QName = maps:get(QRef, QNames),
case queue_down_consumer_action(CTag, CMap) of
remove ->
cancel_consumer(CTag, QName, StateN);
@ -1699,7 +1701,7 @@ handle_consuming_queue_down_or_eol(QPid,
_ -> cancel_consumer(CTag, QName, StateN)
end
end
end, State#ch{queue_consumers = maps:remove(QPid, QCons)}, ConsumerTags).
end, State#ch{queue_consumers = maps:remove(QRef, QCons)}, ConsumerTags).
%% [0] There is a slight danger here that if a queue is deleted and
%% then recreated again the reconsume will succeed even though it was
@ -1726,8 +1728,8 @@ queue_down_consumer_action(CTag, CMap) ->
_ -> {recover, ConsumeSpec}
end.
handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
State#ch{delivering_queues = sets:del_element(QPid, DQ)}.
handle_delivering_queue_down(QRef, State = #ch{delivering_queues = DQ}) ->
State#ch{delivering_queues = sets:del_element(QRef, DQ)}.
binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
RoutingKey, Arguments, VHostPath, ConnPid,
@ -1882,7 +1884,7 @@ ack(Acked, State = #ch{queue_names = QNames,
State#ch{queue_states = QueueStates}.
incr_queue_stats(QPid, QNames, MsgIds, State) ->
case maps:find(QPid, QNames) of
case maps:find(qpid_to_ref(QPid), QNames) of
{ok, QName} -> Count = length(MsgIds),
?INCR_STATS(queue_stats, QName, Count, ack, State);
error -> ok
@ -1906,10 +1908,10 @@ notify_queues(State = #ch{state = closing}) ->
{ok, State};
notify_queues(State = #ch{consumer_mapping = Consumers,
delivering_queues = DQ }) ->
QPids0 = sets:to_list(
sets:union(sets:from_list(consumer_queues(Consumers)), DQ)),
QRefs0 = sets:to_list(
sets:union(sets:from_list(consumer_queue_refs(Consumers)), DQ)),
%% filter to only include pids to avoid trying to notify quorum queues
QPids = [P || P <- QPids0, ?IS_CLASSIC(P)],
QPids = [P || P <- QRefs0, ?IS_CLASSIC(P)],
Timeout = get_operation_timeout(),
{rabbit_amqqueue:notify_down_all(QPids, self(), Timeout),
State#ch{state = closing}}.
@ -1925,8 +1927,8 @@ foreach_per_queue(F, UAL, Acc) ->
end, gb_trees:empty(), UAL),
rabbit_misc:gb_trees_fold(fun (Key, Val, Acc0) -> F(Key, Val, Acc0) end, Acc, T).
consumer_queues(Consumers) ->
lists:usort([QPid || {_Key, {#amqqueue{pid = QPid}, _CParams}}
consumer_queue_refs(Consumers) ->
lists:usort([qpid_to_ref(QPid) || {_Key, {#amqqueue{pid = QPid}, _CParams}}
<- maps:to_list(Consumers)]).
%% tell the limiter about the number of acks that have been received
@ -1983,9 +1985,10 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
{QNames1, QMons1} =
lists:foldl(fun (#amqqueue{pid = QPid, name = QName},
{QNames0, QMons0}) ->
{case maps:is_key(QPid, QNames0) of
QRef = qpid_to_ref(QPid),
{case maps:is_key(QRef, QNames0) of
true -> QNames0;
false -> maps:put(QPid, QName, QNames0)
false -> maps:put(QRef, QName, QNames0)
end, maybe_monitor(QPid, QMons0)}
end, {QNames, maybe_monitor_all(DeliveredQPids, QMons)}, Qs),
State1 = State#ch{queue_names = QNames1,
@ -2000,8 +2003,8 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
fine ->
?INCR_STATS(exchange_stats, XName, 1, publish),
[?INCR_STATS(queue_exchange_stats, {QName, XName}, 1, publish) ||
QPid <- AllDeliveredQRefs,
{ok, QName} <- [maps:find(QPid, QNames1)]];
QRef <- AllDeliveredQRefs,
{ok, QName} <- [maps:find(QRef, QNames1)]];
_ ->
ok
end,
@ -2494,3 +2497,8 @@ maybe_monitor_all(Items, S) -> lists:foldl(fun maybe_monitor/2, S, Items).
add_delivery_count_header(MsgHeader, Msg) ->
Count = maps:get(delivery_count, MsgHeader, 0),
rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg).
qpid_to_ref(Pid) when is_pid(Pid) -> Pid;
qpid_to_ref({Name, _}) -> Name;
%% assume it already is a ref
qpid_to_ref(Ref) -> Ref.

View File

@ -1,3 +1,19 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_fifo).
-behaviour(ra_machine).
@ -228,9 +244,9 @@ update_state(Conf, State) ->
{state(), ra_machine:effects(), Reply :: term()}.
apply(#{index := RaftIdx}, {enqueue, From, Seq, RawMsg}, Effects0, State00) ->
case maybe_enqueue(RaftIdx, From, Seq, RawMsg, Effects0, State00) of
{ok, State0, Effects} ->
State = append_to_master_index(RaftIdx, State0),
checkout(State, Effects);
{ok, State0, Effects1} ->
{State, Effects, ok} = checkout(State0, Effects1),
{append_to_master_index(RaftIdx, State), Effects, ok};
{duplicate, State, Effects} ->
{State, Effects, ok}
end;
@ -314,6 +330,7 @@ apply(_, {credit, NewCredit, RemoteDelCnt, Drain, ConsumerId}, Effects0,
apply(_, {checkout, {dequeue, _}, {_Tag, _Pid}}, Effects0,
#state{messages = M,
prefix_msg_count = 0} = State0) when map_size(M) == 0 ->
%% FIX: also check if there are returned messages
%% TODO do we need metric visibility of empty get requests?
{State0, Effects0, {dequeue, empty}};
apply(Meta, {checkout, {dequeue, settled}, ConsumerId},
@ -780,6 +797,7 @@ cancel_consumer_effects(Pid, Name,
update_smallest_raft_index(IncomingRaftIdx, OldIndexes,
#state{ra_indexes = Indexes,
% prefix_msg_count = 0,
messages = Messages} = State, Effects) ->
case rabbit_fifo_index:size(Indexes) of
0 when map_size(Messages) =:= 0 ->
@ -1023,12 +1041,17 @@ dehydrate_state(#state{messages = Messages0,
ra_indexes = rabbit_fifo_index:empty(),
low_msg_num = undefined,
consumers = maps:map(fun (_, C) ->
C#consumer{checked_out = #{}}
dehydrate_consumer(C)
% C#consumer{checked_out = #{}}
end, Consumers),
returns = queue:new(),
%% messages include returns
prefix_msg_count = maps:size(Messages0) + MsgCount}.
dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
Checked = maps:map(fun (_, _) -> '$prefix_msg' end, Checked0),
Con#consumer{checked_out = Checked}.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
@ -1406,7 +1429,7 @@ tick_test() ->
ok.
enq_deq_snapshot_recover_test() ->
Tag = <<"release_cursor_snapshot_state_test">>,
Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
% OthPid = spawn(fun () -> ok end),
% Oth = {<<"oth">>, OthPid},
@ -1539,6 +1562,23 @@ enq_check_settle_snapshot_purge_test() ->
% ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]),
run_snapshot_test(?FUNCTION_NAME, Commands).
enq_check_settle_duplicate_test() ->
%% duplicate settle commands are likely
Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
Commands = [
{checkout, {auto, 2, simple_prefetch}, Cid},
{enqueue, self(), 1, one}, %% 0
{enqueue, self(), 2, two}, %% 0
{settle, [0], Cid},
{settle, [1], Cid},
{settle, [1], Cid},
{enqueue, self(), 3, three},
{settle, [2], Cid}
],
% ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]),
run_snapshot_test(?FUNCTION_NAME, Commands).
run_snapshot_test(Name, Commands) ->
%% create every incremental permuation of the commands lists
%% and run the snapshot tests against that
@ -1556,6 +1596,7 @@ run_snapshot_test0(Name, Commands) ->
Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true;
(_) -> false
end, Entries),
?debugFmt("running from snapshot: ~b", [SnapIdx]),
{S, _} = run_log(SnapState, Filtered),
% assert log can be restored from any release cursor index
% ?debugFmt("Name ~p Idx ~p S~p~nState~p~nSnapState ~p~nFiltered ~p~n",
@ -1571,7 +1612,7 @@ prefixes(Source, N, Acc) ->
prefixes(Source, N+1, [X | Acc]).
delivery_query_returns_deliveries_test() ->
Tag = <<"release_cursor_snapshot_state_test">>,
Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
Commands = [
{checkout, {auto, 5, simple_prefetch}, Cid},

View File

@ -1,3 +1,19 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
%%
%% @doc Provides an easy to consume API for interacting with the {@link rabbit_fifo.}
%% state machine implementation running inside a `ra' raft system.
%%
@ -432,19 +448,10 @@ update_machine_state(Node, Conf) ->
{rabbit_fifo:client_msg(), state()} | eol.
handle_ra_event(From, {applied, Seqs},
#state{soft_limit = SftLmt,
leader = CurLeader,
last_applied = _Last,
unblock_handler = UnblockFun} = State0) ->
{Corrs, Actions, State1} = lists:foldl(fun seq_applied/2,
{[], [], State0#state{leader = From}},
Seqs),
case From of
CurLeader -> ok;
_ ->
?INFO("rabbit_fifo_client: leader change from ~w to ~w~n"
"applying ~w last ~w~n",
[CurLeader, From, Seqs, _Last])
end,
case maps:size(State1#state.pending) < SftLmt of
true when State1#state.slow == true ->
% we have exited soft limit state
@ -479,10 +486,7 @@ handle_ra_event(Leader, {machine, leader_change},
#state{leader = Leader} = State) ->
%% leader already known
{internal, [], [], State};
handle_ra_event(Leader, {machine, leader_change},
#state{leader = OldLeader} = State0) ->
?INFO("rabbit_fifo_client: leader changed from ~w to ~w~n",
[OldLeader, Leader]),
handle_ra_event(Leader, {machine, leader_change}, State0) ->
%% we need to update leader
%% and resend any pending commands
State = resend_all_pending(State0#state{leader = Leader}),
@ -490,9 +494,9 @@ handle_ra_event(Leader, {machine, leader_change},
handle_ra_event(_From, {rejected, {not_leader, undefined, _Seq}}, State0) ->
% TODO: how should these be handled? re-sent on timer or try random
{internal, [], [], State0};
handle_ra_event(From, {rejected, {not_leader, Leader, Seq}}, State0) ->
?INFO("rabbit_fifo_client: rejected ~b not leader ~w leader: ~w~n",
[Seq, From, Leader]),
handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) ->
% ?INFO("rabbit_fifo_client: rejected ~b not leader ~w leader: ~w~n",
% [Seq, From, Leader]),
State1 = State0#state{leader = Leader},
State = resend(Seq, State1),
{internal, [], [], State};
@ -535,7 +539,6 @@ try_process_command([Server | Rem], Cmd, State) ->
seq_applied({Seq, MaybeAction},
{Corrs, Actions0, #state{last_applied = Last} = State0})
when Seq > Last orelse Last =:= undefined ->
% ?INFO("rabbit_fifo_client: applying seq ~b last ~w", [Seq, Last]),
State1 = case Last of
undefined -> State0;
_ ->
@ -550,14 +553,12 @@ seq_applied({Seq, MaybeAction},
{[Corr | Corrs], Actions, State#state{pending = Pending,
last_applied = Seq}};
error ->
?INFO("rabbit_fifo_client: pending not found ~w", [Seq]),
% must have already been resent or removed for some other reason
% still need to update last_applied or we may inadvertently resend
% stuff later
{Corrs, Actions, State#state{last_applied = Seq}}
end;
seq_applied(_Seq, Acc) ->
?INFO("rabbit_fifo_client: dropping seq ~b", [_Seq]),
Acc.
maybe_add_action(ok, Acc, State) ->
@ -579,7 +580,7 @@ maybe_add_action(Action, Acc, State) ->
{[Action | Acc], State}.
do_resends(From, To, State) when From =< To ->
?INFO("doing resends From ~w To ~w~n", [From, To]),
% ?INFO("rabbit_fifo_client: doing resends From ~w To ~w~n", [From, To]),
lists:foldl(fun resend/2, State, lists:seq(From, To));
do_resends(_, _, State) ->
State.
@ -596,7 +597,6 @@ resend(OldSeq, #state{pending = Pending0, leader = Leader} = State) ->
resend_all_pending(#state{pending = Pend} = State) ->
Seqs = lists:sort(maps:keys(Pend)),
?INFO ("rabbit_fifo_client: resending all ~w~n", [Seqs]),
lists:foldl(fun resend/2, State, Seqs).
handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0,

View File

@ -48,6 +48,7 @@ groups() ->
++ all_tests()},
{cluster_size_3, [], [
declare_during_node_down,
simple_confirm_availability_on_leader_change,
confirm_availability_on_leader_change,
recover_from_single_failure,
recover_from_multiple_failures,
@ -1563,6 +1564,30 @@ declare_during_node_down(Config) ->
wait_for_messages_ready(Servers, RaName, 1),
ok.
simple_confirm_availability_on_leader_change(Config) ->
[Node1, Node2, _Node3] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
%% declare a queue on node2 - this _should_ host the leader on node 2
DCh = rabbit_ct_client_helpers:open_channel(Config, Node2),
QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(DCh, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
erlang:process_flag(trap_exit, true),
%% open a channel to another node
Ch = rabbit_ct_client_helpers:open_channel(Config, Node1),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
publish_confirm(Ch, QQ),
%% stop the node hosting the leader
stop_node(Config, Node2),
%% this should not fail as the channel should detect the new leader and
%% resend to that
publish_confirm(Ch, QQ),
ok = rabbit_ct_broker_helpers:start_node(Config, Node2),
ok.
confirm_availability_on_leader_change(Config) ->
[Node1, Node2, _Node3] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),