2020-07-14 00:39:36 +08:00
|
|
|
%% This Source Code Form is subject to the terms of the Mozilla Public
|
|
|
|
|
%% License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
|
|
|
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
2012-06-27 00:57:24 +08:00
|
|
|
%%
|
2022-03-21 05:21:56 +08:00
|
|
|
%% Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved.
|
2012-06-27 00:57:24 +08:00
|
|
|
%%
|
|
|
|
|
|
|
|
|
|
-module(rabbit_mqtt_processor).
|
|
|
|
|
|
2018-12-04 21:50:32 +08:00
|
|
|
-export([info/2, initial_state/2, initial_state/5,
|
2012-08-21 00:30:13 +08:00
|
|
|
process_frame/2, amqp_pub/2, amqp_callback/2, send_will/1,
|
2020-02-11 01:05:43 +08:00
|
|
|
close_connection/1, handle_pre_hibernate/0,
|
|
|
|
|
handle_ra_event/2]).
|
2012-06-27 00:57:24 +08:00
|
|
|
|
2015-11-19 01:38:35 +08:00
|
|
|
%% for testing purposes
|
2019-01-22 17:30:25 +08:00
|
|
|
-export([get_vhost_username/1, get_vhost/3, get_vhost_from_user_mapping/2,
|
|
|
|
|
add_client_id_to_adapter_info/2]).
|
2015-11-19 01:38:35 +08:00
|
|
|
|
2012-06-27 00:57:24 +08:00
|
|
|
-include_lib("amqp_client/include/amqp_client.hrl").
|
2012-09-12 21:34:41 +08:00
|
|
|
-include("rabbit_mqtt_frame.hrl").
|
|
|
|
|
-include("rabbit_mqtt.hrl").
|
2012-07-13 23:32:13 +08:00
|
|
|
|
2014-11-28 23:08:42 +08:00
|
|
|
-define(APP, rabbitmq_mqtt).
|
2012-07-13 23:32:13 +08:00
|
|
|
-define(FRAME_TYPE(Frame, Type),
|
2012-08-21 00:30:13 +08:00
|
|
|
Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}).
|
2019-10-30 22:26:57 +08:00
|
|
|
-define(MAX_TOPIC_PERMISSION_CACHE_SIZE, 12).
|
2012-08-21 00:30:13 +08:00
|
|
|
|
2016-01-08 07:25:26 +08:00
|
|
|
initial_state(Socket, SSLLoginName) ->
|
2017-02-08 00:22:14 +08:00
|
|
|
RealSocket = rabbit_net:unwrap_socket(Socket),
|
2018-12-04 21:50:32 +08:00
|
|
|
{ok, {PeerAddr, _PeerPort}} = rabbit_net:peername(RealSocket),
|
2017-02-08 00:22:14 +08:00
|
|
|
initial_state(RealSocket, SSLLoginName,
|
2016-02-02 22:31:28 +08:00
|
|
|
adapter_info(Socket, 'MQTT'),
|
2019-02-05 07:49:36 +08:00
|
|
|
fun serialise_and_send_to_client/2, PeerAddr).
|
2016-01-07 23:19:01 +08:00
|
|
|
|
2016-02-02 23:03:24 +08:00
|
|
|
initial_state(Socket, SSLLoginName,
|
2016-03-04 18:34:53 +08:00
|
|
|
AdapterInfo0 = #amqp_adapter_info{additional_info = Extra},
|
2018-12-04 21:50:32 +08:00
|
|
|
SendFun, PeerAddr) ->
|
2019-09-04 23:07:33 +08:00
|
|
|
{ok, {mqtt2amqp_fun, M2A}, {amqp2mqtt_fun, A2M}} =
|
|
|
|
|
rabbit_mqtt_util:get_topic_translation_funs(),
|
2016-03-03 21:59:38 +08:00
|
|
|
%% MQTT connections use exactly one channel. The frame max is not
|
|
|
|
|
%% applicable and there is no way to know what client is used.
|
2016-03-04 18:34:53 +08:00
|
|
|
AdapterInfo = AdapterInfo0#amqp_adapter_info{additional_info = [
|
2016-02-02 23:03:24 +08:00
|
|
|
{channels, 1},
|
2016-03-03 21:59:38 +08:00
|
|
|
{channel_max, 1},
|
2016-03-04 18:58:59 +08:00
|
|
|
{frame_max, 0},
|
2016-03-04 18:34:53 +08:00
|
|
|
{client_properties,
|
2016-03-04 18:35:53 +08:00
|
|
|
[{<<"product">>, longstr, <<"MQTT client">>}]} | Extra]},
|
2016-01-07 23:19:01 +08:00
|
|
|
#proc_state{ unacked_pubs = gb_trees:empty(),
|
|
|
|
|
awaiting_ack = gb_trees:empty(),
|
|
|
|
|
message_id = 1,
|
2017-04-24 20:45:37 +08:00
|
|
|
subscriptions = #{},
|
2016-01-07 23:19:01 +08:00
|
|
|
consumer_tags = {undefined, undefined},
|
|
|
|
|
channels = {undefined, undefined},
|
|
|
|
|
exchange = rabbit_mqtt_util:env(exchange),
|
|
|
|
|
socket = Socket,
|
2016-02-02 22:31:28 +08:00
|
|
|
adapter_info = AdapterInfo,
|
2016-01-07 23:19:01 +08:00
|
|
|
ssl_login_name = SSLLoginName,
|
2018-12-04 21:50:32 +08:00
|
|
|
send_fun = SendFun,
|
2019-09-04 23:07:33 +08:00
|
|
|
peer_addr = PeerAddr,
|
|
|
|
|
mqtt2amqp_fun = M2A,
|
|
|
|
|
amqp2mqtt_fun = A2M}.
|
2012-08-21 00:30:13 +08:00
|
|
|
|
2012-08-06 06:52:54 +08:00
|
|
|
process_frame(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }},
|
2012-08-21 00:30:13 +08:00
|
|
|
PState = #proc_state{ connection = undefined } )
|
2012-08-06 06:52:54 +08:00
|
|
|
when Type =/= ?CONNECT ->
|
2014-07-04 01:36:17 +08:00
|
|
|
{error, connect_expected, PState};
|
2012-07-13 23:32:13 +08:00
|
|
|
process_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }},
|
2014-11-28 22:40:16 +08:00
|
|
|
PState) ->
|
2021-10-26 22:29:41 +08:00
|
|
|
try process_request(Type, Frame, PState) of
|
2016-03-03 21:59:38 +08:00
|
|
|
{ok, PState1} -> {ok, PState1, PState1#proc_state.connection};
|
|
|
|
|
Ret -> Ret
|
2021-10-26 22:29:41 +08:00
|
|
|
catch
|
|
|
|
|
_:{{shutdown, {server_initiated_close, 403, _}}, _} ->
|
|
|
|
|
%% NB: MQTT spec says we should ack normally, ie pretend
|
|
|
|
|
%% there was no auth error, but here we are closing the
|
|
|
|
|
%% connection with an error. This is what happens anyway
|
|
|
|
|
%% if there is an authorization failure at the AMQP 0-9-1
|
|
|
|
|
%% client level. And error was already logged by AMQP
|
|
|
|
|
%% channel, so no need for custom logging.
|
|
|
|
|
{error, access_refused, PState}
|
2016-03-03 21:59:38 +08:00
|
|
|
end.
|
2012-07-13 23:32:13 +08:00
|
|
|
|
2019-01-22 17:30:25 +08:00
|
|
|
add_client_id_to_adapter_info(ClientId, #amqp_adapter_info{additional_info = AdditionalInfo0} = AdapterInfo) ->
|
|
|
|
|
AdditionalInfo1 = [{variable_map, #{<<"client_id">> => ClientId}}
|
|
|
|
|
| AdditionalInfo0],
|
|
|
|
|
ClientProperties = proplists:get_value(client_properties, AdditionalInfo1, [])
|
|
|
|
|
++ [{client_id, longstr, ClientId}],
|
|
|
|
|
AdditionalInfo2 = case lists:keysearch(client_properties, 1, AdditionalInfo1) of
|
|
|
|
|
{value, _} ->
|
|
|
|
|
lists:keyreplace(client_properties,
|
|
|
|
|
1,
|
|
|
|
|
AdditionalInfo1,
|
|
|
|
|
{client_properties, ClientProperties});
|
|
|
|
|
false ->
|
|
|
|
|
[{client_properties, ClientProperties} | AdditionalInfo1]
|
|
|
|
|
end,
|
|
|
|
|
AdapterInfo#amqp_adapter_info{additional_info = AdditionalInfo2}.
|
|
|
|
|
|
2012-07-13 23:32:13 +08:00
|
|
|
process_request(?CONNECT,
|
2012-08-21 00:30:13 +08:00
|
|
|
#mqtt_frame{ variable = #mqtt_frame_connect{
|
2016-01-08 07:25:26 +08:00
|
|
|
username = Username,
|
|
|
|
|
password = Password,
|
|
|
|
|
proto_ver = ProtoVersion,
|
|
|
|
|
clean_sess = CleanSess,
|
|
|
|
|
client_id = ClientId0,
|
|
|
|
|
keep_alive = Keepalive} = Var},
|
2017-06-07 20:41:59 +08:00
|
|
|
PState0 = #proc_state{ ssl_login_name = SSLLoginName,
|
|
|
|
|
send_fun = SendFun,
|
2020-08-28 17:29:12 +08:00
|
|
|
adapter_info = AdapterInfo,
|
|
|
|
|
peer_addr = Addr}) ->
|
2014-03-19 20:05:42 +08:00
|
|
|
ClientId = case ClientId0 of
|
|
|
|
|
[] -> rabbit_mqtt_util:gen_client_id();
|
|
|
|
|
[_|_] -> ClientId0
|
|
|
|
|
end,
|
2019-02-05 07:49:36 +08:00
|
|
|
rabbit_log_connection:debug("Received a CONNECT, client ID: ~p (expanded to ~p), username: ~p, "
|
|
|
|
|
"clean session: ~p, protocol version: ~p, keepalive: ~p",
|
|
|
|
|
[ClientId0, ClientId, Username, CleanSess, ProtoVersion, Keepalive]),
|
2019-01-22 17:30:25 +08:00
|
|
|
AdapterInfo1 = add_client_id_to_adapter_info(rabbit_data_coercion:to_binary(ClientId), AdapterInfo),
|
2019-02-08 03:33:42 +08:00
|
|
|
PState1 = PState0#proc_state{adapter_info = AdapterInfo1},
|
2020-08-28 17:29:12 +08:00
|
|
|
Ip = list_to_binary(inet:ntoa(Addr)),
|
2019-02-08 03:33:42 +08:00
|
|
|
{Return, PState5} =
|
2014-03-19 20:05:42 +08:00
|
|
|
case {lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)),
|
|
|
|
|
ClientId0 =:= [] andalso CleanSess =:= false} of
|
2012-07-04 00:35:18 +08:00
|
|
|
{false, _} ->
|
2019-02-08 03:33:42 +08:00
|
|
|
{?CONNACK_PROTO_VER, PState1};
|
2014-03-19 20:05:42 +08:00
|
|
|
{_, true} ->
|
2019-02-08 03:33:42 +08:00
|
|
|
{?CONNACK_INVALID_ID, PState1};
|
2012-07-04 00:35:18 +08:00
|
|
|
_ ->
|
2014-11-21 22:54:33 +08:00
|
|
|
case creds(Username, Password, SSLLoginName) of
|
2012-07-16 21:57:31 +08:00
|
|
|
nocreds ->
|
2020-09-22 23:57:47 +08:00
|
|
|
rabbit_core_metrics:auth_attempt_failed(Ip, <<>>, mqtt),
|
2021-03-11 21:45:32 +08:00
|
|
|
rabbit_log_connection:error("MQTT login failed: no credentials provided"),
|
2019-02-08 03:33:42 +08:00
|
|
|
{?CONNACK_CREDENTIALS, PState1};
|
2016-09-02 06:33:34 +08:00
|
|
|
{invalid_creds, {undefined, Pass}} when is_list(Pass) ->
|
2020-09-22 23:57:47 +08:00
|
|
|
rabbit_core_metrics:auth_attempt_failed(Ip, <<>>, mqtt),
|
2019-07-29 21:59:19 +08:00
|
|
|
rabbit_log_connection:error("MQTT login failed: no username is provided"),
|
2019-02-08 03:33:42 +08:00
|
|
|
{?CONNACK_CREDENTIALS, PState1};
|
2016-09-02 06:33:34 +08:00
|
|
|
{invalid_creds, {User, undefined}} when is_list(User) ->
|
2020-09-22 23:57:47 +08:00
|
|
|
rabbit_core_metrics:auth_attempt_failed(Ip, User, mqtt),
|
2019-07-29 21:59:19 +08:00
|
|
|
rabbit_log_connection:error("MQTT login failed for user '~p': no password provided", [User]),
|
2019-02-08 03:33:42 +08:00
|
|
|
{?CONNACK_CREDENTIALS, PState1};
|
2012-09-19 22:34:42 +08:00
|
|
|
{UserBin, PassBin} ->
|
2019-02-08 03:33:42 +08:00
|
|
|
case process_login(UserBin, PassBin, ProtoVersion, PState1) of
|
|
|
|
|
connack_dup_auth ->
|
2021-10-26 22:29:41 +08:00
|
|
|
maybe_clean_sess(PState1);
|
2015-06-30 16:08:26 +08:00
|
|
|
{?CONNACK_ACCEPT, Conn, VHost, AState} ->
|
2019-06-04 18:40:26 +08:00
|
|
|
case rabbit_mqtt_collector:register(ClientId, self()) of
|
2020-02-11 01:05:43 +08:00
|
|
|
{ok, Corr} ->
|
2020-02-24 22:58:03 +08:00
|
|
|
RetainerPid = rabbit_mqtt_retainer_sup:child_for_vhost(VHost),
|
|
|
|
|
link(Conn),
|
2019-06-04 18:40:26 +08:00
|
|
|
{ok, Ch} = amqp_connection:open_channel(Conn),
|
2020-02-24 22:58:03 +08:00
|
|
|
link(Ch),
|
|
|
|
|
amqp_channel:enable_delivery_flow_control(Ch),
|
|
|
|
|
Prefetch = rabbit_mqtt_util:env(prefetch),
|
|
|
|
|
#'basic.qos_ok'{} = amqp_channel:call(Ch,
|
|
|
|
|
#'basic.qos'{prefetch_count = Prefetch}),
|
2019-06-04 18:40:26 +08:00
|
|
|
rabbit_mqtt_reader:start_keepalive(self(), Keepalive),
|
|
|
|
|
PState3 = PState1#proc_state{
|
|
|
|
|
will_msg = make_will_msg(Var),
|
|
|
|
|
clean_sess = CleanSess,
|
|
|
|
|
channels = {Ch, undefined},
|
|
|
|
|
connection = Conn,
|
|
|
|
|
client_id = ClientId,
|
|
|
|
|
retainer_pid = RetainerPid,
|
2020-02-11 01:05:43 +08:00
|
|
|
auth_state = AState,
|
|
|
|
|
register_state = {pending, Corr}},
|
2021-10-26 22:29:41 +08:00
|
|
|
maybe_clean_sess(PState3);
|
2019-06-04 18:40:26 +08:00
|
|
|
%% e.g. this node was removed from the MQTT cluster members
|
|
|
|
|
{error, _} = Err ->
|
|
|
|
|
rabbit_log_connection:error("MQTT cannot accept a connection: "
|
|
|
|
|
"client ID tracker is unavailable: ~p", [Err]),
|
2019-07-29 21:59:19 +08:00
|
|
|
%% ignore all exceptions, we are shutting down
|
|
|
|
|
catch amqp_connection:close(Conn),
|
2019-06-04 18:40:26 +08:00
|
|
|
{?CONNACK_SERVER, PState1};
|
|
|
|
|
{timeout, _} ->
|
|
|
|
|
rabbit_log_connection:error("MQTT cannot accept a connection: "
|
|
|
|
|
"client ID registration timed out"),
|
2019-07-29 21:59:19 +08:00
|
|
|
%% ignore all exceptions, we are shutting down
|
|
|
|
|
catch amqp_connection:close(Conn),
|
2019-06-04 18:40:26 +08:00
|
|
|
{?CONNACK_SERVER, PState1}
|
|
|
|
|
end;
|
2019-07-29 21:59:19 +08:00
|
|
|
ConnAck -> {ConnAck, PState1}
|
2012-07-16 21:57:31 +08:00
|
|
|
end
|
2012-07-04 00:35:18 +08:00
|
|
|
end
|
|
|
|
|
end,
|
2016-04-22 02:02:28 +08:00
|
|
|
{ReturnCode, SessionPresent} = case Return of
|
2019-07-29 21:59:19 +08:00
|
|
|
{?CONNACK_ACCEPT, Bool} -> {?CONNACK_ACCEPT, Bool};
|
|
|
|
|
Other -> {Other, false}
|
2019-02-08 03:33:42 +08:00
|
|
|
end,
|
|
|
|
|
SendFun(#mqtt_frame{fixed = #mqtt_frame_fixed{type = ?CONNACK},
|
|
|
|
|
variable = #mqtt_frame_connack{
|
|
|
|
|
session_present = SessionPresent,
|
|
|
|
|
return_code = ReturnCode}},
|
|
|
|
|
PState5),
|
2019-07-29 21:59:19 +08:00
|
|
|
case ReturnCode of
|
|
|
|
|
?CONNACK_ACCEPT -> {ok, PState5};
|
|
|
|
|
?CONNACK_CREDENTIALS -> {error, unauthenticated, PState5};
|
|
|
|
|
?CONNACK_AUTH -> {error, unauthorized, PState5};
|
|
|
|
|
?CONNACK_SERVER -> {error, unavailable, PState5};
|
|
|
|
|
?CONNACK_INVALID_ID -> {error, invalid_client_id, PState5};
|
|
|
|
|
?CONNACK_PROTO_VER -> {error, unsupported_protocol_version, PState5}
|
|
|
|
|
end;
|
2012-07-04 00:35:18 +08:00
|
|
|
|
2012-08-06 06:52:54 +08:00
|
|
|
process_request(?PUBACK,
|
2012-08-21 00:30:13 +08:00
|
|
|
#mqtt_frame{
|
2012-08-06 06:52:54 +08:00
|
|
|
variable = #mqtt_frame_publish{ message_id = MessageId }},
|
2012-08-21 00:30:13 +08:00
|
|
|
#proc_state{ channels = {Channel, _},
|
|
|
|
|
awaiting_ack = Awaiting } = PState) ->
|
2015-04-26 09:51:04 +08:00
|
|
|
%% tag can be missing because of bogus clients and QoS downgrades
|
|
|
|
|
case gb_trees:is_defined(MessageId, Awaiting) of
|
|
|
|
|
false ->
|
|
|
|
|
{ok, PState};
|
|
|
|
|
true ->
|
|
|
|
|
Tag = gb_trees:get(MessageId, Awaiting),
|
|
|
|
|
amqp_channel:cast(Channel, #'basic.ack'{ delivery_tag = Tag }),
|
2017-11-09 09:08:17 +08:00
|
|
|
{ok, PState#proc_state{ awaiting_ack = gb_trees:delete(MessageId, Awaiting) }}
|
2015-04-26 09:51:04 +08:00
|
|
|
end;
|
2012-08-06 06:52:54 +08:00
|
|
|
|
|
|
|
|
process_request(?PUBLISH,
|
2016-05-18 21:11:50 +08:00
|
|
|
Frame = #mqtt_frame{
|
|
|
|
|
fixed = Fixed = #mqtt_frame_fixed{ qos = ?QOS_2 }},
|
2016-01-09 01:25:26 +08:00
|
|
|
PState) ->
|
|
|
|
|
% Downgrade QOS_2 to QOS_1
|
2016-05-18 21:11:50 +08:00
|
|
|
process_request(?PUBLISH,
|
2016-01-09 01:25:26 +08:00
|
|
|
Frame#mqtt_frame{
|
|
|
|
|
fixed = Fixed#mqtt_frame_fixed{ qos = ?QOS_1 }},
|
|
|
|
|
PState);
|
2012-07-04 23:31:30 +08:00
|
|
|
process_request(?PUBLISH,
|
2012-08-21 00:30:13 +08:00
|
|
|
#mqtt_frame{
|
2012-08-06 06:52:54 +08:00
|
|
|
fixed = #mqtt_frame_fixed{ qos = Qos,
|
|
|
|
|
retain = Retain,
|
|
|
|
|
dup = Dup },
|
|
|
|
|
variable = #mqtt_frame_publish{ topic_name = Topic,
|
|
|
|
|
message_id = MessageId },
|
2015-04-18 08:55:34 +08:00
|
|
|
payload = Payload },
|
2019-09-04 23:07:33 +08:00
|
|
|
PState = #proc_state{retainer_pid = RPid,
|
|
|
|
|
amqp2mqtt_fun = Amqp2MqttFun}) ->
|
2017-01-16 16:54:16 +08:00
|
|
|
check_publish(Topic, fun() ->
|
2015-06-30 16:08:26 +08:00
|
|
|
Msg = #mqtt_msg{retain = Retain,
|
|
|
|
|
qos = Qos,
|
|
|
|
|
topic = Topic,
|
|
|
|
|
dup = Dup,
|
|
|
|
|
message_id = MessageId,
|
|
|
|
|
payload = Payload},
|
|
|
|
|
Result = amqp_pub(Msg, PState),
|
|
|
|
|
case Retain of
|
|
|
|
|
false -> ok;
|
2019-09-04 23:07:33 +08:00
|
|
|
true -> hand_off_to_retainer(RPid, Amqp2MqttFun, Topic, Msg)
|
2015-06-30 16:08:26 +08:00
|
|
|
end,
|
|
|
|
|
{ok, Result}
|
|
|
|
|
end, PState);
|
2012-07-04 23:31:30 +08:00
|
|
|
|
|
|
|
|
process_request(?SUBSCRIBE,
|
2012-08-21 00:30:13 +08:00
|
|
|
#mqtt_frame{
|
2015-04-21 19:26:46 +08:00
|
|
|
variable = #mqtt_frame_subscribe{
|
2017-11-09 09:08:17 +08:00
|
|
|
message_id = SubscribeMsgId,
|
2015-04-21 19:26:46 +08:00
|
|
|
topic_table = Topics},
|
|
|
|
|
payload = undefined},
|
|
|
|
|
#proc_state{channels = {Channel, _},
|
|
|
|
|
exchange = Exchange,
|
2016-01-07 23:19:01 +08:00
|
|
|
retainer_pid = RPid,
|
2017-11-09 09:08:17 +08:00
|
|
|
send_fun = SendFun,
|
2019-09-04 23:07:33 +08:00
|
|
|
message_id = StateMsgId,
|
|
|
|
|
mqtt2amqp_fun = Mqtt2AmqpFun} = PState0) ->
|
2019-02-05 07:49:36 +08:00
|
|
|
rabbit_log_connection:debug("Received a SUBSCRIBE for topic(s) ~p", [Topics]),
|
2021-10-26 22:29:41 +08:00
|
|
|
|
|
|
|
|
{QosResponse, PState1} =
|
|
|
|
|
lists:foldl(fun (#mqtt_topic{name = TopicName,
|
|
|
|
|
qos = Qos}, {QosList, PState}) ->
|
|
|
|
|
SupportedQos = supported_subs_qos(Qos),
|
|
|
|
|
{Queue, #proc_state{subscriptions = Subs} = PState1} =
|
|
|
|
|
ensure_queue(SupportedQos, PState),
|
|
|
|
|
RoutingKey = Mqtt2AmqpFun(TopicName),
|
|
|
|
|
Binding = #'queue.bind'{
|
|
|
|
|
queue = Queue,
|
|
|
|
|
exchange = Exchange,
|
|
|
|
|
routing_key = RoutingKey},
|
|
|
|
|
#'queue.bind_ok'{} = amqp_channel:call(Channel, Binding),
|
|
|
|
|
SupportedQosList = case maps:find(TopicName, Subs) of
|
|
|
|
|
{ok, L} -> [SupportedQos|L];
|
|
|
|
|
error -> [SupportedQos]
|
|
|
|
|
end,
|
|
|
|
|
{[SupportedQos | QosList],
|
|
|
|
|
PState1 #proc_state{
|
|
|
|
|
subscriptions =
|
|
|
|
|
maps:put(TopicName, SupportedQosList, Subs)}}
|
|
|
|
|
end, {[], PState0}, Topics),
|
|
|
|
|
SendFun(#mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK},
|
|
|
|
|
variable = #mqtt_frame_suback{
|
|
|
|
|
message_id = SubscribeMsgId,
|
|
|
|
|
qos_table = QosResponse}}, PState1),
|
|
|
|
|
%% we may need to send up to length(Topics) messages.
|
|
|
|
|
%% if QoS is > 0 then we need to generate a message id,
|
|
|
|
|
%% and increment the counter.
|
|
|
|
|
StartMsgId = safe_max_id(SubscribeMsgId, StateMsgId),
|
|
|
|
|
N = lists:foldl(fun (Topic, Acc) ->
|
|
|
|
|
case maybe_send_retained_message(RPid, Topic, Acc, PState1) of
|
|
|
|
|
{true, X} -> Acc + X;
|
|
|
|
|
false -> Acc
|
|
|
|
|
end
|
|
|
|
|
end, StartMsgId, Topics),
|
|
|
|
|
{ok, PState1#proc_state{message_id = N}};
|
2012-07-05 00:47:07 +08:00
|
|
|
|
|
|
|
|
process_request(?UNSUBSCRIBE,
|
2012-08-21 00:30:13 +08:00
|
|
|
#mqtt_frame{
|
|
|
|
|
variable = #mqtt_frame_subscribe{ message_id = MessageId,
|
|
|
|
|
topic_table = Topics },
|
|
|
|
|
payload = undefined }, #proc_state{ channels = {Channel, _},
|
|
|
|
|
exchange = Exchange,
|
|
|
|
|
client_id = ClientId,
|
2016-01-07 23:19:01 +08:00
|
|
|
subscriptions = Subs0,
|
2019-09-04 23:07:33 +08:00
|
|
|
send_fun = SendFun,
|
|
|
|
|
mqtt2amqp_fun = Mqtt2AmqpFun } = PState) ->
|
2019-02-05 07:49:36 +08:00
|
|
|
rabbit_log_connection:debug("Received an UNSUBSCRIBE for topic(s) ~p", [Topics]),
|
2012-08-06 06:52:54 +08:00
|
|
|
Queues = rabbit_mqtt_util:subcription_queue_name(ClientId),
|
|
|
|
|
Subs1 =
|
|
|
|
|
lists:foldl(
|
2012-08-21 00:30:13 +08:00
|
|
|
fun (#mqtt_topic{ name = TopicName }, Subs) ->
|
2017-04-24 20:45:37 +08:00
|
|
|
QosSubs = case maps:find(TopicName, Subs) of
|
2012-08-06 06:52:54 +08:00
|
|
|
{ok, Val} when is_list(Val) -> lists:usort(Val);
|
|
|
|
|
error -> []
|
|
|
|
|
end,
|
2019-09-04 23:07:33 +08:00
|
|
|
RoutingKey = Mqtt2AmqpFun(TopicName),
|
2012-08-06 06:52:54 +08:00
|
|
|
lists:foreach(
|
|
|
|
|
fun (QosSub) ->
|
|
|
|
|
Queue = element(QosSub + 1, Queues),
|
|
|
|
|
Binding = #'queue.unbind'{
|
|
|
|
|
queue = Queue,
|
|
|
|
|
exchange = Exchange,
|
2019-09-04 23:07:33 +08:00
|
|
|
routing_key = RoutingKey},
|
2012-08-06 06:52:54 +08:00
|
|
|
#'queue.unbind_ok'{} = amqp_channel:call(Channel, Binding)
|
|
|
|
|
end, QosSubs),
|
2017-04-24 20:45:37 +08:00
|
|
|
maps:remove(TopicName, Subs)
|
2012-08-06 06:52:54 +08:00
|
|
|
end, Subs0, Topics),
|
2016-01-07 23:19:01 +08:00
|
|
|
SendFun(#mqtt_frame{ fixed = #mqtt_frame_fixed { type = ?UNSUBACK },
|
|
|
|
|
variable = #mqtt_frame_suback{ message_id = MessageId }},
|
2012-08-21 00:30:13 +08:00
|
|
|
PState),
|
|
|
|
|
{ok, PState #proc_state{ subscriptions = Subs1 }};
|
2012-07-03 16:55:31 +08:00
|
|
|
|
2016-01-07 23:19:01 +08:00
|
|
|
process_request(?PINGREQ, #mqtt_frame{}, #proc_state{ send_fun = SendFun } = PState) ->
|
2019-02-05 07:49:36 +08:00
|
|
|
rabbit_log_connection:debug("Received a PINGREQ"),
|
2016-01-07 23:19:01 +08:00
|
|
|
SendFun(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PINGRESP }},
|
2012-08-21 00:30:13 +08:00
|
|
|
PState),
|
2019-02-05 07:49:36 +08:00
|
|
|
rabbit_log_connection:debug("Sent a PINGRESP"),
|
2012-08-21 00:30:13 +08:00
|
|
|
{ok, PState};
|
2012-07-04 23:31:30 +08:00
|
|
|
|
2012-08-21 00:30:13 +08:00
|
|
|
process_request(?DISCONNECT, #mqtt_frame{}, PState) ->
|
2019-02-05 07:49:36 +08:00
|
|
|
rabbit_log_connection:debug("Received a DISCONNECT"),
|
2012-08-21 00:30:13 +08:00
|
|
|
{stop, PState}.
|
2012-07-04 00:35:18 +08:00
|
|
|
|
2019-09-04 23:07:33 +08:00
|
|
|
hand_off_to_retainer(RetainerPid, Amqp2MqttFun, Topic0, #mqtt_msg{payload = <<"">>}) ->
|
|
|
|
|
Topic1 = Amqp2MqttFun(Topic0),
|
|
|
|
|
rabbit_mqtt_retainer:clear(RetainerPid, Topic1),
|
|
|
|
|
ok;
|
|
|
|
|
hand_off_to_retainer(RetainerPid, Amqp2MqttFun, Topic0, Msg) ->
|
|
|
|
|
Topic1 = Amqp2MqttFun(Topic0),
|
|
|
|
|
rabbit_mqtt_retainer:retain(RetainerPid, Topic1, Msg),
|
|
|
|
|
ok.
|
|
|
|
|
|
|
|
|
|
maybe_send_retained_message(RPid, #mqtt_topic{name = Topic0, qos = SubscribeQos}, MsgId,
|
|
|
|
|
#proc_state{ send_fun = SendFun,
|
|
|
|
|
amqp2mqtt_fun = Amqp2MqttFun } = PState) ->
|
|
|
|
|
Topic1 = Amqp2MqttFun(Topic0),
|
|
|
|
|
case rabbit_mqtt_retainer:fetch(RPid, Topic1) of
|
|
|
|
|
undefined -> false;
|
|
|
|
|
Msg ->
|
|
|
|
|
%% calculate effective QoS as the lower value of SUBSCRIBE frame QoS
|
|
|
|
|
%% and retained message QoS. The spec isn't super clear on this, we
|
|
|
|
|
%% do what Mosquitto does, per user feedback.
|
|
|
|
|
Qos = erlang:min(SubscribeQos, Msg#mqtt_msg.qos),
|
|
|
|
|
Id = case Qos of
|
|
|
|
|
?QOS_0 -> undefined;
|
|
|
|
|
?QOS_1 -> MsgId
|
|
|
|
|
end,
|
|
|
|
|
SendFun(#mqtt_frame{fixed = #mqtt_frame_fixed{
|
|
|
|
|
type = ?PUBLISH,
|
|
|
|
|
qos = Qos,
|
|
|
|
|
dup = false,
|
|
|
|
|
retain = Msg#mqtt_msg.retain
|
|
|
|
|
}, variable = #mqtt_frame_publish{
|
|
|
|
|
message_id = Id,
|
|
|
|
|
topic_name = Topic1
|
|
|
|
|
},
|
|
|
|
|
payload = Msg#mqtt_msg.payload}, PState),
|
|
|
|
|
case Qos of
|
|
|
|
|
?QOS_0 -> false;
|
|
|
|
|
?QOS_1 -> {true, 1}
|
|
|
|
|
end
|
|
|
|
|
end.
|
2015-04-21 19:26:46 +08:00
|
|
|
|
2012-08-06 06:52:54 +08:00
|
|
|
amqp_callback({#'basic.deliver'{ consumer_tag = ConsumerTag,
|
|
|
|
|
delivery_tag = DeliveryTag,
|
|
|
|
|
routing_key = RoutingKey },
|
|
|
|
|
#amqp_msg{ props = #'P_basic'{ headers = Headers },
|
2014-08-11 18:26:27 +08:00
|
|
|
payload = Payload },
|
|
|
|
|
DeliveryCtx} = Delivery,
|
2012-08-21 00:30:13 +08:00
|
|
|
#proc_state{ channels = {Channel, _},
|
|
|
|
|
awaiting_ack = Awaiting,
|
2016-01-07 23:19:01 +08:00
|
|
|
message_id = MsgId,
|
2019-09-04 23:07:33 +08:00
|
|
|
send_fun = SendFun,
|
|
|
|
|
amqp2mqtt_fun = Amqp2MqttFun } = PState) ->
|
2014-08-11 18:26:27 +08:00
|
|
|
amqp_channel:notify_received(DeliveryCtx),
|
2012-08-21 00:30:13 +08:00
|
|
|
case {delivery_dup(Delivery), delivery_qos(ConsumerTag, Headers, PState)} of
|
2012-08-17 22:05:14 +08:00
|
|
|
{true, {?QOS_0, ?QOS_1}} ->
|
2012-08-17 01:09:21 +08:00
|
|
|
amqp_channel:cast(
|
|
|
|
|
Channel, #'basic.ack'{ delivery_tag = DeliveryTag }),
|
2012-08-21 00:30:13 +08:00
|
|
|
{ok, PState};
|
2012-08-17 22:05:14 +08:00
|
|
|
{true, {?QOS_0, ?QOS_0}} ->
|
2012-08-21 00:30:13 +08:00
|
|
|
{ok, PState};
|
2012-08-06 06:52:54 +08:00
|
|
|
{Dup, {DeliveryQos, _SubQos} = Qos} ->
|
2019-09-04 23:07:33 +08:00
|
|
|
TopicName = Amqp2MqttFun(RoutingKey),
|
2016-01-07 23:19:01 +08:00
|
|
|
SendFun(
|
2012-08-06 06:52:54 +08:00
|
|
|
#mqtt_frame{ fixed = #mqtt_frame_fixed{
|
|
|
|
|
type = ?PUBLISH,
|
|
|
|
|
qos = DeliveryQos,
|
|
|
|
|
dup = Dup },
|
2012-08-21 00:30:13 +08:00
|
|
|
variable = #mqtt_frame_publish{
|
2012-08-06 06:52:54 +08:00
|
|
|
message_id =
|
|
|
|
|
case DeliveryQos of
|
|
|
|
|
?QOS_0 -> undefined;
|
|
|
|
|
?QOS_1 -> MsgId
|
|
|
|
|
end,
|
2019-09-04 23:07:33 +08:00
|
|
|
topic_name = TopicName },
|
2012-08-21 00:30:13 +08:00
|
|
|
payload = Payload}, PState),
|
2012-08-06 06:52:54 +08:00
|
|
|
case Qos of
|
|
|
|
|
{?QOS_0, ?QOS_0} ->
|
2012-08-21 00:30:13 +08:00
|
|
|
{ok, PState};
|
2012-08-06 06:52:54 +08:00
|
|
|
{?QOS_1, ?QOS_1} ->
|
2017-11-09 09:08:17 +08:00
|
|
|
Awaiting1 = gb_trees:insert(MsgId, DeliveryTag, Awaiting),
|
|
|
|
|
PState1 = PState#proc_state{ awaiting_ack = Awaiting1 },
|
|
|
|
|
PState2 = next_msg_id(PState1),
|
|
|
|
|
{ok, PState2};
|
2012-08-06 06:52:54 +08:00
|
|
|
{?QOS_0, ?QOS_1} ->
|
|
|
|
|
amqp_channel:cast(
|
2012-08-17 01:09:21 +08:00
|
|
|
Channel, #'basic.ack'{ delivery_tag = DeliveryTag }),
|
2012-08-21 00:30:13 +08:00
|
|
|
{ok, PState}
|
2012-08-06 06:52:54 +08:00
|
|
|
end
|
|
|
|
|
end;
|
|
|
|
|
|
|
|
|
|
amqp_callback(#'basic.ack'{ multiple = true, delivery_tag = Tag } = Ack,
|
2016-01-07 23:19:01 +08:00
|
|
|
PState = #proc_state{ unacked_pubs = UnackedPubs,
|
|
|
|
|
send_fun = SendFun }) ->
|
2013-12-04 22:59:22 +08:00
|
|
|
case gb_trees:size(UnackedPubs) > 0 andalso
|
|
|
|
|
gb_trees:take_smallest(UnackedPubs) of
|
2012-08-06 06:52:54 +08:00
|
|
|
{TagSmall, MsgId, UnackedPubs1} when TagSmall =< Tag ->
|
2016-01-07 23:19:01 +08:00
|
|
|
SendFun(
|
2012-08-06 06:52:54 +08:00
|
|
|
#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PUBACK },
|
|
|
|
|
variable = #mqtt_frame_publish{ message_id = MsgId }},
|
2012-08-21 00:30:13 +08:00
|
|
|
PState),
|
|
|
|
|
amqp_callback(Ack, PState #proc_state{ unacked_pubs = UnackedPubs1 });
|
2012-08-06 06:52:54 +08:00
|
|
|
_ ->
|
2012-08-21 00:30:13 +08:00
|
|
|
{ok, PState}
|
2012-08-06 06:52:54 +08:00
|
|
|
end;
|
|
|
|
|
|
|
|
|
|
amqp_callback(#'basic.ack'{ multiple = false, delivery_tag = Tag },
|
2016-01-07 23:19:01 +08:00
|
|
|
PState = #proc_state{ unacked_pubs = UnackedPubs,
|
|
|
|
|
send_fun = SendFun }) ->
|
|
|
|
|
SendFun(
|
2012-08-06 06:52:54 +08:00
|
|
|
#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PUBACK },
|
|
|
|
|
variable = #mqtt_frame_publish{
|
|
|
|
|
message_id = gb_trees:get(
|
2012-08-21 00:30:13 +08:00
|
|
|
Tag, UnackedPubs) }}, PState),
|
|
|
|
|
{ok, PState #proc_state{ unacked_pubs = gb_trees:delete(Tag, UnackedPubs) }}.
|
2012-08-06 06:52:54 +08:00
|
|
|
|
|
|
|
|
delivery_dup({#'basic.deliver'{ redelivered = Redelivered },
|
2014-08-11 18:26:27 +08:00
|
|
|
#amqp_msg{ props = #'P_basic'{ headers = Headers }},
|
|
|
|
|
_DeliveryCtx}) ->
|
2012-11-06 18:32:38 +08:00
|
|
|
case rabbit_mqtt_util:table_lookup(Headers, <<"x-mqtt-dup">>) of
|
2012-08-06 06:52:54 +08:00
|
|
|
undefined -> Redelivered;
|
|
|
|
|
{bool, Dup} -> Redelivered orelse Dup
|
|
|
|
|
end.
|
|
|
|
|
|
2017-11-09 09:08:17 +08:00
|
|
|
ensure_valid_mqtt_message_id(Id) when Id >= 16#ffff ->
|
|
|
|
|
1;
|
|
|
|
|
ensure_valid_mqtt_message_id(Id) ->
|
|
|
|
|
Id.
|
|
|
|
|
|
|
|
|
|
safe_max_id(Id0, Id1) ->
|
|
|
|
|
ensure_valid_mqtt_message_id(erlang:max(Id0, Id1)).
|
|
|
|
|
|
|
|
|
|
next_msg_id(PState = #proc_state{ message_id = MsgId0 }) ->
|
|
|
|
|
MsgId1 = ensure_valid_mqtt_message_id(MsgId0 + 1),
|
|
|
|
|
PState#proc_state{ message_id = MsgId1 }.
|
2012-08-06 06:52:54 +08:00
|
|
|
|
|
|
|
|
%% decide at which qos level to deliver based on subscription
|
2012-11-06 04:30:45 +08:00
|
|
|
%% and the message publish qos level. non-MQTT publishes are
|
|
|
|
|
%% assumed to be qos 1, regardless of delivery_mode.
|
|
|
|
|
delivery_qos(Tag, _Headers, #proc_state{ consumer_tags = {Tag, _} }) ->
|
2012-08-06 06:52:54 +08:00
|
|
|
{?QOS_0, ?QOS_0};
|
2012-11-06 04:30:45 +08:00
|
|
|
delivery_qos(Tag, Headers, #proc_state{ consumer_tags = {_, Tag} }) ->
|
2012-11-06 18:32:38 +08:00
|
|
|
case rabbit_mqtt_util:table_lookup(Headers, <<"x-mqtt-publish-qos">>) of
|
2012-11-06 04:30:45 +08:00
|
|
|
{byte, Qos} -> {lists:min([Qos, ?QOS_1]), ?QOS_1};
|
|
|
|
|
undefined -> {?QOS_1, ?QOS_1}
|
|
|
|
|
end.
|
2012-08-06 06:52:54 +08:00
|
|
|
|
2016-04-22 02:02:28 +08:00
|
|
|
maybe_clean_sess(PState = #proc_state { clean_sess = false,
|
2018-08-21 18:51:05 +08:00
|
|
|
connection = Conn,
|
2021-10-26 22:29:41 +08:00
|
|
|
auth_state = #auth_state{vhost = VHost},
|
2016-04-22 02:02:28 +08:00
|
|
|
client_id = ClientId }) ->
|
2021-10-26 22:29:41 +08:00
|
|
|
SessionPresent = session_present(VHost, ClientId),
|
|
|
|
|
case SessionPresent of
|
|
|
|
|
false ->
|
|
|
|
|
%% ensure_queue/2 not only ensures that queue is created, but also starts consuming from it.
|
|
|
|
|
%% Let's avoid creating that queue until explicitly asked by a client.
|
|
|
|
|
%% Then publish-only clients, that connect with clean_sess=true due to some misconfiguration,
|
|
|
|
|
%% will consume less resources.
|
|
|
|
|
{{?CONNACK_ACCEPT, SessionPresent}, PState};
|
|
|
|
|
true ->
|
|
|
|
|
try ensure_queue(?QOS_1, PState) of
|
|
|
|
|
{_Queue, PState1} -> {{?CONNACK_ACCEPT, SessionPresent}, PState1}
|
|
|
|
|
catch
|
|
|
|
|
exit:({{shutdown, {server_initiated_close, 403, _}}, _}) ->
|
|
|
|
|
%% Connection is not yet propagated to #proc_state{}, let's close it here
|
|
|
|
|
catch amqp_connection:close(Conn),
|
|
|
|
|
rabbit_log_connection:error("MQTT cannot recover a session, user is missing permissions"),
|
|
|
|
|
{?CONNACK_SERVER, PState};
|
|
|
|
|
C:E:S ->
|
|
|
|
|
%% Connection is not yet propagated to
|
|
|
|
|
%% #proc_state{}, let's close it here.
|
|
|
|
|
%% This is an exceptional situation anyway, but
|
|
|
|
|
%% doing this will prevent second crash from
|
|
|
|
|
%% amqp client being logged.
|
|
|
|
|
catch amqp_connection:close(Conn),
|
|
|
|
|
erlang:raise(C, E, S)
|
|
|
|
|
end
|
|
|
|
|
end;
|
2013-07-27 00:43:25 +08:00
|
|
|
maybe_clean_sess(PState = #proc_state { clean_sess = true,
|
|
|
|
|
connection = Conn,
|
2021-10-26 22:29:41 +08:00
|
|
|
auth_state = #auth_state{vhost = VHost},
|
2013-07-27 00:43:25 +08:00
|
|
|
client_id = ClientId }) ->
|
2012-08-06 06:52:54 +08:00
|
|
|
{_, Queue} = rabbit_mqtt_util:subcription_queue_name(ClientId),
|
2012-07-16 21:57:31 +08:00
|
|
|
{ok, Channel} = amqp_connection:open_channel(Conn),
|
2021-10-26 22:29:41 +08:00
|
|
|
case session_present(VHost, ClientId) of
|
|
|
|
|
false ->
|
|
|
|
|
{{?CONNACK_ACCEPT, false}, PState};
|
|
|
|
|
true ->
|
|
|
|
|
try amqp_channel:call(Channel, #'queue.delete'{ queue = Queue }) of
|
|
|
|
|
#'queue.delete_ok'{} -> {{?CONNACK_ACCEPT, false}, PState}
|
|
|
|
|
catch
|
|
|
|
|
exit:({{shutdown, {server_initiated_close, 403, _}}, _}) ->
|
|
|
|
|
%% Connection is not yet propagated to #proc_state{}, let's close it here
|
|
|
|
|
catch amqp_connection:close(Conn),
|
|
|
|
|
rabbit_log_connection:error("MQTT cannot start a clean session: "
|
|
|
|
|
"`configure` permission missing for queue `~p`", [Queue]),
|
|
|
|
|
{?CONNACK_SERVER, PState}
|
|
|
|
|
after
|
|
|
|
|
catch amqp_channel:close(Channel)
|
|
|
|
|
end
|
|
|
|
|
end.
|
|
|
|
|
|
|
|
|
|
session_present(VHost, ClientId) ->
|
2016-04-22 02:02:28 +08:00
|
|
|
{_, QueueQ1} = rabbit_mqtt_util:subcription_queue_name(ClientId),
|
2021-10-26 22:29:41 +08:00
|
|
|
QueueName = rabbit_misc:r(VHost, queue, QueueQ1),
|
|
|
|
|
case rabbit_amqqueue:lookup(QueueName) of
|
|
|
|
|
{ok, _} -> true;
|
|
|
|
|
{error, not_found} -> false
|
2016-04-22 02:02:28 +08:00
|
|
|
end.
|
2012-07-16 21:57:31 +08:00
|
|
|
|
2012-08-21 00:30:13 +08:00
|
|
|
make_will_msg(#mqtt_frame_connect{ will_flag = false }) ->
|
2012-08-06 06:52:54 +08:00
|
|
|
undefined;
|
|
|
|
|
make_will_msg(#mqtt_frame_connect{ will_retain = Retain,
|
|
|
|
|
will_qos = Qos,
|
|
|
|
|
will_topic = Topic,
|
|
|
|
|
will_msg = Msg }) ->
|
|
|
|
|
#mqtt_msg{ retain = Retain,
|
|
|
|
|
qos = Qos,
|
|
|
|
|
topic = Topic,
|
|
|
|
|
dup = false,
|
|
|
|
|
payload = Msg }.
|
2012-07-04 00:35:18 +08:00
|
|
|
|
2019-02-08 03:33:42 +08:00
|
|
|
process_login(_UserBin, _PassBin, _ProtoVersion,
|
|
|
|
|
#proc_state{channels = {Channel, _},
|
2020-08-28 17:29:12 +08:00
|
|
|
peer_addr = Addr,
|
2019-02-08 03:33:42 +08:00
|
|
|
auth_state = #auth_state{username = Username,
|
|
|
|
|
vhost = VHost}}) when is_pid(Channel) ->
|
|
|
|
|
UsernameStr = rabbit_data_coercion:to_list(Username),
|
|
|
|
|
VHostStr = rabbit_data_coercion:to_list(VHost),
|
2020-09-22 23:57:47 +08:00
|
|
|
rabbit_core_metrics:auth_attempt_failed(list_to_binary(inet:ntoa(Addr)), Username, mqtt),
|
2019-02-08 06:58:27 +08:00
|
|
|
rabbit_log_connection:warning("MQTT detected duplicate connect/login attempt for user ~p, vhost ~p",
|
2019-02-08 03:33:42 +08:00
|
|
|
[UsernameStr, VHostStr]),
|
|
|
|
|
connack_dup_auth;
|
2014-03-19 20:05:42 +08:00
|
|
|
process_login(UserBin, PassBin, ProtoVersion,
|
2019-02-08 03:33:42 +08:00
|
|
|
#proc_state{channels = {undefined, undefined},
|
|
|
|
|
socket = Sock,
|
|
|
|
|
adapter_info = AdapterInfo,
|
|
|
|
|
ssl_login_name = SslLoginName,
|
|
|
|
|
peer_addr = Addr}) ->
|
2016-12-19 22:00:43 +08:00
|
|
|
{ok, {_, _, _, ToPort}} = rabbit_net:socket_ends(Sock, inbound),
|
2016-12-20 03:51:05 +08:00
|
|
|
{VHostPickedUsing, {VHost, UsernameBin}} = get_vhost(UserBin, SslLoginName, ToPort),
|
2021-07-12 22:50:25 +08:00
|
|
|
rabbit_log_connection:debug(
|
2021-03-11 21:45:32 +08:00
|
|
|
"MQTT vhost picked using ~s",
|
2016-12-20 05:42:06 +08:00
|
|
|
[human_readable_vhost_lookup_strategy(VHostPickedUsing)]),
|
2020-09-22 23:57:47 +08:00
|
|
|
RemoteAddress = list_to_binary(inet:ntoa(Addr)),
|
2016-12-06 23:09:50 +08:00
|
|
|
case rabbit_vhost:exists(VHost) of
|
|
|
|
|
true ->
|
|
|
|
|
case amqp_connection:start(#amqp_params_direct{
|
|
|
|
|
username = UsernameBin,
|
|
|
|
|
password = PassBin,
|
|
|
|
|
virtual_host = VHost,
|
|
|
|
|
adapter_info = set_proto_version(AdapterInfo, ProtoVersion)}) of
|
|
|
|
|
{ok, Connection} ->
|
2018-12-04 21:50:32 +08:00
|
|
|
case rabbit_access_control:check_user_loopback(UsernameBin, Addr) of
|
2016-12-06 23:09:50 +08:00
|
|
|
ok ->
|
2020-09-22 23:57:47 +08:00
|
|
|
rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, UsernameBin,
|
|
|
|
|
mqtt),
|
2016-12-06 23:09:50 +08:00
|
|
|
[{internal_user, InternalUser}] = amqp_connection:info(
|
|
|
|
|
Connection, [internal_user]),
|
|
|
|
|
{?CONNACK_ACCEPT, Connection, VHost,
|
|
|
|
|
#auth_state{user = InternalUser,
|
|
|
|
|
username = UsernameBin,
|
|
|
|
|
vhost = VHost}};
|
|
|
|
|
not_allowed ->
|
2020-09-22 23:57:47 +08:00
|
|
|
rabbit_core_metrics:auth_attempt_failed(RemoteAddress, UsernameBin,
|
|
|
|
|
mqtt),
|
2016-12-06 23:09:50 +08:00
|
|
|
amqp_connection:close(Connection),
|
2017-08-07 21:43:00 +08:00
|
|
|
rabbit_log_connection:warning(
|
2021-03-11 21:45:32 +08:00
|
|
|
"MQTT login failed for user ~s: "
|
|
|
|
|
"this user's access is restricted to localhost",
|
2016-12-06 23:09:50 +08:00
|
|
|
[binary_to_list(UsernameBin)]),
|
|
|
|
|
?CONNACK_AUTH
|
|
|
|
|
end;
|
|
|
|
|
{error, {auth_failure, Explanation}} ->
|
2020-09-22 23:57:47 +08:00
|
|
|
rabbit_core_metrics:auth_attempt_failed(RemoteAddress, UsernameBin, mqtt),
|
2021-03-11 21:45:32 +08:00
|
|
|
rabbit_log_connection:error("MQTT login failed for user '~s', authentication failed: ~s",
|
2016-12-06 23:09:50 +08:00
|
|
|
[binary_to_list(UserBin), Explanation]),
|
|
|
|
|
?CONNACK_CREDENTIALS;
|
|
|
|
|
{error, access_refused} ->
|
2020-09-22 23:57:47 +08:00
|
|
|
rabbit_core_metrics:auth_attempt_failed(RemoteAddress, UsernameBin, mqtt),
|
2021-03-11 21:45:32 +08:00
|
|
|
rabbit_log_connection:warning("MQTT login failed for user '~s': "
|
|
|
|
|
"virtual host access not allowed",
|
2016-12-09 23:48:47 +08:00
|
|
|
[binary_to_list(UserBin)]),
|
|
|
|
|
?CONNACK_AUTH;
|
|
|
|
|
{error, not_allowed} ->
|
2020-09-22 23:57:47 +08:00
|
|
|
rabbit_core_metrics:auth_attempt_failed(RemoteAddress, UsernameBin, mqtt),
|
2016-12-09 23:48:47 +08:00
|
|
|
%% when vhost allowed for TLS connection
|
2021-03-11 21:45:32 +08:00
|
|
|
rabbit_log_connection:warning("MQTT login failed for user '~s': "
|
|
|
|
|
"virtual host access not allowed",
|
2016-12-06 23:09:50 +08:00
|
|
|
[binary_to_list(UserBin)]),
|
2016-05-20 21:04:10 +08:00
|
|
|
?CONNACK_AUTH
|
2014-02-19 01:33:33 +08:00
|
|
|
end;
|
2016-12-06 23:09:50 +08:00
|
|
|
false ->
|
2020-09-22 23:57:47 +08:00
|
|
|
rabbit_core_metrics:auth_attempt_failed(RemoteAddress, UsernameBin, mqtt),
|
2021-03-11 21:45:32 +08:00
|
|
|
rabbit_log_connection:error("MQTT login failed for user '~s': virtual host '~s' does not exist",
|
|
|
|
|
[UserBin, VHost]),
|
2016-12-06 23:09:50 +08:00
|
|
|
?CONNACK_CREDENTIALS
|
2014-02-19 01:33:00 +08:00
|
|
|
end.
|
2012-07-16 21:57:31 +08:00
|
|
|
|
2016-12-19 22:00:43 +08:00
|
|
|
get_vhost(UserBin, none, Port) ->
|
|
|
|
|
get_vhost_no_ssl(UserBin, Port);
|
|
|
|
|
get_vhost(UserBin, undefined, Port) ->
|
|
|
|
|
get_vhost_no_ssl(UserBin, Port);
|
|
|
|
|
get_vhost(UserBin, SslLogin, Port) ->
|
|
|
|
|
get_vhost_ssl(UserBin, SslLogin, Port).
|
|
|
|
|
|
|
|
|
|
get_vhost_no_ssl(UserBin, Port) ->
|
|
|
|
|
case vhost_in_username(UserBin) of
|
|
|
|
|
true ->
|
|
|
|
|
{vhost_in_username_or_default, get_vhost_username(UserBin)};
|
|
|
|
|
false ->
|
|
|
|
|
PortVirtualHostMapping = rabbit_runtime_parameters:value_global(
|
|
|
|
|
mqtt_port_to_vhost_mapping
|
2016-12-13 21:46:15 +08:00
|
|
|
),
|
2016-12-19 22:00:43 +08:00
|
|
|
case get_vhost_from_port_mapping(Port, PortVirtualHostMapping) of
|
2016-12-13 21:46:15 +08:00
|
|
|
undefined ->
|
2016-12-19 22:00:43 +08:00
|
|
|
{default_vhost, {rabbit_mqtt_util:env(vhost), UserBin}};
|
2016-12-09 23:48:47 +08:00
|
|
|
VHost ->
|
2016-12-19 22:00:43 +08:00
|
|
|
{port_to_vhost_mapping, {VHost, UserBin}}
|
|
|
|
|
end
|
|
|
|
|
end.
|
|
|
|
|
|
|
|
|
|
get_vhost_ssl(UserBin, SslLoginName, Port) ->
|
|
|
|
|
UserVirtualHostMapping = rabbit_runtime_parameters:value_global(
|
|
|
|
|
mqtt_default_vhosts
|
|
|
|
|
),
|
|
|
|
|
case get_vhost_from_user_mapping(SslLoginName, UserVirtualHostMapping) of
|
|
|
|
|
undefined ->
|
|
|
|
|
PortVirtualHostMapping = rabbit_runtime_parameters:value_global(
|
|
|
|
|
mqtt_port_to_vhost_mapping
|
|
|
|
|
),
|
|
|
|
|
case get_vhost_from_port_mapping(Port, PortVirtualHostMapping) of
|
|
|
|
|
undefined ->
|
|
|
|
|
{vhost_in_username_or_default, get_vhost_username(UserBin)};
|
|
|
|
|
VHostFromPortMapping ->
|
|
|
|
|
{port_to_vhost_mapping, {VHostFromPortMapping, UserBin}}
|
|
|
|
|
end;
|
|
|
|
|
VHostFromCertMapping ->
|
|
|
|
|
{cert_to_vhost_mapping, {VHostFromCertMapping, UserBin}}
|
|
|
|
|
end.
|
|
|
|
|
|
|
|
|
|
vhost_in_username(UserBin) ->
|
|
|
|
|
case application:get_env(?APP, ignore_colons_in_username) of
|
|
|
|
|
{ok, true} -> false;
|
|
|
|
|
_ ->
|
|
|
|
|
%% split at the last colon, disallowing colons in username
|
|
|
|
|
case re:split(UserBin, ":(?!.*?:)") of
|
|
|
|
|
[_, _] -> true;
|
|
|
|
|
[UserBin] -> false
|
2016-12-09 23:48:47 +08:00
|
|
|
end
|
|
|
|
|
end.
|
|
|
|
|
|
2013-11-18 20:00:47 +08:00
|
|
|
get_vhost_username(UserBin) ->
|
2015-11-19 01:38:35 +08:00
|
|
|
Default = {rabbit_mqtt_util:env(vhost), UserBin},
|
|
|
|
|
case application:get_env(?APP, ignore_colons_in_username) of
|
|
|
|
|
{ok, true} -> Default;
|
|
|
|
|
_ ->
|
|
|
|
|
%% split at the last colon, disallowing colons in username
|
|
|
|
|
case re:split(UserBin, ":(?!.*?:)") of
|
|
|
|
|
[Vhost, UserName] -> {Vhost, UserName};
|
|
|
|
|
[UserBin] -> Default
|
|
|
|
|
end
|
2013-11-18 20:00:47 +08:00
|
|
|
end.
|
2012-07-16 21:57:31 +08:00
|
|
|
|
2016-12-19 22:00:43 +08:00
|
|
|
get_vhost_from_user_mapping(_User, not_found) ->
|
|
|
|
|
undefined;
|
|
|
|
|
get_vhost_from_user_mapping(User, Mapping) ->
|
2018-01-03 03:29:24 +08:00
|
|
|
M = rabbit_data_coercion:to_proplist(Mapping),
|
|
|
|
|
case rabbit_misc:pget(User, M) of
|
2016-12-19 22:00:43 +08:00
|
|
|
undefined ->
|
2016-12-13 21:46:15 +08:00
|
|
|
undefined;
|
2016-12-19 22:00:43 +08:00
|
|
|
VHost ->
|
|
|
|
|
VHost
|
2016-12-13 21:46:15 +08:00
|
|
|
end.
|
|
|
|
|
|
2016-12-19 22:00:43 +08:00
|
|
|
get_vhost_from_port_mapping(_Port, not_found) ->
|
|
|
|
|
undefined;
|
|
|
|
|
get_vhost_from_port_mapping(Port, Mapping) ->
|
2018-01-03 03:29:24 +08:00
|
|
|
M = rabbit_data_coercion:to_proplist(Mapping),
|
|
|
|
|
Res = case rabbit_misc:pget(rabbit_data_coercion:to_binary(Port), M) of
|
2016-12-19 22:00:43 +08:00
|
|
|
undefined ->
|
|
|
|
|
undefined;
|
|
|
|
|
VHost ->
|
|
|
|
|
VHost
|
|
|
|
|
end,
|
|
|
|
|
Res.
|
2016-12-13 21:46:15 +08:00
|
|
|
|
2016-12-20 05:42:06 +08:00
|
|
|
human_readable_vhost_lookup_strategy(vhost_in_username_or_default) ->
|
|
|
|
|
"vhost in username or default";
|
|
|
|
|
human_readable_vhost_lookup_strategy(port_to_vhost_mapping) ->
|
|
|
|
|
"MQTT port to vhost mapping";
|
|
|
|
|
human_readable_vhost_lookup_strategy(cert_to_vhost_mapping) ->
|
|
|
|
|
"client certificate to vhost mapping";
|
|
|
|
|
human_readable_vhost_lookup_strategy(default_vhost) ->
|
|
|
|
|
"plugin configuration or default";
|
|
|
|
|
human_readable_vhost_lookup_strategy(Val) ->
|
|
|
|
|
atom_to_list(Val).
|
2016-12-13 21:46:15 +08:00
|
|
|
|
2014-11-21 22:54:33 +08:00
|
|
|
creds(User, Pass, SSLLoginName) ->
|
2014-11-28 23:08:42 +08:00
|
|
|
DefaultUser = rabbit_mqtt_util:env(default_user),
|
|
|
|
|
DefaultPass = rabbit_mqtt_util:env(default_pass),
|
|
|
|
|
{ok, Anon} = application:get_env(?APP, allow_anonymous),
|
|
|
|
|
{ok, TLSAuth} = application:get_env(?APP, ssl_cert_login),
|
2016-09-01 23:54:51 +08:00
|
|
|
HaveDefaultCreds = Anon =:= true andalso
|
|
|
|
|
is_binary(DefaultUser) andalso
|
|
|
|
|
is_binary(DefaultPass),
|
|
|
|
|
|
|
|
|
|
CredentialsProvided = User =/= undefined orelse
|
|
|
|
|
Pass =/= undefined,
|
|
|
|
|
|
|
|
|
|
CorrectCredentials = is_list(User) andalso
|
|
|
|
|
is_list(Pass),
|
|
|
|
|
|
|
|
|
|
SSLLoginProvided = TLSAuth =:= true andalso
|
|
|
|
|
SSLLoginName =/= none,
|
|
|
|
|
|
|
|
|
|
case {CredentialsProvided, CorrectCredentials, SSLLoginProvided, HaveDefaultCreds} of
|
2016-09-02 06:33:34 +08:00
|
|
|
%% Username and password take priority
|
2016-09-01 23:54:51 +08:00
|
|
|
{true, true, _, _} -> {list_to_binary(User),
|
|
|
|
|
list_to_binary(Pass)};
|
|
|
|
|
%% Either username or password is provided
|
2016-09-02 06:33:34 +08:00
|
|
|
{true, false, _, _} -> {invalid_creds, {User, Pass}};
|
2016-09-01 23:54:51 +08:00
|
|
|
%% rabbitmq_mqtt.ssl_cert_login is true. SSL user name provided.
|
2016-09-02 06:33:34 +08:00
|
|
|
%% Authenticating using username only.
|
2016-09-01 23:54:51 +08:00
|
|
|
{false, false, true, _} -> {SSLLoginName, none};
|
2016-09-02 06:33:34 +08:00
|
|
|
%% Anonymous connection uses default credentials
|
2016-09-01 23:54:51 +08:00
|
|
|
{false, false, false, true} -> {DefaultUser, DefaultPass};
|
|
|
|
|
_ -> nocreds
|
2012-07-16 21:57:31 +08:00
|
|
|
end.
|
2012-07-04 00:35:18 +08:00
|
|
|
|
2012-08-06 06:52:54 +08:00
|
|
|
supported_subs_qos(?QOS_0) -> ?QOS_0;
|
|
|
|
|
supported_subs_qos(?QOS_1) -> ?QOS_1;
|
|
|
|
|
supported_subs_qos(?QOS_2) -> ?QOS_1.
|
|
|
|
|
|
2014-01-30 23:33:59 +08:00
|
|
|
delivery_mode(?QOS_0) -> 1;
|
2019-11-06 00:54:20 +08:00
|
|
|
delivery_mode(?QOS_1) -> 2;
|
|
|
|
|
delivery_mode(?QOS_2) -> 2.
|
2014-01-30 23:33:59 +08:00
|
|
|
|
2012-08-06 06:52:54 +08:00
|
|
|
%% different qos subscriptions are received in different queues
|
|
|
|
|
%% with appropriate durability and timeout arguments
|
|
|
|
|
%% this will lead to duplicate messages for overlapping subscriptions
|
|
|
|
|
%% with different qos values - todo: prevent duplicates
|
2012-08-21 00:30:13 +08:00
|
|
|
ensure_queue(Qos, #proc_state{ channels = {Channel, _},
|
|
|
|
|
client_id = ClientId,
|
|
|
|
|
clean_sess = CleanSess,
|
|
|
|
|
consumer_tags = {TagQ0, TagQ1} = Tags} = PState) ->
|
2012-08-06 06:52:54 +08:00
|
|
|
{QueueQ0, QueueQ1} = rabbit_mqtt_util:subcription_queue_name(ClientId),
|
|
|
|
|
Qos1Args = case {rabbit_mqtt_util:env(subscription_ttl), CleanSess} of
|
2012-11-06 18:32:38 +08:00
|
|
|
{undefined, _} ->
|
|
|
|
|
[];
|
|
|
|
|
{Ms, false} when is_integer(Ms) ->
|
|
|
|
|
[{<<"x-expires">>, long, Ms}];
|
|
|
|
|
_ ->
|
|
|
|
|
[]
|
2012-08-06 06:52:54 +08:00
|
|
|
end,
|
|
|
|
|
QueueSetup =
|
|
|
|
|
case {TagQ0, TagQ1, Qos} of
|
|
|
|
|
{undefined, _, ?QOS_0} ->
|
|
|
|
|
{QueueQ0,
|
|
|
|
|
#'queue.declare'{ queue = QueueQ0,
|
|
|
|
|
durable = false,
|
|
|
|
|
auto_delete = true },
|
|
|
|
|
#'basic.consume'{ queue = QueueQ0,
|
|
|
|
|
no_ack = true }};
|
|
|
|
|
{_, undefined, ?QOS_1} ->
|
|
|
|
|
{QueueQ1,
|
|
|
|
|
#'queue.declare'{ queue = QueueQ1,
|
|
|
|
|
durable = true,
|
2015-10-22 01:18:33 +08:00
|
|
|
%% Clean session means a transient connection,
|
|
|
|
|
%% translating into auto-delete.
|
|
|
|
|
%%
|
|
|
|
|
%% see rabbitmq/rabbitmq-mqtt#37
|
|
|
|
|
auto_delete = CleanSess,
|
2012-08-06 06:52:54 +08:00
|
|
|
arguments = Qos1Args },
|
|
|
|
|
#'basic.consume'{ queue = QueueQ1,
|
|
|
|
|
no_ack = false }};
|
|
|
|
|
{_, _, ?QOS_0} ->
|
|
|
|
|
{exists, QueueQ0};
|
|
|
|
|
{_, _, ?QOS_1} ->
|
|
|
|
|
{exists, QueueQ1}
|
|
|
|
|
end,
|
|
|
|
|
case QueueSetup of
|
|
|
|
|
{Queue, Declare, Consume} ->
|
|
|
|
|
#'queue.declare_ok'{} = amqp_channel:call(Channel, Declare),
|
|
|
|
|
#'basic.consume_ok'{ consumer_tag = Tag } =
|
|
|
|
|
amqp_channel:call(Channel, Consume),
|
2012-08-21 00:30:13 +08:00
|
|
|
{Queue, PState #proc_state{ consumer_tags = setelement(Qos+1, Tags, Tag) }};
|
2012-08-06 06:52:54 +08:00
|
|
|
{exists, Q} ->
|
2012-08-21 00:30:13 +08:00
|
|
|
{Q, PState}
|
2012-08-06 06:52:54 +08:00
|
|
|
end.
|
|
|
|
|
|
2016-12-07 18:21:47 +08:00
|
|
|
send_will(PState = #proc_state{will_msg = undefined}) ->
|
|
|
|
|
PState;
|
|
|
|
|
|
2017-08-11 18:29:34 +08:00
|
|
|
send_will(PState = #proc_state{will_msg = WillMsg = #mqtt_msg{retain = Retain,
|
|
|
|
|
topic = Topic},
|
|
|
|
|
retainer_pid = RPid,
|
2019-09-04 23:07:33 +08:00
|
|
|
channels = {ChQos0, ChQos1},
|
|
|
|
|
amqp2mqtt_fun = Amqp2MqttFun}) ->
|
2017-02-21 23:13:07 +08:00
|
|
|
case check_topic_access(Topic, write, PState) of
|
|
|
|
|
ok ->
|
|
|
|
|
amqp_pub(WillMsg, PState),
|
|
|
|
|
case Retain of
|
|
|
|
|
false -> ok;
|
2019-09-04 23:07:33 +08:00
|
|
|
true ->
|
|
|
|
|
hand_off_to_retainer(RPid, Amqp2MqttFun, Topic, WillMsg)
|
2017-02-21 23:13:07 +08:00
|
|
|
end;
|
|
|
|
|
Error ->
|
|
|
|
|
rabbit_log:warning(
|
2021-03-11 21:45:32 +08:00
|
|
|
"Could not send last will: ~p",
|
2017-02-21 23:13:07 +08:00
|
|
|
[Error])
|
2016-12-07 18:21:47 +08:00
|
|
|
end,
|
2017-08-11 18:29:34 +08:00
|
|
|
case ChQos1 of
|
|
|
|
|
undefined -> ok;
|
|
|
|
|
_ -> amqp_channel:close(ChQos1)
|
|
|
|
|
end,
|
|
|
|
|
case ChQos0 of
|
|
|
|
|
undefined -> ok;
|
|
|
|
|
_ -> amqp_channel:close(ChQos0)
|
|
|
|
|
end,
|
|
|
|
|
PState #proc_state{ channels = {undefined, undefined} }.
|
2012-08-21 00:30:13 +08:00
|
|
|
|
2021-10-26 22:29:41 +08:00
|
|
|
%% TODO amqp_pub/2 is publishing messages asynchronously, using
|
|
|
|
|
%% amqp_channel:cast_flow/3
|
|
|
|
|
%%
|
|
|
|
|
%% It does access check using check_publish/3 before submitting, but
|
|
|
|
|
%% this is superfluous, as actual publishing will do the same
|
|
|
|
|
%% check. While check results cached, it's still some unnecessary
|
|
|
|
|
%% work.
|
|
|
|
|
%%
|
|
|
|
|
%% And the only reason to keep it that way is that it prevents useless
|
|
|
|
|
%% crash messages flooding logs, as there is no code to handle async
|
|
|
|
|
%% channel crash gracefully.
|
|
|
|
|
%%
|
|
|
|
|
%% It'd be better to rework the whole thing, removing performance
|
|
|
|
|
%% penalty and some 50 lines of duplicate code. Maybe unlinking from
|
|
|
|
|
%% channel, and adding it as a child of connection supervisor instead.
|
|
|
|
|
%% But exact details are not yet clear.
|
2012-08-21 00:30:13 +08:00
|
|
|
amqp_pub(undefined, PState) ->
|
|
|
|
|
PState;
|
2012-08-06 06:52:54 +08:00
|
|
|
|
|
|
|
|
%% set up a qos1 publishing channel if necessary
|
|
|
|
|
%% this channel will only be used for publishing, not consuming
|
|
|
|
|
amqp_pub(Msg = #mqtt_msg{ qos = ?QOS_1 },
|
2012-08-21 00:30:13 +08:00
|
|
|
PState = #proc_state{ channels = {ChQos0, undefined},
|
|
|
|
|
awaiting_seqno = undefined,
|
|
|
|
|
connection = Conn }) ->
|
2012-08-06 06:52:54 +08:00
|
|
|
{ok, Channel} = amqp_connection:open_channel(Conn),
|
|
|
|
|
#'confirm.select_ok'{} = amqp_channel:call(Channel, #'confirm.select'{}),
|
|
|
|
|
amqp_channel:register_confirm_handler(Channel, self()),
|
2012-08-21 00:30:13 +08:00
|
|
|
amqp_pub(Msg, PState #proc_state{ channels = {ChQos0, Channel},
|
|
|
|
|
awaiting_seqno = 1 });
|
2012-08-06 06:52:54 +08:00
|
|
|
|
2012-08-17 22:05:14 +08:00
|
|
|
amqp_pub(#mqtt_msg{ qos = Qos,
|
2012-08-06 06:52:54 +08:00
|
|
|
topic = Topic,
|
|
|
|
|
dup = Dup,
|
|
|
|
|
message_id = MessageId,
|
|
|
|
|
payload = Payload },
|
2012-08-21 00:30:13 +08:00
|
|
|
PState = #proc_state{ channels = {ChQos0, ChQos1},
|
|
|
|
|
exchange = Exchange,
|
|
|
|
|
unacked_pubs = UnackedPubs,
|
2019-09-04 23:07:33 +08:00
|
|
|
awaiting_seqno = SeqNo,
|
|
|
|
|
mqtt2amqp_fun = Mqtt2AmqpFun }) ->
|
|
|
|
|
RoutingKey = Mqtt2AmqpFun(Topic),
|
2012-08-06 06:52:54 +08:00
|
|
|
Method = #'basic.publish'{ exchange = Exchange,
|
2019-09-04 23:07:33 +08:00
|
|
|
routing_key = RoutingKey },
|
2012-11-06 18:32:38 +08:00
|
|
|
Headers = [{<<"x-mqtt-publish-qos">>, byte, Qos},
|
|
|
|
|
{<<"x-mqtt-dup">>, bool, Dup}],
|
2014-01-30 23:33:59 +08:00
|
|
|
Msg = #amqp_msg{ props = #'P_basic'{ headers = Headers,
|
|
|
|
|
delivery_mode = delivery_mode(Qos)},
|
2012-08-06 06:52:54 +08:00
|
|
|
payload = Payload },
|
|
|
|
|
{UnackedPubs1, Ch, SeqNo1} =
|
|
|
|
|
case Qos =:= ?QOS_1 andalso MessageId =/= undefined of
|
|
|
|
|
true -> {gb_trees:enter(SeqNo, MessageId, UnackedPubs), ChQos1,
|
|
|
|
|
SeqNo + 1};
|
|
|
|
|
false -> {UnackedPubs, ChQos0, SeqNo}
|
|
|
|
|
end,
|
|
|
|
|
amqp_channel:cast_flow(Ch, Method, Msg),
|
2012-08-21 00:30:13 +08:00
|
|
|
PState #proc_state{ unacked_pubs = UnackedPubs1,
|
|
|
|
|
awaiting_seqno = SeqNo1 }.
|
2012-08-06 06:52:54 +08:00
|
|
|
|
2016-02-02 22:31:28 +08:00
|
|
|
adapter_info(Sock, ProtoName) ->
|
|
|
|
|
amqp_connection:socket_adapter_info(Sock, {ProtoName, "N/A"}).
|
|
|
|
|
|
2016-03-04 18:34:53 +08:00
|
|
|
set_proto_version(AdapterInfo = #amqp_adapter_info{protocol = {Proto, _}}, Vsn) ->
|
|
|
|
|
AdapterInfo#amqp_adapter_info{protocol = {Proto,
|
2016-02-02 22:31:28 +08:00
|
|
|
human_readable_mqtt_version(Vsn)}}.
|
2014-08-25 17:43:11 +08:00
|
|
|
|
|
|
|
|
human_readable_mqtt_version(3) ->
|
|
|
|
|
"3.1.0";
|
|
|
|
|
human_readable_mqtt_version(4) ->
|
|
|
|
|
"3.1.1";
|
|
|
|
|
human_readable_mqtt_version(_) ->
|
|
|
|
|
"N/A".
|
2012-08-17 00:56:50 +08:00
|
|
|
|
2019-02-05 07:49:36 +08:00
|
|
|
serialise_and_send_to_client(Frame, #proc_state{ socket = Sock }) ->
|
|
|
|
|
try rabbit_net:port_command(Sock, rabbit_mqtt_frame:serialise(Frame)) of
|
|
|
|
|
Res ->
|
|
|
|
|
Res
|
|
|
|
|
catch _:Error ->
|
|
|
|
|
rabbit_log_connection:error("MQTT: a socket write failed, the socket might already be closed"),
|
|
|
|
|
rabbit_log_connection:debug("Failed to write to socket ~p, error: ~p, frame: ~p",
|
|
|
|
|
[Sock, Error, Frame])
|
|
|
|
|
end.
|
2012-08-21 00:30:13 +08:00
|
|
|
|
|
|
|
|
close_connection(PState = #proc_state{ connection = undefined }) ->
|
|
|
|
|
PState;
|
2013-09-16 18:36:52 +08:00
|
|
|
close_connection(PState = #proc_state{ connection = Connection,
|
|
|
|
|
client_id = ClientId }) ->
|
|
|
|
|
% todo: maybe clean session
|
|
|
|
|
case ClientId of
|
|
|
|
|
undefined -> ok;
|
2020-02-22 02:42:39 +08:00
|
|
|
_ ->
|
|
|
|
|
case rabbit_mqtt_collector:unregister(ClientId, self()) of
|
|
|
|
|
ok -> ok;
|
|
|
|
|
%% ignore as we are shutting down
|
|
|
|
|
{timeout, _} -> ok
|
|
|
|
|
end
|
2013-09-16 18:36:52 +08:00
|
|
|
end,
|
2019-07-29 21:59:19 +08:00
|
|
|
%% ignore noproc or other exceptions, we are shutting down
|
2012-08-21 00:30:13 +08:00
|
|
|
catch amqp_connection:close(Connection),
|
|
|
|
|
PState #proc_state{ channels = {undefined, undefined},
|
|
|
|
|
connection = undefined }.
|
2015-06-30 16:08:26 +08:00
|
|
|
|
2019-10-30 21:32:46 +08:00
|
|
|
handle_pre_hibernate() ->
|
|
|
|
|
erase(topic_permission_cache),
|
|
|
|
|
ok.
|
|
|
|
|
|
2020-02-11 01:05:43 +08:00
|
|
|
handle_ra_event({applied, [{Corr, ok}]},
|
|
|
|
|
PState = #proc_state{register_state = {pending, Corr}}) ->
|
|
|
|
|
%% success case - command was applied transition into registered state
|
|
|
|
|
PState#proc_state{register_state = registered};
|
|
|
|
|
handle_ra_event({not_leader, Leader, Corr},
|
|
|
|
|
PState = #proc_state{register_state = {pending, Corr},
|
|
|
|
|
client_id = ClientId}) ->
|
|
|
|
|
%% retry command against actual leader
|
|
|
|
|
{ok, NewCorr} = rabbit_mqtt_collector:register(Leader, ClientId, self()),
|
|
|
|
|
PState#proc_state{register_state = {pending, NewCorr}};
|
|
|
|
|
handle_ra_event(register_timeout,
|
|
|
|
|
PState = #proc_state{register_state = {pending, _Corr},
|
|
|
|
|
client_id = ClientId}) ->
|
|
|
|
|
{ok, NewCorr} = rabbit_mqtt_collector:register(ClientId, self()),
|
|
|
|
|
PState#proc_state{register_state = {pending, NewCorr}};
|
|
|
|
|
handle_ra_event(register_timeout, PState) ->
|
|
|
|
|
PState;
|
|
|
|
|
handle_ra_event(Evt, PState) ->
|
|
|
|
|
%% log these?
|
2021-03-11 21:45:32 +08:00
|
|
|
rabbit_log:debug("unhandled ra_event: ~w ", [Evt]),
|
2020-02-11 01:05:43 +08:00
|
|
|
PState.
|
|
|
|
|
|
2017-01-16 16:54:16 +08:00
|
|
|
check_publish(TopicName, Fn, PState) ->
|
2015-06-30 16:08:26 +08:00
|
|
|
case check_topic_access(TopicName, write, PState) of
|
2015-07-14 17:47:39 +08:00
|
|
|
ok -> Fn();
|
2019-07-29 21:59:19 +08:00
|
|
|
_ -> {error, unauthorized, PState}
|
2015-06-30 16:08:26 +08:00
|
|
|
end.
|
|
|
|
|
|
2017-01-23 19:10:23 +08:00
|
|
|
check_topic_access(TopicName, Access,
|
2016-12-29 23:34:19 +08:00
|
|
|
#proc_state{
|
2017-06-07 20:41:59 +08:00
|
|
|
auth_state = #auth_state{user = User = #user{username = Username},
|
2016-12-29 23:34:19 +08:00
|
|
|
vhost = VHost},
|
2017-06-07 20:41:59 +08:00
|
|
|
exchange = Exchange,
|
2019-09-04 23:07:33 +08:00
|
|
|
client_id = ClientId,
|
|
|
|
|
mqtt2amqp_fun = Mqtt2AmqpFun }) ->
|
2019-10-30 21:32:46 +08:00
|
|
|
Cache =
|
|
|
|
|
case get(topic_permission_cache) of
|
|
|
|
|
undefined -> [];
|
|
|
|
|
Other -> Other
|
|
|
|
|
end,
|
|
|
|
|
|
|
|
|
|
Key = {TopicName, Username, ClientId, VHost, Exchange, Access},
|
|
|
|
|
case lists:member(Key, Cache) of
|
|
|
|
|
true ->
|
|
|
|
|
ok;
|
|
|
|
|
false ->
|
|
|
|
|
Resource = #resource{virtual_host = VHost,
|
|
|
|
|
kind = topic,
|
|
|
|
|
name = Exchange},
|
|
|
|
|
|
|
|
|
|
RoutingKey = Mqtt2AmqpFun(TopicName),
|
|
|
|
|
Context = #{routing_key => RoutingKey,
|
|
|
|
|
variable_map => #{
|
|
|
|
|
<<"username">> => Username,
|
|
|
|
|
<<"vhost">> => VHost,
|
|
|
|
|
<<"client_id">> => rabbit_data_coercion:to_binary(ClientId)
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
try rabbit_access_control:check_topic_access(User, Resource, Access, Context) of
|
|
|
|
|
ok ->
|
2019-10-30 22:26:57 +08:00
|
|
|
CacheTail = lists:sublist(Cache, ?MAX_TOPIC_PERMISSION_CACHE_SIZE - 1),
|
|
|
|
|
put(topic_permission_cache, [Key | CacheTail]),
|
2019-10-30 21:32:46 +08:00
|
|
|
ok;
|
|
|
|
|
R ->
|
|
|
|
|
R
|
|
|
|
|
catch
|
|
|
|
|
_:{amqp_error, access_refused, Msg, _} ->
|
2021-03-11 21:45:32 +08:00
|
|
|
rabbit_log:error("operation resulted in an error (access_refused): ~p", [Msg]),
|
2019-10-30 21:32:46 +08:00
|
|
|
{error, access_refused};
|
|
|
|
|
_:Error ->
|
2021-03-11 21:45:32 +08:00
|
|
|
rabbit_log:error("~p", [Error]),
|
2019-10-30 21:32:46 +08:00
|
|
|
{error, access_refused}
|
|
|
|
|
end
|
|
|
|
|
end.
|
2016-12-05 22:58:19 +08:00
|
|
|
|
|
|
|
|
info(consumer_tags, #proc_state{consumer_tags = Val}) -> Val;
|
|
|
|
|
info(unacked_pubs, #proc_state{unacked_pubs = Val}) -> Val;
|
|
|
|
|
info(awaiting_ack, #proc_state{awaiting_ack = Val}) -> Val;
|
|
|
|
|
info(awaiting_seqno, #proc_state{awaiting_seqno = Val}) -> Val;
|
|
|
|
|
info(message_id, #proc_state{message_id = Val}) -> Val;
|
2016-12-06 22:24:33 +08:00
|
|
|
info(client_id, #proc_state{client_id = Val}) ->
|
|
|
|
|
rabbit_data_coercion:to_binary(Val);
|
2016-12-05 22:58:19 +08:00
|
|
|
info(clean_sess, #proc_state{clean_sess = Val}) -> Val;
|
|
|
|
|
info(will_msg, #proc_state{will_msg = Val}) -> Val;
|
|
|
|
|
info(channels, #proc_state{channels = Val}) -> Val;
|
|
|
|
|
info(exchange, #proc_state{exchange = Val}) -> Val;
|
|
|
|
|
info(adapter_info, #proc_state{adapter_info = Val}) -> Val;
|
|
|
|
|
info(ssl_login_name, #proc_state{ssl_login_name = Val}) -> Val;
|
|
|
|
|
info(retainer_pid, #proc_state{retainer_pid = Val}) -> Val;
|
|
|
|
|
info(user, #proc_state{auth_state = #auth_state{username = Val}}) -> Val;
|
|
|
|
|
info(vhost, #proc_state{auth_state = #auth_state{vhost = Val}}) -> Val;
|
|
|
|
|
info(host, #proc_state{adapter_info = #amqp_adapter_info{host = Val}}) -> Val;
|
|
|
|
|
info(port, #proc_state{adapter_info = #amqp_adapter_info{port = Val}}) -> Val;
|
|
|
|
|
info(peer_host, #proc_state{adapter_info = #amqp_adapter_info{peer_host = Val}}) -> Val;
|
|
|
|
|
info(peer_port, #proc_state{adapter_info = #amqp_adapter_info{peer_port = Val}}) -> Val;
|
2016-12-06 22:24:33 +08:00
|
|
|
info(protocol, #proc_state{adapter_info = #amqp_adapter_info{protocol = Val}}) ->
|
|
|
|
|
case Val of
|
|
|
|
|
{Proto, Version} -> {Proto, rabbit_data_coercion:to_binary(Version)};
|
|
|
|
|
Other -> Other
|
|
|
|
|
end;
|
2016-12-05 22:58:19 +08:00
|
|
|
info(channels, PState) -> additional_info(channels, PState);
|
|
|
|
|
info(channel_max, PState) -> additional_info(channel_max, PState);
|
|
|
|
|
info(frame_max, PState) -> additional_info(frame_max, PState);
|
|
|
|
|
info(client_properties, PState) -> additional_info(client_properties, PState);
|
|
|
|
|
info(ssl, PState) -> additional_info(ssl, PState);
|
|
|
|
|
info(ssl_protocol, PState) -> additional_info(ssl_protocol, PState);
|
|
|
|
|
info(ssl_key_exchange, PState) -> additional_info(ssl_key_exchange, PState);
|
|
|
|
|
info(ssl_cipher, PState) -> additional_info(ssl_cipher, PState);
|
|
|
|
|
info(ssl_hash, PState) -> additional_info(ssl_hash, PState);
|
|
|
|
|
info(Other, _) -> throw({bad_argument, Other}).
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
additional_info(Key,
|
|
|
|
|
#proc_state{adapter_info =
|
|
|
|
|
#amqp_adapter_info{additional_info = AddInfo}}) ->
|
|
|
|
|
proplists:get_value(Key, AddInfo).
|