Mc: introduce new function in mc_amqp to init mc from stream.

Initialising a message container from data stored in a
stream is a special case where we need to recover exchange
and routing key information from the following message annatations:

* x-exchange
* x-routing-keys
* x-cc

We do not want to do this when initialising a message container
from AMQP data just received from a publisher.

This commit introduces a new function `mc_amqp:init_from_stream/2`
that is to be used when needing a message container from a stream
message.
This commit is contained in:
Karl Nilsson 2025-02-13 08:15:58 +00:00
parent 95b6df775a
commit 32615bf5f0
2 changed files with 61 additions and 43 deletions

View File

@ -17,6 +17,8 @@
prepare/2
]).
-export([init_from_stream/2]).
-import(rabbit_misc,
[maps_put_truthy/3]).
@ -99,10 +101,26 @@
-export_type([state/0]).
%% API
-spec init_from_stream(binary(), mc:annotations()) ->
mc:state().
init_from_stream(Payload, #{} = Anns0) ->
Sections = amqp10_framing:decode_bin(Payload, [server_mode]),
Msg = msg_body_encoded(Sections, Payload, #msg_body_encoded{}),
%% when initalising from stored stream data the recovered
%% annotations take precendence over the ones provided
Anns = maps:merge(Anns0, essential_properties(Msg, recover)),
mc:init(?MODULE, Msg, Anns).
%% CALLBACKS
init(#msg_body_encoded{} = Msg) ->
{Msg, #{}};
init(Payload) ->
Sections = amqp10_framing:decode_bin(Payload, [server_mode]),
Msg = msg_body_encoded(Sections, Payload, #msg_body_encoded{}),
Anns = essential_properties(Msg),
Anns = essential_properties(Msg, new),
{Msg, Anns}.
convert_from(?MODULE, Sections, _Env) when is_list(Sections) ->
@ -622,16 +640,44 @@ encode_deaths(Deaths) ->
{map, Map}
end, Deaths).
essential_properties(Msg) ->
essential_properties(#msg_body_encoded{} = Msg, new) ->
Durable = get_property(durable, Msg),
Priority = get_property(priority, Msg),
Timestamp = get_property(timestamp, Msg),
Ttl = get_property(ttl, Msg),
Anns = #{?ANN_DURABLE => Durable},
maps_put_truthy(
?ANN_PRIORITY, Priority,
maps_put_truthy(
?ANN_TIMESTAMP, Timestamp,
maps_put_truthy(
ttl, Ttl,
Anns))).
Anns0 = #{?ANN_DURABLE => Durable},
Anns = maps_put_truthy(
?ANN_PRIORITY, Priority,
maps_put_truthy(
?ANN_TIMESTAMP, Timestamp,
maps_put_truthy(
ttl, Ttl,
Anns0))),
Anns;
essential_properties(#msg_body_encoded{message_annotations = MA} = Msg, recover) ->
Anns = essential_properties(Msg, new),
case MA of
[] ->
Anns;
_ ->
lists:foldl(
fun ({{symbol, <<"x-routing-key">>},
{utf8, Key}}, Acc) ->
maps:update_with(?ANN_ROUTING_KEYS,
fun(L) -> [Key | L] end,
[Key],
Acc);
({{symbol, <<"x-cc">>},
{list, CCs0}}, Acc) ->
CCs = [CC || {_T, CC} <- CCs0],
maps:update_with(?ANN_ROUTING_KEYS,
fun(L) -> L ++ CCs end,
CCs,
Acc);
({{symbol, <<"x-exchange">>},
{utf8, Exchange}}, Acc) ->
Acc#{?ANN_EXCHANGE => Exchange};
(_, Acc) ->
Acc
end, Anns, MA)
end.

View File

@ -1305,39 +1305,11 @@ parse_uncompressed_subbatch(
parse_uncompressed_subbatch(Rem, Offset + 1, StartOffset, QName,
Name, LocalPid, Filter, Acc).
entry_to_msg(Entry, Offset, #resource{kind = queue, name = QName}, Name, LocalPid, Filter) ->
Mc0 = mc:init(mc_amqp, Entry, #{}),
%% If exchange or routing keys annotation isn't present the entry most likely came
%% from the rabbitmq-stream plugin so we'll choose defaults that simulate use
%% of the direct exchange.
XHeaders = mc:x_headers(Mc0),
Exchange = case XHeaders of
#{<<"x-exchange">> := {utf8, X}} ->
X;
_ ->
<<>>
end,
RKeys0 = case XHeaders of
#{<<"x-cc">> := {list, CCs}} ->
[CC || {utf8, CC} <- CCs];
_ ->
[]
end,
RKeys1 = case XHeaders of
#{<<"x-routing-key">> := {utf8, RK}} ->
[RK | RKeys0];
_ ->
RKeys0
end,
RKeys = case RKeys1 of
[] ->
[QName];
_ ->
RKeys1
end,
Mc1 = mc:set_annotation(?ANN_EXCHANGE, Exchange, Mc0),
Mc2 = mc:set_annotation(?ANN_ROUTING_KEYS, RKeys, Mc1),
Mc = mc:set_annotation(<<"x-stream-offset">>, Offset, Mc2),
entry_to_msg(Entry, Offset, #resource{kind = queue, name = QName},
Name, LocalPid, Filter) ->
Mc = mc_amqp:init_from_stream(Entry, #{?ANN_EXCHANGE => <<>>,
?ANN_ROUTING_KEYS => [QName],
<<"x-stream-offset">> => Offset}),
case rabbit_amqp_filtex:filter(Filter, Mc) of
true ->
{Name, LocalPid, Offset, false, Mc};