Fix channel reuse bug

This commit fixes the following test flake that occurred in CI:
```
make -C deps/rabbit ct-amqp_dotnet t=cluster_size_1:redelivery
```

After receiving the end frame, the server session proc replies with the end frame.

Usually when the test case succeeds, the server connection process receives
a DOWN for the session proc and untracks its channel number such that a
subsequent begin frame for the same channel number will create a new session
proc in the server.

In the flake however, the client receives the end, and pipelines new begin,
attach, and flow frames. These frames are received in the server connection's
mailbox before the monitor for the old session proc fires. That's why these
new frames are sent to the old session proc causing the test case to
fail.

This reveals a bug in the server.
This commit fixes this bug similarly as done in the AMQP 0.9.1 channel in
94b4a6aafd/deps/rabbit/src/rabbit_channel.erl (L1146-L1155)

Channel reuse by the client is valid and actually common, e.g. if channel-max
is 0.
This commit is contained in:
David Ansari 2025-07-31 13:10:23 +02:00
parent 94b4a6aafd
commit 6413d2d7dd
2 changed files with 15 additions and 4 deletions

View File

@ -17,7 +17,8 @@
-export([init/1,
info/2,
mainloop/2,
set_credential/2]).
set_credential/2,
notify_session_ending/3]).
-export([system_continue/3,
system_terminate/4,
@ -79,6 +80,11 @@ set_credential(Pid, Credential) ->
Pid ! {set_credential, Credential},
ok.
-spec notify_session_ending(pid(), pid(), non_neg_integer()) -> ok.
notify_session_ending(ConnPid, SessionPid, ChannelNum) ->
ConnPid ! {session_ending, SessionPid, ChannelNum},
ok.
%%--------------------------------------------------------------------------
recvloop(Deb, State = #v1{pending_recv = true}) ->
@ -233,6 +239,8 @@ handle_other({set_credential, Cred}, State) ->
handle_other(credential_expired, State) ->
Error = error_frame(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, "credential expired", []),
handle_exception(State, 0, Error);
handle_other({session_ending, SessionPid, ChannelNum}, State) ->
untrack_channel(ChannelNum, SessionPid, State);
handle_other(Other, _State) ->
%% internal error -> something worth dying for
exit({unexpected_message, Other}).

View File

@ -638,10 +638,11 @@ log_error_and_close_session(
Error, State = #state{cfg = #cfg{reader_pid = ReaderPid,
writer_pid = WriterPid,
channel_num = Ch}}) ->
End = #'v1_0.end'{error = Error},
?LOG_WARNING("Closing session for connection ~p: ~tp",
[ReaderPid, Error]),
ok = rabbit_amqp_writer:send_command_sync(WriterPid, Ch, End),
rabbit_amqp_reader:notify_session_ending(ReaderPid, self(), Ch),
ok = rabbit_amqp_writer:send_command_sync(
WriterPid, Ch, #'v1_0.end'{error = Error}),
{stop, {shutdown, Error}, State}.
%% Batch confirms / rejects to publishers.
@ -1178,9 +1179,11 @@ handle_frame(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)},
reply_frames(Reply, State);
handle_frame(#'v1_0.end'{},
State0 = #state{cfg = #cfg{writer_pid = WriterPid,
State0 = #state{cfg = #cfg{reader_pid = ReaderPid,
writer_pid = WriterPid,
channel_num = Ch}}) ->
State = send_delivery_state_changes(State0),
rabbit_amqp_reader:notify_session_ending(ReaderPid, self(), Ch),
ok = try rabbit_amqp_writer:send_command_sync(WriterPid, Ch, #'v1_0.end'{})
catch exit:{Reason, {gen_server, call, _ArgList}}
when Reason =:= shutdown orelse