rabbit_writer: Convert to a regular gen_server

[Why]
This process failed to implement properly the OTP principles. For
instance, the mainloop always kept a reference on the module because it
was not tail-recursive.

This prevents the module from being reloaded at runtime: because the
process always keep that reference on the module, it is killed by the
Code server as part of the code reloading.
This commit is contained in:
Jean-Sébastien Pédron 2023-12-11 11:56:52 +01:00
parent 41922cad4d
commit 7849f143d8
No known key found for this signature in database
GPG Key ID: 39E99761A5FD94CC
1 changed files with 64 additions and 62 deletions

View File

@ -7,6 +7,8 @@
-module(rabbit_writer).
-behavior(gen_server).
%% This module backs writer processes ("writers"). The responsibility of
%% a writer is to serialise protocol methods and write them to the socket.
%% Every writer is associated with a channel and normally it's the channel
@ -27,7 +29,12 @@
-include("rabbit.hrl").
-export([start/6, start_link/6, start/7, start_link/7, start/8, start_link/8]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-export([send_command/2, send_command/3,
send_command_sync/2, send_command_sync/3,
@ -37,9 +44,6 @@
-export([internal_send_command/4, internal_send_command/6]).
-export([msg_size/1, maybe_gc_large_msg/1, maybe_gc_large_msg/2]).
%% internal
-export([enter_mainloop/2, mainloop/2, mainloop1/2]).
-record(wstate, {
%% socket (port)
sock,
@ -97,10 +101,6 @@
rabbit_types:proc_name(), boolean(), undefined|non_neg_integer()) ->
rabbit_types:ok(pid()).
-spec system_code_change(_,_,_,_) -> {'ok',_}.
-spec system_continue(_,_,#wstate{}) -> any().
-spec system_terminate(_,_,_,_) -> no_return().
-spec send_command(pid(), rabbit_framing:amqp_method_record()) -> 'ok'.
-spec send_command
(pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) ->
@ -161,13 +161,15 @@ start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity,
ReaderWantsStats, GCThreshold) ->
State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid,
ReaderWantsStats, GCThreshold),
{ok, proc_lib:spawn(?MODULE, enter_mainloop, [Identity, State])}.
Options = [{hibernate_after, ?HIBERNATE_AFTER}],
gen_server:start(?MODULE, [Identity, State], Options).
start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity,
ReaderWantsStats, GCThreshold) ->
State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid,
ReaderWantsStats, GCThreshold),
{ok, proc_lib:spawn_link(?MODULE, enter_mainloop, [Identity, State])}.
Options = [{hibernate_after, ?HIBERNATE_AFTER}],
gen_server:start_link(?MODULE, [Identity, State], Options).
initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats, GCThreshold) ->
(case ReaderWantsStats of
@ -182,49 +184,57 @@ initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats, GC
writer_gc_threshold = GCThreshold},
#wstate.stats_timer).
system_continue(Parent, Deb, State) ->
mainloop(Deb, State#wstate{reader = Parent}).
system_terminate(Reason, _Parent, _Deb, _State) ->
exit(Reason).
system_code_change(Misc, _Module, _OldVsn, _Extra) ->
{ok, Misc}.
enter_mainloop(Identity, State) ->
init([Identity, State]) ->
?LG_PROCESS_TYPE(writer),
Deb = sys:debug_options([]),
?store_proc_name(Identity),
mainloop(Deb, State).
{ok, State}.
mainloop(Deb, State) ->
handle_call({send_command_sync, MethodRecord}, _From, State) ->
try
mainloop1(Deb, State)
State1 = internal_flush(
internal_send_command_async(MethodRecord, State)),
{reply, ok, State1, 0}
catch
exit:Error -> #wstate{reader = ReaderPid, channel = Channel} = State,
ReaderPid ! {channel_exit, Channel, Error}
end,
done.
mainloop1(Deb, State = #wstate{pending = []}) ->
receive
Message -> {Deb1, State1} = handle_message(Deb, Message, State),
?MODULE:mainloop1(Deb1, State1)
after ?HIBERNATE_AFTER ->
erlang:hibernate(?MODULE, mainloop, [Deb, State])
_Class:Reason ->
{stop, {shutdown, Reason}, State}
end;
mainloop1(Deb, State) ->
receive
Message -> {Deb1, State1} = handle_message(Deb, Message, State),
?MODULE:mainloop1(Deb1, State1)
after 0 ->
?MODULE:mainloop1(Deb, internal_flush(State))
handle_call({send_command_sync, MethodRecord, Content}, _From, State) ->
try
State1 = internal_flush(
internal_send_command_async(MethodRecord, Content, State)),
{reply, ok, State1, 0}
catch
_Class:Reason ->
{stop, {shutdown, Reason}, State}
end;
handle_call(flush, _From, State) ->
try
State1 = internal_flush(State),
{reply, ok, State1, 0}
catch
_Class:Reason ->
{stop, {shutdown, Reason}, State}
end.
handle_message(Deb, {system, From, Req}, State = #wstate{reader = Parent}) ->
sys:handle_system_msg(Req, From, Parent, ?MODULE, Deb, State);
handle_message(Deb, Message, State) ->
{Deb, handle_message(Message, State)}.
handle_cast(_Message, State) ->
{noreply, State, 0}.
handle_info(timeout, State) ->
try
State1 = internal_flush(State),
{noreply, State1}
catch
_Class:Reason ->
{stop, {shutdown, Reason}, State}
end;
handle_info(Message, State) ->
try
State1 = handle_message(Message, State),
{noreply, State1, 0}
catch
_Class:Reason ->
{stop, {shutdown, Reason}, State}
end.
handle_message({send_command, MethodRecord}, State) ->
internal_send_command_async(MethodRecord, State);
@ -236,21 +246,6 @@ handle_message({send_command_flow, MethodRecord, Sender}, State) ->
handle_message({send_command_flow, MethodRecord, Content, Sender}, State) ->
credit_flow:ack(Sender),
internal_send_command_async(MethodRecord, Content, State);
handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) ->
State1 = internal_flush(
internal_send_command_async(MethodRecord, State)),
gen_server:reply(From, ok),
State1;
handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}},
State) ->
State1 = internal_flush(
internal_send_command_async(MethodRecord, Content, State)),
gen_server:reply(From, ok),
State1;
handle_message({'$gen_call', From, flush}, State) ->
State1 = internal_flush(State),
gen_server:reply(From, ok),
State1;
handle_message({send_command_and_notify, QPid, ChPid, MethodRecord}, State) ->
State1 = internal_send_command_async(MethodRecord, State),
rabbit_amqqueue_common:notify_sent(QPid, ChPid),
@ -277,6 +272,14 @@ handle_message({ok, _Ref} = Msg, State) ->
handle_message(Message, _State) ->
exit({writer, message_not_understood, Message}).
terminate(Reason, State) ->
#wstate{reader = ReaderPid, channel = Channel} = State,
ReaderPid ! {channel_exit, Channel, Reason},
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%---------------------------------------------------------------------------
send_command(W, MethodRecord) ->
@ -316,8 +319,7 @@ flush(W) -> call(W, flush).
%%---------------------------------------------------------------------------
call(Pid, Msg) ->
{ok, Res} = gen:call(Pid, '$gen_call', Msg, infinity),
Res.
gen_server:call(Pid, Msg, infinity).
%%---------------------------------------------------------------------------