Merge pull request #11683 from rabbitmq/mergify/bp/v3.13.x/pr-11682
Catch abrupt TCP closure when processing `queue_event` (backport #11676) (backport #11682)
This commit is contained in:
commit
c10364af6e
|
|
@ -140,11 +140,15 @@ handle_cast({close_connection, Reason},
|
|||
|
||||
handle_cast(QueueEvent = {queue_event, _, _},
|
||||
State = #state{proc_state = PState0}) ->
|
||||
case rabbit_mqtt_processor:handle_queue_event(QueueEvent, PState0) of
|
||||
{ok, PState} ->
|
||||
maybe_process_deferred_recv(control_throttle(pstate(State, PState)));
|
||||
{error, Reason, PState} ->
|
||||
{stop, Reason, pstate(State, PState)}
|
||||
try
|
||||
case rabbit_mqtt_processor:handle_queue_event(QueueEvent, PState0) of
|
||||
{ok, PState} ->
|
||||
maybe_process_deferred_recv(control_throttle(pstate(State, PState)));
|
||||
{error, Reason0, PState} ->
|
||||
{stop, Reason0, pstate(State, PState)}
|
||||
end
|
||||
catch throw:{send_failed, Reason1} ->
|
||||
network_error(Reason1, State)
|
||||
end;
|
||||
|
||||
handle_cast({force_event_refresh, Ref}, State0) ->
|
||||
|
|
@ -328,17 +332,17 @@ process_received_bytes(Bytes, State = #state{socket = Socket,
|
|||
{ok, Packet, Rest, ParseState1} ->
|
||||
case ProcState of
|
||||
connect_packet_unprocessed ->
|
||||
Send = fun(Data) ->
|
||||
case rabbit_net:send(Socket, Data) of
|
||||
ok ->
|
||||
ok;
|
||||
{error, Reason} ->
|
||||
?LOG_ERROR("writing to MQTT socket ~p failed: ~p",
|
||||
[Socket, Reason]),
|
||||
exit({send_failed, Reason})
|
||||
end
|
||||
end,
|
||||
try rabbit_mqtt_processor:init(Packet, Socket, ConnName, Send) of
|
||||
SendFun = fun(Data) ->
|
||||
case rabbit_net:send(Socket, Data) of
|
||||
ok ->
|
||||
ok;
|
||||
{error, Reason} ->
|
||||
?LOG_ERROR("writing to MQTT socket ~p failed: ~p",
|
||||
[Socket, Reason]),
|
||||
throw({send_failed, Reason})
|
||||
end
|
||||
end,
|
||||
try rabbit_mqtt_processor:init(Packet, Socket, ConnName, SendFun) of
|
||||
{ok, ProcState1} ->
|
||||
?LOG_INFO("Accepted MQTT connection ~ts for client ID ~ts",
|
||||
[ConnName, rabbit_mqtt_processor:info(client_id, ProcState1)]),
|
||||
|
|
@ -354,7 +358,7 @@ process_received_bytes(Bytes, State = #state{socket = Socket,
|
|||
?LOG_ERROR("Rejected MQTT connection ~ts with Connect Reason Code ~p",
|
||||
[ConnName, ConnectReasonCode]),
|
||||
{stop, shutdown, {_SendWill = false, State}}
|
||||
catch exit:{send_failed, Reason} ->
|
||||
catch throw:{send_failed, Reason} ->
|
||||
network_error(Reason, State)
|
||||
end;
|
||||
_ ->
|
||||
|
|
@ -375,7 +379,7 @@ process_received_bytes(Bytes, State = #state{socket = Socket,
|
|||
{stop, {shutdown, Reason}, pstate(State, ProcState1)};
|
||||
{stop, {disconnect, {client_initiated, SendWill}}, ProcState1} ->
|
||||
{stop, normal, {SendWill, pstate(State, ProcState1)}}
|
||||
catch exit:{send_failed, Reason} ->
|
||||
catch throw:{send_failed, Reason} ->
|
||||
network_error(Reason, State)
|
||||
end
|
||||
end;
|
||||
|
|
|
|||
Loading…
Reference in New Issue