%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
%%
|
|
|
|
|
|
|
|
|
|
-module(rabbit_mqtt_processor).

%% Exported API.
-export([info/2, initial_state/2,
         process_frame/2, amqp_pub/2, amqp_callback/2, send_will/1,
         close_connection/1, handle_pre_hibernate/0,
         handle_ra_event/2, handle_down/2, handle_queue_event/2]).

%% for testing purposes
-export([get_vhost_username/1, get_vhost/3, get_vhost_from_user_mapping/2, maybe_quorum/3]).

-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("rabbit/include/amqqueue.hrl").
-include("rabbit_mqtt_frame.hrl").
-include("rabbit_mqtt.hrl").

-define(APP, rabbitmq_mqtt).
%% Binds Frame to an #mqtt_frame{} whose fixed header carries the given Type.
-define(FRAME_TYPE(Frame, Type),
        Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
-define(CONSUMER_TAG, mqtt_consumer).
|
2012-08-21 00:30:13 +08:00
|
|
|
|
2022-09-19 01:07:11 +08:00
|
|
|
%% Builds the #proc_state{} a connection starts with, before any CONNECT
%% frame has been processed.
%%
%% Socket         - the client socket; used to derive the SSL login name and
%%                  the peer address.
%% ConnectionName - stored as conn_name.
%%
%% Crashes with badmatch if the peer name or the topic translation funs
%% cannot be obtained.
initial_state(Socket, ConnectionName) ->
    SSLLoginName = ssl_login_name(Socket),
    {ok, {PeerAddr, _PeerPort}} = rabbit_net:peername(Socket),
    {ok, {mqtt2amqp_fun, Mqtt2Amqp}, {amqp2mqtt_fun, Amqp2Mqtt}} =
        rabbit_mqtt_util:get_topic_translation_funs(),
    EmptyTree = gb_trees:empty(),
    %% MQTT connections use exactly one channel. The frame max is not
    %% applicable and there is no way to know what client is used.
    #proc_state{socket         = Socket,
                conn_name      = ConnectionName,
                unacked_pubs   = EmptyTree,
                awaiting_ack   = EmptyTree,
                message_id     = 1,
                subscriptions  = #{},
                queue_states   = rabbit_queue_type:init(),
                consumer_tags  = {undefined, undefined},
                channels       = {undefined, undefined},
                ssl_login_name = SSLLoginName,
                send_fun       = fun serialise_and_send_to_client/2,
                peer_addr      = PeerAddr,
                mqtt2amqp_fun  = Mqtt2Amqp,
                amqp2mqtt_fun  = Amqp2Mqtt}.
|
2012-08-21 00:30:13 +08:00
|
|
|
|
2012-08-06 06:52:54 +08:00
|
|
|
%% Entry point for a parsed MQTT frame.
%%
%% Until authentication state exists, only a CONNECT frame is accepted;
%% anything else yields {error, connect_expected, PState}. Otherwise the
%% frame is handed to process_request/3. A successful {ok, PState} result is
%% extended with the state's `connection' field; every other result is
%% returned unchanged.
process_frame(#mqtt_frame{fixed = #mqtt_frame_fixed{type = FrameType}},
              PState = #proc_state{auth_state = undefined})
  when FrameType =/= ?CONNECT ->
    {error, connect_expected, PState};
process_frame(#mqtt_frame{fixed = #mqtt_frame_fixed{type = FrameType}} = Frame,
              PState0) ->
    Outcome = process_request(FrameType, Frame, PState0),
    case Outcome of
        {ok, NewPState} ->
            {ok, NewPState, NewPState#proc_state.connection};
        _ ->
            Outcome
    end.
|
2012-07-13 23:32:13 +08:00
|
|
|
|
%% Avoid crash when client disconnects before server handles MQTT CONNECT
%%
%% In case of a resource alarm, the server accepts incoming TCP
%% connections, but does not read from the socket.
%% When a client connects during a resource alarm, the MQTT CONNECT frame
%% is therefore not processed.
%% While the resource alarm is ongoing, the client might time out waiting
%% on a CONNACK MQTT packet.
%% When the resource alarm clears on the server, the MQTT CONNECT frame
%% gets processed.
%%
%% Prior to this commit, this results in the following crash on the server:
%%
%%   ** Reason for termination ==
%%   ** {{badmatch,{error,einval}},
%%       [{rabbit_mqtt_processor,process_login,4,
%%         [{file,"rabbit_mqtt_processor.erl"},{line,585}]},
%%        {rabbit_mqtt_processor,process_request,3,
%%         [{file,"rabbit_mqtt_processor.erl"},{line,143}]},
%%        {rabbit_mqtt_processor,process_frame,2,
%%         [{file,"rabbit_mqtt_processor.erl"},{line,69}]},
%%        {rabbit_mqtt_reader,process_received_bytes,2,
%%         [{file,"src/rabbit_mqtt_reader.erl"},{line,307}]},
%%
%% After this commit, the server just logs:
%%
%%   [error] <0.887.0> MQTT protocol error on connection 127.0.0.1:55725 -> 127.0.0.1:1883: peername_not_known
%%
%% In case the client already disconnected, we want the server to bail out
%% early, i.e. not authenticating and registering the client at all
%% since that can be expensive when many clients connected while the
%% resource alarm was ongoing.
%%
%% To detect whether the client disconnected, we rely on inet:peername/1
%% which will return an error when the peer is not connected anymore.
%%
%% Ideally we could use some better mechanism for detecting whether the
%% client disconnected.
%% The MQTT reader does receive a {tcp_closed, Socket} message once the
%% socket becomes active. However, we don't really want to read frames
%% ahead (i.e. ahead of the received CONNECT frame), one reason being that:
%% "Clients are allowed to send further Control Packets immediately
%% after sending a CONNECT Packet; Clients need not wait for a CONNACK Packet
%% to arrive from the Server."
%% Setting socket option `show_econnreset` does not help either because the client
%% closes the connection normally.
%%
%% Co-authored-by: Péter Gömöri @gomoripeti
2022-08-25 23:54:30 +08:00
|
|
|
%% Handles one MQTT frame according to its type (first argument).
%%
%% Returns {ok, PState}, {error, Reason, PState} or {stop, PState}.
%% Frame types without a clause here crash with function_clause.

%% CONNECT: bail out early if the peer is already gone, otherwise run the
%% connect pipeline.
process_request(?CONNECT, Frame, PState = #proc_state{socket = Socket}) ->
    %% Check whether peer closed the connection.
    %% For example, this can happen when connection was blocked because of resource
    %% alarm and client therefore disconnected due to client side CONNACK timeout.
    case rabbit_net:socket_ends(Socket, inbound) of
        {error, Reason} ->
            {error, {socket_ends, Reason}, PState};
        _ ->
            process_connect(Frame, PState)
    end;

%% PUBACK: the client acknowledged a QoS 1 delivery; ack the corresponding
%% delivery tag on the channel and forget the message id.
process_request(?PUBACK,
                #mqtt_frame{
                   variable = #mqtt_frame_publish{message_id = MessageId}},
                #proc_state{channels     = {Channel, _},
                            awaiting_ack = Awaiting} = PState) ->
    %% tag can be missing because of bogus clients and QoS downgrades
    case gb_trees:lookup(MessageId, Awaiting) of
        none ->
            {ok, PState};
        {value, Tag} ->
            amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),
            {ok, PState#proc_state{awaiting_ack = gb_trees:delete(MessageId, Awaiting)}}
    end;

%% PUBLISH at QoS 2 is not supported: downgrade to QoS 1 and re-dispatch.
process_request(?PUBLISH,
                Frame = #mqtt_frame{
                           fixed = Fixed = #mqtt_frame_fixed{qos = ?QOS_2}},
                PState) ->
    % Downgrade QOS_2 to QOS_1
    process_request(?PUBLISH,
                    Frame#mqtt_frame{
                      fixed = Fixed#mqtt_frame_fixed{qos = ?QOS_1}},
                    PState);

%% PUBLISH: publish through amqp_pub/2 and, for retained messages, hand the
%% message to the retainer. The work is wrapped in a fun passed to publish/3.
process_request(?PUBLISH,
                #mqtt_frame{
                   fixed = #mqtt_frame_fixed{qos    = Qos,
                                             retain = Retain,
                                             dup    = Dup},
                   variable = #mqtt_frame_publish{topic_name = Topic,
                                                  message_id = MessageId},
                   payload = Payload},
                PState = #proc_state{retainer_pid  = RPid,
                                     amqp2mqtt_fun = Amqp2MqttFun}) ->
    publish(
      Topic,
      fun() ->
              Msg = #mqtt_msg{retain     = Retain,
                              qos        = Qos,
                              topic      = Topic,
                              dup        = Dup,
                              message_id = MessageId,
                              payload    = Payload},
              Result = amqp_pub(Msg, PState),
              case Retain of
                  false -> ok;
                  true  -> hand_off_to_retainer(RPid, Amqp2MqttFun, Topic, Msg)
              end,
              {ok, Result}
      end, PState);

