inlining and reshuffling
This commit is contained in:
parent
bc7a823cbd
commit
e7f617d363
|
|
@ -571,9 +571,26 @@ ack(AckTags, State) ->
|
||||||
AckTags, State),
|
AckTags, State),
|
||||||
{MsgIds, a(State1)}.
|
{MsgIds, a(State1)}.
|
||||||
|
|
||||||
requeue(AckTags, MsgPropsFun, State) ->
|
requeue(AckTags, MsgPropsFun, #vqstate { delta = Delta,
|
||||||
{MsgIds, State1} = requeue_merge(lists:sort(AckTags), MsgPropsFun, State),
|
q3 = Q3,
|
||||||
{MsgIds, a(reduce_memory_use(State1))}.
|
q4 = Q4,
|
||||||
|
in_counter = InCounter,
|
||||||
|
len = Len } = State) ->
|
||||||
|
{SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [],
|
||||||
|
beta_limit(Q3),
|
||||||
|
q4_funs(MsgPropsFun), State),
|
||||||
|
{SeqIds1, Q3a, MsgIds1, State2} = queue_merge(SeqIds, Q3, MsgIds,
|
||||||
|
delta_limit(Delta),
|
||||||
|
q3_funs(MsgPropsFun), State1),
|
||||||
|
{Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1,
|
||||||
|
MsgPropsFun, State2),
|
||||||
|
MsgCount = length(MsgIds2),
|
||||||
|
{MsgIds2, a(reduce_memory_use(
|
||||||
|
State3 #vqstate { delta = Delta1,
|
||||||
|
q3 = Q3a,
|
||||||
|
q4 = Q4a,
|
||||||
|
in_counter = InCounter + MsgCount,
|
||||||
|
len = Len + MsgCount }))}.
|
||||||
|
|
||||||
len(#vqstate { len = Len }) -> Len.
|
len(#vqstate { len = Len }) -> Len.
|
||||||
|
|
||||||
|
|
@ -1314,53 +1331,6 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
|
||||||
%% Internal plumbing for requeue
|
%% Internal plumbing for requeue
|
||||||
%%----------------------------------------------------------------------------
|
%%----------------------------------------------------------------------------
|
||||||
|
|
||||||
requeue_merge(SeqIdsSorted, MsgPropsFun,
|
|
||||||
#vqstate { delta = Delta,
|
|
||||||
q3 = Q3,
|
|
||||||
q4 = Q4,
|
|
||||||
in_counter = InCounter,
|
|
||||||
len = Len } = State) ->
|
|
||||||
{SeqIds1, Q4a, MsgIds, State1} = queue_merge(SeqIdsSorted, Q4, [],
|
|
||||||
beta_limit(Q3),
|
|
||||||
q4_funs(MsgPropsFun), State),
|
|
||||||
{SeqIds2, Q3a, MsgIds1, State2} = queue_merge(SeqIds1, Q3, MsgIds,
|
|
||||||
delta_limit(Delta),
|
|
||||||
q3_funs(MsgPropsFun), State1),
|
|
||||||
{Delta1, MsgIds2, State3} = delta_merge(SeqIds2, Delta, MsgIds1,
|
|
||||||
MsgPropsFun, State2),
|
|
||||||
MsgCount = length(MsgIds2),
|
|
||||||
{MsgIds2, State3 #vqstate { delta = Delta1,
|
|
||||||
q3 = Q3a,
|
|
||||||
q4 = Q4a,
|
|
||||||
in_counter = InCounter + MsgCount,
|
|
||||||
len = Len + MsgCount }}.
|
|
||||||
|
|
||||||
%% Rebuild queue, inserting sequence ids to maintain ordering
|
|
||||||
queue_merge(SeqIds, Q, MsgIds, Limit, #merge_funs { new = QNew } = Funs,
|
|
||||||
State) ->
|
|
||||||
queue_merge(SeqIds, Q, QNew(), MsgIds, Limit, Funs, State).
|
|
||||||
|
|
||||||
queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, Limit,
|
|
||||||
#merge_funs { out = QOut, in = QIn, publish = QPublish } = Funs,
|
|
||||||
State)
|
|
||||||
when Limit == undefined orelse SeqId < Limit ->
|
|
||||||
case QOut(Q) of
|
|
||||||
{{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1}
|
|
||||||
when SeqIdQ < SeqId ->
|
|
||||||
%% enqueue from the remaining queue
|
|
||||||
queue_merge(SeqIds, Q1, QIn(MsgStatus, Front), MsgIds,
|
|
||||||
Limit, Funs, State);
|
|
||||||
{_, _Q1} ->
|
|
||||||
%% enqueue from the remaining list of sequence ids
|
|
||||||
{#msg_status { msg_id = MsgId } = MsgStatus, State1} =
|
|
||||||
QPublish(SeqId, State),
|
|
||||||
queue_merge(Rest, Q, QIn(MsgStatus, Front), [MsgId | MsgIds],
|
|
||||||
Limit, Funs, State1)
|
|
||||||
end;
|
|
||||||
queue_merge(SeqIds, Q, Front, MsgIds, _Limit, #merge_funs { join = QJoin },
|
|
||||||
State) ->
|
|
||||||
{SeqIds, QJoin(Front, Q), MsgIds, State}.
|
|
||||||
|
|
||||||
q4_funs(MsgPropsFun) ->
|
q4_funs(MsgPropsFun) ->
|
||||||
#merge_funs {
|
#merge_funs {
|
||||||
new = fun queue:new/0,
|
new = fun queue:new/0,
|
||||||
|
|
@ -1414,6 +1384,31 @@ q3_funs(MsgPropsFun) ->
|
||||||
one_if(not IndexOnDisk) }}
|
one_if(not IndexOnDisk) }}
|
||||||
end}.
|
end}.
|
||||||
|
|
||||||
|
%% Rebuild queue, inserting sequence ids to maintain ordering
|
||||||
|
queue_merge(SeqIds, Q, MsgIds, Limit, #merge_funs { new = QNew } = Funs,
|
||||||
|
State) ->
|
||||||
|
queue_merge(SeqIds, Q, QNew(), MsgIds, Limit, Funs, State).
|
||||||
|
|
||||||
|
queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, Limit,
|
||||||
|
#merge_funs { out = QOut, in = QIn, publish = QPublish } = Funs,
|
||||||
|
State)
|
||||||
|
when Limit == undefined orelse SeqId < Limit ->
|
||||||
|
case QOut(Q) of
|
||||||
|
{{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1}
|
||||||
|
when SeqIdQ < SeqId ->
|
||||||
|
%% enqueue from the remaining queue
|
||||||
|
queue_merge(SeqIds, Q1, QIn(MsgStatus, Front), MsgIds,
|
||||||
|
Limit, Funs, State);
|
||||||
|
{_, _Q1} ->
|
||||||
|
%% enqueue from the remaining list of sequence ids
|
||||||
|
{#msg_status { msg_id = MsgId } = MsgStatus, State1} =
|
||||||
|
QPublish(SeqId, State),
|
||||||
|
queue_merge(Rest, Q, QIn(MsgStatus, Front), [MsgId | MsgIds],
|
||||||
|
Limit, Funs, State1)
|
||||||
|
end;
|
||||||
|
queue_merge(SeqIds, Q, Front, MsgIds, _Limit, #merge_funs { join = QJoin },
|
||||||
|
State) ->
|
||||||
|
{SeqIds, QJoin(Front, Q), MsgIds, State}.
|
||||||
|
|
||||||
delta_merge([], Delta, MsgIds, _MsgPropsFun, State) ->
|
delta_merge([], Delta, MsgIds, _MsgPropsFun, State) ->
|
||||||
{Delta, MsgIds, State};
|
{Delta, MsgIds, State};
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue