diff --git a/deps/rabbit/rebar.config b/deps/rabbit/rebar.config new file mode 100644 index 0000000000..b0879bcbbe --- /dev/null +++ b/deps/rabbit/rebar.config @@ -0,0 +1,12 @@ +{plugins, [rebar3_format]}. + +{format, [ + {files, ["src/rabbit_stream_sac_coordinator.erl"]}, + {formatter, default_formatter}, + {options, #{ + paper => 80, + ribbon => 70, + inline_attributes => {when_under, 1}, + inline_items => {when_under, 4} + }} +]}. \ No newline at end of file diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index 38c5d9687e..818e0b2868 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -44,6 +44,9 @@ -export([eval_listeners/3, state/0]). +%% Single Active Consumer API +-export([register_consumer/6, unregister_consumer/5]). + -rabbit_boot_step({?MODULE, [{description, "Restart stream coordinator"}, {mfa, {?MODULE, recover, []}}, @@ -88,6 +91,7 @@ {member_stopped, stream_id(), args()} | {retention_updated, stream_id(), args()} | {mnesia_updated, stream_id(), args()} | + {sac, rabbit_stream_sac_coordinator:command()} | ra_machine:effect(). -export_type([command/0]). @@ -273,6 +277,28 @@ register_local_member_listener(Q) when ?is_amqqueue(Q) -> stream_id => StreamId, type => local_member}}). +%% Single Active Consumer API +-spec register_consumer(binary(), binary(), integer(), binary(), pid(), integer()) -> {ok, boolean()}. +register_consumer(VirtualHost, + Stream, + PartitionIndex, + ConsumerName, + ConnectionPid, + SubscriptionId) -> + {ok, Res, _} = process_command({sac, + {register_consumer, VirtualHost, Stream, PartitionIndex, ConsumerName, ConnectionPid, SubscriptionId}}), + Res. + +-spec unregister_consumer(binary(), binary(), binary(), pid(), integer()) -> ok. +unregister_consumer(VirtualHost, + Stream, + ConsumerName, + ConnectionPid, + SubscriptionId) -> + {ok, Res, _} = process_command({sac, + {unregister_consumer, VirtualHost, Stream, ConsumerName, ConnectionPid, SubscriptionId}}), + Res. + process_command(Cmd) -> Servers = ensure_coordinator_started(), process_command(Servers, Cmd). @@ -344,7 +370,7 @@ which_module(_) -> ?MODULE. init(_Conf) -> - #?MODULE{}. + #?MODULE{single_active_consumer = rabbit_stream_sac_coordinator:init_state()}. -spec apply(map(), command(), state()) -> {state(), term(), ra_machine:effects()}. @@ -380,6 +406,10 @@ apply(#{index := _Idx, machine_version := MachineVersion} = Meta0, Reply -> return(Meta, State0, Reply, []) end; + +apply(Meta, {sac, SacCommand}, #?MODULE{single_active_consumer = SacState0} = State0) -> + {SacState1, Reply} = rabbit_stream_sac_coordinator:apply(SacCommand, SacState0), + return(Meta, State0#?MODULE{single_active_consumer = SacState1}, Reply, []); apply(#{machine_version := MachineVersion} = Meta, {down, Pid, Reason} = Cmd, #?MODULE{streams = Streams0, monitors = Monitors0, diff --git a/deps/rabbit/src/rabbit_stream_coordinator.hrl b/deps/rabbit/src/rabbit_stream_coordinator.hrl index 5d72ad9633..a586f781bf 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.hrl +++ b/deps/rabbit/src/rabbit_stream_coordinator.hrl @@ -63,6 +63,6 @@ %% not used as of v2 listeners = #{} :: undefined | #{stream_id() => #{pid() := queue_ref()}}, + single_active_consumer :: undefined | rabbit_stream_sac_coordinator:state(), %% future extensibility - reserved_1, reserved_2}). diff --git a/deps/rabbitmq_stream/src/rabbit_stream_sac_coordinator.erl b/deps/rabbit/src/rabbit_stream_sac_coordinator.erl similarity index 67% rename from deps/rabbitmq_stream/src/rabbit_stream_sac_coordinator.erl rename to deps/rabbit/src/rabbit_stream_sac_coordinator.erl index 98ada29e25..34a395543c 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_sac_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_sac_coordinator.erl @@ -16,120 +16,43 @@ -module(rabbit_stream_sac_coordinator). --behaviour(gen_server). - -%% API functions --export([start_link/0]). -%% gen_server callbacks --export([init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3]). --export([register_consumer/6, - unregister_consumer/5]). - -type vhost() :: binary(). -type stream() :: binary(). -type consumer_name() :: binary(). -type subscription_id() :: byte(). +-opaque command() :: + {register_consumer, vhost()} | {unregister_consumer, vhost()}. + -record(consumer, {pid :: pid(), subscription_id :: subscription_id(), active :: boolean()}). -record(group, {consumers :: [#consumer{}], partition_index :: integer()}). --record(state, +-record(?MODULE, {groups :: #{{vhost(), stream(), consumer_name()} => #group{}}}). -register_consumer(VirtualHost, - Stream, - PartitionIndex, - ConsumerName, - ConnectionPid, - SubscriptionId) -> - call({register_consumer, - VirtualHost, - Stream, - PartitionIndex, - ConsumerName, - ConnectionPid, - SubscriptionId}). +-opaque state() :: #?MODULE{}. -unregister_consumer(VirtualHost, - Stream, - ConsumerName, - ConnectionPid, - SubscriptionId) -> - call({unregister_consumer, - VirtualHost, - Stream, - ConsumerName, - ConnectionPid, - SubscriptionId}). +-export_type([state/0, + command/0]). -call(Request) -> - gen_server:call({global, ?MODULE}, - Request).%%%=================================================================== - %%% API functions - %%%=================================================================== +-export([apply/2, + init_state/0]). -%%-------------------------------------------------------------------- -%% @doc -%% Starts the server -%% -%% @spec start_link() -> {ok, Pid} | ignore | {error, Error} -%% @end -%%-------------------------------------------------------------------- -start_link() -> - case gen_server:start_link({global, ?MODULE}, ?MODULE, [], []) of - {error, {already_started, _Pid}} -> - ignore; - R -> - R - end. +-spec init_state() -> state(). +init_state() -> + #?MODULE{groups = #{}}. -%%%=================================================================== -%%% gen_server callbacks -%%%=================================================================== - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Initializes the server -%% -%% @spec init(Args) -> {ok, State} | -%% {ok, State, Timeout} | -%% ignore | -%% {stop, Reason} -%% @end -%%-------------------------------------------------------------------- -init([]) -> - {ok, #state{groups = #{}}}. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Handling call messages -%% -%% @spec handle_call(Request, From, State) -> -%% {reply, Reply, State} | -%% {reply, Reply, State, Timeout} | -%% {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, Reply, State} | -%% {stop, Reason, State} -%% @end -%%-------------------------------------------------------------------- -handle_call({register_consumer, - VirtualHost, - Stream, - PartitionIndex, - ConsumerName, - ConnectionPid, - SubscriptionId}, - _From, #state{groups = StreamGroups0} = State) -> +-spec apply(command(), state()) -> {state(), term()}. +apply({register_consumer, + VirtualHost, + Stream, + PartitionIndex, + ConsumerName, + ConnectionPid, + SubscriptionId}, + #?MODULE{groups = StreamGroups0} = State) -> %% TODO monitor connection PID to remove consumers when their connection dies %% this could require some index to avoid crawling the whole data structure %% this is necessary to fail over to another consumer when one dies abruptly @@ -180,14 +103,14 @@ handle_call({register_consumer, lookup_consumer(ConnectionPid, SubscriptionId, Group2), notify_consumers(FormerActive, Consumer1, Group2), #consumer{active = Active} = Consumer1, - {reply, {ok, Active}, State#state{groups = StreamGroups2}}; -handle_call({unregister_consumer, - VirtualHost, - Stream, - ConsumerName, - ConnectionPid, - SubscriptionId}, - _From, #state{groups = StreamGroups0} = State0) -> + {State#?MODULE{groups = StreamGroups2}, {ok, Active}}; +apply({unregister_consumer, + VirtualHost, + Stream, + ConsumerName, + ConnectionPid, + SubscriptionId}, + #?MODULE{groups = StreamGroups0} = State0) -> State1 = case lookup_group(VirtualHost, Stream, ConsumerName, StreamGroups0) of error -> @@ -237,11 +160,9 @@ handle_call({unregister_consumer, ConsumerName, Group1, StreamGroups0), - State0#state{groups = SGS} + State0#?MODULE{groups = SGS} end, - {reply, ok, State1}; -handle_call(which_children, _From, State) -> - {reply, [], State}. + {State1, ok}. maybe_create_group(VirtualHost, Stream, @@ -384,59 +305,3 @@ update_groups(VirtualHost, Group, StreamGroups) -> maps:put({VirtualHost, Stream, ConsumerName}, Group, StreamGroups). - -handle_cast(_Msg, State) -> - {noreply, State}. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Handling cast messages -%% -%% @spec handle_cast(Msg, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% @end -%%-------------------------------------------------------------------- - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Handling all non call/cast messages -%% -%% @spec handle_info(Info, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% @end -%%-------------------------------------------------------------------- -handle_info(_Info, State) -> - {noreply, State}. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. -%% -%% @spec terminate(Reason, State) -> void() -%% @end -%%-------------------------------------------------------------------- -terminate(_Reason, _State) -> - ok. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Convert process state when code is changed -%% -%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} -%% @end -%%-------------------------------------------------------------------- -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index f3dcba324e..45c0ec58c9 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -2681,12 +2681,12 @@ maybe_register_consumer(VirtualHost, true) -> PartitionIndex = partition_index(VirtualHost, Stream, Properties), {ok, Active} = - rabbit_stream_sac_coordinator:register_consumer(VirtualHost, - Stream, - PartitionIndex, - ConsumerName, - self(), - SubscriptionId), + rabbit_stream_coordinator:register_consumer(VirtualHost, + Stream, + PartitionIndex, + ConsumerName, + self(), + SubscriptionId), Active. maybe_notify_consumer(_, Connection, _, _, _, false = _Sac) -> @@ -2728,11 +2728,11 @@ maybe_unregister_consumer(VirtualHost, SubscriptionId}}, true = _Sac) -> ConsumerName = consumer_name(Properties), - rabbit_stream_sac_coordinator:unregister_consumer(VirtualHost, - Stream, - ConsumerName, - self(), - SubscriptionId). + rabbit_stream_coordinator:unregister_consumer(VirtualHost, + Stream, + ConsumerName, + self(), + SubscriptionId). partition_index(VirtualHost, Stream, Properties) -> case Properties of diff --git a/deps/rabbitmq_stream/src/rabbit_stream_sup.erl b/deps/rabbitmq_stream/src/rabbit_stream_sup.erl index bccc60ae29..92bec93a82 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_sup.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_sup.erl @@ -79,14 +79,9 @@ init([]) -> type => worker, start => {rabbit_stream_metrics_gc, start_link, []}}, - SacCoordinator = - #{id => rabbit_stream_sac_coordinator, - type => worker, - start => {rabbit_stream_sac_coordinator, start_link, []}}, - {ok, {{one_for_all, 10, 10}, - [StreamManager, MetricsGc, SacCoordinator] + [StreamManager, MetricsGc] ++ listener_specs(fun tcp_listener_spec/1, [SocketOpts, ServerConfiguration, NumTcpAcceptors], Listeners)