From 1a06bc01cde0448de38b05bebdaa97744af137c8 Mon Sep 17 00:00:00 2001 From: Diana Parra Corbacho Date: Thu, 24 Jul 2025 12:45:15 +0200 Subject: [PATCH 1/2] Rabbitmqctl shovel_status: handle metrics The ctl command got broken when metrics were added to the status. As we have the metrics now, we just report them. --- ...bbitMQ.CLI.Ctl.Commands.ShovelStatusCommand.erl | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ShovelStatusCommand.erl b/deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ShovelStatusCommand.erl index 5e17608f61..1d37cd66c2 100644 --- a/deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ShovelStatusCommand.erl +++ b/deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ShovelStatusCommand.erl @@ -75,11 +75,12 @@ aliases() -> []. output({stream, ShovelStatus}, _Opts) -> - Formatted = [fmt_name(Name, - fmt_status(Status, - #{type => Type, - last_changed => fmt_ts(Timestamp)})) - || {Name, Type, Status, Timestamp} <- ShovelStatus], + Formatted = [fmt_metrics(Metrics, + fmt_name(Name, + fmt_status(Status, + #{type => Type, + last_changed => fmt_ts(Timestamp)}))) + || {Name, Type, Status, Metrics, Timestamp} <- ShovelStatus], {stream, Formatted}; output(E, _Opts) -> 'Elixir.RabbitMQ.CLI.DefaultOutput':output(E). @@ -129,3 +130,6 @@ details_to_map(Proplist) -> {dest_exchange, destination_exchange}, {dest_exchange_key, destination_exchange_key}], maps:from_list([{New, proplists:get_value(Old, Proplist)} || {Old, New} <- Keys, proplists:is_defined(Old, Proplist)]). + +fmt_metrics(Metrics, Map) -> + maps:merge(Metrics, Map). From 2a0401633e7e6139f87175bb837f1f4d104f55ca Mon Sep 17 00:00:00 2001 From: Diana Parra Corbacho Date: Mon, 28 Jul 2025 13:22:37 +0200 Subject: [PATCH 2/2] Shovel: status tests --- .../test/shovel_status_command_SUITE.erl | 48 +++++++++++++++++-- 1 file changed, 45 insertions(+), 3 deletions(-) diff --git a/deps/rabbitmq_shovel/test/shovel_status_command_SUITE.erl b/deps/rabbitmq_shovel/test/shovel_status_command_SUITE.erl index a4bbbb29b9..9d2ec522c0 100644 --- a/deps/rabbitmq_shovel/test/shovel_status_command_SUITE.erl +++ b/deps/rabbitmq_shovel/test/shovel_status_command_SUITE.erl @@ -8,6 +8,7 @@ -module(shovel_status_command_SUITE). -include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("eunit/include/eunit.hrl"). -compile(export_all). @@ -26,7 +27,8 @@ groups() -> run_starting, output_starting, run_running, - output_running + output_running, + e2e ]} ]. @@ -95,8 +97,14 @@ output_starting(Config) -> [A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Opts = #{node => A}, {stream, [#{vhost := <<"/">>, name := <<"test">>, type := dynamic, - state := starting, last_changed := <<"2016-11-17 10:00:00">>}]} + state := starting, last_changed := <<"2016-11-17 10:00:00">>, + remaining := 10, remaining_unacked := 8, + pending := 5, forwarded := 3}]} = ?CMD:output({stream, [{{<<"/">>, <<"test">>}, dynamic, starting, + #{remaining => 10, + remaining_unacked => 8, + pending => 5, + forwarded => 3}, {{2016, 11, 17}, {10, 00, 00}}}]}, Opts), shovel_test_utils:clear_param(Config, <<"test">>). @@ -118,9 +126,43 @@ output_running(Config) -> state := running, source := <<"amqp://server-1">>, destination := <<"amqp://server-2">>, termination_reason := <<>>, - last_changed := <<"2016-11-17 10:00:00">>}]} = + last_changed := <<"2016-11-17 10:00:00">>, + remaining := 10, + remaining_unacked := 8, + pending := 5, + forwarded := 3}]} = ?CMD:output({stream, [{{<<"/">>, <<"test">>}, dynamic, {running, [{src_uri, <<"amqp://server-1">>}, {dest_uri, <<"amqp://server-2">>}]}, + #{remaining => 10, + remaining_unacked => 8, + pending => 5, + forwarded => 3}, {{2016, 11, 17}, {10, 00, 00}}}]}, Opts), shovel_test_utils:clear_param(Config, <<"test">>). + +e2e(Config) -> + shovel_test_utils:set_param_nowait( + Config, + <<"test">>, [{<<"src-queue">>, <<"src">>}, + {<<"dest-queue">>, <<"dest">>}]), + {ok, StdOut} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, [<<"shovel_status">>]), + [Msg, Headers0, Shovel0] = re:split(StdOut, <<"\n">>, [trim]), + ?assertMatch(match, re:run(Msg, "Shovel status on node", [{capture, none}])), + Headers = re:split(Headers0, <<"\t">>, [trim]), + ExpectedHeaders = [<<"name">>, <<"vhost">>, <<"type">>, <<"state">>, + <<"source">>, <<"destination">>, <<"termination_reason">>, + <<"destination_protocol">>, <<"source_protocol">>, + <<"last_changed">>, <<"source_queue">>, <<"destination_queue">>, + <<"remaining">>, <<"remaining_unacked">>, + <<"pending">>, <<"forwarded">>], + ?assert(lists:all(fun(H) -> + lists:member(H, Headers) + end, ExpectedHeaders)), + %% Check some values are there + ExpectedValues = [<<"test">>, <<"dynamic">>, <<"running">>], + Shovel = re:split(Shovel0, <<"\t">>, [trim]), + ?assert(lists:all(fun(V) -> + lists:member(V, Shovel) + end, ExpectedValues)), + shovel_test_utils:clear_param(Config, <<"test">>).