Refactor stream SAC coordinator

Separate logic between single SAC and SAC in a partition of a super
stream (makes code clearer), wait for former active consumer
notification to select the new active consumer (avoids a lock
of some sort on the group, so consumers can come and go).

References #3753
This commit is contained in:
Arnaud Cogoluègnes 2021-12-15 11:16:53 +01:00
parent 037af8c57f
commit 4ff2c4a03d
No known key found for this signature in database
GPG Key ID: D5C8C4DFAD43AFA8
3 changed files with 276 additions and 109 deletions

View File

@ -45,7 +45,7 @@
state/0]).
%% Single Active Consumer API
-export([register_consumer/6, unregister_consumer/5]).
-export([register_consumer/6, unregister_consumer/5, activate_consumer/3]).
-rabbit_boot_step({?MODULE,
[{description, "Restart stream coordinator"},
@ -299,6 +299,11 @@ unregister_consumer(VirtualHost,
{unregister_consumer, VirtualHost, Stream, ConsumerName, ConnectionPid, SubscriptionId}}),
Res.
-spec activate_consumer(binary(), binary(), binary()) -> ok.
activate_consumer(VirtualHost, Stream, ConsumerName) ->
{ok, Res, _} = process_command({sac, {activate_consumer, VirtualHost, Stream, ConsumerName}}),
Res.
process_command(Cmd) ->
Servers = ensure_coordinator_started(),
process_command(Servers, Cmd).

View File

@ -17,12 +17,27 @@
-module(rabbit_stream_sac_coordinator).
-type vhost() :: binary().
-type partition_index() :: integer().
-type stream() :: binary().
-type consumer_name() :: binary().
-type connection_pid() :: pid().
-type subscription_id() :: byte().
-opaque command() ::
{register_consumer, vhost()} | {unregister_consumer, vhost()}.
{register_consumer,
vhost(),
stream(),
partition_index(),
consumer_name(),
connection_pid(),
subscription_id()} |
{unregister_consumer,
vhost(),
stream(),
consumer_name(),
connection_pid(),
subscription_id() |
{activate_consumer, vhost(), stream(), consumer_name()}}.
-record(consumer,
{pid :: pid(), subscription_id :: subscription_id(),
@ -74,38 +89,14 @@ apply({register_consumer,
PartitionIndex,
ConsumerName,
StreamGroups0),
Group0 =
lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups1),
rabbit_log:debug("Group: ~p", [Group0]),
FormerActive =
case lookup_active_consumer(Group0) of
{value, FA} ->
FA;
false ->
undefined
end,
Consumer0 =
#consumer{pid = ConnectionPid,
subscription_id = SubscriptionId,
active = false},
Group1 = add_to_group(Consumer0, Group0),
rabbit_log:debug("Consumer added to group: ~p", [Group1]),
Group2 = compute_active_consumer(Group1),
rabbit_log:debug("Consumers in group after active consumer computation: ~p",
[Group2]),
StreamGroups2 =
update_groups(VirtualHost,
Stream,
ConsumerName,
Group2,
StreamGroups1),
{value, Consumer1} =
lookup_consumer(ConnectionPid, SubscriptionId, Group2),
Effects = notify_consumers(FormerActive, Consumer1, Group2),
#consumer{active = Active} = Consumer1,
{State#?MODULE{groups = StreamGroups2}, {ok, Active}, Effects};
do_register_consumer(VirtualHost,
Stream,
PartitionIndex,
ConsumerName,
ConnectionPid,
SubscriptionId,
State#?MODULE{groups = StreamGroups1});
apply({unregister_consumer,
VirtualHost,
Stream,
@ -124,30 +115,10 @@ apply({unregister_consumer,
{value, Consumer} ->
rabbit_log:debug("Unregistering consumer ~p from group",
[Consumer]),
{value, ActiveInPreviousGroupInstance} =
lookup_active_consumer(Group0),
G1 = remove_from_group(Consumer, Group0),
rabbit_log:debug("Consumer removed from group: ~p",
[G1]),
G2 = compute_active_consumer(G1),
rabbit_log:debug("Consumers in group after active consumer computation: ~p",
[G2]),
NewActive =
case lookup_active_consumer(G2) of
{value, AC} ->
AC;
false ->
undefined
end,
AIPGI =
case ActiveInPreviousGroupInstance of
Consumer ->
undefined;
_ ->
ActiveInPreviousGroupInstance
end,
Effs = notify_consumers(AIPGI, NewActive, G2),
{G2, Effs};
handle_consumer_removal(G1, Consumer);
false ->
rabbit_log:debug("Could not find consumer ~p ~p in group ~p ~p ~p",
[ConnectionPid,
@ -164,7 +135,209 @@ apply({unregister_consumer,
StreamGroups0),
{State0#?MODULE{groups = SGS}, Effects}
end,
{State1, ok, Effects1}.
{State1, ok, Effects1};
apply({activate_consumer, VirtualHost, Stream, ConsumerName},
#?MODULE{groups = StreamGroups0} = State0) ->
{G, Eff} =
case lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0) of
undefined ->
rabbit_log:warning("trying to activate consumer in group ~p, but "
"the group does not longer exist",
[{VirtualHost, Stream, ConsumerName}]);
Group ->
#consumer{pid = Pid, subscription_id = SubId} =
evaluate_active_consumer(Group),
Group1 =
update_consumer_state_in_group(Group, Pid, SubId, true),
{Group1,
[mod_call_effect(Pid,
{sac,
{{subscription_id, SubId}, {active, true},
{extra, []}}})]}
end,
StreamGroups1 =
update_groups(VirtualHost, Stream, ConsumerName, G, StreamGroups0),
{State0#?MODULE{groups = StreamGroups1}, ok, Eff}.
do_register_consumer(VirtualHost,
Stream,
-1,
ConsumerName,
ConnectionPid,
SubscriptionId,
#?MODULE{groups = StreamGroups0} = State) ->
Group0 =
lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0),
rabbit_log:debug("Group: ~p", [Group0]),
Consumer =
case lookup_active_consumer(Group0) of
{value, _} ->
#consumer{pid = ConnectionPid,
subscription_id = SubscriptionId,
active = false};
false ->
#consumer{pid = ConnectionPid,
subscription_id = SubscriptionId,
active = true}
end,
Group1 = add_to_group(Consumer, Group0),
rabbit_log:debug("Consumer added to group: ~p", [Group1]),
StreamGroups1 =
update_groups(VirtualHost,
Stream,
ConsumerName,
Group1,
StreamGroups0),
#consumer{active = Active} = Consumer,
Effects =
case Consumer of
#consumer{active = true} ->
[mod_call_effect(ConnectionPid,
{sac,
{{subscription_id, SubscriptionId},
{active, Active}, {extra, []}}})];
_ ->
[]
end,
{State#?MODULE{groups = StreamGroups1}, {ok, Active}, Effects};
do_register_consumer(VirtualHost,
Stream,
_,
ConsumerName,
ConnectionPid,
SubscriptionId,
#?MODULE{groups = StreamGroups0} = State) ->
Group0 =
lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0),
rabbit_log:debug("Group: ~p", [Group0]),
{Group1, Effects} =
case Group0 of
#group{consumers = []} ->
%% first consumer in the group, it's the active one
Consumer0 =
#consumer{pid = ConnectionPid,
subscription_id = SubscriptionId,
active = true},
G1 = add_to_group(Consumer0, Group0),
{G1,
[mod_call_effect(ConnectionPid,
{sac,
{{subscription_id, SubscriptionId},
{active, true}, {extra, []}}})]};
_G ->
%% whatever the current state is, the newcomer will be passive
Consumer0 =
#consumer{pid = ConnectionPid,
subscription_id = SubscriptionId,
active = false},
G1 = add_to_group(Consumer0, Group0),
case lookup_active_consumer(G1) of
{value,
#consumer{pid = ActPid, subscription_id = ActSubId} =
CurrentActive} ->
case evaluate_active_consumer(G1) of
CurrentActive ->
%% the current active stays the same
{G1, []};
_ ->
%% there's a change, telling the active it's not longer active
{update_consumer_state_in_group(G1,
ActPid,
ActSubId,
false),
[mod_call_effect(ActPid,
{sac,
{{subscription_id, ActSubId},
{active, false},
{extra,
[{stepping_down,
true}]}}})]}
end;
undefined ->
%% no active consumer in the (non-empty) group, we are waiting for the reply of a former active
{G1, []}
end
end,
StreamGroups1 =
update_groups(VirtualHost,
Stream,
ConsumerName,
Group1,
StreamGroups0),
{value, #consumer{active = Active}} =
lookup_consumer(ConnectionPid, SubscriptionId, Group1),
{State#?MODULE{groups = StreamGroups1}, {ok, Active}, Effects}.
handle_consumer_removal(#group{consumers = []} = G, _) ->
{G, []};
handle_consumer_removal(#group{partition_index = -1} = Group0,
Consumer) ->
case Consumer of
#consumer{active = true} ->
Group1 = compute_active_consumer(Group0),
rabbit_log:debug("This is the active consumer, group after active "
"consumer calculation: ~p",
[Group1]),
case lookup_active_consumer(Group1) of
{value, #consumer{pid = Pid, subscription_id = SubId} = C} ->
rabbit_log:debug("Creating side effect to notify new active consumer ~p",
[C]),
{Group1,
[mod_call_effect(Pid,
{sac,
{{subscription_id, SubId},
{active, true}, {extra, []}}})]};
_ ->
rabbit_log:debug("No active consumer found in the group, nothing "
"to do"),
{Group1, []}
end;
#consumer{active = false} ->
rabbit_log:debug("Not the active consumer, nothing to do."),
{Group0, []}
end;
handle_consumer_removal(Group0, Consumer) ->
case lookup_active_consumer(Group0) of
{value,
#consumer{pid = ActPid, subscription_id = ActSubId} =
CurrentActive} ->
case evaluate_active_consumer(Group0) of
CurrentActive ->
%% the current active stays the same
{Group0, []};
_ ->
%% there's a change, telling the active it's not longer active
{update_consumer_state_in_group(Group0,
ActPid,
ActSubId,
false),
[mod_call_effect(ActPid,
{sac,
{{subscription_id, ActSubId},
{active, false},
{extra, [{stepping_down, true}]}}})]}
end;
false ->
case Consumer#consumer.active of
true ->
%% the active one is going away, picking a new one
#consumer{pid = P, subscription_id = SID} =
evaluate_active_consumer(Group0),
{update_consumer_state_in_group(Group0, P, SID, true),
[mod_call_effect(P,
{sac,
{{subscription_id, SID}, {active, true},
{extra, []}}})]};
false ->
%% no active consumer in the (non-empty) group, we are waiting for the reply of a former active
{Group0, []}
end
end.
maybe_create_group(VirtualHost,
Stream,
@ -181,7 +354,8 @@ maybe_create_group(VirtualHost,
end.
lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups) ->
maps:get({VirtualHost, Stream, ConsumerName}, StreamGroups).
maps:get({VirtualHost, Stream, ConsumerName}, StreamGroups,
undefined).
add_to_group(Consumer, #group{consumers = Consumers} = Group) ->
Group#group{consumers = Consumers ++ [Consumer]}.
@ -219,6 +393,11 @@ compute_active_consumer(#group{partition_index = PartitionIndex,
{length(Consumers0) - 1, []}, Consumers0),
Group#group{consumers = Consumers1}.
evaluate_active_consumer(#group{partition_index = PartitionIndex,
consumers = Consumers}) ->
ActiveConsumerIndex = PartitionIndex rem length(Consumers),
lists:nth(ActiveConsumerIndex + 1, Consumers).
lookup_consumer(ConnectionPid, SubscriptionId,
#group{consumers = Consumers}) ->
lists:search(fun(#consumer{pid = ConnPid, subscription_id = SubId}) ->
@ -232,35 +411,6 @@ lookup_active_consumer(#group{consumers = Consumers}) ->
notify_consumers(_, _, #group{consumers = []}) ->
[];
notify_consumers(_,
#consumer{pid = ConnectionPid,
subscription_id = SubscriptionId} =
NewConsumer,
#group{partition_index = -1, consumers = [NewConsumer]}) ->
[mod_call_effect(ConnectionPid,
{sac,
{{subscription_id, SubscriptionId}, {active, true},
{side_effects, []}}})];
notify_consumers(_,
#consumer{pid = ConnectionPid,
subscription_id = SubscriptionId} =
NewConsumer,
#group{partition_index = -1, consumers = [NewConsumer | _]}) ->
[mod_call_effect(ConnectionPid,
{sac,
{{subscription_id, SubscriptionId}, {active, true},
{side_effects, []}}})];
notify_consumers(_,
#consumer{pid = ConnectionPid,
subscription_id = SubscriptionId},
#group{partition_index = -1, consumers = _}) ->
%% notifying a newcomer that it's inactive
%% FIXME is consumer update always necessary for inactive newcomers?
%% can't they assume they are inactive by default?
[mod_call_effect(ConnectionPid,
{sac,
{{subscription_id, SubscriptionId}, {active, false},
{side_effects, []}}})];
notify_consumers(undefined,
#consumer{pid = ConnectionPid,
subscription_id = SubscriptionId},
@ -315,6 +465,21 @@ update_groups(VirtualHost,
StreamGroups) ->
maps:put({VirtualHost, Stream, ConsumerName}, Group, StreamGroups).
update_consumer_state_in_group(#group{consumers = Consumers0} = G,
Pid,
SubId,
NewState) ->
CS1 = lists:foldr(fun(C0, Acc) ->
case C0 of
#consumer{pid = Pid, subscription_id = SubId} ->
C1 = C0#consumer{active = NewState},
[C1 | Acc];
C -> [C | Acc]
end
end,
[], Consumers0),
G#group{consumers = CS1}.
mod_call_effect(Pid, Msg) ->
{mod_call, rabbit_stream_sac_coordinator, send_message, [Pid, Msg]}.

View File

@ -777,9 +777,7 @@ open(info, {OK, S, Data},
connection_state = State2}}
end;
open(info,
{sac,
{{subscription_id, SubId}, {active, Active},
{side_effects, Effects}}},
{sac, {{subscription_id, SubId}, {active, Active}, {extra, Extra}}},
#statem_data{transport = Transport,
connection = Connection0,
connection_state = ConnState0} =
@ -817,7 +815,7 @@ open(info,
SubId,
Active,
true,
Effects),
Extra),
{Conn1,
ConnState0#stream_connection_state{consumers =
Consumers0#{SubId =>
@ -2434,8 +2432,7 @@ handle_frame_post_auth(Transport,
ResponseOffsetSpec}}) ->
%% FIXME check response code? It's supposed to be OK all the time.
case maps:take(CorrelationId, Requests0) of
{{{subscription_id, SubscriptionId}, {side_effects, SideEffects}},
Rs} ->
{{{subscription_id, SubscriptionId}, {extra, Extra}}, Rs} ->
rabbit_log:debug("Received consumer update response for subscription ~p",
[SubscriptionId]),
Consumers1 =
@ -2513,16 +2510,28 @@ handle_frame_post_auth(Transport,
Consumers#{SubscriptionId => Consumer2};
#{SubscriptionId :=
#consumer{configuration =
#consumer_configuration{active =
false}}} ->
#consumer_configuration{active = false,
stream = Stream,
properties =
Properties}}} ->
rabbit_log:debug("Not an active consumer"),
case Extra of
[{stepping_down, true}] ->
ConsumerName = consumer_name(Properties),
rabbit_stream_coordinator:activate_consumer(VirtualHost,
Stream,
ConsumerName);
_ ->
ok
end,
Consumers;
_ ->
rabbit_log:debug("No consumer found for subscription ~p",
[SubscriptionId]),
Consumers
end,
apply_sac_side_effects(SideEffects),
{Connection#stream_connection{outstanding_requests = Rs},
State#stream_connection_state{consumers = Consumers1}};
@ -2689,7 +2698,7 @@ maybe_register_consumer(VirtualHost,
SubscriptionId),
Active.
maybe_notify_consumer(_, Connection, _, _, _, false = _Sac) ->
maybe_notify_consumer(_, Connection, _, _, false = _Sac, _) ->
Connection;
maybe_notify_consumer(Transport,
#stream_connection{socket = S,
@ -2700,7 +2709,7 @@ maybe_notify_consumer(Transport,
SubscriptionId,
Active,
true = _Sac,
SideEffects) ->
Extra) ->
rabbit_log:debug("SAC subscription ~p, active = ~p",
[SubscriptionId, Active]),
Frame =
@ -2709,8 +2718,7 @@ maybe_notify_consumer(Transport,
OutstandingRequests1 =
maps:put(CorrIdSeq,
{{subscription_id, SubscriptionId},
{side_effects, SideEffects}},
{{subscription_id, SubscriptionId}, {extra, Extra}},
OutstandingRequests0),
send(Transport, S, Frame),
Connection#stream_connection{correlation_id_sequence = CorrIdSeq + 1,
@ -2749,17 +2757,6 @@ partition_index(VirtualHost, Stream, Properties) ->
-1
end.
apply_sac_side_effects([]) ->
ok;
apply_sac_side_effects([Effect | T]) ->
apply_sac_side_effect(Effect),
apply_sac_side_effects(T).
apply_sac_side_effect({message, Pid, Msg}) ->
Pid ! Msg;
apply_sac_side_effect(Effect) ->
rabbit_log:warning("Unknown SAC side effect: ~p", [Effect]).
notify_connection_closed(#statem_data{connection =
#stream_connection{name = Name,
publishers =