From 50e25778bb52e6e30d7193a14bf47f284ec39f7e Mon Sep 17 00:00:00 2001 From: Chunyi Lyu Date: Wed, 25 Jan 2023 18:18:42 +0000 Subject: [PATCH] Adding missing function specs --- deps/rabbitmq_mqtt/src/mqtt_machine.erl | 3 +++ deps/rabbitmq_mqtt/src/mqtt_machine_v0.erl | 3 +++ deps/rabbitmq_mqtt/src/rabbit_mqtt.erl | 2 ++ deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl | 3 +++ deps/rabbitmq_mqtt/src/rabbit_mqtt_ff.erl | 1 + deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl | 9 +++++++++ .../src/rabbit_mqtt_retained_msg_store_dets.erl | 8 ++++++++ .../src/rabbit_mqtt_retained_msg_store_ets.erl | 8 ++++++++ .../src/rabbit_mqtt_retainer_sup.erl | 1 + deps/rabbitmq_mqtt/src/rabbit_mqtt_sup.erl | 1 + deps/rabbitmq_mqtt/src/rabbit_mqtt_util.erl | 6 +++++- .../src/rabbit_web_mqtt_handler.erl | 15 ++++++--------- 12 files changed, 50 insertions(+), 10 deletions(-) diff --git a/deps/rabbitmq_mqtt/src/mqtt_machine.erl b/deps/rabbitmq_mqtt/src/mqtt_machine.erl index 13e6ce7b25..20e69a7a2a 100644 --- a/deps/rabbitmq_mqtt/src/mqtt_machine.erl +++ b/deps/rabbitmq_mqtt/src/mqtt_machine.erl @@ -172,6 +172,8 @@ apply(_Meta, Unknown, State) -> logger:error("MQTT Raft state machine v1 received unknown command ~tp", [Unknown]), {State, {error, {unknown_command, Unknown}}, []}. +-spec state_enter(ra_server:ra_state() | eol, state()) -> + ra_machine:effects(). state_enter(leader, State) -> %% re-request monitors for all known pids, this would clean up %% records for all connections are no longer around, e.g. right after node restart @@ -188,6 +190,7 @@ overview(#machine_state{client_ids = ClientIds, %% ========================== %% Avoids blocking the Raft leader. +-spec notify_connection(pid(), duplicate_id | decommission_node) -> pid(). notify_connection(Pid, Reason) -> spawn(fun() -> gen_server2:cast(Pid, Reason) end). diff --git a/deps/rabbitmq_mqtt/src/mqtt_machine_v0.erl b/deps/rabbitmq_mqtt/src/mqtt_machine_v0.erl index c76c268c17..4b32ac88dd 100644 --- a/deps/rabbitmq_mqtt/src/mqtt_machine_v0.erl +++ b/deps/rabbitmq_mqtt/src/mqtt_machine_v0.erl @@ -113,6 +113,8 @@ apply(_Meta, Unknown, State) -> logger:error("MQTT Raft state machine received an unknown command ~tp", [Unknown]), {State, {error, {unknown_command, Unknown}}, []}. +-spec state_enter(ra_server:ra_state(), state()) -> + ra_machine:effects(). state_enter(leader, State) -> %% re-request monitors for all known pids, this would clean up %% records for all connections are no longer around, e.g. right after node restart @@ -123,6 +125,7 @@ state_enter(_, _) -> %% ========================== %% Avoids blocking the Raft leader. +-spec notify_connection(pid(), duplicate_id | decommission_node) -> pid(). notify_connection(Pid, Reason) -> spawn(fun() -> gen_server2:cast(Pid, Reason) end). diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl index 818c161b28..5fb1861255 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl @@ -41,6 +41,7 @@ start(normal, []) -> stop(_) -> rabbit_mqtt_sup:stop_listeners(). +-spec emit_connection_info_all([node()], rabbit_types:info_keys(), reference(), pid()) -> term(). emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) -> case rabbit_mqtt_ff:track_client_id_in_ra() of true -> @@ -57,6 +58,7 @@ emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) -> rabbit_control_misc:await_emitters_termination(Pids) end. +-spec emit_connection_info_local(rabbit_types:info_keys(), reference(), pid()) -> ok. emit_connection_info_local(Items, Ref, AggregatorPid) -> LocalPids = local_connection_pids(), emit_connection_info(Items, Ref, AggregatorPid, LocalPids). diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl index 28c0506ef7..5b5050d64e 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl @@ -36,6 +36,7 @@ register(ServerId, ClientId, Pid) -> erlang:send_after(5000, self(), {ra_event, undefined, register_timeout}), {ok, Corr}. +-spec unregister(binary(), pid()) -> ok. unregister(ClientId, Pid) -> {ClusterName, _} = mqtt_node:server_id(), case ra_leaderboard:lookup_leader(ClusterName) of @@ -49,6 +50,7 @@ unregister(ClientId, Pid) -> list_pids() -> list(fun(#machine_state{pids = Pids}) -> maps:keys(Pids) end). +-spec list() -> term(). list() -> list(fun(#machine_state{client_ids = Ids}) -> maps:to_list(Ids) end). @@ -76,6 +78,7 @@ list(QF) -> end end. +-spec leave(binary()) -> ok | timeout | nodedown. leave(NodeBin) -> Node = binary_to_atom(NodeBin, utf8), ServerId = mqtt_node:server_id(), diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_ff.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_ff.erl index b894cee42c..2432cc2ac5 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_ff.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_ff.erl @@ -24,5 +24,6 @@ callbacks => #{enable => {mqtt_node, delete}} }}). +-spec track_client_id_in_ra() -> boolean(). track_client_id_in_ra() -> not rabbit_feature_flags:is_enabled(delete_ra_cluster_mqtt_node). diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 6f5900aa79..58bccaa2f5 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -1329,9 +1329,13 @@ serialise_and_send_to_client(Packet, #state{cfg = #cfg{proto_ver = ProtoVer, [Sock, Error, Packet#mqtt_packet.fixed, Packet#mqtt_packet.variable]) end. +-spec serialise(#mqtt_packet{}, state()) -> + iodata(). serialise(Packet, #state{cfg = #cfg{proto_ver = ProtoVer}}) -> rabbit_mqtt_packet:serialise(Packet, ProtoVer). +-spec terminate(boolean(), binary(), atom(), state()) -> + ok. terminate(SendWill, ConnName, ProtoFamily, State) -> maybe_send_will(SendWill, ConnName, State), Infos = [{name, ConnName}, @@ -1427,11 +1431,15 @@ delete_queue(QName, Username) -> ok end). +-spec handle_pre_hibernate() -> ok. handle_pre_hibernate() -> erase(permission_cache), erase(topic_permission_cache), ok. +-spec handle_ra_event(register_timeout +| {applied, [{reference(), ok}]} +| {not_leader, term(), reference()}, state()) -> state(). handle_ra_event({applied, [{Corr, ok}]}, State = #state{register_state = {pending, Corr}}) -> %% success case - command was applied transition into registered state @@ -1796,6 +1804,7 @@ throttle(Conserve, Connected, #state{queues_soft_limit_exceeded = QSLE, not sets:is_empty(QSLE) orelse credit_flow:blocked(). +-spec info(rabbit_types:info_key(), state()) -> any(). info(host, #state{cfg = #cfg{host = Val}}) -> Val; info(port, #state{cfg = #cfg{port = Val}}) -> Val; info(peer_host, #state{cfg = #cfg{peer_host = Val}}) -> Val; diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_dets.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_dets.erl index b057e499ad..8dbd06a440 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_dets.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_dets.erl @@ -17,29 +17,37 @@ table }). +-type store_state() :: #store_state{}. +-spec new(file:name_all(), rabbit_types:vhost()) -> store_state(). new(Dir, VHost) -> Tid = open_table(Dir, VHost), #store_state{table = Tid}. +-spec recover(file:name_all(), rabbit_types:vhost()) -> + {error, uninitialized} | {ok, store_state()}. recover(Dir, VHost) -> case open_table(Dir, VHost) of {error, _} -> {error, uninitialized}; {ok, Tid} -> {ok, #store_state{table = Tid}} end. +-spec insert(binary(), mqtt_msg(), store_state()) -> ok. insert(Topic, Msg, #store_state{table = T}) -> ok = dets:insert(T, #retained_message{topic = Topic, mqtt_msg = Msg}). +-spec lookup(binary(), store_state()) -> retained_message() | not_found. lookup(Topic, #store_state{table = T}) -> case dets:lookup(T, Topic) of [] -> not_found; [Entry] -> Entry end. +-spec delete(binary(), store_state()) -> ok. delete(Topic, #store_state{table = T}) -> ok = dets:delete(T, Topic). +-spec terminate(store_state()) -> ok. terminate(#store_state{table = T}) -> ok = dets:close(T). diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_ets.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_ets.erl index e2f5831de4..3a0a7384db 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_ets.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_ets.erl @@ -19,7 +19,9 @@ filename }). +-type store_state() :: #store_state{}. +-spec new(file:name_all(), rabbit_types:vhost()) -> store_state(). new(Dir, VHost) -> Path = rabbit_mqtt_util:path_for(Dir, VHost), TableName = rabbit_mqtt_util:vhost_name_to_table_name(VHost), @@ -27,6 +29,8 @@ new(Dir, VHost) -> Tid = ets:new(TableName, [set, public, {keypos, #retained_message.topic}]), #store_state{table = Tid, filename = Path}. +-spec recover(file:name_all(), rabbit_types:vhost()) -> + {error, uninitialized} | {ok, store_state()}. recover(Dir, VHost) -> Path = rabbit_mqtt_util:path_for(Dir, VHost), case ets:file2tab(Path) of @@ -35,19 +39,23 @@ recover(Dir, VHost) -> {error, _} -> {error, uninitialized} end. +-spec insert(binary(), mqtt_msg(), store_state()) -> ok. insert(Topic, Msg, #store_state{table = T}) -> true = ets:insert(T, #retained_message{topic = Topic, mqtt_msg = Msg}), ok. +-spec lookup(binary(), store_state()) -> retained_message() | not_found. lookup(Topic, #store_state{table = T}) -> case ets:lookup(T, Topic) of [] -> not_found; [Entry] -> Entry end. +-spec delete(binary(), store_state()) -> ok. delete(Topic, #store_state{table = T}) -> true = ets:delete(T, Topic), ok. +-spec terminate(store_state()) -> ok. terminate(#store_state{table = T, filename = Path}) -> ok = ets:tab2file(T, Path, [{extended_info, [object_count]}]). diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer_sup.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer_sup.erl index 9e8c983a9b..eadaab7ca2 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer_sup.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer_sup.erl @@ -17,6 +17,7 @@ start_link(SupName) -> supervisor:start_link(SupName, ?MODULE, []). +-spec child_for_vhost(rabbit_types:vhost()) -> pid(). child_for_vhost(VHost) when is_binary(VHost) -> case rabbit_mqtt_retainer_sup:start_child(VHost) of {ok, Pid} -> Pid; diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_sup.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_sup.erl index 5d1a49b7b1..a04ef73cb9 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_sup.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_sup.erl @@ -70,6 +70,7 @@ init([{Listeners, SslListeners0}]) -> ) ]}}. +-spec stop_listeners() -> ok. stop_listeners() -> _ = rabbit_networking:stop_ranch_listener_of_protocol(?TCP_PROTOCOL), _ = rabbit_networking:stop_ranch_listener_of_protocol(?TLS_PROTOCOL), diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_util.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_util.erl index fa10d5a984..d098d3ff93 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_util.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_util.erl @@ -133,6 +133,7 @@ to_mqtt(T0) -> T2 = string:replace(T1, ".", "/", all), erlang:iolist_to_binary(T2). +-spec env(atom()) -> any(). env(Key) -> case application:get_env(?APP_NAME, Key) of {ok, Val} -> coerce_env_value(Key, Val); @@ -145,6 +146,8 @@ coerce_env_value(exchange, Val) -> rabbit_data_coercion:to_binary(Val); coerce_env_value(vhost, Val) -> rabbit_data_coercion:to_binary(Val); coerce_env_value(_, Val) -> Val. +-spec table_lookup(rabbit_framing:amqp_table() | undefined, binary()) -> + tuple() | undefined. table_lookup(undefined, _Key) -> undefined; table_lookup(Table, Key) -> @@ -156,13 +159,14 @@ vhost_name_to_dir_name(VHost, Suffix) -> <> = erlang:md5(VHost), "mqtt_retained_" ++ rabbit_misc:format("~36.16.0b", [Num]) ++ Suffix. +-spec path_for(file:name_all(), rabbit_types:vhost()) -> file:filename_all(). path_for(Dir, VHost) -> filename:join(Dir, vhost_name_to_dir_name(VHost)). +-spec path_for(file:name_all(), rabbit_types:vhost(), string()) -> file:filename_all(). path_for(Dir, VHost, Suffix) -> filename:join(Dir, vhost_name_to_dir_name(VHost, Suffix)). - -spec vhost_name_to_table_name(rabbit_types:vhost()) -> atom(). vhost_name_to_table_name(VHost) -> diff --git a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl index 756382b918..72f52b3977 100644 --- a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl +++ b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl @@ -28,17 +28,19 @@ upgrade/5, takeover/7]). +-type option(T) :: undefined | T. + -record(state, { - socket, + socket :: {rabbit_proxy_socket, any(), any()} | rabbit_net:socket(), parse_state = rabbit_mqtt_packet:initial_state() :: rabbit_mqtt_packet:state(), proc_state :: undefined | rabbit_mqtt_processor:state(), connection_state = running :: running | blocked, conserve = false :: boolean(), - stats_timer :: undefined | rabbit_event:state(), + stats_timer :: option(rabbit_event:state()), keepalive = rabbit_mqtt_keepalive:init() :: rabbit_mqtt_keepalive:state(), - conn_name :: undefined | binary(), + conn_name :: option(binary()), received_connect_packet = false :: boolean() - }). + }). -type state() :: #state{}. @@ -66,10 +68,6 @@ takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Handler, {HandlerState, {Handler, {HandlerState#state{socket = Sock}, PeerAddr}}). %% cowboy_websocket --spec init(Req, any()) -> - {ok | module(), Req, any()} | - {module(), Req, any(), any()} - when Req::cowboy_req:req(). init(Req, Opts) -> case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of undefined -> @@ -225,7 +223,6 @@ websocket_info(Msg, State) -> ?LOG_WARNING("Web MQTT: unexpected message ~tp", [Msg]), {[], State, hibernate}. --spec terminate(any(), cowboy_req:req(), any()) -> ok. terminate(_Reason, _Req, #state{proc_state = undefined}) -> ok; terminate(Reason, Request, #state{} = State) ->