Consume with QoS0 via queue_type interface

This commit is contained in:
David Ansari 2022-08-13 15:28:30 +02:00
parent 24b0a6bcb2
commit eac0622f37
6 changed files with 268 additions and 85 deletions

View File

@ -48,14 +48,11 @@ BUILD_DEPS = [
DEPS = [
"//deps/amqp_client:erlang_app",
"//deps/rabbit_common:erlang_app",
"//deps/rabbit:erlang_app",
"@ra//:erlang_app",
"@ranch//:erlang_app",
]
RUNTIME_DEPS = [
"//deps/rabbit:erlang_app",
]
rabbitmq_app(
app_description = APP_DESCRIPTION,
app_env = APP_ENV,
@ -63,7 +60,6 @@ rabbitmq_app(
app_module = APP_MODULE,
app_name = APP_NAME,
build_deps = BUILD_DEPS,
runtime_deps = RUNTIME_DEPS,
deps = DEPS,
)

View File

@ -5,6 +5,13 @@
%% Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
%%
%%TODO decrease per connection memory overhead
%% since the Raft process memory can grow a few GBs with
%% millions of connections.
%% 1. Use binaries instead of string()s for the ConnectionId
%% 2. Use new Erlang 24 function erlang:monitor/3 with tag being the ConnectionId
%% so that we can get rid of pids fields because we won't to lookup the ConnectionId
%% by PID anymore.
-record(machine_state, {
%% client ID to connection PID
client_ids = #{},

View File

@ -69,15 +69,16 @@
-record(mqtt_frame_publish, {topic_name,
message_id}).
-record(mqtt_topic, {name,
qos}).
-record(mqtt_frame_subscribe,{message_id,
topic_table}).
topic_table :: nonempty_list(#mqtt_topic{})
}).
-record(mqtt_frame_suback, {message_id,
qos_table = []}).
-record(mqtt_topic, {name,
qos}).
-record(mqtt_frame_other, {other}).
-record(mqtt_msg, {retain :: boolean(),

View File

@ -14,7 +14,8 @@
init/1,
apply/3,
state_enter/2,
notify_connection/2]).
notify_connection/2,
overview/1]).
-type state() :: #machine_state{}.
@ -178,6 +179,12 @@ state_enter(leader, State) ->
state_enter(_, _) ->
[].
-spec overview(state()) -> map().
overview(#machine_state{client_ids = ClientIds,
pids = Pids}) ->
#{num_client_ids => maps:size(ClientIds),
num_pids => maps:size(Pids)}.
%% ==========================
%% Avoids blocking the Raft leader.

View File

