WIP Speed up many consumers on QQ

The idea is to avoid the expensive priority_queue:member/2 call
by having each consumer remembering itself whether it's currently in the
service_queue.

Alternative:
Changing the priority_queue implementation to use a map for fast membership checks
turns out to be expensive because adding to and removing from a map is
slow.

Benchmarks:

Start broker with tweaked config as described in
https://github.com/ansd/message_selector_perf

Start client with 900 consumers on a single session:
```
_build/default/bin/message_selector_perf many_receivers false 900 400000
```
sending and receiving settled.

Prior to this commit:
```
400000 messages sent and received across 900 receivers in 11646 ms
end-to-end throughput: 34346.6 msgs/s
```
with ~8% CPU time being spent in `priority_queue:member/2`

After this commit:
```
400000 messages sent and received across 900 receivers in 10132 ms
end-to-end throughput: 39478.9 msgs/s
```

=> 15% higher end-to-end throughput
This commit is contained in:
David Ansari 2025-05-27 12:37:48 +02:00
parent 4c34155886
commit 5d0ff8e709
2 changed files with 57 additions and 52 deletions

View File

@ -1491,21 +1491,21 @@ activate_next_consumer(#?STATE{consumers = Cons0,
{undefined, {NextCKey, #consumer{cfg = NextCCfg} = NextC}} ->
Remaining = tl(Waiting0),
%% TODO: can this happen?
Consumer = case maps:get(NextCKey, Cons0, undefined) of
undefined ->
NextC;
Existing ->
%% there was an exisiting non-active consumer
%% just update the existing cancelled consumer
%% with the new config
Existing#consumer{cfg = NextCCfg}
end,
#?STATE{service_queue = ServiceQueue} = State0,
ServiceQueue1 = maybe_queue_consumer(NextCKey,
Consumer,
ServiceQueue),
Consumer0 = case maps:get(NextCKey, Cons0, undefined) of
undefined ->
NextC;
Existing ->
%% there was an exisiting non-active consumer
%% just update the existing cancelled consumer
%% with the new config
Existing#consumer{cfg = NextCCfg}
end,
#?STATE{service_queue = ServiceQueue0} = State0,
{Consumer, ServiceQueue} = maybe_queue_consumer(NextCKey,
Consumer0,
ServiceQueue0),
State = State0#?STATE{consumers = Cons0#{NextCKey => Consumer},
service_queue = ServiceQueue1,
service_queue = ServiceQueue,
waiting_consumers = Remaining},
Effects = consumer_update_active_effects(State, Consumer,
true, single_active,
@ -1513,21 +1513,21 @@ activate_next_consumer(#?STATE{consumers = Cons0,
{State, Effects};
{{ActiveCKey, ?CONSUMER_PRIORITY(ActivePriority) =
#consumer{checked_out = ActiveChecked} = Active},
{NextCKey, ?CONSUMER_PRIORITY(WaitingPriority) = Consumer}}
{NextCKey, ?CONSUMER_PRIORITY(WaitingPriority) = Consumer0}}
when WaitingPriority > ActivePriority andalso
map_size(ActiveChecked) == 0 ->
Remaining = tl(Waiting0),
%% the next consumer is a higher priority and should take over
%% and this consumer does not have any pending messages
#?STATE{service_queue = ServiceQueue} = State0,
ServiceQueue1 = maybe_queue_consumer(NextCKey,
Consumer,
ServiceQueue),
#?STATE{service_queue = ServiceQueue0} = State0,
{Consumer, ServiceQueue} = maybe_queue_consumer(NextCKey,
Consumer0,
ServiceQueue0),
Cons1 = Cons0#{NextCKey => Consumer},
Cons = maps:remove(ActiveCKey, Cons1),
Waiting = add_waiting({ActiveCKey, Active}, Remaining),
State = State0#?STATE{consumers = Cons,
service_queue = ServiceQueue1,
service_queue = ServiceQueue,
waiting_consumers = Waiting},
Effects = consumer_update_active_effects(State, Consumer,
true, single_active,
@ -2140,7 +2140,8 @@ checkout_one(#{system_time := Ts} = Meta, ExpiredMsg0, InitState0, Effects0) ->
{Msg, State0} ->
%% there are consumers waiting to be serviced
%% process consumer checkout
case maps:get(ConsumerKey, Cons0) of
#{ConsumerKey := Con0} = Cons0,
case Con0 of
#consumer{credit = Credit,
status = Status}
when Credit =:= 0 orelse
@ -2150,9 +2151,11 @@ checkout_one(#{system_time := Ts} = Meta, ExpiredMsg0, InitState0, Effects0) ->
%% or when higher priority single active consumers
%% take over, recurse without consumer in service
%% queue
checkout_one(Meta, ExpiredMsg,
InitState#?STATE{service_queue = SQ1},
Effects1);
Con1 = Con0#consumer{queued = false},
Cons = Cons0#{ConsumerKey := Con1},
State = InitState#?STATE{consumers = Cons,
service_queue = SQ1},
checkout_one(Meta, ExpiredMsg, State, Effects1);
#consumer{checked_out = Checked0,
next_msg_id = Next,
credit = Credit,
@ -2166,7 +2169,8 @@ checkout_one(#{system_time := Ts} = Meta, ExpiredMsg0, InitState0, Effects0) ->
Con = Con0#consumer{checked_out = Checked,
next_msg_id = Next + 1,
credit = Credit - 1,
delivery_count = DelCnt},
delivery_count = DelCnt,
queued = false},
Size = get_header(size, get_msg_header(Msg)),
State1 =
State0#?STATE{service_queue = SQ1,
@ -2180,10 +2184,11 @@ checkout_one(#{system_time := Ts} = Meta, ExpiredMsg0, InitState0, Effects0) ->
empty ->
{nochange, ExpiredMsg, InitState, Effects1}
end;
{{value, _ConsumerId}, SQ1} ->
{{value, _ConsumerKey}, SQ1} ->
%%TODO set queued=false and replace this consumer in the waiting_consumers (if it exists)
State = InitState#?STATE{service_queue = SQ1},
%% consumer was not active but was queued, recurse
checkout_one(Meta, ExpiredMsg,
InitState#?STATE{service_queue = SQ1}, Effects1);
checkout_one(Meta, ExpiredMsg, State, Effects1);
{empty, _} ->
case rabbit_fifo_q:len(Messages0) of
0 ->
@ -2271,28 +2276,24 @@ update_or_remove_con(_Meta, ConsumerKey,
State#?STATE{consumers = maps:remove(ConsumerKey, Cons),
waiting_consumers = add_waiting({ConsumerKey, Con}, Waiting)};
update_or_remove_con(_Meta, ConsumerKey,
#consumer{} = Con,
#consumer{} = Con0,
#?STATE{consumers = Cons,
service_queue = ServiceQueue} = State) ->
service_queue = ServiceQueue0} = State) ->
{Con, ServiceQueue} = maybe_queue_consumer(ConsumerKey, Con0, ServiceQueue0),
State#?STATE{consumers = maps:put(ConsumerKey, Con, Cons),
service_queue = maybe_queue_consumer(ConsumerKey, Con,
ServiceQueue)}.
service_queue = ServiceQueue}.
maybe_queue_consumer(Key, #consumer{credit = Credit,
status = up,
cfg = #consumer_cfg{priority = P}},
maybe_queue_consumer(Key,
#consumer{queued = false,
status = up,
credit = Credit,
cfg = #consumer_cfg{priority = P}} = Consumer,
ServiceQueue)
when Credit > 0 ->
% TODO: queue:member could surely be quite expensive, however the practical
% number of unique consumers may not be large enough for it to matter
case priority_queue:member(Key, ServiceQueue) of
true ->
ServiceQueue;
false ->
priority_queue:in(Key, P, ServiceQueue)
end;
maybe_queue_consumer(_Key, _Consumer, ServiceQueue) ->
ServiceQueue.
{Consumer#consumer{queued = true},
priority_queue:in(Key, P, ServiceQueue)};
maybe_queue_consumer(_Key, Consumer, ServiceQueue) ->
{Consumer, ServiceQueue}.
update_consumer(Meta, ConsumerKey, {Tag, Pid}, ConsumerMeta,
{Life, Mode} = Spec, Priority,
@ -2413,16 +2414,18 @@ credit_active_consumer(
DeliveryCountSnd, Cfg),
%% grant the credit
Con1 = Con0#consumer{credit = LinkCreditSnd},
ServiceQueue = maybe_queue_consumer(ConsumerKey, Con1, ServiceQueue0),
{Con2, ServiceQueue} = maybe_queue_consumer(ConsumerKey,
Con1,
ServiceQueue0),
State1 = State0#?STATE{service_queue = ServiceQueue,
consumers = maps:update(ConsumerKey, Con1, Cons0)},
consumers = maps:update(ConsumerKey, Con2, Cons0)},
{State2, ok, Effects} = checkout(Meta, State0, State1, []),
#?STATE{consumers = Cons1 = #{ConsumerKey := Con2}} = State2,
#?STATE{consumers = Cons1 = #{ConsumerKey := Con3}} = State2,
#consumer{cfg = #consumer_cfg{pid = CPid,
tag = CTag},
credit = PostCred,
delivery_count = PostDeliveryCount} = Con2,
delivery_count = PostDeliveryCount} = Con3,
Available = messages_ready(State2),
case credit_api_v2(Cfg) of
true ->
@ -2431,7 +2434,7 @@ credit_active_consumer(
true ->
AdvancedDeliveryCount = add(PostDeliveryCount, PostCred),
ZeroCredit = 0,
Con = Con2#consumer{delivery_count = AdvancedDeliveryCount,
Con = Con3#consumer{delivery_count = AdvancedDeliveryCount,
credit = ZeroCredit},
Cons = maps:update(ConsumerKey, Con, Cons1),
State3 = State2#?STATE{consumers = Cons},
@ -2461,7 +2464,7 @@ credit_active_consumer(
case Drain of
true ->
AdvancedDeliveryCount = PostDeliveryCount + PostCred,
Con = Con2#consumer{delivery_count = AdvancedDeliveryCount,
Con = Con3#consumer{delivery_count = AdvancedDeliveryCount,
credit = 0},
Cons = maps:update(ConsumerKey, Con, Cons1),
State = State2#?STATE{consumers = Cons},

View File

@ -138,7 +138,9 @@
%% decremented for each delivery
credit = 0 :: non_neg_integer(),
%% AMQP 1.0 §2.6.7
delivery_count :: rabbit_queue_type:delivery_count()
delivery_count :: rabbit_queue_type:delivery_count(),
%% Is this consumer in service_queue ?
queued = false :: boolean()
}).
-type consumer() :: #consumer{}.