From 1df75264be00f3582e1012b6b95544060d30621c Mon Sep 17 00:00:00 2001 From: Alvaro Videla Date: Mon, 28 Sep 2015 04:32:20 +0200 Subject: [PATCH] improves fetchwhile performance Fixes #316 --- src/rabbit_variable_queue.erl | 67 +++++++++++++++++++++++++++++------ 1 file changed, 56 insertions(+), 11 deletions(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index f50c1bde7e..c15defc790 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -621,17 +621,9 @@ dropwhile(Pred, State) -> {MsgProps, a(State1)}. fetchwhile(Pred, Fun, Acc, State) -> - case queue_out(State) of - {empty, State1} -> - {undefined, Acc, a(State1)}; - {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> - case Pred(MsgProps) of - true -> {Msg, State2} = read_msg(MsgStatus, State1), - {AckTag, State3} = remove(true, MsgStatus, State2), - fetchwhile(Pred, Fun, Fun(Msg, AckTag, Acc), State3); - false -> {MsgProps, Acc, a(in_r(MsgStatus, State1))} - end - end. + {MsgProps, Acc1, State1} = + fetch_by_predicate(Pred, Fun, Acc, State), + {MsgProps, Acc1, a(State1)}. fetch(AckRequired, State) -> case queue_out(State) of @@ -1349,6 +1341,59 @@ remove_by_predicate(Pred, State = #vqstate {out_counter = OutCount}) -> State2 #vqstate { out_counter = OutCount + ?QUEUE:len(QAcc)})}. +%% This function exists as a way to improve fetchwhile/4 +%% performance. The idea of having this function is to optimise calls +%% to rabbit_queue_index by batching delivers, instead of sending them +%% one by one. +%% +%% Fun is the function passed to fetchwhile/4 that's +%% applied to every fetched message and used to build the fetchwhile/4 +%% result accumulator FetchAcc. +fetch_by_predicate(Pred, Fun, FetchAcc, + State = #vqstate { + index_state = IndexState, + out_counter = OutCount}) -> + {MsgProps, QAcc, State1} = + collect_by_predicate(Pred, ?QUEUE:new(), State), + + {Delivers, FetchAcc1, State2} = + process_queue_entries(QAcc, Fun, FetchAcc, State1), + + IndexState1 = rabbit_queue_index:deliver(Delivers, IndexState), + + {MsgProps, FetchAcc1, maybe_update_rates( + State2 #vqstate { + index_state = IndexState1, + out_counter = OutCount + ?QUEUE:len(QAcc)})}. + +%% We try to do here the same as what remove(true, State) does but +%% processing several messages at the same time. The idea is to +%% optimize rabbit_queue_index:deliver/2 calls by sending a list of +%% SeqIds instead of one by one, thus process_queue_entries1 will +%% accumulate the required deliveries, will record_pending_ack for +%% each message, and will update stats, like remove/2 does. +%% +%% For the meaning of Fun and FetchAcc arguments see +%% fetch_by_predicate/4 above. +process_queue_entries(Q, Fun, FetchAcc, State) -> + ?QUEUE:foldl(fun (MsgStatus, Acc) -> + process_queue_entries1(MsgStatus, Fun, Acc) + end, + {[], FetchAcc, State}, Q). + +process_queue_entries1( + #msg_status { seq_id = SeqId, is_delivered = IsDelivered, + index_on_disk = IndexOnDisk} = MsgStatus, + Fun, + {Delivers, FetchAcc, State}) -> + {Msg, State1} = read_msg(MsgStatus, State), + State2 = record_pending_ack( + MsgStatus #msg_status { + is_delivered = true }, State1), + {cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), + Fun(Msg, SeqId, FetchAcc), + stats({-1, 1}, {MsgStatus, MsgStatus}, State2)}. + collect_by_predicate(Pred, QAcc, State) -> case queue_out(State) of {empty, State1} ->