%% SUBSCRIBE: ensure a queue per requested QoS, bind it to every topic, send
%% the SUBACK, and then deliver retained messages for the subscribed topics.
process_request(?SUBSCRIBE,
                #mqtt_frame{
                   variable = #mqtt_frame_subscribe{
                                 message_id  = SubscribeMsgId,
                                 topic_table = Topics},
                   payload = undefined},
                #proc_state{retainer_pid = RPid,
                            send_fun     = SendFun,
                            message_id   = StateMsgId} = PState0) ->
    rabbit_log_connection:debug("Received a SUBSCRIBE for topic(s) ~tp", [Topics]),

    %% NOTE(review): QosResponse is built by prepending, so it ends up in the
    %% reverse order of Topics. Presumably the frame parser hands Topics over
    %% reversed so that the SUBACK order matches the SUBSCRIBE order -- confirm
    %% against the parser before changing this.
    {QosResponse, PState} =
        lists:foldl(fun(#mqtt_topic{name = TopicName,
                                    qos  = Qos}, {QosList, S0}) ->
                            SupportedQos = supported_subs_qos(Qos),
                            case ensure_queue(SupportedQos, S0) of
                                {ok, QueueName, #proc_state{subscriptions = Subs} = S1} ->
                                    case bind(QueueName, TopicName, S1) of
                                        {ok, _Output, S2} ->
                                            SupportedQosList = case maps:find(TopicName, Subs) of
                                                                   {ok, L} -> [SupportedQos | L];
                                                                   error   -> [SupportedQos]
                                                               end,
                                            {[SupportedQos | QosList],
                                             S2#proc_state{subscriptions = maps:put(TopicName, SupportedQosList, Subs)}};
                                        {error, Reason, S2} ->
                                            rabbit_log:error("Failed to bind ~s with topic ~s: ~p",
                                                             [rabbit_misc:rs(QueueName), TopicName, Reason]),
                                            {[?SUBACK_FAILURE | QosList], S2}
                                    end;
                                {error, _Reason} ->
                                    {[?SUBACK_FAILURE | QosList], S0}
                            end
                    end, {[], PState0}, Topics),

    SendFun(#mqtt_frame{fixed    = #mqtt_frame_fixed{type = ?SUBACK},
                        variable = #mqtt_frame_suback{
                                      message_id = SubscribeMsgId,
                                      qos_table  = QosResponse}}, PState),
    case lists:member(?SUBACK_FAILURE, QosResponse) of
        false ->
            %% we may need to send up to length(Topics) messages.
            %% if QoS is > 0 then we need to generate a message id,
            %% and increment the counter.
            StartMsgId = safe_max_id(SubscribeMsgId, StateMsgId),
            N = lists:foldl(fun (Topic, Acc) ->
                                    case maybe_send_retained_message(RPid, Topic, Acc, PState) of
                                        %% Wrap around like next_msg_id/1 does, so the
                                        %% counter never leaves the 16-bit id range.
                                        {true, X} -> ensure_valid_mqtt_message_id(Acc + X);
                                        false     -> Acc
                                    end
                            end, StartMsgId, Topics),
            {ok, PState#proc_state{message_id = N}};
        true ->
            {error, subscribe_error, PState}
    end;

%% UNSUBSCRIBE: unbind every queue the topic was bound to, drop the topic
%% from the subscription map and send the UNSUBACK.
process_request(?UNSUBSCRIBE,
                #mqtt_frame{
                   variable = #mqtt_frame_subscribe{message_id  = MessageId,
                                                    topic_table = Topics},
                   payload = undefined},
                #proc_state{channels      = {Channel, _},
                            exchange      = Exchange,
                            client_id     = ClientId,
                            subscriptions = Subs0,
                            send_fun      = SendFun,
                            mqtt2amqp_fun = Mqtt2AmqpFun} = PState) ->
    rabbit_log_connection:debug("Received an UNSUBSCRIBE for topic(s) ~tp", [Topics]),
    Queues = rabbit_mqtt_util:subcription_queue_name(ClientId),
    Subs1 =
        lists:foldl(
          fun (#mqtt_topic{name = TopicName}, Subs) ->
                  QosSubs = case maps:find(TopicName, Subs) of
                                {ok, Val} when is_list(Val) -> lists:usort(Val);
                                error                       -> []
                            end,
                  RoutingKey = Mqtt2AmqpFun(TopicName),
                  lists:foreach(
                    fun (QosSub) ->
                            %% Queues is a tuple indexed by QoS level (QoS + 1).
                            Queue = element(QosSub + 1, Queues),
                            Binding = #'queue.unbind'{
                                         queue       = Queue,
                                         exchange    = Exchange,
                                         routing_key = RoutingKey},
                            #'queue.unbind_ok'{} = amqp_channel:call(Channel, Binding)
                    end, QosSubs),
                  maps:remove(TopicName, Subs)
          end, Subs0, Topics),
    SendFun(#mqtt_frame{fixed    = #mqtt_frame_fixed{type = ?UNSUBACK},
                        variable = #mqtt_frame_suback{message_id = MessageId}},
            PState),
    {ok, PState#proc_state{subscriptions = Subs1}};

%% PINGREQ: answer with PINGRESP.
process_request(?PINGREQ, #mqtt_frame{}, #proc_state{send_fun = SendFun} = PState) ->
    rabbit_log_connection:debug("Received a PINGREQ"),
    SendFun(#mqtt_frame{fixed = #mqtt_frame_fixed{type = ?PINGRESP}},
            PState),
    rabbit_log_connection:debug("Sent a PINGRESP"),
    {ok, PState};

%% DISCONNECT: ask the caller to stop.
process_request(?DISCONNECT, #mqtt_frame{}, PState) ->
    rabbit_log_connection:debug("Received a DISCONNECT"),
    {stop, PState}.
|
2012-07-04 00:35:18 +08:00
|
|
|
|
2022-09-01 22:02:08 +08:00
|
|
|
%% Runs the CONNECT pipeline (protocol check, client id check, credentials,
%% login, client registration, connection-created notification), sends the
%% CONNACK built from the pipeline outcome, and maps the CONNACK return code
%% to this module's result tuple via return_connack/2.
process_connect(#mqtt_frame{
                   variable = #mqtt_frame_connect{
                                 username   = Username,
                                 proto_ver  = ProtoVersion,
                                 clean_sess = CleanSess,
                                 client_id  = ClientId,
                                 keep_alive = Keepalive} = FrameConnect},
                #proc_state{send_fun = SendFun} = PState0) ->
    rabbit_log_connection:debug("Received a CONNECT, client ID: ~p, username: ~p, "
                                "clean session: ~p, protocol version: ~p, keepalive: ~p",
                                [ClientId, Username, CleanSess, ProtoVersion, Keepalive]),
    Steps = [fun check_protocol_version/1,
             fun check_client_id/1,
             fun check_credentials/2,
             fun login/2,
             fun register_client/2,
             fun notify_connection_created/2],
    {ReturnCode, SessionPresent, PState} =
        case rabbit_misc:pipeline(Steps, FrameConnect, PState0) of
            {ok, Present, AcceptedState} ->
                {?CONNACK_ACCEPT, Present, AcceptedState};
            {error, FailureCode, RejectedState} ->
                %% A rejected connection never reports a present session.
                {FailureCode, false, RejectedState}
        end,
    Connack = #mqtt_frame_connack{session_present = SessionPresent,
                                  return_code     = ReturnCode},
    ResponseFrame = #mqtt_frame{fixed    = #mqtt_frame_fixed{type = ?CONNACK},
                                variable = Connack},
    SendFun(ResponseFrame, PState),
    return_connack(ReturnCode, PState).
|
|
|
|
|
|
|
|
|
|
%% Returns the client id to use: a generated one when the client sent an
%% empty id, otherwise the (list) id as given.
client_id([_ | _] = ClientId) ->
    ClientId;
client_id([]) ->
    rabbit_mqtt_util:gen_client_id().
|
|
|
|
|
|
|
|
|
|
%% Pipeline step: accepts the CONNECT only if its protocol version is one of
%% the keys of ?PROTOCOL_NAMES.
check_protocol_version(#mqtt_frame_connect{proto_ver = ProtoVersion}) ->
    KnownVersions = proplists:get_keys(?PROTOCOL_NAMES),
    case lists:member(ProtoVersion, KnownVersions) of
        false -> {error, ?CONNACK_UNACCEPTABLE_PROTO_VER};
        true  -> ok
    end.
|
|
|
|
|
|
|
|
|
|
%% Pipeline step: rejects an empty client id when the client asks for a
%% non-clean session; everything else is accepted.
check_client_id(#mqtt_frame_connect{clean_sess = CleanSess,
                                    client_id  = ClientId})
  when CleanSess =:= false, ClientId =:= [] ->
    {error, ?CONNACK_ID_REJECTED};
check_client_id(_Frame) ->
    ok.
|
|
|
|
|
|
|
|
|
|
%% Pipeline step: resolves the credentials to log in with via creds/3.
%%
%% On missing or incomplete credentials a failed auth attempt is recorded
%% for the peer IP, the failure is logged, and ?CONNACK_BAD_CREDENTIALS is
%% returned. Otherwise the pipeline accumulator becomes
%% {UserBin, PassBin, Frame} for the login step.
check_credentials(Frame = #mqtt_frame_connect{username = Username,
                                              password = Password},
                  PState = #proc_state{ssl_login_name = SslLoginName,
                                       peer_addr      = PeerAddr}) ->
    PeerIp = list_to_binary(inet:ntoa(PeerAddr)),
    Resolved = creds(Username, Password, SslLoginName),
    case Resolved of
        nocreds ->
            rabbit_core_metrics:auth_attempt_failed(PeerIp, <<>>, mqtt),
            rabbit_log_connection:error("MQTT login failed: no credentials provided"),
            {error, ?CONNACK_BAD_CREDENTIALS};
        {invalid_creds, {undefined, Pass}} when is_list(Pass) ->
            rabbit_core_metrics:auth_attempt_failed(PeerIp, <<>>, mqtt),
            rabbit_log_connection:error("MQTT login failed: no username is provided"),
            {error, ?CONNACK_BAD_CREDENTIALS};
        {invalid_creds, {User, undefined}} when is_list(User) ->
            rabbit_core_metrics:auth_attempt_failed(PeerIp, User, mqtt),
            rabbit_log_connection:error("MQTT login failed for user '~p': no password provided", [User]),
            {error, ?CONNACK_BAD_CREDENTIALS};
        {UserBin, PassBin} ->
            {ok, {UserBin, PassBin, Frame}, PState}
    end.
|
|
|
|
|
|
|
|
|
|
%% Pipeline step: logs the user in through process_login/4.
%%
%% On success the CONNECT frame becomes the accumulator again and the state
%% records the clean-session flag and the effective client id. Errors from
%% process_login/4 are passed through unchanged; connack_dup_auth falls back
%% to maybe_clean_sess/1 on the incoming state.
login({UserBin, PassBin,
       Frame = #mqtt_frame_connect{client_id  = RequestedClientId,
                                   clean_sess = CleanSess}},
      PState0) ->
    ClientId = client_id(RequestedClientId),
    case process_login(UserBin, PassBin, ClientId, PState0) of
        {ok, LoggedIn} ->
            {ok, Frame, LoggedIn#proc_state{clean_sess = CleanSess,
                                            client_id  = ClientId}};
        {error, _Reason, _PState} = Failure ->
            Failure;
        connack_dup_auth ->
            maybe_clean_sess(PState0)
    end.
|
|
|
|
|
|
|
|
|
|
%% Pipeline step: registers the client id with rabbit_mqtt_collector and, on
%% success, fills in the per-connection state (exchange, will message,
%% retainer pid, pending registration, protocol version, connection info)
%% before delegating to maybe_clean_sess/1.
%%
%% Registration errors and timeouts are logged and reported as
%% ?CONNACK_SERVER_UNAVAILABLE.
register_client(Frame = #mqtt_frame_connect{keep_alive = Keepalive,
                                            proto_ver  = ProtoVersion},
                PState0 = #proc_state{client_id  = ClientId,
                                      socket     = Socket,
                                      auth_state = #auth_state{vhost = VHost}}) ->
    case rabbit_mqtt_collector:register(ClientId, self()) of
        {ok, Corr} ->
            RetainerPid = rabbit_mqtt_retainer_sup:child_for_vhost(VHost),
            Prefetch = rabbit_mqtt_util:env(prefetch),
            rabbit_mqtt_reader:start_keepalive(self(), Keepalive),
            {ok, {PeerHost, PeerPort, Host, Port}} = rabbit_net:socket_ends(Socket, inbound),
            ExchangeName = rabbit_misc:r(VHost, exchange, rabbit_mqtt_util:env(exchange)),
            ConnInfo = #info{prefetch    = Prefetch,
                             peer_host   = PeerHost,
                             peer_port   = PeerPort,
                             host        = Host,
                             port        = Port,
                             proto_human = {'MQTT', human_readable_mqtt_version(ProtoVersion)}},
            WillMsg = make_will_msg(Frame),
            maybe_clean_sess(
              PState0#proc_state{exchange       = ExchangeName,
                                 will_msg       = WillMsg,
                                 retainer_pid   = RetainerPid,
                                 register_state = {pending, Corr},
                                 proto_ver      = ProtoVersion,
                                 info           = ConnInfo});
        {error, _} = Err ->
            %% e.g. this node was removed from the MQTT cluster members
            rabbit_log_connection:error("MQTT cannot accept a connection: "
                                        "client ID tracker is unavailable: ~p", [Err]),
            {error, ?CONNACK_SERVER_UNAVAILABLE};
        {timeout, _} ->
            rabbit_log_connection:error("MQTT cannot accept a connection: "
                                        "client ID registration timed out"),
            {error, ?CONNACK_SERVER_UNAVAILABLE}
    end.
