Add dynamic and static promethues metric gauge
This commit is contained in:
parent
93d1ac9bb8
commit
4c44ebd8eb
|
|
@ -54,6 +54,7 @@ rabbitmq_app(
|
|||
"//deps/rabbit:erlang_app",
|
||||
"//deps/rabbitmq_federation:erlang_app",
|
||||
"//deps/rabbitmq_management_agent:erlang_app",
|
||||
"//deps/rabbitmq_shovel:erlang_app",
|
||||
"//deps/rabbitmq_web_dispatch:erlang_app",
|
||||
"@accept//:erlang_app",
|
||||
"@cowboy//:erlang_app",
|
||||
|
|
@ -108,6 +109,14 @@ rabbitmq_integration_suite(
|
|||
],
|
||||
)
|
||||
|
||||
rabbitmq_integration_suite(
|
||||
name = "prometheus_rabbitmq_shovel_collector_SUITE",
|
||||
size = "small",
|
||||
additional_beam = [
|
||||
"test/rabbitmq_prometheus_collector_test_proxy.beam", #keep
|
||||
],
|
||||
)
|
||||
|
||||
assert_suites()
|
||||
|
||||
alias(
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ PROJECT_DESCRIPTION = Prometheus metrics for RabbitMQ
|
|||
PROJECT_MOD := rabbit_prometheus_app
|
||||
DEPS = accept cowboy rabbit rabbitmq_management_agent prometheus rabbitmq_web_dispatch rabbitmq_federation
|
||||
BUILD_DEPS = amqp_client rabbit_common rabbitmq_management
|
||||
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers eunit_formatters
|
||||
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers eunit_formatters rabbitmq_shovel
|
||||
|
||||
EUNIT_OPTS = no_tty, {report, {eunit_progress, [colored, profile]}}
|
||||
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ def all_beam_files(name = "all_beam_files"):
|
|||
"src/collectors/prometheus_rabbitmq_dynamic_collector.erl",
|
||||
"src/collectors/prometheus_rabbitmq_federation_collector.erl",
|
||||
"src/collectors/prometheus_rabbitmq_global_metrics_collector.erl",
|
||||
"src/collectors/prometheus_rabbitmq_shovel_collector.erl",
|
||||
"src/rabbit_prometheus_app.erl",
|
||||
"src/rabbit_prometheus_dispatcher.erl",
|
||||
"src/rabbit_prometheus_handler.erl",
|
||||
|
|
@ -46,6 +47,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
|
|||
"src/collectors/prometheus_rabbitmq_dynamic_collector.erl",
|
||||
"src/collectors/prometheus_rabbitmq_federation_collector.erl",
|
||||
"src/collectors/prometheus_rabbitmq_global_metrics_collector.erl",
|
||||
"src/collectors/prometheus_rabbitmq_shovel_collector.erl",
|
||||
"src/rabbit_prometheus_app.erl",
|
||||
"src/rabbit_prometheus_dispatcher.erl",
|
||||
"src/rabbit_prometheus_handler.erl",
|
||||
|
|
@ -88,6 +90,7 @@ def all_srcs(name = "all_srcs"):
|
|||
"src/collectors/prometheus_rabbitmq_dynamic_collector.erl",
|
||||
"src/collectors/prometheus_rabbitmq_federation_collector.erl",
|
||||
"src/collectors/prometheus_rabbitmq_global_metrics_collector.erl",
|
||||
"src/collectors/prometheus_rabbitmq_shovel_collector.erl",
|
||||
"src/rabbit_prometheus_app.erl",
|
||||
"src/rabbit_prometheus_dispatcher.erl",
|
||||
"src/rabbit_prometheus_handler.erl",
|
||||
|
|
@ -142,3 +145,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
|
|||
app_name = "rabbitmq_prometheus",
|
||||
erlc_opts = "//:test_erlc_opts",
|
||||
)
|
||||
erlang_bytecode(
|
||||
name = "prometheus_rabbitmq_shovel_collector_SUITE_beam_files",
|
||||
testonly = True,
|
||||
srcs = ["test/prometheus_rabbitmq_shovel_collector_SUITE.erl"],
|
||||
outs = ["test/prometheus_rabbitmq_shovel_collector_SUITE.beam"],
|
||||
app_name = "rabbitmq_prometheus",
|
||||
erlc_opts = "//:test_erlc_opts",
|
||||
deps = ["//deps/amqp_client:erlang_app", "@prometheus//:erlang_app"],
|
||||
)
|
||||
|
|
|
|||
45
deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_shovel_collector.erl
vendored
Normal file
45
deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_shovel_collector.erl
vendored
Normal file
|
|
@ -0,0 +1,45 @@
|
|||
%% This Source Code Form is subject to the terms of the Mozilla Public
|
||||
%% License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
%%
|
||||
%% Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
||||
%%
|
||||
-module(prometheus_rabbitmq_shovel_collector).
|
||||
-export([deregister_cleanup/1,
|
||||
collect_mf/2]).
|
||||
|
||||
-import(prometheus_model_helpers, [create_mf/4]).
|
||||
|
||||
-behaviour(prometheus_collector).
|
||||
|
||||
%% API exports
|
||||
-export([]).
|
||||
|
||||
%%====================================================================
|
||||
%% Collector API
|
||||
%%====================================================================
|
||||
|
||||
deregister_cleanup(_) -> ok.
|
||||
|
||||
collect_mf(_Registry, Callback) ->
|
||||
Status = rabbit_shovel_status:status(500),
|
||||
{StaticStatusGroups, DynamicStatusGroups} = lists:foldl(fun({_,static,{S, _}, _}, {SMap, DMap}) ->
|
||||
{maps:update_with(S, fun(C) -> C + 1 end, 1, SMap), DMap};
|
||||
({_,dynamic,{S, _}, _}, {SMap, DMap}) ->
|
||||
{SMap, maps:update_with(S, fun(C) -> C + 1 end, 1, DMap)}
|
||||
end, {#{}, #{}}, Status),
|
||||
|
||||
Metrics = [{rabbitmq_shovel_dynamic, gauge, "Current number of dynamic shovels.",
|
||||
[{[{status, S}], C} || {S, C} <- maps:to_list(DynamicStatusGroups)]},
|
||||
{rabbitmq_shovel_static, gauge, "Current number of static shovels.",
|
||||
[{[{status, S}], C} || {S, C} <- maps:to_list(StaticStatusGroups)]}
|
||||
],
|
||||
_ = [add_metric_family(Metric, Callback) || Metric <- Metrics],
|
||||
ok.
|
||||
|
||||
add_metric_family({Name, Type, Help, Metrics}, Callback) ->
|
||||
Callback(create_mf(Name, Help, Type, Metrics)).
|
||||
|
||||
%%====================================================================
|
||||
%% Private Parts
|
||||
%%====================================================================
|
||||
|
|
@ -19,6 +19,7 @@ build_dispatcher() ->
|
|||
prometheus_rabbitmq_alarm_metrics_collector,
|
||||
prometheus_rabbitmq_dynamic_collector,
|
||||
prometheus_rabbitmq_federation_collector,
|
||||
prometheus_rabbitmq_shovel_collector,
|
||||
prometheus_process_collector]),
|
||||
prometheus_registry:register_collectors('per-object', [
|
||||
prometheus_vm_system_info_collector,
|
||||
|
|
|
|||
253
deps/rabbitmq_prometheus/test/prometheus_rabbitmq_shovel_collector_SUITE.erl
vendored
Normal file
253
deps/rabbitmq_prometheus/test/prometheus_rabbitmq_shovel_collector_SUITE.erl
vendored
Normal file
|
|
@ -0,0 +1,253 @@
|
|||
%% This Source Code Form is subject to the terms of the Mozilla Public
|
||||
%% License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
%%
|
||||
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
|
||||
%%
|
||||
|
||||
-module(prometheus_rabbitmq_shovel_collector_SUITE).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("amqp_client/include/amqp_client.hrl").
|
||||
-include_lib("prometheus/include/prometheus_model.hrl").
|
||||
|
||||
-compile(export_all).
|
||||
|
||||
-define(DYN_RUNNING_METRIC(Gauge), #'MetricFamily'{name = <<"rabbitmq_shovel_dynamic">>,
|
||||
help = "Current number of dynamic shovels.",type = 'GAUGE',
|
||||
metric = [#'Metric'{label = [#'LabelPair'{name = <<"status">>,
|
||||
value = <<"running">>}],
|
||||
gauge = #'Gauge'{value = Gauge},
|
||||
counter = undefined,summary = undefined,untyped = undefined,
|
||||
histogram = undefined,timestamp_ms = undefined}]}).
|
||||
|
||||
-define(STAT_RUNNING_METRIC(Gauge), #'MetricFamily'{name = <<"rabbitmq_shovel_static">>,
|
||||
help = "Current number of static shovels.",type = 'GAUGE',
|
||||
metric = [#'Metric'{label = [#'LabelPair'{name = <<"status">>,
|
||||
value = <<"running">>}],
|
||||
gauge = #'Gauge'{value = Gauge},
|
||||
counter = undefined,summary = undefined,untyped = undefined,
|
||||
histogram = undefined,timestamp_ms = undefined}]}).
|
||||
|
||||
-define(EMPTY_DYN_METRIC, #'MetricFamily'{name = <<"rabbitmq_shovel_dynamic">>,
|
||||
help = "Current number of dynamic shovels.",type = 'GAUGE',
|
||||
metric = []}).
|
||||
|
||||
-define(EMPTY_STAT_METRIC, #'MetricFamily'{name = <<"rabbitmq_shovel_static">>,
|
||||
help = "Current number of static shovels.",type = 'GAUGE',
|
||||
metric = []}).
|
||||
|
||||
|
||||
all() ->
|
||||
[
|
||||
{group, non_parallel_tests}
|
||||
].
|
||||
|
||||
groups() ->
|
||||
[
|
||||
{non_parallel_tests, [], [
|
||||
dynamic,
|
||||
static,
|
||||
mix
|
||||
]}
|
||||
].
|
||||
|
||||
suite() ->
|
||||
[{timetrap, {minutes, 5}}].
|
||||
|
||||
%% -------------------------------------------------------------------
|
||||
%% Testsuite setup/teardown.
|
||||
%% -------------------------------------------------------------------
|
||||
init_per_suite(Config) ->
|
||||
rabbit_ct_helpers:log_environment(),
|
||||
Config1 = rabbit_ct_helpers:set_config(Config, [
|
||||
{rmq_nodename_suffix, ?MODULE},
|
||||
{ignored_crashes, [
|
||||
"server_initiated_close,404",
|
||||
"writer,send_failed,closed"
|
||||
]}
|
||||
]),
|
||||
rabbit_ct_helpers:run_setup_steps(Config1,
|
||||
rabbit_ct_broker_helpers:setup_steps() ++
|
||||
rabbit_ct_client_helpers:setup_steps()).
|
||||
|
||||
end_per_suite(Config) ->
|
||||
rabbit_ct_helpers:run_teardown_steps(Config,
|
||||
rabbit_ct_client_helpers:teardown_steps() ++
|
||||
rabbit_ct_broker_helpers:teardown_steps()).
|
||||
|
||||
init_per_group(_, Config) ->
|
||||
Config.
|
||||
|
||||
end_per_group(_, Config) ->
|
||||
Config.
|
||||
|
||||
init_per_testcase(Testcase, Config) ->
|
||||
rabbit_ct_helpers:testcase_started(Config, Testcase).
|
||||
|
||||
end_per_testcase(Testcase, Config) ->
|
||||
rabbit_ct_helpers:testcase_finished(Config, Testcase).
|
||||
|
||||
%% -------------------------------------------------------------------
|
||||
%% Test cases
|
||||
%% -------------------------------------------------------------------
|
||||
|
||||
dynamic(Config) ->
|
||||
create_dynamic_shovel(Config, <<"test">>),
|
||||
running = get_shovel_status(Config, <<"test">>),
|
||||
[?DYN_RUNNING_METRIC(1), ?EMPTY_STAT_METRIC] = get_metrics(Config),
|
||||
create_dynamic_shovel(Config, <<"test2">>),
|
||||
running = get_shovel_status(Config, <<"test2">>),
|
||||
[?DYN_RUNNING_METRIC(2), ?EMPTY_STAT_METRIC] = get_metrics(Config),
|
||||
clear_param(Config, <<"test">>),
|
||||
clear_param(Config, <<"test2">>),
|
||||
[?EMPTY_DYN_METRIC, ?EMPTY_STAT_METRIC] = get_metrics(Config),
|
||||
ok.
|
||||
|
||||
static(Config) ->
|
||||
create_static_shovel(Config, static_shovel),
|
||||
[?EMPTY_DYN_METRIC, ?STAT_RUNNING_METRIC(1)] = get_metrics(Config),
|
||||
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, clear_shovel,
|
||||
[]),
|
||||
[?EMPTY_DYN_METRIC, ?EMPTY_STAT_METRIC] = get_metrics(Config),
|
||||
ok.
|
||||
|
||||
|
||||
mix(Config) ->
|
||||
create_dynamic_shovel(Config, <<"test">>),
|
||||
running = get_shovel_status(Config, <<"test">>),
|
||||
create_static_shovel(Config, static_shovel),
|
||||
|
||||
[?DYN_RUNNING_METRIC(1), ?STAT_RUNNING_METRIC(1)] = get_metrics(Config),
|
||||
|
||||
clear_param(Config, <<"test">>),
|
||||
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, clear_shovel,
|
||||
[]),
|
||||
[?EMPTY_DYN_METRIC, ?EMPTY_STAT_METRIC] = get_metrics(Config),
|
||||
ok.
|
||||
|
||||
%% -------------------------------------------------------------------
|
||||
%% Internal
|
||||
%% -------------------------------------------------------------------
|
||||
|
||||
get_metrics(Config) ->
|
||||
rabbit_ct_broker_helpers:rpc(Config, 0,
|
||||
rabbitmq_prometheus_collector_test_proxy, collect_mf,
|
||||
[default, prometheus_rabbitmq_shovel_collector]).
|
||||
|
||||
create_static_shovel(Config, Name) ->
|
||||
SourceQueue = <<"source-queue">>,
|
||||
DestQueue = <<"dest-queue">>,
|
||||
Hostname = ?config(rmq_hostname, Config),
|
||||
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
|
||||
Shovel = [{Name,
|
||||
[{source,
|
||||
[{protocol, amqp10},
|
||||
{uris, [rabbit_misc:format("amqp://~ts:~b",
|
||||
[Hostname, Port])]},
|
||||
{source_address, SourceQueue}]
|
||||
},
|
||||
{destination,
|
||||
[{uris, [rabbit_misc:format("amqp://~ts:~b/%2f?heartbeat=5",
|
||||
[Hostname, Port])]},
|
||||
{declarations,
|
||||
[{'queue.declare', [{queue, DestQueue}, auto_delete]}]},
|
||||
{publish_fields, [{exchange, <<>>},
|
||||
{routing_key, DestQueue}]},
|
||||
{publish_properties, [{delivery_mode, 2},
|
||||
{content_type, <<"shovelled">>}]},
|
||||
{add_forward_headers, true},
|
||||
{add_timestamp_header, true}]},
|
||||
{queue, <<>>},
|
||||
{ack_mode, no_ack}
|
||||
]}],
|
||||
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, setup_shovel,
|
||||
[Shovel, Name]).
|
||||
|
||||
setup_shovel(ShovelConfig, Name) ->
|
||||
_ = application:stop(rabbitmq_shovel),
|
||||
application:set_env(rabbitmq_shovel, shovels, ShovelConfig, infinity),
|
||||
ok = application:start(rabbitmq_shovel),
|
||||
await_shovel(Name, static).
|
||||
|
||||
clear_shovel() ->
|
||||
_ = application:stop(rabbitmq_shovel),
|
||||
application:unset_env(rabbitmq_shovel, shovels, infinity),
|
||||
ok = application:start(rabbitmq_shovel).
|
||||
|
||||
make_uri(Config, Node) ->
|
||||
Hostname = ?config(rmq_hostname, Config),
|
||||
Port = rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_amqp),
|
||||
list_to_binary(lists:flatten(io_lib:format("amqp://~ts:~b",
|
||||
[Hostname, Port]))).
|
||||
|
||||
create_dynamic_shovel(Config, Name) ->
|
||||
Node = 0,
|
||||
QueueNode = 0,
|
||||
Uri = make_uri(Config, QueueNode),
|
||||
Value = [{<<"src-queue">>, <<"src">>},
|
||||
{<<"dest-queue">>, <<"dest">>}],
|
||||
ok = rabbit_ct_broker_helpers:rpc(
|
||||
Config,
|
||||
Node,
|
||||
rabbit_runtime_parameters,
|
||||
set, [
|
||||
<<"/">>, <<"shovel">>, Name, [{<<"src-uri">>, Uri},
|
||||
{<<"dest-uri">>, [Uri]} |
|
||||
Value], none]),
|
||||
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, await_shovel,
|
||||
[Name, dynamic]).
|
||||
|
||||
await_shovel(Name, Type) ->
|
||||
Ret = await(fun() ->
|
||||
Status = shovels_from_status(running, Type),
|
||||
lists:member(Name, Status)
|
||||
end, 30_000),
|
||||
Ret.
|
||||
|
||||
shovels_from_status(ExpectedState, dynamic) ->
|
||||
S = rabbit_shovel_status:status(),
|
||||
[N || {{<<"/">>, N}, dynamic, {State, _}, _} <- S, State == ExpectedState];
|
||||
shovels_from_status(ExpectedState, static) ->
|
||||
S = rabbit_shovel_status:status(),
|
||||
[N || {N, static, {State, _}, _} <- S, State == ExpectedState].
|
||||
|
||||
get_shovel_status(Config, Name) ->
|
||||
get_shovel_status(Config, 0, Name).
|
||||
|
||||
get_shovel_status(Config, Node, Name) ->
|
||||
S = rabbit_ct_broker_helpers:rpc(
|
||||
Config, Node, rabbit_shovel_status, lookup, [{<<"/">>, Name}]),
|
||||
case S of
|
||||
not_found ->
|
||||
not_found;
|
||||
_ ->
|
||||
{Status, Info} = proplists:get_value(info, S),
|
||||
proplists:get_value(blocked_status, Info, Status)
|
||||
end.
|
||||
|
||||
await(Pred) ->
|
||||
case Pred() of
|
||||
true -> ok;
|
||||
false -> timer:sleep(100),
|
||||
await(Pred)
|
||||
end.
|
||||
|
||||
await(_Pred, Timeout) when Timeout =< 0 ->
|
||||
error(await_timeout);
|
||||
await(Pred, Timeout) ->
|
||||
case Pred() of
|
||||
true -> ok;
|
||||
Other when Timeout =< 100 ->
|
||||
error({await_timeout, Other});
|
||||
_ -> timer:sleep(100),
|
||||
await(Pred, Timeout - 100)
|
||||
end.
|
||||
|
||||
clear_param(Config, Name) ->
|
||||
clear_param(Config, 0, Name).
|
||||
|
||||
clear_param(Config, Node, Name) ->
|
||||
rabbit_ct_broker_helpers:rpc(Config, Node,
|
||||
rabbit_runtime_parameters, clear, [<<"/">>, <<"shovel">>, Name, <<"acting-user">>]).
|
||||
|
|
@ -14,6 +14,7 @@
|
|||
report_blocked_status/2,
|
||||
remove/1,
|
||||
status/0,
|
||||
status/1,
|
||||
lookup/1,
|
||||
cluster_status/0,
|
||||
cluster_status_with_nodes/0,
|
||||
|
|
@ -70,7 +71,9 @@ remove(Name) ->
|
|||
%% format without a feature flag.
|
||||
-spec status() -> [status_tuple()].
|
||||
status() ->
|
||||
gen_server:call(?SERVER, status, infinity).
|
||||
status(infinity).
|
||||
status(Timeout) ->
|
||||
gen_server:call(?SERVER, status, Timeout).
|
||||
|
||||
-spec cluster_status() -> [status_tuple()].
|
||||
cluster_status() ->
|
||||
|
|
@ -229,4 +232,3 @@ blocked_status_to_info(#entry{info = {running, Info},
|
|||
{running, Info ++ [{blocked_status, BlockedStatus}]};
|
||||
blocked_status_to_info(#entry{info = Info}) ->
|
||||
Info.
|
||||
|
||||
|
|
|
|||
|
|
@ -1086,6 +1086,7 @@ rabbitmq_prometheus:
|
|||
- prometheus_rabbitmq_dynamic_collector
|
||||
- prometheus_rabbitmq_federation_collector
|
||||
- prometheus_rabbitmq_global_metrics_collector
|
||||
- prometheus_rabbitmq_shovel_collector
|
||||
- rabbit_prometheus_app
|
||||
- rabbit_prometheus_dispatcher
|
||||
- rabbit_prometheus_handler
|
||||
|
|
|
|||
Loading…
Reference in New Issue