@ -10,19 +10,21 @@
-export([info/2, initial_state/2, initial_state/4,
process_frame/2, amqp_pub/2, amqp_callback/2, send_will/1,
close_connection/1, handle_pre_hibernate/0,
handle_ra_event/2]).
handle_ra_event/2, handle_down/2, handle_queue_event/2]).
%% for testing purposes
-export([get_vhost_username/1, get_vhost/3, get_vhost_from_user_mapping/2, maybe_quorum/3]).
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("rabbit/include/amqqueue.hrl").
-include("rabbit_mqtt_frame.hrl").
-include("rabbit_mqtt.hrl").
-define(APP, rabbitmq_mqtt).
-define(FRAME_TYPE(Frame, Type),
Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}).
-define(MAX_TOPIC_PERMISSION_CACHE_SIZE, 12).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
-define(CONSUMER_TAG, mqtt_consumer).
initial_state(Socket, SSLLoginName) ->
RealSocket = rabbit_net:unwrap_socket(Socket),
@ -236,35 +238,25 @@ process_request(?SUBSCRIBE,
message_id = SubscribeMsgId,
topic_table = Topics},
payload = undefined},
#proc_state{channels = {Channel, _},
exchange = Exchange,
retainer_pid = RPid,
#proc_state{retainer_pid = RPid,
send_fun = SendFun,
message_id = StateMsgId,
mqtt2amqp_fun = Mqtt2AmqpFun} = PState0) ->
rabbit_log_connection:debug("Received a SUBSCRIBE for topic(s) ~tp", [Topics]),
message_id = StateMsgId} = PState0) ->
rabbit_log_connection:debug("Received a SUBSCRIBE for topic(s) ~p", [Topics]),
{QosResponse, PState1} =
lists:foldl(fun (#mqtt_topic{name = TopicName,
qos = Qos}, {QosList, PState}) ->
qos = Qos}, {QosList, S0}) ->
SupportedQos = supported_subs_qos(Qos),
{Queue, #proc_state{subscriptions = Subs} = PState1} =
ensure_queue(SupportedQos, PState),
RoutingKey = Mqtt2AmqpFun(TopicName),
Binding = #'queue.bind'{
queue = Queue,
exchange = Exchange,
routing_key = RoutingKey},
#'queue.bind_ok'{} = amqp_channel:call(Channel, Binding),
{QueueName, #proc_state{subscriptions = Subs} = S} =
ensure_queue(SupportedQos, S0),
bind(QueueName, TopicName, S),
SupportedQosList = case maps:find(TopicName, Subs) of
{ok, L} -> [SupportedQos|L];
error -> [SupportedQos]
end,
{[SupportedQos | QosList],
PState1 #proc_state{
subscriptions =
maps:put(TopicName, SupportedQosList, Subs)}}
end, {[], PState0}, Topics),
S#proc_state{subscriptions = maps:put(TopicName, SupportedQosList, Subs)}}
end, {[], PState0}, Topics),
SendFun(#mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK},
variable = #mqtt_frame_suback{
message_id = SubscribeMsgId,
@ -839,55 +831,147 @@ maybe_quorum(Qos1Args, CleanSession, Queue) ->
%% with appropriate durability and timeout arguments
%% this will lead to duplicate messages for overlapping subscriptions
%% with different qos values - todo: prevent duplicates
ensure_queue(Qos, #proc_state{ channels = {Channel, _},
client_id = ClientId,
clean_sess = CleanSess,
consumer_tags = {TagQ0, TagQ1} = Tags} = PState) ->
{QueueQ0, QueueQ1} = rabbit_mqtt_util:subcription_queue_name(ClientId),
Qos1Args = case {rabbit_mqtt_util:env(subscription_ttl), CleanSess} of
{undefined, _} ->
[];
{Ms, false} when is_integer(Ms) ->
[{<<"x-expires">>, long, Ms}];
_ ->
[]
end,
QueueSetup =
case {TagQ0, TagQ1, Qos} of
{undefined, _, ?QOS_0} ->
{QueueQ0,
#'queue.declare'{ queue = QueueQ0,
durable = false,
auto_delete = true },
#'basic.consume'{ queue = QueueQ0,
no_ack = true }};
{_, undefined, ?QOS_1} ->
{QueueQ1,
#'queue.declare'{ queue = QueueQ1,
durable = true,
%% Clean session means a transient connection,
%% translating into auto-delete.
%%
%% see rabbitmq/rabbitmq-mqtt#37
auto_delete = CleanSess,
arguments = maybe_quorum(Qos1Args, CleanSess, QueueQ1)},
#'basic.consume'{ queue = QueueQ1,
no_ack = false }};
{_, _, ?QOS_0} ->
{exists, QueueQ0};
{_, _, ?QOS_1} ->
{exists, QueueQ1}
end,
case QueueSetup of
{Queue, Declare, Consume} ->
#'queue.declare_ok'{} = amqp_channel:call(Channel, Declare),
#'basic.consume_ok'{ consumer_tag = Tag } =
amqp_channel:call(Channel, Consume),
{Queue, PState #proc_state{ consumer_tags = setelement(Qos+1, Tags, Tag) }};
{exists, Q} ->
{Q, PState}
%ensure_queue(Qos, #proc_state{ channels = {Channel, _},
% client_id = ClientId,
% clean_sess = CleanSess,
% consumer_tags = {TagQ0, TagQ1} = Tags} = PState) ->
% {QueueQ0, QueueQ1} = rabbit_mqtt_util:subcription_queue_name(ClientId),
% Qos1Args = case {rabbit_mqtt_util:env(subscription_ttl), CleanSess} of
% {undefined, _} ->
% [];
% {Ms, false} when is_integer(Ms) ->
% [{<<"x-expires">>, long, Ms}];
% _ ->
% []
% end,
% QueueSetup =
% case {TagQ0, TagQ1, Qos} of
% {undefined, _, ?QOS_0} ->
% {QueueQ0,
% #'queue.declare'{ queue = QueueQ0,
% durable = false,
% auto_delete = true },
% #'basic.consume'{ queue = QueueQ0,
% no_ack = true }};
% {_, undefined, ?QOS_1} ->
% {QueueQ1,
% #'queue.declare'{ queue = QueueQ1,
% durable = true,
% %% Clean session means a transient connection,
% %% translating into auto-delete.
% %%
% %% see rabbitmq/rabbitmq-mqtt#37
% auto_delete = CleanSess,
% arguments = maybe_quorum(Qos1Args, CleanSess, QueueQ1)},
% #'basic.consume'{ queue = QueueQ1,
% no_ack = false }};
% {_, _, ?QOS_0} ->
% {exists, QueueQ0};
% {_, _, ?QOS_1} ->
% {exists, QueueQ1}
% end,
% case QueueSetup of
% {Queue, Declare, Consume} ->
% #'queue.declare_ok'{} = amqp_channel:call(Channel, Declare),
% #'basic.consume_ok'{ consumer_tag = Tag } =
% amqp_channel:call(Channel, Consume),
% {Queue, PState #proc_state{ consumer_tags = setelement(Qos+1, Tags, Tag) }};
% {exists, Q} ->
% {Q, PState}
% end.
ensure_queue(?QOS_0, %% spike handles only QoS0
#proc_state{
client_id = ClientId,
clean_sess = _CleanSess,
queue_states = QueueStates0,
auth_state = #auth_state{
vhost = VHost,
user = User = #user{username = Username},
authz_ctx = AuthzCtx},
info = #info{prefetch = Prefetch}
} = PState0) ->
{QueueBin, _QueueQos1Bin} = rabbit_mqtt_util:subcription_queue_name(ClientId),
QueueName = rabbit_misc:r(VHost, queue, QueueBin),
case rabbit_amqqueue:exists(QueueName) of
true ->
{QueueName, PState0};
false ->
check_resource_access(User, QueueName, read, AuthzCtx),
check_resource_access(User, QueueName, configure, AuthzCtx),
rabbit_core_metrics:queue_declared(QueueName),
Durable = false,
AutoDelete = true,
case rabbit_amqqueue:with(
QueueName,
fun (Q) -> ok = rabbit_amqqueue:assert_equivalence(
Q, Durable, AutoDelete, [], none),
rabbit_amqqueue:stat(Q)
end) of
{error, not_found} ->
case rabbit_vhost_limit:is_over_queue_limit(VHost) of
false ->
case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
[], none, Username) of
{new, Q} when ?is_amqqueue(Q) ->
rabbit_core_metrics:queue_created(QueueName),
Spec = #{no_ack => true,
channel_pid => self(),
limiter_pid => none,
limiter_active => false,
prefetch_count => Prefetch,
consumer_tag => ?CONSUMER_TAG,
exclusive_consume => false,
args => [],
ok_msg => undefined,
acting_user => Username},
case rabbit_queue_type:consume(Q, Spec, QueueStates0) of
{ok, QueueStates, _Actions = []} ->
% rabbit_global_counters:consumer_created(mqtt),
PState = PState0#proc_state{queue_states = QueueStates},
{QueueName, PState};
Other ->
exit(
lists:flatten(
io_lib:format("Failed to consume from ~s: ~p",
[rabbit_misc:rs(QueueName), Other])))
end;
Other ->
exit(lists:flatten(
io_lib:format("Failed to declare ~s: ~p",
[rabbit_misc:rs(QueueName), Other])))
end;
{true, Limit} ->
exit(
lists:flatten(
io_lib:format("cannot declare ~s because "
"queue limit ~p in vhost '~s' is reached",
[rabbit_misc:rs(QueueName), Limit, VHost])))
end;
Other ->
exit(
lists:flatten(
io_lib:format("Expected ~s to not exist, got: ~p",
[rabbit_misc:rs(QueueName), Other])))
end
end.
bind(QueueName,
TopicName,
#proc_state{exchange = ExchangeName,
auth_state = #auth_state{
user = User = #user{username = Username},
authz_ctx = AuthzCtx},
mqtt2amqp_fun = Mqtt2AmqpFun} = PState) ->
ok = rabbit_access_control:check_resource_access(User, QueueName, write, AuthzCtx),
ok = rabbit_access_control:check_resource_access(User, ExchangeName, read, AuthzCtx),
ok = check_topic_access(TopicName, read, PState),
RoutingKey = Mqtt2AmqpFun(TopicName),
Binding = #binding{source = ExchangeName,
destination = QueueName,
key = RoutingKey},
ok = rabbit_binding:add(Binding, Username).
send_will(PState = #proc_state{will_msg = undefined}) ->
PState;
@ -924,7 +1008,7 @@ amqp_pub(undefined, PState) ->
amqp_pub(#mqtt_msg{qos = Qos,
topic = Topic,
dup = Dup,
message_id = _MessageId, %%TODO track in unacked_pubs for QoS > 0
message_id = _MessageId, %% spike handles only QoS0
payload = Payload},
PState = #proc_state{exchange = ExchangeName,
% unacked_pubs = UnackedPubs,
@ -958,7 +1042,7 @@ amqp_pub(#mqtt_msg{qos = Qos,
confirm = Confirm,
sender = self(),
message = BasicMessage,
msg_seq_no = undefined, %%TODO assumes QoS 0
msg_seq_no = undefined, %% spike handles only QoS0
flow = noflow %%TODO enable flow control
},
@ -987,7 +1071,7 @@ deliver_to_queues(Delivery = #delivery{message = _Message = #basic_message{ex
%% sent before confirms.
%% TODO: AMQP 0.9.1 mandatory flag corresponds to MQTT 5 PUBACK reason code "No matching subscribers"
% ok = process_routing_mandatory(Mandatory, Qs, Message, State0),
%% TODO allows QoS > 0
%% spike handles only QoS0
% State1 = process_routing_confirm(Confirm, QueueNames,
% MsgSeqNo, XName, State0),
@ -1006,6 +1090,8 @@ human_readable_mqtt_version(_) ->
"N/A".
serialise_and_send_to_client(Frame, #proc_state{ socket = Sock }) ->
%%TODO Test sending large frames at high speed: Will we need garbage collection as done
%% in rabbit_writer:maybe_gc_large_msg()?
try rabbit_net:port_command(Sock, rabbit_mqtt_frame:serialise(Frame)) of
Res ->
Res
@ -1036,6 +1122,7 @@ close_connection(PState = #proc_state{ connection = Connection,
handle_pre_hibernate() ->
erase(topic_permission_cache),
erase(permission_cache),
ok.
handle_ra_event({applied, [{Corr, ok}]},
@ -1060,6 +1147,67 @@ handle_ra_event(Evt, PState) ->
rabbit_log:debug("unhandled ra_event: ~w ", [Evt]),
PState.
handle_down({'DOWN', _MRef, process, QPid, Reason},
PState0 = #proc_state{queue_states = QStates0}) ->
%% spike handles only QoS0
case rabbit_queue_type:handle_down(QPid, Reason, QStates0) of
{ok, QStates1, Actions} ->
PState = PState0#proc_state{queue_states = QStates1},
handle_queue_actions(Actions, PState);
{eol, QStates1, QRef} ->
QStates = rabbit_queue_type:remove(QRef, QStates1),
PState0#proc_state{queue_states = QStates}
end.
handle_queue_event({queue_event, QRef, Evt},
PState0 = #proc_state{queue_states = QueueStates0}) ->
case rabbit_queue_type:handle_event(QRef, Evt, QueueStates0) of
{ok, QueueStates, Actions} ->
PState1 = PState0#proc_state{queue_states = QueueStates},
PState = handle_queue_actions(Actions, PState1),
{ok, PState};
eol ->
{error, queue_eol, PState0};
{protocol_error, _Type, _Reason, _ReasonArgs} = Error ->
{error, Error, PState0}
end.
handle_queue_actions(Actions, #proc_state{} = PState0) ->
lists:foldl(
fun ({deliver, ?CONSUMER_TAG, _AckRequired = false, Msgs}, S) ->
handle_deliver(Msgs, S)
end, PState0, Actions).
handle_deliver(Msgs, PState)
when is_list(Msgs) ->
lists:foldl(fun(Msg, S) ->
handle_deliver0(Msg, S)
end, PState, Msgs).
handle_deliver0({_QName, _QPid, _MsgId, Redelivered,
#basic_message{routing_keys = [RoutingKey | _CcRoutes],
content = #content{
properties = #'P_basic'{headers = Headers},
payload_fragments_rev = FragmentsRev}}},
PState = #proc_state{send_fun = SendFun,
amqp2mqtt_fun = Amqp2MqttFun}) ->
Dup = case rabbit_mqtt_util:table_lookup(Headers, <<"x-mqtt-dup">>) of
undefined -> Redelivered;
{bool, Dup0} -> Redelivered orelse Dup0
end,
%%TODO support iolists when sending to client
Payload = list_to_binary(lists:reverse(FragmentsRev)),
Frame = #mqtt_frame{fixed = #mqtt_frame_fixed{
type = ?PUBLISH,
qos = ?QOS_0, %% spike handles only QoS0
dup = Dup},
variable = #mqtt_frame_publish{
message_id = undefined, %% spike handles only QoS0
topic_name = Amqp2MqttFun(RoutingKey)},
payload = Payload},
SendFun(Frame, PState),
PState.
check_publish(TopicName, Fn, PState) ->
%%TODO check additionally write access to exchange as done in channel?
case check_topic_access(TopicName, write, PState) of
@ -1100,7 +1248,7 @@ check_topic_access(TopicName, Access,
try rabbit_access_control:check_topic_access(User, Resource, Access, Context) of
ok ->
CacheTail = lists:sublist(Cache, ?MAX_TOPIC_PERMISSION_CACHE_SIZE - 1),
CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE - 1),
put(topic_permission_cache, [Key | CacheTail]),
ok;
R ->
@ -1115,6 +1263,21 @@ check_topic_access(TopicName, Access,
end
end.
%% TODO copied from channel, remove duplication
check_resource_access(User, Resource, Perm, Context) ->
V = {Resource, Context, Perm},
Cache = case get(permission_cache) of
undefined -> [];
Other -> Other
end,
case lists:member(V, Cache) of
true -> ok;
false -> ok = rabbit_access_control:check_resource_access(
User, Resource, Perm, Context),
CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE-1),
put(permission_cache, [V | CacheTail])
end.
info(consumer_tags, #proc_state{consumer_tags = Val}) -> Val;
info(unacked_pubs, #proc_state{unacked_pubs = Val}) -> Val;
info(awaiting_ack, #proc_state{awaiting_ack = Val}) -> Val;

View File

@ -118,6 +118,10 @@ handle_cast({close_connection, Reason},
[ConnName, rabbit_mqtt_processor:info(client_id, PState), Reason]),
{stop, {shutdown, server_initiated_close}, State};
handle_cast(QueueEvent = {queue_event, _, _},
State = #state{proc_state = PState}) ->
callback_reply(State, rabbit_mqtt_processor:handle_queue_event(QueueEvent, PState));
handle_cast(Msg, State) ->
{stop, {mqtt_unexpected_cast, Msg}, State}.
@ -250,11 +254,16 @@ handle_info(emit_stats, State) ->
{noreply, emit_stats(State), hibernate};
handle_info({ra_event, _From, Evt},
#state{proc_state = PState} = State) ->
#state{proc_state = PState0} = State) ->
%% handle applied event to ensure registration command actually got applied
%% handle not_leader notification in case we send the command to a non-leader
PState1 = rabbit_mqtt_processor:handle_ra_event(Evt, PState),
{noreply, State#state{proc_state = PState1}, hibernate};
PState = rabbit_mqtt_processor:handle_ra_event(Evt, PState0),
{noreply, pstate(State, PState), hibernate};
handle_info({'DOWN', _MRef, process, _Pid, _Reason} = Evt,
#state{proc_state = PState0} = State) ->
PState = rabbit_mqtt_processor:handle_down(Evt, PState0),
{noreply, pstate(State, PState), hibernate};
handle_info(Msg, State) ->
{stop, {mqtt_unexpected_msg, Msg}, State}.