2012-06-27 00:57:24 +08:00
|
|
|
%% The contents of this file are subject to the Mozilla Public License
|
|
|
|
|
%% Version 1.1 (the "License"); you may not use this file except in
|
|
|
|
|
%% compliance with the License. You may obtain a copy of the License
|
|
|
|
|
%% at http://www.mozilla.org/MPL/
|
|
|
|
|
%%
|
|
|
|
|
%% Software distributed under the License is distributed on an "AS IS"
|
|
|
|
|
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
|
|
|
|
|
%% the License for the specific language governing rights and
|
|
|
|
|
%% limitations under the License.
|
|
|
|
|
%%
|
|
|
|
|
%% The Original Code is RabbitMQ.
|
|
|
|
|
%%
|
|
|
|
|
%% The Initial Developer of the Original Code is VMware, Inc.
|
|
|
|
|
%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
|
|
|
|
|
%%
|
|
|
|
|
|
|
|
|
|
-module(rabbit_mqtt_processor).
|
|
|
|
|
|
2012-07-13 23:32:13 +08:00
|
|
|
-export([process_frame/2]).
|
2012-06-27 00:57:24 +08:00
|
|
|
|
|
|
|
|
-include_lib("amqp_client/include/amqp_client.hrl").
|
2012-07-03 16:55:31 +08:00
|
|
|
-include("include/rabbit_mqtt_frame.hrl").
|
2012-07-13 23:32:13 +08:00
|
|
|
-include("include/rabbit_mqtt.hrl").
|
|
|
|
|
|
|
|
|
|
-define(FRAME_TYPE(Frame, Type),
|
|
|
|
|
Frame = #mqtt_frame{fixed = #mqtt_frame_fixed { type = Type }}).
|
|
|
|
|
|
|
|
|
|
process_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }},
|
|
|
|
|
State ) ->
|
|
|
|
|
%rabbit_log:error("received frame ~p ~n", [Frame]),
|
|
|
|
|
process_request(Type, Frame, State).
|
|
|
|
|
|
|
|
|
|
process_request(?CONNECT,
|
2012-07-16 21:57:31 +08:00
|
|
|
#mqtt_frame{ variable = #mqtt_frame_connect {
|
|
|
|
|
username = Username,
|
|
|
|
|
password = Password,
|
|
|
|
|
proto_ver = ProtoVersion,
|
|
|
|
|
clean_sess = CleanSess,
|
|
|
|
|
client_id = ClientId }}, State) ->
|
2012-07-04 00:35:18 +08:00
|
|
|
{ReturnCode, State1} =
|
2012-07-13 23:32:13 +08:00
|
|
|
case {ProtoVersion =:= ?MQTT_PROTO_MAJOR,
|
|
|
|
|
rabbit_mqtt_util:valid_client_id(ClientId)} of
|
2012-07-04 00:35:18 +08:00
|
|
|
{false, _} ->
|
|
|
|
|
{?CONNACK_PROTO_VER, State};
|
|
|
|
|
{_, false} ->
|
|
|
|
|
{?CONNACK_INVALID_ID, State};
|
|
|
|
|
_ ->
|
2012-07-16 21:57:31 +08:00
|
|
|
case creds(Username, Password) of
|
|
|
|
|
nocreds ->
|
|
|
|
|
rabbit_log:error("MQTT login failed - no credentials~n"),
|
|
|
|
|
{?CONNACK_CREDENTIALS, State};
|
|
|
|
|
{UserBin, Creds} ->
|
|
|
|
|
case process_login(UserBin, Creds, State) of
|
|
|
|
|
{?CONNACK_ACCEPT, Conn} ->
|
|
|
|
|
link(Conn),
|
|
|
|
|
maybe_clean_sess(CleanSess, Conn, ClientId),
|
|
|
|
|
{ok, Ch} = amqp_connection:open_channel(Conn),
|
|
|
|
|
ok = ensure_unique_client_id(ClientId),
|
|
|
|
|
{?CONNACK_ACCEPT,
|
|
|
|
|
State #state { clean_sess = CleanSess,
|
|
|
|
|
channel = Ch,
|
|
|
|
|
connection = Conn,
|
|
|
|
|
client_id = ClientId }};
|
|
|
|
|
ConnAck -> {ConnAck, State}
|
|
|
|
|
end
|
2012-07-04 00:35:18 +08:00
|
|
|
end
|
|
|
|
|
end,
|
2012-07-16 21:57:31 +08:00
|
|
|
send_frame(#mqtt_frame{ fixed = #mqtt_frame_fixed { type = ?CONNACK},
|
2012-07-04 23:31:30 +08:00
|
|
|
variable = #mqtt_frame_connack {
|
2012-07-13 23:32:13 +08:00
|
|
|
return_code = ReturnCode }}, State1),
|
|
|
|
|
{ok, State1};
|
2012-07-04 00:35:18 +08:00
|
|
|
|
2012-07-04 23:31:30 +08:00
|
|
|
process_request(?PUBLISH,
|
|
|
|
|
#mqtt_frame {
|
2012-07-16 21:57:31 +08:00
|
|
|
fixed = #mqtt_frame_fixed { qos = Qos,
|
|
|
|
|
dup = Dup },
|
2012-07-04 23:31:30 +08:00
|
|
|
variable = #mqtt_frame_publish { topic_name = TopicName,
|
2012-07-16 21:57:31 +08:00
|
|
|
message_id = MessageId },
|
|
|
|
|
payload = Payload },
|
|
|
|
|
#state { channel = Channel,
|
|
|
|
|
confirms = Confirms,
|
|
|
|
|
unacked_pubs = UnackedPubs } = State) ->
|
|
|
|
|
State1 = case not Confirms andalso Qos =:= 1 of
|
|
|
|
|
true -> #'confirm.select_ok'{} =
|
|
|
|
|
amqp_channel:call(Channel, #'confirm.select'{}),
|
|
|
|
|
amqp_channel:register_confirm_handler(Channel, self()),
|
|
|
|
|
State #state { confirms = true };
|
|
|
|
|
_ -> State
|
|
|
|
|
end,
|
|
|
|
|
SeqNo = amqp_channel:next_publish_seqno(Channel),
|
2012-07-06 01:08:23 +08:00
|
|
|
Method = #'basic.publish'{ exchange = ?DEFAULT_EXCHANGE,
|
2012-07-13 23:32:13 +08:00
|
|
|
routing_key =
|
|
|
|
|
rabbit_mqtt_util:translate_topic(TopicName)},
|
2012-07-04 00:35:18 +08:00
|
|
|
amqp_channel:cast(Channel, Method, #amqp_msg{payload = Payload}),
|
2012-07-16 21:57:31 +08:00
|
|
|
{ok, State1 #state { unacked_pubs =
|
|
|
|
|
case Qos of
|
|
|
|
|
0 -> UnackedPubs;
|
|
|
|
|
1 -> queue:in({SeqNo, MessageId}, UnackedPubs)
|
|
|
|
|
end }};
|
2012-07-04 23:31:30 +08:00
|
|
|
|
|
|
|
|
process_request(?SUBSCRIBE,
|
|
|
|
|
#mqtt_frame {
|
2012-07-13 23:32:13 +08:00
|
|
|
variable = #mqtt_frame_subscribe { message_id = MessageId,
|
2012-07-05 00:47:07 +08:00
|
|
|
topic_table = Topics },
|
2012-07-04 23:31:30 +08:00
|
|
|
payload = undefined }, #state { channel = Channel,
|
2012-07-16 21:57:31 +08:00
|
|
|
client_id = ClientId,
|
|
|
|
|
consumer_tag = Tag0} = State) ->
|
2012-07-13 23:32:13 +08:00
|
|
|
Queue = rabbit_mqtt_util:subcription_queue_name(ClientId),
|
2012-07-16 21:57:31 +08:00
|
|
|
Tag1 = case Tag0 of
|
|
|
|
|
undefined ->
|
|
|
|
|
#'queue.declare_ok'{} =
|
|
|
|
|
amqp_channel:call(Channel, #'queue.declare'{
|
|
|
|
|
queue = Queue }),
|
|
|
|
|
Method = #'basic.consume'{ queue = Queue },
|
|
|
|
|
#'basic.consume_ok'{ consumer_tag = Tag } =
|
|
|
|
|
amqp_channel:call(Channel, Method),
|
|
|
|
|
Tag;
|
|
|
|
|
_ -> Tag0
|
|
|
|
|
end,
|
2012-07-05 00:47:07 +08:00
|
|
|
QosResponse =
|
2012-07-13 23:32:13 +08:00
|
|
|
[begin
|
|
|
|
|
Binding = #'queue.bind'{
|
|
|
|
|
queue = Queue,
|
|
|
|
|
exchange = ?DEFAULT_EXCHANGE,
|
|
|
|
|
routing_key = rabbit_mqtt_util:translate_topic(TopicName)},
|
|
|
|
|
#'queue.bind_ok'{} = amqp_channel:call(Channel, Binding),
|
|
|
|
|
?QOS_0
|
|
|
|
|
end || #mqtt_topic { name = TopicName } <- Topics ],
|
|
|
|
|
send_frame(#mqtt_frame{ fixed = #mqtt_frame_fixed { type = ?SUBACK },
|
2012-07-04 23:31:30 +08:00
|
|
|
variable = #mqtt_frame_suback {
|
|
|
|
|
message_id = MessageId,
|
2012-07-13 23:32:13 +08:00
|
|
|
qos_table = QosResponse }}, State),
|
2012-07-16 21:57:31 +08:00
|
|
|
{ok, State #state { consumer_tag = Tag1 }};
|
2012-07-05 00:47:07 +08:00
|
|
|
|
|
|
|
|
process_request(?UNSUBSCRIBE,
|
|
|
|
|
#mqtt_frame {
|
2012-07-13 23:32:13 +08:00
|
|
|
variable = #mqtt_frame_subscribe { message_id = MessageId,
|
2012-07-05 00:47:07 +08:00
|
|
|
topic_table = Topics },
|
2012-07-13 23:32:13 +08:00
|
|
|
payload = undefined }, #state { channel = Channel,
|
2012-07-05 00:47:07 +08:00
|
|
|
client_id = ClientId} = State) ->
|
2012-07-13 23:32:13 +08:00
|
|
|
Queue = rabbit_mqtt_util:subcription_queue_name(ClientId),
|
2012-07-05 00:47:07 +08:00
|
|
|
[begin
|
|
|
|
|
Binding = #'queue.unbind'{queue = Queue,
|
2012-07-06 01:08:23 +08:00
|
|
|
exchange = ?DEFAULT_EXCHANGE,
|
2012-07-13 23:32:13 +08:00
|
|
|
routing_key = rabbit_mqtt_util:translate_topic(TopicName)},
|
2012-07-05 00:47:07 +08:00
|
|
|
#'queue.unbind_ok'{} = amqp_channel:call(Channel, Binding)
|
|
|
|
|
end || #mqtt_topic { name = TopicName } <- Topics ],
|
2012-07-13 23:32:13 +08:00
|
|
|
send_frame(#mqtt_frame{ fixed = #mqtt_frame_fixed {type = ?UNSUBACK},
|
|
|
|
|
variable = #mqtt_frame_suback {message_id = MessageId }}, State),
|
|
|
|
|
{ok, State};
|
2012-07-03 16:55:31 +08:00
|
|
|
|
2012-07-13 23:32:13 +08:00
|
|
|
process_request(?PINGREQ, #mqtt_frame{}, State) ->
|
|
|
|
|
send_frame(#mqtt_frame { fixed = #mqtt_frame_fixed { type = ?PINGRESP }},
|
|
|
|
|
State),
|
|
|
|
|
{ok, State};
|
2012-07-04 23:31:30 +08:00
|
|
|
|
2012-07-13 23:32:13 +08:00
|
|
|
process_request(?DISCONNECT, #mqtt_frame {}, State) ->
|
|
|
|
|
{stop, normal, State}.
|
2012-07-04 00:35:18 +08:00
|
|
|
|
2012-07-16 21:57:31 +08:00
|
|
|
maybe_clean_sess(false, _Conn, _ClientId) ->
|
|
|
|
|
ok;
|
|
|
|
|
maybe_clean_sess(true, Conn, ClientId) ->
|
|
|
|
|
Queue = rabbit_mqtt_util:subcription_queue_name(ClientId),
|
|
|
|
|
{ok, Channel} = amqp_connection:open_channel(Conn),
|
|
|
|
|
try amqp_channel:call(Channel, #'queue.declare'{ queue = Queue,
|
|
|
|
|
passive = true }) of
|
|
|
|
|
#'queue.declare_ok'{} -> #'queue.delete_ok'{} =
|
|
|
|
|
amqp_channel:call(Channel,
|
|
|
|
|
#'queue.delete'{
|
|
|
|
|
queue = Queue }),
|
|
|
|
|
ok = amqp_channel:close(Channel)
|
|
|
|
|
catch
|
|
|
|
|
exit:Reason -> ok
|
|
|
|
|
end.
|
|
|
|
|
|
2012-07-05 00:47:07 +08:00
|
|
|
ensure_unique_client_id(_ClientId) ->
|
2012-07-04 00:35:18 +08:00
|
|
|
%% todo spec section 3.1:
|
|
|
|
|
%% If a client with the same Client ID is already connected to the server,
|
|
|
|
|
%% the "older" client must be disconnected by the server before completing
|
|
|
|
|
%% the CONNECT flow of the new client.
|
|
|
|
|
ok.
|
|
|
|
|
|
2012-07-16 21:57:31 +08:00
|
|
|
process_login(UserBin, Creds, State = #state{ channel = undefined,
|
|
|
|
|
adapter_info = AdapterInfo }) ->
|
|
|
|
|
case rabbit_access_control:check_user_login(UserBin, Creds) of
|
|
|
|
|
{ok, _User} ->
|
|
|
|
|
{ok, VHost} = application:get_env(rabbitmq_mqtt, vhost),
|
|
|
|
|
case amqp_connection:start(
|
|
|
|
|
#amqp_params_direct{username = UserBin,
|
|
|
|
|
virtual_host = VHost,
|
|
|
|
|
adapter_info = AdapterInfo}) of
|
|
|
|
|
{ok, Connection} ->
|
|
|
|
|
{?CONNACK_ACCEPT, Connection};
|
|
|
|
|
{error, auth_failure} ->
|
|
|
|
|
rabbit_log:error("MQTT login failed - " ++
|
|
|
|
|
"auth_failure " ++
|
|
|
|
|
"(user vanished)~n"),
|
|
|
|
|
?CONNACK_CREDENTIALS;
|
|
|
|
|
{error, access_refused} ->
|
|
|
|
|
rabbit_log:warning("MQTT login failed - " ++
|
|
|
|
|
"access_refused " ++
|
|
|
|
|
"(vhost access not allowed)~n"),
|
|
|
|
|
?CONNACK_AUTH
|
|
|
|
|
end;
|
|
|
|
|
{refused, Msg, Args} ->
|
|
|
|
|
rabbit_log:warning("MQTT login failed: " ++ Msg ++
|
|
|
|
|
"\n", Args),
|
|
|
|
|
?CONNACK_CREDENTIALS
|
|
|
|
|
end.
|
|
|
|
|
|
|
|
|
|
creds(User, Pass) ->
|
|
|
|
|
DefaultUser = rabbit_mqtt_util:env(default_user),
|
|
|
|
|
DefaultPass = rabbit_mqtt_util:env(default_pass),
|
|
|
|
|
Anon = rabbit_mqtt_util:env(allow_anonymous),
|
|
|
|
|
U = case {User =/= undefined, is_binary(DefaultUser), Anon =:= true} of
|
|
|
|
|
{true, _, _ } -> list_to_binary(User);
|
|
|
|
|
{false, true, true} -> DefaultUser;
|
|
|
|
|
_ -> nocreds
|
2012-07-04 00:35:18 +08:00
|
|
|
end,
|
2012-07-16 21:57:31 +08:00
|
|
|
case U of
|
|
|
|
|
nocreds ->
|
|
|
|
|
nocreds;
|
|
|
|
|
_ ->
|
|
|
|
|
case {Pass =/= undefined, is_binary(DefaultPass), Anon =:= true} of
|
|
|
|
|
{true, _, _ } -> {U, [{password, list_to_binary(Pass)}]};
|
|
|
|
|
{false, true, true} -> {U, [{password, DefaultPass}]};
|
|
|
|
|
_ -> {U, []}
|
|
|
|
|
end
|
|
|
|
|
end.
|
2012-07-04 00:35:18 +08:00
|
|
|
|
2012-07-16 21:57:31 +08:00
|
|
|
send_frame(Frame, #state{ socket = Sock }) ->
|
2012-07-13 23:32:13 +08:00
|
|
|
rabbit_mqtt_reader:send_frame(Sock, Frame).
|