Stream reader: close osiris logs and sockets in terminate

Instead of injecting it into varios places inside the code.

When the osiris log is closed it will decrement the global "readers"
counter which is why it is much safer to do this in terminate.
This commit is contained in:
Karl Nilsson 2021-09-13 11:23:35 +01:00
parent 239e136480
commit 135575b3ff
1 changed files with 7 additions and 10 deletions

View File

@ -173,11 +173,15 @@
callback_mode() ->
[state_functions, state_enter].
terminate(Reason, State, StatemData) ->
terminate(Reason, State,
#statem_data{transport = Transport,
connection = #stream_connection{socket = Socket},
connection_state = ConnectionState} = StatemData) ->
close(Transport, Socket, ConnectionState),
rabbit_networking:unregister_non_amqp_connection(self()),
notify_connection_closed(StatemData),
rabbit_log:debug("~p terminating in state '~s' with reason '~p'",
[?MODULE, State, Reason]).
rabbit_log:debug("~s terminating in state '~s' with reason '~W'",
[?MODULE, State, Reason, 10]).
start_link(KeepaliveSup, Transport, Ref, Opts) ->
{ok,
@ -713,7 +717,6 @@ open(info, {OK, S, Data},
#stream_connection{connection_step = Step} = Connection1,
case Step of
closing ->
close(Transport, S, State),
stop;
close_sent ->
rabbit_log_connection:debug("Transitioned to close_sent"),
@ -808,7 +811,6 @@ open(info, heartbeat_send,
rabbit_log_connection:info("Heartbeat send error ~p, closing connection",
[Unexpected]),
_C1 = demonitor_all_streams(Connection),
close(Transport, S, State),
stop
end;
open(info, heartbeat_timeout,
@ -817,7 +819,6 @@ open(info, heartbeat_timeout,
connection_state = State}) ->
rabbit_log_connection:debug("Heartbeat timeout, closing connection"),
_C1 = demonitor_all_streams(Connection),
close(Transport, S, State),
stop;
open(info, {infos, From},
#statem_data{connection =
@ -857,7 +858,6 @@ open({call, From}, {shutdown, Explanation},
rabbit_log_connection:info("Forcing stream connection ~p closing: ~p",
[self(), Explanation]),
demonitor_all_streams(Connection),
close(Transport, S, State),
{stop_and_reply, normal, {reply, From, ok}};
open(cast,
{queue_event, _, {osiris_written, _, undefined, CorrelationList}},
@ -1060,7 +1060,6 @@ close_sent(state_timeout, close,
connection_state = State}) ->
rabbit_log_connection:warning("Closing connection because of timeout in state '~s' likely due to lack of client action.",
[?FUNCTION_NAME]),
close(Transport, Socket, State),
stop;
close_sent(info, {tcp, S, Data},
#statem_data{transport = Transport,
@ -1075,7 +1074,6 @@ close_sent(info, {tcp, S, Data},
[?FUNCTION_NAME, Step]),
case Step of
closing_done ->
close(Transport, S, State1),
stop;
_ ->
Transport:setopts(S, [{active, once}]),
@ -1091,7 +1089,6 @@ close_sent(info, {tcp_error, S, Reason},
#statem_data{transport = Transport, connection_state = State}) ->
rabbit_log_connection:error("Stream protocol connection socket error: ~p [~w] [~w]",
[Reason, S, self()]),
close(Transport, S, State),
stop;
close_sent(info, {resource_alarm, IsThereAlarm},
StatemData = #statem_data{connection = Connection}) ->