diff --git a/deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.RestartShovelCommand.erl b/deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.RestartShovelCommand.erl index a1b762bba9..c8be462176 100644 --- a/deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.RestartShovelCommand.erl +++ b/deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.RestartShovelCommand.erl @@ -63,7 +63,7 @@ run([Name], #{node := Node, vhost := VHost}) -> undefined -> {error, rabbit_data_coercion:to_binary(ErrMsg)}; Match -> - {{_Name, _VHost}, _Type, {_State, Opts}, _Timestamp} = Match, + {{_Name, _VHost}, _Type, {_State, Opts}, _Metrics, _Timestamp} = Match, {_, HostingNode} = lists:keyfind(node, 1, Opts), case rabbit_misc:rpc_call( HostingNode, rabbit_shovel_util, restart_shovel, [VHost, Name]) of diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl index 67d092eaba..823dd481e9 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl @@ -83,7 +83,7 @@ -callback forward(Tag :: tag(), Props :: #{atom() => any()}, Payload :: binary(), state()) -> state() | {stop, any()}. --callback status(state()) -> rabbit_shovel_status:blocked_status() | ignore. +-callback status(state()) -> rabbit_shovel_status:shovel_status(). -spec parse(atom(), binary(), {source | destination, proplists:proplist()}) -> source_config() | dest_config(). @@ -155,12 +155,14 @@ ack(Tag, Multi, #{source := #{module := Mod}} = State) -> nack(Tag, Multi, #{source := #{module := Mod}} = State) -> Mod:nack(Tag, Multi, State). +-spec status(state()) -> {rabbit_shovel_status:shovel_status(), rabbit_shovel_status:metrics()}. status(#{dest := #{module := Mod}} = State) -> {Mod:status(State), metrics(State)}. incr_forwarded(State = #{dest := Dest}) -> State#{dest => maps:put(forwarded, maps:get(forwarded, Dest, 0) + 1, Dest)}. +-spec metrics(state()) -> rabbit_shovel_status:metrics(). metrics(_State = #{source := Source, dest := Dest}) -> #{remaining => maps:get(remaining, Source, unlimited), diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_status.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_status.erl index 75d35be1a3..e8b5800680 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_status.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_status.erl @@ -36,12 +36,18 @@ | {running, proplists:proplist()} | {terminated, term()}. -type blocked_status() :: running | flow | blocked. +-type shovel_status() :: blocked_status() | ignore. -type name() :: binary() | {rabbit_types:vhost(), binary()}. -type type() :: static | dynamic. --type status_tuple() :: {name(), type(), info(), calendar:datetime()}. +-type metrics() :: #{remaining := rabbit_types:option(non_neg_integer()) | unlimited, + remaining_unacked := rabbit_types:option(non_neg_integer()), + pending := rabbit_types:option(non_neg_integer()), + forwarded := rabbit_types:option(non_neg_integer()) + } | #{}. +-type status_tuple() :: {name(), type(), info(), metrics(), calendar:datetime()}. --export_type([info/0, blocked_status/0]). +-export_type([info/0, blocked_status/0, shovel_status/0, metrics/0]). -record(state, {timer}). -record(entry, {name :: name(), @@ -49,11 +55,7 @@ info :: info(), blocked_status = running :: blocked_status(), blocked_at :: integer() | undefined, - metrics :: #{remaining := rabbit_types:option(non_neg_integer()) | unlimited, - ramaining_unacked := rabbit_types:option(non_neg_integer()), - pending := rabbit_types:option(non_neg_integer()), - forwarded := rabbit_types:option(non_neg_integer()) - }, + metrics = #{} :: metrics(), timestamp :: calendar:datetime()}). @@ -64,7 +66,7 @@ start_link() -> report(Name, Type, Info) -> gen_server:cast(?SERVER, {report, Name, Type, Info, calendar:local_time()}). --spec report_blocked_status(name(), blocked_status()) -> ok. +-spec report_blocked_status(name(), {blocked_status(), metrics()} | blocked_status()) -> ok. report_blocked_status(Name, Status) -> gen_server:cast(?SERVER, {report_blocked_status, Name, Status, erlang:monotonic_time()}). diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl index 09d7aa38e7..541df58e13 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl @@ -21,7 +21,7 @@ -record(state, {name :: binary() | {rabbit_types:vhost(), binary()}, type :: static | dynamic, config :: rabbit_shovel_behaviour:state(), - last_reported_status = running :: rabbit_shovel_status:blocked_status()}). + last_reported_status = {running, #{}} :: {rabbit_shovel_status:blocked_status(), rabbit_shovel_status:metrics()}}). start_link(Type, Name, Config) -> ShovelParameter = rabbit_shovel_util:get_shovel_parameter(Name), @@ -224,7 +224,7 @@ human_readable_name(Name) -> maybe_report_blocked_status(#state{config = Config, last_reported_status = LastStatus} = State) -> case rabbit_shovel_behaviour:status(Config) of - ignore -> + {ignore, _} -> State; LastStatus -> State;