diff --git a/deps/rabbit/include/rabbit_global_counters.hrl b/deps/rabbit/include/rabbit_global_counters.hrl index f4eac1268e..ae19ce4f09 100644 --- a/deps/rabbit/include/rabbit_global_counters.hrl +++ b/deps/rabbit/include/rabbit_global_counters.hrl @@ -1,2 +1,41 @@ -define(NUM_PROTOCOL_COUNTERS, 8). --define(NUM_PROTOCOL_QUEUE_TYPE, 8). +-define(NUM_PROTOCOL_QUEUE_TYPE_COUNTERS, 8). + +%% Dead Letter counters: +%% +%% The following two counters are mutually exclusive because +%% quorum queue dead-letter-strategy at-least-once is incompatible with overflow drop-head. +-define(MESSAGES_DEAD_LETTERED_MAXLEN, 1). +-define(MESSAGES_DEAD_LETTERED_CONFIRMED, 1). +-define(MESSAGES_DEAD_LETTERED_EXPIRED, 2). +-define(MESSAGES_DEAD_LETTERED_REJECTED, 3). +-define(MESSAGES_DEAD_LETTERED_DELIVERY_LIMIT, 4). + +-define(MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER, + {messages_dead_lettered_maxlen_total, ?MESSAGES_DEAD_LETTERED_MAXLEN, counter, + "Total number of messages dead-lettered due to overflow drop-head or reject-publish-dlx" + }). + +-define(MESSAGES_DEAD_LETTERED_CONFIRMED_COUNTER, + { + messages_dead_lettered_confirmed_total, ?MESSAGES_DEAD_LETTERED_CONFIRMED, counter, + "Total number of messages dead-lettered and confirmed by target queues" + }). + +-define(MESSAGES_DEAD_LETTERED_EXPIRED_COUNTER, + { + messages_dead_lettered_expired_total, ?MESSAGES_DEAD_LETTERED_EXPIRED, counter, + "Total number of messages dead-lettered due to message TTL exceeded" + }). + +-define(MESSAGES_DEAD_LETTERED_REJECTED_COUNTER, + { + messages_dead_lettered_rejected_total, ?MESSAGES_DEAD_LETTERED_REJECTED, counter, + "Total number of messages dead-lettered due to basic.reject or basic.nack" + }). + +-define(MESSAGES_DEAD_LETTERED_DELIVERY_LIMIT_COUNTER, + { + messages_dead_lettered_delivery_limit_total, ?MESSAGES_DEAD_LETTERED_DELIVERY_LIMIT, counter, + "Total number of messages dead-lettered due to delivery-limit exceeded" + }). diff --git a/deps/rabbit/src/rabbit_global_counters.erl b/deps/rabbit/src/rabbit_global_counters.erl index 7855ba5b5a..6dfca8f2d1 100644 --- a/deps/rabbit/src/rabbit_global_counters.erl +++ b/deps/rabbit/src/rabbit_global_counters.erl @@ -7,6 +7,8 @@ -module(rabbit_global_counters). +-include("rabbit_global_counters.hrl"). + -export([ boot_step/0, init/1, @@ -128,57 +130,48 @@ } ]). --define(MESSAGES_DEAD_LETTERED_EXPIRED, 1). --define(MESSAGES_DEAD_LETTERED_REJECTED, 2). -%% The following two counters are mutually exclusive because -%% quorum queue dead-letter-strategy at-least-once is incompatible with overflow drop-head. --define(MESSAGES_DEAD_LETTERED_MAXLEN, 3). --define(MESSAGES_DEAD_LETTERED_CONFIRMED, 3). --define(MESSAGES_DEAD_LETTERED_DELIVERY_LIMIT, 4). --define(MESSAGES_DEAD_LETTERED_COUNTERS, - [ - { - messages_dead_lettered_expired_total, ?MESSAGES_DEAD_LETTERED_EXPIRED, counter, - "Total number of messages dead-lettered due to message TTL exceeded" - }, - { - messages_dead_lettered_rejected_total, ?MESSAGES_DEAD_LETTERED_REJECTED, counter, - "Total number of messages dead-lettered due to basic.reject or basic.nack" - } - ]). --define(MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER, - { - messages_dead_lettered_maxlen_total, ?MESSAGES_DEAD_LETTERED_MAXLEN, counter, - "Total number of messages dead-lettered due to overflow drop-head or reject-publish-dlx" - }). --define(MESSAGES_DEAD_LETTERED_CONFIRMED_COUNTER, - { - messages_dead_lettered_confirmed_total, ?MESSAGES_DEAD_LETTERED_CONFIRMED, counter, - "Total number of messages dead-lettered and confirmed by target queues" - }). --define(MESSAGES_DEAD_LETTERED_DELIVERY_LIMIT_COUNTER, - { - messages_dead_lettered_delivery_limit_total, ?MESSAGES_DEAD_LETTERED_DELIVERY_LIMIT, counter, - "Total number of messages dead-lettered due to delivery-limit exceeded" - }). - boot_step() -> + %% Protocol counters init([{protocol, amqp091}]), + + %% Protocol & Queue Type counters init([{protocol, amqp091}, {queue_type, rabbit_classic_queue}]), init([{protocol, amqp091}, {queue_type, rabbit_quorum_queue}]), init([{protocol, amqp091}, {queue_type, rabbit_stream_queue}]), + + %% Dead Letter counters + %% + %% Streams never dead letter. + %% + %% Source classic queue dead letters. init([{queue_type, rabbit_classic_queue}, {dead_letter_strategy, disabled}], - [?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER]), + [?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER, + ?MESSAGES_DEAD_LETTERED_EXPIRED_COUNTER, + ?MESSAGES_DEAD_LETTERED_REJECTED_COUNTER]), init([{queue_type, rabbit_classic_queue}, {dead_letter_strategy, at_most_once}], - [?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER]), + [?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER, + ?MESSAGES_DEAD_LETTERED_EXPIRED_COUNTER, + ?MESSAGES_DEAD_LETTERED_REJECTED_COUNTER]), + %% + %% Source quorum queue dead letters. + %% Only quorum queues can dead letter due to delivery-limit exceeded. + %% Only quorum queues support dead letter strategy at-least-once. init([{queue_type, rabbit_quorum_queue}, {dead_letter_strategy, disabled}], [?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER, - ?MESSAGES_DEAD_LETTERED_DELIVERY_LIMIT_COUNTER]), + ?MESSAGES_DEAD_LETTERED_EXPIRED_COUNTER, + ?MESSAGES_DEAD_LETTERED_REJECTED_COUNTER, + ?MESSAGES_DEAD_LETTERED_DELIVERY_LIMIT_COUNTER + ]), init([{queue_type, rabbit_quorum_queue}, {dead_letter_strategy, at_most_once}], [?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER, - ?MESSAGES_DEAD_LETTERED_DELIVERY_LIMIT_COUNTER]), + ?MESSAGES_DEAD_LETTERED_EXPIRED_COUNTER, + ?MESSAGES_DEAD_LETTERED_REJECTED_COUNTER, + ?MESSAGES_DEAD_LETTERED_DELIVERY_LIMIT_COUNTER + ]), init([{queue_type, rabbit_quorum_queue}, {dead_letter_strategy, at_least_once}], [?MESSAGES_DEAD_LETTERED_CONFIRMED_COUNTER, + ?MESSAGES_DEAD_LETTERED_EXPIRED_COUNTER, + ?MESSAGES_DEAD_LETTERED_REJECTED_COUNTER, ?MESSAGES_DEAD_LETTERED_DELIVERY_LIMIT_COUNTER ]). @@ -193,9 +186,9 @@ init(Labels = [{protocol, Protocol}], Extra) -> _ = seshat:new_group(?MODULE), Counters = seshat:new(?MODULE, Labels, ?PROTOCOL_COUNTERS ++ Extra), persistent_term:put({?MODULE, Protocol}, Counters); -init(Labels = [{queue_type, QueueType}, {dead_letter_strategy, DLS}], Extra) -> +init(Labels = [{queue_type, QueueType}, {dead_letter_strategy, DLS}], DeadLetterCounters) -> _ = seshat:new_group(?MODULE), - Counters = seshat:new(?MODULE, Labels, ?MESSAGES_DEAD_LETTERED_COUNTERS ++ Extra), + Counters = seshat:new(?MODULE, Labels, DeadLetterCounters), persistent_term:put({?MODULE, QueueType, DLS}, Counters). overview() -> @@ -263,9 +256,9 @@ consumer_deleted(Protocol) -> messages_dead_lettered(Reason, QueueType, DeadLetterStrategy, Num) -> Index = case Reason of + maxlen -> ?MESSAGES_DEAD_LETTERED_MAXLEN; expired -> ?MESSAGES_DEAD_LETTERED_EXPIRED; rejected -> ?MESSAGES_DEAD_LETTERED_REJECTED; - maxlen -> ?MESSAGES_DEAD_LETTERED_MAXLEN; delivery_limit -> ?MESSAGES_DEAD_LETTERED_DELIVERY_LIMIT end, counters:add(fetch(QueueType, DeadLetterStrategy), Index, Num). diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl index 19cd58bfb0..cfdc76e2de 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl @@ -11,6 +11,7 @@ -include("rabbit_mqtt.hrl"). -include("rabbit_mqtt_packet.hrl"). +-include_lib("rabbit/include/rabbit_global_counters.hrl"). -include_lib("stdlib/include/assert.hrl"). -export([start/2, stop/1]). @@ -93,16 +94,18 @@ local_connection_pids() -> end. init_global_counters() -> - init_global_counters(?MQTT_PROTO_V3), - init_global_counters(?MQTT_PROTO_V4), - init_global_counters(?MQTT_PROTO_V5). + lists:foreach(fun init_global_counters/1, [?MQTT_PROTO_V3, + ?MQTT_PROTO_V4, + ?MQTT_PROTO_V5]). init_global_counters(ProtoVer) -> Proto = {protocol, ProtoVer}, rabbit_global_counters:init([Proto]), - rabbit_global_counters:init([Proto, {queue_type, ?QUEUE_TYPE_QOS_0}]), rabbit_global_counters:init([Proto, {queue_type, rabbit_classic_queue}]), - rabbit_global_counters:init([Proto, {queue_type, rabbit_quorum_queue}]). + rabbit_global_counters:init([Proto, {queue_type, rabbit_quorum_queue}]), + rabbit_global_counters:init([Proto, {queue_type, ?QUEUE_TYPE_QOS_0}]), + rabbit_global_counters:init([{queue_type, ?QUEUE_TYPE_QOS_0}, {dead_letter_strategy, disabled}], + [?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER]). persist_static_configuration() -> rabbit_mqtt_util:init_sparkplug(), diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index b77aeb576f..abf62d193d 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -1982,6 +1982,8 @@ handle_queue_event({queue_event, ?QUEUE_TYPE_QOS_0, Msg}, false -> deliver_one_to_client(Msg, false, State0); true -> + rabbit_global_counters:messages_dead_lettered( + maxlen, ?QUEUE_TYPE_QOS_0, disabled, 1), State0#state{qos0_messages_dropped = N + 1} end, {ok, State}; diff --git a/deps/rabbitmq_mqtt/test/reader_SUITE.erl b/deps/rabbitmq_mqtt/test/reader_SUITE.erl index b669c9eb1c..fa79c03033 100644 --- a/deps/rabbitmq_mqtt/test/reader_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/reader_SUITE.erl @@ -254,7 +254,22 @@ event_authentication_failure(Config) -> %% Test that queue type rabbit_mqtt_qos0_queue drops QoS 0 messages when its %% max length is reached. rabbit_mqtt_qos0_queue_overflow(Config) -> - ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, rabbit_mqtt_qos0_queue), + ProtoVer = case ?config(mqtt_version, Config) of + v4 -> mqtt311; + v5 -> mqtt50 + end, + QType = rabbit_mqtt_qos0_queue, + + #{ + [{protocol, ProtoVer}, {queue_type, QType}] := + #{messages_delivered_total := 0, + messages_delivered_consume_auto_ack_total := 0}, + + [{queue_type, QType}, {dead_letter_strategy, disabled}] := + #{messages_dead_lettered_maxlen_total := NumDeadLettered} + } = rabbit_ct_broker_helpers:rpc(Config, rabbit_global_counters, overview, []), + + ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, QType), Topic = atom_to_binary(?FUNCTION_NAME), Msg = binary:copy(<<"x">>, 4000), @@ -289,7 +304,8 @@ rabbit_mqtt_qos0_queue_overflow(Config) -> {status, _, _, [_, _, _, _, Misc]} = sys:get_status(ServerConnectionPid), [State] = [S || {data, [{"State", S}]} <- Misc], #{proc_state := #{qos0_messages_dropped := NumDropped}} = State, - ct:pal("NumReceived=~b~nNumDropped=~b", [NumReceived, NumDropped]), + + ct:pal("NumReceived=~b NumDropped=~b", [NumReceived, NumDropped]), %% We expect that %% 1. all sent messages were either received or dropped @@ -302,6 +318,19 @@ rabbit_mqtt_qos0_queue_overflow(Config) -> %% of mailbox_soft_limit=200 should not be dropped ?assert(NumReceived >= 200), + %% Assert that Prometheus metrics counted correctly. + ExpectedNumDeadLettered = NumDeadLettered + NumDropped, + ?assertMatch( + #{ + [{protocol, ProtoVer}, {queue_type, QType}] := + #{messages_delivered_total := NumReceived, + messages_delivered_consume_auto_ack_total := NumReceived}, + + [{queue_type, QType}, {dead_letter_strategy, disabled}] := + #{messages_dead_lettered_maxlen_total := ExpectedNumDeadLettered} + }, + rabbit_ct_broker_helpers:rpc(Config, rabbit_global_counters, overview, [])), + ok = emqtt:disconnect(Sub), ok = emqtt:disconnect(Pub). diff --git a/deps/rabbitmq_mqtt/test/v5_SUITE.erl b/deps/rabbitmq_mqtt/test/v5_SUITE.erl index a9da7a1cdc..da90f3b049 100644 --- a/deps/rabbitmq_mqtt/test/v5_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/v5_SUITE.erl @@ -866,7 +866,7 @@ at_most_once_dead_letter_detect_cycle(Config) -> ok = emqtt:disconnect(Pub), %% Given our subscribing client is disconnected, the message should be dead lettered after 1 ms. %% However, due to the dead letter cycle, we expect the message to be dropped. - timer:sleep(5), + timer:sleep(20), Sub2 = connect(SubClientId, Config, [{clean_start, false}]), assert_nothing_received(), %% Double check that the message was indeed (exactly once) dead lettered. diff --git a/deps/rabbitmq_prometheus/metrics.md b/deps/rabbitmq_prometheus/metrics.md index 90a058a556..b67012b2ae 100644 --- a/deps/rabbitmq_prometheus/metrics.md +++ b/deps/rabbitmq_prometheus/metrics.md @@ -80,8 +80,9 @@ To generate these: Metrics `rabbitmq_global_messages_dead_lettered_*` have labels `queue_type` and `dead_letter_strategy`. Label `queue_type` denotes the type of queue messages were discarded from. It can have value -* `rabbit_classic_queue`, or -* `rabbit_quorum_queue` +* `rabbit_classic_queue`, +* `rabbit_quorum_queue`, or +* `rabbit_mqtt_qos0_queue` (Queue type `rabbit_stream_queue` does not dead letter messages.)