Use 1 instead of 22 Erlang processes per MQTT connection

* Create MQTT connections without proxying via AMQP
* Do authn / authz in rabbitmq_mqtt instead of rabbit_direct:connect/5
* Remove rabbit_heartbeat process and per connection supervisors

Current status:

Creating 10k MQTT connections with clean session succeeds:
./emqtt_bench conn -V 4 -C true -c 10000 -R 500
This commit is contained in:
David Ansari 2022-08-05 09:16:43 +00:00
parent 8e9f0f7627
commit 8710565b2a
10 changed files with 405 additions and 358 deletions

View File

@ -38,7 +38,7 @@
%% Used by TCP-based transports, e.g. STOMP adapter
-export([tcp_listener_addresses/1,
tcp_listener_spec/9, tcp_listener_spec/10,
tcp_listener_spec/9, tcp_listener_spec/10, tcp_listener_spec/11,
ensure_ssl/0, fix_ssl_options/1, poodle_check/1]).
-export([tcp_listener_started/4, tcp_listener_stopped/4]).
@ -206,13 +206,24 @@ tcp_listener_spec(NamePrefix, Address, SocketOpts, Transport, ProtoSup, ProtoOpt
any(), protocol(), non_neg_integer(), non_neg_integer(), label()) ->
supervisor:child_spec().
tcp_listener_spec(NamePrefix, {IPAddress, Port, Family}, SocketOpts,
tcp_listener_spec(NamePrefix, Address, SocketOpts,
Transport, ProtoSup, ProtoOpts, Protocol, NumAcceptors,
ConcurrentConnsSupsCount, Label) ->
Args = [IPAddress, Port, Transport, [Family | SocketOpts], ProtoSup, ProtoOpts,
tcp_listener_spec(NamePrefix, Address, SocketOpts, Transport, ProtoSup, ProtoOpts,
Protocol, NumAcceptors, ConcurrentConnsSupsCount, supervisor, Label).
-spec tcp_listener_spec
(name_prefix(), address(), [gen_tcp:listen_option()], module(), module(),
any(), protocol(), non_neg_integer(), non_neg_integer(), supervisor:worker(), label()) ->
supervisor:child_spec().
tcp_listener_spec(NamePrefix, {IPAddress, Port, Family}, SocketOpts,
Transport, ProtoHandler, ProtoOpts, Protocol, NumAcceptors,
ConcurrentConnsSupsCount, ConnectionType, Label) ->
Args = [IPAddress, Port, Transport, [Family | SocketOpts], ProtoHandler, ProtoOpts,
{?MODULE, tcp_listener_started, [Protocol, SocketOpts]},
{?MODULE, tcp_listener_stopped, [Protocol, SocketOpts]},
NumAcceptors, ConcurrentConnsSupsCount, Label],
NumAcceptors, ConcurrentConnsSupsCount, ConnectionType, Label],
{rabbit_misc:tcp_name(NamePrefix, IPAddress, Port),
{tcp_listener_sup, start_link, Args},
transient, infinity, supervisor, [tcp_listener_sup]}.

View File

@ -16,24 +16,24 @@
-behaviour(supervisor).
-export([start_link/11]).
-export([start_link/12]).
-export([init/1]).
-type mfargs() :: {atom(), atom(), [any()]}.
-spec start_link
(inet:ip_address(), inet:port_number(), module(), [gen_tcp:listen_option()],
module(), any(), mfargs(), mfargs(), integer(), integer(), string()) ->
module(), any(), mfargs(), mfargs(), integer(), integer(), supervisor:worker(), string()) ->
rabbit_types:ok_pid_or_error().
start_link(IPAddress, Port, Transport, SocketOpts, ProtoSup, ProtoOpts, OnStartup, OnShutdown,
ConcurrentAcceptorCount, ConcurrentConnsSups, Label) ->
ConcurrentAcceptorCount, ConcurrentConnsSups, ConnectionType, Label) ->
supervisor:start_link(
?MODULE, {IPAddress, Port, Transport, SocketOpts, ProtoSup, ProtoOpts, OnStartup, OnShutdown,
ConcurrentAcceptorCount, ConcurrentConnsSups, Label}).
ConcurrentAcceptorCount, ConcurrentConnsSups, ConnectionType, Label}).
init({IPAddress, Port, Transport, SocketOpts, ProtoSup, ProtoOpts, OnStartup, OnShutdown,
ConcurrentAcceptorCount, ConcurrentConnsSups, Label}) ->
ConcurrentAcceptorCount, ConcurrentConnsSups, ConnectionType, Label}) ->
{ok, AckTimeout} = application:get_env(rabbit, ssl_handshake_timeout),
MaxConnections = max_conn(rabbit_misc:get_env(rabbit, connection_max, infinity),
ConcurrentConnsSups),
@ -41,7 +41,7 @@ init({IPAddress, Port, Transport, SocketOpts, ProtoSup, ProtoOpts, OnStartup, On
num_acceptors => ConcurrentAcceptorCount,
max_connections => MaxConnections,
handshake_timeout => AckTimeout,
connection_type => supervisor,
connection_type => ConnectionType,
socket_opts => [{ip, IPAddress},
{port, Port} |
SocketOpts],

View File

@ -5,9 +5,12 @@
%% Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
%%
-record(machine_state, {client_ids = #{},
pids = #{},
%% add acouple of fields for future extensibility
reserved_1,
reserved_2}).
-record(machine_state, {
%% client ID to connection PID
client_ids = #{},
%% connection PID to list of client IDs
pids = #{},
%% add acouple of fields for future extensibility
reserved_1,
reserved_2}).

View File

@ -14,13 +14,17 @@
deferred_recv,
received_connect_frame,
connection_state,
keepalive,
keepalive_sup,
conserve,
parse_state,
proc_state,
connection,
stats_timer }).
stats_timer,
keepalive }).
-record(keepalive, {timer :: reference(),
interval_ms :: pos_integer(),
recv_oct :: non_neg_integer(),
received :: boolean()}).
%% processor state
-record(proc_state, { socket,
@ -36,7 +40,6 @@
channels,
connection,
exchange,
adapter_info,
ssl_login_name,
%% Retained messages handler. See rabbit_mqtt_retainer_sup
%% and rabbit_mqtt_retainer.
@ -46,11 +49,20 @@
peer_addr,
mqtt2amqp_fun,
amqp2mqtt_fun,
register_state }).
register_state,
info}).
-record(auth_state, {username,
user,
vhost}).
vhost,
authz_ctx}).
-record(info, {prefetch,
host,
port,
peer_host,
peer_port,
protocol}).
%% does not include vhost: it is used in
%% the table name

