QQ: resend pending commands when new leader detected on applied notification.
When a leader changes all enqueuer and consumer processes are notified from the `state_enter(leader,` callback. However a new leader may not yet have applied all commands that the old leader had. If any of those commands is a checkout or a register_enqueuer command these processes will not be notified of the new leader and thus may never resend their pending commands. The new leader will however send an applied notification when it does apply these entries and these are always sent from the leader process so can also be used to trigger pending resends. This commit implements that.
This commit is contained in:
parent
a51d8a5ec9
commit
d31b9aa8a3
|
@ -586,26 +586,50 @@ update_machine_state(Server, Conf) ->
|
|||
ra_server_proc:ra_event_body(), state()) ->
|
||||
{internal, Correlators :: [term()], rabbit_queue_type:actions(), state()} |
|
||||
{rabbit_fifo:client_msg(), state()} | {eol, rabbit_queue_type:actions()}.
|
||||
handle_ra_event(QName, From, {applied, Seqs},
|
||||
#state{cfg = #cfg{soft_limit = SftLmt}} = State0) ->
|
||||
handle_ra_event(QName, Leader, {applied, Seqs},
|
||||
#state{leader = OldLeader,
|
||||
cfg = #cfg{soft_limit = SftLmt}} = State0) ->
|
||||
|
||||
{Corrs, ActionsRev, State1} = lists:foldl(fun seq_applied/2,
|
||||
{[], [], State0#state{leader = From}},
|
||||
{[], [], State0#state{leader = Leader}},
|
||||
Seqs),
|
||||
|
||||
%% if the leader has changed we need to resend any pending commands remaining
|
||||
%% after the applied processing
|
||||
State2 = if OldLeader =/= Leader ->
|
||||
%% double check before resending as applied notifications
|
||||
%% can arrive from old leaders in any order
|
||||
case ra:members(Leader) of
|
||||
{ok, _, ActualLeader}
|
||||
when ActualLeader =/= OldLeader ->
|
||||
%% there is a new leader
|
||||
rabbit_log:debug("~ts: Detected QQ leader change (applied) "
|
||||
"from ~w to ~w, "
|
||||
"resending ~b pending commands",
|
||||
[?MODULE, OldLeader, ActualLeader,
|
||||
maps:size(State1#state.pending)]),
|
||||
resend_all_pending(State1#state{leader = ActualLeader});
|
||||
_ ->
|
||||
State1
|
||||
end;
|
||||
true ->
|
||||
State1
|
||||
end,
|
||||
|
||||
Actions0 = lists:reverse(ActionsRev),
|
||||
Actions = case Corrs of
|
||||
[] ->
|
||||
Actions0;
|
||||
_ ->
|
||||
%%TODO consider using lists:foldr/3 above because
|
||||
%%TODO: consider using lists:foldr/3 above because
|
||||
%% Corrs is returned in the wrong order here.
|
||||
%% The wrong order does not matter much because the channel sorts the
|
||||
%% sequence numbers before confirming to the client. But rabbit_fifo_client
|
||||
%% is sequence numer agnostic: it handles any correlation terms.
|
||||
[{settled, QName, Corrs} | Actions0]
|
||||
end,
|
||||
case map_size(State1#state.pending) < SftLmt of
|
||||
true when State1#state.slow == true ->
|
||||
case map_size(State2#state.pending) < SftLmt of
|
||||
true when State2#state.slow == true ->
|
||||
% we have exited soft limit state
|
||||
% send any unsent commands and cancel the time as
|
||||
% TODO: really the timer should only be cancelled when the channel
|
||||
|
@ -613,7 +637,7 @@ handle_ra_event(QName, From, {applied, Seqs},
|
|||
% channel is interacting with)
|
||||
% but the fact the queue has just applied suggests
|
||||
% it's ok to cancel here anyway
|
||||
State2 = cancel_timer(State1#state{slow = false,
|
||||
State3 = cancel_timer(State2#state{slow = false,
|
||||
unsent_commands = #{}}),
|
||||
% build up a list of commands to issue
|
||||
Commands = maps:fold(
|
||||
|
@ -622,16 +646,16 @@ handle_ra_event(QName, From, {applied, Seqs},
|
|||
add_command(Cid, return, Returns,
|
||||
add_command(Cid, discard,
|
||||
Discards, Acc)))
|
||||
end, [], State1#state.unsent_commands),
|
||||
ServerId = pick_server(State2),
|
||||
end, [], State2#state.unsent_commands),
|
||||
ServerId = pick_server(State3),
|
||||
%% send all the settlements and returns
|
||||
State = lists:foldl(fun (C, S0) ->
|
||||
send_command(ServerId, undefined, C,
|
||||
normal, S0)
|
||||
end, State2, Commands),
|
||||
end, State3, Commands),
|
||||
{ok, State, [{unblock, cluster_name(State)} | Actions]};
|
||||
_ ->
|
||||
{ok, State1, Actions}
|
||||
{ok, State2, Actions}
|
||||
end;
|
||||
handle_ra_event(QName, From, {machine, {delivery, _ConsumerTag, _} = Del}, State0) ->
|
||||
handle_delivery(QName, From, Del, State0);
|
||||
|
|
|
@ -23,6 +23,7 @@ all_tests() ->
|
|||
[
|
||||
basics,
|
||||
return,
|
||||
lost_return_is_resent_on_applied_after_leader_change,
|
||||
rabbit_fifo_returns_correlation,
|
||||
resends_lost_command,
|
||||
returns,
|
||||
|
@ -56,9 +57,11 @@ init_per_group(_, Config) ->
|
|||
PrivDir = ?config(priv_dir, Config),
|
||||
_ = application:load(ra),
|
||||
ok = application:set_env(ra, data_dir, PrivDir),
|
||||
application:ensure_all_started(logger),
|
||||
application:ensure_all_started(ra),
|
||||
application:ensure_all_started(lg),
|
||||
SysCfg = ra_system:default_config(),
|
||||
ra_env:configure_logger(logger),
|
||||
ra_system:start(SysCfg#{name => ?RA_SYSTEM}),
|
||||
Config.
|
||||
|
||||
|
@ -67,6 +70,7 @@ end_per_group(_, Config) ->
|
|||
Config.
|
||||
|
||||
init_per_testcase(TestCase, Config) ->
|
||||
ok = logger:set_primary_config(level, all),
|
||||
meck:new(rabbit_quorum_queue, [passthrough]),
|
||||
meck:expect(rabbit_quorum_queue, handle_tick, fun (_, _, _) -> ok end),
|
||||
meck:expect(rabbit_quorum_queue, cancel_consumer_handler, fun (_, _) -> ok end),
|
||||
|
@ -162,6 +166,63 @@ return(Config) ->
|
|||
rabbit_quorum_queue:stop_server(ServerId),
|
||||
ok.
|
||||
|
||||
lost_return_is_resent_on_applied_after_leader_change(Config) ->
|
||||
%% this test handles a case where a combination of a lost/overwritten
|
||||
%% command and a leader change could result in a client never detecting
|
||||
%% a new leader and thus never resends whatever command was overwritten
|
||||
%% in the prior term. The fix is to handle leader changes when processing
|
||||
%% the {appliekd, _} ra event.
|
||||
ClusterName = ?config(cluster_name, Config),
|
||||
ServerId = ?config(node_id, Config),
|
||||
ServerId2 = ?config(node_id2, Config),
|
||||
ServerId3 = ?config(node_id3, Config),
|
||||
Members = [ServerId, ServerId2, ServerId3],
|
||||
|
||||
ok = meck:new(ra, [passthrough]),
|
||||
ok = start_cluster(ClusterName, Members),
|
||||
|
||||
{ok, _, Leader} = ra:members(ServerId),
|
||||
Followers = lists:delete(Leader, Members),
|
||||
|
||||
F00 = rabbit_fifo_client:init(Members),
|
||||
{ok, F0, []} = rabbit_fifo_client:enqueue(ClusterName, 1, msg1, F00),
|
||||
F1 = F0,
|
||||
{_, _, F2} = process_ra_events(receive_ra_events(1, 0), ClusterName, F1),
|
||||
{ok, _, {_, _, MsgId, _, _}, F3} =
|
||||
rabbit_fifo_client:dequeue(ClusterName, <<"tag">>, unsettled, F2),
|
||||
{F4, _} = rabbit_fifo_client:return(<<"tag">>, [MsgId], F3),
|
||||
RaEvt = receive
|
||||
{ra_event, Leader, {applied, _} = Evt} ->
|
||||
Evt
|
||||
after 5000 ->
|
||||
ct:fail("no ra event")
|
||||
end,
|
||||
NextLeader = hd(Followers),
|
||||
timer:sleep(100),
|
||||
ok = ra:transfer_leadership(Leader, NextLeader),
|
||||
%% get rid of leader change event
|
||||
receive
|
||||
{ra_event, _, {machine, leader_change}} ->
|
||||
ok
|
||||
after 5000 ->
|
||||
ct:fail("no machine leader_change event")
|
||||
end,
|
||||
%% client will "send" to the old leader
|
||||
meck:expect(ra, pipeline_command, fun (_, _, _, _) -> ok end),
|
||||
{ok, F5, []} = rabbit_fifo_client:enqueue(ClusterName, 2, msg2, F4),
|
||||
?assertEqual(2, rabbit_fifo_client:pending_size(F5)),
|
||||
meck:unload(ra),
|
||||
%% pass the ra event with the new leader as if the entry was applied
|
||||
%% by the new leader, not the old
|
||||
{ok, F6, _} = rabbit_fifo_client:handle_ra_event(ClusterName, NextLeader,
|
||||
RaEvt, F5),
|
||||
%% this should resend the never applied enqueue
|
||||
{_, _, F7} = process_ra_events(receive_ra_events(1, 0), ClusterName, F6),
|
||||
?assertEqual(0, rabbit_fifo_client:pending_size(F7)),
|
||||
|
||||
flush(),
|
||||
ok.
|
||||
|
||||
rabbit_fifo_returns_correlation(Config) ->
|
||||
ClusterName = ?config(cluster_name, Config),
|
||||
ServerId = ?config(node_id, Config),
|
||||
|
|
Loading…
Reference in New Issue