|
|
|
|
|
|
2022-09-19 01:07:11 +08:00
|
|
|
%% Pipeline step: publishes the connection_created metric and event.
%%
%% The socket endpoints are taken from the #info{} record that
%% register_client/2 (the preceding pipeline step) filled in from
%% rabbit_net:socket_ends/2, instead of querying the socket a second time.
%% Besides saving a call, this avoids a badmatch crash when the peer
%% disconnects between the two steps.
%%
%% Returns {ok, PState} with conn_name reset to undefined.
notify_connection_created(
  _Frame,
  #proc_state{conn_name  = ConnName,
              info       = #info{proto_human = {ProtoName, ProtoVsn},
                                 host        = Host,
                                 port        = Port,
                                 peer_host   = PeerHost,
                                 peer_port   = PeerPort},
              auth_state = #auth_state{vhost    = VHost,
                                       username = Username}} = PState) ->
    ConnectedAt = os:system_time(millisecond),
    Infos = [{host, Host},
             {port, Port},
             {peer_host, PeerHost},
             {peer_port, PeerPort},
             {vhost, VHost},
             {node, node()},
             {user, Username},
             {name, ConnName},
             {connected_at, ConnectedAt},
             {pid, self()},
             {protocol, {ProtoName, binary_to_list(ProtoVsn)}},
             {type, network}
            ],
    rabbit_core_metrics:connection_created(self(), Infos),
    rabbit_event:notify(connection_created, Infos),
    {ok, PState#proc_state{
           %% We won't need conn_name anymore. Use less memory by setting to undefined.
           conn_name = undefined}}.
|
|
|
|
|
|
2022-09-06 21:56:01 +08:00
|
|
|
%% Maps the numeric protocol level from the CONNECT frame to the MQTT
%% version string used in connection info. Other levels crash with
%% function_clause.
human_readable_mqtt_version(4) -> <<"3.1.1">>;
human_readable_mqtt_version(3) -> <<"3.1.0">>.
|
|
|
|
|
|
2022-09-01 22:02:08 +08:00
|
|
|
%% Translates the CONNACK return code that was sent to the client into the
%% result tuple returned by process_connect/2: {ok, PState} on accept,
%% {error, Reason, PState} for every rejection code.
return_connack(?CONNACK_ACCEPT, PState) ->
    {ok, PState};
return_connack(?CONNACK_BAD_CREDENTIALS, PState) ->
    {error, unauthenticated, PState};
return_connack(?CONNACK_NOT_AUTHORIZED, PState) ->
    {error, unauthorized, PState};
return_connack(?CONNACK_SERVER_UNAVAILABLE, PState) ->
    {error, unavailable, PState};
return_connack(?CONNACK_ID_REJECTED, PState) ->
    {error, invalid_client_id, PState};
return_connack(?CONNACK_UNACCEPTABLE_PROTO_VER, PState) ->
    {error, unsupported_protocol_version, PState}.
|
|
|
|
|
|
|
|
|
|
%% Applies the clean-session flag of the connecting client.
%%
%% clean_sess = false: reports whether a session already exists and, if so,
%%   ensures the QoS 1 queue.
%% clean_sess = true: deletes the client's existing QoS 1 queue, if any,
%%   provided the user has configure access to it.
%%
%% Returns {ok, SessionPresent, PState} or {error, ConnackReturnCode}.
maybe_clean_sess(PState = #proc_state{clean_sess = false,
                                      auth_state = #auth_state{vhost = VHost},
                                      client_id  = ClientId}) ->
    case session_present(VHost, ClientId) of
        false ->
            %% ensure_queue/2 not only creates the queue but also starts
            %% consuming from it. Defer that until the client explicitly asks
            %% for it, so that publish-only clients consume fewer resources.
            {ok, false, PState};
        true ->
            case ensure_queue(?QOS_1, PState) of
                {ok, _QueueName, PState1} ->
                    {ok, true, PState1};
                {error, access_refused} ->
                    {error, ?CONNACK_NOT_AUTHORIZED};
                {error, _Reason} ->
                    %% Let's use most generic error return code.
                    {error, ?CONNACK_SERVER_UNAVAILABLE}
            end
    end;
maybe_clean_sess(PState = #proc_state{clean_sess = true,
                                      client_id  = ClientId,
                                      auth_state = #auth_state{user      = User,
                                                               username  = Username,
                                                               vhost     = VHost,
                                                               authz_ctx = AuthzCtx}}) ->
    {_, QueueName} = rabbit_mqtt_util:subcription_queue_name(ClientId),
    Queue = rabbit_misc:r(VHost, queue, QueueName),
    case rabbit_amqqueue:exists(Queue) of
        false ->
            {ok, false, PState};
        true ->
            %% configure access to queue required for queue.delete
            case check_resource_access(User, Queue, configure, AuthzCtx) of
                {error, access_refused} ->
                    {error, ?CONNACK_NOT_AUTHORIZED};
                ok ->
                    Delete = fun (Q) ->
                                     rabbit_queue_type:delete(Q, false, false, Username)
                             end,
                    OnMissing = fun ({absent, Q, Why}) when Why =:= crashed;
                                                            Why =:= stopped ->
                                        rabbit_classic_queue:delete_crashed(Q, Username);
                                    ({absent, _Q, _Why}) ->
                                        ok;
                                    (not_found) ->
                                        ok
                                end,
                    rabbit_amqqueue:with(Queue, Delete, OnMissing),
                    {ok, false, PState}
            end
    end.
|
|
|
|
|
|
2019-09-04 23:07:33 +08:00
|
|
|
%% Passes a retained PUBLISH to the retainer process: an empty payload
%% clears the retained message for the (translated) topic, anything else
%% replaces it. Always returns ok.
hand_off_to_retainer(RetainerPid, Amqp2MqttFun, Topic0, Msg) ->
    Topic1 = Amqp2MqttFun(Topic0),
    case Msg of
        #mqtt_msg{payload = <<"">>} ->
            rabbit_mqtt_retainer:clear(RetainerPid, Topic1);
        _ ->
            rabbit_mqtt_retainer:retain(RetainerPid, Topic1, Msg)
    end,
    ok.
|
|
|
|
|
|
|
|
|
|
%% Sends the retained message for a freshly subscribed topic, if there is one.
%%
%% Returns {true, 1} when a QoS 1 PUBLISH carrying MsgId was sent (the caller
%% must advance its message id), and false when nothing was retained or the
%% message went out at QoS 0.
maybe_send_retained_message(RPid, #mqtt_topic{name = Topic0, qos = SubscribeQos}, MsgId,
                            #proc_state{send_fun      = SendFun,
                                        amqp2mqtt_fun = Amqp2MqttFun} = PState) ->
    Topic1 = Amqp2MqttFun(Topic0),
    case rabbit_mqtt_retainer:fetch(RPid, Topic1) of
        undefined ->
            false;
        #mqtt_msg{} = Retained ->
            send_retained(Retained, SubscribeQos, MsgId, Topic1, SendFun, PState);
        Retained ->
            send_retained(Retained, SubscribeQos, MsgId, Topic1, SendFun, PState)
    end.

