Miscellaneous minor improvements in stream SAC coordinator

This commit handles edge cases in the stream SAC coordinator to make
sure it does not crash during execution. Most of these edge cases
arise from an inconsistent state, so they are very unlikely to happen.

This commit also makes sure there are no duplicates in the consumer list
of a group. Consumers are now identified only by their connection
PID and their subscription ID, because the timestamp they now carry in
their state rules out a field-by-field comparison.
This commit is contained in:
Arnaud Cogoluègnes 2025-06-23 10:16:37 +02:00
parent 4bca14a4bb
commit b4f7d46842
No known key found for this signature in database
GPG Key ID: D5C8C4DFAD43AFA8
3 changed files with 300 additions and 310 deletions

View File

@ -710,8 +710,7 @@ apply(#{machine_version := Vsn} = Meta,
_ ->
return(Meta, State0, stream_not_found, [])
end;
apply(#{machine_version := Vsn} = Meta,
{nodeup, Node} = Cmd,
apply(Meta, {nodeup, Node} = Cmd,
#?MODULE{monitors = Monitors0,
streams = Streams0,
single_active_consumer = Sac0} = State) ->
@ -735,14 +734,8 @@ apply(#{machine_version := Vsn} = Meta,
{Ss#{Id => S}, E}
end, {Streams0, Effects0}, Streams0),
{Sac1, Effects2} = case ?V5_OR_MORE(Vsn) of
true ->
SacMod = sac_module(Meta),
SacMod:handle_node_reconnected(Node,
Sac0, Effects1);
false ->
{Sac0, Effects1}
end,
{Sac1, Effects2} = sac_handle_node_reconnected(Meta, Node, Sac0, Effects1),
return(Meta, State#?MODULE{monitors = Monitors,
streams = Streams,
single_active_consumer = Sac1}, ok, Effects2);
@ -2444,6 +2437,17 @@ sac_handle_connection_down(SacState, Pid, Reason, Vsn) when ?V5_OR_MORE(Vsn) ->
sac_handle_connection_down(SacState, Pid, _Reason, _Vsn) ->
?SAC_V4:handle_connection_down(Pid, SacState).
%% Forwards a node-reconnected event to the SAC module returned by
%% sac_module/1 when ?V5_OR_MORE(Vsn) holds for the machine version
%% found in Meta. Otherwise the SAC state and the effects are handed
%% back unchanged.
sac_handle_node_reconnected(#{machine_version := Vsn} = Meta, Node,
                            Sac, Effects) when ?V5_OR_MORE(Vsn) ->
    Mod = sac_module(Meta),
    Mod:handle_node_reconnected(Node, Sac, Effects);
sac_handle_node_reconnected(#{machine_version := _Vsn}, _Node,
                            Sac, Effects) ->
    %% older machine version: nothing to do for SAC
    {Sac, Effects}.
%% Thin wrapper: delegates to rabbit_stream_sac_coordinator:make_purge_nodes/1
%% with the given nodes and returns whatever that call returns.
sac_make_purge_nodes(Nodes) ->
rabbit_stream_sac_coordinator:make_purge_nodes(Nodes).

View File

@ -83,6 +83,11 @@
-define(DISCONNECTED_TIMEOUT_MS, 60_000).
-define(SAC_ERRORS, [partition_index_conflict, not_found]).
-define(IS_STATE_REC(T), is_record(T, ?MODULE)).
-define(IS_GROUP_REC(T), is_record(T, group)).
%% Consumer identity check: true only when both terms are #consumer{}
%% records with the same pid and the same subscription_id. No other
%% record field is compared. Every operator in the expansion is
%% guard-safe, and the macro is used in guards elsewhere in this module.
-define(SAME_CSR(C1, C2),
(is_record(C1, consumer) andalso is_record(C2, consumer) andalso
C1#consumer.pid =:= C2#consumer.pid andalso
C1#consumer.subscription_id =:= C2#consumer.subscription_id)).
%% Single Active Consumer API
-spec register_consumer(binary(),
@ -132,6 +137,7 @@ activate_consumer(VH, Stream, Name) ->
stream = Stream,
consumer_name= Name}).
%% called by a stream connection to inform it is still alive
-spec connection_reconnected(connection_pid()) ->
ok | {error, sac_error() | term()}.
connection_reconnected(Pid) ->
@ -228,10 +234,10 @@ apply(#command_register_consumer{vhost = VirtualHost,
subscription_id = SubscriptionId},
#?MODULE{groups = StreamGroups0} = State) ->
case maybe_create_group(VirtualHost,
Stream,
PartitionIndex,
ConsumerName,
StreamGroups0) of
Stream,
PartitionIndex,
ConsumerName,
StreamGroups0) of
{ok, StreamGroups1} ->
do_register_consumer(VirtualHost,
Stream,
@ -256,8 +262,7 @@ apply(#command_unregister_consumer{vhost = VirtualHost,
{State0, []};
Group0 ->
{Group1, Effects} =
case lookup_consumer(ConnectionPid, SubscriptionId, Group0)
of
case lookup_consumer(ConnectionPid, SubscriptionId, Group0) of
{value, Consumer} ->
G1 = remove_from_group(Consumer, Group0),
handle_consumer_removal(
@ -274,27 +279,24 @@ apply(#command_unregister_consumer{vhost = VirtualHost,
{State0#?MODULE{groups = SGS}, Effects}
end,
{State1, ok, Effects1};
apply(#command_activate_consumer{vhost = VirtualHost,
stream = Stream,
consumer_name = ConsumerName},
apply(#command_activate_consumer{vhost = VH, stream = S, consumer_name = Name},
#?MODULE{groups = StreamGroups0} = State0) ->
{G, Eff} =
case lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0) of
case lookup_group(VH, S, Name, StreamGroups0) of
undefined ->
rabbit_log:warning("Trying to activate consumer in group ~tp, but "
"the group does not longer exist",
[{VirtualHost, Stream, ConsumerName}]),
[{VH, S, Name}]),
{undefined, []};
G0 ->
%% keep track of the former active, if any
{ActPid, ActSubId} =
case lookup_active_consumer(G0) of
{value, #consumer{pid = ActivePid,
subscription_id = ActiveSubId}} ->
{ActivePid, ActiveSubId};
_ ->
{-1, -1}
end,
ActCsr = case lookup_active_consumer(G0) of
{value, Consumer} ->
Consumer;
_ ->
undefined
end,
%% connected consumers are set to waiting status
G1 = update_connected_consumers(G0, ?CONN_WAIT),
case evaluate_active_consumer(G1) of
undefined ->
@ -302,26 +304,23 @@ apply(#command_activate_consumer{vhost = VirtualHost,
#consumer{status = {?DISCONNECTED, _}} ->
%% we keep it this way, the consumer may come back
{G1, []};
#consumer{pid = Pid, subscription_id = SubId} ->
G2 = update_consumer_state_in_group(G1, Pid,
SubId,
?CONN_ACT),
Csr ->
G2 = update_consumer_state_in_group(G1, Csr, ?CONN_ACT),
%% do we need effects or not?
Effects =
case {Pid, SubId} of
{ActPid, ActSubId} ->
%% it is the same active consumer as before
%% no need to notify it
[];
_ ->
%% new active consumer, need to notify it
[notify_consumer_effect(Pid, SubId, Stream,
ConsumerName, true)]
end,
case Csr of
Csr when ?SAME_CSR(Csr, ActCsr) ->
%% it is the same active consumer as before
%% no need to notify it
[];
_ ->
%% new active consumer, need to notify it
[notify_csr_effect(Csr, S, Name, true)]
end,
{G2, Effects}
end
end,
StreamGroups1 = update_groups(VirtualHost, Stream, ConsumerName,
StreamGroups1 = update_groups(VH, S, Name,
G, StreamGroups0),
R = case G of
undefined ->
@ -363,28 +362,30 @@ handle_group_connection_reconnected(Pid, #?MODULE{groups = Groups0} = S0,
undefined ->
{S0, Eff0};
Group ->
case has_forgotten_active(Group, Pid) of
case has_pdown_active(Group, Pid) of
true ->
%% a forgotten active is coming in the connection
%% a presumed-down active is coming back in the connection
%% we need to reconcile the group,
%% as there may have been 2 active consumers at a time
handle_forgotten_active_reconnected(Pid, S0, Eff0, K);
handle_pdown_active_reconnected(Pid, S0, Eff0, K);
false ->
do_handle_group_connection_reconnected(Pid, S0, Eff0, K)
end
end.
do_handle_group_connection_reconnected(Pid, #?MODULE{groups = Groups0} = S0,
Eff0, {VH, S, Name} = K) ->
Eff0, {VH, S, Name} = K)
when is_map_key(K, Groups0) ->
G0 = #group{consumers = Consumers0} = lookup_group(VH, S, Name, Groups0),
%% update the status of the consumers from the connection
{Consumers1, Updated} =
lists:foldr(
fun(#consumer{pid = P, status = {_, St}} = C, {L, _})
when P == Pid ->
{[csr_status(C, {?CONNECTED, St}) | L], true};
(C, {L, UpdatedFlag}) ->
{[C | L], UpdatedFlag or false}
end, {[], false}, Consumers0),
lists:foldr(
fun(#consumer{pid = P, status = {_, St}} = C, {L, _})
when P == Pid ->
{[csr_status(C, {?CONNECTED, St}) | L], true};
(C, {L, UpdatedFlag}) ->
{[C | L], UpdatedFlag or false}
end, {[], false}, Consumers0),
case Updated of
true ->
@ -394,60 +395,59 @@ do_handle_group_connection_reconnected(Pid, #?MODULE{groups = Groups0} = S0,
{S0#?MODULE{groups = Groups1}, Eff ++ Eff0};
false ->
{S0, Eff0}
end.
end;
do_handle_group_connection_reconnected(_, S0, Eff0, _) ->
{S0, Eff0}.
handle_forgotten_active_reconnected(Pid,
#?MODULE{groups = Groups0} = S0,
Eff0, {VH, S, Name}) ->
handle_pdown_active_reconnected(Pid,
#?MODULE{groups = Groups0} = S0,
Eff0, {VH, S, Name} = K)
when is_map_key(K, Groups0) ->
G0 = #group{consumers = Consumers0} = lookup_group(VH, S, Name, Groups0),
{Consumers1, Eff1} =
case has_disconnected_active(G0) of
true ->
%% disconnected active consumer in the group, no rebalancing possible
%% we update the disconnected active consumers
%% we update the presumed-down active consumers
%% and tell them to step down
lists:foldr(fun(#consumer{status = St,
pid = P,
subscription_id = SID} = C, {Cs, Eff})
lists:foldr(fun(#consumer{status = St, pid = P} = C, {Cs, Eff})
when P =:= Pid andalso St =:= ?PDOWN_ACT ->
{[csr_status(C, ?CONN_WAIT) | Cs],
[notify_consumer_effect(Pid, SID, S,
Name, false, true) | Eff]};
[notify_csr_effect(C, S,
Name, false, true) | Eff]};
(C, {Cs, Eff}) ->
{[C | Cs], Eff}
end, {[], Eff0}, Consumers0);
false ->
lists:foldr(fun(#consumer{status = St,
pid = P,
subscription_id = SID} = C, {Cs, Eff})
lists:foldr(fun(#consumer{status = St, pid = P} = C, {Cs, Eff})
when P =:= Pid andalso St =:= ?PDOWN_ACT ->
%% update forgotten active
%% update presumed-down active
%% tell it to step down
{[csr_status(C, ?CONN_WAIT) | Cs],
[notify_consumer_effect(P, SID, S,
Name, false, true) | Eff]};
[notify_csr_effect(C, S,
Name, false, true) | Eff]};
(#consumer{status = {?PDOWN, _},
pid = P} = C, {Cs, Eff})
when P =:= Pid ->
%% update forgotten
%% update presumed-down
{[csr_status(C, ?CONN_WAIT) | Cs], Eff};
(#consumer{status = ?CONN_ACT,
pid = P,
subscription_id = SID} = C, {Cs, Eff}) ->
(#consumer{status = ?CONN_ACT} = C, {Cs, Eff}) ->
%% update connected active
%% tell it to step down
{[csr_status(C, ?CONN_WAIT) | Cs],
[notify_consumer_effect(P, SID, S,
Name, false, true) | Eff]};
[notify_csr_effect(C, S,
Name, false, true) | Eff]};
(C, {Cs, Eff}) ->
{[C | Cs], Eff}
end, {[], Eff0}, Consumers0)
end,
G1 = G0#group{consumers = Consumers1},
Groups1 = update_groups(VH, S, Name, G1, Groups0),
{S0#?MODULE{groups = Groups1}, Eff1}.
{S0#?MODULE{groups = Groups1}, Eff1};
handle_pdown_active_reconnected(_, S0, Eff0, _) ->
{S0, Eff0}.
has_forgotten_active(#group{consumers = Consumers}, Pid) ->
has_pdown_active(#group{consumers = Consumers}, Pid) ->
case lists:search(fun(#consumer{status = ?PDOWN_ACT,
pid = P}) when P =:= Pid ->
true;
@ -473,24 +473,33 @@ has_consumer_with_status(#group{consumers = Consumers}, Status) ->
true
end.
maybe_rebalance_group(#group{partition_index = PI} = G0, _) when PI < -1 ->
%% should not happen
{G0, []};
maybe_rebalance_group(#group{consumers = CS} = G0, _) when length(CS) == 0 ->
{G0, []};
maybe_rebalance_group(#group{partition_index = -1, consumers = Consumers0} = G0,
{_VH, S, Name}) ->
case lookup_active_consumer(G0) of
{value, ActiveConsumer} ->
{value, ActiveCsr} ->
%% there is already an active consumer, we just re-arrange
%% the group to make sure the active consumer is the first in the array
Consumers1 = lists:filter(fun(C) ->
not same_consumer(C, ActiveConsumer)
%% remove the active consumer from the list
Consumers1 = lists:filter(fun(C) when ?SAME_CSR(C, ActiveCsr) ->
false;
(_) ->
true
end, Consumers0),
G1 = G0#group{consumers = [ActiveConsumer | Consumers1]},
%% add it back to the front
G1 = G0#group{consumers = [ActiveCsr | Consumers1]},
{G1, []};
_ ->
%% no active consumer
G1 = compute_active_consumer(G0),
case lookup_active_consumer(G1) of
{value, #consumer{pid = Pid, subscription_id = SubId}} ->
{value, Csr} ->
%% creating the side effect to notify the new active consumer
{G1, [notify_consumer_effect(Pid, SubId, S, Name, true)]};
{G1, [notify_csr_effect(Csr, S, Name, true)]};
_ ->
%% no active consumer found in the group, nothing to do
{G1, []}
@ -499,8 +508,7 @@ maybe_rebalance_group(#group{partition_index = -1, consumers = Consumers0} = G0,
maybe_rebalance_group(#group{partition_index = _, consumers = Consumers} = G,
{_VH, S, Name}) ->
case lookup_active_consumer(G) of
{value, #consumer{pid = ActPid,
subscription_id = ActSubId} = CurrentActive} ->
{value, CurrentActive} ->
case evaluate_active_consumer(G) of
undefined ->
%% no-one to select
@ -510,19 +518,12 @@ maybe_rebalance_group(#group{partition_index = _, consumers = Consumers} = G,
{G, []};
_ ->
%% there's a change, telling the active it's not longer active
{update_consumer_state_in_group(G,
ActPid,
ActSubId,
{update_consumer_state_in_group(G, CurrentActive,
{?CONNECTED, ?DEACTIVATING}),
[notify_consumer_effect(ActPid,
ActSubId,
S,
Name,
false,
true)]}
[notify_csr_effect(CurrentActive, S, Name, false, true)]}
end;
false ->
%% no active consumer in the (non-empty) group,
%% no active consumer in the group,
case lists:search(fun(#consumer{status = Status}) ->
Status =:= {?CONNECTED, ?DEACTIVATING}
end, Consumers) of
@ -532,22 +533,16 @@ maybe_rebalance_group(#group{partition_index = _, consumers = Consumers} = G,
{G, []};
_ ->
%% nothing going on in the group
%% a {disconnected, active} may have become {forgotten, active}
%% a {disconnected, active} may have become {pdown, active}
%% we must select a new active
case evaluate_active_consumer(G) of
undefined ->
%% no-one to select
{G, []};
#consumer{pid = ActPid, subscription_id = ActSubId} ->
{update_consumer_state_in_group(G,
ActPid,
ActSubId,
Csr ->
{update_consumer_state_in_group(G, Csr,
{?CONNECTED, ?ACTIVE}),
[notify_consumer_effect(ActPid,
ActSubId,
S,
Name,
true)]}
[notify_csr_effect(Csr, S, Name, true)]}
end
end
end.
@ -640,14 +635,14 @@ connectivity_label(Cnty) ->
map(),
ra_machine:effects()) ->
{state(), map(), ra_machine:effects()}.
ensure_monitors(#command_register_consumer{vhost = VirtualHost,
stream = Stream,
consumer_name = ConsumerName,
ensure_monitors(#command_register_consumer{vhost = VH,
stream = S,
consumer_name = Name,
connection_pid = Pid},
#?MODULE{pids_groups = PidsGroups0} = State0,
Monitors0,
Effects) ->
GroupId = {VirtualHost, Stream, ConsumerName},
GroupId = {VH, S, Name},
%% get the group IDs that depend on the PID
Groups0 = maps:get(Pid, PidsGroups0, #{}),
%% add the group ID
@ -656,7 +651,7 @@ ensure_monitors(#command_register_consumer{vhost = VirtualHost,
PidsGroups1 = PidsGroups0#{Pid => Groups1},
{State0#?MODULE{pids_groups = PidsGroups1}, Monitors0#{Pid => sac},
[{monitor, process, Pid}, {monitor, node, node(Pid)} | Effects]};
ensure_monitors(#command_unregister_consumer{vhost = VirtualHost,
ensure_monitors(#command_unregister_consumer{vhost = VH,
stream = Stream,
consumer_name = ConsumerName,
connection_pid = Pid},
@ -664,11 +659,11 @@ ensure_monitors(#command_unregister_consumer{vhost = VirtualHost,
pids_groups = PidsGroups0} = State0,
Monitors,
Effects)
when is_map_key(Pid, PidsGroups0) ->
GroupId = {VirtualHost, Stream, ConsumerName},
when is_map_key(Pid, PidsGroups0) ->
GroupId = {VH, Stream, ConsumerName},
#{Pid := PidGroup0} = PidsGroups0,
PidGroup1 =
case lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0) of
case lookup_group(VH, Stream, ConsumerName, StreamGroups0) of
undefined ->
%% group is gone, can be removed from the PID map
maps:remove(GroupId, PidGroup0);
@ -785,95 +780,78 @@ presume_connection_down(Pid, #?MODULE{groups = Groups} = State0) ->
{State1, Eff}.
handle_group_connection_presumed_down(Pid, #?MODULE{groups = Groups0} = S0,
Eff0, {VH, S, Name} = K) ->
case lookup_group(VH, S, Name, Groups0) of
undefined ->
{S0, Eff0};
#group{consumers = Consumers0} = G0 ->
{Consumers1, Updated} =
lists:foldr(
fun(#consumer{pid = P, status = {?DISCONNECTED, St}} = C, {L, _})
when P == Pid ->
{[csr_status(C, {?PDOWN, St}) | L], true};
(C, {L, UpdatedFlag}) ->
{[C | L], UpdatedFlag or false}
end, {[], false}, Consumers0),
Eff0, {VH, S, Name} = K)
when is_map_key(K, Groups0) ->
#group{consumers = Consumers0} = G0 = lookup_group(VH, S, Name, Groups0),
{Consumers1, Updated} =
lists:foldr(
fun(#consumer{pid = P, status = {?DISCONNECTED, St}} = C, {L, _})
when P == Pid ->
{[csr_status(C, {?PDOWN, St}) | L], true};
(C, {L, UpdatedFlag}) ->
{[C | L], UpdatedFlag or false}
end, {[], false}, Consumers0),
case Updated of
true ->
G1 = G0#group{consumers = Consumers1},
{G2, Eff} = maybe_rebalance_group(G1, K),
Groups1 = update_groups(VH, S, Name, G2, Groups0),
{S0#?MODULE{groups = Groups1}, Eff ++ Eff0};
false ->
{S0, Eff0}
end
end.
case Updated of
true ->
G1 = G0#group{consumers = Consumers1},
{G2, Eff} = maybe_rebalance_group(G1, K),
Groups1 = update_groups(VH, S, Name, G2, Groups0),
{S0#?MODULE{groups = Groups1}, Eff ++ Eff0};
false ->
{S0, Eff0}
end;
handle_group_connection_presumed_down(_, S0, Eff0, _) ->
{S0, Eff0}.
handle_group_after_connection_down(Pid,
{#?MODULE{groups = Groups0} = S0, Eff0},
{VirtualHost, Stream, ConsumerName}) ->
case lookup_group(VirtualHost,
Stream,
ConsumerName,
Groups0) of
undefined ->
{S0, Eff0};
#group{consumers = Consumers0} = G0 ->
%% remove the connection consumers from the group state
%% keep flags to know what happened
{Consumers1, ActiveRemoved, AnyRemoved} =
lists:foldl(
fun(#consumer{pid = P, status = S}, {L, ActiveFlag, _})
when P == Pid ->
{L, is_active(S) or ActiveFlag, true};
(C, {L, ActiveFlag, AnyFlag}) ->
{L ++ [C], ActiveFlag, AnyFlag}
end, {[], false, false}, Consumers0),
{VH, St, Name} = K)
when is_map_key(K, Groups0) ->
#group{consumers = Consumers0} = G0 = lookup_group(VH, St, Name, Groups0),
%% remove the connection consumers from the group state
%% keep flags to know what happened
{Consumers1, ActiveRemoved, AnyRemoved} =
lists:foldl(
fun(#consumer{pid = P, status = S}, {L, ActiveFlag, _})
when P == Pid ->
{L, is_active(S) or ActiveFlag, true};
(C, {L, ActiveFlag, AnyFlag}) ->
{L ++ [C], ActiveFlag, AnyFlag}
end, {[], false, false}, Consumers0),
case AnyRemoved of
true ->
G1 = G0#group{consumers = Consumers1},
{G2, Effects} = handle_consumer_removal(G1, Stream,
ConsumerName,
ActiveRemoved),
Groups1 = update_groups(VirtualHost,
Stream,
ConsumerName,
G2,
Groups0),
{S0#?MODULE{groups = Groups1}, Effects ++ Eff0};
false ->
{S0, Eff0}
end
end.
case AnyRemoved of
true ->
G1 = G0#group{consumers = Consumers1},
{G2, Effects} = handle_consumer_removal(G1, St,
Name,
ActiveRemoved),
Groups1 = update_groups(VH, St, Name, G2, Groups0),
{S0#?MODULE{groups = Groups1}, Effects ++ Eff0};
false ->
{S0, Eff0}
end;
handle_group_after_connection_down(_, {S0, Eff0}, _) ->
{S0, Eff0}.
handle_group_after_connection_node_disconnected(ConnPid,
#?MODULE{groups = Groups0} = S0,
{VirtualHost, Stream, ConsumerName}) ->
case lookup_group(VirtualHost,
Stream,
ConsumerName,
Groups0) of
undefined ->
S0;
#group{consumers = Cs0} = G0 ->
Cs1 = lists:foldr(fun(#consumer{status = {_, St},
pid = Pid} = C0,
Acc) when Pid =:= ConnPid ->
C1 = csr_status(C0, {?DISCONNECTED, St}),
[C1 | Acc];
(C, Acc) ->
[C | Acc]
end, [], Cs0),
G1 = G0#group{consumers = Cs1},
Groups1 = update_groups(VirtualHost,
Stream,
ConsumerName,
G1,
Groups0),
S0#?MODULE{groups = Groups1}
end.
{VH, S, Name} = K)
when is_map_key(K, Groups0) ->
#group{consumers = Cs0} = G0 = lookup_group(VH, S, Name, Groups0),
Cs1 = lists:foldr(fun(#consumer{status = {_, St},
pid = Pid} = C0,
Acc) when Pid =:= ConnPid ->
C1 = csr_status(C0, {?DISCONNECTED, St}),
[C1 | Acc];
(C, Acc) ->
[C | Acc]
end, [], Cs0),
G1 = G0#group{consumers = Cs1},
Groups1 = update_groups(VH, S, Name, G1, Groups0),
S0#?MODULE{groups = Groups1};
handle_group_after_connection_node_disconnected(_, S0, _) ->
S0.
-spec import_state(ra_machine:version(), map()) -> state().
import_state(4, #{<<"groups">> := Groups, <<"pids_groups">> := PidsGroups}) ->
@ -909,10 +887,13 @@ list_nodes(#?MODULE{groups = Groups}) ->
ra_machine:effects().
state_enter(leader, #?MODULE{groups = Groups} = State)
when ?IS_STATE_REC(State) ->
%% becoming leader, we re-issue monitors and timers for connections with
%% disconnected consumers
%% iterate over groups
{Nodes, DisConns} =
maps:fold(fun(_, #group{consumers = Cs}, Acc) ->
%% iterage over group consumers
%% iterate over group consumers
lists:foldl(fun(#consumer{pid = P,
status = {?DISCONNECTED, _},
ts = Ts},
@ -922,7 +903,7 @@ state_enter(leader, #?MODULE{groups = Groups} = State)
{Nodes#{node(P) => true},
DisConns#{P => Ts}};
(#consumer{pid = P}, {Nodes, DisConns}) ->
%% store connection node
%% store connection node only
{Nodes#{node(P) => true}, DisConns}
end, Acc, Cs)
end, {#{}, #{}}, Groups),
@ -973,7 +954,12 @@ disconnected_timeout(_) ->
%% Converts an exported map of groups into a map of #group{} records.
%% Each value goes through map_to_group/1 exactly once. Values that do
%% not come back as a group record (per ?IS_GROUP_REC) are left out of
%% the result. Any non-map argument yields an empty map.
map_to_groups(Groups) when is_map(Groups) ->
    maps:fold(fun(K, V, Acc) ->
                      case map_to_group(V) of
                          G when ?IS_GROUP_REC(G) ->
                              %% reuse the converted record instead of
                              %% calling map_to_group/1 a second time
                              Acc#{K => G};
                          _ ->
                              Acc
                      end
              end, #{}, Groups);
map_to_groups(_) ->
    #{}.
@ -984,15 +970,26 @@ map_to_pids_groups(_) ->
#{}.
map_to_group(#{<<"consumers">> := Consumers, <<"partition_index">> := Index}) ->
C = lists:foldl(fun(V, Acc) ->
Acc ++ [map_to_consumer(V)]
end, [], Consumers),
#group{consumers = C,
partition_index = Index}.
{C, _} =
lists:foldl(fun(V, {Cs, Dedup}) ->
case map_to_consumer(V) of
#consumer{pid = P, subscription_id = SubId} = C
when not is_map_key({P, SubId}, Dedup) ->
{[C | Cs], Dedup#{{P, SubId} => true}};
_ ->
{Cs, Dedup}
end
end, {[], #{}}, Consumers),
#group{consumers = lists:reverse(C),
partition_index = Index};
map_to_group(_) ->
undefined.
map_to_consumer(#{<<"pid">> := Pid, <<"subscription_id">> := SubId,
<<"owner">> := Owner, <<"active">> := Active}) ->
csr(Pid, SubId, Owner, active_to_status(Active)).
csr(Pid, SubId, Owner, active_to_status(Active));
map_to_consumer(_) ->
undefined.
active_to_status(true) ->
{?CONNECTED, ?ACTIVE};
@ -1008,82 +1005,69 @@ is_active({_, ?DEACTIVATING}) ->
is_active(_) ->
false.
do_register_consumer(VirtualHost,
Stream,
-1 = _PartitionIndex,
ConsumerName,
ConnectionPid,
Owner,
SubscriptionId,
#?MODULE{groups = StreamGroups0} = State) ->
Group0 = lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0),
do_register_consumer(VH, S, -1 = _PI, Name, Pid, Owner, SubId,
#?MODULE{groups = StreamGroups0} = State)
when is_map_key({VH, S, Name}, StreamGroups0) ->
Group0 = lookup_group(VH, S, Name, StreamGroups0),
Consumer =
case lookup_active_consumer(Group0) of
{value, _} ->
csr(ConnectionPid, SubscriptionId, Owner, ?CONN_WAIT);
false ->
csr(ConnectionPid, SubscriptionId, Owner, ?CONN_ACT)
end,
Consumer = case lookup_active_consumer(Group0) of
{value, _} ->
csr(Pid, SubId, Owner, ?CONN_WAIT);
false ->
csr(Pid, SubId, Owner, ?CONN_ACT)
end,
Group1 = add_to_group(Consumer, Group0),
StreamGroups1 = update_groups(VirtualHost, Stream, ConsumerName,
StreamGroups1 = update_groups(VH, S, Name,
Group1,
StreamGroups0),
#consumer{status = Status} = Consumer,
Effects =
case Status of
{_, ?ACTIVE} ->
[notify_consumer_effect(ConnectionPid, SubscriptionId,
Stream, ConsumerName, is_active(Status))];
_ ->
[]
end,
Effects = case Status of
{_, ?ACTIVE} ->
[notify_csr_effect(Consumer, S, Name, is_active(Status))];
_ ->
[]
end,
{State#?MODULE{groups = StreamGroups1}, {ok, is_active(Status)}, Effects};
do_register_consumer(VirtualHost,
Stream,
_PartitionIndex,
ConsumerName,
ConnectionPid,
Owner,
SubscriptionId,
#?MODULE{groups = StreamGroups0} = State) ->
Group0 = lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0),
do_register_consumer(VH, S, _PI, Name, Pid, Owner, SubId,
#?MODULE{groups = StreamGroups0} = State)
when is_map_key({VH, S, Name}, StreamGroups0) ->
Group0 = lookup_group(VH, S, Name, StreamGroups0),
{Group1, Effects} =
case Group0 of
#group{consumers = []} ->
%% first consumer in the group, it's the active one
Consumer0 = csr(ConnectionPid, SubscriptionId, Owner, ?CONN_ACT),
Consumer0 = csr(Pid, SubId, Owner, ?CONN_ACT),
G1 = add_to_group(Consumer0, Group0),
{G1,
[notify_consumer_effect(ConnectionPid, SubscriptionId,
Stream, ConsumerName, true)]};
[notify_csr_effect(Consumer0, S, Name, true)]};
_G ->
Consumer0 = csr(ConnectionPid, SubscriptionId, Owner, ?CONN_WAIT),
Consumer0 = csr(Pid, SubId, Owner, ?CONN_WAIT),
G1 = add_to_group(Consumer0, Group0),
maybe_rebalance_group(G1, {VirtualHost, Stream, ConsumerName})
maybe_rebalance_group(G1, {VH, S, Name})
end,
StreamGroups1 = update_groups(VirtualHost, Stream, ConsumerName,
StreamGroups1 = update_groups(VH, S, Name,
Group1,
StreamGroups0),
{value, #consumer{status = Status}} =
lookup_consumer(ConnectionPid, SubscriptionId, Group1),
{State#?MODULE{groups = StreamGroups1}, {ok, is_active(Status)}, Effects}.
{value, #consumer{status = Status}} = lookup_consumer(Pid, SubId, Group1),
{State#?MODULE{groups = StreamGroups1}, {ok, is_active(Status)}, Effects};
do_register_consumer(_, _, _, _, _, _, _, State) ->
{State, {ok, false}, []}.
handle_consumer_removal(#group{consumers = []} = G, _, _, _) ->
{G, []};
handle_consumer_removal(#group{partition_index = -1} = Group0,
Stream, ConsumerName, ActiveRemoved) ->
S, Name, ActiveRemoved) ->
case ActiveRemoved of
true ->
%% this is the active consumer we remove, computing the new one
Group1 = compute_active_consumer(Group0),
case lookup_active_consumer(Group1) of
{value, #consumer{pid = Pid, subscription_id = SubId}} ->
{value, Csr} ->
%% creating the side effect to notify the new active consumer
{Group1, [notify_consumer_effect(Pid, SubId, Stream, ConsumerName, true)]};
{Group1, [notify_csr_effect(Csr, S, Name, true)]};
_ ->
%% no active consumer found in the group, nothing to do
{Group1, []}
@ -1094,8 +1078,7 @@ handle_consumer_removal(#group{partition_index = -1} = Group0,
end;
handle_consumer_removal(Group0, Stream, ConsumerName, ActiveRemoved) ->
case lookup_active_consumer(Group0) of
{value, #consumer{pid = ActPid,
subscription_id = ActSubId} = CurrentActive} ->
{value, CurrentActive} ->
case evaluate_active_consumer(Group0) of
undefined ->
{Group0, []};
@ -1104,12 +1087,10 @@ handle_consumer_removal(Group0, Stream, ConsumerName, ActiveRemoved) ->
{Group0, []};
_ ->
%% there's a change, telling the active it's not longer active
{update_consumer_state_in_group(Group0,
ActPid,
ActSubId,
{update_consumer_state_in_group(Group0, CurrentActive,
{?CONNECTED, ?DEACTIVATING}),
[notify_consumer_effect(ActPid, ActSubId,
Stream, ConsumerName, false, true)]}
[notify_csr_effect(CurrentActive,
Stream, ConsumerName, false, true)]}
end;
false ->
case ActiveRemoved of
@ -1118,11 +1099,10 @@ handle_consumer_removal(Group0, Stream, ConsumerName, ActiveRemoved) ->
case evaluate_active_consumer(Group0) of
undefined ->
{Group0, []};
#consumer{pid = P, subscription_id = SID} ->
{update_consumer_state_in_group(Group0, P, SID,
Csr ->
{update_consumer_state_in_group(Group0, Csr,
{?CONNECTED, ?ACTIVE}),
[notify_consumer_effect(P, SID,
Stream, ConsumerName, true)]}
[notify_csr_effect(Csr, Stream, ConsumerName, true)]}
end;
false ->
%% no active consumer in the (non-empty) group,
@ -1134,17 +1114,19 @@ handle_consumer_removal(Group0, Stream, ConsumerName, ActiveRemoved) ->
notify_connection_effect(Pid) ->
mod_call_effect(Pid, {sac, check_connection, #{}}).
notify_consumer_effect(Pid, SubId, Stream, Name, Active) ->
notify_consumer_effect(Pid, SubId, Stream, Name, Active, false).
notify_csr_effect(Csr, S, Name, Active) ->
notify_csr_effect(Csr, S, Name, Active, false).
notify_consumer_effect(Pid, SubId, Stream, Name, Active, false = _SteppingDown) ->
mod_call_effect(Pid,
notify_csr_effect(#consumer{pid = P, subscription_id = SubId},
Stream, Name, Active, false = _SteppingDown) ->
mod_call_effect(P,
{sac, #{subscription_id => SubId,
stream => Stream,
consumer_name => Name,
active => Active}});
notify_consumer_effect(Pid, SubId, Stream, Name, Active, true = SteppingDown) ->
mod_call_effect(Pid,
notify_csr_effect(#consumer{pid = P, subscription_id = SubId},
Stream, Name, Active, true = SteppingDown) ->
mod_call_effect(P,
{sac, #{subscription_id => SubId,
stream => Stream,
consumer_name => Name,
@ -1171,11 +1153,23 @@ lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups) ->
maps:get({VirtualHost, Stream, ConsumerName}, StreamGroups,
undefined).
add_to_group(Consumer, #group{consumers = Consumers} = Group) ->
Group#group{consumers = Consumers ++ [Consumer]}.
%% Appends the consumer to the end of the group's consumer list, unless
%% lookup_consumer/3 already finds an entry for the same connection PID
%% and subscription ID; in that case the group is returned as is.
add_to_group(#consumer{pid = ConnPid, subscription_id = Id} = Csr,
             #group{consumers = Existing} = G) ->
    case lookup_consumer(ConnPid, Id, G) of
        false ->
            G#group{consumers = Existing ++ [Csr]};
        {value, _AlreadyThere} ->
            %% duplicate registration, the group stays untouched
            G
    end.
remove_from_group(Consumer, #group{consumers = Consumers} = Group) ->
Group#group{consumers = lists:delete(Consumer, Consumers)}.
%% Removes from the group every entry that ?SAME_CSR treats as the same
%% consumer as Csr, whatever the other fields contain. The relative
%% order of the remaining consumers is preserved.
remove_from_group(Csr, #group{consumers = Before} = G) ->
    After = [C || C <- Before, not ?SAME_CSR(C, Csr)],
    G#group{consumers = After}.
has_consumers_from_pid(#group{consumers = Consumers}, Pid) ->
lists:any(fun (#consumer{pid = P}) when P == Pid ->
@ -1192,19 +1186,19 @@ compute_active_consumer(#group{partition_index = -1,
compute_active_consumer(#group{partition_index = -1,
consumers = Consumers} = G) ->
case lists:search(fun(#consumer{status = S}) ->
S =:= {?DISCONNECTED, ?ACTIVE}
S =:= ?DISCONN_ACT
end, Consumers) of
{value, _DisconnectedActive} ->
%% no rebalancing if there is a disconnected active
G;
false ->
case evaluate_active_consumer(G) of
undefined ->
G;
#consumer{pid = Pid, subscription_id = SubId} ->
AC ->
Consumers1 =
lists:foldr(
fun(#consumer{pid = P, subscription_id = SID} = C, L)
when P =:= Pid andalso SID =:= SubId ->
fun(C, L) when ?SAME_CSR(AC, C) ->
%% change status of new active
[csr_status(C, ?CONN_ACT) | L];
(#consumer{status = {?CONNECTED, _}} = C, L) ->
@ -1226,11 +1220,15 @@ evaluate_active_consumer(#group{consumers = Consumers} = G) ->
S =:= ?DISCONN_ACT
end, Consumers) of
{value, C} ->
%% no rebalancing if there is a disconnected active
C;
_ ->
do_evaluate_active_consumer(G#group{consumers = eligible(Consumers)})
end.
do_evaluate_active_consumer(#group{partition_index = PI}) when PI < -1 ->
%% should not happen
undefined;
do_evaluate_active_consumer(#group{consumers = Consumers})
when length(Consumers) == 0 ->
undefined;
@ -1264,36 +1262,25 @@ lookup_active_consumer(#group{consumers = Consumers}) ->
lists:search(fun(#consumer{status = Status}) -> is_active(Status) end,
Consumers).
update_groups(_VirtualHost,
_Stream,
_ConsumerName,
undefined,
StreamGroups) ->
StreamGroups;
update_groups(VirtualHost,
Stream,
ConsumerName,
#group{consumers = []},
StreamGroups) ->
update_groups(_VH, _S, _Name, undefined, Groups) ->
Groups;
update_groups(VH, S, Name, #group{consumers = []}, Groups)
when is_map_key({VH, S, Name}, Groups) ->
%% the group is now empty, removing the key
maps:remove({VirtualHost, Stream, ConsumerName}, StreamGroups);
update_groups(VirtualHost,
Stream,
ConsumerName,
Group,
StreamGroups) ->
StreamGroups#{{VirtualHost, Stream, ConsumerName} => Group}.
maps:remove({VH, S, Name}, Groups);
update_groups(_VH, _S, _Name, #group{consumers = []}, Groups) ->
%% the group is now empty, but not in the group map
%% just returning the map
Groups;
update_groups(VH, S, Name, G, Groups) ->
Groups#{{VH, S, Name} => G}.
update_consumer_state_in_group(#group{consumers = Consumers0} = G,
Pid,
SubId,
update_consumer_state_in_group(#group{consumers = Consumers0} = G, Csr,
NewStatus) ->
CS1 = lists:map(fun(C0) ->
case C0 of
#consumer{pid = Pid, subscription_id = SubId} ->
CS1 = lists:map(fun(C0) when ?SAME_CSR(C0, Csr) ->
csr_status(C0, NewStatus);
C -> C
end
(C) ->
C
end,
Consumers0),
G#group{consumers = CS1}.
@ -1314,12 +1301,6 @@ send_message(ConnectionPid, Msg) ->
ConnectionPid ! Msg,
ok.
same_consumer(#consumer{pid = Pid, subscription_id = SubId},
#consumer{pid = Pid, subscription_id = SubId}) ->
true;
same_consumer(_, _) ->
false.
-spec compute_pid_group_dependencies(groups()) -> pids_groups().
compute_pid_group_dependencies(Groups) ->
maps:fold(fun(K, #group{consumers = Cs}, Acc) ->

View File

@ -562,10 +562,15 @@ import_state_v4_test(_) ->
OldState5 = apply_ensure_monitors(OldMod, Cmd4, OldState4),
Cmd5 = register_consumer_command(P, 1, App1, Pid2, 2),
OldState6 = apply_ensure_monitors(OldMod, Cmd5, OldState5),
Cmd6 = activate_consumer_command(P, App1),
%% a duplicate consumer sneaks in
%% this should not happen in real life, but it tests the dedup
%% logic in the import function
Cmd6 = register_consumer_command(P, 1, App1, Pid0, 0),
OldState7 = apply_ensure_monitors(OldMod, Cmd6, OldState6),
Cmd7 = activate_consumer_command(P, App1),
OldState8 = apply_ensure_monitors(OldMod, Cmd7, OldState7),
Export = OldMod:state_to_map(OldState7),
Export = OldMod:state_to_map(OldState8),
#?STATE{groups = Groups, pids_groups = PidsGroups} = ?MOD:import_state(4, Export),
assertHasGroup({<<"/">>, S, App0},
grp(-1, [csr(Pid0, 0, active),