Catch abrupt TCP closure when processing `queue_event`

Reported here:
https://groups.google.com/g/rabbitmq-users/c/4AOwZrQyekI

(cherry picked from commit 994008aa7f)
(cherry picked from commit 4c6191eb0c)
This commit is contained in:
Luke Bakken 2024-07-11 09:17:31 -07:00 committed by Mergify
parent 0e8ea17d20
commit 6fb22af96d
1 changed files with 22 additions and 18 deletions

View File

@ -140,11 +140,15 @@ handle_cast({close_connection, Reason},
handle_cast(QueueEvent = {queue_event, _, _},
State = #state{proc_state = PState0}) ->
try
case rabbit_mqtt_processor:handle_queue_event(QueueEvent, PState0) of
{ok, PState} ->
maybe_process_deferred_recv(control_throttle(pstate(State, PState)));
{error, Reason, PState} ->
{stop, Reason, pstate(State, PState)}
{error, Reason0, PState} ->
{stop, Reason0, pstate(State, PState)}
end
catch throw:{send_failed, Reason1} ->
network_error(Reason1, State)
end;
handle_cast({force_event_refresh, Ref}, State0) ->
@ -328,17 +332,17 @@ process_received_bytes(Bytes, State = #state{socket = Socket,
{ok, Packet, Rest, ParseState1} ->
case ProcState of
connect_packet_unprocessed ->
Send = fun(Data) ->
SendFun = fun(Data) ->
case rabbit_net:send(Socket, Data) of
ok ->
ok;
{error, Reason} ->
?LOG_ERROR("writing to MQTT socket ~p failed: ~p",
[Socket, Reason]),
exit({send_failed, Reason})
throw({send_failed, Reason})
end
end,
try rabbit_mqtt_processor:init(Packet, Socket, ConnName, Send) of
try rabbit_mqtt_processor:init(Packet, Socket, ConnName, SendFun) of
{ok, ProcState1} ->
?LOG_INFO("Accepted MQTT connection ~ts for client ID ~ts",
[ConnName, rabbit_mqtt_processor:info(client_id, ProcState1)]),
@ -354,7 +358,7 @@ process_received_bytes(Bytes, State = #state{socket = Socket,
?LOG_ERROR("Rejected MQTT connection ~ts with Connect Reason Code ~p",
[ConnName, ConnectReasonCode]),
{stop, shutdown, {_SendWill = false, State}}
catch exit:{send_failed, Reason} ->
catch throw:{send_failed, Reason} ->
network_error(Reason, State)
end;
_ ->
@ -375,7 +379,7 @@ process_received_bytes(Bytes, State = #state{socket = Socket,
{stop, {shutdown, Reason}, pstate(State, ProcState1)};
{stop, {disconnect, {client_initiated, SendWill}}, ProcState1} ->
{stop, normal, {SendWill, pstate(State, ProcState1)}}
catch exit:{send_failed, Reason} ->
catch throw:{send_failed, Reason} ->
network_error(Reason, State)
end
end;