Fix dialyzer issues and add function specs

Fix all dialyzer warnings in rabbitmq_mqtt and rabbitmq_web_mqtt.

Add more function specs.
This commit is contained in:
David Ansari 2022-12-27 11:00:33 +01:00
parent 1720aa0e75
commit a8b69b43c1
24 changed files with 209 additions and 181 deletions

View File

@ -29,6 +29,9 @@
-export([remote_conserve_resources/3]). %% Internal use only
-export_type([resource_alarm_source/0,
resource_alert/0]).
-define(SERVER, ?MODULE).
-define(FILE_DESCRIPTOR_RESOURCE, <<"file descriptors">>).
@ -46,6 +49,9 @@
-type resource_alarm_source() :: 'disk' | 'memory'.
-type resource_alarm() :: {resource_limit, resource_alarm_source(), node()}.
-type alarm() :: local_alarm() | resource_alarm().
-type resource_alert() :: {WasAlarmSetForNode :: boolean(),
IsThereAnyAlarmsWithSameSourceInTheCluster :: boolean(),
NodeForWhichAlarmWasSetOrCleared :: node()}.
%%----------------------------------------------------------------------------

View File

@ -133,7 +133,7 @@
-type lstate() :: #lstate{pid :: pid(),
prefetch_limited :: boolean()}.
-type qstate() :: #qstate{pid :: pid(),
-type qstate() :: #qstate{pid :: pid() | none,
state :: 'dormant' | 'active' | 'suspended'}.
-type credit_mode() :: 'manual' | 'drain' | 'auto'.

View File

@ -75,6 +75,9 @@ report_ram_duration(Pid, QueueDuration) ->
stop() ->
gen_server2:cast(?SERVER, stop).
-spec conserve_resources(pid(),
rabbit_alarm:resource_alarm_source(),
rabbit_alarm:resource_alert()) -> ok.
%% Paging should be enabled/disabled only in response to disk resource alarms
%% for the current node.
conserve_resources(Pid, disk, {_, Conserve, Node}) when node(Pid) =:= Node ->

View File

@ -314,6 +314,9 @@ broadcast(SPids, Msg) ->
SPid ! Msg
end || SPid <- SPids].
-spec conserve_resources(pid(),
rabbit_alarm:resource_alarm_source(),
rabbit_alarm:resource_alert()) -> ok.
conserve_resources(Pid, Source, {_, Conserve, _}) ->
Pid ! {conserve_resources, Source, Conserve},
ok.

View File

