rabbitmq-server/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl

558 lines
23 KiB
Erlang

-module(mc_mqtt_SUITE).
-compile([export_all,
nowarn_export_all]).
-include_lib("rabbitmq_mqtt/include/rabbit_mqtt_packet.hrl").
-include_lib("rabbit/include/mc.hrl").
-include_lib("rabbit_common/include/rabbit_framing.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").
-include_lib("eunit/include/eunit.hrl").
%% Common Test callback: the suite consists of the single group `tests`.
all() ->
    [{group, tests}].
%% Common Test callback: declares the `tests` group with the `shuffle` property
%% and lists every test case of this suite.
groups() ->
    TestCases = [roundtrip_amqp,
                 roundtrip_amqp_response_topic,
                 roundtrip_amqpl,
                 roundtrip_amqpl_correlation,
                 amqp_to_mqtt_body_sections,
                 roundtrip_amqp_user_property,
                 roundtrip_amqpl_user_property,
                 roundtrip_amqp_content_type,
                 amqp_to_mqtt_reply_to,
                 amqp_to_mqtt_footer,
                 mqtt_amqpl,
                 mqtt_amqpl_alt,
                 mqtt_amqp,
                 mqtt_amqp_alt,
                 amqp_mqtt,
                 is_persistent,
                 amqpl_to_mqtt_gh_12707],
    [{tests, [shuffle], TestCases}].
