| 
									
										
										
										
											2012-06-27 00:57:24 +08:00
										 |  |  | %% The contents of this file are subject to the Mozilla Public License
 | 
					
						
							|  |  |  | %% Version 1.1 (the "License"); you may not use this file except in
 | 
					
						
							|  |  |  | %% compliance with the License. You may obtain a copy of the License
 | 
					
						
							|  |  |  | %% at http://www.mozilla.org/MPL/
 | 
					
						
							|  |  |  | %%
 | 
					
						
							|  |  |  | %% Software distributed under the License is distributed on an "AS IS"
 | 
					
						
							|  |  |  | %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
 | 
					
						
							|  |  |  | %% the License for the specific language governing rights and
 | 
					
						
							|  |  |  | %% limitations under the License.
 | 
					
						
							|  |  |  | %%
 | 
					
						
							|  |  |  | %% The Original Code is RabbitMQ.
 | 
					
						
							|  |  |  | %%
 | 
					
						
							| 
									
										
										
										
											2013-07-01 17:49:14 +08:00
										 |  |  | %% The Initial Developer of the Original Code is GoPivotal, Inc.
 | 
					
						
							| 
									
										
										
										
											2016-01-01 17:59:18 +08:00
										 |  |  | %% Copyright (c) 2007-2016 Pivotal Software, Inc.  All rights reserved.
 | 
					
						
							| 
									
										
										
										
											2012-06-27 00:57:24 +08:00
										 |  |  | %%
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | -module(rabbit_mqtt_processor). | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-01-07 23:19:01 +08:00
										 |  |  | -export([info/2, initial_state/2, initial_state/3, | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |          process_frame/2, amqp_pub/2, amqp_callback/2, send_will/1, | 
					
						
							|  |  |  |          close_connection/1]). | 
					
						
							| 
									
										
										
										
											2012-06-27 00:57:24 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-11-19 01:38:35 +08:00
										 |  |  | %% for testing purposes
 | 
					
						
							|  |  |  | -export([get_vhost_username/1]). | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-06-27 00:57:24 +08:00
										 |  |  | -include_lib("amqp_client/include/amqp_client.hrl"). | 
					
						
							| 
									
										
										
										
											2012-09-12 21:34:41 +08:00
										 |  |  | -include("rabbit_mqtt_frame.hrl"). | 
					
						
							|  |  |  | -include("rabbit_mqtt.hrl"). | 
					
						
							| 
									
										
										
										
											2012-07-13 23:32:13 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-11-28 23:08:42 +08:00
										 |  |  | -define(APP, rabbitmq_mqtt). | 
					
						
							| 
									
										
										
										
											2012-07-13 23:32:13 +08:00
%% Pattern helper: expands to a match that binds Frame to an #mqtt_frame{}
%% whose fixed header carries the given packet Type.
-define(FRAME_TYPE(Frame, Type),
        Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}).
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-01-08 07:25:26 +08:00
										 |  |  | initial_state(Socket, SSLLoginName) -> | 
					
						
							| 
									
										
										
										
											2016-01-07 23:19:01 +08:00
										 |  |  |     initial_state(Socket, SSLLoginName, fun send_client/2). | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-01-08 07:25:26 +08:00
										 |  |  | initial_state(Socket, SSLLoginName, SendFun) -> | 
					
						
							| 
									
										
										
										
											2016-01-07 23:19:01 +08:00
										 |  |  |     #proc_state{ unacked_pubs   = gb_trees:empty(), | 
					
						
							|  |  |  |                  awaiting_ack   = gb_trees:empty(), | 
					
						
							|  |  |  |                  message_id     = 1, | 
					
						
							|  |  |  |                  subscriptions  = dict:new(), | 
					
						
							|  |  |  |                  consumer_tags  = {undefined, undefined}, | 
					
						
							|  |  |  |                  channels       = {undefined, undefined}, | 
					
						
							|  |  |  |                  exchange       = rabbit_mqtt_util:env(exchange), | 
					
						
							|  |  |  |                  socket         = Socket, | 
					
						
							|  |  |  |                  ssl_login_name = SSLLoginName, | 
					
						
							|  |  |  |                  send_fun       = SendFun }. | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | info(client_id, #proc_state{ client_id = ClientId }) -> ClientId. | 
					
						
							| 
									
										
										
										
											2012-07-13 23:32:13 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  | process_frame(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}, | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |               PState = #proc_state{ connection = undefined } ) | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |   when Type =/= ?CONNECT -> | 
					
						
							| 
									
										
										
										
											2014-07-04 01:36:17 +08:00
										 |  |  |     {error, connect_expected, PState}; | 
					
						
							| 
									
										
										
										
											2012-07-13 23:32:13 +08:00
										 |  |  | process_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}, | 
					
						
							| 
									
										
										
										
											2014-11-28 22:40:16 +08:00
										 |  |  |               PState) -> | 
					
						
							|  |  |  |     process_request(Type, Frame, PState). | 
					
						
							| 
									
										
										
										
											2012-07-13 23:32:13 +08:00
										 |  |  | 
 | 
					
						
