Some small fixes

This commit is contained in:
David Ansari 2023-11-16 12:04:21 +01:00
parent c4fd947aad
commit 95c5f2ec9e
9 changed files with 63 additions and 107 deletions

View File

@ -17,8 +17,6 @@
%% good enough for most use cases
-define(IS_MC(Msg), element(1, Msg) == mc andalso tuple_size(Msg) == 5).
%% "Field names MUST start with a letter, '$' or '#' and may continue with letters, '$' or '#', digits, or
%% underlines, to a maximum length of 128 characters." [AMQP 0.9.1 4.2.5.5 Field Tables]
%% Given that the valid chars are ASCII chars, 1 char is encoded as 1 byte.
-define(AMQP_LEGACY_FIELD_NAME_MAX_LEN, 128).
%% "Short strings can carry up to 255 octets of UTF-8 data, but
%% may not contain binary zero octets." [AMQP 0.9.1 $4.2.5.3]
-define(IS_SHORTSTR_LEN(B), byte_size(B) < 256).

View File

@ -110,14 +110,12 @@
%% Convert state to another protocol
%% all protocols must be able to convert to mc_amqp (AMQP 1.0)
-callback convert_to(Target :: protocol(), proto_state(),
Env :: #{atom() => term()}) ->
-callback convert_to(Target :: protocol(), proto_state(), environment()) ->
proto_state() | not_implemented.
%% Convert from another protocol
%% all protocols must be able to convert from mc_amqp (AMQP 1.0)
-callback convert_from(Source :: protocol(), proto_state(),
Env :: #{atom() => term()}) ->
-callback convert_from(Source :: protocol(), proto_state(), environment()) ->
proto_state() | not_implemented.
%% emit a protocol specific state package
@ -136,8 +134,7 @@
init(Proto, Data, Anns) ->
init(Proto, Data, Anns, #{}).
-spec init(protocol(), term(), annotations(),
environment()) -> state().
-spec init(protocol(), term(), annotations(), environment()) -> state().
init(Proto, Data, Anns0, Env)
when is_atom(Proto)
andalso is_map(Anns0)
@ -285,7 +282,7 @@ set_ttl(Value, BasicMsg) ->
convert(Proto, State) ->
convert(Proto, State, #{}).
-spec convert(protocol(), state(), Env :: #{atom() => term()}) -> state().
-spec convert(protocol(), state(), environment()) -> state().
convert(Proto, #?MODULE{protocol = Proto} = State, _Env) ->
State;
convert(TargetProto, #?MODULE{protocol = SourceProto,

View File

@ -145,7 +145,7 @@ convert_from(mc_amqp, Sections, _Env) ->
Headers0 = [to_091(K, V) || {{utf8, K}, V} <- AP,
?IS_SHORTSTR_LEN(K)],
%% Add remaining message annotations as headers?
%% Add remaining x- message annotations as headers
XHeaders = lists:filtermap(fun({{symbol, <<"x-cc">>}, V}) ->
{true, to_091(<<"CC">>, V)};
({{symbol, <<"x-", _/binary>> = K}, V})
@ -153,7 +153,8 @@ convert_from(mc_amqp, Sections, _Env) ->
case is_internal_header(K) of
false ->
{true, to_091(K, V)};
true -> false
true ->
false
end;
(_) ->
false
@ -300,8 +301,7 @@ convert_to(mc_amqp, #content{payload_fragments_rev = Payload} = Content, Env) ->
undefined ->
undefined;
_ ->
%% TODO: do we need to validate this doesn't fail?
%% Some validation is done in then channel already
%% Channel already checked for valid integer.
binary_to_integer(Expiration)
end,
@ -625,7 +625,7 @@ unwrap({_Type, V}) ->
unwrap_shortstr({utf8, V})
when is_binary(V) andalso
byte_size(V) < 256 ->
?IS_SHORTSTR_LEN(V) ->
V;
unwrap_shortstr(_) ->
undefined.
@ -684,7 +684,7 @@ message_id({ulong, N}, _HKey, H0) ->
message_id({binary, B}, HKey, H0) ->
{[{HKey, binary, B} | H0], undefined};
message_id({utf8, S}, HKey, H0) ->
case byte_size(S) < 256 of
case ?IS_SHORTSTR_LEN(S) of
true ->
{H0, S};
false ->

View File

@ -1,5 +1,7 @@
-module(mc_util).
-include("mc.hrl").
-export([is_valid_shortstr/1,
is_utf8_no_null/1,
uuid_to_urn_string/1,
@ -11,19 +13,14 @@
]).
-spec is_valid_shortstr(term()) -> boolean().
is_valid_shortstr(Bin) when byte_size(Bin) < 256 ->
utf8_scan(Bin, fun (C) -> C > 0 end);
is_valid_shortstr(Bin) when ?IS_SHORTSTR_LEN(Bin) ->
is_utf8_no_null(Bin);
is_valid_shortstr(_) ->
false.
is_utf8_no_null(<<>>) ->
true;
is_utf8_no_null(<<0, _/binary>>) ->
false;
is_utf8_no_null(<<_/utf8, Rem/binary>>) ->
is_utf8_no_null(Rem);
is_utf8_no_null(_) ->
false.
-spec is_utf8_no_null(term()) -> boolean().
is_utf8_no_null(Term) ->
utf8_scan(Term, fun (C) -> C > 0 end).
-spec uuid_to_urn_string(binary()) -> binary().
uuid_to_urn_string(<<TL:4/binary, TM:2/binary, THV:2/binary,

View File

@ -241,10 +241,6 @@ amqpl_death_records(_Config) ->
{_, array, [{table, T2a}, {table, T2b}]} = header(<<"x-death">>, H2),
?assertMatch({_, longstr, <<"dl">>}, header(<<"queue">>, T2a)),
?assertMatch({_, longstr, <<"q1">>}, header(<<"queue">>, T2b)),
ct:pal("H2 ~p", [T2a]),
ct:pal("routing headers ~p", [mc:routing_headers(Msg2, [x_headers])]),
ok.
header(K, H) ->
@ -320,7 +316,6 @@ amqpl_amqp_bin_amqpl(_Config) ->
%% at this point the type is now present as a message annotation
?assertEqual({utf8, <<"45">>}, mc:x_header(<<"x-basic-type">>, Msg10)),
?assertEqual(RoutingHeaders, mc:routing_headers(Msg10, [])),
% ct:pal("Sections ~p", [mc:protocol_state(Msg10Pre)]),
[
#'v1_0.header'{} = Hdr10,
@ -345,22 +340,22 @@ amqpl_amqp_bin_amqpl(_Config) ->
Get = fun(K, AP) -> amqp_map_get(utf8(K), AP) end,
?assertMatch({long, 99}, Get(<<"a-stream-offset">>, AP10)),
?assertMatch({utf8, <<"a string">>}, Get(<<"a-string">>, AP10)),
?assertMatch({boolean, false}, Get(<<"a-bool">>, AP10)),
?assertMatch({ubyte, 1}, Get(<<"a-unsignedbyte">>, AP10)),
?assertMatch({ushort, 1}, Get(<<"a-unsignedshort">>, AP10)),
?assertMatch({uint, 1}, Get(<<"a-unsignedint">>, AP10)),
?assertMatch({int, 1}, Get(<<"a-signedint">>, AP10)),
?assertMatch({timestamp, 1000}, Get(<<"a-timestamp">>, AP10)),
?assertMatch({double, 1.0}, Get(<<"a-double">>, AP10)),
?assertMatch({float, 1.0}, Get(<<"a-float">>, AP10)),
?assertMatch(undefined, Get(<<"a-void">>, AP10)),
?assertMatch({binary, <<"data">>}, Get(<<"a-binary">>, AP10)),
?assertEqual({long, 99}, Get(<<"a-stream-offset">>, AP10)),
?assertEqual({utf8, <<"a string">>}, Get(<<"a-string">>, AP10)),
?assertEqual({boolean, false}, Get(<<"a-bool">>, AP10)),
?assertEqual({ubyte, 1}, Get(<<"a-unsignedbyte">>, AP10)),
?assertEqual({ushort, 1}, Get(<<"a-unsignedshort">>, AP10)),
?assertEqual({uint, 1}, Get(<<"a-unsignedint">>, AP10)),
?assertEqual({int, 1}, Get(<<"a-signedint">>, AP10)),
?assertEqual({timestamp, 1000}, Get(<<"a-timestamp">>, AP10)),
?assertEqual({double, 1.0}, Get(<<"a-double">>, AP10)),
?assertEqual({float, 1.0}, Get(<<"a-float">>, AP10)),
?assertEqual(undefined, Get(<<"a-void">>, AP10)),
?assertEqual({binary, <<"data">>}, Get(<<"a-binary">>, AP10)),
%% x-headers do not go into app props
?assertMatch(undefined, Get(<<"x-stream-filter">>, AP10)),
?assertEqual(undefined, Get(<<"x-stream-filter">>, AP10)),
%% arrays are not converted
?assertMatch(undefined, Get(<<"a-array">>, AP10)),
?assertEqual(undefined, Get(<<"a-array">>, AP10)),
%% assert properties
MsgL2 = mc:convert(mc_amqpl, Msg10),
@ -374,7 +369,6 @@ amqpl_amqp_bin_amqpl(_Config) ->
?assertEqual({utf8, <<"msg-id">>}, mc:message_id(MsgL2)),
?assertEqual(1, mc:ttl(MsgL2)),
?assertEqual({utf8, <<"apple">>}, mc:x_header(<<"x-stream-filter">>, MsgL2)),
ct:pal("MSGL2 ~p", [MsgL2]),
?assertEqual(RoutingHeaders, mc:routing_headers(MsgL2, [])),
ok.
@ -415,7 +409,7 @@ mc_util_uuid_to_urn_roundtrip(_Config) ->
UUID = <<88,184,103,176,129,81,31,86,27,212,115,34,152,7,253,96>>,
S = mc_util:uuid_to_urn_string(UUID),
?assertEqual(<<"urn:uuid:58b867b0-8151-1f56-1bd4-73229807fd60">>, S),
?assertMatch({ok, UUID}, mc_util:urn_string_to_uuid(S)),
?assertEqual({ok, UUID}, mc_util:urn_string_to_uuid(S)),
ok.
do_n(0, _) ->
@ -467,8 +461,8 @@ amqp_amqpl_amqp_uuid_correlation_id(_Config) ->
MsgL = mc:convert(mc_amqpl, Msg),
MsgOut = mc:convert(mc_amqp, MsgL),
?assertMatch({uuid, UUID}, mc:correlation_id(MsgOut)),
?assertMatch({uuid, UUID}, mc:message_id(MsgOut)),
?assertEqual({uuid, UUID}, mc:correlation_id(MsgOut)),
?assertEqual({uuid, UUID}, mc:message_id(MsgOut)),
ok.
@ -485,7 +479,6 @@ amqp_amqpl(_Config) ->
thead2('x-list', list, [utf8(<<"l">>)]),
thead2('x-map', map, [{utf8(<<"k">>), utf8(<<"v">>)}])
],
ct:pal("MAC ~p", [MAC]),
M = #'v1_0.message_annotations'{content = MAC},
P = #'v1_0.properties'{content_type = {symbol, <<"ctype">>},
content_encoding = {symbol, <<"cenc">>},
@ -550,8 +543,8 @@ amqp_amqpl(_Config) ->
?assertMatch({_, longstr, <<"apple">>}, header(<<"x-stream-filter">>, HL)),
%% these are not coverted as not x- headers
?assertMatch(undefined, header(<<"list">>, HL)),
?assertMatch(undefined, header(<<"map">>, HL)),
?assertEqual(undefined, header(<<"list">>, HL)),
?assertEqual(undefined, header(<<"map">>, HL)),
?assertMatch({_ ,array, [{longstr,<<"l">>}]}, header(<<"x-list">>, HL)),
?assertMatch({_, table, [{<<"k">>,longstr,<<"v">>}]}, header(<<"x-map">>, HL)),
@ -708,7 +701,6 @@ amqp_amqpl_amqp_bodies(_Config) ->
false ->
[Payload]
end,
% ct:pal("ProtoState ~p", [BodySections]),
?assertEqual(AssertBody, BodySections)
end || Payload <- Bodies],
ok.

View File

@ -82,7 +82,6 @@
-export([safe_ets_update_counter/3, safe_ets_update_counter/4, safe_ets_update_counter/5,
safe_ets_update_element/3, safe_ets_update_element/4, safe_ets_update_element/5]).
-export([is_even/1, is_odd/1]).
-export([is_valid_shortstr/1]).
-export([maps_any/2,
maps_put_truthy/3,
@ -1597,21 +1596,6 @@ is_even(N) ->
is_odd(N) ->
(N band 1) =:= 1.
-spec is_valid_shortstr(term()) -> boolean().
is_valid_shortstr(Bin) when byte_size(Bin) < 256 ->
is_utf8_no_null(Bin);
is_valid_shortstr(_) ->
false.
is_utf8_no_null(<<>>) ->
true;
is_utf8_no_null(<<0, _/binary>>) ->
false;
is_utf8_no_null(<<_/utf8, Rem/binary>>) ->
is_utf8_no_null(Rem);
is_utf8_no_null(_) ->
false.
-spec maps_put_truthy(Key, Value, Map) -> Map when
Map :: #{Key => Value}.
maps_put_truthy(_K, undefined, M) ->

View File

@ -92,16 +92,13 @@ convert_from(mc_amqp, Sections, Env) ->
end,
Props1 = case AmqpProps of
#'v1_0.properties'{reply_to = {utf8, Address}} ->
% MqttX = persistent_term:get(?PERSISTENT_TERM_EXCHANGE),
MqttX = maps:get(mqtt_exchange, Env, ?DEFAULT_MQTT_EXCHANGE),
MqttX = maps:get(mqtt_x, Env, ?DEFAULT_MQTT_EXCHANGE),
case Address of
% <<"/topic/", Topic/binary>>
% when MqttX =:= ?DEFAULT_MQTT_EXCHANGE ->
% add_response_topic(Topic, Props0);
<<"/exchange/",
MqttX:(byte_size(MqttX))/binary, "/",
RoutingKey/binary>> ->
add_response_topic(RoutingKey, Props0);
MqttTopic = rabbit_mqtt_util:amqp_to_mqtt(RoutingKey),
Props0#{'Response-Topic' => MqttTopic};
_ ->
Props0
end;
@ -272,7 +269,7 @@ convert_to(mc_amqp, #mqtt_msg{qos = Qos,
end,
ReplyTo = case Props of
#{'Response-Topic' := MqttTopic} ->
Exchange = maps:get(mqtt_exchange, Env, ?DEFAULT_MQTT_EXCHANGE),
Exchange = maps:get(mqtt_x, Env, ?DEFAULT_MQTT_EXCHANGE),
Topic = rabbit_mqtt_util:mqtt_to_amqp(MqttTopic),
Address = <<"/exchange/", Exchange/binary, "/", Topic/binary>>,
{utf8, Address};
@ -560,7 +557,3 @@ amqp_encode(Data, Acc0) ->
Bin = amqp10_framing:encode_bin(Data),
Acc = setelement(5, Acc0, [Bin | element(5, Acc0)]),
setelement(7, Acc, ?CONTENT_TYPE_AMQP).
add_response_topic(AmqpTopic, PublishProperties) ->
MqttTopic = rabbit_mqtt_util:amqp_to_mqtt(AmqpTopic),
PublishProperties#{'Response-Topic' => MqttTopic}.

View File

@ -1549,17 +1549,9 @@ publish_to_queues(
conn_name = ConnName,
trace_state = TraceState},
auth_state = #auth_state{user = #user{username = Username}}} = State) ->
Env = case persistent_term:get(?PERSISTENT_TERM_EXCHANGE) of
?DEFAULT_MQTT_EXCHANGE ->
#{};
MqttX ->
#{mqtt_exchange => MqttX}
end,
Anns = #{exchange => ExchangeNameBin,
routing_keys => [mqtt_to_amqp(Topic)]},
Msg0 = mc:init(mc_mqtt, MqttMsg, Anns, Env),
Msg0 = mc:init(mc_mqtt, MqttMsg, Anns, mc_env()),
Msg = rabbit_message_interceptor:intercept(Msg0),
case rabbit_exchange:lookup(ExchangeName) of
{ok, Exchange} ->
@ -1788,7 +1780,7 @@ maybe_send_will(
_ ->
Anns0
end,
Msg = mc:init(mc_mqtt, MqttMsg, Anns),
Msg = mc:init(mc_mqtt, MqttMsg, Anns, mc_env()),
case check_publish_permitted(DefaultX, Topic, State) of
ok ->
ok = rabbit_queue_type:publish_at_most_once(DefaultX, Msg),
@ -2040,7 +2032,7 @@ deliver_one_to_client({QNameOrType, QPid, QMsgId, _Redelivered, Mc} = Delivery,
true -> ?QOS_1;
false -> ?QOS_0
end,
McMqtt = mc:convert(mc_mqtt, Mc),
McMqtt = mc:convert(mc_mqtt, Mc, mc_env()),
MqttMsg = #mqtt_msg{qos = PublisherQos} = mc:protocol_state(McMqtt),
QoS = effective_qos(PublisherQos, SubscriberQoS),
{SettleOp, State1} = maybe_publish_to_client(MqttMsg, Delivery, QoS, State0),
@ -2544,6 +2536,14 @@ format_status(
queues_soft_limit_exceeded => QSLE,
qos0_messages_dropped => Qos0MsgsDropped}.
mc_env() ->
case persistent_term:get(?PERSISTENT_TERM_EXCHANGE) of
?DEFAULT_MQTT_EXCHANGE ->
#{};
MqttX ->
#{mqtt_x => MqttX}
end.
-spec compat(mc:state(), state()) -> mc:state().
compat(McMqtt, #state{cfg = #cfg{exchange = XName}}) ->
case rabbit_feature_flags:is_enabled(message_containers) of

View File

@ -11,13 +11,12 @@
all() ->
[
{group, lossless},
{group, lossy}
{group, tests}
].
groups() ->
[
{lossless, [shuffle],
{tests, [shuffle],
[roundtrip_amqp,
roundtrip_amqp_payload_format_indicator,
roundtrip_amqp_response_topic,
@ -27,11 +26,8 @@ groups() ->
amqp_to_mqtt_amqp_value_section_list,
amqp_to_mqtt_amqp_value_section_null,
amqp_to_mqtt_amqp_value_section_int,
amqp_to_mqtt_amqp_value_section_boolean
]
},
{lossy, [shuffle],
[roundtrip_amqp_user_property,
amqp_to_mqtt_amqp_value_section_boolean,
roundtrip_amqp_user_property,
roundtrip_amqpl_user_property,
roundtrip_amqp_content_type,
amqp_to_mqtt_reply_to,
@ -41,8 +37,7 @@ groups() ->
mqtt_amqp,
mqtt_amqp_alt,
amqp_mqtt
]
}
]}
].
roundtrip_amqp(_Config) ->
@ -126,7 +121,7 @@ roundtrip_amqp_payload_format_indicator(_Config) ->
roundtrip_amqp_response_topic(_Config) ->
Topic = <<"/rabbit/🐇"/utf8>>,
Msg0 = mqtt_msg(),
Key = mqtt_exchange,
Key = mqtt_x,
MqttExchanges = [<<"amq.topic">>,
<<"some-other-topic-exchange">>],
[begin
@ -275,7 +270,7 @@ roundtrip_amqp_content_type(_Config) ->
amqp_to_mqtt_reply_to(_Config) ->
Val = amqp_value({utf8, <<"hey">>}),
Key = mqtt_exchange,
Key = mqtt_x,
Env = #{Key => <<"mqtt-topic-exchange">>},
AmqpProps1 = #'v1_0.properties'{reply_to = {utf8, <<"/exchange/mqtt-topic-exchange/my.routing.key">>}},
#mqtt_msg{props = Props1} = amqp_to_mqtt([AmqpProps1, Val], Env),
@ -350,7 +345,7 @@ mqtt_amqpl_alt(_Config) ->
ok.
mqtt_amqp(_Config) ->
Key = mqtt_exchange,
Key = mqtt_x,
Ex = <<"mqtt-topic-exchange">>,
Env = #{Key => <<"mqtt-topic-exchange">>},
Mqtt0 = mqtt_msg(),
@ -390,7 +385,7 @@ mqtt_amqp(_Config) ->
ok.
mqtt_amqp_alt(_Config) ->
Key = mqtt_exchange,
Key = mqtt_x,
Ex = <<"mqtt-topic-exchange">>,
Env = #{Key => <<"mqtt-topic-exchange">>},
CorrId = <<"urn:uuid:550e8400-e29b-41d4-a716-446655440000">>,
@ -432,7 +427,7 @@ mqtt_amqp_alt(_Config) ->
ok.
amqp_mqtt(_Config) ->
Env = #{mqtt_exchange => <<"mqtt-topic-exchange">>},
Env = #{mqtt_x => <<"mqtt-topic-exchange">>},
H = #'v1_0.header'{priority = {ubyte, 3},
ttl = {uint, 20000},
durable = true},