Add shortcut
Base cases are either service queue is empty or no messages to check out
This commit is contained in:
parent
3415b2c3fd
commit
0425e7ad40
|
@ -2321,71 +2321,78 @@ reply_log_effect(RaftIdx, MsgId, Header, Ready, From) ->
|
|||
|
||||
checkout_one(Meta = #{system_time := Ts},
|
||||
ExpiredMsg0,
|
||||
State0 = #?STATE{cfg = #cfg{filter_enabled = true},
|
||||
service_queue = SQ0,
|
||||
messages = Messages0,
|
||||
filter_msgs_expiry = Expiry0,
|
||||
msg_bytes_checkout = BytesCheckout,
|
||||
msg_bytes_enqueue = BytesEnqueue,
|
||||
consumers = Cons},
|
||||
S0 = #?STATE{cfg = #cfg{filter_enabled = true},
|
||||
service_queue = SQ0,
|
||||
messages = Messages0,
|
||||
filter_msgs_expiry = Expiry0,
|
||||
msg_bytes_checkout = BytesCheckout,
|
||||
msg_bytes_enqueue = BytesEnqueue,
|
||||
consumers = Cons},
|
||||
Effects0) ->
|
||||
case priority_queue:out(SQ0) of
|
||||
{empty, _} ->
|
||||
Activity = case rabbit_fifo_filter_q:size(Messages0) of
|
||||
0 -> nochange;
|
||||
_ -> inactive
|
||||
end,
|
||||
{Activity, ExpiredMsg0, State0, Effects0};
|
||||
{{value, ConsumerKey}, SQ} ->
|
||||
State1 = State0#?STATE{service_queue = SQ},
|
||||
case Cons of
|
||||
#{ConsumerKey := #consumer{credit = Credit,
|
||||
status = Status}}
|
||||
when Credit =:= 0 orelse
|
||||
Status =/= up ->
|
||||
%% not an active consumer but still in the consumers
|
||||
%% map - this can happen when draining
|
||||
%% or when higher priority single active consumers
|
||||
%% take over, recurse without consumer in service
|
||||
%% queue
|
||||
checkout_one(Meta, ExpiredMsg0, State1, Effects0);
|
||||
#{ConsumerKey := #consumer{checked_out = Checked0,
|
||||
next_msg_id = Next,
|
||||
credit = Credit,
|
||||
delivery_count = DelCnt0,
|
||||
cfg = Cfg} = Con0} ->
|
||||
case take_next_consumer_msg(Ts, Con0, State1) of
|
||||
{empty, Con} ->
|
||||
State = update_or_remove_con(Meta, ConsumerKey, Con, State1),
|
||||
checkout_one(Meta, ExpiredMsg0, State, Effects0);
|
||||
{Msg = ?MSG(Idx, Hdr), Con1, State2} ->
|
||||
Expiry = case get_header(expiry, Hdr) of
|
||||
undefined ->
|
||||
Expiry0;
|
||||
ExpiryTs ->
|
||||
gb_trees:delete({ExpiryTs, Idx}, Expiry0)
|
||||
end,
|
||||
Checked = maps:put(Next, Msg, Checked0),
|
||||
DelCnt = case credit_api_v2(Cfg) of
|
||||
true -> add(DelCnt0, 1);
|
||||
false -> DelCnt0 + 1
|
||||
end,
|
||||
Con = Con1#consumer{checked_out = Checked,
|
||||
next_msg_id = Next + 1,
|
||||
credit = Credit - 1,
|
||||
delivery_count = DelCnt},
|
||||
Size = get_header(size, Hdr),
|
||||
State3 = State2#?STATE{
|
||||
case messages_ready(S0) of
|
||||
0 ->
|
||||
{nochange, ExpiredMsg0, S0, Effects0};
|
||||
_ ->
|
||||
case priority_queue:out(SQ0) of
|
||||
{empty, _} ->
|
||||
Activity = case rabbit_fifo_filter_q:size(Messages0) of
|
||||
0 -> nochange;
|
||||
_ -> inactive
|
||||
end,
|
||||
{Activity, ExpiredMsg0, S0, Effects0};
|
||||
{{value, ConsumerKey}, SQ} ->
|
||||
S1 = S0#?STATE{service_queue = SQ},
|
||||
case Cons of
|
||||
#{ConsumerKey := #consumer{credit = Credit,
|
||||
status = Status}}
|
||||
when Credit =:= 0 orelse
|
||||
Status =/= up ->
|
||||
%% not an active consumer but still in the consumers
|
||||
%% map - this can happen when draining
|
||||
%% or when higher priority single active consumers
|
||||
%% take over, recurse without consumer in service
|
||||
%% queue
|
||||
checkout_one(Meta, ExpiredMsg0, S1, Effects0);
|
||||
#{ConsumerKey := #consumer{checked_out = Checked0,
|
||||
next_msg_id = Next,
|
||||
credit = Credit,
|
||||
delivery_count = DelCnt0,
|
||||
cfg = Cfg} = Con0} ->
|
||||
case take_next_consumer_msg(Ts, Con0, S1) of
|
||||
{empty, Con} ->
|
||||
S = update_or_remove_con(Meta, ConsumerKey, Con, S1),
|
||||
checkout_one(Meta, ExpiredMsg0, S, Effects0);
|
||||
{Msg = ?MSG(Idx, Hdr), Con1, S2} ->
|
||||
Expiry = case get_header(expiry, Hdr) of
|
||||
undefined ->
|
||||
Expiry0;
|
||||
ExpiryTs ->
|
||||
gb_trees:delete({ExpiryTs, Idx},
|
||||
Expiry0)
|
||||
end,
|
||||
Checked = maps:put(Next, Msg, Checked0),
|
||||
DelCnt = case credit_api_v2(Cfg) of
|
||||
true -> add(DelCnt0, 1);
|
||||
false -> DelCnt0 + 1
|
||||
end,
|
||||
Con = Con1#consumer{checked_out = Checked,
|
||||
next_msg_id = Next + 1,
|
||||
credit = Credit - 1,
|
||||
delivery_count = DelCnt},
|
||||
Size = get_header(size, Hdr),
|
||||
S3 = S2#?STATE{
|
||||
service_queue = SQ,
|
||||
filter_msgs_expiry = Expiry,
|
||||
msg_bytes_checkout = BytesCheckout + Size,
|
||||
msg_bytes_enqueue = BytesEnqueue - Size},
|
||||
State = update_or_remove_con(Meta, ConsumerKey, Con, State3),
|
||||
{success, ConsumerKey, Next, Msg, ExpiredMsg0, State, Effects0}
|
||||
end;
|
||||
_ ->
|
||||
%% consumer was not active but was queued, recurse
|
||||
checkout_one(Meta, ExpiredMsg0, State1, Effects0)
|
||||
S = update_or_remove_con(Meta, ConsumerKey, Con, S3),
|
||||
{success, ConsumerKey, Next, Msg,
|
||||
ExpiredMsg0, S, Effects0}
|
||||
end;
|
||||
_ ->
|
||||
%% consumer was not active but was queued, recurse
|
||||
checkout_one(Meta, ExpiredMsg0, S1, Effects0)
|
||||
end
|
||||
end
|
||||
end;
|
||||
checkout_one(#{system_time := Ts} = Meta, ExpiredMsg0, InitState0, Effects0) ->
|
||||
|
|
Loading…
Reference in New Issue