diff --git a/deps/rabbit/include/mc.hrl b/deps/rabbit/include/mc.hrl index db6025a70c..fe032cfc3a 100644 --- a/deps/rabbit/include/mc.hrl +++ b/deps/rabbit/include/mc.hrl @@ -17,8 +17,6 @@ %% good enough for most use cases -define(IS_MC(Msg), element(1, Msg) == mc andalso tuple_size(Msg) == 5). -%% "Field names MUST start with a letter, '$' or '#' and may continue with letters, '$' or '#', digits, or -%% underlines, to a maximum length of 128 characters." [AMQP 0.9.1 4.2.5.5 Field Tables] -%% Given that the valid chars are ASCII chars, 1 char is encoded as 1 byte. --define(AMQP_LEGACY_FIELD_NAME_MAX_LEN, 128). +%% "Short strings can carry up to 255 octets of UTF-8 data, but +%% may not contain binary zero octets." [AMQP 0.9.1 $4.2.5.3] -define(IS_SHORTSTR_LEN(B), byte_size(B) < 256). diff --git a/deps/rabbit/src/mc.erl b/deps/rabbit/src/mc.erl index 077570b087..6736162e89 100644 --- a/deps/rabbit/src/mc.erl +++ b/deps/rabbit/src/mc.erl @@ -110,14 +110,12 @@ %% Convert state to another protocol %% all protocols must be able to convert to mc_amqp (AMQP 1.0) --callback convert_to(Target :: protocol(), proto_state(), - Env :: #{atom() => term()}) -> +-callback convert_to(Target :: protocol(), proto_state(), environment()) -> proto_state() | not_implemented. %% Convert from another protocol %% all protocols must be able to convert from mc_amqp (AMQP 1.0) --callback convert_from(Source :: protocol(), proto_state(), - Env :: #{atom() => term()}) -> +-callback convert_from(Source :: protocol(), proto_state(), environment()) -> proto_state() | not_implemented. %% emit a protocol specific state package @@ -136,8 +134,7 @@ init(Proto, Data, Anns) -> init(Proto, Data, Anns, #{}). --spec init(protocol(), term(), annotations(), - environment()) -> state(). +-spec init(protocol(), term(), annotations(), environment()) -> state(). init(Proto, Data, Anns0, Env) when is_atom(Proto) andalso is_map(Anns0) @@ -285,7 +282,7 @@ set_ttl(Value, BasicMsg) -> convert(Proto, State) -> convert(Proto, State, #{}). --spec convert(protocol(), state(), Env :: #{atom() => term()}) -> state(). +-spec convert(protocol(), state(), environment()) -> state(). convert(Proto, #?MODULE{protocol = Proto} = State, _Env) -> State; convert(TargetProto, #?MODULE{protocol = SourceProto, diff --git a/deps/rabbit/src/mc_amqpl.erl b/deps/rabbit/src/mc_amqpl.erl index 12cf3431f8..76dfe027e7 100644 --- a/deps/rabbit/src/mc_amqpl.erl +++ b/deps/rabbit/src/mc_amqpl.erl @@ -145,7 +145,7 @@ convert_from(mc_amqp, Sections, _Env) -> Headers0 = [to_091(K, V) || {{utf8, K}, V} <- AP, ?IS_SHORTSTR_LEN(K)], - %% Add remaining message annotations as headers? + %% Add remaining x- message annotations as headers XHeaders = lists:filtermap(fun({{symbol, <<"x-cc">>}, V}) -> {true, to_091(<<"CC">>, V)}; ({{symbol, <<"x-", _/binary>> = K}, V}) @@ -153,7 +153,8 @@ convert_from(mc_amqp, Sections, _Env) -> case is_internal_header(K) of false -> {true, to_091(K, V)}; - true -> false + true -> + false end; (_) -> false @@ -300,8 +301,7 @@ convert_to(mc_amqp, #content{payload_fragments_rev = Payload} = Content, Env) -> undefined -> undefined; _ -> - %% TODO: do we need to validate this doesn't fail? - %% Some validation is done in then channel already + %% Channel already checked for valid integer. binary_to_integer(Expiration) end, @@ -625,7 +625,7 @@ unwrap({_Type, V}) -> unwrap_shortstr({utf8, V}) when is_binary(V) andalso - byte_size(V) < 256 -> + ?IS_SHORTSTR_LEN(V) -> V; unwrap_shortstr(_) -> undefined. @@ -684,7 +684,7 @@ message_id({ulong, N}, _HKey, H0) -> message_id({binary, B}, HKey, H0) -> {[{HKey, binary, B} | H0], undefined}; message_id({utf8, S}, HKey, H0) -> - case byte_size(S) < 256 of + case ?IS_SHORTSTR_LEN(S) of true -> {H0, S}; false -> diff --git a/deps/rabbit/src/mc_util.erl b/deps/rabbit/src/mc_util.erl index c4d3bf8e72..669dace41f 100644 --- a/deps/rabbit/src/mc_util.erl +++ b/deps/rabbit/src/mc_util.erl @@ -1,5 +1,7 @@ -module(mc_util). +-include("mc.hrl"). + -export([is_valid_shortstr/1, is_utf8_no_null/1, uuid_to_urn_string/1, @@ -11,19 +13,14 @@ ]). -spec is_valid_shortstr(term()) -> boolean(). -is_valid_shortstr(Bin) when byte_size(Bin) < 256 -> - utf8_scan(Bin, fun (C) -> C > 0 end); +is_valid_shortstr(Bin) when ?IS_SHORTSTR_LEN(Bin) -> + is_utf8_no_null(Bin); is_valid_shortstr(_) -> false. -is_utf8_no_null(<<>>) -> - true; -is_utf8_no_null(<<0, _/binary>>) -> - false; -is_utf8_no_null(<<_/utf8, Rem/binary>>) -> - is_utf8_no_null(Rem); -is_utf8_no_null(_) -> - false. +-spec is_utf8_no_null(term()) -> boolean(). +is_utf8_no_null(Term) -> + utf8_scan(Term, fun (C) -> C > 0 end). -spec uuid_to_urn_string(binary()) -> binary(). uuid_to_urn_string(< {_, array, [{table, T2a}, {table, T2b}]} = header(<<"x-death">>, H2), ?assertMatch({_, longstr, <<"dl">>}, header(<<"queue">>, T2a)), ?assertMatch({_, longstr, <<"q1">>}, header(<<"queue">>, T2b)), - - ct:pal("H2 ~p", [T2a]), - ct:pal("routing headers ~p", [mc:routing_headers(Msg2, [x_headers])]), - ok. header(K, H) -> @@ -320,7 +316,6 @@ amqpl_amqp_bin_amqpl(_Config) -> %% at this point the type is now present as a message annotation ?assertEqual({utf8, <<"45">>}, mc:x_header(<<"x-basic-type">>, Msg10)), ?assertEqual(RoutingHeaders, mc:routing_headers(Msg10, [])), - % ct:pal("Sections ~p", [mc:protocol_state(Msg10Pre)]), [ #'v1_0.header'{} = Hdr10, @@ -345,22 +340,22 @@ amqpl_amqp_bin_amqpl(_Config) -> Get = fun(K, AP) -> amqp_map_get(utf8(K), AP) end, - ?assertMatch({long, 99}, Get(<<"a-stream-offset">>, AP10)), - ?assertMatch({utf8, <<"a string">>}, Get(<<"a-string">>, AP10)), - ?assertMatch({boolean, false}, Get(<<"a-bool">>, AP10)), - ?assertMatch({ubyte, 1}, Get(<<"a-unsignedbyte">>, AP10)), - ?assertMatch({ushort, 1}, Get(<<"a-unsignedshort">>, AP10)), - ?assertMatch({uint, 1}, Get(<<"a-unsignedint">>, AP10)), - ?assertMatch({int, 1}, Get(<<"a-signedint">>, AP10)), - ?assertMatch({timestamp, 1000}, Get(<<"a-timestamp">>, AP10)), - ?assertMatch({double, 1.0}, Get(<<"a-double">>, AP10)), - ?assertMatch({float, 1.0}, Get(<<"a-float">>, AP10)), - ?assertMatch(undefined, Get(<<"a-void">>, AP10)), - ?assertMatch({binary, <<"data">>}, Get(<<"a-binary">>, AP10)), + ?assertEqual({long, 99}, Get(<<"a-stream-offset">>, AP10)), + ?assertEqual({utf8, <<"a string">>}, Get(<<"a-string">>, AP10)), + ?assertEqual({boolean, false}, Get(<<"a-bool">>, AP10)), + ?assertEqual({ubyte, 1}, Get(<<"a-unsignedbyte">>, AP10)), + ?assertEqual({ushort, 1}, Get(<<"a-unsignedshort">>, AP10)), + ?assertEqual({uint, 1}, Get(<<"a-unsignedint">>, AP10)), + ?assertEqual({int, 1}, Get(<<"a-signedint">>, AP10)), + ?assertEqual({timestamp, 1000}, Get(<<"a-timestamp">>, AP10)), + ?assertEqual({double, 1.0}, Get(<<"a-double">>, AP10)), + ?assertEqual({float, 1.0}, Get(<<"a-float">>, AP10)), + ?assertEqual(undefined, Get(<<"a-void">>, AP10)), + ?assertEqual({binary, <<"data">>}, Get(<<"a-binary">>, AP10)), %% x-headers do not go into app props - ?assertMatch(undefined, Get(<<"x-stream-filter">>, AP10)), + ?assertEqual(undefined, Get(<<"x-stream-filter">>, AP10)), %% arrays are not converted - ?assertMatch(undefined, Get(<<"a-array">>, AP10)), + ?assertEqual(undefined, Get(<<"a-array">>, AP10)), %% assert properties MsgL2 = mc:convert(mc_amqpl, Msg10), @@ -374,7 +369,6 @@ amqpl_amqp_bin_amqpl(_Config) -> ?assertEqual({utf8, <<"msg-id">>}, mc:message_id(MsgL2)), ?assertEqual(1, mc:ttl(MsgL2)), ?assertEqual({utf8, <<"apple">>}, mc:x_header(<<"x-stream-filter">>, MsgL2)), - ct:pal("MSGL2 ~p", [MsgL2]), ?assertEqual(RoutingHeaders, mc:routing_headers(MsgL2, [])), ok. @@ -415,7 +409,7 @@ mc_util_uuid_to_urn_roundtrip(_Config) -> UUID = <<88,184,103,176,129,81,31,86,27,212,115,34,152,7,253,96>>, S = mc_util:uuid_to_urn_string(UUID), ?assertEqual(<<"urn:uuid:58b867b0-8151-1f56-1bd4-73229807fd60">>, S), - ?assertMatch({ok, UUID}, mc_util:urn_string_to_uuid(S)), + ?assertEqual({ok, UUID}, mc_util:urn_string_to_uuid(S)), ok. do_n(0, _) -> @@ -467,8 +461,8 @@ amqp_amqpl_amqp_uuid_correlation_id(_Config) -> MsgL = mc:convert(mc_amqpl, Msg), MsgOut = mc:convert(mc_amqp, MsgL), - ?assertMatch({uuid, UUID}, mc:correlation_id(MsgOut)), - ?assertMatch({uuid, UUID}, mc:message_id(MsgOut)), + ?assertEqual({uuid, UUID}, mc:correlation_id(MsgOut)), + ?assertEqual({uuid, UUID}, mc:message_id(MsgOut)), ok. @@ -485,7 +479,6 @@ amqp_amqpl(_Config) -> thead2('x-list', list, [utf8(<<"l">>)]), thead2('x-map', map, [{utf8(<<"k">>), utf8(<<"v">>)}]) ], - ct:pal("MAC ~p", [MAC]), M = #'v1_0.message_annotations'{content = MAC}, P = #'v1_0.properties'{content_type = {symbol, <<"ctype">>}, content_encoding = {symbol, <<"cenc">>}, @@ -550,8 +543,8 @@ amqp_amqpl(_Config) -> ?assertMatch({_, longstr, <<"apple">>}, header(<<"x-stream-filter">>, HL)), %% these are not coverted as not x- headers - ?assertMatch(undefined, header(<<"list">>, HL)), - ?assertMatch(undefined, header(<<"map">>, HL)), + ?assertEqual(undefined, header(<<"list">>, HL)), + ?assertEqual(undefined, header(<<"map">>, HL)), ?assertMatch({_ ,array, [{longstr,<<"l">>}]}, header(<<"x-list">>, HL)), ?assertMatch({_, table, [{<<"k">>,longstr,<<"v">>}]}, header(<<"x-map">>, HL)), @@ -708,7 +701,6 @@ amqp_amqpl_amqp_bodies(_Config) -> false -> [Payload] end, - % ct:pal("ProtoState ~p", [BodySections]), ?assertEqual(AssertBody, BodySections) end || Payload <- Bodies], ok. diff --git a/deps/rabbit_common/src/rabbit_misc.erl b/deps/rabbit_common/src/rabbit_misc.erl index 241dc3ddcb..520ef0f593 100644 --- a/deps/rabbit_common/src/rabbit_misc.erl +++ b/deps/rabbit_common/src/rabbit_misc.erl @@ -82,7 +82,6 @@ -export([safe_ets_update_counter/3, safe_ets_update_counter/4, safe_ets_update_counter/5, safe_ets_update_element/3, safe_ets_update_element/4, safe_ets_update_element/5]). -export([is_even/1, is_odd/1]). --export([is_valid_shortstr/1]). -export([maps_any/2, maps_put_truthy/3, @@ -1597,21 +1596,6 @@ is_even(N) -> is_odd(N) -> (N band 1) =:= 1. --spec is_valid_shortstr(term()) -> boolean(). -is_valid_shortstr(Bin) when byte_size(Bin) < 256 -> - is_utf8_no_null(Bin); -is_valid_shortstr(_) -> - false. - -is_utf8_no_null(<<>>) -> - true; -is_utf8_no_null(<<0, _/binary>>) -> - false; -is_utf8_no_null(<<_/utf8, Rem/binary>>) -> - is_utf8_no_null(Rem); -is_utf8_no_null(_) -> - false. - -spec maps_put_truthy(Key, Value, Map) -> Map when Map :: #{Key => Value}. maps_put_truthy(_K, undefined, M) -> diff --git a/deps/rabbitmq_mqtt/src/mc_mqtt.erl b/deps/rabbitmq_mqtt/src/mc_mqtt.erl index d58cab8228..be0f8b4add 100644 --- a/deps/rabbitmq_mqtt/src/mc_mqtt.erl +++ b/deps/rabbitmq_mqtt/src/mc_mqtt.erl @@ -92,16 +92,13 @@ convert_from(mc_amqp, Sections, Env) -> end, Props1 = case AmqpProps of #'v1_0.properties'{reply_to = {utf8, Address}} -> - % MqttX = persistent_term:get(?PERSISTENT_TERM_EXCHANGE), - MqttX = maps:get(mqtt_exchange, Env, ?DEFAULT_MQTT_EXCHANGE), + MqttX = maps:get(mqtt_x, Env, ?DEFAULT_MQTT_EXCHANGE), case Address of - % <<"/topic/", Topic/binary>> - % when MqttX =:= ?DEFAULT_MQTT_EXCHANGE -> - % add_response_topic(Topic, Props0); <<"/exchange/", MqttX:(byte_size(MqttX))/binary, "/", RoutingKey/binary>> -> - add_response_topic(RoutingKey, Props0); + MqttTopic = rabbit_mqtt_util:amqp_to_mqtt(RoutingKey), + Props0#{'Response-Topic' => MqttTopic}; _ -> Props0 end; @@ -272,7 +269,7 @@ convert_to(mc_amqp, #mqtt_msg{qos = Qos, end, ReplyTo = case Props of #{'Response-Topic' := MqttTopic} -> - Exchange = maps:get(mqtt_exchange, Env, ?DEFAULT_MQTT_EXCHANGE), + Exchange = maps:get(mqtt_x, Env, ?DEFAULT_MQTT_EXCHANGE), Topic = rabbit_mqtt_util:mqtt_to_amqp(MqttTopic), Address = <<"/exchange/", Exchange/binary, "/", Topic/binary>>, {utf8, Address}; @@ -560,7 +557,3 @@ amqp_encode(Data, Acc0) -> Bin = amqp10_framing:encode_bin(Data), Acc = setelement(5, Acc0, [Bin | element(5, Acc0)]), setelement(7, Acc, ?CONTENT_TYPE_AMQP). - -add_response_topic(AmqpTopic, PublishProperties) -> - MqttTopic = rabbit_mqtt_util:amqp_to_mqtt(AmqpTopic), - PublishProperties#{'Response-Topic' => MqttTopic}. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 508c3e2b3e..59a337c528 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -1549,17 +1549,9 @@ publish_to_queues( conn_name = ConnName, trace_state = TraceState}, auth_state = #auth_state{user = #user{username = Username}}} = State) -> - - Env = case persistent_term:get(?PERSISTENT_TERM_EXCHANGE) of - ?DEFAULT_MQTT_EXCHANGE -> - #{}; - MqttX -> - #{mqtt_exchange => MqttX} - end, - Anns = #{exchange => ExchangeNameBin, routing_keys => [mqtt_to_amqp(Topic)]}, - Msg0 = mc:init(mc_mqtt, MqttMsg, Anns, Env), + Msg0 = mc:init(mc_mqtt, MqttMsg, Anns, mc_env()), Msg = rabbit_message_interceptor:intercept(Msg0), case rabbit_exchange:lookup(ExchangeName) of {ok, Exchange} -> @@ -1788,7 +1780,7 @@ maybe_send_will( _ -> Anns0 end, - Msg = mc:init(mc_mqtt, MqttMsg, Anns), + Msg = mc:init(mc_mqtt, MqttMsg, Anns, mc_env()), case check_publish_permitted(DefaultX, Topic, State) of ok -> ok = rabbit_queue_type:publish_at_most_once(DefaultX, Msg), @@ -2040,7 +2032,7 @@ deliver_one_to_client({QNameOrType, QPid, QMsgId, _Redelivered, Mc} = Delivery, true -> ?QOS_1; false -> ?QOS_0 end, - McMqtt = mc:convert(mc_mqtt, Mc), + McMqtt = mc:convert(mc_mqtt, Mc, mc_env()), MqttMsg = #mqtt_msg{qos = PublisherQos} = mc:protocol_state(McMqtt), QoS = effective_qos(PublisherQos, SubscriberQoS), {SettleOp, State1} = maybe_publish_to_client(MqttMsg, Delivery, QoS, State0), @@ -2544,6 +2536,14 @@ format_status( queues_soft_limit_exceeded => QSLE, qos0_messages_dropped => Qos0MsgsDropped}. +mc_env() -> + case persistent_term:get(?PERSISTENT_TERM_EXCHANGE) of + ?DEFAULT_MQTT_EXCHANGE -> + #{}; + MqttX -> + #{mqtt_x => MqttX} + end. + -spec compat(mc:state(), state()) -> mc:state(). compat(McMqtt, #state{cfg = #cfg{exchange = XName}}) -> case rabbit_feature_flags:is_enabled(message_containers) of diff --git a/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl b/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl index 86db48279d..bf6c7c25c1 100644 --- a/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl @@ -11,13 +11,12 @@ all() -> [ - {group, lossless}, - {group, lossy} + {group, tests} ]. groups() -> [ - {lossless, [shuffle], + {tests, [shuffle], [roundtrip_amqp, roundtrip_amqp_payload_format_indicator, roundtrip_amqp_response_topic, @@ -27,11 +26,8 @@ groups() -> amqp_to_mqtt_amqp_value_section_list, amqp_to_mqtt_amqp_value_section_null, amqp_to_mqtt_amqp_value_section_int, - amqp_to_mqtt_amqp_value_section_boolean - ] - }, - {lossy, [shuffle], - [roundtrip_amqp_user_property, + amqp_to_mqtt_amqp_value_section_boolean, + roundtrip_amqp_user_property, roundtrip_amqpl_user_property, roundtrip_amqp_content_type, amqp_to_mqtt_reply_to, @@ -41,8 +37,7 @@ groups() -> mqtt_amqp, mqtt_amqp_alt, amqp_mqtt - ] - } + ]} ]. roundtrip_amqp(_Config) -> @@ -126,7 +121,7 @@ roundtrip_amqp_payload_format_indicator(_Config) -> roundtrip_amqp_response_topic(_Config) -> Topic = <<"/rabbit/🐇"/utf8>>, Msg0 = mqtt_msg(), - Key = mqtt_exchange, + Key = mqtt_x, MqttExchanges = [<<"amq.topic">>, <<"some-other-topic-exchange">>], [begin @@ -275,7 +270,7 @@ roundtrip_amqp_content_type(_Config) -> amqp_to_mqtt_reply_to(_Config) -> Val = amqp_value({utf8, <<"hey">>}), - Key = mqtt_exchange, + Key = mqtt_x, Env = #{Key => <<"mqtt-topic-exchange">>}, AmqpProps1 = #'v1_0.properties'{reply_to = {utf8, <<"/exchange/mqtt-topic-exchange/my.routing.key">>}}, #mqtt_msg{props = Props1} = amqp_to_mqtt([AmqpProps1, Val], Env), @@ -350,7 +345,7 @@ mqtt_amqpl_alt(_Config) -> ok. mqtt_amqp(_Config) -> - Key = mqtt_exchange, + Key = mqtt_x, Ex = <<"mqtt-topic-exchange">>, Env = #{Key => <<"mqtt-topic-exchange">>}, Mqtt0 = mqtt_msg(), @@ -390,7 +385,7 @@ mqtt_amqp(_Config) -> ok. mqtt_amqp_alt(_Config) -> - Key = mqtt_exchange, + Key = mqtt_x, Ex = <<"mqtt-topic-exchange">>, Env = #{Key => <<"mqtt-topic-exchange">>}, CorrId = <<"urn:uuid:550e8400-e29b-41d4-a716-446655440000">>, @@ -432,7 +427,7 @@ mqtt_amqp_alt(_Config) -> ok. amqp_mqtt(_Config) -> - Env = #{mqtt_exchange => <<"mqtt-topic-exchange">>}, + Env = #{mqtt_x => <<"mqtt-topic-exchange">>}, H = #'v1_0.header'{priority = {ubyte, 3}, ttl = {uint, 20000}, durable = true},