Merge pull request #14209 from rabbitmq/amqp-decimal
Trigger a 4.2.x alpha release build / trigger_alpha_build (push) Waiting to run Details
Test (make) / Build and Xref (1.18, 26) (push) Waiting to run Details
Test (make) / Build and Xref (1.18, 27) (push) Waiting to run Details
Test (make) / Build and Xref (1.18, 28) (push) Waiting to run Details
Test (make) / Test (1.18, 28, khepri) (push) Waiting to run Details
Test (make) / Test (1.18, 28, mnesia) (push) Waiting to run Details
Test (make) / Test mixed clusters (1.18, 28, khepri) (push) Waiting to run Details
Test (make) / Test mixed clusters (1.18, 28, mnesia) (push) Waiting to run Details
Test (make) / Type check (1.18, 28) (push) Waiting to run Details

Fix AMQP crashes for approximate numbers
This commit is contained in:
David Ansari 2025-07-11 10:21:54 +02:00 committed by GitHub
commit 250be2c6fb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 133 additions and 27 deletions

View File

@ -177,8 +177,8 @@ generate1({array, Type, List}) ->
[16#e0, S + 1, Count, Array]
end;
generate1({as_is, TypeCode, Bin}) ->
<<TypeCode, Bin>>.
generate1({as_is, TypeCode, Bin}) when is_binary(Bin) ->
[TypeCode, Bin].
constructor(symbol) -> 16#b3;
constructor(ubyte) -> 16#50;

View File

@ -101,17 +101,17 @@ parse(<<16#e0, S:8,CountAndV:S/binary,_/binary>>, B) ->
parse(<<16#f0, S:32,CountAndV:S/binary,_/binary>>, B) ->
{parse_array(32, CountAndV), B+5+S};
%% NaN or +-inf
parse(<<16#72, V:32, _/binary>>, B) ->
{{as_is, 16#72, <<V:32>>}, B+5};
parse(<<16#82, V:64, _/binary>>, B) ->
{{as_is, 16#82, <<V:64>>}, B+9};
parse(<<16#72, V:4/binary, _/binary>>, B) ->
{{as_is, 16#72, V}, B+5};
parse(<<16#82, V:8/binary, _/binary>>, B) ->
{{as_is, 16#82, V}, B+9};
%% decimals
parse(<<16#74, V:32, _/binary>>, B) ->
{{as_is, 16#74, <<V:32>>}, B+5};
parse(<<16#84, V:64, _/binary>>, B) ->
{{as_is, 16#84, <<V:64>>}, B+9};
parse(<<16#94, V:128, _/binary>>, B) ->
{{as_is, 16#94, <<V:128>>}, B+17};
parse(<<16#74, V:4/binary, _/binary>>, B) ->
{{as_is, 16#74, V}, B+5};
parse(<<16#84, V:8/binary, _/binary>>, B) ->
{{as_is, 16#84, V}, B+9};
parse(<<16#94, V:16/binary, _/binary>>, B) ->
{{as_is, 16#94, V}, B+17};
parse(<<Type, _/binary>>, B) ->
throw({primitive_type_unsupported, Type, {position, B}}).
@ -317,17 +317,17 @@ pm(<<16#e0, S:8,CountAndV:S/binary,R/binary>>, O, B) ->
pm(<<16#f0, S:32,CountAndV:S/binary,R/binary>>, O, B) ->
[parse_array(32, CountAndV) | pm(R, O, B+5+S)];
%% NaN or +-inf
pm(<<16#72, V:32, R/binary>>, O, B) ->
[{as_is, 16#72, <<V:32>>} | pm(R, O, B+5)];
pm(<<16#82, V:64, R/binary>>, O, B) ->
[{as_is, 16#82, <<V:64>>} | pm(R, O, B+9)];
pm(<<16#72, V:4/binary, R/binary>>, O, B) ->
[{as_is, 16#72, V} | pm(R, O, B+5)];
pm(<<16#82, V:8/binary, R/binary>>, O, B) ->
[{as_is, 16#82, V} | pm(R, O, B+9)];
%% decimals
pm(<<16#74, V:32, R/binary>>, O, B) ->
[{as_is, 16#74, <<V:32>>} | pm(R, O, B+5)];
pm(<<16#84, V:64, R/binary>>, O, B) ->
[{as_is, 16#84, <<V:64>>} | pm(R, O, B+9)];
pm(<<16#94, V:128, R/binary>>, O, B) ->
[{as_is, 16#94, <<V:128>>} | pm(R, O, B+17)];
pm(<<16#74, V:4/binary, R/binary>>, O, B) ->
[{as_is, 16#74, V} | pm(R, O, B+5)];
pm(<<16#84, V:8/binary, R/binary>>, O, B) ->
[{as_is, 16#84, V} | pm(R, O, B+9)];
pm(<<16#94, V:16/binary, R/binary>>, O, B) ->
[{as_is, 16#94, V} | pm(R, O, B+17)];
pm(<<Type, _Bin/binary>>, _O, B) ->
throw({primitive_type_unsupported, Type, {position, B}}).

View File

@ -99,12 +99,34 @@ numerals(_Config) ->
roundtrip({long, 0}),
roundtrip({long, 16#7FFFFFFFFFFFFFFF}),
roundtrip({long, -16#8000000000000000}),
roundtrip({float, 0.0}),
roundtrip({float, 1.0}),
roundtrip({float, -1.0}),
roundtrip({double, 0.0}),
roundtrip({double, 1.0}),
roundtrip({double, -1.0}),
%% float +Inf
roundtrip({as_is, 16#72, <<16#7F, 16#80, 16#00, 16#00>>}),
%% double +Inf
roundtrip({as_is, 16#82, <<16#7F, 16#F0, 16#00, 16#00,
16#00, 16#00, 16#00, 16#00>>}),
%% decimal32
roundtrip({as_is, 16#74, <<16#22, 16#50, 16#00, 16#00>>}), % 0
roundtrip({as_is, 16#74, <<16#22, 16#50, 16#00, 16#2A>>}), % 42
roundtrip({as_is, 16#74, <<16#A2, 16#40, 16#00, 16#48>>}), % -123.45
roundtrip({as_is, 16#74, <<16#78, 16#00, 16#00, 16#00>>}), % +Infinity
roundtrip({as_is, 16#74, <<16#7C, 16#00, 16#00, 16#00>>}), % NaN
%% decimal64
roundtrip({as_is, 16#84, <<16#22, 16#34, 16#00, 16#00,
16#00, 16#00, 16#00, 16#00>>}), % 0
%% decimal128
roundtrip({as_is, 16#94, <<16#22, 16#08, 16#00, 16#00,
16#00, 16#00, 16#00, 16#00,
16#00, 16#00, 16#00, 16#00,
16#00, 16#00, 16#00, 16#00>>}), % 0
ok.
utf8(_Config) ->

View File

@ -89,6 +89,7 @@
{list, [tagged_value()]} |
{map, [{tagged_value(), tagged_value()}]} |
{array, atom(), [tagged_value()]} |
{as_is, TypeCode :: non_neg_integer(), binary()} |
null |
undefined.

View File

@ -152,8 +152,14 @@ convert_from(mc_amqp, Sections, Env) ->
Type0
end,
Headers0 = [to_091(K, V) || {{utf8, K}, V} <- AP,
?IS_SHORTSTR_LEN(K)],
Headers0 = lists:filtermap(fun({_K, {as_is, _, _}}) ->
false;
({{utf8, K}, V})
when ?IS_SHORTSTR_LEN(K) ->
{true, to_091(K, V)};
(_) ->
false
end, AP),
%% Add remaining x- message annotations as headers
XHeaders = lists:filtermap(fun({{symbol, <<"x-cc">>}, V}) ->
{true, to_091(<<"CC">>, V)};
@ -161,6 +167,8 @@ convert_from(mc_amqp, Sections, Env) ->
{true, {<<"timestamp_in_ms">>, long, Ts}};
({{symbol, <<"x-opt-deaths">>}, V}) ->
convert_from_amqp_deaths(V);
({_K, {as_is, _, _}}) ->
false;
({{symbol, <<"x-", _/binary>> = K}, V})
when ?IS_SHORTSTR_LEN(K) ->
case is_internal_header(K) of
@ -766,12 +774,23 @@ to_091(Key, null) -> {Key, void, undefined};
to_091(Key, {list, L}) ->
to_091_array(Key, L);
to_091(Key, {map, M}) ->
{Key, table, [to_091(unwrap(K), V) || {K, V} <- M]};
T = lists:filtermap(fun({K, V}) when element(1, K) =:= as_is orelse
element(1, V) =:= as_is ->
false;
({K, V}) ->
{true, to_091(unwrap(K), V)}
end, M),
{Key, table, T};
to_091(Key, {array, _T, L}) ->
to_091_array(Key, L).
to_091_array(Key, L) ->
{Key, array, [to_091(V) || V <- L]}.
A = lists:filtermap(fun({as_is, _, _}) ->
false;
(V) ->
{true, to_091(V)}
end, L),
{Key, array, A}.
to_091({utf8, V}) -> {longstr, V};
to_091({symbol, V}) -> {longstr, V};

View File

@ -175,7 +175,8 @@ groups() ->
x_cc_annotation_exchange_routing_key_empty,
x_cc_annotation_queue,
x_cc_annotation_null,
bad_x_cc_annotation_exchange
bad_x_cc_annotation_exchange,
decimal_types
]},
{cluster_size_3, [shuffle],
@ -6685,6 +6686,69 @@ bad_x_cc_annotation_exchange(Config) ->
ok = end_session_sync(Session),
ok = close_connection_sync(Connection).
%% Test that RabbitMQ can store and forward AMQP decimal types.
decimal_types(Config) ->
QName = atom_to_binary(?FUNCTION_NAME),
Address = rabbitmq_amqp_address:queue(QName),
{_, Session, LinkPair} = Init = init(Config),
{ok, _} = rabbitmq_amqp_client:declare_queue(
LinkPair, QName,
#{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}}}),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address),
ok = wait_for_credit(Sender),
Decimal32Zero = <<16#22, 16#50, 0, 0>>,
Decimal64Zero = <<16#22, 16#34, 0, 0, 0, 0, 0, 0>>,
Decimal128Zero = <<16#22, 16#08, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0>>,
Decimal3242 = <<16#22, 16#50, 16#00, 16#2A>>, % 42
Decimal32NaN = <<16#7C, 0, 0, 0>>,
Body = #'v1_0.amqp_value'{content = {list, [{as_is, 16#74, Decimal32Zero},
{as_is, 16#84, Decimal64Zero},
{as_is, 16#94, Decimal128Zero}]}},
MsgAnns = #{<<"x-decimal-32">> => {as_is, 16#74, Decimal3242},
<<"x-decimal-64">> => {as_is, 16#84, Decimal64Zero},
<<"x-decimal-128">> => {as_is, 16#94, Decimal128Zero},
<<"x-list">> => {list, [{as_is, 16#94, Decimal128Zero}]},
<<"x-map">> => {map, [{{utf8, <<"key-1">>},
{as_is, 16#94, Decimal128Zero}}]}},
AppProps = #{<<"decimal-32">> => {as_is, 16#74, Decimal32NaN}},
Msg0 = amqp10_msg:set_message_annotations(
MsgAnns,
amqp10_msg:set_application_properties(
AppProps,
amqp10_msg:new(<<"tag">>, Body))),
ok = amqp10_client:send_msg(Sender, Msg0),
ok = wait_for_accepted(<<"tag">>),
ok = amqp10_client:send_msg(Sender, Msg0),
ok = wait_for_accepted(<<"tag">>),
ok = detach_link_sync(Sender),
%% Consume the first message via AMQP 1.0
{ok, Receiver} = amqp10_client:attach_receiver_link(
Session, <<"receiver">>, Address, unsettled),
{ok, Msg} = amqp10_client:get_msg(Receiver),
?assertEqual(Body, amqp10_msg:body(Msg)),
?assertMatch(#{<<"x-decimal-32">> := {as_is, 16#74, Decimal3242},
<<"x-decimal-64">> := {as_is, 16#84, Decimal64Zero},
<<"x-decimal-128">> := {as_is, 16#94, Decimal128Zero},
<<"x-list">> := [{as_is, 16#94, Decimal128Zero}],
<<"x-map">> := [{{utf8, <<"key-1">>},
{as_is, 16#94, Decimal128Zero}}]},
amqp10_msg:message_annotations(Msg)),
?assertEqual(AppProps, amqp10_msg:application_properties(Msg)),
ok = amqp10_client:accept_msg(Receiver, Msg),
ok = detach_link_sync(Receiver),
%% Consume the second message via AMQP 0.9.1
%% We expect to receive the message without any crashes.
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
?assertMatch({#'basic.get_ok'{}, #amqp_msg{}},
amqp_channel:call(Ch, #'basic.get'{queue = QName, no_ack = true})),
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
ok = close(Init).
%% Attach a receiver to an unavailable quorum queue.
attach_to_down_quorum_queue(Config) ->
QName = <<"q-down">>,