diff --git a/deps/amqp10_common/src/amqp10_binary_parser.erl b/deps/amqp10_common/src/amqp10_binary_parser.erl index dd324658d1..ae0438f871 100644 --- a/deps/amqp10_common/src/amqp10_binary_parser.erl +++ b/deps/amqp10_common/src/amqp10_binary_parser.erl @@ -12,6 +12,10 @@ -export([parse/1, parse_many/2]). +-ifdef(TEST). +-export([parse_many_slow/1]). +-endif. + %% TODO put often matched binary function clauses to the top? %% server_mode is a special parsing mode used by RabbitMQ when parsing @@ -25,7 +29,7 @@ %% AMQP 3.2.4 & 3.2.5 -define(NUMERIC_DESCRIPTOR_IS_PROPERTIES_OR_APPLICATION_PROPERTIES(Code), Code =:= 16#73 orelse - Code =< 16#74). + Code =:= 16#74). -define(SYMBOLIC_DESCRIPTOR_IS_PROPERTIES_OR_APPLICATION_PROPERTIES(Name), Name =:= <<"amqp:properties:list">> orelse Name =:= <<"amqp:application-properties:map">>). @@ -185,7 +189,6 @@ mapify([Key, Value | Rest]) -> [{Key, Value} | mapify(Rest)]. -ifdef(TEST). --export([parse_many_slow/1]). %% This is the old, slow, original parser implemenation before %% https://github.com/rabbitmq/rabbitmq-server/pull/4811 %% where many sub binaries are being created. diff --git a/deps/amqp10_common/src/amqp10_framing.erl b/deps/amqp10_common/src/amqp10_framing.erl index 6c9a284ac4..4bd60ef61f 100644 --- a/deps/amqp10_common/src/amqp10_framing.erl +++ b/deps/amqp10_common/src/amqp10_framing.erl @@ -15,7 +15,7 @@ decode_bin/1, decode_bin/2, symbol_for/1, - number_for/1, + number_for/1, pprint/1]). %% debug @@ -187,7 +187,7 @@ encode_bin(X) -> -spec decode_bin(binary()) -> [term()]. decode_bin(Binary) -> - [decode(Section) || Section <- amqp10_binary_parser:parse_many(Binary, #{})]. + [decode(Section) || Section <- amqp10_binary_parser:parse_many(Binary, [])]. -spec decode_bin(binary(), amqp10_binary_parser:opts()) -> [term()]. decode_bin(Binary, Opts) -> diff --git a/deps/amqp10_common/test/binary_generator_SUITE.erl b/deps/amqp10_common/test/binary_generator_SUITE.erl index 6e16e4f27e..7672d0cba8 100644 --- a/deps/amqp10_common/test/binary_generator_SUITE.erl +++ b/deps/amqp10_common/test/binary_generator_SUITE.erl @@ -187,7 +187,7 @@ roundtrip(Term) -> Bin = iolist_to_binary(amqp10_binary_generator:generate(Term)), % generate returns an iolist but parse expects a binary ?assertEqual({Term, <<>>}, amqp10_binary_parser:parse(Bin)), - ?assertEqual([Term], amqp10_binary_parser:parse_many(Bin, #{})). + ?assertEqual([Term], amqp10_binary_parser:parse_many(Bin, [])). %% Return the roundtripped term. roundtrip_return(Term) -> @@ -195,5 +195,5 @@ roundtrip_return(Term) -> %% We assert only that amqp10_binary_parser:parse/1 and %% amqp10_binary_parser:parse_all/1 return the same term. {RoundTripTerm, <<>>} = amqp10_binary_parser:parse(Bin), - ?assertEqual([RoundTripTerm], amqp10_binary_parser:parse_many(Bin, #{})), + ?assertEqual([RoundTripTerm], amqp10_binary_parser:parse_many(Bin, [])), RoundTripTerm. diff --git a/deps/amqp10_common/test/binary_parser_SUITE.erl b/deps/amqp10_common/test/binary_parser_SUITE.erl index 2e6b45b94b..597ed2a68b 100644 --- a/deps/amqp10_common/test/binary_parser_SUITE.erl +++ b/deps/amqp10_common/test/binary_parser_SUITE.erl @@ -71,7 +71,7 @@ roundtrip(_Config) -> end, <<>>, Terms), ?assertEqual(Terms, amqp10_binary_parser:parse_many_slow(Bin)), - ?assertEqual(Terms, amqp10_binary_parser:parse_many(Bin, #{})). + ?assertEqual(Terms, amqp10_binary_parser:parse_many(Bin, [])). array_with_extra_input(_Config) -> Bin = <<83,16,192,85,10,177,0,0,0,1,48,161,12,114,97,98,98,105,116, 109,113,45,98,111,120,112,255,255,0,0,96,0,50,112,0,0,19,136,163,5,101,110,45,85,83,224,14,2,65,5,102,105,45,70,73,5,101,110,45,85,83,64,64,193,24,2,163,20,68,69,70,69,78,83,73,67,83,46,84,69,83,84,46,83,85,73,84,69,65>>, @@ -81,10 +81,12 @@ array_with_extra_input(_Config) -> 65, <<105,45,70,73,5,101,110,45,85,83>>, [true,true]}, ?assertExit(Expected, amqp10_binary_parser:parse_many_slow(Bin)), - ?assertExit(Expected, amqp10_binary_parser:parse_many(Bin, #{})). + ?assertExit(Expected, amqp10_binary_parser:parse_many(Bin, [])). unsupported_type(_Config) -> - Bin = <<2/integer, "hey">>, - Expected = {primitive_type_unsupported, 16#02, <<"hey">>}, - ?assertThrow(Expected, amqp10_binary_parser:parse_many_slow(Bin)), - ?assertThrow(Expected, amqp10_binary_parser:parse_many(Bin, #{})). + UnsupportedType = 16#02, + Bin = <>, + ?assertThrow({primitive_type_unsupported, UnsupportedType}, + amqp10_binary_parser:parse_many_slow(Bin)), + ?assertThrow({primitive_type_unsupported, UnsupportedType, {position, 0}}, + amqp10_binary_parser:parse_many(Bin, [])). diff --git a/deps/rabbit/src/mc.erl b/deps/rabbit/src/mc.erl index 1e34a4a494..bd7679e734 100644 --- a/deps/rabbit/src/mc.erl +++ b/deps/rabbit/src/mc.erl @@ -67,7 +67,8 @@ -export_type([ state/0, ann_key/0, - ann_value/0 + ann_value/0, + annotations/0 ]). -type proto_state() :: term(). diff --git a/deps/rabbit/src/mc_amqp.erl b/deps/rabbit/src/mc_amqp.erl index 444b0e561e..526b1d0299 100644 --- a/deps/rabbit/src/mc_amqp.erl +++ b/deps/rabbit/src/mc_amqp.erl @@ -13,7 +13,6 @@ convert_to/3, convert_from/3, protocol_state/2, - serialize/1, prepare/2 ]). @@ -37,88 +36,92 @@ is_number(V) orelse is_boolean(V)). -%% Before storing the message on disk in classic queues and quorum queues, -%% for better performance and less disk usage, we we clear fields with redundant content. --define(CLEARED, c). - -type amqp10_data() :: [#'v1_0.amqp_sequence'{} | #'v1_0.data'{}] | #'v1_0.amqp_value'{}. -type amqp_map() :: [{term(), term()}]. +-type opt(T) :: T | undefined. --record(msg, +%% This representation is used when the message was originally sent with +%% a protocol other than AMQP and the message was not read from a stream. +-record(msg_body_decoded, + { + header :: opt(#'v1_0.header'{}), + delivery_annotations = []:: list(), + message_annotations = [] :: list(), + properties :: opt(#'v1_0.properties'{}), + application_properties = [] :: list(), + data = [] :: amqp10_data(), + footer = [] :: list() + }). + +%% This representation is used when we received the message from +%% an AMQP client or when we read the message from a stream. +%% This message was parsed up to the section preceding the body. +-record(msg_body_encoded, + { + header :: opt(#'v1_0.header'{}), + delivery_annotations = [] :: amqp_map(), + message_annotations = [] :: amqp_map(), + properties :: opt(#'v1_0.properties'{}), + application_properties = [] :: amqp_map(), + bare_and_footer = uninit :: uninit | binary(), + %% byte position within bare_and_footer where body starts + bare_and_footer_body_pos = uninit :: uninit | non_neg_integer() + }). + +%% This representation is how we store the message on disk in classic queues +%% and quorum queues. For better performance and less disk usage, we omit the +%% header because the header fields we're interested in are already set as mc +%% annotations. We store the original bare message unaltered to preserve +%% message hashes on the binary encoding of the bare message [ยง3.2]. +%% The record is called v1 just in case we ever want to introduce a new v2 +%% on disk representation in the future. +-record(v1, { - %% We can clear this field before storage because we already set - %% the header fields we're interested in as mc annotations. - header :: none | ?CLEARED | #'v1_0.header'{}, delivery_annotations = [] :: amqp_map(), message_annotations = [] :: amqp_map(), - %% We can clear properties and application_properties before storage - %% because they can be parsed from the bare message in bare_and_footer. - properties :: none | ?CLEARED | #'v1_0.properties'{}, - application_properties = [] :: ?CLEARED | amqp_map(), - %% We must forward the bare message unaltered to preserve message - %% hashes on the binary encoding of the bare message [3.2]. bare_and_footer :: binary() }). --opaque state() :: #msg{}. +-opaque state() :: #msg_body_decoded{} | #msg_body_encoded{} | #v1{}. -export_type([ state/0, message_section/0 ]). -init(Sections) when is_list(Sections) -> - Msg = decode(Sections, #msg{}), - init(Msg); init(Payload) when is_binary(Payload) -> Sections = amqp10_framing:decode_bin(Payload, [server_mode]), - {value, {{pos, Pos}, _FirstBareMsgSection}} = lists:search(fun({{pos, _P}, _Sect}) -> - true; - (_Sect) -> - false - end, Sections), - %% Assert the message contains a mandatory body. - {{pos, _}, body} = lists:last(Sections), - BareAndFooter = binary_part(Payload, Pos, byte_size(Payload) - Pos), - Msg = decode(Sections, #msg{}), - init(Msg); -init(#msg{} = Msg) -> + Msg = msg_body_encoded(Sections, Payload), Anns = essential_properties(Msg), {Msg, Anns}. -convert_from(?MODULE, Sections, _Env) -> - element(1, init(Sections)); +convert_from(?MODULE, Sections, _Env) when is_list(Sections) -> + msg_body_decoded(Sections); convert_from(_SourceProto, _, _Env) -> not_implemented. -size(#msg{data = Body}) -> - %% TODO how to estimate anything but data sections? - BodySize = if is_list(Body) -> - lists:foldl( - fun(#'v1_0.data'{content = Data}, Acc) -> - iolist_size(Data) + Acc; - (#'v1_0.amqp_sequence'{content = _}, Acc) -> - Acc - end, 0, Body); - is_record(Body, 'v1_0.amqp_value') -> - 0 - end, - {_MetaSize = 0, BodySize}. +convert_to(?MODULE, Msg, _Env) -> + Msg; +convert_to(TargetProto, Msg, Env) -> + TargetProto:convert_from(?MODULE, msg_to_sections(Msg), Env). + +size(#v1{bare_and_footer = Body}) -> + {_MetaSize = 0, byte_size(Body)}. x_header(Key, Msg) -> message_annotation(Key, Msg, undefined). -property(correlation_id, #msg{properties = #'v1_0.properties'{correlation_id = Corr}}) -> +property(correlation_id, #msg_body_encoded{properties = #'v1_0.properties'{correlation_id = Corr}}) -> Corr; -property(message_id, #msg{properties = #'v1_0.properties'{message_id = MsgId}}) -> +property(message_id, #msg_body_encoded{properties = #'v1_0.properties'{message_id = MsgId}}) -> MsgId; -property(user_id, #msg{properties = #'v1_0.properties'{user_id = UserId}}) -> +property(user_id, #msg_body_encoded{properties = #'v1_0.properties'{user_id = UserId}}) -> UserId; -property(subject, #msg{properties = #'v1_0.properties'{subject = Subject}}) -> +property(subject, #msg_body_encoded{properties = #'v1_0.properties'{subject = Subject}}) -> Subject; -property(to, #msg{properties = #'v1_0.properties'{to = To}}) -> +property(to, #msg_body_encoded{properties = #'v1_0.properties'{to = To}}) -> To; -property(_Prop, #msg{}) -> +property(_Prop, #msg_body_encoded{}) -> undefined. routing_headers(Msg, Opts) -> @@ -134,10 +137,10 @@ routing_headers(Msg, Opts) -> get_property(durable, Msg) -> case Msg of - #msg{header = #'v1_0.header'{durable = Durable}} + #msg_body_encoded{header = #'v1_0.header'{durable = Durable}} when is_boolean(Durable) -> Durable; - #msg{header = #'v1_0.header'{durable = {boolean, Durable}}} -> + #msg_body_encoded{header = #'v1_0.header'{durable = {boolean, Durable}}} -> Durable; _ -> %% fallback in case the source protocol was old AMQP 0.9.1 @@ -150,14 +153,14 @@ get_property(durable, Msg) -> end; get_property(timestamp, Msg) -> case Msg of - #msg{properties = #'v1_0.properties'{creation_time = {timestamp, Ts}}} -> + #msg_body_encoded{properties = #'v1_0.properties'{creation_time = {timestamp, Ts}}} -> Ts; _ -> undefined end; get_property(ttl, Msg) -> case Msg of - #msg{header = #'v1_0.header'{ttl = {uint, Ttl}}} -> + #msg_body_encoded{header = #'v1_0.header'{ttl = {uint, Ttl}}} -> Ttl; _ -> %% fallback in case the source protocol was AMQP 0.9.1 @@ -171,7 +174,7 @@ get_property(ttl, Msg) -> end; get_property(priority, Msg) -> case Msg of - #msg{header = #'v1_0.header'{priority = {ubyte, Priority}}} -> + #msg_body_encoded{header = #'v1_0.header'{priority = {ubyte, Priority}}} -> Priority; _ -> %% fallback in case the source protocol was AMQP 0.9.1 @@ -183,118 +186,183 @@ get_property(priority, Msg) -> end end. -convert_to(?MODULE, Msg, _Env) -> - Msg; -convert_to(TargetProto, Msg, Env) -> - TargetProto:convert_from(?MODULE, msg_to_sections(Msg), Env). - -serialize(Sections) -> - encode_bin(Sections). - -protocol_state(Msg0 = #msg{header = Header0, - message_annotations = MA0}, Anns) -> - Redelivered = maps:get(redelivered, Anns, false), - FirstAcquirer = not Redelivered, +%% protocol_state/2 serialises the protocol state outputting an AMQP encoded message. +-spec protocol_state(state(), mc:annotations()) -> iolist(). +protocol_state(Msg0 = #msg_body_decoded{header = Header0, + message_annotations = MA0}, Anns) -> + FirstAcquirer = first_acquirer(Anns), Header = case Header0 of undefined -> #'v1_0.header'{first_acquirer = FirstAcquirer}; #'v1_0.header'{} -> Header0#'v1_0.header'{first_acquirer = FirstAcquirer} end, + MA = protocol_state_message_annotations(MA0, Anns), + Msg = Msg0#msg_body_decoded{header = Header, + message_annotations = MA}, + Sections = msg_to_sections(Msg), + encode(Sections); +protocol_state(#msg_body_encoded{header = Header0, + delivery_annotations = DA, + message_annotations = MA0, + bare_and_footer = BareAndFooter}, Anns) -> + FirstAcquirer = first_acquirer(Anns), + Header = case Header0 of + undefined -> + #'v1_0.header'{first_acquirer = FirstAcquirer}; + #'v1_0.header'{} -> + Header0#'v1_0.header'{first_acquirer = FirstAcquirer} + end, + MA = protocol_state_message_annotations(MA0, Anns), + Sections = to_sections(Header, DA, MA, []), + [encode(Sections), BareAndFooter]; +protocol_state(#v1{delivery_annotations = DA, + message_annotations = MA0, + bare_and_footer = BareAndFooter}, Anns) -> + Durable = case Anns of + #{?ANN_DURABLE := D} -> D; + _ -> true + end, + Priority = case Anns of + #{?ANN_PRIORITY := P} -> {ubyte, P}; + _ -> undefined + end, + Ttl = case Anns of + #{ttl := V} -> {uint, V}; + _ -> undefined + end, + Header = #'v1_0.header'{durable = Durable, + priority = Priority, + ttl = Ttl, + first_acquirer = first_acquirer(Anns)}, + MA = protocol_state_message_annotations(MA0, Anns), + Sections = to_sections(Header, DA, MA, []), + [encode(Sections), BareAndFooter]. - MA = maps:fold(fun(?ANN_EXCHANGE, Exchange, L) -> - maps_upsert(<<"x-exchange">>, {utf8, Exchange}, L); - (?ANN_ROUTING_KEYS, RKeys, L) -> - RKey = hd(RKeys), - maps_upsert(<<"x-routing-key">>, {utf8, RKey}, L); - (<<"x-", _/binary>> = K, V, L) - when V =/= undefined -> - %% any x-* annotations get added as message annotations - maps_upsert(K, mc_util:infer_type(V), L); - (<<"timestamp_in_ms">>, V, L) -> - maps_upsert(<<"x-opt-rabbitmq-received-time">>, {timestamp, V}, L); - (_, _, Acc) -> - Acc - end, MA0, Anns), - - Msg = Msg0#msg{header = Header, - message_annotations = MA}, - msg_to_sections(Msg). - -prepare(_For, Msg) -> - Msg. +prepare(read, Msg) -> + Msg; +prepare(store, Msg = #v1{}) -> + Msg; +prepare(store, #msg_body_encoded{delivery_annotations = DA, + message_annotations = MA, + bare_and_footer = BF}) -> + #v1{delivery_annotations = DA, + message_annotations = MA, + bare_and_footer = BF}. %% internal -msg_to_sections(#msg{header = H, - delivery_annotations = DAC, - message_annotations = MAC, - properties = P, - application_properties = APC, - data = Data, - footer = FC}) -> - Tail = case FC of - [] -> - []; - _ -> - [#'v1_0.footer'{content = FC}] - end, - S0 = case Data of - #'v1_0.amqp_value'{} -> - [Data | Tail]; - _ when is_list(Data) -> - Data ++ Tail - end, - S1 = case APC of +msg_to_sections(#msg_body_decoded{header = H, + delivery_annotations = DAC, + message_annotations = MAC, + properties = P, + application_properties = APC, + data = Data, + footer = FC}) -> + S0 = case FC of [] -> - S0; + []; _ -> - [#'v1_0.application_properties'{content = APC} | S0] + [#'v1_0.footer'{content = FC}] end, - S2 = case P of - undefined -> - S1; - _ -> - [P | S1] - end, - S3 = case MAC of + S = case Data of + #'v1_0.amqp_value'{} -> + [Data | S0]; + _ when is_list(Data) -> + Data ++ S0 + end, + to_sections(H, DAC, MAC, P, APC, S); +msg_to_sections(#msg_body_encoded{header = H, + delivery_annotations = DAC, + message_annotations = MAC, + properties = P, + application_properties = APC, + bare_and_footer = BareAndFooter, + bare_and_footer_body_pos = BodyPos}) -> + BodyAndFooterBin = binary_part(BareAndFooter, + BodyPos, + byte_size(BareAndFooter) - BodyPos), + %% TODO do not parse entire AMQP encoded amqp-value or amqp-sequence section body + BodyAndFooter = amqp10_framing:decode_bin(BodyAndFooterBin), + to_sections(H, DAC, MAC, P, APC, BodyAndFooter); +msg_to_sections(#v1{delivery_annotations = DAC, + message_annotations = MAC, + bare_and_footer = BareAndFooterBin}) -> + %% TODO do not parse entire AMQP encoded amqp-value or amqp-sequence section body + BareAndFooter = amqp10_framing:decode_bin(BareAndFooterBin), + to_sections(undefined, DAC, MAC, BareAndFooter). + +to_sections(H, DAC, MAC, P, APC, Tail) -> + S0 = case APC of [] -> - S2; + Tail; _ -> - [#'v1_0.message_annotations'{content = MAC} | S2] + [#'v1_0.application_properties'{content = APC} | Tail] end, - S4 = case DAC of + S = case P of + undefined -> + S0; + _ -> + [P | S0] + end, + to_sections(H, DAC, MAC, S). + +to_sections(H, DAC, MAC, Tail) -> + S0 = case MAC of [] -> - S3; + Tail; _ -> - [#'v1_0.delivery_annotations'{content = DAC} | S3] + [#'v1_0.message_annotations'{content = MAC} | Tail] end, + S = case DAC of + [] -> + S0; + _ -> + [#'v1_0.delivery_annotations'{content = DAC} | S0] + end, case H of undefined -> - S4; + S; _ -> - [H | S4] + [H | S] end. +-spec protocol_state_message_annotations(amqp_map(), mc:annotations()) -> amqp_map(). +protocol_state_message_annotations(MA, Anns) -> + maps:fold( + fun(?ANN_EXCHANGE, Exchange, L) -> + maps_upsert(<<"x-exchange">>, {utf8, Exchange}, L); + (?ANN_ROUTING_KEYS, RKeys, L) -> + RKey = hd(RKeys), + maps_upsert(<<"x-routing-key">>, {utf8, RKey}, L); + (<<"x-", _/binary>> = K, V, L) + when V =/= undefined -> + %% any x-* annotations get added as message annotations + maps_upsert(K, mc_util:infer_type(V), L); + (<<"timestamp_in_ms">>, V, L) -> + maps_upsert(<<"x-opt-rabbitmq-received-time">>, {timestamp, V}, L); + (_, _, Acc) -> + Acc + end, MA, Anns). + maps_upsert(Key, TaggedVal, KVList) -> TaggedKey = {symbol, Key}, Elem = {TaggedKey, TaggedVal}, lists:keystore(TaggedKey, 1, KVList, Elem). -encode_bin(undefined) -> - <<>>; -encode_bin(Sections) when is_list(Sections) -> +encode(Sections) when is_list(Sections) -> [amqp10_framing:encode_bin(Section) || Section <- Sections, - not is_empty(Section)]; -encode_bin(Section) -> - case is_empty(Section) of - true -> - <<>>; - false -> - amqp10_framing:encode_bin(Section) - end. + not is_empty(Section)]. -is_empty(undefined) -> +is_empty(#'v1_0.header'{durable = undefined, + priority = undefined, + ttl = undefined, + first_acquirer = undefined, + delivery_count = undefined}) -> + true; +is_empty(#'v1_0.delivery_annotations'{content = []}) -> + true; +is_empty(#'v1_0.message_annotations'{content = []}) -> true; is_empty(#'v1_0.properties'{message_id = undefined, user_id = undefined, @@ -312,33 +380,22 @@ is_empty(#'v1_0.properties'{message_id = undefined, true; is_empty(#'v1_0.application_properties'{content = []}) -> true; -is_empty(#'v1_0.message_annotations'{content = []}) -> - true; -is_empty(#'v1_0.delivery_annotations'{content = []}) -> - true; is_empty(#'v1_0.footer'{content = []}) -> true; -is_empty(#'v1_0.header'{durable = undefined, - priority = undefined, - ttl = undefined, - first_acquirer = undefined, - delivery_count = undefined}) -> - true; is_empty(_) -> false. - -message_annotation(_Key, #msg{message_annotations = []}, +message_annotation(_Key, #msg_body_encoded{message_annotations = []}, Default) -> Default; -message_annotation(Key, #msg{message_annotations = Content}, +message_annotation(Key, #msg_body_encoded{message_annotations = Content}, Default) when is_binary(Key) -> mc_util:amqp_map_get(Key, Content, Default). -message_annotations_as_simple_map(#msg{message_annotations = []}) -> +message_annotations_as_simple_map(#msg_body_encoded{message_annotations = []}) -> []; -message_annotations_as_simple_map(#msg{message_annotations = Content}) -> +message_annotations_as_simple_map(#msg_body_encoded{message_annotations = Content}) -> %% the section record format really is terrible lists:filtermap(fun({{symbol, K}, {_T, V}}) when ?SIMPLE_VALUE(V) -> @@ -347,9 +404,9 @@ message_annotations_as_simple_map(#msg{message_annotations = Content}) -> false end, Content). -application_properties_as_simple_map(#msg{application_properties = []}, L) -> +application_properties_as_simple_map(#msg_body_encoded{application_properties = []}, L) -> L; -application_properties_as_simple_map(#msg{application_properties = Content}, +application_properties_as_simple_map(#msg_body_encoded{application_properties = Content}, L) -> %% the section record format really is terrible lists:foldl(fun({{utf8, K}, {_T, V}}, Acc) @@ -362,29 +419,70 @@ application_properties_as_simple_map(#msg{application_properties = Content}, Acc end, L, Content). -decode([], Acc) -> +msg_body_decoded(Sections) -> + msg_body_decoded(Sections, #msg_body_decoded{}). + +msg_body_decoded([], Acc) -> Acc; -decode([#'v1_0.header'{} = H | Rem], Msg) -> - decode(Rem, Msg#msg{header = H}); -decode([#'v1_0.message_annotations'{content = MAC} | Rem], Msg) -> - decode(Rem, Msg#msg{message_annotations = MAC}); -decode([#'v1_0.properties'{} = P | Rem], Msg) -> - decode(Rem, Msg#msg{properties = P}); -decode([#'v1_0.application_properties'{content = APC} | Rem], Msg) -> - decode(Rem, Msg#msg{application_properties = APC}); -decode([#'v1_0.delivery_annotations'{content = DAC} | Rem], Msg) -> - decode(Rem, Msg#msg{delivery_annotations = DAC}); -decode([#'v1_0.data'{} = D | Rem], #msg{data = Body} = Msg) +msg_body_decoded([#'v1_0.header'{} = H | Rem], Msg) -> + msg_body_decoded(Rem, Msg#msg_body_decoded{header = H}); +msg_body_decoded([#'v1_0.message_annotations'{content = MAC} | Rem], Msg) -> + msg_body_decoded(Rem, Msg#msg_body_decoded{message_annotations = MAC}); +msg_body_decoded([#'v1_0.properties'{} = P | Rem], Msg) -> + msg_body_decoded(Rem, Msg#msg_body_decoded{properties = P}); +msg_body_decoded([#'v1_0.application_properties'{content = APC} | Rem], Msg) -> + msg_body_decoded(Rem, Msg#msg_body_decoded{application_properties = APC}); +msg_body_decoded([#'v1_0.delivery_annotations'{content = DAC} | Rem], Msg) -> + msg_body_decoded(Rem, Msg#msg_body_decoded{delivery_annotations = DAC}); +msg_body_decoded([#'v1_0.data'{} = D | Rem], #msg_body_decoded{data = Body} = Msg) when is_list(Body) -> - decode(Rem, Msg#msg{data = Body ++ [D]}); -decode([#'v1_0.amqp_sequence'{} = D | Rem], #msg{data = Body} = Msg) + msg_body_decoded(Rem, Msg#msg_body_decoded{data = Body ++ [D]}); +msg_body_decoded([#'v1_0.amqp_sequence'{} = D | Rem], #msg_body_decoded{data = Body} = Msg) when is_list(Body) -> - decode(Rem, Msg#msg{data = Body ++ [D]}); -decode([#'v1_0.footer'{content = FC} | Rem], Msg) -> - decode(Rem, Msg#msg{footer = FC}); -decode([#'v1_0.amqp_value'{} = B | Rem], #msg{} = Msg) -> + msg_body_decoded(Rem, Msg#msg_body_decoded{data = Body ++ [D]}); +msg_body_decoded([#'v1_0.footer'{content = FC} | Rem], Msg) -> + msg_body_decoded(Rem, Msg#msg_body_decoded{footer = FC}); +msg_body_decoded([#'v1_0.amqp_value'{} = B | Rem], #msg_body_decoded{} = Msg) -> %% an amqp value can only be a singleton - decode(Rem, Msg#msg{data = B}). + msg_body_decoded(Rem, Msg#msg_body_decoded{data = B}). + +msg_body_encoded(Sections, Payload) -> + msg_body_encoded(Sections, Payload, #msg_body_encoded{}). + +msg_body_encoded([#'v1_0.header'{} = H | Rem], Payload, Msg) -> + msg_body_encoded(Rem, Payload, Msg#msg_body_encoded{header = H}); +msg_body_encoded([#'v1_0.delivery_annotations'{content = DAC} | Rem], Payload, Msg) -> + msg_body_encoded(Rem, Payload, Msg#msg_body_encoded{delivery_annotations = DAC}); +msg_body_encoded([#'v1_0.message_annotations'{content = MAC} | Rem], Payload, Msg) -> + msg_body_encoded(Rem, Payload, Msg#msg_body_encoded{message_annotations = MAC}); +msg_body_encoded([{{pos, Pos}, #'v1_0.properties'{} = Props} | Rem], Payload, Msg) -> + %% properties is the first bare message section + Bin = binary_part_bare_and_footer(Payload, Pos), + msg_body_encoded(Rem, Pos, Msg#msg_body_encoded{properties = Props, + bare_and_footer = Bin}); +msg_body_encoded([{{pos, Pos}, #'v1_0.application_properties'{content = APC}} | Rem], Payload, Msg) + when is_binary(Payload) -> + %% application-properties is the first bare message section + Bin = binary_part_bare_and_footer(Payload, Pos), + msg_body_encoded(Rem, Pos, Msg#msg_body_encoded{application_properties = APC, + bare_and_footer = Bin}); +msg_body_encoded([{{pos, _Pos}, #'v1_0.application_properties'{content = APC}} | Rem], BarePos, Msg) + when is_integer(BarePos) -> + msg_body_encoded(Rem, BarePos, Msg#msg_body_encoded{application_properties = APC}); +%% Base case: we assert the last part contains the mandatory body: +msg_body_encoded([{{pos, Pos}, body}], Payload, Msg) + when is_binary(Payload) -> + %% The body is the first bare message section. + Bin = binary_part_bare_and_footer(Payload, Pos), + Msg#msg_body_encoded{bare_and_footer = Bin, + bare_and_footer_body_pos = 0}; +msg_body_encoded([{{pos, Pos}, body}], BarePos, Msg) + when is_integer(BarePos) -> + Msg#msg_body_encoded{bare_and_footer_body_pos = Pos - BarePos}. + +%% We extract the binary part of the payload exactly once when the bare message starts. +binary_part_bare_and_footer(Payload, Start) -> + binary_part(Payload, Start, byte_size(Payload) - Start). key_find(K, [{{_, K}, {_, V}} | _]) -> V; @@ -393,6 +491,14 @@ key_find(K, [_ | Rem]) -> key_find(_K, []) -> undefined. +-spec first_acquirer(mc:annotations()) -> boolean(). +first_acquirer(Anns) -> + Redelivered = case Anns of + #{redelivered := R} -> R; + _ -> false + end, + not Redelivered. + recover_deaths([], Acc) -> Acc; recover_deaths([{map, Kvs} | Rem], Acc) -> @@ -415,7 +521,7 @@ recover_deaths([{map, Kvs} | Rem], Acc) -> count = key_find(<<"count">>, Kvs), routing_keys = RKeys}}). -essential_properties(#msg{message_annotations = MA} = Msg) -> +essential_properties(#msg_body_encoded{message_annotations = MA} = Msg) -> Durable = get_property(durable, Msg), Priority = get_property(priority, Msg), Timestamp = get_property(timestamp, Msg), diff --git a/deps/rabbit/src/mc_amqpl.erl b/deps/rabbit/src/mc_amqpl.erl index 8251660a1f..09431aa848 100644 --- a/deps/rabbit/src/mc_amqpl.erl +++ b/deps/rabbit/src/mc_amqpl.erl @@ -391,7 +391,9 @@ convert_to(mc_amqp, #content{payload_fragments_rev = Payload} = Content, Env) -> convert_to(_TargetProto, _Content, _Env) -> not_implemented. -protocol_state(#content{properties = #'P_basic'{headers = H00} = B0} = C, +protocol_state(#content{properties = #'P_basic'{headers = H00, + priority = Priority0, + delivery_mode = DeliveryMode0} = B0} = C, Anns) -> %% Add any x- annotations as headers H0 = case H00 of @@ -434,16 +436,38 @@ protocol_state(#content{properties = #'P_basic'{headers = H00} = B0} = C, %% publishes undefined; #{ttl := Ttl} -> - %% not sure this will ever happen - %% as we only ever unset the expiry integer_to_binary(Ttl); _ -> B0#'P_basic'.expiration end, - - B = B0#'P_basic'{timestamp = Timestamp, + Priority = case Priority0 of + undefined -> + case Anns of + %% This branch is hit when a message with priority was originally + %% published with AMQP to a classic or quorum queue. + #{?ANN_PRIORITY := P} -> P; + _ -> undefined + end; + _ -> + Priority0 + end, + DeliveryMode = case DeliveryMode0 of + undefined -> + %% This branch is hit when a message was originally + %% published with AMQP to a classic or quorum queue. + case Anns of + #{?ANN_DURABLE := true} -> 2; + #{?ANN_DURABLE := false} -> 1; + _ -> 2 + end; + _ -> + DeliveryMode0 + end, + B = B0#'P_basic'{headers = Headers, + delivery_mode = DeliveryMode, + priority = Priority, expiration = Expiration, - headers = Headers}, + timestamp = Timestamp}, C#content{properties = B, properties_bin = none}; diff --git a/deps/rabbit/src/rabbit_amqp_reader.erl b/deps/rabbit/src/rabbit_amqp_reader.erl index edb9adad44..15392aeb38 100644 --- a/deps/rabbit/src/rabbit_amqp_reader.erl +++ b/deps/rabbit/src/rabbit_amqp_reader.erl @@ -364,6 +364,9 @@ handle_frame0(Mode, Channel, Body, State) -> %% "The frame body is defined as a performative followed by an opaque payload." [2.3.2] parse_frame_body(Body, _Channel) -> + %% TODO test binary_part() here instead of returning binaries in amqp10_binary_parser:parse/1. + %% The latter can return the number of bytes parsed instead. + %% This should remove all warnings from the parser when compiliation option bin_opt_info is set. {DescribedPerformative, Payload} = amqp10_binary_parser:parse(Body), Performative = amqp10_framing:decode(DescribedPerformative), ?DEBUG("~s Channel ~tp ->~n~tp~n~ts~n", diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index c6c9a426ae..dcfa8ddcf0 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -1622,11 +1622,10 @@ handle_deliver(ConsumerTag, AckRequired, settled = SendSettled}, Mc1 = mc:convert(mc_amqp, Mc0), Mc = mc:set_annotation(redelivered, Redelivered, Mc1), - Sections0 = mc:protocol_state(Mc), - Sections = mc_amqp:serialize(Sections0), - ?DEBUG("~s Outbound payload:~n ~tp~n", - [?MODULE, [amqp10_framing:pprint(Section) || - Section <- amqp10_framing:decode_bin(iolist_to_binary(Sections))]]), + Sections = mc:protocol_state(Mc), + % ?DEBUG("~s Outbound payload:~n ~tp~n", + % [?MODULE, [amqp10_framing:pprint(Section) || + % Section <- amqp10_framing:decode_bin(iolist_to_binary(Sections))]]), validate_message_size(Sections, MaxMessageSize), Frames = transfer_frames(Transfer, Sections, MaxFrameSize), messages_delivered(Redelivered, QType), @@ -1876,7 +1875,6 @@ incoming_link_transfer( end, validate_transfer_rcv_settle_mode(RcvSettleMode, Settled), validate_incoming_message_size(PayloadBin), - % Sections = amqp10_framing:decode_bin(PayloadBin), % ?DEBUG("~s Inbound payload:~n ~tp", % [?MODULE, [amqp10_framing:pprint(Section) || Section <- Sections]]), diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 65a6c743f3..c317806174 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -553,8 +553,7 @@ deliver0(MsgId, Msg, stream_message(Msg, FilteringSupported) -> McAmqp = mc:convert(mc_amqp, Msg), - Sections = mc:protocol_state(McAmqp), - MsgData = mc_amqp:serialize(Sections), + MsgData = mc:protocol_state(McAmqp), case FilteringSupported of true -> case mc:x_header(<<"x-stream-filter-value">>, McAmqp) of diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index 333b3b27b9..1381e63819 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -685,18 +685,33 @@ amqp10_to_amqp091_header_conversion(Session,Ch, QName, Address) -> "x-int" => 3, "x-bool" => true}, OutMsg2), - ok = amqp10_client:send_msg(Sender, OutMsg3), + OutMsg = amqp10_msg:set_headers( + #{durable => true, + priority => 7, + ttl => 88000}, + OutMsg3), + ok = amqp10_client:send_msg(Sender, OutMsg), ok = wait_for_accepts(1), - {ok, Headers} = amqp091_get_msg_headers(Ch, QName), + {#'basic.get_ok'{}, + #amqp_msg{props = #'P_basic'{headers = Headers, + delivery_mode = DeliveryMode, + priority = Priority, + expiration = Expiration}} + } = amqp_channel:call(Ch, #'basic.get'{queue = QName, no_ack = true}), + %% assert application properties ?assertEqual({longstr, <<"string-val">>}, rabbit_misc:table_lookup(Headers, <<"string">>)), ?assertEqual({unsignedint, 2}, rabbit_misc:table_lookup(Headers, <<"int">>)), ?assertEqual({bool, false}, rabbit_misc:table_lookup(Headers, <<"bool">>)), - + %% assert message annotations ?assertEqual({longstr, <<"string-value">>}, rabbit_misc:table_lookup(Headers, <<"x-string">>)), ?assertEqual({unsignedint, 3}, rabbit_misc:table_lookup(Headers, <<"x-int">>)), - ?assertEqual({bool, true}, rabbit_misc:table_lookup(Headers, <<"x-bool">>)). + ?assertEqual({bool, true}, rabbit_misc:table_lookup(Headers, <<"x-bool">>)), + %% assert headers + ?assertEqual(2, DeliveryMode), + ?assertEqual(7, Priority), + ?assertEqual(<<"88000">>, Expiration). amqp091_to_amqp10_header_conversion(Session, Ch, QName, Address) -> Amqp091Headers = [{<<"x-forwarding">>, array, @@ -3451,11 +3466,6 @@ delete_queue(Session, QName) -> {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair). -amqp091_get_msg_headers(Channel, QName) -> - {#'basic.get_ok'{}, #amqp_msg{props = #'P_basic'{ headers= Headers}}} - = amqp_channel:call(Channel, #'basic.get'{queue = QName, no_ack = true}), - {ok, Headers}. - create_amqp10_sender(Session, Address) -> {ok, Sender} = amqp10_client:attach_sender_link( Session, <<"test-sender">>, Address),