%% Sends one retained message at the effective QoS and reports whether a
%% message id was consumed.
send_retained(Retained, SubscribeQos, MsgId, Topic, SendFun, PState) ->
    %% calculate effective QoS as the lower value of SUBSCRIBE frame QoS
    %% and retained message QoS. The spec isn't super clear on this, we
    %% do what Mosquitto does, per user feedback.
    Qos = erlang:min(SubscribeQos, Retained#mqtt_msg.qos),
    {Id, Result} = case Qos of
                       ?QOS_0 -> {undefined, false};
                       ?QOS_1 -> {MsgId, {true, 1}}
                   end,
    Fixed = #mqtt_frame_fixed{type   = ?PUBLISH,
                              qos    = Qos,
                              dup    = false,
                              retain = Retained#mqtt_msg.retain},
    Variable = #mqtt_frame_publish{message_id = Id,
                                   topic_name = Topic},
    SendFun(#mqtt_frame{fixed    = Fixed,
                        variable = Variable,
                        payload  = Retained#mqtt_msg.payload}, PState),
    Result.
|
2015-04-21 19:26:46 +08:00
|
|
|
|
2023-01-20 00:23:23 +08:00
|
|
|
%% Handles events coming back from the AMQP side.
%%
%% {#'basic.deliver'{}, #amqp_msg{}, DeliveryCtx}: forwards the message to
%%   the MQTT client as a PUBLISH. The pair returned by delivery_qos/3 decides
%%   whether the delivery is dropped (duplicate at QoS 0), acked right away,
%%   or tracked in awaiting_ack until the client's PUBACK arrives.
%% #'basic.ack'{}: publisher confirm; sends PUBACK to the client for the
%%   confirmed publish(es) recorded in unacked_pubs.
-spec amqp_callback(#'basic.ack'{} | {#'basic.deliver'{}, #amqp_msg{}, {pid(), pid(), pid()}}, #proc_state{}) -> {'ok', #proc_state{}} | {'error', term(), term()}.
amqp_callback({#'basic.deliver'{consumer_tag = ConsumerTag,
                                delivery_tag = DeliveryTag,
                                routing_key  = RoutingKey},
               #amqp_msg{props   = #'P_basic'{headers = Headers},
                         payload = Payload},
               DeliveryCtx} = Delivery,
              #proc_state{channels      = {Channel, _},
                          awaiting_ack  = Awaiting,
                          message_id    = MsgId,
                          send_fun      = SendFun,
                          amqp2mqtt_fun = Amqp2MqttFun} = PState) ->
    notify_received(DeliveryCtx),
    IsDup = delivery_dup(Delivery),
    QosPair = delivery_qos(ConsumerTag, Headers, PState),
    case {IsDup, QosPair} of
        {true, {?QOS_0, ?QOS_1}} ->
            %% Duplicate of a QoS 0 message on the QoS 1 subscription:
            %% ack it on the channel without forwarding.
            amqp_channel:cast(
              Channel, #'basic.ack'{delivery_tag = DeliveryTag}),
            {ok, PState};
        {true, {?QOS_0, ?QOS_0}} ->
            {ok, PState};
        {Dup, {DeliveryQos, _SubQos} = Qos} ->
            TopicName = Amqp2MqttFun(RoutingKey),
            %% Only QoS 1 deliveries carry a message id.
            PublishId = case DeliveryQos of
                            ?QOS_0 -> undefined;
                            ?QOS_1 -> MsgId
                        end,
            Fixed = #mqtt_frame_fixed{type = ?PUBLISH,
                                      qos  = DeliveryQos,
                                      dup  = Dup},
            Variable = #mqtt_frame_publish{message_id = PublishId,
                                           topic_name = TopicName},
            SendFun(#mqtt_frame{fixed    = Fixed,
                                variable = Variable,
                                payload  = Payload}, PState),
            case Qos of
                {?QOS_0, ?QOS_0} ->
                    {ok, PState};
                {?QOS_1, ?QOS_1} ->
                    %% Remember the delivery tag until the client sends PUBACK.
                    Tracked = PState#proc_state{
                                awaiting_ack = gb_trees:insert(MsgId, DeliveryTag, Awaiting)},
                    {ok, next_msg_id(Tracked)};
                {?QOS_0, ?QOS_1} ->
                    amqp_channel:cast(
                      Channel, #'basic.ack'{delivery_tag = DeliveryTag}),
                    {ok, PState}
            end
    end;

amqp_callback(#'basic.ack'{multiple = true, delivery_tag = Tag} = Ack,
              PState = #proc_state{unacked_pubs = UnackedPubs,
                                   send_fun     = SendFun}) ->
    %% Confirm every outstanding publish whose tag is =< Tag, smallest first.
    Smallest = gb_trees:size(UnackedPubs) > 0 andalso
        gb_trees:take_smallest(UnackedPubs),
    case Smallest of
        {TagSmall, MsgId, Remaining} when TagSmall =< Tag ->
            SendFun(
              #mqtt_frame{fixed    = #mqtt_frame_fixed{type = ?PUBACK},
                          variable = #mqtt_frame_publish{message_id = MsgId}},
              PState),
            amqp_callback(Ack, PState#proc_state{unacked_pubs = Remaining});
        _ ->
            {ok, PState}
    end;

amqp_callback(#'basic.ack'{multiple = false, delivery_tag = Tag},
              PState = #proc_state{unacked_pubs = UnackedPubs,
                                   send_fun     = SendFun}) ->
    %% gb_trees:get/2 crashes if Tag is not an outstanding publish.
    MsgId = gb_trees:get(Tag, UnackedPubs),
    SendFun(
      #mqtt_frame{fixed    = #mqtt_frame_fixed{type = ?PUBACK},
                  variable = #mqtt_frame_publish{message_id = MsgId}},
      PState),
    {ok, PState#proc_state{unacked_pubs = gb_trees:delete(Tag, UnackedPubs)}}.
|
2012-08-06 06:52:54 +08:00
|
|
|
|
|
|
|
|
%% Computes the MQTT DUP flag for a delivery tuple
%% {#'basic.deliver'{}, #amqp_msg{}, DeliveryCtx}.
%% The result is the AMQP `redelivered` flag, OR-ed with the boolean
%% <<"x-mqtt-dup">> header when that header is present.
delivery_dup({#'basic.deliver'{redelivered = WasRedelivered},
              #amqp_msg{props = #'P_basic'{headers = Hdrs}},
              _DeliveryCtx}) ->
    case rabbit_mqtt_util:table_lookup(Hdrs, <<"x-mqtt-dup">>) of
        {bool, HeaderDup} -> WasRedelivered orelse HeaderDup;
        undefined         -> WasRedelivered
    end.
|
|
|
|
|
|
2017-11-09 09:08:17 +08:00
|
|
|
%% Wraps a message id back to 1 once it reaches 16#ffff; any id strictly
%% below 16#ffff is returned unchanged.
ensure_valid_mqtt_message_id(Id) ->
    case Id < 16#ffff of
        true  -> Id;
        false -> 1
    end.
|
|
|
|
|
|
|
|
|
|
%% Returns the larger of the two ids, wrapped by
%% ensure_valid_mqtt_message_id/1.
safe_max_id(Id0, Id1) ->
    Highest = erlang:max(Id0, Id1),
    ensure_valid_mqtt_message_id(Highest).
|
|
|
|
|
|
|
|
|
|
%% Advances the message_id stored in the processor state by one, wrapping
%% through ensure_valid_mqtt_message_id/1.
next_msg_id(#proc_state{message_id = Current} = PState) ->
    PState#proc_state{message_id = ensure_valid_mqtt_message_id(Current + 1)}.
|
2012-08-06 06:52:54 +08:00
|
|
|
|
|
|
|
|
%% Decides at which QoS level to deliver, based on the subscription and on
%% the QoS level the message was published with. Returns a pair whose
%% second element is the QoS of the matching consumer tag.
%%
%% - A delivery on the first consumer tag is always {?QOS_0, ?QOS_0}.
%% - A delivery on the second consumer tag uses the <<"x-mqtt-publish-qos">>
%%   header, capped at ?QOS_1. Messages without that header (non-MQTT
%%   publishes) are assumed to be QoS 1, regardless of delivery_mode.
delivery_qos(Tag, _Headers, #proc_state{consumer_tags = {Tag, _}}) ->
    {?QOS_0, ?QOS_0};
delivery_qos(Tag, Headers, #proc_state{consumer_tags = {_, Tag}}) ->
    PublishQos =
        case rabbit_mqtt_util:table_lookup(Headers, <<"x-mqtt-publish-qos">>) of
            undefined   -> ?QOS_1;
            {byte, Qos} -> erlang:min(Qos, ?QOS_1)
        end,
    {PublishQos, ?QOS_1}.
|
2012-08-06 06:52:54 +08:00
|
|
|
|
2022-05-10 18:16:40 +08:00
|
|
|
%% Reports whether the second subscription queue of ClientId (second
%% element of rabbit_mqtt_util:subcription_queue_name/1) exists in VHost.
session_present(VHost, ClientId) ->
    {_, SecondQueueBin} = rabbit_mqtt_util:subcription_queue_name(ClientId),
    rabbit_amqqueue:exists(rabbit_misc:r(VHost, queue, SecondQueueBin)).
|
2012-07-16 21:57:31 +08:00
|
|
|
|
2012-08-21 00:30:13 +08:00
|
|
|
%% Builds the last-will #mqtt_msg{} from a CONNECT frame.
%% Returns `undefined` when the frame's will_flag is false; otherwise the
%% will fields are copied over and `dup` is set to false.
make_will_msg(#mqtt_frame_connect{will_flag = false}) ->
    undefined;
make_will_msg(#mqtt_frame_connect{will_topic  = WillTopic,
                                  will_msg    = WillPayload,
                                  will_qos    = WillQos,
                                  will_retain = WillRetain}) ->
    #mqtt_msg{topic   = WillTopic,
              payload = WillPayload,
              qos     = WillQos,
              retain  = WillRetain,
              dup     = false}.
|
2012-07-04 00:35:18 +08:00
|
|
|
|
2022-09-01 22:02:08 +08:00
|
|
|
%% Authenticates and authorises an MQTT CONNECT.
%%
%% Clause 1: the processor state already carries a fully populated
%% #auth_state{} (username, user and vhost all set). This is treated as a
%% duplicate connect/login attempt: a failed auth attempt is recorded, a
%% warning is logged and `connack_dup_auth` is returned.
%%
%% Clause 2: no auth state yet. The vhost is resolved via get_vhost/3 and
%% the checks below are run through rabbit_misc:pipeline/3.
%% Returns {ok, PState} on success (recording a successful auth attempt),
%% or the pipeline's {error, Reason, PState} (recording a failed attempt).
process_login(_UserBin, _PassBin, _ClientId,
              #proc_state{peer_addr  = Addr,
                          auth_state = #auth_state{username = Username,
                                                   user     = User,
                                                   vhost    = VHost}})
  %% NOTE: the last guard test previously compared against the misspelt
  %% atom 'underfined', which made it always true.
  when Username =/= undefined, User =/= undefined, VHost =/= undefined ->
    UsernameStr = rabbit_data_coercion:to_list(Username),
    VHostStr = rabbit_data_coercion:to_list(VHost),
    rabbit_core_metrics:auth_attempt_failed(list_to_binary(inet:ntoa(Addr)), Username, mqtt),
    rabbit_log_connection:warning("MQTT detected duplicate connect/login attempt for user ~tp, vhost ~tp",
                                  [UsernameStr, VHostStr]),
    connack_dup_auth;
process_login(UserBin, PassBin, ClientId0,
              #proc_state{socket         = Sock,
                          ssl_login_name = SslLoginName,
                          peer_addr      = Addr,
                          auth_state     = undefined} = PState0) ->
    %% Only the local (inbound) port is needed, for port-to-vhost mapping.
    {ok, {_PeerHost, _PeerPort, _Host, Port}} = rabbit_net:socket_ends(Sock, inbound),
    {VHostPickedUsing, {VHost, UsernameBin}} = get_vhost(UserBin, SslLoginName, Port),
    rabbit_log_connection:debug(
      "MQTT vhost picked using ~s",
      [human_readable_vhost_lookup_strategy(VHostPickedUsing)]),
    RemoteIpAddressBin = list_to_binary(inet:ntoa(Addr)),
    ClientId = rabbit_data_coercion:to_binary(ClientId0),
    Input = #{vhost => VHost,
              username_bin => UsernameBin,
              pass_bin => PassBin,
              client_id => ClientId},
    %% Order matters: later steps read keys (user, authz_ctx) that earlier
    %% steps add to the input map.
    Steps = [fun check_vhost_exists/1,
             fun check_vhost_connection_limit/1,
             fun check_vhost_alive/1,
             fun check_user_login/2,
             fun check_user_connection_limit/1,
             fun check_vhost_access/2,
             fun check_user_loopback/2
            ],
    case rabbit_misc:pipeline(Steps, Input, PState0) of
        {ok, _Output, PState} ->
            rabbit_core_metrics:auth_attempt_succeeded(RemoteIpAddressBin, UsernameBin, mqtt),
            {ok, PState};
        {error, _Reason, _PState} = Err ->
            rabbit_core_metrics:auth_attempt_failed(RemoteIpAddressBin, UsernameBin, mqtt),
            Err
    end.
|
|
|
|
|
|
|
|
|
|
%% Login pipeline step: succeeds with `ok` when the target vhost exists,
%% otherwise logs the failure and returns {error, ?CONNACK_BAD_CREDENTIALS}.
check_vhost_exists(#{username_bin := UsernameBin,
                     vhost := VHost}) ->
    Exists = rabbit_vhost:exists(VHost),
    if
        Exists =:= true ->
            ok;
        Exists =:= false ->
            rabbit_log_connection:error("MQTT login failed for user '~s': virtual host '~s' does not exist",
                                        [UsernameBin, VHost]),
            {error, ?CONNACK_BAD_CREDENTIALS}
    end.
|
|
|
|
|
|
|
|
|
|
%% Login pipeline step: rejects the connection with
%% {error, ?CONNACK_NOT_AUTHORIZED} when the vhost has reached its
%% connection limit; returns `ok` otherwise.
check_vhost_connection_limit(#{username_bin := UsernameBin,
                               vhost := VHost}) ->
    case rabbit_vhost_limit:is_over_connection_limit(VHost) of
        {true, Limit} ->
            rabbit_log_connection:error(
              "Error on MQTT connection ~p~n"
              "access to vhost '~s' refused for user '~s': "
              "vhost connection limit (~p) is reached",
              [self(), VHost, UsernameBin, Limit]),
            {error, ?CONNACK_NOT_AUTHORIZED};
        false ->
            ok
    end.
|
|
|
|
|
|
|
|
|
|
%% Login pipeline step: returns `ok` when the vhost's supervision tree is
%% alive according to rabbit_vhost_sup_sup:is_vhost_alive/1, otherwise logs
%% the refusal and returns {error, ?CONNACK_NOT_AUTHORIZED}.
check_vhost_alive(#{vhost := VHost,
                    username_bin := UsernameBin}) ->
    case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of
        true ->
            ok;
        false ->
            %% The format string must consume exactly as many arguments as
            %% are supplied; previously VHost was passed without a matching
            %% control sequence. Wording and argument order now mirror
            %% check_vhost_connection_limit/1.
            rabbit_log_connection:error(
              "Error on MQTT connection ~p~n"
              "access to vhost '~s' refused for user '~s': "
              "vhost is down",
              [self(), VHost, UsernameBin]),
            {error, ?CONNACK_NOT_AUTHORIZED}
    end.
