Remove deprecated cowboy return types from web mqtt

- change introduced in cowboy 2.5; see:
8404b1c908
This commit is contained in:
Chunyi Lyu 2022-10-04 15:46:50 +01:00 committed by David Ansari
parent 5710a9474a
commit 7a325a3a99
1 changed files with 28 additions and 24 deletions

View File

@ -64,6 +64,10 @@ takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Handler, HandlerState})
{Handler, HandlerState#state{socket = Sock}}).
%% cowboy_websocket
-spec init(Req, any()) ->
{ok | module(), Req, any()} |
{module(), Req, any(), any()}
when Req::cowboy_req:req().
init(Req, Opts) ->
{PeerAddr, _PeerPort} = maps:get(peer, Req),
SockInfo = maps:get(proxy_header, Req, undefined),
@ -87,6 +91,9 @@ init(Req, Opts) ->
received_connect_frame = false
}, WsOpts}.
-spec websocket_init(State) ->
{cowboy_websocket:commands(), State} |
{cowboy_websocket:commands(), State, hibernate}.
websocket_init(State0 = #state{socket = Sock, peername = PeerAddr}) ->
ok = file_handle_cache:obtain(),
case rabbit_net:connection_string(Sock, inbound) of
@ -102,13 +109,13 @@ websocket_init(State0 = #state{socket = Sock, peername = PeerAddr}) ->
fun send_reply/2,
PeerAddr),
process_flag(trap_exit, true),
{ok,
{[],
rabbit_event:init_stats_timer(
State#state{proc_state = ProcessorState},
#state.stats_timer),
hibernate};
_ ->
{stop, State0}
{error, Reason} ->
{[{shutdown_reason, Reason}], State0}
end.
-spec close_connection(pid(), string()) -> 'ok'.
@ -118,15 +125,18 @@ close_connection(Pid, Reason) ->
sys:terminate(Pid, Reason),
ok.
-spec websocket_handle(ping | pong | {text | binary | ping | pong, binary()}, State) ->
{cowboy_websocket:commands(), State} |
{cowboy_websocket:commands(), State, hibernate}.
websocket_handle({binary, Data}, State) ->
handle_data(Data, State);
%% Silently ignore ping and pong frames as Cowboy will automatically reply to ping frames.
websocket_handle({Ping, _}, State)
when Ping =:= ping orelse Ping =:= pong ->
{ok, State, hibernate};
{[], State, hibernate};
websocket_handle(Ping, State)
when Ping =:= ping orelse Ping =:= pong ->
{ok, State, hibernate};
{[], State, hibernate};
%% Log any other unexpected frames.
websocket_handle(Frame, State) ->
rabbit_log_connection:info("Web MQTT: unexpected WebSocket frame ~tp",
@ -134,19 +144,11 @@ websocket_handle(Frame, State) ->
%%TODO close connection instead?
%%"MQTT Control Packets MUST be sent in WebSocket binary data frames.
%% If any other type of data frame is received the recipient MUST close the Network Connection"
{ok, State, hibernate}.
%% `rabbit_mqtt_processor:amqp_callback/2` doesn't actually return
%% {'error', _, _}, so this small function is a place to silence
%% unmatched warning. This allows to keep currently-unused
%% error-handling code.
-spec callback_reply(#state{}, {'ok', any()} | {'error', any(), any()}) -> {'ok', #state{}, 'hibernate'}.
-dialyzer({no_match, callback_reply/2}).
callback_reply(State, {ok, ProcState}) ->
{ok, State #state { proc_state = ProcState }, hibernate};
callback_reply(State, {error, _Reason, _ProcState}) ->
stop(State).
{[], State, hibernate}.
-spec websocket_info(any(), State) ->
{cowboy_websocket:commands(), State} |
{cowboy_websocket:commands(), State, hibernate}.
websocket_info({conserve_resources, Conserve}, State) ->
NewState = State#state{conserve_resources = Conserve},
handle_credits(control_throttle(NewState));
@ -155,14 +157,14 @@ websocket_info({bump_credit, Msg}, State) ->
handle_credits(control_throttle(State));
%%TODO return hibernate?
websocket_info({reply, Data}, State) ->
{reply, {binary, Data}, State, hibernate};
{[{binary, Data}], State, hibernate};
websocket_info({'EXIT', _, _}, State) ->
stop(State);
websocket_info({'$gen_cast', QueueEvent = {queue_event, _, _}},
State = #state{proc_state = PState0}) ->
case rabbit_mqtt_processor:handle_queue_event(QueueEvent, PState0) of
{ok, PState} ->
{ok, State#state{proc_state = PState}, hibernate};
{[], State#state{proc_state = PState}, hibernate};
{error, Reason, PState} ->
rabbit_log_connection:error("Web MQTT connection ~p failed to handle queue event: ~p",
[State#state.conn_name, Reason]),
@ -182,7 +184,7 @@ websocket_info({keepalive, Req}, State = #state{keepalive = KState0,
conn_name = ConnName}) ->
case rabbit_mqtt_keepalive:handle(Req, KState0) of
{ok, KState} ->
{ok, State#state{keepalive = KState}, hibernate};
{[], State#state{keepalive = KState}, hibernate};
{error, timeout} ->
rabbit_log_connection:error("keepalive timeout in Web MQTT connection ~p",
[ConnName]),
@ -193,15 +195,16 @@ websocket_info({keepalive, Req}, State = #state{keepalive = KState0,
stop(State)
end;
websocket_info(emit_stats, State) ->
{ok, emit_stats(State), hibernate};
{[], emit_stats(State), hibernate};
websocket_info({ra_event, _From, Evt},
#state{proc_state = PState0} = State) ->
PState = rabbit_mqtt_processor:handle_ra_event(Evt, PState0),
{ok, State#state{proc_state = PState}, hibernate};
{[], State#state{proc_state = PState}, hibernate};
websocket_info(Msg, State) ->
rabbit_log_connection:warning("Web MQTT: unexpected message ~p", [Msg]),
{ok, State, hibernate}.
{[], State, hibernate}.
-spec terminate(any(), cowboy_req:req(), any()) -> ok.
terminate(_Reason, _Request,
#state{conn_name = ConnName,
proc_state = PState,
@ -283,7 +286,7 @@ handle_credits(State0) ->
State = #state{state = running} ->
{[{active, true}], State};
State ->
{ok, State}
{[], State}
end.
control_throttle(State = #state{state = CS,
@ -306,6 +309,7 @@ send_reply(Frame, PState) ->
ensure_stats_timer(State) ->
rabbit_event:ensure_stats_timer(State, #state.stats_timer, emit_stats).
%% TODO if #state.stats_timer is undefined, rabbit_event:if_enabled crashes
maybe_emit_stats(State) ->
rabbit_event:if_enabled(State, #state.stats_timer,
fun() -> emit_stats(State) end).