RMQ-1263: Shovels forward counter - fix dialyzer

(cherry picked from commit af22cf427a7054d93b3dd64fda01a86649fdd7c5)
This commit is contained in:
Iliia Khaprov 2025-03-17 21:36:43 +01:00 committed by Michael Klishin
parent d4c1121c77
commit c2569d26f2
No known key found for this signature in database
GPG Key ID: FF4F6501646A9C9A
4 changed files with 16 additions and 12 deletions

View File

@ -63,7 +63,7 @@ run([Name], #{node := Node, vhost := VHost}) ->
undefined -> undefined ->
{error, rabbit_data_coercion:to_binary(ErrMsg)}; {error, rabbit_data_coercion:to_binary(ErrMsg)};
Match -> Match ->
{{_Name, _VHost}, _Type, {_State, Opts}, _Timestamp} = Match, {{_Name, _VHost}, _Type, {_State, Opts}, _Metrics, _Timestamp} = Match,
{_, HostingNode} = lists:keyfind(node, 1, Opts), {_, HostingNode} = lists:keyfind(node, 1, Opts),
case rabbit_misc:rpc_call( case rabbit_misc:rpc_call(
HostingNode, rabbit_shovel_util, restart_shovel, [VHost, Name]) of HostingNode, rabbit_shovel_util, restart_shovel, [VHost, Name]) of

View File

@ -83,7 +83,7 @@
-callback forward(Tag :: tag(), Props :: #{atom() => any()}, -callback forward(Tag :: tag(), Props :: #{atom() => any()},
Payload :: binary(), state()) -> Payload :: binary(), state()) ->
state() | {stop, any()}. state() | {stop, any()}.
-callback status(state()) -> rabbit_shovel_status:blocked_status() | ignore. -callback status(state()) -> rabbit_shovel_status:shovel_status().
-spec parse(atom(), binary(), {source | destination, proplists:proplist()}) -> -spec parse(atom(), binary(), {source | destination, proplists:proplist()}) ->
source_config() | dest_config(). source_config() | dest_config().
@ -155,12 +155,14 @@ ack(Tag, Multi, #{source := #{module := Mod}} = State) ->
nack(Tag, Multi, #{source := #{module := Mod}} = State) -> nack(Tag, Multi, #{source := #{module := Mod}} = State) ->
Mod:nack(Tag, Multi, State). Mod:nack(Tag, Multi, State).
-spec status(state()) -> {rabbit_shovel_status:shovel_status(), rabbit_shovel_status:metrics()}.
status(#{dest := #{module := Mod}} = State) -> status(#{dest := #{module := Mod}} = State) ->
{Mod:status(State), metrics(State)}. {Mod:status(State), metrics(State)}.
incr_forwarded(State = #{dest := Dest}) -> incr_forwarded(State = #{dest := Dest}) ->
State#{dest => maps:put(forwarded, maps:get(forwarded, Dest, 0) + 1, Dest)}. State#{dest => maps:put(forwarded, maps:get(forwarded, Dest, 0) + 1, Dest)}.
-spec metrics(state()) -> rabbit_shovel_status:metrics().
metrics(_State = #{source := Source, metrics(_State = #{source := Source,
dest := Dest}) -> dest := Dest}) ->
#{remaining => maps:get(remaining, Source, unlimited), #{remaining => maps:get(remaining, Source, unlimited),

View File

@ -36,12 +36,18 @@
| {running, proplists:proplist()} | {running, proplists:proplist()}
| {terminated, term()}. | {terminated, term()}.
-type blocked_status() :: running | flow | blocked. -type blocked_status() :: running | flow | blocked.
-type shovel_status() :: blocked_status() | ignore.
-type name() :: binary() | {rabbit_types:vhost(), binary()}. -type name() :: binary() | {rabbit_types:vhost(), binary()}.
-type type() :: static | dynamic. -type type() :: static | dynamic.
-type status_tuple() :: {name(), type(), info(), calendar:datetime()}. -type metrics() :: #{remaining := rabbit_types:option(non_neg_integer()) | unlimited,
remaining_unacked := rabbit_types:option(non_neg_integer()),
pending := rabbit_types:option(non_neg_integer()),
forwarded := rabbit_types:option(non_neg_integer())
} | #{}.
-type status_tuple() :: {name(), type(), info(), metrics(), calendar:datetime()}.
-export_type([info/0, blocked_status/0]). -export_type([info/0, blocked_status/0, shovel_status/0, metrics/0]).
-record(state, {timer}). -record(state, {timer}).
-record(entry, {name :: name(), -record(entry, {name :: name(),
@ -49,11 +55,7 @@
info :: info(), info :: info(),
blocked_status = running :: blocked_status(), blocked_status = running :: blocked_status(),
blocked_at :: integer() | undefined, blocked_at :: integer() | undefined,
metrics :: #{remaining := rabbit_types:option(non_neg_integer()) | unlimited, metrics = #{} :: metrics(),
ramaining_unacked := rabbit_types:option(non_neg_integer()),
pending := rabbit_types:option(non_neg_integer()),
forwarded := rabbit_types:option(non_neg_integer())
},
timestamp :: calendar:datetime()}). timestamp :: calendar:datetime()}).
@ -64,7 +66,7 @@ start_link() ->
report(Name, Type, Info) -> report(Name, Type, Info) ->
gen_server:cast(?SERVER, {report, Name, Type, Info, calendar:local_time()}). gen_server:cast(?SERVER, {report, Name, Type, Info, calendar:local_time()}).
-spec report_blocked_status(name(), blocked_status()) -> ok. -spec report_blocked_status(name(), {blocked_status(), metrics()} | blocked_status()) -> ok.
report_blocked_status(Name, Status) -> report_blocked_status(Name, Status) ->
gen_server:cast(?SERVER, {report_blocked_status, Name, Status, erlang:monotonic_time()}). gen_server:cast(?SERVER, {report_blocked_status, Name, Status, erlang:monotonic_time()}).

View File

@ -21,7 +21,7 @@
-record(state, {name :: binary() | {rabbit_types:vhost(), binary()}, -record(state, {name :: binary() | {rabbit_types:vhost(), binary()},
type :: static | dynamic, type :: static | dynamic,
config :: rabbit_shovel_behaviour:state(), config :: rabbit_shovel_behaviour:state(),
last_reported_status = running :: rabbit_shovel_status:blocked_status()}). last_reported_status = {running, #{}} :: {rabbit_shovel_status:blocked_status(), rabbit_shovel_status:metrics()}}).
start_link(Type, Name, Config) -> start_link(Type, Name, Config) ->
ShovelParameter = rabbit_shovel_util:get_shovel_parameter(Name), ShovelParameter = rabbit_shovel_util:get_shovel_parameter(Name),
@ -224,7 +224,7 @@ human_readable_name(Name) ->
maybe_report_blocked_status(#state{config = Config, maybe_report_blocked_status(#state{config = Config,
last_reported_status = LastStatus} = State) -> last_reported_status = LastStatus} = State) ->
case rabbit_shovel_behaviour:status(Config) of case rabbit_shovel_behaviour:status(Config) of
ignore -> {ignore, _} ->
State; State;
LastStatus -> LastStatus ->
State; State;