Deduplicate AMQP type inference

Introduce a single place in the AMQP 1.0 Erlang client that infers the AMQP 1.0 type.

Erlang integers are inferred to be AMQP type `long` to avoid overflow surprises.
This commit is contained in:
David Ansari 2024-11-15 10:37:13 +01:00
parent 44e74ceb96
commit 6e8b566323
4 changed files with 58 additions and 76 deletions

View File

@ -1194,33 +1194,11 @@ make_link_ref(Role, Session, Handle) ->
translate_message_annotations(MA)
when map_size(MA) > 0 ->
{map, maps:fold(fun(K, V, Acc) ->
[{sym(K), wrap_map_value(V)} | Acc]
[{sym(K), amqp10_client_types:infer(V)} | Acc]
end, [], MA)};
translate_message_annotations(_MA) ->
undefined.
wrap_map_value(true) ->
{boolean, true};
wrap_map_value(false) ->
{boolean, false};
wrap_map_value(V) when is_integer(V) ->
case V < 0 of
true ->
{int, V};
false ->
uint(V)
end;
wrap_map_value(V) when is_binary(V) ->
utf8(V);
wrap_map_value(V) when is_list(V) ->
utf8(list_to_binary(V));
wrap_map_value(V) when is_atom(V) ->
utf8(atom_to_list(V));
wrap_map_value(TaggedValue) when is_atom(element(1, TaggedValue)) ->
TaggedValue.
utf8(V) -> amqp10_client_types:utf8(V).
sym(B) when is_binary(B) -> {symbol, B};
sym(B) when is_list(B) -> {symbol, list_to_binary(B)};
sym(B) when is_atom(B) -> {symbol, atom_to_binary(B, utf8)}.

View File