|
|
|
|
|
|
|
|
|
|
%% Login pipeline step: authenticates the user via
%% rabbit_access_control:check_user_login/2.
%%
%% When the password is `none` (SSL user name provided) authentication uses
%% the username only; otherwise password, vhost and client id are passed as
%% auth properties.
%% On success the #user{} is stored under the `user` key of the input map
%% and {ok, Map, PState} is returned. On refusal the error is logged, an
%% authentication-failure event is emitted and
%% {error, ?CONNACK_BAD_CREDENTIALS} is returned.
check_user_login(#{client_id := ClientId,
                   pass_bin := PassBin,
                   username_bin := UsernameBin,
                   vhost := VHost} = In,
                 PState) ->
    AuthProps =
        if
            PassBin =:= none ->
                %% SSL user name provided.
                %% Authenticating using username only.
                [];
            true ->
                [{password, PassBin},
                 {vhost, VHost},
                 {client_id, ClientId}]
        end,
    case rabbit_access_control:check_user_login(UsernameBin, AuthProps) of
        {refused, RefusedName, Msg, Args} ->
            rabbit_log_connection:error(
              "Error on MQTT connection ~p~n"
              "access refused for user '~s' in vhost '~s' "
              ++ Msg,
              [self(), RefusedName, VHost] ++ Args),
            notify_auth_result(RefusedName,
                               user_authentication_failure,
                               [{error, rabbit_misc:format(Msg, Args)}]),
            {error, ?CONNACK_BAD_CREDENTIALS};
        {ok, #user{username = AuthedName} = User} ->
            notify_auth_result(AuthedName, user_authentication_success, []),
            {ok, In#{user => User}, PState}
    end.
|
2012-07-16 21:57:31 +08:00
|
|
|
|
2022-08-05 17:16:43 +08:00
|
|
|
%% Emits an authentication event (AuthResult is used as the event type)
%% tagged with connection_type = mqtt. A username of `none` is mapped to ''
%% and any property whose value is '' is dropped before notifying.
notify_auth_result(Username, AuthResult, ExtraProps) ->
    Name = case Username of
               none -> '';
               _    -> Username
           end,
    AllProps = [{connection_type, mqtt}, {name, Name}] ++ ExtraProps,
    NonEmpty = [Prop || {_, Value} = Prop <- AllProps, Value =/= ''],
    rabbit_event:notify(AuthResult, NonEmpty).
|
|
|
|
|
|
2022-09-02 20:24:40 +08:00
|
|
|
%% Login pipeline step: rejects the connection with
%% {error, ?CONNACK_NOT_AUTHORIZED} when the authenticated user has reached
%% its connection limit; returns `ok` otherwise.
check_user_connection_limit(#{user := #user{username = Username}}) ->
    case rabbit_auth_backend_internal:is_over_connection_limit(Username) of
        {true, Limit} ->
            rabbit_log_connection:error(
              "Error on MQTT connection ~p~n"
              "access refused for user '~s': "
              "user connection limit (~p) is reached",
              [self(), Username, Limit]),
            {error, ?CONNACK_NOT_AUTHORIZED};
        false ->
            ok
    end.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
%% Login pipeline step: verifies that the user may access the vhost from
%% the peer address. The authorisation context (client id) is stored in the
%% input map under `authz_ctx` on success.
%% A `not_allowed` #amqp_error{} exit from the access check is logged and
%% turned into {error, ?CONNACK_NOT_AUTHORIZED}; other exceptions propagate.
check_vhost_access(#{client_id := ClientId,
                     user := #user{username = Username} = User,
                     vhost := VHost} = In,
                   #proc_state{peer_addr = PeerAddr} = PState) ->
    AuthzCtx = #{<<"client_id">> => ClientId},
    try rabbit_access_control:check_vhost_access(User, VHost, {ip, PeerAddr}, AuthzCtx) of
        ok ->
            {ok, In#{authz_ctx => AuthzCtx}, PState}
    catch
        exit:#amqp_error{name = not_allowed} ->
            rabbit_log_connection:error(
              "Error on MQTT connection ~p~n"
              "access refused for user '~s'",
              [self(), Username]),
            {error, ?CONNACK_NOT_AUTHORIZED}
    end.
|
|
|
|
|
|
|
|
|
|
%% Final login pipeline step: enforces the loopback-only restriction for
%% the user. On success the accumulated user, username, vhost and
%% authorisation context are stored as #auth_state{} in the processor state
%% and {ok, PState} is returned; otherwise a warning is logged and
%% {error, ?CONNACK_NOT_AUTHORIZED} is returned.
check_user_loopback(#{authz_ctx := AuthzCtx,
                      user := User,
                      username_bin := UsernameBin,
                      vhost := VHost},
                    #proc_state{peer_addr = PeerAddr} = PState) ->
    case rabbit_access_control:check_user_loopback(UsernameBin, PeerAddr) of
        not_allowed ->
            rabbit_log_connection:warning(
              "MQTT login failed: user '~s' can only connect via localhost",
              [binary_to_list(UsernameBin)]),
            {error, ?CONNACK_NOT_AUTHORIZED};
        ok ->
            {ok, PState#proc_state{
                   auth_state = #auth_state{user = User,
                                            username = UsernameBin,
                                            vhost = VHost,
                                            authz_ctx = AuthzCtx}}}
    end.
|
|
|
|
|
|
2016-12-19 22:00:43 +08:00
|
|
|
%% Resolves {Strategy, {VHost, Username}} for a connecting client.
%% Without an SSL login name (`none` or `undefined`) the non-SSL lookup is
%% used; otherwise the SSL lookup is used.
get_vhost(UserBin, SslLogin, Port)
  when SslLogin =:= none; SslLogin =:= undefined ->
    get_vhost_no_ssl(UserBin, Port);
get_vhost(UserBin, SslLogin, Port) ->
    get_vhost_ssl(UserBin, SslLogin, Port).
|
|
|
|
|
|
|
|
|
|
%% Vhost lookup for clients without an SSL login name. Precedence:
%%   1. vhost embedded in the username,
%%   2. the global mqtt_port_to_vhost_mapping runtime parameter,
%%   3. the plugin's configured `vhost` setting.
get_vhost_no_ssl(UserBin, Port) ->
    case vhost_in_username(UserBin) of
        false ->
            PortMapping = rabbit_runtime_parameters:value_global(
                            mqtt_port_to_vhost_mapping),
            case get_vhost_from_port_mapping(Port, PortMapping) of
                undefined ->
                    {default_vhost, {rabbit_mqtt_util:env(vhost), UserBin}};
                MappedVHost ->
                    {port_to_vhost_mapping, {MappedVHost, UserBin}}
            end;
        true ->
            {vhost_in_username_or_default, get_vhost_username(UserBin)}
    end.
|
|
|
|
|
|
|
|
|
|
%% Vhost lookup for clients that presented an SSL login name. Precedence:
%%   1. the global mqtt_default_vhosts mapping, keyed by SSL login name,
%%   2. the global mqtt_port_to_vhost_mapping runtime parameter,
%%   3. vhost embedded in the username, or the configured default.
get_vhost_ssl(UserBin, SslLoginName, Port) ->
    CertMapping = rabbit_runtime_parameters:value_global(mqtt_default_vhosts),
    case get_vhost_from_user_mapping(SslLoginName, CertMapping) of
        undefined ->
            PortMapping = rabbit_runtime_parameters:value_global(
                            mqtt_port_to_vhost_mapping),
            case get_vhost_from_port_mapping(Port, PortMapping) of
                undefined ->
                    {vhost_in_username_or_default, get_vhost_username(UserBin)};
                PortVHost ->
                    {port_to_vhost_mapping, {PortVHost, UserBin}}
            end;
        CertVHost ->
            {cert_to_vhost_mapping, {CertVHost, UserBin}}
    end.
|
|
|
|
|
|
|
|
|
|
%% Tells whether the username carries a "vhost:user" prefix. Always false
%% when the ignore_colons_in_username application setting is true.
vhost_in_username(UserBin) ->
    IgnoreColons = application:get_env(?APP, ignore_colons_in_username),
    case IgnoreColons of
        {ok, true} ->
            false;
        _ ->
            %% split at the last colon, disallowing colons in username
            Parts = re:split(UserBin, ":(?!.*?:)"),
            case Parts of
                [UserBin] -> false;
                [_, _]    -> true
            end
    end.
|
|
|
|
|
|
2013-11-18 20:00:47 +08:00
|
|
|
%% Splits a "vhost:user" username into {VHost, UserName}. When there is no
%% colon, or when ignore_colons_in_username is true, the configured default
%% vhost is paired with the unmodified username.
get_vhost_username(UserBin) ->
    Fallback = {rabbit_mqtt_util:env(vhost), UserBin},
    IgnoreColons = application:get_env(?APP, ignore_colons_in_username),
    case IgnoreColons of
        {ok, true} ->
            Fallback;
        _ ->
            %% split at the last colon, disallowing colons in username
            case re:split(UserBin, ":(?!.*?:)") of
                [UserBin]         -> Fallback;
                [Vhost, UserName] -> {Vhost, UserName}
            end
    end.
|
2012-07-16 21:57:31 +08:00
|
|
|
|
2016-12-19 22:00:43 +08:00
|
|
|
%% Looks User up in the user-to-vhost mapping. Returns `undefined` when the
%% mapping is `not_found` or has no entry for User.
get_vhost_from_user_mapping(_User, not_found) ->
    undefined;
get_vhost_from_user_mapping(User, Mapping) ->
    Proplist = rabbit_data_coercion:to_proplist(Mapping),
    %% rabbit_misc:pget/2's result is returned as-is (the original passed
    %% both `undefined` and a found value straight through).
    rabbit_misc:pget(User, Proplist).
|
|
|
|
|
|
2016-12-19 22:00:43 +08:00
|
|
|
%% Looks Port (coerced to a binary key) up in the port-to-vhost mapping.
%% Returns `undefined` when the mapping is `not_found` or has no entry.
get_vhost_from_port_mapping(_Port, not_found) ->
    undefined;
get_vhost_from_port_mapping(Port, Mapping) ->
    Proplist = rabbit_data_coercion:to_proplist(Mapping),
    PortKey = rabbit_data_coercion:to_binary(Port),
    rabbit_misc:pget(PortKey, Proplist).
|
2016-12-13 21:46:15 +08:00
|
|
|
|
2016-12-20 05:42:06 +08:00
|
|
|
%% Maps a vhost lookup strategy atom to a description for log messages.
%% Unknown strategies are rendered with atom_to_list/1.
human_readable_vhost_lookup_strategy(Strategy) ->
    case Strategy of
        vhost_in_username_or_default -> "vhost in username or default";
        port_to_vhost_mapping        -> "MQTT port to vhost mapping";
        cert_to_vhost_mapping        -> "client certificate to vhost mapping";
        default_vhost                -> "plugin configuration or default";
        Unknown                      -> atom_to_list(Unknown)
    end.
|
2016-12-13 21:46:15 +08:00
|
|
|
|
2014-11-21 22:54:33 +08:00
|
|
|
%% Picks the credentials to authenticate with.
%%
%% Returns one of:
%%   {UserBin, PassBin}            - username and password were both given as lists
%%   {invalid_creds, {User, Pass}} - only one of them was given, or not both are lists
%%   {SSLLoginName, none}          - nothing given, ssl_cert_login is enabled and an
%%                                   SSL login name exists
%%   {DefaultUser, DefaultPass}    - nothing given, anonymous access is allowed and
%%                                   binary default credentials are configured
%%   nocreds                       - none of the above
creds(User, Pass, SSLLoginName) ->
    DefaultUser = rabbit_mqtt_util:env(default_user),
    DefaultPass = rabbit_mqtt_util:env(default_pass),
    {ok, Anon} = application:get_env(?APP, allow_anonymous),
    {ok, TLSAuth} = application:get_env(?APP, ssl_cert_login),

    Provided    = (User =/= undefined) orelse (Pass =/= undefined),
    WellFormed  = is_list(User) andalso is_list(Pass),
    ViaCert     = (TLSAuth =:= true) andalso (SSLLoginName =/= none),
    ViaDefaults = (Anon =:= true)
                  andalso is_binary(DefaultUser)
                  andalso is_binary(DefaultPass),

    case {Provided, WellFormed, ViaCert, ViaDefaults} of
        %% Username and password take priority
        {true, true, _, _} ->
            {list_to_binary(User), list_to_binary(Pass)};
        %% Either username or password is provided
        {true, false, _, _} ->
            {invalid_creds, {User, Pass}};
        %% rabbitmq_mqtt.ssl_cert_login is true. SSL user name provided.
        %% Authenticating using username only.
        {false, false, true, _} ->
            {SSLLoginName, none};
        %% Anonymous connection uses default credentials
        {false, false, false, true} ->
            {DefaultUser, DefaultPass};
        _ ->
            nocreds
    end.
|
2012-07-04 00:35:18 +08:00
|
|
|
|
2012-08-06 06:52:54 +08:00
|
|
|
%% Maps a requested subscription QoS to the granted one: QoS 0 stays QoS 0,
%% QoS 1 and QoS 2 are both granted as QoS 1.
supported_subs_qos(?QOS_0) ->
    ?QOS_0;
supported_subs_qos(Qos) when Qos =:= ?QOS_1; Qos =:= ?QOS_2 ->
    ?QOS_1.
|
|
|
|
|
|
2014-01-30 23:33:59 +08:00
|
|
|
%% Maps an MQTT QoS level to the delivery_mode value set on the published
%% message properties: 1 for QoS 0, 2 for QoS 1 and QoS 2.
delivery_mode(?QOS_0) ->
    1;
delivery_mode(Qos) when Qos =:= ?QOS_1; Qos =:= ?QOS_2 ->
    2.
|
2014-01-30 23:33:59 +08:00
|
|
|
|
2022-03-10 17:49:03 +08:00
|
|
|
%% Appends the quorum queue-type argument to Qos1Args when the
%% durable_queue_type setting is `quorum` and the session is not clean.
%%
%% Quorum queues can only be used when Clean Session is false; otherwise
%% classic queues are always used. Clean Session = true sets auto-delete to
%% true, and quorum queues do not support the auto-delete flag.
maybe_quorum(Qos1Args, CleanSession, Queue) ->
    QueueType = rabbit_mqtt_util:env(durable_queue_type),
    case QueueType of
        quorum when CleanSession =:= false ->
            Qos1Args ++ [{<<"x-queue-type">>, longstr, <<"quorum">>}];
        quorum when CleanSession =:= true ->
            rabbit_log:debug("Can't use quorum queue for ~ts. "
                             "The clean session is true. Classic queue will be used",
                             [Queue]),
            Qos1Args;
        _ ->
            Qos1Args
    end.
|
|
|
|
|
|
2012-08-06 06:52:54 +08:00
|
|
|
%% different qos subscriptions are received in different queues
|
|
|
|
|
%% with appropriate durability and timeout arguments
|
|
|
|
|
%% this will lead to duplicate messages for overlapping subscriptions
|
|
|
|
|
%% with different qos values - todo: prevent duplicates
|
2022-08-13 21:28:30 +08:00
|
|
|
%ensure_queue(Qos, #proc_state{ channels = {Channel, _},
|
|
|
|
|
% client_id = ClientId,
|
|
|
|
|
% clean_sess = CleanSess,
|
|
|
|
|
% consumer_tags = {TagQ0, TagQ1} = Tags} = PState) ->
|
|
|
|
|
% {QueueQ0, QueueQ1} = rabbit_mqtt_util:subcription_queue_name(ClientId),
|
|
|
|
|
% Qos1Args = case {rabbit_mqtt_util:env(subscription_ttl), CleanSess} of
|
|
|
|
|
% {undefined, _} ->
|
|
|
|
|
% [];
|
|
|
|
|
% {Ms, false} when is_integer(Ms) ->
|
|
|
|
|
% [{<<"x-expires">>, long, Ms}];
|
|
|
|
|
% _ ->
|
|
|
|
|
% []
|
|
|
|
|
% end,
|
|
|
|
|
% QueueSetup =
|
|
|
|
|
% case {TagQ0, TagQ1, Qos} of
|
|
|
|
|
% {undefined, _, ?QOS_0} ->
|
|
|
|
|
% {QueueQ0,
|
|
|
|
|
% #'queue.declare'{ queue = QueueQ0,
|
|
|
|
|
% durable = false,
|
|
|
|
|
% auto_delete = true },
|
|
|
|
|
% #'basic.consume'{ queue = QueueQ0,
|
|
|
|
|
% no_ack = true }};
|
|
|
|
|
% {_, undefined, ?QOS_1} ->
|
|
|
|
|
% {QueueQ1,
|
|
|
|
|
% #'queue.declare'{ queue = QueueQ1,
|
|
|
|
|
% durable = true,
|
|
|
|
|
% %% Clean session means a transient connection,
|
|
|
|
|
% %% translating into auto-delete.
|
|
|
|
|
% %%
|
|
|
|
|
% %% see rabbitmq/rabbitmq-mqtt#37
|
|
|
|
|
% auto_delete = CleanSess,
|
|
|
|
|
% arguments = maybe_quorum(Qos1Args, CleanSess, QueueQ1)},
|
|
|
|
|
% #'basic.consume'{ queue = QueueQ1,
|
|
|
|
|
% no_ack = false }};
|
|
|
|
|
% {_, _, ?QOS_0} ->
|
|
|
|
|
% {exists, QueueQ0};
|
|
|
|
|
% {_, _, ?QOS_1} ->
|
|
|
|
|
% {exists, QueueQ1}
|
|
|
|
|
% end,
|
|
|
|
|
% case QueueSetup of
|
|
|
|
|
% {Queue, Declare, Consume} ->
|
|
|
|
|
% #'queue.declare_ok'{} = amqp_channel:call(Channel, Declare),
|
|
|
|
|
% #'basic.consume_ok'{ consumer_tag = Tag } =
|
|
|
|
|
% amqp_channel:call(Channel, Consume),
|
|
|
|
|
% {Queue, PState #proc_state{ consumer_tags = setelement(Qos+1, Tags, Tag) }};
|
|
|
|
|
% {exists, Q} ->
|
|
|
|
|
% {Q, PState}
|
|
|
|
|
% end.
|
|
|
|
|
|
|
|
|
|
%% Ensures the client's QoS 0 subscription queue exists and is consumed
%% from (spike: only QoS 0 is handled).
%%
%% Returns {ok, QueueName, PState} on success, or {error, Reason} where
%% Reason is one of access_refused | queue_access | queue_declare |
%% queue_consume.
%%
%% When the queue already exists it is returned as-is, without registering
%% a consumer in this call. Otherwise the steps are: permission checks,
%% queue declaration, consumer registration.
ensure_queue(?QOS_0, %% spike handles only QoS0
             #proc_state{
                client_id = ClientId,
                clean_sess = _CleanSess,
                queue_states = _QueueStates0,
                auth_state = #auth_state{
                                vhost = VHost,
                                user = User = #user{username = Username},
                                authz_ctx = AuthzCtx},
                info = #info{prefetch = Prefetch}
               } = PState0) ->
    {QueueBin, _QueueQos1Bin} = rabbit_mqtt_util:subcription_queue_name(ClientId),
    QueueName = rabbit_misc:r(VHost, queue, QueueBin),
    case rabbit_amqqueue:exists(QueueName) of
        true ->
            {ok, QueueName, PState0};
        false ->
            case ensure_queue_check_access(User, QueueName, AuthzCtx) of
                ok ->
                    ensure_queue_declare_and_consume(
                      QueueName, VHost, Username, Prefetch, PState0);
                {error, access_refused} = Refused ->
                    Refused
            end
    end.

