Capture and log crashes on the stream coordinator machine

This commit is contained in:
dcorbacho 2021-04-21 13:00:04 +02:00
parent b5ed4e7ca1
commit eabdd8cdc1
1 changed files with 79 additions and 68 deletions

View File

@ -842,9 +842,20 @@ make_ra_conf(Node, Nodes) ->
ra_event_formatter => Formatter}.
update_stream(#{system_time := _} = Meta,
{new_stream, StreamId, #{leader_node := LeaderNode,
queue := Q}}, undefined) ->
update_stream(Meta, Cmd, Stream) ->
try
update_stream0(Meta, Cmd, Stream)
catch
_:E:Stacktrace ->
rabbit_log:warning(
"~s failed to update stream:~n~p~n~p",
[?MODULE, E, Stacktrace]),
Stream
end.
update_stream0(#{system_time := _} = Meta,
{new_stream, StreamId, #{leader_node := LeaderNode,
queue := Q}}, undefined) ->
#{nodes := Nodes} = Conf = amqqueue:get_type_state(Q),
%% this jumps straight to the state where all members
%% have been stopped and a new writer has been chosen
@ -867,10 +878,10 @@ update_stream(#{system_time := _} = Meta,
conf = Conf,
members = Members,
reply_to = maps:get(from, Meta, undefined)};
update_stream(#{system_time := _Ts} = _Meta,
{delete_stream, _StreamId, #{}},
#stream{members = Members0,
target = _} = Stream0) ->
update_stream0(#{system_time := _Ts} = _Meta,
{delete_stream, _StreamId, #{}},
#stream{members = Members0,
target = _} = Stream0) ->
Members = maps:map(
fun (_, M) ->
M#member{target = deleted}
@ -878,12 +889,12 @@ update_stream(#{system_time := _Ts} = _Meta,
Stream0#stream{members = Members,
% reply_to = maps:get(from, Meta, undefined),
target = deleted};
update_stream(#{system_time := _Ts} = _Meta,
{add_replica, _StreamId, #{node := Node}},
#stream{members = Members0,
epoch = Epoch,
nodes = Nodes,
target = _} = Stream0) ->
update_stream0(#{system_time := _Ts} = _Meta,
{add_replica, _StreamId, #{node := Node}},
#stream{members = Members0,
epoch = Epoch,
nodes = Nodes,
target = _} = Stream0) ->
case maps:is_key(Node, Members0) of
true ->
Stream0;
@ -895,12 +906,12 @@ update_stream(#{system_time := _Ts} = _Meta,
Stream0#stream{members = Members,
nodes = lists:sort([Node | Nodes])}
end;
update_stream(#{system_time := _Ts} = _Meta,
{delete_replica, _StreamId, #{node := Node}},
#stream{members = Members0,
epoch = _Epoch,
nodes = Nodes,
target = _} = Stream0) ->
update_stream0(#{system_time := _Ts} = _Meta,
{delete_replica, _StreamId, #{node := Node}},
#stream{members = Members0,
epoch = _Epoch,
nodes = Nodes,
target = _} = Stream0) ->
case maps:is_key(Node, Members0) of
true ->
%% TODO: check of duplicate
@ -917,12 +928,12 @@ update_stream(#{system_time := _Ts} = _Meta,
false ->
Stream0
end;
update_stream(#{system_time := _Ts},
{member_started, _StreamId,
#{epoch := E,
index := Idx,
pid := Pid} = Args}, #stream{epoch = E,
members = Members} = Stream0) ->
update_stream0(#{system_time := _Ts},
{member_started, _StreamId,
#{epoch := E,
index := Idx,
pid := Pid} = Args}, #stream{epoch = E,
members = Members} = Stream0) ->
Node = node(Pid),
case maps:get(Node, Members, undefined) of
#member{role = {_, E},
@ -942,10 +953,10 @@ update_stream(#{system_time := _Ts},
[?MODULE, Args, Member]),
Stream0
end;
update_stream(#{system_time := _Ts},
{member_deleted, _StreamId, #{node := Node}},
#stream{nodes = Nodes,
members = Members0} = Stream0) ->
update_stream0(#{system_time := _Ts},
{member_deleted, _StreamId, #{node := Node}},
#stream{nodes = Nodes,
members = Members0} = Stream0) ->
case maps:take(Node, Members0) of
{_, Members} when map_size(Members) == 0 ->
undefined;
@ -959,15 +970,15 @@ update_stream(#{system_time := _Ts},
%% epochs?
Stream0
end;
update_stream(#{system_time := _Ts},
{member_stopped, _StreamId,
#{node := Node,
index := Idx,
epoch := StoppedEpoch,
tail := Tail}}, #stream{epoch = Epoch,
target = Target,
nodes = Nodes,
members = Members0} = Stream0) ->
update_stream0(#{system_time := _Ts},
{member_stopped, _StreamId,
#{node := Node,
index := Idx,
epoch := StoppedEpoch,
tail := Tail}}, #stream{epoch = Epoch,
target = Target,
nodes = Nodes,
members = Members0} = Stream0) ->
IsLeaderInCurrent = case find_leader(Members0) of
{#member{role = {writer, Epoch},
target = running,
@ -1046,9 +1057,9 @@ update_stream(#{system_time := _Ts},
_Member ->
Stream0
end;
update_stream(#{system_time := _Ts},
{mnesia_updated, _StreamId, #{epoch := E}},
Stream0) ->
update_stream0(#{system_time := _Ts},
{mnesia_updated, _StreamId, #{epoch := E}},
Stream0) ->
%% reset mnesia state
case Stream0 of
undefined ->
@ -1056,25 +1067,25 @@ update_stream(#{system_time := _Ts},
_ ->
Stream0#stream{mnesia = {updated, E}}
end;
update_stream(#{system_time := _Ts},
{retention_updated, _StreamId, #{node := Node}},
#stream{members = Members0,
conf = Conf} = Stream0) ->
update_stream0(#{system_time := _Ts},
{retention_updated, _StreamId, #{node := Node}},
#stream{members = Members0,
conf = Conf} = Stream0) ->
Members = maps:update_with(Node, fun (M) ->
M#member{current = undefined,
conf = Conf}
end, Members0),
Stream0#stream{members = Members};
update_stream(#{system_time := _Ts},
{action_failed, _StreamId, #{action := updating_mnesia}},
#stream{mnesia = {_, E}} = Stream0) ->
update_stream0(#{system_time := _Ts},
{action_failed, _StreamId, #{action := updating_mnesia}},
#stream{mnesia = {_, E}} = Stream0) ->
Stream0#stream{mnesia = {updated, E}};
update_stream(#{system_time := _Ts},
{action_failed, _StreamId,
#{node := Node,
index := Idx,
action := Action,
epoch := _Epoch}}, #stream{members = Members0} = Stream0) ->
update_stream0(#{system_time := _Ts},
{action_failed, _StreamId,
#{node := Node,
index := Idx,
action := Action,
epoch := _Epoch}}, #stream{members = Members0} = Stream0) ->
Members1 = maps:update_with(Node,
fun (#member{current = {C, I}} = M)
when C == Action andalso I == Idx ->
@ -1094,10 +1105,10 @@ update_stream(#{system_time := _Ts},
_ ->
Stream0#stream{members = Members1}
end;
update_stream(#{system_time := _Ts},
{down, Pid, Reason},
#stream{epoch = E,
members = Members0} = Stream0) ->
update_stream0(#{system_time := _Ts},
{down, Pid, Reason},
#stream{epoch = E,
members = Members0} = Stream0) ->
DownNode = node(Pid),
case Members0 of
#{DownNode := #member{role = {writer, E},
@ -1128,12 +1139,12 @@ update_stream(#{system_time := _Ts},
_ ->
Stream0
end;
update_stream(#{system_time := _Ts},
{down, _Pid, _Reason}, undefined) ->
update_stream0(#{system_time := _Ts},
{down, _Pid, _Reason}, undefined) ->
undefined;
update_stream(#{system_time := _Ts} = _Meta,
{nodeup, Node},
#stream{members = Members0} = Stream0) ->
update_stream0(#{system_time := _Ts} = _Meta,
{nodeup, Node},
#stream{members = Members0} = Stream0) ->
Members = maps:map(
fun (_, #member{node = N,
current = {sleeping, nodeup}} = M)
@ -1143,13 +1154,13 @@ update_stream(#{system_time := _Ts} = _Meta,
M
end, Members0),
Stream0#stream{members = Members};
update_stream(#{system_time := _Ts},
{policy_changed, _StreamId, #{queue := Q}},
#stream{conf = Conf0,
members = _Members0} = Stream0) ->
update_stream0(#{system_time := _Ts},
{policy_changed, _StreamId, #{queue := Q}},
#stream{conf = Conf0,
members = _Members0} = Stream0) ->
Conf = rabbit_stream_queue:update_stream_conf(Q, Conf0),
Stream0#stream{conf = Conf};
update_stream(_Meta, _Cmd, undefined) ->
update_stream0(_Meta, _Cmd, undefined) ->
undefined.
eval_listeners(#stream{listeners = Listeners0,