%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(rabbit_mqtt_processor).
-export([info/2, initial_state/2, initial_state/4,
process_frame/2, serialise/2, send_will/1,
terminate/2, handle_pre_hibernate/0,
handle_ra_event/2, handle_down/2, handle_queue_event/2,
soft_limit_exceeded/1, format_status/1]).
%%TODO Use single queue per MQTT subscriber connection?
%% * when publishing we store in x-mqtt-publish-qos header the publishing QoS
%% * routing key is present in the delivered message
%% * therefore, we can map routing key -> topic -> subscription -> subscription max QoS
%% Advantages:
%% * better scaling when single client creates subscriptions with different QoS levels
%% * no duplicates when single client creates overlapping subscriptions with different QoS levels

%% for testing purposes
-export([get_vhost_username/1, get_vhost/3, get_vhost_from_user_mapping/2]).
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit_common/include/rabbit_framing.hrl").
-include_lib("rabbit/include/amqqueue.hrl").
-include("rabbit_mqtt.hrl").
-include("rabbit_mqtt_frame.hrl").
-define(APP, rabbitmq_mqtt).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
-define(CONSUMER_TAG, mqtt).
initial_state(Socket, ConnectionName) ->
{ok, {PeerAddr, _PeerPort}} = rabbit_net:peername(Socket),
initial_state(Socket,
ConnectionName,
fun serialise_and_send_to_client/2,
PeerAddr).
initial_state(Socket, ConnectionName, SendFun, PeerAddr) ->
{ok, {mqtt2amqp_fun, M2A}, {amqp2mqtt_fun, A2M}} = rabbit_mqtt_util:get_topic_translation_funs(),
Flow = case rabbit_misc:get_env(rabbit, mirroring_flow_control, true) of
true -> flow;
false -> noflow
end,
#proc_state{socket = Socket,
conn_name = ConnectionName,
ssl_login_name = ssl_login_name(Socket),
peer_addr = PeerAddr,
send_fun = SendFun,
mqtt2amqp_fun = M2A,
amqp2mqtt_fun = A2M,
delivery_flow = Flow}.
process_frame(#mqtt_frame{fixed = #mqtt_frame_fixed{type = Type}},
PState = #proc_state{auth_state = undefined})
when Type =/= ?CONNECT ->
{error, connect_expected, PState};
process_frame(Frame = #mqtt_frame{fixed = #mqtt_frame_fixed{type = Type}}, PState) ->
process_request(Type, Frame, PState).
process_request(?CONNECT, Frame, PState = #proc_state{socket = Socket}) ->
%% Check whether peer closed the connection.
%% For example, this can happen when connection was blocked because of resource
%% alarm and client therefore disconnected due to client side CONNACK timeout.
case rabbit_net:socket_ends(Socket, inbound) of
{error, Reason} ->
{error, {socket_ends, Reason}, PState};
_ ->
process_connect(Frame, PState)
end;
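%% Note (an assumption, based on how rabbit_net:socket_ends/2 is used elsewhere
%% in this module): for a connected socket it returns
%% {ok, {PeerAddr, PeerPort, SockAddr, SockPort}}, whereas a peer that has
%% already gone away yields {error, Reason} (e.g. einval), which is what the
%% early bail-out above relies on to skip authentication and registration.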
process_request(?PUBACK,
#mqtt_frame{
variable = #mqtt_frame_publish{message_id = PacketId}},
#proc_state{unacked_server_pubs = U0,
queue_states = QStates0} = PState) ->
case maps:take(PacketId, U0) of
{QMsgId, U} ->
%% TODO creating binary is expensive
QName = queue_name(?QOS_1, PState),
case rabbit_queue_type:settle(QName, complete, ?CONSUMER_TAG, [QMsgId], QStates0) of
{ok, QStates, [] = _Actions} ->
% incr_queue_stats(QRef, MsgIds, State),
%%TODO handle actions
{ok, PState#proc_state{unacked_server_pubs = U,
queue_states = QStates }};
{protocol_error, _ErrorType, _Reason, _ReasonArgs} = Err ->
{error, Err, PState}
end;
error ->
{ok, PState}
end;
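%% In the ?PUBACK clause above, unacked_server_pubs maps the packet id of an
%% outgoing QoS 1 PUBLISH to the queue-type message id, so the client's PUBACK
%% is translated into rabbit_queue_type:settle/5 on the QoS 1 queue. An unknown
%% packet id (for example a duplicate PUBACK) is silently ignored.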
process_request(?PUBLISH,
Frame = #mqtt_frame{
fixed = Fixed = #mqtt_frame_fixed{qos = ?QOS_2}},
PState) ->
% Downgrade QOS_2 to QOS_1
process_request(?PUBLISH,
Frame#mqtt_frame{
fixed = Fixed#mqtt_frame_fixed{qos = ?QOS_1}},
PState);
process_request(?PUBLISH,
#mqtt_frame{
fixed = #mqtt_frame_fixed{qos = Qos,
retain = Retain,
dup = Dup },
variable = #mqtt_frame_publish{topic_name = Topic,
message_id = MessageId },
payload = Payload},
PState0 = #proc_state{retainer_pid = RPid,
amqp2mqtt_fun = Amqp2MqttFun,
unacked_client_pubs = U,
proto_ver = ProtoVer}) ->
rabbit_global_counters:messages_received(ProtoVer, 1),
PState = maybe_increment_publisher(PState0),
Publish = fun() ->
Msg = #mqtt_msg{retain = Retain,
qos = Qos,
topic = Topic,
dup = Dup,
message_id = MessageId,
payload = Payload},
case publish_to_queues(Msg, PState) of
{ok, _} = Ok ->
case Retain of
false ->
ok;
true ->
hand_off_to_retainer(RPid, Amqp2MqttFun, Topic, Msg)
end,
Ok;
Error ->
Error
end
end,
case Qos of
N when N > ?QOS_0 ->
rabbit_global_counters:messages_received_confirm(ProtoVer, 1),
case rabbit_mqtt_confirms:contains(MessageId, U) of
false ->
publish_to_queues_with_checks(Topic, Publish, PState);
true ->
%% Client re-sent this PUBLISH packet.
%% We already sent this message to target queues awaiting confirmations.
%% Hence, we ignore this re-send.
{ok, PState}
end;
_ ->
publish_to_queues_with_checks(Topic, Publish, PState)
end;
process_request(?SUBSCRIBE,
#mqtt_frame{
variable = #mqtt_frame_subscribe{
message_id = SubscribeMsgId,
topic_table = Topics},
payload = undefined},
#proc_state{send_fun = SendFun,
retainer_pid = RPid} = PState0) ->
rabbit_log_connection:debug("Received a SUBSCRIBE for topic(s) ~p", [Topics]),
{QosResponse, PState1} =
lists:foldl(fun(_Topic, {[?SUBACK_FAILURE | _] = L, S}) ->
%% Once a subscription failed, mark all following subscriptions
%% as failed instead of creating bindings because we are going
%% to close the client connection anyway.
{[?SUBACK_FAILURE | L], S};
(#mqtt_topic{name = TopicName,
qos = Qos0},
{L, S0}) ->
QoS = supported_sub_qos(Qos0),
%%TODO check whether new subscription replaces an existing one
%% (i.e. same topic name and different QoS)
case ensure_queue(QoS, S0) of
{ok, Q} ->
QName = amqqueue:get_name(Q),
case bind(QName, TopicName, S0) of
{ok, _Output, S1} ->
%%TODO check what happens if we basic.consume multiple times
%% for the same queue
case consume(Q, QoS, S1) of
{ok, S2} ->
S3 = S2#proc_state{has_subs = true},
maybe_increment_consumer(S3, S2),
{[QoS | L], S3};
{error, _Reason} ->
{[?SUBACK_FAILURE | L], S1}
end;
{error, Reason, S} ->
rabbit_log:error("Failed to bind ~s with topic ~s: ~p",
[rabbit_misc:rs(QName), TopicName, Reason]),
{[?SUBACK_FAILURE | L], S}
end;
{error, _} ->
{[?SUBACK_FAILURE | L], S0}
end
end, {[], PState0}, Topics),
SendFun(
#mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK},
variable = #mqtt_frame_suback{
message_id = SubscribeMsgId,
%%TODO check correct order of QosResponse
qos_table = QosResponse}},
PState1),
case QosResponse of
[?SUBACK_FAILURE | _] ->
{error, subscribe_error, PState1};
_ ->
PState = lists:foldl(fun(Topic, S) ->
maybe_send_retained_message(RPid, Topic, S)
end, PState1, Topics),
{ok, PState}
end;
process_request(?UNSUBSCRIBE,
#mqtt_frame{variable = #mqtt_frame_subscribe{message_id = MessageId,
topic_table = Topics},
payload = undefined},
PState0 = #proc_state{send_fun = SendFun}) ->
rabbit_log_connection:debug("Received an UNSUBSCRIBE for topic(s) ~p", [Topics]),
PState = lists:foldl(
fun(#mqtt_topic{name = TopicName}, #proc_state{} = S0) ->
case find_queue_name(TopicName, S0) of
{ok, QName} ->
case unbind(QName, TopicName, S0) of
{ok, _, _} ->
PState0;
{error, Reason, State} ->
rabbit_log:error("Failed to unbind ~s with topic ~s: ~p",
[rabbit_misc:rs(QName), TopicName, Reason]),
State
end;
{not_found, _} ->
PState0
end
end, PState0, Topics),
SendFun(
#mqtt_frame{fixed = #mqtt_frame_fixed {type = ?UNSUBACK},
variable = #mqtt_frame_suback{message_id = MessageId}},
PState),
PState3 = case rabbit_binding:list_for_destination(queue_name(?QOS_0, PState)) of
[] ->
case rabbit_binding:list_for_destination(queue_name(?QOS_1, PState)) of
[] ->
PState2 = PState#proc_state{has_subs = false},
maybe_decrement_consumer(PState, PState2),
PState2;
_ ->
PState
end;
_ ->
PState
end,
{ok, PState3};
process_request(?PINGREQ, #mqtt_frame{}, PState = #proc_state{send_fun = SendFun}) ->
rabbit_log_connection:debug("Received a PINGREQ"),
SendFun(
#mqtt_frame{fixed = #mqtt_frame_fixed{type = ?PINGRESP}},
PState),
rabbit_log_connection:debug("Sent a PINGRESP"),
{ok, PState};
process_request(?DISCONNECT, #mqtt_frame{}, PState) ->
rabbit_log_connection:debug("Received a DISCONNECT"),
{stop, PState}.
process_connect(#mqtt_frame{
variable = #mqtt_frame_connect{
username = Username,
proto_ver = ProtoVersion,
clean_sess = CleanSess,
client_id = ClientId,
keep_alive = Keepalive} = FrameConnect},
PState0 = #proc_state{send_fun = SendFun}) ->
rabbit_log_connection:debug("Received a CONNECT, client ID: ~s, username: ~s, "
"clean session: ~s, protocol version: ~p, keepalive: ~p",
[ClientId, Username, CleanSess, ProtoVersion, Keepalive]),
{ReturnCode, SessionPresent, PState} =
case rabbit_misc:pipeline([fun check_protocol_version/1,
fun check_client_id/1,
fun check_credentials/2,
fun login/2,
fun register_client/2,
fun notify_connection_created/2,
fun start_keepalive/2,
fun handle_clean_session/2],
FrameConnect, PState0) of
{ok, SessionPresent0, PState1} ->
{?CONNACK_ACCEPT, SessionPresent0, PState1};
{error, ReturnCode0, PState1} ->
{ReturnCode0, false, PState1}
end,
ResponseFrame = #mqtt_frame{fixed = #mqtt_frame_fixed{type = ?CONNACK},
variable = #mqtt_frame_connack{
session_present = SessionPresent,
return_code = ReturnCode}},
SendFun(ResponseFrame, PState),
return_connack(ReturnCode, PState).
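%% process_connect/2 above runs the CONNECT handshake as a pipeline of checks:
%% each step either passes the frame and state along or aborts with a CONNACK
%% return code. Whatever code comes out is sent back in the CONNACK frame and
%% then mapped to the connection outcome by return_connack/2 further below.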
client_id(<<>>) ->
rabbit_mqtt_util:gen_client_id();
client_id(ClientId)
when is_binary(ClientId) ->
ClientId.
check_protocol_version(#mqtt_frame_connect{proto_ver = ProtoVersion}) ->
case lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)) of
true ->
ok;
false ->
{error, ?CONNACK_UNACCEPTABLE_PROTO_VER}
end.
check_client_id(#mqtt_frame_connect{clean_sess = false,
client_id = []}) ->
{error, ?CONNACK_ID_REJECTED};
check_client_id(_) ->
ok.
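%% MQTT 3.1.1 only allows a zero-byte client identifier together with
%% clean_sess = true [MQTT-3.1.3-8]; the first clause above rejects the
%% combination of a persistent session and an empty client id accordingly.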
check_credentials(Frame = #mqtt_frame_connect{username = Username,
password = Password},
PState = #proc_state{ssl_login_name = SslLoginName,
peer_addr = PeerAddr}) ->
Ip = list_to_binary(inet:ntoa(PeerAddr)),
case creds(Username, Password, SslLoginName) of
nocreds ->
rabbit_core_metrics:auth_attempt_failed(Ip, <<>>, mqtt),
rabbit_log_connection:error("MQTT login failed: no credentials provided"),
{error, ?CONNACK_BAD_CREDENTIALS};
{invalid_creds, {undefined, Pass}} when is_list(Pass) ->
rabbit_core_metrics:auth_attempt_failed(Ip, <<>>, mqtt),
rabbit_log_connection:error("MQTT login failed: no username is provided"),
{error, ?CONNACK_BAD_CREDENTIALS};
{invalid_creds, {User, undefined}} when is_list(User) ->
rabbit_core_metrics:auth_attempt_failed(Ip, User, mqtt),
rabbit_log_connection:error("MQTT login failed for user '~p': no password provided", [User]),
{error, ?CONNACK_BAD_CREDENTIALS};
{UserBin, PassBin} ->
{ok, {UserBin, PassBin, Frame}, PState}
end.
login({UserBin, PassBin,
Frame = #mqtt_frame_connect{client_id = ClientId0,
clean_sess = CleanSess}},
PState0) ->
ClientId = client_id(ClientId0),
case process_login(UserBin, PassBin, ClientId, PState0) of
already_connected ->
{ok, already_connected};
{ok, PState} ->
{ok, Frame, PState#proc_state{clean_sess = CleanSess,
client_id = ClientId}};
{error, _Reason, _PState} = Err ->
Err
end.
register_client(already_connected, _PState) ->
ok;
register_client(Frame = #mqtt_frame_connect{proto_ver = ProtoVersion},
PState = #proc_state{client_id = ClientId,
socket = Socket,
auth_state = #auth_state{vhost = VHost}}) ->
NewProcState =
fun(RegisterState) ->
rabbit_mqtt_util:register_clientid(VHost, ClientId),
RetainerPid = rabbit_mqtt_retainer_sup:child_for_vhost(VHost),
Prefetch = rabbit_mqtt_util:env(prefetch),
{ok, {PeerHost, PeerPort, Host, Port}} = rabbit_net:socket_ends(Socket, inbound),
ExchangeBin = rabbit_mqtt_util:env(exchange),
ExchangeName = rabbit_misc:r(VHost, exchange, ExchangeBin),
ProtoHumanReadable = {'MQTT', human_readable_mqtt_version(ProtoVersion)},
PState#proc_state{
exchange = ExchangeName,
will_msg = make_will_msg(Frame),
retainer_pid = RetainerPid,
register_state = RegisterState,
proto_ver = protocol_integer_to_atom(ProtoVersion),
info = #info{prefetch = Prefetch,
peer_host = PeerHost,
peer_port = PeerPort,
host = Host,
port = Port,
proto_human = ProtoHumanReadable
}}
end,
case rabbit_mqtt_ff:track_client_id_in_ra() of
true ->
case rabbit_mqtt_collector:register(ClientId, self()) of
{ok, Corr} ->
{ok, NewProcState({pending, Corr})};
{error, _} = Err ->
%% e.g. this node was removed from the MQTT cluster members
rabbit_log_connection:error("MQTT cannot accept a connection: "
"client ID tracker is unavailable: ~p", [Err]),
{error, ?CONNACK_SERVER_UNAVAILABLE};
{timeout, _} ->
rabbit_log_connection:error("MQTT cannot accept a connection: "
"client ID registration timed out"),
{error, ?CONNACK_SERVER_UNAVAILABLE}
end;
false ->
{ok, NewProcState(undefined)}
end.
notify_connection_created(already_connected, _PState) ->
ok;
notify_connection_created(
_Frame,
#proc_state{socket = Sock,
conn_name = ConnName,
info = #info{proto_human = {ProtoName, ProtoVsn}},
auth_state = #auth_state{vhost = VHost,
username = Username}} = PState) ->
{ok, {PeerHost, PeerPort, Host, Port}} = rabbit_net:socket_ends(Sock, inbound),
ConnectedAt = os:system_time(milli_seconds),
Infos = [{host, Host},
{port, Port},
{peer_host, PeerHost},
{peer_port, PeerPort},
{vhost, VHost},
{node, node()},
{user, Username},
{name, ConnName},
{connected_at, ConnectedAt},
{pid, self()},
{protocol, {ProtoName, binary_to_list(ProtoVsn)}},
{type, network}
],
rabbit_core_metrics:connection_created(self(), Infos),
rabbit_event:notify(connection_created, Infos),
rabbit_networking:register_non_amqp_connection(self()),
{ok, PState#proc_state{
%% We won't need conn_name anymore. Use less memory by setting it to undefined.
conn_name = undefined}}.
human_readable_mqtt_version(3) ->
<<"3.1.0">>;
human_readable_mqtt_version(4) ->
<<"3.1.1">>.
return_connack(?CONNACK_ACCEPT, S) ->
{ok, S};
return_connack(?CONNACK_BAD_CREDENTIALS, S) ->
{error, unauthenticated, S};
return_connack(?CONNACK_NOT_AUTHORIZED, S) ->
{error, unauthorized, S};
return_connack(?CONNACK_SERVER_UNAVAILABLE, S) ->
{error, unavailable, S};
return_connack(?CONNACK_ID_REJECTED, S) ->
{error, invalid_client_id, S};
return_connack(?CONNACK_UNACCEPTABLE_PROTO_VER, S) ->
{error, unsupported_protocol_version, S}.
start_keepalive(#mqtt_frame_connect{keep_alive = Seconds},
#proc_state{socket = Socket}) ->
ok = rabbit_mqtt_keepalive:start(Seconds, Socket).
handle_clean_session(_, PState0 = #proc_state{clean_sess = false}) ->
case get_queue(?QOS_1, PState0) of
{error, not_found} ->
%% Queue will be created later when client subscribes.
{ok, _SessionPresent = false, PState0};
{ok, Q} ->
case consume(Q, ?QOS_1, PState0) of
{ok, PState} ->
{ok, _SessionPresent = true, PState};
{error, access_refused} ->
{error, ?CONNACK_NOT_AUTHORIZED};
{error, _Reason} ->
%% Let's use most generic error return code.
{error, ?CONNACK_SERVER_UNAVAILABLE}
end
end;
handle_clean_session(_, PState = #proc_state{clean_sess = true,
auth_state = #auth_state{user = User,
username = Username,
authz_ctx = AuthzCtx}}) ->
case get_queue(?QOS_1, PState) of
{error, not_found} ->
{ok, _SessionPresent = false, PState};
{ok, Q0} ->
QName = amqqueue:get_name(Q0),
%% configure access to queue required for queue.delete
case check_resource_access(User, QName, configure, AuthzCtx) of
ok ->
rabbit_amqqueue:with(
QName,
fun (Q) ->
rabbit_queue_type:delete(Q, false, false, Username)
end,
fun (not_found) ->
ok;
({absent, Q, crashed}) ->
rabbit_classic_queue:delete_crashed(Q, Username);
({absent, Q, stopped}) ->
rabbit_classic_queue:delete_crashed(Q, Username);
({absent, _Q, _Reason}) ->
ok
end),
{ok, _SessionPresent = false, PState};
{error, access_refused} ->
{error, ?CONNACK_NOT_AUTHORIZED}
end
end.
-spec get_queue(qos(), proc_state()) ->
{ok, amqqueue:amqqueue()} | {error, not_found}.
get_queue(QoS, PState) ->
QName = queue_name(QoS, PState),
rabbit_amqqueue:lookup(QName).
queue_name(QoS, #proc_state{client_id = ClientId,
auth_state = #auth_state{vhost = VHost}}) ->
QNameBin = rabbit_mqtt_util:queue_name_bin(ClientId, QoS),
rabbit_misc:r(VHost, queue, QNameBin).
find_queue_name(TopicName, #proc_state{exchange = Exchange,
mqtt2amqp_fun = Mqtt2AmqpFun} = PState) ->
RoutingKey = Mqtt2AmqpFun(TopicName),
QName0 = queue_name(?QOS_0, PState),
case lookup_binding(Exchange, QName0, RoutingKey) of
true ->
{ok, QName0};
false ->
QName1 = queue_name(?QOS_1, PState),
case lookup_binding(Exchange, QName1, RoutingKey) of
true ->
{ok, QName1};
false ->
{not_found, []}
end
end.
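%% find_queue_name/2 above translates the MQTT topic into an AMQP routing key
%% and checks whether a binding for it already exists on this client's QoS 0 or
%% QoS 1 queue; this is how UNSUBSCRIBE locates the queue to unbind from.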
lookup_binding(Exchange, QueueName, RoutingKey) ->
B= #binding{source = Exchange,
destination = QueueName,
key = RoutingKey},
lists:member(B, rabbit_binding:list_for_source_and_destination(Exchange, QueueName)).
hand_off_to_retainer(RetainerPid, Amqp2MqttFun, Topic0, #mqtt_msg{payload = <<"">>}) ->
Topic1 = Amqp2MqttFun(Topic0),
rabbit_mqtt_retainer:clear(RetainerPid, Topic1),
ok;
hand_off_to_retainer(RetainerPid, Amqp2MqttFun, Topic0, Msg) ->
Topic1 = Amqp2MqttFun(Topic0),
rabbit_mqtt_retainer:retain(RetainerPid, Topic1, Msg),
ok.
maybe_send_retained_message(RPid, #mqtt_topic{name = Topic0, qos = SubscribeQos},
#proc_state{amqp2mqtt_fun = Amqp2MqttFun,
packet_id = PacketId0,
send_fun = SendFun} = PState0) ->
Topic1 = Amqp2MqttFun(Topic0),
case rabbit_mqtt_retainer:fetch(RPid, Topic1) of
undefined ->
PState0;
Msg ->
Qos = effective_qos(Msg#mqtt_msg.qos, SubscribeQos),
{PacketId, PState} = case Qos of
?QOS_0 ->
{undefined, PState0};
?QOS_1 ->
{PacketId0, PState0#proc_state{packet_id = increment_packet_id(PacketId0)}}
end,
SendFun(
#mqtt_frame{fixed = #mqtt_frame_fixed{
type = ?PUBLISH,
qos = Qos,
dup = false,
retain = Msg#mqtt_msg.retain
}, variable = #mqtt_frame_publish{
message_id = PacketId,
topic_name = Topic1
},
payload = Msg#mqtt_msg.payload},
PState),
PState
end.
make_will_msg(#mqtt_frame_connect{will_flag = false}) ->
undefined;
make_will_msg(#mqtt_frame_connect{will_retain = Retain,
will_qos = Qos,
will_topic = Topic,
will_msg = Msg}) ->
#mqtt_msg{retain = Retain,
qos = Qos,
topic = Topic,
dup = false,
payload = Msg}.
process_login(_UserBin, _PassBin, _ClientId,
#proc_state{peer_addr = Addr,
auth_state = #auth_state{username = Username,
user = User,
vhost = VHost
}})
when Username =/= undefined, User =/= undefined, VHost =/= undefined ->
UsernameStr = rabbit_data_coercion:to_list(Username),
VHostStr = rabbit_data_coercion:to_list(VHost),
rabbit_core_metrics:auth_attempt_failed(list_to_binary(inet:ntoa(Addr)), Username, mqtt),
rabbit_log_connection:warning("MQTT detected duplicate connect/login attempt for user ~tp, vhost ~tp",
[UsernameStr, VHostStr]),
already_connected;
process_login(UserBin, PassBin, ClientId,
#proc_state{socket = Sock,
ssl_login_name = SslLoginName,
peer_addr = Addr,
auth_state = undefined} = PState0) ->
{ok, {_PeerHost, _PeerPort, _Host, Port}} = rabbit_net:socket_ends(Sock, inbound),
{VHostPickedUsing, {VHost, UsernameBin}} = get_vhost(UserBin, SslLoginName, Port),
rabbit_log_connection:debug(
"MQTT vhost picked using ~s",
[human_readable_vhost_lookup_strategy(VHostPickedUsing)]),
RemoteIpAddressBin = list_to_binary(inet:ntoa(Addr)),
Input = #{vhost => VHost,
username_bin => UsernameBin,
pass_bin => PassBin,
client_id => ClientId},
case rabbit_misc:pipeline(
[fun check_vhost_exists/1,
fun check_vhost_connection_limit/1,
fun check_vhost_alive/1,
fun check_user_login/2,
fun check_user_connection_limit/1,
fun check_vhost_access/2,
fun check_user_loopback/2
],
Input, PState0) of
{ok, _Output, PState} ->
rabbit_core_metrics:auth_attempt_succeeded(RemoteIpAddressBin, UsernameBin, mqtt),
{ok, PState};
{error, _Reason, _PState} = Err ->
rabbit_core_metrics:auth_attempt_failed(RemoteIpAddressBin, UsernameBin, mqtt),
Err
end.
check_vhost_exists(#{vhost := VHost,
username_bin := UsernameBin}) ->
case rabbit_vhost:exists(VHost) of
true ->
ok;
false ->
rabbit_log_connection:error("MQTT login failed for user '~s': virtual host '~s' does not exist",
[UsernameBin, VHost]),
{error, ?CONNACK_BAD_CREDENTIALS}
end.
check_vhost_connection_limit(#{vhost := VHost,
username_bin := UsernameBin}) ->
case rabbit_vhost_limit:is_over_connection_limit(VHost) of
false ->
ok;
{true, Limit} ->
rabbit_log_connection:error(
"Error on MQTT connection ~p~n"
"access to vhost '~s' refused for user '~s': "
"vhost connection limit (~p) is reached",
[self(), VHost, UsernameBin, Limit]),
{error, ?CONNACK_NOT_AUTHORIZED}
end.
check_vhost_alive(#{vhost := VHost,
username_bin := UsernameBin}) ->
case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of
true ->
ok;
false ->
rabbit_log_connection:error(
"Error on MQTT connection ~p~n"
"access refused for user '~s': "
"vhost is down",
[self(), UsernameBin, VHost]),
{error, ?CONNACK_NOT_AUTHORIZED}
end.
check_user_login(#{vhost := VHost,
username_bin := UsernameBin,
pass_bin := PassBin,
client_id := ClientId
} = In, PState) ->
AuthProps = case PassBin of
none ->
%% SSL user name provided.
%% Authenticating using username only.
[];
_ ->
[{password, PassBin},
{vhost, VHost},
{client_id, ClientId}]
end,
case rabbit_access_control:check_user_login(
UsernameBin, AuthProps) of
{ok, User = #user{username = Username}} ->
notify_auth_result(user_authentication_success, Username, PState),
{ok, maps:put(user, User, In), PState};
{refused, Username, Msg, Args} ->
rabbit_log_connection:error(
"Error on MQTT connection ~p~n"
"access refused for user '~s' in vhost '~s' "
++ Msg,
[self(), Username, VHost] ++ Args),
notify_auth_result(user_authentication_failure, Username, PState),
{error, ?CONNACK_BAD_CREDENTIALS}
end.
notify_auth_result(AuthResult, Username, #proc_state{conn_name = ConnName}) ->
rabbit_event:notify(
AuthResult,
[
{name, case Username of
none -> '';
_ -> Username
end},
{connection_name, ConnName},
{connection_type, network}
]).
check_user_connection_limit(#{user := #user{username = Username}}) ->
case rabbit_auth_backend_internal:is_over_connection_limit(Username) of
false ->
ok;
{true, Limit} ->
rabbit_log_connection:error(
"Error on MQTT connection ~p~n"
"access refused for user '~s': "
"user connection limit (~p) is reached",
[self(), Username, Limit]),
{error, ?CONNACK_NOT_AUTHORIZED}
end.
check_vhost_access(#{vhost := VHost,
client_id := ClientId,
user := User = #user{username = Username}
} = In,
#proc_state{peer_addr = PeerAddr} = PState) ->
AuthzCtx = #{<<"client_id">> => ClientId},
try rabbit_access_control:check_vhost_access(
User,
VHost,
{ip, PeerAddr},
AuthzCtx) of
ok ->
{ok, maps:put(authz_ctx, AuthzCtx, In), PState}
catch exit:#amqp_error{name = not_allowed} ->
rabbit_log_connection:error(
"Error on MQTT connection ~p~n"
"access refused for user '~s'",
[self(), Username]),
{error, ?CONNACK_NOT_AUTHORIZED}
end.
check_user_loopback(#{vhost := VHost,
username_bin := UsernameBin,
user := User,
authz_ctx := AuthzCtx
},
#proc_state{peer_addr = PeerAddr} = PState) ->
case rabbit_access_control:check_user_loopback(UsernameBin, PeerAddr) of
ok ->
AuthState = #auth_state{user = User,
username = UsernameBin,
vhost = VHost,
authz_ctx = AuthzCtx},
{ok, PState#proc_state{auth_state = AuthState}};
not_allowed ->
rabbit_log_connection:warning(
"MQTT login failed: user '~s' can only connect via localhost",
[binary_to_list(UsernameBin)]),
{error, ?CONNACK_NOT_AUTHORIZED}
end.
get_vhost(UserBin, none, Port) ->
get_vhost_no_ssl(UserBin, Port);
get_vhost(UserBin, undefined, Port) ->
get_vhost_no_ssl(UserBin, Port);
get_vhost(UserBin, SslLogin, Port) ->
get_vhost_ssl(UserBin, SslLogin, Port).
get_vhost_no_ssl(UserBin, Port) ->
case vhost_in_username(UserBin) of
true ->
{vhost_in_username_or_default, get_vhost_username(UserBin)};
false ->
PortVirtualHostMapping = rabbit_runtime_parameters:value_global(
mqtt_port_to_vhost_mapping
),
case get_vhost_from_port_mapping(Port, PortVirtualHostMapping) of
undefined ->
{default_vhost, {rabbit_mqtt_util:env(vhost), UserBin}};
VHost ->
{port_to_vhost_mapping, {VHost, UserBin}}
end
end.
get_vhost_ssl(UserBin, SslLoginName, Port) ->
UserVirtualHostMapping = rabbit_runtime_parameters:value_global(
mqtt_default_vhosts
),
case get_vhost_from_user_mapping(SslLoginName, UserVirtualHostMapping) of
undefined ->
PortVirtualHostMapping = rabbit_runtime_parameters:value_global(
mqtt_port_to_vhost_mapping
),
case get_vhost_from_port_mapping(Port, PortVirtualHostMapping) of
undefined ->
{vhost_in_username_or_default, get_vhost_username(UserBin)};
VHostFromPortMapping ->
{port_to_vhost_mapping, {VHostFromPortMapping, UserBin}}
end;
VHostFromCertMapping ->
{cert_to_vhost_mapping, {VHostFromCertMapping, UserBin}}
end.
vhost_in_username(UserBin) ->
case application:get_env(?APP, ignore_colons_in_username) of
{ok, true} -> false;
_ ->
%% split at the last colon, disallowing colons in username
case re:split(UserBin, ":(?!.*?:)") of
[_, _] -> true;
[UserBin] -> false
end
end.
get_vhost_username(UserBin) ->
Default = {rabbit_mqtt_util:env(vhost), UserBin},
case application:get_env(?APP, ignore_colons_in_username) of
{ok, true} -> Default;
_ ->
%% split at the last colon, disallowing colons in username
case re:split(UserBin, ":(?!.*?:)") of
[Vhost, UserName] -> {Vhost, UserName};
[UserBin] -> Default
end
end.
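%% Illustration of the split above (the subject is a binary, so the parts come
%% back as binaries); only the last colon separates vhost from username:
%%   re:split(<<"some-vhost:some-user">>, ":(?!.*?:)")
%%   %%=> [<<"some-vhost">>, <<"some-user">>]
%%   re:split(<<"just-a-user">>, ":(?!.*?:)")
%%   %%=> [<<"just-a-user">>]
%% so e.g. <<"a:b:c">> maps to vhost <<"a:b">> and username <<"c">>.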
get_vhost_from_user_mapping(_User, not_found) ->
undefined;
get_vhost_from_user_mapping(User, Mapping) ->
M = rabbit_data_coercion:to_proplist(Mapping),
case rabbit_misc:pget(User, M) of
undefined ->
undefined;
VHost ->
VHost
end.
get_vhost_from_port_mapping(_Port, not_found) ->
undefined;
get_vhost_from_port_mapping(Port, Mapping) ->
M = rabbit_data_coercion:to_proplist(Mapping),
Res = case rabbit_misc:pget(rabbit_data_coercion:to_binary(Port), M) of
undefined ->
undefined;
VHost ->
VHost
end,
Res.
human_readable_vhost_lookup_strategy(vhost_in_username_or_default) ->
"vhost in username or default";
human_readable_vhost_lookup_strategy(port_to_vhost_mapping) ->
"MQTT port to vhost mapping";
human_readable_vhost_lookup_strategy(cert_to_vhost_mapping) ->
"client certificate to vhost mapping";
human_readable_vhost_lookup_strategy(default_vhost) ->
"plugin configuration or default";
human_readable_vhost_lookup_strategy(Val) ->
atom_to_list(Val).
creds(User, Pass, SSLLoginName) ->
DefaultUser = rabbit_mqtt_util:env(default_user),
DefaultPass = rabbit_mqtt_util:env(default_pass),
{ok, Anon} = application:get_env(?APP, allow_anonymous),
{ok, TLSAuth} = application:get_env(?APP, ssl_cert_login),
HaveDefaultCreds = Anon =:= true andalso
is_binary(DefaultUser) andalso
is_binary(DefaultPass),
CredentialsProvided = User =/= undefined orelse
Pass =/= undefined,
CorrectCredentials = is_list(User) andalso
is_list(Pass),
SSLLoginProvided = TLSAuth =:= true andalso
SSLLoginName =/= none,
case {CredentialsProvided, CorrectCredentials, SSLLoginProvided, HaveDefaultCreds} of
%% Username and password take priority
{true, true, _, _} -> {list_to_binary(User),
list_to_binary(Pass)};
%% Either username or password is provided
{true, false, _, _} -> {invalid_creds, {User, Pass}};
%% rabbitmq_mqtt.ssl_cert_login is true. SSL user name provided.
%% Authenticating using username only.
{false, false, true, _} -> {SSLLoginName, none};
%% Anonymous connection uses default credentials
{false, false, false, true} -> {DefaultUser, DefaultPass};
_ -> nocreds
end.
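%% Illustrative outcomes of creds/3 above (usernames and passwords arrive from
%% the frame parser as character lists; exact results depend on the
%% allow_anonymous, ssl_cert_login and default credential settings):
%%   creds("guest", "guest", none)      -> {<<"guest">>, <<"guest">>}
%%   creds("guest", undefined, none)    -> {invalid_creds, {"guest", undefined}}
%%   creds(undefined, undefined, SslName) -> {SslName, none}  %% ssl_cert_login = true
%%   creds(undefined, undefined, none)  -> default credentials, or nocreds if
%%                                         anonymous access is not configured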
supported_sub_qos(?QOS_0) -> ?QOS_0;
supported_sub_qos(?QOS_1) -> ?QOS_1;
supported_sub_qos(?QOS_2) -> ?QOS_1.
delivery_mode(?QOS_0) -> 1;
delivery_mode(?QOS_1) -> 2;
delivery_mode(?QOS_2) -> 2.
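%% Subscriptions are capped at QoS 1 (QoS 2 is downgraded), and the publish QoS
%% maps onto the AMQP delivery mode: QoS 0 -> 1 (transient), QoS 1/2 -> 2
%% (persistent).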
ensure_queue(QoS, #proc_state{
client_id = ClientId,
clean_sess = CleanSess,
auth_state = #auth_state{
vhost = VHost,
user = User = #user{username = Username},
authz_ctx = AuthzCtx}
} = PState) ->
case get_queue(QoS, PState) of
{ok, Q} ->
{ok, Q};
{error, not_found} ->
QNameBin = rabbit_mqtt_util:queue_name_bin(ClientId, QoS),
QName = rabbit_misc:r(VHost, queue, QNameBin),
%% configure access to queue required for queue.declare
case check_resource_access(User, QName, configure, AuthzCtx) of
ok ->
case rabbit_vhost_limit:is_over_queue_limit(VHost) of
false ->
rabbit_core_metrics:queue_declared(QName),
QArgs = queue_args(QoS, CleanSess),
Q0 = amqqueue:new(QName,
self(),
_Durable = true,
_AutoDelete = false,
queue_owner(QoS, CleanSess),
QArgs,
VHost,
#{user => Username},
queue_type(QoS, CleanSess, QArgs)
),
case rabbit_queue_type:declare(Q0, node()) of
{new, Q} when ?is_amqqueue(Q) ->
rabbit_core_metrics:queue_created(QName),
{ok, Q};
Other ->
rabbit_log:error("Failed to declare ~s: ~p",
[rabbit_misc:rs(QName), Other]),
{error, queue_declare}
end;
{true, Limit} ->
rabbit_log:error("cannot declare ~s because "
"queue limit ~p in vhost '~s' is reached",
[rabbit_misc:rs(QName), Limit, VHost]),
{error, access_refused}
end;
{error, access_refused} = E ->
E
end
end.
queue_owner(QoS, CleanSess)
when QoS =:= ?QOS_0 orelse CleanSess ->
%% Exclusive queues are auto-deleted after node restart while auto-delete queues are not.
%% Therefore make durable queue exclusive.
self();
queue_owner(_, _) ->
none.
queue_args(QoS, CleanSess)
when QoS =:= ?QOS_0 orelse CleanSess ->
[];
queue_args(_, _) ->
Args = case rabbit_mqtt_util:env(subscription_ttl) of
Ms when is_integer(Ms) ->
[{<<"x-expires">>, long, Ms}];
_ ->
[]
end,
case rabbit_mqtt_util:env(durable_queue_type) of
quorum ->
[{<<"x-queue-type">>, longstr, <<"quorum">>} | Args];
_ ->
Args
end.
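%% Example for queue_args/2 above (assuming subscription_ttl is configured as
%% 86400000 ms and durable_queue_type = quorum): a QoS 1 subscriber with
%% clean_sess = false gets
%%   [{<<"x-queue-type">>, longstr, <<"quorum">>},
%%    {<<"x-expires">>, long, 86400000}]
%% while QoS 0 or clean-session subscribers get no extra queue arguments.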
queue_type(?QOS_0, true, _) ->
rabbit_mqtt_qos0_queue;
queue_type(_, _, QArgs) ->
rabbit_amqqueue:get_queue_type(QArgs).
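%% Registers this connection as the consumer of its subscription queue.
%% Queues of type rabbit_mqtt_qos0_queue deliver straight to this process,
%% so no basic.consume equivalent is needed for them.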
consume(Q, QoS, #proc_state{
queue_states = QStates0,
auth_state = #auth_state{
user = User = #user{username = Username},
authz_ctx = AuthzCtx},
info = #info{prefetch = Prefetch}
} = PState0) ->
QName = amqqueue:get_name(Q),
%% read access to queue required for basic.consume
case check_resource_access(User, QName, read, AuthzCtx) of
ok ->
case amqqueue:get_type(Q) of
rabbit_mqtt_qos0_queue ->
%% Messages get delivered directly to our process without
%% explicitly calling rabbit_queue_type:consume/3.
{ok, PState0};
_ ->
Spec = #{no_ack => QoS =:= ?QOS_0,
channel_pid => self(),
limiter_pid => none,
limiter_active => false,
prefetch_count => Prefetch,
consumer_tag => ?CONSUMER_TAG,
exclusive_consume => false,
args => [],
ok_msg => undefined,
acting_user => Username},
rabbit_amqqueue:with(
QName,
fun(Q1) ->
case rabbit_queue_type:consume(Q1, Spec, QStates0) of
{ok, QStates} ->
PState = PState0#proc_state{queue_states = QStates},
{ok, PState};
{error, Reason} = Err ->
rabbit_log:error("Failed to consume from ~s: ~p",
[rabbit_misc:rs(QName), Reason]),
Err
end
end)
end;
{error, access_refused} = Err ->
Err
end.
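%% bind/3 and unbind/3 add or remove a binding between the subscription queue and
%% the configured topic exchange. The MQTT topic filter is translated into an AMQP
%% routing key by mqtt2amqp_fun; with the default translation a filter such as
%% "sensors/+/temp" typically becomes "sensors.*.temp".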
bind(QueueName, TopicName, PState) ->
binding_action_with_checks({QueueName, TopicName, fun rabbit_binding:add/2}, PState).
unbind(QueueName, TopicName, PState) ->
binding_action_with_checks({QueueName, TopicName, fun rabbit_binding:remove/2}, PState).
binding_action_with_checks(Input, PState) ->
%% Same permission checks required for both binding and unbinding
%% queue to / from topic exchange.
rabbit_misc:pipeline(
[fun check_queue_write_access/2,
fun check_exchange_read_access/2,
fun check_topic_access/2,
fun binding_action/2],
Input, PState).
check_queue_write_access(
{QueueName, _, _},
#proc_state{auth_state = #auth_state{
user = User,
authz_ctx = AuthzCtx}}) ->
%% write access to queue required for queue.(un)bind
check_resource_access(User, QueueName, write, AuthzCtx).
check_exchange_read_access(
_, #proc_state{exchange = ExchangeName,
auth_state = #auth_state{
user = User,
authz_ctx = AuthzCtx}}) ->
%% read access to exchange required for queue.(un)bind
check_resource_access(User, ExchangeName, read, AuthzCtx).
check_topic_access({_, TopicName, _}, PState) ->
check_topic_access(TopicName, read, PState).
binding_action(
{QueueName, TopicName, BindingFun},
#proc_state{exchange = ExchangeName,
auth_state = #auth_state{
user = #user{username = Username}},
mqtt2amqp_fun = Mqtt2AmqpFun}) ->
RoutingKey = Mqtt2AmqpFun(TopicName),
Binding = #binding{source = ExchangeName,
destination = QueueName,
key = RoutingKey},
BindingFun(Binding, Username).
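%% Publishes the Will Message (if one was supplied in CONNECT) when the connection
%% terminates. Requires write access to the will topic; retained wills are handed
%% off to the retainer process.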
send_will(#proc_state{will_msg = undefined}) ->
ok;
send_will(PState = #proc_state{will_msg = WillMsg = #mqtt_msg{retain = Retain,
topic = Topic},
retainer_pid = RPid,
amqp2mqtt_fun = Amqp2MqttFun}) ->
case check_topic_access(Topic, write, PState) of
ok ->
publish_to_queues(WillMsg, PState),
case Retain of
false ->
ok;
true ->
hand_off_to_retainer(RPid, Amqp2MqttFun, Topic, WillMsg)
end;
{error, access_refused = Reason} ->
rabbit_log:error("failed to send will message: ~p", [Reason])
end.
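%% Converts an MQTT message into an AMQP 0-9-1 basic message and routes it through
%% the topic exchange. Publishes with QoS > 0 are marked persistent and request
%% publisher confirms; the original QoS and dup flag travel in the
%% x-mqtt-publish-qos and x-mqtt-dup headers.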
publish_to_queues(undefined, PState) ->
{ok, PState};
publish_to_queues(
#mqtt_msg{qos = Qos,
topic = Topic,
dup = Dup,
message_id = MessageId,
payload = Payload},
#proc_state{exchange = ExchangeName,
mqtt2amqp_fun = Mqtt2AmqpFun,
delivery_flow = Flow} = PState) ->
RoutingKey = Mqtt2AmqpFun(Topic),
Confirm = Qos > ?QOS_0,
Headers = [{<<"x-mqtt-publish-qos">>, byte, Qos},
{<<"x-mqtt-dup">>, bool, Dup}],
Props = #'P_basic'{
headers = Headers,
delivery_mode = delivery_mode(Qos)},
{ClassId, _MethodId} = rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
Content = #content{
class_id = ClassId,
properties = Props,
properties_bin = none,
protocol = none,
payload_fragments_rev = [Payload]
},
BasicMessage = #basic_message{
exchange_name = ExchangeName,
routing_keys = [RoutingKey],
content = Content,
id = <<>>, %% GUID set in rabbit_classic_queue
is_persistent = Confirm
},
Delivery = #delivery{
mandatory = false,
confirm = Confirm,
sender = self(),
message = BasicMessage,
msg_seq_no = MessageId,
flow = Flow
},
case rabbit_exchange:lookup(ExchangeName) of
{ok, Exchange} ->
QNames = rabbit_exchange:route(Exchange, Delivery),
deliver_to_queues(Delivery, QNames, PState);
{error, not_found} ->
rabbit_log:error("~s not found", [rabbit_misc:rs(ExchangeName)]),
{error, exchange_not_found, PState}
end.
deliver_to_queues(Delivery,
RoutedToQNames,
PState0 = #proc_state{queue_states = QStates0,
proto_ver = ProtoVer}) ->
%% TODO only lookup fields that are needed using ets:select / match?
%% TODO Use ETS continuations to be more space efficient
Qs0 = rabbit_amqqueue:lookup(RoutedToQNames),
Qs = rabbit_amqqueue:prepend_extra_bcc(Qs0),
case rabbit_queue_type:deliver(Qs, Delivery, QStates0) of
{ok, QStates, Actions} ->
rabbit_global_counters:messages_routed(ProtoVer, length(Qs)),
PState = process_routing_confirm(Delivery, Qs,
PState0#proc_state{queue_states = QStates}),
%% Actions must be processed after registering confirms as actions may
%% contain rejections of publishes.
{ok, handle_queue_actions(Actions, PState)};
{error, Reason} ->
rabbit_log:error("Failed to deliver message with packet_id=~p to queues: ~p",
[Delivery#delivery.msg_seq_no, Reason]),
{error, Reason, PState0}
end.
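%% Tracks publisher confirms for an incoming PUBLISH.
%% Unroutable messages are counted as dropped, or as returned and immediately
%% acknowledged with a PUBACK when the publish carried a packet id (MQTT v3/v4
%% offer no negative acknowledgement). Routed messages that requested a confirm
%% are recorded in unacked_client_pubs until every destination queue confirms.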
process_routing_confirm(#delivery{confirm = false},
[], PState = #proc_state{proto_ver = ProtoVer}) ->
rabbit_global_counters:messages_unroutable_dropped(ProtoVer, 1),
PState;
process_routing_confirm(#delivery{confirm = true,
msg_seq_no = undefined},
[], PState = #proc_state{proto_ver = ProtoVer}) ->
%% unroutable will message with QoS > 0
rabbit_global_counters:messages_unroutable_dropped(ProtoVer, 1),
PState;
process_routing_confirm(#delivery{confirm = true,
msg_seq_no = MsgId},
[], PState = #proc_state{proto_ver = ProtoVer}) ->
rabbit_global_counters:messages_unroutable_returned(ProtoVer, 1),
%% MQTT 5 spec:
%% If the Server knows that there are no matching subscribers, it MAY use
%% Reason Code 0x10 (No matching subscribers) instead of 0x00 (Success).
send_puback(MsgId, PState),
PState;
process_routing_confirm(#delivery{confirm = false}, _, PState) ->
PState;
process_routing_confirm(#delivery{confirm = true,
msg_seq_no = undefined}, [_|_], PState) ->
%% routable will message with QoS > 0
PState;
process_routing_confirm(#delivery{confirm = true,
msg_seq_no = MsgId},
Qs, PState = #proc_state{unacked_client_pubs = U0}) ->
QNames = lists:map(fun amqqueue:get_name/1, Qs),
U = rabbit_mqtt_confirms:insert(MsgId, QNames, U0),
PState#proc_state{unacked_client_pubs = U}.
send_puback(MsgIds0, PState)
when is_list(MsgIds0) ->
%% Classic queues may confirm messages out of order.
%% Sort the packet identifiers here, assuming most MQTT clients send with increasing packet identifiers.
MsgIds = lists:usort(MsgIds0),
lists:foreach(fun(Id) ->
send_puback(Id, PState)
end, MsgIds);
send_puback(MsgId, PState = #proc_state{send_fun = SendFun,
proto_ver = ProtoVer}) ->
rabbit_global_counters:messages_confirmed(ProtoVer, 1),
SendFun(
#mqtt_frame{fixed = #mqtt_frame_fixed{type = ?PUBACK},
variable = #mqtt_frame_publish{message_id = MsgId}},
PState).
serialise_and_send_to_client(Frame, #proc_state{proto_ver = ProtoVer, socket = Sock}) ->
%%TODO Test sending large frames at high speed:
%% Will we need garbage collection as done in rabbit_writer:maybe_gc_large_msg/1?
%%TODO batch to fill up MTU if there are messages in the Erlang mailbox?
%% Check rabbit_writer:maybe_flush/1
Data = rabbit_mqtt_frame:serialise(Frame, ProtoVer),
try rabbit_net:port_command(Sock, Data)
catch _:Error ->
rabbit_log_connection:error("MQTT: a socket write failed, the socket might already be closed"),
rabbit_log_connection:debug("Failed to write to socket ~p, error: ~p, frame: ~p",
[Sock, Error, Frame])
end.
serialise(Frame, #proc_state{proto_ver = ProtoVer}) ->
rabbit_mqtt_frame:serialise(Frame, ProtoVer).
terminate(PState, ConnName) ->
Infos = [{name, ConnName},
{node, node()},
{pid, self()},
{disconnected_at, os:system_time(milli_seconds)}
] ++ additional_connection_closed_info(PState),
rabbit_event:notify(connection_closed, Infos),
rabbit_networking:unregister_non_amqp_connection(self()),
maybe_unregister_client(PState),
maybe_decrement_consumer(PState),
maybe_decrement_publisher(PState),
maybe_delete_mqtt_qos0_queue(PState).
additional_connection_closed_info(
#proc_state{info = #info{proto_human = {ProtoName, ProtoVsn}},
auth_state = #auth_state{vhost = VHost,
username = Username}}) ->
[{protocol, {ProtoName, binary_to_list(ProtoVsn)}},
{vhost, VHost},
{user, Username}];
additional_connection_closed_info(_) ->
[].
maybe_unregister_client(#proc_state{client_id = ClientId})
when ClientId =/= undefined ->
case rabbit_mqtt_ff:track_client_id_in_ra() of
true ->
%% ignore any errors as we are shutting down
rabbit_mqtt_collector:unregister(ClientId, self());
false ->
ok
end;
maybe_unregister_client(_) ->
ok.
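%% A clean-session connection deletes its rabbit_mqtt_qos0_queue on termination.
%% The queue type and owning pid are double checked first, presumably so that a
%% queue re-declared by another connection using the same client id is left alone.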
maybe_delete_mqtt_qos0_queue(PState = #proc_state{clean_sess = true,
auth_state = #auth_state{username = Username}}) ->
case get_queue(?QOS_0, PState) of
{ok, Q} ->
%% double check we delete the right queue
case {amqqueue:get_type(Q), amqqueue:get_pid(Q)} of
{rabbit_mqtt_qos0_queue, Pid}
when Pid =:= self() ->
rabbit_queue_type:delete(Q, false, false, Username);
_ ->
ok
end;
{error, not_found} ->
ok
end;
maybe_delete_mqtt_qos0_queue(_) ->
ok.
handle_pre_hibernate() ->
erase(permission_cache),
erase(topic_permission_cache),
ok.
handle_ra_event({applied, [{Corr, ok}]},
PState = #proc_state{register_state = {pending, Corr}}) ->
%% success case - the command was applied; transition into the registered state
PState#proc_state{register_state = registered};
handle_ra_event({not_leader, Leader, Corr},
PState = #proc_state{register_state = {pending, Corr},
client_id = ClientId}) ->
case rabbit_mqtt_ff:track_client_id_in_ra() of
true ->
%% retry command against actual leader
{ok, NewCorr} = rabbit_mqtt_collector:register(Leader, ClientId, self()),
PState#proc_state{register_state = {pending, NewCorr}};
false ->
PState
end;
handle_ra_event(register_timeout,
PState = #proc_state{register_state = {pending, _Corr},
client_id = ClientId}) ->
case rabbit_mqtt_ff:track_client_id_in_ra() of
true ->
{ok, NewCorr} = rabbit_mqtt_collector:register(ClientId, self()),
PState#proc_state{register_state = {pending, NewCorr}};
false ->
PState
end;
handle_ra_event(register_timeout, PState) ->
PState;
handle_ra_event(Evt, PState) ->
%% log these?
rabbit_log:debug("unhandled ra_event: ~w ", [Evt]),
PState.
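%% Handles a monitored queue process going down. Queue type specific cleanup is
%% delegated to rabbit_queue_type:handle_down/4; when the queue has reached end of
%% life, it is removed from the confirm state and PUBACKs are sent for publishes
%% that no longer await a confirm from it.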
handle_down({{'DOWN', QName}, _MRef, process, QPid, Reason},
PState0 = #proc_state{queue_states = QStates0,
unacked_client_pubs = U0}) ->
credit_flow:peer_down(QPid),
case rabbit_queue_type:handle_down(QPid, QName, Reason, QStates0) of
{ok, QStates1, Actions} ->
PState1 = PState0#proc_state{queue_states = QStates1},
try handle_queue_actions(Actions, PState1) of
PState ->
{ok, PState}
catch throw:consuming_queue_down ->
{error, consuming_queue_down}
end;
{eol, QStates1, QRef} ->
{ConfirmMsgIds, U} = rabbit_mqtt_confirms:remove_queue(QRef, U0),
QStates = rabbit_queue_type:remove(QRef, QStates1),
PState = PState0#proc_state{queue_states = QStates,
unacked_client_pubs = U},
send_puback(ConfirmMsgIds, PState),
{ok, PState}
end.
handle_queue_event({queue_event, rabbit_mqtt_qos0_queue, Msg}, PState0) ->
PState = deliver_one_to_client(Msg, false, PState0),
{ok, PState};
handle_queue_event({queue_event, QName, Evt},
PState0 = #proc_state{queue_states = QStates0,
unacked_client_pubs = U0}) ->
case rabbit_queue_type:handle_event(QName, Evt, QStates0) of
{ok, QStates, Actions} ->
PState1 = PState0#proc_state{queue_states = QStates},
PState = handle_queue_actions(Actions, PState1),
{ok, PState};
eol ->
%%TODO handle consuming queue down
% State1 = handle_consuming_queue_down_or_eol(QRef, State0),
{ConfirmMsgIds, U} = rabbit_mqtt_confirms:remove_queue(QName, U0),
QStates = rabbit_queue_type:remove(QName, QStates0),
PState = PState0#proc_state{queue_states = QStates,
unacked_client_pubs = U},
send_puback(ConfirmMsgIds, PState),
{ok, PState};
{protocol_error, _Type, _Reason, _ReasonArgs} = Error ->
{error, Error, PState0}
end.
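%% Folds over the actions returned by rabbit_queue_type: deliver (forward messages
%% to the MQTT client), settled (confirm publishes and send PUBACKs), rejected
%% (dropped from the confirm state since MQTT v3 and v4 cannot nack), block /
%% unblock (track queues whose soft limit is exceeded) and queue_down.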
handle_queue_actions(Actions, #proc_state{} = PState0) ->
lists:foldl(
fun ({deliver, ?CONSUMER_TAG, Ack, Msgs}, S) ->
deliver_to_client(Msgs, Ack, S);
({settled, QName, MsgIds}, S = #proc_state{unacked_client_pubs = U0}) ->
{ConfirmMsgIds, U} = rabbit_mqtt_confirms:confirm(MsgIds, QName, U0),
send_puback(ConfirmMsgIds, S),
S#proc_state{unacked_client_pubs = U};
({rejected, _QName, MsgIds}, S = #proc_state{unacked_client_pubs = U0}) ->
%% Negative acks are supported in MQTT v5 only.
%% Therefore, in MQTT v3 and v4 we ignore rejected messages.
U = lists:foldl(
fun(MsgId, Acc0) ->
case rabbit_mqtt_confirms:reject(MsgId, Acc0) of
{ok, Acc} -> Acc;
{error, not_found} -> Acc0
end
end, U0, MsgIds),
S#proc_state{unacked_client_pubs = U};
({block, QName}, S = #proc_state{soft_limit_exceeded = SLE}) ->
S#proc_state{soft_limit_exceeded = sets:add_element(QName, SLE)};
({unblock, QName}, S = #proc_state{soft_limit_exceeded = SLE}) ->
S#proc_state{soft_limit_exceeded = sets:del_element(QName, SLE)};
({queue_down, QName}, S) ->
handle_queue_down(QName, S)
end, PState0, Actions).
handle_queue_down(QName, PState0 = #proc_state{client_id = ClientId}) ->
%% Classic queue is down.
case rabbit_amqqueue:lookup(QName) of
{ok, Q} ->
case rabbit_mqtt_util:qos_from_queue_name(QName, ClientId) of
no_consuming_queue ->
PState0;
QoS ->
%% Consuming classic queue is down.
%% Let's try to re-consume: HA failover for classic mirrored queues.
case consume(Q, QoS, PState0) of
{ok, PState} ->
PState;
{error, _Reason} ->
rabbit_log:info("Terminating MQTT connection because consuming ~s is down.",
[rabbit_misc:rs(QName)]),
throw(consuming_queue_down)
end
end;
{error, not_found} ->
PState0
end.
deliver_to_client(Msgs, Ack, PState) ->
lists:foldl(fun(Msg, S) ->
deliver_one_to_client(Msg, Ack, S)
end, PState, Msgs).
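%% Delivers a single message to the MQTT client. The effective QoS is the minimum
%% of the publisher QoS (taken from the x-mqtt-publish-qos header) and the
%% subscriber QoS (QoS 1 when an ack is required, otherwise QoS 0); for example,
%% a message published at QoS 1 and consumed via a QoS 0 subscription is sent
%% downgraded to QoS 0 and is not acknowledged.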
deliver_one_to_client(Msg = {QNameOrType, QPid, QMsgId, _Redelivered,
#basic_message{content = #content{properties = #'P_basic'{headers = Headers}}}},
AckRequired, PState0) ->
PublisherQoS = case rabbit_mqtt_util:table_lookup(Headers, <<"x-mqtt-publish-qos">>) of
{byte, QoS0} ->
QoS0;
undefined ->
%% non-MQTT publishes are assumed to be QoS 1 regardless of delivery_mode
?QOS_1
end,
SubscriberQoS = case AckRequired of
true ->
?QOS_1;
false ->
?QOS_0
end,
QoS = effective_qos(PublisherQoS, SubscriberQoS),
PState1 = maybe_publish_to_client(Msg, QoS, PState0),
PState = maybe_ack(AckRequired, QoS, QNameOrType, QMsgId, PState1),
%%TODO GC
% case GCThreshold of
% undefined -> ok;
% _ -> rabbit_basic:maybe_gc_large_msg(Content, GCThreshold)
% end,
ok = maybe_notify_sent(QNameOrType, QPid, PState),
PState.
-spec effective_qos(qos(), qos()) -> qos().
effective_qos(PublisherQoS, SubscriberQoS) ->
%% "The QoS of Application Messages sent in response to a Subscription MUST be the minimum
%% of the QoS of the originally published message and the Maximum QoS granted by the Server
%% [MQTT-3.8.4-8]."
erlang:min(PublisherQoS, SubscriberQoS).
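%% For example, effective_qos(?QOS_1, ?QOS_0) and effective_qos(?QOS_0, ?QOS_1)
%% both evaluate to ?QOS_0: the delivery is downgraded whenever either side is QoS 0.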
maybe_publish_to_client({_, _, _, _Redelivered = true, _}, ?QOS_0, PState) ->
%% Do not redeliver to an MQTT subscriber that receives messages at most once (QoS 0).
PState;
maybe_publish_to_client(
{_QName, _QPid, QMsgId, Redelivered,
#basic_message{
routing_keys = [RoutingKey | _CcRoutes],
content = #content{payload_fragments_rev = FragmentsRev}}},
QoS, PState0 = #proc_state{amqp2mqtt_fun = Amqp2MqttFun,
send_fun = SendFun}) ->
{PacketId, PState} = queue_message_id_to_packet_id(QMsgId, QoS, PState0),
%%TODO support iolists when sending to client
Payload = list_to_binary(lists:reverse(FragmentsRev)),
Frame =
#mqtt_frame{
fixed = #mqtt_frame_fixed{
type = ?PUBLISH,
qos = QoS,
%% "The value of the DUP flag from an incoming PUBLISH packet is not
%% propagated when the PUBLISH Packet is sent to subscribers by the Server.
%% The DUP flag in the outgoing PUBLISH packet is set independently to the
%% incoming PUBLISH packet, its value MUST be determined solely by whether
%% the outgoing PUBLISH packet is a retransmission [MQTT-3.3.1-3]."
%% Therefore, we do not consider header value <<"x-mqtt-dup">> here.
dup = Redelivered},
variable = #mqtt_frame_publish{
message_id = PacketId,
topic_name = Amqp2MqttFun(RoutingKey)},
payload = Payload},
SendFun(Frame, PState),
PState.
queue_message_id_to_packet_id(_, ?QOS_0, PState) ->
%% "A PUBLISH packet MUST NOT contain a Packet Identifier if its QoS value is set to 0 [MQTT-2.2.1-2]."
{undefined, PState};
queue_message_id_to_packet_id(QMsgId, ?QOS_1, #proc_state{packet_id = PktId,
unacked_server_pubs = U} = PState) ->
{PktId, PState#proc_state{packet_id = increment_packet_id(PktId),
unacked_server_pubs = maps:put(PktId, QMsgId, U)}}.
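%% Packet identifiers are non-zero 16-bit integers [MQTT-2.3.1-1], so wrap
%% around from 16#ffff back to 1 and never emit 0.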
-spec increment_packet_id(packet_id()) -> packet_id().
increment_packet_id(Id)
when Id >= 16#ffff ->
1;
increment_packet_id(Id) ->
Id + 1.
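%% If the queue expects an acknowledgement but the effective QoS towards the
%% client is 0, the client will never send a PUBACK, so settle with the queue
%% right away. For QoS 1 deliveries the packet id is kept in unacked_server_pubs
%% and settled once the client's PUBACK is handled.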
maybe_ack(_AckRequired = true, ?QOS_0, QName, QMsgId,
PState = #proc_state{queue_states = QStates0}) ->
case rabbit_queue_type:settle(QName, complete, ?CONSUMER_TAG, [QMsgId], QStates0) of
{ok, QStates, [] = _Actions} ->
% incr_queue_stats(QRef, MsgIds, State),
%%TODO handle actions
PState#proc_state{queue_states = QStates};
{protocol_error, _ErrorType, _Reason, _ReasonArgs} = Err ->
%%TODO handle error
throw(Err)
end;
maybe_ack(_, _, _, _, PState) ->
PState.
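%% Only classic queues use the notify_sent/2 flow control mechanism; the
%% rabbit_mqtt_qos0_queue type has no queue process to notify, and other queue
%% types do not use this callback.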
maybe_notify_sent(rabbit_mqtt_qos0_queue, _, _) ->
ok;
maybe_notify_sent(QName, QPid, #proc_state{queue_states = QStates}) ->
case rabbit_queue_type:module(QName, QStates) of
{ok, rabbit_classic_queue} ->
rabbit_amqqueue:notify_sent(QPid, self());
_ ->
ok
end.
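%% Publishing requires both write permission on the exchange (resource access)
%% and write permission on the topic (topic access, checked against the routing
%% key). Either refusal is returned to the caller as unauthorized.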
publish_to_queues_with_checks(
TopicName, PublishFun,
#proc_state{exchange = Exchange,
auth_state = #auth_state{user = User,
authz_ctx = AuthzCtx}} = PState) ->
case check_resource_access(User, Exchange, write, AuthzCtx) of
ok ->
case check_topic_access(TopicName, write, PState) of
ok ->
PublishFun();
{error, access_refused} ->
{error, unauthorized, PState}
end;
{error, access_refused} ->
{error, unauthorized, PState}
end.
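%% Resource permission checks are cached in the process dictionary; the cache
%% is capped at ?MAX_PERMISSION_CACHE_SIZE entries with the most recent check
%% at the head.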
check_resource_access(User, Resource, Perm, Context) ->
V = {Resource, Context, Perm},
Cache = case get(permission_cache) of
undefined -> [];
Other -> Other
end,
case lists:member(V, Cache) of
true ->
ok;
false ->
try rabbit_access_control:check_resource_access(User, Resource, Perm, Context) of
ok ->
CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE-1),
put(permission_cache, [V | CacheTail]),
ok
catch exit:{amqp_error, access_refused, Msg, _AmqpMethod} ->
rabbit_log:error("MQTT resource access refused: ~s", [Msg]),
{error, access_refused}
end
end.
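%% Topic permission checks are cached the same way, keyed by topic, username,
%% client id, vhost, exchange and access type.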
check_topic_access(TopicName, Access,
#proc_state{
auth_state = #auth_state{user = User = #user{username = Username},
vhost = VHost,
authz_ctx = AuthzCtx},
exchange = #resource{name = ExchangeBin},
client_id = ClientId,
mqtt2amqp_fun = Mqtt2AmqpFun}) ->
Cache = case get(topic_permission_cache) of
undefined -> [];
Other -> Other
end,
Key = {TopicName, Username, ClientId, VHost, ExchangeBin, Access},
case lists:member(Key, Cache) of
true ->
ok;
false ->
Resource = #resource{virtual_host = VHost,
kind = topic,
name = ExchangeBin},
RoutingKey = Mqtt2AmqpFun(TopicName),
Context = #{routing_key => RoutingKey,
variable_map => AuthzCtx},
try rabbit_access_control:check_topic_access(User, Resource, Access, Context) of
ok ->
CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE - 1),
put(topic_permission_cache, [Key | CacheTail]),
ok
catch
exit:{amqp_error, access_refused, Msg, _AmqpMethod} ->
rabbit_log:error("MQTT topic access refused: ~s", [Msg]),
{error, access_refused}
end
end.
info(protocol, #proc_state{info = #info{proto_human = Val}}) -> Val;
info(host, #proc_state{info = #info{host = Val}}) -> Val;
info(port, #proc_state{info = #info{port = Val}}) -> Val;
info(peer_host, #proc_state{info = #info{peer_host = Val}}) -> Val;
info(peer_port, #proc_state{info = #info{peer_port = Val}}) -> Val;
info(ssl_login_name, #proc_state{ssl_login_name = Val}) -> Val;
info(client_id, #proc_state{client_id = Val}) ->
rabbit_data_coercion:to_binary(Val);
info(vhost, #proc_state{auth_state = #auth_state{vhost = Val}}) -> Val;
info(user, #proc_state{auth_state = #auth_state{username = Val}}) -> Val;
info(clean_sess, #proc_state{clean_sess = Val}) -> Val;
info(will_msg, #proc_state{will_msg = Val}) -> Val;
info(retainer_pid, #proc_state{retainer_pid = Val}) -> Val;
info(exchange, #proc_state{exchange = #resource{name = Val}}) -> Val;
info(prefetch, #proc_state{info = #info{prefetch = Val}}) -> Val;
info(messages_unconfirmed, #proc_state{unacked_client_pubs = Val}) ->
rabbit_mqtt_confirms:size(Val);
info(messages_unacknowledged, #proc_state{unacked_server_pubs = Val}) ->
maps:size(Val);
info(Other, _) -> throw({bad_argument, Other}).
-spec ssl_login_name(rabbit_net:socket()) ->
none | binary().
ssl_login_name(Sock) ->
case rabbit_net:peercert(Sock) of
{ok, C} -> case rabbit_ssl:peer_cert_auth_name(C) of
unsafe -> none;
not_found -> none;
Name -> Name
end;
{error, no_peercert} -> none;
nossl -> none
end.
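%% Condensed view of the processor state used for status and crash reporting.
%% Note that the will message is reduced to a boolean (will_msg_defined),
%% presumably to keep its payload out of logs.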
format_status(#proc_state{queue_states = QState,
proto_ver = ProtoVersion,
has_subs = HasSubs,
unacked_client_pubs = UnackClientPubs,
unacked_server_pubs = UnackSerPubs,
packet_id = PackID,
client_id = ClientID,
clean_sess = CleanSess,
will_msg = WillMsg,
exchange = Exchange,
ssl_login_name = SSLLoginName,
retainer_pid = RetainerPid,
auth_state = AuthState,
peer_addr = PeerAddr,
register_state = RegisterState,
conn_name = ConnName,
info = Info
} = PState) ->
#{queue_states => rabbit_queue_type:format_status(QState),
proto_ver => ProtoVersion,
has_subs => HasSubs,
unacked_client_pubs => UnackClientPubs,
unacked_server_pubs => UnackSerPubs,
packet_id => PackID,
client_id => ClientID,
clean_sess => CleanSess,
will_msg_defined => WillMsg =/= undefined,
exchange => Exchange,
ssl_login_name => SSLLoginName,
retainer_pid => RetainerPid,
auth_state => AuthState,
peer_addr => PeerAddr,
register_state => RegisterState,
conn_name => ConnName,
info => Info,
soft_limit_exceeded => soft_limit_exceeded(PState)}.
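%% True while at least one destination queue has signalled that its soft limit
%% is exceeded; the soft_limit_exceeded field holds the set of such queues.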
soft_limit_exceeded(#proc_state{soft_limit_exceeded = SLE}) ->
not sets:is_empty(SLE).
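%% Protocol level from the CONNECT variable header: 3 corresponds to MQTT 3.1,
%% 4 to MQTT 3.1.1.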
protocol_integer_to_atom(3) ->
?MQTT_PROTO_V3;
protocol_integer_to_atom(4) ->
?MQTT_PROTO_V4.
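%% Global counters: a connection is counted as a publisher from its first
%% publish until termination, and as at most one consumer no matter how many
%% topic filters it subscribes to.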
maybe_increment_publisher(PState = #proc_state{has_published = false,
proto_ver = ProtoVer}) ->
rabbit_global_counters:publisher_created(ProtoVer),
PState#proc_state{has_published = true};
maybe_increment_publisher(PState) ->
PState.
maybe_decrement_publisher(#proc_state{has_published = true,
proto_ver = ProtoVer}) ->
rabbit_global_counters:publisher_deleted(ProtoVer);
maybe_decrement_publisher(_) ->
ok.
%% multiple subscriptions from the same connection count as one consumer
maybe_increment_consumer(#proc_state{has_subs = true, proto_ver = ProtoVer},
#proc_state{has_subs = false}) ->
rabbit_global_counters:consumer_created(ProtoVer);
maybe_increment_consumer(_, _) ->
ok.
maybe_decrement_consumer(#proc_state{has_subs = true,
proto_ver = ProtoVer}) ->
rabbit_global_counters:consumer_deleted(ProtoVer);
maybe_decrement_consumer(_) ->
ok.
%% when there were subscriptions before but there are none anymore
maybe_decrement_consumer(#proc_state{has_subs = false, proto_ver = ProtoVer},
#proc_state{has_subs = true}) ->
rabbit_global_counters:consumer_deleted(ProtoVer);
maybe_decrement_consumer(_, _) ->
ok.