Store message interceptor context in MQTT proc state

It's a tradeoff between building the map for each incoming and outgoing
message (now that there are also outgoing interceptors) vs increased
memory usage for the MQTT proc state.

Connecting with MQTT 5.0 and client ID "xxxxxxxx", the number of words
are 201 before this commit vs 235 after this commit as determined by:
```
S = sys:get_state(MQTTConnectionPid),
erts_debug:size(S).
```
Therefore, this commit requires 34 word * 8 bytes = 272 bytes more per MQTT
connection, that is 272 MB more for 1,000,000 MQTT connections.
This commit is contained in:
David Ansari 2025-04-18 10:38:04 +02:00 committed by David Ansari
parent 21bd300d61
commit a24ba55d45
6 changed files with 30 additions and 33 deletions

View File

@ -493,7 +493,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
OptionalVariables = extract_variable_map_from_amqp_params(AmqpParams),
{ok, GCThreshold} = application:get_env(rabbit, writer_gc_threshold),
MaxConsumers = application:get_env(rabbit, consumer_max_per_channel, infinity),
MsgInterceptorCtx = #{protocol => amqp091,
MsgIcptCtx = #{protocol => amqp091,
vhost => VHost,
username => User#user.username,
connection_name => ConnName},
@ -515,7 +515,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
authz_context = OptionalVariables,
max_consumers = MaxConsumers,
writer_gc_threshold = GCThreshold,
msg_interceptor_ctx = MsgInterceptorCtx},
msg_interceptor_ctx = MsgIcptCtx},
limiter = Limiter,
tx = none,
next_tag = 1,

View File