View File

@ -9,9 +9,9 @@
-export([start/0, node_id/0, server_id/0, all_node_ids/0, leave/1, trigger_election/0]).
-define(ID_NAME, mqtt_node).
-define(START_TIMEOUT, 100000).
-define(START_TIMEOUT, 100_000).
-define(RETRY_INTERVAL, 5000).
-define(RA_OPERATION_TIMEOUT, 60000).
-define(RA_OPERATION_TIMEOUT, 60_000).
-define(RA_SYSTEM, coordination).
node_id() ->
@ -42,6 +42,7 @@ start(Delay, AttemptsLeft) ->
undefined ->
case Nodes of
[] ->
%%TODO use global lock instead
%% Since cluster members are not known ahead of time and initial boot can be happening in parallel,
%% we wait and check a few times (up to a few seconds) to see if we can discover any peers to
%% join before forming a cluster. This reduces the probability of N independent clusters being

View File

@ -1,64 +0,0 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(rabbit_mqtt_connection_sup).
-behaviour(supervisor).
-behaviour(ranch_protocol).
-include_lib("rabbit_common/include/rabbit.hrl").
-export([start_link/3, start_keepalive_link/0]).
-export([init/1]).
%%----------------------------------------------------------------------------
start_link(Ref, _Transport, []) ->
{ok, SupPid} = supervisor:start_link(?MODULE, []),
{ok, KeepaliveSup} = supervisor:start_child(
SupPid,
#{
id => rabbit_mqtt_keepalive_sup,
start => {rabbit_mqtt_connection_sup, start_keepalive_link, []},
restart => transient,
significant => true,
shutdown => infinity,
type => supervisor,
modules => [rabbit_keepalive_sup]
}
),
{ok, ReaderPid} = supervisor:start_child(
SupPid,
#{
id => rabbit_mqtt_reader,
start => {rabbit_mqtt_reader, start_link, [KeepaliveSup, Ref]},
restart => transient,
significant => true,
shutdown => ?WORKER_WAIT,
type => worker,
modules => [rabbit_mqtt_reader]
}
),
{ok, SupPid, ReaderPid}.
start_keepalive_link() ->
supervisor:start_link(?MODULE, []).
%%----------------------------------------------------------------------------
init([]) ->
{ok,
{
#{
strategy => one_for_all,
intensity => 0,
period => 1,
auto_shutdown => any_significant
},
[]
}}.

View File

