| 
									
										
										
										
											2020-07-14 00:39:36 +08:00
										 |  |  | %% This Source Code Form is subject to the terms of the Mozilla Public
 | 
					
						
							|  |  |  | %% License, v. 2.0. If a copy of the MPL was not distributed with this
 | 
					
						
							|  |  |  | %% file, You can obtain one at https://mozilla.org/MPL/2.0/.
 | 
					
						
							| 
									
										
										
										
											2012-06-27 00:57:24 +08:00
										 |  |  | %%
 | 
					
						
							| 
									
										
										
										
											2022-03-21 05:21:56 +08:00
										 |  |  | %% Copyright (c) 2007-2022 VMware, Inc. or its affiliates.  All rights reserved.
 | 
					
						
							| 
									
										
										
										
											2012-06-27 00:57:24 +08:00
										 |  |  | %%
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | -module(rabbit_mqtt_processor). | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-12-04 21:50:32 +08:00
										 |  |  | -export([info/2, initial_state/2, initial_state/5, | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |          process_frame/2, amqp_pub/2, amqp_callback/2, send_will/1, | 
					
						
							| 
									
										
										
										
											2020-02-11 01:05:43 +08:00
										 |  |  |          close_connection/1, handle_pre_hibernate/0, | 
					
						
							|  |  |  |          handle_ra_event/2]). | 
					
						
							| 
									
										
										
										
											2012-06-27 00:57:24 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-11-19 01:38:35 +08:00
										 |  |  | %% for testing purposes
 | 
					
						
							| 
									
										
										
										
											2019-01-22 17:30:25 +08:00
										 |  |  | -export([get_vhost_username/1, get_vhost/3, get_vhost_from_user_mapping/2, | 
					
						
							| 
									
										
										
										
											2022-03-10 17:49:03 +08:00
										 |  |  |          add_client_id_to_adapter_info/2, maybe_quorum/3]). | 
					
						
							| 
									
										
										
										
											2015-11-19 01:38:35 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-06-27 00:57:24 +08:00
										 |  |  | -include_lib("amqp_client/include/amqp_client.hrl"). | 
					
						
							| 
									
										
										
										
											2012-09-12 21:34:41 +08:00
										 |  |  | -include("rabbit_mqtt_frame.hrl"). | 
					
						
							|  |  |  | -include("rabbit_mqtt.hrl"). | 
					
						
							| 
									
										
										
										
											2012-07-13 23:32:13 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-11-28 23:08:42 +08:00
										 |  |  | -define(APP, rabbitmq_mqtt). | 
					
						
							| 
									
										
										
										
											2012-07-13 23:32:13 +08:00
										 |  |  | -define(FRAME_TYPE(Frame, Type), | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |         Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}). | 
					
						
							| 
									
										
										
										
											2019-10-30 22:26:57 +08:00
										 |  |  | -define(MAX_TOPIC_PERMISSION_CACHE_SIZE, 12). | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-01-08 07:25:26 +08:00
										 |  |  | initial_state(Socket, SSLLoginName) -> | 
					
						
							| 
									
										
										
										
											2017-02-08 00:22:14 +08:00
										 |  |  |     RealSocket = rabbit_net:unwrap_socket(Socket), | 
					
						
							| 
									
										
										
										
											2018-12-04 21:50:32 +08:00
										 |  |  |     {ok, {PeerAddr, _PeerPort}} = rabbit_net:peername(RealSocket), | 
					
						
							| 
									
										
										
										
											2017-02-08 00:22:14 +08:00
										 |  |  |     initial_state(RealSocket, SSLLoginName, | 
					
						
							| 
									
										
										
										
											2016-02-02 22:31:28 +08:00
										 |  |  |         adapter_info(Socket, 'MQTT'), | 
					
						
							| 
									
										
										
										
											2019-02-05 07:49:36 +08:00
										 |  |  |         fun serialise_and_send_to_client/2, PeerAddr). | 
					
						
							| 
									
										
										
										
											2016-01-07 23:19:01 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-02-02 23:03:24 +08:00
										 |  |  | initial_state(Socket, SSLLoginName, | 
					
						
							| 
									
										
										
										
											2016-03-04 18:34:53 +08:00
										 |  |  |               AdapterInfo0 = #amqp_adapter_info{additional_info = Extra}, | 
					
						
							| 
									
										
										
										
											2018-12-04 21:50:32 +08:00
										 |  |  |               SendFun, PeerAddr) -> | 
					
						
							| 
									
										
										
										
											2019-09-04 23:07:33 +08:00
										 |  |  |     {ok, {mqtt2amqp_fun, M2A}, {amqp2mqtt_fun, A2M}} = | 
					
						
							|  |  |  |         rabbit_mqtt_util:get_topic_translation_funs(), | 
					
						
							| 
									
										
										
										
											2016-03-03 21:59:38 +08:00
										 |  |  |     %% MQTT connections use exactly one channel. The frame max is not
 | 
					
						
							|  |  |  |     %% applicable and there is no way to know what client is used.
 | 
					
						
							| 
									
										
										
										
											2016-03-04 18:34:53 +08:00
										 |  |  |     AdapterInfo = AdapterInfo0#amqp_adapter_info{additional_info = [ | 
					
						
							| 
									
										
										
										
											2016-02-02 23:03:24 +08:00
										 |  |  |         {channels, 1}, | 
					
						
							| 
									
										
										
										
											2016-03-03 21:59:38 +08:00
										 |  |  |         {channel_max, 1}, | 
					
						
							| 
									
										
										
										
											2016-03-04 18:58:59 +08:00
										 |  |  |         {frame_max, 0}, | 
					
						
							| 
									
										
										
										
											2016-03-04 18:34:53 +08:00
										 |  |  |         {client_properties, | 
					
						
							| 
									
										
										
										
											2016-03-04 18:35:53 +08:00
										 |  |  |          [{<<"product">>, longstr, <<"MQTT client">>}]} | Extra]}, | 
					
						
							| 
									
										
										
										
											2016-01-07 23:19:01 +08:00
										 |  |  |     #proc_state{ unacked_pubs   = gb_trees:empty(), | 
					
						
							|  |  |  |                  awaiting_ack   = gb_trees:empty(), | 
					
						
							|  |  |  |                  message_id     = 1, | 
					
						
							| 
									
										
										
										
											2017-04-24 20:45:37 +08:00
										 |  |  |                  subscriptions  = #{}, | 
					
						
							| 
									
										
										
										
											2016-01-07 23:19:01 +08:00
										 |  |  |                  consumer_tags  = {undefined, undefined}, | 
					
						
							|  |  |  |                  channels       = {undefined, undefined}, | 
					
						
							|  |  |  |                  exchange       = rabbit_mqtt_util:env(exchange), | 
					
						
							|  |  |  |                  socket         = Socket, | 
					
						
							| 
									
										
										
										
											2016-02-02 22:31:28 +08:00
										 |  |  |                  adapter_info   = AdapterInfo, | 
					
						
							| 
									
										
										
										
											2016-01-07 23:19:01 +08:00
										 |  |  |                  ssl_login_name = SSLLoginName, | 
					
						
							| 
									
										
										
										
											2018-12-04 21:50:32 +08:00
										 |  |  |                  send_fun       = SendFun, | 
					
						
							| 
									
										
										
										
											2019-09-04 23:07:33 +08:00
										 |  |  |                  peer_addr      = PeerAddr, | 
					
						
							|  |  |  |                  mqtt2amqp_fun  = M2A, | 
					
						
							|  |  |  |                  amqp2mqtt_fun  = A2M}. | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  | process_frame(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}, | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |               PState = #proc_state{ connection = undefined } ) | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |   when Type =/= ?CONNECT -> | 
					
						
							| 
									
										
										
										
											2014-07-04 01:36:17 +08:00
										 |  |  |     {error, connect_expected, PState}; | 
					
						
							| 
									
										
										
										
											2012-07-13 23:32:13 +08:00
										 |  |  | process_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}, | 
					
						
							| 
									
										
										
										
											2014-11-28 22:40:16 +08:00
										 |  |  |               PState) -> | 
					
						
							| 
									
										
										
										
											2021-10-26 22:29:41 +08:00
										 |  |  |     try process_request(Type, Frame, PState) of | 
					
						
							| 
									
										
										
										
											2016-03-03 21:59:38 +08:00
										 |  |  |         {ok, PState1} -> {ok, PState1, PState1#proc_state.connection}; | 
					
						
							|  |  |  |         Ret -> Ret | 
					
						
							| 
									
										
										
										
											2021-10-26 22:29:41 +08:00
										 |  |  |     catch | 
					
						
							|  |  |  |         _:{{shutdown, {server_initiated_close, 403, _}}, _} -> | 
					
						
							|  |  |  |             %% NB: MQTT spec says we should ack normally, ie pretend
 | 
					
						
							|  |  |  |             %% there was no auth error, but here we are closing the
 | 
					
						
							|  |  |  |             %% connection with an error. This is what happens anyway
 | 
					
						
							|  |  |  |             %% if there is an authorization failure at the AMQP 0-9-1
 | 
					
						
							|  |  |  |             %% client level. And error was already logged by AMQP
 | 
					
						
							|  |  |  |             %% channel, so no need for custom logging.
 | 
					
						
							|  |  |  |             {error, access_refused, PState} | 
					
						
							| 
									
										
										
										
											2016-03-03 21:59:38 +08:00
										 |  |  |     end. | 
					
						
							| 
									
										
										
										
											2012-07-13 23:32:13 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-01-22 17:30:25 +08:00
										 |  |  | add_client_id_to_adapter_info(ClientId, #amqp_adapter_info{additional_info = AdditionalInfo0} = AdapterInfo) -> | 
					
						
							|  |  |  |     AdditionalInfo1 = [{variable_map, #{<<"client_id">> => ClientId}} | 
					
						
							|  |  |  |         | AdditionalInfo0], | 
					
						
							|  |  |  |     ClientProperties = proplists:get_value(client_properties, AdditionalInfo1, []) | 
					
						
							|  |  |  |         ++ [{client_id, longstr, ClientId}], | 
					
						
							|  |  |  |     AdditionalInfo2 = case lists:keysearch(client_properties, 1, AdditionalInfo1) of | 
					
						
							|  |  |  |                           {value, _} -> | 
					
						
							|  |  |  |                               lists:keyreplace(client_properties, | 
					
						
							|  |  |  |                                                1, | 
					
						
							|  |  |  |                                                AdditionalInfo1, | 
					
						
							|  |  |  |                                                {client_properties, ClientProperties}); | 
					
						
							|  |  |  |                           false -> | 
					
						
							|  |  |  |                               [{client_properties, ClientProperties} | AdditionalInfo1] | 
					
						
							|  |  |  |                       end, | 
					
						
							|  |  |  |     AdapterInfo#amqp_adapter_info{additional_info = AdditionalInfo2}. | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
											  
											
												Avoid crash when client disconnects before server handles MQTT CONNECT
In case of a resource alarm, the server accepts incoming TCP
connections, but does not read from the socket.
When a client connects during a resource alarm, the MQTT CONNECT frame
is therefore not processed.
While the resource alarm is ongoing, the client might time out waiting
on a CONNACK MQTT packet.
When the resource alarm clears on the server, the MQTT CONNECT frame
gets processed.
Prior to this commit, this results in the following crash on the server:
```
** Reason for termination ==
** {{badmatch,{error,einval}},
    [{rabbit_mqtt_processor,process_login,4,
                            [{file,"rabbit_mqtt_processor.erl"},{line,585}]},
     {rabbit_mqtt_processor,process_request,3,
                            [{file,"rabbit_mqtt_processor.erl"},{line,143}]},
     {rabbit_mqtt_processor,process_frame,2,
                            [{file,"rabbit_mqtt_processor.erl"},{line,69}]},
     {rabbit_mqtt_reader,process_received_bytes,2,
                         [{file,"src/rabbit_mqtt_reader.erl"},{line,307}]},
```
After this commit, the server just logs:
```
[error] <0.887.0> MQTT protocol error on connection 127.0.0.1:55725 -> 127.0.0.1:1883: peername_not_known
```
In case the client already disconnected, we want the server to bail out
early, i.e. not authenticating and registering the client at all
since that can be expensive when many clients connected while the
resource alarm was ongoing.
To detect whether the client disconnected, we rely on inet:peername/1
which will return an error when the peer is not connected anymore.
Ideally we could use some better mechanism for detecting whether the
client disconnected.
The MQTT reader does receive a {tcp_closed, Socket} message once the
socket becomes active. However, we don't really want to read frames
ahead (i.e. ahead of the received CONNECT frame), one reason being that:
"Clients are allowed to send further Control Packets immediately
after sending a CONNECT Packet; Clients need not wait for a CONNACK Packet
to arrive from the Server."
Setting socket option `show_econnreset` does not help either because the client
closes the connection normally.
Co-authored-by: Péter Gömöri @gomoripeti
											
										 
											2022-08-25 23:54:30 +08:00
										 |  |  | process_connect(#mqtt_frame{ variable = #mqtt_frame_connect{ | 
					
						
							| 
									
										
										
										
											2016-01-08 07:25:26 +08:00
										 |  |  |                                            username   = Username, | 
					
						
							|  |  |  |                                            password   = Password, | 
					
						
							|  |  |  |                                            proto_ver  = ProtoVersion, | 
					
						
							|  |  |  |                                            clean_sess = CleanSess, | 
					
						
							|  |  |  |                                            client_id  = ClientId0, | 
					
						
							|  |  |  |                                            keep_alive = Keepalive} = Var}, | 
					
						
							| 
									
										
										
										
											2017-06-07 20:41:59 +08:00
										 |  |  |                 PState0 = #proc_state{ ssl_login_name = SSLLoginName, | 
					
						
							|  |  |  |                                        send_fun       = SendFun, | 
					
						
							| 
									
										
										
										
											2020-08-28 17:29:12 +08:00
										 |  |  |                                        adapter_info   = AdapterInfo, | 
					
						
							|  |  |  |                                        peer_addr      = Addr}) -> | 
					
						
							| 
									
										
										
										
											2014-03-19 20:05:42 +08:00
										 |  |  |     ClientId = case ClientId0 of | 
					
						
							|  |  |  |                    []    -> rabbit_mqtt_util:gen_client_id(); | 
					
						
							|  |  |  |                    [_|_] -> ClientId0 | 
					
						
							|  |  |  |                end, | 
					
						
							| 
									
										
										
										
											2019-02-05 07:49:36 +08:00
										 |  |  |      rabbit_log_connection:debug("Received a CONNECT, client ID: ~p (expanded to ~p), username: ~p, " | 
					
						
							|  |  |  |                                  "clean session: ~p, protocol version: ~p, keepalive: ~p", | 
					
						
							|  |  |  |                                  [ClientId0, ClientId, Username, CleanSess, ProtoVersion, Keepalive]), | 
					
						
							| 
									
										
										
										
											2019-01-22 17:30:25 +08:00
										 |  |  |     AdapterInfo1 = add_client_id_to_adapter_info(rabbit_data_coercion:to_binary(ClientId), AdapterInfo), | 
					
						
							| 
									
										
										
										
											2019-02-08 03:33:42 +08:00
										 |  |  |     PState1 = PState0#proc_state{adapter_info = AdapterInfo1}, | 
					
						
							| 
									
										
										
										
											2020-08-28 17:29:12 +08:00
										 |  |  |     Ip = list_to_binary(inet:ntoa(Addr)), | 
					
						
							| 
									
										
										
										
											2019-02-08 03:33:42 +08:00
										 |  |  |     {Return, PState5} = | 
					
						
							| 
									
										
										
										
											2014-03-19 20:05:42 +08:00
										 |  |  |         case {lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)), | 
					
						
							|  |  |  |               ClientId0 =:= [] andalso CleanSess =:= false} of | 
					
						
							| 
									
										
										
										
											2012-07-04 00:35:18 +08:00
										 |  |  |             {false, _} -> | 
					
						
							| 
									
										
										
										
											2019-02-08 03:33:42 +08:00
										 |  |  |                 {?CONNACK_PROTO_VER, PState1}; | 
					
						
							| 
									
										
										
										
											2014-03-19 20:05:42 +08:00
										 |  |  |             {_, true} -> | 
					
						
							| 
									
										
										
										
											2019-02-08 03:33:42 +08:00
										 |  |  |                 {?CONNACK_INVALID_ID, PState1}; | 
					
						
							| 
									
										
										
										
											2012-07-04 00:35:18 +08:00
										 |  |  |             _ -> | 
					
						
							| 
									
										
										
										
											2014-11-21 22:54:33 +08:00
										 |  |  |                 case creds(Username, Password, SSLLoginName) of | 
					
						
							| 
									
										
										
										
											2012-07-16 21:57:31 +08:00
										 |  |  |                     nocreds -> | 
					
						
							| 
									
										
										
										
											2020-09-22 23:57:47 +08:00
										 |  |  |                         rabbit_core_metrics:auth_attempt_failed(Ip, <<>>, mqtt), | 
					
						
							| 
									
										
										
										
											2021-03-11 21:45:32 +08:00
										 |  |  |                         rabbit_log_connection:error("MQTT login failed: no credentials provided"), | 
					
						
							| 
									
										
										
										
											2019-02-08 03:33:42 +08:00
										 |  |  |                         {?CONNACK_CREDENTIALS, PState1}; | 
					
						
							| 
									
										
										
										
											2016-09-02 06:33:34 +08:00
										 |  |  |                     {invalid_creds, {undefined, Pass}} when is_list(Pass) -> | 
					
						
							| 
									
										
										
										
											2020-09-22 23:57:47 +08:00
										 |  |  |                         rabbit_core_metrics:auth_attempt_failed(Ip, <<>>, mqtt), | 
					
						
							| 
									
										
										
										
											2019-07-29 21:59:19 +08:00
										 |  |  |                         rabbit_log_connection:error("MQTT login failed: no username is provided"), | 
					
						
							| 
									
										
										
										
											2019-02-08 03:33:42 +08:00
										 |  |  |                         {?CONNACK_CREDENTIALS, PState1}; | 
					
						
							| 
									
										
										
										
											2016-09-02 06:33:34 +08:00
										 |  |  |                     {invalid_creds, {User, undefined}} when is_list(User) -> | 
					
						
							| 
									
										
										
										
											2020-09-22 23:57:47 +08:00
										 |  |  |                         rabbit_core_metrics:auth_attempt_failed(Ip, User, mqtt), | 
					
						
							| 
									
										
										
										
											2019-07-29 21:59:19 +08:00
										 |  |  |                         rabbit_log_connection:error("MQTT login failed for user '~p': no password provided", [User]), | 
					
						
							| 
									
										
										
										
											2019-02-08 03:33:42 +08:00
										 |  |  |                         {?CONNACK_CREDENTIALS, PState1}; | 
					
						
							| 
									
										
										
										
											2012-09-19 22:34:42 +08:00
										 |  |  |                     {UserBin, PassBin} -> | 
					
						
							| 
									
										
										
										
											2019-02-08 03:33:42 +08:00
										 |  |  |                         case process_login(UserBin, PassBin, ProtoVersion, PState1) of | 
					
						
							|  |  |  |                             connack_dup_auth -> | 
					
						
							| 
									
										
										
										
											2021-10-26 22:29:41 +08:00
										 |  |  |                                 maybe_clean_sess(PState1); | 
					
						
							| 
									
										
										
										
											2015-06-30 16:08:26 +08:00
										 |  |  |                             {?CONNACK_ACCEPT, Conn, VHost, AState} -> | 
					
						
							| 
									
										
										
										
											2019-06-04 18:40:26 +08:00
										 |  |  |                                 case rabbit_mqtt_collector:register(ClientId, self()) of | 
					
						
							| 
									
										
										
										
											2020-02-11 01:05:43 +08:00
										 |  |  |                                     {ok, Corr} -> | 
					
						
							| 
									
										
										
										
											2020-02-24 22:58:03 +08:00
										 |  |  |                                         RetainerPid = rabbit_mqtt_retainer_sup:child_for_vhost(VHost), | 
					
						
							|  |  |  |                                         link(Conn), | 
					
						
							| 
									
										
										
										
											2019-06-04 18:40:26 +08:00
										 |  |  |                                     {ok, Ch} = amqp_connection:open_channel(Conn), | 
					
						
							| 
									
										
										
										
											2020-02-24 22:58:03 +08:00
										 |  |  |                                         link(Ch), | 
					
						
							|  |  |  |                                         amqp_channel:enable_delivery_flow_control(Ch), | 
					
						
							|  |  |  |                                         Prefetch = rabbit_mqtt_util:env(prefetch), | 
					
						
							|  |  |  |                                         #'basic.qos_ok'{} = amqp_channel:call(Ch, | 
					
						
							|  |  |  |                                             #'basic.qos'{prefetch_count = Prefetch}), | 
					
						
							| 
									
										
										
										
											2019-06-04 18:40:26 +08:00
										 |  |  |                                     rabbit_mqtt_reader:start_keepalive(self(), Keepalive), | 
					
						
							|  |  |  |                                     PState3 = PState1#proc_state{ | 
					
						
							|  |  |  |                                                 will_msg   = make_will_msg(Var), | 
					
						
							|  |  |  |                                                 clean_sess = CleanSess, | 
					
						
							|  |  |  |                                                 channels   = {Ch, undefined}, | 
					
						
							|  |  |  |                                                 connection = Conn, | 
					
						
							|  |  |  |                                                 client_id  = ClientId, | 
					
						
							|  |  |  |                                                 retainer_pid = RetainerPid, | 
					
						
							| 
									
										
										
										
											2020-02-11 01:05:43 +08:00
										 |  |  |                                                 auth_state = AState, | 
					
						
							|  |  |  |                                                 register_state = {pending, Corr}}, | 
					
						
							| 
									
										
										
										
											2021-10-26 22:29:41 +08:00
										 |  |  |                                     maybe_clean_sess(PState3); | 
					
						
							| 
									
										
										
										
											2019-06-04 18:40:26 +08:00
										 |  |  |                                   %% e.g. this node was removed from the MQTT cluster members
 | 
					
						
							|  |  |  |                                   {error, _} = Err -> | 
					
						
							|  |  |  |                                     rabbit_log_connection:error("MQTT cannot accept a connection: " | 
					
						
							|  |  |  |                                                                 "client ID tracker is unavailable: ~p", [Err]), | 
					
						
							| 
									
										
										
										
											2019-07-29 21:59:19 +08:00
										 |  |  |                                     %% ignore all exceptions, we are shutting down
 | 
					
						
							|  |  |  |                                     catch amqp_connection:close(Conn), | 
					
						
							| 
									
										
										
										
											2019-06-04 18:40:26 +08:00
										 |  |  |                                     {?CONNACK_SERVER, PState1}; | 
					
						
							|  |  |  |                                   {timeout, _} -> | 
					
						
							|  |  |  |                                     rabbit_log_connection:error("MQTT cannot accept a connection: " | 
					
						
							|  |  |  |                                                                 "client ID registration timed out"), | 
					
						
							| 
									
										
										
										
											2019-07-29 21:59:19 +08:00
										 |  |  |                                     %% ignore all exceptions, we are shutting down
 | 
					
						
							|  |  |  |                                     catch amqp_connection:close(Conn), | 
					
						
							| 
									
										
										
										
											2019-06-04 18:40:26 +08:00
										 |  |  |                                     {?CONNACK_SERVER, PState1} | 
					
						
							|  |  |  |                                 end; | 
					
						
							| 
									
										
										
										
											2019-07-29 21:59:19 +08:00
										 |  |  |                             ConnAck -> {ConnAck, PState1} | 
					
						
							| 
									
										
										
										
											2012-07-16 21:57:31 +08:00
										 |  |  |                         end | 
					
						
							| 
									
										
										
										
											2012-07-04 00:35:18 +08:00
										 |  |  |                 end | 
					
						
							|  |  |  |         end, | 
					
						
							| 
									
										
										
										
											2016-04-22 02:02:28 +08:00
										 |  |  |     {ReturnCode, SessionPresent} = case Return of | 
					
						
							| 
									
										
										
										
											2019-07-29 21:59:19 +08:00
										 |  |  |                                        {?CONNACK_ACCEPT, Bool} -> {?CONNACK_ACCEPT, Bool}; | 
					
						
							|  |  |  |                                        Other                   -> {Other, false} | 
					
						
							| 
									
										
										
										
											2019-02-08 03:33:42 +08:00
										 |  |  |                                    end, | 
					
						
							|  |  |  |     SendFun(#mqtt_frame{fixed    = #mqtt_frame_fixed{type = ?CONNACK}, | 
					
						
							|  |  |  |                         variable = #mqtt_frame_connack{ | 
					
						
							|  |  |  |                                       session_present = SessionPresent, | 
					
						
							|  |  |  |                                       return_code = ReturnCode}}, | 
					
						
							|  |  |  |             PState5), | 
					
						
							| 
									
										
										
										
											2019-07-29 21:59:19 +08:00
										 |  |  |     case ReturnCode of | 
					
						
							|  |  |  |       ?CONNACK_ACCEPT      -> {ok, PState5}; | 
					
						
							|  |  |  |       ?CONNACK_CREDENTIALS -> {error, unauthenticated, PState5}; | 
					
						
							|  |  |  |       ?CONNACK_AUTH        -> {error, unauthorized, PState5}; | 
					
						
							|  |  |  |       ?CONNACK_SERVER      -> {error, unavailable, PState5}; | 
					
						
							|  |  |  |       ?CONNACK_INVALID_ID  -> {error, invalid_client_id, PState5}; | 
					
						
							|  |  |  |       ?CONNACK_PROTO_VER   -> {error, unsupported_protocol_version, PState5} | 
					
						
							| 
									
										
											  
											
												Avoid crash when client disconnects before server handles MQTT CONNECT