@ -120,7 +120,7 @@ count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
unacknowledged_message_count() ->
lists:sum([?QUEUE:len(C#cr.acktags) || C <- all_ch_record()]).
-spec add(rabbit_amqqueue:name(), ch(), rabbit_types:ctag(), boolean(), pid(), boolean(),
-spec add(rabbit_amqqueue:name(), ch(), rabbit_types:ctag(), boolean(), pid() | none, boolean(),
non_neg_integer(), rabbit_framing:amqp_table(), boolean(),
rabbit_types:username(), state())
-> state().

View File

@ -97,7 +97,7 @@
-type consume_spec() :: #{no_ack := boolean(),
channel_pid := pid(),
limiter_pid => pid(),
limiter_pid => pid() | none,
limiter_active => boolean(),
prefetch_count => non_neg_integer(),
consumer_tag := rabbit_types:ctag(),
@ -108,9 +108,6 @@
% copied from rabbit_amqqueue
-type absent_reason() :: 'nodedown' | 'crashed' | stopped | timeout.
-type settle_op() :: 'complete' | 'requeue' | 'discard'.
-export_type([state/0,
@ -128,7 +125,7 @@
-callback declare(amqqueue:amqqueue(), node()) ->
{'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
{'absent', amqqueue:amqqueue(), absent_reason()} |
{'absent', amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()} |
{'protocol_error', Type :: atom(), Reason :: string(), Args :: term()} |
{'error', Err :: term() }.
@ -262,7 +259,7 @@ is_compatible(Type, Durable, Exclusive, AutoDelete) ->
-spec declare(amqqueue:amqqueue(), node()) ->
{'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
{'absent', amqqueue:amqqueue(), absent_reason()} |
{'absent', amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()} |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()} |
{'error', Err :: term() }.
declare(Q0, Node) ->
@ -324,6 +321,7 @@ state_info(#ctx{state = S,
state_info(_) ->
#{}.
-spec format_status(state()) -> map().
format_status(#?STATE{ctxs = Ctxs}) ->
#{num_queue_clients => maps:size(Ctxs)}.

View File

@ -143,12 +143,6 @@
%%--------------------------------------------------------------------------
-type resource_alert() :: {WasAlarmSetForNode :: boolean(),
IsThereAnyAlarmsWithSameSourceInTheCluster :: boolean(),
NodeForWhichAlarmWasSetOrCleared :: node()}.
%%--------------------------------------------------------------------------
-spec start_link(pid(), any()) -> rabbit_types:ok(pid()).
start_link(HelperSup, Ref) ->
@ -210,7 +204,9 @@ info(Pid, Items) ->
force_event_refresh(Pid, Ref) ->
gen_server:cast(Pid, {force_event_refresh, Ref}).
-spec conserve_resources(pid(), atom(), resource_alert()) -> 'ok'.
-spec conserve_resources(pid(),
rabbit_alarm:resource_alarm_source(),
rabbit_alarm:resource_alert()) -> 'ok'.
conserve_resources(Pid, Source, {_, Conserve, _}) ->
Pid ! {conserve_resources, Source, Conserve},

View File

@ -81,6 +81,9 @@ system_terminate(Reason, _Parent, _Deb, _State) ->
system_code_change(Misc, _Module, _OldVsn, _Extra) ->
{ok, Misc}.
-spec conserve_resources(pid(),
rabbit_alarm:resource_alarm_source(),
rabbit_alarm:resource_alert()) -> ok.
conserve_resources(Pid, Source, {_, Conserve, _}) ->
Pid ! {conserve_resources, Source, Conserve},
ok.

View File

@ -5,6 +5,8 @@
%% Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
%%
-type option(T) :: undefined | T.
-define(PROTOCOL_NAMES,
[{3, "MQIsdp"},
{4, "MQTT"}]).
@ -55,8 +57,6 @@
%% Packet identifier is a non zero two byte integer.
-type packet_id() :: 1..16#ffff.
-type option(T) :: undefined | T.
-record(mqtt_packet_fixed, {type = 0,
dup = 0,
qos = 0,
@ -106,3 +106,9 @@
payload :: binary()}).
-type mqtt_msg() :: #mqtt_msg{}.
%% does not include vhost because vhost is used in the (D)ETS table name
-record(retained_message, {topic :: string(),
mqtt_msg :: mqtt_msg()}).
-type retained_message() :: #retained_message{}.

View File

@ -1,9 +0,0 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
%% does not include vhost because vhost is used in the (D)ETS table name
-record(retained_message, {topic,
mqtt_msg}).

View File

@ -1,6 +1,7 @@
-module(rabbit_mqtt_keepalive).
-export([start/2,
-export([init/0,
start/2,
handle/2,
start_timer/1,
cancel_timer/1,
@ -8,17 +9,19 @@
-export_type([state/0]).
-type option(T) :: undefined | T.
-record(state, {
%% Keep Alive value as sent in the CONNECT packet.
interval_secs :: pos_integer(),
timer :: option(reference()),
timer :: reference(),
socket :: inet:socket(),
recv_oct :: non_neg_integer(),
received :: boolean()}).
-opaque(state() :: undefined | #state{}).
-opaque(state() :: disabled | #state{}).
-spec init() -> state().
init() ->
disabled.
-spec start(IntervalSeconds :: non_neg_integer(), inet:socket()) -> ok.
start(0, _Sock) ->
@ -33,11 +36,11 @@ start(Seconds, Sock)
handle({init, IntervalSecs, Sock}, _State) ->
case rabbit_net:getstat(Sock, [recv_oct]) of
{ok, [{recv_oct, RecvOct}]} ->
State = #state{socket = Sock,
interval_secs = IntervalSecs,
recv_oct = RecvOct,
received = true},
{ok, start_timer(State)};
{ok, #state{interval_secs = IntervalSecs,
timer = start_timer0(IntervalSecs),
socket = Sock,
recv_oct = RecvOct,
received = true}};
{error, _} = Err ->
Err
end;
@ -62,19 +65,22 @@ handle(check, State = #state{socket = Sock,
-spec start_timer(state()) -> state().
start_timer(#state{interval_secs = Seconds} = State) ->
Ref = erlang:send_after(timer_ms(Seconds), self(), {keepalive, check}),
State#state{timer = Ref};
start_timer(undefined) ->
undefined.
State#state{timer = start_timer0(Seconds)};
start_timer(disabled) ->
disabled.
-spec start_timer0(pos_integer()) -> reference().
start_timer0(KeepAliveSeconds) ->
erlang:send_after(timer_ms(KeepAliveSeconds), self(), {keepalive, check}).
-spec cancel_timer(state()) -> state().
cancel_timer(undefined) ->
undefined;
cancel_timer(#state{timer = Ref} = State)
when is_reference(Ref) ->
ok = erlang:cancel_timer(Ref, [{async, true},
{info, false}]),
State.
State;
cancel_timer(disabled) ->
disabled.
%% "If the Keep Alive value is non-zero and the Server does not receive a Control
%% Packet from the Client within one and a half times the Keep Alive time period,
@ -104,5 +110,5 @@ timer_ms(KeepaliveSeconds) ->
non_neg_integer().
interval_secs(#state{interval_secs = Seconds}) ->
Seconds;
interval_secs(undefined) ->
interval_secs(disabled) ->
0.

View File

@ -30,19 +30,20 @@
-include("rabbit_mqtt_packet.hrl").
-define(MAX_PERMISSION_CACHE_SIZE, 12).
-define(CONSUMER_TAG, mqtt).
-define(CONSUMER_TAG, <<"mqtt">>).
-record(auth_state, {username,
user,
vhost,
authz_ctx}).
-record(auth_state, {username :: binary(),
user :: #user{},
vhost :: rabbit_types:vhost(),
authz_ctx :: #{binary() := binary()}
}).
-record(info, {prefetch,
host,
port,
peer_host,
peer_port,
connected_at}).
-record(info, {prefetch :: non_neg_integer(),
host :: inet:ip_address(),
port :: inet:port_number(),
peer_host :: inet:ip_address(),
peer_port :: inet:port_number(),
connected_at :: pos_integer()}).
-record(state,
{socket,
@ -58,21 +59,19 @@
packet_id = 1 :: packet_id(),
client_id :: option(binary()),
clean_sess :: option(boolean()),
will_msg,
will_msg :: option(mqtt_msg()),
exchange :: option(rabbit_exchange:name()),
%% Set if client has at least one subscription with QoS 1.
queue_qos1 :: option(rabbit_amqqueue:name()),
has_published = false :: boolean(),
ssl_login_name,
%% Retained messages handler. See rabbit_mqtt_retainer_sup
%% and rabbit_mqtt_retainer.
retainer_pid,
auth_state,
peer_addr,
ssl_login_name :: none | binary(),
retainer_pid :: option(pid()),
auth_state :: option(#auth_state{}),
peer_addr :: inet:ip_address(),
send_fun :: fun((Packet :: tuple(), state()) -> term()),
register_state,
conn_name,
info,
conn_name :: option(binary()),
info :: option(#info{}),
delivery_flow :: flow | noflow,
%% quorum queues and streams whose soft limit has been exceeded
soft_limit_exceeded = sets:new([{version, 2}]) :: sets:set(),
@ -81,6 +80,8 @@
-opaque state() :: #state{}.
-spec initial_state(Socket :: any(), ConnectionName :: binary()) ->
state().
initial_state(Socket, ConnectionName) ->
{ok, {PeerAddr, _PeerPort}} = rabbit_net:peername(Socket),
initial_state(Socket,
@ -88,6 +89,11 @@ initial_state(Socket, ConnectionName) ->
fun serialise_and_send_to_client/2,
PeerAddr).
-spec initial_state(Socket :: any(),
ConnectionName :: binary(),
SendFun :: fun((mqtt_packet(), state()) -> any()),
PeerAddr :: binary()) ->
state().
initial_state(Socket, ConnectionName, SendFun, PeerAddr) ->
Flow = case rabbit_misc:get_env(rabbit, mirroring_flow_control, true) of
true -> flow;
@ -105,7 +111,7 @@ initial_state(Socket, ConnectionName, SendFun, PeerAddr) ->
{stop, disconnect, state()} |
{error, Reason :: term(), state()}.
process_packet(#mqtt_packet{fixed = #mqtt_packet_fixed{type = Type}},
State = #state{auth_state = undefined})
State = #state{auth_state = undefined})
when Type =/= ?CONNECT ->
{error, connect_expected, State};
process_packet(Packet = #mqtt_packet{fixed = #mqtt_packet_fixed{type = Type}}, State) ->
@ -149,7 +155,7 @@ process_request(?PUBACK,
process_request(?PUBLISH,
Packet = #mqtt_packet{
fixed = Fixed = #mqtt_packet_fixed{qos = ?QOS_2}},
fixed = Fixed = #mqtt_packet_fixed{qos = ?QOS_2}},
State) ->
% Downgrade QOS_2 to QOS_1
process_request(?PUBLISH,
@ -280,7 +286,7 @@ process_request(?SUBSCRIBE,
process_request(?UNSUBSCRIBE,
#mqtt_packet{variable = #mqtt_packet_subscribe{packet_id = PacketId,
topic_table = Topics},
payload = undefined},
payload = undefined},
State0 = #state{send_fun = SendFun}) ->
rabbit_log_connection:debug("Received an UNSUBSCRIBE for topic(s) ~p", [Topics]),
HasSubsBefore = has_subs(State0),
@ -353,12 +359,6 @@ process_connect(#mqtt_packet{
SendFun(ResponsePacket, State),
return_connack(ReturnCode, State).
client_id(<<>>) ->
rabbit_mqtt_util:gen_client_id();
client_id(ClientId)
when is_binary(ClientId) ->
ClientId.
check_protocol_version(#mqtt_packet_connect{proto_ver = ProtoVersion}) ->
case lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)) of
true ->
@ -397,19 +397,28 @@ check_credentials(Packet = #mqtt_packet_connect{username = Username,
login({UserBin, PassBin,
Packet = #mqtt_packet_connect{client_id = ClientId0,
clean_sess = CleanSess}},
clean_sess = CleanSess}},
State0) ->
ClientId = client_id(ClientId0),
ClientId = ensure_client_id(ClientId0),
case process_login(UserBin, PassBin, ClientId, State0) of
already_connected ->
{ok, already_connected};
{ok, State} ->
{ok, Packet, State#state{clean_sess = CleanSess,
client_id = ClientId}};
client_id = ClientId}};
{error, _Reason, _State} = Err ->
Err
end.
-spec ensure_client_id(binary()) -> binary().
ensure_client_id(<<>>) ->
rabbit_data_coercion:to_binary(
rabbit_misc:base64url(
rabbit_guid:gen_secure()));
ensure_client_id(ClientId)
when is_binary(ClientId) ->
ClientId.
register_client(already_connected, _State) ->
ok;
register_client(Packet = #mqtt_packet_connect{proto_ver = ProtoVersion},
@ -647,7 +656,7 @@ maybe_send_retained_message(RPid, #mqtt_topic{name = Topic0, qos = SubscribeQos}
packet_id = PacketId,
topic_name = Topic1
},
payload = Msg#mqtt_msg.payload},
payload = Msg#mqtt_msg.payload},
State),
State
end.
@ -786,14 +795,9 @@ check_user_login(#{vhost := VHost,
notify_auth_result(AuthResult, Username, #state{conn_name = ConnName}) ->
rabbit_event:notify(
AuthResult,
[
{name, case Username of
none -> '';
_ -> Username
end},
[{name, Username},
{connection_name, ConnName},
{connection_type, network}
]).
{connection_type, network}]).
check_user_connection_limit(#{user := #user{username = Username}}) ->
case rabbit_auth_backend_internal:is_over_connection_limit(Username) of
@ -1442,6 +1446,8 @@ handle_ra_event(Evt, State) ->
rabbit_log:debug("unhandled ra_event: ~w ", [Evt]),
State.
-spec handle_down(term(), state()) ->
{ok, state()} | {error, Reason :: any()}.
handle_down({{'DOWN', QName}, _MRef, process, QPid, Reason},
State0 = #state{queue_states = QStates0,
unacked_client_pubs = U0}) ->
@ -1464,6 +1470,9 @@ handle_down({{'DOWN', QName}, _MRef, process, QPid, Reason},
{ok, State}
end.
-spec handle_queue_event(
{queue_event, rabbit_amqqueue:name() | ?QUEUE_TYPE_QOS_0, term()}, state()) ->
{ok, state()} | {error, Reason :: any(), state()}.
handle_queue_event({queue_event, ?QUEUE_TYPE_QOS_0, Msg},
State0 = #state{qos0_messages_dropped = N}) ->
State = case drop_qos0_message(State0) of
@ -1795,6 +1804,7 @@ ssl_login_name(Sock) ->
nossl -> none
end.
-spec format_status(state()) -> map().
format_status(#state{queue_states = QState,
proto_ver = ProtoVersion,
unacked_client_pubs = UnackClientPubs,
@ -1832,6 +1842,7 @@ format_status(#state{queue_states = QState,
soft_limit_exceeded => soft_limit_exceeded(State),
qos0_messages_dropped => Qos0MsgsDropped}.
-spec soft_limit_exceeded(state()) -> boolean().
soft_limit_exceeded(#state{soft_limit_exceeded = SLE}) ->
not sets:is_empty(SLE).
@ -1840,6 +1851,7 @@ proto_integer_to_atom(3) ->
proto_integer_to_atom(4) ->
?MQTT_PROTO_V4.
-spec proto_version_tuple(state()) -> tuple().
proto_version_tuple(#state{proto_ver = ?MQTT_PROTO_V3}) ->
{3, 1, 0};
proto_version_tuple(#state{proto_ver = ?MQTT_PROTO_V4}) ->

View File

@ -50,7 +50,7 @@ is_stateful() ->
-spec declare(amqqueue:amqqueue(), node()) ->
{'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
{'absent', amqqueue:amqqueue(), rabbit_queue_type:absent_reason()}.
{'absent', amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()}.
declare(Q0, _Node) ->
%% The queue gets persisted such that routing to this
%% queue (via the topic exchange) works as usual.

View File

@ -26,17 +26,20 @@
-record(state,
{socket,
proxy_socket,
conn_name,
await_recv,
deferred_recv,
received_connect_packet,
connection_state,
conserve,
proxy_socket :: undefined | {rabbit_proxy_soket, any(), any()},
await_recv :: boolean(),
deferred_recv :: undefined | binary(),
parse_state,
proc_state,
proc_state :: rabbit_mqtt_processor:state(),
connection_state :: running | blocked,
conserve :: boolean(),
stats_timer,
keepalive :: rabbit_mqtt_keepalive:state()}).
keepalive = rabbit_mqtt_keepalive:init() :: rabbit_mqtt_keepalive:state(),
conn_name :: binary(),
received_connect_packet :: boolean()
}).
-type(state() :: #state{}).
%%----------------------------------------------------------------------------
@ -44,6 +47,9 @@ start_link(Ref, _Transport, []) ->
Pid = proc_lib:spawn_link(?MODULE, init, [Ref]),
{ok, Pid}.
-spec conserve_resources(pid(),
rabbit_alarm:resource_alarm_source(),
rabbit_alarm:resource_alert()) -> ok.
conserve_resources(Pid, _, {_, Conserve, _}) ->
Pid ! {conserve_resources, Conserve},
ok.
@ -53,6 +59,7 @@ conserve_resources(Pid, _, {_, Conserve, _}) ->
info(Pid, Items) ->
gen_server:call(Pid, {info, Items}).
-spec close_connection(pid(), Reason :: any()) -> ok.
close_connection(Pid, Reason) ->
gen_server:cast(Pid, {close_connection, Reason}).
@ -84,12 +91,16 @@ init(Ref) ->
State1 = control_throttle(State0),
State = rabbit_event:init_stats_timer(State1, #state.stats_timer),
gen_server:enter_loop(?MODULE, [], State);
{error, enotconn} ->
{error, Reason = enotconn} ->
rabbit_log_connection:info(
"MQTT could not get connection string: ~s", [Reason]),
rabbit_net:fast_close(RealSocket),
terminate(shutdown, undefined);
ignore;
{error, Reason} ->
rabbit_log_connection:error(
"MQTT could not get connection string: ~p", [Reason]),
rabbit_net:fast_close(RealSocket),
terminate({network_error, Reason}, undefined)
{stop, Reason}
end.
handle_call({info, InfoItems}, _From, State) ->
@ -362,8 +373,7 @@ process_received_bytes(Bytes,
{stop, {shutdown, Error}, State}
end.
-spec pstate(#state{}, rabbit_mqtt_processor:state())
-> #state{}.
-spec pstate(state(), rabbit_mqtt_processor:state()) -> state().
pstate(State = #state {}, PState) ->
State #state{ proc_state = PState }.
@ -408,8 +418,8 @@ control_throttle(State = #state{connection_state = Flow,
keepalive = KState,
proc_state = PState}) ->
Throttle = Conserve orelse
rabbit_mqtt_processor:soft_limit_exceeded(PState) orelse
credit_flow:blocked(),
rabbit_mqtt_processor:soft_limit_exceeded(PState) orelse
credit_flow:blocked(),
case {Flow, Throttle} of
{running, true} ->
State#state{connection_state = blocked,
@ -503,7 +513,8 @@ i(protocol, #state{proc_state = ProcState}) ->
i(Key, #state{proc_state = ProcState}) ->
rabbit_mqtt_processor:info(Key, ProcState).
-spec format_status(map()) -> map().
-spec format_status(gen_server:format_status()) ->
gen_server:format_status().
format_status(Status) ->
maps:map(
fun(state, State) ->
@ -512,6 +523,7 @@ format_status(Status) ->
Value
end, Status).
-spec format_state(state()) -> map().
format_state(#state{proc_state = PState,
socket = Socket,
proxy_socket = ProxySock,

View File

@ -7,20 +7,25 @@
-module(rabbit_mqtt_retained_msg_store).
-export([behaviour_info/1, table_name_for/1]).
-include("rabbit_mqtt_packet.hrl").
behaviour_info(callbacks) ->
[{new, 2},
{recover, 2},
{insert, 3},
{lookup, 2},
{delete, 2},
{terminate, 1}];
behaviour_info(_Other) ->
undefined.
-callback new(Directory :: file:name_all(), rabbit_types:vhost()) ->
State :: any().
table_name_for(VHost) ->
rabbit_mqtt_util:vhost_name_to_table_name(VHost).
-callback recover(Directory :: file:name_all(), rabbit_types:vhost()) ->
{ok, State :: any()} | {error, Reason :: term()}.
-callback insert(Topic :: string(), mqtt_msg(), State :: any()) ->
ok.
-callback lookup(Topic :: string(), State :: any()) ->
retained_message() | not_found.
-callback delete(Topic :: string(), State :: any()) ->
ok.
-callback terminate(State :: any()) ->
ok.
%% TODO Support retained messages in RabbitMQ cluster, for
%% 1. support PUBLISH with retain on a different node than SUBSCRIBE
@ -30,6 +35,5 @@ table_name_for(VHost) ->
%% * retained message store backend does RPCs to peer nodes to lookup and delete
%%
%% Possible solutions for 2.
%% * rabbitmq_mqtt_retained_msg_store_mnesia
%% * rabbitmq_mqtt_retained_msg_store_khepri
%% * rabbitmq_mqtt_retained_msg_store_ra (implementing our own ra machine)

View File

@ -8,7 +8,7 @@
-module(rabbit_mqtt_retained_msg_store_dets).
-behaviour(rabbit_mqtt_retained_msg_store).
-include("rabbit_mqtt_retain.hrl").
-include("rabbit_mqtt_packet.hrl").
-export([new/2, recover/2, insert/3, lookup/2, delete/2, terminate/1]).
@ -44,8 +44,8 @@ terminate(#store_state{table = T}) ->
ok = dets:close(T).
open_table(Dir, VHost) ->
dets:open_file(rabbit_mqtt_retained_msg_store:table_name_for(VHost),
table_options(rabbit_mqtt_util:path_for(Dir, VHost, ".dets"))).
dets:open_file(rabbit_mqtt_util:vhost_name_to_table_name(VHost),
table_options(rabbit_mqtt_util:path_for(Dir, VHost, ".dets"))).
table_options(Path) ->
[{type, set},

View File

@ -8,7 +8,7 @@
-module(rabbit_mqtt_retained_msg_store_ets).
-behaviour(rabbit_mqtt_retained_msg_store).
-include("rabbit_mqtt_retain.hrl").
-include("rabbit_mqtt_packet.hrl").
-export([new/2, recover/2, insert/3, lookup/2, delete/2, terminate/1]).
@ -22,7 +22,7 @@
new(Dir, VHost) ->
Path = rabbit_mqtt_util:path_for(Dir, VHost),
TableName = rabbit_mqtt_retained_msg_store:table_name_for(VHost),
TableName = rabbit_mqtt_util:vhost_name_to_table_name(VHost),
file:delete(Path),
Tid = ets:new(TableName, [set, public, {keypos, #retained_message.topic}]),
#store_state{table = Tid, filename = Path}.

View File

@ -7,13 +7,13 @@
-module(rabbit_mqtt_retainer).
-behaviour(gen_server2).
-include("rabbit_mqtt_retain.hrl").
-include("rabbit_mqtt.hrl").
-include("rabbit_mqtt_packet.hrl").
-behaviour(gen_server).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, start_link/2]).
terminate/2, start_link/2]).
-export([retain/3, fetch/2, clear/2, store_module/0]).
@ -22,27 +22,22 @@
-record(retainer_state, {store_mod,
store}).
-spec retain(pid(), string(), mqtt_msg()) ->
{noreply, NewState :: term()} |
{noreply, NewState :: term(), timeout() | hibernate} |
{stop, Reason :: term(), NewState :: term()}.
%%----------------------------------------------------------------------------
start_link(RetainStoreMod, VHost) ->
gen_server2:start_link(?MODULE, [RetainStoreMod, VHost], []).
gen_server:start_link(?MODULE, [RetainStoreMod, VHost], []).
-spec retain(pid(), string(), mqtt_msg()) -> ok.
retain(Pid, Topic, Msg = #mqtt_msg{retain = true}) ->
gen_server2:cast(Pid, {retain, Topic, Msg});
gen_server:cast(Pid, {retain, Topic, Msg});
retain(_Pid, _Topic, Msg = #mqtt_msg{retain = false}) ->
throw({error, {retain_is_false, Msg}}).
fetch(Pid, Topic) ->
gen_server2:call(Pid, {fetch, Topic}, ?TIMEOUT).
gen_server:call(Pid, {fetch, Topic}, ?TIMEOUT).
clear(Pid, Topic) ->
gen_server2:cast(Pid, {clear, Topic}).
gen_server:cast(Pid, {clear, Topic}).
%%----------------------------------------------------------------------------
@ -91,8 +86,4 @@ store_dir() ->
rabbit:data_dir().
terminate(_Reason, #retainer_state{store = Store, store_mod = Mod}) ->
Mod:terminate(Store),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
Mod:terminate(Store).

View File

@ -13,7 +13,6 @@
-export([queue_name_bin/2,
qos_from_queue_name/2,
gen_client_id/0,
env/1,
table_lookup/2,
path_for/2,
@ -61,6 +60,7 @@ qos_from_queue_name(#resource{name = Name}, ClientId) ->
queue_name_prefix(ClientId) ->
<<"mqtt-subscription-", ClientId/binary, "qos">>.
-spec init_sparkplug() -> ok.
init_sparkplug() ->
case env(sparkplug) of
true ->
@ -133,10 +133,6 @@ to_mqtt(T0) ->
T2 = string:replace(T1, ".", "/", all),
erlang:iolist_to_binary(T2).
-spec gen_client_id() -> string().
gen_client_id() ->
rabbit_misc:base64url(rabbit_guid:gen_secure()).
env(Key) ->
case application:get_env(?APP_NAME, Key) of
{ok, Val} -> coerce_env_value(Key, Val);
@ -167,9 +163,11 @@ path_for(Dir, VHost, Suffix) ->
filename:join(Dir, vhost_name_to_dir_name(VHost, Suffix)).
-spec vhost_name_to_table_name(rabbit_types:vhost()) ->
atom().
vhost_name_to_table_name(VHost) ->
<<Num:128>> = erlang:md5(VHost),
list_to_atom("rabbit_mqtt_retained_" ++ rabbit_misc:format("~36.16.0b", [Num])).
<<Num:128>> = erlang:md5(VHost),
list_to_atom("rabbit_mqtt_retained_" ++ rabbit_misc:format("~36.16.0b", [Num])).
-spec register_clientid(rabbit_types:vhost(), binary()) -> ok.
register_clientid(Vhost, ClientId)

View File

@ -894,7 +894,7 @@ management_plugin_connection(Config) ->
ct:fail("server did not close connection")
end,
?assertEqual([], http_get(Config, "/connections")),
?assertEqual([], all_connection_pids(Config)).
eventually(?_assertEqual([], all_connection_pids(Config)), 500, 3).
management_plugin_enable(Config) ->
?assertEqual(0, length(http_get(Config, "/connections"))),

View File

@ -66,6 +66,7 @@ proxy_protocol(Config) ->
ok = inet:send(Socket, "PROXY TCP4 192.168.1.1 192.168.1.2 80 81\r\n"),
ok = inet:send(Socket, mqtt_3_1_1_connect_packet()),
{ok, _Packet} = gen_tcp:recv(Socket, 0, ?TIMEOUT),
timer:sleep(10),
ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, connection_name, []),
match = re:run(ConnectionName, <<"^192.168.1.1:80 -> 192.168.1.2:81$">>, [{capture, none}]),
gen_tcp:close(Socket),
@ -80,15 +81,14 @@ proxy_protocol_tls(Config) ->
{ok, SslSocket} = ssl:connect(Socket, [], ?TIMEOUT),
ok = ssl:send(SslSocket, mqtt_3_1_1_connect_packet()),
{ok, _Packet} = ssl:recv(SslSocket, 0, ?TIMEOUT),
ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, connection_name, []),
timer:sleep(10),
ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, connection_name, []),
match = re:run(ConnectionName, <<"^192.168.1.1:80 -> 192.168.1.2:81$">>, [{capture, none}]),
gen_tcp:close(Socket),
ok.
connection_name() ->
Connections = ets:tab2list(connection_created),
{_Key, Values} = lists:nth(1, Connections),
[{_Key, Values}] = ets:tab2list(connection_created),
{_, Name} = lists:keyfind(name, 1, Values),
Name.

View File

@ -253,6 +253,9 @@ process_received_bytes(Bytes,
{stop, normal, State}
end.
-spec conserve_resources(pid(),
rabbit_alarm:resource_alarm_source(),
rabbit_alarm:resource_alert()) -> ok.
conserve_resources(Pid, _Source, {_, Conserve, _}) ->
Pid ! {conserve_resources, Conserve},
ok.

View File

@ -550,11 +550,11 @@ invalid_transition(Transport, Socket, From, To) ->
close_immediately(Transport, Socket),
stop.
resource_alarm(ConnectionPid, disk,
{_WasAlarmSetForNode,
IsThereAnyAlarmsForSameResourceInTheCluster, _Node}) ->
ConnectionPid
! {resource_alarm, IsThereAnyAlarmsForSameResourceInTheCluster},
-spec resource_alarm(pid(),
rabbit_alarm:resource_alarm_source(),
rabbit_alarm:resource_alert()) -> ok.
resource_alarm(ConnectionPid, disk, {_, Conserve, _}) ->
ConnectionPid ! {resource_alarm, Conserve},
ok;
resource_alarm(_ConnectionPid, _Resource, _Alert) ->
ok.

View File

@ -26,18 +26,19 @@
takeover/7]).
-record(state, {
conn_name,
socket,
parse_state,
proc_state,
connection_state,
conserve_resources,
socket,
peername,
connection_state = running :: running | blocked,
conserve = false :: boolean(),
stats_timer,
received_connect_packet,
keepalive :: rabbit_mqtt_keepalive:state()
keepalive = rabbit_mqtt_keepalive:init() :: rabbit_mqtt_keepalive:state(),
conn_name,
received_connect_packet = false :: boolean()
}).
-type state() :: #state{}.
%% Close frame status codes as defined in https://www.rfc-editor.org/rfc/rfc6455#section-7.4.1
-define(CLOSE_NORMAL, 1000).
-define(CLOSE_PROTOCOL_ERROR, 1002).
@ -51,7 +52,7 @@ upgrade(Req, Env, Handler, HandlerState) ->
upgrade(Req, Env, Handler, HandlerState, Opts) ->
cowboy_websocket:upgrade(Req, Env, Handler, HandlerState, Opts).
takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Handler, HandlerState}) ->
takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Handler, {HandlerState, PeerAddr}}) ->
Sock = case HandlerState#state.socket of
undefined ->
Socket;
@ -59,7 +60,7 @@ takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Handler, HandlerState})
{rabbit_proxy_socket, Socket, ProxyInfo}
end,
cowboy_websocket:takeover(Parent, Ref, Socket, Transport, Opts, Buffer,
{Handler, HandlerState#state{socket = Sock}}).
{Handler, {HandlerState#state{socket = Sock}, PeerAddr}}).
%% cowboy_websocket
-spec init(Req, any()) ->
@ -80,22 +81,17 @@ init(Req, Opts) ->
true ->
{?MODULE,
cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, <<"mqtt">>, Req),
#state{
parse_state = rabbit_mqtt_packet:initial_state(),
connection_state = running,
conserve_resources = false,
socket = maps:get(proxy_header, Req, undefined),
peername = PeerAddr,
received_connect_packet = false
},
{#state{parse_state = rabbit_mqtt_packet:initial_state(),
socket = maps:get(proxy_header, Req, undefined)},
PeerAddr},
WsOpts}
end
end.
-spec websocket_init(State) ->
{cowboy_websocket:commands(), State} |
{cowboy_websocket:commands(), State, hibernate}.
websocket_init(State0 = #state{socket = Sock, peername = PeerAddr}) ->
-spec websocket_init({state(), PeerAddr :: binary()}) ->
{cowboy_websocket:commands(), state()} |
{cowboy_websocket:commands(), state(), hibernate}.
websocket_init({State0 = #state{socket = Sock}, PeerAddr}) ->
ok = file_handle_cache:obtain(),
case rabbit_net:connection_string(Sock, inbound) of
{ok, ConnStr} ->
@ -144,7 +140,7 @@ websocket_handle(Frame, State) ->
{cowboy_websocket:commands(), State} |
{cowboy_websocket:commands(), State, hibernate}.
websocket_info({conserve_resources, Conserve}, State) ->
NewState = State#state{conserve_resources = Conserve},
NewState = State#state{conserve = Conserve},
handle_credits(NewState);
websocket_info({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
@ -223,7 +219,7 @@ websocket_info(Msg, State) ->
{[], State, hibernate}.
-spec terminate(any(), cowboy_req:req(), any()) -> ok.
terminate(_Reason, _Req, #state{connection_state = undefined}) ->
terminate(_Reason, _Req, #state{proc_state = undefined}) ->
ok;
terminate(Reason, Request, #state{} = State) ->
terminate(Reason, Request, {true, State});
@ -317,12 +313,12 @@ handle_credits(State0) ->
end.
control_throttle(State = #state{connection_state = CS,
conserve_resources = Conserve,
conserve = Conserve,
keepalive = KState,
proc_state = PState}) ->
Throttle = Conserve orelse
rabbit_mqtt_processor:soft_limit_exceeded(PState) orelse
credit_flow:blocked(),
rabbit_mqtt_processor:soft_limit_exceeded(PState) orelse
credit_flow:blocked(),
case {CS, Throttle} of
{running, true} ->
State#state{connection_state = blocked,