Stream coordinator bug fix

Fix issue where a deleted replica could be restarted if the leader went
down whilst the replica was still running it's start phase.
This commit is contained in:
kjnilsson 2021-03-17 13:53:38 +00:00
parent f3969f57a3
commit cbf0107605
3 changed files with 119 additions and 15 deletions

View File

@ -887,12 +887,10 @@ update_stream(#{system_time := _Ts} = _Meta,
true ->
Stream0;
false ->
Members = maps:map(
fun (_, M) ->
M#member{target = stopped}
end, Members0#{Node => #member{role = {replica, Epoch},
node = Node,
target = stopped}}),
Members1 = Members0#{Node => #member{role = {replica, Epoch},
node = Node,
target = stopped}},
Members = set_running_to_stopped(Members1),
Stream0#stream{members = Members,
nodes = lists:sort([Node | Nodes])}
end;
@ -908,8 +906,10 @@ update_stream(#{system_time := _Ts} = _Meta,
Members = maps:map(
fun (K, M) when K == Node ->
M#member{target = deleted};
(_, #member{target = running} = M) ->
M#member{target = stopped};
(_, M) ->
M#member{target = stopped}
M
end, Members0),
Stream0#stream{members = Members,
nodes = lists:delete(Node, Nodes)};
@ -1088,9 +1088,7 @@ update_stream(#{system_time := _Ts},
when Action == starting ->
%% the leader failed to start = we need a new election
%% stop all members
Members = maps:map(fun (_K, M) ->
M#member{target = stopped}
end, Members1),
Members = set_running_to_stopped(Members1),
Stream0#stream{members = Members};
_ ->
Stream0#stream{members = Members1}
@ -1104,9 +1102,11 @@ update_stream(#{system_time := _Ts},
#{DownNode := #member{role = {writer, E},
state = {running, E, Pid}} = Member} ->
Members1 = Members0#{DownNode => Member#member{state = {down, E}}},
%% leader is down, set all members to stop
Members = maps:map(fun (_, M) ->
M#member{target = stopped}
%% leader is down, set all members that should be running to stopped
Members = maps:map(fun (_, #member{target = running} = M) ->
M#member{target = stopped};
(_, M) ->
M
end, Members1),
Stream0#stream{members = Members};
#{DownNode := #member{role = {replica, _},
@ -1483,6 +1483,12 @@ find_members([Node | Nodes]) ->
find_members(Nodes)
end.
set_running_to_stopped(Members) ->
maps:map(fun (_, #member{target = running} = M) ->
M#member{target = stopped};
(_, M) ->
M
end, Members).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.

View File

@ -30,6 +30,7 @@ all_tests() ->
delete_stream,
delete_replica_leader,
delete_replica,
delete_replica_2,
leader_start_failed
].
@ -906,6 +907,102 @@ delete_replica(_) ->
{S4, []} = evaluate_stream(meta(?LINE), S4, []),
ok.
delete_replica_2(_) ->
%% replica is deleted before it has been fully started
E = 1,
StreamId = atom_to_list(?FUNCTION_NAME),
LeaderPid = fake_pid(n1),
[Replica1, Replica2] = [fake_pid(n2), fake_pid(n3)],
N1 = node(LeaderPid),
N2 = node(Replica1),
%% this is to be added
N3 = node(Replica2),
%% set replicas back to starting state
#stream{id = StreamId,
members = Members00} = S00 = started_stream(StreamId, LeaderPid,
[Replica1, Replica2]),
Members = maps:map(fun (_, #member{role = {replica, _}} = M) ->
M#member{state = {ready, 1},
current = {starting, 1}};
(_, M) ->
M
end, Members00),
S0 = S00#stream{members = Members},
From = {self(), make_ref()},
Idx1 = ?LINE,
Meta1 = (meta(Idx1))#{from => From},
%% DELETE REPLICA
S1 = update_stream(Meta1, {delete_replica, StreamId, #{node => N3}}, S0),
?assertMatch(#stream{target = running,
nodes = [N1, N2],
members = #{N1 := #member{target = stopped,
current = undefined,
state = {running, _, _}},
N2 := #member{target = stopped,
current = {starting, _},
state = {ready, _}},
N3 := #member{target = deleted,
current = {starting, _},
state = {ready, _}}
}},
S1),
Idx2 = ?LINE,
{S2, Actions1} = evaluate_stream(meta(Idx2), S1, []),
?assertMatch([
% {aux, {delete_member, StreamId, #{node := N3}, _}},
{aux, {stop, StreamId, #{node := N1, epoch := E}, _}}],
lists:sort(Actions1)),
%% LEADER DOWN
Meta3 = #{index := _Idx3} = meta(?LINE),
S3 = update_stream(Meta3, {down, LeaderPid, normal}, S2),
?assertMatch(#stream{target = running,
members = #{N1 := #member{target = stopped,
current = {stopping, _},
state = {down, _}},
N2 := #member{target = stopped,
current = {starting, _},
state = {ready, _}},
N3 := #member{target = deleted,
current = {starting, _},
state = {ready, _}}
}},
S3),
{S4, Actions4} = evaluate_stream(meta(?LINE), S3, []),
?assertMatch([], Actions4),
%% LEADER STOPPED
Idx4 = ?LINE,
S5 = update_stream(meta(Idx4),
{member_stopped, StreamId, #{node => N1,
index => Idx2,
epoch => E,
tail => {E, 100}}},
S4),
?assertMatch(#stream{members = #{N1 := #member{target = running,
current = undefined,
state = {stopped, _, _}}}},
S5),
{S6, Actions6} = evaluate_stream(meta(?LINE), S5, []),
?assertMatch([], Actions6),
%% DELETED REPLICA START FAIL
Meta7 = meta(?LINE),
S7 = update_stream(Meta7, {action_failed, StreamId,
#{action => starting,
index => 1,
node => N3,
epoch => E}}, S6),
{S8, Actions8} = evaluate_stream(Meta7, S7, []),
?assertMatch([{aux, {delete_member, _, #{node := N3}, _}}], Actions8),
%% OTHER REPLICA START FAIL
Meta9 = meta(?LINE),
S9 = update_stream(Meta9, {action_failed, StreamId,
#{action => starting,
index => 1,
node => N2,
epoch => E}}, S8),
{_S10, Actions10} = evaluate_stream(Meta9, S9, []),
?assertMatch([{aux, {stop, _, _, _}} ], Actions10),
ok.
delete_replica_leader(_) ->
%% TOOD: replica and leader needs to be tested
E = 1,

View File

@ -1733,8 +1733,9 @@ check_leader_and_replicas(Config, Name, Members) ->
lists:member({name, QNameRes}, Props)
end,
rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue,
info_all, [<<"/">>, [name, leader,
members]])),
info_all, [<<"/">>,
[name, leader,
members]])),
ct:pal("~s members ~w ~p", [?FUNCTION_NAME, Members, Info]),
lists:member(proplists:get_value(leader, Info), Members)
andalso (lists:sort(Members) == lists:sort(proplists:get_value(members, Info)))