Close #10345. Add promtheus_rabbitmq_federation_collector.
rabbitmq_federation_links gauge metric with status lable.
This commit is contained in:
parent
72b25064c3
commit
8925dfa916
|
@ -201,6 +201,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
|
|||
hdrs = ["include/rabbit_federation.hrl"],
|
||||
app_name = "rabbitmq_federation",
|
||||
erlc_opts = "//:test_erlc_opts",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = ["//deps/amqp_client:erlang_app"],
|
||||
)
|
||||
erlang_bytecode(
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
|
||||
-export([start_link/0]).
|
||||
|
||||
-export([report/4, remove_exchange_or_queue/1, remove/2, status/0, lookup/1]).
|
||||
-export([report/4, remove_exchange_or_queue/1, remove/2, status/0, status/1, lookup/1]).
|
||||
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||
terminate/2, code_change/3]).
|
||||
|
@ -41,7 +41,10 @@ remove(Upstream, XorQName) ->
|
|||
gen_server:call(?SERVER, {remove, Upstream, XorQName}, infinity).
|
||||
|
||||
status() ->
|
||||
gen_server:call(?SERVER, status, infinity).
|
||||
status(infinity).
|
||||
|
||||
status(Timeout) ->
|
||||
gen_server:call(?SERVER, status, Timeout).
|
||||
|
||||
lookup(Id) ->
|
||||
gen_server:call(?SERVER, {lookup, Id}, infinity).
|
||||
|
|
|
@ -52,6 +52,7 @@ rabbitmq_app(
|
|||
priv = [":priv"],
|
||||
deps = [
|
||||
"//deps/rabbit:erlang_app",
|
||||
"//deps/rabbitmq_federation:erlang_app",
|
||||
"//deps/rabbitmq_management_agent:erlang_app",
|
||||
"//deps/rabbitmq_web_dispatch:erlang_app",
|
||||
"@accept//:erlang_app",
|
||||
|
@ -81,6 +82,7 @@ dialyze(
|
|||
|
||||
eunit(
|
||||
name = "eunit",
|
||||
compiled_suites = [":test_rabbitmq_prometheus_collector_test_proxy_beam"],
|
||||
target = ":test_erlang_app",
|
||||
)
|
||||
|
||||
|
@ -97,6 +99,15 @@ rabbitmq_integration_suite(
|
|||
flaky = True,
|
||||
)
|
||||
|
||||
rabbitmq_integration_suite(
|
||||
name = "prometheus_rabbitmq_federation_collector_SUITE",
|
||||
size = "small",
|
||||
additional_beam = [
|
||||
"//deps/rabbitmq_federation:test/rabbit_federation_test_util.beam", #keep
|
||||
"test/rabbitmq_prometheus_collector_test_proxy.beam", #keep
|
||||
],
|
||||
)
|
||||
|
||||
assert_suites()
|
||||
|
||||
alias(
|
||||
|
|
|
@ -13,6 +13,7 @@ def all_beam_files(name = "all_beam_files"):
|
|||
"src/collectors/prometheus_rabbitmq_alarm_metrics_collector.erl",
|
||||
"src/collectors/prometheus_rabbitmq_core_metrics_collector.erl",
|
||||
"src/collectors/prometheus_rabbitmq_dynamic_collector.erl",
|
||||
"src/collectors/prometheus_rabbitmq_federation_collector.erl",
|
||||
"src/collectors/prometheus_rabbitmq_global_metrics_collector.erl",
|
||||
"src/rabbit_prometheus_app.erl",
|
||||
"src/rabbit_prometheus_dispatcher.erl",
|
||||
|
@ -44,6 +45,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
|
|||
"src/collectors/prometheus_rabbitmq_alarm_metrics_collector.erl",
|
||||
"src/collectors/prometheus_rabbitmq_core_metrics_collector.erl",
|
||||
"src/collectors/prometheus_rabbitmq_dynamic_collector.erl",
|
||||
"src/collectors/prometheus_rabbitmq_federation_collector.erl",
|
||||
"src/collectors/prometheus_rabbitmq_global_metrics_collector.erl",
|
||||
"src/rabbit_prometheus_app.erl",
|
||||
"src/rabbit_prometheus_dispatcher.erl",
|
||||
|
@ -86,6 +88,7 @@ def all_srcs(name = "all_srcs"):
|
|||
"src/collectors/prometheus_rabbitmq_alarm_metrics_collector.erl",
|
||||
"src/collectors/prometheus_rabbitmq_core_metrics_collector.erl",
|
||||
"src/collectors/prometheus_rabbitmq_dynamic_collector.erl",
|
||||
"src/collectors/prometheus_rabbitmq_federation_collector.erl",
|
||||
"src/collectors/prometheus_rabbitmq_global_metrics_collector.erl",
|
||||
"src/rabbit_prometheus_app.erl",
|
||||
"src/rabbit_prometheus_dispatcher.erl",
|
||||
|
@ -124,3 +127,20 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
|
|||
"//deps/rabbitmq_ct_helpers:erlang_app",
|
||||
],
|
||||
)
|
||||
erlang_bytecode(
|
||||
name = "prometheus_rabbitmq_federation_collector_SUITE_beam_files",
|
||||
testonly = True,
|
||||
srcs = ["test/prometheus_rabbitmq_federation_collector_SUITE.erl"],
|
||||
outs = ["test/prometheus_rabbitmq_federation_collector_SUITE.beam"],
|
||||
app_name = "rabbitmq_prometheus",
|
||||
erlc_opts = "//:test_erlc_opts",
|
||||
deps = ["//deps/amqp_client:erlang_app", "@prometheus//:erlang_app"],
|
||||
)
|
||||
erlang_bytecode(
|
||||
name = "rabbitmq_prometheus_collector_test_proxy_beam_files",
|
||||
testonly = True,
|
||||
srcs = ["test/rabbitmq_prometheus_collector_test_proxy.erl"],
|
||||
outs = ["test/rabbitmq_prometheus_collector_test_proxy.beam"],
|
||||
app_name = "rabbitmq_prometheus",
|
||||
erlc_opts = "//:test_erlc_opts",
|
||||
)
|
||||
|
|
|
@ -258,6 +258,12 @@ These metrics are specific to the stream protocol.
|
|||
| rabbitmq_raft_log_snapshot_index | Raft log snapshot index |
|
||||
| rabbitmq_raft_term_total | Current Raft term number |
|
||||
|
||||
### Federation
|
||||
|
||||
| Metric | Description |
|
||||
| --- | --- |
|
||||
| rabbitmq_federation_links | Federations Links count grouped by Link status |
|
||||
|
||||
## Telemetry
|
||||
|
||||
| Metric | Description |
|
||||
|
|
45
deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_federation_collector.erl
vendored
Normal file
45
deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_federation_collector.erl
vendored
Normal file
|
@ -0,0 +1,45 @@
|
|||
%% This Source Code Form is subject to the terms of the Mozilla Public
|
||||
%% License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
%%
|
||||
%% Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
||||
%%
|
||||
-module(prometheus_rabbitmq_federation_collector).
|
||||
-export([deregister_cleanup/1,
|
||||
collect_mf/2]).
|
||||
|
||||
-import(prometheus_model_helpers, [create_mf/4]).
|
||||
|
||||
-behaviour(prometheus_collector).
|
||||
|
||||
-define(METRICS, [{rabbitmq_federation_links, gauge,
|
||||
"Current number of federation links."},
|
||||
]).
|
||||
|
||||
%% API exports
|
||||
-export([]).
|
||||
|
||||
%%====================================================================
|
||||
%% Collector API
|
||||
%%====================================================================
|
||||
|
||||
deregister_cleanup(_) -> ok.
|
||||
|
||||
collect_mf(_Registry, Callback) ->
|
||||
Status = rabbit_federation_status:status(500),
|
||||
StatusGroups = lists:foldl(fun(S, Acc) ->
|
||||
%% note Init value set to 1 because if status seen first time
|
||||
%% update with will take Init and put into Acc, wuthout calling fun
|
||||
maps:update_with(proplists:get_value(status, S), fun(C) -> C + 1 end, 1, Acc)
|
||||
end, #{}, Status),
|
||||
Metrics = [{rabbitmq_federation_links, gauge, "Current number of federation links.",
|
||||
[{[{status, S}], C} || {S, C} <- maps:to_list(StatusGroups)]}],
|
||||
_ = [add_metric_family(Metric, Callback) || Metric <- Metrics],
|
||||
ok.
|
||||
|
||||
add_metric_family({Name, Type, Help, Metrics}, Callback) ->
|
||||
Callback(create_mf(Name, Help, Type, Metrics)).
|
||||
|
||||
%%====================================================================
|
||||
%% Private Parts
|
||||
%%====================================================================
|
|
@ -18,6 +18,7 @@ build_dispatcher() ->
|
|||
prometheus_rabbitmq_global_metrics_collector,
|
||||
prometheus_rabbitmq_alarm_metrics_collector,
|
||||
prometheus_rabbitmq_dynamic_collector,
|
||||
prometheus_rabbitmq_federation_collector,
|
||||
prometheus_process_collector]),
|
||||
prometheus_registry:register_collectors('per-object', [
|
||||
prometheus_vm_system_info_collector,
|
||||
|
|
152
deps/rabbitmq_prometheus/test/prometheus_rabbitmq_federation_collector_SUITE.erl
vendored
Normal file
152
deps/rabbitmq_prometheus/test/prometheus_rabbitmq_federation_collector_SUITE.erl
vendored
Normal file
|
@ -0,0 +1,152 @@
|
|||
%% This Source Code Form is subject to the terms of the Mozilla Public
|
||||
%% License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
%%
|
||||
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
||||
%%
|
||||
|
||||
-module(prometheus_rabbitmq_federation_collector_SUITE).
|
||||
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("amqp_client/include/amqp_client.hrl").
|
||||
-include_lib("prometheus/include/prometheus_model.hrl").
|
||||
|
||||
-compile(export_all).
|
||||
|
||||
-define(ONE_RUNNING_METRIC, #'MetricFamily'{name = <<"rabbitmq_federation_links">>,
|
||||
help = "Current number of federation links.",
|
||||
type = 'GAUGE',
|
||||
metric = [#'Metric'{label = [#'LabelPair'{name = <<"status">>,
|
||||
value = <<"running">>}],
|
||||
gauge = #'Gauge'{value = 1}}]}).
|
||||
|
||||
-define(TWO_RUNNING_METRIC, #'MetricFamily'{name = <<"rabbitmq_federation_links">>,
|
||||
help = "Current number of federation links.",
|
||||
type = 'GAUGE',
|
||||
metric = [#'Metric'{label = [#'LabelPair'{name = <<"status">>,
|
||||
value = <<"running">>}],
|
||||
gauge = #'Gauge'{value = 2}}]}).
|
||||
|
||||
-define(ONE_RUNNING_ONE_STARTING_METRIC, #'MetricFamily'{name = <<"rabbitmq_federation_links">>,
|
||||
help = "Current number of federation links.",
|
||||
type = 'GAUGE',
|
||||
metric = [#'Metric'{label = [#'LabelPair'{name = <<"status">>,
|
||||
value = <<"running">>}],
|
||||
gauge = #'Gauge'{value = 1}},
|
||||
#'Metric'{label = [#'LabelPair'{name = <<"status">>,
|
||||
value = <<"starting">>}],
|
||||
gauge = #'Gauge'{value = 1}}]}).
|
||||
|
||||
-import(rabbit_federation_test_util,
|
||||
[expect/3, expect_empty/2,
|
||||
set_upstream/4, clear_upstream/3, set_upstream_set/4,
|
||||
set_policy/5, clear_policy/3,
|
||||
set_policy_upstream/5, set_policy_upstreams/4,
|
||||
no_plugins/1, with_ch/3, q/2, maybe_declare_queue/3, delete_all/2]).
|
||||
|
||||
all() ->
|
||||
[
|
||||
{group, non_parallel_tests}
|
||||
].
|
||||
|
||||
groups() ->
|
||||
[
|
||||
{non_parallel_tests, [], [
|
||||
single_link_then_second_added,
|
||||
two_links_from_the_start
|
||||
]}
|
||||
].
|
||||
|
||||
suite() ->
|
||||
[{timetrap, {minutes, 5}}].
|
||||
|
||||
%% -------------------------------------------------------------------
|
||||
%% Testsuite setup/teardown.
|
||||
%% -------------------------------------------------------------------
|
||||
init_per_suite(Config) ->
|
||||
rabbit_ct_helpers:log_environment(),
|
||||
Config1 = rabbit_ct_helpers:set_config(Config, [
|
||||
{rmq_nodename_suffix, ?MODULE}
|
||||
]),
|
||||
rabbit_ct_helpers:run_setup_steps(Config1,
|
||||
rabbit_ct_broker_helpers:setup_steps() ++
|
||||
rabbit_ct_client_helpers:setup_steps() ++
|
||||
[fun rabbit_federation_test_util:setup_federation/1]).
|
||||
end_per_suite(Config) ->
|
||||
rabbit_ct_helpers:run_teardown_steps(Config,
|
||||
rabbit_ct_client_helpers:teardown_steps() ++
|
||||
rabbit_ct_broker_helpers:teardown_steps()).
|
||||
|
||||
init_per_group(_, Config) ->
|
||||
Config.
|
||||
|
||||
end_per_group(_, Config) ->
|
||||
Config.
|
||||
|
||||
init_per_testcase(Testcase, Config) ->
|
||||
rabbit_ct_helpers:testcase_started(Config, Testcase).
|
||||
|
||||
end_per_testcase(Testcase, Config) ->
|
||||
rabbit_ct_helpers:testcase_finished(Config, Testcase).
|
||||
|
||||
%% -------------------------------------------------------------------
|
||||
%% Test cases
|
||||
%% -------------------------------------------------------------------
|
||||
|
||||
single_link_then_second_added(Config) ->
|
||||
with_ch(
|
||||
Config,
|
||||
fun (Ch) ->
|
||||
timer:sleep(3000),
|
||||
[_L1] = rabbit_ct_broker_helpers:rpc(Config, 0,
|
||||
rabbit_federation_status, status, []),
|
||||
MFs = get_metrics(Config),
|
||||
[?ONE_RUNNING_METRIC] = MFs,
|
||||
maybe_declare_queue(Config, Ch, q(<<"fed.downstream2">>, [{<<"x-queue-type">>, longstr, <<"classic">>}])),
|
||||
%% here we race against queue.declare... most of the times there is going to be
|
||||
%% new status=starting metric. In this case we wait a bit more for running=2.
|
||||
%% But running=2 is also possible first time if rpc for some reason is slow.
|
||||
%% And of course simple running=1 possible too if queue.declare is really slow
|
||||
MFs1 = get_metrics(Config),
|
||||
case MFs1 of
|
||||
[?TWO_RUNNING_METRIC] -> ok;
|
||||
[?ONE_RUNNING_METRIC] ->
|
||||
rabbit_ct_helpers:eventually(?_assertEqual([?TWO_RUNNING_METRIC],
|
||||
get_metrics(Config)),
|
||||
500,
|
||||
5);
|
||||
[?ONE_RUNNING_ONE_STARTING_METRIC] ->
|
||||
rabbit_ct_helpers:eventually(?_assertEqual([?TWO_RUNNING_METRIC],
|
||||
get_metrics(Config)),
|
||||
500,
|
||||
5)
|
||||
|
||||
end,
|
||||
|
||||
delete_all(Ch, [q(<<"fed.downstream2">>, [{<<"x-queue-type">>, longstr, <<"classic">>}])])
|
||||
end, upstream_downstream()).
|
||||
|
||||
two_links_from_the_start(Config) ->
|
||||
with_ch(
|
||||
Config,
|
||||
fun (_Ch) ->
|
||||
timer:sleep(3000),
|
||||
[_L1 | _L2] = rabbit_ct_broker_helpers:rpc(Config, 0,
|
||||
rabbit_federation_status, status, []),
|
||||
MFs = get_metrics(Config),
|
||||
[?TWO_RUNNING_METRIC] = MFs
|
||||
|
||||
end, upstream_downstream() ++ [q(<<"fed.downstream2">>, [{<<"x-queue-type">>, longstr, <<"classic">>}])]).
|
||||
|
||||
%% -------------------------------------------------------------------
|
||||
%%
|
||||
%% -------------------------------------------------------------------
|
||||
|
||||
upstream_downstream() ->
|
||||
[q(<<"upstream">>, undefined), q(<<"fed.downstream">>, undefined)].
|
||||
|
||||
get_metrics(Config) ->
|
||||
rabbit_ct_broker_helpers:rpc(Config, 0,
|
||||
rabbitmq_prometheus_collector_test_proxy, collect_mf,
|
||||
[default, prometheus_rabbitmq_federation_collector]).
|
|
@ -0,0 +1,12 @@
|
|||
-module(rabbitmq_prometheus_collector_test_proxy).
|
||||
|
||||
-export([collect_mf/2]).
|
||||
|
||||
-define(PD_KEY, metric_families).
|
||||
|
||||
collect_mf(Registry, Collector) ->
|
||||
put(?PD_KEY, []),
|
||||
Collector:collect_mf(Registry, fun(MF) -> put(?PD_KEY, [MF | get(?PD_KEY)]) end),
|
||||
MFs = lists:reverse(get(?PD_KEY)),
|
||||
erase(?PD_KEY),
|
||||
MFs.
|
|
@ -1109,6 +1109,7 @@ rabbitmq_prometheus:
|
|||
- prometheus_rabbitmq_alarm_metrics_collector
|
||||
- prometheus_rabbitmq_core_metrics_collector
|
||||
- prometheus_rabbitmq_dynamic_collector
|
||||
- prometheus_rabbitmq_federation_collector
|
||||
- prometheus_rabbitmq_global_metrics_collector
|
||||
- rabbit_prometheus_app
|
||||
- rabbit_prometheus_dispatcher
|
||||
|
|
Loading…
Reference in New Issue