rabbitmq-diagnostics message_size_stats

This command displays cluster-wide message size
statistics. It's less detailed than what can be
retrieved from the Prometheus endpoint, but
it'll be available to all users, regardless of their
monitoring setup, or lack thereof.
This commit is contained in:
Michal Kuratczyk 2025-09-17 12:17:42 +02:00 committed by Michael Klishin
parent 4a324706a4
commit bf3c378473
No known key found for this signature in database
GPG Key ID: 16AB14D00D613900
4 changed files with 306 additions and 38 deletions

View File

@ -11,7 +11,10 @@
-export([init/1,
observe/2,
prometheus_format/0]).
prometheus_format/0,
local_summary/0,
cluster_summary/0,
cluster_summary_for_cli/0 ]).
%% Integration tests.
-export([raw_buckets/1,
@ -52,6 +55,9 @@
-type raw_buckets() :: [{BucketUpperBound :: non_neg_integer(),
NumObservations :: non_neg_integer()}].
%% One summary row: {{LowerBound, UpperBound}, {Count, Percentage}}.
%% UpperBound is the atom `infinity' for the last, open-ended bucket.
-type summary_entry() :: {{non_neg_integer(), non_neg_integer() | infinity}, {non_neg_integer(), float()}}.
%% List of summary rows, as returned by local_summary/0 and cluster_summary/0.
-type summary() :: [summary_entry()].
-spec init(atom()) -> ok.
init(Protocol) ->
Size = ?POS_MSG_SIZE_SUM,
@ -133,6 +139,109 @@ get_labels_counters() ->
[{[{protocol, Protocol}], Counters}
|| {{?MODULE, Protocol}, Counters} <- persistent_term:get()].
%% Lists the protocols that have an entry stored in persistent_term under
%% a {?MODULE, Protocol} key. All other persistent terms are ignored.
get_protocols() ->
    lists:filtermap(fun({{?MODULE, Protocol}, _Counters}) -> {true, Protocol};
                       (_Other) -> false
                    end,
                    persistent_term:get()).
%% Builds the message size summary of the local node by adding up the
%% raw buckets of every protocol returned by get_protocols/0.
-spec local_summary() -> summary().
local_summary() ->
    PerProtocol = [raw_buckets(Protocol) || Protocol <- get_protocols()],
    %% Start from an all-zero histogram so that every bucket shows up in
    %% the result, even when no protocol contributes to it.
    ZeroBuckets = [{UpperBound, 0}
                   || UpperBound <- [?BUCKET_1, ?BUCKET_2, ?BUCKET_3,
                                     ?BUCKET_4, ?BUCKET_5, ?BUCKET_6,
                                     ?BUCKET_7, ?BUCKET_8, ?BUCKET_9]],
    Summed = lists:foldl(fun sum_protocol_buckets/2, ZeroBuckets, PerProtocol),
    Total = lists:sum([Count || {_UpperBound, Count} <- Summed]),
    %% With no observations at all, report 0.0 instead of dividing by zero.
    PercentageOf = fun(_Count) when Total =:= 0 -> 0.0;
                      (Count) -> (Count / Total) * 100
                   end,
    [{bucket_range(UpperBound), {Count, PercentageOf(Count)}}
     || {UpperBound, Count} <- Summed].
%% Fold step used by local_summary/0: adds one protocol's bucket counts to
%% the running totals in Acc. A bucket missing from ProtocolBuckets counts
%% as 0; a bucket present only in ProtocolBuckets is ignored because the
%% accumulator drives the iteration.
sum_protocol_buckets(ProtocolBuckets, Acc) ->
    AddProtocolCount =
        fun({UpperBound, Running}) ->
                Extra = proplists:get_value(UpperBound, ProtocolBuckets, 0),
                {UpperBound, Running + Extra}
        end,
    lists:map(AddProtocolCount, Acc).
%% Aggregates summaries from all running nodes.
%% The local summary is computed directly; every other running node is
%% asked for its local_summary/0 via erpc with a 5000 ms timeout. Replies
%% that are not {ok, Summary} are left out of the result.
-spec cluster_summary() -> summary().
cluster_summary() ->
    Self = node(),
    Peers = lists:filter(fun(Node) -> Node =/= Self end,
                         rabbit_nodes:list_running()),
    Replies = erpc:multicall(Peers, ?MODULE, local_summary, [], 5000),
    PeerSummaries = [Summary || {ok, Summary} <- Replies],
    lists:foldl(fun merge_summaries/2, local_summary(), PeerSummaries).
%% Returns the human-readable label of a bucket range. Only the upper
%% bound of the {LowerBound, UpperBound} range selects the label.
bucket_name({_LowerBound, ?BUCKET_1}) -> <<"below 100B">>;
bucket_name({_LowerBound, ?BUCKET_2}) -> <<"between 100B and 1KB">>;
bucket_name({_LowerBound, ?BUCKET_3}) -> <<"between 1KB and 10KB">>;
bucket_name({_LowerBound, ?BUCKET_4}) -> <<"between 10KB and 100KB">>;
bucket_name({_LowerBound, ?BUCKET_5}) -> <<"between 100KB and 1MB">>;
bucket_name({_LowerBound, ?BUCKET_6}) -> <<"between 1MB and 10MB">>;
bucket_name({_LowerBound, ?BUCKET_7}) -> <<"between 10MB and 50MB">>;
bucket_name({_LowerBound, ?BUCKET_8}) -> <<"between 50MB and 100MB">>;
bucket_name({_LowerBound, ?BUCKET_9}) -> <<"above 100MB">>.
%% Formats cluster_summary/0 for the CLI: one row per bucket, each row being
%% a list of {ColumnName, Value} pairs. The percentage is rendered as a
%% binary with two decimal places.
cluster_summary_for_cli() ->
    ToRow = fun({Range, {Count, Percentage}}) ->
                    Label = bucket_name(Range),
                    Formatted = iolist_to_binary(io_lib:format("~.2f", [Percentage])),
                    [{<<"Message Size">>, Label},
                     {<<"Count">>, Count},
                     {<<"Percentage">>, Formatted}]
            end,
    lists:map(ToRow, cluster_summary()).
%% Looks up the count recorded for Range in a summary.
%% Returns 0 when the summary has no entry for that range.
get_count_for_range(Range, SummaryList) ->
    Entry = proplists:get_value(Range, SummaryList),
    case Entry of
        undefined -> 0;
        {Count, _Percentage} -> Count
    end.
%% Combines two summaries into one: the counts of matching ranges are added
%% up and every percentage is recomputed against the combined total.
merge_summaries(Summary1, Summary2) ->
    %% Sorted, de-duplicated union of the ranges found in either summary
    Ranges = lists:usort([Range || {Range, _} <- Summary1 ++ Summary2]),
    Counts = [{Range,
               get_count_for_range(Range, Summary1) +
                   get_count_for_range(Range, Summary2)}
              || Range <- Ranges],
    Total = lists:sum([Count || {_Range, Count} <- Counts]),
    %% With no observations at all, report 0.0 instead of dividing by zero.
    PercentageOf = fun(_Count) when Total =:= 0 -> 0.0;
                      (Count) -> (Count / Total) * 100
                   end,
    [{Range, {Count, PercentageOf(Count)}} || {Range, Count} <- Counts].
%% Maps a bucket's upper bound to the {LowerBound, UpperBound} range used in
%% summaries. The returned ranges are contiguous: each lower bound is the
%% previous range's upper bound plus one, and the last range is open-ended.
bucket_range(?BUCKET_1) -> {0,           100};
bucket_range(?BUCKET_2) -> {101,         1_000};
bucket_range(?BUCKET_3) -> {1_001,       10_000};
bucket_range(?BUCKET_4) -> {10_001,      100_000};
bucket_range(?BUCKET_5) -> {100_001,     1_000_000};
bucket_range(?BUCKET_6) -> {1_000_001,   10_000_000};
bucket_range(?BUCKET_7) -> {10_000_001,  50_000_000};
bucket_range(?BUCKET_8) -> {50_000_001,  100_000_000};
bucket_range(?BUCKET_9) -> {100_000_001, infinity}.
-ifdef(TEST).
%% "Counters are not tied to the current process and are automatically
%% garbage collected when they are no longer referenced."

View File

@ -13,7 +13,7 @@
-include_lib("amqp_client/include/amqp_client.hrl").
-import(rabbit_ct_broker_helpers,
[rpc/4]).
[rpc/4, rpc/5]).
all() ->
[
@ -22,9 +22,11 @@ all() ->
groups() ->
[
{tests, [shuffle],
[message_size,
over_max_message_size]}
{tests, [],
[summary, %% needs to run first
message_size,
over_max_message_size]
}
].
%% -------------------------------------------------------------------
@ -34,14 +36,18 @@ groups() ->
init_per_suite(Config) ->
{ok, _} = application:ensure_all_started(amqp10_client),
rabbit_ct_helpers:log_environment(),
rabbit_ct_helpers:run_setup_steps(Config).
Config.
%% Common Test suite teardown: delegates to
%% rabbit_ct_helpers:run_teardown_steps/1 and returns its result.
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
init_per_group(_Group, Config) ->
rabbit_ct_helpers:run_steps(
Config,
Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
Config1 = rabbit_ct_helpers:set_config(
Config, [{rmq_nodes_count, 3},
{rmq_nodename_suffix, Suffix}]),
rabbit_ct_helpers:run_setup_steps(
Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).
@ -51,6 +57,13 @@ end_per_group(_Group, Config) ->
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).
%% Per-testcase setup.
%% For `summary', the 'rabbitmq_4.2.0' feature flag is enabled first; if
%% enabling it reports {skip, _}, that tuple is returned so the testcase is
%% skipped. Every other testcase is only registered as started.
init_per_testcase(summary = Testcase, Config) ->
    case rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.2.0') of
        ok ->
            %% Pass the real testcase name (the original passed the
            %% misspelled atom `sumary').
            rabbit_ct_helpers:testcase_started(Config, Testcase);
        {skip, _} = Skip ->
            Skip
    end;
init_per_testcase(Testcase, Config) ->
    rabbit_ct_helpers:testcase_started(Config, Testcase).
@ -65,32 +78,7 @@ message_size(Config) ->
AmqplBefore = get_msg_size_metrics(amqp091, Config),
AmqpBefore = get_msg_size_metrics(amqp10, Config),
Binary2B = <<"12">>,
Binary200K = binary:copy(<<"x">>, 200_000),
Payloads = [Binary2B, Binary200K, Binary2B],
{AmqplConn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
[amqp_channel:call(Ch,
#'basic.publish'{routing_key = <<"nowhere">>},
#amqp_msg{payload = Payload})
|| Payload <- Payloads],
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
Address = rabbitmq_amqp_address:exchange(<<"amq.fanout">>),
{ok, Sender} = amqp10_client:attach_sender_link_sync(Session, <<"sender">>, Address),
receive {amqp10_event, {link, Sender, credited}} -> ok
after 30_000 -> ct:fail(credited_timeout)
end,
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag1">>, Binary2B)),
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag2">>, Binary200K)),
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag3">>, Binary2B)),
ok = wait_for_settlement(released, <<"tag1">>),
ok = wait_for_settlement(released, <<"tag2">>),
ok = wait_for_settlement(released, <<"tag3">>),
publish_messages(Config),
AmqplAfter = get_msg_size_metrics(amqp091, Config),
AmqpAfter = get_msg_size_metrics(amqp10, Config),
@ -100,10 +88,7 @@ message_size(Config) ->
?assertEqual(ExpectedDiff,
rabbit_msg_size_metrics:diff_raw_buckets(AmqplAfter, AmqplBefore)),
?assertEqual(ExpectedDiff,
rabbit_msg_size_metrics:diff_raw_buckets(AmqpAfter, AmqpBefore)),
ok = amqp10_client:close_connection(Connection),
ok = rabbit_ct_client_helpers:close_connection_and_channel(AmqplConn, Ch).
rabbit_msg_size_metrics:diff_raw_buckets(AmqpAfter, AmqpBefore)).
over_max_message_size(Config) ->
DefaultMaxMessageSize = rpc(Config, persistent_term, get, [max_message_size]),
@ -134,6 +119,39 @@ over_max_message_size(Config) ->
ok = rabbit_ct_client_helpers:close_connection(Conn),
ok = rpc(Config, persistent_term, put, [max_message_size, DefaultMaxMessageSize]).
%% Checks local_summary/0 and cluster_summary/0 before and after publishing.
%% This testcase expects all counters to start at zero, so it has to run
%% before any other testcase publishes messages.
summary(Config) ->
    %% Node indexes of the three-node cluster configured via rmq_nodes_count.
    Nodes = [0, 1, 2],
    LocalSummary = fun(Node) ->
                           rpc(Config, Node, rabbit_msg_size_metrics, local_summary, [])
                   end,
    ClusterSummary = fun(Node) ->
                             rpc(Config, Node, rabbit_msg_size_metrics, cluster_summary, [])
                     end,

    ZeroSummary = [{{0, 100}, {0, 0.0}},
                   {{101, 1000}, {0, 0.0}},
                   {{1001, 10000}, {0, 0.0}},
                   {{10001, 100000}, {0, 0.0}},
                   {{100001, 1000000}, {0, 0.0}},
                   {{1000001, 10000000}, {0, 0.0}},
                   {{10000001, 50000000}, {0, 0.0}},
                   {{50000001, 100000000}, {0, 0.0}},
                   {{100000001, infinity}, {0, 0.0}}],

    %% Nothing has been published yet: every node reports an all-zero
    %% summary, both locally and cluster-wide. (The original repeated the
    %% same two assertions twice instead of covering the other nodes.)
    lists:foreach(fun(Node) ->
                          ?assertEqual(ZeroSummary, LocalSummary(Node)),
                          ?assertEqual(ZeroSummary, ClusterSummary(Node))
                  end, Nodes),

    publish_messages(Config),

    ExpectedSummary = [{{0, 100}, {4, 66.66666666666666}},
                       {{101, 1000}, {0, 0.0}},
                       {{1001, 10000}, {0, 0.0}},
                       {{10001, 100000}, {0, 0.0}},
                       {{100001, 1000000}, {2, 33.33333333333333}},
                       {{1000001, 10000000}, {0, 0.0}},
                       {{10000001, 50000000}, {0, 0.0}},
                       {{50000001, 100000000}, {0, 0.0}},
                       {{100000001, infinity}, {0, 0.0}}],

    %% The messages are counted on node 0 only ...
    ?assertEqual(ExpectedSummary, LocalSummary(0)),
    %% ... the cluster-wide view is the same whichever node is asked ...
    lists:foreach(fun(Node) ->
                          ?assertEqual(ExpectedSummary, ClusterSummary(Node))
                  end, Nodes),
    %% ... and the other nodes still have nothing to report locally.
    lists:foreach(fun(Node) ->
                          ?assertEqual(ZeroSummary, LocalSummary(Node))
                  end, Nodes -- [0]).
%% Fetches the raw message size buckets of Protocol from the broker by
%% calling rabbit_msg_size_metrics:raw_buckets/1 through the rpc/4 helper.
get_msg_size_metrics(Protocol, Config) ->
rpc(Config, rabbit_msg_size_metrics, raw_buckets, [Protocol]).
@ -145,6 +163,36 @@ connection_config(Config) ->
container_id => <<"my container">>,
sasl => anon}.
%% Publishes the payload sequence 2 bytes, 200_000 bytes, 2 bytes twice:
%% first over an AMQP 0.9.1 channel, then over an AMQP 1.0 sender link.
%% Each AMQP 1.0 message is awaited until it is settled as `released'.
%% Both connections are closed before returning.
publish_messages(Config) ->
    Small = <<"12">>,
    Large = binary:copy(<<"x">>, 200_000),

    %% AMQP 0.9.1
    {AmqplConn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
    lists:foreach(
      fun(Payload) ->
              amqp_channel:call(Ch,
                                #'basic.publish'{routing_key = <<"nowhere">>},
                                #amqp_msg{payload = Payload})
      end, [Small, Large, Small]),

    %% AMQP 1.0
    {ok, Connection} = amqp10_client:open_connection(connection_config(Config)),
    {ok, Session} = amqp10_client:begin_session_sync(Connection),
    Address = rabbitmq_amqp_address:exchange(<<"amq.fanout">>),
    {ok, Sender} = amqp10_client:attach_sender_link_sync(Session, <<"sender">>, Address),
    %% Do not send before the link has been granted credit.
    receive
        {amqp10_event, {link, Sender, credited}} -> ok
    after 30_000 ->
            ct:fail(credited_timeout)
    end,

    Tagged = [{<<"tag1">>, Small}, {<<"tag2">>, Large}, {<<"tag3">>, Small}],
    %% Send everything first, then wait for the settlements in the same order.
    lists:foreach(fun({Tag, Body}) ->
                          ok = amqp10_client:send_msg(Sender, amqp10_msg:new(Tag, Body))
                  end, Tagged),
    lists:foreach(fun({Tag, _Body}) ->
                          ok = wait_for_settlement(released, Tag)
                  end, Tagged),

    ok = amqp10_client:close_connection(Connection),
    ok = rabbit_ct_client_helpers:close_connection_and_channel(AmqplConn, Ch).
wait_for_settlement(State, Tag) ->
receive
{amqp10_disposition, {State, Tag}} ->

View File

@ -0,0 +1,56 @@
## This Source Code Form is subject to the terms of the Mozilla Public
## License, v. 2.0. If a copy of the MPL was not distributed with this
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
##
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
defmodule RabbitMQ.CLI.Diagnostics.Commands.MessageSizeStatsCommand do
  # CLI command `message_size_stats`: asks the target node for
  # :rabbit_msg_size_metrics.cluster_summary_for_cli/0 and prints the
  # returned rows as a table.
  alias RabbitMQ.CLI.Core.DocGuide

  @behaviour RabbitMQ.CLI.CommandBehaviour

  # Timeout in milliseconds used when none is given or when it is :infinity.
  @default_timeout 60_000

  def scopes(), do: [:ctl, :diagnostics]

  def switches(), do: [timeout: :integer]
  def aliases(), do: [t: :timeout]

  # Normalises the :timeout option: a missing or :infinity timeout becomes
  # @default_timeout, any other value is kept.
  def merge_defaults(args, opts) do
    timeout =
      case opts[:timeout] do
        nil -> @default_timeout
        :infinity -> @default_timeout
        other -> other
      end

    # The computed value must win over whatever opts carries. Map.merge/2
    # gives precedence to its second argument, so opts goes first; the
    # reversed order would let an :infinity timeout in opts survive.
    {args, Map.merge(opts, %{timeout: timeout})}
  end

  use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments
  use RabbitMQ.CLI.Core.RequiresRabbitAppRunning

  # Runs the RPC against the target node and returns its result unchanged.
  def run([], %{node: node_name, timeout: timeout}) do
    :rabbit_misc.rpc_call(
      node_name,
      :rabbit_msg_size_metrics,
      :cluster_summary_for_cli,
      [],
      timeout
    )
  end

  use RabbitMQ.CLI.DefaultOutput

  def formatter(), do: RabbitMQ.CLI.Formatters.PrettyTable

  def usage, do: "message_size_stats"

  def usage_doc_guides() do
    [
      DocGuide.monitoring()
    ]
  end

  def help_section(), do: :observability_and_health_checks

  def description(),
    do: "Displays message size distribution statistics aggregated across all cluster nodes"

  def banner(_, %{node: node_name}),
    do: "Gathering message size statistics from cluster via #{node_name} ..."
end

View File

@ -0,0 +1,55 @@
## This Source Code Form is subject to the terms of the Mozilla Public
## License, v. 2.0. If a copy of the MPL was not distributed with this
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
##
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
defmodule MessageSizeStatsCommandTest do
  # Tests for the `message_size_stats` CLI command: argument validation,
  # a run against the node named by get_rabbit_hostname/0, a run against a
  # non-existent node, and the banner text.
  use ExUnit.Case, async: false
  import TestHelper

  @command RabbitMQ.CLI.Diagnostics.Commands.MessageSizeStatsCommand

  setup_all do
    RabbitMQ.CLI.Core.Distribution.start()

    :ok
  end

  setup do
    opts = %{node: get_rabbit_hostname(), timeout: 60_000}
    {:ok, opts: opts}
  end

  test "validate: with extra arguments returns an arg count error", context do
    outcome = @command.validate(["extra"], context[:opts])
    assert outcome == {:validation_failure, :too_many_args}
  end

  test "validate: with no arguments succeeds", context do
    outcome = @command.validate([], context[:opts])
    assert outcome == :ok
  end

  test "run: request to a named, active node succeeds", context do
    rows = @command.run([], context[:opts])

    Enum.each(rows, fn row ->
      assert is_list(row)

      # Every row carries all three columns.
      Enum.each(["Message Size", "Count", "Percentage"], fn column ->
        assert Enum.any?(row, fn {key, _} -> key == column end)
      end)

      value_of = fn column ->
        Enum.find_value(row, fn {key, value} -> if key == column, do: value end)
      end

      count_value = value_of.("Count")
      percentage_value = value_of.("Percentage")

      assert is_integer(count_value)
      # The percentage arrives as text and has to parse as a float.
      assert is_float(String.to_float(percentage_value))
    end)
  end

  test "run: request to a non-existent node returns a badrpc" do
    unreachable = %{node: :jake@thedog, timeout: 200}
    assert match?({:badrpc, _}, @command.run([], unreachable))
  end

  test "banner", context do
    text = @command.banner([], context[:opts])

    assert text =~
             ~r/Gathering message size statistics from cluster via #{get_rabbit_hostname()}/
  end
end