pass State to iterator
We want to be able to zip this iterator with other iterators that also manipulate the vqstate. Hence we must pass the State explicitly rather than keeping it opaque inside the iterator state. Also, some refactoring on read_msg.
This commit is contained in:
parent
5b51d0192f
commit
1017506c47
|
|
@ -676,7 +676,7 @@ ackfold(MsgFun, Acc, State, AckTags) ->
|
||||||
end, {Acc, State}, AckTags),
|
end, {Acc, State}, AckTags),
|
||||||
{AccN, a(StateN)}.
|
{AccN, a(StateN)}.
|
||||||
|
|
||||||
fold(Fun, Acc, State) -> ifold(Fun, Acc, iterator(State)).
|
fold(Fun, Acc, State) -> ifold(Fun, Acc, iterator(State), State).
|
||||||
|
|
||||||
len(#vqstate { len = Len }) -> Len.
|
len(#vqstate { len = Len }) -> Len.
|
||||||
|
|
||||||
|
|
@ -1080,14 +1080,16 @@ queue_out(State = #vqstate { q4 = Q4 }) ->
|
||||||
|
|
||||||
read_msg(#msg_status{msg = undefined,
|
read_msg(#msg_status{msg = undefined,
|
||||||
msg_id = MsgId,
|
msg_id = MsgId,
|
||||||
is_persistent = IsPersistent},
|
is_persistent = IsPersistent}, State) ->
|
||||||
State = #vqstate{msg_store_clients = MSCState}) ->
|
read_msg(MsgId, IsPersistent, State);
|
||||||
{{ok, Msg = #basic_message {}}, MSCState1} =
|
|
||||||
msg_store_read(MSCState, IsPersistent, MsgId),
|
|
||||||
{Msg, State #vqstate {msg_store_clients = MSCState1}};
|
|
||||||
read_msg(#msg_status{msg = Msg}, State) ->
|
read_msg(#msg_status{msg = Msg}, State) ->
|
||||||
{Msg, State}.
|
{Msg, State}.
|
||||||
|
|
||||||
|
read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) ->
|
||||||
|
{{ok, Msg = #basic_message {}}, MSCState1} =
|
||||||
|
msg_store_read(MSCState, IsPersistent, MsgId),
|
||||||
|
{Msg, State #vqstate {msg_store_clients = MSCState1}}.
|
||||||
|
|
||||||
inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) ->
|
inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) ->
|
||||||
State#vqstate{ram_msg_count = RamMsgCount + 1}.
|
State#vqstate{ram_msg_count = RamMsgCount + 1}.
|
||||||
|
|
||||||
|
|
@ -1442,57 +1444,47 @@ delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
|
||||||
%% Iterator
|
%% Iterator
|
||||||
%%----------------------------------------------------------------------------
|
%%----------------------------------------------------------------------------
|
||||||
|
|
||||||
iterator(State = #vqstate{q4 = Q4}) -> {q4, Q4, State}.
|
iterator(State) -> istate(start, State).
|
||||||
|
|
||||||
next({q4, _, State} = It) -> next(It, q3, State#vqstate.q3);
|
istate(start, State) -> {q4, State#vqstate.q4};
|
||||||
next({q3, _, State} = It) -> next(It, delta, State#vqstate.delta);
|
istate(q4, State) -> {q3, State#vqstate.q3};
|
||||||
next({delta, _, State} = It) -> next(It, q2, State#vqstate.q2);
|
istate(q3, State) -> {delta, State#vqstate.delta};
|
||||||
next({q2, _, State} = It) -> next(It, q1, State#vqstate.q1);
|
istate(delta, State) -> {q2, State#vqstate.q2};
|
||||||
next({q1, _, State} = It) -> next(It, done, State);
|
istate(q2, State) -> {q1, State#vqstate.q1};
|
||||||
next({done, _, State}) -> {empty, State}.
|
istate(q1, _State) -> done.
|
||||||
|
|
||||||
next({delta, #delta{start_seq_id = DeltaSeqId, end_seq_id = DeltaSeqId}, State},
|
next(done, State) -> {empty, State};
|
||||||
NextKey, Next) ->
|
next({delta, #delta{start_seq_id = SeqId, end_seq_id = SeqId}}, State) ->
|
||||||
next({NextKey, Next, State});
|
next(istate(delta, State), State);
|
||||||
next({delta, Delta = #delta{start_seq_id = DeltaSeqId,
|
next({delta, Delta = #delta{start_seq_id = SeqId, end_seq_id = SeqIdEnd}},
|
||||||
end_seq_id = DeltaSeqIdEnd},
|
State = #vqstate{index_state = IndexState}) ->
|
||||||
State = #vqstate{index_state = IndexState}}, NextKey, Next) ->
|
SeqIdB = rabbit_queue_index:next_segment_boundary(SeqId),
|
||||||
DeltaSeqId1 = lists:min(
|
SeqId1 = lists:min([SeqIdB, SeqIdEnd]),
|
||||||
[rabbit_queue_index:next_segment_boundary(DeltaSeqId),
|
{List, IndexState1} = rabbit_queue_index:read(SeqId, SeqId1, IndexState),
|
||||||
DeltaSeqIdEnd]),
|
next({delta, Delta#delta{start_seq_id = SeqId1}, List},
|
||||||
{List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
|
State#vqstate{index_state = IndexState1});
|
||||||
IndexState),
|
next({delta, Delta, []}, State) -> next({delta, Delta}, State);
|
||||||
next({delta, {Delta#delta{start_seq_id = DeltaSeqId1}, List},
|
next({delta, Delta, [M | Rest]}, State) ->
|
||||||
State#vqstate{index_state = IndexState1}}, NextKey, Next);
|
|
||||||
next({delta, {Delta, []}, State}, NextKey, Next) ->
|
|
||||||
next({delta, Delta, State}, NextKey, Next);
|
|
||||||
next({delta, {Delta, [M | Rest]},
|
|
||||||
State = #vqstate{msg_store_clients = MSCState}}, _NextKey, _Next) ->
|
|
||||||
{MsgId, _SeqId, MsgProps, IsPersistent, _IsDelivered} = M,
|
{MsgId, _SeqId, MsgProps, IsPersistent, _IsDelivered} = M,
|
||||||
{{ok, Msg = #basic_message {}}, MSCState1} =
|
{Msg, State1} = read_msg(MsgId, IsPersistent, State),
|
||||||
msg_store_read(MSCState, IsPersistent, MsgId),
|
{value, Msg, MsgProps, {delta, Delta, Rest}, State1};
|
||||||
State1 = State#vqstate{msg_store_clients = MSCState1},
|
next({Key, Q}, State) ->
|
||||||
{value, Msg, MsgProps, {delta, {Delta, Rest}, State1}};
|
|
||||||
next({Key, Q, State}, NextKey, Next) ->
|
|
||||||
case ?QUEUE:out(Q) of
|
case ?QUEUE:out(Q) of
|
||||||
{empty, _Q} ->
|
{empty, _Q} -> next(istate(Key, State), State);
|
||||||
next({NextKey, Next, State});
|
{{value, MsgStatus}, QN} -> {Msg, State1} = read_msg(MsgStatus, State),
|
||||||
{{value, MsgStatus}, QN} ->
|
MsgProps = MsgStatus#msg_status.msg_props,
|
||||||
{Msg, State1} = read_msg(MsgStatus, State),
|
{value, Msg, MsgProps, {Key, QN}, State1}
|
||||||
{value, Msg, MsgStatus#msg_status.msg_props, {Key, QN, State1}}
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
done({_, _, State}) -> State.
|
ifold(Fun, Acc, It, State) ->
|
||||||
|
case next(It, State) of
|
||||||
ifold(Fun, Acc, It) ->
|
{value, Msg, MsgProps, Next, State1} ->
|
||||||
case next(It) of
|
|
||||||
{value, Msg, MsgProps, Next} ->
|
|
||||||
case Fun(Msg, MsgProps, Acc) of
|
case Fun(Msg, MsgProps, Acc) of
|
||||||
{stop, Acc1} -> {Acc1, done(Next)};
|
{stop, Acc1} -> {Acc1, State1};
|
||||||
{cont, Acc1} -> ifold(Fun, Acc1, Next)
|
{cont, Acc1} -> ifold(Fun, Acc1, Next, State1)
|
||||||
end;
|
end;
|
||||||
{empty, Done} ->
|
{empty, State1} ->
|
||||||
{Acc, Done}
|
{Acc, State1}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%----------------------------------------------------------------------------
|
%%----------------------------------------------------------------------------
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue