Orderly shutdown of sessions

Make AMQP 1.0 connection shut down its sessions before sending the
close frame to the client similar to how the AMQP 0.9.1 connection
shuts down its channels before closing the connection.

This commit avoids concurrent deletion of exclusive queues by the session process
and the classic queue process.
This commit should also fix https://github.com/rabbitmq/rabbitmq-server/issues/2596
This commit is contained in:
David Ansari 2025-02-09 13:28:41 +01:00
parent 2ab890f344
commit 06ec8f0342
3 changed files with 39 additions and 3 deletions

View File

@ -3,6 +3,8 @@
-define(CLOSING_TIMEOUT, 30_000).
-define(SILENT_CLOSE_DELAY, 3_000).
-define(SHUTDOWN_SESSIONS_TIMEOUT, 10_000).
%% Allow for potentially large sets of tokens during the SASL exchange.
%% https://docs.oasis-open.org/amqp/amqp-cbs/v1.0/csd01/amqp-cbs-v1.0-csd01.html#_Toc67999915
-define(INITIAL_MAX_FRAME_SIZE, 8192).

View File

@ -220,10 +220,17 @@ terminate(_, _) ->
%%--------------------------------------------------------------------------
%% error handling / termination
close(Error, State = #v1{connection = #v1_connection{timeout = Timeout}}) ->
close(Error, State0 = #v1{connection = #v1_connection{timeout = Timeout}}) ->
%% Client properties will be emitted in the connection_closed event by rabbit_reader.
ClientProperties = i(client_properties, State),
ClientProperties = i(client_properties, State0),
put(client_properties, ClientProperties),
%% "It is illegal to send any more frames (or bytes of any other kind)
%% after sending a close frame." [2.7.9]
%% Sessions might send frames via the writer proc.
%% Therefore, let's first try to orderly shutdown our sessions.
State = shutdown_sessions(State0),
Time = case Timeout > 0 andalso
Timeout < ?CLOSING_TIMEOUT of
true -> Timeout;
@ -233,6 +240,31 @@ close(Error, State = #v1{connection = #v1_connection{timeout = Timeout}}) ->
ok = send_on_channel0(State, #'v1_0.close'{error = Error}, amqp10_framing),
State#v1{connection_state = closed}.
shutdown_sessions(#v1{tracked_channels = Channels} = State) ->
maps:foreach(fun(_ChannelNum, Pid) ->
gen_server:cast(Pid, shutdown)
end, Channels),
TimerRef = erlang:send_after(?SHUTDOWN_SESSIONS_TIMEOUT,
self(),
shutdown_sessions_timeout),
wait_for_shutdown_sessions(TimerRef, State).
wait_for_shutdown_sessions(TimerRef, #v1{tracked_channels = Channels} = State)
when map_size(Channels) =:= 0 ->
ok = erlang:cancel_timer(TimerRef, [{async, false},
{info, false}]),
State;
wait_for_shutdown_sessions(TimerRef, #v1{tracked_channels = Channels} = State0) ->
receive
{{'DOWN', ChannelNum}, _MRef, process, SessionPid, _Reason} ->
State = untrack_channel(ChannelNum, SessionPid, State0),
wait_for_shutdown_sessions(TimerRef, State);
shutdown_sessions_timeout ->
?LOG_INFO("sessions not shut down after ~b ms: ~p",
[?SHUTDOWN_SESSIONS_TIMEOUT, Channels]),
State0
end.
handle_session_exit(ChannelNum, SessionPid, Reason, State0) ->
State = untrack_channel(ChannelNum, SessionPid, State0),
S = case terminated_normally(Reason) of

View File

@ -602,7 +602,9 @@ handle_cast({reset_authz, User}, #state{cfg = Cfg} = State0) ->
noreply(State)
catch exit:#'v1_0.error'{} = Error ->
log_error_and_close_session(Error, State1)
end.
end;
handle_cast(shutdown, State) ->
{stop, normal, State}.
log_error_and_close_session(
Error, State = #state{cfg = #cfg{reader_pid = ReaderPid,