Merge pull request #97 from rabbitmq/rabbitmq-web-stomp-40

Implement credits flow
This commit is contained in:
Michael Klishin 2018-12-05 21:29:14 +03:00 committed by GitHub
commit 50e1e48f10
1 changed files with 69 additions and 26 deletions

View File

@ -18,6 +18,7 @@
-behaviour(cowboy_websocket).
-include_lib("rabbitmq_stomp/include/rabbit_stomp.hrl").
-include_lib("rabbitmq_stomp/include/rabbit_stomp_frame.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
%% Websocket.
@ -34,6 +35,8 @@
heartbeat_sup,
parse_state,
proc_state,
state,
conserve_resources,
socket,
peername,
auth_hd,
@ -65,13 +68,15 @@ init(Req0, Opts) ->
WsOpts0 = proplists:get_value(ws_opts, Opts, #{}),
WsOpts = maps:merge(#{compress => true}, WsOpts0),
{cowboy_websocket, Req, #state{
frame_type = proplists:get_value(type, Opts, text),
heartbeat_sup = KeepaliveSup,
heartbeat = {none, none},
heartbeat_mode = heartbeat,
socket = Sock,
peername = PeerAddr,
auth_hd = cowboy_req:header(<<"authorization">>, Req)
frame_type = proplists:get_value(type, Opts, text),
heartbeat_sup = KeepaliveSup,
heartbeat = {none, none},
heartbeat_mode = heartbeat,
state = running,
conserve_resources = false,
socket = Sock,
peername = PeerAddr,
auth_hd = cowboy_req:header(<<"authorization">>, Req)
}, WsOpts}.
websocket_init(State) ->
@ -112,11 +117,7 @@ init_processor_state(#state{socket=Sock, peername=PeerAddr, auth_hd=AuthHd}) ->
StompConfig0
end,
AdapterInfo0 = #amqp_adapter_info{additional_info=Extra}
= amqp_connection:socket_adapter_info(Sock, {'Web STOMP', 0}),
%% Flow control is not supported for Web-STOMP connections.
AdapterInfo = AdapterInfo0#amqp_adapter_info{
additional_info=[{state, running}|Extra]},
AdapterInfo = amqp_connection:socket_adapter_info(Sock, {'Web STOMP', 0}),
ProcessorState = rabbit_stomp_processor:initial_state(
StompConfig,
@ -133,11 +134,12 @@ websocket_handle(_Frame, State) ->
websocket_info({send, Msg}, State=#state{frame_type=FrameType}) ->
{reply, {FrameType, Msg}, State};
%% TODO this is a bit rubbish - after the preview release we should
%% make the credit_flow:send/1 invocation in
%% rabbit_stomp_processor:process_frame/2 optional.
websocket_info({bump_credit, {_, _}}, State) ->
{ok, State};
websocket_info({conserve_resources, Conserve}, State) ->
NewState = State#state{conserve_resources = Conserve},
handle_credits(control_throttle(NewState));
websocket_info({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
handle_credits(control_throttle(State));
websocket_info(#'basic.consume_ok'{}, State) ->
{ok, State};
@ -225,9 +227,17 @@ filter_stomp_protocols(Protocols) ->
%%----------------------------------------------------------------------------
handle_data(<<>>, State) ->
handle_data(Data, State0) ->
case handle_data1(Data, State0) of
{ok, State1 = #state{state = blocked}} ->
{[{active, false}], State1};
Other ->
Other
end.
handle_data1(<<>>, State) ->
{ok, ensure_stats_timer(State)};
handle_data(Bytes, State = #state{proc_state = ProcState,
handle_data1(Bytes, State = #state{proc_state = ProcState,
parse_state = ParseState}) ->
case rabbit_stomp_frame:parse(Bytes, ParseState) of
{more, ParseState1} ->
@ -236,17 +246,24 @@ handle_data(Bytes, State = #state{proc_state = ProcState,
case rabbit_stomp_processor:process_frame(Frame, ProcState) of
{ok, ProcState1, ConnPid} ->
ParseState1 = rabbit_stomp_frame:initial_state(),
handle_data(
State1 = maybe_block(State, Frame),
handle_data1(
Rest,
State #state{ parse_state = ParseState1,
proc_state = ProcState1,
connection = ConnPid });
State1 #state{ parse_state = ParseState1,
proc_state = ProcState1,
connection = ConnPid });
{stop, _Reason, ProcState1} ->
io:format(user, "~p~n", [_Reason]),
stop(State#state{ proc_state = ProcState1 })
end
end.
maybe_block(State = #state{state = blocking, heartbeat = Heartbeat},
#stomp_frame{command = "SEND"}) ->
rabbit_heartbeat:pause_monitor(Heartbeat),
State#state{state = blocked};
maybe_block(State, _) ->
State.
stop(State = #state{proc_state = ProcState}) ->
maybe_emit_stats(State),
ok = file_handle_cache:release(),
@ -255,6 +272,32 @@ stop(State = #state{proc_state = ProcState}) ->
%%----------------------------------------------------------------------------
handle_credits(State0) ->
case control_throttle(State0) of
State = #state{state = running} ->
{[{active, true}], State};
State ->
{ok, State}
end.
control_throttle(State = #state{state = CS,
conserve_resources = Mem}) ->
case {CS, Mem orelse credit_flow:blocked()} of
{running, true} -> blocking(State);
{blocking, false} -> running(State);
{blocked, false} -> running(State);
{_, _} -> State
end.
blocking(State) ->
State#state{state = blocking}.
running(State = #state{heartbeat=Heartbeat}) ->
rabbit_heartbeat:resume_monitor(Heartbeat),
State#state{state = running}.
%%----------------------------------------------------------------------------
ensure_stats_timer(State) ->
rabbit_event:ensure_stats_timer(State, #state.stats_timer, emit_stats).
@ -267,13 +310,13 @@ emit_stats(State=#state{connection = C}) when C == none; C == undefined ->
%% established, as this causes orphan entries on the stats database
State1 = rabbit_event:reset_stats_timer(State, #state.stats_timer),
State1;
emit_stats(State=#state{socket=Sock, connection=Conn}) ->
emit_stats(State=#state{socket=Sock, state=RunningState, connection=Conn}) ->
SockInfos = case rabbit_net:getstat(Sock,
[recv_oct, recv_cnt, send_oct, send_cnt, send_pend]) of
{ok, SI} -> SI;
{error, _} -> []
end,
Infos = [{pid, Conn}|SockInfos],
Infos = [{pid, Conn}, {state, RunningState}|SockInfos],
rabbit_core_metrics:connection_stats(Conn, Infos),
rabbit_event:notify(connection_stats, Infos),
State1 = rabbit_event:reset_stats_timer(State, #state.stats_timer),