Deal with noconnection in stream SAC coordinator

This commit is contained in:
Arnaud Cogoluègnes 2025-04-14 17:17:49 +02:00
parent c28d25ac60
commit e1bdd4b851
No known key found for this signature in database
GPG Key ID: D5C8C4DFAD43AFA8
5 changed files with 513 additions and 83 deletions

View File

@ -15,7 +15,7 @@
apply/3,
state_enter/2,
init_aux/1,
handle_aux/6,
handle_aux/5,
tick/2,
version/0,
which_module/1,
@ -629,11 +629,19 @@ apply(#{machine_version := MachineVersion} = Meta, {down, Pid, Reason} = Cmd,
return(Meta, State#?MODULE{streams = Streams0,
monitors = Monitors1}, ok, Effects0)
end;
{sac, Monitors1} ->
{sac, Monitors1} when MachineVersion < 5 orelse Reason =/= noconnection ->
%% A connection went down, v5+ treats noconnection differently but
%% v4- does not.
Mod = sac_module(Meta),
{SacState1, Effects} = Mod:handle_connection_down(Pid, SacState0),
return(Meta, State#?MODULE{single_active_consumer = SacState1,
monitors = Monitors1}, ok, Effects);
monitors = Monitors1}, ok, [Effects0 ++ Effects]);
{sac, Monitors1} when Reason =:= noconnection ->
%% the node of a connection got disconnected
Mod = sac_module(Meta),
{SacState1, Effects} = Mod:handle_connection_node_disconnected(Pid, SacState0),
return(Meta, State#?MODULE{single_active_consumer = SacState1,
monitors = Monitors1}, ok, [Effects0 ++ Effects]);
error ->
return(Meta, State, ok, Effects0)
end;
@ -687,9 +695,11 @@ apply(#{machine_version := MachineVersion} = Meta,
_ ->
return(Meta, State0, stream_not_found, [])
end;
apply(Meta, {nodeup, Node} = Cmd,
apply(#{machine_version := MachineVersion} = Meta,
{nodeup, Node} = Cmd,
#?MODULE{monitors = Monitors0,
streams = Streams0} = State) ->
streams = Streams0,
single_active_consumer = Sac0} = State) ->
%% reissue monitors for all disconnected members
{Effects0, Monitors} =
maps:fold(
@ -703,14 +713,23 @@ apply(Meta, {nodeup, Node} = Cmd,
{Acc, Mon}
end
end, {[], Monitors0}, Streams0),
{Streams, Effects} =
{Streams, Effects1} =
maps:fold(fun (Id, S0, {Ss, E0}) ->
S1 = update_stream(Meta, Cmd, S0),
{S, E} = evaluate_stream(Meta, S1, E0),
{Ss#{Id => S}, E}
end, {Streams0, Effects0}, Streams0),
{Sac1, Effects2} = case MachineVersion > 5 of
true ->
SacMod = sac_module(Meta),
SacMod:handle_node_reconnected(Sac0, Effects1);
false ->
{Sac0, Effects1}
end,
return(Meta, State#?MODULE{monitors = Monitors,
streams = Streams}, ok, Effects);
streams = Streams,
single_active_consumer = Sac1}, ok, Effects2);
apply(Meta, {machine_version, From, To}, State0) ->
rabbit_log:info("Stream coordinator machine version changes from ~tp to ~tp, "
++ "applying incremental upgrade.", [From, To]),
@ -721,6 +740,12 @@ apply(Meta, {machine_version, From, To}, State0) ->
{S1, Eff0 ++ Eff1}
end, {State0, []}, lists:seq(From, To - 1)),
return(Meta, State1, ok, Effects);
apply(Meta, {timeout, {sac, node_disconnected, #{connection_pid := Pid}}},
#?MODULE{single_active_consumer = SacState0} = State0) ->
Mod = sac_module(Meta),
{SacState1, Effects} = Mod:forget_connection(Pid, SacState0),
return(Meta, State0#?MODULE{single_active_consumer = SacState1}, ok,
Effects);
apply(Meta, UnkCmd, State) ->
rabbit_log:debug("~ts: unknown command ~W",
[?MODULE, UnkCmd, 10]),
@ -787,7 +812,7 @@ members() ->
end
end.
maybe_resize_coordinator_cluster() ->
maybe_resize_coordinator_cluster(MachineVersion) ->
spawn(fun() ->
RabbitIsRunning = rabbit:is_running(),
case members() of
@ -813,19 +838,38 @@ maybe_resize_coordinator_cluster() ->
case MemberNodes -- RabbitNodes of
[] ->
ok;
[Old | _] ->
[Old | _] when length(RabbitNodes) > 0 ->
%% this ought to be rather rare as the stream
%% coordinator member is now removed as part
%% of the forget_cluster_node command
rabbit_log:info("~ts: Rabbit node(s) removed from the cluster, "
"deleting: ~w", [?MODULE, Old]),
remove_member(Leader, Members, Old)
end;
_ ->
end,
maybe_handle_stale_nodes(MemberNodes, RabbitNodes,
MachineVersion);
_ ->
ok
end
end).
%% Looks for coordinator member nodes that are not part of the expected
%% node list. Only active for machine versions greater than 4; every other
%% version is a no-op. Always returns `ok'.
%% Stale nodes are only reported when the expected node list is non-empty.
maybe_handle_stale_nodes(MemberNodes, ExpectedNodes, MachineVersion)
  when MachineVersion > 4 ->
    StaleNodes = MemberNodes -- ExpectedNodes,
    HasExpectedNodes = ExpectedNodes =/= [],
    case StaleNodes of
        [_ | _] when HasExpectedNodes ->
            rabbit_log:debug("Stale nodes detected in stream SAC "
                             "coordinator: ~w. Purging state.",
                             [StaleNodes]),
            %% TODO SAC pipeline command to purge state from stale nodes
            ok;
        _ ->
            %% nothing stale, or no reference node list to compare against
            ok
    end;
maybe_handle_stale_nodes(_MemberNodes, _ExpectedNodes, _MachineVersion) ->
    ok.
add_member(Members, Node) ->
MinMacVersion = erpc:call(Node, ?MODULE, version, []),
Conf = make_ra_conf(Node, [N || {_, N} <- Members], MinMacVersion),
@ -899,65 +943,62 @@ init_aux(_Name) ->
%% TODO ensure the dead writer is restarted as a replica at some point in time, increasing timeout?
handle_aux(leader, _, maybe_resize_coordinator_cluster,
#aux{resizer = undefined} = Aux, LogState, _) ->
Pid = maybe_resize_coordinator_cluster(),
{no_reply, Aux#aux{resizer = Pid}, LogState, [{monitor, process, aux, Pid}]};
#aux{resizer = undefined} = Aux, RaAux) ->
MachineVersion = ra_aux:effective_machine_version(RaAux),
Pid = maybe_resize_coordinator_cluster(MachineVersion),
{no_reply, Aux#aux{resizer = Pid}, RaAux, [{monitor, process, aux, Pid}]};
handle_aux(leader, _, maybe_resize_coordinator_cluster,
AuxState, LogState, _) ->
AuxState, RaAux) ->
%% Coordinator resizing is still happening, let's ignore this tick event
{no_reply, AuxState, LogState};
{no_reply, AuxState, RaAux};
handle_aux(leader, _, {down, Pid, _},
#aux{resizer = Pid} = Aux, LogState, _) ->
#aux{resizer = Pid} = Aux, RaAux) ->
%% Coordinator resizing has finished
{no_reply, Aux#aux{resizer = undefined}, LogState};
{no_reply, Aux#aux{resizer = undefined}, RaAux};
handle_aux(leader, _, {start_writer, StreamId,
#{epoch := Epoch, node := Node} = Args, Conf},
Aux, LogState, _) ->
Aux, RaAux) ->
rabbit_log:debug("~ts: running action: 'start_writer'"
" for ~ts on node ~w in epoch ~b",
[?MODULE, StreamId, Node, Epoch]),
ActionFun = phase_start_writer(StreamId, Args, Conf),
run_action(starting, StreamId, Args, ActionFun, Aux, LogState);
run_action(starting, StreamId, Args, ActionFun, Aux, RaAux);
handle_aux(leader, _, {start_replica, StreamId,
#{epoch := Epoch, node := Node} = Args, Conf},
Aux, LogState, _) ->
Aux, RaAux) ->
rabbit_log:debug("~ts: running action: 'start_replica'"
" for ~ts on node ~w in epoch ~b",
[?MODULE, StreamId, Node, Epoch]),
ActionFun = phase_start_replica(StreamId, Args, Conf),
run_action(starting, StreamId, Args, ActionFun, Aux, LogState);
run_action(starting, StreamId, Args, ActionFun, Aux, RaAux);
handle_aux(leader, _, {stop, StreamId, #{node := Node,
epoch := Epoch} = Args, Conf},
Aux, LogState, _) ->
Aux, RaAux) ->
rabbit_log:debug("~ts: running action: 'stop'"
" for ~ts on node ~w in epoch ~b",
[?MODULE, StreamId, Node, Epoch]),
ActionFun = phase_stop_member(StreamId, Args, Conf),
run_action(stopping, StreamId, Args, ActionFun, Aux, LogState);
run_action(stopping, StreamId, Args, ActionFun, Aux, RaAux);
handle_aux(leader, _, {update_mnesia, StreamId, Args, Conf},
#aux{actions = _Monitors} = Aux, LogState,
#?MODULE{streams = _Streams}) ->
#aux{actions = _Monitors} = Aux, RaAux) ->
rabbit_log:debug("~ts: running action: 'update_mnesia'"
" for ~ts", [?MODULE, StreamId]),
ActionFun = phase_update_mnesia(StreamId, Args, Conf),
run_action(updating_mnesia, StreamId, Args, ActionFun, Aux, LogState);
run_action(updating_mnesia, StreamId, Args, ActionFun, Aux, RaAux);
handle_aux(leader, _, {update_retention, StreamId, Args, _Conf},
#aux{actions = _Monitors} = Aux, LogState,
#?MODULE{streams = _Streams}) ->
#aux{actions = _Monitors} = Aux, RaAux) ->
rabbit_log:debug("~ts: running action: 'update_retention'"
" for ~ts", [?MODULE, StreamId]),
ActionFun = phase_update_retention(StreamId, Args),
run_action(update_retention, StreamId, Args, ActionFun, Aux, LogState);
run_action(update_retention, StreamId, Args, ActionFun, Aux, RaAux);
handle_aux(leader, _, {delete_member, StreamId, #{node := Node} = Args, Conf},
#aux{actions = _Monitors} = Aux, LogState,
#?MODULE{streams = _Streams}) ->
#aux{actions = _Monitors} = Aux, RaAux) ->
rabbit_log:debug("~ts: running action: 'delete_member'"
" for ~ts ~ts", [?MODULE, StreamId, Node]),
ActionFun = phase_delete_member(StreamId, Args, Conf),
run_action(delete_member, StreamId, Args, ActionFun, Aux, LogState);
run_action(delete_member, StreamId, Args, ActionFun, Aux, RaAux);
handle_aux(leader, _, fail_active_actions,
#aux{actions = Actions} = Aux, LogState,
#?MODULE{streams = Streams}) ->
#aux{actions = Actions} = Aux, RaAux) ->
%% this bit of code just creates an exclude map of currently running
%% tasks to avoid failing them, this could only really happen during
%% a leader flipflap
@ -965,14 +1006,15 @@ handle_aux(leader, _, fail_active_actions,
|| {P, {S, _, _}} <- maps_to_list(Actions),
is_process_alive(P)]),
rabbit_log:debug("~ts: failing actions: ~w", [?MODULE, Exclude]),
#?MODULE{streams = Streams} = ra_aux:machine_state(RaAux),
fail_active_actions(Streams, Exclude),
{no_reply, Aux, LogState, []};
{no_reply, Aux, RaAux, []};
handle_aux(leader, _, {down, Pid, normal},
#aux{actions = Monitors} = Aux, LogState, _) ->
#aux{actions = Monitors} = Aux, RaAux) ->
%% action process finished normally, just remove from actions map
{no_reply, Aux#aux{actions = maps:remove(Pid, Monitors)}, LogState, []};
{no_reply, Aux#aux{actions = maps:remove(Pid, Monitors)}, RaAux, []};
handle_aux(leader, _, {down, Pid, Reason},
#aux{actions = Monitors0} = Aux, LogState, _) ->
#aux{actions = Monitors0} = Aux, RaAux) ->
%% An action has failed - report back to the state machine
case maps:get(Pid, Monitors0, undefined) of
{StreamId, Action, #{node := Node, epoch := Epoch} = Args} ->
@ -983,13 +1025,13 @@ handle_aux(leader, _, {down, Pid, Reason},
Cmd = {action_failed, StreamId, Args#{action => Action}},
send_self_command(Cmd),
{no_reply, Aux#aux{actions = maps:remove(Pid, Monitors)},
LogState, []};
RaAux, []};
undefined ->
%% should this ever happen?
{no_reply, Aux, LogState, []}
{no_reply, Aux, RaAux, []}
end;
handle_aux(_, _, _, AuxState, LogState, _) ->
{no_reply, AuxState, LogState}.
handle_aux(_, _, _, AuxState, RaAux) ->
{no_reply, AuxState, RaAux}.
overview(#?MODULE{streams = Streams,
monitors = Monitors,
@ -1025,7 +1067,7 @@ stream_overview0(#stream{epoch = Epoch,
run_action(Action, StreamId, #{node := _Node,
epoch := _Epoch} = Args,
ActionFun, #aux{actions = Actions0} = Aux, Log) ->
ActionFun, #aux{actions = Actions0} = Aux, RaAux) ->
Coordinator = self(),
Pid = spawn_link(fun() ->
ActionFun(),
@ -1033,7 +1075,7 @@ run_action(Action, StreamId, #{node := _Node,
end),
Effects = [{monitor, process, aux, Pid}],
Actions = Actions0#{Pid => {StreamId, Action, Args}},
{no_reply, Aux#aux{actions = Actions}, Log, Effects}.
{no_reply, Aux#aux{actions = Actions}, RaAux, Effects}.
wrap_reply(From, Reply) ->
[{reply, From, {wrap_reply, Reply}}].

View File

@ -20,7 +20,7 @@
-opaque command() ::
#command_register_consumer{} | #command_unregister_consumer{} |
#command_activate_consumer{}.
#command_activate_consumer{} | #command_connection_reconnected{}.
-opaque state() :: #?MODULE{}.
-export_type([state/0,
@ -31,12 +31,16 @@
unregister_consumer/5,
activate_consumer/3,
consumer_groups/2,
group_consumers/4]).
group_consumers/4,
connection_reconnected/1]).
-export([apply/2,
init_state/0,
send_message/2,
ensure_monitors/4,
handle_connection_down/2,
handle_connection_node_disconnected/2,
handle_node_reconnected/2,
forget_connection/2,
consumer_groups/3,
group_consumers/5,
overview/1,
@ -48,6 +52,11 @@
-define(WAITING, waiting).
-define(DEACTIVATING, deactivating).
-define(CONNECTED, connected).
-define(DISCONNECTED, disconnected).
-define(FORGOTTTEN, forgotten).
%% Single Active Consumer API
-spec register_consumer(binary(),
binary(),
@ -121,6 +130,11 @@ activate_consumer(VirtualHost, Stream, ConsumerName) ->
=
ConsumerName}}).
%% Submits a `#command_connection_reconnected{}' command, tagged `sac',
%% for the given connection PID through process_command/1.
-spec connection_reconnected(connection_pid()) -> ok.
connection_reconnected(Pid) ->
    Cmd = #command_connection_reconnected{pid = Pid},
    process_command({sac, Cmd}).
process_command(Cmd) ->
case rabbit_stream_coordinator:process_command(Cmd) of
{ok, Res, _} ->
@ -262,15 +276,102 @@ apply(#command_activate_consumer{vhost = VirtualHost,
[{VirtualHost, Stream, ConsumerName}]),
{undefined, []};
Group0 ->
Group1 = update_consumers(Group0, ?WAITING),
Group1 = update_consumers(Group0, {?CONNECTED, ?WAITING}),
#consumer{pid = Pid, subscription_id = SubId} =
evaluate_active_consumer(Group1),
Group2 = update_consumer_state_in_group(Group1, Pid, SubId, ?ACTIVE),
Group2 = update_consumer_state_in_group(Group1, Pid, SubId,
{?CONNECTED, ?ACTIVE}),
{Group2, [notify_consumer_effect(Pid, SubId, Stream, ConsumerName, true)]}
end,
StreamGroups1 =
update_groups(VirtualHost, Stream, ConsumerName, G, StreamGroups0),
{State0#?MODULE{groups = StreamGroups1}, ok, Eff}.
{State0#?MODULE{groups = StreamGroups1}, ok, Eff};
apply(#command_connection_reconnected{pid = Pid},
#?MODULE{groups = Groups0} = State0) ->
{State1, Eff} =
maps:fold(fun(G, _, {St, Eff}) ->
handle_group_connection_reconnected(Pid, St, Eff, G)
end, {State0, []}, Groups0),
{State1, ok, Eff}.
%% Handles the reconnection of connection `Pid' for a single group.
%% Every consumer of the group owned by `Pid' gets its connectivity set back
%% to connected (the second element of the status tuple is kept as is).
%% When at least one consumer was touched the group is rebalanced and the
%% resulting effects are prepended to `Eff0'; otherwise state and effects
%% are returned untouched.
handle_group_connection_reconnected(Pid, #?MODULE{groups = Groups0} = S0,
                                    Eff0, {VH, S, Name} = K) ->
    %% TODO sac: handle forgotten_active case (reconcile state with current active)
    case lookup_group(VH, S, Name, Groups0) of
        undefined ->
            {S0, Eff0};
        #group{consumers = Consumers0} = G0 ->
            %% walk the consumers once, flipping the ones owned by Pid and
            %% remembering whether any of them was actually changed
            {Consumers1, Updated} =
                lists:mapfoldl(
                  fun(#consumer{pid = P, status = {_, St}} = C, _Flag)
                        when P == Pid ->
                          {C#consumer{status = {?CONNECTED, St}}, true};
                     (C, Flag) ->
                          {C, Flag}
                  end, false, Consumers0),
            case Updated of
                false ->
                    {S0, Eff0};
                true ->
                    G1 = G0#group{consumers = Consumers1},
                    {G2, Eff} = maybe_rebalance_group(G1, K),
                    Groups1 = update_groups(VH, S, Name, G2, Groups0),
                    {S0#?MODULE{groups = Groups1}, Eff ++ Eff0}
            end
    end.
%% Re-evaluates which consumer of a group should be active.
%% Returns `{Group, Effects}'.
%%
%% Partition index -1: if an active consumer exists it is moved to the head
%% of the consumer list and no effect is produced; otherwise a new active
%% consumer is computed and, if one is found, it gets notified.
%%
%% Any other partition index: if the consumer that should be active differs
%% from the current active one, the current active is marked as deactivating
%% and told to step down; otherwise nothing changes.
maybe_rebalance_group(#group{partition_index = -1,
                             consumers = Members} = Group0,
                      {_VH, Stream, ConsumerName}) ->
    case lookup_active_consumer(Group0) of
        {value, Active} ->
            %% an active consumer already exists: only re-arrange the list
            %% so that the active consumer comes first
            Others = [C || C <- Members, not same_consumer(C, Active)],
            {Group0#group{consumers = [Active | Others]}, []};
        _ ->
            %% nobody is active, elect a consumer
            Group1 = compute_active_consumer(Group0),
            Effects =
                case lookup_active_consumer(Group1) of
                    {value, #consumer{pid = Pid, subscription_id = SubId}} ->
                        %% side effect to notify the newly active consumer
                        [notify_consumer_effect(Pid, SubId, Stream,
                                                ConsumerName, true)];
                    _ ->
                        %% still no active consumer in the group
                        []
                end,
            {Group1, Effects}
    end;
maybe_rebalance_group(#group{} = Group, {_VH, Stream, ConsumerName}) ->
    %% TODO re-use logic from do_register_consumer
    case lookup_active_consumer(Group) of
        false ->
            %% no active consumer in the (non-empty) group,
            %% we are waiting for the reply of a former active
            {Group, []};
        {value,
         #consumer{pid = ActPid, subscription_id = ActSubId} = CurrentActive} ->
            case evaluate_active_consumer(Group) =:= CurrentActive of
                true ->
                    %% the current active consumer keeps its role
                    {Group, []};
                false ->
                    %% another consumer should take over: tell the current
                    %% active one that it is no longer active
                    Group1 = update_consumer_state_in_group(
                               Group, ActPid, ActSubId,
                               {?CONNECTED, ?DEACTIVATING}),
                    StepDown = notify_consumer_effect(ActPid, ActSubId,
                                                      Stream, ConsumerName,
                                                      false, true),
                    {Group1, [StepDown]}
            end
    end.
-spec consumer_groups(binary(), [atom()], state()) -> {ok, [term()]}.
consumer_groups(VirtualHost, InfoKeys, #?MODULE{groups = Groups}) ->
@ -349,7 +450,9 @@ group_consumers(VirtualHost,
{error, not_found}
end.
cli_consumer_status_label(?ACTIVE) ->
cli_consumer_status_label({?FORGOTTTEN, _}) ->
inactive;
cli_consumer_status_label({_, ?ACTIVE}) ->
active;
cli_consumer_status_label(_) ->
inactive.
@ -413,6 +516,21 @@ ensure_monitors(#command_unregister_consumer{vhost = VirtualHost,
maps:put(Pid, PidGroup1, PidsGroups0)},
Monitors, Effects}
end;
ensure_monitors(#command_connection_reconnected{pid = Pid},
#?MODULE{pids_groups = PidsGroups,
groups = Groups} = State,
Monitors,
Effects)
when not is_map_key(Pid, Monitors) orelse
not is_map_key(Pid, PidsGroups) ->
%% the connection PID should be monitored
%% the inconsistency can happen when a forgotten connection comes back,
%% we must re-compute the connection PID / group dependency mapping
%% and re-issue the monitor
AllPidsGroups = compute_pid_group_dependencies(Groups),
{State#?MODULE{pids_groups = AllPidsGroups},
Monitors#{Pid => sac},
[{monitor, process, Pid}, {monitor, node, node(Pid)} | Effects]};
ensure_monitors(_, #?MODULE{} = State0, Monitors, Effects) ->
{State0, Monitors, Effects}.
@ -430,9 +548,53 @@ handle_connection_down(Pid,
end, {State1, []}, Groups)
end.
%% Called when the node hosting connection `ConnPid' got disconnected.
%% The connection is removed from the PID/group mapping, its consumers are
%% flagged as disconnected in every group it belongs to, and a timer effect
%% carrying the connection PID is returned.
%% An unknown connection leaves the state unchanged and yields no effect.
-spec handle_connection_node_disconnected(connection_pid(), state()) ->
    {state(), ra_machine:effects()}.
handle_connection_node_disconnected(ConnPid,
                                    #?MODULE{pids_groups = PidsGroups0} = State0) ->
    case maps:take(ConnPid, PidsGroups0) of
        {ConnGroups, PidsGroups1} ->
            MarkGroup =
                fun(GroupId, _, StateAcc) ->
                        handle_group_after_connection_node_disconnected(
                          ConnPid, StateAcc, GroupId)
                end,
            State1 = maps:fold(MarkGroup,
                               State0#?MODULE{pids_groups = PidsGroups1},
                               ConnGroups),
            %% TODO configure timeout to forget connection from disconnected node
            Timeout = 60_000,
            TimerName = {sac, node_disconnected,
                         #{connection_pid => ConnPid}},
            {State1, [{timer, TimerName, Timeout}]};
        error ->
            {State0, []}
    end.
%% Called when a node comes back.
%% The PID/group mapping is rebuilt from the groups. For every connection PID
%% present in the rebuilt mapping but missing from the previous one, three
%% effects are emitted in this order: a connection notification, a process
%% monitor and a node monitor. These are placed in front of `Effects0'.
-spec handle_node_reconnected(state(), ra_machine:effects()) ->
    {state(), ra_machine:effects()}.
handle_node_reconnected(#?MODULE{pids_groups = PidsGroups0,
                                 groups = Groups0} = State0,
                        Effects0) ->
    AllPidsGroups = compute_pid_group_dependencies(Groups0),
    NotMonitored = maps:keys(AllPidsGroups) -- maps:keys(PidsGroups0),
    NewEffects = [Effect || P <- NotMonitored,
                            Effect <- [notify_connection_effect(P),
                                       {monitor, process, P},
                                       {monitor, node, node(P)}]],
    {State0#?MODULE{pids_groups = AllPidsGroups}, NewEffects ++ Effects0}.
%% Placeholder for forgetting a connection whose node stayed disconnected.
%% Not implemented yet: the state is returned unchanged with no effects.
%% The return value must be a `{State, Effects}' tuple, as declared in the
%% spec, because the stream coordinator destructures the result of this call
%% while applying the `node_disconnected' timeout command; returning a bare
%% atom would make that command fail with a badmatch.
-spec forget_connection(connection_pid(), state()) ->
    {state(), ra_machine:effects()}.
forget_connection(_ConnPid, State0) ->
    %% TODO SAC forget connection
    %% mark connection consumers as forgotten
    %% re-evaluate SAC for affected groups
    {State0, []}.
handle_group_after_connection_down(Pid,
{#?MODULE{groups = Groups0} = S0, Eff0},
{VirtualHost, Stream, ConsumerName}) ->
{#?MODULE{groups = Groups0} = S0, Eff0},
{VirtualHost, Stream, ConsumerName}) ->
case lookup_group(VirtualHost,
Stream,
ConsumerName,
@ -454,7 +616,9 @@ handle_group_after_connection_down(Pid,
case AnyRemoved of
true ->
G1 = G0#group{consumers = Consumers1},
{G2, Effects} = handle_consumer_removal(G1, Stream, ConsumerName, ActiveRemoved),
{G2, Effects} = handle_consumer_removal(G1, Stream,
ConsumerName,
ActiveRemoved),
Groups1 = update_groups(VirtualHost,
Stream,
ConsumerName,
@ -466,6 +630,33 @@ handle_group_after_connection_down(Pid,
end
end.
%% Flags the consumers of connection `ConnPid' as disconnected in one group.
%% Only the connectivity part of the status tuple changes; the second element
%% is preserved and no consumer is removed. Returns the updated state, or the
%% state as is when the group does not exist.
handle_group_after_connection_node_disconnected(ConnPid,
                                                #?MODULE{groups = Groups0} = S0,
                                                {VirtualHost, Stream, ConsumerName}) ->
    case lookup_group(VirtualHost, Stream, ConsumerName, Groups0) of
        undefined ->
            S0;
        #group{consumers = Cs0} = G0 ->
            MarkDisconnected =
                fun(#consumer{pid = Pid, status = {_, St}} = C)
                      when Pid =:= ConnPid ->
                        C#consumer{status = {?DISCONNECTED, St}};
                   (C) ->
                        C
                end,
            G1 = G0#group{consumers = lists:map(MarkDisconnected, Cs0)},
            S0#?MODULE{groups = update_groups(VirtualHost, Stream,
                                              ConsumerName, G1, Groups0)}
    end.
-spec import_state(ra_machine:version(), map()) -> state().
import_state(4, #{<<"groups">> := Groups, <<"pids_groups">> := PidsGroups}) ->
#?MODULE{groups = map_to_groups(Groups),
@ -498,14 +689,18 @@ map_to_consumer(#{<<"pid">> := Pid, <<"subscription_id">> := SubId,
status = active_to_status(Active)}.
active_to_status(true) ->
?ACTIVE;
{?CONNECTED, ?ACTIVE};
active_to_status(false) ->
?WAITING.
{?CONNECTED, ?WAITING}.
is_active(waiting) ->
is_active({?FORGOTTTEN, _}) ->
false;
is_active({_, ?ACTIVE}) ->
true;
is_active({_, ?DEACTIVATING}) ->
true;
is_active(_) ->
true.
false.
do_register_consumer(VirtualHost,
Stream,
@ -524,12 +719,12 @@ do_register_consumer(VirtualHost,
#consumer{pid = ConnectionPid,
owner = Owner,
subscription_id = SubscriptionId,
status = ?WAITING};
status = {?CONNECTED, ?WAITING}};
false ->
#consumer{pid = ConnectionPid,
subscription_id = SubscriptionId,
owner = Owner,
status = ?ACTIVE}
status = {?CONNECTED, ?ACTIVE}}
end,
Group1 = add_to_group(Consumer, Group0),
StreamGroups1 =
@ -542,7 +737,7 @@ do_register_consumer(VirtualHost,
#consumer{status = Status} = Consumer,
Effects =
case Status of
?ACTIVE ->
{_, ?ACTIVE} ->
[notify_consumer_effect(ConnectionPid, SubscriptionId,
Stream, ConsumerName, is_active(Status))];
_ ->
@ -569,7 +764,7 @@ do_register_consumer(VirtualHost,
#consumer{pid = ConnectionPid,
owner = Owner,
subscription_id = SubscriptionId,
status = ?ACTIVE},
status = {?CONNECTED, ?ACTIVE}},
G1 = add_to_group(Consumer0, Group0),
{G1,
[notify_consumer_effect(ConnectionPid, SubscriptionId,
@ -580,7 +775,7 @@ do_register_consumer(VirtualHost,
#consumer{pid = ConnectionPid,
owner = Owner,
subscription_id = SubscriptionId,
status = ?WAITING},
status = {?CONNECTED, ?WAITING}},
G1 = add_to_group(Consumer0, Group0),
case lookup_active_consumer(G1) of
@ -596,7 +791,7 @@ do_register_consumer(VirtualHost,
{update_consumer_state_in_group(G1,
ActPid,
ActSubId,
?DEACTIVATING),
{?CONNECTED, ?DEACTIVATING}),
[notify_consumer_effect(ActPid,
ActSubId,
Stream,
@ -654,7 +849,7 @@ handle_consumer_removal(Group0, Stream, ConsumerName, ActiveRemoved) ->
{update_consumer_state_in_group(Group0,
ActPid,
ActSubId,
?DEACTIVATING),
{?CONNECTED, ?DEACTIVATING}),
[notify_consumer_effect(ActPid, ActSubId,
Stream, ConsumerName, false, true)]}
end;
@ -664,7 +859,8 @@ handle_consumer_removal(Group0, Stream, ConsumerName, ActiveRemoved) ->
%% the active one is going away, picking a new one
#consumer{pid = P, subscription_id = SID} =
evaluate_active_consumer(Group0),
{update_consumer_state_in_group(Group0, P, SID, ?ACTIVE),
{update_consumer_state_in_group(Group0, P, SID,
{?CONNECTED, ?ACTIVE}),
[notify_consumer_effect(P, SID,
Stream, ConsumerName, true)]};
false ->
@ -674,6 +870,9 @@ handle_consumer_removal(Group0, Stream, ConsumerName, ActiveRemoved) ->
end
end.
%% Builds the effect that delivers a `{sac, check_connection, #{}}' message
%% to the connection process `Pid'.
notify_connection_effect(Pid) ->
    CheckMsg = {sac, check_connection, #{}},
    mod_call_effect(Pid, CheckMsg).
notify_consumer_effect(Pid, SubId, Stream, Name, Active) ->
notify_consumer_effect(Pid, SubId, Stream, Name, Active, false).
@ -727,25 +926,26 @@ has_consumers_from_pid(#group{consumers = Consumers}, Pid) ->
Consumers).
compute_active_consumer(#group{consumers = Crs,
partition_index = -1} =
Group)
partition_index = -1} = Group)
when length(Crs) == 0 ->
Group;
compute_active_consumer(#group{partition_index = -1,
consumers = [Consumer0]} =
Group0) ->
Consumer1 = Consumer0#consumer{status = ?ACTIVE},
consumers = [Consumer0]} = Group0) ->
%% TODO activate only if CONNECTED
Consumer1 = Consumer0#consumer{status = {?CONNECTED, ?ACTIVE}},
Group0#group{consumers = [Consumer1]};
compute_active_consumer(#group{partition_index = -1,
consumers = [Consumer0 | T]} =
Group0) ->
Consumer1 = Consumer0#consumer{status = ?ACTIVE},
Consumers = lists:map(fun(C) -> C#consumer{status = ?WAITING} end, T),
%% TODO activate only if CONNECTED
Consumer1 = Consumer0#consumer{status = {?CONNECTED, ?ACTIVE}},
Consumers = lists:map(fun(C) -> C#consumer{status = {?CONNECTED, ?WAITING}} end, T),
Group0#group{consumers = [Consumer1] ++ Consumers}.
evaluate_active_consumer(#group{partition_index = PartitionIndex,
consumers = Consumers})
when PartitionIndex >= 0 ->
%% TODO activate only if CONNECTED
ActiveConsumerIndex = PartitionIndex rem length(Consumers),
lists:nth(ActiveConsumerIndex + 1, Consumers).
@ -807,3 +1007,19 @@ mod_call_effect(Pid, Msg) ->
send_message(ConnectionPid, Msg) ->
ConnectionPid ! Msg,
ok.
%% Tells whether two consumer records designate the same consumer, i.e. they
%% carry exactly the same connection PID and subscription ID.
%% Anything that is not a pair of consumer records yields `false'.
same_consumer(#consumer{pid = PidA, subscription_id = SubIdA},
              #consumer{pid = PidB, subscription_id = SubIdB}) ->
    PidA =:= PidB andalso SubIdA =:= SubIdB;
same_consumer(_, _) ->
    false.
%% Rebuilds the connection PID -> groups mapping from the groups themselves.
%% Each connection PID maps to the set (a map whose values are `true') of the
%% group IDs in which it owns at least one consumer.
-spec compute_pid_group_dependencies(groups()) -> pids_groups().
compute_pid_group_dependencies(Groups) ->
    maps:fold(
      fun(GroupId, #group{consumers = Consumers}, PidsGroups) ->
              lists:foldl(
                fun(#consumer{pid = Pid}, Acc) ->
                        %% add the group to the PID's set, creating the set
                        %% on first sight of the PID
                        maps:update_with(Pid,
                                         fun(Set) -> Set#{GroupId => true} end,
                                         #{GroupId => true},
                                         Acc)
                end, PidsGroups, Consumers)
      end, #{}, Groups).

View File

@ -23,22 +23,27 @@
-type group_id() :: {vhost(), stream(), consumer_name()}.
-type owner() :: binary().
-type consumer_status() :: active | waiting | deactivating.
-type consumer_connectivity() :: connected | disconnected | forgotten.
-record(consumer,
{pid :: pid(),
subscription_id :: subscription_id(),
owner :: owner(), %% just a label
status :: consumer_status()}).
status :: {consumer_connectivity(), consumer_status()}}).
-record(group,
{consumers :: [#consumer{}], partition_index :: integer()}).
-record(rabbit_stream_sac_coordinator,
{groups :: #{group_id() => #group{}},
pids_groups ::
#{connection_pid() =>
#{group_id() => true}}, %% inner map acts as a set
{groups :: groups(),
pids_groups :: pids_groups(),
%% future extensibility
reserved_1,
reserved_2}).
-type group() :: #group{}.
-type groups() :: #{group_id() => group()}.
%% inner map acts as a set
-type pids_groups() :: #{connection_pid() => #{group_id() => true}}.
%% commands
-record(command_register_consumer,
{vhost :: vhost(),
@ -57,3 +62,5 @@
-record(command_activate_consumer,
{vhost :: vhost(), stream :: stream(),
consumer_name :: consumer_name()}).
%% command carrying the PID of a connection that is reported as reconnected
-record(command_connection_reconnected,
{pid :: connection_pid()}).

View File

@ -506,9 +506,9 @@ handle_connection_down_consumers_from_dead_connection_should_be_filtered_out_tes
consumer(Pid1, 1, active),
consumer(Pid2, 2, waiting)]),
State0 = state(#{GroupId => Group},
#{Pid0 => maps:from_list([{GroupId, true}]),
Pid1 => maps:from_list([{GroupId, true}]),
Pid2 => maps:from_list([{GroupId, true}])}),
#{Pid0 => maps:from_list([{GroupId, true}]),
Pid1 => maps:from_list([{GroupId, true}]),
Pid2 => maps:from_list([{GroupId, true}])}),
{#?STATE{pids_groups = PidsGroups1, groups = Groups1} = State1,
Effects1} =
@ -587,6 +587,163 @@ import_state_v4_test(_) ->
ok.
%% Checks handle_connection_node_disconnected/2 for a connection (Pid1) that
%% owns the active consumer of a group: the PID must leave the PID/group
%% mapping, a single timer effect must be returned, and the consumer must be
%% flagged as disconnected while staying in the group.
handle_connection_node_disconnected_test(_) ->
Stream = <<"stream">>,
ConsumerName = <<"app">>,
GroupId = {<<"/">>, Stream, ConsumerName},
Pid0 = self(),
Pid1 = spawn(fun() -> ok end),
Pid2 = spawn(fun() -> ok end),
Group = cgroup(1, [consumer(Pid0, 0, waiting),
consumer(Pid1, 1, active),
consumer(Pid2, 2, waiting)]),
State0 = state(#{GroupId => Group},
#{Pid0 => #{GroupId => true},
Pid1 => #{GroupId => true},
Pid2 => #{GroupId => true}}),
%% exactly one effect is expected
{#?STATE{pids_groups = PidsGroups1, groups = Groups1} = _State1,
[Effect1]} =
?MOD:handle_connection_node_disconnected(Pid1, State0),
%% only Pid0 and Pid2 remain in the PID/group mapping
assertSize(2, PidsGroups1),
assertSize(1, maps:get(Pid0, PidsGroups1)),
assertSize(1, maps:get(Pid2, PidsGroups1)),
%% the effect is the timer for the disconnected connection
?assertEqual({timer, {sac, node_disconnected, #{connection_pid => Pid1}},
60_000},
Effect1),
%% Pid1's consumer is still there, still active, but disconnected
assertHasGroup(GroupId,
cgroup(1, [consumer(Pid0, 0, {connected, waiting}),
consumer(Pid1, 1, {disconnected, active}),
consumer(Pid2, 2, {connected, waiting})]),
Groups1),
ok.
%% Checks handle_node_reconnected/2 with three groups sharing the same three
%% connections, where Pid1 is missing from the PID/group mapping: the groups
%% must stay as they are, the mapping must be rebuilt to include Pid1, and
%% the effects must target Pid1 only.
handle_node_reconnected_test(_) ->
Pid0 = spawn(fun() -> ok end),
Pid1 = spawn(fun() -> ok end),
Pid2 = spawn(fun() -> ok end),
CName = <<"app">>,
S0 = <<"s0">>,
GId0 = {<<"/">>, S0, CName},
Group0 = cgroup(0, [consumer(Pid0, 0, {connected, active}),
consumer(Pid1, 1, {disconnected, waiting}),
consumer(Pid2, 2, {connected, waiting})]),
S1 = <<"s1">>,
GId1 = {<<"/">>, S1, CName},
Group1 = cgroup(1, [consumer(Pid0, 0, {connected, waiting}),
consumer(Pid1, 1, {disconnected, active}),
consumer(Pid2, 2, {connected, waiting})]),
S2 = <<"s2">>,
GId2 = {<<"/">>, S2, CName},
Group2 = cgroup(1, [consumer(Pid0, 0, {connected, waiting}),
consumer(Pid1, 1, {disconnected, waiting}),
consumer(Pid2, 2, {connected, active})]),
Groups0 = #{GId0 => Group0,
GId1 => Group1,
GId2 => Group2},
%% Pid1 is deliberately absent from the PID/group mapping
State0 = state(Groups0,
#{Pid0 => #{GId0 => true, GId1 => true, GId2 => true},
Pid2 => #{GId0 => true, GId1 => true, GId2 => true}}),
{#?STATE{pids_groups = PidsGroups1, groups = Groups1} = _State1,
Effects1} =
?MOD:handle_node_reconnected(State0, []),
%% groups are not modified
?assertEqual(Groups0, Groups1),
%% the mapping now contains Pid1 for all three groups
?assertEqual(#{Pid0 => #{GId0 => true, GId1 => true, GId2 => true},
Pid1 => #{GId0 => true, GId1 => true, GId2 => true},
Pid2 => #{GId0 => true, GId1 => true, GId2 => true}},
PidsGroups1),
%% check_connection message plus process and node monitors, in that order
?assertEqual([{mod_call,rabbit_stream_sac_coordinator,send_message,
[Pid1,{sac,check_connection,#{}}]},
{monitor, process, Pid1},
{monitor, node, node(Pid1)}],
Effects1),
ok.
%% Applies the connection reconnected command to a group built with cgroup/1
%% whose active consumer is disconnected: the consumer must become connected
%% and stay active, and no effect must be produced.
connection_reconnected_simple_disconnected_becomes_connected_test(_) ->
Pid0 = spawn(fun() -> ok end),
Pid1 = spawn(fun() -> ok end),
Pid2 = spawn(fun() -> ok end),
GId = group_id(),
Group = cgroup([consumer(Pid0, 0, {disconnected, active}),
consumer(Pid1, 1, {connected, waiting}),
consumer(Pid2, 2, {connected, waiting})]),
Groups0 = #{GId => Group},
State0 = state(Groups0, #{Pid0 => #{GId => true}}),
Cmd = connection_reconnection_command(Pid0),
{#?STATE{groups = Groups1}, ok, Eff} = ?MOD:apply(Cmd, State0),
%% same order, Pid0 is connected again
assertHasGroup(GId, cgroup([consumer(Pid0, 0, {connected, active}),
consumer(Pid1, 1, {connected, waiting}),
consumer(Pid2, 2, {connected, waiting})]),
Groups1),
assertEmpty(Eff),
ok.
%% Applies the connection reconnected command to a group built with cgroup/1
%% where the reconnecting consumer is waiting and listed before the active
%% one: the active consumer must end up first in the list, the reconnecting
%% consumer must be connected, and no effect must be produced.
connection_reconnected_simple_active_should_be_first_test(_) ->
Pid0 = spawn(fun() -> ok end),
Pid1 = spawn(fun() -> ok end),
Pid2 = spawn(fun() -> ok end),
GId = group_id(),
%% disconnected for a while, got first in consumer array
%% because consumers arrived and left
Group = cgroup([consumer(Pid0, 0, {disconnected, waiting}),
consumer(Pid1, 1, {connected, active}),
consumer(Pid2, 2, {connected, waiting})]),
Groups0 = #{GId => Group},
State0 = state(Groups0, #{Pid0 => #{GId => true}}),
Cmd = connection_reconnection_command(Pid0),
{#?STATE{groups = Groups1}, ok, Eff} = ?MOD:apply(Cmd, State0),
%% the active consumer (Pid1) is now at the head of the list
assertHasGroup(GId, cgroup([consumer(Pid1, 1, {connected, active}),
consumer(Pid0, 0, {connected, waiting}),
consumer(Pid2, 2, {connected, waiting})]),
Groups1),
assertEmpty(Eff),
ok.
%% Applies the connection reconnected command to a group with partition
%% index 1 where the reconnecting consumer is waiting and the active consumer
%% is the third one: the reconnecting consumer must become connected, the
%% current active consumer must switch to deactivating, and a stepping down
%% message effect must be emitted for it.
connection_reconnected_super_disconnected_becomes_connected_test(_) ->
Pid0 = spawn(fun() -> ok end),
Pid1 = spawn(fun() -> ok end),
Pid2 = spawn(fun() -> ok end),
GId = group_id(),
Group = cgroup(1, [consumer(Pid0, 0, {disconnected, waiting}),
consumer(Pid1, 1, {connected, waiting}),
consumer(Pid2, 2, {connected, active})]),
Groups0 = #{GId => Group},
State0 = state(Groups0, #{Pid0 => #{GId => true}}),
Cmd = connection_reconnection_command(Pid0),
{#?STATE{groups = Groups1}, ok, Eff} = ?MOD:apply(Cmd, State0),
%% Pid2 is asked to step down, nobody is active in the meantime
assertHasGroup(GId, cgroup(1, [consumer(Pid0, 0, {connected, waiting}),
consumer(Pid1, 1, {connected, waiting}),
consumer(Pid2, 2, {connected, deactivating})]),
Groups1),
assertSendMessageSteppingDownEffect(Pid2, 2, stream(), name(), Eff),
ok.
%% default group ID fixture: "/" virtual host with the default stream and name
group_id() ->
{<<"/">>, stream(), name()}.
%% default stream name fixture
stream() ->
<<"sO">>.
%% default consumer name fixture
name() ->
<<"app">>.
apply_ensure_monitors(Mod, Cmd, State0) ->
{State1, _, _} = Mod:apply(Cmd, State0),
{State2, _, _} = Mod:ensure_monitors(Cmd, State1, #{}, []),
@ -606,11 +763,13 @@ assertHasGroup(GroupId, Group, Groups) ->
G = maps:get(GroupId, Groups),
?assertEqual(Group, G).
consumer(Pid, SubId, Status) ->
consumer(Pid, SubId, {Connectivity, Status}) ->
#consumer{pid = Pid,
subscription_id = SubId,
owner = <<"owning connection label">>,
status = Status}.
status = {Connectivity, Status}};
consumer(Pid, SubId, Status) ->
consumer(Pid, SubId, {connected, Status}).
cgroup(Consumers) ->
cgroup(-1, Consumers).
@ -655,6 +814,9 @@ activate_consumer_command(Stream, ConsumerName) ->
stream = Stream,
consumer_name = ConsumerName}.
%% builds the connection reconnected command record for the given PID
connection_reconnection_command(Pid) ->
#command_connection_reconnected{pid = Pid}.
assertSendMessageEffect(Pid, SubId, Stream, ConsumerName, Active, [Effect]) ->
?assertEqual({mod_call,
rabbit_stream_sac_coordinator,

View File

@ -720,6 +720,9 @@ open(info, {OK, S, Data},
StatemData#statem_data{connection = Connection1,
connection_state = State2}}
end;
open(info, {sac, check_connection, _}, State) ->
rabbit_stream_sac_coordinator:connection_reconnected(self()),
{keep_state, State};
open(info,
{sac, #{subscription_id := SubId,
active := Active} = Msg},