Log MQTT connection.

This commit is contained in:
Daniil Fedotov 2016-01-08 14:21:08 +00:00 committed by Michael Klishin
parent 954016c69f
commit 17e4c707c6
2 changed files with 29 additions and 41 deletions

View File

@ -20,8 +20,7 @@
-record(state, { socket, -record(state, { socket,
conn_name, conn_name,
await_recv, await_recv,
no_data_received, protocol_connected,
connected_at,
connection_state, connection_state,
keepalive, keepalive,
keepalive_sup, keepalive_sup,

View File

@ -56,23 +56,22 @@ init([KeepaliveSup, Ref, Sock]) ->
rabbit_net:accept_ack(Ref, Sock), rabbit_net:accept_ack(Ref, Sock),
case rabbit_net:connection_string(Sock, inbound) of case rabbit_net:connection_string(Sock, inbound) of
{ok, ConnStr} -> {ok, ConnStr} ->
log(debug, "incoming MQTT connection ~p (~s)~n", [self(), ConnStr]), log(debug, "accepting MQTT TCP connection ~p (~s)~n", [self(), ConnStr]),
rabbit_alarm:register( rabbit_alarm:register(
self(), {?MODULE, conserve_resources, []}), self(), {?MODULE, conserve_resources, []}),
ProcessorState = rabbit_mqtt_processor:initial_state(Sock,ssl_login_name(Sock)), ProcessorState = rabbit_mqtt_processor:initial_state(Sock,ssl_login_name(Sock)),
gen_server2:enter_loop(?MODULE, [], gen_server2:enter_loop(?MODULE, [],
control_throttle( control_throttle(
#state{socket = Sock, #state{socket = Sock,
conn_name = ConnStr, conn_name = ConnStr,
await_recv = false, await_recv = false,
connection_state = running, connection_state = running,
no_data_received = true, protocol_connected = false,
connected_at = time_compat:os_system_time(seconds), keepalive = {none, none},
keepalive = {none, none}, keepalive_sup = KeepaliveSup,
keepalive_sup = KeepaliveSup, conserve = false,
conserve = false, parse_state = rabbit_mqtt_frame:initial_state(),
parse_state = rabbit_mqtt_frame:initial_state(), proc_state = ProcessorState }),
proc_state = ProcessorState }),
{backoff, 1000, 1000, 10000}); {backoff, 1000, 1000, 10000});
{network_error, Reason} -> {network_error, Reason} ->
rabbit_net:fast_close(Sock), rabbit_net:fast_close(Sock),
@ -85,24 +84,6 @@ init([KeepaliveSup, Ref, Sock]) ->
terminate({network_error, Reason}, undefined) terminate({network_error, Reason}, undefined)
end. end.
log_new_connection(State) -> log_new_connection(State, accepted).
log_new_connection(#state{no_data_received = false}, _) -> ok;
log_new_connection(#state{conn_name = ConnStr,
connected_at = ConnectionTime},
LogReason) ->
BaseDate = calendar:datetime_to_gregorian_seconds({{1970, 1, 1},
{0, 0, 0}}),
{{Year, Month, Day}, {Hour, Min, Sec}} =
calendar:gregorian_seconds_to_datetime(BaseDate + ConnectionTime),
log(case LogReason of
closed -> debug;
accepted -> info
end,
"new MQTT connection ~p (~s) - "
"accepted at ~b-~2..0b-~2..0b::~2..0b:~2..0b:~2..0b~n",
[self(), ConnStr, Year, Month, Day, Hour, Min, Sec]).
handle_call(Msg, From, State) -> handle_call(Msg, From, State) ->
{stop, {mqtt_unexpected_call, Msg, From}, State}. {stop, {mqtt_unexpected_call, Msg, From}, State}.
@ -138,10 +119,8 @@ handle_info({inet_reply, _Ref, ok}, State) ->
handle_info({inet_async, Sock, _Ref, {ok, Data}}, handle_info({inet_async, Sock, _Ref, {ok, Data}},
State = #state{ socket = Sock }) -> State = #state{ socket = Sock }) ->
log_new_connection(State),
process_received_bytes( process_received_bytes(
Data, control_throttle(State #state{ await_recv = false, Data, control_throttle(State #state{ await_recv = false }));
no_data_received = false }));
handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State = #state {}) -> handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State = #state {}) ->
network_error(Reason, State); network_error(Reason, State);
@ -233,6 +212,17 @@ ssl_login_name(Sock) ->
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
log_new_connection(#state{conn_name = ConnStr}) ->
log(info, "accepting MQTT connection ~p (~s)~n", [self(), ConnStr]).
process_received_bytes(<<>>, State = #state{ proc_state = ProcState,
protocol_connected = false } ) ->
MqttConn = ProcState#proc_state.connection,
case MqttConn of
undefined -> ok;
_ -> log_new_connection(State)
end,
{noreply, State#state{ protocol_connected = true }, hibernate};
process_received_bytes(<<>>, State) -> process_received_bytes(<<>>, State) ->
{noreply, State, hibernate}; {noreply, State, hibernate};
process_received_bytes(Bytes, process_received_bytes(Bytes,
@ -294,12 +284,11 @@ send_will_and_terminate(PState, Reason, State) ->
network_error(closed, network_error(closed,
State = #state{ conn_name = ConnStr, State = #state{ conn_name = ConnStr,
proc_state = PState, proc_state = PState }) ->
no_data_received = NoDataReceived }) -> MqttConn = PState#proc_state.connection,
log_new_connection(State, closed), log(case MqttConn of
log(case NoDataReceived of undefined -> debug;
true -> debug; _ -> info
false -> info
end, end,
"MQTT detected network error for ~p: peer closed TCP connection~n", "MQTT detected network error for ~p: peer closed TCP connection~n",
[ConnStr]), [ConnStr]),