Support QoS 1 for sending and receiving

This commit is contained in:
David Ansari 2022-09-22 19:31:37 +02:00
parent cdd253ee87
commit 23dac495ad
9 changed files with 814 additions and 751 deletions

View File

@ -7,6 +7,8 @@
-define(CLIENT_ID_MAXLEN, 23).
-include("rabbit_mqtt_types.hrl").
%% reader state
-record(state, {socket,
conn_name,
@ -26,32 +28,37 @@
received :: boolean()}).
%% processor state
-record(proc_state, {socket,
conn_name,
subscriptions,
consumer_tags,
unacked_pubs,
awaiting_ack,
awaiting_seqno,
message_id,
client_id,
clean_sess,
will_msg,
queue_states,
channels,
exchange :: rabbit_exchange:name(),
ssl_login_name,
%% Retained messages handler. See rabbit_mqtt_retainer_sup
%% and rabbit_mqtt_retainer.
retainer_pid,
auth_state,
send_fun,
peer_addr,
mqtt2amqp_fun,
amqp2mqtt_fun,
register_state,
proto_ver :: 3 | 4,
info}).
-record(proc_state,
{socket,
proto_ver :: 3 | 4,
queue_states = rabbit_queue_type:init() :: rabbit_queue_type:state(),
subscriptions = #{} :: #{Topic :: binary() => QoS :: 0..2},
%% Packet IDs published to queues but not yet confirmed.
unacked_client_pubs = rabbit_mqtt_confirms:init() :: rabbit_mqtt_confirms:state(),
%% Packet IDs published to MQTT subscribers but not yet acknowledged.
unacked_server_pubs = #{} :: #{packet_id() => QueueMsgId :: non_neg_integer()},
%% Packet ID of next PUBLISH packet (with QoS > 0) sent from server to client.
%% (Not to be confused with packet IDs sent from client to server which can be the
%% same IDs because client and server assign IDs independently of each other.)
packet_id = 1 :: packet_id(),
client_id,
clean_sess,
will_msg,
exchange :: rabbit_exchange:name(),
ssl_login_name,
%% Retained messages handler. See rabbit_mqtt_retainer_sup
%% and rabbit_mqtt_retainer.
retainer_pid,
auth_state,
peer_addr,
%%TODO remove funs from state
mqtt2amqp_fun,
amqp2mqtt_fun,
register_state,
conn_name,
info}).
-type proc_state() :: #proc_state{}.
-record(auth_state, {username,
user,
@ -76,8 +83,6 @@
peer_host,
peer_port,
protocol,
channels,
channel_max,
frame_max,
client_properties,
ssl,
@ -88,11 +93,9 @@
conn_name,
connection_state,
connection,
consumer_tags,
unacked_pubs,
awaiting_ack,
awaiting_seqno,
message_id,
unacked_client_pubs,
unacked_server_pubs,
packet_id,
client_id,
clean_sess,
will_msg,

View File

@ -48,7 +48,8 @@
-define(QOS_2, 2).
-define(SUBACK_FAILURE, 16#80).
%% TODO
-type qos() :: ?QOS_0 | ?QOS_1 | ?QOS_2.
%%TODO remove message_id()
-type message_id() :: any().
-record(mqtt_frame, {fixed,
@ -91,7 +92,7 @@
-record(mqtt_frame_other, {other}).
-record(mqtt_msg, {retain :: boolean(),
qos :: ?QOS_0 | ?QOS_1 | ?QOS_2,
qos :: qos(),
topic :: string(),
dup :: boolean(),
message_id :: message_id(),

View File

@ -0,0 +1,9 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
%%
%% Packet identifier is a non zero two byte integer.
-type packet_id() :: 1..16#ffff.

View File

@ -13,6 +13,7 @@
close_local_client_connections/1]).
start(normal, []) ->
rabbit_global_counters:init([{protocol, mqtt}]),
{ok, Listeners} = application:get_env(tcp_listeners),
{ok, SslListeners} = application:get_env(ssl_listeners),
ok = mqtt_node:start(),

View File

@ -0,0 +1,89 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(rabbit_mqtt_confirms).
-include("rabbit_mqtt_types.hrl").
-compile({no_auto_import, [size/1]}).
-export([init/0,
insert/3,
confirm/3,
reject/2,
remove_queue/2,
size/1]).
-type queue_name() :: rabbit_amqqueue:name().
-opaque state() :: #{packet_id() => #{queue_name() => ok}}.
-export_type([state/0]).
-spec init() -> state().
init() ->
maps:new().
-spec size(state()) -> non_neg_integer().
size(State) ->
maps:size(State).
-spec insert(packet_id(), [queue_name()], state()) ->
{ok, state()} | {error, duplicate_packet_id}.
insert(PktId, _, State)
when is_map_key(PktId, State) ->
{error, duplicate_packet_id};
insert(PktId, QNames, State)
when is_integer(PktId) andalso PktId > 0 ->
QMap = maps:from_keys(QNames, ok),
maps:put(PktId, QMap, State).
-spec confirm([packet_id()], queue_name(), state()) ->
{[packet_id()], state()}.
confirm(PktIds, QName, State0) ->
lists:foldr(fun(PktId, Acc) ->
confirm_one(PktId, QName, Acc)
end, {[], State0}, PktIds).
-spec reject(packet_id(), state()) ->
{ok, state()} | {error, not_found}.
reject(PktId, State0)
when is_integer(PktId) ->
case maps:take(PktId, State0) of
{_QMap, State} ->
{ok, State};
error ->
{error, not_found}
end.
%% idempotent
-spec remove_queue(queue_name(), state()) ->
{[packet_id()], state()}.
remove_queue(QName, State) ->
PktIds = maps:fold(
fun(PktId, QMap, PktIds)
when is_map_key(QName, QMap) ->
[PktId | PktIds];
(_, _, PktIds) ->
PktIds
end, [], State),
confirm(lists:sort(PktIds), QName, State).
%% INTERNAL
confirm_one(PktId, QName, {PktIds, State0})
when is_integer(PktId) ->
case maps:take(PktId, State0) of
{QMap0, State1}
when is_map_key(QName, QMap0)
andalso map_size(QMap0) =:= 1 ->
%% last queue confirm
{[PktId| PktIds], State1};
{QMap0, State1} ->
QMap = maps:remove(QName, QMap0),
State = maps:put(PktId, QMap, State1),
{PktIds, State};
error ->
{PktIds, State0}
end.

File diff suppressed because it is too large Load Diff

View File

@ -22,7 +22,6 @@
-export([info/2]).
-include_lib("amqp_client/include/amqp_client.hrl").
-include("rabbit_mqtt.hrl").
-define(SIMPLE_METRICS, [pid, recv_oct, send_oct, reductions]).
@ -130,25 +129,6 @@ handle_cast(QueueEvent = {queue_event, _, _},
handle_cast(Msg, State) ->
{stop, {mqtt_unexpected_cast, Msg}, State}.
handle_info({#'basic.deliver'{}, #amqp_msg{}} = Delivery,
State) ->
%% receiving a message from a quorum queue
%% no delivery context
handle_info(erlang:insert_element(3, Delivery, undefined), State);
handle_info({#'basic.deliver'{}, #amqp_msg{}, _DeliveryCtx} = Delivery,
State = #state{ proc_state = ProcState }) ->
callback_reply(State, rabbit_mqtt_processor:amqp_callback(Delivery,
ProcState));
handle_info(#'basic.ack'{} = Ack, State = #state{ proc_state = ProcState }) ->
callback_reply(State, rabbit_mqtt_processor:amqp_callback(Ack, ProcState));
handle_info(#'basic.consume_ok'{}, State) ->
{noreply, State, hibernate};
handle_info(#'basic.cancel'{}, State) ->
{stop, {shutdown, subscription_cancelled}, State};
handle_info({'EXIT', _Conn, Reason}, State) ->
{stop, {connection_died, Reason}, State};
@ -434,8 +414,7 @@ send_will_and_terminate(PState, State) ->
send_will_and_terminate(PState, Reason, State = #state{conn_name = ConnStr}) ->
rabbit_mqtt_processor:send_will(PState),
rabbit_log_connection:debug("MQTT: about to send will message (if any) on connection ~tp", [ConnStr]),
% todo: flush channel after publish
rabbit_log_connection:debug("MQTT: about to send will message (if any) on connection ~p", [ConnStr]),
{stop, Reason, State}.
network_error(closed,

View File

@ -9,7 +9,7 @@
-include("rabbit_mqtt.hrl").
-export([subcription_queue_name/1,
-export([queue_names/1,
gen_client_id/0,
env/1,
table_lookup/2,
@ -21,7 +21,7 @@
-define(MAX_TOPIC_TRANSLATION_CACHE_SIZE, 12).
subcription_queue_name(ClientId) ->
queue_names(ClientId) ->
Base = "mqtt-subscription-" ++ ClientId ++ "qos",
{list_to_binary(Base ++ "0"), list_to_binary(Base ++ "1")}.

View File

@ -10,8 +10,10 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(CONNECT_TIMEOUT, 10000).
%% defined in MQTT v4 and v5 (not in v3)
-define(SUBACK_FAILURE, 16#80).
-define(FAIL_IF_CRASH_LOG, {["Generic server.*terminating"],
fun () -> ct:fail(crash_detected) end}).
-import(rabbit_ct_broker_helpers, [rpc/5]).
@ -528,90 +530,99 @@ no_queue_consume_permission(Config) ->
test_subscribe_permissions_combination(<<".*">>, <<".*">>, <<"^amq\\.topic">>, Config, ExpectedLogs).
no_queue_delete_permission(Config) ->
{skip, "TODO support clean_start=false"}.
% set_permissions(".*", ".*", ".*", Config),
% C1 = open_mqtt_connection(Config, [{clientid, <<"no_queue_delete_permission">>}, {clean_start, false}]),
% {ok, _, _} = emqtt:subscribe(C1, {<<"test/topic">>, qos1}),
% ok = emqtt:disconnect(C1),
% set_permissions(<<>>, ".*", ".*", Config),
set_permissions(".*", ".*", ".*", Config),
ClientId = <<"no_queue_delete_permission">>,
{ok, C1} = connect_user(
?config(mqtt_user, Config),
?config(mqtt_password, Config),
Config,
ClientId,
[{clean_start, false}]),
{ok, _} = emqtt:connect(C1),
{ok, _, _} = emqtt:subscribe(C1, {<<"test/topic">>, qos1}),
ok = emqtt:disconnect(C1),
% %% And now we have a durable queue that user doesn't have permission to delete.
% %% Attempt to establish clean session should fail.
% {ok, C2} = connect_user(
% ?config(mqtt_user, Config),
% ?config(mqtt_password, Config),
% Config,
% ?config(mqtt_user, Config),
% [{clientid, <<"no_queue_delete_permission">>},
% {clean_start, true}]),
% unlink(C2),
% ?assertMatch({error, _},
% emqtt:connect(C2)),
set_permissions(<<>>, ".*", ".*", Config),
%% Now we have a durable queue that user doesn't have permission to delete.
%% Attempt to establish clean session should fail.
{ok, C2} = connect_user(
?config(mqtt_user, Config),
?config(mqtt_password, Config),
Config,
ClientId,
[{clean_start, true}]),
unlink(C2),
?assertMatch({error, _},
emqtt:connect(C2)),
wait_log(
Config,
[?FAIL_IF_CRASH_LOG
,{[io_lib:format("MQTT resource access refused: configure access to queue "
"'mqtt-subscription-~sqos1' in vhost 'mqtt-vhost' refused for user 'mqtt-user'",
[ClientId]),
"MQTT connection .* is closing due to an authorization failure"],
fun() -> stop end}
]),
ok.
% wait_log(Config,
% [?FAIL_IF_CRASH_LOG
% ,{["operation queue.delete caused a channel exception access_refused",
% "MQTT cannot start a clean session: `configure` permission missing for queue"],
% fun () -> stop end}
% ]),
% ok.
no_queue_consume_permission_on_connect(Config) ->
set_permissions(".*", ".*", ".*", Config),
ClientId = <<"no_queue_consume_permission_on_connect">>,
{ok, C1} = connect_user(
?config(mqtt_user, Config),
?config(mqtt_password, Config),
Config,
ClientId,
[{clean_start, false}]),
{ok, _} = emqtt:connect(C1),
{ok, _, _} = emqtt:subscribe(C1, {<<"test/topic">>, qos1}),
ok = emqtt:disconnect(C1),
no_queue_consume_permission_on_connect(_Config) ->
{skip, "TODO support clean_start=false"}.
% set_permissions(".*", ".*", ".*", Config),
% C1 = open_mqtt_connection(Config, [{clientid, <<"no_queue_consume_permission_on_connect">>}, {clean_start, false}]),
% {ok, _, _} = emqtt:subscribe(C1, {<<"test/topic">>, qos1}),
% ok = emqtt:disconnect(C1),
set_permissions(".*", ".*", "^amq\\.topic", Config),
{ok, C2} = connect_user(
?config(mqtt_user, Config),
?config(mqtt_password, Config),
Config,
ClientId,
[{clean_start, false}]),
unlink(C2),
?assertMatch({error, _},
emqtt:connect(C2)),
wait_log(
Config,
[?FAIL_IF_CRASH_LOG
,{[io_lib:format("MQTT resource access refused: read access to queue "
"'mqtt-subscription-~sqos1' in vhost 'mqtt-vhost' refused for user 'mqtt-user'",
[ClientId]),
"MQTT connection .* is closing due to an authorization failure"],
fun () -> stop end}
]),
ok.
% set_permissions(".*", ".*", "^amq\\.topic", Config),
% {ok, C2} = connect_user(
% ?config(mqtt_user, Config),
% ?config(mqtt_password, Config),
% Config,
% ?config(mqtt_user, Config),
% [{clientid, <<"no_queue_consume_permission_on_connect">>},
% {clean_start, false}]),
% unlink(C2),
% ?assertMatch({error, _},
% emqtt:connect(C2)),
no_queue_declare_permission(Config) ->
set_permissions("", ".*", ".*", Config),
ClientId = <<"no_queue_declare_permission">>,
{ok, C} = connect_user(
?config(mqtt_user, Config),
?config(mqtt_password, Config),
Config,
ClientId,
[{clean_start, true}]),
{ok, _} = emqtt:connect(C),
% wait_log(Config,
% [{["Generic server.*terminating"], fun () -> exit(there_should_be_no_crashes) end}
% ,{["operation basic.consume caused a channel exception access_refused",
% "MQTT cannot recover a session, user is missing permissions"],
% fun () -> stop end}
% ]),
% ok.
no_queue_declare_permission(_Config) ->
{skip, "TODO support clean_start=false"}.
% rabbit_ct_broker_helpers:set_permissions(Config, ?config(mqtt_user, Config), ?config(mqtt_vhost, Config), <<"">>, <<".*">>, <<".*">>),
% P = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt),
% {ok, C} = emqtt:start_link([{host, "localhost"},
% {port, P},
% {clientid, <<"no_queue_declare_permission">>},
% {proto_ver, v4},
% {username, ?config(mqtt_user, Config)},
% {password, ?config(mqtt_password, Config)},
% {clean_start, false}
% ]),
% {ok, _} = emqtt:connect(C),
% process_flag(trap_exit, true),
% try emqtt:subscribe(C, <<"test/topic">>) of
% _ -> exit(this_should_not_succeed)
% catch
% exit:{{shutdown, tcp_closed} , _} -> ok
% end,
% process_flag(trap_exit, false),
% wait_log(Config,
% [{["Generic server.*terminating"], fun () -> exit(there_should_be_no_crashes) end}
% ,{["MQTT protocol error on connection.*access_refused",
% "operation queue.declare caused a channel exception access_refused"],
% fun () -> stop end}
% ]),
% ok.
process_flag(trap_exit, true),
{ok, _, [?SUBACK_FAILURE]} = emqtt:subscribe(C, <<"test/topic">>, qos0),
ok = assert_connection_closed(C),
wait_log(
Config,
[?FAIL_IF_CRASH_LOG
,{[io_lib:format("MQTT resource access refused: configure access to queue "
"'mqtt-subscription-~sqos0' in vhost 'mqtt-vhost' refused for user 'mqtt-user'",
[ClientId]),
"MQTT protocol error on connection .*: subscribe_error"],
fun () -> stop end}
]),
ok.
no_publish_permission(Config) ->
set_permissions(".*", "", ".*", Config),
@ -631,8 +642,8 @@ no_publish_permission(Config) ->
no_topic_read_permission(Config) ->
set_permissions(".*", ".*", ".*", Config),
set_topic_permissions("^allow-write\\..*", "^allow-read\\..*", Config),
C = open_mqtt_connection(Config),
%% Check topic permission setup is working.
{ok, _, [0]} = emqtt:subscribe(C, <<"allow-read/some/topic">>),
@ -650,25 +661,25 @@ no_topic_read_permission(Config) ->
]),
ok.
no_topic_write_permission(_Config) ->
{skip, "TODO implement QoS1"}.
% set_permissions(".*", ".*", ".*", Config),
% set_topic_permissions("^allow-write\\..*", "^allow-read\\..*", Config),
% C = open_mqtt_connection(Config),
% %% Check topic permission setup is working.
% {ok, _} = emqtt:publish(C, <<"allow-write/some/topic">>, <<"payload">>, qos1),
no_topic_write_permission(Config) ->
set_permissions(".*", ".*", ".*", Config),
set_topic_permissions("^allow-write\\..*", "^allow-read\\..*", Config),
C = open_mqtt_connection(Config),
% process_flag(trap_exit, true),
% ?assertMatch({error, _},
% emqtt:publish(C, <<"some/other/topic">>, <<"payload">>, qos1)),
% wait_log(Config,
% [?FAIL_IF_CRASH_LOG
% ,{["MQTT topic access refused: write access to topic 'some.other.topic' in "
% "exchange 'amq.topic' in vhost 'mqtt-vhost' refused for user 'mqtt-user'",
% "MQTT connection .* is closing due to an authorization failure"],
% fun () -> stop end}
% ]),
% ok.
%% Check topic permission setup is working.
{ok, _} = emqtt:publish(C, <<"allow-write/some/topic">>, <<"payload">>, qos1),
process_flag(trap_exit, true),
?assertMatch({error, _},
emqtt:publish(C, <<"some/other/topic">>, <<"payload">>, qos1)),
wait_log(Config,
[?FAIL_IF_CRASH_LOG
,{["MQTT topic access refused: write access to topic 'some.other.topic' in "
"exchange 'amq.topic' in vhost 'mqtt-vhost' refused for user 'mqtt-user'",
"MQTT connection .* is closing due to an authorization failure"],
fun () -> stop end}
]),
ok.
loopback_user_connects_from_remote_host(Config) ->
set_permissions(".*", ".*", ".*", Config),