2012-06-27 00:57:24 +08:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								%% The contents of this file are subject to the Mozilla Public License
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								%% Version 1.1 (the "License"); you may not use this file except in
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								%% compliance with the License. You may obtain a copy of the License
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								%% at http://www.mozilla.org/MPL/
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								%%
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								%% Software distributed under the License is distributed on an "AS IS"
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								%% the License for the specific language governing rights and
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								%% limitations under the License.
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								%%
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								%% The Original Code is RabbitMQ.
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								%%
							 | 
						
					
						
							
								
									
										
										
										
											2013-07-01 17:49:14 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								%% The Initial Developer of the Original Code is GoPivotal, Inc.
							 | 
						
					
						
							
								
									
										
										
										
											2014-03-18 01:25:23 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								%% Copyright (c) 2007-2014 GoPivotal, Inc.  All rights reserved.
							 | 
						
					
						
							
								
									
										
										
										
											2012-06-27 00:57:24 +08:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								%%
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								-module(rabbit_mqtt_processor).
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-21 22:54:33 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								-export([info/2, initial_state/2,
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								         process_frame/2, amqp_pub/2, amqp_callback/2, send_will/1,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								         close_connection/1]).
							 | 
						
					
						
							
								
									
										
										
										
											2012-06-27 00:57:24 +08:00
										 
									 
								 
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								-include_lib("amqp_client/include/amqp_client.hrl").
							 | 
						
					
						
							
								
									
										
										
										
											2012-09-12 21:34:41 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								-include("rabbit_mqtt_frame.hrl").
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								-include("rabbit_mqtt.hrl").
							 | 
						
					
						
							
								
									
										
										
										
											2012-07-13 23:32:13 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								%% ?FRAME_TYPE(Frame, Type) expands to a match that binds Frame to a whole
								%% #mqtt_frame{} and Type to the `type' field of its fixed header.
								-define(FRAME_TYPE(Frame, Type),
								        Frame = #mqtt_frame{fixed = #mqtt_frame_fixed{type = Type}}).
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-21 22:54:33 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								%% Builds the #proc_state{} used for a connection before any frame has
								%% been processed.
								%%
								%% Socket       - stored in the `socket' field.
								%% SSLLoginName - stored in the `ssl_login_name' field.
								%%
								%% Both ack-tracking trees and the subscription dict start empty, the
								%% message id counter starts at 1, and the exchange is read from the
								%% plugin environment via rabbit_mqtt_util:env(exchange).
								initial_state(Socket, SSLLoginName) ->
								    NoPending = gb_trees:empty(),
								    Exchange  = rabbit_mqtt_util:env(exchange),
								    #proc_state{ socket         = Socket,
								                 ssl_login_name = SSLLoginName,
								                 exchange       = Exchange,
								                 message_id     = 1,
								                 unacked_pubs   = NoPending,
								                 awaiting_ack   = NoPending,
								                 subscriptions  = dict:new(),
								                 consumer_tags  = {undefined, undefined},
								                 channels       = {undefined, undefined} }.
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								%% Returns the requested item from the processor state. Only `client_id'
								%% is supported.
								info(client_id, #proc_state{} = PState) ->
								    PState#proc_state.client_id.
							 | 
						
					
						
							
								
									
										
										
										
											2012-07-13 23:32:13 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								%% Entry point for a decoded MQTT frame.
								%%
								%% While the state has no connection yet, any frame other than CONNECT is
								%% rejected with {error, connect_expected, State}. Otherwise the frame is
								%% handed to process_request/3 and its result is returned unchanged. If
								%% process_request/3 raises (any class), close_connection/1 is called with
								%% the state that was passed in and {error, Reason} is returned.
								process_frame(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = FrameType }},
								              #proc_state{ connection = undefined } = State)
								  when FrameType =/= ?CONNECT ->
								    {error, connect_expected, State};
								process_frame(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = FrameType }} = Frame,
								              State) ->
								    %%rabbit_log:info("MQTT received frame ~p ~n", [Frame]),
								    try
								        process_request(FrameType, Frame, State)
								    catch
								        _Class:Reason ->
								            close_connection(State),
								            {error, Reason}
								    end.
							 | 
						
					
						
							
								
									
										
										
										
											2012-07-13 23:32:13 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								%% CONNECT: validates the request, logs in, and always answers with a
								%% CONNACK carrying the resulting return code.
								%%
								%% Refusals, each leaving the state untouched:
								%%   * protocol version not among the keys of ?PROTOCOL_NAMES
								%%       -> ?CONNACK_PROTO_VER
								%%   * empty client id combined with clean_sess = false
								%%       -> ?CONNACK_INVALID_ID
								%%   * creds/3 yields `nocreds' -> ?CONNACK_CREDENTIALS
								%%   * process_login/4 yields anything but {?CONNACK_ACCEPT, Conn}
								%%       -> that value is used as the return code
								%%
								%% On acceptance the AMQP connection is linked, a channel is opened and
								%% configured, the client id is registered with the collector, keepalive
								%% is started, and the updated state goes through maybe_clean_sess/1.
								%%
								%% The clause returns {ok, State} for accepted and refused connects alike.
								process_request(?CONNECT,
								                #mqtt_frame{ variable = #mqtt_frame_connect{
								                                          username   = Username,
								                                          password   = Password,
								                                          proto_ver  = ProtoVersion,
								                                          clean_sess = CleanSess,
								                                          client_id  = ClientId0,
								                                          keep_alive = Keepalive} = ConnectVar},
								                #proc_state{ ssl_login_name = SSLLoginName } = State0) ->
								    %% An empty client id is replaced by a generated one.
								    ClientId =
								        case ClientId0 of
								            []    -> rabbit_mqtt_util:gen_client_id();
								            [_|_] -> ClientId0
								        end,
								    ProtoKnown =
								        lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)),
								    AnonymousNonClean =
								        (ClientId0 =:= []) andalso (CleanSess =:= false),
								    {ReturnCode, State1} =
								        case ProtoKnown of
								            false ->
								                {?CONNACK_PROTO_VER, State0};
								            true when AnonymousNonClean ->
								                {?CONNACK_INVALID_ID, State0};
								            true ->
								                case creds(Username, Password, SSLLoginName) of
								                    nocreds ->
								                        rabbit_log:error("MQTT login failed - no credentials~n"),
								                        {?CONNACK_CREDENTIALS, State0};
								                    {UserBin, PassBin} ->
								                        case process_login(UserBin, PassBin,
								                                           ProtoVersion, State0) of
								                            {?CONNACK_ACCEPT, Conn} ->
								                                link(Conn),
								                                {ok, Ch} = amqp_connection:open_channel(Conn),
								                                amqp_channel:enable_delivery_flow_control(Ch),
								                                ok = rabbit_mqtt_collector:register(ClientId, self()),
								                                QosRequest =
								                                    #'basic.qos'{
								                                       prefetch_count =
								                                           rabbit_mqtt_util:env(prefetch)},
								                                #'basic.qos_ok'{} =
								                                    amqp_channel:call(Ch, QosRequest),
								                                rabbit_mqtt_reader:start_keepalive(self(), Keepalive),
								                                Connected =
								                                    State0#proc_state{
								                                      will_msg   = make_will_msg(ConnectVar),
								                                      clean_sess = CleanSess,
								                                      channels   = {Ch, undefined},
								                                      connection = Conn,
								                                      client_id  = ClientId },
								                                {?CONNACK_ACCEPT, maybe_clean_sess(Connected)};
								                            Refusal ->
								                                {Refusal, State0}
								                        end
								                end
								        end,
								    ConnAckFrame =
								        #mqtt_frame{ fixed    = #mqtt_frame_fixed{ type = ?CONNACK },
								                     variable = #mqtt_frame_connack{
								                                  return_code = ReturnCode }},
								    send_client(ConnAckFrame, State1),
								    {ok, State1};
							 | 
						
					
						
							
								
									
										
										
										
											2012-07-04 00:35:18 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								%% PUBACK: the client acknowledges a message that was delivered to it.
								%%
								%% The MQTT message id is looked up in `awaiting_ack' to find the AMQP
								%% delivery tag. That tag is acked on the first channel and the entry is
								%% removed from the tree. Returns {ok, PState}.
								process_request(?PUBACK,
								                #mqtt_frame{
								                  variable = #mqtt_frame_publish{ message_id = MessageId }},
								                #proc_state{ channels     = {Channel, _},
								                             awaiting_ack = Awaiting } = PState) ->
								    %% The id can be missing (duplicate or bogus PUBACK). gb_trees:get/2
								    %% and gb_trees:delete/2 both crash on an absent key, which would
								    %% tear the whole connection down, so an unknown id is ignored.
								    case gb_trees:lookup(MessageId, Awaiting) of
								        none ->
								            {ok, PState};
								        {value, Tag} ->
								            amqp_channel:cast(
								              Channel, #'basic.ack'{ delivery_tag = Tag }),
								            {ok, PState #proc_state{
								                   awaiting_ack = gb_trees:delete(MessageId, Awaiting) }}
								    end;
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								%% PUBLISH at QoS 2 is rejected: the clause returns
								%% {error, qos2_not_supported, State} with the state unchanged.
								process_request(?PUBLISH,
								                #mqtt_frame{ fixed = #mqtt_frame_fixed{ qos = ?QOS_2 }},
								                State) ->
								    {error, qos2_not_supported, State};
							 | 
						
					
						
							
								
									
										
										
										
											2012-07-04 23:31:30 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								%% PUBLISH (any QoS not caught by an earlier clause).
								%% Copies retain/qos/dup from the fixed header, topic and message id from
								%% the variable header and the payload into an #mqtt_msg{}, passes it to
								%% amqp_pub/2 together with the current state, and returns
								%% {ok, Result} where Result is whatever amqp_pub/2 returned.
								process_request(?PUBLISH,
								                #mqtt_frame{
								                  fixed    = #mqtt_frame_fixed{ qos    = Qos,
								                                                retain = Retain,
								                                                dup    = Dup },
								                  variable = #mqtt_frame_publish{ topic_name = Topic,
								                                                  message_id = MessageId },
								                  payload  = Payload },
								                PState) ->
								    Msg = #mqtt_msg{ retain     = Retain,
								                     qos        = Qos,
								                     topic      = Topic,
								                     dup        = Dup,
								                     message_id = MessageId,
								                     payload    = Payload },
								    Result = amqp_pub(Msg, PState),
								    {ok, Result};
							 | 
						
					
						
							
								
									
										
										
										
											2012-07-04 23:31:30 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								%% SUBSCRIBE.
								%% For every #mqtt_topic{} in the frame's topic table, in list order:
								%%   1. map the requested QoS through supported_subs_qos/1,
								%%   2. obtain the queue for that QoS via ensure_queue/2 (which also
								%%      returns an updated state),
								%%   3. bind that queue to the session's exchange using the routing key
								%%      produced by rabbit_mqtt_util:mqtt2amqp/1 (crashes on anything but
								%%      #'queue.bind_ok'{}),
								%%   4. append the granted QoS to the topic's entry in the subscriptions
								%%      dict.
								%% A SUBACK carrying the collected QoS values is then sent to the client
								%% and {ok, NewState} is returned.
								%%
								%% NOTE(review): granted QoS values are consed onto the accumulator, so
								%% qos_table ends up in the reverse of the order in which Topics is
								%% traversed. Whether that equals the order the client sent depends on
								%% how the frame parser orders topic_table -- confirm before changing.
								process_request(?SUBSCRIBE,
								                #mqtt_frame{
								                  variable = #mqtt_frame_subscribe{ message_id  = MessageId,
								                                                    topic_table = Topics },
								                  payload  = undefined },
								                #proc_state{ channels = {Channel, _},
								                             exchange = Exchange } = PState0) ->
								    %% Fold step: subscribe to one topic, threading the state through.
								    SubscribeOne =
								        fun (#mqtt_topic{ name = TopicName, qos = RequestedQos },
								             {GrantedAcc, StateIn}) ->
								                Granted = supported_subs_qos(RequestedQos),
								                {Queue, #proc_state{ subscriptions = Subs } = StateOut} =
								                    ensure_queue(Granted, StateIn),
								                RoutingKey = rabbit_mqtt_util:mqtt2amqp(TopicName),
								                Bind = #'queue.bind'{ queue       = Queue,
								                                      exchange    = Exchange,
								                                      routing_key = RoutingKey },
								                #'queue.bind_ok'{} = amqp_channel:call(Channel, Bind),
								                NewSubs = dict:append(TopicName, Granted, Subs),
								                {[Granted | GrantedAcc],
								                 StateOut #proc_state{ subscriptions = NewSubs }}
								        end,
								    {QosResponse, PState1} =
								        lists:foldl(SubscribeOne, {[], PState0}, Topics),
								    SubAck = #mqtt_frame{
								               fixed    = #mqtt_frame_fixed{ type = ?SUBACK },
								               variable = #mqtt_frame_suback{ message_id = MessageId,
								                                              qos_table  = QosResponse }},
								    send_client(SubAck, PState1),
								    {ok, PState1};
							 | 
						
					
						
							
								
									
										
										
										
											2012-07-05 00:47:07 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								%% UNSUBSCRIBE.
								%% For every #mqtt_topic{} in the frame's topic table:
								%%   * look the topic up in the subscriptions dict; a missing entry means
								%%     nothing to unbind, a present entry must be a list of QoS values
								%%     (de-duplicated with lists:usort/1),
								%%   * for each such QoS, pick the queue at tuple position QoS + 1 of the
								%%     value returned by rabbit_mqtt_util:subcription_queue_name/1 and
								%%     unbind it from the exchange (crashes on anything but
								%%     #'queue.unbind_ok'{}),
								%%   * erase the topic from the dict.
								%% An UNSUBACK for MessageId is then sent using the incoming state, and
								%% {ok, State} with the pruned subscriptions dict is returned.
								process_request(?UNSUBSCRIBE,
								                #mqtt_frame{
								                  variable = #mqtt_frame_subscribe{ message_id  = MessageId,
								                                                    topic_table = Topics },
								                  payload  = undefined },
								                #proc_state{ channels      = {Channel, _},
								                             exchange      = Exchange,
								                             client_id     = ClientId,
								                             subscriptions = Subs0 } = PState) ->
								    Queues = rabbit_mqtt_util:subcription_queue_name(ClientId),
								    %% Fold step: drop every binding held for one topic.
								    UnsubscribeOne =
								        fun (#mqtt_topic{ name = TopicName }, SubsAcc) ->
								                QosSubs =
								                    case dict:find(TopicName, SubsAcc) of
								                        error                       -> [];
								                        {ok, Val} when is_list(Val) -> lists:usort(Val)
								                    end,
								                UnbindQos =
								                    fun (QosSub) ->
								                            Queue = element(QosSub + 1, Queues),
								                            RoutingKey =
								                                rabbit_mqtt_util:mqtt2amqp(TopicName),
								                            Unbind = #'queue.unbind'{
								                                        queue       = Queue,
								                                        exchange    = Exchange,
								                                        routing_key = RoutingKey },
								                            #'queue.unbind_ok'{} =
								                                amqp_channel:call(Channel, Unbind)
								                    end,
								                lists:foreach(UnbindQos, QosSubs),
								                dict:erase(TopicName, SubsAcc)
								        end,
								    Subs1 = lists:foldl(UnsubscribeOne, Subs0, Topics),
								    %% The acknowledgement reuses the #mqtt_frame_suback{} record with
								    %% only message_id set.
								    UnsubAck = #mqtt_frame{
								                 fixed    = #mqtt_frame_fixed{ type = ?UNSUBACK },
								                 variable = #mqtt_frame_suback{ message_id = MessageId }},
								    send_client(UnsubAck, PState),
								    {ok, PState #proc_state{ subscriptions = Subs1 }};
							 | 
						
					
						
							
								
									
										
										
										
											2012-07-03 16:55:31 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								%% PINGREQ: answer with a PINGRESP frame (fixed header only) and return
								%% the state unchanged.
								process_request(?PINGREQ, #mqtt_frame{}, PState) ->
								    PingResp = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PINGRESP }},
								    send_client(PingResp, PState),
								    {ok, PState};
							 | 
						
					
						
							
								
									
										
										
										
											2012-07-04 23:31:30 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								%% DISCONNECT: no reply is sent; the caller is told to stop via
								%% {stop, State}, with the state passed through unchanged.
								process_request(?DISCONNECT, #mqtt_frame{}, State) ->
								    {stop, State}.
							 | 
						
					
						
							
								
									
										
										
										
											2012-07-04 00:35:18 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								%%----------------------------------------------------------------------------
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								amqp_callback({#'basic.deliver'{ consumer_tag = ConsumerTag,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                                 delivery_tag = DeliveryTag,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                                 routing_key  = RoutingKey },
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								               #amqp_msg{ props = #'P_basic'{ headers = Headers },
							 | 
						
					
						
							
								
									
										
										
										
											2014-08-11 18:26:27 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                          payload = Payload },
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								               DeliveryCtx} = Delivery,
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								              #proc_state{ channels      = {Channel, _},
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                           awaiting_ack  = Awaiting,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                           message_id    = MsgId } = PState) ->
							 | 
						
					
						
							
								
									
										
										
										
											2014-08-11 18:26:27 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    amqp_channel:notify_received(DeliveryCtx),
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								    case {delivery_dup(Delivery), delivery_qos(ConsumerTag, Headers, PState)} of
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-17 22:05:14 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        {true, {?QOS_0, ?QOS_1}} ->
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-17 01:09:21 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            amqp_channel:cast(
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								              Channel, #'basic.ack'{ delivery_tag = DeliveryTag }),
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            {ok, PState};
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-17 22:05:14 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        {true, {?QOS_0, ?QOS_0}} ->
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								            {ok, PState};
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								        {Dup, {DeliveryQos, _SubQos} = Qos}     ->
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								            send_client(
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								              #mqtt_frame{ fixed = #mqtt_frame_fixed{
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                                     type = ?PUBLISH,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                                     qos  = DeliveryQos,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                                     dup  = Dup },
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                           variable = #mqtt_frame_publish{
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                                        message_id =
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                                          case DeliveryQos of
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                                              ?QOS_0 -> undefined;
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                                              ?QOS_1 -> MsgId
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                                          end,
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                                        topic_name =
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-17 01:01:45 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                                          rabbit_mqtt_util:amqp2mqtt(
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                                            RoutingKey) },
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                           payload = Payload}, PState),
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								              case Qos of
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                  {?QOS_0, ?QOS_0} ->
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                      {ok, PState};
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                  {?QOS_1, ?QOS_1} ->
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                      {ok,
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                       next_msg_id(
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                         PState #proc_state{
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                           awaiting_ack =
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                             gb_trees:insert(MsgId, DeliveryTag, Awaiting)})};
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                  {?QOS_0, ?QOS_1} ->
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								                      amqp_channel:cast(
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-17 01:09:21 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                        Channel, #'basic.ack'{ delivery_tag = DeliveryTag }),
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								                      {ok, PState}
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								              end
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								    end;
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								%% basic.ack with multiple = true.
								%% Walks unacked_pubs from its smallest key upwards: while the
								%% smallest key is =< Tag, a PUBACK carrying the value stored under
								%% that key is sent to the client, the entry is dropped and the clause
								%% recurses on the remaining tree. Returns {ok, PState} as soon as the
								%% tree is empty or its smallest key is greater than Tag.
								amqp_callback(#'basic.ack'{ multiple = true, delivery_tag = Tag } = Ack,
								              PState = #proc_state{ unacked_pubs = UnackedPubs }) ->
								    case gb_trees:is_empty(UnackedPubs) of
								        true ->
								            %% nothing left to acknowledge
								            {ok, PState};
								        false ->
								            case gb_trees:take_smallest(UnackedPubs) of
								                {Lowest, MsgId, Rest} when Lowest =< Tag ->
								                    PubAck = #mqtt_frame{
								                                fixed    = #mqtt_frame_fixed{ type = ?PUBACK },
								                                variable = #mqtt_frame_publish{
								                                              message_id = MsgId }},
								                    send_client(PubAck, PState),
								                    amqp_callback(
								                      Ack, PState #proc_state{ unacked_pubs = Rest });
								                _ ->
								                    %% smallest outstanding key is beyond Tag
								                    {ok, PState}
								            end
								    end;
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								%% basic.ack with multiple = false.
								%% Looks up the value stored under Tag in unacked_pubs, sends it to
								%% the client as the message id of a PUBACK frame, and returns
								%% {ok, PState1} where PState1 no longer holds Tag in unacked_pubs.
								amqp_callback(#'basic.ack'{ multiple = false, delivery_tag = Tag },
								              PState = #proc_state{ unacked_pubs = UnackedPubs }) ->
								    %% gb_trees:get/2 crashes when Tag is not in the tree, so an
								    %% unknown tag fails here, before anything is sent.
								    MsgId  = gb_trees:get(Tag, UnackedPubs),
								    PubAck = #mqtt_frame{
								                fixed    = #mqtt_frame_fixed{ type = ?PUBACK },
								                variable = #mqtt_frame_publish{ message_id = MsgId }},
								    send_client(PubAck, PState),
								    Remaining = gb_trees:delete(Tag, UnackedPubs),
								    {ok, PState #proc_state{ unacked_pubs = Remaining }}.
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								%% Computes the dup flag for a delivery.
								%% The argument is a {#'basic.deliver'{}, #amqp_msg{}, DeliveryCtx}
								%% triple. The result is the redelivered flag of the deliver method,
								%% or-ed with the boolean "x-mqtt-dup" header when that header is
								%% present. Any other lookup result is not handled and crashes the
								%% case expression.
								delivery_dup({#'basic.deliver'{ redelivered = Redelivered },
								              #amqp_msg{ props = #'P_basic'{ headers = Headers }},
								              _DeliveryCtx}) ->
								    DupHeader = rabbit_mqtt_util:table_lookup(Headers, <<"x-mqtt-dup">>),
								    case DupHeader of
								        {bool, Dup} -> Redelivered orelse Dup;
								        undefined   -> Redelivered
								    end.
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								%% Advances the message_id field of the processor state by one,
								%% wrapping from 16#ffff back to 1 (0 is never produced by the wrap).
								next_msg_id(PState = #proc_state{ message_id = Current }) ->
								    Next = case Current of
								               16#ffff -> 1;
								               _       -> Current + 1
								           end,
								    PState #proc_state{ message_id = Next }.
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								%% Picks the qos pair for a delivery from the consumer tag it arrived
								%% on and from the qos the message was published with.
								%%  - Tag equal to the first element of consumer_tags:
								%%    always {?QOS_0, ?QOS_0}.
								%%  - Tag equal to the second element of consumer_tags: the
								%%    "x-mqtt-publish-qos" header byte, capped at ?QOS_1, paired with
								%%    ?QOS_1. Non-MQTT publishes (no such header) are assumed to be
								%%    qos 1, regardless of delivery_mode.
								%% NOTE(review): the pair looks like {delivery qos, subscription qos};
								%% confirm against the caller.
								delivery_qos(Tag, _Headers,  #proc_state{ consumer_tags = {Tag, _} }) ->
								    {?QOS_0, ?QOS_0};
								delivery_qos(Tag, Headers,   #proc_state{ consumer_tags = {_, Tag} }) ->
								    PublishQos = rabbit_mqtt_util:table_lookup(
								                   Headers, <<"x-mqtt-publish-qos">>),
								    case PublishQos of
								        undefined   -> {?QOS_1, ?QOS_1};
								        {byte, Qos} -> {erlang:min(Qos, ?QOS_1), ?QOS_1}
								    end.
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2013-07-27 00:43:25 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								%% Session set-up that depends on the clean_sess flag.
								%%  - clean_sess = false: calls ensure_queue/2 for ?QOS_1 and returns
								%%    the state it hands back.
								%%  - clean_sess = true: deletes the queue named by
								%%    rabbit_mqtt_util:subcription_queue_name/1 for this client over a
								%%    freshly opened channel and returns the state unchanged.
								maybe_clean_sess(PState = #proc_state { clean_sess = false }) ->
								    {_Queue, WithQueue} = ensure_queue(?QOS_1, PState),
								    WithQueue;
								maybe_clean_sess(PState = #proc_state { clean_sess = true,
								                                        connection = Conn,
								                                        client_id  = ClientId }) ->
								    {_, QName} = rabbit_mqtt_util:subcription_queue_name(ClientId),
								    {ok, Ch} = amqp_connection:open_channel(Conn),
								    %% Only an exit raised by the queue.delete call itself is swallowed;
								    %% the channel is closed explicitly on the success path only.
								    try amqp_channel:call(Ch, #'queue.delete'{ queue = QName }) of
								        #'queue.delete_ok'{} ->
								            ok = amqp_channel:close(Ch)
								    catch
								        exit:_ ->
								            ok
								    end,
								    PState.
							 | 
						
					
						
							
								
									
										
										
										
											2012-07-16 21:57:31 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								%%----------------------------------------------------------------------------
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								%% Builds the will message described by a CONNECT frame.
								%% Returns undefined when will_flag is false; otherwise an #mqtt_msg{}
								%% carrying the frame's will retain/qos/topic/payload with dup = false.
								make_will_msg(#mqtt_frame_connect{ will_flag   = WillFlag,
								                                   will_retain = Retain,
								                                   will_qos    = Qos,
								                                   will_topic  = Topic,
								                                   will_msg    = Msg }) ->
								    case WillFlag of
								        false ->
								            undefined;
								        _ ->
								            #mqtt_msg{ retain  = Retain,
								                       qos     = Qos,
								                       topic   = Topic,
								                       dup     = false,
								                       payload = Msg }
								    end.
							 | 
						
					
						
							
								
									
										
										
										
											2012-07-04 00:35:18 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2014-03-19 20:05:42 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								%% Attempts to open a direct AMQP connection for an MQTT login.
								%% Only matches while both channels in the state are still undefined.
								%% UserBin is split into vhost and user name by get_vhost_username/1.
								%% Returns:
								%%   {?CONNACK_ACCEPT, Connection} - connected and loopback check ok
								%%   ?CONNACK_AUTH                 - loopback check said not_allowed
								%%                                   (the connection is closed first),
								%%                                   or the start returned access_refused
								%%   ?CONNACK_CREDENTIALS          - the start returned auth_failure
								%% Any other result of amqp_connection:start/1 is not handled here.
								process_login(UserBin, PassBin, ProtoVersion,
								              #proc_state{ channels  = {undefined, undefined},
								                           socket    = Sock }) ->
								    {VHost, UsernameBin} = get_vhost_username(UserBin),
								    Params = #amqp_params_direct{
								                username     = UsernameBin,
								                password     = PassBin,
								                virtual_host = VHost,
								                adapter_info = adapter_info(Sock, ProtoVersion)},
								    case amqp_connection:start(Params) of
								        {ok, Connection} ->
								            Loopback = rabbit_access_control:check_user_loopback(
								                         UsernameBin, Sock),
								            case Loopback of
								                ok ->
								                    {?CONNACK_ACCEPT, Connection};
								                not_allowed ->
								                    %% close before logging so the connection never
								                    %% outlives the refusal
								                    amqp_connection:close(Connection),
								                    rabbit_log:warning(
								                      "MQTT login failed for ~p access_refused "
								                      "(access must be from localhost)~n",
								                      [binary_to_list(UsernameBin)]),
								                    ?CONNACK_AUTH
								            end;
								        {error, {auth_failure, Explanation}} ->
								            LogArgs = [binary_to_list(UserBin), Explanation],
								            rabbit_log:error("MQTT login failed for ~p auth_failure: ~s~n",
								                             LogArgs),
								            ?CONNACK_CREDENTIALS;
								        {error, access_refused} ->
								            rabbit_log:warning("MQTT login failed for ~p access_refused "
								                               "(vhost access not allowed)~n",
								                               [binary_to_list(UserBin)]),
								            ?CONNACK_AUTH
								    end.
							 | 
						
					
						
							
								
									
										
										
										
											2012-07-16 21:57:31 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2013-11-18 20:00:47 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								%% Splits a login name of the form <<"vhost:user">> into
								%% {VHost, UserName}. The split happens at the last colon, so the
								%% vhost part may itself contain colons while the user name may not.
								%% Without any colon the vhost comes from the `vhost` setting read via
								%% rabbit_mqtt_util:env/1 and the whole input is the user name.
								get_vhost_username(UserBin) ->
								    %% the negative lookahead only lets a colon match when no further
								    %% colon follows it
								    Parts = re:split(UserBin, ":(?!.*?:)"),
								    case Parts of
								        [VHostPart, NamePart] ->
								            {VHostPart, NamePart};
								        [UserBin] ->
								            {rabbit_mqtt_util:env(vhost), UserBin}
								    end.
							 | 
						
					
						
							
								
									
										
										
										
											2012-07-16 21:57:31 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2014-11-21 22:54:33 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								%% Chooses the {Username, Password} pair to log in with, or returns
								%% `nocreds' when no username can be determined.
								%%
								%% Username, in order of preference:
								%%   1. User supplied by the client (a string, converted to a binary);
								%%   2. SSLLoginName, when it is not `none';
								%%   3. the configured default_user, when it is a binary and
								%%      allow_anonymous is true.
								%% Password:
								%%   1. Pass supplied by the client (a string, converted to a binary);
								%%   2. `none' when the chosen username is the SSL login name;
								%%   3. the configured default_pass, when it is a binary and
								%%      allow_anonymous is true;
								%%   4. `none' otherwise.
								%%
								%% NOTE(review): what a `none' password means for authentication is
								%% decided by the caller, which is not visible here - confirm that
								%% case 4 cannot be used to skip the password check.
								creds(User, Pass, SSLLoginName) ->
								    DefaultUser = rabbit_mqtt_util:env(default_user),
								    DefaultPass = rabbit_mqtt_util:env(default_pass),
								    Anon        = rabbit_mqtt_util:env(allow_anonymous),
								    U = case {User =/= undefined, is_binary(DefaultUser), Anon =:= true, SSLLoginName =/= none} of
								             {true,  _,    _,    _ }    -> list_to_binary(User);
								             {false, _,    _,    true}  -> SSLLoginName;
								             {false, true, true, false} -> DefaultUser;
								             _                          -> nocreds
								        end,
								    case U of
								        nocreds ->
								            nocreds;
								        _ ->
								            case {Pass =/= undefined, is_binary(DefaultPass), Anon =:= true, SSLLoginName == U} of
								                 {true,  _,    _,    _   } -> {U, list_to_binary(Pass)};
								                 %% Only the SSL login name goes without a password. This
								                 %% clause must test the last flag: a bare {false, _, _, _}
								                 %% would shadow the default_pass clause below and make it
								                 %% unreachable.
								                 {false, _,    _,    true} -> {U, none};
								                 {false, true, true, _   } -> {U, DefaultPass};
								                 _                         -> {U, none}
								            end
								    end.
							 | 
						
					
						
							
								
									
										
										
										
											2012-07-04 00:35:18 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								%% Maps the QoS requested for a subscription to the QoS that is
								%% granted: QoS 0 and QoS 1 are returned unchanged, QoS 2 is
								%% downgraded to QoS 1.
								supported_subs_qos(Qos) when Qos =:= ?QOS_0; Qos =:= ?QOS_1 ->
								    Qos;
								supported_subs_qos(?QOS_2) ->
								    ?QOS_1.
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2014-01-30 23:33:59 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								%% Returns the value placed in the delivery_mode property of a
								%% published message for a given MQTT QoS: 1 for QoS 0, 2 for QoS 1.
								%% QoS 2 is treated like QoS 1, in line with supported_subs_qos/1,
								%% so that a QoS 2 message reaching amqp_pub/2 does not crash with
								%% function_clause.
								delivery_mode(?QOS_0) -> 1;
								delivery_mode(?QOS_1) -> 2;
								delivery_mode(?QOS_2) -> 2.
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								%% Subscriptions are served from one queue per QoS level, each
								%% declared with its own durability and expiry arguments.
								%% Overlapping subscriptions made at different QoS levels therefore
								%% receive the same message more than once - todo: prevent duplicates
								%%
								%% Returns {QueueName, PState1}. When no consumer tag is recorded yet
								%% for the requested QoS, the queue is declared, a consumer is
								%% started on it and the new tag is stored in PState1; otherwise the
								%% queue name is returned with the state untouched.
								ensure_queue(Qos, PState = #proc_state{ channels      = {Channel, _},
								                                        client_id     = ClientId,
								                                        clean_sess    = CleanSess,
								                                        consumer_tags = Tags = {TagQ0, TagQ1} }) ->
								    {QueueQ0, QueueQ1} = rabbit_mqtt_util:subcription_queue_name(ClientId),
								    Ttl = rabbit_mqtt_util:env(subscription_ttl),
								    %% the QoS 1 queue only gets an expiry when a numeric TTL is
								    %% configured and the session is not a clean one
								    Qos1Args = if
								                   is_integer(Ttl), CleanSess =:= false ->
								                       [{<<"x-expires">>, long, Ttl}];
								                   true ->
								                       []
								               end,
								    %% declares the queue, starts consuming and records the tag in the
								    %% slot that belongs to this QoS (slot 1 for QoS 0, slot 2 for QoS 1)
								    Subscribe =
								        fun (Queue, Declare, Consume) ->
								                #'queue.declare_ok'{} = amqp_channel:call(Channel, Declare),
								                #'basic.consume_ok'{ consumer_tag = Tag } =
								                    amqp_channel:call(Channel, Consume),
								                NewTags = setelement(Qos + 1, Tags, Tag),
								                {Queue, PState #proc_state{ consumer_tags = NewTags }}
								        end,
								    case {TagQ0, TagQ1, Qos} of
								        {undefined, _, ?QOS_0} ->
								            Subscribe(QueueQ0,
								                      #'queue.declare'{ queue       = QueueQ0,
								                                        durable     = false,
								                                        auto_delete = true },
								                      #'basic.consume'{ queue  = QueueQ0,
								                                        no_ack = true });
								        {_, undefined, ?QOS_1} ->
								            Subscribe(QueueQ1,
								                      #'queue.declare'{ queue       = QueueQ1,
								                                        durable     = true,
								                                        auto_delete = CleanSess,
								                                        arguments   = Qos1Args },
								                      #'basic.consume'{ queue  = QueueQ1,
								                                        no_ack = false });
								        %% a consumer is already registered for this QoS
								        {_, _, ?QOS_0} ->
								            {QueueQ0, PState};
								        {_, _, ?QOS_1} ->
								            {QueueQ1, PState}
								    end.
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								%% Publishes the will message stored in the state through amqp_pub/2
								%% and returns the state that call produces.
								send_will(#proc_state{ will_msg = Will } = PState) ->
								    amqp_pub(Will, PState).
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								%% Publishes an MQTT message to the exchange held in the state and
								%% returns the updated state.

								%% nothing to publish
								amqp_pub(undefined, PState) ->
								    PState;

								%% First QoS 1 publish on this connection: open a second channel, put
								%% it in confirm mode and register this process as its confirm
								%% handler, then publish through the general clause below.
								%% This channel is used for publishing only, never for consuming.
								amqp_pub(#mqtt_msg{ qos = ?QOS_1 } = Msg,
								         #proc_state{ channels       = {ChQos0, undefined},
								                      awaiting_seqno = undefined,
								                      connection     = Conn } = PState) ->
								    {ok, ConfirmCh} = amqp_connection:open_channel(Conn),
								    #'confirm.select_ok'{} = amqp_channel:call(ConfirmCh, #'confirm.select'{}),
								    amqp_channel:register_confirm_handler(ConfirmCh, self()),
								    PState1 = PState #proc_state{ channels       = {ChQos0, ConfirmCh},
								                                  awaiting_seqno = 1 },
								    amqp_pub(Msg, PState1);

								amqp_pub(#mqtt_msg{ qos        = Qos,
								                    topic      = Topic,
								                    dup        = Dup,
								                    message_id = MessageId,
								                    payload    = Payload },
								         #proc_state{ channels       = {ChQos0, ChQos1},
								                      exchange       = Exchange,
								                      unacked_pubs   = UnackedPubs,
								                      awaiting_seqno = SeqNo } = PState) ->
								    RoutingKey = rabbit_mqtt_util:mqtt2amqp(Topic),
								    Publish = #'basic.publish'{ exchange    = Exchange,
								                                routing_key = RoutingKey },
								    Props = #'P_basic'{ headers       = [{<<"x-mqtt-publish-qos">>, byte, Qos},
								                                         {<<"x-mqtt-dup">>, bool, Dup}],
								                        delivery_mode = delivery_mode(Qos) },
								    Content = #amqp_msg{ props = Props, payload = Payload },
								    %% only QoS 1 messages that carry a message id go out on the
								    %% confirm channel and are remembered under the next sequence
								    %% number; everything else uses the first channel untracked
								    Tracked = Qos =:= ?QOS_1 andalso MessageId =/= undefined,
								    {Ch, UnackedPubs1, NextSeqNo} =
								        if
								            Tracked ->
								                {ChQos1,
								                 gb_trees:enter(SeqNo, MessageId, UnackedPubs),
								                 SeqNo + 1};
								            true ->
								                {ChQos0, UnackedPubs, SeqNo}
								        end,
								    amqp_channel:cast_flow(Ch, Publish, Content),
								    PState #proc_state{ unacked_pubs   = UnackedPubs1,
								                        awaiting_seqno = NextSeqNo }.
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2014-03-19 20:05:42 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								%% Builds the adapter info for a socket by passing it, together with
								%% the protocol tag {'MQTT', VersionString}, to
								%% amqp_connection:socket_adapter_info/2.
								adapter_info(Sock, ProtoVer) ->
								    Protocol = {'MQTT', human_readable_mqtt_version(ProtoVer)},
								    amqp_connection:socket_adapter_info(Sock, Protocol).
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								%% Translates the numeric protocol version into a display string:
								%% 3 -> "3.1.0", 4 -> "3.1.1", anything else -> "N/A".
								human_readable_mqtt_version(ProtoVer) ->
								    case ProtoVer of
								        3 -> "3.1.0";
								        4 -> "3.1.1";
								        _ -> "N/A"
								    end.
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-17 00:56:50 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								%% Serialises an MQTT frame and writes the result to the socket
								%% stored in the state.
								send_client(Frame, #proc_state{ socket = Sock }) ->
								    %% debugging aid, left disabled:
								    %rabbit_log:info("MQTT sending frame ~p ~n", [Frame]),
								    Bytes = rabbit_mqtt_frame:serialise(Frame),
								    rabbit_net:port_command(Sock, Bytes).
							 | 
						
					
						
							
								
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 
									 
								 
							 | 
							
								
									
										
									
								
							 | 
							
								
							 | 
							
							
								
							 | 
						
					
						
							| 
								
							 | 
							
								
							 | 
							
								
							 | 
							
							
								%% Tears down the AMQP connection referenced by the processor state.
								%%
								%% When no connection is recorded the state is returned untouched.
								%% Otherwise the client id (if one is set) is unregistered from
								%% rabbit_mqtt_collector, the connection is closed on a best-effort
								%% basis, and the returned state has both channels and the connection
								%% reset to undefined.
								close_connection(PState = #proc_state{ connection = undefined }) ->
								    %% Nothing to close.
								    PState;
								close_connection(PState = #proc_state{ connection = Conn,
								                                       client_id  = Id }) ->
								    %% todo: maybe clean session
								    ok = case Id of
								             undefined -> ok;
								             _         -> rabbit_mqtt_collector:unregister(Id, self())
								         end,
								    %% Swallow noproc and any other exception so that a failing close
								    %% does not leave debris behind.
								    try amqp_connection:close(Conn)
								    catch
								        _:_ -> ok
								    end,
								    PState #proc_state{ connection = undefined,
								                        channels   = {undefined, undefined} }.
							 |