%% Checks the permissions needed to create and consume from the queue:
%% read (required for basic.consume), then configure (required for
%% queue.declare).
ensure_queue_check_access(User, QueueName, AuthzCtx) ->
    case check_resource_access(User, QueueName, read, AuthzCtx) of
        ok ->
            case check_resource_access(User, QueueName, configure, AuthzCtx) of
                ok ->
                    ok;
                {error, access_refused} = ConfigureRefused ->
                    ConfigureRefused
            end;
        {error, access_refused} = ReadRefused ->
            ReadRefused
    end.

%% Declares the queue and, when that succeeds, starts consuming from it.
ensure_queue_declare_and_consume(QueueName, VHost, Username, Prefetch, PState0) ->
    case ensure_queue_declare(QueueName, VHost, Username) of
        {ok, Q} ->
            ensure_queue_consume(Q, QueueName, Username, Prefetch, PState0);
        {error, _Reason} = DeclareError ->
            DeclareError
    end.

%% Declares a non-durable, auto-delete queue after confirming it does not
%% exist yet and that the vhost queue limit has not been reached.
%% Returns {ok, Q} | {error, access_refused | queue_access | queue_declare}.
ensure_queue_declare(QueueName, VHost, Username) ->
    rabbit_core_metrics:queue_declared(QueueName),
    Durable = false,
    AutoDelete = true,
    Existing = rabbit_amqqueue:with(
                 QueueName,
                 fun (Q) ->
                         ok = rabbit_amqqueue:assert_equivalence(
                                Q, Durable, AutoDelete, [], none),
                         rabbit_amqqueue:stat(Q)
                 end),
    case Existing of
        {error, not_found} ->
            case rabbit_vhost_limit:is_over_queue_limit(VHost) of
                false ->
                    ensure_queue_declare_new(QueueName, Durable, AutoDelete, Username);
                {true, Limit} ->
                    rabbit_log:error("cannot declare ~s because "
                                     "queue limit ~p in vhost '~s' is reached",
                                     [rabbit_misc:rs(QueueName), Limit, VHost]),
                    {error, access_refused}
            end;
        Other ->
            rabbit_log:error("Expected ~s to not exist, got: ~p",
                             [rabbit_misc:rs(QueueName), Other]),
            {error, queue_access}
    end.

%% Performs the actual queue declaration and records the creation metric.
ensure_queue_declare_new(QueueName, Durable, AutoDelete, Username) ->
    case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
                                 [], none, Username) of
        {new, Q} when ?is_amqqueue(Q) ->
            rabbit_core_metrics:queue_created(QueueName),
            {ok, Q};
        Other ->
            rabbit_log:error("Failed to declare ~s: ~p",
                             [rabbit_misc:rs(QueueName), Other]),
            {error, queue_declare}
    end.

%% Registers this process as a no-ack consumer on Q and stores the updated
%% queue type state in the processor state.
ensure_queue_consume(Q, QueueName, Username, Prefetch,
                     #proc_state{queue_states = QueueStates0} = PState0) ->
    Spec = #{no_ack => true,
             channel_pid => self(),
             limiter_pid => none,
             limiter_active => false,
             prefetch_count => Prefetch,
             consumer_tag => ?CONSUMER_TAG,
             exclusive_consume => false,
             args => [],
             ok_msg => undefined,
             acting_user => Username},
    case rabbit_queue_type:consume(Q, Spec, QueueStates0) of
        {ok, QueueStates, _Actions = []} ->
            % rabbit_global_counters:consumer_created(mqtt),
            PState = PState0#proc_state{queue_states = QueueStates},
            {ok, QueueName, PState};
        Other ->
            rabbit_log:error("Failed to consume from ~s: ~p",
                             [rabbit_misc:rs(QueueName), Other]),
            {error, queue_consume}
    end.
