Merge pull request #9034 from rabbitmq/mqtt-nack

Nack rejected messages to MQTT 5.0 client
This commit is contained in:
Michael Klishin 2023-08-09 19:53:19 +04:00 committed by GitHub
commit 17c9acf8ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 207 additions and 107 deletions

View File

@ -144,9 +144,3 @@ next_smallest(S, U) when is_map_key(S, U) ->
next_smallest(S, U) ->
%% TODO: this is potentially infinitely recursive if called incorrectly
next_smallest(S+1, U).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.

View File

@ -1,11 +1,6 @@
-module(rabbit_confirms_SUITE).
-compile(export_all).
-export([
]).
-include_lib("common_test/include/ct.hrl").
-compile([export_all, nowarn_export_all]).
-include_lib("eunit/include/eunit.hrl").
%%%===================================================================
@ -17,40 +12,13 @@ all() ->
{group, tests}
].
all_tests() ->
[
confirm,
reject,
remove_queue
].
groups() ->
[
{tests, [], all_tests()}
].
init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
ok.
init_per_group(_Group, Config) ->
Config.
end_per_group(_Group, _Config) ->
ok.
init_per_testcase(_TestCase, Config) ->
Config.
end_per_testcase(_TestCase, _Config) ->
ok.
%%%===================================================================
%%% Test cases
%%%===================================================================
{tests, [shuffle],
[confirm,
reject,
remove_queue
]}].
confirm(_Config) ->
XName = rabbit_misc:r(<<"/">>, exchange, <<"X">>),
@ -93,7 +61,6 @@ confirm(_Config) ->
?assertEqual(0, rabbit_confirms:size(U7)),
?assertEqual(undefined, rabbit_confirms:smallest(U7)),
U8 = rabbit_confirms:insert(2, [QName], XName, U1),
{[{1, XName}, {2, XName}], _U9} = rabbit_confirms:confirm([1, 2], QName, U8),
ok.
@ -126,7 +93,6 @@ reject(_Config) ->
{error, not_found} = rabbit_confirms:reject(2, U5),
?assertEqual(1, rabbit_confirms:size(U5)),
?assertEqual(1, rabbit_confirms:smallest(U5)),
ok.
remove_queue(_Config) ->
@ -147,8 +113,4 @@ remove_queue(_Config) ->
U5 = rabbit_confirms:insert(1, [QName], XName, U0),
U6 = rabbit_confirms:insert(2, [QName], XName, U5),
{[{1, XName}, {2, XName}], _U} = rabbit_confirms:remove_queue(QName, U6),
ok.
%% Utility

View File

@ -281,6 +281,14 @@ rabbitmq_suite(
],
)
rabbitmq_suite(
name = "rabbit_mqtt_confirms_SUITE",
size = "small",
deps = [
"//deps/rabbit_common:erlang_app",
],
)
assert_suites()
alias(

View File

@ -331,3 +331,11 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app"],
)
erlang_bytecode(
name = "rabbit_mqtt_confirms_SUITE_beam_files",
testonly = True,
srcs = ["test/rabbit_mqtt_confirms_SUITE.erl"],
outs = ["test/rabbit_mqtt_confirms_SUITE.beam"],
app_name = "rabbitmq_mqtt",
erlc_opts = "//:test_erlc_opts",
)

View File

@ -49,22 +49,21 @@ insert(PktId, QNames, State)
-spec confirm([packet_id()], queue_name(), state()) ->
{[packet_id()], state()}.
confirm(PktIds, QName, State0) ->
{L0, State} = lists:foldl(fun(PktId, Acc) ->
lists:foldl(fun(PktId, Acc) ->
confirm_one(PktId, QName, Acc)
end, {[], State0}, PktIds),
L = lists:reverse(L0),
{L, State}.
end, {[], State0}, PktIds).
-spec reject(packet_id(), state()) ->
{ok, state()} | {error, not_found}.
reject(PktId, State0)
when is_integer(PktId) ->
case maps:take(PktId, State0) of
{_, State} ->
{ok, State};
-spec reject([packet_id()], state()) ->
{[packet_id()], state()}.
reject(PktIds, State0) ->
lists:foldl(fun(PktId, Acc = {Rejs, S0}) ->
case maps:take(PktId, S0) of
{_, S} ->
{[PktId | Rejs], S};
error ->
{error, not_found}
end.
Acc
end
end, {[], State0}, PktIds).
%% idempotent
-spec remove_queue(queue_name(), state()) ->
@ -77,7 +76,7 @@ remove_queue(QName, State) ->
(_, _, PktIds) ->
PktIds
end, [], State),
confirm(lists:sort(PktIds), QName, State).
confirm(PktIds, QName, State).
%% INTERNAL

