New mc_amqp state

This commit is contained in:
David Ansari 2024-04-14 15:04:38 +02:00
parent 0a31d7721b
commit 4980669502
11 changed files with 356 additions and 210 deletions

View File

@ -12,6 +12,10 @@
-export([parse/1,
parse_many/2]).
-ifdef(TEST).
-export([parse_many_slow/1]).
-endif.
%% TODO put often matched binary function clauses to the top?
%% server_mode is a special parsing mode used by RabbitMQ when parsing
@ -25,7 +29,7 @@
%% AMQP 3.2.4 & 3.2.5
-define(NUMERIC_DESCRIPTOR_IS_PROPERTIES_OR_APPLICATION_PROPERTIES(Code),
Code =:= 16#73 orelse
Code =< 16#74).
Code =:= 16#74).
-define(SYMBOLIC_DESCRIPTOR_IS_PROPERTIES_OR_APPLICATION_PROPERTIES(Name),
Name =:= <<"amqp:properties:list">> orelse
Name =:= <<"amqp:application-properties:map">>).
@ -185,7 +189,6 @@ mapify([Key, Value | Rest]) ->
[{Key, Value} | mapify(Rest)].
-ifdef(TEST).
-export([parse_many_slow/1]).
%% This is the old, slow, original parser implemenation before
%% https://github.com/rabbitmq/rabbitmq-server/pull/4811
%% where many sub binaries are being created.

View File

@ -15,7 +15,7 @@
decode_bin/1,
decode_bin/2,
symbol_for/1,
number_for/1,
number_for/1,
pprint/1]).
%% debug
@ -187,7 +187,7 @@ encode_bin(X) ->
-spec decode_bin(binary()) -> [term()].
decode_bin(Binary) ->
[decode(Section) || Section <- amqp10_binary_parser:parse_many(Binary, #{})].
[decode(Section) || Section <- amqp10_binary_parser:parse_many(Binary, [])].
-spec decode_bin(binary(), amqp10_binary_parser:opts()) -> [term()].
decode_bin(Binary, Opts) ->

View File

@ -187,7 +187,7 @@ roundtrip(Term) ->
Bin = iolist_to_binary(amqp10_binary_generator:generate(Term)),
% generate returns an iolist but parse expects a binary
?assertEqual({Term, <<>>}, amqp10_binary_parser:parse(Bin)),
?assertEqual([Term], amqp10_binary_parser:parse_many(Bin, #{})).
?assertEqual([Term], amqp10_binary_parser:parse_many(Bin, [])).
%% Return the roundtripped term.
roundtrip_return(Term) ->
@ -195,5 +195,5 @@ roundtrip_return(Term) ->
%% We assert only that amqp10_binary_parser:parse/1 and
%% amqp10_binary_parser:parse_all/1 return the same term.
{RoundTripTerm, <<>>} = amqp10_binary_parser:parse(Bin),
?assertEqual([RoundTripTerm], amqp10_binary_parser:parse_many(Bin, #{})),
?assertEqual([RoundTripTerm], amqp10_binary_parser:parse_many(Bin, [])),
RoundTripTerm.

View File

@ -71,7 +71,7 @@ roundtrip(_Config) ->
end, <<>>, Terms),
?assertEqual(Terms, amqp10_binary_parser:parse_many_slow(Bin)),
?assertEqual(Terms, amqp10_binary_parser:parse_many(Bin, #{})).
?assertEqual(Terms, amqp10_binary_parser:parse_many(Bin, [])).
array_with_extra_input(_Config) ->
Bin = <<83,16,192,85,10,177,0,0,0,1,48,161,12,114,97,98,98,105,116, 109,113,45,98,111,120,112,255,255,0,0,96,0,50,112,0,0,19,136,163,5,101,110,45,85,83,224,14,2,65,5,102,105,45,70,73,5,101,110,45,85,83,64,64,193,24,2,163,20,68,69,70,69,78,83,73,67,83,46,84,69,83,84,46,83,85,73,84,69,65>>,
@ -81,10 +81,12 @@ array_with_extra_input(_Config) ->
65, <<105,45,70,73,5,101,110,45,85,83>>, [true,true]},
?assertExit(Expected, amqp10_binary_parser:parse_many_slow(Bin)),
?assertExit(Expected, amqp10_binary_parser:parse_many(Bin, #{})).
?assertExit(Expected, amqp10_binary_parser:parse_many(Bin, [])).
unsupported_type(_Config) ->
Bin = <<2/integer, "hey">>,
Expected = {primitive_type_unsupported, 16#02, <<"hey">>},
?assertThrow(Expected, amqp10_binary_parser:parse_many_slow(Bin)),
?assertThrow(Expected, amqp10_binary_parser:parse_many(Bin, #{})).
UnsupportedType = 16#02,
Bin = <<UnsupportedType, "hey">>,
?assertThrow({primitive_type_unsupported, UnsupportedType},
amqp10_binary_parser:parse_many_slow(Bin)),
?assertThrow({primitive_type_unsupported, UnsupportedType, {position, 0}},
amqp10_binary_parser:parse_many(Bin, [])).

View File

@ -67,7 +67,8 @@
-export_type([
state/0,
ann_key/0,
ann_value/0
ann_value/0,
annotations/0
]).
-type proto_state() :: term().

View File

@ -13,7 +13,6 @@
convert_to/3,
convert_from/3,
protocol_state/2,
serialize/1,
prepare/2
]).
@ -37,88 +36,92 @@
is_number(V) orelse
is_boolean(V)).
%% Before storing the message on disk in classic queues and quorum queues,
%% for better performance and less disk usage, we we clear fields with redundant content.
-define(CLEARED, c).
-type amqp10_data() :: [#'v1_0.amqp_sequence'{} | #'v1_0.data'{}] | #'v1_0.amqp_value'{}.
-type amqp_map() :: [{term(), term()}].
-type opt(T) :: T | undefined.
-record(msg,
%% This representation is used when the message was originally sent with
%% a protocol other than AMQP and the message was not read from a stream.
-record(msg_body_decoded,
{
header :: opt(#'v1_0.header'{}),
delivery_annotations = []:: list(),
message_annotations = [] :: list(),
properties :: opt(#'v1_0.properties'{}),
application_properties = [] :: list(),
data = [] :: amqp10_data(),
footer = [] :: list()
}).
%% This representation is used when we received the message from
%% an AMQP client or when we read the message from a stream.
%% This message was parsed up to the section preceding the body.
-record(msg_body_encoded,
{
header :: opt(#'v1_0.header'{}),
delivery_annotations = [] :: amqp_map(),
message_annotations = [] :: amqp_map(),
properties :: opt(#'v1_0.properties'{}),
application_properties = [] :: amqp_map(),
bare_and_footer = uninit :: uninit | binary(),
%% byte position within bare_and_footer where body starts
bare_and_footer_body_pos = uninit :: uninit | non_neg_integer()
}).
%% This representation is how we store the message on disk in classic queues
%% and quorum queues. For better performance and less disk usage, we omit the
%% header because the header fields we're interested in are already set as mc
%% annotations. We store the original bare message unaltered to preserve
%% message hashes on the binary encoding of the bare message [§3.2].
%% The record is called v1 just in case we ever want to introduce a new v2
%% on disk representation in the future.
-record(v1,
{
%% We can clear this field before storage because we already set
%% the header fields we're interested in as mc annotations.
header :: none | ?CLEARED | #'v1_0.header'{},
delivery_annotations = [] :: amqp_map(),
message_annotations = [] :: amqp_map(),
%% We can clear properties and application_properties before storage
%% because they can be parsed from the bare message in bare_and_footer.
properties :: none | ?CLEARED | #'v1_0.properties'{},
application_properties = [] :: ?CLEARED | amqp_map(),
%% We must forward the bare message unaltered to preserve message
%% hashes on the binary encoding of the bare message [3.2].
bare_and_footer :: binary()
}).
-opaque state() :: #msg{}.
-opaque state() :: #msg_body_decoded{} | #msg_body_encoded{} | #v1{}.
-export_type([
state/0,
message_section/0
]).
init(Sections) when is_list(Sections) ->
Msg = decode(Sections, #msg{}),
init(Msg);
init(Payload) when is_binary(Payload) ->
Sections = amqp10_framing:decode_bin(Payload, [server_mode]),
{value, {{pos, Pos}, _FirstBareMsgSection}} = lists:search(fun({{pos, _P}, _Sect}) ->
true;
(_Sect) ->
false
end, Sections),
%% Assert the message contains a mandatory body.
{{pos, _}, body} = lists:last(Sections),
BareAndFooter = binary_part(Payload, Pos, byte_size(Payload) - Pos),
Msg = decode(Sections, #msg{}),
init(Msg);
init(#msg{} = Msg) ->
Msg = msg_body_encoded(Sections, Payload),
Anns = essential_properties(Msg),
{Msg, Anns}.
convert_from(?MODULE, Sections, _Env) ->
element(1, init(Sections));
convert_from(?MODULE, Sections, _Env) when is_list(Sections) ->
msg_body_decoded(Sections);
convert_from(_SourceProto, _, _Env) ->
not_implemented.
size(#msg{data = Body}) ->
%% TODO how to estimate anything but data sections?
BodySize = if is_list(Body) ->
lists:foldl(
fun(#'v1_0.data'{content = Data}, Acc) ->
iolist_size(Data) + Acc;
(#'v1_0.amqp_sequence'{content = _}, Acc) ->
Acc
end, 0, Body);
is_record(Body, 'v1_0.amqp_value') ->
0
end,
{_MetaSize = 0, BodySize}.
convert_to(?MODULE, Msg, _Env) ->
Msg;
convert_to(TargetProto, Msg, Env) ->
TargetProto:convert_from(?MODULE, msg_to_sections(Msg), Env).
size(#v1{bare_and_footer = Body}) ->
{_MetaSize = 0, byte_size(Body)}.
x_header(Key, Msg) ->
message_annotation(Key, Msg, undefined).
property(correlation_id, #msg{properties = #'v1_0.properties'{correlation_id = Corr}}) ->
property(correlation_id, #msg_body_encoded{properties = #'v1_0.properties'{correlation_id = Corr}}) ->
Corr;
property(message_id, #msg{properties = #'v1_0.properties'{message_id = MsgId}}) ->
property(message_id, #msg_body_encoded{properties = #'v1_0.properties'{message_id = MsgId}}) ->
MsgId;
property(user_id, #msg{properties = #'v1_0.properties'{user_id = UserId}}) ->
property(user_id, #msg_body_encoded{properties = #'v1_0.properties'{user_id = UserId}}) ->
UserId;
property(subject, #msg{properties = #'v1_0.properties'{subject = Subject}}) ->
property(subject, #msg_body_encoded{properties = #'v1_0.properties'{subject = Subject}}) ->
Subject;
property(to, #msg{properties = #'v1_0.properties'{to = To}}) ->
property(to, #msg_body_encoded{properties = #'v1_0.properties'{to = To}}) ->
To;
property(_Prop, #msg{}) ->
property(_Prop, #msg_body_encoded{}) ->
undefined.
routing_headers(Msg, Opts) ->
@ -134,10 +137,10 @@ routing_headers(Msg, Opts) ->
get_property(durable, Msg) ->
case Msg of
#msg{header = #'v1_0.header'{durable = Durable}}
#msg_body_encoded{header = #'v1_0.header'{durable = Durable}}
when is_boolean(Durable) ->
Durable;
#msg{header = #'v1_0.header'{durable = {boolean, Durable}}} ->
#msg_body_encoded{header = #'v1_0.header'{durable = {boolean, Durable}}} ->
Durable;
_ ->
%% fallback in case the source protocol was old AMQP 0.9.1
@ -150,14 +153,14 @@ get_property(durable, Msg) ->
end;
get_property(timestamp, Msg) ->
case Msg of
#msg{properties = #'v1_0.properties'{creation_time = {timestamp, Ts}}} ->
#msg_body_encoded{properties = #'v1_0.properties'{creation_time = {timestamp, Ts}}} ->
Ts;
_ ->
undefined
end;
get_property(ttl, Msg) ->
case Msg of
#msg{header = #'v1_0.header'{ttl = {uint, Ttl}}} ->
#msg_body_encoded{header = #'v1_0.header'{ttl = {uint, Ttl}}} ->
Ttl;
_ ->
%% fallback in case the source protocol was AMQP 0.9.1
@ -171,7 +174,7 @@ get_property(ttl, Msg) ->
end;
get_property(priority, Msg) ->
case Msg of
#msg{header = #'v1_0.header'{priority = {ubyte, Priority}}} ->
#msg_body_encoded{header = #'v1_0.header'{priority = {ubyte, Priority}}} ->
Priority;
_ ->
%% fallback in case the source protocol was AMQP 0.9.1
@ -183,118 +186,183 @@ get_property(priority, Msg) ->
end
end.
convert_to(?MODULE, Msg, _Env) ->
Msg;
convert_to(TargetProto, Msg, Env) ->
TargetProto:convert_from(?MODULE, msg_to_sections(Msg), Env).
serialize(Sections) ->
encode_bin(Sections).
protocol_state(Msg0 = #msg{header = Header0,
message_annotations = MA0}, Anns) ->
Redelivered = maps:get(redelivered, Anns, false),
FirstAcquirer = not Redelivered,
%% protocol_state/2 serialises the protocol state outputting an AMQP encoded message.
-spec protocol_state(state(), mc:annotations()) -> iolist().
protocol_state(Msg0 = #msg_body_decoded{header = Header0,
message_annotations = MA0}, Anns) ->
FirstAcquirer = first_acquirer(Anns),
Header = case Header0 of
undefined ->
#'v1_0.header'{first_acquirer = FirstAcquirer};
#'v1_0.header'{} ->
Header0#'v1_0.header'{first_acquirer = FirstAcquirer}
end,
MA = protocol_state_message_annotations(MA0, Anns),
Msg = Msg0#msg_body_decoded{header = Header,
message_annotations = MA},
Sections = msg_to_sections(Msg),
encode(Sections);
protocol_state(#msg_body_encoded{header = Header0,
delivery_annotations = DA,
message_annotations = MA0,
bare_and_footer = BareAndFooter}, Anns) ->
FirstAcquirer = first_acquirer(Anns),
Header = case Header0 of
undefined ->
#'v1_0.header'{first_acquirer = FirstAcquirer};
#'v1_0.header'{} ->
Header0#'v1_0.header'{first_acquirer = FirstAcquirer}
end,
MA = protocol_state_message_annotations(MA0, Anns),
Sections = to_sections(Header, DA, MA, []),
[encode(Sections), BareAndFooter];
protocol_state(#v1{delivery_annotations = DA,
message_annotations = MA0,
bare_and_footer = BareAndFooter}, Anns) ->
Durable = case Anns of
#{?ANN_DURABLE := D} -> D;
_ -> true
end,
Priority = case Anns of
#{?ANN_PRIORITY := P} -> {ubyte, P};
_ -> undefined
end,
Ttl = case Anns of
#{ttl := V} -> {uint, V};
_ -> undefined
end,
Header = #'v1_0.header'{durable = Durable,
priority = Priority,
ttl = Ttl,
first_acquirer = first_acquirer(Anns)},
MA = protocol_state_message_annotations(MA0, Anns),
Sections = to_sections(Header, DA, MA, []),
[encode(Sections), BareAndFooter].
MA = maps:fold(fun(?ANN_EXCHANGE, Exchange, L) ->
maps_upsert(<<"x-exchange">>, {utf8, Exchange}, L);
(?ANN_ROUTING_KEYS, RKeys, L) ->
RKey = hd(RKeys),
maps_upsert(<<"x-routing-key">>, {utf8, RKey}, L);
(<<"x-", _/binary>> = K, V, L)
when V =/= undefined ->
%% any x-* annotations get added as message annotations
maps_upsert(K, mc_util:infer_type(V), L);
(<<"timestamp_in_ms">>, V, L) ->
maps_upsert(<<"x-opt-rabbitmq-received-time">>, {timestamp, V}, L);
(_, _, Acc) ->
Acc
end, MA0, Anns),
Msg = Msg0#msg{header = Header,
message_annotations = MA},
msg_to_sections(Msg).
prepare(_For, Msg) ->
Msg.
prepare(read, Msg) ->
Msg;
prepare(store, Msg = #v1{}) ->
Msg;
prepare(store, #msg_body_encoded{delivery_annotations = DA,
message_annotations = MA,
bare_and_footer = BF}) ->
#v1{delivery_annotations = DA,
message_annotations = MA,
bare_and_footer = BF}.
%% internal
msg_to_sections(#msg{header = H,
delivery_annotations = DAC,
message_annotations = MAC,
properties = P,
application_properties = APC,
data = Data,
footer = FC}) ->
Tail = case FC of
[] ->
[];
_ ->
[#'v1_0.footer'{content = FC}]
end,
S0 = case Data of
#'v1_0.amqp_value'{} ->
[Data | Tail];
_ when is_list(Data) ->
Data ++ Tail
end,
S1 = case APC of
msg_to_sections(#msg_body_decoded{header = H,
delivery_annotations = DAC,
message_annotations = MAC,
properties = P,
application_properties = APC,
data = Data,
footer = FC}) ->
S0 = case FC of
[] ->
S0;
[];
_ ->
[#'v1_0.application_properties'{content = APC} | S0]
[#'v1_0.footer'{content = FC}]
end,
S2 = case P of
undefined ->
S1;
_ ->
[P | S1]
end,
S3 = case MAC of
S = case Data of
#'v1_0.amqp_value'{} ->
[Data | S0];
_ when is_list(Data) ->
Data ++ S0
end,
to_sections(H, DAC, MAC, P, APC, S);
msg_to_sections(#msg_body_encoded{header = H,
delivery_annotations = DAC,
message_annotations = MAC,
properties = P,
application_properties = APC,
bare_and_footer = BareAndFooter,
bare_and_footer_body_pos = BodyPos}) ->
BodyAndFooterBin = binary_part(BareAndFooter,
BodyPos,
byte_size(BareAndFooter) - BodyPos),
%% TODO do not parse entire AMQP encoded amqp-value or amqp-sequence section body
BodyAndFooter = amqp10_framing:decode_bin(BodyAndFooterBin),
to_sections(H, DAC, MAC, P, APC, BodyAndFooter);
msg_to_sections(#v1{delivery_annotations = DAC,
message_annotations = MAC,
bare_and_footer = BareAndFooterBin}) ->
%% TODO do not parse entire AMQP encoded amqp-value or amqp-sequence section body
BareAndFooter = amqp10_framing:decode_bin(BareAndFooterBin),
to_sections(undefined, DAC, MAC, BareAndFooter).
to_sections(H, DAC, MAC, P, APC, Tail) ->
S0 = case APC of
[] ->
S2;
Tail;
_ ->
[#'v1_0.message_annotations'{content = MAC} | S2]
[#'v1_0.application_properties'{content = APC} | Tail]
end,
S4 = case DAC of
S = case P of
undefined ->
S0;
_ ->
[P | S0]
end,
to_sections(H, DAC, MAC, S).
to_sections(H, DAC, MAC, Tail) ->
S0 = case MAC of
[] ->
S3;
Tail;
_ ->
[#'v1_0.delivery_annotations'{content = DAC} | S3]
[#'v1_0.message_annotations'{content = MAC} | Tail]
end,
S = case DAC of
[] ->
S0;
_ ->
[#'v1_0.delivery_annotations'{content = DAC} | S0]
end,
case H of
undefined ->
S4;
S;
_ ->
[H | S4]
[H | S]
end.
-spec protocol_state_message_annotations(amqp_map(), mc:annotations()) -> amqp_map().
protocol_state_message_annotations(MA, Anns) ->
maps:fold(
fun(?ANN_EXCHANGE, Exchange, L) ->
maps_upsert(<<"x-exchange">>, {utf8, Exchange}, L);
(?ANN_ROUTING_KEYS, RKeys, L) ->
RKey = hd(RKeys),
maps_upsert(<<"x-routing-key">>, {utf8, RKey}, L);
(<<"x-", _/binary>> = K, V, L)
when V =/= undefined ->
%% any x-* annotations get added as message annotations
maps_upsert(K, mc_util:infer_type(V), L);
(<<"timestamp_in_ms">>, V, L) ->
maps_upsert(<<"x-opt-rabbitmq-received-time">>, {timestamp, V}, L);
(_, _, Acc) ->
Acc
end, MA, Anns).
maps_upsert(Key, TaggedVal, KVList) ->
TaggedKey = {symbol, Key},
Elem = {TaggedKey, TaggedVal},
lists:keystore(TaggedKey, 1, KVList, Elem).
encode_bin(undefined) ->
<<>>;
encode_bin(Sections) when is_list(Sections) ->
encode(Sections) when is_list(Sections) ->
[amqp10_framing:encode_bin(Section) || Section <- Sections,
not is_empty(Section)];
encode_bin(Section) ->
case is_empty(Section) of
true ->
<<>>;
false ->
amqp10_framing:encode_bin(Section)
end.
not is_empty(Section)].
is_empty(undefined) ->
is_empty(#'v1_0.header'{durable = undefined,
priority = undefined,
ttl = undefined,
first_acquirer = undefined,
delivery_count = undefined}) ->
true;
is_empty(#'v1_0.delivery_annotations'{content = []}) ->
true;
is_empty(#'v1_0.message_annotations'{content = []}) ->
true;
is_empty(#'v1_0.properties'{message_id = undefined,
user_id = undefined,
@ -312,33 +380,22 @@ is_empty(#'v1_0.properties'{message_id = undefined,
true;
is_empty(#'v1_0.application_properties'{content = []}) ->
true;
is_empty(#'v1_0.message_annotations'{content = []}) ->
true;
is_empty(#'v1_0.delivery_annotations'{content = []}) ->
true;
is_empty(#'v1_0.footer'{content = []}) ->
true;
is_empty(#'v1_0.header'{durable = undefined,
priority = undefined,
ttl = undefined,
first_acquirer = undefined,
delivery_count = undefined}) ->
true;
is_empty(_) ->
false.
message_annotation(_Key, #msg{message_annotations = []},
message_annotation(_Key, #msg_body_encoded{message_annotations = []},
Default) ->
Default;
message_annotation(Key, #msg{message_annotations = Content},
message_annotation(Key, #msg_body_encoded{message_annotations = Content},
Default)
when is_binary(Key) ->
mc_util:amqp_map_get(Key, Content, Default).
message_annotations_as_simple_map(#msg{message_annotations = []}) ->
message_annotations_as_simple_map(#msg_body_encoded{message_annotations = []}) ->
[];
message_annotations_as_simple_map(#msg{message_annotations = Content}) ->
message_annotations_as_simple_map(#msg_body_encoded{message_annotations = Content}) ->
%% the section record format really is terrible
lists:filtermap(fun({{symbol, K}, {_T, V}})
when ?SIMPLE_VALUE(V) ->
@ -347,9 +404,9 @@ message_annotations_as_simple_map(#msg{message_annotations = Content}) ->
false
end, Content).
application_properties_as_simple_map(#msg{application_properties = []}, L) ->
application_properties_as_simple_map(#msg_body_encoded{application_properties = []}, L) ->
L;
application_properties_as_simple_map(#msg{application_properties = Content},
application_properties_as_simple_map(#msg_body_encoded{application_properties = Content},
L) ->
%% the section record format really is terrible
lists:foldl(fun({{utf8, K}, {_T, V}}, Acc)
@ -362,29 +419,70 @@ application_properties_as_simple_map(#msg{application_properties = Content},
Acc
end, L, Content).
decode([], Acc) ->
msg_body_decoded(Sections) ->
msg_body_decoded(Sections, #msg_body_decoded{}).
msg_body_decoded([], Acc) ->
Acc;
decode([#'v1_0.header'{} = H | Rem], Msg) ->
decode(Rem, Msg#msg{header = H});
decode([#'v1_0.message_annotations'{content = MAC} | Rem], Msg) ->
decode(Rem, Msg#msg{message_annotations = MAC});
decode([#'v1_0.properties'{} = P | Rem], Msg) ->
decode(Rem, Msg#msg{properties = P});
decode([#'v1_0.application_properties'{content = APC} | Rem], Msg) ->
decode(Rem, Msg#msg{application_properties = APC});
decode([#'v1_0.delivery_annotations'{content = DAC} | Rem], Msg) ->
decode(Rem, Msg#msg{delivery_annotations = DAC});
decode([#'v1_0.data'{} = D | Rem], #msg{data = Body} = Msg)
msg_body_decoded([#'v1_0.header'{} = H | Rem], Msg) ->
msg_body_decoded(Rem, Msg#msg_body_decoded{header = H});
msg_body_decoded([#'v1_0.message_annotations'{content = MAC} | Rem], Msg) ->
msg_body_decoded(Rem, Msg#msg_body_decoded{message_annotations = MAC});
msg_body_decoded([#'v1_0.properties'{} = P | Rem], Msg) ->
msg_body_decoded(Rem, Msg#msg_body_decoded{properties = P});
msg_body_decoded([#'v1_0.application_properties'{content = APC} | Rem], Msg) ->
msg_body_decoded(Rem, Msg#msg_body_decoded{application_properties = APC});
msg_body_decoded([#'v1_0.delivery_annotations'{content = DAC} | Rem], Msg) ->
msg_body_decoded(Rem, Msg#msg_body_decoded{delivery_annotations = DAC});
msg_body_decoded([#'v1_0.data'{} = D | Rem], #msg_body_decoded{data = Body} = Msg)
when is_list(Body) ->
decode(Rem, Msg#msg{data = Body ++ [D]});
decode([#'v1_0.amqp_sequence'{} = D | Rem], #msg{data = Body} = Msg)
msg_body_decoded(Rem, Msg#msg_body_decoded{data = Body ++ [D]});
msg_body_decoded([#'v1_0.amqp_sequence'{} = D | Rem], #msg_body_decoded{data = Body} = Msg)
when is_list(Body) ->
decode(Rem, Msg#msg{data = Body ++ [D]});
decode([#'v1_0.footer'{content = FC} | Rem], Msg) ->
decode(Rem, Msg#msg{footer = FC});
decode([#'v1_0.amqp_value'{} = B | Rem], #msg{} = Msg) ->
msg_body_decoded(Rem, Msg#msg_body_decoded{data = Body ++ [D]});
msg_body_decoded([#'v1_0.footer'{content = FC} | Rem], Msg) ->
msg_body_decoded(Rem, Msg#msg_body_decoded{footer = FC});
msg_body_decoded([#'v1_0.amqp_value'{} = B | Rem], #msg_body_decoded{} = Msg) ->
%% an amqp value can only be a singleton
decode(Rem, Msg#msg{data = B}).
msg_body_decoded(Rem, Msg#msg_body_decoded{data = B}).
msg_body_encoded(Sections, Payload) ->
msg_body_encoded(Sections, Payload, #msg_body_encoded{}).
msg_body_encoded([#'v1_0.header'{} = H | Rem], Payload, Msg) ->
msg_body_encoded(Rem, Payload, Msg#msg_body_encoded{header = H});
msg_body_encoded([#'v1_0.delivery_annotations'{content = DAC} | Rem], Payload, Msg) ->
msg_body_encoded(Rem, Payload, Msg#msg_body_encoded{delivery_annotations = DAC});
msg_body_encoded([#'v1_0.message_annotations'{content = MAC} | Rem], Payload, Msg) ->
msg_body_encoded(Rem, Payload, Msg#msg_body_encoded{message_annotations = MAC});
msg_body_encoded([{{pos, Pos}, #'v1_0.properties'{} = Props} | Rem], Payload, Msg) ->
%% properties is the first bare message section
Bin = binary_part_bare_and_footer(Payload, Pos),
msg_body_encoded(Rem, Pos, Msg#msg_body_encoded{properties = Props,
bare_and_footer = Bin});
msg_body_encoded([{{pos, Pos}, #'v1_0.application_properties'{content = APC}} | Rem], Payload, Msg)
when is_binary(Payload) ->
%% application-properties is the first bare message section
Bin = binary_part_bare_and_footer(Payload, Pos),
msg_body_encoded(Rem, Pos, Msg#msg_body_encoded{application_properties = APC,
bare_and_footer = Bin});
msg_body_encoded([{{pos, _Pos}, #'v1_0.application_properties'{content = APC}} | Rem], BarePos, Msg)
when is_integer(BarePos) ->
msg_body_encoded(Rem, BarePos, Msg#msg_body_encoded{application_properties = APC});
%% Base case: we assert the last part contains the mandatory body:
msg_body_encoded([{{pos, Pos}, body}], Payload, Msg)
when is_binary(Payload) ->
%% The body is the first bare message section.
Bin = binary_part_bare_and_footer(Payload, Pos),
Msg#msg_body_encoded{bare_and_footer = Bin,
bare_and_footer_body_pos = 0};
msg_body_encoded([{{pos, Pos}, body}], BarePos, Msg)
when is_integer(BarePos) ->
Msg#msg_body_encoded{bare_and_footer_body_pos = Pos - BarePos}.
%% We extract the binary part of the payload exactly once when the bare message starts.
binary_part_bare_and_footer(Payload, Start) ->
binary_part(Payload, Start, byte_size(Payload) - Start).
key_find(K, [{{_, K}, {_, V}} | _]) ->
V;
@ -393,6 +491,14 @@ key_find(K, [_ | Rem]) ->
key_find(_K, []) ->
undefined.
-spec first_acquirer(mc:annotations()) -> boolean().
first_acquirer(Anns) ->
Redelivered = case Anns of
#{redelivered := R} -> R;
_ -> false
end,
not Redelivered.
recover_deaths([], Acc) ->
Acc;
recover_deaths([{map, Kvs} | Rem], Acc) ->
@ -415,7 +521,7 @@ recover_deaths([{map, Kvs} | Rem], Acc) ->
count = key_find(<<"count">>, Kvs),
routing_keys = RKeys}}).
essential_properties(#msg{message_annotations = MA} = Msg) ->
essential_properties(#msg_body_encoded{message_annotations = MA} = Msg) ->
Durable = get_property(durable, Msg),
Priority = get_property(priority, Msg),
Timestamp = get_property(timestamp, Msg),

View File

@ -391,7 +391,9 @@ convert_to(mc_amqp, #content{payload_fragments_rev = Payload} = Content, Env) ->
convert_to(_TargetProto, _Content, _Env) ->
not_implemented.
protocol_state(#content{properties = #'P_basic'{headers = H00} = B0} = C,
protocol_state(#content{properties = #'P_basic'{headers = H00,
priority = Priority0,
delivery_mode = DeliveryMode0} = B0} = C,
Anns) ->
%% Add any x- annotations as headers
H0 = case H00 of
@ -434,16 +436,38 @@ protocol_state(#content{properties = #'P_basic'{headers = H00} = B0} = C,
%% publishes
undefined;
#{ttl := Ttl} ->
%% not sure this will ever happen
%% as we only ever unset the expiry
integer_to_binary(Ttl);
_ ->
B0#'P_basic'.expiration
end,
B = B0#'P_basic'{timestamp = Timestamp,
Priority = case Priority0 of
undefined ->
case Anns of
%% This branch is hit when a message with priority was originally
%% published with AMQP to a classic or quorum queue.
#{?ANN_PRIORITY := P} -> P;
_ -> undefined
end;
_ ->
Priority0
end,
DeliveryMode = case DeliveryMode0 of
undefined ->
%% This branch is hit when a message was originally
%% published with AMQP to a classic or quorum queue.
case Anns of
#{?ANN_DURABLE := true} -> 2;
#{?ANN_DURABLE := false} -> 1;
_ -> 2
end;
_ ->
DeliveryMode0
end,
B = B0#'P_basic'{headers = Headers,
delivery_mode = DeliveryMode,
priority = Priority,
expiration = Expiration,
headers = Headers},
timestamp = Timestamp},
C#content{properties = B,
properties_bin = none};

View File

@ -364,6 +364,9 @@ handle_frame0(Mode, Channel, Body, State) ->
%% "The frame body is defined as a performative followed by an opaque payload." [2.3.2]
parse_frame_body(Body, _Channel) ->
%% TODO test binary_part() here instead of returning binaries in amqp10_binary_parser:parse/1.
%% The latter can return the number of bytes parsed instead.
%% This should remove all warnings from the parser when compiliation option bin_opt_info is set.
{DescribedPerformative, Payload} = amqp10_binary_parser:parse(Body),
Performative = amqp10_framing:decode(DescribedPerformative),
?DEBUG("~s Channel ~tp ->~n~tp~n~ts~n",

View File

@ -1622,11 +1622,10 @@ handle_deliver(ConsumerTag, AckRequired,
settled = SendSettled},
Mc1 = mc:convert(mc_amqp, Mc0),
Mc = mc:set_annotation(redelivered, Redelivered, Mc1),
Sections0 = mc:protocol_state(Mc),
Sections = mc_amqp:serialize(Sections0),
?DEBUG("~s Outbound payload:~n ~tp~n",
[?MODULE, [amqp10_framing:pprint(Section) ||
Section <- amqp10_framing:decode_bin(iolist_to_binary(Sections))]]),
Sections = mc:protocol_state(Mc),
% ?DEBUG("~s Outbound payload:~n ~tp~n",
% [?MODULE, [amqp10_framing:pprint(Section) ||
% Section <- amqp10_framing:decode_bin(iolist_to_binary(Sections))]]),
validate_message_size(Sections, MaxMessageSize),
Frames = transfer_frames(Transfer, Sections, MaxFrameSize),
messages_delivered(Redelivered, QType),
@ -1876,7 +1875,6 @@ incoming_link_transfer(
end,
validate_transfer_rcv_settle_mode(RcvSettleMode, Settled),
validate_incoming_message_size(PayloadBin),
% Sections = amqp10_framing:decode_bin(PayloadBin),
% ?DEBUG("~s Inbound payload:~n ~tp",
% [?MODULE, [amqp10_framing:pprint(Section) || Section <- Sections]]),

View File

@ -553,8 +553,7 @@ deliver0(MsgId, Msg,
stream_message(Msg, FilteringSupported) ->
McAmqp = mc:convert(mc_amqp, Msg),
Sections = mc:protocol_state(McAmqp),
MsgData = mc_amqp:serialize(Sections),
MsgData = mc:protocol_state(McAmqp),
case FilteringSupported of
true ->
case mc:x_header(<<"x-stream-filter-value">>, McAmqp) of

View File

@ -685,18 +685,33 @@ amqp10_to_amqp091_header_conversion(Session,Ch, QName, Address) ->
"x-int" => 3,
"x-bool" => true},
OutMsg2),
ok = amqp10_client:send_msg(Sender, OutMsg3),
OutMsg = amqp10_msg:set_headers(
#{durable => true,
priority => 7,
ttl => 88000},
OutMsg3),
ok = amqp10_client:send_msg(Sender, OutMsg),
ok = wait_for_accepts(1),
{ok, Headers} = amqp091_get_msg_headers(Ch, QName),
{#'basic.get_ok'{},
#amqp_msg{props = #'P_basic'{headers = Headers,
delivery_mode = DeliveryMode,
priority = Priority,
expiration = Expiration}}
} = amqp_channel:call(Ch, #'basic.get'{queue = QName, no_ack = true}),
%% assert application properties
?assertEqual({longstr, <<"string-val">>}, rabbit_misc:table_lookup(Headers, <<"string">>)),
?assertEqual({unsignedint, 2}, rabbit_misc:table_lookup(Headers, <<"int">>)),
?assertEqual({bool, false}, rabbit_misc:table_lookup(Headers, <<"bool">>)),
%% assert message annotations
?assertEqual({longstr, <<"string-value">>}, rabbit_misc:table_lookup(Headers, <<"x-string">>)),
?assertEqual({unsignedint, 3}, rabbit_misc:table_lookup(Headers, <<"x-int">>)),
?assertEqual({bool, true}, rabbit_misc:table_lookup(Headers, <<"x-bool">>)).
?assertEqual({bool, true}, rabbit_misc:table_lookup(Headers, <<"x-bool">>)),
%% assert headers
?assertEqual(2, DeliveryMode),
?assertEqual(7, Priority),
?assertEqual(<<"88000">>, Expiration).
amqp091_to_amqp10_header_conversion(Session, Ch, QName, Address) ->
Amqp091Headers = [{<<"x-forwarding">>, array,
@ -3451,11 +3466,6 @@ delete_queue(Session, QName) ->
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair).
amqp091_get_msg_headers(Channel, QName) ->
{#'basic.get_ok'{}, #amqp_msg{props = #'P_basic'{ headers= Headers}}}
= amqp_channel:call(Channel, #'basic.get'{queue = QName, no_ack = true}),
{ok, Headers}.
create_amqp10_sender(Session, Address) ->
{ok, Sender} = amqp10_client:attach_sender_link(
Session, <<"test-sender">>, Address),