Stop on errors

Rather than crashing with case_clause, errors returned in the data
handlers should result in the orderly shutdown of the MQTT connection.

Fixes #64
This commit is contained in:
Luke Bakken 2020-06-15 09:41:58 -07:00
parent 5714f7e4a9
commit ebeed7bb10
1 changed files with 40 additions and 22 deletions

View File

@ -113,28 +113,28 @@ websocket_info({#'basic.deliver'{}, #amqp_msg{}, _DeliveryCtx} = Delivery,
{ok, ProcState} ->
{ok, State #state { proc_state = ProcState }, hibernate};
{error, _, _} ->
{stop, State}
stop(State)
end;
websocket_info(#'basic.ack'{} = Ack, State = #state{ proc_state = ProcState0 }) ->
case rabbit_mqtt_processor:amqp_callback(Ack, ProcState0) of
{ok, ProcState} ->
{ok, State #state { proc_state = ProcState }, hibernate};
{error, _, _} ->
{stop, State}
stop(State)
end;
websocket_info(#'basic.consume_ok'{}, State) ->
{ok, State, hibernate};
websocket_info(#'basic.cancel'{}, State) ->
{stop, State};
stop(State);
websocket_info({reply, Data}, State) ->
{reply, {binary, Data}, State, hibernate};
websocket_info({'EXIT', _, _}, State) ->
{stop, State};
stop(State);
websocket_info({'$gen_cast', duplicate_id}, State = #state{ proc_state = ProcState,
conn_name = ConnName }) ->
rabbit_log_connection:warning("Web MQTT disconnecting duplicate client id ~p (~p)~n",
[rabbit_mqtt_processor:info(client_id, ProcState), ConnName]),
{stop, State};
stop(State);
websocket_info({start_keepalives, Keepalive},
State = #state{ socket = Sock, keepalive_sup = KeepaliveSup }) ->
%% Only the client has the responsibility for sending keepalives
@ -146,9 +146,11 @@ websocket_info({start_keepalives, Keepalive},
{ok, State #state { keepalive = Heartbeater }, hibernate};
websocket_info(keepalive_timeout, State = #state{conn_name = ConnStr}) ->
rabbit_log_connection:error("closing Web MQTT connection ~p (keepalive timeout)~n", [ConnStr]),
{stop, State};
stop(State);
websocket_info(emit_stats, State) ->
{ok, emit_stats(State), hibernate};
websocket_info({ra_event, _, _}, State) ->
{ok, State, hibernate};
websocket_info(Msg, State) ->
rabbit_log_connection:info("Web MQTT: unexpected message ~p~n",
[Msg]),
@ -156,20 +158,18 @@ websocket_info(Msg, State) ->
terminate(_, _, #state{ proc_state = undefined }) ->
ok;
terminate(_, _, State = #state{ proc_state = ProcState,
conn_name = ConnName }) ->
maybe_emit_stats(State),
rabbit_log_connection:info("closing Web MQTT connection ~p (~s)~n", [self(), ConnName]),
rabbit_mqtt_processor:send_will(ProcState),
rabbit_mqtt_processor:close_connection(ProcState),
terminate(_, _, State) ->
stop_rabbit_mqtt_processor(State),
ok.
%% Internal.
handle_data(Data, State0) ->
handle_data(Data, State0 = #state{conn_name = ConnStr}) ->
case handle_data1(Data, State0) of
{ok, State1 = #state{state = blocked}, hibernate} ->
{[{active, false}], State1, hibernate};
{error, Error} ->
stop_with_framing_error(State0, Error, ConnStr);
Other ->
Other
end.
@ -195,20 +195,38 @@ handle_data1(Data, State = #state{ parse_state = ParseState,
{error, Reason, _} ->
rabbit_log_connection:info("MQTT protocol error ~p for connection ~p~n",
[Reason, ConnStr]),
{stop, State};
stop(State, 1002, Reason);
{error, Error} ->
rabbit_log_connection:error("MQTT detected framing error '~p' for connection ~p~n",
[Error, ConnStr]),
{stop, State};
stop_with_framing_error(State, Error, ConnStr);
{stop, _} ->
{stop, State}
stop(State)
end;
{error, Error} ->
rabbit_log_connection:error("MQTT detected framing error '~p' for connection ~p~n",
[ConnStr, Error]),
{stop, State}
Other ->
Other
end.
stop(State) ->
stop(State, 1000, "MQTT died").
stop(State, CloseCode, Error0) ->
stop_rabbit_mqtt_processor(State),
Error1 = rabbit_data_coercion:to_binary(Error0),
{[{close, CloseCode, Error1}], State}.
stop_with_framing_error(State, Error0, ConnStr) ->
Error1 = rabbit_misc:format("~p", [Error0]),
rabbit_log_connection:error("MQTT detected framing error '~s' for connection ~p~n",
[Error1, ConnStr]),
stop(State, 1007, Error1).
stop_rabbit_mqtt_processor(State = #state{state = running,
proc_state = ProcState,
conn_name = ConnName}) ->
maybe_emit_stats(State),
rabbit_log_connection:info("closing Web MQTT connection ~p (~s)~n", [self(), ConnName]),
rabbit_mqtt_processor:send_will(ProcState),
rabbit_mqtt_processor:close_connection(ProcState).
handle_credits(State0) ->
case control_throttle(State0) of
State = #state{state = running} ->