In case of a resource alarm, the server accepts incoming TCP
connections, but does not read from the socket.
When a client connects during a resource alarm, the MQTT CONNECT frame
is therefore not processed.
While the resource alarm is ongoing, the client might time out waiting
on a CONNACK MQTT packet.
When the resource alarm clears on the server, the MQTT CONNECT frame
gets processed.
Prior to this commit, this results in the following crash on the server:
```
** Reason for termination ==
** {{badmatch,{error,einval}},
    [{rabbit_mqtt_processor,process_login,4,
                            [{file,"rabbit_mqtt_processor.erl"},{line,585}]},
     {rabbit_mqtt_processor,process_request,3,
                            [{file,"rabbit_mqtt_processor.erl"},{line,143}]},
     {rabbit_mqtt_processor,process_frame,2,
                            [{file,"rabbit_mqtt_processor.erl"},{line,69}]},
     {rabbit_mqtt_reader,process_received_bytes,2,
                         [{file,"src/rabbit_mqtt_reader.erl"},{line,307}]},
```
After this commit, the server just logs:
```
[error] <0.887.0> MQTT protocol error on connection 127.0.0.1:55725 -> 127.0.0.1:1883: peername_not_known
```
In case the client already disconnected, we want the server to bail out
early, i.e. not authenticating and registering the client at all
since that can be expensive when many clients connected while the
resource alarm was ongoing.
To detect whether the client disconnected, we rely on inet:peername/1
which will return an error when the peer is not connected anymore.
Ideally we could use some better mechanism for detecting whether the
client disconnected.
The MQTT reader does receive a {tcp_closed, Socket} message once the
socket becomes active. However, we don't really want to read frames
ahead (i.e. ahead of the received CONNECT frame), one reason being that:
"Clients are allowed to send further Control Packets immediately
after sending a CONNECT Packet; Clients need not wait for a CONNACK Packet
to arrive from the Server."
Setting socket option `show_econnreset` does not help either because the client
closes the connection normally.
Co-authored-by: Péter Gömöri @gomoripeti
											
										 
											2022-08-25 23:54:30 +08:00
										 |  |  |     end. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | process_request(?CONNECT, Frame, PState = #proc_state{socket = Socket}) -> | 
					
						
							|  |  |  |     case rabbit_net:peername(Socket) of | 
					
						
							|  |  |  |         {error, einval} -> | 
					
						
							|  |  |  |             %% Can happen when connection was blocked because of resource alarm
 | 
					
						
							|  |  |  |             %% and client therefore disconnected due to client side CONNACK timeout.
 | 
					
						
							|  |  |  |             {error, peername_not_known, PState}; | 
					
						
							|  |  |  |         _ -> | 
					
						
							|  |  |  |             process_connect(Frame, PState) | 
					
						
							| 
									
										
										
										
											2019-07-29 21:59:19 +08:00
										 |  |  |     end; | 
					
						
							| 
									
										
										
										
											2012-07-04 00:35:18 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  | process_request(?PUBACK, | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |                 #mqtt_frame{ | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |                   variable = #mqtt_frame_publish{ message_id = MessageId }}, | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |                 #proc_state{ channels     = {Channel, _}, | 
					
						
							|  |  |  |                              awaiting_ack = Awaiting } = PState) -> | 
					
						
							| 
									
										
										
										
											2015-04-26 09:51:04 +08:00
										 |  |  |     %% tag can be missing because of bogus clients and QoS downgrades
 | 
					
						
							|  |  |  |     case gb_trees:is_defined(MessageId, Awaiting) of | 
					
						
							|  |  |  |       false -> | 
					
						
							|  |  |  |         {ok, PState}; | 
					
						
							|  |  |  |       true -> | 
					
						
							|  |  |  |         Tag = gb_trees:get(MessageId, Awaiting), | 
					
						
							|  |  |  |         amqp_channel:cast(Channel, #'basic.ack'{ delivery_tag = Tag }), | 
					
						
							| 
									
										
										
										
											2017-11-09 09:08:17 +08:00
										 |  |  |         {ok, PState#proc_state{ awaiting_ack = gb_trees:delete(MessageId, Awaiting) }} | 
					
						
							| 
									
										
										
										
											2015-04-26 09:51:04 +08:00
										 |  |  |     end; | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | process_request(?PUBLISH, | 
					
						
							| 
									
										
										
										
											2016-05-18 21:11:50 +08:00
										 |  |  |                 Frame = #mqtt_frame{ | 
					
						
							|  |  |  |                     fixed = Fixed = #mqtt_frame_fixed{ qos = ?QOS_2 }}, | 
					
						
							| 
									
										
										
										
											2016-01-09 01:25:26 +08:00
										 |  |  |                 PState) -> | 
					
						
							|  |  |  |     % Downgrade QOS_2 to QOS_1
 | 
					
						
							| 
									
										
										
										
											2016-05-18 21:11:50 +08:00
										 |  |  |     process_request(?PUBLISH, | 
					
						
							| 
									
										
										
										
											2016-01-09 01:25:26 +08:00
										 |  |  |                     Frame#mqtt_frame{ | 
					
						
							|  |  |  |                         fixed = Fixed#mqtt_frame_fixed{ qos = ?QOS_1 }}, | 
					
						
							|  |  |  |                     PState); | 
					
						
							| 
									
										
										
										
											2012-07-04 23:31:30 +08:00
										 |  |  | process_request(?PUBLISH, | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |                 #mqtt_frame{ | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |                   fixed = #mqtt_frame_fixed{ qos    = Qos, | 
					
						
							|  |  |  |                                              retain = Retain, | 
					
						
							|  |  |  |                                              dup    = Dup }, | 
					
						
							|  |  |  |                   variable = #mqtt_frame_publish{ topic_name = Topic, | 
					
						
							|  |  |  |                                                   message_id = MessageId }, | 
					
						
							| 
									
										
										
										
											2015-04-18 08:55:34 +08:00
										 |  |  |                   payload = Payload }, | 
					
						
							| 
									
										
										
										
											2019-09-04 23:07:33 +08:00
										 |  |  |                   PState = #proc_state{retainer_pid = RPid, | 
					
						
							|  |  |  |                                        amqp2mqtt_fun = Amqp2MqttFun}) -> | 
					
						
							| 
									
										
										
										
											2017-01-16 16:54:16 +08:00
										 |  |  |     check_publish(Topic, fun() -> | 
					
						
							| 
									
										
										
										
											2015-06-30 16:08:26 +08:00
										 |  |  |         Msg = #mqtt_msg{retain     = Retain, | 
					
						
							|  |  |  |                         qos        = Qos, | 
					
						
							|  |  |  |                         topic      = Topic, | 
					
						
							|  |  |  |                         dup        = Dup, | 
					
						
							|  |  |  |                         message_id = MessageId, | 
					
						
							|  |  |  |                         payload    = Payload}, | 
					
						
							|  |  |  |         Result = amqp_pub(Msg, PState), | 
					
						
							|  |  |  |         case Retain of | 
					
						
							|  |  |  |           false -> ok; | 
					
						
							| 
									
										
										
										
											2019-09-04 23:07:33 +08:00
										 |  |  |           true  -> hand_off_to_retainer(RPid, Amqp2MqttFun, Topic, Msg) | 
					
						
							| 
									
										
										
										
											2015-06-30 16:08:26 +08:00
										 |  |  |         end, | 
					
						
							|  |  |  |         {ok, Result} | 
					
						
							|  |  |  |     end, PState); | 
					
						
							| 
									
										
										
										
											2012-07-04 23:31:30 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | process_request(?SUBSCRIBE, | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |                 #mqtt_frame{ | 
					
						
							| 
									
										
										
										
											2015-04-21 19:26:46 +08:00
										 |  |  |                   variable = #mqtt_frame_subscribe{ | 
					
						
							| 
									
										
										
										
											2017-11-09 09:08:17 +08:00
										 |  |  |                               message_id  = SubscribeMsgId, | 
					
						
							| 
									
										
										
										
											2015-04-21 19:26:46 +08:00
										 |  |  |                               topic_table = Topics}, | 
					
						
							|  |  |  |                   payload = undefined}, | 
					
						
							|  |  |  |                 #proc_state{channels = {Channel, _}, | 
					
						
							|  |  |  |                             exchange = Exchange, | 
					
						
							| 
									
										
										
										
											2016-01-07 23:19:01 +08:00
										 |  |  |                             retainer_pid = RPid, | 
					
						
							| 
									
										
										
										
											2017-11-09 09:08:17 +08:00
										 |  |  |                             send_fun = SendFun, | 
					
						
							| 
									
										
										
										
											2019-09-04 23:07:33 +08:00
										 |  |  |                             message_id  = StateMsgId, | 
					
						
							|  |  |  |                             mqtt2amqp_fun = Mqtt2AmqpFun} = PState0) -> | 
					
						
							| 
									
										
										
										
											2019-02-05 07:49:36 +08:00
										 |  |  |     rabbit_log_connection:debug("Received a SUBSCRIBE for topic(s) ~p", [Topics]), | 
					
						
							| 
									
										
										
										
											2021-10-26 22:29:41 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  |     {QosResponse, PState1} = | 
					
						
							|  |  |  |         lists:foldl(fun (#mqtt_topic{name = TopicName, | 
					
						
							|  |  |  |                                      qos  = Qos}, {QosList, PState}) -> | 
					
						
							|  |  |  |                        SupportedQos = supported_subs_qos(Qos), | 
					
						
							|  |  |  |                        {Queue, #proc_state{subscriptions = Subs} = PState1} = | 
					
						
							|  |  |  |                            ensure_queue(SupportedQos, PState), | 
					
						
							|  |  |  |                        RoutingKey = Mqtt2AmqpFun(TopicName), | 
					
						
							|  |  |  |                        Binding = #'queue.bind'{ | 
					
						
							|  |  |  |                                    queue       = Queue, | 
					
						
							|  |  |  |                                    exchange    = Exchange, | 
					
						
							|  |  |  |                                    routing_key = RoutingKey}, | 
					
						
							|  |  |  |                        #'queue.bind_ok'{} = amqp_channel:call(Channel, Binding), | 
					
						
							|  |  |  |                        SupportedQosList = case maps:find(TopicName, Subs) of | 
					
						
							|  |  |  |                            {ok, L} -> [SupportedQos|L]; | 
					
						
							|  |  |  |                            error   -> [SupportedQos] | 
					
						
							|  |  |  |                        end, | 
					
						
							|  |  |  |                        {[SupportedQos | QosList], | 
					
						
							|  |  |  |                         PState1 #proc_state{ | 
					
						
							|  |  |  |                             subscriptions = | 
					
						
							|  |  |  |                                 maps:put(TopicName, SupportedQosList, Subs)}} | 
					
						
							|  |  |  |                    end, {[], PState0}, Topics), | 
					
						
							|  |  |  |     SendFun(#mqtt_frame{fixed    = #mqtt_frame_fixed{type = ?SUBACK}, | 
					
						
							|  |  |  |                         variable = #mqtt_frame_suback{ | 
					
						
							|  |  |  |                                     message_id = SubscribeMsgId, | 
					
						
							|  |  |  |                                     qos_table  = QosResponse}}, PState1), | 
					
						
							|  |  |  |     %% we may need to send up to length(Topics) messages.
 | 
					
						
							|  |  |  |     %% if QoS is > 0 then we need to generate a message id,
 | 
					
						
							|  |  |  |     %% and increment the counter.
 | 
					
						
							|  |  |  |     StartMsgId = safe_max_id(SubscribeMsgId, StateMsgId), | 
					
						
							|  |  |  |     N = lists:foldl(fun (Topic, Acc) -> | 
					
						
							|  |  |  |                       case maybe_send_retained_message(RPid, Topic, Acc, PState1) of | 
					
						
							|  |  |  |                         {true, X} -> Acc + X; | 
					
						
							|  |  |  |                         false     -> Acc | 
					
						
							|  |  |  |                       end | 
					
						
							|  |  |  |                     end, StartMsgId, Topics), | 
					
						
							|  |  |  |     {ok, PState1#proc_state{message_id = N}}; | 
					
						
							| 
									
										
										
										
											2012-07-05 00:47:07 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | process_request(?UNSUBSCRIBE, | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |                 #mqtt_frame{ | 
					
						
							|  |  |  |                   variable = #mqtt_frame_subscribe{ message_id  = MessageId, | 
					
						
							|  |  |  |                                                     topic_table = Topics }, | 
					
						
							|  |  |  |                   payload = undefined }, #proc_state{ channels      = {Channel, _}, | 
					
						
							|  |  |  |                                                       exchange      = Exchange, | 
					
						
							|  |  |  |                                                       client_id     = ClientId, | 
					
						
							| 
									
										
										
										
											2016-01-07 23:19:01 +08:00
										 |  |  |                                                       subscriptions = Subs0, | 
					
						
							| 
									
										
										
										
											2019-09-04 23:07:33 +08:00
										 |  |  |                                                       send_fun      = SendFun, | 
					
						
							|  |  |  |                                                       mqtt2amqp_fun = Mqtt2AmqpFun } = PState) -> | 
					
						
							| 
									
										
										
										
											2019-02-05 07:49:36 +08:00
										 |  |  |     rabbit_log_connection:debug("Received an UNSUBSCRIBE for topic(s) ~p", [Topics]), | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |     Queues = rabbit_mqtt_util:subcription_queue_name(ClientId), | 
					
						
							|  |  |  |     Subs1 = | 
					
						
							|  |  |  |     lists:foldl( | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |       fun (#mqtt_topic{ name = TopicName }, Subs) -> | 
					
						
							| 
									
										
										
										
											2017-04-24 20:45:37 +08:00
										 |  |  |         QosSubs = case maps:find(TopicName, Subs) of | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |                       {ok, Val} when is_list(Val) -> lists:usort(Val); | 
					
						
							|  |  |  |                       error                       -> [] | 
					
						
							|  |  |  |                   end, | 
					
						
							| 
									
										
										
										
											2019-09-04 23:07:33 +08:00
										 |  |  |         RoutingKey = Mqtt2AmqpFun(TopicName), | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |         lists:foreach( | 
					
						
							|  |  |  |           fun (QosSub) -> | 
					
						
							|  |  |  |                   Queue = element(QosSub + 1, Queues), | 
					
						
							|  |  |  |                   Binding = #'queue.unbind'{ | 
					
						
							|  |  |  |                               queue       = Queue, | 
					
						
							|  |  |  |                               exchange    = Exchange, | 
					
						
							| 
									
										
										
										
											2019-09-04 23:07:33 +08:00
										 |  |  |                               routing_key = RoutingKey}, | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |                   #'queue.unbind_ok'{} = amqp_channel:call(Channel, Binding) | 
					
						
							|  |  |  |           end, QosSubs), | 
					
						
							| 
									
										
										
										
											2017-04-24 20:45:37 +08:00
										 |  |  |         maps:remove(TopicName, Subs) | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |       end, Subs0, Topics), | 
					
						
							| 
									
										
										
										
											2016-01-07 23:19:01 +08:00
										 |  |  |     SendFun(#mqtt_frame{ fixed    = #mqtt_frame_fixed { type       = ?UNSUBACK }, | 
					
						
							|  |  |  |                          variable = #mqtt_frame_suback{ message_id = MessageId }}, | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |                 PState), | 
					
						
							|  |  |  |     {ok, PState #proc_state{ subscriptions = Subs1 }}; | 
					
						
							| 
									
										
										
										
											2012-07-03 16:55:31 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-01-07 23:19:01 +08:00
										 |  |  | process_request(?PINGREQ, #mqtt_frame{}, #proc_state{ send_fun = SendFun } = PState) -> | 
					
						
							| 
									
										
										
										
											2019-02-05 07:49:36 +08:00
										 |  |  |     rabbit_log_connection:debug("Received a PINGREQ"), | 
					
						
							| 
									
										
										
										
											2016-01-07 23:19:01 +08:00
										 |  |  |     SendFun(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}, | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |                 PState), | 
					
						
							| 
									
										
										
										
											2019-02-05 07:49:36 +08:00
										 |  |  |     rabbit_log_connection:debug("Sent a PINGRESP"), | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |     {ok, PState}; | 
					
						
							| 
									
										
										
										
											2012-07-04 23:31:30 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  | process_request(?DISCONNECT, #mqtt_frame{}, PState) -> | 
					
						
							| 
									
										
										
										
											2019-02-05 07:49:36 +08:00
										 |  |  |     rabbit_log_connection:debug("Received a DISCONNECT"), | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |     {stop, PState}. | 
					
						
							| 
									
										
										
										
											2012-07-04 00:35:18 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-09-04 23:07:33 +08:00
										 |  |  | hand_off_to_retainer(RetainerPid, Amqp2MqttFun, Topic0, #mqtt_msg{payload = <<"">>}) -> | 
					
						
							|  |  |  |     Topic1 = Amqp2MqttFun(Topic0), | 
					
						
							|  |  |  |     rabbit_mqtt_retainer:clear(RetainerPid, Topic1), | 
					
						
							|  |  |  |     ok; | 
					
						
							|  |  |  | hand_off_to_retainer(RetainerPid, Amqp2MqttFun, Topic0, Msg) -> | 
					
						
							|  |  |  |     Topic1 = Amqp2MqttFun(Topic0), | 
					
						
							|  |  |  |     rabbit_mqtt_retainer:retain(RetainerPid, Topic1, Msg), | 
					
						
							|  |  |  |     ok. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | maybe_send_retained_message(RPid, #mqtt_topic{name = Topic0, qos = SubscribeQos}, MsgId, | 
					
						
							|  |  |  |                             #proc_state{ send_fun = SendFun, | 
					
						
							|  |  |  |                                          amqp2mqtt_fun = Amqp2MqttFun } = PState) -> | 
					
						
							|  |  |  |     Topic1 = Amqp2MqttFun(Topic0), | 
					
						
							|  |  |  |     case rabbit_mqtt_retainer:fetch(RPid, Topic1) of | 
					
						
							|  |  |  |         undefined -> false; | 
					
						
							|  |  |  |         Msg       -> | 
					
						
							|  |  |  |             %% calculate effective QoS as the lower value of SUBSCRIBE frame QoS
 | 
					
						
							|  |  |  |             %% and retained message QoS. The spec isn't super clear on this, we
 | 
					
						
							|  |  |  |             %% do what Mosquitto does, per user feedback.
 | 
					
						
							|  |  |  |             Qos = erlang:min(SubscribeQos, Msg#mqtt_msg.qos), | 
					
						
							|  |  |  |             Id = case Qos of | 
					
						
							|  |  |  |                 ?QOS_0 -> undefined; | 
					
						
							|  |  |  |                 ?QOS_1 -> MsgId | 
					
						
							|  |  |  |             end, | 
					
						
							|  |  |  |             SendFun(#mqtt_frame{fixed = #mqtt_frame_fixed{ | 
					
						
							|  |  |  |                 type = ?PUBLISH, | 
					
						
							|  |  |  |                 qos  = Qos, | 
					
						
							|  |  |  |                 dup  = false, | 
					
						
							|  |  |  |                 retain = Msg#mqtt_msg.retain | 
					
						
							|  |  |  |             }, variable = #mqtt_frame_publish{ | 
					
						
							|  |  |  |                 message_id = Id, | 
					
						
							|  |  |  |                 topic_name = Topic1 | 
					
						
							|  |  |  |             }, | 
					
						
							|  |  |  |             payload = Msg#mqtt_msg.payload}, PState), | 
					
						
							|  |  |  |             case Qos of | 
					
						
							|  |  |  |             ?QOS_0 -> false; | 
					
						
							|  |  |  |             ?QOS_1 -> {true, 1} | 
					
						
							|  |  |  |         end | 
					
						
							|  |  |  |     end. | 
					
						
							| 
									
										
										
										
											2015-04-21 19:26:46 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  | amqp_callback({#'basic.deliver'{ consumer_tag = ConsumerTag, | 
					
						
							|  |  |  |                                  delivery_tag = DeliveryTag, | 
					
						
							|  |  |  |                                  routing_key  = RoutingKey }, | 
					
						
							|  |  |  |                #amqp_msg{ props = #'P_basic'{ headers = Headers }, | 
					
						
							| 
									
										
										
										
											2014-08-11 18:26:27 +08:00
										 |  |  |                           payload = Payload }, | 
					
						
							|  |  |  |                DeliveryCtx} = Delivery, | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |               #proc_state{ channels      = {Channel, _}, | 
					
						
							|  |  |  |                            awaiting_ack  = Awaiting, | 
					
						
							| 
									
										
										
										
											2016-01-07 23:19:01 +08:00
										 |  |  |                            message_id    = MsgId, | 
					
						
							| 
									
										
										
										
											2019-09-04 23:07:33 +08:00
										 |  |  |                            send_fun      = SendFun, | 
					
						
							|  |  |  |                            amqp2mqtt_fun = Amqp2MqttFun } = PState) -> | 
					
						
							| 
									
										
										
										
											2022-03-10 17:49:03 +08:00
										 |  |  |     notify_received(DeliveryCtx), | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |     case {delivery_dup(Delivery), delivery_qos(ConsumerTag, Headers, PState)} of | 
					
						
							| 
									
										
										
										
											2012-08-17 22:05:14 +08:00
										 |  |  |         {true, {?QOS_0, ?QOS_1}} -> | 
					
						
							| 
									
										
										
										
											2012-08-17 01:09:21 +08:00
										 |  |  |             amqp_channel:cast( | 
					
						
							|  |  |  |               Channel, #'basic.ack'{ delivery_tag = DeliveryTag }), | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |             {ok, PState}; | 
					
						
							| 
									
										
										
										
											2012-08-17 22:05:14 +08:00
										 |  |  |         {true, {?QOS_0, ?QOS_0}} -> | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |             {ok, PState}; | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |         {Dup, {DeliveryQos, _SubQos} = Qos}     -> | 
					
						
							| 
									
										
										
										
											2019-09-04 23:07:33 +08:00
										 |  |  |             TopicName = Amqp2MqttFun(RoutingKey), | 
					
						
							| 
									
										
										
										
											2016-01-07 23:19:01 +08:00
										 |  |  |             SendFun( | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |               #mqtt_frame{ fixed = #mqtt_frame_fixed{ | 
					
						
							|  |  |  |                                      type = ?PUBLISH, | 
					
						
							|  |  |  |                                      qos  = DeliveryQos, | 
					
						
							|  |  |  |                                      dup  = Dup }, | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |                            variable = #mqtt_frame_publish{ | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |                                         message_id = | 
					
						
							|  |  |  |                                           case DeliveryQos of | 
					
						
							|  |  |  |                                               ?QOS_0 -> undefined; | 
					
						
							|  |  |  |                                               ?QOS_1 -> MsgId | 
					
						
							|  |  |  |                                           end, | 
					
						
							| 
									
										
										
										
											2019-09-04 23:07:33 +08:00
										 |  |  |                                         topic_name = TopicName }, | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |                            payload = Payload}, PState), | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |               case Qos of | 
					
						
							|  |  |  |                   {?QOS_0, ?QOS_0} -> | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |                       {ok, PState}; | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |                   {?QOS_1, ?QOS_1} -> | 
					
						
							| 
									
										
										
										
											2017-11-09 09:08:17 +08:00
										 |  |  |                       Awaiting1 = gb_trees:insert(MsgId, DeliveryTag, Awaiting), | 
					
						
							|  |  |  |                       PState1 = PState#proc_state{ awaiting_ack = Awaiting1 }, | 
					
						
							|  |  |  |                       PState2 = next_msg_id(PState1), | 
					
						
							|  |  |  |                       {ok, PState2}; | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |                   {?QOS_0, ?QOS_1} -> | 
					
						
							|  |  |  |                       amqp_channel:cast( | 
					
						
							| 
									
										
										
										
											2012-08-17 01:09:21 +08:00
										 |  |  |                         Channel, #'basic.ack'{ delivery_tag = DeliveryTag }), | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |                       {ok, PState} | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |               end | 
					
						
							|  |  |  |     end; | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | amqp_callback(#'basic.ack'{ multiple = true, delivery_tag = Tag } = Ack, | 
					
						
							| 
									
										
										
										
											2016-01-07 23:19:01 +08:00
										 |  |  |               PState = #proc_state{ unacked_pubs = UnackedPubs, | 
					
						
							|  |  |  |                                     send_fun     = SendFun }) -> | 
					
						
							| 
									
										
										
										
											2013-12-04 22:59:22 +08:00
										 |  |  |     case gb_trees:size(UnackedPubs) > 0 andalso | 
					
						
							|  |  |  |          gb_trees:take_smallest(UnackedPubs) of | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |         {TagSmall, MsgId, UnackedPubs1} when TagSmall =< Tag -> | 
					
						
							| 
									
										
										
										
											2016-01-07 23:19:01 +08:00
										 |  |  |             SendFun( | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |               #mqtt_frame{ fixed    = #mqtt_frame_fixed{ type = ?PUBACK }, | 
					
						
							|  |  |  |                            variable = #mqtt_frame_publish{ message_id = MsgId }}, | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |               PState), | 
					
						
							|  |  |  |             amqp_callback(Ack, PState #proc_state{ unacked_pubs = UnackedPubs1 }); | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |         _ -> | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |             {ok, PState} | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |     end; | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | amqp_callback(#'basic.ack'{ multiple = false, delivery_tag = Tag }, | 
					
						
							| 
									
										
										
										
											2016-01-07 23:19:01 +08:00
										 |  |  |               PState = #proc_state{ unacked_pubs = UnackedPubs, | 
					
						
							|  |  |  |                                     send_fun     = SendFun }) -> | 
					
						
							|  |  |  |     SendFun( | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |       #mqtt_frame{ fixed    = #mqtt_frame_fixed{ type = ?PUBACK }, | 
					
						
							|  |  |  |                    variable = #mqtt_frame_publish{ | 
					
						
							|  |  |  |                                 message_id = gb_trees:get( | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |                                                Tag, UnackedPubs) }}, PState), | 
					
						
							|  |  |  |     {ok, PState #proc_state{ unacked_pubs = gb_trees:delete(Tag, UnackedPubs) }}. | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | delivery_dup({#'basic.deliver'{ redelivered = Redelivered }, | 
					
						
							| 
									
										
										
										
											2014-08-11 18:26:27 +08:00
										 |  |  |               #amqp_msg{ props = #'P_basic'{ headers = Headers }}, | 
					
						
							|  |  |  |               _DeliveryCtx}) -> | 
					
						
							| 
									
										
										
										
											2012-11-06 18:32:38 +08:00
										 |  |  |     case rabbit_mqtt_util:table_lookup(Headers, <<"x-mqtt-dup">>) of | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |         undefined   -> Redelivered; | 
					
						
							|  |  |  |         {bool, Dup} -> Redelivered orelse Dup | 
					
						
							|  |  |  |     end. | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-11-09 09:08:17 +08:00
										 |  |  | ensure_valid_mqtt_message_id(Id) when Id >= 16#ffff -> | 
					
						
							|  |  |  |     1; | 
					
						
							|  |  |  | ensure_valid_mqtt_message_id(Id) -> | 
					
						
							|  |  |  |     Id. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | safe_max_id(Id0, Id1) -> | 
					
						
							|  |  |  |     ensure_valid_mqtt_message_id(erlang:max(Id0, Id1)). | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | next_msg_id(PState = #proc_state{ message_id = MsgId0 }) -> | 
					
						
							|  |  |  |     MsgId1 = ensure_valid_mqtt_message_id(MsgId0 + 1), | 
					
						
							|  |  |  |     PState#proc_state{ message_id = MsgId1 }. | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | %% decide at which qos level to deliver based on subscription
 | 
					
						
							| 
									
										
										
										
											2012-11-06 04:30:45 +08:00
										 |  |  | %% and the message publish qos level. non-MQTT publishes are
 | 
					
						
							|  |  |  | %% assumed to be qos 1, regardless of delivery_mode.
 | 
					
						
							|  |  |  | delivery_qos(Tag, _Headers,  #proc_state{ consumer_tags = {Tag, _} }) -> | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |     {?QOS_0, ?QOS_0}; | 
					
						
							| 
									
										
										
										
											2012-11-06 04:30:45 +08:00
										 |  |  | delivery_qos(Tag, Headers,   #proc_state{ consumer_tags = {_, Tag} }) -> | 
					
						
							| 
									
										
										
										
											2012-11-06 18:32:38 +08:00
										 |  |  |     case rabbit_mqtt_util:table_lookup(Headers, <<"x-mqtt-publish-qos">>) of | 
					
						
							| 
									
										
										
										
											2012-11-06 04:30:45 +08:00
										 |  |  |         {byte, Qos} -> {lists:min([Qos, ?QOS_1]), ?QOS_1}; | 
					
						
							|  |  |  |         undefined   -> {?QOS_1, ?QOS_1} | 
					
						
							|  |  |  |     end. | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-04-22 02:02:28 +08:00
										 |  |  | maybe_clean_sess(PState = #proc_state { clean_sess = false, | 
					
						
							| 
									
										
										
										
											2018-08-21 18:51:05 +08:00
										 |  |  |                                         connection = Conn, | 
					
						
							| 
									
										
										
										
											2021-10-26 22:29:41 +08:00
										 |  |  |                                         auth_state = #auth_state{vhost = VHost}, | 
					
						
							| 
									
										
										
										
											2016-04-22 02:02:28 +08:00
										 |  |  |                                         client_id  = ClientId }) -> | 
					
						
							| 
									
										
										
										
											2021-10-26 22:29:41 +08:00
										 |  |  |     SessionPresent = session_present(VHost, ClientId), | 
					
						
							|  |  |  |     case SessionPresent of | 
					
						
							|  |  |  |         false -> | 
					
						
							|  |  |  |             %% ensure_queue/2 not only ensures that queue is created, but also starts consuming from it.
 | 
					
						
							|  |  |  |             %% Let's avoid creating that queue until explicitly asked by a client.
 | 
					
						
							|  |  |  |             %% Then publish-only clients, that connect with clean_sess=true due to some misconfiguration,
 | 
					
						
							|  |  |  |             %% will consume less resources.
 | 
					
						
							|  |  |  |             {{?CONNACK_ACCEPT, SessionPresent}, PState}; | 
					
						
							|  |  |  |         true -> | 
					
						
							|  |  |  |             try ensure_queue(?QOS_1, PState) of | 
					
						
							|  |  |  |                 {_Queue, PState1} -> {{?CONNACK_ACCEPT, SessionPresent}, PState1} | 
					
						
							|  |  |  |             catch | 
					
						
							|  |  |  |                 exit:({{shutdown, {server_initiated_close, 403, _}}, _}) -> | 
					
						
							|  |  |  |                     %% Connection is not yet propagated to #proc_state{}, let's close it here
 | 
					
						
							|  |  |  |                     catch amqp_connection:close(Conn), | 
					
						
							|  |  |  |                     rabbit_log_connection:error("MQTT cannot recover a session, user is missing permissions"), | 
					
						
							|  |  |  |                     {?CONNACK_SERVER, PState}; | 
					
						
							|  |  |  |                 C:E:S -> | 
					
						
							|  |  |  |                     %% Connection is not yet propagated to
 | 
					
						
							|  |  |  |                     %% #proc_state{}, let's close it here.
 | 
					
						
							|  |  |  |                     %% This is an exceptional situation anyway, but
 | 
					
						
							|  |  |  |                     %% doing this will prevent second crash from
 | 
					
						
							|  |  |  |                     %% amqp client being logged.
 | 
					
						
							|  |  |  |                     catch amqp_connection:close(Conn), | 
					
						
							|  |  |  |                     erlang:raise(C, E, S) | 
					
						
							|  |  |  |             end | 
					
						
							|  |  |  |     end; | 
					
						
							| 
									
										
										
										
											2013-07-27 00:43:25 +08:00
										 |  |  | maybe_clean_sess(PState = #proc_state { clean_sess = true, | 
					
						
							|  |  |  |                                         connection = Conn, | 
					
						
							| 
									
										
										
										
											2021-10-26 22:29:41 +08:00
										 |  |  |                                         auth_state = #auth_state{vhost = VHost}, | 
					
						
							| 
									
										
										
										
											2013-07-27 00:43:25 +08:00
										 |  |  |                                         client_id  = ClientId }) -> | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |     {_, Queue} = rabbit_mqtt_util:subcription_queue_name(ClientId), | 
					
						
							| 
									
										
										
										
											2012-07-16 21:57:31 +08:00
										 |  |  |     {ok, Channel} = amqp_connection:open_channel(Conn), | 
					
						
							| 
									
										
										
										
											2021-10-26 22:29:41 +08:00
										 |  |  |     case session_present(VHost, ClientId) of | 
					
						
							|  |  |  |         false -> | 
					
						
							|  |  |  |             {{?CONNACK_ACCEPT, false}, PState}; | 
					
						
							|  |  |  |         true -> | 
					
						
							|  |  |  |             try amqp_channel:call(Channel, #'queue.delete'{ queue = Queue }) of | 
					
						
							|  |  |  |                 #'queue.delete_ok'{} -> {{?CONNACK_ACCEPT, false}, PState} | 
					
						
							|  |  |  |             catch | 
					
						
							|  |  |  |                 exit:({{shutdown, {server_initiated_close, 403, _}}, _}) -> | 
					
						
							|  |  |  |                     %% Connection is not yet propagated to #proc_state{}, let's close it here
 | 
					
						
							|  |  |  |                     catch amqp_connection:close(Conn), | 
					
						
							|  |  |  |                     rabbit_log_connection:error("MQTT cannot start a clean session: " | 
					
						
							|  |  |  |                                                 "`configure` permission missing for queue `~p`", [Queue]), | 
					
						
							|  |  |  |                     {?CONNACK_SERVER, PState} | 
					
						
							|  |  |  |             after | 
					
						
							|  |  |  |                 catch amqp_channel:close(Channel) | 
					
						
							|  |  |  |             end | 
					
						
							|  |  |  |     end. | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-05-10 18:16:40 +08:00
										 |  |  | session_present(VHost, ClientId) -> | 
					
						
							| 
									
										
										
										
											2016-04-22 02:02:28 +08:00
										 |  |  |     {_, QueueQ1} = rabbit_mqtt_util:subcription_queue_name(ClientId), | 
					
						
							| 
									
										
										
										
											2021-10-26 22:29:41 +08:00
										 |  |  |     QueueName = rabbit_misc:r(VHost, queue, QueueQ1), | 
					
						
							| 
									
										
										
										
											2022-05-10 18:16:40 +08:00
										 |  |  |     rabbit_amqqueue:exists(QueueName). | 
					
						
							| 
									
										
										
										
											2012-07-16 21:57:31 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  | make_will_msg(#mqtt_frame_connect{ will_flag   = false }) -> | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |     undefined; | 
					
						
							|  |  |  | make_will_msg(#mqtt_frame_connect{ will_retain = Retain, | 
					
						
							|  |  |  |                                    will_qos    = Qos, | 
					
						
							|  |  |  |                                    will_topic  = Topic, | 
					
						
							|  |  |  |                                    will_msg    = Msg }) -> | 
					
						
							|  |  |  |     #mqtt_msg{ retain  = Retain, | 
					
						
							|  |  |  |                qos     = Qos, | 
					
						
							|  |  |  |                topic   = Topic, | 
					
						
							|  |  |  |                dup     = false, | 
					
						
							|  |  |  |                payload = Msg }. | 
					
						
							| 
									
										
										
										
											2012-07-04 00:35:18 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-02-08 03:33:42 +08:00
										 |  |  | process_login(_UserBin, _PassBin, _ProtoVersion, | 
					
						
							|  |  |  |               #proc_state{channels   = {Channel, _}, | 
					
						
							| 
									
										
										
										
											2020-08-28 17:29:12 +08:00
										 |  |  |                           peer_addr  = Addr, | 
					
						
							| 
									
										
										
										
											2019-02-08 03:33:42 +08:00
										 |  |  |                           auth_state = #auth_state{username = Username, | 
					
						
							|  |  |  |                                                    vhost = VHost}}) when is_pid(Channel) -> | 
					
						
							|  |  |  |     UsernameStr = rabbit_data_coercion:to_list(Username), | 
					
						
							|  |  |  |     VHostStr = rabbit_data_coercion:to_list(VHost), | 
					
						
							| 
									
										
										
										
											2020-09-22 23:57:47 +08:00
										 |  |  |     rabbit_core_metrics:auth_attempt_failed(list_to_binary(inet:ntoa(Addr)), Username, mqtt), | 
					
						
							| 
									
										
										
										
											2019-02-08 06:58:27 +08:00
										 |  |  |     rabbit_log_connection:warning("MQTT detected duplicate connect/login attempt for user ~p, vhost ~p", | 
					
						
							| 
									
										
										
										
											2019-02-08 03:33:42 +08:00
										 |  |  |                                   [UsernameStr, VHostStr]), | 
					
						
							|  |  |  |     connack_dup_auth; | 
					
						
							| 
									
										
										
										
											2014-03-19 20:05:42 +08:00
										 |  |  | process_login(UserBin, PassBin, ProtoVersion, | 
					
						
							| 
									
										
										
										
											2019-02-08 03:33:42 +08:00
										 |  |  |               #proc_state{channels     = {undefined, undefined}, | 
					
						
							|  |  |  |                           socket       = Sock, | 
					
						
							|  |  |  |                           adapter_info = AdapterInfo, | 
					
						
							|  |  |  |                           ssl_login_name = SslLoginName, | 
					
						
							|  |  |  |                           peer_addr    = Addr}) -> | 
					
						
							| 
									
										
											  
											
												Avoid crash when client disconnects before server handles MQTT CONNECT
