diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl index e33d8987f1..9a848e8a33 100644 --- a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl +++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl @@ -167,21 +167,24 @@ ]}, {queue_metrics, [ - {2, queue_consumers, gauge, "Consumers on a queue", queue_consumers}, - {2, queue_process_memory_bytes, gauge, "Memory in bytes used by the Erlang queue process", queue_memory}, - {2, queue_messages_bytes, gauge, "Size in bytes of ready and unacknowledged messages", queue_messages_bytes}, - {2, queue_messages_ram, gauge, "Ready and unacknowledged messages stored in memory", queue_messages_ram}, - {2, queue_messages_ready_ram, gauge, "Ready messages stored in memory", queue_messages_ready_ram}, - {2, queue_messages_ready_bytes, gauge, "Size in bytes of ready messages", queue_messages_bytes_ready}, - {2, queue_messages_unacked_ram, gauge, "Unacknowledged messages stored in memory", queue_messages_unacknowledged_ram}, - {2, queue_messages_unacked_bytes, gauge, "Size in bytes of all unacknowledged messages", queue_messages_bytes_unacknowledged}, - {2, queue_messages_persistent, gauge, "Persistent messages", queue_messages_persistent}, - {2, queue_messages_persistent_bytes, gauge, "Size in bytes of persistent messages", queue_messages_bytes_persistent}, - {2, queue_messages_paged_out, gauge, "Messages paged out to disk", queue_messages_paged_out}, - {2, queue_messages_paged_out_bytes, gauge, "Size in bytes of messages paged out to disk", queue_messages_bytes_paged_out}, + {2, queue_consumers, gauge, "Consumers on a queue", consumers}, + {2, queue_consumer_utilisation, gauge, "Consumer utilisation", consumer_utilisation}, + {2, queue_process_memory_bytes, gauge, "Memory in bytes used by the Erlang queue process", memory}, + {2, queue_messages_ram, gauge, "Ready and unacknowledged messages stored in memory", messages_ram}, + {2, queue_messages_ram_bytes, gauge, "Size of ready and unacknowledged messages stored in memory", message_bytes_ram}, + {2, queue_messages_ready_ram, gauge, "Ready messages stored in memory", messages_ready_ram}, + {2, queue_messages_unacked_ram, gauge, "Unacknowledged messages stored in memory", messages_unacknowledged_ram}, + {2, queue_messages_persistent, gauge, "Persistent messages", messages_persistent}, + {2, queue_messages_persistent_bytes, gauge, "Size in bytes of persistent messages", message_bytes_persistent}, + {2, queue_messages_bytes, gauge, "Size in bytes of ready and unacknowledged messages", message_bytes}, + {2, queue_messages_ready_bytes, gauge, "Size in bytes of ready messages", message_bytes_ready}, + {2, queue_messages_unacked_bytes, gauge, "Size in bytes of all unacknowledged messages", message_bytes_unacknowledged}, + {2, queue_messages_paged_out, gauge, "Messages paged out to disk", messages_paged_out}, + {2, queue_messages_paged_out_bytes, gauge, "Size in bytes of messages paged out to disk", message_bytes_paged_out}, {2, queue_disk_reads_total, counter, "Total number of times queue read messages from disk", disk_reads}, {2, queue_disk_writes_total, counter, "Total number of times queue wrote messages to disk", disk_writes} ]} + ]). % Some metrics require to be converted, mostly those that represent time. diff --git a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl index bac216108e..e63b74cbf1 100644 --- a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl +++ b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl @@ -82,9 +82,15 @@ init_per_group(with_metrics, Config0) -> #amqp_msg{payload = <<"msg">>}), timer:sleep(150), {#'basic.get_ok'{}, #amqp_msg{}} = amqp_channel:call(Ch, #'basic.get'{queue = Q}), + %% We want to check consumer metrics, so we need at least 1 consumer bound + %% but we don't care what it does if anything as long as the runner process does + %% not have to handle the consumer's messages. + ConsumerPid = sleeping_consumer(), + #'basic.consume_ok'{consumer_tag = CTag} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q}, ConsumerPid), timer:sleep(10000), - Config2 ++ [{channel_pid, Ch}, {queue_name, Q}]. + Config2 ++ [{channel_pid, Ch}, {queue_name, Q}, {consumer_tag, CTag}, {consumer_pid, ConsumerPid}]. init_per_group(Group, Config0, Extra) -> rabbit_ct_helpers:log_environment(), @@ -96,6 +102,10 @@ init_per_group(Group, Config0, Extra) -> end_per_group(with_metrics, Config) -> Ch = ?config(channel_pid, Config), + CTag = ?config(consumer_tag, Config), + amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}), + ConsumerPid = ?config(consumer_pid, Config), + ConsumerPid ! stop, amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}), rabbit_ct_client_helpers:close_channel(Ch), end_per_group_(Config); @@ -113,6 +123,20 @@ init_per_testcase(Testcase, Config) -> end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase). +%% a no-op consumer +sleeping_consumer_loop() -> + receive + stop -> ok; + #'basic.consume_ok'{} -> sleeping_consumer_loop(); + #'basic.cancel'{} -> sleeping_consumer_loop(); + {#'basic.deliver'{}, _Payload} -> sleeping_consumer_loop() + end. + +sleeping_consumer() -> + spawn(fun() -> + sleeping_consumer_loop() + end). + %% ------------------------------------------------------------------- %% Testcases. %% ------------------------------------------------------------------- @@ -170,8 +194,7 @@ metrics_test(Config) -> ?assertEqual(match, re:run(Body, "^rabbitmq_io_read_ops_total ", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_raft_term_total{", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_queue_messages_ready{", [{capture, none}, multiline])), - %% This metric has no value, we are checking just that it is defined - ?assertEqual(match, re:run(Body, " rabbitmq_queue_consumers ", [{capture, none}])), + ?assertEqual(match, re:run(Body, "^rabbitmq_queue_consumers{", [{capture, none}, multiline])), %% Checking the first metric value in each ETS table that requires converting ?assertEqual(match, re:run(Body, "^rabbitmq_erlang_uptime_seconds ", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_io_read_time_seconds_total ", [{capture, none}, multiline])),