Move SAC coordinator to stream coordinator

The SAC group coordination should happen as part of raft state
machine, to make it more reliable and robust.
It would also benefit from different "services" the coordinator
and RA provide.

References #3753
This commit is contained in:
Arnaud Cogoluègnes 2021-12-09 14:58:50 +01:00
parent 70598325a9
commit 60529aae84
No known key found for this signature in database
GPG Key ID: D5C8C4DFAD43AFA8
6 changed files with 87 additions and 185 deletions

12
deps/rabbit/rebar.config vendored Normal file
View File

@ -0,0 +1,12 @@
{plugins, [rebar3_format]}.
{format, [
{files, ["src/rabbit_stream_sac_coordinator.erl"]},
{formatter, default_formatter},
{options, #{
paper => 80,
ribbon => 70,
inline_attributes => {when_under, 1},
inline_items => {when_under, 4}
}}
]}.

View File

@ -44,6 +44,9 @@
-export([eval_listeners/3,
state/0]).
%% Single Active Consumer API
-export([register_consumer/6, unregister_consumer/5]).
-rabbit_boot_step({?MODULE,
[{description, "Restart stream coordinator"},
{mfa, {?MODULE, recover, []}},
@ -88,6 +91,7 @@
{member_stopped, stream_id(), args()} |
{retention_updated, stream_id(), args()} |
{mnesia_updated, stream_id(), args()} |
{sac, rabbit_stream_sac_coordinator:command()} |
ra_machine:effect().
-export_type([command/0]).
@ -273,6 +277,28 @@ register_local_member_listener(Q) when ?is_amqqueue(Q) ->
stream_id => StreamId,
type => local_member}}).
%% Single Active Consumer API
-spec register_consumer(binary(), binary(), integer(), binary(), pid(), integer()) -> {ok, boolean()}.
register_consumer(VirtualHost,
Stream,
PartitionIndex,
ConsumerName,
ConnectionPid,
SubscriptionId) ->
{ok, Res, _} = process_command({sac,
{register_consumer, VirtualHost, Stream, PartitionIndex, ConsumerName, ConnectionPid, SubscriptionId}}),
Res.
-spec unregister_consumer(binary(), binary(), binary(), pid(), integer()) -> ok.
unregister_consumer(VirtualHost,
Stream,
ConsumerName,
ConnectionPid,
SubscriptionId) ->
{ok, Res, _} = process_command({sac,
{unregister_consumer, VirtualHost, Stream, ConsumerName, ConnectionPid, SubscriptionId}}),
Res.
process_command(Cmd) ->
Servers = ensure_coordinator_started(),
process_command(Servers, Cmd).
@ -344,7 +370,7 @@ which_module(_) ->
?MODULE.
init(_Conf) ->
#?MODULE{}.
#?MODULE{single_active_consumer = rabbit_stream_sac_coordinator:init_state()}.
-spec apply(map(), command(), state()) ->
{state(), term(), ra_machine:effects()}.
@ -380,6 +406,10 @@ apply(#{index := _Idx, machine_version := MachineVersion} = Meta0,
Reply ->
return(Meta, State0, Reply, [])
end;
apply(Meta, {sac, SacCommand}, #?MODULE{single_active_consumer = SacState0} = State0) ->
{SacState1, Reply} = rabbit_stream_sac_coordinator:apply(SacCommand, SacState0),
return(Meta, State0#?MODULE{single_active_consumer = SacState1}, Reply, []);
apply(#{machine_version := MachineVersion} = Meta, {down, Pid, Reason} = Cmd,
#?MODULE{streams = Streams0,
monitors = Monitors0,

View File

@ -63,6 +63,6 @@
%% not used as of v2
listeners = #{} :: undefined | #{stream_id() =>
#{pid() := queue_ref()}},
single_active_consumer :: undefined | rabbit_stream_sac_coordinator:state(),
%% future extensibility
reserved_1,
reserved_2}).

View File

@ -16,120 +16,43 @@
-module(rabbit_stream_sac_coordinator).
-behaviour(gen_server).
%% API functions
-export([start_link/0]).
%% gen_server callbacks
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-export([register_consumer/6,
unregister_consumer/5]).
-type vhost() :: binary().
-type stream() :: binary().
-type consumer_name() :: binary().
-type subscription_id() :: byte().
-opaque command() ::
{register_consumer, vhost()} | {unregister_consumer, vhost()}.
-record(consumer,
{pid :: pid(), subscription_id :: subscription_id(),
active :: boolean()}).
-record(group,
{consumers :: [#consumer{}], partition_index :: integer()}).
-record(state,
-record(?MODULE,
{groups :: #{{vhost(), stream(), consumer_name()} => #group{}}}).
register_consumer(VirtualHost,
Stream,
PartitionIndex,
ConsumerName,
ConnectionPid,
SubscriptionId) ->
call({register_consumer,
VirtualHost,
Stream,
PartitionIndex,
ConsumerName,
ConnectionPid,
SubscriptionId}).
-opaque state() :: #?MODULE{}.
unregister_consumer(VirtualHost,
Stream,
ConsumerName,
ConnectionPid,
SubscriptionId) ->
call({unregister_consumer,
VirtualHost,
Stream,
ConsumerName,
ConnectionPid,
SubscriptionId}).
-export_type([state/0,
command/0]).
call(Request) ->
gen_server:call({global, ?MODULE},
Request).%%%===================================================================
%%% API functions
%%%===================================================================
-export([apply/2,
init_state/0]).
%%--------------------------------------------------------------------
%% @doc
%% Starts the server
%%
%% @spec start_link() -> {ok, Pid} | ignore | {error, Error}
%% @end
%%--------------------------------------------------------------------
start_link() ->
case gen_server:start_link({global, ?MODULE}, ?MODULE, [], []) of
{error, {already_started, _Pid}} ->
ignore;
R ->
R
end.
-spec init_state() -> state().
init_state() ->
#?MODULE{groups = #{}}.
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Initializes the server
%%
%% @spec init(Args) -> {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%% @end
%%--------------------------------------------------------------------
init([]) ->
{ok, #state{groups = #{}}}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling call messages
%%
%% @spec handle_call(Request, From, State) ->
%% {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} |
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_call({register_consumer,
VirtualHost,
Stream,
PartitionIndex,
ConsumerName,
ConnectionPid,
SubscriptionId},
_From, #state{groups = StreamGroups0} = State) ->
-spec apply(command(), state()) -> {state(), term()}.
apply({register_consumer,
VirtualHost,
Stream,
PartitionIndex,
ConsumerName,
ConnectionPid,
SubscriptionId},
#?MODULE{groups = StreamGroups0} = State) ->
%% TODO monitor connection PID to remove consumers when their connection dies
%% this could require some index to avoid crawling the whole data structure
%% this is necessary to fail over to another consumer when one dies abruptly
@ -180,14 +103,14 @@ handle_call({register_consumer,
lookup_consumer(ConnectionPid, SubscriptionId, Group2),
notify_consumers(FormerActive, Consumer1, Group2),
#consumer{active = Active} = Consumer1,
{reply, {ok, Active}, State#state{groups = StreamGroups2}};
handle_call({unregister_consumer,
VirtualHost,
Stream,
ConsumerName,
ConnectionPid,
SubscriptionId},
_From, #state{groups = StreamGroups0} = State0) ->
{State#?MODULE{groups = StreamGroups2}, {ok, Active}};
apply({unregister_consumer,
VirtualHost,
Stream,
ConsumerName,
ConnectionPid,
SubscriptionId},
#?MODULE{groups = StreamGroups0} = State0) ->
State1 =
case lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0) of
error ->
@ -237,11 +160,9 @@ handle_call({unregister_consumer,
ConsumerName,
Group1,
StreamGroups0),
State0#state{groups = SGS}
State0#?MODULE{groups = SGS}
end,
{reply, ok, State1};
handle_call(which_children, _From, State) ->
{reply, [], State}.
{State1, ok}.
maybe_create_group(VirtualHost,
Stream,
@ -384,59 +305,3 @@ update_groups(VirtualHost,
Group,
StreamGroups) ->
maps:put({VirtualHost, Stream, ConsumerName}, Group, StreamGroups).
handle_cast(_Msg, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling cast messages
%%
%% @spec handle_cast(Msg, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling all non call/cast messages
%%
%% @spec handle_info(Info, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
handle_info(_Info, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
%%
%% @spec terminate(Reason, State) -> void()
%% @end
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
ok.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Convert process state when code is changed
%%
%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
%% @end
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================

View File

@ -2681,12 +2681,12 @@ maybe_register_consumer(VirtualHost,
true) ->
PartitionIndex = partition_index(VirtualHost, Stream, Properties),
{ok, Active} =
rabbit_stream_sac_coordinator:register_consumer(VirtualHost,
Stream,
PartitionIndex,
ConsumerName,
self(),
SubscriptionId),
rabbit_stream_coordinator:register_consumer(VirtualHost,
Stream,
PartitionIndex,
ConsumerName,
self(),
SubscriptionId),
Active.
maybe_notify_consumer(_, Connection, _, _, _, false = _Sac) ->
@ -2728,11 +2728,11 @@ maybe_unregister_consumer(VirtualHost,
SubscriptionId}},
true = _Sac) ->
ConsumerName = consumer_name(Properties),
rabbit_stream_sac_coordinator:unregister_consumer(VirtualHost,
Stream,
ConsumerName,
self(),
SubscriptionId).
rabbit_stream_coordinator:unregister_consumer(VirtualHost,
Stream,
ConsumerName,
self(),
SubscriptionId).
partition_index(VirtualHost, Stream, Properties) ->
case Properties of

View File

@ -79,14 +79,9 @@ init([]) ->
type => worker,
start => {rabbit_stream_metrics_gc, start_link, []}},
SacCoordinator =
#{id => rabbit_stream_sac_coordinator,
type => worker,
start => {rabbit_stream_sac_coordinator, start_link, []}},
{ok,
{{one_for_all, 10, 10},
[StreamManager, MetricsGc, SacCoordinator]
[StreamManager, MetricsGc]
++ listener_specs(fun tcp_listener_spec/1,
[SocketOpts, ServerConfiguration, NumTcpAcceptors],
Listeners)