@ -45,8 +45,8 @@ intercept_outgoing(Msg, Ctx) ->
intercept(Msg, Ctx, Stage) ->
Interceptors = persistent_term:get(?KEY),
lists:foldl(fun({Mod, Config}, Msg0) ->
Mod:intercept(Msg0, Ctx, Stage, Config)
lists:foldl(fun({Mod, Cfg}, Msg0) ->
Mod:intercept(Msg0, Ctx, Stage, Cfg)
end, Msg, Interceptors).
-spec set_annotation(mc:state(), mc:ann_key(), mc:ann_value(),

View File

@ -11,9 +11,9 @@
-export([intercept/4]).
intercept(Msg, _Ctx, incoming, Config) ->
intercept(Msg, _Ctx, incoming, Cfg) ->
Node = atom_to_binary(node()),
Overwrite = maps:get(overwrite, Config),
Overwrite = maps:get(overwrite, Cfg),
rabbit_msg_interceptor:set_annotation(Msg, ?KEY, Node, Overwrite);
intercept(Msg, _Ctx, _Stage, _Config) ->
intercept(Msg, _Ctx, _Stage, _Cfg) ->
Msg.

View File

@ -16,15 +16,15 @@
-export([intercept/4]).
intercept(Msg0, _Ctx, incoming, #{incoming := _True} = Config) ->
Overwrite = maps:get(overwrite, Config),
intercept(Msg0, _Ctx, incoming, #{incoming := _True} = Cfg) ->
Overwrite = maps:get(overwrite, Cfg),
Ts = mc:get_annotation(?ANN_RECEIVED_AT_TIMESTAMP, Msg0),
Msg = rabbit_msg_interceptor:set_annotation(Msg0, ?KEY_INCOMING, Ts, Overwrite),
set_timestamp(Msg, Ts, Overwrite);
intercept(Msg, _Ctx, outgoing, #{outgoing := _True}) ->
Ts = os:system_time(millisecond),
mc:set_annotation(?KEY_OUTGOING, Ts, Msg);
intercept(Msg, _MsgInterceptorCtx, _Stage, _Config) ->
intercept(Msg, _MsgIcptCtx, _Stage, _Cfg) ->
Msg.
set_timestamp(Msg, Ts, true) ->

View File

@ -16,5 +16,5 @@ intercept(Msg, #{protocol := Proto, client_id := ClientId}, incoming, _Cfg)
Proto =:= mqtt311 orelse
Proto =:= mqtt310 ->
mc:set_annotation(?KEY, ClientId, Msg);
intercept(Msg, _Ctx, _Stage, _Config) ->
intercept(Msg, _Ctx, _Stage, _Cfg) ->
Msg.

View File

@ -92,7 +92,8 @@
%% The database stores the MQTT subscription options in the binding arguments for:
%% * v1 as Erlang record #mqtt_subscription_opts{}
%% * v2 as AMQP 0.9.1 table
binding_args_v2 :: boolean()
binding_args_v2 :: boolean(),
msg_interceptor_ctx :: rabbit_msg_interceptor:context()
}).
-record(state,
@ -214,9 +215,15 @@ process_connect(
%% To simplify logic, we decide at connection establishment time to stick
%% with either binding args v1 or v2 for the lifetime of the connection.
BindingArgsV2 = rabbit_feature_flags:is_enabled('rabbitmq_4.1.0'),
ProtoVerAtom = proto_integer_to_atom(ProtoVer),
MsgIcptCtx = #{protocol => ProtoVerAtom,
vhost => VHost,
username => Username,
connection_name => ConnName,
client_id => ClientId},
S = #state{
cfg = #cfg{socket = Socket,
proto_ver = proto_integer_to_atom(ProtoVer),
proto_ver = ProtoVerAtom,
clean_start = CleanStart,
session_expiry_interval_secs = SessionExpiry,
ssl_login_name = SslLoginName,
@ -237,7 +244,8 @@ process_connect(
will_msg = WillMsg,
max_packet_size_outbound = MaxPacketSize,
topic_alias_maximum_outbound = TopicAliasMaxOutbound,
binding_args_v2 = BindingArgsV2},
binding_args_v2 = BindingArgsV2,
msg_interceptor_ctx = MsgIcptCtx},
auth_state = #auth_state{
user = User,
authz_ctx = AuthzCtx}},
@ -1632,15 +1640,15 @@ publish_to_queues(
#state{cfg = #cfg{exchange = ExchangeName = #resource{name = ExchangeNameBin},
delivery_flow = Flow,
conn_name = ConnName,
trace_state = TraceState},
trace_state = TraceState,
msg_interceptor_ctx = MsgIcptCtx},
auth_state = #auth_state{user = #user{username = Username}}} = State) ->
Anns = #{?ANN_EXCHANGE => ExchangeNameBin,
?ANN_ROUTING_KEYS => [mqtt_to_amqp(Topic)]},
Msg0 = mc:init(mc_mqtt, MqttMsg, Anns, mc_env()),
case rabbit_exchange:lookup(ExchangeName) of
{ok, Exchange} ->
Ctx = msg_interceptor_ctx(State),
Msg = rabbit_msg_interceptor:intercept_incoming(Msg0, Ctx),
Msg = rabbit_msg_interceptor:intercept_incoming(Msg0, MsgIcptCtx),
QNames0 = rabbit_exchange:route(Exchange, Msg, #{return_binding_keys => true}),
QNames = drop_local(QNames0, State),
rabbit_trace:tap_in(Msg, QNames, ConnName, Username, TraceState),
@ -2066,13 +2074,13 @@ deliver_to_client(Msgs, Ack, State) ->
end, State, Msgs).
deliver_one_to_client({QNameOrType, QPid, QMsgId, _Redelivered, Mc} = Delivery,
AckRequired, State0) ->
AckRequired,
#state{cfg = #cfg{msg_interceptor_ctx = MsgIcptCtx}} = State0) ->
SubscriberQoS = case AckRequired of
true -> ?QOS_1;
false -> ?QOS_0
end,
McMqtt0 = mc:convert(mc_mqtt, Mc, mc_env()),
MsgIcptCtx = msg_interceptor_ctx(State0),
McMqtt = rabbit_msg_interceptor:intercept_outgoing(McMqtt0, MsgIcptCtx),
MqttMsg = #mqtt_msg{qos = PublisherQos} = mc:protocol_state(McMqtt),
QoS = effective_qos(PublisherQos, SubscriberQoS),
@ -2539,17 +2547,6 @@ message_redelivered(_, _, _) ->
is_success(ReasonCode) ->
ReasonCode < ?RC_UNSPECIFIED_ERROR.
msg_interceptor_ctx(#state{cfg = #cfg{client_id = ClientId,
conn_name = ConnName,
vhost = VHost,
proto_ver = ProtoVer},
auth_state = #auth_state{user = #user{username = Username}}}) ->
#{protocol => ProtoVer,
vhost => VHost,
username => Username,
connection_name => ConnName,
client_id => ClientId}.
-spec format_status(state()) -> map().
format_status(
#state{queue_states = QState,