rabbitmq-server/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl

1165 lines
55 KiB
Erlang
Raw Normal View History

%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
2012-06-27 00:57:24 +08:00
%%
2023-01-02 12:17:36 +08:00
%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
2012-06-27 00:57:24 +08:00
%%
-module(rabbit_mqtt_processor).
-export([info/2, initial_state/2, initial_state/4,
process_frame/2, amqp_pub/2, amqp_callback/2, send_will/1,
close_connection/1, handle_pre_hibernate/0,
handle_ra_event/2]).
2012-06-27 00:57:24 +08:00
%% for testing purposes
-export([get_vhost_username/1, get_vhost/3, get_vhost_from_user_mapping/2, maybe_quorum/3]).
2012-06-27 00:57:24 +08:00
-include_lib("amqp_client/include/amqp_client.hrl").
2012-09-12 21:34:41 +08:00
-include("rabbit_mqtt_frame.hrl").
-include("rabbit_mqtt.hrl").
-define(APP, rabbitmq_mqtt).
-define(FRAME_TYPE(Frame, Type),
Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}).
2019-10-30 22:26:57 +08:00
-define(MAX_TOPIC_PERMISSION_CACHE_SIZE, 12).
2016-01-08 07:25:26 +08:00
initial_state(Socket, SSLLoginName) ->
RealSocket = rabbit_net:unwrap_socket(Socket),
{ok, {PeerAddr, _PeerPort}} = rabbit_net:peername(RealSocket),
initial_state(RealSocket, SSLLoginName, fun serialise_and_send_to_client/2, PeerAddr).
initial_state(Socket, SSLLoginName, SendFun, PeerAddr) ->
2019-09-04 23:07:33 +08:00
{ok, {mqtt2amqp_fun, M2A}, {amqp2mqtt_fun, A2M}} =
rabbit_mqtt_util:get_topic_translation_funs(),
2016-03-03 21:59:38 +08:00
%% MQTT connections use exactly one channel. The frame max is not
%% applicable and there is no way to know what client is used.
#proc_state{ unacked_pubs = gb_trees:empty(),
awaiting_ack = gb_trees:empty(),
message_id = 1,
2017-04-24 20:45:37 +08:00
subscriptions = #{},
queue_states = rabbit_queue_type:init(),
consumer_tags = {undefined, undefined},
channels = {undefined, undefined},
socket = Socket,
ssl_login_name = SSLLoginName,
send_fun = SendFun,
2019-09-04 23:07:33 +08:00
peer_addr = PeerAddr,
mqtt2amqp_fun = M2A,
amqp2mqtt_fun = A2M}.
2012-08-06 06:52:54 +08:00
process_frame(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }},
PState = #proc_state{ auth_state = undefined } )
2012-08-06 06:52:54 +08:00
when Type =/= ?CONNECT ->
{error, connect_expected, PState};
process_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }},
PState) ->
try process_request(Type, Frame, PState) of
2016-03-03 21:59:38 +08:00
{ok, PState1} -> {ok, PState1, PState1#proc_state.connection};
Ret -> Ret
catch
_:{{shutdown, {server_initiated_close, 403, _}}, _} ->
%% NB: MQTT spec says we should ack normally, ie pretend
%% there was no auth error, but here we are closing the
%% connection with an error. This is what happens anyway
%% if there is an authorization failure at the AMQP 0-9-1
%% client level. And error was already logged by AMQP
%% channel, so no need for custom logging.
{error, access_refused, PState}
2016-03-03 21:59:38 +08:00
end.
process_connect(#mqtt_frame{variable = #mqtt_frame_connect{
username = Username,
password = Password,
proto_ver = ProtoVersion,
clean_sess = CleanSess,
client_id = ClientId0,
keep_alive = Keepalive} = Var},
PState0 = #proc_state{ssl_login_name = SSLLoginName,
socket = Socket,
send_fun = SendFun,
peer_addr = Addr}) ->
2014-03-19 20:05:42 +08:00
ClientId = case ClientId0 of
[] -> rabbit_mqtt_util:gen_client_id();
[_|_] -> ClientId0
end,
rabbit_log_connection:debug("Received a CONNECT, client ID: ~p (expanded to ~p), username: ~p, "
"clean session: ~p, protocol version: ~p, keepalive: ~p",
[ClientId0, ClientId, Username, CleanSess, ProtoVersion, Keepalive]),
% AdapterInfo1 = add_client_id_to_adapter_info(rabbit_data_coercion:to_binary(ClientId), AdapterInfo),
% PState1 = PState0#proc_state{adapter_info = AdapterInfo1},
2020-08-28 17:29:12 +08:00
Ip = list_to_binary(inet:ntoa(Addr)),
{Return, PState2} =
case {lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)),
ClientId0 =:= [] andalso CleanSess =:= false} of
{false, _} ->
{?CONNACK_PROTO_VER, PState0};
{_, true} ->
{?CONNACK_INVALID_ID, PState0};
_ ->
case creds(Username, Password, SSLLoginName) of
nocreds ->
rabbit_core_metrics:auth_attempt_failed(Ip, <<>>, mqtt),
rabbit_log_connection:error("MQTT login failed: no credentials provided"),
{?CONNACK_CREDENTIALS, PState0};
{invalid_creds, {undefined, Pass}} when is_list(Pass) ->
rabbit_core_metrics:auth_attempt_failed(Ip, <<>>, mqtt),
rabbit_log_connection:error("MQTT login failed: no username is provided"),
{?CONNACK_CREDENTIALS, PState0};
{invalid_creds, {User, undefined}} when is_list(User) ->
rabbit_core_metrics:auth_attempt_failed(Ip, User, mqtt),
rabbit_log_connection:error("MQTT login failed for user '~p': no password provided", [User]),
{?CONNACK_CREDENTIALS, PState0};
{UserBin, PassBin} ->
case process_login(UserBin, PassBin, ClientId, ProtoVersion, PState0) of
connack_dup_auth ->
maybe_clean_sess(PState0);
{?CONNACK_ACCEPT, VHost, ProtoVersion, AState} ->
case rabbit_mqtt_collector:register(ClientId, self()) of
{ok, Corr} ->
RetainerPid = rabbit_mqtt_retainer_sup:child_for_vhost(VHost),
Prefetch = rabbit_mqtt_util:env(prefetch),
rabbit_mqtt_reader:start_keepalive(self(), Keepalive),
{ok, {PeerHost, PeerPort, Host, Port}} = rabbit_net:socket_ends(Socket, inbound),
ExchangeBin = rabbit_mqtt_util:env(exchange),
ExchangeName = rabbit_misc:r(VHost, exchange, ExchangeBin),
PState1 = PState0#proc_state{
exchange = ExchangeName,
will_msg = make_will_msg(Var),
clean_sess = CleanSess,
client_id = ClientId,
retainer_pid = RetainerPid,
auth_state = AState,
register_state = {pending, Corr},
info = #info{prefetch = Prefetch,
peer_host = PeerHost,
peer_port = PeerPort,
host = Host,
port = Port,
protocol = {'MQTT', human_readable_mqtt_version(ProtoVersion)}}},
maybe_clean_sess(PState1);
%% e.g. this node was removed from the MQTT cluster members
{error, _} = Err ->
rabbit_log_connection:error("MQTT cannot accept a connection: "
"client ID tracker is unavailable: ~p", [Err]),
{?CONNACK_SERVER, PState0};
{timeout, _} ->
rabbit_log_connection:error("MQTT cannot accept a connection: "
"client ID registration timed out"),
{?CONNACK_SERVER, PState0}
end;
ConnAck -> {ConnAck, PState0}
end
end
end,
2016-04-22 02:02:28 +08:00
{ReturnCode, SessionPresent} = case Return of
{?CONNACK_ACCEPT, Bool} -> {?CONNACK_ACCEPT, Bool};
Other -> {Other, false}
end,
SendFun(#mqtt_frame{fixed = #mqtt_frame_fixed{type = ?CONNACK},
variable = #mqtt_frame_connack{
session_present = SessionPresent,
return_code = ReturnCode}},
PState2),
case ReturnCode of
?CONNACK_ACCEPT -> {ok, PState2};
?CONNACK_CREDENTIALS -> {error, unauthenticated, PState2};
?CONNACK_AUTH -> {error, unauthorized, PState2};
?CONNACK_SERVER -> {error, unavailable, PState2};
?CONNACK_INVALID_ID -> {error, invalid_client_id, PState2};
?CONNACK_PROTO_VER -> {error, unsupported_protocol_version, PState2}
Avoid crash when client disconnects before server handles MQTT CONNECT In case of a resource alarm, the server accepts incoming TCP connections, but does not read from the socket. When a client connects during a resource alarm, the MQTT CONNECT frame is therefore not processed. While the resource alarm is ongoing, the client might time out waiting on a CONNACK MQTT packet. When the resource alarm clears on the server, the MQTT CONNECT frame gets processed. Prior to this commit, this results in the following crash on the server: ``` ** Reason for termination == ** {{badmatch,{error,einval}}, [{rabbit_mqtt_processor,process_login,4, [{file,"rabbit_mqtt_processor.erl"},{line,585}]}, {rabbit_mqtt_processor,process_request,3, [{file,"rabbit_mqtt_processor.erl"},{line,143}]}, {rabbit_mqtt_processor,process_frame,2, [{file,"rabbit_mqtt_processor.erl"},{line,69}]}, {rabbit_mqtt_reader,process_received_bytes,2, [{file,"src/rabbit_mqtt_reader.erl"},{line,307}]}, ``` After this commit, the server just logs: ``` [error] <0.887.0> MQTT protocol error on connection 127.0.0.1:55725 -> 127.0.0.1:1883: peername_not_known ``` In case the client already disconnected, we want the server to bail out early, i.e. not authenticating and registering the client at all since that can be expensive when many clients connected while the resource alarm was ongoing. To detect whether the client disconnected, we rely on inet:peername/1 which will return an error when the peer is not connected anymore. Ideally we could use some better mechanism for detecting whether the client disconnected. The MQTT reader does receive a {tcp_closed, Socket} message once the socket becomes active. However, we don't really want to read frames ahead (i.e. ahead of the received CONNECT frame), one reason being that: "Clients are allowed to send further Control Packets immediately after sending a CONNECT Packet; Clients need not wait for a CONNACK Packet to arrive from the Server." Setting socket option `show_econnreset` does not help either because the client closes the connection normally. Co-authored-by: Péter Gömöri @gomoripeti
2022-08-25 23:54:30 +08:00
end.
process_request(?CONNECT, Frame, PState = #proc_state{socket = Socket}) ->
%% Check whether peer closed the connection.
%% For example, this can happen when connection was blocked because of resource
%% alarm and client therefore disconnected due to client side CONNACK timeout.
case rabbit_net:socket_ends(Socket, inbound) of
{error, Reason} ->
{error, {socket_ends, Reason}, PState};
Avoid crash when client disconnects before server handles MQTT CONNECT In case of a resource alarm, the server accepts incoming TCP connections, but does not read from the socket. When a client connects during a resource alarm, the MQTT CONNECT frame is therefore not processed. While the resource alarm is ongoing, the client might time out waiting on a CONNACK MQTT packet. When the resource alarm clears on the server, the MQTT CONNECT frame gets processed. Prior to this commit, this results in the following crash on the server: ``` ** Reason for termination == ** {{badmatch,{error,einval}}, [{rabbit_mqtt_processor,process_login,4, [{file,"rabbit_mqtt_processor.erl"},{line,585}]}, {rabbit_mqtt_processor,process_request,3, [{file,"rabbit_mqtt_processor.erl"},{line,143}]}, {rabbit_mqtt_processor,process_frame,2, [{file,"rabbit_mqtt_processor.erl"},{line,69}]}, {rabbit_mqtt_reader,process_received_bytes,2, [{file,"src/rabbit_mqtt_reader.erl"},{line,307}]}, ``` After this commit, the server just logs: ``` [error] <0.887.0> MQTT protocol error on connection 127.0.0.1:55725 -> 127.0.0.1:1883: peername_not_known ``` In case the client already disconnected, we want the server to bail out early, i.e. not authenticating and registering the client at all since that can be expensive when many clients connected while the resource alarm was ongoing. To detect whether the client disconnected, we rely on inet:peername/1 which will return an error when the peer is not connected anymore. Ideally we could use some better mechanism for detecting whether the client disconnected. The MQTT reader does receive a {tcp_closed, Socket} message once the socket becomes active. However, we don't really want to read frames ahead (i.e. ahead of the received CONNECT frame), one reason being that: "Clients are allowed to send further Control Packets immediately after sending a CONNECT Packet; Clients need not wait for a CONNACK Packet to arrive from the Server." Setting socket option `show_econnreset` does not help either because the client closes the connection normally. Co-authored-by: Péter Gömöri @gomoripeti
2022-08-25 23:54:30 +08:00
_ ->
process_connect(Frame, PState)
end;
2012-07-04 00:35:18 +08:00
2012-08-06 06:52:54 +08:00
process_request(?PUBACK,
#mqtt_frame{
2012-08-06 06:52:54 +08:00
variable = #mqtt_frame_publish{ message_id = MessageId }},
#proc_state{ channels = {Channel, _},
awaiting_ack = Awaiting } = PState) ->
%% tag can be missing because of bogus clients and QoS downgrades
case gb_trees:is_defined(MessageId, Awaiting) of
false ->
{ok, PState};
true ->
Tag = gb_trees:get(MessageId, Awaiting),
amqp_channel:cast(Channel, #'basic.ack'{ delivery_tag = Tag }),
{ok, PState#proc_state{ awaiting_ack = gb_trees:delete(MessageId, Awaiting) }}
end;
2012-08-06 06:52:54 +08:00
process_request(?PUBLISH,
2016-05-18 21:11:50 +08:00
Frame = #mqtt_frame{
fixed = Fixed = #mqtt_frame_fixed{ qos = ?QOS_2 }},
PState) ->
% Downgrade QOS_2 to QOS_1
2016-05-18 21:11:50 +08:00
process_request(?PUBLISH,
Frame#mqtt_frame{
fixed = Fixed#mqtt_frame_fixed{ qos = ?QOS_1 }},
PState);
process_request(?PUBLISH,
#mqtt_frame{
2012-08-06 06:52:54 +08:00
fixed = #mqtt_frame_fixed{ qos = Qos,
retain = Retain,
dup = Dup },
variable = #mqtt_frame_publish{ topic_name = Topic,
message_id = MessageId },
payload = Payload },
2019-09-04 23:07:33 +08:00
PState = #proc_state{retainer_pid = RPid,
amqp2mqtt_fun = Amqp2MqttFun}) ->
check_publish(Topic, fun() ->
Msg = #mqtt_msg{retain = Retain,
qos = Qos,
topic = Topic,
dup = Dup,
message_id = MessageId,
payload = Payload},
Result = amqp_pub(Msg, PState),
case Retain of
false -> ok;
2019-09-04 23:07:33 +08:00
true -> hand_off_to_retainer(RPid, Amqp2MqttFun, Topic, Msg)
end,
{ok, Result}
end, PState);
process_request(?SUBSCRIBE,
#mqtt_frame{
variable = #mqtt_frame_subscribe{
message_id = SubscribeMsgId,
topic_table = Topics},
payload = undefined},
#proc_state{channels = {Channel, _},
exchange = Exchange,
retainer_pid = RPid,
send_fun = SendFun,
2019-09-04 23:07:33 +08:00
message_id = StateMsgId,
mqtt2amqp_fun = Mqtt2AmqpFun} = PState0) ->
rabbit_log_connection:debug("Received a SUBSCRIBE for topic(s) ~tp", [Topics]),
{QosResponse, PState1} =
lists:foldl(fun (#mqtt_topic{name = TopicName,
qos = Qos}, {QosList, PState}) ->
SupportedQos = supported_subs_qos(Qos),
{Queue, #proc_state{subscriptions = Subs} = PState1} =
ensure_queue(SupportedQos, PState),
RoutingKey = Mqtt2AmqpFun(TopicName),
Binding = #'queue.bind'{
queue = Queue,
exchange = Exchange,
routing_key = RoutingKey},
#'queue.bind_ok'{} = amqp_channel:call(Channel, Binding),
SupportedQosList = case maps:find(TopicName, Subs) of
{ok, L} -> [SupportedQos|L];
error -> [SupportedQos]
end,
{[SupportedQos | QosList],
PState1 #proc_state{
subscriptions =
maps:put(TopicName, SupportedQosList, Subs)}}
end, {[], PState0}, Topics),
SendFun(#mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK},
variable = #mqtt_frame_suback{
message_id = SubscribeMsgId,
qos_table = QosResponse}}, PState1),
%% we may need to send up to length(Topics) messages.
%% if QoS is > 0 then we need to generate a message id,
%% and increment the counter.
StartMsgId = safe_max_id(SubscribeMsgId, StateMsgId),
N = lists:foldl(fun (Topic, Acc) ->
case maybe_send_retained_message(RPid, Topic, Acc, PState1) of
{true, X} -> Acc + X;
false -> Acc
end
end, StartMsgId, Topics),
{ok, PState1#proc_state{message_id = N}};
2012-07-05 00:47:07 +08:00
process_request(?UNSUBSCRIBE,
#mqtt_frame{
variable = #mqtt_frame_subscribe{ message_id = MessageId,
topic_table = Topics },
payload = undefined }, #proc_state{ channels = {Channel, _},
exchange = Exchange,
client_id = ClientId,
subscriptions = Subs0,
2019-09-04 23:07:33 +08:00
send_fun = SendFun,
mqtt2amqp_fun = Mqtt2AmqpFun } = PState) ->
rabbit_log_connection:debug("Received an UNSUBSCRIBE for topic(s) ~tp", [Topics]),
2012-08-06 06:52:54 +08:00
Queues = rabbit_mqtt_util:subcription_queue_name(ClientId),
Subs1 =
lists:foldl(
fun (#mqtt_topic{ name = TopicName }, Subs) ->
2017-04-24 20:45:37 +08:00
QosSubs = case maps:find(TopicName, Subs) of
2012-08-06 06:52:54 +08:00
{ok, Val} when is_list(Val) -> lists:usort(Val);
error -> []
end,
2019-09-04 23:07:33 +08:00
RoutingKey = Mqtt2AmqpFun(TopicName),
2012-08-06 06:52:54 +08:00
lists:foreach(
fun (QosSub) ->
Queue = element(QosSub + 1, Queues),
Binding = #'queue.unbind'{
queue = Queue,
exchange = Exchange,
2019-09-04 23:07:33 +08:00
routing_key = RoutingKey},
2012-08-06 06:52:54 +08:00
#'queue.unbind_ok'{} = amqp_channel:call(Channel, Binding)
end, QosSubs),
2017-04-24 20:45:37 +08:00
maps:remove(TopicName, Subs)
2012-08-06 06:52:54 +08:00
end, Subs0, Topics),
SendFun(#mqtt_frame{ fixed = #mqtt_frame_fixed { type = ?UNSUBACK },
variable = #mqtt_frame_suback{ message_id = MessageId }},
PState),
{ok, PState #proc_state{ subscriptions = Subs1 }};
process_request(?PINGREQ, #mqtt_frame{}, #proc_state{ send_fun = SendFun } = PState) ->
rabbit_log_connection:debug("Received a PINGREQ"),
SendFun(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PINGRESP }},
PState),
rabbit_log_connection:debug("Sent a PINGRESP"),
{ok, PState};
process_request(?DISCONNECT, #mqtt_frame{}, PState) ->
rabbit_log_connection:debug("Received a DISCONNECT"),
{stop, PState}.
2012-07-04 00:35:18 +08:00
2019-09-04 23:07:33 +08:00
hand_off_to_retainer(RetainerPid, Amqp2MqttFun, Topic0, #mqtt_msg{payload = <<"">>}) ->
Topic1 = Amqp2MqttFun(Topic0),
rabbit_mqtt_retainer:clear(RetainerPid, Topic1),
ok;
hand_off_to_retainer(RetainerPid, Amqp2MqttFun, Topic0, Msg) ->
Topic1 = Amqp2MqttFun(Topic0),
rabbit_mqtt_retainer:retain(RetainerPid, Topic1, Msg),
ok.
maybe_send_retained_message(RPid, #mqtt_topic{name = Topic0, qos = SubscribeQos}, MsgId,
#proc_state{ send_fun = SendFun,
amqp2mqtt_fun = Amqp2MqttFun } = PState) ->
Topic1 = Amqp2MqttFun(Topic0),
case rabbit_mqtt_retainer:fetch(RPid, Topic1) of
undefined -> false;
Msg ->
%% calculate effective QoS as the lower value of SUBSCRIBE frame QoS
%% and retained message QoS. The spec isn't super clear on this, we
%% do what Mosquitto does, per user feedback.
Qos = erlang:min(SubscribeQos, Msg#mqtt_msg.qos),
Id = case Qos of
?QOS_0 -> undefined;
?QOS_1 -> MsgId
end,
SendFun(#mqtt_frame{fixed = #mqtt_frame_fixed{
type = ?PUBLISH,
qos = Qos,
dup = false,
retain = Msg#mqtt_msg.retain
}, variable = #mqtt_frame_publish{
message_id = Id,
topic_name = Topic1
},
payload = Msg#mqtt_msg.payload}, PState),
case Qos of
?QOS_0 -> false;
?QOS_1 -> {true, 1}
end
end.
-spec amqp_callback(#'basic.ack'{} | {#'basic.deliver'{}, #amqp_msg{}, {pid(), pid(), pid()}}, #proc_state{}) -> {'ok', #proc_state{}} | {'error', term(), term()}.
2012-08-06 06:52:54 +08:00
amqp_callback({#'basic.deliver'{ consumer_tag = ConsumerTag,
delivery_tag = DeliveryTag,
routing_key = RoutingKey },
#amqp_msg{ props = #'P_basic'{ headers = Headers },
2014-08-11 18:26:27 +08:00
payload = Payload },
DeliveryCtx} = Delivery,
#proc_state{ channels = {Channel, _},
awaiting_ack = Awaiting,
message_id = MsgId,
2019-09-04 23:07:33 +08:00
send_fun = SendFun,
amqp2mqtt_fun = Amqp2MqttFun } = PState) ->
notify_received(DeliveryCtx),
case {delivery_dup(Delivery), delivery_qos(ConsumerTag, Headers, PState)} of
{true, {?QOS_0, ?QOS_1}} ->
2012-08-17 01:09:21 +08:00
amqp_channel:cast(
Channel, #'basic.ack'{ delivery_tag = DeliveryTag }),
{ok, PState};
{true, {?QOS_0, ?QOS_0}} ->
{ok, PState};
2012-08-06 06:52:54 +08:00
{Dup, {DeliveryQos, _SubQos} = Qos} ->
2019-09-04 23:07:33 +08:00
TopicName = Amqp2MqttFun(RoutingKey),
SendFun(
2012-08-06 06:52:54 +08:00
#mqtt_frame{ fixed = #mqtt_frame_fixed{
type = ?PUBLISH,
qos = DeliveryQos,
dup = Dup },
variable = #mqtt_frame_publish{
2012-08-06 06:52:54 +08:00
message_id =
case DeliveryQos of
?QOS_0 -> undefined;
?QOS_1 -> MsgId
end,
2019-09-04 23:07:33 +08:00
topic_name = TopicName },
payload = Payload}, PState),
2012-08-06 06:52:54 +08:00
case Qos of
{?QOS_0, ?QOS_0} ->
{ok, PState};
2012-08-06 06:52:54 +08:00
{?QOS_1, ?QOS_1} ->
Awaiting1 = gb_trees:insert(MsgId, DeliveryTag, Awaiting),
PState1 = PState#proc_state{ awaiting_ack = Awaiting1 },
PState2 = next_msg_id(PState1),
{ok, PState2};
2012-08-06 06:52:54 +08:00
{?QOS_0, ?QOS_1} ->
amqp_channel:cast(
2012-08-17 01:09:21 +08:00
Channel, #'basic.ack'{ delivery_tag = DeliveryTag }),
{ok, PState}
2012-08-06 06:52:54 +08:00
end
end;
amqp_callback(#'basic.ack'{ multiple = true, delivery_tag = Tag } = Ack,
PState = #proc_state{ unacked_pubs = UnackedPubs,
send_fun = SendFun }) ->
case gb_trees:size(UnackedPubs) > 0 andalso
gb_trees:take_smallest(UnackedPubs) of
2012-08-06 06:52:54 +08:00
{TagSmall, MsgId, UnackedPubs1} when TagSmall =< Tag ->
SendFun(
2012-08-06 06:52:54 +08:00
#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PUBACK },
variable = #mqtt_frame_publish{ message_id = MsgId }},
PState),
amqp_callback(Ack, PState #proc_state{ unacked_pubs = UnackedPubs1 });
2012-08-06 06:52:54 +08:00
_ ->
{ok, PState}
2012-08-06 06:52:54 +08:00
end;
amqp_callback(#'basic.ack'{ multiple = false, delivery_tag = Tag },
PState = #proc_state{ unacked_pubs = UnackedPubs,
send_fun = SendFun }) ->
SendFun(
2012-08-06 06:52:54 +08:00
#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PUBACK },
variable = #mqtt_frame_publish{
message_id = gb_trees:get(
Tag, UnackedPubs) }}, PState),
{ok, PState #proc_state{ unacked_pubs = gb_trees:delete(Tag, UnackedPubs) }}.
2012-08-06 06:52:54 +08:00
delivery_dup({#'basic.deliver'{ redelivered = Redelivered },
2014-08-11 18:26:27 +08:00
#amqp_msg{ props = #'P_basic'{ headers = Headers }},
_DeliveryCtx}) ->
2012-11-06 18:32:38 +08:00
case rabbit_mqtt_util:table_lookup(Headers, <<"x-mqtt-dup">>) of
2012-08-06 06:52:54 +08:00
undefined -> Redelivered;
{bool, Dup} -> Redelivered orelse Dup
end.
ensure_valid_mqtt_message_id(Id) when Id >= 16#ffff ->
1;
ensure_valid_mqtt_message_id(Id) ->
Id.
safe_max_id(Id0, Id1) ->
ensure_valid_mqtt_message_id(erlang:max(Id0, Id1)).
next_msg_id(PState = #proc_state{ message_id = MsgId0 }) ->
MsgId1 = ensure_valid_mqtt_message_id(MsgId0 + 1),
PState#proc_state{ message_id = MsgId1 }.
2012-08-06 06:52:54 +08:00
%% decide at which qos level to deliver based on subscription
2012-11-06 04:30:45 +08:00
%% and the message publish qos level. non-MQTT publishes are
%% assumed to be qos 1, regardless of delivery_mode.
delivery_qos(Tag, _Headers, #proc_state{ consumer_tags = {Tag, _} }) ->
2012-08-06 06:52:54 +08:00
{?QOS_0, ?QOS_0};
2012-11-06 04:30:45 +08:00
delivery_qos(Tag, Headers, #proc_state{ consumer_tags = {_, Tag} }) ->
2012-11-06 18:32:38 +08:00
case rabbit_mqtt_util:table_lookup(Headers, <<"x-mqtt-publish-qos">>) of
2012-11-06 04:30:45 +08:00
{byte, Qos} -> {lists:min([Qos, ?QOS_1]), ?QOS_1};
undefined -> {?QOS_1, ?QOS_1}
end.
2012-08-06 06:52:54 +08:00
2016-04-22 02:02:28 +08:00
maybe_clean_sess(PState = #proc_state { clean_sess = false,
connection = Conn,
auth_state = #auth_state{vhost = VHost},
2016-04-22 02:02:28 +08:00
client_id = ClientId }) ->
SessionPresent = session_present(VHost, ClientId),
case SessionPresent of
false ->
%% ensure_queue/2 not only ensures that queue is created, but also starts consuming from it.
%% Let's avoid creating that queue until explicitly asked by a client.
%% Then publish-only clients, that connect with clean_sess=true due to some misconfiguration,
%% will consume less resources.
{{?CONNACK_ACCEPT, SessionPresent}, PState};
true ->
try ensure_queue(?QOS_1, PState) of
{_Queue, PState1} -> {{?CONNACK_ACCEPT, SessionPresent}, PState1}
catch
exit:({{shutdown, {server_initiated_close, 403, _}}, _}) ->
%% Connection is not yet propagated to #proc_state{}, let's close it here
catch amqp_connection:close(Conn),
rabbit_log_connection:error("MQTT cannot recover a session, user is missing permissions"),
{?CONNACK_SERVER, PState};
C:E:S ->
%% Connection is not yet propagated to
%% #proc_state{}, let's close it here.
%% This is an exceptional situation anyway, but
%% doing this will prevent second crash from
%% amqp client being logged.
catch amqp_connection:close(Conn),
erlang:raise(C, E, S)
end
end;
maybe_clean_sess(PState = #proc_state {clean_sess = true,
client_id = ClientId,
auth_state = #auth_state{user = User,
username = Username,
vhost = VHost,
authz_ctx = AuthzCtx}}) ->
{_, QueueName} = rabbit_mqtt_util:subcription_queue_name(ClientId),
Queue = rabbit_misc:r(VHost, queue, QueueName),
case rabbit_amqqueue:exists(Queue) of
false ->
{{?CONNACK_ACCEPT, false}, PState};
true ->
ok = rabbit_access_control:check_resource_access(User, Queue, configure, AuthzCtx),
rabbit_amqqueue:with(
Queue,
fun (Q) ->
rabbit_queue_type:delete(Q, false, false, Username)
end,
fun (not_found) ->
ok;
({absent, Q, crashed}) ->
rabbit_classic_queue:delete_crashed(Q, Username);
({absent, Q, stopped}) ->
rabbit_classic_queue:delete_crashed(Q, Username);
({absent, _Q, _Reason}) ->
ok
end),
{{?CONNACK_ACCEPT, false}, PState}
end.
session_present(VHost, ClientId) ->
2016-04-22 02:02:28 +08:00
{_, QueueQ1} = rabbit_mqtt_util:subcription_queue_name(ClientId),
QueueName = rabbit_misc:r(VHost, queue, QueueQ1),
rabbit_amqqueue:exists(QueueName).
make_will_msg(#mqtt_frame_connect{ will_flag = false }) ->
2012-08-06 06:52:54 +08:00
undefined;
make_will_msg(#mqtt_frame_connect{ will_retain = Retain,
will_qos = Qos,
will_topic = Topic,
will_msg = Msg }) ->
#mqtt_msg{ retain = Retain,
qos = Qos,
topic = Topic,
dup = false,
payload = Msg }.
2012-07-04 00:35:18 +08:00
process_login(_UserBin, _PassBin, _ClientId, _ProtoVersion,
#proc_state{peer_addr = Addr,
auth_state = #auth_state{username = Username,
user = User,
vhost = VHost
}})
when Username =/= undefined, User =/= undefined, VHost =/= underfined ->
UsernameStr = rabbit_data_coercion:to_list(Username),
VHostStr = rabbit_data_coercion:to_list(VHost),
2020-09-22 23:57:47 +08:00
rabbit_core_metrics:auth_attempt_failed(list_to_binary(inet:ntoa(Addr)), Username, mqtt),
rabbit_log_connection:warning("MQTT detected duplicate connect/login attempt for user ~tp, vhost ~tp",
[UsernameStr, VHostStr]),
connack_dup_auth;
process_login(UserBin, PassBin, ClientId0, ProtoVersion,
#proc_state{socket = Sock,
ssl_login_name = SslLoginName,
peer_addr = Addr,
auth_state = undefined}) ->
{ok, {PeerHost, PeerPort, Host, Port}} = rabbit_net:socket_ends(Sock, inbound),
{VHostPickedUsing, {VHost, UsernameBin}} = get_vhost(UserBin, SslLoginName, Port),
rabbit_log_connection:debug(
"MQTT vhost picked using ~s",
[human_readable_vhost_lookup_strategy(VHostPickedUsing)]),
2020-09-22 23:57:47 +08:00
RemoteAddress = list_to_binary(inet:ntoa(Addr)),
case rabbit_vhost:exists(VHost) of
true ->
case rabbit_vhost_limit:is_over_connection_limit(VHost) of
false ->
case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of
true ->
ClientId = rabbit_data_coercion:to_binary(ClientId0),
case rabbit_access_control:check_user_login(
UsernameBin,
[{password, PassBin}, {vhost, VHost}, {client_id, ClientId}]) of
{ok, User = #user{username = Username}} ->
notify_auth_result(Username,
user_authentication_success,
[]),
case rabbit_auth_backend_internal:is_over_connection_limit(Username) of
false ->
AuthzCtx = #{<<"client_id">> => ClientId},
try rabbit_access_control:check_vhost_access(User,
VHost,
{ip, Addr},
AuthzCtx) of
ok ->
case rabbit_access_control:check_user_loopback(UsernameBin, Addr) of
ok ->
rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, UsernameBin,
mqtt),
Infos = [{node, node()},
{host, Host},
{port, Port},
{peer_host, PeerHost},
{peer_port, PeerPort},
{user, UsernameBin},
{vhost, VHost}],
rabbit_core_metrics:connection_created(self(), Infos),
rabbit_event:notify(connection_created, Infos),
{?CONNACK_ACCEPT, VHost, ProtoVersion,
#auth_state{user = User,
username = UsernameBin,
vhost = VHost,
authz_ctx = AuthzCtx}};
not_allowed ->
rabbit_core_metrics:auth_attempt_failed(RemoteAddress, UsernameBin,
mqtt),
rabbit_log_connection:warning(
"MQTT login failed for user ~s: "
"this user's access is restricted to localhost",
[binary_to_list(UsernameBin)]),
?CONNACK_AUTH
end
catch exit:#amqp_error{name = not_allowed} ->
rabbit_log_connection:error(
"Error on MQTT connection ~p~n"
"access refused for user '~s'",
[self(), Username]),
?CONNACK_AUTH
end;
{true, Limit} ->
rabbit_log_connection:error(
"Error on MQTT connection ~p~n"
"access refused for user '~s': "
"user connection limit (~p) is reached",
[self(), Username, Limit]),
?CONNACK_AUTH
end;
{refused, Username, Msg, Args} ->
rabbit_log_connection:error(
"Error on MQTT connection ~p~n"
"access refused for user '~s' in vhost '~s' "
++ Msg,
[self(), Username, VHost] ++ Args),
notify_auth_result(Username,
user_authentication_failure,
[{error, rabbit_misc:format(Msg, Args)}]),
?CONNACK_CREDENTIALS
end;
false ->
rabbit_log_connection:error(
"Error on MQTT connection ~p~n"
"access refused for user '~s': "
"vhost is down",
[self(), UsernameBin, VHost]),
?CONNACK_AUTH
end;
{true, Limit} ->
rabbit_log_connection:error(
"Error on MQTT connection ~p~n"
"access to vhost '~s' refused for user '~s': "
"vhost connection limit (~p) is reached",
[self(), VHost, UsernameBin, Limit]),
2016-05-20 21:04:10 +08:00
?CONNACK_AUTH
2014-02-19 01:33:33 +08:00
end;
false ->
2020-09-22 23:57:47 +08:00
rabbit_core_metrics:auth_attempt_failed(RemoteAddress, UsernameBin, mqtt),
rabbit_log_connection:error("MQTT login failed for user '~s': virtual host '~s' does not exist",
[UserBin, VHost]),
?CONNACK_CREDENTIALS
2014-02-19 01:33:00 +08:00
end.
notify_auth_result(Username, AuthResult, ExtraProps) ->
EventProps = [{connection_type, mqtt},
{name, case Username of none -> ''; _ -> Username end}] ++
ExtraProps,
rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']).
get_vhost(UserBin, none, Port) ->
get_vhost_no_ssl(UserBin, Port);
get_vhost(UserBin, undefined, Port) ->
get_vhost_no_ssl(UserBin, Port);
get_vhost(UserBin, SslLogin, Port) ->
get_vhost_ssl(UserBin, SslLogin, Port).
get_vhost_no_ssl(UserBin, Port) ->
case vhost_in_username(UserBin) of
true ->
{vhost_in_username_or_default, get_vhost_username(UserBin)};
false ->
PortVirtualHostMapping = rabbit_runtime_parameters:value_global(
mqtt_port_to_vhost_mapping
),
case get_vhost_from_port_mapping(Port, PortVirtualHostMapping) of
undefined ->
{default_vhost, {rabbit_mqtt_util:env(vhost), UserBin}};
VHost ->
{port_to_vhost_mapping, {VHost, UserBin}}
end
end.
get_vhost_ssl(UserBin, SslLoginName, Port) ->
UserVirtualHostMapping = rabbit_runtime_parameters:value_global(
mqtt_default_vhosts
),
case get_vhost_from_user_mapping(SslLoginName, UserVirtualHostMapping) of
undefined ->
PortVirtualHostMapping = rabbit_runtime_parameters:value_global(
mqtt_port_to_vhost_mapping
),
case get_vhost_from_port_mapping(Port, PortVirtualHostMapping) of
undefined ->
{vhost_in_username_or_default, get_vhost_username(UserBin)};
VHostFromPortMapping ->
{port_to_vhost_mapping, {VHostFromPortMapping, UserBin}}
end;
VHostFromCertMapping ->
{cert_to_vhost_mapping, {VHostFromCertMapping, UserBin}}
end.
vhost_in_username(UserBin) ->
case application:get_env(?APP, ignore_colons_in_username) of
{ok, true} -> false;
_ ->
%% split at the last colon, disallowing colons in username
case re:split(UserBin, ":(?!.*?:)") of
[_, _] -> true;
[UserBin] -> false
end
end.
2013-11-18 20:00:47 +08:00
get_vhost_username(UserBin) ->
Default = {rabbit_mqtt_util:env(vhost), UserBin},
case application:get_env(?APP, ignore_colons_in_username) of
{ok, true} -> Default;
_ ->
%% split at the last colon, disallowing colons in username
case re:split(UserBin, ":(?!.*?:)") of
[Vhost, UserName] -> {Vhost, UserName};
[UserBin] -> Default
end
2013-11-18 20:00:47 +08:00
end.
get_vhost_from_user_mapping(_User, not_found) ->
undefined;
get_vhost_from_user_mapping(User, Mapping) ->
M = rabbit_data_coercion:to_proplist(Mapping),
case rabbit_misc:pget(User, M) of
undefined ->
undefined;
VHost ->
VHost
end.
get_vhost_from_port_mapping(_Port, not_found) ->
undefined;
get_vhost_from_port_mapping(Port, Mapping) ->
M = rabbit_data_coercion:to_proplist(Mapping),
Res = case rabbit_misc:pget(rabbit_data_coercion:to_binary(Port), M) of
undefined ->
undefined;
VHost ->
VHost
end,
Res.
2016-12-20 05:42:06 +08:00
human_readable_vhost_lookup_strategy(vhost_in_username_or_default) ->
"vhost in username or default";
human_readable_vhost_lookup_strategy(port_to_vhost_mapping) ->
"MQTT port to vhost mapping";
human_readable_vhost_lookup_strategy(cert_to_vhost_mapping) ->
"client certificate to vhost mapping";
human_readable_vhost_lookup_strategy(default_vhost) ->
"plugin configuration or default";
human_readable_vhost_lookup_strategy(Val) ->
atom_to_list(Val).
creds(User, Pass, SSLLoginName) ->
DefaultUser = rabbit_mqtt_util:env(default_user),
DefaultPass = rabbit_mqtt_util:env(default_pass),
{ok, Anon} = application:get_env(?APP, allow_anonymous),
{ok, TLSAuth} = application:get_env(?APP, ssl_cert_login),
HaveDefaultCreds = Anon =:= true andalso
is_binary(DefaultUser) andalso
is_binary(DefaultPass),
CredentialsProvided = User =/= undefined orelse
Pass =/= undefined,
CorrectCredentials = is_list(User) andalso
is_list(Pass),
SSLLoginProvided = TLSAuth =:= true andalso
SSLLoginName =/= none,
case {CredentialsProvided, CorrectCredentials, SSLLoginProvided, HaveDefaultCreds} of
2016-09-02 06:33:34 +08:00
%% Username and password take priority
{true, true, _, _} -> {list_to_binary(User),
list_to_binary(Pass)};
%% Either username or password is provided
2016-09-02 06:33:34 +08:00
{true, false, _, _} -> {invalid_creds, {User, Pass}};
%% rabbitmq_mqtt.ssl_cert_login is true. SSL user name provided.
2016-09-02 06:33:34 +08:00
%% Authenticating using username only.
{false, false, true, _} -> {SSLLoginName, none};
2016-09-02 06:33:34 +08:00
%% Anonymous connection uses default credentials
{false, false, false, true} -> {DefaultUser, DefaultPass};
_ -> nocreds
end.
2012-07-04 00:35:18 +08:00
2012-08-06 06:52:54 +08:00
supported_subs_qos(?QOS_0) -> ?QOS_0;
supported_subs_qos(?QOS_1) -> ?QOS_1;
supported_subs_qos(?QOS_2) -> ?QOS_1.
2014-01-30 23:33:59 +08:00
delivery_mode(?QOS_0) -> 1;
delivery_mode(?QOS_1) -> 2;
delivery_mode(?QOS_2) -> 2.
2014-01-30 23:33:59 +08:00
maybe_quorum(Qos1Args, CleanSession, Queue) ->
case {rabbit_mqtt_util:env(durable_queue_type), CleanSession} of
%% it is possible to Quorum queues only if Clean Session == False
%% else always use Classic queues
%% Clean Session == True sets auto-delete to True and quorum queues
%% does not support auto-delete flag
{quorum, false} -> lists:append(Qos1Args,
[{<<"x-queue-type">>, longstr, <<"quorum">>}]);
{quorum, true} ->
rabbit_log:debug("Can't use quorum queue for ~ts. " ++
"The clean session is true. Classic queue will be used", [Queue]),
Qos1Args;
_ -> Qos1Args
end.
2012-08-06 06:52:54 +08:00
%% different qos subscriptions are received in different queues
%% with appropriate durability and timeout arguments
%% this will lead to duplicate messages for overlapping subscriptions
%% with different qos values - todo: prevent duplicates
ensure_queue(Qos, #proc_state{ channels = {Channel, _},
client_id = ClientId,
clean_sess = CleanSess,
consumer_tags = {TagQ0, TagQ1} = Tags} = PState) ->
2012-08-06 06:52:54 +08:00
{QueueQ0, QueueQ1} = rabbit_mqtt_util:subcription_queue_name(ClientId),
Qos1Args = case {rabbit_mqtt_util:env(subscription_ttl), CleanSess} of
2012-11-06 18:32:38 +08:00
{undefined, _} ->
[];
{Ms, false} when is_integer(Ms) ->
[{<<"x-expires">>, long, Ms}];
_ ->
[]
2012-08-06 06:52:54 +08:00
end,
QueueSetup =
case {TagQ0, TagQ1, Qos} of
{undefined, _, ?QOS_0} ->
{QueueQ0,
#'queue.declare'{ queue = QueueQ0,
durable = false,
auto_delete = true },
#'basic.consume'{ queue = QueueQ0,
no_ack = true }};
{_, undefined, ?QOS_1} ->
{QueueQ1,
#'queue.declare'{ queue = QueueQ1,
durable = true,
2015-10-22 01:18:33 +08:00
%% Clean session means a transient connection,
%% translating into auto-delete.
%%
%% see rabbitmq/rabbitmq-mqtt#37
auto_delete = CleanSess,
arguments = maybe_quorum(Qos1Args, CleanSess, QueueQ1)},
2012-08-06 06:52:54 +08:00
#'basic.consume'{ queue = QueueQ1,
no_ack = false }};
{_, _, ?QOS_0} ->
{exists, QueueQ0};
{_, _, ?QOS_1} ->
{exists, QueueQ1}
end,
case QueueSetup of
{Queue, Declare, Consume} ->
#'queue.declare_ok'{} = amqp_channel:call(Channel, Declare),
#'basic.consume_ok'{ consumer_tag = Tag } =
amqp_channel:call(Channel, Consume),
{Queue, PState #proc_state{ consumer_tags = setelement(Qos+1, Tags, Tag) }};
2012-08-06 06:52:54 +08:00
{exists, Q} ->
{Q, PState}
2012-08-06 06:52:54 +08:00
end.
send_will(PState = #proc_state{will_msg = undefined}) ->
PState;
send_will(PState = #proc_state{will_msg = WillMsg = #mqtt_msg{retain = Retain,
topic = Topic},
retainer_pid = RPid,
2019-09-04 23:07:33 +08:00
channels = {ChQos0, ChQos1},
amqp2mqtt_fun = Amqp2MqttFun}) ->
case check_topic_access(Topic, write, PState) of
ok ->
amqp_pub(WillMsg, PState),
case Retain of
false -> ok;
2019-09-04 23:07:33 +08:00
true ->
hand_off_to_retainer(RPid, Amqp2MqttFun, Topic, WillMsg)
end;
Error ->
rabbit_log:warning(
"Could not send last will: ~tp",
[Error])
end,
case ChQos1 of
undefined -> ok;
_ -> amqp_channel:close(ChQos1)
end,
case ChQos0 of
undefined -> ok;
_ -> amqp_channel:close(ChQos0)
end,
PState #proc_state{ channels = {undefined, undefined} }.
amqp_pub(undefined, PState) ->
PState;
amqp_pub(#mqtt_msg{qos = Qos,
topic = Topic,
dup = Dup,
message_id = _MessageId, %%TODO track in unacked_pubs for QoS > 0
payload = Payload},
PState = #proc_state{exchange = ExchangeName,
% unacked_pubs = UnackedPubs,
% awaiting_seqno = SeqNo,
mqtt2amqp_fun = Mqtt2AmqpFun}) ->
%%TODO: Use message containers
2019-09-04 23:07:33 +08:00
RoutingKey = Mqtt2AmqpFun(Topic),
Confirm = Qos > ?QOS_0,
2012-11-06 18:32:38 +08:00
Headers = [{<<"x-mqtt-publish-qos">>, byte, Qos},
{<<"x-mqtt-dup">>, bool, Dup}],
Props = #'P_basic'{
headers = Headers,
delivery_mode = delivery_mode(Qos)},
{ClassId, _MethodId} = rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
Content = #content{
class_id = ClassId,
properties = Props,
properties_bin = none,
protocol = none,
payload_fragments_rev = [Payload]
},
BasicMessage = #basic_message{
exchange_name = ExchangeName,
routing_keys = [RoutingKey],
content = Content,
id = <<>>,
is_persistent = Confirm
},
Delivery = #delivery{
mandatory = false,
confirm = Confirm,
sender = self(),
message = BasicMessage,
msg_seq_no = undefined, %%TODO assumes QoS 0
flow = noflow %%TODO enable flow control
},
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
QNames = rabbit_exchange:route(Exchange, Delivery),
deliver_to_queues(Delivery, QNames, PState).
deliver_to_queues(#delivery{confirm = false},
_RoutedToQueueNames = [],
PState) ->
% rabbit_global_counters:messages_unroutable_dropped(mqtt, 1),
PState;
deliver_to_queues(Delivery = #delivery{message = _Message = #basic_message{exchange_name = _XName},
confirm = _Confirm,
msg_seq_no = _MsgSeqNo},
RoutedToQueueNames,
PState = #proc_state{queue_states = QueueStates0}) ->
Qs0 = rabbit_amqqueue:lookup(RoutedToQueueNames),
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
% QueueNames = lists:map(fun amqqueue:get_name/1, Qs),
{ok, QueueStates, _Actions} = rabbit_queue_type:deliver(Qs, Delivery, QueueStates0),
% rabbit_global_counters:messages_routed(mqtt, length(Qs)),
%% NB: the order here is important since basic.returns must be
%% sent before confirms.
%% TODO: AMQP 0.9.1 mandatory flag corresponds to MQTT 5 PUBACK reason code "No matching subscribers"
% ok = process_routing_mandatory(Mandatory, Qs, Message, State0),
%% TODO allows QoS > 0
% State1 = process_routing_confirm(Confirm, QueueNames,
% MsgSeqNo, XName, State0),
%% Actions must be processed after registering confirms as actions may
%% contain rejections of publishes
%% TODO handle Actions: For example if the messages is rejected, MQTT 5 allows to send a NACK
%% back to the client (via PUBACK Reason Code).
% State = handle_queue_actions(Actions, State1#ch{queue_states = QueueStates}),
PState#proc_state{queue_states = QueueStates}.
human_readable_mqtt_version(3) ->
"3.1.0";
human_readable_mqtt_version(4) ->
"3.1.1";
human_readable_mqtt_version(_) ->
"N/A".
2012-08-17 00:56:50 +08:00
serialise_and_send_to_client(Frame, #proc_state{ socket = Sock }) ->
try rabbit_net:port_command(Sock, rabbit_mqtt_frame:serialise(Frame)) of
Res ->
Res
catch _:Error ->
rabbit_log_connection:error("MQTT: a socket write failed, the socket might already be closed"),
rabbit_log_connection:debug("Failed to write to socket ~tp, error: ~tp, frame: ~tp",
[Sock, Error, Frame])
end.
close_connection(PState = #proc_state{ connection = undefined }) ->
PState;
close_connection(PState = #proc_state{ connection = Connection,
client_id = ClientId }) ->
% todo: maybe clean session
case ClientId of
undefined -> ok;
_ ->
case rabbit_mqtt_collector:unregister(ClientId, self()) of
ok -> ok;
%% ignore as we are shutting down
{timeout, _} -> ok
end
end,
%% ignore noproc or other exceptions, we are shutting down
catch amqp_connection:close(Connection),
PState #proc_state{ channels = {undefined, undefined},
connection = undefined }.
handle_pre_hibernate() ->
erase(topic_permission_cache),
ok.
handle_ra_event({applied, [{Corr, ok}]},
PState = #proc_state{register_state = {pending, Corr}}) ->
%% success case - command was applied transition into registered state
PState#proc_state{register_state = registered};
handle_ra_event({not_leader, Leader, Corr},
PState = #proc_state{register_state = {pending, Corr},
client_id = ClientId}) ->
%% retry command against actual leader
{ok, NewCorr} = rabbit_mqtt_collector:register(Leader, ClientId, self()),
PState#proc_state{register_state = {pending, NewCorr}};
handle_ra_event(register_timeout,
PState = #proc_state{register_state = {pending, _Corr},
client_id = ClientId}) ->
{ok, NewCorr} = rabbit_mqtt_collector:register(ClientId, self()),
PState#proc_state{register_state = {pending, NewCorr}};
handle_ra_event(register_timeout, PState) ->
PState;
handle_ra_event(Evt, PState) ->
%% log these?
rabbit_log:debug("unhandled ra_event: ~w ", [Evt]),
PState.
check_publish(TopicName, Fn, PState) ->
%%TODO check additionally write access to exchange as done in channel?
case check_topic_access(TopicName, write, PState) of
2015-07-14 17:47:39 +08:00
ok -> Fn();
_ -> {error, unauthorized, PState}
end.
check_topic_access(TopicName, Access,
#proc_state{
auth_state = #auth_state{user = User = #user{username = Username},
vhost = VHost},
exchange = #resource{name = ExchangeBin},
2019-09-04 23:07:33 +08:00
client_id = ClientId,
mqtt2amqp_fun = Mqtt2AmqpFun }) ->
Cache =
case get(topic_permission_cache) of
undefined -> [];
Other -> Other
end,
Key = {TopicName, Username, ClientId, VHost, ExchangeBin, Access},
case lists:member(Key, Cache) of
true ->
ok;
false ->
Resource = #resource{virtual_host = VHost,
kind = topic,
name = ExchangeBin},
RoutingKey = Mqtt2AmqpFun(TopicName),
Context = #{routing_key => RoutingKey,
variable_map => #{
<<"username">> => Username,
<<"vhost">> => VHost,
<<"client_id">> => rabbit_data_coercion:to_binary(ClientId)
}
},
try rabbit_access_control:check_topic_access(User, Resource, Access, Context) of
ok ->
2019-10-30 22:26:57 +08:00
CacheTail = lists:sublist(Cache, ?MAX_TOPIC_PERMISSION_CACHE_SIZE - 1),
put(topic_permission_cache, [Key | CacheTail]),
ok;
R ->
R
catch
_:{amqp_error, access_refused, Msg, _} ->
rabbit_log:error("operation resulted in an error (access_refused): ~tp", [Msg]),
{error, access_refused};
_:Error ->
rabbit_log:error("~tp", [Error]),
{error, access_refused}
end
end.
2016-12-05 22:58:19 +08:00
info(consumer_tags, #proc_state{consumer_tags = Val}) -> Val;
info(unacked_pubs, #proc_state{unacked_pubs = Val}) -> Val;
info(awaiting_ack, #proc_state{awaiting_ack = Val}) -> Val;
info(awaiting_seqno, #proc_state{awaiting_seqno = Val}) -> Val;
info(message_id, #proc_state{message_id = Val}) -> Val;
2016-12-06 22:24:33 +08:00
info(client_id, #proc_state{client_id = Val}) ->
rabbit_data_coercion:to_binary(Val);
2016-12-05 22:58:19 +08:00
info(clean_sess, #proc_state{clean_sess = Val}) -> Val;
info(will_msg, #proc_state{will_msg = Val}) -> Val;
info(channels, #proc_state{channels = Val}) -> Val;
info(exchange, #proc_state{exchange = #resource{name = Val}}) -> Val;
2016-12-05 22:58:19 +08:00
info(ssl_login_name, #proc_state{ssl_login_name = Val}) -> Val;
info(retainer_pid, #proc_state{retainer_pid = Val}) -> Val;
info(user, #proc_state{auth_state = #auth_state{username = Val}}) -> Val;
info(vhost, #proc_state{auth_state = #auth_state{vhost = Val}}) -> Val;
info(host, #proc_state{info = #info{host = Val}}) -> Val;
info(port, #proc_state{info = #info{port = Val}}) -> Val;
info(peer_host, #proc_state{info = #info{peer_host = Val}}) -> Val;
info(peer_port, #proc_state{info = #info{peer_port = Val}}) -> Val;
info(protocol, #proc_state{info = #info{protocol = Val}}) ->
2016-12-06 22:24:33 +08:00
case Val of
{Proto, Version} -> {Proto, rabbit_data_coercion:to_binary(Version)};
Other -> Other
end;
% info(channels, PState) -> additional_info(channels, PState);
% info(channel_max, PState) -> additional_info(channel_max, PState);
% info(frame_max, PState) -> additional_info(frame_max, PState);
% info(client_properties, PState) -> additional_info(client_properties, PState);
% info(ssl, PState) -> additional_info(ssl, PState);
% info(ssl_protocol, PState) -> additional_info(ssl_protocol, PState);
% info(ssl_key_exchange, PState) -> additional_info(ssl_key_exchange, PState);
% info(ssl_cipher, PState) -> additional_info(ssl_cipher, PState);
% info(ssl_hash, PState) -> additional_info(ssl_hash, PState);
2016-12-05 22:58:19 +08:00
info(Other, _) -> throw({bad_argument, Other}).
% additional_info(Key,
% #proc_state{adapter_info =
% #amqp_adapter_info{additional_info = AddInfo}}) ->
% proplists:get_value(Key, AddInfo).
notify_received(undefined) ->
%% no notification for quorum queues and streams
ok;
notify_received(DeliveryCtx) ->
%% notification for flow control
amqp_channel:notify_received(DeliveryCtx).