%% Initialises a message container from an MQTT message, checks its size
%% accounting and x-headers, then converts it to mc_amqp and back to mc_mqtt and
%% verifies correlation id, persistence, routing headers, payload and properties.
roundtrip_amqp(_Config) ->
    UserProperty = [{<<"x-key-2">>, <<"val-2">>},
                    {<<"x-key-3">>, <<"val-3">>},
                    {<<"x-key-1">>, <<"val-1">>},
                    {<<"key-2">>, <<"val-2">>},
                    {<<"key-3">>, <<"val-3">>},
                    {<<"key-1">>, <<"val-1">>}],
    Msg = #mqtt_msg{qos = 1,
                    topic = <<"/my/topic">>,
                    payload = <<"my payload">>,
                    props = #{'Content-Type' => <<"text-plain">>,
                              'Correlation-Data' => <<0, 255, 0>>,
                              'User-Property' => UserProperty}},
    RoutingKey = rabbit_mqtt_util:mqtt_to_amqp(Msg#mqtt_msg.topic),
    Mc0 = mc:init(mc_mqtt, Msg, #{?ANN_ROUTING_KEYS => [RoutingKey]}),

    %% Expected metadata bytes:
    %% topic (9) + content type (10) + correlation data (3) + user property (66).
    ExpectedMetaDataSize = 9 + 10 + 3 + 66,
    ExpectedPayloadSize = 10,
    ?assertEqual({ExpectedMetaDataSize, ExpectedPayloadSize}, mc:size(Mc0)),

    ExpectedXHeaders = #{<<"x-key-1">> => {utf8, <<"val-1">>},
                         <<"x-key-2">> => {utf8, <<"val-2">>},
                         <<"x-key-3">> => {utf8, <<"val-3">>}},
    ?assertEqual(ExpectedXHeaders, mc:x_headers(Mc0)),

    NoEnv = #{},
    ?assertEqual(Msg, mc_mqtt:convert_to(mc_mqtt, Msg, NoEnv)),
    ?assertEqual(not_implemented, mc_mqtt:convert_to(mc_stomp, Msg, NoEnv)),
    ?assertEqual(Mc0, mc:convert(mc_mqtt, Mc0)),

    %% roundtrip
    Mc = mc:convert(mc_mqtt, mc:convert(mc_amqp, Mc0)),
    ?assertEqual({binary, <<0, 255, 0>>}, mc:correlation_id(Mc)),
    ?assertEqual(undefined, mc:timestamp(Mc)),
    ?assert(mc:is_persistent(Mc)),

    PlainHeaders = #{<<"key-1">> => <<"val-1">>,
                     <<"key-2">> => <<"val-2">>,
                     <<"key-3">> => <<"val-3">>},
    ?assertEqual(PlainHeaders, mc:routing_headers(Mc, [])),
    AllHeaders = PlainHeaders#{<<"x-key-1">> => <<"val-1">>,
                               <<"x-key-2">> => <<"val-2">>,
                               <<"x-key-3">> => <<"val-3">>},
    ?assertEqual(AllHeaders, mc:routing_headers(Mc, [x_headers])),
    ?assertEqual({utf8, <<"val-3">>}, mc:x_header(<<"x-key-3">>, Mc)),
    ?assertEqual(undefined, mc:x_header(<<"x-key-4">>, Mc)),

    #mqtt_msg{qos = Qos,
              topic = Topic,
              payload = Payload,
              props = Props} = mc:protocol_state(Mc),
    ?assertEqual(1, Qos),
    ?assertEqual(<<"/my/topic">>, Topic),
    ?assertEqual(<<"my payload">>, iolist_to_binary(Payload)),
    ?assertMatch(#{'Content-Type' := <<"text-plain">>,
                   'Correlation-Data' := <<0, 255, 0>>},
                 Props),
    %% We expect order to be maintained.
    ?assertMatch(#{'User-Property' := UserProperty}, Props).
%% For each configured `mqtt_x` exchange, a Response Topic containing non-ASCII
%% characters must come back unchanged from an MQTT -> mc_amqp -> MQTT roundtrip.
roundtrip_amqp_response_topic(_Config) ->
    ResponseTopic = <<"/rabbit/🐇"/utf8>>,
    Msg = (mqtt_msg())#mqtt_msg{props = #{'Response-Topic' => ResponseTopic}},
    lists:map(
      fun(Exchange) ->
              ?assertMatch(#mqtt_msg{props = #{'Response-Topic' := ResponseTopic}},
                           roundtrip(mc_amqp, Msg, #{mqtt_x => Exchange})),
              ok
      end,
      [<<"amq.topic">>,
       <<"some-other-topic-exchange">>]).
%% Converts an MQTT message to mc_amqpl and back to mc_mqtt, then checks that
%% QoS, topic, payload and properties are preserved and that the User Property
%% list comes back sorted by key.
roundtrip_amqpl(_Config) ->
    UserProperty = [{<<"x-key-2">>, <<"val-2">>},
                    {<<"x-key-3">>, <<"val-3">>},
                    {<<"x-key-1">>, <<"val-1">>},
                    {<<"key-2">>, <<"val-2">>},
                    {<<"key-3">>, <<"val-3">>},
                    {<<"key-1">>, <<"val-1">>}],
    Msg = #mqtt_msg{qos = 1,
                    topic = <<"/my/topic">>,
                    payload = <<"my payload">>,
                    props = #{'Content-Type' => <<"text-plain">>,
                              'Correlation-Data' => <<"ABC-123">>,
                              'Response-Topic' => <<"my/response/topic">>,
                              'User-Property' => UserProperty}},
    RoutingKey = rabbit_mqtt_util:mqtt_to_amqp(Msg#mqtt_msg.topic),
    Mc0 = mc:init(mc_mqtt, Msg, #{?ANN_ROUTING_KEYS => [RoutingKey]}),
    Mc = mc:convert(mc_mqtt, mc:convert(mc_amqpl, Mc0)),

    ?assertEqual({binary, <<"ABC-123">>}, mc:correlation_id(Mc)),
    ?assert(mc:is_persistent(Mc)),

    #mqtt_msg{qos = Qos,
              topic = Topic,
              payload = Payload,
              props = Props} = mc:protocol_state(Mc),
    ?assertEqual(1, Qos),
    ?assertEqual(<<"/my/topic">>, Topic),
    ?assertEqual(<<"my payload">>, iolist_to_binary(Payload)),
    ?assertMatch(#{'Content-Type' := <<"text-plain">>,
                   'Correlation-Data' := <<"ABC-123">>,
                   'Response-Topic' := <<"my/response/topic">>},
                 Props),
    %% We expect order to not be maintained since AMQP 0.9.1 sorts by key.
    SortedUserProperty = lists:keysort(1, UserProperty),
    ?assertMatch(#{'User-Property' := SortedUserProperty}, Props).
%% Converts an AMQP 0.9.1 message whose only property is `expiration` to MQTT
%% and checks that the result is an #mqtt_msg{} carrying a `ttl` annotation of 12707.
amqpl_to_mqtt_gh_12707(_Config) ->
    Content = #content{properties = #'P_basic'{expiration = <<"12707">>},
                       payload_fragments_rev = [<<"gh_12707">>]},
    Anns = #{?ANN_EXCHANGE => <<"amq.topic">>,
             ?ANN_ROUTING_KEYS => [<<"dummy">>]},
    Mqtt = mc:convert(mc_mqtt, mc:init(mc_amqpl, Content, Anns)),
    ?assertMatch(#mqtt_msg{}, mc:protocol_state(Mqtt)),
    ?assertEqual(12707, mc:get_annotation(ttl, Mqtt)).
%% Non-UTF-8 Correlation Data should also be converted (via AMQP 0.9.1 header x-correlation-id).
%% The correlation data used here is 1024 zero bytes.
roundtrip_amqpl_correlation(_Config) ->
    ZeroBytes = binary:copy(<<0>>, 1024),
    Msg = (mqtt_msg())#mqtt_msg{props = #{'Correlation-Data' => ZeroBytes}},
    Result = roundtrip(mc_amqpl, Msg),
    ?assertMatch(#mqtt_msg{props = #{'Correlation-Data' := ZeroBytes}}, Result).
%% Checks how the different AMQP 1.0 body section kinds end up in the MQTT payload
%% and which Content-Type (if any) the converted message carries.
amqp_to_mqtt_body_sections(_Config) ->
    AmqpContentType = <<"message/vnd.rabbitmq.amqp">>,

    %% An amqp-value section should get AMQP encoded.
    ValueBody = [#'v1_0.amqp_value'{content = {list, [{uint, 3}]}}],
    #mqtt_msg{props = #{'Content-Type' := AmqpContentType},
              payload = ValuePayload} = amqp_to_mqtt(ValueBody),
    ?assertEqual(ValueBody, amqp10_framing:decode_bin(iolist_to_binary(ValuePayload))),

    %% amqp-sequence sections should get AMQP encoded.
    SequenceBody = [#'v1_0.amqp_sequence'{content = [true, false]},
                    #'v1_0.amqp_sequence'{content = [{binary, <<0, 255>>}]}],
    #mqtt_msg{props = #{'Content-Type' := AmqpContentType},
              payload = SequencePayload} = amqp_to_mqtt(SequenceBody),
    ?assertEqual(SequenceBody, amqp10_framing:decode_bin(iolist_to_binary(SequencePayload))),

    %% Binary data of multiple data sections should get concatenated.
    DataBody = [#'v1_0.data'{content = <<0>>},
                #'v1_0.data'{content = <<11, 10>>},
                #'v1_0.data'{content = <<9>>}],
    #mqtt_msg{props = DataProps,
              payload = DataPayload} = amqp_to_mqtt(DataBody),
    ?assertEqual(0, maps:size(DataProps)),
    ?assertEqual(<<0, 11, 10, 9>>, iolist_to_binary(DataPayload)).
%% When converting from MQTT 5.0 to AMQP 1.0, we expect to lose some User Property:
%% the second entry of each duplicated key and the entry with a non-ASCII x- key.
roundtrip_amqp_user_property(_Config) ->
    Sent = [{<<"x-dup"/utf8>>, <<"val-2">>},
            {<<"x-dup"/utf8>>, <<"val-3">>},
            {<<"dup">>, <<"val-4">>},
            {<<"dup">>, <<"val-5">>},
            {<<"x-key-no-ascii🐇"/utf8>>, <<"val-1">>}],
    Msg = (mqtt_msg())#mqtt_msg{props = #{'User-Property' => Sent}},
    #mqtt_msg{props = Props} = roundtrip(mc_amqp, Msg),
    Lost = [%% AMQP 1.0 maps disallow duplicate keys.
            {<<"x-dup">>, <<"val-3">>},
            {<<"dup">>, <<"val-5">>},
            %% AMQP 1.0 annotations require keys to be symbols, i.e. ASCII
            {<<"x-key-no-ascii🐇"/utf8>>, <<"val-1">>}],
    Survivors = Sent -- Lost,
    ?assertMatch(#{'User-Property' := Survivors}, Props).
%% When converting from MQTT 5.0 to AMQP 0.9.1, we expect to lose any duplicates and
%% any User Property whose name is longer than 128 characters.
%% The surviving entries are expected back sorted by key.
roundtrip_amqpl_user_property(_Config) ->
    TooLongKey = binary:copy(<<"k">>, 256),
    Sent = [{<<"key-2">>, <<"val-2">>},
            {<<"key-1">>, <<"val-1">>},
            {TooLongKey, <<"val-2">>},
            {<<"key-1">>, <<"val-1">>}],
    Msg = (mqtt_msg())#mqtt_msg{props = #{'User-Property' => Sent}},
    Result = roundtrip(mc_amqpl, Msg),
    ?assertMatch(#mqtt_msg{props = #{'User-Property' := [{<<"key-1">>, <<"val-1">>},
                                                        {<<"key-2">>, <<"val-2">>}]}},
                 Result).