@ -7,14 +7,13 @@
-module(rabbit_mqtt_processor).
-export([info/2, initial_state/2, initial_state/5,
-export([info/2, initial_state/2, initial_state/4,
process_frame/2, amqp_pub/2, amqp_callback/2, send_will/1,
close_connection/1, handle_pre_hibernate/0,
handle_ra_event/2]).
%% for testing purposes
-export([get_vhost_username/1, get_vhost/3, get_vhost_from_user_mapping/2,
add_client_id_to_adapter_info/2, maybe_quorum/3]).
-export([get_vhost_username/1, get_vhost/3, get_vhost_from_user_mapping/2, maybe_quorum/3]).
-include_lib("amqp_client/include/amqp_client.hrl").
-include("rabbit_mqtt_frame.hrl").
@ -28,23 +27,13 @@
initial_state(Socket, SSLLoginName) ->
RealSocket = rabbit_net:unwrap_socket(Socket),
{ok, {PeerAddr, _PeerPort}} = rabbit_net:peername(RealSocket),
initial_state(RealSocket, SSLLoginName,
adapter_info(Socket, 'MQTT'),
fun serialise_and_send_to_client/2, PeerAddr).
initial_state(RealSocket, SSLLoginName, fun serialise_and_send_to_client/2, PeerAddr).
initial_state(Socket, SSLLoginName,
AdapterInfo0 = #amqp_adapter_info{additional_info = Extra},
SendFun, PeerAddr) ->
initial_state(Socket, SSLLoginName, SendFun, PeerAddr) ->
{ok, {mqtt2amqp_fun, M2A}, {amqp2mqtt_fun, A2M}} =
rabbit_mqtt_util:get_topic_translation_funs(),
%% MQTT connections use exactly one channel. The frame max is not
%% applicable and there is no way to know what client is used.
AdapterInfo = AdapterInfo0#amqp_adapter_info{additional_info = [
{channels, 1},
{channel_max, 1},
{frame_max, 0},
{client_properties,
[{<<"product">>, longstr, <<"MQTT client">>}]} | Extra]},
#proc_state{ unacked_pubs = gb_trees:empty(),
awaiting_ack = gb_trees:empty(),
message_id = 1,
@ -53,7 +42,6 @@ initial_state(Socket, SSLLoginName,
channels = {undefined, undefined},
exchange = rabbit_mqtt_util:env(exchange),
socket = Socket,
adapter_info = AdapterInfo,
ssl_login_name = SSLLoginName,
send_fun = SendFun,
peer_addr = PeerAddr,
@ -61,7 +49,7 @@ initial_state(Socket, SSLLoginName,
amqp2mqtt_fun = A2M}.
process_frame(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }},
PState = #proc_state{ connection = undefined } )
PState = #proc_state{ auth_state = undefined } )
when Type =/= ?CONNECT ->
{error, connect_expected, PState};
process_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }},
@ -80,108 +68,87 @@ process_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }},
{error, access_refused, PState}
end.
add_client_id_to_adapter_info(ClientId, #amqp_adapter_info{additional_info = AdditionalInfo0} = AdapterInfo) ->
AdditionalInfo1 = [{variable_map, #{<<"client_id">> => ClientId}}
| AdditionalInfo0],
ClientProperties = proplists:get_value(client_properties, AdditionalInfo1, [])
++ [{client_id, longstr, ClientId}],
AdditionalInfo2 = case lists:keysearch(client_properties, 1, AdditionalInfo1) of
{value, _} ->
lists:keyreplace(client_properties,
1,
AdditionalInfo1,
{client_properties, ClientProperties});
false ->
[{client_properties, ClientProperties} | AdditionalInfo1]
end,
AdapterInfo#amqp_adapter_info{additional_info = AdditionalInfo2}.
process_connect(#mqtt_frame{ variable = #mqtt_frame_connect{
username = Username,
password = Password,
proto_ver = ProtoVersion,
clean_sess = CleanSess,
client_id = ClientId0,
keep_alive = Keepalive} = Var},
PState0 = #proc_state{ ssl_login_name = SSLLoginName,
send_fun = SendFun,
adapter_info = AdapterInfo,
peer_addr = Addr}) ->
process_connect(#mqtt_frame{variable = #mqtt_frame_connect{
username = Username,
password = Password,
proto_ver = ProtoVersion,
clean_sess = CleanSess,
client_id = ClientId0,
keep_alive = Keepalive} = Var},
PState0 = #proc_state{ssl_login_name = SSLLoginName,
socket = Socket,
send_fun = SendFun,
peer_addr = Addr}) ->
ClientId = case ClientId0 of
[] -> rabbit_mqtt_util:gen_client_id();
[_|_] -> ClientId0
end,
rabbit_log_connection:debug("Received a CONNECT, client ID: ~tp (expanded to ~tp), username: ~tp, "
"clean session: ~tp, protocol version: ~tp, keepalive: ~tp",
[ClientId0, ClientId, Username, CleanSess, ProtoVersion, Keepalive]),
AdapterInfo1 = add_client_id_to_adapter_info(rabbit_data_coercion:to_binary(ClientId), AdapterInfo),
PState1 = PState0#proc_state{adapter_info = AdapterInfo1},
rabbit_log_connection:debug("Received a CONNECT, client ID: ~p (expanded to ~p), username: ~p, "
"clean session: ~p, protocol version: ~p, keepalive: ~p",
[ClientId0, ClientId, Username, CleanSess, ProtoVersion, Keepalive]),
% AdapterInfo1 = add_client_id_to_adapter_info(rabbit_data_coercion:to_binary(ClientId), AdapterInfo),
% PState1 = PState0#proc_state{adapter_info = AdapterInfo1},
Ip = list_to_binary(inet:ntoa(Addr)),
{Return, PState5} =
case {lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)),
ClientId0 =:= [] andalso CleanSess =:= false} of
{false, _} ->
{?CONNACK_PROTO_VER, PState1};
{_, true} ->
{?CONNACK_INVALID_ID, PState1};
_ ->
case creds(Username, Password, SSLLoginName) of
nocreds ->
rabbit_core_metrics:auth_attempt_failed(Ip, <<>>, mqtt),
rabbit_log_connection:error("MQTT login failed: no credentials provided"),
{?CONNACK_CREDENTIALS, PState1};
{invalid_creds, {undefined, Pass}} when is_list(Pass) ->
rabbit_core_metrics:auth_attempt_failed(Ip, <<>>, mqtt),
rabbit_log_connection:error("MQTT login failed: no username is provided"),
{?CONNACK_CREDENTIALS, PState1};
{invalid_creds, {User, undefined}} when is_list(User) ->
rabbit_core_metrics:auth_attempt_failed(Ip, User, mqtt),
rabbit_log_connection:error("MQTT login failed for user '~tp': no password provided", [User]),
{?CONNACK_CREDENTIALS, PState1};
{UserBin, PassBin} ->
case process_login(UserBin, PassBin, ProtoVersion, PState1) of
connack_dup_auth ->
maybe_clean_sess(PState1);
{?CONNACK_ACCEPT, Conn, VHost, AState} ->
case rabbit_mqtt_collector:register(ClientId, self()) of
{ok, Corr} ->
RetainerPid = rabbit_mqtt_retainer_sup:child_for_vhost(VHost),
link(Conn),
{ok, Ch} = amqp_connection:open_channel(Conn),
link(Ch),
amqp_channel:enable_delivery_flow_control(Ch),
Prefetch = rabbit_mqtt_util:env(prefetch),
#'basic.qos_ok'{} = amqp_channel:call(Ch,
#'basic.qos'{prefetch_count = Prefetch}),
{Return, PState2} =
case {lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)),
ClientId0 =:= [] andalso CleanSess =:= false} of
{false, _} ->
{?CONNACK_PROTO_VER, PState0};
{_, true} ->
{?CONNACK_INVALID_ID, PState0};
_ ->
case creds(Username, Password, SSLLoginName) of
nocreds ->
rabbit_core_metrics:auth_attempt_failed(Ip, <<>>, mqtt),
rabbit_log_connection:error("MQTT login failed: no credentials provided"),
{?CONNACK_CREDENTIALS, PState0};
{invalid_creds, {undefined, Pass}} when is_list(Pass) ->
rabbit_core_metrics:auth_attempt_failed(Ip, <<>>, mqtt),
rabbit_log_connection:error("MQTT login failed: no username is provided"),
{?CONNACK_CREDENTIALS, PState0};
{invalid_creds, {User, undefined}} when is_list(User) ->
rabbit_core_metrics:auth_attempt_failed(Ip, User, mqtt),
rabbit_log_connection:error("MQTT login failed for user '~p': no password provided", [User]),
{?CONNACK_CREDENTIALS, PState0};
{UserBin, PassBin} ->
case process_login(UserBin, PassBin, ClientId, ProtoVersion, PState0) of
connack_dup_auth ->
maybe_clean_sess(PState0);
{?CONNACK_ACCEPT, VHost, ProtoVersion, AState} ->
case rabbit_mqtt_collector:register(ClientId, self()) of
{ok, Corr} ->
RetainerPid = rabbit_mqtt_retainer_sup:child_for_vhost(VHost),
Prefetch = rabbit_mqtt_util:env(prefetch),
rabbit_mqtt_reader:start_keepalive(self(), Keepalive),
PState3 = PState1#proc_state{
{ok, {PeerHost, PeerPort, Host, Port}} = rabbit_net:socket_ends(Socket, inbound),
PState1 = PState0#proc_state{
will_msg = make_will_msg(Var),
clean_sess = CleanSess,
channels = {Ch, undefined},
connection = Conn,
client_id = ClientId,
retainer_pid = RetainerPid,
auth_state = AState,
register_state = {pending, Corr}},
maybe_clean_sess(PState3);
%% e.g. this node was removed from the MQTT cluster members
{error, _} = Err ->
register_state = {pending, Corr},
info = #info{prefetch = Prefetch,
peer_host = PeerHost,
peer_port = PeerPort,
host = Host,
port = Port,
protocol = {'MQTT', human_readable_mqtt_version(ProtoVersion)}}},
maybe_clean_sess(PState1);
%% e.g. this node was removed from the MQTT cluster members
{error, _} = Err ->
rabbit_log_connection:error("MQTT cannot accept a connection: "
"client ID tracker is unavailable: ~tp", [Err]),
%% ignore all exceptions, we are shutting down
catch amqp_connection:close(Conn),
{?CONNACK_SERVER, PState1};
{timeout, _} ->
"client ID tracker is unavailable: ~p", [Err]),
{?CONNACK_SERVER, PState0};
{timeout, _} ->
rabbit_log_connection:error("MQTT cannot accept a connection: "
"client ID registration timed out"),
%% ignore all exceptions, we are shutting down
catch amqp_connection:close(Conn),
{?CONNACK_SERVER, PState1}
end;
ConnAck -> {ConnAck, PState1}
end
end
end,
{?CONNACK_SERVER, PState0}
end;
ConnAck -> {ConnAck, PState0}
end
end
end,
{ReturnCode, SessionPresent} = case Return of
{?CONNACK_ACCEPT, Bool} -> {?CONNACK_ACCEPT, Bool};
Other -> {Other, false}
@ -190,14 +157,14 @@ process_connect(#mqtt_frame{ variable = #mqtt_frame_connect{
variable = #mqtt_frame_connack{
session_present = SessionPresent,
return_code = ReturnCode}},
PState5),
PState2),
case ReturnCode of
?CONNACK_ACCEPT -> {ok, PState5};
?CONNACK_CREDENTIALS -> {error, unauthenticated, PState5};
?CONNACK_AUTH -> {error, unauthorized, PState5};
?CONNACK_SERVER -> {error, unavailable, PState5};
?CONNACK_INVALID_ID -> {error, invalid_client_id, PState5};
?CONNACK_PROTO_VER -> {error, unsupported_protocol_version, PState5}
?CONNACK_ACCEPT -> {ok, PState2};
?CONNACK_CREDENTIALS -> {error, unauthenticated, PState2};
?CONNACK_AUTH -> {error, unauthorized, PState2};
?CONNACK_SERVER -> {error, unavailable, PState2};
?CONNACK_INVALID_ID -> {error, invalid_client_id, PState2};
?CONNACK_PROTO_VER -> {error, unsupported_protocol_version, PState2}
end.
process_request(?CONNECT, Frame, PState = #proc_state{socket = Socket}) ->
@ -535,28 +502,34 @@ maybe_clean_sess(PState = #proc_state { clean_sess = false,
erlang:raise(C, E, S)
end
end;
maybe_clean_sess(PState = #proc_state { clean_sess = true,
connection = Conn,
auth_state = #auth_state{vhost = VHost},
client_id = ClientId }) ->
{_, Queue} = rabbit_mqtt_util:subcription_queue_name(ClientId),
{ok, Channel} = amqp_connection:open_channel(Conn),
case session_present(VHost, ClientId) of
maybe_clean_sess(PState = #proc_state {clean_sess = true,
client_id = ClientId,
auth_state = #auth_state{user = User,
username = Username,
vhost = VHost,
authz_ctx = AuthzCtx}}) ->
{_, QueueName} = rabbit_mqtt_util:subcription_queue_name(ClientId),
Queue = rabbit_misc:r(VHost, queue, QueueName),
case rabbit_amqqueue:exists(Queue) of
false ->
{{?CONNACK_ACCEPT, false}, PState};
true ->
try amqp_channel:call(Channel, #'queue.delete'{ queue = Queue }) of
#'queue.delete_ok'{} -> {{?CONNACK_ACCEPT, false}, PState}
catch
exit:({{shutdown, {server_initiated_close, 403, _}}, _}) ->
%% Connection is not yet propagated to #proc_state{}, let's close it here
catch amqp_connection:close(Conn),
rabbit_log_connection:error("MQTT cannot start a clean session: "
"`configure` permission missing for queue `~tp`", [Queue]),
{?CONNACK_SERVER, PState}
after
catch amqp_channel:close(Channel)
end
ok = rabbit_access_control:check_resource_access(User, Queue, configure, AuthzCtx),
rabbit_amqqueue:with(
Queue,
fun (Q) ->
rabbit_queue_type:delete(Q, false, false, Username)
end,
fun (not_found) ->
ok;
({absent, Q, crashed}) ->
rabbit_classic_queue:delete_crashed(Q, Username);
({absent, Q, stopped}) ->
rabbit_classic_queue:delete_crashed(Q, Username);
({absent, _Q, _Reason}) ->
ok
end),
{{?CONNACK_ACCEPT, false}, PState}
end.
session_present(VHost, ClientId) ->
@ -576,83 +549,134 @@ make_will_msg(#mqtt_frame_connect{ will_retain = Retain,
dup = false,
payload = Msg }.
process_login(_UserBin, _PassBin, _ProtoVersion,
#proc_state{channels = {Channel, _},
peer_addr = Addr,
process_login(_UserBin, _PassBin, _ClientId, _ProtoVersion,
#proc_state{peer_addr = Addr,
auth_state = #auth_state{username = Username,
vhost = VHost}}) when is_pid(Channel) ->
user = User,
vhost = VHost
}})
when Username =/= undefined, User =/= undefined, VHost =/= underfined ->
UsernameStr = rabbit_data_coercion:to_list(Username),
VHostStr = rabbit_data_coercion:to_list(VHost),
rabbit_core_metrics:auth_attempt_failed(list_to_binary(inet:ntoa(Addr)), Username, mqtt),
rabbit_log_connection:warning("MQTT detected duplicate connect/login attempt for user ~tp, vhost ~tp",
[UsernameStr, VHostStr]),
connack_dup_auth;
process_login(UserBin, PassBin, ProtoVersion,
#proc_state{channels = {undefined, undefined},
socket = Sock,
adapter_info = AdapterInfo,
process_login(UserBin, PassBin, ClientId0, ProtoVersion,
#proc_state{socket = Sock,
ssl_login_name = SslLoginName,
peer_addr = Addr}) ->
{ok, {_, _, _, ToPort}} = rabbit_net:socket_ends(Sock, inbound),
{VHostPickedUsing, {VHost, UsernameBin}} = get_vhost(UserBin, SslLoginName, ToPort),
peer_addr = Addr,
auth_state = undefined}) ->
{ok, {PeerHost, PeerPort, Host, Port}} = rabbit_net:socket_ends(Sock, inbound),
{VHostPickedUsing, {VHost, UsernameBin}} = get_vhost(UserBin, SslLoginName, Port),
rabbit_log_connection:debug(
"MQTT vhost picked using ~ts",
[human_readable_vhost_lookup_strategy(VHostPickedUsing)]),
"MQTT vhost picked using ~s",
[human_readable_vhost_lookup_strategy(VHostPickedUsing)]),
RemoteAddress = list_to_binary(inet:ntoa(Addr)),
case rabbit_vhost:exists(VHost) of
true ->
case amqp_connection:start(#amqp_params_direct{
username = UsernameBin,
password = PassBin,
virtual_host = VHost,
adapter_info = set_proto_version(AdapterInfo, ProtoVersion)}) of
{ok, Connection} ->
case rabbit_access_control:check_user_loopback(UsernameBin, Addr) of
ok ->
rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, UsernameBin,
mqtt),
[{internal_user, InternalUser}] = amqp_connection:info(
Connection, [internal_user]),
{?CONNACK_ACCEPT, Connection, VHost,
#auth_state{user = InternalUser,
username = UsernameBin,
vhost = VHost}};
not_allowed ->
rabbit_core_metrics:auth_attempt_failed(RemoteAddress, UsernameBin,
mqtt),
amqp_connection:close(Connection),
rabbit_log_connection:warning(
"MQTT login failed for user ~ts: "
"this user's access is restricted to localhost",
[binary_to_list(UsernameBin)]),
case rabbit_vhost_limit:is_over_connection_limit(VHost) of
false ->
case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of
true ->
ClientId = rabbit_data_coercion:to_binary(ClientId0),
case rabbit_access_control:check_user_login(
UsernameBin,
[{password, PassBin}, {vhost, VHost}, {client_id, ClientId}]) of
{ok, User = #user{username = Username}} ->
notify_auth_result(Username,
user_authentication_success,
[]),
case rabbit_auth_backend_internal:is_over_connection_limit(Username) of
false ->
AuthzCtx = #{<<"client_id">> => ClientId},
try rabbit_access_control:check_vhost_access(User,
VHost,
{ip, Addr},
AuthzCtx) of
ok ->
case rabbit_access_control:check_user_loopback(UsernameBin, Addr) of
ok ->
rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, UsernameBin,
mqtt),
Infos = [{node, node()},
{host, Host},
{port, Port},
{peer_host, PeerHost},
{peer_port, PeerPort},
{user, UsernameBin},
{vhost, VHost}],
rabbit_core_metrics:connection_created(self(), Infos),
rabbit_event:notify(connection_created, Infos),
{?CONNACK_ACCEPT, VHost, ProtoVersion,
#auth_state{user = User,
username = UsernameBin,
vhost = VHost,
authz_ctx = AuthzCtx}};
not_allowed ->
rabbit_core_metrics:auth_attempt_failed(RemoteAddress, UsernameBin,
mqtt),
rabbit_log_connection:warning(
"MQTT login failed for user ~s: "
"this user's access is restricted to localhost",
[binary_to_list(UsernameBin)]),
?CONNACK_AUTH
end
catch exit:#amqp_error{name = not_allowed} ->
rabbit_log_connection:error(
"Error on MQTT connection ~p~n"
"access refused for user '~s'",
[self(), Username]),
?CONNACK_AUTH
end;
{true, Limit} ->
rabbit_log_connection:error(
"Error on MQTT connection ~p~n"
"access refused for user '~s': "
"user connection limit (~p) is reached",
[self(), Username, Limit]),
?CONNACK_AUTH
end;
{refused, Username, Msg, Args} ->
rabbit_log_connection:error(
"Error on MQTT connection ~p~n"
"access refused for user '~s' in vhost '~s' "
++ Msg,
[self(), Username, VHost] ++ Args),
notify_auth_result(Username,
user_authentication_failure,
[{error, rabbit_misc:format(Msg, Args)}]),
?CONNACK_CREDENTIALS
end;
false ->
rabbit_log_connection:error(
"Error on MQTT connection ~p~n"
"access refused for user '~s': "
"vhost is down",
[self(), UsernameBin, VHost]),
?CONNACK_AUTH
end;
{error, {auth_failure, Explanation}} ->
rabbit_core_metrics:auth_attempt_failed(RemoteAddress, UsernameBin, mqtt),
rabbit_log_connection:error("MQTT login failed for user '~ts', authentication failed: ~ts",
[binary_to_list(UserBin), Explanation]),
?CONNACK_CREDENTIALS;
{error, access_refused} ->
rabbit_core_metrics:auth_attempt_failed(RemoteAddress, UsernameBin, mqtt),
rabbit_log_connection:warning("MQTT login failed for user '~ts': "
"virtual host access not allowed",
[binary_to_list(UserBin)]),
?CONNACK_AUTH;
{error, not_allowed} ->
rabbit_core_metrics:auth_attempt_failed(RemoteAddress, UsernameBin, mqtt),
%% when vhost allowed for TLS connection
rabbit_log_connection:warning("MQTT login failed for user '~ts': "
"virtual host access not allowed",
[binary_to_list(UserBin)]),
{true, Limit} ->
rabbit_log_connection:error(
"Error on MQTT connection ~p~n"
"access to vhost '~s' refused for user '~s': "
"vhost connection limit (~p) is reached",
[self(), VHost, UsernameBin, Limit]),
?CONNACK_AUTH
end;
false ->
rabbit_core_metrics:auth_attempt_failed(RemoteAddress, UsernameBin, mqtt),
rabbit_log_connection:error("MQTT login failed for user '~ts': virtual host '~ts' does not exist",
[UserBin, VHost]),
rabbit_log_connection:error("MQTT login failed for user '~s': virtual host '~s' does not exist",
[UserBin, VHost]),
?CONNACK_CREDENTIALS
end.
notify_auth_result(Username, AuthResult, ExtraProps) ->
EventProps = [{connection_type, mqtt},
{name, case Username of none -> ''; _ -> Username end}] ++
ExtraProps,
rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']).
get_vhost(UserBin, none, Port) ->
get_vhost_no_ssl(UserBin, Port);
get_vhost(UserBin, undefined, Port) ->
@ -951,12 +975,9 @@ amqp_pub(#mqtt_msg{ qos = Qos,
PState #proc_state{ unacked_pubs = UnackedPubs1,
awaiting_seqno = SeqNo1 }.
adapter_info(Sock, ProtoName) ->
amqp_connection:socket_adapter_info(Sock, {ProtoName, "N/A"}).
set_proto_version(AdapterInfo = #amqp_adapter_info{protocol = {Proto, _}}, Vsn) ->
AdapterInfo#amqp_adapter_info{protocol = {Proto,
human_readable_mqtt_version(Vsn)}}.
% set_proto_version(AdapterInfo = #amqp_adapter_info{protocol = {Proto, _}}, Vsn) ->
% AdapterInfo#amqp_adapter_info{protocol = {Proto,
% human_readable_mqtt_version(Vsn)}}.
human_readable_mqtt_version(3) ->
"3.1.0";
@ -1085,36 +1106,35 @@ info(clean_sess, #proc_state{clean_sess = Val}) -> Val;
info(will_msg, #proc_state{will_msg = Val}) -> Val;
info(channels, #proc_state{channels = Val}) -> Val;
info(exchange, #proc_state{exchange = Val}) -> Val;
info(adapter_info, #proc_state{adapter_info = Val}) -> Val;
info(ssl_login_name, #proc_state{ssl_login_name = Val}) -> Val;
info(retainer_pid, #proc_state{retainer_pid = Val}) -> Val;
info(user, #proc_state{auth_state = #auth_state{username = Val}}) -> Val;
info(vhost, #proc_state{auth_state = #auth_state{vhost = Val}}) -> Val;
info(host, #proc_state{adapter_info = #amqp_adapter_info{host = Val}}) -> Val;
info(port, #proc_state{adapter_info = #amqp_adapter_info{port = Val}}) -> Val;
info(peer_host, #proc_state{adapter_info = #amqp_adapter_info{peer_host = Val}}) -> Val;
info(peer_port, #proc_state{adapter_info = #amqp_adapter_info{peer_port = Val}}) -> Val;
info(protocol, #proc_state{adapter_info = #amqp_adapter_info{protocol = Val}}) ->
info(host, #proc_state{info = #info{host = Val}}) -> Val;
info(port, #proc_state{info = #info{port = Val}}) -> Val;
info(peer_host, #proc_state{info = #info{peer_host = Val}}) -> Val;
info(peer_port, #proc_state{info = #info{peer_port = Val}}) -> Val;
info(protocol, #proc_state{info = #info{protocol = Val}}) ->
case Val of
{Proto, Version} -> {Proto, rabbit_data_coercion:to_binary(Version)};
Other -> Other
end;
info(channels, PState) -> additional_info(channels, PState);
info(channel_max, PState) -> additional_info(channel_max, PState);
info(frame_max, PState) -> additional_info(frame_max, PState);
info(client_properties, PState) -> additional_info(client_properties, PState);
info(ssl, PState) -> additional_info(ssl, PState);
info(ssl_protocol, PState) -> additional_info(ssl_protocol, PState);
info(ssl_key_exchange, PState) -> additional_info(ssl_key_exchange, PState);
info(ssl_cipher, PState) -> additional_info(ssl_cipher, PState);
info(ssl_hash, PState) -> additional_info(ssl_hash, PState);
% info(channels, PState) -> additional_info(channels, PState);
% info(channel_max, PState) -> additional_info(channel_max, PState);
% info(frame_max, PState) -> additional_info(frame_max, PState);
% info(client_properties, PState) -> additional_info(client_properties, PState);
% info(ssl, PState) -> additional_info(ssl, PState);
% info(ssl_protocol, PState) -> additional_info(ssl_protocol, PState);
% info(ssl_key_exchange, PState) -> additional_info(ssl_key_exchange, PState);
% info(ssl_cipher, PState) -> additional_info(ssl_cipher, PState);
% info(ssl_hash, PState) -> additional_info(ssl_hash, PState);
info(Other, _) -> throw({bad_argument, Other}).
additional_info(Key,
#proc_state{adapter_info =
#amqp_adapter_info{additional_info = AddInfo}}) ->
proplists:get_value(Key, AddInfo).
% additional_info(Key,
% #proc_state{adapter_info =
% #amqp_adapter_info{additional_info = AddInfo}}) ->
% proplists:get_value(Key, AddInfo).
notify_received(undefined) ->
%% no notification for quorum queues and streams

View File

@ -8,11 +8,15 @@
-module(rabbit_mqtt_reader).
-behaviour(gen_server2).
-behaviour(ranch_protocol).
-export([start_link/2]).
-export([start_link/3]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
code_change/3, terminate/2, handle_pre_hibernate/1]).
%%TODO check where to best 'hibernate' when returning from callback
%%TODO use rabbit_global_counters for MQTT protocol
-export([conserve_resources/3, start_keepalive/2,
close_connection/2]).
@ -27,10 +31,8 @@
%%----------------------------------------------------------------------------
start_link(KeepaliveSup, Ref) ->
Pid = proc_lib:spawn_link(?MODULE, init,
[[KeepaliveSup, Ref]]),
start_link(Ref, _Transport, []) ->
Pid = proc_lib:spawn_link(?MODULE, init, [Ref]),
{ok, Pid}.
conserve_resources(Pid, _, {_, Conserve, _}) ->
@ -48,7 +50,7 @@ close_connection(Pid, Reason) ->
%%----------------------------------------------------------------------------
init([KeepaliveSup, Ref]) ->
init(Ref) ->
process_flag(trap_exit, true),
{ok, Sock} = rabbit_networking:handshake(Ref,
application:get_env(rabbitmq_mqtt, proxy_protocol, false)),
@ -69,8 +71,6 @@ init([KeepaliveSup, Ref]) ->
await_recv = false,
connection_state = running,
received_connect_frame = false,
keepalive = {none, none},
keepalive_sup = KeepaliveSup,
conserve = false,
parse_state = rabbit_mqtt_frame:initial_state(),
proc_state = ProcessorState }), #state.stats_timer),
@ -176,20 +176,64 @@ handle_info({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
maybe_process_deferred_recv(control_throttle(State));
handle_info({start_keepalives, Keepalive},
State = #state { keepalive_sup = KeepaliveSup, socket = Sock }) ->
%% Only the client has the responsibility for sending keepalives
SendFun = fun() -> ok end,
Parent = self(),
ReceiveFun = fun() -> Parent ! keepalive_timeout end,
Heartbeater = rabbit_heartbeat:start(
KeepaliveSup, Sock, 0, SendFun, Keepalive, ReceiveFun),
{noreply, State #state { keepalive = Heartbeater }};
handle_info({start_keepalive, KeepaliveSec},
State = #state{socket = Sock,
keepalive = undefined})
when is_number(KeepaliveSec), KeepaliveSec > 0 ->
case rabbit_net:getstat(Sock, [recv_oct]) of
{ok, [{recv_oct, RecvOct}]} ->
%% "If the Keep Alive value is non-zero and the Server does not receive a Control
%% Packet from the Client within one and a half times the Keep Alive time period,
%% it MUST disconnect the Network Connection to the Client as if the network had
%% failed" [MQTT-3.1.2-24]
%% 0.75 * 2 = 1.5
IntervalMs = timer:seconds(round(0.75 * KeepaliveSec)),
Ref = start_keepalive_timer(#keepalive{interval_ms = IntervalMs}),
{noreply, State#state{keepalive = #keepalive{timer = Ref,
interval_ms = IntervalMs,
recv_oct = RecvOct,
received = true}}};
{error, einval} ->
%% the socket is dead, most likely because the connection is being shut down
{stop, {shutdown, cannot_get_socket_stats}, State};
{error, Reason} ->
{stop, Reason, State}
end;
handle_info(keepalive_timeout, State = #state {conn_name = ConnStr,
proc_state = PState}) ->
rabbit_log_connection:error("closing MQTT connection ~tp (keepalive timeout)", [ConnStr]),
send_will_and_terminate(PState, {shutdown, keepalive_timeout}, State);
handle_info({timeout, Ref, keepalive},
State = #state {socket = Sock,
conn_name = ConnStr,
proc_state = PState,
keepalive = #keepalive{timer = Ref,
recv_oct = SameRecvOct,
received = ReceivedPreviously} = KeepAlive}) ->
case rabbit_net:getstat(Sock, [recv_oct]) of
{ok, [{recv_oct, SameRecvOct}]}
when ReceivedPreviously ->
%% Did not receive from socket for the 1st time.
Ref1 = start_keepalive_timer(KeepAlive),
{noreply,
State#state{keepalive = KeepAlive#keepalive{timer = Ref1,
received = false}},
hibernate};
{ok, [{recv_oct, SameRecvOct}]} ->
%% Did not receive from socket for 2nd time successively.
rabbit_log_connection:error("closing MQTT connection ~tp (keepalive timeout)", [ConnStr]),
send_will_and_terminate(PState, {shutdown, keepalive_timeout}, State);
{ok, [{recv_oct, RecvOct}]} ->
%% Received from socket.
Ref1 = start_keepalive_timer(KeepAlive),
{noreply,
State#state{keepalive = KeepAlive#keepalive{timer = Ref1,
recv_oct = RecvOct,
received = true}},
hibernate};
{error, einval} ->
%% the socket is dead, most likely because the connection is being shut down
{stop, {shutdown, cannot_get_socket_stats}, State};
{error, Reason} ->
{stop, Reason, State}
end;
handle_info(login_timeout, State = #state{received_connect_frame = true}) ->
{noreply, State};
@ -215,6 +259,12 @@ handle_info({ra_event, _From, Evt},
handle_info(Msg, State) ->
{stop, {mqtt_unexpected_msg, Msg}, State}.
start_keepalive_timer(#keepalive{interval_ms = Time}) ->
erlang:start_timer(Time, self(), keepalive).
cancel_keepalive_timer(#keepalive{timer = Ref}) ->
erlang:cancel_timer(Ref, [{async, true}, {info, false}]).
terminate(Reason, State) ->
maybe_emit_stats(State),
do_terminate(Reason, State).
@ -369,7 +419,7 @@ callback_reply(State, {error, Reason, ProcState}) ->
{stop, Reason, pstate(State, ProcState)}.
start_keepalive(_, 0 ) -> ok;
start_keepalive(Pid, Keepalive) -> Pid ! {start_keepalives, Keepalive}.
start_keepalive(Pid, Keepalive) -> Pid ! {start_keepalive, Keepalive}.
pstate(State = #state {}, PState = #proc_state{}) ->
State #state{ proc_state = PState }.
@ -421,17 +471,28 @@ run_socket(State = #state{ socket = Sock }) ->
rabbit_net:setopts(Sock, [{active, once}]),
State#state{ await_recv = true }.
control_throttle(State = #state{ connection_state = Flow,
conserve = Conserve }) ->
control_throttle(State = #state{connection_state = Flow,
conserve = Conserve,
keepalive = KeepAlive}) ->
case {Flow, Conserve orelse credit_flow:blocked()} of
{running, true} -> ok = rabbit_heartbeat:pause_monitor(
State#state.keepalive),
State #state{ connection_state = blocked };
{blocked, false} -> ok = rabbit_heartbeat:resume_monitor(
State#state.keepalive),
run_socket(State #state{
connection_state = running });
{_, _} -> run_socket(State)
{running, true}
when KeepAlive =:= undefined ->
State#state{connection_state = blocked};
{running, true} ->
%%TODO Instead of cancelling / setting the timer every time the connection
%% gets blocked / unblocked, restart the timer when it expires and
%% the connection_state is blocked.
ok = cancel_keepalive_timer(KeepAlive),
State#state{connection_state = blocked};
{blocked, false}
when KeepAlive =:= undefined ->
run_socket(State #state{connection_state = running});
{blocked, false} ->
Ref = start_keepalive_timer(KeepAlive),
run_socket(State #state{connection_state = running,
keepalive = KeepAlive#keepalive{timer = Ref}});
{_, _} ->
run_socket(State)
end.
maybe_process_deferred_recv(State = #state{ deferred_recv = undefined }) ->

View File

@ -21,3 +21,5 @@ behaviour_info(_Other) ->
table_name_for(VHost) ->
rabbit_mqtt_util:vhost_name_to_table_name(VHost).
%%TODO could add a rabbitmq_mqtt_retained_msg_store_khepri to have some replication

View File

@ -73,34 +73,35 @@ listener_specs(Fun, Args, Listeners) ->
tcp_listener_spec([Address, SocketOpts, NumAcceptors, ConcurrentConnsSups]) ->
rabbit_networking:tcp_listener_spec(
rabbit_mqtt_listener_sup,
Address,
SocketOpts,
transport(?TCP_PROTOCOL),
rabbit_mqtt_connection_sup,
[],
mqtt,
NumAcceptors,
ConcurrentConnsSups,
"MQTT TCP listener"
).
rabbit_mqtt_listener_sup,
Address,
SocketOpts,
transport(?TCP_PROTOCOL),
rabbit_mqtt_reader,
[],
mqtt,
NumAcceptors,
ConcurrentConnsSups,
worker,
"MQTT TCP listener"
).
ssl_listener_spec([Address, SocketOpts, SslOpts, NumAcceptors, ConcurrentConnsSups]) ->
rabbit_networking:tcp_listener_spec(
rabbit_mqtt_listener_sup,
Address,
SocketOpts ++ SslOpts,
transport(?TLS_PROTOCOL),
rabbit_mqtt_connection_sup,
[],
'mqtt/ssl',
NumAcceptors,
ConcurrentConnsSups,
"MQTT TLS listener"
).
rabbit_mqtt_listener_sup,
Address,
SocketOpts ++ SslOpts,
transport(?TLS_PROTOCOL),
rabbit_mqtt_reader,
[],
'mqtt/ssl',
NumAcceptors,
ConcurrentConnsSups,
worker,
"MQTT TLS listener"
).
transport(Protocol) ->
case Protocol of
?TCP_PROTOCOL -> ranch_tcp;
?TLS_PROTOCOL -> ranch_ssl
end.
transport(?TCP_PROTOCOL) ->
ranch_tcp;
transport(?TLS_PROTOCOL) ->
ranch_ssl.