diff --git a/deps/amqp10_common/src/amqp10_binary_generator.erl b/deps/amqp10_common/src/amqp10_binary_generator.erl index c23a40f856..b628fcaaa1 100644 --- a/deps/amqp10_common/src/amqp10_binary_generator.erl +++ b/deps/amqp10_common/src/amqp10_binary_generator.erl @@ -177,8 +177,8 @@ generate1({array, Type, List}) -> [16#e0, S + 1, Count, Array] end; -generate1({as_is, TypeCode, Bin}) -> - <>. +generate1({as_is, TypeCode, Bin}) when is_binary(Bin) -> + [TypeCode, Bin]. constructor(symbol) -> 16#b3; constructor(ubyte) -> 16#50; diff --git a/deps/amqp10_common/src/amqp10_binary_parser.erl b/deps/amqp10_common/src/amqp10_binary_parser.erl index c8e07513db..13f616ff57 100644 --- a/deps/amqp10_common/src/amqp10_binary_parser.erl +++ b/deps/amqp10_common/src/amqp10_binary_parser.erl @@ -101,17 +101,17 @@ parse(<<16#e0, S:8,CountAndV:S/binary,_/binary>>, B) -> parse(<<16#f0, S:32,CountAndV:S/binary,_/binary>>, B) -> {parse_array(32, CountAndV), B+5+S}; %% NaN or +-inf -parse(<<16#72, V:32, _/binary>>, B) -> - {{as_is, 16#72, <>}, B+5}; -parse(<<16#82, V:64, _/binary>>, B) -> - {{as_is, 16#82, <>}, B+9}; +parse(<<16#72, V:4/binary, _/binary>>, B) -> + {{as_is, 16#72, V}, B+5}; +parse(<<16#82, V:8/binary, _/binary>>, B) -> + {{as_is, 16#82, V}, B+9}; %% decimals -parse(<<16#74, V:32, _/binary>>, B) -> - {{as_is, 16#74, <>}, B+5}; -parse(<<16#84, V:64, _/binary>>, B) -> - {{as_is, 16#84, <>}, B+9}; -parse(<<16#94, V:128, _/binary>>, B) -> - {{as_is, 16#94, <>}, B+17}; +parse(<<16#74, V:4/binary, _/binary>>, B) -> + {{as_is, 16#74, V}, B+5}; +parse(<<16#84, V:8/binary, _/binary>>, B) -> + {{as_is, 16#84, V}, B+9}; +parse(<<16#94, V:16/binary, _/binary>>, B) -> + {{as_is, 16#94, V}, B+17}; parse(<>, B) -> throw({primitive_type_unsupported, Type, {position, B}}). @@ -317,17 +317,17 @@ pm(<<16#e0, S:8,CountAndV:S/binary,R/binary>>, O, B) -> pm(<<16#f0, S:32,CountAndV:S/binary,R/binary>>, O, B) -> [parse_array(32, CountAndV) | pm(R, O, B+5+S)]; %% NaN or +-inf -pm(<<16#72, V:32, R/binary>>, O, B) -> - [{as_is, 16#72, <>} | pm(R, O, B+5)]; -pm(<<16#82, V:64, R/binary>>, O, B) -> - [{as_is, 16#82, <>} | pm(R, O, B+9)]; +pm(<<16#72, V:4/binary, R/binary>>, O, B) -> + [{as_is, 16#72, V} | pm(R, O, B+5)]; +pm(<<16#82, V:8/binary, R/binary>>, O, B) -> + [{as_is, 16#82, V} | pm(R, O, B+9)]; %% decimals -pm(<<16#74, V:32, R/binary>>, O, B) -> - [{as_is, 16#74, <>} | pm(R, O, B+5)]; -pm(<<16#84, V:64, R/binary>>, O, B) -> - [{as_is, 16#84, <>} | pm(R, O, B+9)]; -pm(<<16#94, V:128, R/binary>>, O, B) -> - [{as_is, 16#94, <>} | pm(R, O, B+17)]; +pm(<<16#74, V:4/binary, R/binary>>, O, B) -> + [{as_is, 16#74, V} | pm(R, O, B+5)]; +pm(<<16#84, V:8/binary, R/binary>>, O, B) -> + [{as_is, 16#84, V} | pm(R, O, B+9)]; +pm(<<16#94, V:16/binary, R/binary>>, O, B) -> + [{as_is, 16#94, V} | pm(R, O, B+17)]; pm(<>, _O, B) -> throw({primitive_type_unsupported, Type, {position, B}}). diff --git a/deps/amqp10_common/test/binary_generator_SUITE.erl b/deps/amqp10_common/test/binary_generator_SUITE.erl index ac63d1b7a6..ef50660d95 100644 --- a/deps/amqp10_common/test/binary_generator_SUITE.erl +++ b/deps/amqp10_common/test/binary_generator_SUITE.erl @@ -99,12 +99,34 @@ numerals(_Config) -> roundtrip({long, 0}), roundtrip({long, 16#7FFFFFFFFFFFFFFF}), roundtrip({long, -16#8000000000000000}), + roundtrip({float, 0.0}), roundtrip({float, 1.0}), roundtrip({float, -1.0}), roundtrip({double, 0.0}), roundtrip({double, 1.0}), roundtrip({double, -1.0}), + + %% float +Inf + roundtrip({as_is, 16#72, <<16#7F, 16#80, 16#00, 16#00>>}), + %% double +Inf + roundtrip({as_is, 16#82, <<16#7F, 16#F0, 16#00, 16#00, + 16#00, 16#00, 16#00, 16#00>>}), + + %% decimal32 + roundtrip({as_is, 16#74, <<16#22, 16#50, 16#00, 16#00>>}), % 0 + roundtrip({as_is, 16#74, <<16#22, 16#50, 16#00, 16#2A>>}), % 42 + roundtrip({as_is, 16#74, <<16#A2, 16#40, 16#00, 16#48>>}), % -123.45 + roundtrip({as_is, 16#74, <<16#78, 16#00, 16#00, 16#00>>}), % +Infinity + roundtrip({as_is, 16#74, <<16#7C, 16#00, 16#00, 16#00>>}), % NaN + %% decimal64 + roundtrip({as_is, 16#84, <<16#22, 16#34, 16#00, 16#00, + 16#00, 16#00, 16#00, 16#00>>}), % 0 + %% decimal128 + roundtrip({as_is, 16#94, <<16#22, 16#08, 16#00, 16#00, + 16#00, 16#00, 16#00, 16#00, + 16#00, 16#00, 16#00, 16#00, + 16#00, 16#00, 16#00, 16#00>>}), % 0 ok. utf8(_Config) -> diff --git a/deps/rabbit/src/mc.erl b/deps/rabbit/src/mc.erl index 8d753bfae7..828f6f6ac3 100644 --- a/deps/rabbit/src/mc.erl +++ b/deps/rabbit/src/mc.erl @@ -89,6 +89,7 @@ {list, [tagged_value()]} | {map, [{tagged_value(), tagged_value()}]} | {array, atom(), [tagged_value()]} | + {as_is, TypeCode :: non_neg_integer(), binary()} | null | undefined. diff --git a/deps/rabbit/src/mc_amqpl.erl b/deps/rabbit/src/mc_amqpl.erl index 37602df7fe..d1c7ea8a12 100644 --- a/deps/rabbit/src/mc_amqpl.erl +++ b/deps/rabbit/src/mc_amqpl.erl @@ -152,8 +152,14 @@ convert_from(mc_amqp, Sections, Env) -> Type0 end, - Headers0 = [to_091(K, V) || {{utf8, K}, V} <- AP, - ?IS_SHORTSTR_LEN(K)], + Headers0 = lists:filtermap(fun({_K, {as_is, _, _}}) -> + false; + ({{utf8, K}, V}) + when ?IS_SHORTSTR_LEN(K) -> + {true, to_091(K, V)}; + (_) -> + false + end, AP), %% Add remaining x- message annotations as headers XHeaders = lists:filtermap(fun({{symbol, <<"x-cc">>}, V}) -> {true, to_091(<<"CC">>, V)}; @@ -161,6 +167,8 @@ convert_from(mc_amqp, Sections, Env) -> {true, {<<"timestamp_in_ms">>, long, Ts}}; ({{symbol, <<"x-opt-deaths">>}, V}) -> convert_from_amqp_deaths(V); + ({_K, {as_is, _, _}}) -> + false; ({{symbol, <<"x-", _/binary>> = K}, V}) when ?IS_SHORTSTR_LEN(K) -> case is_internal_header(K) of @@ -766,12 +774,23 @@ to_091(Key, null) -> {Key, void, undefined}; to_091(Key, {list, L}) -> to_091_array(Key, L); to_091(Key, {map, M}) -> - {Key, table, [to_091(unwrap(K), V) || {K, V} <- M]}; + T = lists:filtermap(fun({K, V}) when element(1, K) =:= as_is orelse + element(1, V) =:= as_is -> + false; + ({K, V}) -> + {true, to_091(unwrap(K), V)} + end, M), + {Key, table, T}; to_091(Key, {array, _T, L}) -> to_091_array(Key, L). to_091_array(Key, L) -> - {Key, array, [to_091(V) || V <- L]}. + A = lists:filtermap(fun({as_is, _, _}) -> + false; + (V) -> + {true, to_091(V)} + end, L), + {Key, array, A}. to_091({utf8, V}) -> {longstr, V}; to_091({symbol, V}) -> {longstr, V}; diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index d6f36adc4e..99b1ab6490 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -175,7 +175,8 @@ groups() -> x_cc_annotation_exchange_routing_key_empty, x_cc_annotation_queue, x_cc_annotation_null, - bad_x_cc_annotation_exchange + bad_x_cc_annotation_exchange, + decimal_types ]}, {cluster_size_3, [shuffle], @@ -6685,6 +6686,69 @@ bad_x_cc_annotation_exchange(Config) -> ok = end_session_sync(Session), ok = close_connection_sync(Connection). +%% Test that RabbitMQ can store and forward AMQP decimal types. +decimal_types(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(QName), + {_, Session, LinkPair} = Init = init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue( + LinkPair, QName, + #{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}}}), + {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address), + ok = wait_for_credit(Sender), + + Decimal32Zero = <<16#22, 16#50, 0, 0>>, + Decimal64Zero = <<16#22, 16#34, 0, 0, 0, 0, 0, 0>>, + Decimal128Zero = <<16#22, 16#08, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0>>, + Decimal3242 = <<16#22, 16#50, 16#00, 16#2A>>, % 42 + Decimal32NaN = <<16#7C, 0, 0, 0>>, + Body = #'v1_0.amqp_value'{content = {list, [{as_is, 16#74, Decimal32Zero}, + {as_is, 16#84, Decimal64Zero}, + {as_is, 16#94, Decimal128Zero}]}}, + MsgAnns = #{<<"x-decimal-32">> => {as_is, 16#74, Decimal3242}, + <<"x-decimal-64">> => {as_is, 16#84, Decimal64Zero}, + <<"x-decimal-128">> => {as_is, 16#94, Decimal128Zero}, + <<"x-list">> => {list, [{as_is, 16#94, Decimal128Zero}]}, + <<"x-map">> => {map, [{{utf8, <<"key-1">>}, + {as_is, 16#94, Decimal128Zero}}]}}, + AppProps = #{<<"decimal-32">> => {as_is, 16#74, Decimal32NaN}}, + Msg0 = amqp10_msg:set_message_annotations( + MsgAnns, + amqp10_msg:set_application_properties( + AppProps, + amqp10_msg:new(<<"tag">>, Body))), + ok = amqp10_client:send_msg(Sender, Msg0), + ok = wait_for_accepted(<<"tag">>), + ok = amqp10_client:send_msg(Sender, Msg0), + ok = wait_for_accepted(<<"tag">>), + ok = detach_link_sync(Sender), + + %% Consume the first message via AMQP 1.0 + {ok, Receiver} = amqp10_client:attach_receiver_link( + Session, <<"receiver">>, Address, unsettled), + {ok, Msg} = amqp10_client:get_msg(Receiver), + ?assertEqual(Body, amqp10_msg:body(Msg)), + ?assertMatch(#{<<"x-decimal-32">> := {as_is, 16#74, Decimal3242}, + <<"x-decimal-64">> := {as_is, 16#84, Decimal64Zero}, + <<"x-decimal-128">> := {as_is, 16#94, Decimal128Zero}, + <<"x-list">> := [{as_is, 16#94, Decimal128Zero}], + <<"x-map">> := [{{utf8, <<"key-1">>}, + {as_is, 16#94, Decimal128Zero}}]}, + amqp10_msg:message_annotations(Msg)), + ?assertEqual(AppProps, amqp10_msg:application_properties(Msg)), + ok = amqp10_client:accept_msg(Receiver, Msg), + ok = detach_link_sync(Receiver), + + %% Consume the second message via AMQP 0.9.1 + %% We expect to receive the message without any crashes. + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config), + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{}}, + amqp_channel:call(Ch, #'basic.get'{queue = QName, no_ack = true})), + ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch), + + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = close(Init). + %% Attach a receiver to an unavailable quorum queue. attach_to_down_quorum_queue(Config) -> QName = <<"q-down">>,