@ -9,6 +9,7 @@
-include_lib("amqp10_common/include/amqp10_framing.hrl").
-export([unpack/1,
infer/1,
utf8/1,
uint/1,
make_properties/1]).
@ -73,13 +74,32 @@
properties/0]).
unpack({_, Value}) -> Value;
unpack(Value) -> Value.
unpack({_, Value}) ->
Value;
unpack(Value) ->
Value.
utf8(S) when is_list(S) -> {utf8, list_to_binary(S)};
utf8(B) when is_binary(B) -> {utf8, B}.
infer(V) when is_integer(V) ->
{long, V};
infer(V) when is_number(V) ->
%% AMQP double and Erlang float are both 64-bit.
{double, V};
infer(V) when is_boolean(V) ->
{boolean, V};
infer(V) when is_atom(V) ->
{utf8, atom_to_binary(V, utf8)};
infer(TaggedValue) when is_atom(element(1, TaggedValue)) ->
TaggedValue;
infer(V) ->
utf8(V).
uint(N) -> {uint, N}.
utf8(V) when is_binary(V) ->
{utf8, V};
utf8(V) when is_list(V) ->
{utf8, unicode:characters_to_binary(V)}.
uint(N) ->
{uint, N}.
make_properties(#{properties := Props})
when map_size(Props) > 0 ->

View File

@ -38,6 +38,8 @@
set_message_annotations/2
]).
-import(amqp10_client_types, [utf8/1]).
-include_lib("amqp10_common/include/amqp10_framing.hrl").
-type opt(T) :: T | undefined.
@ -380,13 +382,13 @@ set_application_properties(
Props0, #amqp10_msg{application_properties =
#'v1_0.application_properties'{content = APs0}} = Msg) ->
Props = maps:fold(fun (K, V, S) ->
S#{utf8(K) => wrap_ap_value(V)}
S#{utf8(K) => amqp10_client_types:infer(V)}
end, maps:from_list(APs0), Props0),
APs = #'v1_0.application_properties'{content = maps:to_list(Props)},
Msg#amqp10_msg{application_properties = APs}.
-spec set_delivery_annotations(#{binary() => binary() | integer() | string()},
amqp10_msg()) -> amqp10_msg().
amqp10_msg()) -> amqp10_msg().
set_delivery_annotations(Props,
#amqp10_msg{delivery_annotations = undefined} =
Msg) ->
@ -394,51 +396,30 @@ set_delivery_annotations(Props,
set_delivery_annotations(Props,
Msg#amqp10_msg{delivery_annotations = Anns});
set_delivery_annotations(
Props0, #amqp10_msg{delivery_annotations =
#'v1_0.delivery_annotations'{content = Anns0}} = Msg) ->
Anns = maps:fold(fun (K, V, S) ->
S#{sym(K) => wrap_ap_value(V)}
end, maps:from_list(Anns0), Props0),
Anns1 = #'v1_0.delivery_annotations'{content = maps:to_list(Anns)},
Msg#amqp10_msg{delivery_annotations = Anns1}.
Props, #amqp10_msg{delivery_annotations =
#'v1_0.delivery_annotations'{content = Anns0}} = Msg) ->
Anns1 = maps:fold(fun (K, V, S) ->
S#{sym(K) => amqp10_client_types:infer(V)}
end, maps:from_list(Anns0), Props),
Anns = #'v1_0.delivery_annotations'{content = maps:to_list(Anns1)},
Msg#amqp10_msg{delivery_annotations = Anns}.
-spec set_message_annotations(#{binary() => binary() | number() | string() | tuple()},
amqp10_msg()) -> amqp10_msg().
set_message_annotations(Props,
#amqp10_msg{message_annotations = undefined} =
Msg) ->
#amqp10_msg{message_annotations = undefined} =
Msg) ->
Anns = #'v1_0.message_annotations'{content = []},
set_message_annotations(Props,
Msg#amqp10_msg{message_annotations = Anns});
Msg#amqp10_msg{message_annotations = Anns});
set_message_annotations(
Props0, #amqp10_msg{message_annotations =
#'v1_0.message_annotations'{content = Anns0}} = Msg) ->
Anns = maps:fold(fun (K, V, S) ->
S#{sym(K) => wrap_ap_value(V)}
end, maps:from_list(Anns0), Props0),
Anns1 = #'v1_0.message_annotations'{content = maps:to_list(Anns)},
Msg#amqp10_msg{message_annotations = Anns1}.
wrap_ap_value(true) ->
{boolean, true};
wrap_ap_value(false) ->
{boolean, false};
wrap_ap_value(V) when is_binary(V) ->
utf8(V);
wrap_ap_value(V) when is_list(V) ->
utf8(list_to_binary(V));
wrap_ap_value(V) when is_atom(V) ->
utf8(atom_to_binary(V));
wrap_ap_value(V) when is_integer(V) ->
case V < 0 of
true -> {int, V};
false -> {uint, V}
end;
wrap_ap_value(V) when is_number(V) ->
%% AMQP double and Erlang float are both 64-bit.
{double, V};
wrap_ap_value(TaggedValue) when is_tuple(TaggedValue) ->
TaggedValue.
Props, #amqp10_msg{message_annotations =
#'v1_0.message_annotations'{content = Anns0}} = Msg) ->
Anns1 = maps:fold(fun (K, V, S) ->
S#{sym(K) => amqp10_client_types:infer(V)}
end, maps:from_list(Anns0), Props),
Anns = #'v1_0.message_annotations'{content = maps:to_list(Anns1)},
Msg#amqp10_msg{message_annotations = Anns}.
%% LOCAL
header_value(durable, undefined) -> false;
@ -474,7 +455,6 @@ parse_from_amqp(#'v1_0.footer'{} = Header, AmqpMsg) ->
AmqpMsg#amqp10_msg{footer = Header}.
unpack(V) -> amqp10_client_types:unpack(V).
utf8(V) -> amqp10_client_types:utf8(V).
sym(B) when is_list(B) -> {symbol, list_to_binary(B)};
sym(B) when is_binary(B) -> {symbol, B}.
uint(B) -> {uint, B}.

View File

@ -1301,7 +1301,7 @@ amqp_amqpl(QType, Config) ->
ok = amqp10_client:send_msg(
Sender,
amqp10_msg:set_application_properties(
#{"my int" => -2},
#{"my int" => {int, -2}},
amqp10_msg:new(<<>>, Body1, true))),
%% Send with properties
CorrelationID = <<"my correlation ID">>,
@ -1316,7 +1316,7 @@ amqp_amqpl(QType, Config) ->
amqp10_msg:set_properties(
#{correlation_id => CorrelationID},
amqp10_msg:set_application_properties(
#{"my int" => -2},
#{"my long" => -9_000_000_000},
amqp10_msg:new(<<>>, Body1, true)))),
%% Send with footer
Footer = #'v1_0.footer'{content = [{{symbol, <<"x-my footer">>}, {ubyte, 255}}]},
@ -1405,7 +1405,7 @@ amqp_amqpl(QType, Config) ->
correlation_id = Corr9}}} ->
?assertEqual([Body1], amqp10_framing:decode_bin(Payload9)),
?assertEqual(CorrelationID, Corr9),
?assertEqual({signedint, -2}, rabbit_misc:table_lookup(Headers9, <<"my int">>))
?assertEqual({long, -9_000_000_000}, rabbit_misc:table_lookup(Headers9, <<"my long">>))
after 30000 -> ct:fail({missing_deliver, ?LINE})
end,
receive {_, #amqp_msg{payload = Payload10}} ->
@ -1453,12 +1453,14 @@ amqp10_to_amqp091_header_conversion(Session,Ch, QName, Address) ->
OutMsg1 = amqp10_msg:new(<<"my-tag">>, <<"my-body">>, false),
OutMsg2 = amqp10_msg:set_application_properties(
#{"string" => "string-val",
"int" => 2,
"long" => -2,
"uint" => {uint, 2},
"bool" => false},
OutMsg1),
OutMsg3 = amqp10_msg:set_message_annotations(
#{"x-string" => "string-value",
"x-int" => 3,
"x-long" => -3,
"x-uint" => {uint, 3},
"x-bool" => true},
OutMsg2),
OutMsg = amqp10_msg:set_headers(
@ -1478,11 +1480,13 @@ amqp10_to_amqp091_header_conversion(Session,Ch, QName, Address) ->
%% assert application properties
?assertEqual({longstr, <<"string-val">>}, rabbit_misc:table_lookup(Headers, <<"string">>)),
?assertEqual({unsignedint, 2}, rabbit_misc:table_lookup(Headers, <<"int">>)),
?assertEqual({long, -2}, rabbit_misc:table_lookup(Headers, <<"long">>)),
?assertEqual({unsignedint, 2}, rabbit_misc:table_lookup(Headers, <<"uint">>)),
?assertEqual({bool, false}, rabbit_misc:table_lookup(Headers, <<"bool">>)),
%% assert message annotations
?assertEqual({longstr, <<"string-value">>}, rabbit_misc:table_lookup(Headers, <<"x-string">>)),
?assertEqual({unsignedint, 3}, rabbit_misc:table_lookup(Headers, <<"x-int">>)),
?assertEqual({long, -3}, rabbit_misc:table_lookup(Headers, <<"x-long">>)),
?assertEqual({unsignedint, 3}, rabbit_misc:table_lookup(Headers, <<"x-uint">>)),
?assertEqual({bool, true}, rabbit_misc:table_lookup(Headers, <<"x-bool">>)),
%% assert headers
?assertEqual(2, DeliveryMode),