|
|
|
|
|
|
2022-09-06 21:56:01 +08:00
|
|
|
%% Binds QueueName to the MQTT exchange for TopicName. The permission
%% checks run first, in order, through rabbit_misc:pipeline/3, and the
%% binding is added last.
bind(QueueName, TopicName, PState) ->
    Steps = [fun bind_check_queue_write_access/2,
             fun bind_check_exchange_read_access/2,
             fun bind_check_topic_access/2,
             fun bind_add/2
            ],
    rabbit_misc:pipeline(Steps, {QueueName, TopicName}, PState).
|
|
|
|
|
|
|
|
|
|
%% bind/3 step: write access to the queue is required for queue.bind.
bind_check_queue_write_access(
  {QueueName, _TopicName},
  #proc_state{auth_state = #auth_state{authz_ctx = AuthzCtx,
                                       user = User}}) ->
    check_resource_access(User, QueueName, write, AuthzCtx).
|
|
|
|
|
|
|
|
|
|
%% bind/3 step: read access to the exchange is required for queue.bind.
bind_check_exchange_read_access(
  {_QueueName, _TopicName},
  #proc_state{auth_state = #auth_state{authz_ctx = AuthzCtx,
                                       user = User},
              exchange = ExchangeName}) ->
    check_resource_access(User, ExchangeName, read, AuthzCtx).
|
|
|
|
|
|
|
|
|
|
%% bind/3 step: the client needs read permission on the topic it binds to.
bind_check_topic_access({_QueueName, Topic}, PState) ->
    check_topic_access(Topic, read, PState).
|
|
|
|
|
|
|
|
|
|
%% bind/3 step: adds the binding from the MQTT exchange to the queue. The
%% routing key is the topic name translated by the state's mqtt2amqp_fun;
%% the binding is added on behalf of the authenticated user.
bind_add(
  {QueueName, TopicName},
  #proc_state{auth_state = #auth_state{user = #user{username = Username}},
              exchange = ExchangeName,
              mqtt2amqp_fun = Mqtt2AmqpFun}) ->
    rabbit_binding:add(#binding{source = ExchangeName,
                                destination = QueueName,
                                key = Mqtt2AmqpFun(TopicName)},
                       Username).
|
2022-08-13 21:28:30 +08:00
|
|
|
|
2016-12-07 18:21:47 +08:00
|
|
|
%% Publishes the client's last-will message, if any, then closes the AMQP
%% channels.
%%
%% Without a will message the state is returned untouched. Otherwise the
%% will is published when the client has write permission on its topic
%% (and handed to the retainer when its retain flag is set); a permission
%% failure is only logged. Both channels are then closed, QoS 1 first, and
%% cleared in the returned state.
send_will(#proc_state{will_msg = undefined} = PState) ->
    PState;

send_will(#proc_state{will_msg = #mqtt_msg{retain = Retain,
                                           topic = Topic} = WillMsg,
                      retainer_pid = RPid,
                      channels = {ChQos0, ChQos1},
                      amqp2mqtt_fun = Amqp2MqttFun} = PState) ->
    case check_topic_access(Topic, write, PState) of
        ok ->
            amqp_pub(WillMsg, PState),
            case Retain of
                true ->
                    hand_off_to_retainer(RPid, Amqp2MqttFun, Topic, WillMsg);
                false ->
                    ok
            end;
        Error ->
            rabbit_log:warning(
              "Could not send last will: ~tp",
              [Error])
    end,
    CloseIfOpen = fun (undefined) -> ok;
                      (Channel)   -> amqp_channel:close(Channel)
                  end,
    lists:foreach(CloseIfOpen, [ChQos1, ChQos0]),
    PState#proc_state{channels = {undefined, undefined}}.
|
2012-08-21 00:30:13 +08:00
|
|
|
|
|
|
|
|
%% Publishes an MQTT message to the MQTT exchange and returns the updated
%% processor state. `undefined` (no message) is a no-op.
%%
%% The message is wrapped in a #basic_message{}/#delivery{} carrying the
%% publish QoS and DUP flag as headers, routed through the exchange and
%% handed to deliver_to_queues/3. Confirms/persistence are requested only
%% for QoS above 0, but no sequence number is assigned (spike: QoS 0 only).
amqp_pub(undefined, PState) ->
    PState;
amqp_pub(#mqtt_msg{qos        = Qos,
                   topic      = Topic,
                   dup        = Dup,
                   message_id = _MessageId, %% spike handles only QoS0
                   payload    = Payload},
         #proc_state{exchange      = ExchangeName,
                     mqtt2amqp_fun = Mqtt2AmqpFun} = PState) ->
    RoutingKey = Mqtt2AmqpFun(Topic),
    NeedsConfirm = Qos > ?QOS_0,
    Props = #'P_basic'{
               headers = [{<<"x-mqtt-publish-qos">>, byte, Qos},
                          {<<"x-mqtt-dup">>, bool, Dup}],
               delivery_mode = delivery_mode(Qos)},
    {ClassId, _MethodId} = rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
    Message = #basic_message{
                 exchange_name = ExchangeName,
                 routing_keys = [RoutingKey],
                 content = #content{class_id = ClassId,
                                    properties = Props,
                                    properties_bin = none,
                                    protocol = none,
                                    payload_fragments_rev = [Payload]},
                 id = <<>>,
                 is_persistent = NeedsConfirm},
    Delivery = #delivery{mandatory = false,
                         confirm = NeedsConfirm,
                         sender = self(),
                         message = Message,
                         msg_seq_no = undefined, %% spike handles only QoS0
                         flow = noflow %%TODO enable flow control
                        },
    Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
    RoutedQueueNames = rabbit_exchange:route(Exchange, Delivery),
    deliver_to_queues(Delivery, RoutedQueueNames, PState).
|
|
|
|
|
|
|
|
|
|
%% Delivers a routed message to its destination queues and returns the
%% processor state with the updated queue type states.
%%
%% An unconfirmed delivery that routed to no queue is dropped silently.
deliver_to_queues(#delivery{confirm = false}, [], PState) ->
    % rabbit_global_counters:messages_unroutable_dropped(mqtt, 1),
    PState;
deliver_to_queues(#delivery{message = #basic_message{exchange_name = _XName},
                            confirm = _Confirm,
                            msg_seq_no = _MsgSeqNo} = Delivery,
                  RoutedToQueueNames,
                  #proc_state{queue_states = QueueStates0} = PState) ->
    Looked = rabbit_amqqueue:lookup(RoutedToQueueNames),
    Targets = rabbit_amqqueue:prepend_extra_bcc(Looked),
    {ok, QueueStates, _Actions} =
        rabbit_queue_type:deliver(Targets, Delivery, QueueStates0),
    % rabbit_global_counters:messages_routed(mqtt, length(Targets)),

    %% Not done yet (spike handles only QoS0):
    %% - TODO: AMQP 0.9.1 mandatory flag corresponds to MQTT 5 PUBACK reason
    %%   code "No matching subscribers"; basic.returns would have to be sent
    %%   before confirms.
    %% - Registering publisher confirms for the routed queues.
    %% - TODO handle Actions (after registering confirms, as actions may
    %%   contain rejections of publishes): for example if the message is
    %%   rejected, MQTT 5 allows to send a NACK back to the client (via
    %%   PUBACK Reason Code).
    PState#proc_state{queue_states = QueueStates}.
|
2014-08-25 17:43:11 +08:00
|
|
|
|
2022-09-06 21:56:01 +08:00
|
|
|
%% Serialises Frame for the negotiated protocol version and writes it to
%% the client socket. Returns the result of rabbit_net:port_command/2; any
%% exception raised while serialising or writing is logged, not propagated.
serialise_and_send_to_client(Frame, #proc_state{socket = Sock, proto_ver = ProtoVer}) ->
    %%TODO Test sending large frames at high speed: Will we need garbage collection as done
    %% in rabbit_writer:maybe_gc_large_msg()?
    try
        rabbit_net:port_command(Sock, rabbit_mqtt_frame:serialise(Frame, ProtoVer))
    catch
        _:Error ->
            rabbit_log_connection:error("MQTT: a socket write failed, the socket might already be closed"),
            rabbit_log_connection:debug("Failed to write to socket ~tp, error: ~tp, frame: ~tp",
                                        [Sock, Error, Frame])
    end.
|
2012-08-21 00:30:13 +08:00
|
|
|
|
|
|
|
|
%% Tears down the AMQP connection of this MQTT client.
%%
%% With no connection the state is returned unchanged. Otherwise the client
%% id (if any) is unregistered from the collector, the connection is closed
%% and both the channels and the connection are cleared in the state.
close_connection(#proc_state{connection = undefined} = PState) ->
    PState;
close_connection(#proc_state{connection = Connection,
                             client_id = ClientId} = PState) ->
    % todo: maybe clean session
    if
        ClientId =:= undefined ->
            ok;
        true ->
            case rabbit_mqtt_collector:unregister(ClientId, self()) of
                ok           -> ok;
                %% ignore as we are shutting down
                {timeout, _} -> ok
            end
    end,
    %% ignore noproc or other exceptions, we are shutting down
    try
        amqp_connection:close(Connection)
    catch
        _:_ -> ok
    end,
    PState#proc_state{channels = {undefined, undefined},
                      connection = undefined}.
|
2015-06-30 16:08:26 +08:00
|
|
|
|
2019-10-30 21:32:46 +08:00
|
|
|
%% Drops the permission caches held in the process dictionary before the
%% process hibernates.
handle_pre_hibernate() ->
    lists:foreach(fun erlang:erase/1,
                  [permission_cache, topic_permission_cache]),
    ok.
|
|
|
|
|
|
2020-02-11 01:05:43 +08:00
|
|
|
%% Handles Ra events related to client id registration; returns the
%% (possibly updated) processor state.
%%
%% - applied, with the pending correlation: registration succeeded.
%% - not_leader, with the pending correlation: retry against the reported leader.
%% - register_timeout while pending: register again.
%% - register_timeout otherwise: ignored.
%% - anything else: logged at debug level and ignored.
handle_ra_event({applied, [{Corr, ok}]},
                #proc_state{register_state = {pending, Corr}} = PState) ->
    %% success case - command was applied transition into registered state
    PState#proc_state{register_state = registered};
handle_ra_event({not_leader, Leader, Corr},
                #proc_state{register_state = {pending, Corr},
                            client_id = ClientId} = PState) ->
    %% retry command against actual leader
    {ok, RetryCorr} = rabbit_mqtt_collector:register(Leader, ClientId, self()),
    PState#proc_state{register_state = {pending, RetryCorr}};
handle_ra_event(register_timeout,
                #proc_state{register_state = {pending, _Corr},
                            client_id = ClientId} = PState) ->
    {ok, RetryCorr} = rabbit_mqtt_collector:register(ClientId, self()),
    PState#proc_state{register_state = {pending, RetryCorr}};
handle_ra_event(register_timeout, PState) ->
    PState;
handle_ra_event(UnhandledEvt, PState) ->
    %% log these?
    rabbit_log:debug("unhandled ra_event: ~w ", [UnhandledEvt]),
    PState.
