rabbitmq-server/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl

323 lines
13 KiB
Erlang
Raw Normal View History

2012-06-27 00:57:24 +08:00
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
2013-07-01 17:49:14 +08:00
%% The Initial Developer of the Original Code is GoPivotal, Inc.
2016-01-01 17:59:18 +08:00
%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
2012-06-27 00:57:24 +08:00
%%
-module(rabbit_mqtt_reader).
-behaviour(gen_server2).
2012-06-27 00:57:24 +08:00
-export([start_link/3]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
code_change/3, terminate/2]).
2014-01-07 01:50:02 +08:00
-export([conserve_resources/3, start_keepalive/2]).
2012-06-27 00:57:24 +08:00
-export([ssl_login_name/1]).
2012-06-27 00:57:24 +08:00
-include_lib("amqp_client/include/amqp_client.hrl").
2012-09-12 21:34:41 +08:00
-include("rabbit_mqtt.hrl").
2012-06-27 00:57:24 +08:00
%%----------------------------------------------------------------------------
start_link(KeepaliveSup, Ref, Sock) ->
Pid = proc_lib:spawn_link(?MODULE, init,
[[KeepaliveSup, Ref, Sock]]),
%% In the event that somebody floods us with connections, the
%% reader processes can spew log events at error_logger faster
%% than it can keep up, causing its mailbox to grow unbounded
%% until we eat all the memory available and crash. So here is a
%% meaningless synchronous call to the underlying gen_event
%% mechanism. When it returns the mailbox is drained, and we
%% return to our caller to accept more connections.
gen_event:which_handlers(error_logger),
{ok, Pid}.
2012-06-27 00:57:24 +08:00
2013-02-28 20:27:23 +08:00
conserve_resources(Pid, _, Conserve) ->
Pid ! {conserve_resources, Conserve},
ok.
%%----------------------------------------------------------------------------
2012-06-27 00:57:24 +08:00
init([KeepaliveSup, Ref, Sock]) ->
2012-08-06 06:52:54 +08:00
process_flag(trap_exit, true),
rabbit_net:accept_ack(Ref, Sock),
case rabbit_net:connection_string(Sock, inbound) of
{ok, ConnStr} ->
2016-01-09 21:43:52 +08:00
log(debug, "MQTT accepting TCP connection ~p (~s)~n", [self(), ConnStr]),
rabbit_alarm:register(
self(), {?MODULE, conserve_resources, []}),
ProcessorState = rabbit_mqtt_processor:initial_state(Sock,ssl_login_name(Sock)),
gen_server2:enter_loop(?MODULE, [],
control_throttle(
2016-01-08 22:42:00 +08:00
#state{socket = Sock,
conn_name = ConnStr,
await_recv = false,
connection_state = running,
received_connect_frame = false,
keepalive = {none, none},
keepalive_sup = KeepaliveSup,
conserve = false,
parse_state = rabbit_mqtt_frame:initial_state(),
proc_state = ProcessorState }),
{backoff, 1000, 1000, 10000});
{network_error, Reason} ->
rabbit_net:fast_close(Sock),
terminate({shutdown, Reason}, undefined);
{error, enotconn} ->
rabbit_net:fast_close(Sock),
terminate(shutdown, undefined);
{error, Reason} ->
rabbit_net:fast_close(Sock),
terminate({network_error, Reason}, undefined)
end.
handle_call(Msg, From, State) ->
{stop, {mqtt_unexpected_call, Msg, From}, State}.
2012-08-06 06:52:54 +08:00
2013-11-14 21:48:14 +08:00
handle_cast(duplicate_id,
State = #state{ proc_state = PState,
conn_name = ConnName }) ->
2012-08-06 06:52:54 +08:00
log(warning, "MQTT disconnecting duplicate client id ~p (~p)~n",
[rabbit_mqtt_processor:info(client_id, PState), ConnName]),
2014-05-21 15:13:12 +08:00
{stop, {shutdown, duplicate_id}, State};
handle_cast(Msg, State) ->
2014-05-21 15:13:12 +08:00
{stop, {mqtt_unexpected_cast, Msg}, State}.
2014-08-11 18:26:27 +08:00
handle_info({#'basic.deliver'{}, #amqp_msg{}, _DeliveryCtx} = Delivery,
State = #state{ proc_state = ProcState }) ->
2014-08-11 18:26:27 +08:00
callback_reply(State, rabbit_mqtt_processor:amqp_callback(Delivery,
ProcState));
handle_info(#'basic.ack'{} = Ack, State = #state{ proc_state = ProcState }) ->
callback_reply(State, rabbit_mqtt_processor:amqp_callback(Ack, ProcState));
2012-08-06 06:52:54 +08:00
handle_info(#'basic.consume_ok'{}, State) ->
{noreply, State, hibernate};
2012-08-06 06:52:54 +08:00
handle_info(#'basic.cancel'{}, State) ->
2014-05-21 15:13:12 +08:00
{stop, {shutdown, subscription_cancelled}, State};
handle_info({'EXIT', _Conn, Reason}, State) ->
2014-05-21 15:13:12 +08:00
{stop, {connection_died, Reason}, State};
handle_info({inet_reply, _Ref, ok}, State) ->
{noreply, State, hibernate};
handle_info({inet_async, Sock, _Ref, {ok, Data}},
State = #state{ socket = Sock }) ->
2012-08-06 06:52:54 +08:00
process_received_bytes(
2016-01-08 22:21:08 +08:00
Data, control_throttle(State #state{ await_recv = false }));
2012-08-06 06:52:54 +08:00
handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State = #state {}) ->
network_error(Reason, State);
handle_info({inet_reply, _Sock, {error, Reason}}, State = #state {}) ->
network_error(Reason, State);
2012-08-06 06:52:54 +08:00
handle_info({conserve_resources, Conserve}, State) ->
{noreply, control_throttle(State #state{ conserve = Conserve }), hibernate};
2012-06-27 00:57:24 +08:00
2012-08-06 06:52:54 +08:00
handle_info({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
{noreply, control_throttle(State), hibernate};
2012-06-27 00:57:24 +08:00
2014-01-20 18:33:27 +08:00
handle_info({start_keepalives, Keepalive},
State = #state { keepalive_sup = KeepaliveSup, socket = Sock }) ->
%% Only the client has the responsibility for sending keepalives
2014-01-07 01:50:02 +08:00
SendFun = fun() -> ok end,
Parent = self(),
ReceiveFun = fun() -> Parent ! keepalive_timeout end,
2014-01-20 18:33:27 +08:00
Heartbeater = rabbit_heartbeat:start(
KeepaliveSup, Sock, 0, SendFun, Keepalive, ReceiveFun),
2014-01-07 01:50:02 +08:00
{noreply, State #state { keepalive = Heartbeater }};
handle_info(keepalive_timeout, State = #state {conn_name = ConnStr,
proc_state = PState}) ->
2014-01-07 01:50:02 +08:00
log(error, "closing MQTT connection ~p (keepalive timeout)~n", [ConnStr]),
send_will_and_terminate(PState, {shutdown, keepalive_timeout}, State);
2014-01-07 01:50:02 +08:00
handle_info(Msg, State) ->
2014-05-21 15:13:12 +08:00
{stop, {mqtt_unexpected_msg, Msg}, State}.
2012-06-27 00:57:24 +08:00
terminate({network_error, {ssl_upgrade_error, closed}, ConnStr}, _State) ->
log(error, "MQTT detected TLS upgrade error on ~s: connection closed~n",
[ConnStr]);
terminate({network_error,
{ssl_upgrade_error,
{tls_alert, "handshake failure"}}, ConnStr}, _State) ->
log(error, "MQTT detected TLS upgrade error on ~s: handshake failure~n",
[ConnStr]);
terminate({network_error,
{ssl_upgrade_error,
{tls_alert, "unknown ca"}}, ConnStr}, _State) ->
log(error, "MQTT detected TLS certificate verification error on ~s: alert 'unknown CA'~n",
[ConnStr]);
terminate({network_error,
{ssl_upgrade_error,
{tls_alert, Alert}}, ConnStr}, _State) ->
log(error, "MQTT detected TLS upgrade error on ~s: alert ~s~n",
[ConnStr, Alert]);
terminate({network_error, {ssl_upgrade_error, Reason}, ConnStr}, _State) ->
log(error, "MQTT detected TLS upgrade error on ~s: ~p~n",
[ConnStr, Reason]);
terminate({network_error, Reason, ConnStr}, _State) ->
log(error, "MQTT detected network error on ~s: ~p~n",
[ConnStr, Reason]);
terminate({network_error, Reason}, _State) ->
log(error, "MQTT detected network error: ~p~n", [Reason]);
2014-07-04 16:10:00 +08:00
terminate(normal, #state{proc_state = ProcState,
conn_name = ConnName}) ->
rabbit_mqtt_processor:close_connection(ProcState),
log(info, "closing MQTT connection ~p (~s)~n", [self(), ConnName]),
ok;
2014-07-04 16:10:00 +08:00
terminate(_Reason, #state{proc_state = ProcState}) ->
2014-05-21 15:13:12 +08:00
rabbit_mqtt_processor:close_connection(ProcState),
2012-08-06 06:52:54 +08:00
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
ssl_login_name(Sock) ->
case rabbit_net:peercert(Sock) of
{ok, C} -> case rabbit_ssl:peer_cert_auth_name(C) of
unsafe -> none;
not_found -> none;
Name -> Name
end;
{error, no_peercert} -> none;
nossl -> none
end.
2012-08-06 06:52:54 +08:00
%%----------------------------------------------------------------------------
2012-06-27 00:57:24 +08:00
2016-01-08 22:21:08 +08:00
log_new_connection(#state{conn_name = ConnStr}) ->
log(info, "accepting MQTT connection ~p (~s)~n", [self(), ConnStr]).
2016-01-08 22:42:00 +08:00
process_received_bytes(<<>>, State = #state{proc_state = ProcState,
received_connect_frame = false}) ->
2016-01-08 22:21:08 +08:00
MqttConn = ProcState#proc_state.connection,
case MqttConn of
undefined -> ok;
_ -> log_new_connection(State)
end,
2016-01-08 22:42:00 +08:00
{noreply, State#state{ received_connect_frame = true }, hibernate};
process_received_bytes(<<>>, State) ->
{noreply, State, hibernate};
process_received_bytes(Bytes,
State = #state{ parse_state = ParseState,
proc_state = ProcState,
conn_name = ConnStr }) ->
2013-02-28 20:27:23 +08:00
case rabbit_mqtt_frame:parse(Bytes, ParseState) of
{more, ParseState1} ->
{noreply,
control_throttle( State #state{ parse_state = ParseState1 }),
hibernate};
{ok, Frame, Rest} ->
case rabbit_mqtt_processor:process_frame(Frame, ProcState) of
{ok, ProcState1} ->
PS = rabbit_mqtt_frame:initial_state(),
process_received_bytes(
Rest,
State #state{ parse_state = PS,
proc_state = ProcState1 });
{error, Reason, ProcState1} ->
2013-02-28 20:27:23 +08:00
log(info, "MQTT protocol error ~p for connection ~p~n",
[Reason, ConnStr]),
2014-05-21 15:13:12 +08:00
{stop, {shutdown, Reason}, pstate(State, ProcState1)};
{error, Error} ->
log(error, "MQTT detected framing error '~p' for connection ~p~n",
[Error, ConnStr]),
{stop, {shutdown, Error}, State};
2013-02-28 20:27:23 +08:00
{stop, ProcState1} ->
2014-05-21 15:13:12 +08:00
{stop, normal, pstate(State, ProcState1)}
2013-02-28 20:27:23 +08:00
end;
{error, Error} ->
log(error, "MQTT detected framing error '~p' for connection ~p~n",
2013-02-28 20:27:23 +08:00
[ConnStr, Error]),
2014-05-21 15:13:12 +08:00
{stop, {shutdown, Error}, State}
2012-06-27 00:57:24 +08:00
end.
callback_reply(State, {ok, ProcState}) ->
{noreply, pstate(State, ProcState), hibernate};
2014-05-25 14:19:01 +08:00
callback_reply(State, {error, Reason, ProcState}) ->
{stop, Reason, pstate(State, ProcState)}.
2014-01-07 01:50:02 +08:00
start_keepalive(_, 0 ) -> ok;
2014-01-20 18:33:27 +08:00
start_keepalive(Pid, Keepalive) -> Pid ! {start_keepalives, Keepalive}.
2014-01-07 01:50:02 +08:00
pstate(State = #state {}, PState = #proc_state{}) ->
State #state{ proc_state = PState }.
2012-06-27 00:57:24 +08:00
%%----------------------------------------------------------------------------
2013-02-28 20:27:23 +08:00
log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args).
send_will_and_terminate(PState, State) ->
send_will_and_terminate(PState, {shutdown, conn_closed}, State).
send_will_and_terminate(PState, Reason, State) ->
rabbit_mqtt_processor:send_will(PState),
% todo: flush channel after publish
{stop, Reason, State}.
network_error(closed,
State = #state{ conn_name = ConnStr,
2016-01-08 22:21:08 +08:00
proc_state = PState }) ->
MqttConn = PState#proc_state.connection,
log(case MqttConn of
undefined -> debug;
_ -> info
end,
"MQTT detected network error for ~p: peer closed TCP connection~n",
[ConnStr]),
send_will_and_terminate(PState, State);
network_error(Reason,
State = #state{ conn_name = ConnStr,
proc_state = PState }) ->
2013-02-27 19:01:09 +08:00
log(info, "MQTT detected network error for ~p: ~p~n", [ConnStr, Reason]),
send_will_and_terminate(PState, State).
2012-06-27 00:57:24 +08:00
run_socket(State = #state{ connection_state = blocked }) ->
2012-08-06 06:52:54 +08:00
State;
run_socket(State = #state{ await_recv = true }) ->
State;
run_socket(State = #state{ socket = Sock }) ->
rabbit_net:async_recv(Sock, 0, infinity),
State#state{ await_recv = true }.
control_throttle(State = #state{ connection_state = Flow,
conserve = Conserve }) ->
2012-08-06 06:52:54 +08:00
case {Flow, Conserve orelse credit_flow:blocked()} of
2014-01-07 01:50:02 +08:00
{running, true} -> ok = rabbit_heartbeat:pause_monitor(
State#state.keepalive),
State #state{ connection_state = blocked };
{blocked, false} -> ok = rabbit_heartbeat:resume_monitor(
State#state.keepalive),
run_socket(State #state{
connection_state = running });
2012-08-06 06:52:54 +08:00
{_, _} -> run_socket(State)
2012-07-17 05:52:53 +08:00
end.