%% CONNECT: validates the protocol version and client id, resolves
%% credentials and logs in. On success it opens an AMQP channel and records
%% the session details in the processor state.
%%
%% A CONNACK carrying the resulting return code is always sent through
%% SendFun, and the clause always returns {ok, PState1} - also when the
%% return code is a rejection (PState1 is then the unchanged input state).
process_request(?CONNECT,
                #mqtt_frame{ variable = #mqtt_frame_connect{
                                           username   = Username,
                                           password   = Password,
                                           proto_ver  = ProtoVersion,
                                           clean_sess = CleanSess,
                                           client_id  = ClientId0,
                                           keep_alive = Keepalive} = Var},
                PState = #proc_state{ ssl_login_name = SSLLoginName,
                                      send_fun = SendFun }) ->
    %% an empty client id is replaced by a generated one
    ClientId = case ClientId0 of
                   []    -> rabbit_mqtt_util:gen_client_id();
                   [_|_] -> ClientId0
               end,
    {ReturnCode, PState1} =
        %% first element: is the protocol version one we know about?
        %% second element: empty client id combined with clean_sess = false,
        %% which is rejected as an invalid id
        case {lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)),
              ClientId0 =:= [] andalso CleanSess =:= false} of
            {false, _} ->
                {?CONNACK_PROTO_VER, PState};
            {_, true} ->
                {?CONNACK_INVALID_ID, PState};
            _ ->
                case creds(Username, Password, SSLLoginName) of
                    nocreds ->
                        rabbit_log:error("MQTT login failed - no credentials~n"),
                        {?CONNACK_CREDENTIALS, PState};
                    {UserBin, PassBin} ->
                        case process_login(UserBin, PassBin, ProtoVersion, PState) of
                            {?CONNACK_ACCEPT, Conn, VHost, AState} ->
                                 RetainerPid =
                                   rabbit_mqtt_retainer_sup:child_for_vhost(VHost),
                                %% link this process to the connection and
                                %% to the channel opened on it
                                link(Conn),
                                {ok, Ch} = amqp_connection:open_channel(Conn),
                                link(Ch),
                                amqp_channel:enable_delivery_flow_control(Ch),
                                %% register this process under the client id
                                ok = rabbit_mqtt_collector:register(
                                  ClientId, self()),
                                %% apply the configured prefetch count to
                                %% the channel
                                Prefetch = rabbit_mqtt_util:env(prefetch),
                                #'basic.qos_ok'{} = amqp_channel:call(
                                  Ch, #'basic.qos'{prefetch_count = Prefetch}),
                                rabbit_mqtt_reader:start_keepalive(self(), Keepalive),
                                {?CONNACK_ACCEPT,
                                 maybe_clean_sess(
                                   PState #proc_state{ will_msg   = make_will_msg(Var),
                                                       clean_sess = CleanSess,
                                                       channels   = {Ch, undefined},
                                                       connection = Conn,
                                                       client_id  = ClientId,
                                                       retainer_pid = RetainerPid,
                                                       auth_state = AState})};
                            %% any other result from process_login/4 is used
                            %% directly as the CONNACK return code
                            ConnAck ->
                                {ConnAck, PState}
                        end
                end
        end,
    SendFun(#mqtt_frame{ fixed    = #mqtt_frame_fixed{ type = ?CONNACK},
                         variable = #mqtt_frame_connack{
                                     return_code = ReturnCode }}, PState1),
    {ok, PState1};
					
						
							| 
									
										
										
										
											2012-07-04 00:35:18 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  | process_request(?PUBACK, | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |                 #mqtt_frame{ | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |                   variable = #mqtt_frame_publish{ message_id = MessageId }}, | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |                 #proc_state{ channels     = {Channel, _}, | 
					
						
							|  |  |  |                              awaiting_ack = Awaiting } = PState) -> | 
					
						
							| 
									
										
										
										
											2015-04-26 09:51:04 +08:00
										 |  |  |     %% tag can be missing because of bogus clients and QoS downgrades
 | 
					
						
							|  |  |  |     case gb_trees:is_defined(MessageId, Awaiting) of | 
					
						
							|  |  |  |       false -> | 
					
						
							|  |  |  |         {ok, PState}; | 
					
						
							|  |  |  |       true -> | 
					
						
							|  |  |  |         Tag = gb_trees:get(MessageId, Awaiting), | 
					
						
							|  |  |  |         amqp_channel:cast(Channel, #'basic.ack'{ delivery_tag = Tag }), | 
					
						
							|  |  |  |         {ok, PState #proc_state{ awaiting_ack = gb_trees:delete( MessageId, Awaiting)}} | 
					
						
							|  |  |  |     end; | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | process_request(?PUBLISH, | 
					
						
							|  |  |  |                 #mqtt_frame{ | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |                   fixed = #mqtt_frame_fixed{ qos = ?QOS_2 }}, PState) -> | 
					
						
							| 
									
										
										
										
											2014-07-04 01:36:17 +08:00
										 |  |  |     {error, qos2_not_supported, PState}; | 
					
						
							| 
									
										
										
										
											2012-07-04 23:31:30 +08:00
										 |  |  | process_request(?PUBLISH, | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |                 #mqtt_frame{ | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |                   fixed = #mqtt_frame_fixed{ qos    = Qos, | 
					
						
							|  |  |  |                                              retain = Retain, | 
					
						
							|  |  |  |                                              dup    = Dup }, | 
					
						
							|  |  |  |                   variable = #mqtt_frame_publish{ topic_name = Topic, | 
					
						
							|  |  |  |                                                   message_id = MessageId }, | 
					
						
							| 
									
										
										
										
											2015-04-18 08:55:34 +08:00
										 |  |  |                   payload = Payload }, | 
					
						
							|  |  |  |                   PState = #proc_state{retainer_pid = RPid}) -> | 
					
						
							| 
									
										
										
										
											2015-06-30 16:08:26 +08:00
										 |  |  |     check_publish_or_die(Topic, fun() -> | 
					
						
							|  |  |  |         Msg = #mqtt_msg{retain     = Retain, | 
					
						
							|  |  |  |                         qos        = Qos, | 
					
						
							|  |  |  |                         topic      = Topic, | 
					
						
							|  |  |  |                         dup        = Dup, | 
					
						
							|  |  |  |                         message_id = MessageId, | 
					
						
							|  |  |  |                         payload    = Payload}, | 
					
						
							|  |  |  |         Result = amqp_pub(Msg, PState), | 
					
						
							|  |  |  |         case Retain of | 
					
						
							|  |  |  |           false -> ok; | 
					
						
							|  |  |  |           true  -> hand_off_to_retainer(RPid, Topic, Msg) | 
					
						
							|  |  |  |         end, | 
					
						
							|  |  |  |         {ok, Result} | 
					
						
							|  |  |  |     end, PState); | 
					
						
							| 
									
										
										
										
											2012-07-04 23:31:30 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | process_request(?SUBSCRIBE, | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |                 #mqtt_frame{ | 
					
						
							| 
									
										
										
										
											2015-04-21 19:26:46 +08:00
										 |  |  |                   variable = #mqtt_frame_subscribe{ | 
					
						
							|  |  |  |                               message_id  = MessageId, | 
					
						
							|  |  |  |                               topic_table = Topics}, | 
					
						
							|  |  |  |                   payload = undefined}, | 
					
						
							|  |  |  |                 #proc_state{channels = {Channel, _}, | 
					
						
							|  |  |  |                             exchange = Exchange, | 
					
						
							| 
									
										
										
										
											2016-01-07 23:19:01 +08:00
										 |  |  |                             retainer_pid = RPid, | 
					
						
							|  |  |  |                             send_fun = SendFun } = PState0) -> | 
					
						
							| 
									
										
										
										
											2015-06-30 16:08:26 +08:00
										 |  |  |     check_subscribe_or_die(Topics, fun() -> | 
					
						
							|  |  |  |         {QosResponse, PState1} = | 
					
						
							|  |  |  |             lists:foldl(fun (#mqtt_topic{name = TopicName, | 
					
						
							|  |  |  |                                          qos  = Qos}, {QosList, PState}) -> | 
					
						
							|  |  |  |                            SupportedQos = supported_subs_qos(Qos), | 
					
						
							|  |  |  |                            {Queue, #proc_state{subscriptions = Subs} = PState1} = | 
					
						
							|  |  |  |                                ensure_queue(SupportedQos, PState), | 
					
						
							|  |  |  |                            Binding = #'queue.bind'{ | 
					
						
							|  |  |  |                                        queue       = Queue, | 
					
						
							|  |  |  |                                        exchange    = Exchange, | 
					
						
							|  |  |  |                                        routing_key = rabbit_mqtt_util:mqtt2amqp( | 
					
						
							|  |  |  |                                                        TopicName)}, | 
					
						
							|  |  |  |                            #'queue.bind_ok'{} = amqp_channel:call(Channel, Binding), | 
					
						
							|  |  |  |                            {[SupportedQos | QosList], | 
					
						
							|  |  |  |                             PState1 #proc_state{subscriptions = | 
					
						
							|  |  |  |                                                 dict:append(TopicName, SupportedQos, Subs)}} | 
					
						
							|  |  |  |                        end, {[], PState0}, Topics), | 
					
						
							| 
									
										
										
										
											2016-01-07 23:19:01 +08:00
										 |  |  |         SendFun(#mqtt_frame{fixed    = #mqtt_frame_fixed{type = ?SUBACK}, | 
					
						
							|  |  |  |                             variable = #mqtt_frame_suback{ | 
					
						
							|  |  |  |                                         message_id = MessageId, | 
					
						
							|  |  |  |                                         qos_table  = QosResponse}}, PState1), | 
					
						
							| 
									
										
										
										
											2015-06-30 16:08:26 +08:00
										 |  |  |         %% we may need to send up to length(Topics) messages.
 | 
					
						
							|  |  |  |         %% if QoS is > 0 then we need to generate a message id,
 | 
					
						
							|  |  |  |         %% and increment the counter.
 | 
					
						
							|  |  |  |         N = lists:foldl(fun (Topic, Acc) -> | 
					
						
							|  |  |  |                           case maybe_send_retained_message(RPid, Topic, Acc, PState1) of | 
					
						
							|  |  |  |                             {true, X} -> Acc + X; | 
					
						
							|  |  |  |                             false     -> Acc | 
					
						
							|  |  |  |                           end | 
					
						
							|  |  |  |                         end, MessageId, Topics), | 
					
						
							|  |  |  |         {ok, PState1#proc_state{message_id = N}} | 
					
						
							|  |  |  |     end, PState0); | 
					
						
							| 
									
										
										
										
											2012-07-05 00:47:07 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | process_request(?UNSUBSCRIBE, | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |                 #mqtt_frame{ | 
					
						
							|  |  |  |                   variable = #mqtt_frame_subscribe{ message_id  = MessageId, | 
					
						
							|  |  |  |                                                     topic_table = Topics }, | 
					
						
							|  |  |  |                   payload = undefined }, #proc_state{ channels      = {Channel, _}, | 
					
						
							|  |  |  |                                                       exchange      = Exchange, | 
					
						
							|  |  |  |                                                       client_id     = ClientId, | 
					
						
							| 
									
										
										
										
											2016-01-07 23:19:01 +08:00
										 |  |  |                                                       subscriptions = Subs0, | 
					
						
							|  |  |  |                                                       send_fun      = SendFun } = PState) -> | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |     Queues = rabbit_mqtt_util:subcription_queue_name(ClientId), | 
					
						
							|  |  |  |     Subs1 = | 
					
						
							|  |  |  |     lists:foldl( | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |       fun (#mqtt_topic{ name = TopicName }, Subs) -> | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |         QosSubs = case dict:find(TopicName, Subs) of | 
					
						
							|  |  |  |                       {ok, Val} when is_list(Val) -> lists:usort(Val); | 
					
						
							|  |  |  |                       error                       -> [] | 
					
						
							|  |  |  |                   end, | 
					
						
							|  |  |  |         lists:foreach( | 
					
						
							|  |  |  |           fun (QosSub) -> | 
					
						
							|  |  |  |                   Queue = element(QosSub + 1, Queues), | 
					
						
							|  |  |  |                   Binding = #'queue.unbind'{ | 
					
						
							|  |  |  |                               queue       = Queue, | 
					
						
							|  |  |  |                               exchange    = Exchange, | 
					
						
							|  |  |  |                               routing_key = | 
					
						
							| 
									
										
										
										
											2012-08-17 01:01:45 +08:00
										 |  |  |                                   rabbit_mqtt_util:mqtt2amqp(TopicName)}, | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |                   #'queue.unbind_ok'{} = amqp_channel:call(Channel, Binding) | 
					
						
							|  |  |  |           end, QosSubs), | 
					
						
							|  |  |  |         dict:erase(TopicName, Subs) | 
					
						
							|  |  |  |       end, Subs0, Topics), | 
					
						
							| 
									
										
										
										
											2016-01-07 23:19:01 +08:00
										 |  |  |     SendFun(#mqtt_frame{ fixed    = #mqtt_frame_fixed { type       = ?UNSUBACK }, | 
					
						
							|  |  |  |                          variable = #mqtt_frame_suback{ message_id = MessageId }}, | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |                 PState), | 
					
						
							|  |  |  |     {ok, PState #proc_state{ subscriptions = Subs1 }}; | 
					
						
							| 
									
										
										
										
											2012-07-03 16:55:31 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-01-07 23:19:01 +08:00
										 |  |  | process_request(?PINGREQ, #mqtt_frame{}, #proc_state{ send_fun = SendFun } = PState) -> | 
					
						
							|  |  |  |     SendFun(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}, | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |                 PState), | 
					
						
							|  |  |  |     {ok, PState}; | 
					
						
							| 
									
										
										
										
											2012-07-04 23:31:30 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  | process_request(?DISCONNECT, #mqtt_frame{}, PState) -> | 
					
						
							|  |  |  |     {stop, PState}. | 
					
						
							| 
									
										
										
										
											2012-07-04 00:35:18 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  | %%----------------------------------------------------------------------------
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-04-21 19:26:46 +08:00
										 |  |  | hand_off_to_retainer(RetainerPid, Topic, #mqtt_msg{payload = <<"">>}) -> | 
					
						
							|  |  |  |   rabbit_mqtt_retainer:clear(RetainerPid, Topic), | 
					
						
							|  |  |  |   ok; | 
					
						
							|  |  |  | hand_off_to_retainer(RetainerPid, Topic, Msg) -> | 
					
						
							|  |  |  |   rabbit_mqtt_retainer:retain(RetainerPid, Topic, Msg), | 
					
						
							|  |  |  |   ok. | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-01-07 23:19:01 +08:00
										 %% Pushes the message retained for a freshly subscribed topic, if any,
										 %% to the client as a PUBLISH frame.
										 %%
										 %% RPid is the retainer process asked via rabbit_mqtt_retainer:fetch/2;
										 %% MsgId is the packet id used when the frame goes out at QoS 1.
										 %%
										 %% Returns false when nothing is retained or the frame was sent at
										 %% QoS 0, and {true, 1} when it was sent at QoS 1.
										 %% NOTE(review): the 1 presumably counts the packet ids consumed -
										 %% confirm against the caller.
										 maybe_send_retained_message(RPid, #mqtt_topic{name = S, qos = SubscribeQos}, MsgId,
										                             #proc_state{ send_fun = SendFun } = PState) ->
										   case rabbit_mqtt_retainer:fetch(RPid, S) of
										     undefined -> false;
										     Msg       ->
										       %% calculate effective QoS as the lower value of SUBSCRIBE frame QoS
										       %% and retained message QoS. The spec isn't super clear on this, we
										       %% do what Mosquitto does, per user feedback.
										       %%
										       %% The value is also capped at QoS 1: the case expressions below
										       %% only know QoS 0 and 1, so QoS 2 on both sides would otherwise
										       %% crash the process with case_clause.
										       Qos = lists:min([SubscribeQos, Msg#mqtt_msg.qos, ?QOS_1]),
										       Id = case Qos of
										              ?QOS_0 -> undefined;
										              ?QOS_1 -> MsgId
										            end,
										       SendFun(#mqtt_frame{
										                  fixed    = #mqtt_frame_fixed{ type   = ?PUBLISH,
										                                                qos    = Qos,
										                                                dup    = false,
										                                                retain = Msg#mqtt_msg.retain },
										                  variable = #mqtt_frame_publish{ message_id = Id,
										                                                  topic_name = S },
										                  payload  = Msg#mqtt_msg.payload }, PState),
										       case Qos of
										         ?QOS_0 -> false;
										         ?QOS_1 -> {true, 1}
										       end
										   end.
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 %% Handles notifications coming from the AMQP channels and always
										 %% returns {ok, PState}.
										 %%
										 %% Clause 1 - basic.deliver: forwards a delivery to the MQTT client as
										 %% a PUBLISH frame. Duplicates of QoS 0 publishes are dropped (and acked
										 %% when they arrived through the QoS 1 consumer). A QoS 1 delivery
										 %% records MsgId -> delivery tag in awaiting_ack and advances the
										 %% packet id.
										 amqp_callback({#'basic.deliver'{ consumer_tag = CTag,
										                                  delivery_tag = DTag,
										                                  routing_key  = RKey },
										                #amqp_msg{ props   = #'P_basic'{ headers = Headers },
										                           payload = Payload },
										                DeliveryCtx} = Delivery,
										               #proc_state{ channels     = {Channel, _},
										                            awaiting_ack = Awaiting,
										                            message_id   = MsgId,
										                            send_fun     = SendFun } = PState) ->
										     amqp_channel:notify_received(DeliveryCtx),
										     IsDup   = delivery_dup(Delivery),
										     QosPair = delivery_qos(CTag, Headers, PState),
										     case {IsDup, QosPair} of
										         {true, {?QOS_0, ?QOS_1}} ->
										             %% duplicate of a QoS 0 publish on the acking consumer:
										             %% settle it with the broker, send nothing to the client
										             amqp_channel:cast(Channel,
										                               #'basic.ack'{ delivery_tag = DTag }),
										             {ok, PState};
										         {true, {?QOS_0, ?QOS_0}} ->
										             {ok, PState};
										         {_, {DeliveryQos, _SubQos}} ->
										             %% only QoS 1 frames carry a packet id
										             PacketId = case DeliveryQos of
										                            ?QOS_0 -> undefined;
										                            ?QOS_1 -> MsgId
										                        end,
										             Fixed    = #mqtt_frame_fixed{ type = ?PUBLISH,
										                                           qos  = DeliveryQos,
										                                           dup  = IsDup },
										             Variable = #mqtt_frame_publish{
										                           message_id = PacketId,
										                           topic_name = rabbit_mqtt_util:amqp2mqtt(RKey) },
										             SendFun(#mqtt_frame{ fixed    = Fixed,
										                                  variable = Variable,
										                                  payload  = Payload }, PState),
										             case QosPair of
										                 {?QOS_0, ?QOS_0} ->
										                     {ok, PState};
										                 {?QOS_1, ?QOS_1} ->
										                     %% remember which delivery the client's PUBACK
										                     %% will have to settle
										                     Awaiting1 = gb_trees:insert(MsgId, DTag, Awaiting),
										                     {ok, next_msg_id(
										                            PState#proc_state{ awaiting_ack = Awaiting1 })};
										                 {?QOS_0, ?QOS_1} ->
										                     %% sent at QoS 0, so no PUBACK will come back:
										                     %% ack the delivery right away
										                     amqp_channel:cast(
										                       Channel, #'basic.ack'{ delivery_tag = DTag }),
										                     {ok, PState}
										             end
										     end;

										 %% Clause 2 - basic.ack with multiple = true: sends one PUBACK per
										 %% entry of unacked_pubs whose key is =< Tag, smallest key first,
										 %% removing each entry as it goes.
										 amqp_callback(#'basic.ack'{ multiple = true, delivery_tag = Tag } = Ack,
										               #proc_state{ unacked_pubs = Unacked,
										                            send_fun     = SendFun } = PState) ->
										     case gb_trees:is_empty(Unacked) of
										         true ->
										             {ok, PState};
										         false ->
										             case gb_trees:take_smallest(Unacked) of
										                 {Seq, MsgId, Rest} when Seq =< Tag ->
										                     Puback = #mqtt_frame{
										                                 fixed    = #mqtt_frame_fixed{ type = ?PUBACK },
										                                 variable = #mqtt_frame_publish{
										                                               message_id = MsgId }},
										                     SendFun(Puback, PState),
										                     amqp_callback(
										                       Ack, PState#proc_state{ unacked_pubs = Rest });
										                 _ ->
										                     {ok, PState}
										             end
										     end;

										 %% Clause 3 - basic.ack with multiple = false: sends the PUBACK for
										 %% the single entry stored under Tag and removes that entry.
										 amqp_callback(#'basic.ack'{ multiple = false, delivery_tag = Tag },
										               #proc_state{ unacked_pubs = Unacked,
										                            send_fun     = SendFun } = PState) ->
										     MsgId  = gb_trees:get(Tag, Unacked),
										     Puback = #mqtt_frame{ fixed    = #mqtt_frame_fixed{ type = ?PUBACK },
										                           variable = #mqtt_frame_publish{
										                                         message_id = MsgId }},
										     SendFun(Puback, PState),
										     {ok, PState#proc_state{ unacked_pubs = gb_trees:delete(Tag, Unacked) }}.
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  | 
 | 
					
						
							%% Tells whether a delivery has to be flagged as a duplicate towards
							%% the MQTT client.
							%%
							%% Takes the {#'basic.deliver'{}, #amqp_msg{}, DeliveryCtx} tuple and
							%% returns the delivery's `redelivered` flag, or'ed with the value of
							%% the boolean "x-mqtt-dup" message header when that header is present.
							delivery_dup({#'basic.deliver'{ redelivered = Redelivered },
							              #amqp_msg{ props = #'P_basic'{ headers = Headers }},
							              _DeliveryCtx}) ->
							    case rabbit_mqtt_util:table_lookup(Headers, <<"x-mqtt-dup">>) of
							        {bool, Dup} -> Redelivered orelse Dup;
							        %% Header absent, or present with a non-boolean type. Headers are
							        %% set by whoever published the message, so a malformed value must
							        %% not crash this process: fall back to the redelivered flag.
							        _           -> Redelivered
							    end.
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 %% Advances the message_id counter held in the processor state and
										 %% returns the updated state. The counter wraps from 16#ffff back to 1.
										 next_msg_id(#proc_state{ message_id = Current } = PState) ->
										     Next = case Current of
										                16#ffff -> 1;
										                _       -> Current + 1
										            end,
										     PState#proc_state{ message_id = Next }.
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  | 
 | 
					
						
							%% Decides at which QoS level to deliver, based on the subscription
							%% (identified by the consumer tag the delivery arrived on) and the
							%% QoS the message was published with. Non-MQTT publishes are assumed
							%% to be QoS 1, regardless of delivery_mode.
							%%
							%% Returns {DeliveryQos, SubscriptionQos}.
							delivery_qos(Tag, _Headers,  #proc_state{ consumer_tags = {Tag, _} }) ->
							    %% arrived through the QoS 0 consumer
							    {?QOS_0, ?QOS_0};
							delivery_qos(Tag, Headers,   #proc_state{ consumer_tags = {_, Tag} }) ->
							    %% arrived through the QoS 1 consumer
							    case rabbit_mqtt_util:table_lookup(Headers, <<"x-mqtt-publish-qos">>) of
							        {byte, Qos} when is_integer(Qos), Qos >= ?QOS_0 ->
							            %% never deliver above QoS 1
							            {erlang:min(Qos, ?QOS_1), ?QOS_1};
							        _ ->
							            %% Header absent (non-MQTT publish), of an unexpected type or
							            %% out of range. Headers are publisher-controlled, so a bad
							            %% value is treated like a non-MQTT publish instead of
							            %% crashing here or in the caller's QoS case expressions.
							            {?QOS_1, ?QOS_1}
							    end.
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-07-27 00:43:25 +08:00
										 %% Applies the clean_sess flag of the processor state.
										 %%
										 %% clean_sess = false: makes sure the QoS 1 queue exists and is
										 %% consumed from (see ensure_queue/2) and returns the updated state.
										 %%
										 %% clean_sess = true: deletes the client's QoS 1 queue over a
										 %% short-lived channel and returns the state unchanged. An exit raised
										 %% by the delete call is ignored on purpose (best effort).
										 maybe_clean_sess(#proc_state{ clean_sess = false } = PState) ->
										     {_QueueName, WithQueue} = ensure_queue(?QOS_1, PState),
										     WithQueue;
										 maybe_clean_sess(#proc_state{ clean_sess = true,
										                               connection = Conn,
										                               client_id  = ClientId } = PState) ->
										     {_Qos0Queue, Qos1Queue} =
										         rabbit_mqtt_util:subcription_queue_name(ClientId),
										     {ok, TmpChannel} = amqp_connection:open_channel(Conn),
										     Delete = #'queue.delete'{ queue = Qos1Queue },
										     try amqp_channel:call(TmpChannel, Delete) of
										         #'queue.delete_ok'{} ->
										             ok = amqp_channel:close(TmpChannel)
										     catch
										         exit:_ ->
										             ok
										     end,
										     PState.
					
						
							| 
									
										
										
										
											2012-07-16 21:57:31 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  | %%----------------------------------------------------------------------------
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 %% Builds the will message described by a CONNECT frame.
										 %%
										 %% Returns undefined when the frame's will_flag is false, otherwise an
										 %% #mqtt_msg{} carrying the will topic, payload, QoS and retain flag
										 %% (dup is always false).
										 make_will_msg(#mqtt_frame_connect{} = Connect) ->
										     case Connect#mqtt_frame_connect.will_flag of
										         false ->
										             undefined;
										         _ ->
										             #mqtt_msg{ retain  = Connect#mqtt_frame_connect.will_retain,
										                        qos     = Connect#mqtt_frame_connect.will_qos,
										                        topic   = Connect#mqtt_frame_connect.will_topic,
										                        dup     = false,
										                        payload = Connect#mqtt_frame_connect.will_msg }
										     end.
					
						
							| 
									
										
										
										
											2012-07-04 00:35:18 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-03-19 20:05:42 +08:00
										 %% Authenticates an MQTT client that has no channels yet by opening a
										 %% direct AMQP connection on its behalf.
										 %%
										 %% UserBin may carry a vhost prefix (see get_vhost_username/1); PassBin
										 %% is a binary or the atom none.
										 %%
										 %% Returns {?CONNACK_ACCEPT, Connection, VHost, #auth_state{}} on
										 %% success, ?CONNACK_CREDENTIALS on an authentication failure and
										 %% ?CONNACK_AUTH when access is refused.
										 process_login(UserBin, PassBin, ProtoVersion,
										               #proc_state{ channels  = {undefined, undefined},
										                            socket    = Sock }) ->
										     {VHost, UsernameBin} = get_vhost_username(UserBin),
										     case amqp_connection:start(#amqp_params_direct{
										                                   username     = UsernameBin,
										                                   password     = PassBin,
										                                   virtual_host = VHost,
										                                   adapter_info = adapter_info(Sock, ProtoVersion)}) of
										         {ok, Connection} ->
										             case rabbit_access_control:check_user_loopback(UsernameBin, Sock) of
										                 ok ->
										                     %% no password means no password property is checked
										                     AuthProps = case PassBin of
										                                     none -> [];
										                                     P    -> [{password, P}]
										                                 end,
										                     {ok, User} = rabbit_access_control:check_user_login(
										                                    UsernameBin, AuthProps),
										                     {?CONNACK_ACCEPT, Connection, VHost,
										                      #auth_state{ user     = User,
										                                   username = UsernameBin,
										                                   vhost    = VHost }};
										                 not_allowed ->
										                     %% do not leave the freshly opened connection behind
										                     amqp_connection:close(Connection),
										                     rabbit_log:warning(
										                       "MQTT login failed for ~p access_refused "
										                       "(access must be from localhost)~n",
										                       [binary_to_list(UsernameBin)]),
										                     ?CONNACK_AUTH
										             end;
										         {error, {auth_failure, Explanation}} ->
										             rabbit_log:error("MQTT login failed for ~p auth_failure: ~s~n",
										                              [binary_to_list(UserBin), Explanation]),
										             ?CONNACK_CREDENTIALS;
										         {error, access_refused} ->
										             rabbit_log:warning("MQTT login failed for ~p access_refused "
										                                "(vhost access not allowed)~n",
										                                [binary_to_list(UserBin)]),
										             ?CONNACK_AUTH;
										         {error, not_allowed} ->
										             %% NOTE(review): direct connections presumably report a
										             %% refused vhost as not_allowed on some broker versions -
										             %% confirm. Answer with a CONNACK instead of crashing with
										             %% case_clause.
										             rabbit_log:warning("MQTT login failed for ~p not_allowed "
										                                "(vhost access not allowed)~n",
										                                [binary_to_list(UserBin)]),
										             ?CONNACK_AUTH
										     end.
					
						
							| 
									
										
										
										
											2012-07-16 21:57:31 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-11-18 20:00:47 +08:00
										 %% Splits the login name sent by the client into {VHost, Username}.
										 %%
										 %% UserBin must be a binary. Unless ignore_colons_in_username is set
										 %% to true in the application environment, a name of the form
										 %% <<"vhost:user">> is split at its LAST colon. Names without a colon
										 %% (or all names, when colons are ignored) are paired with the
										 %% configured default vhost.
										 get_vhost_username(UserBin) ->
										     Default = {rabbit_mqtt_util:env(vhost), UserBin},
										     case application:get_env(?APP, ignore_colons_in_username) of
										         {ok, true} -> Default;
										         _ ->
										             %% Split at the last colon, disallowing colons in username.
										             %% Done on the raw bytes rather than with a regex: the
										             %% previous pattern ":(?!.*?:)" let "." stop at a newline,
										             %% so a name such as <<"a:b\nc:d">> was split into three
										             %% parts and crashed with case_clause.
										             case binary:matches(UserBin, <<":">>) of
										                 [] ->
										                     Default;
										                 Colons ->
										                     {Pos, 1} = lists:last(Colons),
										                     <<Vhost:Pos/binary, $:, UserName/binary>> = UserBin,
										                     {Vhost, UserName}
										             end
										     end.
					
						
							| 
									
										
										
										
											2012-07-16 21:57:31 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-11-21 22:54:33 +08:00
										 %% Works out which credentials a connecting client logs in with.
										 %%
										 %% User and Pass are the strings from the CONNECT frame (undefined when
										 %% absent); SSLLoginName is the name taken from the peer certificate,
										 %% or none.
										 %%
										 %% Returns nocreds, or {Username, Password} where Username is a binary
										 %% and Password is a binary or the atom none.
										 %%
										 %% Security: process_login/4 performs NO password check when the
										 %% password is none, so none is only ever returned for an identity
										 %% taken from the TLS certificate. A client-supplied user name without
										 %% a password used to yield {User, none} (or the default password) and
										 %% could log in as any existing user; it now yields nocreds.
										 creds(User, Pass, SSLLoginName) ->
										     DefaultUser   = rabbit_mqtt_util:env(default_user),
										     DefaultPass   = rabbit_mqtt_util:env(default_pass),
										     {ok, Anon}    = application:get_env(?APP, allow_anonymous),
										     {ok, TLSAuth} = application:get_env(?APP, ssl_cert_login),
										     %% Pick the user name and remember where it came from.
										     Identity = case {User =/= undefined,
										                      is_binary(DefaultUser),
										                      Anon =:= true,
										                      (TLSAuth andalso SSLLoginName =/= none)} of
										                    %% username provided
										                    {true,  _,    _,    _}     -> {client, list_to_binary(User)};
										                    %% anonymous, default user is configured, no TLS
										                    {false, true, true, false} -> {default, DefaultUser};
										                    %% no username provided, TLS certificate is present,
										                    %% rabbitmq_mqtt.ssl_cert_login is true
										                    {false, _,    _,    true}  -> {tls, SSLLoginName};
										                    _                          -> nocreds
										                end,
										     case {Identity, Pass =/= undefined} of
										         {nocreds, _} ->
										             nocreds;
										         %% password provided: it is always verified
										         {{_Source, U}, true} ->
										             {U, list_to_binary(Pass)};
										         %% certificate-based login: the certificate is the credential
										         {{tls, U}, false} ->
										             {U, none};
										         %% anonymous login with fully configured default credentials
										         {{default, U}, false} when is_binary(DefaultPass) ->
										             {U, DefaultPass};
										         %% user name without any password to verify: refuse
										         {_, false} ->
										             nocreds
										     end.
					
						
							| 
									
										
										
										
											2012-07-04 00:35:18 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 %% Maps the QoS requested for a subscription to the QoS that is
										 %% granted: QoS 0 and QoS 1 are granted as requested, QoS 2 is
										 %% downgraded to QoS 1.
										 supported_subs_qos(Requested) when Requested =:= ?QOS_0;
										                                    Requested =:= ?QOS_1 ->
										     Requested;
										 supported_subs_qos(?QOS_2) ->
										     ?QOS_1.
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-01-30 23:33:59 +08:00
										 %% Translates an MQTT QoS level into a delivery mode value:
										 %% 1 for QoS 0, 2 for QoS 1.
										 delivery_mode(Qos) when Qos =:= ?QOS_0 -> 1;
										 delivery_mode(Qos) when Qos =:= ?QOS_1 -> 2.
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 %% Makes sure the queue backing subscriptions of the given QoS exists
										 %% and is being consumed from.
										 %%
										 %% Subscriptions of different QoS are received in different queues,
										 %% each with its own durability and expiry arguments. This leads to
										 %% duplicate messages for overlapping subscriptions with different
										 %% QoS values - todo: prevent duplicates.
										 %%
										 %% Returns {QueueName, PState}. When no consumer tag is recorded yet
										 %% for Qos, the queue is declared, a consumer is started and its tag
										 %% is stored in consumer_tags; otherwise the state is returned as is.
										 ensure_queue(Qos, #proc_state{ channels      = {Channel, _},
										                                client_id     = ClientId,
										                                clean_sess    = CleanSess,
										                                consumer_tags = {Qos0Tag, Qos1Tag} = Tags } = PState) ->
										     {Qos0Queue, Qos1Queue} = rabbit_mqtt_util:subcription_queue_name(ClientId),
										     %% the QoS 1 queue only gets an expiry when a TTL is configured
										     %% and the session is not a clean one
										     Ttl = rabbit_mqtt_util:env(subscription_ttl),
										     Qos1Args = if
										                    is_integer(Ttl), CleanSess =:= false ->
										                        [{<<"x-expires">>, long, Ttl}];
										                    true ->
										                        []
										                end,
										     %% declares the queue, starts consuming and records the new tag in
										     %% the slot of the tag tuple that belongs to Qos
										     Subscribe =
										         fun (Queue, Declare, Consume) ->
										                 #'queue.declare_ok'{} = amqp_channel:call(Channel, Declare),
										                 #'basic.consume_ok'{ consumer_tag = NewTag } =
										                     amqp_channel:call(Channel, Consume),
										                 NewTags = setelement(Qos+1, Tags, NewTag),
										                 {Queue, PState#proc_state{ consumer_tags = NewTags }}
										         end,
										     case {Qos0Tag, Qos1Tag, Qos} of
										         {undefined, _, ?QOS_0} ->
										             Subscribe(Qos0Queue,
										                       #'queue.declare'{ queue       = Qos0Queue,
										                                         durable     = false,
										                                         auto_delete = true },
										                       #'basic.consume'{ queue  = Qos0Queue,
										                                         no_ack = true });
										         {_, undefined, ?QOS_1} ->
										             Subscribe(Qos1Queue,
										                       #'queue.declare'{ queue       = Qos1Queue,
										                                         durable     = true,
										                                         %% Clean session means a transient
										                                         %% connection, translating into
										                                         %% auto-delete.
										                                         %%
										                                         %% see rabbitmq/rabbitmq-mqtt#37
										                                         auto_delete = CleanSess,
										                                         arguments   = Qos1Args },
										                       #'basic.consume'{ queue  = Qos1Queue,
										                                         no_ack = false });
										         {_, _, ?QOS_0} ->
										             %% already consuming from the QoS 0 queue
										             {Qos0Queue, PState};
										         {_, _, ?QOS_1} ->
										             %% already consuming from the QoS 1 queue
										             {Qos1Queue, PState}
										     end.
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  | send_will(PState = #proc_state{ will_msg = WillMsg }) -> | 
					
						
							|  |  |  |     amqp_pub(WillMsg, PState). | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | amqp_pub(undefined, PState) -> | 
					
						
							|  |  |  |     PState; | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | %% set up a qos1 publishing channel if necessary
 | 
					
						
							|  |  |  | %% this channel will only be used for publishing, not consuming
 | 
					
						
							|  |  |  | amqp_pub(Msg   = #mqtt_msg{ qos = ?QOS_1 }, | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |          PState = #proc_state{ channels       = {ChQos0, undefined}, | 
					
						
							|  |  |  |                                awaiting_seqno = undefined, | 
					
						
							|  |  |  |                                connection     = Conn }) -> | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |     {ok, Channel} = amqp_connection:open_channel(Conn), | 
					
						
							|  |  |  |     #'confirm.select_ok'{} = amqp_channel:call(Channel, #'confirm.select'{}), | 
					
						
							|  |  |  |     amqp_channel:register_confirm_handler(Channel, self()), | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |     amqp_pub(Msg, PState #proc_state{ channels       = {ChQos0, Channel}, | 
					
						
							|  |  |  |                                       awaiting_seqno = 1 }); | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-08-17 22:05:14 +08:00
										 |  |  | amqp_pub(#mqtt_msg{ qos        = Qos, | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |                     topic      = Topic, | 
					
						
							|  |  |  |                     dup        = Dup, | 
					
						
							|  |  |  |                     message_id = MessageId, | 
					
						
							|  |  |  |                     payload    = Payload }, | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |          PState = #proc_state{ channels       = {ChQos0, ChQos1}, | 
					
						
							|  |  |  |                                exchange       = Exchange, | 
					
						
							|  |  |  |                                unacked_pubs   = UnackedPubs, | 
					
						
							|  |  |  |                                awaiting_seqno = SeqNo }) -> | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |     Method = #'basic.publish'{ exchange    = Exchange, | 
					
						
							|  |  |  |                                routing_key = | 
					
						
							| 
									
										
										
										
											2012-08-17 01:01:45 +08:00
										 |  |  |                                    rabbit_mqtt_util:mqtt2amqp(Topic)}, | 
					
						
							| 
									
										
										
										
											2012-11-06 18:32:38 +08:00
										 |  |  |     Headers = [{<<"x-mqtt-publish-qos">>, byte, Qos}, | 
					
						
							|  |  |  |                {<<"x-mqtt-dup">>, bool, Dup}], | 
					
						
							| 
									
										
										
										
											2014-01-30 23:33:59 +08:00
										 |  |  |     Msg = #amqp_msg{ props   = #'P_basic'{ headers       = Headers, | 
					
						
							|  |  |  |                                            delivery_mode = delivery_mode(Qos)}, | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |                      payload = Payload }, | 
					
						
							|  |  |  |     {UnackedPubs1, Ch, SeqNo1} = | 
					
						
							|  |  |  |         case Qos =:= ?QOS_1 andalso MessageId =/= undefined of | 
					
						
							|  |  |  |             true  -> {gb_trees:enter(SeqNo, MessageId, UnackedPubs), ChQos1, | 
					
						
							|  |  |  |                       SeqNo + 1}; | 
					
						
							|  |  |  |             false -> {UnackedPubs, ChQos0, SeqNo} | 
					
						
							|  |  |  |         end, | 
					
						
							|  |  |  |     amqp_channel:cast_flow(Ch, Method, Msg), | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |     PState #proc_state{ unacked_pubs   = UnackedPubs1, | 
					
						
							|  |  |  |                         awaiting_seqno = SeqNo1 }. | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-03-19 20:05:42 +08:00
										 |  |  | adapter_info(Sock, ProtoVer) -> | 
					
						
							| 
									
										
										
										
											2013-01-09 20:06:09 +08:00
										 |  |  |     amqp_connection:socket_adapter_info( | 
					
						
							| 
									
										
										
										
											2014-08-25 17:43:11 +08:00
										 |  |  |              Sock, {'MQTT', human_readable_mqtt_version(ProtoVer)}). | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | human_readable_mqtt_version(3) -> | 
					
						
							|  |  |  |     "3.1.0"; | 
					
						
							|  |  |  | human_readable_mqtt_version(4) -> | 
					
						
							|  |  |  |     "3.1.1"; | 
					
						
							|  |  |  | human_readable_mqtt_version(_) -> | 
					
						
							|  |  |  |     "N/A". | 
					
						
							| 
									
										
										
										
											2012-08-17 00:56:50 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  | send_client(Frame, #proc_state{ socket = Sock }) -> | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |     %rabbit_log:info("MQTT sending frame ~p ~n", [Frame]),
 | 
					
						
							|  |  |  |     rabbit_net:port_command(Sock, rabbit_mqtt_frame:serialise(Frame)). | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | close_connection(PState = #proc_state{ connection = undefined }) -> | 
					
						
							|  |  |  |     PState; | 
					
						
							| 
									
										
										
										
											2013-09-16 18:36:52 +08:00
										 |  |  | close_connection(PState = #proc_state{ connection = Connection, | 
					
						
							|  |  |  |                                        client_id  = ClientId }) -> | 
					
						
							|  |  |  |     % todo: maybe clean session
 | 
					
						
							|  |  |  |     case ClientId of | 
					
						
							|  |  |  |         undefined -> ok; | 
					
						
							| 
									
										
										
										
											2013-11-14 21:48:14 +08:00
										 |  |  |         _         -> ok = rabbit_mqtt_collector:unregister(ClientId, self()) | 
					
						
							| 
									
										
										
										
											2013-09-16 18:36:52 +08:00
										 |  |  |     end, | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |     %% ignore noproc or other exceptions to avoid debris
 | 
					
						
							|  |  |  |     catch amqp_connection:close(Connection), | 
					
						
							|  |  |  |     PState #proc_state{ channels   = {undefined, undefined}, | 
					
						
							|  |  |  |                         connection = undefined }. | 
					
						
							| 
									
										
										
										
											2015-06-30 16:08:26 +08:00
										 |  |  | 
 | 
					
						
