Check consumer size for clean strats

This commit is contained in:
Simon Unge 2024-01-31 20:14:02 +00:00 committed by Michael Klishin
parent 6115f8f4f7
commit a2ff5a0b7b
2 changed files with 26 additions and 17 deletions

View File

@ -824,8 +824,10 @@ terminate(_Reason,
rabbit_global_counters:consumer_deleted(amqp091)
end, CM),
rabbit_core_metrics:channel_closed(self()),
CMSize = maps:size(CM),
rabbit_event:notify(channel_closed, [{pid, self()},
{user_who_performed_action, Username}]),
{user_who_performed_action, Username},
{consumer_count, CMSize}]),
case rabbit_confirms:size(State#ch.unconfirmed) of
0 -> ok;
NumConfirms ->
@ -2860,4 +2862,3 @@ maybe_decrease_global_publishers(#ch{publishing_mode = true}) ->
is_global_qos_permitted() ->
rabbit_deprecated_features:is_permitted(global_qos).

View File

@ -19,6 +19,8 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-define(LARGE_CONSUMER_COUNT, 1000).
name(EventType) ->
list_to_atom((atom_to_list(EventType) ++ "_metrics_gc")).
@ -42,7 +44,8 @@ handle_cast({event, #event{type = connection_closed, props = Props}},
handle_cast({event, #event{type = channel_closed, props = Props}},
State = #state{basic_i = BIntervals}) ->
Pid = pget(pid, Props),
remove_channel(Pid, BIntervals),
CMCount = pget(consumer_count, Props),
remove_channel(Pid, CMCount, BIntervals),
{noreply, State};
handle_cast({event, #event{type = consumer_deleted, props = Props}}, State) ->
remove_consumer(Props),
@ -82,13 +85,13 @@ remove_connection(Id, BIntervals) ->
delete_samples(connection_stats_coarse_conn_stats, Id, BIntervals),
ok.
remove_channel(Id, BIntervals) ->
remove_channel(Id, CMCount, BIntervals) ->
ets:delete(channel_created_stats, Id),
ets:delete(channel_stats, Id),
delete_samples(channel_process_stats, Id, BIntervals),
delete_samples(channel_stats_fine_stats, Id, BIntervals),
delete_samples(channel_stats_deliver_stats, Id, BIntervals),
index_delete(consumer_stats, channel, Id),
index_delete(consumer_stats, {channel, CMCount}, Id),
index_delete(channel_exchange_stats_fine_stats, channel, Id),
index_delete(channel_queue_stats_deliver_stats, channel, Id),
ok.
@ -137,18 +140,23 @@ delete_samples(Table, Id, Intervals) ->
[ets:delete(Table, {Id, I}) || I <- Intervals],
ok.
index_delete(consumer_stats = Table, channel = Type, Id) ->
IndexTable = rabbit_mgmt_metrics_collector:index_table(Table, Type),
MatchPattern = {'_', Id, '_'},
%% Delete consumer_stats_queue_index
ets:match_delete(consumer_stats_queue_index,
{'_', MatchPattern}),
%% Delete consumer_stats
ets:match_delete(consumer_stats,
{MatchPattern,'_'}),
%% Delete consumer_stats_channel_index
ets:delete(IndexTable, Id),
ok;
index_delete(consumer_stats = Table, {channel = Type, Count}, Id) ->
case Count > ?LARGE_CONSUMER_COUNT of
true ->
IndexTable = rabbit_mgmt_metrics_collector:index_table(Table, Type),
MatchPattern = {'_', Id, '_'},
%% Delete consumer_stats_queue_index
ets:match_delete(consumer_stats_queue_index,
{'_', MatchPattern}),
%% Delete consumer_stats
ets:match_delete(consumer_stats,
{MatchPattern,'_'}),
%% Delete consumer_stats_channel_index
ets:delete(IndexTable, Id),
ok;
false ->
index_delete(Table, Type, Id)
end;
index_delete(Table, Type, Id) ->
IndexTable = rabbit_mgmt_metrics_collector:index_table(Table, Type),
Keys = ets:lookup(IndexTable, Id),