Fix AMQP crashes for approximate numbers
This commit fixes several crashes:
1. Serialising IEEE 754-2008 decimals as well as
NaN and +-Inf for float and doubles crashed
2. Converting IEEE 754-2008 decimals as well as NaN and +-Inf
for float and dobules from amqp to amqpl crashed
The 2nd crash looks as follows:
```
exception exit: {function_clause,
[{mc_amqpl,to_091,
[<<"decimal-32">>,{as_is,116,<<124,0,0,0>>}],
[{file,"mc_amqpl.erl"},{line,747}]},
{mc_amqpl,'-convert_from/3-lc$^2/1-2-',1,
[{file,"mc_amqpl.erl"},{line,155}]},
{mc_amqpl,convert_from,3,
[{file,"mc_amqpl.erl"},{line,155}]},
{mc,convert,3,[{file,"mc.erl"},{line,358}]},
{rabbit_channel,outgoing_content,2,
[{file,"rabbit_channel.erl"},{line,2649}]},
{rabbit_channel,handle_basic_get,7,
[{file,"rabbit_channel.erl"},{line,2636}]},
{rabbit_channel,handle_cast,2,
[{file,"rabbit_channel.erl"},{line,617}]},
{gen_server2,handle_msg,2,
[{file,"gen_server2.erl"},{line,1056}]}]}
```
The 2nd crash is fixed by omitting any `{as_is, _TypeCode, _Binary}`
values during AMQP 1.0 -> AMQP 0.9.1 conversion.
This will be documented in the conversion table.
In addition to fixing these crashes, this commit adds tests that
RabbitMQ is able to store and forward IEEE 754-2008 decimals.
IEEE 754-2008 decimals can be parsed and serialsed by RabbitMQ.
However, RabbitMQ doesn't support interpreting this values. For example,
they can't be used on the headers exchange or for AMQP filter
expressions.
(cherry picked from commit 5c318c8e38
)
This commit is contained in:
parent
92a84549fc
commit
eeeae55caa
|
@ -177,8 +177,8 @@ generate1({array, Type, List}) ->
|
||||||
[16#e0, S + 1, Count, Array]
|
[16#e0, S + 1, Count, Array]
|
||||||
end;
|
end;
|
||||||
|
|
||||||
generate1({as_is, TypeCode, Bin}) ->
|
generate1({as_is, TypeCode, Bin}) when is_binary(Bin) ->
|
||||||
<<TypeCode, Bin>>.
|
[TypeCode, Bin].
|
||||||
|
|
||||||
constructor(symbol) -> 16#b3;
|
constructor(symbol) -> 16#b3;
|
||||||
constructor(ubyte) -> 16#50;
|
constructor(ubyte) -> 16#50;
|
||||||
|
|
|
@ -101,17 +101,17 @@ parse(<<16#e0, S:8,CountAndV:S/binary,_/binary>>, B) ->
|
||||||
parse(<<16#f0, S:32,CountAndV:S/binary,_/binary>>, B) ->
|
parse(<<16#f0, S:32,CountAndV:S/binary,_/binary>>, B) ->
|
||||||
{parse_array(32, CountAndV), B+5+S};
|
{parse_array(32, CountAndV), B+5+S};
|
||||||
%% NaN or +-inf
|
%% NaN or +-inf
|
||||||
parse(<<16#72, V:32, _/binary>>, B) ->
|
parse(<<16#72, V:4/binary, _/binary>>, B) ->
|
||||||
{{as_is, 16#72, <<V:32>>}, B+5};
|
{{as_is, 16#72, V}, B+5};
|
||||||
parse(<<16#82, V:64, _/binary>>, B) ->
|
parse(<<16#82, V:8/binary, _/binary>>, B) ->
|
||||||
{{as_is, 16#82, <<V:64>>}, B+9};
|
{{as_is, 16#82, V}, B+9};
|
||||||
%% decimals
|
%% decimals
|
||||||
parse(<<16#74, V:32, _/binary>>, B) ->
|
parse(<<16#74, V:4/binary, _/binary>>, B) ->
|
||||||
{{as_is, 16#74, <<V:32>>}, B+5};
|
{{as_is, 16#74, V}, B+5};
|
||||||
parse(<<16#84, V:64, _/binary>>, B) ->
|
parse(<<16#84, V:8/binary, _/binary>>, B) ->
|
||||||
{{as_is, 16#84, <<V:64>>}, B+9};
|
{{as_is, 16#84, V}, B+9};
|
||||||
parse(<<16#94, V:128, _/binary>>, B) ->
|
parse(<<16#94, V:16/binary, _/binary>>, B) ->
|
||||||
{{as_is, 16#94, <<V:128>>}, B+17};
|
{{as_is, 16#94, V}, B+17};
|
||||||
parse(<<Type, _/binary>>, B) ->
|
parse(<<Type, _/binary>>, B) ->
|
||||||
throw({primitive_type_unsupported, Type, {position, B}}).
|
throw({primitive_type_unsupported, Type, {position, B}}).
|
||||||
|
|
||||||
|
@ -317,17 +317,17 @@ pm(<<16#e0, S:8,CountAndV:S/binary,R/binary>>, O, B) ->
|
||||||
pm(<<16#f0, S:32,CountAndV:S/binary,R/binary>>, O, B) ->
|
pm(<<16#f0, S:32,CountAndV:S/binary,R/binary>>, O, B) ->
|
||||||
[parse_array(32, CountAndV) | pm(R, O, B+5+S)];
|
[parse_array(32, CountAndV) | pm(R, O, B+5+S)];
|
||||||
%% NaN or +-inf
|
%% NaN or +-inf
|
||||||
pm(<<16#72, V:32, R/binary>>, O, B) ->
|
pm(<<16#72, V:4/binary, R/binary>>, O, B) ->
|
||||||
[{as_is, 16#72, <<V:32>>} | pm(R, O, B+5)];
|
[{as_is, 16#72, V} | pm(R, O, B+5)];
|
||||||
pm(<<16#82, V:64, R/binary>>, O, B) ->
|
pm(<<16#82, V:8/binary, R/binary>>, O, B) ->
|
||||||
[{as_is, 16#82, <<V:64>>} | pm(R, O, B+9)];
|
[{as_is, 16#82, V} | pm(R, O, B+9)];
|
||||||
%% decimals
|
%% decimals
|
||||||
pm(<<16#74, V:32, R/binary>>, O, B) ->
|
pm(<<16#74, V:4/binary, R/binary>>, O, B) ->
|
||||||
[{as_is, 16#74, <<V:32>>} | pm(R, O, B+5)];
|
[{as_is, 16#74, V} | pm(R, O, B+5)];
|
||||||
pm(<<16#84, V:64, R/binary>>, O, B) ->
|
pm(<<16#84, V:8/binary, R/binary>>, O, B) ->
|
||||||
[{as_is, 16#84, <<V:64>>} | pm(R, O, B+9)];
|
[{as_is, 16#84, V} | pm(R, O, B+9)];
|
||||||
pm(<<16#94, V:128, R/binary>>, O, B) ->
|
pm(<<16#94, V:16/binary, R/binary>>, O, B) ->
|
||||||
[{as_is, 16#94, <<V:128>>} | pm(R, O, B+17)];
|
[{as_is, 16#94, V} | pm(R, O, B+17)];
|
||||||
pm(<<Type, _Bin/binary>>, _O, B) ->
|
pm(<<Type, _Bin/binary>>, _O, B) ->
|
||||||
throw({primitive_type_unsupported, Type, {position, B}}).
|
throw({primitive_type_unsupported, Type, {position, B}}).
|
||||||
|
|
||||||
|
|
|
@ -99,12 +99,34 @@ numerals(_Config) ->
|
||||||
roundtrip({long, 0}),
|
roundtrip({long, 0}),
|
||||||
roundtrip({long, 16#7FFFFFFFFFFFFFFF}),
|
roundtrip({long, 16#7FFFFFFFFFFFFFFF}),
|
||||||
roundtrip({long, -16#8000000000000000}),
|
roundtrip({long, -16#8000000000000000}),
|
||||||
|
|
||||||
roundtrip({float, 0.0}),
|
roundtrip({float, 0.0}),
|
||||||
roundtrip({float, 1.0}),
|
roundtrip({float, 1.0}),
|
||||||
roundtrip({float, -1.0}),
|
roundtrip({float, -1.0}),
|
||||||
roundtrip({double, 0.0}),
|
roundtrip({double, 0.0}),
|
||||||
roundtrip({double, 1.0}),
|
roundtrip({double, 1.0}),
|
||||||
roundtrip({double, -1.0}),
|
roundtrip({double, -1.0}),
|
||||||
|
|
||||||
|
%% float +Inf
|
||||||
|
roundtrip({as_is, 16#72, <<16#7F, 16#80, 16#00, 16#00>>}),
|
||||||
|
%% double +Inf
|
||||||
|
roundtrip({as_is, 16#82, <<16#7F, 16#F0, 16#00, 16#00,
|
||||||
|
16#00, 16#00, 16#00, 16#00>>}),
|
||||||
|
|
||||||
|
%% decimal32
|
||||||
|
roundtrip({as_is, 16#74, <<16#22, 16#50, 16#00, 16#00>>}), % 0
|
||||||
|
roundtrip({as_is, 16#74, <<16#22, 16#50, 16#00, 16#2A>>}), % 42
|
||||||
|
roundtrip({as_is, 16#74, <<16#A2, 16#40, 16#00, 16#48>>}), % -123.45
|
||||||
|
roundtrip({as_is, 16#74, <<16#78, 16#00, 16#00, 16#00>>}), % +Infinity
|
||||||
|
roundtrip({as_is, 16#74, <<16#7C, 16#00, 16#00, 16#00>>}), % NaN
|
||||||
|
%% decimal64
|
||||||
|
roundtrip({as_is, 16#84, <<16#22, 16#34, 16#00, 16#00,
|
||||||
|
16#00, 16#00, 16#00, 16#00>>}), % 0
|
||||||
|
%% decimal128
|
||||||
|
roundtrip({as_is, 16#94, <<16#22, 16#08, 16#00, 16#00,
|
||||||
|
16#00, 16#00, 16#00, 16#00,
|
||||||
|
16#00, 16#00, 16#00, 16#00,
|
||||||
|
16#00, 16#00, 16#00, 16#00>>}), % 0
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
utf8(_Config) ->
|
utf8(_Config) ->
|
||||||
|
|
|
@ -90,6 +90,7 @@
|
||||||
{list, [tagged_value()]} |
|
{list, [tagged_value()]} |
|
||||||
{map, [{tagged_value(), tagged_value()}]} |
|
{map, [{tagged_value(), tagged_value()}]} |
|
||||||
{array, atom(), [tagged_value()]} |
|
{array, atom(), [tagged_value()]} |
|
||||||
|
{as_is, TypeCode :: non_neg_integer(), binary()} |
|
||||||
null |
|
null |
|
||||||
undefined.
|
undefined.
|
||||||
|
|
||||||
|
|
|
@ -152,8 +152,14 @@ convert_from(mc_amqp, Sections, Env) ->
|
||||||
Type0
|
Type0
|
||||||
end,
|
end,
|
||||||
|
|
||||||
Headers0 = [to_091(K, V) || {{utf8, K}, V} <- AP,
|
Headers0 = lists:filtermap(fun({_K, {as_is, _, _}}) ->
|
||||||
?IS_SHORTSTR_LEN(K)],
|
false;
|
||||||
|
({{utf8, K}, V})
|
||||||
|
when ?IS_SHORTSTR_LEN(K) ->
|
||||||
|
{true, to_091(K, V)};
|
||||||
|
(_) ->
|
||||||
|
false
|
||||||
|
end, AP),
|
||||||
%% Add remaining x- message annotations as headers
|
%% Add remaining x- message annotations as headers
|
||||||
XHeaders = lists:filtermap(fun({{symbol, <<"x-cc">>}, V}) ->
|
XHeaders = lists:filtermap(fun({{symbol, <<"x-cc">>}, V}) ->
|
||||||
{true, to_091(<<"CC">>, V)};
|
{true, to_091(<<"CC">>, V)};
|
||||||
|
@ -161,6 +167,8 @@ convert_from(mc_amqp, Sections, Env) ->
|
||||||
{true, {<<"timestamp_in_ms">>, long, Ts}};
|
{true, {<<"timestamp_in_ms">>, long, Ts}};
|
||||||
({{symbol, <<"x-opt-deaths">>}, V}) ->
|
({{symbol, <<"x-opt-deaths">>}, V}) ->
|
||||||
convert_from_amqp_deaths(V);
|
convert_from_amqp_deaths(V);
|
||||||
|
({_K, {as_is, _, _}}) ->
|
||||||
|
false;
|
||||||
({{symbol, <<"x-", _/binary>> = K}, V})
|
({{symbol, <<"x-", _/binary>> = K}, V})
|
||||||
when ?IS_SHORTSTR_LEN(K) ->
|
when ?IS_SHORTSTR_LEN(K) ->
|
||||||
case is_internal_header(K) of
|
case is_internal_header(K) of
|
||||||
|
@ -766,12 +774,23 @@ to_091(Key, null) -> {Key, void, undefined};
|
||||||
to_091(Key, {list, L}) ->
|
to_091(Key, {list, L}) ->
|
||||||
to_091_array(Key, L);
|
to_091_array(Key, L);
|
||||||
to_091(Key, {map, M}) ->
|
to_091(Key, {map, M}) ->
|
||||||
{Key, table, [to_091(unwrap(K), V) || {K, V} <- M]};
|
T = lists:filtermap(fun({K, V}) when element(1, K) =:= as_is orelse
|
||||||
|
element(1, V) =:= as_is ->
|
||||||
|
false;
|
||||||
|
({K, V}) ->
|
||||||
|
{true, to_091(unwrap(K), V)}
|
||||||
|
end, M),
|
||||||
|
{Key, table, T};
|
||||||
to_091(Key, {array, _T, L}) ->
|
to_091(Key, {array, _T, L}) ->
|
||||||
to_091_array(Key, L).
|
to_091_array(Key, L).
|
||||||
|
|
||||||
to_091_array(Key, L) ->
|
to_091_array(Key, L) ->
|
||||||
{Key, array, [to_091(V) || V <- L]}.
|
A = lists:filtermap(fun({as_is, _, _}) ->
|
||||||
|
false;
|
||||||
|
(V) ->
|
||||||
|
{true, to_091(V)}
|
||||||
|
end, L),
|
||||||
|
{Key, array, A}.
|
||||||
|
|
||||||
to_091({utf8, V}) -> {longstr, V};
|
to_091({utf8, V}) -> {longstr, V};
|
||||||
to_091({symbol, V}) -> {longstr, V};
|
to_091({symbol, V}) -> {longstr, V};
|
||||||
|
|
|
@ -172,7 +172,8 @@ groups() ->
|
||||||
x_cc_annotation_exchange_routing_key_empty,
|
x_cc_annotation_exchange_routing_key_empty,
|
||||||
x_cc_annotation_queue,
|
x_cc_annotation_queue,
|
||||||
x_cc_annotation_null,
|
x_cc_annotation_null,
|
||||||
bad_x_cc_annotation_exchange
|
bad_x_cc_annotation_exchange,
|
||||||
|
decimal_types
|
||||||
]},
|
]},
|
||||||
|
|
||||||
{cluster_size_3, [shuffle],
|
{cluster_size_3, [shuffle],
|
||||||
|
@ -6589,6 +6590,69 @@ bad_x_cc_annotation_exchange(Config) ->
|
||||||
ok = end_session_sync(Session),
|
ok = end_session_sync(Session),
|
||||||
ok = close_connection_sync(Connection).
|
ok = close_connection_sync(Connection).
|
||||||
|
|
||||||
|
%% Test that RabbitMQ can store and forward AMQP decimal types.
|
||||||
|
decimal_types(Config) ->
|
||||||
|
QName = atom_to_binary(?FUNCTION_NAME),
|
||||||
|
Address = rabbitmq_amqp_address:queue(QName),
|
||||||
|
{_, Session, LinkPair} = Init = init(Config),
|
||||||
|
{ok, _} = rabbitmq_amqp_client:declare_queue(
|
||||||
|
LinkPair, QName,
|
||||||
|
#{arguments => #{<<"x-queue-type">> => {utf8, <<"quorum">>}}}),
|
||||||
|
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address),
|
||||||
|
ok = wait_for_credit(Sender),
|
||||||
|
|
||||||
|
Decimal32Zero = <<16#22, 16#50, 0, 0>>,
|
||||||
|
Decimal64Zero = <<16#22, 16#34, 0, 0, 0, 0, 0, 0>>,
|
||||||
|
Decimal128Zero = <<16#22, 16#08, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0>>,
|
||||||
|
Decimal3242 = <<16#22, 16#50, 16#00, 16#2A>>, % 42
|
||||||
|
Decimal32NaN = <<16#7C, 0, 0, 0>>,
|
||||||
|
Body = #'v1_0.amqp_value'{content = {list, [{as_is, 16#74, Decimal32Zero},
|
||||||
|
{as_is, 16#84, Decimal64Zero},
|
||||||
|
{as_is, 16#94, Decimal128Zero}]}},
|
||||||
|
MsgAnns = #{<<"x-decimal-32">> => {as_is, 16#74, Decimal3242},
|
||||||
|
<<"x-decimal-64">> => {as_is, 16#84, Decimal64Zero},
|
||||||
|
<<"x-decimal-128">> => {as_is, 16#94, Decimal128Zero},
|
||||||
|
<<"x-list">> => {list, [{as_is, 16#94, Decimal128Zero}]},
|
||||||
|
<<"x-map">> => {map, [{{utf8, <<"key-1">>},
|
||||||
|
{as_is, 16#94, Decimal128Zero}}]}},
|
||||||
|
AppProps = #{<<"decimal-32">> => {as_is, 16#74, Decimal32NaN}},
|
||||||
|
Msg0 = amqp10_msg:set_message_annotations(
|
||||||
|
MsgAnns,
|
||||||
|
amqp10_msg:set_application_properties(
|
||||||
|
AppProps,
|
||||||
|
amqp10_msg:new(<<"tag">>, Body))),
|
||||||
|
ok = amqp10_client:send_msg(Sender, Msg0),
|
||||||
|
ok = wait_for_accepted(<<"tag">>),
|
||||||
|
ok = amqp10_client:send_msg(Sender, Msg0),
|
||||||
|
ok = wait_for_accepted(<<"tag">>),
|
||||||
|
ok = detach_link_sync(Sender),
|
||||||
|
|
||||||
|
%% Consume the first message via AMQP 1.0
|
||||||
|
{ok, Receiver} = amqp10_client:attach_receiver_link(
|
||||||
|
Session, <<"receiver">>, Address, unsettled),
|
||||||
|
{ok, Msg} = amqp10_client:get_msg(Receiver),
|
||||||
|
?assertEqual(Body, amqp10_msg:body(Msg)),
|
||||||
|
?assertMatch(#{<<"x-decimal-32">> := {as_is, 16#74, Decimal3242},
|
||||||
|
<<"x-decimal-64">> := {as_is, 16#84, Decimal64Zero},
|
||||||
|
<<"x-decimal-128">> := {as_is, 16#94, Decimal128Zero},
|
||||||
|
<<"x-list">> := [{as_is, 16#94, Decimal128Zero}],
|
||||||
|
<<"x-map">> := [{{utf8, <<"key-1">>},
|
||||||
|
{as_is, 16#94, Decimal128Zero}}]},
|
||||||
|
amqp10_msg:message_annotations(Msg)),
|
||||||
|
?assertEqual(AppProps, amqp10_msg:application_properties(Msg)),
|
||||||
|
ok = amqp10_client:accept_msg(Receiver, Msg),
|
||||||
|
ok = detach_link_sync(Receiver),
|
||||||
|
|
||||||
|
%% Consume the second message via AMQP 0.9.1
|
||||||
|
%% We expect to receive the message without any crashes.
|
||||||
|
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
|
||||||
|
?assertMatch({#'basic.get_ok'{}, #amqp_msg{}},
|
||||||
|
amqp_channel:call(Ch, #'basic.get'{queue = QName, no_ack = true})),
|
||||||
|
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch),
|
||||||
|
|
||||||
|
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
|
||||||
|
ok = close(Init).
|
||||||
|
|
||||||
%% Attach a receiver to an unavailable quorum queue.
|
%% Attach a receiver to an unavailable quorum queue.
|
||||||
attach_to_down_quorum_queue(Config) ->
|
attach_to_down_quorum_queue(Config) ->
|
||||||
QName = <<"q-down">>,
|
QName = <<"q-down">>,
|
||||||
|
|
Loading…
Reference in New Issue