Merge pull request #10476 from rabbitmq/SimonUnge-check_consumer_count

Rework consumer sample deletion when a channel is closed
This commit is contained in:
Michael Klishin 2024-02-02 20:21:18 -05:00 committed by GitHub
commit 7a7da0f086
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 34 additions and 17 deletions

View File

@ -825,7 +825,8 @@ terminate(_Reason,
end, CM),
rabbit_core_metrics:channel_closed(self()),
rabbit_event:notify(channel_closed, [{pid, self()},
{user_who_performed_action, Username}]),
{user_who_performed_action, Username},
{consumer_count, maps:size(CM)}]),
case rabbit_confirms:size(State#ch.unconfirmed) of
0 -> ok;
NumConfirms ->
@ -2860,4 +2861,3 @@ maybe_decrease_global_publishers(#ch{publishing_mode = true}) ->
is_global_qos_permitted() ->
rabbit_deprecated_features:is_permitted(global_qos).

View File

@ -19,6 +19,8 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-define(LARGE_CONSUMER_COUNT, 1000).
name(EventType) ->
list_to_atom((atom_to_list(EventType) ++ "_metrics_gc")).
@ -42,7 +44,8 @@ handle_cast({event, #event{type = connection_closed, props = Props}},
handle_cast({event, #event{type = channel_closed, props = Props}},
State = #state{basic_i = BIntervals}) ->
Pid = pget(pid, Props),
remove_channel(Pid, BIntervals),
ConsumerCount = pget(consumer_count, Props),
remove_channel(Pid, ConsumerCount, BIntervals),
{noreply, State};
handle_cast({event, #event{type = consumer_deleted, props = Props}}, State) ->
remove_consumer(Props),
@ -82,13 +85,13 @@ remove_connection(Id, BIntervals) ->
delete_samples(connection_stats_coarse_conn_stats, Id, BIntervals),
ok.
remove_channel(Id, BIntervals) ->
remove_channel(Id, ConsumerCount, BIntervals) ->
ets:delete(channel_created_stats, Id),
ets:delete(channel_stats, Id),
delete_samples(channel_process_stats, Id, BIntervals),
delete_samples(channel_stats_fine_stats, Id, BIntervals),
delete_samples(channel_stats_deliver_stats, Id, BIntervals),
index_delete(consumer_stats, channel, Id),
index_delete(consumer_stats, {channel, ConsumerCount}, Id),
index_delete(channel_exchange_stats_fine_stats, channel, Id),
index_delete(channel_queue_stats_deliver_stats, channel, Id),
ok.
@ -137,18 +140,32 @@ delete_samples(Table, Id, Intervals) ->
[ets:delete(Table, {Id, I}) || I <- Intervals],
ok.
index_delete(consumer_stats = Table, channel = Type, Id) ->
IndexTable = rabbit_mgmt_metrics_collector:index_table(Table, Type),
MatchPattern = {'_', Id, '_'},
%% Delete consumer_stats_queue_index
ets:match_delete(consumer_stats_queue_index,
{'_', MatchPattern}),
%% Delete consumer_stats
ets:match_delete(consumer_stats,
{MatchPattern,'_'}),
%% Delete consumer_stats_channel_index
ets:delete(IndexTable, Id),
ok;
index_delete(consumer_stats = Table, {channel = Type, ConsumerCount}, Id) ->
%% This uses two different deletion strategies depending on how many
%% consumers a channel had. Most of the time there are many channels
%% with a few (or even just one) consumers. For this common case, `ets:delete/2` is optimal
%% since it avoids table scans.
%%
%% In the rather extreme scenario where only a handful of channels have a very large
%% (e.g. tens of thousands) of consumers, `ets:match_delete/2` becomes a more efficient option.
%%
%% See rabbitmq-server/rabbitmq#10451, rabbitmq-server/rabbitmq#9356.
case ConsumerCount > ?LARGE_CONSUMER_COUNT of
true ->
IndexTable = rabbit_mgmt_metrics_collector:index_table(Table, Type),
MatchPattern = {'_', Id, '_'},
%% Delete consumer_stats_queue_index
ets:match_delete(consumer_stats_queue_index,
{'_', MatchPattern}),
%% Delete consumer_stats
ets:match_delete(consumer_stats,
{MatchPattern,'_'}),
%% Delete consumer_stats_channel_index
ets:delete(IndexTable, Id),
ok;
false ->
index_delete(Table, Type, Id)
end;
index_delete(Table, Type, Id) ->
IndexTable = rabbit_mgmt_metrics_collector:index_table(Table, Type),
Keys = ets:lookup(IndexTable, Id),