Make interactions with Ra async

To avoid blocking when registering or unregistering a client id. This is
ok as informing the current connection holder of the client id is
already async. This should be more scalable and provide much better MQTT
connection setup latency.
This commit is contained in:
kjnilsson 2020-02-10 17:05:43 +00:00
parent 2927f473ce
commit eadf5f7094
5 changed files with 118 additions and 24 deletions

View File

@ -54,7 +54,8 @@
send_fun,
peer_addr,
mqtt2amqp_fun,
amqp2mqtt_fun }).
amqp2mqtt_fun,
register_state }).
-record(auth_state, {username,
user,

View File

@ -18,33 +18,80 @@
-include("mqtt_machine.hrl").
-export([register/2, unregister/2, list/0, leave/1]).
-export([register/2, register/3, unregister/2, list/0, leave/1]).
%%----------------------------------------------------------------------------
-spec register(term(), pid()) -> {ok, reference()} | {error, term()}.
register(ClientId, Pid) ->
run_ra_command({register, ClientId, Pid}).
{ClusterName, _} = NodeId = mqtt_node:node_id(),
case ra_leaderboard:lookup_leader(ClusterName) of
undefined ->
case ra:members(NodeId) of
{ok, _, Leader} ->
register(Leader, ClientId, Pid);
_ = Error ->
Error
end;
Leader ->
register(Leader, ClientId, Pid)
end.
-spec register(ra:server_id(), term(), pid()) ->
{ok, reference()} | {error, term()}.
register(ServerId, ClientId, Pid) ->
Corr = make_ref(),
send_ra_command(ServerId, {register, ClientId, Pid}, Corr),
erlang:send_after(5000, self(), {ra_event, undefined, register_timeout}),
{ok, Corr}.
unregister(ClientId, Pid) ->
run_ra_command({unregister, ClientId, Pid}).
{ClusterName, _} = mqtt_node:node_id(),
case ra_leaderboard:lookup_leader(ClusterName) of
undefined ->
ok;
Leader ->
send_ra_command(Leader, {unregister, ClientId, Pid}, no_correlation)
end.
list() ->
NodeIds = mqtt_node:all_node_ids(),
{ClusterName, _} = mqtt_node:node_id(),
QF = fun (#machine_state{client_ids = Ids}) -> maps:to_list(Ids) end,
case ra:leader_query(NodeIds, QF) of
{ok, {_, Ids}, _} -> Ids;
{timeout, _} -> []
end.
case ra_leaderboard:lookup_leader(ClusterName) of
undefined ->
NodeIds = mqtt_node:all_node_ids(),
case ra:leader_query(NodeIds, QF) of
{ok, {_, Ids}, _} -> Ids;
{timeout, _} ->
rabbit_log:debug("~s:list/0 leader query timed out",
[?MODULE]),
[]
end;
Leader ->
case ra:leader_query(Leader, QF) of
{ok, {_, Ids}, _} -> Ids;
{error, _} ->
[];
{timeout, _} ->
rabbit_log:debug("~s:list/0 leader query timed out",
[?MODULE]),
[]
end
end.
leave(NodeBin) ->
Node = binary_to_atom(NodeBin, utf8),
run_ra_command({leave, Node}),
ServerId = mqtt_node:node_id(),
run_ra_command(ServerId, {leave, Node}),
mqtt_node:leave(Node).
%%----------------------------------------------------------------------------
-spec run_ra_command(term()) -> term() | {error, term()}.
run_ra_command(RaCommand) ->
NodeId = mqtt_node:node_id(),
case ra:process_command(NodeId, RaCommand) of
-spec run_ra_command(term(), term()) -> term() | {error, term()}.
run_ra_command(ServerId, RaCommand) ->
case ra:process_command(ServerId, RaCommand) of
{ok, Result, _} -> Result;
_ = Error -> Error
end.
-spec send_ra_command(term(), term(), term()) -> ok.
send_ra_command(ServerId, RaCommand, Correlation) ->
ok = ra:pipeline_command(ServerId, RaCommand, Correlation, normal).

View File

@ -18,7 +18,8 @@
-export([info/2, initial_state/2, initial_state/5,
process_frame/2, amqp_pub/2, amqp_callback/2, send_will/1,
close_connection/1, handle_pre_hibernate/0]).
close_connection/1, handle_pre_hibernate/0,
handle_ra_event/2]).
%% for testing purposes
-export([get_vhost_username/1, get_vhost/3, get_vhost_from_user_mapping/2,
@ -140,7 +141,7 @@ process_request(?CONNECT,
{{?CONNACK_ACCEPT, SessionPresent0}, PState2};
{?CONNACK_ACCEPT, Conn, VHost, AState} ->
case rabbit_mqtt_collector:register(ClientId, self()) of
ok ->
{ok, Corr} ->
RetainerPid = rabbit_mqtt_retainer_sup:child_for_vhost(VHost),
link(Conn),
{ok, Ch} = amqp_connection:open_channel(Conn),
@ -157,7 +158,8 @@ process_request(?CONNECT,
connection = Conn,
client_id = ClientId,
retainer_pid = RetainerPid,
auth_state = AState},
auth_state = AState,
register_state = {pending, Corr}},
{SessionPresent1, PState4} = maybe_clean_sess(PState3),
{{?CONNACK_ACCEPT, SessionPresent1}, PState4};
%% e.g. this node was removed from the MQTT cluster members
@ -908,6 +910,28 @@ handle_pre_hibernate() ->
erase(topic_permission_cache),
ok.
handle_ra_event({applied, [{Corr, ok}]},
PState = #proc_state{register_state = {pending, Corr}}) ->
%% success case - command was applied transition into registered state
PState#proc_state{register_state = registered};
handle_ra_event({not_leader, Leader, Corr},
PState = #proc_state{register_state = {pending, Corr},
client_id = ClientId}) ->
%% retry command against actual leader
{ok, NewCorr} = rabbit_mqtt_collector:register(Leader, ClientId, self()),
PState#proc_state{register_state = {pending, NewCorr}};
handle_ra_event(register_timeout,
PState = #proc_state{register_state = {pending, _Corr},
client_id = ClientId}) ->
{ok, NewCorr} = rabbit_mqtt_collector:register(ClientId, self()),
PState#proc_state{register_state = {pending, NewCorr}};
handle_ra_event(register_timeout, PState) ->
PState;
handle_ra_event(Evt, PState) ->
%% log these?
rabbit_log:debug("unhandled ra_event: ~w ~n", [Evt]),
PState.
%% NB: check_*: MQTT spec says we should ack normally, ie pretend there
%% was no auth error, but here we are closing the connection with an error. This
%% is what happens anyway if there is an authorization failure at the AMQP 0-9-1 client level.

View File

@ -190,6 +190,13 @@ handle_info(keepalive_timeout, State = #state {conn_name = ConnStr,
handle_info(emit_stats, State) ->
{noreply, emit_stats(State), hibernate};
handle_info({ra_event, _From, Evt},
#state{proc_state = PState} = State) ->
%% handle applied event to ensure registration command actually got applied
%% handle not_leader notification in case we send the command to a non-leader
PState1 = rabbit_mqtt_processor:handle_ra_event(Evt, PState),
{noreply, State#state{proc_state = PState1}, hibernate};
handle_info(Msg, State) ->
{stop, {mqtt_unexpected_msg, Msg}, State}.

View File

@ -91,7 +91,7 @@ connection_id_tracking(Config) ->
expect_publishes(<<"TopicA">>, [<<"Payload">>]),
%% there's one connection
[_] = rabbit_ct_broker_helpers:rpc(Config, 2, rabbit_mqtt_collector, list, []),
assert_connection_count(Config, 10, 2, 1),
%% connect to the same node (A or 0)
{ok, MRef2, _C2} = connect_to_node(Config, 0, ID),
@ -101,7 +101,7 @@ connection_id_tracking(Config) ->
%% connect to a different node (C or 2)
{ok, _, C3} = connect_to_node(Config, 2, ID),
[_] = rabbit_ct_broker_helpers:rpc(Config, 2, rabbit_mqtt_collector, list, []),
assert_connection_count(Config, 10, 2, 1),
%% C2 is disconnected
await_disconnection(MRef2),
@ -114,11 +114,11 @@ connection_id_tracking_on_nodedown(Config) ->
emqttc:subscribe(C, <<"TopicA">>, qos0),
emqttc:publish(C, <<"TopicA">>, <<"Payload">>),
expect_publishes(<<"TopicA">>, [<<"Payload">>]),
[_] = rabbit_ct_broker_helpers:rpc(Config, 2, rabbit_mqtt_collector, list, []),
assert_connection_count(Config, 10, 2, 1),
ok = rabbit_ct_broker_helpers:stop_node(Config, Server),
await_disconnection(MRef),
[] = rabbit_ct_broker_helpers:rpc(Config, 2, rabbit_mqtt_collector, list, []).
assert_connection_count(Config, 10, 2, 0),
ok.
connection_id_tracking_with_decommissioned_node(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
@ -127,15 +127,30 @@ connection_id_tracking_with_decommissioned_node(Config) ->
emqttc:publish(C, <<"TopicA">>, <<"Payload">>),
expect_publishes(<<"TopicA">>, [<<"Payload">>]),
[_] = rabbit_ct_broker_helpers:rpc(Config, 2, rabbit_mqtt_collector, list, []),
assert_connection_count(Config, 10, 2, 1),
{ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["decommission_mqtt_node", Server]),
await_disconnection(MRef),
[] = rabbit_ct_broker_helpers:rpc(Config, 2, rabbit_mqtt_collector, list, []).
assert_connection_count(Config, 10, 2, 0),
ok.
%%
%% Helpers
%%
assert_connection_count(_Config, 0, _, _) ->
ct:fail("failed to complete rabbit_mqtt_collector:list/0");
assert_connection_count(Config, Retries, NodeId, NumElements) ->
List = rabbit_ct_broker_helpers:rpc(Config, NodeId, rabbit_mqtt_collector, list, []),
case length(List) == NumElements of
true ->
ok;
false ->
timer:sleep(200),
assert_connection_count(Config, Retries-1, NodeId, NumElements)
end.
connect_to_node(Config, Node, ClientID) ->
Port = rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_mqtt),
{ok, C} = connect(Port, ClientID),