Adding missing function specs
This commit is contained in:
parent
1f106fcd98
commit
50e25778bb
|
|
@ -172,6 +172,8 @@ apply(_Meta, Unknown, State) ->
|
|||
logger:error("MQTT Raft state machine v1 received unknown command ~tp", [Unknown]),
|
||||
{State, {error, {unknown_command, Unknown}}, []}.
|
||||
|
||||
-spec state_enter(ra_server:ra_state() | eol, state()) ->
|
||||
ra_machine:effects().
|
||||
state_enter(leader, State) ->
|
||||
%% re-request monitors for all known pids, this would clean up
|
||||
%% records for all connections are no longer around, e.g. right after node restart
|
||||
|
|
@ -188,6 +190,7 @@ overview(#machine_state{client_ids = ClientIds,
|
|||
%% ==========================
|
||||
|
||||
%% Avoids blocking the Raft leader.
|
||||
-spec notify_connection(pid(), duplicate_id | decommission_node) -> pid().
|
||||
notify_connection(Pid, Reason) ->
|
||||
spawn(fun() -> gen_server2:cast(Pid, Reason) end).
|
||||
|
||||
|
|
|
|||
|
|
@ -113,6 +113,8 @@ apply(_Meta, Unknown, State) ->
|
|||
logger:error("MQTT Raft state machine received an unknown command ~tp", [Unknown]),
|
||||
{State, {error, {unknown_command, Unknown}}, []}.
|
||||
|
||||
-spec state_enter(ra_server:ra_state(), state()) ->
|
||||
ra_machine:effects().
|
||||
state_enter(leader, State) ->
|
||||
%% re-request monitors for all known pids, this would clean up
|
||||
%% records for all connections are no longer around, e.g. right after node restart
|
||||
|
|
@ -123,6 +125,7 @@ state_enter(_, _) ->
|
|||
%% ==========================
|
||||
|
||||
%% Avoids blocking the Raft leader.
|
||||
-spec notify_connection(pid(), duplicate_id | decommission_node) -> pid().
|
||||
notify_connection(Pid, Reason) ->
|
||||
spawn(fun() -> gen_server2:cast(Pid, Reason) end).
|
||||
|
||||
|
|
|
|||
|
|
@ -41,6 +41,7 @@ start(normal, []) ->
|
|||
stop(_) ->
|
||||
rabbit_mqtt_sup:stop_listeners().
|
||||
|
||||
-spec emit_connection_info_all([node()], rabbit_types:info_keys(), reference(), pid()) -> term().
|
||||
emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
|
||||
case rabbit_mqtt_ff:track_client_id_in_ra() of
|
||||
true ->
|
||||
|
|
@ -57,6 +58,7 @@ emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
|
|||
rabbit_control_misc:await_emitters_termination(Pids)
|
||||
end.
|
||||
|
||||
-spec emit_connection_info_local(rabbit_types:info_keys(), reference(), pid()) -> ok.
|
||||
emit_connection_info_local(Items, Ref, AggregatorPid) ->
|
||||
LocalPids = local_connection_pids(),
|
||||
emit_connection_info(Items, Ref, AggregatorPid, LocalPids).
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@ register(ServerId, ClientId, Pid) ->
|
|||
erlang:send_after(5000, self(), {ra_event, undefined, register_timeout}),
|
||||
{ok, Corr}.
|
||||
|
||||
-spec unregister(binary(), pid()) -> ok.
|
||||
unregister(ClientId, Pid) ->
|
||||
{ClusterName, _} = mqtt_node:server_id(),
|
||||
case ra_leaderboard:lookup_leader(ClusterName) of
|
||||
|
|
@ -49,6 +50,7 @@ unregister(ClientId, Pid) ->
|
|||
list_pids() ->
|
||||
list(fun(#machine_state{pids = Pids}) -> maps:keys(Pids) end).
|
||||
|
||||
-spec list() -> term().
|
||||
list() ->
|
||||
list(fun(#machine_state{client_ids = Ids}) -> maps:to_list(Ids) end).
|
||||
|
||||
|
|
@ -76,6 +78,7 @@ list(QF) ->
|
|||
end
|
||||
end.
|
||||
|
||||
-spec leave(binary()) -> ok | timeout | nodedown.
|
||||
leave(NodeBin) ->
|
||||
Node = binary_to_atom(NodeBin, utf8),
|
||||
ServerId = mqtt_node:server_id(),
|
||||
|
|
|
|||
|
|
@ -24,5 +24,6 @@
|
|||
callbacks => #{enable => {mqtt_node, delete}}
|
||||
}}).
|
||||
|
||||
-spec track_client_id_in_ra() -> boolean().
|
||||
track_client_id_in_ra() ->
|
||||
not rabbit_feature_flags:is_enabled(delete_ra_cluster_mqtt_node).
|
||||
|
|
|
|||
|
|
@ -1329,9 +1329,13 @@ serialise_and_send_to_client(Packet, #state{cfg = #cfg{proto_ver = ProtoVer,
|
|||
[Sock, Error, Packet#mqtt_packet.fixed, Packet#mqtt_packet.variable])
|
||||
end.
|
||||
|
||||
-spec serialise(#mqtt_packet{}, state()) ->
|
||||
iodata().
|
||||
serialise(Packet, #state{cfg = #cfg{proto_ver = ProtoVer}}) ->
|
||||
rabbit_mqtt_packet:serialise(Packet, ProtoVer).
|
||||
|
||||
-spec terminate(boolean(), binary(), atom(), state()) ->
|
||||
ok.
|
||||
terminate(SendWill, ConnName, ProtoFamily, State) ->
|
||||
maybe_send_will(SendWill, ConnName, State),
|
||||
Infos = [{name, ConnName},
|
||||
|
|
@ -1427,11 +1431,15 @@ delete_queue(QName, Username) ->
|
|||
ok
|
||||
end).
|
||||
|
||||
-spec handle_pre_hibernate() -> ok.
|
||||
handle_pre_hibernate() ->
|
||||
erase(permission_cache),
|
||||
erase(topic_permission_cache),
|
||||
ok.
|
||||
|
||||
-spec handle_ra_event(register_timeout
|
||||
| {applied, [{reference(), ok}]}
|
||||
| {not_leader, term(), reference()}, state()) -> state().
|
||||
handle_ra_event({applied, [{Corr, ok}]},
|
||||
State = #state{register_state = {pending, Corr}}) ->
|
||||
%% success case - command was applied transition into registered state
|
||||
|
|
@ -1796,6 +1804,7 @@ throttle(Conserve, Connected, #state{queues_soft_limit_exceeded = QSLE,
|
|||
not sets:is_empty(QSLE) orelse
|
||||
credit_flow:blocked().
|
||||
|
||||
-spec info(rabbit_types:info_key(), state()) -> any().
|
||||
info(host, #state{cfg = #cfg{host = Val}}) -> Val;
|
||||
info(port, #state{cfg = #cfg{port = Val}}) -> Val;
|
||||
info(peer_host, #state{cfg = #cfg{peer_host = Val}}) -> Val;
|
||||
|
|
|
|||
|
|
@ -17,29 +17,37 @@
|
|||
table
|
||||
}).
|
||||
|
||||
-type store_state() :: #store_state{}.
|
||||
|
||||
-spec new(file:name_all(), rabbit_types:vhost()) -> store_state().
|
||||
new(Dir, VHost) ->
|
||||
Tid = open_table(Dir, VHost),
|
||||
#store_state{table = Tid}.
|
||||
|
||||
-spec recover(file:name_all(), rabbit_types:vhost()) ->
|
||||
{error, uninitialized} | {ok, store_state()}.
|
||||
recover(Dir, VHost) ->
|
||||
case open_table(Dir, VHost) of
|
||||
{error, _} -> {error, uninitialized};
|
||||
{ok, Tid} -> {ok, #store_state{table = Tid}}
|
||||
end.
|
||||
|
||||
-spec insert(binary(), mqtt_msg(), store_state()) -> ok.
|
||||
insert(Topic, Msg, #store_state{table = T}) ->
|
||||
ok = dets:insert(T, #retained_message{topic = Topic, mqtt_msg = Msg}).
|
||||
|
||||
-spec lookup(binary(), store_state()) -> retained_message() | not_found.
|
||||
lookup(Topic, #store_state{table = T}) ->
|
||||
case dets:lookup(T, Topic) of
|
||||
[] -> not_found;
|
||||
[Entry] -> Entry
|
||||
end.
|
||||
|
||||
-spec delete(binary(), store_state()) -> ok.
|
||||
delete(Topic, #store_state{table = T}) ->
|
||||
ok = dets:delete(T, Topic).
|
||||
|
||||
-spec terminate(store_state()) -> ok.
|
||||
terminate(#store_state{table = T}) ->
|
||||
ok = dets:close(T).
|
||||
|
||||
|
|
|
|||
|
|
@ -19,7 +19,9 @@
|
|||
filename
|
||||
}).
|
||||
|
||||
-type store_state() :: #store_state{}.
|
||||
|
||||
-spec new(file:name_all(), rabbit_types:vhost()) -> store_state().
|
||||
new(Dir, VHost) ->
|
||||
Path = rabbit_mqtt_util:path_for(Dir, VHost),
|
||||
TableName = rabbit_mqtt_util:vhost_name_to_table_name(VHost),
|
||||
|
|
@ -27,6 +29,8 @@ new(Dir, VHost) ->
|
|||
Tid = ets:new(TableName, [set, public, {keypos, #retained_message.topic}]),
|
||||
#store_state{table = Tid, filename = Path}.
|
||||
|
||||
-spec recover(file:name_all(), rabbit_types:vhost()) ->
|
||||
{error, uninitialized} | {ok, store_state()}.
|
||||
recover(Dir, VHost) ->
|
||||
Path = rabbit_mqtt_util:path_for(Dir, VHost),
|
||||
case ets:file2tab(Path) of
|
||||
|
|
@ -35,19 +39,23 @@ recover(Dir, VHost) ->
|
|||
{error, _} -> {error, uninitialized}
|
||||
end.
|
||||
|
||||
-spec insert(binary(), mqtt_msg(), store_state()) -> ok.
|
||||
insert(Topic, Msg, #store_state{table = T}) ->
|
||||
true = ets:insert(T, #retained_message{topic = Topic, mqtt_msg = Msg}),
|
||||
ok.
|
||||
|
||||
-spec lookup(binary(), store_state()) -> retained_message() | not_found.
|
||||
lookup(Topic, #store_state{table = T}) ->
|
||||
case ets:lookup(T, Topic) of
|
||||
[] -> not_found;
|
||||
[Entry] -> Entry
|
||||
end.
|
||||
|
||||
-spec delete(binary(), store_state()) -> ok.
|
||||
delete(Topic, #store_state{table = T}) ->
|
||||
true = ets:delete(T, Topic),
|
||||
ok.
|
||||
|
||||
-spec terminate(store_state()) -> ok.
|
||||
terminate(#store_state{table = T, filename = Path}) ->
|
||||
ok = ets:tab2file(T, Path, [{extended_info, [object_count]}]).
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
start_link(SupName) ->
|
||||
supervisor:start_link(SupName, ?MODULE, []).
|
||||
|
||||
-spec child_for_vhost(rabbit_types:vhost()) -> pid().
|
||||
child_for_vhost(VHost) when is_binary(VHost) ->
|
||||
case rabbit_mqtt_retainer_sup:start_child(VHost) of
|
||||
{ok, Pid} -> Pid;
|
||||
|
|
|
|||
|
|
@ -70,6 +70,7 @@ init([{Listeners, SslListeners0}]) ->
|
|||
)
|
||||
]}}.
|
||||
|
||||
-spec stop_listeners() -> ok.
|
||||
stop_listeners() ->
|
||||
_ = rabbit_networking:stop_ranch_listener_of_protocol(?TCP_PROTOCOL),
|
||||
_ = rabbit_networking:stop_ranch_listener_of_protocol(?TLS_PROTOCOL),
|
||||
|
|
|
|||
|
|
@ -133,6 +133,7 @@ to_mqtt(T0) ->
|
|||
T2 = string:replace(T1, ".", "/", all),
|
||||
erlang:iolist_to_binary(T2).
|
||||
|
||||
-spec env(atom()) -> any().
|
||||
env(Key) ->
|
||||
case application:get_env(?APP_NAME, Key) of
|
||||
{ok, Val} -> coerce_env_value(Key, Val);
|
||||
|
|
@ -145,6 +146,8 @@ coerce_env_value(exchange, Val) -> rabbit_data_coercion:to_binary(Val);
|
|||
coerce_env_value(vhost, Val) -> rabbit_data_coercion:to_binary(Val);
|
||||
coerce_env_value(_, Val) -> Val.
|
||||
|
||||
-spec table_lookup(rabbit_framing:amqp_table() | undefined, binary()) ->
|
||||
tuple() | undefined.
|
||||
table_lookup(undefined, _Key) ->
|
||||
undefined;
|
||||
table_lookup(Table, Key) ->
|
||||
|
|
@ -156,13 +159,14 @@ vhost_name_to_dir_name(VHost, Suffix) ->
|
|||
<<Num:128>> = erlang:md5(VHost),
|
||||
"mqtt_retained_" ++ rabbit_misc:format("~36.16.0b", [Num]) ++ Suffix.
|
||||
|
||||
-spec path_for(file:name_all(), rabbit_types:vhost()) -> file:filename_all().
|
||||
path_for(Dir, VHost) ->
|
||||
filename:join(Dir, vhost_name_to_dir_name(VHost)).
|
||||
|
||||
-spec path_for(file:name_all(), rabbit_types:vhost(), string()) -> file:filename_all().
|
||||
path_for(Dir, VHost, Suffix) ->
|
||||
filename:join(Dir, vhost_name_to_dir_name(VHost, Suffix)).
|
||||
|
||||
|
||||
-spec vhost_name_to_table_name(rabbit_types:vhost()) ->
|
||||
atom().
|
||||
vhost_name_to_table_name(VHost) ->
|
||||
|
|
|
|||
|
|
@ -28,17 +28,19 @@
|
|||
upgrade/5,
|
||||
takeover/7]).
|
||||
|
||||
-type option(T) :: undefined | T.
|
||||
|
||||
-record(state, {
|
||||
socket,
|
||||
socket :: {rabbit_proxy_socket, any(), any()} | rabbit_net:socket(),
|
||||
parse_state = rabbit_mqtt_packet:initial_state() :: rabbit_mqtt_packet:state(),
|
||||
proc_state :: undefined | rabbit_mqtt_processor:state(),
|
||||
connection_state = running :: running | blocked,
|
||||
conserve = false :: boolean(),
|
||||
stats_timer :: undefined | rabbit_event:state(),
|
||||
stats_timer :: option(rabbit_event:state()),
|
||||
keepalive = rabbit_mqtt_keepalive:init() :: rabbit_mqtt_keepalive:state(),
|
||||
conn_name :: undefined | binary(),
|
||||
conn_name :: option(binary()),
|
||||
received_connect_packet = false :: boolean()
|
||||
}).
|
||||
}).
|
||||
|
||||
-type state() :: #state{}.
|
||||
|
||||
|
|
@ -66,10 +68,6 @@ takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Handler, {HandlerState,
|
|||
{Handler, {HandlerState#state{socket = Sock}, PeerAddr}}).
|
||||
|
||||
%% cowboy_websocket
|
||||
-spec init(Req, any()) ->
|
||||
{ok | module(), Req, any()} |
|
||||
{module(), Req, any(), any()}
|
||||
when Req::cowboy_req:req().
|
||||
init(Req, Opts) ->
|
||||
case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of
|
||||
undefined ->
|
||||
|
|
@ -225,7 +223,6 @@ websocket_info(Msg, State) ->
|
|||
?LOG_WARNING("Web MQTT: unexpected message ~tp", [Msg]),
|
||||
{[], State, hibernate}.
|
||||
|
||||
-spec terminate(any(), cowboy_req:req(), any()) -> ok.
|
||||
terminate(_Reason, _Req, #state{proc_state = undefined}) ->
|
||||
ok;
|
||||
terminate(Reason, Request, #state{} = State) ->
|
||||
|
|
|
|||
Loading…
Reference in New Issue