Nack rejected messages to MQTT 5.0 client

since MQTT 5.0 supports negative acknowledgements thanks to reason codes
in the PUBACK packet.

We could either choose reason code 128 or 131. The description code for
131 applies for rejected messages, hence this commit uses 131:
> The PUBLISH is valid but the receiver is not willing to accept it.
This commit is contained in:
David Ansari 2023-08-09 13:13:19 +02:00
parent d7c700a988
commit 2a4301e12d
8 changed files with 207 additions and 107 deletions

View File

@ -144,9 +144,3 @@ next_smallest(S, U) when is_map_key(S, U) ->
next_smallest(S, U) ->
%% TODO: this is potentially infinitely recursive if called incorrectly
next_smallest(S+1, U).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.

View File

@ -1,11 +1,6 @@
-module(rabbit_confirms_SUITE).
-compile(export_all).
-export([
]).
-include_lib("common_test/include/ct.hrl").
-compile([export_all, nowarn_export_all]).
-include_lib("eunit/include/eunit.hrl").
%%%===================================================================
@ -17,40 +12,13 @@ all() ->
{group, tests}
].
all_tests() ->
[
confirm,
reject,
remove_queue
].
groups() ->
[
{tests, [], all_tests()}
].
init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
ok.
init_per_group(_Group, Config) ->
Config.
end_per_group(_Group, _Config) ->
ok.
init_per_testcase(_TestCase, Config) ->
Config.
end_per_testcase(_TestCase, _Config) ->
ok.
%%%===================================================================
%%% Test cases
%%%===================================================================
{tests, [shuffle],
[confirm,
reject,
remove_queue
]}].
confirm(_Config) ->
XName = rabbit_misc:r(<<"/">>, exchange, <<"X">>),
@ -93,7 +61,6 @@ confirm(_Config) ->
?assertEqual(0, rabbit_confirms:size(U7)),
?assertEqual(undefined, rabbit_confirms:smallest(U7)),
U8 = rabbit_confirms:insert(2, [QName], XName, U1),
{[{1, XName}, {2, XName}], _U9} = rabbit_confirms:confirm([1, 2], QName, U8),
ok.
@ -126,7 +93,6 @@ reject(_Config) ->
{error, not_found} = rabbit_confirms:reject(2, U5),
?assertEqual(1, rabbit_confirms:size(U5)),
?assertEqual(1, rabbit_confirms:smallest(U5)),
ok.
remove_queue(_Config) ->
@ -147,8 +113,4 @@ remove_queue(_Config) ->
U5 = rabbit_confirms:insert(1, [QName], XName, U0),
U6 = rabbit_confirms:insert(2, [QName], XName, U5),
{[{1, XName}, {2, XName}], _U} = rabbit_confirms:remove_queue(QName, U6),
ok.
%% Utility

View File

@ -281,6 +281,14 @@ rabbitmq_suite(
],
)
rabbitmq_suite(
name = "rabbit_mqtt_confirms_SUITE",
size = "small",
deps = [
"//deps/rabbit_common:erlang_app",
],
)
assert_suites()
alias(

View File

@ -331,3 +331,11 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app"],
)
erlang_bytecode(
name = "rabbit_mqtt_confirms_SUITE_beam_files",
testonly = True,
srcs = ["test/rabbit_mqtt_confirms_SUITE.erl"],
outs = ["test/rabbit_mqtt_confirms_SUITE.beam"],
app_name = "rabbitmq_mqtt",
erlc_opts = "//:test_erlc_opts",
)

View File

@ -49,22 +49,21 @@ insert(PktId, QNames, State)
-spec confirm([packet_id()], queue_name(), state()) ->
{[packet_id()], state()}.
confirm(PktIds, QName, State0) ->
{L0, State} = lists:foldl(fun(PktId, Acc) ->
confirm_one(PktId, QName, Acc)
end, {[], State0}, PktIds),
L = lists:reverse(L0),
{L, State}.
lists:foldl(fun(PktId, Acc) ->
confirm_one(PktId, QName, Acc)
end, {[], State0}, PktIds).
-spec reject(packet_id(), state()) ->
{ok, state()} | {error, not_found}.
reject(PktId, State0)
when is_integer(PktId) ->
case maps:take(PktId, State0) of
{_, State} ->
{ok, State};
error ->
{error, not_found}
end.
-spec reject([packet_id()], state()) ->
{[packet_id()], state()}.
reject(PktIds, State0) ->
lists:foldl(fun(PktId, Acc = {Rejs, S0}) ->
case maps:take(PktId, S0) of
{_, S} ->
{[PktId | Rejs], S};
error ->
Acc
end
end, {[], State0}, PktIds).
%% idempotent
-spec remove_queue(queue_name(), state()) ->
@ -77,7 +76,7 @@ remove_queue(QName, State) ->
(_, _, PktIds) ->
PktIds
end, [], State),
confirm(lists:sort(PktIds), QName, State).
confirm(PktIds, QName, State).
%% INTERNAL

