Merge pull request #14317 from rabbitmq/channel-reuse
Test (make) / Build and Xref (1.18, 26) (push) Waiting to run
Details
Test (make) / Build and Xref (1.18, 27) (push) Waiting to run
Details
Test (make) / Build and Xref (1.18, 28) (push) Waiting to run
Details
Test (make) / Test (1.18, 28, khepri) (push) Waiting to run
Details
Test (make) / Test (1.18, 28, mnesia) (push) Waiting to run
Details
Test (make) / Test mixed clusters (1.18, 28, khepri) (push) Waiting to run
Details
Test (make) / Test mixed clusters (1.18, 28, mnesia) (push) Waiting to run
Details
Test (make) / Type check (1.18, 28) (push) Waiting to run
Details
Trigger a 4.2.x alpha release build / trigger_alpha_build (push) Has been cancelled
Details
Test (make) / Build and Xref (1.18, 26) (push) Waiting to run
Details
Test (make) / Build and Xref (1.18, 27) (push) Waiting to run
Details
Test (make) / Build and Xref (1.18, 28) (push) Waiting to run
Details
Test (make) / Test (1.18, 28, khepri) (push) Waiting to run
Details
Test (make) / Test (1.18, 28, mnesia) (push) Waiting to run
Details
Test (make) / Test mixed clusters (1.18, 28, khepri) (push) Waiting to run
Details
Test (make) / Test mixed clusters (1.18, 28, mnesia) (push) Waiting to run
Details
Test (make) / Type check (1.18, 28) (push) Waiting to run
Details
Trigger a 4.2.x alpha release build / trigger_alpha_build (push) Has been cancelled
Details
Fix channel number reuse
This commit is contained in:
commit
aba63b0179
|
@ -17,7 +17,8 @@
|
||||||
-export([init/1,
|
-export([init/1,
|
||||||
info/2,
|
info/2,
|
||||||
mainloop/2,
|
mainloop/2,
|
||||||
set_credential/2]).
|
set_credential/2,
|
||||||
|
notify_session_ending/3]).
|
||||||
|
|
||||||
-export([system_continue/3,
|
-export([system_continue/3,
|
||||||
system_terminate/4,
|
system_terminate/4,
|
||||||
|
@ -79,6 +80,11 @@ set_credential(Pid, Credential) ->
|
||||||
Pid ! {set_credential, Credential},
|
Pid ! {set_credential, Credential},
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
-spec notify_session_ending(pid(), pid(), non_neg_integer()) -> ok.
|
||||||
|
notify_session_ending(ConnPid, SessionPid, ChannelNum) ->
|
||||||
|
ConnPid ! {session_ending, SessionPid, ChannelNum},
|
||||||
|
ok.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------------
|
%%--------------------------------------------------------------------------
|
||||||
|
|
||||||
recvloop(Deb, State = #v1{pending_recv = true}) ->
|
recvloop(Deb, State = #v1{pending_recv = true}) ->
|
||||||
|
@ -233,6 +239,8 @@ handle_other({set_credential, Cred}, State) ->
|
||||||
handle_other(credential_expired, State) ->
|
handle_other(credential_expired, State) ->
|
||||||
Error = error_frame(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, "credential expired", []),
|
Error = error_frame(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, "credential expired", []),
|
||||||
handle_exception(State, 0, Error);
|
handle_exception(State, 0, Error);
|
||||||
|
handle_other({session_ending, SessionPid, ChannelNum}, State) ->
|
||||||
|
untrack_channel(ChannelNum, SessionPid, State);
|
||||||
handle_other(Other, _State) ->
|
handle_other(Other, _State) ->
|
||||||
%% internal error -> something worth dying for
|
%% internal error -> something worth dying for
|
||||||
exit({unexpected_message, Other}).
|
exit({unexpected_message, Other}).
|
||||||
|
|
|
@ -638,10 +638,11 @@ log_error_and_close_session(
|
||||||
Error, State = #state{cfg = #cfg{reader_pid = ReaderPid,
|
Error, State = #state{cfg = #cfg{reader_pid = ReaderPid,
|
||||||
writer_pid = WriterPid,
|
writer_pid = WriterPid,
|
||||||
channel_num = Ch}}) ->
|
channel_num = Ch}}) ->
|
||||||
End = #'v1_0.end'{error = Error},
|
|
||||||
?LOG_WARNING("Closing session for connection ~p: ~tp",
|
?LOG_WARNING("Closing session for connection ~p: ~tp",
|
||||||
[ReaderPid, Error]),
|
[ReaderPid, Error]),
|
||||||
ok = rabbit_amqp_writer:send_command_sync(WriterPid, Ch, End),
|
rabbit_amqp_reader:notify_session_ending(ReaderPid, self(), Ch),
|
||||||
|
ok = rabbit_amqp_writer:send_command_sync(
|
||||||
|
WriterPid, Ch, #'v1_0.end'{error = Error}),
|
||||||
{stop, {shutdown, Error}, State}.
|
{stop, {shutdown, Error}, State}.
|
||||||
|
|
||||||
%% Batch confirms / rejects to publishers.
|
%% Batch confirms / rejects to publishers.
|
||||||
|
@ -1178,9 +1179,11 @@ handle_frame(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)},
|
||||||
reply_frames(Reply, State);
|
reply_frames(Reply, State);
|
||||||
|
|
||||||
handle_frame(#'v1_0.end'{},
|
handle_frame(#'v1_0.end'{},
|
||||||
State0 = #state{cfg = #cfg{writer_pid = WriterPid,
|
State0 = #state{cfg = #cfg{reader_pid = ReaderPid,
|
||||||
|
writer_pid = WriterPid,
|
||||||
channel_num = Ch}}) ->
|
channel_num = Ch}}) ->
|
||||||
State = send_delivery_state_changes(State0),
|
State = send_delivery_state_changes(State0),
|
||||||
|
rabbit_amqp_reader:notify_session_ending(ReaderPid, self(), Ch),
|
||||||
ok = try rabbit_amqp_writer:send_command_sync(WriterPid, Ch, #'v1_0.end'{})
|
ok = try rabbit_amqp_writer:send_command_sync(WriterPid, Ch, #'v1_0.end'{})
|
||||||
catch exit:{Reason, {gen_server, call, _ArgList}}
|
catch exit:{Reason, {gen_server, call, _ArgList}}
|
||||||
when Reason =:= shutdown orelse
|
when Reason =:= shutdown orelse
|
||||||
|
|
Loading…
Reference in New Issue