%% In MQTT 5.0 the Content Type is a UTF-8 encoded string, whereas in AMQP 1.0
%% it is a symbol. A Content Type that is not valid ASCII is therefore expected
%% to be lost when a message goes from MQTT 5.0 through AMQP 1.0 and back.
roundtrip_amqp_content_type(_Config) ->
    NonAsciiContentType = <<"no-ascii🐇"/utf8>>,
    Msg = (mqtt_msg())#mqtt_msg{props = #{'Content-Type' => NonAsciiContentType}},
    #mqtt_msg{props = Props} = roundtrip(mc_amqp, Msg),
    ?assertNot(maps:is_key('Content-Type', Props)).
%% Checks when an AMQP 1.0 reply_to address is turned into an MQTT Response Topic
%% and when it is dropped by the AMQP -> MQTT conversion.
amqp_to_mqtt_reply_to(_Config) ->
    Body = amqp_value({utf8, <<"hey">>}),
    Env = #{mqtt_x => <<"mqtt-topic-exchange">>},
    %% Converts a message with the given reply_to and looks up its Response Topic.
    ResponseTopic =
        fun(ReplyTo, ConvEnv) ->
                AmqpProps = #'v1_0.properties'{reply_to = {utf8, ReplyTo}},
                #mqtt_msg{props = Props} = amqp_to_mqtt([AmqpProps, Body], ConvEnv),
                maps:find('Response-Topic', Props)
        end,

    %% reply_to addresses the exchange configured in the env.
    ?assertEqual({ok, <<"my/routing/key">>},
                 ResponseTopic(<<"/exchanges/mqtt-topic-exchange/my.routing.key">>, Env)),

    %% reply_to addresses a different exchange and the conversion gets an empty env.
    ?assertEqual(error,
                 ResponseTopic(<<"/exchanges/NON-mqtt-topic-exchange/my.routing.key">>, #{})),

    RoutingKey = <<"my.sp%$@cial.routing.key">>,
    %% The AMQP client must percent encode the AMQP reply_to address URI. We expect the
    %% AMQP -> MQTT conversion to percent decode because an MQTT response topic is not percent encoded.
    RoutingKeyQuoted = uri_string:quote(RoutingKey),
    ?assertEqual({ok, <<"my/sp%$@cial/routing/key">>},
                 ResponseTopic(<<"/exchanges/mqtt-topic-exchange/", RoutingKeyQuoted/binary>>, Env)),

    %% If the AMQP client did not percent encode the AMQP reply_to address URI as required,
    %% then the reply_to should be ignored by the conversion.
    ?assertEqual(error,
                 ResponseTopic(<<"/exchanges/mqtt-topic-exchange/", RoutingKey/binary>>, Env)).
%% An AMQP 1.0 message with a footer section can still be converted to MQTT;
%% only the data section's content is checked in the resulting payload.
amqp_to_mqtt_footer(_Config) ->
    Data = #'v1_0.data'{content = <<"hey">>},
    Footer = #'v1_0.footer'{content = [{{symbol, <<"x-key">>}, {utf8, <<"value">>}}]},
    %% We can translate, but lose the footer.
    #mqtt_msg{payload = Payload} = amqp_to_mqtt([Data, Footer]),
    ?assertEqual(<<"hey">>, iolist_to_binary(Payload)).
%% Converts a QoS 1 MQTT message to mc_amqpl and checks the resulting
%% #'P_basic'{} fields as well as the headers derived from the User Property
%% and the Response Topic.
mqtt_amqpl(_Config) ->
    MqttProps = #{'Content-Type' => <<"text/plain">>,
                  'User-Property' => [{<<"key-2">>, <<"val-2">>},
                                      {<<"key-1">>, <<"val-1">>}],
                  'Correlation-Data' => <<"banana">>,
                  'Message-Expiry-Interval' => 1001,
                  'Response-Topic' => <<"tmp/blah/responses">>},
    Msg = (mqtt_msg())#mqtt_msg{qos = 1, props = MqttProps},
    RoutingKey = rabbit_mqtt_util:mqtt_to_amqp(Msg#mqtt_msg.topic),
    Mc = mc:init(mc_mqtt, Msg, #{?ANN_ROUTING_KEYS => [RoutingKey]}),
    #content{properties = BasicProps} = mc:protocol_state(mc:convert(mc_amqpl, Mc)),
    #'P_basic'{headers = Headers} = BasicProps,
    ?assertMatch(#'P_basic'{delivery_mode = 2,
                            correlation_id = <<"banana">>,
                            expiration = <<"1001000">>,
                            content_type = <<"text/plain">>},
                 BasicProps),
    ?assertMatch({_, longstr, <<"val-2">>}, amqpl_header(<<"key-2">>, Headers)),
    ?assertMatch({_, longstr, <<"val-1">>}, amqpl_header(<<"key-1">>, Headers)),
    ?assertMatch({_, longstr, <<"tmp.blah.responses">>},
                 amqpl_header(<<"x-reply-to-topic">>, Headers)),
    ok.
%% Converts a QoS 0 MQTT message with a non-ASCII Content Type and Correlation
%% Data that is not valid UTF-8 to mc_amqpl. Expects delivery_mode 1, no
%% content_type, no correlation_id, and the correlation data in the
%% x-correlation-id header instead.
mqtt_amqpl_alt(_Config) ->
    InvalidUtf8 = <<14,23,97,23,144,149,12,108,140,66,151,2>>,
    MqttProps = #{'Content-Type' => <<"no-ascii🐇"/utf8>>,
                  'Correlation-Data' => InvalidUtf8},
    Msg = (mqtt_msg())#mqtt_msg{qos = 0, props = MqttProps},
    RoutingKey = rabbit_mqtt_util:mqtt_to_amqp(Msg#mqtt_msg.topic),
    Mc = mc:init(mc_mqtt, Msg, #{?ANN_ROUTING_KEYS => [RoutingKey]}),
    ?assertEqual(#{}, mc:x_headers(Mc)),
    #content{properties = BasicProps} = mc:protocol_state(mc:convert(mc_amqpl, Mc)),
    #'P_basic'{headers = Headers} = BasicProps,
    ?assertMatch(#'P_basic'{delivery_mode = 1,
                            correlation_id = undefined,
                            content_type = undefined},
                 BasicProps),
    ?assertMatch({_, longstr, InvalidUtf8},
                 amqpl_header(<<"x-correlation-id">>, Headers)),
    ok.
%% Converts a QoS 1 MQTT message to mc_amqp, decodes the resulting AMQP 1.0
%% sections and checks the header, message annotations, properties, application
%% properties and data section.
mqtt_amqp(_Config) ->
    Ex = <<"mqtt-topic-exchange">>,
    %% Reuse Ex so that the source env and the exchange annotation cannot drift apart.
    Env = #{mqtt_x => Ex},
    Mqtt0 = mqtt_msg(),
    Mqtt = Mqtt0#mqtt_msg{qos = 1,
                          props = #{'Content-Type' => <<"text/plain">>,
                                    'User-Property' =>
                                        [{<<"key-2">>, <<"val-2">>},
                                         {<<"key-1">>, <<"val-1">>},
                                         {<<"x-stream-filter">>, <<"apple">>}],
                                    'Correlation-Data' => <<"banana">>,
                                    'Message-Expiry-Interval' => 1001,
                                    'Response-Topic' => <<"tmp/blah/responses">>
                                   }
                         },
    Anns = #{?ANN_EXCHANGE => Ex,
             ?ANN_ROUTING_KEYS => [rabbit_mqtt_util:mqtt_to_amqp(Mqtt#mqtt_msg.topic)]},
    Mc = mc:init(mc_mqtt, Mqtt, Anns, Env),
    %% no target env
    Msg = mc:convert(mc_amqp, Mc),
    [H,
     #'v1_0.message_annotations'{content = MA},
     P,
     #'v1_0.application_properties'{content = AP},
     D] = amqp10_framing:decode_bin(iolist_to_binary(mc:protocol_state(Msg))),
    ?assertMatch(#'v1_0.header'{durable = true}, H),
    ?assertEqual({utf8, <<"apple">>}, amqp_map_get(symbol(<<"x-stream-filter">>), MA)),
    ?assertMatch(#'v1_0.properties'{content_type = {symbol, <<"text/plain">>},
                                    correlation_id = {binary, <<"banana">>}}, P),
    ?assertEqual({utf8, <<"val-1">>}, amqp_map_get(utf8(<<"key-1">>), AP)),
    ?assertEqual({utf8, <<"val-2">>}, amqp_map_get(utf8(<<"key-2">>), AP)),
    %% The message built from mqtt_msg/0 has an empty payload, so assert the exact
    %% data section (as mqtt_amqp_alt does) instead of accepting any content.
    ?assertEqual(#'v1_0.data'{content = <<>>}, D),
    ok.
%% Converts a QoS 0 MQTT message whose Correlation Data is a `urn:uuid:` string
%% to mc_amqp and checks the decoded AMQP 1.0 sections: non-durable header,
%% uuid correlation id, annotations, application properties and empty data section.
mqtt_amqp_alt(_Config) ->
    Exchange = <<"mqtt-topic-exchange">>,
    SrcEnv = #{mqtt_x => <<"mqtt-topic-exchange">>},
    CorrId = <<"urn:uuid:550e8400-e29b-41d4-a716-446655440000">>,
    MqttProps = #{'Content-Type' => <<"text/plain">>,
                  'User-Property' => [{<<"key-2">>, <<"val-2">>},
                                      {<<"key-1">>, <<"val-1">>},
                                      {<<"x-stream-filter">>, <<"apple">>}],
                  'Correlation-Data' => CorrId,
                  'Message-Expiry-Interval' => 1001,
                  'Response-Topic' => <<"tmp/blah/responses">>},
    Mqtt = (mqtt_msg())#mqtt_msg{qos = 0, props = MqttProps},
    RoutingKey = rabbit_mqtt_util:mqtt_to_amqp(Mqtt#mqtt_msg.topic),
    Anns = #{?ANN_EXCHANGE => Exchange,
             ?ANN_ROUTING_KEYS => [RoutingKey]},
    Converted = mc:convert(mc_amqp, mc:init(mc_mqtt, Mqtt, Anns, SrcEnv)),
    Encoded = iolist_to_binary(mc:protocol_state(Converted)),
    [Header,
     #'v1_0.message_annotations'{content = MsgAnns},
     Properties,
     #'v1_0.application_properties'{content = AppProps},
     Data] = amqp10_framing:decode_bin(Encoded),
    ?assertMatch(#'v1_0.header'{durable = false}, Header),
    ?assertEqual({utf8, <<"apple">>}, amqp_map_get(symbol(<<"x-stream-filter">>), MsgAnns)),
    ?assertMatch(#'v1_0.properties'{content_type = {symbol, <<"text/plain">>},
                                    correlation_id = {uuid, _}},
                 Properties),
    ?assertEqual({utf8, <<"val-1">>}, amqp_map_get(utf8(<<"key-1">>), AppProps)),
    ?assertEqual({utf8, <<"val-2">>}, amqp_map_get(utf8(<<"key-2">>), AppProps)),
    ?assertEqual(#'v1_0.data'{content = <<>>}, Data),
    ok.
%% Builds an AMQP 1.0 message from header, message annotations, properties,
%% application properties and a data section, converts it to MQTT and checks
%% the resulting QoS, Content-Type, Correlation-Data and User-Property list.
amqp_mqtt(_Config) ->
    Env = #{mqtt_x => <<"mqtt-topic-exchange">>},
    CorrIdOut = <<"urn:uuid:550e8400-e29b-41d4-a716-446655440000">>,
    {ok, CorrUUId} = mc_util:urn_string_to_uuid(CorrIdOut),

    Header = #'v1_0.header'{priority = {ubyte, 3},
                            ttl = {uint, 20000},
                            durable = true},
    MsgAnns = #'v1_0.message_annotations'{
                 content = [{{symbol, <<"x-stream-filter">>}, {utf8, <<"apple">>}},
                            thead2('x-list', list, [utf8(<<"l">>)]),
                            thead2('x-map', map, [{utf8(<<"k">>), utf8(<<"v">>)}])]},
    Properties = #'v1_0.properties'{content_type = {symbol, <<"text/plain">>},
                                    correlation_id = {uuid, CorrUUId},
                                    creation_time = {timestamp, 10000}},
    AppProps = #'v1_0.application_properties'{
                  content = [thead(long, 5),
                             thead(ulong, 5),
                             thead(utf8, <<"a-string">>),
                             thead(binary, <<"data">>),
                             thead(symbol, <<"symbol">>),
                             thead(ubyte, 255),
                             thead(short, 2),
                             thead(ushort, 3),
                             thead(uint, 4),
                             thead(int, 4),
                             thead(double, 5.0),
                             thead(float, 6.0),
                             thead(timestamp, 7000),
                             thead(byte, -128),
                             {{utf8, <<"boolean1">>}, true},
                             {{utf8, <<"boolean2">>}, false},
                             {utf8(<<"null">>), null}]},
    Data = #'v1_0.data'{content = <<"data">>},

    Sections = [Header, MsgAnns, Properties, AppProps, Data],
    Encoded = iolist_to_binary(lists:map(fun amqp10_framing:encode_bin/1, Sections)),
    Anns = #{?ANN_EXCHANGE => <<"exch">>,
             ?ANN_ROUTING_KEYS => [<<"apple">>]},
    Converted = mc:convert(mc_mqtt, mc:init(mc_amqp, Encoded, Anns), Env),
    ?assertMatch({uuid, CorrUUId}, mc:correlation_id(Converted)),

    %% The expected User-Property list has no entries for x-list, x-map and binary.
    ?assertMatch(
       #mqtt_msg{
          qos = 1,
          props = #{'Content-Type' := <<"text/plain">>,
                    'User-Property' := [{<<"x-stream-filter">>,<<"apple">>},
                                        {<<"long">>,<<"5">>},
                                        {<<"ulong">>,<<"5">>},
                                        {<<"utf8">>,<<"a-string">>},
                                        {<<"symbol">>,<<"symbol">>},
                                        {<<"ubyte">>,<<"255">>},
                                        {<<"short">>,<<"2">>},
                                        {<<"ushort">>,<<"3">>},
                                        {<<"uint">>,<<"4">>},
                                        {<<"int">>,<<"4">>},
                                        {<<"double">>,
                                         <<"5.00000000000000000000e+00">>},
                                        {<<"float">>,
                                         <<"6.00000000000000000000e+00">>},
                                        {<<"timestamp">>,<<"7">>},
                                        {<<"byte">>,<<"-128">>},
                                        {<<"boolean1">>,<<"true">>},
                                        {<<"boolean2">>,<<"false">>},
                                        {<<"null">>,<<>>}],
                    'Correlation-Data' := CorrIdOut}},
       mc:protocol_state(Converted)).