View File

@ -1686,17 +1686,15 @@ process_routing_confirm(#delivery{confirm = true,
U = rabbit_mqtt_confirms:insert(PktId, QNames, U0),
State#state{unacked_client_pubs = U}.
-spec send_puback(list(packet_id()), state()) -> ok.
send_puback(PktIds0, State)
-spec send_puback(packet_id() | list(packet_id()), reason_code(), state()) -> ok.
send_puback(PktIds0, ReasonCode, State)
when is_list(PktIds0) ->
%% Classic queues confirm messages unordered.
%% Let's sort them here assuming most MQTT clients send with an increasing packet identifier.
PktIds = lists:usort(PktIds0),
lists:foreach(fun(Id) ->
send_puback(Id, ?RC_SUCCESS, State)
end, PktIds).
-spec send_puback(packet_id(), reason_code(), state()) -> ok.
send_puback(Id, ReasonCode, State)
end, PktIds);
send_puback(PktId, ReasonCode, State = #state{cfg = #cfg{proto_ver = ProtoVer}}) ->
rabbit_global_counters:messages_confirmed(ProtoVer, 1),
Packet = #mqtt_packet{fixed = #mqtt_packet_fixed{type = ?PUBACK},
@ -1971,7 +1969,7 @@ handle_down({{'DOWN', QName}, _MRef, process, QPid, Reason},
QStates = rabbit_queue_type:remove(QRef, QStates1),
State = State0#state{queue_states = QStates,
unacked_client_pubs = U},
send_puback(ConfirmPktIds, State),
send_puback(ConfirmPktIds, ?RC_SUCCESS, State),
{ok, State}
end.
@ -2001,7 +1999,7 @@ handle_queue_event({queue_event, QName, Evt},
QStates = rabbit_queue_type:remove(QName, QStates0),
State = State1#state{queue_states = QStates,
unacked_client_pubs = U},
send_puback(ConfirmPktIds, State),
send_puback(ConfirmPktIds, ?RC_SUCCESS, State),
{ok, State};
{protocol_error, _Type, _Reason, _ReasonArgs} = Error ->
{error, Error, State0}
@ -2013,19 +2011,21 @@ handle_queue_actions(Actions, #state{} = State0) ->
deliver_to_client(Msgs, Ack, S);
({settled, QName, PktIds}, S = #state{unacked_client_pubs = U0}) ->
{ConfirmPktIds, U} = rabbit_mqtt_confirms:confirm(PktIds, QName, U0),
send_puback(ConfirmPktIds, S),
S#state{unacked_client_pubs = U};
({rejected, _QName, PktIds}, S = #state{unacked_client_pubs = U0}) ->
%% Negative acks are supported in MQTT v5 only.
%% Therefore, in MQTT v3 and v4 we ignore rejected messages.
U = lists:foldl(
fun(PktId, Acc0) ->
case rabbit_mqtt_confirms:reject(PktId, Acc0) of
{ok, Acc} -> Acc;
{error, not_found} -> Acc0
end
end, U0, PktIds),
send_puback(ConfirmPktIds, ?RC_SUCCESS, S),
S#state{unacked_client_pubs = U};
({rejected, _QName, PktIds}, S0 = #state{unacked_client_pubs = U0,
cfg = #cfg{proto_ver = ProtoVer}}) ->
{RejectPktIds, U} = rabbit_mqtt_confirms:reject(PktIds, U0),
S = S0#state{unacked_client_pubs = U},
%% Negative acks are supported only in MQTT v5. In MQTT v3 and v4 we ignore
%% rejected messages since we can only (but must not) send a positive ack.
case ProtoVer of
?MQTT_PROTO_V5 ->
send_puback(RejectPktIds, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, S);
_ ->
ok
end,
S;
({block, QName}, S = #state{queues_soft_limit_exceeded = QSLE}) ->
S#state{queues_soft_limit_exceeded = sets:add_element(QName, QSLE)};
({unblock, QName}, S = #state{queues_soft_limit_exceeded = QSLE}) ->

View File

@ -0,0 +1,109 @@
-module(rabbit_mqtt_confirms_SUITE).
-compile([export_all, nowarn_export_all]).
-include_lib("eunit/include/eunit.hrl").
%%%===================================================================
%%% Common Test callbacks
%%%===================================================================
all() ->
[
{group, tests}
].
groups() ->
[
{tests, [shuffle],
[confirm,
reject,
remove_queue
]}].
-define(MOD, rabbit_mqtt_confirms).
confirm(_Config) ->
QName = rabbit_misc:r(<<"/">>, queue, <<"Q">>),
QName2 = rabbit_misc:r(<<"/">>, queue, <<"Q2">>),
U0 = ?MOD:init(),
?assertEqual(0, ?MOD:size(U0)),
U1 = ?MOD:insert(1, [QName], U0),
?assertEqual(1, ?MOD:size(U1)),
?assert(?MOD:contains(1, U1)),
{[1], U2} = ?MOD:confirm([1], QName, U1),
?assertEqual(0, ?MOD:size(U2)),
?assertNot(?MOD:contains(1, U2)),
U3 = ?MOD:insert(2, [QName], U1),
?assertEqual(2, ?MOD:size(U3)),
?assert(?MOD:contains(1, U3)),
?assert(?MOD:contains(2, U3)),
{[1], U4} = ?MOD:confirm([1], QName, U3),
?assertEqual(1, ?MOD:size(U4)),
?assertNot(?MOD:contains(1, U4)),
?assert(?MOD:contains(2, U4)),
U5 = ?MOD:insert(2, [QName, QName2], U1),
?assertEqual(2, ?MOD:size(U5)),
?assert(?MOD:contains(1, U5)),
?assert(?MOD:contains(2, U5)),
{[1], U6} = ?MOD:confirm([1, 2], QName, U5),
{[2], U7} = ?MOD:confirm([2], QName2, U6),
?assertEqual(0, ?MOD:size(U7)),
U8 = ?MOD:insert(2, [QName], U1),
{Confirmed, _U9} = ?MOD:confirm([1, 2], QName, U8),
?assertEqual([1, 2], lists:sort(Confirmed)),
ok.
reject(_Config) ->
QName = rabbit_misc:r(<<"/">>, queue, <<"Q">>),
QName2 = rabbit_misc:r(<<"/">>, queue, <<"Q2">>),
U0 = ?MOD:init(),
?assertEqual(0, ?MOD:size(U0)),
U1 = ?MOD:insert(1, [QName], U0),
?assert(?MOD:contains(1, U1)),
{[1], U2} = ?MOD:reject([1], U1),
{[], U2} = ?MOD:reject([1], U2),
?assertEqual(0, ?MOD:size(U2)),
?assertNot(?MOD:contains(1, U2)),
U3 = ?MOD:insert(2, [QName, QName2], U1),
{[1], U4} = ?MOD:reject([1], U3),
{[], U4} = ?MOD:reject([1], U4),
?assertEqual(1, ?MOD:size(U4)),
{[2], U5} = ?MOD:reject([2], U3),
{[], U5} = ?MOD:reject([2], U5),
?assertEqual(1, ?MOD:size(U5)),
?assert(?MOD:contains(1, U5)),
ok.
remove_queue(_Config) ->
QName = rabbit_misc:r(<<"/">>, queue, <<"Q">>),
QName2 = rabbit_misc:r(<<"/">>, queue, <<"Q2">>),
U0 = ?MOD:init(),
U1 = ?MOD:insert(1, [QName, QName2], U0),
U2 = ?MOD:insert(2, [QName2], U1),
{[2], U3} = ?MOD:remove_queue(QName2, U2),
?assertEqual(1, ?MOD:size(U3)),
?assert(?MOD:contains(1, U3)),
{[1], U4} = ?MOD:remove_queue(QName, U3),
?assertEqual(0, ?MOD:size(U4)),
?assertNot(?MOD:contains(1, U4)),
U5 = ?MOD:insert(1, [QName], U0),
U6 = ?MOD:insert(2, [QName], U5),
{Confirmed, U7} = ?MOD:remove_queue(QName, U6),
?assertEqual([1, 2], lists:sort(Confirmed)),
?assertEqual(0, ?MOD:size(U7)),
ok.

View File

@ -340,19 +340,34 @@ quorum_queue_rejects(Config) ->
bind(Ch, Name, Name),
C = connect(Name, Config, [{retry_interval, 1}]),
{ok, _} = emqtt:publish(C, Name, <<"m1">>, qos1),
{ok, _} = emqtt:publish(C, Name, <<"m2">>, qos1),
%% We expect m3 to be rejected and dropped.
?assertEqual(puback_timeout, util:publish_qos1_timeout(C, Name, <<"m3">>, 700)),
{ok, #{reason_code_name := success}} = emqtt:publish(C, Name, <<"m1">>, qos1),
{ok, #{reason_code_name := success}} = emqtt:publish(C, Name, <<"m2">>, qos1),
%% The queue will reject m3.
V = ?config(mqtt_version, Config),
if V =:= v3 orelse V =:= v4 ->
%% v3 and v4 do not support NACKs. Therefore, the server should drop the message.
?assertEqual(puback_timeout, util:publish_qos1_timeout(C, Name, <<"m3">>, 700));
V =:= v5 ->
%% v5 supports NACKs. Therefore, the server should send us a NACK.
?assertMatch({ok, #{reason_code_name := implementation_specific_error}},
emqtt:publish(C, Name, <<"m3">>, qos1))
end,
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}},
amqp_channel:call(Ch, #'basic.get'{queue = Name, no_ack = true})),
amqp_channel:call(Ch, #'basic.get'{queue = Name})),
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}},
amqp_channel:call(Ch, #'basic.get'{queue = Name, no_ack = true})),
%% m3 is re-sent by emqtt.
amqp_channel:call(Ch, #'basic.get'{queue = Name})),
if V =:= v3 orelse V =:= v4 ->
%% m3 is re-sent by emqtt since we didn't receive a PUBACK.
?awaitMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m3">>}},
amqp_channel:call(Ch, #'basic.get'{queue = Name, no_ack = true}),
2000, 200),
amqp_channel:call(Ch, #'basic.get'{queue = Name}),
2000, 200);
V =:= v5 ->
%% m3 should not be re-sent by emqtt since we received a PUBACK.
?assertMatch(#'basic.get_empty'{},
amqp_channel:call(Ch, #'basic.get'{queue = Name}))
end,
ok = emqtt:disconnect(C),
delete_queue(Ch, Name),
@ -667,17 +682,22 @@ queue_down_qos1(Config) ->
bind(Ch1, CQ, Topic),
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn1, Ch1),
ok = rabbit_ct_broker_helpers:stop_node(Config, 1),
C = connect(?FUNCTION_NAME, Config, [{retry_interval, 2}]),
%% classic queue is down, therefore message is rejected
V = ?config(mqtt_version, Config),
if V =:= v3 orelse V =:= v4 ->
?assertEqual(puback_timeout, util:publish_qos1_timeout(C, Topic, <<"msg">>, 500)),
ok = rabbit_ct_broker_helpers:start_node(Config, 1),
%% classic queue is up, therefore message should arrive
%% Classic queue is up. Therefore, message should arrive.
eventually(?_assertEqual([[<<"1">>]],
rabbitmqctl_list(Config, 1, ["list_queues", "messages", "--no-table-headers"])),
500, 20),
500, 20);
V =:= v5 ->
?assertMatch({ok, #{reason_code_name := implementation_specific_error}},
emqtt:publish(C, Topic, <<"msg">>, qos1)),
ok = rabbit_ct_broker_helpers:start_node(Config, 1)
end,
Ch0 = rabbit_ct_client_helpers:open_channel(Config, 0),
delete_queue(Ch0, CQ),
@ -1454,7 +1474,7 @@ trace(Config) ->
{#'basic.get_ok'{routing_key = <<"publish.amq.topic">>},
#amqp_msg{props = #'P_basic'{headers = PublishHeaders},
payload = Payload}} =
amqp_channel:call(Ch, #'basic.get'{queue = TraceQ, no_ack = false}),
amqp_channel:call(Ch, #'basic.get'{queue = TraceQ}),
?assertMatch(#{<<"exchange_name">> := <<"amq.topic">>,
<<"routing_keys">> := [Topic],
<<"connection">> := <<"127.0.0.1:", _/binary>>,
@ -1470,7 +1490,7 @@ trace(Config) ->
{#'basic.get_ok'{routing_key = <<"deliver.mqtt-subscription-trace_subscriberqos0">>},
#amqp_msg{props = #'P_basic'{headers = DeliverHeaders},
payload = Payload}} =
amqp_channel:call(Ch, #'basic.get'{queue = TraceQ, no_ack = false}),
amqp_channel:call(Ch, #'basic.get'{queue = TraceQ}),
?assertMatch(#{<<"exchange_name">> := <<"amq.topic">>,
<<"routing_keys">> := [Topic],
<<"connection">> := <<"127.0.0.1:", _/binary>>,
@ -1487,7 +1507,7 @@ trace(Config) ->
{ok, _} = emqtt:publish(Pub, Topic, Payload, qos1),
ok = expect_publishes(Sub, Topic, [Payload]),
?assertMatch(#'basic.get_empty'{},
amqp_channel:call(Ch, #'basic.get'{queue = TraceQ, no_ack = false})),
amqp_channel:call(Ch, #'basic.get'{queue = TraceQ})),
delete_queue(Ch, TraceQ),
[ok = emqtt:disconnect(C) || C <- [Pub, Sub]].