Merge pull request #14597 from rabbitmq/mergify/bp/v4.2.x/pr-14560

Introduce a 'rabbitmq-diagnostics message_size_stats' for reasoning about message size distribution (backport #14560)
This commit is contained in:
Michael Klishin 2025-09-24 00:31:53 -04:00 committed by GitHub
commit 9fd82313ee
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 306 additions and 38 deletions

View File

@ -11,7 +11,10 @@
-export([init/1,
observe/2,
prometheus_format/0]).
prometheus_format/0,
local_summary/0,
cluster_summary/0,
cluster_summary_for_cli/0 ]).
%% Integration tests.
-export([raw_buckets/1,
@ -52,6 +55,9 @@
-type raw_buckets() :: [{BucketUpperBound :: non_neg_integer(),
NumObservations :: non_neg_integer()}].
-type summary_entry() :: {{non_neg_integer(), non_neg_integer() | infinity}, {non_neg_integer(), float()}}.
-type summary() :: [summary_entry()].
-spec init(atom()) -> ok.
init(Protocol) ->
Size = ?POS_MSG_SIZE_SUM,
@ -133,6 +139,109 @@ get_labels_counters() ->
[{[{protocol, Protocol}], Counters}
|| {{?MODULE, Protocol}, Counters} <- persistent_term:get()].
get_protocols() ->
[Protocol
|| {{?MODULE, Protocol}, _} <- persistent_term:get()].
%% Aggregates data for all protocols on the local node
-spec local_summary() -> summary().
local_summary() ->
PerProtocolBuckets = lists:map(fun(Protocol) ->
raw_buckets(Protocol)
end, get_protocols()),
%% Sum buckets for all protocols
Buckets0 = [{?BUCKET_1, 0}, {?BUCKET_2, 0}, {?BUCKET_3, 0}, {?BUCKET_4, 0},
{?BUCKET_5, 0}, {?BUCKET_6, 0}, {?BUCKET_7, 0}, {?BUCKET_8, 0}, {?BUCKET_9, 0}],
Buckets = lists:foldl(fun sum_protocol_buckets/2,
Buckets0,
PerProtocolBuckets),
Total = lists:sum([Count || {_UpperBound, Count} <- Buckets]),
Ranges = lists:map(fun({UpperBound, Count}) ->
Percentage = case Total of
0 -> 0.0;
_ -> (Count / Total) * 100
end,
{bucket_range(UpperBound), {Count, Percentage}}
end, Buckets),
Ranges.
sum_protocol_buckets(ProtocolBuckets, Acc) ->
lists:map(fun({UpperBound, AccCount}) ->
ProtocolCount = proplists:get_value(UpperBound, ProtocolBuckets, 0),
{UpperBound, AccCount + ProtocolCount}
end, Acc).
%% Aggregates sumamries from all nodes
-spec cluster_summary() -> summary().
cluster_summary() ->
RemoteNodes = [Node || Node <- rabbit_nodes:list_running(), Node =/= node()],
RemoteSummaries = [ Summary || {ok, Summary} <- erpc:multicall(RemoteNodes,
?MODULE,
local_summary,
[],
5000)],
lists:foldl(fun merge_summaries/2, local_summary(), RemoteSummaries).
bucket_name({_, ?BUCKET_1}) -> <<"below 100B">>;
bucket_name({_, ?BUCKET_2}) -> <<"between 100B and 1KB">>;
bucket_name({_, ?BUCKET_3}) -> <<"between 1KB and 10KB">>;
bucket_name({_, ?BUCKET_4}) -> <<"between 10KB and 100KB">>;
bucket_name({_, ?BUCKET_5}) -> <<"between 100KB and 1MB">>;
bucket_name({_, ?BUCKET_6}) -> <<"between 1MB and 10MB">>;
bucket_name({_, ?BUCKET_7}) -> <<"between 10MB and 50MB">>;
bucket_name({_, ?BUCKET_8}) -> <<"between 50MB and 100MB">>;
bucket_name({_, ?BUCKET_9}) -> <<"above 100MB">>.
cluster_summary_for_cli() ->
[[{<<"Message Size">>, bucket_name(Range)},
{<<"Count">>, Count},
{<<"Percentage">>, iolist_to_binary(io_lib:format("~.2f", [Percentage]))}]
|| {Range, {Count, Percentage}} <- cluster_summary()].
get_count_for_range(Range, SummaryList) ->
case proplists:get_value(Range, SummaryList) of
{Count, _} -> Count;
undefined -> 0
end.
%% Merges two summary lists by adding their counts and recalculating percentages
merge_summaries(Summary1, Summary2) ->
%% Get all bucket ranges
AllRanges = lists:usort([Range || {Range, _} <- Summary1] ++ [Range || {Range, _} <- Summary2]),
MergedRanges = lists:map(fun(Range) ->
Count1 = get_count_for_range(Range, Summary1),
Count2 = get_count_for_range(Range, Summary2),
NewCount = Count1 + Count2,
{Range, NewCount}
end, AllRanges),
%% Calculate total and percentages
NewTotal = lists:sum([Count || {_, Count} <- MergedRanges]),
FinalRanges = lists:map(fun({Range, Count}) ->
NewPercentage = case NewTotal of
0 -> 0.0;
_ -> (Count / NewTotal) * 100
end,
{Range, {Count, NewPercentage}}
end, MergedRanges),
FinalRanges.
bucket_range(?BUCKET_1) -> {0, 100};
bucket_range(?BUCKET_2) -> {101, 1000};
bucket_range(?BUCKET_3) -> {1001, 10000};
bucket_range(?BUCKET_4) -> {10001, 100000};
bucket_range(?BUCKET_5) -> {100001, 1000000};
bucket_range(?BUCKET_6) -> {1000001, 10000000};
bucket_range(?BUCKET_7) -> {10000001, 50000000};
bucket_range(?BUCKET_8) -> {50000001, 100000000};
bucket_range(?BUCKET_9) -> {100000001, infinity}.
-ifdef(TEST).
%% "Counters are not tied to the current process and are automatically
%% garbage collected when they are no longer referenced."

View File

@ -13,7 +13,7 @@
-include_lib("amqp_client/include/amqp_client.hrl").
-import(rabbit_ct_broker_helpers,
[rpc/4]).
[rpc/4, rpc/5]).
all() ->
[
@ -22,9 +22,11 @@ all() ->
groups() ->
[
{tests, [shuffle],
[message_size,
over_max_message_size]}
{tests, [],
[summary, %% needs to run first
message_size,
over_max_message_size]
}
].
%% -------------------------------------------------------------------
@ -34,14 +36,18 @@ groups() ->
init_per_suite(Config) ->
{ok, _} = application:ensure_all_started(amqp10_client),
rabbit_ct_helpers:log_environment(),
rabbit_ct_helpers:run_setup_steps(Config).
Config.
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
init_per_group(_Group, Config) ->
rabbit_ct_helpers:run_steps(
Config,
Suffix = rabbit_ct_helpers:testcase_absname(Config, "", "-"),
Config1 = rabbit_ct_helpers:set_config(
Config, [{rmq_nodes_count, 3},
{rmq_nodename_suffix, Suffix}]),
rabbit_ct_helpers:run_setup_steps(
Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).
@ -51,6 +57,13 @@ end_per_group(_Group, Config) ->
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).
init_per_testcase(summary, Config) ->
case rabbit_ct_broker_helpers:enable_feature_flag(Config, 'rabbitmq_4.2.0') of
ok ->
rabbit_ct_helpers:testcase_started(Config, sumary);
{skip, _} = Skip ->
Skip
end;
init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).
@ -65,32 +78,7 @@ message_size(Config) ->
AmqplBefore = get_msg_size_metrics(amqp091, Config),
AmqpBefore = get_msg_size_metrics(amqp10, Config),
Binary2B = <<"12">>,
Binary200K = binary:copy(<<"x">>, 200_000),
Payloads = [Binary2B, Binary200K, Binary2B],
{AmqplConn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
[amqp_channel:call(Ch,
#'basic.publish'{routing_key = <<"nowhere">>},
#amqp_msg{payload = Payload})
|| Payload <- Payloads],
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
Address = rabbitmq_amqp_address:exchange(<<"amq.fanout">>),
{ok, Sender} = amqp10_client:attach_sender_link_sync(Session, <<"sender">>, Address),
receive {amqp10_event, {link, Sender, credited}} -> ok
after 30_000 -> ct:fail(credited_timeout)
end,
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag1">>, Binary2B)),
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag2">>, Binary200K)),
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag3">>, Binary2B)),
ok = wait_for_settlement(released, <<"tag1">>),
ok = wait_for_settlement(released, <<"tag2">>),
ok = wait_for_settlement(released, <<"tag3">>),
publish_messages(Config),
AmqplAfter = get_msg_size_metrics(amqp091, Config),
AmqpAfter = get_msg_size_metrics(amqp10, Config),
@ -100,10 +88,7 @@ message_size(Config) ->
?assertEqual(ExpectedDiff,
rabbit_msg_size_metrics:diff_raw_buckets(AmqplAfter, AmqplBefore)),
?assertEqual(ExpectedDiff,
rabbit_msg_size_metrics:diff_raw_buckets(AmqpAfter, AmqpBefore)),
ok = amqp10_client:close_connection(Connection),
ok = rabbit_ct_client_helpers:close_connection_and_channel(AmqplConn, Ch).
rabbit_msg_size_metrics:diff_raw_buckets(AmqpAfter, AmqpBefore)).
over_max_message_size(Config) ->
DefaultMaxMessageSize = rpc(Config, persistent_term, get, [max_message_size]),
@ -134,6 +119,39 @@ over_max_message_size(Config) ->
ok = rabbit_ct_client_helpers:close_connection(Conn),
ok = rpc(Config, persistent_term, put, [max_message_size, DefaultMaxMessageSize]).
summary(Config) ->
ZeroSummary = [{{0, 100}, {0, 0.0}},
{{101, 1000}, {0, 0.0}},
{{1001, 10000}, {0, 0.0}},
{{10001, 100000}, {0, 0.0}},
{{100001, 1000000}, {0, 0.0}},
{{1000001, 10000000}, {0, 0.0}},
{{10000001, 50000000}, {0, 0.0}},
{{50000001, 100000000}, {0, 0.0}},
{{100000001, infinity}, {0, 0.0}}],
?assertEqual(ZeroSummary, rpc(Config, 0, rabbit_msg_size_metrics, local_summary, [])),
?assertEqual(ZeroSummary, rpc(Config, 1, rabbit_msg_size_metrics, cluster_summary, [])),
?assertEqual(ZeroSummary, rpc(Config, 0, rabbit_msg_size_metrics, local_summary, [])),
?assertEqual(ZeroSummary, rpc(Config, 1, rabbit_msg_size_metrics, cluster_summary, [])),
publish_messages(Config),
ExpectedSummary = [{{0, 100}, {4, 66.66666666666666}},
{{101, 1000}, {0, 0.0}},
{{1001, 10000}, {0, 0.0}},
{{10001, 100000}, {0, 0.0}},
{{100001, 1000000}, {2, 33.33333333333333}},
{{1000001, 10000000}, {0, 0.0}},
{{10000001, 50000000}, {0, 0.0}},
{{50000001, 100000000}, {0, 0.0}},
{{100000001, infinity}, {0, 0.0}}],
?assertEqual(ExpectedSummary, rpc(Config, 0, rabbit_msg_size_metrics, local_summary, [])),
?assertEqual(ExpectedSummary, rpc(Config, 0, rabbit_msg_size_metrics, cluster_summary, [])),
?assertEqual(ExpectedSummary, rpc(Config, 1, rabbit_msg_size_metrics, cluster_summary, [])),
?assertEqual(ZeroSummary, rpc(Config, 1, rabbit_msg_size_metrics, local_summary, [])).
get_msg_size_metrics(Protocol, Config) ->
rpc(Config, rabbit_msg_size_metrics, raw_buckets, [Protocol]).
@ -145,6 +163,36 @@ connection_config(Config) ->
container_id => <<"my container">>,
sasl => anon}.
publish_messages(Config) ->
Binary2B = <<"12">>,
Binary200K = binary:copy(<<"x">>, 200_000),
Payloads = [Binary2B, Binary200K, Binary2B],
{AmqplConn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
[amqp_channel:call(Ch,
#'basic.publish'{routing_key = <<"nowhere">>},
#amqp_msg{payload = Payload})
|| Payload <- Payloads],
OpnConf = connection_config(Config),
{ok, Connection} = amqp10_client:open_connection(OpnConf),
{ok, Session} = amqp10_client:begin_session_sync(Connection),
Address = rabbitmq_amqp_address:exchange(<<"amq.fanout">>),
{ok, Sender} = amqp10_client:attach_sender_link_sync(Session, <<"sender">>, Address),
receive {amqp10_event, {link, Sender, credited}} -> ok
after 30_000 -> ct:fail(credited_timeout)
end,
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag1">>, Binary2B)),
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag2">>, Binary200K)),
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(<<"tag3">>, Binary2B)),
ok = wait_for_settlement(released, <<"tag1">>),
ok = wait_for_settlement(released, <<"tag2">>),
ok = wait_for_settlement(released, <<"tag3">>),
ok = amqp10_client:close_connection(Connection),
ok = rabbit_ct_client_helpers:close_connection_and_channel(AmqplConn, Ch).
wait_for_settlement(State, Tag) ->
receive
{amqp10_disposition, {State, Tag}} ->

View File

@ -0,0 +1,56 @@
## This Source Code Form is subject to the terms of the Mozilla Public
## License, v. 2.0. If a copy of the MPL was not distributed with this
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
##
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
defmodule RabbitMQ.CLI.Diagnostics.Commands.MessageSizeStatsCommand do
alias RabbitMQ.CLI.Core.DocGuide
@behaviour RabbitMQ.CLI.CommandBehaviour
@default_timeout 60_000
def scopes(), do: [:diagnostics]
def switches(), do: [timeout: :integer]
def aliases(), do: [t: :timeout]
def merge_defaults(args, opts) do
timeout =
case opts[:timeout] do
nil -> @default_timeout
:infinity -> @default_timeout
other -> other
end
{args, Map.merge(%{timeout: timeout}, opts)}
end
use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments
use RabbitMQ.CLI.Core.RequiresRabbitAppRunning
def run([], %{node: node_name, timeout: timeout}) do
:rabbit_misc.rpc_call(node_name, :rabbit_msg_size_metrics, :cluster_summary_for_cli, [], timeout)
end
use RabbitMQ.CLI.DefaultOutput
def formatter(), do: RabbitMQ.CLI.Formatters.PrettyTable
def usage, do: "message_size_stats"
def usage_doc_guides() do
[
DocGuide.monitoring()
]
end
def help_section(), do: :observability_and_health_checks
def description(),
do: "Displays message size distribution statistics aggregated across all cluster nodes"
def banner(_, %{node: node_name}), do: "Gathering message size statistics across the cluster using node #{node_name} ..."
end

View File

@ -0,0 +1,55 @@
## This Source Code Form is subject to the terms of the Mozilla Public
## License, v. 2.0. If a copy of the MPL was not distributed with this
## file, You can obtain one at https://mozilla.org/MPL/2.0/.
##
## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
defmodule MessageSizeStatsCommandTest do
use ExUnit.Case, async: false
import TestHelper
@command RabbitMQ.CLI.Diagnostics.Commands.MessageSizeStatsCommand
setup_all do
RabbitMQ.CLI.Core.Distribution.start()
:ok
end
setup do
{:ok, opts: %{node: get_rabbit_hostname(), timeout: 60_000}}
end
test "validate: with extra arguments returns an arg count error", context do
assert @command.validate(["extra"], context[:opts]) == {:validation_failure, :too_many_args}
end
test "validate: with no arguments succeeds", context do
assert @command.validate([], context[:opts]) == :ok
end
test "run: request to a named, active node succeeds", context do
result = @command.run([], context[:opts])
Enum.each(result, fn row ->
assert is_list(row)
assert Enum.any?(row, fn {key, _} -> key == "Message Size" end)
assert Enum.any?(row, fn {key, _} -> key == "Count" end)
assert Enum.any?(row, fn {key, _} -> key == "Percentage" end)
count_value = Enum.find_value(row, fn {key, value} -> if key == "Count", do: value end)
percentage_value = Enum.find_value(row, fn {key, value} -> if key == "Percentage", do: value end)
assert is_integer(count_value)
assert is_float(String.to_float(percentage_value))
end)
end
test "run: request to a non-existent node returns a badrpc" do
opts = %{node: :jake@thedog, timeout: 200}
assert match?({:badrpc, _}, @command.run([], opts))
end
test "banner", context do
banner = @command.banner([], context[:opts])
assert banner =~ ~r/message size statistics/
end
end