improves fetchwhile performance

Fixes #316
This commit is contained in:
Alvaro Videla 2015-09-28 04:32:20 +02:00
parent 31a4b251e4
commit 1df75264be
1 changed files with 56 additions and 11 deletions

View File

@ -621,17 +621,9 @@ dropwhile(Pred, State) ->
{MsgProps, a(State1)}.
fetchwhile(Pred, Fun, Acc, State) ->
case queue_out(State) of
{empty, State1} ->
{undefined, Acc, a(State1)};
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
case Pred(MsgProps) of
true -> {Msg, State2} = read_msg(MsgStatus, State1),
{AckTag, State3} = remove(true, MsgStatus, State2),
fetchwhile(Pred, Fun, Fun(Msg, AckTag, Acc), State3);
false -> {MsgProps, Acc, a(in_r(MsgStatus, State1))}
end
end.
{MsgProps, Acc1, State1} =
fetch_by_predicate(Pred, Fun, Acc, State),
{MsgProps, Acc1, a(State1)}.
fetch(AckRequired, State) ->
case queue_out(State) of
@ -1349,6 +1341,59 @@ remove_by_predicate(Pred, State = #vqstate {out_counter = OutCount}) ->
State2 #vqstate {
out_counter = OutCount + ?QUEUE:len(QAcc)})}.
%% This function exists as a way to improve fetchwhile/4
%% performance. The idea of having this function is to optimise calls
%% to rabbit_queue_index by batching delivers, instead of sending them
%% one by one.
%%
%% Fun is the function passed to fetchwhile/4 that's
%% applied to every fetched message and used to build the fetchwhile/4
%% result accumulator FetchAcc.
fetch_by_predicate(Pred, Fun, FetchAcc,
State = #vqstate {
index_state = IndexState,
out_counter = OutCount}) ->
{MsgProps, QAcc, State1} =
collect_by_predicate(Pred, ?QUEUE:new(), State),
{Delivers, FetchAcc1, State2} =
process_queue_entries(QAcc, Fun, FetchAcc, State1),
IndexState1 = rabbit_queue_index:deliver(Delivers, IndexState),
{MsgProps, FetchAcc1, maybe_update_rates(
State2 #vqstate {
index_state = IndexState1,
out_counter = OutCount + ?QUEUE:len(QAcc)})}.
%% We try to do here the same as what remove(true, State) does but
%% processing several messages at the same time. The idea is to
%% optimize rabbit_queue_index:deliver/2 calls by sending a list of
%% SeqIds instead of one by one, thus process_queue_entries1 will
%% accumulate the required deliveries, will record_pending_ack for
%% each message, and will update stats, like remove/2 does.
%%
%% For the meaning of Fun and FetchAcc arguments see
%% fetch_by_predicate/4 above.
process_queue_entries(Q, Fun, FetchAcc, State) ->
?QUEUE:foldl(fun (MsgStatus, Acc) ->
process_queue_entries1(MsgStatus, Fun, Acc)
end,
{[], FetchAcc, State}, Q).
process_queue_entries1(
#msg_status { seq_id = SeqId, is_delivered = IsDelivered,
index_on_disk = IndexOnDisk} = MsgStatus,
Fun,
{Delivers, FetchAcc, State}) ->
{Msg, State1} = read_msg(MsgStatus, State),
State2 = record_pending_ack(
MsgStatus #msg_status {
is_delivered = true }, State1),
{cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
Fun(Msg, SeqId, FetchAcc),
stats({-1, 1}, {MsgStatus, MsgStatus}, State2)}.
collect_by_predicate(Pred, QAcc, State) ->
case queue_out(State) of
{empty, State1} ->