Fix MQTT QoS

This commit fixes test
```
bazel test //deps/rabbitmq_mqtt:shared_SUITE-mixed -t- \
    --test_sharding_strategy=disabled --test_env \
    FOCUS="-group [mqtt,v3,cluster_size_3] -case pubsub"
```

Fix some mixed version tests

Assume the AMQP body, especially amqp-value section won't be parsed.
Hence, omit smart conversions from AMQP to MQTT involving the
Payload-Format-Indicator bit.

Fix test

Fix
```
bazel test //deps/amqp10_client:system_SUITE-mixed -t- --test_sharding_strategy=disabled --test_env FOCUS="-group [rabbitmq]
```
This commit is contained in:
David Ansari 2024-04-21 13:31:04 +02:00
parent 0b51b8d39b
commit 81709d9745
6 changed files with 110 additions and 130 deletions

View File

@ -313,12 +313,17 @@ roundtrip(OpenConf, Body) ->
await_link(Sender, credited, link_credit_timeout),
Now = os:system_time(millisecond),
Props = #{creation_time => Now},
Props = #{creation_time => Now,
message_id => <<"my message ID">>,
correlation_id => <<"my correlation ID">>,
content_type => <<"my content type">>,
content_encoding => <<"my content encoding">>,
group_id => <<"my group ID">>},
Msg0 = amqp10_msg:new(<<"my-tag">>, Body, true),
Msg1 = amqp10_msg:set_properties(Props, Msg0),
Msg2 = amqp10_msg:set_application_properties(#{"a_key" => "a_value"}, Msg1),
Msg3 = amqp10_msg:set_message_annotations(#{<<"x_key">> => "x_value"}, Msg2),
Msg = amqp10_msg:set_delivery_annotations(#{<<"y_key">> => "y_value"}, Msg3),
Msg1 = amqp10_msg:set_application_properties(#{"a_key" => "a_value"}, Msg0),
Msg2 = amqp10_msg:set_properties(Props, Msg1),
Msg = amqp10_msg:set_message_annotations(#{<<"x-key">> => "x-value",
<<"x_key">> => "x_value"}, Msg2),
ok = amqp10_client:send_msg(Sender, Msg),
ok = amqp10_client:detach_link(Sender),
await_link(Sender, {detached, normal}, link_detach_timeout),
@ -331,10 +336,10 @@ roundtrip(OpenConf, Body) ->
ok = amqp10_client:close_connection(Connection),
% ct:pal(?LOW_IMPORTANCE, "roundtrip message Out: ~tp~nIn: ~tp~n", [OutMsg, Msg]),
#{creation_time := Now} = amqp10_msg:properties(OutMsg),
#{<<"a_key">> := <<"a_value">>} = amqp10_msg:application_properties(OutMsg),
#{<<"x_key">> := <<"x_value">>} = amqp10_msg:message_annotations(OutMsg),
#{<<"y_key">> := <<"y_value">>} = amqp10_msg:delivery_annotations(OutMsg),
?assertMatch(Props, amqp10_msg:properties(OutMsg)),
?assertEqual(#{<<"a_key">> => <<"a_value">>}, amqp10_msg:application_properties(OutMsg)),
?assertMatch(#{<<"x-key">> := <<"x-value">>,
<<"x_key">> := <<"x_value">>}, amqp10_msg:message_annotations(OutMsg)),
?assertEqual([Body], amqp10_msg:body(OutMsg)),
ok.

View File

@ -445,7 +445,7 @@ prepare(store, #?MODULE{protocol = mc_amqp} = State) ->
true ->
State#?MODULE{data = mc_amqp:prepare(store, State#?MODULE.data)};
false ->
State1 = convert(mc_amqpl, State),
State1 = convert(mc_amqpl, State, #{message_containers_store_amqp_v1 => false}),
State1#?MODULE{data = mc_amqpl:prepare(store, State1#?MODULE.data)}
end;
prepare(For, #?MODULE{protocol = Proto,

View File

@ -55,7 +55,7 @@ init(#content{} = Content0) ->
Anns = essential_properties(Content),
{strip_header(Content, ?DELETED_HEADER), Anns}.
convert_from(mc_amqp, Sections, _Env) ->
convert_from(mc_amqp, Sections, Env) ->
{H, MAnn, Prop, AProp, BodyRev} =
lists:foldl(
fun
@ -163,7 +163,35 @@ convert_from(mc_amqp, Sections, _Env) ->
false
end, MA),
{Headers1, MsgId091} = message_id(MsgId, <<"x-message-id">>, Headers0),
{Headers, CorrId091} = message_id(CorrId, <<"x-correlation-id">>, Headers1),
{Headers2, CorrId091} = message_id(CorrId, <<"x-correlation-id">>, Headers1),
Headers = case Env of
#{message_containers_store_amqp_v1 := false} ->
Headers3 = case AProp of
undefined ->
Headers2;
#'v1_0.application_properties'{} ->
APropBin = iolist_to_binary(amqp10_framing:encode_bin(AProp)),
[{?AMQP10_APP_PROPERTIES_HEADER, longstr, APropBin} | Headers2]
end,
Headers4 = case Prop of
undefined ->
Headers3;
#'v1_0.properties'{} ->
PropBin = iolist_to_binary(amqp10_framing:encode_bin(Prop)),
[{?AMQP10_PROPERTIES_HEADER, longstr, PropBin} | Headers3]
end,
Headers5 = case MAnn of
undefined ->
Headers4;
#'v1_0.message_annotations'{} ->
MAnnBin = iolist_to_binary(amqp10_framing:encode_bin(MAnn)),
[{?AMQP10_MESSAGE_ANNOTATIONS_HEADER, longstr, MAnnBin} | Headers4]
end,
Headers5;
_ ->
Headers2
end,
UserId1 = unwrap(UserId0),
%% user-id is a binary type so we need to validate

View File

@ -46,8 +46,7 @@ init(Msg = #mqtt_msg{qos = Qos,
{Msg, Anns}.
convert_from(mc_amqp, Sections, Env) ->
{Header, MsgAnns, AmqpProps, AppProps, PayloadRev,
PayloadFormatIndicator, ContentType} =
{Header, MsgAnns, AmqpProps, AppProps, PayloadRev, ContentType} =
lists:foldl(
fun(#'v1_0.header'{} = S, Acc) ->
setelement(1, Acc, S);
@ -63,28 +62,18 @@ convert_from(mc_amqp, Sections, Env) ->
setelement(5, Acc, [C | element(5, Acc)]);
(#'v1_0.amqp_value'{content = {binary, Bin}}, Acc) ->
setelement(5, Acc, [Bin]);
(#'v1_0.amqp_value'{content = C} = Val, Acc) ->
case amqp_to_utf8_string(C) of
cannot_convert ->
amqp_encode(Val, Acc);
String ->
Acc1 = setelement(5, Acc, [String]),
setelement(6, Acc1, true)
end;
(#'v1_0.amqp_sequence'{} = Seq, Acc) ->
amqp_encode(Seq, Acc)
end, {undefined, [], undefined, [], [], false, undefined}, Sections),
(Val, Acc)
when is_record(Val, 'v1_0.amqp_value') orelse
is_record(Val, 'v1_0.amqp_sequence') ->
amqp_encode(Val, Acc)
end, {undefined, [], undefined, [], [], undefined}, Sections),
Qos = case Header of
#'v1_0.header'{durable = true} ->
?QOS_1;
_ ->
?QOS_0
end,
Props0 = case PayloadFormatIndicator of
true -> #{'Payload-Format-Indicator' => 1};
false -> #{}
end,
Props1 = case AmqpProps of
Props0 = case AmqpProps of
#'v1_0.properties'{reply_to = {utf8, Address}} ->
MqttX = maps:get(mqtt_x, Env, ?DEFAULT_MQTT_EXCHANGE),
case Address of
@ -93,29 +82,29 @@ convert_from(mc_amqp, Sections, Env) ->
"/key/",
RoutingKey/binary>> ->
MqttTopic = rabbit_mqtt_util:amqp_to_mqtt(RoutingKey),
Props0#{'Response-Topic' => MqttTopic};
#{'Response-Topic' => MqttTopic};
_ ->
Props0
#{}
end;
_ ->
#{}
end,
Props1 = case AmqpProps of
#'v1_0.properties'{correlation_id = {_Type, _Val} = Corr} ->
Props0#{'Correlation-Data' => correlation_id(Corr)};
_ ->
Props0
end,
Props2 = case AmqpProps of
#'v1_0.properties'{correlation_id = {_Type, _Val} = Corr} ->
Props1#{'Correlation-Data' => correlation_id(Corr)};
_ ->
Props1
end,
Props3 = case ContentType of
Props2 = case ContentType of
undefined ->
case AmqpProps of
#'v1_0.properties'{content_type = {symbol, ContentType1}} ->
Props2#{'Content-Type' => rabbit_data_coercion:to_binary(ContentType1)};
Props1#{'Content-Type' => rabbit_data_coercion:to_binary(ContentType1)};
_ ->
Props2
Props1
end;
_ ->
Props2#{'Content-Type' => ContentType}
Props1#{'Content-Type' => ContentType}
end,
UserProp0 = lists:filtermap(fun({{symbol, <<"x-", _/binary>> = Key}, Val}) ->
filter_map_amqp_to_utf8_string(Key, Val);
@ -127,8 +116,8 @@ convert_from(mc_amqp, Sections, Env) ->
filter_map_amqp_to_utf8_string(Key, Val)
end, AppProps),
Props = case UserProp0 ++ UserProp1 of
[] -> Props3;
UserProp -> Props3#{'User-Property' => UserProp}
[] -> Props2;
UserProp -> Props2#{'User-Property' => UserProp}
end,
Payload = lists:flatten(lists:reverse(PayloadRev)),
#mqtt_msg{retain = false,
@ -199,15 +188,7 @@ convert_to(?MODULE, Msg, _Env) ->
convert_to(mc_amqp, #mqtt_msg{qos = Qos,
props = Props,
payload = Payload}, Env) ->
Body = case Props of
#{'Payload-Format-Indicator' := 1}
when is_binary(Payload) ->
#'v1_0.amqp_value'{content = {utf8, Payload}};
_ ->
#'v1_0.data'{content = Payload}
end,
S0 = [Body],
S0 = [#'v1_0.data'{content = Payload}],
%% x- prefixed MQTT User Properties go into Message Annotations.
%% All other MQTT User Properties go into Application Properties.
%% MQTT User Property allows duplicate keys, while AMQP maps don't.
@ -415,7 +396,8 @@ routing_headers(#mqtt_msg{}, _Opts) ->
#{}.
protocol_state(Msg = #mqtt_msg{props = Props0,
topic = Topic}, Anns) ->
topic = Topic,
qos = Qos0}, Anns) ->
%% Remove any PUBLISH or Will Properties that are not forwarded unaltered.
Props1 = maps:remove('Message-Expiry-Interval', Props0),
{WillDelay, Props2} = case maps:take('Will-Delay-Interval', Props1) of
@ -456,8 +438,16 @@ protocol_state(Msg = #mqtt_msg{props = Props0,
%% We rely on the mc annotation to tell whether the message is durable because if
%% the message was originally sent with AMQP, the AMQP header isn't stored on disk.
Qos = case Anns of
#{?ANN_DURABLE := false} -> ?QOS_0;
_ -> ?QOS_1
#{?ANN_DURABLE := false} ->
?QOS_0;
#{?ANN_DURABLE := true} ->
?QOS_1;
_ ->
%% If the mc durable annotation isn't set, the message might be durable
%% or not depending on whether the message was sent before or after
%% https://github.com/rabbitmq/rabbitmq-server/pull/11012 (3.13.2)
%% Hence, we rely on the QoS from the mqtt_msg.
Qos0
end,
Msg#mqtt_msg{qos = Qos,
topic = rabbit_mqtt_util:amqp_to_mqtt(RoutingKey),
@ -559,7 +549,7 @@ amqp_to_utf8_string({T, _Val})
amqp_encode(Data, Acc0) ->
Bin = amqp10_framing:encode_bin(Data),
Acc = setelement(5, Acc0, [Bin | element(5, Acc0)]),
setelement(7, Acc, ?CONTENT_TYPE_AMQP).
setelement(6, Acc, ?CONTENT_TYPE_AMQP).
durable(?QOS_0) -> false;
durable(?QOS_1) -> true.

View File

@ -19,15 +19,11 @@ groups() ->
[
{tests, [shuffle],
[roundtrip_amqp,
roundtrip_amqp_payload_format_indicator,
roundtrip_amqp_response_topic,
roundtrip_amqpl,
roundtrip_amqpl_correlation,
amqp_to_mqtt_amqp_value_section_binary,
amqp_to_mqtt_amqp_value_section_list,
amqp_to_mqtt_amqp_value_section_null,
amqp_to_mqtt_amqp_value_section_int,
amqp_to_mqtt_amqp_value_section_boolean,
roundtrip_amqp_user_property,
roundtrip_amqpl_user_property,
roundtrip_amqp_content_type,
@ -108,18 +104,6 @@ roundtrip_amqp(_Config) ->
%% We expect order to be maintained.
?assertMatch(#{'User-Property' := ExpectedUserProperty}, Props).
%% The indicator that the Payload is UTF-8 encoded should not be lost when translating
%% from MQTT 5.0 to AMQP 1.0 or vice versa.
roundtrip_amqp_payload_format_indicator(_Config) ->
Msg0 = mqtt_msg(),
Msg = Msg0#mqtt_msg{payload = <<"🐇"/utf8>>,
props = #{'Payload-Format-Indicator' => 1}},
#mqtt_msg{payload = Payload,
props = Props} = roundtrip(mc_amqp, Msg),
?assertEqual(unicode:characters_to_binary("🐇"),
iolist_to_binary(Payload)),
?assertMatch(#{'Payload-Format-Indicator' := 1}, Props).
roundtrip_amqp_response_topic(_Config) ->
Topic = <<"/rabbit/🐇"/utf8>>,
Msg0 = mqtt_msg(),
@ -199,33 +183,6 @@ amqp_to_mqtt_amqp_value_section_list(_Config) ->
?assertEqual(#{'Content-Type' => <<"message/vnd.rabbitmq.amqp">>}, Props),
?assert(iolist_size(Payload) > 0).
amqp_to_mqtt_amqp_value_section_null(_Config) ->
Val = amqp_value(null),
#mqtt_msg{props = Props,
payload = Payload} = amqp_to_mqtt([Val]),
?assertEqual(#{'Payload-Format-Indicator' => 1}, Props),
?assertEqual(0, iolist_size(Payload)).
amqp_to_mqtt_amqp_value_section_int(_Config) ->
Val = amqp_value({int, -3}),
#mqtt_msg{props = Props,
payload = Payload} = amqp_to_mqtt([Val]),
?assertEqual(#{'Payload-Format-Indicator' => 1}, Props),
?assertEqual(<<"-3">>, iolist_to_binary(Payload)).
amqp_to_mqtt_amqp_value_section_boolean(_Config) ->
Val1 = amqp_value(true),
#mqtt_msg{props = Props1,
payload = Payload1} = amqp_to_mqtt([Val1]),
?assertEqual(#{'Payload-Format-Indicator' => 1}, Props1),
?assertEqual(<<"true">>, iolist_to_binary(Payload1)),
Val2 = amqp_value(false),
#mqtt_msg{props = Props2,
payload = Payload2} = amqp_to_mqtt([Val2]),
?assertEqual(#{'Payload-Format-Indicator' => 1}, Props2),
?assertEqual(<<"false">>, iolist_to_binary(Payload2)).
%% When converting from MQTT 5.0 to AMQP 1.0, we expect to lose some User Property.
roundtrip_amqp_user_property(_Config) ->
Msg0 = mqtt_msg(),
@ -287,10 +244,10 @@ amqp_to_mqtt_reply_to(_Config) ->
amqp_to_mqtt_footer(_Config) ->
Val = amqp_value({utf8, <<"hey">>}),
Body = <<"hey">>,
Footer = #'v1_0.footer'{content = [{{symbol, <<"key">>}, {utf8, <<"value">>}}]},
%% We can translate, but lose the footer.
#mqtt_msg{payload = Payload} = amqp_to_mqtt([Val, Footer]),
#mqtt_msg{payload = Payload} = amqp_to_mqtt([#'v1_0.data'{content = Body}, Footer]),
?assertEqual(<<"hey">>, iolist_to_binary(Payload)).
mqtt_amqpl(_Config) ->
@ -391,7 +348,6 @@ mqtt_amqp_alt(_Config) ->
Mqtt0 = mqtt_msg(),
Mqtt = Mqtt0#mqtt_msg{qos = 0,
props = #{'Content-Type' => <<"text/plain">>,
'Payload-Format-Indicator' => 1,
'User-Property' =>
[{<<"key-2">>, <<"val-2">>},
{<<"key-1">>, <<"val-1">>},
@ -418,7 +374,7 @@ mqtt_amqp_alt(_Config) ->
correlation_id = {uuid, _}}, P),
?assertEqual({utf8, <<"val-1">>}, amqp_map_get(utf8(<<"key-1">>), AP)),
?assertEqual({utf8, <<"val-2">>}, amqp_map_get(utf8(<<"key-2">>), AP)),
?assertMatch(#'v1_0.amqp_value'{content = {utf8, _}}, D),
?assertEqual(#'v1_0.data'{content = <<>>}, D),
ok.
amqp_mqtt(_Config) ->

View File

@ -196,8 +196,7 @@ mqtt_amqp_mqtt(Config) ->
#{'Content-Type' => ContentType,
'Correlation-Data' => Correlation,
'Response-Topic' => MqttResponseTopic,
'User-Property' => UserProperty,
'Payload-Format-Indicator' => 1},
'User-Property' => UserProperty},
RequestPayload, [{qos, 1}]),
{ok, Msg1} = amqp10_client:get_msg(Receiver),
@ -225,8 +224,7 @@ mqtt_amqp_mqtt(Config) ->
reply_to := ReplyToAddress} = amqp10_msg:properties(Msg1),
?assertEqual(<<"/exchange/amq.topic/key/response.topic">>, ReplyToAddress),
%% Thanks to the 'Payload-Format-Indicator', we get a single utf8 value.
?assertEqual(#'v1_0.amqp_value'{content = {utf8, RequestPayload}}, amqp10_msg:body(Msg1)),
?assertEqual(RequestPayload, amqp10_msg:body_bin(Msg1)),
ok = amqp10_client:settle_msg(Receiver, Msg1, accepted),
ok = amqp10_client:detach_link(Receiver),
@ -245,7 +243,7 @@ mqtt_amqp_mqtt(Config) ->
DTag = <<"my-dtag">>,
ReplyPayload = <<"my response">>,
Msg2a = amqp10_msg:new(DTag, #'v1_0.amqp_value'{content = {utf8, ReplyPayload}}),
Msg2a = amqp10_msg:new(DTag, #'v1_0.data'{content = ReplyPayload}),
Msg2b = amqp10_msg:set_properties(
#{correlation_id => Correlation,
content_type => ContentType},
@ -271,9 +269,8 @@ mqtt_amqp_mqtt(Config) ->
payload := ReplyPayload,
properties := #{'Content-Type' := ContentType,
'Correlation-Data' := Correlation,
'Subscription-Identifier' := 999,
%% since the AMQP 1.0 client sent UTF-8
'Payload-Format-Indicator' := 1}},
'Subscription-Identifier' := 999}
},
MqttMsg)
after 1000 -> ct:fail("did not receive reply")
end,
@ -314,21 +311,29 @@ amqp_mqtt_amqp(Config) ->
amqp10_msg:new(<<>>, RequestBody, true))),
ok = amqp10_client:send_msg(Sender, Msg1),
RespTopic = receive {publish, MqttMsg} ->
ct:pal("Received MQTT message:~n~p", [MqttMsg]),
#{client_pid := C,
qos := 1,
topic := <<"t/1">>,
payload := RequestBody,
properties := #{'Correlation-Data' := Correlation,
'Response-Topic' := ResponseTopic}} = MqttMsg,
ResponseTopic
after 2000 -> ct:fail("did not receive request")
end,
ResponseTopic = <<"t/2">>,
receive {publish, MqttMsg} ->
ct:pal("Received MQTT message:~n~p", [MqttMsg]),
#{client_pid := C,
qos := 1,
topic := <<"t/1">>,
payload := RequestBody,
properties := Props = #{'Correlation-Data' := Correlation}
} = MqttMsg,
case rabbit_ct_broker_helpers:is_feature_flag_enabled(
Config, message_containers_store_amqp_v1) of
true ->
?assertEqual({ok, ResponseTopic},
maps:find('Response-Topic', Props));
false ->
ok
end
after 2000 -> ct:fail("did not receive request")
end,
%% MQTT 5.0 to AMQP 1.0
RespBody = <<"my response">>,
{ok, _} = emqtt:publish(C, RespTopic,
{ok, _} = emqtt:publish(C, ResponseTopic,
#{'Correlation-Data' => Correlation},
RespBody, [{qos, 1}]),
@ -459,8 +464,7 @@ mqtt_stream(Config) ->
#{'Content-Type' => ContentType,
'Correlation-Data' => Correlation,
'Response-Topic' => <<"response/topic">>,
'User-Property' => UserProperty,
'Payload-Format-Indicator' => 1},
'User-Property' => UserProperty},
Payload, [{qos, 1}]),
ok = emqtt:disconnect(C),
@ -532,10 +536,7 @@ mqtt_stream(Config) ->
?assertEqual(#{<<"rabbit🐇"/utf8>> => <<"carrot🥕"/utf8>>,
<<"key">> => <<"val">>},
amqp10_msg:application_properties(Msg)),
%% We excpet the body to be a single AMQP 1.0 value section where the value is a string
%% because we set the MQTT 5.0 Payload-Format-Indicator.
?assertEqual({'v1_0.amqp_value', {utf8, Payload}},
amqp10_msg:body(Msg)).
?assertEqual(Payload, amqp10_msg:body_bin(Msg)).
%% -------------------------------------------------------------------
%% Helpers