Merge pull request #66 from rabbitmq/rabbitmq-stomp-55

Fill most management UI info
This commit is contained in:
Michael Klishin 2016-03-04 14:46:55 +03:00
commit ae376b5a27
2 changed files with 72 additions and 28 deletions

View File

@ -110,7 +110,18 @@ flush_and_die(State) ->
close_connection(State).
initial_state(Configuration,
{SendFun, ReceiveFun, AdapterInfo, StartHeartbeatFun, SSLLoginName, PeerAddr}) ->
{SendFun, ReceiveFun, AdapterInfo0 = #amqp_adapter_info{additional_info = Extra},
StartHeartbeatFun, SSLLoginName, PeerAddr}) ->
%% STOMP connections use exactly one channel. The frame max is not
%% applicable and there is no way to know what client is used.
AdapterInfo = AdapterInfo0#amqp_adapter_info{additional_info=[
{channels, 1},
{channel_max, 1},
{frame_max, 0},
%% TODO: can we use a header to make it possible for clients
%% to override this value?
{client_properties, [{<<"product">>, longstr, <<"STOMP client">>}]}
|Extra]},
#proc_state {
send_fun = SendFun,
receive_fun = ReceiveFun,
@ -140,7 +151,7 @@ command({"CONNECT", Frame}, State) ->
command(Request, State = #proc_state{channel = none,
config = #stomp_configuration{
implicit_connect = true}}) ->
{ok, State1 = #proc_state{channel = Ch}} =
{ok, State1 = #proc_state{channel = Ch}, _} =
process_connect(implicit, #stomp_frame{headers = []}, State),
case Ch of
none -> {stop, normal, State1};
@ -152,7 +163,7 @@ command(_Request, State = #proc_state{channel = none,
implicit_connect = false}}) ->
{ok, send_error("Illegal command",
"You must log in using CONNECT first",
State)};
State), none};
command({Command, Frame}, State = #proc_state{frame_transformer = FT}) ->
Frame1 = FT(Frame),
@ -195,7 +206,7 @@ process_request(ProcessFun, State) ->
process_request(ProcessFun, fun (StateM) -> StateM end, State).
process_request(ProcessFun, SuccessFun, State) ->
process_request(ProcessFun, SuccessFun, State=#proc_state{connection=Conn}) ->
Res = case catch ProcessFun(State) of
{'EXIT',
{{shutdown,
@ -213,9 +224,9 @@ process_request(ProcessFun, SuccessFun, State) ->
none -> ok;
_ -> send_frame(Frame, NewState)
end,
{ok, SuccessFun(NewState)};
{ok, SuccessFun(NewState), Conn};
{error, Message, Detail, NewState} ->
{ok, send_error(Message, Detail, NewState)};
{ok, send_error(Message, Detail, NewState), Conn};
{stop, normal, NewState} ->
{stop, normal, SuccessFun(NewState)};
{stop, R, NewState} ->

View File

@ -27,8 +27,8 @@
-include_lib("amqp_client/include/amqp_client.hrl").
-record(reader_state, {socket, conn_name, parse_state, processor_state, state,
conserve_resources, recv_outstanding,
parent}).
conserve_resources, recv_outstanding, stats_timer,
parent, connection}).
%%----------------------------------------------------------------------------
@ -65,14 +65,15 @@ init([SupHelperPid, Ref, Sock, Configuration]) ->
ParseState = rabbit_stomp_frame:initial_state(),
register_resource_alarm(),
gen_server2:enter_loop(?MODULE, [],
run_socket(control_throttle(
#reader_state{socket = Sock,
conn_name = ConnStr,
parse_state = ParseState,
processor_state = ProcState,
state = running,
conserve_resources = false,
recv_outstanding = false})),
rabbit_event:init_stats_timer(
run_socket(control_throttle(
#reader_state{socket = Sock,
conn_name = ConnStr,
parse_state = ParseState,
processor_state = ProcState,
state = running,
conserve_resources = false,
recv_outstanding = false})), #reader_state.stats_timer),
{backoff, 1000, 1000, 10000});
{network_error, Reason} ->
rabbit_net:fast_close(Sock),
@ -97,10 +98,10 @@ handle_cast(Msg, State) ->
handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
case process_received_bytes(Data, State#reader_state{recv_outstanding = false}) of
{ok, NewState} ->
{noreply, run_socket(control_throttle(NewState)), hibernate};
{stop, Reason, NewState} ->
{stop, Reason, NewState}
{ok, NewState} ->
{noreply, ensure_stats_timer(run_socket(control_throttle(NewState))), hibernate};
{stop, Reason, NewState} ->
{stop, Reason, NewState}
end;
handle_info({inet_async, _Sock, _Ref, {error, closed}}, State) ->
{stop, normal, State};
@ -112,6 +113,8 @@ handle_info({inet_reply, _, ok}, State) ->
{noreply, State, hibernate};
handle_info({inet_reply, _, Status}, State) ->
{stop, Status, State};
handle_info(emit_stats, State) ->
{noreply, emit_stats(State), hibernate};
handle_info({conserve_resources, Conserve}, State) ->
NewState = State#reader_state{conserve_resources = Conserve},
{noreply, run_socket(control_throttle(NewState)), hibernate};
@ -150,10 +153,10 @@ handle_info({Delivery = #'basic.deliver'{},
handle_info(#'basic.cancel'{consumer_tag = Ctag}, State) ->
ProcState = processor_state(State),
case rabbit_stomp_processor:cancel_consumer(Ctag, ProcState) of
{ok, NewProcState} ->
{noreply, processor_state(NewProcState, State), hibernate};
{stop, Reason, NewProcState} ->
{stop, Reason, processor_state(NewProcState, State)}
{ok, NewProcState, _} ->
{noreply, processor_state(NewProcState, State), hibernate};
{stop, Reason, NewProcState} ->
{stop, Reason, processor_state(NewProcState, State)}
end;
%%----------------------------------------------------------------------------
@ -179,11 +182,12 @@ process_received_bytes(Bytes,
{ok, State#reader_state{parse_state = ParseState1}};
{ok, Frame, Rest} ->
case rabbit_stomp_processor:process_frame(Frame, ProcState) of
{ok, NewProcState} ->
{ok, NewProcState, Conn} ->
PS = rabbit_stomp_frame:initial_state(),
process_received_bytes(Rest, State#reader_state{
processor_state = NewProcState,
parse_state = PS,
connection = Conn,
state = next_state(S, Frame)});
{stop, Reason, NewProcState} ->
{stop, Reason,
@ -231,9 +235,10 @@ run_socket(State = #reader_state{socket = Sock}) ->
terminate(Reason, State = #reader_state{ processor_state = ProcState }) ->
log_reason(Reason, State),
rabbit_stomp_processor:flush_and_die(ProcState),
ok.
maybe_emit_stats(State),
log_reason(Reason, State),
rabbit_stomp_processor:flush_and_die(ProcState),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@ -327,6 +332,34 @@ ssl_login_name(Sock, #stomp_configuration{ssl_cert_login = true}) ->
%%----------------------------------------------------------------------------
maybe_emit_stats(State) ->
rabbit_event:if_enabled(State, #reader_state.stats_timer,
fun() -> emit_stats(State) end).
emit_stats(State=#reader_state{socket = Sock, state = ConnState, connection = Conn}) ->
SockInfos = case rabbit_net:getstat(Sock,
[recv_oct, recv_cnt, send_oct, send_cnt, send_pend]) of
{ok, SI} -> SI;
{error, _} -> []
end,
Infos = [{pid, Conn}, {state, ConnState} | SockInfos],
rabbit_event:notify(connection_stats, Infos),
State1 = rabbit_event:reset_stats_timer(State, #reader_state.stats_timer),
%% If we emit an event which looks like we are in flow control, it's not a
%% good idea for it to be our last even if we go idle. Keep emitting
%% events, either we stay busy or we drop out of flow control.
case ConnState of
flow -> ensure_stats_timer(State1);
_ -> State1
end.
ensure_stats_timer(State = #reader_state{state = running}) ->
rabbit_event:ensure_stats_timer(State, #reader_state.stats_timer, emit_stats);
ensure_stats_timer(State) ->
State.
%%----------------------------------------------------------------------------
processor_state(#reader_state{ processor_state = ProcState }) -> ProcState.
processor_state(ProcState, #reader_state{} = State) ->