diff --git a/deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteShovelCommand.erl b/deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteShovelCommand.erl index 0529e6a207..6c8a030065 100644 --- a/deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteShovelCommand.erl +++ b/deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteShovelCommand.erl @@ -77,7 +77,7 @@ run([Name], #{node := Node, vhost := VHost}) -> try_force_removing(Node, VHost, Name, ActingUser), {error, rabbit_data_coercion:to_binary(ErrMsg)}; Match -> - {{_Name, _VHost}, _Type, {_State, Opts}, _Timestamp} = Match, + {{_Name, _VHost}, _Type, {_State, Opts}, _Metrics, _Timestamp} = Match, {_, HostingNode} = lists:keyfind(node, 1, Opts), case rabbit_misc:rpc_call( HostingNode, rabbit_shovel_util, delete_shovel, [VHost, Name, ActingUser]) of diff --git a/deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.RestartShovelCommand.erl b/deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.RestartShovelCommand.erl index a1b762bba9..c8be462176 100644 --- a/deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.RestartShovelCommand.erl +++ b/deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.RestartShovelCommand.erl @@ -63,7 +63,7 @@ run([Name], #{node := Node, vhost := VHost}) -> undefined -> {error, rabbit_data_coercion:to_binary(ErrMsg)}; Match -> - {{_Name, _VHost}, _Type, {_State, Opts}, _Timestamp} = Match, + {{_Name, _VHost}, _Type, {_State, Opts}, _Metrics, _Timestamp} = Match, {_, HostingNode} = lists:keyfind(node, 1, Opts), case rabbit_misc:rpc_call( HostingNode, rabbit_shovel_util, restart_shovel, [VHost, Name]) of diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl index 1cc53f8d7f..1740e7aad2 100644 --- a/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl @@ -365,15 +365,17 @@ publish(IncomingTag, Method, Msg, ok = amqp_channel:call(OutboundChan, Method, Msg) end, + #{dest := Dst1} = State1 = rabbit_shovel_behaviour:incr_forwarded(State), + rabbit_shovel_behaviour:decr_remaining_unacked( case AckMode of no_ack -> - rabbit_shovel_behaviour:decr_remaining(1, State); + rabbit_shovel_behaviour:decr_remaining(1, State1); on_confirm -> - State#{dest => Dst#{unacked => Unacked#{Seq => IncomingTag}}}; + State1#{dest => Dst1#{unacked => Unacked#{Seq => IncomingTag}}}; on_publish -> - State1 = rabbit_shovel_behaviour:ack(IncomingTag, false, State), - rabbit_shovel_behaviour:decr_remaining(1, State1) + State2 = rabbit_shovel_behaviour:ack(IncomingTag, false, State1), + rabbit_shovel_behaviour:decr_remaining(1, State2) end). control_throttle(State) -> diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl index eef7906033..823dd481e9 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl @@ -30,7 +30,8 @@ status/1, % common functions decr_remaining_unacked/1, - decr_remaining/2 + decr_remaining/2, + incr_forwarded/1 ]). -type tag() :: non_neg_integer(). @@ -82,7 +83,7 @@ -callback forward(Tag :: tag(), Props :: #{atom() => any()}, Payload :: binary(), state()) -> state() | {stop, any()}. --callback status(state()) -> rabbit_shovel_status:blocked_status() | ignore. +-callback status(state()) -> rabbit_shovel_status:shovel_status(). -spec parse(atom(), binary(), {source | destination, proplists:proplist()}) -> source_config() | dest_config(). @@ -154,8 +155,21 @@ ack(Tag, Multi, #{source := #{module := Mod}} = State) -> nack(Tag, Multi, #{source := #{module := Mod}} = State) -> Mod:nack(Tag, Multi, State). +-spec status(state()) -> {rabbit_shovel_status:shovel_status(), rabbit_shovel_status:metrics()}. status(#{dest := #{module := Mod}} = State) -> - Mod:status(State). + {Mod:status(State), metrics(State)}. + +incr_forwarded(State = #{dest := Dest}) -> + State#{dest => maps:put(forwarded, maps:get(forwarded, Dest, 0) + 1, Dest)}. + +-spec metrics(state()) -> rabbit_shovel_status:metrics(). +metrics(_State = #{source := Source, + dest := Dest}) -> + #{remaining => maps:get(remaining, Source, unlimited), + remaining_unacked => maps:get(remaining_unacked, Source, 0), + pending => maps:get(pending, Dest, 0), + forwarded => maps:get(forwarded, Dest, 0)}. + %% Common functions diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_status.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_status.erl index 0612b6c07e..e8b5800680 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_status.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_status.erl @@ -36,12 +36,18 @@ | {running, proplists:proplist()} | {terminated, term()}. -type blocked_status() :: running | flow | blocked. +-type shovel_status() :: blocked_status() | ignore. -type name() :: binary() | {rabbit_types:vhost(), binary()}. -type type() :: static | dynamic. --type status_tuple() :: {name(), type(), info(), calendar:datetime()}. +-type metrics() :: #{remaining := rabbit_types:option(non_neg_integer()) | unlimited, + remaining_unacked := rabbit_types:option(non_neg_integer()), + pending := rabbit_types:option(non_neg_integer()), + forwarded := rabbit_types:option(non_neg_integer()) + } | #{}. +-type status_tuple() :: {name(), type(), info(), metrics(), calendar:datetime()}. --export_type([info/0, blocked_status/0]). +-export_type([info/0, blocked_status/0, shovel_status/0, metrics/0]). -record(state, {timer}). -record(entry, {name :: name(), @@ -49,6 +55,8 @@ info :: info(), blocked_status = running :: blocked_status(), blocked_at :: integer() | undefined, + metrics = #{} :: metrics(), + timestamp :: calendar:datetime()}). start_link() -> @@ -58,7 +66,7 @@ start_link() -> report(Name, Type, Info) -> gen_server:cast(?SERVER, {report, Name, Type, Info, calendar:local_time()}). --spec report_blocked_status(name(), blocked_status()) -> ok. +-spec report_blocked_status(name(), {blocked_status(), metrics()} | blocked_status()) -> ok. report_blocked_status(Name, Status) -> gen_server:cast(?SERVER, {report_blocked_status, Name, Status, erlang:monotonic_time()}). @@ -112,6 +120,7 @@ handle_call(status, _From, State) -> {reply, [{Entry#entry.name, Entry#entry.type, blocked_status_to_info(Entry), + Entry#entry.metrics, Entry#entry.timestamp} || Entry <- Entries], State}; @@ -120,6 +129,7 @@ handle_call({lookup, Name}, _From, State) -> [Entry] -> [{name, Name}, {type, Entry#entry.type}, {info, blocked_status_to_info(Entry)}, + {metrics, Entry#entry.metrics}, {timestamp, Entry#entry.timestamp}]; [] -> not_found end, @@ -141,6 +151,18 @@ handle_cast({report, Name, Type, Info, Timestamp}, State) -> split_name(Name) ++ split_status(Info)), {noreply, State}; +handle_cast({report_blocked_status, Name, {Status, Metrics}, Timestamp}, State) -> + case Status of + flow -> + true = ets:update_element(?ETS_NAME, Name, [{#entry.blocked_status, flow}, + {#entry.metrics, Metrics}, + {#entry.blocked_at, Timestamp}]); + _ -> + true = ets:update_element(?ETS_NAME, Name, [{#entry.blocked_status, Status}, + {#entry.metrics, Metrics}]) + end, + {noreply, State}; +%% used in tests handle_cast({report_blocked_status, Name, Status, Timestamp}, State) -> case Status of flow -> @@ -178,22 +200,22 @@ code_change(_OldVsn, State, _Extra) -> inject_node_info(Node, Shovels) -> lists:map( %% starting - fun({Name, Type, State, Timestamp}) when is_atom(State) -> + fun({Name, Type, State, Metrics, Timestamp}) when is_atom(State) -> Opts = [{node, Node}], - {Name, Type, {State, Opts}, Timestamp}; + {Name, Type, {State, Opts}, Metrics, Timestamp}; %% terminated - ({Name, Type, {terminated, Reason}, Timestamp}) -> - {Name, Type, {terminated, Reason}, Timestamp}; + ({Name, Type, {terminated, Reason}, Metrics, Timestamp}) -> + {Name, Type, {terminated, Reason}, Metrics, Timestamp}; %% running - ({Name, Type, {State, Opts}, Timestamp}) -> + ({Name, Type, {State, Opts}, Metrics, Timestamp}) -> Opts1 = Opts ++ [{node, Node}], - {Name, Type, {State, Opts1}, Timestamp} + {Name, Type, {State, Opts1}, Metrics, Timestamp} end, Shovels). -spec find_matching_shovel(rabbit_types:vhost(), binary(), [status_tuple()]) -> status_tuple() | undefined. find_matching_shovel(VHost, Name, Shovels) -> case lists:filter( - fun ({{V, S}, _Kind, _Status, _}) -> + fun ({{V, S}, _Kind, _Status, _Metrics, _}) -> VHost =:= V andalso Name =:= S end, Shovels) of [] -> undefined; diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl index 09d7aa38e7..541df58e13 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl @@ -21,7 +21,7 @@ -record(state, {name :: binary() | {rabbit_types:vhost(), binary()}, type :: static | dynamic, config :: rabbit_shovel_behaviour:state(), - last_reported_status = running :: rabbit_shovel_status:blocked_status()}). + last_reported_status = {running, #{}} :: {rabbit_shovel_status:blocked_status(), rabbit_shovel_status:metrics()}}). start_link(Type, Name, Config) -> ShovelParameter = rabbit_shovel_util:get_shovel_parameter(Name), @@ -224,7 +224,7 @@ human_readable_name(Name) -> maybe_report_blocked_status(#state{config = Config, last_reported_status = LastStatus} = State) -> case rabbit_shovel_behaviour:status(Config) of - ignore -> + {ignore, _} -> State; LastStatus -> State; diff --git a/deps/rabbitmq_shovel/test/amqp10_SUITE.erl b/deps/rabbitmq_shovel/test/amqp10_SUITE.erl index 5ecf53279c..937d37037c 100644 --- a/deps/rabbitmq_shovel/test/amqp10_SUITE.erl +++ b/deps/rabbitmq_shovel/test/amqp10_SUITE.erl @@ -139,7 +139,7 @@ amqp10_destination(Config, AckMode) -> throw(timeout_waiting_for_deliver1) end, - [{test_shovel, static, {running, _Info}, _Time}] = + [{test_shovel, static, {running, _Info}, _Metrics, _Time}] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, status, []), amqp10_client:detach_link(Receiver), @@ -183,7 +183,7 @@ amqp10_source(Config, AckMode) -> after ?TIMEOUT -> throw(timeout_waiting_for_deliver1) end, - [{test_shovel, static, {running, _Info}, _Time}] = + [{test_shovel, static, {running, _Info}, _Metrics, _Time}] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, status, []), rabbit_ct_client_helpers:close_channel(Chan). @@ -267,7 +267,7 @@ setup_shovel(ShovelConfig) -> await_running_shovel(test_shovel). await_running_shovel(Name) -> - case [N || {N, _, {running, _}, _} + case [N || {N, _, {running, _}, _, _} <- rabbit_shovel_status:status(), N =:= Name] of [_] -> ok; diff --git a/deps/rabbitmq_shovel/test/configuration_SUITE.erl b/deps/rabbitmq_shovel/test/configuration_SUITE.erl index a0f9385e95..603243966f 100644 --- a/deps/rabbitmq_shovel/test/configuration_SUITE.erl +++ b/deps/rabbitmq_shovel/test/configuration_SUITE.erl @@ -277,7 +277,7 @@ run_valid_test(Config) -> after ?TIMEOUT -> throw(timeout_waiting_for_deliver1) end, - [{test_shovel, static, {running, _Info}, _Time}] = + [{test_shovel, static, {running, _Info}, _Metrics, _Time}] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, status, []), @@ -407,7 +407,7 @@ setup_shovels2(Config) -> ok = application:start(rabbitmq_shovel). await_running_shovel(Name) -> - case [N || {N, _, {running, _}, _} + case [N || {N, _, {running, _}, _Metrics, _} <- rabbit_shovel_status:status(), N =:= Name] of [_] -> ok; @@ -415,7 +415,7 @@ await_running_shovel(Name) -> await_running_shovel(Name) end. await_terminated_shovel(Name) -> - case [N || {N, _, {terminated, _}, _} + case [N || {N, _, {terminated, _}, _Metrics, _} <- rabbit_shovel_status:status(), N =:= Name] of [_] -> ok; diff --git a/deps/rabbitmq_shovel/test/dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/dynamic_SUITE.erl index 554f25393f..e6e21e02dd 100644 --- a/deps/rabbitmq_shovel/test/dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/dynamic_SUITE.erl @@ -118,13 +118,17 @@ end_per_testcase(Testcase, Config) -> %% ------------------------------------------------------------------- simple(Config) -> + Name = <<"test">>, with_ch(Config, fun (Ch) -> shovel_test_utils:set_param( Config, - <<"test">>, [{<<"src-queue">>, <<"src">>}, + Name, [{<<"src-queue">>, <<"src">>}, {<<"dest-queue">>, <<"dest">>}]), - publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>) + publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>), + Status = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, lookup, [{<<"/">>, Name}]), + ?assertMatch([_|_], Status), + ?assertMatch(#{metrics := #{forwarded := 1}}, maps:from_list(Status)) end). quorum_queues(Config) -> diff --git a/deps/rabbitmq_shovel/test/shovel_status_command_SUITE.erl b/deps/rabbitmq_shovel/test/shovel_status_command_SUITE.erl index 26fc2aa664..a4bbbb29b9 100644 --- a/deps/rabbitmq_shovel/test/shovel_status_command_SUITE.erl +++ b/deps/rabbitmq_shovel/test/shovel_status_command_SUITE.erl @@ -82,11 +82,11 @@ run_starting(Config) -> [A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Opts = #{node => A}, case ?CMD:run([], Opts) of - {stream, [{{<<"/">>, <<"test">>}, dynamic, starting, _}]} -> + {stream, [{{<<"/">>, <<"test">>}, dynamic, starting, _, _}]} -> ok; {stream, []} -> throw(shovel_not_found); - {stream, [{{<<"/">>, <<"test">>}, dynamic, {running, _}, _}]} -> + {stream, [{{<<"/">>, <<"test">>}, dynamic, {running, _}, _, _}]} -> ct:pal("Shovel is already running, starting could not be tested!") end, shovel_test_utils:clear_param(Config, <<"test">>). @@ -107,7 +107,7 @@ run_running(Config) -> {<<"dest-queue">>, <<"dest">>}]), [A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Opts = #{node => A}, - {stream, [{{<<"/">>, <<"test">>}, dynamic, {running, _}, _}]} + {stream, [{{<<"/">>, <<"test">>}, dynamic, {running, _}, _, _}]} = ?CMD:run([], Opts), shovel_test_utils:clear_param(Config, <<"test">>). diff --git a/deps/rabbitmq_shovel/test/shovel_test_utils.erl b/deps/rabbitmq_shovel/test/shovel_test_utils.erl index 3107f2ecbc..b3593c4d99 100644 --- a/deps/rabbitmq_shovel/test/shovel_test_utils.erl +++ b/deps/rabbitmq_shovel/test/shovel_test_utils.erl @@ -65,7 +65,8 @@ shovels_from_status() -> shovels_from_status(ExpectedState) -> S = rabbit_shovel_status:status(), - [N || {{<<"/">>, N}, dynamic, {State, _}, _} <- S, State == ExpectedState]. + [N || {{<<"/">>, N}, dynamic, {State, _}, _, _} <- S, State == ExpectedState] ++ + [N || {{<<"/">>, N}, dynamic, {State, _}, _} <- S, State == ExpectedState]. get_shovel_status(Config, Name) -> get_shovel_status(Config, 0, Name). @@ -111,4 +112,4 @@ restart_shovel(Config, Name) -> restart_shovel(Config, Node, Name) -> rabbit_ct_broker_helpers:rpc(Config, - Node, rabbit_shovel_util, restart_shovel, [<<"/">>, Name]). \ No newline at end of file + Node, rabbit_shovel_util, restart_shovel, [<<"/">>, Name]). diff --git a/deps/rabbitmq_shovel_management/src/rabbit_shovel_mgmt_util.erl b/deps/rabbitmq_shovel_management/src/rabbit_shovel_mgmt_util.erl index b6f5a04c5f..0b05bda1e5 100644 --- a/deps/rabbitmq_shovel_management/src/rabbit_shovel_mgmt_util.erl +++ b/deps/rabbitmq_shovel_management/src/rabbit_shovel_mgmt_util.erl @@ -42,7 +42,7 @@ status(Node) -> [format(Node, I) || I <- Status] end. -format(Node, {Name, Type, Info, TS}) -> +format(Node, {Name, Type, Info, _Metrics, TS}) -> [{node, Node}, {timestamp, format_ts(TS)}] ++ format_name(Type, Name) ++ format_info(Info). diff --git a/deps/rabbitmq_shovel_prometheus/src/rabbit_shovel_prometheus_collector.erl b/deps/rabbitmq_shovel_prometheus/src/rabbit_shovel_prometheus_collector.erl index 13ad734ac0..dbe2e2f97b 100644 --- a/deps/rabbitmq_shovel_prometheus/src/rabbit_shovel_prometheus_collector.erl +++ b/deps/rabbitmq_shovel_prometheus/src/rabbit_shovel_prometheus_collector.erl @@ -29,9 +29,9 @@ deregister_cleanup(_) -> ok. collect_mf(_Registry, Callback) -> Status = rabbit_shovel_status:status(500), - {StaticStatusGroups, DynamicStatusGroups} = lists:foldl(fun({_,static,{S, _}, _}, {SMap, DMap}) -> + {StaticStatusGroups, DynamicStatusGroups} = lists:foldl(fun({_,static,{S, _}, _, _}, {SMap, DMap}) -> {maps:update_with(S, fun(C) -> C + 1 end, 1, SMap), DMap}; - ({_,dynamic,{S, _}, _}, {SMap, DMap}) -> + ({_,dynamic,{S, _}, _, _}, {SMap, DMap}) -> {SMap, maps:update_with(S, fun(C) -> C + 1 end, 1, DMap)} end, {#{}, #{}}, Status), diff --git a/deps/rabbitmq_shovel_prometheus/test/prometheus_rabbitmq_shovel_collector_SUITE.erl b/deps/rabbitmq_shovel_prometheus/test/prometheus_rabbitmq_shovel_collector_SUITE.erl index 495f23e24c..10ca7cd17c 100644 --- a/deps/rabbitmq_shovel_prometheus/test/prometheus_rabbitmq_shovel_collector_SUITE.erl +++ b/deps/rabbitmq_shovel_prometheus/test/prometheus_rabbitmq_shovel_collector_SUITE.erl @@ -226,10 +226,10 @@ await_shovel(Name, Type) -> shovels_from_status(ExpectedState, dynamic) -> S = rabbit_shovel_status:status(), - [N || {{<<"/">>, N}, dynamic, {State, _}, _} <- S, State == ExpectedState]; + [N || {{<<"/">>, N}, dynamic, {State, _}, _, _} <- S, State == ExpectedState]; shovels_from_status(ExpectedState, static) -> S = rabbit_shovel_status:status(), - [N || {N, static, {State, _}, _} <- S, State == ExpectedState]. + [N || {N, static, {State, _}, _, _} <- S, State == ExpectedState]. get_shovel_status(Config, Name) -> get_shovel_status(Config, 0, Name).