View File

@ -1686,17 +1686,15 @@ process_routing_confirm(#delivery{confirm = true,
U = rabbit_mqtt_confirms:insert(PktId, QNames, U0),
State#state{unacked_client_pubs = U}.
-spec send_puback(list(packet_id()), state()) -> ok.
send_puback(PktIds0, State)
-spec send_puback(packet_id() | list(packet_id()), reason_code(), state()) -> ok.
send_puback(PktIds0, ReasonCode, State)
when is_list(PktIds0) ->
%% Classic queues confirm messages unordered.
%% Let's sort them here assuming most MQTT clients send with an increasing packet identifier.
PktIds = lists:usort(PktIds0),
lists:foreach(fun(Id) ->
send_puback(Id, ?RC_SUCCESS, State)
end, PktIds).
-spec send_puback(packet_id(), reason_code(), state()) -> ok.
send_puback(Id, ReasonCode, State)
end, PktIds);
send_puback(PktId, ReasonCode, State = #state{cfg = #cfg{proto_ver = ProtoVer}}) ->
rabbit_global_counters:messages_confirmed(ProtoVer, 1),
Packet = #mqtt_packet{fixed = #mqtt_packet_fixed{type = ?PUBACK},
@ -1971,7 +1969,7 @@ handle_down({{'DOWN', QName}, _MRef, process, QPid, Reason},
QStates = rabbit_queue_type:remove(QRef, QStates1),
State = State0#state{queue_states = QStates,
unacked_client_pubs = U},
send_puback(ConfirmPktIds, State),
send_puback(ConfirmPktIds, ?RC_SUCCESS, State),
{ok, State}
end.
@ -2001,7 +1999,7 @@ handle_queue_event({queue_event, QName, Evt},
QStates = rabbit_queue_type:remove(QName, QStates0),
State = State1#state{queue_states = QStates,
unacked_client_pubs = U},
send_puback(ConfirmPktIds, State),
send_puback(ConfirmPktIds, ?RC_SUCCESS, State),
{ok, State};
{protocol_error, _Type, _Reason, _ReasonArgs} = Error ->
{error, Error, State0}
@ -2013,19 +2011,21 @@ handle_queue_actions(Actions, #state{} = State0) ->
deliver_to_client(Msgs, Ack, S);
({settled, QName, PktIds}, S = #state{unacked_client_pubs = U0}) ->
{ConfirmPktIds, U} = rabbit_mqtt_confirms:confirm(PktIds, QName, U0),
send_puback(ConfirmPktIds, S),
S#state{unacked_client_pubs = U};
({rejected, _QName, PktIds}, S = #state{unacked_client_pubs = U0}) ->
%% Negative acks are supported in MQTT v5 only.
%% Therefore, in MQTT v3 and v4 we ignore rejected messages.
U = lists:foldl(
fun(PktId, Acc0) ->
case rabbit_mqtt_confirms:reject(PktId, Acc0) of
{ok, Acc} -> Acc;
{error, not_found} -> Acc0
end
end, U0, PktIds),
send_puback(ConfirmPktIds, ?RC_SUCCESS, S),
S#state{unacked_client_pubs = U};
({rejected, _QName, PktIds}, S0 = #state{unacked_client_pubs = U0,
cfg = #cfg{proto_ver = ProtoVer}}) ->
{RejectPktIds, U} = rabbit_mqtt_confirms:reject(PktIds, U0),
S = S0#state{unacked_client_pubs = U},
%% Negative acks are supported only in MQTT v5. In MQTT v3 and v4 we ignore
%% rejected messages since we can only (but must not) send a positive ack.
case ProtoVer of
?MQTT_PROTO_V5 ->
send_puback(RejectPktIds, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, S);
_ ->
ok
end,
S;
({block, QName}, S = #state{queues_soft_limit_exceeded = QSLE}) ->
S#state{queues_soft_limit_exceeded = sets:add_element(QName, QSLE)};
({unblock, QName}, S = #state{queues_soft_limit_exceeded = QSLE}) ->

View File

@ -0,0 +1,109 @@
-module(rabbit_mqtt_confirms_SUITE).
-compile([export_all, nowarn_export_all]).
-include_lib("eunit/include/eunit.hrl").
%%%===================================================================
%%% Common Test callbacks
%%%===================================================================
all() ->
[
{group, tests}
].
groups() ->
[
{tests, [shuffle],
[confirm,
reject,
remove_queue
]}].
-define(MOD, rabbit_mqtt_confirms).
confirm(_Config) ->
QName = rabbit_misc:r(<<"/">>, queue, <<"Q">>),
QName2 = rabbit_misc:r(<<"/">>, queue, <<"Q2">>),
U0 = ?MOD:init(),
?assertEqual(0, ?MOD:size(U0)),
U1 = ?MOD:insert(1, [QName], U0),
?assertEqual(1, ?MOD:size(U1)),
?assert(?MOD:contains(1, U1)),
{[1], U2} = ?MOD:confirm([1], QName, U1),
?assertEqual(0, ?MOD:size(U2)),
?assertNot(?MOD:contains(1, U2)),
U3 = ?MOD:insert(2, [QName], U1),
?assertEqual(2, ?MOD:size(U3)),
?assert(?MOD:contains(1, U3)),
?assert(?MOD:contains(2, U3)),
{[1], U4} = ?MOD:confirm([1], QName, U3),
?assertEqual(1, ?MOD:size(U4)),
?assertNot(?MOD:contains(1, U4)),
?assert(?MOD:contains(2, U4)),
U5 = ?MOD:insert(2, [QName, QName2], U1),
?assertEqual(2, ?MOD:size(U5)),
?assert(?MOD:contains(1, U5)),
?assert(?MOD:contains(2, U5)),
{[1], U6} = ?MOD:confirm([1, 2], QName, U5),
{[2], U7} = ?MOD:confirm([2], QName2, U6),
?assertEqual(0, ?MOD:size(U7)),
U8 = ?MOD:insert(2, [QName], U1),
{Confirmed, _U9} = ?MOD:confirm([1, 2], QName, U8),
?assertEqual([1, 2], lists:sort(Confirmed)),
ok.
reject(_Config) ->
QName = rabbit_misc:r(<<"/">>, queue, <<"Q">>),
QName2 = rabbit_misc:r(<<"/">>, queue, <<"Q2">>),
U0 = ?MOD:init(),
?assertEqual(0, ?MOD:size(U0)),
U1 = ?MOD:insert(1, [QName], U0),
?assert(?MOD:contains(1, U1)),
{[1], U2} = ?MOD:reject([1], U1),
{[], U2} = ?MOD:reject([1], U2),
?assertEqual(0, ?MOD:size(U2)),
?assertNot(?MOD:contains(1, U2)),
U3 = ?MOD:insert(2, [QName, QName2], U1),
{[1], U4} = ?MOD:reject([1], U3),
{[], U4} = ?MOD:reject([1], U4),
?assertEqual(1, ?MOD:size(U4)),
{[2], U5} = ?MOD:reject([2], U3),
{[], U5} = ?MOD:reject([2], U5),
?assertEqual(1, ?MOD:size(U5)),
?assert(?MOD:contains(1, U5)),
ok.
remove_queue(_Config) ->
QName = rabbit_misc:r(<<"/">>, queue, <<"Q">>),
QName2 = rabbit_misc:r(<<"/">>, queue, <<"Q2">>),
U0 = ?MOD:init(),
U1 = ?MOD:insert(1, [QName, QName2], U0),
U2 = ?MOD:insert(2, [QName2], U1),
{[2], U3} = ?MOD:remove_queue(QName2, U2),
?assertEqual(1, ?MOD:size(U3)),
?assert(?MOD:contains(1, U3)),
{[1], U4} = ?MOD:remove_queue(QName, U3),
?assertEqual(0, ?MOD:size(U4)),
?assertNot(?MOD:contains(1, U4)),
U5 = ?MOD:insert(1, [QName], U0),
U6 = ?MOD:insert(2, [QName], U5),
{Confirmed, U7} = ?MOD:remove_queue(QName, U6),
?assertEqual([1, 2], lists:sort(Confirmed)),
?assertEqual(0, ?MOD:size(U7)),
ok.

View File

@ -340,19 +340,34 @@ quorum_queue_rejects(Config) ->
bind(Ch, Name, Name),
C = connect(Name, Config, [{retry_interval, 1}]),
{ok, _} = emqtt:publish(C, Name, <<"m1">>, qos1),
{ok, _} = emqtt:publish(C, Name, <<"m2">>, qos1),
%% We expect m3 to be rejected and dropped.
?assertEqual(puback_timeout, util:publish_qos1_timeout(C, Name, <<"m3">>, 700)),
{ok, #{reason_code_name := success}} = emqtt:publish(C, Name, <<"m1">>, qos1),
{ok, #{reason_code_name := success}} = emqtt:publish(C, Name, <<"m2">>, qos1),
%% The queue will reject m3.
V = ?config(mqtt_version, Config),
if V =:= v3 orelse V =:= v4 ->
%% v3 and v4 do not support NACKs. Therefore, the server should drop the message.
?assertEqual(puback_timeout, util:publish_qos1_timeout(C, Name, <<"m3">>, 700));
V =:= v5 ->
%% v5 supports NACKs. Therefore, the server should send us a NACK.
?assertMatch({ok, #{reason_code_name := implementation_specific_error}},
emqtt:publish(C, Name, <<"m3">>, qos1))
end,
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}},
amqp_channel:call(Ch, #'basic.get'{queue = Name, no_ack = true})),
amqp_channel:call(Ch, #'basic.get'{queue = Name})),
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}},
amqp_channel:call(Ch, #'basic.get'{queue = Name, no_ack = true})),
%% m3 is re-sent by emqtt.
?awaitMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m3">>}},
amqp_channel:call(Ch, #'basic.get'{queue = Name, no_ack = true}),
2000, 200),
amqp_channel:call(Ch, #'basic.get'{queue = Name})),
if V =:= v3 orelse V =:= v4 ->
%% m3 is re-sent by emqtt since we didn't receive a PUBACK.
?awaitMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m3">>}},
amqp_channel:call(Ch, #'basic.get'{queue = Name}),
2000, 200);
V =:= v5 ->
%% m3 should not be re-sent by emqtt since we received a PUBACK.
?assertMatch(#'basic.get_empty'{},
amqp_channel:call(Ch, #'basic.get'{queue = Name}))
end,
ok = emqtt:disconnect(C),
delete_queue(Ch, Name),
@ -667,17 +682,22 @@ queue_down_qos1(Config) ->
bind(Ch1, CQ, Topic),
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn1, Ch1),
ok = rabbit_ct_broker_helpers:stop_node(Config, 1),
C = connect(?FUNCTION_NAME, Config, [{retry_interval, 2}]),
%% classic queue is down, therefore message is rejected
?assertEqual(puback_timeout, util:publish_qos1_timeout(C, Topic, <<"msg">>, 500)),
ok = rabbit_ct_broker_helpers:start_node(Config, 1),
%% classic queue is up, therefore message should arrive
eventually(?_assertEqual([[<<"1">>]],
rabbitmqctl_list(Config, 1, ["list_queues", "messages", "--no-table-headers"])),
500, 20),
V = ?config(mqtt_version, Config),
if V =:= v3 orelse V =:= v4 ->
?assertEqual(puback_timeout, util:publish_qos1_timeout(C, Topic, <<"msg">>, 500)),
ok = rabbit_ct_broker_helpers:start_node(Config, 1),
%% Classic queue is up. Therefore, message should arrive.
eventually(?_assertEqual([[<<"1">>]],
rabbitmqctl_list(Config, 1, ["list_queues", "messages", "--no-table-headers"])),
500, 20);
V =:= v5 ->
?assertMatch({ok, #{reason_code_name := implementation_specific_error}},
emqtt:publish(C, Topic, <<"msg">>, qos1)),
ok = rabbit_ct_broker_helpers:start_node(Config, 1)
end,
Ch0 = rabbit_ct_client_helpers:open_channel(Config, 0),
delete_queue(Ch0, CQ),
@ -1454,7 +1474,7 @@ trace(Config) ->
{#'basic.get_ok'{routing_key = <<"publish.amq.topic">>},
#amqp_msg{props = #'P_basic'{headers = PublishHeaders},
payload = Payload}} =
amqp_channel:call(Ch, #'basic.get'{queue = TraceQ, no_ack = false}),
amqp_channel:call(Ch, #'basic.get'{queue = TraceQ}),
?assertMatch(#{<<"exchange_name">> := <<"amq.topic">>,
<<"routing_keys">> := [Topic],
<<"connection">> := <<"127.0.0.1:", _/binary>>,
@ -1470,7 +1490,7 @@ trace(Config) ->
{#'basic.get_ok'{routing_key = <<"deliver.mqtt-subscription-trace_subscriberqos0">>},
#amqp_msg{props = #'P_basic'{headers = DeliverHeaders},
payload = Payload}} =
amqp_channel:call(Ch, #'basic.get'{queue = TraceQ, no_ack = false}),
amqp_channel:call(Ch, #'basic.get'{queue = TraceQ}),
?assertMatch(#{<<"exchange_name">> := <<"amq.topic">>,
<<"routing_keys">> := [Topic],
<<"connection">> := <<"127.0.0.1:", _/binary>>,
@ -1487,7 +1507,7 @@ trace(Config) ->
{ok, _} = emqtt:publish(Pub, Topic, Payload, qos1),
ok = expect_publishes(Sub, Topic, [Payload]),
?assertMatch(#'basic.get_empty'{},
amqp_channel:call(Ch, #'basic.get'{queue = TraceQ, no_ack = false})),
amqp_channel:call(Ch, #'basic.get'{queue = TraceQ})),
delete_queue(Ch, TraceQ),
[ok = emqtt:disconnect(C) || C <- [Pub, Sub]].