In case of a resource alarm, the server accepts incoming TCP
connections, but does not read from the socket.
When a client connects during a resource alarm, the MQTT CONNECT frame
is therefore not processed.
While the resource alarm is ongoing, the client might time out waiting
on a CONNACK MQTT packet.
When the resource alarm clears on the server, the MQTT CONNECT frame
gets processed.
Prior to this commit, this results in the following crash on the server:
```
** Reason for termination ==
** {{badmatch,{error,einval}},
    [{rabbit_mqtt_processor,process_login,4,
                            [{file,"rabbit_mqtt_processor.erl"},{line,585}]},
     {rabbit_mqtt_processor,process_request,3,
                            [{file,"rabbit_mqtt_processor.erl"},{line,143}]},
     {rabbit_mqtt_processor,process_frame,2,
                            [{file,"rabbit_mqtt_processor.erl"},{line,69}]},
     {rabbit_mqtt_reader,process_received_bytes,2,
                         [{file,"src/rabbit_mqtt_reader.erl"},{line,307}]},
```
After this commit, the server just logs:
```
[error] <0.887.0> MQTT protocol error on connection 127.0.0.1:55725 -> 127.0.0.1:1883: peername_not_known
```
In case the client already disconnected, we want the server to bail out
early, i.e. not authenticating and registering the client at all
since that can be expensive when many clients connected while the
resource alarm was ongoing.
To detect whether the client disconnected, we rely on inet:peername/1
which will return an error when the peer is not connected anymore.
Ideally we could use some better mechanism for detecting whether the
client disconnected.
The MQTT reader does receive a {tcp_closed, Socket} message once the
socket becomes active. However, we don't really want to read frames
ahead (i.e. ahead of the received CONNECT frame), one reason being that:
"Clients are allowed to send further Control Packets immediately
after sending a CONNECT Packet; Clients need not wait for a CONNACK Packet
to arrive from the Server."
Setting socket option `show_econnreset` does not help either because the client
closes the connection normally.
Co-authored-by: Péter Gömöri @gomoripeti
											
										 
											2022-08-25 23:54:30 +08:00
										 |  |  |     {ok, {_, LocalPort}} = rabbit_net:sockname(Sock), | 
					
						
							|  |  |  |     {VHostPickedUsing, {VHost, UsernameBin}} = get_vhost(UserBin, SslLoginName, LocalPort), | 
					
						
							| 
									
										
										
										
											2021-07-12 22:50:25 +08:00
										 |  |  |     rabbit_log_connection:debug( | 
					
						
							| 
									
										
										
										
											2021-03-11 21:45:32 +08:00
										 |  |  |         "MQTT vhost picked using ~s", | 
					
						
							| 
									
										
										
										
											2016-12-20 05:42:06 +08:00
										 |  |  |         [human_readable_vhost_lookup_strategy(VHostPickedUsing)]), | 
					
						
							| 
									
										
										
										
											2020-09-22 23:57:47 +08:00
										 |  |  |     RemoteAddress = list_to_binary(inet:ntoa(Addr)), | 
					
						
							| 
									
										
										
										
											2016-12-06 23:09:50 +08:00
										 |  |  |     case rabbit_vhost:exists(VHost) of | 
					
						
							|  |  |  |         true  -> | 
					
						
							|  |  |  |             case amqp_connection:start(#amqp_params_direct{ | 
					
						
							|  |  |  |                 username     = UsernameBin, | 
					
						
							|  |  |  |                 password     = PassBin, | 
					
						
							|  |  |  |                 virtual_host = VHost, | 
					
						
							|  |  |  |                 adapter_info = set_proto_version(AdapterInfo, ProtoVersion)}) of | 
					
						
							|  |  |  |                 {ok, Connection} -> | 
					
						
							| 
									
										
										
										
											2018-12-04 21:50:32 +08:00
										 |  |  |                     case rabbit_access_control:check_user_loopback(UsernameBin, Addr) of | 
					
						
							| 
									
										
										
										
											2016-12-06 23:09:50 +08:00
										 |  |  |                         ok          -> | 
					
						
							| 
									
										
										
										
											2020-09-22 23:57:47 +08:00
										 |  |  |                             rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, UsernameBin, | 
					
						
							|  |  |  |                                                                        mqtt), | 
					
						
							| 
									
										
										
										
											2016-12-06 23:09:50 +08:00
										 |  |  |                             [{internal_user, InternalUser}] = amqp_connection:info( | 
					
						
							|  |  |  |                                 Connection, [internal_user]), | 
					
						
							|  |  |  |                             {?CONNACK_ACCEPT, Connection, VHost, | 
					
						
							|  |  |  |                                 #auth_state{user = InternalUser, | 
					
						
							|  |  |  |                                     username = UsernameBin, | 
					
						
							|  |  |  |                                     vhost = VHost}}; | 
					
						
							|  |  |  |                         not_allowed -> | 
					
						
							| 
									
										
										
										
											2020-09-22 23:57:47 +08:00
										 |  |  |                             rabbit_core_metrics:auth_attempt_failed(RemoteAddress, UsernameBin, | 
					
						
							|  |  |  |                                                                     mqtt), | 
					
						
							| 
									
										
										
										
											2016-12-06 23:09:50 +08:00
										 |  |  |                             amqp_connection:close(Connection), | 
					
						
							| 
									
										
										
										
											2017-08-07 21:43:00 +08:00
										 |  |  |                             rabbit_log_connection:warning( | 
					
						
							| 
									
										
										
										
											2021-03-11 21:45:32 +08:00
										 |  |  |                                 "MQTT login failed for user ~s: " | 
					
						
							|  |  |  |                                 "this user's access is restricted to localhost", | 
					
						
							| 
									
										
										
										
											2016-12-06 23:09:50 +08:00
										 |  |  |                                 [binary_to_list(UsernameBin)]), | 
					
						
							|  |  |  |                             ?CONNACK_AUTH | 
					
						
							|  |  |  |                     end; | 
					
						
							|  |  |  |                 {error, {auth_failure, Explanation}} -> | 
					
						
							| 
									
										
										
										
											2020-09-22 23:57:47 +08:00
										 |  |  |                     rabbit_core_metrics:auth_attempt_failed(RemoteAddress, UsernameBin, mqtt), | 
					
						
							| 
									
										
										
										
											2021-03-11 21:45:32 +08:00
										 |  |  |                     rabbit_log_connection:error("MQTT login failed for user '~s', authentication failed: ~s", | 
					
						
							| 
									
										
										
										
											2016-12-06 23:09:50 +08:00
										 |  |  |                         [binary_to_list(UserBin), Explanation]), | 
					
						
							|  |  |  |                     ?CONNACK_CREDENTIALS; | 
					
						
							|  |  |  |                 {error, access_refused} -> | 
					
						
							| 
									
										
										
										
											2020-09-22 23:57:47 +08:00
										 |  |  |                     rabbit_core_metrics:auth_attempt_failed(RemoteAddress, UsernameBin, mqtt), | 
					
						
							| 
									
										
										
										
											2021-03-11 21:45:32 +08:00
										 |  |  |                     rabbit_log_connection:warning("MQTT login failed for user '~s': " | 
					
						
							|  |  |  |                         "virtual host access not allowed", | 
					
						
							| 
									
										
										
										
											2016-12-09 23:48:47 +08:00
										 |  |  |                         [binary_to_list(UserBin)]), | 
					
						
							|  |  |  |                     ?CONNACK_AUTH; | 
					
						
							|  |  |  |                 {error, not_allowed} -> | 
					
						
							| 
									
										
										
										
											2020-09-22 23:57:47 +08:00
										 |  |  |                     rabbit_core_metrics:auth_attempt_failed(RemoteAddress, UsernameBin, mqtt), | 
					
						
							| 
									
										
										
										
											2016-12-09 23:48:47 +08:00
										 |  |  |                     %% when vhost allowed for TLS connection
 | 
					
						
							| 
									
										
										
										
											2021-03-11 21:45:32 +08:00
										 |  |  |                     rabbit_log_connection:warning("MQTT login failed for user '~s': " | 
					
						
							|  |  |  |                         "virtual host access not allowed", | 
					
						
							| 
									
										
										
										
											2016-12-06 23:09:50 +08:00
										 |  |  |                         [binary_to_list(UserBin)]), | 
					
						
							| 
									
										
										
										
											2016-05-20 21:04:10 +08:00
										 |  |  |                     ?CONNACK_AUTH | 
					
						
							| 
									
										
										
										
											2014-02-19 01:33:33 +08:00
										 |  |  |             end; | 
					
						
							| 
									
										
										
										
											2016-12-06 23:09:50 +08:00
										 |  |  |         false -> | 
					
						
							| 
									
										
										
										
											2020-09-22 23:57:47 +08:00
										 |  |  |             rabbit_core_metrics:auth_attempt_failed(RemoteAddress, UsernameBin, mqtt), | 
					
						
							| 
									
										
										
										
											2021-03-11 21:45:32 +08:00
										 |  |  |             rabbit_log_connection:error("MQTT login failed for user '~s': virtual host '~s' does not exist", | 
					
						
							|  |  |  |                 [UserBin, VHost]), | 
					
						
							| 
									
										
										
										
											2016-12-06 23:09:50 +08:00
										 |  |  |             ?CONNACK_CREDENTIALS | 
					
						
							| 
									
										
										
										
											2014-02-19 01:33:00 +08:00
										 |  |  |     end. | 
					
						
							| 
									
										
										
										
											2012-07-16 21:57:31 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-12-19 22:00:43 +08:00
										 |  |  | get_vhost(UserBin, none, Port) -> | 
					
						
							|  |  |  |     get_vhost_no_ssl(UserBin, Port); | 
					
						
							|  |  |  | get_vhost(UserBin, undefined, Port) -> | 
					
						
							|  |  |  |     get_vhost_no_ssl(UserBin, Port); | 
					
						
							|  |  |  | get_vhost(UserBin, SslLogin, Port) -> | 
					
						
							|  |  |  |     get_vhost_ssl(UserBin, SslLogin, Port). | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | get_vhost_no_ssl(UserBin, Port) -> | 
					
						
							|  |  |  |     case vhost_in_username(UserBin) of | 
					
						
							|  |  |  |         true  -> | 
					
						
							|  |  |  |             {vhost_in_username_or_default, get_vhost_username(UserBin)}; | 
					
						
							|  |  |  |         false -> | 
					
						
							|  |  |  |             PortVirtualHostMapping = rabbit_runtime_parameters:value_global( | 
					
						
							|  |  |  |                 mqtt_port_to_vhost_mapping | 
					
						
							| 
									
										
										
										
											2016-12-13 21:46:15 +08:00
										 |  |  |             ), | 
					
						
							| 
									
										
										
										
											2016-12-19 22:00:43 +08:00
										 |  |  |             case get_vhost_from_port_mapping(Port, PortVirtualHostMapping) of | 
					
						
							| 
									
										
										
										
											2016-12-13 21:46:15 +08:00
										 |  |  |                 undefined -> | 
					
						
							| 
									
										
										
										
											2016-12-19 22:00:43 +08:00
										 |  |  |                     {default_vhost, {rabbit_mqtt_util:env(vhost), UserBin}}; | 
					
						
							| 
									
										
										
										
											2016-12-09 23:48:47 +08:00
										 |  |  |                 VHost -> | 
					
						
							| 
									
										
										
										
											2016-12-19 22:00:43 +08:00
										 |  |  |                     {port_to_vhost_mapping, {VHost, UserBin}} | 
					
						
							|  |  |  |             end | 
					
						
							|  |  |  |     end. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | get_vhost_ssl(UserBin, SslLoginName, Port) -> | 
					
						
							|  |  |  |     UserVirtualHostMapping = rabbit_runtime_parameters:value_global( | 
					
						
							|  |  |  |         mqtt_default_vhosts | 
					
						
							|  |  |  |     ), | 
					
						
							|  |  |  |     case get_vhost_from_user_mapping(SslLoginName, UserVirtualHostMapping) of | 
					
						
							|  |  |  |         undefined -> | 
					
						
							|  |  |  |             PortVirtualHostMapping = rabbit_runtime_parameters:value_global( | 
					
						
							|  |  |  |                 mqtt_port_to_vhost_mapping | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |             case get_vhost_from_port_mapping(Port, PortVirtualHostMapping) of | 
					
						
							|  |  |  |                 undefined -> | 
					
						
							|  |  |  |                     {vhost_in_username_or_default, get_vhost_username(UserBin)}; | 
					
						
							|  |  |  |                 VHostFromPortMapping -> | 
					
						
							|  |  |  |                     {port_to_vhost_mapping, {VHostFromPortMapping, UserBin}} | 
					
						
							|  |  |  |             end; | 
					
						
							|  |  |  |         VHostFromCertMapping -> | 
					
						
							|  |  |  |             {cert_to_vhost_mapping, {VHostFromCertMapping, UserBin}} | 
					
						
							|  |  |  |     end. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | vhost_in_username(UserBin) -> | 
					
						
							|  |  |  |     case application:get_env(?APP, ignore_colons_in_username) of | 
					
						
							|  |  |  |         {ok, true} -> false; | 
					
						
							|  |  |  |         _ -> | 
					
						
							|  |  |  |             %% split at the last colon, disallowing colons in username
 | 
					
						
							|  |  |  |             case re:split(UserBin, ":(?!.*?:)") of | 
					
						
							|  |  |  |                 [_, _]      -> true; | 
					
						
							|  |  |  |                 [UserBin]   -> false | 
					
						
							| 
									
										
										
										
											2016-12-09 23:48:47 +08:00
										 |  |  |             end | 
					
						
							|  |  |  |     end. | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2013-11-18 20:00:47 +08:00
										 |  |  | get_vhost_username(UserBin) -> | 
					
						
							| 
									
										
										
										
											2015-11-19 01:38:35 +08:00
										 |  |  |     Default = {rabbit_mqtt_util:env(vhost), UserBin}, | 
					
						
							|  |  |  |     case application:get_env(?APP, ignore_colons_in_username) of | 
					
						
							|  |  |  |         {ok, true} -> Default; | 
					
						
							|  |  |  |         _ -> | 
					
						
							|  |  |  |             %% split at the last colon, disallowing colons in username
 | 
					
						
							|  |  |  |             case re:split(UserBin, ":(?!.*?:)") of | 
					
						
							|  |  |  |                 [Vhost, UserName] -> {Vhost,  UserName}; | 
					
						
							|  |  |  |                 [UserBin]         -> Default | 
					
						
							|  |  |  |             end | 
					
						
							| 
									
										
										
										
											2013-11-18 20:00:47 +08:00
										 |  |  |     end. | 
					
						
							| 
									
										
										
										
											2012-07-16 21:57:31 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-12-19 22:00:43 +08:00
										 |  |  | get_vhost_from_user_mapping(_User, not_found) -> | 
					
						
							|  |  |  |     undefined; | 
					
						
							|  |  |  | get_vhost_from_user_mapping(User, Mapping) -> | 
					
						
							| 
									
										
										
										
											2018-01-03 03:29:24 +08:00
										 |  |  |     M = rabbit_data_coercion:to_proplist(Mapping), | 
					
						
							|  |  |  |     case rabbit_misc:pget(User, M) of | 
					
						
							| 
									
										
										
										
											2016-12-19 22:00:43 +08:00
										 |  |  |         undefined -> | 
					
						
							| 
									
										
										
										
											2016-12-13 21:46:15 +08:00
										 |  |  |             undefined; | 
					
						
							| 
									
										
										
										
											2016-12-19 22:00:43 +08:00
										 |  |  |         VHost -> | 
					
						
							|  |  |  |             VHost | 
					
						
							| 
									
										
										
										
											2016-12-13 21:46:15 +08:00
										 |  |  |     end. | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-12-19 22:00:43 +08:00
										 |  |  | get_vhost_from_port_mapping(_Port, not_found) -> | 
					
						
							|  |  |  |     undefined; | 
					
						
							|  |  |  | get_vhost_from_port_mapping(Port, Mapping) -> | 
					
						
							| 
									
										
										
										
											2018-01-03 03:29:24 +08:00
										 |  |  |     M = rabbit_data_coercion:to_proplist(Mapping), | 
					
						
							|  |  |  |     Res = case rabbit_misc:pget(rabbit_data_coercion:to_binary(Port), M) of | 
					
						
							| 
									
										
										
										
											2016-12-19 22:00:43 +08:00
										 |  |  |         undefined -> | 
					
						
							|  |  |  |             undefined; | 
					
						
							|  |  |  |         VHost -> | 
					
						
							|  |  |  |             VHost | 
					
						
							|  |  |  |     end, | 
					
						
							|  |  |  |     Res. | 
					
						
							| 
									
										
										
										
											2016-12-13 21:46:15 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-12-20 05:42:06 +08:00
										 |  |  | human_readable_vhost_lookup_strategy(vhost_in_username_or_default) -> | 
					
						
							|  |  |  |     "vhost in username or default"; | 
					
						
							|  |  |  | human_readable_vhost_lookup_strategy(port_to_vhost_mapping) -> | 
					
						
							|  |  |  |     "MQTT port to vhost mapping"; | 
					
						
							|  |  |  | human_readable_vhost_lookup_strategy(cert_to_vhost_mapping) -> | 
					
						
							|  |  |  |     "client certificate to vhost mapping"; | 
					
						
							|  |  |  | human_readable_vhost_lookup_strategy(default_vhost) -> | 
					
						
							|  |  |  |     "plugin configuration or default"; | 
					
						
							|  |  |  | human_readable_vhost_lookup_strategy(Val) -> | 
					
						
							|  |  |  |      atom_to_list(Val). | 
					
						
							| 
									
										
										
										
											2016-12-13 21:46:15 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-11-21 22:54:33 +08:00
										 |  |  | creds(User, Pass, SSLLoginName) -> | 
					
						
							| 
									
										
										
										
											2014-11-28 23:08:42 +08:00
										 |  |  |     DefaultUser   = rabbit_mqtt_util:env(default_user), | 
					
						
							|  |  |  |     DefaultPass   = rabbit_mqtt_util:env(default_pass), | 
					
						
							|  |  |  |     {ok, Anon}    = application:get_env(?APP, allow_anonymous), | 
					
						
							|  |  |  |     {ok, TLSAuth} = application:get_env(?APP, ssl_cert_login), | 
					
						
							| 
									
										
										
										
											2016-09-01 23:54:51 +08:00
										 |  |  |     HaveDefaultCreds = Anon =:= true andalso | 
					
						
							|  |  |  |                        is_binary(DefaultUser) andalso | 
					
						
							|  |  |  |                        is_binary(DefaultPass), | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     CredentialsProvided = User =/= undefined orelse | 
					
						
							|  |  |  |                           Pass =/= undefined, | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     CorrectCredentials = is_list(User) andalso | 
					
						
							|  |  |  |                          is_list(Pass), | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     SSLLoginProvided = TLSAuth =:= true andalso | 
					
						
							|  |  |  |                        SSLLoginName =/= none, | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     case {CredentialsProvided, CorrectCredentials, SSLLoginProvided, HaveDefaultCreds} of | 
					
						
							| 
									
										
										
										
											2016-09-02 06:33:34 +08:00
										 |  |  |         %% Username and password take priority
 | 
					
						
							| 
									
										
										
										
											2016-09-01 23:54:51 +08:00
										 |  |  |         {true, true, _, _}          -> {list_to_binary(User), | 
					
						
							|  |  |  |                                         list_to_binary(Pass)}; | 
					
						
							|  |  |  |         %% Either username or password is provided
 | 
					
						
							| 
									
										
										
										
											2016-09-02 06:33:34 +08:00
										 |  |  |         {true, false, _, _}         -> {invalid_creds, {User, Pass}}; | 
					
						
							| 
									
										
										
										
											2016-09-01 23:54:51 +08:00
										 |  |  |         %% rabbitmq_mqtt.ssl_cert_login is true. SSL user name provided.
 | 
					
						
							| 
									
										
										
										
											2016-09-02 06:33:34 +08:00
										 |  |  |         %% Authenticating using username only.
 | 
					
						
							| 
									
										
										
										
											2016-09-01 23:54:51 +08:00
										 |  |  |         {false, false, true, _}     -> {SSLLoginName, none}; | 
					
						
							| 
									
										
										
										
											2016-09-02 06:33:34 +08:00
										 |  |  |         %% Anonymous connection uses default credentials
 | 
					
						
							| 
									
										
										
										
											2016-09-01 23:54:51 +08:00
										 |  |  |         {false, false, false, true} -> {DefaultUser, DefaultPass}; | 
					
						
							|  |  |  |         _                           -> nocreds | 
					
						
							| 
									
										
										
										
											2012-07-16 21:57:31 +08:00
										 |  |  |     end. | 
					
						
							| 
									
										
										
										
											2012-07-04 00:35:18 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  | supported_subs_qos(?QOS_0) -> ?QOS_0; | 
					
						
							|  |  |  | supported_subs_qos(?QOS_1) -> ?QOS_1; | 
					
						
							|  |  |  | supported_subs_qos(?QOS_2) -> ?QOS_1. | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2014-01-30 23:33:59 +08:00
										 |  |  | delivery_mode(?QOS_0) -> 1; | 
					
						
							| 
									
										
										
										
											2019-11-06 00:54:20 +08:00
										 |  |  | delivery_mode(?QOS_1) -> 2; | 
					
						
							|  |  |  | delivery_mode(?QOS_2) -> 2. | 
					
						
							| 
									
										
										
										
											2014-01-30 23:33:59 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-03-10 17:49:03 +08:00
										 |  |  | maybe_quorum(Qos1Args, CleanSession, Queue) -> | 
					
						
							| 
									
										
										
										
											2022-03-31 23:48:00 +08:00
										 |  |  |     case {rabbit_mqtt_util:env(durable_queue_type), CleanSession} of | 
					
						
							| 
									
										
										
										
											2022-03-10 17:49:03 +08:00
										 |  |  |       %% it is possible to Quorum queues only if Clean Session == False
 | 
					
						
							|  |  |  |       %% else always use Classic queues
 | 
					
						
							|  |  |  |       %% Clean Session == True sets auto-delete to True and quorum queues
 | 
					
						
							|  |  |  |       %% does not support auto-delete flag
 | 
					
						
							|  |  |  |        {quorum, false} -> lists:append(Qos1Args, | 
					
						
							|  |  |  |           [{<<"x-queue-type">>, longstr, <<"quorum">>}]); | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       {quorum, true} -> | 
					
						
							|  |  |  |           rabbit_log:debug("Can't use quorum queue for ~s. " ++ | 
					
						
							|  |  |  |           "The clean session is true. Classic queue will be used", [Queue]), | 
					
						
							|  |  |  |           Qos1Args; | 
					
						
							|  |  |  |       _ -> Qos1Args | 
					
						
							|  |  |  |     end. | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  | %% different qos subscriptions are received in different queues
 | 
					
						
							|  |  |  | %% with appropriate durability and timeout arguments
 | 
					
						
							|  |  |  | %% this will lead to duplicate messages for overlapping subscriptions
 | 
					
						
							|  |  |  | %% with different qos values - todo: prevent duplicates
 | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  | ensure_queue(Qos, #proc_state{ channels      = {Channel, _}, | 
					
						
							|  |  |  |                                client_id     = ClientId, | 
					
						
							|  |  |  |                                clean_sess    = CleanSess, | 
					
						
							|  |  |  |                           consumer_tags = {TagQ0, TagQ1} = Tags} = PState) -> | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |     {QueueQ0, QueueQ1} = rabbit_mqtt_util:subcription_queue_name(ClientId), | 
					
						
							|  |  |  |     Qos1Args = case {rabbit_mqtt_util:env(subscription_ttl), CleanSess} of | 
					
						
							| 
									
										
										
										
											2012-11-06 18:32:38 +08:00
										 |  |  |                    {undefined, _} -> | 
					
						
							|  |  |  |                        []; | 
					
						
							|  |  |  |                    {Ms, false} when is_integer(Ms) -> | 
					
						
							|  |  |  |                        [{<<"x-expires">>, long, Ms}]; | 
					
						
							|  |  |  |                    _ -> | 
					
						
							|  |  |  |                        [] | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |                end, | 
					
						
							|  |  |  |     QueueSetup = | 
					
						
							|  |  |  |         case {TagQ0, TagQ1, Qos} of | 
					
						
							|  |  |  |             {undefined, _, ?QOS_0} -> | 
					
						
							|  |  |  |                 {QueueQ0, | 
					
						
							|  |  |  |                  #'queue.declare'{ queue       = QueueQ0, | 
					
						
							|  |  |  |                                    durable     = false, | 
					
						
							|  |  |  |                                    auto_delete = true }, | 
					
						
							|  |  |  |                  #'basic.consume'{ queue  = QueueQ0, | 
					
						
							|  |  |  |                                    no_ack = true }}; | 
					
						
							|  |  |  |             {_, undefined, ?QOS_1} -> | 
					
						
							|  |  |  |                 {QueueQ1, | 
					
						
							|  |  |  |                  #'queue.declare'{ queue       = QueueQ1, | 
					
						
							|  |  |  |                                    durable     = true, | 
					
						
							| 
									
										
										
										
											2015-10-22 01:18:33 +08:00
										 |  |  |                                    %% Clean session means a transient connection,
 | 
					
						
							|  |  |  |                                    %% translating into auto-delete.
 | 
					
						
							|  |  |  |                                    %%
 | 
					
						
							|  |  |  |                                    %% see rabbitmq/rabbitmq-mqtt#37
 | 
					
						
							|  |  |  |                                    auto_delete = CleanSess, | 
					
						
							| 
									
										
										
										
											2022-03-10 17:49:03 +08:00
										 |  |  |                                    arguments   = maybe_quorum(Qos1Args, CleanSess, QueueQ1)}, | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |                  #'basic.consume'{ queue  = QueueQ1, | 
					
						
							|  |  |  |                                    no_ack = false }}; | 
					
						
							|  |  |  |             {_, _, ?QOS_0} -> | 
					
						
							|  |  |  |                 {exists, QueueQ0}; | 
					
						
							|  |  |  |             {_, _, ?QOS_1} -> | 
					
						
							|  |  |  |                 {exists, QueueQ1} | 
					
						
							|  |  |  |           end, | 
					
						
							|  |  |  |     case QueueSetup of | 
					
						
							|  |  |  |         {Queue, Declare, Consume} -> | 
					
						
							|  |  |  |             #'queue.declare_ok'{} = amqp_channel:call(Channel, Declare), | 
					
						
							|  |  |  |             #'basic.consume_ok'{ consumer_tag = Tag } = | 
					
						
							|  |  |  |                 amqp_channel:call(Channel, Consume), | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |             {Queue, PState #proc_state{ consumer_tags = setelement(Qos+1, Tags, Tag) }}; | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |         {exists, Q} -> | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |             {Q, PState} | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |     end. | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-12-07 18:21:47 +08:00
										 |  |  | send_will(PState = #proc_state{will_msg = undefined}) -> | 
					
						
							|  |  |  |     PState; | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-08-11 18:29:34 +08:00
										 |  |  | send_will(PState = #proc_state{will_msg = WillMsg = #mqtt_msg{retain = Retain, | 
					
						
							|  |  |  |                                                               topic = Topic}, | 
					
						
							|  |  |  |                                retainer_pid = RPid, | 
					
						
							| 
									
										
										
										
											2019-09-04 23:07:33 +08:00
										 |  |  |                                channels = {ChQos0, ChQos1}, | 
					
						
							|  |  |  |                                amqp2mqtt_fun = Amqp2MqttFun}) -> | 
					
						
							| 
									
										
										
										
											2017-02-21 23:13:07 +08:00
										 |  |  |     case check_topic_access(Topic, write, PState) of | 
					
						
							|  |  |  |         ok -> | 
					
						
							|  |  |  |             amqp_pub(WillMsg, PState), | 
					
						
							|  |  |  |             case Retain of | 
					
						
							|  |  |  |                 false -> ok; | 
					
						
							| 
									
										
										
										
											2019-09-04 23:07:33 +08:00
										 |  |  |                 true  -> | 
					
						
							|  |  |  |                     hand_off_to_retainer(RPid, Amqp2MqttFun, Topic, WillMsg) | 
					
						
							| 
									
										
										
										
											2017-02-21 23:13:07 +08:00
										 |  |  |             end; | 
					
						
							|  |  |  |         Error  -> | 
					
						
							|  |  |  |             rabbit_log:warning( | 
					
						
							| 
									
										
										
										
											2021-03-11 21:45:32 +08:00
										 |  |  |                 "Could not send last will: ~p", | 
					
						
							| 
									
										
										
										
											2017-02-21 23:13:07 +08:00
										 |  |  |                 [Error]) | 
					
						
							| 
									
										
										
										
											2016-12-07 18:21:47 +08:00
										 |  |  |     end, | 
					
						
							| 
									
										
										
										
											2017-08-11 18:29:34 +08:00
										 |  |  |     case ChQos1 of | 
					
						
							|  |  |  |         undefined -> ok; | 
					
						
							|  |  |  |         _         -> amqp_channel:close(ChQos1) | 
					
						
							|  |  |  |     end, | 
					
						
							|  |  |  |     case ChQos0 of | 
					
						
							|  |  |  |         undefined -> ok; | 
					
						
							|  |  |  |         _         -> amqp_channel:close(ChQos0) | 
					
						
							|  |  |  |     end, | 
					
						
							|  |  |  |     PState #proc_state{ channels = {undefined, undefined} }. | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2021-10-26 22:29:41 +08:00
										 |  |  | %% TODO amqp_pub/2 is publishing messages asynchronously, using
 | 
					
						
							|  |  |  | %% amqp_channel:cast_flow/3
 | 
					
						
							|  |  |  | %%
 | 
					
						
							|  |  |  | %% It does access check using check_publish/3 before submitting, but
 | 
					
						
							|  |  |  | %% this is superfluous, as actual publishing will do the same
 | 
					
						
							|  |  |  | %% check. While check results cached, it's still some unnecessary
 | 
					
						
							|  |  |  | %% work.
 | 
					
						
							|  |  |  | %%
 | 
					
						
							|  |  |  | %% And the only reason to keep it that way is that it prevents useless
 | 
					
						
							|  |  |  | %% crash messages flooding logs, as there is no code to handle async
 | 
					
						
							|  |  |  | %% channel crash gracefully.
 | 
					
						
							|  |  |  | %%
 | 
					
						
							|  |  |  | %% It'd be better to rework the whole thing, removing performance
 | 
					
						
							|  |  |  | %% penalty and some 50 lines of duplicate code. Maybe unlinking from
 | 
					
						
							|  |  |  | %% channel, and adding it as a child of connection supervisor instead.
 | 
					
						
							|  |  |  | %% But exact details are not yet clear.
 | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  | amqp_pub(undefined, PState) -> | 
					
						
							|  |  |  |     PState; | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | %% set up a qos1 publishing channel if necessary
 | 
					
						
							|  |  |  | %% this channel will only be used for publishing, not consuming
 | 
					
						
							|  |  |  | amqp_pub(Msg   = #mqtt_msg{ qos = ?QOS_1 }, | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |          PState = #proc_state{ channels       = {ChQos0, undefined}, | 
					
						
							|  |  |  |                                awaiting_seqno = undefined, | 
					
						
							|  |  |  |                                connection     = Conn }) -> | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |     {ok, Channel} = amqp_connection:open_channel(Conn), | 
					
						
							|  |  |  |     #'confirm.select_ok'{} = amqp_channel:call(Channel, #'confirm.select'{}), | 
					
						
							|  |  |  |     amqp_channel:register_confirm_handler(Channel, self()), | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |     amqp_pub(Msg, PState #proc_state{ channels       = {ChQos0, Channel}, | 
					
						
							|  |  |  |                                       awaiting_seqno = 1 }); | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-08-17 22:05:14 +08:00
										 |  |  | amqp_pub(#mqtt_msg{ qos        = Qos, | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |                     topic      = Topic, | 
					
						
							|  |  |  |                     dup        = Dup, | 
					
						
							|  |  |  |                     message_id = MessageId, | 
					
						
							|  |  |  |                     payload    = Payload }, | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |          PState = #proc_state{ channels       = {ChQos0, ChQos1}, | 
					
						
							|  |  |  |                                exchange       = Exchange, | 
					
						
							|  |  |  |                                unacked_pubs   = UnackedPubs, | 
					
						
							| 
									
										
										
										
											2019-09-04 23:07:33 +08:00
										 |  |  |                                awaiting_seqno = SeqNo, | 
					
						
							|  |  |  |                                mqtt2amqp_fun  = Mqtt2AmqpFun }) -> | 
					
						
							|  |  |  |     RoutingKey = Mqtt2AmqpFun(Topic), | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |     Method = #'basic.publish'{ exchange    = Exchange, | 
					
						
							| 
									
										
										
										
											2019-09-04 23:07:33 +08:00
										 |  |  |                                routing_key = RoutingKey }, | 
					
						
							| 
									
										
										
										
											2012-11-06 18:32:38 +08:00
										 |  |  |     Headers = [{<<"x-mqtt-publish-qos">>, byte, Qos}, | 
					
						
							|  |  |  |                {<<"x-mqtt-dup">>, bool, Dup}], | 
					
						
							| 
									
										
										
										
											2014-01-30 23:33:59 +08:00
										 |  |  |     Msg = #amqp_msg{ props   = #'P_basic'{ headers       = Headers, | 
					
						
							|  |  |  |                                            delivery_mode = delivery_mode(Qos)}, | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  |                      payload = Payload }, | 
					
						
							|  |  |  |     {UnackedPubs1, Ch, SeqNo1} = | 
					
						
							|  |  |  |         case Qos =:= ?QOS_1 andalso MessageId =/= undefined of | 
					
						
							|  |  |  |             true  -> {gb_trees:enter(SeqNo, MessageId, UnackedPubs), ChQos1, | 
					
						
							|  |  |  |                       SeqNo + 1}; | 
					
						
							|  |  |  |             false -> {UnackedPubs, ChQos0, SeqNo} | 
					
						
							|  |  |  |         end, | 
					
						
							|  |  |  |     amqp_channel:cast_flow(Ch, Method, Msg), | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |     PState #proc_state{ unacked_pubs   = UnackedPubs1, | 
					
						
							|  |  |  |                         awaiting_seqno = SeqNo1 }. | 
					
						
							| 
									
										
										
										
											2012-08-06 06:52:54 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-02-02 22:31:28 +08:00
										 |  |  | adapter_info(Sock, ProtoName) -> | 
					
						
							|  |  |  |     amqp_connection:socket_adapter_info(Sock, {ProtoName, "N/A"}). | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2016-03-04 18:34:53 +08:00
										 |  |  | set_proto_version(AdapterInfo = #amqp_adapter_info{protocol = {Proto, _}}, Vsn) -> | 
					
						
							|  |  |  |     AdapterInfo#amqp_adapter_info{protocol = {Proto, | 
					
						
							| 
									
										
										
										
											2016-02-02 22:31:28 +08:00
										 |  |  |         human_readable_mqtt_version(Vsn)}}. | 
					
						
							| 
									
										
										
										
											2014-08-25 17:43:11 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | human_readable_mqtt_version(3) -> | 
					
						
							|  |  |  |     "3.1.0"; | 
					
						
							|  |  |  | human_readable_mqtt_version(4) -> | 
					
						
							|  |  |  |     "3.1.1"; | 
					
						
							|  |  |  | human_readable_mqtt_version(_) -> | 
					
						
							|  |  |  |     "N/A". | 
					
						
							| 
									
										
										
										
											2012-08-17 00:56:50 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-02-05 07:49:36 +08:00
										 |  |  | serialise_and_send_to_client(Frame, #proc_state{ socket = Sock }) -> | 
					
						
							|  |  |  |     try rabbit_net:port_command(Sock, rabbit_mqtt_frame:serialise(Frame)) of | 
					
						
							|  |  |  |       Res -> | 
					
						
							|  |  |  |         Res | 
					
						
							|  |  |  |     catch _:Error -> | 
					
						
							|  |  |  |       rabbit_log_connection:error("MQTT: a socket write failed, the socket might already be closed"), | 
					
						
							|  |  |  |       rabbit_log_connection:debug("Failed to write to socket ~p, error: ~p, frame: ~p", | 
					
						
							|  |  |  |                                   [Sock, Error, Frame]) | 
					
						
							|  |  |  |     end. | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | close_connection(PState = #proc_state{ connection = undefined }) -> | 
					
						
							|  |  |  |     PState; | 
					
						
							| 
									
										
										
										
											2013-09-16 18:36:52 +08:00
										 |  |  | close_connection(PState = #proc_state{ connection = Connection, | 
					
						
							|  |  |  |                                        client_id  = ClientId }) -> | 
					
						
							|  |  |  |     % todo: maybe clean session
 | 
					
						
							|  |  |  |     case ClientId of | 
					
						
							|  |  |  |         undefined -> ok; | 
					
						
							| 
									
										
										
										
											2020-02-22 02:42:39 +08:00
										 |  |  |         _         -> | 
					
						
							|  |  |  |             case rabbit_mqtt_collector:unregister(ClientId, self()) of | 
					
						
							|  |  |  |                 ok           -> ok; | 
					
						
							|  |  |  |                 %% ignore as we are shutting down
 | 
					
						
							|  |  |  |                 {timeout, _} -> ok | 
					
						
							|  |  |  |             end | 
					
						
							| 
									
										
										
										
											2013-09-16 18:36:52 +08:00
										 |  |  |     end, | 
					
						
							| 
									
										
										
										
											2019-07-29 21:59:19 +08:00
										 |  |  |     %% ignore noproc or other exceptions, we are shutting down
 | 
					
						
							| 
									
										
										
										
											2012-08-21 00:30:13 +08:00
										 |  |  |     catch amqp_connection:close(Connection), | 
					
						
							|  |  |  |     PState #proc_state{ channels   = {undefined, undefined}, | 
					
						
							|  |  |  |                         connection = undefined }. | 
					
						
							| 
									
										
										
										
											2015-06-30 16:08:26 +08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-10-30 21:32:46 +08:00
										 |  |  | handle_pre_hibernate() -> | 
					
						
							|  |  |  |     erase(topic_permission_cache), | 
					
						
							|  |  |  |     ok. | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-02-11 01:05:43 +08:00
										 |  |  | handle_ra_event({applied, [{Corr, ok}]}, | 
					
						
							|  |  |  |                 PState = #proc_state{register_state = {pending, Corr}}) -> | 
					
						
							|  |  |  |     %% success case - command was applied transition into registered state
 | 
					
						
							|  |  |  |     PState#proc_state{register_state = registered}; | 
					
						
							|  |  |  | handle_ra_event({not_leader, Leader, Corr}, | 
					
						
							|  |  |  |                 PState = #proc_state{register_state = {pending, Corr}, | 
					
						
							|  |  |  |                                      client_id = ClientId}) -> | 
					
						
							|  |  |  |     %% retry command against actual leader
 | 
					
						
							|  |  |  |     {ok, NewCorr} = rabbit_mqtt_collector:register(Leader, ClientId, self()), | 
					
						
							|  |  |  |     PState#proc_state{register_state = {pending, NewCorr}}; | 
					
						
							|  |  |  | handle_ra_event(register_timeout, | 
					
						
							|  |  |  |                 PState = #proc_state{register_state = {pending, _Corr}, | 
					
						
							|  |  |  |                                      client_id = ClientId}) -> | 
					
						
							|  |  |  |     {ok, NewCorr} = rabbit_mqtt_collector:register(ClientId, self()), | 
					
						
							|  |  |  |     PState#proc_state{register_state = {pending, NewCorr}}; | 
					
						
							|  |  |  | handle_ra_event(register_timeout, PState) -> | 
					
						
							|  |  |  |     PState; | 
					
						
							|  |  |  | handle_ra_event(Evt, PState) -> | 
					
						
							|  |  |  |     %% log these?
 | 
					
						
							| 
									
										
										
										
											2021-03-11 21:45:32 +08:00
										 |  |  |     rabbit_log:debug("unhandled ra_event: ~w ", [Evt]), | 
					
						
							| 
									
										
										
										
											2020-02-11 01:05:43 +08:00
										 |  |  |     PState. | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-01-16 16:54:16 +08:00
										 |  |  | check_publish(TopicName, Fn, PState) -> | 
					
						
							| 
									
										
										
										
											2015-06-30 16:08:26 +08:00
										 |  |  |   case check_topic_access(TopicName, write, PState) of | 
					
						
							| 
									
										
										
										
											2015-07-14 17:47:39 +08:00
										 |  |  |     ok -> Fn(); | 
					
						
							| 
									
										
										
										
											2019-07-29 21:59:19 +08:00
										 |  |  |     _ -> {error, unauthorized, PState} | 
					
						
							| 
									
										
										
										
											2015-06-30 16:08:26 +08:00
										 |  |  |   end. | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2017-01-23 19:10:23 +08:00
										 |  |  | check_topic_access(TopicName, Access, | 
					
						
							| 
									
										
										
										
											2016-12-29 23:34:19 +08:00
										 |  |  |                    #proc_state{ | 
					
						
							| 
									
										
										
										
											2017-06-07 20:41:59 +08:00
										 |  |  |                         auth_state = #auth_state{user = User = #user{username = Username}, | 
					
						
							| 
									
										
										
										
											2016-12-29 23:34:19 +08:00
										 |  |  |                                                  vhost = VHost}, | 
					
						
							| 
									
										
										
										
											2017-06-07 20:41:59 +08:00
										 |  |  |                         exchange = Exchange, | 
					
						
							| 
									
										
										
										
											2019-09-04 23:07:33 +08:00
										 |  |  |                         client_id = ClientId, | 
					
						
							|  |  |  |                         mqtt2amqp_fun = Mqtt2AmqpFun }) -> | 
					
						
							| 
									
										
										
										
											2019-10-30 21:32:46 +08:00
										 |  |  |     Cache = | 
					
						
							|  |  |  |         case get(topic_permission_cache) of | 
					
						
							|  |  |  |             undefined -> []; | 
					
						
							|  |  |  |             Other     -> Other | 
					
						
							|  |  |  |         end, | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     Key = {TopicName, Username, ClientId, VHost, Exchange, Access}, | 
					
						
							|  |  |  |     case lists:member(Key, Cache) of | 
					
						
							|  |  |  |         true -> | 
					
						
							|  |  |  |             ok; | 
					
						
							|  |  |  |         false -> | 
					
						
							|  |  |  |             Resource = #resource{virtual_host = VHost, | 
					
						
							|  |  |  |                                  kind = topic, | 
					
						
							|  |  |  |                                  name = Exchange}, | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             RoutingKey = Mqtt2AmqpFun(TopicName), | 
					
						
							|  |  |  |             Context = #{routing_key  => RoutingKey, | 
					
						
							|  |  |  |                         variable_map => #{ | 
					
						
							|  |  |  |                                           <<"username">>  => Username, | 
					
						
							|  |  |  |                                           <<"vhost">>     => VHost, | 
					
						
							|  |  |  |                                           <<"client_id">> => rabbit_data_coercion:to_binary(ClientId) | 
					
						
							|  |  |  |                                          } | 
					
						
							|  |  |  |                        }, | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             try rabbit_access_control:check_topic_access(User, Resource, Access, Context) of | 
					
						
							|  |  |  |                 ok -> | 
					
						
							| 
									
										
										
										
											2019-10-30 22:26:57 +08:00
										 |  |  |                     CacheTail = lists:sublist(Cache, ?MAX_TOPIC_PERMISSION_CACHE_SIZE - 1), | 
					
						
							|  |  |  |                     put(topic_permission_cache, [Key | CacheTail]), | 
					
						
							| 
									
										
										
										
											2019-10-30 21:32:46 +08:00
										 |  |  |                     ok; | 
					
						
							|  |  |  |                 R -> | 
					
						
							|  |  |  |                     R | 
					
						
							|  |  |  |             catch | 
					
						
							|  |  |  |                 _:{amqp_error, access_refused, Msg, _} -> | 
					
						
							| 
									
										
										
										
											2021-03-11 21:45:32 +08:00
										 |  |  |                     rabbit_log:error("operation resulted in an error (access_refused): ~p", [Msg]), | 
					
						
							| 
									
										
										
										
											2019-10-30 21:32:46 +08:00
										 |  |  |                     {error, access_refused}; | 
					
						
							|  |  |  |                 _:Error -> | 
					
						
							| 
									
										
										
										
											2021-03-11 21:45:32 +08:00
										 |  |  |                     rabbit_log:error("~p", [Error]), | 
					
						
							| 
									
										
										
										
											2019-10-30 21:32:46 +08:00
										 |  |  |                     {error, access_refused} | 
					
						
							|  |  |  |             end | 
					
						
							|  |  |  |     end. | 
					
						
							| 
									
										
										
										
											2016-12-05 22:58:19 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | info(consumer_tags, #proc_state{consumer_tags = Val}) -> Val; | 
					
						
							|  |  |  | info(unacked_pubs, #proc_state{unacked_pubs = Val}) -> Val; | 
					
						
							|  |  |  | info(awaiting_ack, #proc_state{awaiting_ack = Val}) -> Val; | 
					
						
							|  |  |  | info(awaiting_seqno, #proc_state{awaiting_seqno = Val}) -> Val; | 
					
						
							|  |  |  | info(message_id, #proc_state{message_id = Val}) -> Val; | 
					
						
							| 
									
										
										
										
											2016-12-06 22:24:33 +08:00
										 |  |  | info(client_id, #proc_state{client_id = Val}) -> | 
					
						
							|  |  |  |     rabbit_data_coercion:to_binary(Val); | 
					
						
							| 
									
										
										
										
											2016-12-05 22:58:19 +08:00
										 |  |  | info(clean_sess, #proc_state{clean_sess = Val}) -> Val; | 
					
						
							|  |  |  | info(will_msg, #proc_state{will_msg = Val}) -> Val; | 
					
						
							|  |  |  | info(channels, #proc_state{channels = Val}) -> Val; | 
					
						
							|  |  |  | info(exchange, #proc_state{exchange = Val}) -> Val; | 
					
						
							|  |  |  | info(adapter_info, #proc_state{adapter_info = Val}) -> Val; | 
					
						
							|  |  |  | info(ssl_login_name, #proc_state{ssl_login_name = Val}) -> Val; | 
					
						
							|  |  |  | info(retainer_pid, #proc_state{retainer_pid = Val}) -> Val; | 
					
						
							|  |  |  | info(user, #proc_state{auth_state = #auth_state{username = Val}}) -> Val; | 
					
						
							|  |  |  | info(vhost, #proc_state{auth_state = #auth_state{vhost = Val}}) -> Val; | 
					
						
							|  |  |  | info(host, #proc_state{adapter_info = #amqp_adapter_info{host = Val}}) -> Val; | 
					
						
							|  |  |  | info(port, #proc_state{adapter_info = #amqp_adapter_info{port = Val}}) -> Val; | 
					
						
							|  |  |  | info(peer_host, #proc_state{adapter_info = #amqp_adapter_info{peer_host = Val}}) -> Val; | 
					
						
							|  |  |  | info(peer_port, #proc_state{adapter_info = #amqp_adapter_info{peer_port = Val}}) -> Val; | 
					
						
							| 
									
										
										
										
											2016-12-06 22:24:33 +08:00
										 |  |  | info(protocol, #proc_state{adapter_info = #amqp_adapter_info{protocol = Val}}) -> | 
					
						
							|  |  |  |     case Val of | 
					
						
							|  |  |  |         {Proto, Version} -> {Proto, rabbit_data_coercion:to_binary(Version)}; | 
					
						
							|  |  |  |         Other -> Other | 
					
						
							|  |  |  |     end; | 
					
						
							| 
									
										
										
										
											2016-12-05 22:58:19 +08:00
										 |  |  | info(channels, PState) -> additional_info(channels, PState); | 
					
						
							|  |  |  | info(channel_max, PState) -> additional_info(channel_max, PState); | 
					
						
							|  |  |  | info(frame_max, PState) -> additional_info(frame_max, PState); | 
					
						
							|  |  |  | info(client_properties, PState) -> additional_info(client_properties, PState); | 
					
						
							|  |  |  | info(ssl, PState) -> additional_info(ssl, PState); | 
					
						
							|  |  |  | info(ssl_protocol, PState) -> additional_info(ssl_protocol, PState); | 
					
						
							|  |  |  | info(ssl_key_exchange, PState) -> additional_info(ssl_key_exchange, PState); | 
					
						
							|  |  |  | info(ssl_cipher, PState) -> additional_info(ssl_cipher, PState); | 
					
						
							|  |  |  | info(ssl_hash, PState) -> additional_info(ssl_hash, PState); | 
					
						
							|  |  |  | info(Other, _) -> throw({bad_argument, Other}). | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | additional_info(Key, | 
					
						
							|  |  |  |                 #proc_state{adapter_info = | 
					
						
							|  |  |  |                             #amqp_adapter_info{additional_info = AddInfo}}) -> | 
					
						
							|  |  |  |     proplists:get_value(Key, AddInfo). | 
					
						
							| 
									
										
										
										
											2022-03-10 17:49:03 +08:00
										 |  |  | 
 | 
					
						
							|  |  |  | notify_received(undefined) -> | 
					
						
							|  |  |  |     %% no notification for quorum queues and streams
 | 
					
						
							|  |  |  |     ok; | 
					
						
							|  |  |  | notify_received(DeliveryCtx) -> | 
					
						
							|  |  |  |     %% notification for flow control
 | 
					
						
							|  |  |  |     amqp_channel:notify_received(DeliveryCtx). |