From 6413d2d7dde2e33d4be18c8625f95bc46b2dc85e Mon Sep 17 00:00:00 2001 From: David Ansari Date: Thu, 31 Jul 2025 13:10:23 +0200 Subject: [PATCH] Fix channel reuse bug This commit fixes the following test flake that occurred in CI: ``` make -C deps/rabbit ct-amqp_dotnet t=cluster_size_1:redelivery ``` After receiving the end frame, the server session proc replies with the end frame. Usually when the test case succeeds, the server connection process receives a DOWN for the session proc and untracks its channel number such that a subsequent begin frame for the same channel number will create a new session proc in the server. In the flake, however, the client receives the end frame and pipelines new begin, attach, and flow frames. These frames are received in the server connection's mailbox before the monitor for the old session proc fires. That's why these new frames are sent to the old session proc, causing the test case to fail. This reveals a bug in the server. This commit fixes this bug in the same way as the AMQP 0.9.1 channel does in https://github.com/rabbitmq/rabbitmq-server/blob/94b4a6aafdfac6b6cae102f50b188e5ea4a32c0e/deps/rabbit/src/rabbit_channel.erl#L1146-L1155 Channel reuse by the client is valid and actually common, e.g. if channel-max is 0. --- deps/rabbit/src/rabbit_amqp_reader.erl | 10 +++++++++- deps/rabbit/src/rabbit_amqp_session.erl | 9 ++++++--- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqp_reader.erl b/deps/rabbit/src/rabbit_amqp_reader.erl index 996cc53310..24546b71df 100644 --- a/deps/rabbit/src/rabbit_amqp_reader.erl +++ b/deps/rabbit/src/rabbit_amqp_reader.erl @@ -17,7 +17,8 @@ -export([init/1, info/2, mainloop/2, - set_credential/2]). + set_credential/2, + notify_session_ending/3]). -export([system_continue/3, system_terminate/4, @@ -79,6 +80,11 @@ set_credential(Pid, Credential) -> Pid ! {set_credential, Credential}, ok. +-spec notify_session_ending(pid(), pid(), non_neg_integer()) -> ok. 
+notify_session_ending(ConnPid, SessionPid, ChannelNum) -> + ConnPid ! {session_ending, SessionPid, ChannelNum}, + ok. + %%-------------------------------------------------------------------------- recvloop(Deb, State = #v1{pending_recv = true}) -> @@ -233,6 +239,8 @@ handle_other({set_credential, Cred}, State) -> handle_other(credential_expired, State) -> Error = error_frame(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, "credential expired", []), handle_exception(State, 0, Error); +handle_other({session_ending, SessionPid, ChannelNum}, State) -> + untrack_channel(ChannelNum, SessionPid, State); handle_other(Other, _State) -> %% internal error -> something worth dying for exit({unexpected_message, Other}). diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 32337bd93f..1bb5d6a414 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -638,10 +638,11 @@ log_error_and_close_session( Error, State = #state{cfg = #cfg{reader_pid = ReaderPid, writer_pid = WriterPid, channel_num = Ch}}) -> - End = #'v1_0.end'{error = Error}, ?LOG_WARNING("Closing session for connection ~p: ~tp", [ReaderPid, Error]), - ok = rabbit_amqp_writer:send_command_sync(WriterPid, Ch, End), + rabbit_amqp_reader:notify_session_ending(ReaderPid, self(), Ch), + ok = rabbit_amqp_writer:send_command_sync( + WriterPid, Ch, #'v1_0.end'{error = Error}), {stop, {shutdown, Error}, State}. %% Batch confirms / rejects to publishers. 
@@ -1178,9 +1179,11 @@ handle_frame(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)}, reply_frames(Reply, State); handle_frame(#'v1_0.end'{}, - State0 = #state{cfg = #cfg{writer_pid = WriterPid, + State0 = #state{cfg = #cfg{reader_pid = ReaderPid, + writer_pid = WriterPid, channel_num = Ch}}) -> State = send_delivery_state_changes(State0), + rabbit_amqp_reader:notify_session_ending(ReaderPid, self(), Ch), ok = try rabbit_amqp_writer:send_command_sync(WriterPid, Ch, #'v1_0.end'{}) catch exit:{Reason, {gen_server, call, _ArgList}} when Reason =:= shutdown orelse