Support MQTT Keepalive in WebMQTT
Share the same MQTT keepalive code between rabbit_mqtt_reader and rabbit_web_mqtt_handler. Add MQTT keepalive test in both plugins rabbitmq_mqtt and rabbitmq_web_mqtt.
This commit is contained in:
parent
fc33719d77
commit
5710a9474a
|
|
@ -10,23 +10,19 @@
|
|||
-include("rabbit_mqtt_types.hrl").
|
||||
|
||||
%% reader state
|
||||
-record(state, {socket,
|
||||
proxy_socket,
|
||||
conn_name,
|
||||
await_recv,
|
||||
deferred_recv,
|
||||
received_connect_frame,
|
||||
connection_state,
|
||||
conserve,
|
||||
parse_state,
|
||||
proc_state,
|
||||
stats_timer,
|
||||
keepalive}).
|
||||
|
||||
-record(keepalive, {timer :: reference(),
|
||||
interval_ms :: pos_integer(),
|
||||
recv_oct :: non_neg_integer(),
|
||||
received :: boolean()}).
|
||||
-record(state,
|
||||
{socket,
|
||||
proxy_socket,
|
||||
conn_name,
|
||||
await_recv,
|
||||
deferred_recv,
|
||||
received_connect_frame,
|
||||
connection_state,
|
||||
conserve,
|
||||
parse_state,
|
||||
proc_state,
|
||||
stats_timer,
|
||||
keepalive :: rabbit_mqtt_keepalive:state()}).
|
||||
|
||||
%% processor state
|
||||
-record(proc_state,
|
||||
|
|
|
|||
|
|
@ -0,0 +1,92 @@
|
|||
-module(rabbit_mqtt_keepalive).
|
||||
|
||||
-export([start/2,
|
||||
handle/2,
|
||||
start_timer/1,
|
||||
cancel_timer/1]).
|
||||
|
||||
-export_type([state/0]).
|
||||
|
||||
-record(state, {timer :: reference(),
|
||||
interval_ms :: pos_integer(),
|
||||
socket :: inet:socket(),
|
||||
recv_oct :: non_neg_integer(),
|
||||
received :: boolean()}).
|
||||
|
||||
-opaque(state() :: undefined | #state{}).
|
||||
|
||||
-spec start(IntervalSeconds :: non_neg_integer(), inet:socket()) -> ok.
|
||||
start(0, _Sock) ->
|
||||
ok;
|
||||
start(Seconds, Sock)
|
||||
when is_integer(Seconds) andalso Seconds > 0 ->
|
||||
self() ! {keepalive, {init, Seconds, Sock}},
|
||||
ok.
|
||||
|
||||
-spec handle(Request :: term(), state()) ->
|
||||
{ok, state()} | {error, Reason :: term()}.
|
||||
handle({init, IntervalSecs, Sock}, _State) ->
|
||||
case rabbit_net:getstat(Sock, [recv_oct]) of
|
||||
{ok, [{recv_oct, RecvOct}]} ->
|
||||
%% "If the Keep Alive value is non-zero and the Server does not receive a Control
|
||||
%% Packet from the Client within one and a half times the Keep Alive time period,
|
||||
%% it MUST disconnect the Network Connection to the Client as if the network had
|
||||
%% failed" [MQTT-3.1.2-24]
|
||||
%%
|
||||
%% We check every (1.5 / 2 = 0.75) * KeepaliveInterval whether we received
|
||||
%% any data from the client. If there was no activity for two consecutive times,
|
||||
%% we close the connection.
|
||||
%% We choose 0.75 (instead of a larger or smaller factor) to have the right balance
|
||||
%% between not checking too often (since it could become expensive when there are
|
||||
%% millions of clients) and not checking too rarely (to detect dead clients promptly).
|
||||
%%
|
||||
%% See https://github.com/emqx/emqx/issues/460
|
||||
%% PING
|
||||
%% | DOWN
|
||||
%% | |<-------Delay Time--------->
|
||||
%% t0---->|----------|----------|----------|---->tn
|
||||
%% | | |
|
||||
%% Ok Retry Timeout
|
||||
IntervalMs = round(0.75 * timer:seconds(IntervalSecs)),
|
||||
State = #state{socket = Sock,
|
||||
interval_ms = IntervalMs,
|
||||
recv_oct = RecvOct,
|
||||
received = true},
|
||||
{ok, start_timer(State)};
|
||||
{error, _} = Err ->
|
||||
Err
|
||||
end;
|
||||
handle(check, State = #state{socket = Sock,
|
||||
recv_oct = SameRecvOct,
|
||||
received = ReceivedPreviously}) ->
|
||||
case rabbit_net:getstat(Sock, [recv_oct]) of
|
||||
{ok, [{recv_oct, SameRecvOct}]}
|
||||
when ReceivedPreviously ->
|
||||
%% Did not receive from socket for the 1st time.
|
||||
{ok, start_timer(State#state{received = false})};
|
||||
{ok, [{recv_oct, SameRecvOct}]} ->
|
||||
%% Did not receive from socket for 2nd time.
|
||||
{error, timeout};
|
||||
{ok, [{recv_oct, NewRecvOct}]} ->
|
||||
%% Received from socket.
|
||||
{ok, start_timer(State#state{recv_oct = NewRecvOct,
|
||||
received = true})};
|
||||
{error, _} = Err ->
|
||||
Err
|
||||
end.
|
||||
|
||||
-spec start_timer(state()) -> state().
|
||||
start_timer(undefined) ->
|
||||
undefined;
|
||||
start_timer(#state{interval_ms = IntervalMs} = State) ->
|
||||
Ref = erlang:send_after(IntervalMs, self(), {keepalive, check}),
|
||||
State#state{timer = Ref}.
|
||||
|
||||
-spec cancel_timer(state()) -> state().
|
||||
cancel_timer(undefined) ->
|
||||
undefined;
|
||||
cancel_timer(#state{timer = Ref} = State)
|
||||
when is_reference(Ref) ->
|
||||
ok = erlang:cancel_timer(Ref, [{async, true},
|
||||
{info, false}]),
|
||||
State.
|
||||
|
|
@ -260,6 +260,7 @@ process_connect(#mqtt_frame{
|
|||
fun login/2,
|
||||
fun register_client/2,
|
||||
fun notify_connection_created/2,
|
||||
fun start_keepalive/2,
|
||||
fun handle_clean_session/2],
|
||||
FrameConnect, PState0) of
|
||||
{ok, SessionPresent0, PState1} ->
|
||||
|
|
@ -333,18 +334,14 @@ login({UserBin, PassBin,
|
|||
|
||||
register_client(already_connected, _PState) ->
|
||||
ok;
|
||||
register_client(Frame = #mqtt_frame_connect{
|
||||
keep_alive = Keepalive,
|
||||
proto_ver = ProtoVersion},
|
||||
register_client(Frame = #mqtt_frame_connect{proto_ver = ProtoVersion},
|
||||
PState = #proc_state{client_id = ClientId,
|
||||
socket = Socket,
|
||||
auth_state = #auth_state{
|
||||
vhost = VHost}}) ->
|
||||
auth_state = #auth_state{vhost = VHost}}) ->
|
||||
case rabbit_mqtt_collector:register(ClientId, self()) of
|
||||
{ok, Corr} ->
|
||||
RetainerPid = rabbit_mqtt_retainer_sup:child_for_vhost(VHost),
|
||||
Prefetch = rabbit_mqtt_util:env(prefetch),
|
||||
rabbit_mqtt_reader:start_keepalive(self(), Keepalive),
|
||||
{ok, {PeerHost, PeerPort, Host, Port}} = rabbit_net:socket_ends(Socket, inbound),
|
||||
ExchangeBin = rabbit_mqtt_util:env(exchange),
|
||||
ExchangeName = rabbit_misc:r(VHost, exchange, ExchangeBin),
|
||||
|
|
@ -421,6 +418,10 @@ return_connack(?CONNACK_ID_REJECTED, S) ->
|
|||
return_connack(?CONNACK_UNACCEPTABLE_PROTO_VER, S) ->
|
||||
{error, unsupported_protocol_version, S}.
|
||||
|
||||
start_keepalive(#mqtt_frame_connect{keep_alive = Seconds},
|
||||
#proc_state{socket = Socket}) ->
|
||||
ok = rabbit_mqtt_keepalive:start(Seconds, Socket).
|
||||
|
||||
handle_clean_session(_, PState0 = #proc_state{clean_sess = false}) ->
|
||||
case get_queue(?QOS_1, PState0) of
|
||||
{error, not_found} ->
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@
|
|||
%%TODO check where to best 'hibernate' when returning from callback
|
||||
%%TODO use rabbit_global_counters for MQTT protocol
|
||||
|
||||
-export([conserve_resources/3, start_keepalive/2,
|
||||
-export([conserve_resources/3,
|
||||
close_connection/2]).
|
||||
|
||||
-export([info/2]).
|
||||
|
|
@ -166,61 +166,14 @@ handle_info({bump_credit, Msg}, State) ->
|
|||
credit_flow:handle_bump_msg(Msg),
|
||||
maybe_process_deferred_recv(control_throttle(State));
|
||||
|
||||
handle_info({start_keepalive, KeepaliveSec},
|
||||
State = #state{socket = Sock,
|
||||
keepalive = undefined})
|
||||
when is_number(KeepaliveSec), KeepaliveSec > 0 ->
|
||||
case rabbit_net:getstat(Sock, [recv_oct]) of
|
||||
{ok, [{recv_oct, RecvOct}]} ->
|
||||
%% "If the Keep Alive value is non-zero and the Server does not receive a Control
|
||||
%% Packet from the Client within one and a half times the Keep Alive time period,
|
||||
%% it MUST disconnect the Network Connection to the Client as if the network had
|
||||
%% failed" [MQTT-3.1.2-24]
|
||||
%% 0.75 * 2 = 1.5
|
||||
IntervalMs = timer:seconds(round(0.75 * KeepaliveSec)),
|
||||
Ref = start_keepalive_timer(#keepalive{interval_ms = IntervalMs}),
|
||||
{noreply, State#state{keepalive = #keepalive{timer = Ref,
|
||||
interval_ms = IntervalMs,
|
||||
recv_oct = RecvOct,
|
||||
received = true}}};
|
||||
{error, einval} ->
|
||||
%% the socket is dead, most likely because the connection is being shut down
|
||||
{stop, {shutdown, cannot_get_socket_stats}, State};
|
||||
{error, Reason} ->
|
||||
{stop, Reason, State}
|
||||
end;
|
||||
|
||||
handle_info({timeout, Ref, keepalive},
|
||||
State = #state {socket = Sock,
|
||||
conn_name = ConnStr,
|
||||
proc_state = PState,
|
||||
keepalive = #keepalive{timer = Ref,
|
||||
recv_oct = SameRecvOct,
|
||||
received = ReceivedPreviously} = KeepAlive}) ->
|
||||
case rabbit_net:getstat(Sock, [recv_oct]) of
|
||||
{ok, [{recv_oct, SameRecvOct}]}
|
||||
when ReceivedPreviously ->
|
||||
%% Did not receive from socket for the 1st time.
|
||||
Ref1 = start_keepalive_timer(KeepAlive),
|
||||
{noreply,
|
||||
State#state{keepalive = KeepAlive#keepalive{timer = Ref1,
|
||||
received = false}},
|
||||
hibernate};
|
||||
{ok, [{recv_oct, SameRecvOct}]} ->
|
||||
%% Did not receive from socket for 2nd time successively.
|
||||
rabbit_log_connection:error("closing MQTT connection ~tp (keepalive timeout)", [ConnStr]),
|
||||
send_will_and_terminate(PState, {shutdown, keepalive_timeout}, State);
|
||||
{ok, [{recv_oct, RecvOct}]} ->
|
||||
%% Received from socket.
|
||||
Ref1 = start_keepalive_timer(KeepAlive),
|
||||
{noreply,
|
||||
State#state{keepalive = KeepAlive#keepalive{timer = Ref1,
|
||||
recv_oct = RecvOct,
|
||||
received = true}},
|
||||
hibernate};
|
||||
{error, einval} ->
|
||||
%% the socket is dead, most likely because the connection is being shut down
|
||||
{stop, {shutdown, cannot_get_socket_stats}, State};
|
||||
handle_info({keepalive, Req}, State = #state{keepalive = KState0,
|
||||
conn_name = ConnName}) ->
|
||||
case rabbit_mqtt_keepalive:handle(Req, KState0) of
|
||||
{ok, KState} ->
|
||||
{noreply, State#state{keepalive = KState}, hibernate};
|
||||
{error, timeout} ->
|
||||
rabbit_log_connection:error("closing MQTT connection ~p (keepalive timeout)", [ConnName]),
|
||||
send_will_and_terminate({shutdown, keepalive_timeout}, State);
|
||||
{error, Reason} ->
|
||||
{stop, Reason, State}
|
||||
end;
|
||||
|
|
@ -254,14 +207,9 @@ handle_info({'DOWN', _MRef, process, _Pid, _Reason} = Evt,
|
|||
handle_info(Msg, State) ->
|
||||
{stop, {mqtt_unexpected_msg, Msg}, State}.
|
||||
|
||||
start_keepalive_timer(#keepalive{interval_ms = Time}) ->
|
||||
erlang:start_timer(Time, self(), keepalive).
|
||||
|
||||
cancel_keepalive_timer(#keepalive{timer = Ref}) ->
|
||||
erlang:cancel_timer(Ref, [{async, true}, {info, false}]).
|
||||
|
||||
terminate(Reason, State) ->
|
||||
maybe_emit_stats(State),
|
||||
terminate(Reason, State = #state{keepalive = KState0}) ->
|
||||
KState = rabbit_mqtt_keepalive:cancel_timer(KState0),
|
||||
maybe_emit_stats(State#state{keepalive = KState}),
|
||||
do_terminate(Reason, State).
|
||||
|
||||
handle_pre_hibernate(State) ->
|
||||
|
|
@ -300,7 +248,7 @@ do_terminate({network_error, Reason}, _State) ->
|
|||
rabbit_log_connection:error("MQTT detected network error: ~tp", [Reason]);
|
||||
|
||||
do_terminate(normal, #state{proc_state = ProcState,
|
||||
conn_name = ConnName}) ->
|
||||
conn_name = ConnName}) ->
|
||||
rabbit_mqtt_processor:terminate(ProcState),
|
||||
rabbit_log_connection:info("closing MQTT connection ~p (~s)", [self(), ConnName]),
|
||||
ok;
|
||||
|
|
@ -395,9 +343,6 @@ callback_reply(State, {ok, ProcState}) ->
|
|||
callback_reply(State, {error, Reason, ProcState}) ->
|
||||
{stop, Reason, pstate(State, ProcState)}.
|
||||
|
||||
start_keepalive(_, 0 ) -> ok;
|
||||
start_keepalive(Pid, Keepalive) -> Pid ! {start_keepalive, Keepalive}.
|
||||
|
||||
pstate(State = #state {}, PState = #proc_state{}) ->
|
||||
State #state{ proc_state = PState }.
|
||||
|
||||
|
|
@ -415,17 +360,17 @@ parse(Bytes, ParseState) ->
|
|||
%% "The Will Message MUST be published when the Network Connection is subsequently
|
||||
%% closed unless the Will Message has been deleted by the Server on receipt of a
|
||||
%% DISCONNECT Packet [MQTT-3.1.2-8]."
|
||||
send_will_and_terminate(PState, State) ->
|
||||
send_will_and_terminate(PState, {shutdown, conn_closed}, State).
|
||||
send_will_and_terminate(State) ->
|
||||
send_will_and_terminate({shutdown, conn_closed}, State).
|
||||
|
||||
send_will_and_terminate(PState, Reason, State = #state{conn_name = ConnStr}) ->
|
||||
send_will_and_terminate(Reason, State = #state{conn_name = ConnStr,
|
||||
proc_state = PState}) ->
|
||||
rabbit_log_connection:debug("MQTT: about to send will message (if any) on connection ~p", [ConnStr]),
|
||||
rabbit_mqtt_processor:send_will(PState),
|
||||
{stop, Reason, State}.
|
||||
|
||||
network_error(closed,
|
||||
State = #state{conn_name = ConnStr,
|
||||
proc_state = PState,
|
||||
received_connect_frame = Connected}) ->
|
||||
Fmt = "MQTT connection ~p will terminate because peer closed TCP connection",
|
||||
Args = [ConnStr],
|
||||
|
|
@ -433,14 +378,13 @@ network_error(closed,
|
|||
true -> rabbit_log_connection:info(Fmt, Args);
|
||||
false -> rabbit_log_connection:debug(Fmt, Args)
|
||||
end,
|
||||
send_will_and_terminate(PState, State);
|
||||
send_will_and_terminate(State);
|
||||
|
||||
network_error(Reason,
|
||||
State = #state{conn_name = ConnStr,
|
||||
proc_state = PState}) ->
|
||||
rabbit_log_connection:info("MQTT detected network error for ~tp: ~tp",
|
||||
State = #state{conn_name = ConnStr}) ->
|
||||
rabbit_log_connection:info("MQTT detected network error for ~p: ~p",
|
||||
[ConnStr, Reason]),
|
||||
send_will_and_terminate(PState, State).
|
||||
send_will_and_terminate(State).
|
||||
|
||||
run_socket(State = #state{ connection_state = blocked }) ->
|
||||
State;
|
||||
|
|
@ -454,24 +398,14 @@ run_socket(State = #state{ socket = Sock }) ->
|
|||
|
||||
control_throttle(State = #state{connection_state = Flow,
|
||||
conserve = Conserve,
|
||||
keepalive = KeepAlive}) ->
|
||||
keepalive = KState}) ->
|
||||
case {Flow, Conserve orelse credit_flow:blocked()} of
|
||||
{running, true}
|
||||
when KeepAlive =:= undefined ->
|
||||
State#state{connection_state = blocked};
|
||||
{running, true} ->
|
||||
%%TODO Instead of cancelling / setting the timer every time the connection
|
||||
%% gets blocked / unblocked, restart the timer when it expires and
|
||||
%% the connection_state is blocked.
|
||||
ok = cancel_keepalive_timer(KeepAlive),
|
||||
State#state{connection_state = blocked};
|
||||
{blocked, false}
|
||||
when KeepAlive =:= undefined ->
|
||||
run_socket(State #state{connection_state = running});
|
||||
State#state{connection_state = blocked,
|
||||
keepalive = rabbit_mqtt_keepalive:cancel_timer(KState)};
|
||||
{blocked, false} ->
|
||||
Ref = start_keepalive_timer(KeepAlive),
|
||||
run_socket(State #state{connection_state = running,
|
||||
keepalive = KeepAlive#keepalive{timer = Ref}});
|
||||
run_socket(State#state{connection_state = running,
|
||||
keepalive = rabbit_mqtt_keepalive:start_timer(KState)});
|
||||
{_, _} ->
|
||||
run_socket(State)
|
||||
end.
|
||||
|
|
|
|||
|
|
@ -11,6 +11,8 @@
|
|||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-import(rabbit_ct_broker_helpers, [rpc/5]).
|
||||
|
||||
all() ->
|
||||
[
|
||||
{group, non_parallel_tests}
|
||||
|
|
@ -23,6 +25,7 @@ groups() ->
|
|||
block_connack_timeout,
|
||||
handle_invalid_frames,
|
||||
login_timeout,
|
||||
keepalive,
|
||||
stats,
|
||||
quorum_clean_session_false,
|
||||
quorum_clean_session_true,
|
||||
|
|
@ -179,6 +182,41 @@ login_timeout(Config) ->
|
|||
rpc(Config, application, unset_env, [rabbitmq_mqtt, login_timeout])
|
||||
end.
|
||||
|
||||
keepalive(Config) ->
|
||||
KeepaliveSecs = 1,
|
||||
KeepaliveMs = timer:seconds(KeepaliveSecs),
|
||||
P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
|
||||
{ok, C} = emqtt:start_link([{keepalive, KeepaliveSecs},
|
||||
{host, "localhost"},
|
||||
{port, P},
|
||||
{clientid, <<"simpleClient">>},
|
||||
{proto_ver, v4}
|
||||
]),
|
||||
{ok, _Properties} = emqtt:connect(C),
|
||||
|
||||
%% Connection should stay up when client sends PING requests.
|
||||
timer:sleep(KeepaliveMs),
|
||||
|
||||
%% Mock the server socket to not have received any bytes.
|
||||
rabbit_ct_broker_helpers:setup_meck(Config),
|
||||
Mod = rabbit_net,
|
||||
ok = rpc(Config, 0, meck, new, [Mod, [no_link, passthrough]]),
|
||||
ok = rpc(Config, 0, meck, expect, [Mod, getstat, 2, {ok, [{recv_oct, 999}]} ]),
|
||||
|
||||
process_flag(trap_exit, true),
|
||||
receive
|
||||
{'EXIT', C, {shutdown, tcp_closed}} ->
|
||||
ok
|
||||
after
|
||||
ceil(3 * 0.75 * KeepaliveMs) ->
|
||||
ct:fail("server did not respect keepalive")
|
||||
end,
|
||||
%%TODO Introduce Prometheus counter for number of connections closed
|
||||
%% due to keepalive timeout and assert here that this counter is 1.
|
||||
|
||||
true = rpc(Config, 0, meck, validate, [Mod]),
|
||||
ok = rpc(Config, 0, meck, unload, [Mod]).
|
||||
|
||||
stats(Config) ->
|
||||
P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
|
||||
{ok, C} = emqtt:start_link([{host, "localhost"},
|
||||
|
|
|
|||
|
|
@ -24,20 +24,17 @@
|
|||
takeover/7]).
|
||||
|
||||
-record(state, {
|
||||
conn_name,
|
||||
parse_state,
|
||||
proc_state,
|
||||
state,
|
||||
conserve_resources,
|
||||
socket,
|
||||
peername,
|
||||
stats_timer,
|
||||
received_connect_frame
|
||||
}).
|
||||
|
||||
%%TODO Use 1 Erlang process per connection
|
||||
%% => remove rabbit_heartbeat processes
|
||||
%% => partly revert https://github.com/rabbitmq/rabbitmq-server/commit/9c153b2d405 ?
|
||||
conn_name,
|
||||
parse_state,
|
||||
proc_state,
|
||||
state,
|
||||
conserve_resources,
|
||||
socket,
|
||||
peername,
|
||||
stats_timer,
|
||||
received_connect_frame,
|
||||
keepalive :: rabbit_mqtt_keepalive:state()
|
||||
}).
|
||||
|
||||
%%TODO move from deprecated callback results to new callback results
|
||||
%% see cowboy_websocket.erl
|
||||
|
|
@ -71,11 +68,6 @@ init(Req, Opts) ->
|
|||
{PeerAddr, _PeerPort} = maps:get(peer, Req),
|
||||
SockInfo = maps:get(proxy_header, Req, undefined),
|
||||
WsOpts0 = proplists:get_value(ws_opts, Opts, #{}),
|
||||
%%TODO return idle_timeout?
|
||||
%% Do we need both MQTT Keepalives and WebSocket pings or is the latter just enough to determine
|
||||
%% when we need to close the connection?
|
||||
%% Check how other MQTT over WebSocket brokers handle it.
|
||||
%%
|
||||
%%TODO is compress needed?
|
||||
WsOpts = maps:merge(#{compress => true}, WsOpts0),
|
||||
Req2 = case cowboy_req:header(<<"sec-websocket-protocol">>, Req) of
|
||||
|
|
@ -177,21 +169,29 @@ websocket_info({'$gen_cast', QueueEvent = {queue_event, _, _}},
|
|||
stop(State#state{proc_state = PState})
|
||||
end;
|
||||
websocket_info({'$gen_cast', duplicate_id}, State = #state{ proc_state = ProcState,
|
||||
conn_name = ConnName }) ->
|
||||
rabbit_log_connection:warning("Web MQTT disconnecting a client with duplicate ID '~ts' (~tp)",
|
||||
conn_name = ConnName }) ->
|
||||
rabbit_log_connection:warning("Web MQTT disconnecting a client with duplicate ID '~s' (~p)",
|
||||
[rabbit_mqtt_processor:info(client_id, ProcState), ConnName]),
|
||||
stop(State);
|
||||
websocket_info({'$gen_cast', {close_connection, Reason}}, State = #state{ proc_state = ProcState,
|
||||
conn_name = ConnName }) ->
|
||||
rabbit_log_connection:warning("Web MQTT disconnecting client with ID '~ts' (~tp), reason: ~ts",
|
||||
conn_name = ConnName }) ->
|
||||
rabbit_log_connection:warning("Web MQTT disconnecting client with ID '~s' (~p), reason: ~s",
|
||||
[rabbit_mqtt_processor:info(client_id, ProcState), ConnName, Reason]),
|
||||
stop(State);
|
||||
websocket_info({start_keepalive, _Keepalive}, State) ->
|
||||
%%TODO use timer as done in rabbit_mqtt_reader
|
||||
{ok, State, hibernate};
|
||||
% websocket_info(keepalive_timeout, State = #state{conn_name = ConnStr}) ->
|
||||
% rabbit_log_connection:error("closing Web MQTT connection ~p (keepalive timeout)", [ConnStr]),
|
||||
% stop(State);
|
||||
websocket_info({keepalive, Req}, State = #state{keepalive = KState0,
|
||||
conn_name = ConnName}) ->
|
||||
case rabbit_mqtt_keepalive:handle(Req, KState0) of
|
||||
{ok, KState} ->
|
||||
{ok, State#state{keepalive = KState}, hibernate};
|
||||
{error, timeout} ->
|
||||
rabbit_log_connection:error("keepalive timeout in Web MQTT connection ~p",
|
||||
[ConnName]),
|
||||
stop(State, 1000, <<"MQTT keepalive timeout">>);
|
||||
{error, Reason} ->
|
||||
rabbit_log_connection:error("keepalive error in Web MQTT connection ~p: ~p",
|
||||
[ConnName, Reason]),
|
||||
stop(State)
|
||||
end;
|
||||
websocket_info(emit_stats, State) ->
|
||||
{ok, emit_stats(State), hibernate};
|
||||
websocket_info({ra_event, _From, Evt},
|
||||
|
|
@ -202,11 +202,15 @@ websocket_info(Msg, State) ->
|
|||
rabbit_log_connection:warning("Web MQTT: unexpected message ~p", [Msg]),
|
||||
{ok, State, hibernate}.
|
||||
|
||||
terminate(_, _, #state{ proc_state = undefined }) ->
|
||||
ok;
|
||||
terminate(_, _, State) ->
|
||||
_ = stop_rabbit_mqtt_processor(State),
|
||||
ok.
|
||||
terminate(_Reason, _Request,
|
||||
#state{conn_name = ConnName,
|
||||
proc_state = PState,
|
||||
keepalive = KState} = State) ->
|
||||
rabbit_log_connection:info("closing Web MQTT connection ~p (~s)", [self(), ConnName]),
|
||||
maybe_emit_stats(State),
|
||||
rabbit_mqtt_keepalive:cancel_timer(KState),
|
||||
ok = file_handle_cache:release(),
|
||||
stop_rabbit_mqtt_processor(PState).
|
||||
|
||||
%% Internal.
|
||||
|
||||
|
|
@ -255,28 +259,24 @@ handle_data1(Data, State = #state{ parse_state = ParseState,
|
|||
Other
|
||||
end.
|
||||
|
||||
stop(State) ->
|
||||
stop(State, 1000, "MQTT died").
|
||||
|
||||
stop(State, CloseCode, Error0) ->
|
||||
ok = file_handle_cache:release(),
|
||||
_ = stop_rabbit_mqtt_processor(State),
|
||||
Error1 = rabbit_data_coercion:to_binary(Error0),
|
||||
{[{close, CloseCode, Error1}], State}.
|
||||
|
||||
stop_with_framing_error(State, Error0, ConnStr) ->
|
||||
Error1 = rabbit_misc:format("~tp", [Error0]),
|
||||
rabbit_log_connection:error("MQTT detected framing error '~ts' for connection ~tp",
|
||||
[Error1, ConnStr]),
|
||||
stop(State, 1007, Error1).
|
||||
|
||||
stop_rabbit_mqtt_processor(State = #state{state = running,
|
||||
proc_state = ProcState,
|
||||
conn_name = ConnName}) ->
|
||||
maybe_emit_stats(State),
|
||||
rabbit_log_connection:info("closing Web MQTT connection ~tp (~ts)", [self(), ConnName]),
|
||||
rabbit_mqtt_processor:send_will(ProcState),
|
||||
rabbit_mqtt_processor:terminate(ProcState).
|
||||
stop(State) ->
|
||||
stop(State, 1000, "MQTT died").
|
||||
|
||||
stop(State, CloseCode, Error0) ->
|
||||
Error1 = rabbit_data_coercion:to_binary(Error0),
|
||||
{[{close, CloseCode, Error1}], State}.
|
||||
|
||||
stop_rabbit_mqtt_processor(undefined) ->
|
||||
ok;
|
||||
stop_rabbit_mqtt_processor(PState) ->
|
||||
rabbit_mqtt_processor:send_will(PState),
|
||||
rabbit_mqtt_processor:terminate(PState).
|
||||
|
||||
handle_credits(State0) ->
|
||||
case control_throttle(State0) of
|
||||
|
|
@ -286,14 +286,16 @@ handle_credits(State0) ->
|
|||
{ok, State}
|
||||
end.
|
||||
|
||||
control_throttle(State = #state{ state = CS,
|
||||
conserve_resources = Mem }) ->
|
||||
control_throttle(State = #state{state = CS,
|
||||
conserve_resources = Mem,
|
||||
keepalive = KState}) ->
|
||||
case {CS, Mem orelse credit_flow:blocked()} of
|
||||
%%TODO cancel / resume keepalive timer as done in rabbit_mqtt_reader
|
||||
{running, true} ->
|
||||
State #state{state = blocked};
|
||||
State#state{state = blocked,
|
||||
keepalive = rabbit_mqtt_keepalive:cancel_timer(KState)};
|
||||
{blocked,false} ->
|
||||
State #state{state = running};
|
||||
State#state{state = running,
|
||||
keepalive = rabbit_mqtt_keepalive:start_timer(KState)};
|
||||
{_, _} ->
|
||||
State
|
||||
end.
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ groups() ->
|
|||
, last_will_enabled
|
||||
, last_will_disabled
|
||||
, disconnect
|
||||
, keepalive
|
||||
]}
|
||||
].
|
||||
|
||||
|
|
@ -227,6 +228,33 @@ disconnect(Config) ->
|
|||
|
||||
ok.
|
||||
|
||||
keepalive(Config) ->
|
||||
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_web_mqtt),
|
||||
PortStr = integer_to_list(Port),
|
||||
WS = rfc6455_client:new("ws://127.0.0.1:" ++ PortStr ++ "/ws", self()),
|
||||
{ok, _} = rfc6455_client:open(WS),
|
||||
|
||||
KeepaliveSecs = 1,
|
||||
KeepaliveMs = timer:seconds(KeepaliveSecs),
|
||||
ok = raw_send(WS,
|
||||
?CONNECT_PACKET(
|
||||
#mqtt_packet_connect{
|
||||
keep_alive = KeepaliveSecs,
|
||||
clean_sess = true,
|
||||
client_id = <<"web-mqtt-tests-disconnect">>,
|
||||
username = <<"guest">>,
|
||||
password = <<"guest">>})),
|
||||
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv(WS),
|
||||
|
||||
%% Sanity check that MQTT ping request and ping response work.
|
||||
timer:sleep(KeepaliveMs),
|
||||
ok = raw_send(WS, #mqtt_packet{header = #mqtt_packet_header{type = ?PINGREQ}}),
|
||||
{ok, #mqtt_packet{header = #mqtt_packet_header{type = ?PINGRESP}}, <<>>} = raw_recv(WS),
|
||||
|
||||
%% Stop sending any data to the server (including ping requests).
|
||||
%% The server should disconnect us.
|
||||
?assertEqual({close, {1000, <<"MQTT keepalive timeout">>}},
|
||||
rfc6455_client:recv(WS, ceil(3 * 0.75 * KeepaliveMs))).
|
||||
|
||||
raw_send(WS, Packet) ->
|
||||
Frame = emqttc_serialiser:serialise(Packet),
|
||||
|
|
|
|||
Loading…
Reference in New Issue