Store MQTT messages as non-durable if QoS 0

By default, when the 'durable' message container (mc) annotation is unset,
messages are interpreted to be durable.

Prior to this commit, MQTT messages that were sent with QoS 0 were
stored durably in classic queues.
This commit takes the same approach for mc_mqtt as for mc_amqpl and mc_amqp:
If the message is durable, the durable mc annotation will not be set.
If the message is non-durable, the durable mc annotation will be set to false.
This commit is contained in:
David Ansari 2024-04-16 10:16:43 +02:00
parent 9f80341a30
commit e96125bfd3
3 changed files with 29 additions and 5 deletions

View File

@ -27,9 +27,9 @@ init(Msg = #mqtt_msg{qos = Qos,
when is_integer(Qos) ->
Anns0 = case Qos > 0 of
true ->
#{?ANN_DURABLE => true};
#{};
false ->
#{}
#{?ANN_DURABLE => false}
end,
Anns1 = case Props of
#{'Message-Expiry-Interval' := Seconds} ->

View File

@ -37,7 +37,8 @@ groups() ->
mqtt_amqpl_alt,
mqtt_amqp,
mqtt_amqp_alt,
amqp_mqtt
amqp_mqtt,
is_persistent
]}
].
@ -501,6 +502,19 @@ amqp_mqtt(_Config) ->
}, Mqtt),
ok.
is_persistent(_Config) ->
Msg0 = #mqtt_msg{qos = 0,
topic = <<"my/topic">>,
payload = <<>>},
Mc0 = mc:init(mc_mqtt, Msg0, #{}),
?assertNot(mc:is_persistent(Mc0)),
Msg1 = #mqtt_msg{qos = 1,
topic = <<"my/topic">>,
payload = <<>>},
Mc1 = mc:init(mc_mqtt, Msg1, #{}),
?assert(mc:is_persistent(Mc1)).
mqtt_msg() ->
#mqtt_msg{qos = 0,
topic = <<"my/topic">>,

View File

@ -23,7 +23,8 @@
-import(rabbit_ct_broker_helpers,
[rpc/4]).
-import(rabbit_ct_helpers,
[eventually/3]).
[eventually/1,
eventually/3]).
all() ->
[{group, tests}].
@ -88,7 +89,8 @@ end_per_testcase(Testcase, Config) ->
mqtt_amqpl_mqtt(Config) ->
Q = ClientId = atom_to_binary(?FUNCTION_NAME),
Ch = rabbit_ct_client_helpers:open_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = Q}),
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = Q,
durable = true}),
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = Q,
exchange = <<"amq.topic">>,
routing_key = <<"my.topic">>}),
@ -147,6 +149,14 @@ mqtt_amqpl_mqtt(Config) ->
after 1000 -> ct:fail("did not receive reply")
end,
%% Another message MQTT 5.0 to AMQP 0.9.1, this time with QoS 0
ok = emqtt:publish(C, <<"my/topic">>, RequestPayload, [{qos, 0}]),
eventually(
?_assertMatch(
{#'basic.get_ok'{}, #amqp_msg{payload = RequestPayload,
props = #'P_basic'{delivery_mode = 1}}},
amqp_channel:call(Ch, #'basic.get'{queue = Q}))),
ok = emqtt:disconnect(C).
mqtt_amqp_mqtt(Config) ->