2012-06-27 00:57:24 +08:00
|
|
|
%% The contents of this file are subject to the Mozilla Public License
|
|
|
|
|
%% Version 1.1 (the "License"); you may not use this file except in
|
|
|
|
|
%% compliance with the License. You may obtain a copy of the License
|
|
|
|
|
%% at http://www.mozilla.org/MPL/
|
|
|
|
|
%%
|
|
|
|
|
%% Software distributed under the License is distributed on an "AS IS"
|
|
|
|
|
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
|
|
|
|
|
%% the License for the specific language governing rights and
|
|
|
|
|
%% limitations under the License.
|
|
|
|
|
%%
|
|
|
|
|
%% The Original Code is RabbitMQ.
|
|
|
|
|
%%
|
|
|
|
|
%% The Initial Developer of the Original Code is VMware, Inc.
|
|
|
|
|
%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
|
|
|
|
|
%%
|
|
|
|
|
|
|
|
|
|
-module(rabbit_mqtt_processor).
|
|
|
|
|
-behaviour(gen_server2).
|
|
|
|
|
|
2012-07-03 16:55:31 +08:00
|
|
|
-export([start_link/1, process_frame/2, flush_and_die/1]).
|
2012-06-27 00:57:24 +08:00
|
|
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
|
|
|
|
code_change/3, terminate/2]).
|
|
|
|
|
|
|
|
|
|
-include_lib("amqp_client/include/amqp_client.hrl").
|
2012-07-03 16:55:31 +08:00
|
|
|
-include("include/rabbit_mqtt_frame.hrl").
|
2012-06-27 00:57:24 +08:00
|
|
|
|
2012-07-04 00:35:18 +08:00
|
|
|
-record(state, {
|
|
|
|
|
client_id,
|
2012-07-04 23:31:30 +08:00
|
|
|
message_id,
|
2012-07-03 16:55:31 +08:00
|
|
|
channel,
|
|
|
|
|
connection,
|
|
|
|
|
adapter_info,
|
|
|
|
|
send_fun
|
|
|
|
|
}).
|
2012-06-27 00:57:24 +08:00
|
|
|
|
2012-07-06 01:08:23 +08:00
|
|
|
-define(DEFAULT_EXCHANGE, <<"amq.topic">>).
|
2012-07-03 16:55:31 +08:00
|
|
|
-define(MQTT_PROTOCOL_VERSION, 3).
|
2012-06-27 00:57:24 +08:00
|
|
|
-define(FLUSH_TIMEOUT, 60000).
|
|
|
|
|
|
|
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
%% Public API
|
|
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
start_link(Args) ->
|
|
|
|
|
gen_server2:start_link(?MODULE, Args, []).
|
|
|
|
|
|
2012-07-03 16:55:31 +08:00
|
|
|
process_frame(Pid, Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PUBLISH }} ) ->
|
|
|
|
|
credit_flow:send(Pid),
|
|
|
|
|
gen_server2:cast(Pid, {?PUBLISH, Frame, self()});
|
|
|
|
|
process_frame(Pid, Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }} ) ->
|
|
|
|
|
gen_server2:cast(Pid, {Type, Frame, noflow}).
|
|
|
|
|
|
|
|
|
|
flush_and_die(Pid) ->
|
|
|
|
|
gen_server2:cast(Pid, flush_and_die).
|
|
|
|
|
|
2012-06-27 00:57:24 +08:00
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
%% Basic gen_server2 callbacks
|
|
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
|
2012-07-05 00:47:07 +08:00
|
|
|
init([SendFun, AdapterInfo, _Configuration]) ->
|
2012-06-27 00:57:24 +08:00
|
|
|
process_flag(trap_exit, true),
|
|
|
|
|
{ok,
|
|
|
|
|
#state {
|
2012-07-04 00:35:18 +08:00
|
|
|
client_id = none,
|
2012-07-04 23:31:30 +08:00
|
|
|
message_id = 0,
|
2012-07-03 16:55:31 +08:00
|
|
|
channel = none,
|
|
|
|
|
connection = none,
|
|
|
|
|
adapter_info = AdapterInfo,
|
|
|
|
|
send_fun = SendFun
|
|
|
|
|
},
|
2012-06-27 00:57:24 +08:00
|
|
|
hibernate,
|
|
|
|
|
{backoff, 1000, 1000, 10000}
|
|
|
|
|
}.
|
|
|
|
|
|
2012-07-03 16:55:31 +08:00
|
|
|
terminate(_Reason, State) ->
|
|
|
|
|
close_connection(State).
|
|
|
|
|
|
|
|
|
|
handle_cast(flush_and_die, State) ->
|
|
|
|
|
{stop, normal, close_connection(State)};
|
|
|
|
|
|
|
|
|
|
handle_cast({?CONNECT, Frame, noflow}, State) ->
|
|
|
|
|
process_connect(Frame, State);
|
|
|
|
|
|
|
|
|
|
handle_cast({_Type, Frame, _FlowPid}, State = #state{channel = none}) ->
|
2012-07-04 23:31:30 +08:00
|
|
|
rabbit_log:error("Ignoring invalid MQTT frame prior to CONNECT: ~p~n",
|
|
|
|
|
[Frame]),
|
2012-07-03 16:55:31 +08:00
|
|
|
{noreply, State, hibernate};
|
|
|
|
|
|
2012-07-04 00:35:18 +08:00
|
|
|
handle_cast({Type, Frame, FlowPid}, State) ->
|
2012-07-03 16:55:31 +08:00
|
|
|
case FlowPid of
|
|
|
|
|
noflow -> ok;
|
|
|
|
|
_ -> credit_flow:ack(FlowPid)
|
|
|
|
|
end,
|
2012-07-04 00:35:18 +08:00
|
|
|
process_request(Type, Frame, State);
|
2012-06-27 00:57:24 +08:00
|
|
|
|
2012-07-03 16:55:31 +08:00
|
|
|
handle_cast(client_timeout, State) ->
|
|
|
|
|
{stop, client_timeout, State}.
|
2012-06-27 00:57:24 +08:00
|
|
|
|
2012-07-04 23:31:30 +08:00
|
|
|
handle_info({#'basic.deliver'{delivery_tag = Tag,
|
|
|
|
|
routing_key = RoutingKey },
|
|
|
|
|
#amqp_msg{ payload = Payload }},
|
|
|
|
|
#state { channel = Channel,
|
|
|
|
|
message_id = _MessageId } = State ) ->
|
2012-07-05 00:47:07 +08:00
|
|
|
send_frame(
|
|
|
|
|
#mqtt_frame{ fixed = #mqtt_frame_fixed {type = ?PUBLISH},
|
|
|
|
|
variable = #mqtt_frame_publish {
|
|
|
|
|
topic_name = untranslate_topic( RoutingKey) },
|
|
|
|
|
payload = Payload},
|
|
|
|
|
State),
|
2012-07-04 23:31:30 +08:00
|
|
|
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),
|
|
|
|
|
{noreply, State, hibernate};
|
2012-06-27 00:57:24 +08:00
|
|
|
handle_info(#'basic.consume_ok'{}, State) ->
|
|
|
|
|
{noreply, State, hibernate};
|
|
|
|
|
handle_info(#'basic.cancel_ok'{}, State) ->
|
|
|
|
|
{noreply, State, hibernate};
|
2012-07-03 16:55:31 +08:00
|
|
|
handle_info({'EXIT', Conn, Reason}, State = #state{connection = Conn}) ->
|
|
|
|
|
{stop, {conn_died, Reason}, State};
|
2012-06-27 00:57:24 +08:00
|
|
|
handle_info({inet_reply, _, ok}, State) ->
|
|
|
|
|
{noreply, State, hibernate};
|
2012-07-03 16:55:31 +08:00
|
|
|
handle_info({bump_credit, Msg}, State) ->
|
|
|
|
|
credit_flow:handle_bump_msg(Msg),
|
|
|
|
|
{noreply, State, hibernate};
|
2012-06-27 00:57:24 +08:00
|
|
|
|
|
|
|
|
handle_info({inet_reply, _, Status}, State) ->
|
|
|
|
|
{stop, Status, State}.
|
|
|
|
|
|
2012-07-04 00:35:18 +08:00
|
|
|
process_connect(#mqtt_frame{
|
|
|
|
|
variable = #mqtt_frame_connect { username = Username,
|
|
|
|
|
password = Password,
|
|
|
|
|
proto_ver = ProtoVersion,
|
|
|
|
|
client_id = ClientId }},
|
|
|
|
|
State = #state{channel = none,
|
|
|
|
|
adapter_info = AdapterInfo}) ->
|
|
|
|
|
{ReturnCode, State1} =
|
|
|
|
|
case {ProtoVersion =:= ?MQTT_PROTO_MAJOR, valid_client_id(ClientId)} of
|
|
|
|
|
{false, _} ->
|
|
|
|
|
{?CONNACK_PROTO_VER, State};
|
|
|
|
|
{_, false} ->
|
|
|
|
|
{?CONNACK_INVALID_ID, State};
|
|
|
|
|
_ ->
|
|
|
|
|
{UserBin, Creds} = creds(Username, Password),
|
|
|
|
|
case rabbit_access_control:check_user_login(UserBin, Creds) of
|
|
|
|
|
{ok, _User} ->
|
|
|
|
|
{ok, VHost} = application:get_env(rabbitmq_mqtt, vhost),
|
|
|
|
|
case amqp_connection:start(
|
|
|
|
|
#amqp_params_direct{username = UserBin,
|
|
|
|
|
virtual_host = VHost,
|
|
|
|
|
adapter_info = AdapterInfo}) of
|
|
|
|
|
{ok, Connection} ->
|
|
|
|
|
link(Connection),
|
|
|
|
|
{ok, Channel} =
|
|
|
|
|
amqp_connection:open_channel(Connection),
|
|
|
|
|
ok = ensure_unique_client_id(ClientId),
|
|
|
|
|
{?CONNACK_ACCEPT,
|
|
|
|
|
State#state{connection = Connection,
|
|
|
|
|
channel = Channel,
|
|
|
|
|
client_id = ClientId}};
|
|
|
|
|
{error, auth_failure} ->
|
|
|
|
|
rabbit_log:error("MQTT login failed - " ++
|
|
|
|
|
"auth_failure " ++
|
|
|
|
|
"(user vanished)~n"),
|
|
|
|
|
{?CONNACK_CREDENTIALS, State};
|
|
|
|
|
{error, access_refused} ->
|
|
|
|
|
rabbit_log:warning("MQTT login failed - " ++
|
|
|
|
|
"access_refused " ++
|
|
|
|
|
"(vhost access not allowed)~n"),
|
|
|
|
|
{?CONNACK_AUTH, State}
|
|
|
|
|
end;
|
|
|
|
|
{refused, Msg, Args} ->
|
|
|
|
|
rabbit_log:warning("MQTT login failed: " ++ Msg ++
|
|
|
|
|
"\n", Args),
|
|
|
|
|
{?CONNACK_CREDENTIALS, State}
|
|
|
|
|
end
|
|
|
|
|
end,
|
2012-07-03 16:55:31 +08:00
|
|
|
send_frame(#mqtt_frame{ fixed = #mqtt_frame_fixed {type = ?CONNACK},
|
2012-07-04 23:31:30 +08:00
|
|
|
variable = #mqtt_frame_connack {
|
|
|
|
|
return_code = ReturnCode }}, State),
|
2012-07-04 00:35:18 +08:00
|
|
|
{noreply, State1, hibernate}.
|
|
|
|
|
|
2012-07-04 23:31:30 +08:00
|
|
|
process_request(?PUBLISH,
|
|
|
|
|
#mqtt_frame {
|
|
|
|
|
variable = #mqtt_frame_publish { topic_name = TopicName,
|
2012-07-05 00:47:07 +08:00
|
|
|
message_id = _MessageId },
|
2012-07-04 23:31:30 +08:00
|
|
|
payload = Payload }, #state { channel = Channel } = State) ->
|
2012-07-06 01:08:23 +08:00
|
|
|
Method = #'basic.publish'{ exchange = ?DEFAULT_EXCHANGE,
|
2012-07-04 23:31:30 +08:00
|
|
|
routing_key = translate_topic(TopicName)},
|
2012-07-04 00:35:18 +08:00
|
|
|
amqp_channel:cast(Channel, Method, #amqp_msg{payload = Payload}),
|
2012-07-04 23:31:30 +08:00
|
|
|
{noreply, State, hibernate};
|
|
|
|
|
|
|
|
|
|
process_request(?SUBSCRIBE,
|
|
|
|
|
#mqtt_frame {
|
|
|
|
|
variable = #mqtt_frame_subscribe { message_id = MessageId,
|
2012-07-05 00:47:07 +08:00
|
|
|
topic_table = Topics },
|
2012-07-04 23:31:30 +08:00
|
|
|
payload = undefined }, #state { channel = Channel,
|
|
|
|
|
client_id = ClientId} = State) ->
|
2012-07-05 00:47:07 +08:00
|
|
|
Queue = subcription_queue_name(ClientId),
|
2012-07-04 23:31:30 +08:00
|
|
|
#'queue.declare_ok'{} =
|
|
|
|
|
amqp_channel:call(Channel, #'queue.declare'{ queue = Queue,
|
|
|
|
|
exclusive = false,
|
|
|
|
|
auto_delete = false}),
|
2012-07-05 00:47:07 +08:00
|
|
|
QosResponse =
|
|
|
|
|
[begin
|
|
|
|
|
Binding = #'queue.bind'{queue = Queue,
|
2012-07-06 01:08:23 +08:00
|
|
|
exchange = ?DEFAULT_EXCHANGE,
|
2012-07-05 00:47:07 +08:00
|
|
|
routing_key = translate_topic(TopicName)},
|
|
|
|
|
#'queue.bind_ok'{} = amqp_channel:call(Channel, Binding),
|
|
|
|
|
?QOS_0
|
|
|
|
|
end || #mqtt_topic { name = TopicName } <- Topics ],
|
2012-07-04 23:31:30 +08:00
|
|
|
Method = #'basic.consume'{queue = Queue},
|
2012-07-05 00:47:07 +08:00
|
|
|
#'basic.consume_ok'{consumer_tag = _Tag} = amqp_channel:call(Channel, Method),
|
2012-07-04 23:31:30 +08:00
|
|
|
send_frame(#mqtt_frame{ fixed = #mqtt_frame_fixed {type = ?SUBACK},
|
|
|
|
|
variable = #mqtt_frame_suback {
|
|
|
|
|
message_id = MessageId,
|
2012-07-05 00:47:07 +08:00
|
|
|
qos_table = QosResponse }}, State),
|
|
|
|
|
{noreply, State, hibernate};
|
|
|
|
|
|
|
|
|
|
process_request(?UNSUBSCRIBE,
|
|
|
|
|
#mqtt_frame {
|
|
|
|
|
variable = #mqtt_frame_subscribe { message_id = MessageId,
|
|
|
|
|
topic_table = Topics },
|
|
|
|
|
payload = undefined }, #state { channel = Channel,
|
|
|
|
|
client_id = ClientId} = State) ->
|
|
|
|
|
Queue = subcription_queue_name(ClientId),
|
|
|
|
|
[begin
|
|
|
|
|
Binding = #'queue.unbind'{queue = Queue,
|
2012-07-06 01:08:23 +08:00
|
|
|
exchange = ?DEFAULT_EXCHANGE,
|
2012-07-05 00:47:07 +08:00
|
|
|
routing_key = translate_topic(TopicName)},
|
|
|
|
|
#'queue.unbind_ok'{} = amqp_channel:call(Channel, Binding)
|
|
|
|
|
end || #mqtt_topic { name = TopicName } <- Topics ],
|
|
|
|
|
send_frame(#mqtt_frame{ fixed = #mqtt_frame_fixed {type = ?UNSUBACK},
|
|
|
|
|
variable = #mqtt_frame_suback {
|
|
|
|
|
message_id = MessageId }}, State),
|
2012-07-04 23:31:30 +08:00
|
|
|
{noreply, State, hibernate};
|
|
|
|
|
|
|
|
|
|
process_request(?PINGREQ, _, State) ->
|
|
|
|
|
send_frame(#mqtt_frame{ fixed = #mqtt_frame_fixed {type = ?PINGRESP}}, State),
|
2012-07-06 01:08:23 +08:00
|
|
|
{noreply, State, hibernate};
|
|
|
|
|
|
|
|
|
|
process_request(?DISCONNECT, _, State) ->
|
|
|
|
|
{stop, normal, close_connection(State)}.
|
2012-07-03 16:55:31 +08:00
|
|
|
|
2012-07-05 00:47:07 +08:00
|
|
|
subcription_queue_name(ClientId) ->
|
|
|
|
|
list_to_binary("MQTT_subscription_" ++ ClientId).
|
2012-07-04 23:31:30 +08:00
|
|
|
|
|
|
|
|
%% amqp mqtt descr
|
|
|
|
|
%% * + match one topic level
|
|
|
|
|
%% # # match multiple topic levels
|
|
|
|
|
%% . / topic level separator
|
|
|
|
|
translate_topic(Topic) ->
|
|
|
|
|
erlang:iolist_to_binary(
|
|
|
|
|
re:replace(re:replace(Topic, "/", ".", [global]),
|
|
|
|
|
"[\+]", "*", [global])).
|
|
|
|
|
|
2012-07-05 00:47:07 +08:00
|
|
|
untranslate_topic(Topic) ->
|
|
|
|
|
erlang:iolist_to_binary(
|
|
|
|
|
re:replace(re:replace(Topic, "[\*]", "+", [global]),
|
|
|
|
|
"[\.]", "/", [global])).
|
|
|
|
|
|
2012-07-04 00:35:18 +08:00
|
|
|
valid_client_id(ClientId) ->
|
|
|
|
|
ClientIdLen = length(ClientId),
|
|
|
|
|
1 =< ClientIdLen andalso ClientIdLen =< 23.
|
|
|
|
|
|
2012-07-05 00:47:07 +08:00
|
|
|
ensure_unique_client_id(_ClientId) ->
|
2012-07-04 00:35:18 +08:00
|
|
|
%% todo spec section 3.1:
|
|
|
|
|
%% If a client with the same Client ID is already connected to the server,
|
|
|
|
|
%% the "older" client must be disconnected by the server before completing
|
|
|
|
|
%% the CONNECT flow of the new client.
|
|
|
|
|
ok.
|
|
|
|
|
|
|
|
|
|
creds(Username, Password) ->
|
|
|
|
|
{ok, DefaultUser} = application:get_env(rabbitmq_mqtt, default_user),
|
|
|
|
|
{ok, DefaultPass} = application:get_env(rabbitmq_mqtt, default_pass),
|
|
|
|
|
U = case Username of
|
|
|
|
|
undefined -> DefaultUser;
|
|
|
|
|
_ -> list_to_binary(Username)
|
|
|
|
|
end,
|
|
|
|
|
P = case Password of
|
|
|
|
|
undefined -> DefaultPass;
|
|
|
|
|
_ -> list_to_binary(Password)
|
|
|
|
|
end,
|
|
|
|
|
{U, [{password, P}]}.
|
|
|
|
|
|
2012-07-03 16:55:31 +08:00
|
|
|
send_frame(Frame, State = #state{send_fun = SendFun}) ->
|
|
|
|
|
SendFun(async, rabbit_mqtt_frame:serialise(Frame)),
|
|
|
|
|
State.
|
|
|
|
|
|
|
|
|
|
%% Closing the connection will close the channel and subchannels
|
|
|
|
|
close_connection(State = #state{connection = Connection}) ->
|
|
|
|
|
%% ignore noproc or other exceptions to avoid debris
|
|
|
|
|
catch amqp_connection:close(Connection),
|
|
|
|
|
State#state{channel = none, connection = none}.
|
2012-06-27 00:57:24 +08:00
|
|
|
|
|
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
%% Skeleton gen_server2 callbacks
|
|
|
|
|
%%----------------------------------------------------------------------------
|
|
|
|
|
handle_call(_Msg, _From, State) ->
|
|
|
|
|
{noreply, State}.
|
|
|
|
|
|
|
|
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
|
|
|
{ok, State}.
|
2012-07-03 16:55:31 +08:00
|
|
|
|