% NB: check_*_or_die: the MQTT spec says we should ack normally, i.e. pretend
% there was no auth error, but here we are closing the connection with an
% error. This is what happens anyway if there is an authorization failure at
% the AMQP level.
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-07-14 17:34:25 +08:00
										 |  |  | check_publish_or_die(TopicName, Fn, PState) -> | 
					
						
							| 
									
										
										
										
											2015-06-30 16:08:26 +08:00
										 |  |  |   case check_topic_access(TopicName, write, PState) of | 
					
						
							| 
									
										
										
										
											2015-07-14 17:47:39 +08:00
										 |  |  |     ok -> Fn(); | 
					
						
							| 
									
										
										
										
											2015-07-14 17:47:01 +08:00
										 |  |  |     _ -> {err, unauthorized, PState} | 
					
						
							| 
									
										
										
										
											2015-06-30 16:08:26 +08:00
										 |  |  |   end. | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-07-14 17:47:01 +08:00
										 |  |  | check_subscribe_or_die([], Fn, _) -> | 
					
						
							| 
									
										
										
										
											2015-07-14 17:47:39 +08:00
										 |  |  |   Fn(); | 
					
						
							| 
									
										
										
										
											2015-06-30 16:08:26 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-07-14 17:52:04 +08:00
										 |  |  | check_subscribe_or_die([#mqtt_topic{name = TopicName} | Topics], Fn, PState) -> | 
					
						
							| 
									
										
										
										
											2015-06-30 16:08:26 +08:00
										 |  |  |   case check_topic_access(TopicName, read, PState) of | 
					
						
							| 
									
										
										
										
											2015-07-14 17:47:01 +08:00
										 |  |  |     ok -> check_subscribe_or_die(Topics, Fn, PState); | 
					
						
							|  |  |  |     _ -> {err, unauthorized, PState} | 
					
						
							| 
									
										
										
										
											2015-06-30 16:08:26 +08:00
										 |  |  |   end. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | check_topic_access(TopicName, Access, | 
					
						
							|  |  |  |                    #proc_state{ | 
					
						
							| 
									
										
										
										
											2015-07-14 17:52:04 +08:00
										 |  |  |                       auth_state = #auth_state{user = User, | 
					
						
							|  |  |  |                                                vhost = VHost}}) -> | 
					
						
							|  |  |  |   Resource = #resource{virtual_host = VHost, | 
					
						
							|  |  |  |                        kind = topic, | 
					
						
							|  |  |  |                        name = TopicName}, | 
					
						
							| 
									
										
										
										
											2015-06-30 16:08:26 +08:00
										 |  |  |   rabbit_access_control:check_resource_access(User, Resource, Access). |