diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl index c24898288d..a18781bc2c 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl @@ -140,11 +140,15 @@ handle_cast({close_connection, Reason}, handle_cast(QueueEvent = {queue_event, _, _}, State = #state{proc_state = PState0}) -> - case rabbit_mqtt_processor:handle_queue_event(QueueEvent, PState0) of - {ok, PState} -> - maybe_process_deferred_recv(control_throttle(pstate(State, PState))); - {error, Reason, PState} -> - {stop, Reason, pstate(State, PState)} + try + case rabbit_mqtt_processor:handle_queue_event(QueueEvent, PState0) of + {ok, PState} -> + maybe_process_deferred_recv(control_throttle(pstate(State, PState))); + {error, Reason0, PState} -> + {stop, Reason0, pstate(State, PState)} + end + catch throw:{send_failed, Reason1} -> + network_error(Reason1, State) end; handle_cast({force_event_refresh, Ref}, State0) -> @@ -328,17 +332,17 @@ process_received_bytes(Bytes, State = #state{socket = Socket, {ok, Packet, Rest, ParseState1} -> case ProcState of connect_packet_unprocessed -> - Send = fun(Data) -> - case rabbit_net:send(Socket, Data) of - ok -> - ok; - {error, Reason} -> - ?LOG_ERROR("writing to MQTT socket ~p failed: ~p", - [Socket, Reason]), - exit({send_failed, Reason}) - end - end, - try rabbit_mqtt_processor:init(Packet, Socket, ConnName, Send) of + SendFun = fun(Data) -> + case rabbit_net:send(Socket, Data) of + ok -> + ok; + {error, Reason} -> + ?LOG_ERROR("writing to MQTT socket ~p failed: ~p", + [Socket, Reason]), + throw({send_failed, Reason}) + end + end, + try rabbit_mqtt_processor:init(Packet, Socket, ConnName, SendFun) of {ok, ProcState1} -> ?LOG_INFO("Accepted MQTT connection ~ts for client ID ~ts", [ConnName, rabbit_mqtt_processor:info(client_id, ProcState1)]), @@ -354,7 +358,7 @@ process_received_bytes(Bytes, State = #state{socket = Socket, ?LOG_ERROR("Rejected MQTT connection ~ts with Connect Reason Code ~p", [ConnName, ConnectReasonCode]), {stop, shutdown, {_SendWill = false, State}} - catch exit:{send_failed, Reason} -> + catch throw:{send_failed, Reason} -> network_error(Reason, State) end; _ -> @@ -375,7 +379,7 @@ process_received_bytes(Bytes, State = #state{socket = Socket, {stop, {shutdown, Reason}, pstate(State, ProcState1)}; {stop, {disconnect, {client_initiated, SendWill}}, ProcState1} -> {stop, normal, {SendWill, pstate(State, ProcState1)}} - catch exit:{send_failed, Reason} -> + catch throw:{send_failed, Reason} -> network_error(Reason, State) end end;