|
|
|
|
|
|
2022-08-13 21:28:30 +08:00
|
|
|
%% Handles a 'DOWN' message for a monitored queue process and returns the
%% updated processor state (spike handles only QoS0).
%% When the queue type reports end-of-life the queue is removed from the
%% queue states; otherwise the resulting actions are processed.
handle_down({'DOWN', _MRef, process, QPid, Reason},
            #proc_state{queue_states = QStates0} = PState0) ->
    case rabbit_queue_type:handle_down(QPid, Reason, QStates0) of
        {eol, QStates1, QRef} ->
            Remaining = rabbit_queue_type:remove(QRef, QStates1),
            PState0#proc_state{queue_states = Remaining};
        {ok, QStates1, Actions} ->
            handle_queue_actions(Actions,
                                 PState0#proc_state{queue_states = QStates1})
    end.
|
|
|
|
|
|
|
|
|
|
%% Feeds a queue event into the queue type state machine.
%% Returns {ok, PState} after processing the resulting actions,
%% {error, queue_eol, PState0} when the queue reached end-of-life, or
%% {error, ProtocolError, PState0} for a protocol error.
handle_queue_event({queue_event, QRef, Evt},
                   #proc_state{queue_states = QueueStates0} = PState0) ->
    case rabbit_queue_type:handle_event(QRef, Evt, QueueStates0) of
        eol ->
            {error, queue_eol, PState0};
        {protocol_error, _Type, _Reason, _ReasonArgs} = ProtocolError ->
            {error, ProtocolError, PState0};
        {ok, QueueStates, Actions} ->
            WithStates = PState0#proc_state{queue_states = QueueStates},
            {ok, handle_queue_actions(Actions, WithStates)}
    end.
|
|
|
|
|
|
|
|
|
|
%% Applies queue type actions to the processor state, in order. Only
%% no-ack `deliver` actions for this module's consumer tag are handled; any
%% other action fails with function_clause.
handle_queue_actions(Actions, #proc_state{} = PState0) ->
    Apply = fun ({deliver, ?CONSUMER_TAG, _AckRequired = false, Msgs}, Acc) ->
                    handle_deliver(Msgs, Acc)
            end,
    lists:foldl(Apply, PState0, Actions).
|
|
|
|
|
|
|
|
|
|
%% Sends each delivered message to the client, in order, threading the
%% processor state through handle_deliver0/2.
handle_deliver(Msgs, PState)
  when is_list(Msgs) ->
    lists:foldl(fun handle_deliver0/2, PState, Msgs).
|
|
|
|
|
|
|
|
|
|
%% Turns one queue delivery into an MQTT PUBLISH frame and sends it with
%% the state's send_fun. The state is returned unchanged.
%%
%% The DUP flag is the redelivered flag OR-ed with the <<"x-mqtt-dup">>
%% header (when present); the topic is the first routing key translated by
%% amqp2mqtt_fun. QoS is always 0 and no message id is set (spike handles
%% only QoS0).
handle_deliver0({_QName, _QPid, _MsgId, Redelivered,
                 #basic_message{routing_keys = [RoutingKey | _CcRoutes],
                                content = #content{
                                             properties = #'P_basic'{headers = Headers},
                                             payload_fragments_rev = FragmentsRev}}},
                #proc_state{send_fun = SendFun,
                            amqp2mqtt_fun = Amqp2MqttFun} = PState) ->
    Dup = case rabbit_mqtt_util:table_lookup(Headers, <<"x-mqtt-dup">>) of
              {bool, HeaderDup} -> Redelivered orelse HeaderDup;
              undefined         -> Redelivered
          end,
    %%TODO support iolists when sending to client
    Payload = list_to_binary(lists:reverse(FragmentsRev)),
    Fixed = #mqtt_frame_fixed{type = ?PUBLISH,
                              qos = ?QOS_0, %% spike handles only QoS0
                              dup = Dup},
    Variable = #mqtt_frame_publish{message_id = undefined, %% spike handles only QoS0
                                   topic_name = Amqp2MqttFun(RoutingKey)},
    SendFun(#mqtt_frame{fixed = Fixed,
                        variable = Variable,
                        payload = Payload},
            PState),
    PState.
|
|
|
|
|
|
2022-09-06 21:56:01 +08:00
|
|
|
%% Runs PublishFun/0 if the connection's user may write to the exchange and
%% to TopicName.
%%
%% The exchange (resource) permission is checked first; the topic permission
%% is only checked when the former succeeds. Returns whatever PublishFun()
%% returns, or {error, unauthorized, PState} when either check is refused.
publish(TopicName, PublishFun,
        #proc_state{exchange = Exchange,
                    auth_state = #auth_state{user = User,
                                             authz_ctx = AuthzCtx}} = PState) ->
    Verdict = case check_resource_access(User, Exchange, write, AuthzCtx) of
                  ok ->
                      check_topic_access(TopicName, write, PState);
                  {error, access_refused} = Refused ->
                      Refused
              end,
    case Verdict of
        ok ->
            PublishFun();
        {error, access_refused} ->
            {error, unauthorized, PState}
    end.
|
|
|
|
|
|
|
|
|
|
%% Checks whether User has permission Perm on Resource.
%%
%% Successful checks are remembered in the process dictionary under
%% `permission_cache` as {Resource, Context, Perm} entries, newest first and
%% limited to ?MAX_PERMISSION_CACHE_SIZE entries, so repeated checks do not
%% call rabbit_access_control again. Refusals are never cached.
%%
%% Returns ok, or {error, access_refused} when rabbit_access_control exits
%% with an access_refused amqp_error (the refusal is logged).
check_resource_access(User, Resource, Perm, Context) ->
    V = {Resource, Context, Perm},
    Cache = case get(permission_cache) of
                undefined -> [];
                Other     -> Other
            end,
    case lists:member(V, Cache) of
        true ->
            ok;
        false ->
            try rabbit_access_control:check_resource_access(User, Resource, Perm, Context) of
                ok ->
                    %% Keep room for the new entry at the head of the cache.
                    CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE - 1),
                    put(permission_cache, [V | CacheTail]),
                    ok
            catch
                exit:{amqp_error, access_refused, Msg, _AmqpMethod} ->
                    %% ~ts (not ~s): the explanation may contain characters
                    %% above 255, which plain ~s cannot format.
                    rabbit_log:error("MQTT resource access refused: ~ts", [Msg]),
                    {error, access_refused}
            end
    end.
|
2015-06-30 16:08:26 +08:00
|
|
|
|
2017-01-23 19:10:23 +08:00
|
|
|
%% Checks whether the connection's user has topic permission Access on
%% TopicName for the configured exchange.
%%
%% Successful checks are remembered in the process dictionary under
%% `topic_permission_cache`, newest first and limited to
%% ?MAX_PERMISSION_CACHE_SIZE entries. Refusals are never cached.
%%
%% Returns ok, or {error, access_refused} when rabbit_access_control exits
%% with an access_refused amqp_error (the refusal is logged).
check_topic_access(TopicName, Access,
                   #proc_state{
                      auth_state = #auth_state{user = User = #user{username = Username},
                                               vhost = VHost,
                                               authz_ctx = AuthzCtx},
                      exchange = #resource{name = ExchangeBin},
                      client_id = ClientId,
                      mqtt2amqp_fun = Mqtt2AmqpFun}) ->
    Cache = case get(topic_permission_cache) of
                undefined -> [];
                Other     -> Other
            end,
    Key = {TopicName, Username, ClientId, VHost, ExchangeBin, Access},
    case lists:member(Key, Cache) of
        true ->
            ok;
        false ->
            Resource = #resource{virtual_host = VHost,
                                 kind = topic,
                                 name = ExchangeBin},
            %% Topic permissions are evaluated against the AMQP routing key.
            RoutingKey = Mqtt2AmqpFun(TopicName),
            Context = #{routing_key  => RoutingKey,
                        variable_map => AuthzCtx},
            try rabbit_access_control:check_topic_access(User, Resource, Access, Context) of
                ok ->
                    %% Keep room for the new entry at the head of the cache.
                    CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE - 1),
                    put(topic_permission_cache, [Key | CacheTail]),
                    ok
            catch
                exit:{amqp_error, access_refused, Msg, _AmqpMethod} ->
                    %% ~ts (not ~s): the explanation may contain characters
                    %% above 255, which plain ~s cannot format.
                    rabbit_log:error("MQTT topic access refused: ~ts", [Msg]),
                    {error, access_refused}
            end
    end.
|
2016-12-05 22:58:19 +08:00
|
|
|
|
|
|
|
|
%% Returns the value of a single info item read from the processor state.
%% Throws {bad_argument, Item} when no clause below matches.
info(consumer_tags, #proc_state{consumer_tags = Tags}) ->
    Tags;
info(unacked_pubs, #proc_state{unacked_pubs = UnackedPubs}) ->
    UnackedPubs;
info(awaiting_ack, #proc_state{awaiting_ack = AwaitingAck}) ->
    AwaitingAck;
info(awaiting_seqno, #proc_state{awaiting_seqno = AwaitingSeqNo}) ->
    AwaitingSeqNo;
info(message_id, #proc_state{message_id = MsgId}) ->
    MsgId;
info(client_id, #proc_state{client_id = ClientId}) ->
    rabbit_data_coercion:to_binary(ClientId);
info(clean_sess, #proc_state{clean_sess = CleanSess}) ->
    CleanSess;
info(will_msg, #proc_state{will_msg = WillMsg}) ->
    WillMsg;
info(channels, #proc_state{channels = Channels}) ->
    Channels;
info(exchange, #proc_state{exchange = #resource{name = ExchangeName}}) ->
    ExchangeName;
info(ssl_login_name, #proc_state{ssl_login_name = LoginName}) ->
    LoginName;
info(retainer_pid, #proc_state{retainer_pid = RetainerPid}) ->
    RetainerPid;
info(user, #proc_state{auth_state = #auth_state{username = Username}}) ->
    Username;
info(vhost, #proc_state{auth_state = #auth_state{vhost = VHost}}) ->
    VHost;
info(host, #proc_state{info = #info{host = Host}}) ->
    Host;
info(port, #proc_state{info = #info{port = Port}}) ->
    Port;
info(peer_host, #proc_state{info = #info{peer_host = PeerHost}}) ->
    PeerHost;
info(peer_port, #proc_state{info = #info{peer_port = PeerPort}}) ->
    PeerPort;
info(protocol, #proc_state{info = #info{proto_human = ProtoHuman}}) ->
    ProtoHuman;
%% TODO: the clauses for channel_max, frame_max, client_properties, ssl,
%% ssl_protocol, ssl_key_exchange, ssl_cipher and ssl_hash are disabled;
%% requests for these items currently end up in the clause below.
info(Other, _) ->
    throw({bad_argument, Other}).
|
|
|
|
|
|
2022-08-05 17:16:43 +08:00
|
|
|
% additional_info(Key,
|
|
|
|
|
% #proc_state{adapter_info =
|
|
|
|
|
% #amqp_adapter_info{additional_info = AddInfo}}) ->
|
|
|
|
|
% proplists:get_value(Key, AddInfo).
|
2022-03-10 17:49:03 +08:00
|
|
|
|
|
|
|
|
%% Acknowledges receipt of a delivery for flow control purposes.
%% An `undefined` delivery context needs no notification and yields ok.
notify_received(DeliveryCtx) ->
    case DeliveryCtx of
        undefined ->
            %% no notification for quorum queues and streams
            ok;
        _ ->
            %% notification for flow control
            amqp_channel:notify_received(DeliveryCtx)
    end.
|
2022-09-01 22:02:08 +08:00
|
|
|
|
|
|
|
|
%% Derives a login name from the peer certificate of Sock.
%% Returns `none` when the socket has no peer certificate, is not a TLS
%% socket, or when rabbit_ssl answers `unsafe` or `not_found`.
-spec ssl_login_name(rabbit_net:socket()) -> none | binary().
ssl_login_name(Sock) ->
    case rabbit_net:peercert(Sock) of
        nossl ->
            none;
        {error, no_peercert} ->
            none;
        {ok, Cert} ->
            case rabbit_ssl:peer_cert_auth_name(Cert) of
                unsafe    -> none;
                not_found -> none;
                AuthName  -> AuthName
            end
    end.
|