diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_collector.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_collector.erl index 000ae0a8fb..48c018a02c 100644 --- a/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_collector.erl +++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_collector.erl @@ -144,24 +144,37 @@ take_smaller(Policies) -> insert_old_aggr_stats(NextStats, Id, Stat) -> dict:store(Id, Stat, NextStats). -aggregate_metrics(Timestamp, State) -> - Table = State#state.table, - Res = ets:foldl( - fun(R, Dict) -> - aggregate_entry(Timestamp, R, Dict, State) - end, dict:new(), Table), - State#state{old_aggr_stats = Res}. +handle_deleted_queues(queue_coarse_metrics, Remainders, GPolicies) -> + TS = exometer_slide:timestamp(), + lists:foreach(fun ({Queue, {R, U, M}}) -> + NegStats = ?vhost_msg_stats(-R, -U, -M), + [insert_entry(vhost_msg_stats, vhost(Queue), TS, + NegStats, Size, Interval, true) + || {Size, Interval} <- GPolicies] + end, + dict:to_list(Remainders)); +handle_deleted_queues(_T, _R, _P) -> ok. -aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = connection_created}) -> +aggregate_metrics(Timestamp, #state{table = Table, + policies = {_, _, GPolicies}} = State0) -> + Table = State0#state.table, + {Next, #state{old_aggr_stats = Remainders}} = ets:foldl( + fun(R, {Dict, State}) -> + aggregate_entry(Timestamp, R, Dict, State) + end, {dict:new(), State0}, Table), + handle_deleted_queues(Table, Remainders, GPolicies), + State0#state{old_aggr_stats = Next}. + +aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = connection_created} = State) -> Ftd = rabbit_mgmt_format:format( Metrics, {fun rabbit_mgmt_format:format_connection_created/1, true}), ets:insert(connection_created_stats, ?connection_created_stats(Id, pget(name, Ftd, unknown), Ftd)), - NextStats; -aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = connection_metrics}) -> + {NextStats, State}; +aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = connection_metrics} = State) -> ets:insert(connection_stats, ?connection_stats(Id, Metrics)), - NextStats; + {NextStats, State}; aggregate_entry(TS, {Id, RecvOct, SendOct, Reductions}, NextStats, #state{table = connection_coarse_metrics, policies = {BPolicies, _, GPolicies}} = State) -> @@ -174,17 +187,17 @@ aggregate_entry(TS, {Id, RecvOct, SendOct, Reductions}, NextStats, ?connection_stats_coarse_conn_stats(RecvOct, SendOct, Reductions), Size, Interval, false) end || {Size, Interval} <- BPolicies], - insert_old_aggr_stats(NextStats, Id, Stats); -aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = channel_created}) -> + {insert_old_aggr_stats(NextStats, Id, Stats), State}; +aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = channel_created} = State) -> Ftd = rabbit_mgmt_format:format(Metrics, {[], false}), ets:insert(channel_created_stats, ?channel_created_stats(Id, pget(name, Ftd, unknown), Ftd)), - NextStats; -aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = channel_metrics}) -> + {NextStats, State}; +aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = channel_metrics} = State) -> Ftd = rabbit_mgmt_format:format(Metrics, {fun rabbit_mgmt_format:format_channel_stats/1, true}), ets:insert(channel_stats, ?channel_stats(Id, Ftd)), - NextStats; + {NextStats, State}; aggregate_entry(TS, {{Ch, X} = Id, Publish0, Confirm, ReturnUnroutable}, NextStats, #state{table = channel_exchange_metrics, policies = {BPolicies, DPolicies, GPolicies}, @@ -215,7 +228,7 @@ aggregate_entry(TS, {{Ch, X} = Id, Publish0, Confirm, ReturnUnroutable}, NextSta _ -> ok end, - insert_old_aggr_stats(NextStats, Id, Stats); + {insert_old_aggr_stats(NextStats, Id, Stats), State}; aggregate_entry(TS, {{Ch, Q} = Id, Get, GetNoAck, Deliver, DeliverNoAck, Redeliver, Ack}, NextStats, #state{table = channel_queue_metrics, @@ -247,7 +260,7 @@ aggregate_entry(TS, {{Ch, Q} = Id, Get, GetNoAck, Deliver, DeliverNoAck, Redeliv _ -> ok end, - insert_old_aggr_stats(NextStats, Id, Stats); + {insert_old_aggr_stats(NextStats, Id, Stats), State}; aggregate_entry(TS, {{_Ch, {Q, X}} = Id, Publish}, NextStats, #state{table = channel_queue_exchange_metrics, policies = {BPolicies, _, _}, @@ -277,22 +290,23 @@ aggregate_entry(TS, {{_Ch, {Q, X}} = Id, Publish}, NextStats, _ -> ok end, - insert_old_aggr_stats(NextStats, Id, Stats); -aggregate_entry(TS, {Id, Reductions}, NextStats, #state{table = channel_process_metrics, - policies = {BPolicies, _, _}}) -> + {insert_old_aggr_stats(NextStats, Id, Stats), State}; +aggregate_entry(TS, {Id, Reductions}, NextStats, + #state{table = channel_process_metrics, + policies = {BPolicies, _, _}} = State) -> [begin insert_entry(channel_process_stats, Id, TS, ?channel_process_stats(Reductions), Size, Interval, false) end || {Size, Interval} <- BPolicies], - NextStats; + {NextStats, State}; aggregate_entry(_TS, {Id, Exclusive, AckRequired, PrefetchCount, Args}, NextStats, - #state{table = consumer_created}) -> + #state{table = consumer_created} = State) -> Fmt = rabbit_mgmt_format:format([{exclusive, Exclusive}, {ack_required, AckRequired}, {prefetch_count, PrefetchCount}, {arguments, Args}], {[], false}), insert_with_index(consumer_stats, Id, ?consumer_stats(Id, Fmt)), - NextStats; + {NextStats, State}; aggregate_entry(TS, {Id, Metrics}, NextStats, #state{table = queue_metrics, policies = {BPolicies, _, GPolicies}, lookup_queue = QueueFun} = State) -> @@ -311,9 +325,10 @@ aggregate_entry(TS, {Id, Metrics}, NextStats, #state{table = queue_metrics, false -> ok end, - insert_old_aggr_stats(NextStats, Id, Stats); + {insert_old_aggr_stats(NextStats, Id, Stats), State}; aggregate_entry(TS, {Name, Ready, Unack, Msgs, Red}, NextStats, #state{table = queue_coarse_metrics, + old_aggr_stats = Old, policies = {BPolicies, _, GPolicies}, lookup_queue = QueueFun} = State) -> Stats = ?vhost_msg_stats(Ready, Unack, Msgs), @@ -331,12 +346,14 @@ aggregate_entry(TS, {Name, Ready, Unack, Msgs, Red}, NextStats, _ -> ok end, - insert_old_aggr_stats(NextStats, Name, Stats); -aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = node_metrics}) -> + State1 = State#state{old_aggr_stats = dict:erase(Name, Old)}, + {insert_old_aggr_stats(NextStats, Name, Stats), State1}; +aggregate_entry(_TS, {Id, Metrics}, NextStats, #state{table = node_metrics} = State) -> ets:insert(node_stats, {Id, Metrics}), - NextStats; -aggregate_entry(TS, {Id, Metrics}, NextStats, #state{table = node_coarse_metrics, - policies = {_, _, GPolicies}}) -> + {NextStats, State}; +aggregate_entry(TS, {Id, Metrics}, NextStats, + #state{table = node_coarse_metrics, + policies = {_, _, GPolicies}} = State) -> Stats = ?node_coarse_stats( pget(fd_used, Metrics, 0), pget(sockets_used, Metrics, 0), pget(mem_used, Metrics, 0), pget(disk_free, Metrics, 0), @@ -344,9 +361,10 @@ aggregate_entry(TS, {Id, Metrics}, NextStats, #state{table = node_coarse_metrics pget(gc_bytes_reclaimed, Metrics, 0), pget(context_switches, Metrics, 0)), [insert_entry(node_coarse_stats, Id, TS, Stats, Size, Interval, false) || {Size, Interval} <- GPolicies], - NextStats; -aggregate_entry(TS, {Id, Metrics}, NextStats, #state{table = node_persister_metrics, - policies = {_, _, GPolicies}}) -> + {NextStats, State}; +aggregate_entry(TS, {Id, Metrics}, NextStats, + #state{table = node_persister_metrics, + policies = {_, _, GPolicies}} = State) -> Stats = ?node_persister_stats( pget(io_read_count, Metrics, 0), pget(io_read_bytes, Metrics, 0), pget(io_read_time, Metrics, 0), pget(io_write_count, Metrics, 0), @@ -362,15 +380,16 @@ aggregate_entry(TS, {Id, Metrics}, NextStats, #state{table = node_persister_metr pget(io_file_handle_open_attempt_time, Metrics, 0)), [insert_entry(node_persister_stats, Id, TS, Stats, Size, Interval, false) || {Size, Interval} <- GPolicies], - NextStats; -aggregate_entry(TS, {Id, Metrics}, NextStats, #state{table = node_node_metrics, - policies = {_, _, GPolicies}}) -> + {NextStats, State}; +aggregate_entry(TS, {Id, Metrics}, NextStats, + #state{table = node_node_metrics, + policies = {_, _, GPolicies}} = State) -> Stats = ?node_node_coarse_stats(pget(send_bytes, Metrics, 0), pget(recv_bytes, Metrics, 0)), CleanMetrics = lists:keydelete(recv_bytes, 1, lists:keydelete(send_bytes, 1, Metrics)), ets:insert(node_node_stats, ?node_node_stats(Id, CleanMetrics)), [insert_entry(node_node_coarse_stats, Id, TS, Stats, Size, Interval, false) || {Size, Interval} <- GPolicies], - NextStats. + {NextStats, State}. insert_entry(Table, Id, TS, Entry, Size, Interval, Incremental) -> Key = {Id, Interval}, diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_gc.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_gc.erl index d8bd5860b2..877f067ad8 100644 --- a/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_gc.erl +++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_gc.erl @@ -123,8 +123,6 @@ remove_queue(Name, BIntervals) -> delete_samples(queue_process_stats, Name, BIntervals), delete_samples(queue_msg_stats, Name, BIntervals), delete_samples(queue_msg_rates, Name, BIntervals), - %% vhost message counts must be updated with the deletion of the messages in this queue - rabbit_mgmt_metrics_collector:delete_queue(queue_coarse_metrics, Name), index_delete(channel_queue_stats_deliver_stats, queue, Name), index_delete(queue_exchange_stats_publish, queue, Name), index_delete(consumer_stats, queue, Name),