Merge pull request #13898 from rabbitmq/md/rabbit_mgmt_gc-memory

Improve memory use of `rabbit_mgmt_gc`
This commit is contained in:
Michael Klishin 2025-05-17 03:58:44 +04:00 committed by GitHub
commit a0c6a0b41d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 67 additions and 67 deletions

View File

@ -36,7 +36,7 @@ handle_info(start_gc, State) ->
gc_queues(),
gc_exchanges(),
gc_nodes(),
{noreply, start_timer(State)}.
{noreply, start_timer(State), hibernate}.
terminate(_Reason, #state{timer = TRef}) ->
_ = erlang:cancel_timer(TRef),
@ -56,12 +56,12 @@ gc_connections() ->
gc_vhosts() ->
VHosts = rabbit_vhost:list(),
GbSet = gb_sets:from_list(VHosts),
gc_entity(vhost_stats_coarse_conn_stats, GbSet),
gc_entity(vhost_stats_fine_stats, GbSet),
gc_entity(vhost_msg_stats, GbSet),
gc_entity(vhost_msg_rates, GbSet),
gc_entity(vhost_stats_deliver_stats, GbSet).
Set = sets:from_list(VHosts, [{version, 2}]),
gc_entity(vhost_stats_coarse_conn_stats, Set),
gc_entity(vhost_stats_fine_stats, Set),
gc_entity(vhost_msg_stats, Set),
gc_entity(vhost_msg_rates, Set),
gc_entity(vhost_stats_deliver_stats, Set).
gc_channels() ->
gc_process(channel_created_stats),
@ -73,45 +73,45 @@ gc_channels() ->
gc_queues() ->
Queues = rabbit_amqqueue:list_names(),
GbSet = gb_sets:from_list(Queues),
Set = sets:from_list(Queues, [{version, 2}]),
LocalQueues = rabbit_amqqueue:list_local_names(),
LocalGbSet = gb_sets:from_list(LocalQueues),
gc_entity(queue_stats_publish, GbSet),
gc_entity(queue_stats, LocalGbSet),
gc_entity(queue_basic_stats, LocalGbSet),
gc_entity(queue_msg_stats, LocalGbSet),
gc_entity(queue_process_stats, LocalGbSet),
gc_entity(queue_msg_rates, LocalGbSet),
gc_entity(queue_stats_deliver_stats, GbSet),
gc_process_and_entity(channel_queue_stats_deliver_stats_queue_index, GbSet),
gc_process_and_entity(consumer_stats_queue_index, GbSet),
gc_process_and_entity(consumer_stats_channel_index, GbSet),
gc_process_and_entity(consumer_stats, GbSet),
gc_process_and_entity(channel_exchange_stats_fine_stats_channel_index, GbSet),
gc_process_and_entity(channel_queue_stats_deliver_stats, GbSet),
gc_process_and_entity(channel_queue_stats_deliver_stats_channel_index, GbSet),
ExchangeGbSet = gb_sets:from_list(rabbit_exchange:list_names()),
gc_entities(queue_exchange_stats_publish, GbSet, ExchangeGbSet),
gc_entities(queue_exchange_stats_publish_queue_index, GbSet, ExchangeGbSet),
gc_entities(queue_exchange_stats_publish_exchange_index, GbSet, ExchangeGbSet).
LocalSet = sets:from_list(LocalQueues, [{version, 2}]),
gc_entity(queue_stats_publish, Set),
gc_entity(queue_stats, LocalSet),
gc_entity(queue_basic_stats, LocalSet),
gc_entity(queue_msg_stats, LocalSet),
gc_entity(queue_process_stats, LocalSet),
gc_entity(queue_msg_rates, LocalSet),
gc_entity(queue_stats_deliver_stats, Set),
gc_process_and_entity(channel_queue_stats_deliver_stats_queue_index, Set),
gc_process_and_entity(consumer_stats_queue_index, Set),
gc_process_and_entity(consumer_stats_channel_index, Set),
gc_process_and_entity(consumer_stats, Set),
gc_process_and_entity(channel_exchange_stats_fine_stats_channel_index, Set),
gc_process_and_entity(channel_queue_stats_deliver_stats, Set),
gc_process_and_entity(channel_queue_stats_deliver_stats_channel_index, Set),
ExchangeSet = sets:from_list(rabbit_exchange:list_names(), [{version, 2}]),
gc_entities(queue_exchange_stats_publish, Set, ExchangeSet),
gc_entities(queue_exchange_stats_publish_queue_index, Set, ExchangeSet),
gc_entities(queue_exchange_stats_publish_exchange_index, Set, ExchangeSet).
gc_exchanges() ->
Exchanges = rabbit_exchange:list_names(),
GbSet = gb_sets:from_list(Exchanges),
gc_entity(exchange_stats_publish_in, GbSet),
gc_entity(exchange_stats_publish_out, GbSet),
gc_entity(channel_exchange_stats_fine_stats_exchange_index, GbSet),
gc_process_and_entity(channel_exchange_stats_fine_stats, GbSet).
Set = sets:from_list(Exchanges, [{version, 2}]),
gc_entity(exchange_stats_publish_in, Set),
gc_entity(exchange_stats_publish_out, Set),
gc_entity(channel_exchange_stats_fine_stats_exchange_index, Set),
gc_process_and_entity(channel_exchange_stats_fine_stats, Set).
gc_nodes() ->
Nodes = rabbit_nodes:list_members(),
GbSet = gb_sets:from_list(Nodes),
gc_entity(node_stats, GbSet),
gc_entity(node_coarse_stats, GbSet),
gc_entity(node_persister_stats, GbSet),
gc_entity(node_node_coarse_stats_node_index, GbSet),
gc_entity(node_node_stats, GbSet),
gc_entity(node_node_coarse_stats, GbSet).
Set = sets:from_list(Nodes, [{version, 2}]),
gc_entity(node_stats, Set),
gc_entity(node_coarse_stats, Set),
gc_entity(node_persister_stats, Set),
gc_entity(node_node_coarse_stats_node_index, Set),
gc_entity(node_node_stats, Set),
gc_entity(node_node_coarse_stats, Set).
gc_process(Table) ->
ets:foldl(fun({{Pid, _} = Key, _}, none) ->
@ -133,21 +133,21 @@ gc_process(Pid, Table, Key) ->
none
end.
gc_entity(Table, GbSet) ->
gc_entity(Table, Set) ->
ets:foldl(fun({{_, Id} = Key, _}, none) when Table == node_node_stats ->
gc_entity(Id, Table, Key, GbSet);
gc_entity(Id, Table, Key, Set);
({{{_, Id}, _} = Key, _}, none) when Table == node_node_coarse_stats ->
gc_entity(Id, Table, Key, GbSet);
gc_entity(Id, Table, Key, Set);
({{Id, _} = Key, _}, none) ->
gc_entity(Id, Table, Key, GbSet);
gc_entity(Id, Table, Key, Set);
({Id = Key, _}, none) ->
gc_entity(Id, Table, Key, GbSet);
gc_entity(Id, Table, Key, Set);
({{Id, _} = Key, _}, none) ->
gc_entity(Id, Table, Key, GbSet)
gc_entity(Id, Table, Key, Set)
end, none, Table).
gc_entity(Id, Table, Key, GbSet) ->
case gb_sets:is_member(Id, GbSet) of
gc_entity(Id, Table, Key, Set) ->
case sets:is_element(Id, Set) of
true ->
none;
false ->
@ -155,39 +155,39 @@ gc_entity(Id, Table, Key, GbSet) ->
none
end.
gc_process_and_entity(Table, GbSet) ->
gc_process_and_entity(Table, Set) ->
ets:foldl(fun({{Id, Pid, _} = Key, _}, none) when Table == consumer_stats ->
gc_process_and_entity(Id, Pid, Table, Key, GbSet);
gc_process_and_entity(Id, Pid, Table, Key, Set);
({Id = Key, {_, Pid, _}} = Object, none)
when Table == consumer_stats_queue_index ->
gc_object(Pid, Table, Object),
gc_entity(Id, Table, Key, GbSet);
gc_entity(Id, Table, Key, Set);
({Pid = Key, {Id, _, _}} = Object, none)
when Table == consumer_stats_channel_index ->
gc_object(Id, Table, Object, GbSet),
gc_object(Id, Table, Object, Set),
gc_process(Pid, Table, Key);
({Id = Key, {{Pid, _}, _}} = Object, none)
when Table == channel_exchange_stats_fine_stats_exchange_index;
Table == channel_queue_stats_deliver_stats_queue_index ->
gc_object(Pid, Table, Object),
gc_entity(Id, Table, Key, GbSet);
gc_entity(Id, Table, Key, Set);
({Pid = Key, {{_, Id}, _}} = Object, none)
when Table == channel_exchange_stats_fine_stats_channel_index;
Table == channel_queue_stats_deliver_stats_channel_index ->
gc_object(Id, Table, Object, GbSet),
gc_object(Id, Table, Object, Set),
gc_process(Pid, Table, Key);
({{{Pid, Id}, _} = Key, _}, none)
when Table == channel_queue_stats_deliver_stats;
Table == channel_exchange_stats_fine_stats ->
gc_process_and_entity(Id, Pid, Table, Key, GbSet);
gc_process_and_entity(Id, Pid, Table, Key, Set);
({{{Pid, Id}, _} = Key, _, _, _, _, _, _, _, _}, none) ->
gc_process_and_entity(Id, Pid, Table, Key, GbSet);
gc_process_and_entity(Id, Pid, Table, Key, Set);
({{{Pid, Id}, _} = Key, _, _, _, _}, none) ->
gc_process_and_entity(Id, Pid, Table, Key, GbSet)
gc_process_and_entity(Id, Pid, Table, Key, Set)
end, none, Table).
gc_process_and_entity(Id, Pid, Table, Key, GbSet) ->
case rabbit_misc:is_process_alive(Pid) andalso gb_sets:is_member(Id, GbSet) of
gc_process_and_entity(Id, Pid, Table, Key, Set) ->
case rabbit_misc:is_process_alive(Pid) andalso sets:is_element(Id, Set) of
true ->
none;
false ->
@ -204,8 +204,8 @@ gc_object(Pid, Table, Object) ->
none
end.
gc_object(Id, Table, Object, GbSet) ->
case gb_sets:is_member(Id, GbSet) of
gc_object(Id, Table, Object, Set) ->
case sets:is_element(Id, Set) of
true ->
none;
false ->
@ -213,17 +213,17 @@ gc_object(Id, Table, Object, GbSet) ->
none
end.
gc_entities(Table, QueueGbSet, ExchangeGbSet) ->
gc_entities(Table, QueueSet, ExchangeSet) ->
ets:foldl(fun({{{Q, X}, _} = Key, _}, none)
when Table == queue_exchange_stats_publish ->
gc_entity(Q, Table, Key, QueueGbSet),
gc_entity(X, Table, Key, ExchangeGbSet);
gc_entity(Q, Table, Key, QueueSet),
gc_entity(X, Table, Key, ExchangeSet);
({Q, {{_, X}, _}} = Object, none)
when Table == queue_exchange_stats_publish_queue_index ->
gc_object(X, Table, Object, ExchangeGbSet),
gc_entity(Q, Table, Q, QueueGbSet);
gc_object(X, Table, Object, ExchangeSet),
gc_entity(Q, Table, Q, QueueSet);
({X, {{Q, _}, _}} = Object, none)
when Table == queue_exchange_stats_publish_exchange_index ->
gc_object(Q, Table, Object, QueueGbSet),
gc_entity(X, Table, X, ExchangeGbSet)
gc_object(Q, Table, Object, QueueSet),
gc_entity(X, Table, X, ExchangeSet)
end, none, Table).