Provide per-exchange/queue metrics w/out channelID

This commit is contained in:
Lois Soto Lopez 2024-05-23 10:28:26 +02:00 committed by Michael Klishin
parent 0af892396a
commit ec5e258825
4 changed files with 194 additions and 20 deletions

View File

@ -28,6 +28,14 @@
{auth_attempt_metrics, set},
{auth_attempt_detailed_metrics, set}]).
% `CORE_NON_CHANNEL_TABLES` are tables that store counters representing the
% same info as some of the channel_queue_metrics, channel_exchange_metrics and
% channel_queue_exchange_metrics but without including the channel ID in the
% key.
-define(CORE_NON_CHANNEL_TABLES, [{queue_counter_metrics, set},
{exchange_metrics, set},
{queue_exchange_metrics, set}]).
-define(CONNECTION_CHURN_METRICS, {node(), 0, 0, 0, 0, 0, 0, 0}).
%% connection_created :: {connection_id, proplist}

View File

@ -111,13 +111,15 @@ create_table({Table, Type}) ->
{read_concurrency, true}]).
init() ->
Tables = ?CORE_TABLES ++ ?CORE_EXTRA_TABLES ++ ?CORE_NON_CHANNEL_TABLES,
_ = [create_table({Table, Type})
|| {Table, Type} <- ?CORE_TABLES ++ ?CORE_EXTRA_TABLES],
|| {Table, Type} <- Tables],
ok.
terminate() ->
Tables = ?CORE_TABLES ++ ?CORE_EXTRA_TABLES ++ ?CORE_NON_CHANNEL_TABLES,
[ets:delete(Table)
|| {Table, _Type} <- ?CORE_TABLES ++ ?CORE_EXTRA_TABLES],
|| {Table, _Type} <- Tables],
ok.
connection_created(Pid, Infos) ->
@ -166,53 +168,65 @@ channel_stats(reductions, Id, Value) ->
ets:insert(channel_process_metrics, {Id, Value}),
ok.
channel_stats(exchange_stats, publish, Id, Value) ->
channel_stats(exchange_stats, publish, {_ChannelPid, XName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_exchange_metrics, Id, {2, Value}, {Id, 0, 0, 0, 0, 0}),
_ = ets:update_counter(exchange_metrics, XName, {2, Value}, {XName, 0, 0, 0, 0, 0}),
ok;
channel_stats(exchange_stats, confirm, Id, Value) ->
channel_stats(exchange_stats, confirm, {_ChannelPid, XName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_exchange_metrics, Id, {3, Value}, {Id, 0, 0, 0, 0, 0}),
_ = ets:update_counter(exchange_metrics, XName, {3, Value}, {XName, 0, 0, 0, 0, 0}),
ok;
channel_stats(exchange_stats, return_unroutable, Id, Value) ->
channel_stats(exchange_stats, return_unroutable, {_ChannelPid, XName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_exchange_metrics, Id, {4, Value}, {Id, 0, 0, 0, 0, 0}),
_ = ets:update_counter(exchange_metrics, XName, {4, Value}, {XName, 0, 0, 0, 0, 0}),
ok;
channel_stats(exchange_stats, drop_unroutable, Id, Value) ->
channel_stats(exchange_stats, drop_unroutable, {_ChannelPid, XName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_exchange_metrics, Id, {5, Value}, {Id, 0, 0, 0, 0, 0}),
_ = ets:update_counter(exchange_metrics, XName, {5, Value}, {XName, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_exchange_stats, publish, Id, Value) ->
channel_stats(queue_exchange_stats, publish, {_ChannelPid, QueueExchange} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_exchange_metrics, Id, Value, {Id, 0, 0}),
_ = ets:update_counter(queue_exchange_metrics, QueueExchange, Value, {QueueExchange, 0, 0}),
ok;
channel_stats(queue_stats, get, Id, Value) ->
channel_stats(queue_stats, get, {_ChannelPid, QName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {2, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {2, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_stats, get_no_ack, Id, Value) ->
channel_stats(queue_stats, get_no_ack, {_ChannelPid, QName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {3, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {3, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_stats, deliver, Id, Value) ->
channel_stats(queue_stats, deliver, {_ChannelPid, QName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {4, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {4, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_stats, deliver_no_ack, Id, Value) ->
channel_stats(queue_stats, deliver_no_ack, {_ChannelPid, QName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {5, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {5, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_stats, redeliver, Id, Value) ->
channel_stats(queue_stats, redeliver, {_ChannelPid, QName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {6, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {6, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_stats, ack, Id, Value) ->
channel_stats(queue_stats, ack, {_ChannelPid, QName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {7, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {7, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok;
channel_stats(queue_stats, get_empty, Id, Value) ->
channel_stats(queue_stats, get_empty, {_ChannelPid, QName} = Id, Value) ->
%% Includes delete marker
_ = ets:update_counter(channel_queue_metrics, Id, {8, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}),
_ = ets:update_counter(queue_counter_metrics, QName, {8, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}),
ok.
delete(Table, Key) ->

View File

@ -160,7 +160,15 @@
{2, undefined, queue_disk_writes_total, counter, "Total number of times queue wrote messages to disk", disk_writes},
{2, undefined, stream_segments, counter, "Total number of stream segment files", segments}
]},
{queue_counter_metrics, [
{2, undefined, queue_get_ack_total, counter, "Total number of messages fetched with basic.get in manual acknowledgement mode"},
{3, undefined, queue_get_total, counter, "Total number of messages fetched with basic.get in automatic acknowledgement mode"},
{4, undefined, queue_messages_delivered_ack_total, counter, "Total number of messages delivered to consumers in manual acknowledgement mode"},
{5, undefined, queue_messages_delivered_total, counter, "Total number of messages delivered to consumers in automatic acknowledgement mode"},
{6, undefined, queue_messages_redelivered_total, counter, "Total number of messages redelivered to consumers"},
{7, undefined, queue_messages_acked_total, counter, "Total number of messages acknowledged by consumers"},
{8, undefined, queue_get_empty_total, counter, "Total number of times basic.get operations fetched no message"}
]},
%%% Metrics that contain reference to a channel. Some of them also have
%%% a queue name, but in this case filtering on it doesn't make any
%%% sense, as the queue is not an object of interest here.
@ -174,6 +182,13 @@
{2, undefined, channel_prefetch, gauge, "Total limit of unacknowledged messages for all consumers on a channel", global_prefetch_count}
]},
{exchange_metrics, [
{2, undefined, exchange_messages_published_total, counter, "Total number of messages published into an exchange on a channel"},
{3, undefined, exchange_messages_confirmed_total, counter, "Total number of messages published into an exchange and confirmed on the channel"},
{4, undefined, exchange_messages_unroutable_returned_total, counter, "Total number of messages published as mandatory into an exchange and returned to the publisher as unroutable"},
{5, undefined, exchange_messages_unroutable_dropped_total, counter, "Total number of messages published as non-mandatory into an exchange and dropped as unroutable"}
]},
{channel_exchange_metrics, [
{2, undefined, channel_messages_published_total, counter, "Total number of messages published into an exchange on a channel"},
{3, undefined, channel_messages_confirmed_total, counter, "Total number of messages published into an exchange and confirmed on the channel"},
@ -208,6 +223,10 @@
{2, undefined, connection_channels, gauge, "Channels on a connection", channels}
]},
{queue_exchange_metrics, [
{2, undefined, queue_exchange_messages_published_total, counter, "Total number of messages published to queues"}
]},
{channel_queue_exchange_metrics, [
{2, undefined, queue_messages_published_total, counter, "Total number of messages published to queues"}
]}
@ -542,8 +561,11 @@ get_data(queue_metrics = Table, false, VHostsFilter) ->
{disk_reads, A15}, {disk_writes, A16}, {segments, A17}]}];
get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics;
Table == queue_coarse_metrics;
Table == queue_counter_metrics;
Table == channel_queue_metrics;
Table == connection_coarse_metrics;
Table == exchange_metrics;
Table == queue_exchange_metrics;
Table == channel_queue_exchange_metrics;
Table == ra_metrics;
Table == channel_process_metrics ->
@ -551,6 +573,8 @@ get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics;
%% For queue_coarse_metrics
({#resource{kind = queue, virtual_host = VHost}, _, _, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
Acc;
({#resource{kind = queue, virtual_host = VHost}, _, _, _, _, _, _, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
Acc;
({_, V1}, {T, A1}) ->
{T, V1 + A1};
({_, V1, _}, {T, A1}) ->
@ -577,6 +601,42 @@ get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics;
_ ->
[Result]
end;
get_data(exchange_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter)->
ets:foldl(fun
({#resource{kind = exchange, virtual_host = VHost}, _, _, _, _, _} = Row, Acc) when
map_get(VHost, VHostsFilter)
->
[Row | Acc];
(_Row, Acc) ->
Acc
end, [], Table);
get_data(exchange_metrics, true, _VhostsFilter) ->
[];
get_data(queue_counter_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter)->
ets:foldl(fun
({#resource{kind = queue, virtual_host = VHost}, _, _, _, _, _, _, _, _} = Row, Acc) when
map_get(VHost, VHostsFilter)
->
[Row | Acc];
(_Row, Acc) ->
Acc
end, [], Table);
get_data(queue_counter_metrics, true, _VHostsFilter) ->
[];
get_data(queue_exchange_metrics = Table, true, VHostsFilter) ->
ets:foldl(fun
({{
#resource{kind = queue, virtual_host = VHost},
#resource{kind = exchange, virtual_host = VHost}
}, _, _} = Row, Acc) when
map_get(VHost, VHostsFilter)
->
[Row | Acc];
(_Row, Acc) ->
Acc
end, [], Table);
get_data(queue_exchange_metrics, true, _VHostsFilter) ->
[];
get_data(queue_coarse_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) ->
ets:foldl(fun
({#resource{kind = queue, virtual_host = VHost}, _, _, _, _} = Row, Acc) when map_get(VHost, VHostsFilter) ->
@ -669,15 +729,15 @@ division(A, B) ->
accumulate_count_and_sum(Value, {Count, Sum}) ->
{Count + 1, Sum + Value}.
empty(T) when T == channel_queue_exchange_metrics; T == channel_process_metrics; T == queue_consumer_count ->
empty(T) when T == channel_queue_exchange_metrics; T == queue_exchange_metrics; T == channel_process_metrics; T == queue_consumer_count ->
{T, 0};
empty(T) when T == connection_coarse_metrics; T == auth_attempt_metrics; T == auth_attempt_detailed_metrics ->
{T, 0, 0, 0};
empty(T) when T == channel_exchange_metrics; T == queue_coarse_metrics; T == connection_metrics ->
empty(T) when T == channel_exchange_metrics; T == exchange_metrics; T == queue_coarse_metrics; T == connection_metrics ->
{T, 0, 0, 0, 0};
empty(T) when T == ra_metrics ->
{T, 0, 0, 0, 0, 0, {0, 0}};
empty(T) when T == channel_queue_metrics; T == channel_metrics ->
empty(T) when T == channel_queue_metrics; T == queue_counter_metrics; T == channel_metrics ->
{T, 0, 0, 0, 0, 0, 0, 0};
empty(queue_metrics = T) ->
{T, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}.

View File

@ -57,6 +57,8 @@ groups() ->
queue_consumer_count_single_vhost_per_object_test,
queue_consumer_count_all_vhosts_per_object_test,
queue_coarse_metrics_per_object_test,
queue_counter_metrics_per_object_test,
queue_exchange_metrics_per_object_test,
queue_metrics_per_object_test,
queue_consumer_count_and_queue_metrics_mutually_exclusive_test,
vhost_status_metric,
@ -523,6 +525,96 @@ queue_coarse_metrics_per_object_test(Config) ->
map_get(rabbitmq_detailed_queue_messages, parse_response(Body3))),
ok.
queue_counter_metrics_per_object_test(Config) ->
Expected1 = #{#{queue => "vhost-1-queue-with-consumer", vhost => "vhost-1"} => [7]},
{_, Body1} = http_get_with_pal(Config,
"/metrics/detailed?vhost=vhost-1&family=queue_counter_metrics",
[], 200),
?assertEqual(
Expected1,
map_get(
rabbitmq_detailed_queue_messages_delivered_ack_total,
parse_response(Body1))),
{_, Body2} = http_get_with_pal(Config,
"/metrics/detailed?vhost=vhost-2&family=queue_counter_metrics",
[], 200),
Expected2 = #{#{queue => "vhost-2-queue-with-consumer", vhost => "vhost-2"} => [11]},
?assertEqual(
Expected2,
map_get(
rabbitmq_detailed_queue_messages_delivered_ack_total,
parse_response(Body2))),
%% Maybe missing, tests for the queue_exchange_metrics
ok.
queue_exchange_metrics_per_object_test(Config) ->
Expected1 = #{
#{
queue => "vhost-1-queue-with-messages",
vhost => "vhost-1",
exchange => ""
} => [7],
#{
exchange => "",
queue => "vhost-1-queue-with-consumer",
vhost => "vhost-1"
} => [7]
},
{_, Body1} = http_get_with_pal(Config,
"/metrics/detailed?vhost=vhost-1&family=queue_exchange_metrics",
[], 200),
?assertEqual(
Expected1,
map_get(
rabbitmq_detailed_queue_exchange_messages_published_total,
parse_response(Body1))),
{_, Body2} = http_get_with_pal(Config,
"/metrics/detailed?vhost=vhost-2&family=queue_exchange_metrics",
[], 200),
Expected2 = #{
#{
queue => "vhost-2-queue-with-messages",
vhost => "vhost-2",
exchange => ""
} => [11],
#{
exchange => "",
queue => "vhost-2-queue-with-consumer",
vhost => "vhost-2"
} => [11]
},
?assertEqual(
Expected2,
map_get(
rabbitmq_detailed_queue_exchange_messages_published_total,
parse_response(Body2))),
ok.
exchange_metrics_per_object_test(Config) ->
Expected1 = #{#{queue => "vhost-1-queue-with-consumer", vhost => "vhost-1"} => [7]},
{_, Body} = http_get_with_pal(Config,
"/metrics/detailed?vhost=vhost-1&family=exchange_metrics",
[], 200),
?assertEqual(
Expected1,
map_get(
rabbitmq_detailed_queue_messages_delivered_ack_total,
parse_response(Body))),
ok.
queue_metrics_per_object_test(Config) ->
Expected1 = #{#{queue => "vhost-1-queue-with-consumer", vhost => "vhost-1"} => [7],
#{queue => "vhost-1-queue-with-messages", vhost => "vhost-1"} => [1]},