Merge pull request #14278 from rabbitmq/shovel-metrics
Test (make) / Build and Xref (1.18, 26) (push) Waiting to run Details
Test (make) / Build and Xref (1.18, 27) (push) Waiting to run Details
Test (make) / Build and Xref (1.18, 28) (push) Waiting to run Details
Test (make) / Test (1.18, 28, khepri) (push) Waiting to run Details
Test (make) / Test (1.18, 28, mnesia) (push) Waiting to run Details
Test (make) / Test mixed clusters (1.18, 28, khepri) (push) Waiting to run Details
Test (make) / Test mixed clusters (1.18, 28, mnesia) (push) Waiting to run Details
Test (make) / Type check (1.18, 28) (push) Waiting to run Details
Trigger a 4.2.x alpha release build / trigger_alpha_build (push) Has been cancelled Details

rabbitmqctl shovel_status: handle metrics
This commit is contained in:
Michael Klishin 2025-07-28 08:43:40 -04:00 committed by GitHub
commit d39e18f45c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 54 additions and 8 deletions

View File

@ -75,11 +75,12 @@ aliases() ->
[]. [].
output({stream, ShovelStatus}, _Opts) -> output({stream, ShovelStatus}, _Opts) ->
Formatted = [fmt_name(Name, Formatted = [fmt_metrics(Metrics,
fmt_name(Name,
fmt_status(Status, fmt_status(Status,
#{type => Type, #{type => Type,
last_changed => fmt_ts(Timestamp)})) last_changed => fmt_ts(Timestamp)})))
|| {Name, Type, Status, Timestamp} <- ShovelStatus], || {Name, Type, Status, Metrics, Timestamp} <- ShovelStatus],
{stream, Formatted}; {stream, Formatted};
output(E, _Opts) -> output(E, _Opts) ->
'Elixir.RabbitMQ.CLI.DefaultOutput':output(E). 'Elixir.RabbitMQ.CLI.DefaultOutput':output(E).
@ -129,3 +130,6 @@ details_to_map(Proplist) ->
{dest_exchange, destination_exchange}, {dest_exchange_key, destination_exchange_key}], {dest_exchange, destination_exchange}, {dest_exchange_key, destination_exchange_key}],
maps:from_list([{New, proplists:get_value(Old, Proplist)} maps:from_list([{New, proplists:get_value(Old, Proplist)}
|| {Old, New} <- Keys, proplists:is_defined(Old, Proplist)]). || {Old, New} <- Keys, proplists:is_defined(Old, Proplist)]).
fmt_metrics(Metrics, Map) ->
maps:merge(Metrics, Map).

View File

@ -8,6 +8,7 @@
-module(shovel_status_command_SUITE). -module(shovel_status_command_SUITE).
-include_lib("amqp_client/include/amqp_client.hrl"). -include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("eunit/include/eunit.hrl").
-compile(export_all). -compile(export_all).
@ -26,7 +27,8 @@ groups() ->
run_starting, run_starting,
output_starting, output_starting,
run_running, run_running,
output_running output_running,
e2e
]} ]}
]. ].
@ -95,8 +97,14 @@ output_starting(Config) ->
[A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), [A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Opts = #{node => A}, Opts = #{node => A},
{stream, [#{vhost := <<"/">>, name := <<"test">>, type := dynamic, {stream, [#{vhost := <<"/">>, name := <<"test">>, type := dynamic,
state := starting, last_changed := <<"2016-11-17 10:00:00">>}]} state := starting, last_changed := <<"2016-11-17 10:00:00">>,
remaining := 10, remaining_unacked := 8,
pending := 5, forwarded := 3}]}
= ?CMD:output({stream, [{{<<"/">>, <<"test">>}, dynamic, starting, = ?CMD:output({stream, [{{<<"/">>, <<"test">>}, dynamic, starting,
#{remaining => 10,
remaining_unacked => 8,
pending => 5,
forwarded => 3},
{{2016, 11, 17}, {10, 00, 00}}}]}, Opts), {{2016, 11, 17}, {10, 00, 00}}}]}, Opts),
shovel_test_utils:clear_param(Config, <<"test">>). shovel_test_utils:clear_param(Config, <<"test">>).
@ -118,9 +126,43 @@ output_running(Config) ->
state := running, source := <<"amqp://server-1">>, state := running, source := <<"amqp://server-1">>,
destination := <<"amqp://server-2">>, destination := <<"amqp://server-2">>,
termination_reason := <<>>, termination_reason := <<>>,
last_changed := <<"2016-11-17 10:00:00">>}]} = last_changed := <<"2016-11-17 10:00:00">>,
remaining := 10,
remaining_unacked := 8,
pending := 5,
forwarded := 3}]} =
?CMD:output({stream, [{{<<"/">>, <<"test">>}, dynamic, ?CMD:output({stream, [{{<<"/">>, <<"test">>}, dynamic,
{running, [{src_uri, <<"amqp://server-1">>}, {running, [{src_uri, <<"amqp://server-1">>},
{dest_uri, <<"amqp://server-2">>}]}, {dest_uri, <<"amqp://server-2">>}]},
#{remaining => 10,
remaining_unacked => 8,
pending => 5,
forwarded => 3},
{{2016, 11, 17}, {10, 00, 00}}}]}, Opts), {{2016, 11, 17}, {10, 00, 00}}}]}, Opts),
shovel_test_utils:clear_param(Config, <<"test">>). shovel_test_utils:clear_param(Config, <<"test">>).
e2e(Config) ->
shovel_test_utils:set_param_nowait(
Config,
<<"test">>, [{<<"src-queue">>, <<"src">>},
{<<"dest-queue">>, <<"dest">>}]),
{ok, StdOut} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, [<<"shovel_status">>]),
[Msg, Headers0, Shovel0] = re:split(StdOut, <<"\n">>, [trim]),
?assertMatch(match, re:run(Msg, "Shovel status on node", [{capture, none}])),
Headers = re:split(Headers0, <<"\t">>, [trim]),
ExpectedHeaders = [<<"name">>, <<"vhost">>, <<"type">>, <<"state">>,
<<"source">>, <<"destination">>, <<"termination_reason">>,
<<"destination_protocol">>, <<"source_protocol">>,
<<"last_changed">>, <<"source_queue">>, <<"destination_queue">>,
<<"remaining">>, <<"remaining_unacked">>,
<<"pending">>, <<"forwarded">>],
?assert(lists:all(fun(H) ->
lists:member(H, Headers)
end, ExpectedHeaders)),
%% Check some values are there
ExpectedValues = [<<"test">>, <<"dynamic">>, <<"running">>],
Shovel = re:split(Shovel0, <<"\t">>, [trim]),
?assert(lists:all(fun(V) ->
lists:member(V, Shovel)
end, ExpectedValues)),
shovel_test_utils:clear_param(Config, <<"test">>).