%% A QoS 0 MQTT message is not persistent, a QoS 1 MQTT message is.
is_persistent(_Config) ->
    %% Builds a message container for an empty message with the given QoS.
    InitWithQos =
        fun(Qos) ->
                Msg = #mqtt_msg{qos = Qos,
                                topic = <<"my/topic">>,
                                payload = <<>>},
                mc:init(mc_mqtt, Msg, #{})
        end,
    ?assertNot(mc:is_persistent(InitWithQos(0))),
    ?assert(mc:is_persistent(InitWithQos(1))).
%% Returns the base MQTT message used by the tests: QoS 0, topic <<"my/topic">>
%% and an empty payload.
mqtt_msg() ->
    #mqtt_msg{topic = <<"my/topic">>,
              payload = <<>>,
              qos = 0}.
%% roundtrip/2: same as roundtrip/3 with an empty source env.
roundtrip(Mod, MqttMsg) ->
    EmptyEnv = #{},
    roundtrip(Mod, MqttMsg, EmptyEnv).

%% roundtrip/3: initialises a message container from MqttMsg (using SrcEnv),
%% converts it to the protocol module Mod and back to mc_mqtt, and returns the
%% resulting protocol state.
roundtrip(Mod, MqttMsg, SrcEnv) ->
    RoutingKey = rabbit_mqtt_util:mqtt_to_amqp(MqttMsg#mqtt_msg.topic),
    Source = mc:init(mc_mqtt, MqttMsg, #{?ANN_ROUTING_KEYS => [RoutingKey]}, SrcEnv),
    Intermediate = mc:convert(Mod, Source),
    mc:protocol_state(mc:convert(mc_mqtt, Intermediate)).
%% amqp_to_mqtt/1: same as amqp_to_mqtt/2 with an empty env.
amqp_to_mqtt(Sections) ->
    EmptyEnv = #{},
    amqp_to_mqtt(Sections, EmptyEnv).

%% amqp_to_mqtt/2: encodes the given AMQP 1.0 sections, initialises an mc_amqp
%% message container from them, converts it to mc_mqtt with Env and returns the
%% resulting protocol state.
amqp_to_mqtt(Sections, Env) ->
    Encoded = iolist_to_binary(lists:map(fun amqp10_framing:encode_bin/1, Sections)),
    Source = mc:init(mc_amqp, Encoded, #{?ANN_ROUTING_KEYS => [<<"apple">>]}),
    mc:protocol_state(mc:convert(mc_mqtt, Source, Env)).
%% Wraps Content in an AMQP 1.0 amqp-value section record.
amqp_value(Content) ->
    Section = #'v1_0.amqp_value'{content = Content},
    Section.
%% Looks up header K in the AMQP 0.9.1 header table H via rabbit_basic:header/2.
amqpl_header(K, H) ->
    Header = rabbit_basic:header(K, H),
    Header.
%% Returns the value stored under key K in a list of {Key, Value} pairs,
%% or `undefined` when no pair has that key (which includes the empty list).
amqp_map_get(K, Tuples) ->
    case lists:keyfind(K, 1, Tuples) of
        {_, Value} -> Value;
        false -> undefined
    end.
%% Tags X as an AMQP 1.0 symbol: {symbol, X}.
symbol(X) ->
    Tagged = {symbol, X},
    Tagged.
%% Tags X as an AMQP 1.0 utf8 string: {utf8, X}.
utf8(X) ->
    Tagged = {utf8, X},
    Tagged.
%% Builds a typed header pair whose utf8 key is the name of the type atom T
%% and whose value is {T, Value}.
thead(T, Value) ->
    Key = utf8(atom_to_binary(T)),
    {Key, {T, Value}}.
%% thead2/2: like thead2/3, using the type atom T as the key name as well.
thead2(T, Value) ->
    thead2(T, T, Value).

%% thead2/3: builds a typed header pair whose symbol key is the name of atom K
%% and whose value is {T, Value}.
thead2(K, T, Value) ->
    Key = symbol(atom_to_binary(K)),
    {Key, {T, Value}}.